diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index 968b22102..3f46161ae 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -39,6 +39,12 @@ static Node * DelayedErrorCreateScan(CustomScan *scan); /* functions that are common to different scans */ static void CitusBeginScan(CustomScanState *node, EState *estate, int eflags); +static void CitusGenerateDeferredQueryStrings(CustomScanState *node, EState *estate, int + eflags); +static void HandleDeferredShardPruningForFastPathQueries( + DistributedPlan *distributedPlan); +static void HandleDeferredShardPruningForInserts(DistributedPlan *distributedPlan); + static void CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags); static void CitusEndScan(CustomScanState *node); static void CitusReScan(CustomScanState *node); @@ -114,13 +120,11 @@ RegisterCitusCustomScanMethods(void) * CitusBeginScan sets the coordinator backend initiated by Citus for queries using * that function as the BeginCustomScan callback. * - * The function also handles modification scan actions. + * The function also handles deferred shard pruning along with function evaluations. */ static void CitusBeginScan(CustomScanState *node, EState *estate, int eflags) { - DistributedPlan *distributedPlan = NULL; - MarkCitusInitiatedCoordinatorBackend(); CitusScanState *scanState = (CitusScanState *) node; @@ -129,11 +133,17 @@ CitusBeginScan(CustomScanState *node, EState *estate, int eflags) ExecInitResultSlot(&scanState->customScanState.ss.ps, &TTSOpsMinimalTuple); #endif - distributedPlan = scanState->distributedPlan; + DistributedPlan *distributedPlan = scanState->distributedPlan; + Job *workerJob = distributedPlan->workerJob; + if (workerJob && + (workerJob->requiresMasterEvaluation || workerJob->deferredPruning)) + { + CitusGenerateDeferredQueryStrings(node, estate, eflags); + } + if (distributedPlan->modLevel == ROW_MODIFY_READONLY || distributedPlan->insertSelectQuery != NULL) { - /* no more action required */ return; } @@ -175,6 +185,38 @@ CitusExecScan(CustomScanState *node) */ static void CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags) +{ + CitusScanState *scanState = (CitusScanState *) node; + DistributedPlan *distributedPlan = scanState->distributedPlan; + + /* + * If we have not already copied the plan into this context, do it now. + * Note that we could have copied the plan during CitusGenerateDeferredQueryStrings. + */ + if (GetMemoryChunkContext(distributedPlan) != CurrentMemoryContext) + { + distributedPlan = copyObject(distributedPlan); + + scanState->distributedPlan = distributedPlan; + } + + Job *workerJob = distributedPlan->workerJob; + List *taskList = workerJob->taskList; + + /* prevent concurrent placement changes */ + AcquireMetadataLocks(taskList); + + /* modify tasks are always assigned using first-replica policy */ + workerJob->taskList = FirstReplicaAssignTaskList(taskList); +} + + +/* + * CitusGenerateDeferredQueryStrings generates query strings at the start of the execution + * in two cases: when the query requires master evaluation and/or deferred shard pruning. + */ +static void +CitusGenerateDeferredQueryStrings(CustomScanState *node, EState *estate, int eflags) { CitusScanState *scanState = (CitusScanState *) node; @@ -188,12 +230,32 @@ CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags) Job *workerJob = distributedPlan->workerJob; Query *jobQuery = workerJob->jobQuery; - List *taskList = workerJob->taskList; - if (workerJob->requiresMasterEvaluation) + /* we'd only get to this function with the following conditions */ + Assert(workerJob->requiresMasterEvaluation || workerJob->deferredPruning); + + PlanState *planState = &(scanState->customScanState.ss.ps); + EState *executorState = planState->state; + + /* citus only evaluates functions for modification queries */ + bool modifyQueryRequiresMasterEvaluation = + workerJob->requiresMasterEvaluation && jobQuery->commandType != CMD_SELECT; + + /* + * ExecuteMasterEvaluableFunctions handles both function evalation + * and parameter evaluation. Pruning is most likely deferred because + * there is a parameter on the distribution key. So, evaluate in both + * cases. + */ + bool shoudEvaluteFunctionsOrParams = + modifyQueryRequiresMasterEvaluation || workerJob->deferredPruning; + if (shoudEvaluteFunctionsOrParams) { - PlanState *planState = &(scanState->customScanState.ss.ps); - EState *executorState = planState->state; + distributedPlan = (scanState->distributedPlan); + scanState->distributedPlan = distributedPlan; + + workerJob = distributedPlan->workerJob; + jobQuery = workerJob->jobQuery; ExecuteMasterEvaluableFunctions(jobQuery, planState); @@ -204,37 +266,107 @@ CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags) * parameter values, we set the parameter list to NULL. */ executorState->es_param_list_info = NULL; - - if (workerJob->deferredPruning) - { - DeferredErrorMessage *planningError = NULL; - - /* need to perform shard pruning, rebuild the task list from scratch */ - taskList = RouterInsertTaskList(jobQuery, &planningError); - - if (planningError != NULL) - { - RaiseDeferredError(planningError, ERROR); - } - - workerJob->taskList = taskList; - workerJob->partitionKeyValue = ExtractInsertPartitionKeyValue(jobQuery); - } - - RebuildQueryStrings(jobQuery, taskList); } - /* prevent concurrent placement changes */ - AcquireMetadataLocks(taskList); + /* + * After evaluating the function/parameters, we're done unless shard pruning + * is also deferred. + */ + if (!workerJob->deferredPruning) + { + RebuildQueryStrings(workerJob->jobQuery, workerJob->taskList); + + return; + } + + /* at this point, we're about to do the shard pruning */ + Assert(workerJob->deferredPruning); + if (jobQuery->commandType == CMD_INSERT) + { + HandleDeferredShardPruningForInserts(distributedPlan); + } + else + { + HandleDeferredShardPruningForFastPathQueries(distributedPlan); + } +} + + +/* + * HandleDeferredShardPruningForInserts does the shard pruning for INSERT + * queries and rebuilds the query strings. + */ +static void +HandleDeferredShardPruningForInserts(DistributedPlan *distributedPlan) +{ + Job *workerJob = distributedPlan->workerJob; + Query *jobQuery = workerJob->jobQuery; + DeferredErrorMessage *planningError = NULL; + + /* need to perform shard pruning, rebuild the task list from scratch */ + List *taskList = RouterInsertTaskList(jobQuery, &planningError); + + if (planningError != NULL) + { + RaiseDeferredError(planningError, ERROR); + } + + workerJob->taskList = taskList; + workerJob->partitionKeyValue = ExtractInsertPartitionKeyValue(jobQuery); + + RebuildQueryStrings(jobQuery, workerJob->taskList); +} + + +/* + * HandleDeferredShardPruningForFastPathQueries does the shard pruning for + * UPDATE/DELETE/SELECT fast path router queries and rebuilds the query strings. + */ +static void +HandleDeferredShardPruningForFastPathQueries(DistributedPlan *distributedPlan) +{ + Assert(distributedPlan->fastPathRouterPlan); + + Job *workerJob = distributedPlan->workerJob; + + bool isMultiShardQuery = false; + List *shardIntervalList = + TargetShardIntervalForFastPathQuery(workerJob->jobQuery, + &workerJob->partitionKeyValue, + &isMultiShardQuery, NULL); /* - * We are taking locks on partitions of partitioned tables. These locks are - * necessary for locking tables that appear in the SELECT part of the query. + * A fast-path router query can only yield multiple shards when the parameter + * cannot be resolved properly, which can be triggered by SQL function. */ - LockPartitionsInRelationList(distributedPlan->relationIdList, AccessShareLock); + if (isMultiShardQuery) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot perform distributed planning on this " + "query because parameterized queries for SQL " + "functions referencing distributed tables are " + "not supported"), + errhint("Consider using PL/pgSQL functions instead."))); + } - /* modify tasks are always assigned using first-replica policy */ - workerJob->taskList = FirstReplicaAssignTaskList(taskList); + bool shardsPresent = false; + List *relationShardList = + RelationShardListForShardIntervalList(shardIntervalList, &shardsPresent); + + UpdateRelationToShardNames((Node *) workerJob->jobQuery, relationShardList); + + List *placementList = + FindRouterWorkerList(shardIntervalList, shardsPresent, true); + uint64 shardId = INVALID_SHARD_ID; + + if (shardsPresent) + { + shardId = GetAnchorShardId(shardIntervalList); + } + + GenerateSingleShardRouterTaskList(workerJob, + relationShardList, + placementList, shardId); } @@ -287,7 +419,8 @@ CoordinatorInsertSelectCreateScan(CustomScan *scan) scanState->customScanState.ss.ps.type = T_CustomScanState; scanState->distributedPlan = GetDistributedPlan(scan); - scanState->customScanState.methods = &CoordinatorInsertSelectCustomExecMethods; + scanState->customScanState.methods = + &CoordinatorInsertSelectCustomExecMethods; return (Node *) scanState; } @@ -380,7 +513,8 @@ CitusReScan(CustomScanState *node) TupleDesc ScanStateGetTupleDescriptor(CitusScanState *scanState) { - return scanState->customScanState.ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor; + return scanState->customScanState.ss.ps.ps_ResultTupleSlot-> + tts_tupleDescriptor; } diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 5d017c37d..90a3750a3 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -122,7 +122,7 @@ static bool IsLocalReferenceTableJoin(Query *parse, List *rangeTableList); static bool QueryIsNotSimpleSelect(Node *node); static bool UpdateReferenceTablesWithShard(Node *node, void *context); static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *planContext, - Const *distributionKeyValue); + Node *distributionKeyValue); static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext, List *rangeTableList, int rteIdCounter); @@ -136,7 +136,7 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) List *rangeTableList = ExtractRangeTableEntryList(parse); int rteIdCounter = 1; bool fastPathRouterQuery = false; - Const *distributionKeyValue = NULL; + Node *distributionKeyValue = NULL; DistributedPlanningContext planContext = { .query = parse, .cursorOptions = cursorOptions, @@ -551,12 +551,26 @@ IsModifyDistributedPlan(DistributedPlan *distributedPlan) */ static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *planContext, - Const *distributionKeyValue) + Node *distributionKeyValue) { + FastPathRestrictionContext *fastPathContext = + planContext->plannerRestrictionContext->fastPathRestrictionContext; + planContext->plannerRestrictionContext->fastPathRestrictionContext-> fastPathRouterQuery = true; - planContext->plannerRestrictionContext->fastPathRestrictionContext-> - distributionKeyValue = distributionKeyValue; + + if (distributionKeyValue == NULL) + { + /* nothing to record */ + } + else if (IsA(distributionKeyValue, Const)) + { + fastPathContext->distributionKeyValue = (Const *) distributionKeyValue; + } + else if (IsA(distributionKeyValue, Param)) + { + fastPathContext->distributionKeyHasParam = true; + } planContext->plan = FastPathPlanner(planContext->originalQuery, planContext->query, planContext->boundParams); diff --git a/src/backend/distributed/planner/fast_path_router_planner.c b/src/backend/distributed/planner/fast_path_router_planner.c index 8dc6f5225..fa1b51fae 100644 --- a/src/backend/distributed/planner/fast_path_router_planner.c +++ b/src/backend/distributed/planner/fast_path_router_planner.c @@ -59,9 +59,9 @@ bool EnableFastPathRouterPlanner = true; static bool ColumnAppearsMultipleTimes(Node *quals, Var *distributionKey); static bool ConjunctionContainsColumnFilter(Node *node, Var *column, - Const **distributionKeyValue); + Node **distributionKeyValue); static bool DistKeyInSimpleOpExpression(Expr *clause, Var *distColumn, - Const **distributionKeyValue); + Node **distributionKeyValue); /* @@ -75,21 +75,6 @@ static bool DistKeyInSimpleOpExpression(Expr *clause, Var *distColumn, PlannedStmt * FastPathPlanner(Query *originalQuery, Query *parse, ParamListInfo boundParams) { - /* - * To support prepared statements for fast-path queries, we resolve the - * external parameters at this point. Note that this is normally done by - * eval_const_expr() in standard planner when the boundParams are avaliable. - * If not avaliable, as does for all other types of queries, Citus goes - * through the logic of increasing the cost of the plan and forcing - * PostgreSQL to pick custom plans. - * - * We're also only interested in resolving the quals since we'd want to - * do shard pruning based on the filter on the distribution column. - */ - originalQuery->jointree->quals = - ResolveExternalParams((Node *) originalQuery->jointree->quals, - copyParamList(boundParams)); - /* * Citus planner relies on some of the transformations on constant * evaluation on the parse tree. @@ -124,7 +109,7 @@ GeneratePlaceHolderPlannedStmt(Query *parse) SeqScan *seqScanNode = makeNode(SeqScan); Plan *plan = &seqScanNode->plan; - Const *distKey PG_USED_FOR_ASSERTS_ONLY = NULL; + Node *distKey PG_USED_FOR_ASSERTS_ONLY = NULL; AssertArg(FastPathRouterQuery(parse, &distKey)); @@ -171,7 +156,7 @@ GeneratePlaceHolderPlannedStmt(Query *parse) * don't have any sublinks/CTEs etc */ bool -FastPathRouterQuery(Query *query, Const **distributionKeyValue) +FastPathRouterQuery(Query *query, Node **distributionKeyValue) { FromExpr *joinTree = query->jointree; Node *quals = NULL; @@ -307,7 +292,7 @@ ColumnAppearsMultipleTimes(Node *quals, Var *distributionKey) * If the conjuction contains column filter which is const, distributionKeyValue is set. */ static bool -ConjunctionContainsColumnFilter(Node *node, Var *column, Const **distributionKeyValue) +ConjunctionContainsColumnFilter(Node *node, Var *column, Node **distributionKeyValue) { if (node == NULL) { @@ -369,7 +354,7 @@ ConjunctionContainsColumnFilter(Node *node, Var *column, Const **distributionKey * When a const is found, distributionKeyValue is set. */ static bool -DistKeyInSimpleOpExpression(Expr *clause, Var *distColumn, Const **distributionKeyValue) +DistKeyInSimpleOpExpression(Expr *clause, Var *distColumn, Node **distributionKeyValue) { Node *leftOperand = NULL; Node *rightOperand = NULL; @@ -436,7 +421,11 @@ DistKeyInSimpleOpExpression(Expr *clause, Var *distColumn, Const **distributionK *distributionKeyValue == NULL) { /* if the vartypes do not match, let shard pruning handle it later */ - *distributionKeyValue = copyObject(constantClause); + *distributionKeyValue = (Node *) copyObject(constantClause); + } + else if (paramClause != NULL) + { + *distributionKeyValue = (Node *) copyObject(paramClause); } return distColumnExists; diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 59c6325dd..bb1a975d2 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -153,11 +153,6 @@ static bool SelectsFromDistributedTable(List *rangeTableList, Query *query); static List * get_all_actual_clauses(List *restrictinfo_list); static int CompareInsertValuesByShardId(const void *leftElement, const void *rightElement); -static uint64 GetAnchorShardId(List *relationShardList); -static List * TargetShardIntervalForFastPathQuery(Query *query, - Const **partitionValueConst, - bool *isMultiShardQuery, - Const *distributionKeyValue); static List * SingleShardSelectTaskList(Query *query, uint64 jobId, List *relationShardList, List *placementList, uint64 shardId); @@ -165,11 +160,11 @@ static bool RowLocksOnRelations(Node *node, List **rtiLockList); static List * SingleShardModifyTaskList(Query *query, uint64 jobId, List *relationShardList, List *placementList, uint64 shardId); +static List * RemoveCoordinatorPlacement(List *placementList); static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job, TaskAssignmentPolicyType taskAssignmentPolicy, List *placementList); -static List * RemoveCoordinatorPlacement(List *placementList); /* @@ -248,6 +243,10 @@ CreateModifyPlan(Query *originalQuery, Query *query, distributedPlan->hasReturning = true; } + distributedPlan->fastPathRouterPlan = + plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery; + + return distributedPlan; } @@ -1634,9 +1633,6 @@ ExtractFirstDistributedTableId(Query *query) List *rangeTableList = query->rtable; ListCell *rangeTableCell = NULL; Oid distributedTableId = InvalidOid; - Const *distKey PG_USED_FOR_ASSERTS_ONLY = NULL; - - Assert(IsModifyCommand(query) || FastPathRouterQuery(query, &distKey)); foreach(rangeTableCell, rangeTableList) { @@ -1673,13 +1669,34 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon /* check if this query requires master evaluation */ bool requiresMasterEvaluation = RequiresMasterEvaluation(originalQuery); + FastPathRestrictionContext *fastPathRestrictionContext = + plannerRestrictionContext->fastPathRestrictionContext; + + /* + * We prefer to defer shard pruning/task generation to the + * execution when the parameter on the distribution key + * cannot be resolved. + */ + if (fastPathRestrictionContext->fastPathRouterQuery && + fastPathRestrictionContext->distributionKeyHasParam) + { + Job *job = CreateJob(originalQuery); + job->deferredPruning = true; + + ereport(DEBUG2, (errmsg("Deferred pruning for a fast-path router " + "query"))); + return job; + } + else + { + (*planningError) = PlanRouterQuery(originalQuery, plannerRestrictionContext, + &placementList, &shardId, &relationShardList, + &prunedShardIntervalListList, + replacePrunedQueryWithDummy, + &isMultiShardModifyQuery, + &partitionKeyValue); + } - (*planningError) = PlanRouterQuery(originalQuery, plannerRestrictionContext, - &placementList, &shardId, &relationShardList, - &prunedShardIntervalListList, - replacePrunedQueryWithDummy, - &isMultiShardModifyQuery, - &partitionKeyValue); if (*planningError) { return NULL; @@ -1703,6 +1720,39 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon return job; } + if (isMultiShardModifyQuery) + { + job->taskList = QueryPushdownSqlTaskList(originalQuery, job->jobId, + plannerRestrictionContext-> + relationRestrictionContext, + prunedShardIntervalListList, + MODIFY_TASK, + requiresMasterEvaluation); + } + else + { + GenerateSingleShardRouterTaskList(job, relationShardList, + placementList, shardId); + } + + job->requiresMasterEvaluation = requiresMasterEvaluation; + return job; +} + + +/* + * SingleShardRouterTaskList is a wrapper around other corresponding task + * list generation functions specific to single shard selects and modifications. + * + * The function updates the input job's taskList in-place. + */ +void +GenerateSingleShardRouterTaskList(Job *job, List *relationShardList, + List *placementList, uint64 shardId) +{ + Query *originalQuery = job->jobQuery; + + if (originalQuery->commandType == CMD_SELECT) { job->taskList = SingleShardSelectTaskList(originalQuery, job->jobId, @@ -1724,15 +1774,6 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon placementList); } } - else if (isMultiShardModifyQuery) - { - job->taskList = QueryPushdownSqlTaskList(originalQuery, job->jobId, - plannerRestrictionContext-> - relationRestrictionContext, - prunedShardIntervalListList, - MODIFY_TASK, - requiresMasterEvaluation); - } else if (shardId == INVALID_SHARD_ID) { /* modification that prunes to 0 shards */ @@ -1744,9 +1785,6 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon relationShardList, placementList, shardId); } - - job->requiresMasterEvaluation = requiresMasterEvaluation; - return job; } @@ -2023,14 +2061,9 @@ PlanRouterQuery(Query *originalQuery, bool replacePrunedQueryWithDummy, bool *multiShardModifyQuery, Const **partitionValueConst) { - static uint32 zeroShardQueryRoundRobin = 0; - bool isMultiShardQuery = false; DeferredErrorMessage *planningError = NULL; - ListCell *prunedShardIntervalListCell = NULL; - List *workerList = NIL; bool shardsPresent = false; - uint64 shardId = INVALID_SHARD_ID; CmdType commandType = originalQuery->commandType; bool fastPathRouterQuery = plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery; @@ -2065,7 +2098,7 @@ PlanRouterQuery(Query *originalQuery, return planningError; } - *prunedShardIntervalListList = list_make1(shardIntervalList); + *prunedShardIntervalListList = shardIntervalList; if (!isMultiShardQuery) { @@ -2111,29 +2144,18 @@ PlanRouterQuery(Query *originalQuery, } } - foreach(prunedShardIntervalListCell, *prunedShardIntervalListList) + *relationShardList = + RelationShardListForShardIntervalList(*prunedShardIntervalListList, + &shardsPresent); + + if (!shardsPresent && !replacePrunedQueryWithDummy) { - List *prunedShardIntervalList = (List *) lfirst(prunedShardIntervalListCell); - ListCell *shardIntervalCell = NULL; - - /* no shard is present or all shards are pruned out case will be handled later */ - if (prunedShardIntervalList == NIL) - { - continue; - } - - shardsPresent = true; - - foreach(shardIntervalCell, prunedShardIntervalList) - { - ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell); - RelationShard *relationShard = CitusMakeNode(RelationShard); - - relationShard->relationId = shardInterval->relationId; - relationShard->shardId = shardInterval->shardId; - - *relationShardList = lappend(*relationShardList, relationShard); - } + /* + * For INSERT ... SELECT, this query could be still a valid for some other target + * shard intervals. Thus, we should return empty list if there aren't any matching + * workers, so that the caller can decide what to do with this task. + */ + return NULL; } /* @@ -2149,48 +2171,11 @@ PlanRouterQuery(Query *originalQuery, } /* we need anchor shard id for select queries with router planner */ - shardId = GetAnchorShardId(*prunedShardIntervalListList); + uint64 shardId = GetAnchorShardId(*prunedShardIntervalListList); - /* - * Determine the worker that has all shard placements if a shard placement found. - * If no shard placement exists and replacePrunedQueryWithDummy flag is set, we will - * still run the query but the result will be empty. We create a dummy shard - * placement for the first active worker. - */ - if (shardsPresent) - { - workerList = WorkersContainingAllShards(*prunedShardIntervalListList); - } - else if (replacePrunedQueryWithDummy) - { - List *workerNodeList = ActiveReadableWorkerNodeList(); - if (workerNodeList != NIL) - { - int workerNodeCount = list_length(workerNodeList); - int workerNodeIndex = zeroShardQueryRoundRobin % workerNodeCount; - WorkerNode *workerNode = (WorkerNode *) list_nth(workerNodeList, - workerNodeIndex); - ShardPlacement *dummyPlacement = - (ShardPlacement *) CitusMakeNode(ShardPlacement); - dummyPlacement->nodeName = workerNode->workerName; - dummyPlacement->nodePort = workerNode->workerPort; - dummyPlacement->nodeId = workerNode->nodeId; - dummyPlacement->groupId = workerNode->groupId; - - workerList = lappend(workerList, dummyPlacement); - - zeroShardQueryRoundRobin++; - } - } - else - { - /* - * For INSERT ... SELECT, this query could be still a valid for some other target - * shard intervals. Thus, we should return empty list if there aren't any matching - * workers, so that the caller can decide what to do with this task. - */ - return NULL; - } + List *workerList = + FindRouterWorkerList(*prunedShardIntervalListList, shardsPresent, + replacePrunedQueryWithDummy); if (workerList == NIL) { @@ -2219,6 +2204,89 @@ PlanRouterQuery(Query *originalQuery, } +List * +FindRouterWorkerList(List *shardIntervalList, bool shardsPresent, + bool replacePrunedQueryWithDummy) +{ + static uint32 zeroShardQueryRoundRobin = 0; + + List *workerList = NIL; + + /* + * Determine the worker that has all shard placements if a shard placement found. + * If no shard placement exists and replacePrunedQueryWithDummy flag is set, we will + * still run the query but the result will be empty. We create a dummy shard + * placement for the first active worker. + */ + if (shardsPresent) + { + workerList = WorkersContainingAllShards(shardIntervalList); + } + else if (replacePrunedQueryWithDummy) + { + List *workerNodeList = ActiveReadableWorkerNodeList(); + if (workerNodeList != NIL) + { + int workerNodeCount = list_length(workerNodeList); + int workerNodeIndex = zeroShardQueryRoundRobin % workerNodeCount; + WorkerNode *workerNode = (WorkerNode *) list_nth(workerNodeList, + workerNodeIndex); + ShardPlacement *dummyPlacement = + (ShardPlacement *) CitusMakeNode(ShardPlacement); + dummyPlacement->nodeName = workerNode->workerName; + dummyPlacement->nodePort = workerNode->workerPort; + dummyPlacement->nodeId = workerNode->nodeId; + dummyPlacement->groupId = workerNode->groupId; + + workerList = lappend(workerList, dummyPlacement); + + zeroShardQueryRoundRobin++; + } + } + + return workerList; +} + + +/* + * RelationShardListForShardIntervalList is a utility function which gets a list of + * shardInterval, and returns a list of RelationShard. + */ +List * +RelationShardListForShardIntervalList(List *shardIntervalList, bool *shardsPresent) +{ + List *relationShardList = NIL; + ListCell *shardIntervalListCell = NULL; + + foreach(shardIntervalListCell, shardIntervalList) + { + List *prunedShardIntervalList = (List *) lfirst(shardIntervalListCell); + + /* no shard is present or all shards are pruned out case will be handled later */ + if (prunedShardIntervalList == NIL) + { + continue; + } + + *shardsPresent = true; + + ListCell *shardIntervalCell = NULL; + foreach(shardIntervalCell, prunedShardIntervalList) + { + ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell); + RelationShard *relationShard = CitusMakeNode(RelationShard); + + relationShard->relationId = shardInterval->relationId; + relationShard->shardId = shardInterval->shardId; + + relationShardList = lappend(relationShardList, relationShard); + } + } + + return relationShardList; +} + + /* * GetAnchorShardId returns the anchor shard id given relation shard list. * The desired anchor shard is found as follows: @@ -2229,7 +2297,7 @@ PlanRouterQuery(Query *originalQuery, * reference tables * - Return INVALID_SHARD_ID on empty lists */ -static uint64 +uint64 GetAnchorShardId(List *prunedShardIntervalListList) { ListCell *prunedShardIntervalListCell = NULL; @@ -2264,12 +2332,13 @@ GetAnchorShardId(List *prunedShardIntervalListList) /* * TargetShardIntervalForFastPathQuery gets a query which is in * the form defined by FastPathRouterQuery() and returns exactly - * one shard interval (see FastPathRouterQuery() for the detail). + * one list of a a one shard interval (see FastPathRouterQuery() + * for the detail). * * Also set the outgoing partition column value if requested via * partitionValueConst */ -static List * +List * TargetShardIntervalForFastPathQuery(Query *query, Const **partitionValueConst, bool *isMultiShardQuery, Const *distributionKeyValue) { @@ -2286,7 +2355,9 @@ TargetShardIntervalForFastPathQuery(Query *query, Const **partitionValueConst, /* set the outgoing partition column value if requested */ *partitionValueConst = distributionKeyValue; } - return list_make1(shardInterval); + List *shardIntervalList = list_make1(shardInterval); + + return list_make1(shardIntervalList); } Node *quals = query->jointree->quals; @@ -2297,8 +2368,8 @@ TargetShardIntervalForFastPathQuery(Query *query, Const **partitionValueConst, &queryPartitionValueConst); /* we're only expecting single shard from a single table */ - Const *distKey PG_USED_FOR_ASSERTS_ONLY = NULL; - Assert(FastPathRouterQuery(query, &distKey)); + Node *distKey PG_USED_FOR_ASSERTS_ONLY = NULL; + Assert(FastPathRouterQuery(query, &distKey) || !EnableFastPathRouterPlanner); if (list_length(prunedShardIntervalList) > 1) { @@ -2311,7 +2382,7 @@ TargetShardIntervalForFastPathQuery(Query *query, Const **partitionValueConst, *partitionValueConst = queryPartitionValueConst; } - return prunedShardIntervalList; + return list_make1(prunedShardIntervalList); } diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h index 98d6f45a7..518191d67 100644 --- a/src/include/distributed/distributed_planner.h +++ b/src/include/distributed/distributed_planner.h @@ -96,6 +96,11 @@ typedef struct FastPathRestrictionContext * key contains parameter, so check for it before using. */ Const *distributionKeyValue; + + /* + * Set to true when distKey = Param; in the queryTree + */ + bool distributionKeyHasParam; }FastPathRestrictionContext; typedef struct PlannerRestrictionContext diff --git a/src/include/distributed/multi_router_planner.h b/src/include/distributed/multi_router_planner.h index 650df0065..e0ca1e835 100644 --- a/src/include/distributed/multi_router_planner.h +++ b/src/include/distributed/multi_router_planner.h @@ -43,6 +43,10 @@ extern DeferredErrorMessage * PlanRouterQuery(Query *originalQuery, bool replacePrunedQueryWithDummy, bool *multiShardModifyQuery, Const **partitionValueConst); +extern List * RelationShardListForShardIntervalList(List *shardIntervalList, + bool *shardsPresent); +extern List * FindRouterWorkerList(List *shardIntervalList, bool shardsPresent, + bool replacePrunedQueryWithDummy); extern List * RouterInsertTaskList(Query *query, DeferredErrorMessage **planningError); extern Const * ExtractInsertPartitionKeyValue(Query *query); extern List * TargetShardIntervalsForRestrictInfo(RelationRestrictionContext * @@ -71,6 +75,15 @@ extern void AddShardIntervalRestrictionToSelect(Query *subqery, extern bool UpdateOrDeleteQuery(Query *query); extern List * WorkersContainingAllShards(List *prunedShardIntervalsList); +extern uint64 GetAnchorShardId(List *relationShardList); +extern List * TargetShardIntervalForFastPathQuery(Query *query, + Const **partitionValueConst, + bool *isMultiShardQuery, + Const *distributionKeyValue); +extern void GenerateSingleShardRouterTaskList(Job *job, + List *relationShardList, + List *placementList, uint64 shardId); + /* * FastPathPlanner is a subset of router planner, that's why we prefer to * keep the external function here. @@ -78,6 +91,7 @@ extern List * WorkersContainingAllShards(List *prunedShardIntervalsList); extern PlannedStmt * FastPathPlanner(Query *originalQuery, Query *parse, ParamListInfo boundParams); -extern bool FastPathRouterQuery(Query *query, Const **distributionKeyValue); +extern bool FastPathRouterQuery(Query *query, Node **distributionKeyValue); + #endif /* MULTI_ROUTER_PLANNER_H */ diff --git a/src/test/regress/expected/fast_path_router_modify.out b/src/test/regress/expected/fast_path_router_modify.out index d995a1840..e155206d3 100644 --- a/src/test/regress/expected/fast_path_router_modify.out +++ b/src/test/regress/expected/fast_path_router_modify.out @@ -234,50 +234,43 @@ DETAIL: distribution column value: 1 PREPARE p1 (int, int, int) AS UPDATE modify_fast_path SET value_1 = value_1 + $1 WHERE key = $2 AND value_1 = $3; EXECUTE p1(1,1,1); -DEBUG: Distributed planning for a fast-path router query +DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable -DETAIL: distribution column value: 1 EXECUTE p1(2,2,2); -DEBUG: Distributed planning for a fast-path router query +DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable -DETAIL: distribution column value: 2 EXECUTE p1(3,3,3); -DEBUG: Distributed planning for a fast-path router query +DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable -DETAIL: distribution column value: 3 EXECUTE p1(4,4,4); -DEBUG: Distributed planning for a fast-path router query +DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable -DETAIL: distribution column value: 4 EXECUTE p1(5,5,5); -DEBUG: Distributed planning for a fast-path router query +DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable -DETAIL: distribution column value: 5 EXECUTE p1(6,6,6); -DEBUG: Router planner cannot handle multi-shard modify queries -DEBUG: Distributed planning for a fast-path router query +DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable -DETAIL: distribution column value: 6 +EXECUTE p1(7,7,7); CREATE FUNCTION modify_fast_path_plpsql(int, int) RETURNS void as $$ BEGIN DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2; END; $$ LANGUAGE plpgsql; SELECT modify_fast_path_plpsql(1,1); -DEBUG: Distributed planning for a fast-path router query +DEBUG: Deferred pruning for a fast-path router query CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2" PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement DEBUG: Creating router plan CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2" PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement DEBUG: Plan is router executable -DETAIL: distribution column value: 1 CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2" PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement modify_fast_path_plpsql @@ -286,14 +279,13 @@ PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL stateme (1 row) SELECT modify_fast_path_plpsql(2,2); -DEBUG: Distributed planning for a fast-path router query +DEBUG: Deferred pruning for a fast-path router query CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2" PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement DEBUG: Creating router plan CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2" PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement DEBUG: Plan is router executable -DETAIL: distribution column value: 2 CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2" PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement modify_fast_path_plpsql @@ -302,14 +294,13 @@ PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL stateme (1 row) SELECT modify_fast_path_plpsql(3,3); -DEBUG: Distributed planning for a fast-path router query +DEBUG: Deferred pruning for a fast-path router query CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2" PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement DEBUG: Creating router plan CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2" PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement DEBUG: Plan is router executable -DETAIL: distribution column value: 3 CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2" PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement modify_fast_path_plpsql @@ -318,14 +309,13 @@ PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL stateme (1 row) SELECT modify_fast_path_plpsql(4,4); -DEBUG: Distributed planning for a fast-path router query +DEBUG: Deferred pruning for a fast-path router query CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2" PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement DEBUG: Creating router plan CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2" PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement DEBUG: Plan is router executable -DETAIL: distribution column value: 4 CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2" PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement modify_fast_path_plpsql @@ -334,14 +324,13 @@ PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL stateme (1 row) SELECT modify_fast_path_plpsql(5,5); -DEBUG: Distributed planning for a fast-path router query +DEBUG: Deferred pruning for a fast-path router query CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2" PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement DEBUG: Creating router plan CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2" PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement DEBUG: Plan is router executable -DETAIL: distribution column value: 5 CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2" PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement modify_fast_path_plpsql @@ -350,17 +339,13 @@ PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL stateme (1 row) SELECT modify_fast_path_plpsql(6,6); -DEBUG: Router planner cannot handle multi-shard modify queries -CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2" -PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement -DEBUG: Distributed planning for a fast-path router query +DEBUG: Deferred pruning for a fast-path router query CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2" PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement DEBUG: Creating router plan CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2" PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement DEBUG: Plan is router executable -DETAIL: distribution column value: 6 CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2" PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement modify_fast_path_plpsql @@ -369,22 +354,231 @@ PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL stateme (1 row) SELECT modify_fast_path_plpsql(6,6); -DEBUG: Distributed planning for a fast-path router query -CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2" -PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement -DEBUG: Creating router plan -CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2" -PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement -DEBUG: Plan is router executable -DETAIL: distribution column value: 6 -CONTEXT: SQL statement "DELETE FROM modify_fast_path WHERE key = $1 AND value_1 = $2" -PL/pgSQL function modify_fast_path_plpsql(integer,integer) line 3 at SQL statement modify_fast_path_plpsql --------------------------------------------------------------------- (1 row) +-- prepared statements with zero shard +PREPARE prepared_zero_shard_select(int) AS SELECT count(*) FROM modify_fast_path WHERE key = $1 AND false; +PREPARE prepared_zero_shard_update(int) AS UPDATE modify_fast_path SET value_1 = 1 WHERE key = $1 AND false; +SET client_min_messages TO DEBUG2; +SET citus.log_remote_commands TO ON; +EXECUTE prepared_zero_shard_select(1); +DEBUG: Deferred pruning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Plan is router executable +NOTICE: issuing SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) 1) AND false) +DETAIL: on server postgres@localhost:xxxxx connectionId: 4 + count +--------------------------------------------------------------------- + 0 +(1 row) + +EXECUTE prepared_zero_shard_select(2); +DEBUG: Deferred pruning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Plan is router executable +NOTICE: issuing SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) 2) AND false) +DETAIL: on server postgres@localhost:xxxxx connectionId: 3 + count +--------------------------------------------------------------------- + 0 +(1 row) + +EXECUTE prepared_zero_shard_select(3); +DEBUG: Deferred pruning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Plan is router executable +NOTICE: issuing SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) 3) AND false) +DETAIL: on server postgres@localhost:xxxxx connectionId: 4 + count +--------------------------------------------------------------------- + 0 +(1 row) + +EXECUTE prepared_zero_shard_select(4); +DEBUG: Deferred pruning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Plan is router executable +NOTICE: issuing SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) 4) AND false) +DETAIL: on server postgres@localhost:xxxxx connectionId: 3 + count +--------------------------------------------------------------------- + 0 +(1 row) + +EXECUTE prepared_zero_shard_select(5); +DEBUG: Deferred pruning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Plan is router executable +NOTICE: issuing SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) 5) AND false) +DETAIL: on server postgres@localhost:xxxxx connectionId: 4 + count +--------------------------------------------------------------------- + 0 +(1 row) + +EXECUTE prepared_zero_shard_select(6); +DEBUG: Deferred pruning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Plan is router executable +NOTICE: issuing SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) 6) AND false) +DETAIL: on server postgres@localhost:xxxxx connectionId: 3 + count +--------------------------------------------------------------------- + 0 +(1 row) + +EXECUTE prepared_zero_shard_select(7); +NOTICE: issuing SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) 7) AND false) +DETAIL: on server postgres@localhost:xxxxx connectionId: 4 + count +--------------------------------------------------------------------- + 0 +(1 row) + +EXECUTE prepared_zero_shard_update(1); +DEBUG: Deferred pruning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Plan is router executable +EXECUTE prepared_zero_shard_update(2); +DEBUG: Deferred pruning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Plan is router executable +EXECUTE prepared_zero_shard_update(3); +DEBUG: Deferred pruning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Plan is router executable +EXECUTE prepared_zero_shard_update(4); +DEBUG: Deferred pruning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Plan is router executable +EXECUTE prepared_zero_shard_update(5); +DEBUG: Deferred pruning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Plan is router executable +EXECUTE prepared_zero_shard_update(6); +DEBUG: Deferred pruning for a fast-path router query +DEBUG: Creating router plan +DEBUG: Plan is router executable +EXECUTE prepared_zero_shard_update(7); +-- same test with fast-path disabled +SET citus.enable_fast_path_router_planner TO FALSE; +EXECUTE prepared_zero_shard_select(1); +NOTICE: issuing SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) 1) AND false) +DETAIL: on server postgres@localhost:xxxxx connectionId: 4 + count +--------------------------------------------------------------------- + 0 +(1 row) + +EXECUTE prepared_zero_shard_select(2); +NOTICE: issuing SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) 2) AND false) +DETAIL: on server postgres@localhost:xxxxx connectionId: 3 + count +--------------------------------------------------------------------- + 0 +(1 row) + +EXECUTE prepared_zero_shard_update(1); +EXECUTE prepared_zero_shard_update(2); +DEALLOCATE prepared_zero_shard_select; +DEALLOCATE prepared_zero_shard_update; +PREPARE prepared_zero_shard_select(int) AS SELECT count(*) FROM modify_fast_path WHERE key = $1 AND false; +PREPARE prepared_zero_shard_update(int) AS UPDATE modify_fast_path SET value_1 = 1 WHERE key = $1 AND false; +EXECUTE prepared_zero_shard_select(1); +DEBUG: Creating router plan +DEBUG: Plan is router executable +NOTICE: issuing SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) $1) AND false) +DETAIL: on server postgres@localhost:xxxxx connectionId: 4 + count +--------------------------------------------------------------------- + 0 +(1 row) + +EXECUTE prepared_zero_shard_select(2); +DEBUG: Creating router plan +DEBUG: Plan is router executable +NOTICE: issuing SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) $1) AND false) +DETAIL: on server postgres@localhost:xxxxx connectionId: 3 + count +--------------------------------------------------------------------- + 0 +(1 row) + +EXECUTE prepared_zero_shard_select(3); +DEBUG: Creating router plan +DEBUG: Plan is router executable +NOTICE: issuing SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) $1) AND false) +DETAIL: on server postgres@localhost:xxxxx connectionId: 4 + count +--------------------------------------------------------------------- + 0 +(1 row) + +EXECUTE prepared_zero_shard_select(4); +DEBUG: Creating router plan +DEBUG: Plan is router executable +NOTICE: issuing SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) $1) AND false) +DETAIL: on server postgres@localhost:xxxxx connectionId: 3 + count +--------------------------------------------------------------------- + 0 +(1 row) + +EXECUTE prepared_zero_shard_select(5); +DEBUG: Creating router plan +DEBUG: Plan is router executable +NOTICE: issuing SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) $1) AND false) +DETAIL: on server postgres@localhost:xxxxx connectionId: 4 + count +--------------------------------------------------------------------- + 0 +(1 row) + +EXECUTE prepared_zero_shard_select(6); +DEBUG: Creating router plan +DEBUG: Plan is router executable +NOTICE: issuing SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) $1) AND false) +DETAIL: on server postgres@localhost:xxxxx connectionId: 3 + count +--------------------------------------------------------------------- + 0 +(1 row) + +EXECUTE prepared_zero_shard_select(7); +NOTICE: issuing SELECT count(*) AS count FROM (SELECT NULL::integer AS key, NULL::integer AS value_1, NULL::text AS value_2 WHERE false) modify_fast_path(key, value_1, value_2) WHERE ((key OPERATOR(pg_catalog.=) $1) AND false) +DETAIL: on server postgres@localhost:xxxxx connectionId: 3 + count +--------------------------------------------------------------------- + 0 +(1 row) + +EXECUTE prepared_zero_shard_update(1); +DEBUG: Creating router plan +DEBUG: Plan is router executable +EXECUTE prepared_zero_shard_update(2); +DEBUG: Creating router plan +DEBUG: Plan is router executable +EXECUTE prepared_zero_shard_update(3); +DEBUG: Creating router plan +DEBUG: Plan is router executable +EXECUTE prepared_zero_shard_update(4); +DEBUG: Creating router plan +DEBUG: Plan is router executable +EXECUTE prepared_zero_shard_update(5); +DEBUG: Creating router plan +DEBUG: Plan is router executable +EXECUTE prepared_zero_shard_update(6); +DEBUG: Creating router plan +DEBUG: Plan is router executable +EXECUTE prepared_zero_shard_update(7); +-- same test with fast-path disabled +-- reset back to the original value, in case any new test comes after +RESET citus.enable_fast_path_router_planner; RESET client_min_messages; +RESET citus.log_remote_commands; DROP SCHEMA fast_path_router_modify CASCADE; NOTICE: drop cascades to 4 other objects DETAIL: drop cascades to table modify_fast_path diff --git a/src/test/regress/expected/multi_prune_shard_list.out b/src/test/regress/expected/multi_prune_shard_list.out index 02d1a72e9..c0fdb8324 100644 --- a/src/test/regress/expected/multi_prune_shard_list.out +++ b/src/test/regress/expected/multi_prune_shard_list.out @@ -227,6 +227,142 @@ SELECT * FROM coerce_hash WHERE id = 1.0::numeric; 1 | test value (1 row) +-- same queries with PREPARE +PREPARE coerce_bigint(bigint) AS SELECT * FROM coerce_hash WHERE id=$1::bigint; +PREPARE coerce_numeric(bigint) AS SELECT * FROM coerce_hash WHERE id=$1::numeric; +PREPARE coerce_numeric_2(numeric) AS SELECT * FROM coerce_hash WHERE id=$1; +EXECUTE coerce_bigint(1); + id | value +--------------------------------------------------------------------- + 1 | test value +(1 row) + +EXECUTE coerce_bigint(1); + id | value +--------------------------------------------------------------------- + 1 | test value +(1 row) + +EXECUTE coerce_bigint(1); + id | value +--------------------------------------------------------------------- + 1 | test value +(1 row) + +EXECUTE coerce_bigint(1); + id | value +--------------------------------------------------------------------- + 1 | test value +(1 row) + +EXECUTE coerce_bigint(1); + id | value +--------------------------------------------------------------------- + 1 | test value +(1 row) + +EXECUTE coerce_bigint(1); + id | value +--------------------------------------------------------------------- + 1 | test value +(1 row) + +EXECUTE coerce_bigint(1); + id | value +--------------------------------------------------------------------- + 1 | test value +(1 row) + +EXECUTE coerce_bigint(1); + id | value +--------------------------------------------------------------------- + 1 | test value +(1 row) + +EXECUTE coerce_numeric(1); + id | value +--------------------------------------------------------------------- + 1 | test value +(1 row) + +EXECUTE coerce_numeric(1); + id | value +--------------------------------------------------------------------- + 1 | test value +(1 row) + +EXECUTE coerce_numeric(1); + id | value +--------------------------------------------------------------------- + 1 | test value +(1 row) + +EXECUTE coerce_numeric(1); + id | value +--------------------------------------------------------------------- + 1 | test value +(1 row) + +EXECUTE coerce_numeric(1); + id | value +--------------------------------------------------------------------- + 1 | test value +(1 row) + +EXECUTE coerce_numeric(1); + id | value +--------------------------------------------------------------------- + 1 | test value +(1 row) + +EXECUTE coerce_numeric(1); + id | value +--------------------------------------------------------------------- + 1 | test value +(1 row) + +EXECUTE coerce_numeric_2(1); + id | value +--------------------------------------------------------------------- + 1 | test value +(1 row) + +EXECUTE coerce_numeric_2(1); + id | value +--------------------------------------------------------------------- + 1 | test value +(1 row) + +EXECUTE coerce_numeric_2(1); + id | value +--------------------------------------------------------------------- + 1 | test value +(1 row) + +EXECUTE coerce_numeric_2(1); + id | value +--------------------------------------------------------------------- + 1 | test value +(1 row) + +EXECUTE coerce_numeric_2(1); + id | value +--------------------------------------------------------------------- + 1 | test value +(1 row) + +EXECUTE coerce_numeric_2(1); + id | value +--------------------------------------------------------------------- + 1 | test value +(1 row) + +EXECUTE coerce_numeric_2(1); + id | value +--------------------------------------------------------------------- + 1 | test value +(1 row) + SET search_path TO public; DROP SCHEMA prune_shard_list CASCADE; NOTICE: drop cascades to 9 other objects diff --git a/src/test/regress/expected/multi_router_planner_fast_path.out b/src/test/regress/expected/multi_router_planner_fast_path.out index 2a4e204dd..4a9ad5e73 100644 --- a/src/test/regress/expected/multi_router_planner_fast_path.out +++ b/src/test/regress/expected/multi_router_planner_fast_path.out @@ -1559,10 +1559,9 @@ PREPARE author_articles(int) as FROM articles_hash WHERE author_id = $1; EXECUTE author_articles(1); -DEBUG: Distributed planning for a fast-path router query +DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable -DETAIL: distribution column value: 1 id | author_id | title | word_count --------------------------------------------------------------------- 1 | 1 | arsenous | 9572 @@ -1573,10 +1572,9 @@ DETAIL: distribution column value: 1 (5 rows) EXECUTE author_articles(1); -DEBUG: Distributed planning for a fast-path router query +DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable -DETAIL: distribution column value: 1 id | author_id | title | word_count --------------------------------------------------------------------- 1 | 1 | arsenous | 9572 @@ -1587,10 +1585,9 @@ DETAIL: distribution column value: 1 (5 rows) EXECUTE author_articles(1); -DEBUG: Distributed planning for a fast-path router query +DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable -DETAIL: distribution column value: 1 id | author_id | title | word_count --------------------------------------------------------------------- 1 | 1 | arsenous | 9572 @@ -1601,10 +1598,9 @@ DETAIL: distribution column value: 1 (5 rows) EXECUTE author_articles(1); -DEBUG: Distributed planning for a fast-path router query +DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable -DETAIL: distribution column value: 1 id | author_id | title | word_count --------------------------------------------------------------------- 1 | 1 | arsenous | 9572 @@ -1615,10 +1611,9 @@ DETAIL: distribution column value: 1 (5 rows) EXECUTE author_articles(1); -DEBUG: Distributed planning for a fast-path router query +DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable -DETAIL: distribution column value: 1 id | author_id | title | word_count --------------------------------------------------------------------- 1 | 1 | arsenous | 9572 @@ -1629,11 +1624,9 @@ DETAIL: distribution column value: 1 (5 rows) EXECUTE author_articles(1); -DEBUG: Router planner cannot handle multi-shard select queries -DEBUG: Distributed planning for a fast-path router query +DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable -DETAIL: distribution column value: 1 id | author_id | title | word_count --------------------------------------------------------------------- 1 | 1 | arsenous | 9572 @@ -1709,7 +1702,7 @@ BEGIN END; $$ LANGUAGE plpgsql; SELECT author_articles_max_id(1); -DEBUG: Distributed planning for a fast-path router query +DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable author_articles_max_id @@ -1718,7 +1711,7 @@ DEBUG: Plan is router executable (1 row) SELECT author_articles_max_id(1); -DEBUG: Distributed planning for a fast-path router query +DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable author_articles_max_id @@ -1727,7 +1720,7 @@ DEBUG: Plan is router executable (1 row) SELECT author_articles_max_id(1); -DEBUG: Distributed planning for a fast-path router query +DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable author_articles_max_id @@ -1736,7 +1729,7 @@ DEBUG: Plan is router executable (1 row) SELECT author_articles_max_id(1); -DEBUG: Distributed planning for a fast-path router query +DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable author_articles_max_id @@ -1745,7 +1738,7 @@ DEBUG: Plan is router executable (1 row) SELECT author_articles_max_id(1); -DEBUG: Distributed planning for a fast-path router query +DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable author_articles_max_id @@ -1754,8 +1747,7 @@ DEBUG: Plan is router executable (1 row) SELECT author_articles_max_id(1); -DEBUG: Router planner cannot handle multi-shard select queries -DEBUG: Distributed planning for a fast-path router query +DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable author_articles_max_id @@ -1849,7 +1841,7 @@ BEGIN END; $$ LANGUAGE plpgsql; SELECT * FROM author_articles_id_word_count(1); -DEBUG: Distributed planning for a fast-path router query +DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable id | word_count @@ -1862,7 +1854,7 @@ DEBUG: Plan is router executable (5 rows) SELECT * FROM author_articles_id_word_count(1); -DEBUG: Distributed planning for a fast-path router query +DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable id | word_count @@ -1875,7 +1867,7 @@ DEBUG: Plan is router executable (5 rows) SELECT * FROM author_articles_id_word_count(1); -DEBUG: Distributed planning for a fast-path router query +DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable id | word_count @@ -1888,7 +1880,7 @@ DEBUG: Plan is router executable (5 rows) SELECT * FROM author_articles_id_word_count(1); -DEBUG: Distributed planning for a fast-path router query +DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable id | word_count @@ -1901,7 +1893,7 @@ DEBUG: Plan is router executable (5 rows) SELECT * FROM author_articles_id_word_count(1); -DEBUG: Distributed planning for a fast-path router query +DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable id | word_count @@ -1914,8 +1906,7 @@ DEBUG: Plan is router executable (5 rows) SELECT * FROM author_articles_id_word_count(1); -DEBUG: Router planner cannot handle multi-shard select queries -DEBUG: Distributed planning for a fast-path router query +DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable id | word_count @@ -1936,45 +1927,39 @@ INSERT INTO articles_hash EXECUTE insert_sel(1,1); DEBUG: OFFSET clauses are not allowed in distributed INSERT ... SELECT queries DEBUG: Collecting INSERT ... SELECT results on coordinator -DEBUG: Distributed planning for a fast-path router query +DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable -DETAIL: distribution column value: 1 EXECUTE insert_sel(1,1); DEBUG: OFFSET clauses are not allowed in distributed INSERT ... SELECT queries DEBUG: Collecting INSERT ... SELECT results on coordinator -DEBUG: Distributed planning for a fast-path router query +DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable -DETAIL: distribution column value: 1 EXECUTE insert_sel(1,1); DEBUG: OFFSET clauses are not allowed in distributed INSERT ... SELECT queries DEBUG: Collecting INSERT ... SELECT results on coordinator -DEBUG: Distributed planning for a fast-path router query +DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable -DETAIL: distribution column value: 1 EXECUTE insert_sel(1,1); DEBUG: OFFSET clauses are not allowed in distributed INSERT ... SELECT queries DEBUG: Collecting INSERT ... SELECT results on coordinator -DEBUG: Distributed planning for a fast-path router query +DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable -DETAIL: distribution column value: 1 EXECUTE insert_sel(1,1); DEBUG: OFFSET clauses are not allowed in distributed INSERT ... SELECT queries DEBUG: Collecting INSERT ... SELECT results on coordinator -DEBUG: Distributed planning for a fast-path router query +DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable -DETAIL: distribution column value: 1 EXECUTE insert_sel(1,1); DEBUG: OFFSET clauses are not allowed in distributed INSERT ... SELECT queries DEBUG: Collecting INSERT ... SELECT results on coordinator -DEBUG: Distributed planning for a fast-path router query +DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable -DETAIL: distribution column value: 1 -- one final interesting preperad statement -- where one of the filters is on the target list PREPARE fast_path_agg_filter(int, int) AS @@ -1984,61 +1969,54 @@ PREPARE fast_path_agg_filter(int, int) AS articles_hash WHERE author_id = $2; EXECUTE fast_path_agg_filter(1,1); -DEBUG: Distributed planning for a fast-path router query +DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable -DETAIL: distribution column value: 1 count --------------------------------------------------------------------- 0 (1 row) EXECUTE fast_path_agg_filter(2,2); -DEBUG: Distributed planning for a fast-path router query +DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable -DETAIL: distribution column value: 2 count --------------------------------------------------------------------- 0 (1 row) EXECUTE fast_path_agg_filter(3,3); -DEBUG: Distributed planning for a fast-path router query +DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable -DETAIL: distribution column value: 3 count --------------------------------------------------------------------- 0 (1 row) EXECUTE fast_path_agg_filter(4,4); -DEBUG: Distributed planning for a fast-path router query +DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable -DETAIL: distribution column value: 4 count --------------------------------------------------------------------- 0 (1 row) EXECUTE fast_path_agg_filter(5,5); -DEBUG: Distributed planning for a fast-path router query +DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable -DETAIL: distribution column value: 5 count --------------------------------------------------------------------- 0 (1 row) EXECUTE fast_path_agg_filter(6,6); -DEBUG: Router planner cannot handle multi-shard select queries -DEBUG: Distributed planning for a fast-path router query +DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable -DETAIL: distribution column value: 6 count --------------------------------------------------------------------- 0 diff --git a/src/test/regress/expected/multi_sql_function.out b/src/test/regress/expected/multi_sql_function.out index 4cd18c81c..c762e827d 100644 --- a/src/test/regress/expected/multi_sql_function.out +++ b/src/test/regress/expected/multi_sql_function.out @@ -322,16 +322,19 @@ INSERT INTO test_parameterized_sql VALUES(1, 1); SELECT * FROM test_parameterized_sql_function(1); ERROR: cannot perform distributed planning on this query because parameterized queries for SQL functions referencing distributed tables are not supported HINT: Consider using PL/pgSQL functions instead. -SELECT test_parameterized_sql_function(1); -ERROR: could not create distributed plan -DETAIL: Possibly this is caused by the use of parameters in SQL functions, which is not supported in Citus. -HINT: Consider using PL/pgSQL functions instead. -CONTEXT: SQL function "test_parameterized_sql_function" statement 1 SELECT test_parameterized_sql_function_in_subquery_where(1); ERROR: could not create distributed plan DETAIL: Possibly this is caused by the use of parameters in SQL functions, which is not supported in Citus. HINT: Consider using PL/pgSQL functions instead. CONTEXT: SQL function "test_parameterized_sql_function_in_subquery_where" statement 1 +-- postgres behaves slightly differently for the following +-- query where the target list is empty +SELECT test_parameterized_sql_function(1); + test_parameterized_sql_function +--------------------------------------------------------------------- + 1 +(1 row) + -- test that sql function calls are treated as multi-statement transactions -- and are rolled back properly. Single-row inserts for not-replicated tables -- don't go over 2PC if they are not part of a bigger transaction. diff --git a/src/test/regress/expected/multi_utility_statements.out b/src/test/regress/expected/multi_utility_statements.out index 1a4daa418..b900a106d 100644 --- a/src/test/regress/expected/multi_utility_statements.out +++ b/src/test/regress/expected/multi_utility_statements.out @@ -261,7 +261,38 @@ RETURNS void AS $$ DECLARE c CURSOR WITH HOLD FOR SELECT * FROM cursor_me WHERE x = $1; $$ LANGUAGE SQL; SELECT declares_cursor(5); + declares_cursor +--------------------------------------------------------------------- + +(1 row) + +-- Test DECLARE CURSOR .. WITH HOLD without parameter +CREATE OR REPLACE FUNCTION declares_cursor_2() +RETURNS void AS $$ + DECLARE c2 CURSOR WITH HOLD FOR SELECT * FROM cursor_me; +$$ LANGUAGE SQL; +SELECT declares_cursor_2(); + declares_cursor_2 +--------------------------------------------------------------------- + +(1 row) + +-- Test DECLARE CURSOR .. WITH HOLD with parameter on non-dist key +CREATE OR REPLACE FUNCTION declares_cursor_3(p int) +RETURNS void AS $$ + DECLARE c3 CURSOR WITH HOLD FOR SELECT * FROM cursor_me WHERE y = $1; +$$ LANGUAGE SQL; +SELECT declares_cursor_3(19); ERROR: Cursors for queries on distributed tables with parameters are currently unsupported +-- Test DECLARE CURSOR .. WITH HOLD with parameter on dist key, but not fast-path planner +CREATE OR REPLACE FUNCTION declares_cursor_4(p int) +RETURNS void AS $$ + DECLARE c4 CURSOR WITH HOLD FOR SELECT *, (SELECT 1) FROM cursor_me WHERE x = $1; +$$ LANGUAGE SQL; +SELECT declares_cursor_4(19); +ERROR: could not run distributed query with subquery outside the FROM, WHERE and HAVING clauses +HINT: Consider using an equality filter on the distributed table's partition column. +CONTEXT: SQL function "declares_cursor_4" statement 1 CREATE OR REPLACE FUNCTION cursor_plpgsql(p int) RETURNS SETOF int AS $$ DECLARE diff --git a/src/test/regress/sql/fast_path_router_modify.sql b/src/test/regress/sql/fast_path_router_modify.sql index 63ab5f3e7..6581f9be6 100644 --- a/src/test/regress/sql/fast_path_router_modify.sql +++ b/src/test/regress/sql/fast_path_router_modify.sql @@ -99,6 +99,7 @@ EXECUTE p1(3,3,3); EXECUTE p1(4,4,4); EXECUTE p1(5,5,5); EXECUTE p1(6,6,6); +EXECUTE p1(7,7,7); CREATE FUNCTION modify_fast_path_plpsql(int, int) RETURNS void as $$ BEGIN @@ -114,6 +115,62 @@ SELECT modify_fast_path_plpsql(5,5); SELECT modify_fast_path_plpsql(6,6); SELECT modify_fast_path_plpsql(6,6); -RESET client_min_messages; +-- prepared statements with zero shard +PREPARE prepared_zero_shard_select(int) AS SELECT count(*) FROM modify_fast_path WHERE key = $1 AND false; +PREPARE prepared_zero_shard_update(int) AS UPDATE modify_fast_path SET value_1 = 1 WHERE key = $1 AND false; +SET client_min_messages TO DEBUG2; +SET citus.log_remote_commands TO ON; +EXECUTE prepared_zero_shard_select(1); +EXECUTE prepared_zero_shard_select(2); +EXECUTE prepared_zero_shard_select(3); +EXECUTE prepared_zero_shard_select(4); +EXECUTE prepared_zero_shard_select(5); +EXECUTE prepared_zero_shard_select(6); +EXECUTE prepared_zero_shard_select(7); +EXECUTE prepared_zero_shard_update(1); +EXECUTE prepared_zero_shard_update(2); +EXECUTE prepared_zero_shard_update(3); +EXECUTE prepared_zero_shard_update(4); +EXECUTE prepared_zero_shard_update(5); +EXECUTE prepared_zero_shard_update(6); +EXECUTE prepared_zero_shard_update(7); + +-- same test with fast-path disabled +SET citus.enable_fast_path_router_planner TO FALSE; + +EXECUTE prepared_zero_shard_select(1); +EXECUTE prepared_zero_shard_select(2); + +EXECUTE prepared_zero_shard_update(1); +EXECUTE prepared_zero_shard_update(2); + +DEALLOCATE prepared_zero_shard_select; +DEALLOCATE prepared_zero_shard_update; + +PREPARE prepared_zero_shard_select(int) AS SELECT count(*) FROM modify_fast_path WHERE key = $1 AND false; +PREPARE prepared_zero_shard_update(int) AS UPDATE modify_fast_path SET value_1 = 1 WHERE key = $1 AND false; + +EXECUTE prepared_zero_shard_select(1); +EXECUTE prepared_zero_shard_select(2); +EXECUTE prepared_zero_shard_select(3); +EXECUTE prepared_zero_shard_select(4); +EXECUTE prepared_zero_shard_select(5); +EXECUTE prepared_zero_shard_select(6); +EXECUTE prepared_zero_shard_select(7); + +EXECUTE prepared_zero_shard_update(1); +EXECUTE prepared_zero_shard_update(2); +EXECUTE prepared_zero_shard_update(3); +EXECUTE prepared_zero_shard_update(4); +EXECUTE prepared_zero_shard_update(5); +EXECUTE prepared_zero_shard_update(6); +EXECUTE prepared_zero_shard_update(7); + +-- same test with fast-path disabled +-- reset back to the original value, in case any new test comes after +RESET citus.enable_fast_path_router_planner; + +RESET client_min_messages; +RESET citus.log_remote_commands; DROP SCHEMA fast_path_router_modify CASCADE; diff --git a/src/test/regress/sql/multi_prune_shard_list.sql b/src/test/regress/sql/multi_prune_shard_list.sql index 34df549ae..192768bfe 100644 --- a/src/test/regress/sql/multi_prune_shard_list.sql +++ b/src/test/regress/sql/multi_prune_shard_list.sql @@ -145,5 +145,37 @@ SELECT * FROM coerce_hash WHERE id = 1.0; SELECT * FROM coerce_hash WHERE id = 1.0::numeric; +-- same queries with PREPARE +PREPARE coerce_bigint(bigint) AS SELECT * FROM coerce_hash WHERE id=$1::bigint; +PREPARE coerce_numeric(bigint) AS SELECT * FROM coerce_hash WHERE id=$1::numeric; +PREPARE coerce_numeric_2(numeric) AS SELECT * FROM coerce_hash WHERE id=$1; + +EXECUTE coerce_bigint(1); +EXECUTE coerce_bigint(1); +EXECUTE coerce_bigint(1); +EXECUTE coerce_bigint(1); +EXECUTE coerce_bigint(1); +EXECUTE coerce_bigint(1); +EXECUTE coerce_bigint(1); +EXECUTE coerce_bigint(1); + +EXECUTE coerce_numeric(1); +EXECUTE coerce_numeric(1); +EXECUTE coerce_numeric(1); +EXECUTE coerce_numeric(1); +EXECUTE coerce_numeric(1); +EXECUTE coerce_numeric(1); +EXECUTE coerce_numeric(1); + + +EXECUTE coerce_numeric_2(1); +EXECUTE coerce_numeric_2(1); +EXECUTE coerce_numeric_2(1); +EXECUTE coerce_numeric_2(1); +EXECUTE coerce_numeric_2(1); +EXECUTE coerce_numeric_2(1); +EXECUTE coerce_numeric_2(1); + + SET search_path TO public; DROP SCHEMA prune_shard_list CASCADE; diff --git a/src/test/regress/sql/multi_sql_function.sql b/src/test/regress/sql/multi_sql_function.sql index fc191b228..4b882608f 100644 --- a/src/test/regress/sql/multi_sql_function.sql +++ b/src/test/regress/sql/multi_sql_function.sql @@ -146,9 +146,12 @@ INSERT INTO test_parameterized_sql VALUES(1, 1); -- all of them should fail SELECT * FROM test_parameterized_sql_function(1); -SELECT test_parameterized_sql_function(1); SELECT test_parameterized_sql_function_in_subquery_where(1); +-- postgres behaves slightly differently for the following +-- query where the target list is empty +SELECT test_parameterized_sql_function(1); + -- test that sql function calls are treated as multi-statement transactions -- and are rolled back properly. Single-row inserts for not-replicated tables -- don't go over 2PC if they are not part of a bigger transaction. diff --git a/src/test/regress/sql/multi_utility_statements.sql b/src/test/regress/sql/multi_utility_statements.sql index 9a2b45f79..8a72c206d 100644 --- a/src/test/regress/sql/multi_utility_statements.sql +++ b/src/test/regress/sql/multi_utility_statements.sql @@ -145,6 +145,30 @@ $$ LANGUAGE SQL; SELECT declares_cursor(5); +-- Test DECLARE CURSOR .. WITH HOLD without parameter +CREATE OR REPLACE FUNCTION declares_cursor_2() +RETURNS void AS $$ + DECLARE c2 CURSOR WITH HOLD FOR SELECT * FROM cursor_me; +$$ LANGUAGE SQL; + +SELECT declares_cursor_2(); + +-- Test DECLARE CURSOR .. WITH HOLD with parameter on non-dist key +CREATE OR REPLACE FUNCTION declares_cursor_3(p int) +RETURNS void AS $$ + DECLARE c3 CURSOR WITH HOLD FOR SELECT * FROM cursor_me WHERE y = $1; +$$ LANGUAGE SQL; + +SELECT declares_cursor_3(19); + +-- Test DECLARE CURSOR .. WITH HOLD with parameter on dist key, but not fast-path planner +CREATE OR REPLACE FUNCTION declares_cursor_4(p int) +RETURNS void AS $$ + DECLARE c4 CURSOR WITH HOLD FOR SELECT *, (SELECT 1) FROM cursor_me WHERE x = $1; +$$ LANGUAGE SQL; + +SELECT declares_cursor_4(19); + CREATE OR REPLACE FUNCTION cursor_plpgsql(p int) RETURNS SETOF int AS $$ DECLARE