Merge pull request #3369 from citusdata/move_fast_path_pruning_to_executor

Defer shard pruning for fast-path router queries to execution
pull/3389/head
Önder Kalacı 2020-01-16 17:35:33 +01:00 committed by GitHub
commit 89d5bed88d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 950 additions and 265 deletions

View File

@ -39,6 +39,12 @@ static Node * DelayedErrorCreateScan(CustomScan *scan);
/* functions that are common to different scans */ /* functions that are common to different scans */
static void CitusBeginScan(CustomScanState *node, EState *estate, int eflags); static void CitusBeginScan(CustomScanState *node, EState *estate, int eflags);
static void CitusGenerateDeferredQueryStrings(CustomScanState *node, EState *estate, int
eflags);
static void HandleDeferredShardPruningForFastPathQueries(
DistributedPlan *distributedPlan);
static void HandleDeferredShardPruningForInserts(DistributedPlan *distributedPlan);
static void CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags); static void CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags);
static void CitusEndScan(CustomScanState *node); static void CitusEndScan(CustomScanState *node);
static void CitusReScan(CustomScanState *node); static void CitusReScan(CustomScanState *node);
@ -114,13 +120,11 @@ RegisterCitusCustomScanMethods(void)
* CitusBeginScan sets the coordinator backend initiated by Citus for queries using * CitusBeginScan sets the coordinator backend initiated by Citus for queries using
* that function as the BeginCustomScan callback. * that function as the BeginCustomScan callback.
* *
* The function also handles modification scan actions. * The function also handles deferred shard pruning along with function evaluations.
*/ */
static void static void
CitusBeginScan(CustomScanState *node, EState *estate, int eflags) CitusBeginScan(CustomScanState *node, EState *estate, int eflags)
{ {
DistributedPlan *distributedPlan = NULL;
MarkCitusInitiatedCoordinatorBackend(); MarkCitusInitiatedCoordinatorBackend();
CitusScanState *scanState = (CitusScanState *) node; CitusScanState *scanState = (CitusScanState *) node;
@ -129,11 +133,17 @@ CitusBeginScan(CustomScanState *node, EState *estate, int eflags)
ExecInitResultSlot(&scanState->customScanState.ss.ps, &TTSOpsMinimalTuple); ExecInitResultSlot(&scanState->customScanState.ss.ps, &TTSOpsMinimalTuple);
#endif #endif
distributedPlan = scanState->distributedPlan; DistributedPlan *distributedPlan = scanState->distributedPlan;
Job *workerJob = distributedPlan->workerJob;
if (workerJob &&
(workerJob->requiresMasterEvaluation || workerJob->deferredPruning))
{
CitusGenerateDeferredQueryStrings(node, estate, eflags);
}
if (distributedPlan->modLevel == ROW_MODIFY_READONLY || if (distributedPlan->modLevel == ROW_MODIFY_READONLY ||
distributedPlan->insertSelectQuery != NULL) distributedPlan->insertSelectQuery != NULL)
{ {
/* no more action required */
return; return;
} }
@ -175,6 +185,38 @@ CitusExecScan(CustomScanState *node)
*/ */
static void static void
CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags) CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags)
{
CitusScanState *scanState = (CitusScanState *) node;
DistributedPlan *distributedPlan = scanState->distributedPlan;
/*
* If we have not already copied the plan into this context, do it now.
* Note that we could have copied the plan during CitusGenerateDeferredQueryStrings.
*/
if (GetMemoryChunkContext(distributedPlan) != CurrentMemoryContext)
{
distributedPlan = copyObject(distributedPlan);
scanState->distributedPlan = distributedPlan;
}
Job *workerJob = distributedPlan->workerJob;
List *taskList = workerJob->taskList;
/* prevent concurrent placement changes */
AcquireMetadataLocks(taskList);
/* modify tasks are always assigned using first-replica policy */
workerJob->taskList = FirstReplicaAssignTaskList(taskList);
}
/*
* CitusGenerateDeferredQueryStrings generates query strings at the start of the execution
* in two cases: when the query requires master evaluation and/or deferred shard pruning.
*/
static void
CitusGenerateDeferredQueryStrings(CustomScanState *node, EState *estate, int eflags)
{ {
CitusScanState *scanState = (CitusScanState *) node; CitusScanState *scanState = (CitusScanState *) node;
@ -188,12 +230,32 @@ CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags)
Job *workerJob = distributedPlan->workerJob; Job *workerJob = distributedPlan->workerJob;
Query *jobQuery = workerJob->jobQuery; Query *jobQuery = workerJob->jobQuery;
List *taskList = workerJob->taskList;
if (workerJob->requiresMasterEvaluation) /* we'd only get to this function with the following conditions */
Assert(workerJob->requiresMasterEvaluation || workerJob->deferredPruning);
PlanState *planState = &(scanState->customScanState.ss.ps);
EState *executorState = planState->state;
/* citus only evaluates functions for modification queries */
bool modifyQueryRequiresMasterEvaluation =
workerJob->requiresMasterEvaluation && jobQuery->commandType != CMD_SELECT;
/*
* ExecuteMasterEvaluableFunctions handles both function evalation
* and parameter evaluation. Pruning is most likely deferred because
* there is a parameter on the distribution key. So, evaluate in both
* cases.
*/
bool shoudEvaluteFunctionsOrParams =
modifyQueryRequiresMasterEvaluation || workerJob->deferredPruning;
if (shoudEvaluteFunctionsOrParams)
{ {
PlanState *planState = &(scanState->customScanState.ss.ps); distributedPlan = (scanState->distributedPlan);
EState *executorState = planState->state; scanState->distributedPlan = distributedPlan;
workerJob = distributedPlan->workerJob;
jobQuery = workerJob->jobQuery;
ExecuteMasterEvaluableFunctions(jobQuery, planState); ExecuteMasterEvaluableFunctions(jobQuery, planState);
@ -204,37 +266,107 @@ CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags)
* parameter values, we set the parameter list to NULL. * parameter values, we set the parameter list to NULL.
*/ */
executorState->es_param_list_info = NULL; executorState->es_param_list_info = NULL;
if (workerJob->deferredPruning)
{
DeferredErrorMessage *planningError = NULL;
/* need to perform shard pruning, rebuild the task list from scratch */
taskList = RouterInsertTaskList(jobQuery, &planningError);
if (planningError != NULL)
{
RaiseDeferredError(planningError, ERROR);
}
workerJob->taskList = taskList;
workerJob->partitionKeyValue = ExtractInsertPartitionKeyValue(jobQuery);
}
RebuildQueryStrings(jobQuery, taskList);
} }
/* prevent concurrent placement changes */ /*
AcquireMetadataLocks(taskList); * After evaluating the function/parameters, we're done unless shard pruning
* is also deferred.
*/
if (!workerJob->deferredPruning)
{
RebuildQueryStrings(workerJob->jobQuery, workerJob->taskList);
return;
}
/* at this point, we're about to do the shard pruning */
Assert(workerJob->deferredPruning);
if (jobQuery->commandType == CMD_INSERT)
{
HandleDeferredShardPruningForInserts(distributedPlan);
}
else
{
HandleDeferredShardPruningForFastPathQueries(distributedPlan);
}
}
/*
* HandleDeferredShardPruningForInserts does the shard pruning for INSERT
* queries and rebuilds the query strings.
*/
static void
HandleDeferredShardPruningForInserts(DistributedPlan *distributedPlan)
{
Job *workerJob = distributedPlan->workerJob;
Query *jobQuery = workerJob->jobQuery;
DeferredErrorMessage *planningError = NULL;
/* need to perform shard pruning, rebuild the task list from scratch */
List *taskList = RouterInsertTaskList(jobQuery, &planningError);
if (planningError != NULL)
{
RaiseDeferredError(planningError, ERROR);
}
workerJob->taskList = taskList;
workerJob->partitionKeyValue = ExtractInsertPartitionKeyValue(jobQuery);
RebuildQueryStrings(jobQuery, workerJob->taskList);
}
/*
* HandleDeferredShardPruningForFastPathQueries does the shard pruning for
* UPDATE/DELETE/SELECT fast path router queries and rebuilds the query strings.
*/
static void
HandleDeferredShardPruningForFastPathQueries(DistributedPlan *distributedPlan)
{
Assert(distributedPlan->fastPathRouterPlan);
Job *workerJob = distributedPlan->workerJob;
bool isMultiShardQuery = false;
List *shardIntervalList =
TargetShardIntervalForFastPathQuery(workerJob->jobQuery,
&workerJob->partitionKeyValue,
&isMultiShardQuery, NULL);
/* /*
* We are taking locks on partitions of partitioned tables. These locks are * A fast-path router query can only yield multiple shards when the parameter
* necessary for locking tables that appear in the SELECT part of the query. * cannot be resolved properly, which can be triggered by SQL function.
*/ */
LockPartitionsInRelationList(distributedPlan->relationIdList, AccessShareLock); if (isMultiShardQuery)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot perform distributed planning on this "
"query because parameterized queries for SQL "
"functions referencing distributed tables are "
"not supported"),
errhint("Consider using PL/pgSQL functions instead.")));
}
/* modify tasks are always assigned using first-replica policy */ bool shardsPresent = false;
workerJob->taskList = FirstReplicaAssignTaskList(taskList); List *relationShardList =
RelationShardListForShardIntervalList(shardIntervalList, &shardsPresent);
UpdateRelationToShardNames((Node *) workerJob->jobQuery, relationShardList);
List *placementList =
FindRouterWorkerList(shardIntervalList, shardsPresent, true);
uint64 shardId = INVALID_SHARD_ID;
if (shardsPresent)
{
shardId = GetAnchorShardId(shardIntervalList);
}
GenerateSingleShardRouterTaskList(workerJob,
relationShardList,
placementList, shardId);
} }
@ -287,7 +419,8 @@ CoordinatorInsertSelectCreateScan(CustomScan *scan)
scanState->customScanState.ss.ps.type = T_CustomScanState; scanState->customScanState.ss.ps.type = T_CustomScanState;
scanState->distributedPlan = GetDistributedPlan(scan); scanState->distributedPlan = GetDistributedPlan(scan);
scanState->customScanState.methods = &CoordinatorInsertSelectCustomExecMethods; scanState->customScanState.methods =
&CoordinatorInsertSelectCustomExecMethods;
return (Node *) scanState; return (Node *) scanState;
} }
@ -380,7 +513,8 @@ CitusReScan(CustomScanState *node)
TupleDesc TupleDesc
ScanStateGetTupleDescriptor(CitusScanState *scanState) ScanStateGetTupleDescriptor(CitusScanState *scanState)
{ {
return scanState->customScanState.ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor; return scanState->customScanState.ss.ps.ps_ResultTupleSlot->
tts_tupleDescriptor;
} }

View File

@ -122,7 +122,7 @@ static bool IsLocalReferenceTableJoin(Query *parse, List *rangeTableList);
static bool QueryIsNotSimpleSelect(Node *node); static bool QueryIsNotSimpleSelect(Node *node);
static bool UpdateReferenceTablesWithShard(Node *node, void *context); static bool UpdateReferenceTablesWithShard(Node *node, void *context);
static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *planContext, static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *planContext,
Const *distributionKeyValue); Node *distributionKeyValue);
static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext, static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext,
List *rangeTableList, int rteIdCounter); List *rangeTableList, int rteIdCounter);
@ -136,7 +136,7 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
List *rangeTableList = ExtractRangeTableEntryList(parse); List *rangeTableList = ExtractRangeTableEntryList(parse);
int rteIdCounter = 1; int rteIdCounter = 1;
bool fastPathRouterQuery = false; bool fastPathRouterQuery = false;
Const *distributionKeyValue = NULL; Node *distributionKeyValue = NULL;
DistributedPlanningContext planContext = { DistributedPlanningContext planContext = {
.query = parse, .query = parse,
.cursorOptions = cursorOptions, .cursorOptions = cursorOptions,
@ -551,12 +551,26 @@ IsModifyDistributedPlan(DistributedPlan *distributedPlan)
*/ */
static PlannedStmt * static PlannedStmt *
PlanFastPathDistributedStmt(DistributedPlanningContext *planContext, PlanFastPathDistributedStmt(DistributedPlanningContext *planContext,
Const *distributionKeyValue) Node *distributionKeyValue)
{ {
FastPathRestrictionContext *fastPathContext =
planContext->plannerRestrictionContext->fastPathRestrictionContext;
planContext->plannerRestrictionContext->fastPathRestrictionContext-> planContext->plannerRestrictionContext->fastPathRestrictionContext->
fastPathRouterQuery = true; fastPathRouterQuery = true;
planContext->plannerRestrictionContext->fastPathRestrictionContext->
distributionKeyValue = distributionKeyValue; if (distributionKeyValue == NULL)
{
/* nothing to record */
}
else if (IsA(distributionKeyValue, Const))
{
fastPathContext->distributionKeyValue = (Const *) distributionKeyValue;
}
else if (IsA(distributionKeyValue, Param))
{
fastPathContext->distributionKeyHasParam = true;
}
planContext->plan = FastPathPlanner(planContext->originalQuery, planContext->query, planContext->plan = FastPathPlanner(planContext->originalQuery, planContext->query,
planContext->boundParams); planContext->boundParams);

View File

@ -59,9 +59,9 @@ bool EnableFastPathRouterPlanner = true;
static bool ColumnAppearsMultipleTimes(Node *quals, Var *distributionKey); static bool ColumnAppearsMultipleTimes(Node *quals, Var *distributionKey);
static bool ConjunctionContainsColumnFilter(Node *node, Var *column, static bool ConjunctionContainsColumnFilter(Node *node, Var *column,
Const **distributionKeyValue); Node **distributionKeyValue);
static bool DistKeyInSimpleOpExpression(Expr *clause, Var *distColumn, static bool DistKeyInSimpleOpExpression(Expr *clause, Var *distColumn,
Const **distributionKeyValue); Node **distributionKeyValue);
/* /*
@ -75,21 +75,6 @@ static bool DistKeyInSimpleOpExpression(Expr *clause, Var *distColumn,
PlannedStmt * PlannedStmt *
FastPathPlanner(Query *originalQuery, Query *parse, ParamListInfo boundParams) FastPathPlanner(Query *originalQuery, Query *parse, ParamListInfo boundParams)
{ {
/*
* To support prepared statements for fast-path queries, we resolve the
* external parameters at this point. Note that this is normally done by
* eval_const_expr() in standard planner when the boundParams are avaliable.
* If not avaliable, as does for all other types of queries, Citus goes
* through the logic of increasing the cost of the plan and forcing
* PostgreSQL to pick custom plans.
*
* We're also only interested in resolving the quals since we'd want to
* do shard pruning based on the filter on the distribution column.
*/
originalQuery->jointree->quals =
ResolveExternalParams((Node *) originalQuery->jointree->quals,
copyParamList(boundParams));
/* /*
* Citus planner relies on some of the transformations on constant * Citus planner relies on some of the transformations on constant
* evaluation on the parse tree. * evaluation on the parse tree.
@ -124,7 +109,7 @@ GeneratePlaceHolderPlannedStmt(Query *parse)
SeqScan *seqScanNode = makeNode(SeqScan); SeqScan *seqScanNode = makeNode(SeqScan);
Plan *plan = &seqScanNode->plan; Plan *plan = &seqScanNode->plan;
Const *distKey PG_USED_FOR_ASSERTS_ONLY = NULL; Node *distKey PG_USED_FOR_ASSERTS_ONLY = NULL;
AssertArg(FastPathRouterQuery(parse, &distKey)); AssertArg(FastPathRouterQuery(parse, &distKey));
@ -171,7 +156,7 @@ GeneratePlaceHolderPlannedStmt(Query *parse)
* don't have any sublinks/CTEs etc * don't have any sublinks/CTEs etc
*/ */
bool bool
FastPathRouterQuery(Query *query, Const **distributionKeyValue) FastPathRouterQuery(Query *query, Node **distributionKeyValue)
{ {
FromExpr *joinTree = query->jointree; FromExpr *joinTree = query->jointree;
Node *quals = NULL; Node *quals = NULL;
@ -307,7 +292,7 @@ ColumnAppearsMultipleTimes(Node *quals, Var *distributionKey)
* If the conjuction contains column filter which is const, distributionKeyValue is set. * If the conjuction contains column filter which is const, distributionKeyValue is set.
*/ */
static bool static bool
ConjunctionContainsColumnFilter(Node *node, Var *column, Const **distributionKeyValue) ConjunctionContainsColumnFilter(Node *node, Var *column, Node **distributionKeyValue)
{ {
if (node == NULL) if (node == NULL)
{ {
@ -369,7 +354,7 @@ ConjunctionContainsColumnFilter(Node *node, Var *column, Const **distributionKey
* When a const is found, distributionKeyValue is set. * When a const is found, distributionKeyValue is set.
*/ */
static bool static bool
DistKeyInSimpleOpExpression(Expr *clause, Var *distColumn, Const **distributionKeyValue) DistKeyInSimpleOpExpression(Expr *clause, Var *distColumn, Node **distributionKeyValue)
{ {
Node *leftOperand = NULL; Node *leftOperand = NULL;
Node *rightOperand = NULL; Node *rightOperand = NULL;
@ -436,7 +421,11 @@ DistKeyInSimpleOpExpression(Expr *clause, Var *distColumn, Const **distributionK
*distributionKeyValue == NULL) *distributionKeyValue == NULL)
{ {
/* if the vartypes do not match, let shard pruning handle it later */ /* if the vartypes do not match, let shard pruning handle it later */
*distributionKeyValue = copyObject(constantClause); *distributionKeyValue = (Node *) copyObject(constantClause);
}
else if (paramClause != NULL)
{
*distributionKeyValue = (Node *) copyObject(paramClause);
} }
return distColumnExists; return distColumnExists;

View File

@ -153,11 +153,6 @@ static bool SelectsFromDistributedTable(List *rangeTableList, Query *query);
static List * get_all_actual_clauses(List *restrictinfo_list); static List * get_all_actual_clauses(List *restrictinfo_list);
static int CompareInsertValuesByShardId(const void *leftElement, static int CompareInsertValuesByShardId(const void *leftElement,
const void *rightElement); const void *rightElement);
static uint64 GetAnchorShardId(List *relationShardList);
static List * TargetShardIntervalForFastPathQuery(Query *query,
Const **partitionValueConst,
bool *isMultiShardQuery,
Const *distributionKeyValue);
static List * SingleShardSelectTaskList(Query *query, uint64 jobId, static List * SingleShardSelectTaskList(Query *query, uint64 jobId,
List *relationShardList, List *placementList, List *relationShardList, List *placementList,
uint64 shardId); uint64 shardId);
@ -165,11 +160,11 @@ static bool RowLocksOnRelations(Node *node, List **rtiLockList);
static List * SingleShardModifyTaskList(Query *query, uint64 jobId, static List * SingleShardModifyTaskList(Query *query, uint64 jobId,
List *relationShardList, List *placementList, List *relationShardList, List *placementList,
uint64 shardId); uint64 shardId);
static List * RemoveCoordinatorPlacement(List *placementList);
static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job, static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
TaskAssignmentPolicyType TaskAssignmentPolicyType
taskAssignmentPolicy, taskAssignmentPolicy,
List *placementList); List *placementList);
static List * RemoveCoordinatorPlacement(List *placementList);
/* /*
@ -248,6 +243,10 @@ CreateModifyPlan(Query *originalQuery, Query *query,
distributedPlan->hasReturning = true; distributedPlan->hasReturning = true;
} }
distributedPlan->fastPathRouterPlan =
plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery;
return distributedPlan; return distributedPlan;
} }
@ -1634,9 +1633,6 @@ ExtractFirstDistributedTableId(Query *query)
List *rangeTableList = query->rtable; List *rangeTableList = query->rtable;
ListCell *rangeTableCell = NULL; ListCell *rangeTableCell = NULL;
Oid distributedTableId = InvalidOid; Oid distributedTableId = InvalidOid;
Const *distKey PG_USED_FOR_ASSERTS_ONLY = NULL;
Assert(IsModifyCommand(query) || FastPathRouterQuery(query, &distKey));
foreach(rangeTableCell, rangeTableList) foreach(rangeTableCell, rangeTableList)
{ {
@ -1673,13 +1669,34 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
/* check if this query requires master evaluation */ /* check if this query requires master evaluation */
bool requiresMasterEvaluation = RequiresMasterEvaluation(originalQuery); bool requiresMasterEvaluation = RequiresMasterEvaluation(originalQuery);
FastPathRestrictionContext *fastPathRestrictionContext =
plannerRestrictionContext->fastPathRestrictionContext;
/*
* We prefer to defer shard pruning/task generation to the
* execution when the parameter on the distribution key
* cannot be resolved.
*/
if (fastPathRestrictionContext->fastPathRouterQuery &&
fastPathRestrictionContext->distributionKeyHasParam)
{
Job *job = CreateJob(originalQuery);
job->deferredPruning = true;
ereport(DEBUG2, (errmsg("Deferred pruning for a fast-path router "
"query")));
return job;
}
else
{
(*planningError) = PlanRouterQuery(originalQuery, plannerRestrictionContext,
&placementList, &shardId, &relationShardList,
&prunedShardIntervalListList,
replacePrunedQueryWithDummy,
&isMultiShardModifyQuery,
&partitionKeyValue);
}
(*planningError) = PlanRouterQuery(originalQuery, plannerRestrictionContext,
&placementList, &shardId, &relationShardList,
&prunedShardIntervalListList,
replacePrunedQueryWithDummy,
&isMultiShardModifyQuery,
&partitionKeyValue);
if (*planningError) if (*planningError)
{ {
return NULL; return NULL;
@ -1703,6 +1720,39 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
return job; return job;
} }
if (isMultiShardModifyQuery)
{
job->taskList = QueryPushdownSqlTaskList(originalQuery, job->jobId,
plannerRestrictionContext->
relationRestrictionContext,
prunedShardIntervalListList,
MODIFY_TASK,
requiresMasterEvaluation);
}
else
{
GenerateSingleShardRouterTaskList(job, relationShardList,
placementList, shardId);
}
job->requiresMasterEvaluation = requiresMasterEvaluation;
return job;
}
/*
* SingleShardRouterTaskList is a wrapper around other corresponding task
* list generation functions specific to single shard selects and modifications.
*
* The function updates the input job's taskList in-place.
*/
void
GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
List *placementList, uint64 shardId)
{
Query *originalQuery = job->jobQuery;
if (originalQuery->commandType == CMD_SELECT) if (originalQuery->commandType == CMD_SELECT)
{ {
job->taskList = SingleShardSelectTaskList(originalQuery, job->jobId, job->taskList = SingleShardSelectTaskList(originalQuery, job->jobId,
@ -1724,15 +1774,6 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
placementList); placementList);
} }
} }
else if (isMultiShardModifyQuery)
{
job->taskList = QueryPushdownSqlTaskList(originalQuery, job->jobId,
plannerRestrictionContext->
relationRestrictionContext,
prunedShardIntervalListList,
MODIFY_TASK,
requiresMasterEvaluation);
}
else if (shardId == INVALID_SHARD_ID) else if (shardId == INVALID_SHARD_ID)
{ {
/* modification that prunes to 0 shards */ /* modification that prunes to 0 shards */
@ -1744,9 +1785,6 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
relationShardList, placementList, relationShardList, placementList,
shardId); shardId);
} }
job->requiresMasterEvaluation = requiresMasterEvaluation;
return job;
} }
@ -2023,14 +2061,9 @@ PlanRouterQuery(Query *originalQuery,
bool replacePrunedQueryWithDummy, bool *multiShardModifyQuery, bool replacePrunedQueryWithDummy, bool *multiShardModifyQuery,
Const **partitionValueConst) Const **partitionValueConst)
{ {
static uint32 zeroShardQueryRoundRobin = 0;
bool isMultiShardQuery = false; bool isMultiShardQuery = false;
DeferredErrorMessage *planningError = NULL; DeferredErrorMessage *planningError = NULL;
ListCell *prunedShardIntervalListCell = NULL;
List *workerList = NIL;
bool shardsPresent = false; bool shardsPresent = false;
uint64 shardId = INVALID_SHARD_ID;
CmdType commandType = originalQuery->commandType; CmdType commandType = originalQuery->commandType;
bool fastPathRouterQuery = bool fastPathRouterQuery =
plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery; plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery;
@ -2065,7 +2098,7 @@ PlanRouterQuery(Query *originalQuery,
return planningError; return planningError;
} }
*prunedShardIntervalListList = list_make1(shardIntervalList); *prunedShardIntervalListList = shardIntervalList;
if (!isMultiShardQuery) if (!isMultiShardQuery)
{ {
@ -2111,29 +2144,18 @@ PlanRouterQuery(Query *originalQuery,
} }
} }
foreach(prunedShardIntervalListCell, *prunedShardIntervalListList) *relationShardList =
RelationShardListForShardIntervalList(*prunedShardIntervalListList,
&shardsPresent);
if (!shardsPresent && !replacePrunedQueryWithDummy)
{ {
List *prunedShardIntervalList = (List *) lfirst(prunedShardIntervalListCell); /*
ListCell *shardIntervalCell = NULL; * For INSERT ... SELECT, this query could be still a valid for some other target
* shard intervals. Thus, we should return empty list if there aren't any matching
/* no shard is present or all shards are pruned out case will be handled later */ * workers, so that the caller can decide what to do with this task.
if (prunedShardIntervalList == NIL) */
{ return NULL;
continue;
}
shardsPresent = true;
foreach(shardIntervalCell, prunedShardIntervalList)
{
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
RelationShard *relationShard = CitusMakeNode(RelationShard);
relationShard->relationId = shardInterval->relationId;
relationShard->shardId = shardInterval->shardId;
*relationShardList = lappend(*relationShardList, relationShard);
}
} }
/* /*
@ -2149,48 +2171,11 @@ PlanRouterQuery(Query *originalQuery,
} }
/* we need anchor shard id for select queries with router planner */ /* we need anchor shard id for select queries with router planner */
shardId = GetAnchorShardId(*prunedShardIntervalListList); uint64 shardId = GetAnchorShardId(*prunedShardIntervalListList);
/* List *workerList =
* Determine the worker that has all shard placements if a shard placement found. FindRouterWorkerList(*prunedShardIntervalListList, shardsPresent,
* If no shard placement exists and replacePrunedQueryWithDummy flag is set, we will replacePrunedQueryWithDummy);
* still run the query but the result will be empty. We create a dummy shard
* placement for the first active worker.
*/
if (shardsPresent)
{
workerList = WorkersContainingAllShards(*prunedShardIntervalListList);
}
else if (replacePrunedQueryWithDummy)
{
List *workerNodeList = ActiveReadableWorkerNodeList();
if (workerNodeList != NIL)
{
int workerNodeCount = list_length(workerNodeList);
int workerNodeIndex = zeroShardQueryRoundRobin % workerNodeCount;
WorkerNode *workerNode = (WorkerNode *) list_nth(workerNodeList,
workerNodeIndex);
ShardPlacement *dummyPlacement =
(ShardPlacement *) CitusMakeNode(ShardPlacement);
dummyPlacement->nodeName = workerNode->workerName;
dummyPlacement->nodePort = workerNode->workerPort;
dummyPlacement->nodeId = workerNode->nodeId;
dummyPlacement->groupId = workerNode->groupId;
workerList = lappend(workerList, dummyPlacement);
zeroShardQueryRoundRobin++;
}
}
else
{
/*
* For INSERT ... SELECT, this query could be still a valid for some other target
* shard intervals. Thus, we should return empty list if there aren't any matching
* workers, so that the caller can decide what to do with this task.
*/
return NULL;
}
if (workerList == NIL) if (workerList == NIL)
{ {
@ -2219,6 +2204,89 @@ PlanRouterQuery(Query *originalQuery,
} }
List *
FindRouterWorkerList(List *shardIntervalList, bool shardsPresent,
bool replacePrunedQueryWithDummy)
{
static uint32 zeroShardQueryRoundRobin = 0;
List *workerList = NIL;
/*
* Determine the worker that has all shard placements if a shard placement found.
* If no shard placement exists and replacePrunedQueryWithDummy flag is set, we will
* still run the query but the result will be empty. We create a dummy shard
* placement for the first active worker.
*/
if (shardsPresent)
{
workerList = WorkersContainingAllShards(shardIntervalList);
}
else if (replacePrunedQueryWithDummy)
{
List *workerNodeList = ActiveReadableWorkerNodeList();
if (workerNodeList != NIL)
{
int workerNodeCount = list_length(workerNodeList);
int workerNodeIndex = zeroShardQueryRoundRobin % workerNodeCount;
WorkerNode *workerNode = (WorkerNode *) list_nth(workerNodeList,
workerNodeIndex);
ShardPlacement *dummyPlacement =
(ShardPlacement *) CitusMakeNode(ShardPlacement);
dummyPlacement->nodeName = workerNode->workerName;
dummyPlacement->nodePort = workerNode->workerPort;
dummyPlacement->nodeId = workerNode->nodeId;
dummyPlacement->groupId = workerNode->groupId;
workerList = lappend(workerList, dummyPlacement);
zeroShardQueryRoundRobin++;
}
}
return workerList;
}
/*
* RelationShardListForShardIntervalList is a utility function which gets a list of
* shardInterval, and returns a list of RelationShard.
*/
List *
RelationShardListForShardIntervalList(List *shardIntervalList, bool *shardsPresent)
{
List *relationShardList = NIL;
ListCell *shardIntervalListCell = NULL;
foreach(shardIntervalListCell, shardIntervalList)
{
List *prunedShardIntervalList = (List *) lfirst(shardIntervalListCell);
/* no shard is present or all shards are pruned out case will be handled later */
if (prunedShardIntervalList == NIL)
{
continue;
}
*shardsPresent = true;
ListCell *shardIntervalCell = NULL;
foreach(shardIntervalCell, prunedShardIntervalList)
{
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
RelationShard *relationShard = CitusMakeNode(RelationShard);
relationShard->relationId = shardInterval->relationId;
relationShard->shardId = shardInterval->shardId;
relationShardList = lappend(relationShardList, relationShard);
}
}
return relationShardList;
}
/* /*
* GetAnchorShardId returns the anchor shard id given relation shard list. * GetAnchorShardId returns the anchor shard id given relation shard list.
* The desired anchor shard is found as follows: * The desired anchor shard is found as follows:
@ -2229,7 +2297,7 @@ PlanRouterQuery(Query *originalQuery,
* reference tables * reference tables
* - Return INVALID_SHARD_ID on empty lists * - Return INVALID_SHARD_ID on empty lists
*/ */
static uint64 uint64
GetAnchorShardId(List *prunedShardIntervalListList) GetAnchorShardId(List *prunedShardIntervalListList)
{ {
ListCell *prunedShardIntervalListCell = NULL; ListCell *prunedShardIntervalListCell = NULL;
@ -2264,12 +2332,13 @@ GetAnchorShardId(List *prunedShardIntervalListList)
/* /*
* TargetShardIntervalForFastPathQuery gets a query which is in * TargetShardIntervalForFastPathQuery gets a query which is in
* the form defined by FastPathRouterQuery() and returns exactly * the form defined by FastPathRouterQuery() and returns exactly
* one shard interval (see FastPathRouterQuery() for the detail). * one list of a a one shard interval (see FastPathRouterQuery()
* for the detail).
* *
* Also set the outgoing partition column value if requested via * Also set the outgoing partition column value if requested via
* partitionValueConst * partitionValueConst
*/ */
static List * List *
TargetShardIntervalForFastPathQuery(Query *query, Const **partitionValueConst, TargetShardIntervalForFastPathQuery(Query *query, Const **partitionValueConst,
bool *isMultiShardQuery, Const *distributionKeyValue) bool *isMultiShardQuery, Const *distributionKeyValue)
{ {
@ -2286,7 +2355,9 @@ TargetShardIntervalForFastPathQuery(Query *query, Const **partitionValueConst,
/* set the outgoing partition column value if requested */ /* set the outgoing partition column value if requested */
*partitionValueConst = distributionKeyValue; *partitionValueConst = distributionKeyValue;
} }
return list_make1(shardInterval); List *shardIntervalList = list_make1(shardInterval);
return list_make1(shardIntervalList);
} }
Node *quals = query->jointree->quals; Node *quals = query->jointree->quals;
@ -2297,8 +2368,8 @@ TargetShardIntervalForFastPathQuery(Query *query, Const **partitionValueConst,
&queryPartitionValueConst); &queryPartitionValueConst);
/* we're only expecting single shard from a single table */ /* we're only expecting single shard from a single table */
Const *distKey PG_USED_FOR_ASSERTS_ONLY = NULL; Node *distKey PG_USED_FOR_ASSERTS_ONLY = NULL;
Assert(FastPathRouterQuery(query, &distKey)); Assert(FastPathRouterQuery(query, &distKey) || !EnableFastPathRouterPlanner);
if (list_length(prunedShardIntervalList) > 1) if (list_length(prunedShardIntervalList) > 1)
{ {
@ -2311,7 +2382,7 @@ TargetShardIntervalForFastPathQuery(Query *query, Const **partitionValueConst,
*partitionValueConst = queryPartitionValueConst; *partitionValueConst = queryPartitionValueConst;
} }
return prunedShardIntervalList; return list_make1(prunedShardIntervalList);
} }

View File

@ -117,7 +117,7 @@ CopyNodeDistributedPlan(COPYFUNC_ARGS)
COPY_NODE_FIELD(subPlanList); COPY_NODE_FIELD(subPlanList);
COPY_NODE_FIELD(usedSubPlanNodeList); COPY_NODE_FIELD(usedSubPlanNodeList);
COPY_SCALAR_FIELD(fastPathRouterPlan);
COPY_NODE_FIELD(planningError); COPY_NODE_FIELD(planningError);
} }

View File

@ -96,6 +96,11 @@ typedef struct FastPathRestrictionContext
* key contains parameter, so check for it before using. * key contains parameter, so check for it before using.
*/ */
Const *distributionKeyValue; Const *distributionKeyValue;
/*
* Set to true when distKey = Param; in the queryTree
*/
bool distributionKeyHasParam;
}FastPathRestrictionContext; }FastPathRestrictionContext;
typedef struct PlannerRestrictionContext typedef struct PlannerRestrictionContext

View File

@ -43,6 +43,10 @@ extern DeferredErrorMessage * PlanRouterQuery(Query *originalQuery,
bool replacePrunedQueryWithDummy, bool replacePrunedQueryWithDummy,
bool *multiShardModifyQuery, bool *multiShardModifyQuery,
Const **partitionValueConst); Const **partitionValueConst);
extern List * RelationShardListForShardIntervalList(List *shardIntervalList,
bool *shardsPresent);
extern List * FindRouterWorkerList(List *shardIntervalList, bool shardsPresent,
bool replacePrunedQueryWithDummy);
extern List * RouterInsertTaskList(Query *query, DeferredErrorMessage **planningError); extern List * RouterInsertTaskList(Query *query, DeferredErrorMessage **planningError);
extern Const * ExtractInsertPartitionKeyValue(Query *query); extern Const * ExtractInsertPartitionKeyValue(Query *query);
extern List * TargetShardIntervalsForRestrictInfo(RelationRestrictionContext * extern List * TargetShardIntervalsForRestrictInfo(RelationRestrictionContext *
@ -71,6 +75,15 @@ extern void AddShardIntervalRestrictionToSelect(Query *subqery,
extern bool UpdateOrDeleteQuery(Query *query); extern bool UpdateOrDeleteQuery(Query *query);
extern List * WorkersContainingAllShards(List *prunedShardIntervalsList); extern List * WorkersContainingAllShards(List *prunedShardIntervalsList);
extern uint64 GetAnchorShardId(List *relationShardList);
extern List * TargetShardIntervalForFastPathQuery(Query *query,
Const **partitionValueConst,
bool *isMultiShardQuery,
Const *distributionKeyValue);
extern void GenerateSingleShardRouterTaskList(Job *job,
List *relationShardList,
List *placementList, uint64 shardId);
/* /*
* FastPathPlanner is a subset of router planner, that's why we prefer to * FastPathPlanner is a subset of router planner, that's why we prefer to
* keep the external function here. * keep the external function here.
@ -78,6 +91,7 @@ extern List * WorkersContainingAllShards(List *prunedShardIntervalsList);
extern PlannedStmt * FastPathPlanner(Query *originalQuery, Query *parse, ParamListInfo extern PlannedStmt * FastPathPlanner(Query *originalQuery, Query *parse, ParamListInfo
boundParams); boundParams);
extern bool FastPathRouterQuery(Query *query, Const **distributionKeyValue); extern bool FastPathRouterQuery(Query *query, Node **distributionKeyValue);
#endif /* MULTI_ROUTER_PLANNER_H */ #endif /* MULTI_ROUTER_PLANNER_H */

View File

@ -234,50 +234,43 @@ DETAIL: distribution column value: 1
PREPARE p1 (int, int, int) AS PREPARE p1 (int, int, int) AS
UPDATE modify_fast_path SET value_1 = value_1 + $1 WHERE key = $2 AND value_1 = $3; UPDATE modify_fast_path SET value_1 = value_1 + $1 WHERE key = $2 AND value_1 = $3;
EXECUTE p1(1,1,1); EXECUTE p1(1,1,1);
DEBUG: Distributed planning for a fast-path router query DEBUG: Deferred pruning for a fast-path router query
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
DETAIL: distribution column value: 1
EXECUTE p1(2,2,2); EXECUTE p1(2,2,2);
DEBUG: Distributed planning for a fast-path router query DEBUG: Deferred pruning for a fast-path router query
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
DETAIL: distribution column value: 2
EXECUTE p1(3,3,3); EXECUTE p1(3,3,3);
DEBUG: Distributed planning for a fast-path router query DEBUG: Deferred pruning for a fast-path router query
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
DETAIL: distribution column value: 3
EXECUTE p1(4,4,4); EXECUTE p1(4,4,4);
DEBUG: Distributed planning for a fast-path router query DEBUG: Deferred pruning for a fast-path router query
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
DETAIL: distribution column value: 4
EXECUTE p1(5,5,5); EXECUTE p1(5,5,5);
DEBUG: Distributed planning for a fast-path router query DEBUG: Deferred pruning for a fast-path router query
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
DETAIL: distribution column value: 5
EXECUTE p1(6,6,6); EXECUTE p1(6,6,6);
DEBUG: Router planner cannot handle multi-shard modify queries DEBUG: Deferred pruning for a fast-path router query
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
DETAIL: distribution column value: 6 EXECUTE p1(7,7,7);
CREATE FUNCTION modify_fast_path_plpsql(int, int) RETURNS void as $$ CREATE FUNCTION modify_fast_path_plpsql(int, int) RETURNS void as $$
BEGIN BEGIN
DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2; DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2;
END; END;
$$ LANGUAGE plpgsql; $$ LANGUAGE plpgsql;
SELECT modify_fast_path_plpsql(1,1); SELECT modify_fast_path_plpsql(1,1);
DEBUG: Distributed planning for a fast-path router query DEBUG: Deferred pruning for a fast-path router query
CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2" CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2"
PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement
DEBUG: Creating router plan DEBUG: Creating router plan
CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2" CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2"
PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement
DEBUG: Plan is router executable DEBUG: Plan is router executable
DETAIL: distribution column value: 1
CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2" CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2"
PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement
modify_fast_path_plpsql modify_fast_path_plpsql
@ -286,14 +279,13 @@ PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL stateme
(1 row) (1 row)
SELECT modify_fast_path_plpsql(2,2); SELECT modify_fast_path_plpsql(2,2);
DEBUG: Distributed planning for a fast-path router query DEBUG: Deferred pruning for a fast-path router query
CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2" CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2"
PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement
DEBUG: Creating router plan DEBUG: Creating router plan
CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2" CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2"
PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement
DEBUG: Plan is router executable DEBUG: Plan is router executable
DETAIL: distribution column value: 2
CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2" CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2"
PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement
modify_fast_path_plpsql modify_fast_path_plpsql
@ -302,14 +294,13 @@ PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL stateme
(1 row) (1 row)
SELECT modify_fast_path_plpsql(3,3); SELECT modify_fast_path_plpsql(3,3);
DEBUG: Distributed planning for a fast-path router query DEBUG: Deferred pruning for a fast-path router query
CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2" CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2"
PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement
DEBUG: Creating router plan DEBUG: Creating router plan
CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2" CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2"
PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement
DEBUG: Plan is router executable DEBUG: Plan is router executable
DETAIL: distribution column value: 3
CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2" CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2"
PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement
modify_fast_path_plpsql modify_fast_path_plpsql
@ -318,14 +309,13 @@ PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL stateme
(1 row) (1 row)
SELECT modify_fast_path_plpsql(4,4); SELECT modify_fast_path_plpsql(4,4);
DEBUG: Distributed planning for a fast-path router query DEBUG: Deferred pruning for a fast-path router query
CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2" CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2"
PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement
DEBUG: Creating router plan DEBUG: Creating router plan
CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2" CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2"
PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement
DEBUG: Plan is router executable DEBUG: Plan is router executable
DETAIL: distribution column value: 4
CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2" CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2"
PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement
modify_fast_path_plpsql modify_fast_path_plpsql
@ -334,14 +324,13 @@ PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL stateme
(1 row) (1 row)
SELECT modify_fast_path_plpsql(5,5); SELECT modify_fast_path_plpsql(5,5);
DEBUG: Distributed planning for a fast-path router query DEBUG: Deferred pruning for a fast-path router query
CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2" CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2"
PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement
DEBUG: Creating router plan DEBUG: Creating router plan
CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2" CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2"
PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement
DEBUG: Plan is router executable DEBUG: Plan is router executable
DETAIL: distribution column value: 5
CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2" CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2"
PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement
modify_fast_path_plpsql modify_fast_path_plpsql
@ -350,17 +339,13 @@ PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL stateme
(1 row) (1 row)
SELECT modify_fast_path_plpsql(6,6); SELECT modify_fast_path_plpsql(6,6);
DEBUG: Router planner cannot handle multi-shard modify queries DEBUG: Deferred pruning for a fast-path router query
CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2"
PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement
DEBUG: Distributed planning for a fast-path router query
CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2" CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2"
PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement
DEBUG: Creating router plan DEBUG: Creating router plan
CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2" CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2"
PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement
DEBUG: Plan is router executable DEBUG: Plan is router executable
DETAIL: distribution column value: 6
CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2" CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2"
PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement
modify_fast_path_plpsql modify_fast_path_plpsql
@ -369,22 +354,231 @@ PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL stateme
(1 row) (1 row)
SELECT modify_fast_path_plpsql(6,6); SELECT modify_fast_path_plpsql(6,6);
DEBUG: Distributed planning for a fast-path router query
CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2"
PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement
DEBUG: Creating router plan
CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2"
PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement
DEBUG: Plan is router executable
DETAIL: distribution column value: 6
CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2"
PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement
modify_fast_path_plpsql modify_fast_path_plpsql
--------------------------------------------------------------------- ---------------------------------------------------------------------
(1 row) (1 row)
-- prepared statements with zero shard
PREPARE prepared_zero_shard_select(int) AS SELECT count(*) FROM modify_fast_path WHERE key = $1 AND false;
PREPARE prepared_zero_shard_update(int) AS UPDATE modify_fast_path SET value_1 = 1 WHERE key = $1 AND false;
SET client_min_messages TO DEBUG2;
SET citus.log_remote_commands TO ON;
EXECUTE prepared_zero_shard_select(1);
DEBUG: Deferred pruning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Plan is router executable
NOTICE: issuing SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) 1) AND false)
DETAIL: on server postgres@localhost:xxxxx connectionId: 4
count
---------------------------------------------------------------------
0
(1 row)
EXECUTE prepared_zero_shard_select(2);
DEBUG: Deferred pruning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Plan is router executable
NOTICE: issuing SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) 2) AND false)
DETAIL: on server postgres@localhost:xxxxx connectionId: 3
count
---------------------------------------------------------------------
0
(1 row)
EXECUTE prepared_zero_shard_select(3);
DEBUG: Deferred pruning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Plan is router executable
NOTICE: issuing SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) 3) AND false)
DETAIL: on server postgres@localhost:xxxxx connectionId: 4
count
---------------------------------------------------------------------
0
(1 row)
EXECUTE prepared_zero_shard_select(4);
DEBUG: Deferred pruning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Plan is router executable
NOTICE: issuing SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) 4) AND false)
DETAIL: on server postgres@localhost:xxxxx connectionId: 3
count
---------------------------------------------------------------------
0
(1 row)
EXECUTE prepared_zero_shard_select(5);
DEBUG: Deferred pruning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Plan is router executable
NOTICE: issuing SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) 5) AND false)
DETAIL: on server postgres@localhost:xxxxx connectionId: 4
count
---------------------------------------------------------------------
0
(1 row)
EXECUTE prepared_zero_shard_select(6);
DEBUG: Deferred pruning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Plan is router executable
NOTICE: issuing SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) 6) AND false)
DETAIL: on server postgres@localhost:xxxxx connectionId: 3
count
---------------------------------------------------------------------
0
(1 row)
EXECUTE prepared_zero_shard_select(7);
NOTICE: issuing SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) 7) AND false)
DETAIL: on server postgres@localhost:xxxxx connectionId: 4
count
---------------------------------------------------------------------
0
(1 row)
EXECUTE prepared_zero_shard_update(1);
DEBUG: Deferred pruning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Plan is router executable
EXECUTE prepared_zero_shard_update(2);
DEBUG: Deferred pruning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Plan is router executable
EXECUTE prepared_zero_shard_update(3);
DEBUG: Deferred pruning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Plan is router executable
EXECUTE prepared_zero_shard_update(4);
DEBUG: Deferred pruning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Plan is router executable
EXECUTE prepared_zero_shard_update(5);
DEBUG: Deferred pruning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Plan is router executable
EXECUTE prepared_zero_shard_update(6);
DEBUG: Deferred pruning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Plan is router executable
EXECUTE prepared_zero_shard_update(7);
-- same test with fast-path disabled
SET citus.enable_fast_path_router_planner TO FALSE;
EXECUTE prepared_zero_shard_select(1);
NOTICE: issuing SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) 1) AND false)
DETAIL: on server postgres@localhost:xxxxx connectionId: 4
count
---------------------------------------------------------------------
0
(1 row)
EXECUTE prepared_zero_shard_select(2);
NOTICE: issuing SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) 2) AND false)
DETAIL: on server postgres@localhost:xxxxx connectionId: 3
count
---------------------------------------------------------------------
0
(1 row)
EXECUTE prepared_zero_shard_update(1);
EXECUTE prepared_zero_shard_update(2);
DEALLOCATE prepared_zero_shard_select;
DEALLOCATE prepared_zero_shard_update;
PREPARE prepared_zero_shard_select(int) AS SELECT count(*) FROM modify_fast_path WHERE key = $1 AND false;
PREPARE prepared_zero_shard_update(int) AS UPDATE modify_fast_path SET value_1 = 1 WHERE key = $1 AND false;
EXECUTE prepared_zero_shard_select(1);
DEBUG: Creating router plan
DEBUG: Plan is router executable
NOTICE: issuing SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) $1) AND false)
DETAIL: on server postgres@localhost:xxxxx connectionId: 4
count
---------------------------------------------------------------------
0
(1 row)
EXECUTE prepared_zero_shard_select(2);
DEBUG: Creating router plan
DEBUG: Plan is router executable
NOTICE: issuing SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) $1) AND false)
DETAIL: on server postgres@localhost:xxxxx connectionId: 3
count
---------------------------------------------------------------------
0
(1 row)
EXECUTE prepared_zero_shard_select(3);
DEBUG: Creating router plan
DEBUG: Plan is router executable
NOTICE: issuing SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) $1) AND false)
DETAIL: on server postgres@localhost:xxxxx connectionId: 4
count
---------------------------------------------------------------------
0
(1 row)
EXECUTE prepared_zero_shard_select(4);
DEBUG: Creating router plan
DEBUG: Plan is router executable
NOTICE: issuing SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) $1) AND false)
DETAIL: on server postgres@localhost:xxxxx connectionId: 3
count
---------------------------------------------------------------------
0
(1 row)
EXECUTE prepared_zero_shard_select(5);
DEBUG: Creating router plan
DEBUG: Plan is router executable
NOTICE: issuing SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) $1) AND false)
DETAIL: on server postgres@localhost:xxxxx connectionId: 4
count
---------------------------------------------------------------------
0
(1 row)
EXECUTE prepared_zero_shard_select(6);
DEBUG: Creating router plan
DEBUG: Plan is router executable
NOTICE: issuing SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) $1) AND false)
DETAIL: on server postgres@localhost:xxxxx connectionId: 3
count
---------------------------------------------------------------------
0
(1 row)
EXECUTE prepared_zero_shard_select(7);
NOTICE: issuing SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) $1) AND false)
DETAIL: on server postgres@localhost:xxxxx connectionId: 3
count
---------------------------------------------------------------------
0
(1 row)
EXECUTE prepared_zero_shard_update(1);
DEBUG: Creating router plan
DEBUG: Plan is router executable
EXECUTE prepared_zero_shard_update(2);
DEBUG: Creating router plan
DEBUG: Plan is router executable
EXECUTE prepared_zero_shard_update(3);
DEBUG: Creating router plan
DEBUG: Plan is router executable
EXECUTE prepared_zero_shard_update(4);
DEBUG: Creating router plan
DEBUG: Plan is router executable
EXECUTE prepared_zero_shard_update(5);
DEBUG: Creating router plan
DEBUG: Plan is router executable
EXECUTE prepared_zero_shard_update(6);
DEBUG: Creating router plan
DEBUG: Plan is router executable
EXECUTE prepared_zero_shard_update(7);
-- same test with fast-path disabled
-- reset back to the original value, in case any new test comes after
RESET citus.enable_fast_path_router_planner;
RESET client_min_messages; RESET client_min_messages;
RESET citus.log_remote_commands;
DROP SCHEMA fast_path_router_modify CASCADE; DROP SCHEMA fast_path_router_modify CASCADE;
NOTICE: drop cascades to 4 other objects NOTICE: drop cascades to 4 other objects
DETAIL: drop cascades to table modify_fast_path DETAIL: drop cascades to table modify_fast_path

View File

@ -227,6 +227,142 @@ SELECT * FROM coerce_hash WHERE id = 1.0::numeric;
1 | test value 1 | test value
(1 row) (1 row)
-- same queries with PREPARE
PREPARE coerce_bigint(bigint) AS SELECT * FROM coerce_hash WHERE id=$1::bigint;
PREPARE coerce_numeric(bigint) AS SELECT * FROM coerce_hash WHERE id=$1::numeric;
PREPARE coerce_numeric_2(numeric) AS SELECT * FROM coerce_hash WHERE id=$1;
EXECUTE coerce_bigint(1);
id | value
---------------------------------------------------------------------
1 | test value
(1 row)
EXECUTE coerce_bigint(1);
id | value
---------------------------------------------------------------------
1 | test value
(1 row)
EXECUTE coerce_bigint(1);
id | value
---------------------------------------------------------------------
1 | test value
(1 row)
EXECUTE coerce_bigint(1);
id | value
---------------------------------------------------------------------
1 | test value
(1 row)
EXECUTE coerce_bigint(1);
id | value
---------------------------------------------------------------------
1 | test value
(1 row)
EXECUTE coerce_bigint(1);
id | value
---------------------------------------------------------------------
1 | test value
(1 row)
EXECUTE coerce_bigint(1);
id | value
---------------------------------------------------------------------
1 | test value
(1 row)
EXECUTE coerce_bigint(1);
id | value
---------------------------------------------------------------------
1 | test value
(1 row)
EXECUTE coerce_numeric(1);
id | value
---------------------------------------------------------------------
1 | test value
(1 row)
EXECUTE coerce_numeric(1);
id | value
---------------------------------------------------------------------
1 | test value
(1 row)
EXECUTE coerce_numeric(1);
id | value
---------------------------------------------------------------------
1 | test value
(1 row)
EXECUTE coerce_numeric(1);
id | value
---------------------------------------------------------------------
1 | test value
(1 row)
EXECUTE coerce_numeric(1);
id | value
---------------------------------------------------------------------
1 | test value
(1 row)
EXECUTE coerce_numeric(1);
id | value
---------------------------------------------------------------------
1 | test value
(1 row)
EXECUTE coerce_numeric(1);
id | value
---------------------------------------------------------------------
1 | test value
(1 row)
EXECUTE coerce_numeric_2(1);
id | value
---------------------------------------------------------------------
1 | test value
(1 row)
EXECUTE coerce_numeric_2(1);
id | value
---------------------------------------------------------------------
1 | test value
(1 row)
EXECUTE coerce_numeric_2(1);
id | value
---------------------------------------------------------------------
1 | test value
(1 row)
EXECUTE coerce_numeric_2(1);
id | value
---------------------------------------------------------------------
1 | test value
(1 row)
EXECUTE coerce_numeric_2(1);
id | value
---------------------------------------------------------------------
1 | test value
(1 row)
EXECUTE coerce_numeric_2(1);
id | value
---------------------------------------------------------------------
1 | test value
(1 row)
EXECUTE coerce_numeric_2(1);
id | value
---------------------------------------------------------------------
1 | test value
(1 row)
SET search_path TO public; SET search_path TO public;
DROP SCHEMA prune_shard_list CASCADE; DROP SCHEMA prune_shard_list CASCADE;
NOTICE: drop cascades to 9 other objects NOTICE: drop cascades to 9 other objects

View File

@ -1559,10 +1559,9 @@ PREPARE author_articles(int) as
FROM articles_hash FROM articles_hash
WHERE author_id = $1; WHERE author_id = $1;
EXECUTE author_articles(1); EXECUTE author_articles(1);
DEBUG: Distributed planning for a fast-path router query DEBUG: Deferred pruning for a fast-path router query
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
DETAIL: distribution column value: 1
id | author_id | title | word_count id | author_id | title | word_count
--------------------------------------------------------------------- ---------------------------------------------------------------------
1 | 1 | arsenous | 9572 1 | 1 | arsenous | 9572
@ -1573,10 +1572,9 @@ DETAIL: distribution column value: 1
(5 rows) (5 rows)
EXECUTE author_articles(1); EXECUTE author_articles(1);
DEBUG: Distributed planning for a fast-path router query DEBUG: Deferred pruning for a fast-path router query
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
DETAIL: distribution column value: 1
id | author_id | title | word_count id | author_id | title | word_count
--------------------------------------------------------------------- ---------------------------------------------------------------------
1 | 1 | arsenous | 9572 1 | 1 | arsenous | 9572
@ -1587,10 +1585,9 @@ DETAIL: distribution column value: 1
(5 rows) (5 rows)
EXECUTE author_articles(1); EXECUTE author_articles(1);
DEBUG: Distributed planning for a fast-path router query DEBUG: Deferred pruning for a fast-path router query
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
DETAIL: distribution column value: 1
id | author_id | title | word_count id | author_id | title | word_count
--------------------------------------------------------------------- ---------------------------------------------------------------------
1 | 1 | arsenous | 9572 1 | 1 | arsenous | 9572
@ -1601,10 +1598,9 @@ DETAIL: distribution column value: 1
(5 rows) (5 rows)
EXECUTE author_articles(1); EXECUTE author_articles(1);
DEBUG: Distributed planning for a fast-path router query DEBUG: Deferred pruning for a fast-path router query
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
DETAIL: distribution column value: 1
id | author_id | title | word_count id | author_id | title | word_count
--------------------------------------------------------------------- ---------------------------------------------------------------------
1 | 1 | arsenous | 9572 1 | 1 | arsenous | 9572
@ -1615,10 +1611,9 @@ DETAIL: distribution column value: 1
(5 rows) (5 rows)
EXECUTE author_articles(1); EXECUTE author_articles(1);
DEBUG: Distributed planning for a fast-path router query DEBUG: Deferred pruning for a fast-path router query
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
DETAIL: distribution column value: 1
id | author_id | title | word_count id | author_id | title | word_count
--------------------------------------------------------------------- ---------------------------------------------------------------------
1 | 1 | arsenous | 9572 1 | 1 | arsenous | 9572
@ -1629,11 +1624,9 @@ DETAIL: distribution column value: 1
(5 rows) (5 rows)
EXECUTE author_articles(1); EXECUTE author_articles(1);
DEBUG: Router planner cannot handle multi-shard select queries DEBUG: Deferred pruning for a fast-path router query
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
DETAIL: distribution column value: 1
id | author_id | title | word_count id | author_id | title | word_count
--------------------------------------------------------------------- ---------------------------------------------------------------------
1 | 1 | arsenous | 9572 1 | 1 | arsenous | 9572
@ -1709,7 +1702,7 @@ BEGIN
END; END;
$$ LANGUAGE plpgsql; $$ LANGUAGE plpgsql;
SELECT author_articles_max_id(1); SELECT author_articles_max_id(1);
DEBUG: Distributed planning for a fast-path router query DEBUG: Deferred pruning for a fast-path router query
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
author_articles_max_id author_articles_max_id
@ -1718,7 +1711,7 @@ DEBUG: Plan is router executable
(1 row) (1 row)
SELECT author_articles_max_id(1); SELECT author_articles_max_id(1);
DEBUG: Distributed planning for a fast-path router query DEBUG: Deferred pruning for a fast-path router query
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
author_articles_max_id author_articles_max_id
@ -1727,7 +1720,7 @@ DEBUG: Plan is router executable
(1 row) (1 row)
SELECT author_articles_max_id(1); SELECT author_articles_max_id(1);
DEBUG: Distributed planning for a fast-path router query DEBUG: Deferred pruning for a fast-path router query
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
author_articles_max_id author_articles_max_id
@ -1736,7 +1729,7 @@ DEBUG: Plan is router executable
(1 row) (1 row)
SELECT author_articles_max_id(1); SELECT author_articles_max_id(1);
DEBUG: Distributed planning for a fast-path router query DEBUG: Deferred pruning for a fast-path router query
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
author_articles_max_id author_articles_max_id
@ -1745,7 +1738,7 @@ DEBUG: Plan is router executable
(1 row) (1 row)
SELECT author_articles_max_id(1); SELECT author_articles_max_id(1);
DEBUG: Distributed planning for a fast-path router query DEBUG: Deferred pruning for a fast-path router query
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
author_articles_max_id author_articles_max_id
@ -1754,8 +1747,7 @@ DEBUG: Plan is router executable
(1 row) (1 row)
SELECT author_articles_max_id(1); SELECT author_articles_max_id(1);
DEBUG: Router planner cannot handle multi-shard select queries DEBUG: Deferred pruning for a fast-path router query
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
author_articles_max_id author_articles_max_id
@ -1849,7 +1841,7 @@ BEGIN
END; END;
$$ LANGUAGE plpgsql; $$ LANGUAGE plpgsql;
SELECT * FROM author_articles_id_word_count(1); SELECT * FROM author_articles_id_word_count(1);
DEBUG: Distributed planning for a fast-path router query DEBUG: Deferred pruning for a fast-path router query
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
id | word_count id | word_count
@ -1862,7 +1854,7 @@ DEBUG: Plan is router executable
(5 rows) (5 rows)
SELECT * FROM author_articles_id_word_count(1); SELECT * FROM author_articles_id_word_count(1);
DEBUG: Distributed planning for a fast-path router query DEBUG: Deferred pruning for a fast-path router query
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
id | word_count id | word_count
@ -1875,7 +1867,7 @@ DEBUG: Plan is router executable
(5 rows) (5 rows)
SELECT * FROM author_articles_id_word_count(1); SELECT * FROM author_articles_id_word_count(1);
DEBUG: Distributed planning for a fast-path router query DEBUG: Deferred pruning for a fast-path router query
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
id | word_count id | word_count
@ -1888,7 +1880,7 @@ DEBUG: Plan is router executable
(5 rows) (5 rows)
SELECT * FROM author_articles_id_word_count(1); SELECT * FROM author_articles_id_word_count(1);
DEBUG: Distributed planning for a fast-path router query DEBUG: Deferred pruning for a fast-path router query
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
id | word_count id | word_count
@ -1901,7 +1893,7 @@ DEBUG: Plan is router executable
(5 rows) (5 rows)
SELECT * FROM author_articles_id_word_count(1); SELECT * FROM author_articles_id_word_count(1);
DEBUG: Distributed planning for a fast-path router query DEBUG: Deferred pruning for a fast-path router query
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
id | word_count id | word_count
@ -1914,8 +1906,7 @@ DEBUG: Plan is router executable
(5 rows) (5 rows)
SELECT * FROM author_articles_id_word_count(1); SELECT * FROM author_articles_id_word_count(1);
DEBUG: Router planner cannot handle multi-shard select queries DEBUG: Deferred pruning for a fast-path router query
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
id | word_count id | word_count
@ -1936,45 +1927,39 @@ INSERT INTO articles_hash
EXECUTE insert_sel(1,1); EXECUTE insert_sel(1,1);
DEBUG: OFFSET clauses are not allowed in distributed INSERT ... SELECT queries DEBUG: OFFSET clauses are not allowed in distributed INSERT ... SELECT queries
DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Collecting INSERT ... SELECT results on coordinator
DEBUG: Distributed planning for a fast-path router query DEBUG: Deferred pruning for a fast-path router query
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
DETAIL: distribution column value: 1
EXECUTE insert_sel(1,1); EXECUTE insert_sel(1,1);
DEBUG: OFFSET clauses are not allowed in distributed INSERT ... SELECT queries DEBUG: OFFSET clauses are not allowed in distributed INSERT ... SELECT queries
DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Collecting INSERT ... SELECT results on coordinator
DEBUG: Distributed planning for a fast-path router query DEBUG: Deferred pruning for a fast-path router query
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
DETAIL: distribution column value: 1
EXECUTE insert_sel(1,1); EXECUTE insert_sel(1,1);
DEBUG: OFFSET clauses are not allowed in distributed INSERT ... SELECT queries DEBUG: OFFSET clauses are not allowed in distributed INSERT ... SELECT queries
DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Collecting INSERT ... SELECT results on coordinator
DEBUG: Distributed planning for a fast-path router query DEBUG: Deferred pruning for a fast-path router query
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
DETAIL: distribution column value: 1
EXECUTE insert_sel(1,1); EXECUTE insert_sel(1,1);
DEBUG: OFFSET clauses are not allowed in distributed INSERT ... SELECT queries DEBUG: OFFSET clauses are not allowed in distributed INSERT ... SELECT queries
DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Collecting INSERT ... SELECT results on coordinator
DEBUG: Distributed planning for a fast-path router query DEBUG: Deferred pruning for a fast-path router query
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
DETAIL: distribution column value: 1
EXECUTE insert_sel(1,1); EXECUTE insert_sel(1,1);
DEBUG: OFFSET clauses are not allowed in distributed INSERT ... SELECT queries DEBUG: OFFSET clauses are not allowed in distributed INSERT ... SELECT queries
DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Collecting INSERT ... SELECT results on coordinator
DEBUG: Distributed planning for a fast-path router query DEBUG: Deferred pruning for a fast-path router query
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
DETAIL: distribution column value: 1
EXECUTE insert_sel(1,1); EXECUTE insert_sel(1,1);
DEBUG: OFFSET clauses are not allowed in distributed INSERT ... SELECT queries DEBUG: OFFSET clauses are not allowed in distributed INSERT ... SELECT queries
DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Collecting INSERT ... SELECT results on coordinator
DEBUG: Distributed planning for a fast-path router query DEBUG: Deferred pruning for a fast-path router query
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
DETAIL: distribution column value: 1
-- one final interesting preperad statement -- one final interesting preperad statement
-- where one of the filters is on the target list -- where one of the filters is on the target list
PREPARE fast_path_agg_filter(int, int) AS PREPARE fast_path_agg_filter(int, int) AS
@ -1984,61 +1969,54 @@ PREPARE fast_path_agg_filter(int, int) AS
articles_hash articles_hash
WHERE author_id = $2; WHERE author_id = $2;
EXECUTE fast_path_agg_filter(1,1); EXECUTE fast_path_agg_filter(1,1);
DEBUG: Distributed planning for a fast-path router query DEBUG: Deferred pruning for a fast-path router query
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
DETAIL: distribution column value: 1
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
0 0
(1 row) (1 row)
EXECUTE fast_path_agg_filter(2,2); EXECUTE fast_path_agg_filter(2,2);
DEBUG: Distributed planning for a fast-path router query DEBUG: Deferred pruning for a fast-path router query
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
DETAIL: distribution column value: 2
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
0 0
(1 row) (1 row)
EXECUTE fast_path_agg_filter(3,3); EXECUTE fast_path_agg_filter(3,3);
DEBUG: Distributed planning for a fast-path router query DEBUG: Deferred pruning for a fast-path router query
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
DETAIL: distribution column value: 3
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
0 0
(1 row) (1 row)
EXECUTE fast_path_agg_filter(4,4); EXECUTE fast_path_agg_filter(4,4);
DEBUG: Distributed planning for a fast-path router query DEBUG: Deferred pruning for a fast-path router query
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
DETAIL: distribution column value: 4
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
0 0
(1 row) (1 row)
EXECUTE fast_path_agg_filter(5,5); EXECUTE fast_path_agg_filter(5,5);
DEBUG: Distributed planning for a fast-path router query DEBUG: Deferred pruning for a fast-path router query
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
DETAIL: distribution column value: 5
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
0 0
(1 row) (1 row)
EXECUTE fast_path_agg_filter(6,6); EXECUTE fast_path_agg_filter(6,6);
DEBUG: Router planner cannot handle multi-shard select queries DEBUG: Deferred pruning for a fast-path router query
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
DETAIL: distribution column value: 6
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
0 0

View File

@ -322,16 +322,19 @@ INSERT INTO test_parameterized_sql VALUES(1, 1);
SELECT * FROM test_parameterized_sql_function(1); SELECT * FROM test_parameterized_sql_function(1);
ERROR: cannot perform distributed planning on this query because parameterized queries for SQL functions referencing distributed tables are not supported ERROR: cannot perform distributed planning on this query because parameterized queries for SQL functions referencing distributed tables are not supported
HINT: Consider using PL/pgSQL functions instead. HINT: Consider using PL/pgSQL functions instead.
SELECT test_parameterized_sql_function(1);
ERROR: could not create distributed plan
DETAIL: Possibly this is caused by the use of parameters in SQL functions, which is not supported in Citus.
HINT: Consider using PL/pgSQL functions instead.
CONTEXT: SQL function "test_parameterized_sql_function" statement 1
SELECT test_parameterized_sql_function_in_subquery_where(1); SELECT test_parameterized_sql_function_in_subquery_where(1);
ERROR: could not create distributed plan ERROR: could not create distributed plan
DETAIL: Possibly this is caused by the use of parameters in SQL functions, which is not supported in Citus. DETAIL: Possibly this is caused by the use of parameters in SQL functions, which is not supported in Citus.
HINT: Consider using PL/pgSQL functions instead. HINT: Consider using PL/pgSQL functions instead.
CONTEXT: SQL function "test_parameterized_sql_function_in_subquery_where" statement 1 CONTEXT: SQL function "test_parameterized_sql_function_in_subquery_where" statement 1
-- postgres behaves slightly differently for the following
-- query where the target list is empty
SELECT test_parameterized_sql_function(1);
test_parameterized_sql_function
---------------------------------------------------------------------
1
(1 row)
-- test that sql function calls are treated as multi-statement transactions -- test that sql function calls are treated as multi-statement transactions
-- and are rolled back properly. Single-row inserts for not-replicated tables -- and are rolled back properly. Single-row inserts for not-replicated tables
-- don't go over 2PC if they are not part of a bigger transaction. -- don't go over 2PC if they are not part of a bigger transaction.

View File

@ -261,7 +261,38 @@ RETURNS void AS $$
DECLARE c CURSOR WITH HOLD FOR SELECT * FROM cursor_me WHERE x = $1; DECLARE c CURSOR WITH HOLD FOR SELECT * FROM cursor_me WHERE x = $1;
$$ LANGUAGE SQL; $$ LANGUAGE SQL;
SELECT declares_cursor(5); SELECT declares_cursor(5);
declares_cursor
---------------------------------------------------------------------
(1 row)
-- Test DECLARE CURSOR .. WITH HOLD without parameter
CREATE OR REPLACE FUNCTION declares_cursor_2()
RETURNS void AS $$
DECLARE c2 CURSOR WITH HOLD FOR SELECT * FROM cursor_me;
$$ LANGUAGE SQL;
SELECT declares_cursor_2();
declares_cursor_2
---------------------------------------------------------------------
(1 row)
-- Test DECLARE CURSOR .. WITH HOLD with parameter on non-dist key
CREATE OR REPLACE FUNCTION declares_cursor_3(p int)
RETURNS void AS $$
DECLARE c3 CURSOR WITH HOLD FOR SELECT * FROM cursor_me WHERE y = $1;
$$ LANGUAGE SQL;
SELECT declares_cursor_3(19);
ERROR: Cursors for queries on distributed tables with parameters are currently unsupported ERROR: Cursors for queries on distributed tables with parameters are currently unsupported
-- Test DECLARE CURSOR .. WITH HOLD with parameter on dist key, but not fast-path planner
CREATE OR REPLACE FUNCTION declares_cursor_4(p int)
RETURNS void AS $$
DECLARE c4 CURSOR WITH HOLD FOR SELECT *, (SELECT 1) FROM cursor_me WHERE x = $1;
$$ LANGUAGE SQL;
SELECT declares_cursor_4(19);
ERROR: could not run distributed query with subquery outside the FROM, WHERE and HAVING clauses
HINT: Consider using an equality filter on the distributed table's partition column.
CONTEXT: SQL function "declares_cursor_4" statement 1
CREATE OR REPLACE FUNCTION cursor_plpgsql(p int) CREATE OR REPLACE FUNCTION cursor_plpgsql(p int)
RETURNS SETOF int AS $$ RETURNS SETOF int AS $$
DECLARE DECLARE

View File

@ -99,6 +99,7 @@ EXECUTE p1(3,3,3);
EXECUTE p1(4,4,4); EXECUTE p1(4,4,4);
EXECUTE p1(5,5,5); EXECUTE p1(5,5,5);
EXECUTE p1(6,6,6); EXECUTE p1(6,6,6);
EXECUTE p1(7,7,7);
CREATE FUNCTION modify_fast_path_plpsql(int, int) RETURNS void as $$ CREATE FUNCTION modify_fast_path_plpsql(int, int) RETURNS void as $$
BEGIN BEGIN
@ -114,6 +115,62 @@ SELECT modify_fast_path_plpsql(5,5);
SELECT modify_fast_path_plpsql(6,6); SELECT modify_fast_path_plpsql(6,6);
SELECT modify_fast_path_plpsql(6,6); SELECT modify_fast_path_plpsql(6,6);
RESET client_min_messages; -- prepared statements with zero shard
PREPARE prepared_zero_shard_select(int) AS SELECT count(*) FROM modify_fast_path WHERE key = $1 AND false;
PREPARE prepared_zero_shard_update(int) AS UPDATE modify_fast_path SET value_1 = 1 WHERE key = $1 AND false;
SET client_min_messages TO DEBUG2;
SET citus.log_remote_commands TO ON;
EXECUTE prepared_zero_shard_select(1);
EXECUTE prepared_zero_shard_select(2);
EXECUTE prepared_zero_shard_select(3);
EXECUTE prepared_zero_shard_select(4);
EXECUTE prepared_zero_shard_select(5);
EXECUTE prepared_zero_shard_select(6);
EXECUTE prepared_zero_shard_select(7);
EXECUTE prepared_zero_shard_update(1);
EXECUTE prepared_zero_shard_update(2);
EXECUTE prepared_zero_shard_update(3);
EXECUTE prepared_zero_shard_update(4);
EXECUTE prepared_zero_shard_update(5);
EXECUTE prepared_zero_shard_update(6);
EXECUTE prepared_zero_shard_update(7);
-- same test with fast-path disabled
SET citus.enable_fast_path_router_planner TO FALSE;
EXECUTE prepared_zero_shard_select(1);
EXECUTE prepared_zero_shard_select(2);
EXECUTE prepared_zero_shard_update(1);
EXECUTE prepared_zero_shard_update(2);
DEALLOCATE prepared_zero_shard_select;
DEALLOCATE prepared_zero_shard_update;
PREPARE prepared_zero_shard_select(int) AS SELECT count(*) FROM modify_fast_path WHERE key = $1 AND false;
PREPARE prepared_zero_shard_update(int) AS UPDATE modify_fast_path SET value_1 = 1 WHERE key = $1 AND false;
EXECUTE prepared_zero_shard_select(1);
EXECUTE prepared_zero_shard_select(2);
EXECUTE prepared_zero_shard_select(3);
EXECUTE prepared_zero_shard_select(4);
EXECUTE prepared_zero_shard_select(5);
EXECUTE prepared_zero_shard_select(6);
EXECUTE prepared_zero_shard_select(7);
EXECUTE prepared_zero_shard_update(1);
EXECUTE prepared_zero_shard_update(2);
EXECUTE prepared_zero_shard_update(3);
EXECUTE prepared_zero_shard_update(4);
EXECUTE prepared_zero_shard_update(5);
EXECUTE prepared_zero_shard_update(6);
EXECUTE prepared_zero_shard_update(7);
-- same test with fast-path disabled
-- reset back to the original value, in case any new test comes after
RESET citus.enable_fast_path_router_planner;
RESET client_min_messages;
RESET citus.log_remote_commands;
DROP SCHEMA fast_path_router_modify CASCADE; DROP SCHEMA fast_path_router_modify CASCADE;

View File

@ -145,5 +145,37 @@ SELECT * FROM coerce_hash WHERE id = 1.0;
SELECT * FROM coerce_hash WHERE id = 1.0::numeric; SELECT * FROM coerce_hash WHERE id = 1.0::numeric;
-- same queries with PREPARE
PREPARE coerce_bigint(bigint) AS SELECT * FROM coerce_hash WHERE id=$1::bigint;
PREPARE coerce_numeric(bigint) AS SELECT * FROM coerce_hash WHERE id=$1::numeric;
PREPARE coerce_numeric_2(numeric) AS SELECT * FROM coerce_hash WHERE id=$1;
EXECUTE coerce_bigint(1);
EXECUTE coerce_bigint(1);
EXECUTE coerce_bigint(1);
EXECUTE coerce_bigint(1);
EXECUTE coerce_bigint(1);
EXECUTE coerce_bigint(1);
EXECUTE coerce_bigint(1);
EXECUTE coerce_bigint(1);
EXECUTE coerce_numeric(1);
EXECUTE coerce_numeric(1);
EXECUTE coerce_numeric(1);
EXECUTE coerce_numeric(1);
EXECUTE coerce_numeric(1);
EXECUTE coerce_numeric(1);
EXECUTE coerce_numeric(1);
EXECUTE coerce_numeric_2(1);
EXECUTE coerce_numeric_2(1);
EXECUTE coerce_numeric_2(1);
EXECUTE coerce_numeric_2(1);
EXECUTE coerce_numeric_2(1);
EXECUTE coerce_numeric_2(1);
EXECUTE coerce_numeric_2(1);
SET search_path TO public; SET search_path TO public;
DROP SCHEMA prune_shard_list CASCADE; DROP SCHEMA prune_shard_list CASCADE;

View File

@ -146,9 +146,12 @@ INSERT INTO test_parameterized_sql VALUES(1, 1);
-- all of them should fail -- all of them should fail
SELECT * FROM test_parameterized_sql_function(1); SELECT * FROM test_parameterized_sql_function(1);
SELECT test_parameterized_sql_function(1);
SELECT test_parameterized_sql_function_in_subquery_where(1); SELECT test_parameterized_sql_function_in_subquery_where(1);
-- postgres behaves slightly differently for the following
-- query where the target list is empty
SELECT test_parameterized_sql_function(1);
-- test that sql function calls are treated as multi-statement transactions -- test that sql function calls are treated as multi-statement transactions
-- and are rolled back properly. Single-row inserts for not-replicated tables -- and are rolled back properly. Single-row inserts for not-replicated tables
-- don't go over 2PC if they are not part of a bigger transaction. -- don't go over 2PC if they are not part of a bigger transaction.

View File

@ -145,6 +145,30 @@ $$ LANGUAGE SQL;
SELECT declares_cursor(5); SELECT declares_cursor(5);
-- Test DECLARE CURSOR .. WITH HOLD without parameter
CREATE OR REPLACE FUNCTION declares_cursor_2()
RETURNS void AS $$
DECLARE c2 CURSOR WITH HOLD FOR SELECT * FROM cursor_me;
$$ LANGUAGE SQL;
SELECT declares_cursor_2();
-- Test DECLARE CURSOR .. WITH HOLD with parameter on non-dist key
CREATE OR REPLACE FUNCTION declares_cursor_3(p int)
RETURNS void AS $$
DECLARE c3 CURSOR WITH HOLD FOR SELECT * FROM cursor_me WHERE y = $1;
$$ LANGUAGE SQL;
SELECT declares_cursor_3(19);
-- Test DECLARE CURSOR .. WITH HOLD with parameter on dist key, but not fast-path planner
CREATE OR REPLACE FUNCTION declares_cursor_4(p int)
RETURNS void AS $$
DECLARE c4 CURSOR WITH HOLD FOR SELECT *, (SELECT 1) FROM cursor_me WHERE x = $1;
$$ LANGUAGE SQL;
SELECT declares_cursor_4(19);
CREATE OR REPLACE FUNCTION cursor_plpgsql(p int) CREATE OR REPLACE FUNCTION cursor_plpgsql(p int)
RETURNS SETOF int AS $$ RETURNS SETOF int AS $$
DECLARE DECLARE