GUC for old behaviour

guc_for_replicate_on_activate
Hadi Moshayedi 2020-04-07 14:04:43 -07:00
parent 601c2d6e54
commit 18744fc5f5
5 changed files with 84 additions and 1 deletion

View File

@@ -60,6 +60,12 @@ int GroupSize = 1;
/* config variable managed via guc.c */
char *CurrentCluster = "default";
/*
* Config variable to control whether we should replicate reference tables on
* node activation, or defer their replication until shard creation.
*/
bool ReplicateReferenceTablesOnActivate = false;
/* did current transaction modify pg_dist_node? */
bool TransactionModifiedNodeMetadata = false;
@@ -381,6 +387,12 @@ SetUpDistributedTableDependencies(WorkerNode *newWorkerNode)
ReplicateAllDependenciesToNode(newWorkerNode->workerName,
newWorkerNode->workerPort);
if (ReplicateReferenceTablesOnActivate)
{
ReplicateAllReferenceTablesToNode(newWorkerNode->workerName,
newWorkerNode->workerPort);
}
/*
* Let the maintenance daemon do the hard work of syncing the metadata.
* We prefer this because otherwise node activation might fail within

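For illustration, a minimal sketch of how this switch is exercised from SQL. The GUC name comes from the registration below; the master_add_node() UDF and the worker host/port values are assumptions about the surrounding cluster, not part of this diff.

-- default in this commit: defer reference table replication to shard creation
SET citus.replicate_reference_tables_on_activate TO false;
SELECT master_add_node('worker-2', 5432);

-- opt in to replicating all reference tables while the node is activated
SET citus.replicate_reference_tables_on_activate TO true;
SELECT master_add_node('worker-3', 5432);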
View File

@@ -1370,6 +1370,16 @@ RegisterCitusConfigVariables(void)
GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.replicate_reference_tables_on_activate",
NULL,
NULL,
&ReplicateReferenceTablesOnActivate,
false,
PGC_USERSET,
GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
/* warn about config items in the citus namespace that are not registered above */
EmitWarningsOnPlaceholders("citus");
}
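
Because the variable is registered with PGC_USERSET and GUC_NO_SHOW_ALL, any session can change it, but it is hidden from SHOW ALL output; it can still be read by name. A small sketch, assuming an ordinary psql session:

SHOW citus.replicate_reference_tables_on_activate;
SELECT current_setting('citus.replicate_reference_tables_on_activate');
SET citus.replicate_reference_tables_on_activate TO true;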

View File

@@ -681,3 +681,63 @@ ReferenceTableReplicationFactor(void)
int replicationFactor = list_length(nodeList);
return replicationFactor;
}
/*
* ReplicateAllReferenceTablesToNode finds all reference tables and replicates
* them to the given worker node. It also updates the replication factor column
* of pg_dist_colocation when necessary. Reference tables that already have a
* healthy placement on that node are skipped, to prevent unnecessary data
* transfer.
*/
void
ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort)
{
List *referenceTableList = ReferenceTableOidList();
/* if there is no reference table, we do not need to replicate anything */
if (list_length(referenceTableList) > 0)
{
List *referenceShardIntervalList = NIL;
/*
* We sort the reference table list to prevent deadlocks between concurrent
* ReplicateAllReferenceTablesToNode calls.
*/
referenceTableList = SortList(referenceTableList, CompareOids);
Oid referenceTableId = InvalidOid;
foreach_oid(referenceTableId, referenceTableList)
{
List *shardIntervalList = LoadShardIntervalList(referenceTableId);
ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList);
referenceShardIntervalList = lappend(referenceShardIntervalList,
shardInterval);
}
if (ClusterHasKnownMetadataWorkers())
{
BlockWritesToShardList(referenceShardIntervalList);
}
ShardInterval *shardInterval = NULL;
foreach_ptr(shardInterval, referenceShardIntervalList)
{
uint64 shardId = shardInterval->shardId;
LockShardDistributionMetadata(shardId, ExclusiveLock);
ReplicateShardToNode(shardInterval, nodeName, nodePort);
}
/* create foreign constraints between reference tables */
foreach_ptr(shardInterval, referenceShardIntervalList)
{
char *tableOwner = TableOwner(shardInterval->relationId);
List *commandList = CopyShardForeignConstraintCommandList(shardInterval);
SendCommandListToWorkerInSingleTransaction(nodeName, nodePort, tableOwner,
commandList);
}
}
}

View File

@@ -22,6 +22,6 @@ extern uint32 CreateReferenceTableColocationId(void);
extern void DeleteAllReferenceTablePlacementsFromNodeGroup(int32 groupId);
extern int CompareOids(const void *leftElement, const void *rightElement);
extern int ReferenceTableReplicationFactor(void);
extern void ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort);
#endif /* REFERENCE_TABLE_UTILS_H_ */

View File

@@ -61,6 +61,7 @@ typedef struct WorkerNode
extern int MaxWorkerNodesTracked;
extern char *WorkerListFileName;
extern char *CurrentCluster;
extern bool ReplicateReferenceTablesOnActivate;
/* Function declarations for finding worker nodes to place shards on */