Enable re-partition joins after local execution

disable_normal_joins
Marco Slot 2021-11-10 09:42:59 +01:00
parent 143b01c8b1
commit fbc0d3eda3
6 changed files with 96 additions and 37 deletions

View File

@ -1055,14 +1055,6 @@ ExecuteTaskListExtended(ExecutionParams *executionParams)
defaultTupleDest, &executionParams->xactProperties,
executionParams->jobIdList, executionParams->localExecutionSupported);
/*
* If current transaction accessed local placements and task list includes
* tasks that should be executed locally (accessing any of the local placements),
* then we should error out as it would cause inconsistencies across the
* remote connection and local execution.
*/
EnsureCompatibleLocalExecutionState(execution->remoteTaskList);
/* run the remote execution */
StartDistributedExecution(execution);
RunDistributedExecution(execution);

View File

@ -57,9 +57,6 @@ List *
ExecuteDependentTasks(List *topLevelTasks, Job *topLevelJob)
{
List *allTasks = CreateTaskListForJobTree(topLevelTasks);
EnsureCompatibleLocalExecutionState(allTasks);
List *jobIds = ExtractJobsInJobTree(topLevelJob);
ExecuteTasksInDependencyOrder(allTasks, topLevelTasks, jobIds);
@ -68,25 +65,6 @@ ExecuteDependentTasks(List *topLevelTasks, Job *topLevelJob)
}
/*
* EnsureCompatibleLocalExecutionState makes sure that the tasks won't have
* any visibility problems because of local execution.
*/
void
EnsureCompatibleLocalExecutionState(List *taskList)
{
/*
* We have LOCAL_EXECUTION_REQUIRED check here to avoid unnecessarily
* iterating the task list in AnyTaskAccessesLocalNode.
*/
if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED &&
AnyTaskAccessesLocalNode(taskList))
{
ErrorIfTransactionAccessedPlacementsLocally();
}
}
/*
* ExtractJobsInJobTree returns all job ids in the job tree
* where the given job is root.

View File

@ -13,7 +13,6 @@
#include "nodes/pg_list.h"
extern List * ExecuteDependentTasks(List *taskList, Job *topLevelJob);
extern void EnsureCompatibleLocalExecutionState(List *taskList);
#endif /* REPARTITION_JOIN_EXECUTION_H */

View File

@ -91,6 +91,7 @@ ALTER TABLE abcd DROP COLUMN a;
-- connection worker and get ready for the tests
\c - - - :worker_1_port
SET search_path TO local_shard_execution;
SET citus.enable_unique_job_ids TO off;
-- returns true of the distribution key filter
-- on the distributed tables (e.g., WHERE key = 1), we'll hit a shard
-- placement which is local to this not
@ -733,9 +734,51 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar
(1 row)
SELECT count(*) FROM distributed_table d1 join distributed_table d2 using(age);
ERROR: cannot execute command because a local execution has accessed a placement in the transaction
DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally
HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;"
NOTICE: executing the command locally: SELECT partition_index, 'repartition_65_1' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_65_1','SELECT age AS column1 FROM local_shard_execution.distributed_table_1470001 d1 WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true,true,true) WHERE rows_written > 0
NOTICE: executing the command locally: SELECT partition_index, 'repartition_65_3' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_65_3','SELECT age AS column1 FROM local_shard_execution.distributed_table_1470003 d1 WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true,true,true) WHERE rows_written > 0
NOTICE: executing the command locally: SELECT partition_index, 'repartition_66_1' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_66_1','SELECT age AS column1 FROM local_shard_execution.distributed_table_1470001 d2 WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true,true,true) WHERE rows_written > 0
NOTICE: executing the command locally: SELECT partition_index, 'repartition_66_3' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_66_3','SELECT age AS column1 FROM local_shard_execution.distributed_table_1470003 d2 WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true,true,true) WHERE rows_written > 0
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_1_0']::text[],'localhost',57637) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_2_0']::text[],'localhost',57638) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_3_0']::text[],'localhost',57637) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_4_0']::text[],'localhost',57638) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_1_0']::text[],'localhost',57637) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_2_0']::text[],'localhost',57638) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_3_0']::text[],'localhost',57637) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_4_0']::text[],'localhost',57638) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_1_1']::text[],'localhost',57637) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_2_1']::text[],'localhost',57638) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_3_1']::text[],'localhost',57637) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_4_1']::text[],'localhost',57638) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_1_1']::text[],'localhost',57637) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_2_1']::text[],'localhost',57638) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_3_1']::text[],'localhost',57637) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_4_1']::text[],'localhost',57638) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_1_2']::text[],'localhost',57637) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_2_2']::text[],'localhost',57638) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_3_2']::text[],'localhost',57637) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_4_2']::text[],'localhost',57638) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_1_2']::text[],'localhost',57637) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_2_2']::text[],'localhost',57638) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_3_2']::text[],'localhost',57637) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_4_2']::text[],'localhost',57638) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_1_3']::text[],'localhost',57637) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_2_3']::text[],'localhost',57638) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_3_3']::text[],'localhost',57637) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_65_4_3']::text[],'localhost',57638) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_1_3']::text[],'localhost',57637) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_2_3']::text[],'localhost',57638) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_3_3']::text[],'localhost',57637) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_66_4_3']::text[],'localhost',57638) bytes
NOTICE: executing the command locally: SELECT count(*) AS count FROM (read_intermediate_results('{repartition_65_1_0,repartition_65_2_0,repartition_65_3_0,repartition_65_4_0}'::text[], 'binary'::citus_copy_format) intermediate_result(column1 bigint) JOIN read_intermediate_results('{repartition_66_1_0,repartition_66_2_0,repartition_66_3_0,repartition_66_4_0}'::text[], 'binary'::citus_copy_format) intermediate_result_1(column1 bigint) ON ((intermediate_result.column1 OPERATOR(pg_catalog.=) intermediate_result_1.column1))) WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM (read_intermediate_results('{repartition_65_1_1,repartition_65_2_1,repartition_65_3_1,repartition_65_4_1}'::text[], 'binary'::citus_copy_format) intermediate_result(column1 bigint) JOIN read_intermediate_results('{repartition_66_1_1,repartition_66_2_1,repartition_66_3_1,repartition_66_4_1}'::text[], 'binary'::citus_copy_format) intermediate_result_1(column1 bigint) ON ((intermediate_result.column1 OPERATOR(pg_catalog.=) intermediate_result_1.column1))) WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM (read_intermediate_results('{repartition_65_1_2,repartition_65_2_2,repartition_65_3_2,repartition_65_4_2}'::text[], 'binary'::citus_copy_format) intermediate_result(column1 bigint) JOIN read_intermediate_results('{repartition_66_1_2,repartition_66_2_2,repartition_66_3_2,repartition_66_4_2}'::text[], 'binary'::citus_copy_format) intermediate_result_1(column1 bigint) ON ((intermediate_result.column1 OPERATOR(pg_catalog.=) intermediate_result_1.column1))) WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM (read_intermediate_results('{repartition_65_1_3,repartition_65_2_3,repartition_65_3_3,repartition_65_4_3}'::text[], 'binary'::citus_copy_format) intermediate_result(column1 bigint) JOIN read_intermediate_results('{repartition_66_1_3,repartition_66_2_3,repartition_66_3_3,repartition_66_4_3}'::text[], 'binary'::citus_copy_format) intermediate_result_1(column1 bigint) ON ((intermediate_result.column1 OPERATOR(pg_catalog.=) intermediate_result_1.column1))) WHERE true
count
---------------------------------------------------------------------
2
(1 row)
ROLLBACK;
-- a local query is followed by an INSERT..SELECT with re-partitioning
BEGIN;

View File

@ -693,9 +693,55 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar
(1 row)
SELECT count(*) FROM distributed_table d1 join distributed_table d2 using(age);
ERROR: cannot execute command because a local execution has accessed a placement in the transaction
DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally
HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;"
NOTICE: executing the command locally: SELECT partition_index, 'repartition_3940832881147968_1' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_3940832881147968_1','SELECT age AS column1 FROM local_shard_execution_replicated.distributed_table_1500001 d1 WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true,true,true) WHERE rows_written > 0
NOTICE: executing the command locally: SELECT partition_index, 'repartition_3940832881147968_2' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_3940832881147968_2','SELECT age AS column1 FROM local_shard_execution_replicated.distributed_table_1500002 d1 WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true,true,true) WHERE rows_written > 0
NOTICE: executing the command locally: SELECT partition_index, 'repartition_3940832881147968_3' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_3940832881147968_3','SELECT age AS column1 FROM local_shard_execution_replicated.distributed_table_1500003 d1 WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true,true,true) WHERE rows_written > 0
NOTICE: executing the command locally: SELECT partition_index, 'repartition_3940832881147968_4' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_3940832881147968_4','SELECT age AS column1 FROM local_shard_execution_replicated.distributed_table_1500004 d1 WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true,true,true) WHERE rows_written > 0
NOTICE: executing the command locally: SELECT partition_index, 'repartition_3940832881147969_1' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_3940832881147969_1','SELECT age AS column1 FROM local_shard_execution_replicated.distributed_table_1500001 d2 WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true,true,true) WHERE rows_written > 0
NOTICE: executing the command locally: SELECT partition_index, 'repartition_3940832881147969_2' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_3940832881147969_2','SELECT age AS column1 FROM local_shard_execution_replicated.distributed_table_1500002 d2 WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true,true,true) WHERE rows_written > 0
NOTICE: executing the command locally: SELECT partition_index, 'repartition_3940832881147969_3' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_3940832881147969_3','SELECT age AS column1 FROM local_shard_execution_replicated.distributed_table_1500003 d2 WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true,true,true) WHERE rows_written > 0
NOTICE: executing the command locally: SELECT partition_index, 'repartition_3940832881147969_4' || '_' || partition_index::text , rows_written FROM pg_catalog.worker_partition_query_result('repartition_3940832881147969_4','SELECT age AS column1 FROM local_shard_execution_replicated.distributed_table_1500004 d2 WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true,true,true) WHERE rows_written > 0
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147968_1_0']::text[],'localhost',57637) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147968_2_0']::text[],'localhost',57638) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147968_3_0']::text[],'localhost',57637) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147968_4_0']::text[],'localhost',57638) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147969_1_0']::text[],'localhost',57637) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147969_2_0']::text[],'localhost',57638) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147969_3_0']::text[],'localhost',57637) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147969_4_0']::text[],'localhost',57638) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147968_1_1']::text[],'localhost',57637) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147968_2_1']::text[],'localhost',57638) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147968_3_1']::text[],'localhost',57637) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147968_4_1']::text[],'localhost',57638) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147969_1_1']::text[],'localhost',57637) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147969_2_1']::text[],'localhost',57638) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147969_3_1']::text[],'localhost',57637) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147969_4_1']::text[],'localhost',57638) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147968_1_2']::text[],'localhost',57637) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147968_2_2']::text[],'localhost',57638) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147968_3_2']::text[],'localhost',57637) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147968_4_2']::text[],'localhost',57638) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147969_1_2']::text[],'localhost',57637) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147969_2_2']::text[],'localhost',57638) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147969_3_2']::text[],'localhost',57637) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147969_4_2']::text[],'localhost',57638) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147968_1_3']::text[],'localhost',57637) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147968_2_3']::text[],'localhost',57638) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147968_3_3']::text[],'localhost',57637) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147968_4_3']::text[],'localhost',57638) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147969_1_3']::text[],'localhost',57637) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147969_2_3']::text[],'localhost',57638) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147969_3_3']::text[],'localhost',57637) bytes
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartition_3940832881147969_4_3']::text[],'localhost',57638) bytes
NOTICE: executing the command locally: SELECT count(*) AS count FROM (read_intermediate_results('{repartition_3940832881147968_1_0,repartition_3940832881147968_2_0,repartition_3940832881147968_3_0,repartition_3940832881147968_4_0}'::text[], 'binary'::citus_copy_format) intermediate_result(column1 bigint) JOIN read_intermediate_results('{repartition_3940832881147969_1_0,repartition_3940832881147969_2_0,repartition_3940832881147969_3_0,repartition_3940832881147969_4_0}'::text[], 'binary'::citus_copy_format) intermediate_result_1(column1 bigint) ON ((intermediate_result.column1 OPERATOR(pg_catalog.=) intermediate_result_1.column1))) WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM (read_intermediate_results('{repartition_3940832881147968_1_1,repartition_3940832881147968_2_1,repartition_3940832881147968_3_1,repartition_3940832881147968_4_1}'::text[], 'binary'::citus_copy_format) intermediate_result(column1 bigint) JOIN read_intermediate_results('{repartition_3940832881147969_1_1,repartition_3940832881147969_2_1,repartition_3940832881147969_3_1,repartition_3940832881147969_4_1}'::text[], 'binary'::citus_copy_format) intermediate_result_1(column1 bigint) ON ((intermediate_result.column1 OPERATOR(pg_catalog.=) intermediate_result_1.column1))) WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM (read_intermediate_results('{repartition_3940832881147968_1_2,repartition_3940832881147968_2_2,repartition_3940832881147968_3_2,repartition_3940832881147968_4_2}'::text[], 'binary'::citus_copy_format) intermediate_result(column1 bigint) JOIN read_intermediate_results('{repartition_3940832881147969_1_2,repartition_3940832881147969_2_2,repartition_3940832881147969_3_2,repartition_3940832881147969_4_2}'::text[], 'binary'::citus_copy_format) intermediate_result_1(column1 bigint) ON ((intermediate_result.column1 OPERATOR(pg_catalog.=) intermediate_result_1.column1))) WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM (read_intermediate_results('{repartition_3940832881147968_1_3,repartition_3940832881147968_2_3,repartition_3940832881147968_3_3,repartition_3940832881147968_4_3}'::text[], 'binary'::citus_copy_format) intermediate_result(column1 bigint) JOIN read_intermediate_results('{repartition_3940832881147969_1_3,repartition_3940832881147969_2_3,repartition_3940832881147969_3_3,repartition_3940832881147969_4_3}'::text[], 'binary'::citus_copy_format) intermediate_result_1(column1 bigint) ON ((intermediate_result.column1 OPERATOR(pg_catalog.=) intermediate_result_1.column1))) WHERE true
count
---------------------------------------------------------------------
2
(1 row)
ROLLBACK;
-- a local query is followed by an INSERT..SELECT with re-partitioning
BEGIN;

View File

@ -67,6 +67,7 @@ ALTER TABLE abcd DROP COLUMN a;
-- connection worker and get ready for the tests
\c - - - :worker_1_port
SET search_path TO local_shard_execution;
SET citus.enable_unique_job_ids TO off;
-- returns true of the distribution key filter
-- on the distributed tables (e.g., WHERE key = 1), we'll hit a shard