From 29ef9117e62cd63b31e29100fd5f063280f5df75 Mon Sep 17 00:00:00 2001 From: aykutbozkurt Date: Wed, 22 Mar 2023 05:09:09 +0300 Subject: [PATCH] =?UTF-8?q?PR=20#6728=20=C2=A0/=20commit=20-=204?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add new metadata sync methods which uses MemorySyncContext api so that during the sync we can - free memory to prevent OOM, - use either transactional or nontransactional modes according to the GUC . --- .../distributed/commands/dependencies.c | 5 +- .../distributed/metadata/metadata_sync.c | 594 ++++++++++++++++-- src/include/distributed/metadata/dependency.h | 4 + src/include/distributed/metadata_sync.h | 16 +- .../expected/upgrade_post_11_after.out | 6 +- .../regress/sql/upgrade_post_11_after.sql | 2 +- 6 files changed, 570 insertions(+), 57 deletions(-) diff --git a/src/backend/distributed/commands/dependencies.c b/src/backend/distributed/commands/dependencies.c index 01653c3c8..af84d2eb5 100644 --- a/src/backend/distributed/commands/dependencies.c +++ b/src/backend/distributed/commands/dependencies.c @@ -29,13 +29,10 @@ #include "storage/lmgr.h" #include "utils/lsyscache.h" -typedef bool (*AddressPredicate)(const ObjectAddress *); static void EnsureDependenciesCanBeDistributed(const ObjectAddress *relationAddress); static void ErrorIfCircularDependencyExists(const ObjectAddress *objectAddress); static int ObjectAddressComparator(const void *a, const void *b); -static List * FilterObjectAddressListByPredicate(List *objectAddressList, - AddressPredicate predicate); static void EnsureDependenciesExistOnAllNodes(const ObjectAddress *target); static List * GetDependencyCreateDDLCommands(const ObjectAddress *dependency); static bool ShouldPropagateObject(const ObjectAddress *address); @@ -749,7 +746,7 @@ ShouldPropagateAnyObject(List *addresses) * FilterObjectAddressListByPredicate takes a list of ObjectAddress *'s and returns a list * only containing the ObjectAddress *'s for which the predicate returned true. */ -static List * +List * FilterObjectAddressListByPredicate(List *objectAddressList, AddressPredicate predicate) { List *result = NIL; diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 78ba24e97..0b126f13f 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -613,6 +613,25 @@ ShouldSyncTableMetadataViaCatalog(Oid relationId) } +/* + * FetchRelationIdFromPgPartitionHeapTuple returns relation id from given heap tuple. + */ +Oid +FetchRelationIdFromPgPartitionHeapTuple(HeapTuple heapTuple, TupleDesc tupleDesc) +{ + Assert(heapTuple->t_tableOid == DistPartitionRelationId()); + + bool isNullArray[Natts_pg_dist_partition]; + Datum datumArray[Natts_pg_dist_partition]; + heap_deform_tuple(heapTuple, tupleDesc, datumArray, isNullArray); + + Datum relationIdDatum = datumArray[Anum_pg_dist_partition_logicalrelid - 1]; + Oid relationId = DatumGetObjectId(relationIdDatum); + + return relationId; +} + + /* * ShouldSyncTableMetadataInternal decides whether we should sync the metadata for a table * based on whether it is a hash distributed table, or a citus table with no distribution @@ -4218,36 +4237,7 @@ EstablishAndSetMetadataSyncBareConnections(MetadataSyncContext *context) bareConnectionList = lappend(bareConnectionList, connection); } - context->activatedWorkerConnections = bareConnectionList; -} - - -/* - * EstablishAndSetMetadataSyncCoordinatedConnections establishes and sets - * connections used throughout transactional metadata sync. - */ -void -EstablishAndSetMetadataSyncCoordinatedConnections(MetadataSyncContext *context) -{ - Assert(MetadataSyncTransMode == METADATA_SYNC_TRANSACTIONAL); - - int connectionFlags = REQUIRE_METADATA_CONNECTION; - - /* establish coordinated connections to activated worker nodes */ - List *coordinatedConnectionList = NIL; - WorkerNode *node = NULL; - foreach_ptr(node, context->activatedWorkerNodeList) - { - MultiConnection *connection = - StartNodeConnection(connectionFlags, node->workerName, node->workerPort); - - MarkRemoteTransactionCritical(connection); - - Assert(connection != NULL); - coordinatedConnectionList = lappend(coordinatedConnectionList, connection); - } - - context->activatedWorkerConnections = coordinatedConnectionList; + context->activatedWorkerBareConnections = bareConnectionList; } @@ -4277,12 +4267,11 @@ CreateMetadataSyncContext(List *nodeList, bool collectCommands) /* filter the nodes that needs to be activated from given node list */ SetMetadataSyncNodesFromNodeList(metadataSyncContext, nodeList); - /* establish connections */ - if (!collectCommands && MetadataSyncTransMode == METADATA_SYNC_TRANSACTIONAL) - { - EstablishAndSetMetadataSyncCoordinatedConnections(metadataSyncContext); - } - else if (!collectCommands && MetadataSyncTransMode == METADATA_SYNC_NON_TRANSACTIONAL) + /* + * establish connections only for nontransactional mode to prevent connection + * open-close for each command + */ + if (!collectCommands && MetadataSyncTransMode == METADATA_SYNC_NON_TRANSACTIONAL) { EstablishAndSetMetadataSyncBareConnections(metadataSyncContext); } @@ -4363,8 +4352,7 @@ SendOrCollectCommandListToActivatedNodes(MetadataSyncContext *context, List *com */ if (MetadataSyncCollectsCommands(context)) { - context->collectedCommands = list_concat(context->collectedCommands, - commands); + context->collectedCommands = list_concat(context->collectedCommands, commands); return; } @@ -4406,8 +4394,7 @@ SendOrCollectCommandListToMetadataNodes(MetadataSyncContext *context, List *comm */ if (MetadataSyncCollectsCommands(context)) { - context->collectedCommands = list_concat(context->collectedCommands, - commands); + context->collectedCommands = list_concat(context->collectedCommands, commands); return; } @@ -4450,18 +4437,13 @@ SendOrCollectCommandListToSingleNode(MetadataSyncContext *context, List *command */ if (MetadataSyncCollectsCommands(context)) { - context->collectedCommands = list_concat(context->collectedCommands, - commands); + context->collectedCommands = list_concat(context->collectedCommands, commands); return; } /* send commands to new workers, the current user should be a superuser */ Assert(superuser()); - List *workerConnections = context->activatedWorkerConnections; - Assert(nodeIdx < list_length(workerConnections)); - MultiConnection *workerConnection = list_nth(workerConnections, nodeIdx); - if (context->transactionMode == METADATA_SYNC_TRANSACTIONAL) { List *workerNodes = context->activatedWorkerNodeList; @@ -4502,3 +4484,523 @@ WorkerDropAllShellTablesCommand(bool singleTransaction) singleTransactionString); return removeAllShellTablesCommand->data; } + + +/* + * PropagateNodeWideObjectsCommandList is called during node activation to + * propagate any object that should be propagated for every node. These are + * generally not linked to any distributed object but change system wide behaviour. + */ +static List * +PropagateNodeWideObjectsCommandList(void) +{ + /* collect all commands */ + List *ddlCommands = NIL; + + if (EnableAlterRoleSetPropagation) + { + /* + * Get commands for database and postgres wide settings. Since these settings are not + * linked to any role that can be distributed we need to distribute them seperately + */ + List *alterRoleSetCommands = GenerateAlterRoleSetCommandForRole(InvalidOid); + ddlCommands = list_concat(ddlCommands, alterRoleSetCommands); + } + + return ddlCommands; +} + + +/* + * SyncDistributedObjects sync the distributed objects to the nodes in metadataSyncContext + * with transactional or nontransactional mode according to transactionMode inside + * metadataSyncContext. + * + * Transactions should be ordered like below: + * - Nodewide objects (only roles for now), + * - Deletion of sequence and shell tables and metadata entries + * - All dependencies (e.g., types, schemas, sequences) and all shell distributed + * table and their pg_dist_xx metadata entries + * - Inter relation between those shell tables + * + * Note that we do not create the distributed dependencies on the coordinator + * since all the dependencies should be present in the coordinator already. + */ +void +SyncDistributedObjects(MetadataSyncContext *context) +{ + if (context->activatedWorkerNodeList == NIL) + { + return; + } + + EnsureSequentialModeMetadataOperations(); + + Assert(ShouldPropagate()); + + /* Send systemwide objects, only roles for now */ + SendNodeWideObjectsSyncCommands(context); + + /* + * Break dependencies between sequences-shell tables, then remove shell tables, + * and metadata tables respectively. + * We should delete shell tables before metadata entries as we look inside + * pg_dist_partition to figure out shell tables. + */ + SendShellTableDeletionCommands(context); + SendMetadataDeletionCommands(context); + + /* + * Commands to insert pg_dist_colocation entries. + * Replicating dist objects and their metadata depends on this step. + */ + SendColocationMetadataCommands(context); + + /* + * Replicate all objects of the pg_dist_object to the remote node and + * create metadata entries for Citus tables (pg_dist_shard, pg_dist_shard_placement, + * pg_dist_partition, pg_dist_object). + */ + SendDependencyCreationCommands(context); + SendDistTableMetadataCommands(context); + SendDistObjectCommands(context); + + /* + * After creating each table, handle the inter table relationship between + * those tables. + */ + SendInterTableRelationshipCommands(context); +} + + +/* + * SendNodeWideObjectsSyncCommands sends systemwide objects to workers with + * transactional or nontransactional mode according to transactionMode inside + * metadataSyncContext. + */ +void +SendNodeWideObjectsSyncCommands(MetadataSyncContext *context) +{ + /* propagate node wide objects. It includes only roles for now. */ + List *commandList = PropagateNodeWideObjectsCommandList(); + + if (commandList == NIL) + { + return; + } + + commandList = lcons(DISABLE_DDL_PROPAGATION, commandList); + commandList = lappend(commandList, ENABLE_DDL_PROPAGATION); + SendOrCollectCommandListToActivatedNodes(context, commandList); +} + + +/* + * SendShellTableDeletionCommands sends sequence, and shell table deletion + * commands to workers with transactional or nontransactional mode according to + * transactionMode inside metadataSyncContext. + */ +void +SendShellTableDeletionCommands(MetadataSyncContext *context) +{ + /* break all sequence deps for citus tables and remove all shell tables */ + char *breakSeqDepsCommand = BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND; + SendOrCollectCommandListToActivatedNodes(context, list_make1(breakSeqDepsCommand)); + + /* remove shell tables */ + bool singleTransaction = (context->transactionMode == METADATA_SYNC_TRANSACTIONAL); + char *dropShellTablesCommand = WorkerDropAllShellTablesCommand(singleTransaction); + SendOrCollectCommandListToActivatedNodes(context, list_make1(dropShellTablesCommand)); +} + + +/* + * SendMetadataDeletionCommands sends metadata entry deletion commands to workers + * with transactional or nontransactional mode according to transactionMode inside + * metadataSyncContext. + */ +void +SendMetadataDeletionCommands(MetadataSyncContext *context) +{ + /* remove pg_dist_partition entries */ + SendOrCollectCommandListToActivatedNodes(context, list_make1(DELETE_ALL_PARTITIONS)); + + /* remove pg_dist_shard entries */ + SendOrCollectCommandListToActivatedNodes(context, list_make1(DELETE_ALL_SHARDS)); + + /* remove pg_dist_placement entries */ + SendOrCollectCommandListToActivatedNodes(context, list_make1(DELETE_ALL_PLACEMENTS)); + + /* remove pg_dist_object entries */ + SendOrCollectCommandListToActivatedNodes(context, + list_make1(DELETE_ALL_DISTRIBUTED_OBJECTS)); + + /* remove pg_dist_colocation entries */ + SendOrCollectCommandListToActivatedNodes(context, list_make1(DELETE_ALL_COLOCATION)); +} + + +/* + * SendColocationMetadataCommands sends colocation metadata with transactional or + * nontransactional mode according to transactionMode inside metadataSyncContext. + */ +void +SendColocationMetadataCommands(MetadataSyncContext *context) +{ + ScanKeyData scanKey[1]; + int scanKeyCount = 0; + + Relation relation = table_open(DistColocationRelationId(), AccessShareLock); + SysScanDesc scanDesc = systable_beginscan(relation, InvalidOid, false, NULL, + scanKeyCount, scanKey); + + MemoryContext oldContext = MemoryContextSwitchTo(context->context); + HeapTuple nextTuple = NULL; + while (true) + { + ResetMetadataSyncMemoryContext(context); + + nextTuple = systable_getnext(scanDesc); + if (!HeapTupleIsValid(nextTuple)) + { + break; + } + + StringInfo colocationGroupCreateCommand = makeStringInfo(); + appendStringInfo(colocationGroupCreateCommand, + "WITH colocation_group_data (colocationid, shardcount, " + "replicationfactor, distributioncolumntype, " + "distributioncolumncollationname, " + "distributioncolumncollationschema) AS (VALUES "); + + Form_pg_dist_colocation colocationForm = + (Form_pg_dist_colocation) GETSTRUCT(nextTuple); + + appendStringInfo(colocationGroupCreateCommand, + "(%d, %d, %d, %s, ", + colocationForm->colocationid, + colocationForm->shardcount, + colocationForm->replicationfactor, + RemoteTypeIdExpression(colocationForm->distributioncolumntype)); + + /* + * For collations, include the names in the VALUES section and then + * join with pg_collation. + */ + Oid distributionColumCollation = colocationForm->distributioncolumncollation; + if (distributionColumCollation != InvalidOid) + { + Datum collationIdDatum = ObjectIdGetDatum(distributionColumCollation); + HeapTuple collationTuple = SearchSysCache1(COLLOID, collationIdDatum); + if (HeapTupleIsValid(collationTuple)) + { + Form_pg_collation collationform = + (Form_pg_collation) GETSTRUCT(collationTuple); + char *collationName = NameStr(collationform->collname); + char *collationSchemaName = + get_namespace_name(collationform->collnamespace); + appendStringInfo(colocationGroupCreateCommand, + "%s, %s)", + quote_literal_cstr(collationName), + quote_literal_cstr(collationSchemaName)); + ReleaseSysCache(collationTuple); + } + else + { + appendStringInfo(colocationGroupCreateCommand, + "NULL, NULL)"); + } + } + else + { + appendStringInfo(colocationGroupCreateCommand, + "NULL, NULL)"); + } + + appendStringInfo(colocationGroupCreateCommand, + ") SELECT pg_catalog.citus_internal_add_colocation_metadata(" + "colocationid, shardcount, replicationfactor, " + "distributioncolumntype, coalesce(c.oid, 0)) " + "FROM colocation_group_data d LEFT JOIN pg_collation c " + "ON (d.distributioncolumncollationname = c.collname " + "AND d.distributioncolumncollationschema::regnamespace" + " = c.collnamespace)"); + + List *commandList = list_make1(colocationGroupCreateCommand->data); + SendOrCollectCommandListToActivatedNodes(context, commandList); + } + MemoryContextSwitchTo(oldContext); + + systable_endscan(scanDesc); + table_close(relation, AccessShareLock); +} + + +/* + * SendDependencyCreationCommands sends dependency creation commands to workers + * with transactional or nontransactional mode according to transactionMode + * inside metadataSyncContext. + */ +void +SendDependencyCreationCommands(MetadataSyncContext *context) +{ + /* disable ddl propagation */ + SendOrCollectCommandListToActivatedNodes(context, + list_make1(DISABLE_DDL_PROPAGATION)); + + MemoryContext oldContext = MemoryContextSwitchTo(context->context); + + /* collect all dependencies in creation order and get their ddl commands */ + List *dependencies = GetDistributedObjectAddressList(); + + /* + * Depending on changes in the environment, such as the enable_metadata_sync guc + * there might be objects in the distributed object address list that should currently + * not be propagated by citus as they are 'not supported'. + */ + dependencies = FilterObjectAddressListByPredicate(dependencies, + &SupportedDependencyByCitus); + + dependencies = OrderObjectAddressListInDependencyOrder(dependencies); + + /* + * We need to create a subcontext as we reset the context after each dependency + * creation but we want to preserve all dependency objects at metadataSyncContext. + */ + MemoryContext commandsContext = AllocSetContextCreate(context->context, + "dependency commands context", + ALLOCSET_DEFAULT_SIZES); + MemoryContextSwitchTo(commandsContext); + ObjectAddress *dependency = NULL; + foreach_ptr(dependency, dependencies) + { + if (!MetadataSyncCollectsCommands(context)) + { + MemoryContextReset(commandsContext); + } + + if (IsAnyObjectAddressOwnedByExtension(list_make1(dependency), NULL)) + { + /* + * We expect extension-owned objects to be created as a result + * of the extension being created. + */ + continue; + } + + /* dependency creation commands */ + List *ddlCommands = GetAllDependencyCreateDDLCommands(list_make1(dependency)); + SendOrCollectCommandListToActivatedNodes(context, ddlCommands); + } + MemoryContextSwitchTo(oldContext); + + if (!MetadataSyncCollectsCommands(context)) + { + MemoryContextDelete(commandsContext); + } + ResetMetadataSyncMemoryContext(context); + + /* enable ddl propagation */ + SendOrCollectCommandListToActivatedNodes(context, list_make1(ENABLE_DDL_PROPAGATION)); +} + + +/* + * SendDistTableMetadataCommands sends commands related to pg_dist_shard and, + * pg_dist_shard_placement entries to workers with transactional or nontransactional + * mode according to transactionMode inside metadataSyncContext. + */ +void +SendDistTableMetadataCommands(MetadataSyncContext *context) +{ + ScanKeyData scanKey[1]; + int scanKeyCount = 0; + + Relation relation = table_open(DistPartitionRelationId(), AccessShareLock); + TupleDesc tupleDesc = RelationGetDescr(relation); + + SysScanDesc scanDesc = systable_beginscan(relation, InvalidOid, false, NULL, + scanKeyCount, scanKey); + + MemoryContext oldContext = MemoryContextSwitchTo(context->context); + HeapTuple nextTuple = NULL; + while (true) + { + ResetMetadataSyncMemoryContext(context); + + nextTuple = systable_getnext(scanDesc); + if (!HeapTupleIsValid(nextTuple)) + { + break; + } + + /* + * Create Citus table metadata commands (pg_dist_shard, pg_dist_shard_placement, + * pg_dist_partition). Only Citus tables have shard metadata. + */ + Oid relationId = FetchRelationIdFromPgPartitionHeapTuple(nextTuple, tupleDesc); + if (!ShouldSyncTableMetadata(relationId)) + { + continue; + } + + List *commandList = CitusTableMetadataCreateCommandList(relationId); + SendOrCollectCommandListToActivatedNodes(context, commandList); + } + MemoryContextSwitchTo(oldContext); + + systable_endscan(scanDesc); + table_close(relation, AccessShareLock); +} + + +/* + * SendDistObjectCommands sends commands related to pg_dist_object entries to + * workers with transactional or nontransactional mode according to transactionMode + * inside metadataSyncContext. + */ +void +SendDistObjectCommands(MetadataSyncContext *context) +{ + ScanKeyData scanKey[1]; + int scanKeyCount = 0; + + Relation relation = table_open(DistObjectRelationId(), AccessShareLock); + TupleDesc tupleDesc = RelationGetDescr(relation); + + SysScanDesc scanDesc = systable_beginscan(relation, InvalidOid, false, NULL, + scanKeyCount, scanKey); + + MemoryContext oldContext = MemoryContextSwitchTo(context->context); + HeapTuple nextTuple = NULL; + while (true) + { + ResetMetadataSyncMemoryContext(context); + + nextTuple = systable_getnext(scanDesc); + if (!HeapTupleIsValid(nextTuple)) + { + break; + } + + Form_pg_dist_object pg_dist_object = (Form_pg_dist_object) GETSTRUCT(nextTuple); + + ObjectAddress *address = palloc(sizeof(ObjectAddress)); + + ObjectAddressSubSet(*address, pg_dist_object->classid, pg_dist_object->objid, + pg_dist_object->objsubid); + + bool distributionArgumentIndexIsNull = false; + Datum distributionArgumentIndexDatum = + heap_getattr(nextTuple, + Anum_pg_dist_object_distribution_argument_index, + tupleDesc, + &distributionArgumentIndexIsNull); + int32 distributionArgumentIndex = DatumGetInt32(distributionArgumentIndexDatum); + + bool colocationIdIsNull = false; + Datum colocationIdDatum = + heap_getattr(nextTuple, + Anum_pg_dist_object_colocationid, + tupleDesc, + &colocationIdIsNull); + int32 colocationId = DatumGetInt32(colocationIdDatum); + + bool forceDelegationIsNull = false; + Datum forceDelegationDatum = + heap_getattr(nextTuple, + Anum_pg_dist_object_force_delegation, + tupleDesc, + &forceDelegationIsNull); + bool forceDelegation = DatumGetBool(forceDelegationDatum); + + if (distributionArgumentIndexIsNull) + { + distributionArgumentIndex = INVALID_DISTRIBUTION_ARGUMENT_INDEX; + } + + if (colocationIdIsNull) + { + colocationId = INVALID_COLOCATION_ID; + } + + if (forceDelegationIsNull) + { + forceDelegation = NO_FORCE_PUSHDOWN; + } + + char *workerMetadataUpdateCommand = + MarkObjectsDistributedCreateCommand(list_make1(address), + list_make1_int(distributionArgumentIndex), + list_make1_int(colocationId), + list_make1_int(forceDelegation)); + SendOrCollectCommandListToActivatedNodes(context, + list_make1(workerMetadataUpdateCommand)); + } + MemoryContextSwitchTo(oldContext); + + systable_endscan(scanDesc); + relation_close(relation, NoLock); +} + + +/* + * SendInterTableRelationshipCommands sends inter-table relationship commands + * (e.g. constraints, attach partitions) to workers with transactional or + * nontransactional mode per inter table relationship according to transactionMode + * inside metadataSyncContext. + */ +void +SendInterTableRelationshipCommands(MetadataSyncContext *context) +{ + /* disable ddl propagation */ + SendOrCollectCommandListToActivatedNodes(context, + list_make1(DISABLE_DDL_PROPAGATION)); + + ScanKeyData scanKey[1]; + int scanKeyCount = 0; + + Relation relation = table_open(DistPartitionRelationId(), AccessShareLock); + TupleDesc tupleDesc = RelationGetDescr(relation); + + SysScanDesc scanDesc = systable_beginscan(relation, InvalidOid, false, NULL, + scanKeyCount, scanKey); + + MemoryContext oldContext = MemoryContextSwitchTo(context->context); + HeapTuple nextTuple = NULL; + while (true) + { + ResetMetadataSyncMemoryContext(context); + + nextTuple = systable_getnext(scanDesc); + if (!HeapTupleIsValid(nextTuple)) + { + break; + } + + Oid relationId = FetchRelationIdFromPgPartitionHeapTuple(nextTuple, tupleDesc); + if (!ShouldSyncTableMetadata(relationId)) + { + continue; + } + + /* + * Skip foreign key and partition creation when the Citus table is + * owned by an extension. + */ + if (IsTableOwnedByExtension(relationId)) + { + continue; + } + + List *commandList = InterTableRelationshipOfRelationCommandList(relationId); + SendOrCollectCommandListToActivatedNodes(context, commandList); + } + MemoryContextSwitchTo(oldContext); + + systable_endscan(scanDesc); + table_close(relation, AccessShareLock); + + /* enable ddl propagation */ + SendOrCollectCommandListToActivatedNodes(context, list_make1(ENABLE_DDL_PROPAGATION)); +} diff --git a/src/include/distributed/metadata/dependency.h b/src/include/distributed/metadata/dependency.h index c5a65319e..2d3759e1f 100644 --- a/src/include/distributed/metadata/dependency.h +++ b/src/include/distributed/metadata/dependency.h @@ -19,6 +19,8 @@ #include "distributed/errormessage.h" #include "nodes/pg_list.h" +typedef bool (*AddressPredicate)(const ObjectAddress *); + extern List * GetUniqueDependenciesList(List *objectAddressesList); extern List * GetDependenciesForObject(const ObjectAddress *target); extern List * GetAllSupportedDependenciesForObject(const ObjectAddress *target); @@ -33,5 +35,7 @@ extern List * GetPgDependTuplesForDependingObjects(Oid targetObjectClassId, Oid targetObjectId); extern List * GetDependingViews(Oid relationId); extern Oid GetDependingView(Form_pg_depend pg_depend); +extern List * FilterObjectAddressListByPredicate(List *objectAddressList, + AddressPredicate predicate); #endif /* CITUS_DEPENDENCY_H */ diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index 608dbbacc..2c673b11a 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -36,7 +36,7 @@ extern int MetadataSyncTransMode; typedef struct MetadataSyncContext { List *activatedWorkerNodeList; /* activated worker nodes */ - List *activatedWorkerConnections; /* connections to activated worker nodes */ + List *activatedWorkerBareConnections; /* bare connections to activated worker nodes */ MemoryContext context; /* memory context for all allocations */ MetadataSyncTransactionMode transactionMode; /* transaction mode for the sync */ bool collectCommands; /* flag to collect commands instead of sending and resetting */ @@ -81,6 +81,8 @@ extern char * LocalGroupIdUpdateCommand(int32 groupId); extern bool ShouldSyncUserCommandForObject(ObjectAddress objectAddress); extern bool ShouldSyncTableMetadata(Oid relationId); extern bool ShouldSyncTableMetadataViaCatalog(Oid relationId); +extern Oid FetchRelationIdFromPgPartitionHeapTuple(HeapTuple heapTuple, + TupleDesc tupleDesc); extern bool ShouldSyncSequenceMetadata(Oid relationId); extern List * NodeMetadataCreateCommands(void); extern List * DistributedObjectMetadataSyncCommandList(void); @@ -140,8 +142,6 @@ extern void SyncDeleteColocationGroupToNodes(uint32 colocationId); extern MetadataSyncContext * CreateMetadataSyncContext(List *nodeList, bool testMode); extern void DestroyMetadataSyncContext(MetadataSyncContext *context); extern void EstablishAndSetMetadataSyncBareConnections(MetadataSyncContext *context); -extern void EstablishAndSetMetadataSyncCoordinatedConnections( - MetadataSyncContext *context); extern void SetMetadataSyncNodesFromNodeList(MetadataSyncContext *context, List *nodeList); extern void ResetMetadataSyncMemoryContext(MetadataSyncContext *context); @@ -155,6 +155,16 @@ extern void SendOrCollectCommandListToSingleNode(MetadataSyncContext *context, extern char * WorkerDropAllShellTablesCommand(bool singleTransaction); +extern void SyncDistributedObjects(MetadataSyncContext *context); +extern void SendNodeWideObjectsSyncCommands(MetadataSyncContext *context); +extern void SendShellTableDeletionCommands(MetadataSyncContext *context); +extern void SendMetadataDeletionCommands(MetadataSyncContext *context); +extern void SendColocationMetadataCommands(MetadataSyncContext *context); +extern void SendDependencyCreationCommands(MetadataSyncContext *context); +extern void SendDistTableMetadataCommands(MetadataSyncContext *context); +extern void SendDistObjectCommands(MetadataSyncContext *context); +extern void SendInterTableRelationshipCommands(MetadataSyncContext *context); + #define DELETE_ALL_NODES "DELETE FROM pg_dist_node" #define DELETE_ALL_PLACEMENTS "DELETE FROM pg_dist_placement" #define DELETE_ALL_SHARDS "DELETE FROM pg_dist_shard" diff --git a/src/test/regress/expected/upgrade_post_11_after.out b/src/test/regress/expected/upgrade_post_11_after.out index d7d7c46b0..cf41da8e1 100644 --- a/src/test/regress/expected/upgrade_post_11_after.out +++ b/src/test/regress/expected/upgrade_post_11_after.out @@ -25,11 +25,11 @@ SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM pg_catalog.p (19 rows) -- on all nodes -SELECT run_command_on_workers($$SELECT array_agg(pg_identify_object_as_address(classid, objid, objsubid)) FROM pg_catalog.pg_dist_object WHERE objid IN ('post_11_upgrade'::regnamespace, 'post_11_upgrade.part_table'::regclass, 'post_11_upgrade.sensors'::regclass, 'post_11_upgrade.func_in_transaction_def'::regproc, 'post_11_upgrade.partial_index_test_config'::regconfig, 'post_11_upgrade.my_type'::regtype, 'post_11_upgrade.view_for_upgrade_test'::regclass, 'post_11_upgrade.view_for_upgrade_test_my_type'::regclass, 'post_11_upgrade.non_dist_upgrade_ref_view_2'::regclass, 'post_11_upgrade.reporting_line'::regclass) ORDER BY 1;$$) ORDER BY 1; +SELECT run_command_on_workers($$SELECT array_agg(worker_object) FROM (SELECT pg_identify_object_as_address(classid, objid, objsubid) worker_object FROM pg_catalog.pg_dist_object WHERE objid IN ('post_11_upgrade'::regnamespace, 'post_11_upgrade.part_table'::regclass, 'post_11_upgrade.sensors'::regclass, 'post_11_upgrade.func_in_transaction_def'::regproc, 'post_11_upgrade.partial_index_test_config'::regconfig, 'post_11_upgrade.my_type'::regtype, 'post_11_upgrade.view_for_upgrade_test'::regclass, 'post_11_upgrade.view_for_upgrade_test_my_type'::regclass, 'post_11_upgrade.non_dist_upgrade_ref_view_2'::regclass, 'post_11_upgrade.reporting_line'::regclass) ORDER BY 1) worker_objects;$$) ORDER BY 1; run_command_on_workers --------------------------------------------------------------------- - (localhost,57636,t,"{""(type,{post_11_upgrade.my_type},{})"",""(function,\\""{post_11_upgrade,func_in_transaction_def}\\"",{})"",""(table,\\""{post_11_upgrade,part_table}\\"",{})"",""(table,\\""{post_11_upgrade,sensors}\\"",{})"",""(view,\\""{post_11_upgrade,view_for_upgrade_test}\\"",{})"",""(view,\\""{post_11_upgrade,view_for_upgrade_test_my_type}\\"",{})"",""(view,\\""{post_11_upgrade,non_dist_upgrade_ref_view_2}\\"",{})"",""(view,\\""{post_11_upgrade,reporting_line}\\"",{})"",""(schema,{post_11_upgrade},{})"",""(\\""text search configuration\\"",\\""{post_11_upgrade,partial_index_test_config}\\"",{})""}") - (localhost,57637,t,"{""(type,{post_11_upgrade.my_type},{})"",""(function,\\""{post_11_upgrade,func_in_transaction_def}\\"",{})"",""(table,\\""{post_11_upgrade,part_table}\\"",{})"",""(table,\\""{post_11_upgrade,sensors}\\"",{})"",""(view,\\""{post_11_upgrade,view_for_upgrade_test}\\"",{})"",""(view,\\""{post_11_upgrade,view_for_upgrade_test_my_type}\\"",{})"",""(view,\\""{post_11_upgrade,non_dist_upgrade_ref_view_2}\\"",{})"",""(view,\\""{post_11_upgrade,reporting_line}\\"",{})"",""(schema,{post_11_upgrade},{})"",""(\\""text search configuration\\"",\\""{post_11_upgrade,partial_index_test_config}\\"",{})""}") + (localhost,57636,t,"{""(function,\\""{post_11_upgrade,func_in_transaction_def}\\"",{})"",""(schema,{post_11_upgrade},{})"",""(table,\\""{post_11_upgrade,part_table}\\"",{})"",""(table,\\""{post_11_upgrade,sensors}\\"",{})"",""(\\""text search configuration\\"",\\""{post_11_upgrade,partial_index_test_config}\\"",{})"",""(type,{post_11_upgrade.my_type},{})"",""(view,\\""{post_11_upgrade,non_dist_upgrade_ref_view_2}\\"",{})"",""(view,\\""{post_11_upgrade,reporting_line}\\"",{})"",""(view,\\""{post_11_upgrade,view_for_upgrade_test}\\"",{})"",""(view,\\""{post_11_upgrade,view_for_upgrade_test_my_type}\\"",{})""}") + (localhost,57637,t,"{""(function,\\""{post_11_upgrade,func_in_transaction_def}\\"",{})"",""(schema,{post_11_upgrade},{})"",""(table,\\""{post_11_upgrade,part_table}\\"",{})"",""(table,\\""{post_11_upgrade,sensors}\\"",{})"",""(\\""text search configuration\\"",\\""{post_11_upgrade,partial_index_test_config}\\"",{})"",""(type,{post_11_upgrade.my_type},{})"",""(view,\\""{post_11_upgrade,non_dist_upgrade_ref_view_2}\\"",{})"",""(view,\\""{post_11_upgrade,reporting_line}\\"",{})"",""(view,\\""{post_11_upgrade,view_for_upgrade_test}\\"",{})"",""(view,\\""{post_11_upgrade,view_for_upgrade_test_my_type}\\"",{})""}") (2 rows) -- Create the necessary test utility function diff --git a/src/test/regress/sql/upgrade_post_11_after.sql b/src/test/regress/sql/upgrade_post_11_after.sql index e38491593..946c52ae2 100644 --- a/src/test/regress/sql/upgrade_post_11_after.sql +++ b/src/test/regress/sql/upgrade_post_11_after.sql @@ -4,7 +4,7 @@ SET search_path = post_11_upgrade; SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM pg_catalog.pg_dist_object WHERE objid IN ('post_11_upgrade'::regnamespace, 'post_11_upgrade.part_table'::regclass, 'post_11_upgrade.sensors'::regclass, 'post_11_upgrade.func_in_transaction_def'::regproc, 'post_11_upgrade.partial_index_test_config'::regconfig, 'post_11_upgrade.my_type'::regtype, 'post_11_upgrade.employees'::regclass, 'post_11_upgrade.view_for_upgrade_test'::regclass, 'post_11_upgrade.my_type_for_view'::regtype, 'post_11_upgrade.view_for_upgrade_test_my_type'::regclass, 'post_11_upgrade.non_dist_table_for_view'::regclass, 'post_11_upgrade.non_dist_upgrade_test_view'::regclass, 'post_11_upgrade.non_dist_upgrade_test_view_local_join'::regclass, 'post_11_upgrade.non_dist_upgrade_multiple_dist_view'::regclass, 'post_11_upgrade.non_dist_upgrade_ref_view'::regclass, 'post_11_upgrade.non_dist_upgrade_ref_view_2'::regclass, 'post_11_upgrade.reporting_line'::regclass, 'post_11_upgrade.v_test_1'::regclass, 'post_11_upgrade.v_test_2'::regclass, 'post_11_upgrade.owned_by_extension_table'::regclass, 'post_11_upgrade.materialized_view'::regclass, 'post_11_upgrade.owned_by_extension_view'::regclass, 'post_11_upgrade.local_type'::regtype, 'post_11_upgrade.non_dist_dist_table_for_view'::regclass, 'post_11_upgrade.depends_on_nothing_1'::regclass, 'post_11_upgrade.depends_on_nothing_2'::regclass, 'post_11_upgrade.depends_on_pg'::regclass, 'post_11_upgrade.depends_on_citus'::regclass, 'post_11_upgrade.depends_on_seq'::regclass, 'post_11_upgrade.depends_on_seq_and_no_support'::regclass) ORDER BY 1; -- on all nodes -SELECT run_command_on_workers($$SELECT array_agg(pg_identify_object_as_address(classid, objid, objsubid)) FROM pg_catalog.pg_dist_object WHERE objid IN ('post_11_upgrade'::regnamespace, 'post_11_upgrade.part_table'::regclass, 'post_11_upgrade.sensors'::regclass, 'post_11_upgrade.func_in_transaction_def'::regproc, 'post_11_upgrade.partial_index_test_config'::regconfig, 'post_11_upgrade.my_type'::regtype, 'post_11_upgrade.view_for_upgrade_test'::regclass, 'post_11_upgrade.view_for_upgrade_test_my_type'::regclass, 'post_11_upgrade.non_dist_upgrade_ref_view_2'::regclass, 'post_11_upgrade.reporting_line'::regclass) ORDER BY 1;$$) ORDER BY 1; +SELECT run_command_on_workers($$SELECT array_agg(worker_object) FROM (SELECT pg_identify_object_as_address(classid, objid, objsubid) worker_object FROM pg_catalog.pg_dist_object WHERE objid IN ('post_11_upgrade'::regnamespace, 'post_11_upgrade.part_table'::regclass, 'post_11_upgrade.sensors'::regclass, 'post_11_upgrade.func_in_transaction_def'::regproc, 'post_11_upgrade.partial_index_test_config'::regconfig, 'post_11_upgrade.my_type'::regtype, 'post_11_upgrade.view_for_upgrade_test'::regclass, 'post_11_upgrade.view_for_upgrade_test_my_type'::regclass, 'post_11_upgrade.non_dist_upgrade_ref_view_2'::regclass, 'post_11_upgrade.reporting_line'::regclass) ORDER BY 1) worker_objects;$$) ORDER BY 1; -- Create the necessary test utility function CREATE OR REPLACE FUNCTION activate_node_snapshot()