users/saawasek/non_blocking_split_integrated
Sameer Awasekar 2022-07-24 15:16:44 +05:30
parent 9073564900
commit f6c11bf09c
2 changed files with 16 additions and 10 deletions

View File

@ -107,19 +107,20 @@ static Task * CreateTaskForDDLCommandList(List *ddlCommandList, WorkerNode *work
static StringInfo CreateSplitShardReplicationSetupUDF( static StringInfo CreateSplitShardReplicationSetupUDF(
List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList, List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList,
List *destinationWorkerNodesList); List *destinationWorkerNodesList);
static void AddDummyShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInterval); static char * CreateTemplateReplicationSlotAndReturnSnapshot(ShardInterval *shardInterval,
static void DropDummyShards(void); WorkerNode *sourceWorkerNode,
static void TryDroppingShard(MultiConnection *connection, ShardInterval *shardInterval); MultiConnection **
char * CreateTemplateReplicationSlotAndReturnSnapshot(ShardInterval *shardInterval, templateSlotConnection);
WorkerNode *sourceWorkerNode, static List * ParseReplicationSlotInfoFromResult(PGresult *result);
MultiConnection **
templateSlotConnection);
static List * ExecuteSplitShardReplicationSetupUDF(WorkerNode *sourceWorkerNode, static List * ExecuteSplitShardReplicationSetupUDF(WorkerNode *sourceWorkerNode,
List *sourceColocatedShardIntervalList, List *sourceColocatedShardIntervalList,
List *shardGroupSplitIntervalListList, List *shardGroupSplitIntervalListList,
List *destinationWorkerNodesList); List *destinationWorkerNodesList);
static List * ParseReplicationSlotInfoFromResult(PGresult *result); static void AddDummyShardEntryInMap(uint32 targetNodeId, ShardInterval *shardInterval);
static void DropDummyShards(void);
static void TryDroppingShard(MultiConnection *connection, ShardInterval *shardInterval);
/* Customize error message strings based on operation type */ /* Customize error message strings based on operation type */
static const char *const SplitOperationName[] = static const char *const SplitOperationName[] =
@ -760,6 +761,10 @@ DoSplitCopy(WorkerNode *sourceShardNode, List *sourceColocatedShardIntervalList,
splitShardIntervalList, splitShardIntervalList,
destinationWorkerNodesList); destinationWorkerNodesList);
/*
* TODO(saawasek):1)Potentially refactor query list into a different method.
* 2) Assign Distributed Txn(confirm)?
*/
List *ddlCommandList = NIL; List *ddlCommandList = NIL;
StringInfo beginTransaction = makeStringInfo(); StringInfo beginTransaction = makeStringInfo();
appendStringInfo(beginTransaction, appendStringInfo(beginTransaction,
@ -1419,7 +1424,8 @@ NonBlockingShardSplit(SplitOperation splitOperation,
TryDropSplitShardsOnFailure(mapOfShardToPlacementCreatedByWorkflow); TryDropSplitShardsOnFailure(mapOfShardToPlacementCreatedByWorkflow);
/*TODO(saawasek): Add checks to open new connection if sourceConnection is not valid anymore.*/ /*TODO(saawasek): Add checks to open new connection if sourceConnection is not valid anymore.*/
DropAllShardSplitLeftOvers(sourceConnection, shardSplitHashMapForPublication); DropAllShardSplitLeftOvers(sourceShardToCopyNode,
shardSplitHashMapForPublication);
DropDummyShards(); DropDummyShards();
@ -1746,7 +1752,6 @@ static List *
ParseReplicationSlotInfoFromResult(PGresult *result) ParseReplicationSlotInfoFromResult(PGresult *result)
{ {
int64 rowCount = PQntuples(result); int64 rowCount = PQntuples(result);
int64 colCount = PQnfields(result);
List *replicationSlotInfoList = NIL; List *replicationSlotInfoList = NIL;
for (int64 rowIndex = 0; rowIndex < rowCount; rowIndex++) for (int64 rowIndex = 0; rowIndex < rowCount; rowIndex++)

View File

@ -560,6 +560,7 @@ DropAllShardSplitLeftOvers(WorkerNode *sourceNode, HTAB *shardSplitHashMapForPub
ClaimConnectionExclusively(cleanupConnection); ClaimConnectionExclusively(cleanupConnection);
DropAllShardSplitSubscriptions(cleanupConnection); DropAllShardSplitSubscriptions(cleanupConnection);
DropAllShardSplitUsers(cleanupConnection); DropAllShardSplitUsers(cleanupConnection);
/* Close connection after cleanup */ /* Close connection after cleanup */