fix null tupleStoreState case in ExecuteLocalTaskListExtended (#3711)

In case we don't care about the tupleStoreState in
ExecuteLocalTaskListExtended, it could be passed as null. In that case
we will get a seg error. This changes it so that a dummy tuple store
will be created when it is null.

Do not use local execution in ExecuteTaskListOutsideTransaction.
As we are going to run the tasks outside transaction, we shouldn't use local execution.
However, there is some problem when using local execution related to
repartition joins, when we solve that problem, we can execute the tasks
coming to this path with local execution.

Also logging the local command is simplified.

normalize job id in worker_hash_partition_table in test outputs.
pull/3720/head
SaitTalhaNisanci 2020-04-07 11:47:09 +03:00 committed by GitHub
parent a369f9001d
commit a710b3cdc5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 39 additions and 7 deletions

View File

@ -887,7 +887,13 @@ ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList,
TransactionProperties xactProperties = TransactionProperties xactProperties =
DecideTransactionPropertiesForTaskList(modLevel, taskList, true); DecideTransactionPropertiesForTaskList(modLevel, taskList, true);
bool localExecutionSupported = true; /*
* As we are going to run the tasks outside transaction, we shouldn't use local execution.
* However, there is some problem when using local execution related to
* repartition joins, when we solve that problem, we can execute the tasks
* coming to this path with local execution. See PR:3711
*/
bool localExecutionSupported = false;
return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor, return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor,
tupleStore, hasReturning, targetPoolSize, tupleStore, hasReturning, targetPoolSize,
&xactProperties, jobIdList, localExecutionSupported); &xactProperties, jobIdList, localExecutionSupported);

View File

@ -174,6 +174,11 @@ ExecuteLocalTaskListExtended(List *taskList, ParamListInfo orig_paramListInfo,
numParams = paramListInfo->numParams; numParams = paramListInfo->numParams;
} }
if (tupleStoreState == NULL)
{
tupleStoreState = tuplestore_begin_heap(true, false, work_mem);
}
Task *task = NULL; Task *task = NULL;
foreach_ptr(task, taskList) foreach_ptr(task, taskList)
{ {
@ -186,6 +191,8 @@ ExecuteLocalTaskListExtended(List *taskList, ParamListInfo orig_paramListInfo,
{ {
TransactionAccessedLocalPlacement = true; TransactionAccessedLocalPlacement = true;
} }
LogLocalCommand(task);
PlannedStmt *localPlan = GetCachedLocalPlan(task, distributedPlan); PlannedStmt *localPlan = GetCachedLocalPlan(task, distributedPlan);
@ -231,7 +238,6 @@ ExecuteLocalTaskListExtended(List *taskList, ParamListInfo orig_paramListInfo,
if (GetTaskQueryType(task) == TASK_QUERY_TEXT_LIST) if (GetTaskQueryType(task) == TASK_QUERY_TEXT_LIST)
{ {
List *queryStringList = task->taskQuery.data.queryStringList; List *queryStringList = task->taskQuery.data.queryStringList;
LogLocalCommand(task);
totalRowsProcessed += LocallyPlanAndExecuteMultipleQueries( totalRowsProcessed += LocallyPlanAndExecuteMultipleQueries(
queryStringList, queryStringList,
tupleStoreState); tupleStoreState);
@ -256,7 +262,6 @@ ExecuteLocalTaskListExtended(List *taskList, ParamListInfo orig_paramListInfo,
localPlan = planner(shardQuery, cursorOptions, paramListInfo); localPlan = planner(shardQuery, cursorOptions, paramListInfo);
} }
LogLocalCommand(task);
char *shardQueryString = NULL; char *shardQueryString = NULL;
if (GetTaskQueryType(task) == TASK_QUERY_TEXT) if (GetTaskQueryType(task) == TASK_QUERY_TEXT)
@ -287,10 +292,6 @@ LocallyPlanAndExecuteMultipleQueries(List *queryStrings, Tuplestorestate *tupleS
{ {
char *queryString = NULL; char *queryString = NULL;
uint64 totalProcessedRows = 0; uint64 totalProcessedRows = 0;
if (tupleStoreState == NULL)
{
tupleStoreState = tuplestore_begin_heap(true, false, work_mem);
}
foreach_ptr(queryString, queryStrings) foreach_ptr(queryString, queryStrings)
{ {
Query *shardQuery = ParseQueryString(queryString, Query *shardQuery = ParseQueryString(queryString,

View File

@ -105,6 +105,9 @@ s/read_intermediate_result\('insert_select_[0-9]+_/read_intermediate_result('ins
# ignore job id in repartitioned insert/select # ignore job id in repartitioned insert/select
s/repartitioned_results_[0-9]+/repartitioned_results_xxxxx/g s/repartitioned_results_[0-9]+/repartitioned_results_xxxxx/g
# ignore job id in worker_hash_partition_table
s/worker_hash_partition_table \([0-9]+/worker_hash_partition_table \(xxxxxxx/g
# ignore first parameter for citus_extradata_container due to differences between pg11 and pg12 # ignore first parameter for citus_extradata_container due to differences between pg11 and pg12
# can be removed when we remove PG_VERSION_NUM >= 120000 # can be removed when we remove PG_VERSION_NUM >= 120000
s/pg_catalog.citus_extradata_container\([0-9]+/pg_catalog.citus_extradata_container\(XXX/g s/pg_catalog.citus_extradata_container\([0-9]+/pg_catalog.citus_extradata_container\(XXX/g

View File

@ -188,6 +188,21 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinato
100 100
(1 row) (1 row)
ROLLBACK;
BEGIN;
SET citus.enable_repartition_joins TO ON;
-- trigger local execution
SELECT y FROM test WHERE x = 1;
NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1)
y
---------------------------------------------------------------------
1
(1 row)
SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y;
ERROR: cannot execute command because a local execution has accessed a placement in the transaction
DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally
HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;"
ROLLBACK; ROLLBACK;
DELETE FROM test; DELETE FROM test;
DROP TABLE test; DROP TABLE test;

View File

@ -87,6 +87,13 @@ SELECT create_distributed_table('dist_table', 'a', colocate_with := 'none');
SELECT count(*) FROM dist_table; SELECT count(*) FROM dist_table;
ROLLBACK; ROLLBACK;
BEGIN;
SET citus.enable_repartition_joins TO ON;
-- trigger local execution
SELECT y FROM test WHERE x = 1;
SELECT count(*) FROM test t1, test t2 WHERE t1.x = t2.y;
ROLLBACK;
DELETE FROM test; DELETE FROM test;
DROP TABLE test; DROP TABLE test;