diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index aa76f338d..60caa58a5 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -1258,15 +1258,12 @@ CopyLocalDataIntoShards(Oid distributedRelationId) ExprContext *econtext = GetPerTupleExprContext(estate); econtext->ecxt_scantuple = slot; - /* here we already have the data locally */ - bool hasCopyDataLocally = true; copyDest = (DestReceiver *) CreateCitusCopyDestReceiver(distributedRelationId, columnNameList, partitionColumnIndex, estate, stopOnFailure, - NULL, - hasCopyDataLocally); + NULL); /* initialise state for writing to shards, we'll open connections on demand */ copyDest->rStartup(copyDest, 0, tupleDescriptor); diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 8d2dd9928..0e698fe24 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -431,12 +431,9 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) stopOnFailure = true; } - bool hasCopyDataLocally = false; - /* set up the destination for the COPY */ copyDest = CreateCitusCopyDestReceiver(tableId, columnNameList, partitionColumnIndex, - executorState, stopOnFailure, NULL, - hasCopyDataLocally); + executorState, stopOnFailure, NULL); dest = (DestReceiver *) copyDest; dest->rStartup(dest, 0, tupleDescriptor); @@ -1979,12 +1976,12 @@ CopyFlushOutput(CopyOutState cstate, char *start, char *pointer) CitusCopyDestReceiver * CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColumnIndex, EState *executorState, bool stopOnFailure, - char *intermediateResultIdPrefix, bool hasCopyDataLocally) + char *intermediateResultIdPrefix) { CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) palloc0( sizeof(CitusCopyDestReceiver)); - copyDest->shouldUseLocalCopy = !hasCopyDataLocally && ShouldExecuteCopyLocally(); + copyDest->shouldUseLocalCopy = ShouldExecuteCopyLocally(); /* set up the DestReceiver function pointers */ copyDest->pub.receiveSlot = CitusCopyDestReceiverReceive; @@ -2040,7 +2037,8 @@ ShouldExecuteCopyLocally() return true; } - return IsMultiStatementTransaction(); + /* if we connected to the localhost via a connection, we might not be able to see some previous changes that are done via the connection */ + return !TransactionConnectedToLocalGroup && IsMultiStatementTransaction(); } diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index f5b3d2cce..6663acc49 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -581,16 +581,13 @@ ExecutePlanIntoColocatedIntermediateResults(Oid targetRelationId, int partitionColumnIndex = PartitionColumnIndexFromColumnList(targetRelationId, columnNameList); - bool hasCopyDataLocally = true; - /* set up a DestReceiver that copies into the intermediate table */ CitusCopyDestReceiver *copyDest = CreateCitusCopyDestReceiver(targetRelationId, columnNameList, partitionColumnIndex, executorState, stopOnFailure, - intermediateResultIdPrefix, - hasCopyDataLocally); + intermediateResultIdPrefix); ExecutePlanIntoDestReceiver(selectPlan, paramListInfo, (DestReceiver *) copyDest); @@ -626,15 +623,12 @@ ExecutePlanIntoRelation(Oid targetRelationId, List *insertTargetList, int partitionColumnIndex = PartitionColumnIndexFromColumnList(targetRelationId, columnNameList); - bool hasCopyDataLocally = true; - /* set up a DestReceiver that copies into the distributed table */ CitusCopyDestReceiver *copyDest = CreateCitusCopyDestReceiver(targetRelationId, columnNameList, partitionColumnIndex, executorState, - stopOnFailure, NULL, - hasCopyDataLocally); + stopOnFailure, NULL); ExecutePlanIntoDestReceiver(selectPlan, paramListInfo, (DestReceiver *) copyDest); diff --git a/src/include/distributed/commands/multi_copy.h b/src/include/distributed/commands/multi_copy.h index b887cc5ad..9ec8f280b 100644 --- a/src/include/distributed/commands/multi_copy.h +++ b/src/include/distributed/commands/multi_copy.h @@ -144,8 +144,7 @@ extern CitusCopyDestReceiver * CreateCitusCopyDestReceiver(Oid relationId, int partitionColumnIndex, EState *executorState, bool stopOnFailure, - char *intermediateResultPrefix, - bool hasCopyDataLocally); + char *intermediateResultPrefix); extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat); extern bool CanUseBinaryCopyFormat(TupleDesc tupleDescription); extern bool CanUseBinaryCopyFormatForTargetList(List *targetEntryList); diff --git a/src/test/regress/expected/local_shard_copy.out b/src/test/regress/expected/local_shard_copy.out index b7521f9da..fbf9dc43b 100644 --- a/src/test/regress/expected/local_shard_copy.out +++ b/src/test/regress/expected/local_shard_copy.out @@ -242,26 +242,25 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar ROLLBACK; BEGIN; - -- the local placements should be executed locally - COPY distributed_table FROM STDIN WITH delimiter ','; -NOTICE: executing the copy locally for shard -CONTEXT: COPY distributed_table, line 1: "1, 100" -- run select with local execution SELECT age FROM distributed_table WHERE key = 1; NOTICE: executing the command locally: SELECT age FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) age --------------------------------------------------------------------- - 100 -(1 row) +(0 rows) SELECT count(*) FROM distributed_table; NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true count --------------------------------------------------------------------- - 26 + 21 (1 row) + -- the local placements should be executed locally + COPY distributed_table FROM STDIN WITH delimiter ','; +NOTICE: executing the copy locally for shard +CONTEXT: COPY distributed_table, line 1: "1, 100" -- verify that the copy is successful. SELECT count(*) FROM distributed_table; NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true @@ -307,8 +306,18 @@ ROLLBACK; TRUNCATE distributed_table; -- different delimiters BEGIN; +-- run select with local execution +SELECT count(*) FROM distributed_table WHERE key = 1; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) + count +--------------------------------------------------------------------- + 0 +(1 row) + -- initial size SELECT count(*) FROM distributed_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true count --------------------------------------------------------------------- 0 @@ -328,8 +337,18 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar ROLLBACK; BEGIN; +-- run select with local execution +SELECT count(*) FROM distributed_table WHERE key = 1; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) + count +--------------------------------------------------------------------- + 0 +(1 row) + -- initial size SELECT count(*) FROM distributed_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true count --------------------------------------------------------------------- 0 diff --git a/src/test/regress/expected/local_shard_execution.out b/src/test/regress/expected/local_shard_execution.out index 8d3b35ca6..e2ed1dedb 100644 --- a/src/test/regress/expected/local_shard_execution.out +++ b/src/test/regress/expected/local_shard_execution.out @@ -618,7 +618,7 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar 0 (1 row) - INSERT INTO distributed_table (key) SELECT i FROM generate_series(1,1)i; + INSERT INTO distributed_table (key) SELECT i FROM generate_series(1,1) i; ROLLBACK; -- a local query is followed by a command that cannot be executed locally BEGIN; diff --git a/src/test/regress/sql/local_shard_copy.sql b/src/test/regress/sql/local_shard_copy.sql index e9b548a94..3b2dc902c 100644 --- a/src/test/regress/sql/local_shard_copy.sql +++ b/src/test/regress/sql/local_shard_copy.sql @@ -124,6 +124,10 @@ BEGIN; ROLLBACK; BEGIN; + -- run select with local execution + SELECT age FROM distributed_table WHERE key = 1; + + SELECT count(*) FROM distributed_table; -- the local placements should be executed locally COPY distributed_table FROM STDIN WITH delimiter ','; 1, 100 @@ -132,10 +136,7 @@ BEGIN; 4, 400 5, 500 \. - -- run select with local execution - SELECT age FROM distributed_table WHERE key = 1; - SELECT count(*) FROM distributed_table; -- verify that the copy is successful. SELECT count(*) FROM distributed_table; @@ -193,6 +194,8 @@ TRUNCATE distributed_table; -- different delimiters BEGIN; +-- run select with local execution +SELECT count(*) FROM distributed_table WHERE key = 1; -- initial size SELECT count(*) FROM distributed_table; COPY distributed_table FROM STDIN WITH delimiter '|'; @@ -205,6 +208,8 @@ SELECT count(*) FROM distributed_table; ROLLBACK; BEGIN; +-- run select with local execution +SELECT count(*) FROM distributed_table WHERE key = 1; -- initial size SELECT count(*) FROM distributed_table; COPY distributed_table FROM STDIN WITH delimiter '[';