POC: add support for schema-based sharding

add GUC (disabled by default) for remote short path
colm/sshard-poc
Colm McHugh 2025-06-05 16:08:06 +00:00
parent 97cbfb33f0
commit 61962fe35c
4 changed files with 35 additions and 9 deletions

View File

@ -54,6 +54,8 @@
bool EnableFastPathRouterPlanner = true; bool EnableFastPathRouterPlanner = true;
bool EnableSingShardFastPathPOC = true; bool EnableSingShardFastPathPOC = true;
bool EnableSingShardFastRemotePOC = false;
static bool ColumnAppearsMultipleTimes(Node *quals, Var *distributionKey); static bool ColumnAppearsMultipleTimes(Node *quals, Var *distributionKey);
static bool DistKeyInSimpleOpExpression(Expr *clause, Var *distColumn, static bool DistKeyInSimpleOpExpression(Expr *clause, Var *distColumn,
@ -171,6 +173,7 @@ FastPathRouterQuery(Query *query, const char *query_string,
bool isFastPath = false; bool isFastPath = false;
bool isDistributedTable = false; bool isDistributedTable = false;
Node *distributionKeyValue = NULL; Node *distributionKeyValue = NULL;
RangeTblEntry *rangeTableEntry = NULL;
if (!EnableFastPathRouterPlanner) if (!EnableFastPathRouterPlanner)
{ {
@ -213,7 +216,7 @@ FastPathRouterQuery(Query *query, const char *query_string,
return false; return false;
} }
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) linitial(query->rtable); rangeTableEntry = (RangeTblEntry *) linitial(query->rtable);
if (rangeTableEntry->rtekind != RTE_RELATION) if (rangeTableEntry->rtekind != RTE_RELATION)
{ {
return false; return false;
@ -235,6 +238,9 @@ FastPathRouterQuery(Query *query, const char *query_string,
Var *distributionKey = PartitionColumn(distributedTableId, 1); Var *distributionKey = PartitionColumn(distributedTableId, 1);
if (!distributionKey) if (!distributionKey)
{ {
isDistributedTable = IsCitusTableTypeCacheEntry(cacheEntry,
SINGLE_SHARD_DISTRIBUTED) ||
IsCitusTableTypeCacheEntry(cacheEntry, REFERENCE_TABLE);
isFastPath = true; isFastPath = true;
} }
@ -294,8 +300,9 @@ returnFastPath:
if (EnableSingShardFastPathPOC) if (EnableSingShardFastPathPOC)
{ {
Assert(rangeTableEntry != NULL);
fastPathContext->distTableRte = rangeTableEntry; fastPathContext->distTableRte = rangeTableEntry;
fastPathContext->delayFastPathPlanning = isDistributedTable; /*&& !fastPathContext->distributionKeyHasParam; */ fastPathContext->delayFastPathPlanning = isDistributedTable;
/* If the dist key is parameterized the query will use the plan cache (todo: verify) */ /* If the dist key is parameterized the query will use the plan cache (todo: verify) */
fastPathContext->clientQueryString = query_string; fastPathContext->clientQueryString = query_string;

View File

@ -2039,7 +2039,7 @@ RouterJobFastPath(DistributedPlanningContext *planContext,
Query *originalQuery = planContext->originalQuery; Query *originalQuery = planContext->originalQuery;
bool isMultiShardQuery = false, shardsPresent = false, bool isMultiShardQuery = false, shardsPresent = false,
distTableWithShardKey = false, isLocalExecution = false; isLocalExecution = false;
Const *partitionKeyValue = NULL; Const *partitionKeyValue = NULL;
Const *distributionKeyValue = fastPathContext->distributionKeyValue; Const *distributionKeyValue = fastPathContext->distributionKeyValue;
uint64 shardId = INVALID_SHARD_ID; uint64 shardId = INVALID_SHARD_ID;
@ -2066,9 +2066,6 @@ RouterJobFastPath(DistributedPlanningContext *planContext,
Assert(cacheEntry->relationId == shard->relationId); Assert(cacheEntry->relationId == shard->relationId);
Assert(IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE)); Assert(IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE));
distTableWithShardKey = HasDistributionKeyCacheEntry(cacheEntry);
Assert(distTableWithShardKey);
List *taskPlacementList = CreateTaskPlacementListForShardIntervals(shardIntervals, List *taskPlacementList = CreateTaskPlacementListForShardIntervals(shardIntervals,
true, false, true, false,
false); false);
@ -2127,12 +2124,19 @@ RouterJobFastPath(DistributedPlanningContext *planContext,
} }
else else
{ {
SetTaskQueryString(task, (char *) fastPathContext->clientQueryString);
/* Call fast path query planner, Save plan in planContext->plan */ /* Call fast path query planner, Save plan in planContext->plan */
planContext->plan = FastPathPlanner(planContext->originalQuery, planContext->plan = FastPathPlanner(planContext->originalQuery,
planContext->query, planContext->query,
planContext->boundParams); planContext->boundParams);
if (EnableSingShardFastRemotePOC)
{
SetTaskQueryString(task, (char *) fastPathContext->clientQueryString);
}
else
{
UpdateRelationToShardNames((Node *) job->jobQuery, relationShards);
SetTaskQueryIfShouldLazyDeparse(task, job->jobQuery);
}
} }
task->anchorShardId = shardId; task->anchorShardId = shardId;
task->jobId = job->jobId; task->jobId = job->jobId;
@ -2165,9 +2169,12 @@ ReplaceShardRelationId(Query *query, Oid citusTableOid, Oid shardId)
char *shardRelationName = pstrdup(citusTableName); char *shardRelationName = pstrdup(citusTableName);
AppendShardIdToName(&shardRelationName, shardId); AppendShardIdToName(&shardRelationName, shardId);
/* construct the schema name */
char *schemaName = get_namespace_name(get_rel_namespace(citusTableOid));
RangeVar shardRangeVar = { RangeVar shardRangeVar = {
.relname = shardRelationName, .relname = shardRelationName,
.schemaname = NULL, /* todo - should initialize this ? get_rel_namespace(shardRelationId), */ .schemaname = schemaName,
.inh = rte->inh, .inh = rte->inh,
.relpersistence = RELPERSISTENCE_PERMANENT, .relpersistence = RELPERSISTENCE_PERMANENT,
}; };

View File

@ -1366,6 +1366,17 @@ RegisterCitusConfigVariables(void)
GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE, GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE,
NULL, NULL, NULL); NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.enable_single_shard_poc_remote",
gettext_noop("Enables execution shortcuts for single shard "
"queries in the proof of concept mode - remote case."),
NULL,
&EnableSingShardFastRemotePOC,
false,
PGC_USERSET,
GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE,
NULL, NULL, NULL);
DefineCustomBoolVariable( DefineCustomBoolVariable(
"citus.enable_single_shard_poc", "citus.enable_single_shard_poc",
gettext_noop("Enables execution shortcuts for single shard " gettext_noop("Enables execution shortcuts for single shard "

View File

@ -29,6 +29,7 @@
extern bool EnableRouterExecution; extern bool EnableRouterExecution;
extern bool EnableFastPathRouterPlanner; extern bool EnableFastPathRouterPlanner;
extern bool EnableSingShardFastPathPOC; extern bool EnableSingShardFastPathPOC;
extern bool EnableSingShardFastRemotePOC;
extern bool EnableNonColocatedRouterQueryPushdown; extern bool EnableNonColocatedRouterQueryPushdown;