diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index e886566e5..f20cff26e 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -34,6 +34,7 @@ #include "distributed/metadata_sync.h" #include "distributed/multi_physical_planner.h" #include "commands/dbcommands.h" +#include "distributed/shardsplit_logical_replication.h" /* Function declarations */ static void ErrorIfCannotSplitShardExtended(SplitOperation splitOperation, @@ -1186,47 +1187,10 @@ static void SplitShardReplicationSetup(List *sourceColocatedShardIntervalList, WorkerNode *sourceWorkerNode, List *destinationWorkerNodesList) { - StringInfo splitChildrenRows = makeStringInfo(); - ShardInterval *sourceShardIntervalToCopy = NULL; - List *splitChildShardIntervalList = NULL; - bool addComma = false; - forboth_ptr(sourceShardIntervalToCopy, sourceColocatedShardIntervalList, - splitChildShardIntervalList, shardGroupSplitIntervalListList) - { - int64 sourceShardId = sourceShardIntervalToCopy->shardId; - - ShardInterval *splitChildShardInterval = NULL; - WorkerNode *destinationWorkerNode = NULL; - forboth_ptr(splitChildShardInterval, splitChildShardIntervalList, - destinationWorkerNode, destinationWorkerNodesList) - { - if (addComma) - { - appendStringInfo(splitChildrenRows, ","); - } - - StringInfo minValueString = makeStringInfo(); - appendStringInfo(minValueString, "%d", DatumGetInt32(splitChildShardInterval->minValue)); - - StringInfo maxValueString = makeStringInfo(); - appendStringInfo(maxValueString, "%d", DatumGetInt32(splitChildShardInterval->maxValue)); - - appendStringInfo(splitChildrenRows, - "ROW(%lu, %lu, %s, %s, %u)::citus.split_shard_info", - sourceShardId, - splitChildShardInterval->shardId, - quote_literal_cstr(minValueString->data), - quote_literal_cstr(maxValueString->data), - destinationWorkerNode->nodeId); - - addComma = true; - } - } - - StringInfo splitShardReplicationUDF = makeStringInfo(); - appendStringInfo(splitShardReplicationUDF, - "SELECT * FROM worker_split_shard_replication_setup(ARRAY[%s])", splitChildrenRows->data); + StringInfo splitShardReplicationUDF = CreateSplitShardReplicationSetupUDF(sourceColocatedShardIntervalList, + shardGroupSplitIntervalListList, + destinationWorkerNodesList); int connectionFlags = FORCE_NEW_CONNECTION; MultiConnection *sourceConnection = GetNodeUserDatabaseConnection(connectionFlags, @@ -1241,7 +1205,6 @@ static void SplitShardReplicationSetup(List *sourceColocatedShardIntervalList, PGresult *result = NULL; int queryResult = ExecuteOptionalRemoteCommand(sourceConnection, splitShardReplicationUDF->data, &result); - if (queryResult != RESPONSE_OKAY || !IsResponseOK(result)) { ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), @@ -1250,4 +1213,15 @@ static void SplitShardReplicationSetup(List *sourceColocatedShardIntervalList, PQclear(result); ForgetResults(sourceConnection); } -} \ No newline at end of file + + /* Get replication slot information */ + List * replicationSlotInfoList = ParseReplicationSlotInfoFromResult(result); + + List * shardSplitPubSubMetadata = CreateShardSplitPubSubMetadataList(sourceColocatedShardIntervalList, + shardGroupSplitIntervalListList, + destinationWorkerNodesList, + replicationSlotInfoList); + + LogicallReplicateSplitShards(sourceWorkerNode, shardSplitPubSubMetadata); +} + diff --git a/src/backend/distributed/operations/split_shard_replication_setup.c b/src/backend/distributed/operations/split_shard_replication_setup.c index fa59957b5..da1b0c2bb 100644 --- a/src/backend/distributed/operations/split_shard_replication_setup.c +++ b/src/backend/distributed/operations/split_shard_replication_setup.c @@ -19,29 +19,18 @@ #include "distributed/citus_safe_lib.h" #include "distributed/listutils.h" #include "distributed/remote_commands.h" +#include "distributed/tuplestore.h" +#include "distributed/shardsplit_logical_replication.h" #include "utils/builtins.h" #include "utils/lsyscache.h" #include "commands/dbcommands.h" + /* declarations for dynamic loading */ PG_FUNCTION_INFO_V1(worker_split_shard_replication_setup); static HTAB *ShardInfoHashMap = NULL; -/* key for NodeShardMappingEntry */ -typedef struct NodeShardMappingKey -{ - uint32_t nodeId; - Oid tableOwnerId; -} NodeShardMappingKey; - -/* Entry for hash map */ -typedef struct NodeShardMappingEntry -{ - NodeShardMappingKey key; - List *shardSplitInfoList; -} NodeShardMappingEntry; - /* Function declarations */ static void ParseShardSplitInfoFromDatum(Datum shardSplitInfoDatum, uint64 *sourceShardId, @@ -58,11 +47,8 @@ static ShardSplitInfo * CreateShardSplitInfo(uint64 sourceShardIdToSplit, static void AddShardSplitInfoEntryForNodeInMap(ShardSplitInfo *shardSplitInfo); static void PopulateShardSplitInfoInSM(ShardSplitInfoSMHeader *shardSplitInfoSMHeader, HTAB *shardInfoHashMap); - -static void SetupHashMapForShardInfo(void); -static uint32 NodeShardMappingHash(const void *key, Size keysize); -static int NodeShardMappingHashCompare(const void *left, const void *right, Size keysize); - + +static void ReturnReplicationSlotInfo(HTAB *shardInfoHashMap, Tuplestorestate *tupleStore, TupleDesc tupleDescriptor); static void CreatePublishersForSplitChildren(HTAB *shardInfoHashMap); StringInfo GetSoureAndDestinationShardNames(List* shardSplitInfoList); @@ -117,7 +103,7 @@ worker_split_shard_replication_setup(PG_FUNCTION_ARGS) } /* SetupMap */ - SetupHashMapForShardInfo(); + ShardInfoHashMap = SetupHashMapForShardInfo(); int shardSplitInfoCount = 0; @@ -157,7 +143,9 @@ worker_split_shard_replication_setup(PG_FUNCTION_ARGS) /* store handle in statically allocated shared memory*/ StoreShardSplitSharedMemoryHandle(dsmHandle); - CreatePublishersForSplitChildren(ShardInfoHashMap); + TupleDesc tupleDescriptor = NULL; + Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor); + ReturnReplicationSlotInfo(ShardInfoHashMap, tupleStore, tupleDescriptor); PG_RETURN_VOID(); } @@ -169,7 +157,7 @@ worker_split_shard_replication_setup(PG_FUNCTION_ARGS) * is 'nodeId' and value is a list of ShardSplitInfo that are placed on * this particular node. */ -static void +HTAB * SetupHashMapForShardInfo() { HASHCTL info; @@ -182,7 +170,8 @@ SetupHashMapForShardInfo() int hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_FUNCTION | HASH_COMPARE); - ShardInfoHashMap = hash_create("ShardInfoMap", 128, &info, hashFlags); + HTAB * shardInfoMap = hash_create("ShardInfoMap", 128, &info, hashFlags); + return shardInfoMap; } @@ -350,7 +339,7 @@ PopulateShardSplitInfoInSM(ShardSplitInfoSMHeader *shardSplitInfoSMHeader, * NodeShardMappingHash returns hash value by combining hash of node id * and tableowner Id. */ -static uint32 +uint32 NodeShardMappingHash(const void *key, Size keysize) { NodeShardMappingKey *entry = (NodeShardMappingKey *) key; @@ -363,7 +352,7 @@ NodeShardMappingHash(const void *key, Size keysize) /* * Comparator function for hash keys */ -static int +int NodeShardMappingHashCompare(const void *left, const void *right, Size keysize) { NodeShardMappingKey *leftKey = (NodeShardMappingKey *) left; @@ -530,4 +519,30 @@ ConstructFullyQualifiedSplitChildShardName(ShardSplitInfo* shardSplitInfo) shardName = quote_qualified_identifier(schemaName, shardName); return shardName; +} + +static void ReturnReplicationSlotInfo(HTAB *shardInfoHashMap, Tuplestorestate *tupleStore, TupleDesc tupleDescriptor) +{ + HASH_SEQ_STATUS status; + hash_seq_init(&status, shardInfoHashMap); + + NodeShardMappingEntry *entry = NULL; + while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL) + { + Datum values[3]; + bool nulls[3]; + + memset(values, 0, sizeof(values)); + memset(nulls, false, sizeof(nulls)); + + values[0] = Int32GetDatum(entry->key.nodeId); + + char * tableOwnerName = GetUserNameFromId(entry->key.tableOwnerId, false); + values[1] = CStringGetTextDatum(tableOwnerName); + + char * slotName = encode_replication_slot(entry->key.nodeId, entry->key.tableOwnerId); + values[2] = CStringGetTextDatum(slotName); + + tuplestore_putvalues(tupleStore, tupleDescriptor, values, nulls); + } } \ No newline at end of file diff --git a/src/backend/distributed/replication/multi_logical_replication.c b/src/backend/distributed/replication/multi_logical_replication.c index ce92cfa75..04f868f90 100644 --- a/src/backend/distributed/replication/multi_logical_replication.c +++ b/src/backend/distributed/replication/multi_logical_replication.c @@ -133,8 +133,7 @@ static void CreateShardMoveSubscriptions(MultiConnection *connection, static char * escape_param_str(const char *str); static XLogRecPtr GetRemoteLogPosition(MultiConnection *connection); static XLogRecPtr GetRemoteLSN(MultiConnection *connection, char *command); -static void WaitForRelationSubscriptionsBecomeReady(MultiConnection *targetConnection, - Bitmapset *tableOwnerIds); + static uint64 TotalRelationSizeForSubscription(MultiConnection *connection, char *command); static bool RelationSubscriptionsAreReady(MultiConnection *targetConnection, @@ -146,8 +145,7 @@ static void WaitForMiliseconds(long timeout); static XLogRecPtr GetSubscriptionPosition(MultiConnection *connection, Bitmapset *tableOwnerIds); static char * ShardMovePublicationName(Oid ownerId); -static char * ShardMoveSubscriptionName(Oid ownerId); -static char * ShardSplitPublicationName(Oid ownerId, uint32 nodeId); +static char * ShardSubscriptionName(Oid ownerId, char * operationPrefix); static void AcquireLogicalReplicationLock(void); static void DropAllShardMoveLeftovers(void); static void DropAllShardMoveSubscriptions(MultiConnection *connection); @@ -1091,7 +1089,7 @@ DropShardMovePublications(MultiConnection *connection, Bitmapset *tableOwnerIds) * If replication slot can not be dropped while dropping the subscriber, drop * it here. */ - DropShardMoveReplicationSlot(connection, ShardMoveSubscriptionName(ownerId)); + DropShardMoveReplicationSlot(connection, ShardSubscriptionName(ownerId, SHARD_MOVE_SUBSCRIPTION_PREFIX)); DropShardMovePublication(connection, ShardMovePublicationName(ownerId)); } } @@ -1139,7 +1137,7 @@ ShardMovePublicationName(Oid ownerId) /* - * ShardMoveSubscriptionName returns the name of the subscription for the given + * ShardSubscriptionName returns the name of the subscription for the given * owner. If we're running the isolation tester the function also appends the * process id normal subscription name. * @@ -1150,15 +1148,15 @@ ShardMovePublicationName(Oid ownerId) * coordinator is blocked by the blocked replication process. */ static char * -ShardMoveSubscriptionName(Oid ownerId) +ShardSubscriptionName(Oid ownerId, char * operationPrefix) { if (RunningUnderIsolationTest) { - return psprintf("%s%i_%i", SHARD_MOVE_SUBSCRIPTION_PREFIX, ownerId, MyProcPid); + return psprintf("%s%i_%i", operationPrefix, ownerId, MyProcPid); } else { - return psprintf("%s%i", SHARD_MOVE_SUBSCRIPTION_PREFIX, ownerId); + return psprintf("%s%i", operationPrefix, ownerId); } } @@ -1323,7 +1321,7 @@ DropShardMoveSubscriptions(MultiConnection *connection, Bitmapset *tableOwnerIds int ownerId = -1; while ((ownerId = bms_next_member(tableOwnerIds, ownerId)) >= 0) { - DropShardMoveSubscription(connection, ShardMoveSubscriptionName(ownerId)); + DropShardMoveSubscription(connection, ShardSubscriptionName(ownerId, SHARD_MOVE_SUBSCRIPTION_PREFIX)); DropShardMoveUser(connection, ShardMoveSubscriptionRole(ownerId)); } } @@ -1497,7 +1495,7 @@ CreateShardMoveSubscriptions(MultiConnection *connection, char *sourceNodeName, appendStringInfo(createSubscriptionCommand, "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s " "WITH (citus_use_authinfo=true, enabled=false)", - quote_identifier(ShardMoveSubscriptionName(ownerId)), + quote_identifier(ShardSubscriptionName(ownerId, SHARD_MOVE_SUBSCRIPTION_PREFIX)), quote_literal_cstr(conninfo->data), quote_identifier(ShardMovePublicationName(ownerId))); @@ -1506,7 +1504,7 @@ CreateShardMoveSubscriptions(MultiConnection *connection, char *sourceNodeName, pfree(createSubscriptionCommand); ExecuteCriticalRemoteCommand(connection, psprintf( "ALTER SUBSCRIPTION %s OWNER TO %s", - ShardMoveSubscriptionName(ownerId), + ShardSubscriptionName(ownerId, SHARD_MOVE_SUBSCRIPTION_PREFIX), ShardMoveSubscriptionRole(ownerId) )); @@ -1525,7 +1523,7 @@ CreateShardMoveSubscriptions(MultiConnection *connection, char *sourceNodeName, ExecuteCriticalRemoteCommand(connection, psprintf( "ALTER SUBSCRIPTION %s ENABLE", - ShardMoveSubscriptionName(ownerId) + ShardSubscriptionName(ownerId, SHARD_MOVE_SUBSCRIPTION_PREFIX) )); } } @@ -1631,7 +1629,7 @@ GetRemoteLSN(MultiConnection *connection, char *command) * on the target node doesn't change within LogicalReplicationErrorTimeout. The * function also reports its progress in every logicalReplicationProgressReportTimeout. */ -static void +void WaitForRelationSubscriptionsBecomeReady(MultiConnection *targetConnection, Bitmapset *tableOwnerIds) { @@ -1848,7 +1846,7 @@ ShardMoveSubscriptionNamesValueList(Bitmapset *tableOwnerIds) first = false; } appendStringInfoString(subscriptionValueList, - quote_literal_cstr(ShardMoveSubscriptionName(ownerId))); + quote_literal_cstr(ShardSubscriptionName(ownerId, SHARD_MOVE_SUBSCRIPTION_PREFIX))); } appendStringInfoString(subscriptionValueList, ")"); return subscriptionValueList->data; @@ -2063,11 +2061,68 @@ GetSubscriptionPosition(MultiConnection *connection, Bitmapset *tableOwnerIds) "WHERE subname IN %s", subscriptionValueList)); } -/* - * ShardSplitPublicationName returns the name of the publication for the given - * table owner. - */ -static char * ShardSplitPublicationName(Oid ownerId, uint32 nodeId) + +/*Refactor this for ShardMove too.*/ +void +CreateShardSubscription(MultiConnection *connection, char *sourceNodeName, + int sourceNodePort, char *userName, char *databaseName, + char * publicationName, + Oid ownerId) { - return psprintf("%s%i_%u", SHARD_SPLIT_PUBLICATION_PREFIX, ownerId, nodeId); + + StringInfo createSubscriptionCommand = makeStringInfo(); + StringInfo conninfo = makeStringInfo(); + + /* + * The CREATE USER command should not propagate, so we temporarily + * disable DDL propagation. + */ + SendCommandListToWorkerOutsideTransaction( + connection->hostname, connection->port, connection->user, + list_make2( + "SET LOCAL citus.enable_ddl_propagation TO OFF;", + psprintf( + "CREATE USER %s SUPERUSER IN ROLE %s", + ShardMoveSubscriptionRole(ownerId), + GetUserNameFromId(ownerId, false) + ))); + + appendStringInfo(conninfo, "host='%s' port=%d user='%s' dbname='%s' " + "connect_timeout=20", + escape_param_str(sourceNodeName), sourceNodePort, + escape_param_str(userName), escape_param_str(databaseName)); + + appendStringInfo(createSubscriptionCommand, + "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s " + "WITH (citus_use_authinfo=true, enabled=false)", + quote_identifier(ShardSubscriptionName(ownerId, SHARD_SPLIT_SUBSCRIPTION_PREFIX)), + quote_literal_cstr(conninfo->data), + quote_identifier(publicationName)); + + ExecuteCriticalRemoteCommand(connection, createSubscriptionCommand->data); + pfree(createSubscriptionCommand->data); + pfree(createSubscriptionCommand); + ExecuteCriticalRemoteCommand(connection, psprintf( + "ALTER SUBSCRIPTION %s OWNER TO %s", + ShardSubscriptionName(ownerId, SHARD_SPLIT_SUBSCRIPTION_PREFIX), + ShardMoveSubscriptionRole(ownerId) + )); + + /* + * The ALTER ROLE command should not propagate, so we temporarily + * disable DDL propagation. + */ + SendCommandListToWorkerOutsideTransaction( + connection->hostname, connection->port, connection->user, + list_make2( + "SET LOCAL citus.enable_ddl_propagation TO OFF;", + psprintf( + "ALTER ROLE %s NOSUPERUSER", + ShardMoveSubscriptionRole(ownerId) + ))); + + ExecuteCriticalRemoteCommand(connection, psprintf( + "ALTER SUBSCRIPTION %s ENABLE", + ShardSubscriptionName(ownerId, SHARD_SPLIT_SUBSCRIPTION_PREFIX) + )); } diff --git a/src/backend/distributed/shardsplit/shardsplit_logical_replication.c b/src/backend/distributed/shardsplit/shardsplit_logical_replication.c new file mode 100644 index 000000000..7657dfa13 --- /dev/null +++ b/src/backend/distributed/shardsplit/shardsplit_logical_replication.c @@ -0,0 +1,380 @@ +/*------------------------------------------------------------------------- + * + * shardsplit_logical_replication.c + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "miscadmin.h" +#include "nodes/pg_list.h" +#include "distributed/colocation_utils.h" +#include "distributed/metadata_cache.h" +#include "distributed/shardinterval_utils.h" +#include "distributed/connection_management.h" +#include "distributed/remote_commands.h" +#include "distributed/shard_split.h" +#include "distributed/listutils.h" +#include "distributed/shardsplit_logical_replication.h" +#include "distributed/multi_logical_replication.h" +#include "utils/builtins.h" +#include "commands/dbcommands.h" + +static HTAB *ShardInfoHashMapForPublications = NULL; + +/* function declarations */ +static void AddShardEntryInMap(Oid tableOwner, uint32 nodeId, ShardInterval * shardInterval, bool isChildShardInterval); +ShardSplitPubSubMetadata * CreateShardSplitPubSubMetadata(Oid tableOwnerId, uint32 nodeId, List * shardIdList, List * replicationSlotInfoList); + +static void +CreateShardSplitPublicationForNode(MultiConnection *connection, List *shardList, + uint32_t publicationForTargetNodeId, Oid tableOwner); +static void +CreateShardSplitPublications(MultiConnection *sourceConnection, List * shardSplitPubSubMetadataList); +static void +CreateShardSplitSubscriptions(List * targetNodeConnectionList, List * shardSplitPubSubMetadataList, WorkerNode * sourceWorkerNode, char * superUser, char * databaseName); +static void +WaitForShardSplitRelationSubscriptionsBecomeReady(List * targetNodeConnectionList, List * shardSplitPubSubMetadataList); + +static char * +ShardSplitPublicationName(uint32_t nodeId, Oid ownerId); + +/*used for debuggin. Remove later*/ +void PrintShardSplitPubSubMetadata(ShardSplitPubSubMetadata * shardSplitMetadata); + +StringInfo CreateSplitShardReplicationSetupUDF(List *sourceColocatedShardIntervalList, + List *shardGroupSplitIntervalListList, + List *destinationWorkerNodesList) +{ + StringInfo splitChildrenRows = makeStringInfo(); + + ShardInterval *sourceShardIntervalToCopy = NULL; + List *splitChildShardIntervalList = NULL; + bool addComma = false; + forboth_ptr(sourceShardIntervalToCopy, sourceColocatedShardIntervalList, + splitChildShardIntervalList, shardGroupSplitIntervalListList) + { + int64 sourceShardId = sourceShardIntervalToCopy->shardId; + + ShardInterval *splitChildShardInterval = NULL; + WorkerNode *destinationWorkerNode = NULL; + forboth_ptr(splitChildShardInterval, splitChildShardIntervalList, + destinationWorkerNode, destinationWorkerNodesList) + { + if (addComma) + { + appendStringInfo(splitChildrenRows, ","); + } + + StringInfo minValueString = makeStringInfo(); + appendStringInfo(minValueString, "%d", DatumGetInt32(splitChildShardInterval->minValue)); + + StringInfo maxValueString = makeStringInfo(); + appendStringInfo(maxValueString, "%d", DatumGetInt32(splitChildShardInterval->maxValue)); + + appendStringInfo(splitChildrenRows, + "ROW(%lu, %lu, %s, %s, %u)::citus.split_shard_info", + sourceShardId, + splitChildShardInterval->shardId, + quote_literal_cstr(minValueString->data), + quote_literal_cstr(maxValueString->data), + destinationWorkerNode->nodeId); + + addComma = true; + } + } + + StringInfo splitShardReplicationUDF = makeStringInfo(); + appendStringInfo(splitShardReplicationUDF, + "SELECT * FROM worker_split_shard_replication_setup(ARRAY[%s])", splitChildrenRows->data); + + return splitShardReplicationUDF; +} + +List * ParseReplicationSlotInfoFromResult(PGresult * result) +{ + int64 rowCount = PQntuples(result); + int64 colCount = PQnfields(result); + + List *replicationSlotInfoList = NIL; + for (int64 rowIndex = 0; rowIndex < rowCount; rowIndex++) + { + ReplicationSlotInfo * replicationSlotInfo = (ReplicationSlotInfo *)palloc0(sizeof(ReplicationSlotInfo)); + + char * targeNodeIdString = PQgetvalue(result, rowIndex, 0); + replicationSlotInfo->targetNodeId = strtoul(targeNodeIdString, NULL, 10); + + /* we're using the pstrdup to copy the data into the current memory context */ + replicationSlotInfo->tableOwnerName = pstrdup(PQgetvalue(result, rowIndex, 1)); + + replicationSlotInfo->slotName = pstrdup(PQgetvalue(result, rowIndex, 2)); + + replicationSlotInfoList = lappend(replicationSlotInfoList, replicationSlotInfo); + } + + /*TODO(saawasek): size of this should not be NULL */ + return replicationSlotInfoList; +} + + +List * CreateShardSplitPubSubMetadataList(List *sourceColocatedShardIntervalList, + List *shardGroupSplitIntervalListList, + List *destinationWorkerNodesList, + List *replicationSlotInfoList) +{ + ShardInfoHashMapForPublications = SetupHashMapForShardInfo(); + ShardInterval *sourceShardIntervalToCopy = NULL; + List *splitChildShardIntervalList = NULL; + forboth_ptr(sourceShardIntervalToCopy, sourceColocatedShardIntervalList, + splitChildShardIntervalList, shardGroupSplitIntervalListList) + { + ShardInterval *splitChildShardInterval = NULL; + WorkerNode *destinationWorkerNode = NULL; + forboth_ptr(splitChildShardInterval, splitChildShardIntervalList, + destinationWorkerNode, destinationWorkerNodesList) + { + /* Table owner is same for both parent and child shard */ + Oid tableOwnerId = TableOwnerOid(sourceShardIntervalToCopy->relationId); + uint32 destinationWorkerNodeId = destinationWorkerNode->nodeId; + + /* Add split child shard interval */ + AddShardEntryInMap(tableOwnerId, destinationWorkerNodeId, splitChildShardInterval, true /*isChildShardInterval*/); + + /* Add parent shard interval if not already added */ + AddShardEntryInMap(tableOwnerId, destinationWorkerNodeId, sourceShardIntervalToCopy, false /*isChildShardInterval*/); + } + } + + /* Populate pubsub meta data*/ + HASH_SEQ_STATUS status; + hash_seq_init(&status, ShardInfoHashMapForPublications); + + List * shardSplitPubSubMetadataList = NIL; + NodeShardMappingEntry *entry = NULL; + while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL) + { + uint32_t nodeId = entry->key.nodeId; + uint32_t tableOwnerId = entry->key.tableOwnerId; + ShardSplitPubSubMetadata * shardSplitPubSubMetadata = CreateShardSplitPubSubMetadata(tableOwnerId, nodeId, entry->shardSplitInfoList, replicationSlotInfoList); + + shardSplitPubSubMetadataList = lappend(shardSplitPubSubMetadataList, shardSplitPubSubMetadata); + } + + return shardSplitPubSubMetadataList; +} + +static void AddShardEntryInMap(Oid tableOwnerId, uint32 nodeId, ShardInterval * shardInterval, bool isChildShardInterval) +{ + NodeShardMappingKey key; + key.nodeId = nodeId; + key.tableOwnerId = tableOwnerId; + + bool found = false; + NodeShardMappingEntry *nodeMappingEntry = + (NodeShardMappingEntry *) hash_search(ShardInfoHashMapForPublications, &key, HASH_ENTER, + &found); + if (!found) + { + nodeMappingEntry->shardSplitInfoList = NIL; + } + + if(isChildShardInterval) + { + nodeMappingEntry->shardSplitInfoList = + lappend(nodeMappingEntry->shardSplitInfoList, (ShardInterval *) shardInterval); + return; + } + + ShardInterval * existingShardInterval = NULL; + foreach_ptr(existingShardInterval, nodeMappingEntry->shardSplitInfoList) + { + if(existingShardInterval->shardId == shardInterval->shardId) + { + /* parent shard interval is already added hence return */ + return; + } + } + + /* Add parent shard Interval */ + nodeMappingEntry->shardSplitInfoList = + lappend(nodeMappingEntry->shardSplitInfoList, (ShardInterval *) shardInterval); + +} + + +ShardSplitPubSubMetadata * CreateShardSplitPubSubMetadata(Oid tableOwnerId, uint32 nodeId, List * shardIntervalList, List * replicationSlotInfoList) +{ + + ShardSplitPubSubMetadata * shardSplitPubSubMetadata = palloc0(sizeof(ShardSplitPubSubMetadata)); + shardSplitPubSubMetadata->shardIntervalListForSubscription = shardIntervalList; + shardSplitPubSubMetadata->tableOwnerId = tableOwnerId; + + char * tableOwnerName = GetUserNameFromId(tableOwnerId, false); + ReplicationSlotInfo * replicationSlotInfo = NULL; + foreach_ptr(replicationSlotInfo, replicationSlotInfoList) + { + if(nodeId == replicationSlotInfo->targetNodeId && + strcmp(tableOwnerName, replicationSlotInfo->tableOwnerName) == 0) + { + shardSplitPubSubMetadata->slotInfo = replicationSlotInfo; + break; + } + } + + return shardSplitPubSubMetadata; +} + + +void LogicallReplicateSplitShards(WorkerNode *sourceWorkerNode, List* shardSplitPubSubMetadataList) +{ + char *superUser = CitusExtensionOwnerName(); + char *databaseName = get_database_name(MyDatabaseId); + int connectionFlags = FORCE_NEW_CONNECTION; + + /* Get source node connection */ + MultiConnection *sourceConnection = + GetNodeUserDatabaseConnection(connectionFlags, sourceWorkerNode->workerName, sourceWorkerNode->workerPort, + superUser, databaseName); + + ClaimConnectionExclusively(sourceConnection); + + List * targetNodeConnectionList = NIL; + ShardSplitPubSubMetadata * shardSplitPubSubMetadata = NULL; + foreach_ptr(shardSplitPubSubMetadata, shardSplitPubSubMetadataList) + { + uint32 targetWorkerNodeId = shardSplitPubSubMetadata->slotInfo->targetNodeId; + WorkerNode * targetWorkerNode = FindNodeWithNodeId(targetWorkerNodeId, false); + + MultiConnection *targetConnection = + GetNodeUserDatabaseConnection(connectionFlags, targetWorkerNode->workerName, targetWorkerNode->workerPort, + superUser, databaseName); + ClaimConnectionExclusively(targetConnection); + + targetNodeConnectionList = lappend(targetNodeConnectionList, targetConnection); + } + + /* create publications */ + CreateShardSplitPublications(sourceConnection, shardSplitPubSubMetadataList); + + CreateShardSplitSubscriptions(targetNodeConnectionList, + shardSplitPubSubMetadataList, + sourceWorkerNode, + superUser, + databaseName); + + WaitForShardSplitRelationSubscriptionsBecomeReady(targetNodeConnectionList, shardSplitPubSubMetadataList); +} + + +void PrintShardSplitPubSubMetadata(ShardSplitPubSubMetadata * shardSplitMetadata) +{ + printf("sameer: ShardSplitPubSbuMetadata\n"); + ReplicationSlotInfo * replicationInfo = shardSplitMetadata->slotInfo; + + List * shardIntervalList = shardSplitMetadata->shardIntervalListForSubscription; + printf("shardIds: "); + ShardInterval * shardInterval = NULL; + foreach_ptr(shardInterval, shardIntervalList) + { + printf("%ld ", shardInterval->shardId); + } + + printf("\nManual Username from OID at source: %s \n", GetUserNameFromId(shardSplitMetadata->tableOwnerId, false)); + printf("slotname:%s targetNode:%u tableOwner:%s \n", replicationInfo->slotName, replicationInfo->targetNodeId, replicationInfo->tableOwnerName); +} + + +static void +CreateShardSplitPublications(MultiConnection *sourceConnection, List *shardSplitPubSubMetadataList) +{ + ShardSplitPubSubMetadata * shardSplitPubSubMetadata = NULL; + foreach_ptr(shardSplitPubSubMetadata, shardSplitPubSubMetadataList) + { + uint32 publicationForNodeId = shardSplitPubSubMetadata->slotInfo->targetNodeId; + Oid tableOwnerId = shardSplitPubSubMetadata->tableOwnerId; + + CreateShardSplitPublicationForNode(sourceConnection, + shardSplitPubSubMetadata->shardIntervalListForSubscription, + publicationForNodeId, + tableOwnerId); + } +} + +static void +CreateShardSplitSubscriptions(List * targetNodeConnectionList, + List * shardSplitPubSubMetadataList, + WorkerNode * sourceWorkerNode, + char * superUser, + char * databaseName) +{ + MultiConnection * targetConnection = NULL; + ShardSplitPubSubMetadata * shardSplitPubSubMetadata = NULL; + forboth_ptr(targetConnection, targetNodeConnectionList, + shardSplitPubSubMetadata, shardSplitPubSubMetadataList) + { + uint32 publicationForNodeId = shardSplitPubSubMetadata->slotInfo->targetNodeId; + Oid ownerId = shardSplitPubSubMetadata->tableOwnerId; + CreateShardSubscription(targetConnection, + sourceWorkerNode->workerName, + sourceWorkerNode->workerPort, + superUser, + databaseName, + ShardSplitPublicationName(publicationForNodeId, ownerId), + ownerId); + } +} + + +static void +CreateShardSplitPublicationForNode(MultiConnection *connection, List *shardList, + uint32_t publicationForTargetNodeId, Oid ownerId) +{ + + StringInfo createPublicationCommand = makeStringInfo(); + bool prefixWithComma = false; + + appendStringInfo(createPublicationCommand, "CREATE PUBLICATION %s FOR TABLE ", + ShardSplitPublicationName(publicationForTargetNodeId, ownerId)); + + ShardInterval *shard = NULL; + foreach_ptr(shard, shardList) + { + char *shardName = ConstructQualifiedShardName(shard); + + if (prefixWithComma) + { + appendStringInfoString(createPublicationCommand, ","); + } + + appendStringInfoString(createPublicationCommand, shardName); + prefixWithComma = true; + } + + ExecuteCriticalRemoteCommand(connection, createPublicationCommand->data); + pfree(createPublicationCommand->data); + pfree(createPublicationCommand); +} + + +static char * +ShardSplitPublicationName(uint32_t nodeId, Oid ownerId) +{ + return psprintf("%s%u_%u", SHARD_SPLIT_PUBLICATION_PREFIX, nodeId, ownerId); +} + + +static void +WaitForShardSplitRelationSubscriptionsBecomeReady(List * targetNodeConnectionList, List * shardSplitPubSubMetadataList) +{ + MultiConnection * targetConnection = NULL; + ShardSplitPubSubMetadata * shardSplitPubSubMetadata = NULL; + forboth_ptr(targetConnection, targetNodeConnectionList, + shardSplitPubSubMetadata, shardSplitPubSubMetadataList) + { + Bitmapset *tableOwnerIds = NULL; + tableOwnerIds = bms_add_member(tableOwnerIds, shardSplitPubSubMetadata->tableOwnerId); + WaitForRelationSubscriptionsBecomeReady(targetConnection, tableOwnerIds); + } +} \ No newline at end of file diff --git a/src/backend/distributed/sql/udfs/worker_split_shard_replication_setup/11.0-2.sql b/src/backend/distributed/sql/udfs/worker_split_shard_replication_setup/11.0-2.sql index 8d5e8fae3..aca0e4725 100644 --- a/src/backend/distributed/sql/udfs/worker_split_shard_replication_setup/11.0-2.sql +++ b/src/backend/distributed/sql/udfs/worker_split_shard_replication_setup/11.0-2.sql @@ -5,9 +5,11 @@ CREATE TYPE citus.split_shard_info AS ( shard_max_value text, node_id integer); +CREATE TYPE citus.replication_slot_info AS(node_id integer, slot_owner text, slot_name text); + CREATE OR REPLACE FUNCTION pg_catalog.worker_split_shard_replication_setup( splitShardInfo citus.split_shard_info[]) -RETURNS void +RETURNS setof citus.replication_slot_info LANGUAGE C STRICT AS 'MODULE_PATHNAME', $$worker_split_shard_replication_setup$$; COMMENT ON FUNCTION pg_catalog.worker_split_shard_replication_setup(splitShardInfo citus.split_shard_info[]) diff --git a/src/backend/distributed/sql/udfs/worker_split_shard_replication_setup/latest.sql b/src/backend/distributed/sql/udfs/worker_split_shard_replication_setup/latest.sql index 8d5e8fae3..cfa3d2a67 100644 --- a/src/backend/distributed/sql/udfs/worker_split_shard_replication_setup/latest.sql +++ b/src/backend/distributed/sql/udfs/worker_split_shard_replication_setup/latest.sql @@ -5,12 +5,15 @@ CREATE TYPE citus.split_shard_info AS ( shard_max_value text, node_id integer); +CREATE TYPE citus.replication_slot_info AS(node_id integer, slot_owner text, slot_name text); + CREATE OR REPLACE FUNCTION pg_catalog.worker_split_shard_replication_setup( splitShardInfo citus.split_shard_info[]) -RETURNS void +RETURNS setof citus.replication_slot_info LANGUAGE C STRICT AS 'MODULE_PATHNAME', $$worker_split_shard_replication_setup$$; COMMENT ON FUNCTION pg_catalog.worker_split_shard_replication_setup(splitShardInfo citus.split_shard_info[]) IS 'Replication setup for splitting a shard'; + IS 'Replication setup for splitting a shard'; REVOKE ALL ON FUNCTION pg_catalog.worker_split_shard_replication_setup(citus.split_shard_info[]) FROM PUBLIC; diff --git a/src/include/distributed/multi_logical_replication.h b/src/include/distributed/multi_logical_replication.h index e250072e0..6f7126eaa 100644 --- a/src/include/distributed/multi_logical_replication.h +++ b/src/include/distributed/multi_logical_replication.h @@ -26,9 +26,18 @@ extern void LogicallyReplicateShards(List *shardList, char *sourceNodeName, int sourceNodePort, char *targetNodeName, int targetNodePort); +extern void +CreateShardSubscription(MultiConnection *connection, char *sourceNodeName, + int sourceNodePort, char *userName, char *databaseName, + char * publicationName, + Oid ownerId); + +extern void WaitForRelationSubscriptionsBecomeReady(MultiConnection *targetConnection, + Bitmapset *tableOwnerIds); + #define SHARD_MOVE_PUBLICATION_PREFIX "citus_shard_move_publication_" #define SHARD_MOVE_SUBSCRIPTION_PREFIX "citus_shard_move_subscription_" #define SHARD_MOVE_SUBSCRIPTION_ROLE_PREFIX "citus_shard_move_subscription_role_" #define SHARD_SPLIT_PUBLICATION_PREFIX "citus_shard_split_publication_" - +#define SHARD_SPLIT_SUBSCRIPTION_PREFIX "citus_shard_split_subscription_" #endif /* MULTI_LOGICAL_REPLICATION_H_ */ diff --git a/src/include/distributed/shardsplit_logical_replication.h b/src/include/distributed/shardsplit_logical_replication.h new file mode 100644 index 000000000..7bbe55fb1 --- /dev/null +++ b/src/include/distributed/shardsplit_logical_replication.h @@ -0,0 +1,57 @@ +/*------------------------------------------------------------------------- + * + * shardsplit_logical_replication.h + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef SHARDSPLIT_LOGICAL_REPLICATION_H +#define SHARDSPLIT_LOGICAL_REPLICATION_H + +typedef struct ReplicationSlotInfo +{ + uint32 targetNodeId; + char * tableOwnerName; + char * slotName; +} ReplicationSlotInfo; + +typedef struct ShardSplitPubSubMetadata +{ + List * shardIntervalListForSubscription; + Oid tableOwnerId; + ReplicationSlotInfo *slotInfo; +} ShardSplitPubSubMetadata; + +/* key for NodeShardMappingEntry */ +typedef struct NodeShardMappingKey +{ + uint32_t nodeId; + Oid tableOwnerId; +} NodeShardMappingKey; + +/* Entry for hash map */ +typedef struct NodeShardMappingEntry +{ + NodeShardMappingKey key; + List *shardSplitInfoList; +} NodeShardMappingEntry; + +extern uint32 NodeShardMappingHash(const void *key, Size keysize); +extern int NodeShardMappingHashCompare(const void *left, const void *right, Size keysize); +HTAB * SetupHashMapForShardInfo(void); + +List * ParseReplicationSlotInfoFromResult(PGresult * result); + +extern StringInfo CreateSplitShardReplicationSetupUDF(List *sourceColocatedShardIntervalList, + List *shardGroupSplitIntervalListList, + List *destinationWorkerNodesList); + +extern List * CreateShardSplitPubSubMetadataList(List *sourceColocatedShardIntervalList, + List *shardGroupSplitIntervalListList, + List *destinationWorkerNodesList, + List *replicationSlotInfoList); + +extern void LogicallReplicateSplitShards(WorkerNode *sourceWorkerNode, List* shardSplitPubSubMetadataList); +#endif /* SHARDSPLIT_LOGICAL_REPLICATION_H */ \ No newline at end of file diff --git a/src/test/regress/expected/citus_sameer.out b/src/test/regress/expected/citus_sameer.out index d8b006741..3ac378f06 100644 --- a/src/test/regress/expected/citus_sameer.out +++ b/src/test/regress/expected/citus_sameer.out @@ -1,25 +1,11 @@ -- Negative test cases for citus_split_shard_by_split_points UDF. CREATE SCHEMA citus_split_shard_by_split_points_negative; SET search_path TO citus_split_shard_by_split_points_negative; -SET citus.shard_count TO 4; +SET citus.shard_count TO 1; SET citus.shard_replication_factor TO 1; -SET citus.next_shard_id TO 60761300; -CREATE TABLE range_paritioned_table_to_split(rid bigserial PRIMARY KEY, value char); -SELECT create_distributed_table('range_paritioned_table_to_split', 'rid', 'range'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - --- Shards are not created automatically for range distributed table. -SELECT master_create_empty_shard('range_paritioned_table_to_split'); - master_create_empty_shard ---------------------------------------------------------------------- - 60761300 -(1 row) - -SET citus.next_shard_id TO 49761300; +SET citus.next_shard_id TO 1; CREATE TABLE table_to_split (id bigserial PRIMARY KEY, value char); +CREATE TABLE table_second (id bigserial PRIMARY KEY, value char); -- Shard1 | -2147483648 | -1073741825 -- Shard2 | -1073741824 | -1 -- Shard3 | 0 | 1073741823 @@ -30,116 +16,210 @@ SELECT create_distributed_table('table_to_split','id'); (1 row) +SELECT create_distributed_table('table_second', 'id', colocate_with => 'table_to_split'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset +SELECT * FROM citus_shards; + table_name | shardid | shard_name | citus_table_type | colocation_id | nodename | nodeport | shard_size +--------------------------------------------------------------------- + table_second | 2 | citus_split_shard_by_split_points_negative.table_second_2 | distributed | 1390006 | localhost | 9997 | 0 + table_second | 2 | citus_split_shard_by_split_points_negative.table_second_2 | distributed | 1390006 | localhost | 8888 | 0 + table_second | 2 | citus_split_shard_by_split_points_negative.table_second_2 | distributed | 1390006 | localhost | 8887 | 0 + table_second | 2 | citus_split_shard_by_split_points_negative.table_second_2 | distributed | 1390006 | localhost | 9995 | 0 + table_second | 2 | citus_split_shard_by_split_points_negative.table_second_2 | distributed | 1390006 | localhost | 9992 | 0 + table_second | 2 | citus_split_shard_by_split_points_negative.table_second_2 | distributed | 1390006 | localhost | 57637 | 0 + table_second | 2 | citus_split_shard_by_split_points_negative.table_second_2 | distributed | 1390006 | localhost | 9998 | 0 + table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390006 | localhost | 8888 | 0 + table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390006 | localhost | 57637 | 0 + table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390006 | localhost | 8887 | 0 + table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390006 | localhost | 9997 | 0 + table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390006 | localhost | 9995 | 0 + table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390006 | localhost | 9998 | 0 + table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390006 | localhost | 9992 | 0 +(14 rows) + +SELECT * FROM pg_dist_shard; + logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue +--------------------------------------------------------------------- + table_to_split | 1 | t | -2147483648 | 2147483647 + table_second | 2 | t | -2147483648 | 2147483647 +(2 rows) + +SET client_min_messages TO LOG; +SET citus.log_remote_commands TO on; +CREATE OR REPLACE VIEW show_catalog AS SELECT n.nspname as "Schema", + c.relname as "Name", + pg_catalog.pg_get_userbyid(c.relowner) as "Owner" +FROM pg_catalog.pg_class c + LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace +WHERE c.relkind IN ('r','p','') + AND n.nspname <> 'pg_catalog' + AND n.nspname !~ '^pg_toast' + AND n.nspname <> 'information_schema' + AND pg_catalog.pg_table_is_visible(c.oid) +ORDER BY 1,2; +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SET citus.enable_ddl_propagation TO 'off' +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SET citus.enable_ddl_propagation TO 'off' +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SET search_path TO citus_split_shard_by_split_points_negative; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SET search_path TO citus_split_shard_by_split_points_negative; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing CREATE OR REPLACE VIEW citus_split_shard_by_split_points_negative.show_catalog ("Schema","Name","Owner") AS SELECT n.nspname AS "Schema", + c.relname AS "Name", + pg_get_userbyid(c.relowner) AS "Owner" + FROM (pg_class c + LEFT JOIN pg_namespace n ON ((n.oid = c.relnamespace))) + WHERE ((c.relkind = ANY (ARRAY['r'::"char", 'p'::"char", ''::"char"])) AND (n.nspname <> 'pg_catalog'::name) AND (n.nspname !~ '^pg_toast'::text) AND (n.nspname <> 'information_schema'::name) AND pg_table_is_visible(c.oid)) + ORDER BY n.nspname, c.relname; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing CREATE OR REPLACE VIEW citus_split_shard_by_split_points_negative.show_catalog ("Schema","Name","Owner") AS SELECT n.nspname AS "Schema", + c.relname AS "Name", + pg_get_userbyid(c.relowner) AS "Owner" + FROM (pg_class c + LEFT JOIN pg_namespace n ON ((n.oid = c.relnamespace))) + WHERE ((c.relkind = ANY (ARRAY['r'::"char", 'p'::"char", ''::"char"])) AND (n.nspname <> 'pg_catalog'::name) AND (n.nspname !~ '^pg_toast'::text) AND (n.nspname <> 'information_schema'::name) AND pg_table_is_visible(c.oid)) + ORDER BY n.nspname, c.relname; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('view', ARRAY['citus_split_shard_by_split_points_negative', 'show_catalog']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('view', ARRAY['citus_split_shard_by_split_points_negative', 'show_catalog']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing PREPARE TRANSACTION 'citus_xx_xx_xx_xx' +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing PREPARE TRANSACTION 'citus_xx_xx_xx_xx' +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing COMMIT PREPARED 'citus_xx_xx_xx_xx' +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing COMMIT PREPARED 'citus_xx_xx_xx_xx' +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -- UDF fails for range partitioned tables. +\c - - - :master_port +SET citus.log_remote_commands TO on; +SET citus.next_shard_id TO 100; +SET search_path TO citus_split_shard_by_split_points_negative; +SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset +SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset SELECT citus_split_shard_by_split_points( - 60761300, + 1, ARRAY['-1073741826'], - ARRAY[:worker_1_node, :worker_2_node]); -ERROR: Cannot split shard as operation is only supported for hash distributed tables. --- UDF fails if number of placement node list does not exceed split points by one. --- Example: One split point defines two way split (2 worker nodes needed). -SELECT citus_split_shard_by_split_points( - 49761300, - -- 2 split points defined making it a 3 way split but we only specify 2 placement lists. - ARRAY['-1073741826', '-107374182'], - ARRAY[:worker_1_node, :worker_2_node]); -- 2 worker nodes. -ERROR: Number of worker node ids should be one greater split points. NodeId count is '2' and SplitPoint count is '2'. --- UDF fails if split ranges specified are not within the shard id to split. -SELECT citus_split_shard_by_split_points( - 49761300, -- Shard range is from (-2147483648, -1073741825) - ARRAY['0'], -- The range we specified is 0 which is not in the range. - ARRAY[:worker_1_node, :worker_2_node]); -ERROR: Split point 0 is outside the min/max range(-2147483648, -1073741825) for shard id 49761300. --- UDF fails if split points are not strictly increasing. -SELECT citus_split_shard_by_split_points( - 49761302, - ARRAY['50', '35'], - ARRAY[:worker_1_node, :worker_2_node, :worker_1_node]); -ERROR: Invalid Split Points '50' followed by '35'. All split points should be strictly increasing. -SELECT citus_split_shard_by_split_points( - 49761302, - ARRAY['50', '50'], - ARRAY[:worker_1_node, :worker_2_node, :worker_1_node]); -ERROR: Invalid Split Points '50' followed by '50'. All split points should be strictly increasing. --- UDF fails if nodeIds are < 1 or Invalid. -SELECT citus_split_shard_by_split_points( - 49761302, - ARRAY['50'], - ARRAY[0, :worker_2_node]); -ERROR: Invalid Node Id '0'. -SELECT citus_split_shard_by_split_points( - 49761302, - ARRAY['50'], - ARRAY[101, 201]); -ERROR: Invalid Node Id '101'. --- UDF fails if split point specified is equal to the max value in the range. --- Example: ShardId 81060002 range is from (-2147483648, -1073741825) --- '-1073741825' as split point is invalid. --- '-1073741826' is valid and will split to: (-2147483648, -1073741826) and (-1073741825, -1073741825) -SELECT citus_split_shard_by_split_points( - 49761300, -- Shard range is from (-2147483648, -1073741825) - ARRAY['-1073741825'], -- Split point equals shard's max value. - ARRAY[:worker_1_node, :worker_2_node]); -ERROR: Invalid split point -1073741825, as split points should be inclusive. Please use -1073741826 instead. --- UDF fails where source shard cannot be split further i.e min and max range is equal. --- Create a Shard where range cannot be split further -SELECT isolate_tenant_to_new_shard('table_to_split', 1); - isolate_tenant_to_new_shard ---------------------------------------------------------------------- - 49761305 -(1 row) - -SELECT citus_split_shard_by_split_points( - 49761305, - ARRAY['-1073741826'], - ARRAY[:worker_1_node, :worker_2_node]); -ERROR: Cannot split shard id "49761305" as min/max range are equal: ('-1905060026', '-1905060026'). --- Create distributed table with replication factor > 1 -SET citus.shard_replication_factor TO 2; -SET citus.next_shard_id TO 51261400; -CREATE TABLE table_to_split_replication_factor_2 (id bigserial PRIMARY KEY, value char); -SELECT create_distributed_table('table_to_split_replication_factor_2','id'); - create_distributed_table + ARRAY[:worker_1_node, :worker_2_node], + 'non_blocking'); +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT worker_apply_shard_ddl_command (100, 'CREATE TABLE citus_split_shard_by_split_points_negative.table_to_split (id bigint NOT NULL, value character(1)) ') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT worker_apply_shard_ddl_command (100, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_to_split OWNER TO postgres') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing COMMIT +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT worker_apply_shard_ddl_command (101, 'CREATE TABLE citus_split_shard_by_split_points_negative.table_to_split (id bigint NOT NULL, value character(1)) ') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT worker_apply_shard_ddl_command (101, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_to_split OWNER TO postgres') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing COMMIT +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT worker_apply_shard_ddl_command (102, 'CREATE TABLE citus_split_shard_by_split_points_negative.table_second (id bigint NOT NULL, value character(1)) ') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT worker_apply_shard_ddl_command (102, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_second OWNER TO postgres') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing COMMIT +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT worker_apply_shard_ddl_command (103, 'CREATE TABLE citus_split_shard_by_split_points_negative.table_second (id bigint NOT NULL, value character(1)) ') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT worker_apply_shard_ddl_command (103, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_second OWNER TO postgres') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing COMMIT +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT worker_apply_shard_ddl_command (1, 'CREATE TABLE citus_split_shard_by_split_points_negative.table_to_split (id bigint NOT NULL, value character(1)) ') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT worker_apply_shard_ddl_command (1, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_to_split OWNER TO postgres') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing COMMIT +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT worker_apply_shard_ddl_command (2, 'CREATE TABLE citus_split_shard_by_split_points_negative.table_second (id bigint NOT NULL, value character(1)) ') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT worker_apply_shard_ddl_command (2, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_second OWNER TO postgres') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing COMMIT +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT worker_apply_shard_ddl_command (101, 'CREATE TABLE citus_split_shard_by_split_points_negative.table_to_split (id bigint NOT NULL, value character(1)) ') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT worker_apply_shard_ddl_command (101, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_to_split OWNER TO postgres') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing COMMIT +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT worker_apply_shard_ddl_command (103, 'CREATE TABLE citus_split_shard_by_split_points_negative.table_second (id bigint NOT NULL, value character(1)) ') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT worker_apply_shard_ddl_command (103, 'ALTER TABLE citus_split_shard_by_split_points_negative.table_second OWNER TO postgres') +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing COMMIT +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT * FROM worker_split_shard_replication_setup(ARRAY[ROW(1, 100, '-2147483648', '-1073741826', 16)::citus.split_shard_info,ROW(1, 101, '-1073741825', '2147483647', 18)::citus.split_shard_info,ROW(2, 102, '-2147483648', '-1073741826', 16)::citus.split_shard_info,ROW(2, 103, '-1073741825', '2147483647', 18)::citus.split_shard_info]) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +WARNING: connection claimed exclusively at transaction commit +WARNING: connection claimed exclusively at transaction commit + citus_split_shard_by_split_points --------------------------------------------------------------------- (1 row) --- UDF fails for replication factor > 1 -SELECT citus_split_shard_by_split_points( - 51261400, - ARRAY['-1073741826'], - ARRAY[:worker_1_node, :worker_2_node]); -ERROR: Operation split not supported for shard as replication factor '2' is greater than 1. --- Create distributed table with columnar type. -SET citus.next_shard_id TO 51271400; -CREATE TABLE table_to_split_columnar (id bigserial PRIMARY KEY, value char) USING columnar; -SELECT create_distributed_table('table_to_split_columnar','id'); - create_distributed_table +-- On worker2, we want child shard xxxxx and dummy shard xxxxx -- +-- on worker1, we want child shard xxxxx and 1 and dummy shard xxxxx -- +\c - - - :worker_2_port +SET search_path TO citus_split_shard_by_split_points_negative; +SELECT * FROM show_catalog; + Schema | Name | Owner --------------------------------------------------------------------- + citus_split_shard_by_split_points_negative | table_second | postgres + citus_split_shard_by_split_points_negative | table_second_103 | postgres + citus_split_shard_by_split_points_negative | table_second_2 | postgres + citus_split_shard_by_split_points_negative | table_to_split | postgres + citus_split_shard_by_split_points_negative | table_to_split_1 | postgres + citus_split_shard_by_split_points_negative | table_to_split_101 | postgres +(6 rows) -(1 row) - --- UDF fails for columnar table. -SELECT citus_split_shard_by_split_points( - 51271400, - ARRAY['-1073741826'], - ARRAY[:worker_1_node, :worker_2_node]); -ERROR: Cannot split shard as operation is not supported for Columnar tables. --- Create distributed table which is partitioned. -SET citus.next_shard_id TO 51271900; -CREATE TABLE table_to_split_partitioned(id integer, dt date) PARTITION BY RANGE(dt); -SELECT create_distributed_table('table_to_split_partitioned','id'); - create_distributed_table +\c - - - :worker_1_port +SET search_path TO citus_split_shard_by_split_points_negative; +SELECT * FROM show_catalog; + Schema | Name | Owner --------------------------------------------------------------------- + citus_split_shard_by_split_points_negative | table_second | postgres + citus_split_shard_by_split_points_negative | table_second_102 | postgres + citus_split_shard_by_split_points_negative | table_second_103 | postgres + citus_split_shard_by_split_points_negative | table_second_2 | postgres + citus_split_shard_by_split_points_negative | table_to_split | postgres + citus_split_shard_by_split_points_negative | table_to_split_1 | postgres + citus_split_shard_by_split_points_negative | table_to_split_100 | postgres + citus_split_shard_by_split_points_negative | table_to_split_101 | postgres +(8 rows) -(1 row) +SELECT * FROM pg_publication_tables; + pubname | schemaname | tablename +--------------------------------------------------------------------- +(0 rows) --- UDF fails for partitioned table. -SELECT citus_split_shard_by_split_points( - 51271900, - ARRAY['-1073741826'], - ARRAY[:worker_1_node, :worker_2_node]); -ERROR: cannot split of 'table_to_split_partitioned', because it is a partitioned table -DETAIL: In colocation group of 'table_to_split_partitioned', a partitioned relation exists: 'table_to_split_partitioned'. Citus does not support split of partitioned tables. diff --git a/src/test/regress/expected/split_shard_replication_colocated_setup.out b/src/test/regress/expected/split_shard_replication_colocated_setup.out index c05dc81b0..aba290e38 100644 --- a/src/test/regress/expected/split_shard_replication_colocated_setup.out +++ b/src/test/regress/expected/split_shard_replication_colocated_setup.out @@ -64,16 +64,16 @@ CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char); SET search_path TO split_shard_replication_setup_schema; CREATE PUBLICATION pub1 FOR TABLE table_first_4, table_first_5, table_first_6; CREATE PUBLICATION pub2 FOR TABLE table_second_7, table_second_8, table_second_9; -SELECT worker_split_shard_replication_setup(ARRAY[ +SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[ ROW(4, 5, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info, ROW(4, 6, '0', '2147483647', :worker_2_node)::citus.split_shard_info, ROW(7, 8, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info, ROW(7, 9, '0', '2147483647', :worker_2_node)::citus.split_shard_info ]); WARNING: Previous split shard worflow was not successfully and could not complete the cleanup phase. Continuing with the current split shard workflow. - worker_split_shard_replication_setup + count --------------------------------------------------------------------- - + 2 (1 row) SELECT relowner AS table_owner_one FROM pg_class WHERE relname='table_first' \gset diff --git a/src/test/regress/expected/split_shard_replication_setup.out b/src/test/regress/expected/split_shard_replication_setup.out index ed7d5d400..f06a6dd88 100644 --- a/src/test/regress/expected/split_shard_replication_setup.out +++ b/src/test/regress/expected/split_shard_replication_setup.out @@ -62,13 +62,13 @@ CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char); CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char); -- Create publication at worker1 CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3; -SELECT worker_split_shard_replication_setup(ARRAY[ +SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[ ROW(1, 2, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info, ROW(1, 3, '0', '2147483647', :worker_2_node)::citus.split_shard_info ]); - worker_split_shard_replication_setup + count --------------------------------------------------------------------- - + 1 (1 row) SELECT slot_name FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_2_node), 'citus') \gset diff --git a/src/test/regress/expected/split_shard_replication_setup_local.out b/src/test/regress/expected/split_shard_replication_setup_local.out index afbc515a2..17750b458 100644 --- a/src/test/regress/expected/split_shard_replication_setup_local.out +++ b/src/test/regress/expected/split_shard_replication_setup_local.out @@ -10,13 +10,13 @@ SET client_min_messages TO ERROR; -- Create publication at worker1 CREATE PUBLICATION pub1 for table table_to_split_1, table_to_split_2, table_to_split_3; -- Worker1 is target for table_to_split_2 and table_to_split_3 -SELECT worker_split_shard_replication_setup(ARRAY[ +SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[ ROW(1, 2, '-2147483648', '-1', :worker_1_node)::citus.split_shard_info, ROW(1, 3, '0', '2147483647', :worker_1_node)::citus.split_shard_info ]); - worker_split_shard_replication_setup + count --------------------------------------------------------------------- - + 1 (1 row) SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_1_node), 'citus') \gset diff --git a/src/test/regress/expected/split_shard_replication_setup_remote_local.out b/src/test/regress/expected/split_shard_replication_setup_remote_local.out index 09c5fddec..35c42e129 100644 --- a/src/test/regress/expected/split_shard_replication_setup_remote_local.out +++ b/src/test/regress/expected/split_shard_replication_setup_remote_local.out @@ -8,14 +8,14 @@ SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \ SET search_path TO split_shard_replication_setup_schema; -- Create publication at worker1 CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3; -SELECT worker_split_shard_replication_setup(ARRAY[ +SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[ ROW(1, 2, '-2147483648', '-1', :worker_1_node)::citus.split_shard_info, ROW(1, 3, '0', '2147483647', :worker_2_node)::citus.split_shard_info ]); WARNING: Previous split shard worflow was not successfully and could not complete the cleanup phase. Continuing with the current split shard workflow. - worker_split_shard_replication_setup + count --------------------------------------------------------------------- - + 2 (1 row) SELECT slot_name AS slot_for_worker1 FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_1_node), 'citus') \gset diff --git a/src/test/regress/split_schedule b/src/test/regress/split_schedule index 871a62031..63190ec12 100644 --- a/src/test/regress/split_schedule +++ b/src/test/regress/split_schedule @@ -5,12 +5,15 @@ test: multi_cluster_management test: multi_test_catalog_views test: tablespace # Helpers for foreign key catalogs. -test: foreign_key_to_reference_table +#test: foreign_key_to_reference_table # Split tests go here. -#test: citus_sameer +test: citus_sameer #test: split_shard_replication_setup -test: worker_split_copy_test -test: worker_split_binary_copy_test -test: worker_split_text_copy_test -test: citus_split_shard_by_split_points_negative -test: citus_split_shard_by_split_points +#test: split_shard_replication_setup_remote_local +#test: split_shard_replication_setup_local +#test: split_shard_replication_colocated_setup +#test: worker_split_copy_test +#test: worker_split_binary_copy_test +#test: worker_split_text_copy_test +#test: citus_split_shard_by_split_points_negative +#test: citus_split_shard_by_split_points diff --git a/src/test/regress/sql/citus_sameer.sql b/src/test/regress/sql/citus_sameer.sql index c49b8ad2d..86c7af76f 100644 --- a/src/test/regress/sql/citus_sameer.sql +++ b/src/test/regress/sql/citus_sameer.sql @@ -15,7 +15,6 @@ CREATE TABLE table_second (id bigserial PRIMARY KEY, value char); SELECT create_distributed_table('table_to_split','id'); SELECT create_distributed_table('table_second', 'id', colocate_with => 'table_to_split'); - SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset @@ -49,7 +48,7 @@ SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \ SELECT citus_split_shard_by_split_points( 1, ARRAY['-1073741826'], - ARRAY[:worker_1_node, :worker_2_node], + ARRAY[:worker_2_node, :worker_2_node], 'non_blocking'); -- On worker2, we want child shard 2 and dummy shard 1 -- -- on worker1, we want child shard 3 and 1 and dummy shard 2 -- diff --git a/src/test/regress/sql/split_shard_replication_colocated_setup.sql b/src/test/regress/sql/split_shard_replication_colocated_setup.sql index a73595f1c..8ef1f3090 100644 --- a/src/test/regress/sql/split_shard_replication_colocated_setup.sql +++ b/src/test/regress/sql/split_shard_replication_colocated_setup.sql @@ -66,7 +66,7 @@ SET search_path TO split_shard_replication_setup_schema; CREATE PUBLICATION pub1 FOR TABLE table_first_4, table_first_5, table_first_6; CREATE PUBLICATION pub2 FOR TABLE table_second_7, table_second_8, table_second_9; -SELECT worker_split_shard_replication_setup(ARRAY[ +SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[ ROW(4, 5, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info, ROW(4, 6, '0', '2147483647', :worker_2_node)::citus.split_shard_info, ROW(7, 8, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info, diff --git a/src/test/regress/sql/split_shard_replication_setup.sql b/src/test/regress/sql/split_shard_replication_setup.sql index b8f9ace57..12195f751 100644 --- a/src/test/regress/sql/split_shard_replication_setup.sql +++ b/src/test/regress/sql/split_shard_replication_setup.sql @@ -66,7 +66,7 @@ CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char); -- Create publication at worker1 CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3; -SELECT worker_split_shard_replication_setup(ARRAY[ +SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[ ROW(1, 2, '-2147483648', '-1', :worker_2_node)::citus.split_shard_info, ROW(1, 3, '0', '2147483647', :worker_2_node)::citus.split_shard_info ]); diff --git a/src/test/regress/sql/split_shard_replication_setup_local.sql b/src/test/regress/sql/split_shard_replication_setup_local.sql index cfabfb150..f48519f86 100644 --- a/src/test/regress/sql/split_shard_replication_setup_local.sql +++ b/src/test/regress/sql/split_shard_replication_setup_local.sql @@ -13,7 +13,7 @@ SET client_min_messages TO ERROR; CREATE PUBLICATION pub1 for table table_to_split_1, table_to_split_2, table_to_split_3; -- Worker1 is target for table_to_split_2 and table_to_split_3 -SELECT worker_split_shard_replication_setup(ARRAY[ +SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[ ROW(1, 2, '-2147483648', '-1', :worker_1_node)::citus.split_shard_info, ROW(1, 3, '0', '2147483647', :worker_1_node)::citus.split_shard_info ]); diff --git a/src/test/regress/sql/split_shard_replication_setup_remote_local.sql b/src/test/regress/sql/split_shard_replication_setup_remote_local.sql index 92fd7ffaf..ff2ebb9a5 100644 --- a/src/test/regress/sql/split_shard_replication_setup_remote_local.sql +++ b/src/test/regress/sql/split_shard_replication_setup_remote_local.sql @@ -11,7 +11,7 @@ SET search_path TO split_shard_replication_setup_schema; -- Create publication at worker1 CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3; -SELECT worker_split_shard_replication_setup(ARRAY[ +SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[ ROW(1, 2, '-2147483648', '-1', :worker_1_node)::citus.split_shard_info, ROW(1, 3, '0', '2147483647', :worker_2_node)::citus.split_shard_info ]);