Merge pull request #2137 from citusdata/marco_updel_subquery

Implement recursive planning for DML statements
pull/2147/head
Marco Slot 2018-05-03 22:22:14 +02:00 committed by GitHub
commit 0f98e4dd2f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 2296 additions and 246 deletions

View File

@ -487,6 +487,8 @@ RouterSequentialModifyExecScan(CustomScanState *node)
BeginOrContinueCoordinatedTransaction(); BeginOrContinueCoordinatedTransaction();
} }
ExecuteSubPlans(distributedPlan);
foreach(taskCell, taskList) foreach(taskCell, taskList)
{ {
Task *task = (Task *) lfirst(taskCell); Task *task = (Task *) lfirst(taskCell);
@ -522,6 +524,7 @@ RouterMultiModifyExecScan(CustomScanState *node)
bool hasReturning = distributedPlan->hasReturning; bool hasReturning = distributedPlan->hasReturning;
bool isModificationQuery = true; bool isModificationQuery = true;
ExecuteSubPlans(distributedPlan);
ExecuteMultipleTasks(scanState, taskList, isModificationQuery, hasReturning); ExecuteMultipleTasks(scanState, taskList, isModificationQuery, hasReturning);
scanState->finishedRemoteScan = true; scanState->finishedRemoteScan = true;

View File

@ -50,17 +50,16 @@ static uint64 NextPlanId = 1;
/* local function forward declarations */ /* local function forward declarations */
static bool NeedsDistributedPlanningWalker(Node *node, void *context); static bool NeedsDistributedPlanningWalker(Node *node, void *context);
static PlannedStmt * CreateDistributedPlan(uint64 planId, PlannedStmt *localPlan, static PlannedStmt * CreateDistributedPlannedStmt(uint64 planId, PlannedStmt *localPlan,
Query *originalQuery, Query *query, Query *originalQuery, Query *query,
ParamListInfo boundParams, ParamListInfo boundParams,
PlannerRestrictionContext * PlannerRestrictionContext *
plannerRestrictionContext); plannerRestrictionContext);
static DistributedPlan * CreateDistributedSelectPlan(uint64 planId, Query *originalQuery, static DistributedPlan * CreateDistributedPlan(uint64 planId, Query *originalQuery,
Query *query, Query *query, ParamListInfo boundParams,
ParamListInfo boundParams, bool hasUnresolvedParams,
bool hasUnresolvedParams, PlannerRestrictionContext *
PlannerRestrictionContext * plannerRestrictionContext);
plannerRestrictionContext);
static Node * ResolveExternalParams(Node *inputNode, ParamListInfo boundParams); static Node * ResolveExternalParams(Node *inputNode, ParamListInfo boundParams);
static void AssignRTEIdentities(Query *queryTree); static void AssignRTEIdentities(Query *queryTree);
@ -146,8 +145,8 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
{ {
uint64 planId = NextPlanId++; uint64 planId = NextPlanId++;
result = CreateDistributedPlan(planId, result, originalQuery, parse, result = CreateDistributedPlannedStmt(planId, result, originalQuery, parse,
boundParams, plannerRestrictionContext); boundParams, plannerRestrictionContext);
} }
} }
PG_CATCH(); PG_CATCH();
@ -467,13 +466,13 @@ IsModifyDistributedPlan(DistributedPlan *distributedPlan)
/* /*
* CreateDistributedPlan encapsulates the logic needed to transform a particular * CreateDistributedPlannedStmt encapsulates the logic needed to transform a particular
* query into a distributed plan. * query into a distributed plan that is encapsulated by a PlannedStmt.
*/ */
static PlannedStmt * static PlannedStmt *
CreateDistributedPlan(uint64 planId, PlannedStmt *localPlan, Query *originalQuery, CreateDistributedPlannedStmt(uint64 planId, PlannedStmt *localPlan, Query *originalQuery,
Query *query, ParamListInfo boundParams, Query *query, ParamListInfo boundParams,
PlannerRestrictionContext *plannerRestrictionContext) PlannerRestrictionContext *plannerRestrictionContext)
{ {
DistributedPlan *distributedPlan = NULL; DistributedPlan *distributedPlan = NULL;
PlannedStmt *resultPlan = NULL; PlannedStmt *resultPlan = NULL;
@ -489,31 +488,9 @@ CreateDistributedPlan(uint64 planId, PlannedStmt *localPlan, Query *originalQuer
plannerRestrictionContext->joinRestrictionContext = plannerRestrictionContext->joinRestrictionContext =
RemoveDuplicateJoinRestrictions(joinRestrictionContext); RemoveDuplicateJoinRestrictions(joinRestrictionContext);
if (IsModifyCommand(query)) distributedPlan =
{ CreateDistributedPlan(planId, originalQuery, query, boundParams,
EnsureModificationsCanRun(); hasUnresolvedParams, plannerRestrictionContext);
if (InsertSelectIntoDistributedTable(originalQuery))
{
distributedPlan =
CreateInsertSelectPlan(originalQuery, plannerRestrictionContext);
}
else
{
/* modifications are always routed through the same planner/executor */
distributedPlan =
CreateModifyPlan(originalQuery, query, plannerRestrictionContext);
}
Assert(distributedPlan);
}
else
{
distributedPlan =
CreateDistributedSelectPlan(planId, originalQuery, query, boundParams,
hasUnresolvedParams,
plannerRestrictionContext);
}
/* /*
* If no plan was generated, prepare a generic error to be emitted. * If no plan was generated, prepare a generic error to be emitted.
@ -580,7 +557,7 @@ CreateDistributedPlan(uint64 planId, PlannedStmt *localPlan, Query *originalQuer
/* /*
* CreateDistributedSelectPlan generates a distributed plan for a SELECT query. * CreateDistributedPlan generates a distributed plan for a query.
* It goes through 3 steps: * It goes through 3 steps:
* *
* 1. Try router planner * 1. Try router planner
@ -589,39 +566,71 @@ CreateDistributedPlan(uint64 planId, PlannedStmt *localPlan, Query *originalQuer
* 3. Logical planner * 3. Logical planner
*/ */
static DistributedPlan * static DistributedPlan *
CreateDistributedSelectPlan(uint64 planId, Query *originalQuery, Query *query, CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamListInfo
ParamListInfo boundParams, bool hasUnresolvedParams, boundParams, bool hasUnresolvedParams,
PlannerRestrictionContext *plannerRestrictionContext) PlannerRestrictionContext *plannerRestrictionContext)
{ {
DistributedPlan *distributedPlan = NULL; DistributedPlan *distributedPlan = NULL;
MultiTreeRoot *logicalPlan = NULL; MultiTreeRoot *logicalPlan = NULL;
List *subPlanList = NIL; List *subPlanList = NIL;
bool hasCtes = originalQuery->cteList != NIL;
/* if (IsModifyCommand(originalQuery))
* For select queries we, if router executor is enabled, first try to
* plan the query as a router query. If not supported, otherwise try
* the full blown plan/optimize/physical planing process needed to
* produce distributed query plans.
*/
distributedPlan = CreateRouterPlan(originalQuery, query,
plannerRestrictionContext);
if (distributedPlan != NULL)
{ {
EnsureModificationsCanRun();
if (InsertSelectIntoDistributedTable(originalQuery))
{
distributedPlan =
CreateInsertSelectPlan(originalQuery, plannerRestrictionContext);
}
else
{
/* modifications are always routed through the same planner/executor */
distributedPlan =
CreateModifyPlan(originalQuery, query, plannerRestrictionContext);
}
/* the functions above always return a plan, possibly with an error */
Assert(distributedPlan);
if (distributedPlan->planningError == NULL) if (distributedPlan->planningError == NULL)
{ {
/* successfully created a router plan */
return distributedPlan; return distributedPlan;
} }
else else
{ {
/*
* For debugging it's useful to display why query was not
* router plannable.
*/
RaiseDeferredError(distributedPlan->planningError, DEBUG1); RaiseDeferredError(distributedPlan->planningError, DEBUG1);
} }
} }
else
{
/*
* For SELECT queries, if the router executor is enabled, we first try
* to plan the query as a router query. If that is not supported, we
* fall back to the full-blown plan/optimize/physical planning process
* needed to produce distributed query plans.
*/
distributedPlan = CreateRouterPlan(originalQuery, query,
plannerRestrictionContext);
if (distributedPlan != NULL)
{
if (distributedPlan->planningError == NULL)
{
/* successfully created a router plan */
return distributedPlan;
}
else
{
/*
* For debugging it's useful to display why query was not
* router plannable.
*/
RaiseDeferredError(distributedPlan->planningError, DEBUG1);
}
}
}
if (hasUnresolvedParams) if (hasUnresolvedParams)
{ {
@ -662,8 +671,13 @@ CreateDistributedSelectPlan(uint64 planId, Query *originalQuery, Query *query,
* We could simplify this code if the logical planner was capable of dealing * We could simplify this code if the logical planner was capable of dealing
* with an original query. In that case, we would only have to filter the * with an original query. In that case, we would only have to filter the
* planner restriction context. * planner restriction context.
*
* Note that we check both for subplans and whether the query had CTEs
* prior to calling GenerateSubplansForSubqueriesAndCTEs. If none of
* the CTEs are referenced then there are no subplans, but we still want
* to retry the router planner.
*/ */
if (list_length(subPlanList) > 0) if (list_length(subPlanList) > 0 || hasCtes)
{ {
Query *newQuery = copyObject(originalQuery); Query *newQuery = copyObject(originalQuery);
bool setPartitionedTablesInherited = false; bool setPartitionedTablesInherited = false;
@ -693,14 +707,24 @@ CreateDistributedSelectPlan(uint64 planId, Query *originalQuery, Query *query,
/* overwrite the old transformed query with the new transformed query */ /* overwrite the old transformed query with the new transformed query */
memcpy(query, newQuery, sizeof(Query)); memcpy(query, newQuery, sizeof(Query));
/* recurse into CreateDistributedSelectPlan with subqueries/CTEs replaced */ /* recurse into CreateDistributedPlan with subqueries/CTEs replaced */
distributedPlan = CreateDistributedSelectPlan(planId, originalQuery, query, NULL, distributedPlan = CreateDistributedPlan(planId, originalQuery, query, NULL, false,
false, plannerRestrictionContext); plannerRestrictionContext);
distributedPlan->subPlanList = subPlanList; distributedPlan->subPlanList = subPlanList;
return distributedPlan; return distributedPlan;
} }
/*
* DML command returns a planning error, even after recursive planning. The
* logical planner cannot handle DML commands so return the plan with the
* error.
*/
if (IsModifyCommand(originalQuery))
{
return distributedPlan;
}
/* /*
* CTEs are stripped from the original query by RecursivelyPlanSubqueriesAndCTEs. * CTEs are stripped from the original query by RecursivelyPlanSubqueriesAndCTEs.
* If we get here and there are still CTEs that means that none of the CTEs are * If we get here and there are still CTEs that means that none of the CTEs are

View File

@ -332,7 +332,7 @@ DistributedInsertSelectSupported(Query *queryTree, RangeTblEntry *insertRte,
} }
} }
if (contain_volatile_functions((Node *) queryTree)) if (FindNodeCheck((Node *) queryTree, CitusIsVolatileFunction))
{ {
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"volatile functions are not allowed in distributed " "volatile functions are not allowed in distributed "

View File

@ -594,14 +594,21 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer
if (rangeTableEntry->rtekind == RTE_RELATION) if (rangeTableEntry->rtekind == RTE_RELATION)
{ {
/* DistTableCacheEntry *distTableEntry = NULL;
* We are sure that the table should be distributed, therefore no need to
* call IsDistributedTable() here and DistributedTableCacheEntry will
* error out if the table is not distributed
*/
DistTableCacheEntry *distTableEntry =
DistributedTableCacheEntry(rangeTableEntry->relid);
if (!IsDistributedTable(rangeTableEntry->relid))
{
StringInfo errorMessage = makeStringInfo();
char *relationName = get_rel_name(rangeTableEntry->relid);
appendStringInfo(errorMessage, "relation %s is not distributed",
relationName);
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errorMessage->data, NULL, NULL);
}
distTableEntry = DistributedTableCacheEntry(rangeTableEntry->relid);
if (distTableEntry->partitionMethod == DISTRIBUTE_BY_NONE) if (distTableEntry->partitionMethod == DISTRIBUTE_BY_NONE)
{ {
referenceTable = true; referenceTable = true;

View File

@ -699,11 +699,11 @@ RecursivelyPlanCTEs(Query *query, RecursivePlanningContext *planningContext)
NULL, NULL); NULL, NULL);
} }
if (cte->cterefcount == 0) if (cte->cterefcount == 0 && subquery->commandType == CMD_SELECT)
{ {
/* /*
* CTEs that aren't referenced aren't executed in postgres. We * SELECT CTEs that aren't referenced aren't executed in postgres.
* don't need to generate a subplan for it and can take the rest * We don't need to generate a subplan for it and can take the rest
* of this iteration off. * of this iteration off.
*/ */
continue; continue;

View File

@ -0,0 +1,151 @@
CREATE SCHEMA cte_nested_modifications;
SET search_path TO cte_nested_modifications, public;
CREATE TABLE tt1(id int, value_1 int);
INSERT INTO tt1 VALUES(1,2),(2,3),(3,4);
SELECT create_distributed_table('tt1','id');
NOTICE: Copying data from local table...
create_distributed_table
--------------------------
(1 row)
CREATE TABLE tt2(id int, value_1 int);
INSERT INTO tt2 VALUES(3,3),(4,4),(5,5);
SELECT create_distributed_table('tt2','id');
NOTICE: Copying data from local table...
create_distributed_table
--------------------------
(1 row)
CREATE TABLE tt3(id int, json_val json);
INSERT INTO tt3 VALUES(1, '{"prod_name":"name_1", "qty":"6"}'),
(2, '{"prod_name":"name_2", "qty":"4"}'),
(3, '{"prod_name":"name_3", "qty":"2"}');
-- DELETE within CTE and use it from UPDATE
BEGIN;
WITH cte_1 AS (
WITH cte_2 AS (
SELECT id as cte2_id
FROM tt1
WHERE value_1 >= 2
)
DELETE FROM tt2
USING cte_2
WHERE tt2.id = cte_2.cte2_id
RETURNING cte2_id - 1 as id
)
UPDATE tt1
SET value_1 = abs(2 + 3.5)
FROM cte_1
WHERE cte_1.id = tt1.id;
SELECT * FROM tt1 ORDER BY id;
id | value_1
----+---------
1 | 2
2 | 6
3 | 4
(3 rows)
ROLLBACK;
-- Similar to test above, now use the CTE in the SET part of the UPDATE
BEGIN;
WITH cte_1 AS (
WITH cte_2 AS (
SELECT id as cte2_id
FROM tt1
WHERE value_1 >= 2
)
DELETE FROM tt2
USING cte_2
WHERE tt2.id = cte_2.cte2_id
RETURNING cte2_id as id
)
UPDATE tt1
SET value_1 = (SELECT max(id) + abs(2 + 3.5) FROM cte_1);
SELECT * FROM tt1 ORDER BY id;
id | value_1
----+---------
1 | 9
2 | 9
3 | 9
(3 rows)
ROLLBACK;
-- Use alias in the definition of CTE, instead of in the RETURNING
BEGIN;
WITH cte_1(id) AS (
WITH cte_2 AS (
SELECT id as cte2_id
FROM tt1
WHERE value_1 >= 2
)
DELETE FROM tt2
USING cte_2
WHERE tt2.id = cte_2.cte2_id
RETURNING cte2_id
)
UPDATE tt1
SET value_1 = (SELECT max(id) + abs(2 + 3.5) FROM cte_1);
SELECT * FROM tt1 ORDER BY id;
id | value_1
----+---------
1 | 9
2 | 9
3 | 9
(3 rows)
ROLLBACK;
-- Update within CTE and use it from Delete
BEGIN;
WITH cte_1 AS (
WITH cte_2 AS (
SELECT id as cte2_id
FROM tt1
WHERE value_1 >= 2
)
UPDATE tt2
SET value_1 = 10
FROM cte_2
WHERE id = cte2_id
RETURNING id, value_1
)
DELETE FROM tt1
USING cte_1
WHERE tt1.id < cte_1.id;
SELECT * FROM tt1 ORDER BY id;
id | value_1
----+---------
3 | 4
(1 row)
ROLLBACK;
-- Similar to test above, but use json column
BEGIN;
WITH cte_1 AS (
WITH cte_2 AS (
SELECT * FROM tt3
)
UPDATE tt2
SET value_1 = (SELECT max((json_val->>'qty')::int) FROM cte_2)
RETURNING id, value_1
)
DELETE FROM tt1
USING cte_1
WHERE tt1.id < cte_1.id;
SELECT * FROM tt1 ORDER BY id;
id | value_1
----+---------
(0 rows)
ROLLBACK;
DROP SCHEMA cte_nested_modifications CASCADE;
NOTICE: drop cascades to 3 other objects
DETAIL: drop cascades to table tt1
drop cascades to table tt2
drop cascades to table tt3

View File

@ -0,0 +1,67 @@
CREATE SCHEMA cte_prepared_modify;
SET search_path TO cte_prepared_modify, public;
CREATE TABLE tt1(id int, value_1 int);
INSERT INTO tt1 VALUES(1,2),(2,3),(3,4);
SELECT create_distributed_table('tt1','id');
NOTICE: Copying data from local table...
create_distributed_table
--------------------------
(1 row)
CREATE TABLE tt2(id int, value_1 int);
INSERT INTO tt2 VALUES(3,3),(4,4),(5,5);
SELECT create_distributed_table('tt2','id');
NOTICE: Copying data from local table...
create_distributed_table
--------------------------
(1 row)
-- Test with prepared statements (parameter used by SET)
PREPARE prepared_test(integer) AS
WITH cte_1 AS(
SELECT * FROM tt1 WHERE id >= 2
)
UPDATE tt2
SET value_1 = $1
FROM cte_1
WHERE tt2.id = cte_1.id;
-- Test with prepared statements (parameter used by WHERE on partition column)
PREPARE prepared_test_2(integer) AS
WITH cte_1 AS(
SELECT * FROM tt1 WHERE id >= 2
)
UPDATE tt2
SET value_1 = (SELECT max(id) FROM cte_1)
WHERE tt2.id = $1;
-- Test with prepared statements (parameter used by WHERE on non-partition column)
PREPARE prepared_test_3(integer) AS
WITH cte_1 AS(
SELECT * FROM tt1 WHERE id >= 2
)
UPDATE tt2
SET value_1 = (SELECT max(id) FROM cte_1)
WHERE tt2.value_1 = $1;
EXECUTE prepared_test(1);
EXECUTE prepared_test(2);
EXECUTE prepared_test(3);
EXECUTE prepared_test(4);
EXECUTE prepared_test(5);
EXECUTE prepared_test(6);
EXECUTE prepared_test(1);
EXECUTE prepared_test(2);
EXECUTE prepared_test(3);
EXECUTE prepared_test(4);
EXECUTE prepared_test(5);
EXECUTE prepared_test(6);
EXECUTE prepared_test_3(1);
EXECUTE prepared_test_3(2);
EXECUTE prepared_test_3(3);
EXECUTE prepared_test_3(4);
EXECUTE prepared_test_3(5);
EXECUTE prepared_test_3(6);
DROP SCHEMA cte_prepared_modify CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table tt1
drop cascades to table tt2

View File

@ -0,0 +1,359 @@
CREATE SCHEMA recursive_dml_queries;
SET search_path TO recursive_dml_queries, public;
SET citus.next_shard_id TO 2370000;
CREATE TABLE recursive_dml_queries.distributed_table (tenant_id text, dept int, info jsonb);
SELECT create_distributed_table('distributed_table', 'tenant_id');
create_distributed_table
--------------------------
(1 row)
CREATE TABLE recursive_dml_queries.second_distributed_table (tenant_id text, dept int, info jsonb);
SELECT create_distributed_table('second_distributed_table', 'tenant_id');
create_distributed_table
--------------------------
(1 row)
CREATE TABLE recursive_dml_queries.reference_table (id text, name text);
SELECT create_reference_table('reference_table');
create_reference_table
------------------------
(1 row)
CREATE TABLE recursive_dml_queries.local_table (id text, name text);
INSERT INTO distributed_table SELECT i::text, i % 10, row_to_json(row(i, i*i)) FROM generate_series (0, 100) i;
INSERT INTO second_distributed_table SELECT i::text, i % 10, row_to_json(row(i, i*i)) FROM generate_series (0, 100) i;
INSERT INTO reference_table SELECT i::text, 'user_' || i FROM generate_series (0, 100) i;
INSERT INTO local_table SELECT i::text, 'user_' || i FROM generate_series (0, 100) i;
CREATE VIEW tenant_ids AS
SELECT
tenant_id, name
FROM
distributed_table, reference_table
WHERE
distributed_table.dept::text = reference_table.id
ORDER BY 2 DESC, 1 DESC;
SET client_min_messages TO DEBUG1;
-- the subquery foo is recursively planned
UPDATE
reference_table
SET
name = 'new_' || name
FROM
(
SELECT
avg(second_distributed_table.tenant_id::int) as avg_tenant_id
FROM
second_distributed_table
) as foo
WHERE
foo.avg_tenant_id::int::text = reference_table.id
RETURNING
reference_table.name;
DEBUG: only reference tables may be queried when targeting a reference table with multi shard UPDATE/DELETE queries with multiple tables
DEBUG: generating subplan 4_1 for subquery SELECT avg((tenant_id)::integer) AS avg_tenant_id FROM recursive_dml_queries.second_distributed_table
DEBUG: Plan 4 query after replacing subqueries and CTEs: UPDATE recursive_dml_queries.reference_table SET name = ('new_'::text || reference_table.name) FROM (SELECT intermediate_result.avg_tenant_id FROM read_intermediate_result('4_1'::text, 'binary'::citus_copy_format) intermediate_result(avg_tenant_id numeric)) foo WHERE (((foo.avg_tenant_id)::integer)::text = reference_table.id) RETURNING reference_table.name
name
-------------
new_user_50
(1 row)
-- the subquery foo is recursively planned
-- but note that the subquery foo itself is pushdownable
UPDATE
second_distributed_table
SET
dept = foo.max_dept * 2
FROM
(
SELECT DISTINCT ON (tenant_id) tenant_id, max(dept) as max_dept FROM
(
SELECT
second_distributed_table.dept, second_distributed_table.tenant_id
FROM
second_distributed_table, distributed_table
WHERE
distributed_table.tenant_id = second_distributed_table.tenant_id
) foo_inner
GROUP BY
tenant_id
ORDER BY 1 DESC
) as foo
WHERE
foo.tenant_id != second_distributed_table.tenant_id
AND second_distributed_table.dept IN (2)
RETURNING
second_distributed_table.tenant_id, second_distributed_table.dept;
DEBUG: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
DEBUG: generating subplan 6_1 for subquery SELECT DISTINCT ON (tenant_id) tenant_id, max(dept) AS max_dept FROM (SELECT second_distributed_table.dept, second_distributed_table.tenant_id FROM recursive_dml_queries.second_distributed_table, recursive_dml_queries.distributed_table WHERE (distributed_table.tenant_id = second_distributed_table.tenant_id)) foo_inner GROUP BY tenant_id ORDER BY tenant_id DESC
DEBUG: Plan 6 query after replacing subqueries and CTEs: UPDATE recursive_dml_queries.second_distributed_table SET dept = (foo.max_dept * 2) FROM (SELECT intermediate_result.tenant_id, intermediate_result.max_dept FROM read_intermediate_result('6_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, max_dept integer)) foo WHERE ((foo.tenant_id <> second_distributed_table.tenant_id) AND (second_distributed_table.dept = 2)) RETURNING second_distributed_table.tenant_id, second_distributed_table.dept
tenant_id | dept
-----------+------
52 | 18
72 | 18
82 | 18
2 | 18
12 | 18
22 | 18
62 | 18
92 | 18
32 | 18
42 | 18
(10 rows)
-- the subquery foo is recursively planned
-- and foo itself is a non colocated subquery and recursively planned
UPDATE
second_distributed_table
SET
dept = foo.tenant_id::int / 4
FROM
(
SELECT DISTINCT foo_inner_1.tenant_id FROM
(
SELECT
second_distributed_table.dept, second_distributed_table.tenant_id
FROM
second_distributed_table, distributed_table
WHERE
distributed_table.tenant_id = second_distributed_table.tenant_id
AND
second_distributed_table.dept IN (3,4)
) foo_inner_1,
(
SELECT
second_distributed_table.tenant_id
FROM
second_distributed_table, distributed_table
WHERE
distributed_table.tenant_id = second_distributed_table.tenant_id
AND
second_distributed_table.dept IN (4,5)
)foo_inner_2
WHERE foo_inner_1.tenant_id != foo_inner_2.tenant_id
) as foo
WHERE
foo.tenant_id != second_distributed_table.tenant_id
AND second_distributed_table.dept IN (3);
DEBUG: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
DEBUG: generating subplan 8_1 for subquery SELECT second_distributed_table.tenant_id FROM recursive_dml_queries.second_distributed_table, recursive_dml_queries.distributed_table WHERE ((distributed_table.tenant_id = second_distributed_table.tenant_id) AND (second_distributed_table.dept = ANY (ARRAY[4, 5])))
DEBUG: generating subplan 8_2 for subquery SELECT DISTINCT foo_inner_1.tenant_id FROM (SELECT second_distributed_table.dept, second_distributed_table.tenant_id FROM recursive_dml_queries.second_distributed_table, recursive_dml_queries.distributed_table WHERE ((distributed_table.tenant_id = second_distributed_table.tenant_id) AND (second_distributed_table.dept = ANY (ARRAY[3, 4])))) foo_inner_1, (SELECT intermediate_result.tenant_id FROM read_intermediate_result('8_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text)) foo_inner_2 WHERE (foo_inner_1.tenant_id <> foo_inner_2.tenant_id)
DEBUG: Plan 8 query after replacing subqueries and CTEs: UPDATE recursive_dml_queries.second_distributed_table SET dept = ((foo.tenant_id)::integer / 4) FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('8_2'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text)) foo WHERE ((foo.tenant_id <> second_distributed_table.tenant_id) AND (second_distributed_table.dept = 3))
-- we currently do not allow local tables directly in modification queries,
-- so the subquery on the local table is recursively planned
UPDATE
distributed_table
SET
dept = avg_tenant_id::int
FROM
(
SELECT
avg(local_table.id::int) as avg_tenant_id
FROM
local_table
) as foo
WHERE
foo.avg_tenant_id::int::text = distributed_table.tenant_id
RETURNING
distributed_table.*;
DEBUG: relation local_table is not distributed
DEBUG: generating subplan 11_1 for subquery SELECT avg((id)::integer) AS avg_tenant_id FROM recursive_dml_queries.local_table
DEBUG: Plan 11 query after replacing subqueries and CTEs: UPDATE recursive_dml_queries.distributed_table SET dept = (foo.avg_tenant_id)::integer FROM (SELECT intermediate_result.avg_tenant_id FROM read_intermediate_result('11_1'::text, 'binary'::citus_copy_format) intermediate_result(avg_tenant_id numeric)) foo WHERE (((foo.avg_tenant_id)::integer)::text = distributed_table.tenant_id) RETURNING distributed_table.tenant_id, distributed_table.dept, distributed_table.info
tenant_id | dept | info
-----------+------+------------------------
50 | 50 | {"f1": 50, "f2": 2500}
(1 row)
-- we currently do not allow views directly in modification queries,
-- so the subquery on the view is recursively planned
UPDATE
distributed_table
SET
dept = avg_tenant_id::int
FROM
(
SELECT
avg(tenant_id::int) as avg_tenant_id
FROM
tenant_ids
) as foo
WHERE
foo.avg_tenant_id::int::text = distributed_table.tenant_id
RETURNING
distributed_table.*;
DEBUG: relation tenant_ids is not distributed
DEBUG: generating subplan 12_1 for subquery SELECT avg((tenant_id)::integer) AS avg_tenant_id FROM (SELECT distributed_table.tenant_id, reference_table.name FROM recursive_dml_queries.distributed_table, recursive_dml_queries.reference_table WHERE ((distributed_table.dept)::text = reference_table.id) ORDER BY reference_table.name DESC, distributed_table.tenant_id DESC) tenant_ids
DEBUG: Plan 12 query after replacing subqueries and CTEs: UPDATE recursive_dml_queries.distributed_table SET dept = (foo.avg_tenant_id)::integer FROM (SELECT intermediate_result.avg_tenant_id FROM read_intermediate_result('12_1'::text, 'binary'::citus_copy_format) intermediate_result(avg_tenant_id numeric)) foo WHERE (((foo.avg_tenant_id)::integer)::text = distributed_table.tenant_id) RETURNING distributed_table.tenant_id, distributed_table.dept, distributed_table.info
tenant_id | dept | info
-----------+------+------------------------
50 | 50 | {"f1": 50, "f2": 2500}
(1 row)
-- there is a lateral join (e.g., correlated subquery) thus the subqueries cannot be
-- recursively planned
UPDATE
second_distributed_table
SET
dept = foo.tenant_id::int / 4
FROM
(
SELECT DISTINCT foo_inner_1.tenant_id FROM
(
SELECT
second_distributed_table.dept, second_distributed_table.tenant_id
FROM
second_distributed_table, distributed_table
WHERE
distributed_table.tenant_id = second_distributed_table.tenant_id
AND
second_distributed_table.dept IN (3,4)
)
foo_inner_1 JOIN LATERAL
(
SELECT
second_distributed_table.tenant_id
FROM
second_distributed_table, distributed_table
WHERE
distributed_table.tenant_id = second_distributed_table.tenant_id
AND foo_inner_1.dept = second_distributed_table.dept
AND
second_distributed_table.dept IN (4,5)
) foo_inner_2
ON (foo_inner_2.tenant_id != foo_inner_1.tenant_id)
) as foo
RETURNING *;
DEBUG: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
-- again a correlated subquery
-- this time an equality on the distribution key exists
-- however recursive planning is prevented due to correlated subqueries
UPDATE
second_distributed_table
SET
dept = foo.tenant_id::int / 4
FROM
(
SELECT baz.tenant_id FROM
(
SELECT
second_distributed_table.dept, second_distributed_table.tenant_id
FROM
second_distributed_table, distributed_table as d1
WHERE
d1.tenant_id = second_distributed_table.tenant_id
AND
second_distributed_table.dept IN (3,4)
AND
second_distributed_table.tenant_id IN
(
SELECT s2.tenant_id
FROM second_distributed_table as s2
GROUP BY d1.tenant_id, s2.tenant_id
)
) as baz
) as foo WHERE second_distributed_table.tenant_id = foo.tenant_id
RETURNING *;
DEBUG: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
-- we don't support subqueries/CTEs inside VALUES
INSERT INTO
second_distributed_table (tenant_id, dept)
VALUES ('3', (WITH vals AS (SELECT 3) select * from vals));
DEBUG: subqueries are not supported within INSERT queries
HINT: Try rewriting your queries with 'INSERT INTO ... SELECT' syntax.
DEBUG: generating subplan 18_1 for CTE vals: SELECT 3
DEBUG: Plan 18 query after replacing subqueries and CTEs: INSERT INTO recursive_dml_queries.second_distributed_table (tenant_id, dept) VALUES ('3'::text, (SELECT vals."?column?" FROM (SELECT intermediate_result."?column?" FROM read_intermediate_result('18_1'::text, 'binary'::citus_copy_format) intermediate_result("?column?" integer)) vals))
DEBUG: subqueries are not supported within INSERT queries
HINT: Try rewriting your queries with 'INSERT INTO ... SELECT' syntax.
ERROR: subqueries are not supported within INSERT queries
HINT: Try rewriting your queries with 'INSERT INTO ... SELECT' syntax.
INSERT INTO
second_distributed_table (tenant_id, dept)
VALUES ('3', (SELECT 3));
DEBUG: subqueries are not supported within INSERT queries
HINT: Try rewriting your queries with 'INSERT INTO ... SELECT' syntax.
ERROR: subqueries are not supported within INSERT queries
HINT: Try rewriting your queries with 'INSERT INTO ... SELECT' syntax.
-- DML with an unreferenced SELECT CTE
WITH cte_1 AS (
WITH cte_2 AS (
SELECT tenant_id as cte2_id
FROM second_distributed_table
WHERE dept >= 2
)
UPDATE distributed_table
SET dept = 10
RETURNING *
)
UPDATE distributed_table
SET dept = 5
FROM cte_1
WHERE distributed_table.tenant_id < cte_1.tenant_id;
DEBUG: common table expressions are not supported in distributed modifications
DEBUG: generating subplan 20_1 for CTE cte_1: WITH cte_2 AS (SELECT second_distributed_table.tenant_id AS cte2_id FROM recursive_dml_queries.second_distributed_table WHERE (second_distributed_table.dept >= 2)) UPDATE recursive_dml_queries.distributed_table SET dept = 10 RETURNING tenant_id, dept, info
DEBUG: common table expressions are not supported in distributed modifications
DEBUG: Plan 20 query after replacing subqueries and CTEs: UPDATE recursive_dml_queries.distributed_table SET dept = 5 FROM (SELECT intermediate_result.tenant_id, intermediate_result.dept, intermediate_result.info FROM read_intermediate_result('20_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, dept integer, info jsonb)) cte_1 WHERE (distributed_table.tenant_id < cte_1.tenant_id)
EXPLAIN (COSTS FALSE) WITH cte_1 AS (
WITH cte_2 AS (
SELECT tenant_id as cte2_id
FROM second_distributed_table
WHERE dept >= 2
)
UPDATE distributed_table
SET dept = 10
RETURNING *
)
UPDATE distributed_table
SET dept = 5
FROM cte_1
WHERE distributed_table.tenant_id < cte_1.tenant_id;
DEBUG: common table expressions are not supported in distributed modifications
DEBUG: generating subplan 22_1 for CTE cte_1: WITH cte_2 AS (SELECT second_distributed_table.tenant_id AS cte2_id FROM recursive_dml_queries.second_distributed_table WHERE (second_distributed_table.dept >= 2)) UPDATE recursive_dml_queries.distributed_table SET dept = 10 RETURNING tenant_id, dept, info
DEBUG: common table expressions are not supported in distributed modifications
DEBUG: Plan 22 query after replacing subqueries and CTEs: UPDATE recursive_dml_queries.distributed_table SET dept = 5 FROM (SELECT intermediate_result.tenant_id, intermediate_result.dept, intermediate_result.info FROM read_intermediate_result('22_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, dept integer, info jsonb)) cte_1 WHERE (distributed_table.tenant_id < cte_1.tenant_id)
QUERY PLAN
------------------------------------------------------------------------------------------------
Custom Scan (Citus Router)
-> Distributed Subplan 22_1
-> Custom Scan (Citus Router)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=57638 dbname=regression
-> Update on distributed_table_2370000 distributed_table
-> Seq Scan on distributed_table_2370000 distributed_table
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=57638 dbname=regression
-> Update on distributed_table_2370000 distributed_table
-> Nested Loop
Join Filter: (distributed_table.tenant_id < intermediate_result.tenant_id)
-> Function Scan on read_intermediate_result intermediate_result
-> Materialize
-> Seq Scan on distributed_table_2370000 distributed_table
(19 rows)
-- we don't support updating local table with a join with
-- distributed tables
UPDATE
local_table
SET
id = 'citus_test'
FROM
distributed_table
WHERE
distributed_table.tenant_id = local_table.id;
DEBUG: relation local_table is not distributed
ERROR: relation local_table is not distributed
RESET client_min_messages;
DROP SCHEMA recursive_dml_queries CASCADE;
NOTICE: drop cascades to 5 other objects
DETAIL: drop cascades to table distributed_table
drop cascades to table second_distributed_table
drop cascades to table reference_table
drop cascades to table local_table
drop cascades to view tenant_ids

View File

@ -1775,6 +1775,25 @@ SELECT count(*) FROM raw_events_second;
38 38
(1 row) (1 row)
-- intermediate results (CTEs) should be allowed when doing INSERT...SELECT within a CTE
WITH series AS (
SELECT s AS val FROM generate_series(60,70) s
),
inserts AS (
INSERT INTO raw_events_second (user_id)
SELECT
user_id
FROM
raw_events_first JOIN series ON (value_1 = val)
RETURNING
NULL
)
SELECT count(*) FROM inserts;
count
-------
2
(1 row)
-- we need this in our next test -- we need this in our next test
truncate raw_events_first; truncate raw_events_first;
SET client_min_messages TO DEBUG2; SET client_min_messages TO DEBUG2;
@ -2176,12 +2195,14 @@ INSERT INTO raw_events_first (user_id, value_1)
SELECT s, nextval('insert_select_test_seq') FROM generate_series(1, 5) s SELECT s, nextval('insert_select_test_seq') FROM generate_series(1, 5) s
ON CONFLICT DO NOTHING; ON CONFLICT DO NOTHING;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: ON CONFLICT is not supported in INSERT ... SELECT via coordinator
ERROR: ON CONFLICT is not supported in INSERT ... SELECT via coordinator ERROR: ON CONFLICT is not supported in INSERT ... SELECT via coordinator
-- RETURNING is unsupported -- RETURNING is unsupported
INSERT INTO raw_events_first (user_id, value_1) INSERT INTO raw_events_first (user_id, value_1)
SELECT s, nextval('insert_select_test_seq') FROM generate_series(1, 5) s SELECT s, nextval('insert_select_test_seq') FROM generate_series(1, 5) s
RETURNING *; RETURNING *;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: RETURNING is not supported in INSERT ... SELECT via coordinator
ERROR: RETURNING is not supported in INSERT ... SELECT via coordinator ERROR: RETURNING is not supported in INSERT ... SELECT via coordinator
RESET client_min_messages; RESET client_min_messages;
-- INSERT ... SELECT and multi-shard SELECT in the same transaction is supported -- INSERT ... SELECT and multi-shard SELECT in the same transaction is supported

View File

@ -1,5 +1,6 @@
SET citus.shard_count TO 32; SET citus.shard_count TO 32;
SET citus.next_shard_id TO 750000; SET citus.next_shard_id TO 750000;
SET citus.next_placement_id TO 750000;
-- =================================================================== -- ===================================================================
-- test end-to-end modification functionality -- test end-to-end modification functionality
-- =================================================================== -- ===================================================================
@ -241,13 +242,9 @@ SELECT COUNT(*) FROM limit_orders WHERE id BETWEEN 70000 AND 90000;
3 3
(1 row) (1 row)
-- Who says that? :) -- commands containing a CTE are supported
-- INSERT ... SELECT ... FROM commands are unsupported WITH deleted_orders AS (DELETE FROM limit_orders WHERE id < 0 RETURNING *)
-- INSERT INTO limit_orders SELECT * FROM limit_orders; INSERT INTO limit_orders SELECT * FROM deleted_orders;
-- commands containing a CTE are unsupported
WITH deleted_orders AS (DELETE FROM limit_orders RETURNING *)
INSERT INTO limit_orders DEFAULT VALUES;
ERROR: common table expressions are not supported in distributed modifications
-- test simple DELETE -- test simple DELETE
INSERT INTO limit_orders VALUES (246, 'TSLA', 162, '2007-07-02 16:32:15', 'sell', 20.69); INSERT INTO limit_orders VALUES (246, 'TSLA', 162, '2007-07-02 16:32:15', 'sell', 20.69);
SELECT COUNT(*) FROM limit_orders WHERE id = 246; SELECT COUNT(*) FROM limit_orders WHERE id = 246;
@ -291,16 +288,15 @@ SELECT COUNT(*) FROM limit_orders WHERE id = 246;
0 0
(1 row) (1 row)
-- commands with a USING clause are unsupported -- commands with a USING clause are supported
CREATE TABLE bidders ( name text, id bigint ); CREATE TABLE bidders ( name text, id bigint );
DELETE FROM limit_orders USING bidders WHERE limit_orders.id = 246 AND DELETE FROM limit_orders USING bidders WHERE limit_orders.id = 246 AND
limit_orders.bidder_id = bidders.id AND limit_orders.bidder_id = bidders.id AND
bidders.name = 'Bernie Madoff'; bidders.name = 'Bernie Madoff';
ERROR: relation bidders is not distributed ERROR: relation bidders is not distributed
-- commands containing a CTE are unsupported -- commands containing a CTE are supported
WITH deleted_orders AS (INSERT INTO limit_orders DEFAULT VALUES RETURNING *) WITH new_orders AS (INSERT INTO limit_orders VALUES (411, 'FLO', 12, '2017-07-02 16:32:15', 'buy', 66))
DELETE FROM limit_orders; DELETE FROM limit_orders WHERE id < 0;
ERROR: common table expressions are not supported in distributed modifications
INSERT INTO limit_orders VALUES (246, 'TSLA', 162, '2007-07-02 16:32:15', 'sell', 20.69); INSERT INTO limit_orders VALUES (246, 'TSLA', 162, '2007-07-02 16:32:15', 'sell', 20.69);
-- simple UPDATE -- simple UPDATE
UPDATE limit_orders SET symbol = 'GM' WHERE id = 246; UPDATE limit_orders SET symbol = 'GM' WHERE id = 246;
@ -431,10 +427,10 @@ UPDATE limit_orders SET limit_price = 0.00 FROM bidders
limit_orders.bidder_id = bidders.id AND limit_orders.bidder_id = bidders.id AND
bidders.name = 'Bernie Madoff'; bidders.name = 'Bernie Madoff';
ERROR: relation bidders is not distributed ERROR: relation bidders is not distributed
-- commands containing a CTE are unsupported -- the connection used for the INSERT is claimed by pull-push, causing the UPDATE to fail
WITH deleted_orders AS (INSERT INTO limit_orders DEFAULT VALUES RETURNING *) WITH deleted_orders AS (INSERT INTO limit_orders VALUES (399, 'PDR', 14, '2017-07-02 16:32:15', 'sell', 43))
UPDATE limit_orders SET symbol = 'GM'; UPDATE limit_orders SET symbol = 'GM';
ERROR: common table expressions are not supported in distributed modifications ERROR: cannot establish a new connection for placement 750003, since DML has been executed on a connection that is in use
SELECT symbol, bidder_id FROM limit_orders WHERE id = 246; SELECT symbol, bidder_id FROM limit_orders WHERE id = 246;
symbol | bidder_id symbol | bidder_id
--------+----------- --------+-----------
@ -962,13 +958,15 @@ SELECT * FROM summary_table ORDER BY id;
2 | 400 | 450.0000000000000000 | | 2 | 400 | 450.0000000000000000 | |
(2 rows) (2 rows)
-- unsupported multi-shard updates -- multi-shard updates with recursively planned subqueries
BEGIN;
UPDATE summary_table SET average_value = average_query.average FROM ( UPDATE summary_table SET average_value = average_query.average FROM (
SELECT avg(value) AS average FROM raw_table) average_query; SELECT avg(value) AS average FROM raw_table) average_query;
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator ROLLBACK;
BEGIN;
UPDATE summary_table SET average_value = average_value + 1 WHERE id = UPDATE summary_table SET average_value = average_value + 1 WHERE id =
(SELECT id FROM raw_table WHERE value > 100); (SELECT id FROM raw_table WHERE value > 100 LIMIT 1);
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator ROLLBACK;
-- test complex queries -- test complex queries
UPDATE summary_table UPDATE summary_table
SET SET

View File

@ -111,10 +111,9 @@ INSERT INTO limit_orders_mx VALUES (2037, 'GOOG', 5634, now(), 'buy', random()),
(2039, 'GOOG', 5634, now(), 'buy', random()); (2039, 'GOOG', 5634, now(), 'buy', random());
-- connect back to the other node -- connect back to the other node
\c - - - :worker_1_port \c - - - :worker_1_port
-- commands containing a CTE are unsupported -- commands containing a CTE are supported
WITH deleted_orders AS (DELETE FROM limit_orders_mx RETURNING *) WITH deleted_orders AS (DELETE FROM limit_orders_mx WHERE id < 0 RETURNING *)
INSERT INTO limit_orders_mx DEFAULT VALUES; INSERT INTO limit_orders_mx SELECT * FROM deleted_orders;
ERROR: common table expressions are not supported in distributed modifications
-- test simple DELETE -- test simple DELETE
INSERT INTO limit_orders_mx VALUES (246, 'TSLA', 162, '2007-07-02 16:32:15', 'sell', 20.69); INSERT INTO limit_orders_mx VALUES (246, 'TSLA', 162, '2007-07-02 16:32:15', 'sell', 20.69);
SELECT COUNT(*) FROM limit_orders_mx WHERE id = 246; SELECT COUNT(*) FROM limit_orders_mx WHERE id = 246;
@ -166,10 +165,9 @@ DELETE FROM limit_orders_mx USING bidders WHERE limit_orders_mx.id = 246 AND
limit_orders_mx.bidder_id = bidders.id AND limit_orders_mx.bidder_id = bidders.id AND
bidders.name = 'Bernie Madoff'; bidders.name = 'Bernie Madoff';
ERROR: relation bidders is not distributed ERROR: relation bidders is not distributed
-- commands containing a CTE are unsupported -- commands containing a CTE are supported
WITH deleted_orders AS (INSERT INTO limit_orders_mx DEFAULT VALUES RETURNING *) WITH new_orders AS (INSERT INTO limit_orders_mx VALUES (411, 'FLO', 12, '2017-07-02 16:32:15', 'buy', 66))
DELETE FROM limit_orders_mx; DELETE FROM limit_orders_mx WHERE id < 0;
ERROR: common table expressions are not supported in distributed modifications
-- cursors are not supported -- cursors are not supported
DELETE FROM limit_orders_mx WHERE CURRENT OF cursor_name; DELETE FROM limit_orders_mx WHERE CURRENT OF cursor_name;
ERROR: cannot run DML queries with cursors ERROR: cannot run DML queries with cursors
@ -236,10 +234,9 @@ UPDATE limit_orders_mx SET limit_price = 0.00 FROM bidders
limit_orders_mx.bidder_id = bidders.id AND limit_orders_mx.bidder_id = bidders.id AND
bidders.name = 'Bernie Madoff'; bidders.name = 'Bernie Madoff';
ERROR: relation bidders is not distributed ERROR: relation bidders is not distributed
-- commands containing a CTE are unsupported -- commands containing a CTE are supported
WITH deleted_orders AS (INSERT INTO limit_orders_mx DEFAULT VALUES RETURNING *) WITH deleted_orders AS (INSERT INTO limit_orders_mx VALUES (399, 'PDR', 14, '2017-07-02 16:32:15', 'sell', 43))
UPDATE limit_orders_mx SET symbol = 'GM'; UPDATE limit_orders_mx SET symbol = 'GM';
ERROR: common table expressions are not supported in distributed modifications
SELECT symbol, bidder_id FROM limit_orders_mx WHERE id = 246; SELECT symbol, bidder_id FROM limit_orders_mx WHERE id = 246;
symbol | bidder_id symbol | bidder_id
--------+----------- --------+-----------

View File

@ -415,20 +415,20 @@ WHERE user_id IN (SELECT user_id
FROM users_test_table FROM users_test_table
UNION ALL UNION ALL
SELECT user_id SELECT user_id
FROM events_test_table) returning *; FROM events_test_table) returning value_3;
user_id | value_1 | value_2 | value_3 value_3
---------+---------+---------+--------- ---------
8 | 4 | 13 | 0 0
20 | 4 | | 0 0
20 | 4 | | 0 0
20 | 4 | | 0 0
4 | 4 | 9 | 0 0
4 | 4 | 17 | 0 0
16 | 4 | | 0 0
6 | 4 | 11 | 0 0
6 | 4 | 15 | 0 0
2 | 4 | 7 | 0 0
2 | 4 | 19 | 0 0
(11 rows) (11 rows)
UPDATE users_test_table UPDATE users_test_table
@ -556,12 +556,10 @@ WHERE user_id IN (SELECT 2);
UPDATE users_test_table UPDATE users_test_table
SET value_2 = 6 SET value_2 = 6
WHERE value_1 IN (SELECT 2); WHERE value_1 IN (SELECT 2);
-- Can only use immutable functions -- Function calls in subqueries will be recursively planned
UPDATE test_table_1 UPDATE test_table_1
SET col_3 = 6 SET col_3 = 6
WHERE date_col IN (SELECT now()); WHERE date_col IN (SELECT now());
ERROR: cannot push down this subquery
DETAIL: Subqueries without a FROM clause can only contain immutable functions
-- Test with prepared statements -- Test with prepared statements
SELECT COUNT(*) FROM users_test_table WHERE value_1 = 0; SELECT COUNT(*) FROM users_test_table WHERE value_1 = 0;
count count
@ -630,17 +628,15 @@ FROM users_test_table
FULL OUTER JOIN events_test_table e2 USING (user_id) FULL OUTER JOIN events_test_table e2 USING (user_id)
WHERE e2.user_id = events_test_table.user_id RETURNING events_test_table.value_2; WHERE e2.user_id = events_test_table.user_id RETURNING events_test_table.value_2;
ERROR: a join with USING causes an internal naming conflict, use ON instead ERROR: a join with USING causes an internal naming conflict, use ON instead
-- We can not pushdown query if there is no partition key equality -- Non-pushdownable subqueries, but will be handled through recursive planning
UPDATE users_test_table UPDATE users_test_table
SET value_1 = 1 SET value_1 = 1
WHERE user_id IN (SELECT Count(value_1) WHERE user_id IN (SELECT Count(value_1)
FROM events_test_table FROM events_test_table
GROUP BY user_id); GROUP BY user_id);
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
UPDATE users_test_table UPDATE users_test_table
SET value_1 = (SELECT Count(*) SET value_1 = (SELECT Count(*)
FROM events_test_table); FROM events_test_table);
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
UPDATE users_test_table UPDATE users_test_table
SET value_1 = 4 SET value_1 = 4
WHERE user_id IN (SELECT user_id WHERE user_id IN (SELECT user_id
@ -648,7 +644,6 @@ WHERE user_id IN (SELECT user_id
UNION UNION
SELECT value_1 SELECT value_1
FROM events_test_table); FROM events_test_table);
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
UPDATE users_test_table UPDATE users_test_table
SET value_1 = 4 SET value_1 = 4
WHERE user_id IN (SELECT user_id WHERE user_id IN (SELECT user_id
@ -657,7 +652,6 @@ WHERE user_id IN (SELECT user_id
SELECT Sum(value_1) SELECT Sum(value_1)
FROM events_test_table FROM events_test_table
GROUP BY user_id); GROUP BY user_id);
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
UPDATE users_test_table UPDATE users_test_table
SET value_2 = (SELECT value_3 SET value_2 = (SELECT value_3
FROM users_test_table); FROM users_test_table);
@ -685,8 +679,6 @@ WHERE user_id IN
INTERSECT INTERSECT
SELECT user_id SELECT user_id
FROM events_test_table); FROM events_test_table);
ERROR: cannot push down this subquery
DETAIL: Intersect and Except are currently unsupported
-- Reference tables cannot be on the outer part of an outer join -- Reference tables cannot be on the outer part of an outer join
UPDATE users_test_table UPDATE users_test_table
SET value_1 = 4 SET value_1 = 4
@ -707,25 +699,25 @@ SET value_2 = 5 * random()
FROM events_test_table FROM events_test_table
WHERE users_test_table.user_id = events_test_table.user_id; WHERE users_test_table.user_id = events_test_table.user_id;
ERROR: functions used in UPDATE queries on distributed tables must not be VOLATILE ERROR: functions used in UPDATE queries on distributed tables must not be VOLATILE
-- Volatile functions in a subquery are recursively planned
UPDATE users_test_table UPDATE users_test_table
SET value_2 = 5 SET value_2 = 5
WHERE users_test_table.user_id IN (SELECT user_id * random() FROM events_test_table); WHERE users_test_table.user_id IN (SELECT user_id * random() FROM events_test_table);
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
-- Local tables are not supported -- Local tables are not supported
UPDATE users_test_table UPDATE users_test_table
SET value_2 = 5 SET value_2 = 5
FROM events_test_table_local FROM events_test_table_local
WHERE users_test_table.user_id = events_test_table_local.user_id; WHERE users_test_table.user_id = events_test_table_local.user_id;
ERROR: relation events_test_table_local is not distributed ERROR: relation events_test_table_local is not distributed
UPDATE users_test_table
SET value_2 = 5
WHERE users_test_table.user_id IN(SELECT user_id FROM events_test_table_local);
ERROR: relation events_test_table_local is not distributed
UPDATE events_test_table_local UPDATE events_test_table_local
SET value_2 = 5 SET value_2 = 5
FROM users_test_table FROM users_test_table
WHERE events_test_table_local.user_id = users_test_table.user_id; WHERE events_test_table_local.user_id = users_test_table.user_id;
ERROR: relation events_test_table_local is not distributed ERROR: relation events_test_table_local is not distributed
-- Local tables in a subquery are supported through recursive planning
UPDATE users_test_table
SET value_2 = 5
WHERE users_test_table.user_id IN(SELECT user_id FROM events_test_table_local);
-- Shard counts of tables must be equal to push down the query -- Shard counts of tables must be equal to push down the query
UPDATE users_test_table UPDATE users_test_table
SET value_2 = 5 SET value_2 = 5
@ -738,7 +730,8 @@ DETAIL: Shards of relations in subquery need to have 1-to-1 shard partitioning
DELETE FROM users_test_table DELETE FROM users_test_table
WHERE users_test_table.user_id = (SELECT user_id WHERE users_test_table.user_id = (SELECT user_id
FROM events_test_table); FROM events_test_table);
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator ERROR: more than one row returned by a subquery used as an expression
CONTEXT: while executing command on localhost:57637
-- Cursors are not supported -- Cursors are not supported
BEGIN; BEGIN;
DECLARE test_cursor CURSOR FOR SELECT * FROM users_test_table; DECLARE test_cursor CURSOR FOR SELECT * FROM users_test_table;

View File

@ -438,20 +438,20 @@ WHERE user_id IN (SELECT user_id
FROM users_test_table FROM users_test_table
UNION ALL UNION ALL
SELECT user_id SELECT user_id
FROM events_test_table) returning *; FROM events_test_table) returning value_3;
user_id | value_1 | value_2 | value_3 value_3
---------+---------+---------+--------- ---------
8 | 4 | 13 | 0 0
20 | 4 | | 0 0
20 | 4 | | 0 0
20 | 4 | | 0 0
4 | 4 | 9 | 0 0
4 | 4 | 17 | 0 0
16 | 4 | | 0 0
6 | 4 | 11 | 0 0
6 | 4 | 15 | 0 0
2 | 4 | 7 | 0 0
2 | 4 | 19 | 0 0
(11 rows) (11 rows)
UPDATE users_test_table UPDATE users_test_table
@ -579,12 +579,10 @@ WHERE user_id IN (SELECT 2);
UPDATE users_test_table UPDATE users_test_table
SET value_2 = 6 SET value_2 = 6
WHERE value_1 IN (SELECT 2); WHERE value_1 IN (SELECT 2);
-- Can only use immutable functions -- Function calls in subqueries will be recursively planned
UPDATE test_table_1 UPDATE test_table_1
SET col_3 = 6 SET col_3 = 6
WHERE date_col IN (SELECT now()); WHERE date_col IN (SELECT now());
ERROR: cannot push down this subquery
DETAIL: Subqueries without a FROM clause can only contain immutable functions
-- Test with prepared statements -- Test with prepared statements
SELECT COUNT(*) FROM users_test_table WHERE value_1 = 0; SELECT COUNT(*) FROM users_test_table WHERE value_1 = 0;
count count
@ -653,17 +651,15 @@ FROM users_test_table
FULL OUTER JOIN events_test_table e2 USING (user_id) FULL OUTER JOIN events_test_table e2 USING (user_id)
WHERE e2.user_id = events_test_table.user_id RETURNING events_test_table.value_2; WHERE e2.user_id = events_test_table.user_id RETURNING events_test_table.value_2;
ERROR: a join with USING causes an internal naming conflict, use ON instead ERROR: a join with USING causes an internal naming conflict, use ON instead
-- We can not pushdown query if there is no partition key equality -- Non-pushdownable subqueries, but will be handled through recursive planning
UPDATE users_test_table UPDATE users_test_table
SET value_1 = 1 SET value_1 = 1
WHERE user_id IN (SELECT Count(value_1) WHERE user_id IN (SELECT Count(value_1)
FROM events_test_table FROM events_test_table
GROUP BY user_id); GROUP BY user_id);
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
UPDATE users_test_table UPDATE users_test_table
SET value_1 = (SELECT Count(*) SET value_1 = (SELECT Count(*)
FROM events_test_table); FROM events_test_table);
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
UPDATE users_test_table UPDATE users_test_table
SET value_1 = 4 SET value_1 = 4
WHERE user_id IN (SELECT user_id WHERE user_id IN (SELECT user_id
@ -671,7 +667,6 @@ WHERE user_id IN (SELECT user_id
UNION UNION
SELECT value_1 SELECT value_1
FROM events_test_table); FROM events_test_table);
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
UPDATE users_test_table UPDATE users_test_table
SET value_1 = 4 SET value_1 = 4
WHERE user_id IN (SELECT user_id WHERE user_id IN (SELECT user_id
@ -680,7 +675,6 @@ WHERE user_id IN (SELECT user_id
SELECT Sum(value_1) SELECT Sum(value_1)
FROM events_test_table FROM events_test_table
GROUP BY user_id); GROUP BY user_id);
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
UPDATE users_test_table UPDATE users_test_table
SET value_2 = (SELECT value_3 SET value_2 = (SELECT value_3
FROM users_test_table); FROM users_test_table);
@ -708,8 +702,6 @@ WHERE user_id IN
INTERSECT INTERSECT
SELECT user_id SELECT user_id
FROM events_test_table); FROM events_test_table);
ERROR: cannot push down this subquery
DETAIL: Intersect and Except are currently unsupported
-- Reference tables cannot be on the outer part of an outer join -- Reference tables cannot be on the outer part of an outer join
UPDATE users_test_table UPDATE users_test_table
SET value_1 = 4 SET value_1 = 4
@ -730,25 +722,25 @@ SET value_2 = 5 * random()
FROM events_test_table FROM events_test_table
WHERE users_test_table.user_id = events_test_table.user_id; WHERE users_test_table.user_id = events_test_table.user_id;
ERROR: functions used in UPDATE queries on distributed tables must not be VOLATILE ERROR: functions used in UPDATE queries on distributed tables must not be VOLATILE
-- Volatile functions in a subquery are recursively planned
UPDATE users_test_table UPDATE users_test_table
SET value_2 = 5 SET value_2 = 5
WHERE users_test_table.user_id IN (SELECT user_id * random() FROM events_test_table); WHERE users_test_table.user_id IN (SELECT user_id * random() FROM events_test_table);
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
-- Local tables are not supported -- Local tables are not supported
UPDATE users_test_table UPDATE users_test_table
SET value_2 = 5 SET value_2 = 5
FROM events_test_table_local FROM events_test_table_local
WHERE users_test_table.user_id = events_test_table_local.user_id; WHERE users_test_table.user_id = events_test_table_local.user_id;
ERROR: relation events_test_table_local is not distributed ERROR: relation events_test_table_local is not distributed
UPDATE users_test_table
SET value_2 = 5
WHERE users_test_table.user_id IN(SELECT user_id FROM events_test_table_local);
ERROR: relation events_test_table_local is not distributed
UPDATE events_test_table_local UPDATE events_test_table_local
SET value_2 = 5 SET value_2 = 5
FROM users_test_table FROM users_test_table
WHERE events_test_table_local.user_id = users_test_table.user_id; WHERE events_test_table_local.user_id = users_test_table.user_id;
ERROR: relation events_test_table_local is not distributed ERROR: relation events_test_table_local is not distributed
-- Local tables in a subquery are supported through recursive planning
UPDATE users_test_table
SET value_2 = 5
WHERE users_test_table.user_id IN(SELECT user_id FROM events_test_table_local);
-- Shard counts of tables must be equal to push down the query -- Shard counts of tables must be equal to push down the query
UPDATE users_test_table UPDATE users_test_table
SET value_2 = 5 SET value_2 = 5
@ -761,7 +753,8 @@ DETAIL: Shards of relations in subquery need to have 1-to-1 shard partitioning
DELETE FROM users_test_table DELETE FROM users_test_table
WHERE users_test_table.user_id = (SELECT user_id WHERE users_test_table.user_id = (SELECT user_id
FROM events_test_table); FROM events_test_table);
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator ERROR: more than one row returned by a subquery used as an expression
CONTEXT: while executing command on localhost:57637
-- Cursors are not supported -- Cursors are not supported
BEGIN; BEGIN;
DECLARE test_cursor CURSOR FOR SELECT * FROM users_test_table; DECLARE test_cursor CURSOR FOR SELECT * FROM users_test_table;

View File

@ -0,0 +1,176 @@
CREATE SCHEMA recursive_dml_queries_mx;
SET search_path TO recursive_dml_queries_mx, public;
SET citus.shard_replication_factor TO 1;
SET citus.replication_model TO streaming;
CREATE TABLE recursive_dml_queries_mx.distributed_table (tenant_id text, dept int, info jsonb);
SELECT create_distributed_table('distributed_table', 'tenant_id');
create_distributed_table
--------------------------
(1 row)
CREATE TABLE recursive_dml_queries_mx.second_distributed_table (tenant_id text, dept int, info jsonb);
SELECT create_distributed_table('second_distributed_table', 'tenant_id');
create_distributed_table
--------------------------
(1 row)
CREATE TABLE recursive_dml_queries_mx.reference_table (id text, name text);
SELECT create_reference_table('reference_table');
create_reference_table
------------------------
(1 row)
INSERT INTO distributed_table SELECT i::text, i % 10, row_to_json(row(i, i*i)) FROM generate_series (0, 100) i;
INSERT INTO second_distributed_table SELECT i::text, i % 10, row_to_json(row(i, i*i)) FROM generate_series (0, 100) i;
INSERT INTO reference_table SELECT i::text, 'user_' || i FROM generate_series (0, 100) i;
SET client_min_messages TO DEBUG1;
-- the subquery foo is recursively planned
UPDATE
reference_table
SET
name = 'new_' || name
FROM
(
SELECT
avg(second_distributed_table.tenant_id::int) as avg_tenant_id
FROM
second_distributed_table
) as foo
WHERE
foo.avg_tenant_id::int::text = reference_table.id;
DEBUG: only reference tables may be queried when targeting a reference table with multi shard UPDATE/DELETE queries with multiple tables
DEBUG: generating subplan 4_1 for subquery SELECT avg((tenant_id)::integer) AS avg_tenant_id FROM recursive_dml_queries_mx.second_distributed_table
DEBUG: Plan 4 query after replacing subqueries and CTEs: UPDATE recursive_dml_queries_mx.reference_table SET name = ('new_'::text || reference_table.name) FROM (SELECT intermediate_result.avg_tenant_id FROM read_intermediate_result('4_1'::text, 'binary'::citus_copy_format) intermediate_result(avg_tenant_id numeric)) foo WHERE (((foo.avg_tenant_id)::integer)::text = reference_table.id)
-- the subquery foo is recursively planned
-- but note that the subquery foo itself is pushdownable
UPDATE
second_distributed_table
SET
dept = foo.max_dept * 2
FROM
(
SELECT DISTINCT ON (tenant_id) tenant_id, max(dept) as max_dept FROM
(
SELECT
second_distributed_table.dept, second_distributed_table.tenant_id
FROM
second_distributed_table, distributed_table
WHERE
distributed_table.tenant_id = second_distributed_table.tenant_id
) foo_inner
GROUP BY
tenant_id
ORDER BY 1 DESC
) as foo
WHERE
foo.tenant_id != second_distributed_table.tenant_id
AND second_distributed_table.dept IN (2);
DEBUG: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
DEBUG: generating subplan 6_1 for subquery SELECT DISTINCT ON (tenant_id) tenant_id, max(dept) AS max_dept FROM (SELECT second_distributed_table.dept, second_distributed_table.tenant_id FROM recursive_dml_queries_mx.second_distributed_table, recursive_dml_queries_mx.distributed_table WHERE (distributed_table.tenant_id = second_distributed_table.tenant_id)) foo_inner GROUP BY tenant_id ORDER BY tenant_id DESC
DEBUG: Plan 6 query after replacing subqueries and CTEs: UPDATE recursive_dml_queries_mx.second_distributed_table SET dept = (foo.max_dept * 2) FROM (SELECT intermediate_result.tenant_id, intermediate_result.max_dept FROM read_intermediate_result('6_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, max_dept integer)) foo WHERE ((foo.tenant_id <> second_distributed_table.tenant_id) AND (second_distributed_table.dept = 2))
-- run some queries from worker nodes
\c - - - :worker_1_port
SET search_path TO recursive_dml_queries_mx, public;
-- the subquery foo is recursively planned
-- and foo itself is a non-colocated subquery and recursively planned
UPDATE
second_distributed_table
SET
dept = foo.tenant_id::int / 4
FROM
(
SELECT DISTINCT foo_inner_1.tenant_id FROM
(
SELECT
second_distributed_table.dept, second_distributed_table.tenant_id
FROM
second_distributed_table, distributed_table
WHERE
distributed_table.tenant_id = second_distributed_table.tenant_id
AND
second_distributed_table.dept IN (3,4)
) foo_inner_1,
(
SELECT
second_distributed_table.tenant_id
FROM
second_distributed_table, distributed_table
WHERE
distributed_table.tenant_id = second_distributed_table.tenant_id
AND
second_distributed_table.dept IN (4,5)
)foo_inner_2
WHERE foo_inner_1.tenant_id != foo_inner_2.tenant_id
) as foo
WHERE
foo.tenant_id != second_distributed_table.tenant_id
AND second_distributed_table.dept IN (3);
-- use the second worker
\c - - - :worker_2_port
SET search_path TO recursive_dml_queries_mx, public;
CREATE TABLE recursive_dml_queries_mx.local_table (id text, name text);
INSERT INTO local_table SELECT i::text, 'user_' || i FROM generate_series (0, 100) i;
CREATE VIEW tenant_ids AS
SELECT
tenant_id, name
FROM
distributed_table, reference_table
WHERE
distributed_table.dept::text = reference_table.id
ORDER BY 2 DESC, 1 DESC;
-- local tables in a subquery of a modification query are supported through recursive planning
UPDATE
distributed_table
SET
dept = avg_tenant_id::int
FROM
(
SELECT
avg(local_table.id::int) as avg_tenant_id
FROM
local_table
) as foo
WHERE
foo.avg_tenant_id::int::text = distributed_table.tenant_id
RETURNING
distributed_table.*;
tenant_id | dept | info
-----------+------+------------------------
50 | 50 | {"f1": 50, "f2": 2500}
(1 row)
-- views in a subquery of a modification query are supported through recursive planning
UPDATE
distributed_table
SET
dept = avg_tenant_id::int
FROM
(
SELECT
avg(tenant_id::int) as avg_tenant_id
FROM
tenant_ids
) as foo
WHERE
foo.avg_tenant_id::int::text = distributed_table.tenant_id
RETURNING
distributed_table.*;
tenant_id | dept | info
-----------+------+------------------------
50 | 50 | {"f1": 50, "f2": 2500}
(1 row)
DROP TABLE local_table;
\c - - - :master_port
SET search_path TO recursive_dml_queries_mx, public;
RESET client_min_messages;
DROP SCHEMA recursive_dml_queries_mx CASCADE;
NOTICE: drop cascades to 3 other objects
DETAIL: drop cascades to table distributed_table
drop cascades to table second_distributed_table
drop cascades to table reference_table
RESET citus.shard_replication_factor;
RESET citus.replication_model;

View File

@ -0,0 +1,87 @@
CREATE SCHEMA recursive_dml_with_different_planner_executors;
SET search_path TO recursive_dml_with_different_planner_executors, public;
CREATE TABLE distributed_table (tenant_id text, dept int, info jsonb);
SELECT create_distributed_table('distributed_table', 'tenant_id');
create_distributed_table
--------------------------
(1 row)
CREATE TABLE second_distributed_table (tenant_id text, dept int, info jsonb);
SELECT create_distributed_table('second_distributed_table', 'tenant_id');
create_distributed_table
--------------------------
(1 row)
CREATE TABLE reference_table (id text, name text);
SELECT create_reference_table('reference_table');
create_reference_table
------------------------
(1 row)
INSERT INTO distributed_table SELECT i::text, i % 10, row_to_json(row(i, i*i)) FROM generate_series (0, 100) i;
INSERT INTO second_distributed_table SELECT i::text, i % 10, row_to_json(row(i, i*i)) FROM generate_series (0, 100) i;
SET client_min_messages TO DEBUG1;
-- subquery with router planner
-- joined with a real-time query
UPDATE
distributed_table
SET dept = foo.dept FROM
(SELECT tenant_id, dept FROM second_distributed_table WHERE dept = 1 ) as foo,
(SELECT tenant_id FROM second_distributed_table WHERE dept IN (1, 2, 3, 4) OFFSET 0) as bar
WHERE foo.tenant_id = bar.tenant_id
AND distributed_table.tenant_id = bar.tenant_id;
DEBUG: cannot push down this subquery
DETAIL: Offset clause is currently unsupported when a subquery references a column from another query
DEBUG: generating subplan 3_1 for subquery SELECT tenant_id FROM recursive_dml_with_different_planner_executors.second_distributed_table WHERE (dept = ANY (ARRAY[1, 2, 3, 4])) OFFSET 0
DEBUG: Plan 3 query after replacing subqueries and CTEs: UPDATE recursive_dml_with_different_planner_executors.distributed_table SET dept = foo.dept FROM (SELECT second_distributed_table.tenant_id, second_distributed_table.dept FROM recursive_dml_with_different_planner_executors.second_distributed_table WHERE (second_distributed_table.dept = 1)) foo, (SELECT intermediate_result.tenant_id FROM read_intermediate_result('3_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text)) bar WHERE ((foo.tenant_id = bar.tenant_id) AND (distributed_table.tenant_id = bar.tenant_id))
-- a non colocated subquery inside the UPDATE
UPDATE distributed_table SET dept = foo.max_dept FROM
(
SELECT
max(dept) as max_dept
FROM
(SELECT DISTINCT tenant_id, dept FROM distributed_table) as distributed_table
WHERE tenant_id NOT IN
(SELECT tenant_id FROM second_distributed_table WHERE dept IN (1, 2, 3, 4))
) as foo WHERE foo.max_dept > dept * 3;
DEBUG: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
DEBUG: generating subplan 5_1 for subquery SELECT tenant_id FROM recursive_dml_with_different_planner_executors.second_distributed_table WHERE (dept = ANY (ARRAY[1, 2, 3, 4]))
DEBUG: generating subplan 5_2 for subquery SELECT max(dept) AS max_dept FROM (SELECT DISTINCT distributed_table_1.tenant_id, distributed_table_1.dept FROM recursive_dml_with_different_planner_executors.distributed_table distributed_table_1) distributed_table WHERE (NOT (tenant_id IN (SELECT intermediate_result.tenant_id FROM read_intermediate_result('5_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text))))
DEBUG: Plan 5 query after replacing subqueries and CTEs: UPDATE recursive_dml_with_different_planner_executors.distributed_table SET dept = foo.max_dept FROM (SELECT intermediate_result.max_dept FROM read_intermediate_result('5_2'::text, 'binary'::citus_copy_format) intermediate_result(max_dept integer)) foo WHERE (foo.max_dept > (distributed_table.dept * 3))
-- subquery with repartition query
SET citus.enable_repartition_joins to ON;
UPDATE distributed_table SET dept = foo.some_tenants::int FROM
(
SELECT
DISTINCT second_distributed_table.tenant_id as some_tenants
FROM second_distributed_table, distributed_table WHERE second_distributed_table.dept = distributed_table.dept
) as foo;
DEBUG: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
DEBUG: cannot use real time executor with repartition jobs
HINT: Since you enabled citus.enable_repartition_joins Citus chose to use task-tracker.
DEBUG: generating subplan 8_1 for subquery SELECT DISTINCT second_distributed_table.tenant_id AS some_tenants FROM recursive_dml_with_different_planner_executors.second_distributed_table, recursive_dml_with_different_planner_executors.distributed_table WHERE (second_distributed_table.dept = distributed_table.dept)
DEBUG: Plan 8 query after replacing subqueries and CTEs: UPDATE recursive_dml_with_different_planner_executors.distributed_table SET dept = (foo.some_tenants)::integer FROM (SELECT intermediate_result.some_tenants FROM read_intermediate_result('8_1'::text, 'binary'::citus_copy_format) intermediate_result(some_tenants text)) foo
SET citus.enable_repartition_joins to OFF;
-- final query is router
UPDATE distributed_table SET dept = foo.max_dept FROM
(
SELECT
max(dept) as max_dept
FROM
(SELECT DISTINCT tenant_id, dept FROM distributed_table) as distributed_table
WHERE tenant_id IN
(SELECT tenant_id FROM second_distributed_table WHERE dept IN (1, 2, 3, 4))
) as foo WHERE foo.max_dept >= dept and tenant_id = '8';
DEBUG: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
DEBUG: generating subplan 10_1 for subquery SELECT max(dept) AS max_dept FROM (SELECT DISTINCT distributed_table_1.tenant_id, distributed_table_1.dept FROM recursive_dml_with_different_planner_executors.distributed_table distributed_table_1) distributed_table WHERE (tenant_id IN (SELECT second_distributed_table.tenant_id FROM recursive_dml_with_different_planner_executors.second_distributed_table WHERE (second_distributed_table.dept = ANY (ARRAY[1, 2, 3, 4]))))
DEBUG: Plan 10 query after replacing subqueries and CTEs: UPDATE recursive_dml_with_different_planner_executors.distributed_table SET dept = foo.max_dept FROM (SELECT intermediate_result.max_dept FROM read_intermediate_result('10_1'::text, 'binary'::citus_copy_format) intermediate_result(max_dept integer)) foo WHERE ((foo.max_dept >= distributed_table.dept) AND (distributed_table.tenant_id = '8'::text))
RESET client_min_messages;
DROP SCHEMA recursive_dml_with_different_planner_executors CASCADE;
NOTICE: drop cascades to 3 other objects
DETAIL: drop cascades to table distributed_table
drop cascades to table second_distributed_table
drop cascades to table reference_table
SET search_path TO public;

View File

@ -0,0 +1,180 @@
CREATE SCHEMA with_dml;
SET search_path TO with_dml, public;
CREATE TABLE with_dml.distributed_table (tenant_id text PRIMARY KEY, dept int);
SELECT create_distributed_table('distributed_table', 'tenant_id');
create_distributed_table
--------------------------
(1 row)
CREATE TABLE with_dml.second_distributed_table (tenant_id text, dept int);
SELECT create_distributed_table('second_distributed_table', 'tenant_id');
create_distributed_table
--------------------------
(1 row)
CREATE TABLE with_dml.reference_table (id text, name text);
SELECT create_reference_table('reference_table');
create_reference_table
------------------------
(1 row)
INSERT INTO distributed_table SELECT i::text, i % 10 FROM generate_series (0, 100) i;
INSERT INTO second_distributed_table SELECT i::text, i % 10 FROM generate_series (0, 100) i;
INSERT INTO reference_table SELECT i::text, 'user_' || i FROM generate_series (0, 100) i;
SET client_min_messages TO DEBUG1;
-- delete all tenants from the reference table whose dept is 1
WITH ids_to_delete AS (
SELECT tenant_id FROM distributed_table WHERE dept = 1
)
DELETE FROM reference_table WHERE id IN (SELECT tenant_id FROM ids_to_delete);
DEBUG: common table expressions are not supported in distributed modifications
DEBUG: generating subplan 4_1 for CTE ids_to_delete: SELECT tenant_id FROM with_dml.distributed_table WHERE (dept = 1)
DEBUG: Plan 4 query after replacing subqueries and CTEs: DELETE FROM with_dml.reference_table WHERE (id IN (SELECT ids_to_delete.tenant_id FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('4_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text)) ids_to_delete))
-- update the name of the users whose dept is 2
WITH ids_to_update AS (
SELECT tenant_id FROM distributed_table WHERE dept = 2
)
UPDATE reference_table SET name = 'new_' || name WHERE id IN (SELECT tenant_id FROM ids_to_update);
DEBUG: common table expressions are not supported in distributed modifications
DEBUG: generating subplan 6_1 for CTE ids_to_update: SELECT tenant_id FROM with_dml.distributed_table WHERE (dept = 2)
DEBUG: Plan 6 query after replacing subqueries and CTEs: UPDATE with_dml.reference_table SET name = ('new_'::text || name) WHERE (id IN (SELECT ids_to_update.tenant_id FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('6_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text)) ids_to_update))
-- now the CTE is also modifying
WITH ids_deleted_3 AS
(
DELETE FROM distributed_table WHERE dept = 3 RETURNING tenant_id
),
ids_deleted_4 AS
(
DELETE FROM distributed_table WHERE dept = 4 RETURNING tenant_id
)
DELETE FROM reference_table WHERE id IN (SELECT * FROM ids_deleted_3 UNION SELECT * FROM ids_deleted_4);
DEBUG: common table expressions are not supported in distributed modifications
DEBUG: generating subplan 8_1 for CTE ids_deleted_3: DELETE FROM with_dml.distributed_table WHERE (dept = 3) RETURNING tenant_id
DEBUG: generating subplan 8_2 for CTE ids_deleted_4: DELETE FROM with_dml.distributed_table WHERE (dept = 4) RETURNING tenant_id
DEBUG: generating subplan 8_3 for subquery SELECT ids_deleted_3.tenant_id FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('8_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text)) ids_deleted_3 UNION SELECT ids_deleted_4.tenant_id FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('8_2'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text)) ids_deleted_4
DEBUG: Plan 8 query after replacing subqueries and CTEs: DELETE FROM with_dml.reference_table WHERE (id IN (SELECT intermediate_result.tenant_id FROM read_intermediate_result('8_3'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text)))
-- now the final UPDATE command is pushdownable
WITH ids_to_delete AS
(
SELECT tenant_id FROM distributed_table WHERE dept = 5
)
UPDATE
distributed_table
SET
dept = dept + 1
FROM
ids_to_delete, (SELECT tenant_id FROM distributed_table WHERE tenant_id::int < 60) as some_tenants
WHERE
some_tenants.tenant_id = ids_to_delete.tenant_id
AND distributed_table.tenant_id = some_tenants.tenant_id;
DEBUG: common table expressions are not supported in distributed modifications
DEBUG: generating subplan 12_1 for CTE ids_to_delete: SELECT tenant_id FROM with_dml.distributed_table WHERE (dept = 5)
DEBUG: Plan 12 query after replacing subqueries and CTEs: UPDATE with_dml.distributed_table SET dept = (distributed_table.dept + 1) FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('12_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text)) ids_to_delete, (SELECT distributed_table_1.tenant_id FROM with_dml.distributed_table distributed_table_1 WHERE ((distributed_table_1.tenant_id)::integer < 60)) some_tenants WHERE ((some_tenants.tenant_id = ids_to_delete.tenant_id) AND (distributed_table.tenant_id = some_tenants.tenant_id))
-- this query errors out since we hit some hard
-- errors in the INSERT ... SELECT pushdown,
-- which prevents falling back to recursive planning
WITH ids_to_upsert AS
(
SELECT tenant_id FROM distributed_table WHERE dept > 7
)
INSERT INTO distributed_table
SELECT distributed_table.tenant_id FROM ids_to_upsert, distributed_table
WHERE distributed_table.tenant_id = ids_to_upsert.tenant_id
ON CONFLICT (tenant_id) DO UPDATE SET dept = 8;
ERROR: cannot perform distributed planning for the given modification
DETAIL: Select query cannot be pushed down to the worker.
-- the following query is very similar to the above one
-- but this time the query is pulled to coordinator since
-- we return before hitting any hard errors
WITH ids_to_insert AS
(
SELECT (tenant_id::int * 100)::text as tenant_id FROM distributed_table WHERE dept > 7
)
INSERT INTO distributed_table
SELECT DISTINCT ids_to_insert.tenant_id FROM ids_to_insert, distributed_table
WHERE distributed_table.tenant_id < ids_to_insert.tenant_id;
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
DETAIL: Subquery contains an expression that is not a simple column reference in the same position as the target table's partition column.
HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery.
DEBUG: Collecting INSERT ... SELECT results on coordinator
DEBUG: generating subplan 16_1 for CTE ids_to_insert: SELECT (((tenant_id)::integer * 100))::text AS tenant_id FROM with_dml.distributed_table WHERE (dept > 7)
DEBUG: generating subplan 16_2 for subquery SELECT DISTINCT ids_to_insert.tenant_id FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('16_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text)) ids_to_insert, with_dml.distributed_table WHERE (distributed_table.tenant_id < ids_to_insert.tenant_id)
DEBUG: Plan 16 query after replacing subqueries and CTEs: SELECT tenant_id FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('16_2'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text)) citus_insert_select_subquery
-- not a very meaningful query
-- but has two modifying CTEs along with another
-- modify statement
WITH copy_to_other_table AS (
INSERT INTO distributed_table
SELECT *
FROM second_distributed_table
WHERE dept = 3
ON CONFLICT (tenant_id) DO UPDATE SET dept = 4
RETURNING *
),
main_table_deleted AS (
DELETE
FROM distributed_table
WHERE dept < 10
AND NOT EXISTS (SELECT 1 FROM second_distributed_table
WHERE second_distributed_table.dept = 1
AND second_distributed_table.tenant_id = distributed_table.tenant_id)
RETURNING *
)
INSERT INTO second_distributed_table
SELECT *
FROM main_table_deleted
EXCEPT
SELECT *
FROM copy_to_other_table;
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
DEBUG: data-modifying statements are not supported in the WITH clauses of distributed queries
DEBUG: generating subplan 20_1 for CTE copy_to_other_table: INSERT INTO with_dml.distributed_table (tenant_id, dept) SELECT tenant_id, dept FROM with_dml.second_distributed_table WHERE (dept = 3) ON CONFLICT(tenant_id) DO UPDATE SET dept = 4 RETURNING distributed_table.tenant_id, distributed_table.dept
DEBUG: generating subplan 20_2 for CTE main_table_deleted: DELETE FROM with_dml.distributed_table WHERE ((dept < 10) AND (NOT (EXISTS (SELECT 1 FROM with_dml.second_distributed_table WHERE ((second_distributed_table.dept = 1) AND (second_distributed_table.tenant_id = distributed_table.tenant_id)))))) RETURNING tenant_id, dept
DEBUG: generating subplan 20_3 for subquery SELECT main_table_deleted.tenant_id, main_table_deleted.dept FROM (SELECT intermediate_result.tenant_id, intermediate_result.dept FROM read_intermediate_result('20_2'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, dept integer)) main_table_deleted EXCEPT SELECT copy_to_other_table.tenant_id, copy_to_other_table.dept FROM (SELECT intermediate_result.tenant_id, intermediate_result.dept FROM read_intermediate_result('20_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, dept integer)) copy_to_other_table
DEBUG: Plan 20 query after replacing subqueries and CTEs: SELECT tenant_id, dept FROM (SELECT intermediate_result.tenant_id, intermediate_result.dept FROM read_intermediate_result('20_3'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text, dept integer)) citus_insert_select_subquery
-- CTE inside the UPDATE statement
UPDATE
second_distributed_table
SET dept =
(WITH vals AS (
SELECT DISTINCT tenant_id::int FROM distributed_table
) select * from vals where tenant_id = 8 )
WHERE dept = 8;
DEBUG: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
DEBUG: generating subplan 24_1 for CTE vals: SELECT DISTINCT (tenant_id)::integer AS tenant_id FROM with_dml.distributed_table
DEBUG: Plan 24 query after replacing subqueries and CTEs: UPDATE with_dml.second_distributed_table SET dept = (SELECT vals.tenant_id FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('24_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id integer)) vals WHERE (vals.tenant_id = 8)) WHERE (dept = 8)
-- Subquery inside the UPDATE statement
UPDATE
second_distributed_table
SET dept =
(SELECT DISTINCT tenant_id::int FROM distributed_table WHERE tenant_id = '9')
WHERE dept = 8;
DEBUG: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
DEBUG: generating subplan 26_1 for subquery SELECT DISTINCT (tenant_id)::integer AS tenant_id FROM with_dml.distributed_table WHERE (tenant_id = '9'::text)
DEBUG: Plan 26 query after replacing subqueries and CTEs: UPDATE with_dml.second_distributed_table SET dept = (SELECT intermediate_result.tenant_id FROM read_intermediate_result('26_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id integer)) WHERE (dept = 8)
-- delete all remaining tenants
WITH ids_to_delete AS (
SELECT tenant_id FROM distributed_table
)
DELETE FROM distributed_table WHERE tenant_id = ANY(SELECT tenant_id FROM ids_to_delete);
DEBUG: common table expressions are not supported in distributed modifications
DEBUG: generating subplan 28_1 for CTE ids_to_delete: SELECT tenant_id FROM with_dml.distributed_table
DEBUG: Plan 28 query after replacing subqueries and CTEs: DELETE FROM with_dml.distributed_table WHERE (tenant_id IN (SELECT ids_to_delete.tenant_id FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('28_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text)) ids_to_delete))
WITH ids_to_delete AS (
SELECT id FROM reference_table
)
DELETE FROM reference_table WHERE id = ANY(SELECT id FROM ids_to_delete);
DEBUG: common table expressions are not supported in distributed modifications
DEBUG: generating subplan 30_1 for CTE ids_to_delete: SELECT id FROM with_dml.reference_table
DEBUG: Plan 30 query after replacing subqueries and CTEs: DELETE FROM with_dml.reference_table WHERE (id IN (SELECT ids_to_delete.id FROM (SELECT intermediate_result.id FROM read_intermediate_result('30_1'::text, 'binary'::citus_copy_format) intermediate_result(id text)) ids_to_delete))
RESET client_min_messages;
DROP SCHEMA with_dml CASCADE;
NOTICE: drop cascades to 3 other objects
DETAIL: drop cascades to table distributed_table
drop cascades to table second_distributed_table
drop cascades to table reference_table

View File

@ -201,12 +201,10 @@ WITH cte AS (
SELECT value_2 FROM users_table WHERE user_id IN (1, 2, 3) SELECT value_2 FROM users_table WHERE user_id IN (1, 2, 3)
) )
DELETE FROM modify_table WHERE id IN (SELECT value_2 FROM cte); DELETE FROM modify_table WHERE id IN (SELECT value_2 FROM cte);
ERROR: common table expressions are not supported in distributed modifications
WITH cte AS ( WITH cte AS (
SELECT value_2 FROM users_table WHERE user_id IN (1, 2, 3) SELECT value_2 FROM users_table WHERE user_id IN (1, 2, 3)
) )
UPDATE modify_table SET val=-1 WHERE val IN (SELECT * FROM cte); UPDATE modify_table SET val=-1 WHERE val IN (SELECT * FROM cte);
ERROR: common table expressions are not supported in distributed modifications
WITH cte AS ( WITH cte AS (
WITH basic AS ( WITH basic AS (
SELECT value_2 FROM users_table WHERE user_id IN (1, 2, 3) SELECT value_2 FROM users_table WHERE user_id IN (1, 2, 3)
@ -214,7 +212,7 @@ WITH cte AS (
INSERT INTO modify_table (SELECT * FROM basic) RETURNING * INSERT INTO modify_table (SELECT * FROM basic) RETURNING *
) )
UPDATE modify_table SET val=-2 WHERE id IN (SELECT id FROM cte); UPDATE modify_table SET val=-2 WHERE id IN (SELECT id FROM cte);
ERROR: common table expressions are not supported in distributed modifications ERROR: RETURNING is not supported in INSERT ... SELECT via coordinator
WITH cte AS ( WITH cte AS (
WITH basic AS ( WITH basic AS (
SELECT * FROM events_table WHERE event_type = 5 SELECT * FROM events_table WHERE event_type = 5
@ -237,10 +235,10 @@ INSERT INTO summary_table SELECT id, COUNT(*) AS counter FROM raw_data GROUP BY
SELECT * FROM summary_table ORDER BY id; SELECT * FROM summary_table ORDER BY id;
id | counter id | counter
----+--------- ----+---------
2 | 153 2 | 20
3 | 184 3 | 38
4 | 160 4 | 24
5 | 170 5 | 27
(4 rows) (4 rows)
SELECT COUNT(*) FROM modify_table; SELECT COUNT(*) FROM modify_table;
@ -259,11 +257,11 @@ SELECT * FROM summary_table ORDER BY id, counter;
----+--------- ----+---------
1 | 1 1 | 1
2 | 1 2 | 1
2 | 153 2 | 20
3 | 1 3 | 1
3 | 184 3 | 38
4 | 160 4 | 24
5 | 170 5 | 27
(7 rows) (7 rows)
SELECT COUNT(*) FROM modify_table; SELECT COUNT(*) FROM modify_table;
@ -310,10 +308,10 @@ SELECT * FROM summary_table ORDER BY id, counter;
id | counter id | counter
----+--------- ----+---------
1 | 1 1 | 1
2 | 154 2 | 21
3 | 185 3 | 39
4 | 160 4 | 24
5 | 170 5 | 27
(5 rows) (5 rows)
WITH added_data AS ( WITH added_data AS (
@ -334,10 +332,10 @@ SELECT * FROM summary_table ORDER BY id, counter;
----+--------- ----+---------
1 | 1 1 | 1
1 | 1 1 | 1
2 | 154 2 | 21
3 | 185 3 | 39
4 | 160 4 | 24
5 | 170 5 | 27
(6 rows) (6 rows)
-- Merge rows in the summary_table -- Merge rows in the summary_table
@ -349,10 +347,10 @@ SELECT * FROM summary_table ORDER BY id, counter;
id | counter id | counter
----+--------- ----+---------
1 | 2 1 | 2
2 | 154 2 | 21
3 | 185 3 | 39
4 | 160 4 | 24
5 | 170 5 | 27
(5 rows) (5 rows)
SELECT * FROM modify_table ORDER BY id, val; SELECT * FROM modify_table ORDER BY id, val;
@ -378,8 +376,6 @@ raw_data AS (
DELETE FROM modify_table WHERE id >= (SELECT min(id) FROM select_data WHERE id > 10) RETURNING * DELETE FROM modify_table WHERE id >= (SELECT min(id) FROM select_data WHERE id > 10) RETURNING *
) )
INSERT INTO summary_table SELECT id, COUNT(*) AS counter FROM raw_data GROUP BY id; INSERT INTO summary_table SELECT id, COUNT(*) AS counter FROM raw_data GROUP BY id;
ERROR: cannot push down this subquery
DETAIL: Aggregates without group by are currently unsupported when a subquery references a column from another query
INSERT INTO modify_table VALUES (21, 1), (22, 2), (23, 3); INSERT INTO modify_table VALUES (21, 1), (22, 2), (23, 3);
-- read ids from the same table -- read ids from the same table
WITH distinct_ids AS ( WITH distinct_ids AS (
@ -392,7 +388,7 @@ update_data AS (
SELECT count(*) FROM update_data; SELECT count(*) FROM update_data;
count count
------- -------
6 3
(1 row) (1 row)
-- read ids from a different table -- read ids from a different table
@ -420,7 +416,7 @@ WITH update_data AS (
SELECT COUNT(*) FROM update_data; SELECT COUNT(*) FROM update_data;
count count
------- -------
2 1
(1 row) (1 row)
WITH delete_rows AS ( WITH delete_rows AS (
@ -429,13 +425,10 @@ WITH delete_rows AS (
SELECT * FROM delete_rows ORDER BY id, val; SELECT * FROM delete_rows ORDER BY id, val;
id | val id | val
----+----- ----+-----
11 | 100
12 | 300
13 | 100
21 | 300 21 | 300
22 | 200 22 | 200
23 | 100 23 | 100
(6 rows) (3 rows)
WITH delete_rows AS ( WITH delete_rows AS (
DELETE FROM summary_table WHERE id > 10 RETURNING * DELETE FROM summary_table WHERE id > 10 RETURNING *
@ -443,7 +436,10 @@ WITH delete_rows AS (
SELECT * FROM delete_rows ORDER BY id, counter; SELECT * FROM delete_rows ORDER BY id, counter;
id | counter id | counter
----+--------- ----+---------
(0 rows) 11 | 1
12 | 1
13 | 1
(3 rows)
-- Check modifying CTEs inside a transaction -- Check modifying CTEs inside a transaction
BEGIN; BEGIN;
@ -467,11 +463,11 @@ SELECT * FROM summary_table ORDER BY id, counter;
1 | 1 1 | 1
1 | 2 1 | 2
2 | 1 2 | 1
2 | 154 2 | 21
3 | 1 3 | 1
3 | 185 3 | 39
4 | 160 4 | 24
5 | 170 5 | 27
(8 rows) (8 rows)
SELECT * FROM modify_table ORDER BY id, val; SELECT * FROM modify_table ORDER BY id, val;
@ -493,10 +489,10 @@ SELECT * FROM summary_table ORDER BY id, counter;
id | counter id | counter
----+--------- ----+---------
1 | 2 1 | 2
2 | 154 2 | 21
3 | 185 3 | 39
4 | 160 4 | 24
5 | 170 5 | 27
(5 rows) (5 rows)
SELECT * FROM modify_table ORDER BY id, val; SELECT * FROM modify_table ORDER BY id, val;
@ -548,7 +544,11 @@ WITH deleted_rows AS (
DELETE FROM modify_table WHERE val IN (SELECT val FROM modify_table WHERE id = 3) RETURNING * DELETE FROM modify_table WHERE val IN (SELECT val FROM modify_table WHERE id = 3) RETURNING *
) )
SELECT * FROM deleted_rows; SELECT * FROM deleted_rows;
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator id | val
----+-----
3 | 6
(1 row)
WITH select_rows AS ( WITH select_rows AS (
SELECT val FROM modify_table WHERE id = 3 SELECT val FROM modify_table WHERE id = 3
), ),
@ -558,14 +558,14 @@ deleted_rows AS (
SELECT * FROM deleted_rows; SELECT * FROM deleted_rows;
id | val id | val
----+----- ----+-----
3 | 6 (0 rows)
(1 row)
WITH deleted_rows AS ( WITH deleted_rows AS (
DELETE FROM modify_table WHERE ctid IN (SELECT ctid FROM modify_table WHERE id = 1) RETURNING * DELETE FROM modify_table WHERE ctid IN (SELECT ctid FROM modify_table WHERE id = 1) RETURNING *
) )
SELECT * FROM deleted_rows; SELECT * FROM deleted_rows;
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator ERROR: cannot perform distributed planning for the given modification
DETAIL: Recursively planned distributed modifications with ctid on where clause are not supported.
WITH select_rows AS ( WITH select_rows AS (
SELECT ctid FROM modify_table WHERE id = 1 SELECT ctid FROM modify_table WHERE id = 1
), ),
@ -646,7 +646,10 @@ raw_data AS (
RETURNING id, val RETURNING id, val
) )
SELECT * FROM raw_data ORDER BY val; SELECT * FROM raw_data ORDER BY val;
ERROR: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator id | val
----+-----
(0 rows)
-- Test with replication factor 2 -- Test with replication factor 2
SET citus.shard_replication_factor to 2; SET citus.shard_replication_factor to 2;
DROP TABLE modify_table; DROP TABLE modify_table;

View File

@ -0,0 +1,113 @@
CREATE SCHEMA with_transactions;
SET search_path TO with_transactions, public;
SET citus.shard_count TO 4;
SET citus.next_placement_id TO 800000;
CREATE TABLE with_transactions.raw_table (tenant_id int, income float, created_at timestamptz);
SELECT create_distributed_table('raw_table', 'tenant_id');
create_distributed_table
--------------------------
(1 row)
CREATE TABLE with_transactions.second_raw_table (tenant_id int, income float, created_at timestamptz);
SELECT create_distributed_table('second_raw_table', 'tenant_id');
create_distributed_table
--------------------------
(1 row)
INSERT INTO
raw_table (tenant_id, income, created_at)
SELECT
i % 10, i * 10.0, timestamp '2014-01-10 20:00:00' + i * interval '1 day'
FROM
generate_series (0, 100) i;
INSERT INTO second_raw_table SELECT * FROM raw_table;
SET client_min_messages TO DEBUG1;
-- run a transaction which DELETEs and UPDATEs, then roll it back
BEGIN;
WITH ids_to_delete AS
(
SELECT tenant_id FROM raw_table WHERE income < 250
),
deleted_ids AS
(
DELETE FROM raw_table WHERE created_at < '2014-02-10 20:00:00' AND tenant_id IN (SELECT * from ids_to_delete) RETURNING tenant_id
)
UPDATE raw_table SET income = income * 2 WHERE tenant_id IN (SELECT tenant_id FROM deleted_ids);
DEBUG: common table expressions are not supported in distributed modifications
DEBUG: generating subplan 3_1 for CTE ids_to_delete: SELECT tenant_id FROM with_transactions.raw_table WHERE (income < (250)::double precision)
DEBUG: generating subplan 3_2 for CTE deleted_ids: DELETE FROM with_transactions.raw_table WHERE ((created_at < 'Mon Feb 10 20:00:00 2014 PST'::timestamp with time zone) AND (tenant_id IN (SELECT ids_to_delete.tenant_id FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('3_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id integer)) ids_to_delete))) RETURNING tenant_id
DEBUG: Plan 3 query after replacing subqueries and CTEs: UPDATE with_transactions.raw_table SET income = (income * (2)::double precision) WHERE (tenant_id IN (SELECT deleted_ids.tenant_id FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('3_2'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id integer)) deleted_ids))
ROLLBACK;
-- see that both UPDATE and DELETE commands are rolled back
SELECT count(*) FROM raw_table;
count
-------
101
(1 row)
SELECT max(income) FROM raw_table;
max
------
1000
(1 row)
-- multi-statement multi shard modifying statements should work
BEGIN;
SELECT count (*) FROM second_raw_table;
count
-------
101
(1 row)
WITH distinct_count AS (
SELECT count(DISTINCT created_at) FROM raw_table
),
ids_inserted AS
(
INSERT INTO raw_table VALUES (11, 1000, now()) RETURNING tenant_id
)
UPDATE raw_table SET created_at = '2001-02-10 20:00:00'
WHERE tenant_id IN (SELECT tenant_id FROM ids_inserted) AND tenant_id < (SELECT count FROM distinct_count);
DEBUG: common table expressions are not supported in distributed modifications
DEBUG: generating subplan 9_1 for CTE distinct_count: SELECT count(DISTINCT created_at) AS count FROM with_transactions.raw_table
DEBUG: generating subplan 9_2 for CTE ids_inserted: INSERT INTO with_transactions.raw_table (tenant_id, income, created_at) VALUES (11, 1000, now()) RETURNING tenant_id
DEBUG: Plan 9 query after replacing subqueries and CTEs: UPDATE with_transactions.raw_table SET created_at = 'Sat Feb 10 20:00:00 2001 PST'::timestamp with time zone WHERE ((tenant_id IN (SELECT ids_inserted.tenant_id FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('9_2'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id integer)) ids_inserted)) AND (tenant_id < (SELECT distinct_count.count FROM (SELECT intermediate_result.count FROM read_intermediate_result('9_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) distinct_count)))
TRUNCATE second_raw_table;
COMMIT;
-- sequential insert followed by parallel update causes execution issues
WITH ids_inserted AS
(
INSERT INTO raw_table VALUES (11, 1000, now()), (12, 1000, now()), (13, 1000, now()) RETURNING tenant_id
)
UPDATE raw_table SET created_at = '2001-02-10 20:00:00' WHERE tenant_id IN (SELECT tenant_id FROM ids_inserted);
DEBUG: common table expressions are not supported in distributed modifications
DEBUG: generating subplan 12_1 for CTE ids_inserted: INSERT INTO with_transactions.raw_table (tenant_id, income, created_at) VALUES (11,1000,now()), (12,1000,now()), (13,1000,now()) RETURNING raw_table.tenant_id
DEBUG: Plan 12 query after replacing subqueries and CTEs: UPDATE with_transactions.raw_table SET created_at = 'Sat Feb 10 20:00:00 2001 PST'::timestamp with time zone WHERE (tenant_id IN (SELECT ids_inserted.tenant_id FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('12_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id integer)) ids_inserted))
ERROR: cannot establish a new connection for placement 800007, since DML has been executed on a connection that is in use
-- make sure that everything was committed
SELECT count(*) FROM raw_table;
count
-------
102
(1 row)
SELECT count(*) FROM raw_table WHERE created_at = '2001-02-10 20:00:00';
count
-------
1
(1 row)
SELECT count(*) FROM second_raw_table;
count
-------
0
(1 row)
RESET client_min_messages;
RESET citus.shard_count;
DROP SCHEMA with_transactions CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table raw_table
drop cascades to table second_raw_table

View File

@ -23,7 +23,7 @@ test: multi_mx_copy_data multi_mx_router_planner
test: multi_mx_schema_support multi_mx_tpch_query1 multi_mx_tpch_query10 test: multi_mx_schema_support multi_mx_tpch_query1 multi_mx_tpch_query10
test: multi_mx_tpch_query12 multi_mx_tpch_query14 multi_mx_tpch_query19 test: multi_mx_tpch_query12 multi_mx_tpch_query14 multi_mx_tpch_query19
test: multi_mx_tpch_query3 multi_mx_tpch_query6 multi_mx_tpch_query7 test: multi_mx_tpch_query3 multi_mx_tpch_query6 multi_mx_tpch_query7
test: multi_mx_tpch_query7_nested multi_mx_ddl test: multi_mx_tpch_query7_nested multi_mx_ddl recursive_dml_queries_mx
test: multi_mx_repartition_udt_prepare test: multi_mx_repartition_udt_prepare
test: multi_mx_repartition_join_w1 multi_mx_repartition_join_w2 multi_mx_repartition_udt_w1 multi_mx_repartition_udt_w2 test: multi_mx_repartition_join_w1 multi_mx_repartition_join_w2 multi_mx_repartition_udt_w1 multi_mx_repartition_udt_w2
test: multi_mx_metadata test: multi_mx_metadata

View File

@ -32,7 +32,7 @@ test: multi_read_from_secondaries
test: multi_create_table test: multi_create_table
test: multi_create_table_constraints multi_master_protocol multi_load_data multi_behavioral_analytics_create_table test: multi_create_table_constraints multi_master_protocol multi_load_data multi_behavioral_analytics_create_table
test: multi_behavioral_analytics_basics multi_behavioral_analytics_single_shard_queries multi_insert_select_non_pushable_queries multi_insert_select test: multi_behavioral_analytics_basics multi_behavioral_analytics_single_shard_queries multi_insert_select_non_pushable_queries multi_insert_select
test: multi_insert_select_window multi_shard_update_delete window_functions test: multi_insert_select_window multi_shard_update_delete window_functions dml_recursive recursive_dml_with_different_planners_executors
# ---------- # ----------
# Tests for partitioning support # Tests for partitioning support
@ -99,8 +99,8 @@ test: multi_repartition_join_planning multi_repartition_join_pruning multi_repar
# --------- # ---------
test: with_nested with_where with_basics with_prepare with_set_operations test: with_nested with_where with_basics with_prepare with_set_operations
test: with_modifying test: with_modifying cte_prepared_modify cte_nested_modification
test: with_executors with_join with_partitioning test: with_executors with_join with_partitioning with_transactions with_dml
# ---------- # ----------
# Tests to check our large record loading and shard deletion behavior # Tests to check our large record loading and shard deletion behavior

View File

@ -0,0 +1,105 @@
CREATE SCHEMA cte_nested_modifications;
SET search_path TO cte_nested_modifications, public;
CREATE TABLE tt1(id int, value_1 int);
INSERT INTO tt1 VALUES(1,2),(2,3),(3,4);
SELECT create_distributed_table('tt1','id');
CREATE TABLE tt2(id int, value_1 int);
INSERT INTO tt2 VALUES(3,3),(4,4),(5,5);
SELECT create_distributed_table('tt2','id');
CREATE TABLE tt3(id int, json_val json);
INSERT INTO tt3 VALUES(1, '{"prod_name":"name_1", "qty":"6"}'),
(2, '{"prod_name":"name_2", "qty":"4"}'),
(3, '{"prod_name":"name_3", "qty":"2"}');
-- DELETE within CTE and use it from UPDATE
BEGIN;
WITH cte_1 AS (
WITH cte_2 AS (
SELECT id as cte2_id
FROM tt1
WHERE value_1 >= 2
)
DELETE FROM tt2
USING cte_2
WHERE tt2.id = cte_2.cte2_id
RETURNING cte2_id - 1 as id
)
UPDATE tt1
SET value_1 = abs(2 + 3.5)
FROM cte_1
WHERE cte_1.id = tt1.id;
SELECT * FROM tt1 ORDER BY id;
ROLLBACK;
-- Similar to test above, now use the CTE in the SET part of the UPDATE
BEGIN;
WITH cte_1 AS (
WITH cte_2 AS (
SELECT id as cte2_id
FROM tt1
WHERE value_1 >= 2
)
DELETE FROM tt2
USING cte_2
WHERE tt2.id = cte_2.cte2_id
RETURNING cte2_id as id
)
UPDATE tt1
SET value_1 = (SELECT max(id) + abs(2 + 3.5) FROM cte_1);
SELECT * FROM tt1 ORDER BY id;
ROLLBACK;
-- Use alias in the definition of CTE, instead of in the RETURNING
BEGIN;
WITH cte_1(id) AS (
WITH cte_2 AS (
SELECT id as cte2_id
FROM tt1
WHERE value_1 >= 2
)
DELETE FROM tt2
USING cte_2
WHERE tt2.id = cte_2.cte2_id
RETURNING cte2_id
)
UPDATE tt1
SET value_1 = (SELECT max(id) + abs(2 + 3.5) FROM cte_1);
SELECT * FROM tt1 ORDER BY id;
ROLLBACK;
-- Update within CTE and use it from Delete
BEGIN;
WITH cte_1 AS (
WITH cte_2 AS (
SELECT id as cte2_id
FROM tt1
WHERE value_1 >= 2
)
UPDATE tt2
SET value_1 = 10
FROM cte_2
WHERE id = cte2_id
RETURNING id, value_1
)
DELETE FROM tt1
USING cte_1
WHERE tt1.id < cte_1.id;
SELECT * FROM tt1 ORDER BY id;
ROLLBACK;
-- Similar to test above, but use json column
BEGIN;
WITH cte_1 AS (
WITH cte_2 AS (
SELECT * FROM tt3
)
UPDATE tt2
SET value_1 = (SELECT max((json_val->>'qty')::int) FROM cte_2)
RETURNING id, value_1
)
DELETE FROM tt1
USING cte_1
WHERE tt1.id < cte_1.id;
SELECT * FROM tt1 ORDER BY id;
ROLLBACK;
DROP SCHEMA cte_nested_modifications CASCADE;

View File

@ -0,0 +1,52 @@
CREATE SCHEMA cte_prepared_modify;
SET search_path TO cte_prepared_modify, public;
CREATE TABLE tt1(id int, value_1 int);
INSERT INTO tt1 VALUES(1,2),(2,3),(3,4);
SELECT create_distributed_table('tt1','id');
CREATE TABLE tt2(id int, value_1 int);
INSERT INTO tt2 VALUES(3,3),(4,4),(5,5);
SELECT create_distributed_table('tt2','id');
-- Test with prepared statements (parameter used by SET)
PREPARE prepared_test(integer) AS
WITH cte_1 AS(
SELECT * FROM tt1 WHERE id >= 2
)
UPDATE tt2
SET value_1 = $1
FROM cte_1
WHERE tt2.id = cte_1.id;
-- Test with prepared statements (parameter used by WHERE on partition column)
PREPARE prepared_test_2(integer) AS
WITH cte_1 AS(
SELECT * FROM tt1 WHERE id >= 2
)
UPDATE tt2
SET value_1 = (SELECT max(id) FROM cte_1)
WHERE tt2.id = $1;
-- Test with prepared statements (parameter used by WHERE on non-partition column)
PREPARE prepared_test_3(integer) AS
WITH cte_1 AS(
SELECT * FROM tt1 WHERE id >= 2
)
UPDATE tt2
SET value_1 = (SELECT max(id) FROM cte_1)
WHERE tt2.value_1 = $1;
-- Run each prepared statement six times. Presumably this is meant to get past
-- PostgreSQL's initial custom-plan executions so that a cached generic plan
-- is exercised as well (TODO confirm intent).
-- parameter used by SET
EXECUTE prepared_test(1);
EXECUTE prepared_test(2);
EXECUTE prepared_test(3);
EXECUTE prepared_test(4);
EXECUTE prepared_test(5);
EXECUTE prepared_test(6);
-- parameter used by WHERE on partition column
EXECUTE prepared_test_2(1);
EXECUTE prepared_test_2(2);
EXECUTE prepared_test_2(3);
EXECUTE prepared_test_2(4);
EXECUTE prepared_test_2(5);
EXECUTE prepared_test_2(6);
-- parameter used by WHERE on non-partition column
EXECUTE prepared_test_3(1);
EXECUTE prepared_test_3(2);
EXECUTE prepared_test_3(3);
EXECUTE prepared_test_3(4);
EXECUTE prepared_test_3(5);
EXECUTE prepared_test_3(6);
DROP SCHEMA cte_prepared_modify CASCADE;

View File

@ -0,0 +1,265 @@
CREATE SCHEMA recursive_dml_queries;
SET search_path TO recursive_dml_queries, public;
SET citus.next_shard_id TO 2370000;
CREATE TABLE recursive_dml_queries.distributed_table (tenant_id text, dept int, info jsonb);
SELECT create_distributed_table('distributed_table', 'tenant_id');
CREATE TABLE recursive_dml_queries.second_distributed_table (tenant_id text, dept int, info jsonb);
SELECT create_distributed_table('second_distributed_table', 'tenant_id');
CREATE TABLE recursive_dml_queries.reference_table (id text, name text);
SELECT create_reference_table('reference_table');
CREATE TABLE recursive_dml_queries.local_table (id text, name text);
INSERT INTO distributed_table SELECT i::text, i % 10, row_to_json(row(i, i*i)) FROM generate_series (0, 100) i;
INSERT INTO second_distributed_table SELECT i::text, i % 10, row_to_json(row(i, i*i)) FROM generate_series (0, 100) i;
INSERT INTO reference_table SELECT i::text, 'user_' || i FROM generate_series (0, 100) i;
INSERT INTO local_table SELECT i::text, 'user_' || i FROM generate_series (0, 100) i;
-- View over the join of distributed_table and reference_table, matching the
-- distributed table's dept (cast to text) against the reference table's id.
-- It is used further down as the source of a subquery inside an UPDATE.
CREATE VIEW tenant_ids AS
SELECT
tenant_id, name
FROM
distributed_table, reference_table
WHERE
distributed_table.dept::text = reference_table.id
ORDER BY 2 DESC, 1 DESC;
SET client_min_messages TO DEBUG1;
-- the subquery foo is recursively planned
UPDATE
reference_table
SET
name = 'new_' || name
FROM
(
SELECT
avg(second_distributed_table.tenant_id::int) as avg_tenant_id
FROM
second_distributed_table
) as foo
WHERE
foo.avg_tenant_id::int::text = reference_table.id
RETURNING
reference_table.name;
-- the subquery foo is recursively planned
-- but note that the subquery foo itself is pushdownable
UPDATE
second_distributed_table
SET
dept = foo.max_dept * 2
FROM
(
SELECT DISTINCT ON (tenant_id) tenant_id, max(dept) as max_dept FROM
(
SELECT
second_distributed_table.dept, second_distributed_table.tenant_id
FROM
second_distributed_table, distributed_table
WHERE
distributed_table.tenant_id = second_distributed_table.tenant_id
) foo_inner
GROUP BY
tenant_id
ORDER BY 1 DESC
) as foo
WHERE
foo.tenant_id != second_distributed_table.tenant_id
AND second_distributed_table.dept IN (2)
RETURNING
second_distributed_table.tenant_id, second_distributed_table.dept;
-- the subquery foo is recursively planned
-- and foo itself is a non colocated subquery and recursively planned
UPDATE
second_distributed_table
SET
dept = foo.tenant_id::int / 4
FROM
(
SELECT DISTINCT foo_inner_1.tenant_id FROM
(
SELECT
second_distributed_table.dept, second_distributed_table.tenant_id
FROM
second_distributed_table, distributed_table
WHERE
distributed_table.tenant_id = second_distributed_table.tenant_id
AND
second_distributed_table.dept IN (3,4)
) foo_inner_1,
(
SELECT
second_distributed_table.tenant_id
FROM
second_distributed_table, distributed_table
WHERE
distributed_table.tenant_id = second_distributed_table.tenant_id
AND
second_distributed_table.dept IN (4,5)
)foo_inner_2
WHERE foo_inner_1.tenant_id != foo_inner_2.tenant_id
) as foo
WHERE
foo.tenant_id != second_distributed_table.tenant_id
AND second_distributed_table.dept IN (3);
-- we currently do not allow local tables in modification queries
UPDATE
distributed_table
SET
dept = avg_tenant_id::int
FROM
(
SELECT
avg(local_table.id::int) as avg_tenant_id
FROM
local_table
) as foo
WHERE
foo.avg_tenant_id::int::text = distributed_table.tenant_id
RETURNING
distributed_table.*;
-- we currently do not allow views in modification queries
UPDATE
distributed_table
SET
dept = avg_tenant_id::int
FROM
(
SELECT
avg(tenant_id::int) as avg_tenant_id
FROM
tenant_ids
) as foo
WHERE
foo.avg_tenant_id::int::text = distributed_table.tenant_id
RETURNING
distributed_table.*;
-- there is a lateral join (i.e., a correlated subquery), thus the subqueries cannot be
-- recursively planned
UPDATE
second_distributed_table
SET
dept = foo.tenant_id::int / 4
FROM
(
SELECT DISTINCT foo_inner_1.tenant_id FROM
(
SELECT
second_distributed_table.dept, second_distributed_table.tenant_id
FROM
second_distributed_table, distributed_table
WHERE
distributed_table.tenant_id = second_distributed_table.tenant_id
AND
second_distributed_table.dept IN (3,4)
)
foo_inner_1 JOIN LATERAL
(
SELECT
second_distributed_table.tenant_id
FROM
second_distributed_table, distributed_table
WHERE
distributed_table.tenant_id = second_distributed_table.tenant_id
AND foo_inner_1.dept = second_distributed_table.dept
AND
second_distributed_table.dept IN (4,5)
) foo_inner_2
ON (foo_inner_2.tenant_id != foo_inner_1.tenant_id)
) as foo
RETURNING *;
-- again a correlated subquery
-- this time a distribution key equality exists,
-- however recursive planning is still prevented due to the correlated subquery
UPDATE
second_distributed_table
SET
dept = foo.tenant_id::int / 4
FROM
(
SELECT baz.tenant_id FROM
(
SELECT
second_distributed_table.dept, second_distributed_table.tenant_id
FROM
second_distributed_table, distributed_table as d1
WHERE
d1.tenant_id = second_distributed_table.tenant_id
AND
second_distributed_table.dept IN (3,4)
AND
second_distributed_table.tenant_id IN
(
SELECT s2.tenant_id
FROM second_distributed_table as s2
GROUP BY d1.tenant_id, s2.tenant_id
)
) as baz
) as foo WHERE second_distributed_table.tenant_id = foo.tenant_id
RETURNING *;
-- we don't support subqueries/CTEs inside VALUES
INSERT INTO
second_distributed_table (tenant_id, dept)
VALUES ('3', (WITH vals AS (SELECT 3) select * from vals));
INSERT INTO
second_distributed_table (tenant_id, dept)
VALUES ('3', (SELECT 3));
-- DML with an unreferenced SELECT CTE
WITH cte_1 AS (
WITH cte_2 AS (
SELECT tenant_id as cte2_id
FROM second_distributed_table
WHERE dept >= 2
)
UPDATE distributed_table
SET dept = 10
RETURNING *
)
UPDATE distributed_table
SET dept = 5
FROM cte_1
WHERE distributed_table.tenant_id < cte_1.tenant_id;
EXPLAIN (COSTS FALSE) WITH cte_1 AS (
WITH cte_2 AS (
SELECT tenant_id as cte2_id
FROM second_distributed_table
WHERE dept >= 2
)
UPDATE distributed_table
SET dept = 10
RETURNING *
)
UPDATE distributed_table
SET dept = 5
FROM cte_1
WHERE distributed_table.tenant_id < cte_1.tenant_id;
-- we don't support updating a local table through a join with
-- distributed tables
UPDATE
local_table
SET
id = 'citus_test'
FROM
distributed_table
WHERE
distributed_table.tenant_id = local_table.id;
RESET client_min_messages;
DROP SCHEMA recursive_dml_queries CASCADE;

View File

@ -1420,6 +1420,21 @@ INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4)
INSERT INTO raw_events_second SELECT * FROM test_view WHERE user_id = 17 GROUP BY 1,2,3,4,5,6; INSERT INTO raw_events_second SELECT * FROM test_view WHERE user_id = 17 GROUP BY 1,2,3,4,5,6;
SELECT count(*) FROM raw_events_second; SELECT count(*) FROM raw_events_second;
-- intermediate results (CTEs) should be allowed when doing INSERT...SELECT within a CTE
WITH series AS (
SELECT s AS val FROM generate_series(60,70) s
),
inserts AS (
INSERT INTO raw_events_second (user_id)
SELECT
user_id
FROM
raw_events_first JOIN series ON (value_1 = val)
RETURNING
NULL
)
SELECT count(*) FROM inserts;
-- we need this in our next test -- we need this in our next test
truncate raw_events_first; truncate raw_events_first;

View File

@ -1,5 +1,6 @@
SET citus.shard_count TO 32; SET citus.shard_count TO 32;
SET citus.next_shard_id TO 750000; SET citus.next_shard_id TO 750000;
SET citus.next_placement_id TO 750000;
-- =================================================================== -- ===================================================================
@ -168,13 +169,9 @@ INSERT INTO limit_orders VALUES (random() * 10 + 70000, 'GOOG', 5634, now(), 'bu
SELECT COUNT(*) FROM limit_orders WHERE id BETWEEN 70000 AND 90000; SELECT COUNT(*) FROM limit_orders WHERE id BETWEEN 70000 AND 90000;
-- Who says that? :) -- commands containing a CTE are supported
-- INSERT ... SELECT ... FROM commands are unsupported WITH deleted_orders AS (DELETE FROM limit_orders WHERE id < 0 RETURNING *)
-- INSERT INTO limit_orders SELECT * FROM limit_orders; INSERT INTO limit_orders SELECT * FROM deleted_orders;
-- commands containing a CTE are unsupported
WITH deleted_orders AS (DELETE FROM limit_orders RETURNING *)
INSERT INTO limit_orders DEFAULT VALUES;
-- test simple DELETE -- test simple DELETE
INSERT INTO limit_orders VALUES (246, 'TSLA', 162, '2007-07-02 16:32:15', 'sell', 20.69); INSERT INTO limit_orders VALUES (246, 'TSLA', 162, '2007-07-02 16:32:15', 'sell', 20.69);
@ -194,15 +191,15 @@ SELECT COUNT(*) FROM limit_orders WHERE id = 246;
DELETE FROM limit_orders WHERE id = (2 * 123); DELETE FROM limit_orders WHERE id = (2 * 123);
SELECT COUNT(*) FROM limit_orders WHERE id = 246; SELECT COUNT(*) FROM limit_orders WHERE id = 246;
-- commands with a USING clause are unsupported -- commands with a USING clause are supported
CREATE TABLE bidders ( name text, id bigint ); CREATE TABLE bidders ( name text, id bigint );
DELETE FROM limit_orders USING bidders WHERE limit_orders.id = 246 AND DELETE FROM limit_orders USING bidders WHERE limit_orders.id = 246 AND
limit_orders.bidder_id = bidders.id AND limit_orders.bidder_id = bidders.id AND
bidders.name = 'Bernie Madoff'; bidders.name = 'Bernie Madoff';
-- commands containing a CTE are unsupported -- commands containing a CTE are supported
WITH deleted_orders AS (INSERT INTO limit_orders DEFAULT VALUES RETURNING *) WITH new_orders AS (INSERT INTO limit_orders VALUES (411, 'FLO', 12, '2017-07-02 16:32:15', 'buy', 66))
DELETE FROM limit_orders; DELETE FROM limit_orders WHERE id < 0;
INSERT INTO limit_orders VALUES (246, 'TSLA', 162, '2007-07-02 16:32:15', 'sell', 20.69); INSERT INTO limit_orders VALUES (246, 'TSLA', 162, '2007-07-02 16:32:15', 'sell', 20.69);
@ -307,8 +304,8 @@ UPDATE limit_orders SET limit_price = 0.00 FROM bidders
limit_orders.bidder_id = bidders.id AND limit_orders.bidder_id = bidders.id AND
bidders.name = 'Bernie Madoff'; bidders.name = 'Bernie Madoff';
-- commands containing a CTE are unsupported -- the connection used for the INSERT is claimed by pull-push, causing the UPDATE to fail
WITH deleted_orders AS (INSERT INTO limit_orders DEFAULT VALUES RETURNING *) WITH deleted_orders AS (INSERT INTO limit_orders VALUES (399, 'PDR', 14, '2017-07-02 16:32:15', 'sell', 43))
UPDATE limit_orders SET symbol = 'GM'; UPDATE limit_orders SET symbol = 'GM';
SELECT symbol, bidder_id FROM limit_orders WHERE id = 246; SELECT symbol, bidder_id FROM limit_orders WHERE id = 246;
@ -572,12 +569,16 @@ WHERE id = 1;
SELECT * FROM summary_table ORDER BY id; SELECT * FROM summary_table ORDER BY id;
-- unsupported multi-shard updates -- multi-shard updates with recursively planned subqueries
BEGIN;
UPDATE summary_table SET average_value = average_query.average FROM ( UPDATE summary_table SET average_value = average_query.average FROM (
SELECT avg(value) AS average FROM raw_table) average_query; SELECT avg(value) AS average FROM raw_table) average_query;
ROLLBACK;
BEGIN;
UPDATE summary_table SET average_value = average_value + 1 WHERE id = UPDATE summary_table SET average_value = average_value + 1 WHERE id =
(SELECT id FROM raw_table WHERE value > 100); (SELECT id FROM raw_table WHERE value > 100 LIMIT 1);
ROLLBACK;
-- test complex queries -- test complex queries
UPDATE summary_table UPDATE summary_table

View File

@ -84,9 +84,9 @@ INSERT INTO limit_orders_mx VALUES (2037, 'GOOG', 5634, now(), 'buy', random()),
-- connect back to the other node -- connect back to the other node
\c - - - :worker_1_port \c - - - :worker_1_port
-- commands containing a CTE are unsupported -- commands containing a CTE are supported
WITH deleted_orders AS (DELETE FROM limit_orders_mx RETURNING *) WITH deleted_orders AS (DELETE FROM limit_orders_mx WHERE id < 0 RETURNING *)
INSERT INTO limit_orders_mx DEFAULT VALUES; INSERT INTO limit_orders_mx SELECT * FROM deleted_orders;
-- test simple DELETE -- test simple DELETE
INSERT INTO limit_orders_mx VALUES (246, 'TSLA', 162, '2007-07-02 16:32:15', 'sell', 20.69); INSERT INTO limit_orders_mx VALUES (246, 'TSLA', 162, '2007-07-02 16:32:15', 'sell', 20.69);
@ -115,9 +115,9 @@ DELETE FROM limit_orders_mx USING bidders WHERE limit_orders_mx.id = 246 AND
limit_orders_mx.bidder_id = bidders.id AND limit_orders_mx.bidder_id = bidders.id AND
bidders.name = 'Bernie Madoff'; bidders.name = 'Bernie Madoff';
-- commands containing a CTE are unsupported -- commands containing a CTE are supported
WITH deleted_orders AS (INSERT INTO limit_orders_mx DEFAULT VALUES RETURNING *) WITH new_orders AS (INSERT INTO limit_orders_mx VALUES (411, 'FLO', 12, '2017-07-02 16:32:15', 'buy', 66))
DELETE FROM limit_orders_mx; DELETE FROM limit_orders_mx WHERE id < 0;
-- cursors are not supported -- cursors are not supported
DELETE FROM limit_orders_mx WHERE CURRENT OF cursor_name; DELETE FROM limit_orders_mx WHERE CURRENT OF cursor_name;
@ -161,8 +161,8 @@ UPDATE limit_orders_mx SET limit_price = 0.00 FROM bidders
limit_orders_mx.bidder_id = bidders.id AND limit_orders_mx.bidder_id = bidders.id AND
bidders.name = 'Bernie Madoff'; bidders.name = 'Bernie Madoff';
-- commands containing a CTE are unsupported -- commands containing a CTE are supported
WITH deleted_orders AS (INSERT INTO limit_orders_mx DEFAULT VALUES RETURNING *) WITH deleted_orders AS (INSERT INTO limit_orders_mx VALUES (399, 'PDR', 14, '2017-07-02 16:32:15', 'sell', 43))
UPDATE limit_orders_mx SET symbol = 'GM'; UPDATE limit_orders_mx SET symbol = 'GM';
SELECT symbol, bidder_id FROM limit_orders_mx WHERE id = 246; SELECT symbol, bidder_id FROM limit_orders_mx WHERE id = 246;

View File

@ -314,7 +314,7 @@ WHERE user_id IN (SELECT user_id
FROM users_test_table FROM users_test_table
UNION ALL UNION ALL
SELECT user_id SELECT user_id
FROM events_test_table) returning *; FROM events_test_table) returning value_3;
UPDATE users_test_table UPDATE users_test_table
SET value_1 = 5 SET value_1 = 5
@ -438,7 +438,7 @@ UPDATE users_test_table
SET value_2 = 6 SET value_2 = 6
WHERE value_1 IN (SELECT 2); WHERE value_1 IN (SELECT 2);
-- Can only use immutable functions -- Function calls in subqueries will be recursively planned
UPDATE test_table_1 UPDATE test_table_1
SET col_3 = 6 SET col_3 = 6
WHERE date_col IN (SELECT now()); WHERE date_col IN (SELECT now());
@ -505,7 +505,7 @@ FROM users_test_table
FULL OUTER JOIN events_test_table e2 USING (user_id) FULL OUTER JOIN events_test_table e2 USING (user_id)
WHERE e2.user_id = events_test_table.user_id RETURNING events_test_table.value_2; WHERE e2.user_id = events_test_table.user_id RETURNING events_test_table.value_2;
-- We can not pushdown query if there is no partition key equality -- Non-pushdownable subqueries, but will be handled through recursive planning
UPDATE users_test_table UPDATE users_test_table
SET value_1 = 1 SET value_1 = 1
WHERE user_id IN (SELECT Count(value_1) WHERE user_id IN (SELECT Count(value_1)
@ -580,6 +580,7 @@ SET value_2 = 5 * random()
FROM events_test_table FROM events_test_table
WHERE users_test_table.user_id = events_test_table.user_id; WHERE users_test_table.user_id = events_test_table.user_id;
-- Volatile functions in a subquery are recursively planned
UPDATE users_test_table UPDATE users_test_table
SET value_2 = 5 SET value_2 = 5
WHERE users_test_table.user_id IN (SELECT user_id * random() FROM events_test_table); WHERE users_test_table.user_id IN (SELECT user_id * random() FROM events_test_table);
@ -590,15 +591,16 @@ SET value_2 = 5
FROM events_test_table_local FROM events_test_table_local
WHERE users_test_table.user_id = events_test_table_local.user_id; WHERE users_test_table.user_id = events_test_table_local.user_id;
UPDATE users_test_table
SET value_2 = 5
WHERE users_test_table.user_id IN(SELECT user_id FROM events_test_table_local);
UPDATE events_test_table_local UPDATE events_test_table_local
SET value_2 = 5 SET value_2 = 5
FROM users_test_table FROM users_test_table
WHERE events_test_table_local.user_id = users_test_table.user_id; WHERE events_test_table_local.user_id = users_test_table.user_id;
-- Local tables in a subquery are supported through recursive planning
UPDATE users_test_table
SET value_2 = 5
WHERE users_test_table.user_id IN(SELECT user_id FROM events_test_table_local);
-- Shard counts of tables must be equal to pushdown the query -- Shard counts of tables must be equal to pushdown the query
UPDATE users_test_table UPDATE users_test_table
SET value_2 = 5 SET value_2 = 5

View File

@ -0,0 +1,159 @@
CREATE SCHEMA recursive_dml_queries_mx;
SET search_path TO recursive_dml_queries_mx, public;
SET citus.shard_replication_factor TO 1;
SET citus.replication_model TO streaming;
CREATE TABLE recursive_dml_queries_mx.distributed_table (tenant_id text, dept int, info jsonb);
SELECT create_distributed_table('distributed_table', 'tenant_id');
CREATE TABLE recursive_dml_queries_mx.second_distributed_table (tenant_id text, dept int, info jsonb);
SELECT create_distributed_table('second_distributed_table', 'tenant_id');
CREATE TABLE recursive_dml_queries_mx.reference_table (id text, name text);
SELECT create_reference_table('reference_table');
INSERT INTO distributed_table SELECT i::text, i % 10, row_to_json(row(i, i*i)) FROM generate_series (0, 100) i;
INSERT INTO second_distributed_table SELECT i::text, i % 10, row_to_json(row(i, i*i)) FROM generate_series (0, 100) i;
INSERT INTO reference_table SELECT i::text, 'user_' || i FROM generate_series (0, 100) i;
SET client_min_messages TO DEBUG1;
-- the subquery foo is recursively planned
UPDATE
reference_table
SET
name = 'new_' || name
FROM
(
SELECT
avg(second_distributed_table.tenant_id::int) as avg_tenant_id
FROM
second_distributed_table
) as foo
WHERE
foo.avg_tenant_id::int::text = reference_table.id;
-- the subquery foo is recursively planned
-- but note that the subquery foo itself is pushdownable
UPDATE
second_distributed_table
SET
dept = foo.max_dept * 2
FROM
(
SELECT DISTINCT ON (tenant_id) tenant_id, max(dept) as max_dept FROM
(
SELECT
second_distributed_table.dept, second_distributed_table.tenant_id
FROM
second_distributed_table, distributed_table
WHERE
distributed_table.tenant_id = second_distributed_table.tenant_id
) foo_inner
GROUP BY
tenant_id
ORDER BY 1 DESC
) as foo
WHERE
foo.tenant_id != second_distributed_table.tenant_id
AND second_distributed_table.dept IN (2);
-- run some queries from worker nodes
\c - - - :worker_1_port
SET search_path TO recursive_dml_queries_mx, public;
-- the subquery foo is recursively planned
-- and foo itself is a non colocated subquery and recursively planned
UPDATE
second_distributed_table
SET
dept = foo.tenant_id::int / 4
FROM
(
SELECT DISTINCT foo_inner_1.tenant_id FROM
(
SELECT
second_distributed_table.dept, second_distributed_table.tenant_id
FROM
second_distributed_table, distributed_table
WHERE
distributed_table.tenant_id = second_distributed_table.tenant_id
AND
second_distributed_table.dept IN (3,4)
) foo_inner_1,
(
SELECT
second_distributed_table.tenant_id
FROM
second_distributed_table, distributed_table
WHERE
distributed_table.tenant_id = second_distributed_table.tenant_id
AND
second_distributed_table.dept IN (4,5)
)foo_inner_2
WHERE foo_inner_1.tenant_id != foo_inner_2.tenant_id
) as foo
WHERE
foo.tenant_id != second_distributed_table.tenant_id
AND second_distributed_table.dept IN (3);
-- use the second worker
\c - - - :worker_2_port
SET search_path TO recursive_dml_queries_mx, public;
CREATE TABLE recursive_dml_queries_mx.local_table (id text, name text);
INSERT INTO local_table SELECT i::text, 'user_' || i FROM generate_series (0, 100) i;
-- View over the join of distributed_table and reference_table, matching the
-- distributed table's dept (cast to text) against the reference table's id.
-- It is used below as the source of a subquery inside an UPDATE.
CREATE VIEW tenant_ids AS
SELECT
tenant_id, name
FROM
distributed_table, reference_table
WHERE
distributed_table.dept::text = reference_table.id
ORDER BY 2 DESC, 1 DESC;
-- we currently do not allow local tables in modification queries
UPDATE
distributed_table
SET
dept = avg_tenant_id::int
FROM
(
SELECT
avg(local_table.id::int) as avg_tenant_id
FROM
local_table
) as foo
WHERE
foo.avg_tenant_id::int::text = distributed_table.tenant_id
RETURNING
distributed_table.*;
-- we currently do not allow views in modification queries
UPDATE
distributed_table
SET
dept = avg_tenant_id::int
FROM
(
SELECT
avg(tenant_id::int) as avg_tenant_id
FROM
tenant_ids
) as foo
WHERE
foo.avg_tenant_id::int::text = distributed_table.tenant_id
RETURNING
distributed_table.*;
DROP TABLE local_table;
\c - - - :master_port
SET search_path TO recursive_dml_queries_mx, public;
RESET client_min_messages;
DROP SCHEMA recursive_dml_queries_mx CASCADE;
RESET citus.shard_replication_factor;
RESET citus.replication_model;

View File

@ -0,0 +1,66 @@
CREATE SCHEMA recursive_dml_with_different_planner_executors;
SET search_path TO recursive_dml_with_different_planner_executors, public;
CREATE TABLE distributed_table (tenant_id text, dept int, info jsonb);
SELECT create_distributed_table('distributed_table', 'tenant_id');
CREATE TABLE second_distributed_table (tenant_id text, dept int, info jsonb);
SELECT create_distributed_table('second_distributed_table', 'tenant_id');
CREATE TABLE reference_table (id text, name text);
SELECT create_reference_table('reference_table');
INSERT INTO distributed_table SELECT i::text, i % 10, row_to_json(row(i, i*i)) FROM generate_series (0, 100) i;
INSERT INTO second_distributed_table SELECT i::text, i % 10, row_to_json(row(i, i*i)) FROM generate_series (0, 100) i;
SET client_min_messages TO DEBUG1;
-- subquery with router planner
-- joined with a real-time query
UPDATE
distributed_table
SET dept = foo.dept FROM
(SELECT tenant_id, dept FROM second_distributed_table WHERE dept = 1 ) as foo,
(SELECT tenant_id FROM second_distributed_table WHERE dept IN (1, 2, 3, 4) OFFSET 0) as bar
WHERE foo.tenant_id = bar.tenant_id
AND distributed_table.tenant_id = bar.tenant_id;
-- a non colocated subquery inside the UPDATE
UPDATE distributed_table SET dept = foo.max_dept FROM
(
SELECT
max(dept) as max_dept
FROM
(SELECT DISTINCT tenant_id, dept FROM distributed_table) as distributed_table
WHERE tenant_id NOT IN
(SELECT tenant_id FROM second_distributed_table WHERE dept IN (1, 2, 3, 4))
) as foo WHERE foo.max_dept > dept * 3;
-- subquery with repartition query
SET citus.enable_repartition_joins to ON;
UPDATE distributed_table SET dept = foo.some_tenants::int FROM
(
SELECT
DISTINCT second_distributed_table.tenant_id as some_tenants
FROM second_distributed_table, distributed_table WHERE second_distributed_table.dept = distributed_table.dept
) as foo;
SET citus.enable_repartition_joins to OFF;
-- final query is router
UPDATE distributed_table SET dept = foo.max_dept FROM
(
SELECT
max(dept) as max_dept
FROM
(SELECT DISTINCT tenant_id, dept FROM distributed_table) as distributed_table
WHERE tenant_id IN
(SELECT tenant_id FROM second_distributed_table WHERE dept IN (1, 2, 3, 4))
) as foo WHERE foo.max_dept >= dept and tenant_id = '8';
RESET client_min_messages;
DROP SCHEMA recursive_dml_with_different_planner_executors CASCADE;
SET search_path TO public;

View File

@ -0,0 +1,138 @@
CREATE SCHEMA with_dml;
SET search_path TO with_dml, public;
CREATE TABLE with_dml.distributed_table (tenant_id text PRIMARY KEY, dept int);
SELECT create_distributed_table('distributed_table', 'tenant_id');
CREATE TABLE with_dml.second_distributed_table (tenant_id text, dept int);
SELECT create_distributed_table('second_distributed_table', 'tenant_id');
CREATE TABLE with_dml.reference_table (id text, name text);
SELECT create_reference_table('reference_table');
INSERT INTO distributed_table SELECT i::text, i % 10 FROM generate_series (0, 100) i;
INSERT INTO second_distributed_table SELECT i::text, i % 10 FROM generate_series (0, 100) i;
INSERT INTO reference_table SELECT i::text, 'user_' || i FROM generate_series (0, 100) i;
SET client_min_messages TO DEBUG1;
-- delete all tenants from the reference table whose dept is 1
WITH ids_to_delete AS (
SELECT tenant_id FROM distributed_table WHERE dept = 1
)
DELETE FROM reference_table WHERE id IN (SELECT tenant_id FROM ids_to_delete);
-- update the name of the users whose dept is 2
WITH ids_to_update AS (
SELECT tenant_id FROM distributed_table WHERE dept = 2
)
UPDATE reference_table SET name = 'new_' || name WHERE id IN (SELECT tenant_id FROM ids_to_update);
-- now the CTE is also modifying
WITH ids_deleted_3 AS
(
DELETE FROM distributed_table WHERE dept = 3 RETURNING tenant_id
),
ids_deleted_4 AS
(
DELETE FROM distributed_table WHERE dept = 4 RETURNING tenant_id
)
DELETE FROM reference_table WHERE id IN (SELECT * FROM ids_deleted_3 UNION SELECT * FROM ids_deleted_4);
-- now the final UPDATE command is pushdownable
WITH ids_to_delete AS
(
SELECT tenant_id FROM distributed_table WHERE dept = 5
)
UPDATE
distributed_table
SET
dept = dept + 1
FROM
ids_to_delete, (SELECT tenant_id FROM distributed_table WHERE tenant_id::int < 60) as some_tenants
WHERE
some_tenants.tenant_id = ids_to_delete.tenant_id
AND distributed_table.tenant_id = some_tenants.tenant_id;
-- this query errors out since we hit some hard
-- errors in the INSERT ... SELECT pushdown,
-- which prevent falling back to recursive planning
WITH ids_to_upsert AS
(
SELECT tenant_id FROM distributed_table WHERE dept > 7
)
INSERT INTO distributed_table
SELECT distributed_table.tenant_id FROM ids_to_upsert, distributed_table
WHERE distributed_table.tenant_id = ids_to_upsert.tenant_id
ON CONFLICT (tenant_id) DO UPDATE SET dept = 8;
-- the following query is very similar to the above one
-- but this time the query is pulled to coordinator since
-- we return before hitting any hard errors
WITH ids_to_insert AS
(
SELECT (tenant_id::int * 100)::text as tenant_id FROM distributed_table WHERE dept > 7
)
INSERT INTO distributed_table
SELECT DISTINCT ids_to_insert.tenant_id FROM ids_to_insert, distributed_table
WHERE distributed_table.tenant_id < ids_to_insert.tenant_id;
-- not a very meaningful query
-- but has two modifying CTEs along with another
-- modify statement
WITH copy_to_other_table AS (
INSERT INTO distributed_table
SELECT *
FROM second_distributed_table
WHERE dept = 3
ON CONFLICT (tenant_id) DO UPDATE SET dept = 4
RETURNING *
),
main_table_deleted AS (
DELETE
FROM distributed_table
WHERE dept < 10
AND NOT EXISTS (SELECT 1 FROM second_distributed_table
WHERE second_distributed_table.dept = 1
AND second_distributed_table.tenant_id = distributed_table.tenant_id)
RETURNING *
)
INSERT INTO second_distributed_table
SELECT *
FROM main_table_deleted
EXCEPT
SELECT *
FROM copy_to_other_table;
-- CTE inside the UPDATE statement
UPDATE
second_distributed_table
SET dept =
(WITH vals AS (
SELECT DISTINCT tenant_id::int FROM distributed_table
) select * from vals where tenant_id = 8 )
WHERE dept = 8;
-- Subquery inside the UPDATE statement
UPDATE
second_distributed_table
SET dept =
(SELECT DISTINCT tenant_id::int FROM distributed_table WHERE tenant_id = '9')
WHERE dept = 8;
-- delete all remaining tenants
WITH ids_to_delete AS (
SELECT tenant_id FROM distributed_table
)
DELETE FROM distributed_table WHERE tenant_id = ANY(SELECT tenant_id FROM ids_to_delete);
WITH ids_to_delete AS (
SELECT id FROM reference_table
)
DELETE FROM reference_table WHERE id = ANY(SELECT id FROM ids_to_delete);
RESET client_min_messages;
DROP SCHEMA with_dml CASCADE;

View File

@ -0,0 +1,75 @@
-- Test setup: a dedicated schema holding two tables with identical
-- columns, both distributed on tenant_id.
CREATE SCHEMA with_transactions;
SET search_path TO with_transactions, public;
-- NOTE(review): shard count and next placement id are pinned here,
-- presumably to keep the test output deterministic -- confirm
SET citus.shard_count TO 4;
SET citus.next_placement_id TO 800000;
CREATE TABLE with_transactions.raw_table (tenant_id int, income float, created_at timestamptz);
SELECT create_distributed_table('raw_table', 'tenant_id');
CREATE TABLE with_transactions.second_raw_table (tenant_id int, income float, created_at timestamptz);
SELECT create_distributed_table('second_raw_table', 'tenant_id');
-- load 101 rows (i = 0..100): tenant_id cycles through 0..9, income is
-- i * 10.0 and created_at advances one day per row from 2014-01-10 20:00
INSERT INTO
raw_table (tenant_id, income, created_at)
SELECT
i % 10, i * 10.0, timestamp '2014-01-10 20:00:00' + i * interval '1 day'
FROM
generate_series (0, 100) i;
-- second_raw_table starts out as a copy of raw_table
INSERT INTO second_raw_table SELECT * FROM raw_table;
-- NOTE(review): DEBUG1 is presumably enabled so that planner messages
-- show up in the expected output of the statements below -- confirm
SET client_min_messages TO DEBUG1;
-- run a transaction that DELETEs in a CTE and UPDATEs in the main
-- statement, then roll it back
BEGIN;
WITH ids_to_delete AS
(
-- tenants that have at least one row with income below 250
SELECT tenant_id FROM raw_table WHERE income < 250
),
deleted_ids AS
(
-- delete the early rows of those tenants and return the affected tenant ids
DELETE FROM raw_table WHERE created_at < '2014-02-10 20:00:00' AND tenant_id IN (SELECT * from ids_to_delete) RETURNING tenant_id
)
-- double the income of every tenant that had rows deleted
UPDATE raw_table SET income = income * 2 WHERE tenant_id IN (SELECT tenant_id FROM deleted_ids);
ROLLBACK;
-- see that both the UPDATE and the DELETE were rolled back:
-- the row count and the maximum income must match the initial load
SELECT count(*) FROM raw_table;
SELECT max(income) FROM raw_table;
-- multi-statement, multi-shard modifying statements should work:
-- within one transaction run a SELECT, then an UPDATE fed by a
-- read-only CTE and a modifying CTE, then a TRUNCATE, and commit
BEGIN;
SELECT count (*) FROM second_raw_table;
WITH distinct_count AS (
-- single-row CTE; its only column gets the default name "count"
SELECT count(DISTINCT created_at) FROM raw_table
),
ids_inserted AS
(
INSERT INTO raw_table VALUES (11, 1000, now()) RETURNING tenant_id
)
-- touch only the freshly inserted tenant, and only if its id is below
-- the number of distinct created_at values
UPDATE raw_table SET created_at = '2001-02-10 20:00:00'
WHERE tenant_id IN (SELECT tenant_id FROM ids_inserted) AND tenant_id < (SELECT count FROM distinct_count);
TRUNCATE second_raw_table;
COMMIT;
-- sequential insert followed by parallel update causes execution issues
-- (multi-row INSERT in the CTE, UPDATE in the main statement, run
-- outside an explicit transaction block)
-- NOTE(review): presumably this statement is expected to fail -- confirm
-- against the expected output file
WITH ids_inserted AS
(
INSERT INTO raw_table VALUES (11, 1000, now()), (12, 1000, now()), (13, 1000, now()) RETURNING tenant_id
)
UPDATE raw_table SET created_at = '2001-02-10 20:00:00' WHERE tenant_id IN (SELECT tenant_id FROM ids_inserted);
-- make sure that everything was committed: check the total row count,
-- the rows stamped by the UPDATE, and the table that was truncated
SELECT count(*) FROM raw_table;
SELECT count(*) FROM raw_table WHERE created_at = '2001-02-10 20:00:00';
SELECT count(*) FROM second_raw_table;
-- clean up: restore the changed settings and drop the test schema
RESET client_min_messages;
RESET citus.shard_count;
DROP SCHEMA with_transactions CASCADE;