From c25281188424b547e76f8236c48d5fb4ce282ecf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Mon, 3 Feb 2020 19:13:39 +0000 Subject: [PATCH 1/2] dont: don't, wont: won't, acylic: acyclic --- .../distributed/connection/connection_management.c | 2 +- src/backend/distributed/executor/adaptive_executor.c | 2 +- ..._execution.c => directed_acyclic_graph_execution.c} | 4 ++-- .../distributed/executor/repartition_join_execution.c | 6 +++--- src/backend/distributed/planner/multi_join_order.c | 2 +- ..._execution.h => directed_acyclic_graph_execution.h} | 10 +++++----- src/test/regress/upgrade/README.md | 2 +- 7 files changed, 14 insertions(+), 14 deletions(-) rename src/backend/distributed/executor/{directed_acylic_graph_execution.c => directed_acyclic_graph_execution.c} (98%) rename src/include/distributed/{directed_acylic_graph_execution.h => directed_acyclic_graph_execution.h} (59%) diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index bb2352af5..f9b08c50d 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -353,7 +353,7 @@ FindAvailableConnection(dlist_head *connections, uint32 flags) if (flags & OUTSIDE_TRANSACTION) { - /* dont return connections that are used in transactions */ + /* don't return connections that are used in transactions */ if (connection->remoteTransaction.transactionState != REMOTE_TRANS_NOT_STARTED) { diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 2b271a14e..16f5fc15a 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -646,7 +646,7 @@ AdaptiveExecutor(CitusScanState *scanState) /* * PostgreSQL takes locks on all partitions in the executor. It's not entirely * clear why this is necessary (instead of locking the parent during DDL), but - * We do the same for consistency. + * we do the same for consistency. */ LockPartitionsForDistributedPlan(distributedPlan); diff --git a/src/backend/distributed/executor/directed_acylic_graph_execution.c b/src/backend/distributed/executor/directed_acyclic_graph_execution.c similarity index 98% rename from src/backend/distributed/executor/directed_acylic_graph_execution.c rename to src/backend/distributed/executor/directed_acyclic_graph_execution.c index d8f31d47e..d4aeb6659 100644 --- a/src/backend/distributed/executor/directed_acylic_graph_execution.c +++ b/src/backend/distributed/executor/directed_acyclic_graph_execution.c @@ -1,6 +1,6 @@ /*------------------------------------------------------------------------- * - * directed_acylic_graph_execution_logic.c + * directed_acyclic_graph_execution_logic.c * * Logic to run tasks in their dependency order. * @@ -11,7 +11,7 @@ #include "access/hash.h" #include "distributed/hash_helpers.h" -#include "distributed/directed_acylic_graph_execution.h" +#include "distributed/directed_acyclic_graph_execution.h" #include "distributed/multi_physical_planner.h" #include "distributed/adaptive_executor.h" #include "distributed/worker_manager.h" diff --git a/src/backend/distributed/executor/repartition_join_execution.c b/src/backend/distributed/executor/repartition_join_execution.c index 41db0cbb8..556acba7b 100644 --- a/src/backend/distributed/executor/repartition_join_execution.c +++ b/src/backend/distributed/executor/repartition_join_execution.c @@ -16,8 +16,8 @@ * * * Repartition queries do not begin a transaction even if we are in - * a transaction block. As we dont begin a transaction, they wont see the - * DDLs that happened earlier in the transaction because we dont have that + * a transaction block. As we don't begin a transaction, they won't see the + * DDLs that happened earlier in the transaction because we don't have that * transaction id with repartition queries. Therefore we error in this case. * * Copyright (c) Citus Data, Inc. @@ -29,7 +29,7 @@ #include "utils/builtins.h" #include "distributed/hash_helpers.h" -#include "distributed/directed_acylic_graph_execution.h" +#include "distributed/directed_acyclic_graph_execution.h" #include "distributed/multi_physical_planner.h" #include "distributed/adaptive_executor.h" #include "distributed/worker_manager.h" diff --git a/src/backend/distributed/planner/multi_join_order.c b/src/backend/distributed/planner/multi_join_order.c index c976d40cd..81a35c181 100644 --- a/src/backend/distributed/planner/multi_join_order.c +++ b/src/backend/distributed/planner/multi_join_order.c @@ -238,7 +238,7 @@ JoinOnColumns(Var *currentColumn, Var *candidateColumn, List *joinClauseList) /* * Check if both join columns and both partition key columns match, since the - * current and candidate column's can't be NULL we know they wont match if either + * current and candidate column's can't be NULL we know they won't match if either * of the columns resolved to NULL above. */ if (equal(leftColumn, currentColumn) && diff --git a/src/include/distributed/directed_acylic_graph_execution.h b/src/include/distributed/directed_acyclic_graph_execution.h similarity index 59% rename from src/include/distributed/directed_acylic_graph_execution.h rename to src/include/distributed/directed_acyclic_graph_execution.h index be6987564..bf6b74e6c 100644 --- a/src/include/distributed/directed_acylic_graph_execution.h +++ b/src/include/distributed/directed_acyclic_graph_execution.h @@ -1,14 +1,14 @@ /*------------------------------------------------------------------------- * - * directed_acylic_graph_execution.h - * Execution logic for directed acylic graph tasks. + * directed_acyclic_graph_execution.h + * Execution logic for directed acyclic graph tasks. * * Copyright (c) Citus Data, Inc. *------------------------------------------------------------------------- */ -#ifndef DIRECTED_ACYLIC_GRAPH_EXECUTION_H -#define DIRECTED_ACYLIC_GRAPH_EXECUTION_H +#ifndef DIRECTED_ACYCLIC_GRAPH_EXECUTION_H +#define DIRECTED_ACYCLIC_GRAPH_EXECUTION_H #include "postgres.h" @@ -17,4 +17,4 @@ extern void ExecuteTasksInDependencyOrder(List *allTasks, List *excludedTasks); -#endif /* DIRECTED_ACYLIC_GRAPH_EXECUTION_H */ +#endif /* DIRECTED_ACYCLIC_GRAPH_EXECUTION_H */ diff --git a/src/test/regress/upgrade/README.md b/src/test/regress/upgrade/README.md index 0a7988888..73413949c 100644 --- a/src/test/regress/upgrade/README.md +++ b/src/test/regress/upgrade/README.md @@ -89,7 +89,7 @@ How the citus upgrade test work: Note that when the version of citus changes, we should update `MASTER_VERSION` with the new version of citus otherwise that will be outdated and it will fail. -There is a target for citus upgrade test. We run citus upgrade tests both in normal mode and in mixed mode. In mixed mode, we dont upgrade one of the workers. `'citus.enable_version_checks' : 'false'` is used to prevent citus from giving an error for mixed mode. +There is a target for citus upgrade test. We run citus upgrade tests both in normal mode and in mixed mode. In mixed mode, we don't upgrade one of the workers. `'citus.enable_version_checks' : 'false'` is used to prevent citus from giving an error for mixed mode. To see full command list: From ecad4aa5e6fab8077df16dda4c794b1be93c05bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Mon, 3 Feb 2020 22:08:08 +0000 Subject: [PATCH 2/2] Fill in jobIdList field of DistributedExecution Pass down jobIdList from ExecuteTasksInDependencyOrder Also clean up comment for ExecuteTaskListOutsideTransaction --- src/backend/distributed/commands/call.c | 2 +- .../distributed/executor/adaptive_executor.c | 36 ++++++++++--------- .../directed_acyclic_graph_execution.c | 4 +-- .../distributed_intermediate_results.c | 3 +- .../executor/repartition_join_execution.c | 2 +- src/include/distributed/adaptive_executor.h | 7 ++-- .../directed_acyclic_graph_execution.h | 3 +- src/include/distributed/multi_executor.h | 3 +- 8 files changed, 33 insertions(+), 27 deletions(-) diff --git a/src/backend/distributed/commands/call.c b/src/backend/distributed/commands/call.c index d9d25c4c0..c6187da75 100644 --- a/src/backend/distributed/commands/call.c +++ b/src/backend/distributed/commands/call.c @@ -189,7 +189,7 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, ExecuteTaskListExtended(ROW_MODIFY_NONE, list_make1(task), tupleDesc, tupleStore, hasReturning, MaxAdaptiveExecutorPoolSize, - &xactProperties); + &xactProperties, NIL); while (tuplestore_gettupleslot(tupleStore, true, false, slot)) { diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 16f5fc15a..415c26d2a 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -543,14 +543,15 @@ typedef struct TaskPlacementExecution /* local functions */ static DistributedExecution * CreateDistributedExecution(RowModifyLevel modLevel, - List *taskList, bool - hasReturning, + List *taskList, + bool hasReturning, ParamListInfo paramListInfo, TupleDesc tupleDescriptor, Tuplestorestate *tupleStore, int targetPoolSize, TransactionProperties * - xactProperties); + xactProperties, + List *jobIdList); static TransactionProperties DecideTransactionPropertiesForTaskList(RowModifyLevel modLevel, List *taskList, @@ -680,7 +681,8 @@ AdaptiveExecutor(CitusScanState *scanState) tupleDescriptor, scanState->tuplestorestate, targetPoolSize, - &xactProperties); + &xactProperties, + jobIdList); /* * Make sure that we acquire the appropriate locks even if the local tasks @@ -803,24 +805,24 @@ ExecuteUtilityTaskListWithoutResults(List *taskList) /* - * ExecuteTaskListRepartiton is a proxy to ExecuteTaskListExtended() with defaults - * for some of the arguments for a repartition query. + * ExecuteTaskListOutsideTransaction is a proxy to ExecuteTaskListExtended + * with defaults for some of the arguments. */ uint64 -ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList, int - targetPoolSize) +ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList, + int targetPoolSize, List *jobIdList) { TupleDesc tupleDescriptor = NULL; Tuplestorestate *tupleStore = NULL; bool hasReturning = false; - TransactionProperties xactProperties = DecideTransactionPropertiesForTaskList( - modLevel, taskList, true); + TransactionProperties xactProperties = + DecideTransactionPropertiesForTaskList(modLevel, taskList, true); return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor, tupleStore, hasReturning, targetPoolSize, - &xactProperties); + &xactProperties, jobIdList); } @@ -840,7 +842,7 @@ ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int targetPoolSize) return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor, tupleStore, hasReturning, targetPoolSize, - &xactProperties); + &xactProperties, NIL); } @@ -860,7 +862,7 @@ ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskList, return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor, tupleStore, hasReturning, targetPoolSize, - &xactProperties); + &xactProperties, NIL); } @@ -872,7 +874,7 @@ uint64 ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList, TupleDesc tupleDescriptor, Tuplestorestate *tupleStore, bool hasReturning, int targetPoolSize, - TransactionProperties *xactProperties) + TransactionProperties *xactProperties, List *jobIdList) { ParamListInfo paramListInfo = NULL; @@ -890,7 +892,7 @@ ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList, DistributedExecution *execution = CreateDistributedExecution(modLevel, taskList, hasReturning, paramListInfo, tupleDescriptor, tupleStore, targetPoolSize, - xactProperties); + xactProperties, jobIdList); StartDistributedExecution(execution); RunDistributedExecution(execution); @@ -909,7 +911,7 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, bool hasReturning, ParamListInfo paramListInfo, TupleDesc tupleDescriptor, Tuplestorestate *tupleStore, int targetPoolSize, - TransactionProperties *xactProperties) + TransactionProperties *xactProperties, List *jobIdList) { DistributedExecution *execution = (DistributedExecution *) palloc0(sizeof(DistributedExecution)); @@ -941,6 +943,8 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, execution->connectionSetChanged = false; execution->waitFlagsChanged = false; + execution->jobIdList = jobIdList; + /* allocate execution specific data once, on the ExecutorState memory context */ if (tupleDescriptor != NULL) { diff --git a/src/backend/distributed/executor/directed_acyclic_graph_execution.c b/src/backend/distributed/executor/directed_acyclic_graph_execution.c index d4aeb6659..9c177fa95 100644 --- a/src/backend/distributed/executor/directed_acyclic_graph_execution.c +++ b/src/backend/distributed/executor/directed_acyclic_graph_execution.c @@ -51,7 +51,7 @@ static bool IsTaskAlreadyCompleted(Task *task, HTAB *completedTasks); * execute all of them in parallel. The parallelism is bound by MaxAdaptiveExecutorPoolSize. */ void -ExecuteTasksInDependencyOrder(List *allTasks, List *excludedTasks) +ExecuteTasksInDependencyOrder(List *allTasks, List *excludedTasks, List *jobIds) { HTAB *completedTasks = CreateTaskHashTable(); @@ -66,7 +66,7 @@ ExecuteTasksInDependencyOrder(List *allTasks, List *excludedTasks) break; } ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, curTasks, - MaxAdaptiveExecutorPoolSize); + MaxAdaptiveExecutorPoolSize, jobIds); AddCompletedTasks(curTasks, completedTasks); curTasks = NIL; diff --git a/src/backend/distributed/executor/distributed_intermediate_results.c b/src/backend/distributed/executor/distributed_intermediate_results.c index addef3b2a..777e2436c 100644 --- a/src/backend/distributed/executor/distributed_intermediate_results.c +++ b/src/backend/distributed/executor/distributed_intermediate_results.c @@ -413,7 +413,8 @@ ExecuteSelectTasksIntoTupleStore(List *taskList, TupleDesc resultDescriptor, work_mem); ExecuteTaskListExtended(ROW_MODIFY_READONLY, taskList, resultDescriptor, - resultStore, hasReturning, targetPoolSize, &xactProperties); + resultStore, hasReturning, targetPoolSize, &xactProperties, + NIL); return resultStore; } diff --git a/src/backend/distributed/executor/repartition_join_execution.c b/src/backend/distributed/executor/repartition_join_execution.c index 556acba7b..66a15baa9 100644 --- a/src/backend/distributed/executor/repartition_join_execution.c +++ b/src/backend/distributed/executor/repartition_join_execution.c @@ -68,7 +68,7 @@ ExecuteDependentTasks(List *topLevelTasks, Job *topLevelJob) List *jobIds = CreateTemporarySchemasForMergeTasks(topLevelJob); - ExecuteTasksInDependencyOrder(allTasks, topLevelTasks); + ExecuteTasksInDependencyOrder(allTasks, topLevelTasks, jobIds); return jobIds; } diff --git a/src/include/distributed/adaptive_executor.h b/src/include/distributed/adaptive_executor.h index c9d871839..b55271c4d 100644 --- a/src/include/distributed/adaptive_executor.h +++ b/src/include/distributed/adaptive_executor.h @@ -10,11 +10,10 @@ extern int MaxAdaptiveExecutorPoolSize; /* GUC, number of ms to wait between opening connections to the same worker */ extern int ExecutorSlowStartInterval; -extern uint64 ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int - targetPoolSize); +extern uint64 ExecuteTaskList(RowModifyLevel modLevel, List *taskList, + int targetPoolSize); extern uint64 ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList, - int - targetPoolSize); + int targetPoolSize, List *jobIdList); #endif /* ADAPTIVE_EXECUTOR_H */ diff --git a/src/include/distributed/directed_acyclic_graph_execution.h b/src/include/distributed/directed_acyclic_graph_execution.h index bf6b74e6c..a071baf16 100644 --- a/src/include/distributed/directed_acyclic_graph_execution.h +++ b/src/include/distributed/directed_acyclic_graph_execution.h @@ -14,7 +14,8 @@ #include "nodes/pg_list.h" -extern void ExecuteTasksInDependencyOrder(List *allTasks, List *excludedTasks); +extern void ExecuteTasksInDependencyOrder(List *allTasks, List *excludedTasks, + List *jobIds); #endif /* DIRECTED_ACYCLIC_GRAPH_EXECUTION_H */ diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index ce1fc51b3..6099b8356 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -77,7 +77,8 @@ extern uint64 ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList, TupleDesc tupleDescriptor, Tuplestorestate *tupleStore, bool hasReturning, int targetPoolSize, - TransactionProperties *xactProperties); + TransactionProperties *xactProperties, + List *jobIdList); extern uint64 ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskList, TupleDesc tupleDescriptor, Tuplestorestate *tupleStore,