Add comments for functions

users/saawasek/non_blocking_split_integrated
Sameer Awasekar 2022-07-23 15:47:14 +05:30
parent 8c871bcd10
commit 5a5505db55
2 changed files with 285 additions and 141 deletions

View File

@@ -1713,6 +1713,25 @@ ExecuteSplitShardReplicationSetupUDF(WorkerNode *sourceWorkerNode,
}
/*
* CreateSplitShardReplicationSetupUDF creates and returns
* parameterized 'worker_split_shard_replication_setup' UDF command.
*
* 'sourceColocatedShardIntervalList' : List of source colocated shard intervals to split.
* 'shardGroupSplitIntervalListList' : List of shard intervals for split children.
* 'destinationWorkerNodesList' : List of workers for split children placement.
*
* For example, consider the following input values:
* sourceColocatedShardIntervalList : [sourceShardInterval]
* shardGroupSplitIntervalListList : [<childFirstShardInterval, childSecondShardInterval>]
* destinationWorkerNodesList : [worker1, worker2]
*
* SELECT * FROM worker_split_shard_replication_setup(
* Array[
* ROW(sourceShardId, childFirstShardId, childFirstMinRange, childFirstMaxRange, worker1)::citus.split_shard_info,
* ROW(sourceShardId, childSecondShardId, childSecondMinRange, childSecondMaxRange, worker2)::citus.split_shard_info
* ]);
*/
StringInfo
CreateSplitShardReplicationSetupUDF(List *sourceColocatedShardIntervalList,
List *shardGroupSplitIntervalListList,
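For reference, here is a minimal sketch (assuming PostgreSQL's StringInfo API) of how such a parameterized UDF command can be assembled; the helper name, the single-row array, and the argument formatting are illustrative only and not the actual Citus implementation.

#include "postgres.h"
#include "lib/stringinfo.h"

/* Illustrative sketch only, not the real CreateSplitShardReplicationSetupUDF. */
static StringInfo
BuildSplitSetupCommandSketch(uint64 sourceShardId, uint64 childShardId,
                             const char *childMinRange, const char *childMaxRange,
                             uint32 destinationNodeId)
{
    StringInfo command = makeStringInfo();

    /* one ROW(...)::citus.split_shard_info entry per split child */
    appendStringInfo(command,
                     "SELECT * FROM worker_split_shard_replication_setup(ARRAY["
                     "ROW(%lu, %lu, '%s', '%s', %u)::citus.split_shard_info]);",
                     sourceShardId, childShardId,
                     childMinRange, childMaxRange, destinationNodeId);

    return command;
}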

View File

@@ -44,6 +44,28 @@ static void DropAllShardSplitUsers(MultiConnection *cleanupConnection);
static void DropAllShardSplitReplicationSlots(MultiConnection *cleanupConnection);
/*
* CreateShardSplitInfoMapForPublication creates a hashmap that groups
* shards for creating publications and subscriptions.
*
* While creating publications and subscriptions, apart from table owners,
* the placement of the child shard matters too. To understand this, consider
* the following example:
*
* Shard1 (on Worker1) is to be split into Shard2 and Shard3 on Worker2 and Worker3 respectively.
* Let's assume the owner is 'A'. The hashmap groups the shard lists in the following way.
*
* Map key
* ======= ------ ------
* <Worker2, 'A'> ------> |Shard2|-->|Shard1|
* ------ ------
*
* ------ ------
* <Worker3, 'A'> ------> |Shard3|-->|Shard1|
* ------ ------
* Shard1 is a dummy table that is to be created on Worker2 and Worker3.
* Based on the above placement, we would need to create two publications on the source node.
*/
HTAB *
CreateShardSplitInfoMapForPublication(List *sourceColocatedShardIntervalList,
List *shardGroupSplitIntervalListList,
@@ -78,6 +100,10 @@ CreateShardSplitInfoMapForPublication(List *sourceColocatedShardIntervalList,
}
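Below is a hedged sketch of the grouping key and the hash_search() pattern the map relies on; the struct layouts are inferred from the surrounding diff and may differ from the actual Citus definitions.

#include "postgres.h"
#include "utils/hsearch.h"
#include "nodes/pg_list.h"

/* key: one bucket per <child placement node, table owner> pair (sketch) */
typedef struct NodeShardMappingKeySketch
{
    uint32 nodeId;
    Oid tableOwnerId;
} NodeShardMappingKeySketch;

typedef struct NodeShardMappingEntrySketch
{
    NodeShardMappingKeySketch key;
    List *shardSplitInfoList;   /* child shard(s) followed by the dummy parent */
} NodeShardMappingEntrySketch;

static NodeShardMappingEntrySketch *
GetOrCreateEntrySketch(HTAB *map, uint32 nodeId, Oid tableOwnerId)
{
    NodeShardMappingKeySketch key = { .nodeId = nodeId, .tableOwnerId = tableOwnerId };
    bool found = false;

    NodeShardMappingEntrySketch *entry =
        (NodeShardMappingEntrySketch *) hash_search(map, &key, HASH_ENTER, &found);
    if (!found)
    {
        /* first shard for this <node, owner> pair */
        entry->shardSplitInfoList = NIL;
    }

    return entry;
}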
/*
* AddPublishableShardEntryInMap adds a shard interval to the list
* of shards to be published.
*/
static void
AddPublishableShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInterval, bool
isChildShardInterval)
@@ -91,19 +117,25 @@ AddPublishableShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInterval,
(NodeShardMappingEntry *) hash_search(ShardInfoHashMapForPublications, &key,
HASH_ENTER,
&found);
/* Create a new list for <nodeId, owner> pair */
if (!found)
{
nodeMappingEntry->shardSplitInfoList = NIL;
}
/* Add child shard interval */
if (isChildShardInterval)
{
nodeMappingEntry->shardSplitInfoList =
lappend(nodeMappingEntry->shardSplitInfoList,
(ShardInterval *) shardInterval);
/* We return from here as the child interval is only added once in the list */
return;
}
/* Check if parent is already added */
ShardInterval *existingShardInterval = NULL;
foreach_ptr(existingShardInterval, nodeMappingEntry->shardSplitInfoList)
{
@@ -120,32 +152,44 @@ AddPublishableShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInterval,
}
/*
* CreateShardSplitPublications creates publications on the source node.
*
* sourceConnection - Connection to the source node.
*
* shardInfoHashMapForPublication - ShardIntervals are grouped by <owner, nodeId> key.
* A publication is created for the list of
* ShardIntervals mapped by each key.
*/
void
CreateShardSplitSubscriptions(List *targetNodeConnectionList,
List *shardSplitPubSubMetadataList,
WorkerNode *sourceWorkerNode,
char *superUser,
char *databaseName)
CreateShardSplitPublications(MultiConnection *sourceConnection,
HTAB *shardInfoHashMapForPublication)
{
MultiConnection *targetConnection = NULL;
ShardSplitSubscriberMetadata *shardSplitPubSubMetadata = NULL;
forboth_ptr(targetConnection, targetNodeConnectionList,
shardSplitPubSubMetadata, shardSplitPubSubMetadataList)
HASH_SEQ_STATUS status;
hash_seq_init(&status, shardInfoHashMapForPublication);
NodeShardMappingEntry *entry = NULL;
while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL)
{
uint32 publicationForNodeId = shardSplitPubSubMetadata->slotInfo->targetNodeId;
Oid ownerId = shardSplitPubSubMetadata->tableOwnerId;
CreateShardSubscription(targetConnection,
sourceWorkerNode->workerName,
sourceWorkerNode->workerPort,
superUser,
databaseName,
ShardSplitPublicationName(publicationForNodeId, ownerId),
shardSplitPubSubMetadata->slotInfo->slotName,
ownerId);
uint32 nodeId = entry->key.nodeId;
uint32 tableOwnerId = entry->key.tableOwnerId;
List *shardListForPublication = entry->shardSplitInfoList;
/* Create publication on shard list */
CreateShardSplitPublicationForNode(sourceConnection,
shardListForPublication,
nodeId,
tableOwnerId);
}
}
/*
* CreateShardSplitPublicationForNode creates a publication on the source node
* for the given shard list.
* We follow the 'SHARD_SPLIT_X_PREFIX' naming scheme for creating publications
* related to split operations.
*/
static void
CreateShardSplitPublicationForNode(MultiConnection *connection, List *shardList,
uint32_t publicationForTargetNodeId, Oid ownerId)
@@ -176,6 +220,9 @@ CreateShardSplitPublicationForNode(MultiConnection *connection, List *shardList,
}
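For illustration, a hedged sketch of the kind of command CreateShardSplitPublicationForNode builds for one <node, owner> group; the publication and shard relation names are placeholders, and the real code derives them from the grouped shard list.

#include "postgres.h"
#include "lib/stringinfo.h"

/* Sketch only: real publication names follow the SHARD_SPLIT_X_PREFIX scheme. */
static StringInfo
BuildPublicationCommandSketch(const char *publicationName,
                              const char *childShardName,
                              const char *dummyParentShardName)
{
    StringInfo command = makeStringInfo();

    appendStringInfo(command, "CREATE PUBLICATION %s FOR TABLE %s, %s;",
                     publicationName, childShardName, dummyParentShardName);

    return command;
}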
/*
* ShardSplitPublicationName returns the publication name for shard split operations.
*/
static char *
ShardSplitPublicationName(uint32_t nodeId, Oid ownerId)
{
@@ -183,41 +230,6 @@ ShardSplitPublicationName(uint32_t nodeId, Oid ownerId)
}
void
WaitForShardSplitRelationSubscriptionsBecomeReady(List *shardSplitPubSubMetadataList)
{
ShardSplitSubscriberMetadata *shardSplitPubSubMetadata = NULL;
foreach_ptr(shardSplitPubSubMetadata, shardSplitPubSubMetadataList)
{
Bitmapset *tableOwnerIds = NULL;
tableOwnerIds = bms_add_member(tableOwnerIds,
shardSplitPubSubMetadata->tableOwnerId);
WaitForRelationSubscriptionsBecomeReady(
shardSplitPubSubMetadata->targetNodeConnection, tableOwnerIds,
SHARD_SPLIT_SUBSCRIPTION_PREFIX);
}
}
void
WaitForShardSplitRelationSubscriptionsToBeCaughtUp(XLogRecPtr sourcePosition,
List *shardSplitPubSubMetadataList)
{
ShardSplitSubscriberMetadata *shardSplitPubSubMetadata = NULL;
foreach_ptr(shardSplitPubSubMetadata, shardSplitPubSubMetadataList)
{
Bitmapset *tableOwnerIds = NULL;
tableOwnerIds = bms_add_member(tableOwnerIds,
shardSplitPubSubMetadata->tableOwnerId);
WaitForShardSubscriptionToCatchUp(shardSplitPubSubMetadata->targetNodeConnection,
sourcePosition,
tableOwnerIds,
SHARD_SPLIT_SUBSCRIPTION_PREFIX);
}
}
List *
CreateTargetNodeConnectionsForShardSplit(List *shardSplitSubscribersMetadataList, int
connectionFlags, char *user, char *databaseName)
@@ -246,81 +258,16 @@ CreateTargetNodeConnectionsForShardSplit(List *shardSplitSubscribersMetadataList
}
char *
DropExistingIfAnyAndCreateTemplateReplicationSlot(ShardInterval *shardIntervalToSplit,
MultiConnection *sourceConnection)
{
/*
* To ensure SPLIT is idempotent drop any existing slot from
* previous failed operation.
*/
StringInfo dropReplicationSlotCommand = makeStringInfo();
appendStringInfo(dropReplicationSlotCommand, "SELECT pg_drop_replication_slot('%s')",
ShardSplitTemplateReplicationSlotName(
shardIntervalToSplit->shardId));
/* The Drop command can fail so ignore the response / result and proceed anyways */
PGresult *result = NULL;
int response = ExecuteOptionalRemoteCommand(sourceConnection,
dropReplicationSlotCommand->data,
&result);
PQclear(result);
ForgetResults(sourceConnection);
/*
* PG 13 Function: pg_create_logical_replication_slot ( slot_name name, plugin name [, temporary boolean ] )
* PG 14 Function: pg_create_logical_replication_slot (slot_name name, plugin name [, temporary boolean, two_phase boolean ] )
* Return: record ( slot_name name, lsn pg_lsn )
* Note: Temporary slot are only live during the session's lifetime causing them to be dropped when the session ends.
* In our invocation 'two_phase' support is disabled.
*/
StringInfo createReplicationSlotCommand = makeStringInfo();
/* TODO(niupre): Replace pgoutput with an appropriate name (to be introduced in by saawasek's PR) */
/*TODO(saawasek): Try creating TEMPORAL once basic flow is ready and we have a testcase*/
appendStringInfo(createReplicationSlotCommand,
"CREATE_REPLICATION_SLOT %s LOGICAL citus EXPORT_SNAPSHOT;",
ShardSplitTemplateReplicationSlotName(
shardIntervalToSplit->shardId));
response = ExecuteOptionalRemoteCommand(sourceConnection,
createReplicationSlotCommand->data, &result);
if (response != RESPONSE_OKAY || !IsResponseOK(result) || PQntuples(result) != 1)
{
ReportResultError(sourceConnection, result, ERROR);
}
/*'snapshot_name' is second column where index starts from zero.
* We're using the pstrdup to copy the data into the current memory context */
char *snapShotName = pstrdup(PQgetvalue(result, 0, 2 /* columIndex */));
return snapShotName;
}
void
CreateShardSplitPublications(MultiConnection *sourceConnection,
HTAB *shardInfoHashMapForPublication)
{
HASH_SEQ_STATUS status;
hash_seq_init(&status, shardInfoHashMapForPublication);
NodeShardMappingEntry *entry = NULL;
while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL)
{
uint32 nodeId = entry->key.nodeId;
uint32 tableOwnerId = entry->key.tableOwnerId;
List *shardListForPublication = entry->shardSplitInfoList;
CreateShardSplitPublicationForNode(sourceConnection,
shardListForPublication,
nodeId,
tableOwnerId);
}
}
/*
* PopulateShardSplitSubscriptionsMetadataList returns a list of 'ShardSplitSubscriberMetadata'
* structures.
*
* shardSplitInfoHashMap - Shards are grouped by <owner, node id> key.
* For each key, we create a metadata structure. This facilitates easy
* publication-subscription management.
*
* replicationSlotInfoList - List of replication slot info.
*/
List *
PopulateShardSplitSubscriptionsMetadataList(HTAB *shardSplitInfoHashMap,
List *replicationSlotInfoList)
@@ -346,6 +293,11 @@ PopulateShardSplitSubscriptionsMetadataList(HTAB *shardSplitInfoHashMap,
}
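A hedged sketch of the traversal this comment describes, reusing the hash_seq pattern visible elsewhere in this diff; the actual (elided) function body may differ in detail.

/* one ShardSplitSubscriberMetadata per <owner, nodeId> bucket (sketch) */
HASH_SEQ_STATUS status;
hash_seq_init(&status, shardSplitInfoHashMap);

List *subscriberMetadataList = NIL;
NodeShardMappingEntry *entry = NULL;
while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL)
{
    ShardSplitSubscriberMetadata *metadata =
        CreateShardSplitSubscriberMetadata(entry->key.tableOwnerId,
                                           entry->key.nodeId,
                                           replicationSlotInfoList);

    subscriberMetadataList = lappend(subscriberMetadataList, metadata);
}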
/*
* CreateShardSplitSubscriberMetadata creates a 'ShardSplitSubscriberMetadata' structure
* for the given table owner and node id. It scans the list of 'ReplicationSlotInfo'
* to identify the corresponding slot to be used for the given tableOwnerId and nodeId.
*/
ShardSplitSubscriberMetadata *
CreateShardSplitSubscriberMetadata(Oid tableOwnerId, uint32 nodeId,
List *replicationSlotInfoList)
@@ -354,6 +306,11 @@ CreateShardSplitSubscriberMetadata(Oid tableOwnerId, uint32 nodeId,
sizeof(ShardSplitSubscriberMetadata));
shardSplitSubscriberMetadata->tableOwnerId = tableOwnerId;
/*
* Each 'ReplicationSlotInfo' belongs to a unique combination of node id and owner.
* Traverse the slot list to identify the corresponding slot for the given
* table owner and node.
*/
char *tableOwnerName = GetUserNameFromId(tableOwnerId, false);
ReplicationSlotInfo *replicationSlotInfo = NULL;
foreach_ptr(replicationSlotInfo, replicationSlotInfoList)
@@ -370,7 +327,166 @@ CreateShardSplitSubscriberMetadata(Oid tableOwnerId, uint32 nodeId,
}
/*TODO(saawasek): Remove existing slots before creating newer ones */
/*
* CreateShardSplitSubscriptions creates subscriptions for the shard split operation.
* We follow the shard split naming scheme for publication-subscription management.
*
* targetNodeConnectionList - List of connections to target nodes on which
* subscriptions have to be created.
*
* shardSplitSubscriberMetadataList - List of subscriber metadata.
*
* sourceWorkerNode - Source node.
*/
void
CreateShardSplitSubscriptions(List *targetNodeConnectionList,
List *shardSplitSubscriberMetadataList,
WorkerNode *sourceWorkerNode,
char *superUser,
char *databaseName)
{
MultiConnection *targetConnection = NULL;
ShardSplitSubscriberMetadata *shardSplitPubSubMetadata = NULL;
forboth_ptr(targetConnection, targetNodeConnectionList,
shardSplitPubSubMetadata, shardSplitSubscriberMetadataList)
{
uint32 publicationForNodeId = shardSplitPubSubMetadata->slotInfo->targetNodeId;
Oid ownerId = shardSplitPubSubMetadata->tableOwnerId;
CreateShardSubscription(targetConnection,
sourceWorkerNode->workerName,
sourceWorkerNode->workerPort,
superUser,
databaseName,
ShardSplitPublicationName(publicationForNodeId, ownerId),
shardSplitPubSubMetadata->slotInfo->slotName,
ownerId);
}
}
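For context, a hedged sketch of the statement CreateShardSubscription is expected to run on the target node; the connection options and WITH settings shown are assumptions based on standard CREATE SUBSCRIPTION syntax, not the verified Citus command.

#include "postgres.h"
#include "lib/stringinfo.h"

static StringInfo
BuildSubscriptionCommandSketch(const char *subscriptionName,
                               const char *sourceHost, int sourcePort,
                               const char *user, const char *databaseName,
                               const char *publicationName, const char *slotName)
{
    StringInfo command = makeStringInfo();

    /* reuse the pre-created replication slot instead of creating a new one */
    appendStringInfo(command,
                     "CREATE SUBSCRIPTION %s "
                     "CONNECTION 'host=%s port=%d user=%s dbname=%s' "
                     "PUBLICATION %s "
                     "WITH (create_slot = false, slot_name = '%s', copy_data = false);",
                     subscriptionName, sourceHost, sourcePort, user, databaseName,
                     publicationName, slotName);

    return command;
}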
/*
* WaitForShardSplitRelationSubscriptionsBecomeReady waits for a list of subscriptions
* to become ready. This method invokes 'WaitForRelationSubscriptionsBecomeReady' for each
* subscription.
*/
void
WaitForShardSplitRelationSubscriptionsBecomeReady(List *shardSplitPubSubMetadataList)
{
ShardSplitSubscriberMetadata *shardSplitPubSubMetadata = NULL;
foreach_ptr(shardSplitPubSubMetadata, shardSplitPubSubMetadataList)
{
Bitmapset *tableOwnerIds = NULL;
tableOwnerIds = bms_add_member(tableOwnerIds,
shardSplitPubSubMetadata->tableOwnerId);
WaitForRelationSubscriptionsBecomeReady(
shardSplitPubSubMetadata->targetNodeConnection, tableOwnerIds,
SHARD_SPLIT_SUBSCRIPTION_PREFIX);
}
}
/*
* WaitForShardSplitRelationSubscriptionsToBeCaughtUp waits until subscriptions are caught up to
* the source LSN. This method invokes 'WaitForShardSubscriptionToCatchUp' for each subscription.
*/
void
WaitForShardSplitRelationSubscriptionsToBeCaughtUp(XLogRecPtr sourcePosition,
List *shardSplitPubSubMetadataList)
{
ShardSplitSubscriberMetadata *shardSplitPubSubMetadata = NULL;
foreach_ptr(shardSplitPubSubMetadata, shardSplitPubSubMetadataList)
{
Bitmapset *tableOwnerIds = NULL;
tableOwnerIds = bms_add_member(tableOwnerIds,
shardSplitPubSubMetadata->tableOwnerId);
WaitForShardSubscriptionToCatchUp(shardSplitPubSubMetadata->targetNodeConnection,
sourcePosition,
tableOwnerIds,
SHARD_SPLIT_SUBSCRIPTION_PREFIX);
}
}
char *
DropExistingIfAnyAndCreateTemplateReplicationSlot(ShardInterval *shardIntervalToSplit,
MultiConnection *sourceConnection)
{
/*
* To ensure SPLIT is idempotent, drop any existing slot from a
* previous failed operation.
*/
StringInfo dropReplicationSlotCommand = makeStringInfo();
appendStringInfo(dropReplicationSlotCommand, "SELECT pg_drop_replication_slot('%s')",
ShardSplitTemplateReplicationSlotName(
shardIntervalToSplit->shardId));
/* The DROP command can fail, so ignore the response / result and proceed anyway */
PGresult *result = NULL;
int response = ExecuteOptionalRemoteCommand(sourceConnection,
dropReplicationSlotCommand->data,
&result);
PQclear(result);
ForgetResults(sourceConnection);
/*
* Note: Temporary slots are only live during the session's lifetime, causing them to be dropped when the session ends.
*/
StringInfo createReplicationSlotCommand = makeStringInfo();
appendStringInfo(createReplicationSlotCommand,
"CREATE_REPLICATION_SLOT %s LOGICAL citus EXPORT_SNAPSHOT;",
ShardSplitTemplateReplicationSlotName(
shardIntervalToSplit->shardId));
response = ExecuteOptionalRemoteCommand(sourceConnection,
createReplicationSlotCommand->data, &result);
if (response != RESPONSE_OKAY || !IsResponseOK(result) || PQntuples(result) != 1)
{
ReportResultError(sourceConnection, result, ERROR);
}
/* 'snapshot_name' is the third column (zero-based index 2).
* We use pstrdup to copy the data into the current memory context. */
char *snapShotName = pstrdup(PQgetvalue(result, 0, 2 /* columnIndex */));
return snapShotName;
}
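For reference, the replication-protocol CREATE_REPLICATION_SLOT response is a single row whose documented columns are slot_name, consistent_point, snapshot_name and output_plugin, which is why the code above reads zero-based column index 2.

/*
 * CREATE_REPLICATION_SLOT result layout (one row):
 *   column 0: slot_name
 *   column 1: consistent_point (LSN)
 *   column 2: snapshot_name      <- read above via PQgetvalue(result, 0, 2)
 *   column 3: output_plugin
 */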
/*
* ShardSplitTemplateReplicationSlotName returns the name of the template replication slot
* following the shard split naming scheme.
*/
char *
ShardSplitTemplateReplicationSlotName(uint64 shardId)
{
return psprintf("%s%lu", SHARD_SPLIT_TEMPLATE_REPLICATION_SLOT_PREFIX, shardId);
}
/*
* CreateReplicationSlots creates copies of the template replication slot
* on the source node.
*
* sourceNodeConnection - Source node connection.
*
* templateSlotName - Template replication slot name whose copies have to be created.
* This slot holds an LSN from which the logical replication
* begins.
*
* shardSplitSubscriberMetadataList - List of 'ShardSplitSubscriberMetadata'.
*
* 'ShardSplitSubscriberMetadata' contains the replication slot name that is used
* to create copies of the template replication slot on the source node. These slot names are
* returned by the 'worker_split_shard_replication_setup' UDF and each slot is responsible for a
* specific split range. We make multiple attempts to clean up these replication slot copies,
* in the order below, to be on the safer side:
* 1. Clean up before starting shard split workflow.
* 2. Implicitly dropping slots while dropping subscriptions.
* 3. Explicitly dropping slots which would have skipped over from 2.
*/
void
CreateReplicationSlots(MultiConnection *sourceNodeConnection, char *templateSlotName,
List *shardSplitSubscriberMetadataList)
@@ -401,16 +517,6 @@ CreateReplicationSlots(MultiConnection *sourceNodeConnection, char *templateSlot
}
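A hedged sketch of one plausible way to copy the template slot per subscriber, using PostgreSQL's pg_copy_logical_replication_slot(); whether CreateReplicationSlots issues exactly this statement is an assumption based on the comment above.

#include "postgres.h"
#include "lib/stringinfo.h"

static StringInfo
BuildCopySlotCommandSketch(const char *templateSlotName, const char *targetSlotName)
{
    StringInfo command = makeStringInfo();

    /* the copy starts from the same LSN as the template slot */
    appendStringInfo(command,
                     "SELECT * FROM pg_copy_logical_replication_slot('%s', '%s');",
                     templateSlotName, targetSlotName);

    return command;
}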
/*
* ShardSplitTemplateReplicationSlotName returns name of template replication slot.
*/
char *
ShardSplitTemplateReplicationSlotName(uint64 shardId)
{
return psprintf("%s%lu", SHARD_SPLIT_TEMPLATE_REPLICATION_SLOT_PREFIX, shardId);
}
/*
* ParseReplicationSlotInfoFromResult parses custom datatype 'replication_slot_info'.
* 'replication_slot_info' is a tuple with below format:
@@ -449,7 +555,7 @@ ParseReplicationSlotInfoFromResult(PGresult *result)
/*
* DropAllShardSplitLeftOvers drops shard split subscriptions, publications, roles
* and replication slots on all nodes. These might have been left there after
* and replication slots. These might have been left there after
* the coordinator crashed during a shard split. It's important to delete them
* for two reasons:
* 1. Starting new shard split might fail when they exist, because it cannot
@@ -513,6 +619,11 @@ DropAllShardSplitLeftOvers(WorkerNode *sourceNode, HTAB *shardSplitHashMapForPub
}
/*
* DropAllShardSplitSubscriptions drops all the existing subscriptions that
* match our shard split naming scheme on the node that the connection points
* to.
*/
void
DropAllShardSplitSubscriptions(MultiConnection *cleanupConnection)
{
@@ -529,6 +640,11 @@ DropAllShardSplitSubscriptions(MultiConnection *cleanupConnection)
}
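A hedged sketch of the catalog lookup these cleanup functions depend on: list the subscriptions whose names carry the shard split prefix, then drop each one. The exact query Citus runs may differ.

#include "postgres.h"
#include "lib/stringinfo.h"

static StringInfo
BuildFindShardSplitSubscriptionsSketch(const char *subscriptionPrefix)
{
    StringInfo command = makeStringInfo();

    /* e.g. subscriptionPrefix = SHARD_SPLIT_SUBSCRIPTION_PREFIX */
    appendStringInfo(command,
                     "SELECT subname FROM pg_subscription WHERE subname LIKE '%s%%';",
                     subscriptionPrefix);

    return command;
}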
/*
* DropAllShardSplitPublications drops all the existing publications that
* match our shard split naming scheme on the node that the connection points
* to.
*/
static void
DropAllShardSplitPublications(MultiConnection *connection)
{
@@ -545,6 +661,10 @@ DropAllShardSplitPublications(MultiConnection *connection)
}
/*
* DropAllShardSplitUsers drops all the users that match our shard split naming
* scheme. These users are temporarily created for shard splits.
*/
static void
DropAllShardSplitUsers(MultiConnection *connection)
{
@@ -561,6 +681,11 @@ DropAllShardSplitUsers(MultiConnection *connection)
}
/*
* DropAllShardSplitReplicationSlots drops all the existing replication slots
* that match shard split naming scheme on the node that the connection
* points to.
*/
static void
DropAllShardSplitReplicationSlots(MultiConnection *cleanupConnection)
{