diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c
index bc4ad208b..f7116d565 100644
--- a/src/backend/distributed/operations/shard_split.c
+++ b/src/backend/distributed/operations/shard_split.c
@@ -501,6 +501,8 @@ SplitShard(SplitMode splitMode,
 	List *workersForPlacementList = GetWorkerNodesFromWorkerIds(
 		nodeIdsForPlacementList);
 
+	ErrorIfNotAllNodesHaveReferenceTableReplicas(workersForPlacementList);
+
 	List *sourceColocatedShardIntervalList = NIL;
 	if (colocatedShardIntervalList == NIL)
 	{
@@ -517,7 +519,6 @@ SplitShard(SplitMode splitMode,
 
 	if (splitMode == BLOCKING_SPLIT)
 	{
-		EnsureReferenceTablesExistOnAllNodesExtended(TRANSFER_MODE_BLOCK_WRITES);
 		BlockingShardSplit(
 			splitOperation,
 			splitWorkflowId,
diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c
index 23e608a18..f4b2f4be3 100644
--- a/src/backend/distributed/utils/reference_table_utils.c
+++ b/src/backend/distributed/utils/reference_table_utils.c
@@ -50,6 +50,7 @@ static void ReplicateReferenceTableShardToNode(ShardInterval *shardInterval,
 											   int nodePort);
 static bool AnyRelationsModifiedInTransaction(List *relationIdList);
 static List * ReplicatedMetadataSyncedDistributedTableList(void);
+static bool NodeHasAllReferenceTableReplicas(WorkerNode *workerNode);
 
 /* exports for SQL callable functions */
 PG_FUNCTION_INFO_V1(upgrade_to_reference_table);
@@ -688,3 +689,81 @@ ReplicateAllReferenceTablesToNode(WorkerNode *workerNode)
 		}
 	}
 }
+
+
+/*
+ * ErrorIfNotAllNodesHaveReferenceTableReplicas throws an error when one of the
+ * nodes in the list does not have reference table replicas.
+ */
+void
+ErrorIfNotAllNodesHaveReferenceTableReplicas(List *workerNodeList)
+{
+	WorkerNode *workerNode = NULL;
+
+	foreach_ptr(workerNode, workerNodeList)
+	{
+		if (!NodeHasAllReferenceTableReplicas(workerNode))
+		{
+			ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+							errmsg("reference tables have not been replicated to "
+								   "node %s:%d yet",
+								   workerNode->workerName,
+								   workerNode->workerPort),
+							errdetail("Reference tables are lazily replicated after "
+									  "adding a node, but must exist before shards can "
+									  "be created on that node."),
+							errhint("Run SELECT replicate_reference_tables(); to "
+									"ensure reference tables exist on all nodes.")));
+		}
+	}
+}
+
+
+/*
+ * NodeHasAllReferenceTableReplicas returns whether the given worker node has
+ * reference table replicas. If there are no reference tables, it returns true.
+ *
+ * This function does not do any locking, so the situation could change right
+ * after it returns; however, a node can only ever go from not having replicas
+ * to having them, so only a "false" answer can become stale.
+ *
+ * In the case where the function returns true because no reference tables
+ * exist at all, a reference table could be created immediately after. However,
+ * the creation logic guarantees that it will be created on all nodes, so our
+ * answer was still correct.
+ */
+static bool
+NodeHasAllReferenceTableReplicas(WorkerNode *workerNode)
+{
+	List *referenceTableIdList = CitusTableTypeIdList(REFERENCE_TABLE);
+
+	if (list_length(referenceTableIdList) == 0)
+	{
+		/* no reference tables exist */
+		return true;
+	}
+
+	Oid referenceTableId = linitial_oid(referenceTableIdList);
+	List *shardIntervalList = LoadShardIntervalList(referenceTableId);
+	if (list_length(shardIntervalList) != 1)
+	{
+		/* check for corrupt metadata */
+		ereport(ERROR, (errmsg("reference table \"%s\" can only have 1 shard",
+							   get_rel_name(referenceTableId))));
+	}
+
+	ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList);
+	List *shardPlacementList = ActiveShardPlacementList(shardInterval->shardId);
+
+	ShardPlacement *placement = NULL;
+	foreach_ptr(placement, shardPlacementList)
+	{
+		if (placement->groupId == workerNode->groupId)
+		{
+			/* our worker has a reference table placement */
+			return true;
+		}
+	}
+
+	return false;
+}
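A minimal sketch of the operator-facing workflow the new guard enforces (not part of the patch; the port, shard id, node id, and split point below are hypothetical placeholders). Previously a blocking split replicated reference tables eagerly via EnsureReferenceTablesExistOnAllNodesExtended(); with this change, any split targeting a node that still lacks reference table replicas fails up front and the operator replicates them explicitly before retrying:

-- a freshly added worker has no reference table replicas yet
SELECT citus_add_node('localhost', 9702);                 -- hypothetical port

-- any split that would place shards on that node now errors out immediately:
--   ERROR:  reference tables have not been replicated to node localhost:9702 yet

-- replicate reference tables explicitly, then retry the split
SELECT replicate_reference_tables();
SELECT citus_split_shard_by_split_points(102008,          -- hypothetical shard id
                                         ARRAY['0'],      -- split point within the shard's range
                                         ARRAY[3, 3],     -- hypothetical target node id
                                         'force_logical');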
diff --git a/src/include/distributed/reference_table_utils.h b/src/include/distributed/reference_table_utils.h
index a26f16630..80b282126 100644
--- a/src/include/distributed/reference_table_utils.h
+++ b/src/include/distributed/reference_table_utils.h
@@ -26,5 +26,6 @@ extern void DeleteAllReplicatedTablePlacementsFromNodeGroup(int32 groupId,
 															bool localOnly);
 extern int CompareOids(const void *leftElement, const void *rightElement);
 extern void ReplicateAllReferenceTablesToNode(WorkerNode *workerNode);
+extern void ErrorIfNotAllNodesHaveReferenceTableReplicas(List *workerNodeList);
 
 #endif /* REFERENCE_TABLE_UTILS_H_ */
diff --git a/src/test/regress/expected/multi_replicate_reference_table.out b/src/test/regress/expected/multi_replicate_reference_table.out
index 2dd670108..afec8052b 100644
--- a/src/test/regress/expected/multi_replicate_reference_table.out
+++ b/src/test/regress/expected/multi_replicate_reference_table.out
@@ -1150,6 +1150,38 @@ CREATE TABLE test (x int, y int references ref(a));
 SELECT create_distributed_table('test','x');
 ERROR:  canceling the transaction since it was involved in a distributed deadlock
 END;
+-- verify the split fails if we still need to replicate reference tables
+SELECT citus_remove_node('localhost', :worker_2_port);
+ citus_remove_node
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT create_distributed_table('test','x');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT citus_add_node('localhost', :worker_2_port);
+ citus_add_node
+---------------------------------------------------------------------
+ 1370022
+(1 row)
+
+SELECT
+    citus_split_shard_by_split_points(shardid,
+                                      ARRAY[(shardminvalue::int + (shardmaxvalue::int - shardminvalue::int)/2)::text],
+                                      ARRAY[nodeid, nodeid],
+                                      'force_logical')
+FROM
+    pg_dist_shard, pg_dist_node
+WHERE
+    logicalrelid = 'replicate_reference_table.test'::regclass AND nodename = 'localhost' AND nodeport = :worker_2_port
+ORDER BY shardid LIMIT 1;
+ERROR:  reference tables have not been replicated to node localhost:xxxxx yet
+DETAIL:  Reference tables are lazily replicated after adding a node, but must exist before shards can be created on that node.
+HINT:  Run SELECT replicate_reference_tables(); to ensure reference tables exist on all nodes.
 -- test adding an invalid node while we have reference tables to replicate
 -- set client message level to ERROR and verbosity to terse to supporess
 -- OS-dependent host name resolution warnings
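As a companion to the expected error above, a rough diagnostic query (not part of the patch) can list active primary nodes that are still missing a placement of some reference table. It assumes the usual Citus metadata layout (reference tables marked with repmodel 't' in pg_dist_partition, placements keyed by group id in pg_dist_placement); adjust for your Citus version:

-- nodes that still lack at least one reference table placement
SELECT n.nodename, n.nodeport
FROM pg_dist_node n
WHERE n.isactive
  AND n.noderole = 'primary'
  AND EXISTS (
        SELECT 1
        FROM pg_dist_partition p
        JOIN pg_dist_shard s ON s.logicalrelid = p.logicalrelid
        WHERE p.repmodel = 't'                -- reference tables
          AND NOT EXISTS (
                SELECT 1
                FROM pg_dist_placement pl
                WHERE pl.shardid = s.shardid
                  AND pl.groupid = n.groupid));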
diff --git a/src/test/regress/sql/multi_replicate_reference_table.sql b/src/test/regress/sql/multi_replicate_reference_table.sql
index c34007ad7..172d08f35 100644
--- a/src/test/regress/sql/multi_replicate_reference_table.sql
+++ b/src/test/regress/sql/multi_replicate_reference_table.sql
@@ -700,6 +700,21 @@ CREATE TABLE test (x int, y int references ref(a));
 SELECT create_distributed_table('test','x');
 END;
 
+-- verify the split fails if we still need to replicate reference tables
+SELECT citus_remove_node('localhost', :worker_2_port);
+SELECT create_distributed_table('test','x');
+SELECT citus_add_node('localhost', :worker_2_port);
+SELECT
+    citus_split_shard_by_split_points(shardid,
+                                      ARRAY[(shardminvalue::int + (shardmaxvalue::int - shardminvalue::int)/2)::text],
+                                      ARRAY[nodeid, nodeid],
+                                      'force_logical')
+FROM
+    pg_dist_shard, pg_dist_node
+WHERE
+    logicalrelid = 'replicate_reference_table.test'::regclass AND nodename = 'localhost' AND nodeport = :worker_2_port
+ORDER BY shardid LIMIT 1;
+
 -- test adding an invalid node while we have reference tables to replicate
 -- set client message level to ERROR and verbosity to terse to supporess
 -- OS-dependent host name resolution warnings
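A possible follow-up for this regression test (not included in the patch) would be to also exercise the success path: after the expected error, replicate reference tables as the HINT suggests and re-run the same split, which should then succeed:

SELECT replicate_reference_tables();
SELECT
    citus_split_shard_by_split_points(shardid,
                                      ARRAY[(shardminvalue::int + (shardmaxvalue::int - shardminvalue::int)/2)::text],
                                      ARRAY[nodeid, nodeid],
                                      'force_logical')
FROM
    pg_dist_shard, pg_dist_node
WHERE
    logicalrelid = 'replicate_reference_table.test'::regclass AND nodename = 'localhost' AND nodeport = :worker_2_port
ORDER BY shardid LIMIT 1;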