1) Revert the original order of workflow

2) Introduce GetNextShardIdForSplitChild method
users/saawasek/non_blocking_split_integrated
Sameer Awasekar 2022-08-04 16:26:15 +05:30
parent b92b00e3cf
commit 00c3830bee
3 changed files with 124 additions and 105 deletions

View File

@ -39,6 +39,7 @@
#include "distributed/shardsplit_logical_replication.h" #include "distributed/shardsplit_logical_replication.h"
#include "distributed/deparse_shard_query.h" #include "distributed/deparse_shard_query.h"
#include "distributed/shard_rebalancer.h" #include "distributed/shard_rebalancer.h"
#include "postmaster/postmaster.h"
/* /*
* Entry for map that tracks ShardInterval -> Placement Node * Entry for map that tracks ShardInterval -> Placement Node
@ -127,6 +128,7 @@ static void AddDummyShardEntryInMap(HTAB *mapOfDummyShards, uint32 targetNodeId,
ShardInterval *shardInterval); ShardInterval *shardInterval);
static void DropDummyShards(HTAB *mapOfDummyShardToPlacement); static void DropDummyShards(HTAB *mapOfDummyShardToPlacement);
static void DropDummyShard(MultiConnection *connection, ShardInterval *shardInterval); static void DropDummyShard(MultiConnection *connection, ShardInterval *shardInterval);
static uint64 GetNextShardIdForSplitChild(void);
/* Customize error message strings based on operation type */ /* Customize error message strings based on operation type */
@ -963,7 +965,7 @@ CreateSplitIntervalsForShard(ShardInterval *sourceShard,
{ {
ShardInterval *splitChildShardInterval = CopyShardInterval(sourceShard); ShardInterval *splitChildShardInterval = CopyShardInterval(sourceShard);
splitChildShardInterval->shardIndex = -1; splitChildShardInterval->shardIndex = -1;
splitChildShardInterval->shardId = GetNextShardId(); splitChildShardInterval->shardId = GetNextShardIdForSplitChild();
splitChildShardInterval->minValueExists = true; splitChildShardInterval->minValueExists = true;
splitChildShardInterval->minValue = currentSplitChildMinValue; splitChildShardInterval->minValue = currentSplitChildMinValue;
@ -1302,30 +1304,13 @@ NonBlockingShardSplit(SplitOperation splitOperation,
/* Non-Blocking shard split workflow starts here */ /* Non-Blocking shard split workflow starts here */
PG_TRY(); PG_TRY();
{ {
/* /* 1) Physically create split children. */
* 1) Create empty publications. Tables will be added after
* template replication slot and split shards are created.
*/
CreateShardSplitEmptyPublications(sourceConnection,
shardSplitHashMapForPublication);
/*
* 2) Create template replication Slot. It returns a snapshot. The snapshot remains
* valid till the lifetime of the session that creates it. The connection is closed
* at the end of the workflow.
*/
MultiConnection *templateSlotConnection = NULL;
char *snapShotName = CreateTemplateReplicationSlotAndReturnSnapshot(
shardIntervalToSplit, sourceShardToCopyNode, &templateSlotConnection);
/* 3) Physically create split children. */
CreateSplitShardsForShardGroup(mapOfShardToPlacementCreatedByWorkflow, CreateSplitShardsForShardGroup(mapOfShardToPlacementCreatedByWorkflow,
shardGroupSplitIntervalListList, shardGroupSplitIntervalListList,
workersForPlacementList); workersForPlacementList);
/* /*
* 4) Create dummy shards due to PG logical replication constraints. * 2) Create dummy shards due to PG logical replication constraints.
* Refer to the comment section of 'CreateDummyShardsForShardGroup' for indepth * Refer to the comment section of 'CreateDummyShardsForShardGroup' for indepth
* information. * information.
*/ */
@ -1339,17 +1324,24 @@ NonBlockingShardSplit(SplitOperation splitOperation,
CreateReplicaIdentities(mapOfDummyShardToPlacement, CreateReplicaIdentities(mapOfDummyShardToPlacement,
shardGroupSplitIntervalListList, workersForPlacementList); shardGroupSplitIntervalListList, workersForPlacementList);
/* 3) Create Publications. */
CreateShardSplitPublications(sourceConnection, shardSplitHashMapForPublication);
/* 5) Alter Publications and add split shards for logical replication */ /*
AlterShardSplitPublications(sourceConnection, shardSplitHashMapForPublication); * 4) Create template replication Slot. It returns a snapshot. The snapshot remains
* valid till the lifetime of the session that creates it. The connection is closed
* at the end of the workflow.
*/
MultiConnection *templateSlotConnection = NULL;
char *snapShotName = CreateTemplateReplicationSlotAndReturnSnapshot(
shardIntervalToSplit, sourceShardToCopyNode, &templateSlotConnection);
/* 5) Do snapshotted Copy */
/* 6) Do snapshotted Copy */
DoSplitCopy(sourceShardToCopyNode, sourceColocatedShardIntervalList, DoSplitCopy(sourceShardToCopyNode, sourceColocatedShardIntervalList,
shardGroupSplitIntervalListList, workersForPlacementList, shardGroupSplitIntervalListList, workersForPlacementList,
snapShotName); snapShotName);
/* 7) Execute 'worker_split_shard_replication_setup UDF */ /* 6) Execute 'worker_split_shard_replication_setup UDF */
List *replicationSlotInfoList = ExecuteSplitShardReplicationSetupUDF( List *replicationSlotInfoList = ExecuteSplitShardReplicationSetupUDF(
sourceShardToCopyNode, sourceShardToCopyNode,
sourceColocatedShardIntervalList, sourceColocatedShardIntervalList,
@ -1364,19 +1356,19 @@ NonBlockingShardSplit(SplitOperation splitOperation,
PopulateShardSplitSubscriptionsMetadataList( PopulateShardSplitSubscriptionsMetadataList(
shardSplitHashMapForPublication, replicationSlotInfoList); shardSplitHashMapForPublication, replicationSlotInfoList);
/* Create connections to the target nodes. TODO: can be refactored */ /* Create connections to the target nodes */
List *targetNodeConnectionList = CreateTargetNodeConnectionsForShardSplit( List *targetNodeConnectionList = CreateTargetNodeConnectionsForShardSplit(
shardSplitSubscribersMetadataList, shardSplitSubscribersMetadataList,
connectionFlags, connectionFlags,
superUser, databaseName); superUser, databaseName);
/* 8) Create copies of template replication slot */ /* 7) Create copies of template replication slot */
char *templateSlotName = ShardSplitTemplateReplicationSlotName( char *templateSlotName = ShardSplitTemplateReplicationSlotName(
shardIntervalToSplit->shardId); shardIntervalToSplit->shardId);
CreateReplicationSlots(sourceConnection, templateSlotName, CreateReplicationSlots(sourceConnection, templateSlotName,
shardSplitSubscribersMetadataList); shardSplitSubscribersMetadataList);
/* 9) Create subscriptions on target nodes */ /* 8) Create subscriptions on target nodes */
CreateShardSplitSubscriptions(targetNodeConnectionList, CreateShardSplitSubscriptions(targetNodeConnectionList,
shardSplitSubscribersMetadataList, shardSplitSubscribersMetadataList,
sourceShardToCopyNode, sourceShardToCopyNode,
@ -1386,40 +1378,40 @@ NonBlockingShardSplit(SplitOperation splitOperation,
/* Used for testing */ /* Used for testing */
ConflictOnlyWithIsolationTesting(); ConflictOnlyWithIsolationTesting();
/* 10) Wait for subscriptions to be ready */ /* 9) Wait for subscriptions to be ready */
WaitForShardSplitRelationSubscriptionsBecomeReady( WaitForShardSplitRelationSubscriptionsBecomeReady(
shardSplitSubscribersMetadataList); shardSplitSubscribersMetadataList);
/* 11) Wait for subscribers to catchup till source LSN */ /* 10) Wait for subscribers to catchup till source LSN */
XLogRecPtr sourcePosition = GetRemoteLogPosition(sourceConnection); XLogRecPtr sourcePosition = GetRemoteLogPosition(sourceConnection);
WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition, WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition,
shardSplitSubscribersMetadataList); shardSplitSubscribersMetadataList);
/* 12) Create Auxilary structures */ /* 11) Create Auxilary structures */
CreateAuxiliaryStructuresForShardGroup(shardGroupSplitIntervalListList, CreateAuxiliaryStructuresForShardGroup(shardGroupSplitIntervalListList,
workersForPlacementList, workersForPlacementList,
false /* includeReplicaIdentity*/); false /* includeReplicaIdentity*/);
/* 13) Wait for subscribers to catchup till source LSN */ /* 12) Wait for subscribers to catchup till source LSN */
sourcePosition = GetRemoteLogPosition(sourceConnection); sourcePosition = GetRemoteLogPosition(sourceConnection);
WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition, WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition,
shardSplitSubscribersMetadataList); shardSplitSubscribersMetadataList);
/* 14) Block writes on source shards */ /* 13) Block writes on source shards */
BlockWritesToShardList(sourceColocatedShardIntervalList); BlockWritesToShardList(sourceColocatedShardIntervalList);
/* 15) Wait for subscribers to catchup till source LSN */ /* 14) Wait for subscribers to catchup till source LSN */
sourcePosition = GetRemoteLogPosition(sourceConnection); sourcePosition = GetRemoteLogPosition(sourceConnection);
WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition, WaitForShardSplitRelationSubscriptionsToBeCaughtUp(sourcePosition,
shardSplitSubscribersMetadataList); shardSplitSubscribersMetadataList);
/* 16) Drop Subscribers */ /* 15) Drop Subscribers */
DropShardSplitSubsriptions(shardSplitSubscribersMetadataList); DropShardSplitSubsriptions(shardSplitSubscribersMetadataList);
/* 17) Drop Publications */ /* 16) Drop Publications */
DropShardSplitPublications(sourceConnection, shardSplitHashMapForPublication); DropShardSplitPublications(sourceConnection, shardSplitHashMapForPublication);
/* 18) Drop replication slots /* 17) Drop replication slots
* Drop template and subscriber replication slots * Drop template and subscriber replication slots
*/ */
DropShardReplicationSlot(sourceConnection, ShardSplitTemplateReplicationSlotName( DropShardReplicationSlot(sourceConnection, ShardSplitTemplateReplicationSlotName(
@ -1427,13 +1419,13 @@ NonBlockingShardSplit(SplitOperation splitOperation,
DropShardSplitReplicationSlots(sourceConnection, replicationSlotInfoList); DropShardSplitReplicationSlots(sourceConnection, replicationSlotInfoList);
/* /*
* 19) Drop old shards and delete related metadata. Have to do that before * 18) Drop old shards and delete related metadata. Have to do that before
* creating the new shard metadata, because there's cross-checks * creating the new shard metadata, because there's cross-checks
* preventing inconsistent metadata (like overlapping shards). * preventing inconsistent metadata (like overlapping shards).
*/ */
DropShardList(sourceColocatedShardIntervalList); DropShardList(sourceColocatedShardIntervalList);
/* 20) Insert new shard and placement metdata */ /* 19) Insert new shard and placement metdata */
InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList, InsertSplitChildrenShardMetadata(shardGroupSplitIntervalListList,
workersForPlacementList); workersForPlacementList);
@ -1441,7 +1433,7 @@ NonBlockingShardSplit(SplitOperation splitOperation,
workersForPlacementList); workersForPlacementList);
/* /*
* 21) Create foreign keys if exists after the metadata changes happening in * 20) Create foreign keys if exists after the metadata changes happening in
* DropShardList() and InsertSplitChildrenShardMetadata() because the foreign * DropShardList() and InsertSplitChildrenShardMetadata() because the foreign
* key creation depends on the new metadata. * key creation depends on the new metadata.
*/ */
@ -1449,17 +1441,17 @@ NonBlockingShardSplit(SplitOperation splitOperation,
workersForPlacementList); workersForPlacementList);
/* /*
* 22) Drop dummy shards. * 21) Drop dummy shards.
*/ */
DropDummyShards(mapOfDummyShardToPlacement); DropDummyShards(mapOfDummyShardToPlacement);
/* 23) Close source connection */ /* 22) Close source connection */
CloseConnection(sourceConnection); CloseConnection(sourceConnection);
/* 24) Close all subscriber connections */ /* 23) Close all subscriber connections */
CloseShardSplitSubscriberConnections(shardSplitSubscribersMetadataList); CloseShardSplitSubscriberConnections(shardSplitSubscribersMetadataList);
/* 25) Close connection of template replication slot */ /* 24) Close connection of template replication slot */
CloseConnection(templateSlotConnection); CloseConnection(templateSlotConnection);
} }
PG_CATCH(); PG_CATCH();
@ -1969,3 +1961,63 @@ CreateReplicaIdentities(HTAB *mapOfDummyShardToPlacement,
shardToBeDroppedNode->workerPort); shardToBeDroppedNode->workerPort);
} }
} }
/*
 * GetNextShardIdForSplitChild returns shard id to be used for split child.
 * The function connects to the local node through a new connection and gets the next
 * sequence. This prevents self deadlock when 'CREATE_REPLICATION_SLOT' is executed
 * as a part of nonblocking split workflow.
 */
static uint64
GetNextShardIdForSplitChild()
{
uint64 shardId = 0;
/*
 * In regression tests, we would like to generate shard IDs consistently
 * even if the tests run in parallel. Instead of the sequence, we can use
 * the next_shard_id GUC to specify which shard ID the current session should
 * generate next. The GUC is automatically increased by 1 every time a new
 * shard ID is generated.
 */
if (NextShardId > 0)
{
shardId = NextShardId;
NextShardId += 1;
return shardId;
}
/* Build "SELECT nextval('pg_catalog.pg_dist_shardid_seq');" */
StringInfo nextValueCommand = makeStringInfo();
appendStringInfo(nextValueCommand, "SELECT nextval(%s);", quote_literal_cstr(
"pg_catalog.pg_dist_shardid_seq"));
/*
 * FORCE_NEW_CONNECTION: run the sequence fetch on a dedicated connection to
 * the local node rather than on the session that holds the replication slot,
 * avoiding the self-deadlock described in the header comment.
 */
int connectionFlag = FORCE_NEW_CONNECTION;
MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlag,
LocalHostName,
PostPortNumber,
CitusExtensionOwnerName(),
get_database_name(
MyDatabaseId));
PGresult *result = NULL;
int queryResult = ExecuteOptionalRemoteCommand(connection, nextValueCommand->data,
&result);
/* Expect exactly one row with one column holding the nextval result */
if (queryResult != RESPONSE_OKAY || !IsResponseOK(result) || PQntuples(result) != 1 ||
PQnfields(result) != 1)
{
/* PQclear(NULL) is a no-op, so this is safe even if no result was returned */
PQclear(result);
ForgetResults(connection);
CloseConnection(connection);
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg(
"Could not generate next shard id while executing shard splits.")));
}
shardId = SafeStringToUint64(PQgetvalue(result, 0, 0 /* nextval column */));
/*
 * Release the PGresult and drain any pending results before closing; the
 * original success path leaked the PGresult (only the error path cleared it).
 */
PQclear(result);
ForgetResults(connection);
CloseConnection(connection);
return shardId;
}

View File

@ -33,14 +33,14 @@ static HTAB *ShardInfoHashMapForPublications = NULL;
static void AddPublishableShardEntryInMap(uint32 targetNodeId, static void AddPublishableShardEntryInMap(uint32 targetNodeId,
ShardInterval *shardInterval, bool ShardInterval *shardInterval, bool
isChildShardInterval); isChildShardInterval);
static void AlterShardSplitPublicationForNode(MultiConnection *connection,
List *shardList,
uint32_t publicationForTargetNodeId, Oid
ownerId);
ShardSplitSubscriberMetadata * CreateShardSplitSubscriberMetadata(Oid tableOwnerId, uint32 ShardSplitSubscriberMetadata * CreateShardSplitSubscriberMetadata(Oid tableOwnerId, uint32
nodeId, nodeId,
List * List *
replicationSlotInfoList); replicationSlotInfoList);
static void CreateShardSplitPublicationForNode(MultiConnection *connection,
List *shardList,
uint32_t publicationForTargetNodeId, Oid
tableOwner);
static char * ShardSplitPublicationName(uint32_t nodeId, Oid ownerId); static char * ShardSplitPublicationName(uint32_t nodeId, Oid ownerId);
static void DropAllShardSplitSubscriptions(MultiConnection *cleanupConnection); static void DropAllShardSplitSubscriptions(MultiConnection *cleanupConnection);
static void DropAllShardSplitPublications(MultiConnection *cleanupConnection); static void DropAllShardSplitPublications(MultiConnection *cleanupConnection);
@ -174,44 +174,7 @@ AddPublishableShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInterval,
/* /*
* CreateShardSplitEmptyPublications creates empty publications on the source node. * CreateShardSplitPublications creates publications on the source node.
* Due to a sporadic bug in PG, we have to create publications before we create replication slot.
* After the template replication slot is created, these empty publications are altered
* with actual tables to be replicated.
* More details about the bug can be found in the below mailing link.
* (https://www.postgresql.org/message-id/20191010115752.2d0f27af%40firost).
*
* We follow the 'SHARD_SPLIT_X_PREFIX' naming scheme for creating publications
* related to split operations.
*/
/*
 * CreateShardSplitEmptyPublications creates one empty publication (no FOR
 * TABLE clause) on the source node for every (nodeId, tableOwnerId) entry in
 * 'shardInfoHashMapForPublication'. Per the surrounding commit, the split
 * shards are added to these publications later via ALTER PUBLICATION, after
 * the template replication slot has been created.
 */
void
CreateShardSplitEmptyPublications(MultiConnection *sourceConnection,
HTAB *shardInfoHashMapForPublication)
{
/* iterate over every target-node/table-owner bucket in the map */
HASH_SEQ_STATUS status;
hash_seq_init(&status, shardInfoHashMapForPublication);
NodeShardMappingEntry *entry = NULL;
while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL)
{
uint32 nodeId = entry->key.nodeId;
uint32 tableOwnerId = entry->key.tableOwnerId;
/*
 * Publication name follows the shard-split naming scheme keyed by
 * (nodeId, tableOwnerId); CREATE PUBLICATION with no table list
 * deliberately creates it empty.
 */
StringInfo createEmptyPublicationCommand = makeStringInfo();
appendStringInfo(createEmptyPublicationCommand, "CREATE PUBLICATION %s",
ShardSplitPublicationName(nodeId, tableOwnerId));
ExecuteCriticalRemoteCommand(sourceConnection,
createEmptyPublicationCommand->data);
/* free both the string buffer and the StringInfo header */
pfree(createEmptyPublicationCommand->data);
pfree(createEmptyPublicationCommand);
}
}
/*
* AlterShardSplitPublications alters publications on the source node.
* It adds split shards for logical replication.
* *
* sourceConnection - Connection of source node. * sourceConnection - Connection of source node.
* *
@ -220,7 +183,7 @@ CreateShardSplitEmptyPublications(MultiConnection *sourceConnection,
* ShardIntervals mapped by key. * ShardIntervals mapped by key.
*/ */
void void
AlterShardSplitPublications(MultiConnection *sourceConnection, CreateShardSplitPublications(MultiConnection *sourceConnection,
HTAB *shardInfoHashMapForPublication) HTAB *shardInfoHashMapForPublication)
{ {
HASH_SEQ_STATUS status; HASH_SEQ_STATUS status;
@ -232,7 +195,9 @@ AlterShardSplitPublications(MultiConnection *sourceConnection,
uint32 nodeId = entry->key.nodeId; uint32 nodeId = entry->key.nodeId;
uint32 tableOwnerId = entry->key.tableOwnerId; uint32 tableOwnerId = entry->key.tableOwnerId;
List *shardListForPublication = entry->shardSplitInfoList; List *shardListForPublication = entry->shardSplitInfoList;
AlterShardSplitPublicationForNode(sourceConnection,
/* Create publication on shard list */
CreateShardSplitPublicationForNode(sourceConnection,
shardListForPublication, shardListForPublication,
nodeId, nodeId,
tableOwnerId); tableOwnerId);
@ -241,17 +206,19 @@ AlterShardSplitPublications(MultiConnection *sourceConnection,
/* /*
* AlterShardSplitPublicationForNode adds shards that have to be replicated * CreateShardSplitPublicationForNode creates a publication on source node
* for a given publication. * for given shard list.
* We follow the 'SHARD_SPLIT_X_PREFIX' naming scheme for creating publications
* related to split operations.
*/ */
static void static void
AlterShardSplitPublicationForNode(MultiConnection *connection, List *shardList, CreateShardSplitPublicationForNode(MultiConnection *connection, List *shardList,
uint32_t publicationForTargetNodeId, Oid ownerId) uint32_t publicationForTargetNodeId, Oid ownerId)
{ {
StringInfo alterPublicationCommand = makeStringInfo(); StringInfo createPublicationCommand = makeStringInfo();
bool prefixWithComma = false; bool prefixWithComma = false;
appendStringInfo(alterPublicationCommand, "ALTER PUBLICATION %s ADD TABLE ", appendStringInfo(createPublicationCommand, "CREATE PUBLICATION %s FOR TABLE ",
ShardSplitPublicationName(publicationForTargetNodeId, ownerId)); ShardSplitPublicationName(publicationForTargetNodeId, ownerId));
ShardInterval *shard = NULL; ShardInterval *shard = NULL;
@ -261,16 +228,16 @@ AlterShardSplitPublicationForNode(MultiConnection *connection, List *shardList,
if (prefixWithComma) if (prefixWithComma)
{ {
appendStringInfoString(alterPublicationCommand, ","); appendStringInfoString(createPublicationCommand, ",");
} }
appendStringInfoString(alterPublicationCommand, shardName); appendStringInfoString(createPublicationCommand, shardName);
prefixWithComma = true; prefixWithComma = true;
} }
ExecuteCriticalRemoteCommand(connection, alterPublicationCommand->data); ExecuteCriticalRemoteCommand(connection, createPublicationCommand->data);
pfree(alterPublicationCommand->data); pfree(createPublicationCommand->data);
pfree(alterPublicationCommand); pfree(createPublicationCommand);
} }

View File

@ -87,7 +87,7 @@ extern List * CreateTargetNodeConnectionsForShardSplit(
char *databaseName); char *databaseName);
/* Functions to drop publisher-subscriber resources */ /* Functions to drop publisher-subscriber resources */
extern void CreateShardSplitEmptyPublications(MultiConnection *sourceConnection, extern void CreateShardSplitPublications(MultiConnection *sourceConnection,
HTAB *shardInfoHashMapForPublication); HTAB *shardInfoHashMapForPublication);
extern char * CreateTemplateReplicationSlot(ShardInterval *shardIntervalToSplit, extern char * CreateTemplateReplicationSlot(ShardInterval *shardIntervalToSplit,
MultiConnection * MultiConnection *