citus/src/backend/distributed/executor/directed_acyclic_graph_exec...

247 lines
5.9 KiB
C

/*-------------------------------------------------------------------------
*
* directed_acyclic_graph_execution_logic.c
*
* Logic to run tasks in their dependency order.
*
* Copyright (c) Citus Data, Inc.
*/
#include "postgres.h"
#include "access/hash.h"
#include "distributed/hash_helpers.h"
#include "distributed/adaptive_executor.h"
#include "distributed/directed_acyclic_graph_execution.h"
#include "distributed/listutils.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_server_executor.h"
#include "distributed/transaction_management.h"
#include "distributed/transmit.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_transaction.h"
/*
 * TaskHashKey is the lookup key of the completed-tasks hash table: a task
 * is identified by the pair (jobId, taskId).
 *
 * The member order matters: code in this file builds keys with positional
 * initializers of the form { task->jobId, task->taskId }.
 */
typedef struct TaskHashKey
{
/* id of the job the task belongs to */
uint64 jobId;
/* id of the task; compared together with jobId */
uint32 taskId;
}TaskHashKey;
/*
 * TaskHashEntry is the element type stored in the completed-tasks hash
 * table (its size is what the table is created with as entrysize).
 *
 * Lookups in this file only test whether a key is present; the task member
 * is not read anywhere in this file.
 */
typedef struct TaskHashEntry
{
/* must be the first member: the hash table key */
TaskHashKey key;
/* task associated with the key */
Task *task;
}TaskHashEntry;
/* forward declarations of the file-local helpers defined below */
static HASHCTL InitHashTableInfo(void);
static HTAB * CreateTaskHashTable(void);
static bool IsAllDependencyCompleted(Task *task, HTAB *completedTasks);
static void AddCompletedTasks(List *curCompletedTasks, HTAB *completedTasks);
static List * FindExecutableTasks(List *allTasks, HTAB *completedTasks);
static List * RemoveMergeTasks(List *taskList);
/* callbacks installed into the hash table (see InitHashTableInfo) */
static int TaskHashCompare(const void *key1, const void *key2, Size keysize);
static uint32 TaskHash(const void *key, Size keysize);
static bool IsTaskAlreadyCompleted(Task *task, HTAB *completedTasks);
/*
 * ExecuteTasksInDependencyOrder runs the tasks in allTasks, skipping the
 * ones in excludedTasks, so that a task is only started after every task
 * in its dependentTaskList has been completed.
 *
 * The work proceeds in rounds. Each round collects the tasks that are
 * executable right now, hands the non-merge ones to ExecuteTaskList in a
 * single call, and then records the whole round as completed. The loop
 * ends when a round finds nothing left to execute.
 *
 * According to the original author's note, the parallelism of a round is
 * bound by MaxAdaptiveExecutorPoolSize.
 *
 * jobIds is not used by this function.
 */
void
ExecuteTasksInDependencyOrder(List *allTasks, List *excludedTasks, List *jobIds)
{
	HTAB *completedTasks = CreateTaskHashTable();

	/*
	 * Only the tasks of depended jobs should be executed here. Marking the
	 * excluded (top level) tasks as completed up front guarantees that they
	 * are never picked as executable.
	 */
	AddCompletedTasks(excludedTasks, completedTasks);

	for (;;)
	{
		List *readyTasks = FindExecutableTasks(allTasks, completedTasks);
		if (readyTasks == NIL)
		{
			/* nothing became executable in this round, we are done */
			break;
		}

		/* merge tasks do not need to be executed, only marked as completed */
		List *tasksToRun = RemoveMergeTasks(readyTasks);
		if (tasksToRun != NIL)
		{
			ExecuteTaskList(ROW_MODIFY_NONE, tasksToRun);
		}

		/* record the full round, merge tasks included */
		AddCompletedTasks(readyTasks, completedTasks);
	}
}
/*
 * FindExecutableTasks returns the tasks of allTasks that can be executed
 * at this point: every dependency of the task is in completedTasks and the
 * task itself is not reported as already completed.
 *
 * The result is a new list that holds the same Task pointers as allTasks,
 * in the same relative order. It is NIL when no task qualifies.
 */
static List *
FindExecutableTasks(List *allTasks, HTAB *completedTasks)
{
	List *executableTasks = NIL;

	Task *candidate = NULL;
	foreach_ptr(candidate, allTasks)
	{
		/* the dependency check is deliberately performed first */
		if (!IsAllDependencyCompleted(candidate, completedTasks))
		{
			continue;
		}

		if (IsTaskAlreadyCompleted(candidate, completedTasks))
		{
			continue;
		}

		executableTasks = lappend(executableTasks, candidate);
	}

	return executableTasks;
}
/*
 * RemoveMergeTasks builds a new list out of taskList that leaves out the
 * tasks whose taskType is MERGE_TASK. The input list is not modified and
 * the Task objects themselves are shared, not copied.
 *
 * Merge tasks are filtered because they are currently only a logical
 * concept that does not need to be executed.
 */
static List *
RemoveMergeTasks(List *taskList)
{
	List *nonMergeTasks = NIL;

	Task *currentTask = NULL;
	foreach_ptr(currentTask, taskList)
	{
		if (currentTask->taskType == MERGE_TASK)
		{
			continue;
		}

		nonMergeTasks = lappend(nonMergeTasks, currentTask);
	}

	return nonMergeTasks;
}
/*
 * AddCompletedTasks adds the given tasks to the completedTasks HTAB.
 *
 * Each task is registered under its (jobId, taskId) key. Adding a task
 * whose key is already present is harmless: the existing entry is reused.
 */
static void
AddCompletedTasks(List *curCompletedTasks, HTAB *completedTasks)
{
	Task *task = NULL;
	foreach_ptr(task, curCompletedTasks)
	{
		TaskHashKey taskKey = { task->jobId, task->taskId };

		/*
		 * We do not care whether the entry already existed, so no found
		 * flag is requested. HASH_ENTER only fills in the key of a new
		 * entry, hence we set the task pointer ourselves instead of leaving
		 * it uninitialized.
		 */
		TaskHashEntry *entry = (TaskHashEntry *) hash_search(completedTasks, &taskKey,
															 HASH_ENTER, NULL);
		entry->task = task;
	}
}
/*
 * CreateTaskHashTable creates an empty HTAB keyed by TaskHashKey, using the
 * settings returned by InitHashTableInfo.
 *
 * The flags announce which HASHCTL members are filled in: the key/entry
 * sizes (HASH_ELEM), the custom hash function (HASH_FUNCTION), the custom
 * match function (HASH_COMPARE) and the memory context (HASH_CONTEXT).
 */
static HTAB *
CreateTaskHashTable(void)
{
	uint32 hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT | HASH_COMPARE);
	HASHCTL info = InitHashTableInfo();

	/* 64 is the element count passed to hash_create for the new table */
	return hash_create("citus task completed list (jobId, taskId)",
					   64, &info, hashFlags);
}
/*
 * IsTaskAlreadyCompleted returns true if the given task
 * is found in the completedTasks HTAB.
 *
 * This is a pure lookup and must not modify the table. Inserting the key
 * here would make a task look completed as soon as it is checked, which
 * would let tasks that depend on it be scheduled before it has actually
 * been executed.
 */
static bool
IsTaskAlreadyCompleted(Task *task, HTAB *completedTasks)
{
	bool found = false;
	TaskHashKey taskKey = { task->jobId, task->taskId };

	hash_search(completedTasks, &taskKey, HASH_FIND, &found);

	return found;
}
/*
 * IsAllDependencyCompleted returns true if every task in the
 * dependentTaskList of targetTask is present in the completedTasks HTAB.
 * A task without dependencies trivially satisfies the condition.
 */
static bool
IsAllDependencyCompleted(Task *targetTask, HTAB *completedTasks)
{
	Task *dependency = NULL;
	foreach_ptr(dependency, targetTask->dependentTaskList)
	{
		bool dependencyCompleted = false;
		TaskHashKey dependencyKey = { dependency->jobId, dependency->taskId };

		hash_search(completedTasks, &dependencyKey, HASH_FIND, &dependencyCompleted);

		/* a single missing dependency is enough to give up */
		if (!dependencyCompleted)
		{
			return false;
		}
	}

	return true;
}
/*
 * InitHashTableInfo returns the hash table info used for the
 * completed-tasks table. Entries are keyed by TaskHashKey, stored as
 * TaskHashEntry, and hashed/compared with TaskHash and TaskHashCompare.
 *
 * The hash table is configured to be created in the CurrentMemoryContext
 * so that it will be cleaned when this memory context gets freed/reset.
 */
static HASHCTL
InitHashTableInfo(void)
{
	HASHCTL info;

	/* start from all-zeroes so that members we do not set are well defined */
	memset(&info, 0, sizeof(info));

	info.keysize = sizeof(TaskHashKey);
	info.entrysize = sizeof(TaskHashEntry);
	info.hash = TaskHash;
	info.match = TaskHashCompare;
	info.hcxt = CurrentMemoryContext;

	return info;
}
/*
 * TaskHash is the hash function of the completed-tasks table. It hashes
 * the jobId and the taskId of the given TaskHashKey separately and combines
 * the two values, starting from a seed of 0.
 *
 * Only the two members are hashed, so padding bytes of the key struct never
 * influence the result. keysize is not used.
 */
static uint32
TaskHash(const void *key, Size keysize)
{
	const TaskHashKey *taskKey = (const TaskHashKey *) key;

	uint32 jobIdHash = hash_any((const unsigned char *) &taskKey->jobId,
								sizeof(taskKey->jobId));
	uint32 taskIdHash = hash_uint32(taskKey->taskId);

	uint32 combinedHash = hash_combine(0, jobIdHash);
	combinedHash = hash_combine(combinedHash, taskIdHash);

	return combinedHash;
}
/*
 * TaskHashCompare is the match function of the completed-tasks table. It
 * returns 0 when both keys carry the same jobId and the same taskId, and 1
 * otherwise. keysize is not used.
 */
static int
TaskHashCompare(const void *key1, const void *key2, Size keysize)
{
	const TaskHashKey *leftKey = (const TaskHashKey *) key1;
	const TaskHashKey *rightKey = (const TaskHashKey *) key2;

	bool sameTask = leftKey->jobId == rightKey->jobId &&
					leftKey->taskId == rightKey->taskId;

	return sameTask ? 0 : 1;
}