diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 4df4f4f62..ab0c8f485 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -123,6 +123,17 @@ static int64 RemoteCreateEmptyShard(char *relationName); static void MasterUpdateShardStatistics(uint64 shardId); static void RemoteUpdateShardStatistics(uint64 shardId); +static void ConversionPathForTypes(Oid inputType, Oid destType, CopyCoercionData *result); +static Oid TypeForColumnName(Oid relationId, TupleDesc tupleDescriptor, char *columnName); +static Oid * TypeArrayFromTupleDescriptor(TupleDesc tupleDescriptor); +static CopyCoercionData * ColumnCoercionPaths(TupleDesc destTupleDescriptor, + TupleDesc inputTupleDescriptor, + Oid destRelId, List *columnNameList, + Oid *finalColumnTypeArray); +static FmgrInfo * TypeOutputFunctions(uint32 columnCount, Oid *typeIdArray, + bool binaryFormat); +static Datum CoerceColumnValue(Datum inputValue, CopyCoercionData *coercionPath); + /* Private functions copied and adapted from copy.c in PostgreSQL */ static void CopySendData(CopyOutState outputState, const void *databuf, int datasize); static void CopySendString(CopyOutState outputState, const char *str); @@ -598,7 +609,7 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId) /* replicate row to shard placements */ resetStringInfo(copyOutState->fe_msgbuf); AppendCopyRowData(columnValues, columnNulls, tupleDescriptor, - copyOutState, columnOutputFunctions); + copyOutState, columnOutputFunctions, NULL); SendCopyDataToAll(copyOutState->fe_msgbuf, currentShardId, shardConnections->connectionList); @@ -1218,28 +1229,195 @@ ReportCopyError(MultiConnection *connection, PGresult *result) /* - * ColumnOutputFunctions walks over a table's columns, and finds each column's - * type information. The function then resolves each type's output function, - * and stores and returns these output functions in an array. + * ConversionPathForTypes fills *result with all the data necessary for converting + * Datums of type inputType to Datums of type destType. */ -FmgrInfo * -ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat) +static void +ConversionPathForTypes(Oid inputType, Oid destType, CopyCoercionData *result) +{ + Oid coercionFuncId = InvalidOid; + CoercionPathType coercionType = COERCION_PATH_RELABELTYPE; + + if (destType == inputType) + { + result->coercionType = COERCION_PATH_RELABELTYPE; + return; + } + + coercionType = find_coercion_pathway(destType, inputType, + COERCION_EXPLICIT, + &coercionFuncId); + + switch (coercionType) + { + case COERCION_PATH_NONE: + { + ereport(ERROR, (errmsg("cannot cast %d to %d", inputType, destType))); + } + + case COERCION_PATH_ARRAYCOERCE: + { + ereport(ERROR, (errmsg("can not run query which uses an implicit coercion" + " between array types"))); + } + + case COERCION_PATH_COERCEVIAIO: + { + result->coercionType = COERCION_PATH_COERCEVIAIO; + + { + bool typisvarlena = false; /* ignored */ + Oid iofunc = InvalidOid; + getTypeOutputInfo(inputType, &iofunc, &typisvarlena); + fmgr_info(iofunc, &(result->outputFunction)); + } + + { + Oid iofunc = InvalidOid; + getTypeInputInfo(destType, &iofunc, &(result->typioparam)); + fmgr_info(iofunc, &(result->inputFunction)); + } + + return; + } + + case COERCION_PATH_FUNC: + { + result->coercionType = COERCION_PATH_FUNC; + fmgr_info(coercionFuncId, &(result->coerceFunction)); + return; + } + + case COERCION_PATH_RELABELTYPE: + { + result->coercionType = COERCION_PATH_RELABELTYPE; + return; /* the types are binary compatible, no need to call a function */ + } + + default: + Assert(false); /* there are no other options for this enum */ + } +} + + +/* + * Returns the type of the provided column of the provided tuple. Throws an error if the + * column does not exist or is dropped. + * + * tupleDescriptor and relationId must refer to the same table. + */ +static Oid +TypeForColumnName(Oid relationId, TupleDesc tupleDescriptor, char *columnName) +{ + AttrNumber destAttrNumber = get_attnum(relationId, columnName); + Form_pg_attribute attr = NULL; + + if (destAttrNumber == InvalidAttrNumber) + { + ereport(ERROR, (errmsg("invalid attr? %s", columnName))); + } + + attr = TupleDescAttr(tupleDescriptor, destAttrNumber - 1); + return attr->atttypid; +} + + +/* + * Walks a TupleDesc and returns an array of the types of each attribute. Will return + * InvalidOid in the place of dropped attributes. + */ +static Oid * +TypeArrayFromTupleDescriptor(TupleDesc tupleDescriptor) +{ + int columnCount = tupleDescriptor->natts; + Oid *typeArray = palloc0(columnCount * sizeof(Oid)); + int columnIndex = 0; + + for (columnIndex = 0; columnIndex < columnCount; columnIndex++) + { + Form_pg_attribute attr = TupleDescAttr(tupleDescriptor, columnIndex); + if (attr->attisdropped) + { + typeArray[columnIndex] = InvalidOid; + } + else + { + typeArray[columnIndex] = attr->atttypid; + } + } + + return typeArray; +} + + +/* + * ColumnCoercionPaths scans the input and output tuples looking for mismatched types, + * it then returns an array of coercion functions to use on the input tuples, and an + * array of types which descript the output tuple + */ +static CopyCoercionData * +ColumnCoercionPaths(TupleDesc destTupleDescriptor, TupleDesc inputTupleDescriptor, + Oid destRelId, List *columnNameList, + Oid *finalColumnTypeArray) +{ + int columnIndex = 0; + int columnCount = inputTupleDescriptor->natts; + CopyCoercionData *coercePaths = palloc0(columnCount * sizeof(CopyCoercionData)); + Oid *inputTupleTypes = TypeArrayFromTupleDescriptor(inputTupleDescriptor); + ListCell *currentColumnName = list_head(columnNameList); + + for (columnIndex = 0; columnIndex < columnCount; columnIndex++) + { + Oid destTupleType = InvalidOid; + Oid inputTupleType = inputTupleTypes[columnIndex]; + char *columnName = lfirst(currentColumnName); + + if (inputTupleType == InvalidOid) + { + /* this was a dropped column and will not be in the incoming tuples */ + continue; + } + + destTupleType = TypeForColumnName(destRelId, destTupleDescriptor, columnName); + + finalColumnTypeArray[columnIndex] = destTupleType; + + ConversionPathForTypes(inputTupleType, destTupleType, + &coercePaths[columnIndex]); + + currentColumnName = lnext(currentColumnName); + + if (currentColumnName == NULL) + { + /* the rest of inputTupleDescriptor are dropped columns, return early! */ + break; + } + } + + return coercePaths; +} + + +/* + * TypeOutputFunctions takes an array of types and returns an array of output functions + * for those types. + */ +static FmgrInfo * +TypeOutputFunctions(uint32 columnCount, Oid *typeIdArray, bool binaryFormat) { - uint32 columnCount = (uint32) rowDescriptor->natts; FmgrInfo *columnOutputFunctions = palloc0(columnCount * sizeof(FmgrInfo)); uint32 columnIndex = 0; for (columnIndex = 0; columnIndex < columnCount; columnIndex++) { FmgrInfo *currentOutputFunction = &columnOutputFunctions[columnIndex]; - Form_pg_attribute currentColumn = TupleDescAttr(rowDescriptor, columnIndex); - Oid columnTypeId = currentColumn->atttypid; - Oid outputFunctionId = InvalidOid; + Oid columnTypeId = typeIdArray[columnIndex]; bool typeVariableLength = false; + Oid outputFunctionId = InvalidOid; - if (currentColumn->attisdropped) + /* If there are any dropped columns it'll show up as a NULL */ + if (columnTypeId == InvalidOid) { - /* dropped column, leave the output function NULL */ continue; } else if (binaryFormat) @@ -1258,6 +1436,23 @@ ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat) } +/* + * ColumnOutputFunctions is a wrapper around TypeOutputFunctions, it takes a + * tupleDescriptor and returns an array of output functions, one for each column in + * the tuple. + */ +FmgrInfo * +ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat) +{ + uint32 columnCount = (uint32) rowDescriptor->natts; + Oid *columnTypes = TypeArrayFromTupleDescriptor(rowDescriptor); + FmgrInfo *outputFunctions = + TypeOutputFunctions(columnCount, columnTypes, binaryFormat); + + return outputFunctions; +} + + /* * AppendCopyRowData serializes one row using the column output functions, * and appends the data to the row output state object's message buffer. @@ -1268,7 +1463,8 @@ ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat) */ void AppendCopyRowData(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor, - CopyOutState rowOutputState, FmgrInfo *columnOutputFunctions) + CopyOutState rowOutputState, FmgrInfo *columnOutputFunctions, + CopyCoercionData *columnCoercionPaths) { uint32 totalColumnCount = (uint32) rowDescriptor->natts; uint32 availableColumnCount = AvailableColumnCount(rowDescriptor); @@ -1288,6 +1484,11 @@ AppendCopyRowData(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor, bool isNull = isNullArray[columnIndex]; bool lastColumn = false; + if (!isNull && columnCoercionPaths != NULL) + { + value = CoerceColumnValue(value, &columnCoercionPaths[columnIndex]); + } + if (currentColumn->attisdropped) { continue; @@ -1346,6 +1547,54 @@ AppendCopyRowData(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor, } +/* + * CoerceColumnValue follows the instructions in *coercionPath and uses them to convert + * inputValue into a Datum of the correct type. + */ +static Datum +CoerceColumnValue(Datum inputValue, CopyCoercionData *coercionPath) +{ + switch (coercionPath->coercionType) + { + case 0: + { + return inputValue; /* this was a dropped column */ + } + + case COERCION_PATH_RELABELTYPE: + { + return inputValue; /* no need to do anything */ + } + + case COERCION_PATH_FUNC: + { + FmgrInfo *coerceFunction = &(coercionPath->coerceFunction); + Datum outputValue = FunctionCall1(coerceFunction, inputValue); + return outputValue; + } + + case COERCION_PATH_COERCEVIAIO: + { + FmgrInfo *outFunction = &(coercionPath->outputFunction); + Datum textRepr = FunctionCall1(outFunction, inputValue); + + FmgrInfo *inFunction = &(coercionPath->inputFunction); + Oid typioparam = coercionPath->typioparam; + Datum outputValue = FunctionCall3(inFunction, textRepr, typioparam, + Int32GetDatum(-1)); + + return outputValue; + } + + default: + { + /* this should never happen */ + ereport(ERROR, (errmsg("unsupported coercion type"))); + } + } +} + + /* * AvailableColumnCount returns the number of columns in a tuple descriptor, excluding * columns that were dropped. @@ -1853,9 +2102,19 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, copyOutState->rowcontext = GetPerTupleMemoryContext(copyDest->executorState); copyDest->copyOutState = copyOutState; - /* prepare output functions */ - copyDest->columnOutputFunctions = - ColumnOutputFunctions(inputTupleDescriptor, copyOutState->binary); + /* prepare functions to call on received tuples */ + { + TupleDesc destTupleDescriptor = distributedRelation->rd_att; + int columnCount = inputTupleDescriptor->natts; + Oid *finalTypeArray = palloc0(columnCount * sizeof(Oid)); + + copyDest->columnCoercionPaths = + ColumnCoercionPaths(destTupleDescriptor, inputTupleDescriptor, + tableId, columnNameList, finalTypeArray); + + copyDest->columnOutputFunctions = + TypeOutputFunctions(columnCount, finalTypeArray, copyOutState->binary); + } /* ensure the column names are properly quoted in the COPY statement */ foreach(columnNameCell, columnNameList) @@ -1906,6 +2165,7 @@ CitusCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) HTAB *shardConnectionHash = copyDest->shardConnectionHash; CopyOutState copyOutState = copyDest->copyOutState; FmgrInfo *columnOutputFunctions = copyDest->columnOutputFunctions; + CopyCoercionData *columnCoercionPaths = copyDest->columnCoercionPaths; bool stopOnFailure = copyDest->stopOnFailure; @@ -1937,6 +2197,8 @@ CitusCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) */ if (partitionColumnIndex != INVALID_PARTITION_COLUMN_INDEX) { + CopyCoercionData *coercePath = &columnCoercionPaths[partitionColumnIndex]; + if (columnNulls[partitionColumnIndex]) { Oid relationId = copyDest->distributedRelationId; @@ -1953,6 +2215,9 @@ CitusCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) /* find the partition column value */ partitionColumnValue = columnValues[partitionColumnIndex]; + + /* annoyingly this is evaluated twice, but at least we don't crash! */ + partitionColumnValue = CoerceColumnValue(partitionColumnValue, coercePath); } /* @@ -1995,7 +2260,7 @@ CitusCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) /* replicate row to shard placements */ resetStringInfo(copyOutState->fe_msgbuf); AppendCopyRowData(columnValues, columnNulls, tupleDescriptor, - copyOutState, columnOutputFunctions); + copyOutState, columnOutputFunctions, columnCoercionPaths); SendCopyDataToAll(copyOutState->fe_msgbuf, shardId, shardConnections->connectionList); MemoryContextSwitchTo(oldContext); @@ -2066,5 +2331,10 @@ CitusCopyDestReceiverDestroy(DestReceiver *destReceiver) pfree(copyDest->columnOutputFunctions); } + if (copyDest->columnCoercionPaths) + { + pfree(copyDest->columnCoercionPaths); + } + pfree(copyDest); } diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index 15c646dba..9f61a6d40 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -64,8 +64,6 @@ static DeferredErrorMessage * InsertPartitionColumnMatchesSelect(Query *query, static MultiPlan * CreateCoordinatorInsertSelectPlan(Query *parse); static DeferredErrorMessage * CoordinatorInsertSelectSupported(Query *insertSelectQuery); static Query * WrapSubquery(Query *subquery); -static void CastSelectTargetList(List *selectTargetList, Oid targetRelationId, - List *insertTargetList); static bool CheckInsertSelectQuery(Query *query); @@ -1185,10 +1183,6 @@ CreateCoordinatorInsertSelectPlan(Query *parse) ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte); - /* make sure the SELECT returns the right type for copying into the table */ - CastSelectTargetList(selectQuery->targetList, targetRelationId, - insertSelectQuery->targetList); - multiPlan->insertSelectSubquery = selectQuery; multiPlan->insertTargetList = insertSelectQuery->targetList; multiPlan->targetRelationId = targetRelationId; @@ -1309,62 +1303,3 @@ WrapSubquery(Query *subquery) return outerQuery; } - - -/* - * CastSelectTargetList adds casts to the target entries in selectTargetList - * to match the type in insertTargetList. This ensures that the results of - * the SELECT will have the right type when serialised during COPY. For - * example, a float that is inserted into a an int column normally has an - * implicit cast, but if we send it through the COPY protocol the serialised - * form would contain decimal notation, which is not valid for int. - */ -static void -CastSelectTargetList(List *selectTargetList, Oid targetRelationId, List *insertTargetList) -{ - ListCell *insertTargetCell = NULL; - ListCell *selectTargetCell = NULL; - - /* add casts when the SELECT output does not directly match the table */ - forboth(insertTargetCell, insertTargetList, - selectTargetCell, selectTargetList) - { - TargetEntry *insertTargetEntry = (TargetEntry *) lfirst(insertTargetCell); - TargetEntry *selectTargetEntry = (TargetEntry *) lfirst(selectTargetCell); - - Var *columnVar = NULL; - Oid columnType = InvalidOid; - int32 columnTypeMod = 0; - Oid selectOutputType = InvalidOid; - - /* indirection is not supported, e.g. INSERT INTO table (composite_column.x) */ - if (!IsA(insertTargetEntry->expr, Var)) - { - ereport(ERROR, (errmsg("can only handle regular columns in the target " - "list"))); - } - - columnVar = (Var *) insertTargetEntry->expr; - columnType = get_atttype(targetRelationId, columnVar->varattno); - columnTypeMod = get_atttypmod(targetRelationId, columnVar->varattno); - selectOutputType = columnVar->vartype; - - /* - * If the type in the target list does not match the type of the column, - * we need to cast to the column type. PostgreSQL would do this - * automatically during the insert, but we're passing the SELECT - * output directly to COPY. - */ - if (columnType != selectOutputType) - { - Expr *selectExpression = selectTargetEntry->expr; - Expr *typeCastedSelectExpr = - (Expr *) coerce_to_target_type(NULL, (Node *) selectExpression, - selectOutputType, columnType, - columnTypeMod, COERCION_EXPLICIT, - COERCE_IMPLICIT_CAST, -1); - - selectTargetEntry->expr = typeCastedSelectExpr; - } - } -} diff --git a/src/backend/distributed/worker/worker_partition_protocol.c b/src/backend/distributed/worker/worker_partition_protocol.c index f68967934..bc8e29da2 100644 --- a/src/backend/distributed/worker/worker_partition_protocol.c +++ b/src/backend/distributed/worker/worker_partition_protocol.c @@ -869,7 +869,7 @@ FilterAndPartitionTable(const char *filterQuery, heap_deform_tuple(row, rowDescriptor, valueArray, isNullArray); AppendCopyRowData(valueArray, isNullArray, rowDescriptor, - rowOutputState, columnOutputFunctions); + rowOutputState, columnOutputFunctions, NULL); rowText = rowOutputState->fe_msgbuf; diff --git a/src/include/distributed/multi_copy.h b/src/include/distributed/multi_copy.h index a40f9e3e7..6e51efbcc 100644 --- a/src/include/distributed/multi_copy.h +++ b/src/include/distributed/multi_copy.h @@ -17,6 +17,7 @@ #include "distributed/metadata_cache.h" #include "nodes/execnodes.h" #include "nodes/parsenodes.h" +#include "parser/parse_coerce.h" #include "tcop/dest.h" @@ -51,6 +52,17 @@ typedef struct NodeAddress int32 nodePort; } NodeAddress; +/* struct to allow rReceive to coerce tuples before sending them to workers */ +typedef struct CopyCoercionData +{ + CoercionPathType coercionType; + FmgrInfo coerceFunction; + + FmgrInfo inputFunction; + FmgrInfo outputFunction; + Oid typioparam; /* inputFunction has an extra param */ +} CopyCoercionData; + /* CopyDestReceiver can be used to stream results into a distributed table */ typedef struct CitusCopyDestReceiver { @@ -88,6 +100,9 @@ typedef struct CitusCopyDestReceiver CopyOutState copyOutState; FmgrInfo *columnOutputFunctions; + /* instructions for coercing incoming tuples */ + CopyCoercionData *columnCoercionPaths; + /* number of tuples sent */ int64 tuplesSent; } CitusCopyDestReceiver; @@ -103,7 +118,8 @@ extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryForm extern void AppendCopyRowData(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor, CopyOutState rowOutputState, - FmgrInfo *columnOutputFunctions); + FmgrInfo *columnOutputFunctions, + CopyCoercionData *columnCoercionPaths); extern void AppendCopyBinaryHeaders(CopyOutState headerOutputState); extern void AppendCopyBinaryFooters(CopyOutState footerOutputState); extern void CitusCopyFrom(CopyStmt *copyStatement, char *completionTag); diff --git a/src/test/regress/expected/multi_explain.out b/src/test/regress/expected/multi_explain.out index d99e91fca..1c332e84a 100644 --- a/src/test/regress/expected/multi_explain.out +++ b/src/test/regress/expected/multi_explain.out @@ -1101,21 +1101,19 @@ INSERT INTO lineitem_hash_part WITH cte1 AS (SELECT * FROM cte1 LIMIT 5) SELECT s FROM cte1; Custom Scan (Citus INSERT ... SELECT via coordinator) - -> Subquery Scan on citus_insert_select_subquery + -> CTE Scan on cte1 CTE cte1 -> Function Scan on generate_series s - -> CTE Scan on cte1 - CTE cte1 - -> Limit - -> CTE Scan on cte1 cte1_1 + CTE cte1 + -> Limit + -> CTE Scan on cte1 cte1_1 EXPLAIN (COSTS OFF) INSERT INTO lineitem_hash_part ( SELECT s FROM generate_series(1,5) s) UNION ( SELECT s FROM generate_series(5,10) s); Custom Scan (Citus INSERT ... SELECT via coordinator) - -> Subquery Scan on citus_insert_select_subquery - -> HashAggregate - Group Key: s.s - -> Append - -> Function Scan on generate_series s - -> Function Scan on generate_series s_1 + -> HashAggregate + Group Key: s.s + -> Append + -> Function Scan on generate_series s + -> Function Scan on generate_series s_1 diff --git a/src/test/regress/expected/multi_explain_0.out b/src/test/regress/expected/multi_explain_0.out index 2d9f73c63..4d856e33c 100644 --- a/src/test/regress/expected/multi_explain_0.out +++ b/src/test/regress/expected/multi_explain_0.out @@ -1101,21 +1101,19 @@ INSERT INTO lineitem_hash_part WITH cte1 AS (SELECT * FROM cte1 LIMIT 5) SELECT s FROM cte1; Custom Scan (Citus INSERT ... SELECT via coordinator) - -> Subquery Scan on citus_insert_select_subquery + -> CTE Scan on cte1 CTE cte1 -> Function Scan on generate_series s - -> CTE Scan on cte1 - CTE cte1 - -> Limit - -> CTE Scan on cte1 cte1_1 + CTE cte1 + -> Limit + -> CTE Scan on cte1 cte1_1 EXPLAIN (COSTS OFF) INSERT INTO lineitem_hash_part ( SELECT s FROM generate_series(1,5) s) UNION ( SELECT s FROM generate_series(5,10) s); Custom Scan (Citus INSERT ... SELECT via coordinator) - -> Subquery Scan on citus_insert_select_subquery - -> HashAggregate - Group Key: s.s - -> Append - -> Function Scan on generate_series s - -> Function Scan on generate_series s_1 + -> HashAggregate + Group Key: s.s + -> Append + -> Function Scan on generate_series s + -> Function Scan on generate_series s_1 diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out index 93b565d62..8749d135e 100644 --- a/src/test/regress/expected/multi_insert_select.out +++ b/src/test/regress/expected/multi_insert_select.out @@ -2504,6 +2504,144 @@ SELECT * FROM drop_col_table WHERE col2 = '1'; (1 row) RESET client_min_messages; +-- make sure casts are handled correctly +CREATE TABLE coerce_events(user_id int, time timestamp, value_1 numeric); +SELECT create_distributed_table('coerce_events', 'user_id'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE coerce_agg (user_id int, value_1_agg int); +SELECT create_distributed_table('coerce_agg', 'user_id'); + create_distributed_table +-------------------------- + +(1 row) + +INSERT INTO coerce_events(user_id, value_1) VALUES (1, 1), (2, 2), (10, 10); +-- numeric -> int (straight function) +INSERT INTO coerce_agg(user_id, value_1_agg) +SELECT * +FROM ( + SELECT user_id, value_1 + FROM coerce_events +) AS ftop +ORDER BY 2 DESC, 1 DESC +LIMIT 5; +-- int -> text +ALTER TABLE coerce_agg ALTER COLUMN value_1_agg TYPE text; +INSERT INTO coerce_agg(user_id, value_1_agg) +SELECT * +FROM ( + SELECT user_id, value_1 + FROM coerce_events +) AS ftop +LIMIT 5; +SELECT * FROM coerce_agg; + user_id | value_1_agg +---------+------------- + 10 | 10 + 1 | 1 + 1 | 1 + 10 | 10 + 2 | 2 + 2 | 2 +(6 rows) + +TRUNCATE coerce_agg; +-- int -> char(1) +ALTER TABLE coerce_agg ALTER COLUMN value_1_agg TYPE char(1); +INSERT INTO coerce_agg(user_id, value_1_agg) +SELECT * +FROM ( + SELECT user_id, value_1 + FROM coerce_events +) AS ftop +LIMIT 5; +ERROR: value too long for type character(1) +DETAIL: (null) +SELECT * FROM coerce_agg; + user_id | value_1_agg +---------+------------- +(0 rows) + +TRUNCATE coerce_agg; +TRUNCATE coerce_events; +-- char(5) -> char(1) +ALTER TABLE coerce_events ALTER COLUMN value_1 TYPE char(5); +INSERT INTO coerce_events(user_id, value_1) VALUES (1, 'aaaaa'), (2, 'bbbbb'); +INSERT INTO coerce_agg(user_id, value_1_agg) +SELECT * +FROM ( + SELECT user_id, value_1 + FROM coerce_events +) AS ftop +LIMIT 5; +ERROR: value too long for type character(1) +DETAIL: (null) +-- char(1) -> char(5) +ALTER TABLE coerce_events ALTER COLUMN value_1 TYPE char(1) USING value_1::char(1); +ALTER TABLE coerce_agg ALTER COLUMN value_1_agg TYPE char(5); +INSERT INTO coerce_agg(user_id, value_1_agg) +SELECT * +FROM ( + SELECT user_id, value_1 + FROM coerce_events +) AS ftop +LIMIT 5; +SELECT * FROM coerce_agg; + user_id | value_1_agg +---------+------------- + 1 | a + 2 | b +(2 rows) + +TRUNCATE coerce_agg; +TRUNCATE coerce_events; +-- integer -> integer (check VALUE < 5) +ALTER TABLE coerce_events ALTER COLUMN value_1 TYPE integer USING NULL; +ALTER TABLE coerce_agg ALTER COLUMN value_1_agg TYPE integer USING NULL; +ALTER TABLE coerce_agg ADD CONSTRAINT small_number CHECK (value_1_agg < 5); +INSERT INTO coerce_events (user_id, value_1) VALUES (1, 1), (10, 10); +INSERT INTO coerce_agg(user_id, value_1_agg) +SELECT * +FROM ( + SELECT user_id, value_1 + FROM coerce_events +) AS ftop; +ERROR: new row for relation "coerce_agg_13300060" violates check constraint "small_number_13300060" +DETAIL: Failing row contains (10, 10). +CONTEXT: while executing command on localhost:57637 +SELECT * FROM coerce_agg; + user_id | value_1_agg +---------+------------- +(0 rows) + +-- integer[3] -> text[3] +TRUNCATE coerce_events; +ALTER TABLE coerce_events ALTER COLUMN value_1 TYPE integer[3] USING NULL; +INSERT INTO coerce_events(user_id, value_1) VALUES (1, '{1,1,1}'), (2, '{2,2,2}'); +ALTER TABLE coerce_agg DROP COLUMN value_1_agg; +ALTER TABLE coerce_agg ADD COLUMN value_1_agg text[3]; +INSERT INTO coerce_agg(user_id, value_1_agg) +SELECT * +FROM ( + SELECT user_id, value_1 + FROM coerce_events +) AS ftop +LIMIT 5; +SELECT * FROM coerce_agg; + user_id | value_1_agg +---------+------------- + 1 | {1,1,1} + 2 | {2,2,2} +(2 rows) + +-- wrap in a transaction to improve performance +BEGIN; +DROP TABLE coerce_events; +DROP TABLE coerce_agg; DROP TABLE drop_col_table; DROP TABLE raw_table; DROP TABLE summary_table; @@ -2516,3 +2654,4 @@ DROP TABLE table_with_serial; DROP TABLE text_table; DROP TABLE char_table; DROP TABLE table_with_starts_with_defaults; +COMMIT; diff --git a/src/test/regress/sql/multi_insert_select.sql b/src/test/regress/sql/multi_insert_select.sql index 715830760..32444a280 100644 --- a/src/test/regress/sql/multi_insert_select.sql +++ b/src/test/regress/sql/multi_insert_select.sql @@ -297,6 +297,7 @@ ON (f.id = f2.id)) as outer_most GROUP BY outer_most.id; + -- subqueries in WHERE clause INSERT INTO raw_events_second (user_id) @@ -1956,6 +1957,115 @@ SELECT * FROM drop_col_table WHERE col2 = '1'; RESET client_min_messages; +-- make sure casts are handled correctly +CREATE TABLE coerce_events(user_id int, time timestamp, value_1 numeric); +SELECT create_distributed_table('coerce_events', 'user_id'); + +CREATE TABLE coerce_agg (user_id int, value_1_agg int); +SELECT create_distributed_table('coerce_agg', 'user_id'); + +INSERT INTO coerce_events(user_id, value_1) VALUES (1, 1), (2, 2), (10, 10); + +-- numeric -> int (straight function) +INSERT INTO coerce_agg(user_id, value_1_agg) +SELECT * +FROM ( + SELECT user_id, value_1 + FROM coerce_events +) AS ftop +ORDER BY 2 DESC, 1 DESC +LIMIT 5; + +-- int -> text +ALTER TABLE coerce_agg ALTER COLUMN value_1_agg TYPE text; +INSERT INTO coerce_agg(user_id, value_1_agg) +SELECT * +FROM ( + SELECT user_id, value_1 + FROM coerce_events +) AS ftop +LIMIT 5; + +SELECT * FROM coerce_agg; + +TRUNCATE coerce_agg; + +-- int -> char(1) +ALTER TABLE coerce_agg ALTER COLUMN value_1_agg TYPE char(1); +INSERT INTO coerce_agg(user_id, value_1_agg) +SELECT * +FROM ( + SELECT user_id, value_1 + FROM coerce_events +) AS ftop +LIMIT 5; + +SELECT * FROM coerce_agg; + +TRUNCATE coerce_agg; +TRUNCATE coerce_events; + +-- char(5) -> char(1) +ALTER TABLE coerce_events ALTER COLUMN value_1 TYPE char(5); +INSERT INTO coerce_events(user_id, value_1) VALUES (1, 'aaaaa'), (2, 'bbbbb'); +INSERT INTO coerce_agg(user_id, value_1_agg) +SELECT * +FROM ( + SELECT user_id, value_1 + FROM coerce_events +) AS ftop +LIMIT 5; + +-- char(1) -> char(5) +ALTER TABLE coerce_events ALTER COLUMN value_1 TYPE char(1) USING value_1::char(1); +ALTER TABLE coerce_agg ALTER COLUMN value_1_agg TYPE char(5); +INSERT INTO coerce_agg(user_id, value_1_agg) +SELECT * +FROM ( + SELECT user_id, value_1 + FROM coerce_events +) AS ftop +LIMIT 5; +SELECT * FROM coerce_agg; + +TRUNCATE coerce_agg; +TRUNCATE coerce_events; + +-- integer -> integer (check VALUE < 5) +ALTER TABLE coerce_events ALTER COLUMN value_1 TYPE integer USING NULL; +ALTER TABLE coerce_agg ALTER COLUMN value_1_agg TYPE integer USING NULL; +ALTER TABLE coerce_agg ADD CONSTRAINT small_number CHECK (value_1_agg < 5); + +INSERT INTO coerce_events (user_id, value_1) VALUES (1, 1), (10, 10); +INSERT INTO coerce_agg(user_id, value_1_agg) +SELECT * +FROM ( + SELECT user_id, value_1 + FROM coerce_events +) AS ftop; + +SELECT * FROM coerce_agg; + +-- integer[3] -> text[3] +TRUNCATE coerce_events; +ALTER TABLE coerce_events ALTER COLUMN value_1 TYPE integer[3] USING NULL; +INSERT INTO coerce_events(user_id, value_1) VALUES (1, '{1,1,1}'), (2, '{2,2,2}'); +ALTER TABLE coerce_agg DROP COLUMN value_1_agg; +ALTER TABLE coerce_agg ADD COLUMN value_1_agg text[3]; +INSERT INTO coerce_agg(user_id, value_1_agg) +SELECT * +FROM ( + SELECT user_id, value_1 + FROM coerce_events +) AS ftop +LIMIT 5; + +SELECT * FROM coerce_agg; + +-- wrap in a transaction to improve performance +BEGIN; +DROP TABLE coerce_events; +DROP TABLE coerce_agg; DROP TABLE drop_col_table; DROP TABLE raw_table; DROP TABLE summary_table; @@ -1968,3 +2078,4 @@ DROP TABLE table_with_serial; DROP TABLE text_table; DROP TABLE char_table; DROP TABLE table_with_starts_with_defaults; +COMMIT;