From d6dd0b3a81a531dfbb319cc1a695c27bb6d5f6b9 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Wed, 29 Nov 2017 17:46:28 +0100 Subject: [PATCH 1/2] Send BEGIN in the real-time executor when in a transaction --- .../executor/multi_real_time_executor.c | 79 ++++++++++++++++++- .../distributed/multi_server_executor.h | 6 +- 2 files changed, 83 insertions(+), 2 deletions(-) diff --git a/src/backend/distributed/executor/multi_real_time_executor.c b/src/backend/distributed/executor/multi_real_time_executor.c index 63a8dcccb..4fd4c932e 100644 --- a/src/backend/distributed/executor/multi_real_time_executor.c +++ b/src/backend/distributed/executor/multi_real_time_executor.c @@ -90,6 +90,7 @@ MultiRealTimeExecute(Job *job) workerNodeList = ActiveReadableNodeList(); workerHash = WorkerHash(workerHashName, workerNodeList); + if (IsTransactionBlock()) { BeginOrContinueCoordinatedTransaction(); @@ -318,7 +319,15 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution, if (pollStatus == CLIENT_CONNECTION_READY) { taskExecution->dataFetchTaskIndex = -1; - taskStatusArray[currentIndex] = EXEC_FETCH_TASK_LOOP; + + if (InCoordinatedTransaction()) + { + taskStatusArray[currentIndex] = EXEC_BEGIN_START; + } + else + { + taskStatusArray[currentIndex] = EXEC_FETCH_TASK_LOOP; + } } else if (pollStatus == CLIENT_CONNECTION_BUSY) { @@ -397,6 +406,74 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution, break; } + case EXEC_BEGIN_START: + { + int32 connectionId = connectionIdArray[currentIndex]; + MultiConnection *connection = MultiClientGetConnection(connectionId); + RemoteTransaction *transaction = &connection->remoteTransaction; + + /* + * If BEGIN was not yet sent on this connection, send it now. + * Otherwise, continue with the task. + */ + if (transaction->transactionState == REMOTE_TRANS_INVALID) + { + StartRemoteTransactionBegin(connection); + taskStatusArray[currentIndex] = EXEC_BEGIN_RUNNING; + break; + } + else + { + /* + * We skip data fetches when in a distributed transaction since + * they cannot be performed in a transactional way (e.g. would + * trigger deadlock detection). + */ + taskStatusArray[currentIndex] = EXEC_COMPUTE_TASK_START; + break; + } + } + + case EXEC_BEGIN_RUNNING: + { + int32 connectionId = connectionIdArray[currentIndex]; + MultiConnection *connection = MultiClientGetConnection(connectionId); + RemoteTransaction *transaction = &connection->remoteTransaction; + + /* check if query results are in progress or unavailable */ + ResultStatus resultStatus = MultiClientResultStatus(connectionId); + if (resultStatus == CLIENT_RESULT_BUSY) + { + *executionStatus = TASK_STATUS_SOCKET_READ; + taskStatusArray[currentIndex] = EXEC_BEGIN_RUNNING; + break; + } + else if (resultStatus == CLIENT_RESULT_UNAVAILABLE) + { + taskStatusArray[currentIndex] = EXEC_TASK_FAILED; + break; + } + + /* read the results from BEGIN and update the transaction state */ + FinishRemoteTransactionBegin(connection); + + if (transaction->transactionFailed) + { + taskStatusArray[currentIndex] = EXEC_TASK_FAILED; + break; + } + else + { + /* + * We skip data fetches when in a distributed transaction since + * they cannot be performed in a transactional way (e.g. would + * trigger deadlock detection). + */ + taskStatusArray[currentIndex] = EXEC_COMPUTE_TASK_START; + break; + } + } + case EXEC_FETCH_TASK_LOOP: { List *dataFetchTaskList = task->dependedTaskList; diff --git a/src/include/distributed/multi_server_executor.h b/src/include/distributed/multi_server_executor.h index cab5b35a1..c062bd84a 100644 --- a/src/include/distributed/multi_server_executor.h +++ b/src/include/distributed/multi_server_executor.h @@ -58,7 +58,11 @@ typedef enum EXEC_TASK_TRACKER_RETRY = 13, EXEC_TASK_TRACKER_FAILED = 14, EXEC_SOURCE_TASK_TRACKER_RETRY = 15, - EXEC_SOURCE_TASK_TRACKER_FAILED = 16 + EXEC_SOURCE_TASK_TRACKER_FAILED = 16, + + /* transactional operations */ + EXEC_BEGIN_START = 20, + EXEC_BEGIN_RUNNING = 21 } TaskExecStatus; From 0d6a7f58841eb573509faddccce3e68e109e696d Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Thu, 30 Nov 2017 09:21:46 +0100 Subject: [PATCH 2/2] Add real-time BEGIN regression tests --- .../expected/multi_real_time_transaction.out | 68 ++++++++++++++++++- .../multi_real_time_transaction_0.out | 68 ++++++++++++++++++- src/test/regress/multi_schedule | 3 +- .../sql/multi_real_time_transaction.sql | 32 +++++++++ 4 files changed, 167 insertions(+), 4 deletions(-) diff --git a/src/test/regress/expected/multi_real_time_transaction.out b/src/test/regress/expected/multi_real_time_transaction.out index 7c43d5285..5ac61cbff 100644 --- a/src/test/regress/expected/multi_real_time_transaction.out +++ b/src/test/regress/expected/multi_real_time_transaction.out @@ -214,6 +214,71 @@ SELECT COUNT(*) FROM test_table; (1 row) ROLLBACK; +-- We don't get a distributed transaction id outside a transaction block +SELECT (get_current_transaction_id()).transaction_number > 0 FROM test_table LIMIT 1; + ?column? +---------- + f +(1 row) + +-- We should get a distributed transaction id inside a transaction block +BEGIN; +SELECT (get_current_transaction_id()).transaction_number > 0 FROM test_table LIMIT 1; + ?column? +---------- + t +(1 row) + +END; +-- Add a function to insert a row into a table +SELECT public.run_command_on_master_and_workers($$ +CREATE FUNCTION multi_real_time_transaction.insert_row_test(table_name name) +RETURNS bool +AS $BODY$ +BEGIN + EXECUTE format('INSERT INTO %s VALUES(100,100,''function'')', table_name); + RETURN true; +END; +$BODY$ LANGUAGE plpgsql; +$$); + run_command_on_master_and_workers +----------------------------------- + +(1 row) + +-- SELECT should be rolled back because we send BEGIN +BEGIN; +SELECT count(*) FROM test_table; + count +------- + 6 +(1 row) + +-- Sneakily insert directly into shards +SELECT insert_row_test(pg_typeof(test_table)::name) FROM test_table; + insert_row_test +----------------- + t + t + t + t + t + t +(6 rows) + +SELECT count(*) FROM test_table; + count +------- + 12 +(1 row) + +ABORT; +SELECT count(*) FROM test_table; + count +------- + 6 +(1 row) + -- Test with foreign key ALTER TABLE test_table ADD CONSTRAINT p_key_tt PRIMARY KEY (id); ALTER TABLE co_test_table ADD CONSTRAINT f_key_ctt FOREIGN KEY (id) REFERENCES test_table(id) ON DELETE CASCADE; @@ -227,7 +292,8 @@ SELECT * FROM co_test_table; ROLLBACK; DROP SCHEMA multi_real_time_transaction CASCADE; -NOTICE: drop cascades to 3 other objects +NOTICE: drop cascades to 4 other objects DETAIL: drop cascades to table test_table drop cascades to table co_test_table drop cascades to table ref_test_table +drop cascades to function insert_row_test(name) diff --git a/src/test/regress/expected/multi_real_time_transaction_0.out b/src/test/regress/expected/multi_real_time_transaction_0.out index c335b0a73..d022a1f51 100644 --- a/src/test/regress/expected/multi_real_time_transaction_0.out +++ b/src/test/regress/expected/multi_real_time_transaction_0.out @@ -222,6 +222,71 @@ SELECT COUNT(*) FROM test_table; (1 row) ROLLBACK; +-- We don't get a distributed transaction id outside a transaction block +SELECT (get_current_transaction_id()).transaction_number > 0 FROM test_table LIMIT 1; + ?column? +---------- + f +(1 row) + +-- We should get a distributed transaction id inside a transaction block +BEGIN; +SELECT (get_current_transaction_id()).transaction_number > 0 FROM test_table LIMIT 1; + ?column? +---------- + t +(1 row) + +END; +-- Add a function to insert a row into a table +SELECT public.run_command_on_master_and_workers($$ +CREATE FUNCTION multi_real_time_transaction.insert_row_test(table_name name) +RETURNS bool +AS $BODY$ +BEGIN + EXECUTE format('INSERT INTO %s VALUES(100,100,''function'')', table_name); + RETURN true; +END; +$BODY$ LANGUAGE plpgsql; +$$); + run_command_on_master_and_workers +----------------------------------- + +(1 row) + +-- SELECT should be rolled back because we send BEGIN +BEGIN; +SELECT count(*) FROM test_table; + count +------- + 6 +(1 row) + +-- Sneakily insert directly into shards +SELECT insert_row_test(pg_typeof(test_table)::name) FROM test_table; + insert_row_test +----------------- + t + t + t + t + t + t +(6 rows) + +SELECT count(*) FROM test_table; + count +------- + 12 +(1 row) + +ABORT; +SELECT count(*) FROM test_table; + count +------- + 6 +(1 row) + -- Test with foreign key ALTER TABLE test_table ADD CONSTRAINT p_key_tt PRIMARY KEY (id); ALTER TABLE co_test_table ADD CONSTRAINT f_key_ctt FOREIGN KEY (id) REFERENCES test_table(id) ON DELETE CASCADE; @@ -235,7 +300,8 @@ SELECT * FROM co_test_table; ROLLBACK; DROP SCHEMA multi_real_time_transaction CASCADE; -NOTICE: drop cascades to 3 other objects +NOTICE: drop cascades to 4 other objects DETAIL: drop cascades to table test_table drop cascades to table co_test_table drop cascades to table ref_test_table +drop cascades to function insert_row_test(name) diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 09f18eeda..331b43e59 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -32,7 +32,6 @@ test: multi_read_from_secondaries test: multi_create_table test: multi_create_table_constraints multi_master_protocol multi_load_data multi_behavioral_analytics_create_table test: multi_behavioral_analytics_basics multi_behavioral_analytics_single_shard_queries multi_insert_select_non_pushable_queries multi_insert_select multi_insert_select_window multi_shard_update_delete -test: multi_real_time_transaction # ---------- # Tests for partitioning support @@ -42,7 +41,7 @@ test: multi_partitioning_utils multi_partitioning # ---------- # Miscellaneous tests to check our query planning behavior # ---------- -test: multi_deparse_shard_query multi_distributed_transaction_id +test: multi_deparse_shard_query multi_distributed_transaction_id multi_real_time_transaction test: multi_explain test: multi_basic_queries multi_complex_expressions multi_subquery multi_subquery_complex_queries multi_subquery_behavioral_analytics test: multi_subquery_complex_reference_clause multi_subquery_window_functions multi_view multi_sql_function multi_prepare_sql diff --git a/src/test/regress/sql/multi_real_time_transaction.sql b/src/test/regress/sql/multi_real_time_transaction.sql index 229d11d85..12f2ec2cf 100644 --- a/src/test/regress/sql/multi_real_time_transaction.sql +++ b/src/test/regress/sql/multi_real_time_transaction.sql @@ -135,6 +135,38 @@ ALTER TABLE test_table ADD CONSTRAINT num_check CHECK (col_1 < 50); SELECT COUNT(*) FROM test_table; ROLLBACK; +-- We don't get a distributed transaction id outside a transaction block +SELECT (get_current_transaction_id()).transaction_number > 0 FROM test_table LIMIT 1; + +-- We should get a distributed transaction id inside a transaction block +BEGIN; +SELECT (get_current_transaction_id()).transaction_number > 0 FROM test_table LIMIT 1; +END; + +-- Add a function to insert a row into a table +SELECT public.run_command_on_master_and_workers($$ +CREATE FUNCTION multi_real_time_transaction.insert_row_test(table_name name) +RETURNS bool +AS $BODY$ +BEGIN + EXECUTE format('INSERT INTO %s VALUES(100,100,''function'')', table_name); + RETURN true; +END; +$BODY$ LANGUAGE plpgsql; +$$); + +-- SELECT should be rolled back because we send BEGIN +BEGIN; +SELECT count(*) FROM test_table; + +-- Sneakily insert directly into shards +SELECT insert_row_test(pg_typeof(test_table)::name) FROM test_table; +SELECT count(*) FROM test_table; +ABORT; + +SELECT count(*) FROM test_table; + + -- Test with foreign key ALTER TABLE test_table ADD CONSTRAINT p_key_tt PRIMARY KEY (id); ALTER TABLE co_test_table ADD CONSTRAINT f_key_ctt FOREIGN KEY (id) REFERENCES test_table(id) ON DELETE CASCADE;