Check that attributes are the same in citus and shard tables.

Retain query in task for EXPLAIN ANALYZE and debug messages.
Add debug messages so that tests in `local_shard_execution`
reflect that the shortcut is applied (happy path) and tests in
`local_shard_execution_dropped_column` reflect that the
shortcut cannot be applied.
Colm McHugh 2025-06-27 13:14:59 +00:00
parent 55a68b6ec5
commit b5be2c0961
17 changed files with 807 additions and 114 deletions
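As a minimal illustrative sketch (not part of the commit), assuming a session on the worker node that holds the shard matching a = 5 for the `test_tbl` table created in the new regression test below, the shortcut and the debug messages added here can be observed as follows:

SET client_min_messages TO DEBUG2;
SET citus.enable_local_execution_local_plan TO ON; -- GUC registered in this commit; ON is the default
SELECT count(*) FROM test_tbl WHERE a = 5;
-- debug lines emitted when the local-plan shortcut applies:
-- DEBUG:  Fast-path router query: created local execution plan to avoid deparse to and compile of shard query
-- DEBUG:  Local executor: Using task's cached local plan for task 0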

View File

@ -363,6 +363,9 @@ ExecuteLocalTaskListExtended(List *taskList,
}
else
{
ereport(DEBUG2, (errmsg(
"Local executor: Using task's cached local plan for task %u",
task->taskId)));
localPlan = TaskQueryLocalPlan(task);
}
}

View File

@ -440,11 +440,12 @@ SetTaskQueryStringList(Task *task, List *queryStringList)
void
SetTaskQueryPlan(Task *task, PlannedStmt *localPlan)
SetTaskQueryPlan(Task *task, Query *query, PlannedStmt *localPlan)
{
Assert(localPlan != NULL);
task->taskQuery.queryType = TASK_QUERY_LOCAL_PLAN;
task->taskQuery.data.localPlan = localPlan;
task->taskQuery.data.jobQueryReferenceForLazyDeparsing = query;
task->localPlan = localPlan;
task->queryCount = 1;
}
@ -453,7 +454,7 @@ PlannedStmt *
TaskQueryLocalPlan(Task *task)
{
Assert(task->taskQuery.queryType == TASK_QUERY_LOCAL_PLAN);
return task->taskQuery.data.localPlan;
return task->localPlan;
}
@ -515,8 +516,6 @@ TaskQueryStringAtIndex(Task *task, int index)
}
static char *qry_unavailable_msg = "SELECT 'Task query unavailable - optimized away'";
/*
* TaskQueryString generates task query string text if missing.
*
@ -546,7 +545,14 @@ TaskQueryString(Task *task)
}
else if (taskQueryType == TASK_QUERY_LOCAL_PLAN)
{
return qry_unavailable_msg;
Query *query = task->taskQuery.data.jobQueryReferenceForLazyDeparsing;
Assert(query != NULL);
MemoryContext previousContext = MemoryContextSwitchTo(GetMemoryChunkContext(
query));
UpdateRelationToShardNames((Node *) query, task->relationShardList);
MemoryContextSwitchTo(previousContext);
return AnnotateQuery(DeparseTaskQuery(task, query),
task->partitionKeyValue, task->colocationId);
}
Query *jobQueryReferenceForLazyDeparsing =
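A hedged usage sketch (reusing `test_tbl` from the regression test added below): because the Query and the relation shard list now stay on the task, the shard query text can be deparsed lazily when something actually needs it, such as EXPLAIN ANALYZE or the local-execution NOTICE, instead of returning the removed placeholder string.

-- illustrative only
SET client_min_messages TO DEBUG2;
EXPLAIN (ANALYZE, COSTS OFF, TIMING OFF)
SELECT count(*) FROM test_tbl WHERE a = 5;
-- before this change the task text for a locally planned task was the constant
--   SELECT 'Task query unavailable - optimized away'
-- it is now produced on demand from the retained Query via
-- UpdateRelationToShardNames() and DeparseTaskQuery()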

View File

@ -653,6 +653,10 @@ PlanFastPathDistributedStmt(DistributedPlanningContext *planContext)
{
FastPathRestrictionContext *fastPathContext =
planContext->plannerRestrictionContext->fastPathRestrictionContext;
Assert(fastPathContext != NULL);
Assert(fastPathContext->fastPathRouterQuery);
FastPathPreprocessParseTree(planContext->query);
if (!fastPathContext->delayFastPathPlanning)
{
@ -2421,9 +2425,16 @@ CreateAndPushPlannerRestrictionContext(
if (fastPathRestrictionContext != NULL)
{
/* copy the given fast path restriction context */
memcpy(plannerRestrictionContext->fastPathRestrictionContext,
fastPathRestrictionContext,
sizeof(FastPathRestrictionContext));
FastPathRestrictionContext *plannersFastPathCtx =
plannerRestrictionContext->fastPathRestrictionContext;
plannersFastPathCtx->fastPathRouterQuery =
fastPathRestrictionContext->fastPathRouterQuery;
plannersFastPathCtx->distributionKeyValue =
fastPathRestrictionContext->distributionKeyValue;
plannersFastPathCtx->distributionKeyHasParam =
fastPathRestrictionContext->distributionKeyHasParam;
plannersFastPathCtx->delayFastPathPlanning =
fastPathRestrictionContext->delayFastPathPlanning;
}
plannerRestrictionContext->memoryContext = CurrentMemoryContext;

View File

@ -63,6 +63,19 @@ static bool ConjunctionContainsColumnFilter(Node *node,
Var *column,
Node **distributionKeyValue);
void
FastPathPreprocessParseTree(Query *parse)
{
/*
* Citus planner relies on some of the transformations on constant
* evaluation on the parse tree.
*/
parse->targetList =
(List *) eval_const_expressions(NULL, (Node *) parse->targetList);
parse->jointree->quals =
(Node *) eval_const_expressions(NULL, (Node *) parse->jointree->quals);
}
/*
* FastPathPlanner is intended to be used instead of standard_planner() for trivial
@ -75,15 +88,6 @@ static bool ConjunctionContainsColumnFilter(Node *node,
PlannedStmt *
FastPathPlanner(Query *originalQuery, Query *parse, ParamListInfo boundParams)
{
/*
* Citus planner relies on some of the transformations on constant
* evaluation on the parse tree.
*/
parse->targetList =
(List *) eval_const_expressions(NULL, (Node *) parse->targetList);
parse->jointree->quals =
(Node *) eval_const_expressions(NULL, (Node *) parse->jointree->quals);
PlannedStmt *result = GeneratePlaceHolderPlannedStmt(originalQuery);
return result;
@ -148,6 +152,51 @@ GeneratePlaceHolderPlannedStmt(Query *parse)
}
static void
InitializeFastPathContext(FastPathRestrictionContext *fastPathContext,
Node *distributionKeyValue,
bool canAvoidDeparse,
Query *query)
{
Assert(fastPathContext != NULL);
Assert(!fastPathContext->fastPathRouterQuery);
Assert(!fastPathContext->delayFastPathPlanning);
/*
* We're looking at a fast path query, so we can fill the
* fastPathContext with relevant details.
*/
fastPathContext->fastPathRouterQuery = true;
if (distributionKeyValue == NULL)
{
/* nothing to record */
}
else if (IsA(distributionKeyValue, Const))
{
fastPathContext->distributionKeyValue = (Const *) distributionKeyValue;
}
else if (IsA(distributionKeyValue, Param))
{
fastPathContext->distributionKeyHasParam = true;
}
if (EnableFastPathLocalExecutor)
{
/*
* This fast path query may be executed by the local executor.
* We need to delay the fast path planning until we know if the
* shard is local or not. Make a final check for volatile
* functions in the query tree to determine if we should delay
* the fast path planning.
*/
fastPathContext->delayFastPathPlanning = canAvoidDeparse &&
!FindNodeMatchingCheckFunction(
(Node *) query,
CitusIsVolatileFunction);
}
}
/*
* FastPathRouterQuery gets a query and returns true if the query is eligible for
* being a fast path router query. It also fills the given fastPathContext with
@ -175,7 +224,6 @@ FastPathRouterQuery(Query *query, FastPathRestrictionContext *fastPathContext)
bool isFastPath = false;
bool canAvoidDeparse = false;
Node *distributionKeyValue = NULL;
RangeTblEntry *rangeTableEntry = NULL;
if (!EnableFastPathRouterPlanner)
{
@ -207,8 +255,8 @@ FastPathRouterQuery(Query *query, FastPathRestrictionContext *fastPathContext)
else if (query->commandType == CMD_INSERT)
{
/* we don't need to do any further checks, all INSERTs are fast-path */
isFastPath = true;
goto returnFastPath;
InitializeFastPathContext(fastPathContext, NULL, true, query);
return true;
}
/* make sure that the only range table in FROM clause */
@ -217,7 +265,7 @@ FastPathRouterQuery(Query *query, FastPathRestrictionContext *fastPathContext)
return false;
}
rangeTableEntry = (RangeTblEntry *) linitial(query->rtable);
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) linitial(query->rtable);
if (rangeTableEntry->rtekind != RTE_RELATION)
{
return false;
@ -281,52 +329,10 @@ FastPathRouterQuery(Query *query, FastPathRestrictionContext *fastPathContext)
!ColumnAppearsMultipleTimes(quals, distributionKey));
}
returnFastPath:
if (isFastPath)
{
Assert(fastPathContext != NULL);
Assert(!fastPathContext->fastPathRouterQuery);
Assert(!fastPathContext->delayFastPathPlanning);
/*
* We're looking at a fast path query, so we can fill the
* fastPathContext with relevant details.
*/
fastPathContext->fastPathRouterQuery = true;
if (distributionKeyValue == NULL)
{
/* nothing to record */
}
else if (IsA(distributionKeyValue, Const))
{
fastPathContext->distributionKeyValue = (Const *) distributionKeyValue;
}
else if (IsA(distributionKeyValue, Param))
{
fastPathContext->distributionKeyHasParam = true;
}
/*
* Note the range table entry for the table we're querying.
*/
Assert(rangeTableEntry != NULL || query->commandType == CMD_INSERT);
fastPathContext->distTableRte = rangeTableEntry;
if (EnableFastPathLocalExecutor)
{
/*
* This fast path query may be executed by the local executor.
* We need to delay the fast path planning until we know if the
* shard is local or not. Make a final check for volatile
* functions in the query tree to determine if we should delay
* the fast path planning.
*/
fastPathContext->delayFastPathPlanning = canAvoidDeparse &&
!FindNodeMatchingCheckFunction(
(Node *) query,
CitusIsVolatileFunction);
}
InitializeFastPathContext(fastPathContext, distributionKeyValue, canAvoidDeparse,
query);
}
return isFastPath;
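A hedged sketch of the volatile-function guard in InitializeFastPathContext() above, again borrowing `test_tbl` from the regression test below and assuming the matching shard is local to the node: a query tree that contains a volatile function leaves delayFastPathPlanning unset, so it keeps the regular deparse-and-plan path.

-- illustrative only
SET client_min_messages TO DEBUG2;
SELECT count(*) FROM test_tbl WHERE a = 5;  -- no volatile functions: planning can be delayed for a local plan
SELECT random() FROM test_tbl WHERE a = 5;  -- random() is volatile: fast-path planning is not delayed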

View File

@ -16,6 +16,8 @@
#include "postgres.h"
#include "access/stratnum.h"
#include "access/tupdesc.h"
#include "access/tupdesc_details.h"
#include "access/xact.h"
#include "catalog/pg_opfamily.h"
#include "catalog/pg_proc.h"
@ -175,7 +177,7 @@ static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
static bool ModifiesLocalTableWithRemoteCitusLocalTable(List *rangeTableList);
static DeferredErrorMessage * DeferErrorIfUnsupportedLocalTableJoin(List *rangeTableList);
static bool IsLocallyAccessibleCitusLocalTable(Oid relationId);
static Query * ConvertToQueryOnShard(Query *query, Oid relationID, Oid shardRelationId);
static bool ConvertToQueryOnShard(Query *query, Oid relationID, Oid shardRelationId);
/*
* CreateRouterPlan attempts to create a router executor plan for the given
@ -1952,6 +1954,75 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon
}
/*
* CheckAttributesMatch checks if the attributes of the Citus table and the shard
* table match.
*
* It is used to ensure that the shard table has the same schema as the Citus
* table before replacing the Citus table OID with the shard table OID in the
* parse tree we (the Citus planner) received from Postgres.
*/
static bool
CheckAttributesMatch(Oid citusTableId, Oid shardTableId)
{
Relation citusR, shardR;
bool same_schema = false;
citusR = RelationIdGetRelation(citusTableId);
shardR = RelationIdGetRelation(shardTableId);
if (RelationIsValid(citusR) && RelationIsValid(shardR))
{
TupleDesc citusTupDesc = citusR->rd_att;
TupleDesc shardTupDesc = shardR->rd_att;
if (citusTupDesc->natts == shardTupDesc->natts)
{
/*
* Do an attribute-by-attribute comparison. This is borrowed from
* the Postgres function equalTupleDescs(), which we cannot use
* because the citus table and shard table have different composite
* types.
*/
same_schema = true;
for (int i = 0; i < citusTupDesc->natts && same_schema; i++)
{
Form_pg_attribute attr1 = TupleDescAttr(citusTupDesc, i);
Form_pg_attribute attr2 = TupleDescAttr(shardTupDesc, i);
if (strcmp(NameStr(attr1->attname), NameStr(attr2->attname)) != 0)
{
same_schema = false;
}
if (attr1->atttypid != attr2->atttypid)
{
same_schema = false;
}
if (attr1->atttypmod != attr2->atttypmod)
{
same_schema = false;
}
if (attr1->attcollation != attr2->attcollation)
{
same_schema = false;
}
/* Record types derived from tables could have dropped fields. */
if (attr1->attisdropped != attr2->attisdropped)
{
same_schema = false;
}
}
}
}
if (RelationIsValid(citusR))
{
RelationClose(citusR);
}
if (RelationIsValid(shardR))
{
RelationClose(shardR);
}
return same_schema;
}
void
CheckAndBuildDelayedFastPathPlan(DistributedPlanningContext *planContext,
DistributedPlan *plan)
@ -1969,7 +2040,7 @@ CheckAndBuildDelayedFastPathPlan(DistributedPlanningContext *planContext,
if (job->deferredPruning)
{
/* Call fast path query planner, Save plan in planContext->plan */
/* Execution time pruning => don't know which shard at this point */
planContext->plan = FastPathPlanner(planContext->originalQuery,
planContext->query,
planContext->boundParams);
@ -1983,35 +2054,44 @@ CheckAndBuildDelayedFastPathPlan(DistributedPlanningContext *planContext,
Assert(list_length(placements) > 0);
int32 localGroupId = GetLocalGroupId();
ShardPlacement *primaryPlacement = (ShardPlacement *) linitial(placements);
List *relationShards = task->relationShardList;
Assert(list_length(relationShards) == 1);
bool isLocalExecution = (primaryPlacement->groupId == localGroupId);
bool isLocalExecution = !IsDummyPlacement(primaryPlacement) &&
(primaryPlacement->groupId == localGroupId);
bool canBuildLocalPlan = true;
if (isLocalExecution)
{
ConvertToQueryOnShard(planContext->query,
fastPathContext->distTableRte->relid,
primaryPlacement->shardId);
List *relationShards = task->relationShardList;
Assert(list_length(relationShards) == 1);
RelationShard *relationShard = (RelationShard *) linitial(relationShards);
Assert(relationShard->shardId == primaryPlacement->shardId);
/* Plan the query with the new shard relation id */
/* Save plan in planContext->plan */
planContext->plan = standard_planner(planContext->query, NULL,
planContext->cursorOptions,
planContext->boundParams);
SetTaskQueryPlan(task, planContext->plan);
canBuildLocalPlan = ConvertToQueryOnShard(planContext->query,
relationShard->relationId,
relationShard->shardId);
if (canBuildLocalPlan)
{
/* Plan the query with the new shard relation id */
planContext->plan = standard_planner(planContext->query, NULL,
planContext->cursorOptions,
planContext->boundParams);
SetTaskQueryPlan(task, job->jobQuery, planContext->plan);
ereport(DEBUG2, (errmsg("Local plan for fast-path router "
"query")));
}
else
{
/* Call fast path query planner, Save plan in planContext->plan */
planContext->plan = FastPathPlanner(planContext->originalQuery,
planContext->query,
planContext->boundParams);
UpdateRelationToShardNames((Node *) job->jobQuery, relationShards);
SetTaskQueryIfShouldLazyDeparse(task, job->jobQuery);
ereport(DEBUG2, (errmsg(
"Fast-path router query: created local execution plan "
"to avoid deparse to and compile of shard query")));
return;
}
}
Assert(!isLocalExecution || (isLocalExecution && !canBuildLocalPlan));
/* Fall back to fast path planner and generating SQL query on the shard */
planContext->plan = FastPathPlanner(planContext->originalQuery,
planContext->query,
planContext->boundParams);
UpdateRelationToShardNames((Node *) job->jobQuery, task->relationShardList);
SetTaskQueryIfShouldLazyDeparse(task, job->jobQuery);
}
@ -2029,7 +2109,7 @@ CheckAndBuildDelayedFastPathPlan(DistributedPlanningContext *planContext,
* changes the RTEPermissionInfo's relid to the shard's relation id also.
* At this point the Query is ready for the postgres planner.
*/
static Query *
static bool
ConvertToQueryOnShard(Query *query, Oid citusTableOid, Oid shardId)
{
Assert(list_length(query->rtable) == 1);
@ -2060,6 +2140,27 @@ ConvertToQueryOnShard(Query *query, Oid citusTableOid, Oid shardId)
citusTableRte->rellockmode,
0, NULL, NULL); /* todo - use suitable callback for perms check? */
/* Verify that the attributes of citus table and shard table match */
if (!CheckAttributesMatch(citusTableOid, shardRelationId))
{
/*
* There is a difference between the attributes of the citus
* table and the shard table. This can happen if there is a DROP
* COLUMN on the citus table. In this case, we cannot convert the
* query to a shard query, so clean up and return.
*/
UnlockRelationOid(shardRelationId, citusTableRte->rellockmode);
ereport(DEBUG2, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg(
"Router planner fast path cannot modify parse tree for local execution: shard table \"%s.%s\" does not match the "
"distributed table \"%s.%s\"",
schemaName, shardRelationName, schemaName,
citusTableName)));
pfree(shardRelationName);
pfree(schemaName);
return false;
}
/* Change the target list entries that reference the original citus table's relation id */
ListCell *lc = NULL;
foreach(lc, query->targetList)
@ -2071,7 +2172,6 @@ ConvertToQueryOnShard(Query *query, Oid citusTableOid, Oid shardId)
}
}
/* Change the range table entry's oid to that of the shard's */
Assert(shardRelationId != InvalidOid);
citusTableRte->relid = shardRelationId;
@ -2084,7 +2184,7 @@ ConvertToQueryOnShard(Query *query, Oid citusTableOid, Oid shardId)
rtePermInfo->relid = shardRelationId;
#endif
return query;
return true;
}
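A hedged sketch of the fallback path above, mirroring the local_shard_execution_dropped_column test further down: when the distributed table and its local shard disagree on attributes (for example, only one of them physically carries a dropped column), CheckAttributesMatch() returns false, ConvertToQueryOnShard() backs out, and the planner falls back to deparsing the shard query.

-- illustrative only; t1 and its shard t1_2460000 come from the dropped-column test
SET client_min_messages TO DEBUG2;
SELECT count(*) FROM t1 WHERE c = 8 GROUP BY c;
-- DEBUG:  Router planner fast path cannot modify parse tree for local execution:
--         shard table "local_shard_execution_dropped_column.t1_2460000" does not match
--         the distributed table "local_shard_execution_dropped_column.t1"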

View File

@ -212,7 +212,7 @@ static const char * MaxSharedPoolSizeGucShowHook(void);
static const char * LocalPoolSizeGucShowHook(void);
static bool StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource
source);
static bool ErrorIfLocalExectionDisabled(bool *newval, void **extra, GucSource source);
static bool WarnIfLocalExecutionDisabled(bool *newval, void **extra, GucSource source);
static void CitusAuthHook(Port *port, int status);
static bool IsSuperuser(char *userName);
static void AdjustDynamicLibraryPathForCdcDecoders(void);
@ -1379,7 +1379,7 @@ RegisterCitusConfigVariables(void)
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.enable_fast_path_local_execution",
"citus.enable_local_execution_local_plan",
gettext_noop("Enables the planner to avoid a query deparse and planning if "
"the shard is local to the current node."),
NULL,
@ -1387,7 +1387,7 @@ RegisterCitusConfigVariables(void)
true,
PGC_USERSET,
GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE,
ErrorIfLocalExectionDisabled, NULL, NULL);
WarnIfLocalExecutionDisabled, NULL, NULL);
DefineCustomBoolVariable(
"citus.enable_local_reference_table_foreign_keys",
@ -2815,16 +2815,14 @@ WarnIfDeprecatedExecutorUsed(int *newval, void **extra, GucSource source)
static bool
ErrorIfLocalExectionDisabled(bool *newval, void **extra, GucSource source)
WarnIfLocalExecutionDisabled(bool *newval, void **extra, GucSource source)
{
if (*newval == true && EnableLocalExecution == false)
{
ereport(WARNING, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg(
"citus.enable_local_execution must be set in order for "
"citus.enable_fast_path_local_execution to be effective.")));
return false;
"citus.enable_local_execution_local_plan to be effective.")));
}
return true;
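A hedged sketch of when the renamed check hook fires, using only the GUC names registered above:

-- illustrative only
SET citus.enable_local_execution TO OFF;
SET citus.enable_local_execution_local_plan TO ON;
-- WARNING:  citus.enable_local_execution must be set in order for
--           citus.enable_local_execution_local_plan to be effective.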

View File

@ -289,7 +289,8 @@ CopyTaskQuery(Task *newnode, Task *from)
case TASK_QUERY_LOCAL_PLAN:
{
COPY_NODE_FIELD(taskQuery.data.localPlan);
COPY_NODE_FIELD(localPlan);
COPY_NODE_FIELD(taskQuery.data.jobQueryReferenceForLazyDeparsing);
break;
}

View File

@ -27,7 +27,7 @@ extern bool UpdateRelationToShardNames(Node *node, List *relationShardList);
extern void SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query);
extern void SetTaskQueryString(Task *task, char *queryString);
extern void SetTaskQueryStringList(Task *task, List *queryStringList);
extern void SetTaskQueryPlan(Task *task, PlannedStmt *localPlan);
extern void SetTaskQueryPlan(Task *task, Query *query, PlannedStmt *localPlan);
extern char * TaskQueryString(Task *task);
extern PlannedStmt * TaskQueryLocalPlan(Task *task);
extern char * TaskQueryStringAtIndex(Task *task, int index);

View File

@ -101,15 +101,10 @@ typedef struct FastPathRestrictionContext
bool distributionKeyHasParam;
/*
* Indicates to hold off on callning the fast path planner until its
* known if the shard is local
* Indicates to hold off calling the fast path planner until it's
* known if the shard is local or not.
*/
bool delayFastPathPlanning;
/*
* Range table entry for the table we're querying
*/
RangeTblEntry *distTableRte;
} FastPathRestrictionContext;
typedef struct PlannerRestrictionContext

View File

@ -220,8 +220,6 @@ typedef struct TaskQuery
* when we want to access each query string.
*/
List *queryStringList;
PlannedStmt *localPlan; /* only applies to local tasks */
}data;
}TaskQuery;
@ -337,6 +335,8 @@ typedef struct Task
Const *partitionKeyValue;
int colocationId;
PlannedStmt *localPlan; /* only applies to local tasks */
} Task;

View File

@ -100,6 +100,7 @@ extern void GenerateSingleShardRouterTaskList(Job *job,
* keep the external function here.
*/
extern PlannedStmt * GeneratePlaceHolderPlannedStmt(Query *parse);
extern void FastPathPreprocessParseTree(Query *parse);
extern PlannedStmt * FastPathPlanner(Query *originalQuery, Query *parse, ParamListInfo
boundParams);
extern bool FastPathRouterQuery(Query *query,

View File

@ -0,0 +1,190 @@
-- Test local execution with local plan in a sharded environment.
-- This is an enhancement to local execution where instead of deparsing
-- and compiling the shard query, the planner replaces the OID of the
-- distributed table with the OID of the local shard in the parse tree
-- and plans that.
--
-- https://github.com/citusdata/citus/pull/8035
CREATE SCHEMA local_shard_execution_local_plan;
SET search_path TO local_shard_execution_local_plan;
SET citus.shard_count TO 4;
SET citus.next_shard_id TO 86000000;
-- Test row-based sharding
SET citus.shard_replication_factor TO 1;
CREATE TABLE test_tbl (a int, b int, data_f double precision);
SELECT create_distributed_table('test_tbl', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT setseed(0.42); -- make the random data inserted deterministic
setseed
---------------------------------------------------------------------
(1 row)
INSERT INTO test_tbl
SELECT (random()*20)::int AS a,
(random()*20)::int AS b,
random()*10000.0 AS data_f
FROM generate_series(1, 10000);
\c - - - :worker_1_port
SET search_path TO local_shard_execution_local_plan;
SET client_min_messages TO DEBUG2;
-- This query resolves to a single shard (aka fast path)
-- which is located on worker_1; with client_min_messages
-- at DEBUG2 we see a message that the planner is avoiding
-- query deparse and plan
SELECT b, AVG(data_f), MIN(data_f), MAX(data_f), COUNT(1)
FROM test_tbl
WHERE a = 5 AND b IN (1,3,5,8,13,21)
GROUP BY b
ORDER BY b;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: query has a single distribution column value: 5
b | avg | min | max | count
---------------------------------------------------------------------
1 | 4681.15766751073 | 940.198933762051 | 9831.90612899621 | 32
3 | 5206.08657105902 | 112.310356057681 | 9754.46118693233 | 30
5 | 5986.24362651021 | 1076.80441867932 | 9429.56328271469 | 21
8 | 4312.90603684942 | 176.005977242861 | 9606.9697868185 | 19
13 | 4793.76060079111 | 155.283954878487 | 9665.6748114535 | 26
(5 rows)
SET citus.enable_local_execution_local_plan TO OFF;
-- With local execution local plan disabled, the same query
-- does query deparse and planning of the shard query and
-- provides the same results
SELECT b, AVG(data_f), MIN(data_f), MAX(data_f), COUNT(1)
FROM test_tbl
WHERE a = 5 AND b IN (1,3,5,8,13,21)
GROUP BY b
ORDER BY b;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: query has a single distribution column value: 5
b | avg | min | max | count
---------------------------------------------------------------------
1 | 4681.15766751073 | 940.198933762051 | 9831.90612899621 | 32
3 | 5206.08657105902 | 112.310356057681 | 9754.46118693233 | 30
5 | 5986.24362651021 | 1076.80441867932 | 9429.56328271469 | 21
8 | 4312.90603684942 | 176.005977242861 | 9606.9697868185 | 19
13 | 4793.76060079111 | 155.283954878487 | 9665.6748114535 | 26
(5 rows)
\c - - - :worker_2_port
SET search_path TO local_shard_execution_local_plan;
SET client_min_messages TO DEBUG2;
-- Run the same query on the other worker - the local
-- execution path is not taken because the shard is not
-- local to this worker
SELECT b, AVG(data_f), MIN(data_f), MAX(data_f), COUNT(1)
FROM test_tbl
WHERE a = 5 AND b IN (1,3,5,8,13,21)
GROUP BY b
ORDER BY b;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Fast-path router query: created local execution plan to avoid deparse to and compile of shard query
DEBUG: query has a single distribution column value: 5
DEBUG: Local executor: Using task's cached local plan for task 0
b | avg | min | max | count
---------------------------------------------------------------------
1 | 4681.15766751073 | 940.198933762051 | 9831.90612899621 | 32
3 | 5206.08657105902 | 112.310356057681 | 9754.46118693233 | 30
5 | 5986.24362651021 | 1076.80441867932 | 9429.56328271469 | 21
8 | 4312.90603684942 | 176.005977242861 | 9606.9697868185 | 19
13 | 4793.76060079111 | 155.283954878487 | 9665.6748114535 | 26
(5 rows)
\c - - - :master_port
DROP SCHEMA local_shard_execution_local_plan CASCADE;
NOTICE: drop cascades to table local_shard_execution_local_plan.test_tbl
-- Now test local execution with local plan for a schema sharded table.
SET citus.enable_schema_based_sharding to on;
CREATE SCHEMA schema_sharding_test;
SET search_path TO schema_sharding_test;
SET citus.shard_replication_factor TO 1;
CREATE TABLE test_tbl (a int, b int, data_f double precision);
SELECT setseed(0.42); -- make the random data inserted deterministic
setseed
---------------------------------------------------------------------
(1 row)
INSERT INTO test_tbl
SELECT (random()*20)::int AS a,
(random()*20)::int AS b,
random()*10000.0 AS data_f
FROM generate_series(1, 10000);
\c - - - :worker_1_port
SET client_min_messages TO DEBUG2;
-- Run the test query on worker_1; with schema based sharding
-- the data is not local to this worker so local execution
-- path is not taken.
SELECT b, AVG(data_f), MIN(data_f), MAX(data_f), COUNT(1)
FROM schema_sharding_test.test_tbl
WHERE a = 5 AND b IN (1,3,5,8,13,21)
GROUP BY b
ORDER BY b;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Fast-path router query: created local execution plan to avoid deparse to and compile of shard query
DEBUG: Local executor: Using task's cached local plan for task 0
b | avg | min | max | count
---------------------------------------------------------------------
1 | 4681.15766751073 | 940.198933762051 | 9831.90612899621 | 32
3 | 5206.08657105902 | 112.310356057681 | 9754.46118693233 | 30
5 | 5986.24362651021 | 1076.80441867932 | 9429.56328271469 | 21
8 | 4312.90603684942 | 176.005977242861 | 9606.9697868185 | 19
13 | 4793.76060079111 | 155.283954878487 | 9665.6748114535 | 26
(5 rows)
\c - - - :worker_2_port
SET client_min_messages TO DEBUG2;
-- Run the test query on worker_2; with schema based sharding
-- the data is local to this worker so local execution
-- path is taken, and the planner avoids query deparse and
-- planning of the shard query.
SELECT b, AVG(data_f), MIN(data_f), MAX(data_f), COUNT(1)
FROM schema_sharding_test.test_tbl
WHERE a = 5 AND b IN (1,3,5,8,13,21)
GROUP BY b
ORDER BY b;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
b | avg | min | max | count
---------------------------------------------------------------------
1 | 4681.15766751073 | 940.198933762051 | 9831.90612899621 | 32
3 | 5206.08657105902 | 112.310356057681 | 9754.46118693233 | 30
5 | 5986.24362651021 | 1076.80441867932 | 9429.56328271469 | 21
8 | 4312.90603684942 | 176.005977242861 | 9606.9697868185 | 19
13 | 4793.76060079111 | 155.283954878487 | 9665.6748114535 | 26
(5 rows)
SET citus.enable_local_execution_local_plan TO OFF;
-- Run the test query on worker_2 but with local execution
-- local plan disabled; now the planner does query deparse
-- and planning of the shard query.
SELECT b, AVG(data_f), MIN(data_f), MAX(data_f), COUNT(1)
FROM schema_sharding_test.test_tbl
WHERE a = 5 AND b IN (1,3,5,8,13,21)
GROUP BY b
ORDER BY b;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
b | avg | min | max | count
---------------------------------------------------------------------
1 | 4681.15766751073 | 940.198933762051 | 9831.90612899621 | 32
3 | 5206.08657105902 | 112.310356057681 | 9754.46118693233 | 30
5 | 5986.24362651021 | 1076.80441867932 | 9429.56328271469 | 21
8 | 4312.90603684942 | 176.005977242861 | 9606.9697868185 | 19
13 | 4793.76060079111 | 155.283954878487 | 9665.6748114535 | 26
(5 rows)
\c - - - :master_port
DROP SCHEMA schema_sharding_test CASCADE;
NOTICE: drop cascades to table schema_sharding_test.test_tbl
RESET ALL;

View File

@ -2410,8 +2410,10 @@ NOTICE: executing the command locally: UPDATE local_shard_execution.event_respo
SELECT count(*) FROM event_responses WHERE event_id = 16;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Fast-path router query: created local execution plan to avoid deparse to and compile of shard query
DEBUG: query has a single distribution column value: 16
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution.event_responses_1480001 event_responses WHERE (event_id OPERATOR(pg_catalog.=) 16)
DEBUG: Local executor: Using task's cached local plan for task 0
count
---------------------------------------------------------------------
1
@ -2420,8 +2422,10 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar
SELECT count(*) FROM event_responses WHERE event_id = 16;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Fast-path router query: created local execution plan to avoid deparse to and compile of shard query
DEBUG: query has a single distribution column value: 16
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution.event_responses_1480001 event_responses WHERE (event_id OPERATOR(pg_catalog.=) 16)
DEBUG: Local executor: Using task's cached local plan for task 0
count
---------------------------------------------------------------------
1
@ -2430,8 +2434,10 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar
UPDATE event_responses SET response = 'no' WHERE event_id = 16;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Fast-path router query: created local execution plan to avoid deparse to and compile of shard query
DEBUG: query has a single distribution column value: 16
NOTICE: executing the command locally: UPDATE local_shard_execution.event_responses_1480001 event_responses SET response = 'no'::local_shard_execution.invite_resp WHERE (event_id OPERATOR(pg_catalog.=) 16)
DEBUG: Local executor: Using task's cached local plan for task 0
INSERT INTO event_responses VALUES (16, 666, 'maybe')
ON CONFLICT (event_id, user_id)
DO UPDATE SET response = EXCLUDED.response RETURNING *;
@ -2529,8 +2535,10 @@ SET citus.log_remote_commands TO ON;
SELECT * FROM event_responses_no_pkey WHERE event_id = 2;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Fast-path router query: created local execution plan to avoid deparse to and compile of shard query
DEBUG: query has a single distribution column value: 2
NOTICE: executing the command locally: SELECT event_id, user_id, response FROM local_shard_execution.event_responses_no_pkey_1480007 event_responses_no_pkey WHERE (event_id OPERATOR(pg_catalog.=) 2)
DEBUG: Local executor: Using task's cached local plan for task 0
event_id | user_id | response
---------------------------------------------------------------------
(0 rows)

View File

@ -193,8 +193,102 @@ execute p4(8);
NOTICE: executing the command locally: UPDATE local_shard_execution_dropped_column.t1_2460000 t1 SET a = (a OPERATOR(pg_catalog.+) 1) WHERE (c OPERATOR(pg_catalog.=) 8)
execute p4(8);
NOTICE: executing the command locally: UPDATE local_shard_execution_dropped_column.t1_2460000 t1 SET a = (a OPERATOR(pg_catalog.+) 1) WHERE (c OPERATOR(pg_catalog.=) 8)
-- Test that "Avoid deparse and planning of shard query for local execution" (*)
-- does not take the fast path of modifying the parse tree with the shard OID, as
-- the dropped column means the attribute check between the distributed table and
-- shard fails. With client_min_messages at DEBUG2, we see "cannot modify parse tree
-- for local execution", indicating that router planning has detected the difference.
--
-- (*) https://github.com/citusdata/citus/pull/8035
SET client_min_messages to DEBUG2;
prepare p5(int) as SELECT count(*) FROM t1 WHERE c = 8 and a = $1 GROUP BY c;
execute p5(5);
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1"
DEBUG: query has a single distribution column value: 8
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c
count
---------------------------------------------------------------------
(0 rows)
execute p5(5);
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1"
DEBUG: query has a single distribution column value: 8
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c
count
---------------------------------------------------------------------
(0 rows)
execute p5(5);
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1"
DEBUG: query has a single distribution column value: 8
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c
count
---------------------------------------------------------------------
(0 rows)
execute p5(5);
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1"
DEBUG: query has a single distribution column value: 8
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c
count
---------------------------------------------------------------------
(0 rows)
execute p5(5);
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1"
DEBUG: query has a single distribution column value: 8
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c
count
---------------------------------------------------------------------
(0 rows)
execute p5(5);
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1"
DEBUG: query has a single distribution column value: 8
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c
count
---------------------------------------------------------------------
(0 rows)
execute p5(5);
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c
count
---------------------------------------------------------------------
(0 rows)
execute p5(5);
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c
count
---------------------------------------------------------------------
(0 rows)
execute p5(5);
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c
count
---------------------------------------------------------------------
(0 rows)
execute p5(5);
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (a OPERATOR(pg_catalog.=) $1)) GROUP BY c
count
---------------------------------------------------------------------
(0 rows)
RESET client_min_messages;
\c - - - :master_port
-- one another combination is that the shell table
-- one other combination is that the shell table
-- has a dropped column but not the shard, via rebalance operation
SET search_path TO local_shard_execution_dropped_column;
ALTER TABLE t1 DROP COLUMN a;
@ -204,6 +298,12 @@ SELECT citus_move_shard_placement(2460000, 'localhost', :worker_1_port, 'localho
(1 row)
SELECT public.wait_for_resource_cleanup(); -- otherwise fails flakiness tests
wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
\c - - - :worker_2_port
SET search_path TO local_shard_execution_dropped_column;
-- show the dropped columns
@ -331,6 +431,108 @@ execute p3(8);
NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8) ON CONFLICT DO NOTHING
execute p3(8);
NOTICE: executing the command locally: INSERT INTO local_shard_execution_dropped_column.t1_2460000 AS citus_table_alias (c) VALUES (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8), (8) ON CONFLICT DO NOTHING
-- Test that "Avoid deparse and planning of shard query for local execution"
-- does not take the fast path of modifying the parse tree with the shard OID
-- for this scenario (rebalance) also.
--
-- (*) https://github.com/citusdata/citus/pull/8035
SET client_min_messages to DEBUG2;
prepare p4(int) as SELECT count(*) FROM t1 WHERE c = 8 and 5 = $1 GROUP BY c;
execute p4(5);
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1"
DEBUG: query has a single distribution column value: 8
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c
count
---------------------------------------------------------------------
1
(1 row)
execute p4(5);
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1"
DEBUG: query has a single distribution column value: 8
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c
count
---------------------------------------------------------------------
1
(1 row)
execute p4(5);
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1"
DEBUG: query has a single distribution column value: 8
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c
count
---------------------------------------------------------------------
1
(1 row)
execute p4(5);
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1"
DEBUG: query has a single distribution column value: 8
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c
count
---------------------------------------------------------------------
1
(1 row)
execute p4(5);
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1"
DEBUG: query has a single distribution column value: 8
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c
count
---------------------------------------------------------------------
1
(1 row)
execute p4(5);
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: Router planner fast path cannot modify parse tree for local execution: shard table "local_shard_execution_dropped_column.t1_2460000" does not match the distributed table "local_shard_execution_dropped_column.t1"
DEBUG: query has a single distribution column value: 8
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c
count
---------------------------------------------------------------------
1
(1 row)
execute p4(5);
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c
count
---------------------------------------------------------------------
1
(1 row)
execute p4(5);
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c
count
---------------------------------------------------------------------
1
(1 row)
execute p4(5);
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c
count
---------------------------------------------------------------------
1
(1 row)
execute p4(5);
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution_dropped_column.t1_2460000 t1 WHERE ((c OPERATOR(pg_catalog.=) 8) AND (5 OPERATOR(pg_catalog.=) $1)) GROUP BY c
count
---------------------------------------------------------------------
1
(1 row)
RESET client_min_messages;
\c - - - :master_port
DROP SCHEMA local_shard_execution_dropped_column CASCADE;
NOTICE: drop cascades to table local_shard_execution_dropped_column.t1

View File

@ -70,6 +70,7 @@ test: metadata_sync_helpers
test: issue_6592
test: executor_local_failure
test: local_execution_local_plan
# test that no tests leaked intermediate results. This should always be last
test: ensure_no_intermediate_data_leak

View File

@ -0,0 +1,128 @@
-- Test local execution with local plan in a sharded environment.
-- This is an enhancement to local execution where instead of deparsing
-- and compiling the shard query, the planner replaces the OID of the
-- distributed table with the OID of the local shard in the parse tree
-- and plans that.
--
-- https://github.com/citusdata/citus/pull/8035
CREATE SCHEMA local_shard_execution_local_plan;
SET search_path TO local_shard_execution_local_plan;
SET citus.shard_count TO 4;
SET citus.next_shard_id TO 86000000;
-- Test row-based sharding
SET citus.shard_replication_factor TO 1;
CREATE TABLE test_tbl (a int, b int, data_f double precision);
SELECT create_distributed_table('test_tbl', 'a');
SELECT setseed(0.42); -- make the random data inserted deterministic
INSERT INTO test_tbl
SELECT (random()*20)::int AS a,
(random()*20)::int AS b,
random()*10000.0 AS data_f
FROM generate_series(1, 10000);
\c - - - :worker_1_port
SET search_path TO local_shard_execution_local_plan;
SET client_min_messages TO DEBUG2;
-- This query resolves to a single shard (aka fast path)
-- which is located on worker_1; with client_min_messages
-- at DEBUG2 we see a message that the planner is avoiding
-- query deparse and plan
SELECT b, AVG(data_f), MIN(data_f), MAX(data_f), COUNT(1)
FROM test_tbl
WHERE a = 5 AND b IN (1,3,5,8,13,21)
GROUP BY b
ORDER BY b;
SET citus.enable_local_execution_local_plan TO OFF;
-- With local execution local plan disabled, the same query
-- does query deparse and planning of the shard query and
-- provides the same results
SELECT b, AVG(data_f), MIN(data_f), MAX(data_f), COUNT(1)
FROM test_tbl
WHERE a = 5 AND b IN (1,3,5,8,13,21)
GROUP BY b
ORDER BY b;
\c - - - :worker_2_port
SET search_path TO local_shard_execution_local_plan;
SET client_min_messages TO DEBUG2;
-- Run the same query on the other worker - the local
-- execution path is not taken because the shard is not
-- local to this worker
SELECT b, AVG(data_f), MIN(data_f), MAX(data_f), COUNT(1)
FROM test_tbl
WHERE a = 5 AND b IN (1,3,5,8,13,21)
GROUP BY b
ORDER BY b;
\c - - - :master_port
DROP SCHEMA local_shard_execution_local_plan CASCADE;
-- Now test local execution with local plan for a schema sharded table.
SET citus.enable_schema_based_sharding to on;
CREATE SCHEMA schema_sharding_test;
SET search_path TO schema_sharding_test;
SET citus.shard_replication_factor TO 1;
CREATE TABLE test_tbl (a int, b int, data_f double precision);
SELECT setseed(0.42); -- make the random data inserted deterministic
INSERT INTO test_tbl
SELECT (random()*20)::int AS a,
(random()*20)::int AS b,
random()*10000.0 AS data_f
FROM generate_series(1, 10000);
\c - - - :worker_1_port
SET client_min_messages TO DEBUG2;
-- Run the test query on worker_1; with schema based sharding
-- the data is not local to this worker so local execution
-- path is not taken.
SELECT b, AVG(data_f), MIN(data_f), MAX(data_f), COUNT(1)
FROM schema_sharding_test.test_tbl
WHERE a = 5 AND b IN (1,3,5,8,13,21)
GROUP BY b
ORDER BY b;
\c - - - :worker_2_port
SET client_min_messages TO DEBUG2;
-- Run the test query on worker_2; with schema based sharding
-- the data is local to this worker so local execution
-- path is taken, and the planner avoids query deparse and
-- planning of the shard query.
SELECT b, AVG(data_f), MIN(data_f), MAX(data_f), COUNT(1)
FROM schema_sharding_test.test_tbl
WHERE a = 5 AND b IN (1,3,5,8,13,21)
GROUP BY b
ORDER BY b;
SET citus.enable_local_execution_local_plan TO OFF;
-- Run the test query on worker_2 but with local execution
-- local plan disabled; now the planner does query deparse
-- and planning of the shard query.
SELECT b, AVG(data_f), MIN(data_f), MAX(data_f), COUNT(1)
FROM schema_sharding_test.test_tbl
WHERE a = 5 AND b IN (1,3,5,8,13,21)
GROUP BY b
ORDER BY b;
\c - - - :master_port
DROP SCHEMA schema_sharding_test CASCADE;
RESET ALL;

View File

@ -78,14 +78,37 @@ execute p4(8);
execute p4(8);
execute p4(8);
-- Test that "Avoid deparse and planning of shard query for local execution" (*)
-- does not take the fast path of modifying the parse tree with the shard OID, as
-- the dropped column means the attribute check between the distributed table and
-- shard fails. With client_min_messages at DEBUG2, we see "cannot modify parse tree
-- for local execution", indicating that router planning has detected the difference.
--
-- (*) https://github.com/citusdata/citus/pull/8035
SET client_min_messages to DEBUG2;
prepare p5(int) as SELECT count(*) FROM t1 WHERE c = 8 and a = $1 GROUP BY c;
execute p5(5);
execute p5(5);
execute p5(5);
execute p5(5);
execute p5(5);
execute p5(5);
execute p5(5);
execute p5(5);
execute p5(5);
execute p5(5);
RESET client_min_messages;
\c - - - :master_port
-- one another combination is that the shell table
-- one other combination is that the shell table
-- has a dropped column but not the shard, via rebalance operation
SET search_path TO local_shard_execution_dropped_column;
ALTER TABLE t1 DROP COLUMN a;
SELECT citus_move_shard_placement(2460000, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'block_writes');
SELECT public.wait_for_resource_cleanup(); -- otherwise fails flakiness tests
\c - - - :worker_2_port
SET search_path TO local_shard_execution_dropped_column;
@ -132,5 +155,25 @@ execute p3(8);
execute p3(8);
execute p3(8);
-- Test that "Avoid deparse and planning of shard query for local execution"
-- does not take the fast path of modifying the parse tree with the shard OID
-- for this scenario (rebalance) also.
--
-- (*) https://github.com/citusdata/citus/pull/8035
SET client_min_messages to DEBUG2;
prepare p4(int) as SELECT count(*) FROM t1 WHERE c = 8 and 5 = $1 GROUP BY c;
execute p4(5);
execute p4(5);
execute p4(5);
execute p4(5);
execute p4(5);
execute p4(5);
execute p4(5);
execute p4(5);
execute p4(5);
execute p4(5);
RESET client_min_messages;
\c - - - :master_port
DROP SCHEMA local_shard_execution_dropped_column CASCADE;