From a710b3cdc5c60c449eb9056019f59defcc28a62c Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Tue, 7 Apr 2020 11:47:09 +0300 Subject: [PATCH] fix null tupleStoreState case in ExecuteLocalTaskListExtended (#3711) In case we don't care about the tupleStoreState in ExecuteLocalTaskListExtended, it could be passed as null. In that case we will get a seg error. This changes it so that a dummy tuple store will be created when it is null. Do not use local execution in ExecuteTaskListOutsideTransaction. As we are going to run the tasks outside transaction, we shouldn't use local execution. However, there is some problem when using local execution related to repartition joins, when we solve that problem, we can execute the tasks coming to this path with local execution. Also logging the local command is simplified. normalize job id in worker_hash_partition_table in test outputs. --- .../distributed/executor/adaptive_executor.c | 8 +++++++- src/backend/distributed/executor/local_executor.c | 13 +++++++------ src/test/regress/bin/normalize.sed | 3 +++ .../expected/coordinator_shouldhaveshards.out | 15 +++++++++++++++ .../regress/sql/coordinator_shouldhaveshards.sql | 7 +++++++ 5 files changed, 39 insertions(+), 7 deletions(-) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index fab5e355e..330f2a410 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -887,7 +887,13 @@ ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList, TransactionProperties xactProperties = DecideTransactionPropertiesForTaskList(modLevel, taskList, true); - bool localExecutionSupported = true; + /* + * As we are going to run the tasks outside transaction, we shouldn't use local execution. + * However, there is some problem when using local execution related to + * repartition joins, when we solve that problem, we can execute the tasks + * coming to this path with local execution. See PR:3711 + */ + bool localExecutionSupported = false; return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor, tupleStore, hasReturning, targetPoolSize, &xactProperties, jobIdList, localExecutionSupported); diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index 2ac88d173..52c720060 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -174,6 +174,11 @@ ExecuteLocalTaskListExtended(List *taskList, ParamListInfo orig_paramListInfo, numParams = paramListInfo->numParams; } + if (tupleStoreState == NULL) + { + tupleStoreState = tuplestore_begin_heap(true, false, work_mem); + } + Task *task = NULL; foreach_ptr(task, taskList) { @@ -186,6 +191,8 @@ ExecuteLocalTaskListExtended(List *taskList, ParamListInfo orig_paramListInfo, { TransactionAccessedLocalPlacement = true; } + LogLocalCommand(task); + PlannedStmt *localPlan = GetCachedLocalPlan(task, distributedPlan); @@ -231,7 +238,6 @@ ExecuteLocalTaskListExtended(List *taskList, ParamListInfo orig_paramListInfo, if (GetTaskQueryType(task) == TASK_QUERY_TEXT_LIST) { List *queryStringList = task->taskQuery.data.queryStringList; - LogLocalCommand(task); totalRowsProcessed += LocallyPlanAndExecuteMultipleQueries( queryStringList, tupleStoreState); @@ -256,7 +262,6 @@ ExecuteLocalTaskListExtended(List *taskList, ParamListInfo orig_paramListInfo, localPlan = planner(shardQuery, cursorOptions, paramListInfo); } - LogLocalCommand(task); char *shardQueryString = NULL; if (GetTaskQueryType(task) == TASK_QUERY_TEXT) @@ -287,10 +292,6 @@ LocallyPlanAndExecuteMultipleQueries(List *queryStrings, Tuplestorestate *tupleS { char *queryString = NULL; uint64 totalProcessedRows = 0; - if (tupleStoreState == NULL) - { - tupleStoreState = tuplestore_begin_heap(true, false, work_mem); - } foreach_ptr(queryString, queryStrings) { Query *shardQuery = ParseQueryString(queryString, diff --git a/src/test/regress/bin/normalize.sed b/src/test/regress/bin/normalize.sed index 6d1cf8a99..872f090ca 100644 --- a/src/test/regress/bin/normalize.sed +++ b/src/test/regress/bin/normalize.sed @@ -105,6 +105,9 @@ s/read_intermediate_result\('insert_select_[0-9]+_/read_intermediate_result('ins # ignore job id in repartitioned insert/select s/repartitioned_results_[0-9]+/repartitioned_results_xxxxx/g +# ignore job id in worker_hash_partition_table +s/worker_hash_partition_table \([0-9]+/worker_hash_partition_table \(xxxxxxx/g + # ignore first parameter for citus_extradata_container due to differences between pg11 and pg12 # can be removed when we remove PG_VERSION_NUM >= 120000 s/pg_catalog.citus_extradata_container\([0-9]+/pg_catalog.citus_extradata_container\(XXX/g diff --git a/src/test/regress/expected/coordinator_shouldhaveshards.out b/src/test/regress/expected/coordinator_shouldhaveshards.out index 2c56fc580..ad66d7e90 100644 --- a/src/test/regress/expected/coordinator_shouldhaveshards.out +++ b/src/test/regress/expected/coordinator_shouldhaveshards.out @@ -188,6 +188,21 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinato 100 (1 row) +ROLLBACK; +BEGIN; +SET citus.enable_repartition_joins TO ON; +-- trigger local execution +SELECT y FROM test WHERE x = 1; +NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1) + y +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y; +ERROR: cannot execute command because a local execution has accessed a placement in the transaction +DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally +HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;" ROLLBACK; DELETE FROM test; DROP TABLE test; diff --git a/src/test/regress/sql/coordinator_shouldhaveshards.sql b/src/test/regress/sql/coordinator_shouldhaveshards.sql index 60fe634c6..741c2076a 100644 --- a/src/test/regress/sql/coordinator_shouldhaveshards.sql +++ b/src/test/regress/sql/coordinator_shouldhaveshards.sql @@ -87,6 +87,13 @@ SELECT create_distributed_table('dist_table', 'a', colocate_with := 'none'); SELECT count(*) FROM dist_table; ROLLBACK; +BEGIN; +SET citus.enable_repartition_joins TO ON; +-- trigger local execution +SELECT y FROM test WHERE x = 1; +SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y; +ROLLBACK; + DELETE FROM test; DROP TABLE test;