mirror of https://github.com/citusdata/citus.git
Enhance MERGE .. WHEN NOT MATCHED BY SOURCE for repartitioned source (#7900)
DESCRIPTION: Ensure that a MERGE command on a distributed table with a `WHEN NOT MATCHED BY SOURCE` clause runs against all shards of the distributed table. The Postgres MERGE command updates a table using a table or a query as a data source. It provides three ways to match the target table with the source: `WHEN MATCHED` means that there is a row in both the target and source; `WHEN NOT MATCHED` means that there is a row in the source that has no match (is not present) in the target; and, as of PG17, `WHEN NOT MATCHED BY SOURCE` means that there is a row in the target that has no match in the source. In Citus, when a MERGE command updates a distributed table using a local/reference table or a distributed query as source, that source is repartitioned, and for each repartitioned shard that has data (i.e. 1 or more rows) the MERGE is run against the corresponding distributed table shard. Suppose the distributed table has 32 shards, and the source repartitions into 4 shards that have data, with the remaining 28 shards being empty; then the MERGE command is performed on the 4 corresponding shards of the distributed table. However, the semantics of `WHEN NOT MATCHED BY SOURCE` are that the specified action must be performed on the target for each row in the target that is not in the source; so if the source is empty, all target rows should be updated. To see this, consider the following MERGE command: ``` MERGE INTO target AS t USING source AS s ON t.id = s.id WHEN NOT MATCHED BY SOURCE THEN UPDATE t SET t.col1 = 100 ``` If the source has zero rows then every row in the target is updated s.t. its col1 value is 100. Currently in Citus a MERGE on a distributed table with a local/reference table or a distributed query as source ignores shards of the distributed table when the corresponding shard of the repartitioned source has zero rows. However, if the MERGE command specifies a `WHEN NOT MATCHED BY SOURCE` clause, then the MERGE should be performed on all shards of the distributed table, to ensure that the specified action is performed on the target for each row in the target that is not in the source. This PR enhances Citus MERGE execution so that when a repartitioned source shard has zero rows, and the MERGE command specifies a `WHEN NOT MATCHED BY SOURCE` clause, the MERGE is performed against the corresponding shard of the distributed table using an empty (zero row) relation as source, by generating a query of the form: ``` MERGE INTO target_shard_0002 AS t USING (SELECT id FROM (VALUES (NULL) ) source_0002(id) WHERE FALSE) AS s ON t.id = s.id WHEN NOT MATCHED BY SOURCE THEN UPDATE t set t.col1 = 100 ``` This works because each row in the target shard will be updated, and `WHEN MATCHED` and `WHEN NOT MATCHED`, if specified, will be no-ops because the source has zero rows. To implement this when the source is a local or reference table involves teaching function `ExcuteSourceAtCoordAndRedistribution()` in `merge_executor.c` to not prune tasks when the query has `WHEN NOT MATCHED BY SOURCE` but to instead replace the task's query to one that uses an empty relation as source. And when the source is a distributed query, function `ExecuteMergeSourcePlanIntoColocatedIntermediateResults()` (also in `merge_executor.c`) instead of skipping empty tasks now generates a query that uses an empty relation as source for the corresponding target shard of the distributed table, but again only when the query has `WHEN NOT MATCHED BY SOURCE`. A new function `BuildEmptyResultQuery()` is added to `recursive_planning.c` and it is used by both the aforementioned functions in `merge_executor.c` to build an empty relation to use as the source. It applies the appropriate type to each column of the empty relation so the join with the target makes sense to the query compiler.pull/7907/head
parent
459c283e7d
commit
c1f5762645
|
@ -219,6 +219,7 @@ ExecuteSourceAtCoordAndRedistribution(CitusScanState *scanState)
|
|||
copyObject(distributedPlan->selectPlanForModifyViaCoordinatorOrRepartition);
|
||||
char *intermediateResultIdPrefix = distributedPlan->intermediateResultIdPrefix;
|
||||
bool hasReturning = distributedPlan->expectResults;
|
||||
bool hasNotMatchedBySource = HasMergeNotMatchedBySource(mergeQuery);
|
||||
int partitionColumnIndex = distributedPlan->sourceResultRepartitionColumnIndex;
|
||||
|
||||
/*
|
||||
|
@ -233,7 +234,7 @@ ExecuteSourceAtCoordAndRedistribution(CitusScanState *scanState)
|
|||
|
||||
ereport(DEBUG1, (errmsg("Collect source query results on coordinator")));
|
||||
|
||||
List *prunedTaskList = NIL;
|
||||
List *prunedTaskList = NIL, *emptySourceTaskList = NIL;
|
||||
HTAB *shardStateHash =
|
||||
ExecuteMergeSourcePlanIntoColocatedIntermediateResults(
|
||||
targetRelationId,
|
||||
|
@ -255,7 +256,8 @@ ExecuteSourceAtCoordAndRedistribution(CitusScanState *scanState)
|
|||
* We cannot actually execute MERGE INTO ... tasks that read from
|
||||
* intermediate results that weren't created because no rows were
|
||||
* written to them. Prune those tasks out by only including tasks
|
||||
* on shards with connections.
|
||||
* on shards with connections; however, if the MERGE INTO includes
|
||||
* a NOT MATCHED BY SOURCE clause we need to include the task.
|
||||
*/
|
||||
Task *task = NULL;
|
||||
foreach_declared_ptr(task, taskList)
|
||||
|
@ -268,6 +270,19 @@ ExecuteSourceAtCoordAndRedistribution(CitusScanState *scanState)
|
|||
{
|
||||
prunedTaskList = lappend(prunedTaskList, task);
|
||||
}
|
||||
else if (hasNotMatchedBySource)
|
||||
{
|
||||
emptySourceTaskList = lappend(emptySourceTaskList, task);
|
||||
}
|
||||
}
|
||||
|
||||
if (emptySourceTaskList != NIL)
|
||||
{
|
||||
ereport(DEBUG1, (errmsg("MERGE has NOT MATCHED BY SOURCE clause, "
|
||||
"execute MERGE on all shards")));
|
||||
AdjustTaskQueryForEmptySource(targetRelationId, mergeQuery, emptySourceTaskList,
|
||||
intermediateResultIdPrefix);
|
||||
prunedTaskList = list_concat(prunedTaskList, emptySourceTaskList);
|
||||
}
|
||||
|
||||
if (prunedTaskList == NIL)
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
#include "nodes/parsenodes.h"
|
||||
|
||||
#include "distributed/citus_custom_scan.h"
|
||||
#include "distributed/deparse_shard_query.h"
|
||||
#include "distributed/intermediate_results.h"
|
||||
#include "distributed/listutils.h"
|
||||
#include "distributed/multi_physical_planner.h"
|
||||
|
@ -101,6 +102,40 @@ IsRedistributablePlan(Plan *selectPlan)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* HasMergeNotMatchedBySource returns true if the MERGE query has a
|
||||
* WHEN NOT MATCHED BY SOURCE clause. If it does, we need to execute
|
||||
* the MERGE query on all shards of the target table, regardless of
|
||||
* whether or not the source shard has any rows.
|
||||
*/
|
||||
bool
|
||||
HasMergeNotMatchedBySource(Query *query)
|
||||
{
|
||||
if (!IsMergeQuery(query))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
bool haveNotMatchedBySource = false;
|
||||
|
||||
#if PG_VERSION_NUM >= PG_VERSION_17
|
||||
ListCell *lc;
|
||||
foreach(lc, query->mergeActionList)
|
||||
{
|
||||
MergeAction *action = lfirst_node(MergeAction, lc);
|
||||
|
||||
if (action->matchKind == MERGE_WHEN_NOT_MATCHED_BY_SOURCE)
|
||||
{
|
||||
haveNotMatchedBySource = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
return haveNotMatchedBySource;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GenerateTaskListWithColocatedIntermediateResults generates a list of tasks
|
||||
* for a query that inserts into a target relation and selects from a set of
|
||||
|
@ -200,6 +235,61 @@ GenerateTaskListWithColocatedIntermediateResults(Oid targetRelationId,
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* AdjustTaskQueryForEmptySource adjusts the query for tasks that read from an
|
||||
* intermediate result to instead read from an empty relation. This ensures that
|
||||
* the MERGE query is executed on all shards of the target table, because it has
|
||||
* a NOT MATCHED BY SOURCE clause, which will be true for all target shards where
|
||||
* the source shard has no rows.
|
||||
*/
|
||||
void
|
||||
AdjustTaskQueryForEmptySource(Oid targetRelationId,
|
||||
Query *mergeQuery,
|
||||
List *tasks,
|
||||
char *resultIdPrefix)
|
||||
{
|
||||
Query *mergeQueryCopy = copyObject(mergeQuery);
|
||||
RangeTblEntry *selectRte = ExtractSourceResultRangeTableEntry(mergeQueryCopy);
|
||||
RangeTblEntry *mergeRte = ExtractResultRelationRTE(mergeQueryCopy);
|
||||
List *targetList = selectRte->subquery->targetList;
|
||||
ListCell *taskCell = NULL;
|
||||
|
||||
foreach(taskCell, tasks)
|
||||
{
|
||||
Task *task = lfirst(taskCell);
|
||||
uint64 shardId = task->anchorShardId;
|
||||
StringInfo queryString = makeStringInfo();
|
||||
StringInfo resultId = makeStringInfo();
|
||||
|
||||
appendStringInfo(resultId, "%s_" UINT64_FORMAT, resultIdPrefix, shardId);
|
||||
|
||||
/* Generate a query for an empty relation */
|
||||
selectRte->subquery = BuildEmptyResultQuery(targetList, resultId->data);
|
||||
|
||||
/* setting an alias simplifies deparsing of RETURNING */
|
||||
if (mergeRte->alias == NULL)
|
||||
{
|
||||
Alias *alias = makeAlias(CITUS_TABLE_ALIAS, NIL);
|
||||
mergeRte->alias = alias;
|
||||
}
|
||||
|
||||
/*
|
||||
* Generate a query string for the query that merges into a shard and reads
|
||||
* from an empty relation.
|
||||
*
|
||||
* Since CTEs have already been converted to intermediate results, they need
|
||||
* to removed from the query. Otherwise, worker queries include both
|
||||
* intermediate results and CTEs in the query.
|
||||
*/
|
||||
mergeQueryCopy->cteList = NIL;
|
||||
deparse_shard_query(mergeQueryCopy, targetRelationId, shardId, queryString);
|
||||
ereport(DEBUG2, (errmsg("distributed statement: %s", queryString->data)));
|
||||
|
||||
SetTaskQueryString(task, queryString->data);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GenerateTaskListWithRedistributedResults returns a task list to insert given
|
||||
* redistributedResults into the given target relation.
|
||||
|
@ -223,6 +313,7 @@ GenerateTaskListWithRedistributedResults(Query *modifyQueryViaCoordinatorOrRepar
|
|||
Query *modifyResultQuery = copyObject(modifyQueryViaCoordinatorOrRepartition);
|
||||
RangeTblEntry *insertRte = ExtractResultRelationRTE(modifyResultQuery);
|
||||
Oid targetRelationId = targetRelation->relationId;
|
||||
bool hasNotMatchedBySource = HasMergeNotMatchedBySource(modifyResultQuery);
|
||||
|
||||
int shardCount = targetRelation->shardIntervalArrayLength;
|
||||
int shardOffset = 0;
|
||||
|
@ -242,19 +333,33 @@ GenerateTaskListWithRedistributedResults(Query *modifyQueryViaCoordinatorOrRepar
|
|||
StringInfo queryString = makeStringInfo();
|
||||
|
||||
/* skip empty tasks */
|
||||
if (resultIdList == NIL)
|
||||
if (resultIdList == NIL && !hasNotMatchedBySource)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
/* sort result ids for consistent test output */
|
||||
List *sortedResultIds = SortList(resultIdList, pg_qsort_strcmp);
|
||||
Query *fragmentSetQuery = NULL;
|
||||
|
||||
/* generate the query on the intermediate result */
|
||||
Query *fragmentSetQuery = BuildReadIntermediateResultsArrayQuery(selectTargetList,
|
||||
NIL,
|
||||
sortedResultIds,
|
||||
useBinaryFormat);
|
||||
if (resultIdList != NIL)
|
||||
{
|
||||
/* sort result ids for consistent test output */
|
||||
List *sortedResultIds = SortList(resultIdList, pg_qsort_strcmp);
|
||||
|
||||
/* generate the query on the intermediate result */
|
||||
fragmentSetQuery = BuildReadIntermediateResultsArrayQuery(selectTargetList,
|
||||
NIL,
|
||||
sortedResultIds,
|
||||
useBinaryFormat);
|
||||
}
|
||||
else
|
||||
{
|
||||
/* No source data, but MERGE query has NOT MATCHED BY SOURCE */
|
||||
StringInfo emptyFragmentId = makeStringInfo();
|
||||
appendStringInfo(emptyFragmentId, "%s_" UINT64_FORMAT, "temp_empty_rel_",
|
||||
shardId);
|
||||
fragmentSetQuery = BuildEmptyResultQuery(selectTargetList,
|
||||
emptyFragmentId->data);
|
||||
}
|
||||
|
||||
/* put the intermediate result query in the INSERT..SELECT */
|
||||
selectRte->subquery = fragmentSetQuery;
|
||||
|
|
|
@ -2291,6 +2291,129 @@ BuildReadIntermediateResultsArrayQuery(List *targetEntryList,
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* For the given target list, build an empty relation with the same target list.
|
||||
* For example, if the target list is (a, b, c), and resultId is "empty", then
|
||||
* it returns a Query object for this SQL:
|
||||
* SELECT a, b, c FROM (VALUES (NULL, NULL, NULL)) AS empty(a, b, c) WHERE false;
|
||||
*/
|
||||
Query *
|
||||
BuildEmptyResultQuery(List *targetEntryList, char *resultId)
|
||||
{
|
||||
List *targetList = NIL;
|
||||
ListCell *targetEntryCell = NULL;
|
||||
|
||||
List *colTypes = NIL;
|
||||
List *colTypMods = NIL;
|
||||
List *colCollations = NIL;
|
||||
List *colNames = NIL;
|
||||
|
||||
List *valueConsts = NIL;
|
||||
List *valueTargetList = NIL;
|
||||
List *valueColNames = NIL;
|
||||
|
||||
int targetIndex = 1;
|
||||
|
||||
/* build the target list and column lists needed */
|
||||
foreach(targetEntryCell, targetEntryList)
|
||||
{
|
||||
TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell);
|
||||
Node *targetExpr = (Node *) targetEntry->expr;
|
||||
char *columnName = targetEntry->resname;
|
||||
Oid columnType = exprType(targetExpr);
|
||||
Oid columnTypMod = exprTypmod(targetExpr);
|
||||
Oid columnCollation = exprCollation(targetExpr);
|
||||
|
||||
if (targetEntry->resjunk)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
Var *tgtVar = makeVar(1, targetIndex, columnType, columnTypMod, columnCollation,
|
||||
0);
|
||||
TargetEntry *tgtEntry = makeTargetEntry((Expr *) tgtVar, targetIndex, columnName,
|
||||
false);
|
||||
Const *valueConst = makeConst(columnType, columnTypMod, columnCollation, 0,
|
||||
(Datum) 0, true, false);
|
||||
|
||||
StringInfoData *columnString = makeStringInfo();
|
||||
appendStringInfo(columnString, "column%d", targetIndex);
|
||||
|
||||
TargetEntry *valueTgtEntry = makeTargetEntry((Expr *) tgtVar, targetIndex,
|
||||
columnString->data, false);
|
||||
|
||||
valueConsts = lappend(valueConsts, valueConst);
|
||||
valueTargetList = lappend(valueTargetList, valueTgtEntry);
|
||||
valueColNames = lappend(valueColNames, makeString(columnString->data));
|
||||
|
||||
colNames = lappend(colNames, makeString(columnName));
|
||||
colTypes = lappend_oid(colTypes, columnType);
|
||||
colTypMods = lappend_oid(colTypMods, columnTypMod);
|
||||
colCollations = lappend_oid(colCollations, columnCollation);
|
||||
|
||||
targetList = lappend(targetList, tgtEntry);
|
||||
|
||||
targetIndex++;
|
||||
}
|
||||
|
||||
/* Build a RangeTable Entry for the VALUES relation */
|
||||
RangeTblEntry *valuesRangeTable = makeNode(RangeTblEntry);
|
||||
valuesRangeTable->rtekind = RTE_VALUES;
|
||||
valuesRangeTable->values_lists = list_make1(valueConsts);
|
||||
valuesRangeTable->colcollations = colCollations;
|
||||
valuesRangeTable->coltypes = colTypes;
|
||||
valuesRangeTable->coltypmods = colTypMods;
|
||||
valuesRangeTable->alias = NULL;
|
||||
valuesRangeTable->eref = makeAlias("*VALUES*", valueColNames);
|
||||
valuesRangeTable->inFromCl = true;
|
||||
|
||||
RangeTblRef *valuesRTRef = makeNode(RangeTblRef);
|
||||
valuesRTRef->rtindex = 1;
|
||||
|
||||
FromExpr *valuesJoinTree = makeNode(FromExpr);
|
||||
valuesJoinTree->fromlist = list_make1(valuesRTRef);
|
||||
|
||||
/* build the VALUES query */
|
||||
Query *valuesQuery = makeNode(Query);
|
||||
valuesQuery->canSetTag = true;
|
||||
valuesQuery->commandType = CMD_SELECT;
|
||||
valuesQuery->rtable = list_make1(valuesRangeTable);
|
||||
#if PG_VERSION_NUM >= PG_VERSION_16
|
||||
valuesQuery->rteperminfos = NIL;
|
||||
#endif
|
||||
valuesQuery->jointree = valuesJoinTree;
|
||||
valuesQuery->targetList = valueTargetList;
|
||||
|
||||
/* build the relation selecting from the VALUES */
|
||||
RangeTblEntry *emptyRangeTable = makeNode(RangeTblEntry);
|
||||
emptyRangeTable->rtekind = RTE_SUBQUERY;
|
||||
emptyRangeTable->subquery = valuesQuery;
|
||||
emptyRangeTable->alias = makeAlias(resultId, colNames);
|
||||
emptyRangeTable->eref = emptyRangeTable->alias;
|
||||
emptyRangeTable->inFromCl = true;
|
||||
|
||||
/* build the SELECT query */
|
||||
Query *resultQuery = makeNode(Query);
|
||||
resultQuery->commandType = CMD_SELECT;
|
||||
resultQuery->canSetTag = true;
|
||||
resultQuery->rtable = list_make1(emptyRangeTable);
|
||||
#if PG_VERSION_NUM >= PG_VERSION_16
|
||||
resultQuery->rteperminfos = NIL;
|
||||
#endif
|
||||
RangeTblRef *rangeTableRef = makeNode(RangeTblRef);
|
||||
rangeTableRef->rtindex = 1;
|
||||
|
||||
/* insert a FALSE qual to ensure 0 rows returned */
|
||||
FromExpr *joinTree = makeNode(FromExpr);
|
||||
joinTree->fromlist = list_make1(rangeTableRef);
|
||||
joinTree->quals = makeBoolConst(false, false);
|
||||
resultQuery->jointree = joinTree;
|
||||
resultQuery->targetList = targetList;
|
||||
|
||||
return resultQuery;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* BuildReadIntermediateResultsQuery is the common code for generating
|
||||
* queries to read from result files. It is used by
|
||||
|
|
|
@ -40,6 +40,7 @@ extern Query * BuildReadIntermediateResultsArrayQuery(List *targetEntryList,
|
|||
List *columnAliasList,
|
||||
List *resultIdList,
|
||||
bool useBinaryCopyFormat);
|
||||
extern Query * BuildEmptyResultQuery(List *targetEntryList, char *resultId);
|
||||
extern bool GeneratingSubplans(void);
|
||||
extern bool ContainsLocalTableDistributedTableJoin(List *rangeTableList);
|
||||
extern void ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry,
|
||||
|
|
|
@ -28,5 +28,10 @@ extern List * GenerateTaskListWithRedistributedResults(
|
|||
bool useBinaryFormat);
|
||||
extern bool IsSupportedRedistributionTarget(Oid targetRelationId);
|
||||
extern bool IsRedistributablePlan(Plan *selectPlan);
|
||||
extern bool HasMergeNotMatchedBySource(Query *query);
|
||||
extern void AdjustTaskQueryForEmptySource(Oid targetRelationId,
|
||||
Query *mergeQuery,
|
||||
List *emptySourceTaskList,
|
||||
char *resultIdPrefix);
|
||||
|
||||
#endif /* REPARTITION_EXECUTOR_H */
|
||||
|
|
|
@ -2555,6 +2555,285 @@ MERGE INTO citus_reference_target t
|
|||
WHEN NOT MATCHED BY SOURCE THEN
|
||||
UPDATE SET val = val || ' not matched by source';
|
||||
ERROR: Reference table as target is not allowed in MERGE command
|
||||
-- Test Distributed-reference and distributed-local when the source table has fewer rows
|
||||
-- than distributed target; this tests that MERGE with NOT MATCHED BY SOURCE needs to run
|
||||
-- on all shards of the distributed target, regardless of whether or not the reshuffled
|
||||
-- source table has data in the corresponding shard.
|
||||
-- Re-populate the Postgres tables;
|
||||
DELETE FROM postgres_source;
|
||||
DELETE FROM postgres_target_1;
|
||||
DELETE FROM postgres_target_2;
|
||||
-- This time, the source table has fewer rows
|
||||
INSERT INTO postgres_target_1 SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id;
|
||||
INSERT INTO postgres_target_2 SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id;
|
||||
INSERT INTO postgres_source SELECT id, id * 10 FROM generate_series(1,4) AS id;
|
||||
-- try simple MERGE
|
||||
MERGE INTO postgres_target_1 t
|
||||
USING postgres_source s
|
||||
ON t.tid = s.sid
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||
WHEN NOT MATCHED BY SOURCE THEN
|
||||
UPDATE SET val = val || ' not matched by source';
|
||||
SELECT * FROM postgres_target_1 ORDER BY tid, val;
|
||||
tid | balance | val
|
||||
---------------------------------------------------------------------
|
||||
1 | 110 | initial updated by merge
|
||||
2 | 20 | inserted by merge
|
||||
3 | 330 | initial updated by merge
|
||||
4 | 40 | inserted by merge
|
||||
5 | 500 | initial not matched by source
|
||||
7 | 700 | initial not matched by source
|
||||
9 | 900 | initial not matched by source
|
||||
11 | 1100 | initial not matched by source
|
||||
13 | 1300 | initial not matched by source
|
||||
15 | 1500 | initial not matched by source
|
||||
(10 rows)
|
||||
|
||||
-- same with a constant qual
|
||||
MERGE INTO postgres_target_2 t
|
||||
USING postgres_source s
|
||||
ON t.tid = s.sid AND tid = 1
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||
WHEN NOT MATCHED BY SOURCE THEN
|
||||
UPDATE SET val = val || ' not matched by source';
|
||||
SELECT * FROM postgres_target_2 ORDER BY tid, val;
|
||||
tid | balance | val
|
||||
---------------------------------------------------------------------
|
||||
1 | 110 | initial updated by merge
|
||||
2 | 20 | inserted by merge
|
||||
3 | 300 | initial not matched by source
|
||||
3 | 30 | inserted by merge
|
||||
4 | 40 | inserted by merge
|
||||
5 | 500 | initial not matched by source
|
||||
7 | 700 | initial not matched by source
|
||||
9 | 900 | initial not matched by source
|
||||
11 | 1100 | initial not matched by source
|
||||
13 | 1300 | initial not matched by source
|
||||
15 | 1500 | initial not matched by source
|
||||
(11 rows)
|
||||
|
||||
-- Re-populate the Citus tables; this time, the source table has fewer rows
|
||||
DELETE FROM citus_local_source;
|
||||
DELETE FROM citus_reference_source;
|
||||
INSERT INTO citus_reference_source SELECT id, id * 10 FROM generate_series(1,4) AS id;
|
||||
INSERT INTO citus_local_source SELECT id, id * 10 FROM generate_series(1,4) AS id;
|
||||
SET citus.shard_count to 32;
|
||||
CREATE TABLE citus_distributed_target32 (tid integer, balance float, val text);
|
||||
SELECT create_distributed_table('citus_distributed_target32', 'tid');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO citus_distributed_target32 SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id;
|
||||
-- Distributed-Local
|
||||
-- try simple MERGE
|
||||
BEGIN;
|
||||
MERGE INTO citus_distributed_target32 t
|
||||
USING citus_local_source s
|
||||
ON t.tid = s.sid
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||
WHEN NOT MATCHED BY SOURCE THEN
|
||||
UPDATE SET val = val || ' not matched by source';
|
||||
SELECT compare_tables('citus_distributed_target32', 'postgres_target_1');
|
||||
compare_tables
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
ROLLBACK;
|
||||
-- same with a constant qual
|
||||
BEGIN;
|
||||
MERGE INTO citus_distributed_target32 t
|
||||
USING citus_local_source s
|
||||
ON t.tid = s.sid AND tid = 1
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||
WHEN NOT MATCHED BY TARGET THEN
|
||||
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||
WHEN NOT MATCHED BY SOURCE THEN
|
||||
UPDATE SET val = val || ' not matched by source';
|
||||
SELECT compare_tables('citus_distributed_target32', 'postgres_target_2');
|
||||
compare_tables
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
ROLLBACK;
|
||||
-- Distributed-Reference
|
||||
-- try simple MERGE
|
||||
BEGIN;
|
||||
MERGE INTO citus_distributed_target32 t
|
||||
USING citus_reference_source s
|
||||
ON t.tid = s.sid
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||
WHEN NOT MATCHED BY SOURCE THEN
|
||||
UPDATE SET val = val || ' not matched by source';
|
||||
SELECT compare_tables('citus_distributed_target32', 'postgres_target_1');
|
||||
compare_tables
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
ROLLBACK;
|
||||
-- same with a constant qual
|
||||
BEGIN;
|
||||
MERGE INTO citus_distributed_target32 t
|
||||
USING citus_reference_source s
|
||||
ON t.tid = s.sid AND tid = 1
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||
WHEN NOT MATCHED BY SOURCE THEN
|
||||
UPDATE SET val = val || ' not matched by source';
|
||||
SELECT compare_tables('citus_distributed_target32', 'postgres_target_2');
|
||||
compare_tables
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
ROLLBACK;
|
||||
-- Test that MERGE with NOT MATCHED BY SOURCE runs on all shards of
|
||||
-- a distributed table when the source is a repartition query with
|
||||
-- rows that do not match the distributed target
|
||||
set citus.shard_count = 32;
|
||||
CREATE TABLE dist_target (tid integer, balance float);
|
||||
CREATE TABLE dist_src1(sid integer, tid integer, val float);
|
||||
CREATE TABLE dist_src2(sid integer);
|
||||
CREATE TABLE dist_ref(sid integer);
|
||||
INSERT INTO dist_target SELECT id, 0 FROM generate_series(1,9,2) AS id;
|
||||
INSERT INTO dist_src1 SELECT id, id%3 + 1, id*10 FROM generate_series(1,15) AS id;
|
||||
INSERT INTO dist_src2 SELECT id FROM generate_series(1,100) AS id;
|
||||
INSERT INTO dist_ref SELECT id FROM generate_series(1,10) AS id;
|
||||
-- Run a MERGE command with dist_target as target and an aggregating query
|
||||
-- as source; note that at this point all tables are vanilla Postgres tables
|
||||
BEGIN;
|
||||
SELECT * FROM dist_target ORDER BY tid;
|
||||
tid | balance
|
||||
---------------------------------------------------------------------
|
||||
1 | 0
|
||||
3 | 0
|
||||
5 | 0
|
||||
7 | 0
|
||||
9 | 0
|
||||
(5 rows)
|
||||
|
||||
MERGE INTO dist_target t
|
||||
USING (SELECT dt.tid, avg(dt.val) as av, min(dt.val) as m, max(dt.val) as x
|
||||
FROM dist_src1 dt INNER JOIN dist_src2 dt2 on dt.sid=dt2.sid
|
||||
INNER JOIN dist_ref dr ON dt.sid=dr.sid
|
||||
GROUP BY dt.tid) dv ON (t.tid=dv.tid)
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET balance = dv.av
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT (tid, balance) VALUES (dv.tid, dv.m)
|
||||
WHEN NOT MATCHED BY SOURCE THEN
|
||||
UPDATE SET balance = 99.95;
|
||||
SELECT * FROM dist_target ORDER BY tid;
|
||||
tid | balance
|
||||
---------------------------------------------------------------------
|
||||
1 | 60
|
||||
2 | 10
|
||||
3 | 50
|
||||
5 | 99.95
|
||||
7 | 99.95
|
||||
9 | 99.95
|
||||
(6 rows)
|
||||
|
||||
ROLLBACK;
|
||||
-- Distribute the tables
|
||||
SELECT create_distributed_table('dist_target', 'tid');
|
||||
NOTICE: Copying data from local table...
|
||||
NOTICE: copying the data has completed
|
||||
DETAIL: The local data in the table is no longer visible, but is still on disk.
|
||||
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$pg17.dist_target$$)
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('dist_src1', 'sid');
|
||||
NOTICE: Copying data from local table...
|
||||
NOTICE: copying the data has completed
|
||||
DETAIL: The local data in the table is no longer visible, but is still on disk.
|
||||
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$pg17.dist_src1$$)
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('dist_src2', 'sid');
|
||||
NOTICE: Copying data from local table...
|
||||
NOTICE: copying the data has completed
|
||||
DETAIL: The local data in the table is no longer visible, but is still on disk.
|
||||
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$pg17.dist_src2$$)
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_reference_table('dist_ref');
|
||||
NOTICE: Copying data from local table...
|
||||
NOTICE: copying the data has completed
|
||||
DETAIL: The local data in the table is no longer visible, but is still on disk.
|
||||
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$pg17.dist_ref$$)
|
||||
create_reference_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- Re-run the merge; the target is now distributed and the source is a
|
||||
-- distributed query that is repartitioned.
|
||||
BEGIN;
|
||||
SELECT * FROM dist_target ORDER BY tid;
|
||||
tid | balance
|
||||
---------------------------------------------------------------------
|
||||
1 | 0
|
||||
3 | 0
|
||||
5 | 0
|
||||
7 | 0
|
||||
9 | 0
|
||||
(5 rows)
|
||||
|
||||
MERGE INTO dist_target t
|
||||
USING (SELECT dt.tid, avg(dt.val) as av, min(dt.val) as m, max(dt.val) as x
|
||||
FROM dist_src1 dt INNER JOIN dist_src2 dt2 on dt.sid=dt2.sid
|
||||
INNER JOIN dist_ref dr ON dt.sid=dr.sid
|
||||
GROUP BY dt.tid) dv ON (t.tid=dv.tid)
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET balance = dv.av
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT (tid, balance) VALUES (dv.tid, dv.m)
|
||||
WHEN NOT MATCHED BY SOURCE THEN
|
||||
UPDATE SET balance = 99.95;
|
||||
-- Data in dist_target is as it was with vanilla Postgres tables:
|
||||
SELECT * FROM dist_target ORDER BY tid;
|
||||
tid | balance
|
||||
---------------------------------------------------------------------
|
||||
1 | 60
|
||||
2 | 10
|
||||
3 | 50
|
||||
5 | 99.95
|
||||
7 | 99.95
|
||||
9 | 99.95
|
||||
(6 rows)
|
||||
|
||||
ROLLBACK;
|
||||
-- Reset shard_count for the DEBUG output in the following test
|
||||
SET citus.shard_count to 4;
|
||||
-- Complex repartition query example with a mix of tables
|
||||
-- Example from blog post
|
||||
-- https://www.citusdata.com/blog/2023/07/27/how-citus-12-supports-postgres-merge
|
||||
|
@ -2670,8 +2949,10 @@ DEBUG: Using column - index:0 from the source list to redistribute
|
|||
DEBUG: Executing subplans of the source query and storing the results at the respective node(s)
|
||||
DEBUG: Redistributing source result rows across nodes
|
||||
DEBUG: Executing final MERGE on workers using intermediate results
|
||||
DEBUG: <Deparsed MERGE query: MERGE INTO pg17.sensor_readings_1072043 sr USING (SELECT intermediate_result.sensor_id, intermediate_result.average_reading, intermediate_result.last_reading_timestamp, intermediate_result.rid FROM read_intermediate_results('{repartitioned_results_xxxxx_from_1072047_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(sensor_id numeric, average_reading numeric, last_reading_timestamp timestamp without time zone, rid numeric)) new_readings ON (sr.sensor_id OPERATOR(pg_catalog.=) new_readings.sensor_id) WHEN MATCHED THEN UPDATE SET reading_value = new_readings.average_reading, reading_timestamp = new_readings.last_reading_timestamp WHEN NOT MATCHED BY TARGET THEN INSERT (reading_id, sensor_id, reading_value, reading_timestamp) VALUES (new_readings.rid, new_readings.sensor_id, new_readings.average_reading, new_readings.last_reading_timestamp) WHEN NOT MATCHED BY SOURCE THEN UPDATE SET reading_value = 100>
|
||||
DEBUG: <Deparsed MERGE query: MERGE INTO pg17.sensor_readings_1072045 sr USING (SELECT intermediate_result.sensor_id, intermediate_result.average_reading, intermediate_result.last_reading_timestamp, intermediate_result.rid FROM read_intermediate_results('{repartitioned_results_xxxxx_from_1072049_to_3}'::text[], 'binary'::citus_copy_format) intermediate_result(sensor_id numeric, average_reading numeric, last_reading_timestamp timestamp without time zone, rid numeric)) new_readings ON (sr.sensor_id OPERATOR(pg_catalog.=) new_readings.sensor_id) WHEN MATCHED THEN UPDATE SET reading_value = new_readings.average_reading, reading_timestamp = new_readings.last_reading_timestamp WHEN NOT MATCHED BY TARGET THEN INSERT (reading_id, sensor_id, reading_value, reading_timestamp) VALUES (new_readings.rid, new_readings.sensor_id, new_readings.average_reading, new_readings.last_reading_timestamp) WHEN NOT MATCHED BY SOURCE THEN UPDATE SET reading_value = 100>
|
||||
DEBUG: <Deparsed MERGE query: MERGE INTO pg17.sensor_readings_1072171 sr USING (SELECT temp_empty_rel__1072171.sensor_id, temp_empty_rel__1072171.average_reading, temp_empty_rel__1072171.last_reading_timestamp, temp_empty_rel__1072171.rid FROM (VALUES (NULL::numeric,NULL::numeric,NULL::timestamp without time zone,NULL::numeric)) temp_empty_rel__1072171(sensor_id, average_reading, last_reading_timestamp, rid) WHERE false) new_readings ON (sr.sensor_id OPERATOR(pg_catalog.=) new_readings.sensor_id) WHEN MATCHED THEN UPDATE SET reading_value = new_readings.average_reading, reading_timestamp = new_readings.last_reading_timestamp WHEN NOT MATCHED BY TARGET THEN INSERT (reading_id, sensor_id, reading_value, reading_timestamp) VALUES (new_readings.rid, new_readings.sensor_id, new_readings.average_reading, new_readings.last_reading_timestamp) WHEN NOT MATCHED BY SOURCE THEN UPDATE SET reading_value = 100>
|
||||
DEBUG: <Deparsed MERGE query: MERGE INTO pg17.sensor_readings_1072172 sr USING (SELECT intermediate_result.sensor_id, intermediate_result.average_reading, intermediate_result.last_reading_timestamp, intermediate_result.rid FROM read_intermediate_results('{repartitioned_results_xxxxx_from_1072176_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(sensor_id numeric, average_reading numeric, last_reading_timestamp timestamp without time zone, rid numeric)) new_readings ON (sr.sensor_id OPERATOR(pg_catalog.=) new_readings.sensor_id) WHEN MATCHED THEN UPDATE SET reading_value = new_readings.average_reading, reading_timestamp = new_readings.last_reading_timestamp WHEN NOT MATCHED BY TARGET THEN INSERT (reading_id, sensor_id, reading_value, reading_timestamp) VALUES (new_readings.rid, new_readings.sensor_id, new_readings.average_reading, new_readings.last_reading_timestamp) WHEN NOT MATCHED BY SOURCE THEN UPDATE SET reading_value = 100>
|
||||
DEBUG: <Deparsed MERGE query: MERGE INTO pg17.sensor_readings_1072173 sr USING (SELECT temp_empty_rel__1072173.sensor_id, temp_empty_rel__1072173.average_reading, temp_empty_rel__1072173.last_reading_timestamp, temp_empty_rel__1072173.rid FROM (VALUES (NULL::numeric,NULL::numeric,NULL::timestamp without time zone,NULL::numeric)) temp_empty_rel__1072173(sensor_id, average_reading, last_reading_timestamp, rid) WHERE false) new_readings ON (sr.sensor_id OPERATOR(pg_catalog.=) new_readings.sensor_id) WHEN MATCHED THEN UPDATE SET reading_value = new_readings.average_reading, reading_timestamp = new_readings.last_reading_timestamp WHEN NOT MATCHED BY TARGET THEN INSERT (reading_id, sensor_id, reading_value, reading_timestamp) VALUES (new_readings.rid, new_readings.sensor_id, new_readings.average_reading, new_readings.last_reading_timestamp) WHEN NOT MATCHED BY SOURCE THEN UPDATE SET reading_value = 100>
|
||||
DEBUG: <Deparsed MERGE query: MERGE INTO pg17.sensor_readings_1072174 sr USING (SELECT intermediate_result.sensor_id, intermediate_result.average_reading, intermediate_result.last_reading_timestamp, intermediate_result.rid FROM read_intermediate_results('{repartitioned_results_xxxxx_from_1072178_to_3}'::text[], 'binary'::citus_copy_format) intermediate_result(sensor_id numeric, average_reading numeric, last_reading_timestamp timestamp without time zone, rid numeric)) new_readings ON (sr.sensor_id OPERATOR(pg_catalog.=) new_readings.sensor_id) WHEN MATCHED THEN UPDATE SET reading_value = new_readings.average_reading, reading_timestamp = new_readings.last_reading_timestamp WHEN NOT MATCHED BY TARGET THEN INSERT (reading_id, sensor_id, reading_value, reading_timestamp) VALUES (new_readings.rid, new_readings.sensor_id, new_readings.average_reading, new_readings.last_reading_timestamp) WHEN NOT MATCHED BY SOURCE THEN UPDATE SET reading_value = 100>
|
||||
RESET client_min_messages;
|
||||
-- Expected output is:
|
||||
-- reading_id | sensor_id | reading_value | reading_timestamp
|
||||
|
|
|
@ -1336,6 +1336,177 @@ MERGE INTO citus_reference_target t
|
|||
WHEN NOT MATCHED BY SOURCE THEN
|
||||
UPDATE SET val = val || ' not matched by source';
|
||||
|
||||
-- Test Distributed-reference and distributed-local when the source table has fewer rows
|
||||
-- than distributed target; this tests that MERGE with NOT MATCHED BY SOURCE needs to run
|
||||
-- on all shards of the distributed target, regardless of whether or not the reshuffled
|
||||
-- source table has data in the corresponding shard.
|
||||
|
||||
-- Re-populate the Postgres tables;
|
||||
DELETE FROM postgres_source;
|
||||
DELETE FROM postgres_target_1;
|
||||
DELETE FROM postgres_target_2;
|
||||
|
||||
-- This time, the source table has fewer rows
|
||||
INSERT INTO postgres_target_1 SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id;
|
||||
INSERT INTO postgres_target_2 SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id;
|
||||
INSERT INTO postgres_source SELECT id, id * 10 FROM generate_series(1,4) AS id;
|
||||
|
||||
-- try simple MERGE
|
||||
MERGE INTO postgres_target_1 t
|
||||
USING postgres_source s
|
||||
ON t.tid = s.sid
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||
WHEN NOT MATCHED BY SOURCE THEN
|
||||
UPDATE SET val = val || ' not matched by source';
|
||||
SELECT * FROM postgres_target_1 ORDER BY tid, val;
|
||||
|
||||
-- same with a constant qual
|
||||
MERGE INTO postgres_target_2 t
|
||||
USING postgres_source s
|
||||
ON t.tid = s.sid AND tid = 1
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||
WHEN NOT MATCHED BY SOURCE THEN
|
||||
UPDATE SET val = val || ' not matched by source';
|
||||
SELECT * FROM postgres_target_2 ORDER BY tid, val;
|
||||
|
||||
-- Re-populate the Citus tables; this time, the source table has fewer rows
|
||||
DELETE FROM citus_local_source;
|
||||
DELETE FROM citus_reference_source;
|
||||
INSERT INTO citus_reference_source SELECT id, id * 10 FROM generate_series(1,4) AS id;
|
||||
INSERT INTO citus_local_source SELECT id, id * 10 FROM generate_series(1,4) AS id;
|
||||
|
||||
SET citus.shard_count to 32;
|
||||
CREATE TABLE citus_distributed_target32 (tid integer, balance float, val text);
|
||||
SELECT create_distributed_table('citus_distributed_target32', 'tid');
|
||||
INSERT INTO citus_distributed_target32 SELECT id, id * 100, 'initial' FROM generate_series(1,15,2) AS id;
|
||||
|
||||
-- Distributed-Local
|
||||
-- try simple MERGE
|
||||
BEGIN;
|
||||
MERGE INTO citus_distributed_target32 t
|
||||
USING citus_local_source s
|
||||
ON t.tid = s.sid
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||
WHEN NOT MATCHED BY SOURCE THEN
|
||||
UPDATE SET val = val || ' not matched by source';
|
||||
SELECT compare_tables('citus_distributed_target32', 'postgres_target_1');
|
||||
ROLLBACK;
|
||||
|
||||
-- same with a constant qual
|
||||
BEGIN;
|
||||
MERGE INTO citus_distributed_target32 t
|
||||
USING citus_local_source s
|
||||
ON t.tid = s.sid AND tid = 1
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||
WHEN NOT MATCHED BY TARGET THEN
|
||||
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||
WHEN NOT MATCHED BY SOURCE THEN
|
||||
UPDATE SET val = val || ' not matched by source';
|
||||
SELECT compare_tables('citus_distributed_target32', 'postgres_target_2');
|
||||
ROLLBACK;
|
||||
|
||||
-- Distributed-Reference
|
||||
-- try simple MERGE
|
||||
BEGIN;
|
||||
MERGE INTO citus_distributed_target32 t
|
||||
USING citus_reference_source s
|
||||
ON t.tid = s.sid
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||
WHEN NOT MATCHED BY SOURCE THEN
|
||||
UPDATE SET val = val || ' not matched by source';
|
||||
SELECT compare_tables('citus_distributed_target32', 'postgres_target_1');
|
||||
ROLLBACK;
|
||||
|
||||
-- same with a constant qual
|
||||
BEGIN;
|
||||
MERGE INTO citus_distributed_target32 t
|
||||
USING citus_reference_source s
|
||||
ON t.tid = s.sid AND tid = 1
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET balance = balance + delta, val = val || ' updated by merge'
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT VALUES (sid, delta, 'inserted by merge')
|
||||
WHEN NOT MATCHED BY SOURCE THEN
|
||||
UPDATE SET val = val || ' not matched by source';
|
||||
SELECT compare_tables('citus_distributed_target32', 'postgres_target_2');
|
||||
ROLLBACK;
|
||||
|
||||
-- Test that MERGE with NOT MATCHED BY SOURCE runs on all shards of
|
||||
-- a distributed table when the source is a repartition query with
|
||||
-- rows that do not match the distributed target
|
||||
|
||||
set citus.shard_count = 32;
|
||||
|
||||
CREATE TABLE dist_target (tid integer, balance float);
|
||||
CREATE TABLE dist_src1(sid integer, tid integer, val float);
|
||||
CREATE TABLE dist_src2(sid integer);
|
||||
CREATE TABLE dist_ref(sid integer);
|
||||
|
||||
INSERT INTO dist_target SELECT id, 0 FROM generate_series(1,9,2) AS id;
|
||||
INSERT INTO dist_src1 SELECT id, id%3 + 1, id*10 FROM generate_series(1,15) AS id;
|
||||
INSERT INTO dist_src2 SELECT id FROM generate_series(1,100) AS id;
|
||||
INSERT INTO dist_ref SELECT id FROM generate_series(1,10) AS id;
|
||||
|
||||
-- Run a MERGE command with dist_target as target and an aggregating query
|
||||
-- as source; note that at this point all tables are vanilla Postgres tables
|
||||
BEGIN;
|
||||
SELECT * FROM dist_target ORDER BY tid;
|
||||
MERGE INTO dist_target t
|
||||
USING (SELECT dt.tid, avg(dt.val) as av, min(dt.val) as m, max(dt.val) as x
|
||||
FROM dist_src1 dt INNER JOIN dist_src2 dt2 on dt.sid=dt2.sid
|
||||
INNER JOIN dist_ref dr ON dt.sid=dr.sid
|
||||
GROUP BY dt.tid) dv ON (t.tid=dv.tid)
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET balance = dv.av
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT (tid, balance) VALUES (dv.tid, dv.m)
|
||||
WHEN NOT MATCHED BY SOURCE THEN
|
||||
UPDATE SET balance = 99.95;
|
||||
SELECT * FROM dist_target ORDER BY tid;
|
||||
ROLLBACK;
|
||||
|
||||
-- Distribute the tables
|
||||
SELECT create_distributed_table('dist_target', 'tid');
|
||||
SELECT create_distributed_table('dist_src1', 'sid');
|
||||
SELECT create_distributed_table('dist_src2', 'sid');
|
||||
SELECT create_reference_table('dist_ref');
|
||||
|
||||
-- Re-run the merge; the target is now distributed and the source is a
|
||||
-- distributed query that is repartitioned.
|
||||
BEGIN;
|
||||
SELECT * FROM dist_target ORDER BY tid;
|
||||
MERGE INTO dist_target t
|
||||
USING (SELECT dt.tid, avg(dt.val) as av, min(dt.val) as m, max(dt.val) as x
|
||||
FROM dist_src1 dt INNER JOIN dist_src2 dt2 on dt.sid=dt2.sid
|
||||
INNER JOIN dist_ref dr ON dt.sid=dr.sid
|
||||
GROUP BY dt.tid) dv ON (t.tid=dv.tid)
|
||||
WHEN MATCHED THEN
|
||||
UPDATE SET balance = dv.av
|
||||
WHEN NOT MATCHED THEN
|
||||
INSERT (tid, balance) VALUES (dv.tid, dv.m)
|
||||
WHEN NOT MATCHED BY SOURCE THEN
|
||||
UPDATE SET balance = 99.95;
|
||||
|
||||
-- Data in dist_target is as it was with vanilla Postgres tables:
|
||||
SELECT * FROM dist_target ORDER BY tid;
|
||||
ROLLBACK;
|
||||
|
||||
-- Reset shard_count for the DEBUG output in the following test
|
||||
|
||||
SET citus.shard_count to 4;
|
||||
-- Complex repartition query example with a mix of tables
|
||||
-- Example from blog post
|
||||
-- https://www.citusdata.com/blog/2023/07/27/how-citus-12-supports-postgres-merge
|
||||
|
|
Loading…
Reference in New Issue