Only clean up shards created by the workflow

pull/6029/head
Nitish Upreti 2022-07-18 00:47:42 -07:00
parent b1405d5eaf
commit 1c16060bd6
3 changed files with 145 additions and 61 deletions

View File

@ -10,6 +10,7 @@
*/
#include "postgres.h"
#include "common/hashfn.h"
#include "nodes/pg_list.h"
#include "utils/array.h"
#include "distributed/utils/array_type.h"
@ -35,16 +36,29 @@
#include "distributed/multi_physical_planner.h"
#include "distributed/deparse_shard_query.h"
/*
 * Entry for map that tracks ShardInterval -> Placement Node
 * created by split workflow.
 *
 * shardIntervalKey  - split child shard recorded by this entry.
 * workerNodeValue   - worker node where that shard was created; read back by
 *                     TryDropSplitShardsOnFailure so that failure cleanup
 *                     drops only shards this workflow actually created.
 *
 * NOTE(review): the map is created with keysize = sizeof(ShardInterval),
 * but this struct's first member is a ShardInterval pointer and whole
 * entries are passed to hash_search as keys — confirm the intended key
 * layout matches what the hash/compare functions read.
 */
typedef struct ShardCreatedByWorkflowEntry
{
ShardInterval *shardIntervalKey;
WorkerNode *workerNodeValue;
} ShardCreatedByWorkflowEntry;
/* Function declarations */
static void ErrorIfCannotSplitShardExtended(SplitOperation splitOperation,
ShardInterval *shardIntervalToSplit,
List *shardSplitPointsList,
List *nodeIdsForPlacementList);
static void CreateAndCopySplitShardsForShardGroup(WorkerNode *sourceShardNode,
List *sourceColocatedShardIntervalList,
List *shardGroupSplitIntervalListList,
List *workersForPlacementList);
static void CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList,
static void CreateAndCopySplitShardsForShardGroup(
HTAB *mapOfShardToPlacementCreatedByWorkflow,
WorkerNode *sourceShardNode,
List *sourceColocatedShardIntervalList,
List *shardGroupSplitIntervalListList,
List *workersForPlacementList);
static void CreateSplitShardsForShardGroup(HTAB *mapOfShardToPlacementCreatedByWorkflow,
List *shardGroupSplitIntervalListList,
List *workersForPlacementList);
static void CreateAuxiliaryStructuresForShardGroup(List *shardGroupSplitIntervalListList,
List *workersForPlacementList);
@ -70,8 +84,8 @@ static void InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListLi
List *workersForPlacementList);
static void CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList,
List *workersForPlacementList);
static void TryDropSplitShardsOnFailure(List *shardGroupSplitIntervalListList,
List *workersForPlacementList);
static void TryDropSplitShardsOnFailure(HTAB *mapOfShardToPlacementCreatedByWorkflow);
static HTAB * CreateEmptyMapForShardsCreatedByWorkflow();
static Task * CreateTaskForDDLCommandList(List *ddlCommandList, WorkerNode *workerNode);
/* Customize error message strings based on operation type */
@ -399,6 +413,70 @@ SplitShard(SplitMode splitMode,
}
/*
 * ShardIntervalHashCode is the dynahash hash function for the
 * shards-created-by-workflow map; it derives the hash from the shard
 * interval's shard id alone.
 *
 * NOTE(review): callers insert whole ShardCreatedByWorkflowEntry structs as
 * keys — confirm the bytes arriving here really start with a ShardInterval.
 */
static uint32
ShardIntervalHashCode(const void *key, Size keySize)
{
	const ShardInterval *interval = (const ShardInterval *) key;

	/* multiply-and-add hashing scheme from Effective Java, Item 8 */
	uint32 hashValue = 17;
	hashValue = (hashValue * 37) + tag_hash(&(interval->shardId), sizeof(uint64));

	return hashValue;
}
/*
 * ShardIntervalHashCompare is the dynahash match function for the
 * shards-created-by-workflow map; it orders two shard interval keys by
 * shard id, returning -1, 0, or 1 in the style of memcmp.
 */
static int
ShardIntervalHashCompare(const void *lhsKey, const void *rhsKey, Size keySize)
{
	uint64 lhsShardId = ((const ShardInterval *) lhsKey)->shardId;
	uint64 rhsShardId = ((const ShardInterval *) rhsKey)->shardId;

	if (lhsShardId < rhsShardId)
	{
		return -1;
	}
	else if (lhsShardId > rhsShardId)
	{
		return 1;
	}

	return 0;
}
/* Create an empty map that tracks ShardInterval -> Placement Node as created by workflow */
static HTAB *
CreateEmptyMapForShardsCreatedByWorkflow()
{
HASHCTL info = { 0 };
info.keysize = sizeof(ShardInterval);
info.entrysize = sizeof(ShardCreatedByWorkflowEntry);
info.hash = ShardIntervalHashCode;
info.match = ShardIntervalHashCompare;
info.hcxt = CurrentMemoryContext;
/* we don't have value field as it's a set */
info.entrysize = info.keysize;
uint32 hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
HTAB *splitChildrenCreatedByWorkflow = hash_create("Shard id to Node Placement Map",
32, &info, hashFlags);
return splitChildrenCreatedByWorkflow;
}
/*
* SplitShard API to split a given shard (or shard group) in blocking fashion
* based on specified split points to a set of destination nodes.
@ -431,6 +509,9 @@ BlockingShardSplit(SplitOperation splitOperation,
WorkerNode *sourceShardToCopyNode = FindNodeWithNodeId(sourceShardPlacement->nodeId,
false /* missingOk */);
HTAB *mapOfShardToPlacementCreatedByWorkflow =
CreateEmptyMapForShardsCreatedByWorkflow();
PG_TRY();
{
/*
@ -439,6 +520,7 @@ BlockingShardSplit(SplitOperation splitOperation,
* Foreign key constraints are created after Metadata changes (see CreateForeignKeyConstraints).
*/
CreateAndCopySplitShardsForShardGroup(
mapOfShardToPlacementCreatedByWorkflow,
sourceShardToCopyNode,
sourceColocatedShardIntervalList,
shardGroupSplitIntervalListList,
@ -466,8 +548,7 @@ BlockingShardSplit(SplitOperation splitOperation,
PG_CATCH();
{
/* Do a best effort cleanup of shards created on workers in the above block */
TryDropSplitShardsOnFailure(shardGroupSplitIntervalListList,
workersForPlacementList);
TryDropSplitShardsOnFailure(mapOfShardToPlacementCreatedByWorkflow);
PG_RE_THROW();
}
@ -479,7 +560,8 @@ BlockingShardSplit(SplitOperation splitOperation,
/* Create ShardGroup split children on a list of corresponding workers. */
static void
CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList,
CreateSplitShardsForShardGroup(HTAB *mapOfShardToPlacementCreatedByWorkflow,
List *shardGroupSplitIntervalListList,
List *workersForPlacementList)
{
/*
@ -509,6 +591,14 @@ CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList,
/* Create new split child shard on the specified placement list */
CreateObjectOnPlacement(splitShardCreationCommandList, workerPlacementNode);
ShardCreatedByWorkflowEntry entry;
entry.shardIntervalKey = shardInterval;
entry.workerNodeValue = workerPlacementNode;
bool found = false;
hash_search(mapOfShardToPlacementCreatedByWorkflow, &entry, HASH_ENTER,
&found);
Assert(!found);
}
}
}
@ -591,12 +681,14 @@ CreateAuxiliaryStructuresForShardGroup(List *shardGroupSplitIntervalListList,
* on a list of corresponding workers.
*/
static void
CreateAndCopySplitShardsForShardGroup(WorkerNode *sourceShardNode,
CreateAndCopySplitShardsForShardGroup(HTAB *mapOfShardToPlacementCreatedByWorkflow,
WorkerNode *sourceShardNode,
List *sourceColocatedShardIntervalList,
List *shardGroupSplitIntervalListList,
List *workersForPlacementList)
{
CreateSplitShardsForShardGroup(shardGroupSplitIntervalListList,
CreateSplitShardsForShardGroup(mapOfShardToPlacementCreatedByWorkflow,
shardGroupSplitIntervalListList,
workersForPlacementList);
DoSplitCopy(sourceShardNode, sourceColocatedShardIntervalList,
@ -986,49 +1078,40 @@ DropShardList(List *shardIntervalList)
* coordinator and mx nodes.
*/
static void
TryDropSplitShardsOnFailure(List *shardGroupSplitIntervalListList,
List *workersForPlacementList)
TryDropSplitShardsOnFailure(HTAB *mapOfShardToPlacementCreatedByWorkflow)
{
List *shardIntervalList = NIL;
HASH_SEQ_STATUS status;
ShardCreatedByWorkflowEntry *entry;
/*
* Iterate over all the shards in the shard group.
*/
foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList)
hash_seq_init(&status, mapOfShardToPlacementCreatedByWorkflow);
while ((entry = (ShardCreatedByWorkflowEntry *) hash_seq_search(&status)) != 0)
{
ShardInterval *shardInterval = NULL;
WorkerNode *workerPlacementNode = NULL;
ShardInterval *shardInterval = entry->shardIntervalKey;
WorkerNode *workerPlacementNode = entry->workerNodeValue;
char *qualifiedShardName = ConstructQualifiedShardName(shardInterval);
StringInfo dropShardQuery = makeStringInfo();
/* Caller enforces that foreign tables cannot be split (use DROP_REGULAR_TABLE_COMMAND) */
appendStringInfo(dropShardQuery, DROP_REGULAR_TABLE_COMMAND,
qualifiedShardName);
int connectionFlags = FOR_DDL;
connectionFlags |= OUTSIDE_TRANSACTION;
MultiConnection *connnection = GetNodeUserDatabaseConnection(
connectionFlags,
workerPlacementNode->workerName,
workerPlacementNode->workerPort,
CurrentUserName(),
NULL /* databaseName */);
/*
* Iterate on split shards list for a given shard and perform drop.
* Perform a drop in best effort manner.
* The shard may or may not exist and the connection could have died.
*/
forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode,
workersForPlacementList)
{
char *qualifiedShardName = ConstructQualifiedShardName(shardInterval);
StringInfo dropShardQuery = makeStringInfo();
/* Caller enforces that foreign tables cannot be split (use DROP_REGULAR_TABLE_COMMAND) */
appendStringInfo(dropShardQuery, DROP_REGULAR_TABLE_COMMAND,
qualifiedShardName);
int connectionFlags = FOR_DDL;
connectionFlags |= OUTSIDE_TRANSACTION;
MultiConnection *connnection = GetNodeUserDatabaseConnection(
connectionFlags,
workerPlacementNode->workerName,
workerPlacementNode->workerPort,
CurrentUserName(),
NULL /* databaseName */);
/*
* Perform a drop in best effort manner.
* The shard may or may not exist and the connection could have died.
*/
ExecuteOptionalRemoteCommand(
connnection,
dropShardQuery->data,
NULL /* pgResult */);
}
ExecuteOptionalRemoteCommand(
connnection,
dropShardQuery->data,
NULL /* pgResult */);
}
}

View File

@ -26,9 +26,8 @@ SELECT create_distributed_table('sensors_colocated', 'measureid', colocate_with:
-- BEGIN : Switch to worker and create split shards already so workflow fails.
\c - - - :worker_1_port
SET search_path TO "citus_split_failure_test_schema";
CREATE TABLE sensors_8981001(
measureid integer,
eventdatetime date);
-- Don't create sensors_8981001, workflow will create and clean it.
-- Create rest of the shards so that the workflow fails, but will not clean them.
CREATE TABLE sensors_8981002(
measureid integer,
eventdatetime date);
@ -53,14 +52,13 @@ SELECT tbl.relname
---------------------------------------------------------------------
sensors
sensors_890000
sensors_8981001
sensors_8981002
sensors_colocated
sensors_colocated_890001
sensors_colocated_8981003
sensors_colocated_8981004
sensors_nodelete
(9 rows)
(8 rows)
-- END : Switch to worker and create split shards already so workflow fails.
-- BEGIN : Set node id variables
@ -74,7 +72,7 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
ARRAY['-1073741824'],
ARRAY[:worker_1_node, :worker_1_node],
'block_writes');
ERROR: relation "sensors_8981001" already exists
ERROR: relation "sensors_8981002" already exists
CONTEXT: while executing command on localhost:xxxxx
-- BEGIN : Split Shard, which is expected to fail.
-- BEGIN : Ensure tables were cleaned from worker
@ -85,14 +83,17 @@ SELECT tbl.relname
FROM pg_catalog.pg_class tbl
WHERE tbl.relname like 'sensors%'
ORDER BY 1;
relname
relname
---------------------------------------------------------------------
sensors
sensors_890000
sensors_8981002
sensors_colocated
sensors_colocated_890001
sensors_colocated_8981003
sensors_colocated_8981004
sensors_nodelete
(5 rows)
(8 rows)
-- END : Ensure tables were cleaned from worker
--BEGIN : Cleanup

View File

@ -21,9 +21,9 @@ SELECT create_distributed_table('sensors_colocated', 'measureid', colocate_with:
-- BEGIN : Switch to worker and create split shards already so workflow fails.
\c - - - :worker_1_port
SET search_path TO "citus_split_failure_test_schema";
CREATE TABLE sensors_8981001(
measureid integer,
eventdatetime date);
-- Don't create sensors_8981001, workflow will create and clean it.
-- Create rest of the shards so that the workflow fails, but will not clean them.
CREATE TABLE sensors_8981002(
measureid integer,