From 9d2f3c392a56e56ac04eff2575bef460503b1164 Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Thu, 12 Mar 2020 14:34:38 +0300 Subject: [PATCH] enable local execution in INSERT..SELECT and add more tests We can use local copy in INSERT..SELECT, so the check that disables local execution is removed. Also a test for local copy where the data size > LOCAL_COPY_FLUSH_THRESHOLD is added. use local execution with insert..select --- .../distributed/commands/local_multi_copy.c | 44 ++++++++++--------- .../distributed/executor/citus_custom_scan.c | 4 ++ .../executor/insert_select_executor.c | 28 ++++-------- .../distributed/executor/local_executor.c | 42 ++++++++++++++++++ src/include/distributed/local_executor.h | 3 ++ .../expected/coordinator_shouldhaveshards.out | 2 + .../regress/expected/local_shard_copy.out | 8 ++++ .../expected/local_shard_execution.out | 29 +++--------- .../expected/master_evaluation_modify.out | 2 + .../multi_mx_insert_select_repartition.out | 6 +-- .../multi_mx_transaction_recovery.out | 2 +- .../multi_mx_truncate_from_worker.out | 2 + ...licate_reference_tables_to_coordinator.out | 1 + src/test/regress/sql/local_shard_copy.sql | 8 ++++ .../regress/sql/local_shard_execution.sql | 16 ------- .../sql/multi_mx_truncate_from_worker.sql | 2 + 16 files changed, 116 insertions(+), 83 deletions(-) diff --git a/src/backend/distributed/commands/local_multi_copy.c b/src/backend/distributed/commands/local_multi_copy.c index 24b2440f7..efa8aa954 100644 --- a/src/backend/distributed/commands/local_multi_copy.c +++ b/src/backend/distributed/commands/local_multi_copy.c @@ -35,7 +35,7 @@ #include "distributed/local_multi_copy.h" #include "distributed/shard_utils.h" -static int ReadFromLocalBufferCallback(void *outbuf, int minread, int maxread); +static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead); static void AddSlotToBuffer(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, CopyOutState localCopyOutState); @@ -60,7 +60,10 @@ WriteTupleToLocalShard(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, in shardId, CopyOutState localCopyOutState) { - /* since we are doing a local copy, the following statements should use local execution to see the changes */ + /* + * Since we are doing a local copy, the following statements should + * use local execution to see the changes + */ TransactionAccessedLocalPlacement = true; bool isBinaryCopy = localCopyOutState->binary; @@ -76,8 +79,8 @@ WriteTupleToLocalShard(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, in if (isBinaryCopy) { /* - * We're going to flush the buffer to disk by effectively doing a full COPY command. - * Hence we also need to add footers to the current buffer. + * We're going to flush the buffer to disk by effectively doing a full + * COPY command. Hence we also need to add footers to the current buffer. */ AppendCopyBinaryFooters(localCopyOutState); } @@ -108,7 +111,8 @@ FinishLocalCopyToShard(CitusCopyDestReceiver *copyDest, int64 shardId, /* - * AddSlotToBuffer serializes the given slot and adds it to the buffer in localCopyOutState. + * AddSlotToBuffer serializes the given slot and adds it to + * the buffer in localCopyOutState. */ static void AddSlotToBuffer(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, CopyOutState @@ -139,16 +143,18 @@ ShouldSendCopyNow(StringInfo buffer) /* * DoLocalCopy finds the shard table from the distributed relation id, and copies the given * buffer into the shard. - * CopyFrom calls ReadFromLocalBufferCallback to read bytes from the buffer as though - * it was reading from stdin. It then parses the tuples and writes them to the shardOid table. + * CopyFrom calls ReadFromLocalBufferCallback to read bytes from the buffer + * as though it was reading from stdin. It then parses the tuples and + * writes them to the shardOid table. */ static void DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId, CopyStmt *copyStatement, bool isEndOfCopy) { /* - * Set the buffer as a global variable to allow ReadFromLocalBufferCallback to read from it. - * We cannot pass additional arguments to ReadFromLocalBufferCallback. + * Set the buffer as a global variable to allow ReadFromLocalBufferCallback + * to read from it. We cannot pass additional arguments to + * ReadFromLocalBufferCallback. */ LocalCopyBuffer = buffer; @@ -167,11 +173,7 @@ DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId, CopyStmt *copyStat heap_close(shard, NoLock); free_parsestate(pState); - FreeStringInfo(buffer); - if (!isEndOfCopy) - { - buffer = makeStringInfo(); - } + resetStringInfo(buffer); } @@ -192,21 +194,21 @@ ShouldAddBinaryHeaders(StringInfo buffer, bool isBinary) /* * ReadFromLocalBufferCallback is the copy callback. - * It always tries to copy maxread bytes. + * It always tries to copy maxRead bytes. */ static int -ReadFromLocalBufferCallback(void *outbuf, int minread, int maxread) +ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead) { - int bytesread = 0; + int bytesRead = 0; int avail = LocalCopyBuffer->len - LocalCopyBuffer->cursor; - int bytesToRead = Min(avail, maxread); + int bytesToRead = Min(avail, maxRead); if (bytesToRead > 0) { - memcpy_s(outbuf, bytesToRead + strlen((char *) outbuf), + memcpy_s(outBuf, bytesToRead, &LocalCopyBuffer->data[LocalCopyBuffer->cursor], bytesToRead); } - bytesread += bytesToRead; + bytesRead += bytesToRead; LocalCopyBuffer->cursor += bytesToRead; - return bytesread; + return bytesRead; } diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index c0de40978..c9a02fb50 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -553,6 +553,10 @@ CacheLocalPlanForShardQuery(Task *task, DistributedPlan *originalDistributedPlan PlannedStmt * GetCachedLocalPlan(Task *task, DistributedPlan *distributedPlan) { + if (distributedPlan->workerJob == NULL) + { + return NULL; + } List *cachedPlanList = distributedPlan->workerJob->localPlannedStatements; LocalPlannedStatement *localPlannedStatement = NULL; diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index 6663acc49..4b7c7cc32 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -26,6 +26,7 @@ #include "distributed/listutils.h" #include "distributed/metadata_cache.h" #include "distributed/multi_router_planner.h" +#include "distributed/local_executor.h" #include "distributed/distributed_planner.h" #include "distributed/recursive_planning.h" #include "distributed/relation_access_tracking.h" @@ -135,15 +136,6 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) bool hasReturning = distributedPlan->hasReturning; HTAB *shardStateHash = NULL; - /* - * INSERT .. SELECT via coordinator consists of two steps, a SELECT is - * followd by a COPY. If the SELECT is executed locally, then the COPY - * would fail since Citus currently doesn't know how to handle COPY - * locally. So, to prevent the command fail, we simply disable local - * execution. - */ - DisableLocalExecution(); - /* select query to execute */ Query *selectQuery = BuildSelectForInsertSelect(insertSelectQuery); @@ -198,7 +190,6 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) GetDistributedPlan((CustomScan *) selectPlan->planTree); Job *distSelectJob = distSelectPlan->workerJob; List *distSelectTaskList = distSelectJob->taskList; - TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState); bool randomAccess = true; bool interTransactions = false; bool binaryFormat = @@ -280,11 +271,10 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) scanState->tuplestorestate = tuplestore_begin_heap(randomAccess, interTransactions, work_mem); - uint64 rowsInserted = ExecuteTaskListIntoTupleStore(ROW_MODIFY_COMMUTATIVE, - taskList, - tupleDescriptor, - scanState->tuplestorestate, - hasReturning); + uint64 rowsInserted = ExtractAndExecuteLocalAndRemoteTasks(scanState, + taskList, + ROW_MODIFY_COMMUTATIVE, + hasReturning); executorState->es_processed = rowsInserted; } @@ -335,17 +325,15 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) if (prunedTaskList != NIL) { - TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState); bool randomAccess = true; bool interTransactions = false; Assert(scanState->tuplestorestate == NULL); scanState->tuplestorestate = tuplestore_begin_heap(randomAccess, interTransactions, work_mem); - - ExecuteTaskListIntoTupleStore(ROW_MODIFY_COMMUTATIVE, prunedTaskList, - tupleDescriptor, scanState->tuplestorestate, - hasReturning); + ExtractAndExecuteLocalAndRemoteTasks(scanState, prunedTaskList, + ROW_MODIFY_COMMUTATIVE, + hasReturning); if (SortReturning && hasReturning) { diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index 7003ea0b3..4930ad7af 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -231,6 +231,48 @@ ExecuteLocalTaskList(CitusScanState *scanState, List *taskList) } +/* + * ExtractAndExecuteLocalAndRemoteTasks extracts local and remote tasks + * if local execution can be used and executes them. + */ +uint64 +ExtractAndExecuteLocalAndRemoteTasks(CitusScanState *scanState, + List *taskList, RowModifyLevel rowModifyLevel, bool + hasReturning) +{ + uint64 processedRows = 0; + List *localTaskList = NIL; + List *remoteTaskList = NIL; + TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState); + + if (ShouldExecuteTasksLocally(taskList)) + { + bool readOnlyPlan = false; + + /* set local (if any) & remote tasks */ + ExtractLocalAndRemoteTasks(readOnlyPlan, taskList, &localTaskList, + &remoteTaskList); + processedRows += ExecuteLocalTaskList(scanState, localTaskList); + } + else + { + /* all tasks should be executed via remote connections */ + remoteTaskList = taskList; + } + + /* execute remote tasks if any */ + if (list_length(remoteTaskList) > 0) + { + processedRows += ExecuteTaskListIntoTupleStore(rowModifyLevel, remoteTaskList, + tupleDescriptor, + scanState->tuplestorestate, + hasReturning); + } + + return processedRows; +} + + /* * ExtractParametersForLocalExecution extracts parameter types and values * from the given ParamListInfo structure, and fills parameter type and diff --git a/src/include/distributed/local_executor.h b/src/include/distributed/local_executor.h index 8b11e096c..265434b89 100644 --- a/src/include/distributed/local_executor.h +++ b/src/include/distributed/local_executor.h @@ -21,6 +21,9 @@ extern bool TransactionAccessedLocalPlacement; extern bool TransactionConnectedToLocalGroup; /* extern function declarations */ +extern uint64 ExtractAndExecuteLocalAndRemoteTasks(CitusScanState *scanState, + List *taskList, RowModifyLevel + rowModifyLevel, bool hasReturning); extern uint64 ExecuteLocalTaskList(CitusScanState *scanState, List *taskList); extern void ExecuteLocalUtilityTaskList(List *localTaskList); extern void ExtractLocalAndRemoteTasks(bool readOnlyPlan, List *taskList, diff --git a/src/test/regress/expected/coordinator_shouldhaveshards.out b/src/test/regress/expected/coordinator_shouldhaveshards.out index 0af6f175a..023f58720 100644 --- a/src/test/regress/expected/coordinator_shouldhaveshards.out +++ b/src/test/regress/expected/coordinator_shouldhaveshards.out @@ -37,6 +37,8 @@ SET client_min_messages TO LOG; SET citus.log_local_commands TO ON; -- INSERT..SELECT with COPY under the covers INSERT INTO test SELECT s,s FROM generate_series(2,100) s; +NOTICE: executing the copy locally for shard xxxxx +NOTICE: executing the copy locally for shard xxxxx -- router queries execute locally INSERT INTO test VALUES (1, 1); NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.test_1503000 (x, y) VALUES (1, 1) diff --git a/src/test/regress/expected/local_shard_copy.out b/src/test/regress/expected/local_shard_copy.out index d42403016..375f4200a 100644 --- a/src/test/regress/expected/local_shard_copy.out +++ b/src/test/regress/expected/local_shard_copy.out @@ -339,6 +339,13 @@ DETAIL: Key (key)=(1) already exists. CONTEXT: COPY distributed_table_1570001, line 1 ROLLBACK; TRUNCATE distributed_table; +BEGIN; +-- insert a lot of data ( around 8MB), +-- this should use local copy and it will exceed the LOCAL_COPY_FLUSH_THRESHOLD (512KB) +INSERT INTO distributed_table SELECT * , * FROM generate_series(20, 1000000); +NOTICE: executing the copy locally for shard xxxxx +NOTICE: executing the copy locally for shard xxxxx +ROLLBACK; COPY distributed_table FROM STDIN WITH delimiter ','; ERROR: new row for relation "distributed_table_1570001" violates check constraint "distributed_table_age_check" DETAIL: Failing row contains (1, 9). @@ -461,6 +468,7 @@ ROLLBACK; SET search_path TO local_shard_copy; SET citus.log_local_commands TO ON; TRUNCATE TABLE reference_table; +NOTICE: executing the command locally: TRUNCATE TABLE local_shard_copy.reference_table_xxxxx CASCADE TRUNCATE TABLE local_table; SELECT count(*) FROM reference_table, local_table WHERE reference_table.key = local_table.key; count diff --git a/src/test/regress/expected/local_shard_execution.out b/src/test/regress/expected/local_shard_execution.out index e2ed1dedb..9dbf9975a 100644 --- a/src/test/regress/expected/local_shard_execution.out +++ b/src/test/regress/expected/local_shard_execution.out @@ -276,6 +276,8 @@ RETURNING *; -- that's why it is disallowed to use local execution even if the SELECT -- can be executed locally INSERT INTO distributed_table SELECT * FROM distributed_table WHERE key = 1 OFFSET 0 ON CONFLICT DO NOTHING; +NOTICE: executing the command locally: SELECT key, value, age FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) OFFSET 0 +NOTICE: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) SELECT key, value, age FROM read_intermediate_result('insert_select_XXX_1470001'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, age bigint) ON CONFLICT DO NOTHING INSERT INTO distributed_table SELECT 1, '1',15 FROM distributed_table WHERE key = 2 LIMIT 1 ON CONFLICT DO NOTHING; -- sanity check: multi-shard INSERT..SELECT pushdown goes through distributed execution INSERT INTO distributed_table SELECT * FROM distributed_table ON CONFLICT DO NOTHING; @@ -507,28 +509,7 @@ NOTICE: truncate cascades to table "second_distributed_table" ROLLBACK; -- load some data so that foreign keys won't complain with the next tests INSERT INTO reference_table SELECT i FROM generate_series(500, 600) i; --- a very similar set of operation, but this time use --- COPY as the first command -BEGIN; - INSERT INTO distributed_table SELECT i, i::text, i % 10 + 25 FROM generate_series(500, 600) i; - -- this could go through local execution, but doesn't because we've already - -- done distributed execution - SELECT * FROM distributed_table WHERE key = 500 ORDER BY 1,2,3; - key | value | age ---------------------------------------------------------------------- - 500 | 500 | 25 -(1 row) - - -- utility commands could still use distributed execution - TRUNCATE distributed_table CASCADE; -NOTICE: truncate cascades to table "second_distributed_table" - -- ensure that TRUNCATE made it - SELECT * FROM distributed_table WHERE key = 500 ORDER BY 1,2,3; - key | value | age ---------------------------------------------------------------------- -(0 rows) - -ROLLBACK; +NOTICE: executing the copy locally for shard xxxxx -- show that cascading foreign keys just works fine with local execution BEGIN; INSERT INTO reference_table VALUES (701); @@ -619,6 +600,7 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar (1 row) INSERT INTO distributed_table (key) SELECT i FROM generate_series(1,1) i; +NOTICE: executing the copy locally for shard xxxxx ROLLBACK; -- a local query is followed by a command that cannot be executed locally BEGIN; @@ -1331,7 +1313,10 @@ NOTICE: truncate cascades to table "second_distributed_table_xxxxx" NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.second_distributed_table_xxxxx CASCADE NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.second_distributed_table_xxxxx CASCADE INSERT INTO reference_table SELECT i FROM generate_series(500, 600) i; +NOTICE: executing the copy locally for shard xxxxx INSERT INTO distributed_table SELECT i, i::text, i % 10 + 25 FROM generate_series(500, 600) i; +NOTICE: executing the copy locally for shard xxxxx +NOTICE: executing the copy locally for shard xxxxx -- show that both local, and mixed local-distributed executions -- calculate rows processed correctly BEGIN; diff --git a/src/test/regress/expected/master_evaluation_modify.out b/src/test/regress/expected/master_evaluation_modify.out index a8123b94c..5571bce2e 100644 --- a/src/test/regress/expected/master_evaluation_modify.out +++ b/src/test/regress/expected/master_evaluation_modify.out @@ -951,6 +951,7 @@ NOTICE: executing the command locally: DELETE FROM master_evaluation_combinatio (1 row) INSERT INTO user_info_data SELECT 3, ('test', get_local_node_id_stable() > 0)::user_data, i FROM generate_series(0,7)i; +NOTICE: executing the copy locally for shard xxxxx PREPARE fast_path_router_with_param_and_func_on_non_dist_key(int) AS DELETE FROM user_info_data WHERE user_id = 3 AND user_index = $1 AND u_data = ('test', (get_local_node_id_stable() > 0)::int)::user_data RETURNING user_id, user_index; EXECUTE fast_path_router_with_param_and_func_on_non_dist_key(0); @@ -1438,6 +1439,7 @@ NOTICE: executing the command locally: DELETE FROM master_evaluation_combinatio (1 row) INSERT INTO user_info_data SELECT 3, ('test', get_local_node_id_stable() > 0)::user_data, i FROM generate_series(0,7)i; +NOTICE: executing the copy locally for shard xxxxx PREPARE router_with_param_and_func_on_non_dist_key(int) AS DELETE FROM user_info_data WHERE user_id = 3 AND user_id = 3 AND user_index = $1 AND u_data = ('test', (get_local_node_id_stable() > 0)::int)::user_data RETURNING user_id, user_index; EXECUTE router_with_param_and_func_on_non_dist_key(0); diff --git a/src/test/regress/expected/multi_mx_insert_select_repartition.out b/src/test/regress/expected/multi_mx_insert_select_repartition.out index d61e046f2..469b08b3a 100644 --- a/src/test/regress/expected/multi_mx_insert_select_repartition.out +++ b/src/test/regress/expected/multi_mx_insert_select_repartition.out @@ -102,9 +102,9 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM multi_mx_i (1 row) insert into target_table SELECT a FROM source_table LIMIT 10; -ERROR: cannot execute command because a local execution has accessed a placement in the transaction -DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally -HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;" +NOTICE: executing the command locally: SELECT a FROM multi_mx_insert_select_repartition.source_table_4213581 source_table WHERE true LIMIT '10'::bigint +NOTICE: executing the command locally: SELECT a FROM multi_mx_insert_select_repartition.source_table_4213583 source_table WHERE true LIMIT '10'::bigint +NOTICE: executing the copy locally for shard xxxxx ROLLBACK; \c - - - :master_port SET search_path TO multi_mx_insert_select_repartition; diff --git a/src/test/regress/expected/multi_mx_transaction_recovery.out b/src/test/regress/expected/multi_mx_transaction_recovery.out index 6ef2ba445..d108c61ec 100644 --- a/src/test/regress/expected/multi_mx_transaction_recovery.out +++ b/src/test/regress/expected/multi_mx_transaction_recovery.out @@ -142,7 +142,7 @@ INSERT INTO test_recovery (x) SELECT 'hello-'||s FROM generate_series(1,100) s; SELECT count(*) FROM pg_dist_transaction; count --------------------------------------------------------------------- - 4 + 2 (1 row) SELECT recover_prepared_transactions(); diff --git a/src/test/regress/expected/multi_mx_truncate_from_worker.out b/src/test/regress/expected/multi_mx_truncate_from_worker.out index abb59b761..9825cd69b 100644 --- a/src/test/regress/expected/multi_mx_truncate_from_worker.out +++ b/src/test/regress/expected/multi_mx_truncate_from_worker.out @@ -118,6 +118,8 @@ INSERT INTO "refer'ence_table" SELECT i FROM generate_series(0, 100) i; SET search_path TO 'truncate_from_workers'; -- make sure that DMLs-SELECTs works along with TRUNCATE worker fine BEGIN; + -- we can enable local execution when truncate can be executed locally. + SET citus.enable_local_execution = 'off'; INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i; SELECT count(*) FROM on_update_fkey_table; count diff --git a/src/test/regress/expected/replicate_reference_tables_to_coordinator.out b/src/test/regress/expected/replicate_reference_tables_to_coordinator.out index c0292606c..f0fb28056 100644 --- a/src/test/regress/expected/replicate_reference_tables_to_coordinator.out +++ b/src/test/regress/expected/replicate_reference_tables_to_coordinator.out @@ -17,6 +17,7 @@ SELECT create_reference_table('squares'); (1 row) INSERT INTO squares SELECT i, i * i FROM generate_series(1, 10) i; +NOTICE: executing the copy locally for shard xxxxx -- should be executed locally SELECT count(*) FROM squares; NOTICE: executing the command locally: SELECT count(*) AS count FROM replicate_ref_to_coordinator.squares_8000000 squares diff --git a/src/test/regress/sql/local_shard_copy.sql b/src/test/regress/sql/local_shard_copy.sql index f07a346e9..28bdffd40 100644 --- a/src/test/regress/sql/local_shard_copy.sql +++ b/src/test/regress/sql/local_shard_copy.sql @@ -212,6 +212,14 @@ ROLLBACK; TRUNCATE distributed_table; +BEGIN; + +-- insert a lot of data ( around 8MB), +-- this should use local copy and it will exceed the LOCAL_COPY_FLUSH_THRESHOLD (512KB) +INSERT INTO distributed_table SELECT * , * FROM generate_series(20, 1000000); + +ROLLBACK; + COPY distributed_table FROM STDIN WITH delimiter ','; 1, 9 \. diff --git a/src/test/regress/sql/local_shard_execution.sql b/src/test/regress/sql/local_shard_execution.sql index e3688bd9a..22b46abed 100644 --- a/src/test/regress/sql/local_shard_execution.sql +++ b/src/test/regress/sql/local_shard_execution.sql @@ -305,22 +305,6 @@ ROLLBACK; -- load some data so that foreign keys won't complain with the next tests INSERT INTO reference_table SELECT i FROM generate_series(500, 600) i; --- a very similar set of operation, but this time use --- COPY as the first command -BEGIN; - INSERT INTO distributed_table SELECT i, i::text, i % 10 + 25 FROM generate_series(500, 600) i; - - -- this could go through local execution, but doesn't because we've already - -- done distributed execution - SELECT * FROM distributed_table WHERE key = 500 ORDER BY 1,2,3; - - -- utility commands could still use distributed execution - TRUNCATE distributed_table CASCADE; - - -- ensure that TRUNCATE made it - SELECT * FROM distributed_table WHERE key = 500 ORDER BY 1,2,3; -ROLLBACK; - -- show that cascading foreign keys just works fine with local execution BEGIN; INSERT INTO reference_table VALUES (701); diff --git a/src/test/regress/sql/multi_mx_truncate_from_worker.sql b/src/test/regress/sql/multi_mx_truncate_from_worker.sql index aa05aa02a..00492246b 100644 --- a/src/test/regress/sql/multi_mx_truncate_from_worker.sql +++ b/src/test/regress/sql/multi_mx_truncate_from_worker.sql @@ -85,6 +85,8 @@ SET search_path TO 'truncate_from_workers'; -- make sure that DMLs-SELECTs works along with TRUNCATE worker fine BEGIN; + -- we can enable local execution when truncate can be executed locally. + SET citus.enable_local_execution = 'off'; INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i; SELECT count(*) FROM on_update_fkey_table; TRUNCATE on_update_fkey_table;