From bf6b12e351b9839af710731aa5db30b3b54ed8d9 Mon Sep 17 00:00:00 2001
From: Marco Slot
Date: Sun, 8 Mar 2020 00:49:06 +0100
Subject: [PATCH] Defer reference table replication to shard creation time

---
 .../commands/create_distributed_table.c       |   6 +
 .../distributed/master/master_create_shards.c |   2 +
 .../distributed/master/master_repair_shards.c |  14 ++
 .../distributed/metadata/node_metadata.c      |   2 -
 .../distributed/sql/citus--9.2-4--9.3-2.sql   |   1 +
 .../udfs/replicate_reference_tables/9.3-1.sql |   7 +
 .../replicate_reference_tables/latest.sql     |   7 +
 .../distributed/utils/reference_table_utils.c | 233 +++++++++++++-----
 src/backend/distributed/utils/resource_lock.c |  30 +++
 .../distributed/reference_table_utils.h       |   3 +-
 src/include/distributed/resource_lock.h       |   4 +
 .../expected/propagate_extension_commands.out |   2 -
 12 files changed, 246 insertions(+), 65 deletions(-)
 create mode 100644 src/backend/distributed/sql/udfs/replicate_reference_tables/9.3-1.sql
 create mode 100644 src/backend/distributed/sql/udfs/replicate_reference_tables/latest.sql

diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c
index aaa95af78..28d6795f5 100644
--- a/src/backend/distributed/commands/create_distributed_table.c
+++ b/src/backend/distributed/commands/create_distributed_table.c
@@ -346,6 +346,12 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio
 	EnsureRelationCanBeDistributed(relationId, distributionColumn, distributionMethod,
 	                               colocationId, replicationModel, viaDeprecatedAPI);
 
+	/*
+	 * Make sure that existing reference tables have been replicated to all the nodes
+	 * such that we can create foreign keys and joins work immediately after creation.
+	 */
+	EnsureReferenceTablesExistOnAllNodes();
+
 	/* we need to calculate these variables before creating distributed metadata */
 	bool localTableEmpty = LocalTableEmpty(relationId);
 	Oid colocatedTableId = ColocatedTableId(colocationId);
diff --git a/src/backend/distributed/master/master_create_shards.c b/src/backend/distributed/master/master_create_shards.c
index 128957c18..ce2239c5a 100644
--- a/src/backend/distributed/master/master_create_shards.c
+++ b/src/backend/distributed/master/master_create_shards.c
@@ -86,6 +86,8 @@ master_create_worker_shards(PG_FUNCTION_ARGS)
 	ObjectAddressSet(tableAddress, RelationRelationId, distributedTableId);
 	EnsureDependenciesExistOnAllNodes(&tableAddress);
 
+	EnsureReferenceTablesExistOnAllNodes();
+
 	CreateShardsWithRoundRobinPolicy(distributedTableId, shardCount, replicationFactor,
 	                                 useExclusiveConnections);
 
diff --git a/src/backend/distributed/master/master_repair_shards.c b/src/backend/distributed/master/master_repair_shards.c
index ed6f3f897..c3b24f50f 100644
--- a/src/backend/distributed/master/master_repair_shards.c
+++ b/src/backend/distributed/master/master_repair_shards.c
@@ -28,6 +28,7 @@
 #include "distributed/metadata_sync.h"
 #include "distributed/multi_join_order.h"
 #include "distributed/multi_partitioning_utils.h"
+#include "distributed/reference_table_utils.h"
 #include "distributed/resource_lock.h"
 #include "distributed/worker_manager.h"
 #include "distributed/worker_protocol.h"
@@ -449,6 +450,19 @@ ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName,
 	                                    targetNodeName, targetNodePort);
 	}
 
+	if (!IsReferenceTable(distributedTableId))
+	{
+		/*
+		 * When copying a shard to a new node, we should first ensure that reference
+		 * tables are present such that joins work immediately after copying the shard.
+		 * When copying a reference table, we are probably trying to achieve just that.
+		 *
+		 * Since this is a long-running operation we do this after the error checks, but
+		 * before taking metadata locks.
+		 */
+		EnsureReferenceTablesExistOnAllNodes();
+	}
+
 	/*
 	 * CopyColocatedShardPlacement function copies given shard with its co-located
 	 * shards.
diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c
index 2b869d4f9..798dad789 100644
--- a/src/backend/distributed/metadata/node_metadata.c
+++ b/src/backend/distributed/metadata/node_metadata.c
@@ -371,8 +371,6 @@ SetUpDistributedTableDependencies(WorkerNode *newWorkerNode)
 		EnsureNoModificationsHaveBeenDone();
 		ReplicateAllDependenciesToNode(newWorkerNode->workerName,
 		                               newWorkerNode->workerPort);
-		ReplicateAllReferenceTablesToNode(newWorkerNode->workerName,
-		                                  newWorkerNode->workerPort);
 
 		/*
 		 * Let the maintenance daemon do the hard work of syncing the metadata.
diff --git a/src/backend/distributed/sql/citus--9.2-4--9.3-2.sql b/src/backend/distributed/sql/citus--9.2-4--9.3-2.sql
index 0f763f4d7..25ba9cba0 100644
--- a/src/backend/distributed/sql/citus--9.2-4--9.3-2.sql
+++ b/src/backend/distributed/sql/citus--9.2-4--9.3-2.sql
@@ -4,3 +4,4 @@
 
 #include "udfs/citus_extradata_container/9.3-2.sql"
 #include "udfs/update_distributed_table_colocation/9.3-2.sql"
+#include "udfs/replicate_reference_tables/9.3-1.sql"
diff --git a/src/backend/distributed/sql/udfs/replicate_reference_tables/9.3-1.sql b/src/backend/distributed/sql/udfs/replicate_reference_tables/9.3-1.sql
new file mode 100644
index 000000000..556899eb2
--- /dev/null
+++ b/src/backend/distributed/sql/udfs/replicate_reference_tables/9.3-1.sql
@@ -0,0 +1,7 @@
+CREATE FUNCTION pg_catalog.replicate_reference_tables()
+  RETURNS VOID
+  LANGUAGE C STRICT
+  AS 'MODULE_PATHNAME', $$replicate_reference_tables$$;
+COMMENT ON FUNCTION pg_catalog.replicate_reference_tables()
+  IS 'replicate reference tables to all nodes';
+REVOKE ALL ON FUNCTION pg_catalog.replicate_reference_tables() FROM PUBLIC;
diff --git a/src/backend/distributed/sql/udfs/replicate_reference_tables/latest.sql b/src/backend/distributed/sql/udfs/replicate_reference_tables/latest.sql
new file mode 100644
index 000000000..556899eb2
--- /dev/null
+++ b/src/backend/distributed/sql/udfs/replicate_reference_tables/latest.sql
@@ -0,0 +1,7 @@
+CREATE FUNCTION pg_catalog.replicate_reference_tables()
+  RETURNS VOID
+  LANGUAGE C STRICT
+  AS 'MODULE_PATHNAME', $$replicate_reference_tables$$;
+COMMENT ON FUNCTION pg_catalog.replicate_reference_tables()
+  IS 'replicate reference tables to all nodes';
+REVOKE ALL ON FUNCTION pg_catalog.replicate_reference_tables() FROM PUBLIC;
diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c
index ebf5350dd..8430a9e76 100644
--- a/src/backend/distributed/utils/reference_table_utils.c
+++ b/src/backend/distributed/utils/reference_table_utils.c
@@ -24,18 +24,24 @@
 #include "distributed/metadata_sync.h"
 #include "distributed/multi_logical_planner.h"
 #include "distributed/reference_table_utils.h"
+#include "distributed/remote_commands.h"
 #include "distributed/resource_lock.h"
 #include "distributed/shardinterval_utils.h"
 #include "distributed/transaction_management.h"
 #include "distributed/worker_manager.h"
 #include "distributed/worker_transaction.h"
+#include "postmaster/postmaster.h"
 #include "storage/lmgr.h"
+#include "utils/builtins.h"
 #include "utils/fmgroids.h"
 #include "utils/lsyscache.h"
 #include "utils/rel.h"
 
 
 /* local function forward declarations */
+static List * WorkersWithoutReferenceTablePlacement(uint64 shardId);
+static void CopyShardPlacementToNewWorkerNode(ShardPlacement *sourceShardPlacement,
+                                              WorkerNode *newWorkerNode);
 static void ReplicateSingleShardTableToAllNodes(Oid relationId);
 static void ReplicateShardToAllNodes(ShardInterval *shardInterval);
 static void ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName,
@@ -44,6 +50,173 @@ static void ConvertToReferenceTableMetadata(Oid relationId, uint64 shardId);
 
 /* exports for SQL callable functions */
 PG_FUNCTION_INFO_V1(upgrade_to_reference_table);
+PG_FUNCTION_INFO_V1(replicate_reference_tables);
+
+
+/*
+ * IsReferenceTable returns whether the given relation ID identifies a reference
+ * table.
+ */
+bool
+IsReferenceTable(Oid relationId)
+{
+	CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(relationId);
+
+	if (!tableEntry->isCitusTable)
+	{
+		return false;
+	}
+
+	if (tableEntry->partitionMethod != DISTRIBUTE_BY_NONE)
+	{
+		return false;
+	}
+
+	return true;
+}
+
+
+/*
+ * replicate_reference_tables is a UDF to ensure that all reference tables are
+ * replicated to all nodes.
+ */
+Datum
+replicate_reference_tables(PG_FUNCTION_ARGS)
+{
+	EnsureReferenceTablesExistOnAllNodes();
+
+	PG_RETURN_VOID();
+}
+
+
+/*
+ * EnsureReferenceTablesExistOnAllNodes ensures that a shard placement for every
+ * reference table exists on all nodes. If a node does not have a set of shard
+ * placements, then master_copy_shard_placement is called in a subtransaction
+ * to pull the data to the new node.
+ */
+void
+EnsureReferenceTablesExistOnAllNodes(void)
+{
+	List *referenceTableIdList = ReferenceTableOidList();
+	if (list_length(referenceTableIdList) == 0)
+	{
+		/* no reference tables exist */
+		return;
+	}
+
+	Oid referenceTableId = linitial_oid(referenceTableIdList);
+	List *shardIntervalList = LoadShardIntervalList(referenceTableId);
+	if (list_length(shardIntervalList) == 0)
+	{
+		/* check for corrupt metadata */
+		ereport(ERROR, (errmsg("reference table \"%s\" does not have a shard",
+		                       get_rel_name(referenceTableId))));
+	}
+
+	ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList);
+	uint64 shardId = shardInterval->shardId;
+
+	/* prevent this function from running concurrently with itself */
+	int colocationId = TableColocationId(referenceTableId);
+	LockColocationId(colocationId, ExclusiveLock);
+
+	List *newWorkersList = WorkersWithoutReferenceTablePlacement(shardId);
+	if (list_length(newWorkersList) == 0)
+	{
+		/* nothing to do, no need for lock */
+		UnlockColocationId(colocationId, ExclusiveLock);
+		return;
+	}
+
+	/* TODO: ensure reference tables have not been modified in this transaction */
+
+	bool missingOk = false;
+	ShardPlacement *sourceShardPlacement = ActiveShardPlacement(shardId, missingOk);
+	if (sourceShardPlacement == NULL)
+	{
+		/* check for corrupt metadata */
+		ereport(ERROR, (errmsg("reference table shard " UINT64_FORMAT " does not "
+		                       "have an active shard placement",
+		                       shardId)));
+	}
+
+	WorkerNode *newWorkerNode = NULL;
+	foreach_ptr(newWorkerNode, newWorkersList)
+	{
+		CopyShardPlacementToNewWorkerNode(sourceShardPlacement, newWorkerNode);
+	}
+
+	/*
+	 * Unblock other backends, they will probably observe that there are no
+	 * more worker nodes without placements, unless nodes were added concurrently
+	 */
+	UnlockColocationId(colocationId, ExclusiveLock);
+}
+
+
+/*
+ * WorkersWithoutReferenceTablePlacement returns a list of workers (WorkerNode) that
+ * do not yet have a placement for the given reference table shard ID, but are
+ * supposed to.
+ */
+static List *
+WorkersWithoutReferenceTablePlacement(uint64 shardId)
+{
+	List *workersWithoutPlacements = NIL;
+
+	List *shardPlacementList = ActiveShardPlacementList(shardId);
+
+	/* we only take an access share lock, otherwise we'll hold up master_add_node */
+	List *workerNodeList = ReferenceTablePlacementNodeList(AccessShareLock);
+	workerNodeList = SortList(workerNodeList, CompareWorkerNodes);
+
+	WorkerNode *workerNode = NULL;
+	foreach_ptr(workerNode, workerNodeList)
+	{
+		char *nodeName = workerNode->workerName;
+		uint32 nodePort = workerNode->workerPort;
+		bool missingWorkerOk = true;
+		ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList,
+		                                                             nodeName, nodePort,
+		                                                             missingWorkerOk);
+		if (targetPlacement == NULL)
+		{
+			workersWithoutPlacements = lappend(workersWithoutPlacements, workerNode);
+		}
+	}
+
+	return workersWithoutPlacements;
+}
+
+
+/*
+ * CopyShardPlacementToNewWorkerNode runs master_copy_shard_placement in a
+ * subtransaction by connecting to localhost.
+ */
+static void
+CopyShardPlacementToNewWorkerNode(ShardPlacement *sourceShardPlacement,
+                                  WorkerNode *newWorkerNode)
+{
+	int connectionFlags = OUTSIDE_TRANSACTION;
+	StringInfo queryString = makeStringInfo();
+	const char *userName = CitusExtensionOwnerName();
+
+	MultiConnection *connection = GetNodeUserDatabaseConnection(
+		connectionFlags, "localhost", PostPortNumber,
+		userName, NULL);
+
+	appendStringInfo(queryString,
+	                 "SELECT master_copy_shard_placement("
+	                 UINT64_FORMAT ", %s, %d, %s, %d, do_repair := false)",
+	                 sourceShardPlacement->shardId,
+	                 quote_literal_cstr(sourceShardPlacement->nodeName),
+	                 sourceShardPlacement->nodePort,
+	                 quote_literal_cstr(newWorkerNode->workerName),
+	                 newWorkerNode->workerPort);
+
+	ExecuteCriticalRemoteCommand(connection, queryString->data);
+}
 
 
 /*
@@ -110,66 +283,6 @@ upgrade_to_reference_table(PG_FUNCTION_ARGS)
 }
 
 
-/*
- * ReplicateAllReferenceTablesToNode function finds all reference tables and
- * replicates them to the given worker node. It also modifies pg_dist_colocation
- * table to update the replication factor column when necessary. This function
- * skips reference tables if that node already has healthy placement of that
- * reference table to prevent unnecessary data transfer.
- */
-void
-ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort)
-{
-	List *referenceTableList = ReferenceTableOidList();
-
-	/* if there is no reference table, we do not need to replicate anything */
-	if (list_length(referenceTableList) > 0)
-	{
-		List *referenceShardIntervalList = NIL;
-
-		/*
-		 * We sort the reference table list to prevent deadlocks in concurrent
-		 * ReplicateAllReferenceTablesToAllNodes calls.
-		 */
-		referenceTableList = SortList(referenceTableList, CompareOids);
-
-		Oid referenceTableId = InvalidOid;
-		foreach_oid(referenceTableId, referenceTableList)
-		{
-			List *shardIntervalList = LoadShardIntervalList(referenceTableId);
-			ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList);
-
-			referenceShardIntervalList = lappend(referenceShardIntervalList,
-			                                     shardInterval);
-		}
-
-		if (ClusterHasKnownMetadataWorkers())
-		{
-			BlockWritesToShardList(referenceShardIntervalList);
-		}
-
-		ShardInterval *shardInterval = NULL;
-		foreach_ptr(shardInterval, referenceShardIntervalList)
-		{
-			uint64 shardId = shardInterval->shardId;
-
-			LockShardDistributionMetadata(shardId, ExclusiveLock);
-
-			ReplicateShardToNode(shardInterval, nodeName, nodePort);
-		}
-
-		/* create foreign constraints between reference tables */
-		foreach_ptr(shardInterval, referenceShardIntervalList)
-		{
-			char *tableOwner = TableOwner(shardInterval->relationId);
-			List *commandList = CopyShardForeignConstraintCommandList(shardInterval);
-
-			SendCommandListToWorkerInSingleTransaction(nodeName, nodePort, tableOwner,
-			                                           commandList);
-		}
-	}
-}
-
-
 /*
  * ReplicateSingleShardTableToAllNodes accepts a broadcast table and replicates
  * it to all worker nodes, and the coordinator if it has been added by the user
diff --git a/src/backend/distributed/utils/resource_lock.c b/src/backend/distributed/utils/resource_lock.c
index 71f5221c5..5bdf2e570 100644
--- a/src/backend/distributed/utils/resource_lock.c
+++ b/src/backend/distributed/utils/resource_lock.c
@@ -319,6 +319,36 @@ IntToLockMode(int mode)
 }
 
 
+/*
+ * LockColocationId returns after acquiring a co-location ID lock, typically used
+ * for rebalancing and replication.
+ */
+void
+LockColocationId(int colocationId, LOCKMODE lockMode)
+{
+	LOCKTAG tag;
+	const bool sessionLock = false;
+	const bool dontWait = false;
+
+	SET_LOCKTAG_REBALANCE_COLOCATION(tag, (int64) colocationId);
+	(void) LockAcquire(&tag, lockMode, sessionLock, dontWait);
+}
+
+
+/*
+ * UnlockColocationId releases a co-location ID lock.
+ */
+void
+UnlockColocationId(int colocationId, LOCKMODE lockMode)
+{
+	LOCKTAG tag;
+	const bool sessionLock = false;
+
+	SET_LOCKTAG_REBALANCE_COLOCATION(tag, (int64) colocationId);
+	LockRelease(&tag, lockMode, sessionLock);
+}
+
+
 /*
  * LockShardDistributionMetadata returns after grabbing a lock for distribution
  * metadata related to the specified shard, blocking if required. Any locks
diff --git a/src/include/distributed/reference_table_utils.h b/src/include/distributed/reference_table_utils.h
index 6bf552511..073aa731a 100644
--- a/src/include/distributed/reference_table_utils.h
+++ b/src/include/distributed/reference_table_utils.h
@@ -16,8 +16,9 @@
 
 #include "listutils.h"
 
+extern bool IsReferenceTable(Oid relationId);
+extern void EnsureReferenceTablesExistOnAllNodes(void);
 extern uint32 CreateReferenceTableColocationId(void);
-extern void ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort);
 extern void DeleteAllReferenceTablePlacementsFromNodeGroup(int32 groupId);
 extern List * ReferenceTableOidList(void);
 extern int CompareOids(const void *leftElement, const void *rightElement);
diff --git a/src/include/distributed/resource_lock.h b/src/include/distributed/resource_lock.h
index 25f50f0c8..bb1e6ccdd 100644
--- a/src/include/distributed/resource_lock.h
+++ b/src/include/distributed/resource_lock.h
@@ -102,6 +102,10 @@ extern void UnlockShardResource(uint64 shardId, LOCKMODE lockmode);
 extern void LockJobResource(uint64 jobId, LOCKMODE lockmode);
 extern void UnlockJobResource(uint64 jobId, LOCKMODE lockmode);
 
+/* Lock a co-location group */
+extern void LockColocationId(int colocationId, LOCKMODE lockMode);
+extern void UnlockColocationId(int colocationId, LOCKMODE lockMode);
+
 /* Lock multiple shards for safe modification */
 extern void LockShardListMetadata(List *shardIntervalList, LOCKMODE lockMode);
 extern void LockShardsInPlacementListMetadata(List *shardPlacementList,
diff --git a/src/test/regress/expected/propagate_extension_commands.out b/src/test/regress/expected/propagate_extension_commands.out
index de6ded967..8138a6e32 100644
--- a/src/test/regress/expected/propagate_extension_commands.out
+++ b/src/test/regress/expected/propagate_extension_commands.out
@@ -247,7 +247,6 @@ SELECT run_command_on_workers($$SELECT extversion FROM pg_extension WHERE extnam
 -- and add the other node
 SELECT 1 from master_add_node('localhost', :worker_2_port);
-NOTICE: Replicating reference table "ref_table_2" to the node localhost:xxxxx
  ?column?
 ---------------------------------------------------------------------
         1
@@ -443,7 +442,6 @@ BEGIN;
 COMMIT;
 -- add the node back
 SELECT 1 from master_add_node('localhost', :worker_2_port);
-NOTICE: Replicating reference table "t3" to the node localhost:xxxxx
  ?column?
 ---------------------------------------------------------------------
         1
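
Usage sketch (illustrative, not part of the diff): after this change, master_add_node no longer copies reference tables to the new node eagerly, which is why the "Replicating reference table ..." NOTICE lines disappear from the expected regression output above. Replication is deferred to EnsureReferenceTablesExistOnAllNodes(), which runs before shards are created or copied, and can also be triggered directly through the new replicate_reference_tables() UDF. The worker port and table name below are made up for illustration; only the function names come from this patch.

    -- add a worker; reference tables are no longer copied at this point
    SELECT master_add_node('localhost', 9702);

    -- either force replication explicitly ...
    SELECT replicate_reference_tables();

    -- ... or let it happen implicitly right before the next shard creation
    CREATE TABLE items (key int, value text);
    SELECT create_distributed_table('items', 'key');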