diff --git a/src/backend/distributed/transaction/backend_data.c b/src/backend/distributed/transaction/backend_data.c index 70c08ca8f..79ba3c96c 100644 --- a/src/backend/distributed/transaction/backend_data.c +++ b/src/backend/distributed/transaction/backend_data.c @@ -482,25 +482,27 @@ UnlockBackendSharedMemory(void) /* * GetCurrentDistributedTransactionId reads the backend's distributed transaction id and * returns a copy of it. + * + * When called from a parallel worker, it uses the parent's transaction ID per the logic + * in GetBackendDataForProc. */ DistributedTransactionId * GetCurrentDistributedTransactionId(void) { DistributedTransactionId *currentDistributedTransactionId = (DistributedTransactionId *) palloc(sizeof(DistributedTransactionId)); + BackendData backendData; - SpinLockAcquire(&MyBackendData->mutex); + GetBackendDataForProc(MyProc, &backendData); currentDistributedTransactionId->initiatorNodeIdentifier = - MyBackendData->transactionId.initiatorNodeIdentifier; + backendData.transactionId.initiatorNodeIdentifier; currentDistributedTransactionId->transactionOriginator = - MyBackendData->transactionId.transactionOriginator; + backendData.transactionId.transactionOriginator; currentDistributedTransactionId->transactionNumber = - MyBackendData->transactionId.transactionNumber; + backendData.transactionId.transactionNumber; currentDistributedTransactionId->timestamp = - MyBackendData->transactionId.timestamp; - - SpinLockRelease(&MyBackendData->mutex); + backendData.transactionId.timestamp; return currentDistributedTransactionId; } diff --git a/src/test/regress/expected/multi_distributed_transaction_id.out b/src/test/regress/expected/multi_distributed_transaction_id.out index 01328c153..7c04d1c56 100644 --- a/src/test/regress/expected/multi_distributed_transaction_id.out +++ b/src/test/regress/expected/multi_distributed_transaction_id.out @@ -125,3 +125,41 @@ SELECT initiator_node_identifier, transaction_number, transaction_stamp, (proces ROLLBACK PREPARED 'dist_xact_id_test'; -- set back to the original zone SET TIME ZONE DEFAULT; +-- parallel safe wrapper for getting the current transaction number +CREATE OR REPLACE FUNCTION parallel_worker_transaction_id_test() +RETURNS bigint STRICT VOLATILE PARALLEL SAFE AS $$ + SELECT transaction_number FROM get_current_transaction_id(); +$$ LANGUAGE sql; +-- force the transaction ID to be used in a parallel plan +BEGIN; +SELECT assign_distributed_transaction_id(50, 1234567, '2016-01-01 00:00:00+0'); + assign_distributed_transaction_id +----------------------------------- + +(1 row) + +-- create >8MB table +CREATE UNLOGGED TABLE parallel_id_test AS +SELECT s AS a, s AS b, s AS c, s AS d, s AS e, s AS f, s AS g, s AS h FROM generate_series(1,150000) s; +INSERT INTO parallel_id_test VALUES (1234567), (1234567), (1234568), (1234568); +ANALYSE parallel_id_test; +SET LOCAL max_parallel_workers_per_gather TO 8; +SET LOCAL cpu_tuple_cost TO 1000000; +EXPLAIN (COSTS OFF) +SELECT a FROM parallel_id_test WHERE a = parallel_worker_transaction_id_test(); + QUERY PLAN +------------------------------------------------------------- + Gather + Workers Planned: 1 + -> Parallel Seq Scan on parallel_id_test + Filter: (a = parallel_worker_transaction_id_test()) +(4 rows) + +SELECT a FROM parallel_id_test WHERE a = parallel_worker_transaction_id_test(); + a +--------- + 1234567 + 1234567 +(2 rows) + +ROLLBACK; diff --git a/src/test/regress/sql/multi_distributed_transaction_id.sql b/src/test/regress/sql/multi_distributed_transaction_id.sql index 838549cb7..5e185162c 100644 --- a/src/test/regress/sql/multi_distributed_transaction_id.sql +++ b/src/test/regress/sql/multi_distributed_transaction_id.sql @@ -79,3 +79,27 @@ ROLLBACK PREPARED 'dist_xact_id_test'; -- set back to the original zone SET TIME ZONE DEFAULT; + +-- parallel safe wrapper for getting the current transaction number +CREATE OR REPLACE FUNCTION parallel_worker_transaction_id_test() +RETURNS bigint STRICT VOLATILE PARALLEL SAFE AS $$ + SELECT transaction_number FROM get_current_transaction_id(); +$$ LANGUAGE sql; + +-- force the transaction ID to be used in a parallel plan +BEGIN; +SELECT assign_distributed_transaction_id(50, 1234567, '2016-01-01 00:00:00+0'); + +-- create >8MB table +CREATE UNLOGGED TABLE parallel_id_test AS +SELECT s AS a, s AS b, s AS c, s AS d, s AS e, s AS f, s AS g, s AS h FROM generate_series(1,150000) s; +INSERT INTO parallel_id_test VALUES (1234567), (1234567), (1234568), (1234568); +ANALYSE parallel_id_test; + +SET LOCAL max_parallel_workers_per_gather TO 8; +SET LOCAL cpu_tuple_cost TO 1000000; + +EXPLAIN (COSTS OFF) +SELECT a FROM parallel_id_test WHERE a = parallel_worker_transaction_id_test(); +SELECT a FROM parallel_id_test WHERE a = parallel_worker_transaction_id_test(); +ROLLBACK;