diff --git a/src/backend/distributed/commands/local_multi_copy.c b/src/backend/distributed/commands/local_multi_copy.c index 54fedc2a5..c280bf7d0 100644 --- a/src/backend/distributed/commands/local_multi_copy.c +++ b/src/backend/distributed/commands/local_multi_copy.c @@ -45,7 +45,6 @@ static int ReadFromLocalBufferCallback(void *outbuf, int minread, int maxread); -static Relation CreateCopiedShard(RangeVar *distributedRel, Relation shard); static void AddSlotToBuffer(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, bool isBinary); @@ -146,13 +145,12 @@ DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId, CopyStmt *copyStat Oid shardOid = GetShardLocalTableOid(relationId, shardId); Relation shard = heap_open(shardOid, RowExclusiveLock); - Relation copiedShard = CreateCopiedShard(copyStatement->relation, shard); ParseState *pState = make_parsestate(NULL); /* p_rtable of pState is set so that we can check constraints. */ - pState->p_rtable = CreateRangeTable(copiedShard, ACL_INSERT); + pState->p_rtable = CreateRangeTable(shard, ACL_INSERT); - CopyState cstate = BeginCopyFrom(pState, copiedShard, NULL, false, + CopyState cstate = BeginCopyFrom(pState, shard, NULL, false, ReadFromLocalBufferCallback, copyStatement->attlist, copyStatement->options); CopyFrom(cstate); @@ -183,42 +181,6 @@ ShouldAddBinaryHeaders(StringInfo buffer, bool isBinary) } -/* - * CreateCopiedShard clones deep copies the necessary fields of the given - * relation. - */ -Relation -CreateCopiedShard(RangeVar *distributedRel, Relation shard) -{ - TupleDesc tupleDescriptor = RelationGetDescr(shard); - - Relation copiedDistributedRelation = (Relation) palloc(sizeof(RelationData)); - Form_pg_class copiedDistributedRelationTuple = (Form_pg_class) palloc( - CLASS_TUPLE_SIZE); - - *copiedDistributedRelation = *shard; - *copiedDistributedRelationTuple = *shard->rd_rel; - - copiedDistributedRelation->rd_rel = copiedDistributedRelationTuple; - copiedDistributedRelation->rd_att = CreateTupleDescCopyConstr(tupleDescriptor); - - Oid tableId = RangeVarGetRelid(distributedRel, NoLock, false); - - /* - * BeginCopyFrom opens all partitions of given partitioned table with relation_open - * and it expects its caller to close those relations. We do not have direct access - * to opened relations, thus we are changing relkind of partitioned tables so that - * Postgres will treat those tables as regular relations and will not open its - * partitions. - */ - if (PartitionedTable(tableId)) - { - copiedDistributedRelationTuple->relkind = RELKIND_RELATION; - } - return copiedDistributedRelation; -} - - /* * ReadFromLocalBufferCallback is the copy callback. * It always tries to copy maxread bytes. diff --git a/src/test/regress/expected/local_shard_copy.out b/src/test/regress/expected/local_shard_copy.out index fbf9dc43b..3eeefb812 100644 --- a/src/test/regress/expected/local_shard_copy.out +++ b/src/test/regress/expected/local_shard_copy.out @@ -54,6 +54,24 @@ CREATE TABLE local_table (key int PRIMARY KEY); DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "local_table_pkey" for table "local_table" DEBUG: building index "local_table_pkey" on table "local_table" serially INSERT INTO local_table SELECT * from generate_series(1, 10); +-- partitioned table +CREATE TABLE collections_list ( + key bigserial, + collection_id integer +) PARTITION BY LIST (collection_id ); +DEBUG: CREATE TABLE will create implicit sequence "collections_list_key_seq" for serial column "collections_list.key" +SELECT create_distributed_table('collections_list', 'key'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE collections_list_0 + PARTITION OF collections_list (key, collection_id) + FOR VALUES IN ( 0 ); +CREATE TABLE collections_list_1 + PARTITION OF collections_list (key, collection_id) + FOR VALUES IN ( 1 ); -- connection worker and get ready for the tests \c - - - :worker_1_port SET search_path TO local_shard_copy; @@ -240,6 +258,36 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar 26 (1 row) +ROLLBACK; +BEGIN; + -- run select with local execution + SELECT age FROM distributed_table WHERE key = 1; +NOTICE: executing the command locally: SELECT age FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) + age +--------------------------------------------------------------------- +(0 rows) + + SELECT count(*) FROM collections_list; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.collections_list_1330005 collections_list WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.collections_list_1330007 collections_list WHERE true + count +--------------------------------------------------------------------- + 0 +(1 row) + + -- the local placements should be executed locally + COPY collections_list FROM STDIN WITH delimiter ','; +NOTICE: executing the copy locally for shard +CONTEXT: COPY collections_list, line 1: "1, 0" + -- verify that the copy is successful. + SELECT count(*) FROM collections_list; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.collections_list_1330005 collections_list WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.collections_list_1330007 collections_list WHERE true + count +--------------------------------------------------------------------- + 5 +(1 row) + ROLLBACK; BEGIN; -- run select with local execution diff --git a/src/test/regress/sql/local_shard_copy.sql b/src/test/regress/sql/local_shard_copy.sql index 3b2dc902c..af8e3932e 100644 --- a/src/test/regress/sql/local_shard_copy.sql +++ b/src/test/regress/sql/local_shard_copy.sql @@ -22,6 +22,22 @@ INSERT INTO reference_table SELECT * FROM generate_series(1, 10); CREATE TABLE local_table (key int PRIMARY KEY); INSERT INTO local_table SELECT * from generate_series(1, 10); +-- partitioned table +CREATE TABLE collections_list ( + key bigserial, + collection_id integer +) PARTITION BY LIST (collection_id ); + +SELECT create_distributed_table('collections_list', 'key'); + +CREATE TABLE collections_list_0 + PARTITION OF collections_list (key, collection_id) + FOR VALUES IN ( 0 ); + +CREATE TABLE collections_list_1 + PARTITION OF collections_list (key, collection_id) + FOR VALUES IN ( 1 ); + -- connection worker and get ready for the tests \c - - - :worker_1_port @@ -123,6 +139,24 @@ BEGIN; ROLLBACK; +BEGIN; + -- run select with local execution + SELECT age FROM distributed_table WHERE key = 1; + + SELECT count(*) FROM collections_list; + -- the local placements should be executed locally + COPY collections_list FROM STDIN WITH delimiter ','; +1, 0 +2, 0 +3, 0 +4, 1 +5, 1 +\. + -- verify that the copy is successful. + SELECT count(*) FROM collections_list; + +ROLLBACK; + BEGIN; -- run select with local execution SELECT age FROM distributed_table WHERE key = 1;