diff --git a/src/backend/distributed/executor/distributed_intermediate_results.c b/src/backend/distributed/executor/distributed_intermediate_results.c new file mode 100644 index 000000000..f68638ff7 --- /dev/null +++ b/src/backend/distributed/executor/distributed_intermediate_results.c @@ -0,0 +1,320 @@ +/*------------------------------------------------------------------------- + * + * distributed_intermediate_results.c + * Functions for reading and writing distributed intermediate results. + * + * Copyright (c), Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ +#include +#include + +#include "postgres.h" +#include "funcapi.h" +#include "miscadmin.h" +#include "port.h" + +#include "access/tupdesc.h" +#include "catalog/pg_type.h" +#include "distributed/intermediate_results.h" +#include "distributed/metadata_cache.h" +#include "distributed/multi_executor.h" +#include "distributed/transaction_management.h" +#include "distributed/tuplestore.h" +#include "distributed/worker_protocol.h" +#include "tcop/pquery.h" +#include "tcop/tcopprot.h" +#include "utils/builtins.h" +#include "utils/lsyscache.h" + + +/* forward declarations of local functions */ +static void WrapTasksForPartitioning(char *resultIdPrefix, List *selectTaskList, + DistTableCacheEntry *targetRelation, + bool binaryFormat); +static List * ExecutePartitionTaskList(List *partitionTaskList, + DistTableCacheEntry *targetRelation); +static ArrayType * CreateArrayFromDatums(Datum *datumArray, bool *nullsArray, int + datumCount, Oid typeId); +static void ShardMinMaxValueArrays(ShardInterval **shardIntervalArray, int shardCount, + Oid intervalTypeId, ArrayType **minValueArray, + ArrayType **maxValueArray); +static char * SourceShardPrefix(char *resultPrefix, uint64 shardId); +static DistributedResultFragment * TupleToDistributedResultFragment( + TupleTableSlot *tupleSlot, DistTableCacheEntry *targetRelation); +static Tuplestorestate * ExecuteSelectTasksIntoTupleStore(List *taskList, TupleDesc + resultDescriptor); + + +/* + * PartitionTasklistResults executes the given task list, and partitions results + * of each task based on targetRelation's distribution method and intervals. + * Each of the result partitions are stored in the node where task was executed, + * and are named as $resultIdPrefix_from_$sourceShardId_to_$targetShardIndex. + * + * Result is list of DistributedResultFragment, each of which represents a + * partition of results. Empty results are omitted. Therefore, if we have N tasks + * and target relation has M shards, we will have NxM-(number of empty results) + * fragments. + */ +List * +PartitionTasklistResults(char *resultIdPrefix, List *selectTaskList, + DistTableCacheEntry *targetRelation, + bool binaryFormat) +{ + /* + * Make sure that this transaction has a distributed transaction ID. + * + * Intermediate results will be stored in a directory that is derived + * from the distributed transaction ID. + */ + UseCoordinatedTransaction(); + + WrapTasksForPartitioning(resultIdPrefix, selectTaskList, targetRelation, + binaryFormat); + return ExecutePartitionTaskList(selectTaskList, targetRelation); +} + + +/* + * WrapTasksForPartitioning wraps the query for each of the tasks by a call + * to worker_partition_query_result(). Target list of the wrapped query should + * match the tuple descriptor in ExecutePartitionTaskList(). 
+ */ +static void +WrapTasksForPartitioning(char *resultIdPrefix, List *selectTaskList, + DistTableCacheEntry *targetRelation, + bool binaryFormat) +{ + ListCell *taskCell = NULL; + ShardInterval **shardIntervalArray = targetRelation->sortedShardIntervalArray; + int shardCount = targetRelation->shardIntervalArrayLength; + + ArrayType *minValueArray = NULL; + ArrayType *maxValueArray = NULL; + Var *partitionColumn = targetRelation->partitionColumn; + int partitionColumnIndex = partitionColumn->varoattno - 1; + Oid intervalTypeId = partitionColumn->vartype; + int32 intervalTypeMod = partitionColumn->vartypmod; + Oid intervalTypeOutFunc = InvalidOid; + bool intervalTypeVarlena = false; + getTypeOutputInfo(intervalTypeId, &intervalTypeOutFunc, &intervalTypeVarlena); + + ShardMinMaxValueArrays(shardIntervalArray, shardCount, intervalTypeOutFunc, + &minValueArray, &maxValueArray); + StringInfo minValuesString = ArrayObjectToString(minValueArray, TEXTOID, + intervalTypeMod); + StringInfo maxValuesString = ArrayObjectToString(maxValueArray, TEXTOID, + intervalTypeMod); + + foreach(taskCell, selectTaskList) + { + Task *selectTask = (Task *) lfirst(taskCell); + StringInfo wrappedQuery = makeStringInfo(); + List *shardPlacementList = selectTask->taskPlacementList; + + ShardPlacement *shardPlacement = linitial(shardPlacementList); + char *taskPrefix = SourceShardPrefix(resultIdPrefix, selectTask->anchorShardId); + char *partitionMethodString = targetRelation->partitionMethod == 'h' ? + "hash" : "range"; + const char *binaryFormatString = binaryFormat ? "true" : "false"; + + appendStringInfo(wrappedQuery, + "SELECT %d, partition_index" + ", %s || '_' || partition_index::text " + ", rows_written " + "FROM worker_partition_query_result" + "(%s,%s,%d,%s,%s,%s,%s) WHERE rows_written > 0", + shardPlacement->nodeId, + quote_literal_cstr(taskPrefix), + quote_literal_cstr(taskPrefix), + quote_literal_cstr(selectTask->queryString), + partitionColumnIndex, + quote_literal_cstr(partitionMethodString), + minValuesString->data, maxValuesString->data, + binaryFormatString); + + selectTask->queryString = wrappedQuery->data; + } +} + + +/* + * SourceShardPrefix returns result id prefix for partitions which have the + * given anchor shard id. + */ +static char * +SourceShardPrefix(char *resultPrefix, uint64 shardId) +{ + StringInfo taskPrefix = makeStringInfo(); + + appendStringInfo(taskPrefix, "%s_from_" UINT64_FORMAT "_to", resultPrefix, shardId); + + return taskPrefix->data; +} + + +/* + * ShardMinMaxValueArrays returns min values and max values of given shard + * intervals. Returned arrays are text arrays. 
+ */ +static void +ShardMinMaxValueArrays(ShardInterval **shardIntervalArray, int shardCount, + Oid intervalTypeOutFunc, ArrayType **minValueArray, + ArrayType **maxValueArray) +{ + Datum *minValues = palloc0(shardCount * sizeof(Datum)); + bool *minValueNulls = palloc0(shardCount * sizeof(bool)); + Datum *maxValues = palloc0(shardCount * sizeof(Datum)); + bool *maxValueNulls = palloc0(shardCount * sizeof(bool)); + for (int shardIndex = 0; shardIndex < shardCount; shardIndex++) + { + minValueNulls[shardIndex] = !shardIntervalArray[shardIndex]->minValueExists; + maxValueNulls[shardIndex] = !shardIntervalArray[shardIndex]->maxValueExists; + + if (!minValueNulls[shardIndex]) + { + Datum minValue = shardIntervalArray[shardIndex]->minValue; + char *minValueStr = DatumGetCString(OidFunctionCall1(intervalTypeOutFunc, + minValue)); + minValues[shardIndex] = CStringGetTextDatum(minValueStr); + } + + if (!maxValueNulls[shardIndex]) + { + Datum maxValue = shardIntervalArray[shardIndex]->maxValue; + char *maxValueStr = DatumGetCString(OidFunctionCall1(intervalTypeOutFunc, + maxValue)); + maxValues[shardIndex] = CStringGetTextDatum(maxValueStr); + } + } + + *minValueArray = CreateArrayFromDatums(minValues, minValueNulls, shardCount, TEXTOID); + *maxValueArray = CreateArrayFromDatums(maxValues, maxValueNulls, shardCount, TEXTOID); +} + + +/* + * CreateArrayFromDatums creates an array consisting of given values and nulls. + */ +static ArrayType * +CreateArrayFromDatums(Datum *datumArray, bool *nullsArray, int datumCount, Oid typeId) +{ + bool typeByValue = false; + char typeAlignment = 0; + int16 typeLength = 0; + int dimensions[1] = { datumCount }; + int lowerbounds[1] = { 1 }; + + get_typlenbyvalalign(typeId, &typeLength, &typeByValue, &typeAlignment); + + ArrayType *datumArrayObject = construct_md_array(datumArray, nullsArray, 1, + dimensions, + lowerbounds, typeId, typeLength, + typeByValue, typeAlignment); + + return datumArrayObject; +} + + +/* + * ExecutePartitionTaskList executes the queries formed in WrapTasksForPartitioning(), + * and returns its results as a list of DistributedResultFragment. + */ +static List * +ExecutePartitionTaskList(List *taskList, DistTableCacheEntry *targetRelation) +{ + TupleDesc resultDescriptor = NULL; + Tuplestorestate *resultStore = NULL; + int resultColumnCount = 4; + +#if PG_VERSION_NUM >= 120000 + resultDescriptor = CreateTemplateTupleDesc(resultColumnCount); +#else + resultDescriptor = CreateTemplateTupleDesc(resultColumnCount, false); +#endif + + TupleDescInitEntry(resultDescriptor, (AttrNumber) 1, "node_id", + INT8OID, -1, 0); + TupleDescInitEntry(resultDescriptor, (AttrNumber) 2, "partition_index", + INT4OID, -1, 0); + TupleDescInitEntry(resultDescriptor, (AttrNumber) 3, "result_id", + TEXTOID, -1, 0); + TupleDescInitEntry(resultDescriptor, (AttrNumber) 4, "rows_written", + INT8OID, -1, 0); + + resultStore = ExecuteSelectTasksIntoTupleStore(taskList, resultDescriptor); + + List *fragmentList = NIL; + TupleTableSlot *slot = MakeSingleTupleTableSlotCompat(resultDescriptor, + &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(resultStore, true, false, slot)) + { + DistributedResultFragment *distributedResultFragment = + TupleToDistributedResultFragment(slot, targetRelation); + + fragmentList = lappend(fragmentList, distributedResultFragment); + + ExecClearTuple(slot); + } + + return fragmentList; +} + + +/* + * TupleToDistributedResultFragment converts a tuple returned by the query in + * WrapTasksForPartitioning() to a DistributedResultFragment. 
+ */ +static DistributedResultFragment * +TupleToDistributedResultFragment(TupleTableSlot *tupleSlot, + DistTableCacheEntry *targetRelation) +{ + bool isNull = false; + int sourceNodeId = DatumGetInt32(slot_getattr(tupleSlot, 1, &isNull)); + int targetShardIndex = DatumGetInt32(slot_getattr(tupleSlot, 2, &isNull)); + text *resultId = DatumGetTextP(slot_getattr(tupleSlot, 3, &isNull)); + int64 rowCount = DatumGetInt64(slot_getattr(tupleSlot, 4, &isNull)); + + ShardInterval *shardInterval = + targetRelation->sortedShardIntervalArray[targetShardIndex]; + + DistributedResultFragment *distributedResultFragment = + palloc0(sizeof(DistributedResultFragment)); + + distributedResultFragment->nodeId = sourceNodeId; + distributedResultFragment->targetShardIndex = targetShardIndex; + distributedResultFragment->targetShardId = shardInterval->shardId; + distributedResultFragment->resultId = text_to_cstring(resultId); + distributedResultFragment->rowCount = rowCount; + + return distributedResultFragment; +} + + +/* + * ExecuteSelectTasksIntoTupleStore executes the given tasks and returns a tuple + * store containing its results. + */ +static Tuplestorestate * +ExecuteSelectTasksIntoTupleStore(List *taskList, TupleDesc resultDescriptor) +{ + bool hasReturning = true; + int targetPoolSize = MaxAdaptiveExecutorPoolSize; + bool randomAccess = true; + bool interTransactions = false; + TransactionProperties xactProperties = { + .errorOnAnyFailure = true, + .useRemoteTransactionBlocks = TRANSACTION_BLOCKS_REQUIRED, + .requires2PC = false + }; + + Tuplestorestate *resultStore = tuplestore_begin_heap(randomAccess, interTransactions, + work_mem); + + ExecuteTaskListExtended(ROW_MODIFY_READONLY, taskList, resultDescriptor, + resultStore, hasReturning, targetPoolSize, &xactProperties); + + return resultStore; +} diff --git a/src/backend/distributed/executor/intermediate_results.c b/src/backend/distributed/executor/intermediate_results.c index 6dc0438ce..3b1b0a967 100644 --- a/src/backend/distributed/executor/intermediate_results.c +++ b/src/backend/distributed/executor/intermediate_results.c @@ -834,12 +834,11 @@ fetch_intermediate_results(PG_FUNCTION_ARGS) if (PQstatus(connection->pgConn) != CONNECTION_OK) { - ereport(ERROR, (errmsg("cannot connect to %s:%d to fetch intermediate " - "results", + ereport(ERROR, (errmsg("cannot connect to %s:%d to fetch intermediate results", remoteHost, remotePort))); } - RemoteTransactionBegin(connection); + RemoteTransactionBeginIfNecessary(connection); for (resultIndex = 0; resultIndex < resultCount; resultIndex++) { @@ -848,9 +847,6 @@ fetch_intermediate_results(PG_FUNCTION_ARGS) totalBytesWritten += FetchRemoteIntermediateResult(connection, resultId); } - RemoteTransactionCommit(connection); - CloseConnection(connection); - PG_RETURN_INT64(totalBytesWritten); } diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 26b4c1ab3..67f786cfd 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -198,8 +198,6 @@ static List * MapTaskList(MapMergeJob *mapMergeJob, List *filterTaskList); static StringInfo CreateMapQueryString(MapMergeJob *mapMergeJob, Task *filterTask, char *partitionColumnName); static char * ColumnName(Var *column, List *rangeTableList); -static StringInfo SplitPointArrayString(ArrayType *splitPointObject, - Oid columnType, int32 columnTypeMod); static List * MergeTaskList(MapMergeJob *mapMergeJob, List 
*mapTaskList, uint32 taskIdIndex); static StringInfo ColumnNameArrayString(uint32 columnCount, uint64 generatingJobId); @@ -4277,9 +4275,9 @@ CreateMapQueryString(MapMergeJob *mapMergeJob, Task *filterTask, } ArrayType *splitPointObject = SplitPointObject(intervalArray, intervalCount); - StringInfo splitPointString = SplitPointArrayString(splitPointObject, - partitionColumnType, - partitionColumnTypeMod); + StringInfo splitPointString = ArrayObjectToString(splitPointObject, + partitionColumnType, + partitionColumnTypeMod); char *partitionCommand = NULL; if (partitionType == RANGE_PARTITION_TYPE) @@ -4407,14 +4405,12 @@ ColumnName(Var *column, List *rangeTableList) /* - * SplitPointArrayString takes the array representation of the given split point - * object, and converts this array (and array's typed elements) to their string - * representations. + * ArrayObjectToString converts an SQL object to its string representation. */ -static StringInfo -SplitPointArrayString(ArrayType *splitPointObject, Oid columnType, int32 columnTypeMod) +StringInfo +ArrayObjectToString(ArrayType *arrayObject, Oid columnType, int32 columnTypeMod) { - Datum splitPointDatum = PointerGetDatum(splitPointObject); + Datum arrayDatum = PointerGetDatum(arrayObject); Oid outputFunctionId = InvalidOid; bool typeVariableLength = false; @@ -4430,17 +4426,17 @@ SplitPointArrayString(ArrayType *splitPointObject, Oid columnType, int32 columnT getTypeOutputInfo(arrayOutType, &outputFunctionId, &typeVariableLength); fmgr_info(outputFunctionId, arrayOutFunction); - char *arrayOutputText = OutputFunctionCall(arrayOutFunction, splitPointDatum); + char *arrayOutputText = OutputFunctionCall(arrayOutFunction, arrayDatum); char *arrayOutputEscapedText = quote_literal_cstr(arrayOutputText); /* add an explicit cast to array's string representation */ char *arrayOutTypeName = format_type_with_typemod(arrayOutType, columnTypeMod); - StringInfo splitPointArrayString = makeStringInfo(); - appendStringInfo(splitPointArrayString, "%s::%s", + StringInfo arrayString = makeStringInfo(); + appendStringInfo(arrayString, "%s::%s", arrayOutputEscapedText, arrayOutTypeName); - return splitPointArrayString; + return arrayString; } diff --git a/src/backend/distributed/test/distributed_intermediate_results.c b/src/backend/distributed/test/distributed_intermediate_results.c new file mode 100644 index 000000000..43a4abbd8 --- /dev/null +++ b/src/backend/distributed/test/distributed_intermediate_results.c @@ -0,0 +1,91 @@ +/*------------------------------------------------------------------------- + * + * test/src/distributed_intermediate_results.c + * + * This file contains functions to test functions related to + * src/backend/distributed/executor/distributed_intermediate_results.c. + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ +#include +#include + +#include "postgres.h" +#include "funcapi.h" +#include "libpq-fe.h" +#include "miscadmin.h" +#include "pgstat.h" + +#include "distributed/commands/multi_copy.h" +#include "distributed/connection_management.h" +#include "distributed/intermediate_results.h" +#include "distributed/multi_executor.h" +#include "distributed/remote_commands.h" +#include "distributed/tuplestore.h" +#include "tcop/tcopprot.h" + +PG_FUNCTION_INFO_V1(partition_task_list_results); + +/* + * partition_task_list_results partitions results of each of distributed + * tasks for the given query with the ranges of the given relation. 
+ * Partitioned results for a task are stored on the node that the task + * was targeted for. + */ +Datum +partition_task_list_results(PG_FUNCTION_ARGS) +{ + text *resultIdPrefixText = PG_GETARG_TEXT_P(0); + char *resultIdPrefix = text_to_cstring(resultIdPrefixText); + text *queryText = PG_GETARG_TEXT_P(1); + char *queryString = text_to_cstring(queryText); + Oid relationId = PG_GETARG_OID(2); + bool binaryFormat = PG_GETARG_BOOL(3); + + Query *parsedQuery = ParseQueryString(queryString, NULL, 0); + PlannedStmt *queryPlan = pg_plan_query(parsedQuery, + CURSOR_OPT_PARALLEL_OK, + NULL); + if (!IsCitusCustomScan(queryPlan->planTree)) + { + ereport(ERROR, (errmsg("query must be distributed and shouldn't require " + "any merging on the coordinator."))); + } + + CustomScan *customScan = (CustomScan *) queryPlan->planTree; + DistributedPlan *distributedPlan = GetDistributedPlan(customScan); + + Job *job = distributedPlan->workerJob; + List *taskList = job->taskList; + + DistTableCacheEntry *distTableCacheEntry = DistributedTableCacheEntry(relationId); + List *fragmentList = PartitionTasklistResults(resultIdPrefix, taskList, + distTableCacheEntry, binaryFormat); + + TupleDesc tupleDescriptor = NULL; + Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor); + + ListCell *fragmentCell = NULL; + + foreach(fragmentCell, fragmentList) + { + DistributedResultFragment *fragment = lfirst(fragmentCell); + + bool columnNulls[5] = { 0 }; + Datum columnValues[5] = { + CStringGetTextDatum(fragment->resultId), + Int32GetDatum(fragment->nodeId), + Int64GetDatum(fragment->rowCount), + Int64GetDatum(fragment->targetShardId), + Int32GetDatum(fragment->targetShardIndex) + }; + + tuplestore_putvalues(tupleStore, tupleDescriptor, columnValues, columnNulls); + } + + tuplestore_donestoring(tupleStore); + + PG_RETURN_DATUM(0); +} diff --git a/src/include/distributed/intermediate_results.h b/src/include/distributed/intermediate_results.h index 1e5985d60..b637a37c6 100644 --- a/src/include/distributed/intermediate_results.h +++ b/src/include/distributed/intermediate_results.h @@ -22,6 +22,33 @@ #include "utils/palloc.h" +/* + * DistributedResultFragment represents a fragment of a distributed result. + */ +typedef struct DistributedResultFragment +{ + /* result's id, which can be used by read_intermediate_results(), ... */ + char *resultId; + + /* location of the result */ + int nodeId; + + /* number of rows in the result file */ + int rowCount; + + /* + * The fragment contains the rows which match the partitioning method + * and partitioning ranges of targetShardId. The shape of each row matches + * the schema of the relation to which targetShardId belongs to. + */ + uint64 targetShardId; + + /* what is the index of targetShardId in its relation's sorted shard list? 
*/ + int targetShardIndex; +} DistributedResultFragment; + + +/* intermediate_results.c */ extern DestReceiver * CreateRemoteFileDestReceiver(char *resultId, EState *executorState, List *initialNodeList, bool writeLocalFile); @@ -32,5 +59,9 @@ extern int64 IntermediateResultSize(char *resultId); extern char * QueryResultFileName(const char *resultId); extern char * CreateIntermediateResultsDirectory(void); +/* distributed_intermediate_results.c */ +extern List * PartitionTasklistResults(char *resultIdPrefix, List *selectTaskList, + DistTableCacheEntry *distributionScheme, + bool binaryFormat); #endif /* INTERMEDIATE_RESULTS_H */ diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 344fbb315..4b193fabf 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -385,6 +385,8 @@ extern bool ShardIntervalsOverlap(ShardInterval *firstInterval, extern bool CoPartitionedTables(Oid firstRelationId, Oid secondRelationId); extern ShardInterval ** GenerateSyntheticShardIntervalArray(int partitionCount); extern RowModifyLevel RowModifyLevelForQuery(Query *query); +extern StringInfo ArrayObjectToString(ArrayType *arrayObject, + Oid columnType, int32 columnTypeMod); /* function declarations for Task and Task list operations */ diff --git a/src/test/regress/expected/distributed_intermediate_results.out b/src/test/regress/expected/distributed_intermediate_results.out new file mode 100644 index 000000000..cc7ad8af9 --- /dev/null +++ b/src/test/regress/expected/distributed_intermediate_results.out @@ -0,0 +1,159 @@ +-- Test functions for partitioning intermediate results +CREATE SCHEMA distributed_intermediate_results; +SET search_path TO 'distributed_intermediate_results'; +SET citus.next_shard_id TO 4213581; +-- +-- Helper UDFs +-- +-- partition_task_list_results tests the internal PartitionTasklistResults function +CREATE OR REPLACE FUNCTION pg_catalog.partition_task_list_results(resultIdPrefix text, + query text, + target_table regclass, + binaryFormat bool DEFAULT true) + RETURNS TABLE(resultId text, + nodeId int, + rowCount bigint, + targetShardId bigint, + targetShardIndex int) + LANGUAGE C STRICT VOLATILE + AS 'citus', $$partition_task_list_results$$; +-- +-- We don't have extensive tests for partition_task_results, since it will be +-- tested by higher level "INSERT/SELECT with repartitioning" tests anyway. +-- +-- +-- partition_task_list_results, hash partitioning, binary format +-- +CREATE TABLE source_table(a int); +SET citus.shard_count TO 3; +SELECT create_distributed_table('source_table', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO source_table SELECT * FROM generate_series(1, 100); +CREATE TABLE target_table(a int); +SET citus.shard_count TO 2; +SELECT create_distributed_table('target_table', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- should error out +SELECT partition_task_list_results('test', $$ SELECT avg(a) FROM source_table $$, 'target_table'); +ERROR: query must be distributed and shouldn't require any merging on the coordinator. +SELECT partition_task_list_results('test', $$ SELECT * FROM generate_series(1, 2) $$, 'target_table'); +ERROR: query must be distributed and shouldn't require any merging on the coordinator. 
+BEGIN; +CREATE TABLE distributed_result_info AS + SELECT resultId, nodeport, rowcount, targetShardId, targetShardIndex + FROM partition_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table') + NATURAL JOIN pg_dist_node; +SELECT * FROM distributed_result_info ORDER BY resultId; + resultid | nodeport | rowcount | targetshardid | targetshardindex +--------------------------------------------------------------------- + test_from_4213581_to_0 | 57637 | 33 | 4213584 | 0 + test_from_4213582_to_0 | 57638 | 16 | 4213584 | 0 + test_from_4213582_to_1 | 57638 | 15 | 4213585 | 1 + test_from_4213583_to_1 | 57637 | 36 | 4213585 | 1 +(4 rows) + +-- fetch from workers +SELECT nodeport, fetch_intermediate_results((array_agg(resultId)), 'localhost', nodeport) > 0 AS fetched + FROM distributed_result_info GROUP BY nodeport ORDER BY nodeport; + nodeport | fetched +--------------------------------------------------------------------- + 57637 | t + 57638 | t +(2 rows) + +-- read all fetched result files +SELECT count(*), sum(x) FROM + read_intermediate_results((SELECT array_agg(resultId) FROM distributed_result_info), + 'binary') AS res (x int); + count | sum +--------------------------------------------------------------------- + 100 | 5050 +(1 row) + +END; +DROP TABLE source_table, target_table, distributed_result_info; +-- +-- partition_task_list_results, range partitioning, text format +-- +CREATE TABLE source_table(a int); +SELECT create_distributed_table('source_table', 'a', 'range'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CALL public.create_range_partitioned_shards('source_table', + '{0,25,50,76}', + '{24,49,75,200}'); +INSERT INTO source_table SELECT * FROM generate_series(1, 100); +CREATE TABLE target_table(a int); +SELECT create_distributed_table('target_table', 'a', 'range'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CALL public.create_range_partitioned_shards('target_table', + '{0,25,50,76}', + '{24,49,75,200}'); +BEGIN; +CREATE TABLE distributed_result_info AS + SELECT resultId, nodeport, rowcount, targetShardId, targetShardIndex + FROM partition_task_list_results('test', $$ SELECT (3 * a * a) % 100 FROM source_table $$, + 'target_table', false) + NATURAL JOIN pg_dist_node; +SELECT * FROM distributed_result_info ORDER BY resultId; + resultid | nodeport | rowcount | targetshardid | targetshardindex +--------------------------------------------------------------------- + test_from_4213586_to_0 | 57638 | 7 | 4213590 | 0 + test_from_4213586_to_1 | 57638 | 6 | 4213591 | 1 + test_from_4213586_to_2 | 57638 | 7 | 4213592 | 2 + test_from_4213586_to_3 | 57638 | 4 | 4213593 | 3 + test_from_4213587_to_0 | 57637 | 7 | 4213590 | 0 + test_from_4213587_to_1 | 57637 | 6 | 4213591 | 1 + test_from_4213587_to_2 | 57637 | 8 | 4213592 | 2 + test_from_4213587_to_3 | 57637 | 4 | 4213593 | 3 + test_from_4213588_to_0 | 57638 | 8 | 4213590 | 0 + test_from_4213588_to_1 | 57638 | 6 | 4213591 | 1 + test_from_4213588_to_2 | 57638 | 8 | 4213592 | 2 + test_from_4213588_to_3 | 57638 | 4 | 4213593 | 3 + test_from_4213589_to_0 | 57637 | 8 | 4213590 | 0 + test_from_4213589_to_1 | 57637 | 6 | 4213591 | 1 + test_from_4213589_to_2 | 57637 | 7 | 4213592 | 2 + test_from_4213589_to_3 | 57637 | 4 | 4213593 | 3 +(16 rows) + +-- fetch from workers +SELECT nodeport, fetch_intermediate_results((array_agg(resultId)), 'localhost', nodeport) > 0 AS fetched + FROM distributed_result_info 
GROUP BY nodeport ORDER BY nodeport; + nodeport | fetched +--------------------------------------------------------------------- + 57637 | t + 57638 | t +(2 rows) + +-- Read all fetched result files. Sum(x) should be 4550, verified by +-- racket -e '(for/sum ([i (range 1 101)]) (modulo (* 3 i i) 100))' +SELECT count(*), sum(x) FROM + read_intermediate_results((SELECT array_agg(resultId) FROM distributed_result_info), + 'text') AS res (x int); + count | sum +--------------------------------------------------------------------- + 100 | 4550 +(1 row) + +END; +DROP TABLE source_table, target_table, distributed_result_info; +SET client_min_messages TO WARNING; +DROP SCHEMA distributed_intermediate_results CASCADE; +\set VERBOSITY default +SET client_min_messages TO DEFAULT; +SET citus.shard_count TO DEFAULT; diff --git a/src/test/regress/expected/multi_test_helpers.out b/src/test/regress/expected/multi_test_helpers.out index 850f143a1..1b17445c8 100644 --- a/src/test/regress/expected/multi_test_helpers.out +++ b/src/test/regress/expected/multi_test_helpers.out @@ -119,3 +119,19 @@ WITH dist_node_summary AS ( SELECT dist_node_check.matches AND dist_placement_check.matches FROM dist_node_check CROSS JOIN dist_placement_check $$; +-- +-- Procedure for creating shards for range partitioned distributed table. +-- +CREATE OR REPLACE PROCEDURE create_range_partitioned_shards(rel regclass, minvalues text[], maxvalues text[]) +AS $$ +DECLARE + new_shardid bigint; + idx int; +BEGIN + FOR idx IN SELECT * FROM generate_series(1, array_length(minvalues, 1)) + LOOP + SELECT master_create_empty_shard(rel::text) INTO new_shardid; + UPDATE pg_dist_shard SET shardminvalue=minvalues[idx], shardmaxvalue=maxvalues[idx] WHERE shardid=new_shardid; + END LOOP; +END; +$$ LANGUAGE plpgsql; diff --git a/src/test/regress/expected/partitioned_intermediate_results.out b/src/test/regress/expected/partitioned_intermediate_results.out index 12bfb4a9a..36d4fe47f 100644 --- a/src/test/regress/expected/partitioned_intermediate_results.out +++ b/src/test/regress/expected/partitioned_intermediate_results.out @@ -341,22 +341,6 @@ BEGIN RAISE NOTICE 'PASSED.'; END; $$ LANGUAGE plpgsql; --- --- Procedure for creating shards for range partitioned distributed table. --- -CREATE OR REPLACE PROCEDURE create_range_partitioned_shards(rel regclass, minvalues text[], maxvalues text[]) -AS $$ -DECLARE - new_shardid bigint; - idx int; -BEGIN - FOR idx IN SELECT * FROM generate_series(1, array_length(minvalues, 1)) - LOOP - SELECT master_create_empty_shard(rel::text) INTO new_shardid; - UPDATE pg_dist_shard SET shardminvalue=minvalues[idx], shardmaxvalue=maxvalues[idx] WHERE shardid=new_shardid; - END LOOP; -END; -$$ LANGUAGE plpgsql; \set VERBOSITY terse -- hash partitioning, 32 shards SET citus.shard_count TO 32; @@ -436,7 +420,7 @@ SELECT create_distributed_table('t', 'key', 'range'); (1 row) -CALL create_range_partitioned_shards('t', '{0,25,50,76}', +CALL public.create_range_partitioned_shards('t', '{0,25,50,76}', '{24,49,75,200}'); CALL test_partition_query_results('t', 'SELECT x, x * x * x FROM generate_series(1, 105) x'); NOTICE: Rows per partition match ... 
@@ -451,7 +435,7 @@ SELECT create_distributed_table('t', 'key', 'range'); (1 row) -CALL create_range_partitioned_shards('t', '{0,25,50,100}', +CALL public.create_range_partitioned_shards('t', '{0,25,50,100}', '{24,49,75,200}'); CALL test_partition_query_results('t', 'SELECT x, x * x * x FROM generate_series(1, 105) x'); ERROR: could not find shard for partition column value @@ -464,7 +448,7 @@ SELECT create_distributed_table('t', 'key', 'range'); (1 row) -CALL create_range_partitioned_shards('t', '{0,25,50,76}', +CALL public.create_range_partitioned_shards('t', '{0,25,50,76}', '{50,49,90,200}'); CALL test_partition_query_results('t', 'SELECT x, x * x * x FROM generate_series(1, 105) x'); NOTICE: Rows per partition match ... @@ -481,7 +465,7 @@ SELECT create_distributed_table('t', 'key', 'range'); (1 row) -CALL create_range_partitioned_shards('t', '{"(0,a)","(25,a)","(50,a)","(75,a)"}', +CALL public.create_range_partitioned_shards('t', '{"(0,a)","(25,a)","(50,a)","(75,a)"}', '{"(24,z)","(49,z)","(74,z)","(100,z)"}'); CALL test_partition_query_results('t', 'SELECT (x, ''f2_'' || x::text)::composite_key_type, x * x * x FROM generate_series(1, 100) x'); NOTICE: Rows per partition match ... @@ -497,7 +481,7 @@ SELECT create_distributed_table('t', 'key', 'range'); (1 row) -CALL create_range_partitioned_shards('t', '{50,25,76,0}', +CALL public.create_range_partitioned_shards('t', '{50,25,76,0}', '{75,49,200,24}'); CALL test_partition_query_results('t', 'SELECT x, x * x * x FROM generate_series(1, 105) x'); NOTICE: Rows per partition match ... diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 37f086b68..b813aec5a 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -70,7 +70,7 @@ test: subquery_prepared_statements pg12 # Miscellaneous tests to check our query planning behavior # ---------- test: multi_deparse_shard_query multi_distributed_transaction_id multi_real_time_transaction intermediate_results limit_intermediate_size -test: multi_explain hyperscale_tutorial partitioned_intermediate_results +test: multi_explain hyperscale_tutorial partitioned_intermediate_results distributed_intermediate_results test: multi_basic_queries multi_complex_expressions multi_subquery multi_subquery_complex_queries multi_subquery_behavioral_analytics test: multi_subquery_complex_reference_clause multi_subquery_window_functions multi_view multi_sql_function multi_prepare_sql test: sql_procedure multi_function_in_join row_types materialized_view diff --git a/src/test/regress/sql/distributed_intermediate_results.sql b/src/test/regress/sql/distributed_intermediate_results.sql new file mode 100644 index 000000000..4b5cecaa6 --- /dev/null +++ b/src/test/regress/sql/distributed_intermediate_results.sql @@ -0,0 +1,103 @@ +-- Test functions for partitioning intermediate results +CREATE SCHEMA distributed_intermediate_results; +SET search_path TO 'distributed_intermediate_results'; + +SET citus.next_shard_id TO 4213581; + +-- +-- Helper UDFs +-- + +-- partition_task_list_results tests the internal PartitionTasklistResults function +CREATE OR REPLACE FUNCTION pg_catalog.partition_task_list_results(resultIdPrefix text, + query text, + target_table regclass, + binaryFormat bool DEFAULT true) + RETURNS TABLE(resultId text, + nodeId int, + rowCount bigint, + targetShardId bigint, + targetShardIndex int) + LANGUAGE C STRICT VOLATILE + AS 'citus', $$partition_task_list_results$$; + +-- +-- We don't have extensive tests for partition_task_results, since it will be 
+-- tested by higher level "INSERT/SELECT with repartitioning" tests anyway. +-- + +-- +-- partition_task_list_results, hash partitioning, binary format +-- + +CREATE TABLE source_table(a int); +SET citus.shard_count TO 3; +SELECT create_distributed_table('source_table', 'a'); +INSERT INTO source_table SELECT * FROM generate_series(1, 100); + +CREATE TABLE target_table(a int); +SET citus.shard_count TO 2; +SELECT create_distributed_table('target_table', 'a'); + +-- should error out +SELECT partition_task_list_results('test', $$ SELECT avg(a) FROM source_table $$, 'target_table'); +SELECT partition_task_list_results('test', $$ SELECT * FROM generate_series(1, 2) $$, 'target_table'); + +BEGIN; +CREATE TABLE distributed_result_info AS + SELECT resultId, nodeport, rowcount, targetShardId, targetShardIndex + FROM partition_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table') + NATURAL JOIN pg_dist_node; +SELECT * FROM distributed_result_info ORDER BY resultId; +-- fetch from workers +SELECT nodeport, fetch_intermediate_results((array_agg(resultId)), 'localhost', nodeport) > 0 AS fetched + FROM distributed_result_info GROUP BY nodeport ORDER BY nodeport; +-- read all fetched result files +SELECT count(*), sum(x) FROM + read_intermediate_results((SELECT array_agg(resultId) FROM distributed_result_info), + 'binary') AS res (x int); +END; + +DROP TABLE source_table, target_table, distributed_result_info; + +-- +-- partition_task_list_results, range partitioning, text format +-- +CREATE TABLE source_table(a int); +SELECT create_distributed_table('source_table', 'a', 'range'); +CALL public.create_range_partitioned_shards('source_table', + '{0,25,50,76}', + '{24,49,75,200}'); +INSERT INTO source_table SELECT * FROM generate_series(1, 100); + +CREATE TABLE target_table(a int); +SELECT create_distributed_table('target_table', 'a', 'range'); +CALL public.create_range_partitioned_shards('target_table', + '{0,25,50,76}', + '{24,49,75,200}'); + +BEGIN; +CREATE TABLE distributed_result_info AS + SELECT resultId, nodeport, rowcount, targetShardId, targetShardIndex + FROM partition_task_list_results('test', $$ SELECT (3 * a * a) % 100 FROM source_table $$, + 'target_table', false) + NATURAL JOIN pg_dist_node; +SELECT * FROM distributed_result_info ORDER BY resultId; +-- fetch from workers +SELECT nodeport, fetch_intermediate_results((array_agg(resultId)), 'localhost', nodeport) > 0 AS fetched + FROM distributed_result_info GROUP BY nodeport ORDER BY nodeport; +-- Read all fetched result files. Sum(x) should be 4550, verified by +-- racket -e '(for/sum ([i (range 1 101)]) (modulo (* 3 i i) 100))' +SELECT count(*), sum(x) FROM + read_intermediate_results((SELECT array_agg(resultId) FROM distributed_result_info), + 'text') AS res (x int); +END; + +DROP TABLE source_table, target_table, distributed_result_info; + +SET client_min_messages TO WARNING; +DROP SCHEMA distributed_intermediate_results CASCADE; + +\set VERBOSITY default +SET client_min_messages TO DEFAULT; +SET citus.shard_count TO DEFAULT; diff --git a/src/test/regress/sql/multi_test_helpers.sql b/src/test/regress/sql/multi_test_helpers.sql index c20cb7b85..3383c6714 100644 --- a/src/test/regress/sql/multi_test_helpers.sql +++ b/src/test/regress/sql/multi_test_helpers.sql @@ -124,3 +124,20 @@ WITH dist_node_summary AS ( SELECT dist_node_check.matches AND dist_placement_check.matches FROM dist_node_check CROSS JOIN dist_placement_check $$; + +-- +-- Procedure for creating shards for range partitioned distributed table. 
+-- +CREATE OR REPLACE PROCEDURE create_range_partitioned_shards(rel regclass, minvalues text[], maxvalues text[]) +AS $$ +DECLARE + new_shardid bigint; + idx int; +BEGIN + FOR idx IN SELECT * FROM generate_series(1, array_length(minvalues, 1)) + LOOP + SELECT master_create_empty_shard(rel::text) INTO new_shardid; + UPDATE pg_dist_shard SET shardminvalue=minvalues[idx], shardmaxvalue=maxvalues[idx] WHERE shardid=new_shardid; + END LOOP; +END; +$$ LANGUAGE plpgsql; diff --git a/src/test/regress/sql/partitioned_intermediate_results.sql b/src/test/regress/sql/partitioned_intermediate_results.sql index 8d4d93f13..f6d0ce272 100644 --- a/src/test/regress/sql/partitioned_intermediate_results.sql +++ b/src/test/regress/sql/partitioned_intermediate_results.sql @@ -262,23 +262,6 @@ BEGIN END; $$ LANGUAGE plpgsql; --- --- Procedure for creating shards for range partitioned distributed table. --- -CREATE OR REPLACE PROCEDURE create_range_partitioned_shards(rel regclass, minvalues text[], maxvalues text[]) -AS $$ -DECLARE - new_shardid bigint; - idx int; -BEGIN - FOR idx IN SELECT * FROM generate_series(1, array_length(minvalues, 1)) - LOOP - SELECT master_create_empty_shard(rel::text) INTO new_shardid; - UPDATE pg_dist_shard SET shardminvalue=minvalues[idx], shardmaxvalue=maxvalues[idx] WHERE shardid=new_shardid; - END LOOP; -END; -$$ LANGUAGE plpgsql; - \set VERBOSITY terse -- hash partitioning, 32 shards @@ -319,7 +302,7 @@ DROP TABLE t; -- range partitioning, int partition column CREATE TABLE t(key int, value int); SELECT create_distributed_table('t', 'key', 'range'); -CALL create_range_partitioned_shards('t', '{0,25,50,76}', +CALL public.create_range_partitioned_shards('t', '{0,25,50,76}', '{24,49,75,200}'); CALL test_partition_query_results('t', 'SELECT x, x * x * x FROM generate_series(1, 105) x'); DROP TABLE t; @@ -327,7 +310,7 @@ DROP TABLE t; -- not covering ranges, should ERROR CREATE TABLE t(key int, value int); SELECT create_distributed_table('t', 'key', 'range'); -CALL create_range_partitioned_shards('t', '{0,25,50,100}', +CALL public.create_range_partitioned_shards('t', '{0,25,50,100}', '{24,49,75,200}'); CALL test_partition_query_results('t', 'SELECT x, x * x * x FROM generate_series(1, 105) x'); DROP TABLE t; @@ -335,7 +318,7 @@ DROP TABLE t; -- overlapping ranges, we allow this in range partitioned distributed tables, should be fine CREATE TABLE t(key int, value int); SELECT create_distributed_table('t', 'key', 'range'); -CALL create_range_partitioned_shards('t', '{0,25,50,76}', +CALL public.create_range_partitioned_shards('t', '{0,25,50,76}', '{50,49,90,200}'); CALL test_partition_query_results('t', 'SELECT x, x * x * x FROM generate_series(1, 105) x'); DROP TABLE t; @@ -345,7 +328,7 @@ CREATE TYPE composite_key_type AS (f1 int, f2 text); SET citus.shard_count TO 8; CREATE TABLE t(key composite_key_type, value int); SELECT create_distributed_table('t', 'key', 'range'); -CALL create_range_partitioned_shards('t', '{"(0,a)","(25,a)","(50,a)","(75,a)"}', +CALL public.create_range_partitioned_shards('t', '{"(0,a)","(25,a)","(50,a)","(75,a)"}', '{"(24,z)","(49,z)","(74,z)","(100,z)"}'); CALL test_partition_query_results('t', 'SELECT (x, ''f2_'' || x::text)::composite_key_type, x * x * x FROM generate_series(1, 100) x'); DROP TABLE t; @@ -354,7 +337,7 @@ DROP TYPE composite_key_type; -- unsorted ranges CREATE TABLE t(key int, value int); SELECT create_distributed_table('t', 'key', 'range'); -CALL create_range_partitioned_shards('t', '{50,25,76,0}', +CALL 
public.create_range_partitioned_shards('t', '{50,25,76,0}', '{75,49,200,24}'); CALL test_partition_query_results('t', 'SELECT x, x * x * x FROM generate_series(1, 105) x'); DROP TABLE t;
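
Note on the generated worker queries: as a rough, hypothetical sketch (made-up shard id 4213581, node id 1, a two-shard hash-distributed target table as in the regression test above, and an invented task query string), each task rewritten by WrapTasksForPartitioning() ends up looking roughly like the following; none of these literal values come from this patch.

    SELECT 1,                                         -- node id of the task's first placement
           partition_index,
           'test_from_4213581_to' || '_' || partition_index::text,
           rows_written
    FROM worker_partition_query_result(
             'test_from_4213581_to',                  -- SourceShardPrefix() for the anchor shard
             'SELECT a FROM source_table_4213581',    -- hypothetical original task query
             0,                                       -- zero-based partition column index
             'hash',                                  -- target relation's partition method
             '{-2147483648,0}'::text[],               -- shard min values (ShardMinMaxValueArrays)
             '{-1,2147483647}'::text[],               -- shard max values
             true)                                    -- binaryFormat
    WHERE rows_written > 0;

Each returned row corresponds to one DistributedResultFragment (node_id, partition_index, result_id, rows_written); ExecutePartitionTaskList() reads these rows from the tuple store, and TupleToDistributedResultFragment() resolves partition_index to the target shard id via targetRelation->sortedShardIntervalArray.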