Add comments for the functions

users/saawasek/non_blocking_split_integrated
Sameer Awasekar 2022-07-23 22:58:00 +05:30
parent 5a5505db55
commit b4f8226f96
3 changed files with 172 additions and 161 deletions

View File

@@ -63,10 +63,6 @@ static void CreateAndCopySplitShardsForShardGroup(
 static void CreateSplitShardsForShardGroup(HTAB *mapOfShardToPlacementCreatedByWorkflow,
                                            List *shardGroupSplitIntervalListList,
                                            List *workersForPlacementList);
-static void CreateSplitShardsForShardGroupTwo(WorkerNode *sourceShardNode,
-                                              List *sourceColocatedShardIntervalList,
-                                              List *shardGroupSplitIntervalListList,
-                                              List *workersForPlacementList);
 static void CreateDummyShardsForShardGroup(List *sourceColocatedShardIntervalList,
                                            List *shardGroupSplitIntervalListList,
                                            WorkerNode *sourceWorkerNode,
@@ -113,7 +109,7 @@ static StringInfo CreateSplitShardReplicationSetupUDF(
                                                       List *destinationWorkerNodesList);
 static void AddDummyShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInterval);
 static void DropDummyShards(void);
-void TryDropShard(MultiConnection *connection, ShardInterval *shardInterval);
+static void TryDroppingShard(MultiConnection *connection, ShardInterval *shardInterval);
 char * CreateTemplateReplicationSlotAndReturnSnapshot(ShardInterval *shardInterval,
                                                       WorkerNode *sourceWorkerNode,
                                                       MultiConnection **
@@ -123,6 +119,7 @@ static List * ExecuteSplitShardReplicationSetupUDF(WorkerNode *sourceWorkerNode,
                                                    List *sourceColocatedShardIntervalList,
                                                    List *shardGroupSplitIntervalListList,
                                                    List *destinationWorkerNodesList);
+static List * ParseReplicationSlotInfoFromResult(PGresult *result);
 
 /* Customize error message strings based on operation type */
 static const char *const SplitOperationName[] =
@@ -1400,39 +1397,28 @@ NonBlockingShardSplit(SplitOperation splitOperation,
 }
 
-/* Create ShardGroup split children on a list of corresponding workers. */
-static void
-CreateSplitShardsForShardGroupTwo(WorkerNode *sourceShardNode,
-                                  List *sourceColocatedShardIntervalList,
-                                  List *shardGroupSplitIntervalListList,
-                                  List *workersForPlacementList)
-{
-    /* Iterate on shard interval list for shard group */
-    List *shardIntervalList = NULL;
-    foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList)
-    {
-        /* Iterate on split shard interval list and corresponding placement worker */
-        ShardInterval *shardInterval = NULL;
-        WorkerNode *workerPlacementNode = NULL;
-        forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode,
-                    workersForPlacementList)
-        {
-            /* Populate list of commands necessary to create shard interval on destination */
-            List *splitShardCreationCommandList = GetPreLoadTableCreationCommands(
-                shardInterval->relationId,
-                false, /* includeSequenceDefaults */
-                NULL /* auto add columnar options for cstore tables */);
-
-            splitShardCreationCommandList = WorkerApplyShardDDLCommandList(
-                splitShardCreationCommandList,
-                shardInterval->shardId);
-
-            /* Create new split child shard on the specified worker node */
-            CreateObjectOnPlacement(splitShardCreationCommandList, workerPlacementNode);
-        }
-    }
-}
-
+/*
+ * Given we are using PG logical replication infrastructure, there are some constraints
+ * that need to be met around matching table names in source and target nodes.
+ * The restrictions in the context of split are:
+ * Constraint 1: Dummy source shard(s) from the shard group must exist on all destination nodes.
+ * Constraint 2: Dummy target shards from the shard group must exist on the source node.
+ * Example:
+ * Shard1[1-200] is co-located with Shard2[1-200] in Worker0.
+ * We are splitting 2-way to Worker0 (same node) and Worker1 (different node).
+ *
+ * Non-dummy shards (expected from the split):
+ * In Worker0 --> Shard1_1 and Shard2_1.
+ * In Worker1 --> Shard1_2 and Shard2_2.
+ *
+ * Dummy shards:
+ * From constraint 1, we need to create: dummy Shard1 and Shard2 in Worker0, and dummy Shard1 and Shard2 in Worker1.
+ * Note 1: Given there is an overlap of source and destination in Worker0, Shard1 and Shard2 need not be created there.
+ * Be very careful here: dropping Shard1 and Shard2 with customer data to create dummy Shard1 and Shard2 on Worker0 would be catastrophic.
+ *
+ * From constraint 2, we need to create: dummy Shard1_1, Shard2_1, Shard1_2 and Shard2_2 in Worker0.
+ * Note 2: Given there is an overlap of source and destination in Worker0, Shard1_1 and Shard2_1 need not be created.
+ */
 static void
 CreateDummyShardsForShardGroup(List *sourceColocatedShardIntervalList,
                                List *shardGroupSplitIntervalListList,
@@ -1546,89 +1532,13 @@ CreateWorkerForPlacementSet(List *workersForPlacementList)
 }
 
-static void
-AddDummyShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInterval)
-{
-    NodeShardMappingKey key;
-    key.nodeId = targetNodeId;
-    key.tableOwnerId = TableOwnerOid(shardInterval->relationId);
-
-    bool found = false;
-    NodeShardMappingEntry *nodeMappingEntry =
-        (NodeShardMappingEntry *) hash_search(DummyShardInfoHashMap, &key, HASH_ENTER,
-                                              &found);
-    if (!found)
-    {
-        nodeMappingEntry->shardSplitInfoList = NIL;
-    }
-
-    nodeMappingEntry->shardSplitInfoList =
-        lappend(nodeMappingEntry->shardSplitInfoList, (ShardInterval *) shardInterval);
-}
-
-static void
-DropDummyShards()
-{
-    /* Return if no dummy shards are created */
-    if (DummyShardInfoHashMap == NULL)
-    {
-        return;
-    }
-
-    HASH_SEQ_STATUS status;
-    hash_seq_init(&status, DummyShardInfoHashMap);
-
-    NodeShardMappingEntry *entry = NULL;
-    while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL)
-    {
-        uint32 nodeId = entry->key.nodeId;
-        WorkerNode *shardToBeDroppedNode = FindNodeWithNodeId(nodeId,
-                                                              false /* missingOk */);
-
-        int connectionFlags = FOR_DDL;
-        connectionFlags |= OUTSIDE_TRANSACTION;
-        connectionFlags |= FORCE_NEW_CONNECTION;
-        MultiConnection *connection = GetNodeUserDatabaseConnection(
-            connectionFlags,
-            shardToBeDroppedNode->workerName,
-            shardToBeDroppedNode->workerPort,
-            CurrentUserName(),
-            NULL /* databaseName */);
-
-        List *dummyShardIntervalList = entry->shardSplitInfoList;
-        ShardInterval *shardInterval = NULL;
-        foreach_ptr(shardInterval, dummyShardIntervalList)
-        {
-            TryDropShard(connection, shardInterval);
-        }
-
-        CloseConnection(connection);
-    }
-}
-
-void
-TryDropShard(MultiConnection *connection, ShardInterval *shardInterval)
-{
-    char *qualifiedShardName = ConstructQualifiedShardName(shardInterval);
-    StringInfo dropShardQuery = makeStringInfo();
-
-    /* Caller enforces that foreign tables cannot be split (use DROP_REGULAR_TABLE_COMMAND) */
-    appendStringInfo(dropShardQuery, DROP_REGULAR_TABLE_COMMAND,
-                     qualifiedShardName);
-
-    /*
-     * Perform a drop in best effort manner.
-     * The shard may or may not exist and the connection could have died.
-     */
-    ExecuteOptionalRemoteCommand(
-        connection,
-        dropShardQuery->data,
-        NULL /* pgResult */);
-}
-
+/*
+ * CreateTemplateReplicationSlotAndReturnSnapshot creates a replication slot
+ * and returns its snapshot. This slot acts as a 'Template' for creating
+ * replication slot copies used for logical replication.
+ *
+ * The snapshot remains valid for the lifetime of the session that creates it.
+ */
 char *
 CreateTemplateReplicationSlotAndReturnSnapshot(ShardInterval *shardInterval,
                                                WorkerNode *sourceWorkerNode,
@@ -1646,17 +1556,25 @@ CreateTemplateReplicationSlotAndReturnSnapshot(ShardInterval *shardInterval,
                                                           CitusExtensionOwnerName(),
                                                           get_database_name(
                                                               MyDatabaseId));
 
     ClaimConnectionExclusively(sourceConnection);
 
+    /*
+     * Try to drop any leftover template replication slot from a previous operation
+     * and create a new one.
+     */
     char *snapShotName = DropExistingIfAnyAndCreateTemplateReplicationSlot(shardInterval,
                                                                            sourceConnection);
     *templateSlotConnection = sourceConnection;
 
     return snapShotName;
 }
 
+
+/*
+ * ExecuteSplitShardReplicationSetupUDF executes the
+ * 'worker_split_shard_replication_setup' UDF on the source shard node
+ * and returns a list of ReplicationSlotInfo.
+ */
 static List *
 ExecuteSplitShardReplicationSetupUDF(WorkerNode *sourceWorkerNode,
                                      List *sourceColocatedShardIntervalList,
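The 'template slot' comment above relies on a stock PostgreSQL facility: a logical replication slot created over a replication connection can export a snapshot, and that snapshot stays usable by other sessions only while the creating session lives. Below is a minimal libpq sketch of that mechanism alone; the connection string, slot name, and the pgoutput plugin are illustrative assumptions, not what the Citus helper actually uses.

```c
/* Sketch: create a logical slot over a replication connection and export its snapshot. */
#include <stdio.h>
#include <stdlib.h>
#include <libpq-fe.h>

int
main(void)
{
    /* "replication=database" opens a walsender connection that accepts replication commands */
    PGconn *conn = PQconnectdb("host=worker0 dbname=postgres replication=database");
    if (PQstatus(conn) != CONNECTION_OK)
    {
        fprintf(stderr, "connection failed: %s", PQerrorMessage(conn));
        return EXIT_FAILURE;
    }

    /* Create a template slot and ask the server to export its snapshot */
    PGresult *result = PQexec(
        conn, "CREATE_REPLICATION_SLOT \"template_slot\" LOGICAL \"pgoutput\" EXPORT_SNAPSHOT;");
    if (PQresultStatus(result) != PGRES_TUPLES_OK)
    {
        fprintf(stderr, "slot creation failed: %s", PQerrorMessage(conn));
        PQclear(result);
        PQfinish(conn);
        return EXIT_FAILURE;
    }

    /* Column 2 of the reply holds the exported snapshot name, e.g. 00000003-00000002-1 */
    printf("exported snapshot: %s\n", PQgetvalue(result, 0, 2));

    /*
     * Other sessions can now run SET TRANSACTION SNAPSHOT '<snapshot name>' inside a
     * REPEATABLE READ transaction, but only while this connection stays open.
     */
    PQclear(result);
    PQfinish(conn);
    return EXIT_SUCCESS;
}
```

That session-scoped validity is presumably why the source connection is claimed exclusively in the hunk above and handed back to the caller via templateSlotConnection.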
@@ -1668,6 +1586,7 @@ ExecuteSplitShardReplicationSetupUDF(WorkerNode *sourceWorkerNode,
                                                             shardGroupSplitIntervalListList,
                                                             destinationWorkerNodesList);
 
+    /* Force a new connection to execute the UDF */
     int connectionFlags = FORCE_NEW_CONNECTION;
     MultiConnection *sourceConnection = GetNodeUserDatabaseConnection(connectionFlags,
                                                                       sourceWorkerNode->
@@ -1784,3 +1703,132 @@ CreateSplitShardReplicationSetupUDF(List *sourceColocatedShardIntervalList,
 
     return splitShardReplicationUDF;
 }
+
+
+/*
+ * ParseReplicationSlotInfoFromResult parses the custom datatype 'replication_slot_info'.
+ * 'replication_slot_info' is a tuple with the following format:
+ * <targetNodeId, tableOwnerName, replicationSlotName>
+ */
+static List *
+ParseReplicationSlotInfoFromResult(PGresult *result)
+{
+    int64 rowCount = PQntuples(result);
+    int64 colCount = PQnfields(result);
+
+    List *replicationSlotInfoList = NIL;
+    for (int64 rowIndex = 0; rowIndex < rowCount; rowIndex++)
+    {
+        ReplicationSlotInfo *replicationSlotInfo = (ReplicationSlotInfo *) palloc0(
+            sizeof(ReplicationSlotInfo));
+
+        char *targeNodeIdString = PQgetvalue(result, rowIndex, 0 /* nodeId column */);
+        replicationSlotInfo->targetNodeId = strtoul(targeNodeIdString, NULL, 10);
+
+        /* We use pstrdup to copy the data into the current memory context */
+        replicationSlotInfo->tableOwnerName = pstrdup(PQgetvalue(result, rowIndex,
+                                                                 1 /* table owner name column */));
+
+        /* Replication slot name */
+        replicationSlotInfo->slotName = pstrdup(PQgetvalue(result, rowIndex,
+                                                           2 /* slot name column */));
+
+        replicationSlotInfoList = lappend(replicationSlotInfoList, replicationSlotInfo);
+    }
+
+    return replicationSlotInfoList;
+}
+
+
+/*
+ * AddDummyShardEntryInMap adds a shard entry to the hash map that keeps track
+ * of the dummy shards that are created. These shards are cleaned up after the split completes.
+ *
+ * This is a cautious measure to keep track of dummy shards created for the constraints
+ * of logical replication. We cautiously delete only the dummy shards added to DummyShardInfoHashMap.
+ */
+static void
+AddDummyShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInterval)
+{
+    NodeShardMappingKey key;
+    key.nodeId = targetNodeId;
+    key.tableOwnerId = TableOwnerOid(shardInterval->relationId);
+
+    bool found = false;
+    NodeShardMappingEntry *nodeMappingEntry =
+        (NodeShardMappingEntry *) hash_search(DummyShardInfoHashMap, &key, HASH_ENTER,
+                                              &found);
+    if (!found)
+    {
+        nodeMappingEntry->shardSplitInfoList = NIL;
+    }
+
+    nodeMappingEntry->shardSplitInfoList =
+        lappend(nodeMappingEntry->shardSplitInfoList, (ShardInterval *) shardInterval);
+}
+
+
+static void
+DropDummyShards()
+{
+    /* Return if no dummy shards were created */
+    if (DummyShardInfoHashMap == NULL)
+    {
+        return;
+    }
+
+    HASH_SEQ_STATUS status;
+    hash_seq_init(&status, DummyShardInfoHashMap);
+
+    NodeShardMappingEntry *entry = NULL;
+    while ((entry = (NodeShardMappingEntry *) hash_seq_search(&status)) != NULL)
+    {
+        uint32 nodeId = entry->key.nodeId;
+        WorkerNode *shardToBeDroppedNode = FindNodeWithNodeId(nodeId,
+                                                              false /* missingOk */);
+
+        int connectionFlags = FOR_DDL;
+        connectionFlags |= OUTSIDE_TRANSACTION;
+        connectionFlags |= FORCE_NEW_CONNECTION;
+        MultiConnection *connection = GetNodeUserDatabaseConnection(
+            connectionFlags,
+            shardToBeDroppedNode->workerName,
+            shardToBeDroppedNode->workerPort,
+            CurrentUserName(),
+            NULL /* databaseName */);
+
+        List *dummyShardIntervalList = entry->shardSplitInfoList;
+        ShardInterval *shardInterval = NULL;
+        foreach_ptr(shardInterval, dummyShardIntervalList)
+        {
+            TryDroppingShard(connection, shardInterval);
+        }
+
+        CloseConnection(connection);
+    }
+}
+
+
+/*
+ * TryDroppingShard drops a given shard on the source node connection.
+ */
+static void
+TryDroppingShard(MultiConnection *connection, ShardInterval *shardInterval)
+{
+    char *qualifiedShardName = ConstructQualifiedShardName(shardInterval);
+    StringInfo dropShardQuery = makeStringInfo();
+
+    /* Caller enforces that foreign tables cannot be split (use DROP_REGULAR_TABLE_COMMAND) */
+    appendStringInfo(dropShardQuery, DROP_REGULAR_TABLE_COMMAND,
+                     qualifiedShardName);
+
+    /*
+     * Perform the drop in a best-effort manner.
+     * The shard may or may not exist and the connection could have died.
+     */
+    ExecuteOptionalRemoteCommand(
+        connection,
+        dropShardQuery->data,
+        NULL /* pgResult */);
+}

View File

@@ -517,42 +517,6 @@ CreateReplicationSlots(MultiConnection *sourceNodeConnection, char *templateSlot
 }
 
-/*
- * ParseReplicationSlotInfoFromResult parses custom datatype 'replication_slot_info'.
- * 'replication_slot_info' is a tuple with below format:
- * <targetNodeId, tableOwnerName, replicationSlotName>
- */
-List *
-ParseReplicationSlotInfoFromResult(PGresult *result)
-{
-    int64 rowCount = PQntuples(result);
-    int64 colCount = PQnfields(result);
-
-    List *replicationSlotInfoList = NIL;
-    for (int64 rowIndex = 0; rowIndex < rowCount; rowIndex++)
-    {
-        ReplicationSlotInfo *replicationSlotInfo = (ReplicationSlotInfo *) palloc0(
-            sizeof(ReplicationSlotInfo));
-
-        char *targeNodeIdString = PQgetvalue(result, rowIndex, 0 /* nodeId column*/);
-        replicationSlotInfo->targetNodeId = strtoul(targeNodeIdString, NULL, 10);
-
-        /* We're using the pstrdup to copy the data into the current memory context */
-        replicationSlotInfo->tableOwnerName = pstrdup(PQgetvalue(result, rowIndex,
-                                                                 1 /* table owner name column */));
-
-        /* Replication slot name */
-        replicationSlotInfo->slotName = pstrdup(PQgetvalue(result, rowIndex,
-                                                           2 /* slot name column */));
-
-        replicationSlotInfoList = lappend(replicationSlotInfoList, replicationSlotInfo);
-    }
-
-    return replicationSlotInfoList;
-}
-
 /*
  * DropAllShardSplitLeftOvers drops shard split subscriptions, publications, roles
  * and replication slots. These might have been left there after

View File

@@ -61,7 +61,6 @@ extern int NodeShardMappingHashCompare(const void *left, const void *right, Size
 extern HTAB * SetupHashMapForShardInfo(void);
 
 /* Functions for subscriber metadata management */
-extern List * ParseReplicationSlotInfoFromResult(PGresult *result);
 extern List * PopulateShardSplitSubscriptionsMetadataList(HTAB *shardSplitInfoHashMap,
                                                           List *replicationSlotInfoList);
 extern HTAB * CreateShardSplitInfoMapForPublication(
@@ -73,7 +72,7 @@ extern HTAB * CreateShardSplitInfoMapForPublication(
 extern void CreateShardSplitPublications(MultiConnection *sourceConnection,
                                          HTAB *shardInfoHashMapForPublication);
 extern void CreateShardSplitSubscriptions(List *targetNodeConnectionList,
-                                          List *shardSplitPubSubMetadataList,
+                                          List *shardSplitSubscriberMetadataList,
                                           WorkerNode *sourceWorkerNode, char *superUser,
                                           char *databaseName);
 extern void CreateReplicationSlots(MultiConnection *sourceNodeConnection,