From c7c460e843de9cc3900d0bc1b57e23679ee9b1c7 Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Thu, 9 Jan 2020 10:49:58 -0800 Subject: [PATCH 1/3] PartitionTasklistResults: Use different queries per placement We need to know which placement succeeded in executing the worker_partition_query_result() call. Otherwise we wouldn't know which node to fetch from. This change allows that by introducing Task::perPlacementQueryStrings. --- .../distributed/executor/adaptive_executor.c | 14 ++++- .../distributed_intermediate_results.c | 61 ++++++++++++------- .../distributed/multi_physical_planner.h | 10 +++ 3 files changed, 61 insertions(+), 24 deletions(-) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index dfd4069ea..c66c8c541 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -3127,9 +3127,21 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution, Task *task = shardCommandExecution->task; ShardPlacement *taskPlacement = placementExecution->shardPlacement; List *placementAccessList = PlacementAccessListForTask(task, taskPlacement); - char *queryString = task->queryString; int querySent = 0; + char *queryString = NULL; + if (task->queryString != NULL) + { + queryString = task->queryString; + } + else + { + Assert(list_length(task->taskPlacementList) == list_length( + task->perPlacementQueryStrings)); + queryString = list_nth(task->perPlacementQueryStrings, + placementExecution->placementExecutionIndex); + } + if (execution->transactionProperties->useRemoteTransactionBlocks != TRANSACTION_BLOCKS_DISALLOWED) { diff --git a/src/backend/distributed/executor/distributed_intermediate_results.c b/src/backend/distributed/executor/distributed_intermediate_results.c index f68638ff7..cd1f80707 100644 --- a/src/backend/distributed/executor/distributed_intermediate_results.c +++ b/src/backend/distributed/executor/distributed_intermediate_results.c @@ -43,8 +43,9 @@ static void ShardMinMaxValueArrays(ShardInterval **shardIntervalArray, int shard static char * SourceShardPrefix(char *resultPrefix, uint64 shardId); static DistributedResultFragment * TupleToDistributedResultFragment( TupleTableSlot *tupleSlot, DistTableCacheEntry *targetRelation); -static Tuplestorestate * ExecuteSelectTasksIntoTupleStore(List *taskList, TupleDesc - resultDescriptor); +static Tuplestorestate * ExecuteSelectTasksIntoTupleStore(List *taskList, + TupleDesc resultDescriptor, + bool errorOnAnyFailure); /* @@ -111,31 +112,42 @@ WrapTasksForPartitioning(char *resultIdPrefix, List *selectTaskList, foreach(taskCell, selectTaskList) { Task *selectTask = (Task *) lfirst(taskCell); - StringInfo wrappedQuery = makeStringInfo(); List *shardPlacementList = selectTask->taskPlacementList; - - ShardPlacement *shardPlacement = linitial(shardPlacementList); char *taskPrefix = SourceShardPrefix(resultIdPrefix, selectTask->anchorShardId); char *partitionMethodString = targetRelation->partitionMethod == 'h' ? "hash" : "range"; const char *binaryFormatString = binaryFormat ? 
"true" : "false"; + List *perPlacementQueries = NIL; - appendStringInfo(wrappedQuery, - "SELECT %d, partition_index" - ", %s || '_' || partition_index::text " - ", rows_written " - "FROM worker_partition_query_result" - "(%s,%s,%d,%s,%s,%s,%s) WHERE rows_written > 0", - shardPlacement->nodeId, - quote_literal_cstr(taskPrefix), - quote_literal_cstr(taskPrefix), - quote_literal_cstr(selectTask->queryString), - partitionColumnIndex, - quote_literal_cstr(partitionMethodString), - minValuesString->data, maxValuesString->data, - binaryFormatString); + /* + * We need to know which placement could successfully execute the query, + * so we form a different query per placement, each of which returning + * the node id of the placement. + */ + ListCell *placementCell = NULL; + foreach(placementCell, shardPlacementList) + { + ShardPlacement *shardPlacement = lfirst(placementCell); + StringInfo wrappedQuery = makeStringInfo(); + appendStringInfo(wrappedQuery, + "SELECT %d, partition_index" + ", %s || '_' || partition_index::text " + ", rows_written " + "FROM worker_partition_query_result" + "(%s,%s,%d,%s,%s,%s,%s) WHERE rows_written > 0", + shardPlacement->nodeId, + quote_literal_cstr(taskPrefix), + quote_literal_cstr(taskPrefix), + quote_literal_cstr(selectTask->queryString), + partitionColumnIndex, + quote_literal_cstr(partitionMethodString), + minValuesString->data, maxValuesString->data, + binaryFormatString); + perPlacementQueries = lappend(perPlacementQueries, wrappedQuery->data); + } - selectTask->queryString = wrappedQuery->data; + selectTask->queryString = NULL; + selectTask->perPlacementQueryStrings = perPlacementQueries; } } @@ -244,7 +256,9 @@ ExecutePartitionTaskList(List *taskList, DistTableCacheEntry *targetRelation) TupleDescInitEntry(resultDescriptor, (AttrNumber) 4, "rows_written", INT8OID, -1, 0); - resultStore = ExecuteSelectTasksIntoTupleStore(taskList, resultDescriptor); + bool errorOnAnyFailure = false; + resultStore = ExecuteSelectTasksIntoTupleStore(taskList, resultDescriptor, + errorOnAnyFailure); List *fragmentList = NIL; TupleTableSlot *slot = MakeSingleTupleTableSlotCompat(resultDescriptor, @@ -298,14 +312,15 @@ TupleToDistributedResultFragment(TupleTableSlot *tupleSlot, * store containing its results. */ static Tuplestorestate * -ExecuteSelectTasksIntoTupleStore(List *taskList, TupleDesc resultDescriptor) +ExecuteSelectTasksIntoTupleStore(List *taskList, TupleDesc resultDescriptor, + bool errorOnAnyFailure) { bool hasReturning = true; int targetPoolSize = MaxAdaptiveExecutorPoolSize; bool randomAccess = true; bool interTransactions = false; TransactionProperties xactProperties = { - .errorOnAnyFailure = true, + .errorOnAnyFailure = errorOnAnyFailure, .useRemoteTransactionBlocks = TRANSACTION_BLOCKS_REQUIRED, .requires2PC = false }; diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 4b193fabf..a9045bb50 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -181,7 +181,17 @@ typedef struct Task TaskType taskType; uint64 jobId; uint32 taskId; + + /* + * If queryString != NULL, then we have a single query for all placements. + * Otherwise, length of perPlacementQueryStrings is equal to length of + * taskPlacementList and can assign a different query for each placement. + * We need this flexibility when a query should return node specific values. + * For example, on which node did we succeed storing some result files? 
+ */ char *queryString; + List *perPlacementQueryStrings; + uint64 anchorShardId; /* only applies to compute tasks */ List *taskPlacementList; /* only applies to compute tasks */ List *dependentTaskList; /* only applies to compute tasks */ From bb656691862a3ce02349b9775c51034dc9b7a040 Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Thu, 9 Jan 2020 10:51:37 -0800 Subject: [PATCH 2/3] Failure tests for PartitionTasklistResults --- .../distributed_intermediate_results.out | 15 -- .../expected/failure_distributed_results.out | 132 ++++++++++++++++++ .../regress/expected/multi_test_helpers.out | 12 ++ src/test/regress/failure_schedule | 1 + .../sql/distributed_intermediate_results.sql | 17 --- .../sql/failure_distributed_results.sql | 74 ++++++++++ src/test/regress/sql/multi_test_helpers.sql | 14 ++ 7 files changed, 233 insertions(+), 32 deletions(-) create mode 100644 src/test/regress/expected/failure_distributed_results.out create mode 100644 src/test/regress/sql/failure_distributed_results.sql diff --git a/src/test/regress/expected/distributed_intermediate_results.out b/src/test/regress/expected/distributed_intermediate_results.out index cc7ad8af9..24d3ce1dc 100644 --- a/src/test/regress/expected/distributed_intermediate_results.out +++ b/src/test/regress/expected/distributed_intermediate_results.out @@ -3,21 +3,6 @@ CREATE SCHEMA distributed_intermediate_results; SET search_path TO 'distributed_intermediate_results'; SET citus.next_shard_id TO 4213581; -- --- Helper UDFs --- --- partition_task_list_results tests the internal PartitionTasklistResults function -CREATE OR REPLACE FUNCTION pg_catalog.partition_task_list_results(resultIdPrefix text, - query text, - target_table regclass, - binaryFormat bool DEFAULT true) - RETURNS TABLE(resultId text, - nodeId int, - rowCount bigint, - targetShardId bigint, - targetShardIndex int) - LANGUAGE C STRICT VOLATILE - AS 'citus', $$partition_task_list_results$$; --- -- We don't have extensive tests for partition_task_results, since it will be -- tested by higher level "INSERT/SELECT with repartitioning" tests anyway. -- diff --git a/src/test/regress/expected/failure_distributed_results.out b/src/test/regress/expected/failure_distributed_results.out new file mode 100644 index 000000000..5681a5173 --- /dev/null +++ b/src/test/regress/expected/failure_distributed_results.out @@ -0,0 +1,132 @@ +-- +-- Test failures related to distributed_intermediate_results.c +-- +CREATE SCHEMA failure_distributed_results; +SET search_path TO 'failure_distributed_results'; +-- do not cache any connections +SET citus.max_cached_conns_per_worker TO 0; +-- we don't want to see the prepared transaction numbers in the warnings +SET client_min_messages TO WARNING; +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SET citus.next_shard_id TO 100800; +-- always try the 1st replica before the 2nd replica. +SET citus.task_assignment_policy TO 'first-replica'; +-- +-- Case 1. +-- Source is replicated, target is not. +-- Fail the partition query on the first node. 
+-- +CREATE TABLE source_table(a int); +SET citus.shard_replication_factor TO 2; +SET citus.shard_count TO 5; +SELECT create_distributed_table('source_table', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO source_table SELECT * FROM generate_series(1, 100); +CREATE TABLE target_table(a int); +SET citus.shard_count TO 4; +SET citus.shard_replication_factor TO 1; +SELECT create_distributed_table('target_table', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- without failure results from 100802 should be stored on 9060 +BEGIN; +CREATE TABLE distributed_result_info AS + SELECT resultId, nodeport, rowcount, targetShardId, targetShardIndex + FROM partition_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table') + NATURAL JOIN pg_dist_node; +SELECT * FROM distributed_result_info ORDER BY resultId; + resultid | nodeport | rowcount | targetshardid | targetshardindex +--------------------------------------------------------------------- + test_from_100800_to_0 | 9060 | 22 | 100805 | 0 + test_from_100801_to_0 | 57637 | 2 | 100805 | 0 + test_from_100801_to_1 | 57637 | 15 | 100806 | 1 + test_from_100802_to_1 | 9060 | 10 | 100806 | 1 + test_from_100802_to_2 | 9060 | 5 | 100807 | 2 + test_from_100803_to_2 | 57637 | 18 | 100807 | 2 + test_from_100803_to_3 | 57637 | 4 | 100808 | 3 + test_from_100804_to_3 | 9060 | 24 | 100808 | 3 +(8 rows) + +SELECT fetch_intermediate_results('{test_from_100802_to_1,test_from_100802_to_2}'::text[], 'localhost', :worker_2_port) > 0 AS fetched; + fetched +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*), sum(x) FROM + read_intermediate_results('{test_from_100802_to_1,test_from_100802_to_2}'::text[],'binary') AS res (x int); + count | sum +--------------------------------------------------------------------- + 15 | 863 +(1 row) + +ROLLBACk; +-- with failure, results from 100802 should be retried and succeed on 57637 +SELECT citus.mitmproxy('conn.onQuery(query="worker_partition_query_result.*test_from_100802").kill()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +BEGIN; +CREATE TABLE distributed_result_info AS + SELECT resultId, nodeport, rowcount, targetShardId, targetShardIndex + FROM partition_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table') + NATURAL JOIN pg_dist_node; +WARNING: connection error: localhost:xxxxx +DETAIL: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. 
+SELECT * FROM distributed_result_info ORDER BY resultId; + resultid | nodeport | rowcount | targetshardid | targetshardindex +--------------------------------------------------------------------- + test_from_100800_to_0 | 9060 | 22 | 100805 | 0 + test_from_100801_to_0 | 57637 | 2 | 100805 | 0 + test_from_100801_to_1 | 57637 | 15 | 100806 | 1 + test_from_100802_to_1 | 57637 | 10 | 100806 | 1 + test_from_100802_to_2 | 57637 | 5 | 100807 | 2 + test_from_100803_to_2 | 57637 | 18 | 100807 | 2 + test_from_100803_to_3 | 57637 | 4 | 100808 | 3 + test_from_100804_to_3 | 9060 | 24 | 100808 | 3 +(8 rows) + +-- fetch from worker 2 should fail +SAVEPOINT s1; +SELECT fetch_intermediate_results('{test_from_100802_to_1,test_from_100802_to_2}'::text[], 'localhost', :worker_2_port) > 0 AS fetched; +ERROR: could not open file "base/pgsql_job_cache/xx_x_xxx/test_from_100802_to_1.data": No such file or directory +CONTEXT: while executing command on localhost:xxxxx +ROLLBACK TO SAVEPOINT s1; +-- fetch from worker 1 should succeed +SELECT fetch_intermediate_results('{test_from_100802_to_1,test_from_100802_to_2}'::text[], 'localhost', :worker_1_port) > 0 AS fetched; + fetched +--------------------------------------------------------------------- + t +(1 row) + +-- make sure the results read are same as the previous transaction block +SELECT count(*), sum(x) FROM + read_intermediate_results('{test_from_100802_to_1,test_from_100802_to_2}'::text[],'binary') AS res (x int); + count | sum +--------------------------------------------------------------------- + 15 | 863 +(1 row) + +ROLLBACk; +SET search_path TO 'public'; +RESET citus.shard_replication_factor; +RESET citus.shard_count; +RESET citus.task_assignment_policy; +DROP SCHEMA failure_distributed_results CASCADE; +RESET client_min_messages; diff --git a/src/test/regress/expected/multi_test_helpers.out b/src/test/regress/expected/multi_test_helpers.out index 1b17445c8..c901beb5f 100644 --- a/src/test/regress/expected/multi_test_helpers.out +++ b/src/test/regress/expected/multi_test_helpers.out @@ -135,3 +135,15 @@ BEGIN END LOOP; END; $$ LANGUAGE plpgsql; +-- partition_task_list_results tests the internal PartitionTasklistResults function +CREATE OR REPLACE FUNCTION pg_catalog.partition_task_list_results(resultIdPrefix text, + query text, + target_table regclass, + binaryFormat bool DEFAULT true) + RETURNS TABLE(resultId text, + nodeId int, + rowCount bigint, + targetShardId bigint, + targetShardIndex int) + LANGUAGE C STRICT VOLATILE + AS 'citus', $$partition_task_list_results$$; diff --git a/src/test/regress/failure_schedule b/src/test/regress/failure_schedule index f63d28358..ddaff3a8c 100644 --- a/src/test/regress/failure_schedule +++ b/src/test/regress/failure_schedule @@ -6,6 +6,7 @@ test: failure_setup test: multi_test_helpers test: failure_replicated_partitions test: multi_test_catalog_views +test: failure_distributed_results test: failure_ddl test: failure_truncate test: failure_create_index_concurrently diff --git a/src/test/regress/sql/distributed_intermediate_results.sql b/src/test/regress/sql/distributed_intermediate_results.sql index 4b5cecaa6..2d59abf45 100644 --- a/src/test/regress/sql/distributed_intermediate_results.sql +++ b/src/test/regress/sql/distributed_intermediate_results.sql @@ -4,23 +4,6 @@ SET search_path TO 'distributed_intermediate_results'; SET citus.next_shard_id TO 4213581; --- --- Helper UDFs --- - --- partition_task_list_results tests the internal PartitionTasklistResults function -CREATE OR REPLACE FUNCTION 
pg_catalog.partition_task_list_results(resultIdPrefix text, - query text, - target_table regclass, - binaryFormat bool DEFAULT true) - RETURNS TABLE(resultId text, - nodeId int, - rowCount bigint, - targetShardId bigint, - targetShardIndex int) - LANGUAGE C STRICT VOLATILE - AS 'citus', $$partition_task_list_results$$; - -- -- We don't have extensive tests for partition_task_results, since it will be -- tested by higher level "INSERT/SELECT with repartitioning" tests anyway. diff --git a/src/test/regress/sql/failure_distributed_results.sql b/src/test/regress/sql/failure_distributed_results.sql new file mode 100644 index 000000000..6f6e12805 --- /dev/null +++ b/src/test/regress/sql/failure_distributed_results.sql @@ -0,0 +1,74 @@ +-- +-- Test failures related to distributed_intermediate_results.c +-- + + +CREATE SCHEMA failure_distributed_results; +SET search_path TO 'failure_distributed_results'; + +-- do not cache any connections +SET citus.max_cached_conns_per_worker TO 0; + +-- we don't want to see the prepared transaction numbers in the warnings +SET client_min_messages TO WARNING; + +SELECT citus.mitmproxy('conn.allow()'); + +SET citus.next_shard_id TO 100800; + +-- always try the 1st replica before the 2nd replica. +SET citus.task_assignment_policy TO 'first-replica'; + +-- +-- Case 1. +-- Source is replicated, target is not. +-- Fail the partition query on the first node. +-- +CREATE TABLE source_table(a int); +SET citus.shard_replication_factor TO 2; +SET citus.shard_count TO 5; +SELECT create_distributed_table('source_table', 'a'); +INSERT INTO source_table SELECT * FROM generate_series(1, 100); + +CREATE TABLE target_table(a int); +SET citus.shard_count TO 4; +SET citus.shard_replication_factor TO 1; +SELECT create_distributed_table('target_table', 'a'); + +-- without failure results from 100802 should be stored on 9060 +BEGIN; +CREATE TABLE distributed_result_info AS + SELECT resultId, nodeport, rowcount, targetShardId, targetShardIndex + FROM partition_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table') + NATURAL JOIN pg_dist_node; +SELECT * FROM distributed_result_info ORDER BY resultId; +SELECT fetch_intermediate_results('{test_from_100802_to_1,test_from_100802_to_2}'::text[], 'localhost', :worker_2_port) > 0 AS fetched; +SELECT count(*), sum(x) FROM + read_intermediate_results('{test_from_100802_to_1,test_from_100802_to_2}'::text[],'binary') AS res (x int); +ROLLBACk; + +-- with failure, results from 100802 should be retried and succeed on 57637 +SELECT citus.mitmproxy('conn.onQuery(query="worker_partition_query_result.*test_from_100802").kill()'); +BEGIN; +CREATE TABLE distributed_result_info AS + SELECT resultId, nodeport, rowcount, targetShardId, targetShardIndex + FROM partition_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table') + NATURAL JOIN pg_dist_node; +SELECT * FROM distributed_result_info ORDER BY resultId; +-- fetch from worker 2 should fail +SAVEPOINT s1; +SELECT fetch_intermediate_results('{test_from_100802_to_1,test_from_100802_to_2}'::text[], 'localhost', :worker_2_port) > 0 AS fetched; +ROLLBACK TO SAVEPOINT s1; +-- fetch from worker 1 should succeed +SELECT fetch_intermediate_results('{test_from_100802_to_1,test_from_100802_to_2}'::text[], 'localhost', :worker_1_port) > 0 AS fetched; +-- make sure the results read are same as the previous transaction block +SELECT count(*), sum(x) FROM + read_intermediate_results('{test_from_100802_to_1,test_from_100802_to_2}'::text[],'binary') AS res (x int); +ROLLBACk; + 
+SET search_path TO 'public'; +RESET citus.shard_replication_factor; +RESET citus.shard_count; +RESET citus.task_assignment_policy; +DROP SCHEMA failure_distributed_results CASCADE; +RESET client_min_messages; diff --git a/src/test/regress/sql/multi_test_helpers.sql b/src/test/regress/sql/multi_test_helpers.sql index 3383c6714..93823ea7a 100644 --- a/src/test/regress/sql/multi_test_helpers.sql +++ b/src/test/regress/sql/multi_test_helpers.sql @@ -141,3 +141,17 @@ BEGIN END LOOP; END; $$ LANGUAGE plpgsql; + + +-- partition_task_list_results tests the internal PartitionTasklistResults function +CREATE OR REPLACE FUNCTION pg_catalog.partition_task_list_results(resultIdPrefix text, + query text, + target_table regclass, + binaryFormat bool DEFAULT true) + RETURNS TABLE(resultId text, + nodeId int, + rowCount bigint, + targetShardId bigint, + targetShardIndex int) + LANGUAGE C STRICT VOLATILE + AS 'citus', $$partition_task_list_results$$; From e1e383cb599b12e9ff2e3cbeb1e208b16eaa4f9c Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Thu, 9 Jan 2020 11:09:11 -0800 Subject: [PATCH 3/3] Don't override xact id assigned by coordinator on workers. We might need to send commands from workers to other workers. In these cases we shouldn't override the xact id assigned by coordinator, or otherwise we won't read the consistent set of result files accross the nodes. --- .../transaction/transaction_management.c | 19 +++++++++++++++++-- .../distributed/transaction_management.h | 7 +++++++ 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 0d898236a..e86e5ac55 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -117,7 +117,8 @@ static bool MaybeExecutingUDF(void); void UseCoordinatedTransaction(void) { - if (CurrentCoordinatedTransactionState == COORD_TRANS_STARTED) + if (CurrentCoordinatedTransactionState == COORD_TRANS_STARTED || + CurrentCoordinatedTransactionState == COORD_TRANS_STARTED_ON_WORKER) { return; } @@ -130,7 +131,21 @@ UseCoordinatedTransaction(void) CurrentCoordinatedTransactionState = COORD_TRANS_STARTED; - AssignDistributedTransactionId(); + /* + * This might be part of bigger distributed transaction originating from + * another node, in which case transaction id has already been assigned + * by a assign_distributed_transaction_id() call. + */ + DistributedTransactionId *transactionId = GetCurrentDistributedTransactionId(); + if (transactionId->transactionNumber == 0) + { + CurrentCoordinatedTransactionState = COORD_TRANS_STARTED_ON_WORKER; + AssignDistributedTransactionId(); + } + else + { + CurrentCoordinatedTransactionState = COORD_TRANS_STARTED; + } } diff --git a/src/include/distributed/transaction_management.h b/src/include/distributed/transaction_management.h index 924d836be..f1e9ece90 100644 --- a/src/include/distributed/transaction_management.h +++ b/src/include/distributed/transaction_management.h @@ -35,6 +35,13 @@ typedef enum CoordinatedTransactionState /* no coordinated transaction in progress, but connections established */ COORD_TRANS_IDLE, + /* + * Coordinated transaction was initiated by coordinator, but the worker also + * needs to start a coordinated transaction to be able to send commands to + * other workers. + */ + COORD_TRANS_STARTED_ON_WORKER, + /* coordinated transaction in progress */ COORD_TRANS_STARTED,
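
For illustration, each element of Task::perPlacementQueryStrings built by WrapTasksForPartitioning in the first patch has roughly the following shape. Only the structure follows from the format string in that patch; the node id (2), result prefix, inner shard query, and the hash bound arrays below are made-up values for a 4-shard hash-distributed target like the one in the tests. The leading constant is the placement's node id, so whichever placement ends up executing the query reports its own node in the first column, which is what lets the coordinator know where to fetch each fragment from.

SELECT 2, partition_index,
       'test_from_100802' || '_' || partition_index::text,
       rows_written
FROM worker_partition_query_result(
        'test_from_100802',                                -- result prefix for this source shard
        'SELECT a FROM source_table_100802 source_table',  -- original task query (illustrative)
        0,                                                 -- partition column index
        'hash',                                            -- partition method of the target relation
        '{-2147483648,-1073741824,0,1073741824}'::text[],  -- min hash values of the target shards (illustrative rendering)
        '{-1073741825,-1,1073741823,2147483647}'::text[],  -- max hash values of the target shards (illustrative rendering)
        true)                                              -- binary copy format
WHERE rows_written > 0;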
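
A minimal sketch of the worker-side scenario the third patch addresses, with an illustrative transaction id, result name, and port, and assuming the fetch is what makes the worker open a coordinated transaction of its own toward the other workers:

-- executed on worker A, driven by the coordinator
BEGIN;
-- the coordinator has already assigned its distributed transaction id here
SELECT assign_distributed_transaction_id(1, 1234, now());
-- fetching a fragment from worker B makes worker A start a coordinated
-- transaction as well; after this patch it keeps transaction number 1234
-- (COORD_TRANS_STARTED_ON_WORKER) instead of assigning a new one, so worker B
-- looks up the result file under the transaction the coordinator started
SELECT fetch_intermediate_results('{test_from_100802_to_1}'::text[],
                                  'localhost', 57638);
COMMIT;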