diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index 502b8e862..dd2c26733 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -24,7 +24,6 @@ #include "distributed/multi_utility.h" #include "distributed/worker_protocol.h" #include "executor/execdebug.h" -#include "executor/executor.h" #include "commands/copy.h" #include "nodes/makefuncs.h" #include "storage/lmgr.h" @@ -34,212 +33,389 @@ /* - * FIXME: It'd probably be better to have different set of methods for: - * - router readonly queries - * - router modify - * - router insert ... select - * - real-time/task-tracker (no point in seperating those) - * - * I think it's better however to only have one type of CitusScanState, to - * allow to easily share code between routines. + * Define executor methods for the different executor types. */ -static CustomExecMethods CitusCustomExecMethods = { - "CitusScan", - CitusBeginScan, - CitusExecScan, - CitusEndScan, - CitusReScan, -#if (PG_VERSION_NUM >= 90600) - NULL, /* NO EstimateDSMCustomScan callback */ - NULL, /* NO InitializeDSMCustomScan callback */ - NULL, /* NO InitializeWorkerCustomScan callback */ -#endif - NULL, - NULL, - CitusExplainScan +static CustomExecMethods RealTimeCustomExecMethods = { + .CustomName = "RealTimeScan", + .BeginCustomScan = CitusSelectBeginScan, + .ExecCustomScan = RealTimeExecScan, + .EndCustomScan = CitusEndScan, + .ReScanCustomScan = CitusReScan, + .ExplainCustomScan = CitusExplainScan +}; + +static CustomExecMethods TaskTrackerCustomExecMethods = { + .CustomName = "TaskTrackerScan", + .BeginCustomScan = CitusSelectBeginScan, + .ExecCustomScan = TaskTrackerExecScan, + .EndCustomScan = CitusEndScan, + .ReScanCustomScan = CitusReScan, + .ExplainCustomScan = CitusExplainScan +}; + +static CustomExecMethods RouterSingleModifyCustomExecMethods = { + .CustomName = "RouterSingleModifyScan", + .BeginCustomScan 
= CitusModifyBeginScan, + .ExecCustomScan = RouterSingleModifyExecScan, + .EndCustomScan = CitusEndScan, + .ReScanCustomScan = CitusReScan, + .ExplainCustomScan = CitusExplainScan +}; + +static CustomExecMethods RouterMultiModifyCustomExecMethods = { + .CustomName = "RouterMultiModifyScan", + .BeginCustomScan = CitusModifyBeginScan, + .ExecCustomScan = RouterMultiModifyExecScan, + .EndCustomScan = CitusEndScan, + .ReScanCustomScan = CitusReScan, + .ExplainCustomScan = CitusExplainScan +}; + +static CustomExecMethods RouterSelectCustomExecMethods = { + .CustomName = "RouterSelectScan", + .BeginCustomScan = CitusSelectBeginScan, + .ExecCustomScan = RouterSelectExecScan, + .EndCustomScan = CitusEndScan, + .ReScanCustomScan = CitusReScan, + .ExplainCustomScan = CitusExplainScan }; +/* local function forward declarations */ +static void PrepareMasterJobDirectory(Job *workerJob); +static void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob); +static Relation StubRelation(TupleDesc tupleDescriptor); + + +/* + * RealTimeCreateScan creates the scan state for real-time executor queries. + */ Node * -CitusCreateScan(CustomScan *scan) +RealTimeCreateScan(CustomScan *scan) { CitusScanState *scanState = palloc0(sizeof(CitusScanState)); + scanState->executorType = MULTI_EXECUTOR_REAL_TIME; scanState->customScanState.ss.ps.type = T_CustomScanState; - scanState->customScanState.methods = &CitusCustomExecMethods; scanState->multiPlan = GetMultiPlan(scan); - scanState->executorType = JobExecutorType(scanState->multiPlan); + + scanState->customScanState.methods = &RealTimeCustomExecMethods; return (Node *) scanState; } -void -CitusBeginScan(CustomScanState *node, - EState *estate, - int eflags) +/* + * TaskTrackerCreateScan creates the scan state for task-tracker executor queries. 
+ */ +Node * +TaskTrackerCreateScan(CustomScan *scan) { - CitusScanState *scanState = (CitusScanState *) node; - MultiPlan *multiPlan = scanState->multiPlan; + CitusScanState *scanState = palloc0(sizeof(CitusScanState)); - Assert(IsA(scanState, CustomScanState)); + scanState->executorType = MULTI_EXECUTOR_TASK_TRACKER; + scanState->customScanState.ss.ps.type = T_CustomScanState; + scanState->multiPlan = GetMultiPlan(scan); - /* ensure plan is executable */ - VerifyMultiPlanValidity(multiPlan); + scanState->customScanState.methods = &TaskTrackerCustomExecMethods; - /* ExecCheckRTPerms(planStatement->rtable, true); */ - - if (scanState->executorType == MULTI_EXECUTOR_ROUTER) - { - RouterBeginScan(scanState); - } + return (Node *) scanState; } -TupleTableSlot * -CitusExecScan(CustomScanState *node) +/* + * RouterCreateScan creates the scan state for router executor queries. + */ +Node * +RouterCreateScan(CustomScan *scan) { - CitusScanState *scanState = (CitusScanState *) node; - MultiPlan *multiPlan = scanState->multiPlan; + CitusScanState *scanState = palloc0(sizeof(CitusScanState)); + MultiPlan *multiPlan = NULL; + Job *workerJob = NULL; + List *taskList = NIL; + bool isModificationQuery = false; - if (scanState->executorType == MULTI_EXECUTOR_ROUTER) + scanState->executorType = MULTI_EXECUTOR_ROUTER; + scanState->customScanState.ss.ps.type = T_CustomScanState; + scanState->multiPlan = GetMultiPlan(scan); + + multiPlan = scanState->multiPlan; + workerJob = multiPlan->workerJob; + taskList = workerJob->taskList; + + isModificationQuery = IsModifyMultiPlan(multiPlan); + + /* check if this is a single shard query */ + if (list_length(taskList) == 1) { - return RouterExecScan(scanState); + if (isModificationQuery) + { + scanState->customScanState.methods = &RouterSingleModifyCustomExecMethods; + } + else + { + scanState->customScanState.methods = &RouterSelectCustomExecMethods; + } } else { - TupleTableSlot *resultSlot = 
scanState->customScanState.ss.ps.ps_ResultTupleSlot; + Assert(isModificationQuery); + scanState->customScanState.methods = &RouterMultiModifyCustomExecMethods; + } - if (!scanState->finishedUnderlyingScan) + return (Node *) scanState; +} + + +/* + * DelayedErrorCreateScan is only called if we could not plan for the given + * query. This is the case when a plan is not ready for execution because + * CreateDistributedPlan() couldn't find a plan due to unresolved prepared + * statement parameters, but didn't error out, because we expect custom plans + * to come to our rescue. But sql (not plpgsql) functions unfortunately don't + * go through a codepath supporting custom plans. Here, we error out with this + * delayed error message. + */ +Node * +DelayedErrorCreateScan(CustomScan *scan) +{ + MultiPlan *multiPlan = GetMultiPlan(scan); + + /* raise the deferred error */ + RaiseDeferredError(multiPlan->planningError, ERROR); + + return NULL; +} + + +/* + * CitusSelectBeginScan is an empty function for BeginCustomScan callback. + */ +void +CitusSelectBeginScan(CustomScanState *node, EState *estate, int eflags) +{ + /* just an empty function */ +} + + +/* + * RealTimeExecScan is a callback function which returns next tuple from a real-time + * execution. In the first call, it executes distributed real-time plan and loads + * results from temporary files into custom scan's tuple store. Then, it returns + * tuples one by one from this tuple store. 
+ */ +TupleTableSlot * +RealTimeExecScan(CustomScanState *node) +{ + CitusScanState *scanState = (CitusScanState *) node; + TupleTableSlot *resultSlot = NULL; + + if (!scanState->finishedRemoteScan) + { + MultiPlan *multiPlan = scanState->multiPlan; + Job *workerJob = multiPlan->workerJob; + + PrepareMasterJobDirectory(workerJob); + MultiRealTimeExecute(workerJob); + + LoadTuplesIntoTupleStore(scanState, workerJob); + + scanState->finishedRemoteScan = true; + } + + resultSlot = ReturnTupleFromTuplestore(scanState); + + return resultSlot; +} + + +/* + * PrepareMasterJobDirectory creates a directory on the master node to keep job + * execution results. We also register this directory for automatic cleanup on + * portal delete. + */ +static void +PrepareMasterJobDirectory(Job *workerJob) +{ + StringInfo jobDirectoryName = MasterJobDirectoryName(workerJob->jobId); + CreateDirectory(jobDirectoryName); + + ResourceOwnerEnlargeJobDirectories(CurrentResourceOwner); + ResourceOwnerRememberJobDirectory(CurrentResourceOwner, workerJob->jobId); +} + + +/* + * Load data collected by real-time or task-tracker executors into the tuplestore + * of CitusScanState. For that, we first create a tuple store, and then copy the + * files one-by-one into the tuple store. + * + * Note that in the long term it'd be a lot better if Multi*Execute() directly + * filled the tuplestores, but that's a fair bit of work. 
+ */ +static void +LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob) +{ + CustomScanState customScanState = citusScanState->customScanState; + List *workerTaskList = workerJob->taskList; + List *copyOptions = NIL; + EState *executorState = NULL; + MemoryContext executorTupleContext = NULL; + ExprContext *executorExpressionContext = NULL; + TupleDesc tupleDescriptor = NULL; + Relation stubRelation = NULL; + ListCell *workerTaskCell = NULL; + uint32 columnCount = 0; + Datum *columnValues = NULL; + bool *columnNulls = NULL; + bool randomAccess = true; + bool interTransactions = false; + + executorState = citusScanState->customScanState.ss.ps.state; + executorTupleContext = GetPerTupleMemoryContext(executorState); + executorExpressionContext = GetPerTupleExprContext(executorState); + + tupleDescriptor = customScanState.ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor; + stubRelation = StubRelation(tupleDescriptor); + + columnCount = tupleDescriptor->natts; + columnValues = palloc0(columnCount * sizeof(Datum)); + columnNulls = palloc0(columnCount * sizeof(bool)); + + Assert(citusScanState->tuplestorestate == NULL); + citusScanState->tuplestorestate = + tuplestore_begin_heap(randomAccess, interTransactions, work_mem); + + if (BinaryMasterCopyFormat) + { + DefElem *copyOption = makeDefElem("format", (Node *) makeString("binary")); + copyOptions = lappend(copyOptions, copyOption); + } + + foreach(workerTaskCell, workerTaskList) + { + Task *workerTask = (Task *) lfirst(workerTaskCell); + StringInfo jobDirectoryName = NULL; + StringInfo taskFilename = NULL; + CopyState copyState = NULL; + + jobDirectoryName = MasterJobDirectoryName(workerTask->jobId); + taskFilename = TaskFilename(jobDirectoryName, workerTask->taskId); + + copyState = BeginCopyFrom(stubRelation, taskFilename->data, false, NULL, + copyOptions); + + while (true) { - Job *workerJob = multiPlan->workerJob; - StringInfo jobDirectoryName = NULL; - EState *executorState = 
scanState->customScanState.ss.ps.state; - List *workerTaskList = workerJob->taskList; - ListCell *workerTaskCell = NULL; - TupleDesc tupleDescriptor = NULL; - Relation fakeRel = NULL; - MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState); - ExprContext *executorExpressionContext = - GetPerTupleExprContext(executorState); - uint32 columnCount = 0; - Datum *columnValues = NULL; - bool *columnNulls = NULL; + MemoryContext oldContext = NULL; + bool nextRowFound = false; - /* - * We create a directory on the master node to keep task execution results. - * We also register this directory for automatic cleanup on portal delete. - */ - jobDirectoryName = MasterJobDirectoryName(workerJob->jobId); - CreateDirectory(jobDirectoryName); + ResetPerTupleExprContext(executorState); + oldContext = MemoryContextSwitchTo(executorTupleContext); - ResourceOwnerEnlargeJobDirectories(CurrentResourceOwner); - ResourceOwnerRememberJobDirectory(CurrentResourceOwner, workerJob->jobId); - - /* pick distributed executor to use */ - if (executorState->es_top_eflags & EXEC_FLAG_EXPLAIN_ONLY) + nextRowFound = NextCopyFrom(copyState, executorExpressionContext, + columnValues, columnNulls, NULL); + if (!nextRowFound) { - /* skip distributed query execution for EXPLAIN commands */ - } - else if (scanState->executorType == MULTI_EXECUTOR_REAL_TIME) - { - MultiRealTimeExecute(workerJob); - } - else if (scanState->executorType == MULTI_EXECUTOR_TASK_TRACKER) - { - MultiTaskTrackerExecute(workerJob); + MemoryContextSwitchTo(oldContext); + break; } - tupleDescriptor = node->ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor; - - /* - * Load data, collected by Multi*Execute() above, into a - * tuplestore. For that first create a tuplestore, and then copy - * the files one-by-one. - * - * FIXME: Should probably be in a separate routine. - * - * Long term it'd be a lot better if Multi*Execute() directly - * filled the tuplestores, but that's a fair bit of work. 
- */ - - /* - * To be able to use copy.c, we need a Relation descriptor. As - * there's no relation corresponding to the data loaded from - * workers, fake one. We just need the bare minimal set of fields - * accessed by BeginCopyFrom(). - * - * FIXME: should be abstracted into a separate function. - */ - fakeRel = palloc0(sizeof(RelationData)); - fakeRel->rd_att = tupleDescriptor; - fakeRel->rd_rel = palloc0(sizeof(FormData_pg_class)); - fakeRel->rd_rel->relkind = RELKIND_RELATION; - - columnCount = tupleDescriptor->natts; - columnValues = palloc0(columnCount * sizeof(Datum)); - columnNulls = palloc0(columnCount * sizeof(bool)); - - Assert(scanState->tuplestorestate == NULL); - scanState->tuplestorestate = tuplestore_begin_heap(false, false, work_mem); - - foreach(workerTaskCell, workerTaskList) - { - Task *workerTask = (Task *) lfirst(workerTaskCell); - StringInfo jobDirectoryName = MasterJobDirectoryName(workerTask->jobId); - StringInfo taskFilename = - TaskFilename(jobDirectoryName, workerTask->taskId); - List *copyOptions = NIL; - CopyState copyState = NULL; - - if (BinaryMasterCopyFormat) - { - DefElem *copyOption = makeDefElem("format", - (Node *) makeString("binary")); - copyOptions = lappend(copyOptions, copyOption); - } - copyState = BeginCopyFrom(fakeRel, taskFilename->data, false, NULL, - copyOptions); - - while (true) - { - MemoryContext oldContext = NULL; - bool nextRowFound = false; - - ResetPerTupleExprContext(executorState); - oldContext = MemoryContextSwitchTo(executorTupleContext); - - nextRowFound = NextCopyFrom(copyState, executorExpressionContext, - columnValues, columnNulls, NULL); - if (!nextRowFound) - { - MemoryContextSwitchTo(oldContext); - break; - } - - tuplestore_putvalues(scanState->tuplestorestate, - tupleDescriptor, - columnValues, columnNulls); - MemoryContextSwitchTo(oldContext); - } - } - - scanState->finishedUnderlyingScan = true; + tuplestore_putvalues(citusScanState->tuplestorestate, tupleDescriptor, + columnValues, 
columnNulls); + MemoryContextSwitchTo(oldContext); } - if (scanState->tuplestorestate != NULL) - { - Tuplestorestate *tupleStore = scanState->tuplestorestate; - tuplestore_gettupleslot(tupleStore, true, false, resultSlot); - - return resultSlot; - } - - return NULL; + EndCopyFrom(copyState); } } +/* + * StubRelation creates a stub Relation from the given tuple descriptor. + * To be able to use copy.c, we need a Relation descriptor. As there is no + * relation corresponding to the data loaded from workers, we need to fake one. + * We just need the bare minimal set of fields accessed by BeginCopyFrom(). + */ +static Relation +StubRelation(TupleDesc tupleDescriptor) +{ + Relation stubRelation = palloc0(sizeof(RelationData)); + stubRelation->rd_att = tupleDescriptor; + stubRelation->rd_rel = palloc0(sizeof(FormData_pg_class)); + stubRelation->rd_rel->relkind = RELKIND_RELATION; + + return stubRelation; +} + + +/* + * ReturnTupleFromTuplestore reads the next tuple from the tuple store of the + * given Citus scan node and returns it. It returns null if all tuples are read + * from the tuple store. + */ +TupleTableSlot * +ReturnTupleFromTuplestore(CitusScanState *scanState) +{ + Tuplestorestate *tupleStore = scanState->tuplestorestate; + TupleTableSlot *resultSlot = NULL; + ScanDirection scanDirection = NoMovementScanDirection; + bool forwardScanDirection = true; + + if (tupleStore == NULL) + { + return NULL; + } + + scanDirection = scanState->customScanState.ss.ps.state->es_direction; + Assert(ScanDirectionIsValid(scanDirection)); + + if (ScanDirectionIsBackward(scanDirection)) + { + forwardScanDirection = false; + } + + resultSlot = scanState->customScanState.ss.ps.ps_ResultTupleSlot; + tuplestore_gettupleslot(tupleStore, forwardScanDirection, false, resultSlot); + + return resultSlot; +} + + +/* + * TaskTrackerExecScan is a callback function which returns next tuple from a + * task-tracker execution. 
In the first call, it executes distributed task-tracker + * plan and loads results from temporary files into custom scan's tuple store. + * Then, it returns tuples one by one from this tuple store. + */ +TupleTableSlot * +TaskTrackerExecScan(CustomScanState *node) +{ + CitusScanState *scanState = (CitusScanState *) node; + TupleTableSlot *resultSlot = NULL; + + if (!scanState->finishedRemoteScan) + { + MultiPlan *multiPlan = scanState->multiPlan; + Job *workerJob = multiPlan->workerJob; + + PrepareMasterJobDirectory(workerJob); + MultiTaskTrackerExecute(workerJob); + + LoadTuplesIntoTupleStore(scanState, workerJob); + + scanState->finishedRemoteScan = true; + } + + resultSlot = ReturnTupleFromTuplestore(scanState); + + return resultSlot; +} + + +/* + * CitusEndScan is used to clean up tuple store of the given custom scan state. + */ void CitusEndScan(CustomScanState *node) { @@ -253,17 +429,14 @@ CitusEndScan(CustomScanState *node) } +/* + * CitusReScan is just a place holder for rescan callback. Currently, we don't + * support rescan given that there is not any way to reach this code path. + */ void CitusReScan(CustomScanState *node) { - CitusScanState *scanState = (CitusScanState *) node; - - scanState->tuplestorestate = NULL; - scanState->finishedUnderlyingScan = true; - - /* - * XXX: this probably already works, but if not should be easily - * supportable - probably hard to exercise right now though. 
- */ - elog(WARNING, "unsupported at this point"); + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("rescan is unsupported"), + errdetail("We don't expect this code path to be executed."))); } diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 6233b0d90..9a26090f8 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -77,26 +77,24 @@ static void ReacquireMetadataLocks(List *taskList); static void ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool expectResults); static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task); -static List * GetModifyConnections(List *taskPlacementList, - bool markCritical, +static List * GetModifyConnections(List *taskPlacementList, bool markCritical, bool startedInTransaction); static void ExecuteMultipleTasks(CitusScanState *scanState, List *taskList, bool isModificationQuery, bool expectResults); static int64 ExecuteModifyTasks(List *taskList, bool expectResults, - ParamListInfo paramListInfo, - CitusScanState *scanState, - TupleDesc tupleDescriptor); + ParamListInfo paramListInfo, CitusScanState *scanState); static List * TaskShardIntervalList(List *taskList); static void AcquireExecutorShardLock(Task *task, CmdType commandType); static void AcquireExecutorMultiShardLocks(List *taskList); static bool RequiresConsistentSnapshot(Task *task); +static void ProcessMasterEvaluableFunctions(Job *workerJob); static void ExtractParametersFromParamListInfo(ParamListInfo paramListInfo, Oid **parameterTypes, const char ***parameterValues); static bool SendQueryInSingleRowMode(MultiConnection *connection, char *query, ParamListInfo paramListInfo); static bool StoreQueryResult(CitusScanState *scanState, MultiConnection *connection, - TupleDesc tupleDescriptor, bool failOnError, int64 *rows); + bool failOnError, int64 *rows); static 
bool ConsumeQueryResult(MultiConnection *connection, bool failOnError, int64 *rows); @@ -407,9 +405,14 @@ RequiresConsistentSnapshot(Task *task) } +/* + * CitusModifyBeginScan checks the validity of the given custom scan node and + * gets locks on the shards involved in the task list of the distributed plan. + */ void -RouterBeginScan(CitusScanState *scanState) +CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags) { + CitusScanState *scanState = (CitusScanState *) node; MultiPlan *multiPlan = scanState->multiPlan; Job *workerJob = multiPlan->workerJob; List *taskList = workerJob->taskList; @@ -428,77 +431,115 @@ RouterBeginScan(CitusScanState *scanState) } +/* + * RouterSingleModifyExecScan executes a single modification query on a + * distributed plan and returns results if there is any. + */ TupleTableSlot * -RouterExecScan(CitusScanState *scanState) +RouterSingleModifyExecScan(CustomScanState *node) { - MultiPlan *multiPlan = scanState->multiPlan; - TupleTableSlot *resultSlot = scanState->customScanState.ss.ps.ps_ResultTupleSlot; + CitusScanState *scanState = (CitusScanState *) node; + TupleTableSlot *resultSlot = NULL; - if (!scanState->finishedUnderlyingScan) + if (!scanState->finishedRemoteScan) { + MultiPlan *multiPlan = scanState->multiPlan; + bool hasReturning = multiPlan->hasReturning; Job *workerJob = multiPlan->workerJob; List *taskList = workerJob->taskList; - bool requiresMasterEvaluation = workerJob->requiresMasterEvaluation; - bool isModificationQuery = false; - CmdType operation = multiPlan->operation; + Task *task = (Task *) linitial(taskList); - /* should use IsModificationStmt or such */ - if (operation == CMD_INSERT || operation == CMD_UPDATE || - operation == CMD_DELETE) - { - isModificationQuery = true; - } + ProcessMasterEvaluableFunctions(workerJob); - if (requiresMasterEvaluation) - { - Query *jobQuery = workerJob->jobQuery; + ExecuteSingleModifyTask(scanState, task, hasReturning); - 
ExecuteMasterEvaluableFunctions(jobQuery); - RebuildQueryStrings(jobQuery, taskList); - } - - if (list_length(taskList) == 1) - { - Task *task = (Task *) linitial(taskList); - - if (isModificationQuery) - { - bool sendTuples = multiPlan->hasReturning; - ExecuteSingleModifyTask(scanState, task, sendTuples); - } - else - { - ExecuteSingleSelectTask(scanState, task); - } - } - else - { - bool sendTuples = multiPlan->hasReturning; - ExecuteMultipleTasks(scanState, taskList, isModificationQuery, - sendTuples); - } - - /* mark underlying query as having executed */ - scanState->finishedUnderlyingScan = true; + scanState->finishedRemoteScan = true; } - /* if the underlying query produced output, return it */ + resultSlot = ReturnTupleFromTuplestore(scanState); - /* - * FIXME: centralize this into function to be shared between router and - * other executors? - */ - if (scanState->tuplestorestate != NULL) + return resultSlot; +} + + +/* + * ProcessMasterEvaluableFunctions executes evaluable functions and rebuilds + * the query strings in task lists. + */ +static void +ProcessMasterEvaluableFunctions(Job *workerJob) +{ + if (workerJob->requiresMasterEvaluation) { - Tuplestorestate *tupleStore = scanState->tuplestorestate; + Query *jobQuery = workerJob->jobQuery; + List *taskList = workerJob->taskList; - /* XXX: could trivially support backward scans here */ - tuplestore_gettupleslot(tupleStore, true, false, resultSlot); + ExecuteMasterEvaluableFunctions(jobQuery); + RebuildQueryStrings(jobQuery, taskList); + } +} - return resultSlot; + +/* + * RouterMultiModifyExecScan executes a list of tasks on remote nodes, retrieves + * the results and, if RETURNING is used, stores them in custom scan's tuple store. + * Then, it returns tuples one by one from this tuple store. 
+ */ +TupleTableSlot * +RouterMultiModifyExecScan(CustomScanState *node) +{ + CitusScanState *scanState = (CitusScanState *) node; + TupleTableSlot *resultSlot = NULL; + + if (!scanState->finishedRemoteScan) + { + MultiPlan *multiPlan = scanState->multiPlan; + Job *workerJob = multiPlan->workerJob; + List *taskList = workerJob->taskList; + bool hasReturning = multiPlan->hasReturning; + bool isModificationQuery = true; + + ProcessMasterEvaluableFunctions(workerJob); + + ExecuteMultipleTasks(scanState, taskList, isModificationQuery, hasReturning); + + scanState->finishedRemoteScan = true; } - return NULL; + resultSlot = ReturnTupleFromTuplestore(scanState); + + return resultSlot; +} + + +/* + * RouterSelectExecScan executes a single select task on the remote node, + * retrieves the results and stores them in custom scan's tuple store. Then, it + * returns tuples one by one from this tuple store. + */ +TupleTableSlot * +RouterSelectExecScan(CustomScanState *node) +{ + CitusScanState *scanState = (CitusScanState *) node; + TupleTableSlot *resultSlot = NULL; + + if (!scanState->finishedRemoteScan) + { + MultiPlan *multiPlan = scanState->multiPlan; + Job *workerJob = multiPlan->workerJob; + List *taskList = workerJob->taskList; + Task *task = (Task *) linitial(taskList); + + ProcessMasterEvaluableFunctions(workerJob); + + ExecuteSingleSelectTask(scanState, task); + + scanState->finishedRemoteScan = true; + } + + resultSlot = ReturnTupleFromTuplestore(scanState); + + return resultSlot; } @@ -512,8 +553,6 @@ RouterExecScan(CitusScanState *scanState) static void ExecuteSingleSelectTask(CitusScanState *scanState, Task *task) { - TupleDesc tupleDescriptor = - scanState->customScanState.ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor; ParamListInfo paramListInfo = scanState->customScanState.ss.ps.state->es_param_list_info; List *taskPlacementList = task->taskPlacementList; @@ -547,8 +586,8 @@ ExecuteSingleSelectTask(CitusScanState *scanState, Task *task) continue; } - queryOK = 
StoreQueryResult(scanState, connection, tupleDescriptor, - dontFailOnError, &currentAffectedTupleCount); + queryOK = StoreQueryResult(scanState, connection, dontFailOnError, + &currentAffectedTupleCount); if (queryOK) { return; @@ -569,21 +608,19 @@ ExecuteSingleSelectTask(CitusScanState *scanState, Task *task) * framework), or errors out (failed on all placements). */ static void -ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, - bool expectResults) +ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, bool expectResults) { CmdType operation = scanState->multiPlan->operation; - TupleDesc tupleDescriptor = - scanState->customScanState.ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor; EState *executorState = scanState->customScanState.ss.ps.state; ParamListInfo paramListInfo = executorState->es_param_list_info; - bool resultsOK = false; List *taskPlacementList = task->taskPlacementList; List *connectionList = NIL; ListCell *taskPlacementCell = NULL; ListCell *connectionCell = NULL; int64 affectedTupleCount = -1; + bool resultsOK = false; bool gotResults = false; + char *queryString = task->queryString; bool taskRequiresTwoPhaseCommit = (task->replicationModel == REPLICATION_MODEL_2PC); bool startedInTransaction = @@ -669,8 +706,8 @@ ExecuteSingleModifyTask(CitusScanState *scanState, Task *task, */ if (!gotResults && expectResults) { - queryOK = StoreQueryResult(scanState, connection, tupleDescriptor, - failOnError, &currentAffectedTupleCount); + queryOK = StoreQueryResult(scanState, connection, failOnError, + &currentAffectedTupleCount); } else { @@ -804,8 +841,6 @@ static void ExecuteMultipleTasks(CitusScanState *scanState, List *taskList, bool isModificationQuery, bool expectResults) { - TupleDesc tupleDescriptor = - scanState->customScanState.ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor; EState *executorState = scanState->customScanState.ss.ps.state; ParamListInfo paramListInfo = executorState->es_param_list_info; int64 affectedTupleCount = -1; @@ -813,9 +848,8 @@
ExecuteMultipleTasks(CitusScanState *scanState, List *taskList, /* can only support modifications right now */ Assert(isModificationQuery); - /* XXX: Seems very redundant to pass both scanState and tupleDescriptor */ affectedTupleCount = ExecuteModifyTasks(taskList, expectResults, paramListInfo, - scanState, tupleDescriptor); + scanState); executorState->es_processed = affectedTupleCount; } @@ -831,7 +865,7 @@ ExecuteMultipleTasks(CitusScanState *scanState, List *taskList, int64 ExecuteModifyTasksWithoutResults(List *taskList) { - return ExecuteModifyTasks(taskList, false, NULL, NULL, NULL); + return ExecuteModifyTasks(taskList, false, NULL, NULL); } @@ -845,7 +879,7 @@ ExecuteModifyTasksWithoutResults(List *taskList) */ static int64 ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListInfo, - CitusScanState *scanState, TupleDesc tupleDescriptor) + CitusScanState *scanState) { int64 totalAffectedTupleCount = 0; ListCell *taskCell = NULL; @@ -929,8 +963,7 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn continue; } - connection = - (MultiConnection *) list_nth(connectionList, placementIndex); + connection = (MultiConnection *) list_nth(connectionList, placementIndex); queryOK = SendQueryInSingleRowMode(connection, queryString, paramListInfo); if (!queryOK) @@ -975,10 +1008,10 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn */ if (placementIndex == 0 && expectResults) { - Assert(scanState != NULL && tupleDescriptor != NULL); + Assert(scanState != NULL); - queryOK = StoreQueryResult(scanState, connection, tupleDescriptor, - failOnError, &currentAffectedTupleCount); + queryOK = StoreQueryResult(scanState, connection, failOnError, + &currentAffectedTupleCount); } else { @@ -1184,13 +1217,17 @@ ExtractParametersFromParamListInfo(ParamListInfo paramListInfo, Oid **parameterT */ static bool StoreQueryResult(CitusScanState *scanState, MultiConnection *connection, - TupleDesc tupleDescriptor,
bool failOnError, int64 *rows) + bool failOnError, int64 *rows) { + TupleDesc tupleDescriptor = + scanState->customScanState.ss.ps.ps_ResultTupleSlot->tts_tupleDescriptor; AttInMetadata *attributeInputMetadata = TupleDescGetAttInMetadata(tupleDescriptor); - Tuplestorestate *tupleStore = NULL; List *targetList = scanState->customScanState.ss.ps.plan->targetlist; uint32 expectedColumnCount = ExecCleanTargetListLength(targetList); char **columnArray = (char **) palloc0(expectedColumnCount * sizeof(char *)); + Tuplestorestate *tupleStore = NULL; + bool randomAccess = true; + bool interTransactions = false; bool commandFailed = false; MemoryContext ioContext = AllocSetContextCreate(CurrentMemoryContext, "StoreQueryResult", @@ -1201,7 +1238,8 @@ StoreQueryResult(CitusScanState *scanState, MultiConnection *connection, if (scanState->tuplestorestate == NULL) { - scanState->tuplestorestate = tuplestore_begin_heap(false, false, work_mem); + scanState->tuplestorestate = + tuplestore_begin_heap(randomAccess, interTransactions, work_mem); } else if (!failOnError) { @@ -1403,39 +1441,3 @@ ConsumeQueryResult(MultiConnection *connection, bool failOnError, int64 *rows) return gotResponse && !commandFailed; } - - -/* - * RouterExecutorFinish cleans up after a distributed execution. - */ -void -RouterExecutorFinish(QueryDesc *queryDesc) -{ - EState *estate = queryDesc->estate; - Assert(estate != NULL); - - estate->es_finished = true; -} - - -/* - * RouterExecutorEnd cleans up the executor state after a distributed - * execution. 
- */ -void -RouterExecutorEnd(QueryDesc *queryDesc) -{ - EState *estate = queryDesc->estate; - MaterialState *routerState = (MaterialState *) queryDesc->planstate; - - if (routerState->tuplestorestate) - { - tuplestore_end(routerState->tuplestorestate); - } - - Assert(estate != NULL); - - FreeExecutorState(estate); - queryDesc->estate = NULL; - queryDesc->totaltime = NULL; -} diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index 97ef0ee7b..25fa2ab9c 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -84,12 +84,17 @@ static void ExplainXMLTag(const char *tagname, int flags, ExplainState *es); static void ExplainJSONLineEnding(ExplainState *es); static void ExplainYAMLLineStarting(ExplainState *es); + +/* + * CitusExplainScan is a custom scan explain callback function which is used to + * print explain information of a Citus plan which includes both master and + * distributed plan. + */ void CitusExplainScan(CustomScanState *node, List *ancestors, struct ExplainState *es) { CitusScanState *scanState = (CitusScanState *) node; MultiPlan *multiPlan = scanState->multiPlan; - const char *executorName = NULL; if (!ExplainDistributedQueries) { @@ -99,44 +104,8 @@ CitusExplainScan(CustomScanState *node, List *ancestors, struct ExplainState *es return; } - /* - * XXX: can we get by without the open/close group somehow - then we'd not - * copy any code from explain.c? Seems unlikely. - */ ExplainOpenGroup("Distributed Query", "Distributed Query", true, es); - /* - * XXX: might be worthwhile to put this somewhere central, e.g. for - * debugging output. 
- */ - switch (scanState->executorType) - { - case MULTI_EXECUTOR_ROUTER: - { - executorName = "Router"; - } - break; - - case MULTI_EXECUTOR_REAL_TIME: - { - executorName = "Real-Time"; - } - break; - - case MULTI_EXECUTOR_TASK_TRACKER: - { - executorName = "Task-Tracker"; - } - break; - - default: - { - executorName = "Other"; - } - break; - } - ExplainPropertyText("Executor", executorName, es); - ExplainJob(multiPlan->workerJob, es); ExplainCloseGroup("Distributed Query", "Distributed Query", true, es); diff --git a/src/backend/distributed/planner/multi_master_planner.c b/src/backend/distributed/planner/multi_master_planner.c index facff1f9d..6b4be984f 100644 --- a/src/backend/distributed/planner/multi_master_planner.c +++ b/src/backend/distributed/planner/multi_master_planner.c @@ -15,6 +15,7 @@ #include "distributed/multi_master_planner.h" #include "distributed/multi_physical_planner.h" +#include "distributed/multi_planner.h" #include "distributed/multi_server_executor.h" #include "distributed/worker_protocol.h" #include "nodes/makefuncs.h" @@ -34,7 +35,7 @@ * a target target list for the master node. This master target list keeps the * temporary table's columns on the master node. */ -List * +static List * MasterTargetList(List *workerTargetList) { List *masterTargetList = NIL; @@ -164,67 +165,57 @@ BuildAggregatePlan(Query *masterQuery, Plan *subPlan) /* * BuildSelectStatement builds the final select statement to run on the master - * node, before returning results to the user. The function first builds a scan - * statement for all results fetched to the master, and layers aggregation, sort + * node, before returning results to the user. The function first gets the custom + * scan node for all results fetched to the master, and layers aggregation, sort * and limit plans on top of the scan statement if necessary. 
*/ static PlannedStmt * -BuildSelectStatement(Query *masterQuery, char *masterTableName, - List *masterTargetList, CustomScan *dataScan) +BuildSelectStatement(Query *masterQuery, List *masterTargetList, CustomScan *remoteScan) { PlannedStmt *selectStatement = NULL; - RangeTblEntry *rangeTableEntry = NULL; - RangeTblEntry *queryRangeTableEntry = NULL; + RangeTblEntry *customScanRangeTableEntry = NULL; Agg *aggregationPlan = NULL; Plan *topLevelPlan = NULL; - ListCell *lc = NULL; - List *columnNames = NULL; - - /* (0) compute column names */ - foreach(lc, masterTargetList) - { - TargetEntry *te = lfirst(lc); - columnNames = lappend(columnNames, makeString(te->resname)); - } + ListCell *targetEntryCell = NULL; + List *columnNameList = NULL; /* (1) make PlannedStmt and set basic information */ selectStatement = makeNode(PlannedStmt); selectStatement->canSetTag = true; - selectStatement->relationOids = NIL; /* to be filled in exec_Start */ + selectStatement->relationOids = NIL; selectStatement->commandType = CMD_SELECT; - /* prepare the range table entry for our temporary table */ + /* top level select query should have only one range table entry */ Assert(list_length(masterQuery->rtable) == 1); - queryRangeTableEntry = (RangeTblEntry *) linitial(masterQuery->rtable); - rangeTableEntry = copyObject(queryRangeTableEntry); - rangeTableEntry->rtekind = RTE_VALUES; /* can't look up relation */ - rangeTableEntry->eref = makeAlias("remote scan", columnNames); - rangeTableEntry->inh = false; - rangeTableEntry->inFromCl = true; + /* compute column names for the custom range table entry */ + foreach(targetEntryCell, masterTargetList) + { + TargetEntry *targetEntry = lfirst(targetEntryCell); + columnNameList = lappend(columnNameList, makeString(targetEntry->resname)); + } + + customScanRangeTableEntry = RemoteScanRangeTableEntry(columnNameList); /* set the single element range table list */ - selectStatement->rtable = list_make1(rangeTableEntry); + selectStatement->rtable = 
list_make1(customScanRangeTableEntry); - /* (2) build and initialize sequential scan node */ - /* Gone */ - - /* (3) add an aggregation plan if needed */ + /* (2) add an aggregation plan if needed */ if (masterQuery->hasAggs || masterQuery->groupClause) { - dataScan->scan.plan.targetlist = masterTargetList; + remoteScan->scan.plan.targetlist = masterTargetList; - aggregationPlan = BuildAggregatePlan(masterQuery, &dataScan->scan.plan); + aggregationPlan = BuildAggregatePlan(masterQuery, &remoteScan->scan.plan); topLevelPlan = (Plan *) aggregationPlan; } else { /* otherwise set the final projections on the scan plan directly */ - dataScan->scan.plan.targetlist = masterQuery->targetList; - topLevelPlan = &dataScan->scan.plan; + remoteScan->scan.plan.targetlist = masterQuery->targetList; + topLevelPlan = &remoteScan->scan.plan; } - /* (4) add a sorting plan if needed */ + /* (3) add a sorting plan if needed */ if (masterQuery->sortClause) { List *sortClauseList = masterQuery->sortClause; @@ -242,7 +233,7 @@ BuildSelectStatement(Query *masterQuery, char *masterTableName, topLevelPlan = (Plan *) sortPlan; } - /* (5) add a limit plan if needed */ + /* (4) add a limit plan if needed */ if (masterQuery->limitCount || masterQuery->limitOffset) { Node *limitCount = masterQuery->limitCount; @@ -259,7 +250,7 @@ BuildSelectStatement(Query *masterQuery, char *masterTableName, topLevelPlan = (Plan *) limitPlan; } - /* (6) finally set our top level plan in the plan tree */ + /* (5) finally set our top level plan in the plan tree */ selectStatement->planTree = topLevelPlan; return selectStatement; @@ -267,24 +258,24 @@ BuildSelectStatement(Query *masterQuery, char *masterTableName, /* - * MasterNodeSelectPlan takes in a distributed plan, finds the master node query - * structure in that plan, and builds the final select plan to execute on the - * master node. 
Note that this select plan is executed after result files are - * retrieved from worker nodes and are merged into a temporary table. + * MasterNodeSelectPlan takes in a distributed plan and a custom scan node which + * wraps remote part of the plan. This function finds the master node query + * structure in the multi plan, and builds the final select plan to execute on + * the tuples returned by remote scan on the master node. Note that this select + * plan is executed after result files are retrieved from worker nodes and + * filled into the tuple store inside provided custom scan. */ PlannedStmt * -MasterNodeSelectPlan(MultiPlan *multiPlan, CustomScan *dataScan) +MasterNodeSelectPlan(MultiPlan *multiPlan, CustomScan *remoteScan) { Query *masterQuery = multiPlan->masterQuery; - char *tableName = multiPlan->masterTableName; PlannedStmt *masterSelectPlan = NULL; Job *workerJob = multiPlan->workerJob; List *workerTargetList = workerJob->jobQuery->targetList; List *masterTargetList = MasterTargetList(workerTargetList); - masterSelectPlan = - BuildSelectStatement(masterQuery, tableName, masterTargetList, dataScan); + masterSelectPlan = BuildSelectStatement(masterQuery, masterTargetList, remoteScan); return masterSelectPlan; } diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index a4a87c372..a33808d5b 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -196,9 +196,7 @@ MultiPlan * MultiPhysicalPlanCreate(MultiTreeRoot *multiTree) { MultiPlan *multiPlan = NULL; - StringInfo jobSchemaName = NULL; Job *workerJob = NULL; - uint64 workerJobId = 0; Query *masterQuery = NULL; List *masterDependedJobList = NIL; @@ -207,10 +205,6 @@ MultiPhysicalPlanCreate(MultiTreeRoot *multiTree) /* create the tree of executable tasks for the worker job */ workerJob = BuildJobTreeTaskList(workerJob); - workerJobId = 
workerJob->jobId; - - /* get job schema name */ - jobSchemaName = JobSchemaName(workerJobId); /* build the final merge query to execute on the master */ masterDependedJobList = list_make1(workerJob); @@ -219,7 +213,6 @@ MultiPhysicalPlanCreate(MultiTreeRoot *multiTree) multiPlan = CitusMakeNode(MultiPlan); multiPlan->workerJob = workerJob; multiPlan->masterQuery = masterQuery; - multiPlan->masterTableName = jobSchemaName->data; multiPlan->routerExecutable = MultiPlanRouterExecutable(multiPlan); multiPlan->operation = CMD_SELECT; diff --git a/src/backend/distributed/planner/multi_planner.c b/src/backend/distributed/planner/multi_planner.c index bc0c1335f..7839c7cfc 100644 --- a/src/backend/distributed/planner/multi_planner.c +++ b/src/backend/distributed/planner/multi_planner.c @@ -23,30 +23,48 @@ #include "distributed/multi_physical_planner.h" #include "distributed/multi_master_planner.h" #include "distributed/multi_router_planner.h" - #include "executor/executor.h" - #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" - #include "optimizer/planner.h" - #include "utils/memutils.h" static List *relationRestrictionContextList = NIL; +/* create custom scan methods for separate executors */ +static CustomScanMethods RealTimeCustomScanMethods = { + "Citus Real-Time", + RealTimeCreateScan +}; + +static CustomScanMethods TaskTrackerCustomScanMethods = { + "Citus Task-Tracker", + TaskTrackerCreateScan +}; + +static CustomScanMethods RouterCustomScanMethods = { + "Citus Router", + RouterCreateScan +}; + +static CustomScanMethods DelayedErrorCustomScanMethods = { + "Citus Delayed Error", + DelayedErrorCreateScan +}; + /* local function forward declarations */ +static PlannedStmt * CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, + Query *query, ParamListInfo boundParams, + RelationRestrictionContext *restrictionContext); +static Node * SerializeMultiPlan(struct MultiPlan *multiPlan); +static MultiPlan * DeserializeMultiPlan(Node *node); +static 
PlannedStmt * FinalizePlan(PlannedStmt *localPlan, MultiPlan *multiPlan); +static PlannedStmt * FinalizeNonRouterPlan(PlannedStmt *localPlan, MultiPlan *multiPlan, + CustomScan *customScan); +static PlannedStmt * FinalizeRouterPlan(PlannedStmt *localPlan, CustomScan *customScan); static void CheckNodeIsDumpable(Node *node); -static PlannedStmt * MultiQueryContainerNode(PlannedStmt *result, - struct MultiPlan *multiPlan); -static struct PlannedStmt * CreateDistributedPlan(PlannedStmt *localPlan, - Query *originalQuery, - Query *query, - ParamListInfo boundParams, - RelationRestrictionContext * - restrictionContext); static RelationRestrictionContext * CreateAndPushRestrictionContext(void); static RelationRestrictionContext * CurrentRestrictionContext(void); static void PopRestrictionContext(void); @@ -144,22 +162,21 @@ IsModifyCommand(Query *query) /* - * VerifyMultiPlanValidity verifies that multiPlan is ready for execution, or - * errors out if not. - * - * A plan may e.g. not be ready for execution because CreateDistributedPlan() - * couldn't find a plan due to unresolved prepared statement parameters, but - * didn't error out, because we expect custom plans to come to our rescue. - * But sql (not plpgsql) functions unfortunately don't go through a codepath - * supporting custom plans. + * IsModifyMultiPlan returns true if the multi plan performs modifications, + * false otherwise. 
*/ -void -VerifyMultiPlanValidity(MultiPlan *multiPlan) +bool +IsModifyMultiPlan(MultiPlan *multiPlan) { - if (multiPlan->planningError) + bool isModifyMultiPlan = false; + CmdType operation = multiPlan->operation; + + if (operation == CMD_INSERT || operation == CMD_UPDATE || operation == CMD_DELETE) { - RaiseDeferredError(multiPlan->planningError, ERROR); + isModifyMultiPlan = true; } + + return isModifyMultiPlan; } @@ -274,8 +291,8 @@ CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query RaiseDeferredError(distributedPlan->planningError, ERROR); } - /* store required data into the planned statement */ - resultPlan = MultiQueryContainerNode(localPlan, distributedPlan); + /* create final plan by combining local plan with distributed plan */ + resultPlan = FinalizePlan(localPlan, distributedPlan); /* * As explained above, force planning costs to be unrealistically high if @@ -294,12 +311,6 @@ CreateDistributedPlan(PlannedStmt *localPlan, Query *originalQuery, Query *query } -static CustomScanMethods CitusCustomScanMethods = { - "CitusScan", - CitusCreateScan -}; - - /* * GetMultiPlan returns the associated MultiPlan for a CustomScan. */ @@ -308,41 +319,34 @@ GetMultiPlan(CustomScan *customScan) { MultiPlan *multiPlan = NULL; - Assert(IsA(customScan, CustomScan)); - Assert(customScan->methods == &CitusCustomScanMethods); Assert(list_length(customScan->custom_private) == 1); - multiPlan = DeSerializeMultiPlan(linitial(customScan->custom_private)); + multiPlan = DeserializeMultiPlan(linitial(customScan->custom_private)); return multiPlan; } -/* Does the passed in statement require distributed execution? */ -bool -HasCitusToplevelNode(PlannedStmt *result) +/* + * SerializeMultiPlan returns the string representing the distributed plan in a + * Const node. + * + * Note that this should be improved for 9.6+, we we can copy trees efficiently. + * I.e. 
we should introduce copy support for relevant node types, and just + * return the MultiPlan as-is for 9.6. + */ +static Node * +SerializeMultiPlan(MultiPlan *multiPlan) { - elog(ERROR, "gone"); -} - - -Node * -SerializableMultiPlan(MultiPlan *multiPlan) -{ - /* - * FIXME: This should be improved for 9.6+, we we can copy trees - * efficiently. I.e. we should introduce copy support for relevant node - * types, and just return the MultiPlan as-is for 9.6. - */ - char *serializedPlan = NULL; + char *serializedMultiPlan = NULL; Const *multiPlanData = NULL; - serializedPlan = CitusNodeToString(multiPlan); + serializedMultiPlan = CitusNodeToString(multiPlan); multiPlanData = makeNode(Const); multiPlanData->consttype = CSTRINGOID; - multiPlanData->constlen = strlen(serializedPlan); - multiPlanData->constvalue = CStringGetDatum(serializedPlan); + multiPlanData->constlen = strlen(serializedMultiPlan); + multiPlanData->constvalue = CStringGetDatum(serializedMultiPlan); multiPlanData->constbyval = false; multiPlanData->location = -1; @@ -350,8 +354,12 @@ SerializableMultiPlan(MultiPlan *multiPlan) } -MultiPlan * -DeSerializeMultiPlan(Node *node) +/* + * DeserializeMultiPlan returns the deserialized distributed plan from the string + * representation in a Const node. + */ +static MultiPlan * +DeserializeMultiPlan(Node *node) { Const *multiPlanData = NULL; char *serializedMultiPlan = NULL; @@ -369,107 +377,171 @@ DeSerializeMultiPlan(Node *node) /* - * CreateCitusToplevelNode creates the top-level planTree node for a - * distributed statement. That top-level node is a) recognizable by the - * executor hooks, allowing them to redirect execution, b) contains the - * parameters required for distributed execution. - * - * The exact representation of the top-level node is an implementation detail - * which should not be referred to outside this file, as it's likely to become - * version dependant. Use GetMultiPlan() and HasCitusToplevelNode() to access. 
- * - * FIXME - * - * Internally the data is stored as arguments to a 'citus_extradata_container' - * function, which has to be removed from the really executed plan tree before - * query execution. + * FinalizePlan combines local plan with distributed plan and creates a plan + * which can be run by the PostgreSQL executor. */ -PlannedStmt * -MultiQueryContainerNode(PlannedStmt *originalPlan, MultiPlan *multiPlan) +static PlannedStmt * +FinalizePlan(PlannedStmt *localPlan, MultiPlan *multiPlan) { - PlannedStmt *resultPlan = NULL; + PlannedStmt *finalPlan = NULL; CustomScan *customScan = makeNode(CustomScan); - Node *multiPlanData = SerializableMultiPlan(multiPlan); + Node *multiPlanData = NULL; + MultiExecutorType executorType = MULTI_EXECUTOR_INVALID_FIRST; - customScan->methods = &CitusCustomScanMethods; - customScan->custom_private = list_make1(multiPlanData); - - /* FIXME: This probably ain't correct */ - if (ExecSupportsBackwardScan(originalPlan->planTree)) + if (!multiPlan->planningError) { - customScan->flags = CUSTOMPATH_SUPPORT_BACKWARD_SCAN; + executorType = JobExecutorType(multiPlan); } - /* - * FIXME: these two branches/pieces of code should probably be moved into - * router / logical planner code respectively. 
- */ + switch (executorType) + { + case MULTI_EXECUTOR_REAL_TIME: + { + customScan->methods = &RealTimeCustomScanMethods; + break; + } + + case MULTI_EXECUTOR_TASK_TRACKER: + { + customScan->methods = &TaskTrackerCustomScanMethods; + break; + } + + case MULTI_EXECUTOR_ROUTER: + { + customScan->methods = &RouterCustomScanMethods; + break; + } + + default: + { + customScan->methods = &DelayedErrorCustomScanMethods; + break; + } + } + + multiPlanData = SerializeMultiPlan(multiPlan); + + customScan->custom_private = list_make1(multiPlanData); + customScan->flags = CUSTOMPATH_SUPPORT_BACKWARD_SCAN; + + /* check if we have a master query */ if (multiPlan->masterQuery) { - resultPlan = MasterNodeSelectPlan(multiPlan, customScan); - resultPlan->queryId = originalPlan->queryId; - resultPlan->utilityStmt = originalPlan->utilityStmt; + finalPlan = FinalizeNonRouterPlan(localPlan, multiPlan, customScan); } else { - ListCell *lc = NULL; - List *targetList = NIL; - bool foundJunk = false; - RangeTblEntry *rangeTableEntry = NULL; - List *columnNames = NIL; - int newRTI = list_length(originalPlan->rtable) + 1; - - /* - * XXX: This basically just builds a targetlist to "read" from the - * custom scan output. - */ - foreach(lc, originalPlan->planTree->targetlist) - { - TargetEntry *te = lfirst(lc); - Var *newVar = NULL; - TargetEntry *newTargetEntry = NULL; - - Assert(IsA(te, TargetEntry)); - - /* - * XXX: I can't think of a case where we'd need resjunk stuff at - * the toplevel of a router query - all things needing it have - * been pushed down. 
- */ - if (te->resjunk) - { - foundJunk = true; - continue; - } - - if (foundJunk) - { - ereport(ERROR, (errmsg("unexpected !junk entry after resjunk entry"))); - } - - /* build TE pointing to custom scan */ - newVar = makeVarFromTargetEntry(newRTI, te); - newTargetEntry = flatCopyTargetEntry(te); - newTargetEntry->expr = (Expr *) newVar; - targetList = lappend(targetList, newTargetEntry); - - columnNames = lappend(columnNames, makeString(te->resname)); - } - - /* XXX: can't think of a better RTE type than VALUES */ - rangeTableEntry = makeNode(RangeTblEntry); - rangeTableEntry->rtekind = RTE_VALUES; /* can't look up relation */ - rangeTableEntry->eref = makeAlias("remote_scan", columnNames); - rangeTableEntry->inh = false; - rangeTableEntry->inFromCl = true; - - resultPlan = originalPlan; - resultPlan->planTree = (Plan *) customScan; - resultPlan->rtable = lappend(resultPlan->rtable, rangeTableEntry); - customScan->scan.plan.targetlist = targetList; + finalPlan = FinalizeRouterPlan(localPlan, customScan); } - return resultPlan; + return finalPlan; +} + + +/* + * FinalizeNonRouterPlan gets the distributed custom scan plan, and creates the + * final master select plan on the top of this distributed plan for real-time + * and task-tracker executors. + */ +static PlannedStmt * +FinalizeNonRouterPlan(PlannedStmt *localPlan, MultiPlan *multiPlan, + CustomScan *customScan) +{ + PlannedStmt *finalPlan = NULL; + + finalPlan = MasterNodeSelectPlan(multiPlan, customScan); + finalPlan->queryId = localPlan->queryId; + finalPlan->utilityStmt = localPlan->utilityStmt; + + return finalPlan; +} + + +/* + * FinalizeRouterPlan gets a CustomScan node which already wrapped distributed + * part of a router plan and sets it as the direct child of the router plan + * because we don't run any query on master node for router executable queries. + * Here, we also rebuild the column list to read from the remote scan. 
+ */ +static PlannedStmt * +FinalizeRouterPlan(PlannedStmt *localPlan, CustomScan *customScan) +{ + PlannedStmt *routerPlan = NULL; + RangeTblEntry *remoteScanRangeTableEntry = NULL; + ListCell *targetEntryCell = NULL; + List *targetList = NIL; + List *columnNameList = NIL; + + /* we will have only one range table entry */ + int customScanRangeTableIndex = 1; + + /* build a targetlist to read from the custom scan output */ + foreach(targetEntryCell, localPlan->planTree->targetlist) + { + TargetEntry *targetEntry = lfirst(targetEntryCell); + TargetEntry *newTargetEntry = NULL; + Var *newVar = NULL; + Value *columnName = NULL; + + Assert(IsA(targetEntry, TargetEntry)); + + /* + * This is unlikely to be hit because we would not need resjunk stuff + * at the toplevel of a router query - all things needing it have been + * pushed down. + */ + if (targetEntry->resjunk) + { + continue; + } + + /* build target entry pointing to remote scan range table entry */ + newVar = makeVarFromTargetEntry(customScanRangeTableIndex, targetEntry); + newTargetEntry = flatCopyTargetEntry(targetEntry); + newTargetEntry->expr = (Expr *) newVar; + targetList = lappend(targetList, newTargetEntry); + + columnName = makeString(targetEntry->resname); + columnNameList = lappend(columnNameList, columnName); + } + + customScan->scan.plan.targetlist = targetList; + + routerPlan = makeNode(PlannedStmt); + routerPlan->planTree = (Plan *) customScan; + + remoteScanRangeTableEntry = RemoteScanRangeTableEntry(columnNameList); + routerPlan->rtable = list_make1(remoteScanRangeTableEntry); + + routerPlan->canSetTag = true; + routerPlan->relationOids = NIL; + + routerPlan->queryId = localPlan->queryId; + routerPlan->utilityStmt = localPlan->utilityStmt; + routerPlan->commandType = localPlan->commandType; + routerPlan->hasReturning = localPlan->hasReturning; + + return routerPlan; +} + + +/* + * RemoteScanRangeTableEntry creates a range table entry from given column name + * list to represent a remote scan. 
+ */ +RangeTblEntry * +RemoteScanRangeTableEntry(List *columnNameList) +{ + RangeTblEntry *remoteScanRangeTableEntry = makeNode(RangeTblEntry); + + /* we use RTE_VALUES for custom scan because we can't look up relation */ + remoteScanRangeTableEntry->rtekind = RTE_VALUES; + remoteScanRangeTableEntry->eref = makeAlias("remote_scan", columnNameList); + remoteScanRangeTableEntry->inh = false; + remoteScanRangeTableEntry->inFromCl = true; + + return remoteScanRangeTableEntry; } diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 11212ec81..39bf01782 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -236,9 +236,13 @@ CreateSingleTaskRouterPlan(Query *originalQuery, Query *query, multiPlan->workerJob = job; multiPlan->masterQuery = NULL; - multiPlan->masterTableName = NULL; multiPlan->routerExecutable = true; - multiPlan->hasReturning = list_length(originalQuery->returningList) > 0; + multiPlan->hasReturning = false; + + if (list_length(originalQuery->returningList) > 0) + { + multiPlan->hasReturning = true; + } return multiPlan; } @@ -321,10 +325,14 @@ CreateInsertSelectRouterPlan(Query *originalQuery, /* and finally the multi plan */ multiPlan->workerJob = workerJob; - multiPlan->masterTableName = NULL; multiPlan->masterQuery = NULL; multiPlan->routerExecutable = true; - multiPlan->hasReturning = list_length(originalQuery->returningList) > 0; + multiPlan->hasReturning = false; + + if (list_length(originalQuery->returningList) > 0) + { + multiPlan->hasReturning = true; + } return multiPlan; } diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 05f92f486..689296b08 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -114,9 +114,7 @@ _PG_init(void) * (thus as the innermost/last running hook) 
to be able to do our * duties. For simplicity insist that all hooks are previously unused. */ - if (planner_hook != NULL || - ExplainOneQuery_hook != NULL || - ProcessUtility_hook != NULL) + if (planner_hook != NULL || ProcessUtility_hook != NULL) { ereport(ERROR, (errmsg("Citus has to be loaded first"), errhint("Place citus at the beginning of " diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index 8da52e506..8aa485857 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -281,7 +281,6 @@ OutMultiPlan(OUTFUNC_ARGS) WRITE_NODE_FIELD(workerJob); WRITE_NODE_FIELD(masterQuery); - WRITE_STRING_FIELD(masterTableName); WRITE_BOOL_FIELD(routerExecutable); WRITE_NODE_FIELD(planningError); } diff --git a/src/backend/distributed/utils/citus_readfuncs.c b/src/backend/distributed/utils/citus_readfuncs.c index fca056e4e..37c9245fa 100644 --- a/src/backend/distributed/utils/citus_readfuncs.c +++ b/src/backend/distributed/utils/citus_readfuncs.c @@ -188,7 +188,6 @@ ReadMultiPlan(READFUNC_ARGS) READ_NODE_FIELD(workerJob); READ_NODE_FIELD(masterQuery); - READ_STRING_FIELD(masterTableName); READ_BOOL_FIELD(routerExecutable); READ_NODE_FIELD(planningError); diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index 53fc327b9..e3b53a327 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -17,9 +17,6 @@ #include "distributed/multi_physical_planner.h" #include "distributed/multi_server_executor.h" -/* signal currently executed statement is a master select statement or router execution */ -#define EXEC_FLAG_CITUS_MASTER_SELECT 0x100 -#define EXEC_FLAG_CITUS_ROUTER_EXECUTOR 0x200 #if (PG_VERSION_NUM >= 90600) #define tuplecount_t uint64 @@ -30,23 +27,26 @@ typedef struct CitusScanState { - CustomScanState customScanState; - MultiPlan *multiPlan; - MultiExecutorType 
executorType; - - /* state for router */ - bool finishedUnderlyingScan; - Tuplestorestate *tuplestorestate; + CustomScanState customScanState; /* underlying custom scan node */ + MultiPlan *multiPlan; /* distributed execution plan */ + MultiExecutorType executorType; /* distributed executor type */ + bool finishedRemoteScan; /* flag to check if remote scan is finished */ + Tuplestorestate *tuplestorestate; /* tuple store to store distributed results */ } CitusScanState; -Node * CitusCreateScan(CustomScan *scan); -extern void CitusBeginScan(CustomScanState *node, - EState *estate, - int eflags); -extern TupleTableSlot * CitusExecScan(CustomScanState *node); + +extern Node * RealTimeCreateScan(CustomScan *scan); +extern Node * TaskTrackerCreateScan(CustomScan *scan); +extern Node * RouterCreateScan(CustomScan *scan); +extern Node * DelayedErrorCreateScan(CustomScan *scan); +extern void CitusSelectBeginScan(CustomScanState *node, EState *estate, int eflags); +extern TupleTableSlot * RealTimeExecScan(CustomScanState *node); +extern TupleTableSlot * TaskTrackerExecScan(CustomScanState *node); extern void CitusEndScan(CustomScanState *node); extern void CitusReScan(CustomScanState *node); -extern void CitusExplainScan(CustomScanState *node, List *ancestors, - struct ExplainState *es); +extern void CitusExplainScan(CustomScanState *node, List *ancestors, struct + ExplainState *es); +extern TupleTableSlot * ReturnTupleFromTuplestore(CitusScanState *scanState); + #endif /* MULTI_EXECUTOR_H */ diff --git a/src/include/distributed/multi_master_planner.h b/src/include/distributed/multi_master_planner.h index c58b42717..9873ce4ff 100644 --- a/src/include/distributed/multi_master_planner.h +++ b/src/include/distributed/multi_master_planner.h @@ -24,6 +24,6 @@ struct MultiPlan; struct CustomScan; extern PlannedStmt * MasterNodeSelectPlan(struct MultiPlan *multiPlan, struct CustomScan *dataScan); -extern List * MasterTargetList(List *workerTargetList); + #endif /* 
MULTI_MASTER_PLANNER_H */ diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index ccf3b44a2..0cf340899 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -213,13 +213,11 @@ typedef struct JoinSequenceNode typedef struct MultiPlan { CitusNode type; - CmdType operation; - bool hasReturning; + bool hasReturning; Job *workerJob; Query *masterQuery; - char *masterTableName; bool routerExecutable; /* diff --git a/src/include/distributed/multi_planner.h b/src/include/distributed/multi_planner.h index e2f255082..8a24b2efa 100644 --- a/src/include/distributed/multi_planner.h +++ b/src/include/distributed/multi_planner.h @@ -51,14 +51,13 @@ typedef struct RelationShard extern PlannedStmt * multi_planner(Query *parse, int cursorOptions, ParamListInfo boundParams); -extern bool HasCitusToplevelNode(PlannedStmt *planStatement); struct MultiPlan; extern struct MultiPlan * GetMultiPlan(CustomScan *node); -extern Node * SerializableMultiPlan(struct MultiPlan *multiPlan); -extern struct MultiPlan * DeSerializeMultiPlan(Node *node); extern void multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo, Index index, RangeTblEntry *rte); extern bool IsModifyCommand(Query *query); -extern void VerifyMultiPlanValidity(struct MultiPlan *multiPlan); +extern bool IsModifyMultiPlan(struct MultiPlan *multiPlan); +extern RangeTblEntry * RemoteScanRangeTableEntry(List *columnNameList); + #endif /* MULTI_PLANNER_H */ diff --git a/src/include/distributed/multi_router_executor.h b/src/include/distributed/multi_router_executor.h index b42f6e002..8c9eafb7d 100644 --- a/src/include/distributed/multi_router_executor.h +++ b/src/include/distributed/multi_router_executor.h @@ -35,15 +35,12 @@ typedef struct XactShardConnSet extern bool AllModificationsCommutative; extern bool EnableDeadlockPrevention; -extern void RouterBeginScan(CitusScanState *scanState); 
- -extern TupleTableSlot * RouterExecScan(CitusScanState *scanState); - -extern void RouterExecutorStart(QueryDesc *queryDesc, int eflags, List *taskList); -extern void RouterExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count); -extern void RouterExecutorFinish(QueryDesc *queryDesc); -extern void RouterExecutorEnd(QueryDesc *queryDesc); +extern void CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags); +extern TupleTableSlot * RouterSingleModifyExecScan(CustomScanState *node); +extern TupleTableSlot * RouterSelectExecScan(CustomScanState *node); +extern TupleTableSlot * RouterMultiModifyExecScan(CustomScanState *node); extern int64 ExecuteModifyTasksWithoutResults(List *taskList); + #endif /* MULTI_ROUTER_EXECUTOR_H_ */ diff --git a/src/test/regress/expected/multi_explain.out b/src/test/regress/expected/multi_explain.out index cffad3c26..e2ef27342 100644 --- a/src/test/regress/expected/multi_explain.out +++ b/src/test/regress/expected/multi_explain.out @@ -43,8 +43,7 @@ Sort Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint), l_quantity -> HashAggregate Group Key: l_quantity - -> Custom Scan (CitusScan) - Executor: Real-Time + -> Custom Scan (Citus Real-Time) Task Count: 8 Tasks Shown: One of 8 -> Task @@ -74,10 +73,9 @@ EXPLAIN (COSTS FALSE, FORMAT JSON) { "Node Type": "Custom Scan", "Parent Relationship": "Outer", - "Custom Plan Provider": "CitusScan", + "Custom Plan Provider": "Citus Real-Time", "Parallel Aware": false, "Distributed Query": { - "Executor": "Real-Time", "Job": { "Task Count": 8, "Tasks Shown": "One of 8", @@ -150,10 +148,9 @@ EXPLAIN (COSTS FALSE, FORMAT XML) Custom Scan Outer - CitusScan + Citus Real-Time false - Real-Time 8 One of 8 @@ -221,10 +218,9 @@ EXPLAIN (COSTS FALSE, FORMAT YAML) Plans: - Node Type: "Custom Scan" Parent Relationship: "Outer" - Custom Plan Provider: "CitusScan" + Custom Plan Provider: "Citus Real-Time" Parallel 
Aware: false Distributed Query: - Executor: "Real-Time" Job: Task Count: 8 Tasks Shown: "One of 8" @@ -253,8 +249,7 @@ Sort Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint), l_quantity -> HashAggregate Group Key: l_quantity - -> Custom Scan (CitusScan) - Executor: Real-Time + -> Custom Scan (Citus Real-Time) Task Count: 8 Tasks Shown: One of 8 -> Task @@ -267,9 +262,8 @@ EXPLAIN (COSTS FALSE, VERBOSE TRUE) SELECT sum(l_quantity) / avg(l_quantity) FROM lineitem; Aggregate Output: (sum("?column?") / (sum("?column?_1") / pg_catalog.sum("?column?_2"))) - -> Custom Scan (CitusScan) + -> Custom Scan (Citus Real-Time) Output: "?column?", "?column?_1", "?column?_2" - Executor: Real-Time Task Count: 8 Tasks Shown: One of 8 -> Task @@ -286,8 +280,7 @@ EXPLAIN (COSTS FALSE) Limit -> Sort Sort Key: l_quantity - -> Custom Scan (CitusScan) - Executor: Real-Time + -> Custom Scan (Citus Real-Time) Task Count: 8 Tasks Shown: One of 8 -> Task @@ -304,8 +297,7 @@ Limit -- Test insert EXPLAIN (COSTS FALSE) INSERT INTO lineitem VALUES(1,0); -Custom Scan (CitusScan) - Executor: Router +Custom Scan (Citus Router) Task Count: 1 Tasks Shown: All -> Task @@ -317,8 +309,7 @@ EXPLAIN (COSTS FALSE) UPDATE lineitem SET l_suppkey = 12 WHERE l_orderkey = 1 AND l_partkey = 0; -Custom Scan (CitusScan) - Executor: Router +Custom Scan (Citus Router) Task Count: 1 Tasks Shown: All -> Task @@ -333,8 +324,7 @@ Custom Scan (CitusScan) EXPLAIN (COSTS FALSE) DELETE FROM lineitem WHERE l_orderkey = 1 AND l_partkey = 0; -Custom Scan (CitusScan) - Executor: Router +Custom Scan (Citus Router) Task Count: 1 Tasks Shown: All -> Task @@ -348,8 +338,7 @@ Custom Scan (CitusScan) -- Test single-shard SELECT EXPLAIN (COSTS FALSE) SELECT l_quantity FROM lineitem WHERE l_orderkey = 5; -Custom Scan (CitusScan) - Executor: Router +Custom Scan (Citus Router) Task Count: 1 Tasks Shown: All -> Task @@ -368,8 +357,7 @@ t EXPLAIN (COSTS FALSE) CREATE 
TABLE explain_result AS SELECT * FROM lineitem; -Custom Scan (CitusScan) - Executor: Real-Time +Custom Scan (Citus Real-Time) Task Count: 8 Tasks Shown: One of 8 -> Task @@ -381,10 +369,9 @@ EXPLAIN (COSTS FALSE, VERBOSE TRUE) HAVING sum(l_quantity) > 100; Aggregate Output: (sum("?column?") / (sum("?column?_1") / pg_catalog.sum("?column?_2"))) - Filter: (sum("remote scan".worker_column_4) > '100'::numeric) - -> Custom Scan (CitusScan) + Filter: (sum(remote_scan.worker_column_4) > '100'::numeric) + -> Custom Scan (Citus Real-Time) Output: "?column?", "?column?_1", "?column?_2", worker_column_4 - Executor: Real-Time Task Count: 8 Tasks Shown: One of 8 -> Task @@ -400,11 +387,10 @@ EXPLAIN (COSTS FALSE, VERBOSE TRUE) HAVING l_quantity > (100 * random()); HashAggregate Output: l_quantity - Group Key: "remote scan".l_quantity - Filter: (("remote scan".worker_column_2)::double precision > ('100'::double precision * random())) - -> Custom Scan (CitusScan) + Group Key: remote_scan.l_quantity + Filter: ((remote_scan.worker_column_2)::double precision > ('100'::double precision * random())) + -> Custom Scan (Citus Real-Time) Output: l_quantity, worker_column_2 - Executor: Real-Time Task Count: 8 Tasks Shown: One of 8 -> Task @@ -419,8 +405,7 @@ SET citus.explain_all_tasks TO on; EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030; Aggregate - -> Custom Scan (CitusScan) - Executor: Real-Time + -> Custom Scan (Citus Real-Time) Task Count: 4 Tasks Shown: All -> Task @@ -455,8 +440,7 @@ SET citus.explain_all_tasks TO off; EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030; Aggregate - -> Custom Scan (CitusScan) - Executor: Task-Tracker + -> Custom Scan (Citus Task-Tracker) Task Count: 4 Tasks Shown: One of 4 -> Task @@ -473,8 +457,7 @@ EXPLAIN (COSTS FALSE) AND o_custkey = c_custkey AND l_suppkey = s_suppkey; Aggregate - -> Custom Scan (CitusScan) - Executor: Task-Tracker + -> Custom Scan (Citus Task-Tracker) 
Task Count: 1 Tasks Shown: None, not supported for re-partition queries -> MapMergeJob @@ -500,10 +483,9 @@ EXPLAIN (COSTS FALSE, FORMAT JSON) { "Node Type": "Custom Scan", "Parent Relationship": "Outer", - "Custom Plan Provider": "CitusScan", + "Custom Plan Provider": "Citus Task-Tracker", "Parallel Aware": false, "Distributed Query": { - "Executor": "Task-Tracker", "Job": { "Task Count": 1, "Tasks Shown": "None, not supported for re-partition queries", @@ -550,10 +532,9 @@ EXPLAIN (COSTS FALSE, FORMAT XML) Custom Scan Outer - CitusScan + Citus Task-Tracker false - Task-Tracker 1 None, not supported for re-partition queries @@ -610,10 +591,9 @@ EXPLAIN (COSTS FALSE, FORMAT YAML) Plans: - Node Type: "Custom Scan" Parent Relationship: "Outer" - Custom Plan Provider: "CitusScan" + Custom Plan Provider: "Citus Task-Tracker" Parallel Aware: false Distributed Query: - Executor: "Task-Tracker" Job: Task Count: 1 Tasks Shown: "None, not supported for re-partition queries" @@ -639,8 +619,7 @@ Finalize Aggregate -- ensure distributed plans don't break EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem; Aggregate - -> Custom Scan (CitusScan) - Executor: Task-Tracker + -> Custom Scan (Citus Task-Tracker) Task Count: 8 Tasks Shown: One of 8 -> Task @@ -652,8 +631,7 @@ PREPARE task_tracker_query AS SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030; EXPLAIN (COSTS FALSE) EXECUTE task_tracker_query; Aggregate - -> Custom Scan (CitusScan) - Executor: Task-Tracker + -> Custom Scan (Citus Task-Tracker) Task Count: 4 Tasks Shown: One of 4 -> Task @@ -664,8 +642,7 @@ Aggregate SET citus.task_executor_type TO 'real-time'; PREPARE router_executor_query AS SELECT l_quantity FROM lineitem WHERE l_orderkey = 5; EXPLAIN EXECUTE router_executor_query; -Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) - Executor: Router +Custom Scan (Citus Router) (cost=0.00..0.00 rows=0 width=0) Task Count: 1 Tasks Shown: All -> Task @@ -678,8 +655,7 @@ PREPARE 
real_time_executor_query AS SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030; EXPLAIN (COSTS FALSE) EXECUTE real_time_executor_query; Aggregate - -> Custom Scan (CitusScan) - Executor: Real-Time + -> Custom Scan (Citus Real-Time) Task Count: 4 Tasks Shown: One of 4 -> Task @@ -691,8 +667,7 @@ Aggregate -- at least make sure to fail without crashing PREPARE router_executor_query_param(int) AS SELECT l_quantity FROM lineitem WHERE l_orderkey = $1; EXPLAIN EXECUTE router_executor_query_param(5); -Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) - Executor: Router +Custom Scan (Citus Router) (cost=0.00..0.00 rows=0 width=0) Task Count: 1 Tasks Shown: All -> Task diff --git a/src/test/regress/expected/multi_explain_0.out b/src/test/regress/expected/multi_explain_0.out index be57df725..af839b514 100644 --- a/src/test/regress/expected/multi_explain_0.out +++ b/src/test/regress/expected/multi_explain_0.out @@ -43,8 +43,7 @@ Sort Sort Key: COALESCE((sum((COALESCE((sum(count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint), l_quantity -> HashAggregate Group Key: l_quantity - -> Custom Scan (CitusScan) - Executor: Real-Time + -> Custom Scan (Citus Real-Time) Task Count: 8 Tasks Shown: One of 8 -> Task @@ -71,9 +70,8 @@ EXPLAIN (COSTS FALSE, FORMAT JSON) { "Node Type": "Custom Scan", "Parent Relationship": "Outer", - "Custom Plan Provider": "CitusScan", + "Custom Plan Provider": "Citus Real-Time", "Distributed Query": { - "Executor": "Real-Time", "Job": { "Task Count": 8, "Tasks Shown": "One of 8", @@ -140,9 +138,8 @@ EXPLAIN (COSTS FALSE, FORMAT XML) Custom Scan Outer - CitusScan + Citus Real-Time - Real-Time 8 One of 8 @@ -204,9 +201,8 @@ EXPLAIN (COSTS FALSE, FORMAT YAML) Plans: - Node Type: "Custom Scan" Parent Relationship: "Outer" - Custom Plan Provider: "CitusScan" + Custom Plan Provider: "Citus Real-Time" Distributed Query: - Executor: "Real-Time" Job: Task Count: 8 Tasks Shown: "One of 8" @@ -232,8 +228,7 @@ Sort Sort Key: 
COALESCE((sum((COALESCE((sum(count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint), l_quantity -> HashAggregate Group Key: l_quantity - -> Custom Scan (CitusScan) - Executor: Real-Time + -> Custom Scan (Citus Real-Time) Task Count: 8 Tasks Shown: One of 8 -> Task @@ -246,9 +241,8 @@ EXPLAIN (COSTS FALSE, VERBOSE TRUE) SELECT sum(l_quantity) / avg(l_quantity) FROM lineitem; Aggregate Output: (sum("?column?") / (sum("?column?_1") / sum("?column?_2"))) - -> Custom Scan (CitusScan) + -> Custom Scan (Citus Real-Time) Output: "?column?", "?column?_1", "?column?_2" - Executor: Real-Time Task Count: 8 Tasks Shown: One of 8 -> Task @@ -265,8 +259,7 @@ EXPLAIN (COSTS FALSE) Limit -> Sort Sort Key: l_quantity - -> Custom Scan (CitusScan) - Executor: Real-Time + -> Custom Scan (Citus Real-Time) Task Count: 8 Tasks Shown: One of 8 -> Task @@ -283,8 +276,7 @@ Limit -- Test insert EXPLAIN (COSTS FALSE) INSERT INTO lineitem VALUES(1,0); -Custom Scan (CitusScan) - Executor: Router +Custom Scan (Citus Router) Task Count: 1 Tasks Shown: All -> Task @@ -296,8 +288,7 @@ EXPLAIN (COSTS FALSE) UPDATE lineitem SET l_suppkey = 12 WHERE l_orderkey = 1 AND l_partkey = 0; -Custom Scan (CitusScan) - Executor: Router +Custom Scan (Citus Router) Task Count: 1 Tasks Shown: All -> Task @@ -312,8 +303,7 @@ Custom Scan (CitusScan) EXPLAIN (COSTS FALSE) DELETE FROM lineitem WHERE l_orderkey = 1 AND l_partkey = 0; -Custom Scan (CitusScan) - Executor: Router +Custom Scan (Citus Router) Task Count: 1 Tasks Shown: All -> Task @@ -327,8 +317,7 @@ Custom Scan (CitusScan) -- Test single-shard SELECT EXPLAIN (COSTS FALSE) SELECT l_quantity FROM lineitem WHERE l_orderkey = 5; -Custom Scan (CitusScan) - Executor: Router +Custom Scan (Citus Router) Task Count: 1 Tasks Shown: All -> Task @@ -347,8 +336,7 @@ t EXPLAIN (COSTS FALSE) CREATE TABLE explain_result AS SELECT * FROM lineitem; -Custom Scan (CitusScan) - Executor: Real-Time +Custom Scan (Citus Real-Time) Task Count: 8 Tasks Shown: One of 8 -> 
Task @@ -360,10 +348,9 @@ EXPLAIN (COSTS FALSE, VERBOSE TRUE) HAVING sum(l_quantity) > 100; Aggregate Output: (sum("?column?") / (sum("?column?_1") / sum("?column?_2"))) - Filter: (sum("remote scan".worker_column_4) > '100'::numeric) - -> Custom Scan (CitusScan) + Filter: (sum(remote_scan.worker_column_4) > '100'::numeric) + -> Custom Scan (Citus Real-Time) Output: "?column?", "?column?_1", "?column?_2", worker_column_4 - Executor: Real-Time Task Count: 8 Tasks Shown: One of 8 -> Task @@ -379,11 +366,10 @@ EXPLAIN (COSTS FALSE, VERBOSE TRUE) HAVING l_quantity > (100 * random()); HashAggregate Output: l_quantity - Group Key: "remote scan".l_quantity - Filter: (("remote scan".worker_column_2)::double precision > ('100'::double precision * random())) - -> Custom Scan (CitusScan) + Group Key: remote_scan.l_quantity + Filter: ((remote_scan.worker_column_2)::double precision > ('100'::double precision * random())) + -> Custom Scan (Citus Real-Time) Output: l_quantity, worker_column_2 - Executor: Real-Time Task Count: 8 Tasks Shown: One of 8 -> Task @@ -398,8 +384,7 @@ SET citus.explain_all_tasks TO on; EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030; Aggregate - -> Custom Scan (CitusScan) - Executor: Real-Time + -> Custom Scan (Citus Real-Time) Task Count: 4 Tasks Shown: All -> Task @@ -434,8 +419,7 @@ SET citus.explain_all_tasks TO off; EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030; Aggregate - -> Custom Scan (CitusScan) - Executor: Task-Tracker + -> Custom Scan (Citus Task-Tracker) Task Count: 4 Tasks Shown: One of 4 -> Task @@ -452,8 +436,7 @@ EXPLAIN (COSTS FALSE) AND o_custkey = c_custkey AND l_suppkey = s_suppkey; Aggregate - -> Custom Scan (CitusScan) - Executor: Task-Tracker + -> Custom Scan (Citus Task-Tracker) Task Count: 1 Tasks Shown: None, not supported for re-partition queries -> MapMergeJob @@ -477,9 +460,8 @@ EXPLAIN (COSTS FALSE, FORMAT JSON) { "Node Type": "Custom Scan", "Parent 
Relationship": "Outer", - "Custom Plan Provider": "CitusScan", + "Custom Plan Provider": "Citus Task-Tracker", "Distributed Query": { - "Executor": "Task-Tracker", "Job": { "Task Count": 1, "Tasks Shown": "None, not supported for re-partition queries", @@ -524,9 +506,8 @@ EXPLAIN (COSTS FALSE, FORMAT XML) Custom Scan Outer - CitusScan + Citus Task-Tracker - Task-Tracker 1 None, not supported for re-partition queries @@ -581,9 +562,8 @@ EXPLAIN (COSTS FALSE, FORMAT YAML) Plans: - Node Type: "Custom Scan" Parent Relationship: "Outer" - Custom Plan Provider: "CitusScan" + Custom Plan Provider: "Citus Task-Tracker" Distributed Query: - Executor: "Task-Tracker" Job: Task Count: 1 Tasks Shown: "None, not supported for re-partition queries" @@ -610,8 +590,7 @@ Aggregate -- ensure distributed plans don't break EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem; Aggregate - -> Custom Scan (CitusScan) - Executor: Task-Tracker + -> Custom Scan (Citus Task-Tracker) Task Count: 8 Tasks Shown: One of 8 -> Task @@ -623,8 +602,7 @@ PREPARE task_tracker_query AS SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030; EXPLAIN (COSTS FALSE) EXECUTE task_tracker_query; Aggregate - -> Custom Scan (CitusScan) - Executor: Task-Tracker + -> Custom Scan (Citus Task-Tracker) Task Count: 4 Tasks Shown: One of 4 -> Task @@ -635,8 +613,7 @@ Aggregate SET citus.task_executor_type TO 'real-time'; PREPARE router_executor_query AS SELECT l_quantity FROM lineitem WHERE l_orderkey = 5; EXPLAIN EXECUTE router_executor_query; -Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) - Executor: Router +Custom Scan (Citus Router) (cost=0.00..0.00 rows=0 width=0) Task Count: 1 Tasks Shown: All -> Task @@ -649,8 +626,7 @@ PREPARE real_time_executor_query AS SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030; EXPLAIN (COSTS FALSE) EXECUTE real_time_executor_query; Aggregate - -> Custom Scan (CitusScan) - Executor: Real-Time + -> Custom Scan (Citus Real-Time) Task Count: 4 
Tasks Shown: One of 4 -> Task @@ -662,8 +638,7 @@ Aggregate -- at least make sure to fail without crashing PREPARE router_executor_query_param(int) AS SELECT l_quantity FROM lineitem WHERE l_orderkey = $1; EXPLAIN EXECUTE router_executor_query_param(5); -Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) - Executor: Router +Custom Scan (Citus Router) (cost=0.00..0.00 rows=0 width=0) Task Count: 1 Tasks Shown: All -> Task diff --git a/src/test/regress/expected/multi_join_order_additional.out b/src/test/regress/expected/multi_join_order_additional.out index 84b3d55e3..da8ddd2f5 100644 --- a/src/test/regress/expected/multi_join_order_additional.out +++ b/src/test/regress/expected/multi_join_order_additional.out @@ -6,7 +6,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 650000; -- Set configuration to print table join order and pruned shards SET citus.explain_distributed_queries TO off; SET citus.log_multi_join_order TO TRUE; -SET citus.task_executor_type = 'task-tracker'; -- can't explain all queries otherwhise +SET citus.task_executor_type = 'task-tracker'; -- can't explain all queries otherwise SET client_min_messages TO DEBUG2; -- Create new table definitions for use in testing in distributed planning and -- execution functionality. Also create indexes to boost performance. 
@@ -140,9 +140,9 @@ DEBUG: join prunable for intervals [13473,14947] and [2951,4455] DEBUG: join prunable for intervals [13473,14947] and [4480,5986] DEBUG: join prunable for intervals [13473,14947] and [8997,10560] DEBUG: join prunable for intervals [13473,14947] and [10560,12036] - QUERY PLAN --------------------------------------------------------------- - Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + QUERY PLAN +-------------------------------------------------------------------- + Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (2 rows) @@ -156,10 +156,10 @@ EXPLAIN SELECT count(*) FROM lineitem, orders WHERE (l_orderkey = o_orderkey AND l_quantity > 5) OR (l_orderkey = o_orderkey AND l_quantity < 10); LOG: join order: [ "lineitem" ][ local partition join "orders" ] - QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN +-------------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (3 rows) @@ -177,10 +177,10 @@ BEGIN; EXPLAIN SELECT count(*) FROM orders, lineitem_hash WHERE o_orderkey = l_orderkey; LOG: join order: [ "orders" ][ single partition join "lineitem_hash" ] - QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN +-------------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (3 rows) @@ -188,10 +188,10 @@ LOG: join order: [ "orders" ][ single partition join "lineitem_hash" ] EXPLAIN SELECT 
count(*) FROM orders_hash, lineitem_hash WHERE o_orderkey = l_orderkey; LOG: join order: [ "orders_hash" ][ local partition join "lineitem_hash" ] - QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN +-------------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (3 rows) @@ -199,10 +199,10 @@ LOG: join order: [ "orders_hash" ][ local partition join "lineitem_hash" ] EXPLAIN SELECT count(*) FROM customer_hash, nation WHERE c_nationkey = n_nationkey; LOG: join order: [ "customer_hash" ][ broadcast join "nation" ] - QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN +-------------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (3 rows) @@ -213,10 +213,10 @@ SET citus.large_table_shard_count TO 1; EXPLAIN SELECT count(*) FROM orders, lineitem, customer WHERE o_custkey = l_partkey AND o_custkey = c_nationkey; LOG: join order: [ "orders" ][ dual partition join "lineitem" ][ dual partition join "customer" ] - QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN +-------------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (3 rows) @@ -225,10 +225,10 @@ LOG: join order: [ "orders" ][ dual partition join "lineitem" ][ 
dual partition EXPLAIN SELECT count(*) FROM orders, customer_hash WHERE c_custkey = o_custkey; LOG: join order: [ "orders" ][ dual partition join "customer_hash" ] - QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN +-------------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (3 rows) @@ -237,10 +237,10 @@ LOG: join order: [ "orders" ][ dual partition join "customer_hash" ] EXPLAIN SELECT count(*) FROM orders_hash, customer WHERE c_custkey = o_custkey; LOG: join order: [ "orders_hash" ][ single partition join "customer" ] - QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN +-------------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (3 rows) diff --git a/src/test/regress/expected/multi_join_order_tpch_large.out b/src/test/regress/expected/multi_join_order_tpch_large.out index 8fe01093c..9489b567e 100644 --- a/src/test/regress/expected/multi_join_order_tpch_large.out +++ b/src/test/regress/expected/multi_join_order_tpch_large.out @@ -6,7 +6,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 660000; -- Enable configuration to print table join order SET citus.explain_distributed_queries TO off; SET citus.log_multi_join_order TO TRUE; -SET citus.task_executor_type = 'task-tracker'; -- can't explain all queries otherwhise +SET citus.task_executor_type = 'task-tracker'; -- can't explain all queries otherwise SET client_min_messages TO LOG; -- Change configuration to treat lineitem, 
orders, customer, and part tables as -- large. The following queries are basically the same as the ones in tpch_small @@ -24,10 +24,10 @@ WHERE and l_discount between 0.06 - 0.01 and 0.06 + 0.01 and l_quantity < 24; LOG: join order: [ "lineitem" ] - QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN +-------------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (3 rows) @@ -55,13 +55,13 @@ ORDER BY revenue DESC, o_orderdate; LOG: join order: [ "orders" ][ local partition join "lineitem" ][ single partition join "customer" ] - QUERY PLAN --------------------------------------------------------------------------- + QUERY PLAN +-------------------------------------------------------------------------------- Sort (cost=0.00..0.00 rows=0 width=0) Sort Key: sum((sum(revenue))) DESC, o_orderdate -> HashAggregate (cost=0.00..0.00 rows=0 width=0) Group Key: l_orderkey, o_orderdate, o_shippriority - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (6 rows) @@ -104,7 +104,7 @@ LOG: join order: [ "orders" ][ local partition join "lineitem" ][ single partit Sort Key: sum((sum(revenue))) DESC -> HashAggregate (cost=0.00..0.00 rows=0 width=0) Group Key: c_custkey, c_name, c_acctbal, c_phone, n_name, c_address, c_comment - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (6 rows) @@ -139,10 +139,10 @@ WHERE AND l_shipinstruct = 'DELIVER IN PERSON' ); LOG: join order: [ "lineitem" ][ single partition join "part" ] - 
QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN +-------------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (3 rows) @@ -158,11 +158,11 @@ WHERE GROUP BY l_partkey; LOG: join order: [ "lineitem" ][ local partition join "orders" ][ single partition join "part" ][ single partition join "customer" ] - QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN +-------------------------------------------------------------------------- HashAggregate (cost=0.00..0.00 rows=0 width=0) Group Key: l_partkey - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (4 rows) diff --git a/src/test/regress/expected/multi_join_order_tpch_small.out b/src/test/regress/expected/multi_join_order_tpch_small.out index 66bcca2f4..c0466b1e3 100644 --- a/src/test/regress/expected/multi_join_order_tpch_small.out +++ b/src/test/regress/expected/multi_join_order_tpch_small.out @@ -18,10 +18,10 @@ WHERE and l_discount between 0.06 - 0.01 and 0.06 + 0.01 and l_quantity < 24; LOG: join order: [ "lineitem" ] - QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN +----------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (3 rows) @@ -49,13 +49,13 @@ ORDER BY revenue DESC, o_orderdate; LOG: join order: [ "orders" ][ broadcast join "customer" ][ local 
partition join "lineitem" ] - QUERY PLAN --------------------------------------------------------------------------- + QUERY PLAN +----------------------------------------------------------------------------- Sort (cost=0.00..0.00 rows=0 width=0) Sort Key: sum((sum(revenue))) DESC, o_orderdate -> HashAggregate (cost=0.00..0.00 rows=0 width=0) Group Key: l_orderkey, o_orderdate, o_shippriority - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (6 rows) @@ -98,7 +98,7 @@ LOG: join order: [ "orders" ][ broadcast join "customer" ][ broadcast join "nat Sort Key: sum((sum(revenue))) DESC -> HashAggregate (cost=0.00..0.00 rows=0 width=0) Group Key: c_custkey, c_name, c_acctbal, c_phone, n_name, c_address, c_comment - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (6 rows) @@ -133,10 +133,10 @@ WHERE AND l_shipinstruct = 'DELIVER IN PERSON' ); LOG: join order: [ "lineitem" ][ broadcast join "part" ] - QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN +----------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (3 rows) diff --git a/src/test/regress/expected/multi_join_pruning.out b/src/test/regress/expected/multi_join_pruning.out index 60caa5865..e04799723 100644 --- a/src/test/regress/expected/multi_join_pruning.out +++ b/src/test/regress/expected/multi_join_pruning.out @@ -100,10 +100,10 @@ EXPLAIN SELECT count(*) WHERE table1.array_column = table2.array_column; DEBUG: join prunable for intervals 
[{},{AZZXSP27F21T6,AZZXSP27F21T6}] and [{BA1000U2AMO4ZGX,BZZXSP27F21T6},{CA1000U2AMO4ZGX,CZZXSP27F21T6}] DEBUG: join prunable for intervals [{BA1000U2AMO4ZGX,BZZXSP27F21T6},{CA1000U2AMO4ZGX,CZZXSP27F21T6}] and [{},{AZZXSP27F21T6,AZZXSP27F21T6}] - QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN +----------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (3 rows) @@ -112,10 +112,10 @@ EXPLAIN SELECT count(*) WHERE table1.composite_column = table2.composite_column; DEBUG: join prunable for intervals [(a,3,b),(b,4,c)] and [(c,5,d),(d,6,e)] DEBUG: join prunable for intervals [(c,5,d),(d,6,e)] and [(a,3,b),(b,4,c)] - QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN +----------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (3 rows) @@ -125,10 +125,10 @@ EXPLAIN SELECT count(*) WHERE table1.varchar_column = table2.varchar_column; DEBUG: join prunable for intervals [AA1000U2AMO4ZGX,AZZXSP27F21T6] and [BA1000U2AMO4ZGX,BZZXSP27F21T6] DEBUG: join prunable for intervals [BA1000U2AMO4ZGX,BZZXSP27F21T6] and [AA1000U2AMO4ZGX,AZZXSP27F21T6] - QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN +----------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed 
queries are not enabled (3 rows) diff --git a/src/test/regress/expected/multi_mx_explain.out b/src/test/regress/expected/multi_mx_explain.out index 75a3c880d..02dfbee99 100644 --- a/src/test/regress/expected/multi_mx_explain.out +++ b/src/test/regress/expected/multi_mx_explain.out @@ -65,8 +65,7 @@ Sort Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint), l_quantity -> HashAggregate Group Key: l_quantity - -> Custom Scan (CitusScan) - Executor: Real-Time + -> Custom Scan (Citus Real-Time) Task Count: 16 Tasks Shown: One of 16 -> Task @@ -96,10 +95,9 @@ EXPLAIN (COSTS FALSE, FORMAT JSON) { "Node Type": "Custom Scan", "Parent Relationship": "Outer", - "Custom Plan Provider": "CitusScan", + "Custom Plan Provider": "Citus Real-Time", "Parallel Aware": false, "Distributed Query": { - "Executor": "Real-Time", "Job": { "Task Count": 16, "Tasks Shown": "One of 16", @@ -173,10 +171,9 @@ EXPLAIN (COSTS FALSE, FORMAT XML) Custom Scan Outer - CitusScan + Citus Real-Time false - Real-Time 16 One of 16 @@ -244,10 +241,9 @@ EXPLAIN (COSTS FALSE, FORMAT YAML) Plans: - Node Type: "Custom Scan" Parent Relationship: "Outer" - Custom Plan Provider: "CitusScan" + Custom Plan Provider: "Citus Real-Time" Parallel Aware: false Distributed Query: - Executor: "Real-Time" Job: Task Count: 16 Tasks Shown: "One of 16" @@ -276,8 +272,7 @@ Sort Sort Key: COALESCE((pg_catalog.sum((COALESCE((pg_catalog.sum(count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint), l_quantity -> HashAggregate Group Key: l_quantity - -> Custom Scan (CitusScan) - Executor: Real-Time + -> Custom Scan (Citus Real-Time) Task Count: 16 Tasks Shown: One of 16 -> Task @@ -291,9 +286,8 @@ EXPLAIN (COSTS FALSE, VERBOSE TRUE) SELECT sum(l_quantity) / avg(l_quantity) FROM lineitem_mx; Aggregate Output: (sum("?column?") / (sum("?column?_1") / pg_catalog.sum("?column?_2"))) - -> Custom Scan (CitusScan) + -> Custom Scan (Citus Real-Time) Output: 
"?column?", "?column?_1", "?column?_2" - Executor: Real-Time Task Count: 16 Tasks Shown: One of 16 -> Task @@ -310,8 +304,7 @@ EXPLAIN (COSTS FALSE) Limit -> Sort Sort Key: l_quantity - -> Custom Scan (CitusScan) - Executor: Real-Time + -> Custom Scan (Citus Real-Time) Task Count: 16 Tasks Shown: One of 16 -> Task @@ -328,8 +321,7 @@ Limit -- Test insert EXPLAIN (COSTS FALSE) INSERT INTO lineitem_mx VALUES(1,0); -Custom Scan (CitusScan) - Executor: Router +Custom Scan (Citus Router) Task Count: 1 Tasks Shown: All -> Task @@ -341,8 +333,7 @@ EXPLAIN (COSTS FALSE) UPDATE lineitem_mx SET l_suppkey = 12 WHERE l_orderkey = 1 AND l_partkey = 0; -Custom Scan (CitusScan) - Executor: Router +Custom Scan (Citus Router) Task Count: 1 Tasks Shown: All -> Task @@ -355,8 +346,7 @@ Custom Scan (CitusScan) EXPLAIN (COSTS FALSE) DELETE FROM lineitem_mx WHERE l_orderkey = 1 AND l_partkey = 0; -Custom Scan (CitusScan) - Executor: Router +Custom Scan (Citus Router) Task Count: 1 Tasks Shown: All -> Task @@ -368,8 +358,7 @@ Custom Scan (CitusScan) -- Test single-shard SELECT EXPLAIN (COSTS FALSE) SELECT l_quantity FROM lineitem_mx WHERE l_orderkey = 5; -Custom Scan (CitusScan) - Executor: Router +Custom Scan (Citus Router) Task Count: 1 Tasks Shown: All -> Task @@ -388,8 +377,7 @@ t EXPLAIN (COSTS FALSE) CREATE TABLE explain_result AS SELECT * FROM lineitem_mx; -Custom Scan (CitusScan) - Executor: Real-Time +Custom Scan (Citus Real-Time) Task Count: 16 Tasks Shown: One of 16 -> Task @@ -400,8 +388,7 @@ SET citus.explain_all_tasks TO on; EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem_mx WHERE l_orderkey > 9030; Aggregate - -> Custom Scan (CitusScan) - Executor: Real-Time + -> Custom Scan (Citus Real-Time) Task Count: 16 Tasks Shown: All -> Task @@ -496,8 +483,7 @@ SET citus.explain_all_tasks TO off; EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem_mx WHERE l_orderkey > 9030; Aggregate - -> Custom Scan (CitusScan) - Executor: Task-Tracker + -> Custom Scan (Citus 
Task-Tracker) Task Count: 16 Tasks Shown: One of 16 -> Task @@ -514,8 +500,7 @@ EXPLAIN (COSTS FALSE) AND o_custkey = c_custkey AND l_suppkey = s_suppkey; Aggregate - -> Custom Scan (CitusScan) - Executor: Task-Tracker + -> Custom Scan (Citus Task-Tracker) Task Count: 4 Tasks Shown: None, not supported for re-partition queries -> MapMergeJob @@ -547,10 +532,9 @@ EXPLAIN (COSTS FALSE, FORMAT JSON) { "Node Type": "Custom Scan", "Parent Relationship": "Outer", - "Custom Plan Provider": "CitusScan", + "Custom Plan Provider": "Citus Task-Tracker", "Parallel Aware": false, "Distributed Query": { - "Executor": "Task-Tracker", "Job": { "Task Count": 4, "Tasks Shown": "None, not supported for re-partition queries", @@ -605,10 +589,9 @@ EXPLAIN (COSTS FALSE, FORMAT XML) Custom Scan Outer - CitusScan + Citus Task-Tracker false - Task-Tracker 4 None, not supported for re-partition queries @@ -660,10 +643,9 @@ EXPLAIN (COSTS FALSE, FORMAT YAML) Plans: - Node Type: "Custom Scan" Parent Relationship: "Outer" - Custom Plan Provider: "CitusScan" + Custom Plan Provider: "Citus Task-Tracker" Parallel Aware: false Distributed Query: - Executor: "Task-Tracker" Job: Task Count: 4 Tasks Shown: "None, not supported for re-partition queries" diff --git a/src/test/regress/expected/multi_mx_explain_0.out b/src/test/regress/expected/multi_mx_explain_0.out index 07f2c51b2..0a74d3001 100644 --- a/src/test/regress/expected/multi_mx_explain_0.out +++ b/src/test/regress/expected/multi_mx_explain_0.out @@ -65,8 +65,7 @@ Sort Sort Key: COALESCE((sum((COALESCE((sum(count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint), l_quantity -> HashAggregate Group Key: l_quantity - -> Custom Scan (CitusScan) - Executor: Real-Time + -> Custom Scan (Citus Real-Time) Task Count: 16 Tasks Shown: One of 16 -> Task @@ -93,9 +92,8 @@ EXPLAIN (COSTS FALSE, FORMAT JSON) { "Node Type": "Custom Scan", "Parent Relationship": "Outer", - "Custom Plan Provider": "CitusScan", + "Custom Plan Provider": "Citus 
Real-Time", "Distributed Query": { - "Executor": "Real-Time", "Job": { "Task Count": 16, "Tasks Shown": "One of 16", @@ -163,9 +161,8 @@ EXPLAIN (COSTS FALSE, FORMAT XML) Custom Scan Outer - CitusScan + Citus Real-Time - Real-Time 16 One of 16 @@ -227,9 +224,8 @@ EXPLAIN (COSTS FALSE, FORMAT YAML) Plans: - Node Type: "Custom Scan" Parent Relationship: "Outer" - Custom Plan Provider: "CitusScan" + Custom Plan Provider: "Citus Real-Time" Distributed Query: - Executor: "Real-Time" Job: Task Count: 16 Tasks Shown: "One of 16" @@ -255,8 +251,7 @@ Sort Sort Key: COALESCE((sum((COALESCE((sum(count_quantity))::bigint, '0'::bigint))))::bigint, '0'::bigint), l_quantity -> HashAggregate Group Key: l_quantity - -> Custom Scan (CitusScan) - Executor: Real-Time + -> Custom Scan (Citus Real-Time) Task Count: 16 Tasks Shown: One of 16 -> Task @@ -270,9 +265,8 @@ EXPLAIN (COSTS FALSE, VERBOSE TRUE) SELECT sum(l_quantity) / avg(l_quantity) FROM lineitem_mx; Aggregate Output: (sum("?column?") / (sum("?column?_1") / sum("?column?_2"))) - -> Custom Scan (CitusScan) + -> Custom Scan (Citus Real-Time) Output: "?column?", "?column?_1", "?column?_2" - Executor: Real-Time Task Count: 16 Tasks Shown: One of 16 -> Task @@ -289,8 +283,7 @@ EXPLAIN (COSTS FALSE) Limit -> Sort Sort Key: l_quantity - -> Custom Scan (CitusScan) - Executor: Real-Time + -> Custom Scan (Citus Real-Time) Task Count: 16 Tasks Shown: One of 16 -> Task @@ -307,8 +300,7 @@ Limit -- Test insert EXPLAIN (COSTS FALSE) INSERT INTO lineitem_mx VALUES(1,0); -Custom Scan (CitusScan) - Executor: Router +Custom Scan (Citus Router) Task Count: 1 Tasks Shown: All -> Task @@ -320,8 +312,7 @@ EXPLAIN (COSTS FALSE) UPDATE lineitem_mx SET l_suppkey = 12 WHERE l_orderkey = 1 AND l_partkey = 0; -Custom Scan (CitusScan) - Executor: Router +Custom Scan (Citus Router) Task Count: 1 Tasks Shown: All -> Task @@ -334,8 +325,7 @@ Custom Scan (CitusScan) EXPLAIN (COSTS FALSE) DELETE FROM lineitem_mx WHERE l_orderkey = 1 AND l_partkey = 0; -Custom 
Scan (CitusScan) - Executor: Router +Custom Scan (Citus Router) Task Count: 1 Tasks Shown: All -> Task @@ -347,8 +337,7 @@ Custom Scan (CitusScan) -- Test single-shard SELECT EXPLAIN (COSTS FALSE) SELECT l_quantity FROM lineitem_mx WHERE l_orderkey = 5; -Custom Scan (CitusScan) - Executor: Router +Custom Scan (Citus Router) Task Count: 1 Tasks Shown: All -> Task @@ -367,8 +356,7 @@ t EXPLAIN (COSTS FALSE) CREATE TABLE explain_result AS SELECT * FROM lineitem_mx; -Custom Scan (CitusScan) - Executor: Real-Time +Custom Scan (Citus Real-Time) Task Count: 16 Tasks Shown: One of 16 -> Task @@ -379,8 +367,7 @@ SET citus.explain_all_tasks TO on; EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem_mx WHERE l_orderkey > 9030; Aggregate - -> Custom Scan (CitusScan) - Executor: Real-Time + -> Custom Scan (Citus Real-Time) Task Count: 16 Tasks Shown: All -> Task @@ -475,8 +462,7 @@ SET citus.explain_all_tasks TO off; EXPLAIN (COSTS FALSE) SELECT avg(l_linenumber) FROM lineitem_mx WHERE l_orderkey > 9030; Aggregate - -> Custom Scan (CitusScan) - Executor: Task-Tracker + -> Custom Scan (Citus Task-Tracker) Task Count: 16 Tasks Shown: One of 16 -> Task @@ -493,8 +479,7 @@ EXPLAIN (COSTS FALSE) AND o_custkey = c_custkey AND l_suppkey = s_suppkey; Aggregate - -> Custom Scan (CitusScan) - Executor: Task-Tracker + -> Custom Scan (Citus Task-Tracker) Task Count: 4 Tasks Shown: None, not supported for re-partition queries -> MapMergeJob @@ -524,9 +509,8 @@ EXPLAIN (COSTS FALSE, FORMAT JSON) { "Node Type": "Custom Scan", "Parent Relationship": "Outer", - "Custom Plan Provider": "CitusScan", + "Custom Plan Provider": "Citus Task-Tracker", "Distributed Query": { - "Executor": "Task-Tracker", "Job": { "Task Count": 4, "Tasks Shown": "None, not supported for re-partition queries", @@ -579,9 +563,8 @@ EXPLAIN (COSTS FALSE, FORMAT XML) Custom Scan Outer - CitusScan + Citus Task-Tracker - Task-Tracker 4 None, not supported for re-partition queries @@ -631,9 +614,8 @@ EXPLAIN (COSTS 
FALSE, FORMAT YAML) Plans: - Node Type: "Custom Scan" Parent Relationship: "Outer" - Custom Plan Provider: "CitusScan" + Custom Plan Provider: "Citus Task-Tracker" Distributed Query: - Executor: "Task-Tracker" Job: Task Count: 4 Tasks Shown: "None, not supported for re-partition queries" diff --git a/src/test/regress/expected/multi_mx_reference_table.out b/src/test/regress/expected/multi_mx_reference_table.out index 34c39f1b0..0aecc94f6 100644 --- a/src/test/regress/expected/multi_mx_reference_table.out +++ b/src/test/regress/expected/multi_mx_reference_table.out @@ -525,6 +525,12 @@ FETCH test_cursor; -- fetch one row after the last ---------+---------+---------+--------- (0 rows) +FETCH BACKWARD test_cursor; + value_1 | value_2 | value_3 | value_4 +---------+---------+---------+-------------------------- + 2 | 2 | 2 | Fri Dec 02 00:00:00 2016 +(1 row) + END; -- table creation queries inside can be router plannable CREATE TEMP TABLE temp_reference_test as diff --git a/src/test/regress/expected/multi_mx_repartition_udt_prepare.out b/src/test/regress/expected/multi_mx_repartition_udt_prepare.out index 401f74d12..3ccf84eaf 100644 --- a/src/test/regress/expected/multi_mx_repartition_udt_prepare.out +++ b/src/test/regress/expected/multi_mx_repartition_udt_prepare.out @@ -173,10 +173,9 @@ EXPLAIN SELECT * FROM repartition_udt JOIN repartition_udt_other ON repartition_udt.udtcol = repartition_udt_other.udtcol WHERE repartition_udt.pk > 1; LOG: join order: [ "repartition_udt" ][ dual partition join "repartition_udt_other" ] - QUERY PLAN -------------------------------------------------------------- - Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) - Executor: Task-Tracker + QUERY PLAN +-------------------------------------------------------------------- + Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0) Task Count: 4 Tasks Shown: None, not supported for re-partition queries -> MapMergeJob @@ -185,7 +184,7 @@ LOG: join order: [ "repartition_udt" 
][ dual partition join "repartition_udt_ot -> MapMergeJob Map Task Count: 5 Merge Task Count: 4 -(10 rows) +(9 rows) SELECT * FROM repartition_udt JOIN repartition_udt_other ON repartition_udt.udtcol = repartition_udt_other.udtcol diff --git a/src/test/regress/expected/multi_mx_router_planner.out b/src/test/regress/expected/multi_mx_router_planner.out index 8179be5eb..9c6b09cf2 100644 --- a/src/test/regress/expected/multi_mx_router_planner.out +++ b/src/test/regress/expected/multi_mx_router_planner.out @@ -1312,6 +1312,12 @@ FETCH test_cursor; 11 | 1 | alamo | 1347 (1 row) +FETCH BACKWARD test_cursor; + id | author_id | title | word_count +----+-----------+----------+------------ + 1 | 1 | arsenous | 9572 +(1 row) + END; -- queries inside copy can be router plannable COPY ( @@ -1454,7 +1460,10 @@ CONTEXT: SQL statement "SELECT ah.id, ah.word_count WHERE author_id = 1" PL/pgSQL function author_articles_id_word_count() line 4 at RETURN QUERY DEBUG: Plan is router executable -CONTEXT: PL/pgSQL function author_articles_id_word_count() line 4 at RETURN QUERY +CONTEXT: SQL statement "SELECT ah.id, ah.word_count + FROM articles_hash_mx ah + WHERE author_id = 1" +PL/pgSQL function author_articles_id_word_count() line 4 at RETURN QUERY id | word_count ----+------------ 1 | 9572 diff --git a/src/test/regress/expected/multi_mx_schema_support.out b/src/test/regress/expected/multi_mx_schema_support.out index ed171c8c7..5035dc96f 100644 --- a/src/test/regress/expected/multi_mx_schema_support.out +++ b/src/test/regress/expected/multi_mx_schema_support.out @@ -37,6 +37,17 @@ FETCH test_cursor; 1 | ARGENTINA | 1 | al foxes promise slyly according to the regular accounts. 
bold requests alon (1 row) +FETCH test_cursor; + n_nationkey | n_name | n_regionkey | n_comment +-------------+--------+-------------+----------- +(0 rows) + +FETCH BACKWARD test_cursor; + n_nationkey | n_name | n_regionkey | n_comment +-------------+---------------------------+-------------+------------------------------------------------------------------------------ + 1 | ARGENTINA | 1 | al foxes promise slyly according to the regular accounts. bold requests alon +(1 row) + END; -- test with search_path is set SET search_path TO citus_mx_test_schema; @@ -51,6 +62,17 @@ FETCH test_cursor; 1 | ARGENTINA | 1 | al foxes promise slyly according to the regular accounts. bold requests alon (1 row) +FETCH test_cursor; + n_nationkey | n_name | n_regionkey | n_comment +-------------+--------+-------------+----------- +(0 rows) + +FETCH BACKWARD test_cursor; + n_nationkey | n_name | n_regionkey | n_comment +-------------+---------------------------+-------------+------------------------------------------------------------------------------ + 1 | ARGENTINA | 1 | al foxes promise slyly according to the regular accounts. bold requests alon +(1 row) + END; -- test inserting to table in different schema SET search_path TO public; diff --git a/src/test/regress/expected/multi_partition_pruning.out b/src/test/regress/expected/multi_partition_pruning.out index 28ea9dc13..65034ae48 100644 --- a/src/test/regress/expected/multi_partition_pruning.out +++ b/src/test/regress/expected/multi_partition_pruning.out @@ -170,30 +170,30 @@ INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, -- one shard. 
EXPLAIN SELECT count(*) FROM varchar_partitioned_table WHERE varchar_column = 'BA2'; DEBUG: predicate pruning for shardId 100 - QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN +----------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (3 rows) EXPLAIN SELECT count(*) FROM array_partitioned_table WHERE array_column > '{BA1000U2AMO4ZGX, BZZXSP27F21T6}'; DEBUG: predicate pruning for shardId 102 - QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN +----------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (3 rows) EXPLAIN SELECT count(*) FROM composite_partitioned_table WHERE composite_column < '(b,5,c)'::composite_type; DEBUG: predicate pruning for shardId 105 - QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN +----------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (3 rows) diff --git a/src/test/regress/expected/multi_reference_table.out b/src/test/regress/expected/multi_reference_table.out index 6544b9f40..2e363623d 100644 --- a/src/test/regress/expected/multi_reference_table.out +++ b/src/test/regress/expected/multi_reference_table.out @@ -571,6 +571,12 @@ FETCH test_cursor; -- fetch one row after the last 
---------+---------+---------+--------- (0 rows) +FETCH BACKWARD test_cursor; + value_1 | value_2 | value_3 | value_4 +---------+---------+---------+-------------------------- + 2 | 2 | 2 | Fri Dec 02 00:00:00 2016 +(1 row) + END; -- table creation queries inside can be router plannable CREATE TEMP TABLE temp_reference_test as diff --git a/src/test/regress/expected/multi_repartition_udt.out b/src/test/regress/expected/multi_repartition_udt.out index 585d09c27..6deeefddb 100644 --- a/src/test/regress/expected/multi_repartition_udt.out +++ b/src/test/regress/expected/multi_repartition_udt.out @@ -181,10 +181,9 @@ EXPLAIN SELECT * FROM repartition_udt JOIN repartition_udt_other ON repartition_udt.udtcol = repartition_udt_other.udtcol WHERE repartition_udt.pk > 1; LOG: join order: [ "repartition_udt" ][ dual partition join "repartition_udt_other" ] - QUERY PLAN -------------------------------------------------------------- - Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) - Executor: Task-Tracker + QUERY PLAN +-------------------------------------------------------------------- + Custom Scan (Citus Task-Tracker) (cost=0.00..0.00 rows=0 width=0) Task Count: 4 Tasks Shown: None, not supported for re-partition queries -> MapMergeJob @@ -193,7 +192,7 @@ LOG: join order: [ "repartition_udt" ][ dual partition join "repartition_udt_ot -> MapMergeJob Map Task Count: 5 Merge Task Count: 4 -(10 rows) +(9 rows) SELECT * FROM repartition_udt JOIN repartition_udt_other ON repartition_udt.udtcol = repartition_udt_other.udtcol diff --git a/src/test/regress/expected/multi_router_planner.out b/src/test/regress/expected/multi_router_planner.out index 04e82a6b4..63fe2e7ba 100644 --- a/src/test/regress/expected/multi_router_planner.out +++ b/src/test/regress/expected/multi_router_planner.out @@ -2057,6 +2057,12 @@ FETCH test_cursor; -- fetch one row after the last ----+-----------+-------+------------ (0 rows) +FETCH BACKWARD test_cursor; + id | author_id | title | word_count 
+----+-----------+----------+------------ + 41 | 1 | aznavour | 11814 +(1 row) + END; -- queries inside copy can be router plannable COPY ( @@ -2199,7 +2205,10 @@ CONTEXT: SQL statement "SELECT ah.id, ah.word_count WHERE author_id = 1" PL/pgSQL function author_articles_id_word_count() line 4 at RETURN QUERY DEBUG: Plan is router executable -CONTEXT: PL/pgSQL function author_articles_id_word_count() line 4 at RETURN QUERY +CONTEXT: SQL statement "SELECT ah.id, ah.word_count + FROM articles_hash ah + WHERE author_id = 1" +PL/pgSQL function author_articles_id_word_count() line 4 at RETURN QUERY id | word_count ----+------------ 1 | 9572 diff --git a/src/test/regress/expected/multi_schema_support.out b/src/test/regress/expected/multi_schema_support.out index 97c582ceb..b476242b3 100644 --- a/src/test/regress/expected/multi_schema_support.out +++ b/src/test/regress/expected/multi_schema_support.out @@ -155,6 +155,18 @@ FETCH test_cursor; 1 | ARGENTINA | 1 | al foxes promise slyly according to the regular accounts. bold requests alon (1 row) +FETCH test_cursor; + n_nationkey | n_name | n_regionkey | n_comment +-------------+---------------------------+-------------+------------------------------------------------------------------------------ + 1 | ARGENTINA | 1 | al foxes promise slyly according to the regular accounts. bold requests alon +(1 row) + +FETCH BACKWARD test_cursor; + n_nationkey | n_name | n_regionkey | n_comment +-------------+---------------------------+-------------+------------------------------------------------------------------------------ + 1 | ARGENTINA | 1 | al foxes promise slyly according to the regular accounts. bold requests alon +(1 row) + END; -- test with search_path is set SET search_path TO test_schema_support; @@ -169,6 +181,18 @@ FETCH test_cursor; 1 | ARGENTINA | 1 | al foxes promise slyly according to the regular accounts. 
bold requests alon (1 row) +FETCH test_cursor; + n_nationkey | n_name | n_regionkey | n_comment +-------------+---------------------------+-------------+------------------------------------------------------------------------------ + 1 | ARGENTINA | 1 | al foxes promise slyly according to the regular accounts. bold requests alon +(1 row) + +FETCH BACKWARD test_cursor; + n_nationkey | n_name | n_regionkey | n_comment +-------------+---------------------------+-------------+------------------------------------------------------------------------------ + 1 | ARGENTINA | 1 | al foxes promise slyly according to the regular accounts. bold requests alon +(1 row) + END; -- test inserting to table in different schema SET search_path TO public; diff --git a/src/test/regress/expected/multi_task_assignment_policy.out b/src/test/regress/expected/multi_task_assignment_policy.out index fcca06718..d5f4fdc95 100644 --- a/src/test/regress/expected/multi_task_assignment_policy.out +++ b/src/test/regress/expected/multi_task_assignment_policy.out @@ -60,10 +60,10 @@ DEBUG: assigned task 6 to node localhost:57637 DEBUG: assigned task 2 to node localhost:57638 DEBUG: assigned task 4 to node localhost:57637 DEBUG: CommitTransactionCommand - QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN +----------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (3 rows) @@ -74,10 +74,10 @@ DEBUG: assigned task 6 to node localhost:57637 DEBUG: assigned task 2 to node localhost:57638 DEBUG: assigned task 4 to node localhost:57637 DEBUG: CommitTransactionCommand - QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN 
+----------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (3 rows) @@ -93,10 +93,10 @@ DEBUG: assigned task 6 to node localhost:57637 DEBUG: assigned task 4 to node localhost:57637 DEBUG: assigned task 2 to node localhost:57638 DEBUG: CommitTransactionCommand - QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN +----------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (3 rows) @@ -107,10 +107,10 @@ DEBUG: assigned task 6 to node localhost:57637 DEBUG: assigned task 4 to node localhost:57637 DEBUG: assigned task 2 to node localhost:57638 DEBUG: CommitTransactionCommand - QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN +----------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (3 rows) @@ -140,10 +140,10 @@ DEBUG: assigned task 6 to node localhost:57637 DEBUG: assigned task 4 to node localhost:57637 DEBUG: assigned task 2 to node localhost:57638 DEBUG: CommitTransactionCommand - QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN +----------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> 
Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (3 rows) @@ -154,10 +154,10 @@ DEBUG: assigned task 6 to node localhost:57638 DEBUG: assigned task 4 to node localhost:57638 DEBUG: assigned task 2 to node localhost:57637 DEBUG: CommitTransactionCommand - QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN +----------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (3 rows) @@ -168,10 +168,10 @@ DEBUG: assigned task 6 to node localhost:57637 DEBUG: assigned task 4 to node localhost:57637 DEBUG: assigned task 2 to node localhost:57638 DEBUG: CommitTransactionCommand - QUERY PLAN --------------------------------------------------------------------- + QUERY PLAN +----------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) explain statements for distributed queries are not enabled (3 rows) diff --git a/src/test/regress/expected/multi_utility_statements.out b/src/test/regress/expected/multi_utility_statements.out index e4891e6c1..b7f4b3723 100644 --- a/src/test/regress/expected/multi_utility_statements.out +++ b/src/test/regress/expected/multi_utility_statements.out @@ -261,6 +261,12 @@ FETCH ABSOLUTE 5 FROM noHoldCursor; 1 | 5 | 24.00 | 0.10 (1 row) +FETCH BACKWARD noHoldCursor; + l_orderkey | l_linenumber | l_quantity | l_discount +------------+--------------+------------+------------ + 1 | 4 | 28.00 | 0.09 +(1 row) + COMMIT; FETCH ABSOLUTE 5 FROM noHoldCursor; ERROR: cursor "noholdcursor" does not exist diff --git 
a/src/test/regress/multi_task_tracker_extra_schedule b/src/test/regress/multi_task_tracker_extra_schedule index 1e76bb3f2..99f53d6d1 100644 --- a/src/test/regress/multi_task_tracker_extra_schedule +++ b/src/test/regress/multi_task_tracker_extra_schedule @@ -33,11 +33,9 @@ test: multi_agg_distinct multi_limit_clause multi_limit_clause_approximate test: multi_average_expression multi_working_columns test: multi_array_agg test: multi_agg_type_conversion multi_count_type_conversion -test: multi_partition_pruning -test: multi_join_pruning multi_hash_pruning +test: multi_hash_pruning test: multi_null_minmax_value_pruning test: multi_query_directory_cleanup -test: multi_task_assignment_policy test: multi_utility_statements test: multi_dropped_column_aliases @@ -52,7 +50,7 @@ test: multi_tpch_query7 multi_tpch_query7_nested # Parallel tests to check our join order planning logic. Note that we load data # below; and therefore these tests should come after the execution tests. # ---------- -test: multi_join_order_tpch_small multi_join_order_additional +test: multi_join_order_additional test: multi_load_more_data test: multi_join_order_tpch_large diff --git a/src/test/regress/output/multi_subquery.source b/src/test/regress/output/multi_subquery.source index 11d00f8dc..4c5931f17 100644 --- a/src/test/regress/output/multi_subquery.source +++ b/src/test/regress/output/multi_subquery.source @@ -767,8 +767,7 @@ FROM QUERY PLAN ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) - Executor: Real-Time + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) Task Count: 2 Tasks Shown: One of 2 -> Task @@ -786,7 +785,7 @@ FROM Sort Key: ((events.composite_id).tenant_id), ((events.composite_id).user_id) -> Seq Scan on events_270009 
events (cost=0.00..11.79 rows=3 width=556) Filter: ((event_type)::text = ANY ('{click,submit,pay}'::text[])) -(20 rows) +(19 rows) -- Union and left join subquery pushdown EXPLAIN SELECT @@ -855,8 +854,7 @@ GROUP BY ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- HashAggregate (cost=0.00..0.00 rows=0 width=0) Group Key: hasdone - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) - Executor: Real-Time + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) Task Count: 2 Tasks Shown: One of 2 -> Task @@ -894,7 +892,7 @@ GROUP BY Sort Key: ((events_2.composite_id).tenant_id), ((events_2.composite_id).user_id) -> Seq Scan on events_270009 events_2 (cost=0.00..12.28 rows=1 width=80) Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type) AND ((event_type)::text = 'pay'::text)) -(41 rows) +(40 rows) -- Union, left join and having subquery pushdown EXPLAIN SELECT @@ -1023,8 +1021,7 @@ LIMIT Limit (cost=0.00..0.00 rows=0 width=0) -> Sort (cost=0.00..0.00 rows=0 width=0) Sort Key: user_lastseen DESC - -> Custom Scan (CitusScan) (cost=0.00..0.00 rows=0 width=0) - Executor: Real-Time + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) Task Count: 2 Tasks Shown: One of 2 -> Task @@ -1047,6 +1044,6 @@ LIMIT Sort Key: events.event_time DESC -> Seq Scan on events_270009 events (cost=0.00..11.95 rows=1 width=524) Filter: (((composite_id).tenant_id = ((users.composite_id).tenant_id)) AND ((composite_id).user_id = ((users.composite_id).user_id))) -(27 rows) +(26 rows) SET citus.enable_router_execution TO 'true'; diff --git a/src/test/regress/output/multi_subquery_0.source b/src/test/regress/output/multi_subquery_0.source index ecc2ef7eb..096c204e4 
100644 --- a/src/test/regress/output/multi_subquery_0.source +++ b/src/test/regress/output/multi_subquery_0.source @@ -764,31 +764,28 @@ FROM GROUP BY tenant_id, user_id) AS subquery; - QUERY PLAN ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- - Distributed Query into pg_merge_job_270014 - Executor: Real-Time - Task Count: 2 - Tasks Shown: One of 2 - -> Task - Node: host=localhost port=57637 dbname=regression - -> Aggregate (cost=40.01..40.02 rows=1 width=32) - -> GroupAggregate (cost=39.89..39.99 rows=1 width=556) - Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id) - -> Merge Join (cost=39.89..39.97 rows=1 width=556) - Merge Cond: ((((users.composite_id).tenant_id) = ((events.composite_id).tenant_id)) AND (((users.composite_id).user_id) = ((events.composite_id).user_id))) - -> Sort (cost=28.08..28.09 rows=6 width=32) - Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id) - -> Seq Scan on users_270013 users (cost=0.00..28.00 rows=6 width=32) - Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type)) - -> Sort (cost=11.81..11.82 rows=3 width=556) - Sort Key: ((events.composite_id).tenant_id), ((events.composite_id).user_id) - -> Seq Scan on events_270009 events (cost=0.00..11.79 rows=3 width=556) - Filter: ((event_type)::text = ANY ('{click,submit,pay}'::text[])) - Master Query - -> Aggregate (cost=0.00..0.00 rows=0 width=0) - -> Seq Scan on pg_merge_job_270014 (cost=0.00..0.00 rows=0 width=0) -(22 rows) + QUERY PLAN +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Aggregate (cost=0.00..0.00 rows=0 width=0) + -> Custom Scan (Citus Real-Time) 
(cost=0.00..0.00 rows=0 width=0) + Task Count: 2 + Tasks Shown: One of 2 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Aggregate (cost=40.01..40.02 rows=1 width=32) + -> GroupAggregate (cost=39.89..39.99 rows=1 width=556) + Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id) + -> Merge Join (cost=39.89..39.97 rows=1 width=556) + Merge Cond: ((((users.composite_id).tenant_id) = ((events.composite_id).tenant_id)) AND (((users.composite_id).user_id) = ((events.composite_id).user_id))) + -> Sort (cost=28.08..28.09 rows=6 width=32) + Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id) + -> Seq Scan on users_270013 users (cost=0.00..28.00 rows=6 width=32) + Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type)) + -> Sort (cost=11.81..11.82 rows=3 width=556) + Sort Key: ((events.composite_id).tenant_id), ((events.composite_id).user_id) + -> Seq Scan on events_270009 events (cost=0.00..11.79 rows=3 width=556) + Filter: ((event_type)::text = ANY ('{click,submit,pay}'::text[])) +(19 rows) -- Union and left join subquery pushdown EXPLAIN SELECT @@ -853,49 +850,46 @@ FROM hasdone) AS subquery_top GROUP BY hasdone; - QUERY PLAN -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- - Distributed Query into pg_merge_job_270015 - Executor: Real-Time - Task Count: 2 - Tasks Shown: One of 2 - -> Task - Node: host=localhost port=57637 dbname=regression - -> HashAggregate (cost=91.94..91.96 rows=2 width=64) - Group Key: COALESCE(('Has done paying'::text), 'Has not done paying'::text) - -> GroupAggregate (cost=91.85..91.90 rows=2 width=88) - Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id), ('Has done paying'::text) - 
-> Sort (cost=91.85..91.85 rows=2 width=88) - Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id), ('Has done paying'::text) - -> Merge Left Join (cost=91.75..91.84 rows=2 width=88) - Merge Cond: ((((users.composite_id).tenant_id) = ((events_2.composite_id).tenant_id)) AND (((users.composite_id).user_id) = ((events_2.composite_id).user_id))) - -> Unique (cost=79.46..79.48 rows=2 width=40) - -> Sort (cost=79.46..79.47 rows=2 width=40) - Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id), ('action=>1'::text), events.event_time - -> Append (cost=0.00..79.45 rows=2 width=40) - -> Nested Loop (cost=0.00..39.72 rows=1 width=40) - Join Filter: (((users.composite_id).tenant_id = (events.composite_id).tenant_id) AND ((users.composite_id).user_id = (events.composite_id).user_id)) - -> Seq Scan on events_270009 events (cost=0.00..11.62 rows=1 width=40) - Filter: ((event_type)::text = 'click'::text) - -> Seq Scan on users_270013 users (cost=0.00..28.00 rows=6 width=32) - Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type)) - -> Nested Loop (cost=0.00..39.72 rows=1 width=40) - Join Filter: (((users_1.composite_id).tenant_id = (events_1.composite_id).tenant_id) AND ((users_1.composite_id).user_id = (events_1.composite_id).user_id)) - -> Seq Scan on events_270009 events_1 (cost=0.00..11.62 rows=1 width=40) - Filter: ((event_type)::text = 'submit'::text) - -> Seq Scan on users_270013 users_1 (cost=0.00..28.00 rows=6 width=32) - Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type)) - -> Materialize (cost=12.29..12.31 rows=1 width=48) - -> Unique (cost=12.29..12.30 rows=1 width=32) - -> Sort (cost=12.29..12.29 rows=1 width=32) - Sort Key: ((events_2.composite_id).tenant_id), ((events_2.composite_id).user_id) - -> Seq Scan on events_270009 events_2 
(cost=0.00..12.28 rows=1 width=32) - Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type) AND ((event_type)::text = 'pay'::text)) - Master Query - -> HashAggregate (cost=0.00..0.00 rows=0 width=0) - Group Key: intermediate_column_270015_2 - -> Seq Scan on pg_merge_job_270015 (cost=0.00..0.00 rows=0 width=0) -(40 rows) + QUERY PLAN +------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + HashAggregate (cost=0.00..0.00 rows=0 width=0) + Group Key: hasdone + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) + Task Count: 2 + Tasks Shown: One of 2 + -> Task + Node: host=localhost port=57637 dbname=regression + -> HashAggregate (cost=91.94..91.96 rows=2 width=64) + Group Key: COALESCE(('Has done paying'::text), 'Has not done paying'::text) + -> GroupAggregate (cost=91.85..91.90 rows=2 width=88) + Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id), ('Has done paying'::text) + -> Sort (cost=91.85..91.85 rows=2 width=88) + Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id), ('Has done paying'::text) + -> Merge Left Join (cost=91.75..91.84 rows=2 width=88) + Merge Cond: ((((users.composite_id).tenant_id) = ((events_2.composite_id).tenant_id)) AND (((users.composite_id).user_id) = ((events_2.composite_id).user_id))) + -> Unique (cost=79.46..79.48 rows=2 width=40) + -> Sort (cost=79.46..79.47 rows=2 width=40) + Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id), ('action=>1'::text), events.event_time + -> Append (cost=0.00..79.45 rows=2 width=40) + -> Nested Loop (cost=0.00..39.72 rows=1 width=40) + Join Filter: (((users.composite_id).tenant_id = (events.composite_id).tenant_id) AND 
((users.composite_id).user_id = (events.composite_id).user_id)) + -> Seq Scan on events_270009 events (cost=0.00..11.62 rows=1 width=40) + Filter: ((event_type)::text = 'click'::text) + -> Seq Scan on users_270013 users (cost=0.00..28.00 rows=6 width=32) + Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type)) + -> Nested Loop (cost=0.00..39.72 rows=1 width=40) + Join Filter: (((users_1.composite_id).tenant_id = (events_1.composite_id).tenant_id) AND ((users_1.composite_id).user_id = (events_1.composite_id).user_id)) + -> Seq Scan on events_270009 events_1 (cost=0.00..11.62 rows=1 width=40) + Filter: ((event_type)::text = 'submit'::text) + -> Seq Scan on users_270013 users_1 (cost=0.00..28.00 rows=6 width=32) + Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type)) + -> Materialize (cost=12.29..12.31 rows=1 width=48) + -> Unique (cost=12.29..12.30 rows=1 width=32) + -> Sort (cost=12.29..12.29 rows=1 width=32) + Sort Key: ((events_2.composite_id).tenant_id), ((events_2.composite_id).user_id) + -> Seq Scan on events_270009 events_2 (cost=0.00..12.28 rows=1 width=32) + Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type) AND ((event_type)::text = 'pay'::text)) +(37 rows) -- Union, left join and having subquery pushdown EXPLAIN SELECT @@ -1019,37 +1013,34 @@ ORDER BY user_lastseen DESC LIMIT 10; - QUERY PLAN ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- - Distributed Query into pg_merge_job_270017 - Executor: Real-Time - Task Count: 2 - Tasks Shown: One of 2 - -> Task - Node: host=localhost port=57637 dbname=regression - -> Limit 
(cost=100.43..100.44 rows=6 width=56) - -> Sort (cost=100.43..100.44 rows=6 width=56) - Sort Key: (max(users.lastseen)) DESC - -> GroupAggregate (cost=100.14..100.29 rows=6 width=548) - Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id) - -> Sort (cost=100.14..100.16 rows=6 width=548) - Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id) - -> Nested Loop Left Join (cost=40.04..100.06 rows=6 width=548) - -> Limit (cost=28.08..28.09 rows=6 width=40) - -> Sort (cost=28.08..28.09 rows=6 width=40) - Sort Key: users.lastseen DESC - -> Seq Scan on users_270013 users (cost=0.00..28.00 rows=6 width=40) - Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type)) - -> Limit (cost=11.96..11.96 rows=1 width=524) - -> Sort (cost=11.96..11.96 rows=1 width=524) - Sort Key: events.event_time DESC - -> Seq Scan on events_270009 events (cost=0.00..11.95 rows=1 width=524) - Filter: (((composite_id).tenant_id = ((users.composite_id).tenant_id)) AND ((composite_id).user_id = ((users.composite_id).user_id))) - Master Query - -> Limit (cost=0.00..0.00 rows=0 width=0) - -> Sort (cost=0.00..0.00 rows=0 width=0) - Sort Key: intermediate_column_270017_2 DESC - -> Seq Scan on pg_merge_job_270017 (cost=0.00..0.00 rows=0 width=0) -(29 rows) + QUERY PLAN +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + Limit (cost=0.00..0.00 rows=0 width=0) + -> Sort (cost=0.00..0.00 rows=0 width=0) + Sort Key: user_lastseen DESC + -> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) + Task Count: 2 + Tasks Shown: One of 2 + -> Task + Node: host=localhost port=57637 dbname=regression + -> Limit (cost=100.43..100.44 rows=6 width=56) + -> Sort (cost=100.43..100.44 rows=6 width=56) + Sort Key: 
(max(users.lastseen)) DESC + -> GroupAggregate (cost=100.14..100.29 rows=6 width=548) + Group Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id) + -> Sort (cost=100.14..100.16 rows=6 width=548) + Sort Key: ((users.composite_id).tenant_id), ((users.composite_id).user_id) + -> Nested Loop Left Join (cost=40.04..100.06 rows=6 width=548) + -> Limit (cost=28.08..28.09 rows=6 width=40) + -> Sort (cost=28.08..28.09 rows=6 width=40) + Sort Key: users.lastseen DESC + -> Seq Scan on users_270013 users (cost=0.00..28.00 rows=6 width=40) + Filter: ((composite_id >= '(1,-9223372036854775808)'::user_composite_type) AND (composite_id <= '(1,9223372036854775807)'::user_composite_type)) + -> Limit (cost=11.96..11.96 rows=1 width=524) + -> Sort (cost=11.96..11.96 rows=1 width=524) + Sort Key: events.event_time DESC + -> Seq Scan on events_270009 events (cost=0.00..11.95 rows=1 width=524) + Filter: (((composite_id).tenant_id = ((users.composite_id).tenant_id)) AND ((composite_id).user_id = ((users.composite_id).user_id))) +(26 rows) SET citus.enable_router_execution TO 'true'; diff --git a/src/test/regress/sql/multi_join_order_additional.sql b/src/test/regress/sql/multi_join_order_additional.sql index 3abf122b1..18d307017 100644 --- a/src/test/regress/sql/multi_join_order_additional.sql +++ b/src/test/regress/sql/multi_join_order_additional.sql @@ -11,7 +11,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 650000; SET citus.explain_distributed_queries TO off; SET citus.log_multi_join_order TO TRUE; -SET citus.task_executor_type = 'task-tracker'; -- can't explain all queries otherwhise +SET citus.task_executor_type = 'task-tracker'; -- can't explain all queries otherwise SET client_min_messages TO DEBUG2; -- Create new table definitions for use in testing in distributed planning and diff --git a/src/test/regress/sql/multi_join_order_tpch_large.sql b/src/test/regress/sql/multi_join_order_tpch_large.sql index 12fc81c8b..20cf83dd0 100644 --- 
a/src/test/regress/sql/multi_join_order_tpch_large.sql +++ b/src/test/regress/sql/multi_join_order_tpch_large.sql @@ -11,7 +11,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 660000; SET citus.explain_distributed_queries TO off; SET citus.log_multi_join_order TO TRUE; -SET citus.task_executor_type = 'task-tracker'; -- can't explain all queries otherwhise +SET citus.task_executor_type = 'task-tracker'; -- can't explain all queries otherwise SET client_min_messages TO LOG; -- Change configuration to treat lineitem, orders, customer, and part tables as diff --git a/src/test/regress/sql/multi_mx_reference_table.sql b/src/test/regress/sql/multi_mx_reference_table.sql index c4843c077..31f89b60b 100644 --- a/src/test/regress/sql/multi_mx_reference_table.sql +++ b/src/test/regress/sql/multi_mx_reference_table.sql @@ -304,6 +304,7 @@ DECLARE test_cursor CURSOR FOR FETCH test_cursor; FETCH ALL test_cursor; FETCH test_cursor; -- fetch one row after the last +FETCH BACKWARD test_cursor; END; -- table creation queries inside can be router plannable diff --git a/src/test/regress/sql/multi_mx_router_planner.sql b/src/test/regress/sql/multi_mx_router_planner.sql index cbcc14d52..bdca55574 100644 --- a/src/test/regress/sql/multi_mx_router_planner.sql +++ b/src/test/regress/sql/multi_mx_router_planner.sql @@ -557,6 +557,7 @@ DECLARE test_cursor CURSOR FOR ORDER BY id; FETCH test_cursor; FETCH test_cursor; +FETCH BACKWARD test_cursor; END; -- queries inside copy can be router plannable diff --git a/src/test/regress/sql/multi_mx_schema_support.sql b/src/test/regress/sql/multi_mx_schema_support.sql index de631df22..b72a5ee04 100644 --- a/src/test/regress/sql/multi_mx_schema_support.sql +++ b/src/test/regress/sql/multi_mx_schema_support.sql @@ -21,6 +21,8 @@ DECLARE test_cursor CURSOR FOR FROM nation_hash WHERE n_nationkey = 1; FETCH test_cursor; +FETCH test_cursor; +FETCH BACKWARD test_cursor; END; -- test with search_path is set @@ -31,6 +33,8 @@ DECLARE test_cursor CURSOR 
FOR FROM nation_hash WHERE n_nationkey = 1; FETCH test_cursor; +FETCH test_cursor; +FETCH BACKWARD test_cursor; END; diff --git a/src/test/regress/sql/multi_reference_table.sql b/src/test/regress/sql/multi_reference_table.sql index 96af1c710..d2a4476b9 100644 --- a/src/test/regress/sql/multi_reference_table.sql +++ b/src/test/regress/sql/multi_reference_table.sql @@ -335,6 +335,7 @@ DECLARE test_cursor CURSOR FOR FETCH test_cursor; FETCH ALL test_cursor; FETCH test_cursor; -- fetch one row after the last +FETCH BACKWARD test_cursor; END; -- table creation queries inside can be router plannable diff --git a/src/test/regress/sql/multi_router_planner.sql b/src/test/regress/sql/multi_router_planner.sql index fdea0f4a6..4d72d8e1f 100644 --- a/src/test/regress/sql/multi_router_planner.sql +++ b/src/test/regress/sql/multi_router_planner.sql @@ -917,6 +917,7 @@ DECLARE test_cursor CURSOR FOR FETCH test_cursor; FETCH ALL test_cursor; FETCH test_cursor; -- fetch one row after the last +FETCH BACKWARD test_cursor; END; -- queries inside copy can be router plannable diff --git a/src/test/regress/sql/multi_schema_support.sql b/src/test/regress/sql/multi_schema_support.sql index 1b84e71ff..70feb03de 100644 --- a/src/test/regress/sql/multi_schema_support.sql +++ b/src/test/regress/sql/multi_schema_support.sql @@ -113,6 +113,8 @@ DECLARE test_cursor CURSOR FOR FROM test_schema_support.nation_append WHERE n_nationkey = 1; FETCH test_cursor; +FETCH test_cursor; +FETCH BACKWARD test_cursor; END; -- test with search_path is set @@ -123,6 +125,8 @@ DECLARE test_cursor CURSOR FOR FROM nation_append WHERE n_nationkey = 1; FETCH test_cursor; +FETCH test_cursor; +FETCH BACKWARD test_cursor; END; diff --git a/src/test/regress/sql/multi_utility_statements.sql b/src/test/regress/sql/multi_utility_statements.sql index a29842208..b1e8232c3 100644 --- a/src/test/regress/sql/multi_utility_statements.sql +++ b/src/test/regress/sql/multi_utility_statements.sql @@ -146,5 +146,6 @@ DECLARE 
noHoldCursor SCROLL CURSOR FOR ORDER BY l_orderkey, l_linenumber; FETCH ABSOLUTE 5 FROM noHoldCursor; +FETCH BACKWARD noHoldCursor; COMMIT; FETCH ABSOLUTE 5 FROM noHoldCursor;