mirror of https://github.com/citusdata/citus.git
Repartitioned INSERT/SELECT: cast columns in SELECT targets
parent
d67a384350
commit
89463f9760
|
@ -67,15 +67,17 @@ static HTAB * ExecutePlanIntoColocatedIntermediateResults(Oid targetRelationId,
|
||||||
static List * BuildColumnNameListFromTargetList(Oid targetRelationId,
|
static List * BuildColumnNameListFromTargetList(Oid targetRelationId,
|
||||||
List *insertTargetList);
|
List *insertTargetList);
|
||||||
static int PartitionColumnIndexFromColumnList(Oid relationId, List *columnNameList);
|
static int PartitionColumnIndexFromColumnList(Oid relationId, List *columnNameList);
|
||||||
static void AddInsertSelectCasts(List *targetList, TupleDesc destTupleDescriptor);
|
static List * AddInsertSelectCasts(List *insertTargetList, List *selectTargetList,
|
||||||
|
Oid targetRelationId);
|
||||||
static bool IsSupportedRedistributionTarget(Oid targetRelationId);
|
static bool IsSupportedRedistributionTarget(Oid targetRelationId);
|
||||||
static List * RedistributedInsertSelectTaskList(Query *insertSelectQuery,
|
static List * RedistributedInsertSelectTaskList(Query *insertSelectQuery,
|
||||||
DistTableCacheEntry *targetRelation,
|
DistTableCacheEntry *targetRelation,
|
||||||
List **redistributedResults,
|
List **redistributedResults,
|
||||||
bool useBinaryFormat);
|
bool useBinaryFormat);
|
||||||
static int PartitionColumnIndex(List *insertTargetList, Var *partitionColumn);
|
static int PartitionColumnIndex(List *insertTargetList, Var *partitionColumn);
|
||||||
static bool IsRedistributablePlan(Plan *selectPlan, bool hasReturning, bool
|
static bool IsRedistributablePlan(Plan *selectPlan, bool hasReturning);
|
||||||
hasOnConflict);
|
static Expr * CastExpr(Expr *expr, Oid sourceType, Oid targetType, Oid targetCollation,
|
||||||
|
int targetTypeMod);
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -128,7 +130,6 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node)
|
||||||
Oid targetRelationId = insertRte->relid;
|
Oid targetRelationId = insertRte->relid;
|
||||||
char *intermediateResultIdPrefix = distributedPlan->intermediateResultIdPrefix;
|
char *intermediateResultIdPrefix = distributedPlan->intermediateResultIdPrefix;
|
||||||
bool hasReturning = distributedPlan->hasReturning;
|
bool hasReturning = distributedPlan->hasReturning;
|
||||||
bool hasOnConflict = insertSelectQuery->onConflict != NULL;
|
|
||||||
HTAB *shardStateHash = NULL;
|
HTAB *shardStateHash = NULL;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -146,6 +147,16 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node)
|
||||||
selectRte->subquery = selectQuery;
|
selectRte->subquery = selectQuery;
|
||||||
ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte);
|
ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If the type of insert column and target table's column type is
|
||||||
|
* different from each other. Cast insert column't type to target
|
||||||
|
* table's column
|
||||||
|
*/
|
||||||
|
selectQuery->targetList =
|
||||||
|
AddInsertSelectCasts(insertSelectQuery->targetList,
|
||||||
|
selectQuery->targetList,
|
||||||
|
targetRelationId);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Make a copy of the query, since pg_plan_query may scribble on it and we
|
* Make a copy of the query, since pg_plan_query may scribble on it and we
|
||||||
* want it to be replanned every time if it is stored in a prepared
|
* want it to be replanned every time if it is stored in a prepared
|
||||||
|
@ -168,7 +179,7 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node)
|
||||||
LockPartitionRelations(targetRelationId, RowExclusiveLock);
|
LockPartitionRelations(targetRelationId, RowExclusiveLock);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (IsRedistributablePlan(selectPlan->planTree, hasReturning, hasOnConflict) &&
|
if (IsRedistributablePlan(selectPlan->planTree, hasReturning) &&
|
||||||
IsSupportedRedistributionTarget(targetRelationId))
|
IsSupportedRedistributionTarget(targetRelationId))
|
||||||
{
|
{
|
||||||
ereport(DEBUG1, (errmsg("performing repartitioned INSERT ... SELECT")));
|
ereport(DEBUG1, (errmsg("performing repartitioned INSERT ... SELECT")));
|
||||||
|
@ -460,16 +471,6 @@ TwoPhaseInsertSelectTaskList(Oid targetRelationId, Query *insertSelectQuery,
|
||||||
uint32 taskIdIndex = 1;
|
uint32 taskIdIndex = 1;
|
||||||
uint64 jobId = INVALID_JOB_ID;
|
uint64 jobId = INVALID_JOB_ID;
|
||||||
|
|
||||||
Relation distributedRelation = heap_open(targetRelationId, RowExclusiveLock);
|
|
||||||
TupleDesc destTupleDescriptor = RelationGetDescr(distributedRelation);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* If the type of insert column and target table's column type is
|
|
||||||
* different from each other. Cast insert column't type to target
|
|
||||||
* table's column
|
|
||||||
*/
|
|
||||||
AddInsertSelectCasts(insertSelectQuery->targetList, destTupleDescriptor);
|
|
||||||
|
|
||||||
for (int shardOffset = 0; shardOffset < shardCount; shardOffset++)
|
for (int shardOffset = 0; shardOffset < shardCount; shardOffset++)
|
||||||
{
|
{
|
||||||
ShardInterval *targetShardInterval =
|
ShardInterval *targetShardInterval =
|
||||||
|
@ -529,8 +530,6 @@ TwoPhaseInsertSelectTaskList(Oid targetRelationId, Query *insertSelectQuery,
|
||||||
taskIdIndex++;
|
taskIdIndex++;
|
||||||
}
|
}
|
||||||
|
|
||||||
heap_close(distributedRelation, NoLock);
|
|
||||||
|
|
||||||
return taskList;
|
return taskList;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -683,33 +682,182 @@ ExecutingInsertSelect(void)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* AddInsertSelectCasts makes sure that the types in columns in targetList
|
* AddInsertSelectCasts makes sure that the types in columns in the given
|
||||||
* have the same type as given tuple descriptor by adding necessary type
|
* target lists have the same type as the columns of the given relation.
|
||||||
* casts.
|
* It might add casts to ensure that.
|
||||||
|
*
|
||||||
|
* It returns the updated selectTargetList.
|
||||||
*/
|
*/
|
||||||
static void
|
static List *
|
||||||
AddInsertSelectCasts(List *targetList, TupleDesc destTupleDescriptor)
|
AddInsertSelectCasts(List *insertTargetList, List *selectTargetList,
|
||||||
|
Oid targetRelationId)
|
||||||
{
|
{
|
||||||
ListCell *targetEntryCell = NULL;
|
ListCell *insertEntryCell = NULL;
|
||||||
|
ListCell *selectEntryCell = NULL;
|
||||||
|
List *projectedEntries = NIL;
|
||||||
|
List *nonProjectedEntries = NIL;
|
||||||
|
|
||||||
foreach(targetEntryCell, targetList)
|
/*
|
||||||
|
* ReorderInsertSelectTargetLists() makes sure that first few columns of
|
||||||
|
* the SELECT query match the insert targets. It might contain additional
|
||||||
|
* items for GROUP BY, etc.
|
||||||
|
*/
|
||||||
|
Assert(list_length(insertTargetList) <= list_length(selectTargetList));
|
||||||
|
|
||||||
|
Relation distributedRelation = heap_open(targetRelationId, RowExclusiveLock);
|
||||||
|
TupleDesc destTupleDescriptor = RelationGetDescr(distributedRelation);
|
||||||
|
|
||||||
|
int targetEntryIndex = 0;
|
||||||
|
forboth(insertEntryCell, insertTargetList, selectEntryCell, selectTargetList)
|
||||||
{
|
{
|
||||||
TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
|
TargetEntry *insertEntry = (TargetEntry *) lfirst(insertEntryCell);
|
||||||
Var *insertColumn = (Var *) targetEntry->expr;
|
TargetEntry *selectEntry = (TargetEntry *) lfirst(selectEntryCell);
|
||||||
Form_pg_attribute attr = TupleDescAttr(destTupleDescriptor, targetEntry->resno -
|
Var *insertColumn = (Var *) insertEntry->expr;
|
||||||
1);
|
Form_pg_attribute attr = TupleDescAttr(destTupleDescriptor,
|
||||||
|
insertEntry->resno - 1);
|
||||||
|
|
||||||
if (insertColumn->vartype != attr->atttypid)
|
Oid sourceType = insertColumn->vartype;
|
||||||
|
Oid targetType = attr->atttypid;
|
||||||
|
if (sourceType != targetType)
|
||||||
|
{
|
||||||
|
insertEntry->expr = CastExpr((Expr *) insertColumn, sourceType, targetType,
|
||||||
|
attr->attcollation, attr->atttypmod);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We cannot modify the selectEntry in-place, because ORDER BY or
|
||||||
|
* GROUP BY clauses might be pointing to it with comparison types
|
||||||
|
* of the source type. So instead we keep the original one as a
|
||||||
|
* non-projected entry, so GROUP BY and ORDER BY are happy, and
|
||||||
|
* create a duplicated projected entry with the coerced expression.
|
||||||
|
*/
|
||||||
|
TargetEntry *coercedEntry = copyObject(selectEntry);
|
||||||
|
coercedEntry->expr = CastExpr((Expr *) selectEntry->expr, sourceType,
|
||||||
|
targetType, attr->attcollation,
|
||||||
|
attr->atttypmod);
|
||||||
|
coercedEntry->ressortgroupref = 0;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* The only requirement is that users don't use this name in ORDER BY
|
||||||
|
* or GROUP BY, and it should be unique across the same query.
|
||||||
|
*/
|
||||||
|
StringInfo resnameString = makeStringInfo();
|
||||||
|
appendStringInfo(resnameString, "auto_coerced_by_citus_%d", targetEntryIndex);
|
||||||
|
coercedEntry->resname = resnameString->data;
|
||||||
|
|
||||||
|
projectedEntries = lappend(projectedEntries, coercedEntry);
|
||||||
|
|
||||||
|
if (selectEntry->ressortgroupref != 0)
|
||||||
|
{
|
||||||
|
selectEntry->resjunk = true;
|
||||||
|
nonProjectedEntries = lappend(nonProjectedEntries, selectEntry);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
projectedEntries = lappend(projectedEntries, selectEntry);
|
||||||
|
}
|
||||||
|
|
||||||
|
targetEntryIndex++;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int entryIndex = list_length(insertTargetList);
|
||||||
|
entryIndex < list_length(selectTargetList);
|
||||||
|
entryIndex++)
|
||||||
|
{
|
||||||
|
nonProjectedEntries = lappend(nonProjectedEntries, list_nth(selectTargetList,
|
||||||
|
entryIndex));
|
||||||
|
}
|
||||||
|
|
||||||
|
/* selectEntry->resno must be the ordinal number of the entry */
|
||||||
|
selectTargetList = list_concat(projectedEntries, nonProjectedEntries);
|
||||||
|
int entryResNo = 1;
|
||||||
|
foreach(selectEntryCell, selectTargetList)
|
||||||
|
{
|
||||||
|
TargetEntry *selectEntry = lfirst(selectEntryCell);
|
||||||
|
selectEntry->resno = entryResNo++;
|
||||||
|
}
|
||||||
|
|
||||||
|
heap_close(distributedRelation, NoLock);
|
||||||
|
|
||||||
|
return selectTargetList;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CastExpr returns an expression which casts the given expr from sourceType to
|
||||||
|
* the given targetType.
|
||||||
|
*/
|
||||||
|
static Expr *
|
||||||
|
CastExpr(Expr *expr, Oid sourceType, Oid targetType, Oid targetCollation,
|
||||||
|
int targetTypeMod)
|
||||||
|
{
|
||||||
|
Oid coercionFuncId = InvalidOid;
|
||||||
|
CoercionPathType coercionType = find_coercion_pathway(targetType, sourceType,
|
||||||
|
COERCION_EXPLICIT,
|
||||||
|
&coercionFuncId);
|
||||||
|
|
||||||
|
if (coercionType == COERCION_PATH_FUNC)
|
||||||
|
{
|
||||||
|
FuncExpr *coerceExpr = makeNode(FuncExpr);
|
||||||
|
coerceExpr->funcid = coercionFuncId;
|
||||||
|
coerceExpr->args = list_make1(copyObject(expr));
|
||||||
|
coerceExpr->funccollid = targetCollation;
|
||||||
|
coerceExpr->funcresulttype = targetType;
|
||||||
|
|
||||||
|
return (Expr *) coerceExpr;
|
||||||
|
}
|
||||||
|
else if (coercionType == COERCION_PATH_RELABELTYPE)
|
||||||
|
{
|
||||||
|
RelabelType *coerceExpr = makeNode(RelabelType);
|
||||||
|
coerceExpr->arg = copyObject(expr);
|
||||||
|
coerceExpr->resulttype = targetType;
|
||||||
|
coerceExpr->resulttypmod = targetTypeMod;
|
||||||
|
coerceExpr->resultcollid = targetCollation;
|
||||||
|
coerceExpr->relabelformat = COERCE_IMPLICIT_CAST;
|
||||||
|
coerceExpr->location = -1;
|
||||||
|
|
||||||
|
return (Expr *) coerceExpr;
|
||||||
|
}
|
||||||
|
else if (coercionType == COERCION_PATH_ARRAYCOERCE)
|
||||||
|
{
|
||||||
|
Oid sourceBaseType = get_base_element_type(sourceType);
|
||||||
|
Oid targetBaseType = get_base_element_type(targetType);
|
||||||
|
|
||||||
|
CaseTestExpr *elemExpr = makeNode(CaseTestExpr);
|
||||||
|
elemExpr->collation = targetCollation;
|
||||||
|
elemExpr->typeId = sourceBaseType;
|
||||||
|
elemExpr->typeMod = -1;
|
||||||
|
|
||||||
|
Expr *elemCastExpr = CastExpr((Expr *) elemExpr, sourceBaseType,
|
||||||
|
targetBaseType, targetCollation,
|
||||||
|
targetTypeMod);
|
||||||
|
|
||||||
|
ArrayCoerceExpr *coerceExpr = makeNode(ArrayCoerceExpr);
|
||||||
|
coerceExpr->arg = copyObject(expr);
|
||||||
|
coerceExpr->elemexpr = elemCastExpr;
|
||||||
|
coerceExpr->resultcollid = targetCollation;
|
||||||
|
coerceExpr->resulttype = targetType;
|
||||||
|
coerceExpr->resulttypmod = targetTypeMod;
|
||||||
|
coerceExpr->location = -1;
|
||||||
|
coerceExpr->coerceformat = COERCE_IMPLICIT_CAST;
|
||||||
|
|
||||||
|
return (Expr *) coerceExpr;
|
||||||
|
}
|
||||||
|
else if (coercionType == COERCION_PATH_COERCEVIAIO)
|
||||||
{
|
{
|
||||||
CoerceViaIO *coerceExpr = makeNode(CoerceViaIO);
|
CoerceViaIO *coerceExpr = makeNode(CoerceViaIO);
|
||||||
coerceExpr->arg = (Expr *) copyObject(insertColumn);
|
coerceExpr->arg = (Expr *) copyObject(expr);
|
||||||
coerceExpr->resulttype = attr->atttypid;
|
coerceExpr->resulttype = targetType;
|
||||||
coerceExpr->resultcollid = attr->attcollation;
|
coerceExpr->resultcollid = targetCollation;
|
||||||
coerceExpr->coerceformat = COERCE_IMPLICIT_CAST;
|
coerceExpr->coerceformat = COERCE_IMPLICIT_CAST;
|
||||||
coerceExpr->location = -1;
|
coerceExpr->location = -1;
|
||||||
|
|
||||||
targetEntry->expr = (Expr *) coerceExpr;
|
return (Expr *) coerceExpr;
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errmsg("could not find a conversion path from type %d to %d",
|
||||||
|
sourceType, targetType)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -765,16 +913,6 @@ RedistributedInsertSelectTaskList(Query *insertSelectQuery,
|
||||||
uint32 taskIdIndex = 1;
|
uint32 taskIdIndex = 1;
|
||||||
uint64 jobId = INVALID_JOB_ID;
|
uint64 jobId = INVALID_JOB_ID;
|
||||||
|
|
||||||
Relation distributedRelation = heap_open(targetRelationId, RowExclusiveLock);
|
|
||||||
TupleDesc destTupleDescriptor = RelationGetDescr(distributedRelation);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* If the type of insert column and target table's column type is
|
|
||||||
* different from each other. Cast insert column't type to target
|
|
||||||
* table's column
|
|
||||||
*/
|
|
||||||
AddInsertSelectCasts(insertSelectQuery->targetList, destTupleDescriptor);
|
|
||||||
|
|
||||||
for (shardOffset = 0; shardOffset < shardCount; shardOffset++)
|
for (shardOffset = 0; shardOffset < shardCount; shardOffset++)
|
||||||
{
|
{
|
||||||
ShardInterval *targetShardInterval =
|
ShardInterval *targetShardInterval =
|
||||||
|
@ -839,8 +977,6 @@ RedistributedInsertSelectTaskList(Query *insertSelectQuery,
|
||||||
taskIdIndex++;
|
taskIdIndex++;
|
||||||
}
|
}
|
||||||
|
|
||||||
heap_close(distributedRelation, NoLock);
|
|
||||||
|
|
||||||
return taskList;
|
return taskList;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -872,7 +1008,7 @@ PartitionColumnIndex(List *insertTargetList, Var *partitionColumn)
|
||||||
* IsRedistributablePlan returns true if the given plan is a redistrituable plan.
|
* IsRedistributablePlan returns true if the given plan is a redistrituable plan.
|
||||||
*/
|
*/
|
||||||
static bool
|
static bool
|
||||||
IsRedistributablePlan(Plan *selectPlan, bool hasReturning, bool hasOnConflict)
|
IsRedistributablePlan(Plan *selectPlan, bool hasReturning)
|
||||||
{
|
{
|
||||||
if (hasReturning)
|
if (hasReturning)
|
||||||
{
|
{
|
||||||
|
|
|
@ -4,7 +4,6 @@ SET search_path TO 'insert_select_repartition';
|
||||||
SET citus.next_shard_id TO 4213581;
|
SET citus.next_shard_id TO 4213581;
|
||||||
SET citus.shard_replication_factor TO 1;
|
SET citus.shard_replication_factor TO 1;
|
||||||
SET citus.replication_model TO 'streaming';
|
SET citus.replication_model TO 'streaming';
|
||||||
-- Test 1
|
|
||||||
-- 4 shards, hash distributed.
|
-- 4 shards, hash distributed.
|
||||||
-- Negate distribution column value.
|
-- Negate distribution column value.
|
||||||
SET citus.shard_count TO 4;
|
SET citus.shard_count TO 4;
|
||||||
|
@ -46,8 +45,8 @@ SELECT * FROM target_table WHERE a=-1 OR a=-3 OR a=-7 ORDER BY a;
|
||||||
|
|
||||||
DROP TABLE source_table, target_table;
|
DROP TABLE source_table, target_table;
|
||||||
--
|
--
|
||||||
-- Test 2.
|
|
||||||
-- range partitioning, composite distribution column
|
-- range partitioning, composite distribution column
|
||||||
|
--
|
||||||
CREATE TYPE composite_key_type AS (f1 int, f2 text);
|
CREATE TYPE composite_key_type AS (f1 int, f2 text);
|
||||||
-- source
|
-- source
|
||||||
CREATE TABLE source_table(f1 int, key composite_key_type, value int, mapped_key composite_key_type);
|
CREATE TABLE source_table(f1 int, key composite_key_type, value int, mapped_key composite_key_type);
|
||||||
|
@ -183,6 +182,92 @@ SELECT * FROM target_table ORDER BY key;
|
||||||
-- missing value for distribution column
|
-- missing value for distribution column
|
||||||
INSERT INTO target_table(value) SELECT value FROM source_table;
|
INSERT INTO target_table(value) SELECT value FROM source_table;
|
||||||
ERROR: the partition column of table insert_select_repartition.target_table should have a value
|
ERROR: the partition column of table insert_select_repartition.target_table should have a value
|
||||||
|
DROP TABLE source_table, target_table;
|
||||||
|
-- different column types
|
||||||
|
-- verifies that we add necessary casts, otherwise even shard routing won't
|
||||||
|
-- work correctly and we will see 2 values for the same primary key.
|
||||||
|
CREATE TABLE target_table(col_1 int primary key, col_2 int);
|
||||||
|
SELECT create_distributed_table('target_table','col_1');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO target_table VALUES (1,2), (2,3), (3,4), (4,5), (5,6);
|
||||||
|
CREATE TABLE source_table(col_1 numeric, col_2 numeric, col_3 numeric);
|
||||||
|
SELECT create_distributed_table('source_table','col_1');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO source_table VALUES (1,1,1), (3,3,3), (5,5,5);
|
||||||
|
SET client_min_messages TO DEBUG2;
|
||||||
|
INSERT INTO target_table
|
||||||
|
SELECT
|
||||||
|
col_1, col_2
|
||||||
|
FROM
|
||||||
|
source_table
|
||||||
|
ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2;
|
||||||
|
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
|
||||||
|
DETAIL: The data type of the target table's partition column should exactly match the data type of the corresponding simple column reference in the subquery.
|
||||||
|
DEBUG: Router planner cannot handle multi-shard select queries
|
||||||
|
DEBUG: performing repartitioned INSERT ... SELECT
|
||||||
|
DEBUG: partitioning SELECT query by column index 0 with name 'auto_coerced_by_citus_0'
|
||||||
|
DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213593 AS citus_table_alias (col_1, col_2) SELECT auto_coerced_by_citus_0, auto_coerced_by_citus_1 FROM read_intermediate_results('{repartitioned_results_from_4213597_to_0,repartitioned_results_from_4213600_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(auto_coerced_by_citus_0 integer, auto_coerced_by_citus_1 integer) ON CONFLICT(col_1) DO UPDATE SET col_2 = excluded.col_2
|
||||||
|
DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213594 AS citus_table_alias (col_1, col_2) SELECT auto_coerced_by_citus_0, auto_coerced_by_citus_1 FROM read_intermediate_results('{repartitioned_results_from_4213599_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(auto_coerced_by_citus_0 integer, auto_coerced_by_citus_1 integer) ON CONFLICT(col_1) DO UPDATE SET col_2 = excluded.col_2
|
||||||
|
RESET client_min_messages;
|
||||||
|
SELECT * FROM target_table ORDER BY 1;
|
||||||
|
col_1 | col_2
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1 | 1
|
||||||
|
2 | 3
|
||||||
|
3 | 3
|
||||||
|
4 | 5
|
||||||
|
5 | 5
|
||||||
|
(5 rows)
|
||||||
|
|
||||||
|
DROP TABLE source_table, target_table;
|
||||||
|
--
|
||||||
|
-- array coercion
|
||||||
|
--
|
||||||
|
SET citus.shard_count TO 3;
|
||||||
|
CREATE TABLE source_table(a int, mapped_key int, c float[]);
|
||||||
|
SELECT create_distributed_table('source_table', 'a');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO source_table VALUES (1, -1, ARRAY[1.1, 2.2, 3.3]), (2, -2, ARRAY[4.5, 5.8]),
|
||||||
|
(3, -3, ARRAY[]::float[]), (4, -4, ARRAY[3.3]);
|
||||||
|
SET citus.shard_count TO 2;
|
||||||
|
CREATE TABLE target_table(a int, b int[]);
|
||||||
|
SELECT create_distributed_table('target_table', 'a');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SET client_min_messages TO DEBUG2;
|
||||||
|
INSERT INTO target_table SELECT mapped_key, c FROM source_table;
|
||||||
|
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
|
||||||
|
DETAIL: The target table's partition column should correspond to a partition column in the subquery.
|
||||||
|
DEBUG: Router planner cannot handle multi-shard select queries
|
||||||
|
DEBUG: performing repartitioned INSERT ... SELECT
|
||||||
|
DEBUG: partitioning SELECT query by column index 0 with name 'mapped_key'
|
||||||
|
DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213604 AS citus_table_alias (a, b) SELECT mapped_key, auto_coerced_by_citus_1 FROM read_intermediate_results('{repartitioned_results_from_4213602_to_0,repartitioned_results_from_4213603_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(mapped_key integer, auto_coerced_by_citus_1 integer[])
|
||||||
|
DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213605 AS citus_table_alias (a, b) SELECT mapped_key, auto_coerced_by_citus_1 FROM read_intermediate_results('{repartitioned_results_from_4213601_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(mapped_key integer, auto_coerced_by_citus_1 integer[])
|
||||||
|
RESET client_min_messages;
|
||||||
|
SELECT * FROM target_table ORDER BY a;
|
||||||
|
a | b
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
-4 | {3}
|
||||||
|
-3 | {}
|
||||||
|
-2 | {4,6}
|
||||||
|
-1 | {1,2,3}
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
DROP TABLE source_table, target_table;
|
DROP TABLE source_table, target_table;
|
||||||
SET client_min_messages TO WARNING;
|
SET client_min_messages TO WARNING;
|
||||||
DROP SCHEMA insert_select_repartition CASCADE;
|
DROP SCHEMA insert_select_repartition CASCADE;
|
||||||
|
|
|
@ -1083,7 +1083,7 @@ DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-10
|
||||||
DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1]
|
DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1]
|
||||||
DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823]
|
DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823]
|
||||||
DEBUG: generating subplan XXX_1 for subquery SELECT sum(raw_events_second.value_4) AS v4, sum(raw_events_first.value_1) AS v1, raw_events_second.value_3 AS id FROM public.raw_events_first, public.raw_events_second WHERE (raw_events_first.user_id OPERATOR(pg_catalog.=) raw_events_second.user_id) GROUP BY raw_events_second.value_3
|
DEBUG: generating subplan XXX_1 for subquery SELECT sum(raw_events_second.value_4) AS v4, sum(raw_events_first.value_1) AS v1, raw_events_second.value_3 AS id FROM public.raw_events_first, public.raw_events_second WHERE (raw_events_first.user_id OPERATOR(pg_catalog.=) raw_events_second.user_id) GROUP BY raw_events_second.value_3
|
||||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT id, v1, v4 FROM (SELECT intermediate_result.v4, intermediate_result.v1, intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(v4 numeric, v1 bigint, id double precision)) foo
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT int4(id) AS auto_coerced_by_citus_0, int4(v1) AS auto_coerced_by_citus_1, int8(v4) AS auto_coerced_by_citus_2 FROM (SELECT intermediate_result.v4, intermediate_result.v1, intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(v4 numeric, v1 bigint, id double precision)) foo
|
||||||
DEBUG: Creating router plan
|
DEBUG: Creating router plan
|
||||||
DEBUG: Plan is router executable
|
DEBUG: Plan is router executable
|
||||||
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||||
|
@ -1142,11 +1142,11 @@ DETAIL: Subquery contains an explicit cast in the same position as the target t
|
||||||
HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery.
|
HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery.
|
||||||
DEBUG: Router planner cannot handle multi-shard select queries
|
DEBUG: Router planner cannot handle multi-shard select queries
|
||||||
DEBUG: performing repartitioned INSERT ... SELECT
|
DEBUG: performing repartitioned INSERT ... SELECT
|
||||||
DEBUG: partitioning SELECT query by column index 0 with name 'user_id'
|
DEBUG: partitioning SELECT query by column index 0 with name 'auto_coerced_by_citus_0'
|
||||||
DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300004 AS citus_table_alias (user_id) SELECT user_id FROM read_intermediate_results('{repartitioned_results_from_13300000_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(user_id bigint)
|
DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300004 AS citus_table_alias (user_id) SELECT auto_coerced_by_citus_0 FROM read_intermediate_results('{repartitioned_results_from_13300000_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(auto_coerced_by_citus_0 integer)
|
||||||
DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300005 AS citus_table_alias (user_id) SELECT user_id FROM read_intermediate_results('{repartitioned_results_from_13300001_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(user_id bigint)
|
DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300005 AS citus_table_alias (user_id) SELECT auto_coerced_by_citus_0 FROM read_intermediate_results('{repartitioned_results_from_13300001_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(auto_coerced_by_citus_0 integer)
|
||||||
DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300006 AS citus_table_alias (user_id) SELECT user_id FROM read_intermediate_results('{repartitioned_results_from_13300002_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(user_id bigint)
|
DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300006 AS citus_table_alias (user_id) SELECT auto_coerced_by_citus_0 FROM read_intermediate_results('{repartitioned_results_from_13300002_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(auto_coerced_by_citus_0 integer)
|
||||||
DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300007 AS citus_table_alias (user_id) SELECT user_id FROM read_intermediate_results('{repartitioned_results_from_13300003_to_3}'::text[], 'binary'::citus_copy_format) intermediate_result(user_id bigint)
|
DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300007 AS citus_table_alias (user_id) SELECT auto_coerced_by_citus_0 FROM read_intermediate_results('{repartitioned_results_from_13300003_to_3}'::text[], 'binary'::citus_copy_format) intermediate_result(auto_coerced_by_citus_0 integer)
|
||||||
INSERT INTO agg_events
|
INSERT INTO agg_events
|
||||||
(value_3_agg,
|
(value_3_agg,
|
||||||
value_4_agg,
|
value_4_agg,
|
||||||
|
@ -1165,7 +1165,7 @@ DETAIL: Subquery contains an aggregation in the same position as the target tab
|
||||||
HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery.
|
HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery.
|
||||||
DEBUG: Router planner cannot handle multi-shard select queries
|
DEBUG: Router planner cannot handle multi-shard select queries
|
||||||
DEBUG: performing repartitioned INSERT ... SELECT
|
DEBUG: performing repartitioned INSERT ... SELECT
|
||||||
DEBUG: partitioning SELECT query by column index 0 with name 'avg'
|
DEBUG: partitioning SELECT query by column index 0 with name 'auto_coerced_by_citus_0'
|
||||||
ERROR: the partition column value cannot be NULL
|
ERROR: the partition column value cannot be NULL
|
||||||
CONTEXT: while executing command on localhost:xxxxx
|
CONTEXT: while executing command on localhost:xxxxx
|
||||||
INSERT INTO agg_events
|
INSERT INTO agg_events
|
||||||
|
@ -1240,10 +1240,10 @@ DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-10
|
||||||
DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1]
|
DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1]
|
||||||
DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823]
|
DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823]
|
||||||
DEBUG: generating subplan XXX_1 for subquery SELECT sum(raw_events_second.value_4) AS v4, raw_events_second.value_1 AS v1, sum(raw_events_second.user_id) AS id FROM public.raw_events_first, public.raw_events_second WHERE (raw_events_first.user_id OPERATOR(pg_catalog.=) raw_events_second.user_id) GROUP BY raw_events_second.value_1 HAVING (sum(raw_events_second.value_4) OPERATOR(pg_catalog.>) (10)::numeric)
|
DEBUG: generating subplan XXX_1 for subquery SELECT sum(raw_events_second.value_4) AS v4, raw_events_second.value_1 AS v1, sum(raw_events_second.user_id) AS id FROM public.raw_events_first, public.raw_events_second WHERE (raw_events_first.user_id OPERATOR(pg_catalog.=) raw_events_second.user_id) GROUP BY raw_events_second.value_1 HAVING (sum(raw_events_second.value_4) OPERATOR(pg_catalog.>) (10)::numeric)
|
||||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT f2.id FROM ((SELECT foo.id FROM (SELECT reference_table.user_id AS id FROM public.raw_events_first, public.reference_table WHERE (raw_events_first.user_id OPERATOR(pg_catalog.=) reference_table.user_id)) foo) f JOIN (SELECT foo2.v4, foo2.v1, foo2.id FROM (SELECT intermediate_result.v4, intermediate_result.v1, intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(v4 numeric, v1 integer, id bigint)) foo2) f2 ON ((f.id OPERATOR(pg_catalog.=) f2.id)))
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT int4(f2.id) AS auto_coerced_by_citus_0 FROM ((SELECT foo.id FROM (SELECT reference_table.user_id AS id FROM public.raw_events_first, public.reference_table WHERE (raw_events_first.user_id OPERATOR(pg_catalog.=) reference_table.user_id)) foo) f JOIN (SELECT foo2.v4, foo2.v1, foo2.id FROM (SELECT intermediate_result.v4, intermediate_result.v1, intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(v4 numeric, v1 integer, id bigint)) foo2) f2 ON ((f.id OPERATOR(pg_catalog.=) f2.id)))
|
||||||
DEBUG: Router planner cannot handle multi-shard select queries
|
DEBUG: Router planner cannot handle multi-shard select queries
|
||||||
DEBUG: performing repartitioned INSERT ... SELECT
|
DEBUG: performing repartitioned INSERT ... SELECT
|
||||||
DEBUG: partitioning SELECT query by column index 0 with name 'id'
|
DEBUG: partitioning SELECT query by column index 0 with name 'auto_coerced_by_citus_0'
|
||||||
-- the second part of the query is not routable since
|
-- the second part of the query is not routable since
|
||||||
-- GROUP BY not on the partition column (i.e., value_1) and thus join
|
-- GROUP BY not on the partition column (i.e., value_1) and thus join
|
||||||
-- on f.id = f2.id is not on the partition key (instead on the sum of partition key)
|
-- on f.id = f2.id is not on the partition key (instead on the sum of partition key)
|
||||||
|
@ -2149,7 +2149,7 @@ DETAIL: Subquery contains an expression that is not a simple column reference i
|
||||||
HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery.
|
HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery.
|
||||||
DEBUG: Router planner cannot handle multi-shard select queries
|
DEBUG: Router planner cannot handle multi-shard select queries
|
||||||
DEBUG: performing repartitioned INSERT ... SELECT
|
DEBUG: performing repartitioned INSERT ... SELECT
|
||||||
DEBUG: partitioning SELECT query by column index 0 with name '?column?'
|
DEBUG: partitioning SELECT query by column index 0 with name 'auto_coerced_by_citus_0'
|
||||||
INSERT INTO text_table (part_col) SELECT part_col::text from char_table;
|
INSERT INTO text_table (part_col) SELECT part_col::text from char_table;
|
||||||
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
|
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
|
||||||
DETAIL: Subquery contains an explicit coercion in the same position as the target table's partition column.
|
DETAIL: Subquery contains an explicit coercion in the same position as the target table's partition column.
|
||||||
|
@ -2163,13 +2163,13 @@ DETAIL: Subquery contains an expression that is not a simple column reference i
|
||||||
HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery.
|
HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery.
|
||||||
DEBUG: Router planner cannot handle multi-shard select queries
|
DEBUG: Router planner cannot handle multi-shard select queries
|
||||||
DEBUG: performing repartitioned INSERT ... SELECT
|
DEBUG: performing repartitioned INSERT ... SELECT
|
||||||
DEBUG: partitioning SELECT query by column index 0 with name '?column?'
|
DEBUG: partitioning SELECT query by column index 0 with name 'auto_coerced_by_citus_0'
|
||||||
INSERT INTO text_table (part_col) SELECT val FROM text_table;
|
INSERT INTO text_table (part_col) SELECT val FROM text_table;
|
||||||
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
|
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
|
||||||
DETAIL: The data type of the target table's partition column should exactly match the data type of the corresponding simple column reference in the subquery.
|
DETAIL: The data type of the target table's partition column should exactly match the data type of the corresponding simple column reference in the subquery.
|
||||||
DEBUG: Router planner cannot handle multi-shard select queries
|
DEBUG: Router planner cannot handle multi-shard select queries
|
||||||
DEBUG: performing repartitioned INSERT ... SELECT
|
DEBUG: performing repartitioned INSERT ... SELECT
|
||||||
DEBUG: partitioning SELECT query by column index 0 with name 'val'
|
DEBUG: partitioning SELECT query by column index 0 with name 'auto_coerced_by_citus_0'
|
||||||
INSERT INTO text_table (part_col) SELECT val::text FROM text_table;
|
INSERT INTO text_table (part_col) SELECT val::text FROM text_table;
|
||||||
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
|
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
|
||||||
DETAIL: Subquery contains an explicit coercion in the same position as the target table's partition column.
|
DETAIL: Subquery contains an explicit coercion in the same position as the target table's partition column.
|
||||||
|
|
|
@ -54,7 +54,7 @@ FROM
|
||||||
ON CONFLICT DO NOTHING;
|
ON CONFLICT DO NOTHING;
|
||||||
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
|
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
|
||||||
DETAIL: The target table's partition column should correspond to a partition column in the subquery.
|
DETAIL: The target table's partition column should correspond to a partition column in the subquery.
|
||||||
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
DEBUG: performing repartitioned INSERT ... SELECT
|
||||||
-- Since partition columns do not match, pull the data to the coordinator
|
-- Since partition columns do not match, pull the data to the coordinator
|
||||||
-- and update the non-partition column. Query is wrapped by CTE to return
|
-- and update the non-partition column. Query is wrapped by CTE to return
|
||||||
-- ordered result.
|
-- ordered result.
|
||||||
|
@ -216,9 +216,9 @@ WITH cte AS(
|
||||||
)
|
)
|
||||||
INSERT INTO target_table ((SELECT * FROM cte) UNION (SELECT * FROM cte_2)) ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 + 1;
|
INSERT INTO target_table ((SELECT * FROM cte) UNION (SELECT * FROM cte_2)) ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 + 1;
|
||||||
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
|
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
|
||||||
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
|
||||||
DEBUG: CTE cte is going to be inlined via distributed planning
|
DEBUG: CTE cte is going to be inlined via distributed planning
|
||||||
DEBUG: CTE cte_2 is going to be inlined via distributed planning
|
DEBUG: CTE cte_2 is going to be inlined via distributed planning
|
||||||
|
DEBUG: performing repartitioned INSERT ... SELECT
|
||||||
SELECT * FROM target_table ORDER BY 1;
|
SELECT * FROM target_table ORDER BY 1;
|
||||||
col_1 | col_2
|
col_1 | col_2
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
@ -395,7 +395,7 @@ FROM
|
||||||
ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2;
|
ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2;
|
||||||
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
|
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
|
||||||
DETAIL: The data type of the target table's partition column should exactly match the data type of the corresponding simple column reference in the subquery.
|
DETAIL: The data type of the target table's partition column should exactly match the data type of the corresponding simple column reference in the subquery.
|
||||||
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
DEBUG: performing repartitioned INSERT ... SELECT
|
||||||
SELECT * FROM target_table ORDER BY 1;
|
SELECT * FROM target_table ORDER BY 1;
|
||||||
col_1 | col_2
|
col_1 | col_2
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
|
@ -1926,40 +1926,40 @@ INSERT INTO articles_hash
|
||||||
SELECT * FROM articles_hash WHERE author_id = $2 AND word_count = $1 OFFSET 0;
|
SELECT * FROM articles_hash WHERE author_id = $2 AND word_count = $1 OFFSET 0;
|
||||||
EXECUTE insert_sel(1,1);
|
EXECUTE insert_sel(1,1);
|
||||||
DEBUG: OFFSET clauses are not allowed in distributed INSERT ... SELECT queries
|
DEBUG: OFFSET clauses are not allowed in distributed INSERT ... SELECT queries
|
||||||
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
|
||||||
DEBUG: Deferred pruning for a fast-path router query
|
DEBUG: Deferred pruning for a fast-path router query
|
||||||
DEBUG: Creating router plan
|
DEBUG: Creating router plan
|
||||||
DEBUG: Plan is router executable
|
DEBUG: Plan is router executable
|
||||||
|
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||||
EXECUTE insert_sel(1,1);
|
EXECUTE insert_sel(1,1);
|
||||||
DEBUG: OFFSET clauses are not allowed in distributed INSERT ... SELECT queries
|
DEBUG: OFFSET clauses are not allowed in distributed INSERT ... SELECT queries
|
||||||
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
|
||||||
DEBUG: Deferred pruning for a fast-path router query
|
DEBUG: Deferred pruning for a fast-path router query
|
||||||
DEBUG: Creating router plan
|
DEBUG: Creating router plan
|
||||||
DEBUG: Plan is router executable
|
DEBUG: Plan is router executable
|
||||||
|
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||||
EXECUTE insert_sel(1,1);
|
EXECUTE insert_sel(1,1);
|
||||||
DEBUG: OFFSET clauses are not allowed in distributed INSERT ... SELECT queries
|
DEBUG: OFFSET clauses are not allowed in distributed INSERT ... SELECT queries
|
||||||
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
|
||||||
DEBUG: Deferred pruning for a fast-path router query
|
DEBUG: Deferred pruning for a fast-path router query
|
||||||
DEBUG: Creating router plan
|
DEBUG: Creating router plan
|
||||||
DEBUG: Plan is router executable
|
DEBUG: Plan is router executable
|
||||||
|
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||||
EXECUTE insert_sel(1,1);
|
EXECUTE insert_sel(1,1);
|
||||||
DEBUG: OFFSET clauses are not allowed in distributed INSERT ... SELECT queries
|
DEBUG: OFFSET clauses are not allowed in distributed INSERT ... SELECT queries
|
||||||
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
|
||||||
DEBUG: Deferred pruning for a fast-path router query
|
DEBUG: Deferred pruning for a fast-path router query
|
||||||
DEBUG: Creating router plan
|
DEBUG: Creating router plan
|
||||||
DEBUG: Plan is router executable
|
DEBUG: Plan is router executable
|
||||||
|
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||||
EXECUTE insert_sel(1,1);
|
EXECUTE insert_sel(1,1);
|
||||||
DEBUG: OFFSET clauses are not allowed in distributed INSERT ... SELECT queries
|
DEBUG: OFFSET clauses are not allowed in distributed INSERT ... SELECT queries
|
||||||
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
|
||||||
DEBUG: Deferred pruning for a fast-path router query
|
DEBUG: Deferred pruning for a fast-path router query
|
||||||
DEBUG: Creating router plan
|
DEBUG: Creating router plan
|
||||||
DEBUG: Plan is router executable
|
DEBUG: Plan is router executable
|
||||||
|
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||||
EXECUTE insert_sel(1,1);
|
EXECUTE insert_sel(1,1);
|
||||||
DEBUG: OFFSET clauses are not allowed in distributed INSERT ... SELECT queries
|
DEBUG: OFFSET clauses are not allowed in distributed INSERT ... SELECT queries
|
||||||
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
|
||||||
DEBUG: Deferred pruning for a fast-path router query
|
DEBUG: Deferred pruning for a fast-path router query
|
||||||
DEBUG: Creating router plan
|
DEBUG: Creating router plan
|
||||||
DEBUG: Plan is router executable
|
DEBUG: Plan is router executable
|
||||||
|
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||||
-- one final interesting preperad statement
|
-- one final interesting preperad statement
|
||||||
-- where one of the filters is on the target list
|
-- where one of the filters is on the target list
|
||||||
PREPARE fast_path_agg_filter(int, int) AS
|
PREPARE fast_path_agg_filter(int, int) AS
|
||||||
|
|
|
@ -96,7 +96,6 @@ INSERT INTO distributed_table
|
||||||
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
|
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
|
||||||
DETAIL: Subquery contains an expression that is not a simple column reference in the same position as the target table's partition column.
|
DETAIL: Subquery contains an expression that is not a simple column reference in the same position as the target table's partition column.
|
||||||
HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery.
|
HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery.
|
||||||
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
|
||||||
DEBUG: CTE ids_to_insert is going to be inlined via distributed planning
|
DEBUG: CTE ids_to_insert is going to be inlined via distributed planning
|
||||||
DEBUG: generating subplan XXX_1 for subquery SELECT (((tenant_id)::integer OPERATOR(pg_catalog.*) 100))::text AS tenant_id FROM with_dml.distributed_table WHERE (dept OPERATOR(pg_catalog.>) 7)
|
DEBUG: generating subplan XXX_1 for subquery SELECT (((tenant_id)::integer OPERATOR(pg_catalog.*) 100))::text AS tenant_id FROM with_dml.distributed_table WHERE (dept OPERATOR(pg_catalog.>) 7)
|
||||||
DEBUG: generating subplan XXX_2 for subquery SELECT DISTINCT ids_to_insert.tenant_id FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text)) ids_to_insert, with_dml.distributed_table WHERE (distributed_table.tenant_id OPERATOR(pg_catalog.<) ids_to_insert.tenant_id)
|
DEBUG: generating subplan XXX_2 for subquery SELECT DISTINCT ids_to_insert.tenant_id FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text)) ids_to_insert, with_dml.distributed_table WHERE (distributed_table.tenant_id OPERATOR(pg_catalog.<) ids_to_insert.tenant_id)
|
||||||
|
|
|
@ -6,7 +6,6 @@ SET citus.next_shard_id TO 4213581;
|
||||||
SET citus.shard_replication_factor TO 1;
|
SET citus.shard_replication_factor TO 1;
|
||||||
SET citus.replication_model TO 'streaming';
|
SET citus.replication_model TO 'streaming';
|
||||||
|
|
||||||
-- Test 1
|
|
||||||
-- 4 shards, hash distributed.
|
-- 4 shards, hash distributed.
|
||||||
-- Negate distribution column value.
|
-- Negate distribution column value.
|
||||||
SET citus.shard_count TO 4;
|
SET citus.shard_count TO 4;
|
||||||
|
@ -25,8 +24,8 @@ SELECT * FROM target_table WHERE a=-1 OR a=-3 OR a=-7 ORDER BY a;
|
||||||
DROP TABLE source_table, target_table;
|
DROP TABLE source_table, target_table;
|
||||||
|
|
||||||
--
|
--
|
||||||
-- Test 2.
|
|
||||||
-- range partitioning, composite distribution column
|
-- range partitioning, composite distribution column
|
||||||
|
--
|
||||||
CREATE TYPE composite_key_type AS (f1 int, f2 text);
|
CREATE TYPE composite_key_type AS (f1 int, f2 text);
|
||||||
|
|
||||||
-- source
|
-- source
|
||||||
|
@ -82,13 +81,15 @@ INSERT INTO target_table(value) SELECT value FROM source_table;
|
||||||
DROP TABLE source_table, target_table;
|
DROP TABLE source_table, target_table;
|
||||||
|
|
||||||
-- different column types
|
-- different column types
|
||||||
|
-- verifies that we add necessary casts, otherwise even shard routing won't
|
||||||
|
-- work correctly and we will see 2 values for the same primary key.
|
||||||
CREATE TABLE target_table(col_1 int primary key, col_2 int);
|
CREATE TABLE target_table(col_1 int primary key, col_2 int);
|
||||||
SELECT create_distributed_table('target_table','col_1');
|
SELECT create_distributed_table('target_table','col_1');
|
||||||
INSERT INTO target_table VALUES (1,2), (2,3), (3,4), (4,5), (5,6);
|
INSERT INTO target_table VALUES (1,2), (2,3), (3,4), (4,5), (5,6);
|
||||||
|
|
||||||
CREATE TABLE source_table(col_1 numeric, col_2 numeric, col_3 numeric);
|
CREATE TABLE source_table(col_1 numeric, col_2 numeric, col_3 numeric);
|
||||||
SELECT create_distributed_table('source_table','col_1');
|
SELECT create_distributed_table('source_table','col_1');
|
||||||
INSERT INTO source_table VALUES(1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5);
|
INSERT INTO source_table VALUES (1,1,1), (3,3,3), (5,5,5);
|
||||||
|
|
||||||
SET client_min_messages TO DEBUG2;
|
SET client_min_messages TO DEBUG2;
|
||||||
INSERT INTO target_table
|
INSERT INTO target_table
|
||||||
|
@ -103,5 +104,26 @@ SELECT * FROM target_table ORDER BY 1;
|
||||||
|
|
||||||
DROP TABLE source_table, target_table;
|
DROP TABLE source_table, target_table;
|
||||||
|
|
||||||
|
--
|
||||||
|
-- array coercion
|
||||||
|
--
|
||||||
|
SET citus.shard_count TO 3;
|
||||||
|
CREATE TABLE source_table(a int, mapped_key int, c float[]);
|
||||||
|
SELECT create_distributed_table('source_table', 'a');
|
||||||
|
INSERT INTO source_table VALUES (1, -1, ARRAY[1.1, 2.2, 3.3]), (2, -2, ARRAY[4.5, 5.8]),
|
||||||
|
(3, -3, ARRAY[]::float[]), (4, -4, ARRAY[3.3]);
|
||||||
|
|
||||||
|
SET citus.shard_count TO 2;
|
||||||
|
CREATE TABLE target_table(a int, b int[]);
|
||||||
|
SELECT create_distributed_table('target_table', 'a');
|
||||||
|
|
||||||
|
SET client_min_messages TO DEBUG2;
|
||||||
|
INSERT INTO target_table SELECT mapped_key, c FROM source_table;
|
||||||
|
RESET client_min_messages;
|
||||||
|
|
||||||
|
SELECT * FROM target_table ORDER BY a;
|
||||||
|
|
||||||
|
DROP TABLE source_table, target_table;
|
||||||
|
|
||||||
SET client_min_messages TO WARNING;
|
SET client_min_messages TO WARNING;
|
||||||
DROP SCHEMA insert_select_repartition CASCADE;
|
DROP SCHEMA insert_select_repartition CASCADE;
|
||||||
|
|
Loading…
Reference in New Issue