From 17d9b934c33742e4e2fec740a7a2683a96bfacef Mon Sep 17 00:00:00 2001
From: Onur Tirtir
Date: Fri, 28 Feb 2020 19:02:52 +0300
Subject: [PATCH 1/3] refactor local_executor.c lines with >78 characters

---
 .../distributed/executor/local_executor.c | 149 ++++++++++--------
 1 file changed, 79 insertions(+), 70 deletions(-)

diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c
index c520cd6d4..89e5c9e7c 100644
--- a/src/backend/distributed/executor/local_executor.c
+++ b/src/backend/distributed/executor/local_executor.c
@@ -3,21 +3,22 @@
  *
  * The scope of the local execution is locally executing the queries on the
  * shards. In other words, local execution does not deal with any local tables
- * that are not on shards on the node that the query is being executed. In that sense,
- * the local executor is only triggered if the node has both the metadata and the
- * shards (e.g., only Citus MX worker nodes).
+ * that are not on shards on the node that the query is being executed. In that
+ * sense, the local executor is only triggered if the node has both the metadata
+ * and the shards (e.g., only Citus MX worker nodes).
  *
  * The goal of the local execution is to skip the unnecessary network round-trip
- * happening on the node itself. Instead, identify the locally executable tasks and
- * simply call PostgreSQL's planner and executor.
+ * happening on the node itself. Instead, identify the locally executable tasks
+ * and simply call PostgreSQL's planner and executor.
  *
- * The local executor is an extension of the adaptive executor. So, the executor uses
- * adaptive executor's custom scan nodes.
+ * The local executor is an extension of the adaptive executor. So, the executor
+ * uses adaptive executor's custom scan nodes.
  *
- * One thing to note is that Citus MX is only supported with replication factor = 1, so
- * keep that in mind while continuing the comments below.
+ * One thing to note is that Citus MX is only supported with replication factor
+ * equal to 1, so keep that in mind while reading the comments below.
  *
- * On the high level, there are 3 slightly different ways of utilizing local execution:
+ * On the high level, there are 3 slightly different ways of utilizing local
+ * execution:
  *
  * (1) Execution of local single shard queries of a distributed table
  *
@@ -25,16 +26,19 @@
  * executor, and since the query is only a single task the execution finishes
  * without going to the network at all.
  *
- * Even if there is a transaction block (or recursively planned CTEs), as long
- * as the queries hit the shards on the same node, the local execution will kick in.
+ * Even if there is a transaction block (or recursively planned CTEs), as
+ * long as the queries hit the shards on the same node, the local execution
+ * will kick in.
  *
  * (2) Execution of local single queries and remote multi-shard queries
  *
- * The rule is simple. If a transaction block starts with a local query execution,
- * all the other queries in the same transaction block that touch any local shard
- * have to use the local execution. Although this sounds restrictive, we prefer to
- * implement it in this way, otherwise we'd end-up with as complex scenarios as we
- * have in the connection managements due to foreign keys.
+ * The rule is simple. If a transaction block starts with a local query
+ * execution,
+ * all the other queries in the same transaction block that touch any local
+ * shard have to use the local execution. Although this sounds restrictive,
+ * we prefer to implement it this way; otherwise we'd end up with scenarios
+ * as complex as the ones we have in connection management due to foreign
+ * keys.
  *
  * See the following example:
  * BEGIN;
@@ -49,26 +53,28 @@
  *
  * (3) Modifications of reference tables
  *
- * Modifications to reference tables have to be executed on all nodes. So, after the
- * local execution, the adaptive executor keeps continuing the execution on the other
- * nodes.
+ * Modifications to reference tables have to be executed on all nodes. So,
+ * after the local execution, the adaptive executor continues the execution
+ * on the other nodes.
  *
- * Note that for read-only queries, after the local execution, there is no need to
- * kick in adaptive executor.
+ * Note that for read-only queries, after the local execution, there is no
+ * need to kick in the adaptive executor.
  *
  * There are also a few limitations/trade-offs that are worth mentioning.
- * - The local execution on multiple shards might be slow because the execution has to
- * happen one task at a time (e.g., no parallelism).
- * - If a transaction block/CTE starts with a multi-shard command, we do not use local query execution
- * since local execution is sequential. Basically, we do not want to lose parallelism
- * across local tasks by switching to local execution.
- * - The local execution currently only supports queries. In other words, any utility commands like TRUNCATE,
- * fails if the command is executed after a local execution inside a transaction block.
+ * - The local execution on multiple shards might be slow because the execution
+ * has to happen one task at a time (e.g., no parallelism).
+ * - If a transaction block/CTE starts with a multi-shard command, we do not
+ * use local query execution since local execution is sequential. Basically,
+ * we do not want to lose parallelism across local tasks by switching to local
+ * execution.
+ * - The local execution currently only supports queries. In other words, any
+ * utility command, such as TRUNCATE, fails if it is executed after a local
+ * execution inside a transaction block.
  * - The local execution cannot be mixed with the executors other than adaptive,
  * namely task-tracker executor.
- * - Related with the previous item, COPY command
- * cannot be mixed with local execution in a transaction. The implication of that is any
- * part of INSERT..SELECT via coordinator cannot happen via the local execution.
+ * - Related to the previous item, the COPY command cannot be mixed with local
+ * execution in a transaction. The implication is that no part of
+ * INSERT..SELECT via the coordinator can happen via the local execution.
  */
 #include "postgres.h"
 #include "miscadmin.h"
@@ -215,9 +221,10 @@ ExecuteLocalTaskList(CitusScanState *scanState, List *taskList)
 
 
 /*
- * ExtractParametersForLocalExecution extracts parameter types and values from
- * the given ParamListInfo structure, and fills parameter type and value arrays.
- * It does not change the oid of custom types, because the query will be run locally.
+ * ExtractParametersForLocalExecution extracts parameter types and values
+ * from the given ParamListInfo structure, and fills parameter type and
+ * value arrays. It does not change the oid of custom types, because the
+ * query will be run locally.
  */
 static void
 ExtractParametersForLocalExecution(ParamListInfo paramListInfo, Oid **parameterTypes,
@@ -272,8 +279,8 @@ ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList,
 		List *localTaskPlacementList = NULL;
 		List *remoteTaskPlacementList = NULL;
 
-		SplitLocalAndRemotePlacements(task->taskPlacementList, &localTaskPlacementList,
-									  &remoteTaskPlacementList);
+		SplitLocalAndRemotePlacements(
+			task->taskPlacementList, &localTaskPlacementList, &remoteTaskPlacementList);
 
 		/* either the local or the remote should be non-nil */
 		Assert(!(localTaskPlacementList == NIL && remoteTaskPlacementList == NIL));
@@ -281,9 +288,9 @@ ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList,
 		if (list_length(task->taskPlacementList) == 1)
 		{
 			/*
-			 * At this point, the task has a single placement (e.g,. anchor shard is
-			 * distributed table's shard). So, it is either added to local or remote
-			 * taskList.
+			 * At this point, the task has a single placement (e.g., anchor shard
+			 * is distributed table's shard). So, it is either added to local or
+			 * remote taskList.
 			 */
 			if (localTaskPlacementList == NIL)
 			{
@@ -297,10 +304,10 @@ ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList,
 		else
 		{
 			/*
-			 * At this point, we're dealing with reference tables or intermediate results
-			 * where the task has placements on both local and remote nodes. We always
-			 * prefer to use local placement, and require remote placements only for
-			 * modifications.
+			 * At this point, we're dealing with reference tables or intermediate
+			 * results where the task has placements on both local and remote
+			 * nodes. We always prefer to use local placement, and require remote
+			 * placements only for modifications.
 			 */
 			task->partiallyLocalOrRemote = true;
 
@@ -326,9 +333,9 @@ ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList,
 
 
 /*
- * SplitLocalAndRemotePlacements is a helper function which iterates over the input
- * taskPlacementList and puts the placements into corresponding list of either
- * localTaskPlacementList or remoteTaskPlacementList.
+ * SplitLocalAndRemotePlacements is a helper function which iterates over the
+ * input taskPlacementList and puts the placements into the corresponding list
+ * of either localTaskPlacementList or remoteTaskPlacementList.
  */
 static void
 SplitLocalAndRemotePlacements(List *taskPlacementList, List **localTaskPlacementList,
@@ -433,8 +440,8 @@ ShouldExecuteTasksLocally(List *taskList)
 
 	/*
 	 * TODO: A future improvement could be to keep track of which placements
-	 * have been locally executed. At this point, only use local execution for
-	 * those placements. That'd help to benefit more from parallelism.
+	 * have been locally executed. At this point, only use local execution
+	 * for those placements. That'd help to benefit more from parallelism.
 	 */
 
 	return true;
@@ -446,14 +453,14 @@ ShouldExecuteTasksLocally(List *taskList)
 	{
 		/*
 		 * This is the valuable time to use the local execution. We are likely
-		 * to avoid any network round-trips by simply executing the command within
-		 * this session.
+		 * to avoid any network round-trips by simply executing the command
+		 * within this session.
 		 *
 		 * We cannot avoid network round trips if the task is not a read only
 		 * task and accesses multiple placements. For example, modifications to
 		 * distributed tables (with replication factor == 1) would avoid network
-		 * round-trips. However, modifications to reference tables still needs to
-		 * go to over the network to do the modification on the other placements.
+		 * round-trips. However, modifications to reference tables still need
+		 * to go over the network to do the modification on the other placements.
 		 * Still, we'll be avoding the network round trip for this node.
 		 *
 		 * Note that we shouldn't use local execution if any distributed execution
@@ -466,10 +473,10 @@ ShouldExecuteTasksLocally(List *taskList)
 	if (!singleTask)
 	{
 		/*
-		 * For multi-task executions, switching to local execution would likely to
-		 * perform poorly, because we'd lose the parallelism. Note that the local
-		 * execution is happening one task at a time (e.g., similar to sequential
-		 * distributed execution).
+		 * For multi-task executions, switching to local execution would likely
+		 * perform poorly, because we'd lose the parallelism. Note that the
+		 * local execution is happening one task at a time (e.g., similar to
+		 * sequential distributed execution).
 		 */
 
 		Assert(!TransactionAccessedLocalPlacement);
@@ -481,8 +488,8 @@ ShouldExecuteTasksLocally(List *taskList)
 
 
 /*
- * TaskAccessesLocalNode returns true if any placements of the task reside on the
- * node that we're executing the query.
+ * TaskAccessesLocalNode returns true if any placements of the task reside on
+ * the node on which we're executing the query.
  */
 bool
 TaskAccessesLocalNode(Task *task)
@@ -503,24 +510,26 @@ TaskAccessesLocalNode(Task *task)
 
 
 /*
- * ErrorIfTransactionAccessedPlacementsLocally() errors out if a local query on any shard
- * has already been executed in the same transaction.
+ * ErrorIfTransactionAccessedPlacementsLocally errors out if a local query
+ * on any shard has already been executed in the same transaction.
  *
- * This check is required because Citus currently hasn't implemented local execution
- * infrastructure for all the commands/executors. As we implement local execution for
- * the command/executor that this function call exists, we should simply remove the check.
+ * This check is required because Citus currently hasn't implemented local
+ * execution infrastructure for all the commands/executors. As we implement
+ * local execution for the commands/executors that call this function, we
+ * should simply remove the check.
  */
 void
 ErrorIfTransactionAccessedPlacementsLocally(void)
 {
 	if (TransactionAccessedLocalPlacement)
 	{
-		ereport(ERROR, (errmsg("cannot execute command because a local execution has "
-							   "accessed a placement in the transaction"),
-						errhint("Try re-running the transaction with "
-								"\"SET LOCAL citus.enable_local_execution TO OFF;\""),
-						errdetail("Some parallel commands cannot be executed if a "
-								  "previous command has already been executed locally")));
+		ereport(ERROR,
+				(errmsg("cannot execute command because a local execution has "
+						"accessed a placement in the transaction"),
+				 errhint("Try re-running the transaction with "
+						 "\"SET LOCAL citus.enable_local_execution TO OFF;\""),
+				 errdetail("Some parallel commands cannot be executed if a "
+						   "previous command has already been executed locally")));
 	}
 }

From cf718ffe77b2c6fb9ae820c004161e87f1e0c7d1 Mon Sep 17 00:00:00 2001
From: Onur Tirtir
Date: Sat, 29 Feb 2020 14:53:48 +0300
Subject: [PATCH 2/3] safely error out in DistributedTableCacheEntry function

---
 src/backend/distributed/metadata/metadata_cache.c | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c
index c4f1ba037..bc4204fe1 100644
--- a/src/backend/distributed/metadata/metadata_cache.c
+++ b/src/backend/distributed/metadata/metadata_cache.c
@@ -790,7 +790,16 @@ DistributedTableCacheEntry(Oid distributedRelationId)
 	else
 	{
 		char *relationName = get_rel_name(distributedRelationId);
-		ereport(ERROR, (errmsg("relation %s is not distributed", relationName)));
+
+		if (relationName == NULL)
+		{
+			ereport(ERROR, (errmsg("relation with OID %u does not exist",
+								   distributedRelationId)));
+		}
+		else
+		{
+			ereport(ERROR, (errmsg("relation %s is not distributed", relationName)));
+		}
 	}
 }

From ff9c9d1808db2082f4a6cb41b9fd021f66ed735d Mon Sep 17 00:00:00 2001
From: Onur Tirtir
Date: Sat, 29 Feb 2020 14:54:39 +0300
Subject: [PATCH 3/3] make VacuumTaskList consistent with other taskList
 functions and add some safety changes

Makes the VacuumTaskList function consistent with the other TaskList
creator functions. Also, we were previously generating per-shard vacuum
command strings via unconventional usage of the StringInfo struct
(setting the stringInfo->len field manually), which could cause
unexpected memory errors.
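
To illustrate the safer pattern this commit switches to, here is a minimal
sketch (not the code in this patch; BuildShardCommand and its arguments are
hypothetical stand-ins for the values built inside VacuumTaskList): instead
of rewinding a shared buffer by assigning to stringInfo->len, allocate a
fresh StringInfo for every shard and append the shared prefix each time.

    #include "postgres.h"
    #include "lib/stringinfo.h"

    /* sketch only: per-shard command built from an immutable prefix string */
    static char *
    BuildShardCommand(const char *vacuumStringPrefix, const char *quotedShardName,
                      const char *columnNames)
    {
        /* fresh buffer per shard; no manual manipulation of the len field */
        StringInfo shardCommand = makeStringInfo();

        appendStringInfoString(shardCommand, vacuumStringPrefix);
        appendStringInfoString(shardCommand, quotedShardName);
        appendStringInfoString(shardCommand, columnNames);

        /* the data buffer is palloc'd in the current memory context */
        return shardCommand->data;
    }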
---
 src/backend/distributed/commands/vacuum.c | 39 +++++++++++++----------
 1 file changed, 23 insertions(+), 16 deletions(-)

diff --git a/src/backend/distributed/commands/vacuum.c b/src/backend/distributed/commands/vacuum.c
index 6574d5da6..7b8d5d057 100644
--- a/src/backend/distributed/commands/vacuum.c
+++ b/src/backend/distributed/commands/vacuum.c
@@ -44,7 +44,7 @@ typedef struct CitusVacuumParams
 static bool IsDistributedVacuumStmt(int vacuumOptions, List *vacuumRelationIdList);
 static List * VacuumTaskList(Oid relationId, CitusVacuumParams vacuumParams,
 							 List *vacuumColumnList);
-static StringInfo DeparseVacuumStmtPrefix(CitusVacuumParams vacuumParams);
+static char * DeparseVacuumStmtPrefix(CitusVacuumParams vacuumParams);
 static char * DeparseVacuumColumnNames(List *columnNameList);
 static List * VacuumColumnList(VacuumStmt *vacuumStmt, int relationIndex);
 static List * ExtractVacuumTargetRels(VacuumStmt *vacuumStmt);
@@ -182,15 +182,17 @@ IsDistributedVacuumStmt(int vacuumOptions, List *vacuumRelationIdList)
 static List *
 VacuumTaskList(Oid relationId, CitusVacuumParams vacuumParams, List *vacuumColumnList)
 {
+	/* resulting task list */
 	List *taskList = NIL;
-	uint64 jobId = INVALID_JOB_ID;
+
+	/* enumerate the tasks when putting them to the taskList */
 	int taskId = 1;
-	StringInfo vacuumString = DeparseVacuumStmtPrefix(vacuumParams);
-	const int vacuumPrefixLen = vacuumString->len;
+
 	Oid schemaId = get_rel_namespace(relationId);
 	char *schemaName = get_namespace_name(schemaId);
-	char *tableName = get_rel_name(relationId);
+	char *relationName = get_rel_name(relationId);
+
+	const char *vacuumStringPrefix = DeparseVacuumStmtPrefix(vacuumParams);
 	const char *columnNames = DeparseVacuumColumnNames(vacuumColumnList);
 
 	/*
@@ -210,20 +212,25 @@ VacuumTaskList(Oid relationId, CitusVacuumParams vacuumParams, List *vacuumColum
 	foreach_ptr(shardInterval, shardIntervalList)
 	{
 		uint64 shardId = shardInterval->shardId;
+		char *shardRelationName = pstrdup(relationName);
 
-		char *shardName = pstrdup(tableName);
-		AppendShardIdToName(&shardName, shardInterval->shardId);
-		shardName = quote_qualified_identifier(schemaName, shardName);
+		/* build shard relation name */
+		AppendShardIdToName(&shardRelationName, shardId);
 
-		vacuumString->len = vacuumPrefixLen;
-		appendStringInfoString(vacuumString, shardName);
-		appendStringInfoString(vacuumString, columnNames);
+		char *quotedShardName = quote_qualified_identifier(schemaName, shardRelationName);
+
+		/* copy base vacuum string and build the shard specific command */
+		StringInfo vacuumStringForShard = makeStringInfo();
+		appendStringInfoString(vacuumStringForShard, vacuumStringPrefix);
+
+		appendStringInfoString(vacuumStringForShard, quotedShardName);
+		appendStringInfoString(vacuumStringForShard, columnNames);
 
 		Task *task = CitusMakeNode(Task);
-		task->jobId = jobId;
+		task->jobId = INVALID_JOB_ID;
 		task->taskId = taskId++;
 		task->taskType = VACUUM_ANALYZE_TASK;
-		SetTaskQueryString(task, pstrdup(vacuumString->data));
+		SetTaskQueryString(task, vacuumStringForShard->data);
 		task->dependentTaskList = NULL;
 		task->replicationModel = REPLICATION_MODEL_INVALID;
 		task->anchorShardId = shardId;
@@ -242,7 +249,7 @@ VacuumTaskList(Oid relationId, CitusVacuumParams vacuumParams, List *vacuumColum
  * reuse this prefix within a loop to generate shard-specific VACUUM or ANALYZE
  * statements.
  */
-static StringInfo
+static char *
 DeparseVacuumStmtPrefix(CitusVacuumParams vacuumParams)
 {
 	int vacuumFlags = vacuumParams.options;
@@ -276,7 +283,7 @@ DeparseVacuumStmtPrefix(CitusVacuumParams vacuumParams)
 #endif
 		)
 	{
-		return vacuumPrefix;
+		return vacuumPrefix->data;
 	}
 
 	/* otherwise, handle options */
@@ -334,7 +341,7 @@ DeparseVacuumStmtPrefix(CitusVacuumParams vacuumParams)
 
 	appendStringInfoChar(vacuumPrefix, ' ');
 
-	return vacuumPrefix;
+	return vacuumPrefix->data;
 }
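
For reference, the local-execution rule that patch 1's header comments and the
ShouldExecuteTasksLocally() changes describe can be reduced to a short sketch.
This is not Citus code; the function name and boolean parameters are
hypothetical stand-ins for TransactionAccessedLocalPlacement, the single-task
check, and TaskAccessesLocalNode().

    #include <stdbool.h>

    /* sketch: simplified form of the decision described in the comments */
    static bool
    ShouldUseLocalExecutionSketch(bool transactionAccessedLocalPlacement,
                                  bool singleTask, bool taskAccessesLocalNode)
    {
        if (transactionAccessedLocalPlacement)
        {
            /* a prior local execution pins the rest of the transaction to local */
            return true;
        }

        if (singleTask && taskAccessesLocalNode)
        {
            /* single-shard query on this node: skip the network round trip */
            return true;
        }

        /* multi-shard commands stay on the adaptive executor to keep parallelism */
        return false;
    }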