From 2fa1dac051bd4374250e8be0aede98c9110a499f Mon Sep 17 00:00:00 2001 From: Sameer Awasekar Date: Thu, 14 Jul 2022 16:23:00 +0530 Subject: [PATCH] Add: 1) Drop colocated shards 2) Swap Metadata 3) Drop dummy shards --- .../citus_split_shard_by_split_points.c | 1 + .../distributed/operations/shard_split.c | 198 +++++-- .../split_shard_replication_setup.c | 61 +- .../replication/multi_logical_replication.c | 177 +++--- .../shardsplit_logical_replication.c | 539 ++++++++++-------- .../distributed/multi_logical_replication.h | 19 +- src/include/distributed/shard_split.h | 4 +- .../shardsplit_logical_replication.h | 30 +- src/test/regress/expected/citus_sameer.out | 139 ++++- src/test/regress/sql/citus_sameer.sql | 10 +- 10 files changed, 750 insertions(+), 428 deletions(-) diff --git a/src/backend/distributed/operations/citus_split_shard_by_split_points.c b/src/backend/distributed/operations/citus_split_shard_by_split_points.c index 07e273544..231b2fdbc 100644 --- a/src/backend/distributed/operations/citus_split_shard_by_split_points.c +++ b/src/backend/distributed/operations/citus_split_shard_by_split_points.c @@ -81,6 +81,7 @@ LookupSplitMode(Oid shardTransferModeOid) { shardSplitMode = BLOCKING_SPLIT; } + /* Extend with other modes as we support them */ else if (strncmp(enumLabel, "auto", NAMEDATALEN) == 0 || strncmp(enumLabel, "force_logical", NAMEDATALEN) == 0) diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index e91a8ba90..86f8b11b1 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -50,9 +50,9 @@ static void CreateAndCopySplitShardsForShardGroup(WorkerNode *sourceShardNode, static void CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList, List *workersForPlacementList); static void CreateSplitShardsForShardGroupTwo(WorkerNode *sourceShardNode, - List *sourceColocatedShardIntervalList, - List *shardGroupSplitIntervalListList, - List *workersForPlacementList); + List *sourceColocatedShardIntervalList, + List *shardGroupSplitIntervalListList, + List *workersForPlacementList); static void CreateDummyShardsForShardGroup(List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, WorkerNode *sourceWorkerNode, @@ -74,9 +74,9 @@ static void BlockingShardSplit(SplitOperation splitOperation, List *shardSplitPointsList, List *workersForPlacementList); static void NonBlockingShardSplit(SplitOperation splitOperation, - ShardInterval *shardIntervalToSplit, - List *shardSplitPointsList, - List *workersForPlacementList); + ShardInterval *shardIntervalToSplit, + List *shardSplitPointsList, + List *workersForPlacementList); static void DoSplitCopy(WorkerNode *sourceShardNode, List *sourceColocatedShardIntervalList, @@ -94,6 +94,10 @@ static void TryDropSplitShardsOnFailure(List *shardGroupSplitIntervalListList, static Task * CreateTaskForDDLCommandList(uint64 jobId, List *ddlCommandList, WorkerNode *workerNode); +static void AddDummyShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInterval); +static void DropDummyShards(void); +void TryDropShard(MultiConnection *connection, ShardInterval *shardInterval); + /* Customize error message strings based on operation type */ static const char *const SplitOperationName[] = { @@ -106,6 +110,9 @@ static const char *const SplitTargetName[] = [ISOLATE_TENANT_TO_NEW_SHARD] = "tenant", }; +/* Map containing list of dummy shards created on target nodes */ +static HTAB *DummyShardInfoHashMap = 
NULL;
+
 /* Function definitions */
 
 /*
@@ -1039,7 +1046,7 @@ TryDropSplitShardsOnFailure(List *shardGroupSplitIntervalListList,
 		int connectionFlags = FOR_DDL;
 		connectionFlags |= OUTSIDE_TRANSACTION;
-		MultiConnection *connnection = GetNodeUserDatabaseConnection(
+		MultiConnection *connection = GetNodeUserDatabaseConnection(
 			connectionFlags,
 			workerPlacementNode->workerName,
 			workerPlacementNode->workerPort,
@@ -1051,13 +1058,14 @@ TryDropSplitShardsOnFailure(List *shardGroupSplitIntervalListList,
 		 * The shard may or may not exist and the connection could have died.
 		 */
 		ExecuteOptionalRemoteCommand(
-			connnection,
+			connection,
 			dropShardQuery->data,
 			NULL /* pgResult */);
 	}
 }
 }
 
+
 /*
  * SplitShard API to split a given shard (or shard group) in non-blocking fashion
  * based on specified split points to a set of destination nodes.
@@ -1068,9 +1076,9 @@ TryDropSplitShardsOnFailure(List *shardGroupSplitIntervalListList,
  */
 static void
 NonBlockingShardSplit(SplitOperation splitOperation,
-					  ShardInterval *shardIntervalToSplit,
-					  List *shardSplitPointsList,
-					  List *workersForPlacementList)
+					  ShardInterval *shardIntervalToSplit,
+					  List *shardSplitPointsList,
+					  List *workersForPlacementList)
 {
 	List *sourceColocatedShardIntervalList = ColocatedShardIntervalList(
 		shardIntervalToSplit);
@@ -1106,13 +1114,34 @@ NonBlockingShardSplit(SplitOperation splitOperation,
 		shardGroupSplitIntervalListList,
 		sourceShardToCopyNode,
 		workersForPlacementList);
-
+
+	/* TODO: Refactor this method; BlockWrites currently happens inside it and should be moved out. */
 	SplitShardReplicationSetup(
 		sourceColocatedShardIntervalList,
 		shardGroupSplitIntervalListList,
 		sourceShardToCopyNode,
 		workersForPlacementList);
-
+
+	/*
+	 * Drop old shards and delete related metadata. Have to do that before
+	 * creating the new shard metadata, because there are cross-checks
+	 * preventing inconsistent metadata (like overlapping shards).
+	 */
+	DropShardList(sourceColocatedShardIntervalList);
+
+	/* Insert new shard and placement metadata */
+	InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList,
+									 workersForPlacementList);
+
+	/*
+	 * Create foreign keys, if any exist, after the metadata changes made by
+	 * DropShardList() and InsertSplitChildrenShardMetadata(), because the
+	 * foreign key creation depends on the new metadata.
+	 */
+	CreateForeignKeyConstraints(shardGroupSplitIntervalListList,
+								workersForPlacementList);
+
+	DropDummyShards();
 }
 	PG_CATCH();
 	{
@@ -1120,17 +1149,20 @@ NonBlockingShardSplit(SplitOperation splitOperation,
 		TryDropSplitShardsOnFailure(shardGroupSplitIntervalListList,
 									workersForPlacementList);
 
+		DropDummyShards();
+
 		PG_RE_THROW();
 	}
 	PG_END_TRY();
 }
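To see the non-blocking path above end to end, here is a minimal usage sketch
of the entry point it serves; the shard id, split point, and node ids are
hypothetical, and the UDF is invoked with a logical transfer mode so that
NonBlockingShardSplit() handles the operation:

    SELECT citus_split_shard_by_split_points(
        102008,            -- shard to split
        ARRAY['0'],        -- hash-range split points
        ARRAY[2, 3],       -- node ids that receive the children
        'force_logical');  -- mode resolved by LookupSplitMode()

+
 /* Create ShardGroup split children on a list of corresponding workers. 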
*/
 static void
 CreateSplitShardsForShardGroupTwo(WorkerNode *sourceShardNode,
-								  List *sourceColocatedShardIntervalList,
-								  List *shardGroupSplitIntervalListList,
-								  List *workersForPlacementList)
+								  List *sourceColocatedShardIntervalList,
+								  List *shardGroupSplitIntervalListList,
+								  List *workersForPlacementList)
 {
 	/* Iterate on shard interval list for shard group */
 	List *shardIntervalList = NULL;
@@ -1151,18 +1183,25 @@ CreateSplitShardsForShardGroupTwo(WorkerNode *sourceShardNode,
 			splitShardCreationCommandList,
 			shardInterval->shardId);
 
-		/* Create new split child shard on the specified placement list */
+		/* Create new split child shard on the specified worker node */
 		CreateObjectOnPlacement(splitShardCreationCommandList, workerPlacementNode);
 		}
 	}
 }
 
+
 static void
 CreateDummyShardsForShardGroup(List *sourceColocatedShardIntervalList,
 							   List *shardGroupSplitIntervalListList,
 							   WorkerNode *sourceWorkerNode,
 							   List *workersForPlacementList)
 {
+	/*
+	 * Set up a hash map that records the dummy shards created on each node.
+	 * This will facilitate easy cleanup.
+	 */
+	DummyShardInfoHashMap = SetupHashMapForShardInfo();
+
 	/*
 	 * Satisfy Constraint 1: Create dummy source shard(s) on all destination nodes.
 	 * If the source node is also a destination, skip dummy shard creation (see Note 1 in the function description).
@@ -1185,7 +1224,7 @@ CreateDummyShardsForShardGroup(List *sourceColocatedShardIntervalList,
 	ShardInterval *shardInterval = NULL;
 	foreach_ptr(shardInterval, sourceColocatedShardIntervalList)
 	{
-		/* Populate list of commands necessary to create shard interval on destination */
+		/* Populate list of commands necessary to create shard interval on destination */
 		List *splitShardCreationCommandList = GetPreLoadTableCreationCommands(
 			shardInterval->relationId,
 			false, /* includeSequenceDefaults */
@@ -1194,8 +1233,11 @@ CreateDummyShardsForShardGroup(List *sourceColocatedShardIntervalList,
 			splitShardCreationCommandList,
 			shardInterval->shardId);
 
-		/* Create new split child shard on the specified placement list */
+		/* Create dummy source shard on the destination worker node */
 		CreateObjectOnPlacement(splitShardCreationCommandList, workerPlacementNode);
+
+		/* Record the dummy source shard created on this node in the map */
+		AddDummyShardEntryInMap(workerPlacementNode->nodeId, shardInterval);
 	}
 }
 
@@ -1224,12 +1266,16 @@ CreateDummyShardsForShardGroup(List *sourceColocatedShardIntervalList,
 			splitShardCreationCommandList,
 			shardInterval->shardId);
 
-		/* Create new split child shard on the specified placement list */
+		/* Create dummy split child shard on the source worker node */
 		CreateObjectOnPlacement(splitShardCreationCommandList, sourceWorkerNode);
+
+		/* Record the dummy split child shard created on the source node in the map */
+		AddDummyShardEntryInMap(sourceWorkerNode->nodeId, shardInterval);
 	}
 }
 }
 
+
 static HTAB *
 CreateWorkerForPlacementSet(List *workersForPlacementList)
 {
@@ -1257,15 +1303,16 @@ CreateWorkerForPlacementSet(List *workersForPlacementList)
 }
 
 
-static void SplitShardReplicationSetup(List *sourceColocatedShardIntervalList,
-									   List *shardGroupSplitIntervalListList,
-									   WorkerNode *sourceWorkerNode,
-									   List *destinationWorkerNodesList)
+static void
+SplitShardReplicationSetup(List *sourceColocatedShardIntervalList,
+						   List *shardGroupSplitIntervalListList,
+						   WorkerNode *sourceWorkerNode,
+						   List *destinationWorkerNodesList)
 {
-
-	StringInfo splitShardReplicationUDF = CreateSplitShardReplicationSetupUDF(sourceColocatedShardIntervalList,
-																			  shardGroupSplitIntervalListList,
-																			  destinationWorkerNodesList);
+	StringInfo 
splitShardReplicationUDF = CreateSplitShardReplicationSetupUDF( + sourceColocatedShardIntervalList, + shardGroupSplitIntervalListList, + destinationWorkerNodesList); int connectionFlags = FORCE_NEW_CONNECTION; MultiConnection *sourceConnection = GetNodeUserDatabaseConnection(connectionFlags, @@ -1279,25 +1326,104 @@ static void SplitShardReplicationSetup(List *sourceColocatedShardIntervalList, ClaimConnectionExclusively(sourceConnection); PGresult *result = NULL; - int queryResult = ExecuteOptionalRemoteCommand(sourceConnection, splitShardReplicationUDF->data, &result); + int queryResult = ExecuteOptionalRemoteCommand(sourceConnection, + splitShardReplicationUDF->data, + &result); if (queryResult != RESPONSE_OKAY || !IsResponseOK(result)) { ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), - errmsg("Failed to run worker_split_shard_replication_setup"))); + errmsg("Failed to run worker_split_shard_replication_setup"))); PQclear(result); ForgetResults(sourceConnection); } /* Get replication slot information */ - List * replicationSlotInfoList = ParseReplicationSlotInfoFromResult(result); + List *replicationSlotInfoList = ParseReplicationSlotInfoFromResult(result); - List * shardSplitPubSubMetadata = CreateShardSplitPubSubMetadataList(sourceColocatedShardIntervalList, - shardGroupSplitIntervalListList, - destinationWorkerNodesList, - replicationSlotInfoList); + List *shardSplitPubSubMetadata = CreateShardSplitPubSubMetadataList( + sourceColocatedShardIntervalList, + shardGroupSplitIntervalListList, + destinationWorkerNodesList, + replicationSlotInfoList); - LogicallyReplicateSplitShards(sourceWorkerNode, shardSplitPubSubMetadata, sourceColocatedShardIntervalList, shardGroupSplitIntervalListList, destinationWorkerNodesList); + LogicallyReplicateSplitShards(sourceWorkerNode, shardSplitPubSubMetadata, + sourceColocatedShardIntervalList, + shardGroupSplitIntervalListList, + destinationWorkerNodesList); } + +static void +AddDummyShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInterval) +{ + NodeShardMappingKey key; + key.nodeId = targetNodeId; + key.tableOwnerId = TableOwnerOid(shardInterval->relationId); + + bool found = false; + NodeShardMappingEntry *nodeMappingEntry = + (NodeShardMappingEntry *) hash_search(DummyShardInfoHashMap, &key, HASH_ENTER, + &found); + if (!found) + { + nodeMappingEntry->shardSplitInfoList = NIL; + } + + nodeMappingEntry->shardSplitInfoList = + lappend(nodeMappingEntry->shardSplitInfoList, (ShardInterval *) shardInterval); +} + + +static void +DropDummyShards() +{ + HASH_SEQ_STATUS status; + hash_seq_init(&status, DummyShardInfoHashMap); + + NodeShardMappingEntry *entry = NULL; + while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL) + { + uint32 nodeId = entry->key.nodeId; + WorkerNode *shardToBeDroppedNode = FindNodeWithNodeId(nodeId, + false /* missingOk */); + + int connectionFlags = FOR_DDL; + connectionFlags |= OUTSIDE_TRANSACTION; + MultiConnection *connection = GetNodeUserDatabaseConnection( + connectionFlags, + shardToBeDroppedNode->workerName, + shardToBeDroppedNode->workerPort, + CurrentUserName(), + NULL /* databaseName */); + + List *dummyShardIntervalList = entry->shardSplitInfoList; + ShardInterval *shardInterval = NULL; + foreach_ptr(shardInterval, dummyShardIntervalList) + { + TryDropShard(connection, shardInterval); + } + } +} + + +void +TryDropShard(MultiConnection *connection, ShardInterval *shardInterval) +{ + char *qualifiedShardName = ConstructQualifiedShardName(shardInterval); + StringInfo dropShardQuery = 
makeStringInfo(); + + /* Caller enforces that foreign tables cannot be split (use DROP_REGULAR_TABLE_COMMAND) */ + appendStringInfo(dropShardQuery, DROP_REGULAR_TABLE_COMMAND, + qualifiedShardName); + + /* + * Perform a drop in best effort manner. + * The shard may or may not exist and the connection could have died. + */ + ExecuteOptionalRemoteCommand( + connection, + dropShardQuery->data, + NULL /* pgResult */); +} diff --git a/src/backend/distributed/operations/split_shard_replication_setup.c b/src/backend/distributed/operations/split_shard_replication_setup.c index da1b0c2bb..0d9a2b8f6 100644 --- a/src/backend/distributed/operations/split_shard_replication_setup.c +++ b/src/backend/distributed/operations/split_shard_replication_setup.c @@ -47,12 +47,13 @@ static ShardSplitInfo * CreateShardSplitInfo(uint64 sourceShardIdToSplit, static void AddShardSplitInfoEntryForNodeInMap(ShardSplitInfo *shardSplitInfo); static void PopulateShardSplitInfoInSM(ShardSplitInfoSMHeader *shardSplitInfoSMHeader, HTAB *shardInfoHashMap); - -static void ReturnReplicationSlotInfo(HTAB *shardInfoHashMap, Tuplestorestate *tupleStore, TupleDesc tupleDescriptor); + +static void ReturnReplicationSlotInfo(HTAB *shardInfoHashMap, Tuplestorestate *tupleStore, + TupleDesc tupleDescriptor); static void CreatePublishersForSplitChildren(HTAB *shardInfoHashMap); -StringInfo GetSoureAndDestinationShardNames(List* shardSplitInfoList); -char * ConstructFullyQualifiedSplitChildShardName(ShardSplitInfo* shardSplitInfo); +StringInfo GetSoureAndDestinationShardNames(List *shardSplitInfoList); +char * ConstructFullyQualifiedSplitChildShardName(ShardSplitInfo *shardSplitInfo); /* * worker_split_shard_replication_setup UDF creates in-memory data structures @@ -170,7 +171,7 @@ SetupHashMapForShardInfo() int hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_FUNCTION | HASH_COMPARE); - HTAB * shardInfoMap = hash_create("ShardInfoMap", 128, &info, hashFlags); + HTAB *shardInfoMap = hash_create("ShardInfoMap", 128, &info, hashFlags); return shardInfoMap; } @@ -420,9 +421,10 @@ ParseShardSplitInfoFromDatum(Datum shardSplitInfoDatum, *nodeId = DatumGetInt32(nodeIdDatum); } -static void CreatePublishersForSplitChildren(HTAB *shardInfoHashMap) + +static void +CreatePublishersForSplitChildren(HTAB *shardInfoHashMap) { - HASH_SEQ_STATUS status; hash_seq_init(&status, shardInfoHashMap); @@ -436,21 +438,25 @@ static void CreatePublishersForSplitChildren(HTAB *shardInfoHashMap) int connectionFlags = FORCE_NEW_CONNECTION; printf("Sameer getting new connection \n"); MultiConnection *sourceConnection = GetNodeUserDatabaseConnection(connectionFlags, - "localhost", - PostPortNumber, - CitusExtensionOwnerName(), - get_database_name( - MyDatabaseId)); - StringInfo shardNamesForPublication = GetSoureAndDestinationShardNames(entry->shardSplitInfoList); + "localhost", + PostPortNumber, + CitusExtensionOwnerName(), + get_database_name( + MyDatabaseId)); + StringInfo shardNamesForPublication = GetSoureAndDestinationShardNames( + entry->shardSplitInfoList); StringInfo command = makeStringInfo(); - appendStringInfo(command, "CREATE PUBLICATION sameerpub_%u_%u FOR TABLE %s", nodeId, tableOwnerId,shardNamesForPublication->data); + appendStringInfo(command, "CREATE PUBLICATION sameerpub_%u_%u FOR TABLE %s", + nodeId, tableOwnerId, shardNamesForPublication->data); ExecuteCriticalRemoteCommand(sourceConnection, command->data); printf("Sameer UserName: %s \n", GetUserNameFromId(tableOwnerId, false)); } } -StringInfo GetSoureAndDestinationShardNames(List* 
shardSplitInfoList) + +StringInfo +GetSoureAndDestinationShardNames(List *shardSplitInfoList) { HASHCTL info; int flags = HASH_ELEM | HASH_CONTEXT; @@ -462,7 +468,7 @@ StringInfo GetSoureAndDestinationShardNames(List* shardSplitInfoList) info.hcxt = CurrentMemoryContext; HTAB *sourceShardIdSet = hash_create("Source ShardId Set", 128, &info, flags); - + /* Get child shard names */ StringInfo allShardNames = makeStringInfo(); bool addComma = false; @@ -475,7 +481,7 @@ StringInfo GetSoureAndDestinationShardNames(List* shardSplitInfoList) uint64 sourceShardId = shardSplitInfo->sourceShardId; hash_search(sourceShardIdSet, &sourceShardId, HASH_ENTER, &found); - if(addComma) + if (addComma) { appendStringInfo(allShardNames, ","); } @@ -493,9 +499,9 @@ StringInfo GetSoureAndDestinationShardNames(List* shardSplitInfoList) while ((sourceShardIdEntry = hash_seq_search(&status)) != NULL) { ShardInterval *sourceShardInterval = LoadShardInterval(*sourceShardIdEntry); - char* sourceShardName = ConstructQualifiedShardName(sourceShardInterval); + char *sourceShardName = ConstructQualifiedShardName(sourceShardInterval); - if(addComma) + if (addComma) { appendStringInfo(allShardNames, ","); } @@ -507,8 +513,9 @@ StringInfo GetSoureAndDestinationShardNames(List* shardSplitInfoList) return allShardNames; } + char * -ConstructFullyQualifiedSplitChildShardName(ShardSplitInfo* shardSplitInfo) +ConstructFullyQualifiedSplitChildShardName(ShardSplitInfo *shardSplitInfo) { Oid schemaId = get_rel_namespace(shardSplitInfo->distributedTableOid); char *schemaName = get_namespace_name(schemaId); @@ -521,7 +528,10 @@ ConstructFullyQualifiedSplitChildShardName(ShardSplitInfo* shardSplitInfo) return shardName; } -static void ReturnReplicationSlotInfo(HTAB *shardInfoHashMap, Tuplestorestate *tupleStore, TupleDesc tupleDescriptor) + +static void +ReturnReplicationSlotInfo(HTAB *shardInfoHashMap, Tuplestorestate *tupleStore, TupleDesc + tupleDescriptor) { HASH_SEQ_STATUS status; hash_seq_init(&status, shardInfoHashMap); @@ -536,13 +546,14 @@ static void ReturnReplicationSlotInfo(HTAB *shardInfoHashMap, Tuplestorestate *t memset(nulls, false, sizeof(nulls)); values[0] = Int32GetDatum(entry->key.nodeId); - - char * tableOwnerName = GetUserNameFromId(entry->key.tableOwnerId, false); + + char *tableOwnerName = GetUserNameFromId(entry->key.tableOwnerId, false); values[1] = CStringGetTextDatum(tableOwnerName); - char * slotName = encode_replication_slot(entry->key.nodeId, entry->key.tableOwnerId); + char *slotName = encode_replication_slot(entry->key.nodeId, + entry->key.tableOwnerId); values[2] = CStringGetTextDatum(slotName); tuplestore_putvalues(tupleStore, tupleDescriptor, values, nulls); } -} \ No newline at end of file +} diff --git a/src/backend/distributed/replication/multi_logical_replication.c b/src/backend/distributed/replication/multi_logical_replication.c index 6820021b1..31029470a 100644 --- a/src/backend/distributed/replication/multi_logical_replication.c +++ b/src/backend/distributed/replication/multi_logical_replication.c @@ -136,19 +136,22 @@ static XLogRecPtr GetRemoteLSN(MultiConnection *connection, char *command); static uint64 TotalRelationSizeForSubscription(MultiConnection *connection, char *command); static bool RelationSubscriptionsAreReady(MultiConnection *targetConnection, - Bitmapset *tableOwnerIds, char * operationPrefix); + Bitmapset *tableOwnerIds, + char *operationPrefix); static void WaitForMiliseconds(long timeout); static XLogRecPtr GetSubscriptionPosition(MultiConnection *connection, - Bitmapset 
*tableOwnerIds, char * operationPrefix); + Bitmapset *tableOwnerIds, + char *operationPrefix); static char * ShardMovePublicationName(Oid ownerId); -static char * ShardSubscriptionName(Oid ownerId, char * operationPrefix); +static char * ShardSubscriptionName(Oid ownerId, char *operationPrefix); static void AcquireLogicalReplicationLock(void); static void DropAllShardMoveLeftovers(void); static void DropAllShardMoveSubscriptions(MultiConnection *connection); static void DropAllShardMoveReplicationSlots(MultiConnection *connection); static void DropAllShardMovePublications(MultiConnection *connection); static void DropAllShardMoveUsers(MultiConnection *connection); -static char * ShardSubscriptionNamesValueList(Bitmapset *tableOwnerIds, char * operationPrefix); +static char * ShardSubscriptionNamesValueList(Bitmapset *tableOwnerIds, + char *operationPrefix); static void DropShardMoveSubscription(MultiConnection *connection, char *subscriptionName); static void DropShardMoveReplicationSlot(MultiConnection *connection, @@ -227,14 +230,16 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo * subscription is not ready. There is no point of locking the shards before the * subscriptions for each relation becomes ready, so wait for it. */ - WaitForRelationSubscriptionsBecomeReady(targetConnection, tableOwnerIds, SHARD_MOVE_SUBSCRIPTION_PREFIX); + WaitForRelationSubscriptionsBecomeReady(targetConnection, tableOwnerIds, + SHARD_MOVE_SUBSCRIPTION_PREFIX); /* * Wait until the subscription is caught up to changes that has happened * after the initial COPY on the shards. */ XLogRecPtr sourcePosition = GetRemoteLogPosition(sourceConnection); - WaitForShardSubscriptionToCatchUp(targetConnection, sourcePosition, tableOwnerIds, SHARD_MOVE_SUBSCRIPTION_PREFIX); + WaitForShardSubscriptionToCatchUp(targetConnection, sourcePosition, tableOwnerIds, + SHARD_MOVE_SUBSCRIPTION_PREFIX); /* * Now lets create the post-load objects, such as the indexes, constraints @@ -244,7 +249,8 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo CreatePostLogicalReplicationDataLoadObjects(shardList, targetNodeName, targetNodePort); sourcePosition = GetRemoteLogPosition(sourceConnection); - WaitForShardSubscriptionToCatchUp(targetConnection, sourcePosition, tableOwnerIds, SHARD_MOVE_SUBSCRIPTION_PREFIX); + WaitForShardSubscriptionToCatchUp(targetConnection, sourcePosition, tableOwnerIds, + SHARD_MOVE_SUBSCRIPTION_PREFIX); /* * We're almost done, we'll block the writes to the shards that we're @@ -257,7 +263,8 @@ LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePo BlockWritesToShardList(shardList); sourcePosition = GetRemoteLogPosition(sourceConnection); - WaitForShardSubscriptionToCatchUp(targetConnection, sourcePosition, tableOwnerIds, SHARD_MOVE_SUBSCRIPTION_PREFIX); + WaitForShardSubscriptionToCatchUp(targetConnection, sourcePosition, tableOwnerIds, + SHARD_MOVE_SUBSCRIPTION_PREFIX); /* * We're creating the foreign constraints to reference tables after the @@ -1085,7 +1092,8 @@ DropShardMovePublications(MultiConnection *connection, Bitmapset *tableOwnerIds) * If replication slot can not be dropped while dropping the subscriber, drop * it here. 
*/ - DropShardMoveReplicationSlot(connection, ShardSubscriptionName(ownerId, SHARD_MOVE_SUBSCRIPTION_PREFIX)); + DropShardMoveReplicationSlot(connection, ShardSubscriptionName(ownerId, + SHARD_MOVE_SUBSCRIPTION_PREFIX)); DropShardMovePublication(connection, ShardMovePublicationName(ownerId)); } } @@ -1144,7 +1152,7 @@ ShardMovePublicationName(Oid ownerId) * coordinator is blocked by the blocked replication process. */ static char * -ShardSubscriptionName(Oid ownerId, char * operationPrefix) +ShardSubscriptionName(Oid ownerId, char *operationPrefix) { if (RunningUnderIsolationTest) { @@ -1317,7 +1325,8 @@ DropShardMoveSubscriptions(MultiConnection *connection, Bitmapset *tableOwnerIds int ownerId = -1; while ((ownerId = bms_next_member(tableOwnerIds, ownerId)) >= 0) { - DropShardMoveSubscription(connection, ShardSubscriptionName(ownerId, SHARD_MOVE_SUBSCRIPTION_PREFIX)); + DropShardMoveSubscription(connection, ShardSubscriptionName(ownerId, + SHARD_MOVE_SUBSCRIPTION_PREFIX)); DropShardMoveUser(connection, ShardMoveSubscriptionRole(ownerId)); } } @@ -1491,7 +1500,8 @@ CreateShardMoveSubscriptions(MultiConnection *connection, char *sourceNodeName, appendStringInfo(createSubscriptionCommand, "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s " "WITH (citus_use_authinfo=true, enabled=false)", - quote_identifier(ShardSubscriptionName(ownerId, SHARD_MOVE_SUBSCRIPTION_PREFIX)), + quote_identifier(ShardSubscriptionName(ownerId, + SHARD_MOVE_SUBSCRIPTION_PREFIX)), quote_literal_cstr(conninfo->data), quote_identifier(ShardMovePublicationName(ownerId))); @@ -1500,7 +1510,8 @@ CreateShardMoveSubscriptions(MultiConnection *connection, char *sourceNodeName, pfree(createSubscriptionCommand); ExecuteCriticalRemoteCommand(connection, psprintf( "ALTER SUBSCRIPTION %s OWNER TO %s", - ShardSubscriptionName(ownerId, SHARD_MOVE_SUBSCRIPTION_PREFIX), + ShardSubscriptionName(ownerId, + SHARD_MOVE_SUBSCRIPTION_PREFIX), ShardMoveSubscriptionRole(ownerId) )); @@ -1519,7 +1530,8 @@ CreateShardMoveSubscriptions(MultiConnection *connection, char *sourceNodeName, ExecuteCriticalRemoteCommand(connection, psprintf( "ALTER SUBSCRIPTION %s ENABLE", - ShardSubscriptionName(ownerId, SHARD_MOVE_SUBSCRIPTION_PREFIX) + ShardSubscriptionName(ownerId, + SHARD_MOVE_SUBSCRIPTION_PREFIX) )); } } @@ -1627,7 +1639,7 @@ GetRemoteLSN(MultiConnection *connection, char *command) */ void WaitForRelationSubscriptionsBecomeReady(MultiConnection *targetConnection, - Bitmapset *tableOwnerIds, char * operationPrefix) + Bitmapset *tableOwnerIds, char *operationPrefix) { uint64 previousTotalRelationSizeForSubscription = 0; TimestampTz previousSizeChangeTime = GetCurrentTimestamp(); @@ -1655,7 +1667,8 @@ WaitForRelationSubscriptionsBecomeReady(MultiConnection *targetConnection, while (true) { /* we're done, all relations are ready */ - if (RelationSubscriptionsAreReady(targetConnection, tableOwnerIds, operationPrefix)) + if (RelationSubscriptionsAreReady(targetConnection, tableOwnerIds, + operationPrefix)) { ereport(LOG, (errmsg("The states of the relations belonging to the " "subscriptions became READY on the " @@ -1665,7 +1678,8 @@ WaitForRelationSubscriptionsBecomeReady(MultiConnection *targetConnection, break; } - char *subscriptionValueList = ShardSubscriptionNamesValueList(tableOwnerIds, operationPrefix); + char *subscriptionValueList = ShardSubscriptionNamesValueList(tableOwnerIds, + operationPrefix); /* Get the current total size of tables belonging to the subscriber */ uint64 currentTotalRelationSize = @@ -1824,7 +1838,7 @@ 
TotalRelationSizeForSubscription(MultiConnection *connection, char *command) * be used in a query by using the IN operator. */ static char * -ShardSubscriptionNamesValueList(Bitmapset *tableOwnerIds, char * operationPrefix) +ShardSubscriptionNamesValueList(Bitmapset *tableOwnerIds, char *operationPrefix) { StringInfo subscriptionValueList = makeStringInfo(); appendStringInfoString(subscriptionValueList, "("); @@ -1842,7 +1856,8 @@ ShardSubscriptionNamesValueList(Bitmapset *tableOwnerIds, char * operationPrefix first = false; } appendStringInfoString(subscriptionValueList, - quote_literal_cstr(ShardSubscriptionName(ownerId, operationPrefix))); + quote_literal_cstr(ShardSubscriptionName(ownerId, + operationPrefix))); } appendStringInfoString(subscriptionValueList, ")"); return subscriptionValueList->data; @@ -1855,11 +1870,12 @@ ShardSubscriptionNamesValueList(Bitmapset *tableOwnerIds, char * operationPrefix */ static bool RelationSubscriptionsAreReady(MultiConnection *targetConnection, - Bitmapset *tableOwnerIds, char * operationPrefix) + Bitmapset *tableOwnerIds, char *operationPrefix) { bool raiseInterrupts = false; - char *subscriptionValueList = ShardSubscriptionNamesValueList(tableOwnerIds, operationPrefix); + char *subscriptionValueList = ShardSubscriptionNamesValueList(tableOwnerIds, + operationPrefix); char *query = psprintf( "SELECT count(*) FROM pg_subscription_rel, pg_stat_subscription " "WHERE srsubid = subid AND srsubstate != 'r' AND subname IN %s", @@ -1910,8 +1926,9 @@ RelationSubscriptionsAreReady(MultiConnection *targetConnection, * The function also reports its progress in every logicalReplicationProgressReportTimeout. */ void -WaitForShardSubscriptionToCatchUp(MultiConnection *targetConnection, XLogRecPtr sourcePosition, - Bitmapset *tableOwnerIds, char * operationPrefix) +WaitForShardSubscriptionToCatchUp(MultiConnection *targetConnection, XLogRecPtr + sourcePosition, + Bitmapset *tableOwnerIds, char *operationPrefix) { XLogRecPtr previousTargetPosition = 0; TimestampTz previousLSNIncrementTime = GetCurrentTimestamp(); @@ -2050,9 +2067,11 @@ WaitForMiliseconds(long timeout) * replication. */ static XLogRecPtr -GetSubscriptionPosition(MultiConnection *connection, Bitmapset *tableOwnerIds, char * operationPrefix) +GetSubscriptionPosition(MultiConnection *connection, Bitmapset *tableOwnerIds, + char *operationPrefix) { - char *subscriptionValueList = ShardSubscriptionNamesValueList(tableOwnerIds, operationPrefix); + char *subscriptionValueList = ShardSubscriptionNamesValueList(tableOwnerIds, + operationPrefix); return GetRemoteLSN(connection, psprintf( "SELECT min(latest_end_lsn) FROM pg_stat_subscription " "WHERE subname IN %s", subscriptionValueList)); @@ -2062,64 +2081,66 @@ GetSubscriptionPosition(MultiConnection *connection, Bitmapset *tableOwnerIds, c /*Refactor this for ShardMove too.*/ void CreateShardSubscription(MultiConnection *connection, char *sourceNodeName, - int sourceNodePort, char *userName, char *databaseName, - char * publicationName, - Oid ownerId) + int sourceNodePort, char *userName, char *databaseName, + char *publicationName, + Oid ownerId) { + StringInfo createSubscriptionCommand = makeStringInfo(); + StringInfo conninfo = makeStringInfo(); - StringInfo createSubscriptionCommand = makeStringInfo(); - StringInfo conninfo = makeStringInfo(); + /* + * The CREATE USER command should not propagate, so we temporarily + * disable DDL propagation. 
+ */ + SendCommandListToWorkerOutsideTransaction( + connection->hostname, connection->port, connection->user, + list_make2( + "SET LOCAL citus.enable_ddl_propagation TO OFF;", + psprintf( + "CREATE USER %s SUPERUSER IN ROLE %s", + ShardMoveSubscriptionRole(ownerId), + GetUserNameFromId(ownerId, false) + ))); - /* - * The CREATE USER command should not propagate, so we temporarily - * disable DDL propagation. - */ - SendCommandListToWorkerOutsideTransaction( - connection->hostname, connection->port, connection->user, - list_make2( - "SET LOCAL citus.enable_ddl_propagation TO OFF;", - psprintf( - "CREATE USER %s SUPERUSER IN ROLE %s", - ShardMoveSubscriptionRole(ownerId), - GetUserNameFromId(ownerId, false) - ))); + appendStringInfo(conninfo, "host='%s' port=%d user='%s' dbname='%s' " + "connect_timeout=20", + escape_param_str(sourceNodeName), sourceNodePort, + escape_param_str(userName), escape_param_str(databaseName)); - appendStringInfo(conninfo, "host='%s' port=%d user='%s' dbname='%s' " - "connect_timeout=20", - escape_param_str(sourceNodeName), sourceNodePort, - escape_param_str(userName), escape_param_str(databaseName)); + appendStringInfo(createSubscriptionCommand, + "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s " + "WITH (citus_use_authinfo=true, enabled=false)", + quote_identifier(ShardSubscriptionName(ownerId, + SHARD_SPLIT_SUBSCRIPTION_PREFIX)), + quote_literal_cstr(conninfo->data), + quote_identifier(publicationName)); - appendStringInfo(createSubscriptionCommand, - "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s " - "WITH (citus_use_authinfo=true, enabled=false)", - quote_identifier(ShardSubscriptionName(ownerId, SHARD_SPLIT_SUBSCRIPTION_PREFIX)), - quote_literal_cstr(conninfo->data), - quote_identifier(publicationName)); + ExecuteCriticalRemoteCommand(connection, createSubscriptionCommand->data); + pfree(createSubscriptionCommand->data); + pfree(createSubscriptionCommand); + ExecuteCriticalRemoteCommand(connection, psprintf( + "ALTER SUBSCRIPTION %s OWNER TO %s", + ShardSubscriptionName(ownerId, + SHARD_SPLIT_SUBSCRIPTION_PREFIX), + ShardMoveSubscriptionRole(ownerId) + )); - ExecuteCriticalRemoteCommand(connection, createSubscriptionCommand->data); - pfree(createSubscriptionCommand->data); - pfree(createSubscriptionCommand); - ExecuteCriticalRemoteCommand(connection, psprintf( - "ALTER SUBSCRIPTION %s OWNER TO %s", - ShardSubscriptionName(ownerId, SHARD_SPLIT_SUBSCRIPTION_PREFIX), - ShardMoveSubscriptionRole(ownerId) - )); + /* + * The ALTER ROLE command should not propagate, so we temporarily + * disable DDL propagation. + */ + SendCommandListToWorkerOutsideTransaction( + connection->hostname, connection->port, connection->user, + list_make2( + "SET LOCAL citus.enable_ddl_propagation TO OFF;", + psprintf( + "ALTER ROLE %s NOSUPERUSER", + ShardMoveSubscriptionRole(ownerId) + ))); - /* - * The ALTER ROLE command should not propagate, so we temporarily - * disable DDL propagation. 
- */ - SendCommandListToWorkerOutsideTransaction( - connection->hostname, connection->port, connection->user, - list_make2( - "SET LOCAL citus.enable_ddl_propagation TO OFF;", - psprintf( - "ALTER ROLE %s NOSUPERUSER", - ShardMoveSubscriptionRole(ownerId) - ))); - - ExecuteCriticalRemoteCommand(connection, psprintf( - "ALTER SUBSCRIPTION %s ENABLE", - ShardSubscriptionName(ownerId, SHARD_SPLIT_SUBSCRIPTION_PREFIX) - )); + ExecuteCriticalRemoteCommand(connection, psprintf( + "ALTER SUBSCRIPTION %s ENABLE", + ShardSubscriptionName(ownerId, + SHARD_SPLIT_SUBSCRIPTION_PREFIX) + )); } diff --git a/src/backend/distributed/shardsplit/shardsplit_logical_replication.c b/src/backend/distributed/shardsplit/shardsplit_logical_replication.c index b66604ad3..8685d9d81 100644 --- a/src/backend/distributed/shardsplit/shardsplit_logical_replication.c +++ b/src/backend/distributed/shardsplit/shardsplit_logical_replication.c @@ -26,34 +26,43 @@ static HTAB *ShardInfoHashMapForPublications = NULL; /* function declarations */ -static void AddShardEntryInMap(Oid tableOwner, uint32 nodeId, ShardInterval * shardInterval, bool isChildShardInterval); -ShardSplitPubSubMetadata * CreateShardSplitPubSubMetadata(Oid tableOwnerId, uint32 nodeId, List * shardIdList, List * replicationSlotInfoList); +static void AddPublishableShardEntryInMap(uint32 targetNodeId, + ShardInterval *shardInterval, bool + isChildShardInterval); +ShardSplitPubSubMetadata * CreateShardSplitPubSubMetadata(Oid tableOwnerId, uint32 nodeId, + List *shardIdList, + List *replicationSlotInfoList); -static void -CreateShardSplitPublicationForNode(MultiConnection *connection, List *shardList, - uint32_t publicationForTargetNodeId, Oid tableOwner); -static void -CreateShardSplitPublications(MultiConnection *sourceConnection, List * shardSplitPubSubMetadataList); -static void -CreateShardSplitSubscriptions(List * targetNodeConnectionList, List * shardSplitPubSubMetadataList, WorkerNode * sourceWorkerNode, char * superUser, char * databaseName); -static void -WaitForShardSplitRelationSubscriptionsBecomeReady(List * targetNodeConnectionList, List * shardSplitPubSubMetadataList); +static void CreateShardSplitPublicationForNode(MultiConnection *connection, + List *shardList, + uint32_t publicationForTargetNodeId, Oid + tableOwner); +static void CreateShardSplitPublications(MultiConnection *sourceConnection, + List *shardSplitPubSubMetadataList); +static void CreateShardSplitSubscriptions(List *targetNodeConnectionList, + List *shardSplitPubSubMetadataList, + WorkerNode *sourceWorkerNode, char *superUser, + char *databaseName); +static void WaitForShardSplitRelationSubscriptionsBecomeReady( + List *targetNodeConnectionList, List *shardSplitPubSubMetadataList); -static void -WaitForShardSplitRelationSubscriptionsToBeCaughtUp(XLogRecPtr sourcePosition, List * targetNodeConnectionList, List * shardSplitPubSubMetadataList); +static void WaitForShardSplitRelationSubscriptionsToBeCaughtUp(XLogRecPtr sourcePosition, + List * + targetNodeConnectionList, + List * + shardSplitPubSubMetadataList); - -static char * -ShardSplitPublicationName(uint32_t nodeId, Oid ownerId); +static char * ShardSplitPublicationName(uint32_t nodeId, Oid ownerId); /*used for debuggin. 
Remove later*/
-void PrintShardSplitPubSubMetadata(ShardSplitPubSubMetadata * shardSplitMetadata);
+void PrintShardSplitPubSubMetadata(ShardSplitPubSubMetadata *shardSplitMetadata);
 
-StringInfo CreateSplitShardReplicationSetupUDF(List *sourceColocatedShardIntervalList,
-											   List *shardGroupSplitIntervalListList,
-											   List *destinationWorkerNodesList)
+StringInfo
+CreateSplitShardReplicationSetupUDF(List *sourceColocatedShardIntervalList,
+									List *shardGroupSplitIntervalListList,
+									List *destinationWorkerNodesList)
 {
-	StringInfo splitChildrenRows = makeStringInfo();
+	StringInfo splitChildrenRows = makeStringInfo();
 
 	ShardInterval *sourceShardIntervalToCopy = NULL;
 	List *splitChildShardIntervalList = NULL;
@@ -66,7 +75,7 @@ StringInfo CreateSplitShardReplicationSetupUDF(List *sourceColocatedShardInterva
 		ShardInterval *splitChildShardInterval = NULL;
 		WorkerNode *destinationWorkerNode = NULL;
 		forboth_ptr(splitChildShardInterval, splitChildShardIntervalList,
-					destinationWorkerNode, destinationWorkerNodesList)
+					destinationWorkerNode, destinationWorkerNodesList)
 		{
 			if (addComma)
 			{
@@ -74,62 +83,69 @@ StringInfo CreateSplitShardReplicationSetupUDF(List *sourceColocatedShardInterva
 			}
 
 			StringInfo minValueString = makeStringInfo();
-			appendStringInfo(minValueString, "%d", DatumGetInt32(splitChildShardInterval->minValue));
+			appendStringInfo(minValueString, "%d", DatumGetInt32(
+								 splitChildShardInterval->minValue));
 
 			StringInfo maxValueString = makeStringInfo();
-			appendStringInfo(maxValueString, "%d", DatumGetInt32(splitChildShardInterval->maxValue));
+			appendStringInfo(maxValueString, "%d", DatumGetInt32(
+								 splitChildShardInterval->maxValue));
 
 			appendStringInfo(splitChildrenRows,
-							 "ROW(%lu, %lu, %s, %s, %u)::citus.split_shard_info",
-							 sourceShardId,
-							 splitChildShardInterval->shardId,
-							 quote_literal_cstr(minValueString->data),
-							 quote_literal_cstr(maxValueString->data),
-							 destinationWorkerNode->nodeId);
+							 "ROW(%lu, %lu, %s, %s, %u)::citus.split_shard_info",
+							 sourceShardId,
+							 splitChildShardInterval->shardId,
+							 quote_literal_cstr(minValueString->data),
+							 quote_literal_cstr(maxValueString->data),
+							 destinationWorkerNode->nodeId);
 
-			addComma = true;
+			addComma = true;
 		}
 	}
 
 	StringInfo splitShardReplicationUDF = makeStringInfo();
-	appendStringInfo(splitShardReplicationUDF,
-					 "SELECT * FROM worker_split_shard_replication_setup(ARRAY[%s])", splitChildrenRows->data);
+	appendStringInfo(splitShardReplicationUDF,
+					 "SELECT * FROM worker_split_shard_replication_setup(ARRAY[%s])",
+					 splitChildrenRows->data);
 
-	return splitShardReplicationUDF;
+	return splitShardReplicationUDF;
 }
 
-List * ParseReplicationSlotInfoFromResult(PGresult * result)
+
+List *
+ParseReplicationSlotInfoFromResult(PGresult *result)
 {
-	int64 rowCount = PQntuples(result);
+	int64 rowCount = PQntuples(result);
 	int64 colCount = PQnfields(result);
 
-	List *replicationSlotInfoList = NIL;
-	for (int64 rowIndex = 0; rowIndex < rowCount; rowIndex++)
+	List *replicationSlotInfoList = NIL;
+	for (int64 rowIndex = 0; rowIndex < rowCount; rowIndex++)
 	{
-		ReplicationSlotInfo * replicationSlotInfo = (ReplicationSlotInfo *)palloc0(sizeof(ReplicationSlotInfo));
+		ReplicationSlotInfo *replicationSlotInfo = (ReplicationSlotInfo *) palloc0(
+			sizeof(ReplicationSlotInfo));
 
-		char * targeNodeIdString = PQgetvalue(result, rowIndex, 0);
-		replicationSlotInfo->targetNodeId = strtoul(targeNodeIdString, NULL, 10);
+		char *targetNodeIdString = PQgetvalue(result, rowIndex, 0);
+		replicationSlotInfo->targetNodeId = strtoul(targetNodeIdString, NULL, 10);
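For concreteness, the command that CreateSplitShardReplicationSetupUDF() above
assembles, and that SplitShardReplicationSetup() runs on the source node, looks
roughly like the following; the shard ids, hash ranges, and node ids here are
hypothetical:

    SELECT * FROM worker_split_shard_replication_setup(ARRAY[
        ROW(102008, 102010, '-2147483648', '-1', 2)::citus.split_shard_info,
        ROW(102008, 102011, '0', '2147483647', 3)::citus.split_shard_info
    ]);

Each result row carries (nodeId, tableOwnerName, slotName), which the parsing
loop below copies into ReplicationSlotInfo structs.

-		/* we're using the 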
pstrdup to copy the data into the current memory context */ - replicationSlotInfo->tableOwnerName = pstrdup(PQgetvalue(result, rowIndex, 1)); + /* we're using the pstrdup to copy the data into the current memory context */ + replicationSlotInfo->tableOwnerName = pstrdup(PQgetvalue(result, rowIndex, 1)); - replicationSlotInfo->slotName = pstrdup(PQgetvalue(result, rowIndex, 2)); + replicationSlotInfo->slotName = pstrdup(PQgetvalue(result, rowIndex, 2)); - replicationSlotInfoList = lappend(replicationSlotInfoList, replicationSlotInfo); + replicationSlotInfoList = lappend(replicationSlotInfoList, replicationSlotInfo); } - - /*TODO(saawasek): size of this should not be NULL */ - return replicationSlotInfoList; + + /*TODO(saawasek): size of this should not be NULL */ + return replicationSlotInfoList; } -List * CreateShardSplitPubSubMetadataList(List *sourceColocatedShardIntervalList, - List *shardGroupSplitIntervalListList, - List *destinationWorkerNodesList, - List *replicationSlotInfoList) +List * +CreateShardSplitPubSubMetadataList(List *sourceColocatedShardIntervalList, + List *shardGroupSplitIntervalListList, + List *destinationWorkerNodesList, + List *replicationSlotInfoList) { - ShardInfoHashMapForPublications = SetupHashMapForShardInfo(); + ShardInfoHashMapForPublications = SetupHashMapForShardInfo(); ShardInterval *sourceShardIntervalToCopy = NULL; List *splitChildShardIntervalList = NULL; forboth_ptr(sourceShardIntervalToCopy, sourceColocatedShardIntervalList, @@ -138,247 +154,276 @@ List * CreateShardSplitPubSubMetadataList(List *sourceColocatedShardIntervalList ShardInterval *splitChildShardInterval = NULL; WorkerNode *destinationWorkerNode = NULL; forboth_ptr(splitChildShardInterval, splitChildShardIntervalList, - destinationWorkerNode, destinationWorkerNodesList) + destinationWorkerNode, destinationWorkerNodesList) { - /* Table owner is same for both parent and child shard */ - Oid tableOwnerId = TableOwnerOid(sourceShardIntervalToCopy->relationId); - uint32 destinationWorkerNodeId = destinationWorkerNode->nodeId; + uint32 destinationWorkerNodeId = destinationWorkerNode->nodeId; - /* Add split child shard interval */ - AddShardEntryInMap(tableOwnerId, destinationWorkerNodeId, splitChildShardInterval, true /*isChildShardInterval*/); + /* Add split child shard interval */ + AddPublishableShardEntryInMap(destinationWorkerNodeId, + splitChildShardInterval, + true /*isChildShardInterval*/); - /* Add parent shard interval if not already added */ - AddShardEntryInMap(tableOwnerId, destinationWorkerNodeId, sourceShardIntervalToCopy, false /*isChildShardInterval*/); + /* Add parent shard interval if not already added */ + AddPublishableShardEntryInMap(destinationWorkerNodeId, + sourceShardIntervalToCopy, + false /*isChildShardInterval*/); } } - /* Populate pubsub meta data*/ - HASH_SEQ_STATUS status; + /* Populate pubsub meta data*/ + HASH_SEQ_STATUS status; hash_seq_init(&status, ShardInfoHashMapForPublications); - - List * shardSplitPubSubMetadataList = NIL; - NodeShardMappingEntry *entry = NULL; - while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL) - { - uint32_t nodeId = entry->key.nodeId; + + List *shardSplitPubSubMetadataList = NIL; + NodeShardMappingEntry *entry = NULL; + while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL) + { + uint32_t nodeId = entry->key.nodeId; uint32_t tableOwnerId = entry->key.tableOwnerId; - ShardSplitPubSubMetadata * shardSplitPubSubMetadata = CreateShardSplitPubSubMetadata(tableOwnerId, nodeId, 
entry->shardSplitInfoList, replicationSlotInfoList); + ShardSplitPubSubMetadata *shardSplitPubSubMetadata = + CreateShardSplitPubSubMetadata(tableOwnerId, nodeId, + entry->shardSplitInfoList, + replicationSlotInfoList); - shardSplitPubSubMetadataList = lappend(shardSplitPubSubMetadataList, shardSplitPubSubMetadata); - } + shardSplitPubSubMetadataList = lappend(shardSplitPubSubMetadataList, + shardSplitPubSubMetadata); + } - return shardSplitPubSubMetadataList; + return shardSplitPubSubMetadataList; } -static void AddShardEntryInMap(Oid tableOwnerId, uint32 nodeId, ShardInterval * shardInterval, bool isChildShardInterval) + +static void +AddPublishableShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInterval, bool + isChildShardInterval) { - NodeShardMappingKey key; - key.nodeId = nodeId; - key.tableOwnerId = tableOwnerId; + NodeShardMappingKey key; + key.nodeId = targetNodeId; + key.tableOwnerId = TableOwnerOid(shardInterval->relationId); bool found = false; NodeShardMappingEntry *nodeMappingEntry = - (NodeShardMappingEntry *) hash_search(ShardInfoHashMapForPublications, &key, HASH_ENTER, + (NodeShardMappingEntry *) hash_search(ShardInfoHashMapForPublications, &key, + HASH_ENTER, &found); if (!found) { nodeMappingEntry->shardSplitInfoList = NIL; } - - if(isChildShardInterval) - { - nodeMappingEntry->shardSplitInfoList = - lappend(nodeMappingEntry->shardSplitInfoList, (ShardInterval *) shardInterval); - return; - } - ShardInterval * existingShardInterval = NULL; - foreach_ptr(existingShardInterval, nodeMappingEntry->shardSplitInfoList) - { - if(existingShardInterval->shardId == shardInterval->shardId) - { - /* parent shard interval is already added hence return */ - return; - } - } + if (isChildShardInterval) + { + nodeMappingEntry->shardSplitInfoList = + lappend(nodeMappingEntry->shardSplitInfoList, + (ShardInterval *) shardInterval); + return; + } - /* Add parent shard Interval */ - nodeMappingEntry->shardSplitInfoList = - lappend(nodeMappingEntry->shardSplitInfoList, (ShardInterval *) shardInterval); + ShardInterval *existingShardInterval = NULL; + foreach_ptr(existingShardInterval, nodeMappingEntry->shardSplitInfoList) + { + if (existingShardInterval->shardId == shardInterval->shardId) + { + /* parent shard interval is already added hence return */ + return; + } + } + /* Add parent shard Interval */ + nodeMappingEntry->shardSplitInfoList = + lappend(nodeMappingEntry->shardSplitInfoList, (ShardInterval *) shardInterval); } -ShardSplitPubSubMetadata * CreateShardSplitPubSubMetadata(Oid tableOwnerId, uint32 nodeId, List * shardIntervalList, List * replicationSlotInfoList) +ShardSplitPubSubMetadata * +CreateShardSplitPubSubMetadata(Oid tableOwnerId, uint32 nodeId, List *shardIntervalList, + List *replicationSlotInfoList) { + ShardSplitPubSubMetadata *shardSplitPubSubMetadata = palloc0( + sizeof(ShardSplitPubSubMetadata)); + shardSplitPubSubMetadata->shardIntervalListForSubscription = shardIntervalList; + shardSplitPubSubMetadata->tableOwnerId = tableOwnerId; - ShardSplitPubSubMetadata * shardSplitPubSubMetadata = palloc0(sizeof(ShardSplitPubSubMetadata)); - shardSplitPubSubMetadata->shardIntervalListForSubscription = shardIntervalList; - shardSplitPubSubMetadata->tableOwnerId = tableOwnerId; + char *tableOwnerName = GetUserNameFromId(tableOwnerId, false); + ReplicationSlotInfo *replicationSlotInfo = NULL; + foreach_ptr(replicationSlotInfo, replicationSlotInfoList) + { + if (nodeId == replicationSlotInfo->targetNodeId && + strcmp(tableOwnerName, replicationSlotInfo->tableOwnerName) 
== 0) + { + shardSplitPubSubMetadata->slotInfo = replicationSlotInfo; + break; + } + } - char * tableOwnerName = GetUserNameFromId(tableOwnerId, false); - ReplicationSlotInfo * replicationSlotInfo = NULL; - foreach_ptr(replicationSlotInfo, replicationSlotInfoList) - { - if(nodeId == replicationSlotInfo->targetNodeId && - strcmp(tableOwnerName, replicationSlotInfo->tableOwnerName) == 0) - { - shardSplitPubSubMetadata->slotInfo = replicationSlotInfo; - break; - } - } - - return shardSplitPubSubMetadata; + return shardSplitPubSubMetadata; } -void LogicallyReplicateSplitShards(WorkerNode *sourceWorkerNode, - List* shardSplitPubSubMetadataList, - List *sourceColocatedShardIntervalList, - List *shardGroupSplitIntervalListList, - List *destinationWorkerNodesList) +void +LogicallyReplicateSplitShards(WorkerNode *sourceWorkerNode, + List *shardSplitPubSubMetadataList, + List *sourceColocatedShardIntervalList, + List *shardGroupSplitIntervalListList, + List *destinationWorkerNodesList) { - char *superUser = CitusExtensionOwnerName(); + char *superUser = CitusExtensionOwnerName(); char *databaseName = get_database_name(MyDatabaseId); int connectionFlags = FORCE_NEW_CONNECTION; - /* Get source node connection */ - MultiConnection *sourceConnection = - GetNodeUserDatabaseConnection(connectionFlags, sourceWorkerNode->workerName, sourceWorkerNode->workerPort, - superUser, databaseName); - - ClaimConnectionExclusively(sourceConnection); + /* Get source node connection */ + MultiConnection *sourceConnection = + GetNodeUserDatabaseConnection(connectionFlags, sourceWorkerNode->workerName, + sourceWorkerNode->workerPort, + superUser, databaseName); - List * targetNodeConnectionList = NIL; - ShardSplitPubSubMetadata * shardSplitPubSubMetadata = NULL; - foreach_ptr(shardSplitPubSubMetadata, shardSplitPubSubMetadataList) - { - uint32 targetWorkerNodeId = shardSplitPubSubMetadata->slotInfo->targetNodeId; - WorkerNode * targetWorkerNode = FindNodeWithNodeId(targetWorkerNodeId, false); - - MultiConnection *targetConnection = - GetNodeUserDatabaseConnection(connectionFlags, targetWorkerNode->workerName, targetWorkerNode->workerPort, - superUser, databaseName); - ClaimConnectionExclusively(targetConnection); + ClaimConnectionExclusively(sourceConnection); - targetNodeConnectionList = lappend(targetNodeConnectionList, targetConnection); - } + List *targetNodeConnectionList = NIL; + ShardSplitPubSubMetadata *shardSplitPubSubMetadata = NULL; + foreach_ptr(shardSplitPubSubMetadata, shardSplitPubSubMetadataList) + { + uint32 targetWorkerNodeId = shardSplitPubSubMetadata->slotInfo->targetNodeId; + WorkerNode *targetWorkerNode = FindNodeWithNodeId(targetWorkerNodeId, false); - /* create publications */ - CreateShardSplitPublications(sourceConnection, shardSplitPubSubMetadataList); + MultiConnection *targetConnection = + GetNodeUserDatabaseConnection(connectionFlags, targetWorkerNode->workerName, + targetWorkerNode->workerPort, + superUser, databaseName); + ClaimConnectionExclusively(targetConnection); - CreateShardSplitSubscriptions(targetNodeConnectionList, - shardSplitPubSubMetadataList, - sourceWorkerNode, - superUser, - databaseName); + targetNodeConnectionList = lappend(targetNodeConnectionList, targetConnection); + } - WaitForShardSplitRelationSubscriptionsBecomeReady(targetNodeConnectionList, shardSplitPubSubMetadataList); - - XLogRecPtr sourcePosition = GetRemoteLogPosition(sourceConnection); - WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition, targetNodeConnectionList, shardSplitPubSubMetadataList); 
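The rewritten block below mirrors the shard-move flow: wait for the
subscriptions' relations to become ready, then repeatedly fetch the source LSN
and wait for the targets to catch up, blocking writes only for the final
delta. The waits are driven by the same catalog queries used for shard moves
(see RelationSubscriptionsAreReady() and GetSubscriptionPosition() above);
against each target node they look like:

    -- any relation still initializing under the subscriptions?
    SELECT count(*) FROM pg_subscription_rel, pg_stat_subscription
    WHERE srsubid = subid AND srsubstate != 'r' AND subname IN (...);

    -- how far have the subscriptions replayed?
    SELECT min(latest_end_lsn) FROM pg_stat_subscription WHERE subname IN (...);

The subscription-name list for IN (...) is produced by
ShardSubscriptionNamesValueList() with the split prefix.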
+	/* create publications */
+	CreateShardSplitPublications(sourceConnection, shardSplitPubSubMetadataList);
+
+	CreateShardSplitSubscriptions(targetNodeConnectionList,
+								  shardSplitPubSubMetadataList,
+								  sourceWorkerNode,
+								  superUser,
+								  databaseName);
+
+	WaitForShardSplitRelationSubscriptionsBecomeReady(targetNodeConnectionList,
+													  shardSplitPubSubMetadataList);
+
+	XLogRecPtr sourcePosition = GetRemoteLogPosition(sourceConnection);
+	WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition,
+													   targetNodeConnectionList,
+													   shardSplitPubSubMetadataList);
+
+	CreateAuxiliaryStructuresForShardGroup(shardGroupSplitIntervalListList,
+										   destinationWorkerNodesList);
+
+	sourcePosition = GetRemoteLogPosition(sourceConnection);
+	WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition,
+													   targetNodeConnectionList,
+													   shardSplitPubSubMetadataList);
+
+	BlockWritesToShardList(sourceColocatedShardIntervalList);
+
+	sourcePosition = GetRemoteLogPosition(sourceConnection);
+	WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition,
+													   targetNodeConnectionList,
+													   shardSplitPubSubMetadataList);
+
+
+	/* TODO: Create foreign key constraints and handle partitioned tables */
 }
 
 
 void
 PrintShardSplitPubSubMetadata(ShardSplitPubSubMetadata *shardSplitMetadata)
 {
 	printf("sameer: ShardSplitPubSubMetadata\n");
 	ReplicationSlotInfo *replicationInfo = shardSplitMetadata->slotInfo;
 
 	List *shardIntervalList = shardSplitMetadata->shardIntervalListForSubscription;
 	printf("shardIds: ");
 	ShardInterval *shardInterval = NULL;
 	foreach_ptr(shardInterval, shardIntervalList)
 	{
 		printf("%ld ", shardInterval->shardId);
 	}
 
 	printf("\nManual Username from OID at source: %s \n", GetUserNameFromId(
 			   shardSplitMetadata->tableOwnerId, false));
 	printf("slotname:%s targetNode:%u tableOwner:%s \n", replicationInfo->slotName,
 		   replicationInfo->targetNodeId, replicationInfo->tableOwnerName);
 }
 
 
 static void
-CreateShardSplitPublications(MultiConnection *sourceConnection, List *shardSplitPubSubMetadataList)
+CreateShardSplitPublications(MultiConnection *sourceConnection,
+							 List *shardSplitPubSubMetadataList)
 {
 	ShardSplitPubSubMetadata *shardSplitPubSubMetadata = NULL;
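Each iteration of the loop below creates one publication per
(target node, table owner) pair, covering the parent shard and its split
children. With hypothetical shard and publication names (the exact name format
comes from ShardSplitPublicationName(), whose body is not shown in this hunk),
the generated statement looks roughly like:

    CREATE PUBLICATION citus_shard_split_publication_2_10
        FOR TABLE public.sensors_102008, public.sensors_102010, public.sensors_102011;

 	foreach_ptr(shardSplitPubSubMetadata, 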
shardSplitPubSubMetadataList) - { - uint32 publicationForNodeId = shardSplitPubSubMetadata->slotInfo->targetNodeId; - Oid tableOwnerId = shardSplitPubSubMetadata->tableOwnerId; + ShardSplitPubSubMetadata *shardSplitPubSubMetadata = NULL; + foreach_ptr(shardSplitPubSubMetadata, shardSplitPubSubMetadataList) + { + uint32 publicationForNodeId = shardSplitPubSubMetadata->slotInfo->targetNodeId; + Oid tableOwnerId = shardSplitPubSubMetadata->tableOwnerId; - CreateShardSplitPublicationForNode(sourceConnection, - shardSplitPubSubMetadata->shardIntervalListForSubscription, - publicationForNodeId, - tableOwnerId); - } + CreateShardSplitPublicationForNode(sourceConnection, + shardSplitPubSubMetadata-> + shardIntervalListForSubscription, + publicationForNodeId, + tableOwnerId); + } } + static void -CreateShardSplitSubscriptions(List * targetNodeConnectionList, - List * shardSplitPubSubMetadataList, - WorkerNode * sourceWorkerNode, - char * superUser, - char * databaseName) +CreateShardSplitSubscriptions(List *targetNodeConnectionList, + List *shardSplitPubSubMetadataList, + WorkerNode *sourceWorkerNode, + char *superUser, + char *databaseName) { - MultiConnection * targetConnection = NULL; - ShardSplitPubSubMetadata * shardSplitPubSubMetadata = NULL; - forboth_ptr(targetConnection, targetNodeConnectionList, - shardSplitPubSubMetadata, shardSplitPubSubMetadataList) - { - uint32 publicationForNodeId = shardSplitPubSubMetadata->slotInfo->targetNodeId; - Oid ownerId = shardSplitPubSubMetadata->tableOwnerId; - CreateShardSubscription(targetConnection, - sourceWorkerNode->workerName, - sourceWorkerNode->workerPort, - superUser, - databaseName, - ShardSplitPublicationName(publicationForNodeId, ownerId), - ownerId); - } + MultiConnection *targetConnection = NULL; + ShardSplitPubSubMetadata *shardSplitPubSubMetadata = NULL; + forboth_ptr(targetConnection, targetNodeConnectionList, + shardSplitPubSubMetadata, shardSplitPubSubMetadataList) + { + uint32 publicationForNodeId = shardSplitPubSubMetadata->slotInfo->targetNodeId; + Oid ownerId = shardSplitPubSubMetadata->tableOwnerId; + CreateShardSubscription(targetConnection, + sourceWorkerNode->workerName, + sourceWorkerNode->workerPort, + superUser, + databaseName, + ShardSplitPublicationName(publicationForNodeId, ownerId), + ownerId); + } } static void CreateShardSplitPublicationForNode(MultiConnection *connection, List *shardList, - uint32_t publicationForTargetNodeId, Oid ownerId) + uint32_t publicationForTargetNodeId, Oid ownerId) { + StringInfo createPublicationCommand = makeStringInfo(); + bool prefixWithComma = false; - StringInfo createPublicationCommand = makeStringInfo(); - bool prefixWithComma = false; + appendStringInfo(createPublicationCommand, "CREATE PUBLICATION %s FOR TABLE ", + ShardSplitPublicationName(publicationForTargetNodeId, ownerId)); - appendStringInfo(createPublicationCommand, "CREATE PUBLICATION %s FOR TABLE ", - ShardSplitPublicationName(publicationForTargetNodeId, ownerId)); + ShardInterval *shard = NULL; + foreach_ptr(shard, shardList) + { + char *shardName = ConstructQualifiedShardName(shard); - ShardInterval *shard = NULL; - foreach_ptr(shard, shardList) - { - char *shardName = ConstructQualifiedShardName(shard); + if (prefixWithComma) + { + appendStringInfoString(createPublicationCommand, ","); + } - if (prefixWithComma) - { - appendStringInfoString(createPublicationCommand, ","); - } + appendStringInfoString(createPublicationCommand, shardName); + prefixWithComma = true; + } - appendStringInfoString(createPublicationCommand, 
shardName); - prefixWithComma = true; - } - - ExecuteCriticalRemoteCommand(connection, createPublicationCommand->data); - pfree(createPublicationCommand->data); - pfree(createPublicationCommand); + ExecuteCriticalRemoteCommand(connection, createPublicationCommand->data); + pfree(createPublicationCommand->data); + pfree(createPublicationCommand); } @@ -390,29 +435,37 @@ ShardSplitPublicationName(uint32_t nodeId, Oid ownerId) static void -WaitForShardSplitRelationSubscriptionsBecomeReady(List * targetNodeConnectionList, List * shardSplitPubSubMetadataList) +WaitForShardSplitRelationSubscriptionsBecomeReady(List *targetNodeConnectionList, + List *shardSplitPubSubMetadataList) { - MultiConnection * targetConnection = NULL; - ShardSplitPubSubMetadata * shardSplitPubSubMetadata = NULL; - forboth_ptr(targetConnection, targetNodeConnectionList, - shardSplitPubSubMetadata, shardSplitPubSubMetadataList) - { - Bitmapset *tableOwnerIds = NULL; - tableOwnerIds = bms_add_member(tableOwnerIds, shardSplitPubSubMetadata->tableOwnerId); - WaitForRelationSubscriptionsBecomeReady(targetConnection, tableOwnerIds, SHARD_SPLIT_SUBSCRIPTION_PREFIX); - } + MultiConnection *targetConnection = NULL; + ShardSplitPubSubMetadata *shardSplitPubSubMetadata = NULL; + forboth_ptr(targetConnection, targetNodeConnectionList, + shardSplitPubSubMetadata, shardSplitPubSubMetadataList) + { + Bitmapset *tableOwnerIds = NULL; + tableOwnerIds = bms_add_member(tableOwnerIds, + shardSplitPubSubMetadata->tableOwnerId); + WaitForRelationSubscriptionsBecomeReady(targetConnection, tableOwnerIds, + SHARD_SPLIT_SUBSCRIPTION_PREFIX); + } } + static void -WaitForShardSplitRelationSubscriptionsToBeCaughtUp(XLogRecPtr sourcePosition, List * targetNodeConnectionList, List * shardSplitPubSubMetadataList) +WaitForShardSplitRelationSubscriptionsToBeCaughtUp(XLogRecPtr sourcePosition, + List *targetNodeConnectionList, + List *shardSplitPubSubMetadataList) { - MultiConnection * targetConnection = NULL; - ShardSplitPubSubMetadata * shardSplitPubSubMetadata = NULL; - forboth_ptr(targetConnection, targetNodeConnectionList, - shardSplitPubSubMetadata, shardSplitPubSubMetadataList) - { - Bitmapset *tableOwnerIds = NULL; - tableOwnerIds = bms_add_member(tableOwnerIds, shardSplitPubSubMetadata->tableOwnerId); - WaitForShardSubscriptionToCatchUp(targetConnection, sourcePosition, tableOwnerIds, SHARD_SPLIT_SUBSCRIPTION_PREFIX); - } + MultiConnection *targetConnection = NULL; + ShardSplitPubSubMetadata *shardSplitPubSubMetadata = NULL; + forboth_ptr(targetConnection, targetNodeConnectionList, + shardSplitPubSubMetadata, shardSplitPubSubMetadataList) + { + Bitmapset *tableOwnerIds = NULL; + tableOwnerIds = bms_add_member(tableOwnerIds, + shardSplitPubSubMetadata->tableOwnerId); + WaitForShardSubscriptionToCatchUp(targetConnection, sourcePosition, tableOwnerIds, + SHARD_SPLIT_SUBSCRIPTION_PREFIX); + } } diff --git a/src/include/distributed/multi_logical_replication.h b/src/include/distributed/multi_logical_replication.h index c5a6f9612..f19106ab3 100644 --- a/src/include/distributed/multi_logical_replication.h +++ b/src/include/distributed/multi_logical_replication.h @@ -28,18 +28,19 @@ extern void LogicallyReplicateShards(List *shardList, char *sourceNodeName, extern XLogRecPtr GetRemoteLogPosition(MultiConnection *connection); -extern void -CreateShardSubscription(MultiConnection *connection, char *sourceNodeName, - int sourceNodePort, char *userName, char *databaseName, - char * publicationName, - Oid ownerId); +extern void 
CreateShardSubscription(MultiConnection *connection, char *sourceNodeName,
+								int sourceNodePort, char *userName,
+								char *databaseName,
+								char *publicationName,
+								Oid ownerId);
 extern void WaitForRelationSubscriptionsBecomeReady(MultiConnection *targetConnection,
-													Bitmapset *tableOwnerIds, char * operationPrefix);
+													Bitmapset *tableOwnerIds,
+													char *operationPrefix);
 extern void WaitForShardSubscriptionToCatchUp(MultiConnection *targetConnection,
-											  XLogRecPtr sourcePosition,
-											  Bitmapset *tableOwnerIds,
-											  char * operationPrefix);
+											  XLogRecPtr sourcePosition,
+											  Bitmapset *tableOwnerIds,
+											  char *operationPrefix);
 #define SHARD_MOVE_PUBLICATION_PREFIX "citus_shard_move_publication_"
 #define SHARD_MOVE_SUBSCRIPTION_PREFIX "citus_shard_move_subscription_"
diff --git a/src/include/distributed/shard_split.h b/src/include/distributed/shard_split.h
index dbeb306bf..156d378b2 100644
--- a/src/include/distributed/shard_split.h
+++ b/src/include/distributed/shard_split.h
@@ -41,8 +41,8 @@ typedef struct ShardSplitInfo
 	int32 shardMinValue;    /* min hash value */
 	int32 shardMaxValue;    /* max hash value */
 	uint32_t nodeId;        /* node where child shard is to be placed */
-	uint64 sourceShardId;   /* parent shardId */
-	uint64 splitChildShardId; /* child shardId*/
+	uint64 sourceShardId;       /* parent shardId */
+	uint64 splitChildShardId;   /* child shardId */
 	char slotName[NAMEDATALEN]; /* replication slot name belonging to this node */
 } ShardSplitInfo;
diff --git a/src/include/distributed/shardsplit_logical_replication.h b/src/include/distributed/shardsplit_logical_replication.h
index 652582b6c..e366c5b86 100644
--- a/src/include/distributed/shardsplit_logical_replication.h
+++ b/src/include/distributed/shardsplit_logical_replication.h
@@ -12,16 +12,16 @@
 typedef struct ReplicationSlotInfo
 {
-	uint32 targetNodeId;
-	char * tableOwnerName;
-	char * slotName;
+	uint32 targetNodeId;
+	char *tableOwnerName;
+	char *slotName;
 } ReplicationSlotInfo;
 typedef struct ShardSplitPubSubMetadata
 {
-	List * shardIntervalListForSubscription;
+	List *shardIntervalListForSubscription;
 	Oid tableOwnerId;
-	ReplicationSlotInfo *slotInfo;
+	ReplicationSlotInfo *slotInfo;
 } ShardSplitPubSubMetadata;
 /* key for NodeShardMappingEntry */
@@ -42,20 +42,21 @@ extern uint32 NodeShardMappingHash(const void *key, Size keysize);
 extern int NodeShardMappingHashCompare(const void *left, const void *right, Size keysize);
 HTAB * SetupHashMapForShardInfo(void);
-List * ParseReplicationSlotInfoFromResult(PGresult * result);
+List * ParseReplicationSlotInfoFromResult(PGresult *result);
-extern StringInfo CreateSplitShardReplicationSetupUDF(List *sourceColocatedShardIntervalList,
-													  List *shardGroupSplitIntervalListList,
-													  List *destinationWorkerNodesList);
+extern StringInfo CreateSplitShardReplicationSetupUDF(
+	List *sourceColocatedShardIntervalList,
+	List *shardGroupSplitIntervalListList,
+	List *destinationWorkerNodesList);
 extern List * CreateShardSplitPubSubMetadataList(List *sourceColocatedShardIntervalList,
-												 List *shardGroupSplitIntervalListList,
-												 List *destinationWorkerNodesList,
-												 List *replicationSlotInfoList);
+												 List *shardGroupSplitIntervalListList,
+												 List *destinationWorkerNodesList,
+												 List *replicationSlotInfoList);
 extern void LogicallyReplicateSplitShards(WorkerNode *sourceWorkerNode,
-										  List* shardSplitPubSubMetadataList,
+										  List *shardSplitPubSubMetadataList,
 										  List *sourceColocatedShardIntervalList,
 										  List *shardGroupSplitIntervalListList,
 										  List *destinationWorkerNodesList);
-#endif /* SHARDSPLIT_LOGICAL_REPLICATION_H */
\ No newline at 
end of file +#endif /* SHARDSPLIT_LOGICAL_REPLICATION_H */ diff --git a/src/test/regress/expected/citus_sameer.out b/src/test/regress/expected/citus_sameer.out index 3ac378f06..8f526bd49 100644 --- a/src/test/regress/expected/citus_sameer.out +++ b/src/test/regress/expected/citus_sameer.out @@ -113,8 +113,8 @@ SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \ SELECT citus_split_shard_by_split_points( 1, ARRAY['-1073741826'], - ARRAY[:worker_1_node, :worker_2_node], - 'non_blocking'); + ARRAY[:worker_2_node, :worker_2_node], + 'force_logical'); NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SELECT worker_apply_shard_ddl_command (100, 'CREATE TABLE citus_split_shard_by_split_points_negative.table_to_split (id bigint NOT NULL, value character(1)) ') @@ -165,6 +165,14 @@ NOTICE: issuing COMMIT DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT worker_apply_shard_ddl_command (100, 'CREATE TABLE citus_split_shard_by_split_points_negative.table_to_split (id bigint NOT NULL, value character(1)) ') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT worker_apply_shard_ddl_command (100, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_to_split OWNER TO postgres') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing COMMIT +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SELECT worker_apply_shard_ddl_command (101, 'CREATE TABLE citus_split_shard_by_split_points_negative.table_to_split (id bigint NOT NULL, value character(1)) ') DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SELECT worker_apply_shard_ddl_command (101, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_to_split OWNER TO postgres') @@ -173,14 +181,121 @@ NOTICE: issuing COMMIT DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT worker_apply_shard_ddl_command (102, 'CREATE TABLE citus_split_shard_by_split_points_negative.table_second (id bigint NOT NULL, value character(1)) ') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT worker_apply_shard_ddl_command (102, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_second OWNER TO postgres') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing COMMIT +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SELECT worker_apply_shard_ddl_command (103, 'CREATE TABLE 
citus_split_shard_by_split_points_negative.table_second (id bigint NOT NULL, value character(1)) ') DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SELECT worker_apply_shard_ddl_command (103, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_second OWNER TO postgres') DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing COMMIT DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT * FROM worker_split_shard_replication_setup(ARRAY[ROW(1, 100, '-2147483648', '-1073741826', 16)::citus.split_shard_info,ROW(1, 101, '-1073741825', '2147483647', 18)::citus.split_shard_info,ROW(2, 102, '-2147483648', '-1073741826', 16)::citus.split_shard_info,ROW(2, 103, '-1073741825', '2147483647', 18)::citus.split_shard_info]) +NOTICE: issuing SELECT * FROM worker_split_shard_replication_setup(ARRAY[ROW(1, 100, '-2147483648', '-1073741826', 18)::citus.split_shard_info,ROW(1, 101, '-1073741825', '2147483647', 18)::citus.split_shard_info,ROW(2, 102, '-2147483648', '-1073741826', 18)::citus.split_shard_info,ROW(2, 103, '-1073741825', '2147483647', 18)::citus.split_shard_info]) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing CREATE PUBLICATION citus_shard_split_publication_18_10 FOR TABLE citus_split_shard_by_split_points_negative.table_to_split_100,citus_split_shard_by_split_points_negative.table_to_split_1,citus_split_shard_by_split_points_negative.table_to_split_101,citus_split_shard_by_split_points_negative.table_second_102,citus_split_shard_by_split_points_negative.table_second_2,citus_split_shard_by_split_points_negative.table_second_103 +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SET LOCAL citus.enable_ddl_propagation TO OFF; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing CREATE USER citus_shard_move_subscription_role_10 SUPERUSER IN ROLE postgres +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing COMMIT +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_10 CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_18_10 WITH (citus_use_authinfo=true, enabled=false) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing ALTER SUBSCRIPTION citus_shard_split_subscription_10 OWNER TO citus_shard_move_subscription_role_10 +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SET LOCAL citus.enable_ddl_propagation TO OFF; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing ALTER ROLE citus_shard_move_subscription_role_10 NOSUPERUSER +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing COMMIT +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing ALTER SUBSCRIPTION citus_shard_split_subscription_10 ENABLE +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: 
issuing SELECT count(*) FROM pg_subscription_rel, pg_stat_subscription WHERE srsubid = subid AND srsubstate != 'r' AND subname IN ('citus_shard_split_subscription_10') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT sum(pg_total_relation_size(srrelid)) FROM pg_subscription_rel, pg_stat_subscription WHERE srsubid = subid AND subname IN ('citus_shard_split_subscription_10') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT count(*) FROM pg_subscription_rel, pg_stat_subscription WHERE srsubid = subid AND srsubstate != 'r' AND subname IN ('citus_shard_split_subscription_10') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT pg_current_wal_lsn() +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT min(latest_end_lsn) FROM pg_stat_subscription WHERE subname IN ('citus_shard_split_subscription_10') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT worker_apply_shard_ddl_command (100, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_to_split ADD CONSTRAINT table_to_split_pkey PRIMARY KEY (id)') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT worker_apply_shard_ddl_command (101, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_to_split ADD CONSTRAINT table_to_split_pkey PRIMARY KEY (id)') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT worker_apply_shard_ddl_command (102, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_second ADD CONSTRAINT table_second_pkey PRIMARY KEY (id)') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT worker_apply_shard_ddl_command (103, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_second ADD CONSTRAINT table_second_pkey PRIMARY KEY (id)') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT pg_current_wal_lsn() +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT min(latest_end_lsn) FROM pg_stat_subscription WHERE subname IN ('citus_shard_split_subscription_10') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT lock_shard_metadata(7, ARRAY[1, 2]) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT lock_shard_metadata(7, ARRAY[1, 2]) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT pg_current_wal_lsn() +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT min(latest_end_lsn) FROM pg_stat_subscription WHERE subname IN ('citus_shard_split_subscription_10') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT citus_internal_delete_shard_metadata(1); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT citus_internal_delete_shard_metadata(1); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing DROP 
TABLE IF EXISTS citus_split_shard_by_split_points_negative.table_to_split_1 CASCADE +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT citus_internal_delete_shard_metadata(2); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT citus_internal_delete_shard_metadata(2); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing DROP TABLE IF EXISTS citus_split_shard_by_split_points_negative.table_second_2 CASCADE +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('citus_split_shard_by_split_points_negative.table_to_split'::regclass, 100, 't'::"char", '-2147483648', '-1073741826'), ('citus_split_shard_by_split_points_negative.table_to_split'::regclass, 101, 't'::"char", '-1073741825', '2147483647'), ('citus_split_shard_by_split_points_negative.table_second'::regclass, 102, 't'::"char", '-2147483648', '-1073741826'), ('citus_split_shard_by_split_points_negative.table_second'::regclass, 103, 't'::"char", '-1073741825', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('citus_split_shard_by_split_points_negative.table_to_split'::regclass, 100, 't'::"char", '-2147483648', '-1073741826'), ('citus_split_shard_by_split_points_negative.table_to_split'::regclass, 101, 't'::"char", '-1073741825', '2147483647'), ('citus_split_shard_by_split_points_negative.table_second'::regclass, 102, 't'::"char", '-2147483648', '-1073741826'), ('citus_split_shard_by_split_points_negative.table_second'::regclass, 103, 't'::"char", '-1073741825', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (100, 1, 0, 16, 85), (101, 1, 0, 16, 86), (102, 1, 0, 16, 87), (103, 1, 0, 16, 88)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (100, 1, 0, 16, 85), (101, 1, 0, 16, 86), (102, 1, 0, 16, 87), (103, 1, 0, 16, 88)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing DROP TABLE IF EXISTS citus_split_shard_by_split_points_negative.table_to_split_100 CASCADE +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing DROP TABLE IF EXISTS citus_split_shard_by_split_points_negative.table_to_split_101 CASCADE +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing DROP TABLE IF EXISTS citus_split_shard_by_split_points_negative.table_second_102 CASCADE +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing DROP TABLE IF EXISTS citus_split_shard_by_split_points_negative.table_second_103 CASCADE +DETAIL: on server 
postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing DROP TABLE IF EXISTS citus_split_shard_by_split_points_negative.table_to_split_1 CASCADE +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing DROP TABLE IF EXISTS citus_split_shard_by_split_points_negative.table_second_2 CASCADE +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing PREPARE TRANSACTION 'citus_xx_xx_xx_xx' +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing PREPARE TRANSACTION 'citus_xx_xx_xx_xx' +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing COMMIT PREPARED 'citus_xx_xx_xx_xx' +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing COMMIT PREPARED 'citus_xx_xx_xx_xx' +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +WARNING: connection claimed exclusively at transaction commit WARNING: connection claimed exclusively at transaction commit WARNING: connection claimed exclusively at transaction commit citus_split_shard_by_split_points @@ -196,27 +311,21 @@ SELECT * FROM show_catalog; Schema | Name | Owner --------------------------------------------------------------------- citus_split_shard_by_split_points_negative | table_second | postgres + citus_split_shard_by_split_points_negative | table_second_102 | postgres citus_split_shard_by_split_points_negative | table_second_103 | postgres - citus_split_shard_by_split_points_negative | table_second_2 | postgres citus_split_shard_by_split_points_negative | table_to_split | postgres - citus_split_shard_by_split_points_negative | table_to_split_1 | postgres + citus_split_shard_by_split_points_negative | table_to_split_100 | postgres citus_split_shard_by_split_points_negative | table_to_split_101 | postgres (6 rows) \c - - - :worker_1_port SET search_path TO citus_split_shard_by_split_points_negative; SELECT * FROM show_catalog; - Schema | Name | Owner + Schema | Name | Owner --------------------------------------------------------------------- - citus_split_shard_by_split_points_negative | table_second | postgres - citus_split_shard_by_split_points_negative | table_second_102 | postgres - citus_split_shard_by_split_points_negative | table_second_103 | postgres - citus_split_shard_by_split_points_negative | table_second_2 | postgres - citus_split_shard_by_split_points_negative | table_to_split | postgres - citus_split_shard_by_split_points_negative | table_to_split_1 | postgres - citus_split_shard_by_split_points_negative | table_to_split_100 | postgres - citus_split_shard_by_split_points_negative | table_to_split_101 | postgres -(8 rows) + citus_split_shard_by_split_points_negative | table_second | postgres + citus_split_shard_by_split_points_negative | table_to_split | postgres +(2 rows) SELECT * FROM pg_publication_tables; pubname | schemaname | tablename diff --git a/src/test/regress/sql/citus_sameer.sql b/src/test/regress/sql/citus_sameer.sql index d95c7007c..560bf42a9 100644 --- a/src/test/regress/sql/citus_sameer.sql +++ b/src/test/regress/sql/citus_sameer.sql @@ -18,7 +18,7 @@ SELECT create_distributed_table('table_second', 'id', colocate_with => 'table_to SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset -SELECT * FROM citus_shards; +SELECT * FROM citus_shards; SELECT * FROM pg_dist_shard; SET client_min_messages TO LOG; @@ -49,17 +49,15 @@ SELECT 
citus_split_shard_by_split_points( 1, ARRAY['-1073741826'], ARRAY[:worker_2_node, :worker_2_node], - 'non_blocking'); --- On worker2, we want child shard 2 and dummy shard 1 -- + 'force_logical'); +-- On worker2, we want child shard 2 and dummy shard 1 -- -- on worker1, we want child shard 3 and 1 and dummy shard 2 -- \c - - - :worker_2_port SET search_path TO citus_split_shard_by_split_points_negative; -SELECT * FROM pg_stat_subscription; -SELECT * FROM pg_subscription_rel; SELECT * FROM show_catalog; \c - - - :worker_1_port SET search_path TO citus_split_shard_by_split_points_negative; SELECT * FROM show_catalog; -SELECT * FROM pg_publication_tables; \ No newline at end of file +SELECT * FROM pg_publication_tables;
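
A sanity check that could be appended to citus_sameer.sql in a follow-up (a sketch,
not part of the patch itself; it assumes the shard ids used in this test, parents 1
and 2 split into children 100..103, the show_catalog helper used above, and the
usual :master_port/:worker_*_port psql variables; it also assumes that dropping the
split publication, subscription and replication slots is left to a later change):

\c - - - :master_port
-- metadata swap: parents 1 and 2 should be gone, children 100..103 present
SELECT shardid, logicalrelid, shardminvalue, shardmaxvalue
FROM pg_dist_shard
ORDER BY logicalrelid::text, shardminvalue;

-- replication objects the split currently leaves behind
\c - - - :worker_1_port
SELECT pubname FROM pg_publication;
SELECT slot_name, slot_type FROM pg_replication_slots;
\c - - - :worker_2_port
SELECT subname, subenabled FROM pg_subscription;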