From 60e77d48c8530f2b8278b0eabcdd85d6f5a8eaed Mon Sep 17 00:00:00 2001
From: Colm McHugh
Date: Fri, 27 Jun 2025 13:14:59 +0000
Subject: [PATCH] Check that attributes are the same in Citus and shard
 tables. Retain query in task for EXPLAIN ANALYZE and debug messages.

---
 .../distributed/planner/deparse_shard_query.c |  18 ++-
 .../distributed/planner/distributed_planner.c |  17 +-
 .../planner/fast_path_router_planner.c        | 120 +++++++-------
 .../planner/multi_router_planner.c            | 151 ++++++++++++++----
 src/backend/distributed/shared_library_init.c |   4 +-
 .../distributed/utils/citus_copyfuncs.c       |   3 +-
 src/include/distributed/deparse_shard_query.h |   2 +-
 src/include/distributed/distributed_planner.h |   9 +-
 .../distributed/multi_physical_planner.h      |   4 +-
 .../distributed/multi_router_planner.h        |   1 +
 .../expected/local_shard_execution.out        |   4 +
 11 files changed, 226 insertions(+), 107 deletions(-)

diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c
index c889e00ad..89f29596d 100644
--- a/src/backend/distributed/planner/deparse_shard_query.c
+++ b/src/backend/distributed/planner/deparse_shard_query.c
@@ -440,11 +440,12 @@ SetTaskQueryStringList(Task *task, List *queryStringList)
 
 
 void
-SetTaskQueryPlan(Task *task, PlannedStmt *localPlan)
+SetTaskQueryPlan(Task *task, Query *query, PlannedStmt *localPlan)
 {
 	Assert(localPlan != NULL);
 	task->taskQuery.queryType = TASK_QUERY_LOCAL_PLAN;
-	task->taskQuery.data.localPlan = localPlan;
+	task->taskQuery.data.jobQueryReferenceForLazyDeparsing = query;
+	task->localPlan = localPlan;
 	task->queryCount = 1;
 }
 
@@ -453,7 +454,7 @@ PlannedStmt *
 TaskQueryLocalPlan(Task *task)
 {
 	Assert(task->taskQuery.queryType == TASK_QUERY_LOCAL_PLAN);
-	return task->taskQuery.data.localPlan;
+	return task->localPlan;
 }
 
 
@@ -515,8 +516,6 @@ TaskQueryStringAtIndex(Task *task, int index)
 }
 
 
-static char *qry_unavailable_msg = "SELECT 'Task query unavailable - optimized away'";
-
 /*
  * TaskQueryString generates task query string text if missing.
  *
@@ -546,7 +545,14 @@ TaskQueryString(Task *task)
 	}
 	else if (taskQueryType == TASK_QUERY_LOCAL_PLAN)
 	{
-		return qry_unavailable_msg;
+		Query *query = task->taskQuery.data.jobQueryReferenceForLazyDeparsing;
+		Assert(query != NULL);
+		MemoryContext previousContext = MemoryContextSwitchTo(GetMemoryChunkContext(
+																  query));
+		UpdateRelationToShardNames((Node *) query, task->relationShardList);
+		MemoryContextSwitchTo(previousContext);
+		return AnnotateQuery(DeparseTaskQuery(task, query),
+							 task->partitionKeyValue, task->colocationId);
 	}
 
 	Query *jobQueryReferenceForLazyDeparsing =
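
Note: with the TaskQueryString() hunk above, EXPLAIN ANALYZE and debug output can
recover real SQL for a task whose query was planned locally, instead of the
placeholder string that qry_unavailable_msg used to provide. A minimal sketch of
a caller that benefits (the helper below is illustrative, not part of this patch):

    /* hypothetical helper: locally planned tasks now deparse on demand */
    static void
    LogTaskQuery(Task *task)
    {
        /* for TASK_QUERY_LOCAL_PLAN this deparses the retained Query tree */
        elog(DEBUG4, "task query: %s", TaskQueryString(task));
    }
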
diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c
index bcc09eede..e22296ec7 100644
--- a/src/backend/distributed/planner/distributed_planner.c
+++ b/src/backend/distributed/planner/distributed_planner.c
@@ -653,6 +653,10 @@ PlanFastPathDistributedStmt(DistributedPlanningContext *planContext)
 {
 	FastPathRestrictionContext *fastPathContext =
 		planContext->plannerRestrictionContext->fastPathRestrictionContext;
+	Assert(fastPathContext != NULL);
+	Assert(fastPathContext->fastPathRouterQuery);
+
+	FastPathPreprocessParseTree(planContext->query);
 
 	if (!fastPathContext->delayFastPathPlanning)
 	{
@@ -2421,9 +2425,16 @@ CreateAndPushPlannerRestrictionContext(
 	if (fastPathRestrictionContext != NULL)
 	{
 		/* copy the given fast path restriction context */
-		memcpy(plannerRestrictionContext->fastPathRestrictionContext,
-			   fastPathRestrictionContext,
-			   sizeof(FastPathRestrictionContext));
+		FastPathRestrictionContext *plannersFastPathCtx =
+			plannerRestrictionContext->fastPathRestrictionContext;
+		plannersFastPathCtx->fastPathRouterQuery =
+			fastPathRestrictionContext->fastPathRouterQuery;
+		plannersFastPathCtx->distributionKeyValue =
+			fastPathRestrictionContext->distributionKeyValue;
+		plannersFastPathCtx->distributionKeyHasParam =
+			fastPathRestrictionContext->distributionKeyHasParam;
+		plannersFastPathCtx->delayFastPathPlanning =
+			fastPathRestrictionContext->delayFastPathPlanning;
 	}
 
 	plannerRestrictionContext->memoryContext = CurrentMemoryContext;
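
Note: the member-by-member copy above replaces a memcpy() of the whole struct;
spelling the fields out makes explicit which members survive into the new
planner restriction context now that the struct loses its RangeTblEntry pointer
(see the distributed_planner.h hunk further down). The same idea as a helper,
sketched under the assumption that no other members need carrying over:

    static void
    CopyFastPathRestrictionContext(FastPathRestrictionContext *dest,
                                   const FastPathRestrictionContext *src)
    {
        /* copy exactly the members that are meaningful to share */
        dest->fastPathRouterQuery = src->fastPathRouterQuery;
        dest->distributionKeyValue = src->distributionKeyValue;
        dest->distributionKeyHasParam = src->distributionKeyHasParam;
        dest->delayFastPathPlanning = src->delayFastPathPlanning;
    }
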
- */ - parse->targetList = - (List *) eval_const_expressions(NULL, (Node *) parse->targetList); - parse->jointree->quals = - (Node *) eval_const_expressions(NULL, (Node *) parse->jointree->quals); - PlannedStmt *result = GeneratePlaceHolderPlannedStmt(originalQuery); return result; @@ -148,6 +152,51 @@ GeneratePlaceHolderPlannedStmt(Query *parse) } +static void +InitializeFastPathContext(FastPathRestrictionContext *fastPathContext, + Node *distributionKeyValue, + bool canAvoidDeparse, + Query *query) +{ + Assert(fastPathContext != NULL); + Assert(!fastPathContext->fastPathRouterQuery); + Assert(!fastPathContext->delayFastPathPlanning); + + /* + * We're looking at a fast path query, so we can fill the + * fastPathContext with relevant details. + */ + fastPathContext->fastPathRouterQuery = true; + if (distributionKeyValue == NULL) + { + /* nothing to record */ + } + else if (IsA(distributionKeyValue, Const)) + { + fastPathContext->distributionKeyValue = (Const *) distributionKeyValue; + } + else if (IsA(distributionKeyValue, Param)) + { + fastPathContext->distributionKeyHasParam = true; + } + + if (EnableFastPathLocalExecutor) + { + /* + * This fast path query may be executed by the local executor. + * We need to delay the fast path planning until we know if the + * shard is local or not. Make a final check for volatile + * functions in the query tree to determine if we should delay + * the fast path planning. + */ + fastPathContext->delayFastPathPlanning = canAvoidDeparse && + !FindNodeMatchingCheckFunction( + (Node *) query, + CitusIsVolatileFunction); + } +} + + /* * FastPathRouterQuery gets a query and returns true if the query is eligible for * being a fast path router query. It also fills the given fastPathContext with @@ -175,7 +224,6 @@ FastPathRouterQuery(Query *query, FastPathRestrictionContext *fastPathContext) bool isFastPath = false; bool canAvoidDeparse = false; Node *distributionKeyValue = NULL; - RangeTblEntry *rangeTableEntry = NULL; if (!EnableFastPathRouterPlanner) { @@ -207,8 +255,8 @@ FastPathRouterQuery(Query *query, FastPathRestrictionContext *fastPathContext) else if (query->commandType == CMD_INSERT) { /* we don't need to do any further checks, all INSERTs are fast-path */ - isFastPath = true; - goto returnFastPath; + InitializeFastPathContext(fastPathContext, NULL, true, query); + return true; } /* make sure that the only range table in FROM clause */ @@ -217,7 +265,7 @@ FastPathRouterQuery(Query *query, FastPathRestrictionContext *fastPathContext) return false; } - rangeTableEntry = (RangeTblEntry *) linitial(query->rtable); + RangeTblEntry *rangeTableEntry = (RangeTblEntry *) linitial(query->rtable); if (rangeTableEntry->rtekind != RTE_RELATION) { return false; @@ -281,52 +329,10 @@ FastPathRouterQuery(Query *query, FastPathRestrictionContext *fastPathContext) !ColumnAppearsMultipleTimes(quals, distributionKey)); } -returnFastPath: - if (isFastPath) { - Assert(fastPathContext != NULL); - Assert(!fastPathContext->fastPathRouterQuery); - Assert(!fastPathContext->delayFastPathPlanning); - - /* - * We're looking at a fast path query, so we can fill the - * fastPathContext with relevant details. 
- */ - fastPathContext->fastPathRouterQuery = true; - if (distributionKeyValue == NULL) - { - /* nothing to record */ - } - else if (IsA(distributionKeyValue, Const)) - { - fastPathContext->distributionKeyValue = (Const *) distributionKeyValue; - } - else if (IsA(distributionKeyValue, Param)) - { - fastPathContext->distributionKeyHasParam = true; - } - - /* - * Note the range table entry for the table we're querying. - */ - Assert(rangeTableEntry != NULL || query->commandType == CMD_INSERT); - fastPathContext->distTableRte = rangeTableEntry; - - if (EnableFastPathLocalExecutor) - { - /* - * This fast path query may be executed by the local executor. - * We need to delay the fast path planning until we know if the - * shard is local or not. Make a final check for volatile - * functions in the query tree to determine if we should delay - * the fast path planning. - */ - fastPathContext->delayFastPathPlanning = canAvoidDeparse && - !FindNodeMatchingCheckFunction( - (Node *) query, - CitusIsVolatileFunction); - } + InitializeFastPathContext(fastPathContext, distributionKeyValue, canAvoidDeparse, + query); } return isFastPath; diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 03c339566..d665e800d 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -16,6 +16,8 @@ #include "postgres.h" #include "access/stratnum.h" +#include "access/tupdesc.h" +#include "access/tupdesc_details.h" #include "access/xact.h" #include "catalog/pg_opfamily.h" #include "catalog/pg_proc.h" @@ -175,7 +177,7 @@ static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job, static bool ModifiesLocalTableWithRemoteCitusLocalTable(List *rangeTableList); static DeferredErrorMessage * DeferErrorIfUnsupportedLocalTableJoin(List *rangeTableList); static bool IsLocallyAccessibleCitusLocalTable(Oid relationId); -static Query * ConvertToQueryOnShard(Query *query, Oid relationID, Oid shardRelationId); +static bool ConvertToQueryOnShard(Query *query, Oid relationID, Oid shardRelationId); /* * CreateRouterPlan attempts to create a router executor plan for the given @@ -1952,6 +1954,75 @@ RouterJob(Query *originalQuery, PlannerRestrictionContext *plannerRestrictionCon } +/* + * CheckAttributesMatch checks if the attributes of the Citus table and the shard + * table match. + * + * It is used to ensure that the shard table has the same schema as the Citus + * table before replacing the Citus table OID with the shard table OID in the + * parse tree we (Citus planner) recieved from Postgres. + */ +static +bool +CheckAttributesMatch(Oid citusTableId, Oid shardTableId) +{ + Relation citusR, shardR; + bool same_schema = false; + + citusR = RelationIdGetRelation(citusTableId); + shardR = RelationIdGetRelation(shardTableId); + + if (RelationIsValid(citusR) && RelationIsValid(shardR)) + { + TupleDesc citusTupDesc = citusR->rd_att; + TupleDesc shardTupDesc = shardR->rd_att; + + if (citusTupDesc->natts == shardTupDesc->natts) + { + /* + * Do an attribute-by-attribute comparison. This is borrowed from + * the Postgres function equalTupleDescs(), which we cannot use + * because the citus table and shard table have different composite + * types. 
+ */ + same_schema = true; + for (int i = 0; i < citusTupDesc->natts && same_schema; i++) + { + Form_pg_attribute attr1 = TupleDescAttr(citusTupDesc, i); + Form_pg_attribute attr2 = TupleDescAttr(shardTupDesc, i); + + if (strcmp(NameStr(attr1->attname), NameStr(attr2->attname)) != 0) + { + same_schema = false; + } + if (attr1->atttypid != attr2->atttypid) + { + same_schema = false; + } + if (attr1->atttypmod != attr2->atttypmod) + { + same_schema = false; + } + if (attr1->attcollation != attr2->attcollation) + { + same_schema = false; + } + + /* Record types derived from tables could have dropped fields. */ + if (attr1->attisdropped != attr2->attisdropped) + { + same_schema = false; + } + } + } + } + + RelationClose(citusR); + RelationClose(shardR); + return same_schema; +} + + void CheckAndBuildDelayedFastPathPlan(DistributedPlanningContext *planContext, DistributedPlan *plan) @@ -1969,7 +2040,7 @@ CheckAndBuildDelayedFastPathPlan(DistributedPlanningContext *planContext, if (job->deferredPruning) { - /* Call fast path query planner, Save plan in planContext->plan */ + /* Execution time pruning => don't know which shard at this point */ planContext->plan = FastPathPlanner(planContext->originalQuery, planContext->query, planContext->boundParams); @@ -1983,35 +2054,43 @@ CheckAndBuildDelayedFastPathPlan(DistributedPlanningContext *planContext, Assert(list_length(placements) > 0); int32 localGroupId = GetLocalGroupId(); ShardPlacement *primaryPlacement = (ShardPlacement *) linitial(placements); - List *relationShards = task->relationShardList; - Assert(list_length(relationShards) == 1); - bool isLocalExecution = (primaryPlacement->groupId == localGroupId); + + bool isLocalExecution = !IsDummyPlacement(primaryPlacement) && + (primaryPlacement->groupId == localGroupId); + bool canBuildLocalPlan = true; if (isLocalExecution) { - ConvertToQueryOnShard(planContext->query, - fastPathContext->distTableRte->relid, - primaryPlacement->shardId); + List *relationShards = task->relationShardList; + Assert(list_length(relationShards) == 1); + RelationShard *relationShard = (RelationShard *) linitial(relationShards); + Assert(relationShard->shardId == primaryPlacement->shardId); - /* Plan the query with the new shard relation id */ - /* Save plan in planContext->plan */ - planContext->plan = standard_planner(planContext->query, NULL, - planContext->cursorOptions, - planContext->boundParams); - SetTaskQueryPlan(task, planContext->plan); + canBuildLocalPlan = ConvertToQueryOnShard(planContext->query, + relationShard->relationId, + relationShard->shardId); + if (canBuildLocalPlan) + { + /* Plan the query with the new shard relation id */ + planContext->plan = standard_planner(planContext->query, NULL, + planContext->cursorOptions, + planContext->boundParams); + SetTaskQueryPlan(task, job->jobQuery, planContext->plan); - ereport(DEBUG2, (errmsg("Local plan for fast-path router " - "query"))); - } - else - { - /* Call fast path query planner, Save plan in planContext->plan */ - planContext->plan = FastPathPlanner(planContext->originalQuery, - planContext->query, - planContext->boundParams); - UpdateRelationToShardNames((Node *) job->jobQuery, relationShards); - SetTaskQueryIfShouldLazyDeparse(task, job->jobQuery); + ereport(DEBUG2, (errmsg("Local plan for fast-path router " + "query"))); + return; + } } + + Assert(!isLocalExecution || (isLocalExecution && !canBuildLocalPlan)); + + /* Fall back to fast path planner and generating SQL query on the shard */ + planContext->plan = 
@@ -2029,7 +2108,7 @@ CheckAndBuildDelayedFastPathPlan(DistributedPlanningContext *planContext,
  * changes the RTEPermissionInfo's relid to the shard's relation id also.
  * At this point the Query is ready for the postgres planner.
  */
-static Query *
+static bool
 ConvertToQueryOnShard(Query *query, Oid citusTableOid, Oid shardId)
 {
 	Assert(list_length(query->rtable) == 1);
@@ -2060,6 +2139,23 @@ ConvertToQueryOnShard(Query *query, Oid citusTableOid, Oid shardId)
 							citusTableRte->rellockmode, 0, NULL, NULL);
 	/* todo - use suitable callback for perms check? */
 
+	/* Verify that the attributes of citus table and shard table match */
+	if (!CheckAttributesMatch(citusTableOid, shardRelationId))
+	{
+		/*
+		 * There is a difference between the attributes of the citus table
+		 * and the shard table. This can happen if there is a DROP COLUMN
+		 * on the citus table. In this case, we cannot convert the query
+		 * to a shard query, so clean up and return.
+		 */
+		UnlockRelationOid(shardRelationId, citusTableRte->rellockmode);
+		ereport(DEBUG2, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						 errmsg("shard table \"%s\" does not match the "
+								"distributed table \"%s\"",
+								shardRelationName, citusTableName)));
+		pfree(shardRelationName);
+		return false;
+	}
+
 	/* Change the target list entries that reference the original citus table's relation id */
 	ListCell *lc = NULL;
 	foreach(lc, query->targetList)
@@ -2071,7 +2167,6 @@ ConvertToQueryOnShard(Query *query, Oid citusTableOid, Oid shardId)
 		}
 	}
 
-
 	/* Change the range table entry's oid to that of the shard's */
 	Assert(shardRelationId != InvalidOid);
 	citusTableRte->relid = shardRelationId;
@@ -2084,7 +2179,7 @@ ConvertToQueryOnShard(Query *query, Oid citusTableOid, Oid shardId)
 	rtePermInfo->relid = shardRelationId;
 #endif
 
-	return query;
+	return true;
 }
diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c
index 53d6f90e7..3a9e2d901 100644
--- a/src/backend/distributed/shared_library_init.c
+++ b/src/backend/distributed/shared_library_init.c
@@ -1379,7 +1379,7 @@ RegisterCitusConfigVariables(void)
 		NULL, NULL, NULL);
 
 	DefineCustomBoolVariable(
-		"citus.enable_fast_path_local_execution",
+		"citus.enable_local_execution_fast_path",
 		gettext_noop("Enables the planner to avoid a query deparse and planning if "
 					 "the shard is local to the current node."),
 		NULL,
@@ -2822,7 +2822,7 @@ ErrorIfLocalExectionDisabled(bool *newval, void **extra, GucSource source)
 		ereport(WARNING, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
 						  errmsg(
 							  "citus.enable_local_execution must be set in order for "
-							  "citus.enable_fast_path_local_execution to be effective.")));
+							  "citus.enable_local_execution_fast_path to be effective.")));
 
 		return false;
 	}
diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c
index e834c1470..1e47609a0 100644
--- a/src/backend/distributed/utils/citus_copyfuncs.c
+++ b/src/backend/distributed/utils/citus_copyfuncs.c
@@ -289,7 +289,8 @@ CopyTaskQuery(Task *newnode, Task *from)
 
 		case TASK_QUERY_LOCAL_PLAN:
 		{
-			COPY_NODE_FIELD(taskQuery.data.localPlan);
+			COPY_NODE_FIELD(localPlan);
+			COPY_NODE_FIELD(taskQuery.data.jobQueryReferenceForLazyDeparsing);
 			break;
 		}
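
Note: COPY_NODE_FIELD() expands to a deep copyObject() of the named field, so a
copied task keeps both its private local plan and the Query tree used for lazy
deparsing. Roughly, using the field names from the hunk above:

    newnode->localPlan = copyObject(from->localPlan);
    newnode->taskQuery.data.jobQueryReferenceForLazyDeparsing =
        copyObject(from->taskQuery.data.jobQueryReferenceForLazyDeparsing);
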
diff --git a/src/include/distributed/deparse_shard_query.h b/src/include/distributed/deparse_shard_query.h
index 7ac49f920..efcdb3032 100644
--- a/src/include/distributed/deparse_shard_query.h
+++ b/src/include/distributed/deparse_shard_query.h
@@ -27,7 +27,7 @@ extern bool UpdateRelationToShardNames(Node *node, List *relationShardList);
 extern void SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query);
 extern void SetTaskQueryString(Task *task, char *queryString);
 extern void SetTaskQueryStringList(Task *task, List *queryStringList);
-extern void SetTaskQueryPlan(Task *task, PlannedStmt *localPlan);
+extern void SetTaskQueryPlan(Task *task, Query *query, PlannedStmt *localPlan);
 extern char * TaskQueryString(Task *task);
 extern PlannedStmt * TaskQueryLocalPlan(Task *task);
 extern char * TaskQueryStringAtIndex(Task *task, int index);
diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h
index 9c7cb0b90..f416aa911 100644
--- a/src/include/distributed/distributed_planner.h
+++ b/src/include/distributed/distributed_planner.h
@@ -101,15 +101,10 @@ typedef struct FastPathRestrictionContext
 	bool distributionKeyHasParam;
 
 	/*
-	 * Indicates to hold off on callning the fast path planner until its
-	 * known if the shard is local
+	 * Indicates to hold off calling the fast path planner until it's
+	 * known if the shard is local or not.
 	 */
 	bool delayFastPathPlanning;
-
-	/*
-	 * Range table entry for the table we're querying
-	 */
-	RangeTblEntry *distTableRte;
 } FastPathRestrictionContext;
 
 typedef struct PlannerRestrictionContext
diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h
index c73b78227..bb68f1636 100644
--- a/src/include/distributed/multi_physical_planner.h
+++ b/src/include/distributed/multi_physical_planner.h
@@ -220,8 +220,6 @@ typedef struct TaskQuery
 		 * when we want to access each query string.
 		 */
 		List *queryStringList;
-
-		PlannedStmt *localPlan; /* only applies to local tasks */
 	}data;
 }TaskQuery;
 
@@ -337,6 +335,8 @@ typedef struct Task
 
 	Const *partitionKeyValue;
 	int colocationId;
+
+	PlannedStmt *localPlan; /* only applies to local tasks */
 } Task;
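
Note: TaskQuery.data is a union, so it could not hold the local plan and the
lazily deparsed Query reference at the same time; moving localPlan onto Task
itself lets TASK_QUERY_LOCAL_PLAN tasks keep both. In outline:

    typedef struct Task
    {
        /* ... */
        TaskQuery taskQuery;    /* union: string, string list, or Query tree */
        PlannedStmt *localPlan; /* only for local tasks; outside the union */
    } Task;
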
 */
 extern PlannedStmt * GeneratePlaceHolderPlannedStmt(Query *parse);
+extern void FastPathPreprocessParseTree(Query *parse);
 extern PlannedStmt * FastPathPlanner(Query *originalQuery, Query *parse, ParamListInfo boundParams);
 extern bool FastPathRouterQuery(Query *query,
diff --git a/src/test/regress/expected/local_shard_execution.out b/src/test/regress/expected/local_shard_execution.out
index 2b1fa3c0b..4a432d4f2 100644
--- a/src/test/regress/expected/local_shard_execution.out
+++ b/src/test/regress/expected/local_shard_execution.out
@@ -2410,6 +2410,7 @@ NOTICE:  executing the command locally: UPDATE local_shard_execution.event_respo
 SELECT count(*) FROM event_responses WHERE event_id = 16;
 DEBUG:  Distributed planning for a fast-path router query
 DEBUG:  Creating router plan
+DEBUG:  Local plan for fast-path router query
 DEBUG:  query has a single distribution column value: 16
 NOTICE:  executing the command locally: SELECT count(*) AS count FROM local_shard_execution.event_responses_1480001 event_responses WHERE (event_id OPERATOR(pg_catalog.=) 16)
  count
@@ -2420,6 +2421,7 @@ NOTICE:  executing the command locally: SELECT count(*) AS count FROM local_shar
 SELECT count(*) FROM event_responses WHERE event_id = 16;
 DEBUG:  Distributed planning for a fast-path router query
 DEBUG:  Creating router plan
+DEBUG:  Local plan for fast-path router query
 DEBUG:  query has a single distribution column value: 16
 NOTICE:  executing the command locally: SELECT count(*) AS count FROM local_shard_execution.event_responses_1480001 event_responses WHERE (event_id OPERATOR(pg_catalog.=) 16)
  count
@@ -2430,6 +2432,7 @@ NOTICE:  executing the command locally: SELECT count(*) AS count FROM local_shar
 UPDATE event_responses SET response = 'no' WHERE event_id = 16;
 DEBUG:  Distributed planning for a fast-path router query
 DEBUG:  Creating router plan
+DEBUG:  Local plan for fast-path router query
 DEBUG:  query has a single distribution column value: 16
 NOTICE:  executing the command locally: UPDATE local_shard_execution.event_responses_1480001 event_responses SET response = 'no'::local_shard_execution.invite_resp WHERE (event_id OPERATOR(pg_catalog.=) 16)
 INSERT INTO event_responses VALUES (16, 666, 'maybe')
@@ -2529,6 +2532,7 @@ SET citus.log_remote_commands TO ON;
 SELECT * FROM event_responses_no_pkey WHERE event_id = 2;
 DEBUG:  Distributed planning for a fast-path router query
 DEBUG:  Creating router plan
+DEBUG:  Local plan for fast-path router query
 DEBUG:  query has a single distribution column value: 2
 NOTICE:  executing the command locally: SELECT event_id, user_id, response FROM local_shard_execution.event_responses_no_pkey_1480007 event_responses_no_pkey WHERE (event_id OPERATOR(pg_catalog.=) 2)
  event_id | user_id | response
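
Note: the four new expected DEBUG lines come from the DEBUG2 message added in
CheckAndBuildDelayedFastPathPlan() when a local plan is built:

    ereport(DEBUG2, (errmsg("Local plan for fast-path router "
                            "query")));

These tests presumably run with client_min_messages at DEBUG2 (they already
show other DEBUG output), so each locally planned fast-path query gains exactly
one line of expected output.
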