From 1df9601e1326bd78be2e83e5f3b2eff366d3e1c6 Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Tue, 10 Mar 2020 10:34:23 +0300 Subject: [PATCH] not use local copy if current transaction is connected to local group If current transaction is connected to local group we should not use local copy, because we might not see some of the changes that are made over the connection to the local group. --- .../commands/create_distributed_table.c | 5 +-- src/backend/distributed/commands/multi_copy.c | 12 +++---- .../executor/insert_select_executor.c | 10 ++---- src/include/distributed/commands/multi_copy.h | 3 +- .../regress/expected/local_shard_copy.out | 33 +++++++++++++++---- .../expected/local_shard_execution.out | 2 +- src/test/regress/sql/local_shard_copy.sql | 11 +++++-- 7 files changed, 44 insertions(+), 32 deletions(-) diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index aa76f338d..60caa58a5 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -1258,15 +1258,12 @@ CopyLocalDataIntoShards(Oid distributedRelationId) ExprContext *econtext = GetPerTupleExprContext(estate); econtext->ecxt_scantuple = slot; - /* here we already have the data locally */ - bool hasCopyDataLocally = true; copyDest = (DestReceiver *) CreateCitusCopyDestReceiver(distributedRelationId, columnNameList, partitionColumnIndex, estate, stopOnFailure, - NULL, - hasCopyDataLocally); + NULL); /* initialise state for writing to shards, we'll open connections on demand */ copyDest->rStartup(copyDest, 0, tupleDescriptor); diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 8d2dd9928..0e698fe24 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -431,12 +431,9 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) stopOnFailure = true; } - bool hasCopyDataLocally = false; - /* set up the destination for the COPY */ copyDest = CreateCitusCopyDestReceiver(tableId, columnNameList, partitionColumnIndex, - executorState, stopOnFailure, NULL, - hasCopyDataLocally); + executorState, stopOnFailure, NULL); dest = (DestReceiver *) copyDest; dest->rStartup(dest, 0, tupleDescriptor); @@ -1979,12 +1976,12 @@ CopyFlushOutput(CopyOutState cstate, char *start, char *pointer) CitusCopyDestReceiver * CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColumnIndex, EState *executorState, bool stopOnFailure, - char *intermediateResultIdPrefix, bool hasCopyDataLocally) + char *intermediateResultIdPrefix) { CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) palloc0( sizeof(CitusCopyDestReceiver)); - copyDest->shouldUseLocalCopy = !hasCopyDataLocally && ShouldExecuteCopyLocally(); + copyDest->shouldUseLocalCopy = ShouldExecuteCopyLocally(); /* set up the DestReceiver function pointers */ copyDest->pub.receiveSlot = CitusCopyDestReceiverReceive; @@ -2040,7 +2037,8 @@ ShouldExecuteCopyLocally() return true; } - return IsMultiStatementTransaction(); + /* if we connected to the localhost via a connection, we might not be able to see some previous changes that are done via the connection */ + return !TransactionConnectedToLocalGroup && IsMultiStatementTransaction(); } diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index f5b3d2cce..6663acc49 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -581,16 +581,13 @@ ExecutePlanIntoColocatedIntermediateResults(Oid targetRelationId, int partitionColumnIndex = PartitionColumnIndexFromColumnList(targetRelationId, columnNameList); - bool hasCopyDataLocally = true; - /* set up a DestReceiver that copies into the intermediate table */ CitusCopyDestReceiver *copyDest = CreateCitusCopyDestReceiver(targetRelationId, columnNameList, partitionColumnIndex, executorState, stopOnFailure, - intermediateResultIdPrefix, - hasCopyDataLocally); + intermediateResultIdPrefix); ExecutePlanIntoDestReceiver(selectPlan, paramListInfo, (DestReceiver *) copyDest); @@ -626,15 +623,12 @@ ExecutePlanIntoRelation(Oid targetRelationId, List *insertTargetList, int partitionColumnIndex = PartitionColumnIndexFromColumnList(targetRelationId, columnNameList); - bool hasCopyDataLocally = true; - /* set up a DestReceiver that copies into the distributed table */ CitusCopyDestReceiver *copyDest = CreateCitusCopyDestReceiver(targetRelationId, columnNameList, partitionColumnIndex, executorState, - stopOnFailure, NULL, - hasCopyDataLocally); + stopOnFailure, NULL); ExecutePlanIntoDestReceiver(selectPlan, paramListInfo, (DestReceiver *) copyDest); diff --git a/src/include/distributed/commands/multi_copy.h b/src/include/distributed/commands/multi_copy.h index b887cc5ad..9ec8f280b 100644 --- a/src/include/distributed/commands/multi_copy.h +++ b/src/include/distributed/commands/multi_copy.h @@ -144,8 +144,7 @@ extern CitusCopyDestReceiver * CreateCitusCopyDestReceiver(Oid relationId, int partitionColumnIndex, EState *executorState, bool stopOnFailure, - char *intermediateResultPrefix, - bool hasCopyDataLocally); + char *intermediateResultPrefix); extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat); extern bool CanUseBinaryCopyFormat(TupleDesc tupleDescription); extern bool CanUseBinaryCopyFormatForTargetList(List *targetEntryList); diff --git a/src/test/regress/expected/local_shard_copy.out b/src/test/regress/expected/local_shard_copy.out index b7521f9da..fbf9dc43b 100644 --- a/src/test/regress/expected/local_shard_copy.out +++ b/src/test/regress/expected/local_shard_copy.out @@ -242,26 +242,25 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar ROLLBACK; BEGIN; - -- the local placements should be executed locally - COPY distributed_table FROM STDIN WITH delimiter ','; -NOTICE: executing the copy locally for shard -CONTEXT: COPY distributed_table, line 1: "1, 100" -- run select with local execution SELECT age FROM distributed_table WHERE key = 1; NOTICE: executing the command locally: SELECT age FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) age --------------------------------------------------------------------- - 100 -(1 row) +(0 rows) SELECT count(*) FROM distributed_table; NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true count --------------------------------------------------------------------- - 26 + 21 (1 row) + -- the local placements should be executed locally + COPY distributed_table FROM STDIN WITH delimiter ','; +NOTICE: executing the copy locally for shard +CONTEXT: COPY distributed_table, line 1: "1, 100" -- verify that the copy is successful. SELECT count(*) FROM distributed_table; NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true @@ -307,8 +306,18 @@ ROLLBACK; TRUNCATE distributed_table; -- different delimiters BEGIN; +-- run select with local execution +SELECT count(*) FROM distributed_table WHERE key = 1; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) + count +--------------------------------------------------------------------- + 0 +(1 row) + -- initial size SELECT count(*) FROM distributed_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true count --------------------------------------------------------------------- 0 @@ -328,8 +337,18 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar ROLLBACK; BEGIN; +-- run select with local execution +SELECT count(*) FROM distributed_table WHERE key = 1; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) + count +--------------------------------------------------------------------- + 0 +(1 row) + -- initial size SELECT count(*) FROM distributed_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true count --------------------------------------------------------------------- 0 diff --git a/src/test/regress/expected/local_shard_execution.out b/src/test/regress/expected/local_shard_execution.out index 8d3b35ca6..e2ed1dedb 100644 --- a/src/test/regress/expected/local_shard_execution.out +++ b/src/test/regress/expected/local_shard_execution.out @@ -618,7 +618,7 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar 0 (1 row) - INSERT INTO distributed_table (key) SELECT i FROM generate_series(1,1)i; + INSERT INTO distributed_table (key) SELECT i FROM generate_series(1,1) i; ROLLBACK; -- a local query is followed by a command that cannot be executed locally BEGIN; diff --git a/src/test/regress/sql/local_shard_copy.sql b/src/test/regress/sql/local_shard_copy.sql index e9b548a94..3b2dc902c 100644 --- a/src/test/regress/sql/local_shard_copy.sql +++ b/src/test/regress/sql/local_shard_copy.sql @@ -124,6 +124,10 @@ BEGIN; ROLLBACK; BEGIN; + -- run select with local execution + SELECT age FROM distributed_table WHERE key = 1; + + SELECT count(*) FROM distributed_table; -- the local placements should be executed locally COPY distributed_table FROM STDIN WITH delimiter ','; 1, 100 @@ -132,10 +136,7 @@ BEGIN; 4, 400 5, 500 \. - -- run select with local execution - SELECT age FROM distributed_table WHERE key = 1; - SELECT count(*) FROM distributed_table; -- verify that the copy is successful. SELECT count(*) FROM distributed_table; @@ -193,6 +194,8 @@ TRUNCATE distributed_table; -- different delimiters BEGIN; +-- run select with local execution +SELECT count(*) FROM distributed_table WHERE key = 1; -- initial size SELECT count(*) FROM distributed_table; COPY distributed_table FROM STDIN WITH delimiter '|'; @@ -205,6 +208,8 @@ SELECT count(*) FROM distributed_table; ROLLBACK; BEGIN; +-- run select with local execution +SELECT count(*) FROM distributed_table WHERE key = 1; -- initial size SELECT count(*) FROM distributed_table; COPY distributed_table FROM STDIN WITH delimiter '[';