Merge pull request #5960 from citusdata/parallelize_node_activate

Parallelize metadata syncing on node activate
pull/5965/head
Önder Kalacı 2022-05-23 09:25:11 +02:00 committed by GitHub
commit 2a768176c4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 567 additions and 242 deletions

View File

@ -139,6 +139,7 @@ static char * RemoteTypeIdExpression(Oid typeId);
static char * RemoteCollationIdExpression(Oid colocationId);
PG_FUNCTION_INFO_V1(start_metadata_sync_to_all_nodes);
PG_FUNCTION_INFO_V1(start_metadata_sync_to_node);
PG_FUNCTION_INFO_V1(stop_metadata_sync_to_node);
PG_FUNCTION_INFO_V1(worker_record_sequence_dependency);
@ -195,6 +196,33 @@ start_metadata_sync_to_node(PG_FUNCTION_ARGS)
}
/*
* start_metadata_sync_to_all_nodes function sets hasmetadata column of
* all the primary worker nodes to true, and then activate nodes without
* replicating reference tables.
*/
Datum
start_metadata_sync_to_all_nodes(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
EnsureSuperUser();
EnsureCoordinator();
List *workerNodes = ActivePrimaryNonCoordinatorNodeList(RowShareLock);
bool prevReplicateRefTablesOnActivate = ReplicateReferenceTablesOnActivate;
SetLocalReplicateReferenceTablesOnActivate(false);
ActivateNodeList(workerNodes);
TransactionModifiedNodeMetadata = true;
SetLocalReplicateReferenceTablesOnActivate(prevReplicateRefTablesOnActivate);
PG_RETURN_BOOL(true);
}
/*
* SyncNodeMetadataToNode is the internal API for
* start_metadata_sync_to_node().
@ -543,10 +571,10 @@ SyncNodeMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError)
*/
if (raiseOnError)
{
SendMetadataCommandListToWorkerInCoordinatedTransaction(workerNode->workerName,
workerNode->workerPort,
currentUser,
recreateMetadataSnapshotCommandList);
SendMetadataCommandListToWorkerListInCoordinatedTransaction(list_make1(
workerNode),
currentUser,
recreateMetadataSnapshotCommandList);
return true;
}
else

View File

@ -106,9 +106,9 @@ static void InsertPlaceholderCoordinatorRecord(void);
static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetadata
*nodeMetadata);
static void DeleteNodeRow(char *nodename, int32 nodeport);
static void SyncDistributedObjectsToNode(WorkerNode *workerNode);
static void SyncDistributedObjectsToNodeList(List *workerNodeList);
static void UpdateLocalGroupIdOnNode(WorkerNode *workerNode);
static void SyncPgDistTableMetadataToNode(WorkerNode *workerNode);
static void SyncPgDistTableMetadataToNodeList(List *nodeList);
static List * InterTableRelationshipCommandList();
static void BlockDistributedQueriesOnMetadataNodes(void);
static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple);
@ -843,7 +843,7 @@ SyncDistributedObjectsCommandList(WorkerNode *workerNode)
/*
* SyncDistributedObjectsToNode sync the distributed objects to the node. It includes
* SyncDistributedObjectsToNodeList sync the distributed objects to the node. It includes
* - All dependencies (e.g., types, schemas, sequences)
* - All shell distributed table
* - Inter relation between those shell tables
@ -852,17 +852,29 @@ SyncDistributedObjectsCommandList(WorkerNode *workerNode)
* since all the dependencies should be present in the coordinator already.
*/
static void
SyncDistributedObjectsToNode(WorkerNode *workerNode)
SyncDistributedObjectsToNodeList(List *workerNodeList)
{
if (NodeIsCoordinator(workerNode))
List *workerNodesToSync = NIL;
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodeList)
{
/* coordinator has all the objects */
return;
if (NodeIsCoordinator(workerNode))
{
/* coordinator has all the objects */
continue;
}
if (!NodeIsPrimary(workerNode))
{
/* secondary nodes gets the objects from their primaries via replication */
continue;
}
workerNodesToSync = lappend(workerNodesToSync, workerNode);
}
if (!NodeIsPrimary(workerNode))
if (workerNodesToSync == NIL)
{
/* secondary nodes gets the objects from their primaries via replication */
return;
}
@ -874,9 +886,8 @@ SyncDistributedObjectsToNode(WorkerNode *workerNode)
/* send commands to new workers, the current user should be a superuser */
Assert(superuser());
SendMetadataCommandListToWorkerInCoordinatedTransaction(
workerNode->workerName,
workerNode->workerPort,
SendMetadataCommandListToWorkerListInCoordinatedTransaction(
workerNodesToSync,
CurrentUserName(),
commandList);
}
@ -894,9 +905,8 @@ UpdateLocalGroupIdOnNode(WorkerNode *workerNode)
/* send commands to new workers, the current user should be a superuser */
Assert(superuser());
SendMetadataCommandListToWorkerInCoordinatedTransaction(
workerNode->workerName,
workerNode->workerPort,
SendMetadataCommandListToWorkerListInCoordinatedTransaction(
list_make1(workerNode),
CurrentUserName(),
commandList);
}
@ -904,25 +914,33 @@ UpdateLocalGroupIdOnNode(WorkerNode *workerNode)
/*
* SyncPgDistTableMetadataToNode syncs the pg_dist_partition, pg_dist_shard
* SyncPgDistTableMetadataToNodeList syncs the pg_dist_partition, pg_dist_shard
* pg_dist_placement and pg_dist_object metadata entries.
*
*/
static void
SyncPgDistTableMetadataToNode(WorkerNode *workerNode)
SyncPgDistTableMetadataToNodeList(List *nodeList)
{
if (NodeIsPrimary(workerNode) && !NodeIsCoordinator(workerNode))
{
List *syncPgDistMetadataCommandList = PgDistTableMetadataSyncCommandList();
/* send commands to new workers, the current user should be a superuser */
Assert(superuser());
/* send commands to new workers, the current user should be a superuser */
Assert(superuser());
SendMetadataCommandListToWorkerInCoordinatedTransaction(
workerNode->workerName,
workerNode->workerPort,
CurrentUserName(),
syncPgDistMetadataCommandList);
List *syncPgDistMetadataCommandList = PgDistTableMetadataSyncCommandList();
List *nodesWithMetadata = NIL;
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, nodeList)
{
if (NodeIsPrimary(workerNode) && !NodeIsCoordinator(workerNode))
{
nodesWithMetadata = lappend(nodesWithMetadata, workerNode);
}
}
SendMetadataCommandListToWorkerListInCoordinatedTransaction(
nodesWithMetadata,
CurrentUserName(),
syncPgDistMetadataCommandList);
}
@ -1118,15 +1136,14 @@ PrimaryNodeForGroup(int32 groupId, bool *groupContainsNodes)
/*
* ActivateNode activates the node with nodeName and nodePort. Currently, activation
* includes only replicating the reference tables and setting isactive column of the
* given node.
* ActivateNodeList iterates over the nodeList and activates the nodes.
* Some part of the node activation is done parallel across the nodes,
* such as syncing the metadata. However, reference table replication is
* done one by one across nodes.
*/
int
ActivateNode(char *nodeName, int nodePort)
void
ActivateNodeList(List *nodeList)
{
bool isActive = true;
/*
* We currently require the object propagation to happen via superuser,
* see #5139. While activating a node, we sync both metadata and object
@ -1143,86 +1160,130 @@ ActivateNode(char *nodeName, int nodePort)
/* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */
LockRelationOid(DistNodeRelationId(), ExclusiveLock);
/*
* First, locally mark the node is active, if everything goes well,
* we are going to sync this information to all the metadata nodes.
*/
WorkerNode *workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort);
if (workerNode == NULL)
{
ereport(ERROR, (errmsg("node at \"%s:%u\" does not exist", nodeName, nodePort)));
}
/*
* Delete existing reference and replicated table placements on the
* given groupId if the group has been disabled earlier (e.g., isActive
* set to false).
*
* Sync the metadata changes to all existing metadata nodes irrespective
* of the current nodes' metadata sync state. We expect all nodes up
* and running when another node is activated.
*/
if (!workerNode->isActive && NodeIsPrimary(workerNode))
{
bool localOnly = false;
DeleteAllReplicatedTablePlacementsFromNodeGroup(workerNode->groupId,
localOnly);
}
workerNode =
SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_isactive,
BoolGetDatum(isActive));
/* TODO: Once all tests will be enabled for MX, we can remove sync by default check */
bool syncMetadata = EnableMetadataSync && NodeIsPrimary(workerNode);
if (syncMetadata)
List *nodeToSyncMetadata = NIL;
WorkerNode *node = NULL;
foreach_ptr(node, nodeList)
{
/*
* We are going to sync the metadata anyway in this transaction, so do
* not fail just because the current metadata is not synced.
* First, locally mark the node is active, if everything goes well,
* we are going to sync this information to all the metadata nodes.
*/
SetWorkerColumn(workerNode, Anum_pg_dist_node_metadatasynced,
BoolGetDatum(true));
/*
* Update local group id first, as object dependency logic requires to have
* updated local group id.
*/
UpdateLocalGroupIdOnNode(workerNode);
/*
* Sync distributed objects first. We must sync distributed objects before
* replicating reference tables to the remote node, as reference tables may
* need such objects.
*/
SyncDistributedObjectsToNode(workerNode);
/*
* We need to replicate reference tables before syncing node metadata, otherwise
* reference table replication logic would try to get lock on the new node before
* having the shard placement on it
*/
if (ReplicateReferenceTablesOnActivate)
WorkerNode *workerNode =
FindWorkerNodeAnyCluster(node->workerName, node->workerPort);
if (workerNode == NULL)
{
ReplicateAllReferenceTablesToNode(workerNode);
ereport(ERROR, (errmsg("node at \"%s:%u\" does not exist", node->workerName,
node->workerPort)));
}
/*
* Sync node metadata. We must sync node metadata before syncing table
* related pg_dist_xxx metadata. Since table related metadata requires
* to have right pg_dist_node entries.
*/
SyncNodeMetadataToNode(nodeName, nodePort);
/* both nodes should be the same */
Assert(workerNode->nodeId == node->nodeId);
/*
* As the last step, sync the table related metadata to the remote node.
* We must handle it as the last step because of limitations shared with
* above comments.
* Delete existing reference and replicated table placements on the
* given groupId if the group has been disabled earlier (e.g., isActive
* set to false).
*
* Sync the metadata changes to all existing metadata nodes irrespective
* of the current nodes' metadata sync state. We expect all nodes up
* and running when another node is activated.
*/
SyncPgDistTableMetadataToNode(workerNode);
if (!workerNode->isActive && NodeIsPrimary(workerNode))
{
bool localOnly = false;
DeleteAllReplicatedTablePlacementsFromNodeGroup(workerNode->groupId,
localOnly);
}
workerNode =
SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_isactive,
BoolGetDatum(true));
/* TODO: Once all tests will be enabled for MX, we can remove sync by default check */
bool syncMetadata = EnableMetadataSync && NodeIsPrimary(workerNode);
if (syncMetadata)
{
/*
* We are going to sync the metadata anyway in this transaction, so do
* not fail just because the current metadata is not synced.
*/
SetWorkerColumn(workerNode, Anum_pg_dist_node_metadatasynced,
BoolGetDatum(true));
/*
* Update local group id first, as object dependency logic requires to have
* updated local group id.
*/
UpdateLocalGroupIdOnNode(workerNode);
nodeToSyncMetadata = lappend(nodeToSyncMetadata, workerNode);
}
}
/*
* Sync distributed objects first. We must sync distributed objects before
* replicating reference tables to the remote node, as reference tables may
* need such objects.
*/
SyncDistributedObjectsToNodeList(nodeToSyncMetadata);
if (ReplicateReferenceTablesOnActivate)
{
foreach_ptr(node, nodeList)
{
/*
* We need to replicate reference tables before syncing node metadata, otherwise
* reference table replication logic would try to get lock on the new node before
* having the shard placement on it
*/
if (NodeIsPrimary(node))
{
ReplicateAllReferenceTablesToNode(node);
}
}
}
/*
* Sync node metadata. We must sync node metadata before syncing table
* related pg_dist_xxx metadata. Since table related metadata requires
* to have right pg_dist_node entries.
*/
foreach_ptr(node, nodeToSyncMetadata)
{
SyncNodeMetadataToNode(node->workerName, node->workerPort);
}
/*
* As the last step, sync the table related metadata to the remote node.
* We must handle it as the last step because of limitations shared with
* above comments.
*/
SyncPgDistTableMetadataToNodeList(nodeToSyncMetadata);
foreach_ptr(node, nodeList)
{
bool isActive = true;
/* finally, let all other active metadata nodes to learn about this change */
SetNodeState(node->workerName, node->workerPort, isActive);
}
}
/*
* ActivateNode activates the node with nodeName and nodePort. Currently, activation
* includes only replicating the reference tables and setting isactive column of the
* given node.
*/
int
ActivateNode(char *nodeName, int nodePort)
{
bool isActive = true;
WorkerNode *workreNode = ModifiableWorkerNode(nodeName, nodePort);
ActivateNodeList(list_make1(workreNode));
/* finally, let all other active metadata nodes to learn about this change */
WorkerNode *newWorkerNode = SetNodeState(nodeName, nodePort, isActive);
Assert(newWorkerNode->nodeId == workerNode->nodeId);

View File

@ -3,3 +3,5 @@
#include "udfs/citus_is_coordinator/11.0-2.sql"
#include "udfs/citus_disable_node/11.0-2.sql"
#include "udfs/run_command_on_coordinator/11.0-2.sql"
#include "udfs/start_metadata_sync_to_all_nodes/11.0-2.sql"
#include "udfs/citus_finalize_upgrade_to_citus11/11.0-2.sql"

View File

@ -13,3 +13,6 @@ REVOKE ALL ON FUNCTION pg_catalog.citus_disable_node(text,int, bool) FROM PUBLIC
DROP FUNCTION pg_catalog.citus_is_coordinator();
DROP FUNCTION pg_catalog.run_command_on_coordinator(text,boolean);
DROP FUNCTION pg_catalog.start_metadata_sync_to_all_nodes();
DROP FUNCTION pg_catalog.citus_finalize_upgrade_to_citus11(boolean);

View File

@ -0,0 +1,221 @@
-- citus_finalize_upgrade_to_citus11() is a helper UDF ensures
-- the upgrade to Citus 11 is finished successfully. Upgrade to
-- Citus 11 requires all active primary worker nodes to get the
-- metadata. And, this function's job is to sync the metadata to
-- the nodes that does not already have
-- once the function finishes without any errors and returns true
-- the cluster is ready for running distributed queries from
-- the worker nodes. When debug is enabled, the function provides
-- more information to the user.
CREATE OR REPLACE FUNCTION pg_catalog.citus_finalize_upgrade_to_citus11(enforce_version_check bool default true)
RETURNS bool
LANGUAGE plpgsql
AS $$
BEGIN
---------------------------------------------
-- This script consists of N stages
-- Each step is documented, and if log level
-- is reduced to DEBUG1, each step is logged
-- as well
---------------------------------------------
------------------------------------------------------------------------------------------
-- STAGE 0: Ensure no concurrent node metadata changing operation happens while this
-- script is running via acquiring a strong lock on the pg_dist_node
------------------------------------------------------------------------------------------
BEGIN
LOCK TABLE pg_dist_node IN EXCLUSIVE MODE NOWAIT;
EXCEPTION WHEN OTHERS THEN
RAISE 'Another node metadata changing operation is in progress, try again.';
END;
------------------------------------------------------------------------------------------
-- STAGE 1: We want all the commands to run in the same transaction block. Without
-- sequential mode, metadata syncing cannot be done in a transaction block along with
-- other commands
------------------------------------------------------------------------------------------
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
------------------------------------------------------------------------------------------
-- STAGE 2: Ensure we have the prerequisites
-- (a) only superuser can run this script
-- (b) cannot be executed when enable_ddl_propagation is False
-- (c) can only be executed from the coordinator
------------------------------------------------------------------------------------------
DECLARE
is_superuser_running boolean := False;
enable_ddl_prop boolean:= False;
local_group_id int := 0;
BEGIN
SELECT rolsuper INTO is_superuser_running FROM pg_roles WHERE rolname = current_user;
IF is_superuser_running IS NOT True THEN
RAISE EXCEPTION 'This operation can only be initiated by superuser';
END IF;
SELECT current_setting('citus.enable_ddl_propagation') INTO enable_ddl_prop;
IF enable_ddl_prop IS NOT True THEN
RAISE EXCEPTION 'This operation cannot be completed when citus.enable_ddl_propagation is False.';
END IF;
SELECT groupid INTO local_group_id FROM pg_dist_local_group;
IF local_group_id != 0 THEN
RAISE EXCEPTION 'Operation is not allowed on this node. Connect to the coordinator and run it again.';
ELSE
RAISE DEBUG 'We are on the coordinator, continue to sync metadata';
END IF;
END;
------------------------------------------------------------------------------------------
-- STAGE 3: Ensure all primary nodes are active
------------------------------------------------------------------------------------------
DECLARE
primary_disabled_worker_node_count int := 0;
BEGIN
SELECT count(*) INTO primary_disabled_worker_node_count FROM pg_dist_node
WHERE groupid != 0 AND noderole = 'primary' AND NOT isactive;
IF primary_disabled_worker_node_count != 0 THEN
RAISE EXCEPTION 'There are inactive primary worker nodes, you need to activate the nodes first.'
'Use SELECT citus_activate_node() to activate the disabled nodes';
ELSE
RAISE DEBUG 'There are no disabled worker nodes, continue to sync metadata';
END IF;
END;
------------------------------------------------------------------------------------------
-- STAGE 4: Ensure there is no connectivity issues in the cluster
------------------------------------------------------------------------------------------
DECLARE
all_nodes_can_connect_to_each_other boolean := False;
BEGIN
SELECT bool_and(coalesce(result, false)) INTO all_nodes_can_connect_to_each_other FROM citus_check_cluster_node_health();
IF all_nodes_can_connect_to_each_other != True THEN
RAISE EXCEPTION 'There are unhealth primary nodes, you need to ensure all '
'nodes are up and runnnig. Also, make sure that all nodes can connect '
'to each other. Use SELECT * FROM citus_check_cluster_node_health(); '
'to check the cluster health';
ELSE
RAISE DEBUG 'Cluster is healthy, all nodes can connect to each other';
END IF;
END;
------------------------------------------------------------------------------------------
-- STAGE 5: Ensure all nodes are on the same version
------------------------------------------------------------------------------------------
DECLARE
coordinator_version text := '';
worker_node_version text := '';
worker_node_version_count int := 0;
BEGIN
SELECT extversion INTO coordinator_version from pg_extension WHERE extname = 'citus';
-- first, check if all nodes have the same versions
SELECT
count(distinct result) INTO worker_node_version_count
FROM
run_command_on_workers('SELECT extversion from pg_extension WHERE extname = ''citus''');
IF enforce_version_check AND worker_node_version_count != 1 THEN
RAISE EXCEPTION 'All nodes should have the same Citus version installed. Currently '
'some of the workers have different versions.';
ELSE
RAISE DEBUG 'All worker nodes have the same Citus version';
END IF;
-- second, check if all nodes have the same versions
SELECT
result INTO worker_node_version
FROM
run_command_on_workers('SELECT extversion from pg_extension WHERE extname = ''citus'';')
GROUP BY result;
IF enforce_version_check AND coordinator_version != worker_node_version THEN
RAISE EXCEPTION 'All nodes should have the same Citus version installed. Currently '
'the coordinator has version % and the worker(s) has %',
coordinator_version, worker_node_version;
ELSE
RAISE DEBUG 'All nodes have the same Citus version';
END IF;
END;
------------------------------------------------------------------------------------------
-- STAGE 6: Ensure all the partitioned tables have the proper naming structure
-- As described on https://github.com/citusdata/citus/issues/4962
-- existing indexes on partitioned distributed tables can collide
-- with the index names exists on the shards
-- luckily, we know how to fix it.
-- And, note that we should do this even if the cluster is a basic plan
-- (e.g., single node Citus) such that when cluster scaled out, everything
-- works as intended
-- And, this should be done only ONCE for a cluster as it can be a pretty
-- time consuming operation. Thus, even if the function is called multiple time,
-- we keep track of it and do not re-execute this part if not needed.
------------------------------------------------------------------------------------------
DECLARE
partitioned_table_exists_pre_11 boolean:=False;
BEGIN
-- we recorded if partitioned tables exists during upgrade to Citus 11
SELECT metadata->>'partitioned_citus_table_exists_pre_11' INTO partitioned_table_exists_pre_11
FROM pg_dist_node_metadata;
IF partitioned_table_exists_pre_11 IS NOT NULL AND partitioned_table_exists_pre_11 THEN
-- this might take long depending on the number of partitions and shards...
RAISE NOTICE 'Preparing all the existing partitioned table indexes';
PERFORM pg_catalog.fix_all_partition_shard_index_names();
-- great, we are done with fixing the existing wrong index names
-- so, lets remove this
UPDATE pg_dist_node_metadata
SET metadata=jsonb_delete(metadata, 'partitioned_citus_table_exists_pre_11');
ELSE
RAISE DEBUG 'There are no partitioned tables that should be fixed';
END IF;
END;
------------------------------------------------------------------------------------------
-- STAGE 7: Return early if there are no primary worker nodes
-- We don't strictly need this step, but it gives a nicer notice message
------------------------------------------------------------------------------------------
DECLARE
primary_worker_node_count bigint :=0;
BEGIN
SELECT count(*) INTO primary_worker_node_count FROM pg_dist_node WHERE groupid != 0 AND noderole = 'primary';
IF primary_worker_node_count = 0 THEN
RAISE NOTICE 'There are no primary worker nodes, no need to sync metadata to any node';
RETURN true;
ELSE
RAISE DEBUG 'There are % primary worker nodes, continue to sync metadata', primary_worker_node_count;
END IF;
END;
------------------------------------------------------------------------------------------
-- STAGE 8: Do the actual metadata & object syncing to the worker nodes
-- For the "already synced" metadata nodes, we do not strictly need to
-- sync the objects & metadata, but there is no harm to do it anyway
-- it'll only cost some execution time but makes sure that we have a
-- a consistent metadata & objects across all the nodes
------------------------------------------------------------------------------------------
DECLARE
BEGIN
-- this might take long depending on the number of tables & objects ...
RAISE NOTICE 'Preparing to sync the metadata to all nodes';
PERFORM start_metadata_sync_to_all_nodes();
END;
RETURN true;
END;
$$;
COMMENT ON FUNCTION pg_catalog.citus_finalize_upgrade_to_citus11(bool)
IS 'finalizes upgrade to Citus';
REVOKE ALL ON FUNCTION pg_catalog.citus_finalize_upgrade_to_citus11(bool) FROM PUBLIC;

View File

@ -209,9 +209,7 @@ END;
-- this might take long depending on the number of tables & objects ...
RAISE NOTICE 'Preparing to sync the metadata to all nodes';
PERFORM start_metadata_sync_to_node(nodename,nodeport)
FROM
pg_dist_node WHERE groupid != 0 AND noderole = 'primary';
PERFORM start_metadata_sync_to_all_nodes();
END;
RETURN true;

View File

@ -0,0 +1,9 @@
CREATE OR REPLACE FUNCTION pg_catalog.start_metadata_sync_to_all_nodes()
RETURNS bool
LANGUAGE C
STRICT
AS 'MODULE_PATHNAME', $$start_metadata_sync_to_all_nodes$$;
COMMENT ON FUNCTION pg_catalog.start_metadata_sync_to_all_nodes()
IS 'sync metadata to all active primary nodes';
REVOKE ALL ON FUNCTION pg_catalog.start_metadata_sync_to_all_nodes() FROM PUBLIC;

View File

@ -0,0 +1,9 @@
CREATE OR REPLACE FUNCTION pg_catalog.start_metadata_sync_to_all_nodes()
RETURNS bool
LANGUAGE C
STRICT
AS 'MODULE_PATHNAME', $$start_metadata_sync_to_all_nodes$$;
COMMENT ON FUNCTION pg_catalog.start_metadata_sync_to_all_nodes()
IS 'sync metadata to all active primary nodes';
REVOKE ALL ON FUNCTION pg_catalog.start_metadata_sync_to_all_nodes() FROM PUBLIC;

View File

@ -490,27 +490,70 @@ SendCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort,
* coordinated transaction. Any failures aborts the coordinated transaction.
*/
void
SendMetadataCommandListToWorkerInCoordinatedTransaction(const char *nodeName,
int32 nodePort,
const char *nodeUser,
List *commandList)
SendMetadataCommandListToWorkerListInCoordinatedTransaction(List *workerNodeList,
const char *nodeUser,
List *commandList)
{
int connectionFlags = REQUIRE_METADATA_CONNECTION;
if (list_length(commandList) == 0 || list_length(workerNodeList) == 0)
{
/* nothing to do */
return;
}
UseCoordinatedTransaction();
MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags,
nodeName, nodePort,
nodeUser, NULL);
List *connectionList = NIL;
MarkRemoteTransactionCritical(workerConnection);
RemoteTransactionBeginIfNecessary(workerConnection);
/* iterate over the commands and execute them in the same connection */
const char *commandString = NULL;
foreach_ptr(commandString, commandList)
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodeList)
{
ExecuteCriticalRemoteCommand(workerConnection, commandString);
const char *nodeName = workerNode->workerName;
int nodePort = workerNode->workerPort;
int connectionFlags = REQUIRE_METADATA_CONNECTION;
MultiConnection *connection =
StartNodeConnection(connectionFlags, nodeName, nodePort);
MarkRemoteTransactionCritical(connection);
/*
* connection can only be NULL for optional connections, which we don't
* support in this codepath.
*/
Assert((connectionFlags & OPTIONAL_CONNECTION) == 0);
Assert(connection != NULL);
connectionList = lappend(connectionList, connection);
}
FinishConnectionListEstablishment(connectionList);
/* must open transaction blocks to use intermediate results */
RemoteTransactionsBeginIfNecessary(connectionList);
/*
* In order to avoid round-trips per query in queryStringList,
* we join the string and send as a single command. Also,
* if there is only a single command, avoid additional call to
* StringJoin given that some strings can be quite large.
*/
char *stringToSend = (list_length(commandList) == 1) ?
linitial(commandList) : StringJoin(commandList, ';');
/* send commands in parallel */
bool failOnError = true;
MultiConnection *connection = NULL;
foreach_ptr(connection, connectionList)
{
int querySent = SendRemoteCommand(connection, stringToSend);
if (querySent == 0)
{
ReportConnectionError(connection, ERROR);
}
}
foreach_ptr(connection, connectionList)
{
ClearResults(connection, failOnError);
}
}

View File

@ -356,9 +356,10 @@ ReplicateReferenceTableShardToNode(ShardInterval *shardInterval, char *nodeName,
/* send commands to new workers, the current user should be a superuser */
Assert(superuser());
SendMetadataCommandListToWorkerInCoordinatedTransaction(nodeName, nodePort,
CurrentUserName(),
ddlCommandList);
WorkerNode *workerNode = FindWorkerNode(nodeName, nodePort);
SendMetadataCommandListToWorkerListInCoordinatedTransaction(list_make1(workerNode),
CurrentUserName(),
ddlCommandList);
int32 groupId = GroupForNode(nodeName, nodePort);
uint64 placementId = GetNextPlacementId();
@ -599,9 +600,8 @@ ReplicateAllReferenceTablesToNode(WorkerNode *workerNode)
/* send commands to new workers, the current user should be a superuser */
Assert(superuser());
SendMetadataCommandListToWorkerInCoordinatedTransaction(
workerNode->workerName,
workerNode->workerPort,
SendMetadataCommandListToWorkerListInCoordinatedTransaction(
list_make1(workerNode),
CurrentUserName(),
commandList);
}

View File

@ -1296,9 +1296,6 @@ AcquireDistributedLockOnRelations_Internal(List *lockRelationRecordList,
const char *currentUser = CurrentUserName();
foreach_ptr(workerNode, workerNodeList)
{
const char *nodeName = workerNode->workerName;
int nodePort = workerNode->workerPort;
/* if local node is one of the targets, acquire the lock locally */
if (workerNode->groupId == localGroupId)
{
@ -1306,9 +1303,11 @@ AcquireDistributedLockOnRelations_Internal(List *lockRelationRecordList,
continue;
}
SendMetadataCommandListToWorkerInCoordinatedTransaction(nodeName, nodePort,
currentUser, list_make1(
lockCommand));
SendMetadataCommandListToWorkerListInCoordinatedTransaction(list_make1(
workerNode),
currentUser,
list_make1(
lockCommand));
}
}

View File

@ -63,6 +63,7 @@ extern char *WorkerListFileName;
extern char *CurrentCluster;
extern bool ReplicateReferenceTablesOnActivate;
extern void ActivateNodeList(List *nodeList);
extern int ActivateNode(char *nodeName, int nodePort);
/* Function declarations for finding worker nodes to place shards on */

View File

@ -74,10 +74,11 @@ extern void SendCommandListToWorkerOutsideTransaction(const char *nodeName,
int32 nodePort,
const char *nodeUser,
List *commandList);
extern void SendMetadataCommandListToWorkerInCoordinatedTransaction(const char *nodeName,
int32 nodePort,
const char *nodeUser,
List *commandList);
extern void SendMetadataCommandListToWorkerListInCoordinatedTransaction(
List *workerNodeList,
const char *
nodeUser,
List *commandList);
extern void SendCommandToWorkersOptionalInParallel(TargetWorkerSet targetWorkerSet,
const char *command,
const char *user);

View File

@ -106,10 +106,11 @@ SELECT citus.mitmproxy('conn.onQuery(query="CREATE SCHEMA").kill()');
(1 row)
SELECT master_activate_node('localhost', :worker_2_proxy_port);
ERROR: server closed the connection unexpectedly
WARNING: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
CONTEXT: while executing command on localhost:xxxxx
ERROR: failure on connection marked as essential: localhost:xxxxx
-- verify node is not activated
SELECT * FROM master_get_active_worker_nodes()
ORDER BY 1, 2;

View File

@ -13,6 +13,8 @@ SELECT citus.mitmproxy('conn.allow()');
(1 row)
\set VERBOSITY terse
SET client_min_messages TO ERROR;
CREATE TABLE t1 (id int PRIMARY KEY);
SELECT create_distributed_table('t1', 'id');
create_distributed_table
@ -23,7 +25,6 @@ SELECT create_distributed_table('t1', 'id');
INSERT INTO t1 SELECT x FROM generate_series(1,100) AS f(x);
-- Initially turn metadata sync off because we'll ingest errors to start/stop metadata sync operations
SELECT stop_metadata_sync_to_node('localhost', :worker_2_proxy_port);
NOTICE: dropping metadata on the node (localhost,9060)
stop_metadata_sync_to_node
---------------------------------------------------------------------
@ -51,12 +52,9 @@ SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE pg_dist_local_group SET grou
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
CONTEXT: while executing command on localhost:xxxxx
ERROR: failure on connection marked as essential: localhost:xxxxx
-- Failure to drop all tables in pg_dist_partition
SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM pg_dist_partition").cancel(' || :pid || ')');
SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM pg_dist_partition").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
@ -64,19 +62,16 @@ SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM pg_dist_partition").can
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM pg_dist_partition").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM pg_dist_partition").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
CONTEXT: while executing command on localhost:xxxxx
ERROR: failure on connection marked as essential: localhost:xxxxx
-- Failure to delete pg_dist_node entries from the worker
SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM pg_dist_node").cancel(' || :pid || ')');
SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM pg_dist_node").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
@ -84,19 +79,16 @@ SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM pg_dist_node").cancel('
SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM pg_dist_node").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM pg_dist_node").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
ERROR: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
CONTEXT: while executing command on localhost:xxxxx
ERROR: failure on connection marked as essential: localhost:xxxxx
-- Failure to populate pg_dist_node in the worker
SELECT citus.mitmproxy('conn.onQuery(query="^INSERT INTO pg_dist_node").cancel(' || :pid || ')');
SELECT citus.mitmproxy('conn.onQuery(query="INSERT INTO pg_dist_node").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
@ -104,17 +96,14 @@ SELECT citus.mitmproxy('conn.onQuery(query="^INSERT INTO pg_dist_node").cancel('
SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
SELECT citus.mitmproxy('conn.onQuery(query="^INSERT INTO pg_dist_node").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="INSERT INTO pg_dist_node").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
ERROR: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
CONTEXT: while executing command on localhost:xxxxx
ERROR: failure on connection marked as essential: localhost:xxxxx
-- Verify that coordinator knows worker does not have valid metadata
SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_proxy_port;
hasmetadata
@ -153,7 +142,6 @@ SELECT create_distributed_table('t2', 'id');
ERROR: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
CONTEXT: while executing command on localhost:xxxxx
SELECT citus.mitmproxy('conn.onParse(query="citus_internal_add_shard_metadata").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
@ -179,7 +167,6 @@ SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE pg_dist_local_group SET grou
(1 row)
SELECT stop_metadata_sync_to_node('localhost', :worker_2_proxy_port);
NOTICE: dropping metadata on the node (localhost,9060)
ERROR: canceling statement due to user request
SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE pg_dist_local_group SET groupid").kill()');
mitmproxy
@ -188,90 +175,27 @@ SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE pg_dist_local_group SET grou
(1 row)
SELECT stop_metadata_sync_to_node('localhost', :worker_2_proxy_port);
NOTICE: dropping metadata on the node (localhost,9060)
WARNING: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
CONTEXT: while executing command on localhost:xxxxx
WARNING: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
connection not open
CONTEXT: while executing command on localhost:xxxxx
WARNING: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
connection not open
connection not open
CONTEXT: while executing command on localhost:xxxxx
WARNING: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
connection not open
connection not open
connection not open
CONTEXT: while executing command on localhost:xxxxx
WARNING: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
connection not open
connection not open
connection not open
connection not open
CONTEXT: while executing command on localhost:xxxxx
stop_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
-- Failure to delete pg_dist_node entries from the worker
SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM pg_dist_node").cancel(' || :pid || ')');
SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM pg_dist_node").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT stop_metadata_sync_to_node('localhost', :worker_2_proxy_port);
NOTICE: dropping metadata on the node (localhost,9060)
ERROR: canceling statement due to user request
SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM pg_dist_node").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM pg_dist_node").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT stop_metadata_sync_to_node('localhost', :worker_2_proxy_port);
NOTICE: dropping metadata on the node (localhost,9060)
WARNING: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
CONTEXT: while executing command on localhost:xxxxx
WARNING: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
connection not open
CONTEXT: while executing command on localhost:xxxxx
WARNING: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
connection not open
connection not open
CONTEXT: while executing command on localhost:xxxxx
WARNING: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
connection not open
connection not open
connection not open
CONTEXT: while executing command on localhost:xxxxx
WARNING: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
connection not open
connection not open
connection not open
connection not open
CONTEXT: while executing command on localhost:xxxxx
stop_metadata_sync_to_node
---------------------------------------------------------------------

View File

@ -1191,6 +1191,20 @@ WHERE logicalrelid = 'test_dist_non_colocated'::regclass GROUP BY nodeport ORDER
SELECT * from master_set_node_property('localhost', :worker_2_port, 'bogusproperty', false);
ERROR: only the 'shouldhaveshards' property can be set using this function
DROP TABLE test_dist, test_ref, test_dist_colocated, test_dist_non_colocated;
BEGIN;
SELECT start_metadata_sync_to_all_nodes();
start_metadata_sync_to_all_nodes
---------------------------------------------------------------------
t
(1 row)
COMMIT;
SELECT start_metadata_sync_to_all_nodes();
start_metadata_sync_to_all_nodes
---------------------------------------------------------------------
t
(1 row)
-- verify that at the end of this file, all primary nodes have metadata synced
SELECT bool_and(hasmetadata) AND bool_and(metadatasynced) FROM pg_dist_node WHERE isactive = 't' and noderole = 'primary';
?column?

View File

@ -1038,7 +1038,8 @@ SELECT * FROM multi_extension.print_extension_changes();
---------------------------------------------------------------------
| function citus_is_coordinator() boolean
| function run_command_on_coordinator(text,boolean) SETOF record
(2 rows)
| function start_metadata_sync_to_all_nodes() boolean
(3 rows)
-- Test downgrade script (result should be empty)
ALTER EXTENSION citus UPDATE TO '11.0-1';

View File

@ -218,8 +218,9 @@ SELECT run_command_on_workers($$SELECT extversion FROM pg_extension WHERE extnam
-- adding the second node will fail as the text search template needs to be created manually
SELECT 1 from master_add_node('localhost', :worker_2_port);
ERROR: text search template "public.intdict_template" does not exist
WARNING: text search template "public.intdict_template" does not exist
CONTEXT: while executing command on localhost:xxxxx
ERROR: failure on connection marked as essential: localhost:xxxxx
-- create the text search template manually on the worker
\c - - - :worker_2_port
SET citus.enable_metadata_sync TO false;

View File

@ -205,6 +205,7 @@ ORDER BY 1;
function run_command_on_shards(regclass,text,boolean)
function run_command_on_workers(text,boolean)
function shard_name(regclass,bigint)
function start_metadata_sync_to_all_nodes()
function start_metadata_sync_to_node(text,integer)
function stop_metadata_sync_to_node(text,integer,boolean)
function time_partition_range(regclass)
@ -284,5 +285,5 @@ ORDER BY 1;
view columnar.stripe
view pg_dist_shard_placement
view time_partitions
(268 rows)
(269 rows)

View File

@ -10,6 +10,9 @@ SET citus.shard_replication_factor TO 1;
SELECT pg_backend_pid() as pid \gset
SELECT citus.mitmproxy('conn.allow()');
\set VERBOSITY terse
SET client_min_messages TO ERROR;
CREATE TABLE t1 (id int PRIMARY KEY);
SELECT create_distributed_table('t1', 'id');
INSERT INTO t1 SELECT x FROM generate_series(1,100) AS f(x);
@ -25,21 +28,21 @@ SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE pg_dist_local_group SET grou
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Failure to drop all tables in pg_dist_partition
SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM pg_dist_partition").cancel(' || :pid || ')');
SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM pg_dist_partition").cancel(' || :pid || ')');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM pg_dist_partition").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM pg_dist_partition").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Failure to delete pg_dist_node entries from the worker
SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM pg_dist_node").cancel(' || :pid || ')');
SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM pg_dist_node").cancel(' || :pid || ')');
SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM pg_dist_node").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM pg_dist_node").kill()');
SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
-- Failure to populate pg_dist_node in the worker
SELECT citus.mitmproxy('conn.onQuery(query="^INSERT INTO pg_dist_node").cancel(' || :pid || ')');
SELECT citus.mitmproxy('conn.onQuery(query="INSERT INTO pg_dist_node").cancel(' || :pid || ')');
SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="^INSERT INTO pg_dist_node").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="INSERT INTO pg_dist_node").kill()');
SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
-- Verify that coordinator knows worker does not have valid metadata
@ -71,9 +74,9 @@ SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE pg_dist_local_group SET grou
SELECT stop_metadata_sync_to_node('localhost', :worker_2_proxy_port);
-- Failure to delete pg_dist_node entries from the worker
SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM pg_dist_node").cancel(' || :pid || ')');
SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM pg_dist_node").cancel(' || :pid || ')');
SELECT stop_metadata_sync_to_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM pg_dist_node").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM pg_dist_node").kill()');
SELECT stop_metadata_sync_to_node('localhost', :worker_2_proxy_port);
\c - - - :worker_2_port

View File

@ -484,5 +484,10 @@ SELECT * from master_set_node_property('localhost', :worker_2_port, 'bogusproper
DROP TABLE test_dist, test_ref, test_dist_colocated, test_dist_non_colocated;
BEGIN;
SELECT start_metadata_sync_to_all_nodes();
COMMIT;
SELECT start_metadata_sync_to_all_nodes();
-- verify that at the end of this file, all primary nodes have metadata synced
SELECT bool_and(hasmetadata) AND bool_and(metadatasynced) FROM pg_dist_node WHERE isactive = 't' and noderole = 'primary';