mirror of https://github.com/citusdata/citus.git
Avoid query deparse and planning of shard query in local execution.
If a fast path query resolves to a shard that is local to the node planning the query, a shortcut can be taken: the OID of the shard is plugged into the parse tree, which is then put through the Postgres planner. The task then uses that plan instead of deparsing and compiling a shard query.
branch: colm/single_shard_local_exec
parent 5deaf9a616
commit 3cbcf40cea
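The essence of the change: for a fast path query, whose parse tree contains a single range table entry, the planner can point that entry at the local shard and run the ordinary Postgres planner on it, instead of deparsing a shard query string and re-parsing and re-planning it in the local executor. Below is a minimal sketch of the idea; the helper name is invented for illustration, and the real implementation (with shard locking, attribute checks and PG16+ RTEPermissionInfo handling) is in ConvertToQueryOnShard() and CheckAndBuildDelayedFastPathPlan() further down in this diff.

    /* Sketch only: assumes postgres.h, nodes/parsenodes.h and optimizer/planner.h. */
    static PlannedStmt *
    PlanFastPathQueryOnLocalShard(Query *query, Oid shardRelationId,
                                  int cursorOptions, ParamListInfo boundParams)
    {
        /* fast path queries have exactly one range table entry */
        RangeTblEntry *rte = (RangeTblEntry *) linitial(query->rtable);

        /* retarget the parse tree at the local shard relation */
        rte->relid = shardRelationId;

        /* let the regular Postgres planner plan the shard query directly */
        return standard_planner(query, NULL, cursorOptions, boundParams);
    }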
@@ -686,7 +686,8 @@ RegenerateTaskForFasthPathQuery(Job *workerJob)
                              relationShardList,
                              placementList,
                              shardId,
                              isLocalTableModification);
                              isLocalTableModification,
                              false);
}
@@ -313,6 +313,7 @@ ExecuteLocalTaskListExtended(List *taskList,
    {
        int taskNumParams = numParams;
        Oid *taskParameterTypes = parameterTypes;
        int taskType = GetTaskQueryType(task);

        if (task->parametersInQueryStringResolved)
        {
@@ -330,7 +331,7 @@ ExecuteLocalTaskListExtended(List *taskList,
         * for concatenated strings, we set queryStringList so that we can access
         * each query string.
         */
        if (GetTaskQueryType(task) == TASK_QUERY_TEXT_LIST)
        if (taskType == TASK_QUERY_TEXT_LIST)
        {
            List *queryStringList = task->taskQuery.data.queryStringList;
            totalRowsProcessed +=
@@ -342,11 +343,12 @@ ExecuteLocalTaskListExtended(List *taskList,
            continue;
        }

        if (taskType != TASK_QUERY_LOCAL_PLAN)
        {
            Query *shardQuery = ParseQueryString(TaskQueryString(task),
                                                 taskParameterTypes,
                                                 taskNumParams);

            int cursorOptions = CURSOR_OPT_PARALLEL_OK;

            /*
@@ -359,6 +361,14 @@ ExecuteLocalTaskListExtended(List *taskList,
             */
            localPlan = planner(shardQuery, NULL, cursorOptions, paramListInfo);
        }
        else
        {
            ereport(DEBUG2, (errmsg(
                                 "Local executor: Using task's cached local plan for task %u",
                                 task->taskId)));
            localPlan = TaskQueryLocalPlan(task);
        }
    }

    char *shardQueryString = NULL;
    if (GetTaskQueryType(task) == TASK_QUERY_TEXT)
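Condensed, the executor-side change above is a dispatch on the task's query type: reuse the plan attached by the router planner when there is one, otherwise fall back to parsing and planning the deparsed shard query string. The sketch below restates that logic in a standalone helper; the function name and parameter plumbing are simplified relative to ExecuteLocalTaskListExtended() and are not part of the patch.

    /*
     * Illustrative restatement of the hunk above (helper name is hypothetical).
     * Assumes the Citus headers that declare Task, ParseQueryString() and
     * TaskQueryLocalPlan() are included.
     */
    static PlannedStmt *
    GetOrBuildLocalPlan(Task *task, Oid *paramTypes, int numParams,
                        ParamListInfo paramListInfo)
    {
        if (GetTaskQueryType(task) == TASK_QUERY_LOCAL_PLAN)
        {
            /* the router planner already planned the shard query for this node */
            return TaskQueryLocalPlan(task);
        }

        /* otherwise parse and plan the deparsed shard query string as before */
        Query *shardQuery = ParseQueryString(TaskQueryString(task),
                                             paramTypes, numParams);
        return planner(shardQuery, NULL, CURSOR_OPT_PARALLEL_OK, paramListInfo);
    }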
@@ -439,6 +439,27 @@ SetTaskQueryStringList(Task *task, List *queryStringList)
}


void
SetTaskQueryPlan(Task *task, Query *query, PlannedStmt *localPlan)
{
    Assert(localPlan != NULL);
    task->taskQuery.queryType = TASK_QUERY_LOCAL_PLAN;
    task->taskQuery.data.localCompiled = (LocalCompilation *) palloc0(
        sizeof(LocalCompilation));
    task->taskQuery.data.localCompiled->query = query;
    task->taskQuery.data.localCompiled->plan = localPlan;
    task->queryCount = 1;
}


PlannedStmt *
TaskQueryLocalPlan(Task *task)
{
    Assert(task->taskQuery.queryType == TASK_QUERY_LOCAL_PLAN);
    return task->taskQuery.data.localCompiled->plan;
}


/*
 * DeparseTaskQuery is a general way of deparsing a query based on a task.
 */
@@ -524,6 +545,26 @@ TaskQueryString(Task *task)
    {
        return task->taskQuery.data.queryStringLazy;
    }
    else if (taskQueryType == TASK_QUERY_LOCAL_PLAN)
    {
        Query *query = task->taskQuery.data.localCompiled->query;
        Assert(query != NULL);

        /*
         * Use the query of the local compilation to generate the
         * query string. For local compiled tasks, the query is retained
         * for this purpose, which may be EXPLAIN ANALYZing the task, or
         * command logging. Generating the query string on the fly is
         * acceptable because the plan of the local compilation is used
         * for query execution.
         */
        MemoryContext previousContext = MemoryContextSwitchTo(GetMemoryChunkContext(
                                                                  query));
        UpdateRelationToShardNames((Node *) query, task->relationShardList);
        MemoryContextSwitchTo(previousContext);
        return AnnotateQuery(DeparseTaskQuery(task, query),
                             task->partitionKeyValue, task->colocationId);
    }

    Query *jobQueryReferenceForLazyDeparsing =
        task->taskQuery.data.jobQueryReferenceForLazyDeparsing;
@ -135,13 +135,13 @@ static void AdjustReadIntermediateResultsCostInternal(RelOptInfo *relOptInfo,
|
|||
Const *resultFormatConst);
|
||||
static List * OuterPlanParamsList(PlannerInfo *root);
|
||||
static List * CopyPlanParamList(List *originalPlanParamList);
|
||||
static PlannerRestrictionContext * CreateAndPushPlannerRestrictionContext(void);
|
||||
static PlannerRestrictionContext * CreateAndPushPlannerRestrictionContext(
|
||||
FastPathRestrictionContext *fastPathContext);
|
||||
static PlannerRestrictionContext * CurrentPlannerRestrictionContext(void);
|
||||
static void PopPlannerRestrictionContext(void);
|
||||
static void ResetPlannerRestrictionContext(
|
||||
PlannerRestrictionContext *plannerRestrictionContext);
|
||||
static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *planContext,
|
||||
Node *distributionKeyValue);
|
||||
static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *planContext);
|
||||
static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext,
|
||||
int rteIdCounter);
|
||||
static RTEListProperties * GetRTEListProperties(List *rangeTableList);
|
||||
|
@ -166,7 +166,7 @@ distributed_planner(Query *parse,
|
|||
{
|
||||
bool needsDistributedPlanning = false;
|
||||
bool fastPathRouterQuery = false;
|
||||
Node *distributionKeyValue = NULL;
|
||||
FastPathRestrictionContext fastPathContext = { 0 };
|
||||
|
||||
List *rangeTableList = ExtractRangeTableEntryList(parse);
|
||||
|
||||
|
@ -191,8 +191,7 @@ distributed_planner(Query *parse,
|
|||
&maybeHasForeignDistributedTable);
|
||||
if (needsDistributedPlanning)
|
||||
{
|
||||
fastPathRouterQuery = FastPathRouterQuery(parse, &distributionKeyValue);
|
||||
|
||||
fastPathRouterQuery = FastPathRouterQuery(parse, &fastPathContext);
|
||||
if (maybeHasForeignDistributedTable)
|
||||
{
|
||||
WarnIfListHasForeignDistributedTable(rangeTableList);
|
||||
|
@ -247,8 +246,9 @@ distributed_planner(Query *parse,
|
|||
*/
|
||||
HideCitusDependentObjectsOnQueriesOfPgMetaTables((Node *) parse, NULL);
|
||||
|
||||
/* create a restriction context and put it at the end if context list */
|
||||
planContext.plannerRestrictionContext = CreateAndPushPlannerRestrictionContext();
|
||||
/* create a restriction context and put it at the end of context list */
|
||||
planContext.plannerRestrictionContext = CreateAndPushPlannerRestrictionContext(
|
||||
&fastPathContext);
|
||||
|
||||
/*
|
||||
* We keep track of how many times we've recursed into the planner, primarily
|
||||
|
@ -264,7 +264,7 @@ distributed_planner(Query *parse,
|
|||
{
|
||||
if (fastPathRouterQuery)
|
||||
{
|
||||
result = PlanFastPathDistributedStmt(&planContext, distributionKeyValue);
|
||||
result = PlanFastPathDistributedStmt(&planContext);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -649,30 +649,21 @@ IsMultiTaskPlan(DistributedPlan *distributedPlan)
|
|||
* the FastPathPlanner.
|
||||
*/
|
||||
static PlannedStmt *
|
||||
PlanFastPathDistributedStmt(DistributedPlanningContext *planContext,
|
||||
Node *distributionKeyValue)
|
||||
PlanFastPathDistributedStmt(DistributedPlanningContext *planContext)
|
||||
{
|
||||
FastPathRestrictionContext *fastPathContext =
|
||||
planContext->plannerRestrictionContext->fastPathRestrictionContext;
|
||||
Assert(fastPathContext != NULL);
|
||||
Assert(fastPathContext->fastPathRouterQuery);
|
||||
|
||||
planContext->plannerRestrictionContext->fastPathRestrictionContext->
|
||||
fastPathRouterQuery = true;
|
||||
FastPathPreprocessParseTree(planContext->query);
|
||||
|
||||
if (distributionKeyValue == NULL)
|
||||
if (!fastPathContext->delayFastPathPlanning)
|
||||
{
|
||||
/* nothing to record */
|
||||
}
|
||||
else if (IsA(distributionKeyValue, Const))
|
||||
{
|
||||
fastPathContext->distributionKeyValue = (Const *) distributionKeyValue;
|
||||
}
|
||||
else if (IsA(distributionKeyValue, Param))
|
||||
{
|
||||
fastPathContext->distributionKeyHasParam = true;
|
||||
}
|
||||
|
||||
planContext->plan = FastPathPlanner(planContext->originalQuery, planContext->query,
|
||||
planContext->plan = FastPathPlanner(planContext->originalQuery,
|
||||
planContext->query,
|
||||
planContext->boundParams);
|
||||
}
|
||||
|
||||
return CreateDistributedPlannedStmt(planContext);
|
||||
}
|
||||
|
@@ -803,6 +794,8 @@ CreateDistributedPlannedStmt(DistributedPlanningContext *planContext)
        RaiseDeferredError(distributedPlan->planningError, ERROR);
    }

    CheckAndBuildDelayedFastPathPlan(planContext, distributedPlan);

    /* remember the plan's identifier for identifying subplans */
    distributedPlan->planId = planId;
@ -2407,13 +2400,15 @@ CopyPlanParamList(List *originalPlanParamList)
|
|||
|
||||
|
||||
/*
|
||||
* CreateAndPushPlannerRestrictionContext creates a new relation restriction context
|
||||
* and a new join context, inserts it to the beginning of the
|
||||
* plannerRestrictionContextList. Finally, the planner restriction context is
|
||||
* inserted to the beginning of the plannerRestrictionContextList and it is returned.
|
||||
* CreateAndPushPlannerRestrictionContext creates a new planner restriction
|
||||
* context with an empty relation restriction context and an empty join and
|
||||
* a copy of the given fast path restriction context (if present). Finally,
|
||||
* the planner restriction context is inserted to the beginning of the
|
||||
* global plannerRestrictionContextList and it is returned.
|
||||
*/
|
||||
static PlannerRestrictionContext *
|
||||
CreateAndPushPlannerRestrictionContext(void)
|
||||
CreateAndPushPlannerRestrictionContext(
|
||||
FastPathRestrictionContext *fastPathRestrictionContext)
|
||||
{
|
||||
PlannerRestrictionContext *plannerRestrictionContext =
|
||||
palloc0(sizeof(PlannerRestrictionContext));
|
||||
|
@ -2427,6 +2422,21 @@ CreateAndPushPlannerRestrictionContext(void)
|
|||
plannerRestrictionContext->fastPathRestrictionContext =
|
||||
palloc0(sizeof(FastPathRestrictionContext));
|
||||
|
||||
if (fastPathRestrictionContext != NULL)
|
||||
{
|
||||
/* copy the given fast path restriction context */
|
||||
FastPathRestrictionContext *plannersFastPathCtx =
|
||||
plannerRestrictionContext->fastPathRestrictionContext;
|
||||
plannersFastPathCtx->fastPathRouterQuery =
|
||||
fastPathRestrictionContext->fastPathRouterQuery;
|
||||
plannersFastPathCtx->distributionKeyValue =
|
||||
fastPathRestrictionContext->distributionKeyValue;
|
||||
plannersFastPathCtx->distributionKeyHasParam =
|
||||
fastPathRestrictionContext->distributionKeyHasParam;
|
||||
plannersFastPathCtx->delayFastPathPlanning =
|
||||
fastPathRestrictionContext->delayFastPathPlanning;
|
||||
}
|
||||
|
||||
plannerRestrictionContext->memoryContext = CurrentMemoryContext;
|
||||
|
||||
/* we'll apply logical AND as we add tables */
|
||||
|
|
|
@ -43,6 +43,7 @@
|
|||
|
||||
#include "pg_version_constants.h"
|
||||
|
||||
#include "distributed/citus_clauses.h"
|
||||
#include "distributed/distributed_planner.h"
|
||||
#include "distributed/insert_select_planner.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
|
@ -53,6 +54,7 @@
|
|||
#include "distributed/shardinterval_utils.h"
|
||||
|
||||
bool EnableFastPathRouterPlanner = true;
|
||||
bool EnableFastPathLocalExecutor = true;
|
||||
|
||||
static bool ColumnAppearsMultipleTimes(Node *quals, Var *distributionKey);
|
||||
static bool DistKeyInSimpleOpExpression(Expr *clause, Var *distColumn,
|
||||
|
@ -61,6 +63,19 @@ static bool ConjunctionContainsColumnFilter(Node *node,
|
|||
Var *column,
|
||||
Node **distributionKeyValue);
|
||||
|
||||
void
|
||||
FastPathPreprocessParseTree(Query *parse)
|
||||
{
|
||||
/*
|
||||
* Citus planner relies on some of the transformations on constant
|
||||
* evaluation on the parse tree.
|
||||
*/
|
||||
parse->targetList =
|
||||
(List *) eval_const_expressions(NULL, (Node *) parse->targetList);
|
||||
parse->jointree->quals =
|
||||
(Node *) eval_const_expressions(NULL, (Node *) parse->jointree->quals);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* FastPathPlanner is intended to be used instead of standard_planner() for trivial
|
||||
|
@ -73,15 +88,6 @@ static bool ConjunctionContainsColumnFilter(Node *node,
|
|||
PlannedStmt *
|
||||
FastPathPlanner(Query *originalQuery, Query *parse, ParamListInfo boundParams)
|
||||
{
|
||||
/*
|
||||
* Citus planner relies on some of the transformations on constant
|
||||
* evaluation on the parse tree.
|
||||
*/
|
||||
parse->targetList =
|
||||
(List *) eval_const_expressions(NULL, (Node *) parse->targetList);
|
||||
parse->jointree->quals =
|
||||
(Node *) eval_const_expressions(NULL, (Node *) parse->jointree->quals);
|
||||
|
||||
PlannedStmt *result = GeneratePlaceHolderPlannedStmt(originalQuery);
|
||||
|
||||
return result;
|
||||
|
@ -112,10 +118,6 @@ GeneratePlaceHolderPlannedStmt(Query *parse)
|
|||
Plan *plan = &scanNode->plan;
|
||||
#endif
|
||||
|
||||
Node *distKey PG_USED_FOR_ASSERTS_ONLY = NULL;
|
||||
|
||||
Assert(FastPathRouterQuery(parse, &distKey));
|
||||
|
||||
/* there is only a single relation rte */
|
||||
#if PG_VERSION_NUM >= PG_VERSION_16
|
||||
scanNode->scan.scanrelid = 1;
|
||||
|
@ -150,26 +152,78 @@ GeneratePlaceHolderPlannedStmt(Query *parse)
|
|||
}
|
||||
|
||||
|
||||
static void
|
||||
InitializeFastPathContext(FastPathRestrictionContext *fastPathContext,
|
||||
Node *distributionKeyValue,
|
||||
bool canAvoidDeparse,
|
||||
Query *query)
|
||||
{
|
||||
Assert(fastPathContext != NULL);
|
||||
Assert(!fastPathContext->fastPathRouterQuery);
|
||||
Assert(!fastPathContext->delayFastPathPlanning);
|
||||
|
||||
/*
|
||||
* We're looking at a fast path query, so we can fill the
|
||||
* fastPathContext with relevant details.
|
||||
*/
|
||||
fastPathContext->fastPathRouterQuery = true;
|
||||
if (distributionKeyValue == NULL)
|
||||
{
|
||||
/* nothing to record */
|
||||
}
|
||||
else if (IsA(distributionKeyValue, Const))
|
||||
{
|
||||
fastPathContext->distributionKeyValue = (Const *) distributionKeyValue;
|
||||
}
|
||||
else if (IsA(distributionKeyValue, Param))
|
||||
{
|
||||
fastPathContext->distributionKeyHasParam = true;
|
||||
}
|
||||
|
||||
if (EnableFastPathLocalExecutor)
|
||||
{
|
||||
/*
|
||||
* This fast path query may be executed by the local executor.
|
||||
* We need to delay the fast path planning until we know if the
|
||||
* shard is local or not. Make a final check for volatile
|
||||
* functions in the query tree to determine if we should delay
|
||||
* the fast path planning.
|
||||
*/
|
||||
fastPathContext->delayFastPathPlanning = canAvoidDeparse &&
|
||||
!FindNodeMatchingCheckFunction(
|
||||
(Node *) query,
|
||||
CitusIsVolatileFunction);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* FastPathRouterQuery gets a query and returns true if the query is eligible for
|
||||
* being a fast path router query.
|
||||
* being a fast path router query. It also fills the given fastPathContext with
|
||||
* details about the query such as the distribution key value (if available),
|
||||
* whether the distribution key is a parameter, and the range table entry for the
|
||||
* table being queried.
|
||||
* The requirements for the fast path query can be listed below:
|
||||
*
|
||||
* - SELECT/UPDATE/DELETE query without CTES, sublinks-subqueries, set operations
|
||||
* - The query should touch only a single hash distributed or reference table
|
||||
* - The distribution with equality operator should be in the WHERE clause
|
||||
* and it should be ANDed with any other filters. Also, the distribution
|
||||
* key should only exists once in the WHERE clause. So basically,
|
||||
* key should only exist once in the WHERE clause. So basically,
|
||||
* SELECT ... FROM dist_table WHERE dist_key = X
|
||||
* If the filter is a const, distributionKeyValue is set
|
||||
* - All INSERT statements (including multi-row INSERTs) as long as the commands
|
||||
* don't have any sublinks/CTEs etc
|
||||
* -
|
||||
*/
|
||||
bool
|
||||
FastPathRouterQuery(Query *query, Node **distributionKeyValue)
|
||||
FastPathRouterQuery(Query *query, FastPathRestrictionContext *fastPathContext)
|
||||
{
|
||||
FromExpr *joinTree = query->jointree;
|
||||
Node *quals = NULL;
|
||||
bool isFastPath = false;
|
||||
bool canAvoidDeparse = false;
|
||||
Node *distributionKeyValue = NULL;
|
||||
|
||||
if (!EnableFastPathRouterPlanner)
|
||||
{
|
||||
|
@ -201,6 +255,7 @@ FastPathRouterQuery(Query *query, Node **distributionKeyValue)
|
|||
else if (query->commandType == CMD_INSERT)
|
||||
{
|
||||
/* we don't need to do any further checks, all INSERTs are fast-path */
|
||||
InitializeFastPathContext(fastPathContext, NULL, true, query);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -232,18 +287,23 @@ FastPathRouterQuery(Query *query, Node **distributionKeyValue)
|
|||
Var *distributionKey = PartitionColumn(distributedTableId, 1);
|
||||
if (!distributionKey)
|
||||
{
|
||||
return true;
|
||||
/* Local execution may avoid a deparse on single shard distributed tables */
|
||||
canAvoidDeparse = IsCitusTableTypeCacheEntry(cacheEntry,
|
||||
SINGLE_SHARD_DISTRIBUTED);
|
||||
isFastPath = true;
|
||||
}
|
||||
|
||||
/* WHERE clause should not be empty for distributed tables */
|
||||
if (joinTree == NULL ||
|
||||
(IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE) && joinTree->quals ==
|
||||
NULL))
|
||||
if (!isFastPath)
|
||||
{
|
||||
canAvoidDeparse = IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE);
|
||||
|
||||
if (joinTree == NULL ||
|
||||
(joinTree->quals == NULL && !canAvoidDeparse))
|
||||
{
|
||||
/* no quals, not a fast path query */
|
||||
return false;
|
||||
}
|
||||
|
||||
/* convert list of expressions into expression tree for further processing */
|
||||
quals = joinTree->quals;
|
||||
if (quals != NULL && IsA(quals, List))
|
||||
{
|
||||
|
@ -264,13 +324,18 @@ FastPathRouterQuery(Query *query, Node **distributionKeyValue)
|
|||
* This is to simplify both of the individual checks and omit various edge cases
|
||||
* that might arise with multiple distribution keys in the quals.
|
||||
*/
|
||||
if (ConjunctionContainsColumnFilter(quals, distributionKey, distributionKeyValue) &&
|
||||
!ColumnAppearsMultipleTimes(quals, distributionKey))
|
||||
{
|
||||
return true;
|
||||
isFastPath = (ConjunctionContainsColumnFilter(quals, distributionKey,
|
||||
&distributionKeyValue) &&
|
||||
!ColumnAppearsMultipleTimes(quals, distributionKey));
|
||||
}
|
||||
|
||||
return false;
|
||||
if (isFastPath)
|
||||
{
|
||||
InitializeFastPathContext(fastPathContext, distributionKeyValue, canAvoidDeparse,
|
||||
query);
|
||||
}
|
||||
|
||||
return isFastPath;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
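Pulling the decision logic of InitializeFastPathContext() above into one expression: fast path planning is only delayed (so the shard OID shortcut can be attempted later) when the citus.enable_local_execution_local_plan GUC is on, the table type allows skipping the deparse, and the query contains no volatile functions. A condensed sketch, with an illustrative helper name that is not part of the patch:

    /*
     * Condensed form of the delayFastPathPlanning decision made in
     * InitializeFastPathContext(); all symbols referenced here appear in the diff.
     */
    static bool
    ShouldDelayFastPathPlanning(Query *query, bool canAvoidDeparse)
    {
        return EnableFastPathLocalExecutor &&
               canAvoidDeparse &&
               !FindNodeMatchingCheckFunction((Node *) query, CitusIsVolatileFunction);
    }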
@ -16,6 +16,8 @@
|
|||
#include "postgres.h"
|
||||
|
||||
#include "access/stratnum.h"
|
||||
#include "access/tupdesc.h"
|
||||
#include "access/tupdesc_details.h"
|
||||
#include "access/xact.h"
|
||||
#include "catalog/pg_opfamily.h"
|
||||
#include "catalog/pg_proc.h"
|
||||
|
@ -34,6 +36,7 @@
|
|||
#include "optimizer/pathnode.h"
|
||||
#include "optimizer/paths.h"
|
||||
#include "optimizer/planmain.h"
|
||||
#include "optimizer/planner.h"
|
||||
#include "optimizer/restrictinfo.h"
|
||||
#include "parser/parse_oper.h"
|
||||
#include "parser/parsetree.h"
|
||||
|
@ -81,6 +84,7 @@
|
|||
#include "distributed/relay_utility.h"
|
||||
#include "distributed/resource_lock.h"
|
||||
#include "distributed/shard_pruning.h"
|
||||
#include "distributed/shard_utils.h"
|
||||
#include "distributed/shardinterval_utils.h"
|
||||
|
||||
/* intermediate value for INSERT processing */
|
||||
|
@ -164,7 +168,7 @@ static List * SingleShardTaskList(Query *query, uint64 jobId,
|
|||
List *relationShardList, List *placementList,
|
||||
uint64 shardId, bool parametersInQueryResolved,
|
||||
bool isLocalTableModification, Const *partitionKeyValue,
|
||||
int colocationId);
|
||||
int colocationId, bool delayedFastPath);
|
||||
static bool RowLocksOnRelations(Node *node, List **rtiLockList);
|
||||
static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
|
||||
TaskAssignmentPolicyType
|
||||
|
@ -173,7 +177,7 @@ static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
|
|||
static bool ModifiesLocalTableWithRemoteCitusLocalTable(List *rangeTableList);
|
||||
static DeferredErrorMessage * DeferErrorIfUnsupportedLocalTableJoin(List *rangeTableList);
|
||||
static bool IsLocallyAccessibleCitusLocalTable(Oid relationId);
|
||||
|
||||
static bool ConvertToQueryOnShard(Query *query, Oid relationID, Oid shardRelationId);
|
||||
|
||||
/*
|
||||
* CreateRouterPlan attempts to create a router executor plan for the given
|
||||
|
@ -1940,7 +1944,9 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
|
|||
{
|
||||
GenerateSingleShardRouterTaskList(job, relationShardList,
|
||||
placementList, shardId,
|
||||
isLocalTableModification);
|
||||
isLocalTableModification,
|
||||
fastPathRestrictionContext->
|
||||
delayFastPathPlanning);
|
||||
}
|
||||
|
||||
job->requiresCoordinatorEvaluation = requiresCoordinatorEvaluation;
|
||||
|
@ -1948,6 +1954,258 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* CheckAttributesMatch checks if the attributes of the Citus table and the shard
|
||||
* table match.
|
||||
*
|
||||
* It is used to ensure that the shard table has the same schema as the Citus
|
||||
* table before replacing the Citus table OID with the shard table OID in the
|
||||
* parse tree we (Citus planner) received from Postgres.
|
||||
*/
|
||||
static
|
||||
bool
|
||||
CheckAttributesMatch(Oid citusTableId, Oid shardTableId)
|
||||
{
|
||||
Relation citusR, shardR;
|
||||
bool same_schema = false;
|
||||
|
||||
citusR = RelationIdGetRelation(citusTableId);
|
||||
shardR = RelationIdGetRelation(shardTableId);
|
||||
|
||||
if (RelationIsValid(citusR) && RelationIsValid(shardR))
|
||||
{
|
||||
TupleDesc citusTupDesc = citusR->rd_att;
|
||||
TupleDesc shardTupDesc = shardR->rd_att;
|
||||
|
||||
if (citusTupDesc->natts == shardTupDesc->natts)
|
||||
{
|
||||
/*
|
||||
* Do an attribute-by-attribute comparison. This is borrowed from
|
||||
* the Postgres function equalTupleDescs(), which we cannot use
|
||||
* because the citus table and shard table have different composite
|
||||
* types.
|
||||
*/
|
||||
same_schema = true;
|
||||
for (int i = 0; i < citusTupDesc->natts && same_schema; i++)
|
||||
{
|
||||
Form_pg_attribute attr1 = TupleDescAttr(citusTupDesc, i);
|
||||
Form_pg_attribute attr2 = TupleDescAttr(shardTupDesc, i);
|
||||
|
||||
if (strcmp(NameStr(attr1->attname), NameStr(attr2->attname)) != 0)
|
||||
{
|
||||
same_schema = false;
|
||||
}
|
||||
if (attr1->atttypid != attr2->atttypid)
|
||||
{
|
||||
same_schema = false;
|
||||
}
|
||||
if (attr1->atttypmod != attr2->atttypmod)
|
||||
{
|
||||
same_schema = false;
|
||||
}
|
||||
if (attr1->attcollation != attr2->attcollation)
|
||||
{
|
||||
same_schema = false;
|
||||
}
|
||||
|
||||
/* Record types derived from tables could have dropped fields. */
|
||||
if (attr1->attisdropped != attr2->attisdropped)
|
||||
{
|
||||
same_schema = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
RelationClose(citusR);
|
||||
RelationClose(shardR);
|
||||
return same_schema;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CheckAndBuildDelayedFastPathPlan() - if the query being planned is a fast
|
||||
* path query, not marked for deferred pruning and the placement for the task
|
||||
* is not a dummy placement then if the placement is local to this node we can
|
||||
* take a shortcut of replacing the OID of the citus table with the OID of the
|
||||
* shard in the query tree and plan that directly, instead of deparsing the
|
||||
* parse tree to a SQL query on the shard and parsing and planning that in
|
||||
* the local executor. Instead, the local executor can use the plan created
|
||||
* here.
|
||||
*/
|
||||
void
|
||||
CheckAndBuildDelayedFastPathPlan(DistributedPlanningContext *planContext,
|
||||
DistributedPlan *plan)
|
||||
{
|
||||
FastPathRestrictionContext *fastPathContext =
|
||||
planContext->plannerRestrictionContext->fastPathRestrictionContext;
|
||||
|
||||
if (!fastPathContext->delayFastPathPlanning)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
Job *job = plan->workerJob;
|
||||
Assert(job != NULL);
|
||||
|
||||
if (job->deferredPruning)
|
||||
{
|
||||
/* Execution time pruning => don't know which shard at this point */
|
||||
planContext->plan = FastPathPlanner(planContext->originalQuery,
|
||||
planContext->query,
|
||||
planContext->boundParams);
|
||||
return;
|
||||
}
|
||||
|
||||
List *tasks = job->taskList;
|
||||
Assert(list_length(tasks) == 1);
|
||||
Task *task = (Task *) linitial(tasks);
|
||||
List *placements = task->taskPlacementList;
|
||||
Assert(list_length(placements) > 0);
|
||||
int32 localGroupId = GetLocalGroupId();
|
||||
ShardPlacement *primaryPlacement = (ShardPlacement *) linitial(placements);
|
||||
|
||||
bool isLocalExecution = !IsDummyPlacement(primaryPlacement) &&
|
||||
(primaryPlacement->groupId == localGroupId);
|
||||
bool canBuildLocalPlan = true;
|
||||
|
||||
if (isLocalExecution)
|
||||
{
|
||||
List *relationShards = task->relationShardList;
|
||||
Assert(list_length(relationShards) == 1);
|
||||
RelationShard *relationShard = (RelationShard *) linitial(relationShards);
|
||||
Assert(relationShard->shardId == primaryPlacement->shardId);
|
||||
|
||||
canBuildLocalPlan = ConvertToQueryOnShard(planContext->query,
|
||||
relationShard->relationId,
|
||||
relationShard->shardId);
|
||||
if (canBuildLocalPlan)
|
||||
{
|
||||
/* Plan the query with the new shard relation id */
|
||||
planContext->plan = standard_planner(planContext->query, NULL,
|
||||
planContext->cursorOptions,
|
||||
planContext->boundParams);
|
||||
SetTaskQueryPlan(task, job->jobQuery, planContext->plan);
|
||||
|
||||
ereport(DEBUG2, (errmsg(
|
||||
"Fast-path router query: created local execution plan "
|
||||
"to avoid deparse to and compile of shard query")));
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* Either the shard is not local to this node, or it was not safe to replace
|
||||
* the OIDs in the parse tree; in any case we fall back to generating the shard
|
||||
* query and compiling that.
|
||||
*/
|
||||
Assert(!isLocalExecution || (isLocalExecution && !canBuildLocalPlan));
|
||||
|
||||
/* Fall back to fast path planner and generating SQL query on the shard */
|
||||
planContext->plan = FastPathPlanner(planContext->originalQuery,
|
||||
planContext->query,
|
||||
planContext->boundParams);
|
||||
UpdateRelationToShardNames((Node *) job->jobQuery, task->relationShardList);
|
||||
SetTaskQueryIfShouldLazyDeparse(task, job->jobQuery);
|
||||
}
|
||||
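The "is the shard local to this node?" test used by CheckAndBuildDelayedFastPathPlan() above boils down to comparing the first task placement's group with the node's own group, after excluding dummy placements. A small sketch, with an illustrative helper name:

    /*
     * Locality test as used above; IsDummyPlacement(), GetLocalGroupId() and
     * the taskPlacementList field all appear in this diff.
     */
    static bool
    TaskAccessesLocalShardPlacement(Task *task)
    {
        ShardPlacement *placement =
            (ShardPlacement *) linitial(task->taskPlacementList);

        return !IsDummyPlacement(placement) &&
               placement->groupId == GetLocalGroupId();
    }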
|
||||
|
||||
/*
|
||||
* ConvertToQueryOnShard() converts the given query on a citus table (identified by
|
||||
* citusTableOid) to a query on a shard (identified by shardId).
|
||||
*
|
||||
* The function assumes that the query is a "fast path" query - it has only one
|
||||
* RangeTblEntry and one RTEPermissionInfo.
|
||||
*
|
||||
* It acquires the same lock on the shard that was acquired on the citus table
|
||||
* by the Postgres parser. It checks that the attribute numbers and metadata of
|
||||
* the shard table and citus table are identical - otherwise it is not safe
|
||||
* to proceed with this shortcut. Assuming the attributes do match, the actual
|
||||
* conversion involves changing the target list entries that reference the
|
||||
* citus table's oid to reference the shard's relation id instead. Finally,
|
||||
* it changes the RangeTblEntry's relid to the shard's relation id and (PG16+)
|
||||
* changes the RTEPermissionInfo's relid to the shard's relation id also.
|
||||
* At this point the Query is ready for the postgres planner.
|
||||
*/
|
||||
static bool
|
||||
ConvertToQueryOnShard(Query *query, Oid citusTableOid, Oid shardId)
|
||||
{
|
||||
Assert(list_length(query->rtable) == 1);
|
||||
RangeTblEntry *citusTableRte = (RangeTblEntry *) linitial(query->rtable);
|
||||
Assert(citusTableRte->relid == citusTableOid);
|
||||
Assert(list_length(query->rteperminfos) == 1);
|
||||
|
||||
const char *citusTableName = get_rel_name(citusTableOid);
|
||||
Assert(citusTableName != NULL);
|
||||
|
||||
/* construct shard relation name */
|
||||
char *shardRelationName = pstrdup(citusTableName);
|
||||
AppendShardIdToName(&shardRelationName, shardId);
|
||||
|
||||
/* construct the schema name */
|
||||
char *schemaName = get_namespace_name(get_rel_namespace(citusTableOid));
|
||||
|
||||
/* now construct a range variable for the shard */
|
||||
RangeVar shardRangeVar = {
|
||||
.relname = shardRelationName,
|
||||
.schemaname = schemaName,
|
||||
.inh = citusTableRte->inh,
|
||||
.relpersistence = RELPERSISTENCE_PERMANENT,
|
||||
};
|
||||
|
||||
/* Must apply the same lock to the shard that was applied to the citus table */
|
||||
Oid shardRelationId = RangeVarGetRelidExtended(&shardRangeVar,
|
||||
citusTableRte->rellockmode,
|
||||
0, NULL, NULL); /* todo - use suitable callback for perms check? */
|
||||
|
||||
/* Verify that the attributes of citus table and shard table match */
|
||||
if (!CheckAttributesMatch(citusTableOid, shardRelationId))
|
||||
{
|
||||
/* There is a difference between the attributes of the citus
|
||||
* table and the shard table. This can happen if there is a DROP
|
||||
* COLUMN on the citus table. In this case, we cannot
|
||||
* convert the query to a shard query, so clean up and return.
|
||||
*/
|
||||
UnlockRelationOid(shardRelationId, citusTableRte->rellockmode);
|
||||
ereport(DEBUG2, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||
errmsg(
|
||||
"Router planner fast path cannot modify parse tree for local execution: shard table \"%s.%s\" does not match the "
|
||||
"distributed table \"%s.%s\"",
|
||||
schemaName, shardRelationName, schemaName,
|
||||
citusTableName)));
|
||||
pfree(shardRelationName);
|
||||
pfree(schemaName);
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/* Change the target list entries that reference the original citus table's relation id */
|
||||
ListCell *lc = NULL;
|
||||
foreach(lc, query->targetList)
|
||||
{
|
||||
TargetEntry *targetEntry = (TargetEntry *) lfirst(lc);
|
||||
if (targetEntry->resorigtbl == citusTableOid)
|
||||
{
|
||||
targetEntry->resorigtbl = shardRelationId;
|
||||
}
|
||||
}
|
||||
|
||||
/* Change the range table entry's oid to that of the shard's */
|
||||
Assert(shardRelationId != InvalidOid);
|
||||
citusTableRte->relid = shardRelationId;
|
||||
|
||||
#if PG_VERSION_NUM >= PG_VERSION_16
|
||||
|
||||
/* Change the range table permission oid to that of the shard's (PG16+) */
|
||||
Assert(list_length(query->rteperminfos) == 1);
|
||||
RTEPermissionInfo *rtePermInfo = (RTEPermissionInfo *) linitial(query->rteperminfos);
|
||||
rtePermInfo->relid = shardRelationId;
|
||||
#endif
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GenerateSingleShardRouterTaskList is a wrapper around other corresponding task
|
||||
* list generation functions specific to single shard selects and modifications.
|
||||
|
@ -1957,7 +2215,7 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
|
|||
void
|
||||
GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
|
||||
List *placementList, uint64 shardId, bool
|
||||
isLocalTableModification)
|
||||
isLocalTableModification, bool delayedFastPath)
|
||||
{
|
||||
Query *originalQuery = job->jobQuery;
|
||||
|
||||
|
@ -1970,7 +2228,8 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
|
|||
shardId,
|
||||
job->parametersInJobQueryResolved,
|
||||
isLocalTableModification,
|
||||
job->partitionKeyValue, job->colocationId);
|
||||
job->partitionKeyValue, job->colocationId,
|
||||
delayedFastPath);
|
||||
|
||||
/*
|
||||
* Queries to reference tables, or distributed tables with multiple replica's have
|
||||
|
@ -2001,7 +2260,8 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
|
|||
shardId,
|
||||
job->parametersInJobQueryResolved,
|
||||
isLocalTableModification,
|
||||
job->partitionKeyValue, job->colocationId);
|
||||
job->partitionKeyValue, job->colocationId,
|
||||
delayedFastPath);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2096,7 +2356,7 @@ SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList,
|
|||
List *placementList, uint64 shardId,
|
||||
bool parametersInQueryResolved,
|
||||
bool isLocalTableModification, Const *partitionKeyValue,
|
||||
int colocationId)
|
||||
int colocationId, bool delayedFastPath)
|
||||
{
|
||||
TaskType taskType = READ_TASK;
|
||||
char replicationModel = 0;
|
||||
|
@ -2168,7 +2428,10 @@ SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList,
|
|||
task->taskPlacementList = placementList;
|
||||
task->partitionKeyValue = partitionKeyValue;
|
||||
task->colocationId = colocationId;
|
||||
if (!delayedFastPath)
|
||||
{
|
||||
SetTaskQueryIfShouldLazyDeparse(task, query);
|
||||
}
|
||||
task->anchorShardId = shardId;
|
||||
task->jobId = jobId;
|
||||
task->relationShardList = relationShardList;
|
||||
|
@ -2449,10 +2712,15 @@ PlanRouterQuery(Query *originalQuery,
|
|||
|
||||
/*
|
||||
* If this is an UPDATE or DELETE query which requires coordinator evaluation,
|
||||
* don't try update shard names, and postpone that to execution phase.
|
||||
* don't try update shard names, and postpone that to execution phase. Also, if
|
||||
* this is a delayed fast path query, we don't update the shard names
|
||||
* either, as the shard names will be updated in the fast path query planner.
|
||||
*/
|
||||
bool isUpdateOrDelete = UpdateOrDeleteOrMergeQuery(originalQuery);
|
||||
if (!(isUpdateOrDelete && RequiresCoordinatorEvaluation(originalQuery)))
|
||||
bool delayedFastPath =
|
||||
plannerRestrictionContext->fastPathRestrictionContext->delayFastPathPlanning;
|
||||
if (!(isUpdateOrDelete && RequiresCoordinatorEvaluation(originalQuery)) &&
|
||||
!delayedFastPath)
|
||||
{
|
||||
UpdateRelationToShardNames((Node *) originalQuery, *relationShardList);
|
||||
}
|
||||
|
|
|
@@ -212,6 +212,7 @@ static const char * MaxSharedPoolSizeGucShowHook(void);
static const char * LocalPoolSizeGucShowHook(void);
static bool StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource
                                             source);
static bool WarnIfLocalExecutionDisabled(bool *newval, void **extra, GucSource source);
static void CitusAuthHook(Port *port, int status);
static bool IsSuperuser(char *userName);
static void AdjustDynamicLibraryPathForCdcDecoders(void);
@@ -1377,6 +1378,17 @@ RegisterCitusConfigVariables(void)
        GUC_STANDARD,
        NULL, NULL, NULL);

    DefineCustomBoolVariable(
        "citus.enable_local_execution_local_plan",
        gettext_noop("Enables the planner to avoid a query deparse and planning if "
                     "the shard is local to the current node."),
        NULL,
        &EnableFastPathLocalExecutor,
        true,
        PGC_USERSET,
        GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE,
        WarnIfLocalExecutionDisabled, NULL, NULL);

    DefineCustomBoolVariable(
        "citus.enable_local_reference_table_foreign_keys",
        gettext_noop("Enables foreign keys from/to local tables"),
@@ -2802,6 +2814,21 @@ WarnIfDeprecatedExecutorUsed(int *newval, void **extra, GucSource source)
}


static bool
WarnIfLocalExecutionDisabled(bool *newval, void **extra, GucSource source)
{
    if (*newval == true && EnableLocalExecution == false)
    {
        ereport(WARNING, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                          errmsg(
                              "citus.enable_local_execution must be set in order for "
                              "citus.enable_local_execution_local_plan to be effective.")));
    }

    return true;
}


/*
 * NoticeIfSubqueryPushdownEnabled prints a notice when a user sets
 * citus.subquery_pushdown to ON. It doesn't print the notice if the
@@ -287,6 +287,15 @@ CopyTaskQuery(Task *newnode, Task *from)
            break;
        }

        case TASK_QUERY_LOCAL_PLAN:
        {
            newnode->taskQuery.data.localCompiled =
                (LocalCompilation *) palloc0(sizeof(LocalCompilation));
            COPY_NODE_FIELD(taskQuery.data.localCompiled->plan);
            COPY_NODE_FIELD(taskQuery.data.localCompiled->query);
            break;
        }

        default:
        {
            break;
@@ -27,7 +27,9 @@ extern bool UpdateRelationToShardNames(Node *node, List *relationShardList);
extern void SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query);
extern void SetTaskQueryString(Task *task, char *queryString);
extern void SetTaskQueryStringList(Task *task, List *queryStringList);
extern void SetTaskQueryPlan(Task *task, Query *query, PlannedStmt *localPlan);
extern char * TaskQueryString(Task *task);
extern PlannedStmt * TaskQueryLocalPlan(Task *task);
extern char * TaskQueryStringAtIndex(Task *task, int index);
extern int GetTaskQueryType(Task *task);
extern void AddInsertAliasIfNeeded(Query *query);
@@ -99,6 +99,12 @@ typedef struct FastPathRestrictionContext
     * Set to true when distKey = Param; in the queryTree
     */
    bool distributionKeyHasParam;

    /*
     * Indicates to hold off calling the fast path planner until its
     * known if the shard is local or not.
     */
    bool delayFastPathPlanning;
} FastPathRestrictionContext;

typedef struct PlannerRestrictionContext
@@ -174,9 +174,16 @@ typedef enum TaskQueryType
    TASK_QUERY_NULL,
    TASK_QUERY_TEXT,
    TASK_QUERY_OBJECT,
    TASK_QUERY_TEXT_LIST
    TASK_QUERY_TEXT_LIST,
    TASK_QUERY_LOCAL_PLAN,
} TaskQueryType;

typedef struct LocalCompilation
{
    PlannedStmt *plan; /* the local plan for this task */
    Query *query; /* query to deparse for EXPLAIN ANALYZE or local command logging */
} LocalCompilation;

typedef struct TaskQuery
{
    TaskQueryType queryType;

@@ -219,6 +226,15 @@ typedef struct TaskQuery
         * when we want to access each query string.
         */
        List *queryStringList;

        /*
         * For tasks that can be executed locally, this field contains the
         * local plan for the task. This is only set when the shard that the
         * task is assigned to is local to the node that executes the task.
         * The query field is used to deparse the query for EXPLAIN ANALYZE
         * or local command logging.
         */
        LocalCompilation *localCompiled; /* only applies to local tasks */
    } data;
} TaskQuery;
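Because localCompiled shares the union with the query string representations, it must only be read when queryType is TASK_QUERY_LOCAL_PLAN, which is exactly what SetTaskQueryPlan() and TaskQueryLocalPlan() earlier in this diff enforce. A small usage sketch (the helper name is illustrative, not part of the patch):

    /* Returns the locally compiled plan if the task carries one, else NULL. */
    static PlannedStmt *
    LocalPlanIfAny(Task *task)
    {
        if (task->taskQuery.queryType != TASK_QUERY_LOCAL_PLAN)
        {
            /* the union currently holds a text / object / text-list representation */
            return NULL;
        }

        return task->taskQuery.data.localCompiled->plan;
    }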
@@ -28,6 +28,7 @@
extern bool EnableRouterExecution;
extern bool EnableFastPathRouterPlanner;
extern bool EnableFastPathLocalExecutor;

extern bool EnableNonColocatedRouterQueryPushdown;

@@ -91,16 +92,19 @@ extern void GenerateSingleShardRouterTaskList(Job *job,
                                              List *relationShardList,
                                              List *placementList,
                                              uint64 shardId,
                                              bool isLocalTableModification);
                                              bool isLocalTableModification,
                                              bool delayedFastPath);

/*
 * FastPathPlanner is a subset of router planner, that's why we prefer to
 * keep the external function here.
 */extern PlannedStmt * GeneratePlaceHolderPlannedStmt(Query *parse);

extern void FastPathPreprocessParseTree(Query *parse);
extern PlannedStmt * FastPathPlanner(Query *originalQuery, Query *parse, ParamListInfo
                                     boundParams);
extern bool FastPathRouterQuery(Query *query, Node **distributionKeyValue);
extern bool FastPathRouterQuery(Query *query,
                                FastPathRestrictionContext *fastPathContext);
extern bool JoinConditionIsOnFalse(List *relOptInfo);
extern Oid ResultRelationOidForQuery(Query *query);
extern DeferredErrorMessage * TargetlistAndFunctionsSupported(Oid resultRelationId,

@@ -120,5 +124,7 @@ extern Job * RouterJob(Query *originalQuery,
                       DeferredErrorMessage **planningError);
extern bool ContainsOnlyLocalOrReferenceTables(RTEListProperties *rteProperties);
extern RangeTblEntry * ExtractSourceResultRangeTableEntry(Query *query);
extern void CheckAndBuildDelayedFastPathPlan(DistributedPlanningContext *planContext,
                                             DistributedPlan *plan);

#endif /* MULTI_ROUTER_PLANNER_H */
@ -0,0 +1,236 @@
|
|||
-- Test local execution with local plan in a sharded environment.
|
||||
-- This is an enhancement to local execution where instead of deparsing
|
||||
-- and compiling the shard query, the planner replaces the OID of the
|
||||
-- distributed table with the OID of the local shard in the parse tree
|
||||
-- and plans that.
|
||||
--
|
||||
-- https://github.com/citusdata/citus/pull/8035
|
||||
CREATE SCHEMA local_shard_execution_local_plan;
|
||||
SET search_path TO local_shard_execution_local_plan;
|
||||
SET citus.next_shard_id TO 86000000;
|
||||
-- Test row-based sharding
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
CREATE TABLE test_tbl (a int, b int, data_f double precision);
|
||||
SELECT create_distributed_table('test_tbl', 'a');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT setseed(0.42); -- make the random data inserted deterministic
|
||||
setseed
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO test_tbl
|
||||
SELECT (random()*20)::int AS a,
|
||||
(random()*20)::int AS b,
|
||||
random()*10000.0 AS data_f
|
||||
FROM generate_series(1, 10000);
|
||||
-- Put the shard on worker 1 to ensure consistent test output across different schedules
|
||||
SET client_min_messages to ERROR; -- suppress warning if shard is already on worker 1
|
||||
SELECT citus_move_shard_placement(86000000, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes');
|
||||
citus_move_shard_placement
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT public.wait_for_resource_cleanup(); -- otherwise fails flakiness tests
|
||||
wait_for_resource_cleanup
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
RESET client_min_messages;
|
||||
\c - - - :worker_1_port
|
||||
SET search_path TO local_shard_execution_local_plan;
|
||||
SET client_min_messages TO DEBUG2;
|
||||
SET citus.log_remote_commands TO ON;
|
||||
SET citus.log_local_commands TO ON;
|
||||
-- This query resolves to a single shard (aka fast path)
|
||||
-- which is located on worker_1; with client_min_messages
|
||||
-- at DEBUG2 we see a message that the planner is avoiding
|
||||
-- query deparse and plan
|
||||
SELECT b, AVG(data_f), MIN(data_f), MAX(data_f), COUNT(1)
|
||||
FROM test_tbl
|
||||
WHERE a = 8 AND b IN (1,3,5,8,13,21)
|
||||
GROUP BY b
|
||||
ORDER BY b;
|
||||
DEBUG: Distributed planning for a fast-path router query
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Fast-path router query: created local execution plan to avoid deparse to and compile of shard query
|
||||
DEBUG: query has a single distribution column value: 8
|
||||
NOTICE: executing the command locally: SELECT b, avg(data_f) AS avg, min(data_f) AS min, max(data_f) AS max, count(1) AS count FROM local_shard_execution_local_plan.test_tbl_86000000 test_tbl WHERE ((a OPERATOR(pg_catalog.=) 8) AND (b OPERATOR(pg_catalog.=) ANY (ARRAY[1, 3, 5, 8, 13, 21]))) GROUP BY b ORDER BY b
|
||||
DEBUG: Local executor: Using task's cached local plan for task 0
|
||||
b | avg | min | max | count
|
||||
---------------------------------------------------------------------
|
||||
1 | 4930.97455169836 | 130.09338419238 | 9961.37766951669 | 21
|
||||
3 | 5587.38637430731 | 1230.07668620184 | 9937.96225230491 | 23
|
||||
5 | 3987.47953221387 | 437.362539823312 | 9729.29912509372 | 25
|
||||
8 | 5028.45408903437 | 593.546207093687 | 9869.93823005882 | 27
|
||||
13 | 3900.7835426648 | 510.078935445757 | 7784.07104505068 | 18
|
||||
(5 rows)
|
||||
|
||||
SET citus.enable_local_execution_local_plan TO OFF;
|
||||
-- With local execution local plan disabled, the same query
|
||||
-- does query deparse and planning of the shard query and
|
||||
-- provides the same results
|
||||
SELECT b, AVG(data_f), MIN(data_f), MAX(data_f), COUNT(1)
|
||||
FROM test_tbl
|
||||
WHERE a = 8 AND b IN (1,3,5,8,13,21)
|
||||
GROUP BY b
|
||||
ORDER BY b;
|
||||
DEBUG: Distributed planning for a fast-path router query
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: query has a single distribution column value: 8
|
||||
NOTICE: executing the command locally: SELECT b, avg(data_f) AS avg, min(data_f) AS min, max(data_f) AS max, count(1) AS count FROM local_shard_execution_local_plan.test_tbl_86000000 test_tbl WHERE ((a OPERATOR(pg_catalog.=) 8) AND (b OPERATOR(pg_catalog.=) ANY (ARRAY[1, 3, 5, 8, 13, 21]))) GROUP BY b ORDER BY b
|
||||
b | avg | min | max | count
|
||||
---------------------------------------------------------------------
|
||||
1 | 4930.97455169836 | 130.09338419238 | 9961.37766951669 | 21
|
||||
3 | 5587.38637430731 | 1230.07668620184 | 9937.96225230491 | 23
|
||||
5 | 3987.47953221387 | 437.362539823312 | 9729.29912509372 | 25
|
||||
8 | 5028.45408903437 | 593.546207093687 | 9869.93823005882 | 27
|
||||
13 | 3900.7835426648 | 510.078935445757 | 7784.07104505068 | 18
|
||||
(5 rows)
|
||||
|
||||
\c - - - :worker_2_port
|
||||
SET search_path TO local_shard_execution_local_plan;
|
||||
SET client_min_messages TO DEBUG2;
|
||||
SET citus.log_remote_commands TO ON;
|
||||
SET citus.log_local_commands TO ON;
|
||||
-- Run the same query on the other worker - the local
|
||||
-- execution path is not taken because the shard is not
|
||||
-- local to this worker
|
||||
SELECT b, AVG(data_f), MIN(data_f), MAX(data_f), COUNT(1)
|
||||
FROM test_tbl
|
||||
WHERE a = 8 AND b IN (1,3,5,8,13,21)
|
||||
GROUP BY b
|
||||
ORDER BY b;
|
||||
DEBUG: Distributed planning for a fast-path router query
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: query has a single distribution column value: 8
|
||||
NOTICE: issuing SELECT b, avg(data_f) AS avg, min(data_f) AS min, max(data_f) AS max, count(1) AS count FROM local_shard_execution_local_plan.test_tbl_86000000 test_tbl WHERE ((a OPERATOR(pg_catalog.=) 8) AND (b OPERATOR(pg_catalog.=) ANY (ARRAY[1, 3, 5, 8, 13, 21]))) GROUP BY b ORDER BY b
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
b | avg | min | max | count
|
||||
---------------------------------------------------------------------
|
||||
1 | 4930.97455169836 | 130.09338419238 | 9961.37766951669 | 21
|
||||
3 | 5587.38637430731 | 1230.07668620184 | 9937.96225230491 | 23
|
||||
5 | 3987.47953221387 | 437.362539823312 | 9729.29912509372 | 25
|
||||
8 | 5028.45408903437 | 593.546207093687 | 9869.93823005882 | 27
|
||||
13 | 3900.7835426648 | 510.078935445757 | 7784.07104505068 | 18
|
||||
(5 rows)
|
||||
|
||||
\c - - - :master_port
|
||||
DROP SCHEMA local_shard_execution_local_plan CASCADE;
|
||||
NOTICE: drop cascades to table local_shard_execution_local_plan.test_tbl
|
||||
-- Now test local execution with local plan for a schema sharded table.
|
||||
SET citus.enable_schema_based_sharding to on;
|
||||
CREATE SCHEMA schema_sharding_test;
|
||||
SET search_path TO schema_sharding_test;
|
||||
SET citus.next_shard_id TO 87000000;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
CREATE TABLE test_tbl (a int, b int, data_f double precision);
|
||||
SELECT setseed(0.42); -- make the random data inserted deterministic
|
||||
setseed
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO test_tbl
|
||||
SELECT (random()*20)::int AS a,
|
||||
(random()*20)::int AS b,
|
||||
random()*10000.0 AS data_f
|
||||
FROM generate_series(1, 10000);
|
||||
-- Put the shard on worker 2 to ensure consistent test output across different schedules
|
||||
SET client_min_messages to ERROR; -- suppress warning if shard is already on worker 2
|
||||
SELECT citus_move_shard_placement(87000000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'block_writes');
|
||||
citus_move_shard_placement
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT public.wait_for_resource_cleanup(); -- otherwise fails flakiness tests
|
||||
wait_for_resource_cleanup
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
RESET client_min_messages;
|
||||
\c - - - :worker_1_port
|
||||
SET client_min_messages TO DEBUG2;
|
||||
SET citus.log_remote_commands TO ON;
|
||||
SET citus.log_local_commands TO ON;
|
||||
-- Run the test query on worker_1; with schema based sharding
|
||||
-- the data is not local to this worker so local execution
|
||||
-- path is not taken.
|
||||
SELECT b, AVG(data_f), MIN(data_f), MAX(data_f), COUNT(1)
|
||||
FROM schema_sharding_test.test_tbl
|
||||
WHERE a = 8 AND b IN (1,3,5,8,13,21)
|
||||
GROUP BY b
|
||||
ORDER BY b;
|
||||
DEBUG: Distributed planning for a fast-path router query
|
||||
DEBUG: Creating router plan
|
||||
NOTICE: issuing SELECT b, avg(data_f) AS avg, min(data_f) AS min, max(data_f) AS max, count(1) AS count FROM schema_sharding_test.test_tbl_87000000 test_tbl WHERE ((a OPERATOR(pg_catalog.=) 8) AND (b OPERATOR(pg_catalog.=) ANY (ARRAY[1, 3, 5, 8, 13, 21]))) GROUP BY b ORDER BY b
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
b | avg | min | max | count
|
||||
---------------------------------------------------------------------
|
||||
1 | 4930.97455169836 | 130.09338419238 | 9961.37766951669 | 21
|
||||
3 | 5587.38637430731 | 1230.07668620184 | 9937.96225230491 | 23
|
||||
5 | 3987.47953221387 | 437.362539823312 | 9729.29912509372 | 25
|
||||
8 | 5028.45408903437 | 593.546207093687 | 9869.93823005882 | 27
|
||||
13 | 3900.7835426648 | 510.078935445757 | 7784.07104505068 | 18
|
||||
(5 rows)
|
||||
|
||||
\c - - - :worker_2_port
|
||||
SET client_min_messages TO DEBUG2;
|
||||
SET citus.log_remote_commands TO ON;
|
||||
SET citus.log_local_commands TO ON;
|
||||
-- Run the test query on worker_2; with schema based sharding
|
||||
-- the data is local to this worker so local execution
|
||||
-- path is taken, and the planner avoids query deparse and
|
||||
-- planning of the shard query.
|
||||
SELECT b, AVG(data_f), MIN(data_f), MAX(data_f), COUNT(1)
|
||||
FROM schema_sharding_test.test_tbl
|
||||
WHERE a = 8 AND b IN (1,3,5,8,13,21)
|
||||
GROUP BY b
|
||||
ORDER BY b;
|
||||
DEBUG: Distributed planning for a fast-path router query
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Fast-path router query: created local execution plan to avoid deparse to and compile of shard query
|
||||
NOTICE: executing the command locally: SELECT b, avg(data_f) AS avg, min(data_f) AS min, max(data_f) AS max, count(1) AS count FROM schema_sharding_test.test_tbl_87000000 test_tbl WHERE ((a OPERATOR(pg_catalog.=) 8) AND (b OPERATOR(pg_catalog.=) ANY (ARRAY[1, 3, 5, 8, 13, 21]))) GROUP BY b ORDER BY b
|
||||
DEBUG: Local executor: Using task's cached local plan for task 0
|
||||
b | avg | min | max | count
|
||||
---------------------------------------------------------------------
|
||||
1 | 4930.97455169836 | 130.09338419238 | 9961.37766951669 | 21
|
||||
3 | 5587.38637430731 | 1230.07668620184 | 9937.96225230491 | 23
|
||||
5 | 3987.47953221387 | 437.362539823312 | 9729.29912509372 | 25
|
||||
8 | 5028.45408903437 | 593.546207093687 | 9869.93823005882 | 27
|
||||
13 | 3900.7835426648 | 510.078935445757 | 7784.07104505068 | 18
|
||||
(5 rows)
|
||||
|
||||
SET citus.enable_local_execution_local_plan TO OFF;
|
||||
-- Run the test query on worker_2 but with local execution
|
||||
-- local plan disabled; now the planner does query deparse
|
||||
-- and planning of the shard query.
|
||||
SELECT b, AVG(data_f), MIN(data_f), MAX(data_f), COUNT(1)
|
||||
FROM schema_sharding_test.test_tbl
|
||||
WHERE a = 8 AND b IN (1,3,5,8,13,21)
|
||||
GROUP BY b
|
||||
ORDER BY b;
|
||||
DEBUG: Distributed planning for a fast-path router query
|
||||
DEBUG: Creating router plan
|
||||
NOTICE: executing the command locally: SELECT b, avg(data_f) AS avg, min(data_f) AS min, max(data_f) AS max, count(1) AS count FROM schema_sharding_test.test_tbl_87000000 test_tbl WHERE ((a OPERATOR(pg_catalog.=) 8) AND (b OPERATOR(pg_catalog.=) ANY (ARRAY[1, 3, 5, 8, 13, 21]))) GROUP BY b ORDER BY b
|
||||
b | avg | min | max | count
|
||||
---------------------------------------------------------------------
|
||||
1 | 4930.97455169836 | 130.09338419238 | 9961.37766951669 | 21
|
||||
3 | 5587.38637430731 | 1230.07668620184 | 9937.96225230491 | 23
|
||||
5 | 3987.47953221387 | 437.362539823312 | 9729.29912509372 | 25
|
||||
8 | 5028.45408903437 | 593.546207093687 | 9869.93823005882 | 27
|
||||
13 | 3900.7835426648 | 510.078935445757 | 7784.07104505068 | 18
|
||||
(5 rows)
|
||||
|
||||
\c - - - :master_port
|
||||
DROP SCHEMA schema_sharding_test CASCADE;
|
||||
NOTICE: drop cascades to table schema_sharding_test.test_tbl
|
||||
RESET ALL;
|
|
@ -2410,8 +2410,10 @@ NOTICE: executing the command locally: UPDATE local_shard_execution.event_respo
SELECT count(*) FROM event_responses WHERE event_id = 16;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Fast-path router query: created local execution plan to avoid deparse to and compile of shard query
DEBUG: query has a single distribution column value: 16
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution.event_responses_1480001 event_responses WHERE (event_id OPERATOR(pg_catalog.=) 16)
DEBUG: Local executor: Using task's cached local plan for task 0
 count
---------------------------------------------------------------------
     1

@ -2420,8 +2422,10 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar
SELECT count(*) FROM event_responses WHERE event_id = 16;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Fast-path router query: created local execution plan to avoid deparse to and compile of shard query
DEBUG: query has a single distribution column value: 16
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution.event_responses_1480001 event_responses WHERE (event_id OPERATOR(pg_catalog.=) 16)
DEBUG: Local executor: Using task's cached local plan for task 0
 count
---------------------------------------------------------------------
     1

@ -2430,8 +2434,10 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar
UPDATE event_responses SET response = 'no' WHERE event_id = 16;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Fast-path router query: created local execution plan to avoid deparse to and compile of shard query
DEBUG: query has a single distribution column value: 16
NOTICE: executing the command locally: UPDATE local_shard_execution.event_responses_1480001 event_responses SET response = 'no'::local_shard_execution.invite_resp WHERE (event_id OPERATOR(pg_catalog.=) 16)
DEBUG: Local executor: Using task's cached local plan for task 0
INSERT INTO event_responses VALUES (16, 666, 'maybe')
ON CONFLICT (event_id, user_id)
DO UPDATE SET response = EXCLUDED.response RETURNING *;

@ -2529,8 +2535,10 @@ SET citus.log_remote_commands TO ON;
SELECT * FROM event_responses_no_pkey WHERE event_id = 2;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Fast-path router query: created local execution plan to avoid deparse to and compile of shard query
DEBUG: query has a single distribution column value: 2
NOTICE: executing the command locally: SELECT event_id, user_id, response FROM local_shard_execution.event_responses_no_pkey_1480007 event_responses_no_pkey WHERE (event_id OPERATOR(pg_catalog.=) 2)
DEBUG: Local executor: Using task's cached local plan for task 0
 event_id | user_id | response
---------------------------------------------------------------------
(0 rows)

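The expected output above can also be reproduced interactively. The following is an illustrative sketch only, not part of the commit; it assumes a psql session on the worker that already holds the `event_responses_1480001` shard, and reuses the table name, GUCs, and DEBUG/NOTICE messages shown in the test output above.

-- Raise the log level so planner/executor decisions become visible.
SET client_min_messages TO DEBUG2;
SET citus.log_local_commands TO ON;

-- A fast-path query on a single distribution column value: planning should emit
--   DEBUG: Fast-path router query: created local execution plan to avoid deparse to and compile of shard query
-- and execution should emit
--   DEBUG: Local executor: Using task's cached local plan for task 0
SELECT count(*) FROM event_responses WHERE event_id = 16;

-- The previous behaviour (deparse and plan of the shard query) can be restored
-- with the GUC introduced by this change:
SET citus.enable_local_execution_local_plan TO OFF;
SELECT count(*) FROM event_responses WHERE event_id = 16;
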
@ -193,8 +193,102 @@ execute p4(8);
NOTICE: executing the command locally: UPDATE local_shard_execution_dropped_column.t1_2460000 t1 SET a = (a OPERATOR(pg_catalog.+) 1) WHERE (c OPERATOR(pg_catalog.=) 8)
execute p4(8);
NOTICE: executing the command locally: UPDATE local_shard_execution_dropped_column.t1_2460000 t1 SET a = (a OPERATOR(pg_catalog.+) 1) WHERE (c OPERATOR(pg_catalog.=) 8)
-- Test that "Avoid deparse and planning of shard query for local execution" (*)
-- does not take the fast path of modifying the parse tree with the shard OID, as
-- the dropped column means the attribute check between the distributed table and
-- shard fails. With client_min_messages at DEBUG2, we see "cannot modify parse tree
-- for local execution", indicating that router planning has detected the difference.
--
-- (*) https://github.com/citusdata/citus/pull/8035
SET client_min_messages to DEBUG2;
prepare p5(int) as SELECT count(*) FROM t1 WHERE c = 8 and a = $1 GROUP BY c;
execute p5(5);
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1"
DEBUG: query has a single distribution column value: 8
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c
 count
---------------------------------------------------------------------
(0 rows)

execute p5(5);
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1"
DEBUG: query has a single distribution column value: 8
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c
 count
---------------------------------------------------------------------
(0 rows)

execute p5(5);
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1"
DEBUG: query has a single distribution column value: 8
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c
 count
---------------------------------------------------------------------
(0 rows)

execute p5(5);
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1"
DEBUG: query has a single distribution column value: 8
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c
 count
---------------------------------------------------------------------
(0 rows)

execute p5(5);
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1"
DEBUG: query has a single distribution column value: 8
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c
 count
---------------------------------------------------------------------
(0 rows)

execute p5(5);
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1"
DEBUG: query has a single distribution column value: 8
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c
 count
---------------------------------------------------------------------
(0 rows)

execute p5(5);
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c
 count
---------------------------------------------------------------------
(0 rows)

execute p5(5);
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c
 count
---------------------------------------------------------------------
(0 rows)

execute p5(5);
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c
 count
---------------------------------------------------------------------
(0 rows)

execute p5(5);
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c
 count
---------------------------------------------------------------------
(0 rows)

RESET client_min_messages;
\c - - - :master_port
-- one another combination is that the shell table
-- one other combination is that the shell table
-- has a dropped column but not the shard, via rebalance operation
SET search_path TO local_shard_execution_dropped_column;
ALTER TABLE t1 DROP COLUMN a;

@ -204,6 +298,12 @@ SELECT citus_move_shard_placement(2460000, 'localhost', :worker_1_port, 'localho

(1 row)

SELECT public.wait_for_resource_cleanup(); -- otherwise fails flakiness tests
 wait_for_resource_cleanup
---------------------------------------------------------------------

(1 row)

\c - - - :worker_2_port
SET search_path TO local_shard_execution_dropped_column;
-- show the dropped columns

@ -331,6 +431,108 @@ execute p3(8);
NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8) ON CONFLICT DO NOTHING
execute p3(8);
NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8) ON CONFLICT DO NOTHING
-- Test that "Avoid deparse and planning of shard query for local execution"
-- does not take the fast path of modifying the parse tree with the shard OID
-- for this scenario (rebalance) also.
--
-- (*) https://github.com/citusdata/citus/pull/8035
SET client_min_messages to DEBUG2;
prepare p4(int) as SELECT count(*) FROM t1 WHERE c = 8 and 5 = $1 GROUP BY c;
execute p4(5);
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1"
DEBUG: query has a single distribution column value: 8
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c
 count
---------------------------------------------------------------------
     1
(1 row)

execute p4(5);
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1"
DEBUG: query has a single distribution column value: 8
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c
 count
---------------------------------------------------------------------
     1
(1 row)

execute p4(5);
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1"
DEBUG: query has a single distribution column value: 8
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c
 count
---------------------------------------------------------------------
     1
(1 row)

execute p4(5);
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1"
DEBUG: query has a single distribution column value: 8
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c
 count
---------------------------------------------------------------------
     1
(1 row)

execute p4(5);
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1"
DEBUG: query has a single distribution column value: 8
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c
 count
---------------------------------------------------------------------
     1
(1 row)

execute p4(5);
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1"
DEBUG: query has a single distribution column value: 8
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c
 count
---------------------------------------------------------------------
     1
(1 row)

execute p4(5);
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c
 count
---------------------------------------------------------------------
     1
(1 row)

execute p4(5);
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c
 count
---------------------------------------------------------------------
     1
(1 row)

execute p4(5);
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c
 count
---------------------------------------------------------------------
     1
(1 row)

execute p4(5);
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c
 count
---------------------------------------------------------------------
     1
(1 row)

RESET client_min_messages;
\c - - - :master_port
DROP SCHEMA local_shard_execution_dropped_column CASCADE;
NOTICE: drop cascades to table local_shard_execution_dropped_column.t1

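For context, the rebalance scenario above hinges on the shard and the shell (distributed) table ending up with different attribute lists. The following is an illustrative summary of that sequence, not an excerpt from the commit; it reuses the table, shard id, and DEBUG message shown in the expected output above.

-- 1. Drop a column on the distributed table; the shell table now carries a
--    dropped-column "hole" in its attribute list.
ALTER TABLE t1 DROP COLUMN a;

-- 2. Move the shard to the other worker; the new placement is created from
--    the current column list, i.e. without the hole.
SELECT citus_move_shard_placement(2460000, 'localhost', :worker_1_port,
                                  'localhost', :worker_2_port, 'block_writes');

-- 3. On the target worker, a fast-path query can no longer have the shard OID
--    swapped into its parse tree; router planning reports
--      DEBUG: Router planner fast path cannot modify parse tree for local execution: ...
--    and falls back to deparsing and planning the shard query.
SET client_min_messages TO DEBUG2;
SELECT count(*) FROM t1 WHERE c = 8 GROUP BY c;
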
@ -70,6 +70,7 @@ test: metadata_sync_helpers

test: issue_6592
test: executor_local_failure
test: local_execution_local_plan

# test that no tests leaked intermediate results. This should always be last
test: ensure_no_intermediate_data_leak

@ -0,0 +1,149 @@
-- Test local execution with local plan in a sharded environment.
-- This is an enhancement to local execution where instead of deparsing
-- and compiling the shard query, the planner replaces the OID of the
-- distributed table with the OID of the local shard in the parse tree
-- and plans that.
--
-- https://github.com/citusdata/citus/pull/8035

CREATE SCHEMA local_shard_execution_local_plan;
SET search_path TO local_shard_execution_local_plan;

SET citus.next_shard_id TO 86000000;

-- Test row-based sharding

SET citus.shard_replication_factor TO 1;
CREATE TABLE test_tbl (a int, b int, data_f double precision);
SELECT create_distributed_table('test_tbl', 'a');
SELECT setseed(0.42); -- make the random data inserted deterministic
INSERT INTO test_tbl
SELECT (random()*20)::int AS a,
       (random()*20)::int AS b,
       random()*10000.0 AS data_f
FROM generate_series(1, 10000);

-- Put the shard on worker 1 to ensure consistent test output across different schedules
SET client_min_messages to ERROR; -- suppress warning if shard is already on worker 1
SELECT citus_move_shard_placement(86000000, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes');
SELECT public.wait_for_resource_cleanup(); -- otherwise fails flakiness tests
RESET client_min_messages;

\c - - - :worker_1_port
SET search_path TO local_shard_execution_local_plan;

SET client_min_messages TO DEBUG2;
SET citus.log_remote_commands TO ON;
SET citus.log_local_commands TO ON;

-- This query resolves to a single shard (aka fast path)
-- which is located on worker_1; with client_min_messages
-- at DEBUG2 we see a message that the planner is avoiding
-- query deparse and plan
SELECT b, AVG(data_f), MIN(data_f), MAX(data_f), COUNT(1)
FROM test_tbl
WHERE a = 8 AND b IN (1,3,5,8,13,21)
GROUP BY b
ORDER BY b;

SET citus.enable_local_execution_local_plan TO OFF;

-- With local execution local plan disabled, the same query
-- does query deparse and planning of the shard query and
-- provides the same results
SELECT b, AVG(data_f), MIN(data_f), MAX(data_f), COUNT(1)
FROM test_tbl
WHERE a = 8 AND b IN (1,3,5,8,13,21)
GROUP BY b
ORDER BY b;

\c - - - :worker_2_port
SET search_path TO local_shard_execution_local_plan;

SET client_min_messages TO DEBUG2;
SET citus.log_remote_commands TO ON;
SET citus.log_local_commands TO ON;

-- Run the same query on the other worker - the local
-- execution path is not taken because the shard is not
-- local to this worker
SELECT b, AVG(data_f), MIN(data_f), MAX(data_f), COUNT(1)
FROM test_tbl
WHERE a = 8 AND b IN (1,3,5,8,13,21)
GROUP BY b
ORDER BY b;

\c - - - :master_port

DROP SCHEMA local_shard_execution_local_plan CASCADE;

-- Now test local execution with local plan for a schema sharded table.

SET citus.enable_schema_based_sharding to on;
CREATE SCHEMA schema_sharding_test;
SET search_path TO schema_sharding_test;

SET citus.next_shard_id TO 87000000;

SET citus.shard_replication_factor TO 1;
CREATE TABLE test_tbl (a int, b int, data_f double precision);
SELECT setseed(0.42); -- make the random data inserted deterministic
INSERT INTO test_tbl
SELECT (random()*20)::int AS a,
       (random()*20)::int AS b,
       random()*10000.0 AS data_f
FROM generate_series(1, 10000);

-- Put the shard on worker 2 to ensure consistent test output across different schedules
SET client_min_messages to ERROR; -- suppress warning if shard is already on worker 2
SELECT citus_move_shard_placement(87000000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'block_writes');
SELECT public.wait_for_resource_cleanup(); -- otherwise fails flakiness tests
RESET client_min_messages;

\c - - - :worker_1_port

SET client_min_messages TO DEBUG2;
SET citus.log_remote_commands TO ON;
SET citus.log_local_commands TO ON;

-- Run the test query on worker_1; with schema based sharding
-- the data is not local to this worker so local execution
-- path is not taken.
SELECT b, AVG(data_f), MIN(data_f), MAX(data_f), COUNT(1)
FROM schema_sharding_test.test_tbl
WHERE a = 8 AND b IN (1,3,5,8,13,21)
GROUP BY b
ORDER BY b;

\c - - - :worker_2_port

SET client_min_messages TO DEBUG2;
SET citus.log_remote_commands TO ON;
SET citus.log_local_commands TO ON;

-- Run the test query on worker_2; with schema based sharding
-- the data is local to this worker so local execution
-- path is taken, and the planner avoids query deparse and
-- planning of the shard query.

SELECT b, AVG(data_f), MIN(data_f), MAX(data_f), COUNT(1)
FROM schema_sharding_test.test_tbl
WHERE a = 8 AND b IN (1,3,5,8,13,21)
GROUP BY b
ORDER BY b;

SET citus.enable_local_execution_local_plan TO OFF;

-- Run the test query on worker_2 but with local execution
-- local plan disabled; now the planner does query deparse
-- and planning of the shard query.
SELECT b, AVG(data_f), MIN(data_f), MAX(data_f), COUNT(1)
FROM schema_sharding_test.test_tbl
WHERE a = 8 AND b IN (1,3,5,8,13,21)
GROUP BY b
ORDER BY b;

\c - - - :master_port

DROP SCHEMA schema_sharding_test CASCADE;
RESET ALL;

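When adapting this test by hand, the shard placement decides which worker takes the local-execution path. A small illustrative lookup, assuming the standard citus_shards metadata view is available (the shard id comes from citus.next_shard_id in the test above):

-- Find which node currently hosts shard 86000000, i.e. where the
-- "created local execution plan ..." DEBUG message should appear.
SELECT shardid, nodename, nodeport
FROM citus_shards
WHERE shardid = 86000000;
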
@ -78,14 +78,37 @@ execute p4(8);
execute p4(8);
execute p4(8);

-- Test that "Avoid deparse and planning of shard query for local execution" (*)
-- does not take the fast path of modifying the parse tree with the shard OID, as
-- the dropped column means the attribute check between the distributed table and
-- shard fails. With client_min_messages at DEBUG2, we see "cannot modify parse tree
-- for local execution", indicating that router planning has detected the difference.
--
-- (*) https://github.com/citusdata/citus/pull/8035

SET client_min_messages to DEBUG2;
prepare p5(int) as SELECT count(*) FROM t1 WHERE c = 8 and a = $1 GROUP BY c;
execute p5(5);
execute p5(5);
execute p5(5);
execute p5(5);
execute p5(5);
execute p5(5);
execute p5(5);
execute p5(5);
execute p5(5);
execute p5(5);
RESET client_min_messages;

\c - - - :master_port

-- one another combination is that the shell table
-- one other combination is that the shell table
-- has a dropped column but not the shard, via rebalance operation
SET search_path TO local_shard_execution_dropped_column;
ALTER TABLE t1 DROP COLUMN a;

SELECT citus_move_shard_placement(2460000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'block_writes');
SELECT public.wait_for_resource_cleanup(); -- otherwise fails flakiness tests

\c - - - :worker_2_port
SET search_path TO local_shard_execution_dropped_column;

@ -132,5 +155,25 @@ execute p3(8);
execute p3(8);
execute p3(8);

-- Test that "Avoid deparse and planning of shard query for local execution"
-- does not take the fast path of modifying the parse tree with the shard OID
-- for this scenario (rebalance) also.
--
-- (*) https://github.com/citusdata/citus/pull/8035

SET client_min_messages to DEBUG2;
prepare p4(int) as SELECT count(*) FROM t1 WHERE c = 8 and 5 = $1 GROUP BY c;
execute p4(5);
execute p4(5);
execute p4(5);
execute p4(5);
execute p4(5);
execute p4(5);
execute p4(5);
execute p4(5);
execute p4(5);
execute p4(5);
RESET client_min_messages;

\c - - - :master_port
DROP SCHEMA local_shard_execution_dropped_column CASCADE;