diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c
index dfd4069ea..c66c8c541 100644
--- a/src/backend/distributed/executor/adaptive_executor.c
+++ b/src/backend/distributed/executor/adaptive_executor.c
@@ -3127,9 +3127,21 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution,
 	Task *task = shardCommandExecution->task;
 	ShardPlacement *taskPlacement = placementExecution->shardPlacement;
 	List *placementAccessList = PlacementAccessListForTask(task, taskPlacement);
-	char *queryString = task->queryString;
 	int querySent = 0;
 
+	char *queryString = NULL;
+	if (task->queryString != NULL)
+	{
+		queryString = task->queryString;
+	}
+	else
+	{
+		Assert(list_length(task->taskPlacementList) == list_length(
+				   task->perPlacementQueryStrings));
+		queryString = list_nth(task->perPlacementQueryStrings,
+							   placementExecution->placementExecutionIndex);
+	}
+
 	if (execution->transactionProperties->useRemoteTransactionBlocks !=
 		TRANSACTION_BLOCKS_DISALLOWED)
 	{
diff --git a/src/backend/distributed/executor/distributed_intermediate_results.c b/src/backend/distributed/executor/distributed_intermediate_results.c
index f68638ff7..cd1f80707 100644
--- a/src/backend/distributed/executor/distributed_intermediate_results.c
+++ b/src/backend/distributed/executor/distributed_intermediate_results.c
@@ -43,8 +43,9 @@ static void ShardMinMaxValueArrays(ShardInterval **shardIntervalArray, int shard
 static char * SourceShardPrefix(char *resultPrefix, uint64 shardId);
 static DistributedResultFragment * TupleToDistributedResultFragment(
 	TupleTableSlot *tupleSlot, DistTableCacheEntry *targetRelation);
-static Tuplestorestate * ExecuteSelectTasksIntoTupleStore(List *taskList, TupleDesc
-														  resultDescriptor);
+static Tuplestorestate * ExecuteSelectTasksIntoTupleStore(List *taskList,
+														  TupleDesc resultDescriptor,
+														  bool errorOnAnyFailure);
 
 
 /*
@@ -111,31 +112,42 @@ WrapTasksForPartitioning(char *resultIdPrefix, List *selectTaskList,
 	foreach(taskCell, selectTaskList)
 	{
 		Task *selectTask = (Task *) lfirst(taskCell);
-		StringInfo wrappedQuery = makeStringInfo();
 		List *shardPlacementList = selectTask->taskPlacementList;
-
-		ShardPlacement *shardPlacement = linitial(shardPlacementList);
 		char *taskPrefix = SourceShardPrefix(resultIdPrefix, selectTask->anchorShardId);
 		char *partitionMethodString = targetRelation->partitionMethod == 'h' ?
 									  "hash" : "range";
 		const char *binaryFormatString = binaryFormat ? "true" : "false";
+		List *perPlacementQueries = NIL;
 
-		appendStringInfo(wrappedQuery,
-						 "SELECT %d, partition_index"
-						 ", %s || '_' || partition_index::text "
-						 ", rows_written "
-						 "FROM worker_partition_query_result"
-						 "(%s,%s,%d,%s,%s,%s,%s) WHERE rows_written > 0",
-						 shardPlacement->nodeId,
-						 quote_literal_cstr(taskPrefix),
-						 quote_literal_cstr(taskPrefix),
-						 quote_literal_cstr(selectTask->queryString),
-						 partitionColumnIndex,
-						 quote_literal_cstr(partitionMethodString),
-						 minValuesString->data, maxValuesString->data,
-						 binaryFormatString);
+		/*
+		 * We need to know which placement could successfully execute the query,
+		 * so we form a different query per placement, each of which returns the
+		 * node id of the placement.
+		 */
+		ListCell *placementCell = NULL;
+		foreach(placementCell, shardPlacementList)
+		{
+			ShardPlacement *shardPlacement = lfirst(placementCell);
+			StringInfo wrappedQuery = makeStringInfo();
+			appendStringInfo(wrappedQuery,
+							 "SELECT %d, partition_index"
+							 ", %s || '_' || partition_index::text "
+							 ", rows_written "
+							 "FROM worker_partition_query_result"
+							 "(%s,%s,%d,%s,%s,%s,%s) WHERE rows_written > 0",
+							 shardPlacement->nodeId,
+							 quote_literal_cstr(taskPrefix),
+							 quote_literal_cstr(taskPrefix),
+							 quote_literal_cstr(selectTask->queryString),
+							 partitionColumnIndex,
+							 quote_literal_cstr(partitionMethodString),
+							 minValuesString->data, maxValuesString->data,
+							 binaryFormatString);
+			perPlacementQueries = lappend(perPlacementQueries, wrappedQuery->data);
+		}
 
-		selectTask->queryString = wrappedQuery->data;
+		selectTask->queryString = NULL;
+		selectTask->perPlacementQueryStrings = perPlacementQueries;
 	}
 }
 
@@ -244,7 +256,9 @@ ExecutePartitionTaskList(List *taskList, DistTableCacheEntry *targetRelation)
 	TupleDescInitEntry(resultDescriptor, (AttrNumber) 4, "rows_written",
 					   INT8OID, -1, 0);
 
-	resultStore = ExecuteSelectTasksIntoTupleStore(taskList, resultDescriptor);
+	bool errorOnAnyFailure = false;
+	resultStore = ExecuteSelectTasksIntoTupleStore(taskList, resultDescriptor,
+												   errorOnAnyFailure);
 
 	List *fragmentList = NIL;
 	TupleTableSlot *slot = MakeSingleTupleTableSlotCompat(resultDescriptor,
@@ -298,14 +312,15 @@ TupleToDistributedResultFragment(TupleTableSlot *tupleSlot,
  * store containing its results.
  */
 static Tuplestorestate *
-ExecuteSelectTasksIntoTupleStore(List *taskList, TupleDesc resultDescriptor)
+ExecuteSelectTasksIntoTupleStore(List *taskList, TupleDesc resultDescriptor,
+								 bool errorOnAnyFailure)
 {
 	bool hasReturning = true;
 	int targetPoolSize = MaxAdaptiveExecutorPoolSize;
 	bool randomAccess = true;
 	bool interTransactions = false;
 	TransactionProperties xactProperties = {
-		.errorOnAnyFailure = true,
+		.errorOnAnyFailure = errorOnAnyFailure,
 		.useRemoteTransactionBlocks = TRANSACTION_BLOCKS_REQUIRED,
 		.requires2PC = false
 	};
diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c
index 0d898236a..e86e5ac55 100644
--- a/src/backend/distributed/transaction/transaction_management.c
+++ b/src/backend/distributed/transaction/transaction_management.c
@@ -117,7 +117,8 @@ static bool MaybeExecutingUDF(void);
 void
 UseCoordinatedTransaction(void)
 {
-	if (CurrentCoordinatedTransactionState == COORD_TRANS_STARTED)
+	if (CurrentCoordinatedTransactionState == COORD_TRANS_STARTED ||
+		CurrentCoordinatedTransactionState == COORD_TRANS_STARTED_ON_WORKER)
 	{
 		return;
 	}
@@ -130,7 +131,21 @@ UseCoordinatedTransaction(void)
 
 	CurrentCoordinatedTransactionState = COORD_TRANS_STARTED;
 
-	AssignDistributedTransactionId();
+	/*
+	 * This might be part of a bigger distributed transaction originating from
+	 * another node, in which case the transaction id has already been assigned
+	 * by an assign_distributed_transaction_id() call.
+	 */
+	DistributedTransactionId *transactionId = GetCurrentDistributedTransactionId();
+	if (transactionId->transactionNumber == 0)
+	{
+		CurrentCoordinatedTransactionState = COORD_TRANS_STARTED_ON_WORKER;
+		AssignDistributedTransactionId();
+	}
+	else
+	{
+		CurrentCoordinatedTransactionState = COORD_TRANS_STARTED;
+	}
 }
 
 
diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h
index 4b193fabf..a9045bb50 100644
--- a/src/include/distributed/multi_physical_planner.h
+++ b/src/include/distributed/multi_physical_planner.h
@@ -181,7 +181,17 @@ typedef struct Task
 	TaskType taskType;
 	uint64 jobId;
 	uint32 taskId;
+
+	/*
+	 * If queryString != NULL, then we have a single query for all placements.
+	 * Otherwise, the length of perPlacementQueryStrings equals the length of
+	 * taskPlacementList, and we can assign a different query to each placement.
+	 * We need this flexibility when a query should return node-specific values.
+	 * For example, on which node did we succeed in storing some result files?
+	 */
 	char *queryString;
+	List *perPlacementQueryStrings;
+
 	uint64 anchorShardId;       /* only applies to compute tasks */
 	List *taskPlacementList;    /* only applies to compute tasks */
 	List *dependentTaskList;    /* only applies to compute tasks */
diff --git a/src/include/distributed/transaction_management.h b/src/include/distributed/transaction_management.h
index 924d836be..f1e9ece90 100644
--- a/src/include/distributed/transaction_management.h
+++ b/src/include/distributed/transaction_management.h
@@ -35,6 +35,13 @@ typedef enum CoordinatedTransactionState
 	/* no coordinated transaction in progress, but connections established */
 	COORD_TRANS_IDLE,
 
+	/*
+	 * Coordinated transaction was initiated by the coordinator, but the worker
+	 * also needs to start a coordinated transaction to be able to send commands
+	 * to other workers.
+	 */
+	COORD_TRANS_STARTED_ON_WORKER,
+
 	/* coordinated transaction in progress */
 	COORD_TRANS_STARTED,
 
diff --git a/src/test/regress/expected/distributed_intermediate_results.out b/src/test/regress/expected/distributed_intermediate_results.out
index cc7ad8af9..24d3ce1dc 100644
--- a/src/test/regress/expected/distributed_intermediate_results.out
+++ b/src/test/regress/expected/distributed_intermediate_results.out
@@ -3,21 +3,6 @@ CREATE SCHEMA distributed_intermediate_results;
 SET search_path TO 'distributed_intermediate_results';
 SET citus.next_shard_id TO 4213581;
 --
--- Helper UDFs
---
--- partition_task_list_results tests the internal PartitionTasklistResults function
-CREATE OR REPLACE FUNCTION pg_catalog.partition_task_list_results(resultIdPrefix text,
-                                                                  query text,
-                                                                  target_table regclass,
-                                                                  binaryFormat bool DEFAULT true)
-    RETURNS TABLE(resultId text,
-                  nodeId int,
-                  rowCount bigint,
-                  targetShardId bigint,
-                  targetShardIndex int)
-    LANGUAGE C STRICT VOLATILE
-    AS 'citus', $$partition_task_list_results$$;
---
 -- We don't have extensive tests for partition_task_results, since it will be
 -- tested by higher level "INSERT/SELECT with repartitioning" tests anyway.
 --
diff --git a/src/test/regress/expected/failure_distributed_results.out b/src/test/regress/expected/failure_distributed_results.out
new file mode 100644
index 000000000..5681a5173
--- /dev/null
+++ b/src/test/regress/expected/failure_distributed_results.out
@@ -0,0 +1,132 @@
+--
+-- Test failures related to distributed_intermediate_results.c
+--
+CREATE SCHEMA failure_distributed_results;
+SET search_path TO 'failure_distributed_results';
+-- do not cache any connections
+SET citus.max_cached_conns_per_worker TO 0;
+-- we don't want to see the prepared transaction numbers in the warnings
+SET client_min_messages TO WARNING;
+SELECT citus.mitmproxy('conn.allow()');
+ mitmproxy
+---------------------------------------------------------------------
+
+(1 row)
+
+SET citus.next_shard_id TO 100800;
+-- always try the 1st replica before the 2nd replica.
+SET citus.task_assignment_policy TO 'first-replica';
+--
+-- Case 1.
+-- Source is replicated, target is not.
+-- Fail the partition query on the first node.
+--
+CREATE TABLE source_table(a int);
+SET citus.shard_replication_factor TO 2;
+SET citus.shard_count TO 5;
+SELECT create_distributed_table('source_table', 'a');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+INSERT INTO source_table SELECT * FROM generate_series(1, 100);
+CREATE TABLE target_table(a int);
+SET citus.shard_count TO 4;
+SET citus.shard_replication_factor TO 1;
+SELECT create_distributed_table('target_table', 'a');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+-- without failure, results from 100802 should be stored on 9060
+BEGIN;
+CREATE TABLE distributed_result_info AS
+  SELECT resultId, nodeport, rowcount, targetShardId, targetShardIndex
+  FROM partition_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table')
+          NATURAL JOIN pg_dist_node;
+SELECT * FROM distributed_result_info ORDER BY resultId;
+       resultid        | nodeport | rowcount | targetshardid | targetshardindex
+---------------------------------------------------------------------
+ test_from_100800_to_0 |     9060 |       22 |        100805 |                0
+ test_from_100801_to_0 |    57637 |        2 |        100805 |                0
+ test_from_100801_to_1 |    57637 |       15 |        100806 |                1
+ test_from_100802_to_1 |     9060 |       10 |        100806 |                1
+ test_from_100802_to_2 |     9060 |        5 |        100807 |                2
+ test_from_100803_to_2 |    57637 |       18 |        100807 |                2
+ test_from_100803_to_3 |    57637 |        4 |        100808 |                3
+ test_from_100804_to_3 |     9060 |       24 |        100808 |                3
+(8 rows)
+
+SELECT fetch_intermediate_results('{test_from_100802_to_1,test_from_100802_to_2}'::text[], 'localhost', :worker_2_port) > 0 AS fetched;
+ fetched
+---------------------------------------------------------------------
+ t
+(1 row)
+
+SELECT count(*), sum(x) FROM
+  read_intermediate_results('{test_from_100802_to_1,test_from_100802_to_2}'::text[],'binary') AS res (x int);
+ count | sum
+---------------------------------------------------------------------
+    15 | 863
+(1 row)
+
+ROLLBACK;
+-- with failure, results from 100802 should be retried and succeed on 57637
+SELECT citus.mitmproxy('conn.onQuery(query="worker_partition_query_result.*test_from_100802").kill()');
+ mitmproxy
+---------------------------------------------------------------------
+
+(1 row)
+
+BEGIN;
+CREATE TABLE distributed_result_info AS
+  SELECT resultId, nodeport, rowcount, targetShardId, targetShardIndex
+  FROM partition_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table')
+          NATURAL JOIN pg_dist_node;
+WARNING:  connection error: localhost:xxxxx
+DETAIL:  server closed the connection unexpectedly
+	This probably means the server terminated abnormally
+	before or while processing the request.
+SELECT * FROM distributed_result_info ORDER BY resultId;
+       resultid        | nodeport | rowcount | targetshardid | targetshardindex
+---------------------------------------------------------------------
+ test_from_100800_to_0 |     9060 |       22 |        100805 |                0
+ test_from_100801_to_0 |    57637 |        2 |        100805 |                0
+ test_from_100801_to_1 |    57637 |       15 |        100806 |                1
+ test_from_100802_to_1 |    57637 |       10 |        100806 |                1
+ test_from_100802_to_2 |    57637 |        5 |        100807 |                2
+ test_from_100803_to_2 |    57637 |       18 |        100807 |                2
+ test_from_100803_to_3 |    57637 |        4 |        100808 |                3
+ test_from_100804_to_3 |     9060 |       24 |        100808 |                3
+(8 rows)
+
+-- fetch from worker 2 should fail
+SAVEPOINT s1;
+SELECT fetch_intermediate_results('{test_from_100802_to_1,test_from_100802_to_2}'::text[], 'localhost', :worker_2_port) > 0 AS fetched;
+ERROR:  could not open file "base/pgsql_job_cache/xx_x_xxx/test_from_100802_to_1.data": No such file or directory
+CONTEXT:  while executing command on localhost:xxxxx
+ROLLBACK TO SAVEPOINT s1;
+-- fetch from worker 1 should succeed
+SELECT fetch_intermediate_results('{test_from_100802_to_1,test_from_100802_to_2}'::text[], 'localhost', :worker_1_port) > 0 AS fetched;
+ fetched
+---------------------------------------------------------------------
+ t
+(1 row)
+
+-- make sure the results read are the same as in the previous transaction block
+SELECT count(*), sum(x) FROM
+  read_intermediate_results('{test_from_100802_to_1,test_from_100802_to_2}'::text[],'binary') AS res (x int);
+ count | sum
+---------------------------------------------------------------------
+    15 | 863
+(1 row)
+
+ROLLBACK;
+SET search_path TO 'public';
+RESET citus.shard_replication_factor;
+RESET citus.shard_count;
+RESET citus.task_assignment_policy;
+DROP SCHEMA failure_distributed_results CASCADE;
+RESET client_min_messages;
diff --git a/src/test/regress/expected/multi_test_helpers.out b/src/test/regress/expected/multi_test_helpers.out
index 1b17445c8..c901beb5f 100644
--- a/src/test/regress/expected/multi_test_helpers.out
+++ b/src/test/regress/expected/multi_test_helpers.out
@@ -135,3 +135,15 @@ BEGIN
     END LOOP;
 END;
 $$ LANGUAGE plpgsql;
+-- partition_task_list_results tests the internal PartitionTasklistResults function
+CREATE OR REPLACE FUNCTION pg_catalog.partition_task_list_results(resultIdPrefix text,
+                                                                  query text,
+                                                                  target_table regclass,
+                                                                  binaryFormat bool DEFAULT true)
+    RETURNS TABLE(resultId text,
+                  nodeId int,
+                  rowCount bigint,
+                  targetShardId bigint,
+                  targetShardIndex int)
+    LANGUAGE C STRICT VOLATILE
+    AS 'citus', $$partition_task_list_results$$;
diff --git a/src/test/regress/failure_schedule b/src/test/regress/failure_schedule
index f63d28358..ddaff3a8c 100644
--- a/src/test/regress/failure_schedule
+++ b/src/test/regress/failure_schedule
@@ -6,6 +6,7 @@ test: failure_setup
 test: multi_test_helpers
 test: failure_replicated_partitions
 test: multi_test_catalog_views
+test: failure_distributed_results
 test: failure_ddl
 test: failure_truncate
 test: failure_create_index_concurrently
diff --git a/src/test/regress/sql/distributed_intermediate_results.sql b/src/test/regress/sql/distributed_intermediate_results.sql
index 4b5cecaa6..2d59abf45 100644
--- a/src/test/regress/sql/distributed_intermediate_results.sql
+++ b/src/test/regress/sql/distributed_intermediate_results.sql
@@ -4,23 +4,6 @@ SET search_path TO 'distributed_intermediate_results';
 
 SET citus.next_shard_id TO 4213581;
 
---
--- Helper UDFs
---
-
--- partition_task_list_results tests the internal PartitionTasklistResults function
-CREATE OR REPLACE FUNCTION pg_catalog.partition_task_list_results(resultIdPrefix text,
-                                                                  query text,
-                                                                  target_table regclass,
-                                                                  binaryFormat bool DEFAULT true)
-    RETURNS TABLE(resultId text,
-                  nodeId int,
-                  rowCount bigint,
-                  targetShardId bigint,
-                  targetShardIndex int)
-    LANGUAGE C STRICT VOLATILE
-    AS 'citus', $$partition_task_list_results$$;
-
 --
 -- We don't have extensive tests for partition_task_results, since it will be
 -- tested by higher level "INSERT/SELECT with repartitioning" tests anyway.
diff --git a/src/test/regress/sql/failure_distributed_results.sql b/src/test/regress/sql/failure_distributed_results.sql
new file mode 100644
index 000000000..6f6e12805
--- /dev/null
+++ b/src/test/regress/sql/failure_distributed_results.sql
@@ -0,0 +1,74 @@
+--
+-- Test failures related to distributed_intermediate_results.c
+--
+
+
+CREATE SCHEMA failure_distributed_results;
+SET search_path TO 'failure_distributed_results';
+
+-- do not cache any connections
+SET citus.max_cached_conns_per_worker TO 0;
+
+-- we don't want to see the prepared transaction numbers in the warnings
+SET client_min_messages TO WARNING;
+
+SELECT citus.mitmproxy('conn.allow()');
+
+SET citus.next_shard_id TO 100800;
+
+-- always try the 1st replica before the 2nd replica.
+SET citus.task_assignment_policy TO 'first-replica';
+
+--
+-- Case 1.
+-- Source is replicated, target is not.
+-- Fail the partition query on the first node.
+--
+CREATE TABLE source_table(a int);
+SET citus.shard_replication_factor TO 2;
+SET citus.shard_count TO 5;
+SELECT create_distributed_table('source_table', 'a');
+INSERT INTO source_table SELECT * FROM generate_series(1, 100);
+
+CREATE TABLE target_table(a int);
+SET citus.shard_count TO 4;
+SET citus.shard_replication_factor TO 1;
+SELECT create_distributed_table('target_table', 'a');
+
+-- without failure, results from 100802 should be stored on 9060
+BEGIN;
+CREATE TABLE distributed_result_info AS
+  SELECT resultId, nodeport, rowcount, targetShardId, targetShardIndex
+  FROM partition_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table')
+          NATURAL JOIN pg_dist_node;
+SELECT * FROM distributed_result_info ORDER BY resultId;
+SELECT fetch_intermediate_results('{test_from_100802_to_1,test_from_100802_to_2}'::text[], 'localhost', :worker_2_port) > 0 AS fetched;
+SELECT count(*), sum(x) FROM
+  read_intermediate_results('{test_from_100802_to_1,test_from_100802_to_2}'::text[],'binary') AS res (x int);
+ROLLBACK;
+
+-- with failure, results from 100802 should be retried and succeed on 57637
+SELECT citus.mitmproxy('conn.onQuery(query="worker_partition_query_result.*test_from_100802").kill()');
+BEGIN;
+CREATE TABLE distributed_result_info AS
+  SELECT resultId, nodeport, rowcount, targetShardId, targetShardIndex
+  FROM partition_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table')
+          NATURAL JOIN pg_dist_node;
+SELECT * FROM distributed_result_info ORDER BY resultId;
+-- fetch from worker 2 should fail
+SAVEPOINT s1;
+SELECT fetch_intermediate_results('{test_from_100802_to_1,test_from_100802_to_2}'::text[], 'localhost', :worker_2_port) > 0 AS fetched;
+ROLLBACK TO SAVEPOINT s1;
+-- fetch from worker 1 should succeed
+SELECT fetch_intermediate_results('{test_from_100802_to_1,test_from_100802_to_2}'::text[], 'localhost', :worker_1_port) > 0 AS fetched;
+-- make sure the results read are the same as in the previous transaction block
+SELECT count(*), sum(x) FROM
+  read_intermediate_results('{test_from_100802_to_1,test_from_100802_to_2}'::text[],'binary') AS res (x int);
+ROLLBACK;
+
+SET search_path TO 'public';
+RESET citus.shard_replication_factor;
+RESET citus.shard_count;
+RESET citus.task_assignment_policy;
+DROP SCHEMA failure_distributed_results CASCADE;
+RESET client_min_messages;
diff --git a/src/test/regress/sql/multi_test_helpers.sql b/src/test/regress/sql/multi_test_helpers.sql
index 3383c6714..93823ea7a 100644
--- a/src/test/regress/sql/multi_test_helpers.sql
+++ b/src/test/regress/sql/multi_test_helpers.sql
@@ -141,3 +141,17 @@ BEGIN
     END LOOP;
 END;
 $$ LANGUAGE plpgsql;
+
+
+-- partition_task_list_results tests the internal PartitionTasklistResults function
+CREATE OR REPLACE FUNCTION pg_catalog.partition_task_list_results(resultIdPrefix text,
+                                                                  query text,
+                                                                  target_table regclass,
+                                                                  binaryFormat bool DEFAULT true)
+    RETURNS TABLE(resultId text,
+                  nodeId int,
+                  rowCount bigint,
+                  targetShardId bigint,
+                  targetShardIndex int)
+    LANGUAGE C STRICT VOLATILE
+    AS 'citus', $$partition_task_list_results$$;