From 18744fc5f535ca23ec07144ffcd5ac520cbc8c13 Mon Sep 17 00:00:00 2001
From: Hadi Moshayedi
Date: Tue, 7 Apr 2020 14:04:43 -0700
Subject: [PATCH] Guc for old behaviour

---
 .../distributed/metadata/node_metadata.c      | 12 ++++
 src/backend/distributed/shared_library_init.c | 10 ++++
 .../distributed/utils/reference_table_utils.c | 60 +++++++++++++++++++
 .../distributed/reference_table_utils.h       |  2 +-
 src/include/distributed/worker_manager.h      |  1 +
 5 files changed, 84 insertions(+), 1 deletion(-)

diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c
index 4cc9f5d17..60d35484b 100644
--- a/src/backend/distributed/metadata/node_metadata.c
+++ b/src/backend/distributed/metadata/node_metadata.c
@@ -60,6 +60,12 @@ int GroupSize = 1;
 /* config variable managed via guc.c */
 char *CurrentCluster = "default";
 
+/*
+ * Config variable to control whether we replicate reference tables on node
+ * activation, or defer that to shard creation.
+ */
+bool ReplicateReferenceTablesOnActivate = false;
+
 /* did current transaction modify pg_dist_node? */
 bool TransactionModifiedNodeMetadata = false;
 
@@ -381,6 +387,12 @@ SetUpDistributedTableDependencies(WorkerNode *newWorkerNode)
 		ReplicateAllDependenciesToNode(newWorkerNode->workerName,
 									   newWorkerNode->workerPort);
 
+		if (ReplicateReferenceTablesOnActivate)
+		{
+			ReplicateAllReferenceTablesToNode(newWorkerNode->workerName,
+											  newWorkerNode->workerPort);
+		}
+
 		/*
 		 * Let the maintenance daemon do the hard work of syncing the metadata.
 		 * We prefer this because otherwise node activation might fail within
diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c
index 1df2b8eaf..82964ba03 100644
--- a/src/backend/distributed/shared_library_init.c
+++ b/src/backend/distributed/shared_library_init.c
@@ -1370,6 +1370,16 @@ RegisterCitusConfigVariables(void)
 		GUC_NO_SHOW_ALL,
 		NULL, NULL, NULL);
 
+	DefineCustomBoolVariable(
+		"citus.replicate_reference_tables_on_activate",
+		NULL,
+		NULL,
+		&ReplicateReferenceTablesOnActivate,
+		false,
+		PGC_USERSET,
+		GUC_NO_SHOW_ALL,
+		NULL, NULL, NULL);
+
 	/* warn about config items in the citus namespace that are not registered above */
 	EmitWarningsOnPlaceholders("citus");
 }
diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c
index b36334e52..da1c1ce71 100644
--- a/src/backend/distributed/utils/reference_table_utils.c
+++ b/src/backend/distributed/utils/reference_table_utils.c
@@ -681,3 +681,63 @@ ReferenceTableReplicationFactor(void)
 	int replicationFactor = list_length(nodeList);
 	return replicationFactor;
 }
+
+
+/*
+ * ReplicateAllReferenceTablesToNode finds all reference tables and replicates
+ * them to the given worker node. It also updates the replication factor
+ * column of pg_dist_colocation when necessary. Reference tables that already
+ * have a healthy placement on that node are skipped, to prevent unnecessary
+ * data transfer.
+ */
+void
+ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort)
+{
+	List *referenceTableList = ReferenceTableOidList();
+
+	/* if there is no reference table, we do not need to replicate anything */
+	if (list_length(referenceTableList) > 0)
+	{
+		List *referenceShardIntervalList = NIL;
+
+		/*
+		 * We sort the reference table list to prevent deadlocks in concurrent
+		 * ReplicateAllReferenceTablesToNode calls.
+		 */
+		referenceTableList = SortList(referenceTableList, CompareOids);
+		Oid referenceTableId = InvalidOid;
+		foreach_oid(referenceTableId, referenceTableList)
+		{
+			List *shardIntervalList = LoadShardIntervalList(referenceTableId);
+			ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList);
+
+			referenceShardIntervalList = lappend(referenceShardIntervalList,
+												 shardInterval);
+		}
+
+		if (ClusterHasKnownMetadataWorkers())
+		{
+			BlockWritesToShardList(referenceShardIntervalList);
+		}
+
+		ShardInterval *shardInterval = NULL;
+		foreach_ptr(shardInterval, referenceShardIntervalList)
+		{
+			uint64 shardId = shardInterval->shardId;
+
+			LockShardDistributionMetadata(shardId, ExclusiveLock);
+
+			ReplicateShardToNode(shardInterval, nodeName, nodePort);
+		}
+
+		/* create foreign constraints between reference tables */
+		foreach_ptr(shardInterval, referenceShardIntervalList)
+		{
+			char *tableOwner = TableOwner(shardInterval->relationId);
+			List *commandList = CopyShardForeignConstraintCommandList(shardInterval);
+
+			SendCommandListToWorkerInSingleTransaction(nodeName, nodePort, tableOwner,
+													   commandList);
+		}
+	}
+}
diff --git a/src/include/distributed/reference_table_utils.h b/src/include/distributed/reference_table_utils.h
index 4484e4411..0fac3c41d 100644
--- a/src/include/distributed/reference_table_utils.h
+++ b/src/include/distributed/reference_table_utils.h
@@ -22,6 +22,6 @@ extern uint32 CreateReferenceTableColocationId(void);
 extern void DeleteAllReferenceTablePlacementsFromNodeGroup(int32 groupId);
 extern int CompareOids(const void *leftElement, const void *rightElement);
 extern int ReferenceTableReplicationFactor(void);
-
+extern void ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort);
 
 #endif /* REFERENCE_TABLE_UTILS_H_ */
diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h
index f5bb00ba9..336e1eedf 100644
--- a/src/include/distributed/worker_manager.h
+++ b/src/include/distributed/worker_manager.h
@@ -61,6 +61,7 @@ typedef struct WorkerNode
 
 extern int MaxWorkerNodesTracked;
 extern char *WorkerListFileName;
 extern char *CurrentCluster;
+extern bool ReplicateReferenceTablesOnActivate;
 
 /* Function declarations for finding worker nodes to place shards on */
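
Usage sketch (not part of the patch; 'worker-1' and 5432 are placeholder values):
the GUC defaults to off, so node activation replicates dependencies but leaves
reference table replication to be done later at shard creation. Because the setting
is registered as PGC_USERSET, a coordinator session can opt back into the old
activate-time replication before adding a node:

    -- restore the old behaviour: copy reference tables while the node is activated
    SET citus.replicate_reference_tables_on_activate TO on;
    SELECT master_add_node('worker-1', 5432);

GUC_NO_SHOW_ALL only hides the setting from SHOW ALL output; SET and SHOW with the
explicit name still work.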