diff --git a/src/backend/distributed/commands/call.c b/src/backend/distributed/commands/call.c index fa30093d5..45f34bd7b 100644 --- a/src/backend/distributed/commands/call.c +++ b/src/backend/distributed/commands/call.c @@ -186,10 +186,11 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, .requires2PC = false }; + bool localExecutionSupported = true; ExecuteTaskListExtended(ROW_MODIFY_NONE, list_make1(task), tupleDesc, tupleStore, hasReturning, MaxAdaptiveExecutorPoolSize, - &xactProperties, NIL); + &xactProperties, NIL, localExecutionSupported); while (tuplestore_gettupleslot(tupleStore, true, false, slot)) { diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 5640a31b6..5d46bbf7d 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -861,7 +861,13 @@ ExecuteUtilityTaskListWithoutResults(List *taskList, bool localExecutionSupporte /* execute remote tasks if any */ if (list_length(remoteTaskList) > 0) { - ExecuteTaskList(rowModifyLevel, remoteTaskList, MaxAdaptiveExecutorPoolSize); + /* + * We already executed tasks locally. We should ideally remove this method and + * let ExecuteTaskListExtended handle the local execution. + */ + localExecutionSupported = false; + ExecuteTaskList(rowModifyLevel, remoteTaskList, MaxAdaptiveExecutorPoolSize, + localExecutionSupported); } } @@ -881,10 +887,10 @@ ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList, TransactionProperties xactProperties = DecideTransactionPropertiesForTaskList(modLevel, taskList, true); - + bool localExecutionSupported = true; return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor, tupleStore, hasReturning, targetPoolSize, - &xactProperties, jobIdList); + &xactProperties, jobIdList, localExecutionSupported); } @@ -893,7 +899,8 @@ ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList, * for some of the arguments. */ uint64 -ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int targetPoolSize) +ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int targetPoolSize, bool + localExecutionSupported) { TupleDesc tupleDescriptor = NULL; Tuplestorestate *tupleStore = NULL; @@ -904,7 +911,7 @@ ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int targetPoolSize) return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor, tupleStore, hasReturning, targetPoolSize, - &xactProperties, NIL); + &xactProperties, NIL, localExecutionSupported); } @@ -921,10 +928,10 @@ ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskList, TransactionProperties xactProperties = DecideTransactionPropertiesForTaskList( modLevel, taskList, false); - + bool localExecutionSupported = true; return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor, tupleStore, hasReturning, targetPoolSize, - &xactProperties, NIL); + &xactProperties, NIL, localExecutionSupported); } @@ -936,9 +943,28 @@ uint64 ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList, TupleDesc tupleDescriptor, Tuplestorestate *tupleStore, bool hasReturning, int targetPoolSize, - TransactionProperties *xactProperties, List *jobIdList) + TransactionProperties *xactProperties, + List *jobIdList, + bool localExecutionSupported) { ParamListInfo paramListInfo = NULL; + uint64 locallyProcessedRows = 0; + List *localTaskList = NIL; + List *remoteTaskList = NIL; + + if (localExecutionSupported && ShouldExecuteTasksLocally(taskList)) + { + bool readOnlyPlan = false; + + /* set local (if any) & remote tasks */ + ExtractLocalAndRemoteTasks(readOnlyPlan, taskList, &localTaskList, + &remoteTaskList); + locallyProcessedRows += ExecuteLocalTaskList(localTaskList, tupleStore); + } + else + { + remoteTaskList = taskList; + } /* * If current transaction accessed local placements and task list includes @@ -946,7 +972,7 @@ ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList, * then we should error out as it would cause inconsistencies across the * remote connection and local execution. */ - if (TransactionAccessedLocalPlacement && AnyTaskAccessesLocalNode(taskList)) + if (TransactionAccessedLocalPlacement && AnyTaskAccessesLocalNode(remoteTaskList)) { ErrorIfTransactionAccessedPlacementsLocally(); } @@ -957,7 +983,7 @@ ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList, } DistributedExecution *execution = - CreateDistributedExecution(modLevel, taskList, hasReturning, paramListInfo, + CreateDistributedExecution(modLevel, remoteTaskList, hasReturning, paramListInfo, tupleDescriptor, tupleStore, targetPoolSize, xactProperties, jobIdList); @@ -965,7 +991,7 @@ ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList, RunDistributedExecution(execution); FinishDistributedExecution(execution); - return execution->rowsProcessed; + return execution->rowsProcessed + locallyProcessedRows; } diff --git a/src/backend/distributed/executor/distributed_intermediate_results.c b/src/backend/distributed/executor/distributed_intermediate_results.c index e2ab2ce9e..b874f9018 100644 --- a/src/backend/distributed/executor/distributed_intermediate_results.c +++ b/src/backend/distributed/executor/distributed_intermediate_results.c @@ -414,9 +414,16 @@ ExecuteSelectTasksIntoTupleStore(List *taskList, TupleDesc resultDescriptor, Tuplestorestate *resultStore = tuplestore_begin_heap(randomAccess, interTransactions, work_mem); + /* + * Local execution is not supported because here we use perPlacementQueryStrings. + * Local execution does not know how to handle it. One solution is to extract and set + * queryStringLazy from perPlacementQueryStrings. The extracted one should be the + * query string for the local placement. + */ + bool localExecutionSupported = false; ExecuteTaskListExtended(ROW_MODIFY_READONLY, taskList, resultDescriptor, resultStore, hasReturning, targetPoolSize, &xactProperties, - NIL); + NIL, localExecutionSupported); return resultStore; } diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index a982213c8..a0a49beb6 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -270,11 +270,11 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) scanState->tuplestorestate = tuplestore_begin_heap(randomAccess, interTransactions, work_mem); - - uint64 rowsInserted = ExtractAndExecuteLocalAndRemoteTasks(scanState, - taskList, - ROW_MODIFY_COMMUTATIVE, - hasReturning); + TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState); + uint64 rowsInserted = ExecuteTaskListIntoTupleStore(ROW_MODIFY_COMMUTATIVE, + taskList, tupleDescriptor, + scanState->tuplestorestate, + hasReturning); executorState->es_processed = rowsInserted; } @@ -331,9 +331,11 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) Assert(scanState->tuplestorestate == NULL); scanState->tuplestorestate = tuplestore_begin_heap(randomAccess, interTransactions, work_mem); - ExtractAndExecuteLocalAndRemoteTasks(scanState, prunedTaskList, - ROW_MODIFY_COMMUTATIVE, - hasReturning); + + TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState); + ExecuteTaskListIntoTupleStore(ROW_MODIFY_COMMUTATIVE, prunedTaskList, + tupleDescriptor, scanState->tuplestorestate, + hasReturning); if (SortReturning && hasReturning) { diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index bd509748a..2ac88d173 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -81,6 +81,7 @@ #include "distributed/commands/utility_hook.h" #include "distributed/citus_custom_scan.h" #include "distributed/citus_ruleutils.h" +#include "distributed/query_utils.h" #include "distributed/deparse_shard_query.h" #include "distributed/listutils.h" #include "distributed/local_executor.h" @@ -116,6 +117,8 @@ static uint64 ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString, Tuplestorestate *tupleStoreState, ParamListInfo paramListInfo); static void LogLocalCommand(Task *task); +static uint64 LocallyPlanAndExecuteMultipleQueries(List *queryStrings, + Tuplestorestate *tupleStoreState); static void ExtractParametersForLocalExecution(ParamListInfo paramListInfo, Oid **parameterTypes, const char ***parameterValues); @@ -221,10 +224,25 @@ ExecuteLocalTaskListExtended(List *taskList, ParamListInfo orig_paramListInfo, taskParameterTypes = NULL; } + /* + * for concatenated strings, we set queryStringList so that we can access + * each query string. + */ + if (GetTaskQueryType(task) == TASK_QUERY_TEXT_LIST) + { + List *queryStringList = task->taskQuery.data.queryStringList; + LogLocalCommand(task); + totalRowsProcessed += LocallyPlanAndExecuteMultipleQueries( + queryStringList, + tupleStoreState); + continue; + } + Query *shardQuery = ParseQueryString(TaskQueryStringForAllPlacements(task), taskParameterTypes, taskNumParams); + int cursorOptions = 0; /* @@ -261,50 +279,31 @@ ExecuteLocalTaskListExtended(List *taskList, ParamListInfo orig_paramListInfo, /* - * ExtractAndExecuteLocalAndRemoteTasks extracts local and remote tasks - * if local execution can be used and executes them. + * LocallyPlanAndExecuteMultipleQueries plans and executes the given query strings + * one by one. */ -uint64 -ExtractAndExecuteLocalAndRemoteTasks(CitusScanState *scanState, - List *taskList, RowModifyLevel rowModifyLevel, bool - hasReturning) +static uint64 +LocallyPlanAndExecuteMultipleQueries(List *queryStrings, Tuplestorestate *tupleStoreState) { - uint64 processedRows = 0; - List *localTaskList = NIL; - List *remoteTaskList = NIL; - TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState); - - if (ShouldExecuteTasksLocally(taskList)) + char *queryString = NULL; + uint64 totalProcessedRows = 0; + if (tupleStoreState == NULL) { - bool readOnlyPlan = false; - - /* set local (if any) & remote tasks */ - ExtractLocalAndRemoteTasks(readOnlyPlan, taskList, &localTaskList, - &remoteTaskList); - EState *estate = ScanStateGetExecutorState(scanState); - processedRows += ExecuteLocalTaskListExtended( - localTaskList, - estate->es_param_list_info, - scanState->distributedPlan, - scanState->tuplestorestate - ); + tupleStoreState = tuplestore_begin_heap(true, false, work_mem); } - else + foreach_ptr(queryString, queryStrings) { - /* all tasks should be executed via remote connections */ - remoteTaskList = taskList; + Query *shardQuery = ParseQueryString(queryString, + NULL, + 0); + int cursorOptions = 0; + ParamListInfo paramListInfo = NULL; + PlannedStmt *localPlan = planner(shardQuery, cursorOptions, paramListInfo); + totalProcessedRows += ExecuteLocalTaskPlan(localPlan, queryString, + tupleStoreState, + paramListInfo); } - - /* execute remote tasks if any */ - if (list_length(remoteTaskList) > 0) - { - processedRows += ExecuteTaskListIntoTupleStore(rowModifyLevel, remoteTaskList, - tupleDescriptor, - scanState->tuplestorestate, - hasReturning); - } - - return processedRows; + return totalProcessedRows; } diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index 34b6c4ad8..32d8d805a 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -546,8 +546,8 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements, */ poolSize = MaxAdaptiveExecutorPoolSize; } - - ExecuteTaskList(ROW_MODIFY_NONE, taskList, poolSize); + bool localExecutionSupported = true; + ExecuteTaskList(ROW_MODIFY_NONE, taskList, poolSize, localExecutionSupported); } diff --git a/src/backend/distributed/planner/local_plan_cache.c b/src/backend/distributed/planner/local_plan_cache.c index 582f34597..cfac1abf1 100644 --- a/src/backend/distributed/planner/local_plan_cache.c +++ b/src/backend/distributed/planner/local_plan_cache.c @@ -113,7 +113,7 @@ CacheLocalPlanForShardQuery(Task *task, DistributedPlan *originalDistributedPlan PlannedStmt * GetCachedLocalPlan(Task *task, DistributedPlan *distributedPlan) { - if (distributedPlan->workerJob == NULL) + if (distributedPlan == NULL || distributedPlan->workerJob == NULL) { return NULL; } diff --git a/src/include/distributed/adaptive_executor.h b/src/include/distributed/adaptive_executor.h index b55271c4d..3dd804e06 100644 --- a/src/include/distributed/adaptive_executor.h +++ b/src/include/distributed/adaptive_executor.h @@ -11,7 +11,7 @@ extern int MaxAdaptiveExecutorPoolSize; extern int ExecutorSlowStartInterval; extern uint64 ExecuteTaskList(RowModifyLevel modLevel, List *taskList, - int targetPoolSize); + int targetPoolSize, bool localExecutionSupported); extern uint64 ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList, int targetPoolSize, List *jobIdList); diff --git a/src/include/distributed/local_executor.h b/src/include/distributed/local_executor.h index f4301a888..6a3d1ed34 100644 --- a/src/include/distributed/local_executor.h +++ b/src/include/distributed/local_executor.h @@ -21,9 +21,6 @@ extern bool TransactionAccessedLocalPlacement; extern bool TransactionConnectedToLocalGroup; /* extern function declarations */ -extern uint64 ExtractAndExecuteLocalAndRemoteTasks(CitusScanState *scanState, - List *taskList, RowModifyLevel - rowModifyLevel, bool hasReturning); extern uint64 ExecuteLocalTaskList(List *taskList, Tuplestorestate *tupleStoreState); extern uint64 ExecuteLocalTaskListExtended(List *taskList, ParamListInfo diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index 448158d59..50bb8c53b 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -79,15 +79,14 @@ extern uint64 ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList, Tuplestorestate *tupleStore, bool hasReturning, int targetPoolSize, TransactionProperties *xactProperties, - List *jobIdList); + List *jobIdList, + bool localExecutionSupported); extern uint64 ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskList, TupleDesc tupleDescriptor, Tuplestorestate *tupleStore, bool hasReturning); extern void ExecuteUtilityTaskListWithoutResults(List *taskList, bool localExecutionSupported); -extern uint64 ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int - targetPoolSize); extern bool IsCitusCustomState(PlanState *planState); extern TupleTableSlot * CitusExecScan(CustomScanState *node); extern TupleTableSlot * ReturnTupleFromTuplestore(CitusScanState *scanState); diff --git a/src/test/regress/bin/normalize.sed b/src/test/regress/bin/normalize.sed index 3e1b2d856..dd419886a 100644 --- a/src/test/regress/bin/normalize.sed +++ b/src/test/regress/bin/normalize.sed @@ -83,6 +83,9 @@ s/_id_other_column_ref_fkey/_id_fkey/g # intermediate_results s/(ERROR.*)pgsql_job_cache\/([0-9]+_[0-9]+_[0-9]+)\/(.*).data/\1pgsql_job_cache\/xx_x_xxx\/\3.data/g +# assign_distributed_transaction id params +s/(NOTICE.*)assign_distributed_transaction_id\([0-9]+, [0-9]+, '.*'\)/\1assign_distributed_transaction_id\(xx, xx, 'xxxxxxx'\)/g + # toast tables s/pg_toast_[0-9]+/pg_toast_xxxxx/g diff --git a/src/test/regress/expected/coordinator_shouldhaveshards.out b/src/test/regress/expected/coordinator_shouldhaveshards.out index 023f58720..2c56fc580 100644 --- a/src/test/regress/expected/coordinator_shouldhaveshards.out +++ b/src/test/regress/expected/coordinator_shouldhaveshards.out @@ -121,9 +121,78 @@ SELECT y FROM test WHERE x = 1; (1 row) END; +SET citus.shard_count TO 6; +SET citus.log_remote_commands TO OFF; +BEGIN; +SET citus.log_local_commands TO ON; +CREATE TABLE dist_table (a int); +INSERT INTO dist_table SELECT * FROM generate_series(1, 100); +-- trigger local execution +SELECT y FROM test WHERE x = 1; +NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1) + y +--------------------------------------------------------------------- + 1 +(1 row) + +-- this should be run locally +SELECT create_distributed_table('dist_table', 'a', colocate_with := 'none'); +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503004, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table (a integer)');SELECT worker_apply_shard_ddl_command (1503004, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503007, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table (a integer)');SELECT worker_apply_shard_ddl_command (1503007, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table OWNER TO postgres') +NOTICE: executing the copy locally for shard xxxxx +NOTICE: Copying data from local table... +NOTICE: executing the copy locally for shard xxxxx + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT count(*) FROM dist_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.dist_table_1503004 dist_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.dist_table_1503007 dist_table WHERE true + count +--------------------------------------------------------------------- + 100 +(1 row) + +ROLLBACK; +CREATE TABLE dist_table (a int); +INSERT INTO dist_table SELECT * FROM generate_series(1, 100); +BEGIN; +SET citus.log_local_commands TO ON; +-- trigger local execution +SELECT y FROM test WHERE x = 1; +NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1) + y +--------------------------------------------------------------------- + 1 +(1 row) + +-- this should be run locally +SELECT create_distributed_table('dist_table', 'a', colocate_with := 'none'); +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503010, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table (a integer)');SELECT worker_apply_shard_ddl_command (1503010, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503013, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table (a integer)');SELECT worker_apply_shard_ddl_command (1503013, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table OWNER TO postgres') +NOTICE: executing the copy locally for shard xxxxx +NOTICE: Copying data from local table... +NOTICE: executing the copy locally for shard xxxxx + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT count(*) FROM dist_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.dist_table_1503010 dist_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.dist_table_1503013 dist_table WHERE true + count +--------------------------------------------------------------------- + 100 +(1 row) + +ROLLBACK; DELETE FROM test; DROP TABLE test; DROP SCHEMA coordinator_shouldhaveshards CASCADE; +NOTICE: drop cascades to table dist_table SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false); ?column? --------------------------------------------------------------------- diff --git a/src/test/regress/expected/local_shard_copy.out b/src/test/regress/expected/local_shard_copy.out index 375f4200a..5feeb5935 100644 --- a/src/test/regress/expected/local_shard_copy.out +++ b/src/test/regress/expected/local_shard_copy.out @@ -489,6 +489,38 @@ BEGIN; COPY reference_table FROM STDIN; ROLLBACK; SET citus.enable_local_execution = 'on'; +CREATE TABLE ref_table(a int); +INSERT INTO ref_table VALUES(1); +BEGIN; +-- trigger local execution +SELECT COUNT(*) FROM reference_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.reference_table_1570000 reference_table + count +--------------------------------------------------------------------- + 0 +(1 row) + +-- shard creation should be done locally +SELECT create_reference_table('ref_table'); +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1330000, 'local_shard_copy', 'CREATE TABLE local_shard_copy.ref_table (a integer)');SELECT worker_apply_shard_ddl_command (1330000, 'local_shard_copy', 'ALTER TABLE local_shard_copy.ref_table OWNER TO postgres') +NOTICE: executing the copy locally for shard xxxxx +NOTICE: Copying data from local table... + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO ref_table VALUES(2); +NOTICE: executing the command locally: INSERT INTO local_shard_copy.ref_table_1330000 (a) VALUES (2) +-- verify that it worked. +SELECT COUNT(*) FROM ref_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.ref_table_1330000 ref_table + count +--------------------------------------------------------------------- + 2 +(1 row) + +ROLLBACK; SET client_min_messages TO ERROR; SET search_path TO public; DROP SCHEMA local_shard_copy CASCADE; diff --git a/src/test/regress/expected/local_shard_utility_command_execution.out b/src/test/regress/expected/local_shard_utility_command_execution.out index 42ee966e4..a35869532 100644 --- a/src/test/regress/expected/local_shard_utility_command_execution.out +++ b/src/test/regress/expected/local_shard_utility_command_execution.out @@ -565,7 +565,22 @@ NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1 -- would error out CREATE TABLE another_dist_table(a int); SELECT create_distributed_table('another_dist_table', 'a', colocate_with:='dist_table'); -ERROR: cannot execute command because a local execution has accessed a placement in the transaction +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500133, 'local_commands_test_schema', 'CREATE TABLE local_commands_test_schema.another_dist_table (a integer)');SELECT worker_apply_shard_ddl_command (1500133, 'local_commands_test_schema', 'ALTER TABLE local_commands_test_schema.another_dist_table OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500136, 'local_commands_test_schema', 'CREATE TABLE local_commands_test_schema.another_dist_table (a integer)');SELECT worker_apply_shard_ddl_command (1500136, 'local_commands_test_schema', 'ALTER TABLE local_commands_test_schema.another_dist_table OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500139, 'local_commands_test_schema', 'CREATE TABLE local_commands_test_schema.another_dist_table (a integer)');SELECT worker_apply_shard_ddl_command (1500139, 'local_commands_test_schema', 'ALTER TABLE local_commands_test_schema.another_dist_table OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500142, 'local_commands_test_schema', 'CREATE TABLE local_commands_test_schema.another_dist_table (a integer)');SELECT worker_apply_shard_ddl_command (1500142, 'local_commands_test_schema', 'ALTER TABLE local_commands_test_schema.another_dist_table OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500145, 'local_commands_test_schema', 'CREATE TABLE local_commands_test_schema.another_dist_table (a integer)');SELECT worker_apply_shard_ddl_command (1500145, 'local_commands_test_schema', 'ALTER TABLE local_commands_test_schema.another_dist_table OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500148, 'local_commands_test_schema', 'CREATE TABLE local_commands_test_schema.another_dist_table (a integer)');SELECT worker_apply_shard_ddl_command (1500148, 'local_commands_test_schema', 'ALTER TABLE local_commands_test_schema.another_dist_table OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500151, 'local_commands_test_schema', 'CREATE TABLE local_commands_test_schema.another_dist_table (a integer)');SELECT worker_apply_shard_ddl_command (1500151, 'local_commands_test_schema', 'ALTER TABLE local_commands_test_schema.another_dist_table OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500154, 'local_commands_test_schema', 'CREATE TABLE local_commands_test_schema.another_dist_table (a integer)');SELECT worker_apply_shard_ddl_command (1500154, 'local_commands_test_schema', 'ALTER TABLE local_commands_test_schema.another_dist_table OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500157, 'local_commands_test_schema', 'CREATE TABLE local_commands_test_schema.another_dist_table (a integer)');SELECT worker_apply_shard_ddl_command (1500157, 'local_commands_test_schema', 'ALTER TABLE local_commands_test_schema.another_dist_table OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500160, 'local_commands_test_schema', 'CREATE TABLE local_commands_test_schema.another_dist_table (a integer)');SELECT worker_apply_shard_ddl_command (1500160, 'local_commands_test_schema', 'ALTER TABLE local_commands_test_schema.another_dist_table OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500163, 'local_commands_test_schema', 'CREATE TABLE local_commands_test_schema.another_dist_table (a integer)');SELECT worker_apply_shard_ddl_command (1500163, 'local_commands_test_schema', 'ALTER TABLE local_commands_test_schema.another_dist_table OWNER TO postgres') + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + COMMIT; --------------------------------------------------------------------- ------------ partitioned tables ------------- @@ -721,7 +736,7 @@ TRUNCATE partitioning_test; DROP TABLE partitioning_test; -- cleanup at exit DROP SCHEMA local_commands_test_schema CASCADE; -NOTICE: drop cascades to 16 other objects +NOTICE: drop cascades to 28 other objects DROP SCHEMA foo_schema; SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false); ?column? diff --git a/src/test/regress/sql/coordinator_shouldhaveshards.sql b/src/test/regress/sql/coordinator_shouldhaveshards.sql index 66979a7c8..60fe634c6 100644 --- a/src/test/regress/sql/coordinator_shouldhaveshards.sql +++ b/src/test/regress/sql/coordinator_shouldhaveshards.sql @@ -60,6 +60,33 @@ ALTER TABLE test DROP COLUMN z; SELECT y FROM test WHERE x = 1; END; + +SET citus.shard_count TO 6; +SET citus.log_remote_commands TO OFF; + +BEGIN; +SET citus.log_local_commands TO ON; +CREATE TABLE dist_table (a int); +INSERT INTO dist_table SELECT * FROM generate_series(1, 100); +-- trigger local execution +SELECT y FROM test WHERE x = 1; +-- this should be run locally +SELECT create_distributed_table('dist_table', 'a', colocate_with := 'none'); +SELECT count(*) FROM dist_table; +ROLLBACK; + +CREATE TABLE dist_table (a int); +INSERT INTO dist_table SELECT * FROM generate_series(1, 100); + +BEGIN; +SET citus.log_local_commands TO ON; +-- trigger local execution +SELECT y FROM test WHERE x = 1; +-- this should be run locally +SELECT create_distributed_table('dist_table', 'a', colocate_with := 'none'); +SELECT count(*) FROM dist_table; +ROLLBACK; + DELETE FROM test; DROP TABLE test; diff --git a/src/test/regress/sql/local_shard_copy.sql b/src/test/regress/sql/local_shard_copy.sql index 28bdffd40..7240c62a0 100644 --- a/src/test/regress/sql/local_shard_copy.sql +++ b/src/test/regress/sql/local_shard_copy.sql @@ -341,6 +341,20 @@ ROLLBACK; SET citus.enable_local_execution = 'on'; +CREATE TABLE ref_table(a int); +INSERT INTO ref_table VALUES(1); + +BEGIN; +-- trigger local execution +SELECT COUNT(*) FROM reference_table; +-- shard creation should be done locally +SELECT create_reference_table('ref_table'); +INSERT INTO ref_table VALUES(2); + +-- verify that it worked. +SELECT COUNT(*) FROM ref_table; +ROLLBACK; + SET client_min_messages TO ERROR; SET search_path TO public; DROP SCHEMA local_shard_copy CASCADE;