diff --git a/src/backend/distributed/commands/call.c b/src/backend/distributed/commands/call.c index 3cf173860..4e6981fe8 100644 --- a/src/backend/distributed/commands/call.c +++ b/src/backend/distributed/commands/call.c @@ -24,6 +24,7 @@ #include "distributed/metadata_cache.h" #include "distributed/multi_executor.h" #include "distributed/multi_physical_planner.h" +#include "distributed/adaptive_executor.h" #include "distributed/remote_commands.h" #include "distributed/shard_pruning.h" #include "distributed/version_compat.h" diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 03b45e48c..b4965047b 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -351,6 +351,17 @@ FindAvailableConnection(dlist_head *connections, uint32 flags) MultiConnection *connection = dlist_container(MultiConnection, connectionNode, iter.cur); + if (flags & OUTSIDE_TRANSACTION) + { + /* dont return connections that are used in transactions */ + if (connection->remoteTransaction.transactionState != + REMOTE_TRANS_NOT_STARTED) + { + continue; + } + } + + /* don't return claimed connections */ if (connection->claimedExclusively) { /* connection is in use for an ongoing operation */ @@ -949,6 +960,7 @@ static MultiConnection * StartConnectionEstablishment(ConnectionHashKey *key) { bool found = false; + static uint64 connectionId = 1; /* search our cache for precomputed connection settings */ ConnParamsHashEntry *entry = hash_search(ConnParamsHash, key, HASH_ENTER, &found); @@ -979,6 +991,7 @@ StartConnectionEstablishment(ConnectionHashKey *key) (const char **) entry->values, false); connection->connectionStart = GetCurrentTimestamp(); + connection->connectionId = connectionId++; connection->purpose = CONNECTION_PURPOSE_ANY; /* diff --git a/src/backend/distributed/connection/remote_commands.c 
b/src/backend/distributed/connection/remote_commands.c index 875c448a4..0a774ef87 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -353,8 +353,9 @@ LogRemoteCommand(MultiConnection *connection, const char *command) } ereport(LOG, (errmsg("issuing %s", ApplyLogRedaction(command)), - errdetail("on server %s@%s:%d", connection->user, connection->hostname, - connection->port))); + errdetail("on server %s@%s:%d connectionId: " UINT64_FORMAT, + connection->user, connection->hostname, + connection->port, connection->connectionId))); } diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index e8769bde0..d266bae67 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -150,7 +150,10 @@ #include "distributed/transaction_management.h" #include "distributed/worker_protocol.h" #include "distributed/version_compat.h" +#include "distributed/adaptive_executor.h" +#include "distributed/repartition_join_execution.h" #include "lib/ilist.h" +#include "commands/schemacmds.h" #include "storage/fd.h" #include "storage/latch.h" #include "utils/int8.h" @@ -158,6 +161,7 @@ #include "utils/memutils.h" #include "utils/timestamp.h" + /* * DistributedExecution represents the execution of a distributed query * plan. @@ -272,8 +276,15 @@ typedef struct DistributedExecution */ AttInMetadata *attributeInputMetadata; char **columnArray; + + /* + * jobIdList contains all jobs in the job tree, this is used to + * do cleanup for repartition queries. + */ + List *jobIdList; } DistributedExecution; + /* * WorkerPool represents a pool of sessions on the same worker.
* @@ -416,6 +427,13 @@ typedef struct WorkerSession struct TaskPlacementExecution; +/* GUC, determining whether Citus opens 1 connection per task */ +bool ForceMaxQueryParallelization = false; +int MaxAdaptiveExecutorPoolSize = 16; + +/* GUC, number of ms to wait between opening connections to the same worker */ +int ExecutorSlowStartInterval = 10; + /* * TaskExecutionState indicates whether or not a command on a shard @@ -520,18 +538,10 @@ typedef struct TaskPlacementExecution } TaskPlacementExecution; -/* GUC, determining whether Citus opens 1 connection per task */ -bool ForceMaxQueryParallelization = false; -int MaxAdaptiveExecutorPoolSize = 16; - -/* GUC, number of ms to wait between opening connections to the same worker */ -int ExecutorSlowStartInterval = 10; - - /* local functions */ static DistributedExecution * CreateDistributedExecution(RowModifyLevel modLevel, - List *taskList, - bool hasReturning, + List *taskList, bool + hasReturning, ParamListInfo paramListInfo, TupleDesc tupleDescriptor, Tuplestorestate *tupleStore, @@ -540,7 +550,9 @@ static DistributedExecution * CreateDistributedExecution(RowModifyLevel modLevel xactProperties); static TransactionProperties DecideTransactionPropertiesForTaskList(RowModifyLevel modLevel, - List *taskList); + List *taskList, + bool + exludeFromTransaction); static void StartDistributedExecution(DistributedExecution *execution); static void RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution); static void RunDistributedExecution(DistributedExecution *execution); @@ -598,11 +610,11 @@ static bool ShouldMarkPlacementsInvalidOnFailure(DistributedExecution *execution static void PlacementExecutionReady(TaskPlacementExecution *placementExecution); static TaskExecutionState TaskExecutionStateMachine(ShardCommandExecution * shardCommandExecution); +static bool HasDependentJobs(Job *mainJob); static void ExtractParametersForRemoteExecution(ParamListInfo paramListInfo, Oid **parameterTypes, const 
char ***parameterValues); - /* * AdaptiveExecutor is called via CitusExecScan on the * first call of CitusExecScan. The function fills the tupleStore @@ -620,6 +632,7 @@ AdaptiveExecutor(CitusScanState *scanState) bool randomAccess = true; bool interTransactions = false; int targetPoolSize = MaxAdaptiveExecutorPoolSize; + List *jobIdList = NIL; Job *job = distributedPlan->workerJob; List *taskList = job->taskList; @@ -636,6 +649,12 @@ AdaptiveExecutor(CitusScanState *scanState) ExecuteSubPlans(distributedPlan); + bool hasDependentJobs = HasDependentJobs(job); + if (hasDependentJobs) + { + jobIdList = ExecuteDependentTasks(taskList, job); + } + if (MultiShardConnectionType == SEQUENTIAL_CONNECTION) { /* defer decision after ExecuteSubPlans() */ @@ -645,8 +664,10 @@ AdaptiveExecutor(CitusScanState *scanState) scanState->tuplestorestate = tuplestore_begin_heap(randomAccess, interTransactions, work_mem); - TransactionProperties xactProperties = - DecideTransactionPropertiesForTaskList(distributedPlan->modLevel, taskList); + TransactionProperties xactProperties = DecideTransactionPropertiesForTaskList( + distributedPlan->modLevel, taskList, + hasDependentJobs); + DistributedExecution *execution = CreateDistributedExecution( distributedPlan->modLevel, @@ -704,6 +725,11 @@ AdaptiveExecutor(CitusScanState *scanState) FinishDistributedExecution(execution); + if (hasDependentJobs) + { + DoRepartitionCleanup(jobIdList); + } + if (SortReturning && distributedPlan->hasReturning) { SortTupleStore(scanState); @@ -713,6 +739,17 @@ AdaptiveExecutor(CitusScanState *scanState) } +/* + * HasDependentJobs returns true if there is any dependent job + * for the mainjob(top level) job. + */ +static bool +HasDependentJobs(Job *mainJob) +{ + return list_length(mainJob->dependentJobList) > 0; +} + + /* * RunLocalExecution runs the localTaskList in the execution, fills the tuplestore * and sets the es_processed if necessary. 
@@ -764,6 +801,28 @@ ExecuteUtilityTaskListWithoutResults(List *taskList) } +/* + * ExecuteTaskListOutsideTransaction is a proxy to ExecuteTaskListExtended() with + * defaults for some of the arguments, for task lists excluded from the transaction. + */ +uint64 +ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList, int + targetPoolSize) +{ + TupleDesc tupleDescriptor = NULL; + Tuplestorestate *tupleStore = NULL; + bool hasReturning = false; + + TransactionProperties xactProperties = DecideTransactionPropertiesForTaskList( + modLevel, taskList, true); + + + return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor, + tupleStore, hasReturning, targetPoolSize, + &xactProperties); +} + + /* * ExecuteTaskList is a proxy to ExecuteTaskListExtended() with defaults * for some of the arguments. @@ -776,7 +835,7 @@ ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int targetPoolSize) bool hasReturning = false; TransactionProperties xactProperties = - DecideTransactionPropertiesForTaskList(modLevel, taskList); + DecideTransactionPropertiesForTaskList(modLevel, taskList, false); return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor, tupleStore, hasReturning, targetPoolSize, @@ -795,8 +854,8 @@ ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskList, { int targetPoolSize = MaxAdaptiveExecutorPoolSize; - TransactionProperties xactProperties = - DecideTransactionPropertiesForTaskList(modLevel, taskList); + TransactionProperties xactProperties = DecideTransactionPropertiesForTaskList( + modLevel, taskList, false); return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor, tupleStore, hasReturning, targetPoolSize, @@ -914,7 +973,8 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, bool hasRetu * errorOnAnyFailure, but not the other way around) we keep them in the same place.
*/ static TransactionProperties -DecideTransactionPropertiesForTaskList(RowModifyLevel modLevel, List *taskList) +DecideTransactionPropertiesForTaskList(RowModifyLevel modLevel, List *taskList, bool + exludeFromTransaction) { TransactionProperties xactProperties = { .errorOnAnyFailure = false, @@ -928,6 +988,12 @@ DecideTransactionPropertiesForTaskList(RowModifyLevel modLevel, List *taskList) return xactProperties; } + if (exludeFromTransaction) + { + xactProperties.useRemoteTransactionBlocks = TRANSACTION_BLOCKS_DISALLOWED; + return xactProperties; + } + if (MultiShardCommitProtocol == COMMIT_PROTOCOL_BARE) { /* @@ -1141,7 +1207,8 @@ DistributedExecutionRequiresRollback(List *taskList) if (ReadOnlyTask(task->taskType)) { - return SelectOpensTransactionBlock && IsTransactionBlock(); + return SelectOpensTransactionBlock && + IsTransactionBlock(); } if (IsMultiStatementTransaction()) @@ -1252,17 +1319,26 @@ TaskListRequires2PC(List *taskList) bool ReadOnlyTask(TaskType taskType) { - if (taskType == SELECT_TASK) + switch (taskType) { - /* - * TODO: We currently do not execute modifying CTEs via SELECT_TASK. - * When we implement it, we should either not use the mentioned task types for - * modifying CTEs detect them here. - */ - return true; - } + case SELECT_TASK: + case MAP_OUTPUT_FETCH_TASK: + case MAP_TASK: + case MERGE_TASK: + { + /* + * TODO: We currently do not execute modifying CTEs via ROUTER_TASK/SQL_TASK. + * When we implement it, we should either not use the mentioned task types for + * modifying CTEs detect them here. + */ + return true; + } - return false; + default: + { + return false; + } + } } @@ -1437,7 +1513,9 @@ CleanUpSessions(DistributedExecution *execution) * We cannot get MULTI_CONNECTION_LOST via the ConnectionStateMachine, * but we might get it via the connection API and find us here before * changing any states in the ConnectionStateMachine. 
+ * */ + CloseConnection(connection); } else if (connection->connectionState == MULTI_CONNECTION_CONNECTED) @@ -1581,14 +1659,20 @@ AssignTasksToConnections(DistributedExecution *execution) List *placementAccessList = PlacementAccessListForTask(task, taskPlacement); - /* - * Determine whether the task has to be assigned to a particular connection - * due to a preceding access to the placement in the same transaction. - */ - MultiConnection *connection = GetConnectionIfPlacementAccessedInXact( - connectionFlags, - placementAccessList, - NULL); + MultiConnection *connection = NULL; + if (execution->transactionProperties->useRemoteTransactionBlocks != + TRANSACTION_BLOCKS_DISALLOWED) + { + /* + * Determine whether the task has to be assigned to a particular connection + * due to a preceding access to the placement in the same transaction. + */ + connection = GetConnectionIfPlacementAccessedInXact( + connectionFlags, + placementAccessList, + NULL); + } + if (connection != NULL) { /* @@ -1723,14 +1807,14 @@ ExecutionOrderForTask(RowModifyLevel modLevel, Task *task) case DDL_TASK: case VACUUM_ANALYZE_TASK: - { - return EXECUTION_ORDER_PARALLEL; - } - case MAP_TASK: case MERGE_TASK: case MAP_OUTPUT_FETCH_TASK: case MERGE_FETCH_TASK: + { + return EXECUTION_ORDER_PARALLEL; + } + default: { ereport(ERROR, (errmsg("unsupported task type %d in adaptive executor", @@ -1802,6 +1886,7 @@ FindOrCreateWorkerSession(WorkerPool *workerPool, MultiConnection *connection) } } + session = (WorkerSession *) palloc0(sizeof(WorkerSession)); session->sessionId = sessionId++; session->connection = connection; @@ -2051,6 +2136,12 @@ RunDistributedExecution(DistributedExecution *execution) */ UnclaimAllSessionConnections(execution->sessionList); + /* do repartition cleanup if this is a repartition query*/ + if (list_length(execution->jobIdList) > 0) + { + DoRepartitionCleanup(execution->jobIdList); + } + if (execution->waitEventSet != NULL) { FreeWaitEventSet(execution->waitEventSet); @@ 
-2174,6 +2265,12 @@ ManageWorkerPool(WorkerPool *workerPool) /* experimental: just to see the perf benefits of caching connections */ int connectionFlags = 0; + if (execution->transactionProperties->useRemoteTransactionBlocks == + TRANSACTION_BLOCKS_DISALLOWED) + { + connectionFlags |= OUTSIDE_TRANSACTION; + } + /* open a new connection to the worker */ MultiConnection *connection = StartNodeUserDatabaseConnection(connectionFlags, workerPool->nodeName, @@ -2651,12 +2748,14 @@ static bool TransactionModifiedDistributedTable(DistributedExecution *execution) { /* - * We need to explicitly check for a coordinated transaction due to + * We need to explicitly check for TRANSACTION_BLOCKS_REQUIRED due to * citus.function_opens_transaction_block flag. When set to false, we * should not be pretending that we're in a coordinated transaction even * if XACT_MODIFICATION_DATA is set. That's why we implemented this workaround. */ - return InCoordinatedTransaction() && XactModificationLevel == XACT_MODIFICATION_DATA; + return execution->transactionProperties->useRemoteTransactionBlocks == + TRANSACTION_BLOCKS_REQUIRED && + XactModificationLevel == XACT_MODIFICATION_DATA; } @@ -3030,11 +3129,16 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution, char *queryString = task->queryString; int querySent = 0; - /* - * Make sure that subsequent commands on the same placement - * use the same connection. - */ - AssignPlacementListToConnection(placementAccessList, connection); + if (execution->transactionProperties->useRemoteTransactionBlocks != + TRANSACTION_BLOCKS_DISALLOWED) + { + /* + * Make sure that subsequent commands on the same placement + * use the same connection. 
+ */ + AssignPlacementListToConnection(placementAccessList, connection); + } + /* one more command is sent over the session */ session->commandsSent++; diff --git a/src/backend/distributed/executor/directed_acylic_graph_execution.c b/src/backend/distributed/executor/directed_acylic_graph_execution.c new file mode 100644 index 000000000..d8f31d47e --- /dev/null +++ b/src/backend/distributed/executor/directed_acylic_graph_execution.c @@ -0,0 +1,225 @@ +/*------------------------------------------------------------------------- + * + * directed_acylic_graph_execution_logic.c + * + * Logic to run tasks in their dependency order. + * + * Copyright (c) Citus Data, Inc. + */ + +#include "postgres.h" +#include "access/hash.h" +#include "distributed/hash_helpers.h" + +#include "distributed/directed_acylic_graph_execution.h" +#include "distributed/multi_physical_planner.h" +#include "distributed/adaptive_executor.h" +#include "distributed/worker_manager.h" +#include "distributed/multi_server_executor.h" +#include "distributed/worker_transaction.h" +#include "distributed/worker_manager.h" +#include "distributed/transaction_management.h" +#include "distributed/multi_task_tracker_executor.h" +#include "distributed/metadata_cache.h" +#include "distributed/transmit.h" + +typedef struct TaskHashKey +{ + uint64 jobId; + uint32 taskId; +}TaskHashKey; + +typedef struct TaskHashEntry +{ + TaskHashKey key; + Task *task; +}TaskHashEntry; + +static HASHCTL InitHashTableInfo(void); +static HTAB * CreateTaskHashTable(void); +static bool IsAllDependencyCompleted(Task *task, HTAB *completedTasks); +static void AddCompletedTasks(List *curCompletedTasks, HTAB *completedTasks); +static List * FindExecutableTasks(List *allTasks, HTAB *completedTasks); +static int TaskHashCompare(const void *key1, const void *key2, Size keysize); +static uint32 TaskHash(const void *key, Size keysize); +static bool IsTaskAlreadyCompleted(Task *task, HTAB *completedTasks); + +/* + * ExecuteTasksInDependencyOrder 
executes the given tasks except the excluded + * tasks in their dependency order. To do so, it iterates all + * the tasks and finds the ones that can be executed at that time, it tries to + * execute all of them in parallel. The parallelism is bound by MaxAdaptiveExecutorPoolSize. + */ +void +ExecuteTasksInDependencyOrder(List *allTasks, List *excludedTasks) +{ + HTAB *completedTasks = CreateTaskHashTable(); + + /* We only execute depended jobs' tasks, therefore to not execute */ + /* top level tasks, we add them to the completedTasks. */ + AddCompletedTasks(excludedTasks, completedTasks); + while (true) + { + List *curTasks = FindExecutableTasks(allTasks, completedTasks); + if (list_length(curTasks) == 0) + { + break; + } + ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, curTasks, + MaxAdaptiveExecutorPoolSize); + + AddCompletedTasks(curTasks, completedTasks); + curTasks = NIL; + } +} + + +/* + * FindExecutableTasks finds the tasks that can be executed currently, + * which means that all of their dependencies are executed. If a task + * is already executed, it is not added to the result. + */ +static List * +FindExecutableTasks(List *allTasks, HTAB *completedTasks) +{ + List *curTasks = NIL; + ListCell *taskCell = NULL; + + + foreach(taskCell, allTasks) + { + Task *task = (Task *) lfirst(taskCell); + + if (IsAllDependencyCompleted(task, completedTasks) && + !IsTaskAlreadyCompleted(task, completedTasks)) + { + curTasks = lappend(curTasks, task); + } + } + + return curTasks; +} + + +/* + * AddCompletedTasks adds the givens tasks to completedTasks HTAB. 
+ */ +static void +AddCompletedTasks(List *curCompletedTasks, HTAB *completedTasks) +{ + ListCell *taskCell = NULL; + + bool found = false; + + foreach(taskCell, curCompletedTasks) + { + Task *task = (Task *) lfirst(taskCell); + TaskHashKey taskKey = { task->jobId, task->taskId }; + hash_search(completedTasks, &taskKey, HASH_ENTER, &found); + } +} + + +/* + * CreateTaskHashTable creates a HTAB keyed by (jobId, taskId), see InitHashTableInfo. + */ +static HTAB * +CreateTaskHashTable(void) +{ + uint32 hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT | HASH_COMPARE); + HASHCTL info = InitHashTableInfo(); + return hash_create("citus task completed list (jobId, taskId)", + 64, &info, hashFlags); +} + + +/* + * IsTaskAlreadyCompleted returns true if the given task is in the completedTasks + * HTAB. It is a pure lookup (HASH_FIND): it must not insert the task as a side effect. + */ +static bool +IsTaskAlreadyCompleted(Task *task, HTAB *completedTasks) +{ + bool found = false; + + TaskHashKey taskKey = { task->jobId, task->taskId }; + hash_search(completedTasks, &taskKey, HASH_FIND, &found); + return found; +} + + +/* + * IsAllDependencyCompleted returns true if every task in the given task's + * dependentTaskList is in the completedTasks HTAB. + */ +static bool +IsAllDependencyCompleted(Task *targetTask, HTAB *completedTasks) +{ + ListCell *taskCell = NULL; + bool found = false; + + + foreach(taskCell, targetTask->dependentTaskList) + { + Task *task = (Task *) lfirst(taskCell); + TaskHashKey taskKey = { task->jobId, task->taskId }; + + hash_search(completedTasks, &taskKey, HASH_FIND, &found); + if (!found) + { + return false; + } + } + return true; +} + + +/* + * InitHashTableInfo returns hash table info, the hash table is + * configured to be created in the CurrentMemoryContext so that + * it will be cleaned when this memory context gets freed/reset.
+ */ +static HASHCTL +InitHashTableInfo() +{ + HASHCTL info; + + memset(&info, 0, sizeof(info)); + info.keysize = sizeof(TaskHashKey); + info.entrysize = sizeof(TaskHashEntry); + info.hash = TaskHash; + info.match = TaskHashCompare; + info.hcxt = CurrentMemoryContext; + + return info; +} + + +static uint32 +TaskHash(const void *key, Size keysize) +{ + TaskHashKey *taskKey = (TaskHashKey *) key; + uint32 hash = 0; + + hash = hash_combine(hash, hash_any((unsigned char *) &taskKey->jobId, + sizeof(int64))); + hash = hash_combine(hash, hash_uint32(taskKey->taskId)); + + return hash; +} + + +static int +TaskHashCompare(const void *key1, const void *key2, Size keysize) +{ + TaskHashKey *taskKey1 = (TaskHashKey *) key1; + TaskHashKey *taskKey2 = (TaskHashKey *) key2; + if (taskKey1->jobId != taskKey2->jobId || taskKey1->taskId != taskKey2->taskId) + { + return 1; + } + else + { + return 0; + } +} diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index bcf4d99e1..98b683ce0 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -20,6 +20,7 @@ #include "distributed/multi_executor.h" #include "distributed/multi_partitioning_utils.h" #include "distributed/multi_physical_planner.h" +#include "distributed/adaptive_executor.h" #include "distributed/multi_router_planner.h" #include "distributed/distributed_planner.h" #include "distributed/recursive_planning.h" diff --git a/src/backend/distributed/executor/multi_server_executor.c b/src/backend/distributed/executor/multi_server_executor.c index 92f70f7ed..188d1668b 100644 --- a/src/backend/distributed/executor/multi_server_executor.c +++ b/src/backend/distributed/executor/multi_server_executor.c @@ -24,6 +24,7 @@ #include "distributed/multi_physical_planner.h" #include "distributed/multi_resowner.h" #include "distributed/multi_server_executor.h" +#include 
"distributed/master_protocol.h" #include "distributed/subplan_execution.h" #include "distributed/worker_protocol.h" #include "distributed/log_utils.h" @@ -35,6 +36,8 @@ bool BinaryMasterCopyFormat = false; /* copy data from workers in binary format bool EnableRepartitionJoins = false; +static bool HasReplicatedDistributedTable(List *relationOids); + /* * JobExecutorType selects the executor type for the given distributedPlan using the task * executor type config value. The function then checks if the given distributedPlan needs @@ -89,6 +92,7 @@ JobExecutorType(DistributedPlan *distributedPlan) Assert(distributedPlan->modLevel == ROW_MODIFY_READONLY); + if (executorType == MULTI_EXECUTOR_ADAPTIVE) { /* if we have repartition jobs with adaptive executor and repartition @@ -104,12 +108,11 @@ JobExecutorType(DistributedPlan *distributedPlan) errhint("Set citus.enable_repartition_joins to on " "to enable repartitioning"))); } - - ereport(DEBUG1, (errmsg( - "cannot use adaptive executor with repartition jobs"), - errhint("Since you enabled citus.enable_repartition_joins " - "Citus chose to use task-tracker."))); - return MULTI_EXECUTOR_TASK_TRACKER; + if (HasReplicatedDistributedTable(distributedPlan->relationIdList)) + { + return MULTI_EXECUTOR_TASK_TRACKER; + } + return MULTI_EXECUTOR_ADAPTIVE; } } else @@ -131,6 +134,35 @@ JobExecutorType(DistributedPlan *distributedPlan) } +/* + * HasReplicatedDistributedTable returns true if there is any + * table in the given list that is: + * - not a reference table + * - has replication factor > 1 + */ +static bool +HasReplicatedDistributedTable(List *relationOids) +{ + ListCell *oidCell = NULL; + + foreach(oidCell, relationOids) + { + Oid oid = lfirst_oid(oidCell); + char partitionMethod = PartitionMethod(oid); + if (partitionMethod == DISTRIBUTE_BY_NONE) + { + continue; + } + uint32 tableReplicationFactor = TableShardReplicationFactor(oid); + if (tableReplicationFactor > 1) + { + return true; + } + } + return false; +} + + /* 
* MaxMasterConnectionCount returns the number of connections a master can open. * A master cannot create more than a certain number of file descriptors (FDs). diff --git a/src/backend/distributed/executor/multi_task_tracker_executor.c b/src/backend/distributed/executor/multi_task_tracker_executor.c index 7abc4246f..06d9a403d 100644 --- a/src/backend/distributed/executor/multi_task_tracker_executor.c +++ b/src/backend/distributed/executor/multi_task_tracker_executor.c @@ -41,6 +41,7 @@ #include "distributed/subplan_execution.h" #include "distributed/worker_protocol.h" #include "distributed/version_compat.h" +#include "distributed/multi_task_tracker_executor.h" #include "storage/fd.h" #include "utils/builtins.h" #include "utils/hsearch.h" @@ -72,7 +73,7 @@ typedef struct TaskMapEntry /* Local functions forward declarations to init tasks and trackers */ -static List * TaskAndExecutionList(List *jobTaskList); + static HTAB * TaskHashCreate(uint32 taskHashSize); static Task * TaskHashEnter(HTAB *taskHash, Task *task); static Task * TaskHashLookup(HTAB *trackerHash, TaskType taskType, uint64 jobId, @@ -454,7 +455,7 @@ MultiTaskTrackerExecute(Job *job) * struct, associates the task execution with the task, and adds the task and its * execution to a list. The function then returns the list. */ -static List * +List * TaskAndExecutionList(List *jobTaskList) { List *taskAndExecutionList = NIL; diff --git a/src/backend/distributed/executor/repartition_join_execution.c b/src/backend/distributed/executor/repartition_join_execution.c new file mode 100644 index 000000000..2415f598f --- /dev/null +++ b/src/backend/distributed/executor/repartition_join_execution.c @@ -0,0 +1,171 @@ +/*------------------------------------------------------------------------- + * + * repartition_join_execution.c + * + * This file contains repartition specific logic. + * ExecuteDependentTasks takes a list of top level tasks. 
Its logic is as follows: + * - It generates all the tasks by descending in the tasks tree. Note that each task + * has a dependentTaskList. + * - It generates FetchTask queryStrings with the MapTask queries. It uses the first replicate to + * fetch data when replication factor is > 1. Note that if a task fails in any replica adaptive executor + * gives an error, so if we come to a fetchTask we know for sure that its dependedMapTask is executed in all + * replicas. + * - It creates schemas in each worker in a single transaction to store intermediate results. + * - It iterates all tasks and finds the ones whose dependencies are already executed, and executes them with + * adaptive executor logic. + * + * + * Repartition queries do not begin a transaction even if we are in + * a transaction block. As we dont begin a transaction, they wont see the + * DDLs that happened earlier in the transaction because we dont have that + * transaction id with repartition queries. Therefore we error in this case. + * + * Copyright (c) Citus Data, Inc. 
+ */ + +#include "postgres.h" +#include "access/hash.h" +#include "distributed/hash_helpers.h" + +#include "distributed/directed_acylic_graph_execution.h" +#include "distributed/multi_physical_planner.h" +#include "distributed/adaptive_executor.h" +#include "distributed/worker_manager.h" +#include "distributed/multi_server_executor.h" +#include "distributed/repartition_join_execution.h" +#include "distributed/worker_transaction.h" +#include "distributed/worker_manager.h" +#include "distributed/transaction_management.h" +#include "distributed/multi_task_tracker_executor.h" +#include "distributed/worker_transaction.h" +#include "distributed/metadata_cache.h" +#include "distributed/listutils.h" +#include "distributed/transmit.h" + + +static List * CreateTemporarySchemasForMergeTasks(Job *topLevelJob); +static List * ExtractJobsInJobTree(Job *job); +static void TraverseJobTree(Job *curJob, List **jobs); +static char * GenerateCreateSchemasCommand(List *jobIds); +static char * GenerateJobCommands(List *jobIds, char *templateCommand); +static char * GenerateDeleteJobsCommand(List *jobIds); + + +/* + * ExecuteDependentTasks executes all tasks except the top level tasks + * in order from the task tree. At a time, it can execute different tasks from + * different jobs. + */ +List * +ExecuteDependentTasks(List *topLevelTasks, Job *topLevelJob) +{ + EnsureNoModificationsHaveBeenDone(); + + List *allTasks = TaskAndExecutionList(topLevelTasks); + + List *jobIds = CreateTemporarySchemasForMergeTasks(topLevelJob); + + ExecuteTasksInDependencyOrder(allTasks, topLevelTasks); + + return jobIds; +} + + +/* + * CreateTemporarySchemasForMergeTasks creates the necessary schemas that will be used + * later in each worker. Single transaction is used to create the schemas. 
+ */ +static List * +CreateTemporarySchemasForMergeTasks(Job *topLevelJob) +{ + List *jobIds = ExtractJobsInJobTree(topLevelJob); + char *createSchemasCommand = GenerateCreateSchemasCommand(jobIds); + SendCommandToAllWorkers(createSchemasCommand, CitusExtensionOwnerName()); + return jobIds; +} + + +/* + * ExtractJobsInJobTree returns the job ids (as uint64 pointers) of all jobs in + * the job tree where the given job is root. + */ +static List * +ExtractJobsInJobTree(Job *job) +{ + List *jobIds = NIL; + TraverseJobTree(job, &jobIds); + return jobIds; +} + + +/* + * TraverseJobTree walks the job tree depth-first and appends a palloc'd uint64 + * copy of each job id to *jobIds; a uint64 does not fit a 32-bit pointer. + */ +static void +TraverseJobTree(Job *curJob, List **jobIds) +{ + ListCell *jobCell = NULL; + uint64 *jobIdPointer = palloc(sizeof(uint64)); + *jobIdPointer = curJob->jobId; + *jobIds = lappend(*jobIds, jobIdPointer); + + foreach(jobCell, curJob->dependentJobList) + { + Job *childJob = (Job *) lfirst(jobCell); + TraverseJobTree(childJob, jobIds); + } +} + + +/* + * GenerateCreateSchemasCommand returns concatenated create schema commands. + */ +static char * +GenerateCreateSchemasCommand(List *jobIds) +{ + return GenerateJobCommands(jobIds, WORKER_CREATE_SCHEMA_QUERY); +} + + +/* + * GenerateJobCommands formats templateCommand (a format string taking one + * uint64 job id) once per entry of jobIds and returns the concatenation, e.g. + * create_schema(jobId1); create_schema(jobId2); ... sent in one round trip. + */ +static char * +GenerateJobCommands(List *jobIds, char *templateCommand) +{ + StringInfo jobCommands = makeStringInfo(); + ListCell *jobIdCell = NULL; + + foreach(jobIdCell, jobIds) + { + uint64 jobId = *(uint64 *) lfirst(jobIdCell); + appendStringInfo(jobCommands, templateCommand, jobId); + } + return jobCommands->data; +} + + +/* + * DoRepartitionCleanup removes the temporary job directories and schemas that are + * used for repartition queries for the given job ids.
+ */ +void +DoRepartitionCleanup(List *jobIds) +{ + SendOptionalCommandListToAllWorkers(list_make1(GenerateDeleteJobsCommand( + jobIds)), + CitusExtensionOwnerName()); +} + + +/* + * GenerateDeleteJobsCommand returns concatanated remove job dir commands. + */ +static char * +GenerateDeleteJobsCommand(List *jobIds) +{ + return GenerateJobCommands(jobIds, WORKER_REPARTITION_CLEANUP_QUERY); +} diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index 4c5394c35..3f02d9e74 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -28,6 +28,7 @@ #include "distributed/citus_ruleutils.h" #include "distributed/colocation_utils.h" #include "distributed/commands.h" +#include "distributed/adaptive_executor.h" #include "distributed/connection_management.h" #include "distributed/distributed_planner.h" #include "distributed/listutils.h" diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 014c164a5..2f6af1bd1 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -4499,11 +4499,28 @@ MergeTaskList(MapMergeJob *mapMergeJob, List *mapTaskList, uint32 taskIdIndex) { Task *mapTask = (Task *) lfirst(mapTaskCell); - /* we need node names for the query, and we'll resolve them later */ - char *undefinedQueryString = NULL; + /* find the node name/port for map task's execution */ + List *mapTaskPlacementList = mapTask->taskPlacementList; + + ShardPlacement *mapTaskPlacement = linitial(mapTaskPlacementList); + char *mapTaskNodeName = mapTaskPlacement->nodeName; + uint32 mapTaskNodePort = mapTaskPlacement->nodePort; + + /* + * If replication factor is 1, then we know that we will use the first and + * the only placement. 
If task tracker is used, then it will regenerate the + * query string because if there are multiple placements then it does not + * know in which placement the parent map task was successful. + */ + StringInfo mapFetchQueryString = makeStringInfo(); + appendStringInfo(mapFetchQueryString, MAP_OUTPUT_FETCH_COMMAND, + mapTask->jobId, mapTask->taskId, partitionId, + mergeTaskId, /* fetch results to merge task */ + mapTaskNodeName, mapTaskNodePort); + Task *mapOutputFetchTask = CreateBasicTask(jobId, taskIdIndex, MAP_OUTPUT_FETCH_TASK, - undefinedQueryString); + mapFetchQueryString->data); mapOutputFetchTask->partitionId = partitionId; mapOutputFetchTask->upstreamTaskId = mergeTaskId; mapOutputFetchTask->dependentTaskList = list_make1(mapTask); diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 2235bf5f2..1919e59e8 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -62,6 +62,7 @@ #include "distributed/worker_manager.h" #include "distributed/worker_protocol.h" #include "distributed/worker_shard_visibility.h" +#include "distributed/adaptive_executor.h" #include "port/atomics.h" #include "postmaster/postmaster.h" #include "optimizer/planner.h" diff --git a/src/backend/distributed/sql/citus--9.0-2--9.1-1.sql b/src/backend/distributed/sql/citus--9.0-2--9.1-1.sql index df7a4418f..e326d3b93 100644 --- a/src/backend/distributed/sql/citus--9.0-2--9.1-1.sql +++ b/src/backend/distributed/sql/citus--9.0-2--9.1-1.sql @@ -4,6 +4,8 @@ COMMENT ON COLUMN pg_catalog.pg_dist_node.shouldhaveshards IS #include "udfs/master_set_node_property/9.1-1.sql" #include "udfs/master_drain_node/9.1-1.sql" +#include "udfs/worker_create_schema/9.1-1.sql" +#include "udfs/worker_repartition_cleanup/9.1-1.sql" #include "udfs/rebalance_table_shards/9.1-1.sql" #include "udfs/get_rebalance_table_shards_plan/9.1-1.sql" #include "udfs/master_add_node/9.1-1.sql" diff --git 
a/src/backend/distributed/sql/udfs/worker_create_schema/9.1-1.sql b/src/backend/distributed/sql/udfs/worker_create_schema/9.1-1.sql new file mode 100644 index 000000000..c8d311823 --- /dev/null +++ b/src/backend/distributed/sql/udfs/worker_create_schema/9.1-1.sql @@ -0,0 +1,8 @@ +CREATE FUNCTION pg_catalog.worker_create_schema(bigint) + RETURNS void + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$worker_create_schema$$; +COMMENT ON FUNCTION pg_catalog.worker_create_schema(bigint) + IS 'create schema in remote node'; + +REVOKE ALL ON FUNCTION pg_catalog.worker_create_schema(bigint) FROM PUBLIC; diff --git a/src/backend/distributed/sql/udfs/worker_create_schema/latest.sql b/src/backend/distributed/sql/udfs/worker_create_schema/latest.sql new file mode 100644 index 000000000..c8d311823 --- /dev/null +++ b/src/backend/distributed/sql/udfs/worker_create_schema/latest.sql @@ -0,0 +1,8 @@ +CREATE FUNCTION pg_catalog.worker_create_schema(bigint) + RETURNS void + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$worker_create_schema$$; +COMMENT ON FUNCTION pg_catalog.worker_create_schema(bigint) + IS 'create schema in remote node'; + +REVOKE ALL ON FUNCTION pg_catalog.worker_create_schema(bigint) FROM PUBLIC; diff --git a/src/backend/distributed/sql/udfs/worker_repartition_cleanup/9.1-1.sql b/src/backend/distributed/sql/udfs/worker_repartition_cleanup/9.1-1.sql new file mode 100644 index 000000000..b4e21990e --- /dev/null +++ b/src/backend/distributed/sql/udfs/worker_repartition_cleanup/9.1-1.sql @@ -0,0 +1,8 @@ +CREATE FUNCTION pg_catalog.worker_repartition_cleanup(bigint) + RETURNS void + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$worker_repartition_cleanup$$; +COMMENT ON FUNCTION pg_catalog.worker_repartition_cleanup(bigint) + IS 'remove job in remote node'; + +REVOKE ALL ON FUNCTION pg_catalog.worker_repartition_cleanup(bigint) FROM PUBLIC; diff --git a/src/backend/distributed/sql/udfs/worker_repartition_cleanup/latest.sql 
b/src/backend/distributed/sql/udfs/worker_repartition_cleanup/latest.sql new file mode 100644 index 000000000..b4e21990e --- /dev/null +++ b/src/backend/distributed/sql/udfs/worker_repartition_cleanup/latest.sql @@ -0,0 +1,8 @@ +CREATE FUNCTION pg_catalog.worker_repartition_cleanup(bigint) + RETURNS void + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$worker_repartition_cleanup$$; +COMMENT ON FUNCTION pg_catalog.worker_repartition_cleanup(bigint) + IS 'remove job in remote node'; + +REVOKE ALL ON FUNCTION pg_catalog.worker_repartition_cleanup(bigint) FROM PUBLIC; diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 5c369e19d..0d898236a 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -414,7 +414,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) case XACT_EVENT_PARALLEL_PRE_COMMIT: case XACT_EVENT_PRE_PREPARE: { - if (CurrentCoordinatedTransactionState > COORD_TRANS_NONE) + if (InCoordinatedTransaction()) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot use 2PC in transactions involving " diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index fb081d046..b9db46014 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -23,6 +23,7 @@ #include "distributed/listutils.h" #include "distributed/metadata_cache.h" #include "distributed/resource_lock.h" +#include "distributed/metadata_sync.h" #include "distributed/remote_commands.h" #include "distributed/pg_dist_node.h" #include "distributed/pg_dist_transaction.h" @@ -42,6 +43,8 @@ static void SendCommandToWorkersParamsInternal(TargetWorkerSet targetWorkerSet, Oid *parameterTypes, const char *const *parameterValues); static void 
ErrorIfAnyMetadataNodeOutOfSync(List *metadataNodeList); +static void SendCommandListToAllWorkersInternal(List *commandList, bool failOnError, + char *superuser); /* @@ -117,6 +120,72 @@ SendCommandToWorkersWithMetadata(const char *command) } +/* + * SendCommandToAllWorkers sends the given command to all active primary + * workers, connecting as the user named by the superuser argument. It wraps + * the command in a one-element list and calls SendCommandListToAllWorkers. + */ +void +SendCommandToAllWorkers(char *command, char *superuser) +{ + SendCommandListToAllWorkers(list_make1(command), superuser); +} + + +/* + * SendCommandListToAllWorkers sends the given command list to all active + * primary workers as the user named by the superuser argument. It calls the + * internal helper with failOnError set to true. + */ +void +SendCommandListToAllWorkers(List *commandList, char *superuser) +{ + SendCommandListToAllWorkersInternal(commandList, true, superuser); +} + + +/* + * SendCommandListToAllWorkersInternal sends the given command list to every + * node returned by ActivePrimaryWorkerNodeList, one node at a time, as the + * user named by the superuser argument. If failOnError is true the list goes + * through SendCommandListToWorkerInSingleTransaction; if it is false the list + * goes through SendOptionalCommandListToWorkerInTransaction, so that sending + * continues to the other workers even if it fails in one of them. + * + * NOTE(review): each worker gets its own call, so the transaction is + * presumably per worker and not one spanning all workers; confirm in the + * per-worker helpers. + */ +static void +SendCommandListToAllWorkersInternal(List *commandList, bool failOnError, char *superuser) +{ + ListCell *workerNodeCell = NULL; + List *workerNodeList = ActivePrimaryWorkerNodeList(NoLock); + + foreach(workerNodeCell, workerNodeList) + { + WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); + if (failOnError) + { + SendCommandListToWorkerInSingleTransaction(workerNode->workerName, + workerNode->workerPort, + superuser, + commandList); + } + else + { + SendOptionalCommandListToWorkerInTransaction(workerNode->workerName, + workerNode->workerPort, + superuser, + commandList); + } + } +} + + +/* + * SendOptionalCommandListToAllWorkers sends the given command list to all + * active primary workers as the user named by the superuser argument, with + * failOnError set to false. If there is an error during the command, it is + * ignored, so this function does not return any error.
+ */ +void +SendOptionalCommandListToAllWorkers(List *commandList, char *superuser) +{ + SendCommandListToAllWorkersInternal(commandList, false, superuser); +} + + /* * TargetWorkerSetNodeList returns a list of WorkerNode's that satisfies the * TargetWorkerSet. diff --git a/src/backend/distributed/worker/task_tracker.c b/src/backend/distributed/worker/task_tracker.c index b8099f302..1105e3187 100644 --- a/src/backend/distributed/worker/task_tracker.c +++ b/src/backend/distributed/worker/task_tracker.c @@ -75,7 +75,6 @@ static void TrackerSigHupHandler(SIGNAL_ARGS); static void TrackerShutdownHandler(SIGNAL_ARGS); /* Local functions forward declarations */ -static void TrackerCleanupJobDirectories(void); static void TrackerCleanupJobSchemas(void); static void TrackerCleanupConnections(HTAB *WorkerTasksHash); static void TrackerRegisterShutDown(HTAB *WorkerTasksHash); @@ -338,7 +337,7 @@ WorkerTasksHashFind(uint64 jobId, uint32 taskId) * associated files cannot be cleaned up safely. We therefore perform this * cleanup when the process restarts. */ -static void +void TrackerCleanupJobDirectories(void) { /* use the default tablespace in {datadir}/base */ diff --git a/src/backend/distributed/worker/task_tracker_protocol.c b/src/backend/distributed/worker/task_tracker_protocol.c index 1d445ccc3..7be58e1dd 100644 --- a/src/backend/distributed/worker/task_tracker_protocol.c +++ b/src/backend/distributed/worker/task_tracker_protocol.c @@ -42,7 +42,6 @@ /* Local functions forward declarations */ static bool TaskTrackerRunning(void); -static void CreateJobSchema(StringInfo schemaName); static void CreateTask(uint64 jobId, uint32 taskId, char *taskCallString); static void UpdateTask(WorkerTask *workerTask, char *taskCallString); static void CleanupTask(WorkerTask *workerTask); @@ -308,7 +307,7 @@ TaskTrackerRunning(void) * Further note that the created schema does not become visible to other * processes until the transaction commits.
*/ -static void +void CreateJobSchema(StringInfo schemaName) { const char *queryString = NULL; diff --git a/src/backend/distributed/worker/worker_merge_protocol.c b/src/backend/distributed/worker/worker_merge_protocol.c index 44291fbaa..43a4dc042 100644 --- a/src/backend/distributed/worker/worker_merge_protocol.c +++ b/src/backend/distributed/worker/worker_merge_protocol.c @@ -31,6 +31,8 @@ #include "distributed/metadata_cache.h" #include "distributed/worker_protocol.h" #include "distributed/version_compat.h" +#include "distributed/task_tracker_protocol.h" +#include "distributed/task_tracker.h" #include "executor/spi.h" #include "nodes/makefuncs.h" #include "parser/parse_type.h" @@ -39,6 +41,8 @@ #include "utils/builtins.h" #include "utils/snapmgr.h" #include "utils/syscache.h" +#include "commands/schemacmds.h" +#include "distributed/resource_lock.h" /* Local functions forward declarations */ @@ -53,6 +57,50 @@ static void CopyTaskFilesFromDirectory(StringInfo schemaName, StringInfo relatio PG_FUNCTION_INFO_V1(worker_merge_files_into_table); PG_FUNCTION_INFO_V1(worker_merge_files_and_run_query); PG_FUNCTION_INFO_V1(worker_cleanup_job_schema_cache); +PG_FUNCTION_INFO_V1(worker_create_schema); +PG_FUNCTION_INFO_V1(worker_repartition_cleanup); + + +/* + * worker_create_schema creates the job schema for the given job id on the + * local node, unless JobSchemaExists reports that a schema with that name is + * already there. The Citus version check runs before the schema is looked up + * or created. + */ +Datum +worker_create_schema(PG_FUNCTION_ARGS) +{ + uint64 jobId = PG_GETARG_INT64(0); + + StringInfo jobSchemaName = JobSchemaName(jobId); + CheckCitusVersion(ERROR); + + bool schemaExists = JobSchemaExists(jobSchemaName); + if (!schemaExists) + { + CreateJobSchema(jobSchemaName); + } + + PG_RETURN_VOID(); +} + + +/* + * worker_repartition_cleanup removes the job directory and the job schema + * that belong to the given job id. The schema is looked up with + * missing_ok = false, so the lookup raises an error when the schema does not + * exist, and EnsureSchemaOwner is called on it before anything is removed.
+ */ +Datum +worker_repartition_cleanup(PG_FUNCTION_ARGS) +{ + uint64 jobId = PG_GETARG_INT64(0); + StringInfo jobDirectoryName = JobDirectoryName(jobId); + StringInfo jobSchemaName = JobSchemaName(jobId); + + CheckCitusVersion(ERROR); + + Oid schemaId = get_namespace_oid(jobSchemaName->data, false); + + EnsureSchemaOwner(schemaId); + CitusRemoveDirectory(jobDirectoryName); + RemoveJobSchema(jobSchemaName); + PG_RETURN_VOID(); +} /* @@ -135,7 +183,6 @@ worker_merge_files_into_table(PG_FUNCTION_ARGS) userId); SetUserIdAndSecContext(savedUserId, savedSecurityContext); - PG_RETURN_VOID(); } diff --git a/src/include/distributed/adaptive_executor.h b/src/include/distributed/adaptive_executor.h new file mode 100644 index 000000000..c9d871839 --- /dev/null +++ b/src/include/distributed/adaptive_executor.h @@ -0,0 +1,20 @@ +#ifndef ADAPTIVE_EXECUTOR_H +#define ADAPTIVE_EXECUTOR_H + +#include "distributed/multi_physical_planner.h" + +/* GUC, determining whether Citus opens 1 connection per task */ +extern bool ForceMaxQueryParallelization; +extern int MaxAdaptiveExecutorPoolSize; + +/* GUC, number of ms to wait between opening connections to the same worker */ +extern int ExecutorSlowStartInterval; + +extern uint64 ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int + targetPoolSize); +extern uint64 ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList, + int + targetPoolSize); + + +#endif /* ADAPTIVE_EXECUTOR_H */ diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index 716d4617f..17ff36d12 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -49,8 +49,10 @@ enum MultiConnectionMode /* open a connection per (co-located set of) placement(s) */ CONNECTION_PER_PLACEMENT = 1 << 3, + OUTSIDE_TRANSACTION = 1 << 4, + /* connection has not been used to access data */ - REQUIRE_SIDECHANNEL = 1 << 4 + REQUIRE_SIDECHANNEL = 1 << 5
}; /* @@ -93,6 +95,9 @@ typedef struct MultiConnection /* underlying libpq connection */ struct pg_conn *pgConn; + /* connection id */ + uint64 connectionId; + /* state of the connection */ MultiConnectionState connectionState; diff --git a/src/include/distributed/directed_acylic_graph_execution.h b/src/include/distributed/directed_acylic_graph_execution.h new file mode 100644 index 000000000..be6987564 --- /dev/null +++ b/src/include/distributed/directed_acylic_graph_execution.h @@ -0,0 +1,20 @@ +/*------------------------------------------------------------------------- + * + * directed_acylic_graph_execution.h + * Execution logic for directed acyclic graph tasks. + * + * Copyright (c) Citus Data, Inc. + *------------------------------------------------------------------------- + */ + +#ifndef DIRECTED_ACYLIC_GRAPH_EXECUTION_H +#define DIRECTED_ACYLIC_GRAPH_EXECUTION_H + +#include "postgres.h" + +#include "nodes/pg_list.h" + +extern void ExecuteTasksInDependencyOrder(List *allTasks, List *excludedTasks); + + +#endif /* DIRECTED_ACYLIC_GRAPH_EXECUTION_H */ diff --git a/src/include/distributed/multi_server_executor.h b/src/include/distributed/multi_server_executor.h index cc8b913a9..cf4fe30cb 100644 --- a/src/include/distributed/multi_server_executor.h +++ b/src/include/distributed/multi_server_executor.h @@ -36,6 +36,12 @@ #define JOB_CLEANUP_QUERY "SELECT task_tracker_cleanup_job("UINT64_FORMAT ")" #define JOB_CLEANUP_TASK_ID INT_MAX +/* Adaptive executor repartitioning related defines */ +#define WORKER_CREATE_SCHEMA_QUERY "SELECT worker_create_schema (" UINT64_FORMAT ");" +#define WORKER_REPARTITION_CLEANUP_QUERY "SELECT worker_repartition_cleanup (" \ + UINT64_FORMAT \ + ");" + /* Enumeration to track one task's execution status */ typedef enum diff --git a/src/include/distributed/multi_task_tracker_executor.h b/src/include/distributed/multi_task_tracker_executor.h new file mode 100644 index 000000000..4f4d412ed --- /dev/null +++
b/src/include/distributed/multi_task_tracker_executor.h @@ -0,0 +1,8 @@ + + +#ifndef MULTI_TASK_TRACKER_EXECUTOR_H +#define MULTI_TASK_TRACKER_EXECUTOR_H + +extern List * TaskAndExecutionList(List *jobTaskList); + +#endif /* MULTI_TASK_TRACKER_EXECUTOR_H */ diff --git a/src/include/distributed/repartition_join_execution.h b/src/include/distributed/repartition_join_execution.h new file mode 100644 index 000000000..784d0d9cf --- /dev/null +++ b/src/include/distributed/repartition_join_execution.h @@ -0,0 +1,19 @@ +/*------------------------------------------------------------------------- + * + * repartition_join_execution.h + * Execution logic for repartition queries. + * + * Copyright (c) Citus Data, Inc. + *------------------------------------------------------------------------- + */ + +#ifndef REPARTITION_JOIN_EXECUTION_H +#define REPARTITION_JOIN_EXECUTION_H + +#include "nodes/pg_list.h" + +extern List * ExecuteDependentTasks(List *taskList, Job *topLevelJob); +extern void DoRepartitionCleanup(List *jobIds); + + +#endif /* REPARTITION_JOIN_EXECUTION_H */ diff --git a/src/include/distributed/task_tracker.h b/src/include/distributed/task_tracker.h index 9e1208b47..3c94606af 100644 --- a/src/include/distributed/task_tracker.h +++ b/src/include/distributed/task_tracker.h @@ -109,6 +109,8 @@ typedef struct WorkerTasksSharedStateData } WorkerTasksSharedStateData; +extern void TrackerCleanupJobDirectories(void); + /* Config variables managed via guc.c */ extern int TaskTrackerDelay; extern int MaxTrackedTasksPerNode; diff --git a/src/include/distributed/task_tracker_protocol.h b/src/include/distributed/task_tracker_protocol.h index 69fcb349b..38028cc00 100644 --- a/src/include/distributed/task_tracker_protocol.h +++ b/src/include/distributed/task_tracker_protocol.h @@ -17,6 +17,7 @@ #include "fmgr.h" +extern void CreateJobSchema(StringInfo schemaName); /* Function declarations for distributed task management */ extern Datum task_tracker_assign_task(PG_FUNCTION_ARGS); 
diff --git a/src/include/distributed/worker_protocol.h b/src/include/distributed/worker_protocol.h index e05a9a330..420f3ac3b 100644 --- a/src/include/distributed/worker_protocol.h +++ b/src/include/distributed/worker_protocol.h @@ -148,6 +148,7 @@ extern Datum worker_apply_shard_ddl_command(PG_FUNCTION_ARGS); extern Datum worker_range_partition_table(PG_FUNCTION_ARGS); extern Datum worker_hash_partition_table(PG_FUNCTION_ARGS); extern Datum worker_merge_files_into_table(PG_FUNCTION_ARGS); +extern Datum worker_create_schema(PG_FUNCTION_ARGS); extern Datum worker_merge_files_and_run_query(PG_FUNCTION_ARGS); extern Datum worker_cleanup_job_schema_cache(PG_FUNCTION_ARGS); diff --git a/src/include/distributed/worker_transaction.h b/src/include/distributed/worker_transaction.h index 66c522c2f..00cff4758 100644 --- a/src/include/distributed/worker_transaction.h +++ b/src/include/distributed/worker_transaction.h @@ -42,6 +42,9 @@ extern void SendBareCommandListToMetadataWorkers(List *commandList); extern int SendBareOptionalCommandListToAllWorkersAsUser(List *commandList, const char *user); extern void EnsureNoModificationsHaveBeenDone(void); +extern void SendCommandListToAllWorkers(List *commandList, char *superuser); +extern void SendOptionalCommandListToAllWorkers(List *commandList, char *superuser); +extern void SendCommandToAllWorkers(char *command, char *superuser); extern void SendCommandListToWorkerInSingleTransaction(const char *nodeName, int32 nodePort, const char *nodeUser, diff --git a/src/test/regress/expected/adaptive_executor_repartition.out b/src/test/regress/expected/adaptive_executor_repartition.out new file mode 100644 index 000000000..ab40dd4c0 --- /dev/null +++ b/src/test/regress/expected/adaptive_executor_repartition.out @@ -0,0 +1,124 @@ +CREATE SCHEMA adaptive_executor; +SET search_path TO adaptive_executor; +SET citus.task_executor_type to 'adaptive'; +SET citus.shard_replication_factor to 1; +SET citus.enable_repartition_joins TO true; +CREATE 
TABLE ab(a int, b int); +SELECT create_distributed_table('ab', 'a'); + create_distributed_table +-------------------------- + +(1 row) + +INSERT INTO ab SELECT *,* FROM generate_series(1,10); +SELECT COUNT(*) FROM ab k, ab l +WHERE k.a = l.b; + count +------- + 10 +(1 row) + +SELECT COUNT(*) FROM ab k, ab l, ab m, ab t +WHERE k.a = l.b AND k.a = m.b AND t.b = l.a; + count +------- + 10 +(1 row) + +SELECT count(*) FROM (SELECT k.a FROM ab k, ab l WHERE k.a = l.b) first, (SELECT * FROM ab) second WHERE first.a = second.b; + count +------- + 10 +(1 row) + +BEGIN; +SELECT count(*) FROM (SELECT k.a FROM ab k, ab l WHERE k.a = l.b) first, (SELECT * FROM ab) second WHERE first.a = second.b; + count +------- + 10 +(1 row) + +SELECT count(*) FROM (SELECT k.a FROM ab k, ab l WHERE k.a = l.b) first, (SELECT * FROM ab) second WHERE first.a = second.b; + count +------- + 10 +(1 row) + +SELECT count(*) FROM (SELECT k.a FROM ab k, ab l WHERE k.a = l.b) first, (SELECT * FROM ab) second WHERE first.a = second.b; + count +------- + 10 +(1 row) + +ROLLBACK; +BEGIN; +INSERT INTO ab values(1, 2); +-- DDL happened before repartition query in a transaction block, so this should error. 
+SELECT count(*) FROM (SELECT k.a FROM ab k, ab l WHERE k.a = l.b) first, (SELECT * FROM ab) second WHERE first.a = second.b; +ERROR: cannot open new connections after the first modification command within a transaction +ROLLBACK; +SET citus.enable_single_hash_repartition_joins TO ON; +CREATE TABLE single_hash_repartition_first (id int, sum int, avg float); +CREATE TABLE single_hash_repartition_second (id int, sum int, avg float); +CREATE TABLE ref_table (id int, sum int, avg float); +SELECT create_distributed_table('single_hash_repartition_first', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT create_distributed_table('single_hash_repartition_second', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT create_reference_table('ref_table'); + create_reference_table +------------------------ + +(1 row) + +-- single hash repartition after bcast joins +EXPLAIN SELECT + count(*) +FROM + ref_table r1, single_hash_repartition_second t1, single_hash_repartition_first t2 +WHERE + r1.id = t1.id AND t2.sum = t1.id; + QUERY PLAN +---------------------------------------------------------------------- + Aggregate (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) + Task Count: 4 + Tasks Shown: None, not supported for re-partition queries + -> MapMergeJob + Map Task Count: 4 + Merge Task Count: 4 +(7 rows) + +-- a more complicated join order, first colocated join, later single hash repartition join +EXPLAIN SELECT + count(*) +FROM + single_hash_repartition_first t1, single_hash_repartition_first t2, single_hash_repartition_second t3 +WHERE + t1.id = t2.id AND t1.sum = t3.id; + QUERY PLAN +---------------------------------------------------------------------- + Aggregate (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0) + Task Count: 4 + Tasks Shown: None, not supported for re-partition queries + -> MapMergeJob + Map 
Task Count: 4 + Merge Task Count: 4 +(7 rows) + +SET citus.enable_single_hash_repartition_joins TO OFF; +DROP SCHEMA adaptive_executor CASCADE; +NOTICE: drop cascades to 4 other objects +DETAIL: drop cascades to table ab +drop cascades to table single_hash_repartition_first +drop cascades to table single_hash_repartition_second +drop cascades to table ref_table diff --git a/src/test/regress/expected/limit_intermediate_size.out b/src/test/regress/expected/limit_intermediate_size.out index 8f77b73f4..e1471536e 100644 --- a/src/test/regress/expected/limit_intermediate_size.out +++ b/src/test/regress/expected/limit_intermediate_size.out @@ -1,4 +1,5 @@ SET citus.enable_repartition_joins to ON; +SET citus.task_executor_type to 'task-tracker'; SET citus.max_intermediate_result_size TO 2; -- should fail because the copy size is ~4kB for each cte WITH cte AS @@ -295,3 +296,4 @@ SELECT * FROM cte ORDER BY 1,2,3,4,5 LIMIT 10; 1 | Wed Nov 22 22:51:43.132261 2017 | 4 | 1 | 2 (10 rows) +SET citus.task_executor_type to 'adaptive'; diff --git a/src/test/regress/expected/multi_null_minmax_value_pruning.out b/src/test/regress/expected/multi_null_minmax_value_pruning.out index af1585c9e..bd5e606d3 100644 --- a/src/test/regress/expected/multi_null_minmax_value_pruning.out +++ b/src/test/regress/expected/multi_null_minmax_value_pruning.out @@ -121,12 +121,10 @@ DEBUG: pruning merge fetch taskId 10 DETAIL: Creating dependency on merge taskId 12 DEBUG: pruning merge fetch taskId 11 DETAIL: Creating dependency on merge taskId 12 -DEBUG: cannot use adaptive executor with repartition jobs -HINT: Since you enabled citus.enable_repartition_joins Citus chose to use task-tracker. 
QUERY PLAN ------------------------------------------------------------------- Aggregate - -> Custom Scan (Citus Task-Tracker) + -> Custom Scan (Citus Adaptive) Task Count: 4 Tasks Shown: None, not supported for re-partition queries -> MapMergeJob @@ -188,12 +186,10 @@ DEBUG: pruning merge fetch taskId 10 DETAIL: Creating dependency on merge taskId 12 DEBUG: pruning merge fetch taskId 11 DETAIL: Creating dependency on merge taskId 12 -DEBUG: cannot use adaptive executor with repartition jobs -HINT: Since you enabled citus.enable_repartition_joins Citus chose to use task-tracker. QUERY PLAN ------------------------------------------------------------------- Aggregate - -> Custom Scan (Citus Task-Tracker) + -> Custom Scan (Citus Adaptive) Task Count: 4 Tasks Shown: None, not supported for re-partition queries -> MapMergeJob @@ -257,12 +253,10 @@ DEBUG: pruning merge fetch taskId 10 DETAIL: Creating dependency on merge taskId 12 DEBUG: pruning merge fetch taskId 11 DETAIL: Creating dependency on merge taskId 12 -DEBUG: cannot use adaptive executor with repartition jobs -HINT: Since you enabled citus.enable_repartition_joins Citus chose to use task-tracker. QUERY PLAN ------------------------------------------------------------------- Aggregate - -> Custom Scan (Citus Task-Tracker) + -> Custom Scan (Citus Adaptive) Task Count: 4 Tasks Shown: None, not supported for re-partition queries -> MapMergeJob diff --git a/src/test/regress/expected/multi_subquery_complex_queries.out b/src/test/regress/expected/multi_subquery_complex_queries.out index c67e6e330..2245d5e6b 100644 --- a/src/test/regress/expected/multi_subquery_complex_queries.out +++ b/src/test/regress/expected/multi_subquery_complex_queries.out @@ -423,8 +423,6 @@ GROUP BY types ORDER BY types; -DEBUG: cannot use adaptive executor with repartition jobs -HINT: Since you enabled citus.enable_repartition_joins Citus chose to use task-tracker. 
DEBUG: generating subplan 16_1 for subquery SELECT max(events."time") AS max, 0 AS event, events.user_id FROM public.events_table events, public.users_table users WHERE ((events.user_id OPERATOR(pg_catalog.=) users.value_2) AND (events.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[1, 2]))) GROUP BY events.user_id DEBUG: generating subplan 16_2 for subquery SELECT "time", event, user_id FROM (SELECT events."time", 0 AS event, events.user_id FROM public.events_table events WHERE (events.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[1, 2]))) events_subquery_1 DEBUG: generating subplan 16_3 for subquery SELECT "time", event, user_id FROM (SELECT events."time", 2 AS event, events.user_id FROM public.events_table events WHERE (events.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[3, 4]))) events_subquery_3 diff --git a/src/test/regress/expected/non_colocated_join_order.out b/src/test/regress/expected/non_colocated_join_order.out index e84d47a8c..ba28a26c1 100644 --- a/src/test/regress/expected/non_colocated_join_order.out +++ b/src/test/regress/expected/non_colocated_join_order.out @@ -42,8 +42,6 @@ SET citus.shard_replication_factor to 1; SET citus.enable_repartition_joins to ON; SELECT count(*) FROM test_table_1, test_table_2 WHERE test_table_1.id = test_table_2.id; LOG: join order: [ "test_table_1" ][ single range partition join "test_table_2" ] -DEBUG: cannot use adaptive executor with repartition jobs -HINT: Since you enabled citus.enable_repartition_joins Citus chose to use task-tracker. 
count ------- 9 diff --git a/src/test/regress/expected/non_colocated_leaf_subquery_joins.out b/src/test/regress/expected/non_colocated_leaf_subquery_joins.out index b7c632d8e..298093b9d 100644 --- a/src/test/regress/expected/non_colocated_leaf_subquery_joins.out +++ b/src/test/regress/expected/non_colocated_leaf_subquery_joins.out @@ -36,7 +36,6 @@ FROM (SELECT users_table.user_id FROM users_table, events_table WHERE users_table.user_id = events_table.user_id AND event_type IN (5,6,7,8)) as bar WHERE foo.user_id = bar.user_id;$$); -DEBUG: cannot use adaptive executor with repartition jobs DEBUG: generating subplan 1_1 for subquery SELECT users_table.user_id, random() AS random FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[1, 2, 3, 4]))) DEBUG: Plan 1 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.user_id, intermediate_result.random FROM read_intermediate_result('1_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, random double precision)) foo, (SELECT users_table.user_id FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[5, 6, 7, 8])))) bar WHERE (foo.user_id OPERATOR(pg_catalog.=) bar.user_id) valid @@ -52,9 +51,7 @@ FROM (SELECT users_table.user_id FROM users_table, events_table WHERE users_table.user_id = events_table.value_2 AND event_type IN (5,6,7,8)) as bar WHERE foo.user_id = bar.user_id;$$); -DEBUG: cannot use adaptive executor with repartition jobs DEBUG: generating subplan 3_1 for subquery SELECT users_table.user_id, random() AS random FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[1, 
2, 3, 4]))) -DEBUG: cannot use adaptive executor with repartition jobs DEBUG: generating subplan 3_2 for subquery SELECT users_table.user_id FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[5, 6, 7, 8]))) DEBUG: Plan 3 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.user_id, intermediate_result.random FROM read_intermediate_result('3_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, random double precision)) foo, (SELECT intermediate_result.user_id FROM read_intermediate_result('3_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) bar WHERE (foo.user_id OPERATOR(pg_catalog.=) bar.user_id) valid @@ -76,7 +73,6 @@ WHERE users_table, events_table WHERE users_table.user_id = events_table.value_2 AND event_type IN (5,6));$$); -DEBUG: cannot use adaptive executor with repartition jobs DEBUG: generating subplan 6_1 for subquery SELECT users_table.user_id FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[5, 6]))) DEBUG: Plan 6 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM public.users_table WHERE (value_1 OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.user_id FROM read_intermediate_result('6_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer))) valid @@ -94,7 +90,6 @@ SELECT count(*) FROM q1, (SELECT WHERE users_table.user_id = events_table.value_2 AND event_type IN (1,2,3,4)) as bar WHERE bar.user_id = q1.user_id ;$$); DEBUG: generating subplan 8_1 for CTE q1: SELECT user_id FROM public.users_table -DEBUG: cannot use adaptive executor with repartition jobs DEBUG: generating subplan 8_2 for subquery SELECT users_table.user_id, random() AS random FROM 
public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[1, 2, 3, 4]))) DEBUG: Plan 8 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.user_id FROM read_intermediate_result('8_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) q1, (SELECT intermediate_result.user_id, intermediate_result.random FROM read_intermediate_result('8_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, random double precision)) bar WHERE (bar.user_id OPERATOR(pg_catalog.=) q1.user_id) valid @@ -106,7 +101,6 @@ DEBUG: Plan 8 query after replacing subqueries and CTEs: SELECT count(*) AS cou SELECT true AS valid FROM explain_json($$ (SELECT users_table.user_id FROM users_table, events_table WHERE users_table.user_id = events_table.value_2 AND event_type IN (1,2,3,4)) UNION (SELECT users_table.user_id FROM users_table, events_table WHERE users_table.user_id = events_table.user_id AND event_type IN (5,6,7,8));$$); -DEBUG: cannot use adaptive executor with repartition jobs DEBUG: generating subplan 11_1 for subquery SELECT users_table.user_id FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[1, 2, 3, 4]))) DEBUG: generating subplan 11_2 for subquery SELECT users_table.user_id FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[5, 6, 7, 8]))) DEBUG: Plan 11 query after replacing subqueries and CTEs: SELECT intermediate_result.user_id FROM read_intermediate_result('11_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer) UNION SELECT intermediate_result.user_id FROM read_intermediate_result('11_2'::text, 
'binary'::citus_copy_format) intermediate_result(user_id integer) @@ -143,11 +137,9 @@ FROM ( ) q ORDER BY 2 DESC, 1; $$); -DEBUG: cannot use adaptive executor with repartition jobs DEBUG: generating subplan 14_1 for subquery SELECT users_table.user_id FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[1, 2, 3, 4]))) DEBUG: push down of limit count: 5 DEBUG: generating subplan 14_2 for subquery SELECT user_id FROM public.users_table WHERE ((value_2 OPERATOR(pg_catalog.>=) 5) AND (EXISTS (SELECT intermediate_result.user_id FROM read_intermediate_result('14_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)))) LIMIT 5 -DEBUG: cannot use adaptive executor with repartition jobs DEBUG: generating subplan 14_3 for subquery SELECT users_table.user_id FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[5, 6, 7, 8]))) DEBUG: generating subplan 14_4 for subquery SELECT DISTINCT ON ((e.event_type)::text) (e.event_type)::text AS event, e."time", e.user_id FROM public.users_table u, public.events_table e, (SELECT intermediate_result.user_id FROM read_intermediate_result('14_3'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) bar WHERE ((u.user_id OPERATOR(pg_catalog.=) e.user_id) AND (u.user_id OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.user_id FROM read_intermediate_result('14_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)))) DEBUG: generating subplan 14_5 for subquery SELECT t.event, array_agg(t.user_id) AS events_table FROM (SELECT intermediate_result.event, intermediate_result."time", intermediate_result.user_id FROM read_intermediate_result('14_4'::text, 'binary'::citus_copy_format) intermediate_result(event text, "time" 
timestamp without time zone, user_id integer)) t, public.users_table WHERE (users_table.value_1 OPERATOR(pg_catalog.=) (t.event)::integer) GROUP BY t.event diff --git a/src/test/regress/expected/non_colocated_subquery_joins.out b/src/test/regress/expected/non_colocated_subquery_joins.out index fd7250140..486d8bb17 100644 --- a/src/test/regress/expected/non_colocated_subquery_joins.out +++ b/src/test/regress/expected/non_colocated_subquery_joins.out @@ -162,7 +162,6 @@ SELECT true AS valid FROM explain_json_2($$ foo.user_id = bar.user_id AND foo.event_type IN (SELECT event_type FROM events_table WHERE user_id < 4); $$); -DEBUG: cannot use adaptive executor with repartition jobs DEBUG: generating subplan 16_1 for subquery SELECT users_table.user_id, events_table.event_type FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[1, 2, 3, 4]))) DEBUG: generating subplan 16_2 for subquery SELECT event_type FROM public.events_table WHERE (user_id OPERATOR(pg_catalog.<) 4) DEBUG: Plan 16 query after replacing subqueries and CTEs: SELECT foo.user_id FROM (SELECT intermediate_result.user_id, intermediate_result.event_type FROM read_intermediate_result('16_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, event_type integer)) foo, (SELECT users_table.user_id FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[5, 6, 7, 8])))) bar WHERE ((foo.user_id OPERATOR(pg_catalog.=) bar.user_id) AND (foo.event_type OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.event_type FROM read_intermediate_result('16_2'::text, 'binary'::citus_copy_format) intermediate_result(event_type integer)))) @@ -187,9 +186,7 @@ SELECT true AS valid FROM explain_json_2($$ ) as foo_top, events_table WHERE events_table.user_id = 
foo_top.user_id; $$); -DEBUG: cannot use adaptive executor with repartition jobs DEBUG: generating subplan 19_1 for subquery SELECT users_table.user_id, events_table.event_type FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[1, 2, 3, 4]))) -DEBUG: cannot use adaptive executor with repartition jobs DEBUG: generating subplan 19_2 for subquery SELECT users_table.user_id FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.event_type) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[5, 6, 7, 8]))) DEBUG: generating subplan 19_3 for subquery SELECT event_type FROM public.events_table WHERE (user_id OPERATOR(pg_catalog.=) 5) DEBUG: generating subplan 19_4 for subquery SELECT foo.user_id, random() AS random FROM (SELECT intermediate_result.user_id, intermediate_result.event_type FROM read_intermediate_result('19_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, event_type integer)) foo, (SELECT intermediate_result.user_id FROM read_intermediate_result('19_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) bar WHERE ((foo.user_id OPERATOR(pg_catalog.=) bar.user_id) AND (foo.event_type OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.event_type FROM read_intermediate_result('19_3'::text, 'binary'::citus_copy_format) intermediate_result(event_type integer)))) @@ -254,7 +251,6 @@ SELECT true AS valid FROM explain_json_2($$ foo1.user_id = foo5.user_id ) as foo_top; $$); -DEBUG: cannot use adaptive executor with repartition jobs DEBUG: generating subplan 26_1 for subquery SELECT users_table.user_id, users_table.value_1 FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[17, 18, 19, 20]))) DEBUG: 
Plan 26 query after replacing subqueries and CTEs: SELECT user_id, random FROM (SELECT foo1.user_id, random() AS random FROM (SELECT users_table.user_id, users_table.value_1 FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[1, 2, 3, 4])))) foo1, (SELECT users_table.user_id, users_table.value_1 FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[5, 6, 7, 8])))) foo2, (SELECT users_table.user_id, users_table.value_1 FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[9, 10, 11, 12])))) foo3, (SELECT users_table.user_id, users_table.value_1 FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[13, 14, 15, 16])))) foo4, (SELECT intermediate_result.user_id, intermediate_result.value_1 FROM read_intermediate_result('26_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, value_1 integer)) foo5 WHERE ((foo1.user_id OPERATOR(pg_catalog.=) foo4.user_id) AND (foo1.user_id OPERATOR(pg_catalog.=) foo2.user_id) AND (foo1.user_id OPERATOR(pg_catalog.=) foo3.user_id) AND (foo1.user_id OPERATOR(pg_catalog.=) foo4.user_id) AND (foo1.user_id OPERATOR(pg_catalog.=) foo5.user_id))) foo_top valid @@ -284,7 +280,6 @@ SELECT true AS valid FROM explain_json_2($$ foo1.user_id = foo5.value_1 ) as foo_top; $$); -DEBUG: cannot use adaptive executor with repartition jobs DEBUG: generating subplan 28_1 for subquery SELECT users_table.user_id, users_table.value_1 FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) 
events_table.value_2) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[5, 6, 7, 8]))) DEBUG: generating subplan 28_2 for subquery SELECT users_table.user_id, users_table.value_1 FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[17, 18, 19, 20]))) DEBUG: Plan 28 query after replacing subqueries and CTEs: SELECT user_id, random FROM (SELECT foo1.user_id, random() AS random FROM (SELECT users_table.user_id, users_table.value_1 FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[1, 2, 3, 4])))) foo1, (SELECT intermediate_result.user_id, intermediate_result.value_1 FROM read_intermediate_result('28_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, value_1 integer)) foo2, (SELECT users_table.user_id, users_table.value_1 FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[9, 10, 11, 12])))) foo3, (SELECT users_table.user_id, users_table.value_1 FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[13, 14, 15, 16])))) foo4, (SELECT intermediate_result.user_id, intermediate_result.value_1 FROM read_intermediate_result('28_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, value_1 integer)) foo5 WHERE ((foo1.user_id OPERATOR(pg_catalog.=) foo4.user_id) AND (foo1.user_id OPERATOR(pg_catalog.=) foo2.user_id) AND (foo1.user_id OPERATOR(pg_catalog.=) foo3.user_id) AND (foo1.user_id OPERATOR(pg_catalog.=) foo4.user_id) AND (foo1.user_id OPERATOR(pg_catalog.=) foo5.value_1))) foo_top @@ -316,7 +311,6 @@ SELECT 
true AS valid FROM explain_json_2($$ foo2.user_id = foo5.value_1 ) as foo_top; $$); -DEBUG: cannot use adaptive executor with repartition jobs DEBUG: generating subplan 31_1 for subquery SELECT users_table.user_id, users_table.value_1 FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[5, 6, 7, 8]))) DEBUG: generating subplan 31_2 for subquery SELECT users_table.user_id, users_table.value_1 FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[17, 18, 19, 20]))) DEBUG: Plan 31 query after replacing subqueries and CTEs: SELECT user_id, random FROM (SELECT foo1.user_id, random() AS random FROM (SELECT users_table.user_id, users_table.value_1 FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[1, 2, 3, 4])))) foo1, (SELECT intermediate_result.user_id, intermediate_result.value_1 FROM read_intermediate_result('31_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, value_1 integer)) foo2, (SELECT users_table.user_id, users_table.value_1 FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[9, 10, 11, 12])))) foo3, (SELECT users_table.user_id, users_table.value_1 FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[13, 14, 15, 16])))) foo4, (SELECT intermediate_result.user_id, intermediate_result.value_1 FROM read_intermediate_result('31_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, value_1 
integer)) foo5 WHERE ((foo1.user_id OPERATOR(pg_catalog.=) foo4.user_id) AND (foo1.user_id OPERATOR(pg_catalog.=) foo2.user_id) AND (foo1.user_id OPERATOR(pg_catalog.=) foo3.user_id) AND (foo1.user_id OPERATOR(pg_catalog.=) foo4.user_id) AND (foo2.user_id OPERATOR(pg_catalog.=) foo5.value_1))) foo_top @@ -350,9 +344,7 @@ SELECT true AS valid FROM explain_json_2($$ foo.user_id = bar.user_id) as bar_top ON (foo_top.user_id = bar_top.user_id); $$); -DEBUG: cannot use adaptive executor with repartition jobs DEBUG: generating subplan 34_1 for subquery SELECT users_table.user_id FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[1, 2, 3, 4]))) -DEBUG: cannot use adaptive executor with repartition jobs DEBUG: generating subplan 34_2 for subquery SELECT users_table.user_id FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[1, 2, 3, 4]))) DEBUG: Plan 34 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT foo.user_id FROM (SELECT intermediate_result.user_id FROM read_intermediate_result('34_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) foo, (SELECT users_table.user_id FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[5, 6, 7, 8])))) bar WHERE (foo.user_id OPERATOR(pg_catalog.=) bar.user_id)) foo_top JOIN (SELECT foo.user_id FROM (SELECT intermediate_result.user_id FROM read_intermediate_result('34_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) foo, (SELECT users_table.user_id FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND 
(events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[5, 6, 7, 8])))) bar WHERE (foo.user_id OPERATOR(pg_catalog.=) bar.user_id)) bar_top ON ((foo_top.user_id OPERATOR(pg_catalog.=) bar_top.user_id))) valid @@ -417,7 +409,6 @@ SELECT true AS valid FROM explain_json_2($$ foo.user_id = bar.user_id) as bar_top ON (foo_top.value_2 = bar_top.user_id); $$); -DEBUG: cannot use adaptive executor with repartition jobs DEBUG: generating subplan 39_1 for subquery SELECT DISTINCT users_table.user_id FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[13, 14, 15, 16]))) DEBUG: generating subplan 39_2 for subquery SELECT foo.user_id FROM (SELECT DISTINCT users_table.user_id FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[9, 10, 11, 12])))) foo, (SELECT intermediate_result.user_id FROM read_intermediate_result('39_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) bar WHERE (foo.user_id OPERATOR(pg_catalog.=) bar.user_id) DEBUG: Plan 39 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT foo.user_id, foo.value_2 FROM (SELECT DISTINCT users_table.user_id, users_table.value_2 FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[1, 2, 3, 4])))) foo, (SELECT DISTINCT users_table.user_id FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[5, 6, 7, 8])))) bar WHERE (foo.user_id OPERATOR(pg_catalog.=) bar.user_id)) foo_top JOIN (SELECT intermediate_result.user_id FROM read_intermediate_result('39_2'::text, 
'binary'::citus_copy_format) intermediate_result(user_id integer)) bar_top ON ((foo_top.value_2 OPERATOR(pg_catalog.=) bar_top.user_id))) @@ -439,7 +430,6 @@ SELECT true AS valid FROM explain_json_2($$ WHERE foo.my_users = users_table.user_id) as mid_level_query ) as bar; $$); -DEBUG: cannot use adaptive executor with repartition jobs DEBUG: generating subplan 42_1 for subquery SELECT events_table.user_id AS my_users FROM public.events_table, public.users_table WHERE (events_table.event_type OPERATOR(pg_catalog.=) users_table.user_id) DEBUG: Plan 42 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT mid_level_query.user_id FROM (SELECT DISTINCT users_table.user_id FROM public.users_table, (SELECT intermediate_result.my_users FROM read_intermediate_result('42_1'::text, 'binary'::citus_copy_format) intermediate_result(my_users integer)) foo WHERE (foo.my_users OPERATOR(pg_catalog.=) users_table.user_id)) mid_level_query) bar valid @@ -536,7 +526,6 @@ WHERE users_table, events_table WHERE users_table.user_id = events_table.value_2 AND event_type IN (5,6));$$); -DEBUG: cannot use adaptive executor with repartition jobs DEBUG: generating subplan 50_1 for subquery SELECT users_table.user_id FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[5, 6]))) DEBUG: Plan 50 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM public.users_table WHERE (value_1 OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.user_id FROM read_intermediate_result('50_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer))) valid @@ -554,7 +543,6 @@ SELECT count(*) FROM q1, (SELECT WHERE users_table.user_id = events_table.value_2 AND event_type IN (1,2,3,4)) as bar WHERE bar.user_id = q1.user_id ;$$); DEBUG: generating subplan 52_1 for CTE q1: SELECT user_id FROM public.users_table -DEBUG: cannot 
use adaptive executor with repartition jobs DEBUG: generating subplan 52_2 for subquery SELECT users_table.user_id, random() AS random FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[1, 2, 3, 4]))) DEBUG: Plan 52 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.user_id FROM read_intermediate_result('52_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) q1, (SELECT intermediate_result.user_id, intermediate_result.random FROM read_intermediate_result('52_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, random double precision)) bar WHERE (bar.user_id OPERATOR(pg_catalog.=) q1.user_id) valid @@ -582,7 +570,6 @@ DEBUG: Plan 55 query after replacing subqueries and CTEs: SELECT count(*) AS co SELECT true AS valid FROM explain_json_2($$ (SELECT users_table.user_id FROM users_table, events_table WHERE users_table.user_id = events_table.value_2 AND event_type IN (1,2,3,4)) UNION (SELECT users_table.user_id FROM users_table, events_table WHERE users_table.user_id = events_table.user_id AND event_type IN (5,6,7,8));$$); -DEBUG: cannot use adaptive executor with repartition jobs DEBUG: generating subplan 57_1 for subquery SELECT users_table.user_id FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[1, 2, 3, 4]))) DEBUG: generating subplan 57_2 for subquery SELECT users_table.user_id FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[5, 6, 7, 8]))) DEBUG: Plan 57 query after replacing subqueries and CTEs: SELECT intermediate_result.user_id FROM read_intermediate_result('57_1'::text, 
'binary'::citus_copy_format) intermediate_result(user_id integer) UNION SELECT intermediate_result.user_id FROM read_intermediate_result('57_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer) @@ -619,11 +606,9 @@ FROM ( ) q ORDER BY 2 DESC, 1; $$); -DEBUG: cannot use adaptive executor with repartition jobs DEBUG: generating subplan 60_1 for subquery SELECT users_table.user_id FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[1, 2, 3, 4]))) DEBUG: push down of limit count: 5 DEBUG: generating subplan 60_2 for subquery SELECT user_id FROM public.users_table WHERE ((value_2 OPERATOR(pg_catalog.>=) 5) AND (EXISTS (SELECT intermediate_result.user_id FROM read_intermediate_result('60_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)))) LIMIT 5 -DEBUG: cannot use adaptive executor with repartition jobs DEBUG: generating subplan 60_3 for subquery SELECT users_table.user_id FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[5, 6, 7, 8]))) DEBUG: generating subplan 60_4 for subquery SELECT DISTINCT ON ((e.event_type)::text) (e.event_type)::text AS event, e."time", e.user_id FROM public.users_table u, public.events_table e, (SELECT intermediate_result.user_id FROM read_intermediate_result('60_3'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) bar WHERE ((u.user_id OPERATOR(pg_catalog.=) e.user_id) AND (u.user_id OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.user_id FROM read_intermediate_result('60_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)))) DEBUG: generating subplan 60_5 for subquery SELECT t.event, array_agg(t.user_id) AS events_table FROM (SELECT intermediate_result.event, 
intermediate_result."time", intermediate_result.user_id FROM read_intermediate_result('60_4'::text, 'binary'::citus_copy_format) intermediate_result(event text, "time" timestamp without time zone, user_id integer)) t, public.users_table WHERE (users_table.value_1 OPERATOR(pg_catalog.=) (t.event)::integer) GROUP BY t.event @@ -654,7 +639,6 @@ SELECT true AS valid FROM explain_json_2($$ FROM (SELECT * FROM users_table u1 JOIN users_table u2 using(value_1)) a JOIN (SELECT value_1, random() FROM users_table) as u3 USING (value_1); $$); -DEBUG: cannot use adaptive executor with repartition jobs DEBUG: generating subplan 68_1 for subquery SELECT u1.value_1, u1.user_id, u1."time", u1.value_2, u1.value_3, u1.value_4, u2.user_id, u2."time", u2.value_2, u2.value_3, u2.value_4 FROM (public.users_table u1 JOIN public.users_table u2 USING (value_1)) DEBUG: Plan 68 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.value_1, intermediate_result.user_id, intermediate_result."time", intermediate_result.value_2, intermediate_result.value_3, intermediate_result.value_4, intermediate_result.user_id_1 AS user_id, intermediate_result.time_1 AS "time", intermediate_result.value_2_1 AS value_2, intermediate_result.value_3_1 AS value_3, intermediate_result.value_4_1 AS value_4 FROM read_intermediate_result('68_1'::text, 'binary'::citus_copy_format) intermediate_result(value_1 integer, user_id integer, "time" timestamp without time zone, value_2 integer, value_3 double precision, value_4 bigint, user_id_1 integer, time_1 timestamp without time zone, value_2_1 integer, value_3_1 double precision, value_4_1 bigint)) a(value_1, user_id, "time", value_2, value_3, value_4, user_id_1, time_1, value_2_1, value_3_1, value_4_1) JOIN (SELECT users_table.value_1, random() AS random FROM public.users_table) u3 USING (value_1)) valid diff --git a/src/test/regress/expected/recursive_dml_with_different_planners_executors.out 
b/src/test/regress/expected/recursive_dml_with_different_planners_executors.out index 0cb9daddc..f1a801af4 100644 --- a/src/test/regress/expected/recursive_dml_with_different_planners_executors.out +++ b/src/test/regress/expected/recursive_dml_with_different_planners_executors.out @@ -56,8 +56,6 @@ UPDATE distributed_table SET dept = foo.some_tenants::int FROM DISTINCT second_distributed_table.tenant_id as some_tenants FROM second_distributed_table, distributed_table WHERE second_distributed_table.dept = distributed_table.dept ) as foo; -DEBUG: cannot use adaptive executor with repartition jobs -HINT: Since you enabled citus.enable_repartition_joins Citus chose to use task-tracker. DEBUG: generating subplan 8_1 for subquery SELECT DISTINCT second_distributed_table.tenant_id AS some_tenants FROM recursive_dml_with_different_planner_executors.second_distributed_table, recursive_dml_with_different_planner_executors.distributed_table WHERE (second_distributed_table.dept OPERATOR(pg_catalog.=) distributed_table.dept) DEBUG: Plan 8 query after replacing subqueries and CTEs: UPDATE recursive_dml_with_different_planner_executors.distributed_table SET dept = (foo.some_tenants)::integer FROM (SELECT intermediate_result.some_tenants FROM read_intermediate_result('8_1'::text, 'binary'::citus_copy_format) intermediate_result(some_tenants text)) foo SET citus.enable_repartition_joins to OFF; diff --git a/src/test/regress/expected/set_operation_and_local_tables.out b/src/test/regress/expected/set_operation_and_local_tables.out index f3545b906..f5f5e0884 100644 --- a/src/test/regress/expected/set_operation_and_local_tables.out +++ b/src/test/regress/expected/set_operation_and_local_tables.out @@ -326,7 +326,7 @@ DEBUG: Plan is router executable 2 (2 rows) -SET citus.enable_repartition_joins TO ON; +SET citus.task_executor_type TO 'task-tracker'; -- repartition is recursively planned before the set operation (SELECT x FROM test) INTERSECT (SELECT t1.x FROM test as t1, test as t2 
WHERE t1.x = t2.y LIMIT 2) INTERSECT (((SELECT x FROM local_test) UNION ALL (SELECT x FROM test)) INTERSECT (SELECT i FROM generate_series(0, 100) i)) ORDER BY 1 DESC; DEBUG: Local tables cannot be used in distributed queries. @@ -360,8 +360,6 @@ DEBUG: pruning merge fetch taskId 10 DETAIL: Creating dependency on merge taskId 20 DEBUG: pruning merge fetch taskId 11 DETAIL: Creating dependency on merge taskId 20 -DEBUG: cannot use adaptive executor with repartition jobs -HINT: Since you enabled citus.enable_repartition_joins Citus chose to use task-tracker. DEBUG: generating subplan 53_1 for subquery SELECT t1.x FROM recursive_set_local.test t1, recursive_set_local.test t2 WHERE (t1.x OPERATOR(pg_catalog.=) t2.y) LIMIT 2 DEBUG: generating subplan 53_2 for subquery SELECT x FROM recursive_set_local.local_test DEBUG: Router planner cannot handle multi-shard select queries @@ -377,7 +375,7 @@ DEBUG: Plan is router executable 1 (2 rows) -SET citus.enable_repartition_joins TO OFF; +SET citus.task_executor_type TO 'adaptive'; RESET client_min_messages; DROP SCHEMA recursive_set_local CASCADE; NOTICE: drop cascades to 3 other objects diff --git a/src/test/regress/expected/set_operations.out b/src/test/regress/expected/set_operations.out index d05cedb57..7f7f0015c 100644 --- a/src/test/regress/expected/set_operations.out +++ b/src/test/regress/expected/set_operations.out @@ -913,8 +913,6 @@ DEBUG: pruning merge fetch taskId 10 DETAIL: Creating dependency on merge taskId 20 DEBUG: pruning merge fetch taskId 11 DETAIL: Creating dependency on merge taskId 20 -DEBUG: cannot use adaptive executor with repartition jobs -HINT: Since you enabled citus.enable_repartition_joins Citus chose to use task-tracker. 
DEBUG: generating subplan 164_1 for subquery SELECT t1.x FROM recursive_union.test t1, recursive_union.test t2 WHERE (t1.x OPERATOR(pg_catalog.=) t2.y) LIMIT 0 DEBUG: Router planner cannot handle multi-shard select queries DEBUG: generating subplan 164_2 for subquery SELECT x FROM recursive_union.test @@ -957,8 +955,6 @@ DEBUG: pruning merge fetch taskId 10 DETAIL: Creating dependency on merge taskId 20 DEBUG: pruning merge fetch taskId 11 DETAIL: Creating dependency on merge taskId 20 -DEBUG: cannot use adaptive executor with repartition jobs -HINT: Since you enabled citus.enable_repartition_joins Citus chose to use task-tracker. DEBUG: generating subplan 167_1 for subquery SELECT t1.x FROM recursive_union.test t1, recursive_union.test t2 WHERE (t1.x OPERATOR(pg_catalog.=) t2.y) DEBUG: Router planner cannot handle multi-shard select queries DEBUG: generating subplan 167_2 for subquery SELECT x FROM recursive_union.test diff --git a/src/test/regress/expected/subquery_executors.out b/src/test/regress/expected/subquery_executors.out index 8a11643e0..43ab91eb6 100644 --- a/src/test/regress/expected/subquery_executors.out +++ b/src/test/regress/expected/subquery_executors.out @@ -72,8 +72,6 @@ FROM SELECT user_id FROM users_table ) as bar WHERE foo.value_2 = bar.user_id; -DEBUG: cannot use adaptive executor with repartition jobs -HINT: Since you enabled citus.enable_repartition_joins Citus chose to use task-tracker. 
DEBUG: generating subplan 8_1 for subquery SELECT DISTINCT users_table.value_2 FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (users_table.user_id OPERATOR(pg_catalog.<) 2)) DEBUG: Plan 8 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.value_2 FROM read_intermediate_result('8_1'::text, 'binary'::citus_copy_format) intermediate_result(value_2 integer)) foo, (SELECT users_table.user_id FROM public.users_table) bar WHERE (foo.value_2 OPERATOR(pg_catalog.=) bar.user_id) count @@ -100,8 +98,6 @@ FROM WHERE foo.value_2 = bar.user_id AND baz.value_2 = bar.user_id AND bar.user_id = baw.user_id; DEBUG: generating subplan 10_1 for subquery SELECT value_2 FROM public.users_table WHERE (user_id OPERATOR(pg_catalog.=) 15) OFFSET 0 DEBUG: generating subplan 10_2 for subquery SELECT user_id FROM public.users_table OFFSET 0 -DEBUG: cannot use adaptive executor with repartition jobs -HINT: Since you enabled citus.enable_repartition_joins Citus chose to use task-tracker. 
DEBUG: generating subplan 10_3 for subquery SELECT DISTINCT users_table.value_2 FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (users_table.user_id OPERATOR(pg_catalog.<) 2)) DEBUG: generating subplan 10_4 for subquery SELECT user_id FROM subquery_executor.users_table_local WHERE (user_id OPERATOR(pg_catalog.=) 2) DEBUG: Plan 10 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.value_2 FROM read_intermediate_result('10_1'::text, 'binary'::citus_copy_format) intermediate_result(value_2 integer)) foo, (SELECT intermediate_result.user_id FROM read_intermediate_result('10_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) bar, (SELECT intermediate_result.value_2 FROM read_intermediate_result('10_3'::text, 'binary'::citus_copy_format) intermediate_result(value_2 integer)) baz, (SELECT intermediate_result.user_id FROM read_intermediate_result('10_4'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) baw WHERE ((foo.value_2 OPERATOR(pg_catalog.=) bar.user_id) AND (baz.value_2 OPERATOR(pg_catalog.=) bar.user_id) AND (bar.user_id OPERATOR(pg_catalog.=) baw.user_id)) diff --git a/src/test/regress/expected/subquery_partitioning.out b/src/test/regress/expected/subquery_partitioning.out index 9e734137b..856646b0c 100644 --- a/src/test/regress/expected/subquery_partitioning.out +++ b/src/test/regress/expected/subquery_partitioning.out @@ -157,9 +157,7 @@ FROM ( SELECT user_id FROM users_table ) as bar -WHERE foo.value_1 = bar.user_id; -DEBUG: cannot use adaptive executor with repartition jobs -HINT: Since you enabled citus.enable_repartition_joins Citus chose to use task-tracker. 
+WHERE foo.value_1 = bar.user_id; DEBUG: generating subplan 14_1 for subquery SELECT DISTINCT p1.value_1 FROM subquery_and_partitioning.partitioning_test p1, subquery_and_partitioning.partitioning_test p2 WHERE (p1.id OPERATOR(pg_catalog.=) p2.value_1) DEBUG: Plan 14 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.value_1 FROM read_intermediate_result('14_1'::text, 'binary'::citus_copy_format) intermediate_result(value_1 integer)) foo, (SELECT users_table.user_id FROM public.users_table) bar WHERE (foo.value_1 OPERATOR(pg_catalog.=) bar.user_id) count diff --git a/src/test/regress/expected/subquery_view.out b/src/test/regress/expected/subquery_view.out index b5ccf7270..7724d8712 100644 --- a/src/test/regress/expected/subquery_view.out +++ b/src/test/regress/expected/subquery_view.out @@ -311,8 +311,6 @@ SELECT * FROM repartition_view; -DEBUG: cannot use adaptive executor with repartition jobs -HINT: Since you enabled citus.enable_repartition_joins Citus chose to use task-tracker. 
DEBUG: generating subplan 23_1 for subquery SELECT DISTINCT users_table.value_2 FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (users_table.user_id OPERATOR(pg_catalog.<) 2)) DEBUG: generating subplan 23_2 for subquery SELECT count(*) AS count FROM (SELECT intermediate_result.value_2 FROM read_intermediate_result('23_1'::text, 'binary'::citus_copy_format) intermediate_result(value_2 integer)) foo, (SELECT users_table.user_id FROM public.users_table) bar WHERE (foo.value_2 OPERATOR(pg_catalog.=) bar.user_id) DEBUG: Plan 23 query after replacing subqueries and CTEs: SELECT count FROM (SELECT intermediate_result.count FROM read_intermediate_result('23_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) repartition_view @@ -344,8 +342,6 @@ FROM all_executors_view; DEBUG: generating subplan 26_1 for subquery SELECT value_2 FROM public.users_table WHERE (user_id OPERATOR(pg_catalog.=) 15) OFFSET 0 DEBUG: generating subplan 26_2 for subquery SELECT user_id FROM public.users_table OFFSET 0 -DEBUG: cannot use adaptive executor with repartition jobs -HINT: Since you enabled citus.enable_repartition_joins Citus chose to use task-tracker. 
DEBUG: generating subplan 26_3 for subquery SELECT DISTINCT users_table.value_2 FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (users_table.user_id OPERATOR(pg_catalog.<) 2)) DEBUG: generating subplan 26_4 for subquery SELECT user_id FROM subquery_view.users_table_local WHERE (user_id OPERATOR(pg_catalog.=) 2) DEBUG: generating subplan 26_5 for subquery SELECT count(*) AS count FROM (SELECT intermediate_result.value_2 FROM read_intermediate_result('26_1'::text, 'binary'::citus_copy_format) intermediate_result(value_2 integer)) foo, (SELECT intermediate_result.user_id FROM read_intermediate_result('26_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) bar, (SELECT intermediate_result.value_2 FROM read_intermediate_result('26_3'::text, 'binary'::citus_copy_format) intermediate_result(value_2 integer)) baz, (SELECT intermediate_result.user_id FROM read_intermediate_result('26_4'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) baw WHERE ((foo.value_2 OPERATOR(pg_catalog.=) bar.user_id) AND (baz.value_2 OPERATOR(pg_catalog.=) bar.user_id) AND (bar.user_id OPERATOR(pg_catalog.=) baw.user_id)) diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 5de8b193a..8e0dbb715 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -113,6 +113,7 @@ test: multi_join_order_tpch_repartition # the output. 
# ---------- test: multi_repartition_join_planning multi_repartition_join_pruning multi_repartition_join_task_assignment +test: adaptive_executor_repartition # --------- # Tests that modify data should run sequentially @@ -124,6 +125,7 @@ test: with_prepare # --------- test: with_nested with_where with_basics with_set_operations test: with_modifying cte_prepared_modify cte_nested_modification +test: ensure_no_intermediate_data_leak test: with_executors with_join with_partitioning with_transactions with_dml diff --git a/src/test/regress/sql/adaptive_executor_repartition.sql b/src/test/regress/sql/adaptive_executor_repartition.sql new file mode 100644 index 000000000..c973014c1 --- /dev/null +++ b/src/test/regress/sql/adaptive_executor_repartition.sql @@ -0,0 +1,62 @@ +CREATE SCHEMA adaptive_executor; +SET search_path TO adaptive_executor; + +SET citus.task_executor_type to 'adaptive'; +SET citus.shard_replication_factor to 1; +SET citus.enable_repartition_joins TO true; + +CREATE TABLE ab(a int, b int); +SELECT create_distributed_table('ab', 'a'); +INSERT INTO ab SELECT *,* FROM generate_series(1,10); + +SELECT COUNT(*) FROM ab k, ab l +WHERE k.a = l.b; + +SELECT COUNT(*) FROM ab k, ab l, ab m, ab t +WHERE k.a = l.b AND k.a = m.b AND t.b = l.a; + +SELECT count(*) FROM (SELECT k.a FROM ab k, ab l WHERE k.a = l.b) first, (SELECT * FROM ab) second WHERE first.a = second.b; + +BEGIN; +SELECT count(*) FROM (SELECT k.a FROM ab k, ab l WHERE k.a = l.b) first, (SELECT * FROM ab) second WHERE first.a = second.b; +SELECT count(*) FROM (SELECT k.a FROM ab k, ab l WHERE k.a = l.b) first, (SELECT * FROM ab) second WHERE first.a = second.b; +SELECT count(*) FROM (SELECT k.a FROM ab k, ab l WHERE k.a = l.b) first, (SELECT * FROM ab) second WHERE first.a = second.b; +ROLLBACK; + +BEGIN; +INSERT INTO ab values(1, 2); +-- DDL happened before repartition query in a transaction block, so this should error. 
+SELECT count(*) FROM (SELECT k.a FROM ab k, ab l WHERE k.a = l.b) first, (SELECT * FROM ab) second WHERE first.a = second.b; +ROLLBACK; + +SET citus.enable_single_hash_repartition_joins TO ON; + +CREATE TABLE single_hash_repartition_first (id int, sum int, avg float); +CREATE TABLE single_hash_repartition_second (id int, sum int, avg float); +CREATE TABLE ref_table (id int, sum int, avg float); + + +SELECT create_distributed_table('single_hash_repartition_first', 'id'); +SELECT create_distributed_table('single_hash_repartition_second', 'id'); +SELECT create_reference_table('ref_table'); + + +-- single hash repartition after bcast joins +EXPLAIN SELECT + count(*) +FROM + ref_table r1, single_hash_repartition_second t1, single_hash_repartition_first t2 +WHERE + r1.id = t1.id AND t2.sum = t1.id; + +-- a more complicated join order, first colocated join, later single hash repartition join +EXPLAIN SELECT + count(*) +FROM + single_hash_repartition_first t1, single_hash_repartition_first t2, single_hash_repartition_second t3 +WHERE + t1.id = t2.id AND t1.sum = t3.id; + +SET citus.enable_single_hash_repartition_joins TO OFF; + +DROP SCHEMA adaptive_executor CASCADE; diff --git a/src/test/regress/sql/limit_intermediate_size.sql b/src/test/regress/sql/limit_intermediate_size.sql index ecbef25d0..34f88dbf1 100644 --- a/src/test/regress/sql/limit_intermediate_size.sql +++ b/src/test/regress/sql/limit_intermediate_size.sql @@ -1,6 +1,6 @@ SET citus.enable_repartition_joins to ON; - +SET citus.task_executor_type to 'task-tracker'; SET citus.max_intermediate_result_size TO 2; -- should fail because the copy size is ~4kB for each cte WITH cte AS @@ -229,3 +229,5 @@ WITH cte AS ( cte2.user_id = cte3.user_id AND cte2.user_id = 1 ) SELECT * FROM cte ORDER BY 1,2,3,4,5 LIMIT 10; + +SET citus.task_executor_type to 'adaptive'; diff --git a/src/test/regress/sql/set_operation_and_local_tables.sql b/src/test/regress/sql/set_operation_and_local_tables.sql index a6ae9b9f2..4c87784f8 100644 
--- a/src/test/regress/sql/set_operation_and_local_tables.sql +++ b/src/test/regress/sql/set_operation_and_local_tables.sql @@ -91,12 +91,12 @@ SELECT * FROM ((SELECT * FROM local_test) INTERSECT (SELECT * FROM test ORDER BY -- set operations and the sublink can be recursively planned SELECT * FROM ((SELECT x FROM test) UNION (SELECT x FROM (SELECT x FROM local_test) as foo WHERE x IN (SELECT x FROM test))) u ORDER BY 1; -SET citus.enable_repartition_joins TO ON; +SET citus.task_executor_type TO 'task-tracker'; -- repartition is recursively planned before the set operation (SELECT x FROM test) INTERSECT (SELECT t1.x FROM test as t1, test as t2 WHERE t1.x = t2.y LIMIT 2) INTERSECT (((SELECT x FROM local_test) UNION ALL (SELECT x FROM test)) INTERSECT (SELECT i FROM generate_series(0, 100) i)) ORDER BY 1 DESC; -SET citus.enable_repartition_joins TO OFF; +SET citus.task_executor_type TO 'adaptive'; RESET client_min_messages; DROP SCHEMA recursive_set_local CASCADE; diff --git a/src/test/regress/sql/with_executors.sql b/src/test/regress/sql/with_executors.sql index 30bda09be..4a183cd96 100644 --- a/src/test/regress/sql/with_executors.sql +++ b/src/test/regress/sql/with_executors.sql @@ -144,7 +144,6 @@ ORDER BY 1, 2, 3, 4 LIMIT 10; - -- All combined WITH cte AS ( WITH task_tracker AS (