diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index fab5e355e..330f2a410 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -887,7 +887,13 @@ ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList, TransactionProperties xactProperties = DecideTransactionPropertiesForTaskList(modLevel, taskList, true); - bool localExecutionSupported = true; + /* + * As we are going to run the tasks outside transaction, we shouldn't use local execution. + * However, there is some problem when using local execution related to + * repartition joins, when we solve that problem, we can execute the tasks + * coming to this path with local execution. See PR:3711 + */ + bool localExecutionSupported = false; return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor, tupleStore, hasReturning, targetPoolSize, &xactProperties, jobIdList, localExecutionSupported); diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index 2ac88d173..52c720060 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -174,6 +174,11 @@ ExecuteLocalTaskListExtended(List *taskList, ParamListInfo orig_paramListInfo, numParams = paramListInfo->numParams; } + if (tupleStoreState == NULL) + { + tupleStoreState = tuplestore_begin_heap(true, false, work_mem); + } + Task *task = NULL; foreach_ptr(task, taskList) { @@ -186,6 +191,8 @@ ExecuteLocalTaskListExtended(List *taskList, ParamListInfo orig_paramListInfo, { TransactionAccessedLocalPlacement = true; } + LogLocalCommand(task); + PlannedStmt *localPlan = GetCachedLocalPlan(task, distributedPlan); @@ -231,7 +238,6 @@ ExecuteLocalTaskListExtended(List *taskList, ParamListInfo orig_paramListInfo, if (GetTaskQueryType(task) == TASK_QUERY_TEXT_LIST) { List *queryStringList = task->taskQuery.data.queryStringList; - LogLocalCommand(task); totalRowsProcessed += LocallyPlanAndExecuteMultipleQueries( queryStringList, tupleStoreState); @@ -256,7 +262,6 @@ ExecuteLocalTaskListExtended(List *taskList, ParamListInfo orig_paramListInfo, localPlan = planner(shardQuery, cursorOptions, paramListInfo); } - LogLocalCommand(task); char *shardQueryString = NULL; if (GetTaskQueryType(task) == TASK_QUERY_TEXT) @@ -287,10 +292,6 @@ LocallyPlanAndExecuteMultipleQueries(List *queryStrings, Tuplestorestate *tupleS { char *queryString = NULL; uint64 totalProcessedRows = 0; - if (tupleStoreState == NULL) - { - tupleStoreState = tuplestore_begin_heap(true, false, work_mem); - } foreach_ptr(queryString, queryStrings) { Query *shardQuery = ParseQueryString(queryString, diff --git a/src/test/regress/bin/normalize.sed b/src/test/regress/bin/normalize.sed index 6d1cf8a99..872f090ca 100644 --- a/src/test/regress/bin/normalize.sed +++ b/src/test/regress/bin/normalize.sed @@ -105,6 +105,9 @@ s/read_intermediate_result\('insert_select_[0-9]+_/read_intermediate_result('ins # ignore job id in repartitioned insert/select s/repartitioned_results_[0-9]+/repartitioned_results_xxxxx/g +# ignore job id in worker_hash_partition_table +s/worker_hash_partition_table \([0-9]+/worker_hash_partition_table \(xxxxxxx/g + # ignore first parameter for citus_extradata_container due to differences between pg11 and pg12 # can be removed when we remove PG_VERSION_NUM >= 120000 s/pg_catalog.citus_extradata_container\([0-9]+/pg_catalog.citus_extradata_container\(XXX/g diff --git a/src/test/regress/expected/coordinator_shouldhaveshards.out b/src/test/regress/expected/coordinator_shouldhaveshards.out index 2c56fc580..ad66d7e90 100644 --- a/src/test/regress/expected/coordinator_shouldhaveshards.out +++ b/src/test/regress/expected/coordinator_shouldhaveshards.out @@ -188,6 +188,21 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinato 100 (1 row) +ROLLBACK; +BEGIN; +SET citus.enable_repartition_joins TO ON; +-- trigger local execution +SELECT y FROM test WHERE x = 1; +NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1) + y +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y; +ERROR: cannot execute command because a local execution has accessed a placement in the transaction +DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally +HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;" ROLLBACK; DELETE FROM test; DROP TABLE test; diff --git a/src/test/regress/sql/coordinator_shouldhaveshards.sql b/src/test/regress/sql/coordinator_shouldhaveshards.sql index 60fe634c6..741c2076a 100644 --- a/src/test/regress/sql/coordinator_shouldhaveshards.sql +++ b/src/test/regress/sql/coordinator_shouldhaveshards.sql @@ -87,6 +87,13 @@ SELECT create_distributed_table('dist_table', 'a', colocate_with := 'none'); SELECT count(*) FROM dist_table; ROLLBACK; +BEGIN; +SET citus.enable_repartition_joins TO ON; +-- trigger local execution +SELECT y FROM test WHERE x = 1; +SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y; +ROLLBACK; + DELETE FROM test; DROP TABLE test;