From 3c99db40b9087dd7cc03d57742022b6758258afd Mon Sep 17 00:00:00 2001 From: Onur Tirtir Date: Fri, 21 Feb 2020 14:39:07 +0300 Subject: [PATCH] Some small typos & cleanup --- .../distributed/executor/adaptive_executor.c | 10 ++-- .../distributed/executor/local_executor.c | 60 +++++++++---------- .../distributed/master/master_truncate.c | 24 +++++--- src/include/distributed/local_executor.h | 8 +-- 4 files changed, 53 insertions(+), 49 deletions(-) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 71c3f6e29..b8e706226 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -37,9 +37,9 @@ * same connection since it may hold relevant locks or have uncommitted * writes. In that case we "assign" the task to a connection by adding * it to the task queue of specific connection (in - * AssignTasksToConnectionsOrWorkerPool ). Otherwise we consider the task unassigned - * and add it to the task queue of a worker pool, which means that it - * can be executed over any connection in the pool. + * AssignTasksToConnectionsOrWorkerPool ). Otherwise we consider the task + * unassigned and add it to the task queue of a worker pool, which means + * that it can be executed over any connection in the pool. * * A task may be executed on multiple placements in case of a reference * table or a replicated distributed table. Depending on the type of @@ -772,7 +772,7 @@ RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution) uint64 rowsProcessed = ExecuteLocalTaskList(scanState, execution->localTaskList); /* - * We're deliberately not setting execution->rowsProceessed here. The main reason + * We're deliberately not setting execution->rowsProcessed here. The main reason * is that modifications to reference tables would end-up setting it both here * and in AdaptiveExecutor. Instead, we set executorState here and skip updating it * for reference table modifications in AdaptiveExecutor. @@ -884,7 +884,7 @@ ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList, ParamListInfo paramListInfo = NULL; /* - * The code-paths that rely on this function do not know how execute + * The code-paths that rely on this function do not know how to execute * commands locally. */ ErrorIfTransactionAccessedPlacementsLocally(); diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index 7c60049a4..2006a6261 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -134,7 +134,8 @@ ExecuteLocalTaskList(CitusScanState *scanState, List *taskList) if (paramListInfo != NULL) { - const char **parameterValues = NULL; /* not used anywhere, so decleare here */ + /* not used anywhere, so declare here */ + const char **parameterValues = NULL; ExtractParametersForLocalExecution(paramListInfo, ¶meterTypes, ¶meterValues); @@ -228,6 +229,25 @@ ExtractParametersForLocalExecution(ParamListInfo paramListInfo, Oid **parameterT } +/* + * LogLocalCommand logs commands executed locally on this node. Although we're + * talking about local execution, the function relies on citus.log_remote_commands + * GUC. This makes sense because the local execution is still on a shard of a + * distributed table, meaning it is part of distributed execution. + */ +static void +LogLocalCommand(Task *task) +{ + if (!(LogRemoteCommands || LogLocalCommands)) + { + return; + } + + ereport(NOTICE, (errmsg("executing the command locally: %s", + ApplyLogRedaction(TaskQueryString(task))))); +} + + /* * ExtractLocalAndRemoteTasks gets a taskList and generates two * task lists namely localTaskList and remoteTaskList. The function goes @@ -236,9 +256,9 @@ ExtractParametersForLocalExecution(ParamListInfo paramListInfo, Oid **parameterT * the lists could be NIL depending on the input taskList. * * One slightly different case is modifications to replicated tables - * (e.g., reference tables) where a single task ends in two seperate tasks - * and the local task is added to localTaskList and the remanings to the - * remoteTaskList. + * (e.g., reference tables) where a single task ends in two separate tasks + * and the local task is added to localTaskList and the remaning ones to + * the remoteTaskList. */ void ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList, @@ -280,8 +300,6 @@ ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList, } else { - Task *remoteTask = NULL; - /* * At this point, we're dealing with reference tables or intermediate results * where the task has placements on both local and remote nodes. We always @@ -301,7 +319,7 @@ ExtractLocalAndRemoteTasks(bool readOnly, List *taskList, List **localTaskList, } else { - remoteTask = copyObject(task); + Task *remoteTask = copyObject(task); remoteTask->taskPlacementList = remoteTaskPlacementList; *remoteTaskList = lappend(*remoteTaskList, remoteTask); @@ -353,7 +371,7 @@ ExecuteLocalTaskPlan(CitusScanState *scanState, PlannedStmt *taskPlan, char *que { EState *executorState = ScanStateGetExecutorState(scanState); ParamListInfo paramListInfo = executorState->es_param_list_info; - DestReceiver *tupleStoreDestReceiever = CreateDestReceiver(DestTuplestore); + DestReceiver *tupleStoreDestReceiver = CreateDestReceiver(DestTuplestore); ScanDirection scanDirection = ForwardScanDirection; QueryEnvironment *queryEnv = create_queryEnv(); int eflags = 0; @@ -363,14 +381,14 @@ ExecuteLocalTaskPlan(CitusScanState *scanState, PlannedStmt *taskPlan, char *que * Use the tupleStore provided by the scanState because it is shared accross * the other task executions and the adaptive executor. */ - SetTuplestoreDestReceiverParams(tupleStoreDestReceiever, + SetTuplestoreDestReceiverParams(tupleStoreDestReceiver, scanState->tuplestorestate, CurrentMemoryContext, false); /* Create a QueryDesc for the query */ QueryDesc *queryDesc = CreateQueryDesc(taskPlan, queryString, GetActiveSnapshot(), InvalidSnapshot, - tupleStoreDestReceiever, paramListInfo, + tupleStoreDestReceiver, paramListInfo, queryEnv, 0); ExecutorStart(queryDesc, eflags); @@ -430,6 +448,7 @@ ShouldExecuteTasksLocally(List *taskList) } bool singleTask = (list_length(taskList) == 1); + if (singleTask && TaskAccessesLocalNode((Task *) linitial(taskList))) { /* @@ -455,7 +474,7 @@ ShouldExecuteTasksLocally(List *taskList) { /* * For multi-task executions, switching to local execution would likely to - * perform poorly, because we'd lose the parallelizm. Note that the local + * perform poorly, because we'd lose the parallelism. Note that the local * execution is happening one task at a time (e.g., similar to sequential * distributed execution). */ @@ -515,25 +534,6 @@ ErrorIfTransactionAccessedPlacementsLocally(void) } -/* - * LogLocalCommand logs commands executed locally on this node. Although we're - * talking about local execution, the function relies on citus.log_remote_commands GUC. - * This makes sense because the local execution is still on a shard of a distributed table, - * meaning it is part of distributed execution. - */ -static void -LogLocalCommand(Task *task) -{ - if (!(LogRemoteCommands || LogLocalCommands)) - { - return; - } - - ereport(NOTICE, (errmsg("executing the command locally: %s", - ApplyLogRedaction(TaskQueryString(task))))); -} - - /* * DisableLocalExecution simply a C interface for * setting the following: diff --git a/src/backend/distributed/master/master_truncate.c b/src/backend/distributed/master/master_truncate.c index 94de213cf..8647eaf5b 100644 --- a/src/backend/distributed/master/master_truncate.c +++ b/src/backend/distributed/master/master_truncate.c @@ -27,7 +27,6 @@ #include "utils/lsyscache.h" #include "utils/rel.h" - static List * TruncateTaskList(Oid relationId); @@ -89,29 +88,36 @@ citus_truncate_trigger(PG_FUNCTION_ARGS) static List * TruncateTaskList(Oid relationId) { - List *shardIntervalList = LoadShardIntervalList(relationId); - ListCell *shardIntervalCell = NULL; + /* resulting task list */ List *taskList = NIL; + + /* enumerate the tasks when putting them to the taskList */ int taskId = 1; Oid schemaId = get_rel_namespace(relationId); char *schemaName = get_namespace_name(schemaId); char *relationName = get_rel_name(relationId); + List *shardIntervalList = LoadShardIntervalList(relationId); + /* lock metadata before getting placement lists */ LockShardListMetadata(shardIntervalList, ShareLock); + ListCell *shardIntervalCell = NULL; + foreach(shardIntervalCell, shardIntervalList) { ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell); uint64 shardId = shardInterval->shardId; + char *shardRelationName = pstrdup(relationName); + + /* build shard relation name */ + AppendShardIdToName(&shardRelationName, shardId); + + char *quotedShardName = quote_qualified_identifier(schemaName, shardRelationName); + StringInfo shardQueryString = makeStringInfo(); - char *shardName = pstrdup(relationName); - - AppendShardIdToName(&shardName, shardId); - - appendStringInfo(shardQueryString, "TRUNCATE TABLE %s CASCADE", - quote_qualified_identifier(schemaName, shardName)); + appendStringInfo(shardQueryString, "TRUNCATE TABLE %s CASCADE", quotedShardName); Task *task = CitusMakeNode(Task); task->jobId = INVALID_JOB_ID; diff --git a/src/include/distributed/local_executor.h b/src/include/distributed/local_executor.h index 1a0d8ff24..98e69bf97 100644 --- a/src/include/distributed/local_executor.h +++ b/src/include/distributed/local_executor.h @@ -20,16 +20,14 @@ extern bool LogLocalCommands; extern bool TransactionAccessedLocalPlacement; extern bool TransactionConnectedToLocalGroup; +/* extern function declarations */ extern uint64 ExecuteLocalTaskList(CitusScanState *scanState, List *taskList); extern void ExtractLocalAndRemoteTasks(bool readOnlyPlan, List *taskList, List **localTaskList, List **remoteTaskList); extern bool ShouldExecuteTasksLocally(List *taskList); +extern bool AnyTaskAccessesLocalNode(List *taskList); +extern bool TaskAccessesLocalNode(Task *task); extern void ErrorIfTransactionAccessedPlacementsLocally(void); -extern void SetTaskQueryAndPlacementList(Task *task, Query *query, List *placementList); -extern char * TaskQueryString(Task *task); -extern bool TaskAccessesLocalNode(Task *task); extern void DisableLocalExecution(void); -extern bool AnyTaskAccessesRemoteNode(List *taskList); -extern bool TaskAccessesLocalNode(Task *task); #endif /* LOCAL_EXECUTION_H */