mirror of https://github.com/citusdata/citus.git
Support MERGE command for single_shard_distributed Target (#7643)
This PR has following changes : 1. Enable MERGE command for single_shard_distributed targets.pull/7648/head
parent
accb7d09f7
commit
3c467e6e02
|
@ -2568,7 +2568,7 @@ ShardIdForTuple(CitusCopyDestReceiver *copyDest, Datum *columnValues, bool *colu
|
||||||
* Find the shard interval and id for the partition column value for
|
* Find the shard interval and id for the partition column value for
|
||||||
* non-reference tables.
|
* non-reference tables.
|
||||||
*
|
*
|
||||||
* For reference table, this function blindly returns the tables single
|
* For reference table, and single shard distributed table this function blindly returns the tables single
|
||||||
* shard.
|
* shard.
|
||||||
*/
|
*/
|
||||||
ShardInterval *shardInterval = FindShardInterval(partitionColumnValue, cacheEntry);
|
ShardInterval *shardInterval = FindShardInterval(partitionColumnValue, cacheEntry);
|
||||||
|
|
|
@ -243,14 +243,27 @@ CreateNonPushableMergePlan(Oid targetRelationId, uint64 planId, Query *originalQ
|
||||||
|
|
||||||
CitusTableCacheEntry *targetRelation = GetCitusTableCacheEntry(targetRelationId);
|
CitusTableCacheEntry *targetRelation = GetCitusTableCacheEntry(targetRelationId);
|
||||||
|
|
||||||
|
|
||||||
|
if (IsCitusTableType(targetRelation->relationId, SINGLE_SHARD_DISTRIBUTED))
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* if target table is SINGLE_SHARD_DISTRIBUTED let's set this to invalid -1
|
||||||
|
* so later in execution phase we don't rely on this value and try to find single shard of target instead.
|
||||||
|
*/
|
||||||
|
distributedPlan->sourceResultRepartitionColumnIndex = -1;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
/*
|
/*
|
||||||
* Get the index of the column in the source query that will be utilized
|
* Get the index of the column in the source query that will be utilized
|
||||||
* to repartition the source rows, ensuring colocation with the target
|
* to repartition the source rows, ensuring colocation with the target
|
||||||
*/
|
*/
|
||||||
|
|
||||||
distributedPlan->sourceResultRepartitionColumnIndex =
|
distributedPlan->sourceResultRepartitionColumnIndex =
|
||||||
SourceResultPartitionColumnIndex(mergeQuery,
|
SourceResultPartitionColumnIndex(mergeQuery,
|
||||||
sourceQuery->targetList,
|
sourceQuery->targetList,
|
||||||
targetRelation);
|
targetRelation);
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Make a copy of the source query, since following code scribbles it
|
* Make a copy of the source query, since following code scribbles it
|
||||||
|
@ -262,11 +275,11 @@ CreateNonPushableMergePlan(Oid targetRelationId, uint64 planId, Query *originalQ
|
||||||
int cursorOptions = CURSOR_OPT_PARALLEL_OK;
|
int cursorOptions = CURSOR_OPT_PARALLEL_OK;
|
||||||
PlannedStmt *sourceRowsPlan = pg_plan_query(sourceQueryCopy, NULL, cursorOptions,
|
PlannedStmt *sourceRowsPlan = pg_plan_query(sourceQueryCopy, NULL, cursorOptions,
|
||||||
boundParams);
|
boundParams);
|
||||||
bool repartitioned = IsRedistributablePlan(sourceRowsPlan->planTree) &&
|
bool isRepartitionAllowed = IsRedistributablePlan(sourceRowsPlan->planTree) &&
|
||||||
IsSupportedRedistributionTarget(targetRelationId);
|
IsSupportedRedistributionTarget(targetRelationId);
|
||||||
|
|
||||||
/* If plan is distributed, no work at the coordinator */
|
/* If plan is distributed, no work at the coordinator */
|
||||||
if (repartitioned)
|
if (isRepartitionAllowed)
|
||||||
{
|
{
|
||||||
distributedPlan->modifyWithSelectMethod = MODIFY_WITH_SELECT_REPARTITION;
|
distributedPlan->modifyWithSelectMethod = MODIFY_WITH_SELECT_REPARTITION;
|
||||||
}
|
}
|
||||||
|
@ -1273,13 +1286,6 @@ static int
|
||||||
SourceResultPartitionColumnIndex(Query *mergeQuery, List *sourceTargetList,
|
SourceResultPartitionColumnIndex(Query *mergeQuery, List *sourceTargetList,
|
||||||
CitusTableCacheEntry *targetRelation)
|
CitusTableCacheEntry *targetRelation)
|
||||||
{
|
{
|
||||||
if (IsCitusTableType(targetRelation->relationId, SINGLE_SHARD_DISTRIBUTED))
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errmsg("MERGE operation across distributed schemas "
|
|
||||||
"or with a row-based distributed table is "
|
|
||||||
"not yet supported")));
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Get all the Join conditions from the ON clause */
|
/* Get all the Join conditions from the ON clause */
|
||||||
List *mergeJoinConditionList = WhereClauseList(mergeQuery->jointree);
|
List *mergeJoinConditionList = WhereClauseList(mergeQuery->jointree);
|
||||||
Var *targetColumn = targetRelation->partitionColumn;
|
Var *targetColumn = targetRelation->partitionColumn;
|
||||||
|
|
|
@ -32,6 +32,7 @@ s/"t2_[0-9]+"/"t2_xxxxxxx"/g
|
||||||
# shard table names for MERGE tests
|
# shard table names for MERGE tests
|
||||||
s/merge_schema\.([_a-z0-9]+)_40[0-9]+ /merge_schema.\1_xxxxxxx /g
|
s/merge_schema\.([_a-z0-9]+)_40[0-9]+ /merge_schema.\1_xxxxxxx /g
|
||||||
s/pgmerge_schema\.([_a-z0-9]+)_40[0-9]+ /pgmerge_schema.\1_xxxxxxx /g
|
s/pgmerge_schema\.([_a-z0-9]+)_40[0-9]+ /pgmerge_schema.\1_xxxxxxx /g
|
||||||
|
s/merge_vcore_schema\.([_a-z0-9]+)_40[0-9]+ /pgmerge_schema.\1_xxxxxxx /g
|
||||||
|
|
||||||
# shard table names for multi_subquery
|
# shard table names for multi_subquery
|
||||||
s/ keyval(1|2|ref)_[0-9]+ / keyval\1_xxxxxxx /g
|
s/ keyval(1|2|ref)_[0-9]+ / keyval\1_xxxxxxx /g
|
||||||
|
|
|
@ -98,14 +98,26 @@ WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b;
|
||||||
DEBUG: Distributed tables are not co-located, try repartitioning
|
DEBUG: Distributed tables are not co-located, try repartitioning
|
||||||
DEBUG: For MERGE command, all the distributed tables must be colocated
|
DEBUG: For MERGE command, all the distributed tables must be colocated
|
||||||
DEBUG: Creating MERGE repartition plan
|
DEBUG: Creating MERGE repartition plan
|
||||||
ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported
|
DEBUG: Distributed planning for a fast-path router query
|
||||||
|
DEBUG: Creating router plan
|
||||||
|
DEBUG: Collect source query results on coordinator
|
||||||
|
DEBUG: Create a MERGE task list that needs to be routed
|
||||||
|
DEBUG: <Deparsed MERGE query: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c2_t1 ON (citus_table_alias.a OPERATOR(pg_catalog.=) nullkey_c2_t1.a) WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b>
|
||||||
|
DEBUG: distributed statement: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c2_t1 ON (citus_table_alias.a OPERATOR(pg_catalog.=) nullkey_c2_t1.a) WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b
|
||||||
|
DEBUG: Execute MERGE task list
|
||||||
MERGE INTO schema_shard_table1.nullkey_c1_t1 USING nullkey_c2_t1 ON (schema_shard_table1.nullkey_c1_t1.a = nullkey_c2_t1.a)
|
MERGE INTO schema_shard_table1.nullkey_c1_t1 USING nullkey_c2_t1 ON (schema_shard_table1.nullkey_c1_t1.a = nullkey_c2_t1.a)
|
||||||
WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b
|
WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b
|
||||||
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c2_t1.a, nullkey_c2_t1.b);
|
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c2_t1.a, nullkey_c2_t1.b);
|
||||||
DEBUG: Distributed tables are not co-located, try repartitioning
|
DEBUG: Distributed tables are not co-located, try repartitioning
|
||||||
DEBUG: For MERGE command, all the distributed tables must be colocated
|
DEBUG: For MERGE command, all the distributed tables must be colocated
|
||||||
DEBUG: Creating MERGE repartition plan
|
DEBUG: Creating MERGE repartition plan
|
||||||
ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported
|
DEBUG: Distributed planning for a fast-path router query
|
||||||
|
DEBUG: Creating router plan
|
||||||
|
DEBUG: Collect source query results on coordinator
|
||||||
|
DEBUG: Create a MERGE task list that needs to be routed
|
||||||
|
DEBUG: <Deparsed MERGE query: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c2_t1 ON (citus_table_alias.a OPERATOR(pg_catalog.=) nullkey_c2_t1.a) WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c2_t1.a, nullkey_c2_t1.b)>
|
||||||
|
DEBUG: distributed statement: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) nullkey_c2_t1 ON (citus_table_alias.a OPERATOR(pg_catalog.=) nullkey_c2_t1.a) WHEN MATCHED THEN UPDATE SET b = nullkey_c2_t1.b WHEN NOT MATCHED THEN INSERT (a, b) VALUES (nullkey_c2_t1.a, nullkey_c2_t1.b)
|
||||||
|
DEBUG: Execute MERGE task list
|
||||||
-- with a distributed table
|
-- with a distributed table
|
||||||
SET search_path TO schema_shard_table1;
|
SET search_path TO schema_shard_table1;
|
||||||
MERGE INTO nullkey_c1_t1 USING schema_shard_table.distributed_table ON (nullkey_c1_t1.a = schema_shard_table.distributed_table.a)
|
MERGE INTO nullkey_c1_t1 USING schema_shard_table.distributed_table ON (nullkey_c1_t1.a = schema_shard_table.distributed_table.a)
|
||||||
|
@ -114,7 +126,12 @@ WHEN NOT MATCHED THEN INSERT VALUES (schema_shard_table.distributed_table.a, sch
|
||||||
DEBUG: Distributed tables are not co-located, try repartitioning
|
DEBUG: Distributed tables are not co-located, try repartitioning
|
||||||
DEBUG: For MERGE command, all the distributed tables must be colocated
|
DEBUG: For MERGE command, all the distributed tables must be colocated
|
||||||
DEBUG: Creating MERGE repartition plan
|
DEBUG: Creating MERGE repartition plan
|
||||||
ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported
|
DEBUG: Router planner cannot handle multi-shard select queries
|
||||||
|
DEBUG: Collect source query results on coordinator
|
||||||
|
DEBUG: Create a MERGE task list that needs to be routed
|
||||||
|
DEBUG: <Deparsed MERGE query: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) distributed_table ON (citus_table_alias.a OPERATOR(pg_catalog.=) distributed_table.a) WHEN MATCHED THEN UPDATE SET b = distributed_table.b WHEN NOT MATCHED THEN INSERT (a, b) VALUES (distributed_table.a, distributed_table.b)>
|
||||||
|
DEBUG: distributed statement: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) distributed_table ON (citus_table_alias.a OPERATOR(pg_catalog.=) distributed_table.a) WHEN MATCHED THEN UPDATE SET b = distributed_table.b WHEN NOT MATCHED THEN INSERT (a, b) VALUES (distributed_table.a, distributed_table.b)
|
||||||
|
DEBUG: Execute MERGE task list
|
||||||
MERGE INTO schema_shard_table.distributed_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = schema_shard_table.distributed_table.a)
|
MERGE INTO schema_shard_table.distributed_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = schema_shard_table.distributed_table.a)
|
||||||
WHEN MATCHED THEN DELETE
|
WHEN MATCHED THEN DELETE
|
||||||
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b);
|
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b);
|
||||||
|
@ -163,7 +180,13 @@ WHEN MATCHED THEN UPDATE SET b = schema_shard_table.reference_table.b;
|
||||||
DEBUG: A mix of distributed and reference table, try repartitioning
|
DEBUG: A mix of distributed and reference table, try repartitioning
|
||||||
DEBUG: A mix of distributed and reference table, routable query is not possible
|
DEBUG: A mix of distributed and reference table, routable query is not possible
|
||||||
DEBUG: Creating MERGE repartition plan
|
DEBUG: Creating MERGE repartition plan
|
||||||
ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported
|
DEBUG: Distributed planning for a fast-path router query
|
||||||
|
DEBUG: Creating router plan
|
||||||
|
DEBUG: Collect source query results on coordinator
|
||||||
|
DEBUG: Create a MERGE task list that needs to be routed
|
||||||
|
DEBUG: <Deparsed MERGE query: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) reference_table ON (citus_table_alias.a OPERATOR(pg_catalog.=) reference_table.a) WHEN MATCHED THEN UPDATE SET b = reference_table.b>
|
||||||
|
DEBUG: distributed statement: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) reference_table ON (citus_table_alias.a OPERATOR(pg_catalog.=) reference_table.a) WHEN MATCHED THEN UPDATE SET b = reference_table.b
|
||||||
|
DEBUG: Execute MERGE task list
|
||||||
MERGE INTO schema_shard_table.reference_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = schema_shard_table.reference_table.a)
|
MERGE INTO schema_shard_table.reference_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = schema_shard_table.reference_table.a)
|
||||||
WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t1.b
|
WHEN MATCHED THEN UPDATE SET b = nullkey_c1_t1.b
|
||||||
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b);
|
WHEN NOT MATCHED THEN INSERT VALUES (nullkey_c1_t1.a, nullkey_c1_t1.b);
|
||||||
|
@ -174,7 +197,13 @@ WHEN MATCHED THEN UPDATE SET b = schema_shard_table.citus_local_table.b;
|
||||||
DEBUG: A mix of distributed and local table, try repartitioning
|
DEBUG: A mix of distributed and local table, try repartitioning
|
||||||
DEBUG: A mix of distributed and citus-local table, routable query is not possible
|
DEBUG: A mix of distributed and citus-local table, routable query is not possible
|
||||||
DEBUG: Creating MERGE repartition plan
|
DEBUG: Creating MERGE repartition plan
|
||||||
ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported
|
DEBUG: Distributed planning for a fast-path router query
|
||||||
|
DEBUG: Creating router plan
|
||||||
|
DEBUG: Collect source query results on coordinator
|
||||||
|
DEBUG: Create a MERGE task list that needs to be routed
|
||||||
|
DEBUG: <Deparsed MERGE query: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) citus_local_table ON (citus_table_alias.a OPERATOR(pg_catalog.=) citus_local_table.a) WHEN MATCHED THEN UPDATE SET b = citus_local_table.b>
|
||||||
|
DEBUG: distributed statement: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) citus_local_table ON (citus_table_alias.a OPERATOR(pg_catalog.=) citus_local_table.a) WHEN MATCHED THEN UPDATE SET b = citus_local_table.b
|
||||||
|
DEBUG: Execute MERGE task list
|
||||||
MERGE INTO schema_shard_table.citus_local_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = schema_shard_table.citus_local_table.a)
|
MERGE INTO schema_shard_table.citus_local_table USING nullkey_c1_t1 ON (nullkey_c1_t1.a = schema_shard_table.citus_local_table.a)
|
||||||
WHEN MATCHED THEN DELETE;
|
WHEN MATCHED THEN DELETE;
|
||||||
DEBUG: A mix of distributed and local table, try repartitioning
|
DEBUG: A mix of distributed and local table, try repartitioning
|
||||||
|
@ -210,7 +239,12 @@ WHEN MATCHED THEN UPDATE SET b = cte.b;
|
||||||
DEBUG: Distributed tables are not co-located, try repartitioning
|
DEBUG: Distributed tables are not co-located, try repartitioning
|
||||||
DEBUG: For MERGE command, all the distributed tables must be colocated
|
DEBUG: For MERGE command, all the distributed tables must be colocated
|
||||||
DEBUG: Creating MERGE repartition plan
|
DEBUG: Creating MERGE repartition plan
|
||||||
ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported
|
DEBUG: Router planner cannot handle multi-shard select queries
|
||||||
|
DEBUG: Collect source query results on coordinator
|
||||||
|
DEBUG: Create a MERGE task list that needs to be routed
|
||||||
|
DEBUG: <Deparsed MERGE query: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte ON (citus_table_alias.a OPERATOR(pg_catalog.=) cte.a) WHEN MATCHED THEN UPDATE SET b = cte.b>
|
||||||
|
DEBUG: distributed statement: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte ON (citus_table_alias.a OPERATOR(pg_catalog.=) cte.a) WHEN MATCHED THEN UPDATE SET b = cte.b
|
||||||
|
DEBUG: Execute MERGE task list
|
||||||
WITH cte AS materialized (
|
WITH cte AS materialized (
|
||||||
SELECT * FROM schema_shard_table.distributed_table
|
SELECT * FROM schema_shard_table.distributed_table
|
||||||
)
|
)
|
||||||
|
@ -219,7 +253,12 @@ WHEN MATCHED THEN UPDATE SET b = cte.b;
|
||||||
DEBUG: Distributed tables are not co-located, try repartitioning
|
DEBUG: Distributed tables are not co-located, try repartitioning
|
||||||
DEBUG: For MERGE command, all the distributed tables must be colocated
|
DEBUG: For MERGE command, all the distributed tables must be colocated
|
||||||
DEBUG: Creating MERGE repartition plan
|
DEBUG: Creating MERGE repartition plan
|
||||||
ERROR: MERGE operation across distributed schemas or with a row-based distributed table is not yet supported
|
DEBUG: Router planner cannot handle multi-shard select queries
|
||||||
|
DEBUG: Collect source query results on coordinator
|
||||||
|
DEBUG: Create a MERGE task list that needs to be routed
|
||||||
|
DEBUG: <Deparsed MERGE query: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte ON (citus_table_alias.a OPERATOR(pg_catalog.=) cte.a) WHEN MATCHED THEN UPDATE SET b = cte.b>
|
||||||
|
DEBUG: distributed statement: MERGE INTO schema_shard_table1.nullkey_c1_t1_4005006 citus_table_alias USING (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('merge_into_XXX_4005006'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte ON (citus_table_alias.a OPERATOR(pg_catalog.=) cte.a) WHEN MATCHED THEN UPDATE SET b = cte.b
|
||||||
|
DEBUG: Execute MERGE task list
|
||||||
SET client_min_messages TO WARNING;
|
SET client_min_messages TO WARNING;
|
||||||
DROP SCHEMA schema_shard_table1 CASCADE;
|
DROP SCHEMA schema_shard_table1 CASCADE;
|
||||||
DROP SCHEMA schema_shard_table2 CASCADE;
|
DROP SCHEMA schema_shard_table2 CASCADE;
|
||||||
|
|
|
@ -0,0 +1,481 @@
|
||||||
|
SHOW server_version \gset
|
||||||
|
SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
|
||||||
|
\gset
|
||||||
|
\if :server_version_ge_15
|
||||||
|
\else
|
||||||
|
\q
|
||||||
|
\endif
|
||||||
|
-- MERGE command performs a join from data_source to target_table_name
|
||||||
|
DROP SCHEMA IF EXISTS merge_vcore_schema CASCADE;
|
||||||
|
NOTICE: schema "merge_vcore_schema" does not exist, skipping
|
||||||
|
--MERGE INTO target
|
||||||
|
--USING source
|
||||||
|
--WHEN NOT MATCHED
|
||||||
|
--WHEN MATCHED AND <condition>
|
||||||
|
--WHEN MATCHED
|
||||||
|
CREATE SCHEMA merge_vcore_schema;
|
||||||
|
SET search_path TO merge_vcore_schema;
|
||||||
|
SET citus.shard_count TO 4;
|
||||||
|
SET citus.next_shard_id TO 4000000;
|
||||||
|
SET citus.explain_all_tasks TO true;
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
SET citus.max_adaptive_executor_pool_size TO 1;
|
||||||
|
SET client_min_messages = warning;
|
||||||
|
SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0);
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
RESET client_min_messages;
|
||||||
|
-- ****************************************** CASE 1 : Both are singleSharded***************************************
|
||||||
|
CREATE TABLE source (
|
||||||
|
id bigint,
|
||||||
|
doc text
|
||||||
|
);
|
||||||
|
CREATE TABLE target (
|
||||||
|
id bigint,
|
||||||
|
doc text
|
||||||
|
);
|
||||||
|
SELECT create_distributed_table('source', null, colocate_with=>'none');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT create_distributed_table('target', null, colocate_with=>'none');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}');
|
||||||
|
-- insert
|
||||||
|
MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||||
|
ON (src.t_id = target.id) AND src.doc = target.doc
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET doc = '{"b" : 1}'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT (id, doc)
|
||||||
|
VALUES (src.t_id, doc);
|
||||||
|
SELECT * FROM target;
|
||||||
|
id | doc
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
2 | {"a" : 1}
|
||||||
|
2 | {"a" : 2}
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- update
|
||||||
|
MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||||
|
ON (src.t_id = target.id) AND src.doc = target.doc
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET doc = '{"b" : 1}'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT (id, doc)
|
||||||
|
VALUES (src.t_id, doc);
|
||||||
|
SELECT * FROM target;
|
||||||
|
id | doc
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
2 | {"b" : 1}
|
||||||
|
2 | {"b" : 1}
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- Explain
|
||||||
|
EXPLAIN (costs off, timing off, summary off) MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||||
|
ON (src.t_id = target.id)
|
||||||
|
WHEN MATCHED THEN DO NOTHING;
|
||||||
|
QUERY PLAN
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
Custom Scan (Citus MERGE INTO ...)
|
||||||
|
MERGE INTO target method: pull to coordinator
|
||||||
|
-> Custom Scan (Citus Adaptive)
|
||||||
|
Task Count: 1
|
||||||
|
Tasks Shown: All
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> Seq Scan on source_4000000 source
|
||||||
|
(8 rows)
|
||||||
|
|
||||||
|
DROP TABLE IF EXISTS source;
|
||||||
|
DROP TABLE IF EXISTS target;
|
||||||
|
-- *************** CASE 2 : source is single sharded and target is distributed *******************************
|
||||||
|
CREATE TABLE source (
|
||||||
|
id bigint,
|
||||||
|
doc text
|
||||||
|
);
|
||||||
|
CREATE TABLE target (
|
||||||
|
id bigint,
|
||||||
|
doc text
|
||||||
|
);
|
||||||
|
SELECT create_distributed_table('source', null, colocate_with=>'none');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT create_distributed_table('target', 'id');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}');
|
||||||
|
-- insert
|
||||||
|
MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||||
|
ON (src.t_id = target.id) AND src.doc = target.doc
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET doc = '{"b" : 1}'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT (id, doc)
|
||||||
|
VALUES (src.t_id, doc);
|
||||||
|
SELECT * FROM target;
|
||||||
|
id | doc
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
2 | {"a" : 1}
|
||||||
|
2 | {"a" : 2}
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- update
|
||||||
|
MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||||
|
ON (src.t_id = target.id) AND src.doc = target.doc
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET doc = '{"b" : 1}'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT (id, doc)
|
||||||
|
VALUES (src.t_id, doc);
|
||||||
|
SELECT * FROM target;
|
||||||
|
id | doc
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
2 | {"b" : 1}
|
||||||
|
2 | {"b" : 1}
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- Explain
|
||||||
|
EXPLAIN (costs off, timing off, summary off) MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||||
|
ON (src.t_id = target.id)
|
||||||
|
WHEN MATCHED THEN DO NOTHING;
|
||||||
|
QUERY PLAN
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
Custom Scan (Citus MERGE INTO ...)
|
||||||
|
MERGE INTO target method: pull to coordinator
|
||||||
|
-> Custom Scan (Citus Adaptive)
|
||||||
|
Task Count: 1
|
||||||
|
Tasks Shown: All
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> Seq Scan on source_4000002 source
|
||||||
|
(8 rows)
|
||||||
|
|
||||||
|
DROP TABLE IF EXISTS source;
|
||||||
|
DROP TABLE IF EXISTS target;
|
||||||
|
-- *************** CASE 3 : source is distributed and target is single sharded *******************************
|
||||||
|
CREATE TABLE source (
|
||||||
|
id bigint,
|
||||||
|
doc text
|
||||||
|
);
|
||||||
|
CREATE TABLE target (
|
||||||
|
id bigint,
|
||||||
|
doc text
|
||||||
|
);
|
||||||
|
SELECT create_distributed_table('source', 'id');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT create_distributed_table('target', null);
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}');
|
||||||
|
-- insert
|
||||||
|
MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||||
|
ON (src.t_id = target.id) AND src.doc = target.doc
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET doc = '{"b" : 1}'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT (id, doc)
|
||||||
|
VALUES (src.t_id, doc);
|
||||||
|
SELECT * FROM target;
|
||||||
|
id | doc
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
2 | {"a" : 1}
|
||||||
|
2 | {"a" : 2}
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- update
|
||||||
|
MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||||
|
ON (src.t_id = target.id) AND src.doc = target.doc
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET doc = '{"b" : 1}'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT (id, doc)
|
||||||
|
VALUES (src.t_id, doc);
|
||||||
|
SELECT * FROM target;
|
||||||
|
id | doc
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
2 | {"b" : 1}
|
||||||
|
2 | {"b" : 1}
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- Explain
|
||||||
|
EXPLAIN (costs off, timing off, summary off) MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||||
|
ON (src.t_id = target.id)
|
||||||
|
WHEN MATCHED THEN DO NOTHING;
|
||||||
|
QUERY PLAN
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
Custom Scan (Citus MERGE INTO ...)
|
||||||
|
MERGE INTO target method: pull to coordinator
|
||||||
|
-> Custom Scan (Citus Adaptive)
|
||||||
|
Task Count: 4
|
||||||
|
Tasks Shown: All
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> Seq Scan on source_4000007 source
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> Seq Scan on source_4000008 source
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> Seq Scan on source_4000009 source
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> Seq Scan on source_4000010 source
|
||||||
|
(17 rows)
|
||||||
|
|
||||||
|
DROP TABLE IF EXISTS source;
|
||||||
|
DROP TABLE IF EXISTS target;
|
||||||
|
-- *************** CASE 4 : both are distributed *******************************
|
||||||
|
CREATE TABLE source (
|
||||||
|
id bigint,
|
||||||
|
doc text
|
||||||
|
);
|
||||||
|
CREATE TABLE target (
|
||||||
|
id bigint,
|
||||||
|
doc text
|
||||||
|
);
|
||||||
|
SELECT create_distributed_table('source', 'id');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT create_distributed_table('target', 'id');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}');
|
||||||
|
-- insert
|
||||||
|
MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||||
|
ON (src.t_id = target.id) AND src.doc = target.doc
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET doc = '{"b" : 1}'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT (id, doc)
|
||||||
|
VALUES (src.t_id, doc);
|
||||||
|
SELECT * FROM target;
|
||||||
|
id | doc
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
2 | {"a" : 1}
|
||||||
|
2 | {"a" : 2}
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- update
|
||||||
|
MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||||
|
ON (src.t_id = target.id) AND src.doc = target.doc
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET doc = '{"b" : 1}'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT (id, doc)
|
||||||
|
VALUES (src.t_id, doc);
|
||||||
|
SELECT * FROM target;
|
||||||
|
id | doc
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
2 | {"b" : 1}
|
||||||
|
2 | {"b" : 1}
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- Explain
|
||||||
|
EXPLAIN (costs off, timing off, summary off) MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||||
|
ON (src.t_id = target.id)
|
||||||
|
WHEN MATCHED THEN DO NOTHING;
|
||||||
|
QUERY PLAN
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
Custom Scan (Citus MERGE INTO ...)
|
||||||
|
MERGE INTO target method: repartition
|
||||||
|
-> Custom Scan (Citus Adaptive)
|
||||||
|
Task Count: 4
|
||||||
|
Tasks Shown: All
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> Seq Scan on source_4000012 source
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> Seq Scan on source_4000013 source
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> Seq Scan on source_4000014 source
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> Seq Scan on source_4000015 source
|
||||||
|
(17 rows)
|
||||||
|
|
||||||
|
DROP TABLE IF EXISTS source;
|
||||||
|
DROP TABLE IF EXISTS target;
|
||||||
|
-- *************** CASE 5 : both are distributed & colocated *******************************
|
||||||
|
CREATE TABLE source (
|
||||||
|
id bigint,
|
||||||
|
doc text
|
||||||
|
);
|
||||||
|
CREATE TABLE target (
|
||||||
|
id bigint,
|
||||||
|
doc text
|
||||||
|
);
|
||||||
|
SELECT create_distributed_table('source', 'id');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT create_distributed_table('target', 'id', colocate_with=>'source');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}');
|
||||||
|
-- insert
|
||||||
|
MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||||
|
ON (src.t_id = target.id) AND src.doc = target.doc
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET doc = '{"b" : 1}'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT (id, doc)
|
||||||
|
VALUES (src.t_id, doc);
|
||||||
|
SELECT * FROM target;
|
||||||
|
id | doc
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
2 | {"a" : 1}
|
||||||
|
2 | {"a" : 2}
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- update
|
||||||
|
MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||||
|
ON (src.t_id = target.id) AND src.doc = target.doc
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET doc = '{"b" : 1}'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT (id, doc)
|
||||||
|
VALUES (src.t_id, doc);
|
||||||
|
SELECT * FROM target;
|
||||||
|
id | doc
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
2 | {"b" : 1}
|
||||||
|
2 | {"b" : 1}
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- Explain
|
||||||
|
EXPLAIN (costs off, timing off, summary off) MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||||
|
ON (src.t_id = target.id)
|
||||||
|
WHEN MATCHED THEN DO NOTHING;
|
||||||
|
QUERY PLAN
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
Custom Scan (Citus MERGE INTO ...)
|
||||||
|
MERGE INTO target method: repartition
|
||||||
|
-> Custom Scan (Citus Adaptive)
|
||||||
|
Task Count: 4
|
||||||
|
Tasks Shown: All
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> Seq Scan on source_4000020 source
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> Seq Scan on source_4000021 source
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> Seq Scan on source_4000022 source
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> Seq Scan on source_4000023 source
|
||||||
|
(17 rows)
|
||||||
|
|
||||||
|
DROP TABLE IF EXISTS source;
|
||||||
|
DROP TABLE IF EXISTS target;
|
||||||
|
-- *************** CASE 6 : both are singlesharded & colocated *******************************
|
||||||
|
CREATE TABLE source (
|
||||||
|
id bigint,
|
||||||
|
doc text
|
||||||
|
);
|
||||||
|
CREATE TABLE target (
|
||||||
|
id bigint,
|
||||||
|
doc text
|
||||||
|
);
|
||||||
|
SELECT create_distributed_table('source', null);
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT create_distributed_table('target', null, colocate_with=>'source');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}');
|
||||||
|
-- insert
|
||||||
|
MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||||
|
ON (src.t_id = target.id) AND src.doc = target.doc
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET doc = '{"b" : 1}'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT (id, doc)
|
||||||
|
VALUES (src.t_id, doc);
|
||||||
|
SELECT * FROM target;
|
||||||
|
id | doc
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
2 | {"a" : 1}
|
||||||
|
2 | {"a" : 2}
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- update
|
||||||
|
MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||||
|
ON (src.t_id = target.id) AND src.doc = target.doc
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET doc = '{"b" : 1}'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT (id, doc)
|
||||||
|
VALUES (src.t_id, doc);
|
||||||
|
SELECT * FROM target;
|
||||||
|
id | doc
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
2 | {"b" : 1}
|
||||||
|
2 | {"b" : 1}
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- Explain
|
||||||
|
EXPLAIN (costs off, timing off, summary off) MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||||
|
ON (src.t_id = target.id)
|
||||||
|
WHEN MATCHED THEN DO NOTHING;
|
||||||
|
QUERY PLAN
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
Custom Scan (Citus Adaptive)
|
||||||
|
Task Count: 1
|
||||||
|
Tasks Shown: All
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> Merge on target_4000029 target
|
||||||
|
-> Nested Loop
|
||||||
|
-> Seq Scan on source_4000028 source
|
||||||
|
-> Materialize
|
||||||
|
-> Seq Scan on target_4000029 target
|
||||||
|
Filter: ('2'::bigint = id)
|
||||||
|
(11 rows)
|
||||||
|
|
||||||
|
DROP TABLE IF EXISTS source;
|
||||||
|
DROP TABLE IF EXISTS target;
|
||||||
|
DROP SCHEMA IF EXISTS merge_vcore_schema CASCADE;
|
|
@ -0,0 +1,6 @@
|
||||||
|
SHOW server_version \gset
|
||||||
|
SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
|
||||||
|
\gset
|
||||||
|
\if :server_version_ge_15
|
||||||
|
\else
|
||||||
|
\q
|
|
@ -120,6 +120,7 @@ test: merge pgmerge
|
||||||
test: merge_repartition2
|
test: merge_repartition2
|
||||||
test: merge_repartition1 merge_schema_sharding
|
test: merge_repartition1 merge_schema_sharding
|
||||||
test: merge_partition_tables
|
test: merge_partition_tables
|
||||||
|
test: merge_vcore
|
||||||
|
|
||||||
# ---------
|
# ---------
|
||||||
# test that no tests leaked intermediate results. This should always be last
|
# test that no tests leaked intermediate results. This should always be last
|
||||||
|
|
|
@ -0,0 +1,319 @@
|
||||||
|
SHOW server_version \gset
|
||||||
|
SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
|
||||||
|
\gset
|
||||||
|
\if :server_version_ge_15
|
||||||
|
\else
|
||||||
|
\q
|
||||||
|
\endif
|
||||||
|
|
||||||
|
-- MERGE command performs a join from data_source to target_table_name
|
||||||
|
DROP SCHEMA IF EXISTS merge_vcore_schema CASCADE;
|
||||||
|
--MERGE INTO target
|
||||||
|
--USING source
|
||||||
|
--WHEN NOT MATCHED
|
||||||
|
--WHEN MATCHED AND <condition>
|
||||||
|
--WHEN MATCHED
|
||||||
|
|
||||||
|
CREATE SCHEMA merge_vcore_schema;
|
||||||
|
SET search_path TO merge_vcore_schema;
|
||||||
|
SET citus.shard_count TO 4;
|
||||||
|
SET citus.next_shard_id TO 4000000;
|
||||||
|
SET citus.explain_all_tasks TO true;
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
SET citus.max_adaptive_executor_pool_size TO 1;
|
||||||
|
SET client_min_messages = warning;
|
||||||
|
SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0);
|
||||||
|
RESET client_min_messages;
|
||||||
|
|
||||||
|
|
||||||
|
-- ****************************************** CASE 1 : Both are singleSharded***************************************
|
||||||
|
CREATE TABLE source (
|
||||||
|
id bigint,
|
||||||
|
doc text
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE TABLE target (
|
||||||
|
id bigint,
|
||||||
|
doc text
|
||||||
|
);
|
||||||
|
|
||||||
|
SELECT create_distributed_table('source', null, colocate_with=>'none');
|
||||||
|
SELECT create_distributed_table('target', null, colocate_with=>'none');
|
||||||
|
|
||||||
|
INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}');
|
||||||
|
|
||||||
|
-- insert
|
||||||
|
MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||||
|
ON (src.t_id = target.id) AND src.doc = target.doc
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET doc = '{"b" : 1}'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT (id, doc)
|
||||||
|
VALUES (src.t_id, doc);
|
||||||
|
|
||||||
|
SELECT * FROM target;
|
||||||
|
|
||||||
|
-- update
|
||||||
|
MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||||
|
ON (src.t_id = target.id) AND src.doc = target.doc
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET doc = '{"b" : 1}'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT (id, doc)
|
||||||
|
VALUES (src.t_id, doc);
|
||||||
|
|
||||||
|
SELECT * FROM target;
|
||||||
|
|
||||||
|
-- Explain
|
||||||
|
EXPLAIN (costs off, timing off, summary off) MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||||
|
ON (src.t_id = target.id)
|
||||||
|
WHEN MATCHED THEN DO NOTHING;
|
||||||
|
|
||||||
|
|
||||||
|
DROP TABLE IF EXISTS source;
|
||||||
|
DROP TABLE IF EXISTS target;
|
||||||
|
|
||||||
|
|
||||||
|
-- *************** CASE 2 : source is single sharded and target is distributed *******************************
|
||||||
|
CREATE TABLE source (
|
||||||
|
id bigint,
|
||||||
|
doc text
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE TABLE target (
|
||||||
|
id bigint,
|
||||||
|
doc text
|
||||||
|
);
|
||||||
|
|
||||||
|
SELECT create_distributed_table('source', null, colocate_with=>'none');
|
||||||
|
SELECT create_distributed_table('target', 'id');
|
||||||
|
|
||||||
|
INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}');
|
||||||
|
|
||||||
|
-- insert
|
||||||
|
MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||||
|
ON (src.t_id = target.id) AND src.doc = target.doc
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET doc = '{"b" : 1}'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT (id, doc)
|
||||||
|
VALUES (src.t_id, doc);
|
||||||
|
|
||||||
|
SELECT * FROM target;
|
||||||
|
|
||||||
|
-- update
|
||||||
|
MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||||
|
ON (src.t_id = target.id) AND src.doc = target.doc
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET doc = '{"b" : 1}'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT (id, doc)
|
||||||
|
VALUES (src.t_id, doc);
|
||||||
|
|
||||||
|
SELECT * FROM target;
|
||||||
|
|
||||||
|
-- Explain
|
||||||
|
EXPLAIN (costs off, timing off, summary off) MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||||
|
ON (src.t_id = target.id)
|
||||||
|
WHEN MATCHED THEN DO NOTHING;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
DROP TABLE IF EXISTS source;
|
||||||
|
DROP TABLE IF EXISTS target;
|
||||||
|
|
||||||
|
|
||||||
|
-- *************** CASE 3 : source is distributed and target is single sharded *******************************
|
||||||
|
CREATE TABLE source (
|
||||||
|
id bigint,
|
||||||
|
doc text
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE TABLE target (
|
||||||
|
id bigint,
|
||||||
|
doc text
|
||||||
|
);
|
||||||
|
|
||||||
|
SELECT create_distributed_table('source', 'id');
|
||||||
|
SELECT create_distributed_table('target', null);
|
||||||
|
|
||||||
|
INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}');
|
||||||
|
|
||||||
|
-- insert
|
||||||
|
MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||||
|
ON (src.t_id = target.id) AND src.doc = target.doc
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET doc = '{"b" : 1}'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT (id, doc)
|
||||||
|
VALUES (src.t_id, doc);
|
||||||
|
|
||||||
|
SELECT * FROM target;
|
||||||
|
|
||||||
|
-- update
|
||||||
|
MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||||
|
ON (src.t_id = target.id) AND src.doc = target.doc
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET doc = '{"b" : 1}'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT (id, doc)
|
||||||
|
VALUES (src.t_id, doc);
|
||||||
|
|
||||||
|
SELECT * FROM target;
|
||||||
|
|
||||||
|
-- Explain
|
||||||
|
EXPLAIN (costs off, timing off, summary off) MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||||
|
ON (src.t_id = target.id)
|
||||||
|
WHEN MATCHED THEN DO NOTHING;
|
||||||
|
|
||||||
|
DROP TABLE IF EXISTS source;
|
||||||
|
DROP TABLE IF EXISTS target;
|
||||||
|
|
||||||
|
|
||||||
|
-- *************** CASE 4 : both are distributed *******************************
|
||||||
|
CREATE TABLE source (
|
||||||
|
id bigint,
|
||||||
|
doc text
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE TABLE target (
|
||||||
|
id bigint,
|
||||||
|
doc text
|
||||||
|
);
|
||||||
|
|
||||||
|
SELECT create_distributed_table('source', 'id');
|
||||||
|
SELECT create_distributed_table('target', 'id');
|
||||||
|
|
||||||
|
INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}');
|
||||||
|
|
||||||
|
-- insert
|
||||||
|
MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||||
|
ON (src.t_id = target.id) AND src.doc = target.doc
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET doc = '{"b" : 1}'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT (id, doc)
|
||||||
|
VALUES (src.t_id, doc);
|
||||||
|
|
||||||
|
SELECT * FROM target;
|
||||||
|
|
||||||
|
-- update
|
||||||
|
MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||||
|
ON (src.t_id = target.id) AND src.doc = target.doc
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET doc = '{"b" : 1}'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT (id, doc)
|
||||||
|
VALUES (src.t_id, doc);
|
||||||
|
|
||||||
|
SELECT * FROM target;
|
||||||
|
|
||||||
|
-- Explain
|
||||||
|
EXPLAIN (costs off, timing off, summary off) MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||||
|
ON (src.t_id = target.id)
|
||||||
|
WHEN MATCHED THEN DO NOTHING;
|
||||||
|
|
||||||
|
DROP TABLE IF EXISTS source;
|
||||||
|
DROP TABLE IF EXISTS target;
|
||||||
|
|
||||||
|
|
||||||
|
-- *************** CASE 5 : both are distributed & colocated *******************************
|
||||||
|
|
||||||
|
CREATE TABLE source (
|
||||||
|
id bigint,
|
||||||
|
doc text
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE TABLE target (
|
||||||
|
id bigint,
|
||||||
|
doc text
|
||||||
|
);
|
||||||
|
|
||||||
|
SELECT create_distributed_table('source', 'id');
|
||||||
|
SELECT create_distributed_table('target', 'id', colocate_with=>'source');
|
||||||
|
|
||||||
|
INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}');
|
||||||
|
|
||||||
|
-- insert
|
||||||
|
MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||||
|
ON (src.t_id = target.id) AND src.doc = target.doc
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET doc = '{"b" : 1}'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT (id, doc)
|
||||||
|
VALUES (src.t_id, doc);
|
||||||
|
|
||||||
|
SELECT * FROM target;
|
||||||
|
|
||||||
|
-- update
|
||||||
|
MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||||
|
ON (src.t_id = target.id) AND src.doc = target.doc
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET doc = '{"b" : 1}'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT (id, doc)
|
||||||
|
VALUES (src.t_id, doc);
|
||||||
|
|
||||||
|
SELECT * FROM target;
|
||||||
|
|
||||||
|
-- Explain
|
||||||
|
EXPLAIN (costs off, timing off, summary off) MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||||
|
ON (src.t_id = target.id)
|
||||||
|
WHEN MATCHED THEN DO NOTHING;
|
||||||
|
|
||||||
|
DROP TABLE IF EXISTS source;
|
||||||
|
DROP TABLE IF EXISTS target;
|
||||||
|
|
||||||
|
|
||||||
|
-- *************** CASE 6 : both are singlesharded & colocated *******************************
|
||||||
|
|
||||||
|
CREATE TABLE source (
|
||||||
|
id bigint,
|
||||||
|
doc text
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE TABLE target (
|
||||||
|
id bigint,
|
||||||
|
doc text
|
||||||
|
);
|
||||||
|
|
||||||
|
SELECT create_distributed_table('source', null);
|
||||||
|
SELECT create_distributed_table('target', null, colocate_with=>'source');
|
||||||
|
|
||||||
|
INSERT INTO source (id, doc) VALUES (1, '{"a" : 1}'), (1, '{"a" : 2}');
|
||||||
|
|
||||||
|
-- insert
|
||||||
|
MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||||
|
ON (src.t_id = target.id) AND src.doc = target.doc
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET doc = '{"b" : 1}'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT (id, doc)
|
||||||
|
VALUES (src.t_id, doc);
|
||||||
|
|
||||||
|
SELECT * FROM target;
|
||||||
|
|
||||||
|
-- update
|
||||||
|
MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||||
|
ON (src.t_id = target.id) AND src.doc = target.doc
|
||||||
|
WHEN MATCHED THEN
|
||||||
|
UPDATE SET doc = '{"b" : 1}'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT (id, doc)
|
||||||
|
VALUES (src.t_id, doc);
|
||||||
|
|
||||||
|
SELECT * FROM target;
|
||||||
|
|
||||||
|
-- Explain
|
||||||
|
EXPLAIN (costs off, timing off, summary off) MERGE INTO ONLY target USING (SELECT 2::bigint AS t_id, doc FROM source) src
|
||||||
|
ON (src.t_id = target.id)
|
||||||
|
WHEN MATCHED THEN DO NOTHING;
|
||||||
|
|
||||||
|
DROP TABLE IF EXISTS source;
|
||||||
|
DROP TABLE IF EXISTS target;
|
||||||
|
|
||||||
|
DROP SCHEMA IF EXISTS merge_vcore_schema CASCADE;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue