Merge pull request #3554 from citusdata/refactor/vacuum-and-local-executor

Refactor vacuumTaskList function and local_executor.c line lengths
pull/3562/head
Onur Tirtir 2020-03-02 12:04:55 +03:00 committed by GitHub
commit c9c6e58c53
3 changed files with 112 additions and 87 deletions

View File

@@ -44,7 +44,7 @@ typedef struct CitusVacuumParams
 static bool IsDistributedVacuumStmt(int vacuumOptions, List *vacuumRelationIdList);
 static List * VacuumTaskList(Oid relationId, CitusVacuumParams vacuumParams,
                              List *vacuumColumnList);
-static StringInfo DeparseVacuumStmtPrefix(CitusVacuumParams vacuumParams);
+static char * DeparseVacuumStmtPrefix(CitusVacuumParams vacuumParams);
 static char * DeparseVacuumColumnNames(List *columnNameList);
 static List * VacuumColumnList(VacuumStmt *vacuumStmt, int relationIndex);
 static List * ExtractVacuumTargetRels(VacuumStmt *vacuumStmt);
@@ -182,15 +182,17 @@ IsDistributedVacuumStmt(int vacuumOptions, List *vacuumRelationIdList)
 static List *
 VacuumTaskList(Oid relationId, CitusVacuumParams vacuumParams, List *vacuumColumnList)
 {
+    /* resulting task list */
     List *taskList = NIL;
-    uint64 jobId = INVALID_JOB_ID;
+
+    /* enumerate the tasks when putting them to the taskList */
     int taskId = 1;
-    StringInfo vacuumString = DeparseVacuumStmtPrefix(vacuumParams);
-    const int vacuumPrefixLen = vacuumString->len;
+
     Oid schemaId = get_rel_namespace(relationId);
     char *schemaName = get_namespace_name(schemaId);
-    char *tableName = get_rel_name(relationId);
+    char *relationName = get_rel_name(relationId);
+
+    const char *vacuumStringPrefix = DeparseVacuumStmtPrefix(vacuumParams);
     const char *columnNames = DeparseVacuumColumnNames(vacuumColumnList);

     /*
@@ -210,20 +212,25 @@ VacuumTaskList(Oid relationId, CitusVacuumParams vacuumParams, List *vacuumColum
     foreach_ptr(shardInterval, shardIntervalList)
     {
         uint64 shardId = shardInterval->shardId;
-        char *shardName = pstrdup(tableName);
-        AppendShardIdToName(&shardName, shardInterval->shardId);
-        shardName = quote_qualified_identifier(schemaName, shardName);
-        vacuumString->len = vacuumPrefixLen;
-        appendStringInfoString(vacuumString, shardName);
-        appendStringInfoString(vacuumString, columnNames);
+        char *shardRelationName = pstrdup(relationName);
+
+        /* build shard relation name */
+        AppendShardIdToName(&shardRelationName, shardId);
+
+        char *quotedShardName = quote_qualified_identifier(schemaName, shardRelationName);
+
+        /* copy base vacuum string and build the shard specific command */
+        StringInfo vacuumStringForShard = makeStringInfo();
+        appendStringInfoString(vacuumStringForShard, vacuumStringPrefix);
+        appendStringInfoString(vacuumStringForShard, quotedShardName);
+        appendStringInfoString(vacuumStringForShard, columnNames);

         Task *task = CitusMakeNode(Task);
-        task->jobId = jobId;
+        task->jobId = INVALID_JOB_ID;
         task->taskId = taskId++;
         task->taskType = VACUUM_ANALYZE_TASK;
-        SetTaskQueryString(task, pstrdup(vacuumString->data));
+        SetTaskQueryString(task, vacuumStringForShard->data);
         task->dependentTaskList = NULL;
         task->replicationModel = REPLICATION_MODEL_INVALID;
         task->anchorShardId = shardId;
@@ -242,7 +249,7 @@ VacuumTaskList(Oid relationId, CitusVacuumParams vacuumParams, List *vacuumColum
  * reuse this prefix within a loop to generate shard-specific VACUUM or ANALYZE
  * statements.
  */
-static StringInfo
+static char *
 DeparseVacuumStmtPrefix(CitusVacuumParams vacuumParams)
 {
     int vacuumFlags = vacuumParams.options;
@@ -276,7 +283,7 @@ DeparseVacuumStmtPrefix(CitusVacuumParams vacuumParams)
 #endif
         )
     {
-        return vacuumPrefix;
+        return vacuumPrefix->data;
     }

     /* otherwise, handle options */
@@ -334,7 +341,7 @@ DeparseVacuumStmtPrefix(CitusVacuumParams vacuumParams)
     appendStringInfoChar(vacuumPrefix, ' ');

-    return vacuumPrefix;
+    return vacuumPrefix->data;
 }
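
The refactor above replaces the old trick of rewinding a shared buffer (vacuumString->len = vacuumPrefixLen) with a fresh StringInfo per shard, so each task owns its query string and no extra pstrdup of the buffer is needed. Below is a minimal sketch of that per-shard assembly pattern using PostgreSQL's StringInfo API; BuildShardVacuumCommand is a hypothetical helper name used only for illustration, not part of the patch.

    #include "postgres.h"
    #include "lib/stringinfo.h"
    #include "utils/builtins.h"    /* quote_qualified_identifier() */

    /*
     * Illustrative sketch of the command assembly the new VacuumTaskList() does:
     * the deparsed prefix and column list are built once by the caller, and each
     * shard gets its own StringInfo that is never rewound or copied again.
     */
    static char *
    BuildShardVacuumCommand(const char *vacuumPrefix, const char *schemaName,
                            const char *shardRelationName, const char *columnNames)
    {
        /* e.g. "public"."users_102008", quoted to survive reserved words and mixed case */
        char *quotedShardName = quote_qualified_identifier(schemaName, shardRelationName);

        StringInfo command = makeStringInfo();
        appendStringInfoString(command, vacuumPrefix);    /* e.g. "VACUUM (VERBOSE) " */
        appendStringInfoString(command, quotedShardName);
        appendStringInfoString(command, columnNames);     /* "" or " (col_a, col_b)" */

        /* the task can keep this pointer; the buffer is not reused for other shards */
        return command->data;
    }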

View File

@@ -3,21 +3,22 @@
  *
  * The scope of the local execution is locally executing the queries on the
  * shards. In other words, local execution does not deal with any local tables
- * that are not on shards on the node that the query is being executed. In that sense,
- * the local executor is only triggered if the node has both the metadata and the
- * shards (e.g., only Citus MX worker nodes).
+ * that are not on shards on the node where the query is being executed. In that
+ * sense, the local executor is only triggered if the node has both the metadata
+ * and the shards (e.g., only Citus MX worker nodes).
  *
  * The goal of the local execution is to skip the unnecessary network round-trip
- * happening on the node itself. Instead, identify the locally executable tasks and
- * simply call PostgreSQL's planner and executor.
+ * happening on the node itself. Instead, identify the locally executable tasks
+ * and simply call PostgreSQL's planner and executor.
  *
- * The local executor is an extension of the adaptive executor. So, the executor uses
- * adaptive executor's custom scan nodes.
+ * The local executor is an extension of the adaptive executor. So, the executor
+ * uses the adaptive executor's custom scan nodes.
  *
- * One thing to note is that Citus MX is only supported with replication factor = 1, so
- * keep that in mind while continuing the comments below.
+ * One thing to note is that Citus MX is only supported with replication factor
+ * equal to 1, so keep that in mind while reading the comments below.
  *
- * On the high level, there are 3 slightly different ways of utilizing local execution:
+ * On the high level, there are 3 slightly different ways of utilizing local
+ * execution:
  *
  * (1) Execution of local single shard queries of a distributed table
  *
@@ -25,16 +26,19 @@
  * executor, and since the query is only a single task the execution finishes
  * without going to the network at all.
  *
- * Even if there is a transaction block (or recursively planned CTEs), as long
- * as the queries hit the shards on the same node, the local execution will kick in.
+ * Even if there is a transaction block (or recursively planned CTEs), as
+ * long as the queries hit the shards on the same node, the local execution
+ * will kick in.
  *
  * (2) Execution of local single queries and remote multi-shard queries
  *
- * The rule is simple. If a transaction block starts with a local query execution,
- * all the other queries in the same transaction block that touch any local shard
- * have to use the local execution. Although this sounds restrictive, we prefer to
- * implement it in this way, otherwise we'd end-up with as complex scenarios as we
- * have in the connection managements due to foreign keys.
+ * The rule is simple. If a transaction block starts with a local query
+ * execution, all the other queries in the same transaction block that touch
+ * any local shard have to use the local execution. Although this sounds
+ * restrictive, we prefer to implement it this way; otherwise we'd end up
+ * with scenarios as complex as the ones we have in connection management
+ * due to foreign keys.
  *
  * See the following example:
  *    BEGIN;
@@ -49,26 +53,28 @@
  *
  * (3) Modifications of reference tables
  *
- * Modifications to reference tables have to be executed on all nodes. So, after the
- * local execution, the adaptive executor keeps continuing the execution on the other
- * nodes.
+ * Modifications to reference tables have to be executed on all nodes. So,
+ * after the local execution, the adaptive executor continues the execution
+ * on the other nodes.
  *
- * Note that for read-only queries, after the local execution, there is no need to
- * kick in adaptive executor.
+ * Note that for read-only queries, after the local execution, there is no
+ * need to kick in the adaptive executor.
  *
  * There are also a few limitations/trade-offs that are worth mentioning.
- * - The local execution on multiple shards might be slow because the execution has to
- *   happen one task at a time (e.g., no parallelism).
- * - If a transaction block/CTE starts with a multi-shard command, we do not use local query execution
- *   since local execution is sequential. Basically, we do not want to lose parallelism
- *   across local tasks by switching to local execution.
- * - The local execution currently only supports queries. In other words, any utility commands like TRUNCATE,
- *   fails if the command is executed after a local execution inside a transaction block.
+ * - The local execution on multiple shards might be slow because the execution
+ *   has to happen one task at a time (e.g., no parallelism).
+ * - If a transaction block/CTE starts with a multi-shard command, we do not
+ *   use local query execution since local execution is sequential. Basically,
+ *   we do not want to lose parallelism across local tasks by switching to
+ *   local execution.
+ * - The local execution currently only supports queries. In other words, any
+ *   utility command, such as TRUNCATE, fails if it is executed after a local
+ *   execution inside a transaction block.
  * - The local execution cannot be mixed with the executors other than adaptive,
  *   namely task-tracker executor.
- * - Related with the previous item, COPY command
- *   cannot be mixed with local execution in a transaction. The implication of that is any
- *   part of INSERT..SELECT via coordinator cannot happen via the local execution.
+ * - Related to the previous item, the COPY command cannot be mixed with local
+ *   execution in a transaction. The implication is that any part of
+ *   INSERT..SELECT via the coordinator cannot happen via local execution.
 */
 #include "postgres.h"
 #include "miscadmin.h"
@@ -215,9 +221,10 @@ ExecuteLocalTaskList(CitusScanState *scanState, List *taskList)
 /*
- * ExtractParametersForLocalExecution extracts parameter types and values from
- * the given ParamListInfo structure, and fills parameter type and value arrays.
- * It does not change the oid of custom types, because the query will be run locally.
+ * ExtractParametersForLocalExecution extracts parameter types and values
+ * from the given ParamListInfo structure, and fills the parameter type and
+ * value arrays. It does not change the OID of custom types, because the
+ * query will be run locally.
  */
 static void
 ExtractParametersForLocalExecution(ParamListInfo paramListInfo, Oid **parameterTypes,
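
The comment above describes filling type and value arrays from a ParamListInfo. Below is a minimal sketch of such a loop, assuming direct access to the standard ParamExternData fields (ptype, value, isnull) and text conversion via each type's output function; the helper name is illustrative and is not the function in this diff.

    #include "postgres.h"
    #include "fmgr.h"              /* OidOutputFunctionCall() */
    #include "nodes/params.h"
    #include "utils/lsyscache.h"   /* getTypeOutputInfo() */

    static void
    CollectParameterTypesAndValues(ParamListInfo paramListInfo,
                                   Oid **parameterTypes,
                                   const char ***parameterValues)
    {
        int parameterCount = paramListInfo->numParams;

        *parameterTypes = palloc0(parameterCount * sizeof(Oid));
        *parameterValues = palloc0(parameterCount * sizeof(char *));

        for (int parameterIndex = 0; parameterIndex < parameterCount; parameterIndex++)
        {
            ParamExternData *param = &paramListInfo->params[parameterIndex];

            /* keep the original OID; the query runs locally, so no type remapping */
            (*parameterTypes)[parameterIndex] = param->ptype;

            if (param->isnull)
            {
                (*parameterValues)[parameterIndex] = NULL;
                continue;
            }

            /* deparse the Datum into its text form using the type's output function */
            Oid typeOutputFunctionId = InvalidOid;
            bool variableLengthType = false;
            getTypeOutputInfo(param->ptype, &typeOutputFunctionId, &variableLengthType);

            (*parameterValues)[parameterIndex] =
                OidOutputFunctionCall(typeOutputFunctionId, param->value);
        }
    }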
@@ -272,8 +279,8 @@ ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList,
     List *localTaskPlacementList = NULL;
     List *remoteTaskPlacementList = NULL;
-    SplitLocalAndRemotePlacements(task->taskPlacementList, &localTaskPlacementList,
-                                  &remoteTaskPlacementList);
+    SplitLocalAndRemotePlacements(
+        task->taskPlacementList, &localTaskPlacementList, &remoteTaskPlacementList);

     /* either the local or the remote should be non-nil */
     Assert(!(localTaskPlacementList == NIL && remoteTaskPlacementList == NIL));
@@ -281,9 +288,9 @@ ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList,
     if (list_length(task->taskPlacementList) == 1)
     {
         /*
-         * At this point, the task has a single placement (e.g,. anchor shard is
-         * distributed table's shard). So, it is either added to local or remote
-         * taskList.
+         * At this point, the task has a single placement (e.g., the anchor
+         * shard is a distributed table's shard). So, it is added to either
+         * the local or the remote taskList.
          */
         if (localTaskPlacementList == NIL)
         {
@@ -297,10 +304,10 @@ ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList,
     else
     {
         /*
-         * At this point, we're dealing with reference tables or intermediate results
-         * where the task has placements on both local and remote nodes. We always
-         * prefer to use local placement, and require remote placements only for
-         * modifications.
+         * At this point, we're dealing with reference tables or intermediate
+         * results where the task has placements on both local and remote
+         * nodes. We always prefer to use local placement, and require remote
+         * placements only for modifications.
          */
         task->partiallyLocalOrRemote = true;
@@ -326,9 +333,9 @@ ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList,
 /*
- * SplitLocalAndRemotePlacements is a helper function which iterates over the input
- * taskPlacementList and puts the placements into corresponding list of either
- * localTaskPlacementList or remoteTaskPlacementList.
+ * SplitLocalAndRemotePlacements is a helper function which iterates over the
+ * input taskPlacementList and puts each placement into the corresponding list,
+ * either localTaskPlacementList or remoteTaskPlacementList.
 */
 static void
 SplitLocalAndRemotePlacements(List *taskPlacementList, List **localTaskPlacementList,
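
The helper described above is a single pass that appends each placement to exactly one of the two output lists. Here is a hedged sketch of that shape; it assumes Citus's ShardPlacement struct with its groupId field and takes the local group id as a parameter (the real function presumably looks up the node's own group itself), and the function name is illustrative only.

    #include "postgres.h"
    #include "nodes/pg_list.h"
    /* plus the Citus header that defines ShardPlacement (assumed available) */

    static void
    SplitPlacementsByLocality(List *taskPlacementList, int32 localGroupId,
                              List **localPlacementList, List **remotePlacementList)
    {
        ListCell *placementCell = NULL;

        *localPlacementList = NIL;
        *remotePlacementList = NIL;

        foreach(placementCell, taskPlacementList)
        {
            ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell);

            /* a placement is local when it lives in this node's group */
            if (placement->groupId == localGroupId)
            {
                *localPlacementList = lappend(*localPlacementList, placement);
            }
            else
            {
                *remotePlacementList = lappend(*remotePlacementList, placement);
            }
        }
    }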
@@ -433,8 +440,8 @@ ShouldExecuteTasksLocally(List *taskList)
         /*
          * TODO: A future improvement could be to keep track of which placements
-         * have been locally executed. At this point, only use local execution for
-         * those placements. That'd help to benefit more from parallelism.
+         * have been locally executed. At this point, only use local execution
+         * for those placements. That'd help to benefit more from parallelism.
          */
         return true;
@@ -446,14 +453,14 @@ ShouldExecuteTasksLocally(List *taskList)
     {
         /*
          * This is the valuable time to use the local execution. We are likely
-         * to avoid any network round-trips by simply executing the command within
-         * this session.
+         * to avoid any network round-trips by simply executing the command
+         * within this session.
          *
          * We cannot avoid network round trips if the task is not a read only
          * task and accesses multiple placements. For example, modifications to
          * distributed tables (with replication factor == 1) would avoid network
-         * round-trips. However, modifications to reference tables still needs to
-         * go to over the network to do the modification on the other placements.
-         * Still, we'll be avoding the network round trip for this node.
+         * round-trips. However, modifications to reference tables still need
+         * to go over the network to do the modification on the other placements.
+         * Still, we'll be avoiding the network round trip for this node.
          *
          * Note that we shouldn't use local execution if any distributed execution
@@ -466,10 +473,10 @@ ShouldExecuteTasksLocally(List *taskList)
     if (!singleTask)
     {
         /*
-         * For multi-task executions, switching to local execution would likely to
-         * perform poorly, because we'd lose the parallelism. Note that the local
-         * execution is happening one task at a time (e.g., similar to sequential
-         * distributed execution).
+         * For multi-task executions, switching to local execution would likely
+         * perform poorly, because we'd lose the parallelism. Note that the
+         * local execution happens one task at a time (i.e., similar to
+         * sequential distributed execution).
         */
         Assert(!TransactionAccessedLocalPlacement);
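
Putting the comments in the last few hunks together, the decision reduces to roughly the following shape. This is a hedged sketch with illustrative names, not the body of ShouldExecuteTasksLocally: once the transaction has touched a local placement it must stay local; otherwise only a single task with a placement on this node is worth executing locally, because multi-task lists would lose parallelism.

    #include "postgres.h"

    static bool
    ShouldRunTaskListLocally(bool transactionAccessedLocalPlacement,
                             bool singleTask, bool taskAccessesLocalNode)
    {
        if (transactionAccessedLocalPlacement)
        {
            /* a local placement was already touched; later commands must stay local */
            return true;
        }

        if (singleTask && taskAccessesLocalNode)
        {
            /* a single local task: skip the network round trip entirely */
            return true;
        }

        /* multi-task executions keep the parallel, remote code path */
        return false;
    }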
@@ -481,8 +488,8 @@ ShouldExecuteTasksLocally(List *taskList)
 /*
- * TaskAccessesLocalNode returns true if any placements of the task reside on the
- * node that we're executing the query.
+ * TaskAccessesLocalNode returns true if any placement of the task resides on
+ * the node where we're executing the query.
 */
 bool
 TaskAccessesLocalNode(Task *task)
@@ -503,19 +510,21 @@ TaskAccessesLocalNode(Task *task)
 /*
- * ErrorIfTransactionAccessedPlacementsLocally() errors out if a local query on any shard
- * has already been executed in the same transaction.
+ * ErrorIfTransactionAccessedPlacementsLocally errors out if a local query
+ * on any shard has already been executed in the same transaction.
  *
- * This check is required because Citus currently hasn't implemented local execution
- * infrastructure for all the commands/executors. As we implement local execution for
- * the command/executor that this function call exists, we should simply remove the check.
+ * This check is required because Citus currently hasn't implemented local
+ * execution infrastructure for all the commands/executors. As we implement
+ * local execution for the commands/executors from which this check is called,
+ * we should simply remove the check.
 */
 void
 ErrorIfTransactionAccessedPlacementsLocally(void)
 {
     if (TransactionAccessedLocalPlacement)
     {
-        ereport(ERROR, (errmsg("cannot execute command because a local execution has "
+        ereport(ERROR,
+                (errmsg("cannot execute command because a local execution has "
                         "accessed a placement in the transaction"),
                 errhint("Try re-running the transaction with "
                         "\"SET LOCAL citus.enable_local_execution TO OFF;\""),

View File

@@ -790,9 +790,18 @@ DistributedTableCacheEntry(Oid distributedRelationId)
     else
     {
         char *relationName = get_rel_name(distributedRelationId);
-        ereport(ERROR, (errmsg("relation %s is not distributed", relationName)));
+
+        if (relationName == NULL)
+        {
+            ereport(ERROR, (errmsg("relation with OID %u does not exist",
+                                   distributedRelationId)));
+        }
+        else
+        {
+            ereport(ERROR, (errmsg("relation %s is not distributed", relationName)));
+        }
     }
 }

 /*