Flow completed

users/saawasek/non_blocking_split_integrated
Sameer Awasekar 2022-07-18 13:51:58 +05:30
parent b26bab32de
commit 10ea295d6c
6 changed files with 152 additions and 61 deletions

View File

@ -1120,6 +1120,10 @@ NonBlockingShardSplit(SplitOperation splitOperation,
List *shardSplitPointsList,
List *workersForPlacementList)
{
char *superUser = CitusExtensionOwnerName();
char *databaseName = get_database_name(MyDatabaseId);
int connectionFlags = FORCE_NEW_CONNECTION;
List *sourceColocatedShardIntervalList = ColocatedShardIntervalList(
shardIntervalToSplit);
@ -1136,6 +1140,24 @@ NonBlockingShardSplit(SplitOperation splitOperation,
WorkerNode *sourceShardToCopyNode = FindNodeWithNodeId(sourceShardPlacement->nodeId,
false /* missingOk */);
HTAB *shardSplitHashMapForPublication = CreateShardSplitInfoMapForPublication(
sourceColocatedShardIntervalList,
shardGroupSplitIntervalListList,
workersForPlacementList);
DropAllShardSplitLeftOvers(sourceShardToCopyNode, shardSplitHashMapForPublication);
MultiConnection *sourceConnection = GetNodeUserDatabaseConnection(connectionFlags,
sourceShardToCopyNode
->
workerName,
sourceShardToCopyNode
->
workerPort,
superUser,
databaseName);
ClaimConnectionExclusively(sourceConnection);
PG_TRY();
{
/*
@ -1149,11 +1171,6 @@ NonBlockingShardSplit(SplitOperation splitOperation,
shardGroupSplitIntervalListList,
workersForPlacementList);
/* char *templateSnapShotName = CreateTemplateReplicationSlotAndReturnSnapshot( */
/* shardIntervalToSplit, sourceShardToCopyNode); */
/* DoSplitCopy(sourceShardToCopyNode, sourceColocatedShardIntervalList, */
/* shardGroupSplitIntervalListList, workersForPlacementList, NULL); */
CreateDummyShardsForShardGroup(
sourceColocatedShardIntervalList,
@ -1161,12 +1178,58 @@ NonBlockingShardSplit(SplitOperation splitOperation,
sourceShardToCopyNode,
workersForPlacementList);
/*TODO: Refactor this method. BlockWrites is within this as of now, take it out */
SplitShardReplicationSetup(shardIntervalToSplit,
sourceColocatedShardIntervalList,
shardGroupSplitIntervalListList,
sourceShardToCopyNode,
workersForPlacementList);
CreateShardSplitPublications(sourceConnection, shardSplitHashMapForPublication);
/*Create Template Replication Slot */
/* DoSplitCopy */
/*worker_split_replication_setup_udf*/
List *replicationSlotInfoList = ExecuteSplitShardReplicationSetupUDF(
sourceShardToCopyNode,
sourceColocatedShardIntervalList,
shardGroupSplitIntervalListList,
workersForPlacementList);
/* Subscriber flow starts from here */
List *shardSplitSubscribersMetadataList =
PopulateShardSplitSubscriptionsMetadataList(
shardSplitHashMapForPublication, replicationSlotInfoList);
List *targetNodeConnectionList = CreateTargetNodeConnectionsForShardSplit(
shardSplitSubscribersMetadataList,
connectionFlags,
superUser, databaseName);
/* Create copies of template replication slot */
CreateReplicationSlots(sourceConnection, shardSplitSubscribersMetadataList);
CreateShardSplitSubscriptions(targetNodeConnectionList,
shardSplitSubscribersMetadataList,
sourceShardToCopyNode,
superUser,
databaseName);
WaitForShardSplitRelationSubscriptionsBecomeReady(
shardSplitSubscribersMetadataList);
XLogRecPtr sourcePosition = GetRemoteLogPosition(sourceConnection);
WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition,
shardSplitSubscribersMetadataList);
CreateAuxiliaryStructuresForShardGroup(shardGroupSplitIntervalListList,
workersForPlacementList);
sourcePosition = GetRemoteLogPosition(sourceConnection);
WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition,
shardSplitSubscribersMetadataList);
BlockWritesToShardList(sourceColocatedShardIntervalList);
sourcePosition = GetRemoteLogPosition(sourceConnection);
WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition,
shardSplitSubscribersMetadataList);
/*
* Drop old shards and delete related metadata. Have to do that before
* creating the new shard metadata, because there's cross-checks
@ -1355,22 +1418,6 @@ SplitShardReplicationSetup(ShardInterval *shardIntervalToSplit,
WorkerNode *sourceWorkerNode,
List *destinationWorkerNodesList)
{
/* / * // / *Create Template replication slot * / * / */
/* / * char *templateSnapShotName = CreateTemplateReplicationSlotAndReturnSnapshot( * / */
/* / * shardIntervalToSplit, sourceWorkerNode); * / */
/* List *shardSplitPubSubMetadata = CreateShardSplitInfoMapForPublication( */
/* sourceColocatedShardIntervalList, */
/* shardGroupSplitIntervalListList, */
/* destinationWorkerNodesList, */
/* replicationSlotInfoList); */
/* earlier the above method used to take replication slot info as information */
/* LogicallyReplicateSplitShards(sourceWorkerNode, shardSplitPubSubMetadata, */
/* sourceColocatedShardIntervalList, */
/* shardGroupSplitIntervalListList, */
/* destinationWorkerNodesList); */
char *superUser = CitusExtensionOwnerName();
char *databaseName = get_database_name(MyDatabaseId);
int connectionFlags = FORCE_NEW_CONNECTION;
@ -1424,7 +1471,7 @@ SplitShardReplicationSetup(ShardInterval *shardIntervalToSplit,
XLogRecPtr sourcePosition = GetRemoteLogPosition(sourceConnection);
WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition,
shardSplitSubscribersMetadataList);
CreateAuxiliaryStructuresForShardGroup(shardGroupSplitIntervalListList,
destinationWorkerNodesList);

View File

@ -2080,7 +2080,7 @@ GetSubscriptionPosition(MultiConnection *connection, Bitmapset *tableOwnerIds,
void
CreateShardSubscription(MultiConnection *connection, char *sourceNodeName,
int sourceNodePort, char *userName, char *databaseName,
char *publicationName,
char *publicationName, char *slotName,
Oid ownerId)
{
StringInfo createSubscriptionCommand = makeStringInfo();
@ -2107,11 +2107,12 @@ CreateShardSubscription(MultiConnection *connection, char *sourceNodeName,
appendStringInfo(createSubscriptionCommand,
"CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
"WITH (citus_use_authinfo=true, enabled=false)",
"WITH (citus_use_authinfo=true, enabled=false, create_slot=false, copy_data=false, slot_name='%s')",
quote_identifier(ShardSubscriptionName(ownerId,
SHARD_SPLIT_SUBSCRIPTION_PREFIX)),
quote_literal_cstr(conninfo->data),
quote_identifier(publicationName));
quote_identifier(publicationName),
slotName);
ExecuteCriticalRemoteCommand(connection, createSubscriptionCommand->data);
pfree(createSubscriptionCommand->data);

View File

@ -32,6 +32,7 @@ ShardSplitSubscriberMetadata * CreateShardSplitSubscriberMetadata(Oid tableOwner
nodeId,
List *
replicationSlotInfoList);
static void CreateShardSplitPublicationForNode(MultiConnection *connection,
List *shardList,
uint32_t publicationForTargetNodeId, Oid
@ -149,30 +150,6 @@ AddPublishableShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInterval,
}
ShardSplitSubscriberMetadata *
CreateShardSplitSubscriberMetadata(Oid tableOwnerId, uint32 nodeId,
List *replicationSlotInfoList)
{
ShardSplitSubscriberMetadata *shardSplitSubscriberMetadata = palloc0(
sizeof(ShardSplitSubscriberMetadata));
shardSplitSubscriberMetadata->tableOwnerId = tableOwnerId;
char *tableOwnerName = GetUserNameFromId(tableOwnerId, false);
ReplicationSlotInfo *replicationSlotInfo = NULL;
foreach_ptr(replicationSlotInfo, replicationSlotInfoList)
{
if (nodeId == replicationSlotInfo->targetNodeId &&
strcmp(tableOwnerName, replicationSlotInfo->tableOwnerName) == 0)
{
shardSplitSubscriberMetadata->slotInfo = replicationSlotInfo;
break;
}
}
return shardSplitSubscriberMetadata;
}
void
LogicallyReplicateSplitShards(WorkerNode *sourceWorkerNode,
List *shardSplitPubSubMetadataList,
@ -281,6 +258,7 @@ CreateShardSplitSubscriptions(List *targetNodeConnectionList,
superUser,
databaseName,
ShardSplitPublicationName(publicationForNodeId, ownerId),
shardSplitPubSubMetadata->slotInfo->slotName,
ownerId);
}
}
@ -576,9 +554,67 @@ PopulateShardSplitSubscriptionsMetadataList(HTAB *shardSplitInfoHashMap,
shardSplitSubscriptionMetadataList = lappend(shardSplitSubscriptionMetadataList,
shardSplitSubscriberMetadata);
/*replicationSlotInfoList = lappend(replicationSlotInfoList, replicationSlotInfo); */
}
return shardSplitSubscriptionMetadataList;
}
ShardSplitSubscriberMetadata *
CreateShardSplitSubscriberMetadata(Oid tableOwnerId, uint32 nodeId,
List *replicationSlotInfoList)
{
ShardSplitSubscriberMetadata *shardSplitSubscriberMetadata = palloc0(
sizeof(ShardSplitSubscriberMetadata));
shardSplitSubscriberMetadata->tableOwnerId = tableOwnerId;
char *tableOwnerName = GetUserNameFromId(tableOwnerId, false);
ReplicationSlotInfo *replicationSlotInfo = NULL;
foreach_ptr(replicationSlotInfo, replicationSlotInfoList)
{
if (nodeId == replicationSlotInfo->targetNodeId &&
strcmp(tableOwnerName, replicationSlotInfo->tableOwnerName) == 0)
{
shardSplitSubscriberMetadata->slotInfo = replicationSlotInfo;
break;
}
}
PrintShardSplitPubSubMetadata(shardSplitSubscriberMetadata);
return shardSplitSubscriberMetadata;
}
/*TODO(saawasek): Remove existing slots before creating newer ones */
/* extern void CreateReplicationSlots(MultiConnection *sourceNodeConnection, List * shardSplitSubscriberMetadataList); */
void
CreateReplicationSlots(MultiConnection *sourceNodeConnection,
List *shardSplitSubscriberMetadataList)
{
ShardSplitSubscriberMetadata *subscriberMetadata = NULL;
foreach_ptr(subscriberMetadata, shardSplitSubscriberMetadataList)
{
char *slotName = subscriberMetadata->slotInfo->slotName;
StringInfo createReplicationSlotCommand = makeStringInfo();
/* TODO(niupre): Replace pgoutput with an appropriate name (to e introduced in by saawasek's PR) */
appendStringInfo(createReplicationSlotCommand,
"SELECT * FROM pg_create_logical_replication_slot('%s','citus', false)",
slotName);
PGresult *result = NULL;
int response = ExecuteOptionalRemoteCommand(sourceNodeConnection,
createReplicationSlotCommand->data,
&result);
if (response != RESPONSE_OKAY || !IsResponseOK(result) || PQntuples(result) != 1)
{
ReportResultError(sourceNodeConnection, result, ERROR);
}
PQclear(result);
ForgetResults(sourceNodeConnection);
}
}

View File

@ -38,7 +38,7 @@ extern void DropShardUser(MultiConnection *connection, char *username);
extern void CreateShardSubscription(MultiConnection *connection, char *sourceNodeName,
int sourceNodePort, char *userName,
char *databaseName,
char *publicationName,
char *publicationName, char *slotName,
Oid ownerId);
extern void WaitForRelationSubscriptionsBecomeReady(MultiConnection *targetConnection,

View File

@ -89,6 +89,8 @@ List * CreateTargetNodeConnectionsForShardSplit(List *shardSplitSubscribersMetad
int
connectionFlags, char *user,
char *databaseName);
extern void CreateReplicationSlots(MultiConnection *sourceNodeConnection,
List *shardSplitSubscriberMetadataList);
/*used for debuggin. Remove later*/
extern void PrintShardSplitPubSubMetadata(

View File

@ -48,15 +48,19 @@ SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \
SELECT citus_split_shard_by_split_points(
1,
ARRAY['0'],
ARRAY[:worker_2_node, :worker_2_node],
ARRAY[:worker_1_node, :worker_2_node],
'force_logical');
-- On worker2, we want child shard 2 and dummy shard 1 --
-- on worker1, we want child shard 3 and 1 and dummy shard 2 --
INSERT INTO table_to_split values(100,'a');
INSERT INTO table_to_split values(400, 'a');
INSERT INTO table_to_split values(500, 'a');
\c - - - :worker_2_port
SET search_path TO citus_split_shard_by_split_points_negative;
SELECT * FROM show_catalog;
SELECT * FROM pg_subscription;
SELECT slot_name FROM pg_replication_slots;
SELECT * FROM table_to_split_101;
\c - - - :worker_1_port
SET search_path TO citus_split_shard_by_split_points_negative;
@ -64,3 +68,4 @@ SELECT * FROM show_catalog;
SELECT * FROM pg_publication;
SELECT * FROM pg_subscription;
SELECT slot_name FROM pg_replication_slots;
SELECT * FROM table_to_split_100;