Adding comments to the workflow

users/saawasek/non_blocking_split_integrated
Sameer Awasekar 2022-07-24 14:16:37 +05:30
parent 0a6875de5a
commit 9073564900
5 changed files with 49 additions and 339 deletions

View File

@@ -1220,7 +1220,7 @@ TryDropSplitShardsOnFailure(HTAB *mapOfShardToPlacementCreatedByWorkflow)
/*
* SplitShard API to split a given shard (or shard group) in blocking fashion
* SplitShard API to split a given shard (or shard group) in non-blocking fashion
* based on specified split points to a set of destination nodes.
* 'splitOperation' : Customer operation that triggered split.
* 'shardIntervalToSplit' : Source shard interval to be split.
@@ -1253,6 +1253,7 @@ NonBlockingShardSplit(SplitOperation splitOperation,
WorkerNode *sourceShardToCopyNode = FindNodeWithNodeId(sourceShardPlacement->nodeId,
false /* missingOk */);
/* Create hashmap to group shards for publication-subscription management */
HTAB *shardSplitHashMapForPublication = CreateShardSplitInfoMapForPublication(
sourceColocatedShardIntervalList,
shardGroupSplitIntervalListList,
@@ -1273,115 +1274,143 @@ NonBlockingShardSplit(SplitOperation splitOperation,
HTAB *mapOfShardToPlacementCreatedByWorkflow =
CreateEmptyMapForShardsCreatedByWorkflow();
/* Non-Blocking shard split workflow starts here */
PG_TRY();
{
/* Physically create split children. */
/* 1) Physically create split children. */
CreateSplitShardsForShardGroup(mapOfShardToPlacementCreatedByWorkflow,
shardGroupSplitIntervalListList,
workersForPlacementList);
/*
* 2) Create dummy shards due to logical replication constraints.
* Refer to the comment section of 'CreateDummyShardsForShardGroup' for in-depth
* information.
*/
CreateDummyShardsForShardGroup(
sourceColocatedShardIntervalList,
shardGroupSplitIntervalListList,
sourceShardToCopyNode,
workersForPlacementList);
/* 3) Create Publications. */
CreateShardSplitPublications(sourceConnection, shardSplitHashMapForPublication);
/* Create Template Replication Slot */
/*
* 4) Create template replication slot. It returns a snapshot that remains valid
* for the lifetime of the session that creates it. The connection is closed
* at the end of the workflow.
*/
MultiConnection *templateSlotConnection = NULL;
char *snapShotName = CreateTemplateReplicationSlotAndReturnSnapshot(
shardIntervalToSplit, sourceShardToCopyNode, &templateSlotConnection);
/* DoSplitCopy */
/* 5) Do snapshotted Copy */
DoSplitCopy(sourceShardToCopyNode, sourceColocatedShardIntervalList,
shardGroupSplitIntervalListList, workersForPlacementList,
snapShotName);
/*worker_split_replication_setup_udf*/
/* 6) Execute the 'worker_split_shard_replication_setup' UDF */
List *replicationSlotInfoList = ExecuteSplitShardReplicationSetupUDF(
sourceShardToCopyNode,
sourceColocatedShardIntervalList,
shardGroupSplitIntervalListList,
workersForPlacementList);
/* Subscriber flow starts from here */
/*
* Subscriber flow starts from here.
* Populate 'ShardSplitSubscriberMetadata' for subscription management.
*/
List *shardSplitSubscribersMetadataList =
PopulateShardSplitSubscriptionsMetadataList(
shardSplitHashMapForPublication, replicationSlotInfoList);
/* Create connections to the target nodes. TODO: can be refactored */
List *targetNodeConnectionList = CreateTargetNodeConnectionsForShardSplit(
shardSplitSubscribersMetadataList,
connectionFlags,
superUser, databaseName);
/* Create copies of template replication slot */
/* 7) Create copies of template replication slot */
char *templateSlotName = ShardSplitTemplateReplicationSlotName(
shardIntervalToSplit->shardId);
CreateReplicationSlots(sourceConnection, templateSlotName,
shardSplitSubscribersMetadataList);
/* 8) Create subscriptions on target nodes */
CreateShardSplitSubscriptions(targetNodeConnectionList,
shardSplitSubscribersMetadataList,
sourceShardToCopyNode,
superUser,
databaseName);
/* 9) Wait for subscriptions to be ready */
WaitForShardSplitRelationSubscriptionsBecomeReady(
shardSplitSubscribersMetadataList);
/* 10) Wait for subscribers to catchup till source LSN */
XLogRecPtr sourcePosition = GetRemoteLogPosition(sourceConnection);
WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition,
shardSplitSubscribersMetadataList);
/* 11) Create auxiliary structures */
CreateAuxiliaryStructuresForShardGroup(shardGroupSplitIntervalListList,
workersForPlacementList);
/* 12) Wait for subscribers to catchup till source LSN */
sourcePosition = GetRemoteLogPosition(sourceConnection);
WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition,
shardSplitSubscribersMetadataList);
/* 13) Block writes on source shards */
BlockWritesToShardList(sourceColocatedShardIntervalList);
/* 14) Wait for subscribers to catchup till source LSN */
sourcePosition = GetRemoteLogPosition(sourceConnection);
WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition,
shardSplitSubscribersMetadataList);
/* Drop Subscribers */
/* 15) Drop subscriptions */
DropShardSplitSubsriptions(shardSplitSubscribersMetadataList);
/* Drop Publications */
/* 16) Drop Publications */
DropShardSplitPublications(sourceConnection, shardSplitHashMapForPublication);
/* 17) TODO(saawasek): Try dropping replication slots explicitly */
/*
* Drop old shards and delete related metadata. Have to do that before
* 18) Drop old shards and delete related metadata. Have to do that before
* creating the new shard metadata, because there's cross-checks
* preventing inconsistent metadata (like overlapping shards).
*/
DropShardList(sourceColocatedShardIntervalList);
/* Insert new shard and placement metdata */
/* 19) Insert new shard and placement metadata */
InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList,
workersForPlacementList);
/*
* Create foreign keys if exists after the metadata changes happening in
* 20) Create foreign keys, if any exist, after the metadata changes happening in
* DropShardList() and InsertSplitChildrenShardMetadata() because the foreign
* key creation depends on the new metadata.
*/
CreateForeignKeyConstraints(shardGroupSplitIntervalListList,
workersForPlacementList);
/* 21) Drop dummy shards.
* TODO(saawasek): Refactor and pass the hashmap. Currently the map is a global variable. */
DropDummyShards();
/* Close source connection */
/* 22) Close source connection */
CloseConnection(sourceConnection);
/* Close all subscriber connections */
/* 23) Close all subscriber connections */
CloseShardSplitSubscriberConnections(shardSplitSubscribersMetadataList);
/* Close connection of template replication slot */
/* 24) Close connection of template replication slot */
CloseConnection(templateSlotConnection);
}
PG_CATCH();
@@ -1389,6 +1418,9 @@ NonBlockingShardSplit(SplitOperation splitOperation,
/* Do a best effort cleanup of shards created on workers in the above block */
TryDropSplitShardsOnFailure(mapOfShardToPlacementCreatedByWorkflow);
/* TODO(saawasek): Add checks to open a new connection if sourceConnection is not valid anymore. */
DropAllShardSplitLeftOvers(sourceConnection, shardSplitHashMapForPublication);
DropDummyShards();
PG_RE_THROW();
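
For context, the publication / replication-slot / subscription plumbing in steps 3-10 maps onto stock PostgreSQL logical replication. A minimal SQL sketch of those primitives, with made-up object names (the exact commands and the decoding plugin Citus uses differ):

-- On the source node: publish the shard being split.
CREATE PUBLICATION example_split_publication FOR TABLE table_to_split_1;
-- On a replication connection to the source: create a logical slot and export its
-- snapshot, so the initial copy and the change stream share a single cut-off point.
CREATE_REPLICATION_SLOT example_split_slot LOGICAL pgoutput EXPORT_SNAPSHOT;
-- On each target node: attach a subscription to a pre-created slot; the data was
-- already copied under the exported snapshot, so copy_data is disabled.
CREATE SUBSCRIPTION example_split_subscription
    CONNECTION 'host=source-node port=5432 dbname=regression'
    PUBLICATION example_split_publication
    WITH (create_slot = false, copy_data = false, slot_name = 'example_split_slot');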

View File

@@ -51,10 +51,6 @@ static void PopulateShardSplitInfoInSM(ShardSplitInfoSMHeader *shardSplitInfoSMH
static void ReturnReplicationSlotInfo(HTAB *shardInfoHashMap, Tuplestorestate *tupleStore,
TupleDesc tupleDescriptor);
static void CreatePublishersForSplitChildren(HTAB *shardInfoHashMap);
StringInfo GetSoureAndDestinationShardNames(List *shardSplitInfoList);
char * ConstructFullyQualifiedSplitChildShardName(ShardSplitInfo *shardSplitInfo);
/*
* worker_split_shard_replication_setup UDF creates in-memory data structures
* to store the meta information about the shard undergoing split and new split
@@ -422,113 +418,6 @@ ParseShardSplitInfoFromDatum(Datum shardSplitInfoDatum,
}
static void
CreatePublishersForSplitChildren(HTAB *shardInfoHashMap)
{
HASH_SEQ_STATUS status;
hash_seq_init(&status, shardInfoHashMap);
NodeShardMappingEntry *entry = NULL;
int splitInfoIndex = 0;
while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL)
{
uint32_t nodeId = entry->key.nodeId;
uint32_t tableOwnerId = entry->key.tableOwnerId;
int connectionFlags = FORCE_NEW_CONNECTION;
printf("Sameer getting new connection \n");
MultiConnection *sourceConnection = GetNodeUserDatabaseConnection(connectionFlags,
"localhost",
PostPortNumber,
CitusExtensionOwnerName(),
get_database_name(
MyDatabaseId));
StringInfo shardNamesForPublication = GetSoureAndDestinationShardNames(
entry->shardSplitInfoList);
StringInfo command = makeStringInfo();
appendStringInfo(command, "CREATE PUBLICATION sameerpub_%u_%u FOR TABLE %s",
nodeId, tableOwnerId, shardNamesForPublication->data);
ExecuteCriticalRemoteCommand(sourceConnection, command->data);
printf("Sameer UserName: %s \n", GetUserNameFromId(tableOwnerId, false));
}
}
StringInfo
GetSoureAndDestinationShardNames(List *shardSplitInfoList)
{
HASHCTL info;
int flags = HASH_ELEM | HASH_CONTEXT;
/* initialise the hash table */
memset(&info, 0, sizeof(info));
info.keysize = sizeof(uint64);
info.entrysize = sizeof(uint64);
info.hcxt = CurrentMemoryContext;
HTAB *sourceShardIdSet = hash_create("Source ShardId Set", 128, &info, flags);
/* Get child shard names */
StringInfo allShardNames = makeStringInfo();
bool addComma = false;
ShardSplitInfo *shardSplitInfo = NULL;
foreach_ptr(shardSplitInfo, shardSplitInfoList)
{
/* add source shard id to the hash table to get list of unique source shard ids */
bool found = false;
uint64 sourceShardId = shardSplitInfo->sourceShardId;
hash_search(sourceShardIdSet, &sourceShardId, HASH_ENTER, &found);
if (addComma)
{
appendStringInfo(allShardNames, ",");
}
/* Append fully qualified split child shard name */
char *childShardName = ConstructFullyQualifiedSplitChildShardName(shardSplitInfo);
appendStringInfo(allShardNames, childShardName);
addComma = true;
}
HASH_SEQ_STATUS status;
hash_seq_init(&status, sourceShardIdSet);
uint64 *sourceShardIdEntry = NULL;
while ((sourceShardIdEntry = hash_seq_search(&status)) != NULL)
{
ShardInterval *sourceShardInterval = LoadShardInterval(*sourceShardIdEntry);
char *sourceShardName = ConstructQualifiedShardName(sourceShardInterval);
if (addComma)
{
appendStringInfo(allShardNames, ",");
}
appendStringInfo(allShardNames, sourceShardName);
addComma = true;
}
return allShardNames;
}
char *
ConstructFullyQualifiedSplitChildShardName(ShardSplitInfo *shardSplitInfo)
{
Oid schemaId = get_rel_namespace(shardSplitInfo->distributedTableOid);
char *schemaName = get_namespace_name(schemaId);
char *tableName = get_rel_name(shardSplitInfo->distributedTableOid);
char *shardName = pstrdup(tableName);
AppendShardIdToName(&shardName, shardSplitInfo->splitChildShardId);
shardName = quote_qualified_identifier(schemaName, shardName);
return shardName;
}
static void
ReturnReplicationSlotInfo(HTAB *shardInfoHashMap, Tuplestorestate *tupleStore, TupleDesc
tupleDescriptor)

View File

@@ -251,6 +251,7 @@ CreateTargetNodeConnectionsForShardSplit(List *shardSplitSubscribersMetadataList
targetNodeConnectionList = lappend(targetNodeConnectionList, targetConnection);
/* Cache the connections for each subscription */
shardSplitSubscriberMetadata->targetNodeConnection = targetConnection;
}

View File

@@ -1,141 +0,0 @@
-- Negative test cases for citus_split_shard_by_split_points UDF.
CREATE SCHEMA citus_split_shard_by_split_points_negative;
SET search_path TO citus_split_shard_by_split_points_negative;
SET citus.shard_count TO 1;
SET citus.shard_replication_factor TO 1;
SET citus.next_shard_id TO 1;
CREATE TABLE table_to_split (id bigserial PRIMARY KEY, value char);
CREATE TABLE table_second (id bigserial PRIMARY KEY, value char);
-- Shard1 | -2147483648 | -1073741825
-- Shard2 | -1073741824 | -1
-- Shard3 | 0 | 1073741823
-- Shard4 | 1073741824 | 2147483647
SELECT create_distributed_table('table_to_split','id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
--SELECT create_distributed_table('table_second', 'id', colocate_with => 'table_to_split');
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
SELECT * FROM citus_shards;
table_name | shardid | shard_name | citus_table_type | colocation_id | nodename | nodeport | shard_size
---------------------------------------------------------------------
table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390009 | localhost | 8888 | 0
table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390009 | localhost | 8887 | 0
table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390009 | localhost | 9995 | 0
table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390009 | localhost | 9992 | 0
table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390009 | localhost | 57637 | 0
table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390009 | localhost | 9998 | 0
table_to_split | 1 | citus_split_shard_by_split_points_negative.table_to_split_1 | distributed | 1390009 | localhost | 9997 | 0
(7 rows)
SELECT * FROM pg_dist_shard;
logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue
---------------------------------------------------------------------
table_to_split | 1 | t | -2147483648 | 2147483647
(1 row)
--SET client_min_messages TO LOG;
--SET citus.log_remote_commands TO on;
CREATE OR REPLACE VIEW show_catalog AS SELECT n.nspname as "Schema",
c.relname as "Name",
pg_catalog.pg_get_userbyid(c.relowner) as "Owner"
FROM pg_catalog.pg_class c
LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
WHERE c.relkind IN ('r','p','')
AND n.nspname <> 'pg_catalog'
AND n.nspname !~ '^pg_toast'
AND n.nspname <> 'information_schema'
AND pg_catalog.pg_table_is_visible(c.oid)
ORDER BY 1,2;
-- UDF fails for range partitioned tables.
\c - - - :master_port
--SET citus.log_remote_commands TO on;
SET citus.next_shard_id TO 100;
SET search_path TO citus_split_shard_by_split_points_negative;
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
SELECT citus_split_shard_by_split_points(
1,
ARRAY['0'],
ARRAY[:worker_1_node, :worker_2_node],
'force_logical');
WARNING: replication slot "citus_shard_split_template_slot_1" does not exist
CONTEXT: while executing command on localhost:xxxxx
WARNING: connection claimed exclusively at transaction commit
WARNING: connection claimed exclusively at transaction commit
WARNING: connection claimed exclusively at transaction commit
WARNING: connection claimed exclusively at transaction commit
citus_split_shard_by_split_points
---------------------------------------------------------------------
(1 row)
INSERT INTO table_to_split values(100,'a');
INSERT INTO table_to_split values(400, 'a');
INSERT INTO table_to_split values(500, 'a');
\c - - - :worker_2_port
SET search_path TO citus_split_shard_by_split_points_negative;
SELECT * FROM show_catalog;
Schema | Name | Owner
---------------------------------------------------------------------
citus_split_shard_by_split_points_negative | table_to_split | postgres
citus_split_shard_by_split_points_negative | table_to_split_101 | postgres
(2 rows)
SELECT * FROM pg_subscription;
oid | subdbid | subname | subowner | subenabled | subbinary | substream | subconninfo | subslotname | subsynccommit | subpublications
---------------------------------------------------------------------
20669 | 16384 | citus_shard_split_subscription_10 | 20668 | t | f | f | host='localhost' port=xxxxx user='postgres' dbname='regression' connect_timeout=20 sslmode=prefer | citus_split_18_10 | off | {citus_shard_split_publication_18_10}
(1 row)
SELECT slot_name FROM pg_replication_slots;
slot_name
---------------------------------------------------------------------
(0 rows)
SELECT * FROM table_to_split_101;
id | value
---------------------------------------------------------------------
100 | a
500 | a
(2 rows)
\c - - - :worker_1_port
SET search_path TO citus_split_shard_by_split_points_negative;
SELECT * FROM show_catalog;
Schema | Name | Owner
---------------------------------------------------------------------
citus_split_shard_by_split_points_negative | table_to_split | postgres
citus_split_shard_by_split_points_negative | table_to_split_100 | postgres
(2 rows)
SELECT * FROM pg_publication;
oid | pubname | pubowner | puballtables | pubinsert | pubupdate | pubdelete | pubtruncate | pubviaroot
---------------------------------------------------------------------
20728 | citus_shard_split_publication_16_10 | 10 | f | t | t | t | t | f
20731 | citus_shard_split_publication_18_10 | 10 | f | t | t | t | t | f
(2 rows)
SELECT * FROM pg_subscription;
oid | subdbid | subname | subowner | subenabled | subbinary | substream | subconninfo | subslotname | subsynccommit | subpublications
---------------------------------------------------------------------
20735 | 16384 | citus_shard_split_subscription_10 | 20734 | t | f | f | host='localhost' port=xxxxx user='postgres' dbname='regression' connect_timeout=20 sslmode=prefer | citus_split_16_10 | off | {citus_shard_split_publication_16_10}
(1 row)
SELECT slot_name FROM pg_replication_slots;
slot_name
---------------------------------------------------------------------
citus_shard_split_template_slot_1
citus_split_16_10
citus_split_18_10
(3 rows)
SELECT * FROM table_to_split_100;
id | value
---------------------------------------------------------------------
400 | a
(1 row)

View File

@@ -1,71 +0,0 @@
-- Negative test cases for citus_split_shard_by_split_points UDF.
CREATE SCHEMA citus_split_shard_by_split_points_negative;
SET search_path TO citus_split_shard_by_split_points_negative;
SET citus.shard_count TO 1;
SET citus.shard_replication_factor TO 1;
SET citus.next_shard_id TO 1;
CREATE TABLE table_to_split (id bigserial PRIMARY KEY, value char);
CREATE TABLE table_second (id bigserial PRIMARY KEY, value char);
-- Shard1 | -2147483648 | -1073741825
-- Shard2 | -1073741824 | -1
-- Shard3 | 0 | 1073741823
-- Shard4 | 1073741824 | 2147483647
SELECT create_distributed_table('table_to_split','id');
--SELECT create_distributed_table('table_second', 'id', colocate_with => 'table_to_split');
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
SELECT * FROM citus_shards;
SELECT * FROM pg_dist_shard;
--SET client_min_messages TO LOG;
--SET citus.log_remote_commands TO on;
CREATE OR REPLACE VIEW show_catalog AS SELECT n.nspname as "Schema",
c.relname as "Name",
pg_catalog.pg_get_userbyid(c.relowner) as "Owner"
FROM pg_catalog.pg_class c
LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
WHERE c.relkind IN ('r','p','')
AND n.nspname <> 'pg_catalog'
AND n.nspname !~ '^pg_toast'
AND n.nspname <> 'information_schema'
AND pg_catalog.pg_table_is_visible(c.oid)
ORDER BY 1,2;
-- UDF fails for range partitioned tables.
\c - - - :master_port
--SET citus.log_remote_commands TO on;
SET citus.next_shard_id TO 100;
SET search_path TO citus_split_shard_by_split_points_negative;
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset
SELECT citus_split_shard_by_split_points(
1,
ARRAY['0'],
ARRAY[:worker_1_node, :worker_2_node],
'force_logical');
INSERT INTO table_to_split values(100,'a');
INSERT INTO table_to_split values(400, 'a');
INSERT INTO table_to_split values(500, 'a');
\c - - - :worker_2_port
SET search_path TO citus_split_shard_by_split_points_negative;
SELECT * FROM show_catalog;
SELECT * FROM pg_subscription;
SELECT slot_name FROM pg_replication_slots;
SELECT * FROM table_to_split_101;
\c - - - :worker_1_port
SET search_path TO citus_split_shard_by_split_points_negative;
SELECT * FROM show_catalog;
SELECT * FROM pg_publication;
SELECT * FROM pg_subscription;
SELECT * FROM pg_replication_slots;
SELECT * FROM table_to_split_100;
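
A hypothetical follow-up check (not part of this test, run from the coordinator) could confirm that the two child shards partition the parent's full hash range:

-- Child shards produced by the split at '0' should jointly cover [-2147483648, 2147483647].
SELECT logicalrelid, shardid, shardminvalue, shardmaxvalue
FROM pg_dist_shard
WHERE logicalrelid = 'table_to_split'::regclass
ORDER BY shardminvalue::bigint;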