Rebase shard_split changes on main merge

niupre/TestDeferredDropAndCleanup
Nitish Upreti 2022-08-30 17:36:59 -07:00
parent 231b8ac719
commit c8b7817bec
1 changed file with 208 additions and 238 deletions
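
Summary: this rebase replaces the split workflow's in-memory bookkeeping (mapOfShardToPlacementCreatedByWorkflow, the dummy-shard map, and the TryDropSplitShardsOnFailure / DropDummyShards / DropDummyShard paths) with cleanup records written to pg_dist_cleanup: split children are logged with CLEANUP_ON_FAILURE, dummy shards with CLEANUP_ALWAYS, and the parent shards with CLEANUP_DEFERRED_ON_SUCCESS when DeferShardDeleteOnSplit is set. The sketch below condenses that flow using only the calls visible in this diff; the helper name LogSplitChildForCleanup, the SplitWorkflowSketch wrapper, and the elided control flow are illustrative assumptions, not part of the patch.

/*
 * Minimal sketch of the cleanup-record flow this commit adopts
 * (assumes the signatures shown in the diff; LogSplitChildForCleanup and
 * SplitWorkflowSketch are hypothetical helpers used only for illustration).
 */
static void
LogSplitChildForCleanup(ShardInterval *shardInterval, WorkerNode *workerNode)
{
	/* record the placement before creating it, so a failed split can be cleaned up */
	CleanupPolicy policy = CLEANUP_ON_FAILURE;
	InsertCleanupRecordInSubtransaction(CLEANUP_SHARD_PLACEMENT,
										ConstructQualifiedShardName(shardInterval),
										workerNode->groupId,
										policy);
}

static void
SplitWorkflowSketch(void)
{
	/* open a cleanup operation before any remote state is created */
	StartNewOperationNeedingCleanup();

	PG_TRY();
	{
		/* ... log and then create split children, dummy shards, etc. ... */

		/* success: deferred-on-success records (the parent shards) are left for the cleaner */
		CompleteNewOperationNeedingCleanup(true);
	}
	PG_CATCH();
	{
		/* failure: records logged with CLEANUP_ON_FAILURE / CLEANUP_ALWAYS can now be dropped */
		CompleteNewOperationNeedingCleanup(false);
		PG_RE_THROW();
	}
	PG_END_TRY();
}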

@@ -34,6 +34,7 @@
 #include "distributed/multi_partitioning_utils.h"
 #include "distributed/worker_manager.h"
 #include "distributed/worker_transaction.h"
+#include "distributed/shard_cleaner.h"
 #include "distributed/shared_library_init.h"
 #include "distributed/pg_dist_shard.h"
 #include "distributed/metadata_sync.h"
@@ -45,6 +46,9 @@
 #include "distributed/shard_rebalancer.h"
 #include "postmaster/postmaster.h"
 
+/* declarations for dynamic loading */
+bool DeferShardDeleteOnSplit = true;
+
 /*
  * Entry for map that tracks ShardInterval -> Placement Node
  * created by split workflow.
@@ -73,12 +77,13 @@ static void ErrorIfCannotSplitShardExtended(SplitOperation splitOperation,
 											ShardInterval *shardIntervalToSplit,
 											List *shardSplitPointsList,
 											List *nodeIdsForPlacementList);
+static bool CheckIfRelationWithSameNameExists(ShardInterval *shardInterval,
+											  WorkerNode *workerNode);
 static void ErrorIfModificationAndSplitInTheSameTransaction(SplitOperation
 															 splitOperation);
-static void CreateSplitShardsForShardGroup(HTAB *mapOfShardToPlacementCreatedByWorkflow,
-										   List *shardGroupSplitIntervalListList,
+static void CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList,
 										   List *workersForPlacementList);
-static void CreateDummyShardsForShardGroup(HTAB *mapOfDummyShardToPlacement,
+static void CreateDummyShardsForShardGroup(HTAB *mapOfPlacementToDummyShardList,
 										   List *sourceColocatedShardIntervalList,
 										   List *shardGroupSplitIntervalListList,
 										   WorkerNode *sourceWorkerNode,
@@ -87,7 +92,7 @@ static HTAB * CreateWorkerForPlacementSet(List *workersForPlacementList);
 static void CreateAuxiliaryStructuresForShardGroup(List *shardGroupSplitIntervalListList,
 													List *workersForPlacementList,
 													bool includeReplicaIdentity);
-static void CreateReplicaIdentitiesForDummyShards(HTAB *mapOfDummyShardToPlacement);
+static void CreateReplicaIdentitiesForDummyShards(HTAB *mapOfPlacementToDummyShardList);
 static void CreateObjectOnPlacement(List *objectCreationCommandList,
 									WorkerNode *workerNode);
 static List * CreateSplitIntervalsForShardGroup(List *sourceColocatedShardList,
@@ -131,8 +136,6 @@ static void CreatePartitioningHierarchy(List *shardGroupSplitIntervalListList,
 										List *workersForPlacementList);
 static void CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList,
 										List *workersForPlacementList);
-static void TryDropSplitShardsOnFailure(HTAB *mapOfShardToPlacementCreatedByWorkflow);
-static HTAB * CreateEmptyMapForShardsCreatedByWorkflow();
 static Task * CreateTaskForDDLCommandList(List *ddlCommandList, WorkerNode *workerNode);
 static StringInfo CreateSplitShardReplicationSetupUDF(
 	List *sourceColocatedShardIntervalList, List *shardGroupSplitIntervalListList,
@@ -148,10 +151,8 @@ static List * ExecuteSplitShardReplicationSetupUDF(WorkerNode *sourceWorkerNode,
 												   DistributionColumnMap *
 												   distributionColumnOverrides);
 static void ExecuteSplitShardReleaseSharedMemory(WorkerNode *sourceWorkerNode);
-static void AddDummyShardEntryInMap(HTAB *mapOfDummyShards, uint32 targetNodeId,
+static void AddDummyShardEntryInMap(HTAB *mapOfPlacementToDummyShardList, uint32 targetNodeId,
 									ShardInterval *shardInterval);
-static void DropDummyShards(HTAB *mapOfDummyShardToPlacement);
-static void DropDummyShard(MultiConnection *connection, ShardInterval *shardInterval);
 static uint64 GetNextShardIdForSplitChild(void);
 static void AcquireNonblockingSplitLock(Oid relationId);
 static List * GetWorkerNodesFromWorkerIds(List *nodeIdsForPlacementList);
@@ -515,6 +516,9 @@ SplitShard(SplitMode splitMode,
 	/* use the user-specified shard ID as the split workflow ID */
 	uint64 splitWorkflowId = shardIntervalToSplit->shardId;
 
+	/* Start operation to prepare for generating cleanup records */
+	StartNewOperationNeedingCleanup();
+
 	if (splitMode == BLOCKING_SPLIT)
 	{
 		EnsureReferenceTablesExistOnAllNodesExtended(TRANSFER_MODE_BLOCK_WRITES);
@@ -539,70 +543,9 @@ SplitShard(SplitMode splitMode,
 		PlacementMovedUsingLogicalReplicationInTX = true;
 	}
 
+	bool isSuccess = true;
+	CompleteNewOperationNeedingCleanup(isSuccess);
 }
-
-
-/*
- * ShardIntervalHashCode computes the hash code for a Shardinterval using
- * shardId.
- */
-static uint32
-ShardIntervalHashCode(const void *key, Size keySize)
-{
-	const ShardInterval *shardInterval = (const ShardInterval *) key;
-	const uint64 *shardId = &(shardInterval->shardId);
-
-	/* standard hash function outlined in Effective Java, Item 8 */
-	uint32 result = 17;
-	result = 37 * result + tag_hash(shardId, sizeof(uint64));
-
-	return result;
-}
-
-
-/*
- * ShardIntervalHashCompare compares two shard intervals using shard id.
- */
-static int
-ShardIntervalHashCompare(const void *lhsKey, const void *rhsKey, Size keySize)
-{
-	const ShardInterval *intervalLhs = (const ShardInterval *) lhsKey;
-	const ShardInterval *intervalRhs = (const ShardInterval *) rhsKey;
-
-	int shardIdCompare = 0;
-
-	/* first, compare by shard id */
-	if (intervalLhs->shardId < intervalRhs->shardId)
-	{
-		shardIdCompare = -1;
-	}
-	else if (intervalLhs->shardId > intervalRhs->shardId)
-	{
-		shardIdCompare = 1;
-	}
-
-	return shardIdCompare;
-}
-
-
-/* Create an empty map that tracks ShardInterval -> Placement Node as created by workflow */
-static HTAB *
-CreateEmptyMapForShardsCreatedByWorkflow()
-{
-	HASHCTL info = { 0 };
-	info.keysize = sizeof(ShardInterval);
-	info.entrysize = sizeof(ShardCreatedByWorkflowEntry);
-	info.hash = ShardIntervalHashCode;
-	info.match = ShardIntervalHashCompare;
-	info.hcxt = CurrentMemoryContext;
-
-	/* we don't have value field as it's a set */
-	info.entrysize = info.keysize;
-
-	uint32 hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
-	HTAB *splitChildrenCreatedByWorkflow = hash_create("Shard id to Node Placement Map",
-													   32, &info, hashFlags);
-
-	return splitChildrenCreatedByWorkflow;
-}
@@ -635,13 +578,10 @@ BlockingShardSplit(SplitOperation splitOperation,
 	WorkerNode *sourceShardNode =
 		ActiveShardPlacementWorkerNode(firstShard->shardId);
 
-	HTAB *mapOfShardToPlacementCreatedByWorkflow =
-		CreateEmptyMapForShardsCreatedByWorkflow();
-
 	PG_TRY();
 	{
 		/* Physically create split children. */
-		CreateSplitShardsForShardGroup(mapOfShardToPlacementCreatedByWorkflow,
-									   shardGroupSplitIntervalListList,
+		CreateSplitShardsForShardGroup(shardGroupSplitIntervalListList,
 									   workersForPlacementList);
 
 		/* For Blocking split, copy isn't snapshotted */
@@ -690,7 +630,8 @@ BlockingShardSplit(SplitOperation splitOperation,
 		ShutdownAllConnections();
 
 		/* Do a best effort cleanup of shards created on workers in the above block */
-		TryDropSplitShardsOnFailure(mapOfShardToPlacementCreatedByWorkflow);
+		bool isSuccess = false;
+		CompleteNewOperationNeedingCleanup(isSuccess);
 
 		PG_RE_THROW();
 	}
@@ -701,10 +642,58 @@ BlockingShardSplit(SplitOperation splitOperation,
 }
 
 
+/* Check if a relation with given name already exists on the worker node */
+static bool
+CheckIfRelationWithSameNameExists(ShardInterval *shardInterval, WorkerNode *workerNode)
+{
+	char *schemaName = get_namespace_name(
+		get_rel_namespace(shardInterval->relationId));
+	char *shardName = get_rel_name(shardInterval->relationId);
+	AppendShardIdToName(&shardName, shardInterval->shardId);
+
+	StringInfo checkShardExistsQuery = makeStringInfo();
+	appendStringInfo(checkShardExistsQuery,
+					 "SELECT EXISTS (SELECT FROM pg_tables WHERE schemaname = '%s' AND tablename = '%s');",
+					 schemaName,
+					 shardName);
+
+	int connectionFlags = 0;
+	MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlags,
+																 workerNode->workerName,
+																 workerNode->workerPort,
+																 CitusExtensionOwnerName(),
+																 get_database_name(
+																	 MyDatabaseId));
+
+	PGresult *result = NULL;
+	int queryResult = ExecuteOptionalRemoteCommand(connection,
+												   checkShardExistsQuery->data, &result);
+	if (queryResult != RESPONSE_OKAY || !IsResponseOK(result) || PQntuples(result) != 1)
+	{
+		ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
+						errmsg(
+							"Cannot check if relation %s already exists for split on worker %s:%d",
+							ConstructQualifiedShardName(shardInterval),
+							connection->hostname,
+							connection->port)));
+
+		PQclear(result);
+		ForgetResults(connection);
+
+		return false;
+	}
+
+	char *checkExists = PQgetvalue(result, 0, 0);
+	PQclear(result);
+	ForgetResults(connection);
+
+	return strcmp(checkExists, "t") == 0;
+}
+
+
 /* Create ShardGroup split children on a list of corresponding workers. */
 static void
-CreateSplitShardsForShardGroup(HTAB *mapOfShardToPlacementCreatedByWorkflow,
-							   List *shardGroupSplitIntervalListList,
+CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList,
 							   List *workersForPlacementList)
 {
 	/*
@@ -732,16 +721,35 @@ CreateSplitShardsForShardGroup(HTAB *mapOfShardToPlacementCreatedByWorkflow,
 				splitShardCreationCommandList,
 				shardInterval->shardId);
 
-			/* Create new split child shard on the specified placement list */
-			CreateObjectOnPlacement(splitShardCreationCommandList, workerPlacementNode);
+			/* Log resource for cleanup in case of failure only.
+			 * Before we log a record, do a best effort check to see if a shard with same name exists.
+			 * This is because it will cause shard creation to fail and we will end up cleaning the
+			 * old shard. We don't want that.
+			 */
+			bool relationExists = CheckIfRelationWithSameNameExists(shardInterval,
+																	workerPlacementNode);
 
-			ShardCreatedByWorkflowEntry entry;
-			entry.shardIntervalKey = shardInterval;
-			entry.workerNodeValue = workerPlacementNode;
-			bool found = false;
-			hash_search(mapOfShardToPlacementCreatedByWorkflow, &entry, HASH_ENTER,
-						&found);
-			Assert(!found);
+			if (relationExists)
+			{
+				ereport(ERROR, (errcode(ERRCODE_DUPLICATE_TABLE),
+								errmsg("Relation %s already exists on worker %s:%d.",
+									   ConstructQualifiedShardName(shardInterval),
+									   workerPlacementNode->workerName,
+									   workerPlacementNode->workerPort)));
+			}
+			else
+			{
+				CleanupPolicy policy = CLEANUP_ON_FAILURE;
+				InsertCleanupRecordInSubtransaction(CLEANUP_SHARD_PLACEMENT,
+													ConstructQualifiedShardName(
+														shardInterval),
+													workerPlacementNode->groupId,
+													policy);
+
+				/* Create new split child shard on the specified placement list */
+				CreateObjectOnPlacement(splitShardCreationCommandList,
+										workerPlacementNode);
+			}
 		}
 	}
 }
@@ -1336,20 +1344,37 @@ DropShardList(List *shardIntervalList)
 			/* get shard name */
 			char *qualifiedShardName = ConstructQualifiedShardName(shardInterval);
 
-			char storageType = shardInterval->storageType;
-			if (storageType == SHARD_STORAGE_TABLE)
+			if (DeferShardDeleteOnSplit)
 			{
-				appendStringInfo(dropQuery, DROP_REGULAR_TABLE_COMMAND,
-								 qualifiedShardName);
+				/* Log shard in pg_dist_cleanup.
+				 * Parent shards are to be dropped only on success after split workflow is complete,
+				 * so mark the policy as 'CLEANUP_DEFERRED_ON_SUCCESS'.
+				 * We also log cleanup record in the current transaction. If the current transaction rolls back,
+				 * we do not generate a record at all.
+				 */
+				CleanupPolicy policy = CLEANUP_DEFERRED_ON_SUCCESS;
+				InsertCleanupRecordInCurrentTransaction(CLEANUP_SHARD_PLACEMENT,
+														qualifiedShardName,
+														placement->groupId,
+														policy);
 			}
-			else if (storageType == SHARD_STORAGE_FOREIGN)
+			else
 			{
-				appendStringInfo(dropQuery, DROP_FOREIGN_TABLE_COMMAND,
-								 qualifiedShardName);
-			}
+				char storageType = shardInterval->storageType;
+				if (storageType == SHARD_STORAGE_TABLE)
+				{
+					appendStringInfo(dropQuery, DROP_REGULAR_TABLE_COMMAND,
+									 qualifiedShardName);
+				}
+				else if (storageType == SHARD_STORAGE_FOREIGN)
+				{
+					appendStringInfo(dropQuery, DROP_FOREIGN_TABLE_COMMAND,
+									 qualifiedShardName);
+				}
 
 				/* drop old shard */
 				SendCommandToWorker(workerName, workerPort, dropQuery->data);
+			}
 		}
 
 		/* delete shard row */
@@ -1358,50 +1383,6 @@ DropShardList(List *shardIntervalList)
 }
 
 
-/*
- * In case of failure, TryDropSplitShardsOnFailure drops in-progress shard placements from both the
- * coordinator and mx nodes.
- */
-static void
-TryDropSplitShardsOnFailure(HTAB *mapOfShardToPlacementCreatedByWorkflow)
-{
-	HASH_SEQ_STATUS status;
-	ShardCreatedByWorkflowEntry *entry;
-
-	hash_seq_init(&status, mapOfShardToPlacementCreatedByWorkflow);
-	while ((entry = (ShardCreatedByWorkflowEntry *) hash_seq_search(&status)) != 0)
-	{
-		ShardInterval *shardInterval = entry->shardIntervalKey;
-		WorkerNode *workerPlacementNode = entry->workerNodeValue;
-
-		char *qualifiedShardName = ConstructQualifiedShardName(shardInterval);
-		StringInfo dropShardQuery = makeStringInfo();
-
-		/* Caller enforces that foreign tables cannot be split (use DROP_REGULAR_TABLE_COMMAND) */
-		appendStringInfo(dropShardQuery, DROP_REGULAR_TABLE_COMMAND,
-						 qualifiedShardName);
-
-		int connectionFlags = FOR_DDL;
-		connectionFlags |= OUTSIDE_TRANSACTION;
-		MultiConnection *connnection = GetNodeUserDatabaseConnection(
-			connectionFlags,
-			workerPlacementNode->workerName,
-			workerPlacementNode->workerPort,
-			CurrentUserName(),
-			NULL /* databaseName */);
-
-		/*
-		 * Perform a drop in best effort manner.
-		 * The shard may or may not exist and the connection could have died.
-		 */
-		ExecuteOptionalRemoteCommand(
-			connnection,
-			dropShardQuery->data,
-			NULL /* pgResult */);
-	}
-}
-
-
 /*
  * AcquireNonblockingSplitLock does not allow concurrent nonblocking splits, because we share memory and
  * replication slots.
@@ -1488,11 +1469,6 @@ NonBlockingShardSplit(SplitOperation splitOperation,
 		databaseName);
 	ClaimConnectionExclusively(sourceConnection);
 
-	HTAB *mapOfShardToPlacementCreatedByWorkflow =
-		CreateEmptyMapForShardsCreatedByWorkflow();
-
-	HTAB *mapOfDummyShardToPlacement = CreateSimpleHash(NodeAndOwner,
-														GroupedShardSplitInfos);
-
 	MultiConnection *sourceReplicationConnection =
 		GetReplicationConnection(sourceShardToCopyNode->workerName,
 								 sourceShardToCopyNode->workerPort);
@@ -1501,8 +1477,7 @@ NonBlockingShardSplit(SplitOperation splitOperation,
 	PG_TRY();
 	{
 		/* 1) Physically create split children. */
-		CreateSplitShardsForShardGroup(mapOfShardToPlacementCreatedByWorkflow,
-									   shardGroupSplitIntervalListList,
+		CreateSplitShardsForShardGroup(shardGroupSplitIntervalListList,
 									   workersForPlacementList);
 
 		/*
@@ -1510,8 +1485,10 @@ NonBlockingShardSplit(SplitOperation splitOperation,
 		 * Refer to the comment section of 'CreateDummyShardsForShardGroup' for indepth
 		 * information.
 		 */
+		HTAB *mapOfPlacementToDummyShardList = CreateSimpleHash(NodeAndOwner,
+																GroupedShardSplitInfos);
 		CreateDummyShardsForShardGroup(
-			mapOfDummyShardToPlacement,
+			mapOfPlacementToDummyShardList,
 			sourceColocatedShardIntervalList,
 			shardGroupSplitIntervalListList,
 			sourceShardToCopyNode,
@@ -1526,7 +1503,7 @@ NonBlockingShardSplit(SplitOperation splitOperation,
 		 * initial COPY phase, like we do for the replica identities on the
 		 * target shards.
 		 */
-		CreateReplicaIdentitiesForDummyShards(mapOfDummyShardToPlacement);
+		CreateReplicaIdentitiesForDummyShards(mapOfPlacementToDummyShardList);
 
 		/* 4) Create Publications. */
 		CreatePublications(sourceConnection, publicationInfoHash);
@@ -1680,11 +1657,6 @@ NonBlockingShardSplit(SplitOperation splitOperation,
 		CreateForeignKeyConstraints(shardGroupSplitIntervalListList,
 									workersForPlacementList);
 
-		/*
-		 * 24) Drop dummy shards.
-		 */
-		DropDummyShards(mapOfDummyShardToPlacement);
-
 		/*
 		 * 24) Release shared memory allocated by worker_split_shard_replication_setup udf
 		 * at source node.
@@ -1705,14 +1677,14 @@ NonBlockingShardSplit(SplitOperation splitOperation,
 		/* end ongoing transactions to enable us to clean up */
 		ShutdownAllConnections();
 
-		/* Do a best effort cleanup of shards created on workers in the above block */
-		TryDropSplitShardsOnFailure(mapOfShardToPlacementCreatedByWorkflow);
-
+		/* Do a best effort cleanup of shards created on workers in the above block
+		 * TODO(niupre): We don't need to do this once shard cleaner can clean replication
+		 * artifacts.
+		 */
 		DropAllLogicalReplicationLeftovers(SHARD_SPLIT);
 
-		DropDummyShards(mapOfDummyShardToPlacement);
-
-		ExecuteSplitShardReleaseSharedMemory(sourceShardToCopyNode);
+		bool isSuccess = false;
+		CompleteNewOperationNeedingCleanup(isSuccess);
 
 		PG_RE_THROW();
 	}
@@ -1743,7 +1715,7 @@ NonBlockingShardSplit(SplitOperation splitOperation,
  * Note 2 : Given there is an overlap of source and destination in Worker0, Shard1_1 and Shard2_1 need not be created.
  */
 static void
-CreateDummyShardsForShardGroup(HTAB *mapOfDummyShardToPlacement,
+CreateDummyShardsForShardGroup(HTAB *mapOfPlacementToDummyShardList,
 							   List *sourceColocatedShardIntervalList,
 							   List *shardGroupSplitIntervalListList,
 							   WorkerNode *sourceWorkerNode,
@@ -1780,13 +1752,43 @@ CreateDummyShardsForShardGroup(HTAB *mapOfDummyShardToPlacement,
 				splitShardCreationCommandList,
 				shardInterval->shardId);
 
-			/* Create dummy source shard on the specified placement list */
-			CreateObjectOnPlacement(splitShardCreationCommandList, workerPlacementNode);
+			/* Log resource for cleanup in case of failure only.
+			 * Before we log a record, do a best effort check to see if a shard with same name exists.
+			 * This is because it will cause shard creation to fail and we will end up cleaning the
+			 * old shard. We don't want that.
+			 */
+			bool relationExists = CheckIfRelationWithSameNameExists(shardInterval,
+																	workerPlacementNode);
 
-			/* Add dummy source shard entry created for placement node in map */
-			AddDummyShardEntryInMap(mapOfDummyShardToPlacement,
-									workerPlacementNode->nodeId,
-									shardInterval);
+			if (relationExists)
+			{
+				ereport(ERROR, (errcode(ERRCODE_DUPLICATE_TABLE),
+								errmsg("Relation %s already exists on worker %s:%d.",
+									   ConstructQualifiedShardName(shardInterval),
+									   workerPlacementNode->workerName,
+									   workerPlacementNode->workerPort)));
+			}
+			else
+			{
+				/* Log shard in pg_dist_cleanup. Given dummy shards are transient resources,
+				 * we want to cleanup irrespective of operation success or failure.
+				 */
+				CleanupPolicy policy = CLEANUP_ALWAYS;
+				InsertCleanupRecordInSubtransaction(CLEANUP_SHARD_PLACEMENT,
+													ConstructQualifiedShardName(
+														shardInterval),
+													workerPlacementNode->groupId,
+													policy);
+
+				/* Create dummy source shard on the specified placement list */
+				CreateObjectOnPlacement(splitShardCreationCommandList,
+										workerPlacementNode);
+
+				/* Add dummy source shard entry created for placement node in map */
+				AddDummyShardEntryInMap(mapOfPlacementToDummyShardList,
+										workerPlacementNode->nodeId,
+										shardInterval);
+			}
 		}
 	}
@@ -1815,12 +1817,42 @@ CreateDummyShardsForShardGroup(HTAB *mapOfDummyShardToPlacement,
 				splitShardCreationCommandList,
 				shardInterval->shardId);
 
-			/* Create dummy split child shard on source worker node */
-			CreateObjectOnPlacement(splitShardCreationCommandList, sourceWorkerNode);
+			/* Log resource for cleanup in case of failure only.
+			 * Before we log a record, do a best effort check to see if a shard with same name exists.
+			 * This is because it will cause shard creation to fail and we will end up cleaning the
+			 * old shard. We don't want that.
+			 */
+			bool relationExists = CheckIfRelationWithSameNameExists(shardInterval,
+																	sourceWorkerNode);
 
-			/* Add dummy split child shard entry created on source node */
-			AddDummyShardEntryInMap(mapOfDummyShardToPlacement, sourceWorkerNode->nodeId,
-									shardInterval);
+			if (relationExists)
+			{
+				ereport(ERROR, (errcode(ERRCODE_DUPLICATE_TABLE),
+								errmsg("Relation %s already exists on worker %s:%d.",
+									   ConstructQualifiedShardName(shardInterval),
+									   sourceWorkerNode->workerName,
+									   sourceWorkerNode->workerPort)));
+			}
+			else
+			{
+				/* Log shard in pg_dist_cleanup. Given dummy shards are transient resources,
+				 * we want to cleanup irrespective of operation success or failure.
+				 */
+				CleanupPolicy policy = CLEANUP_ALWAYS;
+				InsertCleanupRecordInSubtransaction(CLEANUP_SHARD_PLACEMENT,
+													ConstructQualifiedShardName(
+														shardInterval),
+													sourceWorkerNode->groupId,
+													policy);
+
+				/* Create dummy split child shard on source worker node */
+				CreateObjectOnPlacement(splitShardCreationCommandList, sourceWorkerNode);
+
+				/* Add dummy split child shard entry created on source node */
+				AddDummyShardEntryInMap(mapOfPlacementToDummyShardList,
+										sourceWorkerNode->nodeId,
+										shardInterval);
+			}
 		}
 	}
 }
@@ -2076,7 +2108,7 @@ ParseReplicationSlotInfoFromResult(PGresult *result)
  * of logical replication. We cautiously delete only the dummy shards added in the DummyShardHashMap.
  */
 static void
-AddDummyShardEntryInMap(HTAB *mapOfDummyShardToPlacement, uint32 targetNodeId,
+AddDummyShardEntryInMap(HTAB *mapOfPlacementToDummyShardList, uint32 targetNodeId,
 						ShardInterval *shardInterval)
 {
 	NodeAndOwner key;
@@ -2085,7 +2117,7 @@ AddDummyShardEntryInMap(HTAB *mapOfDummyShardToPlacement, uint32 targetNodeId,
 	bool found = false;
 	GroupedDummyShards *nodeMappingEntry =
-		(GroupedDummyShards *) hash_search(mapOfDummyShardToPlacement, &key,
+		(GroupedDummyShards *) hash_search(mapOfPlacementToDummyShardList, &key,
 										   HASH_ENTER,
 										   &found);
 	if (!found)
@@ -2098,68 +2130,6 @@ AddDummyShardEntryInMap(HTAB *mapOfDummyShardToPlacement, uint32 targetNodeId,
 }
 
 
-/*
- * DropDummyShards traverses the dummy shard map and drops shard at given node.
- * It fails if the shard cannot be dropped.
- */
-static void
-DropDummyShards(HTAB *mapOfDummyShardToPlacement)
-{
-	HASH_SEQ_STATUS status;
-	hash_seq_init(&status, mapOfDummyShardToPlacement);
-
-	GroupedDummyShards *entry = NULL;
-	while ((entry = (GroupedDummyShards *) hash_seq_search(&status)) != NULL)
-	{
-		uint32 nodeId = entry->key.nodeId;
-		WorkerNode *shardToBeDroppedNode = FindNodeWithNodeId(nodeId,
-															  false /* missingOk */);
-
-		int connectionFlags = FOR_DDL;
-		connectionFlags |= OUTSIDE_TRANSACTION;
-		MultiConnection *connection = GetNodeUserDatabaseConnection(
-			connectionFlags,
-			shardToBeDroppedNode->workerName,
-			shardToBeDroppedNode->workerPort,
-			CurrentUserName(),
-			NULL /* databaseName */);
-
-		List *dummyShardIntervalList = entry->shardIntervals;
-		ShardInterval *shardInterval = NULL;
-		foreach_ptr(shardInterval, dummyShardIntervalList)
-		{
-			DropDummyShard(connection, shardInterval);
-		}
-
-		CloseConnection(connection);
-	}
-}
-
-
-/*
- * DropDummyShard drops a given shard on the node connection.
- * It fails if the shard cannot be dropped.
- */
-static void
-DropDummyShard(MultiConnection *connection, ShardInterval *shardInterval)
-{
-	char *qualifiedShardName = ConstructQualifiedShardName(shardInterval);
-	StringInfo dropShardQuery = makeStringInfo();
-
-	/* Caller enforces that foreign tables cannot be split (use DROP_REGULAR_TABLE_COMMAND) */
-	appendStringInfo(dropShardQuery, DROP_REGULAR_TABLE_COMMAND,
-					 qualifiedShardName);
-
-	/*
-	 * Since the dummy shard is expected to be present on the given node,
-	 * fail if it cannot be dropped during cleanup.
-	 */
-	ExecuteCriticalRemoteCommand(
-		connection,
-		dropShardQuery->data);
-}
-
-
 /*
  * CreateReplicaIdentitiesForDummyShards creates replica indentities for split
  * dummy shards.