enable local execution in INSERT..SELECT and add more tests

INSERT..SELECT can now use the local COPY path, so the check that
disabled local execution is removed.

A test is also added for local copy where the data size exceeds
LOCAL_COPY_FLUSH_THRESHOLD.
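The new test exercises the flush path of local copy: tuples are serialized into a StringInfo buffer, and the buffer is flushed into the shard once it grows beyond LOCAL_COPY_FLUSH_THRESHOLD. A minimal sketch of that check, assuming the 512KB threshold cited in the test comments (the shipped ShouldSendCopyNow may differ in detail):

#include "postgres.h"
#include "lib/stringinfo.h"

/* assumed value; the test comments cite 512KB */
#define LOCAL_COPY_FLUSH_THRESHOLD (512 * 1024)

/*
 * ShouldSendCopyNow returns true once the accumulated local copy buffer
 * is large enough to be flushed into the shard table.
 */
static bool
ShouldSendCopyNow(StringInfo buffer)
{
	return buffer->len > LOCAL_COPY_FLUSH_THRESHOLD;
}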

use local execution with insert..select
pull/3557/head
SaitTalhaNisanci 2020-03-12 14:34:38 +03:00
parent 42cfc4c0e9
commit 9d2f3c392a
16 changed files with 116 additions and 83 deletions

View File

@@ -35,7 +35,7 @@
#include "distributed/local_multi_copy.h"
#include "distributed/shard_utils.h"
static int ReadFromLocalBufferCallback(void *outbuf, int minread, int maxread);
static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead);
static void AddSlotToBuffer(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest,
CopyOutState localCopyOutState);
@@ -60,7 +60,10 @@ WriteTupleToLocalShard(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, in
shardId,
CopyOutState localCopyOutState)
{
/* since we are doing a local copy, the following statements should use local execution to see the changes */
/*
* Since we are doing a local copy, the following statements should
* use local execution to see the changes
*/
TransactionAccessedLocalPlacement = true;
bool isBinaryCopy = localCopyOutState->binary;
@@ -76,8 +79,8 @@ WriteTupleToLocalShard(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, in
if (isBinaryCopy)
{
/*
* We're going to flush the buffer to disk by effectively doing a full COPY command.
* Hence we also need to add footers to the current buffer.
* We're going to flush the buffer to disk by effectively doing a full
* COPY command. Hence we also need to add footers to the current buffer.
*/
AppendCopyBinaryFooters(localCopyOutState);
}
@@ -108,7 +111,8 @@ FinishLocalCopyToShard(CitusCopyDestReceiver *copyDest, int64 shardId,
/*
* AddSlotToBuffer serializes the given slot and adds it to the buffer in localCopyOutState.
* AddSlotToBuffer serializes the given slot and adds it to
* the buffer in localCopyOutState.
*/
static void
AddSlotToBuffer(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, CopyOutState
@@ -139,16 +143,18 @@ ShouldSendCopyNow(StringInfo buffer)
/*
* DoLocalCopy finds the shard table from the distributed relation id, and copies the given
* buffer into the shard.
* CopyFrom calls ReadFromLocalBufferCallback to read bytes from the buffer as though
* it was reading from stdin. It then parses the tuples and writes them to the shardOid table.
* CopyFrom calls ReadFromLocalBufferCallback to read bytes from the buffer
* as though it was reading from stdin. It then parses the tuples and
* writes them to the shardOid table.
*/
static void
DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId, CopyStmt *copyStatement,
bool isEndOfCopy)
{
/*
* Set the buffer as a global variable to allow ReadFromLocalBufferCallback to read from it.
* We cannot pass additional arguments to ReadFromLocalBufferCallback.
* Set the buffer as a global variable to allow ReadFromLocalBufferCallback
* to read from it. We cannot pass additional arguments to
* ReadFromLocalBufferCallback.
*/
LocalCopyBuffer = buffer;
@@ -167,11 +173,7 @@ DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId, CopyStmt *copyStat
heap_close(shard, NoLock);
free_parsestate(pState);
FreeStringInfo(buffer);
if (!isEndOfCopy)
{
buffer = makeStringInfo();
}
resetStringInfo(buffer);
}
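As context for the comment above: DoLocalCopy stashes the buffer in the LocalCopyBuffer global and then drives an ordinary COPY, with ReadFromLocalBufferCallback acting as the data source so that CopyFrom consumes the buffer as if it were reading from stdin. A rough sketch of that wiring, assuming the PostgreSQL 12 COPY API (BeginCopyFrom/CopyFrom/EndCopyFrom), with shard OID resolution and includes elided; this is not the exact Citus implementation:

static void
LocalShardCopySketch(StringInfo buffer, Oid shardOid, CopyStmt *copyStatement)
{
	/* global consumed by ReadFromLocalBufferCallback */
	LocalCopyBuffer = buffer;

	Relation shard = heap_open(shardOid, RowExclusiveLock);
	ParseState *pState = make_parsestate(NULL);

	/* NULL filename plus a callback: CopyFrom reads from our buffer instead of a file */
	CopyState cstate = BeginCopyFrom(pState, shard, NULL, false,
									 ReadFromLocalBufferCallback,
									 copyStatement->attlist,
									 copyStatement->options);
	CopyFrom(cstate);   /* parses the tuples and writes them into the shard */
	EndCopyFrom(cstate);

	heap_close(shard, NoLock);
	free_parsestate(pState);
}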
@@ -192,21 +194,21 @@ ShouldAddBinaryHeaders(StringInfo buffer, bool isBinary)
/*
* ReadFromLocalBufferCallback is the copy callback.
* It always tries to copy maxread bytes.
* It always tries to copy maxRead bytes.
*/
static int
ReadFromLocalBufferCallback(void *outbuf, int minread, int maxread)
ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead)
{
int bytesread = 0;
int bytesRead = 0;
int avail = LocalCopyBuffer->len - LocalCopyBuffer->cursor;
int bytesToRead = Min(avail, maxread);
int bytesToRead = Min(avail, maxRead);
if (bytesToRead > 0)
{
memcpy_s(outbuf, bytesToRead + strlen((char *) outbuf),
memcpy_s(outBuf, bytesToRead,
&LocalCopyBuffer->data[LocalCopyBuffer->cursor], bytesToRead);
}
bytesread += bytesToRead;
bytesRead += bytesToRead;
LocalCopyBuffer->cursor += bytesToRead;
return bytesread;
return bytesRead;
}

View File

@@ -553,6 +553,10 @@ CacheLocalPlanForShardQuery(Task *task, DistributedPlan *originalDistributedPlan
PlannedStmt *
GetCachedLocalPlan(Task *task, DistributedPlan *distributedPlan)
{
if (distributedPlan->workerJob == NULL)
{
return NULL;
}
List *cachedPlanList = distributedPlan->workerJob->localPlannedStatements;
LocalPlannedStatement *localPlannedStatement = NULL;

View File

@@ -26,6 +26,7 @@
#include "distributed/listutils.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_router_planner.h"
#include "distributed/local_executor.h"
#include "distributed/distributed_planner.h"
#include "distributed/recursive_planning.h"
#include "distributed/relation_access_tracking.h"
@@ -135,15 +136,6 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node)
bool hasReturning = distributedPlan->hasReturning;
HTAB *shardStateHash = NULL;
/*
* INSERT .. SELECT via coordinator consists of two steps, a SELECT is
* followd by a COPY. If the SELECT is executed locally, then the COPY
* would fail since Citus currently doesn't know how to handle COPY
* locally. So, to prevent the command fail, we simply disable local
* execution.
*/
DisableLocalExecution();
/* select query to execute */
Query *selectQuery = BuildSelectForInsertSelect(insertSelectQuery);
@@ -198,7 +190,6 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node)
GetDistributedPlan((CustomScan *) selectPlan->planTree);
Job *distSelectJob = distSelectPlan->workerJob;
List *distSelectTaskList = distSelectJob->taskList;
TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState);
bool randomAccess = true;
bool interTransactions = false;
bool binaryFormat =
@@ -280,11 +271,10 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node)
scanState->tuplestorestate =
tuplestore_begin_heap(randomAccess, interTransactions, work_mem);
uint64 rowsInserted = ExecuteTaskListIntoTupleStore(ROW_MODIFY_COMMUTATIVE,
taskList,
tupleDescriptor,
scanState->tuplestorestate,
hasReturning);
uint64 rowsInserted = ExtractAndExecuteLocalAndRemoteTasks(scanState,
taskList,
ROW_MODIFY_COMMUTATIVE,
hasReturning);
executorState->es_processed = rowsInserted;
}
@@ -335,17 +325,15 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node)
if (prunedTaskList != NIL)
{
TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState);
bool randomAccess = true;
bool interTransactions = false;
Assert(scanState->tuplestorestate == NULL);
scanState->tuplestorestate =
tuplestore_begin_heap(randomAccess, interTransactions, work_mem);
ExecuteTaskListIntoTupleStore(ROW_MODIFY_COMMUTATIVE, prunedTaskList,
tupleDescriptor, scanState->tuplestorestate,
hasReturning);
ExtractAndExecuteLocalAndRemoteTasks(scanState, prunedTaskList,
ROW_MODIFY_COMMUTATIVE,
hasReturning);
if (SortReturning && hasReturning)
{

View File

@@ -231,6 +231,48 @@ ExecuteLocalTaskList(CitusScanState *scanState, List *taskList)
}
/*
* ExtractAndExecuteLocalAndRemoteTasks extracts local and remote tasks
* if local execution can be used and executes them.
*/
uint64
ExtractAndExecuteLocalAndRemoteTasks(CitusScanState *scanState,
List *taskList, RowModifyLevel rowModifyLevel, bool
hasReturning)
{
uint64 processedRows = 0;
List *localTaskList = NIL;
List *remoteTaskList = NIL;
TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState);
if (ShouldExecuteTasksLocally(taskList))
{
bool readOnlyPlan = false;
/* set local (if any) & remote tasks */
ExtractLocalAndRemoteTasks(readOnlyPlan, taskList, &localTaskList,
&remoteTaskList);
processedRows += ExecuteLocalTaskList(scanState, localTaskList);
}
else
{
/* all tasks should be executed via remote connections */
remoteTaskList = taskList;
}
/* execute remote tasks if any */
if (list_length(remoteTaskList) > 0)
{
processedRows += ExecuteTaskListIntoTupleStore(rowModifyLevel, remoteTaskList,
tupleDescriptor,
scanState->tuplestorestate,
hasReturning);
}
return processedRows;
}
/*
* ExtractParametersForLocalExecution extracts parameter types and values
* from the given ParamListInfo structure, and fills parameter type and

View File

@@ -21,6 +21,9 @@ extern bool TransactionAccessedLocalPlacement;
extern bool TransactionConnectedToLocalGroup;
/* extern function declarations */
extern uint64 ExtractAndExecuteLocalAndRemoteTasks(CitusScanState *scanState,
List *taskList, RowModifyLevel
rowModifyLevel, bool hasReturning);
extern uint64 ExecuteLocalTaskList(CitusScanState *scanState, List *taskList);
extern void ExecuteLocalUtilityTaskList(List *localTaskList);
extern void ExtractLocalAndRemoteTasks(bool readOnlyPlan, List *taskList,

View File

@@ -37,6 +37,8 @@ SET client_min_messages TO LOG;
SET citus.log_local_commands TO ON;
-- INSERT..SELECT with COPY under the covers
INSERT INTO test SELECT s,s FROM generate_series(2,100) s;
NOTICE: executing the copy locally for shard xxxxx
NOTICE: executing the copy locally for shard xxxxx
-- router queries execute locally
INSERT INTO test VALUES (1, 1);
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.test_1503000 (x, y) VALUES (1, 1)

View File

@@ -339,6 +339,13 @@ DETAIL: Key (key)=(1) already exists.
CONTEXT: COPY distributed_table_1570001, line 1
ROLLBACK;
TRUNCATE distributed_table;
BEGIN;
-- insert a lot of data ( around 8MB),
-- this should use local copy and it will exceed the LOCAL_COPY_FLUSH_THRESHOLD (512KB)
INSERT INTO distributed_table SELECT * , * FROM generate_series(20, 1000000);
NOTICE: executing the copy locally for shard xxxxx
NOTICE: executing the copy locally for shard xxxxx
ROLLBACK;
COPY distributed_table FROM STDIN WITH delimiter ',';
ERROR: new row for relation "distributed_table_1570001" violates check constraint "distributed_table_age_check"
DETAIL: Failing row contains (1, 9).
@@ -461,6 +468,7 @@ ROLLBACK;
SET search_path TO local_shard_copy;
SET citus.log_local_commands TO ON;
TRUNCATE TABLE reference_table;
NOTICE: executing the command locally: TRUNCATE TABLE local_shard_copy.reference_table_xxxxx CASCADE
TRUNCATE TABLE local_table;
SELECT count(*) FROM reference_table, local_table WHERE reference_table.key = local_table.key;
count

View File

@@ -276,6 +276,8 @@ RETURNING *;
-- that's why it is disallowed to use local execution even if the SELECT
-- can be executed locally
INSERT INTO distributed_table SELECT * FROM distributed_table WHERE key = 1 OFFSET 0 ON CONFLICT DO NOTHING;
NOTICE: executing the command locally: SELECT key, value, age FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) OFFSET 0
NOTICE: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) SELECT key, value, age FROM read_intermediate_result('insert_select_XXX_1470001'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, age bigint) ON CONFLICT DO NOTHING
INSERT INTO distributed_table SELECT 1, '1',15 FROM distributed_table WHERE key = 2 LIMIT 1 ON CONFLICT DO NOTHING;
-- sanity check: multi-shard INSERT..SELECT pushdown goes through distributed execution
INSERT INTO distributed_table SELECT * FROM distributed_table ON CONFLICT DO NOTHING;
@@ -507,28 +509,7 @@ NOTICE: truncate cascades to table "second_distributed_table"
ROLLBACK;
-- load some data so that foreign keys won't complain with the next tests
INSERT INTO reference_table SELECT i FROM generate_series(500, 600) i;
-- a very similar set of operation, but this time use
-- COPY as the first command
BEGIN;
INSERT INTO distributed_table SELECT i, i::text, i % 10 + 25 FROM generate_series(500, 600) i;
-- this could go through local execution, but doesn't because we've already
-- done distributed execution
SELECT * FROM distributed_table WHERE key = 500 ORDER BY 1,2,3;
key | value | age
---------------------------------------------------------------------
500 | 500 | 25
(1 row)
-- utility commands could still use distributed execution
TRUNCATE distributed_table CASCADE;
NOTICE: truncate cascades to table "second_distributed_table"
-- ensure that TRUNCATE made it
SELECT * FROM distributed_table WHERE key = 500 ORDER BY 1,2,3;
key | value | age
---------------------------------------------------------------------
(0 rows)
ROLLBACK;
NOTICE: executing the copy locally for shard xxxxx
-- show that cascading foreign keys just works fine with local execution
BEGIN;
INSERT INTO reference_table VALUES (701);
@@ -619,6 +600,7 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar
(1 row)
INSERT INTO distributed_table (key) SELECT i FROM generate_series(1,1) i;
NOTICE: executing the copy locally for shard xxxxx
ROLLBACK;
-- a local query is followed by a command that cannot be executed locally
BEGIN;
@@ -1331,7 +1313,10 @@ NOTICE: truncate cascades to table "second_distributed_table_xxxxx"
NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.second_distributed_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.second_distributed_table_xxxxx CASCADE
INSERT INTO reference_table SELECT i FROM generate_series(500, 600) i;
NOTICE: executing the copy locally for shard xxxxx
INSERT INTO distributed_table SELECT i, i::text, i % 10 + 25 FROM generate_series(500, 600) i;
NOTICE: executing the copy locally for shard xxxxx
NOTICE: executing the copy locally for shard xxxxx
-- show that both local, and mixed local-distributed executions
-- calculate rows processed correctly
BEGIN;

View File

@@ -951,6 +951,7 @@ NOTICE: executing the command locally: DELETE FROM master_evaluation_combinatio
(1 row)
INSERT INTO user_info_data SELECT 3, ('test', get_local_node_id_stable() > 0)::user_data, i FROM generate_series(0,7)i;
NOTICE: executing the copy locally for shard xxxxx
PREPARE fast_path_router_with_param_and_func_on_non_dist_key(int) AS
DELETE FROM user_info_data WHERE user_id = 3 AND user_index = $1 AND u_data = ('test', (get_local_node_id_stable() > 0)::int)::user_data RETURNING user_id, user_index;
EXECUTE fast_path_router_with_param_and_func_on_non_dist_key(0);
@@ -1438,6 +1439,7 @@ NOTICE: executing the command locally: DELETE FROM master_evaluation_combinatio
(1 row)
INSERT INTO user_info_data SELECT 3, ('test', get_local_node_id_stable() > 0)::user_data, i FROM generate_series(0,7)i;
NOTICE: executing the copy locally for shard xxxxx
PREPARE router_with_param_and_func_on_non_dist_key(int) AS
DELETE FROM user_info_data WHERE user_id = 3 AND user_id = 3 AND user_index = $1 AND u_data = ('test', (get_local_node_id_stable() > 0)::int)::user_data RETURNING user_id, user_index;
EXECUTE router_with_param_and_func_on_non_dist_key(0);

View File

@@ -102,9 +102,9 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM multi_mx_i
(1 row)
insert into target_table SELECT a FROM source_table LIMIT 10;
ERROR: cannot execute command because a local execution has accessed a placement in the transaction
DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally
HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;"
NOTICE: executing the command locally: SELECT a FROM multi_mx_insert_select_repartition.source_table_4213581 source_table WHERE true LIMIT '10'::bigint
NOTICE: executing the command locally: SELECT a FROM multi_mx_insert_select_repartition.source_table_4213583 source_table WHERE true LIMIT '10'::bigint
NOTICE: executing the copy locally for shard xxxxx
ROLLBACK;
\c - - - :master_port
SET search_path TO multi_mx_insert_select_repartition;

View File

@@ -142,7 +142,7 @@ INSERT INTO test_recovery (x) SELECT 'hello-'||s FROM generate_series(1,100) s;
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
4
2
(1 row)
SELECT recover_prepared_transactions();

View File

@@ -118,6 +118,8 @@ INSERT INTO "refer'ence_table" SELECT i FROM generate_series(0, 100) i;
SET search_path TO 'truncate_from_workers';
-- make sure that DMLs-SELECTs works along with TRUNCATE worker fine
BEGIN;
-- we can enable local execution when truncate can be executed locally.
SET citus.enable_local_execution = 'off';
INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
SELECT count(*) FROM on_update_fkey_table;
count

View File

@@ -17,6 +17,7 @@ SELECT create_reference_table('squares');
(1 row)
INSERT INTO squares SELECT i, i * i FROM generate_series(1, 10) i;
NOTICE: executing the copy locally for shard xxxxx
-- should be executed locally
SELECT count(*) FROM squares;
NOTICE: executing the command locally: SELECT count(*) AS count FROM replicate_ref_to_coordinator.squares_8000000 squares

View File

@@ -212,6 +212,14 @@ ROLLBACK;
TRUNCATE distributed_table;
BEGIN;
-- insert a lot of data ( around 8MB),
-- this should use local copy and it will exceed the LOCAL_COPY_FLUSH_THRESHOLD (512KB)
INSERT INTO distributed_table SELECT * , * FROM generate_series(20, 1000000);
ROLLBACK;
COPY distributed_table FROM STDIN WITH delimiter ',';
1, 9
\.

View File

@@ -305,22 +305,6 @@ ROLLBACK;
-- load some data so that foreign keys won't complain with the next tests
INSERT INTO reference_table SELECT i FROM generate_series(500, 600) i;
-- a very similar set of operation, but this time use
-- COPY as the first command
BEGIN;
INSERT INTO distributed_table SELECT i, i::text, i % 10 + 25 FROM generate_series(500, 600) i;
-- this could go through local execution, but doesn't because we've already
-- done distributed execution
SELECT * FROM distributed_table WHERE key = 500 ORDER BY 1,2,3;
-- utility commands could still use distributed execution
TRUNCATE distributed_table CASCADE;
-- ensure that TRUNCATE made it
SELECT * FROM distributed_table WHERE key = 500 ORDER BY 1,2,3;
ROLLBACK;
-- show that cascading foreign keys just works fine with local execution
BEGIN;
INSERT INTO reference_table VALUES (701);

View File

@@ -85,6 +85,8 @@ SET search_path TO 'truncate_from_workers';
-- make sure that DMLs-SELECTs works along with TRUNCATE worker fine
BEGIN;
-- we can enable local execution when truncate can be executed locally.
SET citus.enable_local_execution = 'off';
INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
SELECT count(*) FROM on_update_fkey_table;
TRUNCATE on_update_fkey_table;