From 4614814de1d52fbafc911d03ebdb9ac3f8a28892 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Tue, 15 Aug 2017 11:40:25 +0200 Subject: [PATCH] Enable 2PC for INSERT...SELECT via coordinator --- src/backend/distributed/commands/multi_copy.c | 9 +++---- .../executor/insert_select_executor.c | 2 -- .../expected/multi_transaction_recovery.out | 26 +++++++++++++++++-- .../sql/multi_transaction_recovery.sql | 18 +++++++++++-- 4 files changed, 44 insertions(+), 11 deletions(-) diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 5c9a6c70a..c84cdcd44 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -151,10 +151,6 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag) bool isCopyFromWorker = false; BeginOrContinueCoordinatedTransaction(); - if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC) - { - CoordinatedTransactionUse2PC(); - } /* disallow COPY to/from file or program except for superusers */ if (copyStatement->filename != NULL && !superuser()) @@ -1835,7 +1831,10 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, /* keep the table metadata to avoid looking it up for every tuple */ copyDest->tableMetadata = cacheEntry; - if (cacheEntry->replicationModel == REPLICATION_MODEL_2PC) + BeginOrContinueCoordinatedTransaction(); + + if (cacheEntry->replicationModel == REPLICATION_MODEL_2PC || + MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC) { CoordinatedTransactionUse2PC(); } diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index f26cf7443..b4368076c 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -102,8 +102,6 @@ ExecuteSelectIntoRelation(Oid targetRelationId, List *insertTargetList, CitusCopyDestReceiver *copyDest = NULL; - BeginOrContinueCoordinatedTransaction(); - partitionMethod = PartitionMethod(targetRelationId); if (partitionMethod == DISTRIBUTE_BY_NONE) { diff --git a/src/test/regress/expected/multi_transaction_recovery.out b/src/test/regress/expected/multi_transaction_recovery.out index e156d3724..3a5f25aa0 100644 --- a/src/test/regress/expected/multi_transaction_recovery.out +++ b/src/test/regress/expected/multi_transaction_recovery.out @@ -196,11 +196,33 @@ SELECT recover_prepared_transactions(); 0 (1 row) +-- Committed INSERT..SELECT via coordinator should write 4 transaction recovery records +INSERT INTO test_recovery (x) SELECT 'hello-'||s FROM generate_series(1,100) s; SELECT count(*) FROM pg_dist_transaction; count ------- - 0 + 4 (1 row) -\c - - - :master_port +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +-- Committed COPY should write 4 transaction records +COPY test_recovery (x) FROM STDIN CSV; +SELECT count(*) FROM pg_dist_transaction; + count +------- + 4 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +------------------------------- + 0 +(1 row) + +DROP TABLE test_recovery_ref; DROP TABLE test_recovery; diff --git a/src/test/regress/sql/multi_transaction_recovery.sql b/src/test/regress/sql/multi_transaction_recovery.sql index 29d97c799..b8ec92363 100644 --- a/src/test/regress/sql/multi_transaction_recovery.sql +++ b/src/test/regress/sql/multi_transaction_recovery.sql @@ -90,7 +90,21 @@ INSERT INTO test_recovery SELECT x, 'earth' FROM test_recovery; SELECT count(*) FROM pg_dist_transaction; SELECT recover_prepared_transactions(); -SELECT count(*) FROM pg_dist_transaction; -\c - - - :master_port +-- Committed INSERT..SELECT via coordinator should write 4 transaction recovery records +INSERT INTO test_recovery (x) SELECT 'hello-'||s FROM generate_series(1,100) s; + +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- Committed COPY should write 4 transaction records +COPY test_recovery (x) FROM STDIN CSV; +hello-0 +hello-1 +\. + +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +DROP TABLE test_recovery_ref; DROP TABLE test_recovery;