diff --git a/src/backend/distributed/commands/call.c b/src/backend/distributed/commands/call.c index 6d620301b..5f86022be 100644 --- a/src/backend/distributed/commands/call.c +++ b/src/backend/distributed/commands/call.c @@ -189,7 +189,7 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, ExecuteTaskListExtended(ROW_MODIFY_NONE, list_make1(task), tupleDesc, tupleStore, hasReturning, MaxAdaptiveExecutorPoolSize, - &xactProperties, NIL); + &xactProperties, NIL, true); while (tuplestore_gettupleslot(tupleStore, true, false, slot)) { diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 7e5d4cfe9..a7223f8fb 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -861,7 +861,8 @@ ExecuteUtilityTaskListWithoutResults(List *taskList, bool localExecutionSupporte /* execute remote tasks if any */ if (list_length(remoteTaskList) > 0) { - ExecuteTaskList(rowModifyLevel, remoteTaskList, MaxAdaptiveExecutorPoolSize); + ExecuteTaskList(rowModifyLevel, remoteTaskList, MaxAdaptiveExecutorPoolSize, + false); } } @@ -884,7 +885,7 @@ ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList, return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor, tupleStore, hasReturning, targetPoolSize, - &xactProperties, jobIdList); + &xactProperties, jobIdList, true); } @@ -893,7 +894,8 @@ ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList, * for some of the arguments. */ uint64 -ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int targetPoolSize) +ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int targetPoolSize, bool + localExecutionSupported) { TupleDesc tupleDescriptor = NULL; Tuplestorestate *tupleStore = NULL; @@ -904,7 +906,7 @@ ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int targetPoolSize) return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor, tupleStore, hasReturning, targetPoolSize, - &xactProperties, NIL); + &xactProperties, NIL, localExecutionSupported); } @@ -924,7 +926,7 @@ ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskList, return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor, tupleStore, hasReturning, targetPoolSize, - &xactProperties, NIL); + &xactProperties, NIL, true); } @@ -936,9 +938,30 @@ uint64 ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList, TupleDesc tupleDescriptor, Tuplestorestate *tupleStore, bool hasReturning, int targetPoolSize, - TransactionProperties *xactProperties, List *jobIdList) + TransactionProperties *xactProperties, + List *jobIdList, + bool localExecutionSupported) { ParamListInfo paramListInfo = NULL; + uint64 locallyProcessedRows = 0; + List *localTaskList = NIL; + List *remoteTaskList = NIL; + + if (localExecutionSupported && ShouldExecuteTasksLocally(taskList)) + { + bool readOnlyPlan = false; + + /* set local (if any) & remote tasks */ + ExtractLocalAndRemoteTasks(readOnlyPlan, taskList, &localTaskList, + &remoteTaskList); + locallyProcessedRows += ExecuteLocalTaskList(localTaskList, NULL, + NULL, + tupleStore); + } + else + { + remoteTaskList = taskList; + } /* * If current transaction accessed local placements and task list includes @@ -946,7 +969,7 @@ ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList, * then we should error out as it would cause inconsistencies across the * remote connection and local execution. */ - if (TransactionAccessedLocalPlacement && AnyTaskAccessesLocalNode(taskList)) + if (TransactionAccessedLocalPlacement && AnyTaskAccessesLocalNode(remoteTaskList)) { ErrorIfTransactionAccessedPlacementsLocally(); } @@ -957,7 +980,7 @@ ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList, } DistributedExecution *execution = - CreateDistributedExecution(modLevel, taskList, hasReturning, paramListInfo, + CreateDistributedExecution(modLevel, remoteTaskList, hasReturning, paramListInfo, tupleDescriptor, tupleStore, targetPoolSize, xactProperties, jobIdList); @@ -965,7 +988,7 @@ ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList, RunDistributedExecution(execution); FinishDistributedExecution(execution); - return execution->rowsProcessed; + return execution->rowsProcessed + locallyProcessedRows; } diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index c9a02fb50..51caa3ee2 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -553,7 +553,7 @@ CacheLocalPlanForShardQuery(Task *task, DistributedPlan *originalDistributedPlan PlannedStmt * GetCachedLocalPlan(Task *task, DistributedPlan *distributedPlan) { - if (distributedPlan->workerJob == NULL) + if (distributedPlan == NULL || distributedPlan->workerJob == NULL) { return NULL; } diff --git a/src/backend/distributed/executor/distributed_intermediate_results.c b/src/backend/distributed/executor/distributed_intermediate_results.c index ee11c03b7..68626b039 100644 --- a/src/backend/distributed/executor/distributed_intermediate_results.c +++ b/src/backend/distributed/executor/distributed_intermediate_results.c @@ -413,7 +413,7 @@ ExecuteSelectTasksIntoTupleStore(List *taskList, TupleDesc resultDescriptor, ExecuteTaskListExtended(ROW_MODIFY_READONLY, taskList, resultDescriptor, resultStore, hasReturning, targetPoolSize, &xactProperties, - NIL); + NIL, false); return resultStore; } diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index 4b7c7cc32..314cd0246 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -270,11 +270,11 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) scanState->tuplestorestate = tuplestore_begin_heap(randomAccess, interTransactions, work_mem); - - uint64 rowsInserted = ExtractAndExecuteLocalAndRemoteTasks(scanState, - taskList, - ROW_MODIFY_COMMUTATIVE, - hasReturning); + TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState); + uint64 rowsInserted = ExecuteTaskListIntoTupleStore(ROW_MODIFY_COMMUTATIVE, + taskList, tupleDescriptor, + scanState->tuplestorestate, + hasReturning); executorState->es_processed = rowsInserted; } @@ -331,9 +331,11 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) Assert(scanState->tuplestorestate == NULL); scanState->tuplestorestate = tuplestore_begin_heap(randomAccess, interTransactions, work_mem); - ExtractAndExecuteLocalAndRemoteTasks(scanState, prunedTaskList, - ROW_MODIFY_COMMUTATIVE, - hasReturning); + + TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState); + ExecuteTaskListIntoTupleStore(ROW_MODIFY_COMMUTATIVE, prunedTaskList, + tupleDescriptor, scanState->tuplestorestate, + hasReturning); if (SortReturning && hasReturning) { diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index eb795e0c0..ffe5e59f6 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -79,6 +79,7 @@ #include "distributed/commands/utility_hook.h" #include "distributed/citus_custom_scan.h" #include "distributed/citus_ruleutils.h" +#include "distributed/query_utils.h" #include "distributed/deparse_shard_query.h" #include "distributed/listutils.h" #include "distributed/local_executor.h" @@ -113,6 +114,8 @@ static uint64 ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString, Tuplestorestate *tupleStoreState, ParamListInfo paramListInfo); static void LogLocalCommand(Task *task); +static uint64 LocallyPlanAndExecuteMultipleQueries(List *queryStrings, + Tuplestorestate *tupleStoreState); static void ExtractParametersForLocalExecution(ParamListInfo paramListInfo, Oid **parameterTypes, const char ***parameterValues); @@ -197,11 +200,20 @@ ExecuteLocalTaskList(List *taskList, ParamListInfo orig_paramListInfo, taskNumParams = 0; taskParameterTypes = NULL; } + List *queryStrings = SplitIntoQueries(TaskQueryString(task)); + if (list_length(queryStrings) > 1) + { + LogLocalCommand(task); + totalRowsProcessed += LocallyPlanAndExecuteMultipleQueries(queryStrings, + tupleStoreState); + return totalRowsProcessed; + } - Query *shardQuery = ParseQueryString(TaskQueryString(task), + Query *shardQuery = ParseQueryString(linitial(queryStrings), taskParameterTypes, taskNumParams); + int cursorOptions = 0; /* @@ -230,48 +242,28 @@ ExecuteLocalTaskList(List *taskList, ParamListInfo orig_paramListInfo, } -/* - * ExtractAndExecuteLocalAndRemoteTasks extracts local and remote tasks - * if local execution can be used and executes them. - */ -uint64 -ExtractAndExecuteLocalAndRemoteTasks(CitusScanState *scanState, - List *taskList, RowModifyLevel rowModifyLevel, bool - hasReturning) +static uint64 +LocallyPlanAndExecuteMultipleQueries(List *queryStrings, Tuplestorestate *tupleStoreState) { - uint64 processedRows = 0; - List *localTaskList = NIL; - List *remoteTaskList = NIL; - TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState); - - if (ShouldExecuteTasksLocally(taskList)) + char *queryString = NULL; + uint64 totalProcessedRows = 0; + if (tupleStoreState == NULL) { - bool readOnlyPlan = false; - - /* set local (if any) & remote tasks */ - ExtractLocalAndRemoteTasks(readOnlyPlan, taskList, &localTaskList, - &remoteTaskList); - EState *estate = ScanStateGetExecutorState(scanState); - processedRows += ExecuteLocalTaskList(localTaskList, estate->es_param_list_info, - scanState->distributedPlan, - scanState->tuplestorestate); + tupleStoreState = tuplestore_begin_heap(true, false, work_mem); } - else + foreach_ptr(queryString, queryStrings) { - /* all tasks should be executed via remote connections */ - remoteTaskList = taskList; + Query *shardQuery = ParseQueryString(queryString, + NULL, + 0); + int cursorOptions = 0; + ParamListInfo paramListInfo = NULL; + PlannedStmt *localPlan = planner(shardQuery, cursorOptions, paramListInfo); + totalProcessedRows += ExecuteLocalTaskPlan(localPlan, queryString, + tupleStoreState, + paramListInfo); } - - /* execute remote tasks if any */ - if (list_length(remoteTaskList) > 0) - { - processedRows += ExecuteTaskListIntoTupleStore(rowModifyLevel, remoteTaskList, - tupleDescriptor, - scanState->tuplestorestate, - hasReturning); - } - - return processedRows; + return totalProcessedRows; } diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index c58573ae0..a9ab9cba6 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -546,7 +546,7 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements, poolSize = MaxAdaptiveExecutorPoolSize; } - ExecuteTaskList(ROW_MODIFY_NONE, taskList, poolSize); + ExecuteTaskList(ROW_MODIFY_NONE, taskList, poolSize, true); } diff --git a/src/backend/distributed/utils/query_utils.c b/src/backend/distributed/utils/query_utils.c index f4741e36c..dbdb2de91 100644 --- a/src/backend/distributed/utils/query_utils.c +++ b/src/backend/distributed/utils/query_utils.c @@ -132,6 +132,31 @@ ExtractRangeTableRelationWalker(Node *node, List **rangeTableRelationList) } +/* + * SplitIntoQueries returns a list of queries by splitting + * the given concatenated query string with delimiter ';' + */ +List * +SplitIntoQueries(char *concatenatedQueryString) +{ + List *queries = NIL; + rsize_t len = (rsize_t) strlen(concatenatedQueryString); + char *delimiter = ";"; + char *remaining = concatenatedQueryString; + char *query = strtok_s(concatenatedQueryString, &len, delimiter, &remaining); + while (query != NULL) + { + queries = lappend(queries, query); + if (len == 0) + { + break; + } + query = strtok_s(NULL, &len, delimiter, &remaining); + } + return queries; +} + + /* * ExtractRangeTableEntryWalker walks over a query tree, and finds all range * table entries. For recursing into the query tree, this function uses the diff --git a/src/include/distributed/adaptive_executor.h b/src/include/distributed/adaptive_executor.h index b55271c4d..3dd804e06 100644 --- a/src/include/distributed/adaptive_executor.h +++ b/src/include/distributed/adaptive_executor.h @@ -11,7 +11,7 @@ extern int MaxAdaptiveExecutorPoolSize; extern int ExecutorSlowStartInterval; extern uint64 ExecuteTaskList(RowModifyLevel modLevel, List *taskList, - int targetPoolSize); + int targetPoolSize, bool localExecutionSupported); extern uint64 ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList, int targetPoolSize, List *jobIdList); diff --git a/src/include/distributed/local_executor.h b/src/include/distributed/local_executor.h index a1a1d156b..a2fabd45c 100644 --- a/src/include/distributed/local_executor.h +++ b/src/include/distributed/local_executor.h @@ -21,9 +21,6 @@ extern bool TransactionAccessedLocalPlacement; extern bool TransactionConnectedToLocalGroup; /* extern function declarations */ -extern uint64 ExtractAndExecuteLocalAndRemoteTasks(CitusScanState *scanState, - List *taskList, RowModifyLevel - rowModifyLevel, bool hasReturning); extern uint64 ExecuteLocalTaskList(List *taskList, ParamListInfo paramListInfo, DistributedPlan *distributedPlan, Tuplestorestate *tupleStoreState); diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index 448158d59..50bb8c53b 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -79,15 +79,14 @@ extern uint64 ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList, Tuplestorestate *tupleStore, bool hasReturning, int targetPoolSize, TransactionProperties *xactProperties, - List *jobIdList); + List *jobIdList, + bool localExecutionSupported); extern uint64 ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskList, TupleDesc tupleDescriptor, Tuplestorestate *tupleStore, bool hasReturning); extern void ExecuteUtilityTaskListWithoutResults(List *taskList, bool localExecutionSupported); -extern uint64 ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int - targetPoolSize); extern bool IsCitusCustomState(PlanState *planState); extern TupleTableSlot * CitusExecScan(CustomScanState *node); extern TupleTableSlot * ReturnTupleFromTuplestore(CitusScanState *scanState); diff --git a/src/include/distributed/query_utils.h b/src/include/distributed/query_utils.h index 5e4a55bf7..f0ef4071b 100644 --- a/src/include/distributed/query_utils.h +++ b/src/include/distributed/query_utils.h @@ -34,6 +34,7 @@ extern bool ExtractRangeTableList(Node *node, ExtractRangeTableWalkerContext *co /* Below two functions wrap ExtractRangeTableList function to determine the execution flow */ extern bool ExtractRangeTableRelationWalker(Node *node, List **rangeTableList); extern bool ExtractRangeTableEntryWalker(Node *node, List **rangeTableList); +extern List * SplitIntoQueries(char *concatenatedQueryString); extern bool ExtractRangeTableIndexWalker(Node *node, List **rangeTableIndexList); diff --git a/src/test/regress/expected/local_shard_copy.out b/src/test/regress/expected/local_shard_copy.out index 375f4200a..f954860cd 100644 --- a/src/test/regress/expected/local_shard_copy.out +++ b/src/test/regress/expected/local_shard_copy.out @@ -489,6 +489,38 @@ BEGIN; COPY reference_table FROM STDIN; ROLLBACK; SET citus.enable_local_execution = 'on'; +CREATE TABLE ref_table(a int); +INSERT INTO ref_table VALUES(1); +BEGIN; +-- trigger local execution +SELECT COUNT(*) FROM reference_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.reference_table_1570000 reference_table + count +--------------------------------------------------------------------- + 0 +(1 row) + +-- shard creation should be done locally +SELECT create_reference_table('ref_table'); +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1330000, 'local_shard_copy', 'CREATE TABLE local_shard_copy.ref_table (a integer)') +NOTICE: executing the copy locally for shard xxxxx +NOTICE: Copying data from local table... + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO ref_table VALUES(2); +NOTICE: executing the command locally: INSERT INTO local_shard_copy.ref_table_1330000 (a) VALUES (2) +-- verify that it worked. +SELECT COUNT(*) FROM ref_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.ref_table_1330000 ref_table + count +--------------------------------------------------------------------- + 2 +(1 row) + +ROLLBACK; SET client_min_messages TO ERROR; SET search_path TO public; DROP SCHEMA local_shard_copy CASCADE; diff --git a/src/test/regress/expected/local_shard_utility_command_execution.out b/src/test/regress/expected/local_shard_utility_command_execution.out index 42ee966e4..95b1a4ccc 100644 --- a/src/test/regress/expected/local_shard_utility_command_execution.out +++ b/src/test/regress/expected/local_shard_utility_command_execution.out @@ -565,7 +565,11 @@ NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1 -- would error out CREATE TABLE another_dist_table(a int); SELECT create_distributed_table('another_dist_table', 'a', colocate_with:='dist_table'); -ERROR: cannot execute command because a local execution has accessed a placement in the transaction + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + COMMIT; --------------------------------------------------------------------- ------------ partitioned tables ------------- @@ -721,7 +725,7 @@ TRUNCATE partitioning_test; DROP TABLE partitioning_test; -- cleanup at exit DROP SCHEMA local_commands_test_schema CASCADE; -NOTICE: drop cascades to 16 other objects +NOTICE: drop cascades to 18 other objects DROP SCHEMA foo_schema; SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false); ?column? diff --git a/src/test/regress/sql/local_shard_copy.sql b/src/test/regress/sql/local_shard_copy.sql index 28bdffd40..7240c62a0 100644 --- a/src/test/regress/sql/local_shard_copy.sql +++ b/src/test/regress/sql/local_shard_copy.sql @@ -341,6 +341,20 @@ ROLLBACK; SET citus.enable_local_execution = 'on'; +CREATE TABLE ref_table(a int); +INSERT INTO ref_table VALUES(1); + +BEGIN; +-- trigger local execution +SELECT COUNT(*) FROM reference_table; +-- shard creation should be done locally +SELECT create_reference_table('ref_table'); +INSERT INTO ref_table VALUES(2); + +-- verify that it worked. +SELECT COUNT(*) FROM ref_table; +ROLLBACK; + SET client_min_messages TO ERROR; SET search_path TO public; DROP SCHEMA local_shard_copy CASCADE;