From e444dd4f3f349e62d411e0d4bcc71e9c17eed0d0 Mon Sep 17 00:00:00 2001 From: Teja Mupparti Date: Sat, 25 Mar 2023 15:54:02 -0700 Subject: [PATCH] MERGE: Support reference table as source with local table as target --- .../distributed/metadata/node_metadata.c | 4 +- .../distributed/planner/merge_planner.c | 278 +++++++++--------- .../planner/multi_router_planner.c | 21 +- src/include/distributed/merge_planner.h | 7 +- .../distributed/multi_router_planner.h | 1 + .../distributed/pg_dist_node_metadata.h | 2 + src/test/regress/expected/merge.out | 141 ++++++++- src/test/regress/expected/pg15.out | 7 +- src/test/regress/sql/merge.sql | 101 +++++++ src/test/regress/sql/pg15.sql | 4 +- 10 files changed, 410 insertions(+), 156 deletions(-) diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 1c0314a49..8586ac934 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -36,6 +36,7 @@ #include "distributed/multi_join_order.h" #include "distributed/multi_router_planner.h" #include "distributed/pg_dist_node.h" +#include "distributed/pg_dist_node_metadata.h" #include "distributed/reference_table_utils.h" #include "distributed/remote_commands.h" #include "distributed/resource_lock.h" @@ -119,7 +120,6 @@ static char * NodeMetadataSyncedUpdateCommand(uint32 nodeId, bool metadataSynced static void ErrorIfCoordinatorMetadataSetFalse(WorkerNode *workerNode, Datum value, char *field); static WorkerNode * SetShouldHaveShards(WorkerNode *workerNode, bool shouldHaveShards); -static int FindCoordinatorNodeId(void); static WorkerNode * FindNodeAnyClusterByNodeId(uint32 nodeId); static void ErrorIfAnyNodeNotExist(List *nodeList); static void UpdateLocalGroupIdsViaMetadataContext(MetadataSyncContext *context); @@ -1800,7 +1800,7 @@ FindNodeWithNodeId(int nodeId, bool missingOk) /* * FindCoordinatorNodeId returns the node id of the coordinator node */ -static int 
+int FindCoordinatorNodeId() { bool includeNodesFromOtherClusters = false; diff --git a/src/backend/distributed/planner/merge_planner.c b/src/backend/distributed/planner/merge_planner.c index c67095624..5b39aeba6 100644 --- a/src/backend/distributed/planner/merge_planner.c +++ b/src/backend/distributed/planner/merge_planner.c @@ -22,29 +22,27 @@ #include "distributed/merge_planner.h" #include "distributed/multi_logical_optimizer.h" #include "distributed/multi_router_planner.h" +#include "distributed/pg_dist_node_metadata.h" #include "distributed/pg_version_constants.h" #include "distributed/query_pushdown_planning.h" #if PG_VERSION_NUM >= PG_VERSION_15 -static DeferredErrorMessage * CheckIfRTETypeIsUnsupported(Query *parse, - RangeTblEntry *rangeTableEntry); static DeferredErrorMessage * ErrorIfDistTablesNotColocated(Query *parse, List * distTablesList, PlannerRestrictionContext * plannerRestrictionContext); -static DeferredErrorMessage * ErrorIfMergeHasUnsupportedTables(Query *parse, +static DeferredErrorMessage * ErrorIfMergeHasUnsupportedTables(Oid targetRelationId, + Query *parse, List *rangeTableList, PlannerRestrictionContext * restrictionContext); static bool IsDistributionColumnInMergeSource(Expr *columnExpression, Query *query, bool skipOuterVars); -static DeferredErrorMessage * InsertDistributionColumnMatchesSource(Query *query, - RangeTblEntry * - resultRte); - +static DeferredErrorMessage * InsertDistributionColumnMatchesSource(Oid targetRelationId, + Query *query); static DeferredErrorMessage * MergeQualAndTargetListFunctionsSupported(Oid resultRelationId, FromExpr *joinTree, @@ -65,12 +63,15 @@ CreateMergePlan(Query *originalQuery, Query *query, { DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan); bool multiShardQuery = false; + Oid targetRelationId = ModifyQueryResultRelationId(originalQuery); Assert(originalQuery->commandType == CMD_MERGE); + Assert(OidIsValid(targetRelationId)); + distributedPlan->targetRelationId = 
targetRelationId; distributedPlan->modLevel = RowModifyLevelForQuery(query); - - distributedPlan->planningError = MergeQuerySupported(originalQuery, + distributedPlan->planningError = MergeQuerySupported(targetRelationId, + originalQuery, multiShardQuery, plannerRestrictionContext); @@ -94,8 +95,6 @@ CreateMergePlan(Query *originalQuery, Query *query, /* MERGE doesn't support RETURNING clause */ distributedPlan->expectResults = false; - distributedPlan->targetRelationId = ResultRelationOidForQuery(query); - distributedPlan->fastPathRouterPlan = plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery; @@ -111,7 +110,7 @@ CreateMergePlan(Query *originalQuery, Query *query, * - Checks target-lists and functions-in-quals in TargetlistAndFunctionsSupported */ DeferredErrorMessage * -MergeQuerySupported(Query *originalQuery, bool multiShardQuery, +MergeQuerySupported(Oid resultRelationId, Query *originalQuery, bool multiShardQuery, PlannerRestrictionContext *plannerRestrictionContext) { /* function is void for pre-15 versions of Postgres */ @@ -138,7 +137,6 @@ MergeQuerySupported(Query *originalQuery, bool multiShardQuery, } List *rangeTableList = ExtractRangeTableEntryList(originalQuery); - RangeTblEntry *resultRte = ExtractResultRelationRTE(originalQuery); /* * Fast path queries cannot have merge command, and we prevent the remaining here. @@ -149,7 +147,8 @@ MergeQuerySupported(Query *originalQuery, bool multiShardQuery, * ErrorIfDistTablesNotColocated for details. 
*/ DeferredErrorMessage *deferredError = - ErrorIfMergeHasUnsupportedTables(originalQuery, + ErrorIfMergeHasUnsupportedTables(resultRelationId, + originalQuery, rangeTableList, plannerRestrictionContext); if (deferredError) @@ -158,7 +157,6 @@ MergeQuerySupported(Query *originalQuery, bool multiShardQuery, RaiseDeferredError(deferredError, ERROR); } - Oid resultRelationId = resultRte->relid; deferredError = MergeQualAndTargetListFunctionsSupported(resultRelationId, originalQuery->jointree, originalQuery->jointree-> @@ -191,7 +189,7 @@ MergeQuerySupported(Query *originalQuery, bool multiShardQuery, } deferredError = - InsertDistributionColumnMatchesSource(originalQuery, resultRte); + InsertDistributionColumnMatchesSource(resultRelationId, originalQuery); if (deferredError) { /* MERGE's unsupported scenario, raise the exception */ @@ -222,32 +220,6 @@ MergeQuerySupported(Query *originalQuery, bool multiShardQuery, } -/* - * IsMergeAllowedOnRelation takes a relation entry and checks if MERGE command is - * permitted on special relations, such as materialized view, returns true only if - * it's a "source" relation. - */ -bool -IsMergeAllowedOnRelation(Query *parse, RangeTblEntry *rte) -{ - if (!IsMergeQuery(parse)) - { - return false; - } - - /* Fetch the MERGE target relation */ - RangeTblEntry *targetRte = rt_fetch(parse->resultRelation, parse->rtable); - - /* Is it a target relation? */ - if (targetRte->relid == rte->relid) - { - return false; - } - - return true; -} - - #if PG_VERSION_NUM >= PG_VERSION_15 /* @@ -283,70 +255,6 @@ ErrorIfDistTablesNotColocated(Query *parse, List *distTablesList, } -/* - * ErrorIfRTETypeIsUnsupported Checks for types of tables that are not supported, such - * as, reference tables, append-distributed tables and materialized view as target relation. - * Routine returns NULL for the supported types, error message for everything else. 
- */ -static DeferredErrorMessage * -CheckIfRTETypeIsUnsupported(Query *parse, RangeTblEntry *rangeTableEntry) -{ - if (rangeTableEntry->relkind == RELKIND_MATVIEW || - rangeTableEntry->relkind == RELKIND_FOREIGN_TABLE) - { - /* Materialized view or Foreign table as target is not allowed */ - if (IsMergeAllowedOnRelation(parse, rangeTableEntry)) - { - /* Non target relation is ok */ - return NULL; - } - else - { - /* Usually we don't reach this exception as the Postgres parser catches it */ - StringInfo errorMessage = makeStringInfo(); - appendStringInfo(errorMessage, "MERGE command is not allowed on " - "relation type(relkind:%c)", - rangeTableEntry->relkind); - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - errorMessage->data, NULL, NULL); - } - } - - if (rangeTableEntry->relkind != RELKIND_RELATION && - rangeTableEntry->relkind != RELKIND_PARTITIONED_TABLE) - { - StringInfo errorMessage = makeStringInfo(); - appendStringInfo(errorMessage, "Unexpected table type(relkind:%c) " - "in MERGE command", rangeTableEntry->relkind); - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - errorMessage->data, NULL, NULL); - } - - Assert(rangeTableEntry->relid != 0); - - /* Reference tables are not supported yet */ - if (IsCitusTableType(rangeTableEntry->relid, REFERENCE_TABLE)) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "MERGE command is not supported on reference " - "tables yet", NULL, NULL); - } - - /* Append/Range tables are not supported */ - if (IsCitusTableType(rangeTableEntry->relid, APPEND_DISTRIBUTED) || - IsCitusTableType(rangeTableEntry->relid, RANGE_DISTRIBUTED)) - { - return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "For MERGE command, all the distributed tables " - "must be colocated, for append/range distribution, " - "colocation is not supported", NULL, - "Consider using hash distribution instead"); - } - - return NULL; -} - - /* * ErrorIfMergeHasUnsupportedTables checks if all the tables(target, source or any CTE * present) in 
the MERGE command are local i.e. a combination of Citus local and Non-Citus @@ -355,11 +263,12 @@ CheckIfRTETypeIsUnsupported(Query *parse, RangeTblEntry *rangeTableEntry) * for all other combinations. */ static DeferredErrorMessage * -ErrorIfMergeHasUnsupportedTables(Query *parse, List *rangeTableList, +ErrorIfMergeHasUnsupportedTables(Oid targetRelationId, Query *parse, List *rangeTableList, PlannerRestrictionContext *restrictionContext) { List *distTablesList = NIL; bool foundLocalTables = false; + bool foundReferenceTables = false; RangeTblEntry *rangeTableEntry = NULL; foreach_ptr(rangeTableEntry, rangeTableList) @@ -410,18 +319,48 @@ ErrorIfMergeHasUnsupportedTables(Query *parse, List *rangeTableList, } /* RTE Relation can be of various types, check them now */ - - /* skip the regular views as they are replaced with subqueries */ - if (rangeTableEntry->relkind == RELKIND_VIEW) + switch (rangeTableEntry->relkind) { - continue; - } + /* skip the regular views as they are replaced with subqueries */ + case RELKIND_VIEW: + { + continue; + } - DeferredErrorMessage *errorMessage = - CheckIfRTETypeIsUnsupported(parse, rangeTableEntry); - if (errorMessage) - { - return errorMessage; + case RELKIND_MATVIEW: + case RELKIND_FOREIGN_TABLE: + { + /* These two cases as a target is not allowed */ + if (relationId == targetRelationId) + { + /* Usually we don't reach this exception as the Postgres parser catches it */ + StringInfo errorMessage = makeStringInfo(); + appendStringInfo(errorMessage, "MERGE command is not allowed on " + "relation type(relkind:%c)", + rangeTableEntry->relkind); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + errorMessage->data, NULL, NULL); + } + break; + } + + case RELKIND_RELATION: + case RELKIND_PARTITIONED_TABLE: + { + /* Check for citus/postgres table types */ + Assert(OidIsValid(relationId)); + break; + } + + default: + { + StringInfo errorMessage = makeStringInfo(); + appendStringInfo(errorMessage, "Unexpected table type(relkind:%c) 
" + "in MERGE command", + rangeTableEntry->relkind); + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + errorMessage->data, NULL, NULL); + } } /* @@ -430,28 +369,63 @@ ErrorIfMergeHasUnsupportedTables(Query *parse, List *rangeTableList, */ if (IsCitusTableType(relationId, DISTRIBUTED_TABLE)) { - distTablesList = lappend(distTablesList, rangeTableEntry); - continue; - } + /* Append/Range distributed tables are not supported */ + if (IsCitusTableType(relationId, APPEND_DISTRIBUTED) || + IsCitusTableType(relationId, RANGE_DISTRIBUTED)) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "For MERGE command, all the distributed tables " + "must be colocated, for append/range distribution, " + "colocation is not supported", NULL, + "Consider using hash distribution instead"); + } - /* Regular Postgres tables and Citus local tables are allowed */ - if (!IsCitusTable(relationId) || - IsCitusTableType(relationId, CITUS_LOCAL_TABLE)) + distTablesList = lappend(distTablesList, rangeTableEntry); + } + else if (IsCitusTableType(relationId, REFERENCE_TABLE)) { + /* Reference table as a target is not allowed */ + if (relationId == targetRelationId) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "Reference table as target " + "is not allowed in " + "MERGE command", NULL, NULL); + } + + foundReferenceTables = true; + } + else if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE)) + { + /* Citus local tables */ + foundLocalTables = true; + } + else if (!IsCitusTable(relationId)) + { + /* Regular Postgres table */ foundLocalTables = true; - continue; } /* Any other Citus table type missing ? 
*/ } - /* Ensure all tables are indeed local */ - if (foundLocalTables && list_length(distTablesList) == 0) + /* Ensure all tables are indeed local (or a combination of reference and local) */ + if (list_length(distTablesList) == 0) { - /* All the tables are local, supported */ + /* + * All the tables are local/reference, supported as long as + * coordinator is in the metadata. + */ + if (FindCoordinatorNodeId() == -1) + { + elog(ERROR, "Coordinator node is not in the metadata"); + } + + /* All the tables are local/reference, supported */ return NULL; } - else if (foundLocalTables && list_length(distTablesList) > 0) + + if (foundLocalTables) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, "MERGE command is not supported with " @@ -459,6 +433,17 @@ ErrorIfMergeHasUnsupportedTables(Query *parse, List *rangeTableList, NULL, NULL); } + if (foundReferenceTables) + { + return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, + "MERGE command is not supported with " + "combination of distributed/reference yet", + NULL, + "If target is distributed, source " + "must be distributed and co-located"); + } + + /* Ensure all distributed tables are indeed co-located */ return ErrorIfDistTablesNotColocated(parse, distTablesList, @@ -515,11 +500,11 @@ IsDistributionColumnInMergeSource(Expr *columnExpression, Query *query, bool * prevent such mishaps, we disallow such inserts here. 
*/ static DeferredErrorMessage * -InsertDistributionColumnMatchesSource(Query *query, RangeTblEntry *resultRte) +InsertDistributionColumnMatchesSource(Oid targetRelationId, Query *query) { Assert(IsMergeQuery(query)); - if (!IsCitusTableType(resultRte->relid, DISTRIBUTED_TABLE)) + if (!IsCitusTableType(targetRelationId, DISTRIBUTED_TABLE)) { return NULL; } @@ -549,7 +534,7 @@ InsertDistributionColumnMatchesSource(Query *query, RangeTblEntry *resultRte) } Assert(action->commandType == CMD_INSERT); - Var *targetKey = PartitionColumn(resultRte->relid, 1); + Var *targetKey = PartitionColumn(targetRelationId, 1); TargetEntry *targetEntry = NULL; foreach_ptr(targetEntry, action->targetList) @@ -736,3 +721,34 @@ MergeQualAndTargetListFunctionsSupported(Oid resultRelationId, FromExpr *joinTre #endif + + +/* + * IsLocalTableModification returns true if the table modified is a Postgres table. + * We do not support recursive planning for MERGE yet, so we could have a join + * between local and Citus tables. Only allow local tables when it is the target table. 
+ */ +bool +IsLocalTableModification(Oid targetRelationId, Query *query, uint64 shardId, + RTEListProperties *rteProperties) +{ + /* No-op for SELECT command */ + if (!IsModifyCommand(query)) + { + return false; + } + + /* For MERGE, we have to check only the target relation */ + if (IsMergeQuery(query) && !IsCitusTable(targetRelationId)) + { + /* Postgres table */ + return true; + } + + if (shardId == INVALID_SHARD_ID && ContainsOnlyLocalTables(rteProperties)) + { + return true; + } + + return false; +} diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 94691bab9..a95be74f8 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -2246,10 +2246,8 @@ SelectsFromDistributedTable(List *rangeTableList, Query *query) } -static bool ContainsOnlyLocalTables(RTEListProperties *rteProperties); - /* - * RouterQuery runs router pruning logic for SELECT, UPDATE and DELETE queries. + * RouterQuery runs router pruning logic for SELECT, UPDATE, DELETE, and MERGE queries. * If there are shards present and query is routable, all RTEs have been updated * to point to the relevant shards in the originalQuery. 
Also, placementList is * filled with the list of worker nodes that has all the required shard placements @@ -2282,6 +2280,7 @@ PlanRouterQuery(Query *originalQuery, DeferredErrorMessage *planningError = NULL; bool shardsPresent = false; CmdType commandType = originalQuery->commandType; + Oid targetRelationId = InvalidOid; bool fastPathRouterQuery = plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery; @@ -2350,7 +2349,8 @@ PlanRouterQuery(Query *originalQuery, if (IsMergeQuery(originalQuery)) { - planningError = MergeQuerySupported(originalQuery, + targetRelationId = ModifyQueryResultRelationId(originalQuery); + planningError = MergeQuerySupported(targetRelationId, originalQuery, isMultiShardQuery, plannerRestrictionContext); } @@ -2403,13 +2403,14 @@ PlanRouterQuery(Query *originalQuery, /* both Postgres tables and materialized tables are locally avaliable */ RTEListProperties *rteProperties = GetRTEListPropertiesForQuery(originalQuery); - if (shardId == INVALID_SHARD_ID && ContainsOnlyLocalTables(rteProperties)) + + if (isLocalTableModification) { - if (commandType != CMD_SELECT) - { - *isLocalTableModification = true; - } + *isLocalTableModification = + IsLocalTableModification(targetRelationId, originalQuery, shardId, + rteProperties); } + bool hasPostgresLocalRelation = rteProperties->hasPostgresLocalTable || rteProperties->hasMaterializedView; List *taskPlacementList = @@ -2447,7 +2448,7 @@ PlanRouterQuery(Query *originalQuery, * ContainsOnlyLocalTables returns true if there is only * local tables and not any distributed or reference table. 
*/ -static bool +bool ContainsOnlyLocalTables(RTEListProperties *rteProperties) { return !rteProperties->hasDistributedTable && !rteProperties->hasReferenceTable; diff --git a/src/include/distributed/merge_planner.h b/src/include/distributed/merge_planner.h index 158f26861..b4ec1852f 100644 --- a/src/include/distributed/merge_planner.h +++ b/src/include/distributed/merge_planner.h @@ -19,13 +19,16 @@ #include "distributed/errormessage.h" #include "distributed/multi_physical_planner.h" -extern bool IsMergeAllowedOnRelation(Query *parse, RangeTblEntry *rte); -extern DeferredErrorMessage * MergeQuerySupported(Query *originalQuery, +extern DeferredErrorMessage * MergeQuerySupported(Oid resultRelationId, + Query *originalQuery, bool multiShardQuery, PlannerRestrictionContext * plannerRestrictionContext); extern DistributedPlan * CreateMergePlan(Query *originalQuery, Query *query, PlannerRestrictionContext * plannerRestrictionContext); +extern bool IsLocalTableModification(Oid targetRelationId, Query *query, + uint64 shardId, + RTEListProperties *rteProperties); #endif /* MERGE_PLANNER_H */ diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index 200c498ef..a255fd520 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -117,5 +117,6 @@ extern bool HasDangerousJoinUsing(List *rtableList, Node *jtnode); extern Job * RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionContext, DeferredErrorMessage **planningError); +extern bool ContainsOnlyLocalTables(RTEListProperties *rteProperties); #endif /* MULTI_ROUTER_PLANNER_H */ diff --git a/src/include/distributed/pg_dist_node_metadata.h b/src/include/distributed/pg_dist_node_metadata.h index f2bdc4801..00ccb9788 100644 --- a/src/include/distributed/pg_dist_node_metadata.h +++ b/src/include/distributed/pg_dist_node_metadata.h @@ -19,4 +19,6 @@ #define Natts_pg_dist_node_metadata 1 
#define Anum_pg_dist_node_metadata_metadata 1 +extern int FindCoordinatorNodeId(void); + #endif /* PG_DIST_NODE_METADATA_H */ diff --git a/src/test/regress/expected/merge.out b/src/test/regress/expected/merge.out index 412667037..2196d966d 100644 --- a/src/test/regress/expected/merge.out +++ b/src/test/regress/expected/merge.out @@ -2416,9 +2416,134 @@ SELECT * FROM target_set ORDER BY 1, 2; 2 | (2 rows) +-- +-- Reference as a source +-- +CREATE TABLE reftarget_local(t1 int, t2 int); +CREATE TABLE refsource_ref(s1 int, s2 int); +INSERT INTO reftarget_local VALUES(1, 0); +INSERT INTO reftarget_local VALUES(3, 100); +INSERT INTO refsource_ref VALUES(1, 1); +INSERT INTO refsource_ref VALUES(2, 2); +INSERT INTO refsource_ref VALUES(3, 3); +MERGE INTO reftarget_local +USING (SELECT * FROM refsource_ref UNION SELECT * FROM refsource_ref) AS foo ON reftarget_local.t1 = foo.s1 +WHEN MATCHED AND reftarget_local.t2 = 100 THEN + DELETE +WHEN MATCHED THEN + UPDATE SET t2 = t2 + 100 +WHEN NOT MATCHED THEN + INSERT VALUES(foo.s1); +DROP TABLE IF EXISTS pg_result; +SELECT * INTO pg_result FROM reftarget_local ORDER BY 1, 2; +-- Make source table as reference (target is Postgres) +TRUNCATE reftarget_local; +TRUNCATE refsource_ref; +INSERT INTO reftarget_local VALUES(1, 0); +INSERT INTO reftarget_local VALUES(3, 100); +INSERT INTO refsource_ref VALUES(1, 1); +INSERT INTO refsource_ref VALUES(2, 2); +INSERT INTO refsource_ref VALUES(3, 3); +SELECT create_reference_table('refsource_ref'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. 
+HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_schema.refsource_ref$$) + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +MERGE INTO reftarget_local +USING (SELECT * FROM refsource_ref UNION SELECT * FROM refsource_ref) AS foo ON reftarget_local.t1 = foo.s1 +WHEN MATCHED AND reftarget_local.t2 = 100 THEN + DELETE +WHEN MATCHED THEN + UPDATE SET t2 = t2 + 100 +WHEN NOT MATCHED THEN + INSERT VALUES(foo.s1); +SELECT * INTO pg_ref FROM reftarget_local ORDER BY 1, 2; +-- Should be equal +SELECT c.*, p.* +FROM pg_ref c, pg_result p +WHERE c.t1 = p.t1 +ORDER BY 1,2; + t1 | t2 | t1 | t2 +--------------------------------------------------------------------- + 1 | 100 | 1 | 100 + 2 | | 2 | +(2 rows) + +-- Must return zero rows +SELECT count(*) +FROM pg_result FULL OUTER JOIN pg_ref ON pg_result.t1 = pg_ref.t1 +WHERE pg_result.t1 IS NULL OR pg_ref.t1 IS NULL; + count +--------------------------------------------------------------------- + 0 +(1 row) + +-- Now make both Citus tables, reference as source, local as target +TRUNCATE reftarget_local; +TRUNCATE refsource_ref; +INSERT INTO reftarget_local VALUES(1, 0); +INSERT INTO reftarget_local VALUES(3, 100); +INSERT INTO refsource_ref VALUES(1, 1); +INSERT INTO refsource_ref VALUES(2, 2); +INSERT INTO refsource_ref VALUES(3, 3); +SELECT citus_add_local_table_to_metadata('reftarget_local'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + +MERGE INTO reftarget_local +USING (SELECT * FROM refsource_ref UNION SELECT * FROM refsource_ref) AS foo ON reftarget_local.t1 = foo.s1 +WHEN MATCHED AND reftarget_local.t2 = 100 THEN + DELETE +WHEN MATCHED THEN + UPDATE SET t2 = t2 + 100 +WHEN NOT MATCHED THEN + INSERT VALUES(foo.s1); +SELECT * INTO local_ref FROM reftarget_local ORDER BY 1, 2; +-- Should be equal +SELECT c.*, p.* +FROM local_ref c, pg_result p 
+WHERE c.t1 = p.t1 +ORDER BY 1,2; + t1 | t2 | t1 | t2 +--------------------------------------------------------------------- + 1 | 100 | 1 | 100 + 2 | | 2 | +(2 rows) + +-- Must return zero rows +SELECT count(*) +FROM pg_result FULL OUTER JOIN local_ref ON pg_result.t1 = local_ref.t1 +WHERE pg_result.t1 IS NULL OR local_ref.t1 IS NULL; + count +--------------------------------------------------------------------- + 0 +(1 row) + -- -- Error and Unsupported scenarios -- +-- Reference as a target and local as source +MERGE INTO refsource_ref +USING (SELECT * FROM reftarget_local UNION SELECT * FROM reftarget_local) AS foo ON refsource_ref.s1 = foo.t1 +WHEN MATCHED THEN + UPDATE SET s2 = s2 + 100 +WHEN NOT MATCHED THEN + INSERT VALUES(foo.t1); +ERROR: Reference table as target is not allowed in MERGE command +-- Reference as a source and distributed as target +MERGE INTO target_set t +USING refsource_ref AS s ON t.t1 = s.s1 +WHEN MATCHED THEN + DO NOTHING; +ERROR: MERGE command is not supported with combination of distributed/reference yet +HINT: If target is distributed, source must be distributed and co-located MERGE INTO target_set USING source_set AS foo ON target_set.t1 = foo.s1 WHEN MATCHED THEN @@ -2735,7 +2860,7 @@ MERGE INTO t1 UPDATE SET val = t1.val + 1 WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s1.id, s1.val); -ERROR: MERGE command is not supported on reference tables yet +ERROR: Reference table as target is not allowed in MERGE command -- -- Postgres + Citus-Distributed table -- @@ -3113,9 +3238,8 @@ CONTEXT: SQL statement "SELECT citus_drop_all_shards(v_obj.objid, v_obj.schema_ PL/pgSQL function citus_drop_trigger() line XX at PERFORM DROP FUNCTION merge_when_and_write(); DROP SCHEMA merge_schema CASCADE; -NOTICE: drop cascades to 84 other objects +NOTICE: drop cascades to 90 other objects DETAIL: drop cascades to function insert_data() -drop cascades to table pg_result drop cascades to table local_local drop cascades to table target drop cascades to 
table source @@ -3188,10 +3312,17 @@ drop cascades to table source_serial drop cascades to table target_serial drop cascades to table target_set drop cascades to table source_set +drop cascades to table reftarget_local_4000113 +drop cascades to table refsource_ref +drop cascades to table pg_result +drop cascades to table refsource_ref_4000112 +drop cascades to table pg_ref +drop cascades to table reftarget_local +drop cascades to table local_ref drop cascades to function add_s(integer,integer) drop cascades to table pg -drop cascades to table t1_4000131 -drop cascades to table s1_4000132 +drop cascades to table t1_4000133 +drop cascades to table s1_4000134 drop cascades to table t1 drop cascades to table s1 drop cascades to table dist_colocated diff --git a/src/test/regress/expected/pg15.out b/src/test/regress/expected/pg15.out index 7fc102dbb..4d1040a7e 100644 --- a/src/test/regress/expected/pg15.out +++ b/src/test/regress/expected/pg15.out @@ -306,7 +306,7 @@ SELECT citus_add_local_table_to_metadata('tbl2'); MERGE INTO tbl1 USING tbl2 ON (true) WHEN MATCHED THEN DELETE; --- one table is reference, the other local, not supported +-- source table is reference, the target is local, supported SELECT create_reference_table('tbl2'); create_reference_table --------------------------------------------------------------------- @@ -315,8 +315,7 @@ SELECT create_reference_table('tbl2'); MERGE INTO tbl1 USING tbl2 ON (true) WHEN MATCHED THEN DELETE; -ERROR: MERGE command is not supported on reference tables yet --- now, both are reference, still not supported +-- now, both are reference, not supported SELECT create_reference_table('tbl1'); create_reference_table --------------------------------------------------------------------- @@ -325,7 +324,7 @@ SELECT create_reference_table('tbl1'); MERGE INTO tbl1 USING tbl2 ON (true) WHEN MATCHED THEN DELETE; -ERROR: MERGE command is not supported on reference tables yet +ERROR: Reference table as target is not allowed in MERGE 
command -- now, both distributed, not works SELECT undistribute_table('tbl1'); NOTICE: creating a new table for pg15.tbl1 diff --git a/src/test/regress/sql/merge.sql b/src/test/regress/sql/merge.sql index d663491ae..5b9190516 100644 --- a/src/test/regress/sql/merge.sql +++ b/src/test/regress/sql/merge.sql @@ -1536,10 +1536,111 @@ WHEN NOT MATCHED THEN INSERT VALUES(foo.s1); SELECT * FROM target_set ORDER BY 1, 2; +-- +-- Reference as a source +-- +CREATE TABLE reftarget_local(t1 int, t2 int); +CREATE TABLE refsource_ref(s1 int, s2 int); + +INSERT INTO reftarget_local VALUES(1, 0); +INSERT INTO reftarget_local VALUES(3, 100); +INSERT INTO refsource_ref VALUES(1, 1); +INSERT INTO refsource_ref VALUES(2, 2); +INSERT INTO refsource_ref VALUES(3, 3); + +MERGE INTO reftarget_local +USING (SELECT * FROM refsource_ref UNION SELECT * FROM refsource_ref) AS foo ON reftarget_local.t1 = foo.s1 +WHEN MATCHED AND reftarget_local.t2 = 100 THEN + DELETE +WHEN MATCHED THEN + UPDATE SET t2 = t2 + 100 +WHEN NOT MATCHED THEN + INSERT VALUES(foo.s1); + +DROP TABLE IF EXISTS pg_result; +SELECT * INTO pg_result FROM reftarget_local ORDER BY 1, 2; + +-- Make source table as reference (target is Postgres) +TRUNCATE reftarget_local; +TRUNCATE refsource_ref; +INSERT INTO reftarget_local VALUES(1, 0); +INSERT INTO reftarget_local VALUES(3, 100); +INSERT INTO refsource_ref VALUES(1, 1); +INSERT INTO refsource_ref VALUES(2, 2); +INSERT INTO refsource_ref VALUES(3, 3); +SELECT create_reference_table('refsource_ref'); + +MERGE INTO reftarget_local +USING (SELECT * FROM refsource_ref UNION SELECT * FROM refsource_ref) AS foo ON reftarget_local.t1 = foo.s1 +WHEN MATCHED AND reftarget_local.t2 = 100 THEN + DELETE +WHEN MATCHED THEN + UPDATE SET t2 = t2 + 100 +WHEN NOT MATCHED THEN + INSERT VALUES(foo.s1); +SELECT * INTO pg_ref FROM reftarget_local ORDER BY 1, 2; + +-- Should be equal +SELECT c.*, p.* +FROM pg_ref c, pg_result p +WHERE c.t1 = p.t1 +ORDER BY 1,2; + +-- Must return zero rows +SELECT 
count(*) +FROM pg_result FULL OUTER JOIN pg_ref ON pg_result.t1 = pg_ref.t1 +WHERE pg_result.t1 IS NULL OR pg_ref.t1 IS NULL; + +-- Now make both Citus tables, reference as source, local as target +TRUNCATE reftarget_local; +TRUNCATE refsource_ref; +INSERT INTO reftarget_local VALUES(1, 0); +INSERT INTO reftarget_local VALUES(3, 100); +INSERT INTO refsource_ref VALUES(1, 1); +INSERT INTO refsource_ref VALUES(2, 2); +INSERT INTO refsource_ref VALUES(3, 3); + +SELECT citus_add_local_table_to_metadata('reftarget_local'); + +MERGE INTO reftarget_local +USING (SELECT * FROM refsource_ref UNION SELECT * FROM refsource_ref) AS foo ON reftarget_local.t1 = foo.s1 +WHEN MATCHED AND reftarget_local.t2 = 100 THEN + DELETE +WHEN MATCHED THEN + UPDATE SET t2 = t2 + 100 +WHEN NOT MATCHED THEN + INSERT VALUES(foo.s1); +SELECT * INTO local_ref FROM reftarget_local ORDER BY 1, 2; + +-- Should be equal +SELECT c.*, p.* +FROM local_ref c, pg_result p +WHERE c.t1 = p.t1 +ORDER BY 1,2; + +-- Must return zero rows +SELECT count(*) +FROM pg_result FULL OUTER JOIN local_ref ON pg_result.t1 = local_ref.t1 +WHERE pg_result.t1 IS NULL OR local_ref.t1 IS NULL; + -- -- Error and Unsupported scenarios -- +-- Reference as a target and local as source +MERGE INTO refsource_ref +USING (SELECT * FROM reftarget_local UNION SELECT * FROM reftarget_local) AS foo ON refsource_ref.s1 = foo.t1 +WHEN MATCHED THEN + UPDATE SET s2 = s2 + 100 +WHEN NOT MATCHED THEN + INSERT VALUES(foo.t1); + +-- Reference as a source and distributed as target +MERGE INTO target_set t +USING refsource_ref AS s ON t.t1 = s.s1 +WHEN MATCHED THEN + DO NOTHING; + MERGE INTO target_set USING source_set AS foo ON target_set.t1 = foo.s1 WHEN MATCHED THEN diff --git a/src/test/regress/sql/pg15.sql b/src/test/regress/sql/pg15.sql index ac8062c65..b82b0d745 100644 --- a/src/test/regress/sql/pg15.sql +++ b/src/test/regress/sql/pg15.sql @@ -198,13 +198,13 @@ SELECT citus_add_local_table_to_metadata('tbl2'); MERGE INTO tbl1 USING tbl2 ON 
(true) WHEN MATCHED THEN DELETE; --- one table is reference, the other local, not supported +-- source table is reference, the target is local, supported SELECT create_reference_table('tbl2'); MERGE INTO tbl1 USING tbl2 ON (true) WHEN MATCHED THEN DELETE; --- now, both are reference, still not supported +-- now, both are reference, not supported SELECT create_reference_table('tbl1'); MERGE INTO tbl1 USING tbl2 ON (true)