mirror of https://github.com/citusdata/citus.git
Fixes the bug#6785
parent
58da8771aa
commit
f9b893aa2f
|
@ -33,6 +33,7 @@
|
|||
#include "distributed/query_colocation_checker.h"
|
||||
#include "distributed/repartition_executor.h"
|
||||
#include "distributed/shared_library_init.h"
|
||||
#include "distributed/shard_pruning.h"
|
||||
|
||||
#if PG_VERSION_NUM >= PG_VERSION_15
|
||||
|
||||
|
@ -40,6 +41,9 @@ static int SourceResultPartitionColumnIndex(Query *mergeQuery,
|
|||
List *sourceTargetList,
|
||||
CitusTableCacheEntry *targetRelation);
|
||||
static Var * ValidateAndReturnVarIfSupported(Node *entryExpr);
|
||||
static DeferredErrorMessage * DeferErrorIfTargetHasFalseClause(Oid targetRelationId,
|
||||
PlannerRestrictionContext *
|
||||
plannerRestrictionContext);
|
||||
static void ErrorIfMergeQueryQualAndTargetListNotSupported(Oid targetRelationId,
|
||||
Query *originalQuery);
|
||||
static void ErrorIfMergeNotSupported(Query *query, Oid targetRelationId,
|
||||
|
@ -51,7 +55,8 @@ static DeferredErrorMessage * DeferErrorIfRoutableMergeNotSupported(Query *query
|
|||
List *rangeTableList,
|
||||
PlannerRestrictionContext
|
||||
*
|
||||
plannerRestrictionContext);
|
||||
plannerRestrictionContext,
|
||||
Oid targetRelationId);
|
||||
static DeferredErrorMessage * MergeQualAndTargetListFunctionsSupported(Oid
|
||||
resultRelationId,
|
||||
Query *query,
|
||||
|
@ -164,7 +169,8 @@ CreateRouterMergePlan(Oid targetRelationId, Query *originalQuery, Query *query,
|
|||
|
||||
distributedPlan->planningError = DeferErrorIfRoutableMergeNotSupported(originalQuery,
|
||||
rangeTableList,
|
||||
plannerRestrictionContext);
|
||||
plannerRestrictionContext,
|
||||
targetRelationId);
|
||||
if (distributedPlan->planningError != NULL)
|
||||
{
|
||||
return distributedPlan;
|
||||
|
@ -926,13 +932,52 @@ ErrorIfMergeNotSupported(Query *query, Oid targetRelationId, List *rangeTableLis
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* DeferErrorIfTargetHasFalseClause checks for the presence of a false clause in the
|
||||
* target relation and throws an exception if found. Router planner prunes all the shards
|
||||
* for relations with such clauses, resulting in no task generation for the job. However,
|
||||
* in the case of a MERGE query, tasks still need to be generated for the shards of the
|
||||
* source relation.
|
||||
*/
|
||||
static DeferredErrorMessage *
|
||||
DeferErrorIfTargetHasFalseClause(Oid targetRelationId,
|
||||
PlannerRestrictionContext *plannerRestrictionContext)
|
||||
{
|
||||
ListCell *restrictionCell = NULL;
|
||||
foreach(restrictionCell,
|
||||
plannerRestrictionContext->relationRestrictionContext->relationRestrictionList)
|
||||
{
|
||||
RelationRestriction *relationRestriction =
|
||||
(RelationRestriction *) lfirst(restrictionCell);
|
||||
Oid relationId = relationRestriction->relationId;
|
||||
|
||||
/* Check only for target relation */
|
||||
if (relationId != targetRelationId)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
List *baseRestrictionList = relationRestriction->relOptInfo->baserestrictinfo;
|
||||
List *restrictClauseList = get_all_actual_clauses(baseRestrictionList);
|
||||
if (ContainsFalseClause(restrictClauseList))
|
||||
{
|
||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||
"Routing query is not possible with "
|
||||
"no shards for target", NULL, NULL);
|
||||
}
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* DeferErrorIfRoutableMergeNotSupported Checks for conditions that prevent pushable planning, if
|
||||
* found, raises a deferred error, which then continues to try repartitioning strategy.
|
||||
*/
|
||||
static DeferredErrorMessage *
|
||||
DeferErrorIfRoutableMergeNotSupported(Query *query, List *rangeTableList,
|
||||
PlannerRestrictionContext *plannerRestrictionContext)
|
||||
PlannerRestrictionContext *plannerRestrictionContext,
|
||||
Oid targetRelationId)
|
||||
{
|
||||
List *distTablesList = NIL;
|
||||
List *refTablesList = NIL;
|
||||
|
@ -1020,6 +1065,17 @@ DeferErrorIfRoutableMergeNotSupported(Query *query, List *rangeTableList,
|
|||
"conflict, use ON instead", NULL, NULL);
|
||||
}
|
||||
|
||||
deferredError = DeferErrorIfTargetHasFalseClause(targetRelationId,
|
||||
plannerRestrictionContext);
|
||||
if (deferredError)
|
||||
{
|
||||
ereport(DEBUG1, (errmsg("Target relation has a filter of the "
|
||||
"form: false (AND ..), which results "
|
||||
"in empty shards, but we still need "
|
||||
"to evaluate NOT-MATCHED clause, try "
|
||||
"repartitioning")));
|
||||
return deferredError;
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
|
|
@ -2973,6 +2973,131 @@ SELECT COUNT(*) FROM demo_distributed where id1 = 2;
|
|||
7
|
||||
(1 row)
|
||||
|
||||
--
|
||||
-- Test FALSE filters
|
||||
--
|
||||
CREATE TABLE source_filter(order_id INT, customer_id INT, order_center VARCHAR, order_time timestamp);
|
||||
CREATE TABLE target_filter(customer_id INT, last_order_id INT, order_center VARCHAR, order_count INT, last_order timestamp);
|
||||
SELECT create_distributed_table('source_filter', 'customer_id');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('target_filter', 'customer_id', colocate_with => 'source_filter');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE FUNCTION load_filter() RETURNS VOID AS $$
|
||||
|
||||
TRUNCATE target_filter;
|
||||
TRUNCATE source_filter;
|
||||
|
||||
INSERT INTO target_filter VALUES(100, 11, 'trg', -1, '2022-01-01 00:00:00'); -- Match UPDATE
|
||||
INSERT INTO target_filter VALUES(200, 11, 'trg', -1, '2022-01-01 00:00:00'); -- Match DELETE
|
||||
|
||||
INSERT INTO source_filter VALUES(12, 100, 'src', '2022-01-01 00:00:00');
|
||||
INSERT INTO source_filter VALUES(12, 200, 'src', '2022-01-01 00:00:00');
|
||||
INSERT INTO source_filter VALUES(12, 300, 'src', '2022-01-01 00:00:00');
|
||||
|
||||
$$
|
||||
LANGUAGE SQL;
|
||||
--WHEN MATCH and FALSE
|
||||
SELECT load_filter();
|
||||
load_filter
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
MERGE INTO target_filter t
|
||||
USING source_filter s
|
||||
ON s.customer_id = t.customer_id
|
||||
WHEN MATCHED AND t.customer_id = 100 AND (FALSE) THEN
|
||||
UPDATE SET order_count = 999
|
||||
WHEN MATCHED AND t.customer_id = 200 THEN
|
||||
DELETE
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES(s.customer_id, s.order_id, s.order_center, 1, s.order_time);
|
||||
SELECT * FROM target_filter ORDER BY 1, 2;
|
||||
customer_id | last_order_id | order_center | order_count | last_order
|
||||
---------------------------------------------------------------------
|
||||
100 | 11 | trg | -1 | Sat Jan 01 00:00:00 2022
|
||||
300 | 12 | src | 1 | Sat Jan 01 00:00:00 2022
|
||||
(2 rows)
|
||||
|
||||
--WHEN NOT MATCH and 1=0
|
||||
SELECT load_filter();
|
||||
load_filter
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
MERGE INTO target_filter t
|
||||
USING source_filter s
|
||||
ON s.customer_id = t.customer_id
|
||||
WHEN MATCHED AND t.customer_id = 100 THEN
|
||||
UPDATE SET order_count = 999
|
||||
WHEN MATCHED AND t.customer_id = 200 THEN
|
||||
DELETE
|
||||
WHEN NOT MATCHED AND (1=0) THEN
|
||||
INSERT VALUES(s.customer_id, s.order_id, s.order_center, 1, s.order_time);
|
||||
SELECT * FROM target_filter ORDER BY 1, 2;
|
||||
customer_id | last_order_id | order_center | order_count | last_order
|
||||
---------------------------------------------------------------------
|
||||
100 | 11 | trg | 999 | Sat Jan 01 00:00:00 2022
|
||||
(1 row)
|
||||
|
||||
--ON t.key = s.key AND 1 < 0
|
||||
SELECT load_filter();
|
||||
load_filter
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
MERGE INTO target_filter t
|
||||
USING source_filter s
|
||||
ON s.customer_id = t.customer_id AND 1 < 0
|
||||
WHEN MATCHED AND t.customer_id = 100 THEN
|
||||
UPDATE SET order_count = 999
|
||||
WHEN MATCHED AND t.customer_id = 200 THEN
|
||||
DELETE
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES(s.customer_id, s.order_id, s.order_center, 1, s.order_time);
|
||||
SELECT * FROM target_filter ORDER BY 1, 2;
|
||||
customer_id | last_order_id | order_center | order_count | last_order
|
||||
---------------------------------------------------------------------
|
||||
100 | 11 | trg | -1 | Sat Jan 01 00:00:00 2022
|
||||
100 | 12 | src | 1 | Sat Jan 01 00:00:00 2022
|
||||
200 | 11 | trg | -1 | Sat Jan 01 00:00:00 2022
|
||||
200 | 12 | src | 1 | Sat Jan 01 00:00:00 2022
|
||||
300 | 12 | src | 1 | Sat Jan 01 00:00:00 2022
|
||||
(5 rows)
|
||||
|
||||
--(SELECT * FROM source_filter WHERE false) as source_filter
|
||||
SELECT load_filter();
|
||||
load_filter
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
MERGE INTO target_filter t
|
||||
USING (SELECT * FROM source_filter WHERE false) s
|
||||
ON s.customer_id = t.customer_id
|
||||
WHEN MATCHED AND t.customer_id = 100 THEN
|
||||
UPDATE SET order_count = 999
|
||||
WHEN MATCHED AND t.customer_id = 200 THEN
|
||||
DELETE
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES(s.customer_id, s.order_id, s.order_center, 1, s.order_time);
|
||||
SELECT * FROM target_filter ORDER BY 1, 2;
|
||||
customer_id | last_order_id | order_center | order_count | last_order
|
||||
---------------------------------------------------------------------
|
||||
100 | 11 | trg | -1 | Sat Jan 01 00:00:00 2022
|
||||
200 | 11 | trg | -1 | Sat Jan 01 00:00:00 2022
|
||||
(2 rows)
|
||||
|
||||
--
|
||||
-- Error and Unsupported scenarios
|
||||
--
|
||||
|
@ -3725,21 +3850,21 @@ INSERT INTO postgres_local_table SELECT i, i FROM generate_series(5, 10) i;
|
|||
-- with a colocated table
|
||||
MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b;
|
||||
DEBUG: <Deparsed MERGE query: MERGE INTO query_single_shard_table.nullkey_c1_t1_4000168 nullkey_c1_t1 USING query_single_shard_table.nullkey_c1_t2_4000169 nullkey_c1_t2 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) nullkey_c1_t2.a) WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b>
|
||||
DEBUG: <Deparsed MERGE query: MERGE INTO query_single_shard_table.nullkey_c1_t1_4000176 nullkey_c1_t1 USING query_single_shard_table.nullkey_c1_t2_4000177 nullkey_c1_t2 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) nullkey_c1_t2.a) WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b>
|
||||
DEBUG: Creating MERGE router plan
|
||||
MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a)
|
||||
WHEN MATCHED THEN DELETE;
|
||||
DEBUG: <Deparsed MERGE query: MERGE INTO query_single_shard_table.nullkey_c1_t1_4000168 nullkey_c1_t1 USING query_single_shard_table.nullkey_c1_t2_4000169 nullkey_c1_t2 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) nullkey_c1_t2.a) WHEN MATCHED THEN DELETE>
|
||||
DEBUG: <Deparsed MERGE query: MERGE INTO query_single_shard_table.nullkey_c1_t1_4000176 nullkey_c1_t1 USING query_single_shard_table.nullkey_c1_t2_4000177 nullkey_c1_t2 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) nullkey_c1_t2.a) WHEN MATCHED THEN DELETE>
|
||||
DEBUG: Creating MERGE router plan
|
||||
MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b
|
||||
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b);
|
||||
DEBUG: <Deparsed MERGE query: MERGE INTO query_single_shard_table.nullkey_c1_t1_4000168 nullkey_c1_t1 USING query_single_shard_table.nullkey_c1_t2_4000169 nullkey_c1_t2 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) nullkey_c1_t2.a) WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b)>
|
||||
DEBUG: <Deparsed MERGE query: MERGE INTO query_single_shard_table.nullkey_c1_t1_4000176 nullkey_c1_t1 USING query_single_shard_table.nullkey_c1_t2_4000177 nullkey_c1_t2 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) nullkey_c1_t2.a) WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t2.b WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b)>
|
||||
DEBUG: Creating MERGE router plan
|
||||
MERGE INTO nullkey_c1_t1 USING nullkey_c1_t2 ON (nullkey_c1_t1.a = nullkey_c1_t2.a)
|
||||
WHEN MATCHED THEN DELETE
|
||||
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b);
|
||||
DEBUG: <Deparsed MERGE query: MERGE INTO query_single_shard_table.nullkey_c1_t1_4000168 nullkey_c1_t1 USING query_single_shard_table.nullkey_c1_t2_4000169 nullkey_c1_t2 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) nullkey_c1_t2.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b)>
|
||||
DEBUG: <Deparsed MERGE query: MERGE INTO query_single_shard_table.nullkey_c1_t1_4000176 nullkey_c1_t1 USING query_single_shard_table.nullkey_c1_t2_4000177 nullkey_c1_t2 ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) nullkey_c1_t2.a) WHEN MATCHED THEN DELETE WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c1_t2.a, nullkey_c1_t2.b)>
|
||||
DEBUG: Creating MERGE router plan
|
||||
-- with non-colocated single-shard table
|
||||
MERGE INTO nullkey_c1_t1 USING nullkey_c2_t1 ON (nullkey_c1_t1.a = nullkey_c2_t1.a)
|
||||
|
@ -3824,7 +3949,7 @@ WITH cte AS (
|
|||
)
|
||||
MERGE INTO nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a = cte.a)
|
||||
WHEN MATCHED THEN UPDATE SET b = cte.b;
|
||||
DEBUG: <Deparsed MERGE query: WITH cte AS (SELECT nullkey_c1_t1_1.a, nullkey_c1_t1_1.b FROM query_single_shard_table.nullkey_c1_t1_4000168 nullkey_c1_t1_1) MERGE INTO query_single_shard_table.nullkey_c1_t1_4000168 nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) cte.a) WHEN MATCHED THEN UPDATE SET b = cte.b>
|
||||
DEBUG: <Deparsed MERGE query: WITH cte AS (SELECT nullkey_c1_t1_1.a, nullkey_c1_t1_1.b FROM query_single_shard_table.nullkey_c1_t1_4000176 nullkey_c1_t1_1) MERGE INTO query_single_shard_table.nullkey_c1_t1_4000176 nullkey_c1_t1 USING cte ON (nullkey_c1_t1.a OPERATOR(pg_catalog.=) cte.a) WHEN MATCHED THEN UPDATE SET b = cte.b>
|
||||
DEBUG: Creating MERGE router plan
|
||||
WITH cte AS (
|
||||
SELECT * FROM distributed_table
|
||||
|
@ -3989,7 +4114,7 @@ CONTEXT: SQL statement "SELECT citus_drop_all_shards(v_obj.objid, v_obj.schema_
|
|||
PL/pgSQL function citus_drop_trigger() line XX at PERFORM
|
||||
DROP FUNCTION merge_when_and_write();
|
||||
DROP SCHEMA merge_schema CASCADE;
|
||||
NOTICE: drop cascades to 98 other objects
|
||||
NOTICE: drop cascades to 101 other objects
|
||||
DETAIL: drop cascades to function insert_data()
|
||||
drop cascades to table local_local
|
||||
drop cascades to table target
|
||||
|
@ -4076,10 +4201,13 @@ drop cascades to table demo_distributed
|
|||
drop cascades to table demo_source_table
|
||||
drop cascades to table pg_demo_result
|
||||
drop cascades to table dist_demo_result
|
||||
drop cascades to table source_filter
|
||||
drop cascades to table target_filter
|
||||
drop cascades to function load_filter()
|
||||
drop cascades to function add_s(integer,integer)
|
||||
drop cascades to table pg
|
||||
drop cascades to table t1_4000158
|
||||
drop cascades to table s1_4000159
|
||||
drop cascades to table t1_4000166
|
||||
drop cascades to table s1_4000167
|
||||
drop cascades to table t1
|
||||
drop cascades to table s1
|
||||
drop cascades to table dist_target
|
||||
|
|
|
@ -1000,28 +1000,14 @@ SQL function "compare_data" statement 2
|
|||
(1 row)
|
||||
|
||||
-- Test source-query that requires repartitioning on top of MERGE repartitioning
|
||||
SET client_min_messages TO WARNING;
|
||||
SELECT cleanup_data();
|
||||
NOTICE: creating a new table for merge_repartition1_schema.citus_target
|
||||
CONTEXT: SQL function "cleanup_data" statement 5
|
||||
NOTICE: moving the data of merge_repartition1_schema.citus_target
|
||||
CONTEXT: SQL function "cleanup_data" statement 5
|
||||
NOTICE: dropping the old merge_repartition1_schema.citus_target
|
||||
CONTEXT: SQL function "cleanup_data" statement 5
|
||||
NOTICE: renaming the new table to merge_repartition1_schema.citus_target
|
||||
CONTEXT: SQL function "cleanup_data" statement 5
|
||||
NOTICE: creating a new table for merge_repartition1_schema.citus_source
|
||||
CONTEXT: SQL function "cleanup_data" statement 6
|
||||
NOTICE: moving the data of merge_repartition1_schema.citus_source
|
||||
CONTEXT: SQL function "cleanup_data" statement 6
|
||||
NOTICE: dropping the old merge_repartition1_schema.citus_source
|
||||
CONTEXT: SQL function "cleanup_data" statement 6
|
||||
NOTICE: renaming the new table to merge_repartition1_schema.citus_source
|
||||
CONTEXT: SQL function "cleanup_data" statement 6
|
||||
cleanup_data
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
RESET client_min_messages;
|
||||
SELECT setup_data();
|
||||
setup_data
|
||||
---------------------------------------------------------------------
|
||||
|
@ -1159,28 +1145,14 @@ NOTICE: renaming the new table to merge_repartition1_schema.citus_source
|
|||
(1 row)
|
||||
|
||||
-- Test CTE/Subquery in merge-actions (works only for router query)
|
||||
SET client_min_messages TO WARNING;
|
||||
SELECT cleanup_data();
|
||||
NOTICE: creating a new table for merge_repartition1_schema.citus_target
|
||||
CONTEXT: SQL function "cleanup_data" statement 5
|
||||
NOTICE: moving the data of merge_repartition1_schema.citus_target
|
||||
CONTEXT: SQL function "cleanup_data" statement 5
|
||||
NOTICE: dropping the old merge_repartition1_schema.citus_target
|
||||
CONTEXT: SQL function "cleanup_data" statement 5
|
||||
NOTICE: renaming the new table to merge_repartition1_schema.citus_target
|
||||
CONTEXT: SQL function "cleanup_data" statement 5
|
||||
NOTICE: creating a new table for merge_repartition1_schema.citus_source
|
||||
CONTEXT: SQL function "cleanup_data" statement 6
|
||||
NOTICE: moving the data of merge_repartition1_schema.citus_source
|
||||
CONTEXT: SQL function "cleanup_data" statement 6
|
||||
NOTICE: dropping the old merge_repartition1_schema.citus_source
|
||||
CONTEXT: SQL function "cleanup_data" statement 6
|
||||
NOTICE: renaming the new table to merge_repartition1_schema.citus_source
|
||||
CONTEXT: SQL function "cleanup_data" statement 6
|
||||
cleanup_data
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
RESET client_min_messages;
|
||||
SELECT setup_data();
|
||||
setup_data
|
||||
---------------------------------------------------------------------
|
||||
|
@ -1233,6 +1205,137 @@ SQL function "compare_data" statement 2
|
|||
|
||||
(1 row)
|
||||
|
||||
--
|
||||
-- Test target with false clause
|
||||
--
|
||||
SET client_min_messages TO WARNING;
|
||||
SELECT cleanup_data();
|
||||
cleanup_data
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
RESET client_min_messages;
|
||||
SELECT setup_data();
|
||||
setup_data
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('citus_target', 'id');
|
||||
NOTICE: Copying data from local table...
|
||||
NOTICE: copying the data has completed
|
||||
DETAIL: The local data in the table is no longer visible, but is still on disk.
|
||||
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_repartition1_schema.citus_target$$)
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('citus_source', 'id', colocate_with => 'citus_target');
|
||||
NOTICE: Copying data from local table...
|
||||
NOTICE: copying the data has completed
|
||||
DETAIL: The local data in the table is no longer visible, but is still on disk.
|
||||
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_repartition1_schema.citus_source$$)
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
MERGE INTO pg_target t
|
||||
USING (SELECT * FROM pg_source WHERE id > 25000) AS s
|
||||
ON t.id = s.id AND t.id < 25000
|
||||
WHEN MATCHED AND t.id <= 55000 THEN
|
||||
UPDATE SET val = s.val + 1
|
||||
WHEN MATCHED THEN
|
||||
DELETE
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES(s.id, s.val);
|
||||
MERGE INTO citus_target t
|
||||
USING (SELECT * FROM citus_source WHERE id > 25000) AS s
|
||||
ON t.id = s.id AND t.id < 25000
|
||||
WHEN MATCHED AND t.id <= 55000 THEN
|
||||
UPDATE SET val = s.val + 1
|
||||
WHEN MATCHED THEN
|
||||
DELETE
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES(s.id, s.val);
|
||||
SELECT compare_data();
|
||||
NOTICE: The average of pg_target.id is equal to citus_target.id
|
||||
CONTEXT: PL/pgSQL function check_data(text,text,text,text) line XX at RAISE
|
||||
SQL function "compare_data" statement 1
|
||||
NOTICE: The average of pg_target.val is equal to citus_target.val
|
||||
CONTEXT: PL/pgSQL function check_data(text,text,text,text) line XX at RAISE
|
||||
SQL function "compare_data" statement 2
|
||||
compare_data
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SET client_min_messages TO WARNING;
|
||||
SELECT cleanup_data();
|
||||
cleanup_data
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
RESET client_min_messages;
|
||||
SELECT setup_data();
|
||||
setup_data
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('citus_target', 'id');
|
||||
NOTICE: Copying data from local table...
|
||||
NOTICE: copying the data has completed
|
||||
DETAIL: The local data in the table is no longer visible, but is still on disk.
|
||||
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_repartition1_schema.citus_target$$)
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('citus_source', 'id', colocate_with => 'citus_target');
|
||||
NOTICE: Copying data from local table...
|
||||
NOTICE: copying the data has completed
|
||||
DETAIL: The local data in the table is no longer visible, but is still on disk.
|
||||
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_repartition1_schema.citus_source$$)
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
MERGE INTO pg_target t
|
||||
USING (SELECT * FROM pg_source WHERE id = 25000) AS s
|
||||
ON t.id = s.id AND t.id = 50000
|
||||
WHEN MATCHED AND t.id <= 55000 THEN
|
||||
UPDATE SET val = s.val + 1
|
||||
WHEN MATCHED THEN
|
||||
DELETE
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES(s.id, s.val);
|
||||
MERGE INTO citus_target t
|
||||
USING (SELECT * FROM citus_source WHERE id = 25000) AS s
|
||||
ON t.id = s.id AND t.id = 50000
|
||||
WHEN MATCHED AND t.id <= 55000 THEN
|
||||
UPDATE SET val = s.val + 1
|
||||
WHEN MATCHED THEN
|
||||
DELETE
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES(s.id, s.val);
|
||||
SELECT compare_data();
|
||||
NOTICE: The average of pg_target.id is equal to citus_target.id
|
||||
CONTEXT: PL/pgSQL function check_data(text,text,text,text) line XX at RAISE
|
||||
SQL function "compare_data" statement 1
|
||||
NOTICE: The average of pg_target.val is equal to citus_target.val
|
||||
CONTEXT: PL/pgSQL function check_data(text,text,text,text) line XX at RAISE
|
||||
SQL function "compare_data" statement 2
|
||||
compare_data
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
DROP SCHEMA merge_repartition1_schema CASCADE;
|
||||
NOTICE: drop cascades to 8 other objects
|
||||
DETAIL: drop cascades to table pg_target
|
||||
|
|
|
@ -1855,6 +1855,86 @@ WHEN MATCHED THEN UPDATE SET val1 = 150;
|
|||
SELECT COUNT(*) FROM demo_distributed where val1 = 150;
|
||||
SELECT COUNT(*) FROM demo_distributed where id1 = 2;
|
||||
|
||||
--
|
||||
-- Test FALSE filters
|
||||
--
|
||||
CREATE TABLE source_filter(order_id INT, customer_id INT, order_center VARCHAR, order_time timestamp);
|
||||
CREATE TABLE target_filter(customer_id INT, last_order_id INT, order_center VARCHAR, order_count INT, last_order timestamp);
|
||||
|
||||
SELECT create_distributed_table('source_filter', 'customer_id');
|
||||
SELECT create_distributed_table('target_filter', 'customer_id', colocate_with => 'source_filter');
|
||||
|
||||
CREATE FUNCTION load_filter() RETURNS VOID AS $$
|
||||
|
||||
TRUNCATE target_filter;
|
||||
TRUNCATE source_filter;
|
||||
|
||||
INSERT INTO target_filter VALUES(100, 11, 'trg', -1, '2022-01-01 00:00:00'); -- Match UPDATE
|
||||
INSERT INTO target_filter VALUES(200, 11, 'trg', -1, '2022-01-01 00:00:00'); -- Match DELETE
|
||||
|
||||
INSERT INTO source_filter VALUES(12, 100, 'src', '2022-01-01 00:00:00');
|
||||
INSERT INTO source_filter VALUES(12, 200, 'src', '2022-01-01 00:00:00');
|
||||
INSERT INTO source_filter VALUES(12, 300, 'src', '2022-01-01 00:00:00');
|
||||
|
||||
$$
|
||||
LANGUAGE SQL;
|
||||
|
||||
--WHEN MATCH and FALSE
|
||||
SELECT load_filter();
|
||||
MERGE INTO target_filter t
|
||||
USING source_filter s
|
||||
ON s.customer_id = t.customer_id
|
||||
WHEN MATCHED AND t.customer_id = 100 AND (FALSE) THEN
|
||||
UPDATE SET order_count = 999
|
||||
WHEN MATCHED AND t.customer_id = 200 THEN
|
||||
DELETE
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES(s.customer_id, s.order_id, s.order_center, 1, s.order_time);
|
||||
|
||||
SELECT * FROM target_filter ORDER BY 1, 2;
|
||||
|
||||
--WHEN NOT MATCH and 1=0
|
||||
SELECT load_filter();
|
||||
MERGE INTO target_filter t
|
||||
USING source_filter s
|
||||
ON s.customer_id = t.customer_id
|
||||
WHEN MATCHED AND t.customer_id = 100 THEN
|
||||
UPDATE SET order_count = 999
|
||||
WHEN MATCHED AND t.customer_id = 200 THEN
|
||||
DELETE
|
||||
WHEN NOT MATCHED AND (1=0) THEN
|
||||
INSERT VALUES(s.customer_id, s.order_id, s.order_center, 1, s.order_time);
|
||||
|
||||
SELECT * FROM target_filter ORDER BY 1, 2;
|
||||
|
||||
--ON t.key = s.key AND 1 < 0
|
||||
SELECT load_filter();
|
||||
MERGE INTO target_filter t
|
||||
USING source_filter s
|
||||
ON s.customer_id = t.customer_id AND 1 < 0
|
||||
WHEN MATCHED AND t.customer_id = 100 THEN
|
||||
UPDATE SET order_count = 999
|
||||
WHEN MATCHED AND t.customer_id = 200 THEN
|
||||
DELETE
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES(s.customer_id, s.order_id, s.order_center, 1, s.order_time);
|
||||
|
||||
SELECT * FROM target_filter ORDER BY 1, 2;
|
||||
|
||||
--(SELECT * FROM source_filter WHERE false) as source_filter
|
||||
SELECT load_filter();
|
||||
MERGE INTO target_filter t
|
||||
USING (SELECT * FROM source_filter WHERE false) s
|
||||
ON s.customer_id = t.customer_id
|
||||
WHEN MATCHED AND t.customer_id = 100 THEN
|
||||
UPDATE SET order_count = 999
|
||||
WHEN MATCHED AND t.customer_id = 200 THEN
|
||||
DELETE
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES(s.customer_id, s.order_id, s.order_center, 1, s.order_time);
|
||||
|
||||
SELECT * FROM target_filter ORDER BY 1, 2;
|
||||
|
||||
--
|
||||
-- Error and Unsupported scenarios
|
||||
--
|
||||
|
|
|
@ -434,7 +434,9 @@ WHEN NOT MATCHED THEN
|
|||
SELECT compare_data();
|
||||
|
||||
-- Test source-query that requires repartitioning on top of MERGE repartitioning
|
||||
SET client_min_messages TO WARNING;
|
||||
SELECT cleanup_data();
|
||||
RESET client_min_messages;
|
||||
SELECT setup_data();
|
||||
SELECT create_distributed_table('citus_target', 'id');
|
||||
SELECT create_distributed_table('citus_source', 'id', colocate_with=>'none');
|
||||
|
@ -489,7 +491,9 @@ SELECT compare_data();
|
|||
SELECT alter_table_set_access_method('citus_source', 'heap');
|
||||
|
||||
-- Test CTE/Subquery in merge-actions (works only for router query)
|
||||
SET client_min_messages TO WARNING;
|
||||
SELECT cleanup_data();
|
||||
RESET client_min_messages;
|
||||
SELECT setup_data();
|
||||
SELECT create_distributed_table('citus_target', 'id');
|
||||
SELECT create_distributed_table('citus_source', 'id', colocate_with=>'citus_target');
|
||||
|
@ -512,4 +516,63 @@ WHEN NOT MATCHED AND (SELECT max_a < 5001 FROM (SELECT max(id) as max_a, max(val
|
|||
|
||||
SELECT compare_data();
|
||||
|
||||
--
|
||||
-- Test target with false clause
|
||||
--
|
||||
SET client_min_messages TO WARNING;
|
||||
SELECT cleanup_data();
|
||||
RESET client_min_messages;
|
||||
SELECT setup_data();
|
||||
SELECT create_distributed_table('citus_target', 'id');
|
||||
SELECT create_distributed_table('citus_source', 'id', colocate_with => 'citus_target');
|
||||
|
||||
MERGE INTO pg_target t
|
||||
USING (SELECT * FROM pg_source WHERE id > 25000) AS s
|
||||
ON t.id = s.id AND t.id < 25000
|
||||
WHEN MATCHED AND t.id <= 55000 THEN
|
||||
UPDATE SET val = s.val + 1
|
||||
WHEN MATCHED THEN
|
||||
DELETE
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES(s.id, s.val);
|
||||
|
||||
MERGE INTO citus_target t
|
||||
USING (SELECT * FROM citus_source WHERE id > 25000) AS s
|
||||
ON t.id = s.id AND t.id < 25000
|
||||
WHEN MATCHED AND t.id <= 55000 THEN
|
||||
UPDATE SET val = s.val + 1
|
||||
WHEN MATCHED THEN
|
||||
DELETE
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES(s.id, s.val);
|
||||
SELECT compare_data();
|
||||
|
||||
SET client_min_messages TO WARNING;
|
||||
SELECT cleanup_data();
|
||||
RESET client_min_messages;
|
||||
SELECT setup_data();
|
||||
SELECT create_distributed_table('citus_target', 'id');
|
||||
SELECT create_distributed_table('citus_source', 'id', colocate_with => 'citus_target');
|
||||
|
||||
MERGE INTO pg_target t
|
||||
USING (SELECT * FROM pg_source WHERE id = 25000) AS s
|
||||
ON t.id = s.id AND t.id = 50000
|
||||
WHEN MATCHED AND t.id <= 55000 THEN
|
||||
UPDATE SET val = s.val + 1
|
||||
WHEN MATCHED THEN
|
||||
DELETE
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES(s.id, s.val);
|
||||
|
||||
MERGE INTO citus_target t
|
||||
USING (SELECT * FROM citus_source WHERE id = 25000) AS s
|
||||
ON t.id = s.id AND t.id = 50000
|
||||
WHEN MATCHED AND t.id <= 55000 THEN
|
||||
UPDATE SET val = s.val + 1
|
||||
WHEN MATCHED THEN
|
||||
DELETE
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES(s.id, s.val);
|
||||
SELECT compare_data();
|
||||
|
||||
DROP SCHEMA merge_repartition1_schema CASCADE;
|
||||
|
|
Loading…
Reference in New Issue