From 6e4862c57fe23846c1d9df2251125f6a8808fc7b Mon Sep 17 00:00:00 2001 From: Nils Dijk Date: Wed, 2 Sep 2020 15:09:52 +0200 Subject: [PATCH 1/2] expose transfermode for ensure reference table existance --- .../distributed/operations/repair_shards.c | 2 +- .../distributed/utils/reference_table_utils.c | 18 +++++++++++++++++- .../distributed/reference_table_utils.h | 1 + 3 files changed, 19 insertions(+), 2 deletions(-) diff --git a/src/backend/distributed/operations/repair_shards.c b/src/backend/distributed/operations/repair_shards.c index 30f9b65c8..5f3379ba2 100644 --- a/src/backend/distributed/operations/repair_shards.c +++ b/src/backend/distributed/operations/repair_shards.c @@ -420,7 +420,7 @@ ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName, * Since this a long-running operation we do this after the error checks, but * before taking metadata locks. */ - EnsureReferenceTablesExistOnAllNodes(); + EnsureReferenceTablesExistOnAllNodesExtended(shardReplicationMode); } CopyShardTables(colocatedShardList, sourceNodeName, sourceNodePort, diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index e576066ff..be4e3de94 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -77,6 +77,22 @@ replicate_reference_tables(PG_FUNCTION_ARGS) */ void EnsureReferenceTablesExistOnAllNodes(void) +{ + EnsureReferenceTablesExistOnAllNodesExtended(TRANSFER_MODE_BLOCK_WRITES); +} + + +/* + * EnsureReferenceTablesExistOnAllNodesExtended ensures that a shard placement for every + * reference table exists on all nodes. If a node does not have a set of shard placements, + * then master_copy_shard_placement is called in a subtransaction to pull the data to the + * new node. + * + * The transferMode is passed on to the implementation of the copy to control the locks + * and transferMode. + */ +void +EnsureReferenceTablesExistOnAllNodesExtended(char transferMode) { /* * Prevent this function from running concurrently with itself. @@ -192,7 +208,7 @@ EnsureReferenceTablesExistOnAllNodes(void) StringInfo placementCopyCommand = CopyShardPlacementToWorkerNodeQuery(sourceShardPlacement, newWorkerNode, - TRANSFER_MODE_BLOCK_WRITES); + transferMode); ExecuteCriticalRemoteCommand(connection, placementCopyCommand->data); RemoteTransactionCommit(connection); } diff --git a/src/include/distributed/reference_table_utils.h b/src/include/distributed/reference_table_utils.h index 0ef44c601..34bd0c9ab 100644 --- a/src/include/distributed/reference_table_utils.h +++ b/src/include/distributed/reference_table_utils.h @@ -19,6 +19,7 @@ #include "distributed/metadata_cache.h" extern void EnsureReferenceTablesExistOnAllNodes(void); +extern void EnsureReferenceTablesExistOnAllNodesExtended(char transferMode); extern uint32 CreateReferenceTableColocationId(void); extern void DeleteAllReferenceTablePlacementsFromNodeGroup(int32 groupId); extern int CompareOids(const void *leftElement, const void *rightElement); From bbf42063a7a7e6639700aab0aecbf9a1d3a58812 Mon Sep 17 00:00:00 2001 From: Nils Dijk Date: Wed, 2 Sep 2020 15:23:00 +0200 Subject: [PATCH 2/2] export LookupShardTransferMode --- src/backend/distributed/operations/repair_shards.c | 3 +-- src/include/distributed/coordinator_protocol.h | 1 + 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/backend/distributed/operations/repair_shards.c b/src/backend/distributed/operations/repair_shards.c index 5f3379ba2..bd83093cd 100644 --- a/src/backend/distributed/operations/repair_shards.c +++ b/src/backend/distributed/operations/repair_shards.c @@ -44,7 +44,6 @@ #include "utils/palloc.h" /* local function forward declarations */ -static char LookupShardTransferMode(Oid shardReplicationModeOid); static void ErrorIfTableCannotBeReplicated(Oid relationId); static void RepairShardPlacement(int64 shardId, const char *sourceNodeName, int32 sourceNodePort, const char *targetNodeName, @@ -226,7 +225,7 @@ ErrorIfTableCannotBeReplicated(Oid relationId) * LookupShardTransferMode maps the oids of citus.shard_transfer_mode enum * values to a char. */ -static char +char LookupShardTransferMode(Oid shardReplicationModeOid) { char shardReplicationMode = 0; diff --git a/src/include/distributed/coordinator_protocol.h b/src/include/distributed/coordinator_protocol.h index ce774acc3..50743a60f 100644 --- a/src/include/distributed/coordinator_protocol.h +++ b/src/include/distributed/coordinator_protocol.h @@ -176,5 +176,6 @@ extern ShardPlacement * SearchShardPlacementInList(List *shardPlacementList, extern ShardPlacement * SearchShardPlacementInListOrError(List *shardPlacementList, const char *nodeName, uint32 nodePort); +extern char LookupShardTransferMode(Oid shardReplicationModeOid); #endif /* COORDINATOR_PROTOCOL_H */