mirror of https://github.com/citusdata/citus.git
Merge pull request #1841 from citusdata/send_begin
Send begin in real-time executor when in a coordinated transactionpull/1838/head
commit
8cb5734481
|
@ -90,6 +90,7 @@ MultiRealTimeExecute(Job *job)
|
|||
|
||||
workerNodeList = ActiveReadableNodeList();
|
||||
workerHash = WorkerHash(workerHashName, workerNodeList);
|
||||
|
||||
if (IsTransactionBlock())
|
||||
{
|
||||
BeginOrContinueCoordinatedTransaction();
|
||||
|
@ -318,7 +319,15 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution,
|
|||
if (pollStatus == CLIENT_CONNECTION_READY)
|
||||
{
|
||||
taskExecution->dataFetchTaskIndex = -1;
|
||||
taskStatusArray[currentIndex] = EXEC_FETCH_TASK_LOOP;
|
||||
|
||||
if (InCoordinatedTransaction())
|
||||
{
|
||||
taskStatusArray[currentIndex] = EXEC_BEGIN_START;
|
||||
}
|
||||
else
|
||||
{
|
||||
taskStatusArray[currentIndex] = EXEC_FETCH_TASK_LOOP;
|
||||
}
|
||||
}
|
||||
else if (pollStatus == CLIENT_CONNECTION_BUSY)
|
||||
{
|
||||
|
@ -397,6 +406,74 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution,
|
|||
break;
|
||||
}
|
||||
|
||||
case EXEC_BEGIN_START:
|
||||
{
|
||||
int32 connectionId = connectionIdArray[currentIndex];
|
||||
MultiConnection *connection = MultiClientGetConnection(connectionId);
|
||||
RemoteTransaction *transaction = &connection->remoteTransaction;
|
||||
|
||||
/*
|
||||
* If BEGIN was not yet sent on this connection, send it now.
|
||||
* Otherwise, continue with the task.
|
||||
*/
|
||||
if (transaction->transactionState == REMOTE_TRANS_INVALID)
|
||||
{
|
||||
StartRemoteTransactionBegin(connection);
|
||||
taskStatusArray[currentIndex] = EXEC_BEGIN_RUNNING;
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* We skip data fetches when in a distributed transaction since
|
||||
* they cannot be performed in a transactional way (e.g. would
|
||||
* trigger deadlock detection).
|
||||
*/
|
||||
taskStatusArray[currentIndex] = EXEC_COMPUTE_TASK_START;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
case EXEC_BEGIN_RUNNING:
|
||||
{
|
||||
int32 connectionId = connectionIdArray[currentIndex];
|
||||
MultiConnection *connection = MultiClientGetConnection(connectionId);
|
||||
RemoteTransaction *transaction = &connection->remoteTransaction;
|
||||
|
||||
/* check if query results are in progress or unavailable */
|
||||
ResultStatus resultStatus = MultiClientResultStatus(connectionId);
|
||||
if (resultStatus == CLIENT_RESULT_BUSY)
|
||||
{
|
||||
*executionStatus = TASK_STATUS_SOCKET_READ;
|
||||
taskStatusArray[currentIndex] = EXEC_BEGIN_RUNNING;
|
||||
break;
|
||||
}
|
||||
else if (resultStatus == CLIENT_RESULT_UNAVAILABLE)
|
||||
{
|
||||
taskStatusArray[currentIndex] = EXEC_TASK_FAILED;
|
||||
break;
|
||||
}
|
||||
|
||||
/* read the results from BEGIN and update the transaction state */
|
||||
FinishRemoteTransactionBegin(connection);
|
||||
|
||||
if (transaction->transactionFailed)
|
||||
{
|
||||
taskStatusArray[currentIndex] = EXEC_TASK_FAILED;
|
||||
break;
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* We skip data fetches when in a distributed transaction since
|
||||
* they cannot be performed in a transactional way (e.g. would
|
||||
* trigger deadlock detection).
|
||||
*/
|
||||
taskStatusArray[currentIndex] = EXEC_COMPUTE_TASK_START;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
case EXEC_FETCH_TASK_LOOP:
|
||||
{
|
||||
List *dataFetchTaskList = task->dependedTaskList;
|
||||
|
|
|
@ -58,7 +58,11 @@ typedef enum
|
|||
EXEC_TASK_TRACKER_RETRY = 13,
|
||||
EXEC_TASK_TRACKER_FAILED = 14,
|
||||
EXEC_SOURCE_TASK_TRACKER_RETRY = 15,
|
||||
EXEC_SOURCE_TASK_TRACKER_FAILED = 16
|
||||
EXEC_SOURCE_TASK_TRACKER_FAILED = 16,
|
||||
|
||||
/* transactional operations */
|
||||
EXEC_BEGIN_START = 20,
|
||||
EXEC_BEGIN_RUNNING = 21
|
||||
} TaskExecStatus;
|
||||
|
||||
|
||||
|
|
|
@ -214,6 +214,71 @@ SELECT COUNT(*) FROM test_table;
|
|||
(1 row)
|
||||
|
||||
ROLLBACK;
|
||||
-- We don't get a distributed transaction id outside a transaction block
|
||||
SELECT (get_current_transaction_id()).transaction_number > 0 FROM test_table LIMIT 1;
|
||||
?column?
|
||||
----------
|
||||
f
|
||||
(1 row)
|
||||
|
||||
-- We should get a distributed transaction id inside a transaction block
|
||||
BEGIN;
|
||||
SELECT (get_current_transaction_id()).transaction_number > 0 FROM test_table LIMIT 1;
|
||||
?column?
|
||||
----------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
END;
|
||||
-- Add a function to insert a row into a table
|
||||
SELECT public.run_command_on_master_and_workers($$
|
||||
CREATE FUNCTION multi_real_time_transaction.insert_row_test(table_name name)
|
||||
RETURNS bool
|
||||
AS $BODY$
|
||||
BEGIN
|
||||
EXECUTE format('INSERT INTO %s VALUES(100,100,''function'')', table_name);
|
||||
RETURN true;
|
||||
END;
|
||||
$BODY$ LANGUAGE plpgsql;
|
||||
$$);
|
||||
run_command_on_master_and_workers
|
||||
-----------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- SELECT should be rolled back because we send BEGIN
|
||||
BEGIN;
|
||||
SELECT count(*) FROM test_table;
|
||||
count
|
||||
-------
|
||||
6
|
||||
(1 row)
|
||||
|
||||
-- Sneakily insert directly into shards
|
||||
SELECT insert_row_test(pg_typeof(test_table)::name) FROM test_table;
|
||||
insert_row_test
|
||||
-----------------
|
||||
t
|
||||
t
|
||||
t
|
||||
t
|
||||
t
|
||||
t
|
||||
(6 rows)
|
||||
|
||||
SELECT count(*) FROM test_table;
|
||||
count
|
||||
-------
|
||||
12
|
||||
(1 row)
|
||||
|
||||
ABORT;
|
||||
SELECT count(*) FROM test_table;
|
||||
count
|
||||
-------
|
||||
6
|
||||
(1 row)
|
||||
|
||||
-- Test with foreign key
|
||||
ALTER TABLE test_table ADD CONSTRAINT p_key_tt PRIMARY KEY (id);
|
||||
ALTER TABLE co_test_table ADD CONSTRAINT f_key_ctt FOREIGN KEY (id) REFERENCES test_table(id) ON DELETE CASCADE;
|
||||
|
@ -227,7 +292,8 @@ SELECT * FROM co_test_table;
|
|||
|
||||
ROLLBACK;
|
||||
DROP SCHEMA multi_real_time_transaction CASCADE;
|
||||
NOTICE: drop cascades to 3 other objects
|
||||
NOTICE: drop cascades to 4 other objects
|
||||
DETAIL: drop cascades to table test_table
|
||||
drop cascades to table co_test_table
|
||||
drop cascades to table ref_test_table
|
||||
drop cascades to function insert_row_test(name)
|
||||
|
|
|
@ -222,6 +222,71 @@ SELECT COUNT(*) FROM test_table;
|
|||
(1 row)
|
||||
|
||||
ROLLBACK;
|
||||
-- We don't get a distributed transaction id outside a transaction block
|
||||
SELECT (get_current_transaction_id()).transaction_number > 0 FROM test_table LIMIT 1;
|
||||
?column?
|
||||
----------
|
||||
f
|
||||
(1 row)
|
||||
|
||||
-- We should get a distributed transaction id inside a transaction block
|
||||
BEGIN;
|
||||
SELECT (get_current_transaction_id()).transaction_number > 0 FROM test_table LIMIT 1;
|
||||
?column?
|
||||
----------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
END;
|
||||
-- Add a function to insert a row into a table
|
||||
SELECT public.run_command_on_master_and_workers($$
|
||||
CREATE FUNCTION multi_real_time_transaction.insert_row_test(table_name name)
|
||||
RETURNS bool
|
||||
AS $BODY$
|
||||
BEGIN
|
||||
EXECUTE format('INSERT INTO %s VALUES(100,100,''function'')', table_name);
|
||||
RETURN true;
|
||||
END;
|
||||
$BODY$ LANGUAGE plpgsql;
|
||||
$$);
|
||||
run_command_on_master_and_workers
|
||||
-----------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- SELECT should be rolled back because we send BEGIN
|
||||
BEGIN;
|
||||
SELECT count(*) FROM test_table;
|
||||
count
|
||||
-------
|
||||
6
|
||||
(1 row)
|
||||
|
||||
-- Sneakily insert directly into shards
|
||||
SELECT insert_row_test(pg_typeof(test_table)::name) FROM test_table;
|
||||
insert_row_test
|
||||
-----------------
|
||||
t
|
||||
t
|
||||
t
|
||||
t
|
||||
t
|
||||
t
|
||||
(6 rows)
|
||||
|
||||
SELECT count(*) FROM test_table;
|
||||
count
|
||||
-------
|
||||
12
|
||||
(1 row)
|
||||
|
||||
ABORT;
|
||||
SELECT count(*) FROM test_table;
|
||||
count
|
||||
-------
|
||||
6
|
||||
(1 row)
|
||||
|
||||
-- Test with foreign key
|
||||
ALTER TABLE test_table ADD CONSTRAINT p_key_tt PRIMARY KEY (id);
|
||||
ALTER TABLE co_test_table ADD CONSTRAINT f_key_ctt FOREIGN KEY (id) REFERENCES test_table(id) ON DELETE CASCADE;
|
||||
|
@ -235,7 +300,8 @@ SELECT * FROM co_test_table;
|
|||
|
||||
ROLLBACK;
|
||||
DROP SCHEMA multi_real_time_transaction CASCADE;
|
||||
NOTICE: drop cascades to 3 other objects
|
||||
NOTICE: drop cascades to 4 other objects
|
||||
DETAIL: drop cascades to table test_table
|
||||
drop cascades to table co_test_table
|
||||
drop cascades to table ref_test_table
|
||||
drop cascades to function insert_row_test(name)
|
||||
|
|
|
@ -32,7 +32,6 @@ test: multi_read_from_secondaries
|
|||
test: multi_create_table
|
||||
test: multi_create_table_constraints multi_master_protocol multi_load_data multi_behavioral_analytics_create_table
|
||||
test: multi_behavioral_analytics_basics multi_behavioral_analytics_single_shard_queries multi_insert_select_non_pushable_queries multi_insert_select multi_insert_select_window multi_shard_update_delete
|
||||
test: multi_real_time_transaction
|
||||
|
||||
# ----------
|
||||
# Tests for partitioning support
|
||||
|
@ -42,7 +41,7 @@ test: multi_partitioning_utils multi_partitioning
|
|||
# ----------
|
||||
# Miscellaneous tests to check our query planning behavior
|
||||
# ----------
|
||||
test: multi_deparse_shard_query multi_distributed_transaction_id
|
||||
test: multi_deparse_shard_query multi_distributed_transaction_id multi_real_time_transaction
|
||||
test: multi_explain
|
||||
test: multi_basic_queries multi_complex_expressions multi_subquery multi_subquery_complex_queries multi_subquery_behavioral_analytics
|
||||
test: multi_subquery_complex_reference_clause multi_subquery_window_functions multi_view multi_sql_function multi_prepare_sql
|
||||
|
|
|
@ -135,6 +135,38 @@ ALTER TABLE test_table ADD CONSTRAINT num_check CHECK (col_1 < 50);
|
|||
SELECT COUNT(*) FROM test_table;
|
||||
ROLLBACK;
|
||||
|
||||
-- We don't get a distributed transaction id outside a transaction block
|
||||
SELECT (get_current_transaction_id()).transaction_number > 0 FROM test_table LIMIT 1;
|
||||
|
||||
-- We should get a distributed transaction id inside a transaction block
|
||||
BEGIN;
|
||||
SELECT (get_current_transaction_id()).transaction_number > 0 FROM test_table LIMIT 1;
|
||||
END;
|
||||
|
||||
-- Add a function to insert a row into a table
|
||||
SELECT public.run_command_on_master_and_workers($$
|
||||
CREATE FUNCTION multi_real_time_transaction.insert_row_test(table_name name)
|
||||
RETURNS bool
|
||||
AS $BODY$
|
||||
BEGIN
|
||||
EXECUTE format('INSERT INTO %s VALUES(100,100,''function'')', table_name);
|
||||
RETURN true;
|
||||
END;
|
||||
$BODY$ LANGUAGE plpgsql;
|
||||
$$);
|
||||
|
||||
-- SELECT should be rolled back because we send BEGIN
|
||||
BEGIN;
|
||||
SELECT count(*) FROM test_table;
|
||||
|
||||
-- Sneakily insert directly into shards
|
||||
SELECT insert_row_test(pg_typeof(test_table)::name) FROM test_table;
|
||||
SELECT count(*) FROM test_table;
|
||||
ABORT;
|
||||
|
||||
SELECT count(*) FROM test_table;
|
||||
|
||||
|
||||
-- Test with foreign key
|
||||
ALTER TABLE test_table ADD CONSTRAINT p_key_tt PRIMARY KEY (id);
|
||||
ALTER TABLE co_test_table ADD CONSTRAINT f_key_ctt FOREIGN KEY (id) REFERENCES test_table(id) ON DELETE CASCADE;
|
||||
|
|
Loading…
Reference in New Issue