mirror of https://github.com/citusdata/citus.git
Add adaptive executor support for repartition joins (#3169)
* WIP * wip * add basic logic to run a single job with repartioning joins with adaptive executor * fix some warnings and return in ExecuteDependedTasks if there is none * Add the logic to run depended jobs in adaptive executor The execution of depended tasks logic is changed. With the current logic: - All tasks are created from the top level task list. - At one iteration: - CurTasks whose dependencies are executed are found. - CurTasks are executed in parallel with adapter executor main logic. - The iteration is repeated until all tasks are completed. * Separate adaptive executor repartioning logic * Remove duplicate parts * cleanup directories and schemas * add basic repartion tests for adaptive executor * Use the first placement to fetch data In task tracker, when there are replicas, we try to fetch from a replica for which a map task is succeeded. TaskExecution is used for this, however TaskExecution is not used in adaptive executor. So we cannot use the same thing as task tracker. Since adaptive executor fails when a map task fails (There is no retry logic yet). We know that if we try to execute a fetch task, all of its map tasks already succeeded, so we can just use the first one to fetch from. * fix clean directories logic * do not change the search path while creating a udf * Enable repartition joins with adaptive executor with only enable_reparitition_joins guc * Add comments to adaptive_executor_repartition * dont run adaptive executor repartition test in paralle with other tests * execute cleanup only in the top level execution * do cleanup only in the top level ezecution * not begin a transaction if repartition query is used * use new connections for repartititon specific queries New connections are opened to send repartition specific queries. The opened connections will be closed at the FinishDistributedExecution. While sending repartition queries no transaction is begun so that we can see all changes. 
* error if a modification was done prior to repartition execution * not start a transaction if a repartition query and sql task, and clean temporary files and schemas at each subplan level * fix cleanup logic * update tests * add missing function comments * add test for transaction with DDL before repartition query * do not close repartition connections in adaptive executor * rollback instead of commit in repartition join test * use close connection instead of shutdown connection * remove unnecesary connection list, ensure schema owner before removing directory * rename ExecuteTaskListRepartition * put fetch query string in planner not executor as we currently support only replication factor = 1 with adaptive executor and repartition query and we know the query string in the planner phase in that case * split adaptive executor repartition to DAG execution logic and repartition logic * apply review items * apply review items * use an enum for remote transaction state and fix cleanup for repartition * add outside transaction flag to find connections that are unclaimed instead of always opening a new transaction * fix style * wip * rename removejobdir to partition cleanup * do not close connections at the end of repartition queries * do repartition cleanup in pg catch * apply review items * decide whether to use transaction or not at execution creation * rename isOutsideTransaction and add missing comment * not error in pg catch while doing cleanup * use replication factor of the creation time, not current time to decide if task tracker should be chosen * apply review items * apply review items * apply review itempull/3304/head
parent
8cea662f17
commit
7ff4ce2169
|
@ -24,6 +24,7 @@
|
|||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/multi_executor.h"
|
||||
#include "distributed/multi_physical_planner.h"
|
||||
#include "distributed/adaptive_executor.h"
|
||||
#include "distributed/remote_commands.h"
|
||||
#include "distributed/shard_pruning.h"
|
||||
#include "distributed/version_compat.h"
|
||||
|
|
|
@ -351,6 +351,17 @@ FindAvailableConnection(dlist_head *connections, uint32 flags)
|
|||
MultiConnection *connection =
|
||||
dlist_container(MultiConnection, connectionNode, iter.cur);
|
||||
|
||||
if (flags & OUTSIDE_TRANSACTION)
|
||||
{
|
||||
/* don't return connections that are used in transactions */
|
||||
if (connection->remoteTransaction.transactionState !=
|
||||
REMOTE_TRANS_NOT_STARTED)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
/* don't return claimed connections */
|
||||
if (connection->claimedExclusively)
|
||||
{
|
||||
/* connection is in use for an ongoing operation */
|
||||
|
@ -949,6 +960,7 @@ static MultiConnection *
|
|||
StartConnectionEstablishment(ConnectionHashKey *key)
|
||||
{
|
||||
bool found = false;
|
||||
static uint64 connectionId = 1;
|
||||
|
||||
/* search our cache for precomputed connection settings */
|
||||
ConnParamsHashEntry *entry = hash_search(ConnParamsHash, key, HASH_ENTER, &found);
|
||||
|
@ -979,6 +991,7 @@ StartConnectionEstablishment(ConnectionHashKey *key)
|
|||
(const char **) entry->values,
|
||||
false);
|
||||
connection->connectionStart = GetCurrentTimestamp();
|
||||
connection->connectionId = connectionId++;
|
||||
connection->purpose = CONNECTION_PURPOSE_ANY;
|
||||
|
||||
/*
|
||||
|
|
|
@ -353,8 +353,9 @@ LogRemoteCommand(MultiConnection *connection, const char *command)
|
|||
}
|
||||
|
||||
ereport(LOG, (errmsg("issuing %s", ApplyLogRedaction(command)),
|
||||
errdetail("on server %s@%s:%d", connection->user, connection->hostname,
|
||||
connection->port)));
|
||||
errdetail("on server %s@%s:%d connectionId: %ld", connection->user,
|
||||
connection->hostname,
|
||||
connection->port, connection->connectionId)));
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -150,7 +150,10 @@
|
|||
#include "distributed/transaction_management.h"
|
||||
#include "distributed/worker_protocol.h"
|
||||
#include "distributed/version_compat.h"
|
||||
#include "distributed/adaptive_executor.h"
|
||||
#include "distributed/repartition_join_execution.h"
|
||||
#include "lib/ilist.h"
|
||||
#include "commands/schemacmds.h"
|
||||
#include "storage/fd.h"
|
||||
#include "storage/latch.h"
|
||||
#include "utils/int8.h"
|
||||
|
@ -158,6 +161,7 @@
|
|||
#include "utils/memutils.h"
|
||||
#include "utils/timestamp.h"
|
||||
|
||||
|
||||
/*
|
||||
* DistributedExecution represents the execution of a distributed query
|
||||
* plan.
|
||||
|
@ -272,8 +276,15 @@ typedef struct DistributedExecution
|
|||
*/
|
||||
AttInMetadata *attributeInputMetadata;
|
||||
char **columnArray;
|
||||
|
||||
/*
|
||||
* jobIdList contains all jobs in the job tree, this is used to
|
||||
* do cleanup for repartition queries.
|
||||
*/
|
||||
List *jobIdList;
|
||||
} DistributedExecution;
|
||||
|
||||
|
||||
/*
|
||||
* WorkerPool represents a pool of sessions on the same worker.
|
||||
*
|
||||
|
@ -416,6 +427,13 @@ typedef struct WorkerSession
|
|||
|
||||
struct TaskPlacementExecution;
|
||||
|
||||
/* GUC, determining whether Citus opens 1 connection per task */
|
||||
bool ForceMaxQueryParallelization = false;
|
||||
int MaxAdaptiveExecutorPoolSize = 16;
|
||||
|
||||
/* GUC, number of ms to wait between opening connections to the same worker */
|
||||
int ExecutorSlowStartInterval = 10;
|
||||
|
||||
|
||||
/*
|
||||
* TaskExecutionState indicates whether or not a command on a shard
|
||||
|
@ -520,18 +538,10 @@ typedef struct TaskPlacementExecution
|
|||
} TaskPlacementExecution;
|
||||
|
||||
|
||||
/* GUC, determining whether Citus opens 1 connection per task */
|
||||
bool ForceMaxQueryParallelization = false;
|
||||
int MaxAdaptiveExecutorPoolSize = 16;
|
||||
|
||||
/* GUC, number of ms to wait between opening connections to the same worker */
|
||||
int ExecutorSlowStartInterval = 10;
|
||||
|
||||
|
||||
/* local functions */
|
||||
static DistributedExecution * CreateDistributedExecution(RowModifyLevel modLevel,
|
||||
List *taskList,
|
||||
bool hasReturning,
|
||||
List *taskList, bool
|
||||
hasReturning,
|
||||
ParamListInfo paramListInfo,
|
||||
TupleDesc tupleDescriptor,
|
||||
Tuplestorestate *tupleStore,
|
||||
|
@ -540,7 +550,9 @@ static DistributedExecution * CreateDistributedExecution(RowModifyLevel modLevel
|
|||
xactProperties);
|
||||
static TransactionProperties DecideTransactionPropertiesForTaskList(RowModifyLevel
|
||||
modLevel,
|
||||
List *taskList);
|
||||
List *taskList,
|
||||
bool
|
||||
exludeFromTransaction);
|
||||
static void StartDistributedExecution(DistributedExecution *execution);
|
||||
static void RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution);
|
||||
static void RunDistributedExecution(DistributedExecution *execution);
|
||||
|
@ -598,11 +610,11 @@ static bool ShouldMarkPlacementsInvalidOnFailure(DistributedExecution *execution
|
|||
static void PlacementExecutionReady(TaskPlacementExecution *placementExecution);
|
||||
static TaskExecutionState TaskExecutionStateMachine(ShardCommandExecution *
|
||||
shardCommandExecution);
|
||||
static bool HasDependentJobs(Job *mainJob);
|
||||
static void ExtractParametersForRemoteExecution(ParamListInfo paramListInfo,
|
||||
Oid **parameterTypes,
|
||||
const char ***parameterValues);
|
||||
|
||||
|
||||
/*
|
||||
* AdaptiveExecutor is called via CitusExecScan on the
|
||||
* first call of CitusExecScan. The function fills the tupleStore
|
||||
|
@ -620,6 +632,7 @@ AdaptiveExecutor(CitusScanState *scanState)
|
|||
bool randomAccess = true;
|
||||
bool interTransactions = false;
|
||||
int targetPoolSize = MaxAdaptiveExecutorPoolSize;
|
||||
List *jobIdList = NIL;
|
||||
|
||||
Job *job = distributedPlan->workerJob;
|
||||
List *taskList = job->taskList;
|
||||
|
@ -636,6 +649,12 @@ AdaptiveExecutor(CitusScanState *scanState)
|
|||
|
||||
ExecuteSubPlans(distributedPlan);
|
||||
|
||||
bool hasDependentJobs = HasDependentJobs(job);
|
||||
if (hasDependentJobs)
|
||||
{
|
||||
jobIdList = ExecuteDependentTasks(taskList, job);
|
||||
}
|
||||
|
||||
if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
|
||||
{
|
||||
/* defer decision after ExecuteSubPlans() */
|
||||
|
@ -645,8 +664,10 @@ AdaptiveExecutor(CitusScanState *scanState)
|
|||
scanState->tuplestorestate =
|
||||
tuplestore_begin_heap(randomAccess, interTransactions, work_mem);
|
||||
|
||||
TransactionProperties xactProperties =
|
||||
DecideTransactionPropertiesForTaskList(distributedPlan->modLevel, taskList);
|
||||
TransactionProperties xactProperties = DecideTransactionPropertiesForTaskList(
|
||||
distributedPlan->modLevel, taskList,
|
||||
hasDependentJobs);
|
||||
|
||||
|
||||
DistributedExecution *execution = CreateDistributedExecution(
|
||||
distributedPlan->modLevel,
|
||||
|
@ -704,6 +725,11 @@ AdaptiveExecutor(CitusScanState *scanState)
|
|||
|
||||
FinishDistributedExecution(execution);
|
||||
|
||||
if (hasDependentJobs)
|
||||
{
|
||||
DoRepartitionCleanup(jobIdList);
|
||||
}
|
||||
|
||||
if (SortReturning && distributedPlan->hasReturning)
|
||||
{
|
||||
SortTupleStore(scanState);
|
||||
|
@ -713,6 +739,17 @@ AdaptiveExecutor(CitusScanState *scanState)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* HasDependentJobs returns true if there is any dependent job
|
||||
* for the mainjob(top level) job.
|
||||
*/
|
||||
static bool
|
||||
HasDependentJobs(Job *mainJob)
|
||||
{
|
||||
return list_length(mainJob->dependentJobList) > 0;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* RunLocalExecution runs the localTaskList in the execution, fills the tuplestore
|
||||
* and sets the es_processed if necessary.
|
||||
|
@ -764,6 +801,28 @@ ExecuteUtilityTaskListWithoutResults(List *taskList)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* ExecuteTaskListRepartiton is a proxy to ExecuteTaskListExtended() with defaults
|
||||
* for some of the arguments for a repartition query.
|
||||
*/
|
||||
uint64
|
||||
ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel, List *taskList, int
|
||||
targetPoolSize)
|
||||
{
|
||||
TupleDesc tupleDescriptor = NULL;
|
||||
Tuplestorestate *tupleStore = NULL;
|
||||
bool hasReturning = false;
|
||||
|
||||
TransactionProperties xactProperties = DecideTransactionPropertiesForTaskList(
|
||||
modLevel, taskList, true);
|
||||
|
||||
|
||||
return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor,
|
||||
tupleStore, hasReturning, targetPoolSize,
|
||||
&xactProperties);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ExecuteTaskList is a proxy to ExecuteTaskListExtended() with defaults
|
||||
* for some of the arguments.
|
||||
|
@ -776,7 +835,7 @@ ExecuteTaskList(RowModifyLevel modLevel, List *taskList, int targetPoolSize)
|
|||
bool hasReturning = false;
|
||||
|
||||
TransactionProperties xactProperties =
|
||||
DecideTransactionPropertiesForTaskList(modLevel, taskList);
|
||||
DecideTransactionPropertiesForTaskList(modLevel, taskList, false);
|
||||
|
||||
return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor,
|
||||
tupleStore, hasReturning, targetPoolSize,
|
||||
|
@ -795,8 +854,8 @@ ExecuteTaskListIntoTupleStore(RowModifyLevel modLevel, List *taskList,
|
|||
{
|
||||
int targetPoolSize = MaxAdaptiveExecutorPoolSize;
|
||||
|
||||
TransactionProperties xactProperties =
|
||||
DecideTransactionPropertiesForTaskList(modLevel, taskList);
|
||||
TransactionProperties xactProperties = DecideTransactionPropertiesForTaskList(
|
||||
modLevel, taskList, false);
|
||||
|
||||
return ExecuteTaskListExtended(modLevel, taskList, tupleDescriptor,
|
||||
tupleStore, hasReturning, targetPoolSize,
|
||||
|
@ -914,7 +973,8 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, bool hasRetu
|
|||
* errorOnAnyFailure, but not the other way around) we keep them in the same place.
|
||||
*/
|
||||
static TransactionProperties
|
||||
DecideTransactionPropertiesForTaskList(RowModifyLevel modLevel, List *taskList)
|
||||
DecideTransactionPropertiesForTaskList(RowModifyLevel modLevel, List *taskList, bool
|
||||
exludeFromTransaction)
|
||||
{
|
||||
TransactionProperties xactProperties = {
|
||||
.errorOnAnyFailure = false,
|
||||
|
@ -928,6 +988,12 @@ DecideTransactionPropertiesForTaskList(RowModifyLevel modLevel, List *taskList)
|
|||
return xactProperties;
|
||||
}
|
||||
|
||||
if (exludeFromTransaction)
|
||||
{
|
||||
xactProperties.useRemoteTransactionBlocks = TRANSACTION_BLOCKS_DISALLOWED;
|
||||
return xactProperties;
|
||||
}
|
||||
|
||||
if (MultiShardCommitProtocol == COMMIT_PROTOCOL_BARE)
|
||||
{
|
||||
/*
|
||||
|
@ -1141,7 +1207,8 @@ DistributedExecutionRequiresRollback(List *taskList)
|
|||
|
||||
if (ReadOnlyTask(task->taskType))
|
||||
{
|
||||
return SelectOpensTransactionBlock && IsTransactionBlock();
|
||||
return SelectOpensTransactionBlock &&
|
||||
IsTransactionBlock();
|
||||
}
|
||||
|
||||
if (IsMultiStatementTransaction())
|
||||
|
@ -1252,17 +1319,26 @@ TaskListRequires2PC(List *taskList)
|
|||
bool
|
||||
ReadOnlyTask(TaskType taskType)
|
||||
{
|
||||
if (taskType == SELECT_TASK)
|
||||
switch (taskType)
|
||||
{
|
||||
/*
|
||||
* TODO: We currently do not execute modifying CTEs via SELECT_TASK.
|
||||
* When we implement it, we should either not use the mentioned task types for
|
||||
* modifying CTEs detect them here.
|
||||
*/
|
||||
return true;
|
||||
}
|
||||
case SELECT_TASK:
|
||||
case MAP_OUTPUT_FETCH_TASK:
|
||||
case MAP_TASK:
|
||||
case MERGE_TASK:
|
||||
{
|
||||
/*
|
||||
* TODO: We currently do not execute modifying CTEs via ROUTER_TASK/SQL_TASK.
|
||||
* When we implement it, we should either not use the mentioned task types for
|
||||
* modifying CTEs or detect them here.
|
||||
*/
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
default:
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -1437,7 +1513,9 @@ CleanUpSessions(DistributedExecution *execution)
|
|||
* We cannot get MULTI_CONNECTION_LOST via the ConnectionStateMachine,
|
||||
* but we might get it via the connection API and find us here before
|
||||
* changing any states in the ConnectionStateMachine.
|
||||
*
|
||||
*/
|
||||
|
||||
CloseConnection(connection);
|
||||
}
|
||||
else if (connection->connectionState == MULTI_CONNECTION_CONNECTED)
|
||||
|
@ -1581,14 +1659,20 @@ AssignTasksToConnections(DistributedExecution *execution)
|
|||
|
||||
List *placementAccessList = PlacementAccessListForTask(task, taskPlacement);
|
||||
|
||||
/*
|
||||
* Determine whether the task has to be assigned to a particular connection
|
||||
* due to a preceding access to the placement in the same transaction.
|
||||
*/
|
||||
MultiConnection *connection = GetConnectionIfPlacementAccessedInXact(
|
||||
connectionFlags,
|
||||
placementAccessList,
|
||||
NULL);
|
||||
MultiConnection *connection = NULL;
|
||||
if (execution->transactionProperties->useRemoteTransactionBlocks !=
|
||||
TRANSACTION_BLOCKS_DISALLOWED)
|
||||
{
|
||||
/*
|
||||
* Determine whether the task has to be assigned to a particular connection
|
||||
* due to a preceding access to the placement in the same transaction.
|
||||
*/
|
||||
connection = GetConnectionIfPlacementAccessedInXact(
|
||||
connectionFlags,
|
||||
placementAccessList,
|
||||
NULL);
|
||||
}
|
||||
|
||||
if (connection != NULL)
|
||||
{
|
||||
/*
|
||||
|
@ -1723,14 +1807,14 @@ ExecutionOrderForTask(RowModifyLevel modLevel, Task *task)
|
|||
|
||||
case DDL_TASK:
|
||||
case VACUUM_ANALYZE_TASK:
|
||||
{
|
||||
return EXECUTION_ORDER_PARALLEL;
|
||||
}
|
||||
|
||||
case MAP_TASK:
|
||||
case MERGE_TASK:
|
||||
case MAP_OUTPUT_FETCH_TASK:
|
||||
case MERGE_FETCH_TASK:
|
||||
{
|
||||
return EXECUTION_ORDER_PARALLEL;
|
||||
}
|
||||
|
||||
default:
|
||||
{
|
||||
ereport(ERROR, (errmsg("unsupported task type %d in adaptive executor",
|
||||
|
@ -1802,6 +1886,7 @@ FindOrCreateWorkerSession(WorkerPool *workerPool, MultiConnection *connection)
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
session = (WorkerSession *) palloc0(sizeof(WorkerSession));
|
||||
session->sessionId = sessionId++;
|
||||
session->connection = connection;
|
||||
|
@ -2051,6 +2136,12 @@ RunDistributedExecution(DistributedExecution *execution)
|
|||
*/
|
||||
UnclaimAllSessionConnections(execution->sessionList);
|
||||
|
||||
/* do repartition cleanup if this is a repartition query */
|
||||
if (list_length(execution->jobIdList) > 0)
|
||||
{
|
||||
DoRepartitionCleanup(execution->jobIdList);
|
||||
}
|
||||
|
||||
if (execution->waitEventSet != NULL)
|
||||
{
|
||||
FreeWaitEventSet(execution->waitEventSet);
|
||||
|
@ -2174,6 +2265,12 @@ ManageWorkerPool(WorkerPool *workerPool)
|
|||
/* experimental: just to see the perf benefits of caching connections */
|
||||
int connectionFlags = 0;
|
||||
|
||||
if (execution->transactionProperties->useRemoteTransactionBlocks ==
|
||||
TRANSACTION_BLOCKS_DISALLOWED)
|
||||
{
|
||||
connectionFlags |= OUTSIDE_TRANSACTION;
|
||||
}
|
||||
|
||||
/* open a new connection to the worker */
|
||||
MultiConnection *connection = StartNodeUserDatabaseConnection(connectionFlags,
|
||||
workerPool->nodeName,
|
||||
|
@ -2651,12 +2748,14 @@ static bool
|
|||
TransactionModifiedDistributedTable(DistributedExecution *execution)
|
||||
{
|
||||
/*
|
||||
* We need to explicitly check for a coordinated transaction due to
|
||||
* We need to explicitly check for TRANSACTION_BLOCKS_REQUIRED due to
|
||||
* citus.function_opens_transaction_block flag. When set to false, we
|
||||
* should not be pretending that we're in a coordinated transaction even
|
||||
* if XACT_MODIFICATION_DATA is set. That's why we implemented this workaround.
|
||||
*/
|
||||
return InCoordinatedTransaction() && XactModificationLevel == XACT_MODIFICATION_DATA;
|
||||
return execution->transactionProperties->useRemoteTransactionBlocks ==
|
||||
TRANSACTION_BLOCKS_REQUIRED &&
|
||||
XactModificationLevel == XACT_MODIFICATION_DATA;
|
||||
}
|
||||
|
||||
|
||||
|
@ -3030,11 +3129,16 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution,
|
|||
char *queryString = task->queryString;
|
||||
int querySent = 0;
|
||||
|
||||
/*
|
||||
* Make sure that subsequent commands on the same placement
|
||||
* use the same connection.
|
||||
*/
|
||||
AssignPlacementListToConnection(placementAccessList, connection);
|
||||
if (execution->transactionProperties->useRemoteTransactionBlocks !=
|
||||
TRANSACTION_BLOCKS_DISALLOWED)
|
||||
{
|
||||
/*
|
||||
* Make sure that subsequent commands on the same placement
|
||||
* use the same connection.
|
||||
*/
|
||||
AssignPlacementListToConnection(placementAccessList, connection);
|
||||
}
|
||||
|
||||
|
||||
/* one more command is sent over the session */
|
||||
session->commandsSent++;
|
||||
|
|
|
@ -0,0 +1,225 @@
|
|||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* directed_acylic_graph_execution_logic.c
|
||||
*
|
||||
* Logic to run tasks in their dependency order.
|
||||
*
|
||||
* Copyright (c) Citus Data, Inc.
|
||||
*/
|
||||
|
||||
#include "postgres.h"
|
||||
#include "access/hash.h"
|
||||
#include "distributed/hash_helpers.h"
|
||||
|
||||
#include "distributed/directed_acylic_graph_execution.h"
|
||||
#include "distributed/multi_physical_planner.h"
|
||||
#include "distributed/adaptive_executor.h"
|
||||
#include "distributed/worker_manager.h"
|
||||
#include "distributed/multi_server_executor.h"
|
||||
#include "distributed/worker_transaction.h"
|
||||
#include "distributed/worker_manager.h"
|
||||
#include "distributed/transaction_management.h"
|
||||
#include "distributed/multi_task_tracker_executor.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/transmit.h"
|
||||
|
||||
typedef struct TaskHashKey
|
||||
{
|
||||
uint64 jobId;
|
||||
uint32 taskId;
|
||||
}TaskHashKey;
|
||||
|
||||
typedef struct TaskHashEntry
|
||||
{
|
||||
TaskHashKey key;
|
||||
Task *task;
|
||||
}TaskHashEntry;
|
||||
|
||||
static HASHCTL InitHashTableInfo(void);
|
||||
static HTAB * CreateTaskHashTable(void);
|
||||
static bool IsAllDependencyCompleted(Task *task, HTAB *completedTasks);
|
||||
static void AddCompletedTasks(List *curCompletedTasks, HTAB *completedTasks);
|
||||
static List * FindExecutableTasks(List *allTasks, HTAB *completedTasks);
|
||||
static int TaskHashCompare(const void *key1, const void *key2, Size keysize);
|
||||
static uint32 TaskHash(const void *key, Size keysize);
|
||||
static bool IsTaskAlreadyCompleted(Task *task, HTAB *completedTasks);
|
||||
|
||||
/*
|
||||
* ExecuteTasksInDependencyOrder executes the given tasks except the excluded
|
||||
* tasks in their dependency order. To do so, it iterates all
|
||||
* the tasks and finds the ones that can be executed at that time, it tries to
|
||||
* execute all of them in parallel. The parallelism is bound by MaxAdaptiveExecutorPoolSize.
|
||||
*/
|
||||
void
|
||||
ExecuteTasksInDependencyOrder(List *allTasks, List *excludedTasks)
|
||||
{
|
||||
HTAB *completedTasks = CreateTaskHashTable();
|
||||
|
||||
/* We only execute depended jobs' tasks, therefore to not execute */
|
||||
/* top level tasks, we add them to the completedTasks. */
|
||||
AddCompletedTasks(excludedTasks, completedTasks);
|
||||
while (true)
|
||||
{
|
||||
List *curTasks = FindExecutableTasks(allTasks, completedTasks);
|
||||
if (list_length(curTasks) == 0)
|
||||
{
|
||||
break;
|
||||
}
|
||||
ExecuteTaskListOutsideTransaction(ROW_MODIFY_NONE, curTasks,
|
||||
MaxAdaptiveExecutorPoolSize);
|
||||
|
||||
AddCompletedTasks(curTasks, completedTasks);
|
||||
curTasks = NIL;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* FindExecutableTasks finds the tasks that can be executed currently,
|
||||
* which means that all of their dependencies are executed. If a task
|
||||
* is already executed, it is not added to the result.
|
||||
*/
|
||||
static List *
|
||||
FindExecutableTasks(List *allTasks, HTAB *completedTasks)
|
||||
{
|
||||
List *curTasks = NIL;
|
||||
ListCell *taskCell = NULL;
|
||||
|
||||
|
||||
foreach(taskCell, allTasks)
|
||||
{
|
||||
Task *task = (Task *) lfirst(taskCell);
|
||||
|
||||
if (IsAllDependencyCompleted(task, completedTasks) &&
|
||||
!IsTaskAlreadyCompleted(task, completedTasks))
|
||||
{
|
||||
curTasks = lappend(curTasks, task);
|
||||
}
|
||||
}
|
||||
|
||||
return curTasks;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* AddCompletedTasks adds the givens tasks to completedTasks HTAB.
|
||||
*/
|
||||
static void
|
||||
AddCompletedTasks(List *curCompletedTasks, HTAB *completedTasks)
|
||||
{
|
||||
ListCell *taskCell = NULL;
|
||||
|
||||
bool found;
|
||||
|
||||
foreach(taskCell, curCompletedTasks)
|
||||
{
|
||||
Task *task = (Task *) lfirst(taskCell);
|
||||
TaskHashKey taskKey = { task->jobId, task->taskId };
|
||||
hash_search(completedTasks, &taskKey, HASH_ENTER, &found);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CreateTaskHashTable creates a HTAB with the necessary initialization.
|
||||
*/
|
||||
static HTAB *
|
||||
CreateTaskHashTable()
|
||||
{
|
||||
uint32 hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT | HASH_COMPARE);
|
||||
HASHCTL info = InitHashTableInfo();
|
||||
return hash_create("citus task completed list (jobId, taskId)",
|
||||
64, &info, hashFlags);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* IsTaskAlreadyCompleted returns true if the given task
|
||||
* is found in the completedTasks HTAB.
|
||||
*/
|
||||
static bool
|
||||
IsTaskAlreadyCompleted(Task *task, HTAB *completedTasks)
|
||||
{
|
||||
bool found;
|
||||
|
||||
TaskHashKey taskKey = { task->jobId, task->taskId };
|
||||
hash_search(completedTasks, &taskKey, HASH_ENTER, &found);
|
||||
return found;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* IsAllDependencyCompleted return true if the given task's
|
||||
* dependencies are completed.
|
||||
*/
|
||||
static bool
|
||||
IsAllDependencyCompleted(Task *targetTask, HTAB *completedTasks)
|
||||
{
|
||||
ListCell *taskCell = NULL;
|
||||
bool found = false;
|
||||
|
||||
|
||||
foreach(taskCell, targetTask->dependentTaskList)
|
||||
{
|
||||
Task *task = (Task *) lfirst(taskCell);
|
||||
TaskHashKey taskKey = { task->jobId, task->taskId };
|
||||
|
||||
hash_search(completedTasks, &taskKey, HASH_FIND, &found);
|
||||
if (!found)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* InitHashTableInfo returns hash table info, the hash table is
|
||||
* configured to be created in the CurrentMemoryContext so that
|
||||
* it will be cleaned when this memory context gets freed/reset.
|
||||
*/
|
||||
static HASHCTL
|
||||
InitHashTableInfo()
|
||||
{
|
||||
HASHCTL info;
|
||||
|
||||
memset(&info, 0, sizeof(info));
|
||||
info.keysize = sizeof(TaskHashKey);
|
||||
info.entrysize = sizeof(TaskHashEntry);
|
||||
info.hash = TaskHash;
|
||||
info.match = TaskHashCompare;
|
||||
info.hcxt = CurrentMemoryContext;
|
||||
|
||||
return info;
|
||||
}
|
||||
|
||||
|
||||
static uint32
|
||||
TaskHash(const void *key, Size keysize)
|
||||
{
|
||||
TaskHashKey *taskKey = (TaskHashKey *) key;
|
||||
uint32 hash = 0;
|
||||
|
||||
hash = hash_combine(hash, hash_any((unsigned char *) &taskKey->jobId,
|
||||
sizeof(int64)));
|
||||
hash = hash_combine(hash, hash_uint32(taskKey->taskId));
|
||||
|
||||
return hash;
|
||||
}
|
||||
|
||||
|
||||
static int
|
||||
TaskHashCompare(const void *key1, const void *key2, Size keysize)
|
||||
{
|
||||
TaskHashKey *taskKey1 = (TaskHashKey *) key1;
|
||||
TaskHashKey *taskKey2 = (TaskHashKey *) key2;
|
||||
if (taskKey1->jobId != taskKey2->jobId || taskKey1->taskId != taskKey2->taskId)
|
||||
{
|
||||
return 1;
|
||||
}
|
||||
else
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
}
|
|
@ -20,6 +20,7 @@
|
|||
#include "distributed/multi_executor.h"
|
||||
#include "distributed/multi_partitioning_utils.h"
|
||||
#include "distributed/multi_physical_planner.h"
|
||||
#include "distributed/adaptive_executor.h"
|
||||
#include "distributed/multi_router_planner.h"
|
||||
#include "distributed/distributed_planner.h"
|
||||
#include "distributed/recursive_planning.h"
|
||||
|
|
|
@ -24,6 +24,7 @@
|
|||
#include "distributed/multi_physical_planner.h"
|
||||
#include "distributed/multi_resowner.h"
|
||||
#include "distributed/multi_server_executor.h"
|
||||
#include "distributed/master_protocol.h"
|
||||
#include "distributed/subplan_execution.h"
|
||||
#include "distributed/worker_protocol.h"
|
||||
#include "distributed/log_utils.h"
|
||||
|
@ -35,6 +36,8 @@ bool BinaryMasterCopyFormat = false; /* copy data from workers in binary format
|
|||
bool EnableRepartitionJoins = false;
|
||||
|
||||
|
||||
static bool HasReplicatedDistributedTable(List *relationOids);
|
||||
|
||||
/*
|
||||
* JobExecutorType selects the executor type for the given distributedPlan using the task
|
||||
* executor type config value. The function then checks if the given distributedPlan needs
|
||||
|
@ -89,6 +92,7 @@ JobExecutorType(DistributedPlan *distributedPlan)
|
|||
|
||||
Assert(distributedPlan->modLevel == ROW_MODIFY_READONLY);
|
||||
|
||||
|
||||
if (executorType == MULTI_EXECUTOR_ADAPTIVE)
|
||||
{
|
||||
/* if we have repartition jobs with adaptive executor and repartition
|
||||
|
@ -104,12 +108,11 @@ JobExecutorType(DistributedPlan *distributedPlan)
|
|||
errhint("Set citus.enable_repartition_joins to on "
|
||||
"to enable repartitioning")));
|
||||
}
|
||||
|
||||
ereport(DEBUG1, (errmsg(
|
||||
"cannot use adaptive executor with repartition jobs"),
|
||||
errhint("Since you enabled citus.enable_repartition_joins "
|
||||
"Citus chose to use task-tracker.")));
|
||||
return MULTI_EXECUTOR_TASK_TRACKER;
|
||||
if (HasReplicatedDistributedTable(distributedPlan->relationIdList))
|
||||
{
|
||||
return MULTI_EXECUTOR_TASK_TRACKER;
|
||||
}
|
||||
return MULTI_EXECUTOR_ADAPTIVE;
|
||||
}
|
||||
}
|
||||
else
|
||||
|
@ -131,6 +134,35 @@ JobExecutorType(DistributedPlan *distributedPlan)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* HasReplicatedDistributedTable returns true if there is any
|
||||
* table in the given list that is:
|
||||
* - not a reference table
|
||||
* - has replication factor > 1
|
||||
*/
|
||||
static bool
|
||||
HasReplicatedDistributedTable(List *relationOids)
|
||||
{
|
||||
ListCell *oidCell = NULL;
|
||||
|
||||
foreach(oidCell, relationOids)
|
||||
{
|
||||
Oid oid = lfirst_oid(oidCell);
|
||||
char partitionMethod = PartitionMethod(oid);
|
||||
if (partitionMethod == DISTRIBUTE_BY_NONE)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
uint32 tableReplicationFactor = TableShardReplicationFactor(oid);
|
||||
if (tableReplicationFactor > 1)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* MaxMasterConnectionCount returns the number of connections a master can open.
|
||||
* A master cannot create more than a certain number of file descriptors (FDs).
|
||||
|
|
|
@ -41,6 +41,7 @@
|
|||
#include "distributed/subplan_execution.h"
|
||||
#include "distributed/worker_protocol.h"
|
||||
#include "distributed/version_compat.h"
|
||||
#include "distributed/multi_task_tracker_executor.h"
|
||||
#include "storage/fd.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/hsearch.h"
|
||||
|
@ -72,7 +73,7 @@ typedef struct TaskMapEntry
|
|||
|
||||
|
||||
/* Local functions forward declarations to init tasks and trackers */
|
||||
static List * TaskAndExecutionList(List *jobTaskList);
|
||||
|
||||
static HTAB * TaskHashCreate(uint32 taskHashSize);
|
||||
static Task * TaskHashEnter(HTAB *taskHash, Task *task);
|
||||
static Task * TaskHashLookup(HTAB *trackerHash, TaskType taskType, uint64 jobId,
|
||||
|
@ -454,7 +455,7 @@ MultiTaskTrackerExecute(Job *job)
|
|||
* struct, associates the task execution with the task, and adds the task and its
|
||||
* execution to a list. The function then returns the list.
|
||||
*/
|
||||
static List *
|
||||
List *
|
||||
TaskAndExecutionList(List *jobTaskList)
|
||||
{
|
||||
List *taskAndExecutionList = NIL;
|
||||
|
|
|
@ -0,0 +1,171 @@
|
|||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* repartition_join_execution.c
|
||||
*
|
||||
* This file contains repartition specific logic.
|
||||
* ExecuteDependentTasks takes a list of top level tasks. Its logic is as follows:
|
||||
* - It generates all the tasks by descending in the tasks tree. Note that each task
|
||||
* has a dependentTaskList.
|
||||
* - It generates FetchTask queryStrings with the MapTask queries. It uses the first replica to
|
||||
* fetch data when replication factor is > 1. Note that if a task fails in any replica adaptive executor
|
||||
* gives an error, so if we come to a fetchTask we know for sure that its dependedMapTask is executed in all
|
||||
* replicas.
|
||||
* - It creates schemas in each worker in a single transaction to store intermediate results.
|
||||
* - It iterates all tasks and finds the ones whose dependencies are already executed, and executes them with
|
||||
* adaptive executor logic.
|
||||
*
|
||||
*
|
||||
* Repartition queries do not begin a transaction even if we are in
|
||||
* a transaction block. As we don't begin a transaction, they won't see the
|
||||
* DDLs that happened earlier in the transaction because we don't have that
|
||||
* transaction id with repartition queries. Therefore we error in this case.
|
||||
*
|
||||
* Copyright (c) Citus Data, Inc.
|
||||
*/
|
||||
|
||||
#include "postgres.h"
|
||||
#include "access/hash.h"
|
||||
#include "distributed/hash_helpers.h"
|
||||
|
||||
#include "distributed/directed_acylic_graph_execution.h"
|
||||
#include "distributed/multi_physical_planner.h"
|
||||
#include "distributed/adaptive_executor.h"
|
||||
#include "distributed/worker_manager.h"
|
||||
#include "distributed/multi_server_executor.h"
|
||||
#include "distributed/repartition_join_execution.h"
|
||||
#include "distributed/worker_transaction.h"
|
||||
#include "distributed/worker_manager.h"
|
||||
#include "distributed/transaction_management.h"
|
||||
#include "distributed/multi_task_tracker_executor.h"
|
||||
#include "distributed/worker_transaction.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/listutils.h"
|
||||
#include "distributed/transmit.h"
|
||||
|
||||
|
||||
static List * CreateTemporarySchemasForMergeTasks(Job *topLevelJob);
|
||||
static List * ExtractJobsInJobTree(Job *job);
|
||||
static void TraverseJobTree(Job *curJob, List **jobs);
|
||||
static char * GenerateCreateSchemasCommand(List *jobIds);
|
||||
static char * GenerateJobCommands(List *jobIds, char *templateCommand);
|
||||
static char * GenerateDeleteJobsCommand(List *jobIds);
|
||||
|
||||
|
||||
/*
|
||||
* ExecuteDependentTasks executes all tasks except the top level tasks
|
||||
* in order from the task tree. At a time, it can execute different tasks from
|
||||
* different jobs.
|
||||
*/
|
||||
List *
|
||||
ExecuteDependentTasks(List *topLevelTasks, Job *topLevelJob)
|
||||
{
|
||||
EnsureNoModificationsHaveBeenDone();
|
||||
|
||||
List *allTasks = TaskAndExecutionList(topLevelTasks);
|
||||
|
||||
List *jobIds = CreateTemporarySchemasForMergeTasks(topLevelJob);
|
||||
|
||||
ExecuteTasksInDependencyOrder(allTasks, topLevelTasks);
|
||||
|
||||
return jobIds;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CreateTemporarySchemasForMergeTasks creates the necessary schemas that will be used
|
||||
* later in each worker. Single transaction is used to create the schemas.
|
||||
*/
|
||||
static List *
|
||||
CreateTemporarySchemasForMergeTasks(Job *topLeveLJob)
|
||||
{
|
||||
List *jobIds = ExtractJobsInJobTree(topLeveLJob);
|
||||
char *createSchemasCommand = GenerateCreateSchemasCommand(jobIds);
|
||||
SendCommandToAllWorkers(createSchemasCommand, CitusExtensionOwnerName());
|
||||
return jobIds;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ExtractJobsInJobTree returns all job ids in the job tree
|
||||
* where the given job is root.
|
||||
*/
|
||||
static List *
|
||||
ExtractJobsInJobTree(Job *job)
|
||||
{
|
||||
List *jobIds = NIL;
|
||||
TraverseJobTree(job, &jobIds);
|
||||
return jobIds;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* TraverseJobTree does a dfs in the current job and adds
|
||||
* all of its job ids.
|
||||
*/
|
||||
static void
|
||||
TraverseJobTree(Job *curJob, List **jobIds)
|
||||
{
|
||||
ListCell *jobCell = NULL;
|
||||
*jobIds = lappend(*jobIds, (void *) curJob->jobId);
|
||||
|
||||
foreach(jobCell, curJob->dependentJobList)
|
||||
{
|
||||
Job *childJob = (Job *) lfirst(jobCell);
|
||||
TraverseJobTree(childJob, jobIds);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GenerateCreateSchemasCommand returns concatanated create schema commands.
|
||||
*/
|
||||
static char *
|
||||
GenerateCreateSchemasCommand(List *jobIds)
|
||||
{
|
||||
return GenerateJobCommands(jobIds, WORKER_CREATE_SCHEMA_QUERY);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GenerateJobCommands returns concatenated commands with the given template
|
||||
* command for each job id from the given job ids. The returned command is
|
||||
* exactly list_length(jobIds) subcommands.
|
||||
* E.g create_schema(jobId1); create_schema(jobId2); ...
|
||||
* This way we can send the command in just one latency to a worker.
|
||||
*/
|
||||
static char *
|
||||
GenerateJobCommands(List *jobIds, char *templateCommand)
|
||||
{
|
||||
StringInfo createSchemaCommand = makeStringInfo();
|
||||
ListCell *jobIdCell = NULL;
|
||||
|
||||
foreach(jobIdCell, jobIds)
|
||||
{
|
||||
uint64 jobId = (uint64) lfirst(jobIdCell);
|
||||
appendStringInfo(createSchemaCommand, templateCommand, jobId);
|
||||
}
|
||||
return createSchemaCommand->data;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* DoRepartitionCleanup removes the temporary job directories and schemas that are
|
||||
* used for repartition queries for the given job ids.
|
||||
*/
|
||||
void
|
||||
DoRepartitionCleanup(List *jobIds)
|
||||
{
|
||||
SendOptionalCommandListToAllWorkers(list_make1(GenerateDeleteJobsCommand(
|
||||
jobIds)),
|
||||
CitusExtensionOwnerName());
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GenerateDeleteJobsCommand returns concatanated remove job dir commands.
|
||||
*/
|
||||
static char *
|
||||
GenerateDeleteJobsCommand(List *jobIds)
|
||||
{
|
||||
return GenerateJobCommands(jobIds, WORKER_REPARTITION_CLEANUP_QUERY);
|
||||
}
|
|
@ -28,6 +28,7 @@
|
|||
#include "distributed/citus_ruleutils.h"
|
||||
#include "distributed/colocation_utils.h"
|
||||
#include "distributed/commands.h"
|
||||
#include "distributed/adaptive_executor.h"
|
||||
#include "distributed/connection_management.h"
|
||||
#include "distributed/distributed_planner.h"
|
||||
#include "distributed/listutils.h"
|
||||
|
|
|
@ -4499,11 +4499,28 @@ MergeTaskList(MapMergeJob *mapMergeJob, List *mapTaskList, uint32 taskIdIndex)
|
|||
{
|
||||
Task *mapTask = (Task *) lfirst(mapTaskCell);
|
||||
|
||||
/* we need node names for the query, and we'll resolve them later */
|
||||
char *undefinedQueryString = NULL;
|
||||
/* find the node name/port for map task's execution */
|
||||
List *mapTaskPlacementList = mapTask->taskPlacementList;
|
||||
|
||||
ShardPlacement *mapTaskPlacement = linitial(mapTaskPlacementList);
|
||||
char *mapTaskNodeName = mapTaskPlacement->nodeName;
|
||||
uint32 mapTaskNodePort = mapTaskPlacement->nodePort;
|
||||
|
||||
/*
|
||||
* If replication factor is 1, then we know that we will use the first and
|
||||
* the only placement. If task tracker is used, then it will regenerate the
|
||||
* query string because if there are multiple placements then it does not
|
||||
* know in which placement the parent map task was successful.
|
||||
*/
|
||||
StringInfo mapFetchQueryString = makeStringInfo();
|
||||
appendStringInfo(mapFetchQueryString, MAP_OUTPUT_FETCH_COMMAND,
|
||||
mapTask->jobId, mapTask->taskId, partitionId,
|
||||
mergeTaskId, /* fetch results to merge task */
|
||||
mapTaskNodeName, mapTaskNodePort);
|
||||
|
||||
Task *mapOutputFetchTask = CreateBasicTask(jobId, taskIdIndex,
|
||||
MAP_OUTPUT_FETCH_TASK,
|
||||
undefinedQueryString);
|
||||
mapFetchQueryString->data);
|
||||
mapOutputFetchTask->partitionId = partitionId;
|
||||
mapOutputFetchTask->upstreamTaskId = mergeTaskId;
|
||||
mapOutputFetchTask->dependentTaskList = list_make1(mapTask);
|
||||
|
|
|
@ -62,6 +62,7 @@
|
|||
#include "distributed/worker_manager.h"
|
||||
#include "distributed/worker_protocol.h"
|
||||
#include "distributed/worker_shard_visibility.h"
|
||||
#include "distributed/adaptive_executor.h"
|
||||
#include "port/atomics.h"
|
||||
#include "postmaster/postmaster.h"
|
||||
#include "optimizer/planner.h"
|
||||
|
|
|
@ -4,6 +4,8 @@ COMMENT ON COLUMN pg_catalog.pg_dist_node.shouldhaveshards IS
|
|||
|
||||
#include "udfs/master_set_node_property/9.1-1.sql"
|
||||
#include "udfs/master_drain_node/9.1-1.sql"
|
||||
#include "udfs/worker_create_schema/9.1-1.sql"
|
||||
#include "udfs/worker_repartition_cleanup/9.1-1.sql"
|
||||
#include "udfs/rebalance_table_shards/9.1-1.sql"
|
||||
#include "udfs/get_rebalance_table_shards_plan/9.1-1.sql"
|
||||
#include "udfs/master_add_node/9.1-1.sql"
|
||||
|
|
|
@ -0,0 +1,8 @@
|
|||
-- worker_create_schema(bigint) is implemented in C by the symbol
-- worker_create_schema; execution is revoked from PUBLIC.
CREATE FUNCTION pg_catalog.worker_create_schema(bigint)
    RETURNS void
    LANGUAGE C STRICT
    AS 'MODULE_PATHNAME', $$worker_create_schema$$;

COMMENT ON FUNCTION pg_catalog.worker_create_schema(bigint)
    IS 'create schema in remote node';

REVOKE ALL ON FUNCTION pg_catalog.worker_create_schema(bigint) FROM PUBLIC;
|
|
@ -0,0 +1,8 @@
|
|||
-- worker_create_schema(bigint) is implemented in C by the symbol
-- worker_create_schema; execution is revoked from PUBLIC.
CREATE FUNCTION pg_catalog.worker_create_schema(bigint)
    RETURNS void
    LANGUAGE C STRICT
    AS 'MODULE_PATHNAME', $$worker_create_schema$$;

COMMENT ON FUNCTION pg_catalog.worker_create_schema(bigint)
    IS 'create schema in remote node';

REVOKE ALL ON FUNCTION pg_catalog.worker_create_schema(bigint) FROM PUBLIC;
|
|
@ -0,0 +1,8 @@
|
|||
-- worker_repartition_cleanup(bigint) is implemented in C by the symbol
-- worker_repartition_cleanup; execution is revoked from PUBLIC.
CREATE FUNCTION pg_catalog.worker_repartition_cleanup(bigint)
    RETURNS void
    LANGUAGE C STRICT
    AS 'MODULE_PATHNAME', $$worker_repartition_cleanup$$;

COMMENT ON FUNCTION pg_catalog.worker_repartition_cleanup(bigint)
    IS 'remove job in remote node';

REVOKE ALL ON FUNCTION pg_catalog.worker_repartition_cleanup(bigint) FROM PUBLIC;
|
|
@ -0,0 +1,8 @@
|
|||
-- worker_repartition_cleanup(bigint) is implemented in C by the symbol
-- worker_repartition_cleanup; execution is revoked from PUBLIC.
CREATE FUNCTION pg_catalog.worker_repartition_cleanup(bigint)
    RETURNS void
    LANGUAGE C STRICT
    AS 'MODULE_PATHNAME', $$worker_repartition_cleanup$$;

COMMENT ON FUNCTION pg_catalog.worker_repartition_cleanup(bigint)
    IS 'remove job in remote node';

REVOKE ALL ON FUNCTION pg_catalog.worker_repartition_cleanup(bigint) FROM PUBLIC;
|
|
@ -414,7 +414,7 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
|
|||
case XACT_EVENT_PARALLEL_PRE_COMMIT:
|
||||
case XACT_EVENT_PRE_PREPARE:
|
||||
{
|
||||
if (CurrentCoordinatedTransactionState > COORD_TRANS_NONE)
|
||||
if (InCoordinatedTransaction())
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot use 2PC in transactions involving "
|
||||
|
|
|
@ -23,6 +23,7 @@
|
|||
#include "distributed/listutils.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/resource_lock.h"
|
||||
#include "distributed/metadata_sync.h"
|
||||
#include "distributed/remote_commands.h"
|
||||
#include "distributed/pg_dist_node.h"
|
||||
#include "distributed/pg_dist_transaction.h"
|
||||
|
@ -42,6 +43,8 @@ static void SendCommandToWorkersParamsInternal(TargetWorkerSet targetWorkerSet,
|
|||
Oid *parameterTypes,
|
||||
const char *const *parameterValues);
|
||||
static void ErrorIfAnyMetadataNodeOutOfSync(List *metadataNodeList);
|
||||
static void SendCommandListToAllWorkersInternal(List *commandList, bool failOnError,
|
||||
char *superuser);
|
||||
|
||||
|
||||
/*
|
||||
|
@ -117,6 +120,72 @@ SendCommandToWorkersWithMetadata(const char *command)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* SendCommandToAllWorkers sends the given command to
|
||||
* all workers as a superuser.
|
||||
*/
|
||||
void
|
||||
SendCommandToAllWorkers(char *command, char *superuser)
|
||||
{
|
||||
SendCommandListToAllWorkers(list_make1(command), superuser);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* SendCommandListToAllWorkers sends the given command to all workers in
|
||||
* a single transaction.
|
||||
*/
|
||||
void
|
||||
SendCommandListToAllWorkers(List *commandList, char *superuser)
|
||||
{
|
||||
SendCommandListToAllWorkersInternal(commandList, true, superuser);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* SendCommandListToAllWorkersInternal sends the given command to all workers in a single
|
||||
* transaction as a superuser. If failOnError is false, then it continues sending the commandList to other
|
||||
* workers even if it fails in one of them.
|
||||
*/
|
||||
static void
|
||||
SendCommandListToAllWorkersInternal(List *commandList, bool failOnError, char *superuser)
|
||||
{
|
||||
ListCell *workerNodeCell = NULL;
|
||||
List *workerNodeList = ActivePrimaryWorkerNodeList(NoLock);
|
||||
|
||||
foreach(workerNodeCell, workerNodeList)
|
||||
{
|
||||
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
|
||||
if (failOnError)
|
||||
{
|
||||
SendCommandListToWorkerInSingleTransaction(workerNode->workerName,
|
||||
workerNode->workerPort,
|
||||
superuser,
|
||||
commandList);
|
||||
}
|
||||
else
|
||||
{
|
||||
SendOptionalCommandListToWorkerInTransaction(workerNode->workerName,
|
||||
workerNode->workerPort,
|
||||
superuser,
|
||||
commandList);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* SendOptionalCommandListToAllWorkers sends the given command to all works in
|
||||
* a single transaction as a superuser. If there is an error during the command, it is ignored
|
||||
* so this method doesnt return any error.
|
||||
*/
|
||||
void
|
||||
SendOptionalCommandListToAllWorkers(List *commandList, char *superuser)
|
||||
{
|
||||
SendCommandListToAllWorkersInternal(commandList, false, superuser);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* TargetWorkerSetNodeList returns a list of WorkerNode's that satisfies the
|
||||
* TargetWorkerSet.
|
||||
|
|
|
@ -75,7 +75,6 @@ static void TrackerSigHupHandler(SIGNAL_ARGS);
|
|||
static void TrackerShutdownHandler(SIGNAL_ARGS);
|
||||
|
||||
/* Local functions forward declarations */
|
||||
static void TrackerCleanupJobDirectories(void);
|
||||
static void TrackerCleanupJobSchemas(void);
|
||||
static void TrackerCleanupConnections(HTAB *WorkerTasksHash);
|
||||
static void TrackerRegisterShutDown(HTAB *WorkerTasksHash);
|
||||
|
@ -338,7 +337,7 @@ WorkerTasksHashFind(uint64 jobId, uint32 taskId)
|
|||
* associated files cannot be cleaned up safely. We therefore perform this
|
||||
* cleanup when the process restarts.
|
||||
*/
|
||||
static void
|
||||
void
|
||||
TrackerCleanupJobDirectories(void)
|
||||
{
|
||||
/* use the default tablespace in {datadir}/base */
|
||||
|
|
|
@ -42,7 +42,6 @@
|
|||
|
||||
/* Local functions forward declarations */
|
||||
static bool TaskTrackerRunning(void);
|
||||
static void CreateJobSchema(StringInfo schemaName);
|
||||
static void CreateTask(uint64 jobId, uint32 taskId, char *taskCallString);
|
||||
static void UpdateTask(WorkerTask *workerTask, char *taskCallString);
|
||||
static void CleanupTask(WorkerTask *workerTask);
|
||||
|
@ -308,7 +307,7 @@ TaskTrackerRunning(void)
|
|||
* Further note that the created schema does not become visible to other
|
||||
* processes until the transaction commits.
|
||||
*/
|
||||
static void
|
||||
void
|
||||
CreateJobSchema(StringInfo schemaName)
|
||||
{
|
||||
const char *queryString = NULL;
|
||||
|
|
|
@ -31,6 +31,8 @@
|
|||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/worker_protocol.h"
|
||||
#include "distributed/version_compat.h"
|
||||
#include "distributed/task_tracker_protocol.h"
|
||||
#include "distributed/task_tracker.h"
|
||||
#include "executor/spi.h"
|
||||
#include "nodes/makefuncs.h"
|
||||
#include "parser/parse_type.h"
|
||||
|
@ -39,6 +41,8 @@
|
|||
#include "utils/builtins.h"
|
||||
#include "utils/snapmgr.h"
|
||||
#include "utils/syscache.h"
|
||||
#include "commands/schemacmds.h"
|
||||
#include "distributed/resource_lock.h"
|
||||
|
||||
|
||||
/* Local functions forward declarations */
|
||||
|
@ -53,6 +57,50 @@ static void CopyTaskFilesFromDirectory(StringInfo schemaName, StringInfo relatio
|
|||
PG_FUNCTION_INFO_V1(worker_merge_files_into_table);
|
||||
PG_FUNCTION_INFO_V1(worker_merge_files_and_run_query);
|
||||
PG_FUNCTION_INFO_V1(worker_cleanup_job_schema_cache);
|
||||
PG_FUNCTION_INFO_V1(worker_create_schema);
|
||||
PG_FUNCTION_INFO_V1(worker_repartition_cleanup);
|
||||
|
||||
|
||||
/*
|
||||
* worker_create_schema creates a schema with the given job id in local.
|
||||
*/
|
||||
Datum
|
||||
worker_create_schema(PG_FUNCTION_ARGS)
|
||||
{
|
||||
uint64 jobId = PG_GETARG_INT64(0);
|
||||
|
||||
StringInfo jobSchemaName = JobSchemaName(jobId);
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
bool schemaExists = JobSchemaExists(jobSchemaName);
|
||||
if (!schemaExists)
|
||||
{
|
||||
CreateJobSchema(jobSchemaName);
|
||||
}
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* worker_repartition_cleanup removes the job directory and schema with the given job id .
|
||||
*/
|
||||
Datum
|
||||
worker_repartition_cleanup(PG_FUNCTION_ARGS)
|
||||
{
|
||||
uint64 jobId = PG_GETARG_INT64(0);
|
||||
StringInfo jobDirectoryName = JobDirectoryName(jobId);
|
||||
StringInfo jobSchemaName = JobSchemaName(jobId);
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
Oid schemaId = get_namespace_oid(jobSchemaName->data, false);
|
||||
|
||||
EnsureSchemaOwner(schemaId);
|
||||
CitusRemoveDirectory(jobDirectoryName);
|
||||
RemoveJobSchema(jobSchemaName);
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
|
@ -135,7 +183,6 @@ worker_merge_files_into_table(PG_FUNCTION_ARGS)
|
|||
userId);
|
||||
|
||||
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,20 @@
|
|||
/*
 * adaptive_executor.h
 *	  Variables and functions declared for users of the adaptive executor.
 */
#ifndef ADAPTIVE_EXECUTOR_H
#define ADAPTIVE_EXECUTOR_H

#include "distributed/multi_physical_planner.h"

/* GUC, determining whether Citus opens 1 connection per task */
extern bool ForceMaxQueryParallelization;
extern int MaxAdaptiveExecutorPoolSize;

/* GUC, number of ms to wait between opening connections to the same worker */
extern int ExecutorSlowStartInterval;

/* entry points for executing a task list; both return a uint64 */
extern uint64 ExecuteTaskList(RowModifyLevel modLevel, List *taskList,
							  int targetPoolSize);
extern uint64 ExecuteTaskListOutsideTransaction(RowModifyLevel modLevel,
												List *taskList,
												int targetPoolSize);

#endif /* ADAPTIVE_EXECUTOR_H */
|
|
@ -49,8 +49,10 @@ enum MultiConnectionMode
|
|||
/* open a connection per (co-located set of) placement(s) */
|
||||
CONNECTION_PER_PLACEMENT = 1 << 3,
|
||||
|
||||
OUTSIDE_TRANSACTION = 1 << 4,
|
||||
|
||||
/* connection has not been used to access data */
|
||||
REQUIRE_SIDECHANNEL = 1 << 4
|
||||
REQUIRE_SIDECHANNEL = 1 << 5
|
||||
};
|
||||
|
||||
/*
|
||||
|
@ -93,6 +95,9 @@ typedef struct MultiConnection
|
|||
/* underlying libpq connection */
|
||||
struct pg_conn *pgConn;
|
||||
|
||||
/* connection id */
|
||||
uint64 connectionId;
|
||||
|
||||
/* state of the connection */
|
||||
MultiConnectionState connectionState;
|
||||
|
||||
|
|
|
@ -0,0 +1,20 @@
|
|||
/*-------------------------------------------------------------------------
 *
 * directed_acylic_graph_execution.h
 *	  Execution logic for tasks that form a directed acyclic graph.
 *
 * Copyright (c) Citus Data, Inc.
 *-------------------------------------------------------------------------
 */

#ifndef DIRECTED_ACYLIC_GRAPH_EXECUTION_H
#define DIRECTED_ACYLIC_GRAPH_EXECUTION_H

#include "postgres.h"

#include "nodes/pg_list.h"

/*
 * Executes the tasks in allTasks in dependency order.
 * NOTE(review): excludedTasks presumably lists tasks that must not be
 * executed by this call - confirm against the definition.
 */
extern void ExecuteTasksInDependencyOrder(List *allTasks, List *excludedTasks);

#endif /* DIRECTED_ACYLIC_GRAPH_EXECUTION_H */
|
|
@ -36,6 +36,12 @@
|
|||
#define JOB_CLEANUP_QUERY "SELECT task_tracker_cleanup_job("UINT64_FORMAT ")"
|
||||
#define JOB_CLEANUP_TASK_ID INT_MAX
|
||||
|
||||
/* Adaptive executor repartitioning related defines */
|
||||
#define WORKER_CREATE_SCHEMA_QUERY "SELECT worker_create_schema (" UINT64_FORMAT ");"
|
||||
#define WORKER_REPARTITION_CLEANUP_QUERY "SELECT worker_repartition_cleanup (" \
|
||||
UINT64_FORMAT \
|
||||
");"
|
||||
|
||||
|
||||
/* Enumeration to track one task's execution status */
|
||||
typedef enum
|
||||
|
|
|
@ -0,0 +1,8 @@
|
|||
|
||||
|
||||
#ifndef MULTI_TASK_TRACKER_EXECUTOR_H
|
||||
#define MULTI_TASK_TRACKER_EXECUTOR_H
|
||||
|
||||
extern List * TaskAndExecutionList(List *jobTaskList);
|
||||
|
||||
#endif /* MULTI_TASK_TRACKER_EXECUTOR_H */
|
|
@ -0,0 +1,19 @@
|
|||
/*-------------------------------------------------------------------------
 *
 * repartition_join_execution.h
 *	  Execution logic for repartition queries.
 *
 * Copyright (c) Citus Data, Inc.
 *-------------------------------------------------------------------------
 */

#ifndef REPARTITION_JOIN_EXECUTION_H
#define REPARTITION_JOIN_EXECUTION_H

#include "nodes/pg_list.h"

/*
 * NOTE(review): Job is not declared by the include above, so this header
 * appears to rely on the includer declaring it first - confirm.
 */

/* executes the tasks the given tasks depend on; returns a list of job ids */
extern List * ExecuteDependentTasks(List *taskList, Job *topLevelJob);

/* removes what the repartition query left behind for the given job ids */
extern void DoRepartitionCleanup(List *jobIds);

#endif /* REPARTITION_JOIN_EXECUTION_H */
|
|
@ -109,6 +109,8 @@ typedef struct WorkerTasksSharedStateData
|
|||
} WorkerTasksSharedStateData;
|
||||
|
||||
|
||||
extern void TrackerCleanupJobDirectories(void);
|
||||
|
||||
/* Config variables managed via guc.c */
|
||||
extern int TaskTrackerDelay;
|
||||
extern int MaxTrackedTasksPerNode;
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
#include "fmgr.h"
|
||||
|
||||
extern void CreateJobSchema(StringInfo schemaName);
|
||||
|
||||
/* Function declarations for distributed task management */
|
||||
extern Datum task_tracker_assign_task(PG_FUNCTION_ARGS);
|
||||
|
|
|
@ -148,6 +148,7 @@ extern Datum worker_apply_shard_ddl_command(PG_FUNCTION_ARGS);
|
|||
extern Datum worker_range_partition_table(PG_FUNCTION_ARGS);
|
||||
extern Datum worker_hash_partition_table(PG_FUNCTION_ARGS);
|
||||
extern Datum worker_merge_files_into_table(PG_FUNCTION_ARGS);
|
||||
extern Datum worker_create_schema(PG_FUNCTION_ARGS);
|
||||
extern Datum worker_merge_files_and_run_query(PG_FUNCTION_ARGS);
|
||||
extern Datum worker_cleanup_job_schema_cache(PG_FUNCTION_ARGS);
|
||||
|
||||
|
|
|
@ -42,6 +42,9 @@ extern void SendBareCommandListToMetadataWorkers(List *commandList);
|
|||
extern int SendBareOptionalCommandListToAllWorkersAsUser(List *commandList,
|
||||
const char *user);
|
||||
extern void EnsureNoModificationsHaveBeenDone(void);
|
||||
extern void SendCommandListToAllWorkers(List *commandList, char *superuser);
|
||||
extern void SendOptionalCommandListToAllWorkers(List *commandList, char *superuser);
|
||||
extern void SendCommandToAllWorkers(char *command, char *superuser);
|
||||
extern void SendCommandListToWorkerInSingleTransaction(const char *nodeName,
|
||||
int32 nodePort,
|
||||
const char *nodeUser,
|
||||
|
|
|
@ -0,0 +1,124 @@
|
|||
CREATE SCHEMA adaptive_executor;
|
||||
SET search_path TO adaptive_executor;
|
||||
SET citus.task_executor_type to 'adaptive';
|
||||
SET citus.shard_replication_factor to 1;
|
||||
SET citus.enable_repartition_joins TO true;
|
||||
CREATE TABLE ab(a int, b int);
|
||||
SELECT create_distributed_table('ab', 'a');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO ab SELECT *,* FROM generate_series(1,10);
|
||||
SELECT COUNT(*) FROM ab k, ab l
|
||||
WHERE k.a = l.b;
|
||||
count
|
||||
-------
|
||||
10
|
||||
(1 row)
|
||||
|
||||
SELECT COUNT(*) FROM ab k, ab l, ab m, ab t
|
||||
WHERE k.a = l.b AND k.a = m.b AND t.b = l.a;
|
||||
count
|
||||
-------
|
||||
10
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM (SELECT k.a FROM ab k, ab l WHERE k.a = l.b) first, (SELECT * FROM ab) second WHERE first.a = second.b;
|
||||
count
|
||||
-------
|
||||
10
|
||||
(1 row)
|
||||
|
||||
BEGIN;
|
||||
SELECT count(*) FROM (SELECT k.a FROM ab k, ab l WHERE k.a = l.b) first, (SELECT * FROM ab) second WHERE first.a = second.b;
|
||||
count
|
||||
-------
|
||||
10
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM (SELECT k.a FROM ab k, ab l WHERE k.a = l.b) first, (SELECT * FROM ab) second WHERE first.a = second.b;
|
||||
count
|
||||
-------
|
||||
10
|
||||
(1 row)
|
||||
|
||||
SELECT count(*) FROM (SELECT k.a FROM ab k, ab l WHERE k.a = l.b) first, (SELECT * FROM ab) second WHERE first.a = second.b;
|
||||
count
|
||||
-------
|
||||
10
|
||||
(1 row)
|
||||
|
||||
ROLLBACK;
|
||||
BEGIN;
|
||||
INSERT INTO ab values(1, 2);
|
||||
-- DDL happened before repartition query in a transaction block, so this should error.
|
||||
SELECT count(*) FROM (SELECT k.a FROM ab k, ab l WHERE k.a = l.b) first, (SELECT * FROM ab) second WHERE first.a = second.b;
|
||||
ERROR: cannot open new connections after the first modification command within a transaction
|
||||
ROLLBACK;
|
||||
SET citus.enable_single_hash_repartition_joins TO ON;
|
||||
CREATE TABLE single_hash_repartition_first (id int, sum int, avg float);
|
||||
CREATE TABLE single_hash_repartition_second (id int, sum int, avg float);
|
||||
CREATE TABLE ref_table (id int, sum int, avg float);
|
||||
SELECT create_distributed_table('single_hash_repartition_first', 'id');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('single_hash_repartition_second', 'id');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_reference_table('ref_table');
|
||||
create_reference_table
|
||||
------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- single hash repartition after bcast joins
|
||||
EXPLAIN SELECT
|
||||
count(*)
|
||||
FROM
|
||||
ref_table r1, single_hash_repartition_second t1, single_hash_repartition_first t2
|
||||
WHERE
|
||||
r1.id = t1.id AND t2.sum = t1.id;
|
||||
QUERY PLAN
|
||||
----------------------------------------------------------------------
|
||||
Aggregate (cost=0.00..0.00 rows=0 width=0)
|
||||
-> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0)
|
||||
Task Count: 4
|
||||
Tasks Shown: None, not supported for re-partition queries
|
||||
-> MapMergeJob
|
||||
Map Task Count: 4
|
||||
Merge Task Count: 4
|
||||
(7 rows)
|
||||
|
||||
-- a more complicated join order, first colocated join, later single hash repartition join
|
||||
EXPLAIN SELECT
|
||||
count(*)
|
||||
FROM
|
||||
single_hash_repartition_first t1, single_hash_repartition_first t2, single_hash_repartition_second t3
|
||||
WHERE
|
||||
t1.id = t2.id AND t1.sum = t3.id;
|
||||
QUERY PLAN
|
||||
----------------------------------------------------------------------
|
||||
Aggregate (cost=0.00..0.00 rows=0 width=0)
|
||||
-> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0)
|
||||
Task Count: 4
|
||||
Tasks Shown: None, not supported for re-partition queries
|
||||
-> MapMergeJob
|
||||
Map Task Count: 4
|
||||
Merge Task Count: 4
|
||||
(7 rows)
|
||||
|
||||
SET citus.enable_single_hash_repartition_joins TO OFF;
|
||||
DROP SCHEMA adaptive_executor CASCADE;
|
||||
NOTICE: drop cascades to 4 other objects
|
||||
DETAIL: drop cascades to table ab
|
||||
drop cascades to table single_hash_repartition_first
|
||||
drop cascades to table single_hash_repartition_second
|
||||
drop cascades to table ref_table
|
|
@ -1,4 +1,5 @@
|
|||
SET citus.enable_repartition_joins to ON;
|
||||
SET citus.task_executor_type to 'task-tracker';
|
||||
SET citus.max_intermediate_result_size TO 2;
|
||||
-- should fail because the copy size is ~4kB for each cte
|
||||
WITH cte AS
|
||||
|
@ -295,3 +296,4 @@ SELECT * FROM cte ORDER BY 1,2,3,4,5 LIMIT 10;
|
|||
1 | Wed Nov 22 22:51:43.132261 2017 | 4 | 1 | 2
|
||||
(10 rows)
|
||||
|
||||
SET citus.task_executor_type to 'adaptive';
|
||||
|
|
|
@ -121,12 +121,10 @@ DEBUG: pruning merge fetch taskId 10
|
|||
DETAIL: Creating dependency on merge taskId 12
|
||||
DEBUG: pruning merge fetch taskId 11
|
||||
DETAIL: Creating dependency on merge taskId 12
|
||||
DEBUG: cannot use adaptive executor with repartition jobs
|
||||
HINT: Since you enabled citus.enable_repartition_joins Citus chose to use task-tracker.
|
||||
QUERY PLAN
|
||||
-------------------------------------------------------------------
|
||||
Aggregate
|
||||
-> Custom Scan (Citus Task-Tracker)
|
||||
-> Custom Scan (Citus Adaptive)
|
||||
Task Count: 4
|
||||
Tasks Shown: None, not supported for re-partition queries
|
||||
-> MapMergeJob
|
||||
|
@ -188,12 +186,10 @@ DEBUG: pruning merge fetch taskId 10
|
|||
DETAIL: Creating dependency on merge taskId 12
|
||||
DEBUG: pruning merge fetch taskId 11
|
||||
DETAIL: Creating dependency on merge taskId 12
|
||||
DEBUG: cannot use adaptive executor with repartition jobs
|
||||
HINT: Since you enabled citus.enable_repartition_joins Citus chose to use task-tracker.
|
||||
QUERY PLAN
|
||||
-------------------------------------------------------------------
|
||||
Aggregate
|
||||
-> Custom Scan (Citus Task-Tracker)
|
||||
-> Custom Scan (Citus Adaptive)
|
||||
Task Count: 4
|
||||
Tasks Shown: None, not supported for re-partition queries
|
||||
-> MapMergeJob
|
||||
|
@ -257,12 +253,10 @@ DEBUG: pruning merge fetch taskId 10
|
|||
DETAIL: Creating dependency on merge taskId 12
|
||||
DEBUG: pruning merge fetch taskId 11
|
||||
DETAIL: Creating dependency on merge taskId 12
|
||||
DEBUG: cannot use adaptive executor with repartition jobs
|
||||
HINT: Since you enabled citus.enable_repartition_joins Citus chose to use task-tracker.
|
||||
QUERY PLAN
|
||||
-------------------------------------------------------------------
|
||||
Aggregate
|
||||
-> Custom Scan (Citus Task-Tracker)
|
||||
-> Custom Scan (Citus Adaptive)
|
||||
Task Count: 4
|
||||
Tasks Shown: None, not supported for re-partition queries
|
||||
-> MapMergeJob
|
||||
|
|
|
@ -423,8 +423,6 @@ GROUP BY
|
|||
types
|
||||
ORDER BY
|
||||
types;
|
||||
DEBUG: cannot use adaptive executor with repartition jobs
|
||||
HINT: Since you enabled citus.enable_repartition_joins Citus chose to use task-tracker.
|
||||
DEBUG: generating subplan 16_1 for subquery SELECT max(events."time") AS max, 0 AS event, events.user_id FROM public.events_table events, public.users_table users WHERE ((events.user_id OPERATOR(pg_catalog.=) users.value_2) AND (events.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[1, 2]))) GROUP BY events.user_id
|
||||
DEBUG: generating subplan 16_2 for subquery SELECT "time", event, user_id FROM (SELECT events."time", 0 AS event, events.user_id FROM public.events_table events WHERE (events.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[1, 2]))) events_subquery_1
|
||||
DEBUG: generating subplan 16_3 for subquery SELECT "time", event, user_id FROM (SELECT events."time", 2 AS event, events.user_id FROM public.events_table events WHERE (events.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[3, 4]))) events_subquery_3
|
||||
|
|
|
@ -42,8 +42,6 @@ SET citus.shard_replication_factor to 1;
|
|||
SET citus.enable_repartition_joins to ON;
|
||||
SELECT count(*) FROM test_table_1, test_table_2 WHERE test_table_1.id = test_table_2.id;
|
||||
LOG: join order: [ "test_table_1" ][ single range partition join "test_table_2" ]
|
||||
DEBUG: cannot use adaptive executor with repartition jobs
|
||||
HINT: Since you enabled citus.enable_repartition_joins Citus chose to use task-tracker.
|
||||
count
|
||||
-------
|
||||
9
|
||||
|
|
|
@ -36,7 +36,6 @@ FROM
|
|||
(SELECT users_table.user_id FROM users_table, events_table WHERE users_table.user_id = events_table.user_id AND event_type IN (5,6,7,8)) as bar
|
||||
WHERE
|
||||
foo.user_id = bar.user_id;$$);
|
||||
DEBUG: cannot use adaptive executor with repartition jobs
|
||||
DEBUG: generating subplan 1_1 for subquery SELECT users_table.user_id, random() AS random FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[1, 2, 3, 4])))
|
||||
DEBUG: Plan 1 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.user_id, intermediate_result.random FROM read_intermediate_result('1_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, random double precision)) foo, (SELECT users_table.user_id FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[5, 6, 7, 8])))) bar WHERE (foo.user_id OPERATOR(pg_catalog.=) bar.user_id)
|
||||
valid
|
||||
|
@ -52,9 +51,7 @@ FROM
|
|||
(SELECT users_table.user_id FROM users_table, events_table WHERE users_table.user_id = events_table.value_2 AND event_type IN (5,6,7,8)) as bar
|
||||
WHERE
|
||||
foo.user_id = bar.user_id;$$);
|
||||
DEBUG: cannot use adaptive executor with repartition jobs
|
||||
DEBUG: generating subplan 3_1 for subquery SELECT users_table.user_id, random() AS random FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[1, 2, 3, 4])))
|
||||
DEBUG: cannot use adaptive executor with repartition jobs
|
||||
DEBUG: generating subplan 3_2 for subquery SELECT users_table.user_id FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[5, 6, 7, 8])))
|
||||
DEBUG: Plan 3 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.user_id, intermediate_result.random FROM read_intermediate_result('3_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, random double precision)) foo, (SELECT intermediate_result.user_id FROM read_intermediate_result('3_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) bar WHERE (foo.user_id OPERATOR(pg_catalog.=) bar.user_id)
|
||||
valid
|
||||
|
@ -76,7 +73,6 @@ WHERE
|
|||
users_table, events_table
|
||||
WHERE
|
||||
users_table.user_id = events_table.value_2 AND event_type IN (5,6));$$);
|
||||
DEBUG: cannot use adaptive executor with repartition jobs
|
||||
DEBUG: generating subplan 6_1 for subquery SELECT users_table.user_id FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[5, 6])))
|
||||
DEBUG: Plan 6 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM public.users_table WHERE (value_1 OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.user_id FROM read_intermediate_result('6_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)))
|
||||
valid
|
||||
|
@ -94,7 +90,6 @@ SELECT count(*) FROM q1, (SELECT
|
|||
WHERE
|
||||
users_table.user_id = events_table.value_2 AND event_type IN (1,2,3,4)) as bar WHERE bar.user_id = q1.user_id ;$$);
|
||||
DEBUG: generating subplan 8_1 for CTE q1: SELECT user_id FROM public.users_table
|
||||
DEBUG: cannot use adaptive executor with repartition jobs
|
||||
DEBUG: generating subplan 8_2 for subquery SELECT users_table.user_id, random() AS random FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[1, 2, 3, 4])))
|
||||
DEBUG: Plan 8 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.user_id FROM read_intermediate_result('8_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) q1, (SELECT intermediate_result.user_id, intermediate_result.random FROM read_intermediate_result('8_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, random double precision)) bar WHERE (bar.user_id OPERATOR(pg_catalog.=) q1.user_id)
|
||||
valid
|
||||
|
@ -106,7 +101,6 @@ DEBUG: Plan 8 query after replacing subqueries and CTEs: SELECT count(*) AS cou
|
|||
SELECT true AS valid FROM explain_json($$
|
||||
(SELECT users_table.user_id FROM users_table, events_table WHERE users_table.user_id = events_table.value_2 AND event_type IN (1,2,3,4)) UNION
|
||||
(SELECT users_table.user_id FROM users_table, events_table WHERE users_table.user_id = events_table.user_id AND event_type IN (5,6,7,8));$$);
|
||||
DEBUG: cannot use adaptive executor with repartition jobs
|
||||
DEBUG: generating subplan 11_1 for subquery SELECT users_table.user_id FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[1, 2, 3, 4])))
|
||||
DEBUG: generating subplan 11_2 for subquery SELECT users_table.user_id FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[5, 6, 7, 8])))
|
||||
DEBUG: Plan 11 query after replacing subqueries and CTEs: SELECT intermediate_result.user_id FROM read_intermediate_result('11_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer) UNION SELECT intermediate_result.user_id FROM read_intermediate_result('11_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)
|
||||
|
@ -143,11 +137,9 @@ FROM (
|
|||
) q
|
||||
ORDER BY 2 DESC, 1;
|
||||
$$);
|
||||
DEBUG: cannot use adaptive executor with repartition jobs
|
||||
DEBUG: generating subplan 14_1 for subquery SELECT users_table.user_id FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[1, 2, 3, 4])))
|
||||
DEBUG: push down of limit count: 5
|
||||
DEBUG: generating subplan 14_2 for subquery SELECT user_id FROM public.users_table WHERE ((value_2 OPERATOR(pg_catalog.>=) 5) AND (EXISTS (SELECT intermediate_result.user_id FROM read_intermediate_result('14_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)))) LIMIT 5
|
||||
DEBUG: cannot use adaptive executor with repartition jobs
|
||||
DEBUG: generating subplan 14_3 for subquery SELECT users_table.user_id FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[5, 6, 7, 8])))
|
||||
DEBUG: generating subplan 14_4 for subquery SELECT DISTINCT ON ((e.event_type)::text) (e.event_type)::text AS event, e."time", e.user_id FROM public.users_table u, public.events_table e, (SELECT intermediate_result.user_id FROM read_intermediate_result('14_3'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) bar WHERE ((u.user_id OPERATOR(pg_catalog.=) e.user_id) AND (u.user_id OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.user_id FROM read_intermediate_result('14_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer))))
|
||||
DEBUG: generating subplan 14_5 for subquery SELECT t.event, array_agg(t.user_id) AS events_table FROM (SELECT intermediate_result.event, intermediate_result."time", intermediate_result.user_id FROM read_intermediate_result('14_4'::text, 'binary'::citus_copy_format) intermediate_result(event text, "time" timestamp without time zone, user_id integer)) t, public.users_table WHERE (users_table.value_1 OPERATOR(pg_catalog.=) (t.event)::integer) GROUP BY t.event
|
||||
|
|
|
@ -162,7 +162,6 @@ SELECT true AS valid FROM explain_json_2($$
|
|||
foo.user_id = bar.user_id AND
|
||||
foo.event_type IN (SELECT event_type FROM events_table WHERE user_id < 4);
|
||||
$$);
|
||||
DEBUG: cannot use adaptive executor with repartition jobs
|
||||
DEBUG: generating subplan 16_1 for subquery SELECT users_table.user_id, events_table.event_type FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[1, 2, 3, 4])))
|
||||
DEBUG: generating subplan 16_2 for subquery SELECT event_type FROM public.events_table WHERE (user_id OPERATOR(pg_catalog.<) 4)
|
||||
DEBUG: Plan 16 query after replacing subqueries and CTEs: SELECT foo.user_id FROM (SELECT intermediate_result.user_id, intermediate_result.event_type FROM read_intermediate_result('16_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, event_type integer)) foo, (SELECT users_table.user_id FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[5, 6, 7, 8])))) bar WHERE ((foo.user_id OPERATOR(pg_catalog.=) bar.user_id) AND (foo.event_type OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.event_type FROM read_intermediate_result('16_2'::text, 'binary'::citus_copy_format) intermediate_result(event_type integer))))
|
||||
|
@ -187,9 +186,7 @@ SELECT true AS valid FROM explain_json_2($$
|
|||
|
||||
) as foo_top, events_table WHERE events_table.user_id = foo_top.user_id;
|
||||
$$);
|
||||
DEBUG: cannot use adaptive executor with repartition jobs
|
||||
DEBUG: generating subplan 19_1 for subquery SELECT users_table.user_id, events_table.event_type FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[1, 2, 3, 4])))
|
||||
DEBUG: cannot use adaptive executor with repartition jobs
|
||||
DEBUG: generating subplan 19_2 for subquery SELECT users_table.user_id FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.event_type) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[5, 6, 7, 8])))
|
||||
DEBUG: generating subplan 19_3 for subquery SELECT event_type FROM public.events_table WHERE (user_id OPERATOR(pg_catalog.=) 5)
|
||||
DEBUG: generating subplan 19_4 for subquery SELECT foo.user_id, random() AS random FROM (SELECT intermediate_result.user_id, intermediate_result.event_type FROM read_intermediate_result('19_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, event_type integer)) foo, (SELECT intermediate_result.user_id FROM read_intermediate_result('19_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) bar WHERE ((foo.user_id OPERATOR(pg_catalog.=) bar.user_id) AND (foo.event_type OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.event_type FROM read_intermediate_result('19_3'::text, 'binary'::citus_copy_format) intermediate_result(event_type integer))))
|
||||
|
@ -254,7 +251,6 @@ SELECT true AS valid FROM explain_json_2($$
|
|||
foo1.user_id = foo5.user_id
|
||||
) as foo_top;
|
||||
$$);
|
||||
DEBUG: cannot use adaptive executor with repartition jobs
|
||||
DEBUG: generating subplan 26_1 for subquery SELECT users_table.user_id, users_table.value_1 FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[17, 18, 19, 20])))
|
||||
DEBUG: Plan 26 query after replacing subqueries and CTEs: SELECT user_id, random FROM (SELECT foo1.user_id, random() AS random FROM (SELECT users_table.user_id, users_table.value_1 FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[1, 2, 3, 4])))) foo1, (SELECT users_table.user_id, users_table.value_1 FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[5, 6, 7, 8])))) foo2, (SELECT users_table.user_id, users_table.value_1 FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[9, 10, 11, 12])))) foo3, (SELECT users_table.user_id, users_table.value_1 FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[13, 14, 15, 16])))) foo4, (SELECT intermediate_result.user_id, intermediate_result.value_1 FROM read_intermediate_result('26_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, value_1 integer)) foo5 WHERE ((foo1.user_id OPERATOR(pg_catalog.=) foo4.user_id) AND (foo1.user_id OPERATOR(pg_catalog.=) foo2.user_id) AND (foo1.user_id OPERATOR(pg_catalog.=) foo3.user_id) AND (foo1.user_id OPERATOR(pg_catalog.=) foo4.user_id) AND (foo1.user_id OPERATOR(pg_catalog.=) foo5.user_id))) foo_top
|
||||
valid
|
||||
|
@ -284,7 +280,6 @@ SELECT true AS valid FROM explain_json_2($$
|
|||
foo1.user_id = foo5.value_1
|
||||
) as foo_top;
|
||||
$$);
|
||||
DEBUG: cannot use adaptive executor with repartition jobs
|
||||
DEBUG: generating subplan 28_1 for subquery SELECT users_table.user_id, users_table.value_1 FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[5, 6, 7, 8])))
|
||||
DEBUG: generating subplan 28_2 for subquery SELECT users_table.user_id, users_table.value_1 FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[17, 18, 19, 20])))
|
||||
DEBUG: Plan 28 query after replacing subqueries and CTEs: SELECT user_id, random FROM (SELECT foo1.user_id, random() AS random FROM (SELECT users_table.user_id, users_table.value_1 FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[1, 2, 3, 4])))) foo1, (SELECT intermediate_result.user_id, intermediate_result.value_1 FROM read_intermediate_result('28_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, value_1 integer)) foo2, (SELECT users_table.user_id, users_table.value_1 FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[9, 10, 11, 12])))) foo3, (SELECT users_table.user_id, users_table.value_1 FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[13, 14, 15, 16])))) foo4, (SELECT intermediate_result.user_id, intermediate_result.value_1 FROM read_intermediate_result('28_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, value_1 integer)) foo5 WHERE ((foo1.user_id OPERATOR(pg_catalog.=) foo4.user_id) AND (foo1.user_id OPERATOR(pg_catalog.=) foo2.user_id) AND (foo1.user_id OPERATOR(pg_catalog.=) foo3.user_id) AND (foo1.user_id OPERATOR(pg_catalog.=) foo4.user_id) AND (foo1.user_id OPERATOR(pg_catalog.=) foo5.value_1))) foo_top
|
||||
|
@ -316,7 +311,6 @@ SELECT true AS valid FROM explain_json_2($$
|
|||
foo2.user_id = foo5.value_1
|
||||
) as foo_top;
|
||||
$$);
|
||||
DEBUG: cannot use adaptive executor with repartition jobs
|
||||
DEBUG: generating subplan 31_1 for subquery SELECT users_table.user_id, users_table.value_1 FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[5, 6, 7, 8])))
|
||||
DEBUG: generating subplan 31_2 for subquery SELECT users_table.user_id, users_table.value_1 FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[17, 18, 19, 20])))
|
||||
DEBUG: Plan 31 query after replacing subqueries and CTEs: SELECT user_id, random FROM (SELECT foo1.user_id, random() AS random FROM (SELECT users_table.user_id, users_table.value_1 FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[1, 2, 3, 4])))) foo1, (SELECT intermediate_result.user_id, intermediate_result.value_1 FROM read_intermediate_result('31_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, value_1 integer)) foo2, (SELECT users_table.user_id, users_table.value_1 FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[9, 10, 11, 12])))) foo3, (SELECT users_table.user_id, users_table.value_1 FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[13, 14, 15, 16])))) foo4, (SELECT intermediate_result.user_id, intermediate_result.value_1 FROM read_intermediate_result('31_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, value_1 integer)) foo5 WHERE ((foo1.user_id OPERATOR(pg_catalog.=) foo4.user_id) AND (foo1.user_id OPERATOR(pg_catalog.=) foo2.user_id) AND (foo1.user_id OPERATOR(pg_catalog.=) foo3.user_id) AND (foo1.user_id OPERATOR(pg_catalog.=) foo4.user_id) AND (foo2.user_id OPERATOR(pg_catalog.=) foo5.value_1))) foo_top
|
||||
|
@ -350,9 +344,7 @@ SELECT true AS valid FROM explain_json_2($$
|
|||
foo.user_id = bar.user_id) as bar_top
|
||||
ON (foo_top.user_id = bar_top.user_id);
|
||||
$$);
|
||||
DEBUG: cannot use adaptive executor with repartition jobs
|
||||
DEBUG: generating subplan 34_1 for subquery SELECT users_table.user_id FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[1, 2, 3, 4])))
|
||||
DEBUG: cannot use adaptive executor with repartition jobs
|
||||
DEBUG: generating subplan 34_2 for subquery SELECT users_table.user_id FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[1, 2, 3, 4])))
|
||||
DEBUG: Plan 34 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT foo.user_id FROM (SELECT intermediate_result.user_id FROM read_intermediate_result('34_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) foo, (SELECT users_table.user_id FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[5, 6, 7, 8])))) bar WHERE (foo.user_id OPERATOR(pg_catalog.=) bar.user_id)) foo_top JOIN (SELECT foo.user_id FROM (SELECT intermediate_result.user_id FROM read_intermediate_result('34_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) foo, (SELECT users_table.user_id FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[5, 6, 7, 8])))) bar WHERE (foo.user_id OPERATOR(pg_catalog.=) bar.user_id)) bar_top ON ((foo_top.user_id OPERATOR(pg_catalog.=) bar_top.user_id)))
|
||||
valid
|
||||
|
@ -417,7 +409,6 @@ SELECT true AS valid FROM explain_json_2($$
|
|||
foo.user_id = bar.user_id) as bar_top
|
||||
ON (foo_top.value_2 = bar_top.user_id);
|
||||
$$);
|
||||
DEBUG: cannot use adaptive executor with repartition jobs
|
||||
DEBUG: generating subplan 39_1 for subquery SELECT DISTINCT users_table.user_id FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[13, 14, 15, 16])))
|
||||
DEBUG: generating subplan 39_2 for subquery SELECT foo.user_id FROM (SELECT DISTINCT users_table.user_id FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[9, 10, 11, 12])))) foo, (SELECT intermediate_result.user_id FROM read_intermediate_result('39_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) bar WHERE (foo.user_id OPERATOR(pg_catalog.=) bar.user_id)
|
||||
DEBUG: Plan 39 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT foo.user_id, foo.value_2 FROM (SELECT DISTINCT users_table.user_id, users_table.value_2 FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[1, 2, 3, 4])))) foo, (SELECT DISTINCT users_table.user_id FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[5, 6, 7, 8])))) bar WHERE (foo.user_id OPERATOR(pg_catalog.=) bar.user_id)) foo_top JOIN (SELECT intermediate_result.user_id FROM read_intermediate_result('39_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) bar_top ON ((foo_top.value_2 OPERATOR(pg_catalog.=) bar_top.user_id)))
|
||||
|
@ -439,7 +430,6 @@ SELECT true AS valid FROM explain_json_2($$
|
|||
WHERE foo.my_users = users_table.user_id) as mid_level_query
|
||||
) as bar;
|
||||
$$);
|
||||
DEBUG: cannot use adaptive executor with repartition jobs
|
||||
DEBUG: generating subplan 42_1 for subquery SELECT events_table.user_id AS my_users FROM public.events_table, public.users_table WHERE (events_table.event_type OPERATOR(pg_catalog.=) users_table.user_id)
|
||||
DEBUG: Plan 42 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT mid_level_query.user_id FROM (SELECT DISTINCT users_table.user_id FROM public.users_table, (SELECT intermediate_result.my_users FROM read_intermediate_result('42_1'::text, 'binary'::citus_copy_format) intermediate_result(my_users integer)) foo WHERE (foo.my_users OPERATOR(pg_catalog.=) users_table.user_id)) mid_level_query) bar
|
||||
valid
|
||||
|
@ -536,7 +526,6 @@ WHERE
|
|||
users_table, events_table
|
||||
WHERE
|
||||
users_table.user_id = events_table.value_2 AND event_type IN (5,6));$$);
|
||||
DEBUG: cannot use adaptive executor with repartition jobs
|
||||
DEBUG: generating subplan 50_1 for subquery SELECT users_table.user_id FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[5, 6])))
|
||||
DEBUG: Plan 50 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM public.users_table WHERE (value_1 OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.user_id FROM read_intermediate_result('50_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)))
|
||||
valid
|
||||
|
@ -554,7 +543,6 @@ SELECT count(*) FROM q1, (SELECT
|
|||
WHERE
|
||||
users_table.user_id = events_table.value_2 AND event_type IN (1,2,3,4)) as bar WHERE bar.user_id = q1.user_id ;$$);
|
||||
DEBUG: generating subplan 52_1 for CTE q1: SELECT user_id FROM public.users_table
|
||||
DEBUG: cannot use adaptive executor with repartition jobs
|
||||
DEBUG: generating subplan 52_2 for subquery SELECT users_table.user_id, random() AS random FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[1, 2, 3, 4])))
|
||||
DEBUG: Plan 52 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.user_id FROM read_intermediate_result('52_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) q1, (SELECT intermediate_result.user_id, intermediate_result.random FROM read_intermediate_result('52_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, random double precision)) bar WHERE (bar.user_id OPERATOR(pg_catalog.=) q1.user_id)
|
||||
valid
|
||||
|
@ -582,7 +570,6 @@ DEBUG: Plan 55 query after replacing subqueries and CTEs: SELECT count(*) AS co
|
|||
SELECT true AS valid FROM explain_json_2($$
|
||||
(SELECT users_table.user_id FROM users_table, events_table WHERE users_table.user_id = events_table.value_2 AND event_type IN (1,2,3,4)) UNION
|
||||
(SELECT users_table.user_id FROM users_table, events_table WHERE users_table.user_id = events_table.user_id AND event_type IN (5,6,7,8));$$);
|
||||
DEBUG: cannot use adaptive executor with repartition jobs
|
||||
DEBUG: generating subplan 57_1 for subquery SELECT users_table.user_id FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[1, 2, 3, 4])))
|
||||
DEBUG: generating subplan 57_2 for subquery SELECT users_table.user_id FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.user_id) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[5, 6, 7, 8])))
|
||||
DEBUG: Plan 57 query after replacing subqueries and CTEs: SELECT intermediate_result.user_id FROM read_intermediate_result('57_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer) UNION SELECT intermediate_result.user_id FROM read_intermediate_result('57_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)
|
||||
|
@ -619,11 +606,9 @@ FROM (
|
|||
) q
|
||||
ORDER BY 2 DESC, 1;
|
||||
$$);
|
||||
DEBUG: cannot use adaptive executor with repartition jobs
|
||||
DEBUG: generating subplan 60_1 for subquery SELECT users_table.user_id FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[1, 2, 3, 4])))
|
||||
DEBUG: push down of limit count: 5
|
||||
DEBUG: generating subplan 60_2 for subquery SELECT user_id FROM public.users_table WHERE ((value_2 OPERATOR(pg_catalog.>=) 5) AND (EXISTS (SELECT intermediate_result.user_id FROM read_intermediate_result('60_1'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)))) LIMIT 5
|
||||
DEBUG: cannot use adaptive executor with repartition jobs
|
||||
DEBUG: generating subplan 60_3 for subquery SELECT users_table.user_id FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (events_table.event_type OPERATOR(pg_catalog.=) ANY (ARRAY[5, 6, 7, 8])))
|
||||
DEBUG: generating subplan 60_4 for subquery SELECT DISTINCT ON ((e.event_type)::text) (e.event_type)::text AS event, e."time", e.user_id FROM public.users_table u, public.events_table e, (SELECT intermediate_result.user_id FROM read_intermediate_result('60_3'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) bar WHERE ((u.user_id OPERATOR(pg_catalog.=) e.user_id) AND (u.user_id OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.user_id FROM read_intermediate_result('60_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer))))
|
||||
DEBUG: generating subplan 60_5 for subquery SELECT t.event, array_agg(t.user_id) AS events_table FROM (SELECT intermediate_result.event, intermediate_result."time", intermediate_result.user_id FROM read_intermediate_result('60_4'::text, 'binary'::citus_copy_format) intermediate_result(event text, "time" timestamp without time zone, user_id integer)) t, public.users_table WHERE (users_table.value_1 OPERATOR(pg_catalog.=) (t.event)::integer) GROUP BY t.event
|
||||
|
@ -654,7 +639,6 @@ SELECT true AS valid FROM explain_json_2($$
|
|||
FROM
|
||||
(SELECT * FROM users_table u1 JOIN users_table u2 using(value_1)) a JOIN (SELECT value_1, random() FROM users_table) as u3 USING (value_1);
|
||||
$$);
|
||||
DEBUG: cannot use adaptive executor with repartition jobs
|
||||
DEBUG: generating subplan 68_1 for subquery SELECT u1.value_1, u1.user_id, u1."time", u1.value_2, u1.value_3, u1.value_4, u2.user_id, u2."time", u2.value_2, u2.value_3, u2.value_4 FROM (public.users_table u1 JOIN public.users_table u2 USING (value_1))
|
||||
DEBUG: Plan 68 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.value_1, intermediate_result.user_id, intermediate_result."time", intermediate_result.value_2, intermediate_result.value_3, intermediate_result.value_4, intermediate_result.user_id_1 AS user_id, intermediate_result.time_1 AS "time", intermediate_result.value_2_1 AS value_2, intermediate_result.value_3_1 AS value_3, intermediate_result.value_4_1 AS value_4 FROM read_intermediate_result('68_1'::text, 'binary'::citus_copy_format) intermediate_result(value_1 integer, user_id integer, "time" timestamp without time zone, value_2 integer, value_3 double precision, value_4 bigint, user_id_1 integer, time_1 timestamp without time zone, value_2_1 integer, value_3_1 double precision, value_4_1 bigint)) a(value_1, user_id, "time", value_2, value_3, value_4, user_id_1, time_1, value_2_1, value_3_1, value_4_1) JOIN (SELECT users_table.value_1, random() AS random FROM public.users_table) u3 USING (value_1))
|
||||
valid
|
||||
|
|
|
@ -56,8 +56,6 @@ UPDATE distributed_table SET dept = foo.some_tenants::int FROM
|
|||
DISTINCT second_distributed_table.tenant_id as some_tenants
|
||||
FROM second_distributed_table, distributed_table WHERE second_distributed_table.dept = distributed_table.dept
|
||||
) as foo;
|
||||
DEBUG: cannot use adaptive executor with repartition jobs
|
||||
HINT: Since you enabled citus.enable_repartition_joins Citus chose to use task-tracker.
|
||||
DEBUG: generating subplan 8_1 for subquery SELECT DISTINCT second_distributed_table.tenant_id AS some_tenants FROM recursive_dml_with_different_planner_executors.second_distributed_table, recursive_dml_with_different_planner_executors.distributed_table WHERE (second_distributed_table.dept OPERATOR(pg_catalog.=) distributed_table.dept)
|
||||
DEBUG: Plan 8 query after replacing subqueries and CTEs: UPDATE recursive_dml_with_different_planner_executors.distributed_table SET dept = (foo.some_tenants)::integer FROM (SELECT intermediate_result.some_tenants FROM read_intermediate_result('8_1'::text, 'binary'::citus_copy_format) intermediate_result(some_tenants text)) foo
|
||||
SET citus.enable_repartition_joins to OFF;
|
||||
|
|
|
@ -326,7 +326,7 @@ DEBUG: Plan is router executable
|
|||
2
|
||||
(2 rows)
|
||||
|
||||
SET citus.enable_repartition_joins TO ON;
|
||||
SET citus.task_executor_type TO 'task-tracker';
|
||||
-- repartition is recursively planned before the set operation
|
||||
(SELECT x FROM test) INTERSECT (SELECT t1.x FROM test as t1, test as t2 WHERE t1.x = t2.y LIMIT 2) INTERSECT (((SELECT x FROM local_test) UNION ALL (SELECT x FROM test)) INTERSECT (SELECT i FROM generate_series(0, 100) i)) ORDER BY 1 DESC;
|
||||
DEBUG: Local tables cannot be used in distributed queries.
|
||||
|
@ -360,8 +360,6 @@ DEBUG: pruning merge fetch taskId 10
|
|||
DETAIL: Creating dependency on merge taskId 20
|
||||
DEBUG: pruning merge fetch taskId 11
|
||||
DETAIL: Creating dependency on merge taskId 20
|
||||
DEBUG: cannot use adaptive executor with repartition jobs
|
||||
HINT: Since you enabled citus.enable_repartition_joins Citus chose to use task-tracker.
|
||||
DEBUG: generating subplan 53_1 for subquery SELECT t1.x FROM recursive_set_local.test t1, recursive_set_local.test t2 WHERE (t1.x OPERATOR(pg_catalog.=) t2.y) LIMIT 2
|
||||
DEBUG: generating subplan 53_2 for subquery SELECT x FROM recursive_set_local.local_test
|
||||
DEBUG: Router planner cannot handle multi-shard select queries
|
||||
|
@ -377,7 +375,7 @@ DEBUG: Plan is router executable
|
|||
1
|
||||
(2 rows)
|
||||
|
||||
SET citus.enable_repartition_joins TO OFF;
|
||||
SET citus.task_executor_type TO 'adaptive';
|
||||
RESET client_min_messages;
|
||||
DROP SCHEMA recursive_set_local CASCADE;
|
||||
NOTICE: drop cascades to 3 other objects
|
||||
|
|
|
@ -913,8 +913,6 @@ DEBUG: pruning merge fetch taskId 10
|
|||
DETAIL: Creating dependency on merge taskId 20
|
||||
DEBUG: pruning merge fetch taskId 11
|
||||
DETAIL: Creating dependency on merge taskId 20
|
||||
DEBUG: cannot use adaptive executor with repartition jobs
|
||||
HINT: Since you enabled citus.enable_repartition_joins Citus chose to use task-tracker.
|
||||
DEBUG: generating subplan 164_1 for subquery SELECT t1.x FROM recursive_union.test t1, recursive_union.test t2 WHERE (t1.x OPERATOR(pg_catalog.=) t2.y) LIMIT 0
|
||||
DEBUG: Router planner cannot handle multi-shard select queries
|
||||
DEBUG: generating subplan 164_2 for subquery SELECT x FROM recursive_union.test
|
||||
|
@ -957,8 +955,6 @@ DEBUG: pruning merge fetch taskId 10
|
|||
DETAIL: Creating dependency on merge taskId 20
|
||||
DEBUG: pruning merge fetch taskId 11
|
||||
DETAIL: Creating dependency on merge taskId 20
|
||||
DEBUG: cannot use adaptive executor with repartition jobs
|
||||
HINT: Since you enabled citus.enable_repartition_joins Citus chose to use task-tracker.
|
||||
DEBUG: generating subplan 167_1 for subquery SELECT t1.x FROM recursive_union.test t1, recursive_union.test t2 WHERE (t1.x OPERATOR(pg_catalog.=) t2.y)
|
||||
DEBUG: Router planner cannot handle multi-shard select queries
|
||||
DEBUG: generating subplan 167_2 for subquery SELECT x FROM recursive_union.test
|
||||
|
|
|
@ -72,8 +72,6 @@ FROM
|
|||
SELECT user_id FROM users_table
|
||||
) as bar
|
||||
WHERE foo.value_2 = bar.user_id;
|
||||
DEBUG: cannot use adaptive executor with repartition jobs
|
||||
HINT: Since you enabled citus.enable_repartition_joins Citus chose to use task-tracker.
|
||||
DEBUG: generating subplan 8_1 for subquery SELECT DISTINCT users_table.value_2 FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (users_table.user_id OPERATOR(pg_catalog.<) 2))
|
||||
DEBUG: Plan 8 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.value_2 FROM read_intermediate_result('8_1'::text, 'binary'::citus_copy_format) intermediate_result(value_2 integer)) foo, (SELECT users_table.user_id FROM public.users_table) bar WHERE (foo.value_2 OPERATOR(pg_catalog.=) bar.user_id)
|
||||
count
|
||||
|
@ -100,8 +98,6 @@ FROM
|
|||
WHERE foo.value_2 = bar.user_id AND baz.value_2 = bar.user_id AND bar.user_id = baw.user_id;
|
||||
DEBUG: generating subplan 10_1 for subquery SELECT value_2 FROM public.users_table WHERE (user_id OPERATOR(pg_catalog.=) 15) OFFSET 0
|
||||
DEBUG: generating subplan 10_2 for subquery SELECT user_id FROM public.users_table OFFSET 0
|
||||
DEBUG: cannot use adaptive executor with repartition jobs
|
||||
HINT: Since you enabled citus.enable_repartition_joins Citus chose to use task-tracker.
|
||||
DEBUG: generating subplan 10_3 for subquery SELECT DISTINCT users_table.value_2 FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (users_table.user_id OPERATOR(pg_catalog.<) 2))
|
||||
DEBUG: generating subplan 10_4 for subquery SELECT user_id FROM subquery_executor.users_table_local WHERE (user_id OPERATOR(pg_catalog.=) 2)
|
||||
DEBUG: Plan 10 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.value_2 FROM read_intermediate_result('10_1'::text, 'binary'::citus_copy_format) intermediate_result(value_2 integer)) foo, (SELECT intermediate_result.user_id FROM read_intermediate_result('10_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) bar, (SELECT intermediate_result.value_2 FROM read_intermediate_result('10_3'::text, 'binary'::citus_copy_format) intermediate_result(value_2 integer)) baz, (SELECT intermediate_result.user_id FROM read_intermediate_result('10_4'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) baw WHERE ((foo.value_2 OPERATOR(pg_catalog.=) bar.user_id) AND (baz.value_2 OPERATOR(pg_catalog.=) bar.user_id) AND (bar.user_id OPERATOR(pg_catalog.=) baw.user_id))
|
||||
|
|
|
@ -157,9 +157,7 @@ FROM
|
|||
(
|
||||
SELECT user_id FROM users_table
|
||||
) as bar
|
||||
WHERE foo.value_1 = bar.user_id;
|
||||
DEBUG: cannot use adaptive executor with repartition jobs
|
||||
HINT: Since you enabled citus.enable_repartition_joins Citus chose to use task-tracker.
|
||||
WHERE foo.value_1 = bar.user_id;
|
||||
DEBUG: generating subplan 14_1 for subquery SELECT DISTINCT p1.value_1 FROM subquery_and_partitioning.partitioning_test p1, subquery_and_partitioning.partitioning_test p2 WHERE (p1.id OPERATOR(pg_catalog.=) p2.value_1)
|
||||
DEBUG: Plan 14 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.value_1 FROM read_intermediate_result('14_1'::text, 'binary'::citus_copy_format) intermediate_result(value_1 integer)) foo, (SELECT users_table.user_id FROM public.users_table) bar WHERE (foo.value_1 OPERATOR(pg_catalog.=) bar.user_id)
|
||||
count
|
||||
|
|
|
@ -311,8 +311,6 @@ SELECT
|
|||
*
|
||||
FROM
|
||||
repartition_view;
|
||||
DEBUG: cannot use adaptive executor with repartition jobs
|
||||
HINT: Since you enabled citus.enable_repartition_joins Citus chose to use task-tracker.
|
||||
DEBUG: generating subplan 23_1 for subquery SELECT DISTINCT users_table.value_2 FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (users_table.user_id OPERATOR(pg_catalog.<) 2))
|
||||
DEBUG: generating subplan 23_2 for subquery SELECT count(*) AS count FROM (SELECT intermediate_result.value_2 FROM read_intermediate_result('23_1'::text, 'binary'::citus_copy_format) intermediate_result(value_2 integer)) foo, (SELECT users_table.user_id FROM public.users_table) bar WHERE (foo.value_2 OPERATOR(pg_catalog.=) bar.user_id)
|
||||
DEBUG: Plan 23 query after replacing subqueries and CTEs: SELECT count FROM (SELECT intermediate_result.count FROM read_intermediate_result('23_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) repartition_view
|
||||
|
@ -344,8 +342,6 @@ FROM
|
|||
all_executors_view;
|
||||
DEBUG: generating subplan 26_1 for subquery SELECT value_2 FROM public.users_table WHERE (user_id OPERATOR(pg_catalog.=) 15) OFFSET 0
|
||||
DEBUG: generating subplan 26_2 for subquery SELECT user_id FROM public.users_table OFFSET 0
|
||||
DEBUG: cannot use adaptive executor with repartition jobs
|
||||
HINT: Since you enabled citus.enable_repartition_joins Citus chose to use task-tracker.
|
||||
DEBUG: generating subplan 26_3 for subquery SELECT DISTINCT users_table.value_2 FROM public.users_table, public.events_table WHERE ((users_table.user_id OPERATOR(pg_catalog.=) events_table.value_2) AND (users_table.user_id OPERATOR(pg_catalog.<) 2))
|
||||
DEBUG: generating subplan 26_4 for subquery SELECT user_id FROM subquery_view.users_table_local WHERE (user_id OPERATOR(pg_catalog.=) 2)
|
||||
DEBUG: generating subplan 26_5 for subquery SELECT count(*) AS count FROM (SELECT intermediate_result.value_2 FROM read_intermediate_result('26_1'::text, 'binary'::citus_copy_format) intermediate_result(value_2 integer)) foo, (SELECT intermediate_result.user_id FROM read_intermediate_result('26_2'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) bar, (SELECT intermediate_result.value_2 FROM read_intermediate_result('26_3'::text, 'binary'::citus_copy_format) intermediate_result(value_2 integer)) baz, (SELECT intermediate_result.user_id FROM read_intermediate_result('26_4'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer)) baw WHERE ((foo.value_2 OPERATOR(pg_catalog.=) bar.user_id) AND (baz.value_2 OPERATOR(pg_catalog.=) bar.user_id) AND (bar.user_id OPERATOR(pg_catalog.=) baw.user_id))
|
||||
|
|
|
@ -113,6 +113,7 @@ test: multi_join_order_tpch_repartition
|
|||
# the output.
|
||||
# ----------
|
||||
test: multi_repartition_join_planning multi_repartition_join_pruning multi_repartition_join_task_assignment
|
||||
test: adaptive_executor_repartition
|
||||
|
||||
# ---------
|
||||
# Tests that modify data should run sequentially
|
||||
|
@ -124,6 +125,7 @@ test: with_prepare
|
|||
# ---------
|
||||
test: with_nested with_where with_basics with_set_operations
|
||||
test: with_modifying cte_prepared_modify cte_nested_modification
|
||||
test: ensure_no_intermediate_data_leak
|
||||
test: with_executors with_join with_partitioning with_transactions with_dml
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,62 @@
|
|||
CREATE SCHEMA adaptive_executor;
|
||||
SET search_path TO adaptive_executor;
|
||||
|
||||
SET citus.task_executor_type to 'adaptive';
|
||||
SET citus.shard_replication_factor to 1;
|
||||
SET citus.enable_repartition_joins TO true;
|
||||
|
||||
CREATE TABLE ab(a int, b int);
|
||||
SELECT create_distributed_table('ab', 'a');
|
||||
INSERT INTO ab SELECT *,* FROM generate_series(1,10);
|
||||
|
||||
SELECT COUNT(*) FROM ab k, ab l
|
||||
WHERE k.a = l.b;
|
||||
|
||||
SELECT COUNT(*) FROM ab k, ab l, ab m, ab t
|
||||
WHERE k.a = l.b AND k.a = m.b AND t.b = l.a;
|
||||
|
||||
SELECT count(*) FROM (SELECT k.a FROM ab k, ab l WHERE k.a = l.b) first, (SELECT * FROM ab) second WHERE first.a = second.b;
|
||||
|
||||
BEGIN;
|
||||
SELECT count(*) FROM (SELECT k.a FROM ab k, ab l WHERE k.a = l.b) first, (SELECT * FROM ab) second WHERE first.a = second.b;
|
||||
SELECT count(*) FROM (SELECT k.a FROM ab k, ab l WHERE k.a = l.b) first, (SELECT * FROM ab) second WHERE first.a = second.b;
|
||||
SELECT count(*) FROM (SELECT k.a FROM ab k, ab l WHERE k.a = l.b) first, (SELECT * FROM ab) second WHERE first.a = second.b;
|
||||
ROLLBACK;
|
||||
|
||||
BEGIN;
|
||||
INSERT INTO ab values(1, 2);
|
||||
-- DML (the INSERT above) happened before the repartition query in a transaction block, so this should error.
|
||||
SELECT count(*) FROM (SELECT k.a FROM ab k, ab l WHERE k.a = l.b) first, (SELECT * FROM ab) second WHERE first.a = second.b;
|
||||
ROLLBACK;
|
||||
|
||||
SET citus.enable_single_hash_repartition_joins TO ON;
|
||||
|
||||
CREATE TABLE single_hash_repartition_first (id int, sum int, avg float);
|
||||
CREATE TABLE single_hash_repartition_second (id int, sum int, avg float);
|
||||
CREATE TABLE ref_table (id int, sum int, avg float);
|
||||
|
||||
|
||||
SELECT create_distributed_table('single_hash_repartition_first', 'id');
|
||||
SELECT create_distributed_table('single_hash_repartition_second', 'id');
|
||||
SELECT create_reference_table('ref_table');
|
||||
|
||||
|
||||
-- single hash repartition after broadcast joins
|
||||
EXPLAIN SELECT
|
||||
count(*)
|
||||
FROM
|
||||
ref_table r1, single_hash_repartition_second t1, single_hash_repartition_first t2
|
||||
WHERE
|
||||
r1.id = t1.id AND t2.sum = t1.id;
|
||||
|
||||
-- a more complicated join order, first colocated join, later single hash repartition join
|
||||
EXPLAIN SELECT
|
||||
count(*)
|
||||
FROM
|
||||
single_hash_repartition_first t1, single_hash_repartition_first t2, single_hash_repartition_second t3
|
||||
WHERE
|
||||
t1.id = t2.id AND t1.sum = t3.id;
|
||||
|
||||
SET citus.enable_single_hash_repartition_joins TO OFF;
|
||||
|
||||
DROP SCHEMA adaptive_executor CASCADE;
|
|
@ -1,6 +1,6 @@
|
|||
SET citus.enable_repartition_joins to ON;
|
||||
|
||||
|
||||
SET citus.task_executor_type to 'task-tracker';
|
||||
SET citus.max_intermediate_result_size TO 2;
|
||||
-- should fail because the copy size is ~4kB for each cte
|
||||
WITH cte AS
|
||||
|
@ -229,3 +229,5 @@ WITH cte AS (
|
|||
cte2.user_id = cte3.user_id AND cte2.user_id = 1
|
||||
)
|
||||
SELECT * FROM cte ORDER BY 1,2,3,4,5 LIMIT 10;
|
||||
|
||||
SET citus.task_executor_type to 'adaptive';
|
||||
|
|
|
@ -91,12 +91,12 @@ SELECT * FROM ((SELECT * FROM local_test) INTERSECT (SELECT * FROM test ORDER BY
|
|||
-- set operations and the sublink can be recursively planned
|
||||
SELECT * FROM ((SELECT x FROM test) UNION (SELECT x FROM (SELECT x FROM local_test) as foo WHERE x IN (SELECT x FROM test))) u ORDER BY 1;
|
||||
|
||||
SET citus.enable_repartition_joins TO ON;
|
||||
SET citus.task_executor_type TO 'task-tracker';
|
||||
|
||||
-- repartition is recursively planned before the set operation
|
||||
(SELECT x FROM test) INTERSECT (SELECT t1.x FROM test as t1, test as t2 WHERE t1.x = t2.y LIMIT 2) INTERSECT (((SELECT x FROM local_test) UNION ALL (SELECT x FROM test)) INTERSECT (SELECT i FROM generate_series(0, 100) i)) ORDER BY 1 DESC;
|
||||
|
||||
SET citus.enable_repartition_joins TO OFF;
|
||||
SET citus.task_executor_type TO 'adaptive';
|
||||
|
||||
RESET client_min_messages;
|
||||
DROP SCHEMA recursive_set_local CASCADE;
|
||||
|
|
|
@ -144,7 +144,6 @@ ORDER BY
|
|||
1, 2, 3, 4
|
||||
LIMIT 10;
|
||||
|
||||
|
||||
-- All combined
|
||||
WITH cte AS (
|
||||
WITH task_tracker AS (
|
||||
|
|
Loading…
Reference in New Issue