diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 1f04751bb..34502d5c3 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -1055,14 +1055,6 @@ ExecuteTaskListExtended(ExecutionParams *executionParams) defaultTupleDest, &executionParams->xactProperties, executionParams->jobIdList, executionParams->localExecutionSupported); - /* - * If current transaction accessed local placements and task list includes - * tasks that should be executed locally (accessing any of the local placements), - * then we should error out as it would cause inconsistencies across the - * remote connection and local execution. - */ - EnsureCompatibleLocalExecutionState(execution->remoteTaskList); - /* run the remote execution */ StartDistributedExecution(execution); RunDistributedExecution(execution); diff --git a/src/backend/distributed/executor/repartition_join_execution.c b/src/backend/distributed/executor/repartition_join_execution.c index e1018191e..29d994e59 100644 --- a/src/backend/distributed/executor/repartition_join_execution.c +++ b/src/backend/distributed/executor/repartition_join_execution.c @@ -57,9 +57,6 @@ List * ExecuteDependentTasks(List *topLevelTasks, Job *topLevelJob) { List *allTasks = CreateTaskListForJobTree(topLevelTasks); - - EnsureCompatibleLocalExecutionState(allTasks); - List *jobIds = ExtractJobsInJobTree(topLevelJob); ExecuteTasksInDependencyOrder(allTasks, topLevelTasks, jobIds); @@ -68,25 +65,6 @@ ExecuteDependentTasks(List *topLevelTasks, Job *topLevelJob) } -/* - * EnsureCompatibleLocalExecutionState makes sure that the tasks won't have - * any visibility problems because of local execution. - */ -void -EnsureCompatibleLocalExecutionState(List *taskList) -{ - /* - * We have LOCAL_EXECUTION_REQUIRED check here to avoid unnecessarily - * iterating the task list in AnyTaskAccessesLocalNode. - */ - if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED && - AnyTaskAccessesLocalNode(taskList)) - { - ErrorIfTransactionAccessedPlacementsLocally(); - } -} - - /* * ExtractJobsInJobTree returns all job ids in the job tree * where the given job is root. diff --git a/src/include/distributed/repartition_join_execution.h b/src/include/distributed/repartition_join_execution.h index 3f6be8068..7f2b648cf 100644 --- a/src/include/distributed/repartition_join_execution.h +++ b/src/include/distributed/repartition_join_execution.h @@ -13,7 +13,6 @@ #include "nodes/pg_list.h" extern List * ExecuteDependentTasks(List *taskList, Job *topLevelJob); -extern void EnsureCompatibleLocalExecutionState(List *taskList); #endif /* REPARTITION_JOIN_EXECUTION_H */ diff --git a/src/test/regress/expected/local_shard_execution.out b/src/test/regress/expected/local_shard_execution.out index 781a9c86c..5cb61b2d5 100644 --- a/src/test/regress/expected/local_shard_execution.out +++ b/src/test/regress/expected/local_shard_execution.out @@ -91,6 +91,7 @@ ALTER TABLE abcd DROP COLUMN a; -- connection worker and get ready for the tests \c - - - :worker_1_port SET search_path TO local_shard_execution; +SET citus.enable_unique_job_ids TO off; -- returns true of the distribution key filter -- on the distributed tables (e.g., WHERE key = 1), we'll hit a shard -- placement which is local to this not @@ -733,9 +734,51 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar (1 row) SELECT count(*) FROM distributed_table d1 join distributed_table d2 using(age); -ERROR: cannot execute command because a local execution has accessed a placement in the transaction -DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally -HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;" +NOTICE: executing the command locally: SELECT partition_index, 'repartition_65_1' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_65_1','SELECT age AS column1 FROM local_shard_execution.distributed_table_1470001 d1 WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true,true,true) WHERE rows_written > 0 +NOTICE: executing the command locally: SELECT partition_index, 'repartition_65_3' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_65_3','SELECT age AS column1 FROM local_shard_execution.distributed_table_1470003 d1 WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true,true,true) WHERE rows_written > 0 +NOTICE: executing the command locally: SELECT partition_index, 'repartition_66_1' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_66_1','SELECT age AS column1 FROM local_shard_execution.distributed_table_1470001 d2 WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true,true,true) WHERE rows_written > 0 +NOTICE: executing the command locally: SELECT partition_index, 'repartition_66_3' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_66_3','SELECT age AS column1 FROM local_shard_execution.distributed_table_1470003 d2 WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true,true,true) WHERE rows_written > 0 +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_1_0']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_2_0']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_3_0']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_4_0']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_1_0']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_2_0']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_3_0']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_4_0']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_1_1']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_2_1']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_3_1']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_4_1']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_1_1']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_2_1']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_3_1']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_4_1']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_1_2']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_2_2']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_3_2']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_4_2']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_1_2']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_2_2']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_3_2']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_4_2']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_1_3']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_2_3']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_3_3']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_4_3']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_1_3']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_2_3']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_3_3']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_4_3']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT count(*) AS count FROM (read_intermediate_results('{repartition_65_1_0,repartition_65_2_0,repartition_65_3_0,repartition_65_4_0}'::text[], 'binary'::citus_copy_format) intermediate_result(column1 bigint) JOIN read_intermediate_results('{repartition_66_1_0,repartition_66_2_0,repartition_66_3_0,repartition_66_4_0}'::text[], 'binary'::citus_copy_format) intermediate_result_1(column1 bigint) ON ((intermediate_result.column1 OPERATOR(pg_catalog.=) intermediate_result_1.column1))) WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM (read_intermediate_results('{repartition_65_1_1,repartition_65_2_1,repartition_65_3_1,repartition_65_4_1}'::text[], 'binary'::citus_copy_format) intermediate_result(column1 bigint) JOIN read_intermediate_results('{repartition_66_1_1,repartition_66_2_1,repartition_66_3_1,repartition_66_4_1}'::text[], 'binary'::citus_copy_format) intermediate_result_1(column1 bigint) ON ((intermediate_result.column1 OPERATOR(pg_catalog.=) intermediate_result_1.column1))) WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM (read_intermediate_results('{repartition_65_1_2,repartition_65_2_2,repartition_65_3_2,repartition_65_4_2}'::text[], 'binary'::citus_copy_format) intermediate_result(column1 bigint) JOIN read_intermediate_results('{repartition_66_1_2,repartition_66_2_2,repartition_66_3_2,repartition_66_4_2}'::text[], 'binary'::citus_copy_format) intermediate_result_1(column1 bigint) ON ((intermediate_result.column1 OPERATOR(pg_catalog.=) intermediate_result_1.column1))) WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM (read_intermediate_results('{repartition_65_1_3,repartition_65_2_3,repartition_65_3_3,repartition_65_4_3}'::text[], 'binary'::citus_copy_format) intermediate_result(column1 bigint) JOIN read_intermediate_results('{repartition_66_1_3,repartition_66_2_3,repartition_66_3_3,repartition_66_4_3}'::text[], 'binary'::citus_copy_format) intermediate_result_1(column1 bigint) ON ((intermediate_result.column1 OPERATOR(pg_catalog.=) intermediate_result_1.column1))) WHERE true + count +--------------------------------------------------------------------- + 2 +(1 row) + ROLLBACK; -- a local query is followed by an INSERT..SELECT with re-partitioning BEGIN; diff --git a/src/test/regress/expected/local_shard_execution_replicated.out b/src/test/regress/expected/local_shard_execution_replicated.out index 731c825c3..a67ca29f1 100644 --- a/src/test/regress/expected/local_shard_execution_replicated.out +++ b/src/test/regress/expected/local_shard_execution_replicated.out @@ -693,9 +693,55 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar (1 row) SELECT count(*) FROM distributed_table d1 join distributed_table d2 using(age); -ERROR: cannot execute command because a local execution has accessed a placement in the transaction -DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally -HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;" +NOTICE: executing the command locally: SELECT partition_index, 'repartition_3940832881147968_1' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_3940832881147968_1','SELECT age AS column1 FROM local_shard_execution_replicated.distributed_table_1500001 d1 WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true,true,true) WHERE rows_written > 0 +NOTICE: executing the command locally: SELECT partition_index, 'repartition_3940832881147968_2' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_3940832881147968_2','SELECT age AS column1 FROM local_shard_execution_replicated.distributed_table_1500002 d1 WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true,true,true) WHERE rows_written > 0 +NOTICE: executing the command locally: SELECT partition_index, 'repartition_3940832881147968_3' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_3940832881147968_3','SELECT age AS column1 FROM local_shard_execution_replicated.distributed_table_1500003 d1 WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true,true,true) WHERE rows_written > 0 +NOTICE: executing the command locally: SELECT partition_index, 'repartition_3940832881147968_4' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_3940832881147968_4','SELECT age AS column1 FROM local_shard_execution_replicated.distributed_table_1500004 d1 WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true,true,true) WHERE rows_written > 0 +NOTICE: executing the command locally: SELECT partition_index, 'repartition_3940832881147969_1' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_3940832881147969_1','SELECT age AS column1 FROM local_shard_execution_replicated.distributed_table_1500001 d2 WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true,true,true) WHERE rows_written > 0 +NOTICE: executing the command locally: SELECT partition_index, 'repartition_3940832881147969_2' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_3940832881147969_2','SELECT age AS column1 FROM local_shard_execution_replicated.distributed_table_1500002 d2 WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true,true,true) WHERE rows_written > 0 +NOTICE: executing the command locally: SELECT partition_index, 'repartition_3940832881147969_3' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_3940832881147969_3','SELECT age AS column1 FROM local_shard_execution_replicated.distributed_table_1500003 d2 WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true,true,true) WHERE rows_written > 0 +NOTICE: executing the command locally: SELECT partition_index, 'repartition_3940832881147969_4' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_3940832881147969_4','SELECT age AS column1 FROM local_shard_execution_replicated.distributed_table_1500004 d2 WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true,true,true) WHERE rows_written > 0 +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147968_1_0']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147968_2_0']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147968_3_0']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147968_4_0']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147969_1_0']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147969_2_0']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147969_3_0']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147969_4_0']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147968_1_1']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147968_2_1']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147968_3_1']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147968_4_1']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147969_1_1']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147969_2_1']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147969_3_1']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147969_4_1']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147968_1_2']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147968_2_2']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147968_3_2']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147968_4_2']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147969_1_2']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147969_2_2']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147969_3_2']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147969_4_2']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147968_1_3']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147968_2_3']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147968_3_3']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147968_4_3']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147969_1_3']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147969_2_3']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147969_3_3']::text[],'localhost',57637) bytes +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147969_4_3']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: SELECT count(*) AS count FROM (read_intermediate_results('{repartition_3940832881147968_1_0,repartition_3940832881147968_2_0,repartition_3940832881147968_3_0,repartition_3940832881147968_4_0}'::text[], 'binary'::citus_copy_format) intermediate_result(column1 bigint) JOIN read_intermediate_results('{repartition_3940832881147969_1_0,repartition_3940832881147969_2_0,repartition_3940832881147969_3_0,repartition_3940832881147969_4_0}'::text[], 'binary'::citus_copy_format) intermediate_result_1(column1 bigint) ON ((intermediate_result.column1 OPERATOR(pg_catalog.=) intermediate_result_1.column1))) WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM (read_intermediate_results('{repartition_3940832881147968_1_1,repartition_3940832881147968_2_1,repartition_3940832881147968_3_1,repartition_3940832881147968_4_1}'::text[], 'binary'::citus_copy_format) intermediate_result(column1 bigint) JOIN read_intermediate_results('{repartition_3940832881147969_1_1,repartition_3940832881147969_2_1,repartition_3940832881147969_3_1,repartition_3940832881147969_4_1}'::text[], 'binary'::citus_copy_format) intermediate_result_1(column1 bigint) ON ((intermediate_result.column1 OPERATOR(pg_catalog.=) intermediate_result_1.column1))) WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM (read_intermediate_results('{repartition_3940832881147968_1_2,repartition_3940832881147968_2_2,repartition_3940832881147968_3_2,repartition_3940832881147968_4_2}'::text[], 'binary'::citus_copy_format) intermediate_result(column1 bigint) JOIN read_intermediate_results('{repartition_3940832881147969_1_2,repartition_3940832881147969_2_2,repartition_3940832881147969_3_2,repartition_3940832881147969_4_2}'::text[], 'binary'::citus_copy_format) intermediate_result_1(column1 bigint) ON ((intermediate_result.column1 OPERATOR(pg_catalog.=) intermediate_result_1.column1))) WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM (read_intermediate_results('{repartition_3940832881147968_1_3,repartition_3940832881147968_2_3,repartition_3940832881147968_3_3,repartition_3940832881147968_4_3}'::text[], 'binary'::citus_copy_format) intermediate_result(column1 bigint) JOIN read_intermediate_results('{repartition_3940832881147969_1_3,repartition_3940832881147969_2_3,repartition_3940832881147969_3_3,repartition_3940832881147969_4_3}'::text[], 'binary'::citus_copy_format) intermediate_result_1(column1 bigint) ON ((intermediate_result.column1 OPERATOR(pg_catalog.=) intermediate_result_1.column1))) WHERE true + count +--------------------------------------------------------------------- + 2 +(1 row) + ROLLBACK; -- a local query is followed by an INSERT..SELECT with re-partitioning BEGIN; diff --git a/src/test/regress/sql/local_shard_execution.sql b/src/test/regress/sql/local_shard_execution.sql index c7dad6f00..b68863a7f 100644 --- a/src/test/regress/sql/local_shard_execution.sql +++ b/src/test/regress/sql/local_shard_execution.sql @@ -67,6 +67,7 @@ ALTER TABLE abcd DROP COLUMN a; -- connection worker and get ready for the tests \c - - - :worker_1_port SET search_path TO local_shard_execution; +SET citus.enable_unique_job_ids TO off; -- returns true of the distribution key filter -- on the distributed tables (e.g., WHERE key = 1), we'll hit a shard