mirror of https://github.com/citusdata/citus.git
Fixes the bug#6785
parent
99edb2675f
commit
387b5f80f9
|
@ -33,6 +33,7 @@
|
||||||
#include "distributed/query_colocation_checker.h"
|
#include "distributed/query_colocation_checker.h"
|
||||||
#include "distributed/repartition_executor.h"
|
#include "distributed/repartition_executor.h"
|
||||||
#include "distributed/shared_library_init.h"
|
#include "distributed/shared_library_init.h"
|
||||||
|
#include "distributed/shard_pruning.h"
|
||||||
|
|
||||||
#if PG_VERSION_NUM >= PG_VERSION_15
|
#if PG_VERSION_NUM >= PG_VERSION_15
|
||||||
|
|
||||||
|
@ -40,6 +41,9 @@ static int SourceResultPartitionColumnIndex(Query *mergeQuery,
|
||||||
List *sourceTargetList,
|
List *sourceTargetList,
|
||||||
CitusTableCacheEntry *targetRelation);
|
CitusTableCacheEntry *targetRelation);
|
||||||
static Var * ValidateAndReturnVarIfSupported(Node *entryExpr);
|
static Var * ValidateAndReturnVarIfSupported(Node *entryExpr);
|
||||||
|
static DeferredErrorMessage * DeferErrorIfTargetHasFalseClause(Oid targetRelationId,
|
||||||
|
PlannerRestrictionContext *
|
||||||
|
plannerRestrictionContext);
|
||||||
static void ErrorIfMergeQueryQualAndTargetListNotSupported(Oid targetRelationId,
|
static void ErrorIfMergeQueryQualAndTargetListNotSupported(Oid targetRelationId,
|
||||||
Query *originalQuery);
|
Query *originalQuery);
|
||||||
static void ErrorIfMergeNotSupported(Query *query, Oid targetRelationId,
|
static void ErrorIfMergeNotSupported(Query *query, Oid targetRelationId,
|
||||||
|
@ -51,7 +55,8 @@ static DeferredErrorMessage * DeferErrorIfRoutableMergeNotSupported(Query *query
|
||||||
List *rangeTableList,
|
List *rangeTableList,
|
||||||
PlannerRestrictionContext
|
PlannerRestrictionContext
|
||||||
*
|
*
|
||||||
plannerRestrictionContext);
|
plannerRestrictionContext,
|
||||||
|
Oid targetRelationId);
|
||||||
static DeferredErrorMessage * MergeQualAndTargetListFunctionsSupported(Oid
|
static DeferredErrorMessage * MergeQualAndTargetListFunctionsSupported(Oid
|
||||||
resultRelationId,
|
resultRelationId,
|
||||||
Query *query,
|
Query *query,
|
||||||
|
@ -164,7 +169,8 @@ CreateRouterMergePlan(Oid targetRelationId, Query *originalQuery, Query *query,
|
||||||
|
|
||||||
distributedPlan->planningError = DeferErrorIfRoutableMergeNotSupported(originalQuery,
|
distributedPlan->planningError = DeferErrorIfRoutableMergeNotSupported(originalQuery,
|
||||||
rangeTableList,
|
rangeTableList,
|
||||||
plannerRestrictionContext);
|
plannerRestrictionContext,
|
||||||
|
targetRelationId);
|
||||||
if (distributedPlan->planningError != NULL)
|
if (distributedPlan->planningError != NULL)
|
||||||
{
|
{
|
||||||
return distributedPlan;
|
return distributedPlan;
|
||||||
|
@ -926,13 +932,52 @@ ErrorIfMergeNotSupported(Query *query, Oid targetRelationId, List *rangeTableLis
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* DeferErrorIfTargetHasFalseClause checks for the presence of a false clause in the
|
||||||
|
* target relation and throws an exception if found. Router planner prunes all the shards
|
||||||
|
* for relations with such clauses, resulting in no task generation for the job. However,
|
||||||
|
* in the case of a MERGE query, tasks still need to be generated for the shards of the
|
||||||
|
* source relation.
|
||||||
|
*/
|
||||||
|
static DeferredErrorMessage *
|
||||||
|
DeferErrorIfTargetHasFalseClause(Oid targetRelationId,
|
||||||
|
PlannerRestrictionContext *plannerRestrictionContext)
|
||||||
|
{
|
||||||
|
ListCell *restrictionCell = NULL;
|
||||||
|
foreach(restrictionCell,
|
||||||
|
plannerRestrictionContext->relationRestrictionContext->relationRestrictionList)
|
||||||
|
{
|
||||||
|
RelationRestriction *relationRestriction =
|
||||||
|
(RelationRestriction *) lfirst(restrictionCell);
|
||||||
|
Oid relationId = relationRestriction->relationId;
|
||||||
|
|
||||||
|
/* Check only for target relation */
|
||||||
|
if (relationId != targetRelationId)
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
List *baseRestrictionList = relationRestriction->relOptInfo->baserestrictinfo;
|
||||||
|
List *restrictClauseList = get_all_actual_clauses(baseRestrictionList);
|
||||||
|
if (ContainsFalseClause(restrictClauseList))
|
||||||
|
{
|
||||||
|
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||||
|
"Routing query is not possible with "
|
||||||
|
"no shards for target", NULL, NULL);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* DeferErrorIfRoutableMergeNotSupported Checks for conditions that prevent pushable planning, if
|
* DeferErrorIfRoutableMergeNotSupported Checks for conditions that prevent pushable planning, if
|
||||||
* found, raises a deferred error, which then continues to try repartitioning strategy.
|
* found, raises a deferred error, which then continues to try repartitioning strategy.
|
||||||
*/
|
*/
|
||||||
static DeferredErrorMessage *
|
static DeferredErrorMessage *
|
||||||
DeferErrorIfRoutableMergeNotSupported(Query *query, List *rangeTableList,
|
DeferErrorIfRoutableMergeNotSupported(Query *query, List *rangeTableList,
|
||||||
PlannerRestrictionContext *plannerRestrictionContext)
|
PlannerRestrictionContext *plannerRestrictionContext,
|
||||||
|
Oid targetRelationId)
|
||||||
{
|
{
|
||||||
List *distTablesList = NIL;
|
List *distTablesList = NIL;
|
||||||
List *refTablesList = NIL;
|
List *refTablesList = NIL;
|
||||||
|
@ -1020,6 +1065,17 @@ DeferErrorIfRoutableMergeNotSupported(Query *query, List *rangeTableList,
|
||||||
"conflict, use ON instead", NULL, NULL);
|
"conflict, use ON instead", NULL, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
deferredError = DeferErrorIfTargetHasFalseClause(targetRelationId,
|
||||||
|
plannerRestrictionContext);
|
||||||
|
if (deferredError)
|
||||||
|
{
|
||||||
|
ereport(DEBUG1, (errmsg("Target relation has a filter of the "
|
||||||
|
"form: false (AND ..), which results "
|
||||||
|
"in empty shards, but we still need "
|
||||||
|
"to evaluate NOT-MATCHED clause, try "
|
||||||
|
"repartitioning")));
|
||||||
|
return deferredError;
|
||||||
|
}
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2973,6 +2973,164 @@ SELECT COUNT(*) FROM demo_distributed where id1 = 2;
|
||||||
7
|
7
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Test FALSE filters
|
||||||
|
--
|
||||||
|
CREATE TABLE source_filter(order_id INT, customer_id INT, order_center VARCHAR, order_time timestamp);
|
||||||
|
CREATE TABLE target_filter(customer_id INT, last_order_id INT, order_center VARCHAR, order_count INT, last_order timestamp);
|
||||||
|
SELECT create_distributed_table('source_filter', 'customer_id');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT create_distributed_table('target_filter', 'customer_id', colocate_with => 'source_filter');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE FUNCTION load_filter() RETURNS VOID AS $$
|
||||||
|
|
||||||
|
TRUNCATE target_filter;
|
||||||
|
TRUNCATE source_filter;
|
||||||
|
|
||||||
|
INSERT INTO target_filter VALUES(100, 11, 'trg', -1, '2022-01-01 00:00:00'); -- Match UPDATE
|
||||||
|
INSERT INTO target_filter VALUES(200, 11, 'trg', -1, '2022-01-01 00:00:00'); -- Match DELETE
|
||||||
|
|
||||||
|
INSERT INTO source_filter VALUES(12, 100, 'src', '2022-01-01 00:00:00');
|
||||||
|
INSERT INTO source_filter VALUES(12, 200, 'src', '2022-01-01 00:00:00');
|
||||||
|
INSERT INTO source_filter VALUES(12, 300, 'src', '2022-01-01 00:00:00');
|
||||||
|
|
||||||
|
$$
|
||||||
|
LANGUAGE SQL;
|
||||||
|
--WHEN MATCH and FALSE
|
||||||
|
SELECT load_filter();
|
||||||
|
load_filter
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
MERGE INTO target_filter t
|
||||||
|
USING source_filter s
|
||||||
|
ON s.customer_id = t.customer_id
|
||||||
|
WHEN MATCHED AND t.customer_id = 100 AND (FALSE) THEN
|
||||||
|
UPDATE SET order_count = 999
|
||||||
|
WHEN MATCHED AND t.customer_id = 200 THEN
|
||||||
|
DELETE
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES(s.customer_id, s.order_id, s.order_center, 1, s.order_time);
|
||||||
|
SELECT * FROM target_filter ORDER BY 1, 2;
|
||||||
|
customer_id | last_order_id | order_center | order_count | last_order
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
100 | 11 | trg | -1 | Sat Jan 01 00:00:00 2022
|
||||||
|
300 | 12 | src | 1 | Sat Jan 01 00:00:00 2022
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
--WHEN NOT MATCH and 1=0
|
||||||
|
SELECT load_filter();
|
||||||
|
load_filter
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
MERGE INTO target_filter t
|
||||||
|
USING source_filter s
|
||||||
|
ON s.customer_id = t.customer_id
|
||||||
|
WHEN MATCHED AND t.customer_id = 100 THEN
|
||||||
|
UPDATE SET order_count = 999
|
||||||
|
WHEN MATCHED AND t.customer_id = 200 THEN
|
||||||
|
DELETE
|
||||||
|
WHEN NOT MATCHED AND (1=0) THEN
|
||||||
|
INSERT VALUES(s.customer_id, s.order_id, s.order_center, 1, s.order_time);
|
||||||
|
SELECT * FROM target_filter ORDER BY 1, 2;
|
||||||
|
customer_id | last_order_id | order_center | order_count | last_order
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
100 | 11 | trg | 999 | Sat Jan 01 00:00:00 2022
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
--ON t.key = s.key AND 1 < 0
|
||||||
|
SELECT load_filter();
|
||||||
|
load_filter
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
MERGE INTO target_filter t
|
||||||
|
USING source_filter s
|
||||||
|
ON s.customer_id = t.customer_id AND 1 < 0
|
||||||
|
WHEN MATCHED AND t.customer_id = 100 THEN
|
||||||
|
UPDATE SET order_count = 999
|
||||||
|
WHEN MATCHED AND t.customer_id = 200 THEN
|
||||||
|
DELETE
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES(s.customer_id, s.order_id, s.order_center, 1, s.order_time);
|
||||||
|
ERROR: The required join operation is missing between the target's distribution column and any expression originating from the source. The issue may arise from either a non-equi-join or a mismatch in the datatypes of the columns being joined.
|
||||||
|
DETAIL: Without a equi-join condition on the target's distribution column, the source rows cannot be efficiently redistributed, and the NOT-MATCHED condition cannot be evaluated unambiguously. This can result in incorrect or unexpected results when attempting to merge tables in a distributed setting
|
||||||
|
SELECT * FROM target_filter ORDER BY 1, 2;
|
||||||
|
customer_id | last_order_id | order_center | order_count | last_order
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
100 | 11 | trg | -1 | Sat Jan 01 00:00:00 2022
|
||||||
|
200 | 11 | trg | -1 | Sat Jan 01 00:00:00 2022
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
--(SELECT * FROM source_filter WHERE false) as source_filter
|
||||||
|
SELECT load_filter();
|
||||||
|
load_filter
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
MERGE INTO target_filter t
|
||||||
|
USING (SELECT * FROM source_filter WHERE false) s
|
||||||
|
ON s.customer_id = t.customer_id
|
||||||
|
WHEN MATCHED AND t.customer_id = 100 THEN
|
||||||
|
UPDATE SET order_count = 999
|
||||||
|
WHEN MATCHED AND t.customer_id = 200 THEN
|
||||||
|
DELETE
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES(s.customer_id, s.order_id, s.order_center, 1, s.order_time);
|
||||||
|
SELECT * FROM target_filter ORDER BY 1, 2;
|
||||||
|
customer_id | last_order_id | order_center | order_count | last_order
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
100 | 11 | trg | -1 | Sat Jan 01 00:00:00 2022
|
||||||
|
200 | 11 | trg | -1 | Sat Jan 01 00:00:00 2022
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- Bug 6785
|
||||||
|
CREATE TABLE source_6785( id integer, z int, d jsonb);
|
||||||
|
CREATE TABLE target_6785( id integer, z int, d jsonb);
|
||||||
|
SELECT create_distributed_table('target_6785','id'), create_distributed_table('source_6785', 'id');
|
||||||
|
create_distributed_table | create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO source_6785 SELECT i,i FROM generate_series(0,5)i;
|
||||||
|
SET client_min_messages TO DEBUG1;
|
||||||
|
MERGE INTO target_6785 sda
|
||||||
|
USING (SELECT * FROM source_6785 WHERE id = 1) sdn
|
||||||
|
ON sda.id = sdn.id AND sda.id = 2
|
||||||
|
WHEN NOT matched THEN
|
||||||
|
INSERT (id, z) VALUES (sdn.id, 5);
|
||||||
|
DEBUG: Target relation has a filter of the form: false (AND ..), which results in empty shards, but we still need to evaluate NOT-MATCHED clause, try repartitioning
|
||||||
|
DEBUG: Routing query is not possible with no shards for target
|
||||||
|
DEBUG: Creating MERGE repartition plan
|
||||||
|
DEBUG: Using column - index:0 from the source list to redistribute
|
||||||
|
DEBUG: Collect source query results on coordinator
|
||||||
|
DEBUG: Create a MERGE task list that needs to be routed
|
||||||
|
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_6785_xxxxxxx sda USING (SELECT intermediate_result.id, intermediate_result.z, intermediate_result.d FROM read_intermediate_result('merge_into_XXX_4000147'::text, 'binary'::citus_copy_format) intermediate_result(id integer, z integer, d jsonb)) sdn ON ((sda.id OPERATOR(pg_catalog.=) sdn.id) AND (sda.id OPERATOR(pg_catalog.=) 2)) WHEN NOT MATCHED THEN INSERT (id, z) VALUES (sdn.id, 5)>
|
||||||
|
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_6785_xxxxxxx sda USING (SELECT intermediate_result.id, intermediate_result.z, intermediate_result.d FROM read_intermediate_result('merge_into_XXX_4000148'::text, 'binary'::citus_copy_format) intermediate_result(id integer, z integer, d jsonb)) sdn ON ((sda.id OPERATOR(pg_catalog.=) sdn.id) AND (sda.id OPERATOR(pg_catalog.=) 2)) WHEN NOT MATCHED THEN INSERT (id, z) VALUES (sdn.id, 5)>
|
||||||
|
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_6785_xxxxxxx sda USING (SELECT intermediate_result.id, intermediate_result.z, intermediate_result.d FROM read_intermediate_result('merge_into_XXX_4000149'::text, 'binary'::citus_copy_format) intermediate_result(id integer, z integer, d jsonb)) sdn ON ((sda.id OPERATOR(pg_catalog.=) sdn.id) AND (sda.id OPERATOR(pg_catalog.=) 2)) WHEN NOT MATCHED THEN INSERT (id, z) VALUES (sdn.id, 5)>
|
||||||
|
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.target_6785_xxxxxxx sda USING (SELECT intermediate_result.id, intermediate_result.z, intermediate_result.d FROM read_intermediate_result('merge_into_XXX_4000150'::text, 'binary'::citus_copy_format) intermediate_result(id integer, z integer, d jsonb)) sdn ON ((sda.id OPERATOR(pg_catalog.=) sdn.id) AND (sda.id OPERATOR(pg_catalog.=) 2)) WHEN NOT MATCHED THEN INSERT (id, z) VALUES (sdn.id, 5)>
|
||||||
|
DEBUG: Execute MERGE task list
|
||||||
|
RESET client_min_messages;
|
||||||
|
SELECT * FROM target_6785 ORDER BY 1;
|
||||||
|
id | z | d
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1 | 5 |
|
||||||
|
(1 row)
|
||||||
|
|
||||||
--
|
--
|
||||||
-- Error and Unsupported scenarios
|
-- Error and Unsupported scenarios
|
||||||
--
|
--
|
||||||
|
@ -3725,21 +3883,21 @@ INSERT INTO postgres_local_table SELECT i, i FROM generate_series(5, 10) i;
|
||||||
-- with a colocated table
|
-- with a colocated table
|
||||||
MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a)
|
MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a)
|
||||||
WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b;
|
WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b;
|
||||||
DEBUG: <Deparsed MERGE query: MERGE INTO query_single_shard_table.nullkey_c1_t1_4000168 nullkey_c1_t1 USING query_single_shard_table.nullkey_c1_t2_4000169 nullkey_c1_t2 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) nullkey_c1_t2.a) WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b>
|
DEBUG: <Deparsed MERGE query: MERGE INTO query_single_shard_table.nullkey_c1_t1_4000184 nullkey_c1_t1 USING query_single_shard_table.nullkey_c1_t2_4000185 nullkey_c1_t2 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) nullkey_c1_t2.a) WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b>
|
||||||
DEBUG: Creating MERGE router plan
|
DEBUG: Creating MERGE router plan
|
||||||
MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a)
|
MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a)
|
||||||
WHEN MATCHED THEN DELETE;
|
WHEN MATCHED THEN DELETE;
|
||||||
DEBUG: <Deparsed MERGE query: MERGE INTO query_single_shard_table.nullkey_c1_t1_4000168 nullkey_c1_t1 USING query_single_shard_table.nullkey_c1_t2_4000169 nullkey_c1_t2 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) nullkey_c1_t2.a) WHEN MATCHED THEN DELETE>
|
DEBUG: <Deparsed MERGE query: MERGE INTO query_single_shard_table.nullkey_c1_t1_4000184 nullkey_c1_t1 USING query_single_shard_table.nullkey_c1_t2_4000185 nullkey_c1_t2 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) nullkey_c1_t2.a) WHEN MATCHED THEN DELETE>
|
||||||
DEBUG: Creating MERGE router plan
|
DEBUG: Creating MERGE router plan
|
||||||
MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a)
|
MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a)
|
||||||
WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b
|
WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b
|
||||||
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b);
|
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b);
|
||||||
DEBUG: <Deparsed MERGE query: MERGE INTO query_single_shard_table.nullkey_c1_t1_4000168 nullkey_c1_t1 USING query_single_shard_table.nullkey_c1_t2_4000169 nullkey_c1_t2 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) nullkey_c1_t2.a) WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b)>
|
DEBUG: <Deparsed MERGE query: MERGE INTO query_single_shard_table.nullkey_c1_t1_4000184 nullkey_c1_t1 USING query_single_shard_table.nullkey_c1_t2_4000185 nullkey_c1_t2 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) nullkey_c1_t2.a) WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b)>
|
||||||
DEBUG: Creating MERGE router plan
|
DEBUG: Creating MERGE router plan
|
||||||
MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a)
|
MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a)
|
||||||
WHEN MATCHED THEN DELETE
|
WHEN MATCHED THEN DELETE
|
||||||
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b);
|
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b);
|
||||||
DEBUG: <Deparsed MERGE query: MERGE INTO query_single_shard_table.nullkey_c1_t1_4000168 nullkey_c1_t1 USING query_single_shard_table.nullkey_c1_t2_4000169 nullkey_c1_t2 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) nullkey_c1_t2.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b)>
|
DEBUG: <Deparsed MERGE query: MERGE INTO query_single_shard_table.nullkey_c1_t1_4000184 nullkey_c1_t1 USING query_single_shard_table.nullkey_c1_t2_4000185 nullkey_c1_t2 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) nullkey_c1_t2.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b)>
|
||||||
DEBUG: Creating MERGE router plan
|
DEBUG: Creating MERGE router plan
|
||||||
-- with non-colocated single-shard table
|
-- with non-colocated single-shard table
|
||||||
MERGE INTO nullkey_c1_t1 USING nullkey_c2_t1 ON (nullkey_c1_t1.a = nullkey_c2_t1.a)
|
MERGE INTO nullkey_c1_t1 USING nullkey_c2_t1 ON (nullkey_c1_t1.a = nullkey_c2_t1.a)
|
||||||
|
@ -3774,14 +3932,14 @@ DEBUG: Distributed planning for a fast-path router query
|
||||||
DEBUG: Creating router plan
|
DEBUG: Creating router plan
|
||||||
DEBUG: Collect source query results on coordinator
|
DEBUG: Collect source query results on coordinator
|
||||||
DEBUG: Create a MERGE task list that needs to be routed
|
DEBUG: Create a MERGE task list that needs to be routed
|
||||||
DEBUG: <Deparsed MERGE query: MERGE INTO query_single_shard_table.distributed_table_4000173 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000173'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b)>
|
DEBUG: <Deparsed MERGE query: MERGE INTO query_single_shard_table.distributed_table_4000189 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000189'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b)>
|
||||||
DEBUG: distributed statement: MERGE INTO query_single_shard_table.distributed_table_4000173 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000173'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b)
|
DEBUG: distributed statement: MERGE INTO query_single_shard_table.distributed_table_4000189 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000189'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b)
|
||||||
DEBUG: <Deparsed MERGE query: MERGE INTO query_single_shard_table.distributed_table_4000174 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000174'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b)>
|
DEBUG: <Deparsed MERGE query: MERGE INTO query_single_shard_table.distributed_table_4000190 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000190'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b)>
|
||||||
DEBUG: distributed statement: MERGE INTO query_single_shard_table.distributed_table_4000174 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000174'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b)
|
DEBUG: distributed statement: MERGE INTO query_single_shard_table.distributed_table_4000190 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000190'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b)
|
||||||
DEBUG: <Deparsed MERGE query: MERGE INTO query_single_shard_table.distributed_table_4000175 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000175'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b)>
|
DEBUG: <Deparsed MERGE query: MERGE INTO query_single_shard_table.distributed_table_4000191 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000191'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b)>
|
||||||
DEBUG: distributed statement: MERGE INTO query_single_shard_table.distributed_table_4000175 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000175'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b)
|
DEBUG: distributed statement: MERGE INTO query_single_shard_table.distributed_table_4000191 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000191'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b)
|
||||||
DEBUG: <Deparsed MERGE query: MERGE INTO query_single_shard_table.distributed_table_4000176 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000176'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b)>
|
DEBUG: <Deparsed MERGE query: MERGE INTO query_single_shard_table.distributed_table_4000192 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000192'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b)>
|
||||||
DEBUG: distributed statement: MERGE INTO query_single_shard_table.distributed_table_4000176 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000176'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b)
|
DEBUG: distributed statement: MERGE INTO query_single_shard_table.distributed_table_4000192 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4000192'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c1_t1 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) citus_table_alias.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b)
|
||||||
-- with a reference table
|
-- with a reference table
|
||||||
MERGE INTO nullkey_c1_t1 USING reference_table ON (nullkey_c1_t1.a = reference_table.a)
|
MERGE INTO nullkey_c1_t1 USING reference_table ON (nullkey_c1_t1.a = reference_table.a)
|
||||||
WHEN MATCHED THEN UPDATE SET b = reference_table.b;
|
WHEN MATCHED THEN UPDATE SET b = reference_table.b;
|
||||||
|
@ -3824,7 +3982,7 @@ WITH cte AS (
|
||||||
)
|
)
|
||||||
MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a)
|
MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a)
|
||||||
WHEN MATCHED THEN UPDATE SET b = cte.b;
|
WHEN MATCHED THEN UPDATE SET b = cte.b;
|
||||||
DEBUG: <Deparsed MERGE query: WITH cte AS (SELECT nullkey_c1_t1_1.a, nullkey_c1_t1_1.b FROM query_single_shard_table.nullkey_c1_t1_4000168 nullkey_c1_t1_1) MERGE INTO query_single_shard_table.nullkey_c1_t1_4000168 nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) cte.a) WHEN MATCHED THEN UPDATE SET b = cte.b>
|
DEBUG: <Deparsed MERGE query: WITH cte AS (SELECT nullkey_c1_t1_1.a, nullkey_c1_t1_1.b FROM query_single_shard_table.nullkey_c1_t1_4000184 nullkey_c1_t1_1) MERGE INTO query_single_shard_table.nullkey_c1_t1_4000184 nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) cte.a) WHEN MATCHED THEN UPDATE SET b = cte.b>
|
||||||
DEBUG: Creating MERGE router plan
|
DEBUG: Creating MERGE router plan
|
||||||
WITH cte AS (
|
WITH cte AS (
|
||||||
SELECT * FROM distributed_table
|
SELECT * FROM distributed_table
|
||||||
|
@ -3989,7 +4147,7 @@ CONTEXT: SQL statement "SELECT citus_drop_all_shards(v_obj.objid, v_obj.schema_
|
||||||
PL/pgSQL function citus_drop_trigger() line XX at PERFORM
|
PL/pgSQL function citus_drop_trigger() line XX at PERFORM
|
||||||
DROP FUNCTION merge_when_and_write();
|
DROP FUNCTION merge_when_and_write();
|
||||||
DROP SCHEMA merge_schema CASCADE;
|
DROP SCHEMA merge_schema CASCADE;
|
||||||
NOTICE: drop cascades to 98 other objects
|
NOTICE: drop cascades to 103 other objects
|
||||||
DETAIL: drop cascades to function insert_data()
|
DETAIL: drop cascades to function insert_data()
|
||||||
drop cascades to table local_local
|
drop cascades to table local_local
|
||||||
drop cascades to table target
|
drop cascades to table target
|
||||||
|
@ -4076,15 +4234,18 @@ drop cascades to table demo_distributed
|
||||||
drop cascades to table demo_source_table
|
drop cascades to table demo_source_table
|
||||||
drop cascades to table pg_demo_result
|
drop cascades to table pg_demo_result
|
||||||
drop cascades to table dist_demo_result
|
drop cascades to table dist_demo_result
|
||||||
|
drop cascades to table source_filter
|
||||||
|
drop cascades to table target_filter
|
||||||
|
drop cascades to function load_filter()
|
||||||
|
drop cascades to table source_6785
|
||||||
|
drop cascades to table target_6785
|
||||||
drop cascades to function add_s(integer,integer)
|
drop cascades to function add_s(integer,integer)
|
||||||
drop cascades to table pg
|
drop cascades to table pg
|
||||||
drop cascades to table t1_4000158
|
drop cascades to table t1_4000174
|
||||||
drop cascades to table s1_4000159
|
drop cascades to table s1_4000175
|
||||||
drop cascades to table t1
|
drop cascades to table t1
|
||||||
drop cascades to table s1
|
drop cascades to table s1
|
||||||
drop cascades to table dist_target
|
drop cascades to table dist_target
|
||||||
drop cascades to table dist_source
|
drop cascades to table dist_source
|
||||||
drop cascades to view show_tables
|
drop cascades to view show_tables
|
||||||
drop cascades to table target_columnar
|
and 3 other objects (see server log for list)
|
||||||
drop cascades to table target_1
|
|
||||||
drop cascades to table source_2
|
|
||||||
|
|
|
@ -1000,28 +1000,14 @@ SQL function "compare_data" statement 2
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- Test source-query that requires repartitioning on top of MERGE repartitioning
|
-- Test source-query that requires repartitioning on top of MERGE repartitioning
|
||||||
|
SET client_min_messages TO WARNING;
|
||||||
SELECT cleanup_data();
|
SELECT cleanup_data();
|
||||||
NOTICE: creating a new table for merge_repartition1_schema.citus_target
|
|
||||||
CONTEXT: SQL function "cleanup_data" statement 5
|
|
||||||
NOTICE: moving the data of merge_repartition1_schema.citus_target
|
|
||||||
CONTEXT: SQL function "cleanup_data" statement 5
|
|
||||||
NOTICE: dropping the old merge_repartition1_schema.citus_target
|
|
||||||
CONTEXT: SQL function "cleanup_data" statement 5
|
|
||||||
NOTICE: renaming the new table to merge_repartition1_schema.citus_target
|
|
||||||
CONTEXT: SQL function "cleanup_data" statement 5
|
|
||||||
NOTICE: creating a new table for merge_repartition1_schema.citus_source
|
|
||||||
CONTEXT: SQL function "cleanup_data" statement 6
|
|
||||||
NOTICE: moving the data of merge_repartition1_schema.citus_source
|
|
||||||
CONTEXT: SQL function "cleanup_data" statement 6
|
|
||||||
NOTICE: dropping the old merge_repartition1_schema.citus_source
|
|
||||||
CONTEXT: SQL function "cleanup_data" statement 6
|
|
||||||
NOTICE: renaming the new table to merge_repartition1_schema.citus_source
|
|
||||||
CONTEXT: SQL function "cleanup_data" statement 6
|
|
||||||
cleanup_data
|
cleanup_data
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
RESET client_min_messages;
|
||||||
SELECT setup_data();
|
SELECT setup_data();
|
||||||
setup_data
|
setup_data
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
@ -1159,28 +1145,14 @@ NOTICE: renaming the new table to merge_repartition1_schema.citus_source
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- Test CTE/Subquery in merge-actions (works only for router query)
|
-- Test CTE/Subquery in merge-actions (works only for router query)
|
||||||
|
SET client_min_messages TO WARNING;
|
||||||
SELECT cleanup_data();
|
SELECT cleanup_data();
|
||||||
NOTICE: creating a new table for merge_repartition1_schema.citus_target
|
|
||||||
CONTEXT: SQL function "cleanup_data" statement 5
|
|
||||||
NOTICE: moving the data of merge_repartition1_schema.citus_target
|
|
||||||
CONTEXT: SQL function "cleanup_data" statement 5
|
|
||||||
NOTICE: dropping the old merge_repartition1_schema.citus_target
|
|
||||||
CONTEXT: SQL function "cleanup_data" statement 5
|
|
||||||
NOTICE: renaming the new table to merge_repartition1_schema.citus_target
|
|
||||||
CONTEXT: SQL function "cleanup_data" statement 5
|
|
||||||
NOTICE: creating a new table for merge_repartition1_schema.citus_source
|
|
||||||
CONTEXT: SQL function "cleanup_data" statement 6
|
|
||||||
NOTICE: moving the data of merge_repartition1_schema.citus_source
|
|
||||||
CONTEXT: SQL function "cleanup_data" statement 6
|
|
||||||
NOTICE: dropping the old merge_repartition1_schema.citus_source
|
|
||||||
CONTEXT: SQL function "cleanup_data" statement 6
|
|
||||||
NOTICE: renaming the new table to merge_repartition1_schema.citus_source
|
|
||||||
CONTEXT: SQL function "cleanup_data" statement 6
|
|
||||||
cleanup_data
|
cleanup_data
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
RESET client_min_messages;
|
||||||
SELECT setup_data();
|
SELECT setup_data();
|
||||||
setup_data
|
setup_data
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
@ -1233,6 +1205,137 @@ SQL function "compare_data" statement 2
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Test target with false clause
|
||||||
|
--
|
||||||
|
SET client_min_messages TO WARNING;
|
||||||
|
SELECT cleanup_data();
|
||||||
|
cleanup_data
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
RESET client_min_messages;
|
||||||
|
SELECT setup_data();
|
||||||
|
setup_data
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT create_distributed_table('citus_target', 'id');
|
||||||
|
NOTICE: Copying data from local table...
|
||||||
|
NOTICE: copying the data has completed
|
||||||
|
DETAIL: The local data in the table is no longer visible, but is still on disk.
|
||||||
|
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_repartition1_schema.citus_target$$)
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT create_distributed_table('citus_source', 'id', colocate_with => 'citus_target');
|
||||||
|
NOTICE: Copying data from local table...
|
||||||
|
NOTICE: copying the data has completed
|
||||||
|
DETAIL: The local data in the table is no longer visible, but is still on disk.
|
||||||
|
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_repartition1_schema.citus_source$$)
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
MERGE INTO pg_target t
|
||||||
|
USING (SELECT * FROM pg_source WHERE id > 2500) AS s
|
||||||
|
ON t.id = s.id AND t.id < 2500
|
||||||
|
WHEN MATCHED AND t.id <= 5500 THEN
|
||||||
|
UPDATE SET val = s.val + 1
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
DELETE
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES(s.id, s.val);
|
||||||
|
MERGE INTO citus_target t
|
||||||
|
USING (SELECT * FROM citus_source WHERE id > 2500) AS s
|
||||||
|
ON t.id = s.id AND t.id < 2500
|
||||||
|
WHEN MATCHED AND t.id <= 5500 THEN
|
||||||
|
UPDATE SET val = s.val + 1
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
DELETE
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES(s.id, s.val);
|
||||||
|
SELECT compare_data();
|
||||||
|
NOTICE: The average of pg_target.id is equal to citus_target.id
|
||||||
|
CONTEXT: PL/pgSQL function check_data(text,text,text,text) line XX at RAISE
|
||||||
|
SQL function "compare_data" statement 1
|
||||||
|
NOTICE: The average of pg_target.val is equal to citus_target.val
|
||||||
|
CONTEXT: PL/pgSQL function check_data(text,text,text,text) line XX at RAISE
|
||||||
|
SQL function "compare_data" statement 2
|
||||||
|
compare_data
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SET client_min_messages TO WARNING;
|
||||||
|
SELECT cleanup_data();
|
||||||
|
cleanup_data
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
RESET client_min_messages;
|
||||||
|
SELECT setup_data();
|
||||||
|
setup_data
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT create_distributed_table('citus_target', 'id');
|
||||||
|
NOTICE: Copying data from local table...
|
||||||
|
NOTICE: copying the data has completed
|
||||||
|
DETAIL: The local data in the table is no longer visible, but is still on disk.
|
||||||
|
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_repartition1_schema.citus_target$$)
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT create_distributed_table('citus_source', 'id', colocate_with => 'citus_target');
|
||||||
|
NOTICE: Copying data from local table...
|
||||||
|
NOTICE: copying the data has completed
|
||||||
|
DETAIL: The local data in the table is no longer visible, but is still on disk.
|
||||||
|
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_repartition1_schema.citus_source$$)
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
MERGE INTO pg_target t
|
||||||
|
USING (SELECT * FROM pg_source WHERE id = 2500) AS s
|
||||||
|
ON t.id = s.id AND t.id = 5000
|
||||||
|
WHEN MATCHED AND t.id <= 5500 THEN
|
||||||
|
UPDATE SET val = s.val + 1
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
DELETE
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES(s.id, s.val);
|
||||||
|
MERGE INTO citus_target t
|
||||||
|
USING (SELECT * FROM citus_source WHERE id = 2500) AS s
|
||||||
|
ON t.id = s.id AND t.id = 5000
|
||||||
|
WHEN MATCHED AND t.id <= 5500 THEN
|
||||||
|
UPDATE SET val = s.val + 1
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
DELETE
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES(s.id, s.val);
|
||||||
|
SELECT compare_data();
|
||||||
|
NOTICE: The average of pg_target.id is equal to citus_target.id
|
||||||
|
CONTEXT: PL/pgSQL function check_data(text,text,text,text) line XX at RAISE
|
||||||
|
SQL function "compare_data" statement 1
|
||||||
|
NOTICE: The average of pg_target.val is equal to citus_target.val
|
||||||
|
CONTEXT: PL/pgSQL function check_data(text,text,text,text) line XX at RAISE
|
||||||
|
SQL function "compare_data" statement 2
|
||||||
|
compare_data
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
DROP SCHEMA merge_repartition1_schema CASCADE;
|
DROP SCHEMA merge_repartition1_schema CASCADE;
|
||||||
NOTICE: drop cascades to 8 other objects
|
NOTICE: drop cascades to 8 other objects
|
||||||
DETAIL: drop cascades to table pg_target
|
DETAIL: drop cascades to table pg_target
|
||||||
|
|
|
@ -1855,6 +1855,102 @@ WHEN MATCHED THEN UPDATE SET val1 = 150;
|
||||||
SELECT COUNT(*) FROM demo_distributed where val1 = 150;
|
SELECT COUNT(*) FROM demo_distributed where val1 = 150;
|
||||||
SELECT COUNT(*) FROM demo_distributed where id1 = 2;
|
SELECT COUNT(*) FROM demo_distributed where id1 = 2;
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Test FALSE filters
|
||||||
|
--
|
||||||
|
CREATE TABLE source_filter(order_id INT, customer_id INT, order_center VARCHAR, order_time timestamp);
|
||||||
|
CREATE TABLE target_filter(customer_id INT, last_order_id INT, order_center VARCHAR, order_count INT, last_order timestamp);
|
||||||
|
|
||||||
|
SELECT create_distributed_table('source_filter', 'customer_id');
|
||||||
|
SELECT create_distributed_table('target_filter', 'customer_id', colocate_with => 'source_filter');
|
||||||
|
|
||||||
|
CREATE FUNCTION load_filter() RETURNS VOID AS $$
|
||||||
|
|
||||||
|
TRUNCATE target_filter;
|
||||||
|
TRUNCATE source_filter;
|
||||||
|
|
||||||
|
INSERT INTO target_filter VALUES(100, 11, 'trg', -1, '2022-01-01 00:00:00'); -- Match UPDATE
|
||||||
|
INSERT INTO target_filter VALUES(200, 11, 'trg', -1, '2022-01-01 00:00:00'); -- Match DELETE
|
||||||
|
|
||||||
|
INSERT INTO source_filter VALUES(12, 100, 'src', '2022-01-01 00:00:00');
|
||||||
|
INSERT INTO source_filter VALUES(12, 200, 'src', '2022-01-01 00:00:00');
|
||||||
|
INSERT INTO source_filter VALUES(12, 300, 'src', '2022-01-01 00:00:00');
|
||||||
|
|
||||||
|
$$
|
||||||
|
LANGUAGE SQL;
|
||||||
|
|
||||||
|
--WHEN MATCH and FALSE
|
||||||
|
SELECT load_filter();
|
||||||
|
MERGE INTO target_filter t
|
||||||
|
USING source_filter s
|
||||||
|
ON s.customer_id = t.customer_id
|
||||||
|
WHEN MATCHED AND t.customer_id = 100 AND (FALSE) THEN
|
||||||
|
UPDATE SET order_count = 999
|
||||||
|
WHEN MATCHED AND t.customer_id = 200 THEN
|
||||||
|
DELETE
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES(s.customer_id, s.order_id, s.order_center, 1, s.order_time);
|
||||||
|
|
||||||
|
SELECT * FROM target_filter ORDER BY 1, 2;
|
||||||
|
|
||||||
|
--WHEN NOT MATCH and 1=0
|
||||||
|
SELECT load_filter();
|
||||||
|
MERGE INTO target_filter t
|
||||||
|
USING source_filter s
|
||||||
|
ON s.customer_id = t.customer_id
|
||||||
|
WHEN MATCHED AND t.customer_id = 100 THEN
|
||||||
|
UPDATE SET order_count = 999
|
||||||
|
WHEN MATCHED AND t.customer_id = 200 THEN
|
||||||
|
DELETE
|
||||||
|
WHEN NOT MATCHED AND (1=0) THEN
|
||||||
|
INSERT VALUES(s.customer_id, s.order_id, s.order_center, 1, s.order_time);
|
||||||
|
|
||||||
|
SELECT * FROM target_filter ORDER BY 1, 2;
|
||||||
|
|
||||||
|
--ON t.key = s.key AND 1 < 0
|
||||||
|
SELECT load_filter();
|
||||||
|
MERGE INTO target_filter t
|
||||||
|
USING source_filter s
|
||||||
|
ON s.customer_id = t.customer_id AND 1 < 0
|
||||||
|
WHEN MATCHED AND t.customer_id = 100 THEN
|
||||||
|
UPDATE SET order_count = 999
|
||||||
|
WHEN MATCHED AND t.customer_id = 200 THEN
|
||||||
|
DELETE
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES(s.customer_id, s.order_id, s.order_center, 1, s.order_time);
|
||||||
|
|
||||||
|
SELECT * FROM target_filter ORDER BY 1, 2;
|
||||||
|
|
||||||
|
--(SELECT * FROM source_filter WHERE false) as source_filter
|
||||||
|
SELECT load_filter();
|
||||||
|
MERGE INTO target_filter t
|
||||||
|
USING (SELECT * FROM source_filter WHERE false) s
|
||||||
|
ON s.customer_id = t.customer_id
|
||||||
|
WHEN MATCHED AND t.customer_id = 100 THEN
|
||||||
|
UPDATE SET order_count = 999
|
||||||
|
WHEN MATCHED AND t.customer_id = 200 THEN
|
||||||
|
DELETE
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES(s.customer_id, s.order_id, s.order_center, 1, s.order_time);
|
||||||
|
|
||||||
|
SELECT * FROM target_filter ORDER BY 1, 2;
|
||||||
|
|
||||||
|
-- Bug 6785
|
||||||
|
CREATE TABLE source_6785( id integer, z int, d jsonb);
|
||||||
|
CREATE TABLE target_6785( id integer, z int, d jsonb);
|
||||||
|
SELECT create_distributed_table('target_6785','id'), create_distributed_table('source_6785', 'id');
|
||||||
|
INSERT INTO source_6785 SELECT i,i FROM generate_series(0,5)i;
|
||||||
|
|
||||||
|
SET client_min_messages TO DEBUG1;
|
||||||
|
MERGE INTO target_6785 sda
|
||||||
|
USING (SELECT * FROM source_6785 WHERE id = 1) sdn
|
||||||
|
ON sda.id = sdn.id AND sda.id = 2
|
||||||
|
WHEN NOT matched THEN
|
||||||
|
INSERT (id, z) VALUES (sdn.id, 5);
|
||||||
|
RESET client_min_messages;
|
||||||
|
|
||||||
|
SELECT * FROM target_6785 ORDER BY 1;
|
||||||
|
|
||||||
--
|
--
|
||||||
-- Error and Unsupported scenarios
|
-- Error and Unsupported scenarios
|
||||||
--
|
--
|
||||||
|
|
|
@ -434,7 +434,9 @@ WHEN NOT MATCHED THEN
|
||||||
SELECT compare_data();
|
SELECT compare_data();
|
||||||
|
|
||||||
-- Test source-query that requires repartitioning on top of MERGE repartitioning
|
-- Test source-query that requires repartitioning on top of MERGE repartitioning
|
||||||
|
SET client_min_messages TO WARNING;
|
||||||
SELECT cleanup_data();
|
SELECT cleanup_data();
|
||||||
|
RESET client_min_messages;
|
||||||
SELECT setup_data();
|
SELECT setup_data();
|
||||||
SELECT create_distributed_table('citus_target', 'id');
|
SELECT create_distributed_table('citus_target', 'id');
|
||||||
SELECT create_distributed_table('citus_source', 'id', colocate_with=>'none');
|
SELECT create_distributed_table('citus_source', 'id', colocate_with=>'none');
|
||||||
|
@ -489,7 +491,9 @@ SELECT compare_data();
|
||||||
SELECT alter_table_set_access_method('citus_source', 'heap');
|
SELECT alter_table_set_access_method('citus_source', 'heap');
|
||||||
|
|
||||||
-- Test CTE/Subquery in merge-actions (works only for router query)
|
-- Test CTE/Subquery in merge-actions (works only for router query)
|
||||||
|
SET client_min_messages TO WARNING;
|
||||||
SELECT cleanup_data();
|
SELECT cleanup_data();
|
||||||
|
RESET client_min_messages;
|
||||||
SELECT setup_data();
|
SELECT setup_data();
|
||||||
SELECT create_distributed_table('citus_target', 'id');
|
SELECT create_distributed_table('citus_target', 'id');
|
||||||
SELECT create_distributed_table('citus_source', 'id', colocate_with=>'citus_target');
|
SELECT create_distributed_table('citus_source', 'id', colocate_with=>'citus_target');
|
||||||
|
@ -512,4 +516,65 @@ WHEN NOT MATCHED AND (SELECT max_a < 5001 FROM (SELECT max(id) as max_a, max(val
|
||||||
|
|
||||||
SELECT compare_data();
|
SELECT compare_data();
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Test target with false clause
|
||||||
|
--
|
||||||
|
SET client_min_messages TO WARNING;
|
||||||
|
SELECT cleanup_data();
|
||||||
|
RESET client_min_messages;
|
||||||
|
SELECT setup_data();
|
||||||
|
SELECT create_distributed_table('citus_target', 'id');
|
||||||
|
SELECT create_distributed_table('citus_source', 'id', colocate_with => 'citus_target');
|
||||||
|
|
||||||
|
MERGE INTO pg_target t
|
||||||
|
USING (SELECT * FROM pg_source WHERE id > 2500) AS s
|
||||||
|
ON t.id = s.id AND t.id < 2500
|
||||||
|
WHEN MATCHED AND t.id <= 5500 THEN
|
||||||
|
UPDATE SET val = s.val + 1
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
DELETE
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES(s.id, s.val);
|
||||||
|
|
||||||
|
MERGE INTO citus_target t
|
||||||
|
USING (SELECT * FROM citus_source WHERE id > 2500) AS s
|
||||||
|
ON t.id = s.id AND t.id < 2500
|
||||||
|
WHEN MATCHED AND t.id <= 5500 THEN
|
||||||
|
UPDATE SET val = s.val + 1
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
DELETE
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES(s.id, s.val);
|
||||||
|
|
||||||
|
SELECT compare_data();
|
||||||
|
|
||||||
|
SET client_min_messages TO WARNING;
|
||||||
|
SELECT cleanup_data();
|
||||||
|
RESET client_min_messages;
|
||||||
|
SELECT setup_data();
|
||||||
|
SELECT create_distributed_table('citus_target', 'id');
|
||||||
|
SELECT create_distributed_table('citus_source', 'id', colocate_with => 'citus_target');
|
||||||
|
|
||||||
|
MERGE INTO pg_target t
|
||||||
|
USING (SELECT * FROM pg_source WHERE id = 2500) AS s
|
||||||
|
ON t.id = s.id AND t.id = 5000
|
||||||
|
WHEN MATCHED AND t.id <= 5500 THEN
|
||||||
|
UPDATE SET val = s.val + 1
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
DELETE
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES(s.id, s.val);
|
||||||
|
|
||||||
|
MERGE INTO citus_target t
|
||||||
|
USING (SELECT * FROM citus_source WHERE id = 2500) AS s
|
||||||
|
ON t.id = s.id AND t.id = 5000
|
||||||
|
WHEN MATCHED AND t.id <= 5500 THEN
|
||||||
|
UPDATE SET val = s.val + 1
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
DELETE
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES(s.id, s.val);
|
||||||
|
|
||||||
|
SELECT compare_data();
|
||||||
|
|
||||||
DROP SCHEMA merge_repartition1_schema CASCADE;
|
DROP SCHEMA merge_repartition1_schema CASCADE;
|
||||||
|
|
Loading…
Reference in New Issue