From cf375d6a66c63693b46fb1686bfe8101f232d61a Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Mon, 21 Aug 2017 22:38:14 +0200 Subject: [PATCH] Consider dropped columns that precede the partition column in COPY --- .../commands/create_distributed_table.c | 14 +++++- src/backend/distributed/commands/multi_copy.c | 48 +++++++++---------- .../executor/insert_select_executor.c | 19 +++++++- src/include/distributed/multi_copy.h | 4 ++ .../regress/expected/multi_create_table.out | 21 +++++--- .../regress/expected/multi_insert_select.out | 36 +++++++++----- src/test/regress/input/multi_copy.source | 20 ++++++++ src/test/regress/output/multi_copy.source | 28 +++++++++++ src/test/regress/sql/multi_create_table.sql | 8 ++-- src/test/regress/sql/multi_insert_select.sql | 13 +++-- 10 files changed, 158 insertions(+), 53 deletions(-) diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 48b5b069d..247e41277 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -1149,6 +1149,8 @@ CopyLocalDataIntoShards(Oid distributedRelationId) List *columnNameList = NIL; Relation distributedRelation = NULL; TupleDesc tupleDescriptor = NULL; + Var *partitionColumn = NULL; + int partitionColumnIndex = INVALID_PARTITION_COLUMN_INDEX; bool stopOnFailure = true; EState *estate = NULL; @@ -1189,6 +1191,13 @@ CopyLocalDataIntoShards(Oid distributedRelationId) slot = MakeSingleTupleTableSlot(tupleDescriptor); columnNameList = TupleDescColumnNameList(tupleDescriptor); + /* determine the partition column in the tuple descriptor */ + partitionColumn = PartitionColumn(distributedRelationId, 0); + if (partitionColumn != NULL) + { + partitionColumnIndex = partitionColumn->varattno - 1; + } + /* initialise per-tuple memory context */ estate = CreateExecutorState(); econtext = GetPerTupleExprContext(estate); @@ -1196,8 +1205,9 @@ CopyLocalDataIntoShards(Oid distributedRelationId) copyDest = (DestReceiver *) CreateCitusCopyDestReceiver(distributedRelationId, - columnNameList, estate, - stopOnFailure); + columnNameList, + partitionColumnIndex, + estate, stopOnFailure); /* initialise state for writing to shards, we'll open connections on demand */ copyDest->rStartup(copyDest, 0, tupleDescriptor); diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index c84cdcd44..e1dd6dc13 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -295,6 +295,8 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) bool *columnNulls = NULL; int columnIndex = 0; List *columnNameList = NIL; + Var *partitionColumn = NULL; + int partitionColumnIndex = INVALID_PARTITION_COLUMN_INDEX; TupleTableSlot *tupleTableSlot = NULL; EState *executorState = NULL; @@ -322,6 +324,14 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) tupleTableSlot->tts_values = columnValues; tupleTableSlot->tts_isnull = columnNulls; + /* determine the partition column index in the tuple descriptor */ + partitionColumn = PartitionColumn(tableId, 0); + if (partitionColumn != NULL) + { + partitionColumnIndex = partitionColumn->varattno - 1; + } + + /* build the list of column names for remote COPY statements */ for (columnIndex = 0; columnIndex < columnCount; columnIndex++) { Form_pg_attribute currentColumn = tupleDescriptor->attrs[columnIndex]; @@ -346,8 +356,8 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) } /* set up the destination for the COPY */ - copyDest = CreateCitusCopyDestReceiver(tableId, columnNameList, executorState, - stopOnFailure); + copyDest = CreateCitusCopyDestReceiver(tableId, columnNameList, partitionColumnIndex, + executorState, stopOnFailure); dest = (DestReceiver *) copyDest; dest->rStartup(dest, 0, tupleDescriptor); @@ -1714,10 +1724,14 @@ CopyFlushOutput(CopyOutState cstate, char *start, char *pointer) /* * CreateCitusCopyDestReceiver creates a DestReceiver that copies into * a distributed table. + * + * The caller should provide the list of column names to use in the + * remote COPY statement, and the partition column index in the tuple + * descriptor (*not* the column name list). */ CitusCopyDestReceiver * -CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, EState *executorState, - bool stopOnFailure) +CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColumnIndex, + EState *executorState, bool stopOnFailure) { CitusCopyDestReceiver *copyDest = NULL; @@ -1733,6 +1747,7 @@ CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, EState *executorS /* set up output parameters */ copyDest->distributedRelationId = tableId; copyDest->columnNameList = columnNameList; + copyDest->partitionColumnIndex = partitionColumnIndex; copyDest->executorState = executorState; copyDest->stopOnFailure = stopOnFailure; copyDest->memoryContext = CurrentMemoryContext; @@ -1758,15 +1773,12 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, char *schemaName = get_namespace_name(schemaOid); Relation distributedRelation = NULL; - int columnIndex = 0; List *columnNameList = copyDest->columnNameList; List *quotedColumnNameList = NIL; ListCell *columnNameCell = NULL; char partitionMethod = '\0'; - Var *partitionColumn = PartitionColumn(tableId, 0); - int partitionColumnIndex = -1; DistTableCacheEntry *cacheEntry = NULL; CopyStmt *copyStatement = NULL; @@ -1853,37 +1865,23 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, copyDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor, copyOutState->binary); - /* find the partition column index in the column list */ + /* ensure the column names are properly quoted in the COPY statement */ foreach(columnNameCell, columnNameList) { char *columnName = (char *) lfirst(columnNameCell); char *quotedColumnName = (char *) quote_identifier(columnName); - /* load the column information from pg_attribute */ - AttrNumber attrNumber = get_attnum(tableId, columnName); - - /* check whether this is the partition column */ - if (partitionColumn != NULL && attrNumber == partitionColumn->varattno) - { - Assert(partitionColumnIndex == -1); - - partitionColumnIndex = columnIndex; - } - - columnIndex++; - quotedColumnNameList = lappend(quotedColumnNameList, quotedColumnName); } - if (partitionMethod != DISTRIBUTE_BY_NONE && partitionColumnIndex == -1) + if (partitionMethod != DISTRIBUTE_BY_NONE && + copyDest->partitionColumnIndex == INVALID_PARTITION_COLUMN_INDEX) { ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), errmsg("the partition column of table %s should have a value", quote_qualified_identifier(schemaName, relationName)))); } - copyDest->partitionColumnIndex = partitionColumnIndex; - /* define the template for the COPY statement that is sent to workers */ copyStatement = makeNode(CopyStmt); copyStatement->relation = makeRangeVar(schemaName, relationName, -1); @@ -1945,7 +1943,7 @@ CitusCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) * tables. Note that, reference tables has NULL partition column values so * skip the check. */ - if (partitionColumnIndex >= 0) + if (partitionColumnIndex != INVALID_PARTITION_COLUMN_INDEX) { if (columnNulls[partitionColumnIndex]) { diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index b4368076c..ff331a682 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -99,6 +99,8 @@ ExecuteSelectIntoRelation(Oid targetRelationId, List *insertTargetList, List *columnNameList = NIL; bool stopOnFailure = false; char partitionMethod = 0; + Var *partitionColumn = NULL; + int partitionColumnIndex = -1; CitusCopyDestReceiver *copyDest = NULL; @@ -108,17 +110,32 @@ ExecuteSelectIntoRelation(Oid targetRelationId, List *insertTargetList, stopOnFailure = true; } + partitionColumn = PartitionColumn(targetRelationId, 0); + /* build the list of column names for the COPY statement */ foreach(insertTargetCell, insertTargetList) { TargetEntry *insertTargetEntry = (TargetEntry *) lfirst(insertTargetCell); + char *columnName = insertTargetEntry->resname; + + /* load the column information from pg_attribute */ + AttrNumber attrNumber = get_attnum(targetRelationId, columnName); + + /* check whether this is the partition column */ + if (partitionColumn != NULL && attrNumber == partitionColumn->varattno) + { + Assert(partitionColumnIndex == -1); + + partitionColumnIndex = list_length(columnNameList); + } columnNameList = lappend(columnNameList, insertTargetEntry->resname); } /* set up a DestReceiver that copies into the distributed table */ copyDest = CreateCitusCopyDestReceiver(targetRelationId, columnNameList, - executorState, stopOnFailure); + partitionColumnIndex, executorState, + stopOnFailure); ExecuteIntoDestReceiver(selectQuery, paramListInfo, (DestReceiver *) copyDest); diff --git a/src/include/distributed/multi_copy.h b/src/include/distributed/multi_copy.h index 07751d598..a40f9e3e7 100644 --- a/src/include/distributed/multi_copy.h +++ b/src/include/distributed/multi_copy.h @@ -20,6 +20,9 @@ #include "tcop/dest.h" +#define INVALID_PARTITION_COLUMN_INDEX -1 + + /* * A smaller version of copy.c's CopyStateData, trimmed to the elements * necessary to copy out results. While it'd be a bit nicer to share code, @@ -93,6 +96,7 @@ typedef struct CitusCopyDestReceiver /* function declarations for copying into a distributed table */ extern CitusCopyDestReceiver * CreateCitusCopyDestReceiver(Oid relationId, List *columnNameList, + int partitionColumnIndex, EState *executorState, bool stopOnFailure); extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat); diff --git a/src/test/regress/expected/multi_create_table.out b/src/test/regress/expected/multi_create_table.out index 4d714b46a..bba39e136 100644 --- a/src/test/regress/expected/multi_create_table.out +++ b/src/test/regress/expected/multi_create_table.out @@ -533,21 +533,28 @@ END; CREATE TABLE data_load_test (col1 int, col2 text, col3 text, "CoL4"")" int); INSERT INTO data_load_test VALUES (132, 'hello', 'world'); INSERT INTO data_load_test VALUES (243, 'world', 'hello'); -ALTER TABLE data_load_test DROP COLUMN col2; -SELECT create_distributed_table('data_load_test', 'col1'); +ALTER TABLE data_load_test DROP COLUMN col1; +SELECT create_distributed_table('data_load_test', 'col3'); NOTICE: Copying data from local table... create_distributed_table -------------------------- (1 row) -SELECT * FROM data_load_test; - col1 | col3 | CoL4") -------+-------+-------- - 132 | world | - 243 | hello | +SELECT * FROM data_load_test ORDER BY col2; + col2 | col3 | CoL4") +-------+-------+-------- + hello | world | + world | hello | (2 rows) +-- make sure the tuple went to the right shard +SELECT * FROM data_load_test WHERE col3 = 'world'; + col2 | col3 | CoL4") +-------+-------+-------- + hello | world | +(1 row) + DROP TABLE data_load_test; SET citus.shard_replication_factor TO default; SET citus.shard_count to 4; diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out index 16d3c22e7..6a95b5e47 100644 --- a/src/test/regress/expected/multi_insert_select.out +++ b/src/test/regress/expected/multi_insert_select.out @@ -2401,21 +2401,35 @@ SELECT user_id, value_4 FROM test_view ORDER BY user_id, value_4; -- Drop the view now, because the column we are about to drop depends on it DROP VIEW test_view; -- Make sure we handle dropped columns correctly -TRUNCATE raw_events_first; -ALTER TABLE raw_events_first DROP COLUMN value_1; -INSERT INTO raw_events_first (user_id, value_4) +CREATE TABLE drop_col_table (col1 text, col2 text, col3 text); +SELECT create_distributed_table('drop_col_table', 'col2'); + create_distributed_table +-------------------------- + +(1 row) + +ALTER TABLE drop_col_table DROP COLUMN col1; +INSERT INTO drop_col_table (col3, col2) SELECT value_4, user_id FROM raw_events_second LIMIT 5; -SELECT user_id, value_4 FROM raw_events_first ORDER BY user_id; - user_id | value_4 ----------+--------- - 3 | 1 - 6 | 2 - 9 | 3 - 12 | 4 - 15 | 5 +SELECT * FROM drop_col_table ORDER BY col2, col3; + col2 | col3 +------+------ + 1 | 3 + 2 | 6 + 3 | 9 + 4 | 12 + 5 | 15 (5 rows) +-- make sure the tuple went to the right shard +SELECT * FROM drop_col_table WHERE col2 = '1'; + col2 | col3 +------+------ + 1 | 3 +(1 row) + RESET client_min_messages; +DROP TABLE drop_col_table; DROP TABLE raw_table; DROP TABLE summary_table; DROP TABLE raw_events_first CASCADE; diff --git a/src/test/regress/input/multi_copy.source b/src/test/regress/input/multi_copy.source index 13714446e..97a0ba017 100644 --- a/src/test/regress/input/multi_copy.source +++ b/src/test/regress/input/multi_copy.source @@ -781,6 +781,26 @@ SELECT create_distributed_table('tt1','id'); DROP TABLE tt1; END; +-- Test dropping a column in front of the partition column +CREATE TABLE drop_copy_test_table (col1 int, col2 int, col3 int, col4 int); +SELECT create_distributed_table('drop_copy_test_table','col3'); + +ALTER TABLE drop_copy_test_table drop column col1; +COPY drop_copy_test_table (col2,col3,col4) from STDIN with CSV; +,1, +,2, +\. +SELECT * FROM drop_copy_test_table WHERE col3 = 1; + +ALTER TABLE drop_copy_test_table drop column col4; +COPY drop_copy_test_table (col2,col3) from STDIN with CSV; +,1 +,2 +\. +SELECT * FROM drop_copy_test_table WHERE col3 = 1; + +DROP TABLE drop_copy_test_table; + -- There should be no "tt1" shard on the worker nodes \c - - - :worker_1_port SELECT relname FROM pg_class WHERE relname LIKE 'tt1%'; diff --git a/src/test/regress/output/multi_copy.source b/src/test/regress/output/multi_copy.source index 498b7dbed..c961ddb58 100644 --- a/src/test/regress/output/multi_copy.source +++ b/src/test/regress/output/multi_copy.source @@ -1050,6 +1050,34 @@ SELECT create_distributed_table('tt1','id'); \copy tt1 from STDIN; DROP TABLE tt1; END; +-- Test dropping a column in front of the partition column +CREATE TABLE drop_copy_test_table (col1 int, col2 int, col3 int, col4 int); +SELECT create_distributed_table('drop_copy_test_table','col3'); + create_distributed_table +-------------------------- + +(1 row) + +ALTER TABLE drop_copy_test_table drop column col1; +NOTICE: using one-phase commit for distributed DDL commands +HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' +COPY drop_copy_test_table (col2,col3,col4) from STDIN with CSV; +SELECT * FROM drop_copy_test_table WHERE col3 = 1; + col2 | col3 | col4 +------+------+------ + | 1 | +(1 row) + +ALTER TABLE drop_copy_test_table drop column col4; +COPY drop_copy_test_table (col2,col3) from STDIN with CSV; +SELECT * FROM drop_copy_test_table WHERE col3 = 1; + col2 | col3 +------+------ + | 1 + | 1 +(2 rows) + +DROP TABLE drop_copy_test_table; -- There should be no "tt1" shard on the worker nodes \c - - - :worker_1_port SELECT relname FROM pg_class WHERE relname LIKE 'tt1%'; diff --git a/src/test/regress/sql/multi_create_table.sql b/src/test/regress/sql/multi_create_table.sql index e2e04da28..531f337ea 100644 --- a/src/test/regress/sql/multi_create_table.sql +++ b/src/test/regress/sql/multi_create_table.sql @@ -284,9 +284,11 @@ END; CREATE TABLE data_load_test (col1 int, col2 text, col3 text, "CoL4"")" int); INSERT INTO data_load_test VALUES (132, 'hello', 'world'); INSERT INTO data_load_test VALUES (243, 'world', 'hello'); -ALTER TABLE data_load_test DROP COLUMN col2; -SELECT create_distributed_table('data_load_test', 'col1'); -SELECT * FROM data_load_test; +ALTER TABLE data_load_test DROP COLUMN col1; +SELECT create_distributed_table('data_load_test', 'col3'); +SELECT * FROM data_load_test ORDER BY col2; +-- make sure the tuple went to the right shard +SELECT * FROM data_load_test WHERE col3 = 'world'; DROP TABLE data_load_test; SET citus.shard_replication_factor TO default; diff --git a/src/test/regress/sql/multi_insert_select.sql b/src/test/regress/sql/multi_insert_select.sql index d1d54d739..e2fb272db 100644 --- a/src/test/regress/sql/multi_insert_select.sql +++ b/src/test/regress/sql/multi_insert_select.sql @@ -1903,17 +1903,22 @@ SELECT user_id, value_4 FROM test_view ORDER BY user_id, value_4; DROP VIEW test_view; -- Make sure we handle dropped columns correctly -TRUNCATE raw_events_first; +CREATE TABLE drop_col_table (col1 text, col2 text, col3 text); +SELECT create_distributed_table('drop_col_table', 'col2'); -ALTER TABLE raw_events_first DROP COLUMN value_1; +ALTER TABLE drop_col_table DROP COLUMN col1; -INSERT INTO raw_events_first (user_id, value_4) +INSERT INTO drop_col_table (col3, col2) SELECT value_4, user_id FROM raw_events_second LIMIT 5; -SELECT user_id, value_4 FROM raw_events_first ORDER BY user_id; +SELECT * FROM drop_col_table ORDER BY col2, col3; + +-- make sure the tuple went to the right shard +SELECT * FROM drop_col_table WHERE col2 = '1'; RESET client_min_messages; +DROP TABLE drop_col_table; DROP TABLE raw_table; DROP TABLE summary_table; DROP TABLE raw_events_first CASCADE;