mirror of https://github.com/citusdata/citus.git
Check that attributes are the same in citus and shard tables.
Retain query in task for EXPLAIN ANALYZE and debug messages. Add debug messages so that tests in `local_shard_execution` reflect that the shortcut is applied (happy path) and tests in `local_shard_execution_dropped_columns` reflect that the shortcut cannot be applied.
parent 55a68b6ec5
commit b6b1dfe6bb
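In outline, the shortcut this commit refines works like this: for a fast-path router query whose single shard placement is on the local node, the planner rewrites the parse tree to point at the shard relation, verifies via the new CheckAttributesMatch() that the shard still has the same attributes as the distributed table, plans the rewritten tree with standard_planner(), and caches both the PlannedStmt and the job query on the task; otherwise it falls back to the classic deparse path. A condensed sketch of the relevant branch of CheckAndBuildDelayedFastPathPlan(), assembled from the hunks below (all identifiers appear in the diff; this is an outline, not the literal committed code):

	/* sketch only: setup, assertions and the deferred-pruning branch are omitted */
	if (isLocalExecution &&
		ConvertToQueryOnShard(planContext->query,
							  relationShard->relationId,
							  relationShard->shardId))
	{
		/* shard is local and its attributes match: plan the rewritten parse
		 * tree directly and cache the local plan (plus the query, kept for
		 * EXPLAIN ANALYZE / lazy deparse) on the task */
		planContext->plan = standard_planner(planContext->query, NULL,
											 planContext->cursorOptions,
											 planContext->boundParams);
		SetTaskQueryPlan(task, job->jobQuery, planContext->plan);
	}
	else
	{
		/* fall back: placeholder plan plus a deparsed shard query */
		planContext->plan = FastPathPlanner(planContext->originalQuery,
											planContext->query,
											planContext->boundParams);
		UpdateRelationToShardNames((Node *) job->jobQuery, task->relationShardList);
		SetTaskQueryIfShouldLazyDeparse(task, job->jobQuery);
	}

The EXPLAIN ANALYZE and debug-message requirement is why SetTaskQueryPlan() now also records job->jobQuery: TaskQueryString() can later deparse it on demand instead of returning a placeholder string.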
@@ -363,6 +363,9 @@ ExecuteLocalTaskListExtended(List *taskList,
}
else
{
ereport(DEBUG2, (errmsg(
"Local executor: Using task's cached local plan for task %u",
task->taskId)));
localPlan = TaskQueryLocalPlan(task);
}
}
@@ -440,11 +440,12 @@ SetTaskQueryStringList(Task *task, List *queryStringList)

void
SetTaskQueryPlan(Task *task, PlannedStmt *localPlan)
SetTaskQueryPlan(Task *task, Query *query, PlannedStmt *localPlan)
{
Assert(localPlan != NULL);
task->taskQuery.queryType = TASK_QUERY_LOCAL_PLAN;
task->taskQuery.data.localPlan = localPlan;
task->taskQuery.data.jobQueryReferenceForLazyDeparsing = query;
task->localPlan = localPlan;
task->queryCount = 1;
}

@@ -453,7 +454,7 @@ PlannedStmt *
TaskQueryLocalPlan(Task *task)
{
Assert(task->taskQuery.queryType == TASK_QUERY_LOCAL_PLAN);
return task->taskQuery.data.localPlan;
return task->localPlan;
}

@@ -515,8 +516,6 @@ TaskQueryStringAtIndex(Task *task, int index)
}

static char *qry_unavailable_msg = "SELECT 'Task query unavailable - optimized away'";

/*
* TaskQueryString generates task query string text if missing.
*

@@ -546,7 +545,14 @@ TaskQueryString(Task *task)
}
else if (taskQueryType == TASK_QUERY_LOCAL_PLAN)
{
return qry_unavailable_msg;
Query *query = task->taskQuery.data.jobQueryReferenceForLazyDeparsing;
Assert(query != NULL);
MemoryContext previousContext = MemoryContextSwitchTo(GetMemoryChunkContext(
query));
UpdateRelationToShardNames((Node *) query, task->relationShardList);
MemoryContextSwitchTo(previousContext);
return AnnotateQuery(DeparseTaskQuery(task, query),
task->partitionKeyValue, task->colocationId);
}

Query *jobQueryReferenceForLazyDeparsing =
@@ -653,6 +653,10 @@ PlanFastPathDistributedStmt(DistributedPlanningContext *planContext)
{
FastPathRestrictionContext *fastPathContext =
planContext->plannerRestrictionContext->fastPathRestrictionContext;
Assert(fastPathContext != NULL);
Assert(fastPathContext->fastPathRouterQuery);

FastPathPreprocessParseTree(planContext->query);

if (!fastPathContext->delayFastPathPlanning)
{

@@ -2421,9 +2425,16 @@ CreateAndPushPlannerRestrictionContext(
if (fastPathRestrictionContext != NULL)
{
/* copy the given fast path restriction context */
memcpy(plannerRestrictionContext->fastPathRestrictionContext,
fastPathRestrictionContext,
sizeof(FastPathRestrictionContext));
FastPathRestrictionContext *plannersFastPathCtx =
plannerRestrictionContext->fastPathRestrictionContext;
plannersFastPathCtx->fastPathRouterQuery =
fastPathRestrictionContext->fastPathRouterQuery;
plannersFastPathCtx->distributionKeyValue =
fastPathRestrictionContext->distributionKeyValue;
plannersFastPathCtx->distributionKeyHasParam =
fastPathRestrictionContext->distributionKeyHasParam;
plannersFastPathCtx->delayFastPathPlanning =
fastPathRestrictionContext->delayFastPathPlanning;
}

plannerRestrictionContext->memoryContext = CurrentMemoryContext;
@@ -63,6 +63,19 @@ static bool ConjunctionContainsColumnFilter(Node *node,
Var *column,
Node **distributionKeyValue);

void
FastPathPreprocessParseTree(Query *parse)
{
/*
* Citus planner relies on some of the transformations on constant
* evaluation on the parse tree.
*/
parse->targetList =
(List *) eval_const_expressions(NULL, (Node *) parse->targetList);
parse->jointree->quals =
(Node *) eval_const_expressions(NULL, (Node *) parse->jointree->quals);
}

/*
* FastPathPlanner is intended to be used instead of standard_planner() for trivial

@@ -75,15 +88,6 @@ static bool ConjunctionContainsColumnFilter(Node *node,
PlannedStmt *
FastPathPlanner(Query *originalQuery, Query *parse, ParamListInfo boundParams)
{
/*
* Citus planner relies on some of the transformations on constant
* evaluation on the parse tree.
*/
parse->targetList =
(List *) eval_const_expressions(NULL, (Node *) parse->targetList);
parse->jointree->quals =
(Node *) eval_const_expressions(NULL, (Node *) parse->jointree->quals);

PlannedStmt *result = GeneratePlaceHolderPlannedStmt(originalQuery);

return result;
@@ -148,6 +152,51 @@ GeneratePlaceHolderPlannedStmt(Query *parse)
}

static void
InitializeFastPathContext(FastPathRestrictionContext *fastPathContext,
Node *distributionKeyValue,
bool canAvoidDeparse,
Query *query)
{
Assert(fastPathContext != NULL);
Assert(!fastPathContext->fastPathRouterQuery);
Assert(!fastPathContext->delayFastPathPlanning);

/*
* We're looking at a fast path query, so we can fill the
* fastPathContext with relevant details.
*/
fastPathContext->fastPathRouterQuery = true;
if (distributionKeyValue == NULL)
{
/* nothing to record */
}
else if (IsA(distributionKeyValue, Const))
{
fastPathContext->distributionKeyValue = (Const *) distributionKeyValue;
}
else if (IsA(distributionKeyValue, Param))
{
fastPathContext->distributionKeyHasParam = true;
}

if (EnableFastPathLocalExecutor)
{
/*
* This fast path query may be executed by the local executor.
* We need to delay the fast path planning until we know if the
* shard is local or not. Make a final check for volatile
* functions in the query tree to determine if we should delay
* the fast path planning.
*/
fastPathContext->delayFastPathPlanning = canAvoidDeparse &&
!FindNodeMatchingCheckFunction(
(Node *) query,
CitusIsVolatileFunction);
}
}

/*
* FastPathRouterQuery gets a query and returns true if the query is eligible for
* being a fast path router query. It also fills the given fastPathContext with
@@ -175,7 +224,6 @@ FastPathRouterQuery(Query *query, FastPathRestrictionContext *fastPathContext)
bool isFastPath = false;
bool canAvoidDeparse = false;
Node *distributionKeyValue = NULL;
RangeTblEntry *rangeTableEntry = NULL;

if (!EnableFastPathRouterPlanner)
{

@@ -207,8 +255,8 @@ FastPathRouterQuery(Query *query, FastPathRestrictionContext *fastPathContext)
else if (query->commandType == CMD_INSERT)
{
/* we don't need to do any further checks, all INSERTs are fast-path */
isFastPath = true;
goto returnFastPath;
InitializeFastPathContext(fastPathContext, NULL, true, query);
return true;
}

/* make sure that the only range table in FROM clause */

@@ -217,7 +265,7 @@ FastPathRouterQuery(Query *query, FastPathRestrictionContext *fastPathContext)
return false;
}

rangeTableEntry = (RangeTblEntry *) linitial(query->rtable);
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) linitial(query->rtable);
if (rangeTableEntry->rtekind != RTE_RELATION)
{
return false;
@@ -281,52 +329,10 @@ FastPathRouterQuery(Query *query, FastPathRestrictionContext *fastPathContext)
!ColumnAppearsMultipleTimes(quals, distributionKey));
}

returnFastPath:

if (isFastPath)
{
Assert(fastPathContext != NULL);
Assert(!fastPathContext->fastPathRouterQuery);
Assert(!fastPathContext->delayFastPathPlanning);

/*
* We're looking at a fast path query, so we can fill the
* fastPathContext with relevant details.
*/
fastPathContext->fastPathRouterQuery = true;
if (distributionKeyValue == NULL)
{
/* nothing to record */
}
else if (IsA(distributionKeyValue, Const))
{
fastPathContext->distributionKeyValue = (Const *) distributionKeyValue;
}
else if (IsA(distributionKeyValue, Param))
{
fastPathContext->distributionKeyHasParam = true;
}

/*
* Note the range table entry for the table we're querying.
*/
Assert(rangeTableEntry != NULL || query->commandType == CMD_INSERT);
fastPathContext->distTableRte = rangeTableEntry;

if (EnableFastPathLocalExecutor)
{
/*
* This fast path query may be executed by the local executor.
* We need to delay the fast path planning until we know if the
* shard is local or not. Make a final check for volatile
* functions in the query tree to determine if we should delay
* the fast path planning.
*/
fastPathContext->delayFastPathPlanning = canAvoidDeparse &&
!FindNodeMatchingCheckFunction(
(Node *) query,
CitusIsVolatileFunction);
}
InitializeFastPathContext(fastPathContext, distributionKeyValue, canAvoidDeparse,
query);
}

return isFastPath;
@@ -16,6 +16,8 @@
#include "postgres.h"

#include "access/stratnum.h"
#include "access/tupdesc.h"
#include "access/tupdesc_details.h"
#include "access/xact.h"
#include "catalog/pg_opfamily.h"
#include "catalog/pg_proc.h"

@@ -175,7 +177,7 @@ static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
static bool ModifiesLocalTableWithRemoteCitusLocalTable(List *rangeTableList);
static DeferredErrorMessage * DeferErrorIfUnsupportedLocalTableJoin(List *rangeTableList);
static bool IsLocallyAccessibleCitusLocalTable(Oid relationId);
static Query * ConvertToQueryOnShard(Query *query, Oid relationID, Oid shardRelationId);
static bool ConvertToQueryOnShard(Query *query, Oid relationID, Oid shardRelationId);

/*
* CreateRouterPlan attempts to create a router executor plan for the given
@@ -1952,6 +1954,75 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
}

/*
* CheckAttributesMatch checks if the attributes of the Citus table and the shard
* table match.
*
* It is used to ensure that the shard table has the same schema as the Citus
* table before replacing the Citus table OID with the shard table OID in the
* parse tree we (Citus planner) received from Postgres.
*/
static
bool
CheckAttributesMatch(Oid citusTableId, Oid shardTableId)
{
Relation citusR, shardR;
bool same_schema = false;

citusR = RelationIdGetRelation(citusTableId);
shardR = RelationIdGetRelation(shardTableId);

if (RelationIsValid(citusR) && RelationIsValid(shardR))
{
TupleDesc citusTupDesc = citusR->rd_att;
TupleDesc shardTupDesc = shardR->rd_att;

if (citusTupDesc->natts == shardTupDesc->natts)
{
/*
* Do an attribute-by-attribute comparison. This is borrowed from
* the Postgres function equalTupleDescs(), which we cannot use
* because the citus table and shard table have different composite
* types.
*/
same_schema = true;
for (int i = 0; i < citusTupDesc->natts && same_schema; i++)
{
Form_pg_attribute attr1 = TupleDescAttr(citusTupDesc, i);
Form_pg_attribute attr2 = TupleDescAttr(shardTupDesc, i);

if (strcmp(NameStr(attr1->attname), NameStr(attr2->attname)) != 0)
{
same_schema = false;
}
if (attr1->atttypid != attr2->atttypid)
{
same_schema = false;
}
if (attr1->atttypmod != attr2->atttypmod)
{
same_schema = false;
}
if (attr1->attcollation != attr2->attcollation)
{
same_schema = false;
}

/* Record types derived from tables could have dropped fields. */
if (attr1->attisdropped != attr2->attisdropped)
{
same_schema = false;
}
}
}
}

RelationClose(citusR);
RelationClose(shardR);
return same_schema;
}

void
CheckAndBuildDelayedFastPathPlan(DistributedPlanningContext *planContext,
DistributedPlan *plan)
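For readers without the PostgreSQL headers at hand, CheckAttributesMatch() above amounts to a field-by-field walk over the two relations' attribute arrays. Below is a standalone illustration of the same idea using a hypothetical, simplified Attr struct in place of Form_pg_attribute (the field set mirrors what the real function compares; the type OID 23 stands for int4):

	#include <stdbool.h>
	#include <stdio.h>
	#include <string.h>

	/* hypothetical stand-in for Form_pg_attribute, for illustration only */
	typedef struct Attr
	{
		char name[64];
		unsigned int typid;      /* pg_type OID */
		int typmod;
		unsigned int collation;
		bool isdropped;
	} Attr;

	/* same comparison idea as CheckAttributesMatch(): descriptors match only if
	 * every attribute agrees on name, type, typmod, collation and dropped-ness */
	static bool
	attributes_match(const Attr *a, const Attr *b, int natts_a, int natts_b)
	{
		if (natts_a != natts_b)
		{
			return false;
		}
		for (int i = 0; i < natts_a; i++)
		{
			if (strcmp(a[i].name, b[i].name) != 0 ||
				a[i].typid != b[i].typid ||
				a[i].typmod != b[i].typmod ||
				a[i].collation != b[i].collation ||
				a[i].isdropped != b[i].isdropped)
			{
				return false;
			}
		}
		return true;
	}

	int
	main(void)
	{
		/* a table with columns (c int, a int) vs. a shard where the second
		 * column was dropped; loosely mirrors the dropped-column scenarios
		 * in the regression tests further down */
		Attr dist[] = { { "c", 23, -1, 0, false }, { "a", 23, -1, 0, false } };
		Attr shard[] = { { "c", 23, -1, 0, false },
						 { "........pg.dropped.2........", 23, -1, 0, true } };

		printf("match: %d\n", attributes_match(dist, shard, 2, 2));   /* prints 0 */
		return 0;
	}

When any attribute differs, ConvertToQueryOnShard() gives up on rewriting the parse tree and the planner falls back to deparsing a shard query, which is exactly what the DEBUG2 messages in the expected test output below assert.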
@@ -1969,7 +2040,7 @@ CheckAndBuildDelayedFastPathPlan(DistributedPlanningContext *planContext,

if (job->deferredPruning)
{
/* Call fast path query planner, Save plan in planContext->plan */
/* Execution time pruning => don't know which shard at this point */
planContext->plan = FastPathPlanner(planContext->originalQuery,
planContext->query,
planContext->boundParams);

@@ -1983,35 +2054,44 @@ CheckAndBuildDelayedFastPathPlan(DistributedPlanningContext *planContext,
Assert(list_length(placements) > 0);
int32 localGroupId = GetLocalGroupId();
ShardPlacement *primaryPlacement = (ShardPlacement *) linitial(placements);
List *relationShards = task->relationShardList;
Assert(list_length(relationShards) == 1);
bool isLocalExecution = (primaryPlacement->groupId == localGroupId);

bool isLocalExecution = !IsDummyPlacement(primaryPlacement) &&
(primaryPlacement->groupId == localGroupId);
bool canBuildLocalPlan = true;

if (isLocalExecution)
{
ConvertToQueryOnShard(planContext->query,
fastPathContext->distTableRte->relid,
primaryPlacement->shardId);
List *relationShards = task->relationShardList;
Assert(list_length(relationShards) == 1);
RelationShard *relationShard = (RelationShard *) linitial(relationShards);
Assert(relationShard->shardId == primaryPlacement->shardId);

/* Plan the query with the new shard relation id */
/* Save plan in planContext->plan */
planContext->plan = standard_planner(planContext->query, NULL,
planContext->cursorOptions,
planContext->boundParams);
SetTaskQueryPlan(task, planContext->plan);
canBuildLocalPlan = ConvertToQueryOnShard(planContext->query,
relationShard->relationId,
relationShard->shardId);
if (canBuildLocalPlan)
{
/* Plan the query with the new shard relation id */
planContext->plan = standard_planner(planContext->query, NULL,
planContext->cursorOptions,
planContext->boundParams);
SetTaskQueryPlan(task, job->jobQuery, planContext->plan);

ereport(DEBUG2, (errmsg("Local plan for fast-path router "
"query")));
}
else
{
/* Call fast path query planner, Save plan in planContext->plan */
planContext->plan = FastPathPlanner(planContext->originalQuery,
planContext->query,
planContext->boundParams);
UpdateRelationToShardNames((Node *) job->jobQuery, relationShards);
SetTaskQueryIfShouldLazyDeparse(task, job->jobQuery);
ereport(DEBUG2, (errmsg(
"Fast-path router query: created local execution plan "
"to avoid deparse to and compile of shard query")));
return;
}
}

Assert(!isLocalExecution || (isLocalExecution && !canBuildLocalPlan));

/* Fall back to fast path planner and generating SQL query on the shard */
planContext->plan = FastPathPlanner(planContext->originalQuery,
planContext->query,
planContext->boundParams);
UpdateRelationToShardNames((Node *) job->jobQuery, task->relationShardList);
SetTaskQueryIfShouldLazyDeparse(task, job->jobQuery);
}
@@ -2029,7 +2109,7 @@ CheckAndBuildDelayedFastPathPlan(DistributedPlanningContext *planContext,
* changes the RTEPermissionInfo's relid to the shard's relation id also.
* At this point the Query is ready for the postgres planner.
*/
static Query *
static bool
ConvertToQueryOnShard(Query *query, Oid citusTableOid, Oid shardId)
{
Assert(list_length(query->rtable) == 1);

@@ -2060,6 +2140,27 @@ ConvertToQueryOnShard(Query *query, Oid citusTableOid, Oid shardId)
citusTableRte->rellockmode,
0, NULL, NULL); /* todo - use suitable callback for perms check? */

/* Verify that the attributes of citus table and shard table match */
if (!CheckAttributesMatch(citusTableOid, shardRelationId))
{
/* There is a difference between the attributes of the citus
* table and the shard table. This can happen if there is a DROP
* COLUMN on the citus table. In this case, we cannot
* convert the query to a shard query, so clean up and return.
*/
UnlockRelationOid(shardRelationId, citusTableRte->rellockmode);
ereport(DEBUG2, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg(
"Router planner fast path cannot modify parse tree for local execution: shard table \"%s.%s\" does not match the "
"distributed table \"%s.%s\"",
schemaName, shardRelationName, schemaName,
citusTableName)));
pfree(shardRelationName);
pfree(schemaName);

return false;
}

/* Change the target list entries that reference the original citus table's relation id */
ListCell *lc = NULL;
foreach(lc, query->targetList)

@@ -2071,7 +2172,6 @@ ConvertToQueryOnShard(Query *query, Oid citusTableOid, Oid shardId)
}
}


/* Change the range table entry's oid to that of the shard's */
Assert(shardRelationId != InvalidOid);
citusTableRte->relid = shardRelationId;

@@ -2084,7 +2184,7 @@ ConvertToQueryOnShard(Query *query, Oid citusTableOid, Oid shardId)
rtePermInfo->relid = shardRelationId;
#endif

return query;
return true;
}
@@ -1379,7 +1379,7 @@ RegisterCitusConfigVariables(void)
NULL, NULL, NULL);

DefineCustomBoolVariable(
"citus.enable_fast_path_local_execution",
"citus.enable_local_execution_fast_path",
gettext_noop("Enables the planner to avoid a query deparse and planning if "
"the shard is local to the current node."),
NULL,

@@ -2822,7 +2822,7 @@ ErrorIfLocalExectionDisabled(bool *newval, void **extra, GucSource source)
ereport(WARNING, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg(
"citus.enable_local_execution must be set in order for "
"citus.enable_fast_path_local_execution to be effective.")));
"citus.enable_local_execution_fast_path to be effective.")));

return false;
}
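The WARNING above is emitted from the GUC check hook named in the hunk header. The hook body is only partially visible in this diff; the sketch below shows a plausible shape for it, assuming the standard PostgreSQL check-hook convention and a boolean EnableLocalExecution backing citus.enable_local_execution (the guard condition is an assumption, only the WARNING branch appears above):

	/* sketch: only the ereport(WARNING ...) / return false lines are in the hunk above */
	static bool
	ErrorIfLocalExectionDisabled(bool *newval, void **extra, GucSource source)
	{
		if (*newval && !EnableLocalExecution)
		{
			ereport(WARNING, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
							  errmsg(
								  "citus.enable_local_execution must be set in order for "
								  "citus.enable_local_execution_fast_path to be effective.")));

			return false;
		}

		return true;
	}

Presumably this function is registered as the check_hook argument of the DefineCustomBoolVariable() call shown in the first hunk of this file.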
@@ -289,7 +289,8 @@ CopyTaskQuery(Task *newnode, Task *from)

case TASK_QUERY_LOCAL_PLAN:
{
COPY_NODE_FIELD(taskQuery.data.localPlan);
COPY_NODE_FIELD(localPlan);
COPY_NODE_FIELD(taskQuery.data.jobQueryReferenceForLazyDeparsing);
break;
}
@@ -27,7 +27,7 @@ extern bool UpdateRelationToShardNames(Node *node, List *relationShardList);
extern void SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query);
extern void SetTaskQueryString(Task *task, char *queryString);
extern void SetTaskQueryStringList(Task *task, List *queryStringList);
extern void SetTaskQueryPlan(Task *task, PlannedStmt *localPlan);
extern void SetTaskQueryPlan(Task *task, Query *query, PlannedStmt *localPlan);
extern char * TaskQueryString(Task *task);
extern PlannedStmt * TaskQueryLocalPlan(Task *task);
extern char * TaskQueryStringAtIndex(Task *task, int index);
@@ -101,15 +101,10 @@ typedef struct FastPathRestrictionContext
bool distributionKeyHasParam;

/*
* Indicates to hold off on callning the fast path planner until its
* known if the shard is local
* Indicates to hold off calling the fast path planner until it's
* known if the shard is local or not.
*/
bool delayFastPathPlanning;

/*
* Range table entry for the table we're querying
*/
RangeTblEntry *distTableRte;
} FastPathRestrictionContext;

typedef struct PlannerRestrictionContext
@@ -220,8 +220,6 @@ typedef struct TaskQuery
* when we want to access each query string.
*/
List *queryStringList;

PlannedStmt *localPlan; /* only applies to local tasks */
}data;
}TaskQuery;

@@ -337,6 +335,8 @@ typedef struct Task

Const *partitionKeyValue;
int colocationId;

PlannedStmt *localPlan; /* only applies to local tasks */
} Task;
@@ -100,6 +100,7 @@ extern void GenerateSingleShardRouterTaskList(Job *job,
* keep the external function here.
*/extern PlannedStmt * GeneratePlaceHolderPlannedStmt(Query *parse);

extern void FastPathPreprocessParseTree(Query *parse);
extern PlannedStmt * FastPathPlanner(Query *originalQuery, Query *parse, ParamListInfo
boundParams);
extern bool FastPathRouterQuery(Query *query,
@@ -2410,8 +2410,10 @@ NOTICE: executing the command locally: UPDATE local_shard_execution.event_respo
SELECT count(*) FROM event_responses WHERE event_id = 16;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Fast-path router query: created local execution plan to avoid deparse to and compile of shard query
DEBUG: query has a single distribution column value: 16
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution.event_responses_1480001 event_responses WHERE (event_id OPERATOR(pg_catalog.=) 16)
DEBUG: Local executor: Using task's cached local plan for task 0
count
---------------------------------------------------------------------
1

@@ -2420,8 +2422,10 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar
SELECT count(*) FROM event_responses WHERE event_id = 16;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Fast-path router query: created local execution plan to avoid deparse to and compile of shard query
DEBUG: query has a single distribution column value: 16
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution.event_responses_1480001 event_responses WHERE (event_id OPERATOR(pg_catalog.=) 16)
DEBUG: Local executor: Using task's cached local plan for task 0
count
---------------------------------------------------------------------
1

@@ -2430,8 +2434,10 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar
UPDATE event_responses SET response = 'no' WHERE event_id = 16;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Fast-path router query: created local execution plan to avoid deparse to and compile of shard query
DEBUG: query has a single distribution column value: 16
NOTICE: executing the command locally: UPDATE local_shard_execution.event_responses_1480001 event_responses SET response = 'no'::local_shard_execution.invite_resp WHERE (event_id OPERATOR(pg_catalog.=) 16)
DEBUG: Local executor: Using task's cached local plan for task 0
INSERT INTO event_responses VALUES (16, 666, 'maybe')
ON CONFLICT (event_id, user_id)
DO UPDATE SET response = EXCLUDED.response RETURNING *;

@@ -2529,8 +2535,10 @@ SET citus.log_remote_commands TO ON;
SELECT * FROM event_responses_no_pkey WHERE event_id = 2;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Fast-path router query: created local execution plan to avoid deparse to and compile of shard query
DEBUG: query has a single distribution column value: 2
NOTICE: executing the command locally: SELECT event_id, user_id, response FROM local_shard_execution.event_responses_no_pkey_1480007 event_responses_no_pkey WHERE (event_id OPERATOR(pg_catalog.=) 2)
DEBUG: Local executor: Using task's cached local plan for task 0
event_id | user_id | response
---------------------------------------------------------------------
(0 rows)
@@ -193,8 +193,102 @@ execute p4(8);
NOTICE: executing the command locally: UPDATE local_shard_execution_dropped_column.t1_2460000 t1 SET a = (a OPERATOR(pg_catalog.+) 1) WHERE (c OPERATOR(pg_catalog.=) 8)
execute p4(8);
NOTICE: executing the command locally: UPDATE local_shard_execution_dropped_column.t1_2460000 t1 SET a = (a OPERATOR(pg_catalog.+) 1) WHERE (c OPERATOR(pg_catalog.=) 8)
-- Test that "Avoid deparse and planning of shard query for local execution" (*)
-- does not take the fast path of modifying the parse tree with the shard OID, as
-- the dropped column means the attribute check between the distributed table and
-- shard fails. With client_min_messages at DEBUG2, we see "cannot modify parse tree
-- for local execution", indicating that router planning has detected the difference.
--
-- (*) https://github.com/citusdata/citus/pull/8035
SET client_min_messages to DEBUG2;
prepare p5(int) as SELECT count(*) FROM t1 WHERE c = 8 and a = $1 GROUP BY c;
execute p5(5);
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1"
DEBUG: query has a single distribution column value: 8
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c
count
---------------------------------------------------------------------
(0 rows)

execute p5(5);
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1"
DEBUG: query has a single distribution column value: 8
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c
count
---------------------------------------------------------------------
(0 rows)

execute p5(5);
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1"
DEBUG: query has a single distribution column value: 8
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c
count
---------------------------------------------------------------------
(0 rows)

execute p5(5);
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1"
DEBUG: query has a single distribution column value: 8
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c
count
---------------------------------------------------------------------
(0 rows)

execute p5(5);
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1"
DEBUG: query has a single distribution column value: 8
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c
count
---------------------------------------------------------------------
(0 rows)

execute p5(5);
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1"
DEBUG: query has a single distribution column value: 8
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c
count
---------------------------------------------------------------------
(0 rows)

execute p5(5);
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c
count
---------------------------------------------------------------------
(0 rows)

execute p5(5);
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c
count
---------------------------------------------------------------------
(0 rows)

execute p5(5);
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c
count
---------------------------------------------------------------------
(0 rows)

execute p5(5);
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c
count
---------------------------------------------------------------------
(0 rows)

RESET client_min_messages;
\c - - - :master_port
-- one another combination is that the shell table
-- one other combination is that the shell table
-- has a dropped column but not the shard, via rebalance operation
SET search_path TO local_shard_execution_dropped_column;
ALTER TABLE t1 DROP COLUMN a;
@@ -331,6 +425,108 @@ execute p3(8);
NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8) ON CONFLICT DO NOTHING
execute p3(8);
NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8) ON CONFLICT DO NOTHING
-- Test that "Avoid deparse and planning of shard query for local execution"
-- does not take the fast path of modifying the parse tree with the shard OID
-- for this scenario (rebalance) also.
--
-- (*) https://github.com/citusdata/citus/pull/8035
SET client_min_messages to DEBUG2;
prepare p4(int) as SELECT count(*) FROM t1 WHERE c = 8 and 5 = $1 GROUP BY c;
execute p4(5);
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1"
DEBUG: query has a single distribution column value: 8
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c
count
---------------------------------------------------------------------
1
(1 row)

execute p4(5);
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1"
DEBUG: query has a single distribution column value: 8
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c
count
---------------------------------------------------------------------
1
(1 row)

execute p4(5);
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1"
DEBUG: query has a single distribution column value: 8
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c
count
---------------------------------------------------------------------
1
(1 row)

execute p4(5);
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1"
DEBUG: query has a single distribution column value: 8
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c
count
---------------------------------------------------------------------
1
(1 row)

execute p4(5);
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1"
DEBUG: query has a single distribution column value: 8
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c
count
---------------------------------------------------------------------
1
(1 row)

execute p4(5);
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1"
DEBUG: query has a single distribution column value: 8
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c
count
---------------------------------------------------------------------
1
(1 row)

execute p4(5);
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c
count
---------------------------------------------------------------------
1
(1 row)

execute p4(5);
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c
count
---------------------------------------------------------------------
1
(1 row)

execute p4(5);
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c
count
---------------------------------------------------------------------
1
(1 row)

execute p4(5);
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c
count
---------------------------------------------------------------------
1
(1 row)

RESET client_min_messages;
\c - - - :master_port
DROP SCHEMA local_shard_execution_dropped_column CASCADE;
NOTICE: drop cascades to table local_shard_execution_dropped_column.t1
@@ -78,9 +78,31 @@ execute p4(8);
execute p4(8);
execute p4(8);

-- Test that "Avoid deparse and planning of shard query for local execution" (*)
-- does not take the fast path of modifying the parse tree with the shard OID, as
-- the dropped column means the attribute check between the distributed table and
-- shard fails. With client_min_messages at DEBUG2, we see "cannot modify parse tree
-- for local execution", indicating that router planning has detected the difference.
--
-- (*) https://github.com/citusdata/citus/pull/8035

SET client_min_messages to DEBUG2;
prepare p5(int) as SELECT count(*) FROM t1 WHERE c = 8 and a = $1 GROUP BY c;
execute p5(5);
execute p5(5);
execute p5(5);
execute p5(5);
execute p5(5);
execute p5(5);
execute p5(5);
execute p5(5);
execute p5(5);
execute p5(5);
RESET client_min_messages;

\c - - - :master_port

-- one another combination is that the shell table
-- one other combination is that the shell table
-- has a dropped column but not the shard, via rebalance operation
SET search_path TO local_shard_execution_dropped_column;
ALTER TABLE t1 DROP COLUMN a;
@@ -132,5 +154,25 @@ execute p3(8);
execute p3(8);
execute p3(8);

-- Test that "Avoid deparse and planning of shard query for local execution"
-- does not take the fast path of modifying the parse tree with the shard OID
-- for this scenario (rebalance) also.
--
-- (*) https://github.com/citusdata/citus/pull/8035

SET client_min_messages to DEBUG2;
prepare p4(int) as SELECT count(*) FROM t1 WHERE c = 8 and 5 = $1 GROUP BY c;
execute p4(5);
execute p4(5);
execute p4(5);
execute p4(5);
execute p4(5);
execute p4(5);
execute p4(5);
execute p4(5);
execute p4(5);
execute p4(5);
RESET client_min_messages;

\c - - - :master_port
DROP SCHEMA local_shard_execution_dropped_column CASCADE;