mirror of https://github.com/citusdata/citus.git
Merge pull request #1606 from citusdata/fix_copy_dropped_columns
Consider dropped columns that precede the partition column in COPYpull/1608/head
commit
df6d56c1ed
|
@ -1149,6 +1149,8 @@ CopyLocalDataIntoShards(Oid distributedRelationId)
|
||||||
List *columnNameList = NIL;
|
List *columnNameList = NIL;
|
||||||
Relation distributedRelation = NULL;
|
Relation distributedRelation = NULL;
|
||||||
TupleDesc tupleDescriptor = NULL;
|
TupleDesc tupleDescriptor = NULL;
|
||||||
|
Var *partitionColumn = NULL;
|
||||||
|
int partitionColumnIndex = INVALID_PARTITION_COLUMN_INDEX;
|
||||||
bool stopOnFailure = true;
|
bool stopOnFailure = true;
|
||||||
|
|
||||||
EState *estate = NULL;
|
EState *estate = NULL;
|
||||||
|
@ -1189,6 +1191,13 @@ CopyLocalDataIntoShards(Oid distributedRelationId)
|
||||||
slot = MakeSingleTupleTableSlot(tupleDescriptor);
|
slot = MakeSingleTupleTableSlot(tupleDescriptor);
|
||||||
columnNameList = TupleDescColumnNameList(tupleDescriptor);
|
columnNameList = TupleDescColumnNameList(tupleDescriptor);
|
||||||
|
|
||||||
|
/* determine the partition column in the tuple descriptor */
|
||||||
|
partitionColumn = PartitionColumn(distributedRelationId, 0);
|
||||||
|
if (partitionColumn != NULL)
|
||||||
|
{
|
||||||
|
partitionColumnIndex = partitionColumn->varattno - 1;
|
||||||
|
}
|
||||||
|
|
||||||
/* initialise per-tuple memory context */
|
/* initialise per-tuple memory context */
|
||||||
estate = CreateExecutorState();
|
estate = CreateExecutorState();
|
||||||
econtext = GetPerTupleExprContext(estate);
|
econtext = GetPerTupleExprContext(estate);
|
||||||
|
@ -1196,8 +1205,9 @@ CopyLocalDataIntoShards(Oid distributedRelationId)
|
||||||
|
|
||||||
copyDest =
|
copyDest =
|
||||||
(DestReceiver *) CreateCitusCopyDestReceiver(distributedRelationId,
|
(DestReceiver *) CreateCitusCopyDestReceiver(distributedRelationId,
|
||||||
columnNameList, estate,
|
columnNameList,
|
||||||
stopOnFailure);
|
partitionColumnIndex,
|
||||||
|
estate, stopOnFailure);
|
||||||
|
|
||||||
/* initialise state for writing to shards, we'll open connections on demand */
|
/* initialise state for writing to shards, we'll open connections on demand */
|
||||||
copyDest->rStartup(copyDest, 0, tupleDescriptor);
|
copyDest->rStartup(copyDest, 0, tupleDescriptor);
|
||||||
|
|
|
@ -295,6 +295,8 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
|
||||||
bool *columnNulls = NULL;
|
bool *columnNulls = NULL;
|
||||||
int columnIndex = 0;
|
int columnIndex = 0;
|
||||||
List *columnNameList = NIL;
|
List *columnNameList = NIL;
|
||||||
|
Var *partitionColumn = NULL;
|
||||||
|
int partitionColumnIndex = INVALID_PARTITION_COLUMN_INDEX;
|
||||||
TupleTableSlot *tupleTableSlot = NULL;
|
TupleTableSlot *tupleTableSlot = NULL;
|
||||||
|
|
||||||
EState *executorState = NULL;
|
EState *executorState = NULL;
|
||||||
|
@ -322,6 +324,14 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
|
||||||
tupleTableSlot->tts_values = columnValues;
|
tupleTableSlot->tts_values = columnValues;
|
||||||
tupleTableSlot->tts_isnull = columnNulls;
|
tupleTableSlot->tts_isnull = columnNulls;
|
||||||
|
|
||||||
|
/* determine the partition column index in the tuple descriptor */
|
||||||
|
partitionColumn = PartitionColumn(tableId, 0);
|
||||||
|
if (partitionColumn != NULL)
|
||||||
|
{
|
||||||
|
partitionColumnIndex = partitionColumn->varattno - 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* build the list of column names for remote COPY statements */
|
||||||
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
|
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
|
||||||
{
|
{
|
||||||
Form_pg_attribute currentColumn = tupleDescriptor->attrs[columnIndex];
|
Form_pg_attribute currentColumn = tupleDescriptor->attrs[columnIndex];
|
||||||
|
@ -346,8 +356,8 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
|
||||||
}
|
}
|
||||||
|
|
||||||
/* set up the destination for the COPY */
|
/* set up the destination for the COPY */
|
||||||
copyDest = CreateCitusCopyDestReceiver(tableId, columnNameList, executorState,
|
copyDest = CreateCitusCopyDestReceiver(tableId, columnNameList, partitionColumnIndex,
|
||||||
stopOnFailure);
|
executorState, stopOnFailure);
|
||||||
dest = (DestReceiver *) copyDest;
|
dest = (DestReceiver *) copyDest;
|
||||||
dest->rStartup(dest, 0, tupleDescriptor);
|
dest->rStartup(dest, 0, tupleDescriptor);
|
||||||
|
|
||||||
|
@ -1714,10 +1724,14 @@ CopyFlushOutput(CopyOutState cstate, char *start, char *pointer)
|
||||||
/*
|
/*
|
||||||
* CreateCitusCopyDestReceiver creates a DestReceiver that copies into
|
* CreateCitusCopyDestReceiver creates a DestReceiver that copies into
|
||||||
* a distributed table.
|
* a distributed table.
|
||||||
|
*
|
||||||
|
* The caller should provide the list of column names to use in the
|
||||||
|
* remote COPY statement, and the partition column index in the tuple
|
||||||
|
* descriptor (*not* the column name list).
|
||||||
*/
|
*/
|
||||||
CitusCopyDestReceiver *
|
CitusCopyDestReceiver *
|
||||||
CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, EState *executorState,
|
CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColumnIndex,
|
||||||
bool stopOnFailure)
|
EState *executorState, bool stopOnFailure)
|
||||||
{
|
{
|
||||||
CitusCopyDestReceiver *copyDest = NULL;
|
CitusCopyDestReceiver *copyDest = NULL;
|
||||||
|
|
||||||
|
@ -1733,6 +1747,7 @@ CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, EState *executorS
|
||||||
/* set up output parameters */
|
/* set up output parameters */
|
||||||
copyDest->distributedRelationId = tableId;
|
copyDest->distributedRelationId = tableId;
|
||||||
copyDest->columnNameList = columnNameList;
|
copyDest->columnNameList = columnNameList;
|
||||||
|
copyDest->partitionColumnIndex = partitionColumnIndex;
|
||||||
copyDest->executorState = executorState;
|
copyDest->executorState = executorState;
|
||||||
copyDest->stopOnFailure = stopOnFailure;
|
copyDest->stopOnFailure = stopOnFailure;
|
||||||
copyDest->memoryContext = CurrentMemoryContext;
|
copyDest->memoryContext = CurrentMemoryContext;
|
||||||
|
@ -1758,15 +1773,12 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
|
||||||
char *schemaName = get_namespace_name(schemaOid);
|
char *schemaName = get_namespace_name(schemaOid);
|
||||||
|
|
||||||
Relation distributedRelation = NULL;
|
Relation distributedRelation = NULL;
|
||||||
int columnIndex = 0;
|
|
||||||
List *columnNameList = copyDest->columnNameList;
|
List *columnNameList = copyDest->columnNameList;
|
||||||
List *quotedColumnNameList = NIL;
|
List *quotedColumnNameList = NIL;
|
||||||
|
|
||||||
ListCell *columnNameCell = NULL;
|
ListCell *columnNameCell = NULL;
|
||||||
|
|
||||||
char partitionMethod = '\0';
|
char partitionMethod = '\0';
|
||||||
Var *partitionColumn = PartitionColumn(tableId, 0);
|
|
||||||
int partitionColumnIndex = -1;
|
|
||||||
DistTableCacheEntry *cacheEntry = NULL;
|
DistTableCacheEntry *cacheEntry = NULL;
|
||||||
|
|
||||||
CopyStmt *copyStatement = NULL;
|
CopyStmt *copyStatement = NULL;
|
||||||
|
@ -1853,37 +1865,23 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
|
||||||
copyDest->columnOutputFunctions =
|
copyDest->columnOutputFunctions =
|
||||||
ColumnOutputFunctions(inputTupleDescriptor, copyOutState->binary);
|
ColumnOutputFunctions(inputTupleDescriptor, copyOutState->binary);
|
||||||
|
|
||||||
/* find the partition column index in the column list */
|
/* ensure the column names are properly quoted in the COPY statement */
|
||||||
foreach(columnNameCell, columnNameList)
|
foreach(columnNameCell, columnNameList)
|
||||||
{
|
{
|
||||||
char *columnName = (char *) lfirst(columnNameCell);
|
char *columnName = (char *) lfirst(columnNameCell);
|
||||||
char *quotedColumnName = (char *) quote_identifier(columnName);
|
char *quotedColumnName = (char *) quote_identifier(columnName);
|
||||||
|
|
||||||
/* load the column information from pg_attribute */
|
|
||||||
AttrNumber attrNumber = get_attnum(tableId, columnName);
|
|
||||||
|
|
||||||
/* check whether this is the partition column */
|
|
||||||
if (partitionColumn != NULL && attrNumber == partitionColumn->varattno)
|
|
||||||
{
|
|
||||||
Assert(partitionColumnIndex == -1);
|
|
||||||
|
|
||||||
partitionColumnIndex = columnIndex;
|
|
||||||
}
|
|
||||||
|
|
||||||
columnIndex++;
|
|
||||||
|
|
||||||
quotedColumnNameList = lappend(quotedColumnNameList, quotedColumnName);
|
quotedColumnNameList = lappend(quotedColumnNameList, quotedColumnName);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (partitionMethod != DISTRIBUTE_BY_NONE && partitionColumnIndex == -1)
|
if (partitionMethod != DISTRIBUTE_BY_NONE &&
|
||||||
|
copyDest->partitionColumnIndex == INVALID_PARTITION_COLUMN_INDEX)
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
|
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
|
||||||
errmsg("the partition column of table %s should have a value",
|
errmsg("the partition column of table %s should have a value",
|
||||||
quote_qualified_identifier(schemaName, relationName))));
|
quote_qualified_identifier(schemaName, relationName))));
|
||||||
}
|
}
|
||||||
|
|
||||||
copyDest->partitionColumnIndex = partitionColumnIndex;
|
|
||||||
|
|
||||||
/* define the template for the COPY statement that is sent to workers */
|
/* define the template for the COPY statement that is sent to workers */
|
||||||
copyStatement = makeNode(CopyStmt);
|
copyStatement = makeNode(CopyStmt);
|
||||||
copyStatement->relation = makeRangeVar(schemaName, relationName, -1);
|
copyStatement->relation = makeRangeVar(schemaName, relationName, -1);
|
||||||
|
@ -1945,7 +1943,7 @@ CitusCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
|
||||||
* tables. Note that, reference tables has NULL partition column values so
|
* tables. Note that, reference tables has NULL partition column values so
|
||||||
* skip the check.
|
* skip the check.
|
||||||
*/
|
*/
|
||||||
if (partitionColumnIndex >= 0)
|
if (partitionColumnIndex != INVALID_PARTITION_COLUMN_INDEX)
|
||||||
{
|
{
|
||||||
if (columnNulls[partitionColumnIndex])
|
if (columnNulls[partitionColumnIndex])
|
||||||
{
|
{
|
||||||
|
|
|
@ -99,6 +99,8 @@ ExecuteSelectIntoRelation(Oid targetRelationId, List *insertTargetList,
|
||||||
List *columnNameList = NIL;
|
List *columnNameList = NIL;
|
||||||
bool stopOnFailure = false;
|
bool stopOnFailure = false;
|
||||||
char partitionMethod = 0;
|
char partitionMethod = 0;
|
||||||
|
Var *partitionColumn = NULL;
|
||||||
|
int partitionColumnIndex = -1;
|
||||||
|
|
||||||
CitusCopyDestReceiver *copyDest = NULL;
|
CitusCopyDestReceiver *copyDest = NULL;
|
||||||
|
|
||||||
|
@ -108,17 +110,32 @@ ExecuteSelectIntoRelation(Oid targetRelationId, List *insertTargetList,
|
||||||
stopOnFailure = true;
|
stopOnFailure = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
partitionColumn = PartitionColumn(targetRelationId, 0);
|
||||||
|
|
||||||
/* build the list of column names for the COPY statement */
|
/* build the list of column names for the COPY statement */
|
||||||
foreach(insertTargetCell, insertTargetList)
|
foreach(insertTargetCell, insertTargetList)
|
||||||
{
|
{
|
||||||
TargetEntry *insertTargetEntry = (TargetEntry *) lfirst(insertTargetCell);
|
TargetEntry *insertTargetEntry = (TargetEntry *) lfirst(insertTargetCell);
|
||||||
|
char *columnName = insertTargetEntry->resname;
|
||||||
|
|
||||||
|
/* load the column information from pg_attribute */
|
||||||
|
AttrNumber attrNumber = get_attnum(targetRelationId, columnName);
|
||||||
|
|
||||||
|
/* check whether this is the partition column */
|
||||||
|
if (partitionColumn != NULL && attrNumber == partitionColumn->varattno)
|
||||||
|
{
|
||||||
|
Assert(partitionColumnIndex == -1);
|
||||||
|
|
||||||
|
partitionColumnIndex = list_length(columnNameList);
|
||||||
|
}
|
||||||
|
|
||||||
columnNameList = lappend(columnNameList, insertTargetEntry->resname);
|
columnNameList = lappend(columnNameList, insertTargetEntry->resname);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* set up a DestReceiver that copies into the distributed table */
|
/* set up a DestReceiver that copies into the distributed table */
|
||||||
copyDest = CreateCitusCopyDestReceiver(targetRelationId, columnNameList,
|
copyDest = CreateCitusCopyDestReceiver(targetRelationId, columnNameList,
|
||||||
executorState, stopOnFailure);
|
partitionColumnIndex, executorState,
|
||||||
|
stopOnFailure);
|
||||||
|
|
||||||
ExecuteIntoDestReceiver(selectQuery, paramListInfo, (DestReceiver *) copyDest);
|
ExecuteIntoDestReceiver(selectQuery, paramListInfo, (DestReceiver *) copyDest);
|
||||||
|
|
||||||
|
|
|
@ -20,6 +20,9 @@
|
||||||
#include "tcop/dest.h"
|
#include "tcop/dest.h"
|
||||||
|
|
||||||
|
|
||||||
|
#define INVALID_PARTITION_COLUMN_INDEX -1
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* A smaller version of copy.c's CopyStateData, trimmed to the elements
|
* A smaller version of copy.c's CopyStateData, trimmed to the elements
|
||||||
* necessary to copy out results. While it'd be a bit nicer to share code,
|
* necessary to copy out results. While it'd be a bit nicer to share code,
|
||||||
|
@ -93,6 +96,7 @@ typedef struct CitusCopyDestReceiver
|
||||||
/* function declarations for copying into a distributed table */
|
/* function declarations for copying into a distributed table */
|
||||||
extern CitusCopyDestReceiver * CreateCitusCopyDestReceiver(Oid relationId,
|
extern CitusCopyDestReceiver * CreateCitusCopyDestReceiver(Oid relationId,
|
||||||
List *columnNameList,
|
List *columnNameList,
|
||||||
|
int partitionColumnIndex,
|
||||||
EState *executorState,
|
EState *executorState,
|
||||||
bool stopOnFailure);
|
bool stopOnFailure);
|
||||||
extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat);
|
extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat);
|
||||||
|
|
|
@ -533,21 +533,28 @@ END;
|
||||||
CREATE TABLE data_load_test (col1 int, col2 text, col3 text, "CoL4"")" int);
|
CREATE TABLE data_load_test (col1 int, col2 text, col3 text, "CoL4"")" int);
|
||||||
INSERT INTO data_load_test VALUES (132, 'hello', 'world');
|
INSERT INTO data_load_test VALUES (132, 'hello', 'world');
|
||||||
INSERT INTO data_load_test VALUES (243, 'world', 'hello');
|
INSERT INTO data_load_test VALUES (243, 'world', 'hello');
|
||||||
ALTER TABLE data_load_test DROP COLUMN col2;
|
ALTER TABLE data_load_test DROP COLUMN col1;
|
||||||
SELECT create_distributed_table('data_load_test', 'col1');
|
SELECT create_distributed_table('data_load_test', 'col3');
|
||||||
NOTICE: Copying data from local table...
|
NOTICE: Copying data from local table...
|
||||||
create_distributed_table
|
create_distributed_table
|
||||||
--------------------------
|
--------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT * FROM data_load_test;
|
SELECT * FROM data_load_test ORDER BY col2;
|
||||||
col1 | col3 | CoL4")
|
col2 | col3 | CoL4")
|
||||||
------+-------+--------
|
-------+-------+--------
|
||||||
132 | world |
|
hello | world |
|
||||||
243 | hello |
|
world | hello |
|
||||||
(2 rows)
|
(2 rows)
|
||||||
|
|
||||||
|
-- make sure the tuple went to the right shard
|
||||||
|
SELECT * FROM data_load_test WHERE col3 = 'world';
|
||||||
|
col2 | col3 | CoL4")
|
||||||
|
-------+-------+--------
|
||||||
|
hello | world |
|
||||||
|
(1 row)
|
||||||
|
|
||||||
DROP TABLE data_load_test;
|
DROP TABLE data_load_test;
|
||||||
SET citus.shard_replication_factor TO default;
|
SET citus.shard_replication_factor TO default;
|
||||||
SET citus.shard_count to 4;
|
SET citus.shard_count to 4;
|
||||||
|
|
|
@ -2401,21 +2401,35 @@ SELECT user_id, value_4 FROM test_view ORDER BY user_id, value_4;
|
||||||
-- Drop the view now, because the column we are about to drop depends on it
|
-- Drop the view now, because the column we are about to drop depends on it
|
||||||
DROP VIEW test_view;
|
DROP VIEW test_view;
|
||||||
-- Make sure we handle dropped columns correctly
|
-- Make sure we handle dropped columns correctly
|
||||||
TRUNCATE raw_events_first;
|
CREATE TABLE drop_col_table (col1 text, col2 text, col3 text);
|
||||||
ALTER TABLE raw_events_first DROP COLUMN value_1;
|
SELECT create_distributed_table('drop_col_table', 'col2');
|
||||||
INSERT INTO raw_events_first (user_id, value_4)
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ALTER TABLE drop_col_table DROP COLUMN col1;
|
||||||
|
INSERT INTO drop_col_table (col3, col2)
|
||||||
SELECT value_4, user_id FROM raw_events_second LIMIT 5;
|
SELECT value_4, user_id FROM raw_events_second LIMIT 5;
|
||||||
SELECT user_id, value_4 FROM raw_events_first ORDER BY user_id;
|
SELECT * FROM drop_col_table ORDER BY col2, col3;
|
||||||
user_id | value_4
|
col2 | col3
|
||||||
---------+---------
|
------+------
|
||||||
3 | 1
|
1 | 3
|
||||||
6 | 2
|
2 | 6
|
||||||
9 | 3
|
3 | 9
|
||||||
12 | 4
|
4 | 12
|
||||||
15 | 5
|
5 | 15
|
||||||
(5 rows)
|
(5 rows)
|
||||||
|
|
||||||
|
-- make sure the tuple went to the right shard
|
||||||
|
SELECT * FROM drop_col_table WHERE col2 = '1';
|
||||||
|
col2 | col3
|
||||||
|
------+------
|
||||||
|
1 | 3
|
||||||
|
(1 row)
|
||||||
|
|
||||||
RESET client_min_messages;
|
RESET client_min_messages;
|
||||||
|
DROP TABLE drop_col_table;
|
||||||
DROP TABLE raw_table;
|
DROP TABLE raw_table;
|
||||||
DROP TABLE summary_table;
|
DROP TABLE summary_table;
|
||||||
DROP TABLE raw_events_first CASCADE;
|
DROP TABLE raw_events_first CASCADE;
|
||||||
|
|
|
@ -781,6 +781,26 @@ SELECT create_distributed_table('tt1','id');
|
||||||
DROP TABLE tt1;
|
DROP TABLE tt1;
|
||||||
END;
|
END;
|
||||||
|
|
||||||
|
-- Test dropping a column in front of the partition column
|
||||||
|
CREATE TABLE drop_copy_test_table (col1 int, col2 int, col3 int, col4 int);
|
||||||
|
SELECT create_distributed_table('drop_copy_test_table','col3');
|
||||||
|
|
||||||
|
ALTER TABLE drop_copy_test_table drop column col1;
|
||||||
|
COPY drop_copy_test_table (col2,col3,col4) from STDIN with CSV;
|
||||||
|
,1,
|
||||||
|
,2,
|
||||||
|
\.
|
||||||
|
SELECT * FROM drop_copy_test_table WHERE col3 = 1;
|
||||||
|
|
||||||
|
ALTER TABLE drop_copy_test_table drop column col4;
|
||||||
|
COPY drop_copy_test_table (col2,col3) from STDIN with CSV;
|
||||||
|
,1
|
||||||
|
,2
|
||||||
|
\.
|
||||||
|
SELECT * FROM drop_copy_test_table WHERE col3 = 1;
|
||||||
|
|
||||||
|
DROP TABLE drop_copy_test_table;
|
||||||
|
|
||||||
-- There should be no "tt1" shard on the worker nodes
|
-- There should be no "tt1" shard on the worker nodes
|
||||||
\c - - - :worker_1_port
|
\c - - - :worker_1_port
|
||||||
SELECT relname FROM pg_class WHERE relname LIKE 'tt1%';
|
SELECT relname FROM pg_class WHERE relname LIKE 'tt1%';
|
||||||
|
|
|
@ -1050,6 +1050,34 @@ SELECT create_distributed_table('tt1','id');
|
||||||
\copy tt1 from STDIN;
|
\copy tt1 from STDIN;
|
||||||
DROP TABLE tt1;
|
DROP TABLE tt1;
|
||||||
END;
|
END;
|
||||||
|
-- Test dropping a column in front of the partition column
|
||||||
|
CREATE TABLE drop_copy_test_table (col1 int, col2 int, col3 int, col4 int);
|
||||||
|
SELECT create_distributed_table('drop_copy_test_table','col3');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ALTER TABLE drop_copy_test_table drop column col1;
|
||||||
|
NOTICE: using one-phase commit for distributed DDL commands
|
||||||
|
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
|
||||||
|
COPY drop_copy_test_table (col2,col3,col4) from STDIN with CSV;
|
||||||
|
SELECT * FROM drop_copy_test_table WHERE col3 = 1;
|
||||||
|
col2 | col3 | col4
|
||||||
|
------+------+------
|
||||||
|
| 1 |
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ALTER TABLE drop_copy_test_table drop column col4;
|
||||||
|
COPY drop_copy_test_table (col2,col3) from STDIN with CSV;
|
||||||
|
SELECT * FROM drop_copy_test_table WHERE col3 = 1;
|
||||||
|
col2 | col3
|
||||||
|
------+------
|
||||||
|
| 1
|
||||||
|
| 1
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
DROP TABLE drop_copy_test_table;
|
||||||
-- There should be no "tt1" shard on the worker nodes
|
-- There should be no "tt1" shard on the worker nodes
|
||||||
\c - - - :worker_1_port
|
\c - - - :worker_1_port
|
||||||
SELECT relname FROM pg_class WHERE relname LIKE 'tt1%';
|
SELECT relname FROM pg_class WHERE relname LIKE 'tt1%';
|
||||||
|
|
|
@ -284,9 +284,11 @@ END;
|
||||||
CREATE TABLE data_load_test (col1 int, col2 text, col3 text, "CoL4"")" int);
|
CREATE TABLE data_load_test (col1 int, col2 text, col3 text, "CoL4"")" int);
|
||||||
INSERT INTO data_load_test VALUES (132, 'hello', 'world');
|
INSERT INTO data_load_test VALUES (132, 'hello', 'world');
|
||||||
INSERT INTO data_load_test VALUES (243, 'world', 'hello');
|
INSERT INTO data_load_test VALUES (243, 'world', 'hello');
|
||||||
ALTER TABLE data_load_test DROP COLUMN col2;
|
ALTER TABLE data_load_test DROP COLUMN col1;
|
||||||
SELECT create_distributed_table('data_load_test', 'col1');
|
SELECT create_distributed_table('data_load_test', 'col3');
|
||||||
SELECT * FROM data_load_test;
|
SELECT * FROM data_load_test ORDER BY col2;
|
||||||
|
-- make sure the tuple went to the right shard
|
||||||
|
SELECT * FROM data_load_test WHERE col3 = 'world';
|
||||||
DROP TABLE data_load_test;
|
DROP TABLE data_load_test;
|
||||||
|
|
||||||
SET citus.shard_replication_factor TO default;
|
SET citus.shard_replication_factor TO default;
|
||||||
|
|
|
@ -1903,17 +1903,22 @@ SELECT user_id, value_4 FROM test_view ORDER BY user_id, value_4;
|
||||||
DROP VIEW test_view;
|
DROP VIEW test_view;
|
||||||
|
|
||||||
-- Make sure we handle dropped columns correctly
|
-- Make sure we handle dropped columns correctly
|
||||||
TRUNCATE raw_events_first;
|
CREATE TABLE drop_col_table (col1 text, col2 text, col3 text);
|
||||||
|
SELECT create_distributed_table('drop_col_table', 'col2');
|
||||||
|
|
||||||
ALTER TABLE raw_events_first DROP COLUMN value_1;
|
ALTER TABLE drop_col_table DROP COLUMN col1;
|
||||||
|
|
||||||
INSERT INTO raw_events_first (user_id, value_4)
|
INSERT INTO drop_col_table (col3, col2)
|
||||||
SELECT value_4, user_id FROM raw_events_second LIMIT 5;
|
SELECT value_4, user_id FROM raw_events_second LIMIT 5;
|
||||||
|
|
||||||
SELECT user_id, value_4 FROM raw_events_first ORDER BY user_id;
|
SELECT * FROM drop_col_table ORDER BY col2, col3;
|
||||||
|
|
||||||
|
-- make sure the tuple went to the right shard
|
||||||
|
SELECT * FROM drop_col_table WHERE col2 = '1';
|
||||||
|
|
||||||
RESET client_min_messages;
|
RESET client_min_messages;
|
||||||
|
|
||||||
|
DROP TABLE drop_col_table;
|
||||||
DROP TABLE raw_table;
|
DROP TABLE raw_table;
|
||||||
DROP TABLE summary_table;
|
DROP TABLE summary_table;
|
||||||
DROP TABLE raw_events_first CASCADE;
|
DROP TABLE raw_events_first CASCADE;
|
||||||
|
|
Loading…
Reference in New Issue