From 96eca92fc7fc2e69781de7209fb4532ac4aa7653 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Mon, 21 Aug 2017 22:38:14 +0200 Subject: [PATCH] Consider dropped columns that precede the partition column in COPY --- .../commands/create_distributed_table.c | 14 +++++- src/backend/distributed/commands/multi_copy.c | 48 +++++++++---------- src/include/distributed/multi_copy.h | 4 ++ .../regress/expected/multi_create_table.out | 21 +++++--- src/test/regress/sql/multi_create_table.sql | 8 ++-- 5 files changed, 58 insertions(+), 37 deletions(-) diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 103fedc4f..2c80e603b 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -755,6 +755,8 @@ CopyLocalDataIntoShards(Oid distributedRelationId) List *columnNameList = NIL; Relation distributedRelation = NULL; TupleDesc tupleDescriptor = NULL; + Var *partitionColumn = NULL; + int partitionColumnIndex = INVALID_PARTITION_COLUMN_INDEX; bool stopOnFailure = true; EState *estate = NULL; @@ -784,6 +786,13 @@ CopyLocalDataIntoShards(Oid distributedRelationId) slot = MakeSingleTupleTableSlot(tupleDescriptor); columnNameList = TupleDescColumnNameList(tupleDescriptor); + /* determine the partition column in the tuple descriptor */ + partitionColumn = PartitionColumn(distributedRelationId, 0); + if (partitionColumn != NULL) + { + partitionColumnIndex = partitionColumn->varattno - 1; + } + /* initialise per-tuple memory context */ estate = CreateExecutorState(); econtext = GetPerTupleExprContext(estate); @@ -791,8 +800,9 @@ CopyLocalDataIntoShards(Oid distributedRelationId) copyDest = (DestReceiver *) CreateCitusCopyDestReceiver(distributedRelationId, - columnNameList, estate, - stopOnFailure); + columnNameList, + partitionColumnIndex, + estate, stopOnFailure); /* initialise state for writing to shards, we'll open connections on demand */ copyDest->rStartup(copyDest, 0, tupleDescriptor); diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index de8758512..0128c1695 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -299,6 +299,8 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) bool *columnNulls = NULL; int columnIndex = 0; List *columnNameList = NIL; + Var *partitionColumn = NULL; + int partitionColumnIndex = INVALID_PARTITION_COLUMN_INDEX; TupleTableSlot *tupleTableSlot = NULL; EState *executorState = NULL; @@ -326,6 +328,14 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) tupleTableSlot->tts_values = columnValues; tupleTableSlot->tts_isnull = columnNulls; + /* determine the partition column index in the tuple descriptor */ + partitionColumn = PartitionColumn(tableId, 0); + if (partitionColumn != NULL) + { + partitionColumnIndex = partitionColumn->varattno - 1; + } + + /* build the list of column names for remote COPY statements */ for (columnIndex = 0; columnIndex < columnCount; columnIndex++) { Form_pg_attribute currentColumn = tupleDescriptor->attrs[columnIndex]; @@ -350,8 +360,8 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) } /* set up the destination for the COPY */ - copyDest = CreateCitusCopyDestReceiver(tableId, columnNameList, executorState, - stopOnFailure); + copyDest = CreateCitusCopyDestReceiver(tableId, columnNameList, partitionColumnIndex, + executorState, stopOnFailure); dest = (DestReceiver *) copyDest; dest->rStartup(dest, 0, tupleDescriptor); @@ -1638,10 +1648,14 @@ CopyFlushOutput(CopyOutState cstate, char *start, char *pointer) /* * CreateCitusCopyDestReceiver creates a DestReceiver that copies into * a distributed table. + * + * The caller should provide the list of column names to use in the + * remote COPY statement, and the partition column index in the tuple + * descriptor (*not* the column name list). */ CitusCopyDestReceiver * -CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, EState *executorState, - bool stopOnFailure) +CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColumnIndex, + EState *executorState, bool stopOnFailure) { CitusCopyDestReceiver *copyDest = NULL; @@ -1657,6 +1671,7 @@ CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, EState *executorS /* set up output parameters */ copyDest->distributedRelationId = tableId; copyDest->columnNameList = columnNameList; + copyDest->partitionColumnIndex = partitionColumnIndex; copyDest->executorState = executorState; copyDest->stopOnFailure = stopOnFailure; copyDest->memoryContext = CurrentMemoryContext; @@ -1682,15 +1697,12 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, char *schemaName = get_namespace_name(schemaOid); Relation distributedRelation = NULL; - int columnIndex = 0; List *columnNameList = copyDest->columnNameList; List *quotedColumnNameList = NIL; ListCell *columnNameCell = NULL; char partitionMethod = '\0'; - Var *partitionColumn = PartitionColumn(tableId, 0); - int partitionColumnIndex = -1; DistTableCacheEntry *cacheEntry = NULL; CopyStmt *copyStatement = NULL; @@ -1774,37 +1786,23 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, copyDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor, copyOutState->binary); - /* find the partition column index in the column list */ + /* ensure the column names are properly quoted in the COPY statement */ foreach(columnNameCell, columnNameList) { char *columnName = (char *) lfirst(columnNameCell); char *quotedColumnName = (char *) quote_identifier(columnName); - /* load the column information from pg_attribute */ - AttrNumber attrNumber = get_attnum(tableId, columnName); - - /* check whether this is the partition column */ - if (partitionColumn != NULL && attrNumber == partitionColumn->varattno) - { - Assert(partitionColumnIndex == -1); - - partitionColumnIndex = columnIndex; - } - - columnIndex++; - quotedColumnNameList = lappend(quotedColumnNameList, quotedColumnName); } - if (partitionMethod != DISTRIBUTE_BY_NONE && partitionColumnIndex == -1) + if (partitionMethod != DISTRIBUTE_BY_NONE && + copyDest->partitionColumnIndex == INVALID_PARTITION_COLUMN_INDEX) { ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), errmsg("the partition column of table %s should have a value", quote_qualified_identifier(schemaName, relationName)))); } - copyDest->partitionColumnIndex = partitionColumnIndex; - /* define the template for the COPY statement that is sent to workers */ copyStatement = makeNode(CopyStmt); copyStatement->relation = makeRangeVar(schemaName, relationName, -1); @@ -1870,7 +1868,7 @@ CitusCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) * tables. Note that, reference tables has NULL partition column values so * skip the check. */ - if (partitionColumnIndex >= 0) + if (partitionColumnIndex != INVALID_PARTITION_COLUMN_INDEX) { if (columnNulls[partitionColumnIndex]) { diff --git a/src/include/distributed/multi_copy.h b/src/include/distributed/multi_copy.h index 1b6fcc412..93e4d997c 100644 --- a/src/include/distributed/multi_copy.h +++ b/src/include/distributed/multi_copy.h @@ -20,6 +20,9 @@ #include "tcop/dest.h" +#define INVALID_PARTITION_COLUMN_INDEX -1 + + /* * A smaller version of copy.c's CopyStateData, trimmed to the elements * necessary to copy out results. While it'd be a bit nicer to share code, @@ -90,6 +93,7 @@ typedef struct CitusCopyDestReceiver /* function declarations for copying into a distributed table */ extern CitusCopyDestReceiver * CreateCitusCopyDestReceiver(Oid relationId, List *columnNameList, + int partitionColumnIndex, EState *executorState, bool stopOnFailure); extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat); diff --git a/src/test/regress/expected/multi_create_table.out b/src/test/regress/expected/multi_create_table.out index dd55b039c..026ccd48f 100644 --- a/src/test/regress/expected/multi_create_table.out +++ b/src/test/regress/expected/multi_create_table.out @@ -486,21 +486,28 @@ END; CREATE TABLE data_load_test (col1 int, col2 text, col3 text, "CoL4"")" int); INSERT INTO data_load_test VALUES (132, 'hello', 'world'); INSERT INTO data_load_test VALUES (243, 'world', 'hello'); -ALTER TABLE data_load_test DROP COLUMN col2; -SELECT create_distributed_table('data_load_test', 'col1'); +ALTER TABLE data_load_test DROP COLUMN col1; +SELECT create_distributed_table('data_load_test', 'col3'); NOTICE: Copying data from local table... create_distributed_table -------------------------- (1 row) -SELECT * FROM data_load_test; - col1 | col3 | CoL4") -------+-------+-------- - 132 | world | - 243 | hello | +SELECT * FROM data_load_test ORDER BY col2; + col2 | col3 | CoL4") +-------+-------+-------- + hello | world | + world | hello | (2 rows) +-- make sure the tuple went to the right shard +SELECT * FROM data_load_test WHERE col3 = 'world'; + col2 | col3 | CoL4") +-------+-------+-------- + hello | world | +(1 row) + DROP TABLE data_load_test; SET citus.shard_replication_factor TO default; SET citus.shard_count to 4; diff --git a/src/test/regress/sql/multi_create_table.sql b/src/test/regress/sql/multi_create_table.sql index 5025b90fa..2aa32e0ed 100644 --- a/src/test/regress/sql/multi_create_table.sql +++ b/src/test/regress/sql/multi_create_table.sql @@ -263,9 +263,11 @@ END; CREATE TABLE data_load_test (col1 int, col2 text, col3 text, "CoL4"")" int); INSERT INTO data_load_test VALUES (132, 'hello', 'world'); INSERT INTO data_load_test VALUES (243, 'world', 'hello'); -ALTER TABLE data_load_test DROP COLUMN col2; -SELECT create_distributed_table('data_load_test', 'col1'); -SELECT * FROM data_load_test; +ALTER TABLE data_load_test DROP COLUMN col1; +SELECT create_distributed_table('data_load_test', 'col3'); +SELECT * FROM data_load_test ORDER BY col2; +-- make sure the tuple went to the right shard +SELECT * FROM data_load_test WHERE col3 = 'world'; DROP TABLE data_load_test; SET citus.shard_replication_factor TO default;