mirror of https://github.com/citusdata/citus.git
Support implicit casts during INSERT/SELECT
It's possible to build INSERT SELECT queries which include implicit casts, currently we attempt to support these by adding explicit casts to the SELECT query, but this sometimes crashes because we don't update all nodes with the new types. (SortClauses, for instance) This commit removes those explicit casts and passes an unmodified SELECT query to the COPY executor (how we implement INSERT SELECT under the scenes). In lieu of those cases, COPY has been given some extra logic to inspect queries, notice that the types don't line up with the table it's supposed to be inserting into, and "manually" casting every tuple before sending them to workers.pull/1761/head
parent
074ae766de
commit
7be1545843
|
@ -123,6 +123,17 @@ static int64 RemoteCreateEmptyShard(char *relationName);
|
||||||
static void MasterUpdateShardStatistics(uint64 shardId);
|
static void MasterUpdateShardStatistics(uint64 shardId);
|
||||||
static void RemoteUpdateShardStatistics(uint64 shardId);
|
static void RemoteUpdateShardStatistics(uint64 shardId);
|
||||||
|
|
||||||
|
static void ConversionPathForTypes(Oid inputType, Oid destType, CopyCoercionData *result);
|
||||||
|
static Oid TypeForColumnName(Oid relationId, TupleDesc tupleDescriptor, char *columnName);
|
||||||
|
static Oid * TypeArrayFromTupleDescriptor(TupleDesc tupleDescriptor);
|
||||||
|
static CopyCoercionData * ColumnCoercionPaths(TupleDesc destTupleDescriptor,
|
||||||
|
TupleDesc inputTupleDescriptor,
|
||||||
|
Oid destRelId, List *columnNameList,
|
||||||
|
Oid *finalColumnTypeArray);
|
||||||
|
static FmgrInfo * TypeOutputFunctions(uint32 columnCount, Oid *typeIdArray,
|
||||||
|
bool binaryFormat);
|
||||||
|
static Datum CoerceColumnValue(Datum inputValue, CopyCoercionData *coercionPath);
|
||||||
|
|
||||||
/* Private functions copied and adapted from copy.c in PostgreSQL */
|
/* Private functions copied and adapted from copy.c in PostgreSQL */
|
||||||
static void CopySendData(CopyOutState outputState, const void *databuf, int datasize);
|
static void CopySendData(CopyOutState outputState, const void *databuf, int datasize);
|
||||||
static void CopySendString(CopyOutState outputState, const char *str);
|
static void CopySendString(CopyOutState outputState, const char *str);
|
||||||
|
@ -598,7 +609,7 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
|
||||||
/* replicate row to shard placements */
|
/* replicate row to shard placements */
|
||||||
resetStringInfo(copyOutState->fe_msgbuf);
|
resetStringInfo(copyOutState->fe_msgbuf);
|
||||||
AppendCopyRowData(columnValues, columnNulls, tupleDescriptor,
|
AppendCopyRowData(columnValues, columnNulls, tupleDescriptor,
|
||||||
copyOutState, columnOutputFunctions);
|
copyOutState, columnOutputFunctions, NULL);
|
||||||
SendCopyDataToAll(copyOutState->fe_msgbuf, currentShardId,
|
SendCopyDataToAll(copyOutState->fe_msgbuf, currentShardId,
|
||||||
shardConnections->connectionList);
|
shardConnections->connectionList);
|
||||||
|
|
||||||
|
@ -1218,28 +1229,195 @@ ReportCopyError(MultiConnection *connection, PGresult *result)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ColumnOutputFunctions walks over a table's columns, and finds each column's
|
* ConversionPathForTypes fills *result with all the data necessary for converting
|
||||||
* type information. The function then resolves each type's output function,
|
* Datums of type inputType to Datums of type destType.
|
||||||
* and stores and returns these output functions in an array.
|
|
||||||
*/
|
*/
|
||||||
FmgrInfo *
|
static void
|
||||||
ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat)
|
ConversionPathForTypes(Oid inputType, Oid destType, CopyCoercionData *result)
|
||||||
|
{
|
||||||
|
Oid coercionFuncId = InvalidOid;
|
||||||
|
CoercionPathType coercionType = COERCION_PATH_RELABELTYPE;
|
||||||
|
|
||||||
|
if (destType == inputType)
|
||||||
|
{
|
||||||
|
result->coercionType = COERCION_PATH_RELABELTYPE;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
coercionType = find_coercion_pathway(destType, inputType,
|
||||||
|
COERCION_EXPLICIT,
|
||||||
|
&coercionFuncId);
|
||||||
|
|
||||||
|
switch (coercionType)
|
||||||
|
{
|
||||||
|
case COERCION_PATH_NONE:
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errmsg("cannot cast %d to %d", inputType, destType)));
|
||||||
|
}
|
||||||
|
|
||||||
|
case COERCION_PATH_ARRAYCOERCE:
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errmsg("can not run query which uses an implicit coercion"
|
||||||
|
" between array types")));
|
||||||
|
}
|
||||||
|
|
||||||
|
case COERCION_PATH_COERCEVIAIO:
|
||||||
|
{
|
||||||
|
result->coercionType = COERCION_PATH_COERCEVIAIO;
|
||||||
|
|
||||||
|
{
|
||||||
|
bool typisvarlena = false; /* ignored */
|
||||||
|
Oid iofunc = InvalidOid;
|
||||||
|
getTypeOutputInfo(inputType, &iofunc, &typisvarlena);
|
||||||
|
fmgr_info(iofunc, &(result->outputFunction));
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
Oid iofunc = InvalidOid;
|
||||||
|
getTypeInputInfo(destType, &iofunc, &(result->typioparam));
|
||||||
|
fmgr_info(iofunc, &(result->inputFunction));
|
||||||
|
}
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
case COERCION_PATH_FUNC:
|
||||||
|
{
|
||||||
|
result->coercionType = COERCION_PATH_FUNC;
|
||||||
|
fmgr_info(coercionFuncId, &(result->coerceFunction));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
case COERCION_PATH_RELABELTYPE:
|
||||||
|
{
|
||||||
|
result->coercionType = COERCION_PATH_RELABELTYPE;
|
||||||
|
return; /* the types are binary compatible, no need to call a function */
|
||||||
|
}
|
||||||
|
|
||||||
|
default:
|
||||||
|
Assert(false); /* there are no other options for this enum */
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Returns the type of the provided column of the provided tuple. Throws an error if the
|
||||||
|
* column does not exist or is dropped.
|
||||||
|
*
|
||||||
|
* tupleDescriptor and relationId must refer to the same table.
|
||||||
|
*/
|
||||||
|
static Oid
|
||||||
|
TypeForColumnName(Oid relationId, TupleDesc tupleDescriptor, char *columnName)
|
||||||
|
{
|
||||||
|
AttrNumber destAttrNumber = get_attnum(relationId, columnName);
|
||||||
|
Form_pg_attribute attr = NULL;
|
||||||
|
|
||||||
|
if (destAttrNumber == InvalidAttrNumber)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errmsg("invalid attr? %s", columnName)));
|
||||||
|
}
|
||||||
|
|
||||||
|
attr = TupleDescAttr(tupleDescriptor, destAttrNumber - 1);
|
||||||
|
return attr->atttypid;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Walks a TupleDesc and returns an array of the types of each attribute. Will return
|
||||||
|
* InvalidOid in the place of dropped attributes.
|
||||||
|
*/
|
||||||
|
static Oid *
|
||||||
|
TypeArrayFromTupleDescriptor(TupleDesc tupleDescriptor)
|
||||||
|
{
|
||||||
|
int columnCount = tupleDescriptor->natts;
|
||||||
|
Oid *typeArray = palloc0(columnCount * sizeof(Oid));
|
||||||
|
int columnIndex = 0;
|
||||||
|
|
||||||
|
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
|
||||||
|
{
|
||||||
|
Form_pg_attribute attr = TupleDescAttr(tupleDescriptor, columnIndex);
|
||||||
|
if (attr->attisdropped)
|
||||||
|
{
|
||||||
|
typeArray[columnIndex] = InvalidOid;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
typeArray[columnIndex] = attr->atttypid;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return typeArray;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ColumnCoercionPaths scans the input and output tuples looking for mismatched types,
|
||||||
|
* it then returns an array of coercion functions to use on the input tuples, and an
|
||||||
|
* array of types which descript the output tuple
|
||||||
|
*/
|
||||||
|
static CopyCoercionData *
|
||||||
|
ColumnCoercionPaths(TupleDesc destTupleDescriptor, TupleDesc inputTupleDescriptor,
|
||||||
|
Oid destRelId, List *columnNameList,
|
||||||
|
Oid *finalColumnTypeArray)
|
||||||
|
{
|
||||||
|
int columnIndex = 0;
|
||||||
|
int columnCount = inputTupleDescriptor->natts;
|
||||||
|
CopyCoercionData *coercePaths = palloc0(columnCount * sizeof(CopyCoercionData));
|
||||||
|
Oid *inputTupleTypes = TypeArrayFromTupleDescriptor(inputTupleDescriptor);
|
||||||
|
ListCell *currentColumnName = list_head(columnNameList);
|
||||||
|
|
||||||
|
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
|
||||||
|
{
|
||||||
|
Oid destTupleType = InvalidOid;
|
||||||
|
Oid inputTupleType = inputTupleTypes[columnIndex];
|
||||||
|
char *columnName = lfirst(currentColumnName);
|
||||||
|
|
||||||
|
if (inputTupleType == InvalidOid)
|
||||||
|
{
|
||||||
|
/* this was a dropped column and will not be in the incoming tuples */
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
destTupleType = TypeForColumnName(destRelId, destTupleDescriptor, columnName);
|
||||||
|
|
||||||
|
finalColumnTypeArray[columnIndex] = destTupleType;
|
||||||
|
|
||||||
|
ConversionPathForTypes(inputTupleType, destTupleType,
|
||||||
|
&coercePaths[columnIndex]);
|
||||||
|
|
||||||
|
currentColumnName = lnext(currentColumnName);
|
||||||
|
|
||||||
|
if (currentColumnName == NULL)
|
||||||
|
{
|
||||||
|
/* the rest of inputTupleDescriptor are dropped columns, return early! */
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return coercePaths;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* TypeOutputFunctions takes an array of types and returns an array of output functions
|
||||||
|
* for those types.
|
||||||
|
*/
|
||||||
|
static FmgrInfo *
|
||||||
|
TypeOutputFunctions(uint32 columnCount, Oid *typeIdArray, bool binaryFormat)
|
||||||
{
|
{
|
||||||
uint32 columnCount = (uint32) rowDescriptor->natts;
|
|
||||||
FmgrInfo *columnOutputFunctions = palloc0(columnCount * sizeof(FmgrInfo));
|
FmgrInfo *columnOutputFunctions = palloc0(columnCount * sizeof(FmgrInfo));
|
||||||
|
|
||||||
uint32 columnIndex = 0;
|
uint32 columnIndex = 0;
|
||||||
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
|
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
|
||||||
{
|
{
|
||||||
FmgrInfo *currentOutputFunction = &columnOutputFunctions[columnIndex];
|
FmgrInfo *currentOutputFunction = &columnOutputFunctions[columnIndex];
|
||||||
Form_pg_attribute currentColumn = TupleDescAttr(rowDescriptor, columnIndex);
|
Oid columnTypeId = typeIdArray[columnIndex];
|
||||||
Oid columnTypeId = currentColumn->atttypid;
|
|
||||||
Oid outputFunctionId = InvalidOid;
|
|
||||||
bool typeVariableLength = false;
|
bool typeVariableLength = false;
|
||||||
|
Oid outputFunctionId = InvalidOid;
|
||||||
|
|
||||||
if (currentColumn->attisdropped)
|
/* If there are any dropped columns it'll show up as a NULL */
|
||||||
|
if (columnTypeId == InvalidOid)
|
||||||
{
|
{
|
||||||
/* dropped column, leave the output function NULL */
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
else if (binaryFormat)
|
else if (binaryFormat)
|
||||||
|
@ -1258,6 +1436,23 @@ ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ColumnOutputFunctions is a wrapper around TypeOutputFunctions, it takes a
|
||||||
|
* tupleDescriptor and returns an array of output functions, one for each column in
|
||||||
|
* the tuple.
|
||||||
|
*/
|
||||||
|
FmgrInfo *
|
||||||
|
ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat)
|
||||||
|
{
|
||||||
|
uint32 columnCount = (uint32) rowDescriptor->natts;
|
||||||
|
Oid *columnTypes = TypeArrayFromTupleDescriptor(rowDescriptor);
|
||||||
|
FmgrInfo *outputFunctions =
|
||||||
|
TypeOutputFunctions(columnCount, columnTypes, binaryFormat);
|
||||||
|
|
||||||
|
return outputFunctions;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* AppendCopyRowData serializes one row using the column output functions,
|
* AppendCopyRowData serializes one row using the column output functions,
|
||||||
* and appends the data to the row output state object's message buffer.
|
* and appends the data to the row output state object's message buffer.
|
||||||
|
@ -1268,7 +1463,8 @@ ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat)
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
AppendCopyRowData(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor,
|
AppendCopyRowData(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor,
|
||||||
CopyOutState rowOutputState, FmgrInfo *columnOutputFunctions)
|
CopyOutState rowOutputState, FmgrInfo *columnOutputFunctions,
|
||||||
|
CopyCoercionData *columnCoercionPaths)
|
||||||
{
|
{
|
||||||
uint32 totalColumnCount = (uint32) rowDescriptor->natts;
|
uint32 totalColumnCount = (uint32) rowDescriptor->natts;
|
||||||
uint32 availableColumnCount = AvailableColumnCount(rowDescriptor);
|
uint32 availableColumnCount = AvailableColumnCount(rowDescriptor);
|
||||||
|
@ -1288,6 +1484,11 @@ AppendCopyRowData(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor,
|
||||||
bool isNull = isNullArray[columnIndex];
|
bool isNull = isNullArray[columnIndex];
|
||||||
bool lastColumn = false;
|
bool lastColumn = false;
|
||||||
|
|
||||||
|
if (!isNull && columnCoercionPaths != NULL)
|
||||||
|
{
|
||||||
|
value = CoerceColumnValue(value, &columnCoercionPaths[columnIndex]);
|
||||||
|
}
|
||||||
|
|
||||||
if (currentColumn->attisdropped)
|
if (currentColumn->attisdropped)
|
||||||
{
|
{
|
||||||
continue;
|
continue;
|
||||||
|
@ -1346,6 +1547,54 @@ AppendCopyRowData(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CoerceColumnValue follows the instructions in *coercionPath and uses them to convert
|
||||||
|
* inputValue into a Datum of the correct type.
|
||||||
|
*/
|
||||||
|
static Datum
|
||||||
|
CoerceColumnValue(Datum inputValue, CopyCoercionData *coercionPath)
|
||||||
|
{
|
||||||
|
switch (coercionPath->coercionType)
|
||||||
|
{
|
||||||
|
case 0:
|
||||||
|
{
|
||||||
|
return inputValue; /* this was a dropped column */
|
||||||
|
}
|
||||||
|
|
||||||
|
case COERCION_PATH_RELABELTYPE:
|
||||||
|
{
|
||||||
|
return inputValue; /* no need to do anything */
|
||||||
|
}
|
||||||
|
|
||||||
|
case COERCION_PATH_FUNC:
|
||||||
|
{
|
||||||
|
FmgrInfo *coerceFunction = &(coercionPath->coerceFunction);
|
||||||
|
Datum outputValue = FunctionCall1(coerceFunction, inputValue);
|
||||||
|
return outputValue;
|
||||||
|
}
|
||||||
|
|
||||||
|
case COERCION_PATH_COERCEVIAIO:
|
||||||
|
{
|
||||||
|
FmgrInfo *outFunction = &(coercionPath->outputFunction);
|
||||||
|
Datum textRepr = FunctionCall1(outFunction, inputValue);
|
||||||
|
|
||||||
|
FmgrInfo *inFunction = &(coercionPath->inputFunction);
|
||||||
|
Oid typioparam = coercionPath->typioparam;
|
||||||
|
Datum outputValue = FunctionCall3(inFunction, textRepr, typioparam,
|
||||||
|
Int32GetDatum(-1));
|
||||||
|
|
||||||
|
return outputValue;
|
||||||
|
}
|
||||||
|
|
||||||
|
default:
|
||||||
|
{
|
||||||
|
/* this should never happen */
|
||||||
|
ereport(ERROR, (errmsg("unsupported coercion type")));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* AvailableColumnCount returns the number of columns in a tuple descriptor, excluding
|
* AvailableColumnCount returns the number of columns in a tuple descriptor, excluding
|
||||||
* columns that were dropped.
|
* columns that were dropped.
|
||||||
|
@ -1853,9 +2102,19 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
|
||||||
copyOutState->rowcontext = GetPerTupleMemoryContext(copyDest->executorState);
|
copyOutState->rowcontext = GetPerTupleMemoryContext(copyDest->executorState);
|
||||||
copyDest->copyOutState = copyOutState;
|
copyDest->copyOutState = copyOutState;
|
||||||
|
|
||||||
/* prepare output functions */
|
/* prepare functions to call on received tuples */
|
||||||
|
{
|
||||||
|
TupleDesc destTupleDescriptor = distributedRelation->rd_att;
|
||||||
|
int columnCount = inputTupleDescriptor->natts;
|
||||||
|
Oid *finalTypeArray = palloc0(columnCount * sizeof(Oid));
|
||||||
|
|
||||||
|
copyDest->columnCoercionPaths =
|
||||||
|
ColumnCoercionPaths(destTupleDescriptor, inputTupleDescriptor,
|
||||||
|
tableId, columnNameList, finalTypeArray);
|
||||||
|
|
||||||
copyDest->columnOutputFunctions =
|
copyDest->columnOutputFunctions =
|
||||||
ColumnOutputFunctions(inputTupleDescriptor, copyOutState->binary);
|
TypeOutputFunctions(columnCount, finalTypeArray, copyOutState->binary);
|
||||||
|
}
|
||||||
|
|
||||||
/* ensure the column names are properly quoted in the COPY statement */
|
/* ensure the column names are properly quoted in the COPY statement */
|
||||||
foreach(columnNameCell, columnNameList)
|
foreach(columnNameCell, columnNameList)
|
||||||
|
@ -1906,6 +2165,7 @@ CitusCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
|
||||||
HTAB *shardConnectionHash = copyDest->shardConnectionHash;
|
HTAB *shardConnectionHash = copyDest->shardConnectionHash;
|
||||||
CopyOutState copyOutState = copyDest->copyOutState;
|
CopyOutState copyOutState = copyDest->copyOutState;
|
||||||
FmgrInfo *columnOutputFunctions = copyDest->columnOutputFunctions;
|
FmgrInfo *columnOutputFunctions = copyDest->columnOutputFunctions;
|
||||||
|
CopyCoercionData *columnCoercionPaths = copyDest->columnCoercionPaths;
|
||||||
|
|
||||||
bool stopOnFailure = copyDest->stopOnFailure;
|
bool stopOnFailure = copyDest->stopOnFailure;
|
||||||
|
|
||||||
|
@ -1937,6 +2197,8 @@ CitusCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
|
||||||
*/
|
*/
|
||||||
if (partitionColumnIndex != INVALID_PARTITION_COLUMN_INDEX)
|
if (partitionColumnIndex != INVALID_PARTITION_COLUMN_INDEX)
|
||||||
{
|
{
|
||||||
|
CopyCoercionData *coercePath = &columnCoercionPaths[partitionColumnIndex];
|
||||||
|
|
||||||
if (columnNulls[partitionColumnIndex])
|
if (columnNulls[partitionColumnIndex])
|
||||||
{
|
{
|
||||||
Oid relationId = copyDest->distributedRelationId;
|
Oid relationId = copyDest->distributedRelationId;
|
||||||
|
@ -1953,6 +2215,9 @@ CitusCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
|
||||||
|
|
||||||
/* find the partition column value */
|
/* find the partition column value */
|
||||||
partitionColumnValue = columnValues[partitionColumnIndex];
|
partitionColumnValue = columnValues[partitionColumnIndex];
|
||||||
|
|
||||||
|
/* annoyingly this is evaluated twice, but at least we don't crash! */
|
||||||
|
partitionColumnValue = CoerceColumnValue(partitionColumnValue, coercePath);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -1995,7 +2260,7 @@ CitusCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
|
||||||
/* replicate row to shard placements */
|
/* replicate row to shard placements */
|
||||||
resetStringInfo(copyOutState->fe_msgbuf);
|
resetStringInfo(copyOutState->fe_msgbuf);
|
||||||
AppendCopyRowData(columnValues, columnNulls, tupleDescriptor,
|
AppendCopyRowData(columnValues, columnNulls, tupleDescriptor,
|
||||||
copyOutState, columnOutputFunctions);
|
copyOutState, columnOutputFunctions, columnCoercionPaths);
|
||||||
SendCopyDataToAll(copyOutState->fe_msgbuf, shardId, shardConnections->connectionList);
|
SendCopyDataToAll(copyOutState->fe_msgbuf, shardId, shardConnections->connectionList);
|
||||||
|
|
||||||
MemoryContextSwitchTo(oldContext);
|
MemoryContextSwitchTo(oldContext);
|
||||||
|
@ -2066,5 +2331,10 @@ CitusCopyDestReceiverDestroy(DestReceiver *destReceiver)
|
||||||
pfree(copyDest->columnOutputFunctions);
|
pfree(copyDest->columnOutputFunctions);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (copyDest->columnCoercionPaths)
|
||||||
|
{
|
||||||
|
pfree(copyDest->columnCoercionPaths);
|
||||||
|
}
|
||||||
|
|
||||||
pfree(copyDest);
|
pfree(copyDest);
|
||||||
}
|
}
|
||||||
|
|
|
@ -64,8 +64,6 @@ static DeferredErrorMessage * InsertPartitionColumnMatchesSelect(Query *query,
|
||||||
static MultiPlan * CreateCoordinatorInsertSelectPlan(Query *parse);
|
static MultiPlan * CreateCoordinatorInsertSelectPlan(Query *parse);
|
||||||
static DeferredErrorMessage * CoordinatorInsertSelectSupported(Query *insertSelectQuery);
|
static DeferredErrorMessage * CoordinatorInsertSelectSupported(Query *insertSelectQuery);
|
||||||
static Query * WrapSubquery(Query *subquery);
|
static Query * WrapSubquery(Query *subquery);
|
||||||
static void CastSelectTargetList(List *selectTargetList, Oid targetRelationId,
|
|
||||||
List *insertTargetList);
|
|
||||||
static bool CheckInsertSelectQuery(Query *query);
|
static bool CheckInsertSelectQuery(Query *query);
|
||||||
|
|
||||||
|
|
||||||
|
@ -1185,10 +1183,6 @@ CreateCoordinatorInsertSelectPlan(Query *parse)
|
||||||
|
|
||||||
ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte);
|
ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte);
|
||||||
|
|
||||||
/* make sure the SELECT returns the right type for copying into the table */
|
|
||||||
CastSelectTargetList(selectQuery->targetList, targetRelationId,
|
|
||||||
insertSelectQuery->targetList);
|
|
||||||
|
|
||||||
multiPlan->insertSelectSubquery = selectQuery;
|
multiPlan->insertSelectSubquery = selectQuery;
|
||||||
multiPlan->insertTargetList = insertSelectQuery->targetList;
|
multiPlan->insertTargetList = insertSelectQuery->targetList;
|
||||||
multiPlan->targetRelationId = targetRelationId;
|
multiPlan->targetRelationId = targetRelationId;
|
||||||
|
@ -1309,62 +1303,3 @@ WrapSubquery(Query *subquery)
|
||||||
|
|
||||||
return outerQuery;
|
return outerQuery;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* CastSelectTargetList adds casts to the target entries in selectTargetList
|
|
||||||
* to match the type in insertTargetList. This ensures that the results of
|
|
||||||
* the SELECT will have the right type when serialised during COPY. For
|
|
||||||
* example, a float that is inserted into a an int column normally has an
|
|
||||||
* implicit cast, but if we send it through the COPY protocol the serialised
|
|
||||||
* form would contain decimal notation, which is not valid for int.
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
CastSelectTargetList(List *selectTargetList, Oid targetRelationId, List *insertTargetList)
|
|
||||||
{
|
|
||||||
ListCell *insertTargetCell = NULL;
|
|
||||||
ListCell *selectTargetCell = NULL;
|
|
||||||
|
|
||||||
/* add casts when the SELECT output does not directly match the table */
|
|
||||||
forboth(insertTargetCell, insertTargetList,
|
|
||||||
selectTargetCell, selectTargetList)
|
|
||||||
{
|
|
||||||
TargetEntry *insertTargetEntry = (TargetEntry *) lfirst(insertTargetCell);
|
|
||||||
TargetEntry *selectTargetEntry = (TargetEntry *) lfirst(selectTargetCell);
|
|
||||||
|
|
||||||
Var *columnVar = NULL;
|
|
||||||
Oid columnType = InvalidOid;
|
|
||||||
int32 columnTypeMod = 0;
|
|
||||||
Oid selectOutputType = InvalidOid;
|
|
||||||
|
|
||||||
/* indirection is not supported, e.g. INSERT INTO table (composite_column.x) */
|
|
||||||
if (!IsA(insertTargetEntry->expr, Var))
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errmsg("can only handle regular columns in the target "
|
|
||||||
"list")));
|
|
||||||
}
|
|
||||||
|
|
||||||
columnVar = (Var *) insertTargetEntry->expr;
|
|
||||||
columnType = get_atttype(targetRelationId, columnVar->varattno);
|
|
||||||
columnTypeMod = get_atttypmod(targetRelationId, columnVar->varattno);
|
|
||||||
selectOutputType = columnVar->vartype;
|
|
||||||
|
|
||||||
/*
|
|
||||||
* If the type in the target list does not match the type of the column,
|
|
||||||
* we need to cast to the column type. PostgreSQL would do this
|
|
||||||
* automatically during the insert, but we're passing the SELECT
|
|
||||||
* output directly to COPY.
|
|
||||||
*/
|
|
||||||
if (columnType != selectOutputType)
|
|
||||||
{
|
|
||||||
Expr *selectExpression = selectTargetEntry->expr;
|
|
||||||
Expr *typeCastedSelectExpr =
|
|
||||||
(Expr *) coerce_to_target_type(NULL, (Node *) selectExpression,
|
|
||||||
selectOutputType, columnType,
|
|
||||||
columnTypeMod, COERCION_EXPLICIT,
|
|
||||||
COERCE_IMPLICIT_CAST, -1);
|
|
||||||
|
|
||||||
selectTargetEntry->expr = typeCastedSelectExpr;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -869,7 +869,7 @@ FilterAndPartitionTable(const char *filterQuery,
|
||||||
heap_deform_tuple(row, rowDescriptor, valueArray, isNullArray);
|
heap_deform_tuple(row, rowDescriptor, valueArray, isNullArray);
|
||||||
|
|
||||||
AppendCopyRowData(valueArray, isNullArray, rowDescriptor,
|
AppendCopyRowData(valueArray, isNullArray, rowDescriptor,
|
||||||
rowOutputState, columnOutputFunctions);
|
rowOutputState, columnOutputFunctions, NULL);
|
||||||
|
|
||||||
rowText = rowOutputState->fe_msgbuf;
|
rowText = rowOutputState->fe_msgbuf;
|
||||||
|
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
#include "distributed/metadata_cache.h"
|
#include "distributed/metadata_cache.h"
|
||||||
#include "nodes/execnodes.h"
|
#include "nodes/execnodes.h"
|
||||||
#include "nodes/parsenodes.h"
|
#include "nodes/parsenodes.h"
|
||||||
|
#include "parser/parse_coerce.h"
|
||||||
#include "tcop/dest.h"
|
#include "tcop/dest.h"
|
||||||
|
|
||||||
|
|
||||||
|
@ -51,6 +52,17 @@ typedef struct NodeAddress
|
||||||
int32 nodePort;
|
int32 nodePort;
|
||||||
} NodeAddress;
|
} NodeAddress;
|
||||||
|
|
||||||
|
/* struct to allow rReceive to coerce tuples before sending them to workers */
|
||||||
|
typedef struct CopyCoercionData
|
||||||
|
{
|
||||||
|
CoercionPathType coercionType;
|
||||||
|
FmgrInfo coerceFunction;
|
||||||
|
|
||||||
|
FmgrInfo inputFunction;
|
||||||
|
FmgrInfo outputFunction;
|
||||||
|
Oid typioparam; /* inputFunction has an extra param */
|
||||||
|
} CopyCoercionData;
|
||||||
|
|
||||||
/* CopyDestReceiver can be used to stream results into a distributed table */
|
/* CopyDestReceiver can be used to stream results into a distributed table */
|
||||||
typedef struct CitusCopyDestReceiver
|
typedef struct CitusCopyDestReceiver
|
||||||
{
|
{
|
||||||
|
@ -88,6 +100,9 @@ typedef struct CitusCopyDestReceiver
|
||||||
CopyOutState copyOutState;
|
CopyOutState copyOutState;
|
||||||
FmgrInfo *columnOutputFunctions;
|
FmgrInfo *columnOutputFunctions;
|
||||||
|
|
||||||
|
/* instructions for coercing incoming tuples */
|
||||||
|
CopyCoercionData *columnCoercionPaths;
|
||||||
|
|
||||||
/* number of tuples sent */
|
/* number of tuples sent */
|
||||||
int64 tuplesSent;
|
int64 tuplesSent;
|
||||||
} CitusCopyDestReceiver;
|
} CitusCopyDestReceiver;
|
||||||
|
@ -103,7 +118,8 @@ extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryForm
|
||||||
extern void AppendCopyRowData(Datum *valueArray, bool *isNullArray,
|
extern void AppendCopyRowData(Datum *valueArray, bool *isNullArray,
|
||||||
TupleDesc rowDescriptor,
|
TupleDesc rowDescriptor,
|
||||||
CopyOutState rowOutputState,
|
CopyOutState rowOutputState,
|
||||||
FmgrInfo *columnOutputFunctions);
|
FmgrInfo *columnOutputFunctions,
|
||||||
|
CopyCoercionData *columnCoercionPaths);
|
||||||
extern void AppendCopyBinaryHeaders(CopyOutState headerOutputState);
|
extern void AppendCopyBinaryHeaders(CopyOutState headerOutputState);
|
||||||
extern void AppendCopyBinaryFooters(CopyOutState footerOutputState);
|
extern void AppendCopyBinaryFooters(CopyOutState footerOutputState);
|
||||||
extern void CitusCopyFrom(CopyStmt *copyStatement, char *completionTag);
|
extern void CitusCopyFrom(CopyStmt *copyStatement, char *completionTag);
|
||||||
|
|
|
@ -1101,10 +1101,9 @@ INSERT INTO lineitem_hash_part
|
||||||
WITH cte1 AS (SELECT * FROM cte1 LIMIT 5)
|
WITH cte1 AS (SELECT * FROM cte1 LIMIT 5)
|
||||||
SELECT s FROM cte1;
|
SELECT s FROM cte1;
|
||||||
Custom Scan (Citus INSERT ... SELECT via coordinator)
|
Custom Scan (Citus INSERT ... SELECT via coordinator)
|
||||||
-> Subquery Scan on citus_insert_select_subquery
|
-> CTE Scan on cte1
|
||||||
CTE cte1
|
CTE cte1
|
||||||
-> Function Scan on generate_series s
|
-> Function Scan on generate_series s
|
||||||
-> CTE Scan on cte1
|
|
||||||
CTE cte1
|
CTE cte1
|
||||||
-> Limit
|
-> Limit
|
||||||
-> CTE Scan on cte1 cte1_1
|
-> CTE Scan on cte1 cte1_1
|
||||||
|
@ -1113,7 +1112,6 @@ INSERT INTO lineitem_hash_part
|
||||||
( SELECT s FROM generate_series(1,5) s) UNION
|
( SELECT s FROM generate_series(1,5) s) UNION
|
||||||
( SELECT s FROM generate_series(5,10) s);
|
( SELECT s FROM generate_series(5,10) s);
|
||||||
Custom Scan (Citus INSERT ... SELECT via coordinator)
|
Custom Scan (Citus INSERT ... SELECT via coordinator)
|
||||||
-> Subquery Scan on citus_insert_select_subquery
|
|
||||||
-> HashAggregate
|
-> HashAggregate
|
||||||
Group Key: s.s
|
Group Key: s.s
|
||||||
-> Append
|
-> Append
|
||||||
|
|
|
@ -1101,10 +1101,9 @@ INSERT INTO lineitem_hash_part
|
||||||
WITH cte1 AS (SELECT * FROM cte1 LIMIT 5)
|
WITH cte1 AS (SELECT * FROM cte1 LIMIT 5)
|
||||||
SELECT s FROM cte1;
|
SELECT s FROM cte1;
|
||||||
Custom Scan (Citus INSERT ... SELECT via coordinator)
|
Custom Scan (Citus INSERT ... SELECT via coordinator)
|
||||||
-> Subquery Scan on citus_insert_select_subquery
|
-> CTE Scan on cte1
|
||||||
CTE cte1
|
CTE cte1
|
||||||
-> Function Scan on generate_series s
|
-> Function Scan on generate_series s
|
||||||
-> CTE Scan on cte1
|
|
||||||
CTE cte1
|
CTE cte1
|
||||||
-> Limit
|
-> Limit
|
||||||
-> CTE Scan on cte1 cte1_1
|
-> CTE Scan on cte1 cte1_1
|
||||||
|
@ -1113,7 +1112,6 @@ INSERT INTO lineitem_hash_part
|
||||||
( SELECT s FROM generate_series(1,5) s) UNION
|
( SELECT s FROM generate_series(1,5) s) UNION
|
||||||
( SELECT s FROM generate_series(5,10) s);
|
( SELECT s FROM generate_series(5,10) s);
|
||||||
Custom Scan (Citus INSERT ... SELECT via coordinator)
|
Custom Scan (Citus INSERT ... SELECT via coordinator)
|
||||||
-> Subquery Scan on citus_insert_select_subquery
|
|
||||||
-> HashAggregate
|
-> HashAggregate
|
||||||
Group Key: s.s
|
Group Key: s.s
|
||||||
-> Append
|
-> Append
|
||||||
|
|
|
@ -2504,6 +2504,144 @@ SELECT * FROM drop_col_table WHERE col2 = '1';
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
RESET client_min_messages;
|
RESET client_min_messages;
|
||||||
|
-- make sure casts are handled correctly
|
||||||
|
CREATE TABLE coerce_events(user_id int, time timestamp, value_1 numeric);
|
||||||
|
SELECT create_distributed_table('coerce_events', 'user_id');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE coerce_agg (user_id int, value_1_agg int);
|
||||||
|
SELECT create_distributed_table('coerce_agg', 'user_id');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO coerce_events(user_id, value_1) VALUES (1, 1), (2, 2), (10, 10);
|
||||||
|
-- numeric -> int (straight function)
|
||||||
|
INSERT INTO coerce_agg(user_id, value_1_agg)
|
||||||
|
SELECT *
|
||||||
|
FROM (
|
||||||
|
SELECT user_id, value_1
|
||||||
|
FROM coerce_events
|
||||||
|
) AS ftop
|
||||||
|
ORDER BY 2 DESC, 1 DESC
|
||||||
|
LIMIT 5;
|
||||||
|
-- int -> text
|
||||||
|
ALTER TABLE coerce_agg ALTER COLUMN value_1_agg TYPE text;
|
||||||
|
INSERT INTO coerce_agg(user_id, value_1_agg)
|
||||||
|
SELECT *
|
||||||
|
FROM (
|
||||||
|
SELECT user_id, value_1
|
||||||
|
FROM coerce_events
|
||||||
|
) AS ftop
|
||||||
|
LIMIT 5;
|
||||||
|
SELECT * FROM coerce_agg;
|
||||||
|
user_id | value_1_agg
|
||||||
|
---------+-------------
|
||||||
|
10 | 10
|
||||||
|
1 | 1
|
||||||
|
1 | 1
|
||||||
|
10 | 10
|
||||||
|
2 | 2
|
||||||
|
2 | 2
|
||||||
|
(6 rows)
|
||||||
|
|
||||||
|
TRUNCATE coerce_agg;
|
||||||
|
-- int -> char(1)
|
||||||
|
ALTER TABLE coerce_agg ALTER COLUMN value_1_agg TYPE char(1);
|
||||||
|
INSERT INTO coerce_agg(user_id, value_1_agg)
|
||||||
|
SELECT *
|
||||||
|
FROM (
|
||||||
|
SELECT user_id, value_1
|
||||||
|
FROM coerce_events
|
||||||
|
) AS ftop
|
||||||
|
LIMIT 5;
|
||||||
|
ERROR: value too long for type character(1)
|
||||||
|
DETAIL: (null)
|
||||||
|
SELECT * FROM coerce_agg;
|
||||||
|
user_id | value_1_agg
|
||||||
|
---------+-------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
TRUNCATE coerce_agg;
|
||||||
|
TRUNCATE coerce_events;
|
||||||
|
-- char(5) -> char(1)
|
||||||
|
ALTER TABLE coerce_events ALTER COLUMN value_1 TYPE char(5);
|
||||||
|
INSERT INTO coerce_events(user_id, value_1) VALUES (1, 'aaaaa'), (2, 'bbbbb');
|
||||||
|
INSERT INTO coerce_agg(user_id, value_1_agg)
|
||||||
|
SELECT *
|
||||||
|
FROM (
|
||||||
|
SELECT user_id, value_1
|
||||||
|
FROM coerce_events
|
||||||
|
) AS ftop
|
||||||
|
LIMIT 5;
|
||||||
|
ERROR: value too long for type character(1)
|
||||||
|
DETAIL: (null)
|
||||||
|
-- char(1) -> char(5)
|
||||||
|
ALTER TABLE coerce_events ALTER COLUMN value_1 TYPE char(1) USING value_1::char(1);
|
||||||
|
ALTER TABLE coerce_agg ALTER COLUMN value_1_agg TYPE char(5);
|
||||||
|
INSERT INTO coerce_agg(user_id, value_1_agg)
|
||||||
|
SELECT *
|
||||||
|
FROM (
|
||||||
|
SELECT user_id, value_1
|
||||||
|
FROM coerce_events
|
||||||
|
) AS ftop
|
||||||
|
LIMIT 5;
|
||||||
|
SELECT * FROM coerce_agg;
|
||||||
|
user_id | value_1_agg
|
||||||
|
---------+-------------
|
||||||
|
1 | a
|
||||||
|
2 | b
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
TRUNCATE coerce_agg;
|
||||||
|
TRUNCATE coerce_events;
|
||||||
|
-- integer -> integer (check VALUE < 5)
|
||||||
|
ALTER TABLE coerce_events ALTER COLUMN value_1 TYPE integer USING NULL;
|
||||||
|
ALTER TABLE coerce_agg ALTER COLUMN value_1_agg TYPE integer USING NULL;
|
||||||
|
ALTER TABLE coerce_agg ADD CONSTRAINT small_number CHECK (value_1_agg < 5);
|
||||||
|
INSERT INTO coerce_events (user_id, value_1) VALUES (1, 1), (10, 10);
|
||||||
|
INSERT INTO coerce_agg(user_id, value_1_agg)
|
||||||
|
SELECT *
|
||||||
|
FROM (
|
||||||
|
SELECT user_id, value_1
|
||||||
|
FROM coerce_events
|
||||||
|
) AS ftop;
|
||||||
|
ERROR: new row for relation "coerce_agg_13300060" violates check constraint "small_number_13300060"
|
||||||
|
DETAIL: Failing row contains (10, 10).
|
||||||
|
CONTEXT: while executing command on localhost:57637
|
||||||
|
SELECT * FROM coerce_agg;
|
||||||
|
user_id | value_1_agg
|
||||||
|
---------+-------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
-- integer[3] -> text[3]
|
||||||
|
TRUNCATE coerce_events;
|
||||||
|
ALTER TABLE coerce_events ALTER COLUMN value_1 TYPE integer[3] USING NULL;
|
||||||
|
INSERT INTO coerce_events(user_id, value_1) VALUES (1, '{1,1,1}'), (2, '{2,2,2}');
|
||||||
|
ALTER TABLE coerce_agg DROP COLUMN value_1_agg;
|
||||||
|
ALTER TABLE coerce_agg ADD COLUMN value_1_agg text[3];
|
||||||
|
INSERT INTO coerce_agg(user_id, value_1_agg)
|
||||||
|
SELECT *
|
||||||
|
FROM (
|
||||||
|
SELECT user_id, value_1
|
||||||
|
FROM coerce_events
|
||||||
|
) AS ftop
|
||||||
|
LIMIT 5;
|
||||||
|
SELECT * FROM coerce_agg;
|
||||||
|
user_id | value_1_agg
|
||||||
|
---------+-------------
|
||||||
|
1 | {1,1,1}
|
||||||
|
2 | {2,2,2}
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- wrap in a transaction to improve performance
|
||||||
|
BEGIN;
|
||||||
|
DROP TABLE coerce_events;
|
||||||
|
DROP TABLE coerce_agg;
|
||||||
DROP TABLE drop_col_table;
|
DROP TABLE drop_col_table;
|
||||||
DROP TABLE raw_table;
|
DROP TABLE raw_table;
|
||||||
DROP TABLE summary_table;
|
DROP TABLE summary_table;
|
||||||
|
@ -2516,3 +2654,4 @@ DROP TABLE table_with_serial;
|
||||||
DROP TABLE text_table;
|
DROP TABLE text_table;
|
||||||
DROP TABLE char_table;
|
DROP TABLE char_table;
|
||||||
DROP TABLE table_with_starts_with_defaults;
|
DROP TABLE table_with_starts_with_defaults;
|
||||||
|
COMMIT;
|
||||||
|
|
|
@ -297,6 +297,7 @@ ON (f.id = f2.id)) as outer_most
|
||||||
GROUP BY
|
GROUP BY
|
||||||
outer_most.id;
|
outer_most.id;
|
||||||
|
|
||||||
|
|
||||||
-- subqueries in WHERE clause
|
-- subqueries in WHERE clause
|
||||||
INSERT INTO raw_events_second
|
INSERT INTO raw_events_second
|
||||||
(user_id)
|
(user_id)
|
||||||
|
@ -1956,6 +1957,115 @@ SELECT * FROM drop_col_table WHERE col2 = '1';
|
||||||
|
|
||||||
RESET client_min_messages;
|
RESET client_min_messages;
|
||||||
|
|
||||||
|
-- make sure casts are handled correctly
|
||||||
|
CREATE TABLE coerce_events(user_id int, time timestamp, value_1 numeric);
|
||||||
|
SELECT create_distributed_table('coerce_events', 'user_id');
|
||||||
|
|
||||||
|
CREATE TABLE coerce_agg (user_id int, value_1_agg int);
|
||||||
|
SELECT create_distributed_table('coerce_agg', 'user_id');
|
||||||
|
|
||||||
|
INSERT INTO coerce_events(user_id, value_1) VALUES (1, 1), (2, 2), (10, 10);
|
||||||
|
|
||||||
|
-- numeric -> int (straight function)
|
||||||
|
INSERT INTO coerce_agg(user_id, value_1_agg)
|
||||||
|
SELECT *
|
||||||
|
FROM (
|
||||||
|
SELECT user_id, value_1
|
||||||
|
FROM coerce_events
|
||||||
|
) AS ftop
|
||||||
|
ORDER BY 2 DESC, 1 DESC
|
||||||
|
LIMIT 5;
|
||||||
|
|
||||||
|
-- int -> text
|
||||||
|
ALTER TABLE coerce_agg ALTER COLUMN value_1_agg TYPE text;
|
||||||
|
INSERT INTO coerce_agg(user_id, value_1_agg)
|
||||||
|
SELECT *
|
||||||
|
FROM (
|
||||||
|
SELECT user_id, value_1
|
||||||
|
FROM coerce_events
|
||||||
|
) AS ftop
|
||||||
|
LIMIT 5;
|
||||||
|
|
||||||
|
SELECT * FROM coerce_agg;
|
||||||
|
|
||||||
|
TRUNCATE coerce_agg;
|
||||||
|
|
||||||
|
-- int -> char(1)
|
||||||
|
ALTER TABLE coerce_agg ALTER COLUMN value_1_agg TYPE char(1);
|
||||||
|
INSERT INTO coerce_agg(user_id, value_1_agg)
|
||||||
|
SELECT *
|
||||||
|
FROM (
|
||||||
|
SELECT user_id, value_1
|
||||||
|
FROM coerce_events
|
||||||
|
) AS ftop
|
||||||
|
LIMIT 5;
|
||||||
|
|
||||||
|
SELECT * FROM coerce_agg;
|
||||||
|
|
||||||
|
TRUNCATE coerce_agg;
|
||||||
|
TRUNCATE coerce_events;
|
||||||
|
|
||||||
|
-- char(5) -> char(1)
|
||||||
|
ALTER TABLE coerce_events ALTER COLUMN value_1 TYPE char(5);
|
||||||
|
INSERT INTO coerce_events(user_id, value_1) VALUES (1, 'aaaaa'), (2, 'bbbbb');
|
||||||
|
INSERT INTO coerce_agg(user_id, value_1_agg)
|
||||||
|
SELECT *
|
||||||
|
FROM (
|
||||||
|
SELECT user_id, value_1
|
||||||
|
FROM coerce_events
|
||||||
|
) AS ftop
|
||||||
|
LIMIT 5;
|
||||||
|
|
||||||
|
-- char(1) -> char(5)
|
||||||
|
ALTER TABLE coerce_events ALTER COLUMN value_1 TYPE char(1) USING value_1::char(1);
|
||||||
|
ALTER TABLE coerce_agg ALTER COLUMN value_1_agg TYPE char(5);
|
||||||
|
INSERT INTO coerce_agg(user_id, value_1_agg)
|
||||||
|
SELECT *
|
||||||
|
FROM (
|
||||||
|
SELECT user_id, value_1
|
||||||
|
FROM coerce_events
|
||||||
|
) AS ftop
|
||||||
|
LIMIT 5;
|
||||||
|
SELECT * FROM coerce_agg;
|
||||||
|
|
||||||
|
TRUNCATE coerce_agg;
|
||||||
|
TRUNCATE coerce_events;
|
||||||
|
|
||||||
|
-- integer -> integer (check VALUE < 5)
|
||||||
|
ALTER TABLE coerce_events ALTER COLUMN value_1 TYPE integer USING NULL;
|
||||||
|
ALTER TABLE coerce_agg ALTER COLUMN value_1_agg TYPE integer USING NULL;
|
||||||
|
ALTER TABLE coerce_agg ADD CONSTRAINT small_number CHECK (value_1_agg < 5);
|
||||||
|
|
||||||
|
INSERT INTO coerce_events (user_id, value_1) VALUES (1, 1), (10, 10);
|
||||||
|
INSERT INTO coerce_agg(user_id, value_1_agg)
|
||||||
|
SELECT *
|
||||||
|
FROM (
|
||||||
|
SELECT user_id, value_1
|
||||||
|
FROM coerce_events
|
||||||
|
) AS ftop;
|
||||||
|
|
||||||
|
SELECT * FROM coerce_agg;
|
||||||
|
|
||||||
|
-- integer[3] -> text[3]
|
||||||
|
TRUNCATE coerce_events;
|
||||||
|
ALTER TABLE coerce_events ALTER COLUMN value_1 TYPE integer[3] USING NULL;
|
||||||
|
INSERT INTO coerce_events(user_id, value_1) VALUES (1, '{1,1,1}'), (2, '{2,2,2}');
|
||||||
|
ALTER TABLE coerce_agg DROP COLUMN value_1_agg;
|
||||||
|
ALTER TABLE coerce_agg ADD COLUMN value_1_agg text[3];
|
||||||
|
INSERT INTO coerce_agg(user_id, value_1_agg)
|
||||||
|
SELECT *
|
||||||
|
FROM (
|
||||||
|
SELECT user_id, value_1
|
||||||
|
FROM coerce_events
|
||||||
|
) AS ftop
|
||||||
|
LIMIT 5;
|
||||||
|
|
||||||
|
SELECT * FROM coerce_agg;
|
||||||
|
|
||||||
|
-- wrap in a transaction to improve performance
|
||||||
|
BEGIN;
|
||||||
|
DROP TABLE coerce_events;
|
||||||
|
DROP TABLE coerce_agg;
|
||||||
DROP TABLE drop_col_table;
|
DROP TABLE drop_col_table;
|
||||||
DROP TABLE raw_table;
|
DROP TABLE raw_table;
|
||||||
DROP TABLE summary_table;
|
DROP TABLE summary_table;
|
||||||
|
@ -1968,3 +2078,4 @@ DROP TABLE table_with_serial;
|
||||||
DROP TABLE text_table;
|
DROP TABLE text_table;
|
||||||
DROP TABLE char_table;
|
DROP TABLE char_table;
|
||||||
DROP TABLE table_with_starts_with_defaults;
|
DROP TABLE table_with_starts_with_defaults;
|
||||||
|
COMMIT;
|
||||||
|
|
Loading…
Reference in New Issue