Merge pull request #3461 from citusdata/fix-adaptive-repartition-join-leak

Fix adaptive repartition join leak
pull/3473/head
Philip Dubé 2020-02-05 17:43:23 +00:00 committed by GitHub
commit 345455d765
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 47 additions and 41 deletions

View File

@ -189,7 +189,7 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure,
ExecuteTaskListExtended(ROW_MODIFY_NONE, list_make1(task), ExecuteTaskListExtended(ROW_MODIFY_NONE, list_make1(task),
tupleDesc, tupleStore, hasReturning, tupleDesc, tupleStore, hasReturning,
MaxAdaptiveExecutorPoolSize, MaxAdaptiveExecutorPoolSize,
&xactProperties); &xactProperties, NIL);
while (tuplestore_gettupleslot(tupleStore, true, false, slot)) while (tuplestore_gettupleslot(tupleStore, true, false, slot))
{ {

View File

@ -353,7 +353,7 @@ FindAvailableConnection(dlist_head *connections, uint32 flags)
if (flags & OUTSIDE_TRANSACTION) if (flags & OUTSIDE_TRANSACTION)
{ {
/* dont return connections that are used in transactions */ /* don't return connections that are used in transactions */
if (connection->remoteTransaction.transactionState != if (connection->remoteTransaction.transactionState !=
REMOTE_TRANS_NOT_STARTED) REMOTE_TRANS_NOT_STARTED)
{ {

View File

@ -543,14 +543,15 @@ typedef struct TaskPlacementExecution
/* local functions */ /* local functions */
static DistributedExecution * CreateDistributedExecution(RowModifyLevel modLevel, static DistributedExecution * CreateDistributedExecution(RowModifyLevel modLevel,
List *taskList, bool List *taskList,
hasReturning, bool hasReturning,
ParamListInfo paramListInfo, ParamListInfo paramListInfo,
TupleDesc tupleDescriptor, TupleDesc tupleDescriptor,
Tuplestorestate *tupleStore, Tuplestorestate *tupleStore,
int targetPoolSize, int targetPoolSize,
TransactionProperties * TransactionProperties *
xactProperties); xactProperties,
List *jobIdList);
static TransactionProperties DecideTransactionPropertiesForTaskList(RowModifyLevel static TransactionProperties DecideTransactionPropertiesForTaskList(RowModifyLevel
modLevel, modLevel,
List *taskList, List *taskList,
@ -646,7 +647,7 @@ AdaptiveExecutor(CitusScanState *scanState)
/* /*
* PostgreSQL takes locks on all partitions in the executor. It's not entirely * PostgreSQL takes locks on all partitions in the executor. It's not entirely
* clear why this is necessary (instead of locking the parent during DDL), but * clear why this is necessary (instead of locking the parent during DDL), but
* We do the same for consistency. * we do the same for consistency.
*/ */
LockPartitionsForDistributedPlan(distributedPlan); LockPartitionsForDistributedPlan(distributedPlan);
@ -680,7 +681,8 @@ AdaptiveExecutor(CitusScanState *scanState)
tupleDescriptor, tupleDescriptor,
scanState->tuplestorestate, scanState->tuplestorestate,
targetPoolSize, targetPoolSize,
&xactProperties); &xactProperties,
jobIdList);
/* /*
* Make sure that we acquire the appropriate locks even if the local tasks * Make sure that we acquire the appropriate locks even if the local tasks
@ -803,24 +805,24 @@ ExecuteUtilityTaskListWithoutResults(List *taskList)
/* /*
* ExecuteTaskListRepartiton is a proxy to ExecuteTaskListExtended() with defaults * ExecuteTaskListOutsideTransaction is a proxy to ExecuteTaskListExtended
* for some of the arguments for a repartition query. * with defaults for some of the arguments.
*/ */
uint64 uint64
ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList, int ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList,
targetPoolSize) int targetPoolSize, List *jobIdList)
{ {
TupleDesc tupleDescriptor = NULL; TupleDesc tupleDescriptor = NULL;
Tuplestorestate *tupleStore = NULL; Tuplestorestate *tupleStore = NULL;
bool hasReturning = false; bool hasReturning = false;
TransactionProperties xactProperties = DecideTransactionPropertiesForTaskList( TransactionProperties xactProperties =
modLevel, taskList, true); DecideTransactionPropertiesForTaskList(modLevel, taskList, true);
return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor, return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor,
tupleStore, hasReturning, targetPoolSize, tupleStore, hasReturning, targetPoolSize,
&xactProperties); &xactProperties, jobIdList);
} }
@ -840,7 +842,7 @@ ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int targetPoolSize)
return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor, return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor,
tupleStore, hasReturning, targetPoolSize, tupleStore, hasReturning, targetPoolSize,
&xactProperties); &xactProperties, NIL);
} }
@ -860,7 +862,7 @@ ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskList,
return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor, return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor,
tupleStore, hasReturning, targetPoolSize, tupleStore, hasReturning, targetPoolSize,
&xactProperties); &xactProperties, NIL);
} }
@ -872,7 +874,7 @@ uint64
ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList, ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList,
TupleDesc tupleDescriptor, Tuplestorestate *tupleStore, TupleDesc tupleDescriptor, Tuplestorestate *tupleStore,
bool hasReturning, int targetPoolSize, bool hasReturning, int targetPoolSize,
TransactionProperties *xactProperties) TransactionProperties *xactProperties, List *jobIdList)
{ {
ParamListInfo paramListInfo = NULL; ParamListInfo paramListInfo = NULL;
@ -890,7 +892,7 @@ ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList,
DistributedExecution *execution = DistributedExecution *execution =
CreateDistributedExecution(modLevel, taskList, hasReturning, paramListInfo, CreateDistributedExecution(modLevel, taskList, hasReturning, paramListInfo,
tupleDescriptor, tupleStore, targetPoolSize, tupleDescriptor, tupleStore, targetPoolSize,
xactProperties); xactProperties, jobIdList);
StartDistributedExecution(execution); StartDistributedExecution(execution);
RunDistributedExecution(execution); RunDistributedExecution(execution);
@ -909,7 +911,7 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList,
bool hasReturning, bool hasReturning,
ParamListInfo paramListInfo, TupleDesc tupleDescriptor, ParamListInfo paramListInfo, TupleDesc tupleDescriptor,
Tuplestorestate *tupleStore, int targetPoolSize, Tuplestorestate *tupleStore, int targetPoolSize,
TransactionProperties *xactProperties) TransactionProperties *xactProperties, List *jobIdList)
{ {
DistributedExecution *execution = DistributedExecution *execution =
(DistributedExecution *) palloc0(sizeof(DistributedExecution)); (DistributedExecution *) palloc0(sizeof(DistributedExecution));
@ -941,6 +943,8 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList,
execution->connectionSetChanged = false; execution->connectionSetChanged = false;
execution->waitFlagsChanged = false; execution->waitFlagsChanged = false;
execution->jobIdList = jobIdList;
/* allocate execution specific data once, on the ExecutorState memory context */ /* allocate execution specific data once, on the ExecutorState memory context */
if (tupleDescriptor != NULL) if (tupleDescriptor != NULL)
{ {

View File

@ -1,6 +1,6 @@
/*------------------------------------------------------------------------- /*-------------------------------------------------------------------------
* *
* directed_acylic_graph_execution_logic.c * directed_acyclic_graph_execution_logic.c
* *
* Logic to run tasks in their dependency order. * Logic to run tasks in their dependency order.
* *
@ -11,7 +11,7 @@
#include "access/hash.h" #include "access/hash.h"
#include "distributed/hash_helpers.h" #include "distributed/hash_helpers.h"
#include "distributed/directed_acylic_graph_execution.h" #include "distributed/directed_acyclic_graph_execution.h"
#include "distributed/multi_physical_planner.h" #include "distributed/multi_physical_planner.h"
#include "distributed/adaptive_executor.h" #include "distributed/adaptive_executor.h"
#include "distributed/worker_manager.h" #include "distributed/worker_manager.h"
@ -51,7 +51,7 @@ static bool IsTaskAlreadyCompleted(Task *task, HTAB *completedTasks);
* execute all of them in parallel. The parallelism is bound by MaxAdaptiveExecutorPoolSize. * execute all of them in parallel. The parallelism is bound by MaxAdaptiveExecutorPoolSize.
*/ */
void void
ExecuteTasksInDependencyOrder(List *allTasks, List *excludedTasks) ExecuteTasksInDependencyOrder(List *allTasks, List *excludedTasks, List *jobIds)
{ {
HTAB *completedTasks = CreateTaskHashTable(); HTAB *completedTasks = CreateTaskHashTable();
@ -66,7 +66,7 @@ ExecuteTasksInDependencyOrder(List *allTasks, List *excludedTasks)
break; break;
} }
ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, curTasks, ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, curTasks,
MaxAdaptiveExecutorPoolSize); MaxAdaptiveExecutorPoolSize, jobIds);
AddCompletedTasks(curTasks, completedTasks); AddCompletedTasks(curTasks, completedTasks);
curTasks = NIL; curTasks = NIL;

View File

@ -413,7 +413,8 @@ ExecuteSelectTasksIntoTupleStore(List *taskList, TupleDesc resultDescriptor,
work_mem); work_mem);
ExecuteTaskListExtended(ROW_MODIFY_READONLY, taskList, resultDescriptor, ExecuteTaskListExtended(ROW_MODIFY_READONLY, taskList, resultDescriptor,
resultStore, hasReturning, targetPoolSize, &xactProperties); resultStore, hasReturning, targetPoolSize, &xactProperties,
NIL);
return resultStore; return resultStore;
} }

View File

@ -16,8 +16,8 @@
* *
* *
* Repartition queries do not begin a transaction even if we are in * Repartition queries do not begin a transaction even if we are in
* a transaction block. As we dont begin a transaction, they wont see the * a transaction block. As we don't begin a transaction, they won't see the
* DDLs that happened earlier in the transaction because we dont have that * DDLs that happened earlier in the transaction because we don't have that
* transaction id with repartition queries. Therefore we error in this case. * transaction id with repartition queries. Therefore we error in this case.
* *
* Copyright (c) Citus Data, Inc. * Copyright (c) Citus Data, Inc.
@ -29,7 +29,7 @@
#include "utils/builtins.h" #include "utils/builtins.h"
#include "distributed/hash_helpers.h" #include "distributed/hash_helpers.h"
#include "distributed/directed_acylic_graph_execution.h" #include "distributed/directed_acyclic_graph_execution.h"
#include "distributed/multi_physical_planner.h" #include "distributed/multi_physical_planner.h"
#include "distributed/adaptive_executor.h" #include "distributed/adaptive_executor.h"
#include "distributed/worker_manager.h" #include "distributed/worker_manager.h"
@ -68,7 +68,7 @@ ExecuteDependentTasks(List *topLevelTasks, Job *topLevelJob)
List *jobIds = CreateTemporarySchemasForMergeTasks(topLevelJob); List *jobIds = CreateTemporarySchemasForMergeTasks(topLevelJob);
ExecuteTasksInDependencyOrder(allTasks, topLevelTasks); ExecuteTasksInDependencyOrder(allTasks, topLevelTasks, jobIds);
return jobIds; return jobIds;
} }

View File

@ -238,7 +238,7 @@ JoinOnColumns(Var *currentColumn, Var *candidateColumn, List *joinClauseList)
/* /*
* Check if both join columns and both partition key columns match, since the * Check if both join columns and both partition key columns match, since the
* current and candidate column's can't be NULL we know they wont match if either * current and candidate column's can't be NULL we know they won't match if either
* of the columns resolved to NULL above. * of the columns resolved to NULL above.
*/ */
if (equal(leftColumn, currentColumn) && if (equal(leftColumn, currentColumn) &&

View File

@ -10,11 +10,10 @@ extern int MaxAdaptiveExecutorPoolSize;
/* GUC, number of ms to wait between opening connections to the same worker */ /* GUC, number of ms to wait between opening connections to the same worker */
extern int ExecutorSlowStartInterval; extern int ExecutorSlowStartInterval;
extern uint64 ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int extern uint64 ExecuteTaskList(RowModifyLevel modLevel, List *taskList,
targetPoolSize); int targetPoolSize);
extern uint64 ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList, extern uint64 ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList,
int int targetPoolSize, List *jobIdList);
targetPoolSize);
#endif /* ADAPTIVE_EXECUTOR_H */ #endif /* ADAPTIVE_EXECUTOR_H */

View File

@ -1,20 +1,21 @@
/*------------------------------------------------------------------------- /*-------------------------------------------------------------------------
* *
* directed_acylic_graph_execution.h * directed_acyclic_graph_execution.h
* Execution logic for directed acylic graph tasks. * Execution logic for directed acyclic graph tasks.
* *
* Copyright (c) Citus Data, Inc. * Copyright (c) Citus Data, Inc.
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
#ifndef DIRECTED_ACYLIC_GRAPH_EXECUTION_H #ifndef DIRECTED_ACYCLIC_GRAPH_EXECUTION_H
#define DIRECTED_ACYLIC_GRAPH_EXECUTION_H #define DIRECTED_ACYCLIC_GRAPH_EXECUTION_H
#include "postgres.h" #include "postgres.h"
#include "nodes/pg_list.h" #include "nodes/pg_list.h"
extern void ExecuteTasksInDependencyOrder(List *allTasks, List *excludedTasks); extern void ExecuteTasksInDependencyOrder(List *allTasks, List *excludedTasks,
List *jobIds);
#endif /* DIRECTED_ACYLIC_GRAPH_EXECUTION_H */ #endif /* DIRECTED_ACYCLIC_GRAPH_EXECUTION_H */

View File

@ -77,7 +77,8 @@ extern uint64 ExecuteTaskListExtended(RowModifyLevel modLevel, List *taskList,
TupleDesc tupleDescriptor, TupleDesc tupleDescriptor,
Tuplestorestate *tupleStore, Tuplestorestate *tupleStore,
bool hasReturning, int targetPoolSize, bool hasReturning, int targetPoolSize,
TransactionProperties *xactProperties); TransactionProperties *xactProperties,
List *jobIdList);
extern uint64 ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskList, extern uint64 ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskList,
TupleDesc tupleDescriptor, TupleDesc tupleDescriptor,
Tuplestorestate *tupleStore, Tuplestorestate *tupleStore,

View File

@ -89,7 +89,7 @@ How the citus upgrade test work:
Note that when the version of citus changes, we should update `MASTER_VERSION` with the new version of citus otherwise that will be outdated and it will fail. Note that when the version of citus changes, we should update `MASTER_VERSION` with the new version of citus otherwise that will be outdated and it will fail.
There is a target for citus upgrade test. We run citus upgrade tests both in normal mode and in mixed mode. In mixed mode, we dont upgrade one of the workers. `'citus.enable_version_checks' : 'false'` is used to prevent citus from giving an error for mixed mode. There is a target for citus upgrade test. We run citus upgrade tests both in normal mode and in mixed mode. In mixed mode, we don't upgrade one of the workers. `'citus.enable_version_checks' : 'false'` is used to prevent citus from giving an error for mixed mode.
To see full command list: To see full command list: