This pull request introduces support for nonroutable merge commands in the following scenarios:

1) For distributed tables that are not colocated.
2) When joining on a non-distribution column for colocated tables.
3) When merging into a distributed table using reference or citus-local tables as the data source.

This is accomplished primarily through the implementation of the following two strategies.

Repartition: Plan the source query independently,
execute the results into intermediate files, and repartition the files to
co-locate them with the merge-target table. Subsequently, compile a final
merge query on the target table using the intermediate results as the data
source.

Pull-to-coordinator: Execute the plan that requires evaluation at the coordinator,
run the query on the coordinator, and redistribute the resulting rows to ensure
colocation with the target shards. Direct the MERGE SQL operation to the worker
nodes' target shards, using the intermediate files colocated with the data as the
data source.
pull/7014/head
Teja Mupparti 2023-05-16 16:45:06 -07:00 committed by Teja Mupparti
parent c10cb50aa9
commit 58da8771aa
52 changed files with 5554 additions and 625 deletions

View File

@ -2128,12 +2128,36 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
int columnCount = inputTupleDescriptor->natts;
Oid *finalTypeArray = palloc0(columnCount * sizeof(Oid));
copyDest->columnCoercionPaths =
ColumnCoercionPaths(destTupleDescriptor, inputTupleDescriptor,
tableId, columnNameList, finalTypeArray);
copyDest->columnOutputFunctions =
TypeOutputFunctions(columnCount, finalTypeArray, copyOutState->binary);
/*
* To ensure the proper co-location and distribution of the target table,
* the entire process of repartitioning intermediate files requires the
* destReceiver to be created on the target rather than the source.
*
* Within this specific code path, it is assumed that the employed model
* is for insert-select. Consequently, it validates the column types of
* destTupleDescriptor(target) during the intermediate result generation
* process. However, this approach varies significantly for MERGE operations,
* where the source tuple(s) can have arbitrary types and are not required to
* align with the target column names.
*
* Despite this minor setback, a significant portion of the code responsible
* for repartitioning intermediate files can be reused for the MERGE
* operation. By leveraging the ability to perform actual coercion during
* the writing process to the target table, we can bypass this specific route.
*/
if (copyDest->skipCoercions)
{
copyDest->columnOutputFunctions =
ColumnOutputFunctions(inputTupleDescriptor, copyOutState->binary);
}
else
{
copyDest->columnCoercionPaths =
ColumnCoercionPaths(destTupleDescriptor, inputTupleDescriptor,
tableId, columnNameList, finalTypeArray);
copyDest->columnOutputFunctions =
TypeOutputFunctions(columnCount, finalTypeArray, copyOutState->binary);
}
}
/* wrap the column names as Values */
@ -2597,9 +2621,11 @@ ShardIdForTuple(CitusCopyDestReceiver *copyDest, Datum *columnValues, bool *colu
/* find the partition column value */
partitionColumnValue = columnValues[partitionColumnIndex];
/* annoyingly this is evaluated twice, but at least we don't crash! */
partitionColumnValue = CoerceColumnValue(partitionColumnValue, coercePath);
if (!copyDest->skipCoercions)
{
/* annoyingly this is evaluated twice, but at least we don't crash! */
partitionColumnValue = CoerceColumnValue(partitionColumnValue, coercePath);
}
}
/*

View File

@ -1015,6 +1015,32 @@ ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList,
}
/*
* ExecuteTaskListIntoTupleDestWithParam is a proxy to ExecuteTaskListExtended() which uses
* bind params from executor state, and with defaults for some of the arguments.
*/
uint64
ExecuteTaskListIntoTupleDestWithParam(RowModifyLevel modLevel, List *taskList,
TupleDestination *tupleDest,
bool expectResults,
ParamListInfo paramListInfo)
{
int targetPoolSize = MaxAdaptiveExecutorPoolSize;
bool localExecutionSupported = true;
ExecutionParams *executionParams = CreateBasicExecutionParams(
modLevel, taskList, targetPoolSize, localExecutionSupported
);
executionParams->xactProperties = DecideTransactionPropertiesForTaskList(
modLevel, taskList, false);
executionParams->expectResults = expectResults;
executionParams->tupleDestination = tupleDest;
executionParams->paramListInfo = paramListInfo;
return ExecuteTaskListExtended(executionParams);
}
/*
* ExecuteTaskListIntoTupleDest is a proxy to ExecuteTaskListExtended() with defaults
* for some of the arguments.
@ -1052,7 +1078,6 @@ ExecuteTaskListExtended(ExecutionParams *executionParams)
return 0;
}
ParamListInfo paramListInfo = NULL;
uint64 locallyProcessedRows = 0;
TupleDestination *defaultTupleDest = executionParams->tupleDestination;
@ -1065,7 +1090,7 @@ ExecuteTaskListExtended(ExecutionParams *executionParams)
DistributedExecution *execution =
CreateDistributedExecution(
executionParams->modLevel, executionParams->taskList,
paramListInfo, executionParams->targetPoolSize,
executionParams->paramListInfo, executionParams->targetPoolSize,
defaultTupleDest, &executionParams->xactProperties,
executionParams->jobIdList, executionParams->localExecutionSupported);
@ -1117,6 +1142,7 @@ CreateBasicExecutionParams(RowModifyLevel modLevel,
executionParams->expectResults = false;
executionParams->isUtilityCommand = false;
executionParams->jobIdList = NIL;
executionParams->paramListInfo = NULL;
return executionParams;
}

View File

@ -27,6 +27,8 @@
#include "distributed/listutils.h"
#include "distributed/local_executor.h"
#include "distributed/local_plan_cache.h"
#include "distributed/merge_executor.h"
#include "distributed/merge_planner.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_server_executor.h"
#include "distributed/multi_router_planner.h"
@ -53,6 +55,7 @@ extern AllowedDistributionColumn AllowedDistributionColumnValue;
static Node * AdaptiveExecutorCreateScan(CustomScan *scan);
static Node * NonPushableInsertSelectCreateScan(CustomScan *scan);
static Node * DelayedErrorCreateScan(CustomScan *scan);
static Node * NonPushableMergeCommandCreateScan(CustomScan *scan);
/* functions that are common to different scans */
static void CitusBeginScan(CustomScanState *node, EState *estate, int eflags);
@ -88,6 +91,11 @@ CustomScanMethods DelayedErrorCustomScanMethods = {
DelayedErrorCreateScan
};
CustomScanMethods NonPushableMergeCommandCustomScanMethods = {
"Citus MERGE INTO ...",
NonPushableMergeCommandCreateScan
};
/*
* Define executor methods for the different executor types.
@ -111,6 +119,16 @@ static CustomExecMethods NonPushableInsertSelectCustomExecMethods = {
};
static CustomExecMethods NonPushableMergeCommandCustomExecMethods = {
.CustomName = "NonPushableMergeCommandScan",
.BeginCustomScan = CitusBeginScan,
.ExecCustomScan = NonPushableMergeCommandExecScan,
.EndCustomScan = CitusEndScan,
.ReScanCustomScan = CitusReScan,
.ExplainCustomScan = NonPushableMergeCommandExplainScan
};
/*
* IsCitusCustomState returns if a given PlanState node is a CitusCustomState node.
*/
@ -124,7 +142,8 @@ IsCitusCustomState(PlanState *planState)
CustomScanState *css = castNode(CustomScanState, planState);
if (css->methods == &AdaptiveExecutorCustomExecMethods ||
css->methods == &NonPushableInsertSelectCustomExecMethods)
css->methods == &NonPushableInsertSelectCustomExecMethods ||
css->methods == &NonPushableMergeCommandCustomExecMethods)
{
return true;
}
@ -142,6 +161,7 @@ RegisterCitusCustomScanMethods(void)
RegisterCustomScanMethods(&AdaptiveExecutorCustomScanMethods);
RegisterCustomScanMethods(&NonPushableInsertSelectCustomScanMethods);
RegisterCustomScanMethods(&DelayedErrorCustomScanMethods);
RegisterCustomScanMethods(&NonPushableMergeCommandCustomScanMethods);
}
@ -723,6 +743,26 @@ DelayedErrorCreateScan(CustomScan *scan)
}
/*
* NonPushableMergeCommandCreateScan creates the scan state for executing
* MERGE INTO ... into a distributed table with repartition of source rows.
*/
static Node *
NonPushableMergeCommandCreateScan(CustomScan *scan)
{
CitusScanState *scanState = palloc0(sizeof(CitusScanState));
scanState->executorType = MULTI_EXECUTOR_NON_PUSHABLE_MERGE_QUERY;
scanState->customScanState.ss.ps.type = T_CustomScanState;
scanState->distributedPlan = GetDistributedPlan(scan);
scanState->customScanState.methods = &NonPushableMergeCommandCustomExecMethods;
scanState->finishedPreScan = false;
scanState->finishedRemoteScan = false;
return (Node *) scanState;
}
/*
* CitusEndScan is used to clean up tuple store of the given custom scan state.
*/

View File

@ -610,6 +610,18 @@ QueryStringForFragmentsTransfer(NodeToNodeFragmentsTransfer *fragmentsTransfer)
StringInfo fragmentNamesArrayString = makeStringInfo();
int fragmentCount = 0;
NodePair *nodePair = &fragmentsTransfer->nodes;
uint32 sourceNodeId = nodePair->sourceNodeId;
/*
* If the placement is dummy, for example, queries that generate
* intermediate results at the coordinator that need to be redistributed
* to worker nodes, we need the local id.
*/
if (sourceNodeId == LOCAL_NODE_ID)
{
nodePair->sourceNodeId = GetLocalNodeId();
}
WorkerNode *sourceNode = LookupNodeByNodeIdOrError(nodePair->sourceNodeId);
appendStringInfoString(fragmentNamesArrayString, "ARRAY[");

View File

@ -20,6 +20,7 @@
#include "distributed/insert_select_planner.h"
#include "distributed/intermediate_results.h"
#include "distributed/local_executor.h"
#include "distributed/merge_planner.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_physical_planner.h"
@ -63,8 +64,6 @@ static HTAB * ExecutePlanIntoColocatedIntermediateResults(Oid targetRelationId,
PlannedStmt *selectPlan,
EState *executorState,
char *intermediateResultIdPrefix);
static List * BuildColumnNameListFromTargetList(Oid targetRelationId,
List *insertTargetList);
static int PartitionColumnIndexFromColumnList(Oid relationId, List *columnNameList);
static void WrapTaskListForProjection(List *taskList, List *projectedTargetEntries);
@ -374,7 +373,7 @@ ExecutePlanIntoRelation(Oid targetRelationId, List *insertTargetList,
* BuildColumnNameListForCopyStatement build the column name list given the insert
* target list.
*/
static List *
List *
BuildColumnNameListFromTargetList(Oid targetRelationId, List *insertTargetList)
{
List *columnNameList = NIL;
@ -424,13 +423,13 @@ PartitionColumnIndexFromColumnList(Oid relationId, List *columnNameList)
* given target list.
*/
int
DistributionColumnIndex(List *insertTargetList, Var *partitionColumn)
DistributionColumnIndex(List *insertTargetList, Var *distributionColumn)
{
TargetEntry *insertTargetEntry = NULL;
int targetEntryIndex = 0;
foreach_ptr(insertTargetEntry, insertTargetList)
{
if (insertTargetEntry->resno == partitionColumn->varattno)
if (insertTargetEntry->resno == distributionColumn->varattno)
{
return targetEntryIndex;
}

View File

@ -0,0 +1,337 @@
/*-------------------------------------------------------------------------
*
* merge_executor.c
*
* Executor logic for MERGE SQL statement.
*
* Copyright (c) Citus Data, Inc.
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "miscadmin.h"
#include "distributed/distributed_execution_locks.h"
#include "distributed/insert_select_executor.h"
#include "distributed/intermediate_results.h"
#include "distributed/listutils.h"
#include "distributed/merge_executor.h"
#include "distributed/merge_planner.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_router_planner.h"
#include "distributed/repartition_executor.h"
#include "distributed/subplan_execution.h"
#include "nodes/execnodes.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
static void ExecuteSourceAtWorkerAndRepartition(CitusScanState *scanState);
static void ExecuteSourceAtCoordAndRedistribution(CitusScanState *scanState);
static HTAB * ExecuteMergeSourcePlanIntoColocatedIntermediateResults(Oid targetRelationId,
Query *mergeQuery,
List *
sourceTargetList,
PlannedStmt *
sourcePlan,
EState *executorState,
char *
intermediateResultIdPrefix,
int
partitionColumnIndex);
/*
* NonPushableMergeCommandExecScan performs an MERGE INTO distributed_table
* USING (source-query) ... command. This can be done either by aggregating
* task results at the coordinator and repartitioning the results, or by
* repartitioning task results and directly transferring data between nodes.
*/
TupleTableSlot *
NonPushableMergeCommandExecScan(CustomScanState *node)
{
CitusScanState *scanState = (CitusScanState *) node;
DistributedPlan *distributedPlan = scanState->distributedPlan;
if (!scanState->finishedRemoteScan)
{
switch (distributedPlan->modifyWithSelectMethod)
{
case MODIFY_WITH_SELECT_REPARTITION:
{
ExecuteSourceAtWorkerAndRepartition(scanState);
break;
}
case MODIFY_WITH_SELECT_VIA_COORDINATOR:
{
ExecuteSourceAtCoordAndRedistribution(scanState);
break;
}
default:
{
ereport(ERROR, (errmsg("Unexpected MERGE execution method(%d)",
distributedPlan->modifyWithSelectMethod)));
}
}
scanState->finishedRemoteScan = true;
}
TupleTableSlot *resultSlot = ReturnTupleFromTuplestore(scanState);
return resultSlot;
}
/*
* ExecuteSourceAtWorkerAndRepartition Executes the Citus distributed plan, including any
* sub-plans, and captures the results in intermediate files. Subsequently, redistributes
* the result files to ensure colocation with the target, and directs the MERGE SQL
* operation to the target shards on the worker nodes, utilizing the colocated
* intermediate files as the data source.
*/
static void
ExecuteSourceAtWorkerAndRepartition(CitusScanState *scanState)
{
DistributedPlan *distributedPlan = scanState->distributedPlan;
Query *mergeQuery =
copyObject(distributedPlan->modifyQueryViaCoordinatorOrRepartition);
RangeTblEntry *targetRte = ExtractResultRelationRTE(mergeQuery);
RangeTblEntry *sourceRte = ExtractMergeSourceRangeTableEntry(mergeQuery);
Oid targetRelationId = targetRte->relid;
bool hasReturning = distributedPlan->expectResults;
Query *sourceQuery = sourceRte->subquery;
PlannedStmt *sourcePlan =
copyObject(distributedPlan->selectPlanForModifyViaCoordinatorOrRepartition);
EState *executorState = ScanStateGetExecutorState(scanState);
/*
* If we are dealing with partitioned table, we also need to lock its
* partitions. Here we only lock targetRelation, we acquire necessary
* locks on source tables during execution of those source queries.
*/
if (PartitionedTable(targetRelationId))
{
LockPartitionRelations(targetRelationId, RowExclusiveLock);
}
bool randomAccess = true;
bool interTransactions = false;
DistributedPlan *distSourcePlan =
GetDistributedPlan((CustomScan *) sourcePlan->planTree);
Job *distSourceJob = distSourcePlan->workerJob;
List *distSourceTaskList = distSourceJob->taskList;
bool binaryFormat =
CanUseBinaryCopyFormatForTargetList(sourceQuery->targetList);
ereport(DEBUG1, (errmsg("Executing subplans of the source query and "
"storing the results at the respective node(s)")));
ExecuteSubPlans(distSourcePlan);
/*
* We have a separate directory for each transaction, so choosing
* the same result prefix won't cause filename conflicts. Results
* directory name also includes node id and database id, so we don't
* need to include them in the filename. We include job id here for
* the case "MERGE USING <source query>" is executed recursively.
*/
StringInfo distResultPrefixString = makeStringInfo();
appendStringInfo(distResultPrefixString,
"repartitioned_results_" UINT64_FORMAT,
distSourceJob->jobId);
char *distResultPrefix = distResultPrefixString->data;
CitusTableCacheEntry *targetRelation = GetCitusTableCacheEntry(targetRelationId);
ereport(DEBUG1, (errmsg("Redistributing source result rows across nodes")));
/*
* partitionColumnIndex determines the column in the selectTaskList to
* use for (re)partitioning of the source result, which will colocate
* the result data with the target.
*/
int partitionColumnIndex = distributedPlan->sourceResultRepartitionColumnIndex;
/*
* Below call partitions the results using shard ranges and partition method of
* targetRelation, and then colocates the result files with shards. These
* transfers are done by calls to fetch_intermediate_results() between nodes.
*/
List **redistributedResults =
RedistributeTaskListResults(distResultPrefix,
distSourceTaskList, partitionColumnIndex,
targetRelation, binaryFormat);
ereport(DEBUG1, (errmsg("Executing final MERGE on workers using "
"intermediate results")));
/*
* At this point source query has been executed on workers and results
* have been fetched in such a way that they are colocated with corresponding
* target shard(s). Create and execute a list of tasks of form
* MERGE INTO ... USING SELECT * FROM read_intermediate_results(...);
*/
List *taskList =
GenerateTaskListWithRedistributedResults(mergeQuery,
targetRelation,
redistributedResults,
binaryFormat);
scanState->tuplestorestate =
tuplestore_begin_heap(randomAccess, interTransactions, work_mem);
ParamListInfo paramListInfo = executorState->es_param_list_info;
TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState);
TupleDestination *tupleDest =
CreateTupleStoreTupleDest(scanState->tuplestorestate,
tupleDescriptor);
uint64 rowsMerged =
ExecuteTaskListIntoTupleDestWithParam(ROW_MODIFY_NONCOMMUTATIVE, taskList,
tupleDest,
hasReturning,
paramListInfo);
executorState->es_processed = rowsMerged;
}
/*
* ExecuteSourceAtCoordAndRedistribution Executes the plan that necessitates evaluation
* at the coordinator and redistributes the resulting rows to intermediate files,
* ensuring colocation with the target shards. Directs the MERGE SQL operation to the
* target shards on the worker nodes, utilizing the colocated intermediate files as the
* data source.
*/
void
ExecuteSourceAtCoordAndRedistribution(CitusScanState *scanState)
{
EState *executorState = ScanStateGetExecutorState(scanState);
DistributedPlan *distributedPlan = scanState->distributedPlan;
Query *mergeQuery =
copyObject(distributedPlan->modifyQueryViaCoordinatorOrRepartition);
RangeTblEntry *targetRte = ExtractResultRelationRTE(mergeQuery);
RangeTblEntry *sourceRte = ExtractMergeSourceRangeTableEntry(mergeQuery);
Query *sourceQuery = sourceRte->subquery;
Oid targetRelationId = targetRte->relid;
PlannedStmt *sourcePlan =
copyObject(distributedPlan->selectPlanForModifyViaCoordinatorOrRepartition);
char *intermediateResultIdPrefix = distributedPlan->intermediateResultIdPrefix;
bool hasReturning = distributedPlan->expectResults;
int partitionColumnIndex = distributedPlan->sourceResultRepartitionColumnIndex;
/*
* If we are dealing with partitioned table, we also need to lock its
* partitions. Here we only lock targetRelation, we acquire necessary
* locks on source tables during execution of those source queries.
*/
if (PartitionedTable(targetRelationId))
{
LockPartitionRelations(targetRelationId, RowExclusiveLock);
}
ereport(DEBUG1, (errmsg("Collect source query results on coordinator")));
List *prunedTaskList = NIL;
HTAB *shardStateHash =
ExecuteMergeSourcePlanIntoColocatedIntermediateResults(
targetRelationId,
mergeQuery,
sourceQuery->targetList,
sourcePlan,
executorState,
intermediateResultIdPrefix,
partitionColumnIndex);
ereport(DEBUG1, (errmsg("Create a MERGE task list that needs to be routed")));
/* generate tasks for the .. phase */
List *taskList =
GenerateTaskListWithColocatedIntermediateResults(targetRelationId, mergeQuery,
intermediateResultIdPrefix);
/*
* We cannot actually execute MERGE INTO ... tasks that read from
* intermediate results that weren't created because no rows were
* written to them. Prune those tasks out by only including tasks
* on shards with connections.
*/
Task *task = NULL;
foreach_ptr(task, taskList)
{
uint64 shardId = task->anchorShardId;
bool shardModified = false;
hash_search(shardStateHash, &shardId, HASH_FIND, &shardModified);
if (shardModified)
{
prunedTaskList = lappend(prunedTaskList, task);
}
}
if (prunedTaskList == NIL)
{
/* No task to execute */
return;
}
ereport(DEBUG1, (errmsg("Execute MERGE task list")));
bool randomAccess = true;
bool interTransactions = false;
Assert(scanState->tuplestorestate == NULL);
scanState->tuplestorestate = tuplestore_begin_heap(randomAccess, interTransactions,
work_mem);
TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState);
ParamListInfo paramListInfo = executorState->es_param_list_info;
TupleDestination *tupleDest =
CreateTupleStoreTupleDest(scanState->tuplestorestate, tupleDescriptor);
uint64 rowsMerged =
ExecuteTaskListIntoTupleDestWithParam(ROW_MODIFY_NONCOMMUTATIVE,
prunedTaskList,
tupleDest,
hasReturning,
paramListInfo);
executorState->es_processed = rowsMerged;
}
/*
* ExecuteMergeSourcePlanIntoColocatedIntermediateResults Executes the given PlannedStmt
* and inserts tuples into a set of intermediate results that are colocated with the
* target table for further processing MERGE INTO. It also returns the hash of shard
* states that were used to insert tuplesinto the target relation.
*/
static HTAB *
ExecuteMergeSourcePlanIntoColocatedIntermediateResults(Oid targetRelationId,
Query *mergeQuery,
List *sourceTargetList,
PlannedStmt *sourcePlan,
EState *executorState,
char *intermediateResultIdPrefix,
int partitionColumnIndex)
{
ParamListInfo paramListInfo = executorState->es_param_list_info;
/* Get column name list and partition column index for the target table */
List *columnNameList =
BuildColumnNameListFromTargetList(targetRelationId, sourceTargetList);
/* set up a DestReceiver that copies into the intermediate file */
const bool publishableData = false;
CitusCopyDestReceiver *copyDest = CreateCitusCopyDestReceiver(targetRelationId,
columnNameList,
partitionColumnIndex,
executorState,
intermediateResultIdPrefix,
publishableData);
/* We can skip when writing to intermediate files */
copyDest->skipCoercions = true;
ExecutePlanIntoDestReceiver(sourcePlan, paramListInfo, (DestReceiver *) copyDest);
executorState->es_processed = copyDest->tuplesSent;
XactModificationLevel = XACT_MODIFICATION_DATA;
return copyDest->shardStateHash;
}

View File

@ -24,6 +24,7 @@
#include "distributed/multi_executor.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_server_executor.h"
#include "distributed/multi_router_planner.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/subplan_execution.h"
#include "distributed/tuple_destination.h"
@ -49,6 +50,11 @@ JobExecutorType(DistributedPlan *distributedPlan)
if (distributedPlan->modifyQueryViaCoordinatorOrRepartition != NULL)
{
if (IsMergeQuery(distributedPlan->modifyQueryViaCoordinatorOrRepartition))
{
return MULTI_EXECUTOR_NON_PUSHABLE_MERGE_QUERY;
}
/*
* We go through
* MULTI_EXECUTOR_NON_PUSHABLE_INSERT_SELECT because

View File

@ -120,7 +120,7 @@ GenerateTaskListWithColocatedIntermediateResults(Oid targetRelationId,
*/
Query *modifyWithResultQuery = copyObject(modifyQueryViaCoordinatorOrRepartition);
RangeTblEntry *insertRte = ExtractResultRelationRTE(modifyWithResultQuery);
RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(modifyWithResultQuery);
RangeTblEntry *selectRte = ExtractSourceResultRangeTableEntry(modifyWithResultQuery);
CitusTableCacheEntry *targetCacheEntry = GetCitusTableCacheEntry(targetRelationId);
int shardCount = targetCacheEntry->shardIntervalArrayLength;
@ -139,11 +139,18 @@ GenerateTaskListWithColocatedIntermediateResults(Oid targetRelationId,
/* during COPY, the shard ID is appended to the result name */
appendStringInfo(resultId, "%s_" UINT64_FORMAT, resultIdPrefix, shardId);
/*
* For MERGE SQL, use the USING clause list, the main query target list
* is NULL
*/
List *targetList = IsMergeQuery(modifyQueryViaCoordinatorOrRepartition) ?
selectRte->subquery->targetList :
modifyQueryViaCoordinatorOrRepartition->targetList;
/* generate the query on the intermediate result */
Query *resultSelectQuery = BuildSubPlanResultQuery(
modifyQueryViaCoordinatorOrRepartition->targetList,
columnAliasList,
resultId->data);
Query *resultSelectQuery = BuildSubPlanResultQuery(targetList,
columnAliasList,
resultId->data);
/* put the intermediate result query in the INSERT..SELECT */
selectRte->subquery = resultSelectQuery;
@ -214,8 +221,6 @@ GenerateTaskListWithRedistributedResults(Query *modifyQueryViaCoordinatorOrRepar
*/
Query *modifyResultQuery = copyObject(modifyQueryViaCoordinatorOrRepartition);
RangeTblEntry *insertRte = ExtractResultRelationRTE(modifyResultQuery);
RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(modifyResultQuery);
List *selectTargetList = selectRte->subquery->targetList;
Oid targetRelationId = targetRelation->relationId;
int shardCount = targetRelation->shardIntervalArrayLength;
@ -223,6 +228,10 @@ GenerateTaskListWithRedistributedResults(Query *modifyQueryViaCoordinatorOrRepar
uint32 taskIdIndex = 1;
uint64 jobId = INVALID_JOB_ID;
RangeTblEntry *selectRte =
ExtractSourceResultRangeTableEntry(modifyResultQuery);
List *selectTargetList = selectRte->subquery->targetList;
for (shardOffset = 0; shardOffset < shardCount; shardOffset++)
{
ShardInterval *targetShardInterval =

View File

@ -925,6 +925,10 @@ GetRouterPlanType(Query *query, Query *originalQuery, bool hasUnresolvedParams)
}
else if (IsMergeQuery(originalQuery))
{
if (hasUnresolvedParams)
{
return REPLAN_WITH_BOUND_PARAMETERS;
}
return MERGE_QUERY;
}
else
@ -990,7 +994,8 @@ CreateDistributedPlan(uint64 planId, bool allowRecursivePlanning, Query *origina
case MERGE_QUERY:
{
distributedPlan =
CreateMergePlan(originalQuery, query, plannerRestrictionContext);
CreateMergePlan(planId, originalQuery, query, plannerRestrictionContext,
boundParams);
break;
}
@ -1377,6 +1382,12 @@ FinalizePlan(PlannedStmt *localPlan, DistributedPlan *distributedPlan)
break;
}
case MULTI_EXECUTOR_NON_PUSHABLE_MERGE_QUERY:
{
customScan->methods = &NonPushableMergeCommandCustomScanMethods;
break;
}
default:
{
customScan->methods = &DelayedErrorCustomScanMethods;

View File

@ -86,7 +86,6 @@ static DeferredErrorMessage * InsertPartitionColumnMatchesSelect(Query *query,
static DistributedPlan * CreateNonPushableInsertSelectPlan(uint64 planId, Query *parse,
ParamListInfo boundParams);
static DeferredErrorMessage * NonPushableInsertSelectSupported(Query *insertSelectQuery);
static Query * WrapSubquery(Query *subquery);
static void RelabelTargetEntryList(List *selectTargetList, List *insertTargetList);
static List * AddInsertSelectCasts(List *insertTargetList, List *selectTargetList,
Oid targetRelationId);
@ -1477,7 +1476,7 @@ InsertSelectResultIdPrefix(uint64 planId)
* WrapSubquery wraps the given query as a subquery in a newly constructed
* "SELECT * FROM (...subquery...) citus_insert_select_subquery" query.
*/
static Query *
Query *
WrapSubquery(Query *subquery)
{
ParseState *pstate = make_parsestate(NULL);

View File

@ -485,6 +485,8 @@ RequiredAttrNumbersForRelation(RangeTblEntry *rangeTableEntry,
PlannerInfo *plannerInfo = relationRestriction->plannerInfo;
int rteIndex = relationRestriction->index;
/*
* Here we used the query from plannerInfo because it has the optimizations
* so that it doesn't have unnecessary columns. The original query doesn't have
@ -492,8 +494,18 @@ RequiredAttrNumbersForRelation(RangeTblEntry *rangeTableEntry,
* 'required' attributes.
*/
Query *queryToProcess = plannerInfo->parse;
int rteIndex = relationRestriction->index;
return RequiredAttrNumbersForRelationInternal(queryToProcess, rteIndex);
}
/*
* RequiredAttrNumbersForRelationInternal returns the required attribute numbers
* for the input range-table-index in the query parameter.
*/
List *
RequiredAttrNumbersForRelationInternal(Query *queryToProcess, int rteIndex)
{
List *allVarsInQuery = pull_vars_of_level((Node *) queryToProcess, 0);
List *requiredAttrNumbers = NIL;

File diff suppressed because it is too large Load Diff

View File

@ -33,6 +33,7 @@
#include "distributed/insert_select_planner.h"
#include "distributed/insert_select_executor.h"
#include "distributed/listutils.h"
#include "distributed/merge_planner.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_explain.h"
#include "distributed/multi_logical_optimizer.h"
@ -244,9 +245,8 @@ NonPushableInsertSelectExplainScan(CustomScanState *node, List *ancestors,
*/
Query *queryCopy = copyObject(selectRte->subquery);
bool repartition = distributedPlan->modifyWithSelectMethod ==
MODIFY_WITH_SELECT_REPARTITION;
bool repartition =
distributedPlan->modifyWithSelectMethod == MODIFY_WITH_SELECT_REPARTITION;
if (es->analyze)
{
@ -282,6 +282,67 @@ NonPushableInsertSelectExplainScan(CustomScanState *node, List *ancestors,
}
/*
* NonPushableMergeSqlExplainScan is a custom scan explain callback function
* which is used to print explain information of a Citus plan for MERGE INTO
* distributed_table USING (source query/table), where source can be any query
* whose results are repartitioned to colocated with the target table.
*/
void
NonPushableMergeCommandExplainScan(CustomScanState *node, List *ancestors,
struct ExplainState *es)
{
CitusScanState *scanState = (CitusScanState *) node;
DistributedPlan *distributedPlan = scanState->distributedPlan;
Query *mergeQuery = distributedPlan->modifyQueryViaCoordinatorOrRepartition;
RangeTblEntry *sourceRte = ExtractMergeSourceRangeTableEntry(mergeQuery);
/*
* Create a copy because ExplainOneQuery can modify the query, and later
* executions of prepared statements might require it. See
* https://github.com/citusdata/citus/issues/3947 for what can happen.
*/
Query *sourceQueryCopy = copyObject(sourceRte->subquery);
bool repartition =
distributedPlan->modifyWithSelectMethod == MODIFY_WITH_SELECT_REPARTITION;
if (es->analyze)
{
ereport(ERROR, (errmsg("EXPLAIN ANALYZE is currently not supported for "
"MERGE INTO ... commands with repartitioning")));
}
Oid targetRelationId = ModifyQueryResultRelationId(mergeQuery);
StringInfo mergeMethodMessage = makeStringInfo();
appendStringInfo(mergeMethodMessage,
"MERGE INTO %s method", get_rel_name(targetRelationId));
if (repartition)
{
ExplainPropertyText(mergeMethodMessage->data, "repartition", es);
}
else
{
ExplainPropertyText(mergeMethodMessage->data, "pull to coordinator", es);
}
ExplainOpenGroup("Source Query", "Source Query", false, es);
/* explain the MERGE source query */
IntoClause *into = NULL;
ParamListInfo params = NULL;
/*
* With PG14, we need to provide a string here, for now we put an empty
* string, which is valid according to postgres.
*/
char *queryString = pstrdup("");
ExplainOneQuery(sourceQueryCopy, 0, into, es, queryString, params, NULL);
ExplainCloseGroup("Source Query", "Source Query", false, es);
}
/*
* ExplainSubPlans generates EXPLAIN output for subplans for CTEs
* and complex subqueries. Because the planning for these queries

View File

@ -81,8 +81,6 @@ static JoinOrderNode * CartesianProductReferenceJoin(JoinOrderNode *joinNode,
JoinType joinType);
static JoinOrderNode * LocalJoin(JoinOrderNode *joinNode, TableEntry *candidateTable,
List *applicableJoinClauses, JoinType joinType);
static bool JoinOnColumns(List *currentPartitionColumnList, Var *candidatePartitionColumn,
List *joinClauseList);
static JoinOrderNode * SinglePartitionJoin(JoinOrderNode *joinNode,
TableEntry *candidateTable,
List *applicableJoinClauses,
@ -212,7 +210,7 @@ ExtractLeftMostRangeTableIndex(Node *node, int *rangeTableIndex)
/*
* JoinOnColumns determines whether two columns are joined by a given join clause list.
*/
static bool
bool
JoinOnColumns(List *currentPartitionColumnList, Var *candidateColumn,
List *joinClauseList)
{

View File

@ -388,6 +388,26 @@ AddPartitionKeyNotNullFilterToSelect(Query *subqery)
}
/*
* ExtractSourceResultRangeTableEntry Generic wrapper for modification commands that
* utilizes results as input, based on an source query.
*/
RangeTblEntry *
ExtractSourceResultRangeTableEntry(Query *query)
{
if (IsMergeQuery(query))
{
return ExtractMergeSourceRangeTableEntry(query);
}
else if (CheckInsertSelectQuery(query))
{
return ExtractSelectRangeTableEntry(query);
}
return NULL;
}
/*
* ExtractSelectRangeTableEntry returns the range table entry of the subquery.
* Note that the function expects and asserts that the input query be
@ -1863,19 +1883,7 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
if (*planningError)
{
/*
* For MERGE, we do _not_ plan any other router job than the MERGE job itself,
* let's not continue further down the lane in distributed planning, simply
* bail out.
*/
if (IsMergeQuery(originalQuery))
{
RaiseDeferredError(*planningError, ERROR);
}
else
{
return NULL;
}
return NULL;
}
Job *job = CreateJob(originalQuery);
@ -2366,14 +2374,7 @@ PlanRouterQuery(Query *originalQuery,
Assert(UpdateOrDeleteOrMergeQuery(originalQuery));
if (IsMergeQuery(originalQuery))
{
targetRelationId = ModifyQueryResultRelationId(originalQuery);
planningError = MergeQuerySupported(targetRelationId, originalQuery,
isMultiShardQuery,
plannerRestrictionContext);
}
else
if (!IsMergeQuery(originalQuery))
{
planningError = ModifyQuerySupported(originalQuery, originalQuery,
isMultiShardQuery,

View File

@ -188,7 +188,6 @@ static Query * BuildReadIntermediateResultsQuery(List *targetEntryList,
List *columnAliasList,
Const *resultIdConst, Oid functionOid,
bool useBinaryCopyFormat);
static void UpdateVarNosInNode(Node *node, Index newVarNo);
static Query * CreateOuterSubquery(RangeTblEntry *rangeTableEntry,
List *outerSubqueryTargetList);
static List * GenerateRequiredColNamesFromTargetList(List *targetList);
@ -1891,7 +1890,7 @@ GenerateRequiredColNamesFromTargetList(List *targetList)
* UpdateVarNosInNode iterates the Vars in the
* given node and updates the varno's as the newVarNo.
*/
static void
void
UpdateVarNosInNode(Node *node, Index newVarNo)
{
List *varList = pull_var_clause(node, PVC_RECURSE_AGGREGATES |

View File

@ -34,6 +34,7 @@ typedef struct CitusScanState
extern CustomScanMethods AdaptiveExecutorCustomScanMethods;
extern CustomScanMethods NonPushableInsertSelectCustomScanMethods;
extern CustomScanMethods DelayedErrorCustomScanMethods;
extern CustomScanMethods NonPushableMergeCommandCustomScanMethods;
extern void RegisterCitusCustomScanMethods(void);

View File

@ -152,6 +152,12 @@ typedef struct CitusCopyDestReceiver
* upfront.
*/
uint64 appendShardId;
/*
* When copying to intermediate files, we can skip coercions and run them
* when merging into the target tables.
*/
bool skipCoercions;
} CitusCopyDestReceiver;

View File

@ -18,6 +18,7 @@
extern TupleTableSlot * NonPushableInsertSelectExecScan(CustomScanState *node);
extern List * BuildColumnNameListFromTargetList(Oid targetRelationId,
List *insertTargetList);
#endif /* INSERT_SELECT_EXECUTOR_H */

View File

@ -44,6 +44,7 @@ extern DistributedPlan * CreateInsertSelectIntoLocalTablePlan(uint64 planId,
plannerRestrictionContext);
extern char * InsertSelectResultIdPrefix(uint64 planId);
extern bool PlanningInsertSelect(void);
extern Query * WrapSubquery(Query *subquery);
#endif /* INSERT_SELECT_PLANNER_H */

View File

@ -33,5 +33,6 @@ extern void RecursivelyPlanLocalTableJoins(Query *query,
extern List * RequiredAttrNumbersForRelation(RangeTblEntry *relationRte,
PlannerRestrictionContext *
plannerRestrictionContext);
extern List * RequiredAttrNumbersForRelationInternal(Query *queryToProcess, int rteIndex);
#endif /* LOCAL_DISTRIBUTED_JOIN_PLANNER_H */

View File

@ -0,0 +1,17 @@
/*-------------------------------------------------------------------------
*
* merge_executor.h
*
* Declarations for public functions and types related to executing
* MERGE INTO ... SQL commands.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef MERGE_EXECUTOR_H
#define MERGE_EXECUTOR_H
extern TupleTableSlot * NonPushableMergeCommandExecScan(CustomScanState *node);
#endif /* MERGE_EXECUTOR_H */

View File

@ -19,16 +19,18 @@
#include "distributed/errormessage.h"
#include "distributed/multi_physical_planner.h"
extern DeferredErrorMessage * MergeQuerySupported(Oid resultRelationId,
Query *originalQuery,
bool multiShardQuery,
PlannerRestrictionContext *
plannerRestrictionContext);
extern DistributedPlan * CreateMergePlan(Query *originalQuery, Query *query,
extern DistributedPlan * CreateMergePlan(uint64 planId, Query *originalQuery,
Query *query,
PlannerRestrictionContext *
plannerRestrictionContext);
plannerRestrictionContext,
ParamListInfo boundParams);
extern bool IsLocalTableModification(Oid targetRelationId, Query *query,
uint64 shardId,
RTEListProperties *rteProperties);
extern void NonPushableMergeCommandExplainScan(CustomScanState *node, List *ancestors,
struct ExplainState *es);
extern Var * FetchAndValidateInsertVarIfExists(Oid targetRelationId, Query *query);
extern RangeTblEntry * ExtractMergeSourceRangeTableEntry(Query *query);
#endif /* MERGE_PLANNER_H */

View File

@ -114,6 +114,9 @@ typedef struct ExecutionParams
/* isUtilityCommand is true if the current execution is for a utility
* command such as a DDL command.*/
bool isUtilityCommand;
/* pass bind parameters to the distributed executor for parameterized plans */
ParamListInfo paramListInfo;
} ExecutionParams;
ExecutionParams * CreateBasicExecutionParams(RowModifyLevel modLevel,
@ -122,6 +125,11 @@ ExecutionParams * CreateBasicExecutionParams(RowModifyLevel modLevel,
bool localExecutionSupported);
extern uint64 ExecuteTaskListExtended(ExecutionParams *executionParams);
extern uint64 ExecuteTaskListIntoTupleDestWithParam(RowModifyLevel modLevel,
List *taskList,
TupleDestination *tupleDest,
bool expectResults,
ParamListInfo paramListInfo);
extern uint64 ExecuteTaskListIntoTupleDest(RowModifyLevel modLevel, List *taskList,
TupleDestination *tupleDest,
bool expectResults);

View File

@ -108,6 +108,8 @@ extern Var * DistPartitionKey(Oid relationId);
extern Var * DistPartitionKeyOrError(Oid relationId);
extern char PartitionMethod(Oid relationId);
extern char TableReplicationModel(Oid relationId);
extern bool JoinOnColumns(List *currentPartitionColumnList, Var *candidatePartitionColumn,
List *joinClauseList);
#endif /* MULTI_JOIN_ORDER_H */

View File

@ -463,6 +463,13 @@ typedef struct DistributedPlan
* or if prepared statement parameters prevented successful planning.
*/
DeferredErrorMessage *planningError;
/*
* When performing query execution scenarios that require repartitioning
* the source rows, this field stores the index of the column in the list
* of source rows to be repartitioned for colocation with the target.
*/
int sourceResultRepartitionColumnIndex;
} DistributedPlan;

View File

@ -118,5 +118,6 @@ extern Job * RouterJob(Query *originalQuery,
PlannerRestrictionContext *plannerRestrictionContext,
DeferredErrorMessage **planningError);
extern bool ContainsOnlyLocalTables(RTEListProperties *rteProperties);
extern RangeTblEntry * ExtractSourceResultRangeTableEntry(Query *query);
#endif /* MULTI_ROUTER_PLANNER_H */

View File

@ -29,7 +29,8 @@ typedef enum
{
MULTI_EXECUTOR_INVALID_FIRST = 0,
MULTI_EXECUTOR_ADAPTIVE = 1,
MULTI_EXECUTOR_NON_PUSHABLE_INSERT_SELECT = 2
MULTI_EXECUTOR_NON_PUSHABLE_INSERT_SELECT = 2,
MULTI_EXECUTOR_NON_PUSHABLE_MERGE_QUERY = 3
} MultiExecutorType;

View File

@ -46,6 +46,7 @@ extern void ReplaceRTERelationWithRteSubquery(RangeTblEntry *rangeTableEntry,
extern bool IsRecursivelyPlannableRelation(RangeTblEntry *rangeTableEntry);
extern bool IsRelationLocalTableOrMatView(Oid relationId);
extern bool ContainsReferencesToOuterQuery(Query *query);
extern void UpdateVarNosInNode(Node *node, Index newVarNo);
#endif /* RECURSIVE_PLANNING_H */

View File

@ -15,7 +15,7 @@
extern bool EnableRepartitionedInsertSelect;
extern int DistributionColumnIndex(List *insertTargetList, Var *partitionColumn);
extern int DistributionColumnIndex(List *insertTargetList, Var *distributionColumn);
extern List * GenerateTaskListWithColocatedIntermediateResults(Oid targetRelationId,
Query *
modifyQueryViaCoordinatorOrRepartition,

View File

@ -159,6 +159,8 @@ s/Subplan [0-9]+\_/Subplan XXX\_/g
# Plan numbers in insert select
s/read_intermediate_result\('insert_select_[0-9]+_/read_intermediate_result('insert_select_XXX_/g
# Plan numbers in merge into
s/read_intermediate_result\('merge_into_[0-9]+_/read_intermediate_result('merge_into_XXX_/g
# ignore job id in repartitioned insert/select
s/repartitioned_results_[0-9]+/repartitioned_results_xxxxx/g

View File

@ -220,6 +220,7 @@ class AllSingleShardTableDefaultConfig(CitusDefaultClusterConfig):
# "dist_query_single_shard" table acts differently when the table
# has a single shard. This is explained with a comment in the test.
"nested_execution",
"merge_arbitrary",
]

File diff suppressed because it is too large Load Diff

View File

@ -148,3 +148,51 @@ SELECT * FROM t1 order by id;
(5 rows)
ROLLBACK;
-- Test prepared statements with repartition
PREPARE merge_repartition_pg(int,int,int,int) as
MERGE INTO pg_target target
USING (SELECT id+1+$1 as key, val FROM (SELECT * FROM pg_source UNION SELECT * FROM pg_source WHERE id = $2) as foo) as source
ON (source.key = target.id AND $3 < 10000)
WHEN MATCHED THEN UPDATE SET val = (source.key::int+$4)
WHEN NOT MATCHED THEN INSERT VALUES (source.key, source.val);
PREPARE merge_repartition_citus(int,int,int,int) as
MERGE INTO citus_target target
USING (SELECT id+1+$1 as key, val FROM (SELECT * FROM citus_source UNION SELECT * FROM citus_source WHERE id = $2) as foo) as source
ON (source.key = target.id AND $3 < 10000)
WHEN MATCHED THEN UPDATE SET val = (source.key::int+$4)
WHEN NOT MATCHED THEN INSERT VALUES (source.key, source.val);
EXECUTE merge_repartition_pg(1,1,1,1);
EXECUTE merge_repartition_citus(1,1,1,1);
SET client_min_messages = NOTICE;
SELECT compare_data();
NOTICE: The average of pg_target.id is equal to citus_target.id
NOTICE: The average of pg_target.val is equal to citus_target.val
compare_data
---------------------------------------------------------------------
(1 row)
RESET client_min_messages;
EXECUTE merge_repartition_pg(1,100,1,1);
EXECUTE merge_repartition_citus(1,100,1,1);
EXECUTE merge_repartition_pg(2,200,1,1);
EXECUTE merge_repartition_citus(2,200,1,1);
EXECUTE merge_repartition_pg(3,300,1,1);
EXECUTE merge_repartition_citus(3,300,1,1);
EXECUTE merge_repartition_pg(4,400,1,1);
EXECUTE merge_repartition_citus(4,400,1,1);
EXECUTE merge_repartition_pg(5,500,1,1);
EXECUTE merge_repartition_citus(5,500,1,1);
-- Sixth time
EXECUTE merge_repartition_pg(6,600,1,6);
EXECUTE merge_repartition_citus(6,600,1,6);
SET client_min_messages = NOTICE;
SELECT compare_data();
NOTICE: The average of pg_target.id is equal to citus_target.id
NOTICE: The average of pg_target.val is equal to citus_target.val
compare_data
---------------------------------------------------------------------
(1 row)
RESET client_min_messages;

View File

@ -70,3 +70,77 @@ SELECT citus_add_local_table_to_metadata('s1');
(1 row)
-- Test prepared statements with repartition
CREATE TABLE pg_target(id int, val int);
CREATE TABLE pg_source(id int, val int, const int);
CREATE TABLE citus_target(id int, val int);
CREATE TABLE citus_source(id int, val int, const int);
SELECT citus_add_local_table_to_metadata('pg_target');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
SELECT citus_add_local_table_to_metadata('pg_source');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
--
-- Load same set of data to both Postgres and Citus tables
--
CREATE OR REPLACE FUNCTION setup_data() RETURNS VOID AS $$
INSERT INTO pg_source SELECT i, i+1, 1 FROM generate_series(1, 10000) i;
INSERT INTO pg_target SELECT i, 1 FROM generate_series(5001, 10000) i;
INSERT INTO citus_source SELECT i, i+1, 1 FROM generate_series(1, 10000) i;
INSERT INTO citus_target SELECT i, 1 FROM generate_series(5001, 10000) i;
$$
LANGUAGE SQL;
--
-- Compares the final target tables, merge-modified data, of both Postgres and Citus tables
--
CREATE OR REPLACE FUNCTION check_data(table1_name text, column1_name text, table2_name text, column2_name text)
RETURNS VOID AS $$
DECLARE
table1_avg numeric;
table2_avg numeric;
BEGIN
EXECUTE format('SELECT COALESCE(AVG(%I), 0) FROM %I', column1_name, table1_name) INTO table1_avg;
EXECUTE format('SELECT COALESCE(AVG(%I), 0) FROM %I', column2_name, table2_name) INTO table2_avg;
IF table1_avg > table2_avg THEN
RAISE EXCEPTION 'The average of %.% is greater than %.%', table1_name, column1_name, table2_name, column2_name;
ELSIF table1_avg < table2_avg THEN
RAISE EXCEPTION 'The average of %.% is less than %.%', table1_name, column1_name, table2_name, column2_name;
ELSE
RAISE NOTICE 'The average of %.% is equal to %.%', table1_name, column1_name, table2_name, column2_name;
END IF;
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION compare_data() RETURNS VOID AS $$
SELECT check_data('pg_target', 'id', 'citus_target', 'id');
SELECT check_data('pg_target', 'val', 'citus_target', 'val');
$$
LANGUAGE SQL;
--
-- Target and source are distributed, and non-colocated
--
SELECT setup_data();
setup_data
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('citus_target', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('citus_source', 'id', colocate_with=>'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)

View File

@ -0,0 +1,230 @@
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
\gset
\if :server_version_ge_15
\else
\q
\endif
-- We create two sets of source and target tables, one set in Postgres and
-- the other in Citus distributed. We run the _exact_ MERGE SQL on both sets
-- and compare the final results of the target tables in Postgres and Citus.
-- The results should match. This process is repeated for various combinations
-- of MERGE SQL.
DROP SCHEMA IF EXISTS merge_partition_tables CASCADE;
NOTICE: schema "merge_partition_tables" does not exist, skipping
CREATE SCHEMA merge_partition_tables;
SET search_path TO merge_partition_tables;
SET citus.shard_count TO 4;
SET citus.next_shard_id TO 7000000;
SET citus.explain_all_tasks TO true;
SET citus.shard_replication_factor TO 1;
SET citus.max_adaptive_executor_pool_size TO 1;
SET client_min_messages = warning;
SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0);
?column?
---------------------------------------------------------------------
1
(1 row)
RESET client_min_messages;
CREATE TABLE pg_target(id int, val int) PARTITION BY RANGE(id);
CREATE TABLE pg_source(id int, val int, const int) PARTITION BY RANGE(val);
CREATE TABLE citus_target(id int, val int) PARTITION BY RANGE(id);
CREATE TABLE citus_source(id int, val int, const int) PARTITION BY RANGE(val);
SELECT citus_add_local_table_to_metadata('citus_target');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
SELECT citus_add_local_table_to_metadata('citus_source');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
CREATE TABLE part1 PARTITION OF pg_target FOR VALUES FROM (1) TO (2500) WITH (autovacuum_enabled=off);
CREATE TABLE part2 PARTITION OF pg_target FOR VALUES FROM (2501) TO (5000) WITH (autovacuum_enabled=off);
CREATE TABLE part3 PARTITION OF pg_target FOR VALUES FROM (5001) TO (7500) WITH (autovacuum_enabled=off);
CREATE TABLE part4 PARTITION OF pg_target DEFAULT WITH (autovacuum_enabled=off);
CREATE TABLE part5 PARTITION OF citus_target FOR VALUES FROM (1) TO (2500) WITH (autovacuum_enabled=off);
CREATE TABLE part6 PARTITION OF citus_target FOR VALUES FROM (2501) TO (5000) WITH (autovacuum_enabled=off);
CREATE TABLE part7 PARTITION OF citus_target FOR VALUES FROM (5001) TO (7500) WITH (autovacuum_enabled=off);
CREATE TABLE part8 PARTITION OF citus_target DEFAULT WITH (autovacuum_enabled=off);
CREATE TABLE part9 PARTITION OF pg_source FOR VALUES FROM (1) TO (2500) WITH (autovacuum_enabled=off);
CREATE TABLE part10 PARTITION OF pg_source FOR VALUES FROM (2501) TO (5000) WITH (autovacuum_enabled=off);
CREATE TABLE part11 PARTITION OF pg_source FOR VALUES FROM (5001) TO (7500) WITH (autovacuum_enabled=off);
CREATE TABLE part12 PARTITION OF pg_source DEFAULT WITH (autovacuum_enabled=off);
CREATE TABLE part13 PARTITION OF citus_source FOR VALUES FROM (1) TO (2500) WITH (autovacuum_enabled=off);
CREATE TABLE part14 PARTITION OF citus_source FOR VALUES FROM (2501) TO (5000) WITH (autovacuum_enabled=off);
CREATE TABLE part15 PARTITION OF citus_source FOR VALUES FROM (5001) TO (7500) WITH (autovacuum_enabled=off);
CREATE TABLE part16 PARTITION OF citus_source DEFAULT WITH (autovacuum_enabled=off);
CREATE OR REPLACE FUNCTION cleanup_data() RETURNS VOID SET search_path TO merge_partition_tables AS $$
TRUNCATE pg_target;
TRUNCATE pg_source;
TRUNCATE citus_target;
TRUNCATE citus_source;
SELECT undistribute_table('citus_target');
SELECT undistribute_table('citus_source');
$$
LANGUAGE SQL;
--
-- Load same set of data to both Postgres and Citus tables
--
CREATE OR REPLACE FUNCTION setup_data() RETURNS VOID SET search_path TO merge_partition_tables AS $$
INSERT INTO pg_source SELECT i, i+1, 1 FROM generate_series(1, 10000) i;
INSERT INTO pg_target SELECT i, 1 FROM generate_series(5001, 10000) i;
INSERT INTO citus_source SELECT i, i+1, 1 FROM generate_series(1, 10000) i;
INSERT INTO citus_target SELECT i, 1 FROM generate_series(5001, 10000) i;
$$
LANGUAGE SQL;
--
-- Compares the final target tables, merge-modified data, of both Postgres and Citus tables
--
CREATE OR REPLACE FUNCTION check_data(table1_name text, column1_name text, table2_name text, column2_name text)
RETURNS VOID SET search_path TO merge_partition_tables AS $$
DECLARE
table1_avg numeric;
table2_avg numeric;
BEGIN
EXECUTE format('SELECT COALESCE(AVG(%I), 0) FROM %I', column1_name, table1_name) INTO table1_avg;
EXECUTE format('SELECT COALESCE(AVG(%I), 0) FROM %I', column2_name, table2_name) INTO table2_avg;
IF table1_avg > table2_avg THEN
RAISE EXCEPTION 'The average of %.% is greater than %.%', table1_name, column1_name, table2_name, column2_name;
ELSIF table1_avg < table2_avg THEN
RAISE EXCEPTION 'The average of %.% is less than %.%', table1_name, column1_name, table2_name, column2_name;
ELSE
RAISE NOTICE 'The average of %.% is equal to %.%', table1_name, column1_name, table2_name, column2_name;
END IF;
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION compare_data() RETURNS VOID SET search_path TO merge_partition_tables AS $$
SELECT check_data('pg_target', 'id', 'citus_target', 'id');
SELECT check_data('pg_target', 'val', 'citus_target', 'val');
$$
LANGUAGE SQL;
-- Test colocated partition tables
SET client_min_messages = ERROR;
SELECT cleanup_data();
cleanup_data
---------------------------------------------------------------------
(1 row)
SELECT setup_data();
setup_data
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('citus_target', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('citus_source', 'id', colocate_with=>'citus_target');
create_distributed_table
---------------------------------------------------------------------
(1 row)
RESET client_min_messages;
MERGE INTO pg_target t
USING pg_source s
ON t.id = s.id
WHEN MATCHED AND t.id <= 7500 THEN
UPDATE SET val = s.val + 1
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES(s.id, s.val);
MERGE INTO citus_target t
USING citus_source s
ON t.id = s.id
WHEN MATCHED AND t.id <= 7500 THEN
UPDATE SET val = s.val + 1
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES(s.id, s.val);
SELECT compare_data();
NOTICE: The average of pg_target.id is equal to citus_target.id
CONTEXT: PL/pgSQL function check_data(text,text,text,text) line XX at RAISE
SQL function "compare_data" statement 1
NOTICE: The average of pg_target.val is equal to citus_target.val
CONTEXT: PL/pgSQL function check_data(text,text,text,text) line XX at RAISE
SQL function "compare_data" statement 2
compare_data
---------------------------------------------------------------------
(1 row)
-- Test non-colocated partition tables
SET client_min_messages = ERROR;
SELECT cleanup_data();
cleanup_data
---------------------------------------------------------------------
(1 row)
SELECT setup_data();
setup_data
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('citus_target', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('citus_source', 'id', colocate_with=>'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
RESET client_min_messages;
MERGE INTO pg_target t
USING pg_source s
ON t.id = s.id
WHEN MATCHED AND t.id <= 7500 THEN
UPDATE SET val = s.val + 1
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES(s.id, s.val);
MERGE INTO citus_target t
USING citus_source s
ON t.id = s.id
WHEN MATCHED AND t.id <= 7500 THEN
UPDATE SET val = s.val + 1
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES(s.id, s.val);
SELECT compare_data();
NOTICE: The average of pg_target.id is equal to citus_target.id
CONTEXT: PL/pgSQL function check_data(text,text,text,text) line XX at RAISE
SQL function "compare_data" statement 1
NOTICE: The average of pg_target.val is equal to citus_target.val
CONTEXT: PL/pgSQL function check_data(text,text,text,text) line XX at RAISE
SQL function "compare_data" statement 2
compare_data
---------------------------------------------------------------------
(1 row)
DROP SCHEMA merge_partition_tables CASCADE;
NOTICE: drop cascades to 8 other objects
DETAIL: drop cascades to table pg_target
drop cascades to table pg_source
drop cascades to function cleanup_data()
drop cascades to function setup_data()
drop cascades to function check_data(text,text,text,text)
drop cascades to function compare_data()
drop cascades to table citus_target
drop cascades to table citus_source

View File

@ -0,0 +1,6 @@
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
\gset
\if :server_version_ge_15
\else
\q

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,6 @@
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
\gset
\if :server_version_ge_15
\else
\q

View File

@ -0,0 +1,212 @@
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
\gset
\if :server_version_ge_15
\else
\q
\endif
-- We create two sets of source and target tables, one set in Postgres and
-- the other in Citus distributed. We run the _exact_ MERGE SQL on both sets
-- and compare the final results of the target tables in Postgres and Citus.
-- The results should match. This process is repeated for various combinations
-- of MERGE SQL.
DROP SCHEMA IF EXISTS merge_repartition2_schema CASCADE;
NOTICE: schema "merge_repartition2_schema" does not exist, skipping
CREATE SCHEMA merge_repartition2_schema;
SET search_path TO merge_repartition2_schema;
SET citus.shard_count TO 4;
SET citus.next_shard_id TO 6000000;
SET citus.explain_all_tasks TO true;
SET citus.shard_replication_factor TO 1;
SET citus.max_adaptive_executor_pool_size TO 1;
SET client_min_messages = warning;
SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0);
?column?
---------------------------------------------------------------------
1
(1 row)
RESET client_min_messages;
CREATE TABLE pg_target(id int, val int);
CREATE TABLE pg_source(id int, val int, const int);
CREATE TABLE citus_target(id int, val int);
CREATE TABLE citus_source(id int, val int, const int);
SELECT citus_add_local_table_to_metadata('citus_target');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
SELECT citus_add_local_table_to_metadata('citus_source');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
CREATE OR REPLACE FUNCTION cleanup_data() RETURNS VOID SET search_path TO merge_repartition2_schema AS $$
TRUNCATE pg_target;
TRUNCATE pg_source;
TRUNCATE citus_target;
TRUNCATE citus_source;
SELECT undistribute_table('citus_target');
SELECT undistribute_table('citus_source');
$$
LANGUAGE SQL;
--
-- Load same set of data to both Postgres and Citus tables
--
CREATE OR REPLACE FUNCTION setup_data() RETURNS VOID SET search_path TO merge_repartition2_schema AS $$
INSERT INTO pg_source SELECT i, i+1, 1 FROM generate_series(1, 100000) i;
INSERT INTO pg_target SELECT i, 1 FROM generate_series(50001, 100000) i;
INSERT INTO citus_source SELECT i, i+1, 1 FROM generate_series(1, 100000) i;
INSERT INTO citus_target SELECT i, 1 FROM generate_series(50001, 100000) i;
$$
LANGUAGE SQL;
--
-- Compares the final target tables, merge-modified data, of both Postgres and Citus tables
--
CREATE OR REPLACE FUNCTION check_data(table1_name text, column1_name text, table2_name text, column2_name text)
RETURNS VOID SET search_path TO merge_repartition2_schema AS $$
DECLARE
table1_avg numeric;
table2_avg numeric;
BEGIN
EXECUTE format('SELECT COALESCE(AVG(%I), 0) FROM %I', column1_name, table1_name) INTO table1_avg;
EXECUTE format('SELECT COALESCE(AVG(%I), 0) FROM %I', column2_name, table2_name) INTO table2_avg;
IF table1_avg > table2_avg THEN
RAISE EXCEPTION 'The average of %.% is greater than %.%', table1_name, column1_name, table2_name, column2_name;
ELSIF table1_avg < table2_avg THEN
RAISE EXCEPTION 'The average of %.% is less than %.%', table1_name, column1_name, table2_name, column2_name;
ELSE
RAISE NOTICE 'The average of %.% is equal to %.%', table1_name, column1_name, table2_name, column2_name;
END IF;
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION compare_data() RETURNS VOID SET search_path TO merge_repartition2_schema AS $$
SELECT check_data('pg_target', 'id', 'citus_target', 'id');
SELECT check_data('pg_target', 'val', 'citus_target', 'val');
$$
LANGUAGE SQL;
-- Test nested cte
SELECT cleanup_data();
NOTICE: creating a new table for merge_repartition2_schema.citus_target
CONTEXT: SQL function "cleanup_data" statement 5
NOTICE: moving the data of merge_repartition2_schema.citus_target
CONTEXT: SQL function "cleanup_data" statement 5
NOTICE: dropping the old merge_repartition2_schema.citus_target
CONTEXT: SQL function "cleanup_data" statement 5
NOTICE: renaming the new table to merge_repartition2_schema.citus_target
CONTEXT: SQL function "cleanup_data" statement 5
NOTICE: creating a new table for merge_repartition2_schema.citus_source
CONTEXT: SQL function "cleanup_data" statement 6
NOTICE: moving the data of merge_repartition2_schema.citus_source
CONTEXT: SQL function "cleanup_data" statement 6
NOTICE: dropping the old merge_repartition2_schema.citus_source
CONTEXT: SQL function "cleanup_data" statement 6
NOTICE: renaming the new table to merge_repartition2_schema.citus_source
CONTEXT: SQL function "cleanup_data" statement 6
cleanup_data
---------------------------------------------------------------------
(1 row)
SELECT setup_data();
setup_data
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('citus_target', 'id');
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_repartition2_schema.citus_target$$)
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('citus_source', 'id', colocate_with=>'none');
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$merge_repartition2_schema.citus_source$$)
create_distributed_table
---------------------------------------------------------------------
(1 row)
WITH cte_top AS(WITH cte_1 AS (WITH cte_2 AS (SELECT id, val FROM pg_source) SELECT * FROM cte_2) SELECT * FROM cte_1)
MERGE INTO pg_target t
USING (SELECT const, val, id FROM pg_source WHERE id IN (SELECT id FROM cte_top)) as s
ON (s.id = t.id)
WHEN MATCHED AND t.id <= 75000 THEN
UPDATE SET val = (s.val::int8+1)
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES (s.id, s.val);
WITH cte_top AS(WITH cte_1 AS (WITH cte_2 AS (SELECT id, val FROM citus_source) SELECT * FROM cte_2) SELECT * FROM cte_1)
MERGE INTO citus_target t
USING (SELECT const, val, id FROM citus_source WHERE id IN (SELECT id FROM cte_top)) as s
ON (s.id = t.id)
WHEN MATCHED AND t.id <= 75000 THEN
UPDATE SET val = (s.val::int8+1)
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES (s.id, s.val);
SELECT compare_data();
NOTICE: The average of pg_target.id is equal to citus_target.id
CONTEXT: PL/pgSQL function check_data(text,text,text,text) line XX at RAISE
SQL function "compare_data" statement 1
NOTICE: The average of pg_target.val is equal to citus_target.val
CONTEXT: PL/pgSQL function check_data(text,text,text,text) line XX at RAISE
SQL function "compare_data" statement 2
compare_data
---------------------------------------------------------------------
(1 row)
-- Test aggregate function in source query
MERGE INTO pg_target t
USING (SELECT count(id+1)::text as value, val as key FROM pg_source group by key) s
ON t.id = s.key
WHEN MATCHED AND t.id <= 75000 THEN
UPDATE SET val = (s.value::int8+1)
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES(s.key, value::int4+10);
MERGE INTO citus_target t
USING (SELECT count(id+1)::text as value, val as key FROM citus_source group by key) s
ON t.id = s.key
WHEN MATCHED AND t.id <= 75000 THEN
UPDATE SET val = (s.value::int8+1)
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES(s.key, value::int4+10);
SELECT compare_data();
NOTICE: The average of pg_target.id is equal to citus_target.id
CONTEXT: PL/pgSQL function check_data(text,text,text,text) line XX at RAISE
SQL function "compare_data" statement 1
NOTICE: The average of pg_target.val is equal to citus_target.val
CONTEXT: PL/pgSQL function check_data(text,text,text,text) line XX at RAISE
SQL function "compare_data" statement 2
compare_data
---------------------------------------------------------------------
(1 row)
DROP SCHEMA merge_repartition2_schema CASCADE;
NOTICE: drop cascades to 8 other objects
DETAIL: drop cascades to table pg_target
drop cascades to table pg_source
drop cascades to function cleanup_data()
drop cascades to function setup_data()
drop cascades to function check_data(text,text,text,text)
drop cascades to function compare_data()
drop cascades to table citus_target
drop cascades to table citus_source

View File

@ -0,0 +1,6 @@
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
\gset
\if :server_version_ge_15
\else
\q

View File

@ -406,14 +406,16 @@ SELECT create_distributed_table('tbl2', 'x');
MERGE INTO tbl1 USING tbl2 ON (true)
WHEN MATCHED THEN DELETE;
ERROR: MERGE command is only supported when all distributed tables are co-located and joined on their distribution columns
-- also, not inside subqueries & ctes
ERROR: The required join operation is missing between the target's distribution column and any expression originating from the source. The issue may arise from either a non-equi-join or a mismatch in the datatypes of the columns being joined.
DETAIL: Without a equi-join condition on the target's distribution column, the source rows cannot be efficiently redistributed, and the NOT-MATCHED condition cannot be evaluated unambiguously. This can result in incorrect or unexpected results when attempting to merge tables in a distributed setting
-- also, inside subqueries & ctes
WITH targq AS (
SELECT * FROM tbl2
)
MERGE INTO tbl1 USING targq ON (true)
WHEN MATCHED THEN DELETE;
ERROR: MERGE command is only supported when all distributed tables are co-located and joined on their distribution columns
ERROR: The required join operation is missing between the target's distribution column and any expression originating from the source. The issue may arise from either a non-equi-join or a mismatch in the datatypes of the columns being joined.
DETAIL: Without a equi-join condition on the target's distribution column, the source rows cannot be efficiently redistributed, and the NOT-MATCHED condition cannot be evaluated unambiguously. This can result in incorrect or unexpected results when attempting to merge tables in a distributed setting
WITH foo AS (
MERGE INTO tbl1 USING tbl2 ON (true)
WHEN MATCHED THEN DELETE
@ -429,7 +431,8 @@ USING tbl2
ON (true)
WHEN MATCHED THEN
DO NOTHING;
ERROR: MERGE command is only supported when all distributed tables are co-located and joined on their distribution columns
ERROR: The required join operation is missing between the target's distribution column and any expression originating from the source. The issue may arise from either a non-equi-join or a mismatch in the datatypes of the columns being joined.
DETAIL: Without a equi-join condition on the target's distribution column, the source rows cannot be efficiently redistributed, and the NOT-MATCHED condition cannot be evaluated unambiguously. This can result in incorrect or unexpected results when attempting to merge tables in a distributed setting
MERGE INTO tbl1 t
USING tbl2
ON (true)

View File

@ -15,6 +15,14 @@ SET search_path TO pgmerge_schema;
SET citus.use_citus_managed_tables to true;
\set SHOW_CONTEXT errors
SET citus.next_shard_id TO 4001000;
SET client_min_messages = warning;
SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0);
?column?
---------------------------------------------------------------------
1
(1 row)
RESET client_min_messages;
CREATE USER regress_merge_privs;
CREATE USER regress_merge_no_privs;
DROP TABLE IF EXISTS target;

View File

@ -111,8 +111,9 @@ test: background_task_queue_monitor
test: clock
# MERGE tests
test: merge
test: pgmerge
test: merge pgmerge merge_repartition2
test: merge_repartition1
test: merge_partition_tables
# ---------
# test that no tests leaked intermediate results. This should always be last

View File

@ -21,6 +21,9 @@ SET citus.next_shard_id TO 4000000;
SET citus.explain_all_tasks TO true;
SET citus.shard_replication_factor TO 1;
SET citus.max_adaptive_executor_pool_size TO 1;
SET client_min_messages = warning;
SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0);
RESET client_min_messages;
CREATE TABLE source
(
@ -142,7 +145,7 @@ SELECT insert_data();
SELECT undistribute_table('target');
SELECT undistribute_table('source');
SELECT create_distributed_table('target', 'customer_id');
SELECT create_distributed_table('source', 'customer_id');
SELECT create_distributed_table('source', 'customer_id', colocate_with=>'target');
-- Updates one of the row with customer_id = 30002
SELECT * from target t WHERE t.customer_id = 30002;
@ -280,7 +283,7 @@ TRUNCATE t1;
TRUNCATE s1;
SELECT load();
SELECT create_distributed_table('t1', 'id');
SELECT create_distributed_table('s1', 'id');
SELECT create_distributed_table('s1', 'id', colocate_with=>'t1');
SELECT * FROM t1 order by id;
@ -368,7 +371,7 @@ SELECT insert_data();
SELECT undistribute_table('t2');
SELECT undistribute_table('s2');
SELECT create_distributed_table('t2', 'id');
SELECT create_distributed_table('s2', 'id');
SELECT create_distributed_table('s2', 'id', colocate_with => 't2');
SELECT * FROM t2 ORDER BY 1;
SET citus.log_remote_commands to true;
@ -924,27 +927,25 @@ ROLLBACK;
-- Test the same scenarios with distributed tables
SELECT create_distributed_table('target_cj', 'tid');
SELECT create_distributed_table('source_cj1', 'sid1');
SELECT create_distributed_table('source_cj2', 'sid2');
SELECT create_distributed_table('source_cj1', 'sid1', colocate_with => 'target_cj');
SELECT create_distributed_table('source_cj2', 'sid2', colocate_with => 'target_cj');
BEGIN;
SET citus.log_remote_commands to true;
MERGE INTO target_cj t
USING source_cj1 s1 INNER JOIN source_cj2 s2 ON sid1 = sid2
USING (SELECT * FROM source_cj1 s1 INNER JOIN source_cj2 s2 ON sid1 = sid2) s
ON t.tid = sid1 AND t.tid = 2
WHEN MATCHED THEN
UPDATE SET src = src2
WHEN NOT MATCHED THEN
DO NOTHING;
SET citus.log_remote_commands to false;
SELECT * FROM target_cj ORDER BY 1;
ROLLBACK;
BEGIN;
-- try accessing columns from either side of the source join
MERGE INTO target_cj t
USING source_cj1 s2
INNER JOIN source_cj2 s1 ON sid1 = sid2 AND val1 = 10
USING (SELECT * FROM source_cj1 s2
INNER JOIN source_cj2 s1 ON sid1 = sid2 AND val1 = 10) s
ON t.tid = sid1 AND t.tid = 2
WHEN MATCHED THEN
UPDATE SET src = src1, val = val2
@ -982,7 +983,7 @@ ROLLBACK;
-- Test PREPARE
PREPARE foo(int) AS
PREPARE merge_prepare(int) AS
MERGE INTO target_cj target
USING (SELECT * FROM source_cj1) sub
ON target.tid = sub.sid1 AND target.tid = $1
@ -994,11 +995,11 @@ WHEN NOT MATCHED THEN
SELECT * FROM target_cj ORDER BY 1;
BEGIN;
EXECUTE foo(2);
EXECUTE foo(2);
EXECUTE foo(2);
EXECUTE foo(2);
EXECUTE foo(2);
EXECUTE merge_prepare(2);
EXECUTE merge_prepare(2);
EXECUTE merge_prepare(2);
EXECUTE merge_prepare(2);
EXECUTE merge_prepare(2);
SELECT * FROM target_cj ORDER BY 1;
ROLLBACK;
@ -1006,10 +1007,10 @@ BEGIN;
SET citus.log_remote_commands to true;
SET client_min_messages TO DEBUG1;
EXECUTE foo(2);
EXECUTE merge_prepare(2);
RESET client_min_messages;
EXECUTE foo(2);
EXECUTE merge_prepare(2);
SET citus.log_remote_commands to false;
SELECT * FROM target_cj ORDER BY 1;
@ -1036,7 +1037,7 @@ INSERT INTO citus_target SELECT i, 'target' FROM generate_series(250, 500) i;
INSERT INTO citus_source SELECT i, 'source' FROM generate_series(1, 500) i;
SELECT create_distributed_table('citus_target', 'id');
SELECT create_distributed_table('citus_source', 'id');
SELECT create_distributed_table('citus_source', 'id', colocate_with => 'citus_target');
--
-- This routine compares the target tables of Postgres and Citus and
@ -1622,10 +1623,271 @@ SELECT count(*)
FROM pg_result FULL OUTER JOIN local_ref ON pg_result.t1 = local_ref.t1
WHERE pg_result.t1 IS NULL OR local_ref.t1 IS NULL;
-- Now make target as distributed, keep reference as source
TRUNCATE reftarget_local;
TRUNCATE refsource_ref;
INSERT INTO reftarget_local VALUES(1, 0);
INSERT INTO reftarget_local VALUES(3, 100);
INSERT INTO refsource_ref VALUES(1, 1);
INSERT INTO refsource_ref VALUES(2, 2);
INSERT INTO refsource_ref VALUES(3, 3);
SELECT create_distributed_table('reftarget_local', 't1');
MERGE INTO reftarget_local
USING (SELECT * FROM refsource_ref UNION SELECT * FROM refsource_ref) AS foo ON reftarget_local.t1 = foo.s1
WHEN MATCHED AND reftarget_local.t2 = 100 THEN
DELETE
WHEN MATCHED THEN
UPDATE SET t2 = t2 + 100
WHEN NOT MATCHED THEN
INSERT VALUES(foo.s1);
SELECT * INTO dist_reftarget FROM reftarget_local ORDER BY 1, 2;
-- Should be equal
SELECT c.*, p.*
FROM dist_reftarget c, pg_result p
WHERE c.t1 = p.t1
ORDER BY 1,2;
-- Must return zero rows
SELECT count(*)
FROM pg_result FULL OUTER JOIN dist_reftarget ON pg_result.t1 = dist_reftarget.t1
WHERE pg_result.t1 IS NULL OR dist_reftarget.t1 IS NULL;
--
-- Distributed (target), Reference(source)
--
CREATE TABLE demo_distributed(id1 int, val1 int);
CREATE TABLE demo_source_table(id2 int, val2 int);
CREATE FUNCTION setup_demo_data() RETURNS VOID AS $$
INSERT INTO demo_distributed VALUES(1, 100);
INSERT INTO demo_distributed VALUES(7, 100);
INSERT INTO demo_distributed VALUES(15, 100);
INSERT INTO demo_distributed VALUES(100, 0);
INSERT INTO demo_distributed VALUES(300, 100);
INSERT INTO demo_distributed VALUES(400, 0);
INSERT INTO demo_source_table VALUES(1, 77);
INSERT INTO demo_source_table VALUES(15, 77);
INSERT INTO demo_source_table VALUES(75, 77);
INSERT INTO demo_source_table VALUES(100, 77);
INSERT INTO demo_source_table VALUES(300, 77);
INSERT INTO demo_source_table VALUES(400, 77);
INSERT INTO demo_source_table VALUES(500, 77);
$$
LANGUAGE SQL;
CREATE FUNCTION merge_demo_data() RETURNS VOID AS $$
MERGE INTO demo_distributed t
USING demo_source_table s ON s.id2 = t.id1
WHEN MATCHED AND t.val1= 0 THEN
DELETE
WHEN MATCHED THEN
UPDATE SET val1 = val1 + s.val2
WHEN NOT MATCHED THEN
INSERT VALUES(s.id2, s.val2);
$$
LANGUAGE SQL;
SELECT setup_demo_data();
SELECT merge_demo_data();
SELECT * INTO pg_demo_result FROM demo_distributed ORDER BY 1, 2;
TRUNCATE demo_distributed;
TRUNCATE demo_source_table;
SELECT create_distributed_table('demo_distributed', 'id1');
SELECT create_reference_table('demo_source_table');
SELECT setup_demo_data();
SELECT merge_demo_data();
SELECT * INTO dist_demo_result FROM demo_distributed ORDER BY 1, 2;
-- Should be equal
SELECT c.*, p.*
FROM dist_demo_result c, pg_demo_result p
WHERE c.id1 = p.id1
ORDER BY 1,2;
-- Must return zero rows
SELECT count(*)
FROM pg_demo_result p FULL OUTER JOIN dist_demo_result d ON p.id1 = d.id1
WHERE p.id1 IS NULL OR d.id1 IS NULL;
-- Now convert source as distributed, but non-colocated with target
DROP TABLE pg_demo_result, dist_demo_result;
SELECT undistribute_table('demo_distributed');
SELECT undistribute_table('demo_source_table');
CREATE OR REPLACE FUNCTION merge_demo_data() RETURNS VOID AS $$
MERGE INTO demo_distributed t
USING (SELECT id2,val2 FROM demo_source_table UNION SELECT val2,id2 FROM demo_source_table) AS s
ON t.id1 = s.id2
WHEN MATCHED THEN
UPDATE SET val1 = val1 + 1;
$$
LANGUAGE SQL;
TRUNCATE demo_distributed;
TRUNCATE demo_source_table;
SELECT setup_demo_data();
SELECT merge_demo_data();
SELECT * INTO pg_demo_result FROM demo_distributed ORDER BY 1, 2;
SELECT create_distributed_table('demo_distributed', 'id1');
SELECT create_distributed_table('demo_source_table', 'id2', colocate_with=>'none');
TRUNCATE demo_distributed;
TRUNCATE demo_source_table;
SELECT setup_demo_data();
SELECT merge_demo_data();
SELECT * INTO dist_demo_result FROM demo_distributed ORDER BY 1, 2;
-- Should be equal
SELECT c.*, p.*
FROM dist_demo_result c, pg_demo_result p
WHERE c.id1 = p.id1
ORDER BY 1,2;
-- Must return zero rows
SELECT count(*)
FROM pg_demo_result p FULL OUTER JOIN dist_demo_result d ON p.id1 = d.id1
WHERE p.id1 IS NULL OR d.id1 IS NULL;
-- Test with LIMIT
CREATE OR REPLACE FUNCTION merge_demo_data() RETURNS VOID AS $$
MERGE INTO demo_distributed t
USING (SELECT 999 as s3, demo_source_table.* FROM (SELECT * FROM demo_source_table ORDER BY 1 LIMIT 3) as foo LEFT JOIN demo_source_table USING(id2)) AS s
ON t.id1 = s.id2
WHEN MATCHED THEN
UPDATE SET val1 = s3
WHEN NOT MATCHED THEN
INSERT VALUES(id2, s3);
$$
LANGUAGE SQL;
DROP TABLE pg_demo_result, dist_demo_result;
SELECT undistribute_table('demo_distributed');
SELECT undistribute_table('demo_source_table');
TRUNCATE demo_distributed;
TRUNCATE demo_source_table;
SELECT setup_demo_data();
SELECT merge_demo_data();
SELECT * INTO pg_demo_result FROM demo_distributed ORDER BY 1, 2;
SELECT create_distributed_table('demo_distributed', 'id1');
SELECT create_distributed_table('demo_source_table', 'id2', colocate_with=>'none');
TRUNCATE demo_distributed;
TRUNCATE demo_source_table;
SELECT setup_demo_data();
SELECT merge_demo_data();
SELECT * INTO dist_demo_result FROM demo_distributed ORDER BY 1, 2;
-- Should be equal
SELECT c.*, p.*
FROM dist_demo_result c, pg_demo_result p
WHERE c.id1 = p.id1
ORDER BY 1,2;
-- Must return zero rows
SELECT count(*)
FROM pg_demo_result p FULL OUTER JOIN dist_demo_result d ON p.id1 = d.id1
WHERE p.id1 IS NULL OR d.id1 IS NULL;
-- Test explain with repartition
SET citus.explain_all_tasks TO false;
EXPLAIN (COSTS OFF)
MERGE INTO demo_distributed t
USING (SELECT 999 as s3, demo_source_table.* FROM (SELECT * FROM demo_source_table ORDER BY 1 LIMIT 3) as foo LEFT JOIN demo_source_table USING(id2)) AS s
ON t.id1 = s.id2
WHEN MATCHED THEN
UPDATE SET val1 = s3
WHEN NOT MATCHED THEN
INSERT VALUES(id2, s3);
-- Test multiple join conditions on distribution column
MERGE INTO demo_distributed t
USING (SELECT id2+1 as key, id2+3 as key2 FROM demo_source_table) s
ON t.id1 = s.key2 ANd t.id1 = s.key
WHEN NOT MATCHED THEN
INSERT VALUES(s.key2, 333);
MERGE INTO demo_distributed t
USING (SELECT id2+1 as key, id2+2 as key2 FROM demo_source_table) s
ON t.id1 = s.key2 AND t.id1 = s.key
WHEN NOT MATCHED THEN
DO NOTHING;
MERGE INTO demo_distributed t
USING (SELECT id2+1 as key, id2+3 as key2 FROM demo_source_table) s
ON t.val1 = s.key2 AND t.id1 = s.key AND t.id1 = s.key2
WHEN NOT MATCHED THEN
INSERT VALUES(s.key2, 444);
-- Test aggregate functions in source-query
SELECT COUNT(*) FROM demo_distributed where val1 = 150;
SELECT COUNT(*) FROM demo_distributed where id1 = 2;
-- One row with Key=7 updated in demo_distributed to 150
MERGE INTO demo_distributed t
USING (SELECT count(DISTINCT id2)::int4 as key FROM demo_source_table GROUP BY val2) s
ON t.id1 = s.key
WHEN NOT MATCHED THEN INSERT VALUES(s.key, 1)
WHEN MATCHED THEN UPDATE SET val1 = 150;
-- Seven rows with Key=2 inserted in demo_distributed
MERGE INTO demo_distributed t
USING (SELECT (count(DISTINCT val2) + 1)::int4 as key FROM demo_source_table GROUP BY id2) s
ON t.id1 = s.key
WHEN NOT MATCHED THEN INSERT VALUES(s.key, 1)
WHEN MATCHED THEN UPDATE SET val1 = 150;
SELECT COUNT(*) FROM demo_distributed where val1 = 150;
SELECT COUNT(*) FROM demo_distributed where id1 = 2;
--
-- Error and Unsupported scenarios
--
-- Test explain analyze with repartition
EXPLAIN ANALYZE
MERGE INTO demo_distributed t
USING (SELECT 999 as s3, demo_source_table.* FROM (SELECT * FROM demo_source_table ORDER BY 1 LIMIT 3) as foo LEFT JOIN demo_source_table USING(id2)) AS s
ON t.id1 = s.id2
WHEN MATCHED THEN
UPDATE SET val1 = s3
WHEN NOT MATCHED THEN
INSERT VALUES(id2, s3);
-- Source without a table
MERGE INTO target_cj t
USING (VALUES (1, 1), (2, 1), (3, 3)) as s (sid, val)
ON t.tid = s.sid AND t.tid = 2
WHEN MATCHED THEN
UPDATE SET val = s.val
WHEN NOT MATCHED THEN
DO NOTHING;
-- Incomplete source
MERGE INTO target_cj t
USING (source_cj1 s1 INNER JOIN source_cj2 s2 ON sid1 = val2) s
ON t.tid = s.sid1 AND t.tid = 2
WHEN MATCHED THEN
UPDATE SET src = src2
WHEN NOT MATCHED THEN
DO NOTHING;
-- Reference as a target and local as source
MERGE INTO refsource_ref
USING (SELECT * FROM reftarget_local UNION SELECT * FROM reftarget_local) AS foo ON refsource_ref.s1 = foo.t1
@ -1634,34 +1896,16 @@ WHEN MATCHED THEN
WHEN NOT MATCHED THEN
INSERT VALUES(foo.t1);
-- Reference as a source and distributed as target
MERGE INTO target_set t
USING refsource_ref AS s ON t.t1 = s.s1
WHEN MATCHED THEN
DO NOTHING;
MERGE INTO target_set
USING source_set AS foo ON target_set.t1 = foo.s1
WHEN MATCHED THEN
UPDATE SET ctid = '(0,100)';
MERGE INTO target_set
USING (SELECT s1,s2 FROM source_set UNION SELECT s2,s1 FROM source_set) AS foo ON target_set.t1 = foo.s1
WHEN MATCHED THEN
UPDATE SET t2 = t2 + 1;
MERGE INTO target_set
USING (SELECT 2 as s3, source_set.* FROM (SELECT * FROM source_set LIMIT 1) as foo LEFT JOIN source_set USING( s1)) AS foo
ON target_set.t1 = foo.s1
WHEN MATCHED THEN UPDATE SET t2 = t2 + 1
WHEN NOT MATCHED THEN INSERT VALUES(s1, s3);
-- modifying CTE not supported
EXPLAIN
WITH cte_1 AS (DELETE FROM target_json)
WITH cte_1 AS (DELETE FROM target_json RETURNING *)
MERGE INTO target_json sda
USING source_json sdn
USING cte_1 sdn
ON sda.id = sdn.id
WHEN NOT matched THEN
INSERT (id, z) VALUES (sdn.id, 5);
@ -1710,6 +1954,7 @@ ON t.id = s.id
WHEN NOT MATCHED THEN
INSERT (id) VALUES(1000);
-- Colocated merge
MERGE INTO t1 t
USING s1 s
ON t.id = s.id
@ -1722,6 +1967,13 @@ ON t.id = s.id
WHEN NOT MATCHED THEN
INSERT (val) VALUES(s.val);
-- Non-colocated merge
MERGE INTO t1 t
USING s1 s
ON t.id = s.val
WHEN NOT MATCHED THEN
INSERT (id) VALUES(s.id);
-- try updating the distribution key column
BEGIN;
MERGE INTO target_cj t
@ -1810,17 +2062,7 @@ WHEN MATCHED AND (merge_when_and_write()) THEN
ROLLBACK;
-- Joining on partition columns with sub-query
MERGE INTO t1
USING (SELECT * FROM s1) sub ON (sub.val = t1.id) -- sub.val is not a distribution column
WHEN MATCHED AND sub.val = 0 THEN
DELETE
WHEN MATCHED THEN
UPDATE SET val = t1.val + 1
WHEN NOT MATCHED THEN
INSERT (id, val) VALUES (sub.id, sub.val);
-- Joining on partition columns with CTE
-- Joining on non-partition columns with CTE source, but INSERT incorrect column
WITH s1_res AS (
SELECT * FROM s1
)
@ -1846,7 +2088,7 @@ MERGE INTO t1
WHEN NOT MATCHED THEN
INSERT (id, val) VALUES (s1_res.id, s1_res.val);
-- With a single WHEN clause, which causes a non-left join
-- Join condition without target distribution column
WITH s1_res AS (
SELECT * FROM s1
)
@ -1953,34 +2195,12 @@ WHEN MATCHED THEN
WHEN NOT MATCHED THEN
INSERT VALUES(mv_source.id, mv_source.val);
-- Distributed tables *must* be colocated
-- Do not allow constant values into the distribution column
CREATE TABLE dist_target(id int, val varchar);
SELECT create_distributed_table('dist_target', 'id');
CREATE TABLE dist_source(id int, val varchar);
SELECT create_distributed_table('dist_source', 'id', colocate_with => 'none');
MERGE INTO dist_target
USING dist_source
ON dist_target.id = dist_source.id
WHEN MATCHED THEN
UPDATE SET val = dist_source.val
WHEN NOT MATCHED THEN
INSERT VALUES(dist_source.id, dist_source.val);
-- Distributed tables *must* be joined on distribution column
CREATE TABLE dist_colocated(id int, val int);
SELECT create_distributed_table('dist_colocated', 'id', colocate_with => 'dist_target');
MERGE INTO dist_target
USING dist_colocated
ON dist_target.id = dist_colocated.val -- val is not the distribution column
WHEN MATCHED THEN
UPDATE SET val = dist_colocated.val
WHEN NOT MATCHED THEN
INSERT VALUES(dist_colocated.id, dist_colocated.val);
-- Both the source and target must be distributed
MERGE INTO dist_target
USING (SELECT 100 id) AS source
ON dist_target.id = source.id AND dist_target.val = 'const'
@ -2055,7 +2275,6 @@ INSERT VALUES(dist_source.id, dist_source.val);
CREATE SCHEMA query_single_shard_table;
SET search_path TO query_single_shard_table;
SET client_min_messages TO DEBUG2;
CREATE TABLE nullkey_c1_t1(a int, b int);
CREATE TABLE nullkey_c1_t2(a int, b int);
@ -2068,15 +2287,17 @@ SELECT create_distributed_table('nullkey_c2_t1', null, colocate_with=>'none');
SELECT create_distributed_table('nullkey_c2_t2', null, colocate_with=>'nullkey_c2_t1', distribution_type=>null);
CREATE TABLE reference_table(a int, b int);
CREATE TABLE distributed_table(a int, b int);
CREATE TABLE citus_local_table(a int, b int);
SELECT create_reference_table('reference_table');
SELECT create_distributed_table('distributed_table', 'a');
SELECT citus_add_local_table_to_metadata('citus_local_table');
SET client_min_messages TO DEBUG2;
INSERT INTO reference_table SELECT i, i FROM generate_series(0, 5) i;
CREATE TABLE distributed_table(a int, b int);
SELECT create_distributed_table('distributed_table', 'a');
INSERT INTO distributed_table SELECT i, i FROM generate_series(3, 8) i;
CREATE TABLE citus_local_table(a int, b int);
SELECT citus_add_local_table_to_metadata('citus_local_table');
INSERT INTO citus_local_table SELECT i, i FROM generate_series(0, 10) i;
CREATE TABLE postgres_local_table(a int, b int);
@ -2159,9 +2380,117 @@ WHEN MATCHED THEN UPDATE SET b = cte.b;
SET client_min_messages TO WARNING;
DROP SCHEMA query_single_shard_table CASCADE;
RESET client_min_messages;
SET search_path TO merge_schema;
-- Test Columnar table
CREATE TABLE target_columnar(cid int, name text) USING columnar;
SELECT create_distributed_table('target_columnar', 'cid');
MERGE INTO target_columnar t
USING demo_source_table s
ON t.cid = s.id2
WHEN MATCHED THEN
UPDATE SET name = 'Columnar table updated by MERGE'
WHEN NOT MATCHED THEN
DO NOTHING;
MERGE INTO demo_distributed t
USING generate_series(0,100) as source(key)
ON (source.key + 1 = t.id1)
WHEN MATCHED THEN UPDATE SET val1 = 15;
-- This should fail in planning stage itself
EXPLAIN MERGE INTO demo_distributed t
USING demo_source_table s
ON (s.id2 + 1 = t.id1)
WHEN MATCHED THEN UPDATE SET val1 = 15;
-- Sub-queries and CTEs are not allowed in actions and ON clause
CREATE TABLE target_1 (a int, b int, c int);
SELECT create_distributed_table('target_1', 'a');
CREATE TABLE source_2 (a int, b int, c int);
SELECT create_distributed_table('source_2', 'a');
INSERT INTO target_1 VALUES(1, 2, 3);
INSERT INTO target_1 VALUES(4, 5, 6);
INSERT INTO target_1 VALUES(11, 12, 13);
INSERT INTO source_2 VALUES(1, 2, 3);
WITH cte_1 as (SELECT max(a) as max_a, max(b) as b FROM source_2)
MERGE INTO target_1
USING cte_1
ON (target_1.a = cte_1.b)
WHEN NOT MATCHED AND (SELECT max_a > 10 FROM cte_1) THEN
INSERT VALUES (cte_1.b, 100);
WITH cte_1 as (SELECT a, b FROM source_2)
MERGE INTO target_1
USING cte_1
ON (target_1.a = cte_1.b)
WHEN NOT MATCHED AND (SELECT a > 10 FROM cte_1) THEN
INSERT VALUES (cte_1.b, 100);
MERGE INTO target_1
USING source_2
ON (target_1.a = source_2.b)
WHEN NOT MATCHED AND (SELECT max_a > 10 FROM (SELECT max(a) as max_a, max(b) as b FROM target_1) as foo) THEN
INSERT VALUES (source_2.b, 100);
-- or same with CTEs
WITH cte_1 as (SELECT max(a) as max_a, max(b) as b FROM target_1)
MERGE INTO target_1
USING source_2
ON (target_1.a = source_2.b)
WHEN NOT MATCHED AND (SELECT max_a > 10 FROM (SELECT max(a) as max_a, max(b) as b FROM target_1) as foo) THEN
INSERT VALUES (source_2.b, 100);
WITH cte_1 as (SELECT a, b FROM target_1), cte_2 as (select b,a from target_1)
MERGE INTO target_1
USING (SELECT * FROM source_2) as subq
ON (target_1.a = subq.b)
WHEN NOT MATCHED AND (SELECT a > 10 FROM cte_2) THEN
INSERT VALUES (subq.b, 100);
MERGE INTO source_2
USING target_1
ON (target_1.a = source_2.a)
WHEN MATCHED THEN
UPDATE SET b = (SELECT max(a) FROM source_2);
MERGE INTO source_2
USING target_1
ON (target_1.a = source_2.a)
WHEN NOT MATCHED THEN
INSERT VALUES (target_1.a,(select max(a) from target_1));
MERGE INTO target_1
USING source_2
ON (target_1.a = source_2.b)
WHEN NOT MATCHED AND (SELECT max(c) > 10 FROM source_2) THEN
INSERT VALUES (source_2.b, 100);
-- Test in ON clause
MERGE INTO target_1 t2
USING (SELECT * FROM source_2) AS t1
ON (t1.a = t2.a AND (SELECT 1=1 FROM target_1))
WHEN MATCHED THEN
DELETE;
MERGE INTO target_1 t2
USING (SELECT * FROM source_2) AS t1
ON (t1.a = t2.a AND (SELECT max(a) > 55 FROM target_1))
WHEN MATCHED THEN
DELETE;
WITH cte_1 as (SELECT a, b FROM target_1), cte_2 as (select b,a from target_1)
MERGE INTO target_1 t2
USING (SELECT * FROM cte_1) AS t1
ON (t1.a = t2.a AND (SELECT max(a) > 55 FROM cte_2))
WHEN MATCHED THEN
DELETE;
RESET client_min_messages;
DROP SERVER foreign_server CASCADE;
DROP FUNCTION merge_when_and_write();
DROP SCHEMA merge_schema CASCADE;

View File

@ -131,3 +131,48 @@ BEGIN;
EXECUTE local(0, 1);
SELECT * FROM t1 order by id;
ROLLBACK;
-- Test prepared statements with repartition
PREPARE merge_repartition_pg(int,int,int,int) as
MERGE INTO pg_target target
USING (SELECT id+1+$1 as key, val FROM (SELECT * FROM pg_source UNION SELECT * FROM pg_source WHERE id = $2) as foo) as source
ON (source.key = target.id AND $3 < 10000)
WHEN MATCHED THEN UPDATE SET val = (source.key::int+$4)
WHEN NOT MATCHED THEN INSERT VALUES (source.key, source.val);
PREPARE merge_repartition_citus(int,int,int,int) as
MERGE INTO citus_target target
USING (SELECT id+1+$1 as key, val FROM (SELECT * FROM citus_source UNION SELECT * FROM citus_source WHERE id = $2) as foo) as source
ON (source.key = target.id AND $3 < 10000)
WHEN MATCHED THEN UPDATE SET val = (source.key::int+$4)
WHEN NOT MATCHED THEN INSERT VALUES (source.key, source.val);
EXECUTE merge_repartition_pg(1,1,1,1);
EXECUTE merge_repartition_citus(1,1,1,1);
SET client_min_messages = NOTICE;
SELECT compare_data();
RESET client_min_messages;
EXECUTE merge_repartition_pg(1,100,1,1);
EXECUTE merge_repartition_citus(1,100,1,1);
EXECUTE merge_repartition_pg(2,200,1,1);
EXECUTE merge_repartition_citus(2,200,1,1);
EXECUTE merge_repartition_pg(3,300,1,1);
EXECUTE merge_repartition_citus(3,300,1,1);
EXECUTE merge_repartition_pg(4,400,1,1);
EXECUTE merge_repartition_citus(4,400,1,1);
EXECUTE merge_repartition_pg(5,500,1,1);
EXECUTE merge_repartition_citus(5,500,1,1);
-- Sixth time
EXECUTE merge_repartition_pg(6,600,1,6);
EXECUTE merge_repartition_citus(6,600,1,6);
SET client_min_messages = NOTICE;
SELECT compare_data();
RESET client_min_messages;

View File

@ -48,3 +48,57 @@ CREATE TABLE s1(id int, val int);
SELECT citus_add_local_table_to_metadata('t1');
SELECT citus_add_local_table_to_metadata('s1');
-- Test prepared statements with repartition
CREATE TABLE pg_target(id int, val int);
CREATE TABLE pg_source(id int, val int, const int);
CREATE TABLE citus_target(id int, val int);
CREATE TABLE citus_source(id int, val int, const int);
SELECT citus_add_local_table_to_metadata('pg_target');
SELECT citus_add_local_table_to_metadata('pg_source');
--
-- Load same set of data to both Postgres and Citus tables
--
CREATE OR REPLACE FUNCTION setup_data() RETURNS VOID AS $$
INSERT INTO pg_source SELECT i, i+1, 1 FROM generate_series(1, 10000) i;
INSERT INTO pg_target SELECT i, 1 FROM generate_series(5001, 10000) i;
INSERT INTO citus_source SELECT i, i+1, 1 FROM generate_series(1, 10000) i;
INSERT INTO citus_target SELECT i, 1 FROM generate_series(5001, 10000) i;
$$
LANGUAGE SQL;
--
-- Compares the final target tables, merge-modified data, of both Postgres and Citus tables
--
CREATE OR REPLACE FUNCTION check_data(table1_name text, column1_name text, table2_name text, column2_name text)
RETURNS VOID AS $$
DECLARE
table1_avg numeric;
table2_avg numeric;
BEGIN
EXECUTE format('SELECT COALESCE(AVG(%I), 0) FROM %I', column1_name, table1_name) INTO table1_avg;
EXECUTE format('SELECT COALESCE(AVG(%I), 0) FROM %I', column2_name, table2_name) INTO table2_avg;
IF table1_avg > table2_avg THEN
RAISE EXCEPTION 'The average of %.% is greater than %.%', table1_name, column1_name, table2_name, column2_name;
ELSIF table1_avg < table2_avg THEN
RAISE EXCEPTION 'The average of %.% is less than %.%', table1_name, column1_name, table2_name, column2_name;
ELSE
RAISE NOTICE 'The average of %.% is equal to %.%', table1_name, column1_name, table2_name, column2_name;
END IF;
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION compare_data() RETURNS VOID AS $$
SELECT check_data('pg_target', 'id', 'citus_target', 'id');
SELECT check_data('pg_target', 'val', 'citus_target', 'val');
$$
LANGUAGE SQL;
--
-- Target and source are distributed, and non-colocated
--
SELECT setup_data();
SELECT create_distributed_table('citus_target', 'id');
SELECT create_distributed_table('citus_source', 'id', colocate_with=>'none');

View File

@ -0,0 +1,164 @@
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
\gset
\if :server_version_ge_15
\else
\q
\endif
-- We create two sets of source and target tables, one set in Postgres and
-- the other in Citus distributed. We run the _exact_ MERGE SQL on both sets
-- and compare the final results of the target tables in Postgres and Citus.
-- The results should match. This process is repeated for various combinations
-- of MERGE SQL.
DROP SCHEMA IF EXISTS merge_partition_tables CASCADE;
CREATE SCHEMA merge_partition_tables;
SET search_path TO merge_partition_tables;
SET citus.shard_count TO 4;
SET citus.next_shard_id TO 7000000;
SET citus.explain_all_tasks TO true;
SET citus.shard_replication_factor TO 1;
SET citus.max_adaptive_executor_pool_size TO 1;
SET client_min_messages = warning;
SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0);
RESET client_min_messages;
CREATE TABLE pg_target(id int, val int) PARTITION BY RANGE(id);
CREATE TABLE pg_source(id int, val int, const int) PARTITION BY RANGE(val);
CREATE TABLE citus_target(id int, val int) PARTITION BY RANGE(id);
CREATE TABLE citus_source(id int, val int, const int) PARTITION BY RANGE(val);
SELECT citus_add_local_table_to_metadata('citus_target');
SELECT citus_add_local_table_to_metadata('citus_source');
CREATE TABLE part1 PARTITION OF pg_target FOR VALUES FROM (1) TO (2500) WITH (autovacuum_enabled=off);
CREATE TABLE part2 PARTITION OF pg_target FOR VALUES FROM (2501) TO (5000) WITH (autovacuum_enabled=off);
CREATE TABLE part3 PARTITION OF pg_target FOR VALUES FROM (5001) TO (7500) WITH (autovacuum_enabled=off);
CREATE TABLE part4 PARTITION OF pg_target DEFAULT WITH (autovacuum_enabled=off);
CREATE TABLE part5 PARTITION OF citus_target FOR VALUES FROM (1) TO (2500) WITH (autovacuum_enabled=off);
CREATE TABLE part6 PARTITION OF citus_target FOR VALUES FROM (2501) TO (5000) WITH (autovacuum_enabled=off);
CREATE TABLE part7 PARTITION OF citus_target FOR VALUES FROM (5001) TO (7500) WITH (autovacuum_enabled=off);
CREATE TABLE part8 PARTITION OF citus_target DEFAULT WITH (autovacuum_enabled=off);
CREATE TABLE part9 PARTITION OF pg_source FOR VALUES FROM (1) TO (2500) WITH (autovacuum_enabled=off);
CREATE TABLE part10 PARTITION OF pg_source FOR VALUES FROM (2501) TO (5000) WITH (autovacuum_enabled=off);
CREATE TABLE part11 PARTITION OF pg_source FOR VALUES FROM (5001) TO (7500) WITH (autovacuum_enabled=off);
CREATE TABLE part12 PARTITION OF pg_source DEFAULT WITH (autovacuum_enabled=off);
CREATE TABLE part13 PARTITION OF citus_source FOR VALUES FROM (1) TO (2500) WITH (autovacuum_enabled=off);
CREATE TABLE part14 PARTITION OF citus_source FOR VALUES FROM (2501) TO (5000) WITH (autovacuum_enabled=off);
CREATE TABLE part15 PARTITION OF citus_source FOR VALUES FROM (5001) TO (7500) WITH (autovacuum_enabled=off);
CREATE TABLE part16 PARTITION OF citus_source DEFAULT WITH (autovacuum_enabled=off);
CREATE OR REPLACE FUNCTION cleanup_data() RETURNS VOID SET search_path TO merge_partition_tables AS $$
TRUNCATE pg_target;
TRUNCATE pg_source;
TRUNCATE citus_target;
TRUNCATE citus_source;
SELECT undistribute_table('citus_target');
SELECT undistribute_table('citus_source');
$$
LANGUAGE SQL;
--
-- Load same set of data to both Postgres and Citus tables
--
CREATE OR REPLACE FUNCTION setup_data() RETURNS VOID SET search_path TO merge_partition_tables AS $$
INSERT INTO pg_source SELECT i, i+1, 1 FROM generate_series(1, 10000) i;
INSERT INTO pg_target SELECT i, 1 FROM generate_series(5001, 10000) i;
INSERT INTO citus_source SELECT i, i+1, 1 FROM generate_series(1, 10000) i;
INSERT INTO citus_target SELECT i, 1 FROM generate_series(5001, 10000) i;
$$
LANGUAGE SQL;
--
-- Compares the final target tables, merge-modified data, of both Postgres and Citus tables
--
CREATE OR REPLACE FUNCTION check_data(table1_name text, column1_name text, table2_name text, column2_name text)
RETURNS VOID SET search_path TO merge_partition_tables AS $$
DECLARE
table1_avg numeric;
table2_avg numeric;
BEGIN
EXECUTE format('SELECT COALESCE(AVG(%I), 0) FROM %I', column1_name, table1_name) INTO table1_avg;
EXECUTE format('SELECT COALESCE(AVG(%I), 0) FROM %I', column2_name, table2_name) INTO table2_avg;
IF table1_avg > table2_avg THEN
RAISE EXCEPTION 'The average of %.% is greater than %.%', table1_name, column1_name, table2_name, column2_name;
ELSIF table1_avg < table2_avg THEN
RAISE EXCEPTION 'The average of %.% is less than %.%', table1_name, column1_name, table2_name, column2_name;
ELSE
RAISE NOTICE 'The average of %.% is equal to %.%', table1_name, column1_name, table2_name, column2_name;
END IF;
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION compare_data() RETURNS VOID SET search_path TO merge_partition_tables AS $$
SELECT check_data('pg_target', 'id', 'citus_target', 'id');
SELECT check_data('pg_target', 'val', 'citus_target', 'val');
$$
LANGUAGE SQL;
-- Test colocated partition tables
SET client_min_messages = ERROR;
SELECT cleanup_data();
SELECT setup_data();
SELECT create_distributed_table('citus_target', 'id');
SELECT create_distributed_table('citus_source', 'id', colocate_with=>'citus_target');
RESET client_min_messages;
MERGE INTO pg_target t
USING pg_source s
ON t.id = s.id
WHEN MATCHED AND t.id <= 7500 THEN
UPDATE SET val = s.val + 1
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES(s.id, s.val);
MERGE INTO citus_target t
USING citus_source s
ON t.id = s.id
WHEN MATCHED AND t.id <= 7500 THEN
UPDATE SET val = s.val + 1
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES(s.id, s.val);
SELECT compare_data();
-- Test non-colocated partition tables
SET client_min_messages = ERROR;
SELECT cleanup_data();
SELECT setup_data();
SELECT create_distributed_table('citus_target', 'id');
SELECT create_distributed_table('citus_source', 'id', colocate_with=>'none');
RESET client_min_messages;
MERGE INTO pg_target t
USING pg_source s
ON t.id = s.id
WHEN MATCHED AND t.id <= 7500 THEN
UPDATE SET val = s.val + 1
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES(s.id, s.val);
MERGE INTO citus_target t
USING citus_source s
ON t.id = s.id
WHEN MATCHED AND t.id <= 7500 THEN
UPDATE SET val = s.val + 1
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES(s.id, s.val);
SELECT compare_data();
DROP SCHEMA merge_partition_tables CASCADE;

View File

@ -0,0 +1,515 @@
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
\gset
\if :server_version_ge_15
\else
\q
\endif
-- We create two sets of source and target tables, one set in Postgres and
-- the other in Citus distributed. We run the _exact_ MERGE SQL on both sets
-- and compare the final results of the target tables in Postgres and Citus.
-- The results should match. This process is repeated for various combinations
-- of MERGE SQL.
DROP SCHEMA IF EXISTS merge_repartition1_schema CASCADE;
CREATE SCHEMA merge_repartition1_schema;
SET search_path TO merge_repartition1_schema;
SET citus.shard_count TO 4;
SET citus.next_shard_id TO 5000000;
SET citus.explain_all_tasks TO true;
SET citus.shard_replication_factor TO 1;
SET citus.max_adaptive_executor_pool_size TO 1;
SET client_min_messages = warning;
SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0);
RESET client_min_messages;
CREATE TABLE pg_target(id int, val int);
CREATE TABLE pg_source(id int, val int, const int);
CREATE TABLE citus_target(id int, val int);
CREATE TABLE citus_source(id int, val int, const int);
SELECT citus_add_local_table_to_metadata('citus_target');
SELECT citus_add_local_table_to_metadata('citus_source');
CREATE OR REPLACE FUNCTION cleanup_data() RETURNS VOID SET search_path TO merge_repartition1_schema AS $$
TRUNCATE pg_target;
TRUNCATE pg_source;
TRUNCATE citus_target;
TRUNCATE citus_source;
SELECT undistribute_table('citus_target');
SELECT undistribute_table('citus_source');
$$
LANGUAGE SQL;
--
-- Load same set of data to both Postgres and Citus tables
--
CREATE OR REPLACE FUNCTION setup_data() RETURNS VOID SET search_path TO merge_repartition1_schema AS $$
INSERT INTO pg_source SELECT i, i+1, 1 FROM generate_series(1, 10000) i;
INSERT INTO pg_target SELECT i, 1 FROM generate_series(5001, 10000) i;
INSERT INTO citus_source SELECT i, i+1, 1 FROM generate_series(1, 10000) i;
INSERT INTO citus_target SELECT i, 1 FROM generate_series(5001, 10000) i;
$$
LANGUAGE SQL;
--
-- Compares the final target tables, merge-modified data, of both Postgres and Citus tables
--
CREATE OR REPLACE FUNCTION check_data(table1_name text, column1_name text, table2_name text, column2_name text)
RETURNS VOID SET search_path TO merge_repartition1_schema AS $$
DECLARE
table1_avg numeric;
table2_avg numeric;
BEGIN
EXECUTE format('SELECT COALESCE(AVG(%I), 0) FROM %I', column1_name, table1_name) INTO table1_avg;
EXECUTE format('SELECT COALESCE(AVG(%I), 0) FROM %I', column2_name, table2_name) INTO table2_avg;
IF table1_avg > table2_avg THEN
RAISE EXCEPTION 'The average of %.% is greater than %.%', table1_name, column1_name, table2_name, column2_name;
ELSIF table1_avg < table2_avg THEN
RAISE EXCEPTION 'The average of %.% is less than %.%', table1_name, column1_name, table2_name, column2_name;
ELSE
RAISE NOTICE 'The average of %.% is equal to %.%', table1_name, column1_name, table2_name, column2_name;
END IF;
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION compare_data() RETURNS VOID SET search_path TO merge_repartition1_schema AS $$
SELECT check_data('pg_target', 'id', 'citus_target', 'id');
SELECT check_data('pg_target', 'val', 'citus_target', 'val');
$$
LANGUAGE SQL;
--
-- Target and source are distributed, and non-colocated
--
SELECT cleanup_data();
SELECT setup_data();
SELECT create_distributed_table('citus_target', 'id');
SELECT create_distributed_table('citus_source', 'id', colocate_with=>'none');
MERGE INTO pg_target t
USING pg_source s
ON t.id = s.id
WHEN MATCHED AND t.id <= 7500 THEN
UPDATE SET val = s.val + 1
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES(s.id, s.val);
MERGE INTO citus_target t
USING citus_source s
ON t.id = s.id
WHEN MATCHED AND t.id <= 7500 THEN
UPDATE SET val = s.val + 1
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES(s.id, s.val);
SELECT compare_data();
--
-- Target and source are distributed, and colocated but not joined on distribution column
--
SELECT cleanup_data();
SELECT setup_data();
SELECT create_distributed_table('citus_target', 'id');
SELECT create_distributed_table('citus_source', 'id', colocate_with=>'citus_target');
MERGE INTO pg_target t
USING (SELECT * FROM pg_source) subq
ON (subq.val = t.id)
WHEN MATCHED AND t.id <= 7500 THEN
UPDATE SET val = subq.val + 1
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES(subq.val, subq.id);
MERGE INTO citus_target t
USING (SELECT * FROM citus_source) subq
ON (subq.val = t.id)
WHEN MATCHED AND t.id <= 7500 THEN
UPDATE SET val = subq.val + 1
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES(subq.val, subq.id);
SELECT compare_data();
--
-- Target and source are distributed, colocated, joined on distribution column
-- but with nondistribution values
--
SELECT cleanup_data();
SELECT setup_data();
SELECT create_distributed_table('citus_target', 'id');
SELECT create_distributed_table('citus_source', 'id', colocate_with=>'citus_target');
MERGE INTO pg_target t
USING (SELECT id,const FROM pg_source UNION SELECT const,id FROM pg_source ) AS s
ON t.id = s.id
WHEN MATCHED THEN
UPDATE SET val = s.const + 1
WHEN NOT MATCHED THEN
INSERT VALUES(id, const);
MERGE INTO citus_target t
USING (SELECT id,const FROM citus_source UNION SELECT const,id FROM citus_source) AS s
ON t.id = s.id
WHEN MATCHED THEN
UPDATE SET val = s.const + 1
WHEN NOT MATCHED THEN
INSERT VALUES(id, const);
SELECT compare_data();
--
-- Repartition with a predicate on target_table_name rows in ON clause
--
SELECT cleanup_data();
SELECT setup_data();
SELECT create_distributed_table('citus_target', 'id');
SELECT create_distributed_table('citus_source', 'id', colocate_with=>'none');
MERGE INTO pg_target t
USING (SELECT * FROM pg_source WHERE id < 9500) s
ON t.id = s.id AND t.id < 9000
WHEN MATCHED AND t.id <= 7500 THEN
UPDATE SET val = s.val + 1
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES(s.id, s.val);
MERGE INTO citus_target t
USING (SELECT * FROM citus_source WHERE id < 9500) s
ON t.id = s.id AND t.id < 9000
WHEN MATCHED AND t.id <= 7500 THEN
UPDATE SET val = s.val + 1
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES(s.id, s.val);
SELECT compare_data();
--
-- Test CTE and non-colocated tables
--
SELECT cleanup_data();
SELECT setup_data();
SELECT create_distributed_table('citus_target', 'id');
SELECT create_distributed_table('citus_source', 'id', colocate_with=>'none');
WITH cte AS (
SELECT * FROM pg_source
)
MERGE INTO pg_target t
USING cte s
ON s.id = t.id
WHEN MATCHED AND t.id > 7500 THEN
UPDATE SET val = s.val + 1
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES (s.id, s.val);
WITH cte AS (
SELECT * FROM citus_source
)
MERGE INTO citus_target t
USING cte s
ON s.id = t.id
WHEN MATCHED AND t.id > 7500 THEN
UPDATE SET val = s.val + 1
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES (s.id, s.val);
SELECT compare_data();
--
-- Test nested CTEs
--
SELECT cleanup_data();
SELECT setup_data();
SELECT create_distributed_table('citus_target', 'id');
SELECT create_distributed_table('citus_source', 'id', colocate_with=>'none');
WITH cte1 AS (
SELECT * FROM pg_source ORDER BY 1 LIMIT 9000
),
cte2 AS(
SELECT * FROM cte1
),
cte3 AS(
SELECT * FROM cte2
)
MERGE INTO pg_target t
USING cte3 s
ON (s.id=t.id)
WHEN MATCHED AND t.id > 7500 THEN
UPDATE SET val = s.val + 1
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES (s.id, s.val);
WITH cte1 AS (
SELECT * FROM citus_source ORDER BY 1 LIMIT 9000
),
cte2 AS(
SELECT * FROM cte1
),
cte3 AS(
SELECT * FROM cte2
)
MERGE INTO citus_target t
USING cte3 s
ON (s.id=t.id)
WHEN MATCHED AND t.id > 7500 THEN
UPDATE SET val = s.val + 1
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES (s.id, s.val);
SELECT compare_data();
--
-- Target and source are distributed and colocated
--
SELECT cleanup_data();
SELECT setup_data();
SELECT create_distributed_table('citus_target', 'id');
SELECT create_distributed_table('citus_source', 'id', colocate_with => 'citus_target');
MERGE INTO pg_target t
USING (SELECT 999 as newval, pg_source.* FROM (SELECT * FROM pg_source ORDER BY 1 LIMIT 6000) as src LEFT JOIN pg_source USING(id)) AS s
ON t.id = s.id
WHEN MATCHED AND t.id <= 5500 THEN
UPDATE SET val = newval
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES(id, newval);
MERGE INTO citus_target t
USING (SELECT 999 as newval, citus_source.* FROM (SELECT * FROM citus_source ORDER BY 1 LIMIT 6000) as src LEFT JOIN citus_source USING(id)) AS s
ON t.id = s.id
WHEN MATCHED AND t.id <= 5500 THEN
UPDATE SET val = newval
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES(id, newval);
SELECT compare_data();
--
-- Target is distributed and source is reference
--
SELECT cleanup_data();
SELECT setup_data();
SELECT create_distributed_table('citus_target', 'id');
SELECT create_reference_table('citus_source');
MERGE INTO pg_target t
USING pg_source s
ON t.id = s.id
WHEN MATCHED AND t.id <= 7500 THEN
UPDATE SET val = s.val + 1
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES(s.id, s.val);
MERGE INTO citus_target t
USING citus_source s
ON t.id = s.id
WHEN MATCHED AND t.id <= 7500 THEN
UPDATE SET val = s.val + 1
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES(s.id, s.val);
SELECT compare_data();
--
-- Target is distributed and reference as source in a sub-query
--
SELECT cleanup_data();
SELECT setup_data();
SELECT create_distributed_table('citus_target', 'id');
SELECT create_reference_table('citus_source');
MERGE INTO pg_target t
USING (SELECT * FROM pg_source UNION SELECT * FROM pg_source) AS s ON t.id = s.id
WHEN MATCHED AND t.id <= 7500 THEN
UPDATE SET val = s.val + t.val
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES(s.id, s.val);
MERGE INTO citus_target t
USING (SELECT * FROM citus_source UNION SELECT * FROM citus_source) AS s ON t.id = s.id
WHEN MATCHED AND t.id <= 7500 THEN
UPDATE SET val = s.val + t.val
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES(s.id, s.val);
SELECT compare_data();
--
-- Target is distributed and citus-local as source
--
SELECT cleanup_data();
SELECT setup_data();
SELECT create_distributed_table('citus_target', 'id');
SELECT citus_add_local_table_to_metadata('citus_source');
MERGE INTO pg_target t
USING pg_source s
ON t.id = s.id
WHEN MATCHED AND t.id <= 7500 THEN
UPDATE SET val = s.val + 1
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES(s.id, s.val);
MERGE INTO citus_target t
USING citus_source s
ON t.id = s.id
WHEN MATCHED AND t.id <= 7500 THEN
UPDATE SET val = s.val + 1
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES(s.id, s.val);
SELECT compare_data();
--
-- Target and source distributed and non-colocated. The source query requires evaluation
-- at the coordinator
--
SELECT cleanup_data();
SELECT setup_data();
SELECT create_distributed_table('citus_target', 'id');
SELECT create_distributed_table('citus_source', 'id', colocate_with=>'none');
MERGE INTO pg_target t
USING (SELECT 100 AS insval, MAX(const) AS updval, val, MAX(id) AS sid
FROM pg_source
GROUP BY val ORDER BY sid LIMIT 6000) AS s
ON t.id = s.sid
WHEN MATCHED AND t.id <= 5500 THEN
UPDATE SET val = updval + 1
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES(sid, insval);
MERGE INTO citus_target t
USING (SELECT 100 AS insval, MAX(const) AS updval, val, MAX(id) AS sid
FROM citus_source
GROUP BY val ORDER BY sid LIMIT 6000) AS s
ON t.id = s.sid
WHEN MATCHED AND t.id <= 5500 THEN
UPDATE SET val = updval + 1
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES(sid, insval);
SELECT compare_data();
-- Test source-query that requires repartitioning on top of MERGE repartitioning
SELECT cleanup_data();
SELECT setup_data();
SELECT create_distributed_table('citus_target', 'id');
SELECT create_distributed_table('citus_source', 'id', colocate_with=>'none');
MERGE INTO pg_target t
USING (SELECT s1.val FROM pg_source s1 JOIN pg_source s2 USING (val)) AS s
ON t.id = s.val
WHEN MATCHED THEN
UPDATE SET val = t.val + 1;
SET citus.enable_repartition_joins TO true;
MERGE INTO citus_target t
USING (SELECT s1.val FROM citus_source s1 JOIN citus_source s2 USING (val)) AS s
ON t.id = s.val
WHEN MATCHED THEN
UPDATE SET val = t.val + 1;
SELECT compare_data();
--
-- Test columnar as source table
--
SET client_min_messages TO WARNING;
SELECT cleanup_data();
RESET client_min_messages;
SELECT setup_data();
SELECT create_distributed_table('citus_target', 'id');
SELECT create_distributed_table('citus_source', 'id', colocate_with=>'none');
SELECT alter_table_set_access_method('citus_source', 'columnar');
MERGE INTO pg_target t
USING pg_source s
ON t.id = s.id
WHEN MATCHED AND t.id <= 7500 THEN
UPDATE SET val = s.val + 1
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES(s.id, s.val);
MERGE INTO citus_target t
USING citus_source s
ON t.id = s.id
WHEN MATCHED AND t.id <= 7500 THEN
UPDATE SET val = s.val + 1
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES(s.id, s.val);
SELECT compare_data();
SELECT alter_table_set_access_method('citus_source', 'heap');
-- Test CTE/Subquery in merge-actions (works only for router query)
SELECT cleanup_data();
SELECT setup_data();
SELECT create_distributed_table('citus_target', 'id');
SELECT create_distributed_table('citus_source', 'id', colocate_with=>'citus_target');
MERGE INTO pg_target
USING pg_source
ON (pg_target.id = pg_source.id)
WHEN MATCHED AND (SELECT max_a > 5001 FROM (SELECT max(id) as max_a, max(val) as b FROM pg_target WHERE id = pg_source.id) AS foo) THEN
DELETE
WHEN NOT MATCHED AND (SELECT max_a < 5001 FROM (SELECT max(id) as max_a, max(val) as b FROM pg_target WHERE id = pg_source.id) AS foo) THEN
INSERT VALUES (pg_source.id, 100);
MERGE INTO citus_target
USING citus_source
ON (citus_target.id = citus_source.id)
WHEN MATCHED AND (SELECT max_a > 5001 FROM (SELECT max(id) as max_a, max(val) as b FROM citus_target WHERE id = citus_source.id) AS foo) THEN
DELETE
WHEN NOT MATCHED AND (SELECT max_a < 5001 FROM (SELECT max(id) as max_a, max(val) as b FROM citus_target WHERE id = citus_source.id) AS foo) THEN
INSERT VALUES (citus_source.id, 100);
SELECT compare_data();
DROP SCHEMA merge_repartition1_schema CASCADE;

View File

@ -0,0 +1,139 @@
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15
\gset
\if :server_version_ge_15
\else
\q
\endif
-- We create two sets of source and target tables, one set in Postgres and
-- the other in Citus distributed. We run the _exact_ MERGE SQL on both sets
-- and compare the final results of the target tables in Postgres and Citus.
-- The results should match. This process is repeated for various combinations
-- of MERGE SQL.
DROP SCHEMA IF EXISTS merge_repartition2_schema CASCADE;
CREATE SCHEMA merge_repartition2_schema;
SET search_path TO merge_repartition2_schema;
SET citus.shard_count TO 4;
SET citus.next_shard_id TO 6000000;
SET citus.explain_all_tasks TO true;
SET citus.shard_replication_factor TO 1;
SET citus.max_adaptive_executor_pool_size TO 1;
SET client_min_messages = warning;
SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0);
RESET client_min_messages;
CREATE TABLE pg_target(id int, val int);
CREATE TABLE pg_source(id int, val int, const int);
CREATE TABLE citus_target(id int, val int);
CREATE TABLE citus_source(id int, val int, const int);
SELECT citus_add_local_table_to_metadata('citus_target');
SELECT citus_add_local_table_to_metadata('citus_source');
CREATE OR REPLACE FUNCTION cleanup_data() RETURNS VOID SET search_path TO merge_repartition2_schema AS $$
TRUNCATE pg_target;
TRUNCATE pg_source;
TRUNCATE citus_target;
TRUNCATE citus_source;
SELECT undistribute_table('citus_target');
SELECT undistribute_table('citus_source');
$$
LANGUAGE SQL;
--
-- Load same set of data to both Postgres and Citus tables
--
CREATE OR REPLACE FUNCTION setup_data() RETURNS VOID SET search_path TO merge_repartition2_schema AS $$
INSERT INTO pg_source SELECT i, i+1, 1 FROM generate_series(1, 100000) i;
INSERT INTO pg_target SELECT i, 1 FROM generate_series(50001, 100000) i;
INSERT INTO citus_source SELECT i, i+1, 1 FROM generate_series(1, 100000) i;
INSERT INTO citus_target SELECT i, 1 FROM generate_series(50001, 100000) i;
$$
LANGUAGE SQL;
--
-- Compares the final target tables, merge-modified data, of both Postgres and Citus tables
--
CREATE OR REPLACE FUNCTION check_data(table1_name text, column1_name text, table2_name text, column2_name text)
RETURNS VOID SET search_path TO merge_repartition2_schema AS $$
DECLARE
table1_avg numeric;
table2_avg numeric;
BEGIN
EXECUTE format('SELECT COALESCE(AVG(%I), 0) FROM %I', column1_name, table1_name) INTO table1_avg;
EXECUTE format('SELECT COALESCE(AVG(%I), 0) FROM %I', column2_name, table2_name) INTO table2_avg;
IF table1_avg > table2_avg THEN
RAISE EXCEPTION 'The average of %.% is greater than %.%', table1_name, column1_name, table2_name, column2_name;
ELSIF table1_avg < table2_avg THEN
RAISE EXCEPTION 'The average of %.% is less than %.%', table1_name, column1_name, table2_name, column2_name;
ELSE
RAISE NOTICE 'The average of %.% is equal to %.%', table1_name, column1_name, table2_name, column2_name;
END IF;
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION compare_data() RETURNS VOID SET search_path TO merge_repartition2_schema AS $$
SELECT check_data('pg_target', 'id', 'citus_target', 'id');
SELECT check_data('pg_target', 'val', 'citus_target', 'val');
$$
LANGUAGE SQL;
-- Test nested cte
SELECT cleanup_data();
SELECT setup_data();
SELECT create_distributed_table('citus_target', 'id');
SELECT create_distributed_table('citus_source', 'id', colocate_with=>'none');
WITH cte_top AS(WITH cte_1 AS (WITH cte_2 AS (SELECT id, val FROM pg_source) SELECT * FROM cte_2) SELECT * FROM cte_1)
MERGE INTO pg_target t
USING (SELECT const, val, id FROM pg_source WHERE id IN (SELECT id FROM cte_top)) as s
ON (s.id = t.id)
WHEN MATCHED AND t.id <= 75000 THEN
UPDATE SET val = (s.val::int8+1)
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES (s.id, s.val);
WITH cte_top AS(WITH cte_1 AS (WITH cte_2 AS (SELECT id, val FROM citus_source) SELECT * FROM cte_2) SELECT * FROM cte_1)
MERGE INTO citus_target t
USING (SELECT const, val, id FROM citus_source WHERE id IN (SELECT id FROM cte_top)) as s
ON (s.id = t.id)
WHEN MATCHED AND t.id <= 75000 THEN
UPDATE SET val = (s.val::int8+1)
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES (s.id, s.val);
SELECT compare_data();
-- Test aggregate function in source query
MERGE INTO pg_target t
USING (SELECT count(id+1)::text as value, val as key FROM pg_source group by key) s
ON t.id = s.key
WHEN MATCHED AND t.id <= 75000 THEN
UPDATE SET val = (s.value::int8+1)
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES(s.key, value::int4+10);
MERGE INTO citus_target t
USING (SELECT count(id+1)::text as value, val as key FROM citus_source group by key) s
ON t.id = s.key
WHEN MATCHED AND t.id <= 75000 THEN
UPDATE SET val = (s.value::int8+1)
WHEN MATCHED THEN
DELETE
WHEN NOT MATCHED THEN
INSERT VALUES(s.key, value::int4+10);
SELECT compare_data();
DROP SCHEMA merge_repartition2_schema CASCADE;

View File

@ -255,7 +255,7 @@ SELECT create_distributed_table('tbl2', 'x');
MERGE INTO tbl1 USING tbl2 ON (true)
WHEN MATCHED THEN DELETE;
-- also, not inside subqueries & ctes
-- also, inside subqueries & ctes
WITH targq AS (
SELECT * FROM tbl2
)

View File

@ -19,6 +19,10 @@ SET citus.use_citus_managed_tables to true;
SET citus.next_shard_id TO 4001000;
SET client_min_messages = warning;
SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0);
RESET client_min_messages;
CREATE USER regress_merge_privs;
CREATE USER regress_merge_no_privs;
DROP TABLE IF EXISTS target;