mirror of https://github.com/citusdata/citus.git
Merge pull request #1879 from citusdata/result_in_parallel_worker
Allow intermediate results to be used in parallel workerspull/1889/head
commit
5c5bd80afc
|
@ -482,25 +482,27 @@ UnlockBackendSharedMemory(void)
|
||||||
/*
|
/*
|
||||||
* GetCurrentDistributedTransactionId reads the backend's distributed transaction id and
|
* GetCurrentDistributedTransactionId reads the backend's distributed transaction id and
|
||||||
* returns a copy of it.
|
* returns a copy of it.
|
||||||
|
*
|
||||||
|
* When called from a parallel worker, it uses the parent's transaction ID per the logic
|
||||||
|
* in GetBackendDataForProc.
|
||||||
*/
|
*/
|
||||||
DistributedTransactionId *
|
DistributedTransactionId *
|
||||||
GetCurrentDistributedTransactionId(void)
|
GetCurrentDistributedTransactionId(void)
|
||||||
{
|
{
|
||||||
DistributedTransactionId *currentDistributedTransactionId =
|
DistributedTransactionId *currentDistributedTransactionId =
|
||||||
(DistributedTransactionId *) palloc(sizeof(DistributedTransactionId));
|
(DistributedTransactionId *) palloc(sizeof(DistributedTransactionId));
|
||||||
|
BackendData backendData;
|
||||||
|
|
||||||
SpinLockAcquire(&MyBackendData->mutex);
|
GetBackendDataForProc(MyProc, &backendData);
|
||||||
|
|
||||||
currentDistributedTransactionId->initiatorNodeIdentifier =
|
currentDistributedTransactionId->initiatorNodeIdentifier =
|
||||||
MyBackendData->transactionId.initiatorNodeIdentifier;
|
backendData.transactionId.initiatorNodeIdentifier;
|
||||||
currentDistributedTransactionId->transactionOriginator =
|
currentDistributedTransactionId->transactionOriginator =
|
||||||
MyBackendData->transactionId.transactionOriginator;
|
backendData.transactionId.transactionOriginator;
|
||||||
currentDistributedTransactionId->transactionNumber =
|
currentDistributedTransactionId->transactionNumber =
|
||||||
MyBackendData->transactionId.transactionNumber;
|
backendData.transactionId.transactionNumber;
|
||||||
currentDistributedTransactionId->timestamp =
|
currentDistributedTransactionId->timestamp =
|
||||||
MyBackendData->transactionId.timestamp;
|
backendData.transactionId.timestamp;
|
||||||
|
|
||||||
SpinLockRelease(&MyBackendData->mutex);
|
|
||||||
|
|
||||||
return currentDistributedTransactionId;
|
return currentDistributedTransactionId;
|
||||||
}
|
}
|
||||||
|
|
|
@ -125,3 +125,41 @@ SELECT initiator_node_identifier, transaction_number, transaction_stamp, (proces
|
||||||
ROLLBACK PREPARED 'dist_xact_id_test';
|
ROLLBACK PREPARED 'dist_xact_id_test';
|
||||||
-- set back to the original zone
|
-- set back to the original zone
|
||||||
SET TIME ZONE DEFAULT;
|
SET TIME ZONE DEFAULT;
|
||||||
|
-- parallel safe wrapper for getting the current transaction number
|
||||||
|
CREATE OR REPLACE FUNCTION parallel_worker_transaction_id_test()
|
||||||
|
RETURNS bigint STRICT VOLATILE PARALLEL SAFE AS $$
|
||||||
|
SELECT transaction_number FROM get_current_transaction_id();
|
||||||
|
$$ LANGUAGE sql;
|
||||||
|
-- force the transaction ID to be used in a parallel plan
|
||||||
|
BEGIN;
|
||||||
|
SELECT assign_distributed_transaction_id(50, 1234567, '2016-01-01 00:00:00+0');
|
||||||
|
assign_distributed_transaction_id
|
||||||
|
-----------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- create >8MB table
|
||||||
|
CREATE UNLOGGED TABLE parallel_id_test AS
|
||||||
|
SELECT s AS a, s AS b, s AS c, s AS d, s AS e, s AS f, s AS g, s AS h FROM generate_series(1,150000) s;
|
||||||
|
INSERT INTO parallel_id_test VALUES (1234567), (1234567), (1234568), (1234568);
|
||||||
|
ANALYSE parallel_id_test;
|
||||||
|
SET LOCAL max_parallel_workers_per_gather TO 8;
|
||||||
|
SET LOCAL cpu_tuple_cost TO 1000000;
|
||||||
|
EXPLAIN (COSTS OFF)
|
||||||
|
SELECT a FROM parallel_id_test WHERE a = parallel_worker_transaction_id_test();
|
||||||
|
QUERY PLAN
|
||||||
|
-------------------------------------------------------------
|
||||||
|
Gather
|
||||||
|
Workers Planned: 1
|
||||||
|
-> Parallel Seq Scan on parallel_id_test
|
||||||
|
Filter: (a = parallel_worker_transaction_id_test())
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
|
SELECT a FROM parallel_id_test WHERE a = parallel_worker_transaction_id_test();
|
||||||
|
a
|
||||||
|
---------
|
||||||
|
1234567
|
||||||
|
1234567
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
|
|
@ -79,3 +79,27 @@ ROLLBACK PREPARED 'dist_xact_id_test';
|
||||||
|
|
||||||
-- set back to the original zone
|
-- set back to the original zone
|
||||||
SET TIME ZONE DEFAULT;
|
SET TIME ZONE DEFAULT;
|
||||||
|
|
||||||
|
-- parallel safe wrapper for getting the current transaction number
|
||||||
|
CREATE OR REPLACE FUNCTION parallel_worker_transaction_id_test()
|
||||||
|
RETURNS bigint STRICT VOLATILE PARALLEL SAFE AS $$
|
||||||
|
SELECT transaction_number FROM get_current_transaction_id();
|
||||||
|
$$ LANGUAGE sql;
|
||||||
|
|
||||||
|
-- force the transaction ID to be used in a parallel plan
|
||||||
|
BEGIN;
|
||||||
|
SELECT assign_distributed_transaction_id(50, 1234567, '2016-01-01 00:00:00+0');
|
||||||
|
|
||||||
|
-- create >8MB table
|
||||||
|
CREATE UNLOGGED TABLE parallel_id_test AS
|
||||||
|
SELECT s AS a, s AS b, s AS c, s AS d, s AS e, s AS f, s AS g, s AS h FROM generate_series(1,150000) s;
|
||||||
|
INSERT INTO parallel_id_test VALUES (1234567), (1234567), (1234568), (1234568);
|
||||||
|
ANALYSE parallel_id_test;
|
||||||
|
|
||||||
|
SET LOCAL max_parallel_workers_per_gather TO 8;
|
||||||
|
SET LOCAL cpu_tuple_cost TO 1000000;
|
||||||
|
|
||||||
|
EXPLAIN (COSTS OFF)
|
||||||
|
SELECT a FROM parallel_id_test WHERE a = parallel_worker_transaction_id_test();
|
||||||
|
SELECT a FROM parallel_id_test WHERE a = parallel_worker_transaction_id_test();
|
||||||
|
ROLLBACK;
|
||||||
|
|
Loading…
Reference in New Issue