Add big serial test

merge_locks
Teja Mupparti 2023-03-02 09:15:24 -08:00
parent b19566b7bb
commit 5ff880cc6c
10 changed files with 220 additions and 49 deletions

View File

@ -138,7 +138,8 @@ MergeQuerySupported(Query *originalQuery, bool multiShardQuery,
if (multiShardQuery) if (multiShardQuery)
{ {
deferredError = DeferErrorIfUnsupportedSubqueryPushdown(originalQuery, deferredError =
DeferErrorIfUnsupportedSubqueryPushdown(originalQuery,
plannerRestrictionContext); plannerRestrictionContext);
if (deferredError) if (deferredError)
{ {

View File

@ -1874,8 +1874,8 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
if (*planningError) if (*planningError)
{ {
/* /*
* For MERGE, we do _not_ plan anything other than Router job, let's * For MERGE, we do _not_ plan any other router job than the MERGE job itself,
* not continue further down the lane in distributed planning, simply * let's not continue further down the lane in distributed planning, simply
* bail out. * bail out.
*/ */
if (IsMergeQuery(originalQuery)) if (IsMergeQuery(originalQuery))

View File

@ -19,7 +19,8 @@
#include "distributed/errormessage.h" #include "distributed/errormessage.h"
extern bool IsMergeAllowedOnRelation(Query *parse, RangeTblEntry *rte); extern bool IsMergeAllowedOnRelation(Query *parse, RangeTblEntry *rte);
extern DeferredErrorMessage * MergeQuerySupported(Query *originalQuery, bool multiShardQuery, extern DeferredErrorMessage * MergeQuerySupported(Query *originalQuery,
bool multiShardQuery,
PlannerRestrictionContext * PlannerRestrictionContext *
plannerRestrictionContext); plannerRestrictionContext);
#endif /* MERGE_PLANNER_H */ #endif /* MERGE_PLANNER_H */

View File

@ -311,7 +311,6 @@ MERGE INTO t1
UPDATE SET val = t1.val + 1 UPDATE SET val = t1.val + 1
WHEN NOT MATCHED THEN WHEN NOT MATCHED THEN
INSERT (id, val) VALUES (pg_res.id, pg_res.val); INSERT (id, val) VALUES (pg_res.id, pg_res.val);
-- Two rows with id 2 and val incremented, id 3, and id 1 is deleted
SELECT * FROM t1 order by id; SELECT * FROM t1 order by id;
id | val id | val
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -1235,11 +1234,7 @@ WHEN MATCHED THEN
DO NOTHING DO NOTHING
WHEN NOT MATCHED THEN WHEN NOT MATCHED THEN
INSERT VALUES(fn_source.id, fn_source.source); INSERT VALUES(fn_source.id, fn_source.source);
DEBUG: function does not have co-located tables DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.fn_target_xxxxxxx fn_target USING (SELECT f.id, f.source FROM merge_schema.f_dist() f(id integer, source character varying)) fn_source ON (fn_source.id OPERATOR(pg_catalog.=) fn_target.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED THEN INSERT (id, data) VALUES (fn_source.id, fn_source.source)>
DEBUG: generating subplan XXX_1 for subquery SELECT id, source FROM merge_schema.f_dist() f(id integer, source character varying)
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.fn_target USING (SELECT intermediate_result.id, intermediate_result.source FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer, source character varying)) fn_source ON (fn_source.id OPERATOR(pg_catalog.=) fn_target.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED THEN INSERT (id, data) VALUES (fn_source.id, fn_source.source)>
DEBUG: Plan XXX query after replacing subqueries and CTEs: MERGE INTO merge_schema.fn_target USING (SELECT intermediate_result.id, intermediate_result.source FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer, source character varying)) fn_source ON (fn_source.id OPERATOR(pg_catalog.=) fn_target.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED THEN INSERT (id, data) VALUES (fn_source.id, fn_source.source)
DEBUG: <Deparsed MERGE query: MERGE INTO merge_schema.fn_target_xxxxxxx fn_target USING (SELECT intermediate_result.id, intermediate_result.source FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer, source character varying)) fn_source ON (fn_source.id OPERATOR(pg_catalog.=) fn_target.id) WHEN MATCHED THEN DO NOTHING WHEN NOT MATCHED THEN INSERT (id, data) VALUES (fn_source.id, fn_source.source)>
RESET client_min_messages; RESET client_min_messages;
SELECT * INTO fn_local FROM fn_target ORDER BY 1 ; SELECT * INTO fn_local FROM fn_target ORDER BY 1 ;
-- Should be equal -- Should be equal
@ -2426,6 +2421,37 @@ USING source_json sdn
ON sda.id = sdn.id ON sda.id = sdn.id
WHEN matched THEN WHEN matched THEN
UPDATE SET z = immutable_hash(sdn.z); UPDATE SET z = immutable_hash(sdn.z);
-- Test bigserial
CREATE TABLE source_serial (id integer, z int, d bigserial);
CREATE TABLE target_serial (id integer, z int, d bigserial);
INSERT INTO source_serial SELECT i,i FROM generate_series(0,100)i;
SELECT create_distributed_table('source_serial', 'id'), create_distributed_table('target_serial', 'id');
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_schema.source_serial$$)
create_distributed_table | create_distributed_table
---------------------------------------------------------------------
|
(1 row)
MERGE INTO target_serial sda
USING source_serial sdn
ON sda.id = sdn.id
WHEN NOT matched THEN
INSERT (id, z) VALUES (id, z);
SELECT count(*) from source_serial;
count
---------------------------------------------------------------------
101
(1 row)
SELECT count(*) from target_serial;
count
---------------------------------------------------------------------
101
(1 row)
-- --
-- Error and Unsupported scenarios -- Error and Unsupported scenarios
-- --
@ -2609,9 +2635,8 @@ $$;
-- relation which will have unexpected/suprising results. -- relation which will have unexpected/suprising results.
MERGE INTO t1 USING (SELECT * FROM s1 WHERE true) s1 ON MERGE INTO t1 USING (SELECT * FROM s1 WHERE true) s1 ON
t1.id = s1.id AND s1.id = 2 t1.id = s1.id AND s1.id = 2
WHEN NOT matched THEN WHEN matched THEN
INSERT (id, val) UPDATE SET id = s1.id, val = random();
VALUES (s1.id , random());
ERROR: functions used in MERGE actions on distributed tables must not be VOLATILE ERROR: functions used in MERGE actions on distributed tables must not be VOLATILE
-- Test preventing "ON" join condition from writing to the database -- Test preventing "ON" join condition from writing to the database
BEGIN; BEGIN;
@ -3103,7 +3128,7 @@ CONTEXT: SQL statement "SELECT citus_drop_all_shards(v_obj.objid, v_obj.schema_
PL/pgSQL function citus_drop_trigger() line XX at PERFORM PL/pgSQL function citus_drop_trigger() line XX at PERFORM
DROP FUNCTION merge_when_and_write(); DROP FUNCTION merge_when_and_write();
DROP SCHEMA merge_schema CASCADE; DROP SCHEMA merge_schema CASCADE;
NOTICE: drop cascades to 78 other objects NOTICE: drop cascades to 80 other objects
DETAIL: drop cascades to function insert_data() DETAIL: drop cascades to function insert_data()
drop cascades to table pg_result drop cascades to table pg_result
drop cascades to table local_local drop cascades to table local_local
@ -3173,9 +3198,11 @@ drop cascades to function pa_compare_tables()
drop cascades to table source_json drop cascades to table source_json
drop cascades to table target_json drop cascades to table target_json
drop cascades to function immutable_hash(integer) drop cascades to function immutable_hash(integer)
drop cascades to table source_serial
drop cascades to table target_serial
drop cascades to table pg drop cascades to table pg
drop cascades to table t1_4000118 drop cascades to table t1_4000126
drop cascades to table s1_4000119 drop cascades to table s1_4000127
drop cascades to table t1 drop cascades to table t1
drop cascades to table s1 drop cascades to table s1
drop cascades to table dist_colocated drop cascades to table dist_colocated

View File

@ -51,11 +51,6 @@ SELECT * FROM target_cj ORDER BY 1;
ROLLBACK; ROLLBACK;
-- Test PREPARE -- Test PREPARE
CREATE TABLE prept(t1 int, t2 int);
CREATE TABLE preps(s1 int, s2 int);
INSERT INTO prept VALUES(100, 0);
INSERT INTO preps VALUES(100, 0);
INSERT INTO preps VALUES(200, 0);
PREPARE insert(int) AS PREPARE insert(int) AS
MERGE INTO prept MERGE INTO prept
USING preps USING preps
@ -72,6 +67,9 @@ WHEN MATCHED AND prept.t2 = $1 THEN
DELETE DELETE
WHEN MATCHED THEN WHEN MATCHED THEN
UPDATE SET t2 = t2 + 1; UPDATE SET t2 = t2 + 1;
INSERT INTO prept VALUES(100, 0);
INSERT INTO preps VALUES(100, 0);
INSERT INTO preps VALUES(200, 0);
EXECUTE insert(1); EXECUTE delete(0); EXECUTE insert(1); EXECUTE delete(0);
EXECUTE insert(1); EXECUTE delete(0); EXECUTE insert(1); EXECUTE delete(0);
EXECUTE insert(1); EXECUTE delete(0); EXECUTE insert(1); EXECUTE delete(0);
@ -86,3 +84,33 @@ SELECT * FROM prept;
100 | 12 100 | 12
(1 row) (1 row)
-- Test local tables
INSERT INTO s1 VALUES(1, 0); -- Matches DELETE clause
INSERT INTO s1 VALUES(2, 1); -- Matches UPDATE clause
INSERT INTO s1 VALUES(3, 1); -- No Match INSERT clause
INSERT INTO s1 VALUES(4, 1); -- No Match INSERT clause
INSERT INTO s1 VALUES(6, 1); -- No Match INSERT clause
INSERT INTO t1 VALUES(1, 0); -- Will be deleted
INSERT INTO t1 VALUES(2, 0); -- Will be updated
INSERT INTO t1 VALUES(5, 0); -- Will be intact
WITH s1_res AS (
SELECT * FROM s1
)
MERGE INTO t1
USING s1_res ON (s1_res.id = t1.id)
WHEN MATCHED AND s1_res.val = 0 THEN
DELETE
WHEN MATCHED THEN
UPDATE SET val = t1.val + 1
WHEN NOT MATCHED THEN
INSERT (id, val) VALUES (s1_res.id, s1_res.val);
SELECT * FROM t1 order by id;
id | val
---------------------------------------------------------------------
2 | 1
3 | 1
4 | 1
5 | 0
6 | 1
(5 rows)

View File

@ -1,3 +1,10 @@
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
\gset
\if :server_version_ge_15
\else
\q
\endif
DROP SCHEMA IF EXISTS merge_arbitrary_schema CASCADE; DROP SCHEMA IF EXISTS merge_arbitrary_schema CASCADE;
CREATE SCHEMA merge_arbitrary_schema; CREATE SCHEMA merge_arbitrary_schema;
SET search_path TO merge_arbitrary_schema; SET search_path TO merge_arbitrary_schema;
@ -24,3 +31,42 @@ SELECT create_distributed_table('source_cj2', 'sid2');
(1 row) (1 row)
CREATE TABLE prept(t1 int, t2 int);
CREATE TABLE preps(s1 int, s2 int);
SELECT create_distributed_table('prept', 't1'), create_distributed_table('preps', 's1');
create_distributed_table | create_distributed_table
---------------------------------------------------------------------
|
(1 row)
PREPARE insert(int) AS
MERGE INTO prept
USING preps
ON prept.t1 = preps.s1
WHEN MATCHED THEN
UPDATE SET t2 = t2 + $1
WHEN NOT MATCHED THEN
INSERT VALUES(s1, s2);
PREPARE delete(int) AS
MERGE INTO prept
USING preps
ON prept.t1 = preps.s1
WHEN MATCHED AND prept.t2 = $1 THEN
DELETE
WHEN MATCHED THEN
UPDATE SET t2 = t2 + 1;
-- Citus local tables
CREATE TABLE t1(id int, val int);
CREATE TABLE s1(id int, val int);
SELECT citus_add_local_table_to_metadata('t1');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
SELECT citus_add_local_table_to_metadata('s1');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)

View File

@ -0,0 +1,6 @@
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
\gset
\if :server_version_ge_15
\else
\q

View File

@ -224,7 +224,6 @@ MERGE INTO t1
WHEN NOT MATCHED THEN WHEN NOT MATCHED THEN
INSERT (id, val) VALUES (pg_res.id, pg_res.val); INSERT (id, val) VALUES (pg_res.id, pg_res.val);
-- Two rows with id 2 and val incremented, id 3, and id 1 is deleted
SELECT * FROM t1 order by id; SELECT * FROM t1 order by id;
SELECT * INTO merge_result FROM t1 order by id; SELECT * INTO merge_result FROM t1 order by id;
@ -1482,6 +1481,9 @@ ON sda.id = sdn.id
WHEN NOT matched THEN WHEN NOT matched THEN
INSERT (id, z) VALUES (id, z); INSERT (id, z) VALUES (id, z);
SELECT count(*) from source_serial;
SELECT count(*) from target_serial;
-- --
-- Error and Unsupported scenarios -- Error and Unsupported scenarios
-- --
@ -1615,9 +1617,8 @@ $$;
-- relation which will have unexpected/suprising results. -- relation which will have unexpected/suprising results.
MERGE INTO t1 USING (SELECT * FROM s1 WHERE true) s1 ON MERGE INTO t1 USING (SELECT * FROM s1 WHERE true) s1 ON
t1.id = s1.id AND s1.id = 2 t1.id = s1.id AND s1.id = 2
WHEN NOT matched THEN WHEN matched THEN
INSERT (id, val) UPDATE SET id = s1.id, val = random();
VALUES (s1.id , random());
-- Test preventing "ON" join condition from writing to the database -- Test preventing "ON" join condition from writing to the database
BEGIN; BEGIN;

View File

@ -40,14 +40,6 @@ SELECT * FROM target_cj ORDER BY 1;
ROLLBACK; ROLLBACK;
-- Test PREPARE -- Test PREPARE
CREATE TABLE prept(t1 int, t2 int);
CREATE TABLE preps(s1 int, s2 int);
INSERT INTO prept VALUES(100, 0);
INSERT INTO preps VALUES(100, 0);
INSERT INTO preps VALUES(200, 0);
PREPARE insert(int) AS PREPARE insert(int) AS
MERGE INTO prept MERGE INTO prept
USING preps USING preps
@ -66,6 +58,11 @@ WHEN MATCHED AND prept.t2 = $1 THEN
WHEN MATCHED THEN WHEN MATCHED THEN
UPDATE SET t2 = t2 + 1; UPDATE SET t2 = t2 + 1;
INSERT INTO prept VALUES(100, 0);
INSERT INTO preps VALUES(100, 0);
INSERT INTO preps VALUES(200, 0);
EXECUTE insert(1); EXECUTE delete(0); EXECUTE insert(1); EXECUTE delete(0);
EXECUTE insert(1); EXECUTE delete(0); EXECUTE insert(1); EXECUTE delete(0);
EXECUTE insert(1); EXECUTE delete(0); EXECUTE insert(1); EXECUTE delete(0);
@ -77,3 +74,29 @@ EXECUTE insert(1); EXECUTE delete(0);
-- Should have the counter as 12 (6 * 2) -- Should have the counter as 12 (6 * 2)
SELECT * FROM prept; SELECT * FROM prept;
-- Test local tables
INSERT INTO s1 VALUES(1, 0); -- Matches DELETE clause
INSERT INTO s1 VALUES(2, 1); -- Matches UPDATE clause
INSERT INTO s1 VALUES(3, 1); -- No Match INSERT clause
INSERT INTO s1 VALUES(4, 1); -- No Match INSERT clause
INSERT INTO s1 VALUES(6, 1); -- No Match INSERT clause
INSERT INTO t1 VALUES(1, 0); -- Will be deleted
INSERT INTO t1 VALUES(2, 0); -- Will be updated
INSERT INTO t1 VALUES(5, 0); -- Will be intact
WITH s1_res AS (
SELECT * FROM s1
)
MERGE INTO t1
USING s1_res ON (s1_res.id = t1.id)
WHEN MATCHED AND s1_res.val = 0 THEN
DELETE
WHEN MATCHED THEN
UPDATE SET val = t1.val + 1
WHEN NOT MATCHED THEN
INSERT (id, val) VALUES (s1_res.id, s1_res.val);
SELECT * FROM t1 order by id;

View File

@ -1,3 +1,11 @@
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
\gset
\if :server_version_ge_15
\else
\q
\endif
DROP SCHEMA IF EXISTS merge_arbitrary_schema CASCADE; DROP SCHEMA IF EXISTS merge_arbitrary_schema CASCADE;
CREATE SCHEMA merge_arbitrary_schema; CREATE SCHEMA merge_arbitrary_schema;
SET search_path TO merge_arbitrary_schema; SET search_path TO merge_arbitrary_schema;
@ -10,3 +18,33 @@ CREATE TABLE source_cj2(sid2 int, src2 text, val2 int);
SELECT create_distributed_table('target_cj', 'tid'); SELECT create_distributed_table('target_cj', 'tid');
SELECT create_distributed_table('source_cj1', 'sid1'); SELECT create_distributed_table('source_cj1', 'sid1');
SELECT create_distributed_table('source_cj2', 'sid2'); SELECT create_distributed_table('source_cj2', 'sid2');
CREATE TABLE prept(t1 int, t2 int);
CREATE TABLE preps(s1 int, s2 int);
SELECT create_distributed_table('prept', 't1'), create_distributed_table('preps', 's1');
PREPARE insert(int) AS
MERGE INTO prept
USING preps
ON prept.t1 = preps.s1
WHEN MATCHED THEN
UPDATE SET t2 = t2 + $1
WHEN NOT MATCHED THEN
INSERT VALUES(s1, s2);
PREPARE delete(int) AS
MERGE INTO prept
USING preps
ON prept.t1 = preps.s1
WHEN MATCHED AND prept.t2 = $1 THEN
DELETE
WHEN MATCHED THEN
UPDATE SET t2 = t2 + 1;
-- Citus local tables
CREATE TABLE t1(id int, val int);
CREATE TABLE s1(id int, val int);
SELECT citus_add_local_table_to_metadata('t1');
SELECT citus_add_local_table_to_metadata('s1');