From 5de3337b2fe2bc16125714d19b266529a84fec3c Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Wed, 6 Jan 2021 10:41:34 +0100 Subject: [PATCH] Support local execution for INSERT..SELECT with re-partitioning --- .../distributed_intermediate_results.c | 24 ++-- .../executor/insert_select_executor.c | 5 + .../distributed/executor/local_executor.c | 22 +++- src/include/distributed/local_executor.h | 5 - .../expected/coordinator_shouldhaveshards.out | 124 ++++++++++++------ .../expected/local_shard_execution.out | 29 ++-- .../multi_mx_insert_select_repartition.out | 37 +++++- src/test/regress/expected/single_node.out | 24 ++++ .../sql/coordinator_shouldhaveshards.sql | 22 +++- .../regress/sql/local_shard_execution.sql | 11 +- .../multi_mx_insert_select_repartition.sql | 6 +- src/test/regress/sql/single_node.sql | 8 ++ 12 files changed, 231 insertions(+), 86 deletions(-) diff --git a/src/backend/distributed/executor/distributed_intermediate_results.c b/src/backend/distributed/executor/distributed_intermediate_results.c index 699a556a0..0c9bcdff9 100644 --- a/src/backend/distributed/executor/distributed_intermediate_results.c +++ b/src/backend/distributed/executor/distributed_intermediate_results.c @@ -48,6 +48,9 @@ typedef struct PartitioningTupleDest CitusTableCacheEntry *targetRelation; + /* MemoryContext in which we add new fragments */ + MemoryContext fragmentContext; + /* list of DistributedResultFragment pointer */ List *fragmentList; @@ -236,7 +239,6 @@ WrapTasksForPartitioning(const char *resultIdPrefix, List *selectTaskList, char *partitionMethodString = targetRelation->partitionMethod == 'h' ? "hash" : "range"; const char *binaryFormatString = binaryFormat ? "true" : "false"; - List *perPlacementQueries = NIL; Task *wrappedSelectTask = copyObject(selectTask); @@ -254,7 +256,6 @@ WrapTasksForPartitioning(const char *resultIdPrefix, List *selectTaskList, quote_literal_cstr(partitionMethodString), minValuesString->data, maxValuesString->data, binaryFormatString); - perPlacementQueries = lappend(perPlacementQueries, wrappedQuery->data); SetTaskQueryString(wrappedSelectTask, wrappedQuery->data); wrappedTaskList = lappend(wrappedTaskList, wrappedSelectTask); @@ -291,6 +292,7 @@ CreatePartitioningTupleDest(CitusTableCacheEntry *targetRelation) PartitioningTupleDest *tupleDest = palloc0(sizeof(PartitioningTupleDest)); tupleDest->targetRelation = targetRelation; tupleDest->tupleDesc = tupleDescriptor; + tupleDest->fragmentContext = CurrentMemoryContext; tupleDest->pub.putTuple = PartitioningTupleDestPutTuple; tupleDest->pub.tupleDescForQuery = PartitioningTupleDestTupleDescForQuery; @@ -309,13 +311,23 @@ PartitioningTupleDestPutTuple(TupleDestination *self, Task *task, HeapTuple heapTuple, uint64 tupleLibpqSize) { PartitioningTupleDest *tupleDest = (PartitioningTupleDest *) self; + ShardPlacement *placement = list_nth(task->taskPlacementList, placementIndex); + /* + * We may be deep inside a nested execution, make sure we can use the + * fragment list at the top. + */ + MemoryContext oldContext = MemoryContextSwitchTo(tupleDest->fragmentContext); + DistributedResultFragment *fragment = TupleToDistributedResultFragment(heapTuple, tupleDest->tupleDesc, tupleDest->targetRelation, placement->nodeId); + tupleDest->fragmentList = lappend(tupleDest->fragmentList, fragment); + + MemoryContextSwitchTo(oldContext); } @@ -477,13 +489,7 @@ ExecuteSelectTasksIntoTupleDest(List *taskList, TupleDestination *tupleDestinati .requires2PC = false }; - /* - * Local execution is not supported because here we use perPlacementQueryStrings. - * Local execution does not know how to handle it. One solution is to extract and set - * queryStringLazy from perPlacementQueryStrings. The extracted one should be the - * query string for the local placement. - */ - bool localExecutionSupported = false; + bool localExecutionSupported = true; ExecutionParams *executionParams = CreateBasicExecutionParams( ROW_MODIFY_READONLY, taskList, targetPoolSize, localExecutionSupported ); diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index c6fd32ea4..618dbab86 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -208,6 +208,11 @@ NonPushableInsertSelectExecScan(CustomScanState *node) hasReturning); executorState->es_processed = rowsInserted; + + if (SortReturning && hasReturning) + { + SortTupleStore(scanState); + } } else if (insertSelectQuery->onConflict || hasReturning) { diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index d49a0d44d..767dc7f27 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -550,7 +550,8 @@ SplitLocalAndRemotePlacements(List *taskPlacementList, List **localTaskPlacement /* * ExecuteLocalTaskPlan gets a planned statement which can be executed locally. * The function simply follows the steps to have a local execution, sets the - * tupleStore if necessary. The function returns the + * tupleStore if necessary. The function returns the number of rows affected in + * case of DML. */ static uint64 ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString, @@ -564,6 +565,13 @@ ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString, RecordNonDistTableAccessesForTask(task); + /* + * Some tuple destinations look at task->taskPlacementList to determine + * where the result came from using the placement index. Since a local + * task can only ever have 1 placement, we set the index to 0. + */ + int localPlacementIndex = 0; + /* * Use the tupleStore provided by the scanState because it is shared accross * the other task executions and the adaptive executor. @@ -574,7 +582,7 @@ ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString, */ DestReceiver *destReceiver = tupleDest ? CreateTupleDestDestReceiver(tupleDest, task, - LOCAL_PLACEMENT_INDEX) : + localPlacementIndex) : CreateDestReceiver(DestNone); /* Create a QueryDesc for the query */ @@ -615,12 +623,12 @@ RecordNonDistTableAccessesForTask(Task *task) if (list_length(taskPlacementList) == 0) { /* - * We need at least one task placement to record relation access. - * FIXME: Unfortunately, it is possible due to - * https://github.com/citusdata/citus/issues/4104. - * We can safely remove this check when above bug is fixed. + * We should never get here, but prefer to throw an error over crashing + * if we're wrong. */ - return; + ereport(ERROR, (errmsg("shard " UINT64_FORMAT " does not have any shard " + "placements", + task->anchorShardId))); } /* diff --git a/src/include/distributed/local_executor.h b/src/include/distributed/local_executor.h index 5f2bcceb7..5b9fd928e 100644 --- a/src/include/distributed/local_executor.h +++ b/src/include/distributed/local_executor.h @@ -14,11 +14,6 @@ #include "distributed/citus_custom_scan.h" #include "distributed/tuple_destination.h" -/* - * Used as TupleDestination->putTuple's placementIndex when executing - * local tasks. - */ -#define LOCAL_PLACEMENT_INDEX -1 /* enabled with GUCs*/ extern bool EnableLocalExecution; diff --git a/src/test/regress/expected/coordinator_shouldhaveshards.out b/src/test/regress/expected/coordinator_shouldhaveshards.out index e6c48f731..00de7eea8 100644 --- a/src/test/regress/expected/coordinator_shouldhaveshards.out +++ b/src/test/regress/expected/coordinator_shouldhaveshards.out @@ -97,6 +97,47 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinato (1 row) END; +-- INSERT..SELECT with re-partitioning after local execution +BEGIN; +INSERT INTO test VALUES (0,1000); +CREATE TABLE repart_test (x int primary key, y int); +SELECT create_distributed_table('repart_test','x', colocate_with := 'none'); +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503004, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.repart_test (x integer NOT NULL, y integer) ');SELECT worker_apply_shard_ddl_command (1503004, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.repart_test OWNER TO postgres');SELECT worker_apply_shard_ddl_command (1503004, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.repart_test ADD CONSTRAINT repart_test_pkey PRIMARY KEY (x)') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503007, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.repart_test (x integer NOT NULL, y integer) ');SELECT worker_apply_shard_ddl_command (1503007, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.repart_test OWNER TO postgres');SELECT worker_apply_shard_ddl_command (1503007, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.repart_test ADD CONSTRAINT repart_test_pkey PRIMARY KEY (x)') + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO repart_test (x, y) SELECT y, x FROM test; +NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_1503000_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_1503000_to','SELECT y AS x, x AS y FROM coordinator_shouldhaveshards.test_1503000 test WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0 +NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_1503003_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_1503003_to','SELECT y AS x, x AS y FROM coordinator_shouldhaveshards.test_1503003 test WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0 +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.repart_test_1503004 AS citus_table_alias (x, y) SELECT x, y FROM read_intermediate_results('{repartitioned_results_xxxxx_from_1503000_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(x integer, y integer) +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.repart_test_1503007 AS citus_table_alias (x, y) SELECT x, y FROM read_intermediate_results('{repartitioned_results_xxxxx_from_1503003_to_3}'::text[], 'binary'::citus_copy_format) intermediate_result(x integer, y integer) +SELECT y FROM repart_test WHERE x = 1000; + y +--------------------------------------------------------------------- + 0 +(1 row) + +INSERT INTO repart_test (x, y) SELECT y, x FROM test ON CONFLICT (x) DO UPDATE SET y = -1; +NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_1503000_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_1503000_to','SELECT y AS x, x AS y FROM coordinator_shouldhaveshards.test_1503000 test WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0 +NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_1503003_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_1503003_to','SELECT y AS x, x AS y FROM coordinator_shouldhaveshards.test_1503003 test WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0 +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.repart_test_1503004 AS citus_table_alias (x, y) SELECT x, y FROM read_intermediate_results('{repartitioned_results_xxxxx_from_1503000_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(x integer, y integer) ON CONFLICT(x) DO UPDATE SET y = '-1'::integer +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.repart_test_1503007 AS citus_table_alias (x, y) SELECT x, y FROM read_intermediate_results('{repartitioned_results_xxxxx_from_1503003_to_3}'::text[], 'binary'::citus_copy_format) intermediate_result(x integer, y integer) ON CONFLICT(x) DO UPDATE SET y = '-1'::integer +SELECT y FROM repart_test WHERE x = 1000; + y +--------------------------------------------------------------------- + -1 +(1 row) + +ROLLBACK; +-- INSERT..SELECT with re-partitioning in EXPLAIN ANALYZE after local execution +BEGIN; +INSERT INTO test VALUES (0,1000); +EXPLAIN (COSTS FALSE, ANALYZE TRUE, TIMING FALSE, SUMMARY FALSE) INSERT INTO test (x, y) SELECT y, x FROM test; +ERROR: EXPLAIN ANALYZE is currently not supported for INSERT ... SELECT commands with repartitioning +ROLLBACK; -- DDL connects to locahost ALTER TABLE test ADD COLUMN z int; -- DDL after local execution @@ -140,8 +181,8 @@ NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshar -- this should be run locally SELECT create_distributed_table('dist_table', 'a', colocate_with := 'none'); -NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503004, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table (a integer) ');SELECT worker_apply_shard_ddl_command (1503004, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table OWNER TO postgres') -NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503007, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table (a integer) ');SELECT worker_apply_shard_ddl_command (1503007, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503008, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table (a integer) ');SELECT worker_apply_shard_ddl_command (1503008, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503011, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table (a integer) ');SELECT worker_apply_shard_ddl_command (1503011, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table OWNER TO postgres') NOTICE: executing the copy locally for shard xxxxx NOTICE: Copying data from local table... NOTICE: executing the copy locally for shard xxxxx @@ -154,8 +195,8 @@ HINT: To remove the local data, run: SELECT truncate_local_data_after_distribut (1 row) SELECT count(*) FROM dist_table; -NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.dist_table_1503004 dist_table WHERE true -NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.dist_table_1503007 dist_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.dist_table_1503008 dist_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.dist_table_1503011 dist_table WHERE true count --------------------------------------------------------------------- 100 @@ -176,8 +217,8 @@ NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshar -- this should be run locally SELECT create_distributed_table('dist_table', 'a', colocate_with := 'none'); -NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503010, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table (a integer) ');SELECT worker_apply_shard_ddl_command (1503010, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table OWNER TO postgres') -NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503013, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table (a integer) ');SELECT worker_apply_shard_ddl_command (1503013, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503014, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table (a integer) ');SELECT worker_apply_shard_ddl_command (1503014, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503017, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table (a integer) ');SELECT worker_apply_shard_ddl_command (1503017, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table OWNER TO postgres') NOTICE: executing the copy locally for shard xxxxx NOTICE: Copying data from local table... NOTICE: executing the copy locally for shard xxxxx @@ -190,8 +231,8 @@ HINT: To remove the local data, run: SELECT truncate_local_data_after_distribut (1 row) SELECT count(*) FROM dist_table; -NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.dist_table_1503010 dist_table WHERE true -NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.dist_table_1503013 dist_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.dist_table_1503014 dist_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.dist_table_1503017 dist_table WHERE true count --------------------------------------------------------------------- 100 @@ -248,7 +289,7 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinato (1 row) SELECT * FROM ref JOIN local ON (a = x); -NOTICE: executing the command locally: SELECT ref.a, ref.b, local.x, local.y FROM (coordinator_shouldhaveshards.ref_1503016 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) +NOTICE: executing the command locally: SELECT ref.a, ref.b, local.x, local.y FROM (coordinator_shouldhaveshards.ref_1503020 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) a | b | x | y --------------------------------------------------------------------- (0 rows) @@ -268,7 +309,7 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinato TRUNCATE ref; NOTICE: executing the command locally: TRUNCATE TABLE coordinator_shouldhaveshards.ref_xxxxx CASCADE SELECT * FROM ref JOIN local ON (a = x); -NOTICE: executing the command locally: SELECT ref.a, ref.b, local.x, local.y FROM (coordinator_shouldhaveshards.ref_1503016 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) +NOTICE: executing the command locally: SELECT ref.a, ref.b, local.x, local.y FROM (coordinator_shouldhaveshards.ref_1503020 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) a | b | x | y --------------------------------------------------------------------- (0 rows) @@ -284,10 +325,10 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinato (1 row) INSERT INTO ref VALUES (1,2); -NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503016 (a, b) VALUES (1, 2) +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503020 (a, b) VALUES (1, 2) INSERT INTO local VALUES (1,2); SELECT * FROM ref JOIN local ON (a = x); -NOTICE: executing the command locally: SELECT ref.a, ref.b, local.x, local.y FROM (coordinator_shouldhaveshards.ref_1503016 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) +NOTICE: executing the command locally: SELECT ref.a, ref.b, local.x, local.y FROM (coordinator_shouldhaveshards.ref_1503020 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) a | b | x | y --------------------------------------------------------------------- 1 | 2 | 1 | 2 @@ -309,8 +350,8 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinato WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETURNING *), c AS (INSERT INTO ref VALUES (3,2) RETURNING *), d AS (SELECT count(*) FROM ref JOIN local ON (a = x)) SELECT * FROM a, b, c, d ORDER BY x,y,a,b; NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true -NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503016 (a, b) VALUES (3, 2) RETURNING a, b -NOTICE: executing the command locally: SELECT count(*) AS count FROM (coordinator_shouldhaveshards.ref_1503016 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503020 (a, b) VALUES (3, 2) RETURNING a, b +NOTICE: executing the command locally: SELECT count(*) AS count FROM (coordinator_shouldhaveshards.ref_1503020 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) NOTICE: executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.count FROM (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) a, (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) b, (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) c, (SELECT intermediate_result.count FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) d ORDER BY b.x, b.y, c.a, c.b count | x | y | a | b | count --------------------------------------------------------------------- @@ -320,7 +361,7 @@ NOTICE: executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.co TRUNCATE ref; NOTICE: executing the command locally: TRUNCATE TABLE coordinator_shouldhaveshards.ref_xxxxx CASCADE SELECT * FROM ref JOIN local ON (a = x); -NOTICE: executing the command locally: SELECT ref.a, ref.b, local.x, local.y FROM (coordinator_shouldhaveshards.ref_1503016 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) +NOTICE: executing the command locally: SELECT ref.a, ref.b, local.x, local.y FROM (coordinator_shouldhaveshards.ref_1503020 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) a | b | x | y --------------------------------------------------------------------- (0 rows) @@ -330,8 +371,8 @@ NOTICE: executing the command locally: SELECT ref.a, ref.b, local.x, local.y FR WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETURNING *), c AS (INSERT INTO ref VALUES (3,2) RETURNING *), d AS (SELECT count(*) FROM ref JOIN local ON (a = x)) SELECT * FROM a, b, c, d ORDER BY x,y,a,b; NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true -NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503016 (a, b) VALUES (3, 2) RETURNING a, b -NOTICE: executing the command locally: SELECT count(*) AS count FROM (coordinator_shouldhaveshards.ref_1503016 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503020 (a, b) VALUES (3, 2) RETURNING a, b +NOTICE: executing the command locally: SELECT count(*) AS count FROM (coordinator_shouldhaveshards.ref_1503020 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) NOTICE: executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.count FROM (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) a, (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) b, (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) c, (SELECT intermediate_result.count FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) d ORDER BY b.x, b.y, c.a, c.b count | x | y | a | b | count --------------------------------------------------------------------- @@ -345,8 +386,8 @@ BEGIN; WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETURNING *), c AS (INSERT INTO ref VALUES (3,2) RETURNING *), d AS (SELECT count(*) FROM ref JOIN local ON (a = x)) SELECT * FROM a, b, c, d ORDER BY x,y,a,b; NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true -NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503016 (a, b) VALUES (3, 2) RETURNING a, b -NOTICE: executing the command locally: SELECT count(*) AS count FROM (coordinator_shouldhaveshards.ref_1503016 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503020 (a, b) VALUES (3, 2) RETURNING a, b +NOTICE: executing the command locally: SELECT count(*) AS count FROM (coordinator_shouldhaveshards.ref_1503020 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) NOTICE: executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.count FROM (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) a, (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) b, (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) c, (SELECT intermediate_result.count FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) d ORDER BY b.x, b.y, c.a, c.b count | x | y | a | b | count --------------------------------------------------------------------- @@ -360,8 +401,8 @@ BEGIN; WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETURNING *), c AS (INSERT INTO ref SELECT *,* FROM generate_series(1,10) RETURNING *), d AS (SELECT count(*) FROM ref JOIN local ON (a = x)) SELECT * FROM a, b, c, d ORDER BY x,y,a,b; NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true -NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503016 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_1503016'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) RETURNING citus_table_alias.a, citus_table_alias.b -NOTICE: executing the command locally: SELECT count(*) AS count FROM (coordinator_shouldhaveshards.ref_1503016 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503020 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_1503020'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) RETURNING citus_table_alias.a, citus_table_alias.b +NOTICE: executing the command locally: SELECT count(*) AS count FROM (coordinator_shouldhaveshards.ref_1503020 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) NOTICE: executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.count FROM (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) a, (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) b, (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) c, (SELECT intermediate_result.count FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) d ORDER BY b.x, b.y, c.a, c.b count | x | y | a | b | count --------------------------------------------------------------------- @@ -380,10 +421,10 @@ NOTICE: executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.co ROLLBACK; -- same local table reference table tests, but outside a transaction block INSERT INTO ref VALUES (1,2); -NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503016 (a, b) VALUES (1, 2) +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503020 (a, b) VALUES (1, 2) INSERT INTO local VALUES (1,2); SELECT * FROM ref JOIN local ON (a = x); -NOTICE: executing the command locally: SELECT ref.a, ref.b, local.x, local.y FROM (coordinator_shouldhaveshards.ref_1503016 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) +NOTICE: executing the command locally: SELECT ref.a, ref.b, local.x, local.y FROM (coordinator_shouldhaveshards.ref_1503020 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) a | b | x | y --------------------------------------------------------------------- 1 | 2 | 1 | 2 @@ -394,8 +435,8 @@ NOTICE: executing the command locally: SELECT ref.a, ref.b, local.x, local.y FR WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETURNING *), c AS (INSERT INTO ref VALUES (3,2) RETURNING *), d AS (SELECT count(*) FROM ref JOIN local ON (a = x)) SELECT * FROM a, b, c, d ORDER BY x,y,a,b; NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true -NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503016 (a, b) VALUES (3, 2) RETURNING a, b -NOTICE: executing the command locally: SELECT count(*) AS count FROM (coordinator_shouldhaveshards.ref_1503016 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503020 (a, b) VALUES (3, 2) RETURNING a, b +NOTICE: executing the command locally: SELECT count(*) AS count FROM (coordinator_shouldhaveshards.ref_1503020 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) NOTICE: executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.count FROM (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) a, (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) b, (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) c, (SELECT intermediate_result.count FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) d ORDER BY b.x, b.y, c.a, c.b count | x | y | a | b | count --------------------------------------------------------------------- @@ -416,7 +457,7 @@ HINT: To remove the local data, run: SELECT truncate_local_data_after_distribut (1 row) INSERT INTO dist_table VALUES(1); -NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.dist_table_1503017 (a) VALUES (1) +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.dist_table_1503021 (a) VALUES (1) SELECT * FROM local JOIN dist_table ON (a = x) ORDER BY 1,2,3; x | y | a --------------------------------------------------------------------- @@ -426,7 +467,7 @@ SELECT * FROM local JOIN dist_table ON (a = x) ORDER BY 1,2,3; (3 rows) SELECT * FROM local JOIN dist_table ON (a = x) WHERE a = 1 ORDER BY 1,2,3; -NOTICE: executing the command locally: SELECT local.x, local.y, dist_table.a FROM ((SELECT local_1.x, local_1.y FROM (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) local_1) local JOIN coordinator_shouldhaveshards.dist_table_1503017 dist_table ON ((dist_table.a OPERATOR(pg_catalog.=) local.x))) WHERE (dist_table.a OPERATOR(pg_catalog.=) 1) ORDER BY local.x, local.y, dist_table.a +NOTICE: executing the command locally: SELECT local.x, local.y, dist_table.a FROM ((SELECT local_1.x, local_1.y FROM (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) local_1) local JOIN coordinator_shouldhaveshards.dist_table_1503021 dist_table ON ((dist_table.a OPERATOR(pg_catalog.=) local.x))) WHERE (dist_table.a OPERATOR(pg_catalog.=) 1) ORDER BY local.x, local.y, dist_table.a x | y | a --------------------------------------------------------------------- 1 | 2 | 1 @@ -436,9 +477,9 @@ NOTICE: executing the command locally: SELECT local.x, local.y, dist_table.a FR -- intermediate results are allowed WITH cte_1 AS (SELECT * FROM dist_table ORDER BY 1 LIMIT 1) SELECT * FROM ref JOIN local ON (a = x) JOIN cte_1 ON (local.x = cte_1.a); -NOTICE: executing the command locally: SELECT a FROM coordinator_shouldhaveshards.dist_table_1503017 dist_table WHERE true ORDER BY a LIMIT '1'::bigint -NOTICE: executing the command locally: SELECT a FROM coordinator_shouldhaveshards.dist_table_1503020 dist_table WHERE true ORDER BY a LIMIT '1'::bigint -NOTICE: executing the command locally: SELECT ref.a, ref.b, local.x, local.y, cte_1.a FROM ((coordinator_shouldhaveshards.ref_1503016 ref JOIN (SELECT local_1.x, local_1.y FROM (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) local_1) local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) JOIN (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) cte_1 ON ((local.x OPERATOR(pg_catalog.=) cte_1.a))) +NOTICE: executing the command locally: SELECT a FROM coordinator_shouldhaveshards.dist_table_1503021 dist_table WHERE true ORDER BY a LIMIT '1'::bigint +NOTICE: executing the command locally: SELECT a FROM coordinator_shouldhaveshards.dist_table_1503024 dist_table WHERE true ORDER BY a LIMIT '1'::bigint +NOTICE: executing the command locally: SELECT ref.a, ref.b, local.x, local.y, cte_1.a FROM ((coordinator_shouldhaveshards.ref_1503020 ref JOIN (SELECT local_1.x, local_1.y FROM (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) local_1) local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) JOIN (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) cte_1 ON ((local.x OPERATOR(pg_catalog.=) cte_1.a))) a | b | x | y | a --------------------------------------------------------------------- 1 | 2 | 1 | 2 | 1 @@ -447,7 +488,7 @@ NOTICE: executing the command locally: SELECT ref.a, ref.b, local.x, local.y, c -- full router query with CTE and local WITH cte_1 AS (SELECT * FROM ref LIMIT 1) SELECT * FROM ref JOIN local ON (a = x) JOIN cte_1 ON (local.x = cte_1.a); -NOTICE: executing the command locally: WITH cte_1 AS (SELECT ref_1.a, ref_1.b FROM coordinator_shouldhaveshards.ref_1503016 ref_1 LIMIT 1) SELECT ref.a, ref.b, local.x, local.y, cte_1.a, cte_1.b FROM ((coordinator_shouldhaveshards.ref_1503016 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) JOIN cte_1 ON ((local.x OPERATOR(pg_catalog.=) cte_1.a))) +NOTICE: executing the command locally: WITH cte_1 AS (SELECT ref_1.a, ref_1.b FROM coordinator_shouldhaveshards.ref_1503020 ref_1 LIMIT 1) SELECT ref.a, ref.b, local.x, local.y, cte_1.a, cte_1.b FROM ((coordinator_shouldhaveshards.ref_1503020 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) JOIN cte_1 ON ((local.x OPERATOR(pg_catalog.=) cte_1.a))) a | b | x | y | a | b --------------------------------------------------------------------- 1 | 2 | 1 | 2 | 1 | 2 @@ -467,6 +508,10 @@ BEGIN; -- this will use perPlacementQueryStrings, make sure it works correctly with -- copying task INSERT INTO dist_table SELECT a + 1 FROM dist_table; +NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_1503027_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_1503027_to','SELECT (a OPERATOR(pg_catalog.+) 1) AS a FROM coordinator_shouldhaveshards.dist_table_1503027 dist_table WHERE true',0,'hash','{-2147483648,-1431655766,-715827884,-2,715827880,1431655762}'::text[],'{-1431655767,-715827885,-3,715827879,1431655761,2147483647}'::text[],true) WHERE rows_written > 0 +NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_1503029_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_1503029_to','SELECT (a OPERATOR(pg_catalog.+) 1) AS a FROM coordinator_shouldhaveshards.dist_table_1503029 dist_table WHERE true',0,'hash','{-2147483648,-1431655766,-715827884,-2,715827880,1431655762}'::text[],'{-1431655767,-715827885,-3,715827879,1431655761,2147483647}'::text[],true) WHERE rows_written > 0 +NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_1503030_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_1503030_to','SELECT (a OPERATOR(pg_catalog.+) 1) AS a FROM coordinator_shouldhaveshards.dist_table_1503030 dist_table WHERE true',0,'hash','{-2147483648,-1431655766,-715827884,-2,715827880,1431655762}'::text[],'{-1431655767,-715827885,-3,715827879,1431655761,2147483647}'::text[],true) WHERE rows_written > 0 +NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_1503032_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_1503032_to','SELECT (a OPERATOR(pg_catalog.+) 1) AS a FROM coordinator_shouldhaveshards.dist_table_1503032 dist_table WHERE true',0,'hash','{-2147483648,-1431655766,-715827884,-2,715827880,1431655762}'::text[],'{-1431655767,-715827885,-3,715827879,1431655761,2147483647}'::text[],true) WHERE rows_written > 0 ROLLBACK; SET citus.shard_replication_factor TO 1; BEGIN; @@ -475,10 +520,10 @@ CREATE TABLE dist_table1(a int); -- this will use queryStringList, make sure it works correctly with -- copying task SELECT create_distributed_table('dist_table1', 'a'); -NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503029, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table1 (a integer) ');SELECT worker_apply_shard_ddl_command (1503029, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table1 OWNER TO postgres') -NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503031, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table1 (a integer) ');SELECT worker_apply_shard_ddl_command (1503031, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table1 OWNER TO postgres') -NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503032, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table1 (a integer) ');SELECT worker_apply_shard_ddl_command (1503032, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table1 OWNER TO postgres') -NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503034, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table1 (a integer) ');SELECT worker_apply_shard_ddl_command (1503034, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table1 OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503033, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table1 (a integer) ');SELECT worker_apply_shard_ddl_command (1503033, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table1 OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503035, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table1 (a integer) ');SELECT worker_apply_shard_ddl_command (1503035, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table1 OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503036, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table1 (a integer) ');SELECT worker_apply_shard_ddl_command (1503036, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table1 OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1503038, 'coordinator_shouldhaveshards', 'CREATE TABLE coordinator_shouldhaveshards.dist_table1 (a integer) ');SELECT worker_apply_shard_ddl_command (1503038, 'coordinator_shouldhaveshards', 'ALTER TABLE coordinator_shouldhaveshards.dist_table1 OWNER TO postgres') create_distributed_table --------------------------------------------------------------------- @@ -502,8 +547,8 @@ NOTICE: executing the copy locally for shard xxxxx INSERT INTO ref_table SELECT *, * FROM generate_series(1, 100); NOTICE: executing the copy locally for shard xxxxx SELECT COUNT(*) FROM test JOIN ref_table USING(x); -NOTICE: executing the command locally: SELECT count(*) AS count FROM (coordinator_shouldhaveshards.test_1503000 test JOIN coordinator_shouldhaveshards.ref_table_1503035 ref_table ON ((test.x OPERATOR(pg_catalog.=) ref_table.x))) WHERE true -NOTICE: executing the command locally: SELECT count(*) AS count FROM (coordinator_shouldhaveshards.test_1503003 test JOIN coordinator_shouldhaveshards.ref_table_1503035 ref_table ON ((test.x OPERATOR(pg_catalog.=) ref_table.x))) WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM (coordinator_shouldhaveshards.test_1503000 test JOIN coordinator_shouldhaveshards.ref_table_1503039 ref_table ON ((test.x OPERATOR(pg_catalog.=) ref_table.x))) WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM (coordinator_shouldhaveshards.test_1503003 test JOIN coordinator_shouldhaveshards.ref_table_1503039 ref_table ON ((test.x OPERATOR(pg_catalog.=) ref_table.x))) WHERE true count --------------------------------------------------------------------- 100 @@ -527,6 +572,7 @@ ERROR: could only create 2 of 3 of required shard replicas -- this will create an empty shard with replicas in the two worker nodes SET citus.shard_replication_factor TO 2; SELECT 1 FROM master_create_empty_shard('test_append_table'); +NOTICE: Creating placements for the append partitioned tables on the coordinator is not supported, skipping coordinator ... ?column? --------------------------------------------------------------------- 1 @@ -563,8 +609,8 @@ select create_distributed_table('test_index_creation1', 'tenant_id'); CREATE INDEX ix_test_index_creation5 ON test_index_creation1 USING btree(tenant_id, timeperiod) INCLUDE (field1) WHERE (tenant_id = 100); -NOTICE: executing the command locally: CREATE INDEX ix_test_index_creation5_1503038 ON coordinator_shouldhaveshards.test_index_creation1_1503038 USING btree (tenant_id ,timeperiod ) INCLUDE (field1 )WHERE (tenant_id = 100) -NOTICE: executing the command locally: CREATE INDEX ix_test_index_creation5_1503041 ON coordinator_shouldhaveshards.test_index_creation1_1503041 USING btree (tenant_id ,timeperiod ) INCLUDE (field1 )WHERE (tenant_id = 100) +NOTICE: executing the command locally: CREATE INDEX ix_test_index_creation5_1503042 ON coordinator_shouldhaveshards.test_index_creation1_1503042 USING btree (tenant_id ,timeperiod ) INCLUDE (field1 )WHERE (tenant_id = 100) +NOTICE: executing the command locally: CREATE INDEX ix_test_index_creation5_1503045 ON coordinator_shouldhaveshards.test_index_creation1_1503045 USING btree (tenant_id ,timeperiod ) INCLUDE (field1 )WHERE (tenant_id = 100) -- test if indexes are created SELECT 1 AS created WHERE EXISTS(SELECT * FROM pg_indexes WHERE indexname LIKE '%test_index_creation%'); created diff --git a/src/test/regress/expected/local_shard_execution.out b/src/test/regress/expected/local_shard_execution.out index 241ce0673..56ddff6bd 100644 --- a/src/test/regress/expected/local_shard_execution.out +++ b/src/test/regress/expected/local_shard_execution.out @@ -706,7 +706,7 @@ NOTICE: truncate cascades to table "second_distributed_table_xxxxx" NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.second_distributed_table_xxxxx CASCADE NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.second_distributed_table_xxxxx CASCADE ROLLBACK; --- a local query is followed by a command that cannot be executed locally +-- a local query is followed by an INSERT..SELECT via the coordinator BEGIN; SELECT count(*) FROM distributed_table WHERE key = 1; NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) @@ -733,19 +733,30 @@ ERROR: cannot execute command because a local execution has accessed a placemen DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;" ROLLBACK; --- a local query is followed by a command that cannot be executed locally +-- a local query is followed by an INSERT..SELECT with re-partitioning BEGIN; - SELECT count(*) FROM distributed_table WHERE key = 1; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) + SELECT count(*) FROM distributed_table WHERE key = 6; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution.distributed_table_1470003 distributed_table WHERE (key OPERATOR(pg_catalog.=) 6) count --------------------------------------------------------------------- - 0 + 1 +(1 row) + + INSERT INTO reference_table (key) SELECT -key FROM distributed_table; +NOTICE: executing the command locally: SELECT (OPERATOR(pg_catalog.-) key) AS key FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT (OPERATOR(pg_catalog.-) key) AS key FROM local_shard_execution.distributed_table_1470003 distributed_table WHERE true +NOTICE: executing the copy locally for shard xxxxx + INSERT INTO distributed_table (key) SELECT -key FROM distributed_table; +NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_1470001_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_1470001_to','SELECT (OPERATOR(pg_catalog.-) key) AS key FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0 +NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_1470003_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_1470003_to','SELECT (OPERATOR(pg_catalog.-) key) AS key FROM local_shard_execution.distributed_table_1470003 distributed_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0 +NOTICE: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key) SELECT key FROM read_intermediate_results('{repartitioned_results_xxxxx_from_1470003_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(key integer) + SELECT count(*) FROM distributed_table WHERE key = -6; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE (key OPERATOR(pg_catalog.=) '-6'::integer) + count +--------------------------------------------------------------------- + 1 (1 row) - INSERT INTO distributed_table (key) SELECT key+1 FROM distributed_table; -ERROR: cannot execute command because a local execution has accessed a placement in the transaction -DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally -HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;" ROLLBACK; INSERT INTO distributed_table VALUES (1, '11',21) ON CONFLICT(key) DO UPDATE SET value = '29' RETURNING *; NOTICE: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) VALUES (1, '11'::text, 21) ON CONFLICT(key) DO UPDATE SET value = '29'::text RETURNING citus_table_alias.key, citus_table_alias.value, citus_table_alias.age diff --git a/src/test/regress/expected/multi_mx_insert_select_repartition.out b/src/test/regress/expected/multi_mx_insert_select_repartition.out index 469b08b3a..ce8a8053b 100644 --- a/src/test/regress/expected/multi_mx_insert_select_repartition.out +++ b/src/test/regress/expected/multi_mx_insert_select_repartition.out @@ -78,8 +78,7 @@ EXPLAIN (costs off) INSERT INTO target_table SELECT a, max(b) FROM source_table INSERT INTO target_table SELECT a, max(b) FROM source_table GROUP BY a; SET citus.log_local_commands to on; --- INSERT .. SELECT via repartitioning is not yet support after a local execution, --- hence below two blocks should fail +-- INSERT .. SELECT via repartitioning with local execution BEGIN; select count(*) from source_table WHERE a = 1; NOTICE: executing the command locally: SELECT count(*) AS count FROM multi_mx_insert_select_repartition.source_table_4213581 source_table WHERE (a OPERATOR(pg_catalog.=) 1) @@ -88,10 +87,36 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM multi_mx_i 4 (1 row) - insert into target_table SELECT a*2 FROM source_table; -ERROR: cannot execute command because a local execution has accessed a placement in the transaction -DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally -HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;" + insert into target_table SELECT a*2 FROM source_table RETURNING a; +NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_4213581_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_4213581_to','SELECT (a OPERATOR(pg_catalog.*) 2) AS a FROM multi_mx_insert_select_repartition.source_table_4213581 source_table WHERE true',0,'hash','{-2147483648,-715827883,715827882}'::text[],'{-715827884,715827881,2147483647}'::text[],true) WHERE rows_written > 0 +NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_4213583_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_4213583_to','SELECT (a OPERATOR(pg_catalog.*) 2) AS a FROM multi_mx_insert_select_repartition.source_table_4213583 source_table WHERE true',0,'hash','{-2147483648,-715827883,715827882}'::text[],'{-715827884,715827881,2147483647}'::text[],true) WHERE rows_written > 0 +NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartitioned_results_xxxxx_from_4213582_to_0','repartitioned_results_xxxxx_from_4213584_to_0']::text[],'localhost',57638) bytes +NOTICE: executing the command locally: INSERT INTO multi_mx_insert_select_repartition.target_table_4213585 AS citus_table_alias (a) SELECT a FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213581_to_0,repartitioned_results_xxxxx_from_4213582_to_0,repartitioned_results_xxxxx_from_4213584_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer) RETURNING citus_table_alias.a +NOTICE: executing the command locally: INSERT INTO multi_mx_insert_select_repartition.target_table_4213587 AS citus_table_alias (a) SELECT a FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213581_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer) RETURNING citus_table_alias.a + a +--------------------------------------------------------------------- + 0 + 0 + 0 + 2 + 2 + 2 + 2 + 4 + 4 + 4 + 4 + 6 + 6 + 6 + 6 + 8 + 8 + 8 + 8 + 10 +(20 rows) + ROLLBACK; BEGIN; select count(*) from source_table WHERE a = 1; diff --git a/src/test/regress/expected/single_node.out b/src/test/regress/expected/single_node.out index b7b65dd28..53238612a 100644 --- a/src/test/regress/expected/single_node.out +++ b/src/test/regress/expected/single_node.out @@ -1206,6 +1206,30 @@ NOTICE: executing the command locally: INSERT INTO single_node.another_schema_t NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630510 AS citus_table_alias (a, b) VALUES (3,3), (4,4), (7,7) NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630511 AS citus_table_alias (a, b) VALUES (6,6) NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630512 AS citus_table_alias (a, b) VALUES (2,2) +-- INSERT..SELECT with re-partitioning when using local execution +BEGIN; +INSERT INTO another_schema_table VALUES (1,100); +NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630509 (a, b) VALUES (1, 100) +INSERT INTO another_schema_table VALUES (2,100); +NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630512 (a, b) VALUES (2, 100) +INSERT INTO another_schema_table SELECT b::int, a::int FROM another_schema_table; +NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630509_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630509_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630509 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0 +NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630510_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630510_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630510 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0 +NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630511_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630511_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630511 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0 +NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630512_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630512_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630512 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0 +NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630509 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_90630509_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) +NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630510 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_90630510_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) +NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630511 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_90630509_to_2,repartitioned_results_xxxxx_from_90630511_to_2,repartitioned_results_xxxxx_from_90630512_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) +NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630512 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_results('{repartitioned_results_xxxxx_from_90630512_to_3}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer, b integer) +SELECT * FROM another_schema_table WHERE a = 100 ORDER BY b; +NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630511 another_schema_table WHERE (a OPERATOR(pg_catalog.=) 100) ORDER BY b + a | b +--------------------------------------------------------------------- + 100 | 1 + 100 | 2 +(2 rows) + +ROLLBACK; -- intermediate results WITH cte_1 AS (SELECT * FROM another_schema_table LIMIT 1000) SELECT count(*) FROM cte_1; diff --git a/src/test/regress/sql/coordinator_shouldhaveshards.sql b/src/test/regress/sql/coordinator_shouldhaveshards.sql index a3efb2922..42c5744ce 100644 --- a/src/test/regress/sql/coordinator_shouldhaveshards.sql +++ b/src/test/regress/sql/coordinator_shouldhaveshards.sql @@ -23,7 +23,6 @@ WHERE logicalrelid = 'test'::regclass AND groupid = 0; SET client_min_messages TO LOG; SET citus.log_local_commands TO ON; - -- INSERT..SELECT with COPY under the covers INSERT INTO test SELECT s,s FROM generate_series(2,100) s; @@ -46,6 +45,25 @@ SELECT y FROM test WHERE x = 1; SELECT count(*) FROM test; END; +-- INSERT..SELECT with re-partitioning after local execution +BEGIN; +INSERT INTO test VALUES (0,1000); +CREATE TABLE repart_test (x int primary key, y int); +SELECT create_distributed_table('repart_test','x', colocate_with := 'none'); + +INSERT INTO repart_test (x, y) SELECT y, x FROM test; + +SELECT y FROM repart_test WHERE x = 1000; +INSERT INTO repart_test (x, y) SELECT y, x FROM test ON CONFLICT (x) DO UPDATE SET y = -1; +SELECT y FROM repart_test WHERE x = 1000; +ROLLBACK; + +-- INSERT..SELECT with re-partitioning in EXPLAIN ANALYZE after local execution +BEGIN; +INSERT INTO test VALUES (0,1000); +EXPLAIN (COSTS FALSE, ANALYZE TRUE, TIMING FALSE, SUMMARY FALSE) INSERT INTO test (x, y) SELECT y, x FROM test; +ROLLBACK; + -- DDL connects to locahost ALTER TABLE test ADD COLUMN z int; @@ -60,7 +78,6 @@ ALTER TABLE test DROP COLUMN z; SELECT y FROM test WHERE x = 1; END; - SET citus.shard_count TO 6; SET citus.log_remote_commands TO OFF; @@ -77,6 +94,7 @@ SELECT count(*) FROM dist_table; ROLLBACK; CREATE TABLE dist_table (a int); + INSERT INTO dist_table SELECT * FROM generate_series(1, 100); BEGIN; diff --git a/src/test/regress/sql/local_shard_execution.sql b/src/test/regress/sql/local_shard_execution.sql index 8fdb42149..8d72ac145 100644 --- a/src/test/regress/sql/local_shard_execution.sql +++ b/src/test/regress/sql/local_shard_execution.sql @@ -372,7 +372,7 @@ BEGIN; TRUNCATE distributed_table CASCADE; ROLLBACK; --- a local query is followed by a command that cannot be executed locally +-- a local query is followed by an INSERT..SELECT via the coordinator BEGIN; SELECT count(*) FROM distributed_table WHERE key = 1; @@ -385,11 +385,12 @@ SELECT count(*) FROM distributed_table; SELECT count(*) FROM distributed_table d1 join distributed_table d2 using(age); ROLLBACK; --- a local query is followed by a command that cannot be executed locally +-- a local query is followed by an INSERT..SELECT with re-partitioning BEGIN; - SELECT count(*) FROM distributed_table WHERE key = 1; - - INSERT INTO distributed_table (key) SELECT key+1 FROM distributed_table; + SELECT count(*) FROM distributed_table WHERE key = 6; + INSERT INTO reference_table (key) SELECT -key FROM distributed_table; + INSERT INTO distributed_table (key) SELECT -key FROM distributed_table; + SELECT count(*) FROM distributed_table WHERE key = -6; ROLLBACK; INSERT INTO distributed_table VALUES (1, '11',21) ON CONFLICT(key) DO UPDATE SET value = '29' RETURNING *; diff --git a/src/test/regress/sql/multi_mx_insert_select_repartition.sql b/src/test/regress/sql/multi_mx_insert_select_repartition.sql index e71da4a4f..29b4c2c01 100644 --- a/src/test/regress/sql/multi_mx_insert_select_repartition.sql +++ b/src/test/regress/sql/multi_mx_insert_select_repartition.sql @@ -43,12 +43,10 @@ INSERT INTO target_table SELECT a, max(b) FROM source_table GROUP BY a; SET citus.log_local_commands to on; --- INSERT .. SELECT via repartitioning is not yet support after a local execution, --- hence below two blocks should fail - +-- INSERT .. SELECT via repartitioning with local execution BEGIN; select count(*) from source_table WHERE a = 1; - insert into target_table SELECT a*2 FROM source_table; + insert into target_table SELECT a*2 FROM source_table RETURNING a; ROLLBACK; BEGIN; diff --git a/src/test/regress/sql/single_node.sql b/src/test/regress/sql/single_node.sql index c4f41246e..372b3608d 100644 --- a/src/test/regress/sql/single_node.sql +++ b/src/test/regress/sql/single_node.sql @@ -675,6 +675,14 @@ INSERT INTO another_schema_table SELECT b::int, a::int FROM another_schema_table -- multi-row INSERTs INSERT INTO another_schema_table VALUES (1,1), (2,2), (3,3), (4,4), (5,5),(6,6),(7,7); +-- INSERT..SELECT with re-partitioning when using local execution +BEGIN; +INSERT INTO another_schema_table VALUES (1,100); +INSERT INTO another_schema_table VALUES (2,100); +INSERT INTO another_schema_table SELECT b::int, a::int FROM another_schema_table; +SELECT * FROM another_schema_table WHERE a = 100 ORDER BY b; +ROLLBACK; + -- intermediate results WITH cte_1 AS (SELECT * FROM another_schema_table LIMIT 1000) SELECT count(*) FROM cte_1;