mirror of https://github.com/citusdata/citus.git
Merge 3cbcf40cea into 5deaf9a616
commit 5ba9e99dc3
@@ -686,7 +686,8 @@ RegenerateTaskForFasthPathQuery(Job *workerJob)
                                       relationShardList,
                                       placementList,
                                       shardId,
-                                      isLocalTableModification);
+                                      isLocalTableModification,
+                                      false);
 }


@@ -313,6 +313,7 @@ ExecuteLocalTaskListExtended(List *taskList,
     {
         int taskNumParams = numParams;
         Oid *taskParameterTypes = parameterTypes;
+        int taskType = GetTaskQueryType(task);

         if (task->parametersInQueryStringResolved)
         {
@@ -330,7 +331,7 @@ ExecuteLocalTaskListExtended(List *taskList,
          * for concatenated strings, we set queryStringList so that we can access
          * each query string.
          */
-        if (GetTaskQueryType(task) == TASK_QUERY_TEXT_LIST)
+        if (taskType == TASK_QUERY_TEXT_LIST)
         {
             List *queryStringList = task->taskQuery.data.queryStringList;
             totalRowsProcessed +=
@@ -342,22 +343,31 @@ ExecuteLocalTaskListExtended(List *taskList,
             continue;
         }

-        Query *shardQuery = ParseQueryString(TaskQueryString(task),
-                                             taskParameterTypes,
-                                             taskNumParams);
-
-        int cursorOptions = CURSOR_OPT_PARALLEL_OK;
-
-        /*
-         * Altough the shardQuery is local to this node, we prefer planner()
-         * over standard_planner(). The primary reason for that is Citus itself
-         * is not very tolarent standard_planner() calls that doesn't go through
-         * distributed_planner() because of the way that restriction hooks are
-         * implemented. So, let planner to call distributed_planner() which
-         * eventually calls standard_planner().
-         */
-        localPlan = planner(shardQuery, NULL, cursorOptions, paramListInfo);
+        if (taskType != TASK_QUERY_LOCAL_PLAN)
+        {
+            Query *shardQuery = ParseQueryString(TaskQueryString(task),
+                                                 taskParameterTypes,
+                                                 taskNumParams);
+
+            int cursorOptions = CURSOR_OPT_PARALLEL_OK;
+
+            /*
+             * Altough the shardQuery is local to this node, we prefer planner()
+             * over standard_planner(). The primary reason for that is Citus itself
+             * is not very tolarent standard_planner() calls that doesn't go through
+             * distributed_planner() because of the way that restriction hooks are
+             * implemented. So, let planner to call distributed_planner() which
+             * eventually calls standard_planner().
+             */
+            localPlan = planner(shardQuery, NULL, cursorOptions, paramListInfo);
+        }
+        else
+        {
+            ereport(DEBUG2, (errmsg(
+                                 "Local executor: Using task's cached local plan for task %u",
+                                 task->taskId)));
+            localPlan = TaskQueryLocalPlan(task);
+        }
     }

     char *shardQueryString = NULL;

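
A minimal SQL sketch of how this new branch surfaces to a session, based on the regression test included at the end of this diff; it assumes the matching shard of test_tbl is local to the node:

    SET client_min_messages TO DEBUG2;
    SELECT count(*) FROM test_tbl WHERE a = 8;
    -- DEBUG:  Local executor: Using task's cached local plan for task 0
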
@@ -439,6 +439,27 @@ SetTaskQueryStringList(Task *task, List *queryStringList)
 }


+void
+SetTaskQueryPlan(Task *task, Query *query, PlannedStmt *localPlan)
+{
+    Assert(localPlan != NULL);
+    task->taskQuery.queryType = TASK_QUERY_LOCAL_PLAN;
+    task->taskQuery.data.localCompiled = (LocalCompilation *) palloc0(
+        sizeof(LocalCompilation));
+    task->taskQuery.data.localCompiled->query = query;
+    task->taskQuery.data.localCompiled->plan = localPlan;
+    task->queryCount = 1;
+}
+
+
+PlannedStmt *
+TaskQueryLocalPlan(Task *task)
+{
+    Assert(task->taskQuery.queryType == TASK_QUERY_LOCAL_PLAN);
+    return task->taskQuery.data.localCompiled->plan;
+}
+
+
 /*
  * DeparseTaskQuery is a general way of deparsing a query based on a task.
  */
@@ -524,6 +545,26 @@ TaskQueryString(Task *task)
     {
         return task->taskQuery.data.queryStringLazy;
     }
+    else if (taskQueryType == TASK_QUERY_LOCAL_PLAN)
+    {
+        Query *query = task->taskQuery.data.localCompiled->query;
+        Assert(query != NULL);
+
+        /*
+         * Use the query of the local compilation to generate the
+         * query string. For local compiled tasks, the query is retained
+         * for this purpose, which may be EXPLAIN ANALYZing the task, or
+         * command logging. Generating the query string on the fly is
+         * acceptable because the plan of the local compilation is used
+         * for query execution.
+         */
+        MemoryContext previousContext = MemoryContextSwitchTo(GetMemoryChunkContext(
+                                                                  query));
+        UpdateRelationToShardNames((Node *) query, task->relationShardList);
+        MemoryContextSwitchTo(previousContext);
+        return AnnotateQuery(DeparseTaskQuery(task, query),
+                             task->partitionKeyValue, task->colocationId);
+    }

     Query *jobQueryReferenceForLazyDeparsing =
         task->taskQuery.data.jobQueryReferenceForLazyDeparsing;
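
The deparse-on-demand path above only runs when the shard query text is actually needed, for example under citus.log_local_commands or for EXPLAIN ANALYZE; a hedged sketch, reusing the test table from the new regression test:

    SET citus.log_local_commands TO on;
    EXPLAIN (ANALYZE, COSTS OFF) SELECT count(*) FROM test_tbl WHERE a = 8;
    -- the shard query text shown in the log/EXPLAIN output is generated here from
    -- the retained Query, while execution still uses the cached local plan
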
@@ -135,13 +135,13 @@ static void AdjustReadIntermediateResultsCostInternal(RelOptInfo *relOptInfo,
                                                       Const *resultFormatConst);
 static List * OuterPlanParamsList(PlannerInfo *root);
 static List * CopyPlanParamList(List *originalPlanParamList);
-static PlannerRestrictionContext * CreateAndPushPlannerRestrictionContext(void);
+static PlannerRestrictionContext * CreateAndPushPlannerRestrictionContext(
+    FastPathRestrictionContext *fastPathContext);
 static PlannerRestrictionContext * CurrentPlannerRestrictionContext(void);
 static void PopPlannerRestrictionContext(void);
 static void ResetPlannerRestrictionContext(
     PlannerRestrictionContext *plannerRestrictionContext);
-static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *planContext,
-                                                 Node *distributionKeyValue);
+static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *planContext);
 static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext,
                                          int rteIdCounter);
 static RTEListProperties * GetRTEListProperties(List *rangeTableList);
@@ -166,7 +166,7 @@ distributed_planner(Query *parse,
 {
     bool needsDistributedPlanning = false;
     bool fastPathRouterQuery = false;
-    Node *distributionKeyValue = NULL;
+    FastPathRestrictionContext fastPathContext = { 0 };

     List *rangeTableList = ExtractRangeTableEntryList(parse);

@@ -191,8 +191,7 @@ distributed_planner(Query *parse,
                                          &maybeHasForeignDistributedTable);
     if (needsDistributedPlanning)
     {
-        fastPathRouterQuery = FastPathRouterQuery(parse, &distributionKeyValue);
+        fastPathRouterQuery = FastPathRouterQuery(parse, &fastPathContext);

         if (maybeHasForeignDistributedTable)
         {
             WarnIfListHasForeignDistributedTable(rangeTableList);
@@ -247,8 +246,9 @@ distributed_planner(Query *parse,
      */
     HideCitusDependentObjectsOnQueriesOfPgMetaTables((Node *) parse, NULL);

-    /* create a restriction context and put it at the end if context list */
-    planContext.plannerRestrictionContext = CreateAndPushPlannerRestrictionContext();
+    /* create a restriction context and put it at the end of context list */
+    planContext.plannerRestrictionContext = CreateAndPushPlannerRestrictionContext(
+        &fastPathContext);

     /*
      * We keep track of how many times we've recursed into the planner, primarily
@@ -264,7 +264,7 @@ distributed_planner(Query *parse,
     {
         if (fastPathRouterQuery)
         {
-            result = PlanFastPathDistributedStmt(&planContext, distributionKeyValue);
+            result = PlanFastPathDistributedStmt(&planContext);
         }
         else
         {
@@ -649,30 +649,21 @@ IsMultiTaskPlan(DistributedPlan *distributedPlan)
  * the FastPathPlanner.
  */
 static PlannedStmt *
-PlanFastPathDistributedStmt(DistributedPlanningContext *planContext,
-                            Node *distributionKeyValue)
+PlanFastPathDistributedStmt(DistributedPlanningContext *planContext)
 {
     FastPathRestrictionContext *fastPathContext =
         planContext->plannerRestrictionContext->fastPathRestrictionContext;
+    Assert(fastPathContext != NULL);
+    Assert(fastPathContext->fastPathRouterQuery);

-    planContext->plannerRestrictionContext->fastPathRestrictionContext->
-    fastPathRouterQuery = true;
+    FastPathPreprocessParseTree(planContext->query);

-    if (distributionKeyValue == NULL)
+    if (!fastPathContext->delayFastPathPlanning)
     {
-        /* nothing to record */
+        planContext->plan = FastPathPlanner(planContext->originalQuery,
+                                            planContext->query,
+                                            planContext->boundParams);
     }
-    else if (IsA(distributionKeyValue, Const))
-    {
-        fastPathContext->distributionKeyValue = (Const *) distributionKeyValue;
-    }
-    else if (IsA(distributionKeyValue, Param))
-    {
-        fastPathContext->distributionKeyHasParam = true;
-    }
-
-    planContext->plan = FastPathPlanner(planContext->originalQuery, planContext->query,
-                                        planContext->boundParams);

     return CreateDistributedPlannedStmt(planContext);
 }
@@ -803,6 +794,8 @@ CreateDistributedPlannedStmt(DistributedPlanningContext *planContext)
         RaiseDeferredError(distributedPlan->planningError, ERROR);
     }

+    CheckAndBuildDelayedFastPathPlan(planContext, distributedPlan);
+
     /* remember the plan's identifier for identifying subplans */
     distributedPlan->planId = planId;

@@ -2407,13 +2400,15 @@ CopyPlanParamList(List *originalPlanParamList)


 /*
- * CreateAndPushPlannerRestrictionContext creates a new relation restriction context
- * and a new join context, inserts it to the beginning of the
- * plannerRestrictionContextList. Finally, the planner restriction context is
- * inserted to the beginning of the plannerRestrictionContextList and it is returned.
+ * CreateAndPushPlannerRestrictionContext creates a new planner restriction
+ * context with an empty relation restriction context and an empty join and
+ * a copy of the given fast path restriction context (if present). Finally,
+ * the planner restriction context is inserted to the beginning of the
+ * global plannerRestrictionContextList and it is returned.
  */
 static PlannerRestrictionContext *
-CreateAndPushPlannerRestrictionContext(void)
+CreateAndPushPlannerRestrictionContext(
+    FastPathRestrictionContext *fastPathRestrictionContext)
 {
     PlannerRestrictionContext *plannerRestrictionContext =
         palloc0(sizeof(PlannerRestrictionContext));
@@ -2427,6 +2422,21 @@ CreateAndPushPlannerRestrictionContext(void)
     plannerRestrictionContext->fastPathRestrictionContext =
         palloc0(sizeof(FastPathRestrictionContext));

+    if (fastPathRestrictionContext != NULL)
+    {
+        /* copy the given fast path restriction context */
+        FastPathRestrictionContext *plannersFastPathCtx =
+            plannerRestrictionContext->fastPathRestrictionContext;
+        plannersFastPathCtx->fastPathRouterQuery =
+            fastPathRestrictionContext->fastPathRouterQuery;
+        plannersFastPathCtx->distributionKeyValue =
+            fastPathRestrictionContext->distributionKeyValue;
+        plannersFastPathCtx->distributionKeyHasParam =
+            fastPathRestrictionContext->distributionKeyHasParam;
+        plannersFastPathCtx->delayFastPathPlanning =
+            fastPathRestrictionContext->delayFastPathPlanning;
+    }
+
     plannerRestrictionContext->memoryContext = CurrentMemoryContext;

     /* we'll apply logical AND as we add tables */

@@ -43,6 +43,7 @@

 #include "pg_version_constants.h"

+#include "distributed/citus_clauses.h"
 #include "distributed/distributed_planner.h"
 #include "distributed/insert_select_planner.h"
 #include "distributed/metadata_cache.h"
@@ -53,6 +54,7 @@
 #include "distributed/shardinterval_utils.h"

 bool EnableFastPathRouterPlanner = true;
+bool EnableFastPathLocalExecutor = true;

 static bool ColumnAppearsMultipleTimes(Node *quals, Var *distributionKey);
 static bool DistKeyInSimpleOpExpression(Expr *clause, Var *distColumn,
@@ -61,6 +63,19 @@ static bool ConjunctionContainsColumnFilter(Node *node,
                                             Var *column,
                                             Node **distributionKeyValue);

+void
+FastPathPreprocessParseTree(Query *parse)
+{
+    /*
+     * Citus planner relies on some of the transformations on constant
+     * evaluation on the parse tree.
+     */
+    parse->targetList =
+        (List *) eval_const_expressions(NULL, (Node *) parse->targetList);
+    parse->jointree->quals =
+        (Node *) eval_const_expressions(NULL, (Node *) parse->jointree->quals);
+}
+

 /*
  * FastPathPlanner is intended to be used instead of standard_planner() for trivial
@@ -73,15 +88,6 @@ static bool ConjunctionContainsColumnFilter(Node *node,
 PlannedStmt *
 FastPathPlanner(Query *originalQuery, Query *parse, ParamListInfo boundParams)
 {
-    /*
-     * Citus planner relies on some of the transformations on constant
-     * evaluation on the parse tree.
-     */
-    parse->targetList =
-        (List *) eval_const_expressions(NULL, (Node *) parse->targetList);
-    parse->jointree->quals =
-        (Node *) eval_const_expressions(NULL, (Node *) parse->jointree->quals);
-
     PlannedStmt *result = GeneratePlaceHolderPlannedStmt(originalQuery);

     return result;
@@ -112,10 +118,6 @@ GeneratePlaceHolderPlannedStmt(Query *parse)
     Plan *plan = &scanNode->plan;
 #endif

-    Node *distKey PG_USED_FOR_ASSERTS_ONLY = NULL;
-
-    Assert(FastPathRouterQuery(parse, &distKey));
-
     /* there is only a single relation rte */
 #if PG_VERSION_NUM >= PG_VERSION_16
     scanNode->scan.scanrelid = 1;
@@ -150,26 +152,78 @@ GeneratePlaceHolderPlannedStmt(Query *parse)
 }


+static void
+InitializeFastPathContext(FastPathRestrictionContext *fastPathContext,
+                          Node *distributionKeyValue,
+                          bool canAvoidDeparse,
+                          Query *query)
+{
+    Assert(fastPathContext != NULL);
+    Assert(!fastPathContext->fastPathRouterQuery);
+    Assert(!fastPathContext->delayFastPathPlanning);
+
+    /*
+     * We're looking at a fast path query, so we can fill the
+     * fastPathContext with relevant details.
+     */
+    fastPathContext->fastPathRouterQuery = true;
+    if (distributionKeyValue == NULL)
+    {
+        /* nothing to record */
+    }
+    else if (IsA(distributionKeyValue, Const))
+    {
+        fastPathContext->distributionKeyValue = (Const *) distributionKeyValue;
+    }
+    else if (IsA(distributionKeyValue, Param))
+    {
+        fastPathContext->distributionKeyHasParam = true;
+    }
+
+    if (EnableFastPathLocalExecutor)
+    {
+        /*
+         * This fast path query may be executed by the local executor.
+         * We need to delay the fast path planning until we know if the
+         * shard is local or not. Make a final check for volatile
+         * functions in the query tree to determine if we should delay
+         * the fast path planning.
+         */
+        fastPathContext->delayFastPathPlanning = canAvoidDeparse &&
+                                                 !FindNodeMatchingCheckFunction(
+                                                     (Node *) query,
+                                                     CitusIsVolatileFunction);
+    }
+}
+
+
 /*
  * FastPathRouterQuery gets a query and returns true if the query is eligible for
- * being a fast path router query.
+ * being a fast path router query. It also fills the given fastPathContext with
+ * details about the query such as the distribution key value (if available),
+ * whether the distribution key is a parameter, and the range table entry for the
+ * table being queried.
  * The requirements for the fast path query can be listed below:
  *
  * - SELECT/UPDATE/DELETE query without CTES, sublinks-subqueries, set operations
  * - The query should touch only a single hash distributed or reference table
  * - The distribution with equality operator should be in the WHERE clause
  *   and it should be ANDed with any other filters. Also, the distribution
- *   key should only exists once in the WHERE clause. So basically,
+ *   key should only exist once in the WHERE clause. So basically,
  *     SELECT ... FROM dist_table WHERE dist_key = X
  *   If the filter is a const, distributionKeyValue is set
  * - All INSERT statements (including multi-row INSERTs) as long as the commands
  *   don't have any sublinks/CTEs etc
+ * -
  */
 bool
-FastPathRouterQuery(Query *query, Node **distributionKeyValue)
+FastPathRouterQuery(Query *query, FastPathRestrictionContext *fastPathContext)
 {
     FromExpr *joinTree = query->jointree;
     Node *quals = NULL;
+    bool isFastPath = false;
+    bool canAvoidDeparse = false;
+    Node *distributionKeyValue = NULL;

     if (!EnableFastPathRouterPlanner)
     {
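
A short SQL illustration of the eligibility rules listed in the comment above; the table and column names are placeholders taken from that comment, not objects defined in this diff:

    -- eligible: a single distributed table, dist_key equality ANDed with other filters
    SELECT * FROM dist_table WHERE dist_key = 10 AND other_col < 20;
    -- not eligible: the distribution key appears more than once in the WHERE clause
    SELECT * FROM dist_table WHERE dist_key = 10 OR dist_key = 20;
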
@@ -201,6 +255,7 @@ FastPathRouterQuery(Query *query, Node **distributionKeyValue)
     else if (query->commandType == CMD_INSERT)
     {
         /* we don't need to do any further checks, all INSERTs are fast-path */
+        InitializeFastPathContext(fastPathContext, NULL, true, query);
         return true;
     }

@@ -232,45 +287,55 @@ FastPathRouterQuery(Query *query, Node **distributionKeyValue)
     Var *distributionKey = PartitionColumn(distributedTableId, 1);
     if (!distributionKey)
     {
-        return true;
+        /* Local execution may avoid a deparse on single shard distributed tables */
+        canAvoidDeparse = IsCitusTableTypeCacheEntry(cacheEntry,
+                                                     SINGLE_SHARD_DISTRIBUTED);
+        isFastPath = true;
     }

-    /* WHERE clause should not be empty for distributed tables */
-    if (joinTree == NULL ||
-        (IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE) && joinTree->quals ==
-         NULL))
+    if (!isFastPath)
     {
-        return false;
+        canAvoidDeparse = IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE);
+
+        if (joinTree == NULL ||
+            (joinTree->quals == NULL && !canAvoidDeparse))
+        {
+            /* no quals, not a fast path query */
+            return false;
+        }
+
+        quals = joinTree->quals;
+        if (quals != NULL && IsA(quals, List))
+        {
+            quals = (Node *) make_ands_explicit((List *) quals);
+        }
+
+        /*
+         * Distribution column must be used in a simple equality match check and it must be
+         * place at top level conjunction operator. In simple words, we should have
+         * WHERE dist_key = VALUE [AND ....];
+         *
+         * We're also not allowing any other appearances of the distribution key in the quals.
+         *
+         * Overall the logic might sound fuzzy since it involves two individual checks:
+         *  (a) Check for top level AND operator with one side being "dist_key = const"
+         *  (b) Only allow single appearance of "dist_key" in the quals
+         *
+         * This is to simplify both of the individual checks and omit various edge cases
+         * that might arise with multiple distribution keys in the quals.
+         */
+        isFastPath = (ConjunctionContainsColumnFilter(quals, distributionKey,
+                                                      &distributionKeyValue) &&
+                      !ColumnAppearsMultipleTimes(quals, distributionKey));
     }

-    /* convert list of expressions into expression tree for further processing */
-    quals = joinTree->quals;
-    if (quals != NULL && IsA(quals, List))
+    if (isFastPath)
     {
-        quals = (Node *) make_ands_explicit((List *) quals);
+        InitializeFastPathContext(fastPathContext, distributionKeyValue, canAvoidDeparse,
+                                  query);
     }

-    /*
-     * Distribution column must be used in a simple equality match check and it must be
-     * place at top level conjunction operator. In simple words, we should have
-     * WHERE dist_key = VALUE [AND ....];
-     *
-     * We're also not allowing any other appearances of the distribution key in the quals.
-     *
-     * Overall the logic might sound fuzzy since it involves two individual checks:
-     *  (a) Check for top level AND operator with one side being "dist_key = const"
-     *  (b) Only allow single appearance of "dist_key" in the quals
-     *
-     * This is to simplify both of the individual checks and omit various edge cases
-     * that might arise with multiple distribution keys in the quals.
-     */
-    if (ConjunctionContainsColumnFilter(quals, distributionKey, distributionKeyValue) &&
-        !ColumnAppearsMultipleTimes(quals, distributionKey))
-    {
-        return true;
-    }
-
-    return false;
+    return isFastPath;
 }


@@ -16,6 +16,8 @@
 #include "postgres.h"

 #include "access/stratnum.h"
+#include "access/tupdesc.h"
+#include "access/tupdesc_details.h"
 #include "access/xact.h"
 #include "catalog/pg_opfamily.h"
 #include "catalog/pg_proc.h"
@@ -34,6 +36,7 @@
 #include "optimizer/pathnode.h"
 #include "optimizer/paths.h"
 #include "optimizer/planmain.h"
+#include "optimizer/planner.h"
 #include "optimizer/restrictinfo.h"
 #include "parser/parse_oper.h"
 #include "parser/parsetree.h"
@@ -81,6 +84,7 @@
 #include "distributed/relay_utility.h"
 #include "distributed/resource_lock.h"
 #include "distributed/shard_pruning.h"
+#include "distributed/shard_utils.h"
 #include "distributed/shardinterval_utils.h"

 /* intermediate value for INSERT processing */
@@ -164,7 +168,7 @@ static List * SingleShardTaskList(Query *query, uint64 jobId,
                                   List *relationShardList, List *placementList,
                                   uint64 shardId, bool parametersInQueryResolved,
                                   bool isLocalTableModification, Const *partitionKeyValue,
-                                  int colocationId);
+                                  int colocationId, bool delayedFastPath);
 static bool RowLocksOnRelations(Node *node, List **rtiLockList);
 static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
                                                         TaskAssignmentPolicyType
@@ -173,7 +177,7 @@ static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
 static bool ModifiesLocalTableWithRemoteCitusLocalTable(List *rangeTableList);
 static DeferredErrorMessage * DeferErrorIfUnsupportedLocalTableJoin(List *rangeTableList);
 static bool IsLocallyAccessibleCitusLocalTable(Oid relationId);
+static bool ConvertToQueryOnShard(Query *query, Oid relationID, Oid shardRelationId);

 /*
  * CreateRouterPlan attempts to create a router executor plan for the given
@@ -1940,7 +1944,9 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
     {
         GenerateSingleShardRouterTaskList(job, relationShardList,
                                           placementList, shardId,
-                                          isLocalTableModification);
+                                          isLocalTableModification,
+                                          fastPathRestrictionContext->
+                                          delayFastPathPlanning);
     }

     job->requiresCoordinatorEvaluation = requiresCoordinatorEvaluation;
@@ -1948,6 +1954,258 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
 }


+/*
+ * CheckAttributesMatch checks if the attributes of the Citus table and the shard
+ * table match.
+ *
+ * It is used to ensure that the shard table has the same schema as the Citus
+ * table before replacing the Citus table OID with the shard table OID in the
+ * parse tree we (Citus planner) recieved from Postgres.
+ */
+static
+bool
+CheckAttributesMatch(Oid citusTableId, Oid shardTableId)
+{
+    Relation citusR, shardR;
+    bool same_schema = false;
+
+    citusR = RelationIdGetRelation(citusTableId);
+    shardR = RelationIdGetRelation(shardTableId);
+
+    if (RelationIsValid(citusR) && RelationIsValid(shardR))
+    {
+        TupleDesc citusTupDesc = citusR->rd_att;
+        TupleDesc shardTupDesc = shardR->rd_att;
+
+        if (citusTupDesc->natts == shardTupDesc->natts)
+        {
+            /*
+             * Do an attribute-by-attribute comparison. This is borrowed from
+             * the Postgres function equalTupleDescs(), which we cannot use
+             * because the citus table and shard table have different composite
+             * types.
+             */
+            same_schema = true;
+            for (int i = 0; i < citusTupDesc->natts && same_schema; i++)
+            {
+                Form_pg_attribute attr1 = TupleDescAttr(citusTupDesc, i);
+                Form_pg_attribute attr2 = TupleDescAttr(shardTupDesc, i);
+
+                if (strcmp(NameStr(attr1->attname), NameStr(attr2->attname)) != 0)
+                {
+                    same_schema = false;
+                }
+                if (attr1->atttypid != attr2->atttypid)
+                {
+                    same_schema = false;
+                }
+                if (attr1->atttypmod != attr2->atttypmod)
+                {
+                    same_schema = false;
+                }
+                if (attr1->attcollation != attr2->attcollation)
+                {
+                    same_schema = false;
+                }
+
+                /* Record types derived from tables could have dropped fields. */
+                if (attr1->attisdropped != attr2->attisdropped)
+                {
+                    same_schema = false;
+                }
+            }
+        }
+    }
+
+    RelationClose(citusR);
+    RelationClose(shardR);
+    return same_schema;
+}
+
+
+/*
+ * CheckAndBuildDelayedFastPathPlan() - if the query being planned is a fast
+ * path query, not marked for deferred pruning and the placement for the task
+ * is not a dummy placement then if the placement is local to this node we can
+ * take a shortcut of replacing the OID of the citus table with the OID of the
+ * shard in the query tree and plan that directly, instead of deparsing the
+ * parse tree to a SQL query on the shard and parsing and planning that in
+ * the local executor. Instead, the local executor can use the plan created
+ * here.
+ */
+void
+CheckAndBuildDelayedFastPathPlan(DistributedPlanningContext *planContext,
+                                 DistributedPlan *plan)
+{
+    FastPathRestrictionContext *fastPathContext =
+        planContext->plannerRestrictionContext->fastPathRestrictionContext;
+
+    if (!fastPathContext->delayFastPathPlanning)
+    {
+        return;
+    }
+
+    Job *job = plan->workerJob;
+    Assert(job != NULL);
+
+    if (job->deferredPruning)
+    {
+        /* Execution time pruning => don't know which shard at this point */
+        planContext->plan = FastPathPlanner(planContext->originalQuery,
+                                            planContext->query,
+                                            planContext->boundParams);
+        return;
+    }
+
+    List *tasks = job->taskList;
+    Assert(list_length(tasks) == 1);
+    Task *task = (Task *) linitial(tasks);
+    List *placements = task->taskPlacementList;
+    Assert(list_length(placements) > 0);
+    int32 localGroupId = GetLocalGroupId();
+    ShardPlacement *primaryPlacement = (ShardPlacement *) linitial(placements);
+
+    bool isLocalExecution = !IsDummyPlacement(primaryPlacement) &&
+                            (primaryPlacement->groupId == localGroupId);
+    bool canBuildLocalPlan = true;
+
+    if (isLocalExecution)
+    {
+        List *relationShards = task->relationShardList;
+        Assert(list_length(relationShards) == 1);
+        RelationShard *relationShard = (RelationShard *) linitial(relationShards);
+        Assert(relationShard->shardId == primaryPlacement->shardId);
+
+        canBuildLocalPlan = ConvertToQueryOnShard(planContext->query,
+                                                  relationShard->relationId,
+                                                  relationShard->shardId);
+        if (canBuildLocalPlan)
+        {
+            /* Plan the query with the new shard relation id */
+            planContext->plan = standard_planner(planContext->query, NULL,
+                                                 planContext->cursorOptions,
+                                                 planContext->boundParams);
+            SetTaskQueryPlan(task, job->jobQuery, planContext->plan);
+
+            ereport(DEBUG2, (errmsg(
+                                 "Fast-path router query: created local execution plan "
+                                 "to avoid deparse to and compile of shard query")));
+            return;
+        }
+    }
+
+    /*
+     * Either the shard is not local to this node, or it was not safe to replace
+     * the OIDs in the parse tree; in any case we fall back to generating the shard
+     * query and compiling that.
+     */
+    Assert(!isLocalExecution || (isLocalExecution && !canBuildLocalPlan));
+
+    /* Fall back to fast path planner and generating SQL query on the shard */
+    planContext->plan = FastPathPlanner(planContext->originalQuery,
+                                        planContext->query,
+                                        planContext->boundParams);
+    UpdateRelationToShardNames((Node *) job->jobQuery, task->relationShardList);
+    SetTaskQueryIfShouldLazyDeparse(task, job->jobQuery);
+}
+
+
+/*
+ * ConvertToQueryOnShard() converts the given query on a citus table (identified by
+ * citusTableOid) to a query on a shard (identified by shardId).
+ *
+ * The function assumes that the query is a "fast path" query - it has only one
+ * RangeTblEntry and one RTEPermissionInfo.
+ *
+ * It acquires the same lock on the shard that was acquired on the citus table
+ * by the Postgres parser. It checks that the attribute numbers and metadata of
+ * the shard table and citus table are identical - otherwise it is not safe
+ * to proceed with this shortcut. Assuming the attributes do match, the actual
+ * conversion involves changing the target list entries that reference the
+ * citus table's oid to reference the shard's relation id instead. Finally,
+ * it changes the RangeTblEntry's relid to the shard's relation id and (PG16+)
+ * changes the RTEPermissionInfo's relid to the shard's relation id also.
+ * At this point the Query is ready for the postgres planner.
+ */
+static bool
+ConvertToQueryOnShard(Query *query, Oid citusTableOid, Oid shardId)
+{
+    Assert(list_length(query->rtable) == 1);
+    RangeTblEntry *citusTableRte = (RangeTblEntry *) linitial(query->rtable);
+    Assert(citusTableRte->relid == citusTableOid);
+    Assert(list_length(query->rteperminfos) == 1);
+
+    const char *citusTableName = get_rel_name(citusTableOid);
+    Assert(citusTableName != NULL);
+
+    /* construct shard relation name */
+    char *shardRelationName = pstrdup(citusTableName);
+    AppendShardIdToName(&shardRelationName, shardId);
+
+    /* construct the schema name */
+    char *schemaName = get_namespace_name(get_rel_namespace(citusTableOid));
+
+    /* now construct a range variable for the shard */
+    RangeVar shardRangeVar = {
+        .relname = shardRelationName,
+        .schemaname = schemaName,
+        .inh = citusTableRte->inh,
+        .relpersistence = RELPERSISTENCE_PERMANENT,
+    };
+
+    /* Must apply the same lock to the shard that was applied to the citus table */
+    Oid shardRelationId = RangeVarGetRelidExtended(&shardRangeVar,
+                                                   citusTableRte->rellockmode,
+                                                   0, NULL, NULL); /* todo - use suitable callback for perms check? */
+
+    /* Verify that the attributes of citus table and shard table match */
+    if (!CheckAttributesMatch(citusTableOid, shardRelationId))
+    {
+        /* There is a difference between the attributes of the citus
+         * table and the shard table. This can happen if there is a DROP
+         * COLUMN on the citus table. In this case, we cannot
+         * convert the query to a shard query, so clean up and return.
+         */
+        UnlockRelationOid(shardRelationId, citusTableRte->rellockmode);
+        ereport(DEBUG2, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                         errmsg(
+                             "Router planner fast path cannot modify parse tree for local execution: shard table \"%s.%s\" does not match the "
+                             "distributed table \"%s.%s\"",
+                             schemaName, shardRelationName, schemaName,
+                             citusTableName)));
+        pfree(shardRelationName);
+        pfree(schemaName);
+
+        return false;
+    }
+
+    /* Change the target list entries that reference the original citus table's relation id */
+    ListCell *lc = NULL;
+    foreach(lc, query->targetList)
+    {
+        TargetEntry *targetEntry = (TargetEntry *) lfirst(lc);
+        if (targetEntry->resorigtbl == citusTableOid)
+        {
+            targetEntry->resorigtbl = shardRelationId;
+        }
+    }
+
+    /* Change the range table entry's oid to that of the shard's */
+    Assert(shardRelationId != InvalidOid);
+    citusTableRte->relid = shardRelationId;
+
+#if PG_VERSION_NUM >= PG_VERSION_16
+
+    /* Change the range table permission oid to that of the shard's (PG16+) */
+    Assert(list_length(query->rteperminfos) == 1);
+    RTEPermissionInfo *rtePermInfo = (RTEPermissionInfo *) linitial(query->rteperminfos);
+    rtePermInfo->relid = shardRelationId;
+#endif
+
+    return true;
+}
+
+
 /*
  * GenerateSingleShardRouterTaskList is a wrapper around other corresponding task
  * list generation functions specific to single shard selects and modifications.
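
A hedged SQL sketch of the mismatch fallback described in CheckAttributesMatch() above: if the distributed table and its shard end up with differing attribute metadata (the code comment cites DROP COLUMN as one way this can happen), the planner releases the shard lock, logs the DEBUG message above, and falls back to deparsing the shard query. Whether a given DROP COLUMN actually produces the mismatch is an assumption here, not something shown in this diff:

    ALTER TABLE test_tbl DROP COLUMN data_f;  -- may leave dropped-attribute metadata differing
    SET client_min_messages TO DEBUG2;
    SELECT count(*) FROM test_tbl WHERE a = 8;  -- still answered, via the deparse fallback
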
@@ -1957,7 +2215,7 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
 void
 GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
                                   List *placementList, uint64 shardId, bool
-                                  isLocalTableModification)
+                                  isLocalTableModification, bool delayedFastPath)
 {
     Query *originalQuery = job->jobQuery;

@@ -1970,7 +2228,8 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
                                           shardId,
                                           job->parametersInJobQueryResolved,
                                           isLocalTableModification,
-                                          job->partitionKeyValue, job->colocationId);
+                                          job->partitionKeyValue, job->colocationId,
+                                          delayedFastPath);

     /*
      * Queries to reference tables, or distributed tables with multiple replica's have
@@ -2001,7 +2260,8 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
                                               shardId,
                                               job->parametersInJobQueryResolved,
                                               isLocalTableModification,
-                                              job->partitionKeyValue, job->colocationId);
+                                              job->partitionKeyValue, job->colocationId,
+                                              delayedFastPath);
     }
 }

@@ -2096,7 +2356,7 @@ SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList,
                     List *placementList, uint64 shardId,
                     bool parametersInQueryResolved,
                     bool isLocalTableModification, Const *partitionKeyValue,
-                    int colocationId)
+                    int colocationId, bool delayedFastPath)
 {
     TaskType taskType = READ_TASK;
     char replicationModel = 0;
@@ -2168,7 +2428,10 @@ SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList,
     task->taskPlacementList = placementList;
     task->partitionKeyValue = partitionKeyValue;
     task->colocationId = colocationId;
-    SetTaskQueryIfShouldLazyDeparse(task, query);
+    if (!delayedFastPath)
+    {
+        SetTaskQueryIfShouldLazyDeparse(task, query);
+    }
     task->anchorShardId = shardId;
     task->jobId = jobId;
     task->relationShardList = relationShardList;
@@ -2449,10 +2712,15 @@ PlanRouterQuery(Query *originalQuery,

     /*
      * If this is an UPDATE or DELETE query which requires coordinator evaluation,
-     * don't try update shard names, and postpone that to execution phase.
+     * don't try update shard names, and postpone that to execution phase. Also, if
+     * this is a delayed fast path query, we don't update the shard names
+     * either, as the shard names will be updated in the fast path query planner.
      */
     bool isUpdateOrDelete = UpdateOrDeleteOrMergeQuery(originalQuery);
-    if (!(isUpdateOrDelete && RequiresCoordinatorEvaluation(originalQuery)))
+    bool delayedFastPath =
+        plannerRestrictionContext->fastPathRestrictionContext->delayFastPathPlanning;
+    if (!(isUpdateOrDelete && RequiresCoordinatorEvaluation(originalQuery)) &&
+        !delayedFastPath)
     {
         UpdateRelationToShardNames((Node *) originalQuery, *relationShardList);
     }

@@ -212,6 +212,7 @@ static const char * MaxSharedPoolSizeGucShowHook(void);
 static const char * LocalPoolSizeGucShowHook(void);
 static bool StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource
                                              source);
+static bool WarnIfLocalExecutionDisabled(bool *newval, void **extra, GucSource source);
 static void CitusAuthHook(Port *port, int status);
 static bool IsSuperuser(char *userName);
 static void AdjustDynamicLibraryPathForCdcDecoders(void);
@@ -1377,6 +1378,17 @@ RegisterCitusConfigVariables(void)
         GUC_STANDARD,
         NULL, NULL, NULL);

+    DefineCustomBoolVariable(
+        "citus.enable_local_execution_local_plan",
+        gettext_noop("Enables the planner to avoid a query deparse and planning if "
+                     "the shard is local to the current node."),
+        NULL,
+        &EnableFastPathLocalExecutor,
+        true,
+        PGC_USERSET,
+        GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE,
+        WarnIfLocalExecutionDisabled, NULL, NULL);
+
     DefineCustomBoolVariable(
         "citus.enable_local_reference_table_foreign_keys",
         gettext_noop("Enables foreign keys from/to local tables"),
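
The new setting can be toggled per session like any boolean GUC; note it is registered with GUC_NO_SHOW_ALL and GUC_NOT_IN_SAMPLE, so it does not appear in SHOW ALL or in sample configuration files. A minimal sketch mirroring the regression test below:

    SET citus.enable_local_execution_local_plan TO off;  -- force the deparse/compile path
    SET citus.enable_local_execution_local_plan TO on;   -- default: allow the local-plan shortcut
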
@@ -2802,6 +2814,21 @@ WarnIfDeprecatedExecutorUsed(int *newval, void **extra, GucSource source)
 }


+static bool
+WarnIfLocalExecutionDisabled(bool *newval, void **extra, GucSource source)
+{
+    if (*newval == true && EnableLocalExecution == false)
+    {
+        ereport(WARNING, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                          errmsg(
+                              "citus.enable_local_execution must be set in order for "
+                              "citus.enable_local_execution_local_plan to be effective.")));
+    }
+
+    return true;
+}
+
+
 /*
  * NoticeIfSubqueryPushdownEnabled prints a notice when a user sets
  * citus.subquery_pushdown to ON. It doesn't print the notice if the

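
Given the check hook above, enabling the local-plan GUC while citus.enable_local_execution is off should raise the warning; a small sketch of that interaction (assumed from the hook's logic, not taken from a test in this diff):

    SET citus.enable_local_execution TO off;
    SET citus.enable_local_execution_local_plan TO on;
    -- WARNING:  citus.enable_local_execution must be set in order for
    --           citus.enable_local_execution_local_plan to be effective.
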
@@ -287,6 +287,15 @@ CopyTaskQuery(Task *newnode, Task *from)
             break;
         }

+        case TASK_QUERY_LOCAL_PLAN:
+        {
+            newnode->taskQuery.data.localCompiled =
+                (LocalCompilation *) palloc0(sizeof(LocalCompilation));
+            COPY_NODE_FIELD(taskQuery.data.localCompiled->plan);
+            COPY_NODE_FIELD(taskQuery.data.localCompiled->query);
+            break;
+        }
+
         default:
         {
             break;

@@ -27,7 +27,9 @@ extern bool UpdateRelationToShardNames(Node *node, List *relationShardList);
 extern void SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query);
 extern void SetTaskQueryString(Task *task, char *queryString);
 extern void SetTaskQueryStringList(Task *task, List *queryStringList);
+extern void SetTaskQueryPlan(Task *task, Query *query, PlannedStmt *localPlan);
 extern char * TaskQueryString(Task *task);
+extern PlannedStmt * TaskQueryLocalPlan(Task *task);
 extern char * TaskQueryStringAtIndex(Task *task, int index);
 extern int GetTaskQueryType(Task *task);
 extern void AddInsertAliasIfNeeded(Query *query);
@@ -99,6 +99,12 @@ typedef struct FastPathRestrictionContext
      * Set to true when distKey = Param; in the queryTree
      */
     bool distributionKeyHasParam;
+
+    /*
+     * Indicates to hold off calling the fast path planner until its
+     * known if the shard is local or not.
+     */
+    bool delayFastPathPlanning;
 } FastPathRestrictionContext;

 typedef struct PlannerRestrictionContext
@@ -174,9 +174,16 @@ typedef enum TaskQueryType
     TASK_QUERY_NULL,
     TASK_QUERY_TEXT,
     TASK_QUERY_OBJECT,
-    TASK_QUERY_TEXT_LIST
+    TASK_QUERY_TEXT_LIST,
+    TASK_QUERY_LOCAL_PLAN,
 } TaskQueryType;

+typedef struct LocalCompilation
+{
+    PlannedStmt *plan; /* the local plan for this task */
+    Query *query; /* query to deparse for EXPLAIN ANALYZE or local command logging */
+} LocalCompilation;
+
 typedef struct TaskQuery
 {
     TaskQueryType queryType;
@@ -219,6 +226,15 @@ typedef struct TaskQuery
          * when we want to access each query string.
          */
         List *queryStringList;
+
+        /*
+         * For tasks that can be executed locally, this field contains the
+         * local plan for the task. This is only set when the shard that the
+         * task is assigned to is local to the node that executes the task.
+         * The query field is used to deparse the query for EXPLAIN ANALYZE
+         * or local command logging.
+         */
+        LocalCompilation *localCompiled; /* only applies to local tasks */
     }data;
 }TaskQuery;

@@ -28,6 +28,7 @@

 extern bool EnableRouterExecution;
 extern bool EnableFastPathRouterPlanner;
+extern bool EnableFastPathLocalExecutor;

 extern bool EnableNonColocatedRouterQueryPushdown;

@@ -91,16 +92,19 @@ extern void GenerateSingleShardRouterTaskList(Job *job,
                                               List *relationShardList,
                                               List *placementList,
                                               uint64 shardId,
-                                              bool isLocalTableModification);
+                                              bool isLocalTableModification,
+                                              bool delayedFastPath);

 /*
  * FastPathPlanner is a subset of router planner, that's why we prefer to
  * keep the external function here.
  */extern PlannedStmt * GeneratePlaceHolderPlannedStmt(Query *parse);

+extern void FastPathPreprocessParseTree(Query *parse);
 extern PlannedStmt * FastPathPlanner(Query *originalQuery, Query *parse, ParamListInfo
                                      boundParams);
-extern bool FastPathRouterQuery(Query *query, Node **distributionKeyValue);
+extern bool FastPathRouterQuery(Query *query,
+                                FastPathRestrictionContext *fastPathContext);
 extern bool JoinConditionIsOnFalse(List *relOptInfo);
 extern Oid ResultRelationOidForQuery(Query *query);
 extern DeferredErrorMessage * TargetlistAndFunctionsSupported(Oid resultRelationId,
@@ -120,5 +124,7 @@ extern Job * RouterJob(Query *originalQuery,
                        DeferredErrorMessage **planningError);
 extern bool ContainsOnlyLocalOrReferenceTables(RTEListProperties *rteProperties);
 extern RangeTblEntry * ExtractSourceResultRangeTableEntry(Query *query);
+extern void CheckAndBuildDelayedFastPathPlan(DistributedPlanningContext *planContext,
+                                             DistributedPlan *plan);

 #endif /* MULTI_ROUTER_PLANNER_H */
@@ -0,0 +1,236 @@
-- Test local execution with local plan in a sharded environment.
-- This is an enhancement to local execution where instead of deparsing
-- and compiling the shard query, the planner replaces the OID of the
-- distributed table with the OID of the local shard in the parse tree
-- and plans that.
--
-- https://github.com/citusdata/citus/pull/8035
CREATE SCHEMA local_shard_execution_local_plan;
SET search_path TO local_shard_execution_local_plan;
SET citus.next_shard_id TO 86000000;
-- Test row-based sharding
SET citus.shard_replication_factor TO 1;
CREATE TABLE test_tbl (a int, b int, data_f double precision);
SELECT create_distributed_table('test_tbl', 'a');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

SELECT setseed(0.42); -- make the random data inserted deterministic
 setseed
---------------------------------------------------------------------

(1 row)

INSERT INTO test_tbl
SELECT (random()*20)::int AS a,
       (random()*20)::int AS b,
       random()*10000.0 AS data_f
FROM generate_series(1, 10000);
-- Put the shard on worker 1 to ensure consistent test output across different schedules
SET client_min_messages to ERROR; -- suppress warning if shard is already on worker 1
SELECT citus_move_shard_placement(86000000, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes');
 citus_move_shard_placement
---------------------------------------------------------------------

(1 row)

SELECT public.wait_for_resource_cleanup(); -- otherwise fails flakiness tests
 wait_for_resource_cleanup
---------------------------------------------------------------------

(1 row)

RESET client_min_messages;
\c - - - :worker_1_port
SET search_path TO local_shard_execution_local_plan;
SET client_min_messages TO DEBUG2;
SET citus.log_remote_commands TO ON;
SET citus.log_local_commands TO ON;
-- This query resolves to a single shard (aka fast path)
-- which is located on worker_1; with client_min_messages
-- at DEBUG2 we see a message that the planner is avoiding
-- query deparse and plan
SELECT b, AVG(data_f), MIN(data_f), MAX(data_f), COUNT(1)
FROM test_tbl
WHERE a = 8 AND b IN (1,3,5,8,13,21)
GROUP BY b
ORDER BY b;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Fast-path router query: created local execution plan to avoid deparse to and compile of shard query
DEBUG: query has a single distribution column value: 8
NOTICE: executing the command locally: SELECT b, avg(data_f) AS avg, min(data_f) AS min, max(data_f) AS max, count(1) AS count FROM local_shard_execution_local_plan.test_tbl_86000000 test_tbl WHERE ((a OPERATOR(pg_catalog.=) 8) AND (b OPERATOR(pg_catalog.=) ANY (ARRAY[1, 3, 5, 8, 13, 21]))) GROUP BY b ORDER BY b
DEBUG: Local executor: Using task's cached local plan for task 0
 b | avg | min | max | count
---------------------------------------------------------------------
 1 | 4930.97455169836 | 130.09338419238 | 9961.37766951669 | 21
 3 | 5587.38637430731 | 1230.07668620184 | 9937.96225230491 | 23
 5 | 3987.47953221387 | 437.362539823312 | 9729.29912509372 | 25
 8 | 5028.45408903437 | 593.546207093687 | 9869.93823005882 | 27
 13 | 3900.7835426648 | 510.078935445757 | 7784.07104505068 | 18
(5 rows)

SET citus.enable_local_execution_local_plan TO OFF;
-- With local execution local plan disabled, the same query
-- does query deparse and planning of the shard query and
-- provides the same results
SELECT b, AVG(data_f), MIN(data_f), MAX(data_f), COUNT(1)
FROM test_tbl
WHERE a = 8 AND b IN (1,3,5,8,13,21)
GROUP BY b
ORDER BY b;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: query has a single distribution column value: 8
NOTICE: executing the command locally: SELECT b, avg(data_f) AS avg, min(data_f) AS min, max(data_f) AS max, count(1) AS count FROM local_shard_execution_local_plan.test_tbl_86000000 test_tbl WHERE ((a OPERATOR(pg_catalog.=) 8) AND (b OPERATOR(pg_catalog.=) ANY (ARRAY[1, 3, 5, 8, 13, 21]))) GROUP BY b ORDER BY b
 b | avg | min | max | count
---------------------------------------------------------------------
 1 | 4930.97455169836 | 130.09338419238 | 9961.37766951669 | 21
 3 | 5587.38637430731 | 1230.07668620184 | 9937.96225230491 | 23
 5 | 3987.47953221387 | 437.362539823312 | 9729.29912509372 | 25
 8 | 5028.45408903437 | 593.546207093687 | 9869.93823005882 | 27
 13 | 3900.7835426648 | 510.078935445757 | 7784.07104505068 | 18
(5 rows)

\c - - - :worker_2_port
SET search_path TO local_shard_execution_local_plan;
SET client_min_messages TO DEBUG2;
SET citus.log_remote_commands TO ON;
SET citus.log_local_commands TO ON;
-- Run the same query on the other worker - the local
-- execution path is not taken because the shard is not
-- local to this worker
SELECT b, AVG(data_f), MIN(data_f), MAX(data_f), COUNT(1)
FROM test_tbl
WHERE a = 8 AND b IN (1,3,5,8,13,21)
GROUP BY b
ORDER BY b;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: query has a single distribution column value: 8
NOTICE: issuing SELECT b, avg(data_f) AS avg, min(data_f) AS min, max(data_f) AS max, count(1) AS count FROM local_shard_execution_local_plan.test_tbl_86000000 test_tbl WHERE ((a OPERATOR(pg_catalog.=) 8) AND (b OPERATOR(pg_catalog.=) ANY (ARRAY[1, 3, 5, 8, 13, 21]))) GROUP BY b ORDER BY b
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
 b | avg | min | max | count
---------------------------------------------------------------------
 1 | 4930.97455169836 | 130.09338419238 | 9961.37766951669 | 21
 3 | 5587.38637430731 | 1230.07668620184 | 9937.96225230491 | 23
 5 | 3987.47953221387 | 437.362539823312 | 9729.29912509372 | 25
 8 | 5028.45408903437 | 593.546207093687 | 9869.93823005882 | 27
 13 | 3900.7835426648 | 510.078935445757 | 7784.07104505068 | 18
(5 rows)

\c - - - :master_port
DROP SCHEMA local_shard_execution_local_plan CASCADE;
NOTICE: drop cascades to table local_shard_execution_local_plan.test_tbl
-- Now test local execution with local plan for a schema sharded table.
SET citus.enable_schema_based_sharding to on;
CREATE SCHEMA schema_sharding_test;
SET search_path TO schema_sharding_test;
SET citus.next_shard_id TO 87000000;
SET citus.shard_replication_factor TO 1;
CREATE TABLE test_tbl (a int, b int, data_f double precision);
SELECT setseed(0.42); -- make the random data inserted deterministic
 setseed
---------------------------------------------------------------------

(1 row)

INSERT INTO test_tbl
SELECT (random()*20)::int AS a,
       (random()*20)::int AS b,
       random()*10000.0 AS data_f
FROM generate_series(1, 10000);
-- Put the shard on worker 2 to ensure consistent test output across different schedules
SET client_min_messages to ERROR; -- suppress warning if shard is already on worker 2
SELECT citus_move_shard_placement(87000000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'block_writes');
 citus_move_shard_placement
---------------------------------------------------------------------

(1 row)

SELECT public.wait_for_resource_cleanup(); -- otherwise fails flakiness tests
 wait_for_resource_cleanup
---------------------------------------------------------------------

(1 row)

RESET client_min_messages;
\c - - - :worker_1_port
SET client_min_messages TO DEBUG2;
SET citus.log_remote_commands TO ON;
SET citus.log_local_commands TO ON;
-- Run the test query on worker_1; with schema based sharding
-- the data is not local to this worker so local execution
-- path is not taken.
SELECT b, AVG(data_f), MIN(data_f), MAX(data_f), COUNT(1)
FROM schema_sharding_test.test_tbl
WHERE a = 8 AND b IN (1,3,5,8,13,21)
GROUP BY b
ORDER BY b;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
NOTICE: issuing SELECT b, avg(data_f) AS avg, min(data_f) AS min, max(data_f) AS max, count(1) AS count FROM schema_sharding_test.test_tbl_87000000 test_tbl WHERE ((a OPERATOR(pg_catalog.=) 8) AND (b OPERATOR(pg_catalog.=) ANY (ARRAY[1, 3, 5, 8, 13, 21]))) GROUP BY b ORDER BY b
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
 b | avg | min | max | count
---------------------------------------------------------------------
 1 | 4930.97455169836 | 130.09338419238 | 9961.37766951669 | 21
 3 | 5587.38637430731 | 1230.07668620184 | 9937.96225230491 | 23
 5 | 3987.47953221387 | 437.362539823312 | 9729.29912509372 | 25
 8 | 5028.45408903437 | 593.546207093687 | 9869.93823005882 | 27
 13 | 3900.7835426648 | 510.078935445757 | 7784.07104505068 | 18
(5 rows)

\c - - - :worker_2_port
SET client_min_messages TO DEBUG2;
SET citus.log_remote_commands TO ON;
SET citus.log_local_commands TO ON;
-- Run the test query on worker_2; with schema based sharding
-- the data is local to this worker so local execution
-- path is taken, and the planner avoids query deparse and
-- planning of the shard query.
SELECT b, AVG(data_f), MIN(data_f), MAX(data_f), COUNT(1)
FROM schema_sharding_test.test_tbl
WHERE a = 8 AND b IN (1,3,5,8,13,21)
GROUP BY b
ORDER BY b;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Fast-path router query: created local execution plan to avoid deparse to and compile of shard query
NOTICE: executing the command locally: SELECT b, avg(data_f) AS avg, min(data_f) AS min, max(data_f) AS max, count(1) AS count FROM schema_sharding_test.test_tbl_87000000 test_tbl WHERE ((a OPERATOR(pg_catalog.=) 8) AND (b OPERATOR(pg_catalog.=) ANY (ARRAY[1, 3, 5, 8, 13, 21]))) GROUP BY b ORDER BY b
DEBUG: Local executor: Using task's cached local plan for task 0
 b | avg | min | max | count
---------------------------------------------------------------------
 1 | 4930.97455169836 | 130.09338419238 | 9961.37766951669 | 21
 3 | 5587.38637430731 | 1230.07668620184 | 9937.96225230491 | 23
 5 | 3987.47953221387 | 437.362539823312 | 9729.29912509372 | 25
 8 | 5028.45408903437 | 593.546207093687 | 9869.93823005882 | 27
 13 | 3900.7835426648 | 510.078935445757 | 7784.07104505068 | 18
(5 rows)

SET citus.enable_local_execution_local_plan TO OFF;
-- Run the test query on worker_2 but with local execution
-- local plan disabled; now the planner does query deparse
-- and planning of the shard query.
SELECT b, AVG(data_f), MIN(data_f), MAX(data_f), COUNT(1)
FROM schema_sharding_test.test_tbl
WHERE a = 8 AND b IN (1,3,5,8,13,21)
GROUP BY b
ORDER BY b;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
NOTICE: executing the command locally: SELECT b, avg(data_f) AS avg, min(data_f) AS min, max(data_f) AS max, count(1) AS count FROM schema_sharding_test.test_tbl_87000000 test_tbl WHERE ((a OPERATOR(pg_catalog.=) 8) AND (b OPERATOR(pg_catalog.=) ANY (ARRAY[1, 3, 5, 8, 13, 21]))) GROUP BY b ORDER BY b
 b | avg | min | max | count
---------------------------------------------------------------------
 1 | 4930.97455169836 | 130.09338419238 | 9961.37766951669 | 21
 3 | 5587.38637430731 | 1230.07668620184 | 9937.96225230491 | 23
 5 | 3987.47953221387 | 437.362539823312 | 9729.29912509372 | 25
 8 | 5028.45408903437 | 593.546207093687 | 9869.93823005882 | 27
 13 | 3900.7835426648 | 510.078935445757 | 7784.07104505068 | 18
(5 rows)

\c - - - :master_port
DROP SCHEMA schema_sharding_test CASCADE;
NOTICE: drop cascades to table schema_sharding_test.test_tbl
RESET ALL;
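
The comments at the top of the new local_execution_local_plan test describe the mechanism being exercised: instead of deparsing the shard query to text and parsing and planning it again, the planner swaps the distributed table's OID for the local shard's OID in the parse tree and plans that tree directly. The following minimal C sketch is not part of the diff; the function name and arguments are hypothetical, and it only illustrates the idea under those assumptions:

/*
 * Hypothetical sketch: plan a fast-path query against the local shard by
 * copying the parse tree, pointing its range table entry at the shard
 * relation, and planning it directly instead of deparsing and re-parsing
 * the shard query text.
 */
#include "postgres.h"

#include "nodes/params.h"
#include "nodes/parsenodes.h"
#include "nodes/plannodes.h"
#include "optimizer/planner.h"

static PlannedStmt *
PlanFastPathQueryLocally(Query *distributedQuery, Oid distributedRelationId,
						 Oid localShardRelationId, ParamListInfo boundParams)
{
	/* keep the original parse tree intact for any fallback path */
	Query *shardQuery = copyObject(distributedQuery);
	ListCell *rteCell = NULL;

	foreach(rteCell, shardQuery->rtable)
	{
		RangeTblEntry *rte = (RangeTblEntry *) lfirst(rteCell);

		/* replace the distributed table's OID with the local shard's OID */
		if (rte->rtekind == RTE_RELATION && rte->relid == distributedRelationId)
		{
			rte->relid = localShardRelationId;
		}
	}

	/* plan the rewritten tree; no shard query string is generated or compiled */
	return planner(shardQuery, NULL, CURSOR_OPT_PARALLEL_OK, boundParams);
}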
@@ -2410,8 +2410,10 @@ NOTICE: executing the command locally: UPDATE local_shard_execution.event_respo
 SELECT count(*) FROM event_responses WHERE event_id = 16;
 DEBUG: Distributed planning for a fast-path router query
 DEBUG: Creating router plan
+DEBUG: Fast-path router query: created local execution plan to avoid deparse to and compile of shard query
 DEBUG: query has a single distribution column value: 16
 NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution.event_responses_1480001 event_responses WHERE (event_id OPERATOR(pg_catalog.=) 16)
+DEBUG: Local executor: Using task's cached local plan for task 0
  count
 ---------------------------------------------------------------------
      1
@@ -2420,8 +2422,10 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar
 SELECT count(*) FROM event_responses WHERE event_id = 16;
 DEBUG: Distributed planning for a fast-path router query
 DEBUG: Creating router plan
+DEBUG: Fast-path router query: created local execution plan to avoid deparse to and compile of shard query
 DEBUG: query has a single distribution column value: 16
 NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution.event_responses_1480001 event_responses WHERE (event_id OPERATOR(pg_catalog.=) 16)
+DEBUG: Local executor: Using task's cached local plan for task 0
  count
 ---------------------------------------------------------------------
      1
@@ -2430,8 +2434,10 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar
 UPDATE event_responses SET response = 'no' WHERE event_id = 16;
 DEBUG: Distributed planning for a fast-path router query
 DEBUG: Creating router plan
+DEBUG: Fast-path router query: created local execution plan to avoid deparse to and compile of shard query
 DEBUG: query has a single distribution column value: 16
 NOTICE: executing the command locally: UPDATE local_shard_execution.event_responses_1480001 event_responses SET response = 'no'::local_shard_execution.invite_resp WHERE (event_id OPERATOR(pg_catalog.=) 16)
+DEBUG: Local executor: Using task's cached local plan for task 0
 INSERT INTO event_responses VALUES (16, 666, 'maybe')
 ON CONFLICT (event_id, user_id)
 DO UPDATE SET response = EXCLUDED.response RETURNING *;
@@ -2529,8 +2535,10 @@ SET citus.log_remote_commands TO ON;
 SELECT * FROM event_responses_no_pkey WHERE event_id = 2;
 DEBUG: Distributed planning for a fast-path router query
 DEBUG: Creating router plan
+DEBUG: Fast-path router query: created local execution plan to avoid deparse to and compile of shard query
 DEBUG: query has a single distribution column value: 2
 NOTICE: executing the command locally: SELECT event_id, user_id, response FROM local_shard_execution.event_responses_no_pkey_1480007 event_responses_no_pkey WHERE (event_id OPERATOR(pg_catalog.=) 2)
+DEBUG: Local executor: Using task's cached local plan for task 0
  event_id | user_id | response
 ---------------------------------------------------------------------
 (0 rows)
@@ -193,8 +193,102 @@ execute p4(8);
 NOTICE: executing the command locally: UPDATE local_shard_execution_dropped_column.t1_2460000 t1 SET a = (a OPERATOR(pg_catalog.+) 1) WHERE (c OPERATOR(pg_catalog.=) 8)
 execute p4(8);
 NOTICE: executing the command locally: UPDATE local_shard_execution_dropped_column.t1_2460000 t1 SET a = (a OPERATOR(pg_catalog.+) 1) WHERE (c OPERATOR(pg_catalog.=) 8)
+-- Test that "Avoid deparse and planning of shard query for local execution" (*)
+-- does not take the fast path of modifying the parse tree with the shard OID, as
+-- the dropped column means the attribute check between the distributed table and
+-- shard fails. With client_min_messages at DEBUG2, we see "cannot modify parse tree
+-- for local execution", indicating that router planning has detected the difference.
+--
+-- (*) https://github.com/citusdata/citus/pull/8035
+SET client_min_messages to DEBUG2;
+prepare p5(int) as SELECT count(*) FROM t1 WHERE c = 8 and a = $1 GROUP BY c;
+execute p5(5);
+DEBUG: Distributed planning for a fast-path router query
+DEBUG: Creating router plan
+DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1"
+DEBUG: query has a single distribution column value: 8
+NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c
+ count
+---------------------------------------------------------------------
+(0 rows)
+
+execute p5(5);
+DEBUG: Distributed planning for a fast-path router query
+DEBUG: Creating router plan
+DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1"
+DEBUG: query has a single distribution column value: 8
+NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c
+ count
+---------------------------------------------------------------------
+(0 rows)
+
+execute p5(5);
+DEBUG: Distributed planning for a fast-path router query
+DEBUG: Creating router plan
+DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1"
+DEBUG: query has a single distribution column value: 8
+NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c
+ count
+---------------------------------------------------------------------
+(0 rows)
+
+execute p5(5);
+DEBUG: Distributed planning for a fast-path router query
+DEBUG: Creating router plan
+DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1"
+DEBUG: query has a single distribution column value: 8
+NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c
+ count
+---------------------------------------------------------------------
+(0 rows)
+
+execute p5(5);
+DEBUG: Distributed planning for a fast-path router query
+DEBUG: Creating router plan
+DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1"
+DEBUG: query has a single distribution column value: 8
+NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c
+ count
+---------------------------------------------------------------------
+(0 rows)
+
+execute p5(5);
+DEBUG: Distributed planning for a fast-path router query
+DEBUG: Creating router plan
+DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1"
+DEBUG: query has a single distribution column value: 8
+NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c
+ count
+---------------------------------------------------------------------
+(0 rows)
+
+execute p5(5);
+NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c
+ count
+---------------------------------------------------------------------
+(0 rows)
+
+execute p5(5);
+NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c
+ count
+---------------------------------------------------------------------
+(0 rows)
+
+execute p5(5);
+NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c
+ count
+---------------------------------------------------------------------
+(0 rows)
+
+execute p5(5);
+NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c
+ count
+---------------------------------------------------------------------
+(0 rows)
+
+RESET client_min_messages;
 \c - - - :master_port
--- one another combination is that the shell table
+-- one other combination is that the shell table
 -- has a dropped column but not the shard, via rebalance operation
 SET search_path TO local_shard_execution_dropped_column;
 ALTER TABLE t1 DROP COLUMN a;
@@ -204,6 +298,12 @@ SELECT citus_move_shard_placement(2460000, 'localhost', :worker_1_port, 'localho

 (1 row)

+SELECT public.wait_for_resource_cleanup(); -- otherwise fails flakiness tests
+ wait_for_resource_cleanup
+---------------------------------------------------------------------
+
+(1 row)
+
 \c - - - :worker_2_port
 SET search_path TO local_shard_execution_dropped_column;
 -- show the dropped columns
@@ -331,6 +431,108 @@ execute p3(8);
 NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8) ON CONFLICT DO NOTHING
 execute p3(8);
 NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8) ON CONFLICT DO NOTHING
+-- Test that "Avoid deparse and planning of shard query for local execution"
+-- does not take the fast path of modifying the parse tree with the shard OID
+-- for this scenario (rebalance) also.
+--
+-- (*) https://github.com/citusdata/citus/pull/8035
+SET client_min_messages to DEBUG2;
+prepare p4(int) as SELECT count(*) FROM t1 WHERE c = 8 and 5 = $1 GROUP BY c;
+execute p4(5);
+DEBUG: Distributed planning for a fast-path router query
+DEBUG: Creating router plan
+DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1"
+DEBUG: query has a single distribution column value: 8
+NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c
+ count
+---------------------------------------------------------------------
+ 1
+(1 row)
+
+execute p4(5);
+DEBUG: Distributed planning for a fast-path router query
+DEBUG: Creating router plan
+DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1"
+DEBUG: query has a single distribution column value: 8
+NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c
+ count
+---------------------------------------------------------------------
+ 1
+(1 row)
+
+execute p4(5);
+DEBUG: Distributed planning for a fast-path router query
+DEBUG: Creating router plan
+DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1"
+DEBUG: query has a single distribution column value: 8
+NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c
+ count
+---------------------------------------------------------------------
+ 1
+(1 row)
+
+execute p4(5);
+DEBUG: Distributed planning for a fast-path router query
+DEBUG: Creating router plan
+DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1"
+DEBUG: query has a single distribution column value: 8
+NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c
+ count
+---------------------------------------------------------------------
+ 1
+(1 row)
+
+execute p4(5);
+DEBUG: Distributed planning for a fast-path router query
+DEBUG: Creating router plan
+DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1"
+DEBUG: query has a single distribution column value: 8
+NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c
+ count
+---------------------------------------------------------------------
+ 1
+(1 row)
+
+execute p4(5);
+DEBUG: Distributed planning for a fast-path router query
+DEBUG: Creating router plan
+DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1"
+DEBUG: query has a single distribution column value: 8
+NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c
+ count
+---------------------------------------------------------------------
+ 1
+(1 row)
+
+execute p4(5);
+NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c
+ count
+---------------------------------------------------------------------
+ 1
+(1 row)
+
+execute p4(5);
+NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c
+ count
+---------------------------------------------------------------------
+ 1
+(1 row)
+
+execute p4(5);
+NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c
+ count
+---------------------------------------------------------------------
+ 1
+(1 row)
+
+execute p4(5);
+NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c
+ count
+---------------------------------------------------------------------
+ 1
+(1 row)
+
+RESET client_min_messages;
 \c - - - :master_port
 DROP SCHEMA local_shard_execution_dropped_column CASCADE;
 NOTICE: drop cascades to table local_shard_execution_dropped_column.t1
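
The new DEBUG output above ("Router planner fast path cannot modify parse tree for local execution: shard table ... does not match the distributed table ...") shows the guard for this optimization: when the shard relation's attributes no longer line up with the shell table's, here because of a dropped column, the parse tree cannot simply be re-pointed at the shard and the planner falls back to deparsing the shard query. A rough, hypothetical C sketch of such a compatibility check follows; the helper name and the exact criteria are illustrative, not the PR's implementation:

/*
 * Hypothetical sketch: decide whether the shard relation's tuple descriptor
 * still matches the distributed (shell) table's. A mismatch (e.g. a dropped
 * column on one side) means the fast path of swapping OIDs in the parse tree
 * must be skipped in favour of deparsing the shard query.
 */
#include "postgres.h"

#include "access/table.h"
#include "storage/lockdefs.h"
#include "utils/rel.h"

static bool
ShardMatchesDistributedTable(Oid distributedRelationId, Oid shardRelationId)
{
	Relation distRel = table_open(distributedRelationId, AccessShareLock);
	Relation shardRel = table_open(shardRelationId, AccessShareLock);
	TupleDesc distDesc = RelationGetDescr(distRel);
	TupleDesc shardDesc = RelationGetDescr(shardRel);
	bool match = (distDesc->natts == shardDesc->natts);

	for (int attno = 0; match && attno < distDesc->natts; attno++)
	{
		Form_pg_attribute distAtt = TupleDescAttr(distDesc, attno);
		Form_pg_attribute shardAtt = TupleDescAttr(shardDesc, attno);

		/* attribute positions, dropped-ness and types must line up exactly */
		if (distAtt->attisdropped != shardAtt->attisdropped ||
			distAtt->atttypid != shardAtt->atttypid)
		{
			match = false;
		}
	}

	table_close(shardRel, AccessShareLock);
	table_close(distRel, AccessShareLock);

	return match;
}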
@@ -70,6 +70,7 @@ test: metadata_sync_helpers

 test: issue_6592
 test: executor_local_failure
+test: local_execution_local_plan

 # test that no tests leaked intermediate results. This should always be last
 test: ensure_no_intermediate_data_leak
@@ -0,0 +1,149 @@
-- Test local execution with local plan in a sharded environment.
-- This is an enhancement to local execution where instead of deparsing
-- and compiling the shard query, the planner replaces the OID of the
-- distributed table with the OID of the local shard in the parse tree
-- and plans that.
--
-- https://github.com/citusdata/citus/pull/8035

CREATE SCHEMA local_shard_execution_local_plan;
SET search_path TO local_shard_execution_local_plan;

SET citus.next_shard_id TO 86000000;

-- Test row-based sharding

SET citus.shard_replication_factor TO 1;
CREATE TABLE test_tbl (a int, b int, data_f double precision);
SELECT create_distributed_table('test_tbl', 'a');
SELECT setseed(0.42); -- make the random data inserted deterministic
INSERT INTO test_tbl
SELECT (random()*20)::int AS a,
       (random()*20)::int AS b,
       random()*10000.0 AS data_f
FROM generate_series(1, 10000);

-- Put the shard on worker 1 to ensure consistent test output across different schedules
SET client_min_messages to ERROR; -- suppress warning if shard is already on worker 1
SELECT citus_move_shard_placement(86000000, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'block_writes');
SELECT public.wait_for_resource_cleanup(); -- otherwise fails flakiness tests
RESET client_min_messages;

\c - - - :worker_1_port
SET search_path TO local_shard_execution_local_plan;

SET client_min_messages TO DEBUG2;
SET citus.log_remote_commands TO ON;
SET citus.log_local_commands TO ON;

-- This query resolves to a single shard (aka fast path)
-- which is located on worker_1; with client_min_messages
-- at DEBUG2 we see a message that the planner is avoiding
-- query deparse and plan
SELECT b, AVG(data_f), MIN(data_f), MAX(data_f), COUNT(1)
FROM test_tbl
WHERE a = 8 AND b IN (1,3,5,8,13,21)
GROUP BY b
ORDER BY b;

SET citus.enable_local_execution_local_plan TO OFF;

-- With local execution local plan disabled, the same query
-- does query deparse and planning of the shard query and
-- provides the same results
SELECT b, AVG(data_f), MIN(data_f), MAX(data_f), COUNT(1)
FROM test_tbl
WHERE a = 8 AND b IN (1,3,5,8,13,21)
GROUP BY b
ORDER BY b;

\c - - - :worker_2_port
SET search_path TO local_shard_execution_local_plan;

SET client_min_messages TO DEBUG2;
SET citus.log_remote_commands TO ON;
SET citus.log_local_commands TO ON;

-- Run the same query on the other worker - the local
-- execution path is not taken because the shard is not
-- local to this worker
SELECT b, AVG(data_f), MIN(data_f), MAX(data_f), COUNT(1)
FROM test_tbl
WHERE a = 8 AND b IN (1,3,5,8,13,21)
GROUP BY b
ORDER BY b;

\c - - - :master_port

DROP SCHEMA local_shard_execution_local_plan CASCADE;

-- Now test local execution with local plan for a schema sharded table.

SET citus.enable_schema_based_sharding to on;
CREATE SCHEMA schema_sharding_test;
SET search_path TO schema_sharding_test;

SET citus.next_shard_id TO 87000000;

SET citus.shard_replication_factor TO 1;
CREATE TABLE test_tbl (a int, b int, data_f double precision);
SELECT setseed(0.42); -- make the random data inserted deterministic
INSERT INTO test_tbl
SELECT (random()*20)::int AS a,
       (random()*20)::int AS b,
       random()*10000.0 AS data_f
FROM generate_series(1, 10000);

-- Put the shard on worker 2 to ensure consistent test output across different schedules
SET client_min_messages to ERROR; -- suppress warning if shard is already on worker 2
SELECT citus_move_shard_placement(87000000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'block_writes');
SELECT public.wait_for_resource_cleanup(); -- otherwise fails flakiness tests
RESET client_min_messages;

\c - - - :worker_1_port

SET client_min_messages TO DEBUG2;
SET citus.log_remote_commands TO ON;
SET citus.log_local_commands TO ON;

-- Run the test query on worker_1; with schema based sharding
-- the data is not local to this worker so local execution
-- path is not taken.
SELECT b, AVG(data_f), MIN(data_f), MAX(data_f), COUNT(1)
FROM schema_sharding_test.test_tbl
WHERE a = 8 AND b IN (1,3,5,8,13,21)
GROUP BY b
ORDER BY b;

\c - - - :worker_2_port

SET client_min_messages TO DEBUG2;
SET citus.log_remote_commands TO ON;
SET citus.log_local_commands TO ON;

-- Run the test query on worker_2; with schema based sharding
-- the data is local to this worker so local execution
-- path is taken, and the planner avoids query deparse and
-- planning of the shard query.

SELECT b, AVG(data_f), MIN(data_f), MAX(data_f), COUNT(1)
FROM schema_sharding_test.test_tbl
WHERE a = 8 AND b IN (1,3,5,8,13,21)
GROUP BY b
ORDER BY b;

SET citus.enable_local_execution_local_plan TO OFF;

-- Run the test query on worker_2 but with local execution
-- local plan disabled; now the planner does query deparse
-- and planning of the shard query.
SELECT b, AVG(data_f), MIN(data_f), MAX(data_f), COUNT(1)
FROM schema_sharding_test.test_tbl
WHERE a = 8 AND b IN (1,3,5,8,13,21)
GROUP BY b
ORDER BY b;

\c - - - :master_port

DROP SCHEMA schema_sharding_test CASCADE;

RESET ALL;
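
On the execution side, the expected output earlier in this diff includes "Local executor: Using task's cached local plan for task 0": when the planner was able to build the shard plan up front, the task carries that plan and the local executor uses it instead of parsing and planning the shard query text again. The sketch below is hypothetical; the struct and helper names are illustrative and are not the actual Citus Task fields:

/*
 * Hypothetical sketch: prefer a plan attached to the task at distributed
 * planning time over re-planning the shard query text at execution time.
 */
#include "postgres.h"

#include "nodes/params.h"
#include "nodes/parsenodes.h"
#include "nodes/plannodes.h"
#include "optimizer/planner.h"

typedef struct LocalTaskSketch
{
	PlannedStmt *cachedLocalPlan;	/* set when the parse tree could be reused */
	Query *shardQuery;				/* parsed shard query, used as a fallback */
} LocalTaskSketch;

static PlannedStmt *
PlanForLocalTask(LocalTaskSketch *task, ParamListInfo boundParams)
{
	if (task->cachedLocalPlan != NULL)
	{
		/* reuse the plan built during planning: no deparse, no re-plan */
		elog(DEBUG2, "using task's cached local plan");
		return task->cachedLocalPlan;
	}

	/* otherwise plan the shard query the usual way */
	return planner(task->shardQuery, NULL, CURSOR_OPT_PARALLEL_OK, boundParams);
}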
@@ -78,14 +78,37 @@ execute p4(8);
 execute p4(8);
 execute p4(8);
+
+-- Test that "Avoid deparse and planning of shard query for local execution" (*)
+-- does not take the fast path of modifying the parse tree with the shard OID, as
+-- the dropped column means the attribute check between the distributed table and
+-- shard fails. With client_min_messages at DEBUG2, we see "cannot modify parse tree
+-- for local execution", indicating that router planning has detected the difference.
+--
+-- (*) https://github.com/citusdata/citus/pull/8035
+
+SET client_min_messages to DEBUG2;
+prepare p5(int) as SELECT count(*) FROM t1 WHERE c = 8 and a = $1 GROUP BY c;
+execute p5(5);
+execute p5(5);
+execute p5(5);
+execute p5(5);
+execute p5(5);
+execute p5(5);
+execute p5(5);
+execute p5(5);
+execute p5(5);
+execute p5(5);
+RESET client_min_messages;
+
 \c - - - :master_port

--- one another combination is that the shell table
+-- one other combination is that the shell table
 -- has a dropped column but not the shard, via rebalance operation
 SET search_path TO local_shard_execution_dropped_column;
 ALTER TABLE t1 DROP COLUMN a;

 SELECT citus_move_shard_placement(2460000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'block_writes');
+SELECT public.wait_for_resource_cleanup(); -- otherwise fails flakiness tests

 \c - - - :worker_2_port
 SET search_path TO local_shard_execution_dropped_column;
@@ -132,5 +155,25 @@ execute p3(8);
 execute p3(8);
 execute p3(8);
+
+-- Test that "Avoid deparse and planning of shard query for local execution"
+-- does not take the fast path of modifying the parse tree with the shard OID
+-- for this scenario (rebalance) also.
+--
+-- (*) https://github.com/citusdata/citus/pull/8035
+
+SET client_min_messages to DEBUG2;
+prepare p4(int) as SELECT count(*) FROM t1 WHERE c = 8 and 5 = $1 GROUP BY c;
+execute p4(5);
+execute p4(5);
+execute p4(5);
+execute p4(5);
+execute p4(5);
+execute p4(5);
+execute p4(5);
+execute p4(5);
+execute p4(5);
+execute p4(5);
+RESET client_min_messages;
+
 \c - - - :master_port
 DROP SCHEMA local_shard_execution_dropped_column CASCADE;