From dd02e1755ff2e5d6408fe4ab6b5077a4c34ce44b Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Thu, 19 May 2022 12:43:41 +0200 Subject: [PATCH] Parallelize metadata syncing on node activate MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit It is often useful to be able to sync the metadata in parallel across nodes. Also citus_finalize_upgrade_to_citus11() uses start_metadata_sync_to_all_nodes() (it appears as start_metadata_sync_to_primary_nodes() in the benchmark output below) after this commit. Note that this commit does not parallelize all pieces of node activation or metadata syncing. Instead, it tries to parallelize potentially large parts of metadata, which are the objects and distributed tables (in general Citus tables). In the future, it would be nice to sync the reference tables in parallel across nodes. Create ~720 distributed tables / ~23450 shards ```SQL -- declaratively partitioned table CREATE TABLE github_events_looooooooooooooong_name ( event_id bigint, event_type text, event_public boolean, repo_id bigint, payload jsonb, repo jsonb, actor jsonb, org jsonb, created_at timestamp ) PARTITION BY RANGE (created_at); SELECT create_time_partitions( table_name := 'github_events_looooooooooooooong_name', partition_interval := '1 day', end_at := now() + '24 months' ); CREATE INDEX ON github_events_looooooooooooooong_name USING btree (event_id, event_type, event_public, repo_id); SELECT create_distributed_table('github_events_looooooooooooooong_name', 'repo_id'); SET client_min_messages TO ERROR; ``` across 1 node: almost same as expected ```SQL SELECT start_metadata_sync_to_primary_nodes(); Time: 15664.418 ms (00:15.664) select start_metadata_sync_to_node(nodename,nodeport) from pg_dist_node; Time: 14284.069 ms (00:14.284) ``` across 7 nodes: ~3.5x improvement ```SQL SELECT start_metadata_sync_to_primary_nodes(); ┌──────────────────────────────────────┐ │ start_metadata_sync_to_primary_nodes │ ├──────────────────────────────────────┤ │ t │ └──────────────────────────────────────┘ (1 row) Time: 
25711.192 ms (00:25.711) -- across 7 nodes select start_metadata_sync_to_node(nodename,nodeport) from pg_dist_node; Time: 82126.075 ms (01:22.126) ``` --- .../distributed/metadata/metadata_sync.c | 36 ++- .../distributed/metadata/node_metadata.c | 265 +++++++++++------- .../distributed/sql/citus--11.0-1--11.0-2.sql | 2 + .../sql/downgrades/citus--11.0-2--11.0-1.sql | 3 + .../11.0-2.sql | 221 +++++++++++++++ .../latest.sql | 4 +- .../11.0-2.sql | 9 + .../latest.sql | 9 + .../transaction/worker_transaction.c | 73 ++++- .../distributed/utils/reference_table_utils.c | 12 +- src/backend/distributed/utils/resource_lock.c | 11 +- src/include/distributed/worker_manager.h | 1 + src/include/distributed/worker_transaction.h | 9 +- .../expected/failure_add_disable_node.out | 3 +- .../expected/failure_mx_metadata_sync.out | 104 +------ .../expected/multi_cluster_management.out | 14 + src/test/regress/expected/multi_extension.out | 3 +- .../expected/propagate_extension_commands.out | 3 +- .../expected/upgrade_list_citus_objects.out | 3 +- .../regress/sql/failure_mx_metadata_sync.sql | 19 +- .../regress/sql/multi_cluster_management.sql | 5 + 21 files changed, 567 insertions(+), 242 deletions(-) create mode 100644 src/backend/distributed/sql/udfs/citus_finalize_upgrade_to_citus11/11.0-2.sql create mode 100644 src/backend/distributed/sql/udfs/start_metadata_sync_to_all_nodes/11.0-2.sql create mode 100644 src/backend/distributed/sql/udfs/start_metadata_sync_to_all_nodes/latest.sql diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 8c1a68151..4e50acab6 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -139,6 +139,7 @@ static char * RemoteTypeIdExpression(Oid typeId); static char * RemoteCollationIdExpression(Oid colocationId); +PG_FUNCTION_INFO_V1(start_metadata_sync_to_all_nodes); PG_FUNCTION_INFO_V1(start_metadata_sync_to_node); 
PG_FUNCTION_INFO_V1(stop_metadata_sync_to_node); PG_FUNCTION_INFO_V1(worker_record_sequence_dependency); @@ -195,6 +196,33 @@ start_metadata_sync_to_node(PG_FUNCTION_ARGS) } +/* + * start_metadata_sync_to_all_nodes function sets hasmetadata column of + * all the primary worker nodes to true, and then activate nodes without + * replicating reference tables. + */ +Datum +start_metadata_sync_to_all_nodes(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + + EnsureSuperUser(); + EnsureCoordinator(); + + List *workerNodes = ActivePrimaryNonCoordinatorNodeList(RowShareLock); + + bool prevReplicateRefTablesOnActivate = ReplicateReferenceTablesOnActivate; + SetLocalReplicateReferenceTablesOnActivate(false); + + ActivateNodeList(workerNodes); + TransactionModifiedNodeMetadata = true; + + SetLocalReplicateReferenceTablesOnActivate(prevReplicateRefTablesOnActivate); + + PG_RETURN_BOOL(true); +} + + /* * SyncNodeMetadataToNode is the internal API for * start_metadata_sync_to_node(). @@ -543,10 +571,10 @@ SyncNodeMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError) */ if (raiseOnError) { - SendMetadataCommandListToWorkerInCoordinatedTransaction(workerNode->workerName, - workerNode->workerPort, - currentUser, - recreateMetadataSnapshotCommandList); + SendMetadataCommandListToWorkerListInCoordinatedTransaction(list_make1( + workerNode), + currentUser, + recreateMetadataSnapshotCommandList); return true; } else diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index ab430f6f5..da0656cc2 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -106,9 +106,9 @@ static void InsertPlaceholderCoordinatorRecord(void); static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetadata *nodeMetadata); static void DeleteNodeRow(char *nodename, int32 nodeport); -static void SyncDistributedObjectsToNode(WorkerNode *workerNode); +static void 
SyncDistributedObjectsToNodeList(List *workerNodeList); static void UpdateLocalGroupIdOnNode(WorkerNode *workerNode); -static void SyncPgDistTableMetadataToNode(WorkerNode *workerNode); +static void SyncPgDistTableMetadataToNodeList(List *nodeList); static List * InterTableRelationshipCommandList(); static void BlockDistributedQueriesOnMetadataNodes(void); static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple); @@ -843,7 +843,7 @@ SyncDistributedObjectsCommandList(WorkerNode *workerNode) /* - * SyncDistributedObjectsToNode sync the distributed objects to the node. It includes + * SyncDistributedObjectsToNodeList sync the distributed objects to the node. It includes * - All dependencies (e.g., types, schemas, sequences) * - All shell distributed table * - Inter relation between those shell tables @@ -852,17 +852,29 @@ SyncDistributedObjectsCommandList(WorkerNode *workerNode) * since all the dependencies should be present in the coordinator already. */ static void -SyncDistributedObjectsToNode(WorkerNode *workerNode) +SyncDistributedObjectsToNodeList(List *workerNodeList) { - if (NodeIsCoordinator(workerNode)) + List *workerNodesToSync = NIL; + WorkerNode *workerNode = NULL; + foreach_ptr(workerNode, workerNodeList) { - /* coordinator has all the objects */ - return; + if (NodeIsCoordinator(workerNode)) + { + /* coordinator has all the objects */ + continue; + } + + if (!NodeIsPrimary(workerNode)) + { + /* secondary nodes gets the objects from their primaries via replication */ + continue; + } + + workerNodesToSync = lappend(workerNodesToSync, workerNode); } - if (!NodeIsPrimary(workerNode)) + if (workerNodesToSync == NIL) { - /* secondary nodes gets the objects from their primaries via replication */ return; } @@ -874,9 +886,8 @@ SyncDistributedObjectsToNode(WorkerNode *workerNode) /* send commands to new workers, the current user should be a superuser */ Assert(superuser()); - SendMetadataCommandListToWorkerInCoordinatedTransaction( - 
workerNode->workerName, - workerNode->workerPort, + SendMetadataCommandListToWorkerListInCoordinatedTransaction( + workerNodesToSync, CurrentUserName(), commandList); } @@ -894,9 +905,8 @@ UpdateLocalGroupIdOnNode(WorkerNode *workerNode) /* send commands to new workers, the current user should be a superuser */ Assert(superuser()); - SendMetadataCommandListToWorkerInCoordinatedTransaction( - workerNode->workerName, - workerNode->workerPort, + SendMetadataCommandListToWorkerListInCoordinatedTransaction( + list_make1(workerNode), CurrentUserName(), commandList); } @@ -904,25 +914,33 @@ UpdateLocalGroupIdOnNode(WorkerNode *workerNode) /* - * SyncPgDistTableMetadataToNode syncs the pg_dist_partition, pg_dist_shard + * SyncPgDistTableMetadataToNodeList syncs the pg_dist_partition, pg_dist_shard * pg_dist_placement and pg_dist_object metadata entries. * */ static void -SyncPgDistTableMetadataToNode(WorkerNode *workerNode) +SyncPgDistTableMetadataToNodeList(List *nodeList) { - if (NodeIsPrimary(workerNode) && !NodeIsCoordinator(workerNode)) - { - List *syncPgDistMetadataCommandList = PgDistTableMetadataSyncCommandList(); + /* send commands to new workers, the current user should be a superuser */ + Assert(superuser()); - /* send commands to new workers, the current user should be a superuser */ - Assert(superuser()); - SendMetadataCommandListToWorkerInCoordinatedTransaction( - workerNode->workerName, - workerNode->workerPort, - CurrentUserName(), - syncPgDistMetadataCommandList); + List *syncPgDistMetadataCommandList = PgDistTableMetadataSyncCommandList(); + + List *nodesWithMetadata = NIL; + WorkerNode *workerNode = NULL; + foreach_ptr(workerNode, nodeList) + { + if (NodeIsPrimary(workerNode) && !NodeIsCoordinator(workerNode)) + { + nodesWithMetadata = lappend(nodesWithMetadata, workerNode); + } } + + + SendMetadataCommandListToWorkerListInCoordinatedTransaction( + nodesWithMetadata, + CurrentUserName(), + syncPgDistMetadataCommandList); } @@ -1118,15 +1136,14 @@ 
PrimaryNodeForGroup(int32 groupId, bool *groupContainsNodes) /* - * ActivateNode activates the node with nodeName and nodePort. Currently, activation - * includes only replicating the reference tables and setting isactive column of the - * given node. + * ActivateNodeList iterates over the nodeList and activates the nodes. + * Some part of the node activation is done parallel across the nodes, + * such as syncing the metadata. However, reference table replication is + * done one by one across nodes. */ -int -ActivateNode(char *nodeName, int nodePort) +void +ActivateNodeList(List *nodeList) { - bool isActive = true; - /* * We currently require the object propagation to happen via superuser, * see #5139. While activating a node, we sync both metadata and object @@ -1143,86 +1160,130 @@ ActivateNode(char *nodeName, int nodePort) /* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */ LockRelationOid(DistNodeRelationId(), ExclusiveLock); - /* - * First, locally mark the node is active, if everything goes well, - * we are going to sync this information to all the metadata nodes. - */ - WorkerNode *workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort); - if (workerNode == NULL) - { - ereport(ERROR, (errmsg("node at \"%s:%u\" does not exist", nodeName, nodePort))); - } - /* - * Delete existing reference and replicated table placements on the - * given groupId if the group has been disabled earlier (e.g., isActive - * set to false). - * - * Sync the metadata changes to all existing metadata nodes irrespective - * of the current nodes' metadata sync state. We expect all nodes up - * and running when another node is activated. 
- */ - if (!workerNode->isActive && NodeIsPrimary(workerNode)) - { - bool localOnly = false; - DeleteAllReplicatedTablePlacementsFromNodeGroup(workerNode->groupId, - localOnly); - } - - workerNode = - SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_isactive, - BoolGetDatum(isActive)); - - /* TODO: Once all tests will be enabled for MX, we can remove sync by default check */ - bool syncMetadata = EnableMetadataSync && NodeIsPrimary(workerNode); - - if (syncMetadata) + List *nodeToSyncMetadata = NIL; + WorkerNode *node = NULL; + foreach_ptr(node, nodeList) { /* - * We are going to sync the metadata anyway in this transaction, so do - * not fail just because the current metadata is not synced. + * First, locally mark the node is active, if everything goes well, + * we are going to sync this information to all the metadata nodes. */ - SetWorkerColumn(workerNode, Anum_pg_dist_node_metadatasynced, - BoolGetDatum(true)); - - /* - * Update local group id first, as object dependency logic requires to have - * updated local group id. - */ - UpdateLocalGroupIdOnNode(workerNode); - - /* - * Sync distributed objects first. We must sync distributed objects before - * replicating reference tables to the remote node, as reference tables may - * need such objects. - */ - SyncDistributedObjectsToNode(workerNode); - - /* - * We need to replicate reference tables before syncing node metadata, otherwise - * reference table replication logic would try to get lock on the new node before - * having the shard placement on it - */ - if (ReplicateReferenceTablesOnActivate) + WorkerNode *workerNode = + FindWorkerNodeAnyCluster(node->workerName, node->workerPort); + if (workerNode == NULL) { - ReplicateAllReferenceTablesToNode(workerNode); + ereport(ERROR, (errmsg("node at \"%s:%u\" does not exist", node->workerName, + node->workerPort))); } - /* - * Sync node metadata. We must sync node metadata before syncing table - * related pg_dist_xxx metadata. 
Since table related metadata requires - * to have right pg_dist_node entries. - */ - SyncNodeMetadataToNode(nodeName, nodePort); + /* both nodes should be the same */ + Assert(workerNode->nodeId == node->nodeId); /* - * As the last step, sync the table related metadata to the remote node. - * We must handle it as the last step because of limitations shared with - * above comments. + * Delete existing reference and replicated table placements on the + * given groupId if the group has been disabled earlier (e.g., isActive + * set to false). + * + * Sync the metadata changes to all existing metadata nodes irrespective + * of the current nodes' metadata sync state. We expect all nodes up + * and running when another node is activated. */ - SyncPgDistTableMetadataToNode(workerNode); + if (!workerNode->isActive && NodeIsPrimary(workerNode)) + { + bool localOnly = false; + DeleteAllReplicatedTablePlacementsFromNodeGroup(workerNode->groupId, + localOnly); + } + + workerNode = + SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_isactive, + BoolGetDatum(true)); + + /* TODO: Once all tests will be enabled for MX, we can remove sync by default check */ + bool syncMetadata = EnableMetadataSync && NodeIsPrimary(workerNode); + if (syncMetadata) + { + /* + * We are going to sync the metadata anyway in this transaction, so do + * not fail just because the current metadata is not synced. + */ + SetWorkerColumn(workerNode, Anum_pg_dist_node_metadatasynced, + BoolGetDatum(true)); + + /* + * Update local group id first, as object dependency logic requires to have + * updated local group id. + */ + UpdateLocalGroupIdOnNode(workerNode); + + nodeToSyncMetadata = lappend(nodeToSyncMetadata, workerNode); + } } + /* + * Sync distributed objects first. We must sync distributed objects before + * replicating reference tables to the remote node, as reference tables may + * need such objects. 
+ */ + SyncDistributedObjectsToNodeList(nodeToSyncMetadata); + + if (ReplicateReferenceTablesOnActivate) + { + foreach_ptr(node, nodeList) + { + /* + * We need to replicate reference tables before syncing node metadata, otherwise + * reference table replication logic would try to get lock on the new node before + * having the shard placement on it + */ + if (NodeIsPrimary(node)) + { + ReplicateAllReferenceTablesToNode(node); + } + } + } + + /* + * Sync node metadata. We must sync node metadata before syncing table + * related pg_dist_xxx metadata. Since table related metadata requires + * to have right pg_dist_node entries. + */ + foreach_ptr(node, nodeToSyncMetadata) + { + SyncNodeMetadataToNode(node->workerName, node->workerPort); + } + + /* + * As the last step, sync the table related metadata to the remote node. + * We must handle it as the last step because of limitations shared with + * above comments. + */ + SyncPgDistTableMetadataToNodeList(nodeToSyncMetadata); + + foreach_ptr(node, nodeList) + { + bool isActive = true; + + /* finally, let all other active metadata nodes to learn about this change */ + SetNodeState(node->workerName, node->workerPort, isActive); + } +} + + +/* + * ActivateNode activates the node with nodeName and nodePort. Currently, activation + * includes only replicating the reference tables and setting isactive column of the + * given node. 
+ */ +int +ActivateNode(char *nodeName, int nodePort) +{ + bool isActive = true; + + WorkerNode *workerNode = ModifiableWorkerNode(nodeName, nodePort); + ActivateNodeList(list_make1(workerNode)); + /* finally, let all other active metadata nodes to learn about this change */ WorkerNode *newWorkerNode = SetNodeState(nodeName, nodePort, isActive); Assert(newWorkerNode->nodeId == workerNode->nodeId); diff --git a/src/backend/distributed/sql/citus--11.0-1--11.0-2.sql b/src/backend/distributed/sql/citus--11.0-1--11.0-2.sql index d926ee5e3..53ebae152 100644 --- a/src/backend/distributed/sql/citus--11.0-1--11.0-2.sql +++ b/src/backend/distributed/sql/citus--11.0-1--11.0-2.sql @@ -3,3 +3,5 @@ #include "udfs/citus_is_coordinator/11.0-2.sql" #include "udfs/citus_disable_node/11.0-2.sql" #include "udfs/run_command_on_coordinator/11.0-2.sql" +#include "udfs/start_metadata_sync_to_all_nodes/11.0-2.sql" +#include "udfs/citus_finalize_upgrade_to_citus11/11.0-2.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--11.0-2--11.0-1.sql b/src/backend/distributed/sql/downgrades/citus--11.0-2--11.0-1.sql index 270bbb52e..6569f4bbc 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.0-2--11.0-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.0-2--11.0-1.sql @@ -13,3 +13,6 @@ REVOKE ALL ON FUNCTION pg_catalog.citus_disable_node(text,int, bool) FROM PUBLIC DROP FUNCTION pg_catalog.citus_is_coordinator(); DROP FUNCTION pg_catalog.run_command_on_coordinator(text,boolean); + +DROP FUNCTION pg_catalog.start_metadata_sync_to_all_nodes(); +DROP FUNCTION pg_catalog.citus_finalize_upgrade_to_citus11(boolean); diff --git a/src/backend/distributed/sql/udfs/citus_finalize_upgrade_to_citus11/11.0-2.sql b/src/backend/distributed/sql/udfs/citus_finalize_upgrade_to_citus11/11.0-2.sql new file mode 100644 index 000000000..2b4bb17f6 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_finalize_upgrade_to_citus11/11.0-2.sql @@ -0,0 +1,221 @@ +-- 
citus_finalize_upgrade_to_citus11() is a helper UDF ensures +-- the upgrade to Citus 11 is finished successfully. Upgrade to +-- Citus 11 requires all active primary worker nodes to get the +-- metadata. And, this function's job is to sync the metadata to +-- the nodes that does not already have +-- once the function finishes without any errors and returns true +-- the cluster is ready for running distributed queries from +-- the worker nodes. When debug is enabled, the function provides +-- more information to the user. +CREATE OR REPLACE FUNCTION pg_catalog.citus_finalize_upgrade_to_citus11(enforce_version_check bool default true) + RETURNS bool + LANGUAGE plpgsql + AS $$ +BEGIN + + --------------------------------------------- + -- This script consists of N stages + -- Each step is documented, and if log level + -- is reduced to DEBUG1, each step is logged + -- as well + --------------------------------------------- + +------------------------------------------------------------------------------------------ + -- STAGE 0: Ensure no concurrent node metadata changing operation happens while this + -- script is running via acquiring a strong lock on the pg_dist_node +------------------------------------------------------------------------------------------ +BEGIN + LOCK TABLE pg_dist_node IN EXCLUSIVE MODE NOWAIT; + + EXCEPTION WHEN OTHERS THEN + RAISE 'Another node metadata changing operation is in progress, try again.'; +END; + +------------------------------------------------------------------------------------------ + -- STAGE 1: We want all the commands to run in the same transaction block. 
Without + -- sequential mode, metadata syncing cannot be done in a transaction block along with + -- other commands +------------------------------------------------------------------------------------------ + SET LOCAL citus.multi_shard_modify_mode TO 'sequential'; + +------------------------------------------------------------------------------------------ + -- STAGE 2: Ensure we have the prerequisites + -- (a) only superuser can run this script + -- (b) cannot be executed when enable_ddl_propagation is False + -- (c) can only be executed from the coordinator +------------------------------------------------------------------------------------------ +DECLARE + is_superuser_running boolean := False; + enable_ddl_prop boolean:= False; + local_group_id int := 0; +BEGIN + SELECT rolsuper INTO is_superuser_running FROM pg_roles WHERE rolname = current_user; + IF is_superuser_running IS NOT True THEN + RAISE EXCEPTION 'This operation can only be initiated by superuser'; + END IF; + + SELECT current_setting('citus.enable_ddl_propagation') INTO enable_ddl_prop; + IF enable_ddl_prop IS NOT True THEN + RAISE EXCEPTION 'This operation cannot be completed when citus.enable_ddl_propagation is False.'; + END IF; + + SELECT groupid INTO local_group_id FROM pg_dist_local_group; + + IF local_group_id != 0 THEN + RAISE EXCEPTION 'Operation is not allowed on this node. 
Connect to the coordinator and run it again.'; + ELSE + RAISE DEBUG 'We are on the coordinator, continue to sync metadata'; + END IF; +END; + + + ------------------------------------------------------------------------------------------ + -- STAGE 3: Ensure all primary nodes are active + ------------------------------------------------------------------------------------------ + DECLARE + primary_disabled_worker_node_count int := 0; + BEGIN + SELECT count(*) INTO primary_disabled_worker_node_count FROM pg_dist_node + WHERE groupid != 0 AND noderole = 'primary' AND NOT isactive; + + IF primary_disabled_worker_node_count != 0 THEN + RAISE EXCEPTION 'There are inactive primary worker nodes, you need to activate the nodes first.' + 'Use SELECT citus_activate_node() to activate the disabled nodes'; + ELSE + RAISE DEBUG 'There are no disabled worker nodes, continue to sync metadata'; + END IF; + END; + + ------------------------------------------------------------------------------------------ + -- STAGE 4: Ensure there is no connectivity issues in the cluster + ------------------------------------------------------------------------------------------ + DECLARE + all_nodes_can_connect_to_each_other boolean := False; + BEGIN + SELECT bool_and(coalesce(result, false)) INTO all_nodes_can_connect_to_each_other FROM citus_check_cluster_node_health(); + + IF all_nodes_can_connect_to_each_other != True THEN + RAISE EXCEPTION 'There are unhealth primary nodes, you need to ensure all ' + 'nodes are up and runnnig. Also, make sure that all nodes can connect ' + 'to each other. 
Use SELECT * FROM citus_check_cluster_node_health(); ' + 'to check the cluster health'; + ELSE + RAISE DEBUG 'Cluster is healthy, all nodes can connect to each other'; + END IF; + END; + + ------------------------------------------------------------------------------------------ + -- STAGE 5: Ensure all nodes are on the same version + ------------------------------------------------------------------------------------------ + DECLARE + coordinator_version text := ''; + worker_node_version text := ''; + worker_node_version_count int := 0; + + BEGIN + SELECT extversion INTO coordinator_version from pg_extension WHERE extname = 'citus'; + + -- first, check if all nodes have the same versions + SELECT + count(distinct result) INTO worker_node_version_count + FROM + run_command_on_workers('SELECT extversion from pg_extension WHERE extname = ''citus'''); + IF enforce_version_check AND worker_node_version_count != 1 THEN + RAISE EXCEPTION 'All nodes should have the same Citus version installed. Currently ' + 'some of the workers have different versions.'; + ELSE + RAISE DEBUG 'All worker nodes have the same Citus version'; + END IF; + + -- second, check if all nodes have the same versions + SELECT + result INTO worker_node_version + FROM + run_command_on_workers('SELECT extversion from pg_extension WHERE extname = ''citus'';') + GROUP BY result; + + IF enforce_version_check AND coordinator_version != worker_node_version THEN + RAISE EXCEPTION 'All nodes should have the same Citus version installed. 
Currently ' + 'the coordinator has version % and the worker(s) has %', + coordinator_version, worker_node_version; + ELSE + RAISE DEBUG 'All nodes have the same Citus version'; + END IF; + END; + + ------------------------------------------------------------------------------------------ + -- STAGE 6: Ensure all the partitioned tables have the proper naming structure + -- As described on https://github.com/citusdata/citus/issues/4962 + -- existing indexes on partitioned distributed tables can collide + -- with the index names exists on the shards + -- luckily, we know how to fix it. + -- And, note that we should do this even if the cluster is a basic plan + -- (e.g., single node Citus) such that when cluster scaled out, everything + -- works as intended + -- And, this should be done only ONCE for a cluster as it can be a pretty + -- time consuming operation. Thus, even if the function is called multiple time, + -- we keep track of it and do not re-execute this part if not needed. + ------------------------------------------------------------------------------------------ + DECLARE + partitioned_table_exists_pre_11 boolean:=False; + BEGIN + + -- we recorded if partitioned tables exists during upgrade to Citus 11 + SELECT metadata->>'partitioned_citus_table_exists_pre_11' INTO partitioned_table_exists_pre_11 + FROM pg_dist_node_metadata; + + IF partitioned_table_exists_pre_11 IS NOT NULL AND partitioned_table_exists_pre_11 THEN + + -- this might take long depending on the number of partitions and shards... 
+ RAISE NOTICE 'Preparing all the existing partitioned table indexes'; + PERFORM pg_catalog.fix_all_partition_shard_index_names(); + + -- great, we are done with fixing the existing wrong index names + -- so, lets remove this + UPDATE pg_dist_node_metadata + SET metadata=jsonb_delete(metadata, 'partitioned_citus_table_exists_pre_11'); + ELSE + RAISE DEBUG 'There are no partitioned tables that should be fixed'; + END IF; + END; + + ------------------------------------------------------------------------------------------ + -- STAGE 7: Return early if there are no primary worker nodes + -- We don't strictly need this step, but it gives a nicer notice message + ------------------------------------------------------------------------------------------ + DECLARE + primary_worker_node_count bigint :=0; + BEGIN + SELECT count(*) INTO primary_worker_node_count FROM pg_dist_node WHERE groupid != 0 AND noderole = 'primary'; + + IF primary_worker_node_count = 0 THEN + RAISE NOTICE 'There are no primary worker nodes, no need to sync metadata to any node'; + RETURN true; + ELSE + RAISE DEBUG 'There are % primary worker nodes, continue to sync metadata', primary_worker_node_count; + END IF; + END; + + ------------------------------------------------------------------------------------------ + -- STAGE 8: Do the actual metadata & object syncing to the worker nodes + -- For the "already synced" metadata nodes, we do not strictly need to + -- sync the objects & metadata, but there is no harm to do it anyway + -- it'll only cost some execution time but makes sure that we have a + -- a consistent metadata & objects across all the nodes + ------------------------------------------------------------------------------------------ + DECLARE + BEGIN + + -- this might take long depending on the number of tables & objects ... 
+ RAISE NOTICE 'Preparing to sync the metadata to all nodes'; + + PERFORM start_metadata_sync_to_all_nodes(); + END; + + RETURN true; +END; +$$; +COMMENT ON FUNCTION pg_catalog.citus_finalize_upgrade_to_citus11(bool) + IS 'finalizes upgrade to Citus'; + +REVOKE ALL ON FUNCTION pg_catalog.citus_finalize_upgrade_to_citus11(bool) FROM PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_finalize_upgrade_to_citus11/latest.sql b/src/backend/distributed/sql/udfs/citus_finalize_upgrade_to_citus11/latest.sql index 7b7d357ff..2b4bb17f6 100644 --- a/src/backend/distributed/sql/udfs/citus_finalize_upgrade_to_citus11/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_finalize_upgrade_to_citus11/latest.sql @@ -209,9 +209,7 @@ END; -- this might take long depending on the number of tables & objects ... RAISE NOTICE 'Preparing to sync the metadata to all nodes'; - PERFORM start_metadata_sync_to_node(nodename,nodeport) - FROM - pg_dist_node WHERE groupid != 0 AND noderole = 'primary'; + PERFORM start_metadata_sync_to_all_nodes(); END; RETURN true; diff --git a/src/backend/distributed/sql/udfs/start_metadata_sync_to_all_nodes/11.0-2.sql b/src/backend/distributed/sql/udfs/start_metadata_sync_to_all_nodes/11.0-2.sql new file mode 100644 index 000000000..ca886fb9a --- /dev/null +++ b/src/backend/distributed/sql/udfs/start_metadata_sync_to_all_nodes/11.0-2.sql @@ -0,0 +1,9 @@ +CREATE OR REPLACE FUNCTION pg_catalog.start_metadata_sync_to_all_nodes() + RETURNS bool + LANGUAGE C + STRICT +AS 'MODULE_PATHNAME', $$start_metadata_sync_to_all_nodes$$; +COMMENT ON FUNCTION pg_catalog.start_metadata_sync_to_all_nodes() + IS 'sync metadata to all active primary nodes'; + +REVOKE ALL ON FUNCTION pg_catalog.start_metadata_sync_to_all_nodes() FROM PUBLIC; diff --git a/src/backend/distributed/sql/udfs/start_metadata_sync_to_all_nodes/latest.sql b/src/backend/distributed/sql/udfs/start_metadata_sync_to_all_nodes/latest.sql new file mode 100644 index 000000000..ca886fb9a --- /dev/null +++ 
b/src/backend/distributed/sql/udfs/start_metadata_sync_to_all_nodes/latest.sql
@@ -0,0 +1,9 @@
+CREATE OR REPLACE FUNCTION pg_catalog.start_metadata_sync_to_all_nodes()
+  RETURNS bool
+  LANGUAGE C
+  STRICT
+AS 'MODULE_PATHNAME', $$start_metadata_sync_to_all_nodes$$;
+COMMENT ON FUNCTION pg_catalog.start_metadata_sync_to_all_nodes()
+  IS 'sync metadata to all active primary nodes';
+
+REVOKE ALL ON FUNCTION pg_catalog.start_metadata_sync_to_all_nodes() FROM PUBLIC;
diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c
index 6a140a569..6b4b1a351 100644
--- a/src/backend/distributed/transaction/worker_transaction.c
+++ b/src/backend/distributed/transaction/worker_transaction.c
@@ -490,27 +490,78 @@ SendCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort,
  * coordinated transaction. Any failures aborts the coordinated transaction.
  */
 void
-SendMetadataCommandListToWorkerInCoordinatedTransaction(const char *nodeName,
-														int32 nodePort,
-														const char *nodeUser,
-														List *commandList)
+SendMetadataCommandListToWorkerListInCoordinatedTransaction(List *workerNodeList,
+															 const char *nodeUser,
+															 List *commandList)
 {
-	int connectionFlags = REQUIRE_METADATA_CONNECTION;
+	if (list_length(commandList) == 0 || list_length(workerNodeList) == 0)
+	{
+		/* nothing to do */
+		return;
+	}
 
 	UseCoordinatedTransaction();
 
-	MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags,
-																	  nodeName, nodePort,
-																	  nodeUser, NULL);
+	List *connectionList = NIL;
 
-	MarkRemoteTransactionCritical(workerConnection);
-	RemoteTransactionBeginIfNecessary(workerConnection);
-
-	/* iterate over the commands and execute them in the same connection */
-	const char *commandString = NULL;
-	foreach_ptr(commandString, commandList)
+	WorkerNode *workerNode = NULL;
+	foreach_ptr(workerNode, workerNodeList)
 	{
-		ExecuteCriticalRemoteCommand(workerConnection, commandString);
+		const char *nodeName = workerNode->workerName;
+		int nodePort = workerNode->workerPort;
+		int connectionFlags = REQUIRE_METADATA_CONNECTION;
+
+		/*
+		 * Only start the connection here so that all the nodes can connect in
+		 * parallel, and connect as nodeUser as the caller asked for.
+		 */
+		MultiConnection *connection =
+			StartNodeUserDatabaseConnection(connectionFlags, nodeName, nodePort,
+											nodeUser, NULL);
+
+		/*
+		 * connection can only be NULL for optional connections, which we don't
+		 * support in this codepath.
+		 */
+		Assert((connectionFlags & OPTIONAL_CONNECTION) == 0);
+		Assert(connection != NULL);
+
+		/* a failure on any of the nodes must abort the coordinated transaction */
+		MarkRemoteTransactionCritical(connection);
+
+		connectionList = lappend(connectionList, connection);
+	}
+
+	FinishConnectionListEstablishment(connectionList);
+
+	/* open transaction blocks on the nodes as part of the coordinated transaction */
+	RemoteTransactionsBeginIfNecessary(connectionList);
+
+	/*
+	 * In order to avoid round-trips per command in commandList,
+	 * we join the commands and send them as a single string. Also,
+	 * if there is only a single command, avoid additional call to
+	 * StringJoin given that some strings can be quite large.
+	 */
+	char *stringToSend = (list_length(commandList) == 1) ?
+						 linitial(commandList) : StringJoin(commandList, ';');
+
+	/* send commands in parallel */
+	bool failOnError = true;
+	MultiConnection *connection = NULL;
+	foreach_ptr(connection, connectionList)
+	{
+		int querySent = SendRemoteCommand(connection, stringToSend);
+		if (querySent == 0)
+		{
+			ReportConnectionError(connection, ERROR);
+		}
+	}
+
+	/* consume the results of every node, with failOnError set */
+	foreach_ptr(connection, connectionList)
+	{
+		ClearResults(connection, failOnError);
 	}
 }
 
diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c
index 70f56119c..5c6775c80 100644
--- a/src/backend/distributed/utils/reference_table_utils.c
+++ b/src/backend/distributed/utils/reference_table_utils.c
@@ -356,9 +356,10 @@ ReplicateReferenceTableShardToNode(ShardInterval *shardInterval, char *nodeName,
 
 	/* send commands to new workers, the current user should be a superuser */
 	Assert(superuser());
-	SendMetadataCommandListToWorkerInCoordinatedTransaction(nodeName, nodePort,
-															CurrentUserName(),
-															ddlCommandList);
+	WorkerNode *workerNode = FindWorkerNode(nodeName, nodePort);
+	SendMetadataCommandListToWorkerListInCoordinatedTransaction(list_make1(workerNode),
+																 CurrentUserName(),
+																 ddlCommandList);
 	int32 groupId = GroupForNode(nodeName, nodePort);
 
 	uint64 placementId = GetNextPlacementId();
@@ -599,9 +600,8 @@ ReplicateAllReferenceTablesToNode(WorkerNode *workerNode)
 
 		/* send commands to new workers, the current user should be a superuser */
 		Assert(superuser());
-		SendMetadataCommandListToWorkerInCoordinatedTransaction(
-			workerNode->workerName,
-			workerNode->workerPort,
+		SendMetadataCommandListToWorkerListInCoordinatedTransaction(
+			list_make1(workerNode),
 			CurrentUserName(),
 			commandList);
 	}
diff --git a/src/backend/distributed/utils/resource_lock.c b/src/backend/distributed/utils/resource_lock.c
index 7038d933c..b9e29711e 100644
--- a/src/backend/distributed/utils/resource_lock.c
+++ b/src/backend/distributed/utils/resource_lock.c
@@ -1296,9 +1296,6 @@
AcquireDistributedLockOnRelations_Internal(List *lockRelationRecordList, const char *currentUser = CurrentUserName(); foreach_ptr(workerNode, workerNodeList) { - const char *nodeName = workerNode->workerName; - int nodePort = workerNode->workerPort; - /* if local node is one of the targets, acquire the lock locally */ if (workerNode->groupId == localGroupId) { @@ -1306,9 +1303,11 @@ AcquireDistributedLockOnRelations_Internal(List *lockRelationRecordList, continue; } - SendMetadataCommandListToWorkerInCoordinatedTransaction(nodeName, nodePort, - currentUser, list_make1( - lockCommand)); + SendMetadataCommandListToWorkerListInCoordinatedTransaction(list_make1( + workerNode), + currentUser, + list_make1( + lockCommand)); } } diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index 27de1d464..e861b8a65 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -63,6 +63,7 @@ extern char *WorkerListFileName; extern char *CurrentCluster; extern bool ReplicateReferenceTablesOnActivate; +extern void ActivateNodeList(List *nodeList); extern int ActivateNode(char *nodeName, int nodePort); /* Function declarations for finding worker nodes to place shards on */ diff --git a/src/include/distributed/worker_transaction.h b/src/include/distributed/worker_transaction.h index 76cd3f79c..72b16acd5 100644 --- a/src/include/distributed/worker_transaction.h +++ b/src/include/distributed/worker_transaction.h @@ -74,10 +74,11 @@ extern void SendCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort, const char *nodeUser, List *commandList); -extern void SendMetadataCommandListToWorkerInCoordinatedTransaction(const char *nodeName, - int32 nodePort, - const char *nodeUser, - List *commandList); +extern void SendMetadataCommandListToWorkerListInCoordinatedTransaction( + List *workerNodeList, + const char * + nodeUser, + List *commandList); extern void 
SendCommandToWorkersOptionalInParallel(TargetWorkerSet targetWorkerSet, const char *command, const char *user); diff --git a/src/test/regress/expected/failure_add_disable_node.out b/src/test/regress/expected/failure_add_disable_node.out index 538fa31d0..76952767e 100644 --- a/src/test/regress/expected/failure_add_disable_node.out +++ b/src/test/regress/expected/failure_add_disable_node.out @@ -106,10 +106,11 @@ SELECT citus.mitmproxy('conn.onQuery(query="CREATE SCHEMA").kill()'); (1 row) SELECT master_activate_node('localhost', :worker_2_proxy_port); -ERROR: server closed the connection unexpectedly +WARNING: server closed the connection unexpectedly This probably means the server terminated abnormally before or while processing the request. CONTEXT: while executing command on localhost:xxxxx +ERROR: failure on connection marked as essential: localhost:xxxxx -- verify node is not activated SELECT * FROM master_get_active_worker_nodes() ORDER BY 1, 2; diff --git a/src/test/regress/expected/failure_mx_metadata_sync.out b/src/test/regress/expected/failure_mx_metadata_sync.out index 7a74d91e5..7d667759d 100644 --- a/src/test/regress/expected/failure_mx_metadata_sync.out +++ b/src/test/regress/expected/failure_mx_metadata_sync.out @@ -13,6 +13,8 @@ SELECT citus.mitmproxy('conn.allow()'); (1 row) +\set VERBOSITY terse +SET client_min_messages TO ERROR; CREATE TABLE t1 (id int PRIMARY KEY); SELECT create_distributed_table('t1', 'id'); create_distributed_table @@ -23,7 +25,6 @@ SELECT create_distributed_table('t1', 'id'); INSERT INTO t1 SELECT x FROM generate_series(1,100) AS f(x); -- Initially turn metadata sync off because we'll ingest errors to start/stop metadata sync operations SELECT stop_metadata_sync_to_node('localhost', :worker_2_proxy_port); -NOTICE: dropping metadata on the node (localhost,9060) stop_metadata_sync_to_node --------------------------------------------------------------------- @@ -51,12 +52,9 @@ SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE 
pg_dist_local_group SET grou (1 row) SELECT citus_activate_node('localhost', :worker_2_proxy_port); -ERROR: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. -CONTEXT: while executing command on localhost:xxxxx +ERROR: failure on connection marked as essential: localhost:xxxxx -- Failure to drop all tables in pg_dist_partition -SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM pg_dist_partition").cancel(' || :pid || ')'); +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM pg_dist_partition").cancel(' || :pid || ')'); mitmproxy --------------------------------------------------------------------- @@ -64,19 +62,16 @@ SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM pg_dist_partition").can SELECT citus_activate_node('localhost', :worker_2_proxy_port); ERROR: canceling statement due to user request -SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM pg_dist_partition").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM pg_dist_partition").kill()'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT citus_activate_node('localhost', :worker_2_proxy_port); -ERROR: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. 
-CONTEXT: while executing command on localhost:xxxxx +ERROR: failure on connection marked as essential: localhost:xxxxx -- Failure to delete pg_dist_node entries from the worker -SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM pg_dist_node").cancel(' || :pid || ')'); +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM pg_dist_node").cancel(' || :pid || ')'); mitmproxy --------------------------------------------------------------------- @@ -84,19 +79,16 @@ SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM pg_dist_node").cancel(' SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port); ERROR: canceling statement due to user request -SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM pg_dist_node").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM pg_dist_node").kill()'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port); -ERROR: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. 
-CONTEXT: while executing command on localhost:xxxxx +ERROR: failure on connection marked as essential: localhost:xxxxx -- Failure to populate pg_dist_node in the worker -SELECT citus.mitmproxy('conn.onQuery(query="^INSERT INTO pg_dist_node").cancel(' || :pid || ')'); +SELECT citus.mitmproxy('conn.onQuery(query="INSERT INTO pg_dist_node").cancel(' || :pid || ')'); mitmproxy --------------------------------------------------------------------- @@ -104,17 +96,14 @@ SELECT citus.mitmproxy('conn.onQuery(query="^INSERT INTO pg_dist_node").cancel(' SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port); ERROR: canceling statement due to user request -SELECT citus.mitmproxy('conn.onQuery(query="^INSERT INTO pg_dist_node").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="INSERT INTO pg_dist_node").kill()'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port); -ERROR: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. -CONTEXT: while executing command on localhost:xxxxx +ERROR: failure on connection marked as essential: localhost:xxxxx -- Verify that coordinator knows worker does not have valid metadata SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_proxy_port; hasmetadata @@ -153,7 +142,6 @@ SELECT create_distributed_table('t2', 'id'); ERROR: server closed the connection unexpectedly This probably means the server terminated abnormally before or while processing the request. 
-CONTEXT: while executing command on localhost:xxxxx SELECT citus.mitmproxy('conn.onParse(query="citus_internal_add_shard_metadata").cancel(' || :pid || ')'); mitmproxy --------------------------------------------------------------------- @@ -179,7 +167,6 @@ SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE pg_dist_local_group SET grou (1 row) SELECT stop_metadata_sync_to_node('localhost', :worker_2_proxy_port); -NOTICE: dropping metadata on the node (localhost,9060) ERROR: canceling statement due to user request SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE pg_dist_local_group SET groupid").kill()'); mitmproxy @@ -188,90 +175,27 @@ SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE pg_dist_local_group SET grou (1 row) SELECT stop_metadata_sync_to_node('localhost', :worker_2_proxy_port); -NOTICE: dropping metadata on the node (localhost,9060) -WARNING: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. -CONTEXT: while executing command on localhost:xxxxx -WARNING: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. -connection not open -CONTEXT: while executing command on localhost:xxxxx -WARNING: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. -connection not open -connection not open -CONTEXT: while executing command on localhost:xxxxx -WARNING: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. -connection not open -connection not open -connection not open -CONTEXT: while executing command on localhost:xxxxx -WARNING: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. 
-connection not open -connection not open -connection not open -connection not open -CONTEXT: while executing command on localhost:xxxxx stop_metadata_sync_to_node --------------------------------------------------------------------- (1 row) -- Failure to delete pg_dist_node entries from the worker -SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM pg_dist_node").cancel(' || :pid || ')'); +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM pg_dist_node").cancel(' || :pid || ')'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT stop_metadata_sync_to_node('localhost', :worker_2_proxy_port); -NOTICE: dropping metadata on the node (localhost,9060) ERROR: canceling statement due to user request -SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM pg_dist_node").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM pg_dist_node").kill()'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT stop_metadata_sync_to_node('localhost', :worker_2_proxy_port); -NOTICE: dropping metadata on the node (localhost,9060) -WARNING: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. -CONTEXT: while executing command on localhost:xxxxx -WARNING: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. -connection not open -CONTEXT: while executing command on localhost:xxxxx -WARNING: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. -connection not open -connection not open -CONTEXT: while executing command on localhost:xxxxx -WARNING: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. 
-connection not open -connection not open -connection not open -CONTEXT: while executing command on localhost:xxxxx -WARNING: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. -connection not open -connection not open -connection not open -connection not open -CONTEXT: while executing command on localhost:xxxxx stop_metadata_sync_to_node --------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_cluster_management.out b/src/test/regress/expected/multi_cluster_management.out index bda479057..fce2bcbd2 100644 --- a/src/test/regress/expected/multi_cluster_management.out +++ b/src/test/regress/expected/multi_cluster_management.out @@ -1191,6 +1191,20 @@ WHERE logicalrelid = 'test_dist_non_colocated'::regclass GROUP BY nodeport ORDER SELECT * from master_set_node_property('localhost', :worker_2_port, 'bogusproperty', false); ERROR: only the 'shouldhaveshards' property can be set using this function DROP TABLE test_dist, test_ref, test_dist_colocated, test_dist_non_colocated; +BEGIN; + SELECT start_metadata_sync_to_all_nodes(); + start_metadata_sync_to_all_nodes +--------------------------------------------------------------------- + t +(1 row) + +COMMIT; +SELECT start_metadata_sync_to_all_nodes(); + start_metadata_sync_to_all_nodes +--------------------------------------------------------------------- + t +(1 row) + -- verify that at the end of this file, all primary nodes have metadata synced SELECT bool_and(hasmetadata) AND bool_and(metadatasynced) FROM pg_dist_node WHERE isactive = 't' and noderole = 'primary'; ?column? 
diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index c03fe11ae..b5b99b9f6 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1038,7 +1038,8 @@ SELECT * FROM multi_extension.print_extension_changes(); --------------------------------------------------------------------- | function citus_is_coordinator() boolean | function run_command_on_coordinator(text,boolean) SETOF record -(2 rows) + | function start_metadata_sync_to_all_nodes() boolean +(3 rows) -- Test downgrade script (result should be empty) ALTER EXTENSION citus UPDATE TO '11.0-1'; diff --git a/src/test/regress/expected/propagate_extension_commands.out b/src/test/regress/expected/propagate_extension_commands.out index e7feb5221..ec900db1e 100644 --- a/src/test/regress/expected/propagate_extension_commands.out +++ b/src/test/regress/expected/propagate_extension_commands.out @@ -218,8 +218,9 @@ SELECT run_command_on_workers($$SELECT extversion FROM pg_extension WHERE extnam -- adding the second node will fail as the text search template needs to be created manually SELECT 1 from master_add_node('localhost', :worker_2_port); -ERROR: text search template "public.intdict_template" does not exist +WARNING: text search template "public.intdict_template" does not exist CONTEXT: while executing command on localhost:xxxxx +ERROR: failure on connection marked as essential: localhost:xxxxx -- create the text search template manually on the worker \c - - - :worker_2_port SET citus.enable_metadata_sync TO false; diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 9bb051301..2b5868018 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -205,6 +205,7 @@ ORDER BY 1; function run_command_on_shards(regclass,text,boolean) function 
run_command_on_workers(text,boolean) function shard_name(regclass,bigint) + function start_metadata_sync_to_all_nodes() function start_metadata_sync_to_node(text,integer) function stop_metadata_sync_to_node(text,integer,boolean) function time_partition_range(regclass) @@ -284,5 +285,5 @@ ORDER BY 1; view columnar.stripe view pg_dist_shard_placement view time_partitions -(268 rows) +(269 rows) diff --git a/src/test/regress/sql/failure_mx_metadata_sync.sql b/src/test/regress/sql/failure_mx_metadata_sync.sql index 5dfe88585..90e882fe5 100644 --- a/src/test/regress/sql/failure_mx_metadata_sync.sql +++ b/src/test/regress/sql/failure_mx_metadata_sync.sql @@ -10,6 +10,9 @@ SET citus.shard_replication_factor TO 1; SELECT pg_backend_pid() as pid \gset SELECT citus.mitmproxy('conn.allow()'); +\set VERBOSITY terse +SET client_min_messages TO ERROR; + CREATE TABLE t1 (id int PRIMARY KEY); SELECT create_distributed_table('t1', 'id'); INSERT INTO t1 SELECT x FROM generate_series(1,100) AS f(x); @@ -25,21 +28,21 @@ SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE pg_dist_local_group SET grou SELECT citus_activate_node('localhost', :worker_2_proxy_port); -- Failure to drop all tables in pg_dist_partition -SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM pg_dist_partition").cancel(' || :pid || ')'); +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM pg_dist_partition").cancel(' || :pid || ')'); SELECT citus_activate_node('localhost', :worker_2_proxy_port); -SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM pg_dist_partition").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM pg_dist_partition").kill()'); SELECT citus_activate_node('localhost', :worker_2_proxy_port); -- Failure to delete pg_dist_node entries from the worker -SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM pg_dist_node").cancel(' || :pid || ')'); +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM pg_dist_node").cancel(' || :pid || ')'); SELECT 
start_metadata_sync_to_node('localhost', :worker_2_proxy_port); -SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM pg_dist_node").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM pg_dist_node").kill()'); SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port); -- Failure to populate pg_dist_node in the worker -SELECT citus.mitmproxy('conn.onQuery(query="^INSERT INTO pg_dist_node").cancel(' || :pid || ')'); +SELECT citus.mitmproxy('conn.onQuery(query="INSERT INTO pg_dist_node").cancel(' || :pid || ')'); SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port); -SELECT citus.mitmproxy('conn.onQuery(query="^INSERT INTO pg_dist_node").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="INSERT INTO pg_dist_node").kill()'); SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port); -- Verify that coordinator knows worker does not have valid metadata @@ -71,9 +74,9 @@ SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE pg_dist_local_group SET grou SELECT stop_metadata_sync_to_node('localhost', :worker_2_proxy_port); -- Failure to delete pg_dist_node entries from the worker -SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM pg_dist_node").cancel(' || :pid || ')'); +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM pg_dist_node").cancel(' || :pid || ')'); SELECT stop_metadata_sync_to_node('localhost', :worker_2_proxy_port); -SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM pg_dist_node").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM pg_dist_node").kill()'); SELECT stop_metadata_sync_to_node('localhost', :worker_2_proxy_port); \c - - - :worker_2_port diff --git a/src/test/regress/sql/multi_cluster_management.sql b/src/test/regress/sql/multi_cluster_management.sql index aa359e5c7..4162ad7c6 100644 --- a/src/test/regress/sql/multi_cluster_management.sql +++ b/src/test/regress/sql/multi_cluster_management.sql @@ -484,5 +484,10 @@ SELECT * from 
master_set_node_property('localhost', :worker_2_port, 'bogusproper DROP TABLE test_dist, test_ref, test_dist_colocated, test_dist_non_colocated; +BEGIN; + SELECT start_metadata_sync_to_all_nodes(); +COMMIT; +SELECT start_metadata_sync_to_all_nodes(); + -- verify that at the end of this file, all primary nodes have metadata synced SELECT bool_and(hasmetadata) AND bool_and(metadatasynced) FROM pg_dist_node WHERE isactive = 't' and noderole = 'primary';