diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c
index 551af6620..b84a897ef 100644
--- a/src/backend/distributed/operations/shard_split.c
+++ b/src/backend/distributed/operations/shard_split.c
@@ -1220,7 +1220,7 @@ TryDropSplitShardsOnFailure(HTAB *mapOfShardToPlacementCreatedByWorkflow)
 
 /*
- * SplitShard API to split a given shard (or shard group) in blocking fashion
+ * SplitShard API to split a given shard (or shard group) in non-blocking fashion
  * based on specified split points to a set of destination nodes.
  * 'splitOperation' : Customer operation that triggered split.
  * 'shardIntervalToSplit' : Source shard interval to be split.
@@ -1253,6 +1253,7 @@ NonBlockingShardSplit(SplitOperation splitOperation,
 	WorkerNode *sourceShardToCopyNode = FindNodeWithNodeId(sourceShardPlacement->nodeId,
 	                                                       false /* missingOk */);
 
+	/* Create hashmap to group shards for publication-subscription management */
 	HTAB *shardSplitHashMapForPublication = CreateShardSplitInfoMapForPublication(
 		sourceColocatedShardIntervalList,
 		shardGroupSplitIntervalListList,
@@ -1273,115 +1274,143 @@ NonBlockingShardSplit(SplitOperation splitOperation,
 	HTAB *mapOfShardToPlacementCreatedByWorkflow =
 		CreateEmptyMapForShardsCreatedByWorkflow();
+
+	/* Non-blocking shard split workflow starts here */
 	PG_TRY();
 	{
-		/* Physically create split children. */
+		/* 1) Physically create split children. */
 		CreateSplitShardsForShardGroup(mapOfShardToPlacementCreatedByWorkflow,
 		                               shardGroupSplitIntervalListList,
 		                               workersForPlacementList);
-
+		/*
+		 * 2) Create dummy shards due to logical replication constraints.
+		 * Refer to the comment section of 'CreateDummyShardsForShardGroup' for in-depth
+		 * information.
+		 */
 		CreateDummyShardsForShardGroup(
 			sourceColocatedShardIntervalList,
 			shardGroupSplitIntervalListList,
 			sourceShardToCopyNode,
 			workersForPlacementList);
 
+		/* 3) Create Publications. */
 		CreateShardSplitPublications(sourceConnection, shardSplitHashMapForPublication);
 
-		/* Create Template Replication Slot */
+		/*
+		 * 4) Create template replication slot. It returns a snapshot. The snapshot remains
+		 * valid for the lifetime of the session that creates it. The connection is closed
+		 * at the end of the workflow.
+		 */
 		MultiConnection *templateSlotConnection = NULL;
 		char *snapShotName = CreateTemplateReplicationSlotAndReturnSnapshot(
 			shardIntervalToSplit, sourceShardToCopyNode, &templateSlotConnection);
 
-		/* DoSplitCopy */
+
+		/* 5) Do snapshotted copy */
 		DoSplitCopy(sourceShardToCopyNode, sourceColocatedShardIntervalList,
 		            shardGroupSplitIntervalListList, workersForPlacementList,
 		            snapShotName);
 
-		/*worker_split_replication_setup_udf*/
+		/* 6) Execute 'worker_split_shard_replication_setup' UDF */
 		List *replicationSlotInfoList = ExecuteSplitShardReplicationSetupUDF(
 			sourceShardToCopyNode,
 			sourceColocatedShardIntervalList,
 			shardGroupSplitIntervalListList,
 			workersForPlacementList);
 
-		/* Subscriber flow starts from here */
+		/*
+		 * Subscriber flow starts from here.
+		 * Populate 'ShardSplitSubscriberMetadata' for subscription management.
+		 */
 		List *shardSplitSubscribersMetadataList =
 			PopulateShardSplitSubscriptionsMetadataList(
 				shardSplitHashMapForPublication, replicationSlotInfoList);
 
+		/* Create connections to the target nodes. TODO: can be refactored */
 		List *targetNodeConnectionList = CreateTargetNodeConnectionsForShardSplit(
 			shardSplitSubscribersMetadataList,
 			connectionFlags,
 			superUser, databaseName);
 
-		/* Create copies of template replication slot */
+		/* 7) Create copies of template replication slot */
 		char *templateSlotName = ShardSplitTemplateReplicationSlotName(
 			shardIntervalToSplit->shardId);
 		CreateReplicationSlots(sourceConnection, templateSlotName,
 		                       shardSplitSubscribersMetadataList);
 
+		/* 8) Create subscriptions on target nodes */
 		CreateShardSplitSubscriptions(targetNodeConnectionList,
 		                              shardSplitSubscribersMetadataList,
 		                              sourceShardToCopyNode,
 		                              superUser,
 		                              databaseName);
 
+		/* 9) Wait for subscriptions to be ready */
 		WaitForShardSplitRelationSubscriptionsBecomeReady(
 			shardSplitSubscribersMetadataList);
 
+		/* 10) Wait for subscribers to catch up to the source LSN */
 		XLogRecPtr sourcePosition = GetRemoteLogPosition(sourceConnection);
 		WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition,
 		                                                   shardSplitSubscribersMetadataList);
 
+		/* 11) Create auxiliary structures */
 		CreateAuxiliaryStructuresForShardGroup(shardGroupSplitIntervalListList,
 		                                       workersForPlacementList);
 
+		/* 12) Wait for subscribers to catch up to the source LSN */
 		sourcePosition = GetRemoteLogPosition(sourceConnection);
 		WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition,
 		                                                   shardSplitSubscribersMetadataList);
 
+		/* 13) Block writes on source shards */
 		BlockWritesToShardList(sourceColocatedShardIntervalList);
 
+		/* 14) Wait for subscribers to catch up to the source LSN */
 		sourcePosition = GetRemoteLogPosition(sourceConnection);
 		WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition,
 		                                                   shardSplitSubscribersMetadataList);
 
-		/* Drop Subscribers */
+		/* 15) Drop Subscribers */
 		DropShardSplitSubsriptions(shardSplitSubscribersMetadataList);
 
-		/* Drop Publications */
+		/* 16) Drop Publications */
 		DropShardSplitPublications(sourceConnection, shardSplitHashMapForPublication);
 
+		/* 17) TODO(saawasek): Try dropping replication slots explicitly */
+
 		/*
-		 * Drop old shards and delete related metadata. Have to do that before
+		 * 18) Drop old shards and delete related metadata. Have to do that before
 		 * creating the new shard metadata, because there's cross-checks
 		 * preventing inconsistent metadata (like overlapping shards).
 		 */
 		DropShardList(sourceColocatedShardIntervalList);
 
-		/* Insert new shard and placement metdata */
+		/* 19) Insert new shard and placement metadata */
 		InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList,
 		                                 workersForPlacementList);
 
 		/*
-		 * Create foreign keys if exists after the metadata changes happening in
+		 * 20) Create foreign keys, if they exist, after the metadata changes happening in
 		 * DropShardList() and InsertSplitChildrenShardMetadata() because the foreign
 		 * key creation depends on the new metadata.
 		 */
 		CreateForeignKeyConstraints(shardGroupSplitIntervalListList,
 		                            workersForPlacementList);
 
+		/*
+		 * 21) Drop dummy shards.
+		 * TODO(saawasek): Refactor and pass the hashmap. Currently the map is a global variable.
+		 */
 		DropDummyShards();
 
-		/* Close source connection */
+
+		/* 22) Close source connection */
 		CloseConnection(sourceConnection);
 
-		/* Close all subscriber connections */
+		/* 23) Close all subscriber connections */
 		CloseShardSplitSubscriberConnections(shardSplitSubscribersMetadataList);
 
-		/* Close connection of template replication slot */
+		/* 24) Close connection of template replication slot */
 		CloseConnection(templateSlotConnection);
 	}
 	PG_CATCH();
@@ -1389,6 +1418,9 @@ NonBlockingShardSplit(SplitOperation splitOperation,
 		/* Do a best effort cleanup of shards created on workers in the above block */
 		TryDropSplitShardsOnFailure(mapOfShardToPlacementCreatedByWorkflow);
 
+		/* TODO(saawasek): Add checks to open a new connection if sourceConnection is not valid anymore. */
+		DropAllShardSplitLeftOvers(sourceConnection, shardSplitHashMapForPublication);
+
 		DropDummyShards();
 
 		PG_RE_THROW();
diff --git a/src/backend/distributed/operations/worker_split_shard_replication_setup_udf.c b/src/backend/distributed/operations/worker_split_shard_replication_setup_udf.c
index abcfc042a..d73a6fb4c 100644
--- a/src/backend/distributed/operations/worker_split_shard_replication_setup_udf.c
+++ b/src/backend/distributed/operations/worker_split_shard_replication_setup_udf.c
@@ -51,10 +51,6 @@ static void PopulateShardSplitInfoInSM(ShardSplitInfoSMHeader *shardSplitInfoSMH
 static void ReturnReplicationSlotInfo(HTAB *shardInfoHashMap,
                                       Tuplestorestate *tupleStore,
                                       TupleDesc tupleDescriptor);
-static void CreatePublishersForSplitChildren(HTAB *shardInfoHashMap);
-StringInfo GetSoureAndDestinationShardNames(List *shardSplitInfoList);
-char * ConstructFullyQualifiedSplitChildShardName(ShardSplitInfo *shardSplitInfo);
-
 
 /*
  * worker_split_shard_replication_setup UDF creates in-memory data structures
  * to store the meta information about the shard undergoing split and new split
@@ -422,113 +418,6 @@ ParseShardSplitInfoFromDatum(Datum shardSplitInfoDatum,
 }
 
 
-static void
-CreatePublishersForSplitChildren(HTAB *shardInfoHashMap)
-{
-	HASH_SEQ_STATUS status;
-	hash_seq_init(&status, shardInfoHashMap);
-
-	NodeShardMappingEntry *entry = NULL;
-	int splitInfoIndex = 0;
-	while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL)
-	{
-		uint32_t nodeId = entry->key.nodeId;
-		uint32_t tableOwnerId = entry->key.tableOwnerId;
-
-		int connectionFlags = FORCE_NEW_CONNECTION;
-		printf("Sameer getting new connection \n");
-		MultiConnection *sourceConnection = GetNodeUserDatabaseConnection(connectionFlags,
-		                                                                  "localhost",
-		                                                                  PostPortNumber,
-		                                                                  CitusExtensionOwnerName(),
-		                                                                  get_database_name(
-		                                                                      MyDatabaseId));
-		StringInfo shardNamesForPublication = GetSoureAndDestinationShardNames(
-			entry->shardSplitInfoList);
-
-		StringInfo command = makeStringInfo();
-		appendStringInfo(command, "CREATE PUBLICATION sameerpub_%u_%u FOR TABLE %s",
-		                 nodeId, tableOwnerId, shardNamesForPublication->data);
-		ExecuteCriticalRemoteCommand(sourceConnection, command->data);
-		printf("Sameer UserName: %s \n", GetUserNameFromId(tableOwnerId, false));
-	}
-}
-
-
-StringInfo
-GetSoureAndDestinationShardNames(List *shardSplitInfoList)
-{
-	HASHCTL info;
-	int flags = HASH_ELEM | HASH_CONTEXT;
-
-	/* initialise the hash table */
-	memset(&info, 0, sizeof(info));
-	info.keysize = sizeof(uint64);
-	info.entrysize = sizeof(uint64);
-	info.hcxt = CurrentMemoryContext;
-
-	HTAB *sourceShardIdSet = hash_create("Source ShardId Set", 128, &info, flags);
-
-	/* Get child shard names */
-	StringInfo allShardNames = makeStringInfo();
-	bool addComma = false;
-
-	ShardSplitInfo *shardSplitInfo = NULL;
-	foreach_ptr(shardSplitInfo, shardSplitInfoList)
-	{
-		/* add source shard id to the hash table to get list of unique source shard ids */
-		bool found = false;
-		uint64 sourceShardId = shardSplitInfo->sourceShardId;
-		hash_search(sourceShardIdSet, &sourceShardId, HASH_ENTER, &found);
-
-		if (addComma)
-		{
-			appendStringInfo(allShardNames, ",");
-		}
-
-		/* Append fully qualified split child shard name */
-		char *childShardName = ConstructFullyQualifiedSplitChildShardName(shardSplitInfo);
-		appendStringInfo(allShardNames, childShardName);
-		addComma = true;
-	}
-
-
-	HASH_SEQ_STATUS status;
-	hash_seq_init(&status, sourceShardIdSet);
-	uint64 *sourceShardIdEntry = NULL;
-	while ((sourceShardIdEntry = hash_seq_search(&status)) != NULL)
-	{
-		ShardInterval *sourceShardInterval = LoadShardInterval(*sourceShardIdEntry);
-		char *sourceShardName = ConstructQualifiedShardName(sourceShardInterval);
-
-		if (addComma)
-		{
-			appendStringInfo(allShardNames, ",");
-		}
-
-		appendStringInfo(allShardNames, sourceShardName);
-		addComma = true;
-	}
-
-	return allShardNames;
-}
-
-
-char *
-ConstructFullyQualifiedSplitChildShardName(ShardSplitInfo *shardSplitInfo)
-{
-	Oid schemaId = get_rel_namespace(shardSplitInfo->distributedTableOid);
-	char *schemaName = get_namespace_name(schemaId);
-	char *tableName = get_rel_name(shardSplitInfo->distributedTableOid);
-
-	char *shardName = pstrdup(tableName);
-	AppendShardIdToName(&shardName, shardSplitInfo->splitChildShardId);
-	shardName = quote_qualified_identifier(schemaName, shardName);
-
-	return shardName;
-}
-
-
 static void
 ReturnReplicationSlotInfo(HTAB *shardInfoHashMap, Tuplestorestate *tupleStore,
                           TupleDesc tupleDescriptor)
diff --git a/src/backend/distributed/shardsplit/shardsplit_logical_replication.c b/src/backend/distributed/shardsplit/shardsplit_logical_replication.c
index 5b75f4c0a..1353a18d6 100644
--- a/src/backend/distributed/shardsplit/shardsplit_logical_replication.c
+++ b/src/backend/distributed/shardsplit/shardsplit_logical_replication.c
@@ -251,6 +251,7 @@ CreateTargetNodeConnectionsForShardSplit(List *shardSplitSubscribersMetadataList
 		targetNodeConnectionList = lappend(targetNodeConnectionList, targetConnection);
 
+		/* Cache the connections for each subscription */
 		shardSplitSubscriberMetadata->targetNodeConnection = targetConnection;
 	}
 
diff --git a/src/test/regress/expected/citus_sameer.out b/src/test/regress/expected/citus_sameer.out
deleted file mode 100644
index 93913da41..000000000
--- a/src/test/regress/expected/citus_sameer.out
+++ /dev/null
@@ -1,141 +0,0 @@
--- Negative test cases for citus_split_shard_by_split_points UDF.
-CREATE SCHEMA citus_split_shard_by_split_points_negative;
-SET search_path TO citus_split_shard_by_split_points_negative;
-SET citus.shard_count TO 1;
-SET citus.shard_replication_factor TO 1;
-SET citus.next_shard_id TO 1;
-CREATE TABLE table_to_split (id bigserial PRIMARY KEY, value char);
-CREATE TABLE table_second (id bigserial PRIMARY KEY, value char);
--- Shard1 | -2147483648 | -1073741825
--- Shard2 | -1073741824 | -1
--- Shard3 | 0 | 1073741823
--- Shard4 | 1073741824 | 2147483647
-SELECT create_distributed_table('table_to_split','id');
- create_distributed_table
---------------------------------------------------------------------
-
-(1 row)
-
---SELECT create_distributed_table('table_second', 'id', colocate_with => 'table_to_split');
-SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
-SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
-SELECT * FROM citus_shards;
- table_name | shardid | shard_name | citus_table_type | colocation_id | nodename | nodeport | shard_size
---------------------------------------------------------------------
- table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390009 | localhost | 8888 | 0
- table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390009 | localhost | 8887 | 0
- table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390009 | localhost | 9995 | 0
- table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390009 | localhost | 9992 | 0
- table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390009 | localhost | 57637 | 0
- table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390009 | localhost | 9998 | 0
- table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390009 | localhost | 9997 | 0
-(7 rows)
-
-SELECT * FROM pg_dist_shard;
- logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue
---------------------------------------------------------------------
- table_to_split | 1 | t | -2147483648 | 2147483647
-(1 row)
-
---SET client_min_messages TO LOG;
---SET citus.log_remote_commands TO on;
-CREATE OR REPLACE VIEW show_catalog AS SELECT n.nspname as "Schema",
-  c.relname as "Name",
-  pg_catalog.pg_get_userbyid(c.relowner) as "Owner"
-FROM pg_catalog.pg_class c
-     LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
-WHERE c.relkind IN ('r','p','')
-      AND n.nspname <> 'pg_catalog'
-      AND n.nspname !~ '^pg_toast'
-      AND n.nspname <> 'information_schema'
-  AND pg_catalog.pg_table_is_visible(c.oid)
-ORDER BY 1,2;
--- UDF fails for range partitioned tables.
-\c - - - :master_port
---SET citus.log_remote_commands TO on;
-SET citus.next_shard_id TO 100;
-SET search_path TO citus_split_shard_by_split_points_negative;
-SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
-SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
-SELECT citus_split_shard_by_split_points(
-    1,
-    ARRAY['0'],
-    ARRAY[:worker_1_node, :worker_2_node],
-    'force_logical');
-WARNING: replication slot "citus_shard_split_template_slot_1" does not exist
-CONTEXT: while executing command on localhost:xxxxx
-WARNING: connection claimed exclusively at transaction commit
-WARNING: connection claimed exclusively at transaction commit
-WARNING: connection claimed exclusively at transaction commit
-WARNING: connection claimed exclusively at transaction commit
- citus_split_shard_by_split_points
---------------------------------------------------------------------
-
-(1 row)
-
-INSERT INTO table_to_split values(100,'a');
-INSERT INTO table_to_split values(400, 'a');
-INSERT INTO table_to_split values(500, 'a');
-\c - - - :worker_2_port
-SET search_path TO citus_split_shard_by_split_points_negative;
-SELECT * FROM show_catalog;
- Schema | Name | Owner
---------------------------------------------------------------------
- citus_split_shard_by_split_points_negative | table_to_split | postgres
- citus_split_shard_by_split_points_negative | table_to_split_101 | postgres
-(2 rows)
-
-SELECT * FROM pg_subscription;
- oid | subdbid | subname | subowner | subenabled | subbinary | substream | subconninfo | subslotname | subsynccommit | subpublications
---------------------------------------------------------------------
- 20669 | 16384 | citus_shard_split_subscription_10 | 20668 | t | f | f | host='localhost' port=xxxxx user='postgres' dbname='regression' connect_timeout=20 sslmode=prefer | citus_split_18_10 | off | {citus_shard_split_publication_18_10}
-(1 row)
-
-SELECT slot_name FROM pg_replication_slots;
- slot_name
---------------------------------------------------------------------
-(0 rows)
-
-SELECT * FROM table_to_split_101;
- id | value
---------------------------------------------------------------------
- 100 | a
- 500 | a
-(2 rows)
-
-\c - - - :worker_1_port
-SET search_path TO citus_split_shard_by_split_points_negative;
-SELECT * FROM show_catalog;
- Schema | Name | Owner
---------------------------------------------------------------------
- citus_split_shard_by_split_points_negative | table_to_split | postgres
- citus_split_shard_by_split_points_negative | table_to_split_100 | postgres
-(2 rows)
-
-SELECT * FROM pg_publication;
- oid | pubname | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate | pubviaroot
---------------------------------------------------------------------
- 20728 | citus_shard_split_publication_16_10 | 10 | f | t | t | t | t | f
- 20731 | citus_shard_split_publication_18_10 | 10 | f | t | t | t | t | f
-(2 rows)
-
-SELECT * FROM pg_subscription;
- oid | subdbid | subname | subowner | subenabled | subbinary | substream | subconninfo | subslotname | subsynccommit | subpublications
---------------------------------------------------------------------
- 20735 | 16384 | citus_shard_split_subscription_10 | 20734 | t | f | f | host='localhost' port=xxxxx user='postgres' dbname='regression' connect_timeout=20 sslmode=prefer | citus_split_16_10 | off | {citus_shard_split_publication_16_10}
-(1 row)
-
-SELECT slot_name FROM pg_replication_slots;
- slot_name
---------------------------------------------------------------------
- citus_shard_split_template_slot_1
- citus_split_16_10
- citus_split_18_10
-(3 rows)
-
-SELECT * FROM table_to_split_100;
- id | value
---------------------------------------------------------------------
- 400 | a
-(1 row)
-
diff --git a/src/test/regress/sql/citus_sameer.sql b/src/test/regress/sql/citus_sameer.sql
deleted file mode 100644
index 0030e2d6e..000000000
--- a/src/test/regress/sql/citus_sameer.sql
+++ /dev/null
@@ -1,71 +0,0 @@
--- Negative test cases for citus_split_shard_by_split_points UDF.
-
-CREATE SCHEMA citus_split_shard_by_split_points_negative;
-SET search_path TO citus_split_shard_by_split_points_negative;
-SET citus.shard_count TO 1;
-SET citus.shard_replication_factor TO 1;
-SET citus.next_shard_id TO 1;
-
-CREATE TABLE table_to_split (id bigserial PRIMARY KEY, value char);
-CREATE TABLE table_second (id bigserial PRIMARY KEY, value char);
--- Shard1 | -2147483648 | -1073741825
--- Shard2 | -1073741824 | -1
--- Shard3 | 0 | 1073741823
--- Shard4 | 1073741824 | 2147483647
-SELECT create_distributed_table('table_to_split','id');
---SELECT create_distributed_table('table_second', 'id', colocate_with => 'table_to_split');
-
-SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
-SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
-
-SELECT * FROM citus_shards;
-SELECT * FROM pg_dist_shard;
-
---SET client_min_messages TO LOG;
---SET citus.log_remote_commands TO on;
-
-CREATE OR REPLACE VIEW show_catalog AS SELECT n.nspname as "Schema",
-  c.relname as "Name",
-  pg_catalog.pg_get_userbyid(c.relowner) as "Owner"
-FROM pg_catalog.pg_class c
-     LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
-WHERE c.relkind IN ('r','p','')
-      AND n.nspname <> 'pg_catalog'
-      AND n.nspname !~ '^pg_toast'
-      AND n.nspname <> 'information_schema'
-  AND pg_catalog.pg_table_is_visible(c.oid)
-ORDER BY 1,2;
-
--- UDF fails for range partitioned tables.
-\c - - - :master_port
---SET citus.log_remote_commands TO on;
-SET citus.next_shard_id TO 100;
-SET search_path TO citus_split_shard_by_split_points_negative;
-
-SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
-SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
-
-SELECT citus_split_shard_by_split_points(
-    1,
-    ARRAY['0'],
-    ARRAY[:worker_1_node, :worker_2_node],
-    'force_logical');
-
-INSERT INTO table_to_split values(100,'a');
-INSERT INTO table_to_split values(400, 'a');
-INSERT INTO table_to_split values(500, 'a');
-
-\c - - - :worker_2_port
-SET search_path TO citus_split_shard_by_split_points_negative;
-SELECT * FROM show_catalog;
-SELECT * FROM pg_subscription;
-SELECT slot_name FROM pg_replication_slots;
-SELECT * FROM table_to_split_101;
-
-\c - - - :worker_1_port
-SET search_path TO citus_split_shard_by_split_points_negative;
-SELECT * FROM show_catalog;
-SELECT * FROM pg_publication;
-SELECT * FROM pg_subscription;
-SELECT * FROM pg_replication_slots;
-SELECT * FROM table_to_split_100;
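
Reviewer note: steps 3-9 of the workflow above drive ordinary PostgreSQL logical-replication primitives on the source and target nodes. The SQL below is a minimal, hand-run sketch of that sequence, not the exact commands Citus issues; the object names (split_pub, split_template_slot, split_slot_1, split_sub_1) and the connection string are placeholders, and the PR's CreateTemplateReplicationSlotAndReturnSnapshot() presumably creates its slot over the replication protocol so a snapshot can be exported, which the SQL-level function shown here does not do.

    -- On the source node: publish the shards involved in the split
    -- (Citus groups these per target node and table owner).
    CREATE PUBLICATION split_pub
        FOR TABLE table_to_split_1, table_to_split_100, table_to_split_101;

    -- Template slot for the split; its snapshot anchors the point-in-time copy.
    SELECT pg_create_logical_replication_slot('split_template_slot', 'pgoutput');

    -- Step 7: one slot per subscriber, copied from the template.
    SELECT pg_copy_logical_replication_slot('split_template_slot', 'split_slot_1');

    -- Step 8, on a target node: attach a subscription to the pre-created slot.
    -- copy_data = false because DoSplitCopy() already moved the snapshotted data.
    CREATE SUBSCRIPTION split_sub_1
        CONNECTION 'host=localhost port=57637 user=postgres dbname=regression'
        PUBLICATION split_pub
        WITH (create_slot = false, slot_name = 'split_slot_1', copy_data = false);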