diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c
index 53697de00..15c2c62cc 100644
--- a/src/backend/distributed/metadata/metadata_sync.c
+++ b/src/backend/distributed/metadata/metadata_sync.c
@@ -137,6 +137,7 @@ static char * RemoteTypeIdExpression(Oid typeId);
 static char * RemoteCollationIdExpression(Oid colocationId);
 
+PG_FUNCTION_INFO_V1(start_metadata_sync_to_all_nodes);
 PG_FUNCTION_INFO_V1(start_metadata_sync_to_node);
 PG_FUNCTION_INFO_V1(stop_metadata_sync_to_node);
 PG_FUNCTION_INFO_V1(worker_record_sequence_dependency);
@@ -193,6 +194,33 @@ start_metadata_sync_to_node(PG_FUNCTION_ARGS)
 }
 
 
+/*
+ * start_metadata_sync_to_all_nodes function sets the hasmetadata column of
+ * all the primary worker nodes to true, and then activates the nodes without
+ * replicating reference tables.
+ */
+Datum
+start_metadata_sync_to_all_nodes(PG_FUNCTION_ARGS)
+{
+    CheckCitusVersion(ERROR);
+
+    EnsureSuperUser();
+    EnsureCoordinator();
+
+    List *workerNodes = ActivePrimaryNonCoordinatorNodeList(RowShareLock);
+
+    bool prevReplicateRefTablesOnActivate = ReplicateReferenceTablesOnActivate;
+    SetLocalReplicateReferenceTablesOnActivate(false);
+
+    ActivateNodeList(workerNodes);
+    TransactionModifiedNodeMetadata = true;
+
+    SetLocalReplicateReferenceTablesOnActivate(prevReplicateRefTablesOnActivate);
+
+    PG_RETURN_BOOL(true);
+}
+
+
 /*
  * SyncNodeMetadataToNode is the internal API for
  * start_metadata_sync_to_node().
@@ -541,10 +569,10 @@ SyncNodeMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError)
 	 */
 	if (raiseOnError)
 	{
-		SendMetadataCommandListToWorkerInCoordinatedTransaction(workerNode->workerName,
-																workerNode->workerPort,
-																currentUser,
-																recreateMetadataSnapshotCommandList);
+		SendMetadataCommandListToWorkerListInCoordinatedTransaction(list_make1(
+																		workerNode),
+																	currentUser,
+																	recreateMetadataSnapshotCommandList);
 		return true;
 	}
 	else
diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c
index ab430f6f5..da0656cc2 100644
--- a/src/backend/distributed/metadata/node_metadata.c
+++ b/src/backend/distributed/metadata/node_metadata.c
@@ -106,9 +106,9 @@ static void InsertPlaceholderCoordinatorRecord(void);
 static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport,
                           NodeMetadata *nodeMetadata);
 static void DeleteNodeRow(char *nodename, int32 nodeport);
-static void SyncDistributedObjectsToNode(WorkerNode *workerNode);
+static void SyncDistributedObjectsToNodeList(List *workerNodeList);
 static void UpdateLocalGroupIdOnNode(WorkerNode *workerNode);
-static void SyncPgDistTableMetadataToNode(WorkerNode *workerNode);
+static void SyncPgDistTableMetadataToNodeList(List *nodeList);
 static List * InterTableRelationshipCommandList();
 static void BlockDistributedQueriesOnMetadataNodes(void);
 static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple);
@@ -843,7 +843,7 @@ SyncDistributedObjectsCommandList(WorkerNode *workerNode)
 
 /*
- * SyncDistributedObjectsToNode sync the distributed objects to the node. It includes
+ * SyncDistributedObjectsToNodeList syncs the distributed objects to the given nodes. It includes
  * - All dependencies (e.g., types, schemas, sequences)
  * - All shell distributed table
  * - Inter relation between those shell tables
  *
@@ -852,17 +852,29 @@ SyncDistributedObjectsCommandList(WorkerNode *workerNode)
  * since all the dependencies should be present in the coordinator already.
 */
 static void
-SyncDistributedObjectsToNode(WorkerNode *workerNode)
+SyncDistributedObjectsToNodeList(List *workerNodeList)
 {
-	if (NodeIsCoordinator(workerNode))
+	List *workerNodesToSync = NIL;
+	WorkerNode *workerNode = NULL;
+	foreach_ptr(workerNode, workerNodeList)
 	{
-		/* coordinator has all the objects */
-		return;
+		if (NodeIsCoordinator(workerNode))
+		{
+			/* coordinator has all the objects */
+			continue;
+		}
+
+		if (!NodeIsPrimary(workerNode))
+		{
+			/* secondary nodes get the objects from their primaries via replication */
+			continue;
+		}
+
+		workerNodesToSync = lappend(workerNodesToSync, workerNode);
 	}
 
-	if (!NodeIsPrimary(workerNode))
+	if (workerNodesToSync == NIL)
 	{
-		/* secondary nodes gets the objects from their primaries via replication */
 		return;
 	}
@@ -874,9 +886,8 @@ SyncDistributedObjectsToNode(WorkerNode *workerNode)
 
 	/* send commands to new workers, the current user should be a superuser */
 	Assert(superuser());
-	SendMetadataCommandListToWorkerInCoordinatedTransaction(
-		workerNode->workerName,
-		workerNode->workerPort,
+	SendMetadataCommandListToWorkerListInCoordinatedTransaction(
+		workerNodesToSync,
 		CurrentUserName(),
 		commandList);
 }
@@ -894,9 +905,8 @@ UpdateLocalGroupIdOnNode(WorkerNode *workerNode)
 
 	/* send commands to new workers, the current user should be a superuser */
 	Assert(superuser());
-	SendMetadataCommandListToWorkerInCoordinatedTransaction(
-		workerNode->workerName,
-		workerNode->workerPort,
+	SendMetadataCommandListToWorkerListInCoordinatedTransaction(
+		list_make1(workerNode),
 		CurrentUserName(),
 		commandList);
 }
@@ -904,25 +914,33 @@
 
 
 /*
- * SyncPgDistTableMetadataToNode syncs the pg_dist_partition, pg_dist_shard
+ * SyncPgDistTableMetadataToNodeList syncs the pg_dist_partition, pg_dist_shard
  * pg_dist_placement and pg_dist_object metadata entries.
  *
  */
 static void
-SyncPgDistTableMetadataToNode(WorkerNode *workerNode)
+SyncPgDistTableMetadataToNodeList(List *nodeList)
 {
-	if (NodeIsPrimary(workerNode) && !NodeIsCoordinator(workerNode))
-	{
-		List *syncPgDistMetadataCommandList = PgDistTableMetadataSyncCommandList();
+	/* send commands to new workers, the current user should be a superuser */
+	Assert(superuser());
 
-		/* send commands to new workers, the current user should be a superuser */
-		Assert(superuser());
-		SendMetadataCommandListToWorkerInCoordinatedTransaction(
-			workerNode->workerName,
-			workerNode->workerPort,
-			CurrentUserName(),
-			syncPgDistMetadataCommandList);
+	List *syncPgDistMetadataCommandList = PgDistTableMetadataSyncCommandList();
+
+	List *nodesWithMetadata = NIL;
+	WorkerNode *workerNode = NULL;
+	foreach_ptr(workerNode, nodeList)
+	{
+		if (NodeIsPrimary(workerNode) && !NodeIsCoordinator(workerNode))
+		{
+			nodesWithMetadata = lappend(nodesWithMetadata, workerNode);
+		}
 	}
+
+
+	SendMetadataCommandListToWorkerListInCoordinatedTransaction(
+		nodesWithMetadata,
+		CurrentUserName(),
+		syncPgDistMetadataCommandList);
 }
@@ -1118,15 +1136,14 @@ PrimaryNodeForGroup(int32 groupId, bool *groupContainsNodes)
 
 
 /*
- * ActivateNode activates the node with nodeName and nodePort. Currently, activation
- * includes only replicating the reference tables and setting isactive column of the
- * given node.
+ * ActivateNodeList iterates over the nodeList and activates the nodes.
+ * Some parts of the node activation are done in parallel across the nodes,
+ * such as syncing the metadata. However, reference table replication is
+ * done one node at a time.
 */
-int
-ActivateNode(char *nodeName, int nodePort)
+void
+ActivateNodeList(List *nodeList)
 {
-	bool isActive = true;
-
 	/*
 	 * We currently require the object propagation to happen via superuser,
 	 * see #5139. While activating a node, we sync both metadata and object
@@ -1143,86 +1160,130 @@
 	/* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */
 	LockRelationOid(DistNodeRelationId(), ExclusiveLock);
 
-	/*
-	 * First, locally mark the node is active, if everything goes well,
-	 * we are going to sync this information to all the metadata nodes.
-	 */
-	WorkerNode *workerNode = FindWorkerNodeAnyCluster(nodeName, nodePort);
-	if (workerNode == NULL)
-	{
-		ereport(ERROR, (errmsg("node at \"%s:%u\" does not exist", nodeName, nodePort)));
-	}
-
-	/*
-	 * Delete existing reference and replicated table placements on the
-	 * given groupId if the group has been disabled earlier (e.g., isActive
-	 * set to false).
-	 *
-	 * Sync the metadata changes to all existing metadata nodes irrespective
-	 * of the current nodes' metadata sync state. We expect all nodes up
-	 * and running when another node is activated.
-	 */
-	if (!workerNode->isActive && NodeIsPrimary(workerNode))
-	{
-		bool localOnly = false;
-		DeleteAllReplicatedTablePlacementsFromNodeGroup(workerNode->groupId,
-														localOnly);
-	}
-
-	workerNode =
-		SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_isactive,
-								 BoolGetDatum(isActive));
-
-	/* TODO: Once all tests will be enabled for MX, we can remove sync by default check */
-	bool syncMetadata = EnableMetadataSync && NodeIsPrimary(workerNode);
-
-	if (syncMetadata)
+	List *nodeToSyncMetadata = NIL;
+	WorkerNode *node = NULL;
+	foreach_ptr(node, nodeList)
 	{
 		/*
-		 * We are going to sync the metadata anyway in this transaction, so do
-		 * not fail just because the current metadata is not synced.
+		 * First, locally mark the node as active; if everything goes well,
+		 * we are going to sync this information to all the metadata nodes.
 		 */
-		SetWorkerColumn(workerNode, Anum_pg_dist_node_metadatasynced,
-						BoolGetDatum(true));
-
-		/*
-		 * Update local group id first, as object dependency logic requires to have
-		 * updated local group id.
-		 */
-		UpdateLocalGroupIdOnNode(workerNode);
-
-		/*
-		 * Sync distributed objects first. We must sync distributed objects before
-		 * replicating reference tables to the remote node, as reference tables may
-		 * need such objects.
-		 */
-		SyncDistributedObjectsToNode(workerNode);
-
-		/*
-		 * We need to replicate reference tables before syncing node metadata, otherwise
-		 * reference table replication logic would try to get lock on the new node before
-		 * having the shard placement on it
-		 */
-		if (ReplicateReferenceTablesOnActivate)
+		WorkerNode *workerNode =
+			FindWorkerNodeAnyCluster(node->workerName, node->workerPort);
+		if (workerNode == NULL)
 		{
-			ReplicateAllReferenceTablesToNode(workerNode);
+			ereport(ERROR, (errmsg("node at \"%s:%u\" does not exist", node->workerName,
+								   node->workerPort)));
 		}
 
-		/*
-		 * Sync node metadata. We must sync node metadata before syncing table
-		 * related pg_dist_xxx metadata. Since table related metadata requires
-		 * to have right pg_dist_node entries.
-		 */
-		SyncNodeMetadataToNode(nodeName, nodePort);
+		/* both nodes should be the same */
+		Assert(workerNode->nodeId == node->nodeId);
 
 		/*
-		 * As the last step, sync the table related metadata to the remote node.
-		 * We must handle it as the last step because of limitations shared with
-		 * above comments.
+		 * Delete existing reference and replicated table placements on the
+		 * given groupId if the group has been disabled earlier (e.g., isActive
+		 * set to false).
+		 *
+		 * Sync the metadata changes to all existing metadata nodes irrespective
+		 * of the current nodes' metadata sync state. We expect all nodes up
+		 * and running when another node is activated.
 		 */
-		SyncPgDistTableMetadataToNode(workerNode);
+		if (!workerNode->isActive && NodeIsPrimary(workerNode))
+		{
+			bool localOnly = false;
+			DeleteAllReplicatedTablePlacementsFromNodeGroup(workerNode->groupId,
+															localOnly);
+		}
+
+		workerNode =
+			SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_isactive,
+									 BoolGetDatum(true));
+
+		/* TODO: Once all tests will be enabled for MX, we can remove sync by default check */
+		bool syncMetadata = EnableMetadataSync && NodeIsPrimary(workerNode);
+		if (syncMetadata)
+		{
+			/*
+			 * We are going to sync the metadata anyway in this transaction, so do
+			 * not fail just because the current metadata is not synced.
+			 */
+			SetWorkerColumn(workerNode, Anum_pg_dist_node_metadatasynced,
+							BoolGetDatum(true));
+
+			/*
+			 * Update the local group id first, as the object dependency logic
+			 * requires an updated local group id.
+			 */
+			UpdateLocalGroupIdOnNode(workerNode);
+
+			nodeToSyncMetadata = lappend(nodeToSyncMetadata, workerNode);
+		}
 	}
 
+	/*
+	 * Sync distributed objects first. We must sync distributed objects before
+	 * replicating reference tables to the remote nodes, as reference tables may
+	 * need such objects.
+	 */
+	SyncDistributedObjectsToNodeList(nodeToSyncMetadata);
+
+	if (ReplicateReferenceTablesOnActivate)
+	{
+		foreach_ptr(node, nodeList)
+		{
+			/*
+			 * We need to replicate reference tables before syncing node metadata, otherwise
+			 * the reference table replication logic would try to get a lock on the new node
+			 * before the shard placement exists on it.
+			 */
+			if (NodeIsPrimary(node))
+			{
+				ReplicateAllReferenceTablesToNode(node);
+			}
+		}
+	}
+
+	/*
+	 * Sync node metadata. We must sync node metadata before syncing the table
+	 * related pg_dist_xxx metadata, since the table related metadata requires
+	 * the right pg_dist_node entries to be present.
+	 */
+	foreach_ptr(node, nodeToSyncMetadata)
+	{
+		SyncNodeMetadataToNode(node->workerName, node->workerPort);
+	}
+
+	/*
+	 * As the last step, sync the table related metadata to the remote nodes.
+	 * We must handle it as the last step because of the limitations shared in
+	 * the comments above.
+	 */
+	SyncPgDistTableMetadataToNodeList(nodeToSyncMetadata);
+
+	foreach_ptr(node, nodeList)
+	{
+		bool isActive = true;
+
+		/* finally, let all other active metadata nodes learn about this change */
+		SetNodeState(node->workerName, node->workerPort, isActive);
+	}
+}
+
+
+/*
+ * ActivateNode activates the node with nodeName and nodePort. Currently, activation
+ * includes only replicating the reference tables and setting the isactive column of
+ * the given node.
+ */
+int
+ActivateNode(char *nodeName, int nodePort)
+{
+	bool isActive = true;
+
+	WorkerNode *workerNode = ModifiableWorkerNode(nodeName, nodePort);
+	ActivateNodeList(list_make1(workerNode));
+
 	/* finally, let all other active metadata nodes to learn about this change */
 	WorkerNode *newWorkerNode = SetNodeState(nodeName, nodePort, isActive);
 	Assert(newWorkerNode->nodeId == workerNode->nodeId);
diff --git a/src/backend/distributed/sql/citus--11.0-1--11.0-2.sql b/src/backend/distributed/sql/citus--11.0-1--11.0-2.sql
index 94f8ff226..0aa5d10bd 100644
--- a/src/backend/distributed/sql/citus--11.0-1--11.0-2.sql
+++ b/src/backend/distributed/sql/citus--11.0-1--11.0-2.sql
@@ -3,3 +3,5 @@
 #include "udfs/citus_disable_node/11.0-2.sql"
 #include "udfs/citus_is_coordinator/11.0-2.sql"
 #include "udfs/run_command_on_coordinator/11.0-2.sql"
+#include "udfs/start_metadata_sync_to_all_nodes/11.0-2.sql"
+#include "udfs/citus_finalize_upgrade_to_citus11/11.0-2.sql"
diff --git a/src/backend/distributed/sql/downgrades/citus--11.0-2--11.0-1.sql b/src/backend/distributed/sql/downgrades/citus--11.0-2--11.0-1.sql
index 270bbb52e..6569f4bbc 100644
--- a/src/backend/distributed/sql/downgrades/citus--11.0-2--11.0-1.sql
+++ b/src/backend/distributed/sql/downgrades/citus--11.0-2--11.0-1.sql
@@ -13,3 +13,6 @@ REVOKE ALL ON FUNCTION pg_catalog.citus_disable_node(text,int, bool) FROM PUBLIC
 
 DROP FUNCTION pg_catalog.citus_is_coordinator();
 DROP FUNCTION pg_catalog.run_command_on_coordinator(text,boolean);
+
+DROP FUNCTION pg_catalog.start_metadata_sync_to_all_nodes();
+DROP FUNCTION pg_catalog.citus_finalize_upgrade_to_citus11(boolean);
diff --git a/src/backend/distributed/sql/udfs/citus_finalize_upgrade_to_citus11/11.0-2.sql b/src/backend/distributed/sql/udfs/citus_finalize_upgrade_to_citus11/11.0-2.sql
new file mode 100644
index 000000000..2b4bb17f6
--- /dev/null
+++ b/src/backend/distributed/sql/udfs/citus_finalize_upgrade_to_citus11/11.0-2.sql
@@ -0,0 +1,226 @@
+-- citus_finalize_upgrade_to_citus11() is a helper UDF that ensures
+-- the upgrade to Citus 11 is finished successfully. The upgrade to
+-- Citus 11 requires all active primary worker nodes to have the
+-- metadata, so this function's job is to sync the metadata to the
+-- nodes that do not already have it.
+-- Once the function finishes without any errors and returns true,
+-- the cluster is ready for running distributed queries from
+-- the worker nodes. When debug is enabled, the function provides
+-- more information to the user.
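+--
+-- For example, a superuser on the coordinator can run:
+--   SELECT citus_finalize_upgrade_to_citus11();
+-- or, to skip the cross-node version check:
+--   SELECT citus_finalize_upgrade_to_citus11(enforce_version_check := false);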
+CREATE OR REPLACE FUNCTION pg_catalog.citus_finalize_upgrade_to_citus11(enforce_version_check bool default true)
+  RETURNS bool
+  LANGUAGE plpgsql
+  AS $$
+BEGIN
+
+  ---------------------------------------------
+  -- This script consists of 9 stages (0-8)
+  -- Each step is documented, and if log level
+  -- is reduced to DEBUG1, each step is logged
+  -- as well
+  ---------------------------------------------
+
+------------------------------------------------------------------------------------------
+  -- STAGE 0: Ensure no concurrent node metadata changing operation happens while this
+  -- script is running by acquiring a strong lock on pg_dist_node
+------------------------------------------------------------------------------------------
+BEGIN
+  LOCK TABLE pg_dist_node IN EXCLUSIVE MODE NOWAIT;
+
+  EXCEPTION WHEN OTHERS THEN
+  RAISE 'Another node metadata changing operation is in progress, try again.';
+END;
+
+------------------------------------------------------------------------------------------
+  -- STAGE 1: We want all the commands to run in the same transaction block. Without
+  -- sequential mode, metadata syncing cannot be done in a transaction block along with
+  -- other commands
+------------------------------------------------------------------------------------------
+  SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
+
+------------------------------------------------------------------------------------------
+  -- STAGE 2: Ensure we have the prerequisites
+  -- (a) only a superuser can run this script
+  -- (b) cannot be executed when enable_ddl_propagation is False
+  -- (c) can only be executed from the coordinator
+------------------------------------------------------------------------------------------
+DECLARE
+  is_superuser_running boolean := False;
+  enable_ddl_prop boolean := False;
+  local_group_id int := 0;
+BEGIN
+  SELECT rolsuper INTO is_superuser_running FROM pg_roles WHERE rolname = current_user;
+  IF is_superuser_running IS NOT True THEN
+    RAISE EXCEPTION 'This operation can only be initiated by superuser';
+  END IF;
+
+  SELECT current_setting('citus.enable_ddl_propagation') INTO enable_ddl_prop;
+  IF enable_ddl_prop IS NOT True THEN
+    RAISE EXCEPTION 'This operation cannot be completed when citus.enable_ddl_propagation is False.';
+  END IF;
+
+  SELECT groupid INTO local_group_id FROM pg_dist_local_group;
+
+  IF local_group_id != 0 THEN
+    RAISE EXCEPTION 'Operation is not allowed on this node. Connect to the coordinator and run it again.';
+  ELSE
+    RAISE DEBUG 'We are on the coordinator, continue to sync metadata';
+  END IF;
+END;
+
+
+  ------------------------------------------------------------------------------------------
+  -- STAGE 3: Ensure all primary nodes are active
+  ------------------------------------------------------------------------------------------
+  DECLARE
+    primary_disabled_worker_node_count int := 0;
+  BEGIN
+    SELECT count(*) INTO primary_disabled_worker_node_count FROM pg_dist_node
+            WHERE groupid != 0 AND noderole = 'primary' AND NOT isactive;
+
+    IF primary_disabled_worker_node_count != 0 THEN
+      RAISE EXCEPTION 'There are inactive primary worker nodes, you need to activate the nodes first.'
+                      ' Use SELECT citus_activate_node() to activate the disabled nodes';
+    ELSE
+      RAISE DEBUG 'There are no disabled worker nodes, continue to sync metadata';
+    END IF;
+  END;
+
+  ------------------------------------------------------------------------------------------
+  -- STAGE 4: Ensure there are no connectivity issues in the cluster
+  ------------------------------------------------------------------------------------------
+  DECLARE
+    all_nodes_can_connect_to_each_other boolean := False;
+  BEGIN
+    SELECT bool_and(coalesce(result, false)) INTO all_nodes_can_connect_to_each_other FROM citus_check_cluster_node_health();
+
+    IF all_nodes_can_connect_to_each_other != True THEN
+      RAISE EXCEPTION 'There are unhealthy primary nodes, you need to ensure all '
+                      'nodes are up and running. Also, make sure that all nodes can connect '
+                      'to each other. Use SELECT * FROM citus_check_cluster_node_health(); '
+                      'to check the cluster health';
+    ELSE
+      RAISE DEBUG 'Cluster is healthy, all nodes can connect to each other';
+    END IF;
+  END;
+
+  ------------------------------------------------------------------------------------------
+  -- STAGE 5: Ensure all nodes are on the same version
+  ------------------------------------------------------------------------------------------
+  DECLARE
+    coordinator_version text := '';
+    worker_node_version text := '';
+    worker_node_version_count int := 0;
+
+  BEGIN
+    SELECT extversion INTO coordinator_version from pg_extension WHERE extname = 'citus';
+
+    -- first, check if all workers have the same version
+    SELECT
+        count(distinct result) INTO worker_node_version_count
+    FROM
+        run_command_on_workers('SELECT extversion from pg_extension WHERE extname = ''citus''');
+    IF enforce_version_check AND worker_node_version_count != 1 THEN
+      RAISE EXCEPTION 'All nodes should have the same Citus version installed. Currently '
+                      'some of the workers have different versions.';
+    ELSE
+      RAISE DEBUG 'All worker nodes have the same Citus version';
+    END IF;
+
+    -- second, check if the workers have the same version as the coordinator
+    SELECT
+        result INTO worker_node_version
+    FROM
+        run_command_on_workers('SELECT extversion from pg_extension WHERE extname = ''citus'';')
+    GROUP BY result;
+
+    IF enforce_version_check AND coordinator_version != worker_node_version THEN
+      RAISE EXCEPTION 'All nodes should have the same Citus version installed. Currently '
+                      'the coordinator has version % and the worker(s) have %',
+                      coordinator_version, worker_node_version;
+    ELSE
+      RAISE DEBUG 'All nodes have the same Citus version';
+    END IF;
+  END;
+
+  ------------------------------------------------------------------------------------------
+  -- STAGE 6: Ensure all the partitioned tables have the proper naming structure
+  -- As described on https://github.com/citusdata/citus/issues/4962
+  -- existing indexes on partitioned distributed tables can collide
+  -- with the index names that exist on the shards
+  -- luckily, we know how to fix it.
+  -- And, note that we should do this even if the cluster is a basic plan
+  -- (e.g., single node Citus) such that when the cluster is scaled out, everything
+  -- works as intended
+  -- And, this should be done only ONCE for a cluster as it can be a pretty
+  -- time-consuming operation. Thus, even if the function is called multiple times,
+  -- we keep track of it and do not re-execute this part if not needed.
+  ------------------------------------------------------------------------------------------
+  DECLARE
+    partitioned_table_exists_pre_11 boolean := False;
+  BEGIN
+
+    -- we recorded whether partitioned tables existed during the upgrade to Citus 11
+    SELECT metadata->>'partitioned_citus_table_exists_pre_11' INTO partitioned_table_exists_pre_11
+    FROM pg_dist_node_metadata;
+
+    IF partitioned_table_exists_pre_11 IS NOT NULL AND partitioned_table_exists_pre_11 THEN
+
+      -- this might take long depending on the number of partitions and shards...
+      RAISE NOTICE 'Preparing all the existing partitioned table indexes';
+      PERFORM pg_catalog.fix_all_partition_shard_index_names();
+
+      -- great, we are done with fixing the existing wrong index names
+      -- so, let's remove the flag
+      UPDATE pg_dist_node_metadata
+      SET metadata=jsonb_delete(metadata, 'partitioned_citus_table_exists_pre_11');
+    ELSE
+      RAISE DEBUG 'There are no partitioned tables that should be fixed';
+    END IF;
+  END;
+
+  ------------------------------------------------------------------------------------------
+  -- STAGE 7: Return early if there are no primary worker nodes
+  -- We don't strictly need this step, but it gives a nicer notice message
+  ------------------------------------------------------------------------------------------
+  DECLARE
+    primary_worker_node_count bigint := 0;
+  BEGIN
+    SELECT count(*) INTO primary_worker_node_count FROM pg_dist_node WHERE groupid != 0 AND noderole = 'primary';
+
+    IF primary_worker_node_count = 0 THEN
+      RAISE NOTICE 'There are no primary worker nodes, no need to sync metadata to any node';
+      RETURN true;
+    ELSE
+      RAISE DEBUG 'There are % primary worker nodes, continue to sync metadata', primary_worker_node_count;
+    END IF;
+  END;
+
+  ------------------------------------------------------------------------------------------
+  -- STAGE 8: Do the actual metadata & object syncing to the worker nodes
+  -- For the "already synced" metadata nodes, we do not strictly need to
+  -- sync the objects & metadata, but there is no harm to do it anyway
+  -- it'll only cost some execution time but makes sure that we have
+  -- consistent metadata & objects across all the nodes
+  ------------------------------------------------------------------------------------------
+  DECLARE
+  BEGIN
+
+    -- this might take long depending on the number of tables & objects ...
+    RAISE NOTICE 'Preparing to sync the metadata to all nodes';
+
+    PERFORM start_metadata_sync_to_all_nodes();
+  END;
+
+  RETURN true;
+END;
+$$;
+COMMENT ON FUNCTION pg_catalog.citus_finalize_upgrade_to_citus11(bool)
+  IS 'finalizes upgrade to Citus';
+
+REVOKE ALL ON FUNCTION pg_catalog.citus_finalize_upgrade_to_citus11(bool) FROM PUBLIC;
diff --git a/src/backend/distributed/sql/udfs/citus_finalize_upgrade_to_citus11/latest.sql b/src/backend/distributed/sql/udfs/citus_finalize_upgrade_to_citus11/latest.sql
index 7b7d357ff..2b4bb17f6 100644
--- a/src/backend/distributed/sql/udfs/citus_finalize_upgrade_to_citus11/latest.sql
+++ b/src/backend/distributed/sql/udfs/citus_finalize_upgrade_to_citus11/latest.sql
@@ -209,9 +209,7 @@ END;
 
     -- this might take long depending on the number of tables & objects ...
RAISE NOTICE 'Preparing to sync the metadata to all nodes'; - PERFORM start_metadata_sync_to_node(nodename,nodeport) - FROM - pg_dist_node WHERE groupid != 0 AND noderole = 'primary'; + PERFORM start_metadata_sync_to_all_nodes(); END; RETURN true; diff --git a/src/backend/distributed/sql/udfs/start_metadata_sync_to_all_nodes/11.0-2.sql b/src/backend/distributed/sql/udfs/start_metadata_sync_to_all_nodes/11.0-2.sql new file mode 100644 index 000000000..ca886fb9a --- /dev/null +++ b/src/backend/distributed/sql/udfs/start_metadata_sync_to_all_nodes/11.0-2.sql @@ -0,0 +1,9 @@ +CREATE OR REPLACE FUNCTION pg_catalog.start_metadata_sync_to_all_nodes() + RETURNS bool + LANGUAGE C + STRICT +AS 'MODULE_PATHNAME', $$start_metadata_sync_to_all_nodes$$; +COMMENT ON FUNCTION pg_catalog.start_metadata_sync_to_all_nodes() + IS 'sync metadata to all active primary nodes'; + +REVOKE ALL ON FUNCTION pg_catalog.start_metadata_sync_to_all_nodes() FROM PUBLIC; diff --git a/src/backend/distributed/sql/udfs/start_metadata_sync_to_all_nodes/latest.sql b/src/backend/distributed/sql/udfs/start_metadata_sync_to_all_nodes/latest.sql new file mode 100644 index 000000000..ca886fb9a --- /dev/null +++ b/src/backend/distributed/sql/udfs/start_metadata_sync_to_all_nodes/latest.sql @@ -0,0 +1,9 @@ +CREATE OR REPLACE FUNCTION pg_catalog.start_metadata_sync_to_all_nodes() + RETURNS bool + LANGUAGE C + STRICT +AS 'MODULE_PATHNAME', $$start_metadata_sync_to_all_nodes$$; +COMMENT ON FUNCTION pg_catalog.start_metadata_sync_to_all_nodes() + IS 'sync metadata to all active primary nodes'; + +REVOKE ALL ON FUNCTION pg_catalog.start_metadata_sync_to_all_nodes() FROM PUBLIC; diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index 6a140a569..6b4b1a351 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -490,27 +490,70 @@ SendCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort, * coordinated transaction. Any failures aborts the coordinated transaction. */ void -SendMetadataCommandListToWorkerInCoordinatedTransaction(const char *nodeName, - int32 nodePort, - const char *nodeUser, - List *commandList) +SendMetadataCommandListToWorkerListInCoordinatedTransaction(List *workerNodeList, + const char *nodeUser, + List *commandList) { - int connectionFlags = REQUIRE_METADATA_CONNECTION; + if (list_length(commandList) == 0 || list_length(workerNodeList) == 0) + { + /* nothing to do */ + return; + } UseCoordinatedTransaction(); - MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags, - nodeName, nodePort, - nodeUser, NULL); + List *connectionList = NIL; - MarkRemoteTransactionCritical(workerConnection); - RemoteTransactionBeginIfNecessary(workerConnection); - - /* iterate over the commands and execute them in the same connection */ - const char *commandString = NULL; - foreach_ptr(commandString, commandList) + WorkerNode *workerNode = NULL; + foreach_ptr(workerNode, workerNodeList) { - ExecuteCriticalRemoteCommand(workerConnection, commandString); + const char *nodeName = workerNode->workerName; + int nodePort = workerNode->workerPort; + int connectionFlags = REQUIRE_METADATA_CONNECTION; + + MultiConnection *connection = + StartNodeConnection(connectionFlags, nodeName, nodePort); + + MarkRemoteTransactionCritical(connection); + + /* + * connection can only be NULL for optional connections, which we don't + * support in this codepath. 
+	 */
+		Assert((connectionFlags & OPTIONAL_CONNECTION) == 0);
+		Assert(connection != NULL);
+		connectionList = lappend(connectionList, connection);
+	}
+
+	FinishConnectionListEstablishment(connectionList);
+
+	/* must open transaction blocks before sending the commands */
+	RemoteTransactionsBeginIfNecessary(connectionList);
+
+	/*
+	 * In order to avoid a round-trip per query in commandList,
+	 * we join the strings and send them as a single command. Also,
+	 * if there is only a single command, we avoid the additional call to
+	 * StringJoin, given that some strings can be quite large.
+	 */
+	char *stringToSend = (list_length(commandList) == 1) ?
+						 linitial(commandList) : StringJoin(commandList, ';');
+
+	/* send commands in parallel */
+	bool failOnError = true;
+	MultiConnection *connection = NULL;
+	foreach_ptr(connection, connectionList)
+	{
+		int querySent = SendRemoteCommand(connection, stringToSend);
+		if (querySent == 0)
+		{
+			ReportConnectionError(connection, ERROR);
+		}
+	}
+
+	foreach_ptr(connection, connectionList)
+	{
+		ClearResults(connection, failOnError);
 	}
 }
diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c
index 70f56119c..5c6775c80 100644
--- a/src/backend/distributed/utils/reference_table_utils.c
+++ b/src/backend/distributed/utils/reference_table_utils.c
@@ -356,9 +356,10 @@ ReplicateReferenceTableShardToNode(ShardInterval *shardInterval, char *nodeName,
 
 	/* send commands to new workers, the current user should be a superuser */
 	Assert(superuser());
-	SendMetadataCommandListToWorkerInCoordinatedTransaction(nodeName, nodePort,
-															CurrentUserName(),
-															ddlCommandList);
+	WorkerNode *workerNode = FindWorkerNode(nodeName, nodePort);
+	SendMetadataCommandListToWorkerListInCoordinatedTransaction(list_make1(workerNode),
+																CurrentUserName(),
+																ddlCommandList);
 
 	int32 groupId = GroupForNode(nodeName, nodePort);
 	uint64 placementId = GetNextPlacementId();
@@ -599,9 +600,8 @@ ReplicateAllReferenceTablesToNode(WorkerNode *workerNode)
 
 	/* send commands to new workers, the current user should be a superuser */
 	Assert(superuser());
-	SendMetadataCommandListToWorkerInCoordinatedTransaction(
-		workerNode->workerName,
-		workerNode->workerPort,
+	SendMetadataCommandListToWorkerListInCoordinatedTransaction(
+		list_make1(workerNode),
 		CurrentUserName(),
 		commandList);
 }
diff --git a/src/backend/distributed/utils/resource_lock.c b/src/backend/distributed/utils/resource_lock.c
index f7fef26ab..d0da443b3 100644
--- a/src/backend/distributed/utils/resource_lock.c
+++ b/src/backend/distributed/utils/resource_lock.c
@@ -1295,9 +1295,6 @@ AcquireDistributedLockOnRelations_Internal(List *lockRelationRecordList,
 	const char *currentUser = CurrentUserName();
 	foreach_ptr(workerNode, workerNodeList)
 	{
-		const char *nodeName = workerNode->workerName;
-		int nodePort = workerNode->workerPort;
-
 		/* if local node is one of the targets, acquire the lock locally */
 		if (workerNode->groupId == localGroupId)
 		{
@@ -1305,9 +1302,11 @@ AcquireDistributedLockOnRelations_Internal(List *lockRelationRecordList,
 			continue;
 		}
 
-		SendMetadataCommandListToWorkerInCoordinatedTransaction(nodeName, nodePort,
-																currentUser, list_make1(
-																	lockCommand));
+		SendMetadataCommandListToWorkerListInCoordinatedTransaction(list_make1(
+																		workerNode),
+																	currentUser,
+																	list_make1(
+																		lockCommand));
 	}
 }
diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h
index 27de1d464..e861b8a65 100644
--- a/src/include/distributed/worker_manager.h
+++ 
b/src/include/distributed/worker_manager.h @@ -63,6 +63,7 @@ extern char *WorkerListFileName; extern char *CurrentCluster; extern bool ReplicateReferenceTablesOnActivate; +extern void ActivateNodeList(List *nodeList); extern int ActivateNode(char *nodeName, int nodePort); /* Function declarations for finding worker nodes to place shards on */ diff --git a/src/include/distributed/worker_transaction.h b/src/include/distributed/worker_transaction.h index 76cd3f79c..72b16acd5 100644 --- a/src/include/distributed/worker_transaction.h +++ b/src/include/distributed/worker_transaction.h @@ -74,10 +74,11 @@ extern void SendCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort, const char *nodeUser, List *commandList); -extern void SendMetadataCommandListToWorkerInCoordinatedTransaction(const char *nodeName, - int32 nodePort, - const char *nodeUser, - List *commandList); +extern void SendMetadataCommandListToWorkerListInCoordinatedTransaction( + List *workerNodeList, + const char * + nodeUser, + List *commandList); extern void SendCommandToWorkersOptionalInParallel(TargetWorkerSet targetWorkerSet, const char *command, const char *user); diff --git a/src/test/regress/expected/failure_add_disable_node.out b/src/test/regress/expected/failure_add_disable_node.out index 538fa31d0..76952767e 100644 --- a/src/test/regress/expected/failure_add_disable_node.out +++ b/src/test/regress/expected/failure_add_disable_node.out @@ -106,10 +106,11 @@ SELECT citus.mitmproxy('conn.onQuery(query="CREATE SCHEMA").kill()'); (1 row) SELECT master_activate_node('localhost', :worker_2_proxy_port); -ERROR: server closed the connection unexpectedly +WARNING: server closed the connection unexpectedly This probably means the server terminated abnormally before or while processing the request. CONTEXT: while executing command on localhost:xxxxx +ERROR: failure on connection marked as essential: localhost:xxxxx -- verify node is not activated SELECT * FROM master_get_active_worker_nodes() ORDER BY 1, 2; diff --git a/src/test/regress/expected/failure_mx_metadata_sync.out b/src/test/regress/expected/failure_mx_metadata_sync.out index 7a74d91e5..7d667759d 100644 --- a/src/test/regress/expected/failure_mx_metadata_sync.out +++ b/src/test/regress/expected/failure_mx_metadata_sync.out @@ -13,6 +13,8 @@ SELECT citus.mitmproxy('conn.allow()'); (1 row) +\set VERBOSITY terse +SET client_min_messages TO ERROR; CREATE TABLE t1 (id int PRIMARY KEY); SELECT create_distributed_table('t1', 'id'); create_distributed_table @@ -23,7 +25,6 @@ SELECT create_distributed_table('t1', 'id'); INSERT INTO t1 SELECT x FROM generate_series(1,100) AS f(x); -- Initially turn metadata sync off because we'll ingest errors to start/stop metadata sync operations SELECT stop_metadata_sync_to_node('localhost', :worker_2_proxy_port); -NOTICE: dropping metadata on the node (localhost,9060) stop_metadata_sync_to_node --------------------------------------------------------------------- @@ -51,12 +52,9 @@ SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE pg_dist_local_group SET grou (1 row) SELECT citus_activate_node('localhost', :worker_2_proxy_port); -ERROR: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. 
-CONTEXT: while executing command on localhost:xxxxx +ERROR: failure on connection marked as essential: localhost:xxxxx -- Failure to drop all tables in pg_dist_partition -SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM pg_dist_partition").cancel(' || :pid || ')'); +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM pg_dist_partition").cancel(' || :pid || ')'); mitmproxy --------------------------------------------------------------------- @@ -64,19 +62,16 @@ SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM pg_dist_partition").can SELECT citus_activate_node('localhost', :worker_2_proxy_port); ERROR: canceling statement due to user request -SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM pg_dist_partition").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM pg_dist_partition").kill()'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT citus_activate_node('localhost', :worker_2_proxy_port); -ERROR: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. -CONTEXT: while executing command on localhost:xxxxx +ERROR: failure on connection marked as essential: localhost:xxxxx -- Failure to delete pg_dist_node entries from the worker -SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM pg_dist_node").cancel(' || :pid || ')'); +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM pg_dist_node").cancel(' || :pid || ')'); mitmproxy --------------------------------------------------------------------- @@ -84,19 +79,16 @@ SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM pg_dist_node").cancel(' SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port); ERROR: canceling statement due to user request -SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM pg_dist_node").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM pg_dist_node").kill()'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port); -ERROR: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. -CONTEXT: while executing command on localhost:xxxxx +ERROR: failure on connection marked as essential: localhost:xxxxx -- Failure to populate pg_dist_node in the worker -SELECT citus.mitmproxy('conn.onQuery(query="^INSERT INTO pg_dist_node").cancel(' || :pid || ')'); +SELECT citus.mitmproxy('conn.onQuery(query="INSERT INTO pg_dist_node").cancel(' || :pid || ')'); mitmproxy --------------------------------------------------------------------- @@ -104,17 +96,14 @@ SELECT citus.mitmproxy('conn.onQuery(query="^INSERT INTO pg_dist_node").cancel(' SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port); ERROR: canceling statement due to user request -SELECT citus.mitmproxy('conn.onQuery(query="^INSERT INTO pg_dist_node").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="INSERT INTO pg_dist_node").kill()'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port); -ERROR: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. 
-CONTEXT: while executing command on localhost:xxxxx +ERROR: failure on connection marked as essential: localhost:xxxxx -- Verify that coordinator knows worker does not have valid metadata SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_proxy_port; hasmetadata @@ -153,7 +142,6 @@ SELECT create_distributed_table('t2', 'id'); ERROR: server closed the connection unexpectedly This probably means the server terminated abnormally before or while processing the request. -CONTEXT: while executing command on localhost:xxxxx SELECT citus.mitmproxy('conn.onParse(query="citus_internal_add_shard_metadata").cancel(' || :pid || ')'); mitmproxy --------------------------------------------------------------------- @@ -179,7 +167,6 @@ SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE pg_dist_local_group SET grou (1 row) SELECT stop_metadata_sync_to_node('localhost', :worker_2_proxy_port); -NOTICE: dropping metadata on the node (localhost,9060) ERROR: canceling statement due to user request SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE pg_dist_local_group SET groupid").kill()'); mitmproxy @@ -188,90 +175,27 @@ SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE pg_dist_local_group SET grou (1 row) SELECT stop_metadata_sync_to_node('localhost', :worker_2_proxy_port); -NOTICE: dropping metadata on the node (localhost,9060) -WARNING: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. -CONTEXT: while executing command on localhost:xxxxx -WARNING: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. -connection not open -CONTEXT: while executing command on localhost:xxxxx -WARNING: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. -connection not open -connection not open -CONTEXT: while executing command on localhost:xxxxx -WARNING: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. -connection not open -connection not open -connection not open -CONTEXT: while executing command on localhost:xxxxx -WARNING: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. 
-connection not open -connection not open -connection not open -connection not open -CONTEXT: while executing command on localhost:xxxxx stop_metadata_sync_to_node --------------------------------------------------------------------- (1 row) -- Failure to delete pg_dist_node entries from the worker -SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM pg_dist_node").cancel(' || :pid || ')'); +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM pg_dist_node").cancel(' || :pid || ')'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT stop_metadata_sync_to_node('localhost', :worker_2_proxy_port); -NOTICE: dropping metadata on the node (localhost,9060) ERROR: canceling statement due to user request -SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM pg_dist_node").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM pg_dist_node").kill()'); mitmproxy --------------------------------------------------------------------- (1 row) SELECT stop_metadata_sync_to_node('localhost', :worker_2_proxy_port); -NOTICE: dropping metadata on the node (localhost,9060) -WARNING: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. -CONTEXT: while executing command on localhost:xxxxx -WARNING: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. -connection not open -CONTEXT: while executing command on localhost:xxxxx -WARNING: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. -connection not open -connection not open -CONTEXT: while executing command on localhost:xxxxx -WARNING: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. -connection not open -connection not open -connection not open -CONTEXT: while executing command on localhost:xxxxx -WARNING: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. 
-connection not open -connection not open -connection not open -connection not open -CONTEXT: while executing command on localhost:xxxxx stop_metadata_sync_to_node --------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_cluster_management.out b/src/test/regress/expected/multi_cluster_management.out index bda479057..fce2bcbd2 100644 --- a/src/test/regress/expected/multi_cluster_management.out +++ b/src/test/regress/expected/multi_cluster_management.out @@ -1191,6 +1191,20 @@ WHERE logicalrelid = 'test_dist_non_colocated'::regclass GROUP BY nodeport ORDER SELECT * from master_set_node_property('localhost', :worker_2_port, 'bogusproperty', false); ERROR: only the 'shouldhaveshards' property can be set using this function DROP TABLE test_dist, test_ref, test_dist_colocated, test_dist_non_colocated; +BEGIN; + SELECT start_metadata_sync_to_all_nodes(); + start_metadata_sync_to_all_nodes +--------------------------------------------------------------------- + t +(1 row) + +COMMIT; +SELECT start_metadata_sync_to_all_nodes(); + start_metadata_sync_to_all_nodes +--------------------------------------------------------------------- + t +(1 row) + -- verify that at the end of this file, all primary nodes have metadata synced SELECT bool_and(hasmetadata) AND bool_and(metadatasynced) FROM pg_dist_node WHERE isactive = 't' and noderole = 'primary'; ?column? diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index cd0dd26ff..04f825f1d 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1038,7 +1038,8 @@ SELECT * FROM multi_extension.print_extension_changes(); --------------------------------------------------------------------- | function citus_is_coordinator() boolean | function run_command_on_coordinator(text,boolean) SETOF record -(2 rows) + | function start_metadata_sync_to_all_nodes() boolean +(3 rows) -- Test downgrade script (result should be empty) ALTER EXTENSION citus UPDATE TO '11.0-1'; diff --git a/src/test/regress/expected/propagate_extension_commands.out b/src/test/regress/expected/propagate_extension_commands.out index e7feb5221..ec900db1e 100644 --- a/src/test/regress/expected/propagate_extension_commands.out +++ b/src/test/regress/expected/propagate_extension_commands.out @@ -218,8 +218,9 @@ SELECT run_command_on_workers($$SELECT extversion FROM pg_extension WHERE extnam -- adding the second node will fail as the text search template needs to be created manually SELECT 1 from master_add_node('localhost', :worker_2_port); -ERROR: text search template "public.intdict_template" does not exist +WARNING: text search template "public.intdict_template" does not exist CONTEXT: while executing command on localhost:xxxxx +ERROR: failure on connection marked as essential: localhost:xxxxx -- create the text search template manually on the worker \c - - - :worker_2_port SET citus.enable_metadata_sync TO false; diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index af59040d5..3a5bd4d26 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -204,6 +204,7 @@ ORDER BY 1; function run_command_on_shards(regclass,text,boolean) function run_command_on_workers(text,boolean) function shard_name(regclass,bigint) + function start_metadata_sync_to_all_nodes() function 
start_metadata_sync_to_node(text,integer) function stop_metadata_sync_to_node(text,integer,boolean) function time_partition_range(regclass) diff --git a/src/test/regress/sql/failure_mx_metadata_sync.sql b/src/test/regress/sql/failure_mx_metadata_sync.sql index 5dfe88585..90e882fe5 100644 --- a/src/test/regress/sql/failure_mx_metadata_sync.sql +++ b/src/test/regress/sql/failure_mx_metadata_sync.sql @@ -10,6 +10,9 @@ SET citus.shard_replication_factor TO 1; SELECT pg_backend_pid() as pid \gset SELECT citus.mitmproxy('conn.allow()'); +\set VERBOSITY terse +SET client_min_messages TO ERROR; + CREATE TABLE t1 (id int PRIMARY KEY); SELECT create_distributed_table('t1', 'id'); INSERT INTO t1 SELECT x FROM generate_series(1,100) AS f(x); @@ -25,21 +28,21 @@ SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE pg_dist_local_group SET grou SELECT citus_activate_node('localhost', :worker_2_proxy_port); -- Failure to drop all tables in pg_dist_partition -SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM pg_dist_partition").cancel(' || :pid || ')'); +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM pg_dist_partition").cancel(' || :pid || ')'); SELECT citus_activate_node('localhost', :worker_2_proxy_port); -SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM pg_dist_partition").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM pg_dist_partition").kill()'); SELECT citus_activate_node('localhost', :worker_2_proxy_port); -- Failure to delete pg_dist_node entries from the worker -SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM pg_dist_node").cancel(' || :pid || ')'); +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM pg_dist_node").cancel(' || :pid || ')'); SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port); -SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM pg_dist_node").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM pg_dist_node").kill()'); SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port); -- Failure to populate pg_dist_node in the worker -SELECT citus.mitmproxy('conn.onQuery(query="^INSERT INTO pg_dist_node").cancel(' || :pid || ')'); +SELECT citus.mitmproxy('conn.onQuery(query="INSERT INTO pg_dist_node").cancel(' || :pid || ')'); SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port); -SELECT citus.mitmproxy('conn.onQuery(query="^INSERT INTO pg_dist_node").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="INSERT INTO pg_dist_node").kill()'); SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port); -- Verify that coordinator knows worker does not have valid metadata @@ -71,9 +74,9 @@ SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE pg_dist_local_group SET grou SELECT stop_metadata_sync_to_node('localhost', :worker_2_proxy_port); -- Failure to delete pg_dist_node entries from the worker -SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM pg_dist_node").cancel(' || :pid || ')'); +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM pg_dist_node").cancel(' || :pid || ')'); SELECT stop_metadata_sync_to_node('localhost', :worker_2_proxy_port); -SELECT citus.mitmproxy('conn.onQuery(query="^DELETE FROM pg_dist_node").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM pg_dist_node").kill()'); SELECT stop_metadata_sync_to_node('localhost', :worker_2_proxy_port); \c - - - :worker_2_port diff --git a/src/test/regress/sql/multi_cluster_management.sql b/src/test/regress/sql/multi_cluster_management.sql index aa359e5c7..4162ad7c6 100644 --- 
a/src/test/regress/sql/multi_cluster_management.sql +++ b/src/test/regress/sql/multi_cluster_management.sql @@ -484,5 +484,10 @@ SELECT * from master_set_node_property('localhost', :worker_2_port, 'bogusproper DROP TABLE test_dist, test_ref, test_dist_colocated, test_dist_non_colocated; +BEGIN; + SELECT start_metadata_sync_to_all_nodes(); +COMMIT; +SELECT start_metadata_sync_to_all_nodes(); + -- verify that at the end of this file, all primary nodes have metadata synced SELECT bool_and(hasmetadata) AND bool_and(metadatasynced) FROM pg_dist_node WHERE isactive = 't' and noderole = 'primary';
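Example usage (a sketch; it assumes a coordinator with at least one active primary worker, and the first two statements mirror the new multi_cluster_management test):

    -- sync metadata to every active primary node, both inside and outside
    -- an explicit transaction block
    BEGIN;
    SELECT start_metadata_sync_to_all_nodes();
    COMMIT;
    SELECT start_metadata_sync_to_all_nodes();

    -- finalize the upgrade to Citus 11; returns true on success
    SELECT citus_finalize_upgrade_to_citus11();

    -- verify that all primary nodes report synced metadata
    SELECT bool_and(hasmetadata) AND bool_and(metadatasynced)
    FROM pg_dist_node WHERE isactive = 't' AND noderole = 'primary';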