Merge pull request #3799 from citusdata/fix-copy-generated

Fix COPY TO's COPY (SELECT) with distributed table having generated columns
pull/3802/head^2
Philip Dubé 2020-04-28 14:53:34 +00:00 committed by GitHub
commit 3fecf0b732
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 80 additions and 15 deletions

View File

@ -264,6 +264,7 @@ static void EndPlacementStateCopyCommand(CopyPlacementState *placementState,
static void UnclaimCopyConnections(List *connectionStateList); static void UnclaimCopyConnections(List *connectionStateList);
static void ShutdownCopyConnectionState(CopyConnectionState *connectionState, static void ShutdownCopyConnectionState(CopyConnectionState *connectionState,
CitusCopyDestReceiver *copyDest); CitusCopyDestReceiver *copyDest);
static SelectStmt * CitusCopySelect(CopyStmt *copyStatement);
static void CitusCopyTo(CopyStmt *copyStatement, char *completionTag); static void CitusCopyTo(CopyStmt *copyStatement, char *completionTag);
static int64 ForwardCopyDataFromConnection(CopyOutState copyOutState, static int64 ForwardCopyDataFromConnection(CopyOutState copyOutState,
MultiConnection *connection); MultiConnection *connection);
@ -2760,22 +2761,9 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, const char *queryS
{ {
/* /*
* COPY table TO PROGRAM / file is handled by wrapping the table * COPY table TO PROGRAM / file is handled by wrapping the table
* in a SELECT * FROM table and going through the result COPY logic. * in a SELECT and going through the resulting COPY logic.
*/ */
ColumnRef *allColumns = makeNode(ColumnRef); SelectStmt *selectStmt = CitusCopySelect(copyStatement);
SelectStmt *selectStmt = makeNode(SelectStmt);
ResTarget *selectTarget = makeNode(ResTarget);
allColumns->fields = list_make1(makeNode(A_Star));
allColumns->location = -1;
selectTarget->name = NULL;
selectTarget->indirection = NIL;
selectTarget->val = (Node *) allColumns;
selectTarget->location = -1;
selectStmt->targetList = list_make1(selectTarget);
selectStmt->fromClause = list_make1(copyObject(copyStatement->relation));
/* replace original statement */ /* replace original statement */
copyStatement = copyObject(copyStatement); copyStatement = copyObject(copyStatement);
@ -2837,6 +2825,53 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, const char *queryS
} }
/*
* CitusCopySelect generates a SelectStmt such that table may be replaced in
* "COPY table FROM" for an equivalent result.
*/
static SelectStmt *
CitusCopySelect(CopyStmt *copyStatement)
{
SelectStmt *selectStmt = makeNode(SelectStmt);
selectStmt->fromClause = list_make1(copyObject(copyStatement->relation));
Relation distributedRelation = heap_openrv(copyStatement->relation, AccessShareLock);
TupleDesc tupleDescriptor = RelationGetDescr(distributedRelation);
List *targetList = NIL;
for (int i = 0; i < tupleDescriptor->natts; i++)
{
Form_pg_attribute attr = &tupleDescriptor->attrs[i];
if (attr->attisdropped
#if PG_VERSION_NUM >= PG_VERSION_12
|| attr->attgenerated
#endif
)
{
continue;
}
ColumnRef *column = makeNode(ColumnRef);
column->fields = list_make1(makeString(pstrdup(attr->attname.data)));
column->location = -1;
ResTarget *selectTarget = makeNode(ResTarget);
selectTarget->name = NULL;
selectTarget->indirection = NIL;
selectTarget->val = (Node *) column;
selectTarget->location = -1;
targetList = lappend(targetList, selectTarget);
}
heap_close(distributedRelation, NoLock);
selectStmt->targetList = targetList;
return selectStmt;
}
/* /*
* CitusCopyTo runs a COPY .. TO STDOUT command on each shard to do a full * CitusCopyTo runs a COPY .. TO STDOUT command on each shard to do a full
* table dump. * table dump.
@ -3061,6 +3096,10 @@ CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
{ {
if (TupleDescAttr(tupDesc, i)->attisdropped) if (TupleDescAttr(tupDesc, i)->attisdropped)
continue; continue;
#if PG_VERSION_NUM >= PG_VERSION_12
if (TupleDescAttr(tupDesc, i)->attgenerated)
continue;
#endif
attnums = lappend_int(attnums, i + 1); attnums = lappend_int(attnums, i + 1);
} }
} }
@ -3085,6 +3124,14 @@ CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
continue; continue;
if (namestrcmp(&(att->attname), name) == 0) if (namestrcmp(&(att->attname), name) == 0)
{ {
#if PG_VERSION_NUM >= PG_VERSION_12
if (att->attgenerated)
ereport(ERROR,
(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
errmsg("column \"%s\" is a generated column",
name),
errdetail("Generated columns cannot be used in COPY.")));
#endif
attnum = att->attnum; attnum = att->attnum;
break; break;
} }

View File

@ -47,6 +47,7 @@ HINT: To remove the local data, run: SELECT truncate_local_data_after_distribut
select create_distributed_table('gen2', 'val2'); select create_distributed_table('gen2', 'val2');
ERROR: cannot distribute relation: gen2 ERROR: cannot distribute relation: gen2
DETAIL: Distribution column must not use GENERATED ALWAYS AS (...) STORED. DETAIL: Distribution column must not use GENERATED ALWAYS AS (...) STORED.
copy gen1 to :'temp_dir''pg12_copy_test_generated';
insert into gen1 (id, val1) values (2,4),(4,6),(6,2),(8,2); insert into gen1 (id, val1) values (2,4),(4,6),(6,2),(8,2);
insert into gen2 (id, val1) values (2,4),(4,6),(6,2),(8,2); insert into gen2 (id, val1) values (2,4),(4,6),(6,2),(8,2);
select * from gen1 order by 1,2,3; select * from gen1 order by 1,2,3;
@ -75,6 +76,17 @@ select * from gen2 order by 1,2,3;
8 | 2 | 4 8 | 2 | 4
(8 rows) (8 rows)
truncate gen1;
copy gen1 from :'temp_dir''pg12_copy_test_generated';
select * from gen1 order by 1,2,3;
id | val2 | val1
---------------------------------------------------------------------
1 | 6 | 4
3 | 8 | 6
5 | 4 | 2
7 | 4 | 2
(4 rows)
-- Test new VACUUM/ANALYZE options -- Test new VACUUM/ANALYZE options
analyze (skip_locked) gen1; analyze (skip_locked) gen1;
vacuum (skip_locked) gen1; vacuum (skip_locked) gen1;

View File

@ -43,12 +43,18 @@ insert into gen2 (id, val1) values (1,4),(3,6),(5,2),(7,2);
select create_distributed_table('gen1', 'id'); select create_distributed_table('gen1', 'id');
select create_distributed_table('gen2', 'val2'); select create_distributed_table('gen2', 'val2');
copy gen1 to :'temp_dir''pg12_copy_test_generated';
insert into gen1 (id, val1) values (2,4),(4,6),(6,2),(8,2); insert into gen1 (id, val1) values (2,4),(4,6),(6,2),(8,2);
insert into gen2 (id, val1) values (2,4),(4,6),(6,2),(8,2); insert into gen2 (id, val1) values (2,4),(4,6),(6,2),(8,2);
select * from gen1 order by 1,2,3; select * from gen1 order by 1,2,3;
select * from gen2 order by 1,2,3; select * from gen2 order by 1,2,3;
truncate gen1;
copy gen1 from :'temp_dir''pg12_copy_test_generated';
select * from gen1 order by 1,2,3;
-- Test new VACUUM/ANALYZE options -- Test new VACUUM/ANALYZE options
analyze (skip_locked) gen1; analyze (skip_locked) gen1;
vacuum (skip_locked) gen1; vacuum (skip_locked) gen1;