mirror of https://github.com/citusdata/citus.git

Change split logic to avoid EnsureReferenceTablesExistOnAllNodesExtended (#6208)

Co-authored-by: Marco Slot <marco.slot@gmail.com>
Branch: pull/6291/head
parent: bd13836648
commit: e6b1845931
@@ -501,6 +501,8 @@ SplitShard(SplitMode splitMode,
 	List *workersForPlacementList = GetWorkerNodesFromWorkerIds(nodeIdsForPlacementList);
 
+	ErrorIfNotAllNodesHaveReferenceTableReplicas(workersForPlacementList);
+
 	List *sourceColocatedShardIntervalList = NIL;
 	if (colocatedShardIntervalList == NIL)
 	{
@@ -517,7 +519,6 @@ SplitShard(SplitMode splitMode,
 	if (splitMode == BLOCKING_SPLIT)
 	{
-		EnsureReferenceTablesExistOnAllNodesExtended(TRANSFER_MODE_BLOCK_WRITES);
 		BlockingShardSplit(
 			splitOperation,
 			splitWorkflowId,
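The net effect of this hunk is that shard splits no longer replicate reference tables on demand: both split modes now go through the up-front check, and the operator triggers replication explicitly. Below is a minimal sketch of that workflow, assuming a worker was just added; the host, port, shard id, and node ids are placeholders and do not come from this commit.

-- Hypothetical session; 9702, 102008, and the node ids are placeholders.
SELECT citus_add_node('localhost', 9702);

-- Replicate reference tables up front, as the new error hint suggests;
-- previously the blocking split path did this implicitly via
-- EnsureReferenceTablesExistOnAllNodesExtended().
SELECT replicate_reference_tables();

-- With reference tables in place, the split can create placements on the new node.
SELECT citus_split_shard_by_split_points(
    102008,              -- shard id (placeholder)
    ARRAY['1073741823'], -- split points (placeholder)
    ARRAY[2, 3],         -- target node ids (placeholder)
    'force_logical');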
@@ -50,6 +50,7 @@ static void ReplicateReferenceTableShardToNode(ShardInterval *shardInterval,
 											   int nodePort);
 static bool AnyRelationsModifiedInTransaction(List *relationIdList);
 static List * ReplicatedMetadataSyncedDistributedTableList(void);
+static bool NodeHasAllReferenceTableReplicas(WorkerNode *workerNode);
 
 /* exports for SQL callable functions */
 PG_FUNCTION_INFO_V1(upgrade_to_reference_table);
@@ -688,3 +689,81 @@ ReplicateAllReferenceTablesToNode(WorkerNode *workerNode)
 		}
 	}
 }
+
+
+/*
+ * ErrorIfNotAllNodesHaveReferenceTableReplicas throws an error when one of the
+ * nodes in the list does not have reference table replicas.
+ */
+void
+ErrorIfNotAllNodesHaveReferenceTableReplicas(List *workerNodeList)
+{
+	WorkerNode *workerNode = NULL;
+
+	foreach_ptr(workerNode, workerNodeList)
+	{
+		if (!NodeHasAllReferenceTableReplicas(workerNode))
+		{
+			ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+							errmsg("reference tables have not been replicated to "
+								   "node %s:%d yet",
+								   workerNode->workerName,
+								   workerNode->workerPort),
+							errdetail("Reference tables are lazily replicated after "
+									  "adding a node, but must exist before shards can "
+									  "be created on that node."),
+							errhint("Run SELECT replicate_reference_tables(); to "
+									"ensure reference tables exist on all nodes.")));
+		}
+	}
+}
+
+
+/*
+ * NodeHasAllReferenceTableReplicas returns whether the given worker node has reference
+ * table replicas. If there are no reference tables the function returns true.
+ *
+ * This function does not do any locking, so the situation could change immediately after,
+ * though we can only ever transition from false to true, so only "false" could be the
+ * incorrect answer.
+ *
+ * In the case where the function returns true because no reference tables exist
+ * on the node, a reference table could be created immediately after. However, the
+ * creation logic guarantees that this reference table will be created on all the
+ * nodes, so our answer was correct.
+ */
+static bool
+NodeHasAllReferenceTableReplicas(WorkerNode *workerNode)
+{
+	List *referenceTableIdList = CitusTableTypeIdList(REFERENCE_TABLE);
+
+	if (list_length(referenceTableIdList) == 0)
+	{
+		/* no reference tables exist */
+		return true;
+	}
+
+	Oid referenceTableId = linitial_oid(referenceTableIdList);
+	List *shardIntervalList = LoadShardIntervalList(referenceTableId);
+	if (list_length(shardIntervalList) != 1)
+	{
+		/* check for corrupt metadata */
+		ereport(ERROR, (errmsg("reference table \"%s\" can only have 1 shard",
+							   get_rel_name(referenceTableId))));
+	}
+
+	ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList);
+	List *shardPlacementList = ActiveShardPlacementList(shardInterval->shardId);
+
+	ShardPlacement *placement = NULL;
+	foreach_ptr(placement, shardPlacementList)
+	{
+		if (placement->groupId == workerNode->groupId)
+		{
+			/* our worker has a reference table placement */
+			return true;
+		}
+	}
+
+	return false;
+}
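The check added above boils down to a metadata lookup: a node qualifies if its group holds an active placement of one reference table's single shard. The query below is only an illustrative SQL approximation of that lookup; 'ref' stands in for any reference table, and pg_dist_placement is the standard Citus placement catalog even though it is not touched by this diff.

-- Sketch: worker groups that have no placement of reference table 'ref'.
-- Illustrative only; the commit implements this check in C, not SQL.
SELECT n.nodename, n.nodeport
FROM pg_dist_node n
WHERE n.groupid NOT IN (
    SELECT p.groupid
    FROM pg_dist_placement p
    JOIN pg_dist_shard s USING (shardid)
    WHERE s.logicalrelid = 'ref'::regclass
);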
@@ -26,5 +26,6 @@ extern void DeleteAllReplicatedTablePlacementsFromNodeGroup(int32 groupId,
 															bool localOnly);
 extern int CompareOids(const void *leftElement, const void *rightElement);
 extern void ReplicateAllReferenceTablesToNode(WorkerNode *workerNode);
+extern void ErrorIfNotAllNodesHaveReferenceTableReplicas(List *workerNodeList);
 
 #endif /* REFERENCE_TABLE_UTILS_H_ */
@@ -1150,6 +1150,38 @@ CREATE TABLE test (x int, y int references ref(a));
 SELECT create_distributed_table('test','x');
 ERROR: canceling the transaction since it was involved in a distributed deadlock
 END;
+-- verify the split fails if we still need to replicate reference tables
+SELECT citus_remove_node('localhost', :worker_2_port);
+ citus_remove_node
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT create_distributed_table('test','x');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT citus_add_node('localhost', :worker_2_port);
+ citus_add_node
+---------------------------------------------------------------------
+        1370022
+(1 row)
+
+SELECT
+  citus_split_shard_by_split_points(shardid,
+  ARRAY[(shardminvalue::int + (shardmaxvalue::int - shardminvalue::int)/2)::text],
+  ARRAY[nodeid, nodeid],
+  'force_logical')
+FROM
+  pg_dist_shard, pg_dist_node
+WHERE
+  logicalrelid = 'replicate_reference_table.test'::regclass AND nodename = 'localhost' AND nodeport = :worker_2_port
+ORDER BY shardid LIMIT 1;
+ERROR: reference tables have not been replicated to node localhost:xxxxx yet
+DETAIL: Reference tables are lazily replicated after adding a node, but must exist before shards can be created on that node.
+HINT: Run SELECT replicate_reference_tables(); to ensure reference tables exist on all nodes.
 -- test adding an invalid node while we have reference tables to replicate
 -- set client message level to ERROR and verbosity to terse to supporess
 -- OS-dependent host name resolution warnings
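The regression output above deliberately stops at the error. Following the HINT should let the same split go through; a brief hedged sketch of that follow-up (its output is not part of this commit):

-- Replicate reference tables to the re-added worker as the HINT suggests,
-- then re-issue the citus_split_shard_by_split_points() query from the test above.
SELECT replicate_reference_tables();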
@@ -700,6 +700,21 @@ CREATE TABLE test (x int, y int references ref(a));
 SELECT create_distributed_table('test','x');
 END;
 
+-- verify the split fails if we still need to replicate reference tables
+SELECT citus_remove_node('localhost', :worker_2_port);
+SELECT create_distributed_table('test','x');
+SELECT citus_add_node('localhost', :worker_2_port);
+SELECT
+  citus_split_shard_by_split_points(shardid,
+  ARRAY[(shardminvalue::int + (shardmaxvalue::int - shardminvalue::int)/2)::text],
+  ARRAY[nodeid, nodeid],
+  'force_logical')
+FROM
+  pg_dist_shard, pg_dist_node
+WHERE
+  logicalrelid = 'replicate_reference_table.test'::regclass AND nodename = 'localhost' AND nodeport = :worker_2_port
+ORDER BY shardid LIMIT 1;
+
 -- test adding an invalid node while we have reference tables to replicate
 -- set client message level to ERROR and verbosity to terse to supporess
 -- OS-dependent host name resolution warnings