From 1c16060bd6f147a82c9787394654e5b384dbbcf0 Mon Sep 17 00:00:00 2001
From: Nitish Upreti
Date: Mon, 18 Jul 2022 00:47:42 -0700
Subject: [PATCH] Only clean shards created by workflow

---
 .../distributed/operations/shard_split.c      | 183 +++++++++++++-----
 ...us_split_shard_by_split_points_failure.out |  17 +-
 ...us_split_shard_by_split_points_failure.sql |   6 +-
 3 files changed, 145 insertions(+), 61 deletions(-)

diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c
index f5c015e62..93231797d 100644
--- a/src/backend/distributed/operations/shard_split.c
+++ b/src/backend/distributed/operations/shard_split.c
@@ -10,6 +10,7 @@
  */
 
 #include "postgres.h"
+#include "common/hashfn.h"
 #include "nodes/pg_list.h"
 #include "utils/array.h"
 #include "distributed/utils/array_type.h"
@@ -35,16 +36,29 @@
 #include "distributed/multi_physical_planner.h"
 #include "distributed/deparse_shard_query.h"
 
+/*
+ * Entry for map that tracks ShardInterval -> Placement Node
+ * created by split workflow.
+ */
+typedef struct ShardCreatedByWorkflowEntry
+{
+	ShardInterval *shardIntervalKey;
+	WorkerNode *workerNodeValue;
+} ShardCreatedByWorkflowEntry;
+
 /* Function declarations */
 static void ErrorIfCannotSplitShardExtended(SplitOperation splitOperation,
 											ShardInterval *shardIntervalToSplit,
 											List *shardSplitPointsList,
 											List *nodeIdsForPlacementList);
-static void CreateAndCopySplitShardsForShardGroup(WorkerNode *sourceShardNode,
-												   List *sourceColocatedShardIntervalList,
-												   List *shardGroupSplitIntervalListList,
-												   List *workersForPlacementList);
-static void CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList,
+static void CreateAndCopySplitShardsForShardGroup(
+	HTAB *mapOfShardToPlacementCreatedByWorkflow,
+	WorkerNode *sourceShardNode,
+	List *sourceColocatedShardIntervalList,
+	List *shardGroupSplitIntervalListList,
+	List *workersForPlacementList);
+static void CreateSplitShardsForShardGroup(HTAB *mapOfShardToPlacementCreatedByWorkflow,
+											List *shardGroupSplitIntervalListList,
 											List *workersForPlacementList);
 static void CreateAuxiliaryStructuresForShardGroup(List *shardGroupSplitIntervalListList,
 													List *workersForPlacementList);
@@ -70,8 +84,8 @@ static void InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListLi
 											 List *workersForPlacementList);
 static void CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList,
 										List *workersForPlacementList);
-static void TryDropSplitShardsOnFailure(List *shardGroupSplitIntervalListList,
-										List *workersForPlacementList);
+static void TryDropSplitShardsOnFailure(HTAB *mapOfShardToPlacementCreatedByWorkflow);
+static HTAB * CreateEmptyMapForShardsCreatedByWorkflow();
 static Task * CreateTaskForDDLCommandList(List *ddlCommandList, WorkerNode *workerNode);
 
 /* Customize error message strings based on operation type */
@@ -399,6 +413,70 @@ SplitShard(SplitMode splitMode,
 }
 
 
+/*
+ * ShardIntervalHashCode computes the hash code for a shard from the
+ * placement's shard id.
+ */
+static uint32
+ShardIntervalHashCode(const void *key, Size keySize)
+{
+	const ShardInterval *shardInterval = (const ShardInterval *) key;
+	const uint64 *shardId = &(shardInterval->shardId);
+
+	/* standard hash function outlined in Effective Java, Item 8 */
+	uint32 result = 17;
+	result = 37 * result + tag_hash(shardId, sizeof(uint64));
+
+	return result;
+}
+
+
+/*
+ * ShardIntervalHashCompare compares two shard intervals using shard id.
+ */
+static int
+ShardIntervalHashCompare(const void *lhsKey, const void *rhsKey, Size keySize)
+{
+	const ShardInterval *intervalLhs = (const ShardInterval *) lhsKey;
+	const ShardInterval *intervalRhs = (const ShardInterval *) rhsKey;
+
+	int shardIdCompare = 0;
+
+	/* first, compare by shard id */
+	if (intervalLhs->shardId < intervalRhs->shardId)
+	{
+		shardIdCompare = -1;
+	}
+	else if (intervalLhs->shardId > intervalRhs->shardId)
+	{
+		shardIdCompare = 1;
+	}
+
+	return shardIdCompare;
+}
+
+
+/* Create an empty map that tracks ShardInterval -> Placement Node as created by workflow */
+static HTAB *
+CreateEmptyMapForShardsCreatedByWorkflow()
+{
+	HASHCTL info = { 0 };
+	info.keysize = sizeof(ShardInterval);
+	info.entrysize = sizeof(ShardCreatedByWorkflowEntry);
+	info.hash = ShardIntervalHashCode;
+	info.match = ShardIntervalHashCompare;
+	info.hcxt = CurrentMemoryContext;
+
+	/* we don't have value field as it's a set */
+	info.entrysize = info.keysize;
+	uint32 hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
+
+	HTAB *splitChildrenCreatedByWorkflow = hash_create("Shard id to Node Placement Map",
+													   32, &info, hashFlags);
+	return splitChildrenCreatedByWorkflow;
+}
+
+
 /*
  * SplitShard API to split a given shard (or shard group) in blocking fashion
  * based on specified split points to a set of destination nodes.
@@ -431,6 +509,9 @@ BlockingShardSplit(SplitOperation splitOperation,
 	WorkerNode *sourceShardToCopyNode = FindNodeWithNodeId(sourceShardPlacement->nodeId,
 														   false /* missingOk */);
+
+	HTAB *mapOfShardToPlacementCreatedByWorkflow =
+		CreateEmptyMapForShardsCreatedByWorkflow();
 
 	PG_TRY();
 	{
 		/*
@@ -439,6 +520,7 @@ BlockingShardSplit(SplitOperation splitOperation,
 		 * Foreign key constraints are created after Metadata changes (see CreateForeignKeyConstraints).
 		 */
 		CreateAndCopySplitShardsForShardGroup(
+			mapOfShardToPlacementCreatedByWorkflow,
 			sourceShardToCopyNode,
 			sourceColocatedShardIntervalList,
 			shardGroupSplitIntervalListList,
@@ -466,8 +548,7 @@ BlockingShardSplit(SplitOperation splitOperation,
 	PG_CATCH();
 	{
 		/* Do a best effort cleanup of shards created on workers in the above block */
-		TryDropSplitShardsOnFailure(shardGroupSplitIntervalListList,
-									workersForPlacementList);
+		TryDropSplitShardsOnFailure(mapOfShardToPlacementCreatedByWorkflow);
 
 		PG_RE_THROW();
 	}
@@ -479,7 +560,8 @@ BlockingShardSplit(SplitOperation splitOperation,
 
 /* Create ShardGroup split children on a list of corresponding workers. */
 static void
-CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList,
+CreateSplitShardsForShardGroup(HTAB *mapOfShardToPlacementCreatedByWorkflow,
+							   List *shardGroupSplitIntervalListList,
 							   List *workersForPlacementList)
 {
 	/*
@@ -509,6 +591,14 @@ CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList,
 			/* Create new split child shard on the specified placement list */
 			CreateObjectOnPlacement(splitShardCreationCommandList,
 									workerPlacementNode);
+
+			ShardCreatedByWorkflowEntry entry;
+			entry.shardIntervalKey = shardInterval;
+			entry.workerNodeValue = workerPlacementNode;
+			bool found = false;
+			hash_search(mapOfShardToPlacementCreatedByWorkflow, &entry, HASH_ENTER,
+						&found);
+			Assert(!found);
 		}
 	}
 }
@@ -591,12 +681,14 @@ CreateAuxiliaryStructuresForShardGroup(List *shardGroupSplitIntervalListList,
  * on a list of corresponding workers.
  */
 static void
-CreateAndCopySplitShardsForShardGroup(WorkerNode *sourceShardNode,
+CreateAndCopySplitShardsForShardGroup(HTAB *mapOfShardToPlacementCreatedByWorkflow,
+									  WorkerNode *sourceShardNode,
 									  List *sourceColocatedShardIntervalList,
 									  List *shardGroupSplitIntervalListList,
 									  List *workersForPlacementList)
 {
-	CreateSplitShardsForShardGroup(shardGroupSplitIntervalListList,
+	CreateSplitShardsForShardGroup(mapOfShardToPlacementCreatedByWorkflow,
+								   shardGroupSplitIntervalListList,
 								   workersForPlacementList);
 
 	DoSplitCopy(sourceShardNode, sourceColocatedShardIntervalList,
@@ -986,49 +1078,40 @@ DropShardList(List *shardIntervalList)
  * coordinator and mx nodes.
  */
 static void
-TryDropSplitShardsOnFailure(List *shardGroupSplitIntervalListList,
-							List *workersForPlacementList)
+TryDropSplitShardsOnFailure(HTAB *mapOfShardToPlacementCreatedByWorkflow)
 {
-	List *shardIntervalList = NIL;
+	HASH_SEQ_STATUS status;
+	ShardCreatedByWorkflowEntry *entry;
 
-	/*
-	 * Iterate over all the shards in the shard group.
-	 */
-	foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList)
+	hash_seq_init(&status, mapOfShardToPlacementCreatedByWorkflow);
+	while ((entry = (ShardCreatedByWorkflowEntry *) hash_seq_search(&status)) != 0)
 	{
-		ShardInterval *shardInterval = NULL;
-		WorkerNode *workerPlacementNode = NULL;
+		ShardInterval *shardInterval = entry->shardIntervalKey;
+		WorkerNode *workerPlacementNode = entry->workerNodeValue;
+
+		char *qualifiedShardName = ConstructQualifiedShardName(shardInterval);
+		StringInfo dropShardQuery = makeStringInfo();
+
+		/* Caller enforces that foreign tables cannot be split (use DROP_REGULAR_TABLE_COMMAND) */
+		appendStringInfo(dropShardQuery, DROP_REGULAR_TABLE_COMMAND,
+						 qualifiedShardName);
+
+		int connectionFlags = FOR_DDL;
+		connectionFlags |= OUTSIDE_TRANSACTION;
+		MultiConnection *connnection = GetNodeUserDatabaseConnection(
+			connectionFlags,
+			workerPlacementNode->workerName,
+			workerPlacementNode->workerPort,
+			CurrentUserName(),
+			NULL /* databaseName */);
 
 		/*
-		 * Iterate on split shards list for a given shard and perform drop.
+		 * Perform a drop in best effort manner.
+		 * The shard may or may not exist and the connection could have died.
 		 */
-		forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode,
-					workersForPlacementList)
-		{
-			char *qualifiedShardName = ConstructQualifiedShardName(shardInterval);
-			StringInfo dropShardQuery = makeStringInfo();
-
-			/* Caller enforces that foreign tables cannot be split (use DROP_REGULAR_TABLE_COMMAND) */
-			appendStringInfo(dropShardQuery, DROP_REGULAR_TABLE_COMMAND,
-							 qualifiedShardName);
-
-			int connectionFlags = FOR_DDL;
-			connectionFlags |= OUTSIDE_TRANSACTION;
-			MultiConnection *connnection = GetNodeUserDatabaseConnection(
-				connectionFlags,
-				workerPlacementNode->workerName,
-				workerPlacementNode->workerPort,
-				CurrentUserName(),
-				NULL /* databaseName */);
-
-			/*
-			 * Perform a drop in best effort manner.
-			 * The shard may or may not exist and the connection could have died.
-			 */
-			ExecuteOptionalRemoteCommand(
-				connnection,
-				dropShardQuery->data,
-				NULL /* pgResult */);
-		}
+		ExecuteOptionalRemoteCommand(
+			connnection,
+			dropShardQuery->data,
+			NULL /* pgResult */);
 	}
 }
diff --git a/src/test/regress/expected/citus_split_shard_by_split_points_failure.out b/src/test/regress/expected/citus_split_shard_by_split_points_failure.out
index e6c3e8e66..4ea61e03c 100644
--- a/src/test/regress/expected/citus_split_shard_by_split_points_failure.out
+++ b/src/test/regress/expected/citus_split_shard_by_split_points_failure.out
@@ -26,9 +26,8 @@ SELECT create_distributed_table('sensors_colocated', 'measureid', colocate_with:
 -- BEGIN : Switch to worker and create split shards already so workflow fails.
 \c - - - :worker_1_port
 SET search_path TO "citus_split_failure_test_schema";
-CREATE TABLE sensors_8981001(
-    measureid integer,
-    eventdatetime date);
+-- Don't create sensors_8981001, workflow will create and clean it.
+-- Create rest of the shards so that the workflow fails, but will not clean them.
 CREATE TABLE sensors_8981002(
     measureid integer,
     eventdatetime date);
@@ -53,14 +52,13 @@ SELECT tbl.relname
 ---------------------------------------------------------------------
  sensors
  sensors_890000
- sensors_8981001
  sensors_8981002
  sensors_colocated
  sensors_colocated_890001
  sensors_colocated_8981003
  sensors_colocated_8981004
  sensors_nodelete
-(9 rows)
+(8 rows)
 
 -- END : Switch to worker and create split shards already so workflow fails.
 -- BEGIN : Set node id variables
@@ -74,7 +72,7 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
     ARRAY['-1073741824'],
     ARRAY[:worker_1_node, :worker_1_node],
     'block_writes');
-ERROR:  relation "sensors_8981001" already exists
+ERROR:  relation "sensors_8981002" already exists
 CONTEXT:  while executing command on localhost:xxxxx
 -- BEGIN : Split Shard, which is expected to fail.
 -- BEGIN : Ensure tables were cleaned from worker
@@ -85,14 +83,17 @@ SELECT tbl.relname
     FROM pg_catalog.pg_class tbl
     WHERE tbl.relname like 'sensors%'
     ORDER BY 1;
-         relname
+          relname
 ---------------------------------------------------------------------
  sensors
  sensors_890000
+ sensors_8981002
  sensors_colocated
  sensors_colocated_890001
+ sensors_colocated_8981003
+ sensors_colocated_8981004
  sensors_nodelete
-(5 rows)
+(8 rows)
 
 -- END : Ensure tables were cleaned from worker
 --BEGIN : Cleanup
diff --git a/src/test/regress/sql/citus_split_shard_by_split_points_failure.sql b/src/test/regress/sql/citus_split_shard_by_split_points_failure.sql
index 12d79b74b..0eb5e8c04 100644
--- a/src/test/regress/sql/citus_split_shard_by_split_points_failure.sql
+++ b/src/test/regress/sql/citus_split_shard_by_split_points_failure.sql
@@ -21,9 +21,9 @@ SELECT create_distributed_table('sensors_colocated', 'measureid', colocate_with:
 -- BEGIN : Switch to worker and create split shards already so workflow fails.
 \c - - - :worker_1_port
 SET search_path TO "citus_split_failure_test_schema";
-CREATE TABLE sensors_8981001(
-    measureid integer,
-    eventdatetime date);
+
+-- Don't create sensors_8981001, workflow will create and clean it.
+-- Create rest of the shards so that the workflow fails, but will not clean them.
 CREATE TABLE sensors_8981002(
     measureid integer,