From 6ccbdbd2f551a2be4ddff73522439a42e8fe6a8a Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Mon, 2 Aug 2021 12:58:00 +0200 Subject: [PATCH] No change in logic, just mark sequences as sync only --- .../commands/create_distributed_table.c | 1 + .../distributed/metadata/metadata_cache.c | 51 +++++++++++++++++++ .../distributed/sql/citus--10.1-1--10.2-1.sql | 2 + .../distributed/metadata/pg_dist_object.h | 3 +- src/include/distributed/metadata_cache.h | 2 + 5 files changed, 58 insertions(+), 1 deletion(-) diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 111181a0c..dc63e036b 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -670,6 +670,7 @@ MarkSequenceDistributedAndPropagateDependencies(Oid sequenceOid) ObjectAddressSet(sequenceAddress, RelationRelationId, sequenceOid); EnsureDependenciesExistOnAllNodes(&sequenceAddress); MarkObjectDistributed(&sequenceAddress); + UpdateMetadataSyncedOnlyForDistObject(&sequenceAddress); } diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index fa01aa42a..6b2a8556c 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -1270,6 +1270,57 @@ LookupDistObjectCacheEntry(Oid classid, Oid objid, int32 objsubid) } +/* + * TODO: make the column enum, not bool for future extensibility. + * TODO: probably better to move distobject.c + */ +void +UpdateMetadataSyncedOnlyForDistObject(const ObjectAddress *distAddress, bool metadataSync) +{ + ScanKeyData pgDistObjectKey[3]; + Relation pgDistObjectRel = table_open(DistObjectRelationId(), AccessShareLock); + TupleDesc pgDistObjectTupleDesc = RelationGetDescr(pgDistObjectRel); + + ScanKeyInit(&pgDistObjectKey[0], Anum_pg_dist_object_classid, + BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(distAddress->classId)); + ScanKeyInit(&pgDistObjectKey[1], Anum_pg_dist_object_objid, + BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(distAddress->objectId)); + ScanKeyInit(&pgDistObjectKey[2], Anum_pg_dist_object_objsubid, + BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(distAddress->objectSubId)); + + SysScanDesc pgDistObjectScan = systable_beginscan(pgDistObjectRel, + DistObjectPrimaryKeyIndexId(), + true, NULL, 3, pgDistObjectKey); + HeapTuple pgDistObjectTup = systable_getnext(pgDistObjectScan); + + if (HeapTupleIsValid(pgDistObjectTup)) + { + Datum values[Natts_pg_dist_object]; + bool isnull[Natts_pg_dist_object]; + bool replace[Natts_pg_dist_object]; + memset(replace, 0, sizeof(replace)); + + replace[Anum_pg_dist_object_sync_metadata_node - 1] = true; + + /* update the colocationId to the new one */ + values[Anum_pg_dist_object_sync_metadata_node - 1] = BoolGetDatum(metadataSync); + + isnull[Anum_pg_dist_object_sync_metadata_node - 1] = false; + + pgDistObjectTup = heap_modify_tuple(pgDistObjectTup, pgDistObjectTupleDesc, + values, isnull, + replace); + + CatalogTupleUpdate(pgDistObjectRel, &pgDistObjectTup->t_self, pgDistObjectTup); + CitusInvalidateRelcacheByRelid(DistObjectRelationId()); + } + + systable_endscan(pgDistObjectScan); + table_close(pgDistObjectRel, NoLock); + CommandCounterIncrement(); +} + + /* * BuildCitusTableCacheEntry is a helper routine for * LookupCitusTableCacheEntry() for building the cache contents. diff --git a/src/backend/distributed/sql/citus--10.1-1--10.2-1.sql b/src/backend/distributed/sql/citus--10.1-1--10.2-1.sql index b56a3296f..50e6b8a3f 100644 --- a/src/backend/distributed/sql/citus--10.1-1--10.2-1.sql +++ b/src/backend/distributed/sql/citus--10.1-1--10.2-1.sql @@ -8,6 +8,8 @@ GRANT ALL ON FUNCTION pg_catalog.worker_record_sequence_dependency(regclass,regc -- the same shard cannot have placements on different nodes ALTER TABLE pg_catalog.pg_dist_placement ADD CONSTRAINT placement_shardid_groupid_unique_index UNIQUE (shardid, groupid); +ALTER TABLE citus.pg_dist_object ADD COLUMN sync_to_metadata_nodes bool; + #include "udfs/stop_metadata_sync_to_node/10.2-1.sql" #include "../../columnar/sql/columnar--10.1-1--10.2-1.sql" #include "udfs/citus_internal_add_partition_metadata/10.2-1.sql"; diff --git a/src/include/distributed/metadata/pg_dist_object.h b/src/include/distributed/metadata/pg_dist_object.h index 735e20819..1b9e0a45d 100644 --- a/src/include/distributed/metadata/pg_dist_object.h +++ b/src/include/distributed/metadata/pg_dist_object.h @@ -49,7 +49,7 @@ typedef FormData_pg_dist_object *Form_pg_dist_object; * compiler constants for pg_dist_object * ---------------- */ -#define Natts_pg_dist_object 8 +#define Natts_pg_dist_object 9 #define Anum_pg_dist_object_classid 1 #define Anum_pg_dist_object_objid 2 #define Anum_pg_dist_object_objsubid 3 @@ -58,5 +58,6 @@ typedef FormData_pg_dist_object *Form_pg_dist_object; #define Anum_pg_dist_object_object_args 6 #define Anum_pg_dist_object_distribution_argument_index 7 #define Anum_pg_dist_object_colocationid 8 +#define Anum_pg_dist_object_sync_metadata_node 9 #endif /* PG_DIST_OBJECT_H */ diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 519057ddd..888e36e0f 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -160,6 +160,8 @@ extern CitusTableCacheEntry * GetCitusTableCacheEntry(Oid distributedRelationId) extern CitusTableCacheEntry * LookupCitusTableCacheEntry(Oid relationId); extern DistObjectCacheEntry * LookupDistObjectCacheEntry(Oid classid, Oid objid, int32 objsubid); +extern void UpdateMetadataSyncedOnlyForDistObject(const ObjectAddress *distAddress, + bool metadataSync); extern int32 GetLocalGroupId(void); extern void CitusTableCacheFlushInvalidatedEntries(void); extern Oid LookupShardRelationFromCatalog(int64 shardId, bool missing_ok);