mirror of https://github.com/citusdata/citus.git
addressing review comments
parent
22d23fdb0a
commit
98ac3aa938
|
@ -2568,7 +2568,7 @@ ShardIdForTuple(CitusCopyDestReceiver *copyDest, Datum *columnValues, bool *colu
|
|||
* Find the shard interval and id for the partition column value for
|
||||
* non-reference tables.
|
||||
*
|
||||
* For reference table, this function blindly returns the tables single
|
||||
* For reference tables and single shard distributed tables, this function blindly returns the table's single
|
||||
* shard.
|
||||
*/
|
||||
ShardInterval *shardInterval = FindShardInterval(partitionColumnValue, cacheEntry);
|
||||
|
|
|
@ -243,18 +243,26 @@ CreateNonPushableMergePlan(Oid targetRelationId, uint64 planId, Query *originalQ
|
|||
|
||||
CitusTableCacheEntry *targetRelation = GetCitusTableCacheEntry(targetRelationId);
|
||||
|
||||
bool repartitioned = IsSupportedRedistributionTarget(targetRelationId);
|
||||
|
||||
if(repartitioned)
|
||||
if (IsCitusTableType(targetRelation->relationId, SINGLE_SHARD_DISTRIBUTED))
|
||||
{
|
||||
/*
|
||||
* Get the index of the column in the source query that will be utilized
|
||||
* to repartition the source rows, ensuring colocation with the target
|
||||
*/
|
||||
* If the target table is SINGLE_SHARD_DISTRIBUTED, set this to the invalid value -1
|
||||
* so that the execution phase does not rely on it and instead looks up the target's single shard.
|
||||
*/
|
||||
distributedPlan->sourceResultRepartitionColumnIndex = -1;
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* Get the index of the column in the source query that will be utilized
|
||||
* to repartition the source rows, ensuring colocation with the target
|
||||
*/
|
||||
|
||||
distributedPlan->sourceResultRepartitionColumnIndex =
|
||||
SourceResultPartitionColumnIndex(mergeQuery,
|
||||
sourceQuery->targetList,
|
||||
targetRelation);
|
||||
sourceQuery->targetList,
|
||||
targetRelation);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -267,9 +275,11 @@ CreateNonPushableMergePlan(Oid targetRelationId, uint64 planId, Query *originalQ
|
|||
int cursorOptions = CURSOR_OPT_PARALLEL_OK;
|
||||
PlannedStmt *sourceRowsPlan = pg_plan_query(sourceQueryCopy, NULL, cursorOptions,
|
||||
boundParams);
|
||||
repartitioned = repartitioned && IsRedistributablePlan(sourceRowsPlan->planTree);
|
||||
bool isRepartitionAllowed = IsRedistributablePlan(sourceRowsPlan->planTree) &&
|
||||
IsSupportedRedistributionTarget(targetRelationId);
|
||||
|
||||
/* If plan is distributed, no work at the coordinator */
|
||||
if (repartitioned)
|
||||
if (isRepartitionAllowed)
|
||||
{
|
||||
distributedPlan->modifyWithSelectMethod = MODIFY_WITH_SELECT_REPARTITION;
|
||||
}
|
||||
|
@ -1276,13 +1286,6 @@ static int
|
|||
SourceResultPartitionColumnIndex(Query *mergeQuery, List *sourceTargetList,
|
||||
CitusTableCacheEntry *targetRelation)
|
||||
{
|
||||
if (IsCitusTableType(targetRelation->relationId, SINGLE_SHARD_DISTRIBUTED))
|
||||
{
|
||||
ereport(ERROR, (errmsg("MERGE operation across distributed schemas "
|
||||
"or with a row-based distributed table is "
|
||||
"not yet supported")));
|
||||
}
|
||||
|
||||
/* Get all the Join conditions from the ON clause */
|
||||
List *mergeJoinConditionList = WhereClauseList(mergeQuery->jointree);
|
||||
Var *targetColumn = targetRelation->partitionColumn;
|
||||
|
|
|
@ -0,0 +1,480 @@
|
|||
SHOW server_version \gset
|
||||
SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
|
||||
\gset
|
||||
\if :server_version_ge_15
|
||||
\else
|
||||
\q
|
||||
\endif
|
||||
-- MERGE command performs a join from data_source to target_table_name
|
||||
DROP SCHEMA IF EXISTS merge_schema CASCADE;
|
||||
NOTICE: schema "merge_schema" does not exist, skipping
|
||||
--MERGE INTO target
|
||||
--USING source
|
||||
--WHEN NOT MATCHED
|
||||
--WHEN MATCHED AND <condition>
|
||||
--WHEN MATCHED
|
||||
CREATE SCHEMA merge_schema;
|
||||
SET search_path TO merge_schema;
|
||||
SET citus.shard_count TO 4;
|
||||
SET citus.next_shard_id TO 4000000;
|
||||
SET citus.explain_all_tasks TO true;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.max_adaptive_executor_pool_size TO 1;
|
||||
SET client_min_messages = warning;
|
||||
SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0);
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
RESET client_min_messages;
|
||||
-- ****************************************** CASE 1 : Both are singleSharded***************************************
|
||||
CREATE TABLE source (
|
||||
id bigint,
|
||||
doc text
|
||||
);
|
||||
CREATE TABLE target (
|
||||
id bigint,
|
||||
doc text
|
||||
);
|
||||
SELECT create_distributed_table('source', null, colocate_with=>'none');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('target', null, colocate_with=>'none');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}');
|
||||
-- insert
|
||||
MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||
ON (src.t_id = target.id) AND src.doc = target.doc
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET doc = '{"b" : 1}'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT (id, doc)
|
||||
VALUES (src.t_id, doc);
|
||||
SELECT * FROM target;
|
||||
id | doc
|
||||
---------------------------------------------------------------------
|
||||
2 | {"a" : 1}
|
||||
2 | {"a" : 2}
|
||||
(2 rows)
|
||||
|
||||
-- update
|
||||
MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||
ON (src.t_id = target.id) AND src.doc = target.doc
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET doc = '{"b" : 1}'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT (id, doc)
|
||||
VALUES (src.t_id, doc);
|
||||
SELECT * FROM target;
|
||||
id | doc
|
||||
---------------------------------------------------------------------
|
||||
2 | {"b" : 1}
|
||||
2 | {"b" : 1}
|
||||
(2 rows)
|
||||
|
||||
-- Explain
|
||||
EXPLAIN MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||
ON (src.t_id = target.id)
|
||||
WHEN MATCHED THEN DO NOTHING;
|
||||
QUERY PLAN
|
||||
---------------------------------------------------------------------
|
||||
Custom Scan (Citus MERGE INTO ...) (cost=0.00..0.00 rows=0 width=0)
|
||||
MERGE INTO target method: pull to coordinator
|
||||
-> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0)
|
||||
Task Count: 1
|
||||
Tasks Shown: All
|
||||
-> Task
|
||||
Node: host=localhost port=xxxxx dbname=regression
|
||||
-> Seq Scan on source_4000000 source (cost=0.00..22.00 rows=1200 width=40)
|
||||
(8 rows)
|
||||
|
||||
DROP TABLE IF EXISTS source;
|
||||
DROP TABLE IF EXISTS target;
|
||||
-- *************** CASE 2 : source is single sharded and target is distributed *******************************
|
||||
CREATE TABLE source (
|
||||
id bigint,
|
||||
doc text
|
||||
);
|
||||
CREATE TABLE target (
|
||||
id bigint,
|
||||
doc text
|
||||
);
|
||||
SELECT create_distributed_table('source', null, colocate_with=>'none');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('target', 'id');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}');
|
||||
-- insert
|
||||
MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||
ON (src.t_id = target.id) AND src.doc = target.doc
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET doc = '{"b" : 1}'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT (id, doc)
|
||||
VALUES (src.t_id, doc);
|
||||
SELECT * FROM target;
|
||||
id | doc
|
||||
---------------------------------------------------------------------
|
||||
2 | {"a" : 1}
|
||||
2 | {"a" : 2}
|
||||
(2 rows)
|
||||
|
||||
-- update
|
||||
MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||
ON (src.t_id = target.id) AND src.doc = target.doc
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET doc = '{"b" : 1}'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT (id, doc)
|
||||
VALUES (src.t_id, doc);
|
||||
SELECT * FROM target;
|
||||
id | doc
|
||||
---------------------------------------------------------------------
|
||||
2 | {"b" : 1}
|
||||
2 | {"b" : 1}
|
||||
(2 rows)
|
||||
|
||||
-- Explain
|
||||
EXPLAIN MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||
ON (src.t_id = target.id)
|
||||
WHEN MATCHED THEN DO NOTHING;
|
||||
QUERY PLAN
|
||||
---------------------------------------------------------------------
|
||||
Custom Scan (Citus MERGE INTO ...) (cost=0.00..0.00 rows=0 width=0)
|
||||
MERGE INTO target method: pull to coordinator
|
||||
-> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0)
|
||||
Task Count: 1
|
||||
Tasks Shown: All
|
||||
-> Task
|
||||
Node: host=localhost port=xxxxx dbname=regression
|
||||
-> Seq Scan on source_4000002 source (cost=0.00..22.00 rows=1200 width=40)
|
||||
(8 rows)
|
||||
|
||||
DROP TABLE IF EXISTS source;
|
||||
DROP TABLE IF EXISTS target;
|
||||
-- *************** CASE 3 : source is distributed and target is single sharded *******************************
|
||||
CREATE TABLE source (
|
||||
id bigint,
|
||||
doc text
|
||||
);
|
||||
CREATE TABLE target (
|
||||
id bigint,
|
||||
doc text
|
||||
);
|
||||
SELECT create_distributed_table('source', 'id');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('target', null);
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}');
|
||||
-- insert
|
||||
MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||
ON (src.t_id = target.id) AND src.doc = target.doc
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET doc = '{"b" : 1}'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT (id, doc)
|
||||
VALUES (src.t_id, doc);
|
||||
SELECT * FROM target;
|
||||
id | doc
|
||||
---------------------------------------------------------------------
|
||||
2 | {"a" : 1}
|
||||
2 | {"a" : 2}
|
||||
(2 rows)
|
||||
|
||||
-- update
|
||||
MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||
ON (src.t_id = target.id) AND src.doc = target.doc
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET doc = '{"b" : 1}'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT (id, doc)
|
||||
VALUES (src.t_id, doc);
|
||||
SELECT * FROM target;
|
||||
id | doc
|
||||
---------------------------------------------------------------------
|
||||
2 | {"b" : 1}
|
||||
2 | {"b" : 1}
|
||||
(2 rows)
|
||||
|
||||
-- Explain
|
||||
EXPLAIN MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||
ON (src.t_id = target.id)
|
||||
WHEN MATCHED THEN DO NOTHING;
|
||||
QUERY PLAN
|
||||
---------------------------------------------------------------------
|
||||
Custom Scan (Citus MERGE INTO ...) (cost=0.00..0.00 rows=0 width=0)
|
||||
MERGE INTO target method: pull to coordinator
|
||||
-> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=100000 width=40)
|
||||
Task Count: 4
|
||||
Tasks Shown: All
|
||||
-> Task
|
||||
Node: host=localhost port=xxxxx dbname=regression
|
||||
-> Seq Scan on source_4000007 source (cost=0.00..22.00 rows=1200 width=40)
|
||||
-> Task
|
||||
Node: host=localhost port=xxxxx dbname=regression
|
||||
-> Seq Scan on source_4000008 source (cost=0.00..22.00 rows=1200 width=40)
|
||||
-> Task
|
||||
Node: host=localhost port=xxxxx dbname=regression
|
||||
-> Seq Scan on source_4000009 source (cost=0.00..22.00 rows=1200 width=40)
|
||||
-> Task
|
||||
Node: host=localhost port=xxxxx dbname=regression
|
||||
-> Seq Scan on source_4000010 source (cost=0.00..22.00 rows=1200 width=40)
|
||||
(17 rows)
|
||||
|
||||
DROP TABLE IF EXISTS source;
|
||||
DROP TABLE IF EXISTS target;
|
||||
-- *************** CASE 4 : both are distributed *******************************
|
||||
CREATE TABLE source (
|
||||
id bigint,
|
||||
doc text
|
||||
);
|
||||
CREATE TABLE target (
|
||||
id bigint,
|
||||
doc text
|
||||
);
|
||||
SELECT create_distributed_table('source', 'id');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('target', 'id');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}');
|
||||
-- insert
|
||||
MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||
ON (src.t_id = target.id) AND src.doc = target.doc
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET doc = '{"b" : 1}'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT (id, doc)
|
||||
VALUES (src.t_id, doc);
|
||||
SELECT * FROM target;
|
||||
id | doc
|
||||
---------------------------------------------------------------------
|
||||
2 | {"a" : 1}
|
||||
2 | {"a" : 2}
|
||||
(2 rows)
|
||||
|
||||
-- update
|
||||
MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||
ON (src.t_id = target.id) AND src.doc = target.doc
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET doc = '{"b" : 1}'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT (id, doc)
|
||||
VALUES (src.t_id, doc);
|
||||
SELECT * FROM target;
|
||||
id | doc
|
||||
---------------------------------------------------------------------
|
||||
2 | {"b" : 1}
|
||||
2 | {"b" : 1}
|
||||
(2 rows)
|
||||
|
||||
-- Explain
|
||||
EXPLAIN MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||
ON (src.t_id = target.id)
|
||||
WHEN MATCHED THEN DO NOTHING;
|
||||
QUERY PLAN
|
||||
---------------------------------------------------------------------
|
||||
Custom Scan (Citus MERGE INTO ...) (cost=0.00..0.00 rows=0 width=0)
|
||||
MERGE INTO target method: repartition
|
||||
-> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=100000 width=40)
|
||||
Task Count: 4
|
||||
Tasks Shown: All
|
||||
-> Task
|
||||
Node: host=localhost port=xxxxx dbname=regression
|
||||
-> Seq Scan on source_4000012 source (cost=0.00..22.00 rows=1200 width=40)
|
||||
-> Task
|
||||
Node: host=localhost port=xxxxx dbname=regression
|
||||
-> Seq Scan on source_4000013 source (cost=0.00..22.00 rows=1200 width=40)
|
||||
-> Task
|
||||
Node: host=localhost port=xxxxx dbname=regression
|
||||
-> Seq Scan on source_4000014 source (cost=0.00..22.00 rows=1200 width=40)
|
||||
-> Task
|
||||
Node: host=localhost port=xxxxx dbname=regression
|
||||
-> Seq Scan on source_4000015 source (cost=0.00..22.00 rows=1200 width=40)
|
||||
(17 rows)
|
||||
|
||||
DROP TABLE IF EXISTS source;
|
||||
DROP TABLE IF EXISTS target;
|
||||
-- *************** CASE 5 : both are distributed & colocated *******************************
|
||||
CREATE TABLE source (
|
||||
id bigint,
|
||||
doc text
|
||||
);
|
||||
CREATE TABLE target (
|
||||
id bigint,
|
||||
doc text
|
||||
);
|
||||
SELECT create_distributed_table('source', 'id');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('target', 'id', colocate_with=>'source');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}');
|
||||
-- insert
|
||||
MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||
ON (src.t_id = target.id) AND src.doc = target.doc
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET doc = '{"b" : 1}'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT (id, doc)
|
||||
VALUES (src.t_id, doc);
|
||||
SELECT * FROM target;
|
||||
id | doc
|
||||
---------------------------------------------------------------------
|
||||
2 | {"a" : 1}
|
||||
2 | {"a" : 2}
|
||||
(2 rows)
|
||||
|
||||
-- update
|
||||
MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||
ON (src.t_id = target.id) AND src.doc = target.doc
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET doc = '{"b" : 1}'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT (id, doc)
|
||||
VALUES (src.t_id, doc);
|
||||
SELECT * FROM target;
|
||||
id | doc
|
||||
---------------------------------------------------------------------
|
||||
2 | {"b" : 1}
|
||||
2 | {"b" : 1}
|
||||
(2 rows)
|
||||
|
||||
-- Explain
|
||||
EXPLAIN MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||
ON (src.t_id = target.id)
|
||||
WHEN MATCHED THEN DO NOTHING;
|
||||
QUERY PLAN
|
||||
---------------------------------------------------------------------
|
||||
Custom Scan (Citus MERGE INTO ...) (cost=0.00..0.00 rows=0 width=0)
|
||||
MERGE INTO target method: repartition
|
||||
-> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=100000 width=40)
|
||||
Task Count: 4
|
||||
Tasks Shown: All
|
||||
-> Task
|
||||
Node: host=localhost port=xxxxx dbname=regression
|
||||
-> Seq Scan on source_4000020 source (cost=0.00..22.00 rows=1200 width=40)
|
||||
-> Task
|
||||
Node: host=localhost port=xxxxx dbname=regression
|
||||
-> Seq Scan on source_4000021 source (cost=0.00..22.00 rows=1200 width=40)
|
||||
-> Task
|
||||
Node: host=localhost port=xxxxx dbname=regression
|
||||
-> Seq Scan on source_4000022 source (cost=0.00..22.00 rows=1200 width=40)
|
||||
-> Task
|
||||
Node: host=localhost port=xxxxx dbname=regression
|
||||
-> Seq Scan on source_4000023 source (cost=0.00..22.00 rows=1200 width=40)
|
||||
(17 rows)
|
||||
|
||||
DROP TABLE IF EXISTS source;
|
||||
DROP TABLE IF EXISTS target;
|
||||
-- *************** CASE 6 : both are singlesharded & colocated *******************************
|
||||
CREATE TABLE source (
|
||||
id bigint,
|
||||
doc text
|
||||
);
|
||||
CREATE TABLE target (
|
||||
id bigint,
|
||||
doc text
|
||||
);
|
||||
SELECT create_distributed_table('source', null);
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('target', null, colocate_with=>'source');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}');
|
||||
-- insert
|
||||
MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||
ON (src.t_id = target.id) AND src.doc = target.doc
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET doc = '{"b" : 1}'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT (id, doc)
|
||||
VALUES (src.t_id, doc);
|
||||
SELECT * FROM target;
|
||||
id | doc
|
||||
---------------------------------------------------------------------
|
||||
2 | {"a" : 1}
|
||||
2 | {"a" : 2}
|
||||
(2 rows)
|
||||
|
||||
-- update
|
||||
MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||
ON (src.t_id = target.id) AND src.doc = target.doc
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET doc = '{"b" : 1}'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT (id, doc)
|
||||
VALUES (src.t_id, doc);
|
||||
SELECT * FROM target;
|
||||
id | doc
|
||||
---------------------------------------------------------------------
|
||||
2 | {"b" : 1}
|
||||
2 | {"b" : 1}
|
||||
(2 rows)
|
||||
|
||||
-- Explain
|
||||
EXPLAIN MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||
ON (src.t_id = target.id)
|
||||
WHEN MATCHED THEN DO NOTHING;
|
||||
QUERY PLAN
|
||||
---------------------------------------------------------------------
|
||||
Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0)
|
||||
Task Count: 1
|
||||
Tasks Shown: All
|
||||
-> Task
|
||||
Node: host=localhost port=xxxxx dbname=regression
|
||||
-> Merge on target_4000029 target (cost=0.00..137.01 rows=0 width=0)
|
||||
-> Nested Loop (cost=0.00..137.01 rows=7200 width=6)
|
||||
-> Seq Scan on source_4000028 source (cost=0.00..22.00 rows=1200 width=0)
|
||||
-> Materialize (cost=0.00..25.03 rows=6 width=6)
|
||||
-> Seq Scan on target_4000029 target (cost=0.00..25.00 rows=6 width=6)
|
||||
Filter: ('2'::bigint = id)
|
||||
(11 rows)
|
||||
|
||||
DROP TABLE IF EXISTS source;
|
||||
DROP TABLE IF EXISTS target;
|
|
@ -120,6 +120,7 @@ test: merge pgmerge
|
|||
test: merge_repartition2
|
||||
test: merge_repartition1 merge_schema_sharding
|
||||
test: merge_partition_tables
|
||||
test: merge_vcore
|
||||
|
||||
# ---------
|
||||
# test that no tests leaked intermediate results. This should always be last
|
||||
|
|
|
@ -0,0 +1,313 @@
|
|||
SHOW server_version \gset
|
||||
SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
|
||||
\gset
|
||||
\if :server_version_ge_15
|
||||
\else
|
||||
\q
|
||||
\endif
|
||||
|
||||
-- MERGE command performs a join from data_source to target_table_name
|
||||
DROP SCHEMA IF EXISTS merge_schema CASCADE;
|
||||
--MERGE INTO target
|
||||
--USING source
|
||||
--WHEN NOT MATCHED
|
||||
--WHEN MATCHED AND <condition>
|
||||
--WHEN MATCHED
|
||||
|
||||
CREATE SCHEMA merge_schema;
|
||||
SET search_path TO merge_schema;
|
||||
SET citus.shard_count TO 4;
|
||||
SET citus.next_shard_id TO 4000000;
|
||||
SET citus.explain_all_tasks TO true;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.max_adaptive_executor_pool_size TO 1;
|
||||
SET client_min_messages = warning;
|
||||
SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0);
|
||||
RESET client_min_messages;
|
||||
|
||||
|
||||
-- ****************************************** CASE 1 : Both are singleSharded***************************************
|
||||
CREATE TABLE source (
|
||||
id bigint,
|
||||
doc text
|
||||
);
|
||||
|
||||
CREATE TABLE target (
|
||||
id bigint,
|
||||
doc text
|
||||
);
|
||||
|
||||
SELECT create_distributed_table('source', null, colocate_with=>'none');
|
||||
SELECT create_distributed_table('target', null, colocate_with=>'none');
|
||||
|
||||
INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}');
|
||||
|
||||
-- insert
|
||||
MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||
ON (src.t_id = target.id) AND src.doc = target.doc
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET doc = '{"b" : 1}'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT (id, doc)
|
||||
VALUES (src.t_id, doc);
|
||||
|
||||
SELECT * FROM target;
|
||||
|
||||
-- update
|
||||
MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||
ON (src.t_id = target.id) AND src.doc = target.doc
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET doc = '{"b" : 1}'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT (id, doc)
|
||||
VALUES (src.t_id, doc);
|
||||
|
||||
SELECT * FROM target;
|
||||
|
||||
-- Explain
|
||||
EXPLAIN MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||
ON (src.t_id = target.id)
|
||||
WHEN MATCHED THEN DO NOTHING;
|
||||
|
||||
|
||||
DROP TABLE IF EXISTS source;
|
||||
DROP TABLE IF EXISTS target;
|
||||
|
||||
|
||||
-- *************** CASE 2 : source is single sharded and target is distributed *******************************
|
||||
CREATE TABLE source (
|
||||
id bigint,
|
||||
doc text
|
||||
);
|
||||
|
||||
CREATE TABLE target (
|
||||
id bigint,
|
||||
doc text
|
||||
);
|
||||
|
||||
SELECT create_distributed_table('source', null, colocate_with=>'none');
|
||||
SELECT create_distributed_table('target', 'id');
|
||||
|
||||
INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}');
|
||||
|
||||
-- insert
|
||||
MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||
ON (src.t_id = target.id) AND src.doc = target.doc
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET doc = '{"b" : 1}'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT (id, doc)
|
||||
VALUES (src.t_id, doc);
|
||||
|
||||
SELECT * FROM target;
|
||||
|
||||
-- update
|
||||
MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||
ON (src.t_id = target.id) AND src.doc = target.doc
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET doc = '{"b" : 1}'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT (id, doc)
|
||||
VALUES (src.t_id, doc);
|
||||
|
||||
SELECT * FROM target;
|
||||
|
||||
-- Explain
|
||||
EXPLAIN MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||
ON (src.t_id = target.id)
|
||||
WHEN MATCHED THEN DO NOTHING;
|
||||
|
||||
|
||||
|
||||
DROP TABLE IF EXISTS source;
|
||||
DROP TABLE IF EXISTS target;
|
||||
|
||||
|
||||
-- *************** CASE 3 : source is distributed and target is single sharded *******************************
|
||||
CREATE TABLE source (
|
||||
id bigint,
|
||||
doc text
|
||||
);
|
||||
|
||||
CREATE TABLE target (
|
||||
id bigint,
|
||||
doc text
|
||||
);
|
||||
|
||||
SELECT create_distributed_table('source', 'id');
|
||||
SELECT create_distributed_table('target', null);
|
||||
|
||||
INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}');
|
||||
|
||||
-- insert
|
||||
MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||
ON (src.t_id = target.id) AND src.doc = target.doc
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET doc = '{"b" : 1}'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT (id, doc)
|
||||
VALUES (src.t_id, doc);
|
||||
|
||||
SELECT * FROM target;
|
||||
|
||||
-- update
|
||||
MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||
ON (src.t_id = target.id) AND src.doc = target.doc
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET doc = '{"b" : 1}'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT (id, doc)
|
||||
VALUES (src.t_id, doc);
|
||||
|
||||
SELECT * FROM target;
|
||||
|
||||
-- Explain
|
||||
EXPLAIN MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||
ON (src.t_id = target.id)
|
||||
WHEN MATCHED THEN DO NOTHING;
|
||||
|
||||
DROP TABLE IF EXISTS source;
|
||||
DROP TABLE IF EXISTS target;
|
||||
|
||||
|
||||
-- *************** CASE 4 : both are distributed *******************************
|
||||
CREATE TABLE source (
|
||||
id bigint,
|
||||
doc text
|
||||
);
|
||||
|
||||
CREATE TABLE target (
|
||||
id bigint,
|
||||
doc text
|
||||
);
|
||||
|
||||
SELECT create_distributed_table('source', 'id');
|
||||
SELECT create_distributed_table('target', 'id');
|
||||
|
||||
INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}');
|
||||
|
||||
-- insert
|
||||
MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||
ON (src.t_id = target.id) AND src.doc = target.doc
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET doc = '{"b" : 1}'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT (id, doc)
|
||||
VALUES (src.t_id, doc);
|
||||
|
||||
SELECT * FROM target;
|
||||
|
||||
-- update
|
||||
MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||
ON (src.t_id = target.id) AND src.doc = target.doc
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET doc = '{"b" : 1}'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT (id, doc)
|
||||
VALUES (src.t_id, doc);
|
||||
|
||||
SELECT * FROM target;
|
||||
|
||||
-- Explain
|
||||
EXPLAIN MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||
ON (src.t_id = target.id)
|
||||
WHEN MATCHED THEN DO NOTHING;
|
||||
|
||||
DROP TABLE IF EXISTS source;
|
||||
DROP TABLE IF EXISTS target;
|
||||
|
||||
|
||||
-- *************** CASE 5 : both are distributed & colocated *******************************
|
||||
|
||||
CREATE TABLE source (
|
||||
id bigint,
|
||||
doc text
|
||||
);
|
||||
|
||||
CREATE TABLE target (
|
||||
id bigint,
|
||||
doc text
|
||||
);
|
||||
|
||||
SELECT create_distributed_table('source', 'id');
|
||||
SELECT create_distributed_table('target', 'id', colocate_with=>'source');
|
||||
|
||||
INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}');
|
||||
|
||||
-- insert
|
||||
MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||
ON (src.t_id = target.id) AND src.doc = target.doc
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET doc = '{"b" : 1}'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT (id, doc)
|
||||
VALUES (src.t_id, doc);
|
||||
|
||||
SELECT * FROM target;
|
||||
|
||||
-- update
|
||||
MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||
ON (src.t_id = target.id) AND src.doc = target.doc
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET doc = '{"b" : 1}'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT (id, doc)
|
||||
VALUES (src.t_id, doc);
|
||||
|
||||
SELECT * FROM target;
|
||||
|
||||
-- Explain
|
||||
EXPLAIN MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||
ON (src.t_id = target.id)
|
||||
WHEN MATCHED THEN DO NOTHING;
|
||||
|
||||
DROP TABLE IF EXISTS source;
|
||||
DROP TABLE IF EXISTS target;
|
||||
|
||||
|
||||
-- *************** CASE 6 : both are singlesharded & colocated *******************************
|
||||
|
||||
CREATE TABLE source (
|
||||
id bigint,
|
||||
doc text
|
||||
);
|
||||
|
||||
CREATE TABLE target (
|
||||
id bigint,
|
||||
doc text
|
||||
);
|
||||
|
||||
SELECT create_distributed_table('source', null);
|
||||
SELECT create_distributed_table('target', null, colocate_with=>'source');
|
||||
|
||||
INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}');
|
||||
|
||||
-- insert
|
||||
MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||
ON (src.t_id = target.id) AND src.doc = target.doc
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET doc = '{"b" : 1}'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT (id, doc)
|
||||
VALUES (src.t_id, doc);
|
||||
|
||||
SELECT * FROM target;
|
||||
|
||||
-- update
|
||||
MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||
ON (src.t_id = target.id) AND src.doc = target.doc
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET doc = '{"b" : 1}'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT (id, doc)
|
||||
VALUES (src.t_id, doc);
|
||||
|
||||
SELECT * FROM target;
|
||||
|
||||
-- Explain
|
||||
EXPLAIN MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||
ON (src.t_id = target.id)
|
||||
WHEN MATCHED THEN DO NOTHING;
|
||||
|
||||
DROP TABLE IF EXISTS source;
|
||||
DROP TABLE IF EXISTS target;
|
Loading…
Reference in New Issue