diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index 812f23962..9b0a79e22 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -427,16 +427,24 @@ ProcessUtilityInternal(PlannedStmt *pstmt, parsetree = ProcessCreateSubscriptionStmt(createSubStmt); } + /* + * For security and reliability reasons we disallow altering and dropping + * subscriptions created by citus by non superusers. We could probably + * disallow this for all subscriptions without issues. But out of an + * abundance of caution for breaking subscription logic created by users + * for other purposes, we only disallow it for the subscriptions that we + * create i.e. ones that start with "citus_". + */ if (IsA(parsetree, AlterSubscriptionStmt)) { AlterSubscriptionStmt *alterSubStmt = (AlterSubscriptionStmt *) parsetree; if (!superuser() && - StringStartsWith(alterSubStmt->subname, - SHARD_MOVE_SUBSCRIPTION_PREFIX)) + StringStartsWith(alterSubStmt->subname, "citus_")) { ereport(ERROR, ( errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), - errmsg("Only superusers can alter shard move subscriptions"))); + errmsg( + "Only superusers can alter subscriptions that are created by citus"))); } } @@ -444,11 +452,12 @@ ProcessUtilityInternal(PlannedStmt *pstmt, { DropSubscriptionStmt *dropSubStmt = (DropSubscriptionStmt *) parsetree; if (!superuser() && - StringStartsWith(dropSubStmt->subname, SHARD_MOVE_SUBSCRIPTION_PREFIX)) + StringStartsWith(dropSubStmt->subname, "citus_")) { ereport(ERROR, ( errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), - errmsg("Only superusers can drop shard move subscriptions"))); + errmsg( + "Only superusers can drop subscriptions that are created by citus"))); } } diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index 6ef99a321..f446bbadd 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -21,6 +21,7 @@ #include "distributed/shared_library_init.h" #include "distributed/adaptive_executor.h" #include "distributed/colocation_utils.h" +#include "distributed/hash_helpers.h" #include "distributed/metadata_cache.h" #include "distributed/shardinterval_utils.h" #include "distributed/coordinator_protocol.h" @@ -51,6 +52,17 @@ typedef struct ShardCreatedByWorkflowEntry WorkerNode *workerNodeValue; } ShardCreatedByWorkflowEntry; +/* + * Entry for map that trackes dummy shards. 
+ * Key: node + owner + * Value: List of dummy shards for that node + owner + */ +typedef struct GroupedDummyShards +{ + NodeAndOwner key; + List *shardIntervals; +} GroupedDummyShards; + /* Function declarations */ static void ErrorIfCannotSplitShardExtended(SplitOperation splitOperation, ShardInterval *shardIntervalToSplit, @@ -74,9 +86,7 @@ static HTAB * CreateWorkerForPlacementSet(List *workersForPlacementList); static void CreateAuxiliaryStructuresForShardGroup(List *shardGroupSplitIntervalListList, List *workersForPlacementList, bool includeReplicaIdentity); -static void CreateReplicaIdentities(HTAB *mapOfDummyShardToPlacement, - List *shardGroupSplitIntervalListList, - List *workersForPlacementList); +static void CreateReplicaIdentitiesForDummyShards(HTAB *mapOfDummyShardToPlacement); static void CreateObjectOnPlacement(List *objectCreationCommandList, WorkerNode *workerNode); static List * CreateSplitIntervalsForShardGroup(List *sourceColocatedShardList, @@ -114,10 +124,6 @@ static Task * CreateTaskForDDLCommandList(List *ddlCommandList, WorkerNode *work static StringInfo CreateSplitShardReplicationSetupUDF( List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, List *destinationWorkerNodesList); -static char * CreateTemplateReplicationSlotAndReturnSnapshot(ShardInterval *shardInterval, - WorkerNode *sourceWorkerNode, - MultiConnection ** - templateSlotConnection); static List * ParseReplicationSlotInfoFromResult(PGresult *result); static List * ExecuteSplitShardReplicationSetupUDF(WorkerNode *sourceWorkerNode, @@ -1101,17 +1107,16 @@ CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList, /* * Iterate on split shards list for a given shard and create constraints. */ - forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode, - workersForPlacementList) + forboth_ptr(shardInterval, shardIntervalList, + workerPlacementNode, workersForPlacementList) { List *shardForeignConstraintCommandList = NIL; List *referenceTableForeignConstraintList = NIL; - CopyShardForeignConstraintCommandListGrouped(shardInterval, - & - shardForeignConstraintCommandList, - & - referenceTableForeignConstraintList); + CopyShardForeignConstraintCommandListGrouped( + shardInterval, + &shardForeignConstraintCommandList, + &referenceTableForeignConstraintList); List *constraintCommandList = NIL; constraintCommandList = list_concat(constraintCommandList, @@ -1271,29 +1276,30 @@ NonBlockingShardSplit(SplitOperation splitOperation, shardIntervalToSplit->shardId); /* Create hashmap to group shards for publication-subscription management */ - HTAB *shardSplitHashMapForPublication = CreateShardSplitInfoMapForPublication( + HTAB *publicationInfoHash = CreateShardSplitInfoMapForPublication( sourceColocatedShardIntervalList, shardGroupSplitIntervalListList, workersForPlacementList); - DropAllShardSplitLeftOvers(sourceShardToCopyNode, shardSplitHashMapForPublication); + DropAllLogicalReplicationLeftovers(SHARD_SPLIT); int connectionFlags = FORCE_NEW_CONNECTION; - MultiConnection *sourceConnection = GetNodeUserDatabaseConnection(connectionFlags, - sourceShardToCopyNode - -> - workerName, - sourceShardToCopyNode - -> - workerPort, - superUser, - databaseName); + MultiConnection *sourceConnection = GetNodeUserDatabaseConnection( + connectionFlags, + sourceShardToCopyNode->workerName, + sourceShardToCopyNode->workerPort, + superUser, + databaseName); ClaimConnectionExclusively(sourceConnection); HTAB *mapOfShardToPlacementCreatedByWorkflow = CreateEmptyMapForShardsCreatedByWorkflow(); 
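The hash maps in this change are keyed by NodeAndOwner (target node id plus table owner oid) and are built with the CreateSimpleHash helper from distributed/hash_helpers.h, used just below in place of SetupHashMapForShardInfo. As a rough sketch of the idea only (the actual helper is defined in hash_helpers.h and may differ), such a helper can wrap PostgreSQL's hash_create with HASH_BLOBS so that fixed-size struct keys are hashed bytewise, which removes the need for custom hash and compare callbacks:

/*
 * Illustrative sketch only -- not the actual hash_helpers.h definition.
 * Builds a dynahash keyed by a fixed-size struct (e.g. NodeAndOwner),
 * hashing the key bytes directly via HASH_BLOBS. Assumes the usual
 * postgres.h / utils/hsearch.h context already included in these files.
 */
static HTAB *
CreateSimpleHashSketch(Size keySize, Size entrySize, const char *name)
{
	HASHCTL info;
	memset(&info, 0, sizeof(info));
	info.keysize = keySize;
	info.entrysize = entrySize;
	info.hcxt = CurrentMemoryContext;

	int hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);

	return hash_create(name, 32, &info, hashFlags);
}

Relying on bytewise key hashing is also why NodeShardMappingHash and NodeShardMappingHashCompare can be deleted further down in this diff.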
- HTAB *mapOfDummyShardToPlacement = SetupHashMapForShardInfo(); + HTAB *mapOfDummyShardToPlacement = CreateSimpleHash(NodeAndOwner, + GroupedShardSplitInfos); + MultiConnection *sourceReplicationConnection = + GetReplicationConnection(sourceShardToCopyNode->workerName, + sourceShardToCopyNode->workerPort); /* Non-Blocking shard split workflow starts here */ PG_TRY(); @@ -1315,27 +1321,22 @@ NonBlockingShardSplit(SplitOperation splitOperation, sourceShardToCopyNode, workersForPlacementList); - CreateReplicaIdentities(mapOfDummyShardToPlacement, - shardGroupSplitIntervalListList, workersForPlacementList); - - /* 3) Create Publications. */ - CreateShardSplitPublications(sourceConnection, shardSplitHashMapForPublication); - /* - * 4) Create template replication Slot. It returns a snapshot. The snapshot remains - * valid till the lifetime of the session that creates it. The connection is closed - * at the end of the workflow. + * 3) Create replica identities on dummy shards. This needs to be done + * before the subscriptions are created. Otherwise the subscription + * creation will get stuck waiting for the publication to send a + * replica identity. Since we never actually write data into these + * dummy shards there's no point in creating these indexes after the + * initial COPY phase, like we do for the replica identities on the + * target shards. */ - MultiConnection *templateSlotConnection = NULL; - char *snapShotName = CreateTemplateReplicationSlotAndReturnSnapshot( - shardIntervalToSplit, sourceShardToCopyNode, &templateSlotConnection); + CreateReplicaIdentitiesForDummyShards(mapOfDummyShardToPlacement); - /* 5) Do snapshotted Copy */ - DoSplitCopy(sourceShardToCopyNode, sourceColocatedShardIntervalList, - shardGroupSplitIntervalListList, workersForPlacementList, - snapShotName); + /* 4) Create Publications. */ + CreatePublications(sourceConnection, publicationInfoHash); - /* 6) Execute 'worker_split_shard_replication_setup UDF */ + + /* 5) Execute 'worker_split_shard_replication_setup UDF */ List *replicationSlotInfoList = ExecuteSplitShardReplicationSetupUDF( sourceShardToCopyNode, sourceColocatedShardIntervalList, @@ -1346,80 +1347,98 @@ NonBlockingShardSplit(SplitOperation splitOperation, * Subscriber flow starts from here. * Populate 'ShardSplitSubscriberMetadata' for subscription management. */ - List *shardSplitSubscribersMetadataList = + List *logicalRepTargetList = PopulateShardSplitSubscriptionsMetadataList( - shardSplitHashMapForPublication, replicationSlotInfoList); + publicationInfoHash, replicationSlotInfoList, + shardGroupSplitIntervalListList, workersForPlacementList); + + HTAB *groupedLogicalRepTargetsHash = CreateGroupedLogicalRepTargetsHash( + logicalRepTargetList); /* Create connections to the target nodes */ - List *targetNodeConnectionList = CreateTargetNodeConnectionsForShardSplit( - shardSplitSubscribersMetadataList, - connectionFlags, + CreateGroupedLogicalRepTargetsConnections( + groupedLogicalRepTargetsHash, superUser, databaseName); - /* 7) Create copies of template replication slot */ - char *templateSlotName = ShardSplitTemplateReplicationSlotName( - shardIntervalToSplit->shardId); - CreateReplicationSlots(sourceConnection, templateSlotName, - shardSplitSubscribersMetadataList); + char *logicalRepDecoderPlugin = "citus"; + + /* + * 6) Create replication slots and keep track of their snapshot. 
+ */ + char *snapshot = CreateReplicationSlots( + sourceConnection, + sourceReplicationConnection, + logicalRepTargetList, + logicalRepDecoderPlugin); + + /* + * 7) Create subscriptions. This isn't strictly needed yet at this + * stage, but this way we error out quickly if it fails. + */ + CreateSubscriptions( + sourceConnection, + databaseName, + logicalRepTargetList); + + /* 8) Do snapshotted Copy */ + DoSplitCopy(sourceShardToCopyNode, sourceColocatedShardIntervalList, + shardGroupSplitIntervalListList, workersForPlacementList, + snapshot); - /* 8) Create subscriptions on target nodes */ - CreateShardSplitSubscriptions(targetNodeConnectionList, - shardSplitSubscribersMetadataList, - sourceShardToCopyNode, - superUser, - databaseName); /* Used for testing */ ConflictOnlyWithIsolationTesting(); - /* 9) Wait for subscriptions to be ready */ - WaitForShardSplitRelationSubscriptionsBecomeReady( - shardSplitSubscribersMetadataList); + /* + * 9) Create replica identities, this needs to be done before enabling + * the subscriptions. + */ + CreateReplicaIdentities(logicalRepTargetList); - /* 10) Wait for subscribers to catchup till source LSN */ - XLogRecPtr sourcePosition = GetRemoteLogPosition(sourceConnection); - WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition, - shardSplitSubscribersMetadataList); + /* + * 10) Enable the subscriptions: Start the catchup phase + */ + EnableSubscriptions(logicalRepTargetList); - /* 11) Create Auxilary structures */ + /* 11) Wait for subscriptions to be ready */ + WaitForAllSubscriptionsToBecomeReady(groupedLogicalRepTargetsHash); + + /* 12) Wait for subscribers to catchup till source LSN */ + WaitForAllSubscriptionsToCatchUp(sourceConnection, groupedLogicalRepTargetsHash); + + /* 13) Create Auxilary structures */ CreateAuxiliaryStructuresForShardGroup(shardGroupSplitIntervalListList, workersForPlacementList, false /* includeReplicaIdentity*/); - /* 12) Wait for subscribers to catchup till source LSN */ - sourcePosition = GetRemoteLogPosition(sourceConnection); - WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition, - shardSplitSubscribersMetadataList); + /* 14) Wait for subscribers to catchup till source LSN */ + WaitForAllSubscriptionsToCatchUp(sourceConnection, groupedLogicalRepTargetsHash); - /* 13) Block writes on source shards */ + /* 15) Block writes on source shards */ BlockWritesToShardList(sourceColocatedShardIntervalList); - /* 14) Wait for subscribers to catchup till source LSN */ - sourcePosition = GetRemoteLogPosition(sourceConnection); - WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition, - shardSplitSubscribersMetadataList); + /* 16) Wait for subscribers to catchup till source LSN */ + WaitForAllSubscriptionsToCatchUp(sourceConnection, groupedLogicalRepTargetsHash); - /* 15) Drop Subscribers */ - DropShardSplitSubsriptions(shardSplitSubscribersMetadataList); + /* 17) Drop Subscribers */ + DropSubscriptions(logicalRepTargetList); - /* 16) Drop Publications */ - DropShardSplitPublications(sourceConnection, shardSplitHashMapForPublication); - - /* 17) Drop replication slots - * Drop template and subscriber replication slots + /* 18) Drop replication slots */ - DropShardReplicationSlot(sourceConnection, ShardSplitTemplateReplicationSlotName( - shardIntervalToSplit->shardId)); - DropShardSplitReplicationSlots(sourceConnection, replicationSlotInfoList); + DropReplicationSlots(sourceConnection, logicalRepTargetList); + + /* 19) Drop Publications */ + DropPublications(sourceConnection, publicationInfoHash); 
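The WaitForAllSubscriptionsToCatchUp calls above (steps 12, 14 and 16) replace the previous GetRemoteLogPosition plus WaitForShardSplitRelationSubscriptionsToBeCaughtUp pairs. As an illustration only (the real implementation also enforces LogicalReplicationTimeout, handles interrupts and reports progress), the catch-up wait amounts to reading the source WAL position once and polling the grouped targets until their subscriptions have replayed past it:

/*
 * Rough sketch of the catch-up wait, for illustration only; it uses the
 * helpers declared in multi_logical_replication.c (GetRemoteLogPosition,
 * GetSubscriptionPosition, WaitForMiliseconds).
 */
static void
WaitForCatchUpSketch(MultiConnection *sourceConnection,
					 GroupedLogicalRepTargets *groupedLogicalRepTargets)
{
	/* WAL position on the source at the time we start waiting */
	XLogRecPtr sourcePosition = GetRemoteLogPosition(sourceConnection);

	while (true)
	{
		/* lowest LSN the node's subscriptions have replayed so far */
		XLogRecPtr targetPosition =
			GetSubscriptionPosition(groupedLogicalRepTargets);

		if (targetPosition >= sourcePosition)
		{
			break;
		}

		WaitForMiliseconds(logicalReplicationProgressReportTimeout);
	}
}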
+ /* - * 18) Drop old shards and delete related metadata. Have to do that before + * 20) Drop old shards and delete related metadata. Have to do that before * creating the new shard metadata, because there's cross-checks * preventing inconsistent metadata (like overlapping shards). */ DropShardList(sourceColocatedShardIntervalList); - /* 19) Insert new shard and placement metdata */ + /* 21) Insert new shard and placement metdata */ InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList, workersForPlacementList); @@ -1427,7 +1446,7 @@ NonBlockingShardSplit(SplitOperation splitOperation, workersForPlacementList); /* - * 20) Create foreign keys if exists after the metadata changes happening in + * 22) Create foreign keys if exists after the metadata changes happening in * DropShardList() and InsertSplitChildrenShardMetadata() because the foreign * key creation depends on the new metadata. */ @@ -1435,18 +1454,18 @@ NonBlockingShardSplit(SplitOperation splitOperation, workersForPlacementList); /* - * 21) Drop dummy shards. + * 23) Drop dummy shards. */ DropDummyShards(mapOfDummyShardToPlacement); - /* 22) Close source connection */ + /* 24) Close source connection */ CloseConnection(sourceConnection); - /* 23) Close all subscriber connections */ - CloseShardSplitSubscriberConnections(shardSplitSubscribersMetadataList); + /* 25) Close all subscriber connections */ + CloseGroupedLogicalRepTargetsConnections(groupedLogicalRepTargetsHash); - /* 24) Close connection of template replication slot */ - CloseConnection(templateSlotConnection); + /* 26) Close connection of template replication slot */ + CloseConnection(sourceReplicationConnection); } PG_CATCH(); { @@ -1456,8 +1475,7 @@ NonBlockingShardSplit(SplitOperation splitOperation, /* Do a best effort cleanup of shards created on workers in the above block */ TryDropSplitShardsOnFailure(mapOfShardToPlacementCreatedByWorkflow); - DropAllShardSplitLeftOvers(sourceShardToCopyNode, - shardSplitHashMapForPublication); + DropAllLogicalReplicationLeftovers(SHARD_SPLIT); DropDummyShards(mapOfDummyShardToPlacement); @@ -1603,44 +1621,6 @@ CreateWorkerForPlacementSet(List *workersForPlacementList) } -/* - * CreateTemplateReplicationSlotAndReturnSnapshot creates a replication slot - * and returns its snapshot. This slot acts as a 'Template' for creating - * replication slot copies used for logical replication. - * - * The snapshot remains valid till the lifetime of the session that creates it. - */ -char * -CreateTemplateReplicationSlotAndReturnSnapshot(ShardInterval *shardInterval, - WorkerNode *sourceWorkerNode, - MultiConnection **templateSlotConnection) -{ - /*Create Template replication slot */ - int connectionFlags = FORCE_NEW_CONNECTION; - connectionFlags |= REQUIRE_REPLICATION_CONNECTION_PARAM; - - MultiConnection *sourceConnection = GetNodeUserDatabaseConnection(connectionFlags, - sourceWorkerNode-> - workerName, - sourceWorkerNode-> - workerPort, - CitusExtensionOwnerName(), - get_database_name( - MyDatabaseId)); - ClaimConnectionExclusively(sourceConnection); - - /* - * Try to drop leftover template replication slot if any from previous operation - * and create new one. 
- */ - char *snapShotName = CreateTemplateReplicationSlot(shardInterval, - sourceConnection); - *templateSlotConnection = sourceConnection; - - return snapShotName; -} - - /* * ExecuteSplitShardReplicationSetupUDF executes * 'worker_split_shard_replication_setup' UDF on source shard node @@ -1796,22 +1776,23 @@ ParseReplicationSlotInfoFromResult(PGresult *result) List *replicationSlotInfoList = NIL; for (int64 rowIndex = 0; rowIndex < rowCount; rowIndex++) { - ReplicationSlotInfo *replicationSlotInfo = (ReplicationSlotInfo *) palloc0( + ReplicationSlotInfo *replicationSlot = (ReplicationSlotInfo *) palloc0( sizeof(ReplicationSlotInfo)); char *targeNodeIdString = PQgetvalue(result, rowIndex, 0 /* nodeId column*/); - replicationSlotInfo->targetNodeId = strtoul(targeNodeIdString, NULL, 10); + replicationSlot->targetNodeId = strtoul(targeNodeIdString, NULL, 10); - /* We're using the pstrdup to copy the data into the current memory context */ - replicationSlotInfo->tableOwnerName = pstrdup(PQgetvalue(result, rowIndex, - 1 /* table owner name column */)); + bool missingOk = false; + replicationSlot->tableOwnerId = get_role_oid( + PQgetvalue(result, rowIndex, 1 /* table owner name column */), + missingOk); /* Replication slot name */ - replicationSlotInfo->slotName = pstrdup(PQgetvalue(result, rowIndex, - 2 /* slot name column */)); + replicationSlot->name = pstrdup(PQgetvalue(result, rowIndex, + 2 /* slot name column */)); - replicationSlotInfoList = lappend(replicationSlotInfoList, replicationSlotInfo); + replicationSlotInfoList = lappend(replicationSlotInfoList, replicationSlot); } return replicationSlotInfoList; @@ -1829,22 +1810,22 @@ static void AddDummyShardEntryInMap(HTAB *mapOfDummyShardToPlacement, uint32 targetNodeId, ShardInterval *shardInterval) { - NodeShardMappingKey key; + NodeAndOwner key; key.nodeId = targetNodeId; key.tableOwnerId = TableOwnerOid(shardInterval->relationId); bool found = false; - NodeShardMappingEntry *nodeMappingEntry = - (NodeShardMappingEntry *) hash_search(mapOfDummyShardToPlacement, &key, - HASH_ENTER, - &found); + GroupedDummyShards *nodeMappingEntry = + (GroupedDummyShards *) hash_search(mapOfDummyShardToPlacement, &key, + HASH_ENTER, + &found); if (!found) { - nodeMappingEntry->shardSplitInfoList = NIL; + nodeMappingEntry->shardIntervals = NIL; } - nodeMappingEntry->shardSplitInfoList = - lappend(nodeMappingEntry->shardSplitInfoList, (ShardInterval *) shardInterval); + nodeMappingEntry->shardIntervals = + lappend(nodeMappingEntry->shardIntervals, shardInterval); } @@ -1858,8 +1839,8 @@ DropDummyShards(HTAB *mapOfDummyShardToPlacement) HASH_SEQ_STATUS status; hash_seq_init(&status, mapOfDummyShardToPlacement); - NodeShardMappingEntry *entry = NULL; - while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL) + GroupedDummyShards *entry = NULL; + while ((entry = (GroupedDummyShards *) hash_seq_search(&status)) != NULL) { uint32 nodeId = entry->key.nodeId; WorkerNode *shardToBeDroppedNode = FindNodeWithNodeId(nodeId, @@ -1874,7 +1855,7 @@ DropDummyShards(HTAB *mapOfDummyShardToPlacement) CurrentUserName(), NULL /* databaseName */); - List *dummyShardIntervalList = entry->shardSplitInfoList; + List *dummyShardIntervalList = entry->shardIntervals; ShardInterval *shardInterval = NULL; foreach_ptr(shardInterval, dummyShardIntervalList) { @@ -1911,51 +1892,27 @@ DropDummyShard(MultiConnection *connection, ShardInterval *shardInterval) /* - * CreateReplicaIdentities creates replica indentities for split children and dummy shards. 
+ * CreateReplicaIdentitiesForDummyShards creates replica indentities for split + * dummy shards. */ static void -CreateReplicaIdentities(HTAB *mapOfDummyShardToPlacement, - List *shardGroupSplitIntervalListList, - List *workersForPlacementList) +CreateReplicaIdentitiesForDummyShards(HTAB *mapOfDummyShardToPlacement) { - /* - * Create Replica Identities for actual child shards. - */ - List *shardIntervalList = NIL; - foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList) - { - ShardInterval *shardInterval = NULL; - WorkerNode *workerPlacementNode = NULL; - - /* - * Iterate on split shard interval list for given shard and create tasks - * for every single split shard in a shard group. - */ - forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode, - workersForPlacementList) - { - List *shardList = NIL; - shardList = lappend(shardList, shardInterval); - - CreateReplicaIdentity(shardList, workerPlacementNode->workerName, - workerPlacementNode->workerPort); - } - } - /* Create Replica Identities for dummy shards */ HASH_SEQ_STATUS status; hash_seq_init(&status, mapOfDummyShardToPlacement); - NodeShardMappingEntry *entry = NULL; - while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL) + GroupedDummyShards *entry = NULL; + while ((entry = (GroupedDummyShards *) hash_seq_search(&status)) != NULL) { uint32 nodeId = entry->key.nodeId; WorkerNode *shardToBeDroppedNode = FindNodeWithNodeId(nodeId, false /* missingOk */); - List *dummyShardIntervalList = entry->shardSplitInfoList; - CreateReplicaIdentity(dummyShardIntervalList, shardToBeDroppedNode->workerName, - shardToBeDroppedNode->workerPort); + List *dummyShardIntervalList = entry->shardIntervals; + CreateReplicaIdentitiesOnNode(dummyShardIntervalList, + shardToBeDroppedNode->workerName, + shardToBeDroppedNode->workerPort); } } diff --git a/src/backend/distributed/operations/worker_split_shard_replication_setup_udf.c b/src/backend/distributed/operations/worker_split_shard_replication_setup_udf.c index 6b039283c..98d14c857 100644 --- a/src/backend/distributed/operations/worker_split_shard_replication_setup_udf.c +++ b/src/backend/distributed/operations/worker_split_shard_replication_setup_udf.c @@ -13,6 +13,7 @@ #include "postmaster/postmaster.h" #include "common/hashfn.h" #include "distributed/distribution_column.h" +#include "distributed/hash_helpers.h" #include "distributed/shardinterval_utils.h" #include "distributed/shard_utils.h" #include "distributed/shardsplit_shared_memory.h" @@ -104,7 +105,7 @@ worker_split_shard_replication_setup(PG_FUNCTION_ARGS) } /* SetupMap */ - ShardInfoHashMap = SetupHashMapForShardInfo(); + ShardInfoHashMap = CreateSimpleHash(NodeAndOwner, GroupedShardSplitInfos); int shardSplitInfoCount = 0; @@ -154,30 +155,6 @@ worker_split_shard_replication_setup(PG_FUNCTION_ARGS) } -/* - * SetupHashMapForShardInfo initializes a hash map to store shard split - * information by grouping them node id wise. The key of the hash table - * is 'nodeId' and value is a list of ShardSplitInfo that are placed on - * this particular node. 
- */ -HTAB * -SetupHashMapForShardInfo() -{ - HASHCTL info; - memset(&info, 0, sizeof(info)); - info.keysize = sizeof(NodeShardMappingKey); - info.entrysize = sizeof(NodeShardMappingEntry); - info.hash = NodeShardMappingHash; - info.match = NodeShardMappingHashCompare; - info.hcxt = CurrentMemoryContext; - - int hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_FUNCTION | HASH_COMPARE); - - HTAB *shardInfoMap = hash_create("ShardInfoMap", 128, &info, hashFlags); - return shardInfoMap; -} - - /* * CreateShardSplitInfo function constructs ShardSplitInfo data structure * with appropriate OIs' for source and destination relation. @@ -267,21 +244,21 @@ CreateShardSplitInfo(uint64 sourceShardIdToSplit, static void AddShardSplitInfoEntryForNodeInMap(ShardSplitInfo *shardSplitInfo) { - NodeShardMappingKey key; + NodeAndOwner key; key.nodeId = shardSplitInfo->nodeId; key.tableOwnerId = TableOwnerOid(shardSplitInfo->distributedTableOid); bool found = false; - NodeShardMappingEntry *nodeMappingEntry = - (NodeShardMappingEntry *) hash_search(ShardInfoHashMap, &key, HASH_ENTER, - &found); + GroupedShardSplitInfos *groupedInfos = + (GroupedShardSplitInfos *) hash_search(ShardInfoHashMap, &key, HASH_ENTER, + &found); if (!found) { - nodeMappingEntry->shardSplitInfoList = NIL; + groupedInfos->shardSplitInfoList = NIL; } - nodeMappingEntry->shardSplitInfoList = - lappend(nodeMappingEntry->shardSplitInfoList, (ShardSplitInfo *) shardSplitInfo); + groupedInfos->shardSplitInfoList = + lappend(groupedInfos->shardSplitInfoList, (ShardSplitInfo *) shardSplitInfo); } @@ -298,14 +275,13 @@ PopulateShardSplitInfoInSM(ShardSplitInfoSMHeader *shardSplitInfoSMHeader) HASH_SEQ_STATUS status; hash_seq_init(&status, ShardInfoHashMap); - NodeShardMappingEntry *entry = NULL; + GroupedShardSplitInfos *entry = NULL; int splitInfoIndex = 0; - while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL) + while ((entry = (GroupedShardSplitInfos *) hash_seq_search(&status)) != NULL) { uint32_t nodeId = entry->key.nodeId; uint32_t tableOwnerId = entry->key.tableOwnerId; - char *derivedSlotName = - EncodeReplicationSlot(nodeId, tableOwnerId); + char *derivedSlotName = ReplicationSlotName(SHARD_SPLIT, nodeId, tableOwnerId); List *shardSplitInfoList = entry->shardSplitInfoList; ShardSplitInfo *splitShardInfo = NULL; @@ -321,41 +297,6 @@ PopulateShardSplitInfoInSM(ShardSplitInfoSMHeader *shardSplitInfoSMHeader) } -/* - * NodeShardMappingHash returns hash value by combining hash of node id - * and tableowner Id. - */ -uint32 -NodeShardMappingHash(const void *key, Size keysize) -{ - NodeShardMappingKey *entry = (NodeShardMappingKey *) key; - uint32 hash = hash_uint32(entry->nodeId); - hash = hash_combine(hash, hash_uint32(entry->tableOwnerId)); - return hash; -} - - -/* - * Comparator function for hash keys - */ -int -NodeShardMappingHashCompare(const void *left, const void *right, Size keysize) -{ - NodeShardMappingKey *leftKey = (NodeShardMappingKey *) left; - NodeShardMappingKey *rightKey = (NodeShardMappingKey *) right; - - if (leftKey->nodeId != rightKey->nodeId || - leftKey->tableOwnerId != rightKey->tableOwnerId) - { - return 1; - } - else - { - return 0; - } -} - - /* * ParseShardSplitInfoFromDatum deserializes individual fields of 'pg_catalog.split_shard_info' * datatype. 
@@ -434,8 +375,8 @@ ReturnReplicationSlotInfo(Tuplestorestate *tupleStore, TupleDesc HASH_SEQ_STATUS status; hash_seq_init(&status, ShardInfoHashMap); - NodeShardMappingEntry *entry = NULL; - while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL) + GroupedShardSplitInfos *entry = NULL; + while ((entry = (GroupedShardSplitInfos *) hash_seq_search(&status)) != NULL) { Datum values[3]; bool nulls[3]; @@ -448,8 +389,8 @@ ReturnReplicationSlotInfo(Tuplestorestate *tupleStore, TupleDesc char *tableOwnerName = GetUserNameFromId(entry->key.tableOwnerId, false); values[1] = CStringGetTextDatum(tableOwnerName); - char *slotName = EncodeReplicationSlot(entry->key.nodeId, - entry->key.tableOwnerId); + char *slotName = ReplicationSlotName(SHARD_SPLIT, entry->key.nodeId, + entry->key.tableOwnerId); values[2] = CStringGetTextDatum(slotName); tuplestore_putvalues(tupleStore, tupleDescriptor, values, nulls); diff --git a/src/backend/distributed/replication/multi_logical_replication.c b/src/backend/distributed/replication/multi_logical_replication.c index 25754699b..794481b9a 100644 --- a/src/backend/distributed/replication/multi_logical_replication.c +++ b/src/backend/distributed/replication/multi_logical_replication.c @@ -24,14 +24,16 @@ #include "access/htup_details.h" #include "access/sysattr.h" #include "access/xact.h" -#include "catalog/pg_subscription_rel.h" #include "commands/dbcommands.h" +#include "common/hashfn.h" +#include "catalog/pg_subscription_rel.h" #include "catalog/namespace.h" #include "catalog/pg_constraint.h" #include "distributed/adaptive_executor.h" #include "distributed/citus_safe_lib.h" #include "distributed/colocation_utils.h" #include "distributed/connection_management.h" +#include "distributed/hash_helpers.h" #include "distributed/listutils.h" #include "distributed/coordinator_protocol.h" #include "distributed/metadata_cache.h" @@ -62,6 +64,8 @@ #include "utils/ruleutils.h" #include "utils/syscache.h" +#define STR_ERRCODE_UNDEFINED_OBJECT "42704" + #define REPLICATION_SLOT_CATALOG_TABLE_NAME "pg_replication_slots" #define CURRENT_LOG_POSITION_COMMAND "SELECT pg_current_wal_lsn()" @@ -72,6 +76,31 @@ /* decimal representation of Adler-16 hash value of citus_shard_move_subscription */ #define SHARD_MOVE_ADVISORY_LOCK_SECOND_KEY 55152 +static const char *publicationPrefix[] = { + [SHARD_MOVE] = "citus_shard_move_publication_", + [SHARD_SPLIT] = "citus_shard_split_publication_", +}; + +static const char *replicationSlotPrefix[] = { + [SHARD_MOVE] = "citus_shard_move_slot_", + [SHARD_SPLIT] = "citus_shard_split_slot_", +}; + +/* + * IMPORTANT: All the subscription names should start with "citus_". Otherwise + * our utility hook does not defend against non-superusers altering or dropping + * them, which is important for security purposes. 
+ */ +static const char *subscriptionPrefix[] = { + [SHARD_MOVE] = "citus_shard_move_subscription_", + [SHARD_SPLIT] = "citus_shard_split_subscription_", +}; + +static const char *subscriptionRolePrefix[] = { + [SHARD_MOVE] = "citus_shard_move_subscription_role_", + [SHARD_SPLIT] = "citus_shard_split_subscription_role_", +}; + /* GUC variable, defaults to 2 hours */ int LogicalReplicationTimeout = 2 * 60 * 60 * 1000; @@ -80,15 +109,12 @@ int LogicalReplicationTimeout = 2 * 60 * 60 * 1000; /* see the comment in master_move_shard_placement */ bool PlacementMovedUsingLogicalReplicationInTX = false; - /* report in every 10 seconds */ static int logicalReplicationProgressReportTimeout = 10 * 1000; -static void CreateForeignConstraintsToReferenceTable(List *shardList, - MultiConnection *targetConnection); +static void CreateForeignConstraintsToReferenceTable(List *logicalRepTargetList); static List * PrepareReplicationSubscriptionList(List *shardList); -static Bitmapset * TableOwnerIds(List *shardList); static List * GetReplicaIdentityCommandListForShard(Oid relationId, uint64 shardId); static List * GetIndexCommandListForShardBackingReplicaIdentity(Oid relationId, uint64 shardId); @@ -114,38 +140,34 @@ static void CreatePartitioningHierarchy(List *shardList, char *targetNodeName, int targetNodePort); static void CreateColocatedForeignKeys(List *shardList, char *targetNodeName, int targetNodePort); -static void DropShardMovePublications(MultiConnection *connection, - Bitmapset *tableOwnerIds); -static void DropShardMoveSubscriptions(MultiConnection *connection, - Bitmapset *tableOwnerIds); -static void CreateShardMovePublications(MultiConnection *connection, List *shardList, - Bitmapset *tableOwnerIds); -static MultiConnection * GetReplicationConnection(char *nodeName, int nodePort); -static char * CreateShardMoveSubscriptions(MultiConnection *sourceConnection, - MultiConnection *targetConnection, - MultiConnection *sourceReplicationConnection, - char *databaseName, - Bitmapset *tableOwnerIds); -static void EnableShardMoveSubscriptions(MultiConnection *targetConnection, - Bitmapset *tableOwnerIds); static char * escape_param_str(const char *str); static XLogRecPtr GetRemoteLSN(MultiConnection *connection, char *command); -static bool RelationSubscriptionsAreReady(MultiConnection *targetConnection, - Bitmapset *tableOwnerIds, - char *operationPrefix); +static bool RelationSubscriptionsAreReady( + GroupedLogicalRepTargets *groupedLogicalRepTargets); static void WaitForMiliseconds(long timeout); -static XLogRecPtr GetSubscriptionPosition(MultiConnection *connection, - Bitmapset *tableOwnerIds, - char *operationPrefix); -static char * ShardMovePublicationName(Oid ownerId); +static XLogRecPtr GetSubscriptionPosition( + GroupedLogicalRepTargets *groupedLogicalRepTargets); static void AcquireLogicalReplicationLock(void); -static void DropAllShardMoveLeftovers(void); -static void DropAllShardMoveSubscriptions(MultiConnection *connection); -static void DropAllShardMoveReplicationSlots(MultiConnection *connection); -static void DropAllShardMovePublications(MultiConnection *connection); -static void DropAllShardMoveUsers(MultiConnection *connection); -static char * ShardSubscriptionNamesValueList(Bitmapset *tableOwnerIds, - char *operationPrefix); +static void DropSubscription(MultiConnection *connection, + char *subscriptionName); +static void DropPublication(MultiConnection *connection, char *publicationName); + +static void DropUser(MultiConnection *connection, char *username); +static void 
DropReplicationSlot(MultiConnection *connection, + char *publicationName); +static void DropAllSubscriptions(MultiConnection *connection, LogicalRepType type); +static void DropAllReplicationSlots(MultiConnection *connection, LogicalRepType type); +static void DropAllPublications(MultiConnection *connection, LogicalRepType type); +static void DropAllUsers(MultiConnection *connection, LogicalRepType type); +static HTAB * CreateShardMovePublicationInfoHash(WorkerNode *targetNode, + List *shardIntervals); +static List * CreateShardMoveLogicalRepTargetList(HTAB *publicationInfoHash, + List *shardList); +static void WaitForGroupedLogicalRepTargetsToBecomeReady( + GroupedLogicalRepTargets *groupedLogicalRepTargets); +static void WaitForGroupedLogicalRepTargetsToCatchUp(XLogRecPtr sourcePosition, + GroupedLogicalRepTargets * + groupedLogicalRepTargets); /* * LogicallyReplicateShards replicates a list of shards from one node to another @@ -173,45 +195,57 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo return; } - Bitmapset *tableOwnerIds = TableOwnerIds(replicationSubscriptionList); - - DropAllShardMoveLeftovers(); + DropAllLogicalReplicationLeftovers(SHARD_MOVE); MultiConnection *sourceConnection = GetNodeUserDatabaseConnection(connectionFlags, sourceNodeName, sourceNodePort, superUser, databaseName); - MultiConnection *targetConnection = - GetNodeUserDatabaseConnection(connectionFlags, targetNodeName, targetNodePort, - superUser, databaseName); + + /* + * Operations on publications and replication slots cannot run in a + * transaction block. We claim the connections exclusively to ensure they + * do not get used for metadata syncing, which does open a transaction + * block. + */ + ClaimConnectionExclusively(sourceConnection); + MultiConnection *sourceReplicationConnection = GetReplicationConnection(sourceNodeName, sourceNodePort); - /* - * Operations on publications and subscriptions cannot run in a transaction - * block. Claim the connections exclusively to ensure they do not get used - * for metadata syncing, which does open a transaction block. 
- */ - ClaimConnectionExclusively(sourceConnection); - ClaimConnectionExclusively(targetConnection); + WorkerNode *sourceNode = FindWorkerNode(sourceNodeName, sourceNodePort); + WorkerNode *targetNode = FindWorkerNode(targetNodeName, targetNodePort); + + HTAB *publicationInfoHash = CreateShardMovePublicationInfoHash( + targetNode, replicationSubscriptionList); + + List *logicalRepTargetList = CreateShardMoveLogicalRepTargetList(publicationInfoHash, + shardList); + + HTAB *groupedLogicalRepTargetsHash = CreateGroupedLogicalRepTargetsHash( + logicalRepTargetList); + + CreateGroupedLogicalRepTargetsConnections(groupedLogicalRepTargetsHash, superUser, + databaseName); PG_TRY(); { /* set up the publication on the source and subscription on the target */ - CreateShardMovePublications(sourceConnection, replicationSubscriptionList, - tableOwnerIds); - char *snapshot = CreateShardMoveSubscriptions( + CreatePublications(sourceConnection, publicationInfoHash); + char *snapshot = CreateReplicationSlots( sourceConnection, - targetConnection, sourceReplicationConnection, + logicalRepTargetList, + "pgoutput"); + + CreateSubscriptions( + sourceConnection, databaseName, - tableOwnerIds); + logicalRepTargetList); /* only useful for isolation testing, see the function comment for the details */ ConflictOnlyWithIsolationTesting(); - WorkerNode *sourceNode = FindWorkerNode(sourceNodeName, sourceNodePort); - WorkerNode *targetNode = FindWorkerNode(targetNodeName, targetNodePort); CopyShardsToNode(sourceNode, targetNode, shardList, snapshot); /* @@ -234,10 +268,10 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo * incrementally. So this way we create the primary key index in one go * for all data from the initial COPY. */ - CreateReplicaIdentity(shardList, targetNodeName, targetNodePort); + CreateReplicaIdentities(logicalRepTargetList); /* Start applying the changes from the replication slots to catch up. */ - EnableShardMoveSubscriptions(targetConnection, tableOwnerIds); + EnableSubscriptions(logicalRepTargetList); /* * The following check is a leftover from when used subscriptions with @@ -249,16 +283,13 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo * the subscription is not in the ready state yet, because so far it * never had to. */ - WaitForRelationSubscriptionsBecomeReady(targetConnection, tableOwnerIds, - SHARD_MOVE_SUBSCRIPTION_PREFIX); + WaitForAllSubscriptionsToBecomeReady(groupedLogicalRepTargetsHash); /* - * Wait until the subscription is caught up to changes that has happened - * after the initial COPY on the shards. + * Wait until all the subscriptions are caught up to changes that + * happened after the initial COPY on the shards. 
*/ - XLogRecPtr sourcePosition = GetRemoteLogPosition(sourceConnection); - WaitForShardSubscriptionToCatchUp(targetConnection, sourcePosition, tableOwnerIds, - SHARD_MOVE_SUBSCRIPTION_PREFIX); + WaitForAllSubscriptionsToCatchUp(sourceConnection, groupedLogicalRepTargetsHash); /* * Now lets create the post-load objects, such as the indexes, constraints @@ -267,13 +298,12 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo */ CreatePostLogicalReplicationDataLoadObjects(shardList, targetNodeName, targetNodePort); - sourcePosition = GetRemoteLogPosition(sourceConnection); - WaitForShardSubscriptionToCatchUp(targetConnection, sourcePosition, tableOwnerIds, - SHARD_MOVE_SUBSCRIPTION_PREFIX); + WaitForAllSubscriptionsToCatchUp(sourceConnection, groupedLogicalRepTargetsHash); /* * We're almost done, we'll block the writes to the shards that we're - * replicating and expect the subscription to catch up quickly afterwards. + * replicating and expect all the subscription to catch up quickly + * afterwards. * * Notice that although shards in partitioned relation are excluded from * logical replication, they are still locked against modification, and @@ -281,9 +311,7 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo */ BlockWritesToShardList(shardList); - sourcePosition = GetRemoteLogPosition(sourceConnection); - WaitForShardSubscriptionToCatchUp(targetConnection, sourcePosition, tableOwnerIds, - SHARD_MOVE_SUBSCRIPTION_PREFIX); + WaitForAllSubscriptionsToCatchUp(sourceConnection, groupedLogicalRepTargetsHash); /* * We're creating the foreign constraints to reference tables after the @@ -294,11 +322,12 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo * cascade to the hash distributed tables' shards if we had created * the constraints earlier. */ - CreateForeignConstraintsToReferenceTable(shardList, targetConnection); + CreateForeignConstraintsToReferenceTable(logicalRepTargetList); /* we're done, cleanup the publication and subscription */ - DropShardMoveSubscriptions(targetConnection, tableOwnerIds); - DropShardMovePublications(sourceConnection, tableOwnerIds); + DropSubscriptions(logicalRepTargetList); + DropReplicationSlots(sourceConnection, logicalRepTargetList); + DropPublications(sourceConnection, publicationInfoHash); /* * We use these connections exclusively for subscription management, @@ -306,7 +335,7 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo * these connections instead of the connections that were used to * grab locks in BlockWritesToShardList. 
*/ - CloseConnection(targetConnection); + CloseGroupedLogicalRepTargetsConnections(groupedLogicalRepTargetsHash); CloseConnection(sourceConnection); } PG_CATCH(); @@ -320,15 +349,10 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo */ /* reconnect if the connection failed or is waiting for a command */ - if (PQstatus(targetConnection->pgConn) != CONNECTION_OK || - PQisBusy(targetConnection->pgConn)) - { - targetConnection = GetNodeUserDatabaseConnection(connectionFlags, - targetNodeName, - targetNodePort, - superUser, databaseName); - } - DropShardMoveSubscriptions(targetConnection, tableOwnerIds); + RecreateGroupedLogicalRepTargetsConnections(groupedLogicalRepTargetsHash, + superUser, databaseName); + + DropSubscriptions(logicalRepTargetList); /* reconnect if the connection failed or is waiting for a command */ if (PQstatus(sourceConnection->pgConn) != CONNECTION_OK || @@ -339,7 +363,8 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo sourceNodePort, superUser, databaseName); } - DropShardMovePublications(sourceConnection, tableOwnerIds); + DropReplicationSlots(sourceConnection, logicalRepTargetList); + DropPublications(sourceConnection, publicationInfoHash); /* We don't need to UnclaimConnections since we're already erroring out */ @@ -349,6 +374,132 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo } +/* + * CreateGroupedLogicalRepTargetsHash creates a hashmap that groups the subscriptions + * logicalRepTargetList by node. This is useful for cases where we want to + * iterate the subscriptions by node, so we can batch certain operations, such + * as checking subscription readiness. + */ +HTAB * +CreateGroupedLogicalRepTargetsHash(List *logicalRepTargetList) +{ + HTAB *logicalRepTargetsHash = CreateSimpleHash(uint32, GroupedLogicalRepTargets); + LogicalRepTarget *target = NULL; + foreach_ptr(target, logicalRepTargetList) + { + bool found = false; + GroupedLogicalRepTargets *groupedLogicalRepTargets = + (GroupedLogicalRepTargets *) hash_search( + logicalRepTargetsHash, + &target->replicationSlot->targetNodeId, + HASH_ENTER, + &found); + if (!found) + { + groupedLogicalRepTargets->logicalRepTargetList = NIL; + groupedLogicalRepTargets->superuserConnection = NULL; + } + groupedLogicalRepTargets->logicalRepTargetList = + lappend(groupedLogicalRepTargets->logicalRepTargetList, target); + } + return logicalRepTargetsHash; +} + + +/* + * CreateShardMovePublicationInfoHash creates hashmap of PublicationInfos for a + * shard move. Even though we only support moving a shard to a single target + * node, the resulting hashmap can have multiple PublicationInfos in it. + * The reason for that is that we need a separate publication for each + * distributed table owning user in the shard group. 
+ */ +static HTAB * +CreateShardMovePublicationInfoHash(WorkerNode *targetNode, List *shardIntervals) +{ + HTAB *publicationInfoHash = CreateSimpleHash(NodeAndOwner, PublicationInfo); + ShardInterval *shardInterval = NULL; + foreach_ptr(shardInterval, shardIntervals) + { + NodeAndOwner key; + key.nodeId = targetNode->nodeId; + key.tableOwnerId = TableOwnerOid(shardInterval->relationId); + bool found = false; + PublicationInfo *publicationInfo = + (PublicationInfo *) hash_search(publicationInfoHash, &key, + HASH_ENTER, + &found); + if (!found) + { + publicationInfo->name = PublicationName(SHARD_MOVE, key.nodeId, + key.tableOwnerId); + publicationInfo->shardIntervals = NIL; + } + publicationInfo->shardIntervals = + lappend(publicationInfo->shardIntervals, shardInterval); + } + return publicationInfoHash; +} + + +/* + * CreateShardMoveLogicalRepTargetList creates the list containing all the + * subscriptions that should be connected to the publications in the given + * publicationHash. + */ +static List * +CreateShardMoveLogicalRepTargetList(HTAB *publicationInfoHash, List *shardList) +{ + List *logicalRepTargetList = NIL; + + HASH_SEQ_STATUS status; + hash_seq_init(&status, publicationInfoHash); + Oid nodeId = InvalidOid; + + PublicationInfo *publication = NULL; + while ((publication = (PublicationInfo *) hash_seq_search(&status)) != NULL) + { + Oid ownerId = publication->key.tableOwnerId; + nodeId = publication->key.nodeId; + LogicalRepTarget *target = palloc0(sizeof(LogicalRepTarget)); + target->subscriptionName = SubscriptionName(SHARD_MOVE, ownerId); + target->tableOwnerId = ownerId; + target->publication = publication; + publication->target = target; + target->newShards = NIL; + target->subscriptionOwnerName = SubscriptionRoleName(SHARD_MOVE, ownerId); + target->replicationSlot = palloc0(sizeof(ReplicationSlotInfo)); + target->replicationSlot->name = ReplicationSlotName(SHARD_MOVE, + nodeId, + ownerId); + target->replicationSlot->targetNodeId = nodeId; + target->replicationSlot->tableOwnerId = ownerId; + logicalRepTargetList = lappend(logicalRepTargetList, target); + } + + ShardInterval *shardInterval = NULL; + foreach_ptr(shardInterval, shardList) + { + NodeAndOwner key; + key.nodeId = nodeId; + key.tableOwnerId = TableOwnerOid(shardInterval->relationId); + + bool found = false; + publication = (PublicationInfo *) hash_search( + publicationInfoHash, + &key, + HASH_FIND, + &found); + if (!found) + { + ereport(ERROR, errmsg("Could not find publication matching a split")); + } + publication->target->newShards = lappend( + publication->target->newShards, shardInterval); + } + return logicalRepTargetList; +} + + /* * AcquireLogicalReplicationLock tries to acquire a lock for logical * replication. We need this lock, because at the start of logical replication @@ -369,26 +520,27 @@ AcquireLogicalReplicationLock(void) /* - * DropAllShardMoveLeftovers drops shard move subscriptions, publications, roles - * and replication slots on all nodes. These might have been left there after - * the coordinator crashed during a shard move. It's important to delete them - * for two reasons: + * DropAllLogicalReplicationLeftovers drops all subscriptions, publications, + * roles and replication slots on all nodes that were related to this + * LogicalRepType. These might have been left there after the coordinator + * crashed during a shard move/split. It's important to delete them for two + * reasons: * 1. Starting new shard moves will fail when they exist, because it cannot * create them. * 2. 
Leftover replication slots that are not consumed from anymore make it * impossible for WAL to be dropped. This can cause out-of-disk issues. */ -static void -DropAllShardMoveLeftovers(void) +void +DropAllLogicalReplicationLeftovers(LogicalRepType type) { char *superUser = CitusExtensionOwnerName(); char *databaseName = get_database_name(MyDatabaseId); /* * We open new connections to all nodes. The reason for this is that - * operations on subscriptions and publications cannot be run in a - * transaction. By forcing a new connection we make sure no transaction is - * active on the connection. + * operations on subscriptions, publications and replication slotscannot be + * run in a transaction. By forcing a new connection we make sure no + * transaction is active on the connection. */ int connectionFlags = FORCE_NEW_CONNECTION; @@ -409,8 +561,8 @@ DropAllShardMoveLeftovers(void) superUser, databaseName); cleanupConnectionList = lappend(cleanupConnectionList, cleanupConnection); - DropAllShardMoveSubscriptions(cleanupConnection); - DropAllShardMoveUsers(cleanupConnection); + DropAllSubscriptions(cleanupConnection, type); + DropAllUsers(cleanupConnection, type); } MultiConnection *cleanupConnection = NULL; @@ -420,8 +572,8 @@ DropAllShardMoveLeftovers(void) * If replication slot could not be dropped while dropping the * subscriber, drop it here. */ - DropAllShardMoveReplicationSlots(cleanupConnection); - DropAllShardMovePublications(cleanupConnection); + DropAllReplicationSlots(cleanupConnection, type); + DropAllPublications(cleanupConnection, type); /* * We close all connections that we opened for the dropping here. That @@ -462,34 +614,33 @@ PrepareReplicationSubscriptionList(List *shardList) /* - * TableOwnerIds returns a bitmapset containing all the owners of the tables - * that the given shards belong to. + * CreateReplicaIdentities creates replica identities for all the shards that + * are part of the given subscriptions. */ -static Bitmapset * -TableOwnerIds(List *shardList) +void +CreateReplicaIdentities(List *logicalRepTargetList) { - ShardInterval *shardInterval = NULL; - Bitmapset *tableOwnerIds = NULL; - - foreach_ptr(shardInterval, shardList) + LogicalRepTarget *target = NULL; + foreach_ptr(target, logicalRepTargetList) { - tableOwnerIds = bms_add_member(tableOwnerIds, TableOwnerOid( - shardInterval->relationId)); + MultiConnection *superuserConnection = target->superuserConnection; + CreateReplicaIdentitiesOnNode( + target->newShards, + superuserConnection->hostname, + superuserConnection->port); } - - return tableOwnerIds; } /* - * CreateReplicaIdentity gets a shardList and creates all the replica identities - * on the shards in the given node. + * CreateReplicaIdentitiesOnNode gets a shardList and creates all the replica + * identities on the shards in the given node. */ void -CreateReplicaIdentity(List *shardList, char *nodeName, int32 nodePort) +CreateReplicaIdentitiesOnNode(List *shardList, char *nodeName, int32 nodePort) { MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, - "CreateReplicaIdentity", + "CreateReplicaIdentitiesOnNode", ALLOCSET_DEFAULT_SIZES); MemoryContext oldContext = MemoryContextSwitchTo(localContext); @@ -1030,14 +1181,8 @@ CreateColocatedForeignKeys(List *shardList, char *targetNodeName, int targetNode * from distributed to reference tables in the newly created shard replicas. 
*/ static void -CreateForeignConstraintsToReferenceTable(List *shardList, - MultiConnection *targetConnection) +CreateForeignConstraintsToReferenceTable(List *logicalRepTargetList) { - ereport(DEBUG1, (errmsg("Creating post logical replication objects " - "(foreign keys to reference tables) on node " - "%s:%d", targetConnection->hostname, - targetConnection->port))); - MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, "CreateForeignConstraintsToReferenceTable", @@ -1045,21 +1190,31 @@ CreateForeignConstraintsToReferenceTable(List *shardList, MemoryContext oldContext = MemoryContextSwitchTo(localContext); - ListCell *shardCell = NULL; - foreach(shardCell, shardList) + /* + * Iterate over all the shards in the shard group. + */ + LogicalRepTarget *target = NULL; + foreach_ptr(target, logicalRepTargetList) { - ListCell *commandCell = NULL; - ShardInterval *shardInterval = (ShardInterval *) lfirst(shardCell); - List *commandList = GetForeignConstraintCommandsToReferenceTable(shardInterval); + ShardInterval *shardInterval = NULL; - /* iterate over the commands and execute them in the same connection */ - foreach(commandCell, commandList) + /* + * Iterate on split shards list for a given shard and create constraints. + */ + foreach_ptr(shardInterval, target->newShards) { - char *commandString = lfirst(commandCell); + List *commandList = GetForeignConstraintCommandsToReferenceTable( + shardInterval); - ExecuteCriticalRemoteCommand(targetConnection, commandString); + char *command = NULL; + + /* iterate over the commands and execute them in the same connection */ + foreach_ptr(command, commandList) + { + ExecuteCriticalRemoteCommand(target->superuserConnection, command); + } + MemoryContextReset(localContext); } - MemoryContextReset(localContext); } MemoryContextSwitchTo(oldContext); @@ -1096,34 +1251,44 @@ ConflictOnlyWithIsolationTesting() /* - * DropShardMovePublication drops the publication used for shard moves over the given - * connection, if it exists. It also drops the replication slot if that slot was not - * dropped while dropping the subscription. + * DropReplicationSlots drops the replication slots used for shard moves/splits + * over the given connection (if they exist). */ -static void -DropShardMovePublications(MultiConnection *connection, Bitmapset *tableOwnerIds) +void +DropReplicationSlots(MultiConnection *sourceConnection, List *logicalRepTargetList) { - int ownerId = -1; - - while ((ownerId = bms_next_member(tableOwnerIds, ownerId)) >= 0) + LogicalRepTarget *target = NULL; + foreach_ptr(target, logicalRepTargetList) { - /* - * If replication slot can not be dropped while dropping the subscriber, drop - * it here. - */ - DropShardReplicationSlot(connection, ShardSubscriptionName(ownerId, - SHARD_MOVE_SUBSCRIPTION_PREFIX)); - DropShardPublication(connection, ShardMovePublicationName(ownerId)); + DropReplicationSlot(sourceConnection, target->replicationSlot->name); } } /* - * DropShardReplicationSlot drops the replication slot with the given name - * if it exists. + * DropPublications drops the publications used for shard moves/splits over the + * given connection (if they exist). 
*/ void -DropShardReplicationSlot(MultiConnection *connection, char *replicationSlotName) +DropPublications(MultiConnection *sourceConnection, HTAB *publicationInfoHash) +{ + HASH_SEQ_STATUS status; + hash_seq_init(&status, publicationInfoHash); + + PublicationInfo *entry = NULL; + while ((entry = (PublicationInfo *) hash_seq_search(&status)) != NULL) + { + DropPublication(sourceConnection, entry->name); + } +} + + +/* + * DropReplicationSlot drops the replication slot with the given name + * if it exists. + */ +static void +DropReplicationSlot(MultiConnection *connection, char *replicationSlotName) { ExecuteCriticalRemoteCommand( connection, @@ -1136,11 +1301,11 @@ DropShardReplicationSlot(MultiConnection *connection, char *replicationSlotName) /* - * DropShardPublication drops the publication with the given name if it + * DropPublication drops the publication with the given name if it * exists. */ -void -DropShardPublication(MultiConnection *connection, char *publicationName) +static void +DropPublication(MultiConnection *connection, char *publicationName) { ExecuteCriticalRemoteCommand(connection, psprintf( "DROP PUBLICATION IF EXISTS %s", @@ -1149,35 +1314,56 @@ DropShardPublication(MultiConnection *connection, char *publicationName) /* - * ShardMovePublicationName returns the name of the publication for the given + * PublicationName returns the name of the publication for the given node and * table owner. */ -static char * -ShardMovePublicationName(Oid ownerId) +char * +PublicationName(LogicalRepType type, uint32_t nodeId, Oid ownerId) { - return psprintf("%s%i", SHARD_MOVE_PUBLICATION_PREFIX, ownerId); + return psprintf("%s%u_%u", publicationPrefix[type], nodeId, ownerId); } /* - * ShardSubscriptionName returns the name of the subscription for the given - * owner. + * ReplicationSlotName returns the name of the replication slot for the given + * node and table owner. */ char * -ShardSubscriptionName(Oid ownerId, char *operationPrefix) +ReplicationSlotName(LogicalRepType type, uint32_t nodeId, Oid ownerId) { - return psprintf("%s%i", operationPrefix, ownerId); + StringInfo slotName = makeStringInfo(); + appendStringInfo(slotName, "%s%u_%u", replicationSlotPrefix[type], nodeId, + ownerId); + + if (slotName->len > NAMEDATALEN) + { + ereport(ERROR, + (errmsg( + "Replication Slot name:%s having length:%d is greater than maximum allowed length:%d", + slotName->data, slotName->len, NAMEDATALEN))); + } + return slotName->data; } /* - * ShardSubscriptionRole returns the name of the role used by the + * SubscriptionName returns the name of the subscription for the given owner. + */ +char * +SubscriptionName(LogicalRepType type, Oid ownerId) +{ + return psprintf("%s%i", subscriptionPrefix[type], ownerId); +} + + +/* + * SubscriptionRoleName returns the name of the role used by the * subscription that subscribes to the tables of the given owner. */ char * -ShardSubscriptionRole(Oid ownerId, char *operationPrefix) +SubscriptionRoleName(LogicalRepType type, Oid ownerId) { - return psprintf("%s%i", operationPrefix, ownerId); + return psprintf("%s%i", subscriptionRolePrefix[type], ownerId); } @@ -1232,177 +1418,172 @@ GetQueryResultStringList(MultiConnection *connection, char *query) /* - * DropAllShardMoveSubscriptions drops all the existing subscriptions that + * DropAllSubscriptions drops all the existing subscriptions that * match our shard move naming scheme on the node that the connection points * to. 
*/ static void -DropAllShardMoveSubscriptions(MultiConnection *connection) +DropAllSubscriptions(MultiConnection *connection, LogicalRepType type) { char *query = psprintf( "SELECT subname FROM pg_subscription " "WHERE subname LIKE %s || '%%'", - quote_literal_cstr(SHARD_MOVE_SUBSCRIPTION_PREFIX)); + quote_literal_cstr(subscriptionPrefix[type])); List *subscriptionNameList = GetQueryResultStringList(connection, query); char *subscriptionName; foreach_ptr(subscriptionName, subscriptionNameList) { - DropShardSubscription(connection, subscriptionName); + DropSubscription(connection, subscriptionName); } } /* - * DropAllShardMoveUsers drops all the users that match our shard move naming + * DropAllUsers drops all the users that match our shard move naming * scheme for temporary shard move users on the node that the connection points * to. */ static void -DropAllShardMoveUsers(MultiConnection *connection) +DropAllUsers(MultiConnection *connection, LogicalRepType type) { char *query = psprintf( "SELECT rolname FROM pg_roles " "WHERE rolname LIKE %s || '%%'", - quote_literal_cstr(SHARD_MOVE_SUBSCRIPTION_ROLE_PREFIX)); + quote_literal_cstr(subscriptionRolePrefix[type])); List *usernameList = GetQueryResultStringList(connection, query); char *username; foreach_ptr(username, usernameList) { - DropShardUser(connection, username); + DropUser(connection, username); } } /* - * DropAllShardMoveReplicationSlots drops all the existing replication slots + * DropAllReplicationSlots drops all the existing replication slots * that match our shard move naming scheme on the node that the connection * points to. */ static void -DropAllShardMoveReplicationSlots(MultiConnection *connection) +DropAllReplicationSlots(MultiConnection *connection, LogicalRepType type) { char *query = psprintf( "SELECT slot_name FROM pg_replication_slots " "WHERE slot_name LIKE %s || '%%'", - quote_literal_cstr(SHARD_MOVE_SUBSCRIPTION_PREFIX)); + quote_literal_cstr(replicationSlotPrefix[type])); List *slotNameList = GetQueryResultStringList(connection, query); char *slotName; foreach_ptr(slotName, slotNameList) { - DropShardReplicationSlot(connection, slotName); + DropReplicationSlot(connection, slotName); } } /* - * DropAllShardMovePublications drops all the existing publications that + * DropAllPublications drops all the existing publications that * match our shard move naming scheme on the node that the connection points * to. */ static void -DropAllShardMovePublications(MultiConnection *connection) +DropAllPublications(MultiConnection *connection, LogicalRepType type) { char *query = psprintf( "SELECT pubname FROM pg_publication " "WHERE pubname LIKE %s || '%%'", - quote_literal_cstr(SHARD_MOVE_PUBLICATION_PREFIX)); + quote_literal_cstr(publicationPrefix[type])); List *publicationNameList = GetQueryResultStringList(connection, query); char *publicationName; foreach_ptr(publicationName, publicationNameList) { - DropShardPublication(connection, publicationName); + DropPublication(connection, publicationName); } } /* - * DropShardMoveSubscriptions drops subscriptions from the subscriber node that - * are used to move shards for the given table owners. Note that, it drops the - * replication slots on the publisher node if it can drop the slots as well - * with the DROP SUBSCRIPTION command. Otherwise, only the subscriptions will - * be deleted with DROP SUBSCRIPTION via the connection. In the latter case, - * replication slots will be dropped while cleaning the publisher node when - * calling DropShardMovePublications. 
- */ -static void -DropShardMoveSubscriptions(MultiConnection *connection, Bitmapset *tableOwnerIds) -{ - int ownerId = -1; - while ((ownerId = bms_next_member(tableOwnerIds, ownerId)) >= 0) - { - DropShardSubscription(connection, ShardSubscriptionName(ownerId, - SHARD_MOVE_SUBSCRIPTION_PREFIX)); - DropShardUser(connection, ShardSubscriptionRole(ownerId, - SHARD_MOVE_SUBSCRIPTION_ROLE_PREFIX)); - } -} - - -/* - * DropShardSubscription drops subscription with the given name on the - * subscriber node. Note that, it also drops the replication slot on the - * publisher node if it can drop the slot as well with the DROP SUBSCRIPTION - * command. Otherwise, only the subscription will be deleted with DROP - * SUBSCRIPTION via the connection. + * DropSubscriptions drops all the subscriptions in the logicalRepTargetList + * from the subscriber node. It also drops the temporary users that are used as + * owners of the subscriptions. This doesn't drop the replication slots on + * the publisher; these should be dropped using DropReplicationSlots. */ void -DropShardSubscription(MultiConnection *connection, char *subscriptionName) +DropSubscriptions(List *logicalRepTargetList) { - PGresult *result = NULL; + LogicalRepTarget *target = NULL; + foreach_ptr(target, logicalRepTargetList) + { + DropSubscription(target->superuserConnection, + target->subscriptionName); + DropUser(target->superuserConnection, target->subscriptionOwnerName); + } +} - /* - * Instead of ExecuteCriticalRemoteCommand, we use the - * ExecuteOptionalRemoteCommand to fall back into the logic inside the - * if block below in case of any error while sending the command. - */ - int dropCommandResult = ExecuteOptionalRemoteCommand( + +/* + * DropSubscription drops the subscription with the given name on the subscriber + * node if it exists. Note that this doesn't drop the replication slot on the + * publisher node. The reason is that sometimes this is not possible. Two known + * cases where this is not possible are: + * 1. Due to the node with the replication slot being down. + * 2. Due to a deadlock when the replication is on the same node as the + * subscription, which is the case for shard splits to the local node. + * + * So instead of directly dropping the subscription, including the attached + * replication slot, the subscription is first disconnected from the + * replication slot before dropping it. The replication slot itself should be + * dropped using DropReplicationSlot on the source connection. + */ +static void +DropSubscription(MultiConnection *connection, char *subscriptionName) +{ + int querySent = SendRemoteCommand( connection, - psprintf( - "DROP SUBSCRIPTION IF EXISTS %s", - quote_identifier(subscriptionName)), - &result); - - if (PQstatus(connection->pgConn) != CONNECTION_OK) + psprintf("ALTER SUBSCRIPTION %s DISABLE", quote_identifier(subscriptionName))); + if (querySent == 0) { ReportConnectionError(connection, ERROR); } + bool raiseInterrupts = true; + PGresult *result = GetRemoteCommandResult(connection, raiseInterrupts); + if (!IsResponseOK(result)) + { + char *errorcode = PQresultErrorField(result, PG_DIAG_SQLSTATE); + if (errorcode != NULL && strcmp(errorcode, STR_ERRCODE_UNDEFINED_OBJECT) == 0) + { + /* + * The subscription doesn't exist, so we can return right away. + * This DropSubscription call is effectively a no-op.
+ */ + return; + } + else + { + ReportResultError(connection, result, ERROR); + PQclear(result); + ForgetResults(connection); + } + } + PQclear(result); ForgetResults(connection); - /* - * If we can not drop the replication slot using the DROP SUBSCRIPTION command - * then we need to alter the subscription to drop the subscriber only and drop - * the replication slot separately. - */ - if (dropCommandResult != 0) - { - StringInfo alterSubscriptionSlotCommand = makeStringInfo(); - StringInfo alterSubscriptionDisableCommand = makeStringInfo(); + ExecuteCriticalRemoteCommand(connection, psprintf( + "ALTER SUBSCRIPTION %s SET (slot_name = NONE)", + quote_identifier(subscriptionName))); - appendStringInfo(alterSubscriptionDisableCommand, - "ALTER SUBSCRIPTION %s DISABLE", - quote_identifier(subscriptionName)); - ExecuteCriticalRemoteCommand(connection, - alterSubscriptionDisableCommand->data); - - appendStringInfo(alterSubscriptionSlotCommand, - "ALTER SUBSCRIPTION %s SET (slot_name = NONE)", - quote_identifier(subscriptionName)); - ExecuteCriticalRemoteCommand(connection, alterSubscriptionSlotCommand->data); - - ExecuteCriticalRemoteCommand(connection, psprintf( - "DROP SUBSCRIPTION %s", - quote_identifier(subscriptionName))); - } + ExecuteCriticalRemoteCommand(connection, psprintf( + "DROP SUBSCRIPTION %s", + quote_identifier(subscriptionName))); } /* - * DropShardUser drops the user with the given name if it exists. + * DropUser drops the user with the given name if it exists. */ -void -DropShardUser(MultiConnection *connection, char *username) +static void +DropUser(MultiConnection *connection, char *username) { /* * The DROP USER command should not propagate, so we temporarily disable @@ -1418,33 +1599,27 @@ DropShardUser(MultiConnection *connection, char *username) /* - * CreateShardMovePublications creates a set of publications for moving a list - * of shards over the given connection. One publication is created for each of - * the table owners in tableOwnerIds. Each of those publications only contains - * shards that the respective table owner owns. + * CreatePublications creates the publications defined in the + * publicationInfoHash over the given connection. */ -static void -CreateShardMovePublications(MultiConnection *connection, List *shardList, - Bitmapset *tableOwnerIds) +void +CreatePublications(MultiConnection *connection, + HTAB *publicationInfoHash) { - int ownerId = -1; - - while ((ownerId = bms_next_member(tableOwnerIds, ownerId)) >= 0) + HASH_SEQ_STATUS status; + hash_seq_init(&status, publicationInfoHash); + PublicationInfo *entry = NULL; + while ((entry = (PublicationInfo *) hash_seq_search(&status)) != NULL) { StringInfo createPublicationCommand = makeStringInfo(); bool prefixWithComma = false; appendStringInfo(createPublicationCommand, "CREATE PUBLICATION %s FOR TABLE ", - ShardMovePublicationName(ownerId)); + entry->name); ShardInterval *shard = NULL; - foreach_ptr(shard, shardList) + foreach_ptr(shard, entry->shardIntervals) { - if (TableOwnerOid(shard->relationId) != ownerId) - { - continue; - } - char *shardName = ConstructQualifiedShardName(shard); if (prefixWithComma) @@ -1468,7 +1643,7 @@ CreateShardMovePublications(MultiConnection *connection, List *shardList, * This connection can be used to send replication commands, such as * CREATE_REPLICATION_SLOT.
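The statement sequence that DropSubscription sends to a subscriber node therefore looks as follows (a sketch; the subscription name is an assumed example):

    ALTER SUBSCRIPTION citus_shard_split_subscription_10 DISABLE;
    ALTER SUBSCRIPTION citus_shard_split_subscription_10 SET (slot_name = NONE);
    DROP SUBSCRIPTION citus_shard_split_subscription_10;

Detaching the slot before the drop keeps DROP SUBSCRIPTION from contacting the publisher, which avoids the failure cases listed in the comment above; the slot itself is removed separately via DropReplicationSlot on the source connection.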
*/ -static MultiConnection * +MultiConnection * GetReplicationConnection(char *nodeName, int nodePort) { int connectionFlags = FORCE_NEW_CONNECTION; @@ -1480,6 +1655,11 @@ GetReplicationConnection(char *nodeName, int nodePort) nodePort, CitusExtensionOwnerName(), get_database_name(MyDatabaseId)); + + /* + * Replication connections are special and don't support all of SQL, so we + * don't want them to be used for anything other than what we create them for. + */ ClaimConnectionExclusively(connection); return connection; } @@ -1494,12 +1674,12 @@ GetReplicationConnection(char *nodeName, int nodePort) * the snapshot name. */ static char * -CreateReplicationSlot(MultiConnection *connection, char *slotname) +CreateReplicationSlot(MultiConnection *connection, char *slotname, char *outputPlugin) { StringInfo createReplicationSlotCommand = makeStringInfo(); appendStringInfo(createReplicationSlotCommand, - "CREATE_REPLICATION_SLOT %s LOGICAL pgoutput EXPORT_SNAPSHOT;", - quote_identifier(slotname)); + "CREATE_REPLICATION_SLOT %s LOGICAL %s EXPORT_SNAPSHOT;", + quote_identifier(slotname), quote_identifier(outputPlugin)); PGresult *result = NULL; int response = ExecuteOptionalRemoteCommand(connection, @@ -1521,36 +1701,72 @@ CreateReplicationSlot(MultiConnection *connection, char *slotname) /* - * CreateShardMoveSubscriptions creates the subscriptions used for shard moves - * over the given connection. One subscription is created for each of the table - * owners in tableOwnerIds. The remote node needs to have appropriate - * pg_dist_authinfo rows for the user such that the apply process can connect. - * Because the generated CREATE SUBSCRIPTION statements uses the host and port - * names directly (rather than looking up any relevant pg_dist_poolinfo rows), - * all such connections remain direct and will not route through any configured - * poolers. - * - * The subscriptions created by this function are created in the disabled - * state. This is done so a data copy can be done manually afterwards. To - * enable the subscriptions you can use EnableShardMoveSubscriptions(). + * CreateReplicationSlots creates the replication slots that the subscriptions + * in the logicalRepTargetList can use. * * This function returns the snapshot name of the replication slots that are * used by the subscription. When using this snapshot name for other * transactions you need to keep the given replication connection open until - * you have used the snapshot name. + * you are finished using the snapshot.
*/ -static char * -CreateShardMoveSubscriptions(MultiConnection *sourceConnection, - MultiConnection *targetConnection, - MultiConnection *sourceReplicationConnection, - char *databaseName, - Bitmapset *tableOwnerIds) +char * +CreateReplicationSlots(MultiConnection *sourceConnection, + MultiConnection *sourceReplicationConnection, + List *logicalRepTargetList, + char *outputPlugin) { - int ownerId = -1; - char *firstReplicationSlot = NULL; + ReplicationSlotInfo *firstReplicationSlot = NULL; char *snapshot = NULL; - while ((ownerId = bms_next_member(tableOwnerIds, ownerId)) >= 0) + LogicalRepTarget *target = NULL; + foreach_ptr(target, logicalRepTargetList) { + ReplicationSlotInfo *replicationSlot = target->replicationSlot; + + if (!firstReplicationSlot) + { + firstReplicationSlot = replicationSlot; + snapshot = CreateReplicationSlot( + sourceReplicationConnection, + replicationSlot->name, + outputPlugin + ); + } + else + { + ExecuteCriticalRemoteCommand( + sourceConnection, + psprintf("SELECT pg_catalog.pg_copy_logical_replication_slot(%s, %s)", + quote_literal_cstr(firstReplicationSlot->name), + quote_literal_cstr(replicationSlot->name))); + } + } + return snapshot; +} + + +/* + * CreateSubscriptions creates the subscriptions according to their definition + * in the logicalRepTargetList. The remote node(s) needs to have appropriate + * pg_dist_authinfo rows for the superuser such that the apply process can + * connect. Because the generated CREATE SUBSCRIPTION statements use the host + * and port names directly (rather than looking up any relevant + * pg_dist_poolinfo rows), all such connections remain direct and will not + * route through any configured poolers. + * + * The subscriptions created by this function are created in the disabled + * state. This is done so a data copy can be done manually afterwards. To + * enable the subscriptions you can use EnableSubscriptions(). + */ +void +CreateSubscriptions(MultiConnection *sourceConnection, + char *databaseName, + List *logicalRepTargetList) +{ + LogicalRepTarget *target = NULL; + foreach_ptr(target, logicalRepTargetList) + { + int ownerId = target->tableOwnerId; + /* * The CREATE USER command should not propagate, so we temporarily * disable DDL propagation. @@ -1560,35 +1776,17 @@ CreateShardMoveSubscriptions(MultiConnection *sourceConnection, * This prevents permission escalations. 
*/ SendCommandListToWorkerOutsideTransaction( - targetConnection->hostname, - targetConnection->port, - targetConnection->user, + target->superuserConnection->hostname, + target->superuserConnection->port, + target->superuserConnection->user, list_make2( "SET LOCAL citus.enable_ddl_propagation TO OFF;", psprintf( "CREATE USER %s SUPERUSER IN ROLE %s", - ShardSubscriptionRole(ownerId, SHARD_MOVE_SUBSCRIPTION_ROLE_PREFIX), + target->subscriptionOwnerName, GetUserNameFromId(ownerId, false) ))); - if (!firstReplicationSlot) - { - firstReplicationSlot = ShardSubscriptionName(ownerId, - SHARD_MOVE_SUBSCRIPTION_PREFIX); - snapshot = CreateReplicationSlot( - sourceReplicationConnection, - firstReplicationSlot); - } - else - { - ExecuteCriticalRemoteCommand( - sourceConnection, - psprintf("SELECT pg_catalog.pg_copy_logical_replication_slot(%s, %s)", - quote_literal_cstr(firstReplicationSlot), - quote_literal_cstr(ShardSubscriptionName(ownerId, - SHARD_MOVE_SUBSCRIPTION_PREFIX)))); - } - StringInfo conninfo = makeStringInfo(); appendStringInfo(conninfo, "host='%s' port=%d user='%s' dbname='%s' " "connect_timeout=20", @@ -1600,21 +1798,21 @@ CreateShardMoveSubscriptions(MultiConnection *sourceConnection, StringInfo createSubscriptionCommand = makeStringInfo(); appendStringInfo(createSubscriptionCommand, "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s " - "WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false)", - quote_identifier(ShardSubscriptionName(ownerId, - SHARD_MOVE_SUBSCRIPTION_PREFIX)), + "WITH (citus_use_authinfo=true, create_slot=false, " + " copy_data=false, enabled=false, slot_name=%s)", + quote_identifier(target->subscriptionName), quote_literal_cstr(conninfo->data), - quote_identifier(ShardMovePublicationName(ownerId))); + quote_identifier(target->publication->name), + quote_identifier(target->replicationSlot->name)); - ExecuteCriticalRemoteCommand(targetConnection, createSubscriptionCommand->data); + ExecuteCriticalRemoteCommand(target->superuserConnection, + createSubscriptionCommand->data); pfree(createSubscriptionCommand->data); pfree(createSubscriptionCommand); - ExecuteCriticalRemoteCommand(targetConnection, psprintf( + ExecuteCriticalRemoteCommand(target->superuserConnection, psprintf( "ALTER SUBSCRIPTION %s OWNER TO %s", - ShardSubscriptionName(ownerId, - SHARD_MOVE_SUBSCRIPTION_PREFIX), - ShardSubscriptionRole(ownerId, - SHARD_MOVE_SUBSCRIPTION_ROLE_PREFIX) + target->subscriptionName, + target->subscriptionOwnerName )); /* @@ -1622,34 +1820,33 @@ CreateShardMoveSubscriptions(MultiConnection *sourceConnection, * disable DDL propagation. */ SendCommandListToWorkerOutsideTransaction( - targetConnection->hostname, - targetConnection->port, - targetConnection->user, + target->superuserConnection->hostname, + target->superuserConnection->port, + target->superuserConnection->user, list_make2( "SET LOCAL citus.enable_ddl_propagation TO OFF;", psprintf( "ALTER ROLE %s NOSUPERUSER", - ShardSubscriptionRole(ownerId, SHARD_MOVE_SUBSCRIPTION_ROLE_PREFIX) + target->subscriptionOwnerName ))); } - return snapshot; } /* - * EnableShardMoveSubscriptions enables all the the shard move subscriptions - * that belong to the given table owners. + * EnableSubscriptions enables all the the subscriptions in the + * logicalRepTargetList. This means the replication slot will start to be read + * and the catchup phase begins. 
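Taken together, CreateReplicationSlots and CreateSubscriptions boil down to roughly this command sequence per logical replication target (a sketch; all object names, the table_owner role, the connection string, and the use of the citus output plugin for splits are assumed examples, and only the first slot is created over the replication connection while the rest are copied from it):

    -- on the source node
    CREATE_REPLICATION_SLOT citus_split_3_10 LOGICAL citus EXPORT_SNAPSHOT;
    SELECT pg_catalog.pg_copy_logical_replication_slot('citus_split_3_10', 'citus_split_4_10');

    -- on each target node, over the superuser connection
    CREATE USER citus_shard_split_subscription_role_10 SUPERUSER IN ROLE table_owner;
    CREATE SUBSCRIPTION citus_shard_split_subscription_10
        CONNECTION 'host=10.0.0.1 port=5432 user=postgres dbname=postgres connect_timeout=20'
        PUBLICATION citus_shard_split_publication_3_10
        WITH (citus_use_authinfo=true, create_slot=false,
              copy_data=false, enabled=false, slot_name=citus_split_3_10);
    ALTER SUBSCRIPTION citus_shard_split_subscription_10 OWNER TO citus_shard_split_subscription_role_10;
    ALTER ROLE citus_shard_split_subscription_role_10 NOSUPERUSER;

Once the data copy is done, EnableSubscriptions runs ALTER SUBSCRIPTION ... ENABLE on each target so the catchup phase can start.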
*/ -static void -EnableShardMoveSubscriptions(MultiConnection *targetConnection, Bitmapset *tableOwnerIds) +void +EnableSubscriptions(List *logicalRepTargetList) { - int ownerId = -1; - while ((ownerId = bms_next_member(tableOwnerIds, ownerId)) >= 0) + LogicalRepTarget *target = NULL; + foreach_ptr(target, logicalRepTargetList) { - ExecuteCriticalRemoteCommand(targetConnection, psprintf( + ExecuteCriticalRemoteCommand(target->superuserConnection, psprintf( "ALTER SUBSCRIPTION %s ENABLE", - ShardSubscriptionName(ownerId, - SHARD_MOVE_SUBSCRIPTION_PREFIX) + target->subscriptionName )); } } @@ -1746,6 +1943,137 @@ GetRemoteLSN(MultiConnection *connection, char *command) } +/* + * CreateGroupedLogicalRepTargetsConnections creates connections for all of the nodes + * in the groupedLogicalRepTargetsHash. + */ +void +CreateGroupedLogicalRepTargetsConnections(HTAB *groupedLogicalRepTargetsHash, + char *user, + char *databaseName) +{ + int connectionFlags = FORCE_NEW_CONNECTION; + HASH_SEQ_STATUS status; + GroupedLogicalRepTargets *groupedLogicalRepTargets = NULL; + foreach_htab(groupedLogicalRepTargets, &status, groupedLogicalRepTargetsHash) + { + WorkerNode *targetWorkerNode = FindNodeWithNodeId( + groupedLogicalRepTargets->nodeId, + false); + MultiConnection *superuserConnection = + GetNodeUserDatabaseConnection(connectionFlags, targetWorkerNode->workerName, + targetWorkerNode->workerPort, + user, + databaseName); + + /* + * Operations on subscriptions cannot run in a transaction block. We + * claim the connections exclusively to ensure they do not get used for + * metadata syncing, which does open a transaction block. + */ + ClaimConnectionExclusively(superuserConnection); + + groupedLogicalRepTargets->superuserConnection = superuserConnection; + + LogicalRepTarget *target = NULL; + foreach_ptr(target, groupedLogicalRepTargets->logicalRepTargetList) + { + target->superuserConnection = superuserConnection; + } + } +} + + +/* + * RecreateGroupedLogicalRepTargetsConnections recreates connections for all of the + * nodes in the groupedLogicalRepTargetsHash where the old connection is broken or + * currently running a query. + */ +void +RecreateGroupedLogicalRepTargetsConnections(HTAB *groupedLogicalRepTargetsHash, + char *user, + char *databaseName) +{ + int connectionFlags = FORCE_NEW_CONNECTION; + HASH_SEQ_STATUS status; + GroupedLogicalRepTargets *groupedLogicalRepTargets = NULL; + foreach_htab(groupedLogicalRepTargets, &status, groupedLogicalRepTargetsHash) + { + if (groupedLogicalRepTargets->superuserConnection && + PQstatus(groupedLogicalRepTargets->superuserConnection->pgConn) == + CONNECTION_OK && + !PQisBusy(groupedLogicalRepTargets->superuserConnection->pgConn) + ) + { + continue; + } + WorkerNode *targetWorkerNode = FindNodeWithNodeId( + groupedLogicalRepTargets->nodeId, + false); + MultiConnection *superuserConnection = + GetNodeUserDatabaseConnection(connectionFlags, + targetWorkerNode->workerName, + targetWorkerNode->workerPort, + user, + databaseName); + + /* + * Operations on subscriptions cannot run in a transaction block. We + * claim the connections exclusively to ensure they do not get used for + * metadata syncing, which does open a transaction block. 
+ */ + ClaimConnectionExclusively(superuserConnection); + + groupedLogicalRepTargets->superuserConnection = superuserConnection; + + LogicalRepTarget *target = NULL; + foreach_ptr(target, groupedLogicalRepTargets->logicalRepTargetList) + { + target->superuserConnection = superuserConnection; + } + } +} + + +/* + * CloseGroupedLogicalRepTargetsConnections closes the connections for all of the + * nodes in the groupedLogicalRepTargetsHash. + */ +void +CloseGroupedLogicalRepTargetsConnections(HTAB *groupedLogicalRepTargetsHash) +{ + HASH_SEQ_STATUS status; + GroupedLogicalRepTargets *groupedLogicalRepTargets = NULL; + foreach_htab(groupedLogicalRepTargets, &status, groupedLogicalRepTargetsHash) + { + CloseConnection(groupedLogicalRepTargets->superuserConnection); + } +} + + +/* + * WaitForAllSubscriptionsToBecomeReady waits until the states of the + * subscriptions in the groupedLogicalRepTargetsHash become ready. This should happen + * very quickly, because we don't use the COPY logic from the subscriptions. So + * all that's needed is to start reading from the replication slot. + * + * The function errors if the subscriptions on one of the nodes don't become + * ready within LogicalReplicationErrorTimeout. The function also reports its + * progress every logicalReplicationProgressReportTimeout. + */ +void +WaitForAllSubscriptionsToBecomeReady(HTAB *groupedLogicalRepTargetsHash) +{ + HASH_SEQ_STATUS status; + GroupedLogicalRepTargets *groupedLogicalRepTargets = NULL; + foreach_htab(groupedLogicalRepTargets, &status, groupedLogicalRepTargetsHash) + { + WaitForGroupedLogicalRepTargetsToBecomeReady(groupedLogicalRepTargets); + } + elog(LOG, "The states of all subscriptions have become READY"); +} + + /* * WaitForRelationSubscriptionsBecomeReady waits until the states of the * subsriptions for each shard becomes ready. This should happen very quickly, @@ -1756,13 +2084,14 @@ GetRemoteLSN(MultiConnection *connection, char *command) * LogicalReplicationErrorTimeout. The function also reports its progress in * every logicalReplicationProgressReportTimeout. */ -void -WaitForRelationSubscriptionsBecomeReady(MultiConnection *targetConnection, - Bitmapset *tableOwnerIds, char *operationPrefix) +static void +WaitForGroupedLogicalRepTargetsToBecomeReady( + GroupedLogicalRepTargets *groupedLogicalRepTargets) { TimestampTz previousSizeChangeTime = GetCurrentTimestamp(); TimestampTz previousReportTime = GetCurrentTimestamp(); + MultiConnection *superuserConnection = groupedLogicalRepTargets->superuserConnection; /* * We might be in the loop for a while.
Since we don't need to preserve @@ -1777,18 +2106,16 @@ WaitForRelationSubscriptionsBecomeReady(MultiConnection *targetConnection, ALLOCSET_DEFAULT_MAXSIZE); MemoryContext oldContext = MemoryContextSwitchTo(loopContext); - while (true) { /* we're done, all relations are ready */ - if (RelationSubscriptionsAreReady(targetConnection, tableOwnerIds, - operationPrefix)) + if (RelationSubscriptionsAreReady(groupedLogicalRepTargets)) { ereport(LOG, (errmsg("The states of the relations belonging to the " "subscriptions became READY on the " "target node %s:%d", - targetConnection->hostname, - targetConnection->port))); + superuserConnection->hostname, + superuserConnection->port))); break; } @@ -1798,8 +2125,10 @@ WaitForRelationSubscriptionsBecomeReady(MultiConnection *targetConnection, GetCurrentTimestamp(), logicalReplicationProgressReportTimeout)) { - ereport(LOG, (errmsg("Not all subscriptions for the shard move are " - "READY yet"))); + ereport(LOG, (errmsg("Not all subscriptions on target node %s:%d " + "are READY yet", + superuserConnection->hostname, + superuserConnection->port))); previousReportTime = GetCurrentTimestamp(); } @@ -1814,8 +2143,8 @@ WaitForRelationSubscriptionsBecomeReady(MultiConnection *targetConnection, LogicalReplicationTimeout), errdetail("The subscribed relations haven't become " "ready on the target node %s:%d", - targetConnection->hostname, - targetConnection->port), + superuserConnection->hostname, + superuserConnection->port), errhint( "Logical replication has failed to initialize " "on the target node. If not, consider using " @@ -1825,7 +2154,6 @@ WaitForRelationSubscriptionsBecomeReady(MultiConnection *targetConnection, /* wait for 1 second (1000 miliseconds) and try again */ WaitForMiliseconds(1000); - MemoryContextReset(loopContext); } @@ -1834,19 +2162,19 @@ WaitForRelationSubscriptionsBecomeReady(MultiConnection *targetConnection, /* - * ShardSubscriptionNamesValueList returns a SQL value list containing the - * subscription names for all of the given table owner ids. This value list can + * SubscriptionNamesValueList returns a SQL value list containing the + * subscription names from the logicalRepTargetList. This value list can * be used in a query by using the IN operator. */ static char * -ShardSubscriptionNamesValueList(Bitmapset *tableOwnerIds, char *operationPrefix) +SubscriptionNamesValueList(List *logicalRepTargetList) { StringInfo subscriptionValueList = makeStringInfo(); appendStringInfoString(subscriptionValueList, "("); - int ownerId = -1; bool first = true; - while ((ownerId = bms_next_member(tableOwnerIds, ownerId)) >= 0) + LogicalRepTarget *target = NULL; + foreach_ptr(target, logicalRepTargetList) { if (!first) { @@ -1856,9 +2184,8 @@ ShardSubscriptionNamesValueList(Bitmapset *tableOwnerIds, char *operationPrefix) { first = false; } - appendStringInfoString(subscriptionValueList, - quote_literal_cstr(ShardSubscriptionName(ownerId, - operationPrefix))); + appendStringInfoString(subscriptionValueList, quote_literal_cstr( + target->subscriptionName)); } appendStringInfoString(subscriptionValueList, ")"); return subscriptionValueList->data; @@ -1867,30 +2194,31 @@ ShardSubscriptionNamesValueList(Bitmapset *tableOwnerIds, char *operationPrefix) /* * RelationSubscriptionsAreReady gets the subscription status for each - * shard and returns false if at least one of them is not ready. + * subscriptions and returns false if at least one of them is not ready. 
*/ static bool -RelationSubscriptionsAreReady(MultiConnection *targetConnection, - Bitmapset *tableOwnerIds, char *operationPrefix) +RelationSubscriptionsAreReady(GroupedLogicalRepTargets *groupedLogicalRepTargets) { bool raiseInterrupts = false; - char *subscriptionValueList = ShardSubscriptionNamesValueList(tableOwnerIds, - operationPrefix); + List *logicalRepTargetList = groupedLogicalRepTargets->logicalRepTargetList; + MultiConnection *superuserConnection = groupedLogicalRepTargets->superuserConnection; + + char *subscriptionValueList = SubscriptionNamesValueList(logicalRepTargetList); char *query = psprintf( "SELECT count(*) FROM pg_subscription_rel, pg_stat_subscription " "WHERE srsubid = subid AND srsubstate != 'r' AND subname IN %s", subscriptionValueList); - int querySent = SendRemoteCommand(targetConnection, query); + int querySent = SendRemoteCommand(superuserConnection, query); if (querySent == 0) { - ReportConnectionError(targetConnection, ERROR); + ReportConnectionError(superuserConnection, ERROR); } - PGresult *result = GetRemoteCommandResult(targetConnection, raiseInterrupts); + PGresult *result = GetRemoteCommandResult(superuserConnection, raiseInterrupts); if (!IsResponseOK(result)) { - ReportResultError(targetConnection, result, ERROR); + ReportResultError(superuserConnection, result, ERROR); } int rowCount = PQntuples(result); @@ -1912,7 +2240,7 @@ RelationSubscriptionsAreReady(MultiConnection *targetConnection, char *resultString = pstrdup(PQgetvalue(result, rowIndex, columnIndex)); PQclear(result); - ForgetResults(targetConnection); + ForgetResults(superuserConnection); int64 resultInt = SafeStringToInt64(resultString); @@ -1921,21 +2249,47 @@ RelationSubscriptionsAreReady(MultiConnection *targetConnection, /* - * WaitForShardSubscriptionToCatchUp waits until the last LSN reported by the subscription. + * WaitForAllSubscriptionsToCatchUp waits until the last LSN reported by the + * subscriptions has caught up with the current LSN of the source connection. * - * The function errors if the target LSN doesn't increase within LogicalReplicationErrorTimeout. - * The function also reports its progress in every logicalReplicationProgressReportTimeout. + * The function errors if the target LSN doesn't increase within + * LogicalReplicationErrorTimeout. The function also reports its progress in + * every logicalReplicationProgressReportTimeout. */ void -WaitForShardSubscriptionToCatchUp(MultiConnection *targetConnection, XLogRecPtr - sourcePosition, - Bitmapset *tableOwnerIds, char *operationPrefix) +WaitForAllSubscriptionsToCatchUp(MultiConnection *sourceConnection, + HTAB *groupedLogicalRepTargetsHash) +{ + XLogRecPtr sourcePosition = GetRemoteLogPosition(sourceConnection); + HASH_SEQ_STATUS status; + GroupedLogicalRepTargets *groupedLogicalRepTargets = NULL; + foreach_htab(groupedLogicalRepTargets, &status, groupedLogicalRepTargetsHash) + { + WaitForGroupedLogicalRepTargetsToCatchUp(sourcePosition, + groupedLogicalRepTargets); + } +} + + +/* + * WaitForGroupedLogicalRepTargetsToCatchUp waits until the last LSN reported by the + * subscriptions on a node has caught up with the given source LSN. + * + * The function errors if the target LSN doesn't increase within + * LogicalReplicationErrorTimeout. The function also reports its progress in + * every logicalReplicationProgressReportTimeout.
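In essence, the catch-up wait compares two LSNs until each node's targets overtake the source (a sketch; reading pg_current_wal_lsn() for the source position is an assumption, since GetRemoteLogPosition is defined outside this excerpt, and the subscription name is an example):

    -- on the source node
    SELECT pg_current_wal_lsn();

    -- on each target node, repeated until the result is >= the source LSN above
    SELECT min(latest_end_lsn) FROM pg_stat_subscription
    WHERE subname IN ('citus_shard_split_subscription_10');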
+ */ +static void +WaitForGroupedLogicalRepTargetsToCatchUp(XLogRecPtr sourcePosition, + GroupedLogicalRepTargets * + groupedLogicalRepTargets) { XLogRecPtr previousTargetPosition = 0; TimestampTz previousLSNIncrementTime = GetCurrentTimestamp(); /* report in the first iteration as well */ TimestampTz previousReportTime = 0; + MultiConnection *superuserConnection = groupedLogicalRepTargets->superuserConnection; /* @@ -1954,16 +2308,14 @@ WaitForShardSubscriptionToCatchUp(MultiConnection *targetConnection, XLogRecPtr while (true) { - XLogRecPtr targetPosition = GetSubscriptionPosition(targetConnection, - tableOwnerIds, - operationPrefix); + XLogRecPtr targetPosition = GetSubscriptionPosition(groupedLogicalRepTargets); if (targetPosition >= sourcePosition) { ereport(LOG, (errmsg( "The LSN of the target subscriptions on node %s:%d have " "caught up with the source LSN ", - targetConnection->hostname, - targetConnection->port))); + superuserConnection->hostname, + superuserConnection->port))); break; } @@ -1989,8 +2341,8 @@ WaitForShardSubscriptionToCatchUp(MultiConnection *targetConnection, XLogRecPtr ereport(LOG, (errmsg( "The LSN of the target subscriptions on node %s:%d have " "increased from %ld to %ld at %s where the source LSN is %ld ", - targetConnection->hostname, - targetConnection->port, previousTargetBeforeThisLoop, + superuserConnection->hostname, + superuserConnection->port, previousTargetBeforeThisLoop, targetPosition, timestamptz_to_str(previousLSNIncrementTime), sourcePosition))); @@ -2009,8 +2361,8 @@ WaitForShardSubscriptionToCatchUp(MultiConnection *targetConnection, XLogRecPtr LogicalReplicationTimeout), errdetail("The LSN on the target subscription hasn't " "caught up ready on the target node %s:%d", - targetConnection->hostname, - targetConnection->port), + superuserConnection->hostname, + superuserConnection->port), errhint( "There might have occurred problems on the target " "node. If not consider using higher values for " @@ -2061,91 +2413,16 @@ WaitForMiliseconds(long timeout) /* - * GetSubscriptionPosition gets the current WAL log position of the subscription, that - * is the WAL log position on the source node up to which the subscription completed - * replication. + * GetSubscriptionPosition gets the minimum WAL log position of the + * given subscriptions: that is, the WAL log position on the source + * node up to which each subscription completed replication. */ static XLogRecPtr -GetSubscriptionPosition(MultiConnection *connection, Bitmapset *tableOwnerIds, - char *operationPrefix) +GetSubscriptionPosition(GroupedLogicalRepTargets *groupedLogicalRepTargets) { - char *subscriptionValueList = ShardSubscriptionNamesValueList(tableOwnerIds, - operationPrefix); - return GetRemoteLSN(connection, psprintf( + char *subscriptionValueList = SubscriptionNamesValueList( + groupedLogicalRepTargets->logicalRepTargetList); + return GetRemoteLSN(groupedLogicalRepTargets->superuserConnection, psprintf( "SELECT min(latest_end_lsn) FROM pg_stat_subscription " "WHERE subname IN %s", subscriptionValueList)); } - - -/* - * CreateShardSplitSubscription creates the subscriptions used for shard split - * over the given connection. The subscription is created with 'copy_data' - * set to false and with the given replication slot name.
- */ -void -CreateShardSplitSubscription(MultiConnection *connection, char *sourceNodeName, - int sourceNodePort, char *userName, char *databaseName, - char *publicationName, char *slotName, - Oid ownerId) -{ - StringInfo createSubscriptionCommand = makeStringInfo(); - StringInfo conninfo = makeStringInfo(); - - /* - * The CREATE USER command should not propagate, so we temporarily - * disable DDL propagation. - */ - SendCommandListToWorkerOutsideTransaction( - connection->hostname, connection->port, connection->user, - list_make2( - "SET LOCAL citus.enable_ddl_propagation TO OFF;", - psprintf( - "CREATE USER %s SUPERUSER IN ROLE %s", - ShardSubscriptionRole(ownerId, SHARD_SPLIT_SUBSCRIPTION_ROLE_PREFIX), - quote_identifier(GetUserNameFromId(ownerId, false)) - ))); - - appendStringInfo(conninfo, "host='%s' port=%d user='%s' dbname='%s' " - "connect_timeout=20", - escape_param_str(sourceNodeName), sourceNodePort, - escape_param_str(userName), escape_param_str(databaseName)); - - appendStringInfo(createSubscriptionCommand, - "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s " - "WITH (citus_use_authinfo=true, enabled=false, create_slot=false, copy_data=false, slot_name='%s')", - quote_identifier(ShardSubscriptionName(ownerId, - SHARD_SPLIT_SUBSCRIPTION_PREFIX)), - quote_literal_cstr(conninfo->data), - quote_identifier(publicationName), - escape_param_str(slotName)); - - ExecuteCriticalRemoteCommand(connection, createSubscriptionCommand->data); - pfree(createSubscriptionCommand->data); - pfree(createSubscriptionCommand); - ExecuteCriticalRemoteCommand(connection, psprintf( - "ALTER SUBSCRIPTION %s OWNER TO %s", - ShardSubscriptionName(ownerId, - SHARD_SPLIT_SUBSCRIPTION_PREFIX), - ShardSubscriptionRole(ownerId, - SHARD_SPLIT_SUBSCRIPTION_ROLE_PREFIX) - )); - - /* - * The ALTER ROLE command should not propagate, so we temporarily - * disable DDL propagation. 
- */ - SendCommandListToWorkerOutsideTransaction( - connection->hostname, connection->port, connection->user, - list_make2( - "SET LOCAL citus.enable_ddl_propagation TO OFF;", - psprintf( - "ALTER ROLE %s NOSUPERUSER", - ShardSubscriptionRole(ownerId, SHARD_SPLIT_SUBSCRIPTION_ROLE_PREFIX) - ))); - - ExecuteCriticalRemoteCommand(connection, psprintf( - "ALTER SUBSCRIPTION %s ENABLE", - ShardSubscriptionName(ownerId, - SHARD_SPLIT_SUBSCRIPTION_PREFIX) - )); -} diff --git a/src/backend/distributed/shardsplit/shardsplit_logical_replication.c b/src/backend/distributed/shardsplit/shardsplit_logical_replication.c index ef3034c03..e9b703e11 100644 --- a/src/backend/distributed/shardsplit/shardsplit_logical_replication.c +++ b/src/backend/distributed/shardsplit/shardsplit_logical_replication.c @@ -13,6 +13,7 @@ #include "miscadmin.h" #include "nodes/pg_list.h" #include "distributed/colocation_utils.h" +#include "distributed/hash_helpers.h" #include "distributed/metadata_cache.h" #include "distributed/multi_partitioning_utils.h" #include "distributed/shardinterval_utils.h" @@ -33,19 +34,9 @@ static HTAB *ShardInfoHashMapForPublications = NULL; static void AddPublishableShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInterval, bool isChildShardInterval); -ShardSplitSubscriberMetadata * CreateShardSplitSubscriberMetadata(Oid tableOwnerId, uint32 - nodeId, - List * - replicationSlotInfoList); -static void CreateShardSplitPublicationForNode(MultiConnection *connection, - List *shardList, - uint32_t publicationForTargetNodeId, Oid - tableOwner); -static char * ShardSplitPublicationName(uint32_t nodeId, Oid ownerId); -static void DropAllShardSplitSubscriptions(MultiConnection *cleanupConnection); -static void DropAllShardSplitPublications(MultiConnection *cleanupConnection); -static void DropAllShardSplitUsers(MultiConnection *cleanupConnection); -static void DropAllShardSplitReplicationSlots(MultiConnection *cleanupConnection); +static LogicalRepTarget * CreateLogicalRepTarget(Oid tableOwnerId, + uint32 nodeId, + List *replicationSlotInfoList); /* * CreateShardSplitInfoMapForPublication creates a hashmap that groups @@ -74,7 +65,7 @@ CreateShardSplitInfoMapForPublication(List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, List *destinationWorkerNodesList) { - ShardInfoHashMapForPublications = SetupHashMapForShardInfo(); + ShardInfoHashMapForPublications = CreateSimpleHash(NodeAndOwner, PublicationInfo); ShardInterval *sourceShardIntervalToCopy = NULL; List *splitChildShardIntervalList = NULL; forboth_ptr(sourceShardIntervalToCopy, sourceColocatedShardIntervalList, @@ -129,28 +120,29 @@ static void AddPublishableShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInterval, bool isChildShardInterval) { - NodeShardMappingKey key; + NodeAndOwner key; key.nodeId = targetNodeId; key.tableOwnerId = TableOwnerOid(shardInterval->relationId); bool found = false; - NodeShardMappingEntry *nodeMappingEntry = - (NodeShardMappingEntry *) hash_search(ShardInfoHashMapForPublications, &key, - HASH_ENTER, - &found); + PublicationInfo *publicationInfo = + (PublicationInfo *) hash_search(ShardInfoHashMapForPublications, &key, + HASH_ENTER, + &found); /* Create a new list for pair */ if (!found) { - nodeMappingEntry->shardSplitInfoList = NIL; + publicationInfo->shardIntervals = NIL; + publicationInfo->name = PublicationName(SHARD_SPLIT, key.nodeId, + key.tableOwnerId); } /* Add child shard interval */ if (isChildShardInterval) { - nodeMappingEntry->shardSplitInfoList = - 
lappend(nodeMappingEntry->shardSplitInfoList, - (ShardInterval *) shardInterval); + publicationInfo->shardIntervals = + lappend(publicationInfo->shardIntervals, shardInterval); /* We return from here as the child interval is only added once in the list */ return; @@ -158,7 +150,7 @@ AddPublishableShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInterval, /* Check if parent is already added */ ShardInterval *existingShardInterval = NULL; - foreach_ptr(existingShardInterval, nodeMappingEntry->shardSplitInfoList) + foreach_ptr(existingShardInterval, publicationInfo->shardIntervals) { if (existingShardInterval->shardId == shardInterval->shardId) { @@ -168,127 +160,13 @@ AddPublishableShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInterval, } /* Add parent shard Interval */ - nodeMappingEntry->shardSplitInfoList = - lappend(nodeMappingEntry->shardSplitInfoList, (ShardInterval *) shardInterval); + publicationInfo->shardIntervals = + lappend(publicationInfo->shardIntervals, shardInterval); } /* - * CreateShardSplitPublications creates publications on the source node. - * - * sourceConnection - Connection of source node. - * - * shardInfoHashMapForPublication - ShardIntervals are grouped by key. - * A publication is created for list of - * ShardIntervals mapped by key. - */ -void -CreateShardSplitPublications(MultiConnection *sourceConnection, - HTAB *shardInfoHashMapForPublication) -{ - HASH_SEQ_STATUS status; - hash_seq_init(&status, shardInfoHashMapForPublication); - - NodeShardMappingEntry *entry = NULL; - while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL) - { - uint32 nodeId = entry->key.nodeId; - uint32 tableOwnerId = entry->key.tableOwnerId; - List *shardListForPublication = entry->shardSplitInfoList; - - /* Create publication on shard list */ - CreateShardSplitPublicationForNode(sourceConnection, - shardListForPublication, - nodeId, - tableOwnerId); - } -} - - -/* - * CreateShardSplitPublicationForNode creates a publication on source node - * for given shard list. - * We follow the 'SHARD_SPLIT_X_PREFIX' naming scheme for creating publications - * related to split operations. - */ -static void -CreateShardSplitPublicationForNode(MultiConnection *connection, List *shardList, - uint32_t publicationForTargetNodeId, Oid ownerId) -{ - StringInfo createPublicationCommand = makeStringInfo(); - bool prefixWithComma = false; - - appendStringInfo(createPublicationCommand, "CREATE PUBLICATION %s FOR TABLE ", - ShardSplitPublicationName(publicationForTargetNodeId, ownerId)); - - ShardInterval *shard = NULL; - foreach_ptr(shard, shardList) - { - char *shardName = ConstructQualifiedShardName(shard); - - if (prefixWithComma) - { - appendStringInfoString(createPublicationCommand, ","); - } - - appendStringInfoString(createPublicationCommand, shardName); - prefixWithComma = true; - } - - ExecuteCriticalRemoteCommand(connection, createPublicationCommand->data); - pfree(createPublicationCommand->data); - pfree(createPublicationCommand); -} - - -/* - * ShardSplitPublicationName returns publication name for Shard Split operations. - */ -static char * -ShardSplitPublicationName(uint32_t nodeId, Oid ownerId) -{ - return psprintf("%s%u_%u", SHARD_SPLIT_PUBLICATION_PREFIX, nodeId, ownerId); -} - - -/* - * CreateTargetNodeConnectionsForShardSplit creates connections on target nodes. - * These connections are used for subscription managment. They are closed - * at the end of non-blocking split workflow. 
- */ -List * -CreateTargetNodeConnectionsForShardSplit(List *shardSplitSubscribersMetadataList, int - connectionFlags, char *user, char *databaseName) -{ - List *targetNodeConnectionList = NIL; - ShardSplitSubscriberMetadata *shardSplitSubscriberMetadata = NULL; - foreach_ptr(shardSplitSubscriberMetadata, shardSplitSubscribersMetadataList) - { - /* slotinfo is expected to be already populated */ - Assert(shardSplitSubscriberMetadata->slotInfo != NULL); - - uint32 targetWorkerNodeId = shardSplitSubscriberMetadata->slotInfo->targetNodeId; - WorkerNode *targetWorkerNode = FindNodeWithNodeId(targetWorkerNodeId, false); - - MultiConnection *targetConnection = - GetNodeUserDatabaseConnection(connectionFlags, targetWorkerNode->workerName, - targetWorkerNode->workerPort, - user, - databaseName); - ClaimConnectionExclusively(targetConnection); - - targetNodeConnectionList = lappend(targetNodeConnectionList, targetConnection); - - /* Cache the connections for each subscription */ - shardSplitSubscriberMetadata->targetNodeConnection = targetConnection; - } - - return targetNodeConnectionList; -} - - -/* - * PopulateShardSplitSubscriptionsMetadataList returns a list of 'ShardSplitSubscriberMetadata' + * PopulateShardSplitSubscriptionsMetadataList returns a list of 'LogicalRepTarget' * structure. * * shardSplitInfoHashMap - Shards are grouped by key. @@ -299,483 +177,98 @@ CreateTargetNodeConnectionsForShardSplit(List *shardSplitSubscribersMetadataList */ List * PopulateShardSplitSubscriptionsMetadataList(HTAB *shardSplitInfoHashMap, - List *replicationSlotInfoList) + List *replicationSlotInfoList, + List *shardGroupSplitIntervalListList, + List *workersForPlacementList) { HASH_SEQ_STATUS status; hash_seq_init(&status, shardSplitInfoHashMap); - NodeShardMappingEntry *entry = NULL; - List *shardSplitSubscriptionMetadataList = NIL; - while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL) + PublicationInfo *publication = NULL; + List *logicalRepTargetList = NIL; + while ((publication = (PublicationInfo *) hash_seq_search(&status)) != NULL) { - uint32 nodeId = entry->key.nodeId; - uint32 tableOwnerId = entry->key.tableOwnerId; - ShardSplitSubscriberMetadata *shardSplitSubscriberMetadata = - CreateShardSplitSubscriberMetadata(tableOwnerId, nodeId, - replicationSlotInfoList); + uint32 nodeId = publication->key.nodeId; + uint32 tableOwnerId = publication->key.tableOwnerId; + LogicalRepTarget *target = + CreateLogicalRepTarget(tableOwnerId, nodeId, + replicationSlotInfoList); + target->publication = publication; + publication->target = target; - shardSplitSubscriptionMetadataList = lappend(shardSplitSubscriptionMetadataList, - shardSplitSubscriberMetadata); + logicalRepTargetList = lappend(logicalRepTargetList, target); } - return shardSplitSubscriptionMetadataList; + List *shardIntervalList = NIL; + foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList) + { + ShardInterval *shardInterval = NULL; + WorkerNode *workerPlacementNode = NULL; + forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode, + workersForPlacementList) + { + NodeAndOwner key; + key.nodeId = workerPlacementNode->nodeId; + key.tableOwnerId = TableOwnerOid(shardInterval->relationId); + + bool found = false; + publication = (PublicationInfo *) hash_search( + ShardInfoHashMapForPublications, + &key, + HASH_FIND, + &found); + if (!found) + { + ereport(ERROR, errmsg("Could not find publication matching a split")); + } + publication->target->newShards = lappend( + publication->target->newShards, 
shardInterval); + } + } + + return logicalRepTargetList; } /* - * Creates a 'ShardSplitSubscriberMetadata' structure for given table owner, node id. + * Creates a 'LogicalRepTarget' structure for given table owner, node id. * It scans the list of 'ReplicationSlotInfo' to identify the corresponding slot * to be used for given tableOwnerId and nodeId. */ -ShardSplitSubscriberMetadata * -CreateShardSplitSubscriberMetadata(Oid tableOwnerId, uint32 nodeId, - List *replicationSlotInfoList) +static LogicalRepTarget * +CreateLogicalRepTarget(Oid tableOwnerId, uint32 nodeId, + List *replicationSlotInfoList) { - ShardSplitSubscriberMetadata *shardSplitSubscriberMetadata = palloc0( - sizeof(ShardSplitSubscriberMetadata)); - shardSplitSubscriberMetadata->tableOwnerId = tableOwnerId; + LogicalRepTarget *target = palloc0(sizeof(LogicalRepTarget)); + target->subscriptionName = SubscriptionName(SHARD_SPLIT, tableOwnerId); + target->tableOwnerId = tableOwnerId; + target->subscriptionOwnerName = + SubscriptionRoleName(SHARD_SPLIT, tableOwnerId); + target->superuserConnection = NULL; /* * Each 'ReplicationSlotInfo' belongs to a unique combination of node id and owner. * Traverse the slot list to identify the corresponding slot for given * table owner and node. */ - char *tableOwnerName = GetUserNameFromId(tableOwnerId, false); - ReplicationSlotInfo *replicationSlotInfo = NULL; - foreach_ptr(replicationSlotInfo, replicationSlotInfoList) + ReplicationSlotInfo *replicationSlot = NULL; + foreach_ptr(replicationSlot, replicationSlotInfoList) { - if (nodeId == replicationSlotInfo->targetNodeId && - strcmp(tableOwnerName, replicationSlotInfo->tableOwnerName) == 0) + if (nodeId == replicationSlot->targetNodeId && + tableOwnerId == replicationSlot->tableOwnerId) { - shardSplitSubscriberMetadata->slotInfo = replicationSlotInfo; + target->replicationSlot = replicationSlot; + break; } } - return shardSplitSubscriberMetadata; -} - - -/* - * CreateShardSplitSubscriptions creates subscriptions for Shard Split operation. - * We follow Shard Split naming scheme for Publication-Subscription management. - * - * targetNodeConnectionList - List of connections to target nodes on which - * subscriptions have to be created. - * - * shardSplitSubscriberMetadataList - List of subscriber metadata. - * - * sourceWorkerNode - Source node. - */ -void -CreateShardSplitSubscriptions(List *targetNodeConnectionList, - List *shardSplitSubscriberMetadataList, - WorkerNode *sourceWorkerNode, - char *superUser, - char *databaseName) -{ - MultiConnection *targetConnection = NULL; - ShardSplitSubscriberMetadata *shardSplitPubSubMetadata = NULL; - forboth_ptr(targetConnection, targetNodeConnectionList, - shardSplitPubSubMetadata, shardSplitSubscriberMetadataList) + if (!target->replicationSlot) { - uint32 publicationForNodeId = shardSplitPubSubMetadata->slotInfo->targetNodeId; - Oid ownerId = shardSplitPubSubMetadata->tableOwnerId; - CreateShardSplitSubscription(targetConnection, - sourceWorkerNode->workerName, - sourceWorkerNode->workerPort, - superUser, - databaseName, - ShardSplitPublicationName(publicationForNodeId, - ownerId), - shardSplitPubSubMetadata->slotInfo->slotName, - ownerId); - } -} - - -/* - * WaitForShardSplitRelationSubscriptionsBecomeReady waits for a list of subscriptions - * to be come ready. This method invokes 'WaitForRelationSubscriptionsBecomeReady' for each - * subscription. 
- */ -void -WaitForShardSplitRelationSubscriptionsBecomeReady(List *shardSplitPubSubMetadataList) -{ - ShardSplitSubscriberMetadata *shardSplitPubSubMetadata = NULL; - foreach_ptr(shardSplitPubSubMetadata, shardSplitPubSubMetadataList) - { - Bitmapset *tableOwnerIds = bms_make_singleton( - shardSplitPubSubMetadata->tableOwnerId); - WaitForRelationSubscriptionsBecomeReady( - shardSplitPubSubMetadata->targetNodeConnection, tableOwnerIds, - SHARD_SPLIT_SUBSCRIPTION_PREFIX); - } -} - - -/* - * WaitForShardSplitRelationSubscriptionsToBeCaughtUp waits until subscriptions are caught up till - * the source LSN. This method invokes 'WaitForShardSubscriptionToCatchUp' for each subscription. - */ -void -WaitForShardSplitRelationSubscriptionsToBeCaughtUp(XLogRecPtr sourcePosition, - List *shardSplitPubSubMetadataList) -{ - ShardSplitSubscriberMetadata *shardSplitPubSubMetadata = NULL; - foreach_ptr(shardSplitPubSubMetadata, shardSplitPubSubMetadataList) - { - Bitmapset *tableOwnerIds = bms_make_singleton( - shardSplitPubSubMetadata->tableOwnerId); - WaitForShardSubscriptionToCatchUp(shardSplitPubSubMetadata->targetNodeConnection, - sourcePosition, - tableOwnerIds, - SHARD_SPLIT_SUBSCRIPTION_PREFIX); - } -} - - -/* - * CreateTemplateReplicationSlot creates a replication slot that acts as a template - * slot for logically replicating split children in the 'catchup' phase of non-blocking split. - * It returns a snapshot name which is used to do snapshotted shard copy in the 'copy' phase - * of nonblocking split workflow. - */ -char * -CreateTemplateReplicationSlot(ShardInterval *shardIntervalToSplit, - MultiConnection *sourceConnection) -{ - StringInfo createReplicationSlotCommand = makeStringInfo(); - appendStringInfo(createReplicationSlotCommand, - "CREATE_REPLICATION_SLOT %s LOGICAL citus EXPORT_SNAPSHOT;", - ShardSplitTemplateReplicationSlotName( - shardIntervalToSplit->shardId)); - - PGresult *result = NULL; - int response = ExecuteOptionalRemoteCommand(sourceConnection, - createReplicationSlotCommand->data, - &result); - - if (response != RESPONSE_OKAY || !IsResponseOK(result) || PQntuples(result) != 1) - { - ReportResultError(sourceConnection, result, ERROR); + ereport(ERROR, errmsg( + "Could not find replication slot matching a subscription %s", + target->subscriptionName)); } - /*'snapshot_name' is second column where index starts from zero. - * We're using the pstrdup to copy the data into the current memory context */ - char *snapShotName = pstrdup(PQgetvalue(result, 0, 2 /* columIndex */)); - return snapShotName; -} - - -/* - * ShardSplitTemplateReplicationSlotName returns name of template replication slot - * following the shard split naming scheme. - */ -char * -ShardSplitTemplateReplicationSlotName(uint64 shardId) -{ - return psprintf("%s%lu", SHARD_SPLIT_TEMPLATE_REPLICATION_SLOT_PREFIX, shardId); -} - - -/* - * CreateReplicationSlots creates copies of template replication slot - * on the source node. - * - * sourceNodeConnection - Source node connection. - * - * templateSlotName - Template replication slot name whose copies have to be created. - * This slot holds a LSN from which the logical replication - * begins. - * - * shardSplitSubscriberMetadataList - List of 'ShardSplitSubscriberMetadata. ' - * - * 'ShardSplitSubscriberMetadata' contains replication slot name that is used - * to create copies of template replication slot on source node. These slot names are returned by - * 'worker_split_shard_replication_setup' UDF and each slot is responsible for a specific - * split range. 
We try multiple attemtps to clean up these replicaton slot copies in the - * below order to be on safer side. - * 1. Clean up before starting shard split workflow. - * 2. Implicitly dropping slots while dropping subscriptions. - * 3. Explicitly dropping slots which would have skipped over from 2. - */ -void -CreateReplicationSlots(MultiConnection *sourceNodeConnection, char *templateSlotName, - List *shardSplitSubscriberMetadataList) -{ - ShardSplitSubscriberMetadata *subscriberMetadata = NULL; - foreach_ptr(subscriberMetadata, shardSplitSubscriberMetadataList) - { - char *slotName = subscriberMetadata->slotInfo->slotName; - - StringInfo createReplicationSlotCommand = makeStringInfo(); - - appendStringInfo(createReplicationSlotCommand, - "SELECT * FROM pg_catalog.pg_copy_logical_replication_slot (%s, %s)", - quote_literal_cstr(templateSlotName), quote_literal_cstr( - slotName)); - - ExecuteCriticalRemoteCommand(sourceNodeConnection, - createReplicationSlotCommand->data); - } -} - - -/* - * DropAllShardSplitLeftOvers drops shard split subscriptions, publications, roles - * and replication slots. These might have been left there after - * the coordinator crashed during a shard split. It's important to delete them - * for two reasons: - * 1. Starting new shard split might fail when they exist, because it cannot - * create them. - * 2. Leftover replication slots that are not consumed from anymore make it - * impossible for WAL to be dropped. This can cause out-of-disk issues. - */ -void -DropAllShardSplitLeftOvers(WorkerNode *sourceNode, HTAB *shardSplitHashMapForPubSub) -{ - char *superUser = CitusExtensionOwnerName(); - char *databaseName = get_database_name(MyDatabaseId); - - /* - * We open new connections to all nodes. The reason for this is that - * operations on subscriptions and publications cannot be run in a - * transaction. By forcing a new connection we make sure no transaction is - * active on the connection. - */ - int connectionFlags = FORCE_NEW_CONNECTION; - - HASH_SEQ_STATUS statusForSubscription; - hash_seq_init(&statusForSubscription, shardSplitHashMapForPubSub); - - NodeShardMappingEntry *entry = NULL; - while ((entry = (NodeShardMappingEntry *) hash_seq_search(&statusForSubscription)) != - NULL) - { - uint32_t nodeId = entry->key.nodeId; - WorkerNode *workerNode = FindNodeWithNodeId(nodeId, false /*missingOk*/); - - MultiConnection *cleanupConnection = GetNodeUserDatabaseConnection( - connectionFlags, workerNode->workerName, workerNode->workerPort, - superUser, databaseName); - - /* We need to claim the connection exclusively while dropping the subscription */ - ClaimConnectionExclusively(cleanupConnection); - - DropAllShardSplitSubscriptions(cleanupConnection); - - DropAllShardSplitUsers(cleanupConnection); - - /* Close connection after cleanup */ - CloseConnection(cleanupConnection); - } - - /*Drop all shard split publications at the source*/ - MultiConnection *sourceNodeConnection = GetNodeUserDatabaseConnection( - connectionFlags, sourceNode->workerName, sourceNode->workerPort, - superUser, databaseName); - - ClaimConnectionExclusively(sourceNodeConnection); - - /* - * If replication slot could not be dropped while dropping the - * subscriber, drop it here. 
- */ - DropAllShardSplitReplicationSlots(sourceNodeConnection); - DropAllShardSplitPublications(sourceNodeConnection); - - CloseConnection(sourceNodeConnection); -} - - -/* - * DropAllShardSplitSubscriptions drops all the existing subscriptions that - * match our shard split naming scheme on the node that the connection points - * to. - */ -void -DropAllShardSplitSubscriptions(MultiConnection *cleanupConnection) -{ - char *query = psprintf( - "SELECT subname FROM pg_catalog.pg_subscription " - "WHERE subname LIKE %s || '%%'", - quote_literal_cstr(SHARD_SPLIT_SUBSCRIPTION_PREFIX)); - List *subscriptionNameList = GetQueryResultStringList(cleanupConnection, query); - char *subscriptionName = NULL; - foreach_ptr(subscriptionName, subscriptionNameList) - { - DisableAndDropShardSplitSubscription(cleanupConnection, subscriptionName); - } -} - - -/* - * DropAllShardSplitPublications drops all the existing publications that - * match our shard split naming scheme on the node that the connection points - * to. - */ -static void -DropAllShardSplitPublications(MultiConnection *connection) -{ - char *query = psprintf( - "SELECT pubname FROM pg_catalog.pg_publication " - "WHERE pubname LIKE %s || '%%'", - quote_literal_cstr(SHARD_SPLIT_PUBLICATION_PREFIX)); - List *publicationNameList = GetQueryResultStringList(connection, query); - char *publicationName; - foreach_ptr(publicationName, publicationNameList) - { - DropShardPublication(connection, publicationName); - } -} - - -/* - * DropAllShardSplitUsers drops all the users that match our shard split naming - * scheme. The users are temporary created for shard splits. - */ -static void -DropAllShardSplitUsers(MultiConnection *connection) -{ - char *query = psprintf( - "SELECT rolname FROM pg_catalog.pg_roles " - "WHERE rolname LIKE %s || '%%'", - quote_literal_cstr(SHARD_SPLIT_SUBSCRIPTION_ROLE_PREFIX)); - List *usernameList = GetQueryResultStringList(connection, query); - char *username; - foreach_ptr(username, usernameList) - { - DropShardUser(connection, username); - } -} - - -/* - * DropAllShardSplitReplicationSlots drops all the existing replication slots - * that match shard split naming scheme on the node that the connection - * points to. - */ -static void -DropAllShardSplitReplicationSlots(MultiConnection *cleanupConnection) -{ - char *query = psprintf( - "SELECT slot_name FROM pg_catalog.pg_replication_slots " - "WHERE slot_name LIKE %s || '%%'", - quote_literal_cstr(SHARD_SPLIT_REPLICATION_SLOT_PREFIX)); - List *slotNameList = GetQueryResultStringList(cleanupConnection, query); - char *slotName; - foreach_ptr(slotName, slotNameList) - { - DropShardReplicationSlot(cleanupConnection, slotName); - } -} - - -/* - * DropShardSplitPublications drops the publication used for shard splits over the given - * connection, if it exists. - */ -void -DropShardSplitPublications(MultiConnection *sourceConnection, - HTAB *shardInfoHashMapForPublication) -{ - HASH_SEQ_STATUS status; - hash_seq_init(&status, shardInfoHashMapForPublication); - - NodeShardMappingEntry *entry = NULL; - while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL) - { - uint32 nodeId = entry->key.nodeId; - uint32 tableOwnerId = entry->key.tableOwnerId; - DropShardPublication(sourceConnection, ShardSplitPublicationName(nodeId, - tableOwnerId)); - } -} - - -/* - * DropShardSplitSubsriptions disables and drops subscriptions from the subscriber node that - * are used to split shards. Note that, it does not drop the replication slots on the publisher node. 
- * Replication slots will be dropped separately by calling DropShardSplitReplicationSlots. - */ -void -DropShardSplitSubsriptions(List *shardSplitSubscribersMetadataList) -{ - ShardSplitSubscriberMetadata *subscriberMetadata = NULL; - foreach_ptr(subscriberMetadata, shardSplitSubscribersMetadataList) - { - uint32 tableOwnerId = subscriberMetadata->tableOwnerId; - MultiConnection *targetNodeConnection = subscriberMetadata->targetNodeConnection; - - DisableAndDropShardSplitSubscription(targetNodeConnection, ShardSubscriptionName( - tableOwnerId, - SHARD_SPLIT_SUBSCRIPTION_PREFIX)); - - DropShardUser(targetNodeConnection, ShardSubscriptionRole(tableOwnerId, - SHARD_SPLIT_SUBSCRIPTION_ROLE_PREFIX)); - } -} - - -/* - * DisableAndDropShardSplitSubscription disables the subscription, resets the slot name to 'none' and - * then drops subscription on the given connection. It does not drop the replication slot. - * The caller of this method should ensure to cleanup the replication slot. - * - * Directly executing 'DROP SUBSCRIPTION' attempts to drop the replication slot at the source node. - * When the subscription is local, direcly dropping the subscription can lead to a self deadlock. - * To avoid this, we first disable the subscription, reset the slot name and then drop the subscription. - */ -void -DisableAndDropShardSplitSubscription(MultiConnection *connection, char *subscriptionName) -{ - StringInfo alterSubscriptionSlotCommand = makeStringInfo(); - StringInfo alterSubscriptionDisableCommand = makeStringInfo(); - - appendStringInfo(alterSubscriptionDisableCommand, - "ALTER SUBSCRIPTION %s DISABLE", - quote_identifier(subscriptionName)); - ExecuteCriticalRemoteCommand(connection, - alterSubscriptionDisableCommand->data); - - appendStringInfo(alterSubscriptionSlotCommand, - "ALTER SUBSCRIPTION %s SET (slot_name = NONE)", - quote_identifier(subscriptionName)); - ExecuteCriticalRemoteCommand(connection, alterSubscriptionSlotCommand->data); - - ExecuteCriticalRemoteCommand(connection, psprintf( - "DROP SUBSCRIPTION %s", - quote_identifier(subscriptionName))); -} - - -/* - * DropShardSplitReplicationSlots drops replication slots on the source node. - */ -void -DropShardSplitReplicationSlots(MultiConnection *sourceConnection, - List *replicationSlotInfoList) -{ - ReplicationSlotInfo *replicationSlotInfo = NULL; - foreach_ptr(replicationSlotInfo, replicationSlotInfoList) - { - DropShardReplicationSlot(sourceConnection, replicationSlotInfo->slotName); - } -} - - -/* - * CloseShardSplitSubscriberConnections closes connection of subscriber nodes. - * 'ShardSplitSubscriberMetadata' holds connection for a subscriber node. The method - * traverses the list and closes each connection. - */ -void -CloseShardSplitSubscriberConnections(List *shardSplitSubscriberMetadataList) -{ - ShardSplitSubscriberMetadata *subscriberMetadata = NULL; - foreach_ptr(subscriberMetadata, shardSplitSubscriberMetadataList) - { - CloseConnection(subscriberMetadata->targetNodeConnection); - } + return target; } diff --git a/src/backend/distributed/shardsplit/shardsplit_shared_memory.c b/src/backend/distributed/shardsplit/shardsplit_shared_memory.c index c22a9a084..d9a2daad4 100644 --- a/src/backend/distributed/shardsplit/shardsplit_shared_memory.c +++ b/src/backend/distributed/shardsplit/shardsplit_shared_memory.c @@ -187,32 +187,6 @@ ReleaseSharedMemoryOfShardSplitInfo() } -/* - * EncodeReplicationSlot returns an encoded replication slot name - * in the following format. 
- * Slot Name = citus_split_nodeId_tableOwnerOid
- * Max supported length of replication slot name is 64 bytes.
- */
-char *
-EncodeReplicationSlot(uint32_t nodeId,
-					  uint32_t tableOwnerId)
-{
-	StringInfo slotName = makeStringInfo();
-	appendStringInfo(slotName, "%s%u_%u", SHARD_SPLIT_REPLICATION_SLOT_PREFIX, nodeId,
-					 tableOwnerId);
-
-	if (slotName->len > NAMEDATALEN)
-	{
-		ereport(ERROR,
-				(errmsg(
-					 "Replication Slot name:%s having length:%d is greater than maximum allowed length:%d",
-					 slotName->data, slotName->len, NAMEDATALEN)));
-	}
-
-	return slotName->data;
-}
-
-
 /*
  * InitializeShardSplitSMHandleManagement requests the necessary shared memory
  * from Postgres and sets up the shared memory startup hook.
diff --git a/src/backend/distributed/utils/hash_helpers.c b/src/backend/distributed/utils/hash_helpers.c
index 6bbf14938..9509d8b53 100644
--- a/src/backend/distributed/utils/hash_helpers.c
+++ b/src/backend/distributed/utils/hash_helpers.c
@@ -10,6 +10,8 @@
 
 #include "postgres.h"
 
+#include "common/hashfn.h"
+#include "distributed/citus_safe_lib.h"
 #include "distributed/hash_helpers.h"
 #include "utils/hsearch.h"
 
@@ -34,6 +36,50 @@ hash_delete_all(HTAB *htab)
 }
 
 
+/*
+ * CreateSimpleHashWithName creates a hashmap that hashes its key using the
+ * tag_hash function and stores the entries in the current memory context.
+ */
+HTAB *
+CreateSimpleHashWithName(Size keySize, Size entrySize, char *name)
+{
+	HASHCTL info;
+	memset_struct_0(info);
+	info.keysize = keySize;
+	info.entrysize = entrySize;
+	info.hcxt = CurrentMemoryContext;
+
+	/*
+	 * uint32_hash does the same as tag_hash for keys of 4 bytes, but it's
+	 * faster.
+	 */
+	if (keySize == sizeof(uint32))
+	{
+		info.hash = uint32_hash;
+	}
+	else
+	{
+		info.hash = tag_hash;
+	}
+
+	int hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
+
+	/*
+	 * We use 32 as the initial number of elements that fit into this hash
+	 * table. This value seems a reasonable tradeoff between two issues:
+	 * 1. An empty hashmap shouldn't take up a lot of space
+	 * 2. Doing a few inserts shouldn't require growing the hashmap
+	 *
+	 * NOTE: No performance testing has been performed when choosing this
+	 * value. If this ever turns out to be a problem, feel free to do some
+	 * performance tests.
+	 */
+	HTAB *publicationInfoHash = hash_create(name, 32, &info, hashFlags);
+	return publicationInfoHash;
+}
+
+
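As a quick illustration of the new helper (a minimal sketch, not part of the patch; the WorkerShardCount struct, the function, and the variable names are invented), a hash keyed by a 4-byte value picks up the faster uint32_hash automatically, and the CreateSimpleHash macro added to hash_helpers.h in the next hunk derives the sizes and the hash name from the entry type:

#include "postgres.h"
#include "distributed/hash_helpers.h"
#include "utils/hsearch.h"

/* hypothetical entry type; with HASH_BLOBS the key must be the first field */
typedef struct WorkerShardCount
{
	uint32 nodeId;      /* key */
	int shardCount;     /* payload */
} WorkerShardCount;

static void
SimpleHashUsageSketch(void)
{
	/*
	 * Expands to CreateSimpleHashWithName(sizeof(uint32),
	 * sizeof(WorkerShardCount), "WorkerShardCountHash").
	 */
	HTAB *countsHash = CreateSimpleHash(uint32, WorkerShardCount);

	uint32 nodeId = 3;
	bool found = false;
	WorkerShardCount *entry =
		(WorkerShardCount *) hash_search(countsHash, &nodeId, HASH_ENTER, &found);
	if (!found)
	{
		entry->shardCount = 0;
	}
	entry->shardCount++;
}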
 /*
  * foreach_htab_cleanup cleans up the hash iteration state after the iteration
  * is done. This is only needed when break statements are present in the
diff --git a/src/include/distributed/hash_helpers.h b/src/include/distributed/hash_helpers.h
index 4abe18e11..bebbf885d 100644
--- a/src/include/distributed/hash_helpers.h
+++ b/src/include/distributed/hash_helpers.h
@@ -30,4 +30,8 @@ extern void hash_delete_all(HTAB *htab);
 
 extern void foreach_htab_cleanup(void *var, HASH_SEQ_STATUS *status);
 
+extern HTAB * CreateSimpleHashWithName(Size keysize, Size entrysize, char *name);
+
+#define CreateSimpleHash(keyType, entryType) \
+	CreateSimpleHashWithName(sizeof(keyType), sizeof(entryType), # entryType "Hash")
 #endif
diff --git a/src/include/distributed/multi_logical_replication.h b/src/include/distributed/multi_logical_replication.h
index 783cd4daf..a2aa17965 100644
--- a/src/include/distributed/multi_logical_replication.h
+++ b/src/include/distributed/multi_logical_replication.h
@@ -12,8 +12,10 @@
 #ifndef MULTI_LOGICAL_REPLICATION_H_
 #define MULTI_LOGICAL_REPLICATION_H_
 
+#include "c.h"
 #include "nodes/pg_list.h"
+#include "distributed/connection_management.h"
 
 /* Config variables managed via guc.c */
@@ -21,49 +23,157 @@ extern int LogicalReplicationTimeout;
 
 extern bool PlacementMovedUsingLogicalReplicationInTX;
 
+/*
+ * NodeAndOwner should be used as a key for structs that should be hashed by a
+ * combination of node and owner.
+ */
+typedef struct NodeAndOwner
+{
+	uint32_t nodeId;
+	Oid tableOwnerId;
+} NodeAndOwner;
+
+
+/*
+ * ReplicationSlotInfo stores the info that defines a replication slot. For
+ * shard splits this information is built by parsing the result of the
+ * 'worker_split_shard_replication_setup' UDF.
+ */
+typedef struct ReplicationSlotInfo
+{
+	uint32 targetNodeId;
+	Oid tableOwnerId;
+	char *name;
+} ReplicationSlotInfo;
+
+/*
+ * PublicationInfo stores the information that defines a publication.
+ */
+typedef struct PublicationInfo
+{
+	NodeAndOwner key;
+	char *name;
+	List *shardIntervals;
+	struct LogicalRepTarget *target;
+} PublicationInfo;
+
+/*
+ * LogicalRepTarget stores the information necessary to create all the things
+ * that are needed on the target node of logical replication, such as the
+ * subscription and the user that owns it.
+ */
+typedef struct LogicalRepTarget
+{
+	/*
+	 * The Oid of the user that owns the shards in newShards. This is the Oid
+	 * of the user on the coordinator; it is likely different from the Oid of
+	 * the same user on the logical replication source or target.
+	 */
+	Oid tableOwnerId;
+	char *subscriptionName;
+
+	/*
+	 * The name of the user that's used as the owner of the subscription. This
+	 * is not the same as the name of the user that matches tableOwnerId.
+	 * Instead we create a temporary user with the same permissions as that
+	 * user, whose only purpose is to own the subscription.
+	 */
+	char *subscriptionOwnerName;
+	ReplicationSlotInfo *replicationSlot;
+	PublicationInfo *publication;
+
+	/*
+	 * The shardIntervals that we want to create on this logical replication
+	 * target. This can be different from the shard intervals that are part of
+	 * the publication for two reasons:
+	 * 1. The publication does not contain partitioned tables, only their
+	 *    children. The partition parent tables ARE part of newShards.
+	 * 2. For shard splits the publication also contains dummy shards; these
+	 *    ARE NOT part of newShards.
+	 */
+	List *newShards;
+
+	/*
+	 * The superuserConnection is shared between all LogicalRepTargets that are
+	 * on the same node. It can be initialized easily by using
+	 * CreateGroupedLogicalRepTargetsConnections.
+	 */
+	MultiConnection *superuserConnection;
+} LogicalRepTarget;
+
+/*
+ * GroupedLogicalRepTargets groups LogicalRepTargets by node. This allows us to
+ * create a hashmap that we can search by nodeId, which is useful because these
+ * targets can all use the same superuserConnection for management. That in
+ * turn allows us to batch certain operations, such as getting the state of the
+ * subscriptions.
+ */
+typedef struct GroupedLogicalRepTargets
+{
+	uint32 nodeId;
+	List *logicalRepTargetList;
+	MultiConnection *superuserConnection;
+} GroupedLogicalRepTargets;
+
+
+/*
+ * LogicalRepType is used for various functions to do something different for
+ * shard moves than for shard splits, such as using a different prefix for a
+ * subscription name.
+ */
+typedef enum LogicalRepType
+{
+	SHARD_MOVE,
+	SHARD_SPLIT,
+} LogicalRepType;
 
 extern void LogicallyReplicateShards(List *shardList, char *sourceNodeName,
 									 int sourceNodePort, char *targetNodeName,
 									 int targetNodePort);
 extern void ConflictOnlyWithIsolationTesting(void);
-extern void CreateReplicaIdentity(List *shardList, char *nodeName, int32
-								  nodePort);
+extern void CreateReplicaIdentities(List *subscriptionInfoList);
+extern void CreateReplicaIdentitiesOnNode(List *shardList,
+										  char *nodeName,
+										  int32 nodePort);
 extern XLogRecPtr GetRemoteLogPosition(MultiConnection *connection);
 extern List * GetQueryResultStringList(MultiConnection *connection, char *query);
-extern void DropShardSubscription(MultiConnection *connection,
-								  char *subscriptionName);
-extern void DropShardPublication(MultiConnection *connection, char *publicationName);
+extern MultiConnection * GetReplicationConnection(char *nodeName, int nodePort);
+extern void CreatePublications(MultiConnection *sourceConnection,
+							   HTAB *publicationInfoHash);
+extern void CreateSubscriptions(MultiConnection *sourceConnection,
+								char *databaseName, List *subscriptionInfoList);
+extern char * CreateReplicationSlots(MultiConnection *sourceConnection,
+									 MultiConnection *sourceReplicationConnection,
+									 List *subscriptionInfoList,
+									 char *outputPlugin);
+extern void EnableSubscriptions(List *subscriptionInfoList);
+extern void DropSubscriptions(List *subscriptionInfoList);
+extern void DropReplicationSlots(MultiConnection *sourceConnection,
+								 List *subscriptionInfoList);
+extern void DropPublications(MultiConnection *sourceConnection,
+							 HTAB *publicationInfoHash);
+extern void DropAllLogicalReplicationLeftovers(LogicalRepType type);
-extern void DropShardUser(MultiConnection *connection, char *username);
-extern void DropShardReplicationSlot(MultiConnection *connection,
-									 char *publicationName);
+extern char * PublicationName(LogicalRepType type, uint32_t nodeId, Oid ownerId);
+extern char * ReplicationSlotName(LogicalRepType type, uint32_t nodeId, Oid ownerId);
+extern char * SubscriptionName(LogicalRepType type, Oid ownerId);
+extern char * SubscriptionRoleName(LogicalRepType type, Oid ownerId);
-
-extern char * ShardSubscriptionRole(Oid ownerId, char *operationPrefix);
-extern char * ShardSubscriptionName(Oid ownerId, char *operationPrefix);
-extern void CreateShardSplitSubscription(MultiConnection *connection,
-										 char *sourceNodeName,
-										 int sourceNodePort, char *userName,
-										 char *databaseName,
-										 char *publicationName, char *slotName,
-										 Oid ownerId);
-
-extern void WaitForRelationSubscriptionsBecomeReady(MultiConnection *targetConnection,
-													Bitmapset *tableOwnerIds,
-													char *operationPrefix);
+extern void WaitForAllSubscriptionsToBecomeReady(HTAB *groupedLogicalRepTargetsHash);
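To show how these declarations are meant to fit together, here is a hedged sketch (not code from the patch) of the publication/slot/subscription lifecycle driven through the grouped-targets hash, using the functions declared above and just below; the wrapper function, its parameters, and the exact ordering are assumptions for illustration only:

/*
 * Sketch only: assumes logicalRepTargetList and publicationInfoHash were
 * already built for the shards being moved or split, and that
 * sourceReplicationConnection was opened with GetReplicationConnection().
 */
static void
LogicalReplicationLifecycleSketch(MultiConnection *sourceConnection,
								  MultiConnection *sourceReplicationConnection,
								  char *superUser, char *databaseName,
								  char *outputPlugin,
								  List *logicalRepTargetList,
								  HTAB *publicationInfoHash)
{
	/* one GroupedLogicalRepTargets entry per target node */
	HTAB *groupedLogicalRepTargetsHash =
		CreateGroupedLogicalRepTargetsHash(logicalRepTargetList);

	/* targets on the same node share a single superuser connection */
	CreateGroupedLogicalRepTargetsConnections(groupedLogicalRepTargetsHash,
											  superUser, databaseName);

	CreatePublications(sourceConnection, publicationInfoHash);
	char *snapshot = CreateReplicationSlots(sourceConnection,
											sourceReplicationConnection,
											logicalRepTargetList, outputPlugin);

	/* the returned snapshot could be used to copy existing data consistently */
	(void) snapshot;

	/* subscriptions are presumably created disabled, hence the separate enable step */
	CreateSubscriptions(sourceConnection, databaseName, logicalRepTargetList);
	EnableSubscriptions(logicalRepTargetList);
	WaitForAllSubscriptionsToBecomeReady(groupedLogicalRepTargetsHash);
	WaitForAllSubscriptionsToCatchUp(sourceConnection, groupedLogicalRepTargetsHash);

	/* teardown once the data movement is complete */
	DropSubscriptions(logicalRepTargetList);
	DropReplicationSlots(sourceConnection, logicalRepTargetList);
	DropPublications(sourceConnection, publicationInfoHash);
	CloseGroupedLogicalRepTargetsConnections(groupedLogicalRepTargetsHash);
}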
+extern void WaitForAllSubscriptionsToCatchUp(MultiConnection *sourceConnection,
+											 HTAB *groupedLogicalRepTargetsHash);
 extern void WaitForShardSubscriptionToCatchUp(MultiConnection *targetConnection,
 											  XLogRecPtr sourcePosition,
 											  Bitmapset *tableOwnerIds,
 											  char *operationPrefix);
+extern HTAB * CreateGroupedLogicalRepTargetsHash(List *subscriptionInfoList);
+extern void CreateGroupedLogicalRepTargetsConnections(HTAB *groupedLogicalRepTargetsHash,
+													   char *user,
+													   char *databaseName);
+extern void RecreateGroupedLogicalRepTargetsConnections(
+	HTAB *groupedLogicalRepTargetsHash,
+	char *user,
+	char *databaseName);
+extern void CloseGroupedLogicalRepTargetsConnections(HTAB *groupedLogicalRepTargetsHash);
-#define SHARD_MOVE_PUBLICATION_PREFIX "citus_shard_move_publication_"
-#define SHARD_MOVE_SUBSCRIPTION_ROLE_PREFIX "citus_shard_move_subscription_role_"
-#define SHARD_MOVE_SUBSCRIPTION_PREFIX "citus_shard_move_subscription_"
-#define SHARD_SPLIT_PUBLICATION_PREFIX "citus_shard_split_publication_"
-#define SHARD_SPLIT_SUBSCRIPTION_PREFIX "citus_shard_split_subscription_"
-#define SHARD_SPLIT_SUBSCRIPTION_ROLE_PREFIX "citus_shard_split_subscription_role_"
-#define SHARD_SPLIT_TEMPLATE_REPLICATION_SLOT_PREFIX "citus_shard_split_template_slot_"
-#define SHARD_SPLIT_REPLICATION_SLOT_PREFIX "citus_shard_split_"
 #endif /* MULTI_LOGICAL_REPLICATION_H_ */
diff --git a/src/include/distributed/shardsplit_logical_replication.h b/src/include/distributed/shardsplit_logical_replication.h
index 28138dd2a..b47861369 100644
--- a/src/include/distributed/shardsplit_logical_replication.h
+++ b/src/include/distributed/shardsplit_logical_replication.h
@@ -12,86 +12,34 @@
 #ifndef SHARDSPLIT_LOGICAL_REPLICATION_H
 #define SHARDSPLIT_LOGICAL_REPLICATION_H
 
+#include "distributed/metadata_utility.h"
 #include "distributed/multi_logical_replication.h"
+#include "distributed/worker_manager.h"
 
 /*
- * Invocation of 'worker_split_shard_replication_setup' UDF returns set of records
- * of custom datatype 'replication_slot_info'. This information is parsed and stored in
- * the below data structure. The information is used to create a subscriber on target node
- * with corresponding slot name.
+ * GroupedShardSplitInfos groups all ShardSplitInfos belonging to the same node
+ * and table owner together. Its only purpose is to create a hashmap that
+ * allows us to search ShardSplitInfos by node and owner.
  */
-typedef struct ReplicationSlotInfo
+typedef struct GroupedShardSplitInfos
 {
-	uint32 targetNodeId;
-	char *tableOwnerName;
-	char *slotName;
-} ReplicationSlotInfo;
-
-/*
- * Stores information necesary for creating a subscriber on target node.
- * Based on how a shard is split and mapped to target nodes, for each unique combination of
- * there is a 'ShardSplitSubscriberMetadata'.
- */
-typedef struct ShardSplitSubscriberMetadata
-{
-	Oid tableOwnerId;
-	ReplicationSlotInfo *slotInfo;
-
-	/*
-	 * Exclusively claimed connection for a subscription.The target node of subscription
-	 * is pointed by ReplicationSlotInfo.
- */ - MultiConnection *targetNodeConnection; -} ShardSplitSubscriberMetadata; - -/* key for NodeShardMappingEntry */ -typedef struct NodeShardMappingKey -{ - uint32_t nodeId; - Oid tableOwnerId; -} NodeShardMappingKey; - -/* Entry for hash map */ -typedef struct NodeShardMappingEntry -{ - NodeShardMappingKey key; + NodeAndOwner key; List *shardSplitInfoList; -} NodeShardMappingEntry; +} GroupedShardSplitInfos; -extern uint32 NodeShardMappingHash(const void *key, Size keysize); -extern int NodeShardMappingHashCompare(const void *left, const void *right, Size keysize); -extern HTAB * SetupHashMapForShardInfo(void); /* Functions for subscriber metadata management */ extern List * PopulateShardSplitSubscriptionsMetadataList(HTAB *shardSplitInfoHashMap, - List *replicationSlotInfoList); + List *replicationSlotInfoList, + List * + shardGroupSplitIntervalListList, + List *workersForPlacementList); extern HTAB * CreateShardSplitInfoMapForPublication( List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, List *destinationWorkerNodesList); -/* Functions for creating publications and subscriptions*/ -extern void AlterShardSplitPublications(MultiConnection *sourceConnection, - HTAB *shardInfoHashMapForPublication); -extern void CreateShardSplitSubscriptions(List *targetNodeConnectionList, - List *shardSplitSubscriberMetadataList, - WorkerNode *sourceWorkerNode, char *superUser, - char *databaseName); -extern void CreateReplicationSlots(MultiConnection *sourceNodeConnection, - char *templateSlotName, - List *shardSplitSubscriberMetadataList); -extern List * CreateTargetNodeConnectionsForShardSplit( - List *shardSplitSubscribersMetadataList, - int - connectionFlags, char *user, - char *databaseName); - /* Functions to drop publisher-subscriber resources */ -extern void CreateShardSplitPublications(MultiConnection *sourceConnection, - HTAB *shardInfoHashMapForPublication); -extern char * CreateTemplateReplicationSlot(ShardInterval *shardIntervalToSplit, - MultiConnection * - sourceConnection); extern void DropAllShardSplitLeftOvers(WorkerNode *sourceNode, HTAB *shardSplitMapOfPublications); extern void DropShardSplitPublications(MultiConnection *sourceConnection, @@ -99,16 +47,4 @@ extern void DropShardSplitPublications(MultiConnection *sourceConnection, extern void DropShardSplitSubsriptions(List *shardSplitSubscribersMetadataList); extern void DropShardSplitReplicationSlots(MultiConnection *sourceConnection, List *replicationSlotInfoList); -extern void DisableAndDropShardSplitSubscription(MultiConnection *connection, - char *subscriptionName); - -/* Wrapper functions which wait for a subscriber to be ready and catchup */ -extern void WaitForShardSplitRelationSubscriptionsBecomeReady( - List *shardSplitPubSubMetadataList); -extern void WaitForShardSplitRelationSubscriptionsToBeCaughtUp(XLogRecPtr sourcePosition, - List * - shardSplitPubSubMetadataList); - -extern char * ShardSplitTemplateReplicationSlotName(uint64 shardId); -extern void CloseShardSplitSubscriberConnections(List *shardSplitSubscriberMetadataList); #endif /* SHARDSPLIT_LOGICAL_REPLICATION_H */ diff --git a/src/include/distributed/shardsplit_shared_memory.h b/src/include/distributed/shardsplit_shared_memory.h index e4d6e910d..d06fdca01 100644 --- a/src/include/distributed/shardsplit_shared_memory.h +++ b/src/include/distributed/shardsplit_shared_memory.h @@ -14,6 +14,8 @@ #ifndef SHARDSPLIT_SHARED_MEMORY_H #define SHARDSPLIT_SHARED_MEMORY_H +#include "postgres.h" + /* * In-memory mapping of a split child shard. 
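The regression test changes below move the split slots to a 'citus_shard_split_slot_' prefix and the shard move test to 'citus_shard_move_slot_'. Purely to illustrate where such names could come from, here is a hedged sketch of how the ReplicationSlotName() helper declared in multi_logical_replication.h above might compose them; the prefix literals and the length check (mirroring the removed EncodeReplicationSlot) are assumptions, not the patch's actual definitions:

#include "postgres.h"
#include "lib/stringinfo.h"
#include "distributed/multi_logical_replication.h"

static char *
ReplicationSlotNameSketch(LogicalRepType type, uint32_t nodeId, Oid ownerId)
{
	StringInfo slotName = makeStringInfo();

	/* assumed prefixes, inferred from the expected test output below */
	const char *prefix = (type == SHARD_SPLIT) ?
						 "citus_shard_split_slot_" :
						 "citus_shard_move_slot_";

	appendStringInfo(slotName, "%s%u_%u", prefix, nodeId, ownerId);

	if (slotName->len > NAMEDATALEN)
	{
		ereport(ERROR, (errmsg("replication slot name %s is longer than the "
							   "maximum of %d bytes",
							   slotName->data, NAMEDATALEN)));
	}

	return slotName->data;
}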
*/ @@ -79,6 +81,4 @@ extern ShardSplitInfoSMHeader * GetShardSplitInfoSMHeader(void); extern HTAB * PopulateSourceToDestinationShardMapForSlot(char *slotName, MemoryContext cxt); - -extern char * EncodeReplicationSlot(uint32_t nodeId, uint32_t tableOwnerId); #endif /* SHARDSPLIT_SHARED_MEMORY_H */ diff --git a/src/test/regress/expected/failure_online_move_shard_placement.out b/src/test/regress/expected/failure_online_move_shard_placement.out index 231470df0..777a6e59e 100644 --- a/src/test/regress/expected/failure_online_move_shard_placement.out +++ b/src/test/regress/expected/failure_online_move_shard_placement.out @@ -154,6 +154,25 @@ SELECT citus.mitmproxy('conn.onQuery(query="^SELECT min\(latest_end_lsn").cancel (1 row) +SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); +ERROR: canceling statement due to user request +-- failure on disabling subscription (right before dropping it) +SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* DISABLE").kill()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); +ERROR: connection not open +CONTEXT: while executing command on localhost:xxxxx +-- cancellation on disabling subscription (right before dropping it) +SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* DISABLE").cancel(' || :pid || ')'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); ERROR: canceling statement due to user request -- failure on dropping subscription @@ -164,15 +183,8 @@ SELECT citus.mitmproxy('conn.onQuery(query="^DROP SUBSCRIPTION").kill()'); (1 row) SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); -WARNING: connection not open +ERROR: connection not open CONTEXT: while executing command on localhost:xxxxx -WARNING: connection not open -CONTEXT: while executing command on localhost:xxxxx -WARNING: connection not open -CONTEXT: while executing command on localhost:xxxxx -WARNING: connection not open -CONTEXT: while executing command on localhost:xxxxx -ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open -- cancellation on dropping subscription SELECT citus.mitmproxy('conn.onQuery(query="^DROP SUBSCRIPTION").cancel(' || :pid || ')'); mitmproxy @@ -209,11 +221,8 @@ SELECT citus.mitmproxy('conn.matches(b"CREATE INDEX").killall()'); (1 row) SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); -WARNING: connection not open +ERROR: connection not open CONTEXT: while executing command on localhost:xxxxx -WARNING: connection not open -CONTEXT: while executing command on localhost:xxxxx -ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open SELECT citus.mitmproxy('conn.allow()'); mitmproxy --------------------------------------------------------------------- @@ -241,11 +250,8 @@ SELECT citus.mitmproxy('conn.matches(b"CREATE INDEX").killall()'); (1 row) SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); -WARNING: connection not open +ERROR: connection not open CONTEXT: while executing command on localhost:xxxxx -WARNING: connection 
not open -CONTEXT: while executing command on localhost:xxxxx -ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open SELECT citus.mitmproxy('conn.allow()'); mitmproxy --------------------------------------------------------------------- @@ -267,11 +273,8 @@ SELECT citus.mitmproxy('conn.matches(b"CREATE INDEX").killall()'); (1 row) SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); -WARNING: connection not open +ERROR: connection not open CONTEXT: while executing command on localhost:xxxxx -WARNING: connection not open -CONTEXT: while executing command on localhost:xxxxx -ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open -- Verify that the shard is not moved and the number of rows are still 100k SELECT citus.mitmproxy('conn.allow()'); mitmproxy diff --git a/src/test/regress/expected/logical_replication.out b/src/test/regress/expected/logical_replication.out index d8e91aa1e..3218387f3 100644 --- a/src/test/regress/expected/logical_replication.out +++ b/src/test/regress/expected/logical_replication.out @@ -27,8 +27,11 @@ CREATE PUBLICATION citus_shard_move_publication_:postgres_oid FOR TABLE dist_683 \c - - - :master_port SET search_path TO logical_replication; \set connection_string '\'user=postgres host=localhost port=' :worker_1_port '\'' -CREATE SUBSCRIPTION citus_shard_move_subscription_:postgres_oid CONNECTION :connection_string PUBLICATION citus_shard_move_publication_:postgres_oid; -NOTICE: created replication slot "citus_shard_move_subscription_10" on publisher +CREATE SUBSCRIPTION citus_shard_move_subscription_:postgres_oid + CONNECTION :connection_string + PUBLICATION citus_shard_move_publication_:postgres_oid + WITH (slot_name=citus_shard_move_slot_:postgres_oid); +NOTICE: created replication slot "citus_shard_move_slot_10" on publisher SELECT count(*) from pg_subscription; count --------------------------------------------------------------------- diff --git a/src/test/regress/expected/split_shard_replication_colocated_setup.out b/src/test/regress/expected/split_shard_replication_colocated_setup.out index 29c535cd1..eead15e40 100644 --- a/src/test/regress/expected/split_shard_replication_colocated_setup.out +++ b/src/test/regress/expected/split_shard_replication_colocated_setup.out @@ -78,8 +78,8 @@ WARNING: Previous split shard worflow was not successfully and could not comple SELECT relowner AS table_owner_one FROM pg_class WHERE relname='table_first' \gset SELECT relowner AS table_owner_two FROM pg_class WHERE relname='table_second' \gset -SELECT slot_name AS slot_for_first_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_%s', :worker_2_node, :table_owner_one), 'citus') \gset -SELECT slot_name AS slot_for_second_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_%s', :worker_2_node, :table_owner_two), 'citus') \gset +SELECT slot_name AS slot_for_first_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_%s', :worker_2_node, :table_owner_one), 'citus') \gset +SELECT slot_name AS slot_for_second_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_%s', :worker_2_node, :table_owner_two), 'citus') \gset -- Create subscription at worker2 with copy_data to 'false' \c - postgres - :worker_2_port SET search_path TO split_shard_replication_setup_schema; diff --git a/src/test/regress/expected/split_shard_replication_setup.out 
b/src/test/regress/expected/split_shard_replication_setup.out index be0a46d4b..4bdb8c013 100644 --- a/src/test/regress/expected/split_shard_replication_setup.out +++ b/src/test/regress/expected/split_shard_replication_setup.out @@ -71,7 +71,7 @@ SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ 1 (1 row) -SELECT slot_name FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_10', :worker_2_node), 'citus') \gset +SELECT slot_name FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10', :worker_2_node), 'citus') \gset -- Create subscription at worker2 with copy_data to 'false' and derived replication slot name \c - - - :worker_2_port SET search_path TO split_shard_replication_setup_schema; diff --git a/src/test/regress/expected/split_shard_replication_setup_local.out b/src/test/regress/expected/split_shard_replication_setup_local.out index 77bdf8336..d14b29552 100644 --- a/src/test/regress/expected/split_shard_replication_setup_local.out +++ b/src/test/regress/expected/split_shard_replication_setup_local.out @@ -19,7 +19,7 @@ SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ 1 (1 row) -SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_10', :worker_1_node), 'citus') \gset +SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10', :worker_1_node), 'citus') \gset -- Create subscription at worker1 with copy_data to 'false' a BEGIN; CREATE SUBSCRIPTION local_subscription diff --git a/src/test/regress/expected/split_shard_replication_setup_remote_local.out b/src/test/regress/expected/split_shard_replication_setup_remote_local.out index 1f41519ca..88df2a5b7 100644 --- a/src/test/regress/expected/split_shard_replication_setup_remote_local.out +++ b/src/test/regress/expected/split_shard_replication_setup_remote_local.out @@ -18,8 +18,8 @@ WARNING: Previous split shard worflow was not successfully and could not comple 2 (1 row) -SELECT slot_name AS slot_for_worker1 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_10', :worker_1_node), 'citus') \gset -SELECT slot_name AS slot_for_worker2 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_10', :worker_2_node), 'citus') \gset +SELECT slot_name AS slot_for_worker1 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10', :worker_1_node), 'citus') \gset +SELECT slot_name AS slot_for_worker2 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10', :worker_2_node), 'citus') \gset -- Create subscription at worker1 with copy_data to 'false' and 'slot_for_worker1' CREATE SUBSCRIPTION sub_worker1 CONNECTION 'host=localhost port=xxxxx user=postgres dbname=regression' diff --git a/src/test/regress/sql/failure_online_move_shard_placement.sql b/src/test/regress/sql/failure_online_move_shard_placement.sql index 8028ccb24..eb8d68e20 100644 --- a/src/test/regress/sql/failure_online_move_shard_placement.sql +++ b/src/test/regress/sql/failure_online_move_shard_placement.sql @@ -79,6 +79,14 @@ SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost' SELECT citus.mitmproxy('conn.onQuery(query="^SELECT min\(latest_end_lsn").cancel(' || :pid || ')'); SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); +-- failure on disabling subscription (right before dropping it) +SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* 
DISABLE").kill()'); +SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); + +-- cancellation on disabling subscription (right before dropping it) +SELECT citus.mitmproxy('conn.onQuery(query="^ALTER SUBSCRIPTION .* DISABLE").cancel(' || :pid || ')'); +SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); + -- failure on dropping subscription SELECT citus.mitmproxy('conn.onQuery(query="^DROP SUBSCRIPTION").kill()'); SELECT master_move_shard_placement(101, 'localhost', :worker_1_port, 'localhost', :worker_2_proxy_port); diff --git a/src/test/regress/sql/logical_replication.sql b/src/test/regress/sql/logical_replication.sql index 103295f5c..c4f3eeb12 100644 --- a/src/test/regress/sql/logical_replication.sql +++ b/src/test/regress/sql/logical_replication.sql @@ -24,7 +24,10 @@ CREATE PUBLICATION citus_shard_move_publication_:postgres_oid FOR TABLE dist_683 \c - - - :master_port SET search_path TO logical_replication; \set connection_string '\'user=postgres host=localhost port=' :worker_1_port '\'' -CREATE SUBSCRIPTION citus_shard_move_subscription_:postgres_oid CONNECTION :connection_string PUBLICATION citus_shard_move_publication_:postgres_oid; +CREATE SUBSCRIPTION citus_shard_move_subscription_:postgres_oid + CONNECTION :connection_string + PUBLICATION citus_shard_move_publication_:postgres_oid + WITH (slot_name=citus_shard_move_slot_:postgres_oid); SELECT count(*) from pg_subscription; SELECT count(*) from pg_publication; diff --git a/src/test/regress/sql/split_shard_replication_colocated_setup.sql b/src/test/regress/sql/split_shard_replication_colocated_setup.sql index 871bf49b5..a8311bceb 100644 --- a/src/test/regress/sql/split_shard_replication_colocated_setup.sql +++ b/src/test/regress/sql/split_shard_replication_colocated_setup.sql @@ -76,9 +76,9 @@ SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ SELECT relowner AS table_owner_one FROM pg_class WHERE relname='table_first' \gset SELECT relowner AS table_owner_two FROM pg_class WHERE relname='table_second' \gset -SELECT slot_name AS slot_for_first_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_%s', :worker_2_node, :table_owner_one), 'citus') \gset +SELECT slot_name AS slot_for_first_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_%s', :worker_2_node, :table_owner_one), 'citus') \gset -SELECT slot_name AS slot_for_second_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_%s', :worker_2_node, :table_owner_two), 'citus') \gset +SELECT slot_name AS slot_for_second_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_%s', :worker_2_node, :table_owner_two), 'citus') \gset -- Create subscription at worker2 with copy_data to 'false' \c - postgres - :worker_2_port diff --git a/src/test/regress/sql/split_shard_replication_setup.sql b/src/test/regress/sql/split_shard_replication_setup.sql index 97e1d275f..20e12ee7b 100644 --- a/src/test/regress/sql/split_shard_replication_setup.sql +++ b/src/test/regress/sql/split_shard_replication_setup.sql @@ -71,7 +71,7 @@ SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info ]); -SELECT slot_name FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_10', :worker_2_node), 'citus') \gset +SELECT slot_name FROM 
pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10', :worker_2_node), 'citus') \gset -- Create subscription at worker2 with copy_data to 'false' and derived replication slot name \c - - - :worker_2_port diff --git a/src/test/regress/sql/split_shard_replication_setup_local.sql b/src/test/regress/sql/split_shard_replication_setup_local.sql index 40cbd4063..6d0091fe2 100644 --- a/src/test/regress/sql/split_shard_replication_setup_local.sql +++ b/src/test/regress/sql/split_shard_replication_setup_local.sql @@ -18,7 +18,7 @@ SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info ]); -SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_10', :worker_1_node), 'citus') \gset +SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10', :worker_1_node), 'citus') \gset -- Create subscription at worker1 with copy_data to 'false' a BEGIN; diff --git a/src/test/regress/sql/split_shard_replication_setup_remote_local.sql b/src/test/regress/sql/split_shard_replication_setup_remote_local.sql index 47e0c2aa3..21c2b0252 100644 --- a/src/test/regress/sql/split_shard_replication_setup_remote_local.sql +++ b/src/test/regress/sql/split_shard_replication_setup_remote_local.sql @@ -16,8 +16,8 @@ SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[ ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info ]); -SELECT slot_name AS slot_for_worker1 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_10', :worker_1_node), 'citus') \gset -SELECT slot_name AS slot_for_worker2 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_10', :worker_2_node), 'citus') \gset +SELECT slot_name AS slot_for_worker1 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10', :worker_1_node), 'citus') \gset +SELECT slot_name AS slot_for_worker2 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10', :worker_2_node), 'citus') \gset -- Create subscription at worker1 with copy_data to 'false' and 'slot_for_worker1' CREATE SUBSCRIPTION sub_worker1