not use local copy if current transaction is connected to local group

If current transaction is connected to local group we should not use
local copy, because we might not see some of the changes that are made
over the connection to the local group.
pull/3557/head
SaitTalhaNisanci 2020-03-10 10:34:23 +03:00
parent 39bbec0f30
commit 1df9601e13
7 changed files with 44 additions and 32 deletions

View File

@ -1258,15 +1258,12 @@ CopyLocalDataIntoShards(Oid distributedRelationId)
ExprContext *econtext = GetPerTupleExprContext(estate);
econtext->ecxt_scantuple = slot;
/* here we already have the data locally */
bool hasCopyDataLocally = true;
copyDest =
(DestReceiver *) CreateCitusCopyDestReceiver(distributedRelationId,
columnNameList,
partitionColumnIndex,
estate, stopOnFailure,
NULL,
hasCopyDataLocally);
NULL);
/* initialise state for writing to shards, we'll open connections on demand */
copyDest->rStartup(copyDest, 0, tupleDescriptor);

View File

@ -431,12 +431,9 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
stopOnFailure = true;
}
bool hasCopyDataLocally = false;
/* set up the destination for the COPY */
copyDest = CreateCitusCopyDestReceiver(tableId, columnNameList, partitionColumnIndex,
executorState, stopOnFailure, NULL,
hasCopyDataLocally);
executorState, stopOnFailure, NULL);
dest = (DestReceiver *) copyDest;
dest->rStartup(dest, 0, tupleDescriptor);
@ -1979,12 +1976,12 @@ CopyFlushOutput(CopyOutState cstate, char *start, char *pointer)
CitusCopyDestReceiver *
CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColumnIndex,
EState *executorState, bool stopOnFailure,
char *intermediateResultIdPrefix, bool hasCopyDataLocally)
char *intermediateResultIdPrefix)
{
CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) palloc0(
sizeof(CitusCopyDestReceiver));
copyDest->shouldUseLocalCopy = !hasCopyDataLocally && ShouldExecuteCopyLocally();
copyDest->shouldUseLocalCopy = ShouldExecuteCopyLocally();
/* set up the DestReceiver function pointers */
copyDest->pub.receiveSlot = CitusCopyDestReceiverReceive;
@ -2040,7 +2037,8 @@ ShouldExecuteCopyLocally()
return true;
}
return IsMultiStatementTransaction();
/* if we connected to the localhost via a connection, we might not be able to see some previous changes that are done via the connection */
return !TransactionConnectedToLocalGroup && IsMultiStatementTransaction();
}

View File

@ -581,16 +581,13 @@ ExecutePlanIntoColocatedIntermediateResults(Oid targetRelationId,
int partitionColumnIndex = PartitionColumnIndexFromColumnList(targetRelationId,
columnNameList);
bool hasCopyDataLocally = true;
/* set up a DestReceiver that copies into the intermediate table */
CitusCopyDestReceiver *copyDest = CreateCitusCopyDestReceiver(targetRelationId,
columnNameList,
partitionColumnIndex,
executorState,
stopOnFailure,
intermediateResultIdPrefix,
hasCopyDataLocally);
intermediateResultIdPrefix);
ExecutePlanIntoDestReceiver(selectPlan, paramListInfo, (DestReceiver *) copyDest);
@ -626,15 +623,12 @@ ExecutePlanIntoRelation(Oid targetRelationId, List *insertTargetList,
int partitionColumnIndex = PartitionColumnIndexFromColumnList(targetRelationId,
columnNameList);
bool hasCopyDataLocally = true;
/* set up a DestReceiver that copies into the distributed table */
CitusCopyDestReceiver *copyDest = CreateCitusCopyDestReceiver(targetRelationId,
columnNameList,
partitionColumnIndex,
executorState,
stopOnFailure, NULL,
hasCopyDataLocally);
stopOnFailure, NULL);
ExecutePlanIntoDestReceiver(selectPlan, paramListInfo, (DestReceiver *) copyDest);

View File

@ -144,8 +144,7 @@ extern CitusCopyDestReceiver * CreateCitusCopyDestReceiver(Oid relationId,
int partitionColumnIndex,
EState *executorState,
bool stopOnFailure,
char *intermediateResultPrefix,
bool hasCopyDataLocally);
char *intermediateResultPrefix);
extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat);
extern bool CanUseBinaryCopyFormat(TupleDesc tupleDescription);
extern bool CanUseBinaryCopyFormatForTargetList(List *targetEntryList);

View File

@ -242,26 +242,25 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar
ROLLBACK;
BEGIN;
-- the local placements should be executed locally
COPY distributed_table FROM STDIN WITH delimiter ',';
NOTICE: executing the copy locally for shard
CONTEXT: COPY distributed_table, line 1: "1, 100"
-- run select with local execution
SELECT age FROM distributed_table WHERE key = 1;
NOTICE: executing the command locally: SELECT age FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1)
age
---------------------------------------------------------------------
100
(1 row)
(0 rows)
SELECT count(*) FROM distributed_table;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true
count
---------------------------------------------------------------------
26
21
(1 row)
-- the local placements should be executed locally
COPY distributed_table FROM STDIN WITH delimiter ',';
NOTICE: executing the copy locally for shard
CONTEXT: COPY distributed_table, line 1: "1, 100"
-- verify that the copy is successful.
SELECT count(*) FROM distributed_table;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true
@ -307,8 +306,18 @@ ROLLBACK;
TRUNCATE distributed_table;
-- different delimiters
BEGIN;
-- run select with local execution
SELECT count(*) FROM distributed_table WHERE key = 1;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1)
count
---------------------------------------------------------------------
0
(1 row)
-- initial size
SELECT count(*) FROM distributed_table;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true
count
---------------------------------------------------------------------
0
@ -328,8 +337,18 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar
ROLLBACK;
BEGIN;
-- run select with local execution
SELECT count(*) FROM distributed_table WHERE key = 1;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1)
count
---------------------------------------------------------------------
0
(1 row)
-- initial size
SELECT count(*) FROM distributed_table;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true
count
---------------------------------------------------------------------
0

View File

@ -618,7 +618,7 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar
0
(1 row)
INSERT INTO distributed_table (key) SELECT i FROM generate_series(1,1)i;
INSERT INTO distributed_table (key) SELECT i FROM generate_series(1,1) i;
ROLLBACK;
-- a local query is followed by a command that cannot be executed locally
BEGIN;

View File

@ -124,6 +124,10 @@ BEGIN;
ROLLBACK;
BEGIN;
-- run select with local execution
SELECT age FROM distributed_table WHERE key = 1;
SELECT count(*) FROM distributed_table;
-- the local placements should be executed locally
COPY distributed_table FROM STDIN WITH delimiter ',';
1, 100
@ -132,10 +136,7 @@ BEGIN;
4, 400
5, 500
\.
-- run select with local execution
SELECT age FROM distributed_table WHERE key = 1;
SELECT count(*) FROM distributed_table;
-- verify that the copy is successful.
SELECT count(*) FROM distributed_table;
@ -193,6 +194,8 @@ TRUNCATE distributed_table;
-- different delimiters
BEGIN;
-- run select with local execution
SELECT count(*) FROM distributed_table WHERE key = 1;
-- initial size
SELECT count(*) FROM distributed_table;
COPY distributed_table FROM STDIN WITH delimiter '|';
@ -205,6 +208,8 @@ SELECT count(*) FROM distributed_table;
ROLLBACK;
BEGIN;
-- run select with local execution
SELECT count(*) FROM distributed_table WHERE key = 1;
-- initial size
SELECT count(*) FROM distributed_table;
COPY distributed_table FROM STDIN WITH delimiter '[';