From 8c871bcd103c61f8c2f63c372f8a1dba787d01f0 Mon Sep 17 00:00:00 2001 From: Sameer Awasekar Date: Fri, 22 Jul 2022 20:10:18 +0530 Subject: [PATCH] Adding comments and code for cleanup --- .../connection/connection_configuration.c | 5 +- .../distributed/operations/shard_split.c | 235 +++------ .../replication/multi_logical_replication.c | 9 +- .../shardsplit_logical_replication.c | 457 +++++++++-------- .../shardsplit/shardsplit_shared_memory.c | 4 +- .../distributed/multi_logical_replication.h | 9 +- src/include/distributed/shard_split.h | 2 - .../shardsplit_logical_replication.h | 63 ++- .../citus_non_blocking_split_shards.out | 465 ++++++++++++++++++ src/test/regress/expected/citus_sameer.out | 28 +- ...plit_shard_replication_colocated_setup.out | 4 +- .../split_shard_replication_setup.out | 2 +- .../split_shard_replication_setup_local.out | 2 +- ...t_shard_replication_setup_remote_local.out | 4 +- src/test/regress/split_schedule | 23 +- .../sql/citus_non_blocking_split_shards.sql | 240 +++++++++ src/test/regress/sql/citus_sameer.sql | 2 +- ...plit_shard_replication_colocated_setup.sql | 4 +- .../sql/split_shard_replication_setup.sql | 2 +- .../split_shard_replication_setup_local.sql | 2 +- ...t_shard_replication_setup_remote_local.sql | 4 +- 21 files changed, 1122 insertions(+), 444 deletions(-) create mode 100644 src/test/regress/expected/citus_non_blocking_split_shards.out create mode 100644 src/test/regress/sql/citus_non_blocking_split_shards.sql diff --git a/src/backend/distributed/connection/connection_configuration.c b/src/backend/distributed/connection/connection_configuration.c index 21f66fd26..ff4e7d04e 100644 --- a/src/backend/distributed/connection/connection_configuration.c +++ b/src/backend/distributed/connection/connection_configuration.c @@ -379,8 +379,9 @@ GetConnParams(ConnectionHashKey *key, char ***keywords, char ***values, if (key->replication) { - connKeywords[authParamsIdx] = "replication"; - connValues[authParamsIdx] = "database"; + connKeywords[authParamsIdx] = MemoryContextStrdup(context, "replication"); + connValues[authParamsIdx] = MemoryContextStrdup(context, "database"); + authParamsIdx++; } diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index 1f8bf3175..1aad1eaf7 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -71,11 +71,6 @@ static void CreateDummyShardsForShardGroup(List *sourceColocatedShardIntervalLis List *shardGroupSplitIntervalListList, WorkerNode *sourceWorkerNode, List *workersForPlacementList); -static void SplitShardReplicationSetup(ShardInterval *shardIntervalToSplit, - List *sourceColocatedShardIntervalList, - List *shardGroupSplitIntervalListList, - WorkerNode *sourceWorkerNode, - List *workersForPlacementList); static HTAB * CreateWorkerForPlacementSet(List *workersForPlacementList); static void CreateAuxiliaryStructuresForShardGroup(List *shardGroupSplitIntervalListList, List *workersForPlacementList); @@ -100,10 +95,6 @@ static void DoSplitCopy(WorkerNode *sourceShardNode, List *shardGroupSplitIntervalListList, List *workersForPlacementList, char *snapShotName); -static void DoSplitCopy(WorkerNode *sourceShardNode, - List *sourceColocatedShardIntervalList, - List *shardGroupSplitIntervalListList, - List *workersForPlacementList); static StringInfo CreateSplitCopyCommand(ShardInterval *sourceShardSplitInterval, List *splitChildrenShardIntervalList, List *workersForPlacementList); @@ -124,7 +115,9 @@ 
static void AddDummyShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInt static void DropDummyShards(void); void TryDropShard(MultiConnection *connection, ShardInterval *shardInterval); char * CreateTemplateReplicationSlotAndReturnSnapshot(ShardInterval *shardInterval, - WorkerNode *sourceWorkerNode); + WorkerNode *sourceWorkerNode, + MultiConnection ** + templateSlotConnection); static List * ExecuteSplitShardReplicationSetupUDF(WorkerNode *sourceWorkerNode, List *sourceColocatedShardIntervalList, @@ -181,28 +174,6 @@ ErrorIfCannotSplitShard(SplitOperation splitOperation, ShardInterval *sourceShar errdetail("Splitting shards backed by foreign tables " "is not supported."))); } - - /* - * At the moment, we do not support copying a shard if that shard's - * relation is in a colocation group with a partitioned table or partition. - */ - if (PartitionedTable(colocatedTableId)) - { - char *sourceRelationName = get_rel_name(relationId); - char *colocatedRelationName = get_rel_name(colocatedTableId); - - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot %s of '%s', because it " - "is a partitioned table", - SplitOperationName[splitOperation], - colocatedRelationName), - errdetail("In colocation group of '%s', a partitioned " - "relation exists: '%s'. Citus does not support " - "%s of partitioned tables.", - sourceRelationName, - colocatedRelationName, - SplitOperationName[splitOperation]))); - } } /* check shards with inactive placements */ @@ -787,48 +758,48 @@ DoSplitCopy(WorkerNode *sourceShardNode, List *sourceColocatedShardIntervalList, */ if (!PartitionedTable(sourceShardIntervalToCopy->relationId)) { - StringInfo splitCopyUdfCommand = CreateSplitCopyCommand(sourceShardIntervalToCopy, - splitShardIntervalList, - destinationWorkerNodesList); + StringInfo splitCopyUdfCommand = CreateSplitCopyCommand( + sourceShardIntervalToCopy, + splitShardIntervalList, + destinationWorkerNodesList); - List *ddlCommandList = NIL; - StringInfo beginTransaction = makeStringInfo(); - appendStringInfo(beginTransaction, - "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;"); - ddlCommandList = lappend(ddlCommandList, beginTransaction->data); + List *ddlCommandList = NIL; + StringInfo beginTransaction = makeStringInfo(); + appendStringInfo(beginTransaction, + "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ;"); + ddlCommandList = lappend(ddlCommandList, beginTransaction->data); - /* Set snapshot */ - if (snapShotName != NULL) - { - StringInfo snapShotString = makeStringInfo(); - appendStringInfo(snapShotString, "SET TRANSACTION SNAPSHOT %s;", - quote_literal_cstr( - snapShotName)); - ddlCommandList = lappend(ddlCommandList, snapShotString->data); - printf("Sameer final string snapshotted:%s\n", snapShotString->data); - } + /* Set snapshot */ + if (snapShotName != NULL) + { + StringInfo snapShotString = makeStringInfo(); + appendStringInfo(snapShotString, "SET TRANSACTION SNAPSHOT %s;", + quote_literal_cstr( + snapShotName)); + ddlCommandList = lappend(ddlCommandList, snapShotString->data); + printf("Sameer final string snapshotted:%s\n", snapShotString->data); + } - ddlCommandList = lappend(ddlCommandList, splitCopyUdfCommand->data); + ddlCommandList = lappend(ddlCommandList, splitCopyUdfCommand->data); - StringInfo commitCommand = makeStringInfo(); - appendStringInfo(commitCommand, "COMMIT;"); - ddlCommandList = lappend(ddlCommandList, commitCommand->data); + StringInfo commitCommand = makeStringInfo(); + appendStringInfo(commitCommand, "COMMIT;"); + ddlCommandList = 
lappend(ddlCommandList, commitCommand->data); - Task *splitCopyTask = CitusMakeNode(Task); - splitCopyTask->jobId = sourceShardIntervalToCopy->shardId; - splitCopyTask->taskId = taskId; - splitCopyTask->taskType = READ_TASK; - splitCopyTask->replicationModel = REPLICATION_MODEL_INVALID; - SetTaskQueryStringList(splitCopyTask, ddlCommandList); + Task *splitCopyTask = CitusMakeNode(Task); + splitCopyTask->jobId = sourceShardIntervalToCopy->shardId; + splitCopyTask->taskId = taskId; + splitCopyTask->taskType = READ_TASK; + splitCopyTask->replicationModel = REPLICATION_MODEL_INVALID; + SetTaskQueryStringList(splitCopyTask, ddlCommandList); - ShardPlacement *taskPlacement = CitusMakeNode(ShardPlacement); - SetPlacementNodeMetadata(taskPlacement, sourceShardNode); + ShardPlacement *taskPlacement = CitusMakeNode(ShardPlacement); + SetPlacementNodeMetadata(taskPlacement, sourceShardNode); - splitCopyTask->taskPlacementList = list_make1(taskPlacement); - - splitCopyTaskList = lappend(splitCopyTaskList, splitCopyTask); - taskId++; + splitCopyTask->taskPlacementList = list_make1(taskPlacement); + splitCopyTaskList = lappend(splitCopyTaskList, splitCopyTask); + taskId++; } } @@ -1307,16 +1278,10 @@ NonBlockingShardSplit(SplitOperation splitOperation, CreateEmptyMapForShardsCreatedByWorkflow(); PG_TRY(); { - /* - * Physically create split children, perform split copy and create auxillary structures. - * This includes: indexes, replicaIdentity. triggers and statistics. - * Foreign key constraints are created after Metadata changes (see CreateForeignKeyConstraints). - */ - CreateSplitShardsForShardGroupTwo( - sourceShardToCopyNode, - sourceColocatedShardIntervalList, - shardGroupSplitIntervalListList, - workersForPlacementList); + /* Physically create split children. 
*/ + CreateSplitShardsForShardGroup(mapOfShardToPlacementCreatedByWorkflow, + shardGroupSplitIntervalListList, + workersForPlacementList); CreateDummyShardsForShardGroup( @@ -1327,9 +1292,10 @@ NonBlockingShardSplit(SplitOperation splitOperation, CreateShardSplitPublications(sourceConnection, shardSplitHashMapForPublication); - /*Create Template Replication Slot */ - char *snapShotName = NULL; - snapShotName = CreateTemplateReplicationSlotAndReturnSnapshot(shardIntervalToSplit, sourceShardToCopyNode); + /* Create Template Replication Slot */ + MultiConnection *templateSlotConnection = NULL; + char *snapShotName = CreateTemplateReplicationSlotAndReturnSnapshot( + shardIntervalToSplit, sourceShardToCopyNode, &templateSlotConnection); /* DoSplitCopy */ DoSplitCopy(sourceShardToCopyNode, sourceColocatedShardIntervalList, @@ -1354,7 +1320,10 @@ NonBlockingShardSplit(SplitOperation splitOperation, superUser, databaseName); /* Create copies of template replication slot */ - CreateReplicationSlots(sourceConnection, shardSplitSubscribersMetadataList); + char *templateSlotName = ShardSplitTemplateReplicationSlotName( + shardIntervalToSplit->shardId); + CreateReplicationSlots(sourceConnection, templateSlotName, + shardSplitSubscribersMetadataList); CreateShardSplitSubscriptions(targetNodeConnectionList, shardSplitSubscribersMetadataList, @@ -1382,6 +1351,12 @@ NonBlockingShardSplit(SplitOperation splitOperation, WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition, shardSplitSubscribersMetadataList); + /* Drop Subscribers */ + DropShardSplitSubsriptions(shardSplitSubscribersMetadataList); + + /* Drop Publications */ + DropShardSplitPublications(sourceConnection, shardSplitHashMapForPublication); + /* * Drop old shards and delete related metadata. 
Have to do that before * creating the new shard metadata, because there's cross-checks @@ -1400,7 +1375,17 @@ NonBlockingShardSplit(SplitOperation splitOperation, */ CreateForeignKeyConstraints(shardGroupSplitIntervalListList, workersForPlacementList); + DropDummyShards(); + + /* Close source connection */ + CloseConnection(sourceConnection); + + /* Close all subscriber connections */ + CloseShardSplitSubscriberConnections(shardSplitSubscribersMetadataList); + + /* Close connection of template replication slot */ + CloseConnection(templateSlotConnection); } PG_CATCH(); { @@ -1561,84 +1546,6 @@ CreateWorkerForPlacementSet(List *workersForPlacementList) } -static void -SplitShardReplicationSetup(ShardInterval *shardIntervalToSplit, - List *sourceColocatedShardIntervalList, - List *shardGroupSplitIntervalListList, - WorkerNode *sourceWorkerNode, - List *destinationWorkerNodesList) -{ - char *superUser = CitusExtensionOwnerName(); - char *databaseName = get_database_name(MyDatabaseId); - int connectionFlags = FORCE_NEW_CONNECTION; - - HTAB *shardSplitHashMapForPublication = CreateShardSplitInfoMapForPublication( - sourceColocatedShardIntervalList, - shardGroupSplitIntervalListList, - destinationWorkerNodesList); - - DropAllShardSplitLeftOvers(sourceWorkerNode, shardSplitHashMapForPublication); - - MultiConnection *sourceConnection = GetNodeUserDatabaseConnection(connectionFlags, - sourceWorkerNode-> - workerName, - sourceWorkerNode-> - workerPort, - superUser, - databaseName); - ClaimConnectionExclusively(sourceConnection); - - CreateShardSplitPublications(sourceConnection, shardSplitHashMapForPublication); - - /*Create Template Replication Slot */ - - /* DoSplitCopy */ - - /*worker_split_replication_setup_udf*/ - List *replicationSlotInfoList = ExecuteSplitShardReplicationSetupUDF( - sourceWorkerNode, - sourceColocatedShardIntervalList, - shardGroupSplitIntervalListList, - destinationWorkerNodesList); - - /* Subscriber flow starts from here */ - List *shardSplitSubscribersMetadataList = PopulateShardSplitSubscriptionsMetadataList( - shardSplitHashMapForPublication, replicationSlotInfoList); - - List *targetNodeConnectionList = CreateTargetNodeConnectionsForShardSplit( - shardSplitSubscribersMetadataList, - connectionFlags, - superUser, databaseName); - - CreateShardSplitSubscriptions(targetNodeConnectionList, - shardSplitSubscribersMetadataList, - sourceWorkerNode, - superUser, - databaseName); - - WaitForShardSplitRelationSubscriptionsBecomeReady(shardSplitSubscribersMetadataList); - - XLogRecPtr sourcePosition = GetRemoteLogPosition(sourceConnection); - WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition, - shardSplitSubscribersMetadataList); - - CreateAuxiliaryStructuresForShardGroup(shardGroupSplitIntervalListList, - destinationWorkerNodesList); - - sourcePosition = GetRemoteLogPosition(sourceConnection); - WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition, - shardSplitSubscribersMetadataList); - - BlockWritesToShardList(sourceColocatedShardIntervalList); - - sourcePosition = GetRemoteLogPosition(sourceConnection); - WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition, - shardSplitSubscribersMetadataList); - - /*DropAllShardSplitLeftOvers(sourceWorkerNode, shardSplitHashMapForPublication); */ -} - - static void AddDummyShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInterval) { @@ -1681,6 +1588,7 @@ DropDummyShards() int connectionFlags = FOR_DDL; connectionFlags |= OUTSIDE_TRANSACTION; + connectionFlags |= FORCE_NEW_CONNECTION; 
MultiConnection *connection = GetNodeUserDatabaseConnection( connectionFlags, shardToBeDroppedNode->workerName, @@ -1694,6 +1602,8 @@ DropDummyShards() { TryDropShard(connection, shardInterval); } + + CloseConnection(connection); } } @@ -1721,7 +1631,8 @@ TryDropShard(MultiConnection *connection, ShardInterval *shardInterval) char * CreateTemplateReplicationSlotAndReturnSnapshot(ShardInterval *shardInterval, - WorkerNode *sourceWorkerNode) + WorkerNode *sourceWorkerNode, + MultiConnection **templateSlotConnection) { /*Create Template replication slot */ int connectionFlags = FORCE_NEW_CONNECTION; @@ -1736,9 +1647,12 @@ CreateTemplateReplicationSlotAndReturnSnapshot(ShardInterval *shardInterval, get_database_name( MyDatabaseId)); + ClaimConnectionExclusively(sourceConnection); + char *snapShotName = DropExistingIfAnyAndCreateTemplateReplicationSlot(shardInterval, sourceConnection); + *templateSlotConnection = sourceConnection; return snapShotName; } @@ -1785,6 +1699,7 @@ ExecuteSplitShardReplicationSetupUDF(WorkerNode *sourceWorkerNode, PQclear(result); ForgetResults(sourceConnection); + CloseConnection(sourceConnection); } /* Get replication slot information */ @@ -1792,8 +1707,8 @@ ExecuteSplitShardReplicationSetupUDF(WorkerNode *sourceWorkerNode, PQclear(result); ForgetResults(sourceConnection); - UnclaimConnection(sourceConnection); + CloseConnection(sourceConnection); return replicationSlotInfoList; } diff --git a/src/backend/distributed/replication/multi_logical_replication.c b/src/backend/distributed/replication/multi_logical_replication.c index 4eb5601e9..c631f234d 100644 --- a/src/backend/distributed/replication/multi_logical_replication.c +++ b/src/backend/distributed/replication/multi_logical_replication.c @@ -140,7 +140,6 @@ static XLogRecPtr GetSubscriptionPosition(MultiConnection *connection, Bitmapset *tableOwnerIds, char *operationPrefix); static char * ShardMovePublicationName(Oid ownerId); -static char * ShardSubscriptionName(Oid ownerId, char *operationPrefix); static void AcquireLogicalReplicationLock(void); static void DropAllShardMoveLeftovers(void); static void DropAllShardMoveSubscriptions(MultiConnection *connection); @@ -149,8 +148,6 @@ static void DropAllShardMovePublications(MultiConnection *connection); static void DropAllShardMoveUsers(MultiConnection *connection); static char * ShardSubscriptionNamesValueList(Bitmapset *tableOwnerIds, char *operationPrefix); -static void DropShardMoveReplicationSlot(MultiConnection *connection, - char *publicationName); /* * LogicallyReplicateShards replicates a list of shards from one node to another @@ -1096,7 +1093,7 @@ DropShardMovePublications(MultiConnection *connection, Bitmapset *tableOwnerIds) * DropShardMoveReplicationSlot drops the replication slot with the given name * if it exists. */ -static void +void DropShardMoveReplicationSlot(MultiConnection *connection, char *replicationSlotName) { ExecuteCriticalRemoteCommand( @@ -1144,7 +1141,7 @@ ShardMovePublicationName(Oid ownerId) * This PID is then extracted from the application_name to find out which PID on the * coordinator is blocked by the blocked replication process. */ -static char * +char * ShardSubscriptionName(Oid ownerId, char *operationPrefix) { if (RunningUnderIsolationTest) @@ -1162,7 +1159,7 @@ ShardSubscriptionName(Oid ownerId, char *operationPrefix) * ShardSubscriptionRole returns the name of the role used by the * subscription that subscribes to the tables of the given owner. 
*/ -static char * +char * ShardSubscriptionRole(Oid ownerId, char *operationPrefix) { return psprintf("%s%i", operationPrefix, ownerId); diff --git a/src/backend/distributed/shardsplit/shardsplit_logical_replication.c b/src/backend/distributed/shardsplit/shardsplit_logical_replication.c index 00c283361..14fa0de28 100644 --- a/src/backend/distributed/shardsplit/shardsplit_logical_replication.c +++ b/src/backend/distributed/shardsplit/shardsplit_logical_replication.c @@ -22,6 +22,7 @@ #include "utils/builtins.h" #include "commands/dbcommands.h" + static HTAB *ShardInfoHashMapForPublications = NULL; /* function declarations */ @@ -32,46 +33,15 @@ ShardSplitSubscriberMetadata * CreateShardSplitSubscriberMetadata(Oid tableOwner nodeId, List * replicationSlotInfoList); - static void CreateShardSplitPublicationForNode(MultiConnection *connection, List *shardList, uint32_t publicationForTargetNodeId, Oid tableOwner); - static char * ShardSplitPublicationName(uint32_t nodeId, Oid ownerId); - - static void DropAllShardSplitSubscriptions(MultiConnection *cleanupConnection); static void DropAllShardSplitPublications(MultiConnection *cleanupConnection); static void DropAllShardSplitUsers(MultiConnection *cleanupConnection); -static void DropAllReplicationSlots(List *replicationSlotInfo); - - -List * -ParseReplicationSlotInfoFromResult(PGresult *result) -{ - int64 rowCount = PQntuples(result); - int64 colCount = PQnfields(result); - - List *replicationSlotInfoList = NIL; - for (int64 rowIndex = 0; rowIndex < rowCount; rowIndex++) - { - ReplicationSlotInfo *replicationSlotInfo = (ReplicationSlotInfo *) palloc0( - sizeof(ReplicationSlotInfo)); - - char *targeNodeIdString = PQgetvalue(result, rowIndex, 0); - replicationSlotInfo->targetNodeId = strtoul(targeNodeIdString, NULL, 10); - - /* we're using the pstrdup to copy the data into the current memory context */ - replicationSlotInfo->tableOwnerName = pstrdup(PQgetvalue(result, rowIndex, 1)); - - replicationSlotInfo->slotName = pstrdup(PQgetvalue(result, rowIndex, 2)); - - replicationSlotInfoList = lappend(replicationSlotInfoList, replicationSlotInfo); - } - - return replicationSlotInfoList; -} +static void DropAllShardSplitReplicationSlots(MultiConnection *cleanupConnection); HTAB * @@ -150,74 +120,6 @@ AddPublishableShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInterval, } -void -LogicallyReplicateSplitShards(WorkerNode *sourceWorkerNode, - List *shardSplitPubSubMetadataList, - List *sourceColocatedShardIntervalList, - List *shardGroupSplitIntervalListList, - List *destinationWorkerNodesList) -{ - char *superUser = CitusExtensionOwnerName(); - char *databaseName = get_database_name(MyDatabaseId); - int connectionFlags = FORCE_NEW_CONNECTION; - - /* Get source node connection */ - MultiConnection *sourceConnection = - GetNodeUserDatabaseConnection(connectionFlags, sourceWorkerNode->workerName, - sourceWorkerNode->workerPort, - superUser, databaseName); - - ClaimConnectionExclusively(sourceConnection); - - List *targetNodeConnectionList = CreateTargetNodeConnectionsForShardSplit( - shardSplitPubSubMetadataList, - connectionFlags, - superUser, databaseName); - - /* create publications */ - /*CreateShardSplitPublications(sourceConnection, shardSplitPubSubMetadataList); */ - - CreateShardSplitSubscriptions(targetNodeConnectionList, - shardSplitPubSubMetadataList, - sourceWorkerNode, - superUser, - databaseName); - - WaitForShardSplitRelationSubscriptionsBecomeReady(shardSplitPubSubMetadataList); - - XLogRecPtr sourcePosition = 
GetRemoteLogPosition(sourceConnection); - WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition, - shardSplitPubSubMetadataList); - - CreateAuxiliaryStructuresForShardGroup(shardGroupSplitIntervalListList, - destinationWorkerNodesList); - - sourcePosition = GetRemoteLogPosition(sourceConnection); - WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition, - shardSplitPubSubMetadataList); - - BlockWritesToShardList(sourceColocatedShardIntervalList); - - sourcePosition = GetRemoteLogPosition(sourceConnection); - WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition, - shardSplitPubSubMetadataList); - - /*TOOD : Create foreign key constraints and handle partitioned tables*/ -} - - -void -PrintShardSplitPubSubMetadata(ShardSplitSubscriberMetadata *shardSplitMetadata) -{ - printf("\nsameer: ShardSplitPubSbuMetadata"); - ReplicationSlotInfo *replicationInfo = shardSplitMetadata->slotInfo; - printf("Manual Username from OID at source: %s \n", GetUserNameFromId( - shardSplitMetadata->tableOwnerId, false)); - printf("slotname:%s targetNode:%u tableOwner:%s \n", replicationInfo->slotName, - replicationInfo->targetNodeId, replicationInfo->tableOwnerName); - printf("\n"); -} - void CreateShardSplitSubscriptions(List *targetNodeConnectionList, List *shardSplitPubSubMetadataList, @@ -348,18 +250,14 @@ char * DropExistingIfAnyAndCreateTemplateReplicationSlot(ShardInterval *shardIntervalToSplit, MultiConnection *sourceConnection) { - StringInfo splitTemplateReplicationSlotName = makeStringInfo(); - appendStringInfo(splitTemplateReplicationSlotName, - "citus_split_replicationslot_for_shard_%lu", - shardIntervalToSplit->shardId); - /* * To ensure SPLIT is idempotent drop any existing slot from * previous failed operation. */ StringInfo dropReplicationSlotCommand = makeStringInfo(); appendStringInfo(dropReplicationSlotCommand, "SELECT pg_drop_replication_slot('%s')", - splitTemplateReplicationSlotName->data); + ShardSplitTemplateReplicationSlotName( + shardIntervalToSplit->shardId)); /* The Drop command can fail so ignore the response / result and proceed anyways */ PGresult *result = NULL; @@ -383,7 +281,8 @@ DropExistingIfAnyAndCreateTemplateReplicationSlot(ShardInterval *shardIntervalTo /*TODO(saawasek): Try creating TEMPORAL once basic flow is ready and we have a testcase*/ appendStringInfo(createReplicationSlotCommand, "CREATE_REPLICATION_SLOT %s LOGICAL citus EXPORT_SNAPSHOT;", - splitTemplateReplicationSlotName->data); + ShardSplitTemplateReplicationSlotName( + shardIntervalToSplit->shardId)); response = ExecuteOptionalRemoteCommand(sourceConnection, createReplicationSlotCommand->data, &result); @@ -396,103 +295,10 @@ DropExistingIfAnyAndCreateTemplateReplicationSlot(ShardInterval *shardIntervalTo /*'snapshot_name' is second column where index starts from zero. * We're using the pstrdup to copy the data into the current memory context */ char *snapShotName = pstrdup(PQgetvalue(result, 0, 2 /* columIndex */)); - return snapShotName; } -void -DropAllShardSplitLeftOvers(WorkerNode *sourceNode, HTAB *shardSplitHashMapForPubSub) -{ - char *superUser = CitusExtensionOwnerName(); - char *databaseName = get_database_name(MyDatabaseId); - - /* - * We open new connections to all nodes. The reason for this is that - * operations on subscriptions and publications cannot be run in a - * transaction. By forcing a new connection we make sure no transaction is - * active on the connection. 
- */ - int connectionFlags = FORCE_NEW_CONNECTION; - - HASH_SEQ_STATUS statusForSubscription; - hash_seq_init(&statusForSubscription, shardSplitHashMapForPubSub); - - NodeShardMappingEntry *entry = NULL; - while ((entry = (NodeShardMappingEntry *) hash_seq_search(&statusForSubscription)) != - NULL) - { - uint32_t nodeId = entry->key.nodeId; - WorkerNode *workerNode = FindNodeWithNodeId(nodeId, false /*missingOk*/); - MultiConnection *cleanupConnection = GetNodeUserDatabaseConnection( - connectionFlags, workerNode->workerName, workerNode->workerPort, - superUser, databaseName); - - DropAllShardSplitSubscriptions(cleanupConnection); - DropAllShardSplitUsers(cleanupConnection); - - CloseConnection(cleanupConnection); - } - - /*Drop all shard split publications at the source*/ - MultiConnection *sourceNodeConnection = GetNodeUserDatabaseConnection( - connectionFlags, sourceNode->workerName, sourceNode->workerPort, - superUser, databaseName); - - DropAllShardSplitPublications(sourceNodeConnection); - - CloseConnection(sourceNodeConnection); -} - - -void -DropAllShardSplitSubscriptions(MultiConnection *cleanupConnection) -{ - char *query = psprintf( - "SELECT subname FROM pg_subscription " - "WHERE subname LIKE %s || '%%'", - quote_literal_cstr(SHARD_SPLIT_SUBSCRIPTION_PREFIX)); - List *subscriptionNameList = GetQueryResultStringList(cleanupConnection, query); - char *subscriptionName = NULL; - foreach_ptr(subscriptionName, subscriptionNameList) - { - DropShardSubscription(cleanupConnection, subscriptionName); - } -} - - -static void -DropAllShardSplitPublications(MultiConnection *connection) -{ - char *query = psprintf( - "SELECT pubname FROM pg_publication " - "WHERE pubname LIKE %s || '%%'", - quote_literal_cstr(SHARD_SPLIT_PUBLICATION_PREFIX)); - List *publicationNameList = GetQueryResultStringList(connection, query); - char *publicationName; - foreach_ptr(publicationName, publicationNameList) - { - DropShardPublication(connection, publicationName); - } -} - - -void -DropAllShardSplitUsers(MultiConnection *connection) -{ - char *query = psprintf( - "SELECT rolname FROM pg_roles " - "WHERE rolname LIKE %s || '%%'", - quote_literal_cstr(SHARD_SPLIT_SUBSCRIPTION_ROLE_PREFIX)); - List *usernameList = GetQueryResultStringList(connection, query); - char *username; - foreach_ptr(username, usernameList) - { - DropShardUser(connection, username); - } -} - - void CreateShardSplitPublications(MultiConnection *sourceConnection, HTAB *shardInfoHashMapForPublication) @@ -560,17 +366,13 @@ CreateShardSplitSubscriberMetadata(Oid tableOwnerId, uint32 nodeId, } } - PrintShardSplitPubSubMetadata(shardSplitSubscriberMetadata); - return shardSplitSubscriberMetadata; } /*TODO(saawasek): Remove existing slots before creating newer ones */ - -/* extern void CreateReplicationSlots(MultiConnection *sourceNodeConnection, List * shardSplitSubscriberMetadataList); */ void -CreateReplicationSlots(MultiConnection *sourceNodeConnection, +CreateReplicationSlots(MultiConnection *sourceNodeConnection, char *templateSlotName, List *shardSplitSubscriberMetadataList) { ShardSplitSubscriberMetadata *subscriberMetadata = NULL; @@ -580,10 +382,9 @@ CreateReplicationSlots(MultiConnection *sourceNodeConnection, StringInfo createReplicationSlotCommand = makeStringInfo(); - /* TODO(niupre): Replace pgoutput with an appropriate name (to e introduced in by saawasek's PR) */ appendStringInfo(createReplicationSlotCommand, - "SELECT * FROM pg_create_logical_replication_slot('%s','citus', false)", - slotName); + "SELECT * FROM 
pg_copy_logical_replication_slot ('%s','%s')", + templateSlotName, slotName); PGresult *result = NULL; int response = ExecuteOptionalRemoteCommand(sourceNodeConnection, @@ -598,3 +399,243 @@ CreateReplicationSlots(MultiConnection *sourceNodeConnection, ForgetResults(sourceNodeConnection); } } + + +/* + * ShardSplitTemplateReplicationSlotName returns name of template replication slot. + */ +char * +ShardSplitTemplateReplicationSlotName(uint64 shardId) +{ + return psprintf("%s%lu", SHARD_SPLIT_TEMPLATE_REPLICATION_SLOT_PREFIX, shardId); +} + + +/* + * ParseReplicationSlotInfoFromResult parses custom datatype 'replication_slot_info'. + * 'replication_slot_info' is a tuple with below format: + * + */ +List * +ParseReplicationSlotInfoFromResult(PGresult *result) +{ + int64 rowCount = PQntuples(result); + int64 colCount = PQnfields(result); + + List *replicationSlotInfoList = NIL; + for (int64 rowIndex = 0; rowIndex < rowCount; rowIndex++) + { + ReplicationSlotInfo *replicationSlotInfo = (ReplicationSlotInfo *) palloc0( + sizeof(ReplicationSlotInfo)); + + char *targeNodeIdString = PQgetvalue(result, rowIndex, 0 /* nodeId column*/); + + replicationSlotInfo->targetNodeId = strtoul(targeNodeIdString, NULL, 10); + + /* We're using the pstrdup to copy the data into the current memory context */ + replicationSlotInfo->tableOwnerName = pstrdup(PQgetvalue(result, rowIndex, + 1 /* table owner name column */)); + + /* Replication slot name */ + replicationSlotInfo->slotName = pstrdup(PQgetvalue(result, rowIndex, + 2 /* slot name column */)); + + replicationSlotInfoList = lappend(replicationSlotInfoList, replicationSlotInfo); + } + + return replicationSlotInfoList; +} + + +/* + * DropAllShardSplitLeftOvers drops shard split subscriptions, publications, roles + * and replication slots on all nodes. These might have been left there after + * the coordinator crashed during a shard split. It's important to delete them + * for two reasons: + * 1. Starting new shard split might fail when they exist, because it cannot + * create them. + * 2. Leftover replication slots that are not consumed from anymore make it + * impossible for WAL to be dropped. This can cause out-of-disk issues. + */ +void +DropAllShardSplitLeftOvers(WorkerNode *sourceNode, HTAB *shardSplitHashMapForPubSub) +{ + char *superUser = CitusExtensionOwnerName(); + char *databaseName = get_database_name(MyDatabaseId); + + /* + * We open new connections to all nodes. The reason for this is that + * operations on subscriptions and publications cannot be run in a + * transaction. By forcing a new connection we make sure no transaction is + * active on the connection. 
+ */ + int connectionFlags = FORCE_NEW_CONNECTION; + + HASH_SEQ_STATUS statusForSubscription; + hash_seq_init(&statusForSubscription, shardSplitHashMapForPubSub); + + NodeShardMappingEntry *entry = NULL; + while ((entry = (NodeShardMappingEntry *) hash_seq_search(&statusForSubscription)) != + NULL) + { + uint32_t nodeId = entry->key.nodeId; + WorkerNode *workerNode = FindNodeWithNodeId(nodeId, false /*missingOk*/); + + MultiConnection *cleanupConnection = GetNodeUserDatabaseConnection( + connectionFlags, workerNode->workerName, workerNode->workerPort, + superUser, databaseName); + + /* We need to claim the connection exclusively while dropping the subscription */ + ClaimConnectionExclusively(cleanupConnection); + + DropAllShardSplitSubscriptions(cleanupConnection); + DropAllShardSplitUsers(cleanupConnection); + + /* Close connection after cleanup */ + CloseConnection(cleanupConnection); + } + + /*Drop all shard split publications at the source*/ + MultiConnection *sourceNodeConnection = GetNodeUserDatabaseConnection( + connectionFlags, sourceNode->workerName, sourceNode->workerPort, + superUser, databaseName); + + ClaimConnectionExclusively(sourceNodeConnection); + + /* + * If replication slot could not be dropped while dropping the + * subscriber, drop it here. + */ + DropAllShardSplitReplicationSlots(sourceNodeConnection); + DropAllShardSplitPublications(sourceNodeConnection); + + CloseConnection(sourceNodeConnection); +} + + +void +DropAllShardSplitSubscriptions(MultiConnection *cleanupConnection) +{ + char *query = psprintf( + "SELECT subname FROM pg_subscription " + "WHERE subname LIKE %s || '%%'", + quote_literal_cstr(SHARD_SPLIT_SUBSCRIPTION_PREFIX)); + List *subscriptionNameList = GetQueryResultStringList(cleanupConnection, query); + char *subscriptionName = NULL; + foreach_ptr(subscriptionName, subscriptionNameList) + { + DropShardSubscription(cleanupConnection, subscriptionName); + } +} + + +static void +DropAllShardSplitPublications(MultiConnection *connection) +{ + char *query = psprintf( + "SELECT pubname FROM pg_publication " + "WHERE pubname LIKE %s || '%%'", + quote_literal_cstr(SHARD_SPLIT_PUBLICATION_PREFIX)); + List *publicationNameList = GetQueryResultStringList(connection, query); + char *publicationName; + foreach_ptr(publicationName, publicationNameList) + { + DropShardPublication(connection, publicationName); + } +} + + +static void +DropAllShardSplitUsers(MultiConnection *connection) +{ + char *query = psprintf( + "SELECT rolname FROM pg_roles " + "WHERE rolname LIKE %s || '%%'", + quote_literal_cstr(SHARD_SPLIT_SUBSCRIPTION_ROLE_PREFIX)); + List *usernameList = GetQueryResultStringList(connection, query); + char *username; + foreach_ptr(username, usernameList) + { + DropShardUser(connection, username); + } +} + + +static void +DropAllShardSplitReplicationSlots(MultiConnection *cleanupConnection) +{ + char *query = psprintf( + "SELECT slot_name FROM pg_replication_slots " + "WHERE slot_name LIKE %s || '%%'", + quote_literal_cstr(SHARD_SPLIT_REPLICATION_SLOT_PREFIX)); + List *slotNameList = GetQueryResultStringList(cleanupConnection, query); + char *slotName; + foreach_ptr(slotName, slotNameList) + { + DropShardMoveReplicationSlot(cleanupConnection, slotName); + } +} + + +/* + * DropShardSplitPublications drops the publication used for shard splits over the given + * connection, if it exists. 
+ */ +void +DropShardSplitPublications(MultiConnection *sourceConnection, + HTAB *shardInfoHashMapForPublication) +{ + HASH_SEQ_STATUS status; + hash_seq_init(&status, shardInfoHashMapForPublication); + + NodeShardMappingEntry *entry = NULL; + while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL) + { + uint32 nodeId = entry->key.nodeId; + uint32 tableOwnerId = entry->key.tableOwnerId; + DropShardPublication(sourceConnection, ShardSplitPublicationName(nodeId, + tableOwnerId)); + } +} + + +/* + * DropShardSplitSubsriptions drops subscriptions from the subscriber node that + * are used to split shards for the given table owners. Note that, it drops the + * replication slots on the publisher node if it can drop the slots as well + * with the DROP SUBSCRIPTION command. Otherwise, only the subscriptions will + * be deleted with DROP SUBSCRIPTION via the connection. In the latter case, + * replication slots will be dropped separately by calling DropShardSplitReplicationSlots. + */ +void +DropShardSplitSubsriptions(List *shardSplitSubscribersMetadataList) +{ + ShardSplitSubscriberMetadata *subscriberMetadata = NULL; + foreach_ptr(subscriberMetadata, shardSplitSubscribersMetadataList) + { + uint32 tableOwnerId = subscriberMetadata->tableOwnerId; + MultiConnection *targetNodeConnection = subscriberMetadata->targetNodeConnection; + + DropShardSubscription(targetNodeConnection, ShardSubscriptionName(tableOwnerId, + SHARD_SPLIT_SUBSCRIPTION_PREFIX)); + + DropShardUser(targetNodeConnection, ShardSubscriptionRole(tableOwnerId, + SHARD_SPLIT_SUBSCRIPTION_ROLE_PREFIX)); + } +} + + +/* + * CloseShardSplitSubscriberConnections closes connection of subscriber nodes. + * 'ShardSplitSubscriberMetadata' holds connection for a subscriber node. The method + * traverses the list and closes each connection. 
+ */ +void +CloseShardSplitSubscriberConnections(List *shardSplitSubscriberMetadataList) +{ + ShardSplitSubscriberMetadata *subscriberMetadata = NULL; + foreach_ptr(subscriberMetadata, shardSplitSubscriberMetadataList) + { + CloseConnection(subscriberMetadata->targetNodeConnection); + } +} diff --git a/src/backend/distributed/shardsplit/shardsplit_shared_memory.c b/src/backend/distributed/shardsplit/shardsplit_shared_memory.c index c6eb9bc0d..ff3f8f226 100644 --- a/src/backend/distributed/shardsplit/shardsplit_shared_memory.c +++ b/src/backend/distributed/shardsplit/shardsplit_shared_memory.c @@ -15,6 +15,7 @@ #include "distributed/shardinterval_utils.h" #include "distributed/shardsplit_shared_memory.h" #include "distributed/citus_safe_lib.h" +#include "distributed/multi_logical_replication.h" #include "storage/ipc.h" #include "utils/memutils.h" #include "common/hashfn.h" @@ -199,7 +200,8 @@ encode_replication_slot(uint32_t nodeId, uint32_t tableOwnerId) { StringInfo slotName = makeStringInfo(); - appendStringInfo(slotName, "citus_split_%u_%u", nodeId, tableOwnerId); + appendStringInfo(slotName, "%s%u_%u", SHARD_SPLIT_REPLICATION_SLOT_PREFIX, nodeId, + tableOwnerId); if (slotName->len > NAMEDATALEN) { diff --git a/src/include/distributed/multi_logical_replication.h b/src/include/distributed/multi_logical_replication.h index b079baff2..f91b1f850 100644 --- a/src/include/distributed/multi_logical_replication.h +++ b/src/include/distributed/multi_logical_replication.h @@ -34,7 +34,12 @@ extern void DropShardSubscription(MultiConnection *connection, extern void DropShardPublication(MultiConnection *connection, char *publicationName); extern void DropShardUser(MultiConnection *connection, char *username); +extern void DropShardMoveReplicationSlot(MultiConnection *connection, + char *publicationName); + +extern char * ShardSubscriptionRole(Oid ownerId, char *operationPrefix); +extern char * ShardSubscriptionName(Oid ownerId, char *operationPrefix); extern void CreateShardSubscription(MultiConnection *connection, char *sourceNodeName, int sourceNodePort, char *userName, char *databaseName, @@ -50,9 +55,11 @@ extern void WaitForShardSubscriptionToCatchUp(MultiConnection *targetConnection, char *operationPrefix); #define SHARD_MOVE_PUBLICATION_PREFIX "citus_shard_move_publication_" -#define SHARD_MOVE_SUBSCRIPTION_PREFIX "citus_shard_move_subscription_" #define SHARD_MOVE_SUBSCRIPTION_ROLE_PREFIX "citus_shard_move_subscription_role_" +#define SHARD_MOVE_SUBSCRIPTION_PREFIX "citus_shard_move_subscription_" #define SHARD_SPLIT_PUBLICATION_PREFIX "citus_shard_split_publication_" #define SHARD_SPLIT_SUBSCRIPTION_PREFIX "citus_shard_split_subscription_" #define SHARD_SPLIT_SUBSCRIPTION_ROLE_PREFIX "citus_shard_split_subscription_role_" +#define SHARD_SPLIT_TEMPLATE_REPLICATION_SLOT_PREFIX "citus_shard_split_template_slot_" +#define SHARD_SPLIT_REPLICATION_SLOT_PREFIX "citus_shard_split_" #endif /* MULTI_LOGICAL_REPLICATION_H_ */ diff --git a/src/include/distributed/shard_split.h b/src/include/distributed/shard_split.h index 59fc87fa5..7dff7bb71 100644 --- a/src/include/distributed/shard_split.h +++ b/src/include/distributed/shard_split.h @@ -59,8 +59,6 @@ extern void SplitShard(SplitMode splitMode, /* TODO(niupre): Make all these APIs private when all consumers (Example : ISOLATE_TENANT_TO_NEW_SHARD) directly call 'SplitShard' API. 
*/ extern void ErrorIfCannotSplitShard(SplitOperation splitOperation, ShardInterval *sourceShard); -extern void CreateAuxiliaryStructuresForShardGroup(List *shardGroupSplitIntervalListList, - List *workersForPlacementList); extern void DropShardList(List *shardIntervalList); #endif /* SHARDSPLIT_H_ */ diff --git a/src/include/distributed/shardsplit_logical_replication.h b/src/include/distributed/shardsplit_logical_replication.h index d5aa6fef8..11b7efab2 100644 --- a/src/include/distributed/shardsplit_logical_replication.h +++ b/src/include/distributed/shardsplit_logical_replication.h @@ -12,6 +12,12 @@ #include "distributed/multi_logical_replication.h" +/* + * Invocation of 'worker_split_shard_replication_setup' UDF returns set of records + * of custom datatype 'replication_slot_info'. This information is parsed and stored in + * the below data structure. The information is used to create a subscriber on target node + * with corresponding slot name. + */ typedef struct ReplicationSlotInfo { uint32 targetNodeId; @@ -19,13 +25,18 @@ typedef struct ReplicationSlotInfo char *slotName; } ReplicationSlotInfo; +/* + * Stores information necesary for creating a subscriber on target node. + * Based on how a shard is split and mapped to target nodes, for each unique combination of + * there is a 'ShardSplitSubscriberMetadata'. + */ typedef struct ShardSplitSubscriberMetadata { Oid tableOwnerId; ReplicationSlotInfo *slotInfo; /* - * Exclusively claimed connection for subscription.The target node of subscription + * Exclusively claimed connection for a subscription.The target node of subscription * is pointed by ReplicationSlotInfo. */ MultiConnection *targetNodeConnection; @@ -47,52 +58,52 @@ typedef struct NodeShardMappingEntry extern uint32 NodeShardMappingHash(const void *key, Size keysize); extern int NodeShardMappingHashCompare(const void *left, const void *right, Size keysize); -HTAB * SetupHashMapForShardInfo(void); +extern HTAB * SetupHashMapForShardInfo(void); +/* Functions for subscriber metadata management */ extern List * ParseReplicationSlotInfoFromResult(PGresult *result); - - +extern List * PopulateShardSplitSubscriptionsMetadataList(HTAB *shardSplitInfoHashMap, + List *replicationSlotInfoList); extern HTAB * CreateShardSplitInfoMapForPublication( List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, List *destinationWorkerNodesList); -extern void LogicallyReplicateSplitShards(WorkerNode *sourceWorkerNode, - List *shardSplitPubSubMetadataList, - List *sourceColocatedShardIntervalList, - List *shardGroupSplitIntervalListList, - List *destinationWorkerNodesList); +/* Functions for creating publications and subscriptions*/ extern void CreateShardSplitPublications(MultiConnection *sourceConnection, HTAB *shardInfoHashMapForPublication); +extern void CreateShardSplitSubscriptions(List *targetNodeConnectionList, + List *shardSplitPubSubMetadataList, + WorkerNode *sourceWorkerNode, char *superUser, + char *databaseName); +extern void CreateReplicationSlots(MultiConnection *sourceNodeConnection, + char *templateSlotName, + List *shardSplitSubscriberMetadataList); +extern List * CreateTargetNodeConnectionsForShardSplit( + List *shardSplitSubscribersMetadataList, + int + connectionFlags, char *user, + char *databaseName); + +/* Functions to drop publisher-subscriber resources */ extern void DropAllShardSplitLeftOvers(WorkerNode *sourceNode, HTAB *shardSplitMapOfPublications); - -extern List * PopulateShardSplitSubscriptionsMetadataList(HTAB *shardSplitInfoHashMap, - 
List *replicationSlotInfoList); - +extern void DropShardSplitPublications(MultiConnection *sourceConnection, + HTAB *shardInfoHashMapForPublication); +extern void DropShardSplitSubsriptions(List *shardSplitSubscribersMetadataList); extern char * DropExistingIfAnyAndCreateTemplateReplicationSlot( ShardInterval *shardIntervalToSplit, MultiConnection * sourceConnection); -extern void CreateShardSplitSubscriptions(List *targetNodeConnectionList, - List *shardSplitPubSubMetadataList, - WorkerNode *sourceWorkerNode, char *superUser, - char *databaseName); +/* Wrapper functions which wait for a subscriber to be ready and catchup */ extern void WaitForShardSplitRelationSubscriptionsBecomeReady( List *shardSplitPubSubMetadataList); extern void WaitForShardSplitRelationSubscriptionsToBeCaughtUp(XLogRecPtr sourcePosition, List * shardSplitPubSubMetadataList); -List * CreateTargetNodeConnectionsForShardSplit(List *shardSplitSubscribersMetadataList, - int - connectionFlags, char *user, - char *databaseName); -extern void CreateReplicationSlots(MultiConnection *sourceNodeConnection, - List *shardSplitSubscriberMetadataList); +extern char * ShardSplitTemplateReplicationSlotName(uint64 shardId); -/*used for debuggin. Remove later*/ -extern void PrintShardSplitPubSubMetadata( - ShardSplitSubscriberMetadata *shardSplitMetadata); +extern void CloseShardSplitSubscriberConnections(List *shardSplitSubscriberMetadataList); #endif /* SHARDSPLIT_LOGICAL_REPLICATION_H */ diff --git a/src/test/regress/expected/citus_non_blocking_split_shards.out b/src/test/regress/expected/citus_non_blocking_split_shards.out new file mode 100644 index 000000000..a112ed537 --- /dev/null +++ b/src/test/regress/expected/citus_non_blocking_split_shards.out @@ -0,0 +1,465 @@ +/* +Citus Shard Split Test.The test is model similar to 'shard_move_constraints'. +Here is a high level overview of test plan: + 1. Create a table 'sensors' (ShardCount = 2) to be split. Add indexes and statistics on this table. + 2. Create two other tables: 'reference_table' and 'colocated_dist_table', co-located with sensors. + 3. Create Foreign key constraints between the two co-located distributed tables. + 4. Load data into the three tables. + 5. Move one of the shards for 'sensors' to test ShardMove -> Split. + 6. Trigger Split on both shards of 'sensors'. This will also split co-located tables. + 7. Move one of the split shard to test Split -> ShardMove. + 8. Split an already split shard second time on a different schema. +*/ +CREATE SCHEMA "citus_split_test_schema"; +CREATE ROLE test_split_role WITH LOGIN; +GRANT USAGE, CREATE ON SCHEMA "citus_split_test_schema" TO test_split_role; +SET ROLE test_split_role; +SET search_path TO "citus_split_test_schema"; +SET citus.next_shard_id TO 8981000; +SET citus.next_placement_id TO 8610000; +SET citus.shard_count TO 2; +SET citus.shard_replication_factor TO 1; +-- BEGIN: Create table to split, along with other co-located tables. Add indexes, statistics etc. 
+CREATE TABLE sensors( + measureid integer, + eventdatetime date, + measure_data jsonb, + meaure_quantity decimal(15, 2), + measure_status char(1), + measure_comment varchar(44), + PRIMARY KEY (measureid, eventdatetime, measure_data)); +CREATE INDEX index_on_sensors ON sensors(lower(measureid::text)); +ALTER INDEX index_on_sensors ALTER COLUMN 1 SET STATISTICS 1000; +CREATE INDEX hash_index_on_sensors ON sensors USING HASH((measure_data->'IsFailed')); +CREATE INDEX index_with_include_on_sensors ON sensors ((measure_data->'IsFailed')) INCLUDE (measure_data, eventdatetime, measure_status); +CREATE STATISTICS stats_on_sensors (dependencies) ON measureid, eventdatetime FROM sensors; +SELECT create_distributed_table('sensors', 'measureid', colocate_with:='none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- END: Create table to split, along with other co-located tables. Add indexes, statistics etc. +-- BEGIN: Create co-located distributed and reference tables. +CREATE TABLE reference_table (measureid integer PRIMARY KEY); +SELECT create_reference_table('reference_table'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY); +CLUSTER colocated_dist_table USING colocated_dist_table_pkey; +SELECT create_distributed_table('colocated_dist_table', 'measureid', colocate_with:='sensors'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE table_with_index_rep_identity(key int NOT NULL); +CREATE UNIQUE INDEX uqx ON table_with_index_rep_identity(key); +ALTER TABLE table_with_index_rep_identity REPLICA IDENTITY USING INDEX uqx; +CLUSTER table_with_index_rep_identity USING uqx; +SELECT create_distributed_table('table_with_index_rep_identity', 'key', colocate_with:='sensors'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- END: Create co-located distributed and reference tables. +-- BEGIN : Create Foreign key constraints. +ALTER TABLE sensors ADD CONSTRAINT fkey_table_to_dist FOREIGN KEY (measureid) REFERENCES colocated_dist_table(measureid); +-- END : Create Foreign key constraints. +-- BEGIN : Load data into tables. +INSERT INTO reference_table SELECT i FROM generate_series(0,1000)i; +INSERT INTO colocated_dist_table SELECT i FROM generate_series(0,1000)i; +INSERT INTO sensors SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' FROM generate_series(0,1000)i; +SELECT COUNT(*) FROM sensors; + count +--------------------------------------------------------------------- + 1001 +(1 row) + +SELECT COUNT(*) FROM reference_table; + count +--------------------------------------------------------------------- + 1001 +(1 row) + +SELECT COUNT(*) FROM colocated_dist_table; + count +--------------------------------------------------------------------- + 1001 +(1 row) + +-- END: Load data into tables. +-- BEGIN : Display current state. 
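The placement listing below covers only shard metadata. For the logical-replication plumbing this patch adds, a leftover check along the following lines can be run on either node; this is illustrative only (not part of the recorded expected output), uses the name prefixes defined in multi_logical_replication.h, and is essentially what DropAllShardSplitLeftOvers automates after a crashed split:

-- illustrative leftover check; a clean state returns zero rows from each query
SELECT pubname   FROM pg_publication        WHERE pubname   LIKE 'citus_shard_split_publication_%';
SELECT subname   FROM pg_subscription       WHERE subname   LIKE 'citus_shard_split_subscription_%';
SELECT rolname   FROM pg_roles              WHERE rolname   LIKE 'citus_shard_split_subscription_role_%';
SELECT slot_name FROM pg_replication_slots  WHERE slot_name LIKE 'citus_shard_split_%';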
+SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport + FROM pg_dist_shard AS shard + INNER JOIN pg_dist_placement placement ON shard.shardid = placement.shardid + INNER JOIN pg_dist_node node ON placement.groupid = node.groupid + INNER JOIN pg_catalog.pg_class cls ON shard.logicalrelid = cls.oid + WHERE node.noderole = 'primary' AND (logicalrelid = 'sensors'::regclass OR logicalrelid = 'colocated_dist_table'::regclass OR logicalrelid = 'table_with_index_rep_identity'::regclass) + ORDER BY logicalrelid, shardminvalue::BIGINT; + shardid | logicalrelid | shardminvalue | shardmaxvalue | nodename | nodeport +--------------------------------------------------------------------- + 8981000 | sensors | -2147483648 | -1 | localhost | 57637 + 8981001 | sensors | 0 | 2147483647 | localhost | 57638 + 8981003 | colocated_dist_table | -2147483648 | -1 | localhost | 57637 + 8981004 | colocated_dist_table | 0 | 2147483647 | localhost | 57638 + 8981005 | table_with_index_rep_identity | -2147483648 | -1 | localhost | 57637 + 8981006 | table_with_index_rep_identity | 0 | 2147483647 | localhost | 57638 +(6 rows) + +\c - - - :worker_1_port + SET search_path TO "citus_split_test_schema", public, pg_catalog; + SET citus.show_shards_for_app_name_prefixes = '*'; + SELECT tbl.relname, fk."Constraint", fk."Definition" + FROM pg_catalog.pg_class tbl + JOIN public.table_fkeys fk on tbl.oid = fk.relid + WHERE tbl.relname like 'sensors_%' + ORDER BY 1, 2; + relname | Constraint | Definition +--------------------------------------------------------------------- + sensors_8981000 | fkey_table_to_dist_8981000 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8981003(measureid) +(1 row) + + SELECT tablename, indexdef FROM pg_indexes WHERE tablename like 'sensors_%' ORDER BY 1,2; + tablename | indexdef +--------------------------------------------------------------------- + sensors_8981000 | CREATE INDEX hash_index_on_sensors_8981000 ON citus_split_test_schema.sensors_8981000 USING hash (((measure_data -> 'IsFailed'::text))) + sensors_8981000 | CREATE INDEX index_on_sensors_8981000 ON citus_split_test_schema.sensors_8981000 USING btree (lower((measureid)::text)) + sensors_8981000 | CREATE INDEX index_with_include_on_sensors_8981000 ON citus_split_test_schema.sensors_8981000 USING btree (((measure_data -> 'IsFailed'::text))) INCLUDE (measure_data, eventdatetime, measure_status) + sensors_8981000 | CREATE UNIQUE INDEX sensors_pkey_8981000 ON citus_split_test_schema.sensors_8981000 USING btree (measureid, eventdatetime, measure_data) +(4 rows) + + SELECT tablename, indexdef FROM pg_indexes WHERE tablename like 'table_with_index_rep_identity_%' ORDER BY 1,2; + tablename | indexdef +--------------------------------------------------------------------- + table_with_index_rep_identity_8981005 | CREATE UNIQUE INDEX uqx_8981005 ON citus_split_test_schema.table_with_index_rep_identity_8981005 USING btree (key) +(1 row) + + SELECT stxname FROM pg_statistic_ext + WHERE stxnamespace IN ( + SELECT oid + FROM pg_namespace + WHERE nspname IN ('citus_split_test_schema') + ) + ORDER BY stxname ASC; + stxname +--------------------------------------------------------------------- + stats_on_sensors + stats_on_sensors_8981000 +(2 rows) + +\c - - - :worker_2_port + SET search_path TO "citus_split_test_schema", public, pg_catalog; + SET citus.show_shards_for_app_name_prefixes = '*'; + SELECT tbl.relname, fk."Constraint", fk."Definition" + FROM pg_catalog.pg_class tbl + JOIN public.table_fkeys fk on tbl.oid = 
fk.relid + WHERE tbl.relname like 'sensors_%' + ORDER BY 1, 2; + relname | Constraint | Definition +--------------------------------------------------------------------- + sensors_8981001 | fkey_table_to_dist_8981001 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8981004(measureid) +(1 row) + + SELECT tablename, indexdef FROM pg_indexes WHERE tablename like 'sensors_%' ORDER BY 1,2; + tablename | indexdef +--------------------------------------------------------------------- + sensors_8981001 | CREATE INDEX hash_index_on_sensors_8981001 ON citus_split_test_schema.sensors_8981001 USING hash (((measure_data -> 'IsFailed'::text))) + sensors_8981001 | CREATE INDEX index_on_sensors_8981001 ON citus_split_test_schema.sensors_8981001 USING btree (lower((measureid)::text)) + sensors_8981001 | CREATE INDEX index_with_include_on_sensors_8981001 ON citus_split_test_schema.sensors_8981001 USING btree (((measure_data -> 'IsFailed'::text))) INCLUDE (measure_data, eventdatetime, measure_status) + sensors_8981001 | CREATE UNIQUE INDEX sensors_pkey_8981001 ON citus_split_test_schema.sensors_8981001 USING btree (measureid, eventdatetime, measure_data) +(4 rows) + + SELECT tablename, indexdef FROM pg_indexes WHERE tablename like 'table_with_index_rep_identity_%' ORDER BY 1,2; + tablename | indexdef +--------------------------------------------------------------------- + table_with_index_rep_identity_8981006 | CREATE UNIQUE INDEX uqx_8981006 ON citus_split_test_schema.table_with_index_rep_identity_8981006 USING btree (key) +(1 row) + + SELECT stxname FROM pg_statistic_ext + WHERE stxnamespace IN ( + SELECT oid + FROM pg_namespace + WHERE nspname IN ('citus_split_test_schema') + ) + ORDER BY stxname ASC; + stxname +--------------------------------------------------------------------- + stats_on_sensors + stats_on_sensors_8981001 +(2 rows) + +-- END : Display current state +-- BEGIN : Move one shard before we split it. +\c - postgres - :master_port +SET ROLE test_split_role; +SET search_path TO "citus_split_test_schema"; +SET citus.next_shard_id TO 8981007; +SET citus.defer_drop_after_shard_move TO OFF; +SELECT citus_move_shard_placement(8981000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode:='force_logical'); + citus_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + +-- END : Move one shard before we split it. +-- BEGIN : Set node id variables +SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset +SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset +-- END : Set node id variables +-- BEGIN : Split two shards : One with move and One without move. 
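A note on the WARNINGs in the output below: the nonblocking split path is idempotent about its template replication slot, so before creating it the source node first tries to drop any slot left behind by a previous failed attempt, and on a clean run that drop reports that the slot does not exist. Roughly, the slot handling issued against the source node looks like the following sketch; the template slot name matches the WARNING, the copied slot name follows the citus_shard_split_<nodeId>_<tableOwnerId> scheme from encode_replication_slot, and the concrete ids used here are illustrative:

-- idempotent cleanup; emits the "does not exist" WARNING on a clean run
SELECT pg_drop_replication_slot('citus_shard_split_template_slot_8981000');
-- issued over a replication connection; exports the snapshot that DoSplitCopy
-- installs with SET TRANSACTION SNAPSHOT before running the split copy
CREATE_REPLICATION_SLOT citus_shard_split_template_slot_8981000 LOGICAL citus EXPORT_SNAPSHOT;
-- one copy of the template slot per subscriber (nodeId 16, tableOwnerId 10 are illustrative)
SELECT * FROM pg_copy_logical_replication_slot(
    'citus_shard_split_template_slot_8981000', 'citus_shard_split_16_10');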
+-- Perform 2 way split +SELECT pg_catalog.citus_split_shard_by_split_points( + 8981000, + ARRAY['-1073741824'], + ARRAY[:worker_1_node, :worker_2_node], + 'force_logical'); +WARNING: replication slot "citus_shard_split_template_slot_8981000" does not exist +CONTEXT: while executing command on localhost:xxxxx + citus_split_shard_by_split_points +--------------------------------------------------------------------- + +(1 row) + +-- Perform 3 way split +SELECT pg_catalog.citus_split_shard_by_split_points( + 8981001, + ARRAY['536870911', '1610612735'], + ARRAY[:worker_1_node, :worker_1_node, :worker_2_node], + 'force_logical'); +WARNING: replication slot "citus_shard_split_template_slot_8981001" does not exist +CONTEXT: while executing command on localhost:xxxxx + citus_split_shard_by_split_points +--------------------------------------------------------------------- + +(1 row) + +-- END : Split two shards : One with move and One without move. +-- BEGIN : Move a shard post split. +SELECT citus_move_shard_placement(8981007, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode:='block_writes'); + citus_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + +-- END : Move a shard post split. +-- BEGIN : Display current state. +SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport + FROM pg_dist_shard AS shard + INNER JOIN pg_dist_placement placement ON shard.shardid = placement.shardid + INNER JOIN pg_dist_node node ON placement.groupid = node.groupid + INNER JOIN pg_catalog.pg_class cls ON shard.logicalrelid = cls.oid + WHERE node.noderole = 'primary' AND (logicalrelid = 'sensors'::regclass OR logicalrelid = 'colocated_dist_table'::regclass OR logicalrelid = 'table_with_index_rep_identity'::regclass) + ORDER BY logicalrelid, shardminvalue::BIGINT; + shardid | logicalrelid | shardminvalue | shardmaxvalue | nodename | nodeport +--------------------------------------------------------------------- + 8981007 | sensors | -2147483648 | -1073741824 | localhost | 57638 + 8981008 | sensors | -1073741823 | -1 | localhost | 57638 + 8981013 | sensors | 0 | 536870911 | localhost | 57637 + 8981014 | sensors | 536870912 | 1610612735 | localhost | 57637 + 8981015 | sensors | 1610612736 | 2147483647 | localhost | 57638 + 8981009 | colocated_dist_table | -2147483648 | -1073741824 | localhost | 57638 + 8981010 | colocated_dist_table | -1073741823 | -1 | localhost | 57638 + 8981016 | colocated_dist_table | 0 | 536870911 | localhost | 57637 + 8981017 | colocated_dist_table | 536870912 | 1610612735 | localhost | 57637 + 8981018 | colocated_dist_table | 1610612736 | 2147483647 | localhost | 57638 + 8981011 | table_with_index_rep_identity | -2147483648 | -1073741824 | localhost | 57638 + 8981012 | table_with_index_rep_identity | -1073741823 | -1 | localhost | 57638 + 8981019 | table_with_index_rep_identity | 0 | 536870911 | localhost | 57637 + 8981020 | table_with_index_rep_identity | 536870912 | 1610612735 | localhost | 57637 + 8981021 | table_with_index_rep_identity | 1610612736 | 2147483647 | localhost | 57638 +(15 rows) + +\c - - - :worker_1_port + SET search_path TO "citus_split_test_schema", public, pg_catalog; + SET citus.show_shards_for_app_name_prefixes = '*'; + SELECT tbl.relname, fk."Constraint", fk."Definition" + FROM pg_catalog.pg_class tbl + JOIN public.table_fkeys fk on tbl.oid = fk.relid + WHERE tbl.relname like 'sensors_%' + ORDER BY 1, 2; + relname | Constraint | Definition 
+--------------------------------------------------------------------- + sensors_8981013 | fkey_table_to_dist_8981013 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8981016(measureid) + sensors_8981014 | fkey_table_to_dist_8981014 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8981017(measureid) +(2 rows) + + SELECT tablename, indexdef FROM pg_indexes WHERE tablename like 'sensors_%' ORDER BY 1,2; + tablename | indexdef +--------------------------------------------------------------------- + sensors_8981013 | CREATE INDEX hash_index_on_sensors_8981013 ON citus_split_test_schema.sensors_8981013 USING hash (((measure_data -> 'IsFailed'::text))) + sensors_8981013 | CREATE INDEX index_on_sensors_8981013 ON citus_split_test_schema.sensors_8981013 USING btree (lower((measureid)::text)) + sensors_8981013 | CREATE INDEX index_with_include_on_sensors_8981013 ON citus_split_test_schema.sensors_8981013 USING btree (((measure_data -> 'IsFailed'::text))) INCLUDE (measure_data, eventdatetime, measure_status) + sensors_8981013 | CREATE UNIQUE INDEX sensors_pkey_8981013 ON citus_split_test_schema.sensors_8981013 USING btree (measureid, eventdatetime, measure_data) + sensors_8981014 | CREATE INDEX hash_index_on_sensors_8981014 ON citus_split_test_schema.sensors_8981014 USING hash (((measure_data -> 'IsFailed'::text))) + sensors_8981014 | CREATE INDEX index_on_sensors_8981014 ON citus_split_test_schema.sensors_8981014 USING btree (lower((measureid)::text)) + sensors_8981014 | CREATE INDEX index_with_include_on_sensors_8981014 ON citus_split_test_schema.sensors_8981014 USING btree (((measure_data -> 'IsFailed'::text))) INCLUDE (measure_data, eventdatetime, measure_status) + sensors_8981014 | CREATE UNIQUE INDEX sensors_pkey_8981014 ON citus_split_test_schema.sensors_8981014 USING btree (measureid, eventdatetime, measure_data) +(8 rows) + + SELECT tablename, indexdef FROM pg_indexes WHERE tablename like 'table_with_index_rep_identity_%' ORDER BY 1,2; + tablename | indexdef +--------------------------------------------------------------------- + table_with_index_rep_identity_8981019 | CREATE UNIQUE INDEX uqx_8981019 ON citus_split_test_schema.table_with_index_rep_identity_8981019 USING btree (key) + table_with_index_rep_identity_8981020 | CREATE UNIQUE INDEX uqx_8981020 ON citus_split_test_schema.table_with_index_rep_identity_8981020 USING btree (key) +(2 rows) + + SELECT stxname FROM pg_statistic_ext + WHERE stxnamespace IN ( + SELECT oid + FROM pg_namespace + WHERE nspname IN ('citus_split_test_schema') + ) + ORDER BY stxname ASC; + stxname +--------------------------------------------------------------------- + stats_on_sensors + stats_on_sensors_8981013 + stats_on_sensors_8981014 +(3 rows) + +\c - - - :worker_2_port + SET search_path TO "citus_split_test_schema", public, pg_catalog; + SET citus.show_shards_for_app_name_prefixes = '*'; + SELECT tbl.relname, fk."Constraint", fk."Definition" + FROM pg_catalog.pg_class tbl + JOIN public.table_fkeys fk on tbl.oid = fk.relid + WHERE tbl.relname like 'sensors_%' + ORDER BY 1, 2; + relname | Constraint | Definition +--------------------------------------------------------------------- + sensors_8981007 | fkey_table_to_dist_8981007 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8981009(measureid) + sensors_8981008 | fkey_table_to_dist_8981008 | FOREIGN KEY (measureid) REFERENCES colocated_dist_table_8981010(measureid) + sensors_8981015 | fkey_table_to_dist_8981015 | FOREIGN KEY (measureid) REFERENCES 
colocated_dist_table_8981018(measureid) +(3 rows) + + SELECT tablename, indexdef FROM pg_indexes WHERE tablename like 'sensors_%' ORDER BY 1,2; + tablename | indexdef +--------------------------------------------------------------------- + sensors_8981007 | CREATE INDEX hash_index_on_sensors_8981007 ON citus_split_test_schema.sensors_8981007 USING hash (((measure_data -> 'IsFailed'::text))) + sensors_8981007 | CREATE INDEX index_on_sensors_8981007 ON citus_split_test_schema.sensors_8981007 USING btree (lower((measureid)::text)) + sensors_8981007 | CREATE INDEX index_with_include_on_sensors_8981007 ON citus_split_test_schema.sensors_8981007 USING btree (((measure_data -> 'IsFailed'::text))) INCLUDE (measure_data, eventdatetime, measure_status) + sensors_8981007 | CREATE UNIQUE INDEX sensors_pkey_8981007 ON citus_split_test_schema.sensors_8981007 USING btree (measureid, eventdatetime, measure_data) + sensors_8981008 | CREATE INDEX hash_index_on_sensors_8981008 ON citus_split_test_schema.sensors_8981008 USING hash (((measure_data -> 'IsFailed'::text))) + sensors_8981008 | CREATE INDEX index_on_sensors_8981008 ON citus_split_test_schema.sensors_8981008 USING btree (lower((measureid)::text)) + sensors_8981008 | CREATE INDEX index_with_include_on_sensors_8981008 ON citus_split_test_schema.sensors_8981008 USING btree (((measure_data -> 'IsFailed'::text))) INCLUDE (measure_data, eventdatetime, measure_status) + sensors_8981008 | CREATE UNIQUE INDEX sensors_pkey_8981008 ON citus_split_test_schema.sensors_8981008 USING btree (measureid, eventdatetime, measure_data) + sensors_8981015 | CREATE INDEX hash_index_on_sensors_8981015 ON citus_split_test_schema.sensors_8981015 USING hash (((measure_data -> 'IsFailed'::text))) + sensors_8981015 | CREATE INDEX index_on_sensors_8981015 ON citus_split_test_schema.sensors_8981015 USING btree (lower((measureid)::text)) + sensors_8981015 | CREATE INDEX index_with_include_on_sensors_8981015 ON citus_split_test_schema.sensors_8981015 USING btree (((measure_data -> 'IsFailed'::text))) INCLUDE (measure_data, eventdatetime, measure_status) + sensors_8981015 | CREATE UNIQUE INDEX sensors_pkey_8981015 ON citus_split_test_schema.sensors_8981015 USING btree (measureid, eventdatetime, measure_data) +(12 rows) + + SELECT tablename, indexdef FROM pg_indexes WHERE tablename like 'table_with_index_rep_identity_%' ORDER BY 1,2; + tablename | indexdef +--------------------------------------------------------------------- + table_with_index_rep_identity_8981011 | CREATE UNIQUE INDEX uqx_8981011 ON citus_split_test_schema.table_with_index_rep_identity_8981011 USING btree (key) + table_with_index_rep_identity_8981012 | CREATE UNIQUE INDEX uqx_8981012 ON citus_split_test_schema.table_with_index_rep_identity_8981012 USING btree (key) + table_with_index_rep_identity_8981021 | CREATE UNIQUE INDEX uqx_8981021 ON citus_split_test_schema.table_with_index_rep_identity_8981021 USING btree (key) +(3 rows) + + SELECT stxname FROM pg_statistic_ext + WHERE stxnamespace IN ( + SELECT oid + FROM pg_namespace + WHERE nspname IN ('citus_split_test_schema') + ) + ORDER BY stxname ASC; + stxname +--------------------------------------------------------------------- + stats_on_sensors + stats_on_sensors_8981007 + stats_on_sensors_8981008 + stats_on_sensors_8981015 +(4 rows) + +-- END : Display current state +-- BEGIN: Should be able to change/drop constraints +\c - postgres - :master_port +SET ROLE test_split_role; +SET search_path TO "citus_split_test_schema"; +ALTER INDEX index_on_sensors RENAME TO 
index_on_sensors_renamed; +ALTER INDEX index_on_sensors_renamed ALTER COLUMN 1 SET STATISTICS 200; +DROP STATISTICS stats_on_sensors; +DROP INDEX index_on_sensors_renamed; +ALTER TABLE sensors DROP CONSTRAINT fkey_table_to_dist; +-- END: Should be able to change/drop constraints +-- BEGIN: Split second time on another schema +SET search_path TO public; +SET citus.next_shard_id TO 8981031; +SELECT pg_catalog.citus_split_shard_by_split_points( + 8981007, + ARRAY['-2100000000'], + ARRAY[:worker_1_node, :worker_2_node], + 'force_logical'); +WARNING: replication slot "citus_shard_split_template_slot_8981007" does not exist +CONTEXT: while executing command on localhost:xxxxx + citus_split_shard_by_split_points +--------------------------------------------------------------------- + +(1 row) + +SET search_path TO "citus_split_test_schema"; +SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport + FROM pg_dist_shard AS shard + INNER JOIN pg_dist_placement placement ON shard.shardid = placement.shardid + INNER JOIN pg_dist_node node ON placement.groupid = node.groupid + INNER JOIN pg_catalog.pg_class cls ON shard.logicalrelid = cls.oid + WHERE node.noderole = 'primary' AND (logicalrelid = 'sensors'::regclass OR logicalrelid = 'colocated_dist_table'::regclass OR logicalrelid = 'table_with_index_rep_identity'::regclass) + ORDER BY logicalrelid, shardminvalue::BIGINT; + shardid | logicalrelid | shardminvalue | shardmaxvalue | nodename | nodeport +--------------------------------------------------------------------- + 8981031 | sensors | -2147483648 | -2100000000 | localhost | 57637 + 8981032 | sensors | -2099999999 | -1073741824 | localhost | 57638 + 8981008 | sensors | -1073741823 | -1 | localhost | 57638 + 8981013 | sensors | 0 | 536870911 | localhost | 57637 + 8981014 | sensors | 536870912 | 1610612735 | localhost | 57637 + 8981015 | sensors | 1610612736 | 2147483647 | localhost | 57638 + 8981033 | colocated_dist_table | -2147483648 | -2100000000 | localhost | 57637 + 8981034 | colocated_dist_table | -2099999999 | -1073741824 | localhost | 57638 + 8981010 | colocated_dist_table | -1073741823 | -1 | localhost | 57638 + 8981016 | colocated_dist_table | 0 | 536870911 | localhost | 57637 + 8981017 | colocated_dist_table | 536870912 | 1610612735 | localhost | 57637 + 8981018 | colocated_dist_table | 1610612736 | 2147483647 | localhost | 57638 + 8981035 | table_with_index_rep_identity | -2147483648 | -2100000000 | localhost | 57637 + 8981036 | table_with_index_rep_identity | -2099999999 | -1073741824 | localhost | 57638 + 8981012 | table_with_index_rep_identity | -1073741823 | -1 | localhost | 57638 + 8981019 | table_with_index_rep_identity | 0 | 536870911 | localhost | 57637 + 8981020 | table_with_index_rep_identity | 536870912 | 1610612735 | localhost | 57637 + 8981021 | table_with_index_rep_identity | 1610612736 | 2147483647 | localhost | 57638 +(18 rows) + +-- END: Split second time on another schema +-- BEGIN: Validate Data Count +SELECT COUNT(*) FROM sensors; + count +--------------------------------------------------------------------- + 1001 +(1 row) + +SELECT COUNT(*) FROM reference_table; + count +--------------------------------------------------------------------- + 1001 +(1 row) + +SELECT COUNT(*) FROM colocated_dist_table; + count +--------------------------------------------------------------------- + 1001 +(1 row) + +-- END: Validate Data Count +--BEGIN : Cleanup +\c - postgres - :master_port +DROP SCHEMA "citus_split_test_schema" CASCADE; +NOTICE: drop 
cascades to 4 other objects +DETAIL: drop cascades to table citus_split_test_schema.sensors +drop cascades to table citus_split_test_schema.reference_table +drop cascades to table citus_split_test_schema.colocated_dist_table +drop cascades to table citus_split_test_schema.table_with_index_rep_identity +--END : Cleanup diff --git a/src/test/regress/expected/citus_sameer.out b/src/test/regress/expected/citus_sameer.out index fabf31965..93913da41 100644 --- a/src/test/regress/expected/citus_sameer.out +++ b/src/test/regress/expected/citus_sameer.out @@ -22,13 +22,13 @@ SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \ SELECT * FROM citus_shards; table_name | shardid | shard_name | citus_table_type | colocation_id | nodename | nodeport | shard_size --------------------------------------------------------------------- - table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390006 | localhost | 8888 | 0 - table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390006 | localhost | 8887 | 0 - table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390006 | localhost | 9995 | 0 - table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390006 | localhost | 9992 | 0 - table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390006 | localhost | 57637 | 0 - table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390006 | localhost | 9998 | 0 - table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390006 | localhost | 9997 | 0 + table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390009 | localhost | 8888 | 0 + table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390009 | localhost | 8887 | 0 + table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390009 | localhost | 9995 | 0 + table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390009 | localhost | 9992 | 0 + table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390009 | localhost | 57637 | 0 + table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390009 | localhost | 9998 | 0 + table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390009 | localhost | 9997 | 0 (7 rows) SELECT * FROM pg_dist_shard; @@ -62,7 +62,7 @@ SELECT citus_split_shard_by_split_points( ARRAY['0'], ARRAY[:worker_1_node, :worker_2_node], 'force_logical'); -WARNING: replication slot "citus_split_replicationslot_for_shard_1" does not exist +WARNING: replication slot "citus_shard_split_template_slot_1" does not exist CONTEXT: while executing command on localhost:xxxxx WARNING: connection claimed exclusively at transaction commit WARNING: connection claimed exclusively at transaction commit @@ -88,7 +88,7 @@ SELECT * FROM show_catalog; SELECT * FROM pg_subscription; oid | subdbid | subname | subowner | subenabled | subbinary | substream | subconninfo | subslotname | subsynccommit | subpublications --------------------------------------------------------------------- - 17311 | 16384 | citus_shard_split_subscription_10 | 17310 | t | f | f | host='localhost' 
port=xxxxx user='postgres' dbname='regression' connect_timeout=20 sslmode=prefer | citus_split_18_10 | off | {citus_shard_split_publication_18_10} + 20669 | 16384 | citus_shard_split_subscription_10 | 20668 | t | f | f | host='localhost' port=xxxxx user='postgres' dbname='regression' connect_timeout=20 sslmode=prefer | citus_split_18_10 | off | {citus_shard_split_publication_18_10} (1 row) SELECT slot_name FROM pg_replication_slots; @@ -115,20 +115,20 @@ SELECT * FROM show_catalog; SELECT * FROM pg_publication; oid | pubname | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate | pubviaroot --------------------------------------------------------------------- - 17371 | citus_shard_split_publication_16_10 | 10 | f | t | t | t | t | f - 17374 | citus_shard_split_publication_18_10 | 10 | f | t | t | t | t | f + 20728 | citus_shard_split_publication_16_10 | 10 | f | t | t | t | t | f + 20731 | citus_shard_split_publication_18_10 | 10 | f | t | t | t | t | f (2 rows) SELECT * FROM pg_subscription; oid | subdbid | subname | subowner | subenabled | subbinary | substream | subconninfo | subslotname | subsynccommit | subpublications --------------------------------------------------------------------- - 17378 | 16384 | citus_shard_split_subscription_10 | 17377 | t | f | f | host='localhost' port=xxxxx user='postgres' dbname='regression' connect_timeout=20 sslmode=prefer | citus_split_16_10 | off | {citus_shard_split_publication_16_10} + 20735 | 16384 | citus_shard_split_subscription_10 | 20734 | t | f | f | host='localhost' port=xxxxx user='postgres' dbname='regression' connect_timeout=20 sslmode=prefer | citus_split_16_10 | off | {citus_shard_split_publication_16_10} (1 row) SELECT slot_name FROM pg_replication_slots; - slot_name + slot_name --------------------------------------------------------------------- - citus_split_replicationslot_for_shard_1 + citus_shard_split_template_slot_1 citus_split_16_10 citus_split_18_10 (3 rows) diff --git a/src/test/regress/expected/split_shard_replication_colocated_setup.out b/src/test/regress/expected/split_shard_replication_colocated_setup.out index aba290e38..a672587b6 100644 --- a/src/test/regress/expected/split_shard_replication_colocated_setup.out +++ b/src/test/regress/expected/split_shard_replication_colocated_setup.out @@ -78,8 +78,8 @@ WARNING: Previous split shard worflow was not successfully and could not comple SELECT relowner AS table_owner_one FROM pg_class WHERE relname='table_first' \gset SELECT relowner AS table_owner_two FROM pg_class WHERE relname='table_second' \gset -SELECT slot_name AS slot_for_first_owner FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_%s', :worker_2_node, :table_owner_one), 'citus') \gset -SELECT slot_name AS slot_for_second_owner FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_%s', :worker_2_node, :table_owner_two), 'citus') \gset +SELECT slot_name AS slot_for_first_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_%s', :worker_2_node, :table_owner_one), 'citus') \gset +SELECT slot_name AS slot_for_second_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_%s', :worker_2_node, :table_owner_two), 'citus') \gset -- Create subscription at worker2 with copy_data to 'false' \c - postgres - :worker_2_port SET search_path TO split_shard_replication_setup_schema; diff --git a/src/test/regress/expected/split_shard_replication_setup.out b/src/test/regress/expected/split_shard_replication_setup.out index f06a6dd88..5521253b8 
100644 --- a/src/test/regress/expected/split_shard_replication_setup.out +++ b/src/test/regress/expected/split_shard_replication_setup.out @@ -71,7 +71,7 @@ SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[ 1 (1 row) -SELECT slot_name FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_2_node), 'citus') \gset +SELECT slot_name FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_10', :worker_2_node), 'citus') \gset -- Create subscription at worker2 with copy_data to 'false' and derived replication slot name \c - - - :worker_2_port SET search_path TO split_shard_replication_setup_schema; diff --git a/src/test/regress/expected/split_shard_replication_setup_local.out b/src/test/regress/expected/split_shard_replication_setup_local.out index 17750b458..66113d603 100644 --- a/src/test/regress/expected/split_shard_replication_setup_local.out +++ b/src/test/regress/expected/split_shard_replication_setup_local.out @@ -19,7 +19,7 @@ SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[ 1 (1 row) -SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_1_node), 'citus') \gset +SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_10', :worker_1_node), 'citus') \gset -- Create subscription at worker1 with copy_data to 'false' a BEGIN; CREATE SUBSCRIPTION local_subscription diff --git a/src/test/regress/expected/split_shard_replication_setup_remote_local.out b/src/test/regress/expected/split_shard_replication_setup_remote_local.out index 35c42e129..56899ab0e 100644 --- a/src/test/regress/expected/split_shard_replication_setup_remote_local.out +++ b/src/test/regress/expected/split_shard_replication_setup_remote_local.out @@ -18,8 +18,8 @@ WARNING: Previous split shard worflow was not successfully and could not comple 2 (1 row) -SELECT slot_name AS slot_for_worker1 FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_1_node), 'citus') \gset -SELECT slot_name AS slot_for_worker2 FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_2_node), 'citus') \gset +SELECT slot_name AS slot_for_worker1 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_10', :worker_1_node), 'citus') \gset +SELECT slot_name AS slot_for_worker2 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_10', :worker_2_node), 'citus') \gset -- Create subscription at worker1 with copy_data to 'false' and 'slot_for_worker1' CREATE SUBSCRIPTION sub_worker1 CONNECTION 'host=localhost port=xxxxx user=postgres dbname=regression' diff --git a/src/test/regress/split_schedule b/src/test/regress/split_schedule index 527ad6a17..91de90db7 100644 --- a/src/test/regress/split_schedule +++ b/src/test/regress/split_schedule @@ -8,16 +8,17 @@ test: tablespace test: foreign_key_to_reference_table # Split tests go here. 
 #test: citus_sameer
-test: split_shard_replication_setup
-test: split_shard_replication_setup_remote_local
-test: split_shard_replication_setup_local
-test: split_shard_replication_colocated_setup
-test: worker_split_copy_test
-test: worker_split_binary_copy_test
-test: worker_split_text_copy_test
-test: citus_split_shard_by_split_points_negative
-test: citus_split_shard_by_split_points
-test: citus_split_shard_by_split_points_failure
+#test: split_shard_replication_setup
+#test: split_shard_replication_setup_remote_local
+#test: split_shard_replication_setup_local
+#test: split_shard_replication_colocated_setup
+#test: worker_split_copy_test
+#test: worker_split_binary_copy_test
+#test: worker_split_text_copy_test
+#test: citus_split_shard_by_split_points_negative
+#test: citus_split_shard_by_split_points
+#test: citus_split_shard_by_split_points_failure
 # Name citus_split_shard_by_split_points_columnar_partitioned was too long and being truncated.
 # use citus_split_shard_columnar_partitioned instead.
-test: citus_split_shard_columnar_partitioned
+#test: citus_split_shard_columnar_partitioned
+test: citus_non_blocking_split_shards
diff --git a/src/test/regress/sql/citus_non_blocking_split_shards.sql b/src/test/regress/sql/citus_non_blocking_split_shards.sql
new file mode 100644
index 000000000..5082ab7a5
--- /dev/null
+++ b/src/test/regress/sql/citus_non_blocking_split_shards.sql
@@ -0,0 +1,240 @@
+/*
+Citus Shard Split Test. The test is modeled on 'shard_move_constraints'.
+Here is a high-level overview of the test plan:
+    1. Create a table 'sensors' (ShardCount = 2) to be split. Add indexes and statistics on this table.
+    2. Create two other tables: 'reference_table' and 'colocated_dist_table', co-located with sensors.
+    3. Create Foreign key constraints between the two co-located distributed tables.
+    4. Load data into the three tables.
+    5. Move one of the shards for 'sensors' to test ShardMove -> Split.
+    6. Trigger Split on both shards of 'sensors'. This will also split co-located tables.
+    7. Move one of the split shards to test Split -> ShardMove.
+    8. Split an already split shard a second time on a different schema.
+*/
+
+CREATE SCHEMA "citus_split_test_schema";
+
+CREATE ROLE test_split_role WITH LOGIN;
+GRANT USAGE, CREATE ON SCHEMA "citus_split_test_schema" TO test_split_role;
+SET ROLE test_split_role;
+
+SET search_path TO "citus_split_test_schema";
+SET citus.next_shard_id TO 8981000;
+SET citus.next_placement_id TO 8610000;
+SET citus.shard_count TO 2;
+SET citus.shard_replication_factor TO 1;
+
+-- BEGIN: Create table to split, along with other co-located tables. Add indexes, statistics etc.
+CREATE TABLE sensors(
+    measureid integer,
+    eventdatetime date,
+    measure_data jsonb,
+    meaure_quantity decimal(15, 2),
+    measure_status char(1),
+    measure_comment varchar(44),
+    PRIMARY KEY (measureid, eventdatetime, measure_data));
+
+CREATE INDEX index_on_sensors ON sensors(lower(measureid::text));
+ALTER INDEX index_on_sensors ALTER COLUMN 1 SET STATISTICS 1000;
+CREATE INDEX hash_index_on_sensors ON sensors USING HASH((measure_data->'IsFailed'));
+CREATE INDEX index_with_include_on_sensors ON sensors ((measure_data->'IsFailed')) INCLUDE (measure_data, eventdatetime, measure_status);
+CREATE STATISTICS stats_on_sensors (dependencies) ON measureid, eventdatetime FROM sensors;
+
+SELECT create_distributed_table('sensors', 'measureid', colocate_with:='none');
+-- END: Create table to split, along with other co-located tables. Add indexes, statistics etc.
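+
+-- Illustrative sketch only, kept entirely in comments so the validated expected
+-- output of this regression test is unaffected: the 2-way split performed later
+-- in this file cuts the shard covering hash range [-2147483648, -1] at split
+-- point -1073741824, so its children are expected to span
+-- [-2147483648, -1073741824] and [-1073741823, -1]. A catalog query along these
+-- lines (hypothetical here, similar to the placement queries used below) would
+-- show those ranges:
+--   SELECT shardid, shardminvalue, shardmaxvalue
+--     FROM pg_dist_shard
+--     WHERE logicalrelid = 'sensors'::regclass
+--     ORDER BY shardminvalue::BIGINT;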
+ +-- BEGIN: Create co-located distributed and reference tables. +CREATE TABLE reference_table (measureid integer PRIMARY KEY); +SELECT create_reference_table('reference_table'); + +CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY); +CLUSTER colocated_dist_table USING colocated_dist_table_pkey; +SELECT create_distributed_table('colocated_dist_table', 'measureid', colocate_with:='sensors'); + +CREATE TABLE table_with_index_rep_identity(key int NOT NULL); +CREATE UNIQUE INDEX uqx ON table_with_index_rep_identity(key); +ALTER TABLE table_with_index_rep_identity REPLICA IDENTITY USING INDEX uqx; +CLUSTER table_with_index_rep_identity USING uqx; +SELECT create_distributed_table('table_with_index_rep_identity', 'key', colocate_with:='sensors'); +-- END: Create co-located distributed and reference tables. + +-- BEGIN : Create Foreign key constraints. +ALTER TABLE sensors ADD CONSTRAINT fkey_table_to_dist FOREIGN KEY (measureid) REFERENCES colocated_dist_table(measureid); +-- END : Create Foreign key constraints. + +-- BEGIN : Load data into tables. +INSERT INTO reference_table SELECT i FROM generate_series(0,1000)i; +INSERT INTO colocated_dist_table SELECT i FROM generate_series(0,1000)i; +INSERT INTO sensors SELECT i, '2020-01-05', '{}', 11011.10, 'A', 'I <3 Citus' FROM generate_series(0,1000)i; + +SELECT COUNT(*) FROM sensors; +SELECT COUNT(*) FROM reference_table; +SELECT COUNT(*) FROM colocated_dist_table; +-- END: Load data into tables. + +-- BEGIN : Display current state. +SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport + FROM pg_dist_shard AS shard + INNER JOIN pg_dist_placement placement ON shard.shardid = placement.shardid + INNER JOIN pg_dist_node node ON placement.groupid = node.groupid + INNER JOIN pg_catalog.pg_class cls ON shard.logicalrelid = cls.oid + WHERE node.noderole = 'primary' AND (logicalrelid = 'sensors'::regclass OR logicalrelid = 'colocated_dist_table'::regclass OR logicalrelid = 'table_with_index_rep_identity'::regclass) + ORDER BY logicalrelid, shardminvalue::BIGINT; + +\c - - - :worker_1_port + SET search_path TO "citus_split_test_schema", public, pg_catalog; + SET citus.show_shards_for_app_name_prefixes = '*'; + SELECT tbl.relname, fk."Constraint", fk."Definition" + FROM pg_catalog.pg_class tbl + JOIN public.table_fkeys fk on tbl.oid = fk.relid + WHERE tbl.relname like 'sensors_%' + ORDER BY 1, 2; + SELECT tablename, indexdef FROM pg_indexes WHERE tablename like 'sensors_%' ORDER BY 1,2; + SELECT tablename, indexdef FROM pg_indexes WHERE tablename like 'table_with_index_rep_identity_%' ORDER BY 1,2; + SELECT stxname FROM pg_statistic_ext + WHERE stxnamespace IN ( + SELECT oid + FROM pg_namespace + WHERE nspname IN ('citus_split_test_schema') + ) + ORDER BY stxname ASC; + +\c - - - :worker_2_port + SET search_path TO "citus_split_test_schema", public, pg_catalog; + SET citus.show_shards_for_app_name_prefixes = '*'; + SELECT tbl.relname, fk."Constraint", fk."Definition" + FROM pg_catalog.pg_class tbl + JOIN public.table_fkeys fk on tbl.oid = fk.relid + WHERE tbl.relname like 'sensors_%' + ORDER BY 1, 2; + SELECT tablename, indexdef FROM pg_indexes WHERE tablename like 'sensors_%' ORDER BY 1,2; + SELECT tablename, indexdef FROM pg_indexes WHERE tablename like 'table_with_index_rep_identity_%' ORDER BY 1,2; + SELECT stxname FROM pg_statistic_ext + WHERE stxnamespace IN ( + SELECT oid + FROM pg_namespace + WHERE nspname IN ('citus_split_test_schema') + ) + ORDER BY stxname ASC; +-- END : Display current state + +-- 
BEGIN : Move one shard before we split it. +\c - postgres - :master_port +SET ROLE test_split_role; +SET search_path TO "citus_split_test_schema"; +SET citus.next_shard_id TO 8981007; +SET citus.defer_drop_after_shard_move TO OFF; + +SELECT citus_move_shard_placement(8981000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode:='force_logical'); +-- END : Move one shard before we split it. + +-- BEGIN : Set node id variables +SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset +SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset +-- END : Set node id variables + +-- BEGIN : Split two shards : One with move and One without move. +-- Perform 2 way split +SELECT pg_catalog.citus_split_shard_by_split_points( + 8981000, + ARRAY['-1073741824'], + ARRAY[:worker_1_node, :worker_2_node], + 'force_logical'); + +-- Perform 3 way split +SELECT pg_catalog.citus_split_shard_by_split_points( + 8981001, + ARRAY['536870911', '1610612735'], + ARRAY[:worker_1_node, :worker_1_node, :worker_2_node], + 'force_logical'); +-- END : Split two shards : One with move and One without move. + +-- BEGIN : Move a shard post split. +SELECT citus_move_shard_placement(8981007, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode:='block_writes'); +-- END : Move a shard post split. + +-- BEGIN : Display current state. +SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport + FROM pg_dist_shard AS shard + INNER JOIN pg_dist_placement placement ON shard.shardid = placement.shardid + INNER JOIN pg_dist_node node ON placement.groupid = node.groupid + INNER JOIN pg_catalog.pg_class cls ON shard.logicalrelid = cls.oid + WHERE node.noderole = 'primary' AND (logicalrelid = 'sensors'::regclass OR logicalrelid = 'colocated_dist_table'::regclass OR logicalrelid = 'table_with_index_rep_identity'::regclass) + ORDER BY logicalrelid, shardminvalue::BIGINT; + +\c - - - :worker_1_port + SET search_path TO "citus_split_test_schema", public, pg_catalog; + SET citus.show_shards_for_app_name_prefixes = '*'; + SELECT tbl.relname, fk."Constraint", fk."Definition" + FROM pg_catalog.pg_class tbl + JOIN public.table_fkeys fk on tbl.oid = fk.relid + WHERE tbl.relname like 'sensors_%' + ORDER BY 1, 2; + SELECT tablename, indexdef FROM pg_indexes WHERE tablename like 'sensors_%' ORDER BY 1,2; + SELECT tablename, indexdef FROM pg_indexes WHERE tablename like 'table_with_index_rep_identity_%' ORDER BY 1,2; + SELECT stxname FROM pg_statistic_ext + WHERE stxnamespace IN ( + SELECT oid + FROM pg_namespace + WHERE nspname IN ('citus_split_test_schema') + ) + ORDER BY stxname ASC; + +\c - - - :worker_2_port + SET search_path TO "citus_split_test_schema", public, pg_catalog; + SET citus.show_shards_for_app_name_prefixes = '*'; + SELECT tbl.relname, fk."Constraint", fk."Definition" + FROM pg_catalog.pg_class tbl + JOIN public.table_fkeys fk on tbl.oid = fk.relid + WHERE tbl.relname like 'sensors_%' + ORDER BY 1, 2; + SELECT tablename, indexdef FROM pg_indexes WHERE tablename like 'sensors_%' ORDER BY 1,2; + SELECT tablename, indexdef FROM pg_indexes WHERE tablename like 'table_with_index_rep_identity_%' ORDER BY 1,2; + SELECT stxname FROM pg_statistic_ext + WHERE stxnamespace IN ( + SELECT oid + FROM pg_namespace + WHERE nspname IN ('citus_split_test_schema') + ) + ORDER BY stxname ASC; +-- END : Display current state + +-- BEGIN: Should be able to change/drop constraints +\c - postgres - :master_port +SET ROLE 
test_split_role; +SET search_path TO "citus_split_test_schema"; +ALTER INDEX index_on_sensors RENAME TO index_on_sensors_renamed; +ALTER INDEX index_on_sensors_renamed ALTER COLUMN 1 SET STATISTICS 200; +DROP STATISTICS stats_on_sensors; +DROP INDEX index_on_sensors_renamed; +ALTER TABLE sensors DROP CONSTRAINT fkey_table_to_dist; +-- END: Should be able to change/drop constraints + +-- BEGIN: Split second time on another schema +SET search_path TO public; +SET citus.next_shard_id TO 8981031; +SELECT pg_catalog.citus_split_shard_by_split_points( + 8981007, + ARRAY['-2100000000'], + ARRAY[:worker_1_node, :worker_2_node], + 'force_logical'); + +SET search_path TO "citus_split_test_schema"; +SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport + FROM pg_dist_shard AS shard + INNER JOIN pg_dist_placement placement ON shard.shardid = placement.shardid + INNER JOIN pg_dist_node node ON placement.groupid = node.groupid + INNER JOIN pg_catalog.pg_class cls ON shard.logicalrelid = cls.oid + WHERE node.noderole = 'primary' AND (logicalrelid = 'sensors'::regclass OR logicalrelid = 'colocated_dist_table'::regclass OR logicalrelid = 'table_with_index_rep_identity'::regclass) + ORDER BY logicalrelid, shardminvalue::BIGINT; +-- END: Split second time on another schema + +-- BEGIN: Validate Data Count +SELECT COUNT(*) FROM sensors; +SELECT COUNT(*) FROM reference_table; +SELECT COUNT(*) FROM colocated_dist_table; +-- END: Validate Data Count + +--BEGIN : Cleanup +\c - postgres - :master_port +DROP SCHEMA "citus_split_test_schema" CASCADE; +--END : Cleanup diff --git a/src/test/regress/sql/citus_sameer.sql b/src/test/regress/sql/citus_sameer.sql index acc59a4c8..0030e2d6e 100644 --- a/src/test/regress/sql/citus_sameer.sql +++ b/src/test/regress/sql/citus_sameer.sql @@ -67,5 +67,5 @@ SET search_path TO citus_split_shard_by_split_points_negative; SELECT * FROM show_catalog; SELECT * FROM pg_publication; SELECT * FROM pg_subscription; -SELECT slot_name FROM pg_replication_slots; +SELECT * FROM pg_replication_slots; SELECT * FROM table_to_split_100; diff --git a/src/test/regress/sql/split_shard_replication_colocated_setup.sql b/src/test/regress/sql/split_shard_replication_colocated_setup.sql index 8ef1f3090..3dd627e7c 100644 --- a/src/test/regress/sql/split_shard_replication_colocated_setup.sql +++ b/src/test/regress/sql/split_shard_replication_colocated_setup.sql @@ -76,9 +76,9 @@ SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[ SELECT relowner AS table_owner_one FROM pg_class WHERE relname='table_first' \gset SELECT relowner AS table_owner_two FROM pg_class WHERE relname='table_second' \gset -SELECT slot_name AS slot_for_first_owner FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_%s', :worker_2_node, :table_owner_one), 'citus') \gset +SELECT slot_name AS slot_for_first_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_%s', :worker_2_node, :table_owner_one), 'citus') \gset -SELECT slot_name AS slot_for_second_owner FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_%s', :worker_2_node, :table_owner_two), 'citus') \gset +SELECT slot_name AS slot_for_second_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_%s', :worker_2_node, :table_owner_two), 'citus') \gset -- Create subscription at worker2 with copy_data to 'false' \c - postgres - :worker_2_port diff --git a/src/test/regress/sql/split_shard_replication_setup.sql b/src/test/regress/sql/split_shard_replication_setup.sql index 
12195f751..a9a84b86d 100644 --- a/src/test/regress/sql/split_shard_replication_setup.sql +++ b/src/test/regress/sql/split_shard_replication_setup.sql @@ -71,7 +71,7 @@ SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[ ROW(1, 3, '0', '2147483647', :worker_2_node)::citus.split_shard_info ]); -SELECT slot_name FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_2_node), 'citus') \gset +SELECT slot_name FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_10', :worker_2_node), 'citus') \gset -- Create subscription at worker2 with copy_data to 'false' and derived replication slot name \c - - - :worker_2_port diff --git a/src/test/regress/sql/split_shard_replication_setup_local.sql b/src/test/regress/sql/split_shard_replication_setup_local.sql index f48519f86..4bdcb0887 100644 --- a/src/test/regress/sql/split_shard_replication_setup_local.sql +++ b/src/test/regress/sql/split_shard_replication_setup_local.sql @@ -18,7 +18,7 @@ SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[ ROW(1, 3, '0', '2147483647', :worker_1_node)::citus.split_shard_info ]); -SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_1_node), 'citus') \gset +SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_10', :worker_1_node), 'citus') \gset -- Create subscription at worker1 with copy_data to 'false' a BEGIN; diff --git a/src/test/regress/sql/split_shard_replication_setup_remote_local.sql b/src/test/regress/sql/split_shard_replication_setup_remote_local.sql index ff2ebb9a5..bfb7dfed9 100644 --- a/src/test/regress/sql/split_shard_replication_setup_remote_local.sql +++ b/src/test/regress/sql/split_shard_replication_setup_remote_local.sql @@ -16,8 +16,8 @@ SELECT count(*) FROM worker_split_shard_replication_setup(ARRAY[ ROW(1, 3, '0', '2147483647', :worker_2_node)::citus.split_shard_info ]); -SELECT slot_name AS slot_for_worker1 FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_1_node), 'citus') \gset -SELECT slot_name AS slot_for_worker2 FROM pg_create_logical_replication_slot(FORMAT('citus_split_%s_10', :worker_2_node), 'citus') \gset +SELECT slot_name AS slot_for_worker1 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_10', :worker_1_node), 'citus') \gset +SELECT slot_name AS slot_for_worker2 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_%s_10', :worker_2_node), 'citus') \gset -- Create subscription at worker1 with copy_data to 'false' and 'slot_for_worker1' CREATE SUBSCRIPTION sub_worker1