From 0aebd78ea7f85bd65414f1328d5e8ffa84bc62da Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Tue, 17 Mar 2020 13:37:50 +0300 Subject: [PATCH] use localExecution in ExecuteTaskListExtended ExecuteTaskListExtended is the common method for different codepaths, and instead of writing separate local execution logics in different codepaths, it makes more sense to have the logic here. We still need to do some refactoring, this is an initial step. After this commit, we can run create shard commands locally. There is a special case with shard creation commands. A create shard command might have a concatenated query string, however local execution did not know how to execute a task with multiple query strings. This is also implemented in this commit. We go over each query in the concatenated query string and plan/execute them one by one. A more clean solution to this would be to make sure that each task has a single query. We currently cannot do that because we need to ensure the task dependencies. However, it would make sense to do that at some point and it would simplify the code a lot. --- src/backend/distributed/commands/call.c | 3 +- .../distributed/executor/adaptive_executor.c | 48 +++++++++--- .../distributed_intermediate_results.c | 9 ++- .../executor/insert_select_executor.c | 18 +++-- .../distributed/executor/local_executor.c | 75 +++++++++---------- .../master/master_stage_protocol.c | 4 +- .../distributed/planner/local_plan_cache.c | 2 +- src/include/distributed/adaptive_executor.h | 2 +- src/include/distributed/local_executor.h | 3 - src/include/distributed/multi_executor.h | 5 +- .../expected/coordinator_shouldhaveshards.out | 35 +++++++++ .../regress/expected/local_shard_copy.out | 32 ++++++++ .../local_shard_utility_command_execution.out | 9 ++- .../sql/coordinator_shouldhaveshards.sql | 15 ++++ src/test/regress/sql/local_shard_copy.sql | 14 ++++ 15 files changed, 203 insertions(+), 71 deletions(-) diff --git a/src/backend/distributed/commands/call.c b/src/backend/distributed/commands/call.c index fa30093d5..45f34bd7b 100644 --- a/src/backend/distributed/commands/call.c +++ b/src/backend/distributed/commands/call.c @@ -186,10 +186,11 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, .requires2PC = false }; + bool localExecutionSupported = true; ExecuteTaskListExtended(ROW_MODIFY_NONE, list_make1(task), tupleDesc, tupleStore, hasReturning, MaxAdaptiveExecutorPoolSize, - &xactProperties, NIL); + &xactProperties, NIL, localExecutionSupported); while (tuplestore_gettupleslot(tupleStore, true, false, slot)) { diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 5640a31b6..5d46bbf7d 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -861,7 +861,13 @@ ExecuteUtilityTaskListWithoutResults(List *taskList, bool localExecutionSupporte /* execute remote tasks if any */ if (list_length(remoteTaskList) > 0) { - ExecuteTaskList(rowModifyLevel, remoteTaskList, MaxAdaptiveExecutorPoolSize); + /* + * We already executed tasks locally. We should ideally remove this method and + * let ExecuteTaskListExtended handle the local execution. + */ + localExecutionSupported = false; + ExecuteTaskList(rowModifyLevel, remoteTaskList, MaxAdaptiveExecutorPoolSize, + localExecutionSupported); } } @@ -881,10 +887,10 @@ ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList, TransactionProperties xactProperties = DecideTransactionPropertiesForTaskList(modLevel, taskList, true); - + bool localExecutionSupported = true; return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor, tupleStore, hasReturning, targetPoolSize, - &xactProperties, jobIdList); + &xactProperties, jobIdList, localExecutionSupported); } @@ -893,7 +899,8 @@ ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList, * for some of the arguments. */ uint64 -ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int targetPoolSize) +ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int targetPoolSize, bool + localExecutionSupported) { TupleDesc tupleDescriptor = NULL; Tuplestorestate *tupleStore = NULL; @@ -904,7 +911,7 @@ ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int targetPoolSize) return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor, tupleStore, hasReturning, targetPoolSize, - &xactProperties, NIL); + &xactProperties, NIL, localExecutionSupported); } @@ -921,10 +928,10 @@ ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskList, TransactionProperties xactProperties = DecideTransactionPropertiesForTaskList( modLevel, taskList, false); - + bool localExecutionSupported = true; return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor, tupleStore, hasReturning, targetPoolSize, - &xactProperties, NIL); + &xactProperties, NIL, localExecutionSupported); } @@ -936,9 +943,28 @@ uint64 ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList, TupleDesc tupleDescriptor, Tuplestorestate *tupleStore, bool hasReturning, int targetPoolSize, - TransactionProperties *xactProperties, List *jobIdList) + TransactionProperties *xactProperties, + List *jobIdList, + bool localExecutionSupported) { ParamListInfo paramListInfo = NULL; + uint64 locallyProcessedRows = 0; + List *localTaskList = NIL; + List *remoteTaskList = NIL; + + if (localExecutionSupported && ShouldExecuteTasksLocally(taskList)) + { + bool readOnlyPlan = false; + + /* set local (if any) & remote tasks */ + ExtractLocalAndRemoteTasks(readOnlyPlan, taskList, &localTaskList, + &remoteTaskList); + locallyProcessedRows += ExecuteLocalTaskList(localTaskList, tupleStore); + } + else + { + remoteTaskList = taskList; + } /* * If current transaction accessed local placements and task list includes @@ -946,7 +972,7 @@ ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList, * then we should error out as it would cause inconsistencies across the * remote connection and local execution. */ - if (TransactionAccessedLocalPlacement && AnyTaskAccessesLocalNode(taskList)) + if (TransactionAccessedLocalPlacement && AnyTaskAccessesLocalNode(remoteTaskList)) { ErrorIfTransactionAccessedPlacementsLocally(); } @@ -957,7 +983,7 @@ ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList, } DistributedExecution *execution = - CreateDistributedExecution(modLevel, taskList, hasReturning, paramListInfo, + CreateDistributedExecution(modLevel, remoteTaskList, hasReturning, paramListInfo, tupleDescriptor, tupleStore, targetPoolSize, xactProperties, jobIdList); @@ -965,7 +991,7 @@ ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList, RunDistributedExecution(execution); FinishDistributedExecution(execution); - return execution->rowsProcessed; + return execution->rowsProcessed + locallyProcessedRows; } diff --git a/src/backend/distributed/executor/distributed_intermediate_results.c b/src/backend/distributed/executor/distributed_intermediate_results.c index e2ab2ce9e..b874f9018 100644 --- a/src/backend/distributed/executor/distributed_intermediate_results.c +++ b/src/backend/distributed/executor/distributed_intermediate_results.c @@ -414,9 +414,16 @@ ExecuteSelectTasksIntoTupleStore(List *taskList, TupleDesc resultDescriptor, Tuplestorestate *resultStore = tuplestore_begin_heap(randomAccess, interTransactions, work_mem); + /* + * Local execution is not supported because here we use perPlacementQueryStrings. + * Local execution does not know how to handle it. One solution is to extract and set + * queryStringLazy from perPlacementQueryStrings. The extracted one should be the + * query string for the local placement. + */ + bool localExecutionSupported = false; ExecuteTaskListExtended(ROW_MODIFY_READONLY, taskList, resultDescriptor, resultStore, hasReturning, targetPoolSize, &xactProperties, - NIL); + NIL, localExecutionSupported); return resultStore; } diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index a982213c8..a0a49beb6 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -270,11 +270,11 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) scanState->tuplestorestate = tuplestore_begin_heap(randomAccess, interTransactions, work_mem); - - uint64 rowsInserted = ExtractAndExecuteLocalAndRemoteTasks(scanState, - taskList, - ROW_MODIFY_COMMUTATIVE, - hasReturning); + TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState); + uint64 rowsInserted = ExecuteTaskListIntoTupleStore(ROW_MODIFY_COMMUTATIVE, + taskList, tupleDescriptor, + scanState->tuplestorestate, + hasReturning); executorState->es_processed = rowsInserted; } @@ -331,9 +331,11 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) Assert(scanState->tuplestorestate == NULL); scanState->tuplestorestate = tuplestore_begin_heap(randomAccess, interTransactions, work_mem); - ExtractAndExecuteLocalAndRemoteTasks(scanState, prunedTaskList, - ROW_MODIFY_COMMUTATIVE, - hasReturning); + + TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState); + ExecuteTaskListIntoTupleStore(ROW_MODIFY_COMMUTATIVE, prunedTaskList, + tupleDescriptor, scanState->tuplestorestate, + hasReturning); if (SortReturning && hasReturning) { diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index bd509748a..4376a812c 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -81,6 +81,7 @@ #include "distributed/commands/utility_hook.h" #include "distributed/citus_custom_scan.h" #include "distributed/citus_ruleutils.h" +#include "distributed/query_utils.h" #include "distributed/deparse_shard_query.h" #include "distributed/listutils.h" #include "distributed/local_executor.h" @@ -116,6 +117,8 @@ static uint64 ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString, Tuplestorestate *tupleStoreState, ParamListInfo paramListInfo); static void LogLocalCommand(Task *task); +static uint64 LocallyPlanAndExecuteMultipleQueries(List *queryStrings, + Tuplestorestate *tupleStoreState); static void ExtractParametersForLocalExecution(ParamListInfo paramListInfo, Oid **parameterTypes, const char ***parameterValues); @@ -221,10 +224,25 @@ ExecuteLocalTaskListExtended(List *taskList, ParamListInfo orig_paramListInfo, taskParameterTypes = NULL; } + /* + * for concatenated strings, we set queryStringList so that we can access + * each query string. + */ + if (GetTaskQueryType(task) == TASK_QUERY_TEXT_LIST) + { + List *queryStringList = task->taskQuery.data.queryStringList; + LogLocalCommand(task); + totalRowsProcessed += LocallyPlanAndExecuteMultipleQueries( + queryStringList, + tupleStoreState); + return totalRowsProcessed; + } + Query *shardQuery = ParseQueryString(TaskQueryStringForAllPlacements(task), taskParameterTypes, taskNumParams); + int cursorOptions = 0; /* @@ -261,50 +279,31 @@ ExecuteLocalTaskListExtended(List *taskList, ParamListInfo orig_paramListInfo, /* - * ExtractAndExecuteLocalAndRemoteTasks extracts local and remote tasks - * if local execution can be used and executes them. + * LocallyPlanAndExecuteMultipleQueries plans and executes the given query strings + * one by one. */ -uint64 -ExtractAndExecuteLocalAndRemoteTasks(CitusScanState *scanState, - List *taskList, RowModifyLevel rowModifyLevel, bool - hasReturning) +static uint64 +LocallyPlanAndExecuteMultipleQueries(List *queryStrings, Tuplestorestate *tupleStoreState) { - uint64 processedRows = 0; - List *localTaskList = NIL; - List *remoteTaskList = NIL; - TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState); - - if (ShouldExecuteTasksLocally(taskList)) + char *queryString = NULL; + uint64 totalProcessedRows = 0; + if (tupleStoreState == NULL) { - bool readOnlyPlan = false; - - /* set local (if any) & remote tasks */ - ExtractLocalAndRemoteTasks(readOnlyPlan, taskList, &localTaskList, - &remoteTaskList); - EState *estate = ScanStateGetExecutorState(scanState); - processedRows += ExecuteLocalTaskListExtended( - localTaskList, - estate->es_param_list_info, - scanState->distributedPlan, - scanState->tuplestorestate - ); + tupleStoreState = tuplestore_begin_heap(true, false, work_mem); } - else + foreach_ptr(queryString, queryStrings) { - /* all tasks should be executed via remote connections */ - remoteTaskList = taskList; + Query *shardQuery = ParseQueryString(queryString, + NULL, + 0); + int cursorOptions = 0; + ParamListInfo paramListInfo = NULL; + PlannedStmt *localPlan = planner(shardQuery, cursorOptions, paramListInfo); + totalProcessedRows += ExecuteLocalTaskPlan(localPlan, queryString, + tupleStoreState, + paramListInfo); } - - /* execute remote tasks if any */ - if (list_length(remoteTaskList) > 0) - { - processedRows += ExecuteTaskListIntoTupleStore(rowModifyLevel, remoteTaskList, - tupleDescriptor, - scanState->tuplestorestate, - hasReturning); - } - - return processedRows; + return totalProcessedRows; } diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index 34b6c4ad8..32d8d805a 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -546,8 +546,8 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements, */ poolSize = MaxAdaptiveExecutorPoolSize; } - - ExecuteTaskList(ROW_MODIFY_NONE, taskList, poolSize); + bool localExecutionSupported = true; + ExecuteTaskList(ROW_MODIFY_NONE, taskList, poolSize, localExecutionSupported); } diff --git a/src/backend/distributed/planner/local_plan_cache.c b/src/backend/distributed/planner/local_plan_cache.c index 582f34597..cfac1abf1 100644 --- a/src/backend/distributed/planner/local_plan_cache.c +++ b/src/backend/distributed/planner/local_plan_cache.c @@ -113,7 +113,7 @@ CacheLocalPlanForShardQuery(Task *task, DistributedPlan *originalDistributedPlan PlannedStmt * GetCachedLocalPlan(Task *task, DistributedPlan *distributedPlan) { - if (distributedPlan->workerJob == NULL) + if (distributedPlan == NULL || distributedPlan->workerJob == NULL) { return NULL; } diff --git a/src/include/distributed/adaptive_executor.h b/src/include/distributed/adaptive_executor.h index b55271c4d..3dd804e06 100644 --- a/src/include/distributed/adaptive_executor.h +++ b/src/include/distributed/adaptive_executor.h @@ -11,7 +11,7 @@ extern int MaxAdaptiveExecutorPoolSize; extern int ExecutorSlowStartInterval; extern uint64 ExecuteTaskList(RowModifyLevel modLevel, List *taskList, - int targetPoolSize); + int targetPoolSize, bool localExecutionSupported); extern uint64 ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList, int targetPoolSize, List *jobIdList); diff --git a/src/include/distributed/local_executor.h b/src/include/distributed/local_executor.h index f4301a888..6a3d1ed34 100644 --- a/src/include/distributed/local_executor.h +++ b/src/include/distributed/local_executor.h @@ -21,9 +21,6 @@ extern bool TransactionAccessedLocalPlacement; extern bool TransactionConnectedToLocalGroup; /* extern function declarations */ -extern uint64 ExtractAndExecuteLocalAndRemoteTasks(CitusScanState *scanState, - List *taskList, RowModifyLevel - rowModifyLevel, bool hasReturning); extern uint64 ExecuteLocalTaskList(List *taskList, Tuplestorestate *tupleStoreState); extern uint64 ExecuteLocalTaskListExtended(List *taskList, ParamListInfo diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index 448158d59..50bb8c53b 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -79,15 +79,14 @@ extern uint64 ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList, Tuplestorestate *tupleStore, bool hasReturning, int targetPoolSize, TransactionProperties *xactProperties, - List *jobIdList); + List *jobIdList, + bool localExecutionSupported); extern uint64 ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskList, TupleDesc tupleDescriptor, Tuplestorestate *tupleStore, bool hasReturning); extern void ExecuteUtilityTaskListWithoutResults(List *taskList, bool localExecutionSupported); -extern uint64 ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int - targetPoolSize); extern bool IsCitusCustomState(PlanState *planState); extern TupleTableSlot * CitusExecScan(CustomScanState *node); extern TupleTableSlot * ReturnTupleFromTuplestore(CitusScanState *scanState); diff --git a/src/test/regress/expected/coordinator_shouldhaveshards.out b/src/test/regress/expected/coordinator_shouldhaveshards.out index 023f58720..908e645c8 100644 --- a/src/test/regress/expected/coordinator_shouldhaveshards.out +++ b/src/test/regress/expected/coordinator_shouldhaveshards.out @@ -121,6 +121,41 @@ SELECT y FROM test WHERE x = 1; (1 row) END; +SET citus.log_remote_commands TO ON; +BEGIN; +-- trigger local execution +SELECT y FROM test WHERE x = 1; +NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1) + y +--------------------------------------------------------------------- + 1 +(1 row) + +CREATE TABLE dist_table (a int); +INSERT INTO dist_table SELECT * FROM generate_series(1, 100); +-- this should be run locally +SELECT create_distributed_table('dist_table', 'a', colocate_with := 'none'); +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503004, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table (a integer)');SELECT worker_apply_shard_ddl_command (1503004, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table OWNER TO postgres') +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(0, 5283, '2020-03-31 12:54:05.803455-07'); +DETAIL: on server postgres@localhost:xxxxx connectionId: 4 +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(0, 5283, '2020-03-31 12:54:05.803455-07'); +DETAIL: on server postgres@localhost:xxxxx connectionId: 5 +NOTICE: issuing SELECT worker_apply_shard_ddl_command (1503005, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table (a integer)');SELECT worker_apply_shard_ddl_command (1503005, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table OWNER TO postgres') +DETAIL: on server postgres@localhost:xxxxx connectionId: 4 +NOTICE: issuing SELECT worker_apply_shard_ddl_command (1503006, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table (a integer)');SELECT worker_apply_shard_ddl_command (1503006, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table OWNER TO postgres') +DETAIL: on server postgres@localhost:xxxxx connectionId: 5 +NOTICE: executing the copy locally for shard xxxxx +NOTICE: Copying data from local table... +NOTICE: executing the copy locally for shard xxxxx +NOTICE: issuing COPY coordinator_shouldhaveshards.dist_table_1503005 (a) FROM STDIN WITH (format 'binary') +DETAIL: on server postgres@localhost:xxxxx connectionId: 4 +NOTICE: issuing COPY coordinator_shouldhaveshards.dist_table_1503006 (a) FROM STDIN WITH (format 'binary') +DETAIL: on server postgres@localhost:xxxxx connectionId: 5 +ERROR: could not open relation with OID 0 +SELECT count(*) FROM dist_table; +ERROR: current transaction is aborted, commands ignored until end of transaction block +END; +SET citus.log_remote_commands TO OFF; DELETE FROM test; DROP TABLE test; DROP SCHEMA coordinator_shouldhaveshards CASCADE; diff --git a/src/test/regress/expected/local_shard_copy.out b/src/test/regress/expected/local_shard_copy.out index 375f4200a..5feeb5935 100644 --- a/src/test/regress/expected/local_shard_copy.out +++ b/src/test/regress/expected/local_shard_copy.out @@ -489,6 +489,38 @@ BEGIN; COPY reference_table FROM STDIN; ROLLBACK; SET citus.enable_local_execution = 'on'; +CREATE TABLE ref_table(a int); +INSERT INTO ref_table VALUES(1); +BEGIN; +-- trigger local execution +SELECT COUNT(*) FROM reference_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.reference_table_1570000 reference_table + count +--------------------------------------------------------------------- + 0 +(1 row) + +-- shard creation should be done locally +SELECT create_reference_table('ref_table'); +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1330000, 'local_shard_copy', 'CREATE TABLE local_shard_copy.ref_table (a integer)');SELECT worker_apply_shard_ddl_command (1330000, 'local_shard_copy', 'ALTER TABLE local_shard_copy.ref_table OWNER TO postgres') +NOTICE: executing the copy locally for shard xxxxx +NOTICE: Copying data from local table... + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO ref_table VALUES(2); +NOTICE: executing the command locally: INSERT INTO local_shard_copy.ref_table_1330000 (a) VALUES (2) +-- verify that it worked. +SELECT COUNT(*) FROM ref_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.ref_table_1330000 ref_table + count +--------------------------------------------------------------------- + 2 +(1 row) + +ROLLBACK; SET client_min_messages TO ERROR; SET search_path TO public; DROP SCHEMA local_shard_copy CASCADE; diff --git a/src/test/regress/expected/local_shard_utility_command_execution.out b/src/test/regress/expected/local_shard_utility_command_execution.out index 42ee966e4..b988f125c 100644 --- a/src/test/regress/expected/local_shard_utility_command_execution.out +++ b/src/test/regress/expected/local_shard_utility_command_execution.out @@ -565,7 +565,12 @@ NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1 -- would error out CREATE TABLE another_dist_table(a int); SELECT create_distributed_table('another_dist_table', 'a', colocate_with:='dist_table'); -ERROR: cannot execute command because a local execution has accessed a placement in the transaction +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500133, 'local_commands_test_schema', 'CREATE TABLE local_commands_test_schema.another_dist_table (a integer)');SELECT worker_apply_shard_ddl_command (1500133, 'local_commands_test_schema', 'ALTER TABLE local_commands_test_schema.another_dist_table OWNER TO postgres') + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + COMMIT; --------------------------------------------------------------------- ------------ partitioned tables ------------- @@ -721,7 +726,7 @@ TRUNCATE partitioning_test; DROP TABLE partitioning_test; -- cleanup at exit DROP SCHEMA local_commands_test_schema CASCADE; -NOTICE: drop cascades to 16 other objects +NOTICE: drop cascades to 18 other objects DROP SCHEMA foo_schema; SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false); ?column? diff --git a/src/test/regress/sql/coordinator_shouldhaveshards.sql b/src/test/regress/sql/coordinator_shouldhaveshards.sql index 66979a7c8..f978c2de5 100644 --- a/src/test/regress/sql/coordinator_shouldhaveshards.sql +++ b/src/test/regress/sql/coordinator_shouldhaveshards.sql @@ -60,6 +60,21 @@ ALTER TABLE test DROP COLUMN z; SELECT y FROM test WHERE x = 1; END; +SET citus.log_remote_commands TO ON; +BEGIN; +-- trigger local execution +SELECT y FROM test WHERE x = 1; +CREATE TABLE dist_table (a int); +INSERT INTO dist_table SELECT * FROM generate_series(1, 100); +-- this should be run locally +SELECT create_distributed_table('dist_table', 'a', colocate_with := 'none'); +SELECT count(*) FROM dist_table; +END; + +SET citus.log_remote_commands TO OFF; + + + DELETE FROM test; DROP TABLE test; diff --git a/src/test/regress/sql/local_shard_copy.sql b/src/test/regress/sql/local_shard_copy.sql index 28bdffd40..7240c62a0 100644 --- a/src/test/regress/sql/local_shard_copy.sql +++ b/src/test/regress/sql/local_shard_copy.sql @@ -341,6 +341,20 @@ ROLLBACK; SET citus.enable_local_execution = 'on'; +CREATE TABLE ref_table(a int); +INSERT INTO ref_table VALUES(1); + +BEGIN; +-- trigger local execution +SELECT COUNT(*) FROM reference_table; +-- shard creation should be done locally +SELECT create_reference_table('ref_table'); +INSERT INTO ref_table VALUES(2); + +-- verify that it worked. +SELECT COUNT(*) FROM ref_table; +ROLLBACK; + SET client_min_messages TO ERROR; SET search_path TO public; DROP SCHEMA local_shard_copy CASCADE;