diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index 43fc50337..8d2184f19 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -106,6 +106,11 @@ void TryDropShard(MultiConnection *connection, ShardInterval *shardInterval); char * CreateTemplateReplicationSlotAndReturnSnapshot(ShardInterval *shardInterval, WorkerNode *sourceWorkerNode); +static List * ExecuteSplitShardReplicationSetupUDF(WorkerNode *sourceWorkerNode, + List *sourceColocatedShardIntervalList, + List *shardGroupSplitIntervalListList, + List *destinationWorkerNodesList); + /* Customize error message strings based on operation type */ static const char *const SplitOperationName[] = { @@ -1147,8 +1152,8 @@ NonBlockingShardSplit(SplitOperation splitOperation, /* char *templateSnapShotName = CreateTemplateReplicationSlotAndReturnSnapshot( */ /* shardIntervalToSplit, sourceShardToCopyNode); */ - // DoSplitCopy(sourceShardToCopyNode, sourceColocatedShardIntervalList, - // shardGroupSplitIntervalListList, workersForPlacementList, NULL); + /* DoSplitCopy(sourceShardToCopyNode, sourceColocatedShardIntervalList, */ + /* shardGroupSplitIntervalListList, workersForPlacementList, NULL); */ CreateDummyShardsForShardGroup( sourceColocatedShardIntervalList, @@ -1162,27 +1167,26 @@ NonBlockingShardSplit(SplitOperation splitOperation, shardGroupSplitIntervalListList, sourceShardToCopyNode, workersForPlacementList); + /* + * Drop old shards and delete related metadata. Have to do that before + * creating the new shard metadata, because there's cross-checks + * preventing inconsistent metadata (like overlapping shards). + */ + DropShardList(sourceColocatedShardIntervalList); - // /* - // * Drop old shards and delete related metadata. Have to do that before - // * creating the new shard metadata, because there's cross-checks - // * preventing inconsistent metadata (like overlapping shards). - // */ - // DropShardList(sourceColocatedShardIntervalList); + /* Insert new shard and placement metdata */ + InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList, + workersForPlacementList); - // /* Insert new shard and placement metdata */ - // InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList, - // workersForPlacementList); + /* + * Create foreign keys if exists after the metadata changes happening in + * DropShardList() and InsertSplitChildrenShardMetadata() because the foreign + * key creation depends on the new metadata. + */ + CreateForeignKeyConstraints(shardGroupSplitIntervalListList, + workersForPlacementList); - // /* - // * Create foreign keys if exists after the metadata changes happening in - // * DropShardList() and InsertSplitChildrenShardMetadata() because the foreign - // * key creation depends on the new metadata. - // */ - // CreateForeignKeyConstraints(shardGroupSplitIntervalListList, - // workersForPlacementList); - - // DropDummyShards(); + DropDummyShards(); } PG_CATCH(); { @@ -1351,69 +1355,90 @@ SplitShardReplicationSetup(ShardInterval *shardIntervalToSplit, WorkerNode *sourceWorkerNode, List *destinationWorkerNodesList) { - StringInfo splitShardReplicationUDF = CreateSplitShardReplicationSetupUDF( - sourceColocatedShardIntervalList, - shardGroupSplitIntervalListList, - destinationWorkerNodesList); + /* / * // / *Create Template replication slot * / * / */ + /* / * char *templateSnapShotName = CreateTemplateReplicationSlotAndReturnSnapshot( * / */ + /* / * shardIntervalToSplit, sourceWorkerNode); * / */ + /* List *shardSplitPubSubMetadata = CreateShardSplitInfoMapForPublication( */ + /* sourceColocatedShardIntervalList, */ + /* shardGroupSplitIntervalListList, */ + /* destinationWorkerNodesList, */ + /* replicationSlotInfoList); */ + /* earlier the above method used to take replication slot info as information */ + + /* LogicallyReplicateSplitShards(sourceWorkerNode, shardSplitPubSubMetadata, */ + /* sourceColocatedShardIntervalList, */ + /* shardGroupSplitIntervalListList, */ + /* destinationWorkerNodesList); */ + + char *superUser = CitusExtensionOwnerName(); + char *databaseName = get_database_name(MyDatabaseId); int connectionFlags = FORCE_NEW_CONNECTION; - MultiConnection *sourceConnection = GetNodeUserDatabaseConnection(connectionFlags, - sourceWorkerNode-> - workerName, - sourceWorkerNode-> - workerPort, - CitusExtensionOwnerName(), - get_database_name( - MyDatabaseId)); - ClaimConnectionExclusively(sourceConnection); - PGresult *result = NULL; - int queryResult = ExecuteOptionalRemoteCommand(sourceConnection, - splitShardReplicationUDF->data, - &result); - if (queryResult != RESPONSE_OKAY || !IsResponseOK(result)) - { - ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("Failed to run worker_split_shard_replication_setup"))); - - PQclear(result); - ForgetResults(sourceConnection); - } - - /* Get replication slot information */ - List *replicationSlotInfoList = ParseReplicationSlotInfoFromResult(result); - - PQclear(result); - ForgetResults(sourceConnection); - - - // /* // / *Create Template replication slot * / */ - // /* char *templateSnapShotName = CreateTemplateReplicationSlotAndReturnSnapshot( */ - // /* shardIntervalToSplit, sourceWorkerNode); */ - // List *shardSplitPubSubMetadata = CreateShardSplitInfoMapForPublication( - // sourceColocatedShardIntervalList, - // shardGroupSplitIntervalListList, - // destinationWorkerNodesList, - // replicationSlotInfoList); - // earlier the above method used to take replication slot info as information - - - - // LogicallyReplicateSplitShards(sourceWorkerNode, shardSplitPubSubMetadata, - // sourceColocatedShardIntervalList, - // shardGroupSplitIntervalListList, - // destinationWorkerNodesList); - - HTAB * shardSplitHashMapForPublication = CreateShardSplitInfoMapForPublication( + HTAB *shardSplitHashMapForPublication = CreateShardSplitInfoMapForPublication( sourceColocatedShardIntervalList, shardGroupSplitIntervalListList, destinationWorkerNodesList); DropAllShardSplitLeftOvers(sourceWorkerNode, shardSplitHashMapForPublication); - CreateShardSplitPublicationsTwo(sourceConnection, shardSplitHashMapForPublication); + MultiConnection *sourceConnection = GetNodeUserDatabaseConnection(connectionFlags, + sourceWorkerNode-> + workerName, + sourceWorkerNode-> + workerPort, + superUser, + databaseName); + ClaimConnectionExclusively(sourceConnection); - //DropAllShardSplitLeftOvers(sourceWorkerNode, shardSplitHashMapForPublication); + CreateShardSplitPublications(sourceConnection, shardSplitHashMapForPublication); + + /*Create Template Replication Slot */ + + /* DoSplitCopy */ + + /*worker_split_replication_setup_udf*/ + List *replicationSlotInfoList = ExecuteSplitShardReplicationSetupUDF( + sourceWorkerNode, + sourceColocatedShardIntervalList, + shardGroupSplitIntervalListList, + destinationWorkerNodesList); + + /* Subscriber flow starts from here */ + List *shardSplitSubscribersMetadataList = PopulateShardSplitSubscriptionsMetadataList( + shardSplitHashMapForPublication, replicationSlotInfoList); + + List *targetNodeConnectionList = CreateTargetNodeConnectionsForShardSplit( + shardSplitSubscribersMetadataList, + connectionFlags, + superUser, databaseName); + + CreateShardSplitSubscriptions(targetNodeConnectionList, + shardSplitSubscribersMetadataList, + sourceWorkerNode, + superUser, + databaseName); + + WaitForShardSplitRelationSubscriptionsBecomeReady(shardSplitSubscribersMetadataList); + + XLogRecPtr sourcePosition = GetRemoteLogPosition(sourceConnection); + WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition, + shardSplitSubscribersMetadataList); + + CreateAuxiliaryStructuresForShardGroup(shardGroupSplitIntervalListList, + destinationWorkerNodesList); + + sourcePosition = GetRemoteLogPosition(sourceConnection); + WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition, + shardSplitSubscribersMetadataList); + + BlockWritesToShardList(sourceColocatedShardIntervalList); + + sourcePosition = GetRemoteLogPosition(sourceConnection); + WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition, + shardSplitSubscribersMetadataList); + + /*DropAllShardSplitLeftOvers(sourceWorkerNode, shardSplitHashMapForPublication); */ } @@ -1497,6 +1522,85 @@ TryDropShard(MultiConnection *connection, ShardInterval *shardInterval) } +char * +CreateTemplateReplicationSlotAndReturnSnapshot(ShardInterval *shardInterval, + WorkerNode *sourceWorkerNode) +{ + /*Create Template replication slot */ + int connectionFlags = FORCE_NEW_CONNECTION; + connectionFlags |= EXCLUSIVE_AND_REPLICATION; + + MultiConnection *sourceConnection = GetNodeUserDatabaseConnection(connectionFlags, + sourceWorkerNode-> + workerName, + sourceWorkerNode-> + workerPort, + CitusExtensionOwnerName(), + get_database_name( + MyDatabaseId)); + + char *snapShotName = DropExistingIfAnyAndCreateTemplateReplicationSlot(shardInterval, + sourceConnection); + + return snapShotName; +} + + +static List * +ExecuteSplitShardReplicationSetupUDF(WorkerNode *sourceWorkerNode, + List *sourceColocatedShardIntervalList, + List *shardGroupSplitIntervalListList, + List *destinationWorkerNodesList) +{ + StringInfo splitShardReplicationUDF = CreateSplitShardReplicationSetupUDF( + sourceColocatedShardIntervalList, + shardGroupSplitIntervalListList, + destinationWorkerNodesList); + + int connectionFlags = FORCE_NEW_CONNECTION; + MultiConnection *sourceConnection = GetNodeUserDatabaseConnection(connectionFlags, + sourceWorkerNode-> + workerName, + sourceWorkerNode-> + workerPort, + CitusExtensionOwnerName(), + get_database_name( + MyDatabaseId)); + ClaimConnectionExclusively(sourceConnection); + + PGresult *result = NULL; + int queryResult = ExecuteOptionalRemoteCommand(sourceConnection, + splitShardReplicationUDF->data, + &result); + + /* + * Result should contain atleast one tuple. The information returned is + * set of tuples where each tuple is formatted as: + * . + */ + if (queryResult != RESPONSE_OKAY || !IsResponseOK(result) || PQntuples(result) < 1 || + PQnfields(result) != 3) + { + ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg( + "Failed to run worker_split_shard_replication_setup UDF. It should successfully execute " + " for splitting a shard in a non-blocking way. Please retry."))); + + PQclear(result); + ForgetResults(sourceConnection); + } + + /* Get replication slot information */ + List *replicationSlotInfoList = ParseReplicationSlotInfoFromResult(result); + + PQclear(result); + ForgetResults(sourceConnection); + UnclaimConnection(sourceConnection); + + return replicationSlotInfoList; +} + + StringInfo CreateSplitShardReplicationSetupUDF(List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, @@ -1549,27 +1653,3 @@ CreateSplitShardReplicationSetupUDF(List *sourceColocatedShardIntervalList, return splitShardReplicationUDF; } - - -char * -CreateTemplateReplicationSlotAndReturnSnapshot(ShardInterval *shardInterval, - WorkerNode *sourceWorkerNode) -{ - /*Create Template replication slot */ - int connectionFlags = FORCE_NEW_CONNECTION; - connectionFlags |= EXCLUSIVE_AND_REPLICATION; - - MultiConnection *sourceConnection = GetNodeUserDatabaseConnection(connectionFlags, - sourceWorkerNode-> - workerName, - sourceWorkerNode-> - workerPort, - CitusExtensionOwnerName(), - get_database_name( - MyDatabaseId)); - - char *snapShotName = DropExistingIfAnyAndCreateTemplateReplicationSlot(shardInterval, - sourceConnection); - - return snapShotName; -} diff --git a/src/backend/distributed/replication/multi_logical_replication.c b/src/backend/distributed/replication/multi_logical_replication.c index 5b89f900e..b0f931967 100644 --- a/src/backend/distributed/replication/multi_logical_replication.c +++ b/src/backend/distributed/replication/multi_logical_replication.c @@ -1166,7 +1166,7 @@ ShardSubscriptionName(Oid ownerId, char *operationPrefix) * subscription that subscribes to the tables of the given owner. */ static char * -ShardSubscriptionRole(Oid ownerId, char * operationPrefix) +ShardSubscriptionRole(Oid ownerId, char *operationPrefix) { return psprintf("%s%i", operationPrefix, ownerId); } @@ -1322,8 +1322,9 @@ DropShardMoveSubscriptions(MultiConnection *connection, Bitmapset *tableOwnerIds while ((ownerId = bms_next_member(tableOwnerIds, ownerId)) >= 0) { DropShardSubscription(connection, ShardSubscriptionName(ownerId, - SHARD_MOVE_SUBSCRIPTION_PREFIX)); - DropShardUser(connection, ShardSubscriptionRole(ownerId, SHARD_MOVE_SUBSCRIPTION_ROLE_PREFIX)); + SHARD_MOVE_SUBSCRIPTION_PREFIX)); + DropShardUser(connection, ShardSubscriptionRole(ownerId, + SHARD_MOVE_SUBSCRIPTION_ROLE_PREFIX)); } } @@ -1508,7 +1509,8 @@ CreateShardMoveSubscriptions(MultiConnection *connection, char *sourceNodeName, "ALTER SUBSCRIPTION %s OWNER TO %s", ShardSubscriptionName(ownerId, SHARD_MOVE_SUBSCRIPTION_PREFIX), - ShardSubscriptionRole(ownerId, SHARD_MOVE_SUBSCRIPTION_ROLE_PREFIX) + ShardSubscriptionRole(ownerId, + SHARD_MOVE_SUBSCRIPTION_ROLE_PREFIX) )); /* @@ -2118,7 +2120,8 @@ CreateShardSubscription(MultiConnection *connection, char *sourceNodeName, "ALTER SUBSCRIPTION %s OWNER TO %s", ShardSubscriptionName(ownerId, SHARD_SPLIT_SUBSCRIPTION_PREFIX), - ShardSubscriptionRole(ownerId, SHARD_SPLIT_SUBSCRIPTION_ROLE_PREFIX) + ShardSubscriptionRole(ownerId, + SHARD_SPLIT_SUBSCRIPTION_ROLE_PREFIX) )); /* diff --git a/src/backend/distributed/shardsplit/shardsplit_logical_replication.c b/src/backend/distributed/shardsplit/shardsplit_logical_replication.c index 60c9add64..a0e886af1 100644 --- a/src/backend/distributed/shardsplit/shardsplit_logical_replication.c +++ b/src/backend/distributed/shardsplit/shardsplit_logical_replication.c @@ -18,7 +18,6 @@ #include "distributed/shard_split.h" #include "distributed/listutils.h" #include "distributed/shardsplit_logical_replication.h" -#include "distributed/multi_logical_replication.h" #include "distributed/resource_lock.h" #include "utils/builtins.h" #include "commands/dbcommands.h" @@ -29,41 +28,22 @@ static HTAB *ShardInfoHashMapForPublications = NULL; static void AddPublishableShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInterval, bool isChildShardInterval); -ShardSplitPubSubMetadata * CreateShardSplitPubSubMetadata(Oid tableOwnerId, uint32 nodeId, - List *shardIdList, - List *replicationSlotInfoList); - +ShardSplitSubscriberMetadata * CreateShardSplitSubscriberMetadata(Oid tableOwnerId, uint32 + nodeId, + List * + replicationSlotInfoList); static void CreateShardSplitPublicationForNode(MultiConnection *connection, List *shardList, uint32_t publicationForTargetNodeId, Oid tableOwner); -static void CreateShardSplitPublications(MultiConnection *sourceConnection, - List *shardSplitPubSubMetadataList); - -static void CreateShardSplitSubscriptions(List *targetNodeConnectionList, - List *shardSplitPubSubMetadataList, - WorkerNode *sourceWorkerNode, char *superUser, - char *databaseName); -static void WaitForShardSplitRelationSubscriptionsBecomeReady( - List *shardSplitPubSubMetadataList); - -static void WaitForShardSplitRelationSubscriptionsToBeCaughtUp(XLogRecPtr sourcePosition, - List * - shardSplitPubSubMetadataList); static char * ShardSplitPublicationName(uint32_t nodeId, Oid ownerId); -List * CreateTargetNodeConnectionsForShardSplit(List *shardSplitPubSubMetadataList, int - connectionFlags, char *user, - char *databaseName); -static void DropAllShardSplitSubscriptions(MultiConnection * cleanupConnection); -static void DropAllShardSplitPublications(MultiConnection * cleanupConnection); -static void DropAllShardSplitUsers(MultiConnection * cleanupConnection); -static void DropAllReplicationSlots(List * replicationSlotInfo); - -/*used for debuggin. Remove later*/ -void PrintShardSplitPubSubMetadata(ShardSplitPubSubMetadata *shardSplitMetadata); +static void DropAllShardSplitSubscriptions(MultiConnection *cleanupConnection); +static void DropAllShardSplitPublications(MultiConnection *cleanupConnection); +static void DropAllShardSplitUsers(MultiConnection *cleanupConnection); +static void DropAllReplicationSlots(List *replicationSlotInfo); List * @@ -72,8 +52,6 @@ ParseReplicationSlotInfoFromResult(PGresult *result) int64 rowCount = PQntuples(result); int64 colCount = PQnfields(result); - printf("sameer row count %d col count: %d\n ", rowCount, colCount); - List *replicationSlotInfoList = NIL; for (int64 rowIndex = 0; rowIndex < rowCount; rowIndex++) { @@ -91,17 +69,14 @@ ParseReplicationSlotInfoFromResult(PGresult *result) replicationSlotInfoList = lappend(replicationSlotInfoList, replicationSlotInfo); } - /*TODO(saawasek): size of this should not be NULL - * Also check for warning - */ return replicationSlotInfoList; } HTAB * CreateShardSplitInfoMapForPublication(List *sourceColocatedShardIntervalList, - List *shardGroupSplitIntervalListList, - List *destinationWorkerNodesList) + List *shardGroupSplitIntervalListList, + List *destinationWorkerNodesList) { ShardInfoHashMapForPublications = SetupHashMapForShardInfo(); ShardInterval *sourceShardIntervalToCopy = NULL; @@ -174,14 +149,13 @@ AddPublishableShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInterval, } -ShardSplitPubSubMetadata * -CreateShardSplitPubSubMetadata(Oid tableOwnerId, uint32 nodeId, List *shardIntervalList, - List *replicationSlotInfoList) +ShardSplitSubscriberMetadata * +CreateShardSplitSubscriberMetadata(Oid tableOwnerId, uint32 nodeId, + List *replicationSlotInfoList) { - ShardSplitPubSubMetadata *shardSplitPubSubMetadata = palloc0( - sizeof(ShardSplitPubSubMetadata)); - shardSplitPubSubMetadata->shardIntervalListForSubscription = shardIntervalList; - shardSplitPubSubMetadata->tableOwnerId = tableOwnerId; + ShardSplitSubscriberMetadata *shardSplitSubscriberMetadata = palloc0( + sizeof(ShardSplitSubscriberMetadata)); + shardSplitSubscriberMetadata->tableOwnerId = tableOwnerId; char *tableOwnerName = GetUserNameFromId(tableOwnerId, false); ReplicationSlotInfo *replicationSlotInfo = NULL; @@ -190,13 +164,12 @@ CreateShardSplitPubSubMetadata(Oid tableOwnerId, uint32 nodeId, List *shardInter if (nodeId == replicationSlotInfo->targetNodeId && strcmp(tableOwnerName, replicationSlotInfo->tableOwnerName) == 0) { - shardSplitPubSubMetadata->slotInfo = replicationSlotInfo; + shardSplitSubscriberMetadata->slotInfo = replicationSlotInfo; break; } } - PrintShardSplitPubSubMetadata(shardSplitPubSubMetadata); - return shardSplitPubSubMetadata; + return shardSplitSubscriberMetadata; } @@ -225,7 +198,7 @@ LogicallyReplicateSplitShards(WorkerNode *sourceWorkerNode, superUser, databaseName); /* create publications */ - CreateShardSplitPublications(sourceConnection, shardSplitPubSubMetadataList); + /*CreateShardSplitPublications(sourceConnection, shardSplitPubSubMetadataList); */ CreateShardSplitSubscriptions(targetNodeConnectionList, shardSplitPubSubMetadataList, @@ -257,46 +230,38 @@ LogicallyReplicateSplitShards(WorkerNode *sourceWorkerNode, void -PrintShardSplitPubSubMetadata(ShardSplitPubSubMetadata *shardSplitMetadata) +PrintShardSplitPubSubMetadata(ShardSplitSubscriberMetadata *shardSplitMetadata) { - printf("sameer: ShardSplitPubSbuMetadata\n"); + printf("\nsameer: ShardSplitPubSbuMetadata"); ReplicationSlotInfo *replicationInfo = shardSplitMetadata->slotInfo; - - List *shardIntervalList = shardSplitMetadata->shardIntervalListForSubscription; - printf("shardIds: "); - ShardInterval *shardInterval = NULL; - foreach_ptr(shardInterval, shardIntervalList) - { - printf("%ld ", shardInterval->shardId); - } - - printf("\nManual Username from OID at source: %s \n", GetUserNameFromId( + printf("Manual Username from OID at source: %s \n", GetUserNameFromId( shardSplitMetadata->tableOwnerId, false)); printf("slotname:%s targetNode:%u tableOwner:%s \n", replicationInfo->slotName, replicationInfo->targetNodeId, replicationInfo->tableOwnerName); + printf("\n"); } -static void -CreateShardSplitPublications(MultiConnection *sourceConnection, - List *shardSplitPubSubMetadataList) -{ - ShardSplitPubSubMetadata *shardSplitPubSubMetadata = NULL; - foreach_ptr(shardSplitPubSubMetadata, shardSplitPubSubMetadataList) - { - uint32 publicationForNodeId = shardSplitPubSubMetadata->slotInfo->targetNodeId; - Oid tableOwnerId = shardSplitPubSubMetadata->tableOwnerId; +/* static void */ +/* CreateShardSplitPublications(MultiConnection *sourceConnection, */ +/* List *shardSplitPubSubMetadataList) */ +/* { */ +/* ShardSplitSubscriberMetadata *shardSplitPubSubMetadata = NULL; */ +/* foreach_ptr(shardSplitPubSubMetadata, shardSplitPubSubMetadataList) */ +/* { */ +/* uint32 publicationForNodeId = shardSplitPubSubMetadata->slotInfo->targetNodeId; */ +/* Oid tableOwnerId = shardSplitPubSubMetadata->tableOwnerId; */ - CreateShardSplitPublicationForNode(sourceConnection, - shardSplitPubSubMetadata-> - shardIntervalListForSubscription, - publicationForNodeId, - tableOwnerId); - } -} +/* CreateShardSplitPublicationForNode(sourceConnection, */ +/* shardSplitPubSubMetadata-> */ +/* shardIntervalListForSubscription, */ +/* publicationForNodeId, */ +/* tableOwnerId); */ +/* } */ +/* } */ -static void +void CreateShardSplitSubscriptions(List *targetNodeConnectionList, List *shardSplitPubSubMetadataList, WorkerNode *sourceWorkerNode, @@ -304,7 +269,7 @@ CreateShardSplitSubscriptions(List *targetNodeConnectionList, char *databaseName) { MultiConnection *targetConnection = NULL; - ShardSplitPubSubMetadata *shardSplitPubSubMetadata = NULL; + ShardSplitSubscriberMetadata *shardSplitPubSubMetadata = NULL; forboth_ptr(targetConnection, targetNodeConnectionList, shardSplitPubSubMetadata, shardSplitPubSubMetadataList) { @@ -358,10 +323,10 @@ ShardSplitPublicationName(uint32_t nodeId, Oid ownerId) } -static void +void WaitForShardSplitRelationSubscriptionsBecomeReady(List *shardSplitPubSubMetadataList) { - ShardSplitPubSubMetadata *shardSplitPubSubMetadata = NULL; + ShardSplitSubscriberMetadata *shardSplitPubSubMetadata = NULL; foreach_ptr(shardSplitPubSubMetadata, shardSplitPubSubMetadataList) { Bitmapset *tableOwnerIds = NULL; @@ -374,11 +339,11 @@ WaitForShardSplitRelationSubscriptionsBecomeReady(List *shardSplitPubSubMetadata } -static void +void WaitForShardSplitRelationSubscriptionsToBeCaughtUp(XLogRecPtr sourcePosition, List *shardSplitPubSubMetadataList) { - ShardSplitPubSubMetadata *shardSplitPubSubMetadata = NULL; + ShardSplitSubscriberMetadata *shardSplitPubSubMetadata = NULL; foreach_ptr(shardSplitPubSubMetadata, shardSplitPubSubMetadataList) { Bitmapset *tableOwnerIds = NULL; @@ -394,14 +359,15 @@ WaitForShardSplitRelationSubscriptionsToBeCaughtUp(XLogRecPtr sourcePosition, List * -CreateTargetNodeConnectionsForShardSplit(List *shardSplitPubSubMetadataList, int +CreateTargetNodeConnectionsForShardSplit(List *shardSplitSubscribersMetadataList, int connectionFlags, char *user, char *databaseName) { List *targetNodeConnectionList = NIL; - ShardSplitPubSubMetadata *shardSplitPubSubMetadata = NULL; - foreach_ptr(shardSplitPubSubMetadata, shardSplitPubSubMetadataList) + ShardSplitSubscriberMetadata *shardSplitSubscriberMetadata = NULL; + foreach_ptr(shardSplitSubscriberMetadata, shardSplitSubscribersMetadataList) { - uint32 targetWorkerNodeId = shardSplitPubSubMetadata->slotInfo->targetNodeId; + /*TODO(saawasek):For slot equals not null */ + uint32 targetWorkerNodeId = shardSplitSubscriberMetadata->slotInfo->targetNodeId; WorkerNode *targetWorkerNode = FindNodeWithNodeId(targetWorkerNodeId, false); MultiConnection *targetConnection = @@ -412,7 +378,8 @@ CreateTargetNodeConnectionsForShardSplit(List *shardSplitPubSubMetadataList, int ClaimConnectionExclusively(targetConnection); targetNodeConnectionList = lappend(targetNodeConnectionList, targetConnection); - shardSplitPubSubMetadata->targetNodeConnection = targetConnection; + + shardSplitSubscriberMetadata->targetNodeConnection = targetConnection; } return targetNodeConnectionList; @@ -476,7 +443,8 @@ DropExistingIfAnyAndCreateTemplateReplicationSlot(ShardInterval *shardIntervalTo } -void DropAllShardSplitLeftOvers(WorkerNode* sourceNode, HTAB * shardSplitHashMapForPubSub) +void +DropAllShardSplitLeftOvers(WorkerNode *sourceNode, HTAB *shardSplitHashMapForPubSub) { char *superUser = CitusExtensionOwnerName(); char *databaseName = get_database_name(MyDatabaseId); @@ -493,7 +461,8 @@ void DropAllShardSplitLeftOvers(WorkerNode* sourceNode, HTAB * shardSplitHashMap hash_seq_init(&statusForSubscription, shardSplitHashMapForPubSub); NodeShardMappingEntry *entry = NULL; - while ((entry = (NodeShardMappingEntry *) hash_seq_search(&statusForSubscription)) != NULL) + while ((entry = (NodeShardMappingEntry *) hash_seq_search(&statusForSubscription)) != + NULL) { uint32_t nodeId = entry->key.nodeId; WorkerNode *workerNode = FindNodeWithNodeId(nodeId, false /*missingOk*/); @@ -509,15 +478,17 @@ void DropAllShardSplitLeftOvers(WorkerNode* sourceNode, HTAB * shardSplitHashMap /*Drop all shard split publications at the source*/ MultiConnection *sourceNodeConnection = GetNodeUserDatabaseConnection( - connectionFlags, sourceNode->workerName, sourceNode->workerPort, - superUser, databaseName); + connectionFlags, sourceNode->workerName, sourceNode->workerPort, + superUser, databaseName); DropAllShardSplitPublications(sourceNodeConnection); - + CloseConnection(sourceNodeConnection); } -void DropAllShardSplitSubscriptions(MultiConnection * cleanupConnection) + +void +DropAllShardSplitSubscriptions(MultiConnection *cleanupConnection) { char *query = psprintf( "SELECT subname FROM pg_subscription " @@ -531,6 +502,7 @@ void DropAllShardSplitSubscriptions(MultiConnection * cleanupConnection) } } + static void DropAllShardSplitPublications(MultiConnection *connection) { @@ -546,10 +518,11 @@ DropAllShardSplitPublications(MultiConnection *connection) } } + void DropAllShardSplitUsers(MultiConnection *connection) { - char *query = psprintf( + char *query = psprintf( "SELECT rolname FROM pg_roles " "WHERE rolname LIKE %s || '%%'", quote_literal_cstr(SHARD_SPLIT_SUBSCRIPTION_ROLE_PREFIX)); @@ -561,9 +534,10 @@ DropAllShardSplitUsers(MultiConnection *connection) } } -void CreateShardSplitPublicationsTwo(MultiConnection *sourceConnection, - HTAB * shardInfoHashMapForPublication) +void +CreateShardSplitPublications(MultiConnection *sourceConnection, + HTAB *shardInfoHashMapForPublication) { HASH_SEQ_STATUS status; hash_seq_init(&status, shardInfoHashMapForPublication); @@ -573,11 +547,38 @@ void CreateShardSplitPublicationsTwo(MultiConnection *sourceConnection, { uint32 nodeId = entry->key.nodeId; uint32 tableOwnerId = entry->key.tableOwnerId; - List * shardListForPublication = entry->shardSplitInfoList; + List *shardListForPublication = entry->shardSplitInfoList; CreateShardSplitPublicationForNode(sourceConnection, - shardListForPublication, - nodeId, - tableOwnerId); + shardListForPublication, + nodeId, + tableOwnerId); } -} \ No newline at end of file +} + + +List * +PopulateShardSplitSubscriptionsMetadataList(HTAB *shardSplitInfoHashMap, + List *replicationSlotInfoList) +{ + HASH_SEQ_STATUS status; + hash_seq_init(&status, shardSplitInfoHashMap); + + NodeShardMappingEntry *entry = NULL; + List *shardSplitSubscriptionMetadataList = NIL; + while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL) + { + uint32 nodeId = entry->key.nodeId; + uint32 tableOwnerId = entry->key.tableOwnerId; + ShardSplitSubscriberMetadata *shardSplitSubscriberMetadata = + CreateShardSplitSubscriberMetadata(tableOwnerId, nodeId, + replicationSlotInfoList); + + shardSplitSubscriptionMetadataList = lappend(shardSplitSubscriptionMetadataList, + shardSplitSubscriberMetadata); + + /*replicationSlotInfoList = lappend(replicationSlotInfoList, replicationSlotInfo); */ + } + + return shardSplitSubscriptionMetadataList; +} diff --git a/src/include/distributed/multi_logical_replication.h b/src/include/distributed/multi_logical_replication.h index 5806c0836..5f98102b1 100644 --- a/src/include/distributed/multi_logical_replication.h +++ b/src/include/distributed/multi_logical_replication.h @@ -27,11 +27,10 @@ extern void LogicallyReplicateShards(List *shardList, char *sourceNodeName, int targetNodePort); extern XLogRecPtr GetRemoteLogPosition(MultiConnection *connection); -extern List * -GetQueryResultStringList(MultiConnection *connection, char *query); +extern List * GetQueryResultStringList(MultiConnection *connection, char *query); extern void DropShardSubscription(MultiConnection *connection, - char *subscriptionName); + char *subscriptionName); extern void DropShardPublication(MultiConnection *connection, char *publicationName); extern void DropShardUser(MultiConnection *connection, char *username); diff --git a/src/include/distributed/shardsplit_logical_replication.h b/src/include/distributed/shardsplit_logical_replication.h index 775db1623..38e125bd0 100644 --- a/src/include/distributed/shardsplit_logical_replication.h +++ b/src/include/distributed/shardsplit_logical_replication.h @@ -10,6 +10,8 @@ #ifndef SHARDSPLIT_LOGICAL_REPLICATION_H #define SHARDSPLIT_LOGICAL_REPLICATION_H +#include "distributed/multi_logical_replication.h" + typedef struct ReplicationSlotInfo { uint32 targetNodeId; @@ -17,19 +19,17 @@ typedef struct ReplicationSlotInfo char *slotName; } ReplicationSlotInfo; -typedef struct ShardSplitPubSubMetadata +typedef struct ShardSplitSubscriberMetadata { - List *shardIntervalListForSubscription; Oid tableOwnerId; ReplicationSlotInfo *slotInfo; /* - * Exclusively claimed connection for subscription. - * The target node of subscription + * Exclusively claimed connection for subscription.The target node of subscription * is pointed by ReplicationSlotInfo. */ MultiConnection *targetNodeConnection; -} ShardSplitPubSubMetadata; +} ShardSplitSubscriberMetadata; /* key for NodeShardMappingEntry */ typedef struct NodeShardMappingKey @@ -49,26 +49,48 @@ extern uint32 NodeShardMappingHash(const void *key, Size keysize); extern int NodeShardMappingHashCompare(const void *left, const void *right, Size keysize); HTAB * SetupHashMapForShardInfo(void); -List * ParseReplicationSlotInfoFromResult(PGresult *result); +extern List * ParseReplicationSlotInfoFromResult(PGresult *result); -extern HTAB * CreateShardSplitInfoMapForPublication(List *sourceColocatedShardIntervalList, - List *shardGroupSplitIntervalListList, - List *destinationWorkerNodesList); +extern HTAB * CreateShardSplitInfoMapForPublication( + List *sourceColocatedShardIntervalList, + List *shardGroupSplitIntervalListList, + List *destinationWorkerNodesList); extern void LogicallyReplicateSplitShards(WorkerNode *sourceWorkerNode, List *shardSplitPubSubMetadataList, List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, List *destinationWorkerNodesList); +extern void CreateShardSplitPublications(MultiConnection *sourceConnection, + HTAB *shardInfoHashMapForPublication); +extern void DropAllShardSplitLeftOvers(WorkerNode *sourceNode, + HTAB *shardSplitMapOfPublications); -extern void CreateShardSplitPublicationsTwo(MultiConnection *sourceConnection, - HTAB * shardInfoHashMapForPublication); - -extern void DropAllShardSplitLeftOvers(WorkerNode* sourceNode, HTAB * shardSplitMapOfPublications); +extern List * PopulateShardSplitSubscriptionsMetadataList(HTAB *shardSplitInfoHashMap, + List *replicationSlotInfoList); extern char * DropExistingIfAnyAndCreateTemplateReplicationSlot( ShardInterval *shardIntervalToSplit, MultiConnection * sourceConnection); + +extern void CreateShardSplitSubscriptions(List *targetNodeConnectionList, + List *shardSplitPubSubMetadataList, + WorkerNode *sourceWorkerNode, char *superUser, + char *databaseName); +extern void WaitForShardSplitRelationSubscriptionsBecomeReady( + List *shardSplitPubSubMetadataList); +extern void WaitForShardSplitRelationSubscriptionsToBeCaughtUp(XLogRecPtr sourcePosition, + List * + shardSplitPubSubMetadataList); + +List * CreateTargetNodeConnectionsForShardSplit(List *shardSplitSubscribersMetadataList, + int + connectionFlags, char *user, + char *databaseName); + +/*used for debuggin. Remove later*/ +extern void PrintShardSplitPubSubMetadata( + ShardSplitSubscriberMetadata *shardSplitMetadata); #endif /* SHARDSPLIT_LOGICAL_REPLICATION_H */ diff --git a/src/test/regress/expected/citus_sameer.out b/src/test/regress/expected/citus_sameer.out index 7ce89ef7c..2662917b9 100644 --- a/src/test/regress/expected/citus_sameer.out +++ b/src/test/regress/expected/citus_sameer.out @@ -100,7 +100,7 @@ SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \ SELECT citus_split_shard_by_split_points( 1, ARRAY['0'], - ARRAY[:worker_1_node, :worker_2_node], + ARRAY[:worker_2_node, :worker_2_node], 'force_logical'); NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx @@ -128,28 +128,111 @@ NOTICE: issuing COMMIT DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT worker_apply_shard_ddl_command (100, 'CREATE TABLE citus_split_shard_by_split_points_negative.table_to_split (id bigint NOT NULL, value character(1)) ') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT worker_apply_shard_ddl_command (100, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_to_split OWNER TO postgres') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing COMMIT +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SELECT worker_apply_shard_ddl_command (101, 'CREATE TABLE citus_split_shard_by_split_points_negative.table_to_split (id bigint NOT NULL, value character(1)) ') DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SELECT worker_apply_shard_ddl_command (101, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_to_split OWNER TO postgres') DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing COMMIT DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT * FROM worker_split_shard_replication_setup(ARRAY[ROW(1, 100, '-2147483648', '0', 16)::citus.split_shard_info,ROW(1, 101, '1', '2147483647', 18)::citus.split_shard_info]); -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT subname FROM pg_subscription WHERE subname LIKE 'citus_shard_split_subscription_' || '%' -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT rolname FROM pg_roles WHERE rolname LIKE 'citus_shard_split_subscription_role_' || '%' -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SELECT subname FROM pg_subscription WHERE subname LIKE 'citus_shard_split_subscription_' || '%' DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SELECT rolname FROM pg_roles WHERE rolname LIKE 'citus_shard_split_subscription_role_' || '%' DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SELECT pubname FROM pg_publication WHERE pubname LIKE 'citus_shard_split_publication_' || '%' DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE PUBLICATION citus_shard_split_publication_16_10 FOR TABLE citus_split_shard_by_split_points_negative.table_to_split_100,citus_split_shard_by_split_points_negative.table_to_split_1 +NOTICE: issuing CREATE PUBLICATION citus_shard_split_publication_18_10 FOR TABLE citus_split_shard_by_split_points_negative.table_to_split_100,citus_split_shard_by_split_points_negative.table_to_split_1,citus_split_shard_by_split_points_negative.table_to_split_101 DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE PUBLICATION citus_shard_split_publication_18_10 FOR TABLE citus_split_shard_by_split_points_negative.table_to_split_101,citus_split_shard_by_split_points_negative.table_to_split_1 +NOTICE: issuing SELECT * FROM worker_split_shard_replication_setup(ARRAY[ROW(1, 100, '-2147483648', '0', 18)::citus.split_shard_info,ROW(1, 101, '1', '2147483647', 18)::citus.split_shard_info]); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SET LOCAL citus.enable_ddl_propagation TO OFF; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing CREATE USER citus_shard_split_subscription_role_10 SUPERUSER IN ROLE postgres +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing COMMIT +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_10 CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_18_10 WITH (citus_use_authinfo=true, enabled=false) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing ALTER SUBSCRIPTION citus_shard_split_subscription_10 OWNER TO citus_shard_split_subscription_role_10 +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SET LOCAL citus.enable_ddl_propagation TO OFF; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing ALTER ROLE citus_shard_split_subscription_role_10 NOSUPERUSER +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing COMMIT +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing ALTER SUBSCRIPTION citus_shard_split_subscription_10 ENABLE +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT count(*) FROM pg_subscription_rel, pg_stat_subscription WHERE srsubid = subid AND srsubstate != 'r' AND subname IN ('citus_shard_split_subscription_10') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT sum(pg_total_relation_size(srrelid)) FROM pg_subscription_rel, pg_stat_subscription WHERE srsubid = subid AND subname IN ('citus_shard_split_subscription_10') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT count(*) FROM pg_subscription_rel, pg_stat_subscription WHERE srsubid = subid AND srsubstate != 'r' AND subname IN ('citus_shard_split_subscription_10') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT pg_current_wal_lsn() +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT min(latest_end_lsn) FROM pg_stat_subscription WHERE subname IN ('citus_shard_split_subscription_10') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT worker_apply_shard_ddl_command (100, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_to_split ADD CONSTRAINT table_to_split_pkey PRIMARY KEY (id)') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT worker_apply_shard_ddl_command (101, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_to_split ADD CONSTRAINT table_to_split_pkey PRIMARY KEY (id)') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT pg_current_wal_lsn() +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT min(latest_end_lsn) FROM pg_stat_subscription WHERE subname IN ('citus_shard_split_subscription_10') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT lock_shard_metadata(7, ARRAY[1]) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT lock_shard_metadata(7, ARRAY[1]) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT pg_current_wal_lsn() +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT min(latest_end_lsn) FROM pg_stat_subscription WHERE subname IN ('citus_shard_split_subscription_10') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT citus_internal_delete_shard_metadata(1); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT citus_internal_delete_shard_metadata(1); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing DROP TABLE IF EXISTS citus_split_shard_by_split_points_negative.table_to_split_1 CASCADE +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('citus_split_shard_by_split_points_negative.table_to_split'::regclass, 100, 't'::"char", '-2147483648', '0'), ('citus_split_shard_by_split_points_negative.table_to_split'::regclass, 101, 't'::"char", '1', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('citus_split_shard_by_split_points_negative.table_to_split'::regclass, 100, 't'::"char", '-2147483648', '0'), ('citus_split_shard_by_split_points_negative.table_to_split'::regclass, 101, 't'::"char", '1', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (100, 1, 0, 16, 84), (101, 1, 0, 16, 85)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (100, 1, 0, 16, 84), (101, 1, 0, 16, 85)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing DROP TABLE IF EXISTS citus_split_shard_by_split_points_negative.table_to_split_100 CASCADE +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing DROP TABLE IF EXISTS citus_split_shard_by_split_points_negative.table_to_split_101 CASCADE +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing DROP TABLE IF EXISTS citus_split_shard_by_split_points_negative.table_to_split_1 CASCADE +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing PREPARE TRANSACTION 'citus_xx_xx_xx_xx' +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing PREPARE TRANSACTION 'citus_xx_xx_xx_xx' +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing COMMIT PREPARED 'citus_xx_xx_xx_xx' +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing COMMIT PREPARED 'citus_xx_xx_xx_xx' +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +WARNING: connection claimed exclusively at transaction commit WARNING: connection claimed exclusively at transaction commit citus_split_shard_by_split_points --------------------------------------------------------------------- @@ -164,25 +247,38 @@ SELECT * FROM show_catalog; Schema | Name | Owner --------------------------------------------------------------------- citus_split_shard_by_split_points_negative | table_to_split | postgres - citus_split_shard_by_split_points_negative | table_to_split_1 | postgres + citus_split_shard_by_split_points_negative | table_to_split_100 | postgres citus_split_shard_by_split_points_negative | table_to_split_101 | postgres (3 rows) +SELECT * FROM pg_subscription; + oid | subdbid | subname | subowner | subenabled | subbinary | substream | subconninfo | subslotname | subsynccommit | subpublications +--------------------------------------------------------------------- + 17324 | 16384 | citus_shard_split_subscription_10 | 17323 | t | f | f | host='localhost' port=xxxxx user='postgres' dbname='regression' connect_timeout=20 sslmode=prefer | citus_shard_split_subscription_10 | off | {citus_shard_split_publication_18_10} +(1 row) + \c - - - :worker_1_port SET search_path TO citus_split_shard_by_split_points_negative; SELECT * FROM show_catalog; - Schema | Name | Owner + Schema | Name | Owner --------------------------------------------------------------------- - citus_split_shard_by_split_points_negative | table_to_split | postgres - citus_split_shard_by_split_points_negative | table_to_split_1 | postgres - citus_split_shard_by_split_points_negative | table_to_split_100 | postgres - citus_split_shard_by_split_points_negative | table_to_split_101 | postgres -(4 rows) + citus_split_shard_by_split_points_negative | table_to_split | postgres +(1 row) SELECT * FROM pg_publication; oid | pubname | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate | pubviaroot --------------------------------------------------------------------- - 17381 | citus_shard_split_publication_16_10 | 10 | f | t | t | t | t | f - 17384 | citus_shard_split_publication_18_10 | 10 | f | t | t | t | t | f -(2 rows) + 17381 | citus_shard_split_publication_18_10 | 10 | f | t | t | t | t | f +(1 row) + +SELECT * FROM pg_subscription; + oid | subdbid | subname | subowner | subenabled | subbinary | substream | subconninfo | subslotname | subsynccommit | subpublications +--------------------------------------------------------------------- +(0 rows) + +SELECT slot_name FROM pg_replication_slots; + slot_name +--------------------------------------------------------------------- + citus_shard_split_subscription_10 +(1 row) diff --git a/src/test/regress/sql/citus_sameer.sql b/src/test/regress/sql/citus_sameer.sql index 68d0853f9..aa8e14d53 100644 --- a/src/test/regress/sql/citus_sameer.sql +++ b/src/test/regress/sql/citus_sameer.sql @@ -48,7 +48,7 @@ SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \ SELECT citus_split_shard_by_split_points( 1, ARRAY['0'], - ARRAY[:worker_1_node, :worker_2_node], + ARRAY[:worker_2_node, :worker_2_node], 'force_logical'); -- On worker2, we want child shard 2 and dummy shard 1 -- -- on worker1, we want child shard 3 and 1 and dummy shard 2 -- @@ -56,8 +56,11 @@ SELECT citus_split_shard_by_split_points( \c - - - :worker_2_port SET search_path TO citus_split_shard_by_split_points_negative; SELECT * FROM show_catalog; +SELECT * FROM pg_subscription; \c - - - :worker_1_port SET search_path TO citus_split_shard_by_split_points_negative; SELECT * FROM show_catalog; SELECT * FROM pg_publication; +SELECT * FROM pg_subscription; +SELECT slot_name FROM pg_replication_slots;