Lazy query deparsing executable queries (#3350)

Deparsing and parsing a query can be heavy on CPU. When executing the
query locally, this work is in theory unnecessary most of the time.

This PR is a first step toward skipping the deparse and parse of the
query in these cases: it creates the query string lazily and stores the
query itself in the task. Future commits will build on this and, instead
of deparsing and parsing the query again, use the one stored in the task
directly.
Jelte Fennema 2020-01-17 11:49:43 +01:00 committed by GitHub
parent 60a2bc5ec2
commit 246435be7e
30 changed files with 417 additions and 86 deletions
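
To make the shape of the change concrete, here is a minimal standalone sketch
of the lazy pattern the diff below introduces. The names mirror the new Task
fields and helpers (queryForLocalExecution, queryStringLazy, SetTaskQuery,
TaskQueryString), but the planner's Query node and the deparser are stood in
for by a plain string and a stub, so this compiles on its own and is not the
actual implementation.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

typedef struct Task
{
	char *queryForLocalExecution;   /* stand-in for the planner's Query * */
	char *queryStringLazy;          /* deparsed text, generated on demand */
} Task;

static char *
DeparseTaskQuery(Task *task)
{
	/* stand-in for deparse_shard_query()/pg_get_query_def() */
	char *copy = malloc(strlen(task->queryForLocalExecution) + 1);
	strcpy(copy, task->queryForLocalExecution);
	fprintf(stderr, "deparsing (expensive)\n");
	return copy;
}

static void
SetTaskQuery(Task *task, char *query)
{
	/* keep the query itself; defer deparsing until someone needs the text */
	task->queryForLocalExecution = query;
	task->queryStringLazy = NULL;
}

static char *
TaskQueryString(Task *task)
{
	/* deparse at most once, then reuse the cached string */
	if (task->queryStringLazy == NULL)
	{
		task->queryStringLazy = DeparseTaskQuery(task);
	}
	return task->queryStringLazy;
}

int
main(void)
{
	Task task;
	SetTaskQuery(&task, "SELECT y FROM test WHERE x = 1");

	/* a purely local execution would use queryForLocalExecution and never
	 * pay for deparsing; a remote execution deparses once and reuses it */
	printf("%s\n", TaskQueryString(&task));  /* deparses */
	printf("%s\n", TaskQueryString(&task));  /* cached */

	free(task.queryStringLazy);
	return 0;
}

In the real code the cost being avoided is deparse_shard_query() or
pg_get_query_def() plus the later re-parse on the local execution path,
which is why the executor only asks for the string once it knows the task
cannot run locally.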

View File

@ -20,6 +20,7 @@
#include "distributed/commands/multi_copy.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/connection_management.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_executor.h"
@ -168,7 +169,7 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure,
task->jobId = INVALID_JOB_ID;
task->taskId = 0;
task->taskType = DDL_TASK;
task->queryString = callCommand->data;
SetTaskQueryString(task, callCommand->data);
task->replicationModel = REPLICATION_MODEL_INVALID;
task->dependentTaskList = NIL;
task->anchorShardId = placement->shardId;

View File

@ -23,6 +23,7 @@
#include "distributed/citus_ruleutils.h"
#include "distributed/commands.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/distributed_planner.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
@ -528,7 +529,7 @@ CreateIndexTaskList(Oid relationId, IndexStmt *indexStmt)
task->jobId = jobId;
task->taskId = taskId++;
task->taskType = DDL_TASK;
task->queryString = pstrdup(ddlString.data);
SetTaskQueryString(task, pstrdup(ddlString.data));
task->replicationModel = REPLICATION_MODEL_INVALID;
task->dependentTaskList = NULL;
task->anchorShardId = shardId;
@ -573,7 +574,7 @@ CreateReindexTaskList(Oid relationId, ReindexStmt *reindexStmt)
task->jobId = jobId;
task->taskId = taskId++;
task->taskType = DDL_TASK;
task->queryString = pstrdup(ddlString.data);
SetTaskQueryString(task, pstrdup(ddlString.data));
task->replicationModel = REPLICATION_MODEL_INVALID;
task->dependentTaskList = NULL;
task->anchorShardId = shardId;
@ -903,7 +904,7 @@ DropIndexTaskList(Oid relationId, Oid indexId, DropStmt *dropStmt)
task->jobId = jobId;
task->taskId = taskId++;
task->taskType = DDL_TASK;
task->queryString = pstrdup(ddlString.data);
SetTaskQueryString(task, pstrdup(ddlString.data));
task->replicationModel = REPLICATION_MODEL_INVALID;
task->dependentTaskList = NULL;
task->anchorShardId = shardId;

View File

@ -23,6 +23,7 @@
#include "distributed/commands.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/deparser.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_executor.h"
@ -1379,7 +1380,7 @@ InterShardDDLTaskList(Oid leftRelationId, Oid rightRelationId,
task->jobId = jobId;
task->taskId = taskId++;
task->taskType = DDL_TASK;
task->queryString = applyCommand->data;
SetTaskQueryString(task, applyCommand->data);
task->dependentTaskList = NULL;
task->replicationModel = REPLICATION_MODEL_INVALID;
task->anchorShardId = leftShardId;

View File

@ -42,6 +42,7 @@
#include "distributed/commands/multi_copy.h"
#include "distributed/commands/utility_hook.h" /* IWYU pragma: keep */
#include "distributed/deparser.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/listutils.h"
#include "distributed/local_executor.h"
#include "distributed/maintenanced.h"
@ -49,6 +50,7 @@
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_explain.h"
#include "distributed/resource_lock.h"
#include "distributed/transmit.h"
#include "distributed/version_compat.h"
@ -864,7 +866,7 @@ DDLTaskList(Oid relationId, const char *commandString)
task->jobId = jobId;
task->taskId = taskId++;
task->taskType = DDL_TASK;
task->queryString = applyCommand->data;
SetTaskQueryString(task, applyCommand->data);
task->replicationModel = REPLICATION_MODEL_INVALID;
task->dependentTaskList = NULL;
task->anchorShardId = shardId;
@ -899,7 +901,7 @@ NodeDDLTaskList(TargetWorkerSet targets, List *commands)
Task *task = CitusMakeNode(Task);
task->taskType = DDL_TASK;
task->queryString = concatenatedCommands;
SetTaskQueryString(task, concatenatedCommands);
foreach(workerNodeCell, workerNodes)
{

View File

@ -16,6 +16,7 @@
#include "commands/vacuum.h"
#include "distributed/commands.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_executor.h"
#include "distributed/resource_lock.h"
@ -225,7 +226,7 @@ VacuumTaskList(Oid relationId, CitusVacuumParams vacuumParams, List *vacuumColum
task->jobId = jobId;
task->taskId = taskId++;
task->taskType = VACUUM_ANALYZE_TASK;
task->queryString = pstrdup(vacuumString->data);
SetTaskQueryString(task, pstrdup(vacuumString->data));
task->dependentTaskList = NULL;
task->replicationModel = REPLICATION_MODEL_INVALID;
task->anchorShardId = shardId;

View File

@ -133,6 +133,7 @@
#include "commands/dbcommands.h"
#include "distributed/citus_custom_scan.h"
#include "distributed/connection_management.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/distributed_execution_locks.h"
#include "distributed/local_executor.h"
#include "distributed/multi_client_executor.h"
@ -905,7 +906,8 @@ ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList,
* a distributed plan.
*/
static DistributedExecution *
CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, bool hasReturning,
CreateDistributedExecution(RowModifyLevel modLevel, List *taskList,
bool hasReturning,
ParamListInfo paramListInfo, TupleDesc tupleDescriptor,
Tuplestorestate *tupleStore, int targetPoolSize,
TransactionProperties *xactProperties)
@ -3130,9 +3132,9 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution,
int querySent = 0;
char *queryString = NULL;
if (task->queryString != NULL)
if (list_length(task->perPlacementQueryStrings) == 0)
{
queryString = task->queryString;
queryString = TaskQueryString(task);
}
else
{

View File

@ -17,6 +17,7 @@
#include "access/tupdesc.h"
#include "catalog/pg_type.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/intermediate_results.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/metadata_cache.h"
@ -221,7 +222,7 @@ WrapTasksForPartitioning(char *resultIdPrefix, List *selectTaskList,
shardPlacement->nodeId,
quote_literal_cstr(taskPrefix),
quote_literal_cstr(taskPrefix),
quote_literal_cstr(selectTask->queryString),
quote_literal_cstr(TaskQueryString(selectTask)),
partitionColumnIndex,
quote_literal_cstr(partitionMethodString),
minValuesString->data, maxValuesString->data,
@ -229,7 +230,7 @@ WrapTasksForPartitioning(char *resultIdPrefix, List *selectTaskList,
perPlacementQueries = lappend(perPlacementQueries, wrappedQuery->data);
}
selectTask->queryString = NULL;
SetTaskQueryString(selectTask, NULL);
selectTask->perPlacementQueryStrings = perPlacementQueries;
}
}
@ -546,7 +547,7 @@ FragmentTransferTaskList(List *fragmentListTransfers)
Task *task = CitusMakeNode(Task);
task->taskType = SELECT_TASK;
task->queryString = QueryStringForFragmentsTransfer(fragmentsTransfer);
SetTaskQueryString(task, QueryStringForFragmentsTransfer(fragmentsTransfer));
task->taskPlacementList = list_make1(targetPlacement);
fetchTaskList = lappend(fetchTaskList, task);

View File

@ -14,6 +14,7 @@
#include "distributed/citus_ruleutils.h"
#include "distributed/commands/multi_copy.h"
#include "distributed/adaptive_executor.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/distributed_execution_locks.h"
#include "distributed/insert_select_executor.h"
#include "distributed/insert_select_planner.h"
@ -1094,13 +1095,11 @@ WrapTaskListForProjection(List *taskList, List *projectedTargetEntries)
Task *task = NULL;
foreach_ptr(task, taskList)
{
Assert(task->queryString != NULL);
StringInfo wrappedQuery = makeStringInfo();
appendStringInfo(wrappedQuery, "SELECT %s FROM (%s) subquery",
projectedColumnsString->data,
task->queryString);
task->queryString = wrappedQuery->data;
TaskQueryString(task));
SetTaskQueryString(task, wrappedQuery->data);
}
}

View File

@ -73,6 +73,8 @@
#include "miscadmin.h"
#include "distributed/citus_custom_scan.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/local_executor.h"
#include "distributed/multi_executor.h"
#include "distributed/master_protocol.h"
@ -103,9 +105,7 @@ static void SplitLocalAndRemotePlacements(List *taskPlacementList,
List **remoteTaskPlacementList);
static uint64 ExecuteLocalTaskPlan(CitusScanState *scanState, PlannedStmt *taskPlan,
char *queryString);
static bool TaskAccessesLocalNode(Task *task);
static void LogLocalCommand(const char *command);
static void ExtractParametersForLocalExecution(ParamListInfo paramListInfo,
Oid **parameterTypes,
const char ***parameterValues);
@ -143,7 +143,7 @@ ExecuteLocalTaskList(CitusScanState *scanState, List *taskList)
{
Task *task = (Task *) lfirst(taskCell);
const char *shardQueryString = task->queryString;
const char *shardQueryString = TaskQueryString(task);
Query *shardQuery = ParseQueryString(shardQueryString, parameterTypes, numParams);
/*
@ -167,7 +167,7 @@ ExecuteLocalTaskList(CitusScanState *scanState, List *taskList)
LogLocalCommand(shardQueryString);
totalRowsProcessed +=
ExecuteLocalTaskPlan(scanState, localPlan, task->queryString);
ExecuteLocalTaskPlan(scanState, localPlan, TaskQueryString(task));
}
return totalRowsProcessed;
@ -432,7 +432,7 @@ ShouldExecuteTasksLocally(List *taskList)
* TaskAccessesLocalNode returns true if any placements of the task reside on the
* node that is executing the query.
*/
static bool
bool
TaskAccessesLocalNode(Task *task)
{
ListCell *placementCell = NULL;

View File

@ -28,6 +28,7 @@
#include "distributed/citus_custom_scan.h"
#include "distributed/citus_nodes.h"
#include "distributed/connection_management.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/distributed_execution_locks.h"
#include "distributed/local_executor.h"
#include "distributed/metadata_cache.h"
@ -1060,7 +1061,7 @@ ManageTaskExecution(TaskTracker *taskTracker, TaskTracker *sourceTaskTracker,
StringInfo mapFetchTaskQueryString = MapFetchTaskQueryString(task,
mapTask);
task->queryString = mapFetchTaskQueryString->data;
SetTaskQueryString(task, mapFetchTaskQueryString->data);
taskExecution->querySourceNodeIndex = mapTaskExecution->currentNodeIndex;
}
@ -1576,7 +1577,7 @@ TrackerQueueSqlTask(TaskTracker *taskTracker, Task *task)
*/
StringInfo sqlTaskQueryString = makeStringInfo();
char *escapedTaskQueryString = quote_literal_cstr(task->queryString);
char *escapedTaskQueryString = quote_literal_cstr(TaskQueryString(task));
if (BinaryMasterCopyFormat)
{
@ -1611,7 +1612,7 @@ TrackerQueueTask(TaskTracker *taskTracker, Task *task)
HTAB *taskStateHash = taskTracker->taskStateHash;
/* wrap a task assignment query outside the original query */
StringInfo taskAssignmentQuery = TaskAssignmentQuery(task, task->queryString);
StringInfo taskAssignmentQuery = TaskAssignmentQuery(task, TaskQueryString(task));
TrackerTaskState *taskState = TaskStateHashEnter(taskStateHash, task->jobId,
task->taskId);
@ -2731,7 +2732,7 @@ JobCleanupTask(uint64 jobId)
jobCleanupTask->jobId = jobId;
jobCleanupTask->taskId = JOB_CLEANUP_TASK_ID;
jobCleanupTask->replicationModel = REPLICATION_MODEL_INVALID;
jobCleanupTask->queryString = jobCleanupQuery->data;
SetTaskQueryString(jobCleanupTask, jobCleanupQuery->data);
return jobCleanupTask;
}
@ -2767,9 +2768,8 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask)
if (!taskTracker->connectionBusy)
{
/* assign through task tracker to manage resource utilization */
StringInfo jobCleanupQuery = TaskAssignmentQuery(jobCleanupTask,
jobCleanupTask->
queryString);
StringInfo jobCleanupQuery = TaskAssignmentQuery(
jobCleanupTask, TaskQueryString(jobCleanupTask));
jobCleanupQuerySent = MultiClientSendQuery(taskTracker->connectionId,
jobCleanupQuery->data);
@ -2850,7 +2850,8 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask)
nodeName, nodePort, (int) queryStatus),
errhint("Manually clean job resources on node "
"\"%s:%u\" by running \"%s\" ", nodeName,
nodePort, jobCleanupTask->queryString)));
nodePort, TaskQueryString(
jobCleanupTask))));
}
else
{
@ -2868,7 +2869,7 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask)
nodePort, (int) resultStatus),
errhint("Manually clean job resources on node "
"\"%s:%u\" by running \"%s\" ", nodeName,
nodePort, jobCleanupTask->queryString)));
nodePort, TaskQueryString(jobCleanupTask))));
}
else
{

View File

@ -30,6 +30,7 @@
#include "distributed/commands.h"
#include "distributed/adaptive_executor.h"
#include "distributed/connection_management.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/distributed_planner.h"
#include "distributed/listutils.h"
#include "distributed/multi_client_executor.h"
@ -515,7 +516,7 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
task->jobId = INVALID_JOB_ID;
task->taskId = taskId++;
task->taskType = DDL_TASK;
task->queryString = StringJoin(commandList, ';');
SetTaskQueryString(task, StringJoin(commandList, ';'));
task->replicationModel = REPLICATION_MODEL_INVALID;
task->dependentTaskList = NIL;
task->anchorShardId = shardId;

View File

@ -16,6 +16,7 @@
#include "commands/tablecmds.h"
#include "commands/trigger.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h"
#include "distributed/multi_executor.h"
@ -116,7 +117,7 @@ TruncateTaskList(Oid relationId)
task->jobId = INVALID_JOB_ID;
task->taskId = taskId++;
task->taskType = DDL_TASK;
task->queryString = shardQueryString->data;
SetTaskQueryString(task, shardQueryString->data);
task->dependentTaskList = NULL;
task->replicationModel = REPLICATION_MODEL_INVALID;
task->anchorShardId = shardId;

View File

@ -17,6 +17,7 @@
#include "distributed/citus_ruleutils.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/insert_select_planner.h"
#include "distributed/local_executor.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_router_planner.h"
@ -36,6 +37,8 @@
static void UpdateTaskQueryString(Query *query, Oid distributedTableId,
RangeTblEntry *valuesRTE, Task *task);
static void ConvertRteToSubqueryWithEmptyResult(RangeTblEntry *rte);
static bool ShouldLazyDeparseQuery(Task *task);
static char * DeparseTaskQuery(Task *task, Query *query);
/*
@ -105,13 +108,15 @@ RebuildQueryStrings(Query *originalQuery, List *taskList)
}
ereport(DEBUG4, (errmsg("query before rebuilding: %s",
task->queryString == NULL ? "(null)" :
ApplyLogRedaction(task->queryString))));
task->queryForLocalExecution == NULL &&
task->queryStringLazy == NULL
? "(null)"
: ApplyLogRedaction(TaskQueryString(task)))));
UpdateTaskQueryString(query, relationId, valuesRTE, task);
ereport(DEBUG4, (errmsg("query after rebuilding: %s",
ApplyLogRedaction(task->queryString))));
ApplyLogRedaction(TaskQueryString(task)))));
}
}
@ -127,7 +132,6 @@ static void
UpdateTaskQueryString(Query *query, Oid distributedTableId, RangeTblEntry *valuesRTE,
Task *task)
{
StringInfo queryString = makeStringInfo();
List *oldValuesLists = NIL;
if (valuesRTE != NULL)
@ -139,30 +143,40 @@ UpdateTaskQueryString(Query *query, Oid distributedTableId, RangeTblEntry *value
valuesRTE->values_lists = task->rowValuesLists;
}
/*
* For INSERT queries, we only have one relation to update, so we can
* use deparse_shard_query(). For UPDATE and DELETE queries, we may have
* subqueries and joins, so we use relation shard list to update shard
* names and call pg_get_query_def() directly.
*/
if (query->commandType == CMD_INSERT)
{
deparse_shard_query(query, distributedTableId, task->anchorShardId, queryString);
}
else
if (query->commandType != CMD_INSERT)
{
/*
* For UPDATE and DELETE queries, we may have subqueries and joins, so
* we use relation shard list to update shard names and call
* pg_get_query_def() directly.
*/
List *relationShardList = task->relationShardList;
UpdateRelationToShardNames((Node *) query, relationShardList);
pg_get_query_def(query, queryString);
}
else if (ShouldLazyDeparseQuery(task))
{
/*
* Not all INSERT queries are copied before calling this
* function, so we copy it here.
*/
query = copyObject(query);
}
if (query->commandType == CMD_INSERT)
{
/*
* We store this in the task so we can lazily call
* deparse_shard_query when the string is needed
*/
task->anchorDistributedTableId = distributedTableId;
}
SetTaskQuery(task, query);
if (valuesRTE != NULL)
{
valuesRTE->values_lists = oldValuesLists;
}
task->queryString = queryString->data;
}
@ -303,3 +317,111 @@ ConvertRteToSubqueryWithEmptyResult(RangeTblEntry *rte)
rte->subquery = subquery;
rte->alias = copyObject(rte->eref);
}
/*
* ShouldLazyDeparseQuery returns true if we should lazily deparse the query
* when adding it to the task. Right now it simply checks whether any of the
* task's shard placements are on the local node.
*/
static bool
ShouldLazyDeparseQuery(Task *task)
{
return TaskAccessesLocalNode(task);
}
/*
* SetTaskQuery attaches the query to the task so that it can be used during
* execution. If local execution can possibly take place, it sets
* task->queryForLocalExecution. If not, it deparses the query and sets
* queryStringLazy instead, to avoid blowing up the size of the task
* unnecessarily.
*/
void
SetTaskQuery(Task *task, Query *query)
{
if (ShouldLazyDeparseQuery(task))
{
task->queryForLocalExecution = query;
task->queryStringLazy = NULL;
return;
}
task->queryForLocalExecution = NULL;
task->queryStringLazy = DeparseTaskQuery(task, query);
}
/*
* SetTaskQueryString attaches the query string to the task so that it can be
* used during execution. It also unsets queryForLocalExecution to be sure
* these are kept in sync.
*/
void
SetTaskQueryString(Task *task, char *queryString)
{
task->queryForLocalExecution = NULL;
task->queryStringLazy = queryString;
}
/*
* DeparseTaskQuery is a general way of deparsing a query based on a task.
*/
static char *
DeparseTaskQuery(Task *task, Query *query)
{
StringInfo queryString = makeStringInfo();
if (query->commandType == CMD_INSERT)
{
/*
* For INSERT queries we cannot use pg_get_query_def. Mainly because we
* cannot run UpdateRelationToShardNames on an INSERT query. This is
* because the PG deparsing logic fails when trying to insert into a
* RTE_FUNCTION (which is what will happen if you call
* UpdateRelationToShardNames).
*/
deparse_shard_query(query, task->anchorDistributedTableId, task->anchorShardId,
queryString);
}
else
{
pg_get_query_def(query, queryString);
}
return queryString->data;
}
/*
* TaskQueryString generates task->queryStringLazy if missing.
*
* For performance reasons, the query string is generated lazily. For local
* queries, for example, it is usually not needed at all, so we can skip the
* expensive deparsing and parsing.
*/
char *
TaskQueryString(Task *task)
{
if (task->queryStringLazy != NULL)
{
return task->queryStringLazy;
}
Assert(task->queryForLocalExecution != NULL);
/*
* Switch to the memory context of task->queryForLocalExecution before
* generating the query string. This way the query string is not freed in
* between multiple executions of a prepared statement. The exception is
* when UpdateTaskQueryString sets task->queryForLocalExecution: the string
* is freed then, but it will be set to NULL again on the next execution of
* the query, because UpdateTaskQueryString does that.
*/
MemoryContext previousContext = MemoryContextSwitchTo(GetMemoryChunkContext(
task->queryForLocalExecution));
task->queryStringLazy = DeparseTaskQuery(task, task->queryForLocalExecution);
MemoryContextSwitchTo(previousContext);
return task->queryStringLazy;
}

View File

@ -126,6 +126,7 @@ static PlannedStmt * PlanFastPathDistributedStmt(DistributedPlanningContext *pla
static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext,
List *rangeTableList, int rteIdCounter);
/* Distributed planner hook */
PlannedStmt *
distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
@ -143,6 +144,7 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
.boundParams = boundParams,
};
if (cursorOptions & CURSOR_OPT_FORCE_DISTRIBUTED)
{
/* this cursor flag could only be set when Citus has been loaded */

View File

@ -21,6 +21,7 @@
#include "distributed/commands.h"
#include "distributed/commands/multi_copy.h"
#include "distributed/connection_management.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/function_call_delegation.h"
#include "distributed/insert_select_planner.h"
#include "distributed/insert_select_executor.h"
@ -111,7 +112,6 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext)
Var *partitionColumn = NULL;
ShardPlacement *placement = NULL;
WorkerNode *workerNode = NULL;
StringInfo queryString = NULL;
Task *task = NULL;
Job *job = NULL;
DistributedPlan *distributedPlan = CitusMakeNode(DistributedPlan);
@ -364,13 +364,10 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext)
ereport(DEBUG1, (errmsg("pushing down the function call")));
queryString = makeStringInfo();
pg_get_query_def(planContext->query, queryString);
task = CitusMakeNode(Task);
task->taskType = SELECT_TASK;
task->queryString = queryString->data;
task->taskPlacementList = placementList;
SetTaskQuery(task, planContext->query);
task->anchorShardId = shardInterval->shardId;
task->replicationModel = distTable->replicationModel;

View File

@ -22,6 +22,7 @@
#include "optimizer/cost.h"
#include "distributed/citus_nodefuncs.h"
#include "distributed/connection_management.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/insert_select_planner.h"
#include "distributed/insert_select_executor.h"
#include "distributed/listutils.h"
@ -400,7 +401,7 @@ RemoteExplain(Task *task, ExplainState *es)
RemoteExplainPlan *remotePlan = (RemoteExplainPlan *) palloc0(
sizeof(RemoteExplainPlan));
StringInfo explainQuery = BuildRemoteExplainQuery(task->queryString, es);
StringInfo explainQuery = BuildRemoteExplainQuery(TaskQueryString(task), es);
/*
* Use a coordinated transaction to ensure that we open a transaction block

View File

@ -2449,7 +2449,7 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
pg_get_query_def(taskQuery, queryString);
ereport(DEBUG4, (errmsg("distributed statement: %s",
ApplyLogRedaction(queryString->data))));
subqueryTask->queryString = queryString->data;
SetTaskQueryString(subqueryTask, queryString->data);
}
subqueryTask->dependentTaskList = NULL;
@ -3977,7 +3977,7 @@ CreateBasicTask(uint64 jobId, uint32 taskId, TaskType taskType, char *queryStrin
task->taskId = taskId;
task->taskType = taskType;
task->replicationModel = REPLICATION_MODEL_INVALID;
task->queryString = queryString;
SetTaskQueryString(task, queryString);
return task;
}
@ -4244,7 +4244,7 @@ MapTaskList(MapMergeJob *mapMergeJob, List *filterTaskList)
/* convert filter query task into map task */
Task *mapTask = filterTask;
mapTask->queryString = mapQueryString->data;
SetTaskQueryString(mapTask, mapQueryString->data);
mapTask->taskType = MAP_TASK;
mapTaskList = lappend(mapTaskList, mapTask);
@ -4266,7 +4266,7 @@ CreateMapQueryString(MapMergeJob *mapMergeJob, Task *filterTask,
/* wrap repartition query string around filter query string */
StringInfo mapQueryString = makeStringInfo();
char *filterQueryString = filterTask->queryString;
char *filterQueryString = TaskQueryString(filterTask);
char *filterQueryEscapedText = quote_literal_cstr(filterQueryString);
PartitionType partitionType = mapMergeJob->partitionType;

View File

@ -1581,6 +1581,14 @@ RouterInsertTaskList(Query *query, DeferredErrorMessage **planningError)
modifyTask->replicationModel = cacheEntry->replicationModel;
modifyTask->rowValuesLists = modifyRoute->rowValuesLists;
RelationShard *relationShard = CitusMakeNode(RelationShard);
relationShard->shardId = modifyRoute->shardId;
relationShard->relationId = distributedTableId;
modifyTask->relationShardList = list_make1(relationShard);
modifyTask->taskPlacementList = ShardPlacementList(modifyRoute->shardId);
insertTaskList = lappend(insertTaskList, modifyTask);
}
@ -1598,7 +1606,7 @@ CreateTask(TaskType taskType)
task->taskType = taskType;
task->jobId = INVALID_JOB_ID;
task->taskId = INVALID_TASK_ID;
task->queryString = NULL;
SetTaskQueryString(task, NULL);
task->anchorShardId = INVALID_SHARD_ID;
task->taskPlacementList = NIL;
task->dependentTaskList = NIL;
@ -1875,20 +1883,22 @@ RemoveCoordinatorPlacement(List *placementList)
*/
static List *
SingleShardSelectTaskList(Query *query, uint64 jobId, List *relationShardList,
List *placementList,
uint64 shardId)
List *placementList, uint64 shardId)
{
Task *task = CreateTask(SELECT_TASK);
StringInfo queryString = makeStringInfo();
List *relationRowLockList = NIL;
RowLocksOnRelations((Node *) query, &relationRowLockList);
pg_get_query_def(query, queryString);
task->queryString = queryString->data;
/*
* For performance reasons, we skip generating the query string here. Local
* execution does not need it, so we wait until the executor determines that
* the query cannot be executed locally.
*/
task->taskPlacementList = placementList;
SetTaskQuery(task, query);
task->anchorShardId = shardId;
task->jobId = jobId;
task->taskPlacementList = placementList;
task->relationShardList = relationShardList;
task->relationRowLockList = relationRowLockList;
@ -1946,7 +1956,6 @@ SingleShardModifyTaskList(Query *query, uint64 jobId, List *relationShardList,
List *placementList, uint64 shardId)
{
Task *task = CreateTask(MODIFY_TASK);
StringInfo queryString = makeStringInfo();
List *rangeTableList = NIL;
ExtractRangeTableEntryWalker((Node *) query, &rangeTableList);
@ -1964,12 +1973,10 @@ SingleShardModifyTaskList(Query *query, uint64 jobId, List *relationShardList,
"and modify a reference table")));
}
pg_get_query_def(query, queryString);
task->queryString = queryString->data;
task->taskPlacementList = placementList;
SetTaskQuery(task, query);
task->anchorShardId = shardId;
task->jobId = jobId;
task->taskPlacementList = placementList;
task->relationShardList = relationShardList;
task->replicationModel = modificationTableCacheEntry->replicationModel;
@ -2084,7 +2091,6 @@ PlanRouterQuery(Query *originalQuery,
TargetShardIntervalForFastPathQuery(originalQuery, partitionValueConst,
&isMultiShardQuery, distributionKeyValue);
/*
* This could only happen when there is a parameter on the distribution key.
* We defer error here, later the planner is forced to use a generic plan

View File

@ -246,7 +246,9 @@ CopyNodeTask(COPYFUNC_ARGS)
COPY_SCALAR_FIELD(taskType);
COPY_SCALAR_FIELD(jobId);
COPY_SCALAR_FIELD(taskId);
COPY_STRING_FIELD(queryString);
COPY_NODE_FIELD(queryForLocalExecution);
COPY_STRING_FIELD(queryStringLazy);
COPY_SCALAR_FIELD(anchorDistributedTableId);
COPY_SCALAR_FIELD(anchorShardId);
COPY_NODE_FIELD(taskPlacementList);
COPY_NODE_FIELD(dependentTaskList);

View File

@ -463,7 +463,9 @@ OutTask(OUTFUNC_ARGS)
WRITE_ENUM_FIELD(taskType, TaskType);
WRITE_UINT64_FIELD(jobId);
WRITE_UINT_FIELD(taskId);
WRITE_STRING_FIELD(queryString);
WRITE_NODE_FIELD(queryForLocalExecution);
WRITE_STRING_FIELD(queryStringLazy);
WRITE_OID_FIELD(anchorDistributedTableId);
WRITE_UINT64_FIELD(anchorShardId);
WRITE_NODE_FIELD(taskPlacementList);
WRITE_NODE_FIELD(dependentTaskList);

View File

@ -377,7 +377,9 @@ ReadTask(READFUNC_ARGS)
READ_ENUM_FIELD(taskType, TaskType);
READ_UINT64_FIELD(jobId);
READ_UINT_FIELD(taskId);
READ_STRING_FIELD(queryString);
READ_NODE_FIELD(queryForLocalExecution);
READ_STRING_FIELD(queryStringLazy);
READ_OID_FIELD(anchorDistributedTableId);
READ_UINT64_FIELD(anchorShardId);
READ_NODE_FIELD(taskPlacementList);
READ_NODE_FIELD(dependentTaskList);

View File

@ -18,10 +18,14 @@
#include "nodes/nodes.h"
#include "nodes/parsenodes.h"
#include "nodes/pg_list.h"
#include "distributed/citus_custom_scan.h"
extern void RebuildQueryStrings(Query *originalQuery, List *taskList);
extern bool UpdateRelationToShardNames(Node *node, List *relationShardList);
extern void SetTaskQuery(Task *task, Query *query);
extern void SetTaskQueryString(Task *task, char *queryString);
extern char * TaskQueryString(Task *task);
#endif /* DEPARSE_SHARD_QUERY_H */

View File

@ -26,5 +26,6 @@ extern bool ShouldExecuteTasksLocally(List *taskList);
extern void ErrorIfLocalExecutionHappened(void);
extern void DisableLocalExecution(void);
extern bool AnyTaskAccessesRemoteNode(List *taskList);
extern bool TaskAccessesLocalNode(Task *task);
#endif /* LOCAL_EXECUTION_H */

View File

@ -182,15 +182,36 @@ typedef struct Task
uint32 taskId;
/*
* If queryString != NULL, then we have a single query for all placements.
* Otherwise, length of perPlacementQueryStrings is equal to length of
* taskPlacementList and can assign a different query for each placement.
* We need this flexibility when a query should return node specific values.
* For example, on which node did we succeed storing some result files?
* For most queries, queryForLocalExecution and/or queryStringLazy is not
* NULL. This means we have a single query for all placements.
*
* If this is not the case, perPlacementQueryStrings is non-empty and its
* length equals that of taskPlacementList, so a different query can be
* assigned to each placement. We need this flexibility when a query should
* return node-specific values. For example, on which node did we succeed
* in storing some result files?
*
* queryForLocalExecution is only non-NULL when the planner thinks the
* query could possibly be executed locally. In that case deparsing and
* parsing the query might not be necessary, so we do that lazily.
*
* queryForLocalExecution should only be set by using SetTaskQuery().
*/
char *queryString;
Query *queryForLocalExecution;
/*
* In almost all cases queryStringLazy should only be read indirectly, by
* using TaskQueryString(). That function populates this field on demand,
* provided the queryForLocalExecution field is not NULL.
*
* This field should only be set by using SetTaskQueryString() (or as a
* side effect from TaskQueryString()). Otherwise it might not be in sync
* with queryForLocalExecution.
*/
char *queryStringLazy;
List *perPlacementQueryStrings;
Oid anchorDistributedTableId; /* only applies to insert tasks */
uint64 anchorShardId; /* only applies to compute tasks */
List *taskPlacementList; /* only applies to compute tasks */
List *dependentTaskList; /* only applies to compute tasks */

View File

@ -1,6 +1,7 @@
-- Test queries on a distributed table with shards on the coordinator
CREATE SCHEMA coordinator_shouldhaveshards;
SET search_path TO coordinator_shouldhaveshards;
SET citus.next_shard_id TO 1503000;
-- idempotently add node to allow this test to run without add_coordinator
SET client_min_messages TO WARNING;
SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0);
@ -31,11 +32,16 @@ WHERE logicalrelid = 'test'::regclass AND groupid = 0;
2
(1 row)
--- enable logging to see which tasks are executed locally
SET client_min_messages TO LOG;
SET citus.log_local_commands TO ON;
-- INSERT..SELECT with COPY under the covers
INSERT INTO test SELECT s,s FROM generate_series(2,100) s;
-- router queries execute locally
INSERT INTO test VALUES (1, 1);
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.test_1503000 (x, y) VALUES (1, 1)
SELECT y FROM test WHERE x = 1;
NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1)
y
---------------------------------------------------------------------
1
@ -57,12 +63,15 @@ WITH a AS (SELECT * FROM test) SELECT count(*) FROM test;
-- multi-shard queries in transaction blocks execute locally
BEGIN;
SELECT y FROM test WHERE x = 1;
NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1)
y
---------------------------------------------------------------------
1
(1 row)
SELECT count(*) FROM test;
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
count
---------------------------------------------------------------------
100
@ -71,12 +80,15 @@ SELECT count(*) FROM test;
END;
BEGIN;
SELECT y FROM test WHERE x = 1;
NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1)
y
---------------------------------------------------------------------
1
(1 row)
SELECT count(*) FROM test;
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
count
---------------------------------------------------------------------
100
@ -88,6 +100,7 @@ ALTER TABLE test ADD COLUMN z int;
-- DDL after local execution
BEGIN;
SELECT y FROM test WHERE x = 1;
NOTICE: executing the command locally: SELECT y FROM coordinator_shouldhaveshards.test_1503000 test WHERE (x OPERATOR(pg_catalog.=) 1)
y
---------------------------------------------------------------------
1

View File

@ -808,6 +808,15 @@ INSERT INTO distributed_table VALUES (1, '11',21), (2,'22',22), (3,'33',33), (4,
(5 rows)
PREPARE local_prepare_no_param AS SELECT count(*) FROM distributed_table WHERE key = 1;
PREPARE local_prepare_no_param_subquery AS
SELECT DISTINCT trim(value) FROM (
SELECT value FROM distributed_table
WHERE
key IN (1, 6, 500, 701)
AND (select 2) > random()
order by 1
limit 2
) t;
PREPARE local_prepare_param (int) AS SELECT count(*) FROM distributed_table WHERE key = $1;
PREPARE remote_prepare_param (int) AS SELECT count(*) FROM distributed_table WHERE key != $1;
BEGIN;
@ -854,6 +863,61 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar
1
(1 row)
-- 6 local execution without params and some subqueries
EXECUTE local_prepare_no_param_subquery;
NOTICE: executing the command locally: SELECT worker_column_1 AS value FROM (SELECT distributed_table.value AS worker_column_1 FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE ((distributed_table.key OPERATOR(pg_catalog.=) ANY (ARRAY[1, 6, 500, 701])) AND (((SELECT 2))::double precision OPERATOR(pg_catalog.>) random()))) worker_subquery ORDER BY worker_column_1 LIMIT '2'::bigint
NOTICE: executing the command locally: SELECT worker_column_1 AS value FROM (SELECT distributed_table.value AS worker_column_1 FROM local_shard_execution.distributed_table_1470003 distributed_table WHERE ((distributed_table.key OPERATOR(pg_catalog.=) ANY (ARRAY[1, 6, 500, 701])) AND (((SELECT 2))::double precision OPERATOR(pg_catalog.>) random()))) worker_subquery ORDER BY worker_column_1 LIMIT '2'::bigint
NOTICE: executing the command locally: SELECT DISTINCT btrim(value) AS btrim FROM (SELECT intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(value text)) t
btrim
---------------------------------------------------------------------
12
(1 row)
EXECUTE local_prepare_no_param_subquery;
NOTICE: executing the command locally: SELECT worker_column_1 AS value FROM (SELECT distributed_table.value AS worker_column_1 FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE ((distributed_table.key OPERATOR(pg_catalog.=) ANY (ARRAY[1, 6, 500, 701])) AND (((SELECT 2))::double precision OPERATOR(pg_catalog.>) random()))) worker_subquery ORDER BY worker_column_1 LIMIT '2'::bigint
NOTICE: executing the command locally: SELECT worker_column_1 AS value FROM (SELECT distributed_table.value AS worker_column_1 FROM local_shard_execution.distributed_table_1470003 distributed_table WHERE ((distributed_table.key OPERATOR(pg_catalog.=) ANY (ARRAY[1, 6, 500, 701])) AND (((SELECT 2))::double precision OPERATOR(pg_catalog.>) random()))) worker_subquery ORDER BY worker_column_1 LIMIT '2'::bigint
NOTICE: executing the command locally: SELECT DISTINCT btrim(value) AS btrim FROM (SELECT intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(value text)) t
btrim
---------------------------------------------------------------------
12
(1 row)
EXECUTE local_prepare_no_param_subquery;
NOTICE: executing the command locally: SELECT worker_column_1 AS value FROM (SELECT distributed_table.value AS worker_column_1 FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE ((distributed_table.key OPERATOR(pg_catalog.=) ANY (ARRAY[1, 6, 500, 701])) AND (((SELECT 2))::double precision OPERATOR(pg_catalog.>) random()))) worker_subquery ORDER BY worker_column_1 LIMIT '2'::bigint
NOTICE: executing the command locally: SELECT worker_column_1 AS value FROM (SELECT distributed_table.value AS worker_column_1 FROM local_shard_execution.distributed_table_1470003 distributed_table WHERE ((distributed_table.key OPERATOR(pg_catalog.=) ANY (ARRAY[1, 6, 500, 701])) AND (((SELECT 2))::double precision OPERATOR(pg_catalog.>) random()))) worker_subquery ORDER BY worker_column_1 LIMIT '2'::bigint
NOTICE: executing the command locally: SELECT DISTINCT btrim(value) AS btrim FROM (SELECT intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(value text)) t
btrim
---------------------------------------------------------------------
12
(1 row)
EXECUTE local_prepare_no_param_subquery;
NOTICE: executing the command locally: SELECT worker_column_1 AS value FROM (SELECT distributed_table.value AS worker_column_1 FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE ((distributed_table.key OPERATOR(pg_catalog.=) ANY (ARRAY[1, 6, 500, 701])) AND (((SELECT 2))::double precision OPERATOR(pg_catalog.>) random()))) worker_subquery ORDER BY worker_column_1 LIMIT '2'::bigint
NOTICE: executing the command locally: SELECT worker_column_1 AS value FROM (SELECT distributed_table.value AS worker_column_1 FROM local_shard_execution.distributed_table_1470003 distributed_table WHERE ((distributed_table.key OPERATOR(pg_catalog.=) ANY (ARRAY[1, 6, 500, 701])) AND (((SELECT 2))::double precision OPERATOR(pg_catalog.>) random()))) worker_subquery ORDER BY worker_column_1 LIMIT '2'::bigint
NOTICE: executing the command locally: SELECT DISTINCT btrim(value) AS btrim FROM (SELECT intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(value text)) t
btrim
---------------------------------------------------------------------
12
(1 row)
EXECUTE local_prepare_no_param_subquery;
NOTICE: executing the command locally: SELECT worker_column_1 AS value FROM (SELECT distributed_table.value AS worker_column_1 FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE ((distributed_table.key OPERATOR(pg_catalog.=) ANY (ARRAY[1, 6, 500, 701])) AND (((SELECT 2))::double precision OPERATOR(pg_catalog.>) random()))) worker_subquery ORDER BY worker_column_1 LIMIT '2'::bigint
NOTICE: executing the command locally: SELECT worker_column_1 AS value FROM (SELECT distributed_table.value AS worker_column_1 FROM local_shard_execution.distributed_table_1470003 distributed_table WHERE ((distributed_table.key OPERATOR(pg_catalog.=) ANY (ARRAY[1, 6, 500, 701])) AND (((SELECT 2))::double precision OPERATOR(pg_catalog.>) random()))) worker_subquery ORDER BY worker_column_1 LIMIT '2'::bigint
NOTICE: executing the command locally: SELECT DISTINCT btrim(value) AS btrim FROM (SELECT intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(value text)) t
btrim
---------------------------------------------------------------------
12
(1 row)
EXECUTE local_prepare_no_param_subquery;
NOTICE: executing the command locally: SELECT worker_column_1 AS value FROM (SELECT distributed_table.value AS worker_column_1 FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE ((distributed_table.key OPERATOR(pg_catalog.=) ANY (ARRAY[1, 6, 500, 701])) AND (((SELECT 2))::double precision OPERATOR(pg_catalog.>) random()))) worker_subquery ORDER BY worker_column_1 LIMIT '2'::bigint
NOTICE: executing the command locally: SELECT worker_column_1 AS value FROM (SELECT distributed_table.value AS worker_column_1 FROM local_shard_execution.distributed_table_1470003 distributed_table WHERE ((distributed_table.key OPERATOR(pg_catalog.=) ANY (ARRAY[1, 6, 500, 701])) AND (((SELECT 2))::double precision OPERATOR(pg_catalog.>) random()))) worker_subquery ORDER BY worker_column_1 LIMIT '2'::bigint
NOTICE: executing the command locally: SELECT DISTINCT btrim(value) AS btrim FROM (SELECT intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(value text)) t
btrim
---------------------------------------------------------------------
12
(1 row)
-- 6 local executions with params
EXECUTE local_prepare_param(1);
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1)
@ -1370,7 +1434,6 @@ EXECUTE serial_prepared_local;
-- Citus currently doesn't allow using task_assignment_policy for intermediate results
WITH distributed_local_mixed AS (INSERT INTO reference_table VALUES (1000) RETURNING *) SELECT * FROM distributed_local_mixed;
NOTICE: executing the command locally: INSERT INTO local_shard_execution.reference_table_1470000 (key) VALUES (1000) RETURNING key
NOTICE: executing the command locally: SELECT key FROM (SELECT intermediate_result.key FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) distributed_local_mixed
key
---------------------------------------------------------------------
1000
@ -1418,6 +1481,33 @@ NOTICE: executing the command locally: DELETE FROM local_shard_execution.refere
(2 rows)
COMMIT;
-- however complex the query, local execution can handle it
SET client_min_messages TO LOG;
SET citus.log_local_commands TO ON;
WITH cte_1 AS
(SELECT *
FROM
(WITH cte_1 AS
(SELECT *
FROM distributed_table
WHERE key = 1) SELECT *
FROM cte_1) AS foo)
SELECT count(*)
FROM cte_1
JOIN distributed_table USING (key)
WHERE distributed_table.key = 1
AND distributed_table.key IN
(SELECT key
FROM distributed_table
WHERE key = 1);
NOTICE: executing the command locally: SELECT count(*) AS count FROM ((SELECT foo.key, foo.value, foo.age FROM (SELECT cte_1_1.key, cte_1_1.value, cte_1_1.age FROM (SELECT distributed_table_1.key, distributed_table_1.value, distributed_table_1.age FROM local_shard_execution.distributed_table_1470001 distributed_table_1 WHERE (distributed_table_1.key OPERATOR(pg_catalog.=) 1)) cte_1_1) foo) cte_1 JOIN local_shard_execution.distributed_table_1470001 distributed_table(key, value, age) USING (key)) WHERE ((distributed_table.key OPERATOR(pg_catalog.=) 1) AND (distributed_table.key OPERATOR(pg_catalog.=) ANY (SELECT distributed_table_1.key FROM local_shard_execution.distributed_table_1470001 distributed_table_1 WHERE (distributed_table_1.key OPERATOR(pg_catalog.=) 1))))
count
---------------------------------------------------------------------
0
(1 row)
RESET client_min_messages;
RESET citus.log_local_commands;
\c - - - :master_port
-- local execution with custom type
SET citus.replication_model TO "streaming";

View File

@ -926,6 +926,14 @@ RETURNING value_1, value_2, value_3;
3 | 1 | 2
(1 row)
INSERT INTO
reference_table_test_fifth (value_4) VALUES (now())
RETURNING value_1, value_2, value_3;
value_1 | value_2 | value_3
---------------------------------------------------------------------
4 | |
(1 row)
UPDATE
reference_table_test_fifth SET value_4 = now()
WHERE

View File

@ -2,6 +2,7 @@
CREATE SCHEMA coordinator_shouldhaveshards;
SET search_path TO coordinator_shouldhaveshards;
SET citus.next_shard_id TO 1503000;
-- idempotently add node to allow this test to run without add_coordinator
SET client_min_messages TO WARNING;
@ -18,6 +19,11 @@ SELECT create_distributed_table('test','x', colocate_with := 'none');
SELECT count(*) FROM pg_dist_shard JOIN pg_dist_placement USING (shardid)
WHERE logicalrelid = 'test'::regclass AND groupid = 0;
--- enable logging to see which tasks are executed locally
SET client_min_messages TO LOG;
SET citus.log_local_commands TO ON;
-- INSERT..SELECT with COPY under the covers
INSERT INTO test SELECT s,s FROM generate_series(2,100) s;

View File

@ -461,6 +461,15 @@ INSERT INTO distributed_table VALUES (1, '11',21), (2,'22',22), (3,'33',33), (4,
PREPARE local_prepare_no_param AS SELECT count(*) FROM distributed_table WHERE key = 1;
PREPARE local_prepare_no_param_subquery AS
SELECT DISTINCT trim(value) FROM (
SELECT value FROM distributed_table
WHERE
key IN (1, 6, 500, 701)
AND (select 2) > random()
order by 1
limit 2
) t;
PREPARE local_prepare_param (int) AS SELECT count(*) FROM distributed_table WHERE key = $1;
PREPARE remote_prepare_param (int) AS SELECT count(*) FROM distributed_table WHERE key != $1;
BEGIN;
@ -472,6 +481,14 @@ BEGIN;
EXECUTE local_prepare_no_param;
EXECUTE local_prepare_no_param;
-- 6 local execution without params and some subqueries
EXECUTE local_prepare_no_param_subquery;
EXECUTE local_prepare_no_param_subquery;
EXECUTE local_prepare_no_param_subquery;
EXECUTE local_prepare_no_param_subquery;
EXECUTE local_prepare_no_param_subquery;
EXECUTE local_prepare_no_param_subquery;
-- 6 local executions with params
EXECUTE local_prepare_param(1);
EXECUTE local_prepare_param(5);
@ -732,6 +749,28 @@ BEGIN;
DELETE FROM reference_table RETURNING key;
COMMIT;
-- however complex the query, local execution can handle it
SET client_min_messages TO LOG;
SET citus.log_local_commands TO ON;
WITH cte_1 AS
(SELECT *
FROM
(WITH cte_1 AS
(SELECT *
FROM distributed_table
WHERE key = 1) SELECT *
FROM cte_1) AS foo)
SELECT count(*)
FROM cte_1
JOIN distributed_table USING (key)
WHERE distributed_table.key = 1
AND distributed_table.key IN
(SELECT key
FROM distributed_table
WHERE key = 1);
RESET client_min_messages;
RESET citus.log_local_commands;
\c - - - :master_port

View File

@ -563,6 +563,10 @@ INSERT INTO
reference_table_test_fifth (value_2, value_3) VALUES (nextval('example_ref_value_seq'), nextval('example_ref_value_seq')::text)
RETURNING value_1, value_2, value_3;
INSERT INTO
reference_table_test_fifth (value_4) VALUES (now())
RETURNING value_1, value_2, value_3;
UPDATE
reference_table_test_fifth SET value_4 = now()
WHERE