From 5f02d18ef8e215f3d318795eddfe2b435a0dad63 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Wed, 28 Jul 2021 13:15:00 +0200 Subject: [PATCH] transactional metadata sync for maintanince daemon As we use the current user to sync the metadata to the nodes with #5105 (and many other PRs), there is no reason that prevents us to use the coordinated transaction for metadata syncing. This commit also renames few functions to reflect their actual implementation. --- .../distributed/commands/dependencies.c | 10 +- .../distributed/metadata/metadata_sync.c | 76 +++++-- .../distributed/metadata/node_metadata.c | 8 +- .../distributed/operations/repair_shards.c | 16 +- .../distributed/operations/shard_cleaner.c | 6 +- .../transaction/worker_transaction.c | 97 ++++++++- .../distributed/utils/reference_table_utils.c | 8 +- src/include/distributed/worker_transaction.h | 24 ++- .../expected/failure_mx_metadata_sync.out | 18 ++ .../regress/expected/multi_metadata_sync.out | 8 +- .../expected/multi_mx_node_metadata.out | 33 ++- .../expected/start_stop_metadata_sync.out | 191 ++++++++++++++++-- src/test/regress/sql/multi_metadata_sync.sql | 2 +- .../regress/sql/multi_mx_node_metadata.sql | 22 +- .../regress/sql/start_stop_metadata_sync.sql | 95 ++++++++- 15 files changed, 528 insertions(+), 86 deletions(-) diff --git a/src/backend/distributed/commands/dependencies.c b/src/backend/distributed/commands/dependencies.c index 60cb56492..5ba6f8af5 100644 --- a/src/backend/distributed/commands/dependencies.c +++ b/src/backend/distributed/commands/dependencies.c @@ -119,9 +119,9 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target) const char *nodeName = workerNode->workerName; uint32 nodePort = workerNode->workerPort; - SendCommandListToWorkerInSingleTransaction(nodeName, nodePort, - CitusExtensionOwnerName(), - ddlCommands); + SendCommandListToWorkerOutsideTransaction(nodeName, nodePort, + CitusExtensionOwnerName(), + ddlCommands); } } @@ -312,8 +312,8 @@ ReplicateAllDependenciesToNode(const char *nodeName, int nodePort) /* since we are executing ddl commands lets disable propagation, primarily for mx */ ddlCommands = list_concat(list_make1(DISABLE_DDL_PROPAGATION), ddlCommands); - SendCommandListToWorkerInSingleTransaction(nodeName, nodePort, - CitusExtensionOwnerName(), ddlCommands); + SendCommandListToWorkerOutsideTransaction(nodeName, nodePort, + CitusExtensionOwnerName(), ddlCommands); } diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index b91c0f4e0..2591bb588 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -47,11 +47,13 @@ #include "distributed/metadata_cache.h" #include "distributed/metadata_sync.h" #include "distributed/metadata/distobject.h" +#include "distributed/multi_executor.h" #include "distributed/multi_join_order.h" #include "distributed/multi_partitioning_utils.h" #include "distributed/multi_physical_planner.h" #include "distributed/pg_dist_node.h" #include "distributed/pg_dist_shard.h" +#include "distributed/relation_access_tracking.h" #include "distributed/remote_commands.h" #include "distributed/resource_lock.h" #include "distributed/worker_manager.h" @@ -78,6 +80,7 @@ char *EnableManualMetadataChangesForUser = ""; +static void EnsureSequentialModeMetadataOperations(void); static List * GetDistributedTableDDLEvents(Oid relationId); static char * LocalGroupIdUpdateCommand(int32 groupId); static void UpdateDistNodeBoolAttr(const char *nodeName, int32 nodePort, @@ -175,7 +178,7 @@ StartMetadataSyncToNode(const char *nodeNameString, int32 nodePort) EnsureSuperUser(); EnsureModificationsCanRun(); - PreventInTransactionBlock(true, "start_metadata_sync_to_node"); + EnsureSequentialModeMetadataOperations(); LockRelationOid(DistNodeRelationId(), ExclusiveLock); @@ -222,6 +225,50 @@ StartMetadataSyncToNode(const char *nodeNameString, int32 nodePort) } +/* + * EnsureSequentialModeMetadataOperations makes sure that the current transaction is + * already in sequential mode, or can still safely be put in sequential mode, + * it errors if that is not possible. The error contains information for the user to + * retry the transaction with sequential mode set from the begining. + * + * Metadata objects (e.g., distributed table on the workers) exists only 1 instance of + * the type used by potentially multiple other shards/connections. To make sure all + * shards/connections in the transaction can interact with the metadata needs to be + * visible on all connections used by the transaction, meaning we can only use 1 + * connection per node. + */ +static void +EnsureSequentialModeMetadataOperations(void) +{ + if (!IsTransactionBlock()) + { + /* we do not need to switch to sequential mode if we are not in a transaction */ + return; + } + + if (ParallelQueryExecutedInTransaction()) + { + ereport(ERROR, (errmsg( + "cannot execute metadata syncing operation because there was a " + "parallel operation on a distributed table in the " + "transaction"), + errdetail("When modifying metadata, Citus needs to " + "perform all operations over a single connection per " + "node to ensure consistency."), + errhint("Try re-running the transaction with " + "\"SET LOCAL citus.multi_shard_modify_mode TO " + "\'sequential\';\""))); + } + + ereport(DEBUG1, (errmsg("switching to sequential query execution mode"), + errdetail("Metadata synced or stopped syncing. To make " + "sure subsequent commands see the metadata correctly " + "we need to make sure to use only one connection for " + "all future commands"))); + SetLocalMultiShardModifyModeToSequential(); +} + + /* * stop_metadata_sync_to_node function sets the hasmetadata column of the specified node * to false in pg_dist_node table, thus indicating that the specified worker node does not @@ -233,7 +280,6 @@ stop_metadata_sync_to_node(PG_FUNCTION_ARGS) CheckCitusVersion(ERROR); EnsureCoordinator(); EnsureSuperUser(); - PreventInTransactionBlock(true, "stop_metadata_sync_to_node"); text *nodeName = PG_GETARG_TEXT_P(0); int32 nodePort = PG_GETARG_INT32(1); @@ -376,19 +422,19 @@ SyncMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError) */ if (raiseOnError) { - SendCommandListToWorkerInSingleTransaction(workerNode->workerName, - workerNode->workerPort, - currentUser, - recreateMetadataSnapshotCommandList); + SendCommandListToWorkerInCoordinatedTransaction(workerNode->workerName, + workerNode->workerPort, + currentUser, + recreateMetadataSnapshotCommandList); return true; } else { bool success = - SendOptionalCommandListToWorkerInTransaction(workerNode->workerName, - workerNode->workerPort, - currentUser, - recreateMetadataSnapshotCommandList); + SendOptionalCommandListToWorkerInCoordinatedTransaction( + workerNode->workerName, workerNode->workerPort, + currentUser, recreateMetadataSnapshotCommandList); + return success; } } @@ -401,6 +447,8 @@ SyncMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError) static void DropMetadataSnapshotOnNode(WorkerNode *workerNode) { + EnsureSequentialModeMetadataOperations(); + char *userName = CurrentUserName(); /* generate the queries which drop the metadata */ @@ -409,10 +457,10 @@ DropMetadataSnapshotOnNode(WorkerNode *workerNode) dropMetadataCommandList = lappend(dropMetadataCommandList, LocalGroupIdUpdateCommand(0)); - SendOptionalCommandListToWorkerInTransaction(workerNode->workerName, - workerNode->workerPort, - userName, - dropMetadataCommandList); + SendOptionalCommandListToWorkerInCoordinatedTransaction(workerNode->workerName, + workerNode->workerPort, + userName, + dropMetadataCommandList); } diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 5abc62e43..284d78944 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -615,10 +615,10 @@ PropagateNodeWideObjects(WorkerNode *newWorkerNode) ddlCommands = lappend(ddlCommands, ENABLE_DDL_PROPAGATION); /* send commands to new workers*/ - SendCommandListToWorkerInSingleTransaction(newWorkerNode->workerName, - newWorkerNode->workerPort, - CitusExtensionOwnerName(), - ddlCommands); + SendCommandListToWorkerOutsideTransaction(newWorkerNode->workerName, + newWorkerNode->workerPort, + CitusExtensionOwnerName(), + ddlCommands); } } diff --git a/src/backend/distributed/operations/repair_shards.c b/src/backend/distributed/operations/repair_shards.c index c4f16f6b2..2784e6958 100644 --- a/src/backend/distributed/operations/repair_shards.c +++ b/src/backend/distributed/operations/repair_shards.c @@ -753,8 +753,8 @@ RepairShardPlacement(int64 shardId, const char *sourceNodeName, int32 sourceNode } EnsureNoModificationsHaveBeenDone(); - SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort, tableOwner, - ddlCommandList); + SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, tableOwner, + ddlCommandList); /* after successful repair, we update shard state as healthy*/ List *placementList = ShardPlacementListWithoutOrphanedPlacements(shardId); @@ -954,8 +954,8 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName, List *ddlCommandList = RecreateShardDDLCommandList(shardInterval, sourceNodeName, sourceNodePort); char *tableOwner = TableOwner(shardInterval->relationId); - SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort, - tableOwner, ddlCommandList); + SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, + tableOwner, ddlCommandList); ddlCommandList = NIL; @@ -973,8 +973,8 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName, ddlCommandList, PostLoadShardCreationCommandList(shardInterval, sourceNodeName, sourceNodePort)); - SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort, - tableOwner, ddlCommandList); + SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, + tableOwner, ddlCommandList); MemoryContextReset(localContext); } @@ -1007,8 +1007,8 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName, } char *tableOwner = TableOwner(shardInterval->relationId); - SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort, - tableOwner, commandList); + SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, + tableOwner, commandList); MemoryContextReset(localContext); } diff --git a/src/backend/distributed/operations/shard_cleaner.c b/src/backend/distributed/operations/shard_cleaner.c index afa206594..5423e2745 100644 --- a/src/backend/distributed/operations/shard_cleaner.c +++ b/src/backend/distributed/operations/shard_cleaner.c @@ -275,9 +275,9 @@ TryDropShard(GroupShardPlacement *placement) /* remove the shard from the node */ bool success = - SendOptionalCommandListToWorkerInTransaction(shardPlacement->nodeName, - shardPlacement->nodePort, - NULL, dropCommandList); + SendOptionalCommandListToWorkerOutsideTransaction(shardPlacement->nodeName, + shardPlacement->nodePort, + NULL, dropCommandList); if (success) { /* delete the actual placement */ diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index a10530ea6..cece88323 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -434,14 +434,14 @@ EnsureNoModificationsHaveBeenDone() /* - * SendCommandListToWorkerInSingleTransaction opens connection to the node with the given - * nodeName and nodePort. Then, the connection starts a transaction on the remote - * node and executes the commands in the transaction. The function raises error if - * any of the queries fails. + * SendCommandListToWorkerOutsideTransaction forces to open a new connection + * to the node with the given nodeName and nodePort. Then, the connection starts + * a transaction on the remote node and executes the commands in the transaction. + * The function raises error if any of the queries fails. */ void -SendCommandListToWorkerInSingleTransaction(const char *nodeName, int32 nodePort, - const char *nodeUser, List *commandList) +SendCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort, + const char *nodeUser, List *commandList) { int connectionFlags = FORCE_NEW_CONNECTION; @@ -465,13 +465,43 @@ SendCommandListToWorkerInSingleTransaction(const char *nodeName, int32 nodePort, /* - * SendOptionalCommandListToWorkerInTransaction sends the given command list to - * the given worker in a single transaction. If any of the commands fail, it - * rollbacks the transaction, and otherwise commits. + * SendCommandListToWorkerInCoordinatedTransaction opens connection to the node + * with the given nodeName and nodePort. The commands are sent as part of the + * coordinated transaction. Any failures aborts the coordinated transaction. + */ +void +SendCommandListToWorkerInCoordinatedTransaction(const char *nodeName, int32 nodePort, + const char *nodeUser, List *commandList) +{ + int connectionFlags = 0; + + UseCoordinatedTransaction(); + + MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags, + nodeName, nodePort, + nodeUser, NULL); + + MarkRemoteTransactionCritical(workerConnection); + RemoteTransactionBeginIfNecessary(workerConnection); + + /* iterate over the commands and execute them in the same connection */ + const char *commandString = NULL; + foreach_ptr(commandString, commandList) + { + ExecuteCriticalRemoteCommand(workerConnection, commandString); + } +} + + +/* + * SendOptionalCommandListToWorkerOutsideTransaction sends the given command + * list to the given worker in a single transaction that is outside of the + * coordinated tranaction. If any of the commands fail, it rollbacks the + * transaction, and otherwise commits. */ bool -SendOptionalCommandListToWorkerInTransaction(const char *nodeName, int32 nodePort, - const char *nodeUser, List *commandList) +SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort, + const char *nodeUser, List *commandList) { int connectionFlags = FORCE_NEW_CONNECTION; bool failed = false; @@ -511,6 +541,51 @@ SendOptionalCommandListToWorkerInTransaction(const char *nodeName, int32 nodePor } +/* + * SendOptionalCommandListToWorkerInCoordinatedTransaction sends the given + * command list to the given worker as part of the coordinated transaction. + * If any of the commands fail, the function returns false. + */ +bool +SendOptionalCommandListToWorkerInCoordinatedTransaction(const char *nodeName, int32 + nodePort, + const char *nodeUser, + List *commandList) +{ + int connectionFlags = 0; + bool failed = false; + + UseCoordinatedTransaction(); + + MultiConnection *workerConnection = + GetNodeUserDatabaseConnection(connectionFlags, nodeName, nodePort, + nodeUser, NULL); + if (PQstatus(workerConnection->pgConn) != CONNECTION_OK) + { + return false; + } + + RemoteTransactionsBeginIfNecessary(list_make1(workerConnection)); + + /* iterate over the commands and execute them in the same connection */ + const char *commandString = NULL; + foreach_ptr(commandString, commandList) + { + if (ExecuteOptionalRemoteCommand(workerConnection, commandString, NULL) != + RESPONSE_OKAY) + { + failed = true; + + bool raiseErrors = false; + MarkRemoteTransactionFailed(workerConnection, raiseErrors); + break; + } + } + + return !failed; +} + + /* * ErrorIfAnyMetadataNodeOutOfSync raises an error if any of the given * metadata nodes are out of sync. It is safer to avoid metadata changing diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index cafa2b328..a35e0ef57 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -370,8 +370,8 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort) nodePort))); EnsureNoModificationsHaveBeenDone(); - SendCommandListToWorkerInSingleTransaction(nodeName, nodePort, tableOwner, - ddlCommandList); + SendCommandListToWorkerOutsideTransaction(nodeName, nodePort, tableOwner, + ddlCommandList); int32 groupId = GroupForNode(nodeName, nodePort); uint64 placementId = GetNextPlacementId(); @@ -570,8 +570,8 @@ ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort) char *tableOwner = TableOwner(shardInterval->relationId); List *commandList = CopyShardForeignConstraintCommandList(shardInterval); - SendCommandListToWorkerInSingleTransaction(nodeName, nodePort, tableOwner, - commandList); + SendCommandListToWorkerOutsideTransaction(nodeName, nodePort, tableOwner, + commandList); } } } diff --git a/src/include/distributed/worker_transaction.h b/src/include/distributed/worker_transaction.h index a0f2bede0..14daee4ef 100644 --- a/src/include/distributed/worker_transaction.h +++ b/src/include/distributed/worker_transaction.h @@ -37,17 +37,25 @@ extern void SendCommandToWorkersAsUser(TargetWorkerSet targetWorkerSet, const char *nodeUser, const char *command); extern void SendCommandToWorkerAsUser(const char *nodeName, int32 nodePort, const char *nodeUser, const char *command); -extern bool SendOptionalCommandListToWorkerInTransaction(const char *nodeName, int32 - nodePort, - const char *nodeUser, - List *commandList); +extern bool SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeName, + int32 nodePort, + const char *nodeUser, + List *commandList); +extern bool SendOptionalCommandListToWorkerInCoordinatedTransaction(const char *nodeName, + int32 nodePort, + const char *nodeUser, + List *commandList); extern void SendCommandToWorkersWithMetadata(const char *command); extern void SendBareCommandListToMetadataWorkers(List *commandList); extern void EnsureNoModificationsHaveBeenDone(void); -extern void SendCommandListToWorkerInSingleTransaction(const char *nodeName, - int32 nodePort, - const char *nodeUser, - List *commandList); +extern void SendCommandListToWorkerOutsideTransaction(const char *nodeName, + int32 nodePort, + const char *nodeUser, + List *commandList); +extern void SendCommandListToWorkerInCoordinatedTransaction(const char *nodeName, + int32 nodePort, + const char *nodeUser, + List *commandList); extern void SendCommandToWorkersOptionalInParallel(TargetWorkerSet targetWorkerSet, const char *command, const char *user); diff --git a/src/test/regress/expected/failure_mx_metadata_sync.out b/src/test/regress/expected/failure_mx_metadata_sync.out index d4b46498a..fbb35c215 100644 --- a/src/test/regress/expected/failure_mx_metadata_sync.out +++ b/src/test/regress/expected/failure_mx_metadata_sync.out @@ -187,6 +187,12 @@ WARNING: server closed the connection unexpectedly before or while processing the request. CONTEXT: while executing command on localhost:xxxxx WARNING: connection not open +CONTEXT: while executing command on localhost:xxxxx +WARNING: connection not open +CONTEXT: while executing command on localhost:xxxxx +WARNING: connection not open +CONTEXT: while executing command on localhost:xxxxx +WARNING: connection not open CONTEXT: while executing command on localhost:xxxxx stop_metadata_sync_to_node --------------------------------------------------------------------- @@ -216,6 +222,12 @@ WARNING: server closed the connection unexpectedly before or while processing the request. CONTEXT: while executing command on localhost:xxxxx WARNING: connection not open +CONTEXT: while executing command on localhost:xxxxx +WARNING: connection not open +CONTEXT: while executing command on localhost:xxxxx +WARNING: connection not open +CONTEXT: while executing command on localhost:xxxxx +WARNING: connection not open CONTEXT: while executing command on localhost:xxxxx stop_metadata_sync_to_node --------------------------------------------------------------------- @@ -245,6 +257,12 @@ WARNING: server closed the connection unexpectedly before or while processing the request. CONTEXT: while executing command on localhost:xxxxx WARNING: connection not open +CONTEXT: while executing command on localhost:xxxxx +WARNING: connection not open +CONTEXT: while executing command on localhost:xxxxx +WARNING: connection not open +CONTEXT: while executing command on localhost:xxxxx +WARNING: connection not open CONTEXT: while executing command on localhost:xxxxx stop_metadata_sync_to_node --------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_metadata_sync.out b/src/test/regress/expected/multi_metadata_sync.out index 1abc2d45b..26fbec0b2 100644 --- a/src/test/regress/expected/multi_metadata_sync.out +++ b/src/test/regress/expected/multi_metadata_sync.out @@ -470,11 +470,15 @@ SELECT count(*) FROM pg_trigger WHERE tgrelid='mx_testing_schema.mx_test_table': 1 (1 row) --- Make sure that start_metadata_sync_to_node cannot be called inside a transaction +-- Make sure that start_metadata_sync_to_node can be called inside a transaction and rollbacked \c - - - :master_port BEGIN; SELECT start_metadata_sync_to_node('localhost', :worker_2_port); -ERROR: start_metadata_sync_to_node cannot run inside a transaction block + start_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + ROLLBACK; SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_port; hasmetadata diff --git a/src/test/regress/expected/multi_mx_node_metadata.out b/src/test/regress/expected/multi_mx_node_metadata.out index c1b2885b4..eea9cd5bb 100644 --- a/src/test/regress/expected/multi_mx_node_metadata.out +++ b/src/test/regress/expected/multi_mx_node_metadata.out @@ -338,13 +338,42 @@ SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node; --------------------------------------------------------------------- -- Test updating a node when another node is in readonly-mode --------------------------------------------------------------------- -SELECT master_add_node('localhost', :worker_2_port) AS nodeid_2 \gset -SELECT 1 FROM start_metadata_sync_to_node('localhost', :worker_2_port); +-- first, add node and sync metadata in the same transaction +CREATE TYPE some_type AS (a int, b int); +CREATE TABLE some_ref_table (a int, b some_type); +SELECT create_reference_table('some_ref_table'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO some_ref_table (a) SELECT i FROM generate_series(0,10)i; +BEGIN; + SELECT master_add_node('localhost', :worker_2_port) AS nodeid_2 \gset + SELECT 1 FROM start_metadata_sync_to_node('localhost', :worker_2_port); ?column? --------------------------------------------------------------------- 1 (1 row) + -- and modifications can be read from any worker in the same transaction + INSERT INTO some_ref_table (a) SELECT i FROM generate_series(0,10)i; + SET LOCAL citus.task_assignment_policy TO "round-robin"; + SELECT count(*) FROM some_ref_table; + count +--------------------------------------------------------------------- + 22 +(1 row) + + SELECT count(*) FROM some_ref_table; + count +--------------------------------------------------------------------- + 22 +(1 row) + +COMMIT; +DROP TABLE some_ref_table; +DROP TYPE some_type; -- Create a table with shards on both nodes CREATE TABLE dist_table_2(a int); SELECT create_distributed_table('dist_table_2', 'a'); diff --git a/src/test/regress/expected/start_stop_metadata_sync.out b/src/test/regress/expected/start_stop_metadata_sync.out index b460595f6..accfd9ae5 100644 --- a/src/test/regress/expected/start_stop_metadata_sync.out +++ b/src/test/regress/expected/start_stop_metadata_sync.out @@ -91,11 +91,6 @@ SELECT alter_table_set_access_method('events_2021_jan', 'columnar'); (1 row) VACUUM (FREEZE, ANALYZE) events_2021_jan; --- this should fail -BEGIN; -SELECT start_metadata_sync_to_node('localhost', :worker_1_port); -ERROR: start_metadata_sync_to_node cannot run inside a transaction block -ROLLBACK; -- sync metadata SELECT start_metadata_sync_to_node('localhost', :worker_1_port); start_metadata_sync_to_node @@ -125,7 +120,7 @@ SELECT * FROM test_matview; (1 row) SELECT * FROM pg_dist_partition WHERE logicalrelid::text LIKE 'events%' ORDER BY logicalrelid::text; - logicalrelid | partmethod | partkey | colocationid | repmodel + logicalrelid | partmethod | partkey | colocationid | repmodel --------------------------------------------------------------------- events | h | {VAR :varno 1 :varattno 1 :vartype 1184 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 1390012 | s events_2021_feb | h | {VAR :varno 1 :varattno 1 :vartype 1184 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 1390012 | s @@ -164,18 +159,15 @@ SELECT * FROM distributed_table_1; (0 rows) ALTER TABLE distributed_table_4 DROP COLUMN b; --- this should fail BEGIN; -SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); -ERROR: stop_metadata_sync_to_node cannot run inside a transaction block -ROLLBACK; -SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); + SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); NOTICE: dropping metadata on the node (localhost,57637) stop_metadata_sync_to_node --------------------------------------------------------------------- (1 row) +COMMIT; SELECT * FROM test_view; count --------------------------------------------------------------------- @@ -245,12 +237,14 @@ SELECT * FROM distributed_table_1; --------------------------------------------------------------------- (0 rows) -SELECT start_metadata_sync_to_node('localhost', :worker_1_port); +BEGIN; + SELECT start_metadata_sync_to_node('localhost', :worker_1_port); start_metadata_sync_to_node --------------------------------------------------------------------- (1 row) +COMMIT; \c - - - :worker_1_port SELECT count(*) > 0 FROM pg_dist_node; ?column? @@ -278,7 +272,180 @@ SELECT count(*) > 0 FROM pg_class WHERE relname LIKE 'reference_table__' AND rel \c - - - :master_port SET search_path TO "start_stop_metadata_sync"; +-- both start & stop metadata sync operations can be transactional +BEGIN; + -- sync the same node multiple times + SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + + SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + + -- sync the same node in the same command + WITH nodes(name, port) AS (VALUES ('localhost', :worker_1_port, + 'localhost', :worker_1_port, + 'localhost', :worker_2_port, + 'localhost', :worker_2_port)) + SELECT start_metadata_sync_to_node(name,port) FROM nodes; + start_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + + -- stop the same node in the same command + WITH nodes(name, port) AS (VALUES ('localhost', :worker_1_port, + 'localhost', :worker_1_port, + 'localhost', :worker_2_port, + 'localhost', :worker_2_port)) + SELECT stop_metadata_sync_to_node(name,port) FROM nodes; +NOTICE: dropping metadata on the node (localhost,57637) + stop_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + +COMMIT; +\c - - - :worker_1_port +SELECT count(*) > 0 FROM pg_dist_node; + ?column? +--------------------------------------------------------------------- + f +(1 row) + +SELECT count(*) > 0 FROM pg_dist_shard; + ?column? +--------------------------------------------------------------------- + f +(1 row) + +SELECT count(*) > 0 FROM pg_class WHERE relname LIKE 'distributed_table__' AND relnamespace IN (SELECT oid FROM pg_namespace WHERE nspname = 'start_stop_metadata_sync'); + ?column? +--------------------------------------------------------------------- + f +(1 row) + +SELECT count(*) > 0 FROM pg_class WHERE relname LIKE 'reference_table__' AND relnamespace IN (SELECT oid FROM pg_namespace WHERE nspname = 'start_stop_metadata_sync'); + ?column? +--------------------------------------------------------------------- + f +(1 row) + +\c - - - :master_port +SET search_path TO "start_stop_metadata_sync"; +-- start metadata sync sets the multi-shard modify mode to sequential +BEGIN; + SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + + show citus.multi_shard_modify_mode; + citus.multi_shard_modify_mode +--------------------------------------------------------------------- + sequential +(1 row) + +COMMIT; +-- stop metadata sync sets the multi-shard modify mode to sequential +BEGIN; + SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); +NOTICE: dropping metadata on the node (localhost,57637) + stop_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + + show citus.multi_shard_modify_mode; + citus.multi_shard_modify_mode +--------------------------------------------------------------------- + sequential +(1 row) + +COMMIT; +-- multi-connection commands are not allowed with start_metadata_sync +BEGIN; + SET citus.force_max_query_parallelization TO ON; + CREATE TABLE test_table(a int); + SELECT create_distributed_table('test_table', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + + SELECT start_metadata_sync_to_node('localhost', :worker_1_port); +ERROR: cannot execute metadata syncing operation because there was a parallel operation on a distributed table in the transaction +DETAIL: When modifying metadata, Citus needs to perform all operations over a single connection per node to ensure consistency. +HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';" +ROLLBACK; +-- this is safe because start_metadata_sync_to_node already switches to +-- sequential execution +BEGIN; + SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + + CREATE TABLE test_table(a int); + SELECT create_distributed_table('test_table', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +ROLLBACK; +-- multi-shard commands are allowed with start_metadata_sync +-- as long as the start_metadata_sync_to_node executed +-- when it is OK to switch to sequential execution +BEGIN; + -- sync at the start of the tx + SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + + SET citus.multi_shard_modify_mode TO sequential; + CREATE TABLE test_table(a int); + SELECT create_distributed_table('test_table', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + + ALTER TABLE test_table ADD COLUMN B INT; + INSERT INTO test_table SELECT i,i From generate_series(0,100)i; + SELECT count(*) FROM test_table; + count +--------------------------------------------------------------------- + 101 +(1 row) + + ALTER TABLE distributed_table_3 ADD COLUMN new_col INT DEFAULT 15; + SELECT count(*) FROM distributed_table_3; + count +--------------------------------------------------------------------- + 1 +(1 row) + + -- sync at the end of the tx + SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + +ROLLBACK; -- cleanup +\c - - - :master_port +SET search_path TO "start_stop_metadata_sync"; SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); NOTICE: dropping metadata on the node (localhost,57637) stop_metadata_sync_to_node diff --git a/src/test/regress/sql/multi_metadata_sync.sql b/src/test/regress/sql/multi_metadata_sync.sql index 975625ceb..ab5247dd4 100644 --- a/src/test/regress/sql/multi_metadata_sync.sql +++ b/src/test/regress/sql/multi_metadata_sync.sql @@ -168,7 +168,7 @@ SELECT "Column", "Type", "Definition" FROM index_attrs WHERE relid = 'mx_testing_schema.mx_index'::regclass; SELECT count(*) FROM pg_trigger WHERE tgrelid='mx_testing_schema.mx_test_table'::regclass; --- Make sure that start_metadata_sync_to_node cannot be called inside a transaction +-- Make sure that start_metadata_sync_to_node can be called inside a transaction and rollbacked \c - - - :master_port BEGIN; SELECT start_metadata_sync_to_node('localhost', :worker_2_port); diff --git a/src/test/regress/sql/multi_mx_node_metadata.sql b/src/test/regress/sql/multi_mx_node_metadata.sql index 990f2f6da..c5625a59e 100644 --- a/src/test/regress/sql/multi_mx_node_metadata.sql +++ b/src/test/regress/sql/multi_mx_node_metadata.sql @@ -162,8 +162,26 @@ SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node; -------------------------------------------------------------------------- -- Test updating a node when another node is in readonly-mode -------------------------------------------------------------------------- -SELECT master_add_node('localhost', :worker_2_port) AS nodeid_2 \gset -SELECT 1 FROM start_metadata_sync_to_node('localhost', :worker_2_port); + +-- first, add node and sync metadata in the same transaction +CREATE TYPE some_type AS (a int, b int); +CREATE TABLE some_ref_table (a int, b some_type); +SELECT create_reference_table('some_ref_table'); +INSERT INTO some_ref_table (a) SELECT i FROM generate_series(0,10)i; + +BEGIN; + SELECT master_add_node('localhost', :worker_2_port) AS nodeid_2 \gset + SELECT 1 FROM start_metadata_sync_to_node('localhost', :worker_2_port); + + -- and modifications can be read from any worker in the same transaction + INSERT INTO some_ref_table (a) SELECT i FROM generate_series(0,10)i; + SET LOCAL citus.task_assignment_policy TO "round-robin"; + SELECT count(*) FROM some_ref_table; + SELECT count(*) FROM some_ref_table; +COMMIT; + +DROP TABLE some_ref_table; +DROP TYPE some_type; -- Create a table with shards on both nodes CREATE TABLE dist_table_2(a int); diff --git a/src/test/regress/sql/start_stop_metadata_sync.sql b/src/test/regress/sql/start_stop_metadata_sync.sql index 5a2c97f0e..2fbdd1ec7 100644 --- a/src/test/regress/sql/start_stop_metadata_sync.sql +++ b/src/test/regress/sql/start_stop_metadata_sync.sql @@ -68,11 +68,6 @@ SELECT alter_table_set_access_method('events_2021_jan', 'columnar'); VACUUM (FREEZE, ANALYZE) events_2021_jan; --- this should fail -BEGIN; -SELECT start_metadata_sync_to_node('localhost', :worker_1_port); -ROLLBACK; - -- sync metadata SELECT start_metadata_sync_to_node('localhost', :worker_1_port); @@ -93,12 +88,10 @@ SET search_path TO "start_stop_metadata_sync"; SELECT * FROM distributed_table_1; ALTER TABLE distributed_table_4 DROP COLUMN b; --- this should fail BEGIN; -SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); -ROLLBACK; + SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); +COMMIT; -SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); SELECT * FROM test_view; SELECT * FROM test_matview; SELECT count(*) > 0 FROM pg_dist_node; @@ -116,7 +109,9 @@ SELECT count(*) > 0 FROM pg_class WHERE relname LIKE 'reference_table__' AND rel \c - - - :master_port SET search_path TO "start_stop_metadata_sync"; SELECT * FROM distributed_table_1; -SELECT start_metadata_sync_to_node('localhost', :worker_1_port); +BEGIN; + SELECT start_metadata_sync_to_node('localhost', :worker_1_port); +COMMIT; \c - - - :worker_1_port SELECT count(*) > 0 FROM pg_dist_node; @@ -127,7 +122,87 @@ SELECT count(*) > 0 FROM pg_class WHERE relname LIKE 'reference_table__' AND rel \c - - - :master_port SET search_path TO "start_stop_metadata_sync"; +-- both start & stop metadata sync operations can be transactional +BEGIN; + -- sync the same node multiple times + SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + + -- sync the same node in the same command + WITH nodes(name, port) AS (VALUES ('localhost', :worker_1_port, + 'localhost', :worker_1_port, + 'localhost', :worker_2_port, + 'localhost', :worker_2_port)) + SELECT start_metadata_sync_to_node(name,port) FROM nodes; + + -- stop the same node in the same command + WITH nodes(name, port) AS (VALUES ('localhost', :worker_1_port, + 'localhost', :worker_1_port, + 'localhost', :worker_2_port, + 'localhost', :worker_2_port)) + SELECT stop_metadata_sync_to_node(name,port) FROM nodes; +COMMIT; + + +\c - - - :worker_1_port +SELECT count(*) > 0 FROM pg_dist_node; +SELECT count(*) > 0 FROM pg_dist_shard; +SELECT count(*) > 0 FROM pg_class WHERE relname LIKE 'distributed_table__' AND relnamespace IN (SELECT oid FROM pg_namespace WHERE nspname = 'start_stop_metadata_sync'); +SELECT count(*) > 0 FROM pg_class WHERE relname LIKE 'reference_table__' AND relnamespace IN (SELECT oid FROM pg_namespace WHERE nspname = 'start_stop_metadata_sync'); + +\c - - - :master_port +SET search_path TO "start_stop_metadata_sync"; + +-- start metadata sync sets the multi-shard modify mode to sequential +BEGIN; + SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + show citus.multi_shard_modify_mode; +COMMIT; + +-- stop metadata sync sets the multi-shard modify mode to sequential +BEGIN; + SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); + show citus.multi_shard_modify_mode; +COMMIT; + +-- multi-connection commands are not allowed with start_metadata_sync +BEGIN; + SET citus.force_max_query_parallelization TO ON; + CREATE TABLE test_table(a int); + SELECT create_distributed_table('test_table', 'a'); + SELECT start_metadata_sync_to_node('localhost', :worker_1_port); +ROLLBACK; + +-- this is safe because start_metadata_sync_to_node already switches to +-- sequential execution +BEGIN; + SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + CREATE TABLE test_table(a int); + SELECT create_distributed_table('test_table', 'a'); +ROLLBACK; + +-- multi-shard commands are allowed with start_metadata_sync +-- as long as the start_metadata_sync_to_node executed +-- when it is OK to switch to sequential execution +BEGIN; + -- sync at the start of the tx + SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + SET citus.multi_shard_modify_mode TO sequential; + CREATE TABLE test_table(a int); + SELECT create_distributed_table('test_table', 'a'); + ALTER TABLE test_table ADD COLUMN B INT; + INSERT INTO test_table SELECT i,i From generate_series(0,100)i; + SELECT count(*) FROM test_table; + ALTER TABLE distributed_table_3 ADD COLUMN new_col INT DEFAULT 15; + SELECT count(*) FROM distributed_table_3; + -- sync at the end of the tx + SELECT start_metadata_sync_to_node('localhost', :worker_1_port); +ROLLBACK; + -- cleanup +\c - - - :master_port +SET search_path TO "start_stop_metadata_sync"; + SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); SET client_min_messages TO WARNING;