diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c
index 39f746ff8..235a51395 100644
--- a/src/backend/distributed/operations/shard_split.c
+++ b/src/backend/distributed/operations/shard_split.c
@@ -79,6 +79,8 @@ static void CreateAndCopySplitShardsForShardGroup(WorkerNode *sourceShardNode,
 												  List *sourceColocatedShardIntervalList,
 												  List *shardGroupSplitIntervalListList,
 												  List *workersForPlacementList);
+static bool CheckIfRelationWithSameNameExists(ShardInterval *shardInterval,
+											  WorkerNode *workerNode);
 static void CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList,
 										   List *workersForPlacementList);
 static void CreateDummyShardsForShardGroup(HTAB *mapOfPlacementToDummyShardList,
@@ -548,6 +550,55 @@ BlockingShardSplit(SplitOperation splitOperation,
 }
 
 
+/* Check if a relation with the given name already exists on the worker node */
+static bool
+CheckIfRelationWithSameNameExists(ShardInterval *shardInterval, WorkerNode *workerNode)
+{
+	char *schemaName = get_namespace_name(
+		get_rel_namespace(shardInterval->relationId));
+	char *shardName = get_rel_name(shardInterval->relationId);
+	AppendShardIdToName(&shardName, shardInterval->shardId);
+
+	StringInfo checkShardExistsQuery = makeStringInfo();
+	appendStringInfo(checkShardExistsQuery,
+					 "SELECT EXISTS (SELECT FROM pg_tables WHERE schemaname = '%s' AND tablename = '%s');",
+					 schemaName,
+					 shardName);
+
+	int connectionFlags = 0;
+	MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlags,
+																 workerNode->workerName,
+																 workerNode->workerPort,
+																 CitusExtensionOwnerName(),
+																 get_database_name(
+																	 MyDatabaseId));
+
+	PGresult *result = NULL;
+	int queryResult = ExecuteOptionalRemoteCommand(connection,
+												   checkShardExistsQuery->data, &result);
+	if (queryResult != RESPONSE_OKAY || !IsResponseOK(result) || PQntuples(result) != 1)
+	{
+		ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
+						errmsg(
+							"Cannot check if relation %s already exists for split on worker %s:%d",
+							ConstructQualifiedShardName(shardInterval),
+							connection->hostname,
+							connection->port)));
+
+		PQclear(result);
+		ForgetResults(connection);
+
+		return false;
+	}
+
+	char *checkExists = PQgetvalue(result, 0, 0);
+	PQclear(result);
+	ForgetResults(connection);
+
+	return strcmp(checkExists, "t") == 0;
+}
+
+
 /* Create ShardGroup split children on a list of corresponding workers. */
 static void
 CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList,
@@ -578,16 +629,35 @@ CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList,
 												splitShardCreationCommandList,
 												shardInterval->shardId);
 
-			/* Log resource for cleanup in case of failure only. */
-			CleanupPolicy policy = CLEANUP_ON_FAILURE;
-			InsertCleanupRecordInSubtransaction(CLEANUP_SHARD_PLACEMENT,
-												ConstructQualifiedShardName(
-													shardInterval),
-												workerPlacementNode->groupId,
-												policy);
+			/* Log resource for cleanup in case of failure only.
+			 * Before logging the record, do a best-effort check for an existing shard with
+			 * the same name: if one exists, shard creation would fail and the cleanup
+			 * record would cause us to drop that pre-existing shard, which we want to avoid.
+			 */
+			bool relationExists = CheckIfRelationWithSameNameExists(shardInterval,
+																	 workerPlacementNode);
 
-			/* Create new split child shard on the specified placement list */
-			CreateObjectOnPlacement(splitShardCreationCommandList, workerPlacementNode);
+			if (relationExists)
+			{
+				ereport(ERROR, (errcode(ERRCODE_DUPLICATE_TABLE),
+								errmsg("Relation %s already exists on worker %s:%d.",
+									   ConstructQualifiedShardName(shardInterval),
+									   workerPlacementNode->workerName,
+									   workerPlacementNode->workerPort)));
+			}
+			else
+			{
+				CleanupPolicy policy = CLEANUP_ON_FAILURE;
+				InsertCleanupRecordInSubtransaction(CLEANUP_SHARD_PLACEMENT,
+													ConstructQualifiedShardName(
+														shardInterval),
+													workerPlacementNode->groupId,
+													policy);
+
+				/* Create new split child shard on the specified placement list */
+				CreateObjectOnPlacement(splitShardCreationCommandList,
+										workerPlacementNode);
+			}
 		}
 	}
 }
@@ -1459,23 +1529,43 @@ CreateDummyShardsForShardGroup(HTAB *mapOfPlacementToDummyShardList,
 													splitShardCreationCommandList,
 													shardInterval->shardId);
 
-			/* Log shard in pg_dist_cleanup. Given dummy shards are transient resources,
-			 * we want to cleanup irrespective of operation success or failure.
+			/* Before we log a cleanup record and create the dummy shard, do a best-effort
+			 * check for an existing shard with the same name. If one exists, shard creation
+			 * would fail and the cleanup record would cause us to drop that pre-existing
+			 * shard, which we want to avoid.
 			 */
-			CleanupPolicy policy = CLEANUP_ALWAYS;
-			InsertCleanupRecordInSubtransaction(CLEANUP_SHARD_PLACEMENT,
-												ConstructQualifiedShardName(
-													shardInterval),
-												workerPlacementNode->groupId,
-												policy);
+			bool relationExists = CheckIfRelationWithSameNameExists(shardInterval,
+																	 workerPlacementNode);
 
-			/* Create dummy source shard on the specified placement list */
-			CreateObjectOnPlacement(splitShardCreationCommandList, workerPlacementNode);
+			if (relationExists)
+			{
+				ereport(ERROR, (errcode(ERRCODE_DUPLICATE_TABLE),
+								errmsg("Relation %s already exists on worker %s:%d.",
+									   ConstructQualifiedShardName(shardInterval),
+									   workerPlacementNode->workerName,
+									   workerPlacementNode->workerPort)));
+			}
+			else
+			{
+				/* Log shard in pg_dist_cleanup. Given dummy shards are transient resources,
+				 * we want to cleanup irrespective of operation success or failure.
+				 */
+				CleanupPolicy policy = CLEANUP_ALWAYS;
+				InsertCleanupRecordInSubtransaction(CLEANUP_SHARD_PLACEMENT,
+													ConstructQualifiedShardName(
+														shardInterval),
+													workerPlacementNode->groupId,
+													policy);
 
-			/* Add dummy source shard entry created for placement node in map */
-			AddDummyShardEntryInMap(mapOfPlacementToDummyShardList,
-									workerPlacementNode->nodeId,
-									shardInterval);
+				/* Create dummy source shard on the specified placement list */
+				CreateObjectOnPlacement(splitShardCreationCommandList,
+										workerPlacementNode);
+
+				/* Add dummy source shard entry created for placement node in map */
+				AddDummyShardEntryInMap(mapOfPlacementToDummyShardList,
+										workerPlacementNode->nodeId,
+										shardInterval);
+			}
 		}
 	}
 
@@ -1504,23 +1594,42 @@ CreateDummyShardsForShardGroup(HTAB *mapOfPlacementToDummyShardList,
 												splitShardCreationCommandList,
 												shardInterval->shardId);
 
-			/* Log shard in pg_dist_cleanup. Given dummy shards are transient resources,
-			 * we want to cleanup irrespective of operation success or failure.
+			/* Before we log a cleanup record and create the dummy shard, do a best-effort
+			 * check for an existing shard with the same name. If one exists, shard creation
+			 * would fail and the cleanup record would cause us to drop that pre-existing
+			 * shard, which we want to avoid.
 			 */
-			CleanupPolicy policy = CLEANUP_ALWAYS;
-			InsertCleanupRecordInSubtransaction(CLEANUP_SHARD_PLACEMENT,
-												ConstructQualifiedShardName(
-													shardInterval),
-												sourceWorkerNode->groupId,
-												policy);
+			bool relationExists = CheckIfRelationWithSameNameExists(shardInterval,
+																	 sourceWorkerNode);
 
-			/* Create dummy split child shard on source worker node */
-			CreateObjectOnPlacement(splitShardCreationCommandList, sourceWorkerNode);
+			if (relationExists)
+			{
+				ereport(ERROR, (errcode(ERRCODE_DUPLICATE_TABLE),
+								errmsg("Relation %s already exists on worker %s:%d.",
+									   ConstructQualifiedShardName(shardInterval),
+									   sourceWorkerNode->workerName,
+									   sourceWorkerNode->workerPort)));
+			}
+			else
+			{
+				/* Log shard in pg_dist_cleanup. Given dummy shards are transient resources,
+				 * we want to cleanup irrespective of operation success or failure.
+				 */
+				CleanupPolicy policy = CLEANUP_ALWAYS;
+				InsertCleanupRecordInSubtransaction(CLEANUP_SHARD_PLACEMENT,
+													ConstructQualifiedShardName(
+														shardInterval),
+													sourceWorkerNode->groupId,
+													policy);
 
-			/* Add dummy split child shard entry created on source node */
-			AddDummyShardEntryInMap(mapOfPlacementToDummyShardList,
-									sourceWorkerNode->nodeId,
-									shardInterval);
+				/* Create dummy split child shard on source worker node */
+				CreateObjectOnPlacement(splitShardCreationCommandList, sourceWorkerNode);
+
+				/* Add dummy split child shard entry created on source node */
+				AddDummyShardEntryInMap(mapOfPlacementToDummyShardList,
+										sourceWorkerNode->nodeId,
+										shardInterval);
+			}
 		}
 	}
 }
diff --git a/src/test/regress/expected/citus_split_shard_by_split_points_failure.out b/src/test/regress/expected/citus_split_shard_by_split_points_failure.out
index 4ea61e03c..aa3faaf84 100644
--- a/src/test/regress/expected/citus_split_shard_by_split_points_failure.out
+++ b/src/test/regress/expected/citus_split_shard_by_split_points_failure.out
@@ -72,8 +72,7 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
     ARRAY['-1073741824'],
     ARRAY[:worker_1_node, :worker_1_node],
     'block_writes');
-ERROR:  relation "sensors_8981002" already exists
-CONTEXT:  while executing command on localhost:xxxxx
+ERROR:  Relation citus_split_failure_test_schema.sensors_8981002 already exists on worker localhost:xxxxx.
 -- BEGIN : Split Shard, which is expected to fail.
 -- BEGIN : Ensure tables were cleaned from worker
 \c - - - :worker_1_port
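Reviewer note (not part of the patch): the behavioral change can be exercised by hand. The psql sketch below is illustrative only; the schema, table, and shard ids are made up, and the real child shard id depends on the cluster, but the shape of the calls follows the regression test whose expected output changes above. Creating a relation on a worker that collides with the name of a would-be split child now makes citus_split_shard_by_split_points() error out up front with the new DUPLICATE_TABLE message, instead of logging a cleanup record that could later drop the pre-existing relation.

    -- on the coordinator: a distributed table to split (hypothetical names)
    CREATE SCHEMA split_collision_demo;
    CREATE TABLE split_collision_demo.sensors (measure_id bigint, payload text);
    SELECT create_distributed_table('split_collision_demo.sensors', 'measure_id');

    -- on a worker: a table whose name collides with the split child shard
    -- the coordinator would create (102042 is a made-up shard id)
    CREATE TABLE split_collision_demo.sensors_102042 (measure_id bigint, payload text);

    -- back on the coordinator: the split now fails before any cleanup record is logged,
    -- e.g. ERROR:  Relation split_collision_demo.sensors_102042 already exists on worker ...
    SELECT pg_catalog.citus_split_shard_by_split_points(
        102040,                                 -- made-up source shard id
        ARRAY['-1073741824'],                   -- split point
        ARRAY[:worker_1_node, :worker_1_node],  -- target nodes, psql vars as in the test
        'block_writes');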