From 1f838199f8786977a1483be6906a3fe320a67fcf Mon Sep 17 00:00:00 2001 From: Metin Doslu Date: Wed, 15 Feb 2017 18:00:54 +0200 Subject: [PATCH] Use CustomScan API for query execution Custom Scan is a node in the planned statement which helps external providers to abstract data scan not just for foreign data wrappers but also for regular relations so you can benefit your version of caching or hardware optimizations. This sounds like only an abstraction on the data scan layer, but we can use it as an abstraction for our distributed queries. The only thing we need to do is to find distributable parts of the query, plan for them and replace them with a Citus Custom Scan. Then, whenever PostgreSQL hits this custom scan node in its Volcano style execution, it will call our callback functions which run the distributed plan and provide tuples to the upper node as it scans a regular relation. This means fewer code changes, fewer bugs and more supported features for us! First, in the distributed query planner phase, we create a Custom Scan which wraps the distributed plan. For real-time and task-tracker executors, we add this custom plan under the master query plan. For router executor, we directly pass the custom plan because there is no master query. Then, we simply let the PostgreSQL executor run this plan. When it hits the custom scan node, we call the related executor parts for the distributed plan, fill the tuple store in the custom scan and return results to the PostgreSQL executor in Volcano style, a tuple per XXX_ExecScan() call. * Modify planner to utilize Custom Scan node. * Create different scan methods for different executors. * Use native PostgreSQL Explain for master part of queries. 
--- .../distributed/executor/multi_executor.c | 525 ++++++++++++------ .../executor/multi_router_executor.c | 246 ++++---- .../distributed/planner/multi_explain.c | 43 +- .../planner/multi_master_planner.c | 79 ++- .../planner/multi_physical_planner.c | 7 - .../distributed/planner/multi_planner.c | 358 +++++++----- .../planner/multi_router_planner.c | 16 +- src/backend/distributed/shared_library_init.c | 4 +- .../distributed/utils/citus_outfuncs.c | 1 - .../distributed/utils/citus_readfuncs.c | 1 - src/include/distributed/multi_executor.h | 34 +- .../distributed/multi_master_planner.h | 2 +- .../distributed/multi_physical_planner.h | 4 +- src/include/distributed/multi_planner.h | 7 +- .../distributed/multi_router_executor.h | 13 +- src/test/regress/expected/multi_explain.out | 81 +-- src/test/regress/expected/multi_explain_0.out | 81 +-- .../expected/multi_join_order_additional.out | 50 +- .../expected/multi_join_order_tpch_large.out | 28 +- .../expected/multi_join_order_tpch_small.out | 20 +- .../regress/expected/multi_join_pruning.out | 18 +- .../regress/expected/multi_mx_explain.out | 54 +- .../regress/expected/multi_mx_explain_0.out | 54 +- .../expected/multi_mx_reference_table.out | 6 + .../multi_mx_repartition_udt_prepare.out | 9 +- .../expected/multi_mx_router_planner.out | 11 +- .../expected/multi_mx_schema_support.out | 22 + .../expected/multi_partition_pruning.out | 18 +- .../expected/multi_reference_table.out | 6 + .../expected/multi_repartition_udt.out | 9 +- .../regress/expected/multi_router_planner.out | 11 +- .../regress/expected/multi_schema_support.out | 24 + .../expected/multi_task_assignment_policy.out | 42 +- .../expected/multi_utility_statements.out | 6 + .../regress/multi_task_tracker_extra_schedule | 6 +- src/test/regress/output/multi_subquery.source | 15 +- .../regress/output/multi_subquery_0.source | 191 +++---- .../sql/multi_join_order_additional.sql | 2 +- .../sql/multi_join_order_tpch_large.sql | 2 +- 
.../regress/sql/multi_mx_reference_table.sql | 1 + .../regress/sql/multi_mx_router_planner.sql | 1 + .../regress/sql/multi_mx_schema_support.sql | 4 + .../regress/sql/multi_reference_table.sql | 1 + src/test/regress/sql/multi_router_planner.sql | 1 + src/test/regress/sql/multi_schema_support.sql | 4 + .../regress/sql/multi_utility_statements.sql | 1 + 46 files changed, 1155 insertions(+), 964 deletions(-) diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index 502b8e862..dd2c26733 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -24,7 +24,6 @@ #include "distributed/multi_utility.h" #include "distributed/worker_protocol.h" #include "executor/execdebug.h" -#include "executor/executor.h" #include "commands/copy.h" #include "nodes/makefuncs.h" #include "storage/lmgr.h" @@ -34,212 +33,389 @@ /* - * FIXME: It'd probably be better to have different set of methods for: - * - router readonly queries - * - router modify - * - router insert ... select - * - real-time/task-tracker (no point in seperating those) - * - * I think it's better however to only have one type of CitusScanState, to - * allow to easily share code between routines. + * Define executor methods for the different executor types. 
*/ -static CustomExecMethods CitusCustomExecMethods = { - "CitusScan", - CitusBeginScan, - CitusExecScan, - CitusEndScan, - CitusReScan, -#if (PG_VERSION_NUM >= 90600) - NULL, /* NO EstimateDSMCustomScan callback */ - NULL, /* NO InitializeDSMCustomScan callback */ - NULL, /* NO InitializeWorkerCustomScan callback */ -#endif - NULL, - NULL, - CitusExplainScan +static CustomExecMethods RealTimeCustomExecMethods = { + .CustomName = "RealTimeScan", + .BeginCustomScan = CitusSelectBeginScan, + .ExecCustomScan = RealTimeExecScan, + .EndCustomScan = CitusEndScan, + .ReScanCustomScan = CitusReScan, + .ExplainCustomScan = CitusExplainScan +}; + +static CustomExecMethods TaskTrackerCustomExecMethods = { + .CustomName = "TaskTrackerScan", + .BeginCustomScan = CitusSelectBeginScan, + .ExecCustomScan = TaskTrackerExecScan, + .EndCustomScan = CitusEndScan, + .ReScanCustomScan = CitusReScan, + .ExplainCustomScan = CitusExplainScan +}; + +static CustomExecMethods RouterSingleModifyCustomExecMethods = { + .CustomName = "RouterSingleModifyScan", + .BeginCustomScan = CitusModifyBeginScan, + .ExecCustomScan = RouterSingleModifyExecScan, + .EndCustomScan = CitusEndScan, + .ReScanCustomScan = CitusReScan, + .ExplainCustomScan = CitusExplainScan +}; + +static CustomExecMethods RouterMultiModifyCustomExecMethods = { + .CustomName = "RouterMultiModifyScan", + .BeginCustomScan = CitusModifyBeginScan, + .ExecCustomScan = RouterMultiModifyExecScan, + .EndCustomScan = CitusEndScan, + .ReScanCustomScan = CitusReScan, + .ExplainCustomScan = CitusExplainScan +}; + +static CustomExecMethods RouterSelectCustomExecMethods = { + .CustomName = "RouterSelectScan", + .BeginCustomScan = CitusSelectBeginScan, + .ExecCustomScan = RouterSelectExecScan, + .EndCustomScan = CitusEndScan, + .ReScanCustomScan = CitusReScan, + .ExplainCustomScan = CitusExplainScan }; +/* local function forward declarations */ +static void PrepareMasterJobDirectory(Job *workerJob); +static void 
LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob); +static Relation StubRelation(TupleDesc tupleDescriptor); + + +/* + * RealTimeCreateScan creates the scan state for real-time executor queries. + */ Node * -CitusCreateScan(CustomScan *scan) +RealTimeCreateScan(CustomScan *scan) { CitusScanState *scanState = palloc0(sizeof(CitusScanState)); + scanState->executorType = MULTI_EXECUTOR_REAL_TIME; scanState->customScanState.ss.ps.type = T_CustomScanState; - scanState->customScanState.methods = &CitusCustomExecMethods; scanState->multiPlan = GetMultiPlan(scan); - scanState->executorType = JobExecutorType(scanState->multiPlan); + + scanState->customScanState.methods = &RealTimeCustomExecMethods; return (Node *) scanState; } -void -CitusBeginScan(CustomScanState *node, - EState *estate, - int eflags) +/* + * TaskTrackerCreateScan creates the scan state for task-tracker executor queries. + */ +Node * +TaskTrackerCreateScan(CustomScan *scan) { - CitusScanState *scanState = (CitusScanState *) node; - MultiPlan *multiPlan = scanState->multiPlan; + CitusScanState *scanState = palloc0(sizeof(CitusScanState)); - Assert(IsA(scanState, CustomScanState)); + scanState->executorType = MULTI_EXECUTOR_TASK_TRACKER; + scanState->customScanState.ss.ps.type = T_CustomScanState; + scanState->multiPlan = GetMultiPlan(scan); - /* ensure plan is executable */ - VerifyMultiPlanValidity(multiPlan); + scanState->customScanState.methods = &TaskTrackerCustomExecMethods; - /* ExecCheckRTPerms(planStatement->rtable, true); */ - - if (scanState->executorType == MULTI_EXECUTOR_ROUTER) - { - RouterBeginScan(scanState); - } + return (Node *) scanState; } -TupleTableSlot * -CitusExecScan(CustomScanState *node) +/* + * RouterCreateScan creates the scan state for router executor queries. 
+ */ +Node * +RouterCreateScan(CustomScan *scan) { - CitusScanState *scanState = (CitusScanState *) node; - MultiPlan *multiPlan = scanState->multiPlan; + CitusScanState *scanState = palloc0(sizeof(CitusScanState)); + MultiPlan *multiPlan = NULL; + Job *workerJob = NULL; + List *taskList = NIL; + bool isModificationQuery = false; - if (scanState->executorType == MULTI_EXECUTOR_ROUTER) + scanState->executorType = MULTI_EXECUTOR_ROUTER; + scanState->customScanState.ss.ps.type = T_CustomScanState; + scanState->multiPlan = GetMultiPlan(scan); + + multiPlan = scanState->multiPlan; + workerJob = multiPlan->workerJob; + taskList = workerJob->taskList; + + isModificationQuery = IsModifyMultiPlan(multiPlan); + + /* check if this is a single shard query */ + if (list_length(taskList) == 1) { - return RouterExecScan(scanState); + if (isModificationQuery) + { + scanState->customScanState.methods = &RouterSingleModifyCustomExecMethods; + } + else + { + scanState->customScanState.methods = &RouterSelectCustomExecMethods; + } } else { - TupleTableSlot *resultSlot = scanState->customScanState.ss.ps.ps_ResultTupleSlot; + Assert(isModificationQuery); + scanState->customScanState.methods = &RouterMultiModifyCustomExecMethods; + } - if (!scanState->finishedUnderlyingScan) + return (Node *) scanState; +} + + +/* + * DelayedErrorCreateScan is only called if we could not plan for the given + * query. This is the case when a plan is not ready for execution because + * CreateDistributedPlan() couldn't find a plan due to unresolved prepared + * statement parameters, but didn't error out, because we expect custom plans + * to come to our rescue. But sql (not plpgsql) functions unfortunately don't + * go through a codepath supporting custom plans. Here, we error out with this + * delayed error message. 
+ */ +Node * +DelayedErrorCreateScan(CustomScan *scan) +{ + MultiPlan *multiPlan = GetMultiPlan(scan); + + /* raise the deferred error */ + RaiseDeferredError(multiPlan->planningError, ERROR); + + return NULL; +} + + +/* + * CitusSelectBeginScan is an empty function for BeginCustomScan callback. + */ +void +CitusSelectBeginScan(CustomScanState *node, EState *estate, int eflags) +{ + /* just an empty function */ +} + + +/* + * RealTimeExecScan is a callback function which returns next tuple from a real-time + * execution. In the first call, it executes distributed real-time plan and loads + * results from temporary files into custom scan's tuple store. Then, it returns + * tuples one by one from this tuple store. + */ +TupleTableSlot * +RealTimeExecScan(CustomScanState *node) +{ + CitusScanState *scanState = (CitusScanState *) node; + TupleTableSlot *resultSlot = NULL; + + if (!scanState->finishedRemoteScan) + { + MultiPlan *multiPlan = scanState->multiPlan; + Job *workerJob = multiPlan->workerJob; + + PrepareMasterJobDirectory(workerJob); + MultiRealTimeExecute(workerJob); + + LoadTuplesIntoTupleStore(scanState, workerJob); + + scanState->finishedRemoteScan = true; + } + + resultSlot = ReturnTupleFromTuplestore(scanState); + + return resultSlot; +} + + +/* + * PrepareMasterJobDirectory creates a directory on the master node to keep job + * execution results. We also register this directory for automatic cleanup on + * portal delete. + */ +static void +PrepareMasterJobDirectory(Job *workerJob) +{ + StringInfo jobDirectoryName = MasterJobDirectoryName(workerJob->jobId); + CreateDirectory(jobDirectoryName); + + ResourceOwnerEnlargeJobDirectories(CurrentResourceOwner); + ResourceOwnerRememberJobDirectory(CurrentResourceOwner, workerJob->jobId); +} + + +/* + * Load data collected by real-time or task-tracker executors into the tuplestore + * of CitusScanState. For that, we first create a tuple store, and then copy the + * files one-by-one into the tuple store. 
+ * + * Note that in the long term it'd be a lot better if Multi*Execute() directly + * filled the tuplestores, but that's a fair bit of work. + */ +static void +LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob) +{ + CustomScanState customScanState = citusScanState->customScanState; + List *workerTaskList = workerJob->taskList; + List *copyOptions = NIL; + EState *executorState = NULL; + MemoryContext executorTupleContext = NULL; + ExprContext *executorExpressionContext = NULL; + TupleDesc tupleDescriptor = NULL; + Relation stubRelation = NULL; + ListCell *workerTaskCell = NULL; + uint32 columnCount = 0; + Datum *columnValues = NULL; + bool *columnNulls = NULL; + bool randomAccess = true; + bool interTransactions = false; + + executorState = citusScanState->customScanState.ss.ps.state; + executorTupleContext = GetPerTupleMemoryContext(executorState); + executorExpressionContext = GetPerTupleExprContext(executorState); + + tupleDescriptor = customScanState.ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor; + stubRelation = StubRelation(tupleDescriptor); + + columnCount = tupleDescriptor->natts; + columnValues = palloc0(columnCount * sizeof(Datum)); + columnNulls = palloc0(columnCount * sizeof(bool)); + + Assert(citusScanState->tuplestorestate == NULL); + citusScanState->tuplestorestate = + tuplestore_begin_heap(randomAccess, interTransactions, work_mem); + + if (BinaryMasterCopyFormat) + { + DefElem *copyOption = makeDefElem("format", (Node *) makeString("binary")); + copyOptions = lappend(copyOptions, copyOption); + } + + foreach(workerTaskCell, workerTaskList) + { + Task *workerTask = (Task *) lfirst(workerTaskCell); + StringInfo jobDirectoryName = NULL; + StringInfo taskFilename = NULL; + CopyState copyState = NULL; + + jobDirectoryName = MasterJobDirectoryName(workerTask->jobId); + taskFilename = TaskFilename(jobDirectoryName, workerTask->taskId); + + copyState = BeginCopyFrom(stubRelation, taskFilename->data, false, NULL, + copyOptions); + + 
while (true) { - Job *workerJob = multiPlan->workerJob; - StringInfo jobDirectoryName = NULL; - EState *executorState = scanState->customScanState.ss.ps.state; - List *workerTaskList = workerJob->taskList; - ListCell *workerTaskCell = NULL; - TupleDesc tupleDescriptor = NULL; - Relation fakeRel = NULL; - MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState); - ExprContext *executorExpressionContext = - GetPerTupleExprContext(executorState); - uint32 columnCount = 0; - Datum *columnValues = NULL; - bool *columnNulls = NULL; + MemoryContext oldContext = NULL; + bool nextRowFound = false; - /* - * We create a directory on the master node to keep task execution results. - * We also register this directory for automatic cleanup on portal delete. - */ - jobDirectoryName = MasterJobDirectoryName(workerJob->jobId); - CreateDirectory(jobDirectoryName); + ResetPerTupleExprContext(executorState); + oldContext = MemoryContextSwitchTo(executorTupleContext); - ResourceOwnerEnlargeJobDirectories(CurrentResourceOwner); - ResourceOwnerRememberJobDirectory(CurrentResourceOwner, workerJob->jobId); - - /* pick distributed executor to use */ - if (executorState->es_top_eflags & EXEC_FLAG_EXPLAIN_ONLY) + nextRowFound = NextCopyFrom(copyState, executorExpressionContext, + columnValues, columnNulls, NULL); + if (!nextRowFound) { - /* skip distributed query execution for EXPLAIN commands */ - } - else if (scanState->executorType == MULTI_EXECUTOR_REAL_TIME) - { - MultiRealTimeExecute(workerJob); - } - else if (scanState->executorType == MULTI_EXECUTOR_TASK_TRACKER) - { - MultiTaskTrackerExecute(workerJob); + MemoryContextSwitchTo(oldContext); + break; } - tupleDescriptor = node->ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor; - - /* - * Load data, collected by Multi*Execute() above, into a - * tuplestore. For that first create a tuplestore, and then copy - * the files one-by-one. - * - * FIXME: Should probably be in a separate routine. 
- * - * Long term it'd be a lot better if Multi*Execute() directly - * filled the tuplestores, but that's a fair bit of work. - */ - - /* - * To be able to use copy.c, we need a Relation descriptor. As - * there's no relation corresponding to the data loaded from - * workers, fake one. We just need the bare minimal set of fields - * accessed by BeginCopyFrom(). - * - * FIXME: should be abstracted into a separate function. - */ - fakeRel = palloc0(sizeof(RelationData)); - fakeRel->rd_att = tupleDescriptor; - fakeRel->rd_rel = palloc0(sizeof(FormData_pg_class)); - fakeRel->rd_rel->relkind = RELKIND_RELATION; - - columnCount = tupleDescriptor->natts; - columnValues = palloc0(columnCount * sizeof(Datum)); - columnNulls = palloc0(columnCount * sizeof(bool)); - - Assert(scanState->tuplestorestate == NULL); - scanState->tuplestorestate = tuplestore_begin_heap(false, false, work_mem); - - foreach(workerTaskCell, workerTaskList) - { - Task *workerTask = (Task *) lfirst(workerTaskCell); - StringInfo jobDirectoryName = MasterJobDirectoryName(workerTask->jobId); - StringInfo taskFilename = - TaskFilename(jobDirectoryName, workerTask->taskId); - List *copyOptions = NIL; - CopyState copyState = NULL; - - if (BinaryMasterCopyFormat) - { - DefElem *copyOption = makeDefElem("format", - (Node *) makeString("binary")); - copyOptions = lappend(copyOptions, copyOption); - } - copyState = BeginCopyFrom(fakeRel, taskFilename->data, false, NULL, - copyOptions); - - while (true) - { - MemoryContext oldContext = NULL; - bool nextRowFound = false; - - ResetPerTupleExprContext(executorState); - oldContext = MemoryContextSwitchTo(executorTupleContext); - - nextRowFound = NextCopyFrom(copyState, executorExpressionContext, - columnValues, columnNulls, NULL); - if (!nextRowFound) - { - MemoryContextSwitchTo(oldContext); - break; - } - - tuplestore_putvalues(scanState->tuplestorestate, - tupleDescriptor, - columnValues, columnNulls); - MemoryContextSwitchTo(oldContext); - } - } - - 
scanState->finishedUnderlyingScan = true; + tuplestore_putvalues(citusScanState->tuplestorestate, tupleDescriptor, + columnValues, columnNulls); + MemoryContextSwitchTo(oldContext); } - if (scanState->tuplestorestate != NULL) - { - Tuplestorestate *tupleStore = scanState->tuplestorestate; - tuplestore_gettupleslot(tupleStore, true, false, resultSlot); - - return resultSlot; - } - - return NULL; + EndCopyFrom(copyState); } } +/* + * StubRelation creates a stub Relation from the given tuple descriptor. + * To be able to use copy.c, we need a Relation descriptor. As there is no + * relation corresponding to the data loaded from workers, we need to fake one. + * We just need the bare minimal set of fields accessed by BeginCopyFrom(). + */ +static Relation +StubRelation(TupleDesc tupleDescriptor) +{ + Relation stubRelation = palloc0(sizeof(RelationData)); + stubRelation->rd_att = tupleDescriptor; + stubRelation->rd_rel = palloc0(sizeof(FormData_pg_class)); + stubRelation->rd_rel->relkind = RELKIND_RELATION; + + return stubRelation; +} + + +/* + * ReturnTupleFromTuplestore reads the next tuple from the tuple store of the + * given Citus scan node and returns it. It returns null if all tuples are read + * from the tuple store. 
+ */ +TupleTableSlot * +ReturnTupleFromTuplestore(CitusScanState *scanState) +{ + Tuplestorestate *tupleStore = scanState->tuplestorestate; + TupleTableSlot *resultSlot = NULL; + ScanDirection scanDirection = NoMovementScanDirection; + bool forwardScanDirection = true; + + if (tupleStore == NULL) + { + return NULL; + } + + scanDirection = scanState->customScanState.ss.ps.state->es_direction; + Assert(ScanDirectionIsValid(scanDirection)); + + if (ScanDirectionIsBackward(scanDirection)) + { + forwardScanDirection = false; + } + + resultSlot = scanState->customScanState.ss.ps.ps_ResultTupleSlot; + tuplestore_gettupleslot(tupleStore, forwardScanDirection, false, resultSlot); + + return resultSlot; +} + + +/* + * TaskTrackerExecScan is a callback function which returns next tuple from a + * task-tracker execution. In the first call, it executes distributed task-tracker + * plan and loads results from temporary files into custom scan's tuple store. + * Then, it returns tuples one by one from this tuple store. + */ +TupleTableSlot * +TaskTrackerExecScan(CustomScanState *node) +{ + CitusScanState *scanState = (CitusScanState *) node; + TupleTableSlot *resultSlot = NULL; + + if (!scanState->finishedRemoteScan) + { + MultiPlan *multiPlan = scanState->multiPlan; + Job *workerJob = multiPlan->workerJob; + + PrepareMasterJobDirectory(workerJob); + MultiTaskTrackerExecute(workerJob); + + LoadTuplesIntoTupleStore(scanState, workerJob); + + scanState->finishedRemoteScan = true; + } + + resultSlot = ReturnTupleFromTuplestore(scanState); + + return resultSlot; +} + + +/* + * CitusEndScan is used to clean up tuple store of the given custom scan state. + */ void CitusEndScan(CustomScanState *node) { @@ -253,17 +429,14 @@ CitusEndScan(CustomScanState *node) } +/* + * CitusReScan is just a place holder for rescan callback. Currently, we don't + * support rescan given that there is not any way to reach this code path. 
+ */ void CitusReScan(CustomScanState *node) { - CitusScanState *scanState = (CitusScanState *) node; - - scanState->tuplestorestate = NULL; - scanState->finishedUnderlyingScan = true; - - /* - * XXX: this probably already works, but if not should be easily - * supportable - probably hard to exercise right now though. - */ - elog(WARNING, "unsupported at this point"); + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("rescan is unsupported"), + errdetail("We don't expect this code path to be executed."))); } diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 6233b0d90..9a26090f8 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -77,26 +77,24 @@ static void ReacquireMetadataLocks(List *taskList); static void ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool expectResults); static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task); -static List * GetModifyConnections(List *taskPlacementList, - bool markCritical, +static List * GetModifyConnections(List *taskPlacementList, bool markCritical, bool startedInTransaction); static void ExecuteMultipleTasks(CitusScanState *scanState, List *taskList, bool isModificationQuery, bool expectResults); static int64 ExecuteModifyTasks(List *taskList, bool expectResults, - ParamListInfo paramListInfo, - CitusScanState *scanState, - TupleDesc tupleDescriptor); + ParamListInfo paramListInfo, CitusScanState *scanState); static List * TaskShardIntervalList(List *taskList); static void AcquireExecutorShardLock(Task *task, CmdType commandType); static void AcquireExecutorMultiShardLocks(List *taskList); static bool RequiresConsistentSnapshot(Task *task); +static void ProcessMasterEvaluableFunctions(Job *workerJob); static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo, Oid **parameterTypes, const 
char ***parameterValues); static bool SendQueryInSingleRowMode(MultiConnection *connection, char *query, ParamListInfo paramListInfo); static bool StoreQueryResult(CitusScanState *scanState, MultiConnection *connection, - TupleDesc tupleDescriptor, bool failOnError, int64 *rows); + bool failOnError, int64 *rows); static bool ConsumeQueryResult(MultiConnection *connection, bool failOnError, int64 *rows); @@ -407,9 +405,14 @@ RequiresConsistentSnapshot(Task *task) } +/* + * CitusModifyBeginScan checks the validity of the given custom scan node and + * gets locks on the shards involved in the task list of the distributed plan. + */ void -RouterBeginScan(CitusScanState *scanState) +CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags) { + CitusScanState *scanState = (CitusScanState *) node; MultiPlan *multiPlan = scanState->multiPlan; Job *workerJob = multiPlan->workerJob; List *taskList = workerJob->taskList; @@ -428,77 +431,115 @@ RouterBeginScan(CitusScanState *scanState) } +/* + * RouterSingleModifyExecScan executes a single modification query on a + * distributed plan and returns results if there is any. 
+ */ TupleTableSlot * -RouterExecScan(CitusScanState *scanState) +RouterSingleModifyExecScan(CustomScanState *node) { - MultiPlan *multiPlan = scanState->multiPlan; - TupleTableSlot *resultSlot = scanState->customScanState.ss.ps.ps_ResultTupleSlot; + CitusScanState *scanState = (CitusScanState *) node; + TupleTableSlot *resultSlot = NULL; - if (!scanState->finishedUnderlyingScan) + if (!scanState->finishedRemoteScan) { + MultiPlan *multiPlan = scanState->multiPlan; + bool hasReturning = multiPlan->hasReturning; Job *workerJob = multiPlan->workerJob; List *taskList = workerJob->taskList; - bool requiresMasterEvaluation = workerJob->requiresMasterEvaluation; - bool isModificationQuery = false; - CmdType operation = multiPlan->operation; + Task *task = (Task *) linitial(taskList); - /* should use IsModificationStmt or such */ - if (operation == CMD_INSERT || operation == CMD_UPDATE || - operation == CMD_DELETE) - { - isModificationQuery = true; - } + ProcessMasterEvaluableFunctions(workerJob); - if (requiresMasterEvaluation) - { - Query *jobQuery = workerJob->jobQuery; + ExecuteSingleModifyTask(scanState, task, hasReturning); - ExecuteMasterEvaluableFunctions(jobQuery); - RebuildQueryStrings(jobQuery, taskList); - } - - if (list_length(taskList) == 1) - { - Task *task = (Task *) linitial(taskList); - - if (isModificationQuery) - { - bool sendTuples = multiPlan->hasReturning; - ExecuteSingleModifyTask(scanState, task, sendTuples); - } - else - { - ExecuteSingleSelectTask(scanState, task); - } - } - else - { - bool sendTuples = multiPlan->hasReturning; - ExecuteMultipleTasks(scanState, taskList, isModificationQuery, - sendTuples); - } - - /* mark underlying query as having executed */ - scanState->finishedUnderlyingScan = true; + scanState->finishedRemoteScan = true; } - /* if the underlying query produced output, return it */ + resultSlot = ReturnTupleFromTuplestore(scanState); - /* - * FIXME: centralize this into function to be shared between router and - * other 
executors? - */ - if (scanState->tuplestorestate != NULL) + return resultSlot; +} + + +/* + * ProcessMasterEvaluableFunctions executes evaluable functions and rebuilds + * the query strings in task lists. + */ +static void +ProcessMasterEvaluableFunctions(Job *workerJob) +{ + if (workerJob->requiresMasterEvaluation) { - Tuplestorestate *tupleStore = scanState->tuplestorestate; + Query *jobQuery = workerJob->jobQuery; + List *taskList = workerJob->taskList; - /* XXX: could trivially support backward scans here */ - tuplestore_gettupleslot(tupleStore, true, false, resultSlot); + ExecuteMasterEvaluableFunctions(jobQuery); + RebuildQueryStrings(jobQuery, taskList); + } +} - return resultSlot; + +/* + * RouterMultiModifyExecScan executes a list of tasks on remote nodes, retrieves + * the results and, if RETURNING is used, stores them in custom scan's tuple store. + * Then, it returns tuples one by one from this tuple store. + */ +TupleTableSlot * +RouterMultiModifyExecScan(CustomScanState *node) +{ + CitusScanState *scanState = (CitusScanState *) node; + TupleTableSlot *resultSlot = NULL; + + if (!scanState->finishedRemoteScan) + { + MultiPlan *multiPlan = scanState->multiPlan; + Job *workerJob = multiPlan->workerJob; + List *taskList = workerJob->taskList; + bool hasReturning = multiPlan->hasReturning; + bool isModificationQuery = true; + + ProcessMasterEvaluableFunctions(workerJob); + + ExecuteMultipleTasks(scanState, taskList, isModificationQuery, hasReturning); + + scanState->finishedRemoteScan = true; } - return NULL; + resultSlot = ReturnTupleFromTuplestore(scanState); + + return resultSlot; +} + + +/* + * RouterSelectExecScan executes a single select task on the remote node, + * retrieves the results and stores them in custom scan's tuple store. Then, it + * returns tuples one by one from this tuple store. 
+ */ +TupleTableSlot * +RouterSelectExecScan(CustomScanState *node) +{ + CitusScanState *scanState = (CitusScanState *) node; + TupleTableSlot *resultSlot = NULL; + + if (!scanState->finishedRemoteScan) + { + MultiPlan *multiPlan = scanState->multiPlan; + Job *workerJob = multiPlan->workerJob; + List *taskList = workerJob->taskList; + Task *task = (Task *) linitial(taskList); + + ProcessMasterEvaluableFunctions(workerJob); + + ExecuteSingleSelectTask(scanState, task); + + scanState->finishedRemoteScan = true; + } + + resultSlot = ReturnTupleFromTuplestore(scanState); + + return resultSlot; } @@ -512,8 +553,6 @@ RouterExecScan(CitusScanState *scanState) static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task) { - TupleDesc tupleDescriptor = - scanState->customScanState.ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor; ParamListInfo paramListInfo = scanState->customScanState.ss.ps.state->es_param_list_info; List *taskPlacementList = task->taskPlacementList; @@ -547,8 +586,8 @@ ExecuteSingleSelectTask(CitusScanState *scanState, Task *task) continue; } - queryOK = StoreQueryResult(scanState, connection, tupleDescriptor, - dontFailOnError, ¤tAffectedTupleCount); + queryOK = StoreQueryResult(scanState, connection, dontFailOnError, + ¤tAffectedTupleCount); if (queryOK) { return; @@ -569,21 +608,19 @@ ExecuteSingleSelectTask(CitusScanState *scanState, Task *task) * framework), or errors out (failed on all placements). 
*/ static void -ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, - bool expectResults) +ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool expectResults) { CmdType operation = scanState->multiPlan->operation; - TupleDesc tupleDescriptor = - scanState->customScanState.ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor; EState *executorState = scanState->customScanState.ss.ps.state; ParamListInfo paramListInfo = executorState->es_param_list_info; - bool resultsOK = false; List *taskPlacementList = task->taskPlacementList; List *connectionList = NIL; ListCell *taskPlacementCell = NULL; ListCell *connectionCell = NULL; int64 affectedTupleCount = -1; + bool resultsOK = false; bool gotResults = false; + char *queryString = task->queryString; bool taskRequiresTwoPhaseCommit = (task->replicationModel == REPLICATION_MODEL_2PC); bool startedInTransaction = @@ -669,8 +706,8 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, */ if (!gotResults && expectResults) { - queryOK = StoreQueryResult(scanState, connection, tupleDescriptor, - failOnError, ¤tAffectedTupleCount); + queryOK = StoreQueryResult(scanState, connection, failOnError, + ¤tAffectedTupleCount); } else { @@ -804,8 +841,6 @@ static void ExecuteMultipleTasks(CitusScanState *scanState, List *taskList, bool isModificationQuery, bool expectResults) { - TupleDesc tupleDescriptor = - scanState->customScanState.ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor; EState *executorState = scanState->customScanState.ss.ps.state; ParamListInfo paramListInfo = executorState->es_param_list_info; int64 affectedTupleCount = -1; @@ -813,9 +848,8 @@ ExecuteMultipleTasks(CitusScanState *scanState, List *taskList, /* can only support modifications right now */ Assert(isModificationQuery); - /* XXX: Seems very redundant to pass both scanState and tupleDescriptor */ affectedTupleCount = ExecuteModifyTasks(taskList, expectResults, paramListInfo, - scanState, tupleDescriptor); + scanState); 
executorState->es_processed = affectedTupleCount; } @@ -831,7 +865,7 @@ ExecuteMultipleTasks(CitusScanState *scanState, List *taskList, int64 ExecuteModifyTasksWithoutResults(List *taskList) { - return ExecuteModifyTasks(taskList, false, NULL, NULL, NULL); + return ExecuteModifyTasks(taskList, false, NULL, NULL); } @@ -845,7 +879,7 @@ ExecuteModifyTasksWithoutResults(List *taskList) */ static int64 ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListInfo, - CitusScanState *scanState, TupleDesc tupleDescriptor) + CitusScanState *scanState) { int64 totalAffectedTupleCount = 0; ListCell *taskCell = NULL; @@ -929,8 +963,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn continue; } - connection = - (MultiConnection *) list_nth(connectionList, placementIndex); + connection = (MultiConnection *) list_nth(connectionList, placementIndex); queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo); if (!queryOK) @@ -975,10 +1008,10 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn */ if (placementIndex == 0 && expectResults) { - Assert(scanState != NULL && tupleDescriptor != NULL); + Assert(scanState != NULL); - queryOK = StoreQueryResult(scanState, connection, tupleDescriptor, - failOnError, ¤tAffectedTupleCount); + queryOK = StoreQueryResult(scanState, connection, failOnError, + ¤tAffectedTupleCount); } else { @@ -1184,13 +1217,17 @@ ExtractParametersFromParamListInfo(ParamListInfo paramListInfo, Oid **parameterT */ static bool StoreQueryResult(CitusScanState *scanState, MultiConnection *connection, - TupleDesc tupleDescriptor, bool failOnError, int64 *rows) + bool failOnError, int64 *rows) { + TupleDesc tupleDescriptor = + scanState->customScanState.ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor; AttInMetadata *attributeInputMetadata = TupleDescGetAttInMetadata(tupleDescriptor); - Tuplestorestate *tupleStore = NULL; List *targetList = 
scanState->customScanState.ss.ps.plan->targetlist; uint32 expectedColumnCount = ExecCleanTargetListLength(targetList); char **columnArray = (char **) palloc0(expectedColumnCount * sizeof(char *)); + Tuplestorestate *tupleStore = NULL; + bool randomAccess = true; + bool interTransactions = false; bool commandFailed = false; MemoryContext ioContext = AllocSetContextCreate(CurrentMemoryContext, "StoreQueryResult", @@ -1201,7 +1238,8 @@ StoreQueryResult(CitusScanState *scanState, MultiConnection *connection, if (scanState->tuplestorestate == NULL) { - scanState->tuplestorestate = tuplestore_begin_heap(false, false, work_mem); + scanState->tuplestorestate = + tuplestore_begin_heap(randomAccess, interTransactions, work_mem); } else if (!failOnError) { @@ -1403,39 +1441,3 @@ ConsumeQueryResult(MultiConnection *connection, bool failOnError, int64 *rows) return gotResponse && !commandFailed; } - - -/* - * RouterExecutorFinish cleans up after a distributed execution. - */ -void -RouterExecutorFinish(QueryDesc *queryDesc) -{ - EState *estate = queryDesc->estate; - Assert(estate != NULL); - - estate->es_finished = true; -} - - -/* - * RouterExecutorEnd cleans up the executor state after a distributed - * execution. 
- */ -void -RouterExecutorEnd(QueryDesc *queryDesc) -{ - EState *estate = queryDesc->estate; - MaterialState *routerState = (MaterialState *) queryDesc->planstate; - - if (routerState->tuplestorestate) - { - tuplestore_end(routerState->tuplestorestate); - } - - Assert(estate != NULL); - - FreeExecutorState(estate); - queryDesc->estate = NULL; - queryDesc->totaltime = NULL; -} diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index 97ef0ee7b..25fa2ab9c 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -84,12 +84,17 @@ static void ExplainXMLTag(const char *tagname, int flags, ExplainState *es); static void ExplainJSONLineEnding(ExplainState *es); static void ExplainYAMLLineStarting(ExplainState *es); + +/* + * CitusExplainScan is a custom scan explain callback function which is used to + * print explain information of a Citus plan which includes both master and + * distributed plan. + */ void CitusExplainScan(CustomScanState *node, List *ancestors, struct ExplainState *es) { CitusScanState *scanState = (CitusScanState *) node; MultiPlan *multiPlan = scanState->multiPlan; - const char *executorName = NULL; if (!ExplainDistributedQueries) { @@ -99,44 +104,8 @@ CitusExplainScan(CustomScanState *node, List *ancestors, struct ExplainState *es return; } - /* - * XXX: can we get by without the open/close group somehow - then we'd not - * copy any code from explain.c? Seems unlikely. - */ ExplainOpenGroup("Distributed Query", "Distributed Query", true, es); - /* - * XXX: might be worthwhile to put this somewhere central, e.g. for - * debugging output. 
- */ - switch (scanState->executorType) - { - case MULTI_EXECUTOR_ROUTER: - { - executorName = "Router"; - } - break; - - case MULTI_EXECUTOR_REAL_TIME: - { - executorName = "Real-Time"; - } - break; - - case MULTI_EXECUTOR_TASK_TRACKER: - { - executorName = "Task-Tracker"; - } - break; - - default: - { - executorName = "Other"; - } - break; - } - ExplainPropertyText("Executor", executorName, es); - ExplainJob(multiPlan->workerJob, es); ExplainCloseGroup("Distributed Query", "Distributed Query", true, es); diff --git a/src/backend/distributed/planner/multi_master_planner.c b/src/backend/distributed/planner/multi_master_planner.c index facff1f9d..6b4be984f 100644 --- a/src/backend/distributed/planner/multi_master_planner.c +++ b/src/backend/distributed/planner/multi_master_planner.c @@ -15,6 +15,7 @@ #include "distributed/multi_master_planner.h" #include "distributed/multi_physical_planner.h" +#include "distributed/multi_planner.h" #include "distributed/multi_server_executor.h" #include "distributed/worker_protocol.h" #include "nodes/makefuncs.h" @@ -34,7 +35,7 @@ * a target target list for the master node. This master target list keeps the * temporary table's columns on the master node. */ -List * +static List * MasterTargetList(List *workerTargetList) { List *masterTargetList = NIL; @@ -164,67 +165,57 @@ BuildAggregatePlan(Query *masterQuery, Plan *subPlan) /* * BuildSelectStatement builds the final select statement to run on the master - * node, before returning results to the user. The function first builds a scan - * statement for all results fetched to the master, and layers aggregation, sort + * node, before returning results to the user. The function first gets the custom + * scan node for all results fetched to the master, and layers aggregation, sort * and limit plans on top of the scan statement if necessary. 
*/ static PlannedStmt * -BuildSelectStatement(Query *masterQuery, char *masterTableName, - List *masterTargetList, CustomScan *dataScan) +BuildSelectStatement(Query *masterQuery, List *masterTargetList, CustomScan *remoteScan) { PlannedStmt *selectStatement = NULL; - RangeTblEntry *rangeTableEntry = NULL; - RangeTblEntry *queryRangeTableEntry = NULL; + RangeTblEntry *customScanRangeTableEntry = NULL; Agg *aggregationPlan = NULL; Plan *topLevelPlan = NULL; - ListCell *lc = NULL; - List *columnNames = NULL; - - /* (0) compute column names */ - foreach(lc, masterTargetList) - { - TargetEntry *te = lfirst(lc); - columnNames = lappend(columnNames, makeString(te->resname)); - } + ListCell *targetEntryCell = NULL; + List *columnNameList = NULL; /* (1) make PlannedStmt and set basic information */ selectStatement = makeNode(PlannedStmt); selectStatement->canSetTag = true; - selectStatement->relationOids = NIL; /* to be filled in exec_Start */ + selectStatement->relationOids = NIL; selectStatement->commandType = CMD_SELECT; - /* prepare the range table entry for our temporary table */ + /* top level select query should have only one range table entry */ Assert(list_length(masterQuery->rtable) == 1); - queryRangeTableEntry = (RangeTblEntry *) linitial(masterQuery->rtable); - rangeTableEntry = copyObject(queryRangeTableEntry); - rangeTableEntry->rtekind = RTE_VALUES; /* can't look up relation */ - rangeTableEntry->eref = makeAlias("remote scan", columnNames); - rangeTableEntry->inh = false; - rangeTableEntry->inFromCl = true; + /* compute column names for the custom range table entry */ + foreach(targetEntryCell, masterTargetList) + { + TargetEntry *targetEntry = lfirst(targetEntryCell); + columnNameList = lappend(columnNameList, makeString(targetEntry->resname)); + } + + customScanRangeTableEntry = RemoteScanRangeTableEntry(columnNameList); /* set the single element range table list */ - selectStatement->rtable = list_make1(rangeTableEntry); + selectStatement->rtable = 
list_make1(customScanRangeTableEntry); - /* (2) build and initialize sequential scan node */ - /* Gone */ - - /* (3) add an aggregation plan if needed */ + /* (2) add an aggregation plan if needed */ if (masterQuery->hasAggs || masterQuery->groupClause) { - dataScan->scan.plan.targetlist = masterTargetList; + remoteScan->scan.plan.targetlist = masterTargetList; - aggregationPlan = BuildAggregatePlan(masterQuery, &dataScan->scan.plan); + aggregationPlan = BuildAggregatePlan(masterQuery, &remoteScan->scan.plan); topLevelPlan = (Plan *) aggregationPlan; } else { /* otherwise set the final projections on the scan plan directly */ - dataScan->scan.plan.targetlist = masterQuery->targetList; - topLevelPlan = &dataScan->scan.plan; + remoteScan->scan.plan.targetlist = masterQuery->targetList; + topLevelPlan = &remoteScan->scan.plan; } - /* (4) add a sorting plan if needed */ + /* (3) add a sorting plan if needed */ if (masterQuery->sortClause) { List *sortClauseList = masterQuery->sortClause; @@ -242,7 +233,7 @@ BuildSelectStatement(Query *masterQuery, char *masterTableName, topLevelPlan = (Plan *) sortPlan; } - /* (5) add a limit plan if needed */ + /* (4) add a limit plan if needed */ if (masterQuery->limitCount || masterQuery->limitOffset) { Node *limitCount = masterQuery->limitCount; @@ -259,7 +250,7 @@ BuildSelectStatement(Query *masterQuery, char *masterTableName, topLevelPlan = (Plan *) limitPlan; } - /* (6) finally set our top level plan in the plan tree */ + /* (5) finally set our top level plan in the plan tree */ selectStatement->planTree = topLevelPlan; return selectStatement; @@ -267,24 +258,24 @@ BuildSelectStatement(Query *masterQuery, char *masterTableName, /* - * MasterNodeSelectPlan takes in a distributed plan, finds the master node query - * structure in that plan, and builds the final select plan to execute on the - * master node. 
Note that this select plan is executed after result files are - * retrieved from worker nodes and are merged into a temporary table. + * MasterNodeSelectPlan takes in a distributed plan and a custom scan node which + * wraps remote part of the plan. This function finds the master node query + * structure in the multi plan, and builds the final select plan to execute on + * the tuples returned by remote scan on the master node. Note that this select + * plan is executed after result files are retrieved from worker nodes and + * filled into the tuple store inside provided custom scan. */ PlannedStmt * -MasterNodeSelectPlan(MultiPlan *multiPlan, CustomScan *dataScan) +MasterNodeSelectPlan(MultiPlan *multiPlan, CustomScan *remoteScan) { Query *masterQuery = multiPlan->masterQuery; - char *tableName = multiPlan->masterTableName; PlannedStmt *masterSelectPlan = NULL; Job *workerJob = multiPlan->workerJob; List *workerTargetList = workerJob->jobQuery->targetList; List *masterTargetList = MasterTargetList(workerTargetList); - masterSelectPlan = - BuildSelectStatement(masterQuery, tableName, masterTargetList, dataScan); + masterSelectPlan = BuildSelectStatement(masterQuery, masterTargetList, remoteScan); return masterSelectPlan; } diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index a4a87c372..a33808d5b 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -196,9 +196,7 @@ MultiPlan * MultiPhysicalPlanCreate(MultiTreeRoot *multiTree) { MultiPlan *multiPlan = NULL; - StringInfo jobSchemaName = NULL; Job *workerJob = NULL; - uint64 workerJobId = 0; Query *masterQuery = NULL; List *masterDependedJobList = NIL; @@ -207,10 +205,6 @@ MultiPhysicalPlanCreate(MultiTreeRoot *multiTree) /* create the tree of executable tasks for the worker job */ workerJob = BuildJobTreeTaskList(workerJob); - workerJobId = 
workerJob->jobId; - - /* get job schema name */ - jobSchemaName = JobSchemaName(workerJobId); /* build the final merge query to execute on the master */ masterDependedJobList = list_make1(workerJob); @@ -219,7 +213,6 @@ MultiPhysicalPlanCreate(MultiTreeRoot *multiTree) multiPlan = CitusMakeNode(MultiPlan); multiPlan->workerJob = workerJob; multiPlan->masterQuery = masterQuery; - multiPlan->masterTableName = jobSchemaName->data; multiPlan->routerExecutable = MultiPlanRouterExecutable(multiPlan); multiPlan->operation = CMD_SELECT; diff --git a/src/backend/distributed/planner/multi_planner.c b/src/backend/distributed/planner/multi_planner.c index bc0c1335f..7839c7cfc 100644 --- a/src/backend/distributed/planner/multi_planner.c +++ b/src/backend/distributed/planner/multi_planner.c @@ -23,30 +23,48 @@ #include "distributed/multi_physical_planner.h" #include "distributed/multi_master_planner.h" #include "distributed/multi_router_planner.h" - #include "executor/executor.h" - #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" - #include "optimizer/planner.h" - #include "utils/memutils.h" static List *relationRestrictionContextList = NIL; +/* create custom scan methods for separate executors */ +static CustomScanMethods RealTimeCustomScanMethods = { + "Citus Real-Time", + RealTimeCreateScan +}; + +static CustomScanMethods TaskTrackerCustomScanMethods = { + "Citus Task-Tracker", + TaskTrackerCreateScan +}; + +static CustomScanMethods RouterCustomScanMethods = { + "Citus Router", + RouterCreateScan +}; + +static CustomScanMethods DelayedErrorCustomScanMethods = { + "Citus Delayed Error", + DelayedErrorCreateScan +}; + /* local function forward declarations */ +static PlannedStmt * CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, + Query *query, ParamListInfo boundParams, + RelationRestrictionContext *restrictionContext); +static Node * SerializeMultiPlan(struct MultiPlan *multiPlan); +static MultiPlan * DeserializeMultiPlan(Node *node); +static 
PlannedStmt * FinalizePlan(PlannedStmt *localPlan, MultiPlan *multiPlan); +static PlannedStmt * FinalizeNonRouterPlan(PlannedStmt *localPlan, MultiPlan *multiPlan, + CustomScan *customScan); +static PlannedStmt * FinalizeRouterPlan(PlannedStmt *localPlan, CustomScan *customScan); static void CheckNodeIsDumpable(Node *node); -static PlannedStmt * MultiQueryContainerNode(PlannedStmt *result, - struct MultiPlan *multiPlan); -static struct PlannedStmt * CreateDistributedPlan(PlannedStmt *localPlan, - Query *originalQuery, - Query *query, - ParamListInfo boundParams, - RelationRestrictionContext * - restrictionContext); static RelationRestrictionContext * CreateAndPushRestrictionContext(void); static RelationRestrictionContext * CurrentRestrictionContext(void); static void PopRestrictionContext(void); @@ -144,22 +162,21 @@ IsModifyCommand(Query *query) /* - * VerifyMultiPlanValidity verifies that multiPlan is ready for execution, or - * errors out if not. - * - * A plan may e.g. not be ready for execution because CreateDistributedPlan() - * couldn't find a plan due to unresolved prepared statement parameters, but - * didn't error out, because we expect custom plans to come to our rescue. - * But sql (not plpgsql) functions unfortunately don't go through a codepath - * supporting custom plans. + * IsModifyMultiPlan returns true if the multi plan performs modifications, + * false otherwise. 
*/ -void -VerifyMultiPlanValidity(MultiPlan *multiPlan) +bool +IsModifyMultiPlan(MultiPlan *multiPlan) { - if (multiPlan->planningError) + bool isModifyMultiPlan = false; + CmdType operation = multiPlan->operation; + + if (operation == CMD_INSERT || operation == CMD_UPDATE || operation == CMD_DELETE) { - RaiseDeferredError(multiPlan->planningError, ERROR); + isModifyMultiPlan = true; } + + return isModifyMultiPlan; } @@ -274,8 +291,8 @@ CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query RaiseDeferredError(distributedPlan->planningError, ERROR); } - /* store required data into the planned statement */ - resultPlan = MultiQueryContainerNode(localPlan, distributedPlan); + /* create final plan by combining local plan with distributed plan */ + resultPlan = FinalizePlan(localPlan, distributedPlan); /* * As explained above, force planning costs to be unrealistically high if @@ -294,12 +311,6 @@ CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query } -static CustomScanMethods CitusCustomScanMethods = { - "CitusScan", - CitusCreateScan -}; - - /* * GetMultiPlan returns the associated MultiPlan for a CustomScan. */ @@ -308,41 +319,34 @@ GetMultiPlan(CustomScan *customScan) { MultiPlan *multiPlan = NULL; - Assert(IsA(customScan, CustomScan)); - Assert(customScan->methods == &CitusCustomScanMethods); Assert(list_length(customScan->custom_private) == 1); - multiPlan = DeSerializeMultiPlan(linitial(customScan->custom_private)); + multiPlan = DeserializeMultiPlan(linitial(customScan->custom_private)); return multiPlan; } -/* Does the passed in statement require distributed execution? */ -bool -HasCitusToplevelNode(PlannedStmt *result) +/* + * SerializeMultiPlan returns the string representing the distributed plan in a + * Const node. + * + * Note that this should be improved for 9.6+, we we can copy trees efficiently. + * I.e. 
we should introduce copy support for relevant node types, and just + * return the MultiPlan as-is for 9.6. + */ +static Node * +SerializeMultiPlan(MultiPlan *multiPlan) { - elog(ERROR, "gone"); -} - - -Node * -SerializableMultiPlan(MultiPlan *multiPlan) -{ - /* - * FIXME: This should be improved for 9.6+, we we can copy trees - * efficiently. I.e. we should introduce copy support for relevant node - * types, and just return the MultiPlan as-is for 9.6. - */ - char *serializedPlan = NULL; + char *serializedMultiPlan = NULL; Const *multiPlanData = NULL; - serializedPlan = CitusNodeToString(multiPlan); + serializedMultiPlan = CitusNodeToString(multiPlan); multiPlanData = makeNode(Const); multiPlanData->consttype = CSTRINGOID; - multiPlanData->constlen = strlen(serializedPlan); - multiPlanData->constvalue = CStringGetDatum(serializedPlan); + multiPlanData->constlen = strlen(serializedMultiPlan); + multiPlanData->constvalue = CStringGetDatum(serializedMultiPlan); multiPlanData->constbyval = false; multiPlanData->location = -1; @@ -350,8 +354,12 @@ SerializableMultiPlan(MultiPlan *multiPlan) } -MultiPlan * -DeSerializeMultiPlan(Node *node) +/* + * DeserializeMultiPlan returns the deserialized distributed plan from the string + * representation in a Const node. + */ +static MultiPlan * +DeserializeMultiPlan(Node *node) { Const *multiPlanData = NULL; char *serializedMultiPlan = NULL; @@ -369,107 +377,171 @@ DeSerializeMultiPlan(Node *node) /* - * CreateCitusToplevelNode creates the top-level planTree node for a - * distributed statement. That top-level node is a) recognizable by the - * executor hooks, allowing them to redirect execution, b) contains the - * parameters required for distributed execution. - * - * The exact representation of the top-level node is an implementation detail - * which should not be referred to outside this file, as it's likely to become - * version dependant. Use GetMultiPlan() and HasCitusToplevelNode() to access. 
- * - * FIXME - * - * Internally the data is stored as arguments to a 'citus_extradata_container' - * function, which has to be removed from the really executed plan tree before - * query execution. + * FinalizePlan combines local plan with distributed plan and creates a plan + * which can be run by the PostgreSQL executor. */ -PlannedStmt * -MultiQueryContainerNode(PlannedStmt *originalPlan, MultiPlan *multiPlan) +static PlannedStmt * +FinalizePlan(PlannedStmt *localPlan, MultiPlan *multiPlan) { - PlannedStmt *resultPlan = NULL; + PlannedStmt *finalPlan = NULL; CustomScan *customScan = makeNode(CustomScan); - Node *multiPlanData = SerializableMultiPlan(multiPlan); + Node *multiPlanData = NULL; + MultiExecutorType executorType = MULTI_EXECUTOR_INVALID_FIRST; - customScan->methods = &CitusCustomScanMethods; - customScan->custom_private = list_make1(multiPlanData); - - /* FIXME: This probably ain't correct */ - if (ExecSupportsBackwardScan(originalPlan->planTree)) + if (!multiPlan->planningError) { - customScan->flags = CUSTOMPATH_SUPPORT_BACKWARD_SCAN; + executorType = JobExecutorType(multiPlan); } - /* - * FIXME: these two branches/pieces of code should probably be moved into - * router / logical planner code respectively. 
- */ + switch (executorType) + { + case MULTI_EXECUTOR_REAL_TIME: + { + customScan->methods = &RealTimeCustomScanMethods; + break; + } + + case MULTI_EXECUTOR_TASK_TRACKER: + { + customScan->methods = &TaskTrackerCustomScanMethods; + break; + } + + case MULTI_EXECUTOR_ROUTER: + { + customScan->methods = &RouterCustomScanMethods; + break; + } + + default: + { + customScan->methods = &DelayedErrorCustomScanMethods; + break; + } + } + + multiPlanData = SerializeMultiPlan(multiPlan); + + customScan->custom_private = list_make1(multiPlanData); + customScan->flags = CUSTOMPATH_SUPPORT_BACKWARD_SCAN; + + /* check if we have a master query */ if (multiPlan->masterQuery) { - resultPlan = MasterNodeSelectPlan(multiPlan, customScan); - resultPlan->queryId = originalPlan->queryId; - resultPlan->utilityStmt = originalPlan->utilityStmt; + finalPlan = FinalizeNonRouterPlan(localPlan, multiPlan, customScan); } else { - ListCell *lc = NULL; - List *targetList = NIL; - bool foundJunk = false; - RangeTblEntry *rangeTableEntry = NULL; - List *columnNames = NIL; - int newRTI = list_length(originalPlan->rtable) + 1; - - /* - * XXX: This basically just builds a targetlist to "read" from the - * custom scan output. - */ - foreach(lc, originalPlan->planTree->targetlist) - { - TargetEntry *te = lfirst(lc); - Var *newVar = NULL; - TargetEntry *newTargetEntry = NULL; - - Assert(IsA(te, TargetEntry)); - - /* - * XXX: I can't think of a case where we'd need resjunk stuff at - * the toplevel of a router query - all things needing it have - * been pushed down. 
- */ - if (te->resjunk) - { - foundJunk = true; - continue; - } - - if (foundJunk) - { - ereport(ERROR, (errmsg("unexpected !junk entry after resjunk entry"))); - } - - /* build TE pointing to custom scan */ - newVar = makeVarFromTargetEntry(newRTI, te); - newTargetEntry = flatCopyTargetEntry(te); - newTargetEntry->expr = (Expr *) newVar; - targetList = lappend(targetList, newTargetEntry); - - columnNames = lappend(columnNames, makeString(te->resname)); - } - - /* XXX: can't think of a better RTE type than VALUES */ - rangeTableEntry = makeNode(RangeTblEntry); - rangeTableEntry->rtekind = RTE_VALUES; /* can't look up relation */ - rangeTableEntry->eref = makeAlias("remote_scan", columnNames); - rangeTableEntry->inh = false; - rangeTableEntry->inFromCl = true; - - resultPlan = originalPlan; - resultPlan->planTree = (Plan *) customScan; - resultPlan->rtable = lappend(resultPlan->rtable, rangeTableEntry); - customScan->scan.plan.targetlist = targetList; + finalPlan = FinalizeRouterPlan(localPlan, customScan); } - return resultPlan; + return finalPlan; +} + + +/* + * FinalizeNonRouterPlan gets the distributed custom scan plan, and creates the + * final master select plan on the top of this distributed plan for real-time + * and task-tracker executors. + */ +static PlannedStmt * +FinalizeNonRouterPlan(PlannedStmt *localPlan, MultiPlan *multiPlan, + CustomScan *customScan) +{ + PlannedStmt *finalPlan = NULL; + + finalPlan = MasterNodeSelectPlan(multiPlan, customScan); + finalPlan->queryId = localPlan->queryId; + finalPlan->utilityStmt = localPlan->utilityStmt; + + return finalPlan; +} + + +/* + * FinalizeRouterPlan gets a CustomScan node which already wrapped distributed + * part of a router plan and sets it as the direct child of the router plan + * because we don't run any query on master node for router executable queries. + * Here, we also rebuild the column list to read from the remote scan. 
+ */ +static PlannedStmt * +FinalizeRouterPlan(PlannedStmt *localPlan, CustomScan *customScan) +{ + PlannedStmt *routerPlan = NULL; + RangeTblEntry *remoteScanRangeTableEntry = NULL; + ListCell *targetEntryCell = NULL; + List *targetList = NIL; + List *columnNameList = NIL; + + /* we will have only one range table entry */ + int customScanRangeTableIndex = 1; + + /* build a targetlist to read from the custom scan output */ + foreach(targetEntryCell, localPlan->planTree->targetlist) + { + TargetEntry *targetEntry = lfirst(targetEntryCell); + TargetEntry *newTargetEntry = NULL; + Var *newVar = NULL; + Value *columnName = NULL; + + Assert(IsA(targetEntry, TargetEntry)); + + /* + * This is unlikely to be hit because we would not need resjunk stuff + * at the toplevel of a router query - all things needing it have been + * pushed down. + */ + if (targetEntry->resjunk) + { + continue; + } + + /* build target entry pointing to remote scan range table entry */ + newVar = makeVarFromTargetEntry(customScanRangeTableIndex, targetEntry); + newTargetEntry = flatCopyTargetEntry(targetEntry); + newTargetEntry->expr = (Expr *) newVar; + targetList = lappend(targetList, newTargetEntry); + + columnName = makeString(targetEntry->resname); + columnNameList = lappend(columnNameList, columnName); + } + + customScan->scan.plan.targetlist = targetList; + + routerPlan = makeNode(PlannedStmt); + routerPlan->planTree = (Plan *) customScan; + + remoteScanRangeTableEntry = RemoteScanRangeTableEntry(columnNameList); + routerPlan->rtable = list_make1(remoteScanRangeTableEntry); + + routerPlan->canSetTag = true; + routerPlan->relationOids = NIL; + + routerPlan->queryId = localPlan->queryId; + routerPlan->utilityStmt = localPlan->utilityStmt; + routerPlan->commandType = localPlan->commandType; + routerPlan->hasReturning = localPlan->hasReturning; + + return routerPlan; +} + + +/* + * RemoteScanRangeTableEntry creates a range table entry from given column name + * list to represent a remote scan. 
+ */ +RangeTblEntry * +RemoteScanRangeTableEntry(List *columnNameList) +{ + RangeTblEntry *remoteScanRangeTableEntry = makeNode(RangeTblEntry); + + /* we use RTE_VALUES for custom scan because we can't look up relation */ + remoteScanRangeTableEntry->rtekind = RTE_VALUES; + remoteScanRangeTableEntry->eref = makeAlias("remote_scan", columnNameList); + remoteScanRangeTableEntry->inh = false; + remoteScanRangeTableEntry->inFromCl = true; + + return remoteScanRangeTableEntry; } diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 11212ec81..39bf01782 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -236,9 +236,13 @@ CreateSingleTaskRouterPlan(Query *originalQuery, Query *query, multiPlan->workerJob = job; multiPlan->masterQuery = NULL; - multiPlan->masterTableName = NULL; multiPlan->routerExecutable = true; - multiPlan->hasReturning = list_length(originalQuery->returningList) > 0; + multiPlan->hasReturning = false; + + if (list_length(originalQuery->returningList) > 0) + { + multiPlan->hasReturning = true; + } return multiPlan; } @@ -321,10 +325,14 @@ CreateInsertSelectRouterPlan(Query *originalQuery, /* and finally the multi plan */ multiPlan->workerJob = workerJob; - multiPlan->masterTableName = NULL; multiPlan->masterQuery = NULL; multiPlan->routerExecutable = true; - multiPlan->hasReturning = list_length(originalQuery->returningList) > 0; + multiPlan->hasReturning = false; + + if (list_length(originalQuery->returningList) > 0) + { + multiPlan->hasReturning = true; + } return multiPlan; } diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 05f92f486..689296b08 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -114,9 +114,7 @@ _PG_init(void) * (thus as the innermost/last running hook) 
to be able to do our * duties. For simplicity insist that all hooks are previously unused. */ - if (planner_hook != NULL || - ExplainOneQuery_hook != NULL || - ProcessUtility_hook != NULL) + if (planner_hook != NULL || ProcessUtility_hook != NULL) { ereport(ERROR, (errmsg("Citus has to be loaded first"), errhint("Place citus at the beginning of " diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index 8da52e506..8aa485857 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -281,7 +281,6 @@ OutMultiPlan(OUTFUNC_ARGS) WRITE_NODE_FIELD(workerJob); WRITE_NODE_FIELD(masterQuery); - WRITE_STRING_FIELD(masterTableName); WRITE_BOOL_FIELD(routerExecutable); WRITE_NODE_FIELD(planningError); } diff --git a/src/backend/distributed/utils/citus_readfuncs.c b/src/backend/distributed/utils/citus_readfuncs.c index fca056e4e..37c9245fa 100644 --- a/src/backend/distributed/utils/citus_readfuncs.c +++ b/src/backend/distributed/utils/citus_readfuncs.c @@ -188,7 +188,6 @@ ReadMultiPlan(READFUNC_ARGS) READ_NODE_FIELD(workerJob); READ_NODE_FIELD(masterQuery); - READ_STRING_FIELD(masterTableName); READ_BOOL_FIELD(routerExecutable); READ_NODE_FIELD(planningError); diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index 53fc327b9..e3b53a327 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -17,9 +17,6 @@ #include "distributed/multi_physical_planner.h" #include "distributed/multi_server_executor.h" -/* signal currently executed statement is a master select statement or router execution */ -#define EXEC_FLAG_CITUS_MASTER_SELECT 0x100 -#define EXEC_FLAG_CITUS_ROUTER_EXECUTOR 0x200 #if (PG_VERSION_NUM >= 90600) #define tuplecount_t uint64 @@ -30,23 +27,26 @@ typedef struct CitusScanState { - CustomScanState customScanState; - MultiPlan *multiPlan; - MultiExecutorType 
executorType; - - /* state for router */ - bool finishedUnderlyingScan; - Tuplestorestate *tuplestorestate; + CustomScanState customScanState; /* underlying custom scan node */ + MultiPlan *multiPlan; /* distributed execution plan */ + MultiExecutorType executorType; /* distributed executor type */ + bool finishedRemoteScan; /* flag to check if remote scan is finished */ + Tuplestorestate *tuplestorestate; /* tuple store to store distributed results */ } CitusScanState; -Node * CitusCreateScan(CustomScan *scan); -extern void CitusBeginScan(CustomScanState *node, - EState *estate, - int eflags); -extern TupleTableSlot * CitusExecScan(CustomScanState *node); + +extern Node * RealTimeCreateScan(CustomScan *scan); +extern Node * TaskTrackerCreateScan(CustomScan *scan); +extern Node * RouterCreateScan(CustomScan *scan); +extern Node * DelayedErrorCreateScan(CustomScan *scan); +extern void CitusSelectBeginScan(CustomScanState *node, EState *estate, int eflags); +extern TupleTableSlot * RealTimeExecScan(CustomScanState *node); +extern TupleTableSlot * TaskTrackerExecScan(CustomScanState *node); extern void CitusEndScan(CustomScanState *node); extern void CitusReScan(CustomScanState *node); -extern void CitusExplainScan(CustomScanState *node, List *ancestors, - struct ExplainState *es); +extern void CitusExplainScan(CustomScanState *node, List *ancestors, struct + ExplainState *es); +extern TupleTableSlot * ReturnTupleFromTuplestore(CitusScanState *scanState); + #endif /* MULTI_EXECUTOR_H */ diff --git a/src/include/distributed/multi_master_planner.h b/src/include/distributed/multi_master_planner.h index c58b42717..9873ce4ff 100644 --- a/src/include/distributed/multi_master_planner.h +++ b/src/include/distributed/multi_master_planner.h @@ -24,6 +24,6 @@ struct MultiPlan; struct CustomScan; extern PlannedStmt * MasterNodeSelectPlan(struct MultiPlan *multiPlan, struct CustomScan *dataScan); -extern List * MasterTargetList(List *workerTargetList); + #endif /* 
MULTI_MASTER_PLANNER_H */ diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index ccf3b44a2..0cf340899 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -213,13 +213,11 @@ typedef struct JoinSequenceNode typedef struct MultiPlan { CitusNode type; - CmdType operation; - bool hasReturning; + bool hasReturning; Job *workerJob; Query *masterQuery; - char *masterTableName; bool routerExecutable; /* diff --git a/src/include/distributed/multi_planner.h b/src/include/distributed/multi_planner.h index e2f255082..8a24b2efa 100644 --- a/src/include/distributed/multi_planner.h +++ b/src/include/distributed/multi_planner.h @@ -51,14 +51,13 @@ typedef struct RelationShard extern PlannedStmt * multi_planner(Query *parse, int cursorOptions, ParamListInfo boundParams); -extern bool HasCitusToplevelNode(PlannedStmt *planStatement); struct MultiPlan; extern struct MultiPlan * GetMultiPlan(CustomScan *node); -extern Node * SerializableMultiPlan(struct MultiPlan *multiPlan); -extern struct MultiPlan * DeSerializeMultiPlan(Node *node); extern void multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo, Index index, RangeTblEntry *rte); extern bool IsModifyCommand(Query *query); -extern void VerifyMultiPlanValidity(struct MultiPlan *multiPlan); +extern bool IsModifyMultiPlan(struct MultiPlan *multiPlan); +extern RangeTblEntry * RemoteScanRangeTableEntry(List *columnNameList); + #endif /* MULTI_PLANNER_H */ diff --git a/src/include/distributed/multi_router_executor.h b/src/include/distributed/multi_router_executor.h index b42f6e002..8c9eafb7d 100644 --- a/src/include/distributed/multi_router_executor.h +++ b/src/include/distributed/multi_router_executor.h @@ -35,15 +35,12 @@ typedef struct XactShardConnSet extern bool AllModificationsCommutative; extern bool EnableDeadlockPrevention; -extern void RouterBeginScan(CitusScanState *scanState); 
- -extern TupleTableSlot * RouterExecScan(CitusScanState *scanState); - -extern void RouterExecutorStart(QueryDesc *queryDesc, int eflags, List *taskList); -extern void RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count); -extern void RouterExecutorFinish(QueryDesc *queryDesc); -extern void RouterExecutorEnd(QueryDesc *queryDesc); +extern void CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags); +extern TupleTableSlot * RouterSingleModifyExecScan(CustomScanState *node); +extern TupleTableSlot * RouterSelectExecScan(CustomScanState *node); +extern TupleTableSlot * RouterMultiModifyExecScan(CustomScanState *node); extern int64 ExecuteModifyTasksWithoutResults(List *taskList); + #endif /* MULTI_ROUTER_EXECUTOR_H_ */ diff --git a/src/test/regress/expected/multi_explain.out b/src/test/regress/expected/multi_explain.out index cffad3c26..e2ef27342 100644 --- a/src/test/regress/expected/multi_explain.out +++ b/src/test/regress/expected/multi_explain.out @@ -43,8 +43,7 @@ Sort Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint), l_quantity -> HashAggregate Group Key: l_quantity - -> Custom Scan (CitusScan) - Executor: Real-Time + -> Custom Scan (Citus Real-Time) Task Count: 8 Tasks Shown: One of 8 -> Task @@ -74,10 +73,9 @@ EXPLAIN (COSTS FALSE, FORMAT JSON) { "Node Type": "Custom Scan", "Parent Relationship": "Outer", - "Custom Plan Provider": "CitusScan", + "Custom Plan Provider": "Citus Real-Time", "Parallel Aware": false, "Distributed Query": { - "Executor": "Real-Time", "Job": { "Task Count": 8, "Tasks Shown": "One of 8", @@ -150,10 +148,9 @@ EXPLAIN (COSTS FALSE, FORMAT XML) Custom Scan Outer - CitusScan + Citus Real-Time false - Real-Time 8 One of 8 @@ -221,10 +218,9 @@ EXPLAIN (COSTS FALSE, FORMAT YAML) Plans: - Node Type: "Custom Scan" Parent Relationship: "Outer" - Custom Plan Provider: "CitusScan" + Custom Plan Provider: "Citus Real-Time" Parallel 
Aware: false Distributed Query: - Executor: "Real-Time" Job: Task Count: 8 Tasks Shown: "One of 8" @@ -253,8 +249,7 @@ Sort Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint), l_quantity -> HashAggregate Group Key: l_quantity - -> Custom Scan (CitusScan) - Executor: Real-Time + -> Custom Scan (Citus Real-Time) Task Count: 8 Tasks Shown: One of 8 -> Task @@ -267,9 +262,8 @@ EXPLAIN (COSTS FALSE, VERBOSE TRUE) SELECT sum(l_quantity) / avg(l_quantity) FROM lineitem; Aggregate Output: (sum("?column?") / (sum("?column?_1") / pg_catalog.sum("?column?_2"))) - -> Custom Scan (CitusScan) + -> Custom Scan (Citus Real-Time) Output: "?column?", "?column?_1", "?column?_2" - Executor: Real-Time Task Count: 8 Tasks Shown: One of 8 -> Task @@ -286,8 +280,7 @@ EXPLAIN (COSTS FALSE) Limit -> Sort Sort Key: l_quantity - -> Custom Scan (CitusScan) - Executor: Real-Time + -> Custom Scan (Citus Real-Time) Task Count: 8 Tasks Shown: One of 8 -> Task @@ -304,8 +297,7 @@ Limit -- Test insert EXPLAIN (COSTS FALSE) INSERT INTO lineitem VALUES(1,0); -Custom Scan (CitusScan) - Executor: Router +Custom Scan (Citus Router) Task Count: 1 Tasks Shown: All -> Task @@ -317,8 +309,7 @@ EXPLAIN (COSTS FALSE) UPDATE lineitem SET l_suppkey = 12 WHERE l_orderkey = 1 AND l_partkey = 0; -Custom Scan (CitusScan) - Executor: Router +Custom Scan (Citus Router) Task Count: 1 Tasks Shown: All -> Task @@ -333,8 +324,7 @@ Custom Scan (CitusScan) EXPLAIN (COSTS FALSE) DELETE FROM lineitem WHERE l_orderkey = 1 AND l_partkey = 0; -Custom Scan (CitusScan) - Executor: Router +Custom Scan (Citus Router) Task Count: 1 Tasks Shown: All -> Task @@ -348,8 +338,7 @@ Custom Scan (CitusScan) -- Test single-shard SELECT EXPLAIN (COSTS FALSE) SELECT l_quantity FROM lineitem WHERE l_orderkey = 5; -Custom Scan (CitusScan) - Executor: Router +Custom Scan (Citus Router) Task Count: 1 Tasks Shown: All -> Task @@ -368,8 +357,7 @@ t EXPLAIN (COSTS FALSE) CREATE 
TABLE explain_result AS SELECT * FROM lineitem; -Custom Scan (CitusScan) - Executor: Real-Time +Custom Scan (Citus Real-Time) Task Count: 8 Tasks Shown: One of 8 -> Task @@ -381,10 +369,9 @@ EXPLAIN (COSTS FALSE, VERBOSE TRUE) HAVING sum(l_quantity) > 100; Aggregate Output: (sum("?column?") / (sum("?column?_1") / pg_catalog.sum("?column?_2"))) - Filter: (sum("remote scan".worker_column_4) > '100'::numeric) - -> Custom Scan (CitusScan) + Filter: (sum(remote_scan.worker_column_4) > '100'::numeric) + -> Custom Scan (Citus Real-Time) Output: "?column?", "?column?_1", "?column?_2", worker_column_4 - Executor: Real-Time Task Count: 8 Tasks Shown: One of 8 -> Task @@ -400,11 +387,10 @@ EXPLAIN (COSTS FALSE, VERBOSE TRUE) HAVING l_quantity > (100 * random()); HashAggregate Output: l_quantity - Group Key: "remote scan".l_quantity - Filter: (("remote scan".worker_column_2)::double precision > ('100'::double precision * random())) - -> Custom Scan (CitusScan) + Group Key: remote_scan.l_quantity + Filter: ((remote_scan.worker_column_2)::double precision > ('100'::double precision * random())) + -> Custom Scan (Citus Real-Time) Output: l_quantity, worker_column_2 - Executor: Real-Time Task Count: 8 Tasks Shown: One of 8 -> Task @@ -419,8 +405,7 @@ SET citus.explain_all_tasks TO on; EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030; Aggregate - -> Custom Scan (CitusScan) - Executor: Real-Time + -> Custom Scan (Citus Real-Time) Task Count: 4 Tasks Shown: All -> Task @@ -455,8 +440,7 @@ SET citus.explain_all_tasks TO off; EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030; Aggregate - -> Custom Scan (CitusScan) - Executor: Task-Tracker + -> Custom Scan (Citus Task-Tracker) Task Count: 4 Tasks Shown: One of 4 -> Task @@ -473,8 +457,7 @@ EXPLAIN (COSTS FALSE) AND o_custkey = c_custkey AND l_suppkey = s_suppkey; Aggregate - -> Custom Scan (CitusScan) - Executor: Task-Tracker + -> Custom Scan (Citus Task-Tracker) 
Task Count: 1 Tasks Shown: None, not supported for re-partition queries -> MapMergeJob @@ -500,10 +483,9 @@ EXPLAIN (COSTS FALSE, FORMAT JSON) { "Node Type": "Custom Scan", "Parent Relationship": "Outer", - "Custom Plan Provider": "CitusScan", + "Custom Plan Provider": "Citus Task-Tracker", "Parallel Aware": false, "Distributed Query": { - "Executor": "Task-Tracker", "Job": { "Task Count": 1, "Tasks Shown": "None, not supported for re-partition queries", @@ -550,10 +532,9 @@ EXPLAIN (COSTS FALSE, FORMAT XML) Custom Scan Outer - CitusScan + Citus Task-Tracker false - Task-Tracker 1 None, not supported for re-partition queries @@ -610,10 +591,9 @@ EXPLAIN (COSTS FALSE, FORMAT YAML) Plans: - Node Type: "Custom Scan" Parent Relationship: "Outer" - Custom Plan Provider: "CitusScan" + Custom Plan Provider: "Citus Task-Tracker" Parallel Aware: false Distributed Query: - Executor: "Task-Tracker" Job: Task Count: 1 Tasks Shown: "None, not supported for re-partition queries" @@ -639,8 +619,7 @@ Finalize Aggregate -- ensure distributed plans don't break EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem; Aggregate - -> Custom Scan (CitusScan) - Executor: Task-Tracker + -> Custom Scan (Citus Task-Tracker) Task Count: 8 Tasks Shown: One of 8 -> Task @@ -652,8 +631,7 @@ PREPARE task_tracker_query AS SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030; EXPLAIN (COSTS FALSE) EXECUTE task_tracker_query; Aggregate - -> Custom Scan (CitusScan) - Executor: Task-Tracker + -> Custom Scan (Citus Task-Tracker) Task Count: 4 Tasks Shown: One of 4 -> Task @@ -664,8 +642,7 @@ Aggregate SET citus.task_executor_type TO 'real-time'; PREPARE router_executor_query AS SELECT l_quantity FROM lineitem WHERE l_orderkey = 5; EXPLAIN EXECUTE router_executor_query; -Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) - Executor: Router +Custom Scan (Citus Router) (cost=0.00..0.00 rows=0 width=0) Task Count: 1 Tasks Shown: All -> Task @@ -678,8 +655,7 @@ PREPARE 
real_time_executor_query AS SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030; EXPLAIN (COSTS FALSE) EXECUTE real_time_executor_query; Aggregate - -> Custom Scan (CitusScan) - Executor: Real-Time + -> Custom Scan (Citus Real-Time) Task Count: 4 Tasks Shown: One of 4 -> Task @@ -691,8 +667,7 @@ Aggregate -- at least make sure to fail without crashing PREPARE router_executor_query_param(int) AS SELECT l_quantity FROM lineitem WHERE l_orderkey = $1; EXPLAIN EXECUTE router_executor_query_param(5); -Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) - Executor: Router +Custom Scan (Citus Router) (cost=0.00..0.00 rows=0 width=0) Task Count: 1 Tasks Shown: All -> Task diff --git a/src/test/regress/expected/multi_explain_0.out b/src/test/regress/expected/multi_explain_0.out index be57df725..af839b514 100644 --- a/src/test/regress/expected/multi_explain_0.out +++ b/src/test/regress/expected/multi_explain_0.out @@ -43,8 +43,7 @@ Sort Sort Key: COALESCE((sum((COALESCE((sum(count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint), l_quantity -> HashAggregate Group Key: l_quantity - -> Custom Scan (CitusScan) - Executor: Real-Time + -> Custom Scan (Citus Real-Time) Task Count: 8 Tasks Shown: One of 8 -> Task @@ -71,9 +70,8 @@ EXPLAIN (COSTS FALSE, FORMAT JSON) { "Node Type": "Custom Scan", "Parent Relationship": "Outer", - "Custom Plan Provider": "CitusScan", + "Custom Plan Provider": "Citus Real-Time", "Distributed Query": { - "Executor": "Real-Time", "Job": { "Task Count": 8, "Tasks Shown": "One of 8", @@ -140,9 +138,8 @@ EXPLAIN (COSTS FALSE, FORMAT XML) Custom Scan Outer - CitusScan + Citus Real-Time - Real-Time 8 One of 8 @@ -204,9 +201,8 @@ EXPLAIN (COSTS FALSE, FORMAT YAML) Plans: - Node Type: "Custom Scan" Parent Relationship: "Outer" - Custom Plan Provider: "CitusScan" + Custom Plan Provider: "Citus Real-Time" Distributed Query: - Executor: "Real-Time" Job: Task Count: 8 Tasks Shown: "One of 8" @@ -232,8 +228,7 @@ Sort Sort Key: 
COALESCE((sum((COALESCE((sum(count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint), l_quantity -> HashAggregate Group Key: l_quantity - -> Custom Scan (CitusScan) - Executor: Real-Time + -> Custom Scan (Citus Real-Time) Task Count: 8 Tasks Shown: One of 8 -> Task @@ -246,9 +241,8 @@ EXPLAIN (COSTS FALSE, VERBOSE TRUE) SELECT sum(l_quantity) / avg(l_quantity) FROM lineitem; Aggregate Output: (sum("?column?") / (sum("?column?_1") / sum("?column?_2"))) - -> Custom Scan (CitusScan) + -> Custom Scan (Citus Real-Time) Output: "?column?", "?column?_1", "?column?_2" - Executor: Real-Time Task Count: 8 Tasks Shown: One of 8 -> Task @@ -265,8 +259,7 @@ EXPLAIN (COSTS FALSE) Limit -> Sort Sort Key: l_quantity - -> Custom Scan (CitusScan) - Executor: Real-Time + -> Custom Scan (Citus Real-Time) Task Count: 8 Tasks Shown: One of 8 -> Task @@ -283,8 +276,7 @@ Limit -- Test insert EXPLAIN (COSTS FALSE) INSERT INTO lineitem VALUES(1,0); -Custom Scan (CitusScan) - Executor: Router +Custom Scan (Citus Router) Task Count: 1 Tasks Shown: All -> Task @@ -296,8 +288,7 @@ EXPLAIN (COSTS FALSE) UPDATE lineitem SET l_suppkey = 12 WHERE l_orderkey = 1 AND l_partkey = 0; -Custom Scan (CitusScan) - Executor: Router +Custom Scan (Citus Router) Task Count: 1 Tasks Shown: All -> Task @@ -312,8 +303,7 @@ Custom Scan (CitusScan) EXPLAIN (COSTS FALSE) DELETE FROM lineitem WHERE l_orderkey = 1 AND l_partkey = 0; -Custom Scan (CitusScan) - Executor: Router +Custom Scan (Citus Router) Task Count: 1 Tasks Shown: All -> Task @@ -327,8 +317,7 @@ Custom Scan (CitusScan) -- Test single-shard SELECT EXPLAIN (COSTS FALSE) SELECT l_quantity FROM lineitem WHERE l_orderkey = 5; -Custom Scan (CitusScan) - Executor: Router +Custom Scan (Citus Router) Task Count: 1 Tasks Shown: All -> Task @@ -347,8 +336,7 @@ t EXPLAIN (COSTS FALSE) CREATE TABLE explain_result AS SELECT * FROM lineitem; -Custom Scan (CitusScan) - Executor: Real-Time +Custom Scan (Citus Real-Time) Task Count: 8 Tasks Shown: One of 8 -> 
Task @@ -360,10 +348,9 @@ EXPLAIN (COSTS FALSE, VERBOSE TRUE) HAVING sum(l_quantity) > 100; Aggregate Output: (sum("?column?") / (sum("?column?_1") / sum("?column?_2"))) - Filter: (sum("remote scan".worker_column_4) > '100'::numeric) - -> Custom Scan (CitusScan) + Filter: (sum(remote_scan.worker_column_4) > '100'::numeric) + -> Custom Scan (Citus Real-Time) Output: "?column?", "?column?_1", "?column?_2", worker_column_4 - Executor: Real-Time Task Count: 8 Tasks Shown: One of 8 -> Task @@ -379,11 +366,10 @@ EXPLAIN (COSTS FALSE, VERBOSE TRUE) HAVING l_quantity > (100 * random()); HashAggregate Output: l_quantity - Group Key: "remote scan".l_quantity - Filter: (("remote scan".worker_column_2)::double precision > ('100'::double precision * random())) - -> Custom Scan (CitusScan) + Group Key: remote_scan.l_quantity + Filter: ((remote_scan.worker_column_2)::double precision > ('100'::double precision * random())) + -> Custom Scan (Citus Real-Time) Output: l_quantity, worker_column_2 - Executor: Real-Time Task Count: 8 Tasks Shown: One of 8 -> Task @@ -398,8 +384,7 @@ SET citus.explain_all_tasks TO on; EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030; Aggregate - -> Custom Scan (CitusScan) - Executor: Real-Time + -> Custom Scan (Citus Real-Time) Task Count: 4 Tasks Shown: All -> Task @@ -434,8 +419,7 @@ SET citus.explain_all_tasks TO off; EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030; Aggregate - -> Custom Scan (CitusScan) - Executor: Task-Tracker + -> Custom Scan (Citus Task-Tracker) Task Count: 4 Tasks Shown: One of 4 -> Task @@ -452,8 +436,7 @@ EXPLAIN (COSTS FALSE) AND o_custkey = c_custkey AND l_suppkey = s_suppkey; Aggregate - -> Custom Scan (CitusScan) - Executor: Task-Tracker + -> Custom Scan (Citus Task-Tracker) Task Count: 1 Tasks Shown: None, not supported for re-partition queries -> MapMergeJob @@ -477,9 +460,8 @@ EXPLAIN (COSTS FALSE, FORMAT JSON) { "Node Type": "Custom Scan", "Parent 
Relationship": "Outer", - "Custom Plan Provider": "CitusScan", + "Custom Plan Provider": "Citus Task-Tracker", "Distributed Query": { - "Executor": "Task-Tracker", "Job": { "Task Count": 1, "Tasks Shown": "None, not supported for re-partition queries", @@ -524,9 +506,8 @@ EXPLAIN (COSTS FALSE, FORMAT XML) Custom Scan Outer - CitusScan + Citus Task-Tracker - Task-Tracker 1 None, not supported for re-partition queries @@ -581,9 +562,8 @@ EXPLAIN (COSTS FALSE, FORMAT YAML) Plans: - Node Type: "Custom Scan" Parent Relationship: "Outer" - Custom Plan Provider: "CitusScan" + Custom Plan Provider: "Citus Task-Tracker" Distributed Query: - Executor: "Task-Tracker" Job: Task Count: 1 Tasks Shown: "None, not supported for re-partition queries" @@ -610,8 +590,7 @@ Aggregate -- ensure distributed plans don't break EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem; Aggregate - -> Custom Scan (CitusScan) - Executor: Task-Tracker + -> Custom Scan (Citus Task-Tracker) Task Count: 8 Tasks Shown: One of 8 -> Task @@ -623,8 +602,7 @@ PREPARE task_tracker_query AS SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030; EXPLAIN (COSTS FALSE) EXECUTE task_tracker_query; Aggregate - -> Custom Scan (CitusScan) - Executor: Task-Tracker + -> Custom Scan (Citus Task-Tracker) Task Count: 4 Tasks Shown: One of 4 -> Task @@ -635,8 +613,7 @@ Aggregate SET citus.task_executor_type TO 'real-time'; PREPARE router_executor_query AS SELECT l_quantity FROM lineitem WHERE l_orderkey = 5; EXPLAIN EXECUTE router_executor_query; -Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) - Executor: Router +Custom Scan (Citus Router) (cost=0.00..0.00 rows=0 width=0) Task Count: 1 Tasks Shown: All -> Task @@ -649,8 +626,7 @@ PREPARE real_time_executor_query AS SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030; EXPLAIN (COSTS FALSE) EXECUTE real_time_executor_query; Aggregate - -> Custom Scan (CitusScan) - Executor: Real-Time + -> Custom Scan (Citus Real-Time) Task Count: 4 
Tasks Shown: One of 4 -> Task @@ -662,8 +638,7 @@ Aggregate -- at least make sure to fail without crashing PREPARE router_executor_query_param(int) AS SELECT l_quantity FROM lineitem WHERE l_orderkey = $1; EXPLAIN EXECUTE router_executor_query_param(5); -Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) - Executor: Router +Custom Scan (Citus Router) (cost=0.00..0.00 rows=0 width=0) Task Count: 1 Tasks Shown: All -> Task diff --git a/src/test/regress/expected/multi_join_order_additional.out b/src/test/regress/expected/multi_join_order_additional.out index 84b3d55e3..da8ddd2f5 100644 --- a/src/test/regress/expected/multi_join_order_additional.out +++ b/src/test/regress/expected/multi_join_order_additional.out @@ -6,7 +6,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 650000; -- Set configuration to print table join order and pruned shards SET citus.explain_distributed_queries TO off; SET citus.log_multi_join_order TO TRUE; -SET citus.task_executor_type = 'task-tracker'; -- can't explain all queries otherwhise +SET citus.task_executor_type = 'task-tracker'; -- can't explain all queries otherwise SET client_min_messages TO DEBUG2; -- Create new table definitions for use in testing in distributed planning and -- execution functionality. Also create indexes to boost performance. 
@@ -140,9 +140,9 @@ DEBUG: join prunable for intervals [13473,14947] and [2951,4455] DEBUG: join prunable for intervals [13473,14947] and [4480,5986] DEBUG: join prunable for intervals [13473,14947] and [8997,10560] DEBUG: join prunable for intervals [13473,14947] and [10560,12036] - QUERY PLAN --------------------------------------------------------------- - Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + QUERY PLAN +-------------------------------------------------------------------- + Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (2 rows) @@ -156,10 +156,10 @@ EXPLAIN SELECT count(*) FROM lineitem, orders WHERE (l_orderkey = o_orderkey AND l_quantity > 5) OR (l_orderkey = o_orderkey AND l_quantity < 10); LOG: join order: [ "lineitem" ][ local partition join "orders" ] - QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN +-------------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (3 rows) @@ -177,10 +177,10 @@ BEGIN; EXPLAIN SELECT count(*) FROM orders, lineitem_hash WHERE o_orderkey = l_orderkey; LOG: join order: [ "orders" ][ single partition join "lineitem_hash" ] - QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN +-------------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (3 rows) @@ -188,10 +188,10 @@ LOG: join order: [ "orders" ][ single partition join "lineitem_hash" ] EXPLAIN SELECT 
count(*) FROM orders_hash, lineitem_hash WHERE o_orderkey = l_orderkey; LOG: join order: [ "orders_hash" ][ local partition join "lineitem_hash" ] - QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN +-------------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (3 rows) @@ -199,10 +199,10 @@ LOG: join order: [ "orders_hash" ][ local partition join "lineitem_hash" ] EXPLAIN SELECT count(*) FROM customer_hash, nation WHERE c_nationkey = n_nationkey; LOG: join order: [ "customer_hash" ][ broadcast join "nation" ] - QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN +-------------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (3 rows) @@ -213,10 +213,10 @@ SET citus.large_table_shard_count TO 1; EXPLAIN SELECT count(*) FROM orders, lineitem, customer WHERE o_custkey = l_partkey AND o_custkey = c_nationkey; LOG: join order: [ "orders" ][ dual partition join "lineitem" ][ dual partition join "customer" ] - QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN +-------------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (3 rows) @@ -225,10 +225,10 @@ LOG: join order: [ "orders" ][ dual partition join "lineitem" ][ 
dual partition EXPLAIN SELECT count(*) FROM orders, customer_hash WHERE c_custkey = o_custkey; LOG: join order: [ "orders" ][ dual partition join "customer_hash" ] - QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN +-------------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (3 rows) @@ -237,10 +237,10 @@ LOG: join order: [ "orders" ][ dual partition join "customer_hash" ] EXPLAIN SELECT count(*) FROM orders_hash, customer WHERE c_custkey = o_custkey; LOG: join order: [ "orders_hash" ][ single partition join "customer" ] - QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN +-------------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (3 rows) diff --git a/src/test/regress/expected/multi_join_order_tpch_large.out b/src/test/regress/expected/multi_join_order_tpch_large.out index 8fe01093c..9489b567e 100644 --- a/src/test/regress/expected/multi_join_order_tpch_large.out +++ b/src/test/regress/expected/multi_join_order_tpch_large.out @@ -6,7 +6,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 660000; -- Enable configuration to print table join order SET citus.explain_distributed_queries TO off; SET citus.log_multi_join_order TO TRUE; -SET citus.task_executor_type = 'task-tracker'; -- can't explain all queries otherwhise +SET citus.task_executor_type = 'task-tracker'; -- can't explain all queries otherwise SET client_min_messages TO LOG; -- Change configuration to treat lineitem, 
orders, customer, and part tables as -- large. The following queries are basically the same as the ones in tpch_small @@ -24,10 +24,10 @@ WHERE and l_discount between 0.06 - 0.01 and 0.06 + 0.01 and l_quantity < 24; LOG: join order: [ "lineitem" ] - QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN +-------------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (3 rows) @@ -55,13 +55,13 @@ ORDER BY revenue DESC, o_orderdate; LOG: join order: [ "orders" ][ local partition join "lineitem" ][ single partition join "customer" ] - QUERY PLAN --------------------------------------------------------------------------- + QUERY PLAN +-------------------------------------------------------------------------------- Sort (cost=0.00..0.00 rows=0 width=0) Sort Key: sum((sum(revenue))) DESC, o_orderdate -> HashAggregate (cost=0.00..0.00 rows=0 width=0) Group Key: l_orderkey, o_orderdate, o_shippriority - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (6 rows) @@ -104,7 +104,7 @@ LOG: join order: [ "orders" ][ local partition join "lineitem" ][ single partit Sort Key: sum((sum(revenue))) DESC -> HashAggregate (cost=0.00..0.00 rows=0 width=0) Group Key: c_custkey, c_name, c_acctbal, c_phone, n_name, c_address, c_comment - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (6 rows) @@ -139,10 +139,10 @@ WHERE AND l_shipinstruct = 'DELIVER IN PERSON' ); LOG: join order: [ "lineitem" ][ single partition join "part" ] - 
QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN +-------------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (3 rows) @@ -158,11 +158,11 @@ WHERE GROUP BY l_partkey; LOG: join order: [ "lineitem" ][ local partition join "orders" ][ single partition join "part" ][ single partition join "customer" ] - QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN +-------------------------------------------------------------------------- HashAggregate (cost=0.00..0.00 rows=0 width=0) Group Key: l_partkey - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (4 rows) diff --git a/src/test/regress/expected/multi_join_order_tpch_small.out b/src/test/regress/expected/multi_join_order_tpch_small.out index 66bcca2f4..c0466b1e3 100644 --- a/src/test/regress/expected/multi_join_order_tpch_small.out +++ b/src/test/regress/expected/multi_join_order_tpch_small.out @@ -18,10 +18,10 @@ WHERE and l_discount between 0.06 - 0.01 and 0.06 + 0.01 and l_quantity < 24; LOG: join order: [ "lineitem" ] - QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN +----------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (3 rows) @@ -49,13 +49,13 @@ ORDER BY revenue DESC, o_orderdate; LOG: join order: [ "orders" ][ broadcast join "customer" ][ local 
partition join "lineitem" ] - QUERY PLAN --------------------------------------------------------------------------- + QUERY PLAN +----------------------------------------------------------------------------- Sort (cost=0.00..0.00 rows=0 width=0) Sort Key: sum((sum(revenue))) DESC, o_orderdate -> HashAggregate (cost=0.00..0.00 rows=0 width=0) Group Key: l_orderkey, o_orderdate, o_shippriority - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (6 rows) @@ -98,7 +98,7 @@ LOG: join order: [ "orders" ][ broadcast join "customer" ][ broadcast join "nat Sort Key: sum((sum(revenue))) DESC -> HashAggregate (cost=0.00..0.00 rows=0 width=0) Group Key: c_custkey, c_name, c_acctbal, c_phone, n_name, c_address, c_comment - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (6 rows) @@ -133,10 +133,10 @@ WHERE AND l_shipinstruct = 'DELIVER IN PERSON' ); LOG: join order: [ "lineitem" ][ broadcast join "part" ] - QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN +----------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (3 rows) diff --git a/src/test/regress/expected/multi_join_pruning.out b/src/test/regress/expected/multi_join_pruning.out index 60caa5865..e04799723 100644 --- a/src/test/regress/expected/multi_join_pruning.out +++ b/src/test/regress/expected/multi_join_pruning.out @@ -100,10 +100,10 @@ EXPLAIN SELECT count(*) WHERE table1.array_column = table2.array_column; DEBUG: join prunable for intervals 
[{},{AZZXSP27F21T6,AZZXSP27F21T6}] and [{BA1000U2AMO4ZGX,BZZXSP27F21T6},{CA1000U2AMO4ZGX,CZZXSP27F21T6}] DEBUG: join prunable for intervals [{BA1000U2AMO4ZGX,BZZXSP27F21T6},{CA1000U2AMO4ZGX,CZZXSP27F21T6}] and [{},{AZZXSP27F21T6,AZZXSP27F21T6}] - QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN +----------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (3 rows) @@ -112,10 +112,10 @@ EXPLAIN SELECT count(*) WHERE table1.composite_column = table2.composite_column; DEBUG: join prunable for intervals [(a,3,b),(b,4,c)] and [(c,5,d),(d,6,e)] DEBUG: join prunable for intervals [(c,5,d),(d,6,e)] and [(a,3,b),(b,4,c)] - QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN +----------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (3 rows) @@ -125,10 +125,10 @@ EXPLAIN SELECT count(*) WHERE table1.varchar_column = table2.varchar_column; DEBUG: join prunable for intervals [AA1000U2AMO4ZGX,AZZXSP27F21T6] and [BA1000U2AMO4ZGX,BZZXSP27F21T6] DEBUG: join prunable for intervals [BA1000U2AMO4ZGX,BZZXSP27F21T6] and [AA1000U2AMO4ZGX,AZZXSP27F21T6] - QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN +----------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed 
queries are not enabled (3 rows) diff --git a/src/test/regress/expected/multi_mx_explain.out b/src/test/regress/expected/multi_mx_explain.out index 75a3c880d..02dfbee99 100644 --- a/src/test/regress/expected/multi_mx_explain.out +++ b/src/test/regress/expected/multi_mx_explain.out @@ -65,8 +65,7 @@ Sort Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint), l_quantity -> HashAggregate Group Key: l_quantity - -> Custom Scan (CitusScan) - Executor: Real-Time + -> Custom Scan (Citus Real-Time) Task Count: 16 Tasks Shown: One of 16 -> Task @@ -96,10 +95,9 @@ EXPLAIN (COSTS FALSE, FORMAT JSON) { "Node Type": "Custom Scan", "Parent Relationship": "Outer", - "Custom Plan Provider": "CitusScan", + "Custom Plan Provider": "Citus Real-Time", "Parallel Aware": false, "Distributed Query": { - "Executor": "Real-Time", "Job": { "Task Count": 16, "Tasks Shown": "One of 16", @@ -173,10 +171,9 @@ EXPLAIN (COSTS FALSE, FORMAT XML) Custom Scan Outer - CitusScan + Citus Real-Time false - Real-Time 16 One of 16 @@ -244,10 +241,9 @@ EXPLAIN (COSTS FALSE, FORMAT YAML) Plans: - Node Type: "Custom Scan" Parent Relationship: "Outer" - Custom Plan Provider: "CitusScan" + Custom Plan Provider: "Citus Real-Time" Parallel Aware: false Distributed Query: - Executor: "Real-Time" Job: Task Count: 16 Tasks Shown: "One of 16" @@ -276,8 +272,7 @@ Sort Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint), l_quantity -> HashAggregate Group Key: l_quantity - -> Custom Scan (CitusScan) - Executor: Real-Time + -> Custom Scan (Citus Real-Time) Task Count: 16 Tasks Shown: One of 16 -> Task @@ -291,9 +286,8 @@ EXPLAIN (COSTS FALSE, VERBOSE TRUE) SELECT sum(l_quantity) / avg(l_quantity) FROM lineitem_mx; Aggregate Output: (sum("?column?") / (sum("?column?_1") / pg_catalog.sum("?column?_2"))) - -> Custom Scan (CitusScan) + -> Custom Scan (Citus Real-Time) Output: 
"?column?", "?column?_1", "?column?_2" - Executor: Real-Time Task Count: 16 Tasks Shown: One of 16 -> Task @@ -310,8 +304,7 @@ EXPLAIN (COSTS FALSE) Limit -> Sort Sort Key: l_quantity - -> Custom Scan (CitusScan) - Executor: Real-Time + -> Custom Scan (Citus Real-Time) Task Count: 16 Tasks Shown: One of 16 -> Task @@ -328,8 +321,7 @@ Limit -- Test insert EXPLAIN (COSTS FALSE) INSERT INTO lineitem_mx VALUES(1,0); -Custom Scan (CitusScan) - Executor: Router +Custom Scan (Citus Router) Task Count: 1 Tasks Shown: All -> Task @@ -341,8 +333,7 @@ EXPLAIN (COSTS FALSE) UPDATE lineitem_mx SET l_suppkey = 12 WHERE l_orderkey = 1 AND l_partkey = 0; -Custom Scan (CitusScan) - Executor: Router +Custom Scan (Citus Router) Task Count: 1 Tasks Shown: All -> Task @@ -355,8 +346,7 @@ Custom Scan (CitusScan) EXPLAIN (COSTS FALSE) DELETE FROM lineitem_mx WHERE l_orderkey = 1 AND l_partkey = 0; -Custom Scan (CitusScan) - Executor: Router +Custom Scan (Citus Router) Task Count: 1 Tasks Shown: All -> Task @@ -368,8 +358,7 @@ Custom Scan (CitusScan) -- Test single-shard SELECT EXPLAIN (COSTS FALSE) SELECT l_quantity FROM lineitem_mx WHERE l_orderkey = 5; -Custom Scan (CitusScan) - Executor: Router +Custom Scan (Citus Router) Task Count: 1 Tasks Shown: All -> Task @@ -388,8 +377,7 @@ t EXPLAIN (COSTS FALSE) CREATE TABLE explain_result AS SELECT * FROM lineitem_mx; -Custom Scan (CitusScan) - Executor: Real-Time +Custom Scan (Citus Real-Time) Task Count: 16 Tasks Shown: One of 16 -> Task @@ -400,8 +388,7 @@ SET citus.explain_all_tasks TO on; EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem_mx WHERE l_orderkey > 9030; Aggregate - -> Custom Scan (CitusScan) - Executor: Real-Time + -> Custom Scan (Citus Real-Time) Task Count: 16 Tasks Shown: All -> Task @@ -496,8 +483,7 @@ SET citus.explain_all_tasks TO off; EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem_mx WHERE l_orderkey > 9030; Aggregate - -> Custom Scan (CitusScan) - Executor: Task-Tracker + -> Custom Scan (Citus 
Task-Tracker) Task Count: 16 Tasks Shown: One of 16 -> Task @@ -514,8 +500,7 @@ EXPLAIN (COSTS FALSE) AND o_custkey = c_custkey AND l_suppkey = s_suppkey; Aggregate - -> Custom Scan (CitusScan) - Executor: Task-Tracker + -> Custom Scan (Citus Task-Tracker) Task Count: 4 Tasks Shown: None, not supported for re-partition queries -> MapMergeJob @@ -547,10 +532,9 @@ EXPLAIN (COSTS FALSE, FORMAT JSON) { "Node Type": "Custom Scan", "Parent Relationship": "Outer", - "Custom Plan Provider": "CitusScan", + "Custom Plan Provider": "Citus Task-Tracker", "Parallel Aware": false, "Distributed Query": { - "Executor": "Task-Tracker", "Job": { "Task Count": 4, "Tasks Shown": "None, not supported for re-partition queries", @@ -605,10 +589,9 @@ EXPLAIN (COSTS FALSE, FORMAT XML) Custom Scan Outer - CitusScan + Citus Task-Tracker false - Task-Tracker 4 None, not supported for re-partition queries @@ -660,10 +643,9 @@ EXPLAIN (COSTS FALSE, FORMAT YAML) Plans: - Node Type: "Custom Scan" Parent Relationship: "Outer" - Custom Plan Provider: "CitusScan" + Custom Plan Provider: "Citus Task-Tracker" Parallel Aware: false Distributed Query: - Executor: "Task-Tracker" Job: Task Count: 4 Tasks Shown: "None, not supported for re-partition queries" diff --git a/src/test/regress/expected/multi_mx_explain_0.out b/src/test/regress/expected/multi_mx_explain_0.out index 07f2c51b2..0a74d3001 100644 --- a/src/test/regress/expected/multi_mx_explain_0.out +++ b/src/test/regress/expected/multi_mx_explain_0.out @@ -65,8 +65,7 @@ Sort Sort Key: COALESCE((sum((COALESCE((sum(count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint), l_quantity -> HashAggregate Group Key: l_quantity - -> Custom Scan (CitusScan) - Executor: Real-Time + -> Custom Scan (Citus Real-Time) Task Count: 16 Tasks Shown: One of 16 -> Task @@ -93,9 +92,8 @@ EXPLAIN (COSTS FALSE, FORMAT JSON) { "Node Type": "Custom Scan", "Parent Relationship": "Outer", - "Custom Plan Provider": "CitusScan", + "Custom Plan Provider": "Citus 
Real-Time", "Distributed Query": { - "Executor": "Real-Time", "Job": { "Task Count": 16, "Tasks Shown": "One of 16", @@ -163,9 +161,8 @@ EXPLAIN (COSTS FALSE, FORMAT XML) Custom Scan Outer - CitusScan + Citus Real-Time - Real-Time 16 One of 16 @@ -227,9 +224,8 @@ EXPLAIN (COSTS FALSE, FORMAT YAML) Plans: - Node Type: "Custom Scan" Parent Relationship: "Outer" - Custom Plan Provider: "CitusScan" + Custom Plan Provider: "Citus Real-Time" Distributed Query: - Executor: "Real-Time" Job: Task Count: 16 Tasks Shown: "One of 16" @@ -255,8 +251,7 @@ Sort Sort Key: COALESCE((sum((COALESCE((sum(count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint), l_quantity -> HashAggregate Group Key: l_quantity - -> Custom Scan (CitusScan) - Executor: Real-Time + -> Custom Scan (Citus Real-Time) Task Count: 16 Tasks Shown: One of 16 -> Task @@ -270,9 +265,8 @@ EXPLAIN (COSTS FALSE, VERBOSE TRUE) SELECT sum(l_quantity) / avg(l_quantity) FROM lineitem_mx; Aggregate Output: (sum("?column?") / (sum("?column?_1") / sum("?column?_2"))) - -> Custom Scan (CitusScan) + -> Custom Scan (Citus Real-Time) Output: "?column?", "?column?_1", "?column?_2" - Executor: Real-Time Task Count: 16 Tasks Shown: One of 16 -> Task @@ -289,8 +283,7 @@ EXPLAIN (COSTS FALSE) Limit -> Sort Sort Key: l_quantity - -> Custom Scan (CitusScan) - Executor: Real-Time + -> Custom Scan (Citus Real-Time) Task Count: 16 Tasks Shown: One of 16 -> Task @@ -307,8 +300,7 @@ Limit -- Test insert EXPLAIN (COSTS FALSE) INSERT INTO lineitem_mx VALUES(1,0); -Custom Scan (CitusScan) - Executor: Router +Custom Scan (Citus Router) Task Count: 1 Tasks Shown: All -> Task @@ -320,8 +312,7 @@ EXPLAIN (COSTS FALSE) UPDATE lineitem_mx SET l_suppkey = 12 WHERE l_orderkey = 1 AND l_partkey = 0; -Custom Scan (CitusScan) - Executor: Router +Custom Scan (Citus Router) Task Count: 1 Tasks Shown: All -> Task @@ -334,8 +325,7 @@ Custom Scan (CitusScan) EXPLAIN (COSTS FALSE) DELETE FROM lineitem_mx WHERE l_orderkey = 1 AND l_partkey = 0; -Custom 
Scan (CitusScan) - Executor: Router +Custom Scan (Citus Router) Task Count: 1 Tasks Shown: All -> Task @@ -347,8 +337,7 @@ Custom Scan (CitusScan) -- Test single-shard SELECT EXPLAIN (COSTS FALSE) SELECT l_quantity FROM lineitem_mx WHERE l_orderkey = 5; -Custom Scan (CitusScan) - Executor: Router +Custom Scan (Citus Router) Task Count: 1 Tasks Shown: All -> Task @@ -367,8 +356,7 @@ t EXPLAIN (COSTS FALSE) CREATE TABLE explain_result AS SELECT * FROM lineitem_mx; -Custom Scan (CitusScan) - Executor: Real-Time +Custom Scan (Citus Real-Time) Task Count: 16 Tasks Shown: One of 16 -> Task @@ -379,8 +367,7 @@ SET citus.explain_all_tasks TO on; EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem_mx WHERE l_orderkey > 9030; Aggregate - -> Custom Scan (CitusScan) - Executor: Real-Time + -> Custom Scan (Citus Real-Time) Task Count: 16 Tasks Shown: All -> Task @@ -475,8 +462,7 @@ SET citus.explain_all_tasks TO off; EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem_mx WHERE l_orderkey > 9030; Aggregate - -> Custom Scan (CitusScan) - Executor: Task-Tracker + -> Custom Scan (Citus Task-Tracker) Task Count: 16 Tasks Shown: One of 16 -> Task @@ -493,8 +479,7 @@ EXPLAIN (COSTS FALSE) AND o_custkey = c_custkey AND l_suppkey = s_suppkey; Aggregate - -> Custom Scan (CitusScan) - Executor: Task-Tracker + -> Custom Scan (Citus Task-Tracker) Task Count: 4 Tasks Shown: None, not supported for re-partition queries -> MapMergeJob @@ -524,9 +509,8 @@ EXPLAIN (COSTS FALSE, FORMAT JSON) { "Node Type": "Custom Scan", "Parent Relationship": "Outer", - "Custom Plan Provider": "CitusScan", + "Custom Plan Provider": "Citus Task-Tracker", "Distributed Query": { - "Executor": "Task-Tracker", "Job": { "Task Count": 4, "Tasks Shown": "None, not supported for re-partition queries", @@ -579,9 +563,8 @@ EXPLAIN (COSTS FALSE, FORMAT XML) Custom Scan Outer - CitusScan + Citus Task-Tracker - Task-Tracker 4 None, not supported for re-partition queries @@ -631,9 +614,8 @@ EXPLAIN (COSTS 
FALSE, FORMAT YAML) Plans: - Node Type: "Custom Scan" Parent Relationship: "Outer" - Custom Plan Provider: "CitusScan" + Custom Plan Provider: "Citus Task-Tracker" Distributed Query: - Executor: "Task-Tracker" Job: Task Count: 4 Tasks Shown: "None, not supported for re-partition queries" diff --git a/src/test/regress/expected/multi_mx_reference_table.out b/src/test/regress/expected/multi_mx_reference_table.out index 34c39f1b0..0aecc94f6 100644 --- a/src/test/regress/expected/multi_mx_reference_table.out +++ b/src/test/regress/expected/multi_mx_reference_table.out @@ -525,6 +525,12 @@ FETCH test_cursor; -- fetch one row after the last ---------+---------+---------+--------- (0 rows) +FETCH BACKWARD test_cursor; + value_1 | value_2 | value_3 | value_4 +---------+---------+---------+-------------------------- + 2 | 2 | 2 | Fri Dec 02 00:00:00 2016 +(1 row) + END; -- table creation queries inside can be router plannable CREATE TEMP TABLE temp_reference_test as diff --git a/src/test/regress/expected/multi_mx_repartition_udt_prepare.out b/src/test/regress/expected/multi_mx_repartition_udt_prepare.out index 401f74d12..3ccf84eaf 100644 --- a/src/test/regress/expected/multi_mx_repartition_udt_prepare.out +++ b/src/test/regress/expected/multi_mx_repartition_udt_prepare.out @@ -173,10 +173,9 @@ EXPLAIN SELECT * FROM repartition_udt JOIN repartition_udt_other ON repartition_udt.udtcol = repartition_udt_other.udtcol WHERE repartition_udt.pk > 1; LOG: join order: [ "repartition_udt" ][ dual partition join "repartition_udt_other" ] - QUERY PLAN -------------------------------------------------------------- - Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) - Executor: Task-Tracker + QUERY PLAN +-------------------------------------------------------------------- + Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0) Task Count: 4 Tasks Shown: None, not supported for re-partition queries -> MapMergeJob @@ -185,7 +184,7 @@ LOG: join order: [ "repartition_udt" 
][ dual partition join "repartition_udt_ot -> MapMergeJob Map Task Count: 5 Merge Task Count: 4 -(10 rows) +(9 rows) SELECT * FROM repartition_udt JOIN repartition_udt_other ON repartition_udt.udtcol = repartition_udt_other.udtcol diff --git a/src/test/regress/expected/multi_mx_router_planner.out b/src/test/regress/expected/multi_mx_router_planner.out index 8179be5eb..9c6b09cf2 100644 --- a/src/test/regress/expected/multi_mx_router_planner.out +++ b/src/test/regress/expected/multi_mx_router_planner.out @@ -1312,6 +1312,12 @@ FETCH test_cursor; 11 | 1 | alamo | 1347 (1 row) +FETCH BACKWARD test_cursor; + id | author_id | title | word_count +----+-----------+----------+------------ + 1 | 1 | arsenous | 9572 +(1 row) + END; -- queries inside copy can be router plannable COPY ( @@ -1454,7 +1460,10 @@ CONTEXT: SQL statement "SELECT ah.id, ah.word_count WHERE author_id = 1" PL/pgSQL function author_articles_id_word_count() line 4 at RETURN QUERY DEBUG: Plan is router executable -CONTEXT: PL/pgSQL function author_articles_id_word_count() line 4 at RETURN QUERY +CONTEXT: SQL statement "SELECT ah.id, ah.word_count + FROM articles_hash_mx ah + WHERE author_id = 1" +PL/pgSQL function author_articles_id_word_count() line 4 at RETURN QUERY id | word_count ----+------------ 1 | 9572 diff --git a/src/test/regress/expected/multi_mx_schema_support.out b/src/test/regress/expected/multi_mx_schema_support.out index ed171c8c7..5035dc96f 100644 --- a/src/test/regress/expected/multi_mx_schema_support.out +++ b/src/test/regress/expected/multi_mx_schema_support.out @@ -37,6 +37,17 @@ FETCH test_cursor; 1 | ARGENTINA | 1 | al foxes promise slyly according to the regular accounts. 
bold requests alon (1 row) +FETCH test_cursor; + n_nationkey | n_name | n_regionkey | n_comment +-------------+--------+-------------+----------- +(0 rows) + +FETCH BACKWARD test_cursor; + n_nationkey | n_name | n_regionkey | n_comment +-------------+---------------------------+-------------+------------------------------------------------------------------------------ + 1 | ARGENTINA | 1 | al foxes promise slyly according to the regular accounts. bold requests alon +(1 row) + END; -- test with search_path is set SET search_path TO citus_mx_test_schema; @@ -51,6 +62,17 @@ FETCH test_cursor; 1 | ARGENTINA | 1 | al foxes promise slyly according to the regular accounts. bold requests alon (1 row) +FETCH test_cursor; + n_nationkey | n_name | n_regionkey | n_comment +-------------+--------+-------------+----------- +(0 rows) + +FETCH BACKWARD test_cursor; + n_nationkey | n_name | n_regionkey | n_comment +-------------+---------------------------+-------------+------------------------------------------------------------------------------ + 1 | ARGENTINA | 1 | al foxes promise slyly according to the regular accounts. bold requests alon +(1 row) + END; -- test inserting to table in different schema SET search_path TO public; diff --git a/src/test/regress/expected/multi_partition_pruning.out b/src/test/regress/expected/multi_partition_pruning.out index 28ea9dc13..65034ae48 100644 --- a/src/test/regress/expected/multi_partition_pruning.out +++ b/src/test/regress/expected/multi_partition_pruning.out @@ -170,30 +170,30 @@ INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, -- one shard. 
EXPLAIN SELECT count(*) FROM varchar_partitioned_table WHERE varchar_column = 'BA2'; DEBUG: predicate pruning for shardId 100 - QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN +----------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (3 rows) EXPLAIN SELECT count(*) FROM array_partitioned_table WHERE array_column > '{BA1000U2AMO4ZGX, BZZXSP27F21T6}'; DEBUG: predicate pruning for shardId 102 - QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN +----------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (3 rows) EXPLAIN SELECT count(*) FROM composite_partitioned_table WHERE composite_column < '(b,5,c)'::composite_type; DEBUG: predicate pruning for shardId 105 - QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN +----------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (3 rows) diff --git a/src/test/regress/expected/multi_reference_table.out b/src/test/regress/expected/multi_reference_table.out index 6544b9f40..2e363623d 100644 --- a/src/test/regress/expected/multi_reference_table.out +++ b/src/test/regress/expected/multi_reference_table.out @@ -571,6 +571,12 @@ FETCH test_cursor; -- fetch one row after the last 
---------+---------+---------+--------- (0 rows) +FETCH BACKWARD test_cursor; + value_1 | value_2 | value_3 | value_4 +---------+---------+---------+-------------------------- + 2 | 2 | 2 | Fri Dec 02 00:00:00 2016 +(1 row) + END; -- table creation queries inside can be router plannable CREATE TEMP TABLE temp_reference_test as diff --git a/src/test/regress/expected/multi_repartition_udt.out b/src/test/regress/expected/multi_repartition_udt.out index 585d09c27..6deeefddb 100644 --- a/src/test/regress/expected/multi_repartition_udt.out +++ b/src/test/regress/expected/multi_repartition_udt.out @@ -181,10 +181,9 @@ EXPLAIN SELECT * FROM repartition_udt JOIN repartition_udt_other ON repartition_udt.udtcol = repartition_udt_other.udtcol WHERE repartition_udt.pk > 1; LOG: join order: [ "repartition_udt" ][ dual partition join "repartition_udt_other" ] - QUERY PLAN -------------------------------------------------------------- - Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) - Executor: Task-Tracker + QUERY PLAN +-------------------------------------------------------------------- + Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0) Task Count: 4 Tasks Shown: None, not supported for re-partition queries -> MapMergeJob @@ -193,7 +192,7 @@ LOG: join order: [ "repartition_udt" ][ dual partition join "repartition_udt_ot -> MapMergeJob Map Task Count: 5 Merge Task Count: 4 -(10 rows) +(9 rows) SELECT * FROM repartition_udt JOIN repartition_udt_other ON repartition_udt.udtcol = repartition_udt_other.udtcol diff --git a/src/test/regress/expected/multi_router_planner.out b/src/test/regress/expected/multi_router_planner.out index 04e82a6b4..63fe2e7ba 100644 --- a/src/test/regress/expected/multi_router_planner.out +++ b/src/test/regress/expected/multi_router_planner.out @@ -2057,6 +2057,12 @@ FETCH test_cursor; -- fetch one row after the last ----+-----------+-------+------------ (0 rows) +FETCH BACKWARD test_cursor; + id | author_id | title | word_count 
+----+-----------+----------+------------ + 41 | 1 | aznavour | 11814 +(1 row) + END; -- queries inside copy can be router plannable COPY ( @@ -2199,7 +2205,10 @@ CONTEXT: SQL statement "SELECT ah.id, ah.word_count WHERE author_id = 1" PL/pgSQL function author_articles_id_word_count() line 4 at RETURN QUERY DEBUG: Plan is router executable -CONTEXT: PL/pgSQL function author_articles_id_word_count() line 4 at RETURN QUERY +CONTEXT: SQL statement "SELECT ah.id, ah.word_count + FROM articles_hash ah + WHERE author_id = 1" +PL/pgSQL function author_articles_id_word_count() line 4 at RETURN QUERY id | word_count ----+------------ 1 | 9572 diff --git a/src/test/regress/expected/multi_schema_support.out b/src/test/regress/expected/multi_schema_support.out index 97c582ceb..b476242b3 100644 --- a/src/test/regress/expected/multi_schema_support.out +++ b/src/test/regress/expected/multi_schema_support.out @@ -155,6 +155,18 @@ FETCH test_cursor; 1 | ARGENTINA | 1 | al foxes promise slyly according to the regular accounts. bold requests alon (1 row) +FETCH test_cursor; + n_nationkey | n_name | n_regionkey | n_comment +-------------+---------------------------+-------------+------------------------------------------------------------------------------ + 1 | ARGENTINA | 1 | al foxes promise slyly according to the regular accounts. bold requests alon +(1 row) + +FETCH BACKWARD test_cursor; + n_nationkey | n_name | n_regionkey | n_comment +-------------+---------------------------+-------------+------------------------------------------------------------------------------ + 1 | ARGENTINA | 1 | al foxes promise slyly according to the regular accounts. bold requests alon +(1 row) + END; -- test with search_path is set SET search_path TO test_schema_support; @@ -169,6 +181,18 @@ FETCH test_cursor; 1 | ARGENTINA | 1 | al foxes promise slyly according to the regular accounts. 
bold requests alon (1 row) +FETCH test_cursor; + n_nationkey | n_name | n_regionkey | n_comment +-------------+---------------------------+-------------+------------------------------------------------------------------------------ + 1 | ARGENTINA | 1 | al foxes promise slyly according to the regular accounts. bold requests alon +(1 row) + +FETCH BACKWARD test_cursor; + n_nationkey | n_name | n_regionkey | n_comment +-------------+---------------------------+-------------+------------------------------------------------------------------------------ + 1 | ARGENTINA | 1 | al foxes promise slyly according to the regular accounts. bold requests alon +(1 row) + END; -- test inserting to table in different schema SET search_path TO public; diff --git a/src/test/regress/expected/multi_task_assignment_policy.out b/src/test/regress/expected/multi_task_assignment_policy.out index fcca06718..d5f4fdc95 100644 --- a/src/test/regress/expected/multi_task_assignment_policy.out +++ b/src/test/regress/expected/multi_task_assignment_policy.out @@ -60,10 +60,10 @@ DEBUG: assigned task 6 to node localhost:57637 DEBUG: assigned task 2 to node localhost:57638 DEBUG: assigned task 4 to node localhost:57637 DEBUG: CommitTransactionCommand - QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN +----------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (3 rows) @@ -74,10 +74,10 @@ DEBUG: assigned task 6 to node localhost:57637 DEBUG: assigned task 2 to node localhost:57638 DEBUG: assigned task 4 to node localhost:57637 DEBUG: CommitTransactionCommand - QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN 
+----------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (3 rows) @@ -93,10 +93,10 @@ DEBUG: assigned task 6 to node localhost:57637 DEBUG: assigned task 4 to node localhost:57637 DEBUG: assigned task 2 to node localhost:57638 DEBUG: CommitTransactionCommand - QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN +----------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (3 rows) @@ -107,10 +107,10 @@ DEBUG: assigned task 6 to node localhost:57637 DEBUG: assigned task 4 to node localhost:57637 DEBUG: assigned task 2 to node localhost:57638 DEBUG: CommitTransactionCommand - QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN +----------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (3 rows) @@ -140,10 +140,10 @@ DEBUG: assigned task 6 to node localhost:57637 DEBUG: assigned task 4 to node localhost:57637 DEBUG: assigned task 2 to node localhost:57638 DEBUG: CommitTransactionCommand - QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN +----------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> 
Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (3 rows) @@ -154,10 +154,10 @@ DEBUG: assigned task 6 to node localhost:57638 DEBUG: assigned task 4 to node localhost:57638 DEBUG: assigned task 2 to node localhost:57637 DEBUG: CommitTransactionCommand - QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN +----------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (3 rows) @@ -168,10 +168,10 @@ DEBUG: assigned task 6 to node localhost:57637 DEBUG: assigned task 4 to node localhost:57637 DEBUG: assigned task 2 to node localhost:57638 DEBUG: CommitTransactionCommand - QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN +----------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (3 rows) diff --git a/src/test/regress/expected/multi_utility_statements.out b/src/test/regress/expected/multi_utility_statements.out index e4891e6c1..b7f4b3723 100644 --- a/src/test/regress/expected/multi_utility_statements.out +++ b/src/test/regress/expected/multi_utility_statements.out @@ -261,6 +261,12 @@ FETCH ABSOLUTE 5 FROM noHoldCursor; 1 | 5 | 24.00 | 0.10 (1 row) +FETCH BACKWARD noHoldCursor; + l_orderkey | l_linenumber | l_quantity | l_discount +------------+--------------+------------+------------ + 1 | 4 | 28.00 | 0.09 +(1 row) + COMMIT; FETCH ABSOLUTE 5 FROM noHoldCursor; ERROR: cursor "noholdcursor" does not exist diff --git 
a/src/test/regress/multi_task_tracker_extra_schedule b/src/test/regress/multi_task_tracker_extra_schedule index 1e76bb3f2..99f53d6d1 100644 --- a/src/test/regress/multi_task_tracker_extra_schedule +++ b/src/test/regress/multi_task_tracker_extra_schedule @@ -33,11 +33,9 @@ test: multi_agg_distinct multi_limit_clause multi_limit_clause_approximate test: multi_average_expression multi_working_columns test: multi_array_agg test: multi_agg_type_conversion multi_count_type_conversion -test: multi_partition_pruning -test: multi_join_pruning multi_hash_pruning +test: multi_hash_pruning test: multi_null_minmax_value_pruning test: multi_query_directory_cleanup -test: multi_task_assignment_policy test: multi_utility_statements test: multi_dropped_column_aliases @@ -52,7 +50,7 @@ test: multi_tpch_query7 multi_tpch_query7_nested # Parallel tests to check our join order planning logic. Note that we load data # below; and therefore these tests should come after the execution tests. # ---------- -test: multi_join_order_tpch_small multi_join_order_additional +test: multi_join_order_additional test: multi_load_more_data test: multi_join_order_tpch_large diff --git a/src/test/regress/output/multi_subquery.source b/src/test/regress/output/multi_subquery.source index 11d00f8dc..4c5931f17 100644 --- a/src/test/regress/output/multi_subquery.source +++ b/src/test/regress/output/multi_subquery.source @@ -767,8 +767,7 @@ FROM QUERY PLAN ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) - Executor: Real-Time + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) Task Count: 2 Tasks Shown: One of 2 -> Task @@ -786,7 +785,7 @@ FROM Sort Key: ((events.composite_id).tenant_id), ((events.composite_id).user_id) -> Seq Scan on events_270009 
events (cost=0.00..11.79 rows=3 width=556) Filter: ((event_type)::text = ANY ('{click,submit,pay}'::text[])) -(20 rows) +(19 rows) -- Union and left join subquery pushdown EXPLAIN SELECT @@ -855,8 +854,7 @@ GROUP BY ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- HashAggregate (cost=0.00..0.00 rows=0 width=0) Group Key: hasdone - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) - Executor: Real-Time + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) Task Count: 2 Tasks Shown: One of 2 -> Task @@ -894,7 +892,7 @@ GROUP BY Sort Key: ((events_2.composite_id).tenant_id), ((events_2.composite_id).user_id) -> Seq Scan on events_270009 events_2 (cost=0.00..12.28 rows=1 width=80) Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type) AND ((event_type)::text = 'pay'::text)) -(41 rows) +(40 rows) -- Union, left join and having subquery pushdown EXPLAIN SELECT @@ -1023,8 +1021,7 @@ LIMIT Limit (cost=0.00..0.00 rows=0 width=0) -> Sort (cost=0.00..0.00 rows=0 width=0) Sort Key: user_lastseen DESC - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) - Executor: Real-Time + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) Task Count: 2 Tasks Shown: One of 2 -> Task @@ -1047,6 +1044,6 @@ LIMIT Sort Key: events.event_time DESC -> Seq Scan on events_270009 events (cost=0.00..11.95 rows=1 width=524) Filter: (((composite_id).tenant_id = ((users.composite_id).tenant_id)) AND ((composite_id).user_id = ((users.composite_id).user_id))) -(27 rows) +(26 rows) SET citus.enable_router_execution TO 'true'; diff --git a/src/test/regress/output/multi_subquery_0.source b/src/test/regress/output/multi_subquery_0.source index ecc2ef7eb..096c204e4 
100644 --- a/src/test/regress/output/multi_subquery_0.source +++ b/src/test/regress/output/multi_subquery_0.source @@ -764,31 +764,28 @@ FROM GROUP BY tenant_id, user_id) AS subquery; - QUERY PLAN ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- - Distributed Query into pg_merge_job_270014 - Executor: Real-Time - Task Count: 2 - Tasks Shown: One of 2 - -> Task - Node: host=localhost port=57637 dbname=regression - -> Aggregate (cost=40.01..40.02 rows=1 width=32) - -> GroupAggregate (cost=39.89..39.99 rows=1 width=556) - Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id) - -> Merge Join (cost=39.89..39.97 rows=1 width=556) - Merge Cond: ((((users.composite_id).tenant_id) = ((events.composite_id).tenant_id)) AND (((users.composite_id).user_id) = ((events.composite_id).user_id))) - -> Sort (cost=28.08..28.09 rows=6 width=32) - Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id) - -> Seq Scan on users_270013 users (cost=0.00..28.00 rows=6 width=32) - Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type)) - -> Sort (cost=11.81..11.82 rows=3 width=556) - Sort Key: ((events.composite_id).tenant_id), ((events.composite_id).user_id) - -> Seq Scan on events_270009 events (cost=0.00..11.79 rows=3 width=556) - Filter: ((event_type)::text = ANY ('{click,submit,pay}'::text[])) - Master Query - -> Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Seq Scan on pg_merge_job_270014 (cost=0.00..0.00 rows=0 width=0) -(22 rows) + QUERY PLAN +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Aggregate (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Real-Time) 
(cost=0.00..0.00 rows=0 width=0) + Task Count: 2 + Tasks Shown: One of 2 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Aggregate (cost=40.01..40.02 rows=1 width=32) + -> GroupAggregate (cost=39.89..39.99 rows=1 width=556) + Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id) + -> Merge Join (cost=39.89..39.97 rows=1 width=556) + Merge Cond: ((((users.composite_id).tenant_id) = ((events.composite_id).tenant_id)) AND (((users.composite_id).user_id) = ((events.composite_id).user_id))) + -> Sort (cost=28.08..28.09 rows=6 width=32) + Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id) + -> Seq Scan on users_270013 users (cost=0.00..28.00 rows=6 width=32) + Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type)) + -> Sort (cost=11.81..11.82 rows=3 width=556) + Sort Key: ((events.composite_id).tenant_id), ((events.composite_id).user_id) + -> Seq Scan on events_270009 events (cost=0.00..11.79 rows=3 width=556) + Filter: ((event_type)::text = ANY ('{click,submit,pay}'::text[])) +(19 rows) -- Union and left join subquery pushdown EXPLAIN SELECT @@ -853,49 +850,46 @@ FROM hasdone) AS subquery_top GROUP BY hasdone; - QUERY PLAN -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- - Distributed Query into pg_merge_job_270015 - Executor: Real-Time - Task Count: 2 - Tasks Shown: One of 2 - -> Task - Node: host=localhost port=57637 dbname=regression - -> HashAggregate (cost=91.94..91.96 rows=2 width=64) - Group Key: COALESCE(('Has done paying'::text), 'Has not done paying'::text) - -> GroupAggregate (cost=91.85..91.90 rows=2 width=88) - Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id), ('Has done paying'::text) - 
-> Sort (cost=91.85..91.85 rows=2 width=88) - Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id), ('Has done paying'::text) - -> Merge Left Join (cost=91.75..91.84 rows=2 width=88) - Merge Cond: ((((users.composite_id).tenant_id) = ((events_2.composite_id).tenant_id)) AND (((users.composite_id).user_id) = ((events_2.composite_id).user_id))) - -> Unique (cost=79.46..79.48 rows=2 width=40) - -> Sort (cost=79.46..79.47 rows=2 width=40) - Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id), ('action=>1'::text), events.event_time - -> Append (cost=0.00..79.45 rows=2 width=40) - -> Nested Loop (cost=0.00..39.72 rows=1 width=40) - Join Filter: (((users.composite_id).tenant_id = (events.composite_id).tenant_id) AND ((users.composite_id).user_id = (events.composite_id).user_id)) - -> Seq Scan on events_270009 events (cost=0.00..11.62 rows=1 width=40) - Filter: ((event_type)::text = 'click'::text) - -> Seq Scan on users_270013 users (cost=0.00..28.00 rows=6 width=32) - Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type)) - -> Nested Loop (cost=0.00..39.72 rows=1 width=40) - Join Filter: (((users_1.composite_id).tenant_id = (events_1.composite_id).tenant_id) AND ((users_1.composite_id).user_id = (events_1.composite_id).user_id)) - -> Seq Scan on events_270009 events_1 (cost=0.00..11.62 rows=1 width=40) - Filter: ((event_type)::text = 'submit'::text) - -> Seq Scan on users_270013 users_1 (cost=0.00..28.00 rows=6 width=32) - Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type)) - -> Materialize (cost=12.29..12.31 rows=1 width=48) - -> Unique (cost=12.29..12.30 rows=1 width=32) - -> Sort (cost=12.29..12.29 rows=1 width=32) - Sort Key: ((events_2.composite_id).tenant_id), ((events_2.composite_id).user_id) - -> Seq Scan on events_270009 events_2 
(cost=0.00..12.28 rows=1 width=32) - Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type) AND ((event_type)::text = 'pay'::text)) - Master Query - -> HashAggregate (cost=0.00..0.00 rows=0 width=0) - Group Key: intermediate_column_270015_2 - -> Seq Scan on pg_merge_job_270015 (cost=0.00..0.00 rows=0 width=0) -(40 rows) + QUERY PLAN +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + HashAggregate (cost=0.00..0.00 rows=0 width=0) + Group Key: hasdone + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) + Task Count: 2 + Tasks Shown: One of 2 + -> Task + Node: host=localhost port=57637 dbname=regression + -> HashAggregate (cost=91.94..91.96 rows=2 width=64) + Group Key: COALESCE(('Has done paying'::text), 'Has not done paying'::text) + -> GroupAggregate (cost=91.85..91.90 rows=2 width=88) + Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id), ('Has done paying'::text) + -> Sort (cost=91.85..91.85 rows=2 width=88) + Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id), ('Has done paying'::text) + -> Merge Left Join (cost=91.75..91.84 rows=2 width=88) + Merge Cond: ((((users.composite_id).tenant_id) = ((events_2.composite_id).tenant_id)) AND (((users.composite_id).user_id) = ((events_2.composite_id).user_id))) + -> Unique (cost=79.46..79.48 rows=2 width=40) + -> Sort (cost=79.46..79.47 rows=2 width=40) + Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id), ('action=>1'::text), events.event_time + -> Append (cost=0.00..79.45 rows=2 width=40) + -> Nested Loop (cost=0.00..39.72 rows=1 width=40) + Join Filter: (((users.composite_id).tenant_id = (events.composite_id).tenant_id) AND 
((users.composite_id).user_id = (events.composite_id).user_id)) + -> Seq Scan on events_270009 events (cost=0.00..11.62 rows=1 width=40) + Filter: ((event_type)::text = 'click'::text) + -> Seq Scan on users_270013 users (cost=0.00..28.00 rows=6 width=32) + Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type)) + -> Nested Loop (cost=0.00..39.72 rows=1 width=40) + Join Filter: (((users_1.composite_id).tenant_id = (events_1.composite_id).tenant_id) AND ((users_1.composite_id).user_id = (events_1.composite_id).user_id)) + -> Seq Scan on events_270009 events_1 (cost=0.00..11.62 rows=1 width=40) + Filter: ((event_type)::text = 'submit'::text) + -> Seq Scan on users_270013 users_1 (cost=0.00..28.00 rows=6 width=32) + Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type)) + -> Materialize (cost=12.29..12.31 rows=1 width=48) + -> Unique (cost=12.29..12.30 rows=1 width=32) + -> Sort (cost=12.29..12.29 rows=1 width=32) + Sort Key: ((events_2.composite_id).tenant_id), ((events_2.composite_id).user_id) + -> Seq Scan on events_270009 events_2 (cost=0.00..12.28 rows=1 width=32) + Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type) AND ((event_type)::text = 'pay'::text)) +(37 rows) -- Union, left join and having subquery pushdown EXPLAIN SELECT @@ -1019,37 +1013,34 @@ ORDER BY user_lastseen DESC LIMIT 10; - QUERY PLAN ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- - Distributed Query into pg_merge_job_270017 - Executor: Real-Time - Task Count: 2 - Tasks Shown: One of 2 - -> Task - Node: host=localhost port=57637 dbname=regression - -> Limit 
(cost=100.43..100.44 rows=6 width=56) - -> Sort (cost=100.43..100.44 rows=6 width=56) - Sort Key: (max(users.lastseen)) DESC - -> GroupAggregate (cost=100.14..100.29 rows=6 width=548) - Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id) - -> Sort (cost=100.14..100.16 rows=6 width=548) - Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id) - -> Nested Loop Left Join (cost=40.04..100.06 rows=6 width=548) - -> Limit (cost=28.08..28.09 rows=6 width=40) - -> Sort (cost=28.08..28.09 rows=6 width=40) - Sort Key: users.lastseen DESC - -> Seq Scan on users_270013 users (cost=0.00..28.00 rows=6 width=40) - Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type)) - -> Limit (cost=11.96..11.96 rows=1 width=524) - -> Sort (cost=11.96..11.96 rows=1 width=524) - Sort Key: events.event_time DESC - -> Seq Scan on events_270009 events (cost=0.00..11.95 rows=1 width=524) - Filter: (((composite_id).tenant_id = ((users.composite_id).tenant_id)) AND ((composite_id).user_id = ((users.composite_id).user_id))) - Master Query - -> Limit (cost=0.00..0.00 rows=0 width=0) - -> Sort (cost=0.00..0.00 rows=0 width=0) - Sort Key: intermediate_column_270017_2 DESC - -> Seq Scan on pg_merge_job_270017 (cost=0.00..0.00 rows=0 width=0) -(29 rows) + QUERY PLAN +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Limit (cost=0.00..0.00 rows=0 width=0) + -> Sort (cost=0.00..0.00 rows=0 width=0) + Sort Key: user_lastseen DESC + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) + Task Count: 2 + Tasks Shown: One of 2 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Limit (cost=100.43..100.44 rows=6 width=56) + -> Sort (cost=100.43..100.44 rows=6 width=56) + Sort Key: 
(max(users.lastseen)) DESC + -> GroupAggregate (cost=100.14..100.29 rows=6 width=548) + Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id) + -> Sort (cost=100.14..100.16 rows=6 width=548) + Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id) + -> Nested Loop Left Join (cost=40.04..100.06 rows=6 width=548) + -> Limit (cost=28.08..28.09 rows=6 width=40) + -> Sort (cost=28.08..28.09 rows=6 width=40) + Sort Key: users.lastseen DESC + -> Seq Scan on users_270013 users (cost=0.00..28.00 rows=6 width=40) + Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type)) + -> Limit (cost=11.96..11.96 rows=1 width=524) + -> Sort (cost=11.96..11.96 rows=1 width=524) + Sort Key: events.event_time DESC + -> Seq Scan on events_270009 events (cost=0.00..11.95 rows=1 width=524) + Filter: (((composite_id).tenant_id = ((users.composite_id).tenant_id)) AND ((composite_id).user_id = ((users.composite_id).user_id))) +(26 rows) SET citus.enable_router_execution TO 'true'; diff --git a/src/test/regress/sql/multi_join_order_additional.sql b/src/test/regress/sql/multi_join_order_additional.sql index 3abf122b1..18d307017 100644 --- a/src/test/regress/sql/multi_join_order_additional.sql +++ b/src/test/regress/sql/multi_join_order_additional.sql @@ -11,7 +11,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 650000; SET citus.explain_distributed_queries TO off; SET citus.log_multi_join_order TO TRUE; -SET citus.task_executor_type = 'task-tracker'; -- can't explain all queries otherwhise +SET citus.task_executor_type = 'task-tracker'; -- can't explain all queries otherwise SET client_min_messages TO DEBUG2; -- Create new table definitions for use in testing in distributed planning and diff --git a/src/test/regress/sql/multi_join_order_tpch_large.sql b/src/test/regress/sql/multi_join_order_tpch_large.sql index 12fc81c8b..20cf83dd0 100644 --- 
a/src/test/regress/sql/multi_join_order_tpch_large.sql +++ b/src/test/regress/sql/multi_join_order_tpch_large.sql @@ -11,7 +11,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 660000; SET citus.explain_distributed_queries TO off; SET citus.log_multi_join_order TO TRUE; -SET citus.task_executor_type = 'task-tracker'; -- can't explain all queries otherwhise +SET citus.task_executor_type = 'task-tracker'; -- can't explain all queries otherwise SET client_min_messages TO LOG; -- Change configuration to treat lineitem, orders, customer, and part tables as diff --git a/src/test/regress/sql/multi_mx_reference_table.sql b/src/test/regress/sql/multi_mx_reference_table.sql index c4843c077..31f89b60b 100644 --- a/src/test/regress/sql/multi_mx_reference_table.sql +++ b/src/test/regress/sql/multi_mx_reference_table.sql @@ -304,6 +304,7 @@ DECLARE test_cursor CURSOR FOR FETCH test_cursor; FETCH ALL test_cursor; FETCH test_cursor; -- fetch one row after the last +FETCH BACKWARD test_cursor; END; -- table creation queries inside can be router plannable diff --git a/src/test/regress/sql/multi_mx_router_planner.sql b/src/test/regress/sql/multi_mx_router_planner.sql index cbcc14d52..bdca55574 100644 --- a/src/test/regress/sql/multi_mx_router_planner.sql +++ b/src/test/regress/sql/multi_mx_router_planner.sql @@ -557,6 +557,7 @@ DECLARE test_cursor CURSOR FOR ORDER BY id; FETCH test_cursor; FETCH test_cursor; +FETCH BACKWARD test_cursor; END; -- queries inside copy can be router plannable diff --git a/src/test/regress/sql/multi_mx_schema_support.sql b/src/test/regress/sql/multi_mx_schema_support.sql index de631df22..b72a5ee04 100644 --- a/src/test/regress/sql/multi_mx_schema_support.sql +++ b/src/test/regress/sql/multi_mx_schema_support.sql @@ -21,6 +21,8 @@ DECLARE test_cursor CURSOR FOR FROM nation_hash WHERE n_nationkey = 1; FETCH test_cursor; +FETCH test_cursor; +FETCH BACKWARD test_cursor; END; -- test with search_path is set @@ -31,6 +33,8 @@ DECLARE test_cursor CURSOR 
FOR FROM nation_hash WHERE n_nationkey = 1; FETCH test_cursor; +FETCH test_cursor; +FETCH BACKWARD test_cursor; END; diff --git a/src/test/regress/sql/multi_reference_table.sql b/src/test/regress/sql/multi_reference_table.sql index 96af1c710..d2a4476b9 100644 --- a/src/test/regress/sql/multi_reference_table.sql +++ b/src/test/regress/sql/multi_reference_table.sql @@ -335,6 +335,7 @@ DECLARE test_cursor CURSOR FOR FETCH test_cursor; FETCH ALL test_cursor; FETCH test_cursor; -- fetch one row after the last +FETCH BACKWARD test_cursor; END; -- table creation queries inside can be router plannable diff --git a/src/test/regress/sql/multi_router_planner.sql b/src/test/regress/sql/multi_router_planner.sql index fdea0f4a6..4d72d8e1f 100644 --- a/src/test/regress/sql/multi_router_planner.sql +++ b/src/test/regress/sql/multi_router_planner.sql @@ -917,6 +917,7 @@ DECLARE test_cursor CURSOR FOR FETCH test_cursor; FETCH ALL test_cursor; FETCH test_cursor; -- fetch one row after the last +FETCH BACKWARD test_cursor; END; -- queries inside copy can be router plannable diff --git a/src/test/regress/sql/multi_schema_support.sql b/src/test/regress/sql/multi_schema_support.sql index 1b84e71ff..70feb03de 100644 --- a/src/test/regress/sql/multi_schema_support.sql +++ b/src/test/regress/sql/multi_schema_support.sql @@ -113,6 +113,8 @@ DECLARE test_cursor CURSOR FOR FROM test_schema_support.nation_append WHERE n_nationkey = 1; FETCH test_cursor; +FETCH test_cursor; +FETCH BACKWARD test_cursor; END; -- test with search_path is set @@ -123,6 +125,8 @@ DECLARE test_cursor CURSOR FOR FROM nation_append WHERE n_nationkey = 1; FETCH test_cursor; +FETCH test_cursor; +FETCH BACKWARD test_cursor; END; diff --git a/src/test/regress/sql/multi_utility_statements.sql b/src/test/regress/sql/multi_utility_statements.sql index a29842208..b1e8232c3 100644 --- a/src/test/regress/sql/multi_utility_statements.sql +++ b/src/test/regress/sql/multi_utility_statements.sql @@ -146,5 +146,6 @@ DECLARE 
noHoldCursor SCROLL CURSOR FOR ORDER BY l_orderkey, l_linenumber; FETCH ABSOLUTE 5 FROM noHoldCursor; +FETCH BACKWARD noHoldCursor; COMMIT; FETCH ABSOLUTE 5 FROM noHoldCursor;