From b4e5f4b10a24da49b2f66bac00de5a59a2672e9b Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Fri, 10 Jan 2020 15:42:18 -0800 Subject: [PATCH] Implement INSERT ... SELECT with repartitioning --- src/backend/distributed/commands/multi_copy.c | 25 ++ .../distributed_intermediate_results.c | 22 +- .../executor/insert_select_executor.c | 309 +++++++++++++++++- .../executor/intermediate_results.c | 2 +- .../distributed/planner/recursive_planning.c | 87 ++++- .../test/distributed_intermediate_results.c | 22 +- src/include/distributed/commands/multi_copy.h | 1 + .../distributed/intermediate_results.h | 2 + src/include/distributed/recursive_planning.h | 4 + .../expected/insert_select_repartition.out | 162 +++++++++ .../expected/intermediate_result_pruning.out | 6 +- .../regress/expected/multi_insert_select.out | 93 ++++-- .../expected/multi_insert_select_conflict.out | 18 +- ...lti_insert_select_non_pushable_queries.out | 2 - .../multi_mx_function_call_delegation.out | 2 +- .../expected/multi_reference_table.out | 4 +- src/test/regress/expected/with_dml.out | 3 +- src/test/regress/multi_schedule | 2 +- .../regress/sql/insert_select_repartition.sql | 77 +++++ 19 files changed, 764 insertions(+), 79 deletions(-) create mode 100644 src/test/regress/expected/insert_select_repartition.out create mode 100644 src/test/regress/sql/insert_select_repartition.sql diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index d41c737a4..07004df60 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -88,6 +88,7 @@ #include "foreign/foreign.h" #include "libpq/pqformat.h" #include "nodes/makefuncs.h" +#include "nodes/nodeFuncs.h" #include "tsearch/ts_locale.h" #include "utils/builtins.h" #include "utils/lsyscache.h" @@ -832,6 +833,30 @@ CanUseBinaryCopyFormat(TupleDesc tupleDescription) } +/* + * CanUseBinaryCopyFormatForTargetList returns true if we can use binary + * copy 
format for all columns of the given target list. + */ +bool +CanUseBinaryCopyFormatForTargetList(List *targetEntryList) +{ + ListCell *targetEntryCell = NULL; + foreach(targetEntryCell, targetEntryList) + { + TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); + Node *targetExpr = (Node *) targetEntry->expr; + + Oid columnType = exprType(targetExpr); + if (!CanUseBinaryCopyFormatForType(columnType)) + { + return false; + } + } + + return true; +} + + /* * CanUseBinaryCopyFormatForType determines whether it is safe to use the * binary copy format for the given type. The binary copy format cannot diff --git a/src/backend/distributed/executor/distributed_intermediate_results.c b/src/backend/distributed/executor/distributed_intermediate_results.c index d03b7e4db..095995856 100644 --- a/src/backend/distributed/executor/distributed_intermediate_results.c +++ b/src/backend/distributed/executor/distributed_intermediate_results.c @@ -54,6 +54,7 @@ typedef struct NodeToNodeFragmentsTransfer /* forward declarations of local functions */ static void WrapTasksForPartitioning(char *resultIdPrefix, List *selectTaskList, + int partitionColumnIndex, DistTableCacheEntry *targetRelation, bool binaryFormat); static List * ExecutePartitionTaskList(List *partitionTaskList, @@ -89,9 +90,13 @@ static void ExecuteFetchTaskList(List *fetchTaskList); * * returnValue[shardIndex] is list of cstrings each of which is a resultId which * correspond to targetRelation->sortedShardIntervalArray[shardIndex]. + * + * partitionColumnIndex determines the column in the selectTaskList to use for + * partitioning. 
*/ List ** RedistributeTaskListResults(char *resultIdPrefix, List *selectTaskList, + int partitionColumnIndex, DistTableCacheEntry *targetRelation, bool binaryFormat) { @@ -104,6 +109,7 @@ RedistributeTaskListResults(char *resultIdPrefix, List *selectTaskList, UseCoordinatedTransaction(); List *fragmentList = PartitionTasklistResults(resultIdPrefix, selectTaskList, + partitionColumnIndex, targetRelation, binaryFormat); return ColocateFragmentsWithRelation(fragmentList, targetRelation); } @@ -119,9 +125,13 @@ RedistributeTaskListResults(char *resultIdPrefix, List *selectTaskList, * partition of results. Empty results are omitted. Therefore, if we have N tasks * and target relation has M shards, we will have NxM-(number of empty results) * fragments. + * + * partitionColumnIndex determines the column in the selectTaskList to use for + * partitioning. */ List * PartitionTasklistResults(char *resultIdPrefix, List *selectTaskList, + int partitionColumnIndex, DistTableCacheEntry *targetRelation, bool binaryFormat) { @@ -141,7 +151,8 @@ PartitionTasklistResults(char *resultIdPrefix, List *selectTaskList, */ UseCoordinatedTransaction(); - WrapTasksForPartitioning(resultIdPrefix, selectTaskList, targetRelation, + WrapTasksForPartitioning(resultIdPrefix, selectTaskList, + partitionColumnIndex, targetRelation, binaryFormat); return ExecutePartitionTaskList(selectTaskList, targetRelation); } @@ -154,6 +165,7 @@ PartitionTasklistResults(char *resultIdPrefix, List *selectTaskList, */ static void WrapTasksForPartitioning(char *resultIdPrefix, List *selectTaskList, + int partitionColumnIndex, DistTableCacheEntry *targetRelation, bool binaryFormat) { @@ -164,11 +176,13 @@ WrapTasksForPartitioning(char *resultIdPrefix, List *selectTaskList, ArrayType *minValueArray = NULL; ArrayType *maxValueArray = NULL; Var *partitionColumn = targetRelation->partitionColumn; - int partitionColumnIndex = partitionColumn->varoattno - 1; - Oid intervalTypeId = partitionColumn->vartype; - int32 
intervalTypeMod = partitionColumn->vartypmod; + Oid intervalTypeId = InvalidOid; + int32 intervalTypeMod = 0; Oid intervalTypeOutFunc = InvalidOid; bool intervalTypeVarlena = false; + + GetIntervalTypeInfo(targetRelation->partitionMethod, partitionColumn, + &intervalTypeId, &intervalTypeMod); getTypeOutputInfo(intervalTypeId, &intervalTypeOutFunc, &intervalTypeVarlena); ShardMinMaxValueArrays(shardIntervalArray, shardCount, intervalTypeOutFunc, diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index 93d8f3aad..0a5f33bf7 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -13,19 +13,24 @@ #include "distributed/citus_ruleutils.h" #include "distributed/commands/multi_copy.h" +#include "distributed/adaptive_executor.h" #include "distributed/distributed_execution_locks.h" #include "distributed/insert_select_executor.h" #include "distributed/insert_select_planner.h" +#include "distributed/intermediate_results.h" #include "distributed/local_executor.h" #include "distributed/multi_executor.h" #include "distributed/multi_partitioning_utils.h" #include "distributed/multi_physical_planner.h" -#include "distributed/adaptive_executor.h" +#include "distributed/listutils.h" +#include "distributed/metadata_cache.h" #include "distributed/multi_router_planner.h" #include "distributed/distributed_planner.h" #include "distributed/recursive_planning.h" #include "distributed/relation_access_tracking.h" #include "distributed/resource_lock.h" +#include "distributed/shardinterval_utils.h" +#include "distributed/subplan_execution.h" #include "distributed/transaction_management.h" #include "executor/executor.h" #include "nodes/execnodes.h" @@ -63,6 +68,14 @@ static List * BuildColumnNameListFromTargetList(Oid targetRelationId, List *insertTargetList); static int PartitionColumnIndexFromColumnList(Oid relationId, List 
*columnNameList); static void AddInsertSelectCasts(List *targetList, TupleDesc destTupleDescriptor); +static bool IsSupportedRedistributionTarget(Oid targetRelationId); +static List * RedistributedInsertSelectTaskList(Query *insertSelectQuery, + DistTableCacheEntry *targetRelation, + List **redistributedResults, + bool useBinaryFormat); +static int PartitionColumnIndex(List *insertTargetList, Var *partitionColumn); +static bool IsRedistributablePlan(Plan *selectPlan, bool hasReturning, bool + hasOnConflict); /* @@ -115,11 +128,9 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) Oid targetRelationId = insertRte->relid; char *intermediateResultIdPrefix = distributedPlan->intermediateResultIdPrefix; bool hasReturning = distributedPlan->hasReturning; + bool hasOnConflict = insertSelectQuery->onConflict != NULL; HTAB *shardStateHash = NULL; - ereport(DEBUG1, (errmsg("Collecting INSERT ... SELECT results on coordinator"))); - - /* * INSERT .. SELECT via coordinator consists of two steps, a SELECT is * followd by a COPY. If the SELECT is executed locally, then the COPY @@ -157,8 +168,99 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) LockPartitionRelations(targetRelationId, RowExclusiveLock); } - if (insertSelectQuery->onConflict || hasReturning) + if (IsRedistributablePlan(selectPlan->planTree, hasReturning, hasOnConflict) && + IsSupportedRedistributionTarget(targetRelationId)) { + ereport(DEBUG1, (errmsg("performing repartitioned INSERT ... 
SELECT"))); + + DistributedPlan *distSelectPlan = + GetDistributedPlan((CustomScan *) selectPlan->planTree); + Job *distSelectJob = distSelectPlan->workerJob; + List *distSelectTaskList = distSelectJob->taskList; + TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState); + bool randomAccess = true; + bool interTransactions = false; + bool binaryFormat = + CanUseBinaryCopyFormatForTargetList(selectQuery->targetList); + + ExecuteSubPlans(distSelectPlan); + + /* + * We have a separate directory for each transaction, so choosing + * the same result prefix won't cause filename conflicts. Results + * directory name also includes node id and database id, so we don't + * need to include them in the filename. Jobs are executed + * sequentially, so we also don't need to include job id here. + */ + char *distResultPrefix = "repartitioned_results"; + + DistTableCacheEntry *targetRelation = + DistributedTableCacheEntry(targetRelationId); + + int partitionColumnIndex = + PartitionColumnIndex(insertTargetList, targetRelation->partitionColumn); + if (partitionColumnIndex == -1) + { + char *relationName = get_rel_name(targetRelationId); + Oid schemaOid = get_rel_namespace(targetRelationId); + char *schemaName = get_namespace_name(schemaOid); + + ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg( + "the partition column of table %s should have a value", + quote_qualified_identifier(schemaName, + relationName)))); + } + + TargetEntry *selectPartitionTE = list_nth(selectQuery->targetList, + partitionColumnIndex); + const char *partitionColumnName = selectPartitionTE->resname ? 
+ selectPartitionTE->resname : "(none)"; + + ereport(DEBUG2, (errmsg( + "partitioning SELECT query by column index %d with name %s", + partitionColumnIndex, quote_literal_cstr( + partitionColumnName)))); + + List **redistributedResults = RedistributeTaskListResults(distResultPrefix, + distSelectTaskList, + partitionColumnIndex, + targetRelation, + binaryFormat); + + /* + * At this point select query has been executed on workers and results + * have been fetched in such a way that they are colocated with corresponding + * target shard. Create and execute a list of tasks of form + * INSERT INTO ... SELECT * FROM read_intermediate_results(...); + */ + List *taskList = RedistributedInsertSelectTaskList(insertSelectQuery, + targetRelation, + redistributedResults, + binaryFormat); + + scanState->tuplestorestate = + tuplestore_begin_heap(randomAccess, interTransactions, work_mem); + + TransactionProperties xactProperties = { + .errorOnAnyFailure = true, + .useRemoteTransactionBlocks = TRANSACTION_BLOCKS_REQUIRED, + .requires2PC = false + }; + int64 rowsInserted = ExecuteTaskListExtended(ROW_MODIFY_COMMUTATIVE, taskList, + tupleDescriptor, + scanState->tuplestorestate, + hasReturning, + MaxAdaptiveExecutorPoolSize, + &xactProperties); + + executorState->es_processed = rowsInserted; + } + else if (insertSelectQuery->onConflict || hasReturning) + { + ereport(DEBUG1, (errmsg( + "Collecting INSERT ... SELECT results on coordinator"))); + /* * If we also have a workerJob that means there is a second step * to the INSERT...SELECT. This happens when there is a RETURNING @@ -222,6 +324,9 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) } else { + ereport(DEBUG1, (errmsg( + "Collecting INSERT ... 
SELECT results on coordinator"))); + ExecutePlanIntoRelation(targetRelationId, insertTargetList, selectPlan, executorState); } @@ -578,7 +683,9 @@ ExecutingInsertSelect(void) /* - * AddInsertSelectCasts + * AddInsertSelectCasts makes sure that the types in columns in targetList + * have the same type as given tuple descriptor by adding necessary type + * casts. */ static void AddInsertSelectCasts(List *targetList, TupleDesc destTupleDescriptor) @@ -605,3 +712,193 @@ AddInsertSelectCasts(List *targetList, TupleDesc destTupleDescriptor) } } } + + +/* + * IsSupportedRedistributionTarget determines whether re-partitioning into the + * given target relation is supported. + */ +static bool +IsSupportedRedistributionTarget(Oid targetRelationId) +{ + DistTableCacheEntry *tableEntry = DistributedTableCacheEntry(targetRelationId); + + /* only range and hash-distributed tables are currently supported */ + if (tableEntry->partitionMethod != DISTRIBUTE_BY_HASH && + tableEntry->partitionMethod != DISTRIBUTE_BY_RANGE) + { + return false; + } + + return true; +} + + +/* + * RedistributedInsertSelectTaskList returns a task list to insert given + * redistributedResults into the given target relation. + * redistributedResults[shardIndex] is list of cstrings each of which is + * a result name which should be inserted into + * targetRelation->sortedShardIntervalArray[shardIndex]. + */ +static List * +RedistributedInsertSelectTaskList(Query *insertSelectQuery, + DistTableCacheEntry *targetRelation, + List **redistributedResults, + bool useBinaryFormat) +{ + List *taskList = NIL; + + /* + * Make a copy of the INSERT ... SELECT. We'll repeatedly replace the + * subquery of insertResultQuery for different intermediate results and + * then deparse it. 
+ */ + Query *insertResultQuery = copyObject(insertSelectQuery); + RangeTblEntry *insertRte = ExtractResultRelationRTE(insertResultQuery); + RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertResultQuery); + List *selectTargetList = selectRte->subquery->targetList; + Oid targetRelationId = targetRelation->relationId; + + int shardCount = targetRelation->shardIntervalArrayLength; + int shardOffset = 0; + uint32 taskIdIndex = 1; + uint64 jobId = INVALID_JOB_ID; + + Relation distributedRelation = heap_open(targetRelationId, RowExclusiveLock); + TupleDesc destTupleDescriptor = RelationGetDescr(distributedRelation); + + /* + * If the type of an insert column differs from the type of the + * corresponding target table column, cast the insert column to the + * target table's column type. + */ + AddInsertSelectCasts(insertSelectQuery->targetList, destTupleDescriptor); + + for (shardOffset = 0; shardOffset < shardCount; shardOffset++) + { + ShardInterval *targetShardInterval = + targetRelation->sortedShardIntervalArray[shardOffset]; + List *resultIdList = redistributedResults[targetShardInterval->shardIndex]; + uint64 shardId = targetShardInterval->shardId; + StringInfo queryString = makeStringInfo(); + + /* skip empty tasks */ + if (resultIdList == NIL) + { + continue; + } + + /* sort result ids for consistent test output */ + List *sortedResultIds = SortList(resultIdList, pg_qsort_strcmp); + + /* generate the query on the intermediate result */ + Query *fragmentSetQuery = BuildReadIntermediateResultsArrayQuery(selectTargetList, + NIL, + sortedResultIds, + useBinaryFormat); + + /* put the intermediate result query in the INSERT..SELECT */ + selectRte->subquery = fragmentSetQuery; + + /* setting an alias simplifies deparsing of RETURNING */ + if (insertRte->alias == NULL) + { + Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL); + insertRte->alias = alias; + } + + /* + * Generate a query string for the query that inserts into a shard and reads + * from an intermediate result.
+ * + * Since CTEs have already been converted to intermediate results, they need + * to be removed from the query. Otherwise, worker queries include both + * intermediate results and CTEs in the query. + */ + insertResultQuery->cteList = NIL; + deparse_shard_query(insertResultQuery, targetRelationId, shardId, queryString); + ereport(DEBUG2, (errmsg("distributed statement: %s", queryString->data))); + + LockShardDistributionMetadata(shardId, ShareLock); + List *insertShardPlacementList = FinalizedShardPlacementList(shardId); + + RelationShard *relationShard = CitusMakeNode(RelationShard); + relationShard->relationId = targetShardInterval->relationId; + relationShard->shardId = targetShardInterval->shardId; + + Task *modifyTask = CreateBasicTask(jobId, taskIdIndex, MODIFY_TASK, + queryString->data); + modifyTask->anchorShardId = shardId; + modifyTask->taskPlacementList = insertShardPlacementList; + modifyTask->relationShardList = list_make1(relationShard); + modifyTask->replicationModel = targetRelation->replicationModel; + + taskList = lappend(taskList, modifyTask); + + taskIdIndex++; + } + + heap_close(distributedRelation, NoLock); + + return taskList; +} + + +/* + * PartitionColumnIndex finds the index of given partition column in the + * given target list. + */ +static int +PartitionColumnIndex(List *insertTargetList, Var *partitionColumn) +{ + TargetEntry *insertTargetEntry = NULL; + int targetEntryIndex = 0; + foreach_ptr(insertTargetEntry, insertTargetList) + { + if (insertTargetEntry->resno == partitionColumn->varattno) + { + return targetEntryIndex; + } + + targetEntryIndex++; + } + + return -1; +} + + +/* + * IsRedistributablePlan returns true if the given plan is a redistributable plan.
+ */ +static bool +IsRedistributablePlan(Plan *selectPlan, bool hasReturning, bool hasOnConflict) +{ + if (hasReturning || hasOnConflict) + { + return false; + } + + /* don't redistribute if query is not distributed or requires merge on coordinator */ + if (!IsCitusCustomScan(selectPlan)) + { + return false; + } + + DistributedPlan *distSelectPlan = + GetDistributedPlan((CustomScan *) selectPlan); + Job *distSelectJob = distSelectPlan->workerJob; + List *distSelectTaskList = distSelectJob->taskList; + + /* + * Don't use redistribution if only one task. This is to keep the existing + * behaviour for CTEs that the last step is a read_intermediate_result() + * call. It doesn't hurt much in other case too. + */ + if (list_length(distSelectTaskList) <= 1) + { + return false; + } + + return true; +} diff --git a/src/backend/distributed/executor/intermediate_results.c b/src/backend/distributed/executor/intermediate_results.c index 9366e9dfc..4b7197363 100644 --- a/src/backend/distributed/executor/intermediate_results.c +++ b/src/backend/distributed/executor/intermediate_results.c @@ -894,7 +894,7 @@ FetchRemoteIntermediateResult(MultiConnection *connection, char *resultId) while (true) { - int waitFlags = WL_SOCKET_READABLE; + int waitFlags = WL_SOCKET_READABLE | WL_POSTMASTER_DEATH; CopyStatus copyStatus = CopyDataFromConnection(connection, &fileCompat, &totalBytesWritten); diff --git a/src/backend/distributed/planner/recursive_planning.c b/src/backend/distributed/planner/recursive_planning.c index a71953a42..18cd2f8f8 100644 --- a/src/backend/distributed/planner/recursive_planning.c +++ b/src/backend/distributed/planner/recursive_planning.c @@ -176,6 +176,10 @@ static bool ContainsReferencesToOuterQueryWalker(Node *node, static void WrapFunctionsInSubqueries(Query *query); static void TransformFunctionRTE(RangeTblEntry *rangeTblEntry); static bool ShouldTransformRTE(RangeTblEntry *rangeTableEntry); +static Query * BuildReadIntermediateResultsQuery(List *targetEntryList, 
+ List *columnAliasList, + Const *resultIdConst, Oid functionOid, + bool useBinaryCopyFormat); /* * GenerateSubplansForSubqueriesAndCTEs is a wrapper around RecursivelyPlanSubqueriesAndCTEs. @@ -1541,6 +1545,72 @@ ShouldTransformRTE(RangeTblEntry *rangeTableEntry) */ Query * BuildSubPlanResultQuery(List *targetEntryList, List *columnAliasList, char *resultId) +{ + Oid functionOid = CitusReadIntermediateResultFuncId(); + bool useBinaryCopyFormat = CanUseBinaryCopyFormatForTargetList(targetEntryList); + + Const *resultIdConst = makeNode(Const); + resultIdConst->consttype = TEXTOID; + resultIdConst->consttypmod = -1; + resultIdConst->constlen = -1; + resultIdConst->constvalue = CStringGetTextDatum(resultId); + resultIdConst->constbyval = false; + resultIdConst->constisnull = false; + resultIdConst->location = -1; + + return BuildReadIntermediateResultsQuery(targetEntryList, columnAliasList, + resultIdConst, functionOid, + useBinaryCopyFormat); +} + + +/* + * BuildReadIntermediateResultsArrayQuery returns a query of the form: + * + * SELECT + * <target list> + * FROM + * read_intermediate_results(ARRAY['<resultId>', ...]::text[], '<copy format>') + * AS res (<column definition list>); + * + * The caller can optionally supply a columnAliasList, which is useful for + * CTEs that have column aliases. + * + * If useBinaryCopyFormat is true, then 'binary' format is used. Otherwise, + * 'text' format is used.
+ */ +Query * +BuildReadIntermediateResultsArrayQuery(List *targetEntryList, + List *columnAliasList, + List *resultIdList, + bool useBinaryCopyFormat) +{ + Oid functionOid = CitusReadIntermediateResultArrayFuncId(); + + Const *resultIdConst = makeNode(Const); + resultIdConst->consttype = TEXTARRAYOID; + resultIdConst->consttypmod = -1; + resultIdConst->constlen = -1; + resultIdConst->constvalue = PointerGetDatum(strlist_to_textarray(resultIdList)); + resultIdConst->constbyval = false; + resultIdConst->constisnull = false; + resultIdConst->location = -1; + + return BuildReadIntermediateResultsQuery(targetEntryList, columnAliasList, + resultIdConst, functionOid, + useBinaryCopyFormat); +} + + +/* + * BuildReadIntermediateResultsQuery is the common code for generating + * queries to read from result files. It is used by + * BuildReadIntermediateResultsArrayQuery and BuildSubPlanResultQuery. + */ +static Query * +BuildReadIntermediateResultsQuery(List *targetEntryList, List *columnAliasList, + Const *resultIdConst, Oid functionOid, + bool useBinaryCopyFormat) { List *funcColNames = NIL; List *funcColTypes = NIL; @@ -1549,7 +1619,6 @@ BuildSubPlanResultQuery(List *targetEntryList, List *columnAliasList, char *resu ListCell *targetEntryCell = NULL; List *targetList = NIL; int columnNumber = 1; - bool useBinaryCopyFormat = true; Oid copyFormatId = BinaryCopyFormatId(); int columnAliasCount = list_length(columnAliasList); @@ -1608,23 +1677,9 @@ BuildSubPlanResultQuery(List *targetEntryList, List *columnAliasList, char *resu targetList = lappend(targetList, newTargetEntry); - if (useBinaryCopyFormat && !CanUseBinaryCopyFormatForType(columnType)) - { - useBinaryCopyFormat = false; - } - columnNumber++; } - Const *resultIdConst = makeNode(Const); - resultIdConst->consttype = TEXTOID; - resultIdConst->consttypmod = -1; - resultIdConst->constlen = -1; - resultIdConst->constvalue = CStringGetTextDatum(resultId); - resultIdConst->constbyval = false; - resultIdConst->constisnull 
= false; - resultIdConst->location = -1; - /* build the citus_copy_format parameter for the call to read_intermediate_result */ if (!useBinaryCopyFormat) { @@ -1642,7 +1697,7 @@ BuildSubPlanResultQuery(List *targetEntryList, List *columnAliasList, char *resu /* build the call to read_intermediate_result */ FuncExpr *funcExpr = makeNode(FuncExpr); - funcExpr->funcid = CitusReadIntermediateResultFuncId(); + funcExpr->funcid = functionOid; funcExpr->funcretset = true; funcExpr->funcvariadic = false; funcExpr->funcformat = 0; diff --git a/src/backend/distributed/test/distributed_intermediate_results.c b/src/backend/distributed/test/distributed_intermediate_results.c index 474a350df..1c449f69c 100644 --- a/src/backend/distributed/test/distributed_intermediate_results.c +++ b/src/backend/distributed/test/distributed_intermediate_results.c @@ -63,9 +63,18 @@ partition_task_list_results(PG_FUNCTION_ARGS) Job *job = distributedPlan->workerJob; List *taskList = job->taskList; - DistTableCacheEntry *distTableCacheEntry = DistributedTableCacheEntry(relationId); + DistTableCacheEntry *targetRelation = DistributedTableCacheEntry(relationId); + + /* + * Here SELECT query's target list should match column list of target relation, + * so their partition column indexes are equal. + */ + int partitionColumnIndex = targetRelation->partitionMethod != DISTRIBUTE_BY_NONE ? 
+ targetRelation->partitionColumn->varattno - 1 : 0; + List *fragmentList = PartitionTasklistResults(resultIdPrefix, taskList, - distTableCacheEntry, binaryFormat); + partitionColumnIndex, + targetRelation, binaryFormat); TupleDesc tupleDescriptor = NULL; Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor); @@ -126,7 +135,16 @@ redistribute_task_list_results(PG_FUNCTION_ARGS) List *taskList = job->taskList; DistTableCacheEntry *targetRelation = DistributedTableCacheEntry(relationId); + + /* + * Here SELECT query's target list should match column list of target relation, + * so their partition column indexes are equal. + */ + int partitionColumnIndex = targetRelation->partitionMethod != DISTRIBUTE_BY_NONE ? + targetRelation->partitionColumn->varattno - 1 : 0; + List **shardResultIds = RedistributeTaskListResults(resultIdPrefix, taskList, + partitionColumnIndex, targetRelation, binaryFormat); TupleDesc tupleDescriptor = NULL; diff --git a/src/include/distributed/commands/multi_copy.h b/src/include/distributed/commands/multi_copy.h index a7a7708c5..e01cf9317 100644 --- a/src/include/distributed/commands/multi_copy.h +++ b/src/include/distributed/commands/multi_copy.h @@ -130,6 +130,7 @@ extern CitusCopyDestReceiver * CreateCitusCopyDestReceiver(Oid relationId, char *intermediateResultPrefix); extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat); extern bool CanUseBinaryCopyFormat(TupleDesc tupleDescription); +extern bool CanUseBinaryCopyFormatForTargetList(List *targetEntryList); extern bool CanUseBinaryCopyFormatForType(Oid typeId); extern void AppendCopyRowData(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor, diff --git a/src/include/distributed/intermediate_results.h b/src/include/distributed/intermediate_results.h index 61e670fbc..028e25d54 100644 --- a/src/include/distributed/intermediate_results.h +++ b/src/include/distributed/intermediate_results.h @@ -62,9 +62,11 @@ extern char * 
CreateIntermediateResultsDirectory(void); /* distributed_intermediate_results.c */ extern List ** RedistributeTaskListResults(char *resultIdPrefix, List *selectTaskList, + int partitionColumnIndex, DistTableCacheEntry *targetRelation, bool binaryFormat); extern List * PartitionTasklistResults(char *resultIdPrefix, List *selectTaskList, + int partitionColumnIndex, DistTableCacheEntry *distributionScheme, bool binaryFormat); diff --git a/src/include/distributed/recursive_planning.h b/src/include/distributed/recursive_planning.h index 99081c66f..006ee1c08 100644 --- a/src/include/distributed/recursive_planning.h +++ b/src/include/distributed/recursive_planning.h @@ -28,6 +28,10 @@ extern List * GenerateSubplansForSubqueriesAndCTEs(uint64 planId, Query *origina extern char * GenerateResultId(uint64 planId, uint32 subPlanId); extern Query * BuildSubPlanResultQuery(List *targetEntryList, List *columnAliasList, char *resultId); +extern Query * BuildReadIntermediateResultsArrayQuery(List *targetEntryList, + List *columnAliasList, + List *resultIdList, + bool useBinaryCopyFormat); extern bool GeneratingSubplans(void); #endif /* RECURSIVE_PLANNING_H */ diff --git a/src/test/regress/expected/insert_select_repartition.out b/src/test/regress/expected/insert_select_repartition.out new file mode 100644 index 000000000..68b889f1c --- /dev/null +++ b/src/test/regress/expected/insert_select_repartition.out @@ -0,0 +1,162 @@ +-- tests behaviour of INSERT INTO ... SELECT with repartitioning +CREATE SCHEMA insert_select_repartition; +SET search_path TO 'insert_select_repartition'; +SET citus.next_shard_id TO 4213581; +SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; +-- Test 1 +-- 4 shards, hash distributed. +-- Negate distribution column value. 
+SET citus.shard_count TO 4; +CREATE TABLE source_table(a int); +SELECT create_distributed_table('source_table', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO source_table SELECT * FROM generate_series(1, 10); +CREATE TABLE target_table(a int); +SELECT create_distributed_table('target_table', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SET client_min_messages TO DEBUG2; +INSERT INTO target_table SELECT -a FROM source_table; +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match +DETAIL: Subquery contains an operator in the same position as the target table's partition column. +HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: performing repartitioned INSERT ... SELECT +DEBUG: partitioning SELECT query by column index 0 with name '?column?' +DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213585 AS citus_table_alias (a) SELECT "?column?" FROM read_intermediate_results('{repartitioned_results_from_4213583_to_0,repartitioned_results_from_4213584_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result("?column?" integer) +DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213586 AS citus_table_alias (a) SELECT "?column?" FROM read_intermediate_results('{repartitioned_results_from_4213582_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result("?column?" integer) +DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213587 AS citus_table_alias (a) SELECT "?column?" 
FROM read_intermediate_results('{repartitioned_results_from_4213581_to_2,repartitioned_results_from_4213582_to_2,repartitioned_results_from_4213584_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result("?column?" integer) +DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213588 AS citus_table_alias (a) SELECT "?column?" FROM read_intermediate_results('{repartitioned_results_from_4213581_to_3}'::text[], 'binary'::citus_copy_format) intermediate_result("?column?" integer) +RESET client_min_messages; +SELECT * FROM target_table WHERE a=-1 OR a=-3 OR a=-7 ORDER BY a; + a +--------------------------------------------------------------------- + -7 + -3 + -1 +(3 rows) + +DROP TABLE source_table, target_table; +-- +-- Test 2. +-- range partitioning, composite distribution column +CREATE TYPE composite_key_type AS (f1 int, f2 text); +-- source +CREATE TABLE source_table(f1 int, key composite_key_type, value int, mapped_key composite_key_type); +SELECT create_distributed_table('source_table', 'key', 'range'); +NOTICE: using statement-based replication +DETAIL: Streaming replication is supported only for hash-distributed tables. 
+ create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CALL public.create_range_partitioned_shards('source_table', '{"(0,a)","(25,a)"}','{"(24,z)","(49,z)"}'); +INSERT INTO source_table VALUES (0, (0, 'a'), 1, (0, 'a')); +INSERT INTO source_table VALUES (1, (1, 'b'), 2, (26, 'b')); +INSERT INTO source_table VALUES (2, (2, 'c'), 3, (3, 'c')); +INSERT INTO source_table VALUES (3, (4, 'd'), 4, (27, 'd')); +INSERT INTO source_table VALUES (4, (30, 'e'), 5, (30, 'e')); +INSERT INTO source_table VALUES (5, (31, 'f'), 6, (31, 'f')); +INSERT INTO source_table VALUES (6, (32, 'g'), 50, (8, 'g')); +-- target +CREATE TABLE target_table(f1 int DEFAULT 0, value int, key composite_key_type); +SELECT create_distributed_table('target_table', 'key', 'range'); +NOTICE: using statement-based replication +DETAIL: Streaming replication is supported only for hash-distributed tables. + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CALL public.create_range_partitioned_shards('target_table', '{"(0,a)","(25,a)"}','{"(24,z)","(49,z)"}'); +SET client_min_messages TO DEBUG2; +INSERT INTO target_table SELECT f1, value, mapped_key FROM source_table; +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match +DETAIL: The target table's partition column should correspond to a partition column in the subquery. +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: performing repartitioned INSERT ... 
SELECT +DEBUG: partitioning SELECT query by column index 2 with name 'mapped_key' +DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213591 AS citus_table_alias (f1, value, key) SELECT f1, value, mapped_key FROM read_intermediate_results('{repartitioned_results_from_4213589_to_0,repartitioned_results_from_4213590_to_0}'::text[], 'text'::citus_copy_format) intermediate_result(f1 integer, value integer, mapped_key insert_select_repartition.composite_key_type) +DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213592 AS citus_table_alias (f1, value, key) SELECT f1, value, mapped_key FROM read_intermediate_results('{repartitioned_results_from_4213589_to_1,repartitioned_results_from_4213590_to_1}'::text[], 'text'::citus_copy_format) intermediate_result(f1 integer, value integer, mapped_key insert_select_repartition.composite_key_type) +RESET client_min_messages; +SELECT * FROM target_table ORDER BY key; + f1 | value | key +--------------------------------------------------------------------- + 0 | 1 | (0,a) + 2 | 3 | (3,c) + 6 | 50 | (8,g) + 1 | 2 | (26,b) + 3 | 4 | (27,d) + 4 | 5 | (30,e) + 5 | 6 | (31,f) +(7 rows) + +SELECT * FROM target_table WHERE key = (26, 'b')::composite_key_type; + f1 | value | key +--------------------------------------------------------------------- + 1 | 2 | (26,b) +(1 row) + +-- with explicit column names +TRUNCATE target_table; +SET client_min_messages TO DEBUG2; +INSERT INTO target_table(value, key) SELECT value, mapped_key FROM source_table; +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match +DETAIL: The target table's partition column should correspond to a partition column in the subquery. +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: performing repartitioned INSERT ... 
SELECT +DEBUG: partitioning SELECT query by column index 2 with name 'mapped_key' +DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213591 AS citus_table_alias (f1, value, key) SELECT f1, value, mapped_key FROM read_intermediate_results('{repartitioned_results_from_4213589_to_0,repartitioned_results_from_4213590_to_0}'::text[], 'text'::citus_copy_format) intermediate_result(f1 integer, value integer, mapped_key insert_select_repartition.composite_key_type) +DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213592 AS citus_table_alias (f1, value, key) SELECT f1, value, mapped_key FROM read_intermediate_results('{repartitioned_results_from_4213589_to_1,repartitioned_results_from_4213590_to_1}'::text[], 'text'::citus_copy_format) intermediate_result(f1 integer, value integer, mapped_key insert_select_repartition.composite_key_type) +RESET client_min_messages; +SELECT * FROM target_table ORDER BY key; + f1 | value | key +--------------------------------------------------------------------- + 0 | 1 | (0,a) + 0 | 3 | (3,c) + 0 | 50 | (8,g) + 0 | 2 | (26,b) + 0 | 4 | (27,d) + 0 | 5 | (30,e) + 0 | 6 | (31,f) +(7 rows) + +-- missing value for a column +TRUNCATE target_table; +SET client_min_messages TO DEBUG2; +INSERT INTO target_table(key) SELECT mapped_key AS key_renamed FROM source_table; +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match +DETAIL: The target table's partition column should correspond to a partition column in the subquery. +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: performing repartitioned INSERT ... 
SELECT +DEBUG: partitioning SELECT query by column index 1 with name 'key_renamed' +DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213591 AS citus_table_alias (f1, key) SELECT f1, key_renamed FROM read_intermediate_results('{repartitioned_results_from_4213589_to_0,repartitioned_results_from_4213590_to_0}'::text[], 'text'::citus_copy_format) intermediate_result(f1 integer, key_renamed insert_select_repartition.composite_key_type) +DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213592 AS citus_table_alias (f1, key) SELECT f1, key_renamed FROM read_intermediate_results('{repartitioned_results_from_4213589_to_1,repartitioned_results_from_4213590_to_1}'::text[], 'text'::citus_copy_format) intermediate_result(f1 integer, key_renamed insert_select_repartition.composite_key_type) +RESET client_min_messages; +SELECT * FROM target_table ORDER BY key; + f1 | value | key +--------------------------------------------------------------------- + 0 | | (0,a) + 0 | | (3,c) + 0 | | (8,g) + 0 | | (26,b) + 0 | | (27,d) + 0 | | (30,e) + 0 | | (31,f) +(7 rows) + +-- missing value for distribution column +INSERT INTO target_table(value) SELECT value FROM source_table; +ERROR: the partition column of table insert_select_repartition.target_table should have a value +DROP TABLE source_table, target_table; +SET client_min_messages TO WARNING; +DROP SCHEMA insert_select_repartition CASCADE; diff --git a/src/test/regress/expected/intermediate_result_pruning.out b/src/test/regress/expected/intermediate_result_pruning.out index 3da52fee2..fb20c941b 100644 --- a/src/test/regress/expected/intermediate_result_pruning.out +++ b/src/test/regress/expected/intermediate_result_pruning.out @@ -757,9 +757,9 @@ DEBUG: Collecting INSERT ... 
SELECT results on coordinator INSERT INTO table_1 SELECT * FROM table_2 where value IN (SELECT value FROM table_1 WHERE random() > 1) AND key = 1; DEBUG: volatile functions are not allowed in distributed INSERT ... SELECT queries -DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: generating subplan XXX_1 for subquery SELECT value FROM intermediate_result_pruning.table_1 WHERE (random() OPERATOR(pg_catalog.>) (1)::double precision) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT key, value FROM intermediate_result_pruning.table_2 WHERE ((value OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(value text))) AND (key OPERATOR(pg_catalog.=) 1)) +DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx -- a similar query, with more complex subquery INSERT INTO table_1 @@ -781,7 +781,6 @@ INSERT INTO table_1 UNION SELECT * FROM cte_2); DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries -DEBUG: Collecting INSERT ... 
SELECT results on coordinator DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT table_1.key FROM intermediate_result_pruning.table_1 WHERE (table_1.key OPERATOR(pg_catalog.=) 1) INTERSECT SELECT table_1.key FROM intermediate_result_pruning.table_1 WHERE (table_1.key OPERATOR(pg_catalog.=) 2) DEBUG: generating subplan XXX_1 for subquery SELECT key FROM intermediate_result_pruning.table_1 WHERE (key OPERATOR(pg_catalog.=) 1) DEBUG: generating subplan XXX_2 for subquery SELECT key FROM intermediate_result_pruning.table_1 WHERE (key OPERATOR(pg_catalog.=) 2) @@ -789,6 +788,7 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT intermediate_ DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT table_1.key FROM intermediate_result_pruning.table_1 WHERE (table_1.key OPERATOR(pg_catalog.=) 3) INTERSECT SELECT table_1.key FROM intermediate_result_pruning.table_1 WHERE (table_1.key OPERATOR(pg_catalog.=) 4) DEBUG: generating subplan XXX_3 for subquery SELECT cte_1.key FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_1 UNION SELECT cte_2.key FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_2 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT key, value FROM intermediate_result_pruning.table_2 WHERE ((key OPERATOR(pg_catalog.=) 1) AND ((value)::integer OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)))) +DEBUG: Collecting INSERT ... 
SELECT results on coordinator DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx @@ -818,7 +818,6 @@ INSERT INTO table_1 where table_2.key != 1 AND foo.key = table_2.value::int; DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries -DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT table_1.key FROM intermediate_result_pruning.table_1 WHERE (table_1.key OPERATOR(pg_catalog.=) 1) INTERSECT SELECT table_1.key FROM intermediate_result_pruning.table_1 WHERE (table_1.key OPERATOR(pg_catalog.=) 2) DEBUG: generating subplan XXX_1 for subquery SELECT key FROM intermediate_result_pruning.table_1 WHERE (key OPERATOR(pg_catalog.=) 1) DEBUG: generating subplan XXX_2 for subquery SELECT key FROM intermediate_result_pruning.table_1 WHERE (key OPERATOR(pg_catalog.=) 2) @@ -826,6 +825,7 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT intermediate_ DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT table_1.key FROM intermediate_result_pruning.table_1 WHERE (table_1.key OPERATOR(pg_catalog.=) 3) INTERSECT SELECT table_1.key FROM intermediate_result_pruning.table_1 WHERE (table_1.key OPERATOR(pg_catalog.=) 4) DEBUG: generating subplan XXX_3 for subquery SELECT cte_1.key FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_1 UNION SELECT cte_2.key FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_2 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT table_2.key, table_2.value FROM intermediate_result_pruning.table_2, (SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) foo WHERE 
((table_2.key OPERATOR(pg_catalog.<>) 1) AND (foo.key OPERATOR(pg_catalog.=) (table_2.value)::integer)) +DEBUG: performing repartitioned INSERT ... SELECT DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx DEBUG: Subplan XXX_2 will be sent to localhost:xxxxx diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out index 1f504a725..6b5c6b2aa 100644 --- a/src/test/regress/expected/multi_insert_select.out +++ b/src/test/regress/expected/multi_insert_select.out @@ -631,8 +631,8 @@ INSERT INTO agg_events (value_1_agg, user_id) FROM raw_events_first; DEBUG: DISTINCT ON (non-partition column) clauses are not allowed in distributed INSERT ... SELECT queries -DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: Collecting INSERT ... SELECT results on coordinator SELECT user_id, value_1_agg FROM agg_events ORDER BY 1,2; DEBUG: Router planner cannot handle multi-shard select queries user_id | value_1_agg @@ -693,13 +693,13 @@ INSERT INTO agg_events FROM fist_table_agg; DEBUG: distributed INSERT ... SELECT can only select from distributed tables -DEBUG: Collecting INSERT ... 
SELECT results on coordinator DEBUG: Router planner cannot handle multi-shard select queries DEBUG: generating subplan XXX_1 for CTE fist_table_agg: SELECT (max(value_1) OPERATOR(pg_catalog.+) 1) AS v1_agg, user_id FROM public.raw_events_first GROUP BY user_id DEBUG: Router planner cannot handle multi-shard select queries DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT user_id, v1_agg FROM (SELECT fist_table_agg.v1_agg, fist_table_agg.user_id FROM (SELECT intermediate_result.v1_agg, intermediate_result.user_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(v1_agg integer, user_id integer)) fist_table_agg) citus_insert_select_subquery DEBUG: Creating router plan DEBUG: Plan is router executable +DEBUG: Collecting INSERT ... SELECT results on coordinator ROLLBACK; -- We don't support CTEs that are referenced in the target list INSERT INTO agg_events @@ -710,7 +710,6 @@ INSERT INTO agg_events raw_events_first; DEBUG: CTE sub_cte is going to be inlined via distributed planning DEBUG: Subqueries without relations are not allowed in distributed INSERT ... SELECT queries -DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Router planner cannot handle multi-shard select queries ERROR: could not run distributed query with subquery outside the FROM, WHERE and HAVING clauses HINT: Consider using an equality filter on the distributed table's partition column. @@ -724,8 +723,13 @@ FROM ((SELECT user_id FROM raw_events_first) UNION (SELECT user_id FROM raw_events_second)) as foo; DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries -DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: performing repartitioned INSERT ... 
SELECT +DEBUG: partitioning SELECT query by column index 0 with name 'user_id' +DEBUG: distributed statement: INSERT INTO public.raw_events_first_13300000 AS citus_table_alias (user_id) SELECT user_id FROM read_intermediate_results('{repartitioned_results_from_13300004_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(user_id integer) +DEBUG: distributed statement: INSERT INTO public.raw_events_first_13300001 AS citus_table_alias (user_id) SELECT user_id FROM read_intermediate_results('{repartitioned_results_from_13300005_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(user_id integer) +DEBUG: distributed statement: INSERT INTO public.raw_events_first_13300002 AS citus_table_alias (user_id) SELECT user_id FROM read_intermediate_results('{repartitioned_results_from_13300006_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(user_id integer) +DEBUG: distributed statement: INSERT INTO public.raw_events_first_13300003 AS citus_table_alias (user_id) SELECT user_id FROM read_intermediate_results('{repartitioned_results_from_13300007_to_3}'::text[], 'binary'::citus_copy_format) intermediate_result(user_id integer) ROLLBACK; -- We do support set operations through recursive planning BEGIN; @@ -735,7 +739,6 @@ INSERT INTO (SELECT user_id FROM raw_events_first) INTERSECT (SELECT user_id FROM raw_events_first); DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries -DEBUG: Collecting INSERT ... 
SELECT results on coordinator DEBUG: Router planner cannot handle multi-shard select queries DEBUG: Router planner cannot handle multi-shard select queries DEBUG: generating subplan XXX_1 for subquery SELECT user_id FROM public.raw_events_first @@ -747,6 +750,7 @@ DEBUG: generating subplan XXX_3 for subquery SELECT intermediate_result.user_id DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT user_id FROM (SELECT intermediate_result.user_id FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) citus_insert_select_subquery DEBUG: Creating router plan DEBUG: Plan is router executable +DEBUG: Collecting INSERT ... SELECT results on coordinator ROLLBACK; -- If the query is router plannable then it is executed via the coordinator INSERT INTO @@ -757,9 +761,9 @@ FROM ((SELECT user_id FROM raw_events_first WHERE user_id = 15) EXCEPT (SELECT user_id FROM raw_events_second where user_id = 17)) as foo; DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries -DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Creating router plan DEBUG: Plan is router executable +DEBUG: Collecting INSERT ... SELECT results on coordinator -- some supported LEFT joins INSERT INTO agg_events (user_id) SELECT @@ -1064,7 +1068,6 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4, WHERE raw_events_first.user_id = raw_events_second.user_id GROUP BY raw_events_second.value_3) AS foo; DEBUG: Group by list without distribution column is not allowed in distributed INSERT ... SELECT queries -DEBUG: Collecting INSERT ... 
SELECT results on coordinator DEBUG: Router planner cannot handle multi-shard select queries DEBUG: Router planner cannot handle multi-shard select queries DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1] @@ -1083,6 +1086,7 @@ DEBUG: generating subplan XXX_1 for subquery SELECT sum(raw_events_second.value DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT id, v1, v4 FROM (SELECT intermediate_result.v4, intermediate_result.v1, intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(v4 numeric, v1 bigint, id double precision)) foo DEBUG: Creating router plan DEBUG: Plan is router executable +DEBUG: Collecting INSERT ... SELECT results on coordinator ERROR: the partition column of table public.agg_events cannot be NULL -- error cases -- no part column at all @@ -1092,8 +1096,8 @@ SELECT value_1 FROM raw_events_first; DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: the query doesn't include the target table's partition column -DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: performing repartitioned INSERT ... SELECT ERROR: the partition column of table public.raw_events_second should have a value INSERT INTO raw_events_second (value_1) @@ -1101,8 +1105,8 @@ SELECT user_id FROM raw_events_first; DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: the query doesn't include the target table's partition column -DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: performing repartitioned INSERT ... 
SELECT ERROR: the partition column of table public.raw_events_second should have a value INSERT INTO raw_events_second (user_id) @@ -1110,9 +1114,11 @@ SELECT value_1 FROM raw_events_first; DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: The target table's partition column should correspond to a partition column in the subquery. -DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Router planner cannot handle multi-shard select queries -ERROR: the partition column of table public.raw_events_second cannot be NULL +DEBUG: performing repartitioned INSERT ... SELECT +DEBUG: partitioning SELECT query by column index 0 with name 'value_1' +ERROR: the partition column value cannot be NULL +CONTEXT: while executing command on localhost:xxxxx INSERT INTO raw_events_second (user_id) SELECT user_id * 2 @@ -1120,8 +1126,13 @@ FROM raw_events_first; DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: Subquery contains an operator in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. -DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: performing repartitioned INSERT ... SELECT +DEBUG: partitioning SELECT query by column index 0 with name '?column?' +DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300004 AS citus_table_alias (user_id) SELECT "?column?" FROM read_intermediate_results('{repartitioned_results_from_13300000_to_0,repartitioned_results_from_13300001_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result("?column?" 
integer) +DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300005 AS citus_table_alias (user_id) SELECT "?column?" FROM read_intermediate_results('{repartitioned_results_from_13300000_to_1,repartitioned_results_from_13300001_to_1,repartitioned_results_from_13300003_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result("?column?" integer) +DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300006 AS citus_table_alias (user_id) SELECT "?column?" FROM read_intermediate_results('{repartitioned_results_from_13300001_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result("?column?" integer) +DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300007 AS citus_table_alias (user_id) SELECT "?column?" FROM read_intermediate_results('{repartitioned_results_from_13300000_to_3,repartitioned_results_from_13300002_to_3,repartitioned_results_from_13300003_to_3}'::text[], 'binary'::citus_copy_format) intermediate_result("?column?" integer) INSERT INTO raw_events_second (user_id) SELECT user_id :: bigint @@ -1129,8 +1140,13 @@ FROM raw_events_first; DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: Subquery contains an explicit cast in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. -DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: performing repartitioned INSERT ... 
SELECT +DEBUG: partitioning SELECT query by column index 0 with name 'user_id' +DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300004 AS citus_table_alias (user_id) SELECT user_id FROM read_intermediate_results('{repartitioned_results_from_13300000_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(user_id bigint) +DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300005 AS citus_table_alias (user_id) SELECT user_id FROM read_intermediate_results('{repartitioned_results_from_13300001_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(user_id bigint) +DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300006 AS citus_table_alias (user_id) SELECT user_id FROM read_intermediate_results('{repartitioned_results_from_13300002_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(user_id bigint) +DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300007 AS citus_table_alias (user_id) SELECT user_id FROM read_intermediate_results('{repartitioned_results_from_13300003_to_3}'::text[], 'binary'::citus_copy_format) intermediate_result(user_id bigint) INSERT INTO agg_events (value_3_agg, value_4_agg, @@ -1147,9 +1163,11 @@ GROUP BY user_id; DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: Subquery contains an aggregation in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. -DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Router planner cannot handle multi-shard select queries -ERROR: the partition column of table public.agg_events cannot be NULL +DEBUG: performing repartitioned INSERT ... 
SELECT +DEBUG: partitioning SELECT query by column index 0 with name 'avg' +ERROR: the partition column value cannot be NULL +CONTEXT: while executing command on localhost:xxxxx INSERT INTO agg_events (value_3_agg, value_4_agg, @@ -1166,9 +1184,11 @@ GROUP BY user_id, value_2; DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: The target table's partition column should correspond to a partition column in the subquery. -DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Router planner cannot handle multi-shard select queries -ERROR: the partition column of table public.agg_events cannot be NULL +DEBUG: performing repartitioned INSERT ... SELECT +DEBUG: partitioning SELECT query by column index 0 with name 'value_2' +ERROR: the partition column value cannot be NULL +CONTEXT: while executing command on localhost:xxxxx -- tables should be co-located INSERT INTO agg_events (user_id) SELECT @@ -1177,10 +1197,10 @@ FROM reference_table; DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: The target table's partition column should correspond to a partition column in the subquery. -DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Distributed planning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable +DEBUG: Collecting INSERT ... SELECT results on coordinator -- foo2 is recursively planned and INSERT...SELECT is done via coordinator INSERT INTO agg_events (user_id) @@ -1205,7 +1225,6 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4, HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2 ON (f.id = f2.id); DEBUG: Group by list without distribution column is not allowed in distributed INSERT ... SELECT queries -DEBUG: Collecting INSERT ... 
SELECT results on coordinator DEBUG: Router planner cannot handle multi-shard select queries DEBUG: Router planner cannot handle multi-shard select queries DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1] @@ -1223,6 +1242,8 @@ DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823] DEBUG: generating subplan XXX_1 for subquery SELECT sum(raw_events_second.value_4) AS v4, raw_events_second.value_1 AS v1, sum(raw_events_second.user_id) AS id FROM public.raw_events_first, public.raw_events_second WHERE (raw_events_first.user_id OPERATOR(pg_catalog.=) raw_events_second.user_id) GROUP BY raw_events_second.value_1 HAVING (sum(raw_events_second.value_4) OPERATOR(pg_catalog.>) (10)::numeric) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT f2.id FROM ((SELECT foo.id FROM (SELECT reference_table.user_id AS id FROM public.raw_events_first, public.reference_table WHERE (raw_events_first.user_id OPERATOR(pg_catalog.=) reference_table.user_id)) foo) f JOIN (SELECT foo2.v4, foo2.v1, foo2.id FROM (SELECT intermediate_result.v4, intermediate_result.v1, intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(v4 numeric, v1 integer, id bigint)) foo2) f2 ON ((f.id OPERATOR(pg_catalog.=) f2.id))) DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: performing repartitioned INSERT ... SELECT +DEBUG: partitioning SELECT query by column index 0 with name 'id' -- the second part of the query is not routable since -- GROUP BY not on the partition column (i.e., value_1) and thus join -- on f.id = f2.id is not on the partition key (instead on the sum of partition key) @@ -1250,7 +1271,6 @@ FROM (SELECT SUM(raw_events_second.value_4) AS v4, HAVING SUM(raw_events_second.value_4) > 10) AS foo2 ) as f2 ON (f.id = f2.id); DEBUG: Group by list without distribution column is not allowed in distributed INSERT ... 
SELECT queries -DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Router planner cannot handle multi-shard select queries DEBUG: Router planner cannot handle multi-shard select queries DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1] @@ -1268,6 +1288,8 @@ DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823] DEBUG: generating subplan XXX_1 for subquery SELECT sum(raw_events_second.value_4) AS v4, raw_events_second.value_1 AS v1, sum(raw_events_second.user_id) AS id FROM public.raw_events_first, public.raw_events_second WHERE (raw_events_first.user_id OPERATOR(pg_catalog.=) raw_events_second.user_id) GROUP BY raw_events_second.value_1 HAVING (sum(raw_events_second.value_4) OPERATOR(pg_catalog.>) (10)::numeric) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT f.id FROM ((SELECT foo.id FROM (SELECT raw_events_first.user_id AS id FROM public.raw_events_first, public.reference_table WHERE (raw_events_first.user_id OPERATOR(pg_catalog.=) reference_table.user_id)) foo) f JOIN (SELECT foo2.v4, foo2.v1, foo2.id FROM (SELECT intermediate_result.v4, intermediate_result.v1, intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(v4 numeric, v1 integer, id bigint)) foo2) f2 ON ((f.id OPERATOR(pg_catalog.=) f2.id))) DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: performing repartitioned INSERT ... SELECT +DEBUG: partitioning SELECT query by column index 0 with name 'id' -- cannot pushdown the query since the JOIN is not equi JOIN INSERT INTO agg_events (user_id, value_4_agg) @@ -1676,7 +1698,6 @@ SELECT user_id, FROM raw_events_second GROUP BY grouping sets ( ( user_id ), ( value_1 ), ( user_id, value_1 ), ( ) ); DEBUG: grouping sets are not allowed in distributed INSERT ... SELECT queries -DEBUG: Collecting INSERT ... 
SELECT results on coordinator DEBUG: Router planner cannot handle multi-shard select queries ERROR: could not run distributed query with GROUPING SETS, CUBE, or ROLLUP HINT: Consider using an equality filter on the distributed table's partition column. @@ -2091,61 +2112,71 @@ FROM text_table ; DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: Subquery contains a case expression in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. -DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: performing repartitioned INSERT ... SELECT +DEBUG: partitioning SELECT query by column index 0 with name 'case' INSERT INTO text_table (part_col) SELECT COALESCE(part_col, 'onder') FROM text_table; DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: Subquery contains a coalesce expression in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. -DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: performing repartitioned INSERT ... SELECT +DEBUG: partitioning SELECT query by column index 0 with name 'coalesce' INSERT INTO text_table (part_col) SELECT GREATEST(part_col, 'jason') FROM text_table; DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: Subquery contains a min/max expression in the same position as the target table's partition column. 
HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. -DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: performing repartitioned INSERT ... SELECT +DEBUG: partitioning SELECT query by column index 0 with name 'greatest' INSERT INTO text_table (part_col) SELECT LEAST(part_col, 'andres') FROM text_table; DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: Subquery contains a min/max expression in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. -DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: performing repartitioned INSERT ... SELECT +DEBUG: partitioning SELECT query by column index 0 with name 'least' INSERT INTO text_table (part_col) SELECT NULLIF(part_col, 'metin') FROM text_table; DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: Subquery contains an expression that is not a simple column reference in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. -DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: performing repartitioned INSERT ... SELECT +DEBUG: partitioning SELECT query by column index 0 with name 'nullif' INSERT INTO text_table (part_col) SELECT part_col isnull FROM text_table; DEBUG: cannot perform distributed INSERT INTO ... 
SELECT because the partition columns in the source table and subquery do not match DETAIL: Subquery contains an expression that is not a simple column reference in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. -DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: performing repartitioned INSERT ... SELECT +DEBUG: partitioning SELECT query by column index 0 with name '?column?' INSERT INTO text_table (part_col) SELECT part_col::text from char_table; DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: Subquery contains an explicit coercion in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. -DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: performing repartitioned INSERT ... SELECT +DEBUG: partitioning SELECT query by column index 0 with name 'part_col' INSERT INTO text_table (part_col) SELECT (part_col = 'burak') is true FROM text_table; DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: Subquery contains an expression that is not a simple column reference in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. -DEBUG: Collecting INSERT ... 
SELECT results on coordinator DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: performing repartitioned INSERT ... SELECT +DEBUG: partitioning SELECT query by column index 0 with name '?column?' INSERT INTO text_table (part_col) SELECT val FROM text_table; DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: The data type of the target table's partition column should exactly match the data type of the corresponding simple column reference in the subquery. -DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: performing repartitioned INSERT ... SELECT +DEBUG: partitioning SELECT query by column index 0 with name 'val' INSERT INTO text_table (part_col) SELECT val::text FROM text_table; DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: Subquery contains an explicit coercion in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. -DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: performing repartitioned INSERT ... 
SELECT +DEBUG: partitioning SELECT query by column index 0 with name 'val' RESET client_min_messages; insert into table_with_starts_with_defaults (b,c) select b,c FROM table_with_starts_with_defaults; -- Test on partition column without native hash function diff --git a/src/test/regress/expected/multi_insert_select_conflict.out b/src/test/regress/expected/multi_insert_select_conflict.out index f543ceb6f..b1d1803dd 100644 --- a/src/test/regress/expected/multi_insert_select_conflict.out +++ b/src/test/regress/expected/multi_insert_select_conflict.out @@ -93,10 +93,10 @@ FROM ( ) as foo ON CONFLICT DO NOTHING; DEBUG: LIMIT clauses are not allowed in distributed INSERT ... SELECT queries -DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: push down of limit count: 5 DEBUG: generating subplan XXX_1 for subquery SELECT col_1, col_2, col_3 FROM on_conflict.source_table_1 LIMIT 5 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer)) foo +DEBUG: Collecting INSERT ... SELECT results on coordinator -- Subquery should be recursively planned due to the limit and update on conflict -- Query is wrapped by CTE to return ordered result. WITH inserted_table AS ( @@ -115,10 +115,10 @@ WITH inserted_table AS ( DEBUG: generating subplan XXX_1 for CTE inserted_table: INSERT INTO on_conflict.target_table (col_1, col_2) SELECT col_1, col_2 FROM (SELECT source_table_1.col_1, source_table_1.col_2, source_table_1.col_3 FROM on_conflict.source_table_1 LIMIT 5) foo ON CONFLICT(col_1) DO UPDATE SET col_2 = excluded.col_2 RETURNING target_table.col_1, target_table.col_2 DEBUG: LIMIT clauses are not allowed in distributed INSERT ... 
SELECT queries DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) inserted_table ORDER BY col_1 -DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: push down of limit count: 5 DEBUG: generating subplan XXX_1 for subquery SELECT col_1, col_2, col_3 FROM on_conflict.source_table_1 LIMIT 5 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer)) foo +DEBUG: Collecting INSERT ... SELECT results on coordinator col_1 | col_2 --------------------------------------------------------------------- 1 | 1 @@ -151,13 +151,13 @@ WITH inserted_table AS ( DEBUG: generating subplan XXX_1 for CTE inserted_table: INSERT INTO on_conflict.target_table (col_1, col_2) SELECT col_1, col_2 FROM ((SELECT source_table_1.col_1, source_table_1.col_2, source_table_1.col_3 FROM on_conflict.source_table_1 LIMIT 5) UNION (SELECT source_table_2.col_1, source_table_2.col_2, source_table_2.col_3 FROM on_conflict.source_table_2 LIMIT 5)) foo ON CONFLICT(col_1) DO UPDATE SET col_2 = 0 RETURNING target_table.col_1, target_table.col_2 DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) inserted_table ORDER BY col_1 -DEBUG: Collecting INSERT ... 
SELECT results on coordinator DEBUG: push down of limit count: 5 DEBUG: generating subplan XXX_1 for subquery SELECT col_1, col_2, col_3 FROM on_conflict.source_table_1 LIMIT 5 DEBUG: push down of limit count: 5 DEBUG: generating subplan XXX_2 for subquery SELECT col_1, col_2, col_3 FROM on_conflict.source_table_2 LIMIT 5 DEBUG: generating subplan XXX_3 for subquery SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer) UNION SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer)) foo +DEBUG: Collecting INSERT ... SELECT results on coordinator col_1 | col_2 --------------------------------------------------------------------- 1 | 0 @@ -180,18 +180,18 @@ WITH cte AS( ) INSERT INTO target_table SELECT * FROM cte ON CONFLICT DO NOTHING; DEBUG: distributed INSERT ... SELECT can only select from distributed tables -DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: generating subplan XXX_1 for CTE cte: SELECT col_1, col_2 FROM on_conflict.source_table_1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT cte.col_1, cte.col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) cte) citus_insert_select_subquery +DEBUG: Collecting INSERT ... 
SELECT results on coordinator -- Get the select part from cte and update on conflict WITH cte AS( SELECT col_1, col_2 FROM source_table_1 ) INSERT INTO target_table SELECT * FROM cte ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 + 1; DEBUG: distributed INSERT ... SELECT can only select from distributed tables -DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: generating subplan XXX_1 for CTE cte: SELECT col_1, col_2 FROM on_conflict.source_table_1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT cte.col_1, cte.col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) cte) citus_insert_select_subquery +DEBUG: Collecting INSERT ... SELECT results on coordinator SELECT * FROM target_table ORDER BY 1; col_1 | col_2 --------------------------------------------------------------------- @@ -247,10 +247,10 @@ WITH inserted_table AS ( DEBUG: generating subplan XXX_1 for CTE inserted_table: WITH cte AS (SELECT source_table_1.col_1, source_table_1.col_2, source_table_1.col_3 FROM on_conflict.source_table_1), cte_2 AS (SELECT cte.col_1, cte.col_2 FROM cte) INSERT INTO on_conflict.target_table (col_1, col_2) SELECT col_1, col_2 FROM cte_2 ON CONFLICT(col_1) DO UPDATE SET col_2 = (excluded.col_2 OPERATOR(pg_catalog.+) 1) RETURNING target_table.col_1, target_table.col_2 DEBUG: distributed INSERT ... SELECT can only select from distributed tables DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) inserted_table ORDER BY col_1 -DEBUG: Collecting INSERT ... 
SELECT results on coordinator DEBUG: generating subplan XXX_1 for CTE cte: SELECT col_1, col_2, col_3 FROM on_conflict.source_table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer)) cte DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT cte_2.col_1, cte_2.col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) cte_2) citus_insert_select_subquery +DEBUG: Collecting INSERT ... SELECT results on coordinator col_1 | col_2 --------------------------------------------------------------------- 1 | 2 @@ -270,9 +270,9 @@ UPDATE target_table SET col_2 = 4 WHERE col_1 IN (SELECT col_1 FROM cte); DEBUG: generating subplan XXX_1 for CTE cte: WITH basic AS (SELECT source_table_1.col_1, source_table_1.col_2 FROM on_conflict.source_table_1) INSERT INTO on_conflict.target_table (col_1, col_2) SELECT col_1, col_2 FROM basic ON CONFLICT DO NOTHING RETURNING target_table.col_1, target_table.col_2 DEBUG: distributed INSERT ... SELECT can only select from distributed tables DEBUG: Plan XXX query after replacing subqueries and CTEs: UPDATE on_conflict.target_table SET col_2 = 4 WHERE (col_1 OPERATOR(pg_catalog.=) ANY (SELECT cte.col_1 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) cte)) -DEBUG: Collecting INSERT ... 
SELECT results on coordinator DEBUG: generating subplan XXX_1 for CTE basic: SELECT col_1, col_2 FROM on_conflict.source_table_1 DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT basic.col_1, basic.col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) basic) citus_insert_select_subquery +DEBUG: Collecting INSERT ... SELECT results on coordinator RESET client_min_messages; -- Following query is not supported since error checks of the subquery pushdown planner -- and insert select planner have not been unified. It should work after unifying them. @@ -481,13 +481,13 @@ FROM ( ) as foo ON CONFLICT(col_1) DO UPDATE SET col_2 = 0; DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries -DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: push down of limit count: 5 DEBUG: generating subplan XXX_1 for subquery SELECT col_1, col_2, col_3 FROM on_conflict.source_table_1 LIMIT 5 DEBUG: push down of limit count: 5 DEBUG: generating subplan XXX_2 for subquery SELECT col_1, col_2, col_3 FROM on_conflict.source_table_2 LIMIT 5 DEBUG: generating subplan XXX_3 for subquery SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer) UNION SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) 
intermediate_result(col_1 integer, col_2 integer, col_3 integer)) foo +DEBUG: Collecting INSERT ... SELECT results on coordinator SELECT * FROM target_table ORDER BY 1; col_1 | col_2 --------------------------------------------------------------------- @@ -512,10 +512,10 @@ WITH cte AS( ) INSERT INTO target_table SELECT * FROM cte_2 ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 + 1; DEBUG: distributed INSERT ... SELECT can only select from distributed tables -DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: generating subplan XXX_1 for CTE cte: SELECT col_1, col_2, col_3 FROM on_conflict.source_table_1 DEBUG: generating subplan XXX_2 for CTE cte_2: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2, intermediate_result.col_3 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer, col_3 integer)) cte DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT cte_2.col_1, cte_2.col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) cte_2) citus_insert_select_subquery +DEBUG: Collecting INSERT ... SELECT results on coordinator SELECT * FROM target_table ORDER BY 1; col_1 | col_2 --------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_insert_select_non_pushable_queries.out b/src/test/regress/expected/multi_insert_select_non_pushable_queries.out index 07280ec91..d37928073 100644 --- a/src/test/regress/expected/multi_insert_select_non_pushable_queries.out +++ b/src/test/regress/expected/multi_insert_select_non_pushable_queries.out @@ -159,7 +159,6 @@ FROM ( GROUP BY t1.user_id, hasdone_event ) t GROUP BY user_id, hasdone_event; DEBUG: Set operations are not allowed in distributed INSERT ... 
SELECT queries -DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: generating subplan XXX_1 for subquery SELECT u.user_id, 'step=>1'::text AS event, e."time" FROM public.users_table u, public.events_table e WHERE ((u.user_id OPERATOR(pg_catalog.=) e.user_id) AND (u.user_id OPERATOR(pg_catalog.>=) 10) AND (u.user_id OPERATOR(pg_catalog.<=) 25) AND (e.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[100, 101, 102]))) DEBUG: generating subplan XXX_2 for subquery SELECT u.user_id, 'step=>2'::text AS event, e."time" FROM public.users_table u, public.events_table e WHERE ((u.user_id OPERATOR(pg_catalog.=) e.user_id) AND (u.user_id OPERATOR(pg_catalog.>=) 10) AND (u.user_id OPERATOR(pg_catalog.<=) 25) AND (e.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[103, 104, 105]))) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT intermediate_result.user_id, intermediate_result.event, intermediate_result."time" FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, event text, "time" timestamp without time zone) UNION SELECT intermediate_result.user_id, intermediate_result.event, intermediate_result."time" FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, event text, "time" timestamp without time zone) @@ -310,7 +309,6 @@ GROUP BY ORDER BY count_pay; DEBUG: Set operations are not allowed in distributed INSERT ... SELECT queries -DEBUG: Collecting INSERT ... 
SELECT results on coordinator DEBUG: generating subplan XXX_1 for subquery SELECT users_table.user_id, 'action=>1'::text AS event, events_table."time" FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (users_table.user_id OPERATOR(pg_catalog.>=) 10) AND (users_table.user_id OPERATOR(pg_catalog.<=) 70) AND (events_table.event_type OPERATOR(pg_catalog.>) 10) AND (events_table.event_type OPERATOR(pg_catalog.<) 12)) DEBUG: generating subplan XXX_2 for subquery SELECT users_table.user_id, 'action=>2'::text AS event, events_table."time" FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (users_table.user_id OPERATOR(pg_catalog.>=) 10) AND (users_table.user_id OPERATOR(pg_catalog.<=) 70) AND (events_table.event_type OPERATOR(pg_catalog.>) 12) AND (events_table.event_type OPERATOR(pg_catalog.<) 14)) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT intermediate_result.user_id, intermediate_result.event, intermediate_result."time" FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, event text, "time" timestamp without time zone) UNION SELECT intermediate_result.user_id, intermediate_result.event, intermediate_result."time" FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, event text, "time" timestamp without time zone) diff --git a/src/test/regress/expected/multi_mx_function_call_delegation.out b/src/test/regress/expected/multi_mx_function_call_delegation.out index 23e6f5e33..935a01770 100644 --- a/src/test/regress/expected/multi_mx_function_call_delegation.out +++ b/src/test/regress/expected/multi_mx_function_call_delegation.out @@ -438,8 +438,8 @@ SELECT create_distributed_function('delegated_function(int)', 'a'); SET client_min_messages TO DEBUG1; INSERT INTO test SELECT delegated_function(1); 
DEBUG: distributed INSERT ... SELECT can only select from distributed tables -DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: not pushing down function calls in INSERT ... SELECT +DEBUG: Collecting INSERT ... SELECT results on coordinator -- Don't push down in subqueries or CTEs. SELECT * FROM test WHERE not exists( SELECT delegated_function(4) diff --git a/src/test/regress/expected/multi_reference_table.out b/src/test/regress/expected/multi_reference_table.out index 4a4f79b7f..8b15a8ff5 100644 --- a/src/test/regress/expected/multi_reference_table.out +++ b/src/test/regress/expected/multi_reference_table.out @@ -1170,7 +1170,7 @@ WHERE colocated_table_test_2.value_4 = reference_table_test.value_4; DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: The data type of the target table's partition column should exactly match the data type of the corresponding simple column reference in the subquery. -DEBUG: Collecting INSERT ... SELECT results on coordinator +DEBUG: performing repartitioned INSERT ... SELECT INSERT INTO colocated_table_test (value_1, value_2) SELECT @@ -1181,7 +1181,7 @@ WHERE colocated_table_test_2.value_4 = reference_table_test.value_4; DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: The target table's partition column should correspond to a partition column in the subquery. -DEBUG: Collecting INSERT ... SELECT results on coordinator +DEBUG: performing repartitioned INSERT ... 
SELECT RESET client_min_messages; -- some tests for mark_tables_colocated -- should error out diff --git a/src/test/regress/expected/with_dml.out b/src/test/regress/expected/with_dml.out index 5936d623f..2367e602a 100644 --- a/src/test/regress/expected/with_dml.out +++ b/src/test/regress/expected/with_dml.out @@ -101,6 +101,7 @@ DEBUG: CTE ids_to_insert is going to be inlined via distributed planning DEBUG: generating subplan XXX_1 for subquery SELECT (((tenant_id)::integer OPERATOR(pg_catalog.*) 100))::text AS tenant_id FROM with_dml.distributed_table WHERE (dept OPERATOR(pg_catalog.>) 7) DEBUG: generating subplan XXX_2 for subquery SELECT DISTINCT ids_to_insert.tenant_id FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text)) ids_to_insert, with_dml.distributed_table WHERE (distributed_table.tenant_id OPERATOR(pg_catalog.<) ids_to_insert.tenant_id) DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT tenant_id FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text)) citus_insert_select_subquery +DEBUG: Collecting INSERT ... SELECT results on coordinator -- not a very meaningful query -- but has two modifying CTEs along with another -- modify statement @@ -132,11 +133,11 @@ INSERT INTO second_distributed_table SELECT * FROM copy_to_other_table; DEBUG: distributed INSERT ... SELECT can only select from distributed tables -DEBUG: Collecting INSERT ... 
SELECT results on coordinator DEBUG: generating subplan XXX_1 for CTE copy_to_other_table: INSERT INTO with_dml.distributed_table (tenant_id, dept) SELECT tenant_id, dept FROM with_dml.second_distributed_table WHERE (dept OPERATOR(pg_catalog.=) 3) ON CONFLICT(tenant_id) DO UPDATE SET dept = 4 RETURNING distributed_table.tenant_id, distributed_table.dept DEBUG: generating subplan XXX_2 for CTE main_table_deleted: DELETE FROM with_dml.distributed_table WHERE ((dept OPERATOR(pg_catalog.<) 10) AND (NOT (EXISTS (SELECT 1 FROM with_dml.second_distributed_table WHERE ((second_distributed_table.dept OPERATOR(pg_catalog.=) 1) AND (second_distributed_table.tenant_id OPERATOR(pg_catalog.=) distributed_table.tenant_id)))))) RETURNING tenant_id, dept DEBUG: generating subplan XXX_3 for subquery SELECT main_table_deleted.tenant_id, main_table_deleted.dept FROM (SELECT intermediate_result.tenant_id, intermediate_result.dept FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, dept integer)) main_table_deleted EXCEPT SELECT copy_to_other_table.tenant_id, copy_to_other_table.dept FROM (SELECT intermediate_result.tenant_id, intermediate_result.dept FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, dept integer)) copy_to_other_table DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT tenant_id, dept FROM (SELECT intermediate_result.tenant_id, intermediate_result.dept FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, dept integer)) citus_insert_select_subquery +DEBUG: Collecting INSERT ... 
SELECT results on coordinator SET citus.force_max_query_parallelization TO off; -- CTE inside the UPDATE statement UPDATE diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 6990f6d27..4fc14f907 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -42,7 +42,7 @@ test: multi_read_from_secondaries test: multi_create_table test: multi_create_table_constraints multi_master_protocol multi_load_data multi_behavioral_analytics_create_table test: multi_behavioral_analytics_basics multi_behavioral_analytics_single_shard_queries multi_insert_select_non_pushable_queries multi_insert_select -test: multi_insert_select_window multi_shard_update_delete window_functions dml_recursive recursive_dml_with_different_planners_executors +test: multi_insert_select_window multi_shard_update_delete window_functions dml_recursive recursive_dml_with_different_planners_executors insert_select_repartition test: multi_insert_select_conflict test: multi_row_insert diff --git a/src/test/regress/sql/insert_select_repartition.sql b/src/test/regress/sql/insert_select_repartition.sql new file mode 100644 index 000000000..be0686ef7 --- /dev/null +++ b/src/test/regress/sql/insert_select_repartition.sql @@ -0,0 +1,77 @@ +-- tests behaviour of INSERT INTO ... SELECT with repartitioning +CREATE SCHEMA insert_select_repartition; +SET search_path TO 'insert_select_repartition'; + +SET citus.next_shard_id TO 4213581; +SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; + +-- Test 1 +-- 4 shards, hash distributed. +-- Negate distribution column value. 
+SET citus.shard_count TO 4; +CREATE TABLE source_table(a int); +SELECT create_distributed_table('source_table', 'a'); +INSERT INTO source_table SELECT * FROM generate_series(1, 10); +CREATE TABLE target_table(a int); +SELECT create_distributed_table('target_table', 'a'); + +SET client_min_messages TO DEBUG2; +INSERT INTO target_table SELECT -a FROM source_table; +RESET client_min_messages; + +SELECT * FROM target_table WHERE a=-1 OR a=-3 OR a=-7 ORDER BY a; + +DROP TABLE source_table, target_table; + +-- +-- Test 2. +-- range partitioning, composite distribution column +CREATE TYPE composite_key_type AS (f1 int, f2 text); + +-- source +CREATE TABLE source_table(f1 int, key composite_key_type, value int, mapped_key composite_key_type); +SELECT create_distributed_table('source_table', 'key', 'range'); +CALL public.create_range_partitioned_shards('source_table', '{"(0,a)","(25,a)"}','{"(24,z)","(49,z)"}'); + +INSERT INTO source_table VALUES (0, (0, 'a'), 1, (0, 'a')); +INSERT INTO source_table VALUES (1, (1, 'b'), 2, (26, 'b')); +INSERT INTO source_table VALUES (2, (2, 'c'), 3, (3, 'c')); +INSERT INTO source_table VALUES (3, (4, 'd'), 4, (27, 'd')); +INSERT INTO source_table VALUES (4, (30, 'e'), 5, (30, 'e')); +INSERT INTO source_table VALUES (5, (31, 'f'), 6, (31, 'f')); +INSERT INTO source_table VALUES (6, (32, 'g'), 50, (8, 'g')); + +-- target +CREATE TABLE target_table(f1 int DEFAULT 0, value int, key composite_key_type); +SELECT create_distributed_table('target_table', 'key', 'range'); +CALL public.create_range_partitioned_shards('target_table', '{"(0,a)","(25,a)"}','{"(24,z)","(49,z)"}'); + +SET client_min_messages TO DEBUG2; +INSERT INTO target_table SELECT f1, value, mapped_key FROM source_table; +RESET client_min_messages; + +SELECT * FROM target_table ORDER BY key; +SELECT * FROM target_table WHERE key = (26, 'b')::composite_key_type; + +-- with explicit column names +TRUNCATE target_table; +SET client_min_messages TO DEBUG2; +INSERT INTO 
target_table(value, key) SELECT value, mapped_key FROM source_table; +RESET client_min_messages; +SELECT * FROM target_table ORDER BY key; + +-- missing value for a column +TRUNCATE target_table; +SET client_min_messages TO DEBUG2; +INSERT INTO target_table(key) SELECT mapped_key AS key_renamed FROM source_table; +RESET client_min_messages; +SELECT * FROM target_table ORDER BY key; + +-- missing value for distribution column +INSERT INTO target_table(value) SELECT value FROM source_table; + +DROP TABLE source_table, target_table; + +SET client_min_messages TO WARNING; +DROP SCHEMA insert_select_repartition CASCADE;