From 527d7d41c142584611ab984b03865760c5a0daf0 Mon Sep 17 00:00:00 2001
From: Hadi Moshayedi
Date: Thu, 9 Jan 2020 13:07:19 -0800
Subject: [PATCH 1/2] Implement RedistributeTaskListResult

---
 .../distributed_intermediate_results.c        | 288 ++++++++++++++++++
 .../executor/intermediate_results.c           |   2 +
 src/backend/distributed/utils/citus_clauses.c |   6 +-
 .../distributed/intermediate_results.h        |   4 +
 4 files changed, 298 insertions(+), 2 deletions(-)

diff --git a/src/backend/distributed/executor/distributed_intermediate_results.c b/src/backend/distributed/executor/distributed_intermediate_results.c
index cd1f80707..480f85b82 100644
--- a/src/backend/distributed/executor/distributed_intermediate_results.c
+++ b/src/backend/distributed/executor/distributed_intermediate_results.c
@@ -18,6 +18,7 @@
 #include "access/tupdesc.h"
 #include "catalog/pg_type.h"
 #include "distributed/intermediate_results.h"
+#include "distributed/master_metadata_utility.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/multi_executor.h"
 #include "distributed/transaction_management.h"
@@ -29,6 +30,28 @@
 #include "utils/lsyscache.h"
 
 
+/*
+ * NodePair contains the source and destination node in a NodeToNodeFragmentsTransfer.
+ * It is defined as a separate struct so that it can be used as a key in a hash table.
+ */
+typedef struct NodePair
+{
+	int sourceNodeId;
+	int targetNodeId;
+} NodePair;
+
+
+/*
+ * NodeToNodeFragmentsTransfer contains all fragments that need to be fetched from
+ * the source node to the destination node in the NodePair.
+ */
+typedef struct NodeToNodeFragmentsTransfer
+{
+	NodePair nodes;
+	List *fragmentList;
+} NodeToNodeFragmentsTransfer;
+
+
 /* forward declarations of local functions */
 static void WrapTasksForPartitioning(char *resultIdPrefix, List *selectTaskList,
 									 DistTableCacheEntry *targetRelation,
@@ -46,6 +69,44 @@ static DistributedResultFragment * TupleToDistributedResultFragment(
 static Tuplestorestate * ExecuteSelectTasksIntoTupleStore(List *taskList,
 														  TupleDesc resultDescriptor,
 														  bool errorOnAnyFailure);
+static List ** ColocateFragmentsWithRelation(List *fragmentList,
+											 DistTableCacheEntry *targetRelation);
+static List * ColocationTransfers(List *fragmentList,
+								  DistTableCacheEntry *targetRelation);
+static List * FragmentTransferTaskList(List *fragmentListTransfers);
+static char * QueryStringForFragmentsTransfer(
+	NodeToNodeFragmentsTransfer *fragmentsTransfer);
+static void ExecuteFetchTaskList(List *fetchTaskList);
+
+
+/*
+ * RedistributeTaskListResults partitions the results of the given task list using
+ * the shard ranges and partition method of the given targetRelation, and then
+ * colocates the result files with its shards.
+ *
+ * If a shard has a replication factor > 1, corresponding result files are copied
+ * to all nodes containing that shard.
+ *
+ * returnValue[shardIndex] is a list of cstrings, each of which is a resultId that
+ * corresponds to targetRelation->sortedShardIntervalArray[shardIndex].
+ */
+List **
+RedistributeTaskListResults(char *resultIdPrefix, List *selectTaskList,
+							DistTableCacheEntry *targetRelation,
+							bool binaryFormat)
+{
+	/*
+	 * Make sure that this transaction has a distributed transaction ID.
+	 *
+	 * Intermediate results will be stored in a directory that is derived
+	 * from the distributed transaction ID.
+	 */
+	UseCoordinatedTransaction();
+
+	List *fragmentList = PartitionTasklistResults(resultIdPrefix, selectTaskList,
+												  targetRelation, binaryFormat);
+	return ColocateFragmentsWithRelation(fragmentList, targetRelation);
+}
 
 
 /*
@@ -64,6 +125,14 @@ PartitionTasklistResults(char *resultIdPrefix, List *selectTaskList,
 						 DistTableCacheEntry *targetRelation,
 						 bool binaryFormat)
 {
+	if (targetRelation->partitionMethod != DISTRIBUTE_BY_HASH &&
+		targetRelation->partitionMethod != DISTRIBUTE_BY_RANGE)
+	{
+		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+						errmsg("repartitioning results of a tasklist is only supported "
+							   "when target relation is hash or range partitioned.")));
+	}
+
 	/*
 	 * Make sure that this transaction has a distributed transaction ID.
 	 *
@@ -333,3 +402,222 @@ ExecuteSelectTasksIntoTupleStore(List *taskList, TupleDesc resultDescriptor,
 
 	return resultStore;
 }
+
+
+/*
+ * ColocateFragmentsWithRelation moves the fragments in the cluster so they are
+ * colocated with the shards of the target relation. These transfers are done by
+ * calls to fetch_intermediate_results() between nodes.
+ *
+ * returnValue[shardIndex] is a list of result Ids that are colocated with
+ * targetRelation->sortedShardIntervalArray[shardIndex] after fetch tasks are
+ * done.
+ */
+static List **
+ColocateFragmentsWithRelation(List *fragmentList, DistTableCacheEntry *targetRelation)
+{
+	List *fragmentListTransfers = ColocationTransfers(fragmentList, targetRelation);
+	List *fragmentTransferTaskList = FragmentTransferTaskList(fragmentListTransfers);
+
+	ExecuteFetchTaskList(fragmentTransferTaskList);
+
+	int shardCount = targetRelation->shardIntervalArrayLength;
+	List **shardResultIdList = palloc0(shardCount * sizeof(List *));
+
+	ListCell *fragmentCell = NULL;
+	foreach(fragmentCell, fragmentList)
+	{
+		DistributedResultFragment *sourceFragment = lfirst(fragmentCell);
+		int shardIndex = sourceFragment->targetShardIndex;
+
+		shardResultIdList[shardIndex] = lappend(shardResultIdList[shardIndex],
+												sourceFragment->resultId);
+	}
+
+	return shardResultIdList;
+}
+
+
+/*
+ * ColocationTransfers returns a list of transfers to colocate given fragments with
+ * shards of the target relation. These transfers also take into account replicated
+ * target relations. This prunes away transfers with the same source and target node.
+ */
+static List *
+ColocationTransfers(List *fragmentList, DistTableCacheEntry *targetRelation)
+{
+	HASHCTL transferHashInfo;
+	MemSet(&transferHashInfo, 0, sizeof(HASHCTL));
+	transferHashInfo.keysize = sizeof(NodePair);
+	transferHashInfo.entrysize = sizeof(NodeToNodeFragmentsTransfer);
+	transferHashInfo.hcxt = CurrentMemoryContext;
+	HTAB *transferHash = hash_create("Fragment Transfer Hash", 32, &transferHashInfo,
+									 HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
+
+	ListCell *fragmentCell = NULL;
+	foreach(fragmentCell, fragmentList)
+	{
+		DistributedResultFragment *fragment = lfirst(fragmentCell);
+		List *placementList = FinalizedShardPlacementList(fragment->targetShardId);
+
+		ListCell *placementCell = NULL;
+		foreach(placementCell, placementList)
+		{
+			ShardPlacement *placement = lfirst(placementCell);
+			NodePair transferKey = {
+				.sourceNodeId = fragment->nodeId,
+				.targetNodeId = placement->nodeId
+			};
+
+			if (transferKey.sourceNodeId == transferKey.targetNodeId)
+			{
+				continue;
+			}
+
+			bool foundInCache = false;
+			NodeToNodeFragmentsTransfer *fragmentListTransfer =
+				hash_search(transferHash, &transferKey, HASH_ENTER, &foundInCache);
+			if (!foundInCache)
+			{
+				fragmentListTransfer->nodes = transferKey;
+				fragmentListTransfer->fragmentList = NIL;
+			}
+
+			fragmentListTransfer->fragmentList =
+				lappend(fragmentListTransfer->fragmentList, fragment);
+		}
+	}
+
+	List *fragmentListTransfers = NIL;
+	NodeToNodeFragmentsTransfer *transfer = NULL;
+	HASH_SEQ_STATUS hashSeqStatus;
+
+	hash_seq_init(&hashSeqStatus, transferHash);
+
+	while ((transfer = hash_seq_search(&hashSeqStatus)) != NULL)
+	{
+		fragmentListTransfers = lappend(fragmentListTransfers, transfer);
+	}
+
+	return fragmentListTransfers;
+}
+
+
+/*
+ * FragmentTransferTaskList returns a list of tasks that perform the given list of
+ * transfers. Each transfer is done by a SQL call to fetch_intermediate_results().
+ * See QueryStringForFragmentsTransfer for how the query is constructed.
+ */
+static List *
+FragmentTransferTaskList(List *fragmentListTransfers)
+{
+	List *fetchTaskList = NIL;
+	ListCell *transferCell = NULL;
+
+	foreach(transferCell, fragmentListTransfers)
+	{
+		NodeToNodeFragmentsTransfer *fragmentsTransfer = lfirst(transferCell);
+
+		int targetNodeId = fragmentsTransfer->nodes.targetNodeId;
+
+		/* these should have already been pruned away in ColocationTransfers */
+		Assert(targetNodeId != fragmentsTransfer->nodes.sourceNodeId);
+
+		WorkerNode *workerNode = LookupNodeByNodeId(targetNodeId);
+
+		ShardPlacement *targetPlacement = CitusMakeNode(ShardPlacement);
+		targetPlacement->nodeName = workerNode->workerName;
+		targetPlacement->nodePort = workerNode->workerPort;
+		targetPlacement->groupId = workerNode->groupId;
+
+		Task *task = CitusMakeNode(Task);
+		task->taskType = SELECT_TASK;
+		task->queryString = QueryStringForFragmentsTransfer(fragmentsTransfer);
+		task->taskPlacementList = list_make1(targetPlacement);
+
+		fetchTaskList = lappend(fetchTaskList, task);
+	}
+
+	return fetchTaskList;
+}
+
+
+/*
+ * QueryStringForFragmentsTransfer returns a query that fetches distributed
+ * result fragments from the source node to the target node. See the structure of
+ * NodeToNodeFragmentsTransfer for details of how these are decided.
+ */ +static char * +QueryStringForFragmentsTransfer(NodeToNodeFragmentsTransfer *fragmentsTransfer) +{ + ListCell *fragmentCell = NULL; + StringInfo queryString = makeStringInfo(); + StringInfo fragmentNamesArrayString = makeStringInfo(); + int fragmentCount = 0; + NodePair *nodePair = &fragmentsTransfer->nodes; + WorkerNode *sourceNode = LookupNodeByNodeId(nodePair->sourceNodeId); + + appendStringInfoString(fragmentNamesArrayString, "ARRAY["); + + foreach(fragmentCell, fragmentsTransfer->fragmentList) + { + DistributedResultFragment *fragment = lfirst(fragmentCell); + char *fragmentName = fragment->resultId; + + if (fragmentCount > 0) + { + appendStringInfoString(fragmentNamesArrayString, ","); + } + + appendStringInfoString(fragmentNamesArrayString, + quote_literal_cstr(fragmentName)); + + fragmentCount++; + } + + appendStringInfoString(fragmentNamesArrayString, "]::text[]"); + + appendStringInfo(queryString, + "SELECT bytes FROM fetch_intermediate_results(%s,%s,%d) bytes", + fragmentNamesArrayString->data, + quote_literal_cstr(sourceNode->workerName), + sourceNode->workerPort); + + ereport(DEBUG3, (errmsg("fetch task on %s:%d: %s", sourceNode->workerName, + sourceNode->workerPort, queryString->data))); + + return queryString->data; +} + + +/* + * ExecuteFetchTaskList executes a list of fetch_intermediate_results() tasks. + * It ignores the byte_count result of the fetch_intermediate_results() calls. + */ +static void +ExecuteFetchTaskList(List *taskList) +{ + TupleDesc resultDescriptor = NULL; + Tuplestorestate *resultStore = NULL; + int resultColumnCount = 1; + +#if PG_VERSION_NUM >= 120000 + resultDescriptor = CreateTemplateTupleDesc(resultColumnCount); +#else + resultDescriptor = CreateTemplateTupleDesc(resultColumnCount, false); +#endif + + TupleDescInitEntry(resultDescriptor, (AttrNumber) 1, "byte_count", INT8OID, -1, 0); + + bool errorOnAnyFailure = true; + resultStore = ExecuteSelectTasksIntoTupleStore(taskList, resultDescriptor, + errorOnAnyFailure); + + TupleTableSlot *slot = MakeSingleTupleTableSlotCompat(resultDescriptor, + &TTSOpsMinimalTuple); + + while (tuplestore_gettupleslot(resultStore, true, false, slot)) + { + ExecClearTuple(slot); + } +} diff --git a/src/backend/distributed/executor/intermediate_results.c b/src/backend/distributed/executor/intermediate_results.c index 3b1b0a967..9366e9dfc 100644 --- a/src/backend/distributed/executor/intermediate_results.c +++ b/src/backend/distributed/executor/intermediate_results.c @@ -847,6 +847,8 @@ fetch_intermediate_results(PG_FUNCTION_ARGS) totalBytesWritten += FetchRemoteIntermediateResult(connection, resultId); } + UnclaimConnection(connection); + PG_RETURN_INT64(totalBytesWritten); } diff --git a/src/backend/distributed/utils/citus_clauses.c b/src/backend/distributed/utils/citus_clauses.c index d358ac48d..2f18be9c7 100644 --- a/src/backend/distributed/utils/citus_clauses.c +++ b/src/backend/distributed/utils/citus_clauses.c @@ -234,7 +234,8 @@ citus_evaluate_expr(Expr *expr, Oid result_type, int32 result_typmod, static bool CitusIsVolatileFunctionIdChecker(Oid func_id, void *context) { - if (func_id == CitusReadIntermediateResultFuncId()) + if (func_id == CitusReadIntermediateResultFuncId() || + func_id == CitusReadIntermediateResultArrayFuncId()) { return false; } @@ -273,7 +274,8 @@ CitusIsVolatileFunction(Node *node) static bool CitusIsMutableFunctionIdChecker(Oid func_id, void *context) { - if (func_id == CitusReadIntermediateResultFuncId()) + if (func_id == CitusReadIntermediateResultFuncId() || + func_id == 
CitusReadIntermediateResultArrayFuncId()) { return false; } diff --git a/src/include/distributed/intermediate_results.h b/src/include/distributed/intermediate_results.h index b637a37c6..61e670fbc 100644 --- a/src/include/distributed/intermediate_results.h +++ b/src/include/distributed/intermediate_results.h @@ -60,6 +60,10 @@ extern char * QueryResultFileName(const char *resultId); extern char * CreateIntermediateResultsDirectory(void); /* distributed_intermediate_results.c */ +extern List ** RedistributeTaskListResults(char *resultIdPrefix, + List *selectTaskList, + DistTableCacheEntry *targetRelation, + bool binaryFormat); extern List * PartitionTasklistResults(char *resultIdPrefix, List *selectTaskList, DistTableCacheEntry *distributionScheme, bool binaryFormat); From 40ba2cdd6edf964f1acc162ec6fcdc006b9f1ce9 Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Thu, 9 Jan 2020 13:08:00 -0800 Subject: [PATCH 2/2] Test RedistributeTaskListResult --- .../test/distributed_intermediate_results.c | 78 +++++ src/test/regress/bin/normalize.sed | 3 + .../distributed_intermediate_results.out | 297 ++++++++++++++++-- .../sql/distributed_intermediate_results.sql | 195 +++++++++++- 4 files changed, 543 insertions(+), 30 deletions(-) diff --git a/src/backend/distributed/test/distributed_intermediate_results.c b/src/backend/distributed/test/distributed_intermediate_results.c index 43a4abbd8..474a350df 100644 --- a/src/backend/distributed/test/distributed_intermediate_results.c +++ b/src/backend/distributed/test/distributed_intermediate_results.c @@ -18,15 +18,18 @@ #include "miscadmin.h" #include "pgstat.h" +#include "catalog/pg_type.h" #include "distributed/commands/multi_copy.h" #include "distributed/connection_management.h" #include "distributed/intermediate_results.h" #include "distributed/multi_executor.h" #include "distributed/remote_commands.h" #include "distributed/tuplestore.h" +#include "distributed/listutils.h" #include "tcop/tcopprot.h" PG_FUNCTION_INFO_V1(partition_task_list_results); +PG_FUNCTION_INFO_V1(redistribute_task_list_results); /* * partition_task_list_results partitions results of each of distributed @@ -89,3 +92,78 @@ partition_task_list_results(PG_FUNCTION_ARGS) PG_RETURN_DATUM(0); } + + +/* + * redistribute_task_list_results exposes RedistributeTaskListResult for testing. + * It executes a query and repartitions and colocates its results according to + * a relation. 
+ */ +Datum +redistribute_task_list_results(PG_FUNCTION_ARGS) +{ + text *resultIdPrefixText = PG_GETARG_TEXT_P(0); + char *resultIdPrefix = text_to_cstring(resultIdPrefixText); + text *queryText = PG_GETARG_TEXT_P(1); + char *queryString = text_to_cstring(queryText); + Oid relationId = PG_GETARG_OID(2); + bool binaryFormat = PG_GETARG_BOOL(3); + + Query *parsedQuery = ParseQueryString(queryString, NULL, 0); + PlannedStmt *queryPlan = pg_plan_query(parsedQuery, + CURSOR_OPT_PARALLEL_OK, + NULL); + if (!IsCitusCustomScan(queryPlan->planTree)) + { + ereport(ERROR, (errmsg("query must be distributed and shouldn't require " + "any merging on the coordinator."))); + } + + CustomScan *customScan = (CustomScan *) queryPlan->planTree; + DistributedPlan *distributedPlan = GetDistributedPlan(customScan); + + Job *job = distributedPlan->workerJob; + List *taskList = job->taskList; + + DistTableCacheEntry *targetRelation = DistributedTableCacheEntry(relationId); + List **shardResultIds = RedistributeTaskListResults(resultIdPrefix, taskList, + targetRelation, binaryFormat); + + TupleDesc tupleDescriptor = NULL; + Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor); + int shardCount = targetRelation->shardIntervalArrayLength; + + for (int shardIndex = 0; shardIndex < shardCount; shardIndex++) + { + ShardInterval *shardInterval = + targetRelation->sortedShardIntervalArray[shardIndex]; + uint64 shardId = shardInterval->shardId; + + int fragmentCount = list_length(shardResultIds[shardIndex]); + Datum *resultIdValues = palloc0(fragmentCount * sizeof(Datum)); + List *sortedResultIds = SortList(shardResultIds[shardIndex], pg_qsort_strcmp); + + ListCell *resultIdCell = NULL; + int resultIdIndex = 0; + foreach(resultIdCell, sortedResultIds) + { + char *resultId = lfirst(resultIdCell); + resultIdValues[resultIdIndex++] = CStringGetTextDatum(resultId); + } + + ArrayType *resultIdArray = DatumArrayToArrayType(resultIdValues, fragmentCount, + TEXTOID); + + bool columnNulls[2] = { 0 }; + Datum columnValues[2] = { + Int64GetDatum(shardId), + PointerGetDatum(resultIdArray) + }; + + tuplestore_putvalues(tupleStore, tupleDescriptor, columnValues, columnNulls); + } + + tuplestore_donestoring(tupleStore); + + PG_RETURN_DATUM(0); +} diff --git a/src/test/regress/bin/normalize.sed b/src/test/regress/bin/normalize.sed index dcc79bfa2..0f9985a73 100644 --- a/src/test/regress/bin/normalize.sed +++ b/src/test/regress/bin/normalize.sed @@ -76,6 +76,9 @@ s/_id_other_column_ref_fkey/_id_fkey/g # intermediate_results s/(ERROR.*)pgsql_job_cache\/([0-9]+_[0-9]+_[0-9]+)\/(.*).data/\1pgsql_job_cache\/xx_x_xxx\/\3.data/g +# toast tables +s/pg_toast_[0-9]+/pg_toast_xxxxx/g + # Plan numbers are not very stable, so we normalize those # subplan numbers are quite stable so we keep those s/DEBUG: Plan [0-9]+/DEBUG: Plan XXX/g diff --git a/src/test/regress/expected/distributed_intermediate_results.out b/src/test/regress/expected/distributed_intermediate_results.out index 24d3ce1dc..0582c36ab 100644 --- a/src/test/regress/expected/distributed_intermediate_results.out +++ b/src/test/regress/expected/distributed_intermediate_results.out @@ -2,12 +2,26 @@ CREATE SCHEMA distributed_intermediate_results; SET search_path TO 'distributed_intermediate_results'; SET citus.next_shard_id TO 4213581; +SET citus.shard_replication_factor TO 1; +-- redistribute_task_list_results test the internal RedistributeTaskListResult +CREATE OR REPLACE FUNCTION pg_catalog.redistribute_task_list_results(resultIdPrefix text, + query text, + target_table 
regclass, + binaryFormat bool DEFAULT true) + RETURNS TABLE(shardid bigint, + colocated_results text[]) + LANGUAGE C STRICT VOLATILE + AS 'citus', $$redistribute_task_list_results$$; -- --- We don't have extensive tests for partition_task_results, since it will be --- tested by higher level "INSERT/SELECT with repartitioning" tests anyway. +-- We don't have extensive tests for partition_task_results or +-- redistribute_task_list_results, since they will be tested by higher level +-- "INSERT/SELECT with repartitioning" tests anyway. -- -- --- partition_task_list_results, hash partitioning, binary format +-- Case 1. +-- hash partitioning, binary format +-- * partition_task_list_results +-- * redistribute_task_list_results -- CREATE TABLE source_table(a int); SET citus.shard_count TO 3; @@ -26,6 +40,16 @@ SELECT create_distributed_table('target_table', 'a'); (1 row) +CREATE TABLE colocated_with_target(a int); +SELECT create_distributed_table('colocated_with_target', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- one value per shard, so we can route calls to read_intermediate_shards +INSERT INTO colocated_with_target VALUES (1), (2); +-- partition_task_list_results -- should error out SELECT partition_task_list_results('test', $$ SELECT avg(a) FROM source_table $$, 'target_table'); ERROR: query must be distributed and shouldn't require any merging on the coordinator. @@ -63,10 +87,44 @@ SELECT count(*), sum(x) FROM 100 | 5050 (1 row) -END; -DROP TABLE source_table, target_table, distributed_result_info; +ROLLBACK; +-- redistribute_task_list_results +-- Verify that redistribute_task_list_results colocated fragments properly by reading the +-- expected colocated results on the same node as each of two shards. +BEGIN; +CREATE TABLE distributed_result_info AS + SELECT * FROM redistribute_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table'); +SELECT * FROM distributed_result_info ORDER BY shardid; + shardid | colocated_results +--------------------------------------------------------------------- + 4213584 | {test_from_4213581_to_0,test_from_4213582_to_0} + 4213585 | {test_from_4213582_to_1,test_from_4213583_to_1} +(2 rows) + +WITH shard_1 AS ( + SELECT t.* FROM colocated_with_target, ( + SELECT * FROM read_intermediate_results('{test_from_4213581_to_0,test_from_4213582_to_0}'::text[], 'binary') AS res (x int)) t + WHERE colocated_with_target.a = 1 +), shard_2 AS ( + SELECT t.* FROM colocated_with_target, ( + SELECT * FROM read_intermediate_results('{test_from_4213582_to_1,test_from_4213583_to_1}'::text[], 'binary') AS res (x int)) t + WHERE colocated_with_target.a = 2 +), all_rows AS ( + (SELECT * FROM shard_1) UNION (SELECT * FROM shard_2) +) +SELECT count(*), sum(x) FROM all_rows; + count | sum +--------------------------------------------------------------------- + 100 | 5050 +(1 row) + +ROLLBACK; +DROP TABLE source_table, target_table, colocated_with_target; -- --- partition_task_list_results, range partitioning, text format +-- Case 2. 
+-- range partitioning, text format +-- * partition_task_list_results +-- * redistribute_task_list_results -- CREATE TABLE source_table(a int); SELECT create_distributed_table('source_table', 'a', 'range'); @@ -89,6 +147,19 @@ SELECT create_distributed_table('target_table', 'a', 'range'); CALL public.create_range_partitioned_shards('target_table', '{0,25,50,76}', '{24,49,75,200}'); +CREATE TABLE colocated_with_target(a int); +SELECT create_distributed_table('colocated_with_target', 'a', 'range'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CALL public.create_range_partitioned_shards('colocated_with_target', + '{0,25,50,76}', + '{24,49,75,200}'); +-- one value per shard, so we can route calls to read_intermediate_shards +INSERT INTO colocated_with_target VALUES (1), (26), (51), (77); +-- partition_task_list_results BEGIN; CREATE TABLE distributed_result_info AS SELECT resultId, nodeport, rowcount, targetShardId, targetShardIndex @@ -98,22 +169,22 @@ CREATE TABLE distributed_result_info AS SELECT * FROM distributed_result_info ORDER BY resultId; resultid | nodeport | rowcount | targetshardid | targetshardindex --------------------------------------------------------------------- - test_from_4213586_to_0 | 57638 | 7 | 4213590 | 0 - test_from_4213586_to_1 | 57638 | 6 | 4213591 | 1 - test_from_4213586_to_2 | 57638 | 7 | 4213592 | 2 - test_from_4213586_to_3 | 57638 | 4 | 4213593 | 3 - test_from_4213587_to_0 | 57637 | 7 | 4213590 | 0 - test_from_4213587_to_1 | 57637 | 6 | 4213591 | 1 - test_from_4213587_to_2 | 57637 | 8 | 4213592 | 2 - test_from_4213587_to_3 | 57637 | 4 | 4213593 | 3 - test_from_4213588_to_0 | 57638 | 8 | 4213590 | 0 - test_from_4213588_to_1 | 57638 | 6 | 4213591 | 1 - test_from_4213588_to_2 | 57638 | 8 | 4213592 | 2 - test_from_4213588_to_3 | 57638 | 4 | 4213593 | 3 - test_from_4213589_to_0 | 57637 | 8 | 4213590 | 0 - test_from_4213589_to_1 | 57637 | 6 | 4213591 | 1 - test_from_4213589_to_2 | 57637 | 7 | 4213592 | 2 - test_from_4213589_to_3 | 57637 | 4 | 4213593 | 3 + test_from_4213588_to_0 | 57638 | 7 | 4213592 | 0 + test_from_4213588_to_1 | 57638 | 6 | 4213593 | 1 + test_from_4213588_to_2 | 57638 | 7 | 4213594 | 2 + test_from_4213588_to_3 | 57638 | 4 | 4213595 | 3 + test_from_4213589_to_0 | 57637 | 7 | 4213592 | 0 + test_from_4213589_to_1 | 57637 | 6 | 4213593 | 1 + test_from_4213589_to_2 | 57637 | 8 | 4213594 | 2 + test_from_4213589_to_3 | 57637 | 4 | 4213595 | 3 + test_from_4213590_to_0 | 57638 | 8 | 4213592 | 0 + test_from_4213590_to_1 | 57638 | 6 | 4213593 | 1 + test_from_4213590_to_2 | 57638 | 8 | 4213594 | 2 + test_from_4213590_to_3 | 57638 | 4 | 4213595 | 3 + test_from_4213591_to_0 | 57637 | 8 | 4213592 | 0 + test_from_4213591_to_1 | 57637 | 6 | 4213593 | 1 + test_from_4213591_to_2 | 57637 | 7 | 4213594 | 2 + test_from_4213591_to_3 | 57637 | 4 | 4213595 | 3 (16 rows) -- fetch from workers @@ -135,10 +206,190 @@ SELECT count(*), sum(x) FROM 100 | 4550 (1 row) +ROLLBACK; +-- redistribute_task_list_results +-- Verify that redistribute_task_list_results colocated fragments properly by reading the +-- expected colocated results on the same node as each of two shards. 
+BEGIN; +CREATE TABLE distributed_result_info AS + SELECT * FROM redistribute_task_list_results('test', $$ SELECT (3 * a * a) % 100 FROM source_table $$, 'target_table'); +SELECT * FROM distributed_result_info ORDER BY shardid; + shardid | colocated_results +--------------------------------------------------------------------- + 4213592 | {test_from_4213588_to_0,test_from_4213589_to_0,test_from_4213590_to_0,test_from_4213591_to_0} + 4213593 | {test_from_4213588_to_1,test_from_4213589_to_1,test_from_4213590_to_1,test_from_4213591_to_1} + 4213594 | {test_from_4213588_to_2,test_from_4213589_to_2,test_from_4213590_to_2,test_from_4213591_to_2} + 4213595 | {test_from_4213588_to_3,test_from_4213589_to_3,test_from_4213590_to_3,test_from_4213591_to_3} +(4 rows) + +WITH shard_1 AS ( + SELECT t.* FROM colocated_with_target, ( + SELECT * FROM read_intermediate_results('{test_from_4213588_to_0,test_from_4213589_to_0,test_from_4213590_to_0,test_from_4213591_to_0}'::text[], 'binary') AS res (x int)) t + WHERE colocated_with_target.a = 1 +), shard_2 AS ( + SELECT t.* FROM colocated_with_target, ( + SELECT * FROM read_intermediate_results('{test_from_4213588_to_1,test_from_4213589_to_1,test_from_4213590_to_1,test_from_4213591_to_1}'::text[], 'binary') AS res (x int)) t + WHERE colocated_with_target.a = 26 +), shard_3 AS ( + SELECT t.* FROM colocated_with_target, ( + SELECT * FROM read_intermediate_results('{test_from_4213588_to_2,test_from_4213589_to_2,test_from_4213590_to_2,test_from_4213591_to_2}'::text[], 'binary') AS res (x int)) t + WHERE colocated_with_target.a = 51 +), shard_4 AS ( + SELECT t.* FROM colocated_with_target, ( + SELECT * FROM read_intermediate_results('{test_from_4213588_to_3,test_from_4213589_to_3,test_from_4213590_to_3,test_from_4213591_to_3}'::text[], 'binary') AS res (x int)) t + WHERE colocated_with_target.a = 77 +), all_rows AS ( + (SELECT * FROM shard_1) UNION ALL (SELECT * FROM shard_2) UNION ALL + (SELECT * FROM shard_3) UNION ALL (SELECT * FROM shard_4) +) +SELECT count(*), sum(x) FROM all_rows; + count | sum +--------------------------------------------------------------------- + 100 | 4550 +(1 row) + +ROLLBACK; +DROP TABLE source_table, target_table, colocated_with_target; +-- +-- Case 3. 
+-- range partitioning, text format, replication factor 2 (both source and destination) +-- composite distribution column +-- +-- only redistribute_task_list_results +-- +CREATE TYPE composite_key_type AS (f1 int, f2 text); +SET citus.shard_replication_factor TO 2; +-- source +CREATE TABLE source_table(key composite_key_type, value int, mapped_key composite_key_type); +SELECT create_distributed_table('source_table', 'key', 'range'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CALL public.create_range_partitioned_shards('source_table', '{"(0,a)","(25,a)"}','{"(24,z)","(49,z)"}'); +INSERT INTO source_table VALUES ((0, 'a'), 1, (0, 'a')); -- shard xxxxx -> shard xxxxx +INSERT INTO source_table VALUES ((1, 'b'), 2, (26, 'b')); -- shard xxxxx -> shard xxxxx +INSERT INTO source_table VALUES ((2, 'c'), 3, (3, 'c')); -- shard xxxxx -> shard xxxxx +INSERT INTO source_table VALUES ((4, 'd'), 4, (27, 'd')); -- shard xxxxx -> shard xxxxx +INSERT INTO source_table VALUES ((30, 'e'), 5, (30, 'e')); -- shard xxxxx -> shard xxxxx +INSERT INTO source_table VALUES ((31, 'f'), 6, (31, 'f')); -- shard xxxxx -> shard xxxxx +INSERT INTO source_table VALUES ((32, 'g'), 7, (8, 'g')); -- shard xxxxx -> shard xxxxx +-- target +CREATE TABLE target_table(key composite_key_type, value int); +SELECT create_distributed_table('target_table', 'key', 'range'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CALL public.create_range_partitioned_shards('target_table', '{"(0,a)","(25,a)"}','{"(24,z)","(49,z)"}'); +-- colocated with target, used for routing calls to read_intermediate_results +CREATE TABLE colocated_with_target(key composite_key_type, value_sum int); +SELECT create_distributed_table('colocated_with_target', 'key', 'range'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CALL public.create_range_partitioned_shards('colocated_with_target', '{"(0,a)","(25,a)"}','{"(24,z)","(49,z)"}'); +-- one value per shard, so we can route calls to read_intermediate_shards +INSERT INTO colocated_with_target VALUES ((0,'a'), 0); +INSERT INTO colocated_with_target VALUES ((25, 'a'), 0); +BEGIN; +CREATE TABLE distributed_result_info AS + SELECT * FROM redistribute_task_list_results('test', $$ SELECT mapped_key, value FROM source_table $$, 'target_table'); +SELECT * FROM distributed_result_info ORDER BY shardid; + shardid | colocated_results +--------------------------------------------------------------------- + 4213602 | {test_from_4213600_to_0,test_from_4213601_to_0} + 4213603 | {test_from_4213600_to_1,test_from_4213601_to_1} +(2 rows) + +UPDATE colocated_with_target SET value_sum=(SELECT sum(y) FROM read_intermediate_results('{test_from_4213600_to_0,test_from_4213601_to_0}'::text[], 'binary') AS res (x composite_key_type, y int)) + WHERE key=(0,'a')::composite_key_type; +UPDATE colocated_with_target SET value_sum=(SELECT sum(y) FROM read_intermediate_results('{test_from_4213600_to_1,test_from_4213601_to_1}'::text[], 'binary') AS res (x composite_key_type, y int)) + WHERE key=(25,'a')::composite_key_type; +SELECT * FROM colocated_with_target ORDER BY key; + key | value_sum +--------------------------------------------------------------------- + (0,a) | 11 + (25,a) | 17 +(2 rows) + END; -DROP TABLE source_table, target_table, distributed_result_info; +-- verify that replicas of colocated_with_target are consistent (i.e. 
copies +-- of result files in both nodes were same when calling read_intermediate_results() +-- in the above UPDATE calls). +\c - - - :worker_1_port +SELECT * FROM distributed_intermediate_results.colocated_with_target_4213604 ORDER BY key; + key | value_sum +--------------------------------------------------------------------- + (0,a) | 11 +(1 row) + +SELECT * FROM distributed_intermediate_results.colocated_with_target_4213605 ORDER BY key; + key | value_sum +--------------------------------------------------------------------- + (25,a) | 17 +(1 row) + +\c - - - :worker_2_port +SELECT * FROM distributed_intermediate_results.colocated_with_target_4213604 ORDER BY key; + key | value_sum +--------------------------------------------------------------------- + (0,a) | 11 +(1 row) + +SELECT * FROM distributed_intermediate_results.colocated_with_target_4213605 ORDER BY key; + key | value_sum +--------------------------------------------------------------------- + (25,a) | 17 +(1 row) + +\c - - - :master_port +SET search_path TO 'distributed_intermediate_results'; +DROP TABLE source_table, target_table, colocated_with_target, distributed_result_info; +DROP TYPE composite_key_type; +-- +-- Case 4. target relation is a reference table or an append partitioned table +-- +CREATE TABLE source_table(a int); +SELECT create_distributed_table('source_table', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO source_table SELECT * FROM generate_series(1, 100); +CREATE TABLE target_table_reference(a int); +SELECT create_reference_table('target_table_reference'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE target_table_append(a int); +SELECT create_distributed_table('target_table_append', 'a', 'append'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +BEGIN; +CREATE TABLE distributed_result_info AS + SELECT * FROM redistribute_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table_reference'); +ERROR: repartitioning results of a tasklist is only supported when target relation is hash or range partitioned. +ROLLBACK; +BEGIN; +CREATE TABLE distributed_result_info AS + SELECT * FROM redistribute_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table_append'); +ERROR: repartitioning results of a tasklist is only supported when target relation is hash or range partitioned. 
+ROLLBACK; +-- clean-up SET client_min_messages TO WARNING; DROP SCHEMA distributed_intermediate_results CASCADE; \set VERBOSITY default SET client_min_messages TO DEFAULT; SET citus.shard_count TO DEFAULT; +SET citus.shard_replication_factor TO DEFAULT; diff --git a/src/test/regress/sql/distributed_intermediate_results.sql b/src/test/regress/sql/distributed_intermediate_results.sql index 2d59abf45..47a4d9373 100644 --- a/src/test/regress/sql/distributed_intermediate_results.sql +++ b/src/test/regress/sql/distributed_intermediate_results.sql @@ -3,14 +3,29 @@ CREATE SCHEMA distributed_intermediate_results; SET search_path TO 'distributed_intermediate_results'; SET citus.next_shard_id TO 4213581; +SET citus.shard_replication_factor TO 1; + +-- redistribute_task_list_results test the internal RedistributeTaskListResult +CREATE OR REPLACE FUNCTION pg_catalog.redistribute_task_list_results(resultIdPrefix text, + query text, + target_table regclass, + binaryFormat bool DEFAULT true) + RETURNS TABLE(shardid bigint, + colocated_results text[]) + LANGUAGE C STRICT VOLATILE + AS 'citus', $$redistribute_task_list_results$$; -- --- We don't have extensive tests for partition_task_results, since it will be --- tested by higher level "INSERT/SELECT with repartitioning" tests anyway. +-- We don't have extensive tests for partition_task_results or +-- redistribute_task_list_results, since they will be tested by higher level +-- "INSERT/SELECT with repartitioning" tests anyway. -- -- --- partition_task_list_results, hash partitioning, binary format +-- Case 1. +-- hash partitioning, binary format +-- * partition_task_list_results +-- * redistribute_task_list_results -- CREATE TABLE source_table(a int); @@ -22,6 +37,13 @@ CREATE TABLE target_table(a int); SET citus.shard_count TO 2; SELECT create_distributed_table('target_table', 'a'); +CREATE TABLE colocated_with_target(a int); +SELECT create_distributed_table('colocated_with_target', 'a'); +-- one value per shard, so we can route calls to read_intermediate_shards +INSERT INTO colocated_with_target VALUES (1), (2); + +-- partition_task_list_results + -- should error out SELECT partition_task_list_results('test', $$ SELECT avg(a) FROM source_table $$, 'target_table'); SELECT partition_task_list_results('test', $$ SELECT * FROM generate_series(1, 2) $$, 'target_table'); @@ -39,12 +61,36 @@ SELECT nodeport, fetch_intermediate_results((array_agg(resultId)), 'localhost', SELECT count(*), sum(x) FROM read_intermediate_results((SELECT array_agg(resultId) FROM distributed_result_info), 'binary') AS res (x int); -END; +ROLLBACK; -DROP TABLE source_table, target_table, distributed_result_info; +-- redistribute_task_list_results +-- Verify that redistribute_task_list_results colocated fragments properly by reading the +-- expected colocated results on the same node as each of two shards. 
+BEGIN; +CREATE TABLE distributed_result_info AS + SELECT * FROM redistribute_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table'); +SELECT * FROM distributed_result_info ORDER BY shardid; +WITH shard_1 AS ( + SELECT t.* FROM colocated_with_target, ( + SELECT * FROM read_intermediate_results('{test_from_4213581_to_0,test_from_4213582_to_0}'::text[], 'binary') AS res (x int)) t + WHERE colocated_with_target.a = 1 +), shard_2 AS ( + SELECT t.* FROM colocated_with_target, ( + SELECT * FROM read_intermediate_results('{test_from_4213582_to_1,test_from_4213583_to_1}'::text[], 'binary') AS res (x int)) t + WHERE colocated_with_target.a = 2 +), all_rows AS ( + (SELECT * FROM shard_1) UNION (SELECT * FROM shard_2) +) +SELECT count(*), sum(x) FROM all_rows; +ROLLBACK; + +DROP TABLE source_table, target_table, colocated_with_target; -- --- partition_task_list_results, range partitioning, text format +-- Case 2. +-- range partitioning, text format +-- * partition_task_list_results +-- * redistribute_task_list_results -- CREATE TABLE source_table(a int); SELECT create_distributed_table('source_table', 'a', 'range'); @@ -58,7 +104,15 @@ SELECT create_distributed_table('target_table', 'a', 'range'); CALL public.create_range_partitioned_shards('target_table', '{0,25,50,76}', '{24,49,75,200}'); +CREATE TABLE colocated_with_target(a int); +SELECT create_distributed_table('colocated_with_target', 'a', 'range'); +CALL public.create_range_partitioned_shards('colocated_with_target', + '{0,25,50,76}', + '{24,49,75,200}'); +-- one value per shard, so we can route calls to read_intermediate_shards +INSERT INTO colocated_with_target VALUES (1), (26), (51), (77); +-- partition_task_list_results BEGIN; CREATE TABLE distributed_result_info AS SELECT resultId, nodeport, rowcount, targetShardId, targetShardIndex @@ -74,9 +128,135 @@ SELECT nodeport, fetch_intermediate_results((array_agg(resultId)), 'localhost', SELECT count(*), sum(x) FROM read_intermediate_results((SELECT array_agg(resultId) FROM distributed_result_info), 'text') AS res (x int); +ROLLBACK; + +-- redistribute_task_list_results +-- Verify that redistribute_task_list_results colocated fragments properly by reading the +-- expected colocated results on the same node as each of two shards. 
+BEGIN; +CREATE TABLE distributed_result_info AS + SELECT * FROM redistribute_task_list_results('test', $$ SELECT (3 * a * a) % 100 FROM source_table $$, 'target_table'); +SELECT * FROM distributed_result_info ORDER BY shardid; + +WITH shard_1 AS ( + SELECT t.* FROM colocated_with_target, ( + SELECT * FROM read_intermediate_results('{test_from_4213588_to_0,test_from_4213589_to_0,test_from_4213590_to_0,test_from_4213591_to_0}'::text[], 'binary') AS res (x int)) t + WHERE colocated_with_target.a = 1 +), shard_2 AS ( + SELECT t.* FROM colocated_with_target, ( + SELECT * FROM read_intermediate_results('{test_from_4213588_to_1,test_from_4213589_to_1,test_from_4213590_to_1,test_from_4213591_to_1}'::text[], 'binary') AS res (x int)) t + WHERE colocated_with_target.a = 26 +), shard_3 AS ( + SELECT t.* FROM colocated_with_target, ( + SELECT * FROM read_intermediate_results('{test_from_4213588_to_2,test_from_4213589_to_2,test_from_4213590_to_2,test_from_4213591_to_2}'::text[], 'binary') AS res (x int)) t + WHERE colocated_with_target.a = 51 +), shard_4 AS ( + SELECT t.* FROM colocated_with_target, ( + SELECT * FROM read_intermediate_results('{test_from_4213588_to_3,test_from_4213589_to_3,test_from_4213590_to_3,test_from_4213591_to_3}'::text[], 'binary') AS res (x int)) t + WHERE colocated_with_target.a = 77 +), all_rows AS ( + (SELECT * FROM shard_1) UNION ALL (SELECT * FROM shard_2) UNION ALL + (SELECT * FROM shard_3) UNION ALL (SELECT * FROM shard_4) +) +SELECT count(*), sum(x) FROM all_rows; +ROLLBACK; + +DROP TABLE source_table, target_table, colocated_with_target; + + +-- +-- Case 3. +-- range partitioning, text format, replication factor 2 (both source and destination) +-- composite distribution column +-- +-- only redistribute_task_list_results +-- +CREATE TYPE composite_key_type AS (f1 int, f2 text); +SET citus.shard_replication_factor TO 2; + +-- source +CREATE TABLE source_table(key composite_key_type, value int, mapped_key composite_key_type); +SELECT create_distributed_table('source_table', 'key', 'range'); +CALL public.create_range_partitioned_shards('source_table', '{"(0,a)","(25,a)"}','{"(24,z)","(49,z)"}'); + +INSERT INTO source_table VALUES ((0, 'a'), 1, (0, 'a')); -- shard 1 -> shard 1 +INSERT INTO source_table VALUES ((1, 'b'), 2, (26, 'b')); -- shard 1 -> shard 2 +INSERT INTO source_table VALUES ((2, 'c'), 3, (3, 'c')); -- shard 1 -> shard 1 +INSERT INTO source_table VALUES ((4, 'd'), 4, (27, 'd')); -- shard 1 -> shard 2 +INSERT INTO source_table VALUES ((30, 'e'), 5, (30, 'e')); -- shard 2 -> shard 2 +INSERT INTO source_table VALUES ((31, 'f'), 6, (31, 'f')); -- shard 2 -> shard 2 +INSERT INTO source_table VALUES ((32, 'g'), 7, (8, 'g')); -- shard 2 -> shard 1 + +-- target +CREATE TABLE target_table(key composite_key_type, value int); +SELECT create_distributed_table('target_table', 'key', 'range'); +CALL public.create_range_partitioned_shards('target_table', '{"(0,a)","(25,a)"}','{"(24,z)","(49,z)"}'); + +-- colocated with target, used for routing calls to read_intermediate_results +CREATE TABLE colocated_with_target(key composite_key_type, value_sum int); +SELECT create_distributed_table('colocated_with_target', 'key', 'range'); +CALL public.create_range_partitioned_shards('colocated_with_target', '{"(0,a)","(25,a)"}','{"(24,z)","(49,z)"}'); +-- one value per shard, so we can route calls to read_intermediate_shards +INSERT INTO colocated_with_target VALUES ((0,'a'), 0); +INSERT INTO colocated_with_target VALUES ((25, 'a'), 0); + +BEGIN; +CREATE TABLE distributed_result_info AS 
+ SELECT * FROM redistribute_task_list_results('test', $$ SELECT mapped_key, value FROM source_table $$, 'target_table'); +SELECT * FROM distributed_result_info ORDER BY shardid; + +UPDATE colocated_with_target SET value_sum=(SELECT sum(y) FROM read_intermediate_results('{test_from_4213600_to_0,test_from_4213601_to_0}'::text[], 'binary') AS res (x composite_key_type, y int)) + WHERE key=(0,'a')::composite_key_type; +UPDATE colocated_with_target SET value_sum=(SELECT sum(y) FROM read_intermediate_results('{test_from_4213600_to_1,test_from_4213601_to_1}'::text[], 'binary') AS res (x composite_key_type, y int)) + WHERE key=(25,'a')::composite_key_type; + +SELECT * FROM colocated_with_target ORDER BY key; + END; -DROP TABLE source_table, target_table, distributed_result_info; +-- verify that replicas of colocated_with_target are consistent (i.e. copies +-- of result files in both nodes were same when calling read_intermediate_results() +-- in the above UPDATE calls). + +\c - - - :worker_1_port +SELECT * FROM distributed_intermediate_results.colocated_with_target_4213604 ORDER BY key; +SELECT * FROM distributed_intermediate_results.colocated_with_target_4213605 ORDER BY key; + +\c - - - :worker_2_port +SELECT * FROM distributed_intermediate_results.colocated_with_target_4213604 ORDER BY key; +SELECT * FROM distributed_intermediate_results.colocated_with_target_4213605 ORDER BY key; + +\c - - - :master_port + +SET search_path TO 'distributed_intermediate_results'; +DROP TABLE source_table, target_table, colocated_with_target, distributed_result_info; +DROP TYPE composite_key_type; + +-- +-- Case 4. target relation is a reference table or an append partitioned table +-- + +CREATE TABLE source_table(a int); +SELECT create_distributed_table('source_table', 'a'); +INSERT INTO source_table SELECT * FROM generate_series(1, 100); + +CREATE TABLE target_table_reference(a int); +SELECT create_reference_table('target_table_reference'); + +CREATE TABLE target_table_append(a int); +SELECT create_distributed_table('target_table_append', 'a', 'append'); + +BEGIN; +CREATE TABLE distributed_result_info AS + SELECT * FROM redistribute_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table_reference'); +ROLLBACK; + +BEGIN; +CREATE TABLE distributed_result_info AS + SELECT * FROM redistribute_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table_append'); +ROLLBACK; + +-- clean-up SET client_min_messages TO WARNING; DROP SCHEMA distributed_intermediate_results CASCADE; @@ -84,3 +264,4 @@ DROP SCHEMA distributed_intermediate_results CASCADE; \set VERBOSITY default SET client_min_messages TO DEFAULT; SET citus.shard_count TO DEFAULT; +SET citus.shard_replication_factor TO DEFAULT;