POC: fix a few bugs found in regress testing

colm/sshard-poc
Colm McHugh 2025-06-10 15:35:58 +00:00
parent 61962fe35c
commit c4a8d4c45e
5 changed files with 38 additions and 26 deletions

View File

@ -515,6 +515,8 @@ TaskQueryStringAtIndex(Task *task, int index)
} }
static char *plan_msg = "Task query optimized away";
/* /*
* TaskQueryString generates task query string text if missing. * TaskQueryString generates task query string text if missing.
* *
@ -542,6 +544,10 @@ TaskQueryString(Task *task)
{ {
return task->taskQuery.data.queryStringLazy; return task->taskQuery.data.queryStringLazy;
} }
else if (taskQueryType == TASK_QUERY_LOCAL_PLAN)
{
return plan_msg;
}
Query *jobQueryReferenceForLazyDeparsing = Query *jobQueryReferenceForLazyDeparsing =
task->taskQuery.data.jobQueryReferenceForLazyDeparsing; task->taskQuery.data.jobQueryReferenceForLazyDeparsing;

View File

@ -1299,8 +1299,6 @@ CreateFastPathDistributedPlan(DistributedPlanningContext *planContext)
Query *originalQuery = planContext->originalQuery; Query *originalQuery = planContext->originalQuery;
Query *query = planContext->query; Query *query = planContext->query;
ParamListInfo boundParams = planContext->boundParams; ParamListInfo boundParams = planContext->boundParams;
PlannerRestrictionContext *plannerRestrictionContext =
planContext->plannerRestrictionContext;
bool hasUnresolvedParams = false; bool hasUnresolvedParams = false;
if (HasUnresolvedExternParamsWalker((Node *) originalQuery, if (HasUnresolvedExternParamsWalker((Node *) originalQuery,
@ -1533,7 +1531,7 @@ GetDistributedPlan(CustomScan *customScan)
Node *node = (Node *) linitial(customScan->custom_private); Node *node = (Node *) linitial(customScan->custom_private);
Assert(CitusIsA(node, DistributedPlan)); Assert(CitusIsA(node, DistributedPlan));
/* CheckNodeCopyAndSerialization(node); commented out for local perf profiling */ CheckNodeCopyAndSerialization(node);
DistributedPlan *distributedPlan = (DistributedPlan *) node; DistributedPlan *distributedPlan = (DistributedPlan *) node;

View File

@ -43,6 +43,7 @@
#include "pg_version_constants.h" #include "pg_version_constants.h"
#include "distributed/citus_clauses.h"
#include "distributed/distributed_planner.h" #include "distributed/distributed_planner.h"
#include "distributed/insert_select_planner.h" #include "distributed/insert_select_planner.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
@ -171,7 +172,7 @@ FastPathRouterQuery(Query *query, const char *query_string,
FromExpr *joinTree = query->jointree; FromExpr *joinTree = query->jointree;
Node *quals = NULL; Node *quals = NULL;
bool isFastPath = false; bool isFastPath = false;
bool isDistributedTable = false; bool canAvoidDeparse = false;
Node *distributionKeyValue = NULL; Node *distributionKeyValue = NULL;
RangeTblEntry *rangeTableEntry = NULL; RangeTblEntry *rangeTableEntry = NULL;
@ -206,7 +207,7 @@ FastPathRouterQuery(Query *query, const char *query_string,
{ {
/* we don't need to do any further checks, all INSERTs are fast-path */ /* we don't need to do any further checks, all INSERTs are fast-path */
isFastPath = true; isFastPath = true;
isDistributedTable = true; canAvoidDeparse = true;
goto returnFastPath; goto returnFastPath;
} }
@ -238,18 +239,21 @@ FastPathRouterQuery(Query *query, const char *query_string,
Var *distributionKey = PartitionColumn(distributedTableId, 1); Var *distributionKey = PartitionColumn(distributedTableId, 1);
if (!distributionKey) if (!distributionKey)
{ {
isDistributedTable = IsCitusTableTypeCacheEntry(cacheEntry, canAvoidDeparse = IsCitusTableTypeCacheEntry(cacheEntry,
SINGLE_SHARD_DISTRIBUTED) || SINGLE_SHARD_DISTRIBUTED) ||
IsCitusTableTypeCacheEntry(cacheEntry, REFERENCE_TABLE);
/*(IsCitusTableTypeCacheEntry(cacheEntry, REFERENCE_TABLE) */
/* && (query->commandType == CMD_SELECT)) || */
false;
isFastPath = true; isFastPath = true;
} }
if (!isFastPath) if (!isFastPath)
{ {
isDistributedTable = IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE); canAvoidDeparse = IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE);
if (joinTree == NULL || if (joinTree == NULL ||
(joinTree->quals == NULL && !isDistributedTable)) (joinTree->quals == NULL && !canAvoidDeparse))
{ {
/* no quals, not a fast path query */ /* no quals, not a fast path query */
return false; return false;
@ -300,9 +304,12 @@ returnFastPath:
if (EnableSingShardFastPathPOC) if (EnableSingShardFastPathPOC)
{ {
Assert(rangeTableEntry != NULL); Assert(rangeTableEntry != NULL || query->commandType == CMD_INSERT);
fastPathContext->distTableRte = rangeTableEntry; fastPathContext->distTableRte = rangeTableEntry;
fastPathContext->delayFastPathPlanning = isDistributedTable; canAvoidDeparse = canAvoidDeparse &&
!FindNodeMatchingCheckFunction((Node *) query,
CitusIsVolatileFunction);
fastPathContext->delayFastPathPlanning = canAvoidDeparse;
/* If the dist key is parameterized the query will use the plan cache (todo: verify) */ /* If the dist key is parameterized the query will use the plan cache (todo: verify) */
fastPathContext->clientQueryString = query_string; fastPathContext->clientQueryString = query_string;

View File

@ -243,14 +243,13 @@ CreateFastPathRouterPlan(DistributedPlanningContext *planContext)
planContext->boundParams); planContext->boundParams);
} }
if (distributedPlan->planningError == NULL) distributedPlan->workerJob = job;
{ distributedPlan->combineQuery = NULL;
distributedPlan->workerJob = job; distributedPlan->expectResults = is_select || query->returningList != NIL;
distributedPlan->combineQuery = NULL; distributedPlan->targetRelationId = is_select ? InvalidOid :
distributedPlan->expectResults = is_select || query->returningList != NIL; ResultRelationOidForQuery(query);
distributedPlan->targetRelationId = is_select ? InvalidOid :
ResultRelationOidForQuery(query); ereport(DEBUG2, (errmsg("Creating router plan")));
}
/* todo: handle the case where planningError is not NULL */ /* todo: handle the case where planningError is not NULL */
} }
@ -2052,6 +2051,9 @@ RouterJobFastPath(DistributedPlanningContext *planContext,
Assert(!isMultiShardQuery); Assert(!isMultiShardQuery);
Assert(list_length(shardIntervals) == 1); Assert(list_length(shardIntervals) == 1);
ereport(DEBUG2, (errmsg("Distributed planning for a fast-path router "
"query")));
List *relationShards = RelationShardListForShardIntervalList(shardIntervals, List *relationShards = RelationShardListForShardIntervalList(shardIntervals,
&shardsPresent); &shardsPresent);
Assert(shardsPresent); Assert(shardsPresent);
@ -2064,12 +2066,13 @@ RouterJobFastPath(DistributedPlanningContext *planContext,
fastPathContext->distTableRte->relid); fastPathContext->distTableRte->relid);
Assert(cacheEntry != NULL); Assert(cacheEntry != NULL);
Assert(cacheEntry->relationId == shard->relationId); Assert(cacheEntry->relationId == shard->relationId);
Assert(IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE));
List *taskPlacementList = CreateTaskPlacementListForShardIntervals(shardIntervals, List *taskPlacementList = CreateTaskPlacementListForShardIntervals(shardIntervals,
true, false, shardsPresent,
true,
false); false);
Assert(list_length(taskPlacementList) == 1);
/* Assert(list_length(taskPlacementList) == 1); // not the case for reference tables */
ShardPlacement *primaryPlacement = ShardPlacement *primaryPlacement =
(ShardPlacement *) linitial(taskPlacementList); (ShardPlacement *) linitial(taskPlacementList);
Assert(primaryPlacement->shardId == shardId); Assert(primaryPlacement->shardId == shardId);

View File

@ -289,9 +289,7 @@ CopyTaskQuery(Task *newnode, Task *from)
case TASK_QUERY_LOCAL_PLAN: case TASK_QUERY_LOCAL_PLAN:
{ {
/*COPY_NODE_FIELD(taskQuery.data.localPlan); */ COPY_NODE_FIELD(taskQuery.data.localPlan);
/* This is a local planned statement, so shallow copy is enough */
COPY_SCALAR_FIELD(taskQuery.data.localPlan);
break; break;
} }