diff --git a/src/backend/distributed/commands/dependencies.c b/src/backend/distributed/commands/dependencies.c index 60cb56492..5ba6f8af5 100644 --- a/src/backend/distributed/commands/dependencies.c +++ b/src/backend/distributed/commands/dependencies.c @@ -119,9 +119,9 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target) const char *nodeName = workerNode->workerName; uint32 nodePort = workerNode->workerPort; - SendCommandListToWorkerInSingleTransaction(nodeName, nodePort, - CitusExtensionOwnerName(), - ddlCommands); + SendCommandListToWorkerOutsideTransaction(nodeName, nodePort, + CitusExtensionOwnerName(), + ddlCommands); } } @@ -312,8 +312,8 @@ ReplicateAllDependenciesToNode(const char *nodeName, int nodePort) /* since we are executing ddl commands lets disable propagation, primarily for mx */ ddlCommands = list_concat(list_make1(DISABLE_DDL_PROPAGATION), ddlCommands); - SendCommandListToWorkerInSingleTransaction(nodeName, nodePort, - CitusExtensionOwnerName(), ddlCommands); + SendCommandListToWorkerOutsideTransaction(nodeName, nodePort, + CitusExtensionOwnerName(), ddlCommands); } diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index b91c0f4e0..2591bb588 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -47,11 +47,13 @@ #include "distributed/metadata_cache.h" #include "distributed/metadata_sync.h" #include "distributed/metadata/distobject.h" +#include "distributed/multi_executor.h" #include "distributed/multi_join_order.h" #include "distributed/multi_partitioning_utils.h" #include "distributed/multi_physical_planner.h" #include "distributed/pg_dist_node.h" #include "distributed/pg_dist_shard.h" +#include "distributed/relation_access_tracking.h" #include "distributed/remote_commands.h" #include "distributed/resource_lock.h" #include "distributed/worker_manager.h" @@ -78,6 +80,7 @@ char 
*EnableManualMetadataChangesForUser = ""; +static void EnsureSequentialModeMetadataOperations(void); static List * GetDistributedTableDDLEvents(Oid relationId); static char * LocalGroupIdUpdateCommand(int32 groupId); static void UpdateDistNodeBoolAttr(const char *nodeName, int32 nodePort, @@ -175,7 +178,7 @@ StartMetadataSyncToNode(const char *nodeNameString, int32 nodePort) EnsureSuperUser(); EnsureModificationsCanRun(); - PreventInTransactionBlock(true, "start_metadata_sync_to_node"); + EnsureSequentialModeMetadataOperations(); LockRelationOid(DistNodeRelationId(), ExclusiveLock); @@ -222,6 +225,50 @@ StartMetadataSyncToNode(const char *nodeNameString, int32 nodePort) } +/* + * EnsureSequentialModeMetadataOperations makes sure that the current transaction is + * already in sequential mode, or can still safely be put in sequential mode; + * it errors if that is not possible. The error contains information for the user to + * retry the transaction with sequential mode set from the beginning. + * + * Metadata objects (e.g., a distributed table on the workers) exist as only 1 instance of + * the type, used by potentially multiple other shards/connections. To make sure all + * shards/connections in the transaction can interact with the metadata, it needs to be + * visible on all connections used by the transaction, meaning we can only use 1 + * connection per node. 
+ */ +static void +EnsureSequentialModeMetadataOperations(void) +{ + if (!IsTransactionBlock()) + { + /* we do not need to switch to sequential mode if we are not in a transaction */ + return; + } + + if (ParallelQueryExecutedInTransaction()) + { + ereport(ERROR, (errmsg( + "cannot execute metadata syncing operation because there was a " + "parallel operation on a distributed table in the " + "transaction"), + errdetail("When modifying metadata, Citus needs to " + "perform all operations over a single connection per " + "node to ensure consistency."), + errhint("Try re-running the transaction with " + "\"SET LOCAL citus.multi_shard_modify_mode TO " + "\'sequential\';\""))); + } + + ereport(DEBUG1, (errmsg("switching to sequential query execution mode"), + errdetail("Metadata synced or stopped syncing. To make " + "sure subsequent commands see the metadata correctly " + "we need to make sure to use only one connection for " + "all future commands"))); + SetLocalMultiShardModifyModeToSequential(); +} + + /* * stop_metadata_sync_to_node function sets the hasmetadata column of the specified node * to false in pg_dist_node table, thus indicating that the specified worker node does not @@ -233,7 +280,6 @@ stop_metadata_sync_to_node(PG_FUNCTION_ARGS) CheckCitusVersion(ERROR); EnsureCoordinator(); EnsureSuperUser(); - PreventInTransactionBlock(true, "stop_metadata_sync_to_node"); text *nodeName = PG_GETARG_TEXT_P(0); int32 nodePort = PG_GETARG_INT32(1); @@ -376,19 +422,19 @@ SyncMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError) */ if (raiseOnError) { - SendCommandListToWorkerInSingleTransaction(workerNode->workerName, - workerNode->workerPort, - currentUser, - recreateMetadataSnapshotCommandList); + SendCommandListToWorkerInCoordinatedTransaction(workerNode->workerName, + workerNode->workerPort, + currentUser, + recreateMetadataSnapshotCommandList); return true; } else { bool success = - SendOptionalCommandListToWorkerInTransaction(workerNode->workerName, - 
workerNode->workerPort, - currentUser, - recreateMetadataSnapshotCommandList); + SendOptionalCommandListToWorkerInCoordinatedTransaction( + workerNode->workerName, workerNode->workerPort, + currentUser, recreateMetadataSnapshotCommandList); + return success; } } @@ -401,6 +447,8 @@ SyncMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError) static void DropMetadataSnapshotOnNode(WorkerNode *workerNode) { + EnsureSequentialModeMetadataOperations(); + char *userName = CurrentUserName(); /* generate the queries which drop the metadata */ @@ -409,10 +457,10 @@ DropMetadataSnapshotOnNode(WorkerNode *workerNode) dropMetadataCommandList = lappend(dropMetadataCommandList, LocalGroupIdUpdateCommand(0)); - SendOptionalCommandListToWorkerInTransaction(workerNode->workerName, - workerNode->workerPort, - userName, - dropMetadataCommandList); + SendOptionalCommandListToWorkerInCoordinatedTransaction(workerNode->workerName, + workerNode->workerPort, + userName, + dropMetadataCommandList); } diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 5abc62e43..284d78944 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -615,10 +615,10 @@ PropagateNodeWideObjects(WorkerNode *newWorkerNode) ddlCommands = lappend(ddlCommands, ENABLE_DDL_PROPAGATION); /* send commands to new workers*/ - SendCommandListToWorkerInSingleTransaction(newWorkerNode->workerName, - newWorkerNode->workerPort, - CitusExtensionOwnerName(), - ddlCommands); + SendCommandListToWorkerOutsideTransaction(newWorkerNode->workerName, + newWorkerNode->workerPort, + CitusExtensionOwnerName(), + ddlCommands); } } diff --git a/src/backend/distributed/operations/repair_shards.c b/src/backend/distributed/operations/repair_shards.c index c4f16f6b2..2784e6958 100644 --- a/src/backend/distributed/operations/repair_shards.c +++ b/src/backend/distributed/operations/repair_shards.c @@ -753,8 
+753,8 @@ RepairShardPlacement(int64 shardId, const char *sourceNodeName, int32 sourceNode } EnsureNoModificationsHaveBeenDone(); - SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort, tableOwner, - ddlCommandList); + SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, tableOwner, + ddlCommandList); /* after successful repair, we update shard state as healthy*/ List *placementList = ShardPlacementListWithoutOrphanedPlacements(shardId); @@ -954,8 +954,8 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName, List *ddlCommandList = RecreateShardDDLCommandList(shardInterval, sourceNodeName, sourceNodePort); char *tableOwner = TableOwner(shardInterval->relationId); - SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort, - tableOwner, ddlCommandList); + SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, + tableOwner, ddlCommandList); ddlCommandList = NIL; @@ -973,8 +973,8 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName, ddlCommandList, PostLoadShardCreationCommandList(shardInterval, sourceNodeName, sourceNodePort)); - SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort, - tableOwner, ddlCommandList); + SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, + tableOwner, ddlCommandList); MemoryContextReset(localContext); } @@ -1007,8 +1007,8 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName, } char *tableOwner = TableOwner(shardInterval->relationId); - SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort, - tableOwner, commandList); + SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, + tableOwner, commandList); MemoryContextReset(localContext); } diff --git a/src/backend/distributed/operations/shard_cleaner.c b/src/backend/distributed/operations/shard_cleaner.c index afa206594..5423e2745 100644 --- 
a/src/backend/distributed/operations/shard_cleaner.c +++ b/src/backend/distributed/operations/shard_cleaner.c @@ -275,9 +275,9 @@ TryDropShard(GroupShardPlacement *placement) /* remove the shard from the node */ bool success = - SendOptionalCommandListToWorkerInTransaction(shardPlacement->nodeName, - shardPlacement->nodePort, - NULL, dropCommandList); + SendOptionalCommandListToWorkerOutsideTransaction(shardPlacement->nodeName, + shardPlacement->nodePort, + NULL, dropCommandList); if (success) { /* delete the actual placement */ diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index a10530ea6..cece88323 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -434,14 +434,14 @@ EnsureNoModificationsHaveBeenDone() /* - * SendCommandListToWorkerInSingleTransaction opens connection to the node with the given - * nodeName and nodePort. Then, the connection starts a transaction on the remote - * node and executes the commands in the transaction. The function raises error if - * any of the queries fails. + * SendCommandListToWorkerOutsideTransaction forces to open a new connection + * to the node with the given nodeName and nodePort. Then, the connection starts + * a transaction on the remote node and executes the commands in the transaction. + * The function raises error if any of the queries fails. 
*/ void -SendCommandListToWorkerInSingleTransaction(const char *nodeName, int32 nodePort, - const char *nodeUser, List *commandList) +SendCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort, + const char *nodeUser, List *commandList) { int connectionFlags = FORCE_NEW_CONNECTION; @@ -465,13 +465,43 @@ SendCommandListToWorkerInSingleTransaction(const char *nodeName, int32 nodePort, /* - * SendOptionalCommandListToWorkerInTransaction sends the given command list to - * the given worker in a single transaction. If any of the commands fail, it - * rollbacks the transaction, and otherwise commits. + * SendCommandListToWorkerInCoordinatedTransaction opens connection to the node + * with the given nodeName and nodePort. The commands are sent as part of the + * coordinated transaction. Any failure aborts the coordinated transaction. + */ +void +SendCommandListToWorkerInCoordinatedTransaction(const char *nodeName, int32 nodePort, + const char *nodeUser, List *commandList) +{ + int connectionFlags = 0; + + UseCoordinatedTransaction(); + + MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags, + nodeName, nodePort, + nodeUser, NULL); + + MarkRemoteTransactionCritical(workerConnection); + RemoteTransactionBeginIfNecessary(workerConnection); + + /* iterate over the commands and execute them in the same connection */ + const char *commandString = NULL; + foreach_ptr(commandString, commandList) + { + ExecuteCriticalRemoteCommand(workerConnection, commandString); + } +} + + +/* + * SendOptionalCommandListToWorkerOutsideTransaction sends the given command + * list to the given worker in a single transaction that is outside of the + * coordinated transaction. If any of the commands fail, it rolls back the + * transaction, and otherwise commits. 
*/ bool -SendOptionalCommandListToWorkerInTransaction(const char *nodeName, int32 nodePort, - const char *nodeUser, List *commandList) +SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort, + const char *nodeUser, List *commandList) { int connectionFlags = FORCE_NEW_CONNECTION; bool failed = false; @@ -511,6 +541,51 @@ SendOptionalCommandListToWorkerInTransaction(const char *nodeName, int32 nodePor } +/* + * SendOptionalCommandListToWorkerInCoordinatedTransaction sends the given + * command list to the given worker as part of the coordinated transaction. + * If any of the commands fail, the function returns false. + */ +bool +SendOptionalCommandListToWorkerInCoordinatedTransaction(const char *nodeName, int32 + nodePort, + const char *nodeUser, + List *commandList) +{ + int connectionFlags = 0; + bool failed = false; + + UseCoordinatedTransaction(); + + MultiConnection *workerConnection = + GetNodeUserDatabaseConnection(connectionFlags, nodeName, nodePort, + nodeUser, NULL); + if (PQstatus(workerConnection->pgConn) != CONNECTION_OK) + { + return false; + } + + RemoteTransactionsBeginIfNecessary(list_make1(workerConnection)); + + /* iterate over the commands and execute them in the same connection */ + const char *commandString = NULL; + foreach_ptr(commandString, commandList) + { + if (ExecuteOptionalRemoteCommand(workerConnection, commandString, NULL) != + RESPONSE_OKAY) + { + failed = true; + + bool raiseErrors = false; + MarkRemoteTransactionFailed(workerConnection, raiseErrors); + break; + } + } + + return !failed; +} + + /* * ErrorIfAnyMetadataNodeOutOfSync raises an error if any of the given * metadata nodes are out of sync. 
It is safer to avoid metadata changing diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index cafa2b328..a35e0ef57 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -370,8 +370,8 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort) nodePort))); EnsureNoModificationsHaveBeenDone(); - SendCommandListToWorkerInSingleTransaction(nodeName, nodePort, tableOwner, - ddlCommandList); + SendCommandListToWorkerOutsideTransaction(nodeName, nodePort, tableOwner, + ddlCommandList); int32 groupId = GroupForNode(nodeName, nodePort); uint64 placementId = GetNextPlacementId(); @@ -570,8 +570,8 @@ ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort) char *tableOwner = TableOwner(shardInterval->relationId); List *commandList = CopyShardForeignConstraintCommandList(shardInterval); - SendCommandListToWorkerInSingleTransaction(nodeName, nodePort, tableOwner, - commandList); + SendCommandListToWorkerOutsideTransaction(nodeName, nodePort, tableOwner, + commandList); } } } diff --git a/src/include/distributed/worker_transaction.h b/src/include/distributed/worker_transaction.h index a0f2bede0..14daee4ef 100644 --- a/src/include/distributed/worker_transaction.h +++ b/src/include/distributed/worker_transaction.h @@ -37,17 +37,25 @@ extern void SendCommandToWorkersAsUser(TargetWorkerSet targetWorkerSet, const char *nodeUser, const char *command); extern void SendCommandToWorkerAsUser(const char *nodeName, int32 nodePort, const char *nodeUser, const char *command); -extern bool SendOptionalCommandListToWorkerInTransaction(const char *nodeName, int32 - nodePort, - const char *nodeUser, - List *commandList); +extern bool SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeName, + int32 nodePort, + const char *nodeUser, + List *commandList); +extern bool 
SendOptionalCommandListToWorkerInCoordinatedTransaction(const char *nodeName, + int32 nodePort, + const char *nodeUser, + List *commandList); extern void SendCommandToWorkersWithMetadata(const char *command); extern void SendBareCommandListToMetadataWorkers(List *commandList); extern void EnsureNoModificationsHaveBeenDone(void); -extern void SendCommandListToWorkerInSingleTransaction(const char *nodeName, - int32 nodePort, - const char *nodeUser, - List *commandList); +extern void SendCommandListToWorkerOutsideTransaction(const char *nodeName, + int32 nodePort, + const char *nodeUser, + List *commandList); +extern void SendCommandListToWorkerInCoordinatedTransaction(const char *nodeName, + int32 nodePort, + const char *nodeUser, + List *commandList); extern void SendCommandToWorkersOptionalInParallel(TargetWorkerSet targetWorkerSet, const char *command, const char *user); diff --git a/src/test/regress/expected/failure_mx_metadata_sync.out b/src/test/regress/expected/failure_mx_metadata_sync.out index d4b46498a..fbb35c215 100644 --- a/src/test/regress/expected/failure_mx_metadata_sync.out +++ b/src/test/regress/expected/failure_mx_metadata_sync.out @@ -187,6 +187,12 @@ WARNING: server closed the connection unexpectedly before or while processing the request. CONTEXT: while executing command on localhost:xxxxx WARNING: connection not open +CONTEXT: while executing command on localhost:xxxxx +WARNING: connection not open +CONTEXT: while executing command on localhost:xxxxx +WARNING: connection not open +CONTEXT: while executing command on localhost:xxxxx +WARNING: connection not open CONTEXT: while executing command on localhost:xxxxx stop_metadata_sync_to_node --------------------------------------------------------------------- @@ -216,6 +222,12 @@ WARNING: server closed the connection unexpectedly before or while processing the request. 
CONTEXT: while executing command on localhost:xxxxx WARNING: connection not open +CONTEXT: while executing command on localhost:xxxxx +WARNING: connection not open +CONTEXT: while executing command on localhost:xxxxx +WARNING: connection not open +CONTEXT: while executing command on localhost:xxxxx +WARNING: connection not open CONTEXT: while executing command on localhost:xxxxx stop_metadata_sync_to_node --------------------------------------------------------------------- @@ -245,6 +257,12 @@ WARNING: server closed the connection unexpectedly before or while processing the request. CONTEXT: while executing command on localhost:xxxxx WARNING: connection not open +CONTEXT: while executing command on localhost:xxxxx +WARNING: connection not open +CONTEXT: while executing command on localhost:xxxxx +WARNING: connection not open +CONTEXT: while executing command on localhost:xxxxx +WARNING: connection not open CONTEXT: while executing command on localhost:xxxxx stop_metadata_sync_to_node --------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_metadata_sync.out b/src/test/regress/expected/multi_metadata_sync.out index 1abc2d45b..26fbec0b2 100644 --- a/src/test/regress/expected/multi_metadata_sync.out +++ b/src/test/regress/expected/multi_metadata_sync.out @@ -470,11 +470,15 @@ SELECT count(*) FROM pg_trigger WHERE tgrelid='mx_testing_schema.mx_test_table': 1 (1 row) --- Make sure that start_metadata_sync_to_node cannot be called inside a transaction +-- Make sure that start_metadata_sync_to_node can be called inside a transaction and rollbacked \c - - - :master_port BEGIN; SELECT start_metadata_sync_to_node('localhost', :worker_2_port); -ERROR: start_metadata_sync_to_node cannot run inside a transaction block + start_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + ROLLBACK; SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_port; hasmetadata 
diff --git a/src/test/regress/expected/multi_mx_node_metadata.out b/src/test/regress/expected/multi_mx_node_metadata.out index c1b2885b4..eea9cd5bb 100644 --- a/src/test/regress/expected/multi_mx_node_metadata.out +++ b/src/test/regress/expected/multi_mx_node_metadata.out @@ -338,13 +338,42 @@ SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node; --------------------------------------------------------------------- -- Test updating a node when another node is in readonly-mode --------------------------------------------------------------------- -SELECT master_add_node('localhost', :worker_2_port) AS nodeid_2 \gset -SELECT 1 FROM start_metadata_sync_to_node('localhost', :worker_2_port); +-- first, add node and sync metadata in the same transaction +CREATE TYPE some_type AS (a int, b int); +CREATE TABLE some_ref_table (a int, b some_type); +SELECT create_reference_table('some_ref_table'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO some_ref_table (a) SELECT i FROM generate_series(0,10)i; +BEGIN; + SELECT master_add_node('localhost', :worker_2_port) AS nodeid_2 \gset + SELECT 1 FROM start_metadata_sync_to_node('localhost', :worker_2_port); ?column? 
--------------------------------------------------------------------- 1 (1 row) + -- and modifications can be read from any worker in the same transaction + INSERT INTO some_ref_table (a) SELECT i FROM generate_series(0,10)i; + SET LOCAL citus.task_assignment_policy TO "round-robin"; + SELECT count(*) FROM some_ref_table; + count +--------------------------------------------------------------------- + 22 +(1 row) + + SELECT count(*) FROM some_ref_table; + count +--------------------------------------------------------------------- + 22 +(1 row) + +COMMIT; +DROP TABLE some_ref_table; +DROP TYPE some_type; -- Create a table with shards on both nodes CREATE TABLE dist_table_2(a int); SELECT create_distributed_table('dist_table_2', 'a'); diff --git a/src/test/regress/expected/start_stop_metadata_sync.out b/src/test/regress/expected/start_stop_metadata_sync.out index b460595f6..accfd9ae5 100644 --- a/src/test/regress/expected/start_stop_metadata_sync.out +++ b/src/test/regress/expected/start_stop_metadata_sync.out @@ -91,11 +91,6 @@ SELECT alter_table_set_access_method('events_2021_jan', 'columnar'); (1 row) VACUUM (FREEZE, ANALYZE) events_2021_jan; --- this should fail -BEGIN; -SELECT start_metadata_sync_to_node('localhost', :worker_1_port); -ERROR: start_metadata_sync_to_node cannot run inside a transaction block -ROLLBACK; -- sync metadata SELECT start_metadata_sync_to_node('localhost', :worker_1_port); start_metadata_sync_to_node @@ -125,7 +120,7 @@ SELECT * FROM test_matview; (1 row) SELECT * FROM pg_dist_partition WHERE logicalrelid::text LIKE 'events%' ORDER BY logicalrelid::text; - logicalrelid | partmethod | partkey | colocationid | repmodel + logicalrelid | partmethod | partkey | colocationid | repmodel --------------------------------------------------------------------- events | h | {VAR :varno 1 :varattno 1 :vartype 1184 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 1390012 | s events_2021_feb | h | {VAR :varno 1 
:varattno 1 :vartype 1184 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 1390012 | s @@ -164,18 +159,15 @@ SELECT * FROM distributed_table_1; (0 rows) ALTER TABLE distributed_table_4 DROP COLUMN b; --- this should fail BEGIN; -SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); -ERROR: stop_metadata_sync_to_node cannot run inside a transaction block -ROLLBACK; -SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); + SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); NOTICE: dropping metadata on the node (localhost,57637) stop_metadata_sync_to_node --------------------------------------------------------------------- (1 row) +COMMIT; SELECT * FROM test_view; count --------------------------------------------------------------------- @@ -245,12 +237,14 @@ SELECT * FROM distributed_table_1; --------------------------------------------------------------------- (0 rows) -SELECT start_metadata_sync_to_node('localhost', :worker_1_port); +BEGIN; + SELECT start_metadata_sync_to_node('localhost', :worker_1_port); start_metadata_sync_to_node --------------------------------------------------------------------- (1 row) +COMMIT; \c - - - :worker_1_port SELECT count(*) > 0 FROM pg_dist_node; ?column? 
@@ -278,7 +272,180 @@ SELECT count(*) > 0 FROM pg_class WHERE relname LIKE 'reference_table__' AND rel \c - - - :master_port SET search_path TO "start_stop_metadata_sync"; +-- both start & stop metadata sync operations can be transactional +BEGIN; + -- sync the same node multiple times + SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + + SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + + -- sync the same node in the same command + WITH nodes(name, port) AS (VALUES ('localhost', :worker_1_port, + 'localhost', :worker_1_port, + 'localhost', :worker_2_port, + 'localhost', :worker_2_port)) + SELECT start_metadata_sync_to_node(name,port) FROM nodes; + start_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + + -- stop the same node in the same command + WITH nodes(name, port) AS (VALUES ('localhost', :worker_1_port, + 'localhost', :worker_1_port, + 'localhost', :worker_2_port, + 'localhost', :worker_2_port)) + SELECT stop_metadata_sync_to_node(name,port) FROM nodes; +NOTICE: dropping metadata on the node (localhost,57637) + stop_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + +COMMIT; +\c - - - :worker_1_port +SELECT count(*) > 0 FROM pg_dist_node; + ?column? +--------------------------------------------------------------------- + f +(1 row) + +SELECT count(*) > 0 FROM pg_dist_shard; + ?column? +--------------------------------------------------------------------- + f +(1 row) + +SELECT count(*) > 0 FROM pg_class WHERE relname LIKE 'distributed_table__' AND relnamespace IN (SELECT oid FROM pg_namespace WHERE nspname = 'start_stop_metadata_sync'); + ?column? 
+--------------------------------------------------------------------- + f +(1 row) + +SELECT count(*) > 0 FROM pg_class WHERE relname LIKE 'reference_table__' AND relnamespace IN (SELECT oid FROM pg_namespace WHERE nspname = 'start_stop_metadata_sync'); + ?column? +--------------------------------------------------------------------- + f +(1 row) + +\c - - - :master_port +SET search_path TO "start_stop_metadata_sync"; +-- start metadata sync sets the multi-shard modify mode to sequential +BEGIN; + SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + + show citus.multi_shard_modify_mode; + citus.multi_shard_modify_mode +--------------------------------------------------------------------- + sequential +(1 row) + +COMMIT; +-- stop metadata sync sets the multi-shard modify mode to sequential +BEGIN; + SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); +NOTICE: dropping metadata on the node (localhost,57637) + stop_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + + show citus.multi_shard_modify_mode; + citus.multi_shard_modify_mode +--------------------------------------------------------------------- + sequential +(1 row) + +COMMIT; +-- multi-connection commands are not allowed with start_metadata_sync +BEGIN; + SET citus.force_max_query_parallelization TO ON; + CREATE TABLE test_table(a int); + SELECT create_distributed_table('test_table', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + + SELECT start_metadata_sync_to_node('localhost', :worker_1_port); +ERROR: cannot execute metadata syncing operation because there was a parallel operation on a distributed table in the transaction +DETAIL: When modifying metadata, Citus needs to perform all operations over a single connection per node to ensure 
consistency. +HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';" +ROLLBACK; +-- this is safe because start_metadata_sync_to_node already switches to +-- sequential execution +BEGIN; + SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + + CREATE TABLE test_table(a int); + SELECT create_distributed_table('test_table', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +ROLLBACK; +-- multi-shard commands are allowed with start_metadata_sync +-- as long as the start_metadata_sync_to_node executed +-- when it is OK to switch to sequential execution +BEGIN; + -- sync at the start of the tx + SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + + SET citus.multi_shard_modify_mode TO sequential; + CREATE TABLE test_table(a int); + SELECT create_distributed_table('test_table', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + + ALTER TABLE test_table ADD COLUMN B INT; + INSERT INTO test_table SELECT i,i From generate_series(0,100)i; + SELECT count(*) FROM test_table; + count +--------------------------------------------------------------------- + 101 +(1 row) + + ALTER TABLE distributed_table_3 ADD COLUMN new_col INT DEFAULT 15; + SELECT count(*) FROM distributed_table_3; + count +--------------------------------------------------------------------- + 1 +(1 row) + + -- sync at the end of the tx + SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + +ROLLBACK; -- cleanup +\c - - - :master_port +SET search_path TO 
"start_stop_metadata_sync"; SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); NOTICE: dropping metadata on the node (localhost,57637) stop_metadata_sync_to_node diff --git a/src/test/regress/sql/multi_metadata_sync.sql b/src/test/regress/sql/multi_metadata_sync.sql index 975625ceb..ab5247dd4 100644 --- a/src/test/regress/sql/multi_metadata_sync.sql +++ b/src/test/regress/sql/multi_metadata_sync.sql @@ -168,7 +168,7 @@ SELECT "Column", "Type", "Definition" FROM index_attrs WHERE relid = 'mx_testing_schema.mx_index'::regclass; SELECT count(*) FROM pg_trigger WHERE tgrelid='mx_testing_schema.mx_test_table'::regclass; --- Make sure that start_metadata_sync_to_node cannot be called inside a transaction +-- Make sure that start_metadata_sync_to_node can be called inside a transaction and rollbacked \c - - - :master_port BEGIN; SELECT start_metadata_sync_to_node('localhost', :worker_2_port); diff --git a/src/test/regress/sql/multi_mx_node_metadata.sql b/src/test/regress/sql/multi_mx_node_metadata.sql index 990f2f6da..c5625a59e 100644 --- a/src/test/regress/sql/multi_mx_node_metadata.sql +++ b/src/test/regress/sql/multi_mx_node_metadata.sql @@ -162,8 +162,26 @@ SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node; -------------------------------------------------------------------------- -- Test updating a node when another node is in readonly-mode -------------------------------------------------------------------------- -SELECT master_add_node('localhost', :worker_2_port) AS nodeid_2 \gset -SELECT 1 FROM start_metadata_sync_to_node('localhost', :worker_2_port); + +-- first, add node and sync metadata in the same transaction +CREATE TYPE some_type AS (a int, b int); +CREATE TABLE some_ref_table (a int, b some_type); +SELECT create_reference_table('some_ref_table'); +INSERT INTO some_ref_table (a) SELECT i FROM generate_series(0,10)i; + +BEGIN; + SELECT master_add_node('localhost', :worker_2_port) AS nodeid_2 \gset + SELECT 1 FROM 
start_metadata_sync_to_node('localhost', :worker_2_port); + + -- and modifications can be read from any worker in the same transaction + INSERT INTO some_ref_table (a) SELECT i FROM generate_series(0,10)i; + SET LOCAL citus.task_assignment_policy TO "round-robin"; + SELECT count(*) FROM some_ref_table; + SELECT count(*) FROM some_ref_table; +COMMIT; + +DROP TABLE some_ref_table; +DROP TYPE some_type; -- Create a table with shards on both nodes CREATE TABLE dist_table_2(a int); diff --git a/src/test/regress/sql/start_stop_metadata_sync.sql b/src/test/regress/sql/start_stop_metadata_sync.sql index 5a2c97f0e..2fbdd1ec7 100644 --- a/src/test/regress/sql/start_stop_metadata_sync.sql +++ b/src/test/regress/sql/start_stop_metadata_sync.sql @@ -68,11 +68,6 @@ SELECT alter_table_set_access_method('events_2021_jan', 'columnar'); VACUUM (FREEZE, ANALYZE) events_2021_jan; --- this should fail -BEGIN; -SELECT start_metadata_sync_to_node('localhost', :worker_1_port); -ROLLBACK; - -- sync metadata SELECT start_metadata_sync_to_node('localhost', :worker_1_port); @@ -93,12 +88,10 @@ SET search_path TO "start_stop_metadata_sync"; SELECT * FROM distributed_table_1; ALTER TABLE distributed_table_4 DROP COLUMN b; --- this should fail BEGIN; -SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); -ROLLBACK; + SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); +COMMIT; -SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); SELECT * FROM test_view; SELECT * FROM test_matview; SELECT count(*) > 0 FROM pg_dist_node; @@ -116,7 +109,9 @@ SELECT count(*) > 0 FROM pg_class WHERE relname LIKE 'reference_table__' AND rel \c - - - :master_port SET search_path TO "start_stop_metadata_sync"; SELECT * FROM distributed_table_1; -SELECT start_metadata_sync_to_node('localhost', :worker_1_port); +BEGIN; + SELECT start_metadata_sync_to_node('localhost', :worker_1_port); +COMMIT; \c - - - :worker_1_port SELECT count(*) > 0 FROM pg_dist_node; @@ -127,7 +122,87 @@ SELECT 
count(*) > 0 FROM pg_class WHERE relname LIKE 'reference_table__' AND rel \c - - - :master_port SET search_path TO "start_stop_metadata_sync"; +-- both start & stop metadata sync operations can be transactional +BEGIN; + -- sync the same node multiple times + SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + + -- sync the same node in the same command + WITH nodes(name, port) AS (VALUES ('localhost', :worker_1_port, + 'localhost', :worker_1_port, + 'localhost', :worker_2_port, + 'localhost', :worker_2_port)) + SELECT start_metadata_sync_to_node(name,port) FROM nodes; + + -- stop the same node in the same command + WITH nodes(name, port) AS (VALUES ('localhost', :worker_1_port, + 'localhost', :worker_1_port, + 'localhost', :worker_2_port, + 'localhost', :worker_2_port)) + SELECT stop_metadata_sync_to_node(name,port) FROM nodes; +COMMIT; + + +\c - - - :worker_1_port +SELECT count(*) > 0 FROM pg_dist_node; +SELECT count(*) > 0 FROM pg_dist_shard; +SELECT count(*) > 0 FROM pg_class WHERE relname LIKE 'distributed_table__' AND relnamespace IN (SELECT oid FROM pg_namespace WHERE nspname = 'start_stop_metadata_sync'); +SELECT count(*) > 0 FROM pg_class WHERE relname LIKE 'reference_table__' AND relnamespace IN (SELECT oid FROM pg_namespace WHERE nspname = 'start_stop_metadata_sync'); + +\c - - - :master_port +SET search_path TO "start_stop_metadata_sync"; + +-- start metadata sync sets the multi-shard modify mode to sequential +BEGIN; + SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + show citus.multi_shard_modify_mode; +COMMIT; + +-- stop metadata sync sets the multi-shard modify mode to sequential +BEGIN; + SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); + show citus.multi_shard_modify_mode; +COMMIT; + +-- multi-connection commands are not allowed with start_metadata_sync +BEGIN; + SET citus.force_max_query_parallelization TO ON; + CREATE TABLE 
test_table(a int); + SELECT create_distributed_table('test_table', 'a'); + SELECT start_metadata_sync_to_node('localhost', :worker_1_port); +ROLLBACK; + +-- this is safe because start_metadata_sync_to_node already switches to +-- sequential execution +BEGIN; + SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + CREATE TABLE test_table(a int); + SELECT create_distributed_table('test_table', 'a'); +ROLLBACK; + +-- multi-shard commands are allowed with start_metadata_sync +-- as long as the start_metadata_sync_to_node executed +-- when it is OK to switch to sequential execution +BEGIN; + -- sync at the start of the tx + SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + SET citus.multi_shard_modify_mode TO sequential; + CREATE TABLE test_table(a int); + SELECT create_distributed_table('test_table', 'a'); + ALTER TABLE test_table ADD COLUMN B INT; + INSERT INTO test_table SELECT i,i From generate_series(0,100)i; + SELECT count(*) FROM test_table; + ALTER TABLE distributed_table_3 ADD COLUMN new_col INT DEFAULT 15; + SELECT count(*) FROM distributed_table_3; + -- sync at the end of the tx + SELECT start_metadata_sync_to_node('localhost', :worker_1_port); +ROLLBACK; + -- cleanup +\c - - - :master_port +SET search_path TO "start_stop_metadata_sync"; + SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); SET client_min_messages TO WARNING;