MERGE: Support reference table as source with local table as target

pull/6692/merge
Teja Mupparti 2023-03-25 15:54:02 -07:00 committed by Teja Mupparti
parent efd41e8ea5
commit e444dd4f3f
10 changed files with 410 additions and 156 deletions

View File

@ -36,6 +36,7 @@
#include "distributed/multi_join_order.h" #include "distributed/multi_join_order.h"
#include "distributed/multi_router_planner.h" #include "distributed/multi_router_planner.h"
#include "distributed/pg_dist_node.h" #include "distributed/pg_dist_node.h"
#include "distributed/pg_dist_node_metadata.h"
#include "distributed/reference_table_utils.h" #include "distributed/reference_table_utils.h"
#include "distributed/remote_commands.h" #include "distributed/remote_commands.h"
#include "distributed/resource_lock.h" #include "distributed/resource_lock.h"
@ -119,7 +120,6 @@ static char * NodeMetadataSyncedUpdateCommand(uint32 nodeId, bool metadataSynced
static void ErrorIfCoordinatorMetadataSetFalse(WorkerNode *workerNode, Datum value, static void ErrorIfCoordinatorMetadataSetFalse(WorkerNode *workerNode, Datum value,
char *field); char *field);
static WorkerNode * SetShouldHaveShards(WorkerNode *workerNode, bool shouldHaveShards); static WorkerNode * SetShouldHaveShards(WorkerNode *workerNode, bool shouldHaveShards);
static int FindCoordinatorNodeId(void);
static WorkerNode * FindNodeAnyClusterByNodeId(uint32 nodeId); static WorkerNode * FindNodeAnyClusterByNodeId(uint32 nodeId);
static void ErrorIfAnyNodeNotExist(List *nodeList); static void ErrorIfAnyNodeNotExist(List *nodeList);
static void UpdateLocalGroupIdsViaMetadataContext(MetadataSyncContext *context); static void UpdateLocalGroupIdsViaMetadataContext(MetadataSyncContext *context);
@ -1800,7 +1800,7 @@ FindNodeWithNodeId(int nodeId, bool missingOk)
/* /*
* FindCoordinatorNodeId returns the node id of the coordinator node * FindCoordinatorNodeId returns the node id of the coordinator node
*/ */
static int int
FindCoordinatorNodeId() FindCoordinatorNodeId()
{ {
bool includeNodesFromOtherClusters = false; bool includeNodesFromOtherClusters = false;

View File

@ -22,29 +22,27 @@
#include "distributed/merge_planner.h" #include "distributed/merge_planner.h"
#include "distributed/multi_logical_optimizer.h" #include "distributed/multi_logical_optimizer.h"
#include "distributed/multi_router_planner.h" #include "distributed/multi_router_planner.h"
#include "distributed/pg_dist_node_metadata.h"
#include "distributed/pg_version_constants.h" #include "distributed/pg_version_constants.h"
#include "distributed/query_pushdown_planning.h" #include "distributed/query_pushdown_planning.h"
#if PG_VERSION_NUM >= PG_VERSION_15 #if PG_VERSION_NUM >= PG_VERSION_15
static DeferredErrorMessage * CheckIfRTETypeIsUnsupported(Query *parse,
RangeTblEntry *rangeTableEntry);
static DeferredErrorMessage * ErrorIfDistTablesNotColocated(Query *parse, static DeferredErrorMessage * ErrorIfDistTablesNotColocated(Query *parse,
List * List *
distTablesList, distTablesList,
PlannerRestrictionContext PlannerRestrictionContext
* *
plannerRestrictionContext); plannerRestrictionContext);
static DeferredErrorMessage * ErrorIfMergeHasUnsupportedTables(Query *parse, static DeferredErrorMessage * ErrorIfMergeHasUnsupportedTables(Oid targetRelationId,
Query *parse,
List *rangeTableList, List *rangeTableList,
PlannerRestrictionContext * PlannerRestrictionContext *
restrictionContext); restrictionContext);
static bool IsDistributionColumnInMergeSource(Expr *columnExpression, Query *query, bool static bool IsDistributionColumnInMergeSource(Expr *columnExpression, Query *query, bool
skipOuterVars); skipOuterVars);
static DeferredErrorMessage * InsertDistributionColumnMatchesSource(Query *query, static DeferredErrorMessage * InsertDistributionColumnMatchesSource(Oid targetRelationId,
RangeTblEntry * Query *query);
resultRte);
static DeferredErrorMessage * MergeQualAndTargetListFunctionsSupported(Oid static DeferredErrorMessage * MergeQualAndTargetListFunctionsSupported(Oid
resultRelationId, resultRelationId,
FromExpr *joinTree, FromExpr *joinTree,
@ -65,12 +63,15 @@ CreateMergePlan(Query *originalQuery, Query *query,
{ {
DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan); DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan);
bool multiShardQuery = false; bool multiShardQuery = false;
Oid targetRelationId = ModifyQueryResultRelationId(originalQuery);
Assert(originalQuery->commandType == CMD_MERGE); Assert(originalQuery->commandType == CMD_MERGE);
Assert(OidIsValid(targetRelationId));
distributedPlan->targetRelationId = targetRelationId;
distributedPlan->modLevel = RowModifyLevelForQuery(query); distributedPlan->modLevel = RowModifyLevelForQuery(query);
distributedPlan->planningError = MergeQuerySupported(targetRelationId,
distributedPlan->planningError = MergeQuerySupported(originalQuery, originalQuery,
multiShardQuery, multiShardQuery,
plannerRestrictionContext); plannerRestrictionContext);
@ -94,8 +95,6 @@ CreateMergePlan(Query *originalQuery, Query *query,
/* MERGE doesn't support RETURNING clause */ /* MERGE doesn't support RETURNING clause */
distributedPlan->expectResults = false; distributedPlan->expectResults = false;
distributedPlan->targetRelationId = ResultRelationOidForQuery(query);
distributedPlan->fastPathRouterPlan = distributedPlan->fastPathRouterPlan =
plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery; plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery;
@ -111,7 +110,7 @@ CreateMergePlan(Query *originalQuery, Query *query,
* - Checks target-lists and functions-in-quals in TargetlistAndFunctionsSupported * - Checks target-lists and functions-in-quals in TargetlistAndFunctionsSupported
*/ */
DeferredErrorMessage * DeferredErrorMessage *
MergeQuerySupported(Query *originalQuery, bool multiShardQuery, MergeQuerySupported(Oid resultRelationId, Query *originalQuery, bool multiShardQuery,
PlannerRestrictionContext *plannerRestrictionContext) PlannerRestrictionContext *plannerRestrictionContext)
{ {
/* function is void for pre-15 versions of Postgres */ /* function is void for pre-15 versions of Postgres */
@ -138,7 +137,6 @@ MergeQuerySupported(Query *originalQuery, bool multiShardQuery,
} }
List *rangeTableList = ExtractRangeTableEntryList(originalQuery); List *rangeTableList = ExtractRangeTableEntryList(originalQuery);
RangeTblEntry *resultRte = ExtractResultRelationRTE(originalQuery);
/* /*
* Fast path queries cannot have merge command, and we prevent the remaining here. * Fast path queries cannot have merge command, and we prevent the remaining here.
@ -149,7 +147,8 @@ MergeQuerySupported(Query *originalQuery, bool multiShardQuery,
* ErrorIfDistTablesNotColocated for details. * ErrorIfDistTablesNotColocated for details.
*/ */
DeferredErrorMessage *deferredError = DeferredErrorMessage *deferredError =
ErrorIfMergeHasUnsupportedTables(originalQuery, ErrorIfMergeHasUnsupportedTables(resultRelationId,
originalQuery,
rangeTableList, rangeTableList,
plannerRestrictionContext); plannerRestrictionContext);
if (deferredError) if (deferredError)
@ -158,7 +157,6 @@ MergeQuerySupported(Query *originalQuery, bool multiShardQuery,
RaiseDeferredError(deferredError, ERROR); RaiseDeferredError(deferredError, ERROR);
} }
Oid resultRelationId = resultRte->relid;
deferredError = MergeQualAndTargetListFunctionsSupported(resultRelationId, deferredError = MergeQualAndTargetListFunctionsSupported(resultRelationId,
originalQuery->jointree, originalQuery->jointree,
originalQuery->jointree-> originalQuery->jointree->
@ -191,7 +189,7 @@ MergeQuerySupported(Query *originalQuery, bool multiShardQuery,
} }
deferredError = deferredError =
InsertDistributionColumnMatchesSource(originalQuery, resultRte); InsertDistributionColumnMatchesSource(resultRelationId, originalQuery);
if (deferredError) if (deferredError)
{ {
/* MERGE's unsupported scenario, raise the exception */ /* MERGE's unsupported scenario, raise the exception */
@ -222,32 +220,6 @@ MergeQuerySupported(Query *originalQuery, bool multiShardQuery,
} }
/*
* IsMergeAllowedOnRelation takes a relation entry and checks if MERGE command is
* permitted on special relations, such as materialized view, returns true only if
* it's a "source" relation.
*/
bool
IsMergeAllowedOnRelation(Query *parse, RangeTblEntry *rte)
{
if (!IsMergeQuery(parse))
{
return false;
}
/* Fetch the MERGE target relation */
RangeTblEntry *targetRte = rt_fetch(parse->resultRelation, parse->rtable);
/* Is it a target relation? */
if (targetRte->relid == rte->relid)
{
return false;
}
return true;
}
#if PG_VERSION_NUM >= PG_VERSION_15 #if PG_VERSION_NUM >= PG_VERSION_15
/* /*
@ -283,70 +255,6 @@ ErrorIfDistTablesNotColocated(Query *parse, List *distTablesList,
} }
/*
* ErrorIfRTETypeIsUnsupported Checks for types of tables that are not supported, such
* as, reference tables, append-distributed tables and materialized view as target relation.
* Routine returns NULL for the supported types, error message for everything else.
*/
static DeferredErrorMessage *
CheckIfRTETypeIsUnsupported(Query *parse, RangeTblEntry *rangeTableEntry)
{
if (rangeTableEntry->relkind == RELKIND_MATVIEW ||
rangeTableEntry->relkind == RELKIND_FOREIGN_TABLE)
{
/* Materialized view or Foreign table as target is not allowed */
if (IsMergeAllowedOnRelation(parse, rangeTableEntry))
{
/* Non target relation is ok */
return NULL;
}
else
{
/* Usually we don't reach this exception as the Postgres parser catches it */
StringInfo errorMessage = makeStringInfo();
appendStringInfo(errorMessage, "MERGE command is not allowed on "
"relation type(relkind:%c)",
rangeTableEntry->relkind);
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errorMessage->data, NULL, NULL);
}
}
if (rangeTableEntry->relkind != RELKIND_RELATION &&
rangeTableEntry->relkind != RELKIND_PARTITIONED_TABLE)
{
StringInfo errorMessage = makeStringInfo();
appendStringInfo(errorMessage, "Unexpected table type(relkind:%c) "
"in MERGE command", rangeTableEntry->relkind);
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errorMessage->data, NULL, NULL);
}
Assert(rangeTableEntry->relid != 0);
/* Reference tables are not supported yet */
if (IsCitusTableType(rangeTableEntry->relid, REFERENCE_TABLE))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"MERGE command is not supported on reference "
"tables yet", NULL, NULL);
}
/* Append/Range tables are not supported */
if (IsCitusTableType(rangeTableEntry->relid, APPEND_DISTRIBUTED) ||
IsCitusTableType(rangeTableEntry->relid, RANGE_DISTRIBUTED))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"For MERGE command, all the distributed tables "
"must be colocated, for append/range distribution, "
"colocation is not supported", NULL,
"Consider using hash distribution instead");
}
return NULL;
}
/* /*
* ErrorIfMergeHasUnsupportedTables checks if all the tables(target, source or any CTE * ErrorIfMergeHasUnsupportedTables checks if all the tables(target, source or any CTE
* present) in the MERGE command are local i.e. a combination of Citus local and Non-Citus * present) in the MERGE command are local i.e. a combination of Citus local and Non-Citus
@ -355,11 +263,12 @@ CheckIfRTETypeIsUnsupported(Query *parse, RangeTblEntry *rangeTableEntry)
* for all other combinations. * for all other combinations.
*/ */
static DeferredErrorMessage * static DeferredErrorMessage *
ErrorIfMergeHasUnsupportedTables(Query *parse, List *rangeTableList, ErrorIfMergeHasUnsupportedTables(Oid targetRelationId, Query *parse, List *rangeTableList,
PlannerRestrictionContext *restrictionContext) PlannerRestrictionContext *restrictionContext)
{ {
List *distTablesList = NIL; List *distTablesList = NIL;
bool foundLocalTables = false; bool foundLocalTables = false;
bool foundReferenceTables = false;
RangeTblEntry *rangeTableEntry = NULL; RangeTblEntry *rangeTableEntry = NULL;
foreach_ptr(rangeTableEntry, rangeTableList) foreach_ptr(rangeTableEntry, rangeTableList)
@ -410,18 +319,48 @@ ErrorIfMergeHasUnsupportedTables(Query *parse, List *rangeTableList,
} }
/* RTE Relation can be of various types, check them now */ /* RTE Relation can be of various types, check them now */
switch (rangeTableEntry->relkind)
{
/* skip the regular views as they are replaced with subqueries */ /* skip the regular views as they are replaced with subqueries */
if (rangeTableEntry->relkind == RELKIND_VIEW) case RELKIND_VIEW:
{ {
continue; continue;
} }
DeferredErrorMessage *errorMessage = case RELKIND_MATVIEW:
CheckIfRTETypeIsUnsupported(parse, rangeTableEntry); case RELKIND_FOREIGN_TABLE:
if (errorMessage)
{ {
return errorMessage; /* These two cases as a target is not allowed */
if (relationId == targetRelationId)
{
/* Usually we don't reach this exception as the Postgres parser catches it */
StringInfo errorMessage = makeStringInfo();
appendStringInfo(errorMessage, "MERGE command is not allowed on "
"relation type(relkind:%c)",
rangeTableEntry->relkind);
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errorMessage->data, NULL, NULL);
}
break;
}
case RELKIND_RELATION:
case RELKIND_PARTITIONED_TABLE:
{
/* Check for citus/postgres table types */
Assert(OidIsValid(relationId));
break;
}
default:
{
StringInfo errorMessage = makeStringInfo();
appendStringInfo(errorMessage, "Unexpected table type(relkind:%c) "
"in MERGE command",
rangeTableEntry->relkind);
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
errorMessage->data, NULL, NULL);
}
} }
/* /*
@ -430,28 +369,63 @@ ErrorIfMergeHasUnsupportedTables(Query *parse, List *rangeTableList,
*/ */
if (IsCitusTableType(relationId, DISTRIBUTED_TABLE)) if (IsCitusTableType(relationId, DISTRIBUTED_TABLE))
{ {
distTablesList = lappend(distTablesList, rangeTableEntry); /* Append/Range distributed tables are not supported */
continue; if (IsCitusTableType(relationId, APPEND_DISTRIBUTED) ||
IsCitusTableType(relationId, RANGE_DISTRIBUTED))
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"For MERGE command, all the distributed tables "
"must be colocated, for append/range distribution, "
"colocation is not supported", NULL,
"Consider using hash distribution instead");
} }
/* Regular Postgres tables and Citus local tables are allowed */ distTablesList = lappend(distTablesList, rangeTableEntry);
if (!IsCitusTable(relationId) || }
IsCitusTableType(relationId, CITUS_LOCAL_TABLE)) else if (IsCitusTableType(relationId, REFERENCE_TABLE))
{ {
/* Reference table as a target is not allowed */
if (relationId == targetRelationId)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"Reference table as target "
"is not allowed in "
"MERGE command", NULL, NULL);
}
foundReferenceTables = true;
}
else if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE))
{
/* Citus local tables */
foundLocalTables = true;
}
else if (!IsCitusTable(relationId))
{
/* Regular Postgres table */
foundLocalTables = true; foundLocalTables = true;
continue;
} }
/* Any other Citus table type missing ? */ /* Any other Citus table type missing ? */
} }
/* Ensure all tables are indeed local */ /* Ensure all tables are indeed local (or a combination of reference and local) */
if (foundLocalTables && list_length(distTablesList) == 0) if (list_length(distTablesList) == 0)
{ {
/* All the tables are local, supported */ /*
* All the tables are local/reference, supported as long as
* coordinator is in the metadata.
*/
if (FindCoordinatorNodeId() == -1)
{
elog(ERROR, "Coordinator node is not in the metadata. TODO better meesage");
}
/* All the tables are local/reference, supported */
return NULL; return NULL;
} }
else if (foundLocalTables && list_length(distTablesList) > 0)
if (foundLocalTables)
{ {
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"MERGE command is not supported with " "MERGE command is not supported with "
@ -459,6 +433,17 @@ ErrorIfMergeHasUnsupportedTables(Query *parse, List *rangeTableList,
NULL, NULL); NULL, NULL);
} }
if (foundReferenceTables)
{
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"MERGE command is not supported with "
"combination of distributed/reference yet",
NULL,
"If target is distributed, source "
"must be distributed and co-located");
}
/* Ensure all distributed tables are indeed co-located */ /* Ensure all distributed tables are indeed co-located */
return ErrorIfDistTablesNotColocated(parse, return ErrorIfDistTablesNotColocated(parse,
distTablesList, distTablesList,
@ -515,11 +500,11 @@ IsDistributionColumnInMergeSource(Expr *columnExpression, Query *query, bool
* prevent such mishaps, we disallow such inserts here. * prevent such mishaps, we disallow such inserts here.
*/ */
static DeferredErrorMessage * static DeferredErrorMessage *
InsertDistributionColumnMatchesSource(Query *query, RangeTblEntry *resultRte) InsertDistributionColumnMatchesSource(Oid targetRelationId, Query *query)
{ {
Assert(IsMergeQuery(query)); Assert(IsMergeQuery(query));
if (!IsCitusTableType(resultRte->relid, DISTRIBUTED_TABLE)) if (!IsCitusTableType(targetRelationId, DISTRIBUTED_TABLE))
{ {
return NULL; return NULL;
} }
@ -549,7 +534,7 @@ InsertDistributionColumnMatchesSource(Query *query, RangeTblEntry *resultRte)
} }
Assert(action->commandType == CMD_INSERT); Assert(action->commandType == CMD_INSERT);
Var *targetKey = PartitionColumn(resultRte->relid, 1); Var *targetKey = PartitionColumn(targetRelationId, 1);
TargetEntry *targetEntry = NULL; TargetEntry *targetEntry = NULL;
foreach_ptr(targetEntry, action->targetList) foreach_ptr(targetEntry, action->targetList)
@ -736,3 +721,34 @@ MergeQualAndTargetListFunctionsSupported(Oid resultRelationId, FromExpr *joinTre
#endif #endif
/*
* IsLocalTableModification returns true if the table modified is a Postgres table.
* We do not support recursive planning for MERGE yet, so we could have a join
* between local and Citus tables. Only allow local tables when it is the target table.
*/
bool
IsLocalTableModification(Oid targetRelationId, Query *query, uint64 shardId,
RTEListProperties *rteProperties)
{
/* No-op for SELECT command */
if (!IsModifyCommand(query))
{
return false;
}
/* For MERGE, we have to check only the target relation */
if (IsMergeQuery(query) && !IsCitusTable(targetRelationId))
{
/* Postgres table */
return true;
}
if (shardId == INVALID_SHARD_ID && ContainsOnlyLocalTables(rteProperties))
{
return true;
}
return false;
}

View File

@ -2246,10 +2246,8 @@ SelectsFromDistributedTable(List *rangeTableList, Query *query)
} }
static bool ContainsOnlyLocalTables(RTEListProperties *rteProperties);
/* /*
* RouterQuery runs router pruning logic for SELECT, UPDATE and DELETE queries. * RouterQuery runs router pruning logic for SELECT, UPDATE, DELETE, and MERGE queries.
* If there are shards present and query is routable, all RTEs have been updated * If there are shards present and query is routable, all RTEs have been updated
* to point to the relevant shards in the originalQuery. Also, placementList is * to point to the relevant shards in the originalQuery. Also, placementList is
* filled with the list of worker nodes that has all the required shard placements * filled with the list of worker nodes that has all the required shard placements
@ -2282,6 +2280,7 @@ PlanRouterQuery(Query *originalQuery,
DeferredErrorMessage *planningError = NULL; DeferredErrorMessage *planningError = NULL;
bool shardsPresent = false; bool shardsPresent = false;
CmdType commandType = originalQuery->commandType; CmdType commandType = originalQuery->commandType;
Oid targetRelationId = InvalidOid;
bool fastPathRouterQuery = bool fastPathRouterQuery =
plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery; plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery;
@ -2350,7 +2349,8 @@ PlanRouterQuery(Query *originalQuery,
if (IsMergeQuery(originalQuery)) if (IsMergeQuery(originalQuery))
{ {
planningError = MergeQuerySupported(originalQuery, targetRelationId = ModifyQueryResultRelationId(originalQuery);
planningError = MergeQuerySupported(targetRelationId, originalQuery,
isMultiShardQuery, isMultiShardQuery,
plannerRestrictionContext); plannerRestrictionContext);
} }
@ -2403,13 +2403,14 @@ PlanRouterQuery(Query *originalQuery,
/* both Postgres tables and materialized tables are locally avaliable */ /* both Postgres tables and materialized tables are locally avaliable */
RTEListProperties *rteProperties = GetRTEListPropertiesForQuery(originalQuery); RTEListProperties *rteProperties = GetRTEListPropertiesForQuery(originalQuery);
if (shardId == INVALID_SHARD_ID && ContainsOnlyLocalTables(rteProperties))
if (isLocalTableModification)
{ {
if (commandType != CMD_SELECT) *isLocalTableModification =
{ IsLocalTableModification(targetRelationId, originalQuery, shardId,
*isLocalTableModification = true; rteProperties);
}
} }
bool hasPostgresLocalRelation = bool hasPostgresLocalRelation =
rteProperties->hasPostgresLocalTable || rteProperties->hasMaterializedView; rteProperties->hasPostgresLocalTable || rteProperties->hasMaterializedView;
List *taskPlacementList = List *taskPlacementList =
@ -2447,7 +2448,7 @@ PlanRouterQuery(Query *originalQuery,
* ContainsOnlyLocalTables returns true if there is only * ContainsOnlyLocalTables returns true if there is only
* local tables and not any distributed or reference table. * local tables and not any distributed or reference table.
*/ */
static bool bool
ContainsOnlyLocalTables(RTEListProperties *rteProperties) ContainsOnlyLocalTables(RTEListProperties *rteProperties)
{ {
return !rteProperties->hasDistributedTable && !rteProperties->hasReferenceTable; return !rteProperties->hasDistributedTable && !rteProperties->hasReferenceTable;

View File

@ -19,13 +19,16 @@
#include "distributed/errormessage.h" #include "distributed/errormessage.h"
#include "distributed/multi_physical_planner.h" #include "distributed/multi_physical_planner.h"
extern bool IsMergeAllowedOnRelation(Query *parse, RangeTblEntry *rte); extern DeferredErrorMessage * MergeQuerySupported(Oid resultRelationId,
extern DeferredErrorMessage * MergeQuerySupported(Query *originalQuery, Query *originalQuery,
bool multiShardQuery, bool multiShardQuery,
PlannerRestrictionContext * PlannerRestrictionContext *
plannerRestrictionContext); plannerRestrictionContext);
extern DistributedPlan * CreateMergePlan(Query *originalQuery, Query *query, extern DistributedPlan * CreateMergePlan(Query *originalQuery, Query *query,
PlannerRestrictionContext * PlannerRestrictionContext *
plannerRestrictionContext); plannerRestrictionContext);
extern bool IsLocalTableModification(Oid targetRelationId, Query *query,
uint64 shardId,
RTEListProperties *rteProperties);
#endif /* MERGE_PLANNER_H */ #endif /* MERGE_PLANNER_H */

View File

@ -117,5 +117,6 @@ extern bool HasDangerousJoinUsing(List *rtableList, Node *jtnode);
extern Job * RouterJob(Query *originalQuery, extern Job * RouterJob(Query *originalQuery,
PlannerRestrictionContext *plannerRestrictionContext, PlannerRestrictionContext *plannerRestrictionContext,
DeferredErrorMessage **planningError); DeferredErrorMessage **planningError);
extern bool ContainsOnlyLocalTables(RTEListProperties *rteProperties);
#endif /* MULTI_ROUTER_PLANNER_H */ #endif /* MULTI_ROUTER_PLANNER_H */

View File

@ -19,4 +19,6 @@
#define Natts_pg_dist_node_metadata 1 #define Natts_pg_dist_node_metadata 1
#define Anum_pg_dist_node_metadata_metadata 1 #define Anum_pg_dist_node_metadata_metadata 1
extern int FindCoordinatorNodeId(void);
#endif /* PG_DIST_NODE_METADATA_H */ #endif /* PG_DIST_NODE_METADATA_H */

View File

@ -2416,9 +2416,134 @@ SELECT * FROM target_set ORDER BY 1, 2;
2 | 2 |
(2 rows) (2 rows)
--
-- Reference as a source
--
CREATE TABLE reftarget_local(t1 int, t2 int);
CREATE TABLE refsource_ref(s1 int, s2 int);
INSERT INTO reftarget_local VALUES(1, 0);
INSERT INTO reftarget_local VALUES(3, 100);
INSERT INTO refsource_ref VALUES(1, 1);
INSERT INTO refsource_ref VALUES(2, 2);
INSERT INTO refsource_ref VALUES(3, 3);
MERGE INTO reftarget_local
USING (SELECT * FROM refsource_ref UNION SELECT * FROM refsource_ref) AS foo ON reftarget_local.t1 = foo.s1
WHEN MATCHED AND reftarget_local.t2 = 100 THEN
DELETE
WHEN MATCHED THEN
UPDATE SET t2 = t2 + 100
WHEN NOT MATCHED THEN
INSERT VALUES(foo.s1);
DROP TABLE IF EXISTS pg_result;
SELECT * INTO pg_result FROM reftarget_local ORDER BY 1, 2;
-- Make source table as reference (target is Postgres)
TRUNCATE reftarget_local;
TRUNCATE refsource_ref;
INSERT INTO reftarget_local VALUES(1, 0);
INSERT INTO reftarget_local VALUES(3, 100);
INSERT INTO refsource_ref VALUES(1, 1);
INSERT INTO refsource_ref VALUES(2, 2);
INSERT INTO refsource_ref VALUES(3, 3);
SELECT create_reference_table('refsource_ref');
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_schema.refsource_ref$$)
create_reference_table
---------------------------------------------------------------------
(1 row)
MERGE INTO reftarget_local
USING (SELECT * FROM refsource_ref UNION SELECT * FROM refsource_ref) AS foo ON reftarget_local.t1 = foo.s1
WHEN MATCHED AND reftarget_local.t2 = 100 THEN
DELETE
WHEN MATCHED THEN
UPDATE SET t2 = t2 + 100
WHEN NOT MATCHED THEN
INSERT VALUES(foo.s1);
SELECT * INTO pg_ref FROM reftarget_local ORDER BY 1, 2;
-- Should be equal
SELECT c.*, p.*
FROM pg_ref c, pg_result p
WHERE c.t1 = p.t1
ORDER BY 1,2;
t1 | t2 | t1 | t2
---------------------------------------------------------------------
1 | 100 | 1 | 100
2 | | 2 |
(2 rows)
-- Must return zero rows
SELECT count(*)
FROM pg_result FULL OUTER JOIN pg_ref ON pg_result.t1 = pg_ref.t1
WHERE pg_result.t1 IS NULL OR pg_ref.t1 IS NULL;
count
---------------------------------------------------------------------
0
(1 row)
-- Now make both Citus tables, reference as source, local as target
TRUNCATE reftarget_local;
TRUNCATE refsource_ref;
INSERT INTO reftarget_local VALUES(1, 0);
INSERT INTO reftarget_local VALUES(3, 100);
INSERT INTO refsource_ref VALUES(1, 1);
INSERT INTO refsource_ref VALUES(2, 2);
INSERT INTO refsource_ref VALUES(3, 3);
SELECT citus_add_local_table_to_metadata('reftarget_local');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
MERGE INTO reftarget_local
USING (SELECT * FROM refsource_ref UNION SELECT * FROM refsource_ref) AS foo ON reftarget_local.t1 = foo.s1
WHEN MATCHED AND reftarget_local.t2 = 100 THEN
DELETE
WHEN MATCHED THEN
UPDATE SET t2 = t2 + 100
WHEN NOT MATCHED THEN
INSERT VALUES(foo.s1);
SELECT * INTO local_ref FROM reftarget_local ORDER BY 1, 2;
-- Should be equal
SELECT c.*, p.*
FROM local_ref c, pg_result p
WHERE c.t1 = p.t1
ORDER BY 1,2;
t1 | t2 | t1 | t2
---------------------------------------------------------------------
1 | 100 | 1 | 100
2 | | 2 |
(2 rows)
-- Must return zero rows
SELECT count(*)
FROM pg_result FULL OUTER JOIN local_ref ON pg_result.t1 = local_ref.t1
WHERE pg_result.t1 IS NULL OR local_ref.t1 IS NULL;
count
---------------------------------------------------------------------
0
(1 row)
-- --
-- Error and Unsupported scenarios -- Error and Unsupported scenarios
-- --
-- Reference as a target and local as source
MERGE INTO refsource_ref
USING (SELECT * FROM reftarget_local UNION SELECT * FROM reftarget_local) AS foo ON refsource_ref.s1 = foo.t1
WHEN MATCHED THEN
UPDATE SET s2 = s2 + 100
WHEN NOT MATCHED THEN
INSERT VALUES(foo.t1);
ERROR: Reference table as target is not allowed in MERGE command
-- Reference as a source and distributed as target
MERGE INTO target_set t
USING refsource_ref AS s ON t.t1 = s.s1
WHEN MATCHED THEN
DO NOTHING;
ERROR: MERGE command is not supported with combination of distributed/reference yet
HINT: If target is distributed, source must be distributed and co-located
MERGE INTO target_set MERGE INTO target_set
USING source_set AS foo ON target_set.t1 = foo.s1 USING source_set AS foo ON target_set.t1 = foo.s1
WHEN MATCHED THEN WHEN MATCHED THEN
@ -2735,7 +2860,7 @@ MERGE INTO t1
UPDATE SET val = t1.val + 1 UPDATE SET val = t1.val + 1
WHEN NOT MATCHED THEN WHEN NOT MATCHED THEN
INSERT (id, val) VALUES (s1.id, s1.val); INSERT (id, val) VALUES (s1.id, s1.val);
ERROR: MERGE command is not supported on reference tables yet ERROR: Reference table as target is not allowed in MERGE command
-- --
-- Postgres + Citus-Distributed table -- Postgres + Citus-Distributed table
-- --
@ -3113,9 +3238,8 @@ CONTEXT: SQL statement "SELECT citus_drop_all_shards(v_obj.objid, v_obj.schema_
PL/pgSQL function citus_drop_trigger() line XX at PERFORM PL/pgSQL function citus_drop_trigger() line XX at PERFORM
DROP FUNCTION merge_when_and_write(); DROP FUNCTION merge_when_and_write();
DROP SCHEMA merge_schema CASCADE; DROP SCHEMA merge_schema CASCADE;
NOTICE: drop cascades to 84 other objects NOTICE: drop cascades to 90 other objects
DETAIL: drop cascades to function insert_data() DETAIL: drop cascades to function insert_data()
drop cascades to table pg_result
drop cascades to table local_local drop cascades to table local_local
drop cascades to table target drop cascades to table target
drop cascades to table source drop cascades to table source
@ -3188,10 +3312,17 @@ drop cascades to table source_serial
drop cascades to table target_serial drop cascades to table target_serial
drop cascades to table target_set drop cascades to table target_set
drop cascades to table source_set drop cascades to table source_set
drop cascades to table reftarget_local_4000113
drop cascades to table refsource_ref
drop cascades to table pg_result
drop cascades to table refsource_ref_4000112
drop cascades to table pg_ref
drop cascades to table reftarget_local
drop cascades to table local_ref
drop cascades to function add_s(integer,integer) drop cascades to function add_s(integer,integer)
drop cascades to table pg drop cascades to table pg
drop cascades to table t1_4000131 drop cascades to table t1_4000133
drop cascades to table s1_4000132 drop cascades to table s1_4000134
drop cascades to table t1 drop cascades to table t1
drop cascades to table s1 drop cascades to table s1
drop cascades to table dist_colocated drop cascades to table dist_colocated

View File

@ -306,7 +306,7 @@ SELECT citus_add_local_table_to_metadata('tbl2');
MERGE INTO tbl1 USING tbl2 ON (true) MERGE INTO tbl1 USING tbl2 ON (true)
WHEN MATCHED THEN DELETE; WHEN MATCHED THEN DELETE;
-- one table is reference, the other local, not supported -- source table is reference, the target is local, supported
SELECT create_reference_table('tbl2'); SELECT create_reference_table('tbl2');
create_reference_table create_reference_table
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -315,8 +315,7 @@ SELECT create_reference_table('tbl2');
MERGE INTO tbl1 USING tbl2 ON (true) MERGE INTO tbl1 USING tbl2 ON (true)
WHEN MATCHED THEN DELETE; WHEN MATCHED THEN DELETE;
ERROR: MERGE command is not supported on reference tables yet -- now, both are reference, not supported
-- now, both are reference, still not supported
SELECT create_reference_table('tbl1'); SELECT create_reference_table('tbl1');
create_reference_table create_reference_table
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -325,7 +324,7 @@ SELECT create_reference_table('tbl1');
MERGE INTO tbl1 USING tbl2 ON (true) MERGE INTO tbl1 USING tbl2 ON (true)
WHEN MATCHED THEN DELETE; WHEN MATCHED THEN DELETE;
ERROR: MERGE command is not supported on reference tables yet ERROR: Reference table as target is not allowed in MERGE command
-- now, both distributed, not works -- now, both distributed, not works
SELECT undistribute_table('tbl1'); SELECT undistribute_table('tbl1');
NOTICE: creating a new table for pg15.tbl1 NOTICE: creating a new table for pg15.tbl1

View File

@ -1536,10 +1536,111 @@ WHEN NOT MATCHED THEN
INSERT VALUES(foo.s1); INSERT VALUES(foo.s1);
SELECT * FROM target_set ORDER BY 1, 2; SELECT * FROM target_set ORDER BY 1, 2;
--
-- Reference as a source
--
CREATE TABLE reftarget_local(t1 int, t2 int);
CREATE TABLE refsource_ref(s1 int, s2 int);
INSERT INTO reftarget_local VALUES(1, 0);
INSERT INTO reftarget_local VALUES(3, 100);
INSERT INTO refsource_ref VALUES(1, 1);
INSERT INTO refsource_ref VALUES(2, 2);
INSERT INTO refsource_ref VALUES(3, 3);
MERGE INTO reftarget_local
USING (SELECT * FROM refsource_ref UNION SELECT * FROM refsource_ref) AS foo ON reftarget_local.t1 = foo.s1
WHEN MATCHED AND reftarget_local.t2 = 100 THEN
DELETE
WHEN MATCHED THEN
UPDATE SET t2 = t2 + 100
WHEN NOT MATCHED THEN
INSERT VALUES(foo.s1);
DROP TABLE IF EXISTS pg_result;
SELECT * INTO pg_result FROM reftarget_local ORDER BY 1, 2;
-- Make source table as reference (target is Postgres)
TRUNCATE reftarget_local;
TRUNCATE refsource_ref;
INSERT INTO reftarget_local VALUES(1, 0);
INSERT INTO reftarget_local VALUES(3, 100);
INSERT INTO refsource_ref VALUES(1, 1);
INSERT INTO refsource_ref VALUES(2, 2);
INSERT INTO refsource_ref VALUES(3, 3);
SELECT create_reference_table('refsource_ref');
MERGE INTO reftarget_local
USING (SELECT * FROM refsource_ref UNION SELECT * FROM refsource_ref) AS foo ON reftarget_local.t1 = foo.s1
WHEN MATCHED AND reftarget_local.t2 = 100 THEN
DELETE
WHEN MATCHED THEN
UPDATE SET t2 = t2 + 100
WHEN NOT MATCHED THEN
INSERT VALUES(foo.s1);
SELECT * INTO pg_ref FROM reftarget_local ORDER BY 1, 2;
-- Should be equal
SELECT c.*, p.*
FROM pg_ref c, pg_result p
WHERE c.t1 = p.t1
ORDER BY 1,2;
-- Must return zero rows
SELECT count(*)
FROM pg_result FULL OUTER JOIN pg_ref ON pg_result.t1 = pg_ref.t1
WHERE pg_result.t1 IS NULL OR pg_ref.t1 IS NULL;
-- Now make both Citus tables, reference as source, local as target
TRUNCATE reftarget_local;
TRUNCATE refsource_ref;
INSERT INTO reftarget_local VALUES(1, 0);
INSERT INTO reftarget_local VALUES(3, 100);
INSERT INTO refsource_ref VALUES(1, 1);
INSERT INTO refsource_ref VALUES(2, 2);
INSERT INTO refsource_ref VALUES(3, 3);
SELECT citus_add_local_table_to_metadata('reftarget_local');
MERGE INTO reftarget_local
USING (SELECT * FROM refsource_ref UNION SELECT * FROM refsource_ref) AS foo ON reftarget_local.t1 = foo.s1
WHEN MATCHED AND reftarget_local.t2 = 100 THEN
DELETE
WHEN MATCHED THEN
UPDATE SET t2 = t2 + 100
WHEN NOT MATCHED THEN
INSERT VALUES(foo.s1);
SELECT * INTO local_ref FROM reftarget_local ORDER BY 1, 2;
-- Should be equal
SELECT c.*, p.*
FROM local_ref c, pg_result p
WHERE c.t1 = p.t1
ORDER BY 1,2;
-- Must return zero rows
SELECT count(*)
FROM pg_result FULL OUTER JOIN local_ref ON pg_result.t1 = local_ref.t1
WHERE pg_result.t1 IS NULL OR local_ref.t1 IS NULL;
-- --
-- Error and Unsupported scenarios -- Error and Unsupported scenarios
-- --
-- Reference as a target and local as source
MERGE INTO refsource_ref
USING (SELECT * FROM reftarget_local UNION SELECT * FROM reftarget_local) AS foo ON refsource_ref.s1 = foo.t1
WHEN MATCHED THEN
UPDATE SET s2 = s2 + 100
WHEN NOT MATCHED THEN
INSERT VALUES(foo.t1);
-- Reference as a source and distributed as target
MERGE INTO target_set t
USING refsource_ref AS s ON t.t1 = s.s1
WHEN MATCHED THEN
DO NOTHING;
MERGE INTO target_set MERGE INTO target_set
USING source_set AS foo ON target_set.t1 = foo.s1 USING source_set AS foo ON target_set.t1 = foo.s1
WHEN MATCHED THEN WHEN MATCHED THEN

View File

@ -198,13 +198,13 @@ SELECT citus_add_local_table_to_metadata('tbl2');
MERGE INTO tbl1 USING tbl2 ON (true) MERGE INTO tbl1 USING tbl2 ON (true)
WHEN MATCHED THEN DELETE; WHEN MATCHED THEN DELETE;
-- one table is reference, the other local, not supported -- source table is reference, the target is local, supported
SELECT create_reference_table('tbl2'); SELECT create_reference_table('tbl2');
MERGE INTO tbl1 USING tbl2 ON (true) MERGE INTO tbl1 USING tbl2 ON (true)
WHEN MATCHED THEN DELETE; WHEN MATCHED THEN DELETE;
-- now, both are reference, still not supported -- now, both are reference, not supported
SELECT create_reference_table('tbl1'); SELECT create_reference_table('tbl1');
MERGE INTO tbl1 USING tbl2 ON (true) MERGE INTO tbl1 USING tbl2 ON (true)