From 1c617e7d1d5d2a155ef664ffe516e3a3cbee6b96 Mon Sep 17 00:00:00 2001 From: Sameer Awasekar Date: Sat, 16 Jul 2022 15:38:51 +0530 Subject: [PATCH] Added code to drop publications and subscriptions --- .../connection/connection_configuration.c | 7 + .../connection/connection_management.c | 17 + .../distributed/operations/shard_split.c | 222 ++++++++++-- .../replication/multi_logical_replication.c | 54 ++- .../shardsplit_logical_replication.c | 340 ++++++++++++------ .../distributed/connection_management.h | 11 +- .../distributed/multi_logical_replication.h | 9 + .../shardsplit_logical_replication.h | 27 +- src/test/regress/expected/citus_sameer.out | 206 ++--------- src/test/regress/sql/citus_sameer.sql | 8 +- 10 files changed, 531 insertions(+), 370 deletions(-) diff --git a/src/backend/distributed/connection/connection_configuration.c b/src/backend/distributed/connection/connection_configuration.c index 88828d4cb..73c925dfa 100644 --- a/src/backend/distributed/connection/connection_configuration.c +++ b/src/backend/distributed/connection/connection_configuration.c @@ -377,6 +377,13 @@ GetConnParams(ConnectionHashKey *key, char ***keywords, char ***values, authParamsIdx++; } + if (key->replication) + { + connKeywords[authParamsIdx] = "replication"; + connValues[authParamsIdx] = "database"; + authParamsIdx++; + } + PQconninfoFree(optionArray); /* final step: add terminal NULL, required by libpq */ diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index df6096321..ce177e978 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -290,6 +290,15 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, strlcpy(key.database, CurrentDatabaseName(), NAMEDATALEN); } + if (flags & EXCLUSIVE_AND_REPLICATION) + { + key.replication = true; + } + else + { + key.replication = false; + } + if (CurrentCoordinatedTransactionState == COORD_TRANS_NONE) { CurrentCoordinatedTransactionState = COORD_TRANS_IDLE; @@ -347,6 +356,10 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, MultiConnection *connection = MemoryContextAllocZero(ConnectionContext, sizeof(MultiConnection)); connection->initilizationState = POOL_STATE_NOT_INITIALIZED; + if (flags & EXCLUSIVE_AND_REPLICATION) + { + connection->claimedExclusively = true; + } dlist_push_tail(entry->connections, &connection->connectionNode); /* these two flags are by nature cannot happen at the same time */ @@ -666,6 +679,7 @@ CloseConnection(MultiConnection *connection) strlcpy(key.hostname, connection->hostname, MAX_NODE_LENGTH); key.port = connection->port; + key.replication = connection->replication; strlcpy(key.user, connection->user, NAMEDATALEN); strlcpy(key.database, connection->database, NAMEDATALEN); @@ -1210,6 +1224,7 @@ ConnectionHashHash(const void *key, Size keysize) hash = hash_combine(hash, hash_uint32(entry->port)); hash = hash_combine(hash, string_hash(entry->user, NAMEDATALEN)); hash = hash_combine(hash, string_hash(entry->database, NAMEDATALEN)); + hash = hash_combine(hash, hash_uint32(entry->replication)); return hash; } @@ -1223,6 +1238,7 @@ ConnectionHashCompare(const void *a, const void *b, Size keysize) if (strncmp(ca->hostname, cb->hostname, MAX_NODE_LENGTH) != 0 || ca->port != cb->port || + ca->replication != cb->replication || strncmp(ca->user, cb->user, NAMEDATALEN) != 0 || strncmp(ca->database, cb->database, NAMEDATALEN) != 0) { @@ -1250,6 +1266,7 @@ StartConnectionEstablishment(MultiConnection *connection, ConnectionHashKey *key connection->port = key->port; strlcpy(connection->database, key->database, NAMEDATALEN); strlcpy(connection->user, key->user, NAMEDATALEN); + connection->replication = key->replication; connection->pgConn = PQconnectStartParams((const char **) entry->keywords, (const char **) entry->values, diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index 86f8b11b1..43fc50337 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -57,7 +57,8 @@ static void CreateDummyShardsForShardGroup(List *sourceColocatedShardIntervalLis List *shardGroupSplitIntervalListList, WorkerNode *sourceWorkerNode, List *workersForPlacementList); -static void SplitShardReplicationSetup(List *sourceColocatedShardIntervalList, +static void SplitShardReplicationSetup(ShardInterval *shardIntervalToSplit, + List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, WorkerNode *sourceWorkerNode, List *workersForPlacementList); @@ -81,10 +82,12 @@ static void NonBlockingShardSplit(SplitOperation splitOperation, static void DoSplitCopy(WorkerNode *sourceShardNode, List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, - List *workersForPlacementList); + List *workersForPlacementList, + char *snapShotName); static StringInfo CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval, List *splitChildrenShardIntervalList, - List *workersForPlacementList); + List *workersForPlacementList, + char *snapShotName); static void InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList, List *workersForPlacementList); static void CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList, @@ -94,9 +97,14 @@ static void TryDropSplitShardsOnFailure(List *shardGroupSplitIntervalListList, static Task * CreateTaskForDDLCommandList(uint64 jobId, List *ddlCommandList, WorkerNode *workerNode); +static StringInfo CreateSplitShardReplicationSetupUDF( + List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, + List *destinationWorkerNodesList); static void AddDummyShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInterval); static void DropDummyShards(void); void TryDropShard(MultiConnection *connection, ShardInterval *shardInterval); +char * CreateTemplateReplicationSlotAndReturnSnapshot(ShardInterval *shardInterval, + WorkerNode *sourceWorkerNode); /* Customize error message strings based on operation type */ static const char *const SplitOperationName[] = @@ -632,7 +640,7 @@ CreateAndCopySplitShardsForShardGroup(WorkerNode *sourceShardNode, workersForPlacementList); DoSplitCopy(sourceShardNode, sourceColocatedShardIntervalList, - shardGroupSplitIntervalListList, workersForPlacementList); + shardGroupSplitIntervalListList, workersForPlacementList, NULL); /* Create auxiliary structures (indexes, stats, replicaindentities, triggers) */ CreateAuxiliaryStructuresForShardGroup(shardGroupSplitIntervalListList, @@ -649,7 +657,8 @@ CreateAndCopySplitShardsForShardGroup(WorkerNode *sourceShardNode, */ static void DoSplitCopy(WorkerNode *sourceShardNode, List *sourceColocatedShardIntervalList, - List *shardGroupSplitIntervalListList, List *destinationWorkerNodesList) + List *shardGroupSplitIntervalListList, List *destinationWorkerNodesList, + char *snapShotName) { ShardInterval *sourceShardIntervalToCopy = NULL; List *splitShardIntervalList = NIL; @@ -661,7 +670,8 @@ DoSplitCopy(WorkerNode *sourceShardNode, List *sourceColocatedShardIntervalList, { StringInfo splitCopyUdfCommand = CreateSplitCopyCommand(sourceShardIntervalToCopy, splitShardIntervalList, - destinationWorkerNodesList); + destinationWorkerNodesList, + snapShotName); Task *splitCopyTask = CreateBasicTask( sourceShardIntervalToCopy->shardId, /* jobId */ @@ -709,7 +719,8 @@ DoSplitCopy(WorkerNode *sourceShardNode, List *sourceColocatedShardIntervalList, static StringInfo CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval, List *splitChildrenShardIntervalList, - List *destinationWorkerNodesList) + List *destinationWorkerNodesList, + char *snapShotName) { StringInfo splitCopyInfoArray = makeStringInfo(); appendStringInfo(splitCopyInfoArray, "ARRAY["); @@ -743,7 +754,31 @@ CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval, sourceShardSplitInterval->shardId, splitCopyInfoArray->data); - return splitCopyUdf; + if (snapShotName == NULL) + { + return splitCopyUdf; + } + + + StringInfo beginTransaction = makeStringInfo(); + appendStringInfo(beginTransaction, + "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;"); + + StringInfo commitTransaction = makeStringInfo(); + appendStringInfo(commitTransaction, "COMMIT;"); + + StringInfo snapShotString = makeStringInfo(); + appendStringInfo(snapShotString, "SET TRANSACTION SNAPSHOT %s;", quote_literal_cstr( + snapShotName)); + + StringInfo snapShottedCopyUDF = makeStringInfo(); + appendStringInfo(snapShottedCopyUDF, "%s%s%s%s", beginTransaction->data, + snapShotString->data, splitCopyUdf->data, commitTransaction->data); + + printf("sameer value:%s\n", snapShottedCopyUDF->data); + printf("sameer actual value :%s \n", splitCopyUdf->data); + + return snapShottedCopyUDF; } @@ -1109,6 +1144,12 @@ NonBlockingShardSplit(SplitOperation splitOperation, shardGroupSplitIntervalListList, workersForPlacementList); + /* char *templateSnapShotName = CreateTemplateReplicationSlotAndReturnSnapshot( */ + /* shardIntervalToSplit, sourceShardToCopyNode); */ + + // DoSplitCopy(sourceShardToCopyNode, sourceColocatedShardIntervalList, + // shardGroupSplitIntervalListList, workersForPlacementList, NULL); + CreateDummyShardsForShardGroup( sourceColocatedShardIntervalList, shardGroupSplitIntervalListList, @@ -1116,32 +1157,32 @@ NonBlockingShardSplit(SplitOperation splitOperation, workersForPlacementList); /*TODO: Refactor this method. BlockWrites is within this as of now, take it out */ - SplitShardReplicationSetup( - sourceColocatedShardIntervalList, - shardGroupSplitIntervalListList, - sourceShardToCopyNode, - workersForPlacementList); + SplitShardReplicationSetup(shardIntervalToSplit, + sourceColocatedShardIntervalList, + shardGroupSplitIntervalListList, + sourceShardToCopyNode, + workersForPlacementList); - /* - * Drop old shards and delete related metadata. Have to do that before - * creating the new shard metadata, because there's cross-checks - * preventing inconsistent metadata (like overlapping shards). - */ - DropShardList(sourceColocatedShardIntervalList); + // /* + // * Drop old shards and delete related metadata. Have to do that before + // * creating the new shard metadata, because there's cross-checks + // * preventing inconsistent metadata (like overlapping shards). + // */ + // DropShardList(sourceColocatedShardIntervalList); - /* Insert new shard and placement metdata */ - InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList, - workersForPlacementList); + // /* Insert new shard and placement metdata */ + // InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList, + // workersForPlacementList); - /* - * Create foreign keys if exists after the metadata changes happening in - * DropShardList() and InsertSplitChildrenShardMetadata() because the foreign - * key creation depends on the new metadata. - */ - CreateForeignKeyConstraints(shardGroupSplitIntervalListList, - workersForPlacementList); + // /* + // * Create foreign keys if exists after the metadata changes happening in + // * DropShardList() and InsertSplitChildrenShardMetadata() because the foreign + // * key creation depends on the new metadata. + // */ + // CreateForeignKeyConstraints(shardGroupSplitIntervalListList, + // workersForPlacementList); - DropDummyShards(); + // DropDummyShards(); } PG_CATCH(); { @@ -1304,7 +1345,8 @@ CreateWorkerForPlacementSet(List *workersForPlacementList) static void -SplitShardReplicationSetup(List *sourceColocatedShardIntervalList, +SplitShardReplicationSetup(ShardInterval *shardIntervalToSplit, + List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, WorkerNode *sourceWorkerNode, List *destinationWorkerNodesList) @@ -1341,17 +1383,37 @@ SplitShardReplicationSetup(List *sourceColocatedShardIntervalList, /* Get replication slot information */ List *replicationSlotInfoList = ParseReplicationSlotInfoFromResult(result); - List *shardSplitPubSubMetadata = CreateShardSplitPubSubMetadataList( + PQclear(result); + ForgetResults(sourceConnection); + + + // /* // / *Create Template replication slot * / */ + // /* char *templateSnapShotName = CreateTemplateReplicationSlotAndReturnSnapshot( */ + // /* shardIntervalToSplit, sourceWorkerNode); */ + // List *shardSplitPubSubMetadata = CreateShardSplitInfoMapForPublication( + // sourceColocatedShardIntervalList, + // shardGroupSplitIntervalListList, + // destinationWorkerNodesList, + // replicationSlotInfoList); + // earlier the above method used to take replication slot info as information + + + + // LogicallyReplicateSplitShards(sourceWorkerNode, shardSplitPubSubMetadata, + // sourceColocatedShardIntervalList, + // shardGroupSplitIntervalListList, + // destinationWorkerNodesList); + + HTAB * shardSplitHashMapForPublication = CreateShardSplitInfoMapForPublication( sourceColocatedShardIntervalList, shardGroupSplitIntervalListList, - destinationWorkerNodesList, - replicationSlotInfoList); + destinationWorkerNodesList); + DropAllShardSplitLeftOvers(sourceWorkerNode, shardSplitHashMapForPublication); - LogicallyReplicateSplitShards(sourceWorkerNode, shardSplitPubSubMetadata, - sourceColocatedShardIntervalList, - shardGroupSplitIntervalListList, - destinationWorkerNodesList); + CreateShardSplitPublicationsTwo(sourceConnection, shardSplitHashMapForPublication); + + //DropAllShardSplitLeftOvers(sourceWorkerNode, shardSplitHashMapForPublication); } @@ -1379,6 +1441,12 @@ AddDummyShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInterval) static void DropDummyShards() { + /* Return if no dummy shards are created */ + if (DummyShardInfoHashMap == NULL) + { + return; + } + HASH_SEQ_STATUS status; hash_seq_init(&status, DummyShardInfoHashMap); @@ -1427,3 +1495,81 @@ TryDropShard(MultiConnection *connection, ShardInterval *shardInterval) dropShardQuery->data, NULL /* pgResult */); } + + +StringInfo +CreateSplitShardReplicationSetupUDF(List *sourceColocatedShardIntervalList, + List *shardGroupSplitIntervalListList, + List *destinationWorkerNodesList) +{ + StringInfo splitChildrenRows = makeStringInfo(); + + ShardInterval *sourceShardIntervalToCopy = NULL; + List *splitChildShardIntervalList = NULL; + bool addComma = false; + forboth_ptr(sourceShardIntervalToCopy, sourceColocatedShardIntervalList, + splitChildShardIntervalList, shardGroupSplitIntervalListList) + { + int64 sourceShardId = sourceShardIntervalToCopy->shardId; + + ShardInterval *splitChildShardInterval = NULL; + WorkerNode *destinationWorkerNode = NULL; + forboth_ptr(splitChildShardInterval, splitChildShardIntervalList, + destinationWorkerNode, destinationWorkerNodesList) + { + if (addComma) + { + appendStringInfo(splitChildrenRows, ","); + } + + StringInfo minValueString = makeStringInfo(); + appendStringInfo(minValueString, "%d", DatumGetInt32( + splitChildShardInterval->minValue)); + + StringInfo maxValueString = makeStringInfo(); + appendStringInfo(maxValueString, "%d", DatumGetInt32( + splitChildShardInterval->maxValue)); + + appendStringInfo(splitChildrenRows, + "ROW(%lu, %lu, %s, %s, %u)::citus.split_shard_info", + sourceShardId, + splitChildShardInterval->shardId, + quote_literal_cstr(minValueString->data), + quote_literal_cstr(maxValueString->data), + destinationWorkerNode->nodeId); + + addComma = true; + } + } + + StringInfo splitShardReplicationUDF = makeStringInfo(); + appendStringInfo(splitShardReplicationUDF, + "SELECT * FROM worker_split_shard_replication_setup(ARRAY[%s]);", + splitChildrenRows->data); + + return splitShardReplicationUDF; +} + + +char * +CreateTemplateReplicationSlotAndReturnSnapshot(ShardInterval *shardInterval, + WorkerNode *sourceWorkerNode) +{ + /*Create Template replication slot */ + int connectionFlags = FORCE_NEW_CONNECTION; + connectionFlags |= EXCLUSIVE_AND_REPLICATION; + + MultiConnection *sourceConnection = GetNodeUserDatabaseConnection(connectionFlags, + sourceWorkerNode-> + workerName, + sourceWorkerNode-> + workerPort, + CitusExtensionOwnerName(), + get_database_name( + MyDatabaseId)); + + char *snapShotName = DropExistingIfAnyAndCreateTemplateReplicationSlot(shardInterval, + sourceConnection); + + return snapShotName; +} diff --git a/src/backend/distributed/replication/multi_logical_replication.c b/src/backend/distributed/replication/multi_logical_replication.c index 31029470a..5b89f900e 100644 --- a/src/backend/distributed/replication/multi_logical_replication.c +++ b/src/backend/distributed/replication/multi_logical_replication.c @@ -152,12 +152,8 @@ static void DropAllShardMovePublications(MultiConnection *connection); static void DropAllShardMoveUsers(MultiConnection *connection); static char * ShardSubscriptionNamesValueList(Bitmapset *tableOwnerIds, char *operationPrefix); -static void DropShardMoveSubscription(MultiConnection *connection, - char *subscriptionName); static void DropShardMoveReplicationSlot(MultiConnection *connection, char *publicationName); -static void DropShardMovePublication(MultiConnection *connection, char *publicationName); -static void DropShardMoveUser(MultiConnection *connection, char *username); /* * LogicallyReplicateShards replicates a list of shards from one node to another @@ -1094,7 +1090,7 @@ DropShardMovePublications(MultiConnection *connection, Bitmapset *tableOwnerIds) */ DropShardMoveReplicationSlot(connection, ShardSubscriptionName(ownerId, SHARD_MOVE_SUBSCRIPTION_PREFIX)); - DropShardMovePublication(connection, ShardMovePublicationName(ownerId)); + DropShardPublication(connection, ShardMovePublicationName(ownerId)); } } @@ -1117,11 +1113,11 @@ DropShardMoveReplicationSlot(MultiConnection *connection, char *replicationSlotN /* - * DropShardMovePublication drops the publication with the given name if it + * DropShardPublication drops the publication with the given name if it * exists. */ -static void -DropShardMovePublication(MultiConnection *connection, char *publicationName) +void +DropShardPublication(MultiConnection *connection, char *publicationName) { ExecuteCriticalRemoteCommand(connection, psprintf( "DROP PUBLICATION IF EXISTS %s", @@ -1166,13 +1162,13 @@ ShardSubscriptionName(Oid ownerId, char *operationPrefix) /* - * ShardMoveSubscriptionRole returns the name of the role used by the + * ShardSubscriptionRole returns the name of the role used by the * subscription that subscribes to the tables of the given owner. */ static char * -ShardMoveSubscriptionRole(Oid ownerId) +ShardSubscriptionRole(Oid ownerId, char * operationPrefix) { - return psprintf("%s%i", SHARD_MOVE_SUBSCRIPTION_ROLE_PREFIX, ownerId); + return psprintf("%s%i", operationPrefix, ownerId); } @@ -1181,7 +1177,7 @@ ShardMoveSubscriptionRole(Oid ownerId) * strings. This query is executed on the connection and the function then * returns the results of the query in a List. */ -static List * +List * GetQueryResultStringList(MultiConnection *connection, char *query) { bool raiseInterrupts = true; @@ -1242,7 +1238,7 @@ DropAllShardMoveSubscriptions(MultiConnection *connection) char *subscriptionName; foreach_ptr(subscriptionName, subscriptionNameList) { - DropShardMoveSubscription(connection, subscriptionName); + DropShardSubscription(connection, subscriptionName); } } @@ -1263,7 +1259,7 @@ DropAllShardMoveUsers(MultiConnection *connection) char *username; foreach_ptr(username, usernameList) { - DropShardMoveUser(connection, username); + DropShardUser(connection, username); } } @@ -1305,7 +1301,7 @@ DropAllShardMovePublications(MultiConnection *connection) char *publicationName; foreach_ptr(publicationName, publicationNameList) { - DropShardMovePublication(connection, publicationName); + DropShardPublication(connection, publicationName); } } @@ -1325,22 +1321,22 @@ DropShardMoveSubscriptions(MultiConnection *connection, Bitmapset *tableOwnerIds int ownerId = -1; while ((ownerId = bms_next_member(tableOwnerIds, ownerId)) >= 0) { - DropShardMoveSubscription(connection, ShardSubscriptionName(ownerId, + DropShardSubscription(connection, ShardSubscriptionName(ownerId, SHARD_MOVE_SUBSCRIPTION_PREFIX)); - DropShardMoveUser(connection, ShardMoveSubscriptionRole(ownerId)); + DropShardUser(connection, ShardSubscriptionRole(ownerId, SHARD_MOVE_SUBSCRIPTION_ROLE_PREFIX)); } } /* - * DropShardMoveSubscription drops subscription with the given name on the + * DropShardSubscription drops subscription with the given name on the * subscriber node. Note that, it also drops the replication slot on the * publisher node if it can drop the slot as well with the DROP SUBSCRIPTION * command. Otherwise, only the subscription will be deleted with DROP * SUBSCRIPTION via the connection. */ -static void -DropShardMoveSubscription(MultiConnection *connection, char *subscriptionName) +void +DropShardSubscription(MultiConnection *connection, char *subscriptionName) { PGresult *result = NULL; @@ -1393,10 +1389,10 @@ DropShardMoveSubscription(MultiConnection *connection, char *subscriptionName) /* - * DropShardMoveUser drops the user with the given name if it exists. + * DropShardUser drops the user with the given name if it exists. */ -static void -DropShardMoveUser(MultiConnection *connection, char *username) +void +DropShardUser(MultiConnection *connection, char *username) { /* * The DROP USER command should not propagate, so we temporarily disable @@ -1488,7 +1484,7 @@ CreateShardMoveSubscriptions(MultiConnection *connection, char *sourceNodeName, "SET LOCAL citus.enable_ddl_propagation TO OFF;", psprintf( "CREATE USER %s SUPERUSER IN ROLE %s", - ShardMoveSubscriptionRole(ownerId), + ShardSubscriptionRole(ownerId, SHARD_MOVE_SUBSCRIPTION_ROLE_PREFIX), GetUserNameFromId(ownerId, false) ))); @@ -1512,7 +1508,7 @@ CreateShardMoveSubscriptions(MultiConnection *connection, char *sourceNodeName, "ALTER SUBSCRIPTION %s OWNER TO %s", ShardSubscriptionName(ownerId, SHARD_MOVE_SUBSCRIPTION_PREFIX), - ShardMoveSubscriptionRole(ownerId) + ShardSubscriptionRole(ownerId, SHARD_MOVE_SUBSCRIPTION_ROLE_PREFIX) )); /* @@ -1525,7 +1521,7 @@ CreateShardMoveSubscriptions(MultiConnection *connection, char *sourceNodeName, "SET LOCAL citus.enable_ddl_propagation TO OFF;", psprintf( "ALTER ROLE %s NOSUPERUSER", - ShardMoveSubscriptionRole(ownerId) + ShardSubscriptionRole(ownerId, SHARD_MOVE_SUBSCRIPTION_ROLE_PREFIX) ))); ExecuteCriticalRemoteCommand(connection, psprintf( @@ -2098,7 +2094,7 @@ CreateShardSubscription(MultiConnection *connection, char *sourceNodeName, "SET LOCAL citus.enable_ddl_propagation TO OFF;", psprintf( "CREATE USER %s SUPERUSER IN ROLE %s", - ShardMoveSubscriptionRole(ownerId), + ShardSubscriptionRole(ownerId, SHARD_SPLIT_SUBSCRIPTION_ROLE_PREFIX), GetUserNameFromId(ownerId, false) ))); @@ -2122,7 +2118,7 @@ CreateShardSubscription(MultiConnection *connection, char *sourceNodeName, "ALTER SUBSCRIPTION %s OWNER TO %s", ShardSubscriptionName(ownerId, SHARD_SPLIT_SUBSCRIPTION_PREFIX), - ShardMoveSubscriptionRole(ownerId) + ShardSubscriptionRole(ownerId, SHARD_SPLIT_SUBSCRIPTION_ROLE_PREFIX) )); /* @@ -2135,7 +2131,7 @@ CreateShardSubscription(MultiConnection *connection, char *sourceNodeName, "SET LOCAL citus.enable_ddl_propagation TO OFF;", psprintf( "ALTER ROLE %s NOSUPERUSER", - ShardMoveSubscriptionRole(ownerId) + ShardSubscriptionRole(ownerId, SHARD_SPLIT_SUBSCRIPTION_ROLE_PREFIX) ))); ExecuteCriticalRemoteCommand(connection, psprintf( diff --git a/src/backend/distributed/shardsplit/shardsplit_logical_replication.c b/src/backend/distributed/shardsplit/shardsplit_logical_replication.c index 8685d9d81..60c9add64 100644 --- a/src/backend/distributed/shardsplit/shardsplit_logical_replication.c +++ b/src/backend/distributed/shardsplit/shardsplit_logical_replication.c @@ -39,77 +39,32 @@ static void CreateShardSplitPublicationForNode(MultiConnection *connection, tableOwner); static void CreateShardSplitPublications(MultiConnection *sourceConnection, List *shardSplitPubSubMetadataList); + static void CreateShardSplitSubscriptions(List *targetNodeConnectionList, List *shardSplitPubSubMetadataList, WorkerNode *sourceWorkerNode, char *superUser, char *databaseName); static void WaitForShardSplitRelationSubscriptionsBecomeReady( - List *targetNodeConnectionList, List *shardSplitPubSubMetadataList); + List *shardSplitPubSubMetadataList); static void WaitForShardSplitRelationSubscriptionsToBeCaughtUp(XLogRecPtr sourcePosition, - List * - targetNodeConnectionList, List * shardSplitPubSubMetadataList); static char * ShardSplitPublicationName(uint32_t nodeId, Oid ownerId); +List * CreateTargetNodeConnectionsForShardSplit(List *shardSplitPubSubMetadataList, int + connectionFlags, char *user, + char *databaseName); + + +static void DropAllShardSplitSubscriptions(MultiConnection * cleanupConnection); +static void DropAllShardSplitPublications(MultiConnection * cleanupConnection); +static void DropAllShardSplitUsers(MultiConnection * cleanupConnection); +static void DropAllReplicationSlots(List * replicationSlotInfo); /*used for debuggin. Remove later*/ void PrintShardSplitPubSubMetadata(ShardSplitPubSubMetadata *shardSplitMetadata); -StringInfo -CreateSplitShardReplicationSetupUDF(List *sourceColocatedShardIntervalList, - List *shardGroupSplitIntervalListList, - List *destinationWorkerNodesList) -{ - StringInfo splitChildrenRows = makeStringInfo(); - - ShardInterval *sourceShardIntervalToCopy = NULL; - List *splitChildShardIntervalList = NULL; - bool addComma = false; - forboth_ptr(sourceShardIntervalToCopy, sourceColocatedShardIntervalList, - splitChildShardIntervalList, shardGroupSplitIntervalListList) - { - int64 sourceShardId = sourceShardIntervalToCopy->shardId; - - ShardInterval *splitChildShardInterval = NULL; - WorkerNode *destinationWorkerNode = NULL; - forboth_ptr(splitChildShardInterval, splitChildShardIntervalList, - destinationWorkerNode, destinationWorkerNodesList) - { - if (addComma) - { - appendStringInfo(splitChildrenRows, ","); - } - - StringInfo minValueString = makeStringInfo(); - appendStringInfo(minValueString, "%d", DatumGetInt32( - splitChildShardInterval->minValue)); - - StringInfo maxValueString = makeStringInfo(); - appendStringInfo(maxValueString, "%d", DatumGetInt32( - splitChildShardInterval->maxValue)); - - appendStringInfo(splitChildrenRows, - "ROW(%lu, %lu, %s, %s, %u)::citus.split_shard_info", - sourceShardId, - splitChildShardInterval->shardId, - quote_literal_cstr(minValueString->data), - quote_literal_cstr(maxValueString->data), - destinationWorkerNode->nodeId); - - addComma = true; - } - } - - StringInfo splitShardReplicationUDF = makeStringInfo(); - appendStringInfo(splitShardReplicationUDF, - "SELECT * FROM worker_split_shard_replication_setup(ARRAY[%s])", - splitChildrenRows->data); - - return splitShardReplicationUDF; -} - List * ParseReplicationSlotInfoFromResult(PGresult *result) @@ -117,6 +72,8 @@ ParseReplicationSlotInfoFromResult(PGresult *result) int64 rowCount = PQntuples(result); int64 colCount = PQnfields(result); + printf("sameer row count %d col count: %d\n ", rowCount, colCount); + List *replicationSlotInfoList = NIL; for (int64 rowIndex = 0; rowIndex < rowCount; rowIndex++) { @@ -134,16 +91,17 @@ ParseReplicationSlotInfoFromResult(PGresult *result) replicationSlotInfoList = lappend(replicationSlotInfoList, replicationSlotInfo); } - /*TODO(saawasek): size of this should not be NULL */ + /*TODO(saawasek): size of this should not be NULL + * Also check for warning + */ return replicationSlotInfoList; } -List * -CreateShardSplitPubSubMetadataList(List *sourceColocatedShardIntervalList, +HTAB * +CreateShardSplitInfoMapForPublication(List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, - List *destinationWorkerNodesList, - List *replicationSlotInfoList) + List *destinationWorkerNodesList) { ShardInfoHashMapForPublications = SetupHashMapForShardInfo(); ShardInterval *sourceShardIntervalToCopy = NULL; @@ -170,26 +128,7 @@ CreateShardSplitPubSubMetadataList(List *sourceColocatedShardIntervalList, } } - /* Populate pubsub meta data*/ - HASH_SEQ_STATUS status; - hash_seq_init(&status, ShardInfoHashMapForPublications); - - List *shardSplitPubSubMetadataList = NIL; - NodeShardMappingEntry *entry = NULL; - while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL) - { - uint32_t nodeId = entry->key.nodeId; - uint32_t tableOwnerId = entry->key.tableOwnerId; - ShardSplitPubSubMetadata *shardSplitPubSubMetadata = - CreateShardSplitPubSubMetadata(tableOwnerId, nodeId, - entry->shardSplitInfoList, - replicationSlotInfoList); - - shardSplitPubSubMetadataList = lappend(shardSplitPubSubMetadataList, - shardSplitPubSubMetadata); - } - - return shardSplitPubSubMetadataList; + return ShardInfoHashMapForPublications; } @@ -256,6 +195,7 @@ CreateShardSplitPubSubMetadata(Oid tableOwnerId, uint32 nodeId, List *shardInter } } + PrintShardSplitPubSubMetadata(shardSplitPubSubMetadata); return shardSplitPubSubMetadata; } @@ -279,21 +219,10 @@ LogicallyReplicateSplitShards(WorkerNode *sourceWorkerNode, ClaimConnectionExclusively(sourceConnection); - List *targetNodeConnectionList = NIL; - ShardSplitPubSubMetadata *shardSplitPubSubMetadata = NULL; - foreach_ptr(shardSplitPubSubMetadata, shardSplitPubSubMetadataList) - { - uint32 targetWorkerNodeId = shardSplitPubSubMetadata->slotInfo->targetNodeId; - WorkerNode *targetWorkerNode = FindNodeWithNodeId(targetWorkerNodeId, false); - - MultiConnection *targetConnection = - GetNodeUserDatabaseConnection(connectionFlags, targetWorkerNode->workerName, - targetWorkerNode->workerPort, - superUser, databaseName); - ClaimConnectionExclusively(targetConnection); - - targetNodeConnectionList = lappend(targetNodeConnectionList, targetConnection); - } + List *targetNodeConnectionList = CreateTargetNodeConnectionsForShardSplit( + shardSplitPubSubMetadataList, + connectionFlags, + superUser, databaseName); /* create publications */ CreateShardSplitPublications(sourceConnection, shardSplitPubSubMetadataList); @@ -304,12 +233,10 @@ LogicallyReplicateSplitShards(WorkerNode *sourceWorkerNode, superUser, databaseName); - WaitForShardSplitRelationSubscriptionsBecomeReady(targetNodeConnectionList, - shardSplitPubSubMetadataList); + WaitForShardSplitRelationSubscriptionsBecomeReady(shardSplitPubSubMetadataList); XLogRecPtr sourcePosition = GetRemoteLogPosition(sourceConnection); WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition, - targetNodeConnectionList, shardSplitPubSubMetadataList); CreateAuxiliaryStructuresForShardGroup(shardGroupSplitIntervalListList, @@ -317,17 +244,14 @@ LogicallyReplicateSplitShards(WorkerNode *sourceWorkerNode, sourcePosition = GetRemoteLogPosition(sourceConnection); WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition, - targetNodeConnectionList, shardSplitPubSubMetadataList); BlockWritesToShardList(sourceColocatedShardIntervalList); sourcePosition = GetRemoteLogPosition(sourceConnection); WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition, - targetNodeConnectionList, shardSplitPubSubMetadataList); - /*TOOD : Create foreign key constraints and handle partitioned tables*/ } @@ -435,37 +359,225 @@ ShardSplitPublicationName(uint32_t nodeId, Oid ownerId) static void -WaitForShardSplitRelationSubscriptionsBecomeReady(List *targetNodeConnectionList, - List *shardSplitPubSubMetadataList) +WaitForShardSplitRelationSubscriptionsBecomeReady(List *shardSplitPubSubMetadataList) { - MultiConnection *targetConnection = NULL; ShardSplitPubSubMetadata *shardSplitPubSubMetadata = NULL; - forboth_ptr(targetConnection, targetNodeConnectionList, - shardSplitPubSubMetadata, shardSplitPubSubMetadataList) + foreach_ptr(shardSplitPubSubMetadata, shardSplitPubSubMetadataList) { Bitmapset *tableOwnerIds = NULL; tableOwnerIds = bms_add_member(tableOwnerIds, shardSplitPubSubMetadata->tableOwnerId); - WaitForRelationSubscriptionsBecomeReady(targetConnection, tableOwnerIds, - SHARD_SPLIT_SUBSCRIPTION_PREFIX); + WaitForRelationSubscriptionsBecomeReady( + shardSplitPubSubMetadata->targetNodeConnection, tableOwnerIds, + SHARD_SPLIT_SUBSCRIPTION_PREFIX); } } static void WaitForShardSplitRelationSubscriptionsToBeCaughtUp(XLogRecPtr sourcePosition, - List *targetNodeConnectionList, List *shardSplitPubSubMetadataList) { - MultiConnection *targetConnection = NULL; ShardSplitPubSubMetadata *shardSplitPubSubMetadata = NULL; - forboth_ptr(targetConnection, targetNodeConnectionList, - shardSplitPubSubMetadata, shardSplitPubSubMetadataList) + foreach_ptr(shardSplitPubSubMetadata, shardSplitPubSubMetadataList) { Bitmapset *tableOwnerIds = NULL; tableOwnerIds = bms_add_member(tableOwnerIds, shardSplitPubSubMetadata->tableOwnerId); - WaitForShardSubscriptionToCatchUp(targetConnection, sourcePosition, tableOwnerIds, + + WaitForShardSubscriptionToCatchUp(shardSplitPubSubMetadata->targetNodeConnection, + sourcePosition, + tableOwnerIds, SHARD_SPLIT_SUBSCRIPTION_PREFIX); } } + + +List * +CreateTargetNodeConnectionsForShardSplit(List *shardSplitPubSubMetadataList, int + connectionFlags, char *user, char *databaseName) +{ + List *targetNodeConnectionList = NIL; + ShardSplitPubSubMetadata *shardSplitPubSubMetadata = NULL; + foreach_ptr(shardSplitPubSubMetadata, shardSplitPubSubMetadataList) + { + uint32 targetWorkerNodeId = shardSplitPubSubMetadata->slotInfo->targetNodeId; + WorkerNode *targetWorkerNode = FindNodeWithNodeId(targetWorkerNodeId, false); + + MultiConnection *targetConnection = + GetNodeUserDatabaseConnection(connectionFlags, targetWorkerNode->workerName, + targetWorkerNode->workerPort, + user, + databaseName); + ClaimConnectionExclusively(targetConnection); + + targetNodeConnectionList = lappend(targetNodeConnectionList, targetConnection); + shardSplitPubSubMetadata->targetNodeConnection = targetConnection; + } + + return targetNodeConnectionList; +} + + +char * +DropExistingIfAnyAndCreateTemplateReplicationSlot(ShardInterval *shardIntervalToSplit, + MultiConnection *sourceConnection) +{ + StringInfo splitTemplateReplicationSlotName = makeStringInfo(); + appendStringInfo(splitTemplateReplicationSlotName, + "citus_split_replicationslot_for_shard_%lu", + shardIntervalToSplit->shardId); + + /* + * To ensure SPLIT is idempotent drop any existing slot from + * previous failed operation. + */ + StringInfo dropReplicationSlotCommand = makeStringInfo(); + appendStringInfo(dropReplicationSlotCommand, "SELECT pg_drop_replication_slot('%s')", + splitTemplateReplicationSlotName->data); + + /* The Drop command can fail so ignore the response / result and proceed anyways */ + PGresult *result = NULL; + int response = ExecuteOptionalRemoteCommand(sourceConnection, + dropReplicationSlotCommand->data, + &result); + + PQclear(result); + ForgetResults(sourceConnection); + + /* + * PG 13 Function: pg_create_logical_replication_slot ( slot_name name, plugin name [, temporary boolean ] ) + * PG 14 Function: pg_create_logical_replication_slot (slot_name name, plugin name [, temporary boolean, two_phase boolean ] ) + * Return: record ( slot_name name, lsn pg_lsn ) + * Note: Temporary slot are only live during the session's lifetime causing them to be dropped when the session ends. + * In our invocation 'two_phase' support is disabled. + */ + StringInfo createReplicationSlotCommand = makeStringInfo(); + + /* TODO(niupre): Replace pgoutput with an appropriate name (to be introduced in by saawasek's PR) */ + /*TODO(saawasek): Try creating TEMPORAL once basic flow is ready and we have a testcase*/ + appendStringInfo(createReplicationSlotCommand, + "CREATE_REPLICATION_SLOT %s LOGICAL citus EXPORT_SNAPSHOT;", + splitTemplateReplicationSlotName->data); + + response = ExecuteOptionalRemoteCommand(sourceConnection, + createReplicationSlotCommand->data, &result); + + if (response != RESPONSE_OKAY || !IsResponseOK(result) || PQntuples(result) != 1) + { + ReportResultError(sourceConnection, result, ERROR); + } + + /*'snapshot_name' is second column where index starts from zero. + * We're using the pstrdup to copy the data into the current memory context */ + char *snapShotName = pstrdup(PQgetvalue(result, 0, 2 /* columIndex */)); + printf("Sameer sanpshot name %s \n", snapShotName); + return snapShotName; +} + + +void DropAllShardSplitLeftOvers(WorkerNode* sourceNode, HTAB * shardSplitHashMapForPubSub) +{ + char *superUser = CitusExtensionOwnerName(); + char *databaseName = get_database_name(MyDatabaseId); + + /* + * We open new connections to all nodes. The reason for this is that + * operations on subscriptions and publications cannot be run in a + * transaction. By forcing a new connection we make sure no transaction is + * active on the connection. + */ + int connectionFlags = FORCE_NEW_CONNECTION; + + HASH_SEQ_STATUS statusForSubscription; + hash_seq_init(&statusForSubscription, shardSplitHashMapForPubSub); + + NodeShardMappingEntry *entry = NULL; + while ((entry = (NodeShardMappingEntry *) hash_seq_search(&statusForSubscription)) != NULL) + { + uint32_t nodeId = entry->key.nodeId; + WorkerNode *workerNode = FindNodeWithNodeId(nodeId, false /*missingOk*/); + MultiConnection *cleanupConnection = GetNodeUserDatabaseConnection( + connectionFlags, workerNode->workerName, workerNode->workerPort, + superUser, databaseName); + + DropAllShardSplitSubscriptions(cleanupConnection); + DropAllShardSplitUsers(cleanupConnection); + + CloseConnection(cleanupConnection); + } + + /*Drop all shard split publications at the source*/ + MultiConnection *sourceNodeConnection = GetNodeUserDatabaseConnection( + connectionFlags, sourceNode->workerName, sourceNode->workerPort, + superUser, databaseName); + + DropAllShardSplitPublications(sourceNodeConnection); + + CloseConnection(sourceNodeConnection); +} + +void DropAllShardSplitSubscriptions(MultiConnection * cleanupConnection) +{ + char *query = psprintf( + "SELECT subname FROM pg_subscription " + "WHERE subname LIKE %s || '%%'", + quote_literal_cstr(SHARD_SPLIT_SUBSCRIPTION_PREFIX)); + List *subscriptionNameList = GetQueryResultStringList(cleanupConnection, query); + char *subscriptionName = NULL; + foreach_ptr(subscriptionName, subscriptionNameList) + { + DropShardSubscription(cleanupConnection, subscriptionName); + } +} + +static void +DropAllShardSplitPublications(MultiConnection *connection) +{ + char *query = psprintf( + "SELECT pubname FROM pg_publication " + "WHERE pubname LIKE %s || '%%'", + quote_literal_cstr(SHARD_SPLIT_PUBLICATION_PREFIX)); + List *publicationNameList = GetQueryResultStringList(connection, query); + char *publicationName; + foreach_ptr(publicationName, publicationNameList) + { + DropShardPublication(connection, publicationName); + } +} + +void +DropAllShardSplitUsers(MultiConnection *connection) +{ + char *query = psprintf( + "SELECT rolname FROM pg_roles " + "WHERE rolname LIKE %s || '%%'", + quote_literal_cstr(SHARD_SPLIT_SUBSCRIPTION_ROLE_PREFIX)); + List *usernameList = GetQueryResultStringList(connection, query); + char *username; + foreach_ptr(username, usernameList) + { + DropShardUser(connection, username); + } +} + +void CreateShardSplitPublicationsTwo(MultiConnection *sourceConnection, + HTAB * shardInfoHashMapForPublication) + +{ + HASH_SEQ_STATUS status; + hash_seq_init(&status, shardInfoHashMapForPublication); + + NodeShardMappingEntry *entry = NULL; + while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL) + { + uint32 nodeId = entry->key.nodeId; + uint32 tableOwnerId = entry->key.tableOwnerId; + List * shardListForPublication = entry->shardSplitInfoList; + + CreateShardSplitPublicationForNode(sourceConnection, + shardListForPublication, + nodeId, + tableOwnerId); + } +} \ No newline at end of file diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index 5c2e07fe8..2e6b62e71 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -97,7 +97,12 @@ enum MultiConnectionMode * establishments may be suspended until a connection slot is available to * the remote host. */ - WAIT_FOR_CONNECTION = 1 << 7 + WAIT_FOR_CONNECTION = 1 << 7, + + /* + * Force Replication flags + */ + EXCLUSIVE_AND_REPLICATION = 1 << 8 }; @@ -187,6 +192,9 @@ typedef struct MultiConnection /* number of bytes sent to PQputCopyData() since last flush */ uint64 copyBytesWrittenSinceLastFlush; + /* replication option */ + bool replication; + MultiConnectionStructInitializationState initilizationState; } MultiConnection; @@ -207,6 +215,7 @@ typedef struct ConnectionHashKey int32 port; char user[NAMEDATALEN]; char database[NAMEDATALEN]; + bool replication; } ConnectionHashKey; /* hash entry */ diff --git a/src/include/distributed/multi_logical_replication.h b/src/include/distributed/multi_logical_replication.h index f19106ab3..5806c0836 100644 --- a/src/include/distributed/multi_logical_replication.h +++ b/src/include/distributed/multi_logical_replication.h @@ -27,6 +27,14 @@ extern void LogicallyReplicateShards(List *shardList, char *sourceNodeName, int targetNodePort); extern XLogRecPtr GetRemoteLogPosition(MultiConnection *connection); +extern List * +GetQueryResultStringList(MultiConnection *connection, char *query); + +extern void DropShardSubscription(MultiConnection *connection, + char *subscriptionName); +extern void DropShardPublication(MultiConnection *connection, char *publicationName); + +extern void DropShardUser(MultiConnection *connection, char *username); extern void CreateShardSubscription(MultiConnection *connection, char *sourceNodeName, int sourceNodePort, char *userName, @@ -47,4 +55,5 @@ extern void WaitForShardSubscriptionToCatchUp(MultiConnection *targetConnection, #define SHARD_MOVE_SUBSCRIPTION_ROLE_PREFIX "citus_shard_move_subscription_role_" #define SHARD_SPLIT_PUBLICATION_PREFIX "citus_shard_split_publication_" #define SHARD_SPLIT_SUBSCRIPTION_PREFIX "citus_shard_split_subscription_" +#define SHARD_SPLIT_SUBSCRIPTION_ROLE_PREFIX "citus_shard_split_subscription_role_" #endif /* MULTI_LOGICAL_REPLICATION_H_ */ diff --git a/src/include/distributed/shardsplit_logical_replication.h b/src/include/distributed/shardsplit_logical_replication.h index e366c5b86..775db1623 100644 --- a/src/include/distributed/shardsplit_logical_replication.h +++ b/src/include/distributed/shardsplit_logical_replication.h @@ -22,6 +22,13 @@ typedef struct ShardSplitPubSubMetadata List *shardIntervalListForSubscription; Oid tableOwnerId; ReplicationSlotInfo *slotInfo; + + /* + * Exclusively claimed connection for subscription. + * The target node of subscription + * is pointed by ReplicationSlotInfo. + */ + MultiConnection *targetNodeConnection; } ShardSplitPubSubMetadata; /* key for NodeShardMappingEntry */ @@ -44,20 +51,24 @@ HTAB * SetupHashMapForShardInfo(void); List * ParseReplicationSlotInfoFromResult(PGresult *result); -extern StringInfo CreateSplitShardReplicationSetupUDF( - List *sourceColocatedShardIntervalList, - List * - shardGroupSplitIntervalListList, - List *destinationWorkerNodesList); -extern List * CreateShardSplitPubSubMetadataList(List *sourceColocatedShardIntervalList, +extern HTAB * CreateShardSplitInfoMapForPublication(List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, - List *destinationWorkerNodesList, - List *replicationSlotInfoList); + List *destinationWorkerNodesList); extern void LogicallyReplicateSplitShards(WorkerNode *sourceWorkerNode, List *shardSplitPubSubMetadataList, List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, List *destinationWorkerNodesList); + +extern void CreateShardSplitPublicationsTwo(MultiConnection *sourceConnection, + HTAB * shardInfoHashMapForPublication); + +extern void DropAllShardSplitLeftOvers(WorkerNode* sourceNode, HTAB * shardSplitMapOfPublications); + +extern char * DropExistingIfAnyAndCreateTemplateReplicationSlot( + ShardInterval *shardIntervalToSplit, + MultiConnection * + sourceConnection); #endif /* SHARDSPLIT_LOGICAL_REPLICATION_H */ diff --git a/src/test/regress/expected/citus_sameer.out b/src/test/regress/expected/citus_sameer.out index 8f526bd49..7ce89ef7c 100644 --- a/src/test/regress/expected/citus_sameer.out +++ b/src/test/regress/expected/citus_sameer.out @@ -16,39 +16,26 @@ SELECT create_distributed_table('table_to_split','id'); (1 row) -SELECT create_distributed_table('table_second', 'id', colocate_with => 'table_to_split'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - +--SELECT create_distributed_table('table_second', 'id', colocate_with => 'table_to_split'); SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset SELECT * FROM citus_shards; table_name | shardid | shard_name | citus_table_type | colocation_id | nodename | nodeport | shard_size --------------------------------------------------------------------- - table_second | 2 | citus_split_shard_by_split_points_negative.table_second_2 | distributed | 1390006 | localhost | 9997 | 0 - table_second | 2 | citus_split_shard_by_split_points_negative.table_second_2 | distributed | 1390006 | localhost | 8888 | 0 - table_second | 2 | citus_split_shard_by_split_points_negative.table_second_2 | distributed | 1390006 | localhost | 8887 | 0 - table_second | 2 | citus_split_shard_by_split_points_negative.table_second_2 | distributed | 1390006 | localhost | 9995 | 0 - table_second | 2 | citus_split_shard_by_split_points_negative.table_second_2 | distributed | 1390006 | localhost | 9992 | 0 - table_second | 2 | citus_split_shard_by_split_points_negative.table_second_2 | distributed | 1390006 | localhost | 57637 | 0 - table_second | 2 | citus_split_shard_by_split_points_negative.table_second_2 | distributed | 1390006 | localhost | 9998 | 0 table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390006 | localhost | 8888 | 0 - table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390006 | localhost | 57637 | 0 table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390006 | localhost | 8887 | 0 - table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390006 | localhost | 9997 | 0 table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390006 | localhost | 9995 | 0 - table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390006 | localhost | 9998 | 0 table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390006 | localhost | 9992 | 0 -(14 rows) + table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390006 | localhost | 57637 | 0 + table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390006 | localhost | 9998 | 0 + table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390006 | localhost | 9997 | 0 +(7 rows) SELECT * FROM pg_dist_shard; logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue --------------------------------------------------------------------- table_to_split | 1 | t | -2147483648 | 2147483647 - table_second | 2 | t | -2147483648 | 2147483647 -(2 rows) +(1 row) SET client_min_messages TO LOG; SET citus.log_remote_commands TO on; @@ -112,8 +99,8 @@ SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \ SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset SELECT citus_split_shard_by_split_points( 1, - ARRAY['-1073741826'], - ARRAY[:worker_2_node, :worker_2_node], + ARRAY['0'], + ARRAY[:worker_1_node, :worker_2_node], 'force_logical'); NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx @@ -133,22 +120,6 @@ NOTICE: issuing COMMIT DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT worker_apply_shard_ddl_command (102, 'CREATE TABLE citus_split_shard_by_split_points_negative.table_second (id bigint NOT NULL, value character(1)) ') -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT worker_apply_shard_ddl_command (102, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_second OWNER TO postgres') -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing COMMIT -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT worker_apply_shard_ddl_command (103, 'CREATE TABLE citus_split_shard_by_split_points_negative.table_second (id bigint NOT NULL, value character(1)) ') -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT worker_apply_shard_ddl_command (103, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_second OWNER TO postgres') -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing COMMIT -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SELECT worker_apply_shard_ddl_command (1, 'CREATE TABLE citus_split_shard_by_split_points_negative.table_to_split (id bigint NOT NULL, value character(1)) ') DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SELECT worker_apply_shard_ddl_command (1, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_to_split OWNER TO postgres') @@ -157,146 +128,28 @@ NOTICE: issuing COMMIT DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT worker_apply_shard_ddl_command (2, 'CREATE TABLE citus_split_shard_by_split_points_negative.table_second (id bigint NOT NULL, value character(1)) ') -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT worker_apply_shard_ddl_command (2, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_second OWNER TO postgres') -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing COMMIT -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT worker_apply_shard_ddl_command (100, 'CREATE TABLE citus_split_shard_by_split_points_negative.table_to_split (id bigint NOT NULL, value character(1)) ') -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT worker_apply_shard_ddl_command (100, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_to_split OWNER TO postgres') -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing COMMIT -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SELECT worker_apply_shard_ddl_command (101, 'CREATE TABLE citus_split_shard_by_split_points_negative.table_to_split (id bigint NOT NULL, value character(1)) ') DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SELECT worker_apply_shard_ddl_command (101, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_to_split OWNER TO postgres') DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing COMMIT DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +NOTICE: issuing SELECT * FROM worker_split_shard_replication_setup(ARRAY[ROW(1, 100, '-2147483648', '0', 16)::citus.split_shard_info,ROW(1, 101, '1', '2147483647', 18)::citus.split_shard_info]); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT worker_apply_shard_ddl_command (102, 'CREATE TABLE citus_split_shard_by_split_points_negative.table_second (id bigint NOT NULL, value character(1)) ') +NOTICE: issuing SELECT subname FROM pg_subscription WHERE subname LIKE 'citus_shard_split_subscription_' || '%' DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT worker_apply_shard_ddl_command (102, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_second OWNER TO postgres') +NOTICE: issuing SELECT rolname FROM pg_roles WHERE rolname LIKE 'citus_shard_split_subscription_role_' || '%' DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing COMMIT +NOTICE: issuing SELECT subname FROM pg_subscription WHERE subname LIKE 'citus_shard_split_subscription_' || '%' DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +NOTICE: issuing SELECT rolname FROM pg_roles WHERE rolname LIKE 'citus_shard_split_subscription_role_' || '%' DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT worker_apply_shard_ddl_command (103, 'CREATE TABLE citus_split_shard_by_split_points_negative.table_second (id bigint NOT NULL, value character(1)) ') +NOTICE: issuing SELECT pubname FROM pg_publication WHERE pubname LIKE 'citus_shard_split_publication_' || '%' DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT worker_apply_shard_ddl_command (103, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_second OWNER TO postgres') +NOTICE: issuing CREATE PUBLICATION citus_shard_split_publication_16_10 FOR TABLE citus_split_shard_by_split_points_negative.table_to_split_100,citus_split_shard_by_split_points_negative.table_to_split_1 DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing COMMIT +NOTICE: issuing CREATE PUBLICATION citus_shard_split_publication_18_10 FOR TABLE citus_split_shard_by_split_points_negative.table_to_split_101,citus_split_shard_by_split_points_negative.table_to_split_1 DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT * FROM worker_split_shard_replication_setup(ARRAY[ROW(1, 100, '-2147483648', '-1073741826', 18)::citus.split_shard_info,ROW(1, 101, '-1073741825', '2147483647', 18)::citus.split_shard_info,ROW(2, 102, '-2147483648', '-1073741826', 18)::citus.split_shard_info,ROW(2, 103, '-1073741825', '2147483647', 18)::citus.split_shard_info]) -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE PUBLICATION citus_shard_split_publication_18_10 FOR TABLE citus_split_shard_by_split_points_negative.table_to_split_100,citus_split_shard_by_split_points_negative.table_to_split_1,citus_split_shard_by_split_points_negative.table_to_split_101,citus_split_shard_by_split_points_negative.table_second_102,citus_split_shard_by_split_points_negative.table_second_2,citus_split_shard_by_split_points_negative.table_second_103 -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SET LOCAL citus.enable_ddl_propagation TO OFF; -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE USER citus_shard_move_subscription_role_10 SUPERUSER IN ROLE postgres -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing COMMIT -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_10 CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_18_10 WITH (citus_use_authinfo=true, enabled=false) -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing ALTER SUBSCRIPTION citus_shard_split_subscription_10 OWNER TO citus_shard_move_subscription_role_10 -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SET LOCAL citus.enable_ddl_propagation TO OFF; -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing ALTER ROLE citus_shard_move_subscription_role_10 NOSUPERUSER -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing COMMIT -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing ALTER SUBSCRIPTION citus_shard_split_subscription_10 ENABLE -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT count(*) FROM pg_subscription_rel, pg_stat_subscription WHERE srsubid = subid AND srsubstate != 'r' AND subname IN ('citus_shard_split_subscription_10') -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT sum(pg_total_relation_size(srrelid)) FROM pg_subscription_rel, pg_stat_subscription WHERE srsubid = subid AND subname IN ('citus_shard_split_subscription_10') -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT count(*) FROM pg_subscription_rel, pg_stat_subscription WHERE srsubid = subid AND srsubstate != 'r' AND subname IN ('citus_shard_split_subscription_10') -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT pg_current_wal_lsn() -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT min(latest_end_lsn) FROM pg_stat_subscription WHERE subname IN ('citus_shard_split_subscription_10') -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT worker_apply_shard_ddl_command (100, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_to_split ADD CONSTRAINT table_to_split_pkey PRIMARY KEY (id)') -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT worker_apply_shard_ddl_command (101, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_to_split ADD CONSTRAINT table_to_split_pkey PRIMARY KEY (id)') -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT worker_apply_shard_ddl_command (102, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_second ADD CONSTRAINT table_second_pkey PRIMARY KEY (id)') -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT worker_apply_shard_ddl_command (103, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_second ADD CONSTRAINT table_second_pkey PRIMARY KEY (id)') -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT pg_current_wal_lsn() -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT min(latest_end_lsn) FROM pg_stat_subscription WHERE subname IN ('citus_shard_split_subscription_10') -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT lock_shard_metadata(7, ARRAY[1, 2]) -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT lock_shard_metadata(7, ARRAY[1, 2]) -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT pg_current_wal_lsn() -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT min(latest_end_lsn) FROM pg_stat_subscription WHERE subname IN ('citus_shard_split_subscription_10') -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT citus_internal_delete_shard_metadata(1); -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT citus_internal_delete_shard_metadata(1); -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing DROP TABLE IF EXISTS citus_split_shard_by_split_points_negative.table_to_split_1 CASCADE -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT citus_internal_delete_shard_metadata(2); -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT citus_internal_delete_shard_metadata(2); -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing DROP TABLE IF EXISTS citus_split_shard_by_split_points_negative.table_second_2 CASCADE -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('citus_split_shard_by_split_points_negative.table_to_split'::regclass, 100, 't'::"char", '-2147483648', '-1073741826'), ('citus_split_shard_by_split_points_negative.table_to_split'::regclass, 101, 't'::"char", '-1073741825', '2147483647'), ('citus_split_shard_by_split_points_negative.table_second'::regclass, 102, 't'::"char", '-2147483648', '-1073741826'), ('citus_split_shard_by_split_points_negative.table_second'::regclass, 103, 't'::"char", '-1073741825', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data; -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('citus_split_shard_by_split_points_negative.table_to_split'::regclass, 100, 't'::"char", '-2147483648', '-1073741826'), ('citus_split_shard_by_split_points_negative.table_to_split'::regclass, 101, 't'::"char", '-1073741825', '2147483647'), ('citus_split_shard_by_split_points_negative.table_second'::regclass, 102, 't'::"char", '-2147483648', '-1073741826'), ('citus_split_shard_by_split_points_negative.table_second'::regclass, 103, 't'::"char", '-1073741825', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data; -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (100, 1, 0, 16, 85), (101, 1, 0, 16, 86), (102, 1, 0, 16, 87), (103, 1, 0, 16, 88)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data; -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (100, 1, 0, 16, 85), (101, 1, 0, 16, 86), (102, 1, 0, 16, 87), (103, 1, 0, 16, 88)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data; -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing DROP TABLE IF EXISTS citus_split_shard_by_split_points_negative.table_to_split_100 CASCADE -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing DROP TABLE IF EXISTS citus_split_shard_by_split_points_negative.table_to_split_101 CASCADE -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing DROP TABLE IF EXISTS citus_split_shard_by_split_points_negative.table_second_102 CASCADE -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing DROP TABLE IF EXISTS citus_split_shard_by_split_points_negative.table_second_103 CASCADE -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing DROP TABLE IF EXISTS citus_split_shard_by_split_points_negative.table_to_split_1 CASCADE -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing DROP TABLE IF EXISTS citus_split_shard_by_split_points_negative.table_second_2 CASCADE -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing PREPARE TRANSACTION 'citus_xx_xx_xx_xx' -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing PREPARE TRANSACTION 'citus_xx_xx_xx_xx' -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing COMMIT PREPARED 'citus_xx_xx_xx_xx' -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing COMMIT PREPARED 'citus_xx_xx_xx_xx' -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -WARNING: connection claimed exclusively at transaction commit -WARNING: connection claimed exclusively at transaction commit WARNING: connection claimed exclusively at transaction commit citus_split_shard_by_split_points --------------------------------------------------------------------- @@ -310,25 +163,26 @@ SET search_path TO citus_split_shard_by_split_points_negative; SELECT * FROM show_catalog; Schema | Name | Owner --------------------------------------------------------------------- - citus_split_shard_by_split_points_negative | table_second | postgres - citus_split_shard_by_split_points_negative | table_second_102 | postgres - citus_split_shard_by_split_points_negative | table_second_103 | postgres citus_split_shard_by_split_points_negative | table_to_split | postgres - citus_split_shard_by_split_points_negative | table_to_split_100 | postgres + citus_split_shard_by_split_points_negative | table_to_split_1 | postgres citus_split_shard_by_split_points_negative | table_to_split_101 | postgres -(6 rows) +(3 rows) \c - - - :worker_1_port SET search_path TO citus_split_shard_by_split_points_negative; SELECT * FROM show_catalog; - Schema | Name | Owner + Schema | Name | Owner --------------------------------------------------------------------- - citus_split_shard_by_split_points_negative | table_second | postgres - citus_split_shard_by_split_points_negative | table_to_split | postgres + citus_split_shard_by_split_points_negative | table_to_split | postgres + citus_split_shard_by_split_points_negative | table_to_split_1 | postgres + citus_split_shard_by_split_points_negative | table_to_split_100 | postgres + citus_split_shard_by_split_points_negative | table_to_split_101 | postgres +(4 rows) + +SELECT * FROM pg_publication; + oid | pubname | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate | pubviaroot +--------------------------------------------------------------------- + 17381 | citus_shard_split_publication_16_10 | 10 | f | t | t | t | t | f + 17384 | citus_shard_split_publication_18_10 | 10 | f | t | t | t | t | f (2 rows) -SELECT * FROM pg_publication_tables; - pubname | schemaname | tablename ---------------------------------------------------------------------- -(0 rows) - diff --git a/src/test/regress/sql/citus_sameer.sql b/src/test/regress/sql/citus_sameer.sql index 560bf42a9..68d0853f9 100644 --- a/src/test/regress/sql/citus_sameer.sql +++ b/src/test/regress/sql/citus_sameer.sql @@ -13,7 +13,7 @@ CREATE TABLE table_second (id bigserial PRIMARY KEY, value char); -- Shard3 | 0 | 1073741823 -- Shard4 | 1073741824 | 2147483647 SELECT create_distributed_table('table_to_split','id'); -SELECT create_distributed_table('table_second', 'id', colocate_with => 'table_to_split'); +--SELECT create_distributed_table('table_second', 'id', colocate_with => 'table_to_split'); SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset @@ -47,8 +47,8 @@ SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \ SELECT citus_split_shard_by_split_points( 1, - ARRAY['-1073741826'], - ARRAY[:worker_2_node, :worker_2_node], + ARRAY['0'], + ARRAY[:worker_1_node, :worker_2_node], 'force_logical'); -- On worker2, we want child shard 2 and dummy shard 1 -- -- on worker1, we want child shard 3 and 1 and dummy shard 2 -- @@ -60,4 +60,4 @@ SELECT * FROM show_catalog; \c - - - :worker_1_port SET search_path TO citus_split_shard_by_split_points_negative; SELECT * FROM show_catalog; -SELECT * FROM pg_publication_tables; +SELECT * FROM pg_publication;