diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 5b1833049..c84cdcd44 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -151,10 +151,6 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag) bool isCopyFromWorker = false; BeginOrContinueCoordinatedTransaction(); - if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC) - { - CoordinatedTransactionUse2PC(); - } /* disallow COPY to/from file or program except for superusers */ if (copyStatement->filename != NULL && !superuser()) @@ -244,6 +240,7 @@ CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag) uint32 connectionFlags = FOR_DML; masterConnection = GetNodeConnection(connectionFlags, nodeName, nodePort); + MarkRemoteTransactionCritical(masterConnection); ClaimConnectionExclusively(masterConnection); RemoteTransactionBeginIfNecessary(masterConnection); @@ -1834,7 +1831,10 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, /* keep the table metadata to avoid looking it up for every tuple */ copyDest->tableMetadata = cacheEntry; - if (cacheEntry->replicationModel == REPLICATION_MODEL_2PC) + BeginOrContinueCoordinatedTransaction(); + + if (cacheEntry->replicationModel == REPLICATION_MODEL_2PC || + MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC) { CoordinatedTransactionUse2PC(); } diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index f26cf7443..b4368076c 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -102,8 +102,6 @@ ExecuteSelectIntoRelation(Oid targetRelationId, List *insertTargetList, CitusCopyDestReceiver *copyDest = NULL; - BeginOrContinueCoordinatedTransaction(); - partitionMethod = PartitionMethod(targetRelationId); if (partitionMethod == DISTRIBUTE_BY_NONE) { diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index 60f39103a..2f964d603 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -483,6 +483,7 @@ void CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements, bool useExclusiveConnection, bool colocatedShard) { + DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedRelationId); char *placementOwner = TableOwner(distributedRelationId); bool includeSequenceDefaults = false; List *ddlCommandList = GetTableDDLEvents(distributedRelationId, @@ -509,6 +510,12 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements, BeginOrContinueCoordinatedTransaction(); + if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC || + cacheEntry->replicationModel == REPLICATION_MODEL_2PC) + { + CoordinatedTransactionUse2PC(); + } + foreach(shardPlacementCell, shardPlacements) { ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(shardPlacementCell); diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c index 30e1be29a..f89efdf1b 100644 --- a/src/backend/distributed/transaction/remote_transaction.c +++ b/src/backend/distributed/transaction/remote_transaction.c @@ -773,7 +773,8 @@ CoordinatedRemoteTransactionsCommit(void) if (transaction->transactionState == REMOTE_TRANS_INVALID || transaction->transactionState == REMOTE_TRANS_1PC_COMMITTING || transaction->transactionState == REMOTE_TRANS_2PC_COMMITTING || - transaction->transactionState == REMOTE_TRANS_COMMITTED) + transaction->transactionState == REMOTE_TRANS_COMMITTED || + transaction->transactionState == REMOTE_TRANS_ABORTED) { continue; } diff --git a/src/test/regress/expected/multi_transaction_recovery.out b/src/test/regress/expected/multi_transaction_recovery.out index a5ec64d3f..3a5f25aa0 100644 --- a/src/test/regress/expected/multi_transaction_recovery.out +++ b/src/test/regress/expected/multi_transaction_recovery.out @@ -63,6 +63,7 @@ SELECT count(*) FROM pg_tables WHERE tablename = 'should_commit'; SET citus.shard_replication_factor TO 2; SET citus.shard_count TO 2; SET citus.multi_shard_commit_protocol TO '2pc'; +-- create_distributed_table should add 2 recovery records (1 connection per node) CREATE TABLE test_recovery (x text); SELECT create_distributed_table('test_recovery', 'x'); create_distributed_table @@ -70,7 +71,40 @@ SELECT create_distributed_table('test_recovery', 'x'); (1 row) +SELECT count(*) FROM pg_dist_transaction; + count +------- + 2 +(1 row) + +-- create_reference_table should add another 2 recovery records +CREATE TABLE test_recovery_ref (x text); +SELECT create_reference_table('test_recovery_ref'); + create_reference_table +------------------------ + +(1 row) + +SELECT count(*) FROM pg_dist_transaction; + count +------- + 4 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +-- plain INSERT does not use 2PC INSERT INTO test_recovery VALUES ('hello'); +SELECT count(*) FROM pg_dist_transaction; + count +------- + 0 +(1 row) + -- Committed DDL commands should write 4 transaction recovery records BEGIN; ALTER TABLE test_recovery ADD COLUMN y text; @@ -162,11 +196,33 @@ SELECT recover_prepared_transactions(); 0 (1 row) +-- Committed INSERT..SELECT via coordinator should write 4 transaction recovery records +INSERT INTO test_recovery (x) SELECT 'hello-'||s FROM generate_series(1,100) s; SELECT count(*) FROM pg_dist_transaction; count ------- - 0 + 4 (1 row) -\c - - - :master_port +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +-- Committed COPY should write 4 transaction records +COPY test_recovery (x) FROM STDIN CSV; +SELECT count(*) FROM pg_dist_transaction; + count +------- + 4 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +DROP TABLE test_recovery_ref; DROP TABLE test_recovery; diff --git a/src/test/regress/input/multi_copy.source b/src/test/regress/input/multi_copy.source index 656855d43..13714446e 100644 --- a/src/test/regress/input/multi_copy.source +++ b/src/test/regress/input/multi_copy.source @@ -352,7 +352,12 @@ SELECT master_create_distributed_table('customer_worker_copy_append', 'c_custkey -- Test copy from the worker node COPY customer_worker_copy_append FROM '@abs_srcdir@/data/customer.1.data' with (delimiter '|', master_host 'localhost', master_port 57636); + +-- Make sure we don't use 2PC when connecting to master, even if requested +BEGIN; +SET LOCAL citus.multi_shard_commit_protocol TO '2pc'; COPY customer_worker_copy_append FROM '@abs_srcdir@/data/customer.2.data' with (delimiter '|', master_host 'localhost', master_port 57636); +COMMIT; -- Test if there is no relation to copy data with the worker copy COPY lineitem_copy_none FROM '@abs_srcdir@/data/lineitem.1.data' with (delimiter '|', master_host 'localhost', master_port 57636); diff --git a/src/test/regress/output/multi_copy.source b/src/test/regress/output/multi_copy.source index d44a614af..498b7dbed 100644 --- a/src/test/regress/output/multi_copy.source +++ b/src/test/regress/output/multi_copy.source @@ -468,7 +468,11 @@ HINT: Consider using hash partitioning. \c - - - 57637 -- Test copy from the worker node COPY customer_worker_copy_append FROM '@abs_srcdir@/data/customer.1.data' with (delimiter '|', master_host 'localhost', master_port 57636); +-- Make sure we don't use 2PC when connecting to master, even if requested +BEGIN; +SET LOCAL citus.multi_shard_commit_protocol TO '2pc'; COPY customer_worker_copy_append FROM '@abs_srcdir@/data/customer.2.data' with (delimiter '|', master_host 'localhost', master_port 57636); +COMMIT; -- Test if there is no relation to copy data with the worker copy COPY lineitem_copy_none FROM '@abs_srcdir@/data/lineitem.1.data' with (delimiter '|', master_host 'localhost', master_port 57636); WARNING: relation "lineitem_copy_none" does not exist diff --git a/src/test/regress/sql/multi_transaction_recovery.sql b/src/test/regress/sql/multi_transaction_recovery.sql index b566080c9..b8ec92363 100644 --- a/src/test/regress/sql/multi_transaction_recovery.sql +++ b/src/test/regress/sql/multi_transaction_recovery.sql @@ -40,9 +40,21 @@ SET citus.shard_replication_factor TO 2; SET citus.shard_count TO 2; SET citus.multi_shard_commit_protocol TO '2pc'; +-- create_distributed_table should add 2 recovery records (1 connection per node) CREATE TABLE test_recovery (x text); SELECT create_distributed_table('test_recovery', 'x'); +SELECT count(*) FROM pg_dist_transaction; + +-- create_reference_table should add another 2 recovery records +CREATE TABLE test_recovery_ref (x text); +SELECT create_reference_table('test_recovery_ref'); +SELECT count(*) FROM pg_dist_transaction; + +SELECT recover_prepared_transactions(); + +-- plain INSERT does not use 2PC INSERT INTO test_recovery VALUES ('hello'); +SELECT count(*) FROM pg_dist_transaction; -- Committed DDL commands should write 4 transaction recovery records BEGIN; @@ -78,7 +90,21 @@ INSERT INTO test_recovery SELECT x, 'earth' FROM test_recovery; SELECT count(*) FROM pg_dist_transaction; SELECT recover_prepared_transactions(); -SELECT count(*) FROM pg_dist_transaction; -\c - - - :master_port +-- Committed INSERT..SELECT via coordinator should write 4 transaction recovery records +INSERT INTO test_recovery (x) SELECT 'hello-'||s FROM generate_series(1,100) s; + +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- Committed COPY should write 4 transaction records +COPY test_recovery (x) FROM STDIN CSV; +hello-0 +hello-1 +\. + +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +DROP TABLE test_recovery_ref; DROP TABLE test_recovery;