Dependency update

velioglu/wo_seq_test_1
Burak Velioglu 2021-12-23 17:46:37 +03:00
parent 48c5ce8960
commit 6598a23963
No known key found for this signature in database
GPG Key ID: F6827E620F6549C6
3 changed files with 14 additions and 56 deletions

View File

@ -405,9 +405,7 @@ ReplicateAllDependenciesToNode(const char *nodeName, int nodePort)
ddlCommands = lcons(DISABLE_DDL_PROPAGATION, ddlCommands); ddlCommands = lcons(DISABLE_DDL_PROPAGATION, ddlCommands);
ddlCommands = lappend(ddlCommands, ENABLE_DDL_PROPAGATION); ddlCommands = lappend(ddlCommands, ENABLE_DDL_PROPAGATION);
SendMetadataCommandListToWorkerInCoordinatedTransaction(nodeName, nodePort, SendCommandListToWorkerOutsideTransaction(nodeName, nodePort, CitusExtensionOwnerName(), ddlCommands);
CitusExtensionOwnerName(),
ddlCommands);
} }

View File

@ -863,21 +863,14 @@ SetUpDistributedTableWithDependencies(WorkerNode *newWorkerNode)
{ {
EnsureNoModificationsHaveBeenDone(); EnsureNoModificationsHaveBeenDone();
if (ShouldPropagate() && !NodeIsCoordinator(newWorkerNode)) Assert(ShouldPropagate());
if (!NodeIsCoordinator(newWorkerNode))
{ {
ClearDistributedObjectsWithMetadataFromNode(newWorkerNode);
PropagateNodeWideObjects(newWorkerNode); PropagateNodeWideObjects(newWorkerNode);
ReplicateAllDependenciesToNode(newWorkerNode->workerName, ReplicateAllDependenciesToNode(newWorkerNode->workerName,
newWorkerNode->workerPort); newWorkerNode->workerPort);
} }
else if (!NodeIsCoordinator(newWorkerNode))
{
ereport(WARNING, (errmsg("citus.enable_object_propagation is off, not "
"creating distributed objects on worker"),
errdetail("distributed objects are only kept in sync when "
"citus.enable_object_propagation is set to on. "
"Newly activated nodes will not get these "
"objects created")));
}
if (ReplicateReferenceTablesOnActivate) if (ReplicateReferenceTablesOnActivate)
{ {
@ -890,12 +883,14 @@ SetUpDistributedTableWithDependencies(WorkerNode *newWorkerNode)
* We prefer this because otherwise node activation might fail within * We prefer this because otherwise node activation might fail within
* transaction blocks. * transaction blocks.
*/ */
if (ClusterHasDistributedFunctionWithDistArgument()) // TODO: Doesn't make sense to have that here as we won't handle placement metadata
// with maintenance daemon anymore
/* if (ClusterHasDistributedFunctionWithDistArgument())
{ {
SetWorkerColumnLocalOnly(newWorkerNode, Anum_pg_dist_node_hasmetadata, SetWorkerColumnLocalOnly(newWorkerNode, Anum_pg_dist_node_hasmetadata,
BoolGetDatum(true)); BoolGetDatum(true));
TriggerMetadataSyncOnCommit(); TriggerMetadataSyncOnCommit();
} }*/
} }
} }
@ -1180,33 +1175,13 @@ ActivateNode(char *nodeName, int nodePort)
DeleteAllReplicatedTablePlacementsFromNodeGroup(workerNode->groupId, DeleteAllReplicatedTablePlacementsFromNodeGroup(workerNode->groupId,
forceRemoteDelete); forceRemoteDelete);
/* SetUpDistributedTableWithDependencies(workerNode);
* Since coordinator node already has both objects and related metadata
* we don't need to recreate them.
*/
if (NodeIsPrimary(workerNode))
{
if (workerNode->groupId != COORDINATOR_GROUP_ID)
{
/* TODO: Consider calling function below according to other states like primary/secondary */
/* Should we check syncMetadata always on as well? */
ClearDistributedObjectsWithMetadataFromNode(workerNode);
SetUpDistributedTableWithDependencies(workerNode);
}
else if (ReplicateReferenceTablesOnActivate)
{
// We only need to replicate reference table to the coordinator node
ReplicateAllReferenceTablesToNode(workerNode->workerName,
workerNode->workerPort);
}
}
if (syncMetadata) if (syncMetadata)
{ {
StartMetadataSyncToNode(nodeName, nodePort); StartMetadataSyncToNode(nodeName, nodePort);
if (workerNode->groupId != COORDINATOR_GROUP_ID) if (!NodeIsCoordinator(workerNode) && NodeIsPrimary(workerNode))
{ {
SetUpMultipleDistributedTableIntegrations(workerNode); SetUpMultipleDistributedTableIntegrations(workerNode);
SetUpObjectMetadata(workerNode); SetUpObjectMetadata(workerNode);

View File

@ -329,8 +329,7 @@ upgrade_to_reference_table(PG_FUNCTION_ARGS)
/* /*
* ReplicateShardToNode function replicates given shard to the given worker node * ReplicateShardToNode function replicates given shard to the given worker node
* in a separate transaction. If the worker already has * in a separate transaction. If the worker already has
* a replica of the shard this is a no-op. This function also modifies metadata * a replica of the shard this is a no-op.
* by inserting/updating related rows in pg_dist_placement.
* *
* IMPORTANT: This should only be used to replicate shards of a reference * IMPORTANT: This should only be used to replicate shards of a reference
* table. * table.
@ -371,17 +370,13 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort)
nodePort))); nodePort)));
EnsureNoModificationsHaveBeenDone(); EnsureNoModificationsHaveBeenDone();
SendMetadataCommandListToWorkerInCoordinatedTransaction(nodeName, nodePort, tableOwner, SendCommandListToWorkerOutsideTransaction(nodeName, nodePort, tableOwner,
ddlCommandList); ddlCommandList);
int32 groupId = GroupForNode(nodeName, nodePort); int32 groupId = GroupForNode(nodeName, nodePort);
uint64 placementId = GetNextPlacementId(); uint64 placementId = GetNextPlacementId();
InsertShardPlacementRow(shardId, placementId, SHARD_STATE_ACTIVE, 0, InsertShardPlacementRow(shardId, placementId, SHARD_STATE_ACTIVE, 0,
groupId); groupId);
// Since having a duplicate on pg_dist_placement can cause issue, we don't add
// it to all nodes here. Caller of this function must propagate pg_dist_placement to
// other nodes if it is required.
} }
@ -544,6 +539,8 @@ ReferenceTableReplicationFactor(void)
* table to update the replication factor column when necessary. This function * table to update the replication factor column when necessary. This function
* skips reference tables if that node already has healthy placement of that * skips reference tables if that node already has healthy placement of that
* reference table to prevent unnecessary data transfer. * reference table to prevent unnecessary data transfer.
*
* TODO: Make is static and updatr comment
*/ */
void void
ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort) ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort)
@ -584,17 +581,5 @@ ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort)
ReplicateShardToNode(shardInterval, nodeName, nodePort); ReplicateShardToNode(shardInterval, nodeName, nodePort);
} }
/* create foreign constraints between reference tables */
foreach_ptr(shardInterval, referenceShardIntervalList)
{
char *tableOwner = TableOwner(shardInterval->relationId);
List *commandList = CopyShardForeignConstraintCommandList(shardInterval);
SendMetadataCommandListToWorkerInCoordinatedTransaction(nodeName,
nodePort,
tableOwner,
commandList);
}
} }
} }