No change in logic, just mark sequences as sync only

objects_with_metadata_sync
Onder Kalaci 2021-08-02 12:58:00 +02:00
parent ef6a8604ba
commit 6ccbdbd2f5
5 changed files with 58 additions and 1 deletions

View File

@ -670,6 +670,7 @@ MarkSequenceDistributedAndPropagateDependencies(Oid sequenceOid)
ObjectAddressSet(sequenceAddress, RelationRelationId, sequenceOid);
EnsureDependenciesExistOnAllNodes(&sequenceAddress);
MarkObjectDistributed(&sequenceAddress);
UpdateMetadataSyncedOnlyForDistObject(&sequenceAddress);
}

View File

@ -1270,6 +1270,57 @@ LookupDistObjectCacheEntry(Oid classid, Oid objid, int32 objsubid)
}
/*
* TODO: make the column enum, not bool for future extensibility.
* TODO: probably better to move distobject.c
*/
void
UpdateMetadataSyncedOnlyForDistObject(const ObjectAddress *distAddress, bool metadataSync)
{
ScanKeyData pgDistObjectKey[3];
Relation pgDistObjectRel = table_open(DistObjectRelationId(), AccessShareLock);
TupleDesc pgDistObjectTupleDesc = RelationGetDescr(pgDistObjectRel);
ScanKeyInit(&pgDistObjectKey[0], Anum_pg_dist_object_classid,
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(distAddress->classId));
ScanKeyInit(&pgDistObjectKey[1], Anum_pg_dist_object_objid,
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(distAddress->objectId));
ScanKeyInit(&pgDistObjectKey[2], Anum_pg_dist_object_objsubid,
BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(distAddress->objectSubId));
SysScanDesc pgDistObjectScan = systable_beginscan(pgDistObjectRel,
DistObjectPrimaryKeyIndexId(),
true, NULL, 3, pgDistObjectKey);
HeapTuple pgDistObjectTup = systable_getnext(pgDistObjectScan);
if (HeapTupleIsValid(pgDistObjectTup))
{
Datum values[Natts_pg_dist_object];
bool isnull[Natts_pg_dist_object];
bool replace[Natts_pg_dist_object];
memset(replace, 0, sizeof(replace));
replace[Anum_pg_dist_object_sync_metadata_node - 1] = true;
/* update the colocationId to the new one */
values[Anum_pg_dist_object_sync_metadata_node - 1] = BoolGetDatum(metadataSync);
isnull[Anum_pg_dist_object_sync_metadata_node - 1] = false;
pgDistObjectTup = heap_modify_tuple(pgDistObjectTup, pgDistObjectTupleDesc,
values, isnull,
replace);
CatalogTupleUpdate(pgDistObjectRel, &pgDistObjectTup->t_self, pgDistObjectTup);
CitusInvalidateRelcacheByRelid(DistObjectRelationId());
}
systable_endscan(pgDistObjectScan);
table_close(pgDistObjectRel, NoLock);
CommandCounterIncrement();
}
/*
* BuildCitusTableCacheEntry is a helper routine for
* LookupCitusTableCacheEntry() for building the cache contents.

View File

@ -8,6 +8,8 @@ GRANT ALL ON FUNCTION pg_catalog.worker_record_sequence_dependency(regclass,regc
-- the same shard cannot have placements on different nodes
ALTER TABLE pg_catalog.pg_dist_placement ADD CONSTRAINT placement_shardid_groupid_unique_index UNIQUE (shardid, groupid);
ALTER TABLE citus.pg_dist_object ADD COLUMN sync_to_metadata_nodes bool;
#include "udfs/stop_metadata_sync_to_node/10.2-1.sql"
#include "../../columnar/sql/columnar--10.1-1--10.2-1.sql"
#include "udfs/citus_internal_add_partition_metadata/10.2-1.sql";

View File

@ -49,7 +49,7 @@ typedef FormData_pg_dist_object *Form_pg_dist_object;
* compiler constants for pg_dist_object
* ----------------
*/
#define Natts_pg_dist_object 8
#define Natts_pg_dist_object 9
#define Anum_pg_dist_object_classid 1
#define Anum_pg_dist_object_objid 2
#define Anum_pg_dist_object_objsubid 3
@ -58,5 +58,6 @@ typedef FormData_pg_dist_object *Form_pg_dist_object;
#define Anum_pg_dist_object_object_args 6
#define Anum_pg_dist_object_distribution_argument_index 7
#define Anum_pg_dist_object_colocationid 8
#define Anum_pg_dist_object_sync_metadata_node 9
#endif /* PG_DIST_OBJECT_H */

View File

@ -160,6 +160,8 @@ extern CitusTableCacheEntry * GetCitusTableCacheEntry(Oid distributedRelationId)
extern CitusTableCacheEntry * LookupCitusTableCacheEntry(Oid relationId);
extern DistObjectCacheEntry * LookupDistObjectCacheEntry(Oid classid, Oid objid, int32
objsubid);
extern void UpdateMetadataSyncedOnlyForDistObject(const ObjectAddress *distAddress,
bool metadataSync);
extern int32 GetLocalGroupId(void);
extern void CitusTableCacheFlushInvalidatedEntries(void);
extern Oid LookupShardRelationFromCatalog(int64 shardId, bool missing_ok);