mirror of https://github.com/citusdata/citus.git

Merge pull request #3363 from citusdata/redistribute_failure
PartitionTasklistResults: Use different queries per placement (pull/3371/head)
commit e185b54cbc

@@ -3127,9 +3127,21 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution,
     Task *task = shardCommandExecution->task;
     ShardPlacement *taskPlacement = placementExecution->shardPlacement;
     List *placementAccessList = PlacementAccessListForTask(task, taskPlacement);
-    char *queryString = task->queryString;
     int querySent = 0;
 
+    char *queryString = NULL;
+    if (task->queryString != NULL)
+    {
+        queryString = task->queryString;
+    }
+    else
+    {
+        Assert(list_length(task->taskPlacementList) == list_length(
+                   task->perPlacementQueryStrings));
+        queryString = list_nth(task->perPlacementQueryStrings,
+                               placementExecution->placementExecutionIndex);
+    }
+
     if (execution->transactionProperties->useRemoteTransactionBlocks !=
         TRANSACTION_BLOCKS_DISALLOWED)
     {

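The hunk above is the executor-side consumer of the new per-placement query strings. As a reader aid, the lookup can be restated as a standalone helper; this is a hypothetical sketch (TaskQueryStringForPlacement is not a function introduced by this commit) and it assumes the usual PostgreSQL list API (list_length, list_nth) already used in the surrounding code.

/* hypothetical helper: restates the per-placement query lookup shown above */
static char *
TaskQueryStringForPlacement(Task *task, int placementIndex)
{
    if (task->queryString != NULL)
    {
        /* a single query string is shared by every placement */
        return task->queryString;
    }

    /* otherwise: one query string per placement, in placement-list order */
    Assert(list_length(task->taskPlacementList) ==
           list_length(task->perPlacementQueryStrings));
    return (char *) list_nth(task->perPlacementQueryStrings, placementIndex);
}
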
@@ -43,8 +43,9 @@ static void ShardMinMaxValueArrays(ShardInterval **shardIntervalArray, int shard
 static char * SourceShardPrefix(char *resultPrefix, uint64 shardId);
 static DistributedResultFragment * TupleToDistributedResultFragment(
     TupleTableSlot *tupleSlot, DistTableCacheEntry *targetRelation);
-static Tuplestorestate * ExecuteSelectTasksIntoTupleStore(List *taskList, TupleDesc
-                                                          resultDescriptor);
+static Tuplestorestate * ExecuteSelectTasksIntoTupleStore(List *taskList,
+                                                          TupleDesc resultDescriptor,
+                                                          bool errorOnAnyFailure);
 
 
 /*

@@ -111,31 +112,42 @@ WrapTasksForPartitioning(char *resultIdPrefix, List *selectTaskList,
     foreach(taskCell, selectTaskList)
     {
         Task *selectTask = (Task *) lfirst(taskCell);
-        StringInfo wrappedQuery = makeStringInfo();
         List *shardPlacementList = selectTask->taskPlacementList;
 
-        ShardPlacement *shardPlacement = linitial(shardPlacementList);
         char *taskPrefix = SourceShardPrefix(resultIdPrefix, selectTask->anchorShardId);
         char *partitionMethodString = targetRelation->partitionMethod == 'h' ?
                                       "hash" : "range";
         const char *binaryFormatString = binaryFormat ? "true" : "false";
+        List *perPlacementQueries = NIL;
 
-        appendStringInfo(wrappedQuery,
-                         "SELECT %d, partition_index"
-                         ", %s || '_' || partition_index::text "
-                         ", rows_written "
-                         "FROM worker_partition_query_result"
-                         "(%s,%s,%d,%s,%s,%s,%s) WHERE rows_written > 0",
-                         shardPlacement->nodeId,
-                         quote_literal_cstr(taskPrefix),
-                         quote_literal_cstr(taskPrefix),
-                         quote_literal_cstr(selectTask->queryString),
-                         partitionColumnIndex,
-                         quote_literal_cstr(partitionMethodString),
-                         minValuesString->data, maxValuesString->data,
-                         binaryFormatString);
+        /*
+         * We need to know which placement could successfully execute the query,
+         * so we form a different query per placement, each of which returning
+         * the node id of the placement.
+         */
+        ListCell *placementCell = NULL;
+        foreach(placementCell, shardPlacementList)
+        {
+            ShardPlacement *shardPlacement = lfirst(placementCell);
+            StringInfo wrappedQuery = makeStringInfo();
+            appendStringInfo(wrappedQuery,
+                             "SELECT %d, partition_index"
+                             ", %s || '_' || partition_index::text "
+                             ", rows_written "
+                             "FROM worker_partition_query_result"
+                             "(%s,%s,%d,%s,%s,%s,%s) WHERE rows_written > 0",
+                             shardPlacement->nodeId,
+                             quote_literal_cstr(taskPrefix),
+                             quote_literal_cstr(taskPrefix),
+                             quote_literal_cstr(selectTask->queryString),
+                             partitionColumnIndex,
+                             quote_literal_cstr(partitionMethodString),
+                             minValuesString->data, maxValuesString->data,
+                             binaryFormatString);
+            perPlacementQueries = lappend(perPlacementQueries, wrappedQuery->data);
+        }
 
-        selectTask->queryString = wrappedQuery->data;
+        selectTask->queryString = NULL;
+        selectTask->perPlacementQueryStrings = perPlacementQueries;
     }
 }

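To make the loop above concrete: each placement receives the same wrapped query except for the leading node id. With hypothetical argument values (node id 2, result prefix 'test_from_100800', partition column index 0, hash partitioning, binary format; the min/max value arrays are elided), the generated text looks roughly like the comment below.

/*
 * Illustrative expansion of the appendStringInfo() format string above,
 * using made-up argument values:
 *
 *   SELECT 2, partition_index,
 *          'test_from_100800' || '_' || partition_index::text,
 *          rows_written
 *   FROM worker_partition_query_result('test_from_100800',
 *          '<wrapped shard query>', 0, 'hash',
 *          '<min values>', '<max values>', true)
 *   WHERE rows_written > 0
 *
 * Only the node id in the SELECT list differs between placements, which is
 * how the caller learns on which node each fragment was actually written.
 */
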
@@ -244,7 +256,9 @@ ExecutePartitionTaskList(List *taskList, DistTableCacheEntry *targetRelation)
     TupleDescInitEntry(resultDescriptor, (AttrNumber) 4, "rows_written",
                        INT8OID, -1, 0);
 
-    resultStore = ExecuteSelectTasksIntoTupleStore(taskList, resultDescriptor);
+    bool errorOnAnyFailure = false;
+    resultStore = ExecuteSelectTasksIntoTupleStore(taskList, resultDescriptor,
+                                                   errorOnAnyFailure);
 
     List *fragmentList = NIL;
     TupleTableSlot *slot = MakeSingleTupleTableSlotCompat(resultDescriptor,

@@ -298,14 +312,15 @@ TupleToDistributedResultFragment(TupleTableSlot *tupleSlot,
  * store containing its results.
  */
 static Tuplestorestate *
-ExecuteSelectTasksIntoTupleStore(List *taskList, TupleDesc resultDescriptor)
+ExecuteSelectTasksIntoTupleStore(List *taskList, TupleDesc resultDescriptor,
+                                 bool errorOnAnyFailure)
 {
     bool hasReturning = true;
     int targetPoolSize = MaxAdaptiveExecutorPoolSize;
     bool randomAccess = true;
     bool interTransactions = false;
     TransactionProperties xactProperties = {
-        .errorOnAnyFailure = true,
+        .errorOnAnyFailure = errorOnAnyFailure,
         .useRemoteTransactionBlocks = TRANSACTION_BLOCKS_REQUIRED,
         .requires2PC = false
     };

@@ -117,7 +117,8 @@ static bool MaybeExecutingUDF(void);
 void
 UseCoordinatedTransaction(void)
 {
-    if (CurrentCoordinatedTransactionState == COORD_TRANS_STARTED)
+    if (CurrentCoordinatedTransactionState == COORD_TRANS_STARTED ||
+        CurrentCoordinatedTransactionState == COORD_TRANS_STARTED_ON_WORKER)
     {
         return;
     }

@@ -130,7 +131,21 @@ UseCoordinatedTransaction(void)
 
     CurrentCoordinatedTransactionState = COORD_TRANS_STARTED;
 
-    AssignDistributedTransactionId();
+    /*
+     * This might be part of bigger distributed transaction originating from
+     * another node, in which case transaction id has already been assigned
+     * by a assign_distributed_transaction_id() call.
+     */
+    DistributedTransactionId *transactionId = GetCurrentDistributedTransactionId();
+    if (transactionId->transactionNumber == 0)
+    {
+        CurrentCoordinatedTransactionState = COORD_TRANS_STARTED_ON_WORKER;
+        AssignDistributedTransactionId();
+    }
+    else
+    {
+        CurrentCoordinatedTransactionState = COORD_TRANS_STARTED;
+    }
 }
 
 

@@ -181,7 +181,17 @@ typedef struct Task
     TaskType taskType;
     uint64 jobId;
     uint32 taskId;
+
+    /*
+     * If queryString != NULL, then we have a single query for all placements.
+     * Otherwise, length of perPlacementQueryStrings is equal to length of
+     * taskPlacementList and can assign a different query for each placement.
+     * We need this flexibility when a query should return node specific values.
+     * For example, on which node did we succeed storing some result files?
+     */
     char *queryString;
+    List *perPlacementQueryStrings;
+
     uint64 anchorShardId;       /* only applies to compute tasks */
     List *taskPlacementList;    /* only applies to compute tasks */
     List *dependentTaskList;    /* only applies to compute tasks */

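Given the struct comment above, a producer switches a task into per-placement mode by clearing queryString and filling perPlacementQueryStrings in placement order. The sketch below is illustrative only: it mirrors the WrapTasksForPartitioning change earlier in this diff, and the query it builds is a made-up example, not code from this commit.

/* illustrative: put a task into per-placement mode */
List *perPlacementQueries = NIL;
ListCell *placementCell = NULL;
foreach(placementCell, task->taskPlacementList)
{
    ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell);
    StringInfo perPlacementQuery = makeStringInfo();

    /* any placement-specific SQL works; returning the node id is one example */
    appendStringInfo(perPlacementQuery, "SELECT %d AS node_id", placement->nodeId);
    perPlacementQueries = lappend(perPlacementQueries, perPlacementQuery->data);
}

/* queryString stays NULL so the executor consults perPlacementQueryStrings */
task->queryString = NULL;
task->perPlacementQueryStrings = perPlacementQueries;
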
@@ -35,6 +35,13 @@ typedef enum CoordinatedTransactionState
     /* no coordinated transaction in progress, but connections established */
     COORD_TRANS_IDLE,
 
+    /*
+     * Coordinated transaction was initiated by coordinator, but the worker also
+     * needs to start a coordinated transaction to be able to send commands to
+     * other workers.
+     */
+    COORD_TRANS_STARTED_ON_WORKER,
+
     /* coordinated transaction in progress */
     COORD_TRANS_STARTED,
 

@@ -3,21 +3,6 @@ CREATE SCHEMA distributed_intermediate_results;
 SET search_path TO 'distributed_intermediate_results';
 SET citus.next_shard_id TO 4213581;
 --
--- Helper UDFs
---
--- partition_task_list_results tests the internal PartitionTasklistResults function
-CREATE OR REPLACE FUNCTION pg_catalog.partition_task_list_results(resultIdPrefix text,
-                                                                  query text,
-                                                                  target_table regclass,
-                                                                  binaryFormat bool DEFAULT true)
-    RETURNS TABLE(resultId text,
-                  nodeId int,
-                  rowCount bigint,
-                  targetShardId bigint,
-                  targetShardIndex int)
-    LANGUAGE C STRICT VOLATILE
-    AS 'citus', $$partition_task_list_results$$;
---
 -- We don't have extensive tests for partition_task_results, since it will be
 -- tested by higher level "INSERT/SELECT with repartitioning" tests anyway.
 --

@@ -0,0 +1,132 @@
+--
+-- Test failures related to distributed_intermediate_results.c
+--
+CREATE SCHEMA failure_distributed_results;
+SET search_path TO 'failure_distributed_results';
+-- do not cache any connections
+SET citus.max_cached_conns_per_worker TO 0;
+-- we don't want to see the prepared transaction numbers in the warnings
+SET client_min_messages TO WARNING;
+SELECT citus.mitmproxy('conn.allow()');
+ mitmproxy
+---------------------------------------------------------------------
+
+(1 row)
+
+SET citus.next_shard_id TO 100800;
+-- always try the 1st replica before the 2nd replica.
+SET citus.task_assignment_policy TO 'first-replica';
+--
+-- Case 1.
+-- Source is replicated, target is not.
+-- Fail the partition query on the first node.
+--
+CREATE TABLE source_table(a int);
+SET citus.shard_replication_factor TO 2;
+SET citus.shard_count TO 5;
+SELECT create_distributed_table('source_table', 'a');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+INSERT INTO source_table SELECT * FROM generate_series(1, 100);
+CREATE TABLE target_table(a int);
+SET citus.shard_count TO 4;
+SET citus.shard_replication_factor TO 1;
+SELECT create_distributed_table('target_table', 'a');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+-- without failure results from 100802 should be stored on 9060
+BEGIN;
+CREATE TABLE distributed_result_info AS
+  SELECT resultId, nodeport, rowcount, targetShardId, targetShardIndex
+  FROM partition_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table')
+  NATURAL JOIN pg_dist_node;
+SELECT * FROM distributed_result_info ORDER BY resultId;
+       resultid        | nodeport | rowcount | targetshardid | targetshardindex
+---------------------------------------------------------------------
+ test_from_100800_to_0 |     9060 |       22 |        100805 |                0
+ test_from_100801_to_0 |    57637 |        2 |        100805 |                0
+ test_from_100801_to_1 |    57637 |       15 |        100806 |                1
+ test_from_100802_to_1 |     9060 |       10 |        100806 |                1
+ test_from_100802_to_2 |     9060 |        5 |        100807 |                2
+ test_from_100803_to_2 |    57637 |       18 |        100807 |                2
+ test_from_100803_to_3 |    57637 |        4 |        100808 |                3
+ test_from_100804_to_3 |     9060 |       24 |        100808 |                3
+(8 rows)
+
+SELECT fetch_intermediate_results('{test_from_100802_to_1,test_from_100802_to_2}'::text[], 'localhost', :worker_2_port) > 0 AS fetched;
+ fetched
+---------------------------------------------------------------------
+ t
+(1 row)
+
+SELECT count(*), sum(x) FROM
+  read_intermediate_results('{test_from_100802_to_1,test_from_100802_to_2}'::text[],'binary') AS res (x int);
+ count | sum
+---------------------------------------------------------------------
+    15 | 863
+(1 row)
+
+ROLLBACk;
+-- with failure, results from 100802 should be retried and succeed on 57637
+SELECT citus.mitmproxy('conn.onQuery(query="worker_partition_query_result.*test_from_100802").kill()');
+ mitmproxy
+---------------------------------------------------------------------
+
+(1 row)
+
+BEGIN;
+CREATE TABLE distributed_result_info AS
+  SELECT resultId, nodeport, rowcount, targetShardId, targetShardIndex
+  FROM partition_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table')
+  NATURAL JOIN pg_dist_node;
+WARNING:  connection error: localhost:xxxxx
+DETAIL:  server closed the connection unexpectedly
+	This probably means the server terminated abnormally
+	before or while processing the request.
+SELECT * FROM distributed_result_info ORDER BY resultId;
+       resultid        | nodeport | rowcount | targetshardid | targetshardindex
+---------------------------------------------------------------------
+ test_from_100800_to_0 |     9060 |       22 |        100805 |                0
+ test_from_100801_to_0 |    57637 |        2 |        100805 |                0
+ test_from_100801_to_1 |    57637 |       15 |        100806 |                1
+ test_from_100802_to_1 |    57637 |       10 |        100806 |                1
+ test_from_100802_to_2 |    57637 |        5 |        100807 |                2
+ test_from_100803_to_2 |    57637 |       18 |        100807 |                2
+ test_from_100803_to_3 |    57637 |        4 |        100808 |                3
+ test_from_100804_to_3 |     9060 |       24 |        100808 |                3
+(8 rows)
+
+-- fetch from worker 2 should fail
+SAVEPOINT s1;
+SELECT fetch_intermediate_results('{test_from_100802_to_1,test_from_100802_to_2}'::text[], 'localhost', :worker_2_port) > 0 AS fetched;
+ERROR:  could not open file "base/pgsql_job_cache/xx_x_xxx/test_from_100802_to_1.data": No such file or directory
+CONTEXT:  while executing command on localhost:xxxxx
+ROLLBACK TO SAVEPOINT s1;
+-- fetch from worker 1 should succeed
+SELECT fetch_intermediate_results('{test_from_100802_to_1,test_from_100802_to_2}'::text[], 'localhost', :worker_1_port) > 0 AS fetched;
+ fetched
+---------------------------------------------------------------------
+ t
+(1 row)
+
+-- make sure the results read are same as the previous transaction block
+SELECT count(*), sum(x) FROM
+  read_intermediate_results('{test_from_100802_to_1,test_from_100802_to_2}'::text[],'binary') AS res (x int);
+ count | sum
+---------------------------------------------------------------------
+    15 | 863
+(1 row)
+
+ROLLBACk;
+SET search_path TO 'public';
+RESET citus.shard_replication_factor;
+RESET citus.shard_count;
+RESET citus.task_assignment_policy;
+DROP SCHEMA failure_distributed_results CASCADE;
+RESET client_min_messages;

@@ -135,3 +135,15 @@ BEGIN
 END LOOP;
 END;
 $$ LANGUAGE plpgsql;
+-- partition_task_list_results tests the internal PartitionTasklistResults function
+CREATE OR REPLACE FUNCTION pg_catalog.partition_task_list_results(resultIdPrefix text,
+                                                                  query text,
+                                                                  target_table regclass,
+                                                                  binaryFormat bool DEFAULT true)
+    RETURNS TABLE(resultId text,
+                  nodeId int,
+                  rowCount bigint,
+                  targetShardId bigint,
+                  targetShardIndex int)
+    LANGUAGE C STRICT VOLATILE
+    AS 'citus', $$partition_task_list_results$$;

@@ -6,6 +6,7 @@ test: failure_setup
 test: multi_test_helpers
 test: failure_replicated_partitions
 test: multi_test_catalog_views
+test: failure_distributed_results
 test: failure_ddl
 test: failure_truncate
 test: failure_create_index_concurrently

@@ -4,23 +4,6 @@ SET search_path TO 'distributed_intermediate_results';
 
 SET citus.next_shard_id TO 4213581;
 
---
--- Helper UDFs
---
-
--- partition_task_list_results tests the internal PartitionTasklistResults function
-CREATE OR REPLACE FUNCTION pg_catalog.partition_task_list_results(resultIdPrefix text,
-                                                                  query text,
-                                                                  target_table regclass,
-                                                                  binaryFormat bool DEFAULT true)
-    RETURNS TABLE(resultId text,
-                  nodeId int,
-                  rowCount bigint,
-                  targetShardId bigint,
-                  targetShardIndex int)
-    LANGUAGE C STRICT VOLATILE
-    AS 'citus', $$partition_task_list_results$$;
-
 --
 -- We don't have extensive tests for partition_task_results, since it will be
 -- tested by higher level "INSERT/SELECT with repartitioning" tests anyway.

@@ -0,0 +1,74 @@
+--
+-- Test failures related to distributed_intermediate_results.c
+--
+
+
+CREATE SCHEMA failure_distributed_results;
+SET search_path TO 'failure_distributed_results';
+
+-- do not cache any connections
+SET citus.max_cached_conns_per_worker TO 0;
+
+-- we don't want to see the prepared transaction numbers in the warnings
+SET client_min_messages TO WARNING;
+
+SELECT citus.mitmproxy('conn.allow()');
+
+SET citus.next_shard_id TO 100800;
+
+-- always try the 1st replica before the 2nd replica.
+SET citus.task_assignment_policy TO 'first-replica';
+
+--
+-- Case 1.
+-- Source is replicated, target is not.
+-- Fail the partition query on the first node.
+--
+CREATE TABLE source_table(a int);
+SET citus.shard_replication_factor TO 2;
+SET citus.shard_count TO 5;
+SELECT create_distributed_table('source_table', 'a');
+INSERT INTO source_table SELECT * FROM generate_series(1, 100);
+
+CREATE TABLE target_table(a int);
+SET citus.shard_count TO 4;
+SET citus.shard_replication_factor TO 1;
+SELECT create_distributed_table('target_table', 'a');
+
+-- without failure results from 100802 should be stored on 9060
+BEGIN;
+CREATE TABLE distributed_result_info AS
+  SELECT resultId, nodeport, rowcount, targetShardId, targetShardIndex
+  FROM partition_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table')
+  NATURAL JOIN pg_dist_node;
+SELECT * FROM distributed_result_info ORDER BY resultId;
+SELECT fetch_intermediate_results('{test_from_100802_to_1,test_from_100802_to_2}'::text[], 'localhost', :worker_2_port) > 0 AS fetched;
+SELECT count(*), sum(x) FROM
+  read_intermediate_results('{test_from_100802_to_1,test_from_100802_to_2}'::text[],'binary') AS res (x int);
+ROLLBACk;
+
+-- with failure, results from 100802 should be retried and succeed on 57637
+SELECT citus.mitmproxy('conn.onQuery(query="worker_partition_query_result.*test_from_100802").kill()');
+BEGIN;
+CREATE TABLE distributed_result_info AS
+  SELECT resultId, nodeport, rowcount, targetShardId, targetShardIndex
+  FROM partition_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table')
+  NATURAL JOIN pg_dist_node;
+SELECT * FROM distributed_result_info ORDER BY resultId;
+-- fetch from worker 2 should fail
+SAVEPOINT s1;
+SELECT fetch_intermediate_results('{test_from_100802_to_1,test_from_100802_to_2}'::text[], 'localhost', :worker_2_port) > 0 AS fetched;
+ROLLBACK TO SAVEPOINT s1;
+-- fetch from worker 1 should succeed
+SELECT fetch_intermediate_results('{test_from_100802_to_1,test_from_100802_to_2}'::text[], 'localhost', :worker_1_port) > 0 AS fetched;
+-- make sure the results read are same as the previous transaction block
+SELECT count(*), sum(x) FROM
+  read_intermediate_results('{test_from_100802_to_1,test_from_100802_to_2}'::text[],'binary') AS res (x int);
+ROLLBACk;
+
+SET search_path TO 'public';
+RESET citus.shard_replication_factor;
+RESET citus.shard_count;
+RESET citus.task_assignment_policy;
+DROP SCHEMA failure_distributed_results CASCADE;
+RESET client_min_messages;

@@ -141,3 +141,17 @@ BEGIN
 END LOOP;
 END;
 $$ LANGUAGE plpgsql;
+
+
+-- partition_task_list_results tests the internal PartitionTasklistResults function
+CREATE OR REPLACE FUNCTION pg_catalog.partition_task_list_results(resultIdPrefix text,
+                                                                  query text,
+                                                                  target_table regclass,
+                                                                  binaryFormat bool DEFAULT true)
+    RETURNS TABLE(resultId text,
+                  nodeId int,
+                  rowCount bigint,
+                  targetShardId bigint,
+                  targetShardIndex int)
+    LANGUAGE C STRICT VOLATILE
+    AS 'citus', $$partition_task_list_results$$;