Citus_disable_node

velioglu/wo_seq_test_1
Burak Velioglu 2021-12-21 17:20:13 +03:00
parent 2e61c3e6b8
commit 9fec89d70b
No known key found for this signature in database
GPG Key ID: F6827E620F6549C6
4 changed files with 44 additions and 24 deletions

View File

@ -345,7 +345,6 @@ CreateCitusLocalTable(Oid relationId, bool cascadeViaForeignKeys, bool autoConve
* via process utility. * via process utility.
*/ */
ExecuteAndLogUtilityCommandList(shellTableDDLEvents); ExecuteAndLogUtilityCommandList(shellTableDDLEvents);
MarkObjectDistributed(&tableAddress);
/* /*
* Set shellRelationId as the relation with relationId now points * Set shellRelationId as the relation with relationId now points
@ -368,6 +367,13 @@ CreateCitusLocalTable(Oid relationId, bool cascadeViaForeignKeys, bool autoConve
InsertMetadataForCitusLocalTable(shellRelationId, shardId, autoConverted); InsertMetadataForCitusLocalTable(shellRelationId, shardId, autoConverted);
FinalizeCitusLocalTableCreation(shellRelationId, dependentSequenceList); FinalizeCitusLocalTableCreation(shellRelationId, dependentSequenceList);
/*
* Mark the shell relation as distributed on each node as the last step.
*/
ObjectAddress shellRelationAddress = { 0 };
ObjectAddressSet(shellRelationAddress, RelationRelationId, shellRelationId);
MarkObjectDistributed(&shellRelationAddress);
} }
@ -1240,6 +1246,8 @@ FinalizeCitusLocalTableCreation(Oid relationId, List *dependentSequenceList)
CreateTruncateTrigger(relationId); CreateTruncateTrigger(relationId);
} }
CreateShellTableOnWorkers(relationId);
if (ShouldSyncTableMetadata(relationId)) if (ShouldSyncTableMetadata(relationId))
{ {
CreateTableMetadataOnWorkers(relationId); CreateTableMetadataOnWorkers(relationId);

View File

@ -509,16 +509,6 @@ citus_disable_node(PG_FUNCTION_ARGS)
workerNode->workerName, workerNode->workerName,
nodePort))); nodePort)));
} }
/*
* Delete replicated table placements from the coordinator's metadata,
* but not remotely. That is because one more more of the remote
* nodes might be down. Instead, we let the background worker
* to sync the metadata when possible.
*/
bool forceRemoteDelete = false;
DeleteAllReplicatedTablePlacementsFromNodeGroup(workerNode->groupId,
forceRemoteDelete);
} }
TransactionModifiedNodeMetadata = true; TransactionModifiedNodeMetadata = true;
@ -1182,27 +1172,44 @@ ActivateNode(char *nodeName, int nodePort)
BoolGetDatum(isActive)); BoolGetDatum(isActive));
} }
if (syncMetadata) /*
{ * Delete replicated table placements from the coordinator's metadata,
StartMetadataSyncToNode(nodeName, nodePort); * including remote ones.
*/
bool forceRemoteDelete = true;
DeleteAllReplicatedTablePlacementsFromNodeGroup(workerNode->groupId,
forceRemoteDelete);
/* /*
* Since coordinator node already has both objects and related metadata * Since coordinator node already has both objects and related metadata
* we don't need to recreate them. * we don't need to recreate them.
*/ */
if (NodeIsPrimary(workerNode))
{
if (workerNode->groupId != COORDINATOR_GROUP_ID) if (workerNode->groupId != COORDINATOR_GROUP_ID)
{ {
/* TODO: Consider calling function below according to other states like primary/secondary */ /* TODO: Consider calling function below according to other states like primary/secondary */
/* Should we check syncMetadata always on as well? */ /* Should we check syncMetadata always on as well? */
ClearDistributedObjectsWithMetadataFromNode(workerNode); ClearDistributedObjectsWithMetadataFromNode(workerNode);
SetUpDistributedTableWithDependencies(workerNode); SetUpDistributedTableWithDependencies(workerNode);
SetUpMultipleDistributedTableIntegrations(workerNode);
SetUpObjectMetadata(workerNode);
} }
else if (ReplicateReferenceTablesOnActivate) else if (ReplicateReferenceTablesOnActivate)
{ {
// We only need to replicate reference table to the coordinator node
ReplicateAllReferenceTablesToNode(workerNode->workerName, ReplicateAllReferenceTablesToNode(workerNode->workerName,
workerNode->workerPort); workerNode->workerPort);
}
}
if (syncMetadata)
{
StartMetadataSyncToNode(nodeName, nodePort);
if (workerNode->groupId != COORDINATOR_GROUP_ID)
{
SetUpMultipleDistributedTableIntegrations(workerNode);
SetUpObjectMetadata(workerNode);
} }
} }

View File

@ -371,7 +371,7 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort)
nodePort))); nodePort)));
EnsureNoModificationsHaveBeenDone(); EnsureNoModificationsHaveBeenDone();
SendCommandListToWorkerOutsideTransaction(nodeName, nodePort, tableOwner, SendMetadataCommandListToWorkerInCoordinatedTransaction(nodeName, nodePort, tableOwner,
ddlCommandList); ddlCommandList);
int32 groupId = GroupForNode(nodeName, nodePort); int32 groupId = GroupForNode(nodeName, nodePort);
@ -596,8 +596,10 @@ ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort)
char *tableOwner = TableOwner(shardInterval->relationId); char *tableOwner = TableOwner(shardInterval->relationId);
List *commandList = CopyShardForeignConstraintCommandList(shardInterval); List *commandList = CopyShardForeignConstraintCommandList(shardInterval);
SendCommandListToWorkerOutsideTransaction(nodeName, nodePort, tableOwner, SendMetadataCommandListToWorkerInCoordinatedTransaction(nodeName,
commandList); nodePort,
tableOwner,
commandList);
} }
} }
} }

View File

@ -78,6 +78,9 @@ SELECT master_get_active_worker_nodes();
-- try to disable a node which does not exist and see that an error is thrown -- try to disable a node which does not exist and see that an error is thrown
SELECT citus_disable_node('localhost.noexist', 2345); SELECT citus_disable_node('localhost.noexist', 2345);
table pg_dist_node;
\d
-- drop the table without leaving a shard placement behind (messes up other tests) -- drop the table without leaving a shard placement behind (messes up other tests)
SELECT master_activate_node('localhost', :worker_2_port); SELECT master_activate_node('localhost', :worker_2_port);