diff --git a/src/backend/distributed/commands/call.c b/src/backend/distributed/commands/call.c index 09d560f4a..d9d25c4c0 100644 --- a/src/backend/distributed/commands/call.c +++ b/src/backend/distributed/commands/call.c @@ -20,6 +20,7 @@ #include "distributed/commands/multi_copy.h" #include "distributed/commands/utility_hook.h" #include "distributed/connection_management.h" +#include "distributed/deparse_shard_query.h" #include "distributed/master_metadata_utility.h" #include "distributed/metadata_cache.h" #include "distributed/multi_executor.h" @@ -168,7 +169,7 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, task->jobId = INVALID_JOB_ID; task->taskId = 0; task->taskType = DDL_TASK; - task->queryString = callCommand->data; + SetTaskQueryString(task, callCommand->data); task->replicationModel = REPLICATION_MODEL_INVALID; task->dependentTaskList = NIL; task->anchorShardId = placement->shardId; diff --git a/src/backend/distributed/commands/index.c b/src/backend/distributed/commands/index.c index f1e08c78b..b5ff77d46 100644 --- a/src/backend/distributed/commands/index.c +++ b/src/backend/distributed/commands/index.c @@ -23,6 +23,7 @@ #include "distributed/citus_ruleutils.h" #include "distributed/commands.h" #include "distributed/commands/utility_hook.h" +#include "distributed/deparse_shard_query.h" #include "distributed/distributed_planner.h" #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" @@ -528,7 +529,7 @@ CreateIndexTaskList(Oid relationId, IndexStmt *indexStmt) task->jobId = jobId; task->taskId = taskId++; task->taskType = DDL_TASK; - task->queryString = pstrdup(ddlString.data); + SetTaskQueryString(task, pstrdup(ddlString.data)); task->replicationModel = REPLICATION_MODEL_INVALID; task->dependentTaskList = NULL; task->anchorShardId = shardId; @@ -573,7 +574,7 @@ CreateReindexTaskList(Oid relationId, ReindexStmt *reindexStmt) task->jobId = jobId; task->taskId = taskId++; task->taskType = DDL_TASK; - 
task->queryString = pstrdup(ddlString.data); + SetTaskQueryString(task, pstrdup(ddlString.data)); task->replicationModel = REPLICATION_MODEL_INVALID; task->dependentTaskList = NULL; task->anchorShardId = shardId; @@ -903,7 +904,7 @@ DropIndexTaskList(Oid relationId, Oid indexId, DropStmt *dropStmt) task->jobId = jobId; task->taskId = taskId++; task->taskType = DDL_TASK; - task->queryString = pstrdup(ddlString.data); + SetTaskQueryString(task, pstrdup(ddlString.data)); task->replicationModel = REPLICATION_MODEL_INVALID; task->dependentTaskList = NULL; task->anchorShardId = shardId; diff --git a/src/backend/distributed/commands/table.c b/src/backend/distributed/commands/table.c index c23a1dc21..174429bff 100644 --- a/src/backend/distributed/commands/table.c +++ b/src/backend/distributed/commands/table.c @@ -23,6 +23,7 @@ #include "distributed/commands.h" #include "distributed/commands/utility_hook.h" #include "distributed/deparser.h" +#include "distributed/deparse_shard_query.h" #include "distributed/master_protocol.h" #include "distributed/metadata_sync.h" #include "distributed/multi_executor.h" @@ -1379,7 +1380,7 @@ InterShardDDLTaskList(Oid leftRelationId, Oid rightRelationId, task->jobId = jobId; task->taskId = taskId++; task->taskType = DDL_TASK; - task->queryString = applyCommand->data; + SetTaskQueryString(task, applyCommand->data); task->dependentTaskList = NULL; task->replicationModel = REPLICATION_MODEL_INVALID; task->anchorShardId = leftShardId; diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index dcdc343af..7d3bde2b7 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -42,6 +42,7 @@ #include "distributed/commands/multi_copy.h" #include "distributed/commands/utility_hook.h" /* IWYU pragma: keep */ #include "distributed/deparser.h" +#include "distributed/deparse_shard_query.h" #include "distributed/listutils.h" #include 
"distributed/local_executor.h" #include "distributed/maintenanced.h" @@ -49,6 +50,7 @@ #include "distributed/metadata_cache.h" #include "distributed/metadata_sync.h" #include "distributed/multi_executor.h" +#include "distributed/multi_explain.h" #include "distributed/resource_lock.h" #include "distributed/transmit.h" #include "distributed/version_compat.h" @@ -864,7 +866,7 @@ DDLTaskList(Oid relationId, const char *commandString) task->jobId = jobId; task->taskId = taskId++; task->taskType = DDL_TASK; - task->queryString = applyCommand->data; + SetTaskQueryString(task, applyCommand->data); task->replicationModel = REPLICATION_MODEL_INVALID; task->dependentTaskList = NULL; task->anchorShardId = shardId; @@ -899,7 +901,7 @@ NodeDDLTaskList(TargetWorkerSet targets, List *commands) Task *task = CitusMakeNode(Task); task->taskType = DDL_TASK; - task->queryString = concatenatedCommands; + SetTaskQueryString(task, concatenatedCommands); foreach(workerNodeCell, workerNodes) { diff --git a/src/backend/distributed/commands/vacuum.c b/src/backend/distributed/commands/vacuum.c index 4825d48df..e426b2c8c 100644 --- a/src/backend/distributed/commands/vacuum.c +++ b/src/backend/distributed/commands/vacuum.c @@ -16,6 +16,7 @@ #include "commands/vacuum.h" #include "distributed/commands.h" #include "distributed/commands/utility_hook.h" +#include "distributed/deparse_shard_query.h" #include "distributed/metadata_cache.h" #include "distributed/multi_executor.h" #include "distributed/resource_lock.h" @@ -225,7 +226,7 @@ VacuumTaskList(Oid relationId, CitusVacuumParams vacuumParams, List *vacuumColum task->jobId = jobId; task->taskId = taskId++; task->taskType = VACUUM_ANALYZE_TASK; - task->queryString = pstrdup(vacuumString->data); + SetTaskQueryString(task, pstrdup(vacuumString->data)); task->dependentTaskList = NULL; task->replicationModel = REPLICATION_MODEL_INVALID; task->anchorShardId = shardId; diff --git a/src/backend/distributed/executor/adaptive_executor.c 
b/src/backend/distributed/executor/adaptive_executor.c index 152c5523c..ca6bf1931 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -133,6 +133,7 @@ #include "commands/dbcommands.h" #include "distributed/citus_custom_scan.h" #include "distributed/connection_management.h" +#include "distributed/deparse_shard_query.h" #include "distributed/distributed_execution_locks.h" #include "distributed/local_executor.h" #include "distributed/multi_client_executor.h" @@ -905,7 +906,8 @@ ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList, * a distributed plan. */ static DistributedExecution * -CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, bool hasReturning, +CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, + bool hasReturning, ParamListInfo paramListInfo, TupleDesc tupleDescriptor, Tuplestorestate *tupleStore, int targetPoolSize, TransactionProperties *xactProperties) @@ -3130,9 +3132,9 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution, int querySent = 0; char *queryString = NULL; - if (task->queryString != NULL) + if (list_length(task->perPlacementQueryStrings) == 0) { - queryString = task->queryString; + queryString = TaskQueryString(task); } else { diff --git a/src/backend/distributed/executor/distributed_intermediate_results.c b/src/backend/distributed/executor/distributed_intermediate_results.c index 095995856..8ed62ff99 100644 --- a/src/backend/distributed/executor/distributed_intermediate_results.c +++ b/src/backend/distributed/executor/distributed_intermediate_results.c @@ -17,6 +17,7 @@ #include "access/tupdesc.h" #include "catalog/pg_type.h" +#include "distributed/deparse_shard_query.h" #include "distributed/intermediate_results.h" #include "distributed/master_metadata_utility.h" #include "distributed/metadata_cache.h" @@ -221,7 +222,7 @@ WrapTasksForPartitioning(char *resultIdPrefix, List *selectTaskList, 
shardPlacement->nodeId, quote_literal_cstr(taskPrefix), quote_literal_cstr(taskPrefix), - quote_literal_cstr(selectTask->queryString), + quote_literal_cstr(TaskQueryString(selectTask)), partitionColumnIndex, quote_literal_cstr(partitionMethodString), minValuesString->data, maxValuesString->data, @@ -229,7 +230,7 @@ WrapTasksForPartitioning(char *resultIdPrefix, List *selectTaskList, perPlacementQueries = lappend(perPlacementQueries, wrappedQuery->data); } - selectTask->queryString = NULL; + SetTaskQueryString(selectTask, NULL); selectTask->perPlacementQueryStrings = perPlacementQueries; } } @@ -546,7 +547,7 @@ FragmentTransferTaskList(List *fragmentListTransfers) Task *task = CitusMakeNode(Task); task->taskType = SELECT_TASK; - task->queryString = QueryStringForFragmentsTransfer(fragmentsTransfer); + SetTaskQueryString(task, QueryStringForFragmentsTransfer(fragmentsTransfer)); task->taskPlacementList = list_make1(targetPlacement); fetchTaskList = lappend(fetchTaskList, task); diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index 14e781be5..b1bb6ab2d 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -14,6 +14,7 @@ #include "distributed/citus_ruleutils.h" #include "distributed/commands/multi_copy.h" #include "distributed/adaptive_executor.h" +#include "distributed/deparse_shard_query.h" #include "distributed/distributed_execution_locks.h" #include "distributed/insert_select_executor.h" #include "distributed/insert_select_planner.h" @@ -1094,13 +1095,11 @@ WrapTaskListForProjection(List *taskList, List *projectedTargetEntries) Task *task = NULL; foreach_ptr(task, taskList) { - Assert(task->queryString != NULL); - StringInfo wrappedQuery = makeStringInfo(); appendStringInfo(wrappedQuery, "SELECT %s FROM (%s) subquery", projectedColumnsString->data, - task->queryString); - task->queryString = 
wrappedQuery->data; + TaskQueryString(task)); + SetTaskQueryString(task, wrappedQuery->data); } } diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index e9252a8b6..41df95f62 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -73,6 +73,8 @@ #include "miscadmin.h" #include "distributed/citus_custom_scan.h" +#include "distributed/citus_ruleutils.h" +#include "distributed/deparse_shard_query.h" #include "distributed/local_executor.h" #include "distributed/multi_executor.h" #include "distributed/master_protocol.h" @@ -103,9 +105,7 @@ static void SplitLocalAndRemotePlacements(List *taskPlacementList, List **remoteTaskPlacementList); static uint64 ExecuteLocalTaskPlan(CitusScanState *scanState, PlannedStmt *taskPlan, char *queryString); -static bool TaskAccessesLocalNode(Task *task); static void LogLocalCommand(const char *command); - static void ExtractParametersForLocalExecution(ParamListInfo paramListInfo, Oid **parameterTypes, const char ***parameterValues); @@ -143,7 +143,7 @@ ExecuteLocalTaskList(CitusScanState *scanState, List *taskList) { Task *task = (Task *) lfirst(taskCell); - const char *shardQueryString = task->queryString; + const char *shardQueryString = TaskQueryString(task); Query *shardQuery = ParseQueryString(shardQueryString, parameterTypes, numParams); /* @@ -167,7 +167,7 @@ ExecuteLocalTaskList(CitusScanState *scanState, List *taskList) LogLocalCommand(shardQueryString); totalRowsProcessed += - ExecuteLocalTaskPlan(scanState, localPlan, task->queryString); + ExecuteLocalTaskPlan(scanState, localPlan, TaskQueryString(task)); } return totalRowsProcessed; @@ -432,7 +432,7 @@ ShouldExecuteTasksLocally(List *taskList) * TaskAccessesLocalNode returns true if any placements of the task reside on the * node that we're executing the query. 
*/ -static bool +bool TaskAccessesLocalNode(Task *task) { ListCell *placementCell = NULL; diff --git a/src/backend/distributed/executor/multi_task_tracker_executor.c b/src/backend/distributed/executor/multi_task_tracker_executor.c index 06d9a403d..6ab4b2064 100644 --- a/src/backend/distributed/executor/multi_task_tracker_executor.c +++ b/src/backend/distributed/executor/multi_task_tracker_executor.c @@ -28,6 +28,7 @@ #include "distributed/citus_custom_scan.h" #include "distributed/citus_nodes.h" #include "distributed/connection_management.h" +#include "distributed/deparse_shard_query.h" #include "distributed/distributed_execution_locks.h" #include "distributed/local_executor.h" #include "distributed/metadata_cache.h" @@ -1060,7 +1061,7 @@ ManageTaskExecution(TaskTracker *taskTracker, TaskTracker *sourceTaskTracker, StringInfo mapFetchTaskQueryString = MapFetchTaskQueryString(task, mapTask); - task->queryString = mapFetchTaskQueryString->data; + SetTaskQueryString(task, mapFetchTaskQueryString->data); taskExecution->querySourceNodeIndex = mapTaskExecution->currentNodeIndex; } @@ -1576,7 +1577,7 @@ TrackerQueueSqlTask(TaskTracker *taskTracker, Task *task) */ StringInfo sqlTaskQueryString = makeStringInfo(); - char *escapedTaskQueryString = quote_literal_cstr(task->queryString); + char *escapedTaskQueryString = quote_literal_cstr(TaskQueryString(task)); if (BinaryMasterCopyFormat) { @@ -1611,7 +1612,7 @@ TrackerQueueTask(TaskTracker *taskTracker, Task *task) HTAB *taskStateHash = taskTracker->taskStateHash; /* wrap a task assignment query outside the original query */ - StringInfo taskAssignmentQuery = TaskAssignmentQuery(task, task->queryString); + StringInfo taskAssignmentQuery = TaskAssignmentQuery(task, TaskQueryString(task)); TrackerTaskState *taskState = TaskStateHashEnter(taskStateHash, task->jobId, task->taskId); @@ -2731,7 +2732,7 @@ JobCleanupTask(uint64 jobId) jobCleanupTask->jobId = jobId; jobCleanupTask->taskId = JOB_CLEANUP_TASK_ID; 
jobCleanupTask->replicationModel = REPLICATION_MODEL_INVALID; - jobCleanupTask->queryString = jobCleanupQuery->data; + SetTaskQueryString(jobCleanupTask, jobCleanupQuery->data); return jobCleanupTask; } @@ -2767,9 +2768,8 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask) if (!taskTracker->connectionBusy) { /* assign through task tracker to manage resource utilization */ - StringInfo jobCleanupQuery = TaskAssignmentQuery(jobCleanupTask, - jobCleanupTask-> - queryString); + StringInfo jobCleanupQuery = TaskAssignmentQuery( + jobCleanupTask, TaskQueryString(jobCleanupTask)); jobCleanupQuerySent = MultiClientSendQuery(taskTracker->connectionId, jobCleanupQuery->data); @@ -2850,7 +2850,8 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask) nodeName, nodePort, (int) queryStatus), errhint("Manually clean job resources on node " "\"%s:%u\" by running \"%s\" ", nodeName, - nodePort, jobCleanupTask->queryString))); + nodePort, TaskQueryString( + jobCleanupTask)))); } else { @@ -2868,7 +2869,7 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask) nodePort, (int) resultStatus), errhint("Manually clean job resources on node " "\"%s:%u\" by running \"%s\" ", nodeName, - nodePort, jobCleanupTask->queryString))); + nodePort, TaskQueryString(jobCleanupTask)))); } else { diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index b6f030a90..819f16b15 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -30,6 +30,7 @@ #include "distributed/commands.h" #include "distributed/adaptive_executor.h" #include "distributed/connection_management.h" +#include "distributed/deparse_shard_query.h" #include "distributed/distributed_planner.h" #include "distributed/listutils.h" #include "distributed/multi_client_executor.h" @@ -515,7 +516,7 @@ CreateShardsOnWorkers(Oid distributedRelationId, List 
*shardPlacements, task->jobId = INVALID_JOB_ID; task->taskId = taskId++; task->taskType = DDL_TASK; - task->queryString = StringJoin(commandList, ';'); + SetTaskQueryString(task, StringJoin(commandList, ';')); task->replicationModel = REPLICATION_MODEL_INVALID; task->dependentTaskList = NIL; task->anchorShardId = shardId; diff --git a/src/backend/distributed/master/master_truncate.c b/src/backend/distributed/master/master_truncate.c index 84e89226a..94de213cf 100644 --- a/src/backend/distributed/master/master_truncate.c +++ b/src/backend/distributed/master/master_truncate.c @@ -16,6 +16,7 @@ #include "commands/tablecmds.h" #include "commands/trigger.h" #include "distributed/commands/utility_hook.h" +#include "distributed/deparse_shard_query.h" #include "distributed/master_metadata_utility.h" #include "distributed/master_protocol.h" #include "distributed/multi_executor.h" @@ -116,7 +117,7 @@ TruncateTaskList(Oid relationId) task->jobId = INVALID_JOB_ID; task->taskId = taskId++; task->taskType = DDL_TASK; - task->queryString = shardQueryString->data; + SetTaskQueryString(task, shardQueryString->data); task->dependentTaskList = NULL; task->replicationModel = REPLICATION_MODEL_INVALID; task->anchorShardId = shardId; diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index 9198550ab..8befa16a7 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -17,6 +17,7 @@ #include "distributed/citus_ruleutils.h" #include "distributed/deparse_shard_query.h" #include "distributed/insert_select_planner.h" +#include "distributed/local_executor.h" #include "distributed/metadata_cache.h" #include "distributed/multi_physical_planner.h" #include "distributed/multi_router_planner.h" @@ -36,6 +37,8 @@ static void UpdateTaskQueryString(Query *query, Oid distributedTableId, RangeTblEntry *valuesRTE, Task *task); static void 
ConvertRteToSubqueryWithEmptyResult(RangeTblEntry *rte); +static bool ShouldLazyDeparseQuery(Task *task); +static char * DeparseTaskQuery(Task *task, Query *query); /* @@ -105,13 +108,15 @@ RebuildQueryStrings(Query *originalQuery, List *taskList) } ereport(DEBUG4, (errmsg("query before rebuilding: %s", - task->queryString == NULL ? "(null)" : - ApplyLogRedaction(task->queryString)))); + task->queryForLocalExecution == NULL && + task->queryStringLazy == NULL + ? "(null)" + : ApplyLogRedaction(TaskQueryString(task))))); UpdateTaskQueryString(query, relationId, valuesRTE, task); ereport(DEBUG4, (errmsg("query after rebuilding: %s", - ApplyLogRedaction(task->queryString)))); + ApplyLogRedaction(TaskQueryString(task))))); } } @@ -127,7 +132,6 @@ static void UpdateTaskQueryString(Query *query, Oid distributedTableId, RangeTblEntry *valuesRTE, Task *task) { - StringInfo queryString = makeStringInfo(); List *oldValuesLists = NIL; if (valuesRTE != NULL) @@ -139,30 +143,40 @@ UpdateTaskQueryString(Query *query, Oid distributedTableId, RangeTblEntry *value valuesRTE->values_lists = task->rowValuesLists; } - /* - * For INSERT queries, we only have one relation to update, so we can - * use deparse_shard_query(). For UPDATE and DELETE queries, we may have - * subqueries and joins, so we use relation shard list to update shard - * names and call pg_get_query_def() directly. - */ - if (query->commandType == CMD_INSERT) - { - deparse_shard_query(query, distributedTableId, task->anchorShardId, queryString); - } - else + if (query->commandType != CMD_INSERT) { + /* + * For UPDATE and DELETE queries, we may have subqueries and joins, so + * we use relation shard list to update shard names and call + * pg_get_query_def() directly. 
+ */ List *relationShardList = task->relationShardList; UpdateRelationToShardNames((Node *) query, relationShardList); - - pg_get_query_def(query, queryString); } + else if (ShouldLazyDeparseQuery(task)) + { + /* + * Not all INSERT queries are copied before calling this + * function, so we do it here + */ + query = copyObject(query); + } + + if (query->commandType == CMD_INSERT) + { + /* + * We store this in the task so we can lazily call + * deparse_shard_query when the string is needed + */ + task->anchorDistributedTableId = distributedTableId; + } + + SetTaskQuery(task, query); if (valuesRTE != NULL) { valuesRTE->values_lists = oldValuesLists; } - - task->queryString = queryString->data; } @@ -303,3 +317,111 @@ ConvertRteToSubqueryWithEmptyResult(RangeTblEntry *rte) rte->subquery = subquery; rte->alias = copyObject(rte->eref); } + + +/* + * ShouldLazyDeparseQuery returns true if we should lazily deparse the query + * when adding it to the task. Right now it simply checks if any shards on the + * local node can be used for the task. + */ +static bool +ShouldLazyDeparseQuery(Task *task) +{ + return TaskAccessesLocalNode(task); +} + + +/* + * SetTaskQuery attaches the query to the task so that it can be used during + * execution. If local execution can possibly take place it sets task->queryForLocalExecution. + * If not it deparses the query and sets queryStringLazy, to avoid blowing the + * size of the task unnecessarily. + */ +void +SetTaskQuery(Task *task, Query *query) +{ + if (ShouldLazyDeparseQuery(task)) + { + task->queryForLocalExecution = query; + task->queryStringLazy = NULL; + return; + } + + task->queryForLocalExecution = NULL; + task->queryStringLazy = DeparseTaskQuery(task, query); +} + + +/* + * SetTaskQueryString attaches the query string to the task so that it can be + * used during execution. It also unsets queryForLocalExecution to be sure + * these are kept in sync. 
+ */ +void +SetTaskQueryString(Task *task, char *queryString) +{ + task->queryForLocalExecution = NULL; + task->queryStringLazy = queryString; +} + + +/* + * DeparseTaskQuery is a general way of deparsing a query based on a task. + */ +static char * +DeparseTaskQuery(Task *task, Query *query) +{ + StringInfo queryString = makeStringInfo(); + + if (query->commandType == CMD_INSERT) + { + /* + * For INSERT queries we cannot use pg_get_query_def. Mainly because we + * cannot run UpdateRelationToShardNames on an INSERT query. This is + * because the PG deparsing logic fails when trying to insert into a + * RTE_FUNCTION (which is what will happen if you call + * UpdateRelationToShardNames). + */ + deparse_shard_query(query, task->anchorDistributedTableId, task->anchorShardId, + queryString); + } + else + { + pg_get_query_def(query, queryString); + } + + return queryString->data; +} + + +/* + * TaskQueryString generates task->queryStringLazy if missing. + * + * For performance reasons, the queryString is generated lazily. For example + * for local queries it is usually not needed to generate it, so this way we + * can skip the expensive deparsing+parsing. + */ +char * +TaskQueryString(Task *task) +{ + if (task->queryStringLazy != NULL) + { + return task->queryStringLazy; + } + Assert(task->queryForLocalExecution != NULL); + + + /* + * Switch to the memory context of task->queryForLocalExecution before generating the query + * string. This way the query string is not freed in between multiple + * executions of a prepared statement. Except when UpdateTaskQueryString is + * used to set task->queryForLocalExecution, in that case it is freed but it will be set to + * NULL on the next execution of the query because UpdateTaskQueryString + * does that. 
+ */ + MemoryContext previousContext = MemoryContextSwitchTo(GetMemoryChunkContext( + task->queryForLocalExecution)); + task->queryStringLazy = DeparseTaskQuery(task, task->queryForLocalExecution); + MemoryContextSwitchTo(previousContext); + return task->queryStringLazy; +} diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 90a3750a3..192271f5a 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -126,6 +126,7 @@ static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *pla static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext, List *rangeTableList, int rteIdCounter); + /* Distributed planner hook */ PlannedStmt * distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) @@ -143,6 +144,7 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams) .boundParams = boundParams, }; + if (cursorOptions & CURSOR_OPT_FORCE_DISTRIBUTED) { /* this cursor flag could only be set when Citus has been loaded */ diff --git a/src/backend/distributed/planner/function_call_delegation.c b/src/backend/distributed/planner/function_call_delegation.c index 1499a2c87..51e5d4bfd 100644 --- a/src/backend/distributed/planner/function_call_delegation.c +++ b/src/backend/distributed/planner/function_call_delegation.c @@ -21,6 +21,7 @@ #include "distributed/commands.h" #include "distributed/commands/multi_copy.h" #include "distributed/connection_management.h" +#include "distributed/deparse_shard_query.h" #include "distributed/function_call_delegation.h" #include "distributed/insert_select_planner.h" #include "distributed/insert_select_executor.h" @@ -111,7 +112,6 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext) Var *partitionColumn = NULL; ShardPlacement *placement = NULL; WorkerNode *workerNode = NULL; - StringInfo queryString = 
NULL; Task *task = NULL; Job *job = NULL; DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan); @@ -364,13 +364,10 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext) ereport(DEBUG1, (errmsg("pushing down the function call"))); - queryString = makeStringInfo(); - pg_get_query_def(planContext->query, queryString); - task = CitusMakeNode(Task); task->taskType = SELECT_TASK; - task->queryString = queryString->data; task->taskPlacementList = placementList; + SetTaskQuery(task, planContext->query); task->anchorShardId = shardInterval->shardId; task->replicationModel = distTable->replicationModel; diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index 11ba065ac..5fec712c8 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -22,6 +22,7 @@ #include "optimizer/cost.h" #include "distributed/citus_nodefuncs.h" #include "distributed/connection_management.h" +#include "distributed/deparse_shard_query.h" #include "distributed/insert_select_planner.h" #include "distributed/insert_select_executor.h" #include "distributed/listutils.h" @@ -400,7 +401,7 @@ RemoteExplain(Task *task, ExplainState *es) RemoteExplainPlan *remotePlan = (RemoteExplainPlan *) palloc0( sizeof(RemoteExplainPlan)); - StringInfo explainQuery = BuildRemoteExplainQuery(task->queryString, es); + StringInfo explainQuery = BuildRemoteExplainQuery(TaskQueryString(task), es); /* * Use a coordinated transaction to ensure that we open a transaction block diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 618d99744..61f3afc9c 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -2449,7 +2449,7 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex, pg_get_query_def(taskQuery, queryString); 
ereport(DEBUG4, (errmsg("distributed statement: %s", ApplyLogRedaction(queryString->data)))); - subqueryTask->queryString = queryString->data; + SetTaskQueryString(subqueryTask, queryString->data); } subqueryTask->dependentTaskList = NULL; @@ -3977,7 +3977,7 @@ CreateBasicTask(uint64 jobId, uint32 taskId, TaskType taskType, char *queryStrin task->taskId = taskId; task->taskType = taskType; task->replicationModel = REPLICATION_MODEL_INVALID; - task->queryString = queryString; + SetTaskQueryString(task, queryString); return task; } @@ -4244,7 +4244,7 @@ MapTaskList(MapMergeJob *mapMergeJob, List *filterTaskList) /* convert filter query task into map task */ Task *mapTask = filterTask; - mapTask->queryString = mapQueryString->data; + SetTaskQueryString(mapTask, mapQueryString->data); mapTask->taskType = MAP_TASK; mapTaskList = lappend(mapTaskList, mapTask); @@ -4266,7 +4266,7 @@ CreateMapQueryString(MapMergeJob *mapMergeJob, Task *filterTask, /* wrap repartition query string around filter query string */ StringInfo mapQueryString = makeStringInfo(); - char *filterQueryString = filterTask->queryString; + char *filterQueryString = TaskQueryString(filterTask); char *filterQueryEscapedText = quote_literal_cstr(filterQueryString); PartitionType partitionType = mapMergeJob->partitionType; diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index bb1a975d2..b4ba6c237 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -1581,6 +1581,14 @@ RouterInsertTaskList(Query *query, DeferredErrorMessage **planningError) modifyTask->replicationModel = cacheEntry->replicationModel; modifyTask->rowValuesLists = modifyRoute->rowValuesLists; + RelationShard *relationShard = CitusMakeNode(RelationShard); + relationShard->shardId = modifyRoute->shardId; + relationShard->relationId = distributedTableId; + + modifyTask->relationShardList = 
list_make1(relationShard); + + modifyTask->taskPlacementList = ShardPlacementList(modifyRoute->shardId); + insertTaskList = lappend(insertTaskList, modifyTask); } @@ -1598,7 +1606,7 @@ CreateTask(TaskType taskType) task->taskType = taskType; task->jobId = INVALID_JOB_ID; task->taskId = INVALID_TASK_ID; - task->queryString = NULL; + SetTaskQueryString(task, NULL); task->anchorShardId = INVALID_SHARD_ID; task->taskPlacementList = NIL; task->dependentTaskList = NIL; @@ -1875,20 +1883,22 @@ RemoveCoordinatorPlacement(List *placementList) */ static List * SingleShardSelectTaskList(Query *query, uint64 jobId, List *relationShardList, - List *placementList, - uint64 shardId) + List *placementList, uint64 shardId) { Task *task = CreateTask(SELECT_TASK); - StringInfo queryString = makeStringInfo(); List *relationRowLockList = NIL; RowLocksOnRelations((Node *) query, &relationRowLockList); - pg_get_query_def(query, queryString); - task->queryString = queryString->data; + /* + * For performance reasons, we skip generating the queryString. For local + * execution this is not needed, so we wait until the executor determines + * that the query cannot be executed locally. 
+ */ + task->taskPlacementList = placementList; + SetTaskQuery(task, query); task->anchorShardId = shardId; task->jobId = jobId; - task->taskPlacementList = placementList; task->relationShardList = relationShardList; task->relationRowLockList = relationRowLockList; @@ -1946,7 +1956,6 @@ SingleShardModifyTaskList(Query *query, uint64 jobId, List *relationShardList, List *placementList, uint64 shardId) { Task *task = CreateTask(MODIFY_TASK); - StringInfo queryString = makeStringInfo(); List *rangeTableList = NIL; ExtractRangeTableEntryWalker((Node *) query, &rangeTableList); @@ -1964,12 +1973,10 @@ SingleShardModifyTaskList(Query *query, uint64 jobId, List *relationShardList, "and modify a reference table"))); } - pg_get_query_def(query, queryString); - - task->queryString = queryString->data; + task->taskPlacementList = placementList; + SetTaskQuery(task, query); task->anchorShardId = shardId; task->jobId = jobId; - task->taskPlacementList = placementList; task->relationShardList = relationShardList; task->replicationModel = modificationTableCacheEntry->replicationModel; @@ -2084,7 +2091,6 @@ PlanRouterQuery(Query *originalQuery, TargetShardIntervalForFastPathQuery(originalQuery, partitionValueConst, &isMultiShardQuery, distributionKeyValue); - /* * This could only happen when there is a parameter on the distribution key. 
* We defer error here, later the planner is forced to use a generic plan diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index 6c4c7b0e4..a087f14b6 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -246,7 +246,9 @@ CopyNodeTask(COPYFUNC_ARGS) COPY_SCALAR_FIELD(taskType); COPY_SCALAR_FIELD(jobId); COPY_SCALAR_FIELD(taskId); - COPY_STRING_FIELD(queryString); + COPY_NODE_FIELD(queryForLocalExecution); + COPY_STRING_FIELD(queryStringLazy); + COPY_SCALAR_FIELD(anchorDistributedTableId); COPY_SCALAR_FIELD(anchorShardId); COPY_NODE_FIELD(taskPlacementList); COPY_NODE_FIELD(dependentTaskList); diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index 593e6826e..73bbc4bae 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -463,7 +463,9 @@ OutTask(OUTFUNC_ARGS) WRITE_ENUM_FIELD(taskType, TaskType); WRITE_UINT64_FIELD(jobId); WRITE_UINT_FIELD(taskId); - WRITE_STRING_FIELD(queryString); + WRITE_NODE_FIELD(queryForLocalExecution); + WRITE_STRING_FIELD(queryStringLazy); + WRITE_OID_FIELD(anchorDistributedTableId); WRITE_UINT64_FIELD(anchorShardId); WRITE_NODE_FIELD(taskPlacementList); WRITE_NODE_FIELD(dependentTaskList); diff --git a/src/backend/distributed/utils/citus_readfuncs.c b/src/backend/distributed/utils/citus_readfuncs.c index b560e1f65..34065217f 100644 --- a/src/backend/distributed/utils/citus_readfuncs.c +++ b/src/backend/distributed/utils/citus_readfuncs.c @@ -377,7 +377,9 @@ ReadTask(READFUNC_ARGS) READ_ENUM_FIELD(taskType, TaskType); READ_UINT64_FIELD(jobId); READ_UINT_FIELD(taskId); - READ_STRING_FIELD(queryString); + READ_NODE_FIELD(queryForLocalExecution); + READ_STRING_FIELD(queryStringLazy); + READ_OID_FIELD(anchorDistributedTableId); READ_UINT64_FIELD(anchorShardId); READ_NODE_FIELD(taskPlacementList); 
READ_NODE_FIELD(dependentTaskList); diff --git a/src/include/distributed/deparse_shard_query.h b/src/include/distributed/deparse_shard_query.h index a390ec68b..971907099 100644 --- a/src/include/distributed/deparse_shard_query.h +++ b/src/include/distributed/deparse_shard_query.h @@ -18,10 +18,14 @@ #include "nodes/nodes.h" #include "nodes/parsenodes.h" #include "nodes/pg_list.h" +#include "distributed/citus_custom_scan.h" extern void RebuildQueryStrings(Query *originalQuery, List *taskList); extern bool UpdateRelationToShardNames(Node *node, List *relationShardList); +extern void SetTaskQuery(Task *task, Query *query); +extern void SetTaskQueryString(Task *task, char *queryString); +extern char * TaskQueryString(Task *task); #endif /* DEPARSE_SHARD_QUERY_H */ diff --git a/src/include/distributed/local_executor.h b/src/include/distributed/local_executor.h index 00d4a7953..4629132ca 100644 --- a/src/include/distributed/local_executor.h +++ b/src/include/distributed/local_executor.h @@ -26,5 +26,6 @@ extern bool ShouldExecuteTasksLocally(List *taskList); extern void ErrorIfLocalExecutionHappened(void); extern void DisableLocalExecution(void); extern bool AnyTaskAccessesRemoteNode(List *taskList); +extern bool TaskAccessesLocalNode(Task *task); #endif /* LOCAL_EXECUTION_H */ diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 126316440..1b2fb894b 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -182,15 +182,36 @@ typedef struct Task uint32 taskId; /* - * If queryString != NULL, then we have a single query for all placements. - * Otherwise, length of perPlacementQueryStrings is equal to length of - * taskPlacementList and can assign a different query for each placement. - * We need this flexibility when a query should return node specific values. - * For example, on which node did we succeed storing some result files? 
+ * For most queries queryForLocalExecution and/or queryStringLazy is not + NULL. This means we have a single query for all placements. + * + * If this is not the case, the length of perPlacementQueryStrings is + * non-zero and equal to length of taskPlacementList. Like this it can + * assign a different query for each placement. We need this flexibility + * when a query should return node specific values. For example, on which + * node did we succeed storing some result files? + * + * queryForLocalExecution is only not null when the planner thinks the + * query could possibly be locally executed. In that case deparsing+parsing + * the query might not be necessary, so we do that lazily. + * + * queryForLocalExecution should only be set by using SetTaskQuery() */ - char *queryString; + Query *queryForLocalExecution; + + /* + * In almost all cases queryStringLazy should be read only indirectly by + * using TaskQueryString(). This will populate the field only if the + * queryForLocalExecution field is not NULL. + * + * This field should only be set by using SetTaskQueryString() (or as a + * side effect from TaskQueryString()). Otherwise it might not be in sync + * with queryForLocalExecution.
+ */ + char *queryStringLazy; List *perPlacementQueryStrings; + Oid anchorDistributedTableId; /* only applies to insert tasks */ uint64 anchorShardId; /* only applies to compute tasks */ List *taskPlacementList; /* only applies to compute tasks */ List *dependentTaskList; /* only applies to compute tasks */ diff --git a/src/test/regress/expected/coordinator_shouldhaveshards.out b/src/test/regress/expected/coordinator_shouldhaveshards.out index 06e20c4b3..8084d101f 100644 --- a/src/test/regress/expected/coordinator_shouldhaveshards.out +++ b/src/test/regress/expected/coordinator_shouldhaveshards.out @@ -1,6 +1,7 @@ -- Test queries on a distributed table with shards on the coordinator CREATE SCHEMA coordinator_shouldhaveshards; SET search_path TO coordinator_shouldhaveshards; +SET citus.next_shard_id TO 1503000; -- idempotently add node to allow this test to run without add_coordinator SET client_min_messages TO WARNING; SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); @@ -31,11 +32,16 @@ WHERE logicalrelid = 'test'::regclass AND groupid = 0; 2 (1 row) +--- enable logging to see which tasks are executed locally +SET client_min_messages TO LOG; +SET citus.log_local_commands TO ON; -- INSERT..SELECT with COPY under the covers INSERT INTO test SELECT s,s FROM generate_series(2,100) s; -- router queries execute locally INSERT INTO test VALUES (1, 1); +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.test_1503000 (x, y) VALUES (1, 1) SELECT y FROM test WHERE x = 1; +NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1) y --------------------------------------------------------------------- 1 @@ -57,12 +63,15 @@ WITH a AS (SELECT * FROM test) SELECT count(*) FROM test; -- multi-shard queries in transaction blocks execute locally BEGIN; SELECT y FROM test WHERE x = 1; +NOTICE: executing the command locally: SELECT y FROM 
coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1) y --------------------------------------------------------------------- 1 (1 row) SELECT count(*) FROM test; +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true count --------------------------------------------------------------------- 100 @@ -71,12 +80,15 @@ SELECT count(*) FROM test; END; BEGIN; SELECT y FROM test WHERE x = 1; +NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1) y --------------------------------------------------------------------- 1 (1 row) SELECT count(*) FROM test; +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true count --------------------------------------------------------------------- 100 @@ -88,6 +100,7 @@ ALTER TABLE test ADD COLUMN z int; -- DDL after local execution BEGIN; SELECT y FROM test WHERE x = 1; +NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1) y --------------------------------------------------------------------- 1 diff --git a/src/test/regress/expected/local_shard_execution.out b/src/test/regress/expected/local_shard_execution.out index e123d0887..5d8f755ad 100644 --- a/src/test/regress/expected/local_shard_execution.out +++ b/src/test/regress/expected/local_shard_execution.out @@ -808,6 +808,15 @@ INSERT INTO distributed_table VALUES (1, '11',21), (2,'22',22), (3,'33',33), (4, (5 rows) PREPARE local_prepare_no_param AS SELECT count(*) FROM distributed_table WHERE key = 1; 
+PREPARE local_prepare_no_param_subquery AS +SELECT DISTINCT trim(value) FROM ( + SELECT value FROM distributed_table + WHERE + key IN (1, 6, 500, 701) + AND (select 2) > random() + order by 1 + limit 2 + ) t; PREPARE local_prepare_param (int) AS SELECT count(*) FROM distributed_table WHERE key = $1; PREPARE remote_prepare_param (int) AS SELECT count(*) FROM distributed_table WHERE key != $1; BEGIN; @@ -854,6 +863,61 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar 1 (1 row) + -- 6 local execution without params and some subqueries + EXECUTE local_prepare_no_param_subquery; +NOTICE: executing the command locally: SELECT worker_column_1 AS value FROM (SELECT distributed_table.value AS worker_column_1 FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE ((distributed_table.key OPERATOR(pg_catalog.=) ANY (ARRAY[1, 6, 500, 701])) AND (((SELECT 2))::double precision OPERATOR(pg_catalog.>) random()))) worker_subquery ORDER BY worker_column_1 LIMIT '2'::bigint +NOTICE: executing the command locally: SELECT worker_column_1 AS value FROM (SELECT distributed_table.value AS worker_column_1 FROM local_shard_execution.distributed_table_1470003 distributed_table WHERE ((distributed_table.key OPERATOR(pg_catalog.=) ANY (ARRAY[1, 6, 500, 701])) AND (((SELECT 2))::double precision OPERATOR(pg_catalog.>) random()))) worker_subquery ORDER BY worker_column_1 LIMIT '2'::bigint +NOTICE: executing the command locally: SELECT DISTINCT btrim(value) AS btrim FROM (SELECT intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(value text)) t + btrim +--------------------------------------------------------------------- + 12 +(1 row) + + EXECUTE local_prepare_no_param_subquery; +NOTICE: executing the command locally: SELECT worker_column_1 AS value FROM (SELECT distributed_table.value AS worker_column_1 FROM local_shard_execution.distributed_table_1470001 distributed_table 
WHERE ((distributed_table.key OPERATOR(pg_catalog.=) ANY (ARRAY[1, 6, 500, 701])) AND (((SELECT 2))::double precision OPERATOR(pg_catalog.>) random()))) worker_subquery ORDER BY worker_column_1 LIMIT '2'::bigint +NOTICE: executing the command locally: SELECT worker_column_1 AS value FROM (SELECT distributed_table.value AS worker_column_1 FROM local_shard_execution.distributed_table_1470003 distributed_table WHERE ((distributed_table.key OPERATOR(pg_catalog.=) ANY (ARRAY[1, 6, 500, 701])) AND (((SELECT 2))::double precision OPERATOR(pg_catalog.>) random()))) worker_subquery ORDER BY worker_column_1 LIMIT '2'::bigint +NOTICE: executing the command locally: SELECT DISTINCT btrim(value) AS btrim FROM (SELECT intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(value text)) t + btrim +--------------------------------------------------------------------- + 12 +(1 row) + + EXECUTE local_prepare_no_param_subquery; +NOTICE: executing the command locally: SELECT worker_column_1 AS value FROM (SELECT distributed_table.value AS worker_column_1 FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE ((distributed_table.key OPERATOR(pg_catalog.=) ANY (ARRAY[1, 6, 500, 701])) AND (((SELECT 2))::double precision OPERATOR(pg_catalog.>) random()))) worker_subquery ORDER BY worker_column_1 LIMIT '2'::bigint +NOTICE: executing the command locally: SELECT worker_column_1 AS value FROM (SELECT distributed_table.value AS worker_column_1 FROM local_shard_execution.distributed_table_1470003 distributed_table WHERE ((distributed_table.key OPERATOR(pg_catalog.=) ANY (ARRAY[1, 6, 500, 701])) AND (((SELECT 2))::double precision OPERATOR(pg_catalog.>) random()))) worker_subquery ORDER BY worker_column_1 LIMIT '2'::bigint +NOTICE: executing the command locally: SELECT DISTINCT btrim(value) AS btrim FROM (SELECT intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) 
intermediate_result(value text)) t + btrim +--------------------------------------------------------------------- + 12 +(1 row) + + EXECUTE local_prepare_no_param_subquery; +NOTICE: executing the command locally: SELECT worker_column_1 AS value FROM (SELECT distributed_table.value AS worker_column_1 FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE ((distributed_table.key OPERATOR(pg_catalog.=) ANY (ARRAY[1, 6, 500, 701])) AND (((SELECT 2))::double precision OPERATOR(pg_catalog.>) random()))) worker_subquery ORDER BY worker_column_1 LIMIT '2'::bigint +NOTICE: executing the command locally: SELECT worker_column_1 AS value FROM (SELECT distributed_table.value AS worker_column_1 FROM local_shard_execution.distributed_table_1470003 distributed_table WHERE ((distributed_table.key OPERATOR(pg_catalog.=) ANY (ARRAY[1, 6, 500, 701])) AND (((SELECT 2))::double precision OPERATOR(pg_catalog.>) random()))) worker_subquery ORDER BY worker_column_1 LIMIT '2'::bigint +NOTICE: executing the command locally: SELECT DISTINCT btrim(value) AS btrim FROM (SELECT intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(value text)) t + btrim +--------------------------------------------------------------------- + 12 +(1 row) + + EXECUTE local_prepare_no_param_subquery; +NOTICE: executing the command locally: SELECT worker_column_1 AS value FROM (SELECT distributed_table.value AS worker_column_1 FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE ((distributed_table.key OPERATOR(pg_catalog.=) ANY (ARRAY[1, 6, 500, 701])) AND (((SELECT 2))::double precision OPERATOR(pg_catalog.>) random()))) worker_subquery ORDER BY worker_column_1 LIMIT '2'::bigint +NOTICE: executing the command locally: SELECT worker_column_1 AS value FROM (SELECT distributed_table.value AS worker_column_1 FROM local_shard_execution.distributed_table_1470003 distributed_table WHERE ((distributed_table.key 
OPERATOR(pg_catalog.=) ANY (ARRAY[1, 6, 500, 701])) AND (((SELECT 2))::double precision OPERATOR(pg_catalog.>) random()))) worker_subquery ORDER BY worker_column_1 LIMIT '2'::bigint +NOTICE: executing the command locally: SELECT DISTINCT btrim(value) AS btrim FROM (SELECT intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(value text)) t + btrim +--------------------------------------------------------------------- + 12 +(1 row) + + EXECUTE local_prepare_no_param_subquery; +NOTICE: executing the command locally: SELECT worker_column_1 AS value FROM (SELECT distributed_table.value AS worker_column_1 FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE ((distributed_table.key OPERATOR(pg_catalog.=) ANY (ARRAY[1, 6, 500, 701])) AND (((SELECT 2))::double precision OPERATOR(pg_catalog.>) random()))) worker_subquery ORDER BY worker_column_1 LIMIT '2'::bigint +NOTICE: executing the command locally: SELECT worker_column_1 AS value FROM (SELECT distributed_table.value AS worker_column_1 FROM local_shard_execution.distributed_table_1470003 distributed_table WHERE ((distributed_table.key OPERATOR(pg_catalog.=) ANY (ARRAY[1, 6, 500, 701])) AND (((SELECT 2))::double precision OPERATOR(pg_catalog.>) random()))) worker_subquery ORDER BY worker_column_1 LIMIT '2'::bigint +NOTICE: executing the command locally: SELECT DISTINCT btrim(value) AS btrim FROM (SELECT intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(value text)) t + btrim +--------------------------------------------------------------------- + 12 +(1 row) + -- 6 local executions with params EXECUTE local_prepare_param(1); NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) @@ -1370,7 +1434,6 @@ EXECUTE serial_prepared_local; -- Citus currently 
doesn't allow using task_assignment_policy for intermediate results WITH distributed_local_mixed AS (INSERT INTO reference_table VALUES (1000) RETURNING *) SELECT * FROM distributed_local_mixed; NOTICE: executing the command locally: INSERT INTO local_shard_execution.reference_table_1470000 (key) VALUES (1000) RETURNING key -NOTICE: executing the command locally: SELECT key FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) distributed_local_mixed key --------------------------------------------------------------------- 1000 @@ -1418,6 +1481,33 @@ NOTICE: executing the command locally: DELETE FROM local_shard_execution.refere (2 rows) COMMIT; +-- however complex the query, local execution can handle +SET client_min_messages TO LOG; +SET citus.log_local_commands TO ON; +WITH cte_1 AS + (SELECT * + FROM + (WITH cte_1 AS + (SELECT * + FROM distributed_table + WHERE key = 1) SELECT * + FROM cte_1) AS foo) +SELECT count(*) +FROM cte_1 +JOIN distributed_table USING (key) +WHERE distributed_table.key = 1 + AND distributed_table.key IN + (SELECT key + FROM distributed_table + WHERE key = 1); +NOTICE: executing the command locally: SELECT count(*) AS count FROM ((SELECT foo.key, foo.value, foo.age FROM (SELECT cte_1_1.key, cte_1_1.value, cte_1_1.age FROM (SELECT distributed_table_1.key, distributed_table_1.value, distributed_table_1.age FROM local_shard_execution.distributed_table_1470001 distributed_table_1 WHERE (distributed_table_1.key OPERATOR(pg_catalog.=) 1)) cte_1_1) foo) cte_1 JOIN local_shard_execution.distributed_table_1470001 distributed_table(key, value, age) USING (key)) WHERE ((distributed_table.key OPERATOR(pg_catalog.=) 1) AND (distributed_table.key OPERATOR(pg_catalog.=) ANY (SELECT distributed_table_1.key FROM local_shard_execution.distributed_table_1470001 distributed_table_1 WHERE (distributed_table_1.key OPERATOR(pg_catalog.=) 1)))) + count 
+--------------------------------------------------------------------- + 0 +(1 row) + +RESET client_min_messages; +RESET citus.log_local_commands; \c - - - :master_port -- local execution with custom type SET citus.replication_model TO "streaming"; diff --git a/src/test/regress/expected/multi_reference_table.out b/src/test/regress/expected/multi_reference_table.out index 8b15a8ff5..4f7affed3 100644 --- a/src/test/regress/expected/multi_reference_table.out +++ b/src/test/regress/expected/multi_reference_table.out @@ -926,6 +926,14 @@ RETURNING value_1, value_2, value_3; 3 | 1 | 2 (1 row) +INSERT INTO + reference_table_test_fifth (value_4) VALUES (now()) +RETURNING value_1, value_2, value_3; + value_1 | value_2 | value_3 +--------------------------------------------------------------------- + 4 | | +(1 row) + UPDATE reference_table_test_fifth SET value_4 = now() WHERE diff --git a/src/test/regress/sql/coordinator_shouldhaveshards.sql b/src/test/regress/sql/coordinator_shouldhaveshards.sql index 42fe96bec..66979a7c8 100644 --- a/src/test/regress/sql/coordinator_shouldhaveshards.sql +++ b/src/test/regress/sql/coordinator_shouldhaveshards.sql @@ -2,6 +2,7 @@ CREATE SCHEMA coordinator_shouldhaveshards; SET search_path TO coordinator_shouldhaveshards; +SET citus.next_shard_id TO 1503000; -- idempotently add node to allow this test to run without add_coordinator SET client_min_messages TO WARNING; @@ -18,6 +19,11 @@ SELECT create_distributed_table('test','x', colocate_with := 'none'); SELECT count(*) FROM pg_dist_shard JOIN pg_dist_placement USING (shardid) WHERE logicalrelid = 'test'::regclass AND groupid = 0; +--- enable logging to see which tasks are executed locally +SET client_min_messages TO LOG; +SET citus.log_local_commands TO ON; + + -- INSERT..SELECT with COPY under the covers INSERT INTO test SELECT s,s FROM generate_series(2,100) s; diff --git a/src/test/regress/sql/local_shard_execution.sql b/src/test/regress/sql/local_shard_execution.sql index 
45ecf377b..f829733c1 100644 --- a/src/test/regress/sql/local_shard_execution.sql +++ b/src/test/regress/sql/local_shard_execution.sql @@ -461,6 +461,15 @@ INSERT INTO distributed_table VALUES (1, '11',21), (2,'22',22), (3,'33',33), (4, PREPARE local_prepare_no_param AS SELECT count(*) FROM distributed_table WHERE key = 1; +PREPARE local_prepare_no_param_subquery AS +SELECT DISTINCT trim(value) FROM ( + SELECT value FROM distributed_table + WHERE + key IN (1, 6, 500, 701) + AND (select 2) > random() + order by 1 + limit 2 + ) t; PREPARE local_prepare_param (int) AS SELECT count(*) FROM distributed_table WHERE key = $1; PREPARE remote_prepare_param (int) AS SELECT count(*) FROM distributed_table WHERE key != $1; BEGIN; @@ -472,6 +481,14 @@ BEGIN; EXECUTE local_prepare_no_param; EXECUTE local_prepare_no_param; + -- 6 local execution without params and some subqueries + EXECUTE local_prepare_no_param_subquery; + EXECUTE local_prepare_no_param_subquery; + EXECUTE local_prepare_no_param_subquery; + EXECUTE local_prepare_no_param_subquery; + EXECUTE local_prepare_no_param_subquery; + EXECUTE local_prepare_no_param_subquery; + -- 6 local executions with params EXECUTE local_prepare_param(1); EXECUTE local_prepare_param(5); @@ -732,6 +749,28 @@ BEGIN; DELETE FROM reference_table RETURNING key; COMMIT; +-- however complex the query, local execution can handle +SET client_min_messages TO LOG; +SET citus.log_local_commands TO ON; +WITH cte_1 AS + (SELECT * + FROM + (WITH cte_1 AS + (SELECT * + FROM distributed_table + WHERE key = 1) SELECT * + FROM cte_1) AS foo) +SELECT count(*) +FROM cte_1 +JOIN distributed_table USING (key) +WHERE distributed_table.key = 1 + AND distributed_table.key IN + (SELECT key + FROM distributed_table + WHERE key = 1); + +RESET client_min_messages; +RESET citus.log_local_commands; \c - - - :master_port diff --git a/src/test/regress/sql/multi_reference_table.sql b/src/test/regress/sql/multi_reference_table.sql index d3fc89896..87b377945 100644 --- 
a/src/test/regress/sql/multi_reference_table.sql +++ b/src/test/regress/sql/multi_reference_table.sql @@ -563,6 +563,10 @@ INSERT INTO reference_table_test_fifth (value_2, value_3) VALUES (nextval('example_ref_value_seq'), nextval('example_ref_value_seq')::text) RETURNING value_1, value_2, value_3; +INSERT INTO + reference_table_test_fifth (value_4) VALUES (now()) +RETURNING value_1, value_2, value_3; + UPDATE reference_table_test_fifth SET value_4 = now() WHERE