Consider dropped columns that precede the partition column in COPY

release-6.2
Marco Slot 2017-08-21 22:38:14 +02:00
parent 44eacf14fc
commit 96eca92fc7
5 changed files with 58 additions and 37 deletions

View File

@ -755,6 +755,8 @@ CopyLocalDataIntoShards(Oid distributedRelationId)
List *columnNameList = NIL; List *columnNameList = NIL;
Relation distributedRelation = NULL; Relation distributedRelation = NULL;
TupleDesc tupleDescriptor = NULL; TupleDesc tupleDescriptor = NULL;
Var *partitionColumn = NULL;
int partitionColumnIndex = INVALID_PARTITION_COLUMN_INDEX;
bool stopOnFailure = true; bool stopOnFailure = true;
EState *estate = NULL; EState *estate = NULL;
@ -784,6 +786,13 @@ CopyLocalDataIntoShards(Oid distributedRelationId)
slot = MakeSingleTupleTableSlot(tupleDescriptor); slot = MakeSingleTupleTableSlot(tupleDescriptor);
columnNameList = TupleDescColumnNameList(tupleDescriptor); columnNameList = TupleDescColumnNameList(tupleDescriptor);
/* determine the partition column in the tuple descriptor */
partitionColumn = PartitionColumn(distributedRelationId, 0);
if (partitionColumn != NULL)
{
partitionColumnIndex = partitionColumn->varattno - 1;
}
/* initialise per-tuple memory context */ /* initialise per-tuple memory context */
estate = CreateExecutorState(); estate = CreateExecutorState();
econtext = GetPerTupleExprContext(estate); econtext = GetPerTupleExprContext(estate);
@ -791,8 +800,9 @@ CopyLocalDataIntoShards(Oid distributedRelationId)
copyDest = copyDest =
(DestReceiver *) CreateCitusCopyDestReceiver(distributedRelationId, (DestReceiver *) CreateCitusCopyDestReceiver(distributedRelationId,
columnNameList, estate, columnNameList,
stopOnFailure); partitionColumnIndex,
estate, stopOnFailure);
/* initialise state for writing to shards, we'll open connections on demand */ /* initialise state for writing to shards, we'll open connections on demand */
copyDest->rStartup(copyDest, 0, tupleDescriptor); copyDest->rStartup(copyDest, 0, tupleDescriptor);

View File

@ -299,6 +299,8 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
bool *columnNulls = NULL; bool *columnNulls = NULL;
int columnIndex = 0; int columnIndex = 0;
List *columnNameList = NIL; List *columnNameList = NIL;
Var *partitionColumn = NULL;
int partitionColumnIndex = INVALID_PARTITION_COLUMN_INDEX;
TupleTableSlot *tupleTableSlot = NULL; TupleTableSlot *tupleTableSlot = NULL;
EState *executorState = NULL; EState *executorState = NULL;
@ -326,6 +328,14 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
tupleTableSlot->tts_values = columnValues; tupleTableSlot->tts_values = columnValues;
tupleTableSlot->tts_isnull = columnNulls; tupleTableSlot->tts_isnull = columnNulls;
/* determine the partition column index in the tuple descriptor */
partitionColumn = PartitionColumn(tableId, 0);
if (partitionColumn != NULL)
{
partitionColumnIndex = partitionColumn->varattno - 1;
}
/* build the list of column names for remote COPY statements */
for (columnIndex = 0; columnIndex < columnCount; columnIndex++) for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
{ {
Form_pg_attribute currentColumn = tupleDescriptor->attrs[columnIndex]; Form_pg_attribute currentColumn = tupleDescriptor->attrs[columnIndex];
@ -350,8 +360,8 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
} }
/* set up the destination for the COPY */ /* set up the destination for the COPY */
copyDest = CreateCitusCopyDestReceiver(tableId, columnNameList, executorState, copyDest = CreateCitusCopyDestReceiver(tableId, columnNameList, partitionColumnIndex,
stopOnFailure); executorState, stopOnFailure);
dest = (DestReceiver *) copyDest; dest = (DestReceiver *) copyDest;
dest->rStartup(dest, 0, tupleDescriptor); dest->rStartup(dest, 0, tupleDescriptor);
@ -1638,10 +1648,14 @@ CopyFlushOutput(CopyOutState cstate, char *start, char *pointer)
/* /*
* CreateCitusCopyDestReceiver creates a DestReceiver that copies into * CreateCitusCopyDestReceiver creates a DestReceiver that copies into
* a distributed table. * a distributed table.
*
* The caller should provide the list of column names to use in the
* remote COPY statement, and the partition column index in the tuple
* descriptor (*not* the column name list).
*/ */
CitusCopyDestReceiver * CitusCopyDestReceiver *
CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, EState *executorState, CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColumnIndex,
bool stopOnFailure) EState *executorState, bool stopOnFailure)
{ {
CitusCopyDestReceiver *copyDest = NULL; CitusCopyDestReceiver *copyDest = NULL;
@ -1657,6 +1671,7 @@ CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, EState *executorS
/* set up output parameters */ /* set up output parameters */
copyDest->distributedRelationId = tableId; copyDest->distributedRelationId = tableId;
copyDest->columnNameList = columnNameList; copyDest->columnNameList = columnNameList;
copyDest->partitionColumnIndex = partitionColumnIndex;
copyDest->executorState = executorState; copyDest->executorState = executorState;
copyDest->stopOnFailure = stopOnFailure; copyDest->stopOnFailure = stopOnFailure;
copyDest->memoryContext = CurrentMemoryContext; copyDest->memoryContext = CurrentMemoryContext;
@ -1682,15 +1697,12 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
char *schemaName = get_namespace_name(schemaOid); char *schemaName = get_namespace_name(schemaOid);
Relation distributedRelation = NULL; Relation distributedRelation = NULL;
int columnIndex = 0;
List *columnNameList = copyDest->columnNameList; List *columnNameList = copyDest->columnNameList;
List *quotedColumnNameList = NIL; List *quotedColumnNameList = NIL;
ListCell *columnNameCell = NULL; ListCell *columnNameCell = NULL;
char partitionMethod = '\0'; char partitionMethod = '\0';
Var *partitionColumn = PartitionColumn(tableId, 0);
int partitionColumnIndex = -1;
DistTableCacheEntry *cacheEntry = NULL; DistTableCacheEntry *cacheEntry = NULL;
CopyStmt *copyStatement = NULL; CopyStmt *copyStatement = NULL;
@ -1774,37 +1786,23 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
copyDest->columnOutputFunctions = copyDest->columnOutputFunctions =
ColumnOutputFunctions(inputTupleDescriptor, copyOutState->binary); ColumnOutputFunctions(inputTupleDescriptor, copyOutState->binary);
/* find the partition column index in the column list */ /* ensure the column names are properly quoted in the COPY statement */
foreach(columnNameCell, columnNameList) foreach(columnNameCell, columnNameList)
{ {
char *columnName = (char *) lfirst(columnNameCell); char *columnName = (char *) lfirst(columnNameCell);
char *quotedColumnName = (char *) quote_identifier(columnName); char *quotedColumnName = (char *) quote_identifier(columnName);
/* load the column information from pg_attribute */
AttrNumber attrNumber = get_attnum(tableId, columnName);
/* check whether this is the partition column */
if (partitionColumn != NULL && attrNumber == partitionColumn->varattno)
{
Assert(partitionColumnIndex == -1);
partitionColumnIndex = columnIndex;
}
columnIndex++;
quotedColumnNameList = lappend(quotedColumnNameList, quotedColumnName); quotedColumnNameList = lappend(quotedColumnNameList, quotedColumnName);
} }
if (partitionMethod != DISTRIBUTE_BY_NONE && partitionColumnIndex == -1) if (partitionMethod != DISTRIBUTE_BY_NONE &&
copyDest->partitionColumnIndex == INVALID_PARTITION_COLUMN_INDEX)
{ {
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
errmsg("the partition column of table %s should have a value", errmsg("the partition column of table %s should have a value",
quote_qualified_identifier(schemaName, relationName)))); quote_qualified_identifier(schemaName, relationName))));
} }
copyDest->partitionColumnIndex = partitionColumnIndex;
/* define the template for the COPY statement that is sent to workers */ /* define the template for the COPY statement that is sent to workers */
copyStatement = makeNode(CopyStmt); copyStatement = makeNode(CopyStmt);
copyStatement->relation = makeRangeVar(schemaName, relationName, -1); copyStatement->relation = makeRangeVar(schemaName, relationName, -1);
@ -1870,7 +1868,7 @@ CitusCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
* tables. Note that reference tables have NULL partition column values so * tables. Note that reference tables have NULL partition column values so
* skip the check. * skip the check.
*/ */
if (partitionColumnIndex >= 0) if (partitionColumnIndex != INVALID_PARTITION_COLUMN_INDEX)
{ {
if (columnNulls[partitionColumnIndex]) if (columnNulls[partitionColumnIndex])
{ {

View File

@ -20,6 +20,9 @@
#include "tcop/dest.h" #include "tcop/dest.h"
/*
 * Sentinel for "no partition column index". Valid partition column indexes
 * are zero-based positions in the tuple descriptor, so any negative value
 * is out of range. The replacement list is parenthesized so the macro
 * always expands as a single operand.
 */
#define INVALID_PARTITION_COLUMN_INDEX (-1)
/* /*
* A smaller version of copy.c's CopyStateData, trimmed to the elements * A smaller version of copy.c's CopyStateData, trimmed to the elements
* necessary to copy out results. While it'd be a bit nicer to share code, * necessary to copy out results. While it'd be a bit nicer to share code,
@ -90,6 +93,7 @@ typedef struct CitusCopyDestReceiver
/* function declarations for copying into a distributed table */ /* function declarations for copying into a distributed table */
extern CitusCopyDestReceiver * CreateCitusCopyDestReceiver(Oid relationId, extern CitusCopyDestReceiver * CreateCitusCopyDestReceiver(Oid relationId,
List *columnNameList, List *columnNameList,
int partitionColumnIndex,
EState *executorState, EState *executorState,
bool stopOnFailure); bool stopOnFailure);
extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat); extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat);

View File

@ -486,21 +486,28 @@ END;
CREATE TABLE data_load_test (col1 int, col2 text, col3 text, "CoL4"")" int); CREATE TABLE data_load_test (col1 int, col2 text, col3 text, "CoL4"")" int);
INSERT INTO data_load_test VALUES (132, 'hello', 'world'); INSERT INTO data_load_test VALUES (132, 'hello', 'world');
INSERT INTO data_load_test VALUES (243, 'world', 'hello'); INSERT INTO data_load_test VALUES (243, 'world', 'hello');
ALTER TABLE data_load_test DROP COLUMN col2; ALTER TABLE data_load_test DROP COLUMN col1;
SELECT create_distributed_table('data_load_test', 'col1'); SELECT create_distributed_table('data_load_test', 'col3');
NOTICE: Copying data from local table... NOTICE: Copying data from local table...
create_distributed_table create_distributed_table
-------------------------- --------------------------
(1 row) (1 row)
SELECT * FROM data_load_test; SELECT * FROM data_load_test ORDER BY col2;
col1 | col3 | CoL4") col2 | col3 | CoL4")
------+-------+-------- -------+-------+--------
132 | world | hello | world |
243 | hello | world | hello |
(2 rows) (2 rows)
-- make sure the tuple went to the right shard
SELECT * FROM data_load_test WHERE col3 = 'world';
col2 | col3 | CoL4")
-------+-------+--------
hello | world |
(1 row)
DROP TABLE data_load_test; DROP TABLE data_load_test;
SET citus.shard_replication_factor TO default; SET citus.shard_replication_factor TO default;
SET citus.shard_count to 4; SET citus.shard_count to 4;

View File

@ -263,9 +263,11 @@ END;
CREATE TABLE data_load_test (col1 int, col2 text, col3 text, "CoL4"")" int); CREATE TABLE data_load_test (col1 int, col2 text, col3 text, "CoL4"")" int);
INSERT INTO data_load_test VALUES (132, 'hello', 'world'); INSERT INTO data_load_test VALUES (132, 'hello', 'world');
INSERT INTO data_load_test VALUES (243, 'world', 'hello'); INSERT INTO data_load_test VALUES (243, 'world', 'hello');
ALTER TABLE data_load_test DROP COLUMN col2; ALTER TABLE data_load_test DROP COLUMN col1;
SELECT create_distributed_table('data_load_test', 'col1'); SELECT create_distributed_table('data_load_test', 'col3');
SELECT * FROM data_load_test; SELECT * FROM data_load_test ORDER BY col2;
-- make sure the tuple went to the right shard
SELECT * FROM data_load_test WHERE col3 = 'world';
DROP TABLE data_load_test; DROP TABLE data_load_test;
SET citus.shard_replication_factor TO default; SET citus.shard_replication_factor TO default;