diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c
index 929af593c..181e9f524 100644
--- a/src/backend/distributed/executor/multi_executor.c
+++ b/src/backend/distributed/executor/multi_executor.c
@@ -29,6 +29,9 @@
#include "utils/snapmgr.h"
+static void CopyQueryResults(List *masterCopyStmtList);
+
+
/*
* multi_ExecutorStart is a hook called at at the beginning of any execution
* of any query plan.
@@ -77,7 +80,6 @@ multi_ExecutorStart(QueryDesc *queryDesc, int eflags)
PlannedStmt *masterSelectPlan = MasterNodeSelectPlan(multiPlan);
CreateStmt *masterCreateStmt = MasterNodeCreateStatement(multiPlan);
List *masterCopyStmtList = MasterNodeCopyStatementList(multiPlan);
- ListCell *masterCopyStmtCell = NULL;
RangeTblEntry *masterRangeTableEntry = NULL;
StringInfo jobDirectoryName = NULL;
@@ -92,7 +94,11 @@ multi_ExecutorStart(QueryDesc *queryDesc, int eflags)
ResourceOwnerRememberJobDirectory(CurrentResourceOwner, workerJob->jobId);
/* pick distributed executor to use */
- if (executorType == MULTI_EXECUTOR_REAL_TIME)
+ if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
+ {
+ /* skip distributed query execution for EXPLAIN commands */
+ }
+ else if (executorType == MULTI_EXECUTOR_REAL_TIME)
{
MultiRealTimeExecute(workerJob);
}
@@ -112,24 +118,11 @@ multi_ExecutorStart(QueryDesc *queryDesc, int eflags)
/* make the temporary table visible */
CommandCounterIncrement();
- /* now copy data from all the remote nodes into temp table */
- foreach(masterCopyStmtCell, masterCopyStmtList)
+ if (!(eflags & EXEC_FLAG_EXPLAIN_ONLY))
{
- Node *masterCopyStmt = (Node *) lfirst(masterCopyStmtCell);
-
- Assert(IsA(masterCopyStmt, CopyStmt));
-
- ProcessUtility(masterCopyStmt,
- "(copy job)",
- PROCESS_UTILITY_QUERY,
- NULL,
- None_Receiver,
- NULL);
+ CopyQueryResults(masterCopyStmtList);
}
- /* make the copied contents visible */
- CommandCounterIncrement();
-
/*
* Update the QueryDesc's snapshot so it sees the table. That's not
* particularly pretty, but we don't have much of a choice. One might
@@ -176,6 +169,35 @@ multi_ExecutorStart(QueryDesc *queryDesc, int eflags)
}
+/*
+ * CopyQueryResults executes the commands that copy query results into a
+ * temporary table.
+ */
+static void
+CopyQueryResults(List *masterCopyStmtList)
+{
+ ListCell *masterCopyStmtCell = NULL;
+
+ /* now copy data from all the remote nodes into temp table */
+ foreach(masterCopyStmtCell, masterCopyStmtList)
+ {
+ Node *masterCopyStmt = (Node *) lfirst(masterCopyStmtCell);
+
+ Assert(IsA(masterCopyStmt, CopyStmt));
+
+ ProcessUtility(masterCopyStmt,
+ "(copy job)",
+ PROCESS_UTILITY_QUERY,
+ NULL,
+ None_Receiver,
+ NULL);
+ }
+
+ /* make the copied contents visible */
+ CommandCounterIncrement();
+}
+
+
/* Execute query plan. */
void
multi_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c
index 295fb36bf..46b7b9b7d 100644
--- a/src/backend/distributed/executor/multi_router_executor.c
+++ b/src/backend/distributed/executor/multi_router_executor.c
@@ -586,7 +586,6 @@ RouterExecutorEnd(QueryDesc *queryDesc)
}
Assert(estate != NULL);
- Assert(estate->es_finished);
FreeExecutorState(estate);
queryDesc->estate = NULL;
diff --git a/src/backend/distributed/executor/multi_server_executor.c b/src/backend/distributed/executor/multi_server_executor.c
index 5072aac37..443ca813e 100644
--- a/src/backend/distributed/executor/multi_server_executor.c
+++ b/src/backend/distributed/executor/multi_server_executor.c
@@ -30,8 +30,6 @@ int RemoteTaskCheckInterval = 100; /* per cycle sleep interval in millisecs */
int TaskExecutorType = MULTI_EXECUTOR_REAL_TIME; /* distributed executor type */
bool BinaryMasterCopyFormat = false; /* copy data from workers in binary format */
-static bool RouterExecutablePlan(MultiPlan *multiPlan, MultiExecutorType executorType);
-
/*
* JobExecutorType selects the executor type for the given multiPlan using the task
@@ -116,7 +114,7 @@ JobExecutorType(MultiPlan *multiPlan)
* router executor. Modify queries are always router executable, select queries
* are router executable only if executorType is real time.
*/
-static bool
+bool
RouterExecutablePlan(MultiPlan *multiPlan, MultiExecutorType executorType)
{
Job *job = multiPlan->workerJob;
diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c
index 2cba3977a..bb44393df 100644
--- a/src/backend/distributed/planner/multi_explain.c
+++ b/src/backend/distributed/planner/multi_explain.c
@@ -8,21 +8,91 @@
*/
#include "postgres.h"
+#include "miscadmin.h"
-#include "commands/prepare.h"
+#include "access/xact.h"
+#include "catalog/namespace.h"
+#include "catalog/pg_class.h"
+#include "commands/copy.h"
+#include "commands/createas.h"
+#include "commands/dbcommands.h"
+#include "commands/explain.h"
+#include "commands/tablecmds.h"
#include "distributed/citus_nodefuncs.h"
+#include "distributed/multi_client_executor.h"
+#include "distributed/multi_executor.h"
#include "distributed/multi_explain.h"
-#include "distributed/multi_planner.h"
#include "distributed/multi_logical_optimizer.h"
+#include "distributed/multi_logical_planner.h"
+#include "distributed/multi_master_planner.h"
#include "distributed/multi_physical_planner.h"
+#include "distributed/multi_planner.h"
+#include "distributed/multi_server_executor.h"
+#include "distributed/worker_protocol.h"
+#include "lib/stringinfo.h"
+#include "nodes/plannodes.h"
+#include "nodes/primnodes.h"
#include "nodes/print.h"
+#include "optimizer/clauses.h"
#include "optimizer/planner.h"
+#include "portability/instr_time.h"
+#include "tcop/dest.h"
#include "tcop/tcopprot.h"
+#include "tcop/utility.h"
+#include "utils/json.h"
+#include "utils/snapmgr.h"
+
+
+#if (PG_VERSION_NUM >= 90400 && PG_VERSION_NUM < 90500)
+
+/* Crude hack to avoid changing sizeof(ExplainState) in released branches (explain.c) */
+#define grouping_stack extra->groupingstack
+#endif
+
+/* OR-able flags for ExplainXMLTag() (explain.c) */
+#define X_OPENING 0
+#define X_CLOSING 1
+#define X_CLOSE_IMMEDIATE 2
+#define X_NOWHITESPACE 4
/* Config variables that enable printing distributed query plans */
bool ExplainMultiLogicalPlan = false;
bool ExplainMultiPhysicalPlan = false;
+bool ExplainDistributedQueries = true;
+bool ExplainAllTasks = false;
+
+
+/* Result for a single remote EXPLAIN command */
+typedef struct RemoteExplainPlan
+{
+ int placementIndex;
+ List *explainOutputList;
+} RemoteExplainPlan;
+
+
+/* Explain functions for distributed queries */
+static void ExplainMasterPlan(PlannedStmt *masterPlan, IntoClause *into,
+ ExplainState *es, const char *queryString,
+ ParamListInfo params, const instr_time *planDuration);
+static void ExplainJob(Job *job, ExplainState *es);
+static void ExplainMapMergeJob(MapMergeJob *mapMergeJob, ExplainState *es);
+static void ExplainTaskList(List *taskList, ExplainState *es);
+static RemoteExplainPlan * RemoteExplain(Task *task, ExplainState *es);
+static void ExplainTask(Task *task, int placementIndex, List *explainOutputList,
+ ExplainState *es);
+static void ExplainTaskPlacement(ShardPlacement *taskPlacement, List *explainOutputList,
+ ExplainState *es);
+static StringInfo BuildRemoteExplainQuery(char *queryString, ExplainState *es);
+
+/* Static Explain functions copied from explain.c */
+static void ExplainOpenGroup(const char *objtype, const char *labelname,
+ bool labeled, ExplainState *es);
+static void ExplainCloseGroup(const char *objtype, const char *labelname,
+ bool labeled, ExplainState *es);
+static void ExplainXMLTag(const char *tagname, int flags, ExplainState *es);
+static void ExplainJSONLineEnding(ExplainState *es);
+static void ExplainYAMLLineStarting(ExplainState *es);
/*
@@ -35,54 +105,60 @@ void
MultiExplainOneQuery(Query *query, IntoClause *into, ExplainState *es,
const char *queryString, ParamListInfo params)
{
- MultiTreeRoot *multiTree = NULL;
MultiPlan *multiPlan = NULL;
- Query *queryCopy = NULL;
- CmdType commandType = query->commandType;
+ CmdType commandType = CMD_UNKNOWN;
+ PlannedStmt *initialPlan = NULL;
+ Job *workerJob = NULL;
+ bool routerExecutablePlan = false;
+ instr_time planStart;
+ instr_time planDuration;
/* if local query, run the standard explain and return */
bool localQuery = !NeedsDistributedPlanning(query);
if (localQuery)
{
PlannedStmt *plan = NULL;
- instr_time planstart;
- instr_time planduration;
- INSTR_TIME_SET_CURRENT(planstart);
+ INSTR_TIME_SET_CURRENT(planStart);
/* plan the query */
plan = pg_plan_query(query, 0, params);
- INSTR_TIME_SET_CURRENT(planduration);
- INSTR_TIME_SUBTRACT(planduration, planstart);
+ INSTR_TIME_SET_CURRENT(planDuration);
+ INSTR_TIME_SUBTRACT(planDuration, planStart);
/* run it (if needed) and produce output */
- ExplainOnePlan(plan, into, es, queryString, params, &planduration);
+ ExplainOnePlan(plan, into, es, queryString, params, &planDuration);
return;
}
- /* error out early if the query is a modification */
+ /* measure the full planning time to display in EXPLAIN ANALYZE */
+ INSTR_TIME_SET_CURRENT(planStart);
+
+ /* call standard planner to modify the query structure before multi planning */
+ initialPlan = standard_planner(query, 0, params);
+
+ commandType = initialPlan->commandType;
if (commandType == CMD_INSERT || commandType == CMD_UPDATE ||
commandType == CMD_DELETE)
{
- ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("cannot show execution plan for distributed modification"),
- errdetail("EXPLAIN commands are unsupported for distributed "
- "modifications.")));
+ if (es->analyze)
+ {
+ ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("Using ANALYZE for INSERT/UPDATE/DELETE on "
+ "distributed tables is not supported.")));
+ }
}
- /* call standard planner to modify the query structure before multi planning */
- standard_planner(query, 0, params);
- queryCopy = copyObject(query);
+ multiPlan = CreatePhysicalPlan(query);
- /* create the logical and physical plan */
- multiTree = MultiLogicalPlanCreate(queryCopy);
- MultiLogicalPlanOptimize(multiTree);
- multiPlan = MultiPhysicalPlanCreate(multiTree);
+ INSTR_TIME_SET_CURRENT(planDuration);
+ INSTR_TIME_SUBTRACT(planDuration, planStart);
if (ExplainMultiLogicalPlan)
{
+ MultiTreeRoot *multiTree = MultiLogicalPlanCreate(query);
char *logicalPlanString = CitusNodeToString(multiTree);
char *formattedPlanString = pretty_format_node_dump(logicalPlanString);
@@ -99,10 +175,739 @@ MultiExplainOneQuery(Query *query, IntoClause *into, ExplainState *es,
appendStringInfo(es->str, "%s\n", formattedPlanString);
}
- /* if explain printing isn't enabled, print error only after planning */
- if (!ExplainMultiLogicalPlan && !ExplainMultiPhysicalPlan)
+ if (!ExplainDistributedQueries)
{
appendStringInfo(es->str, "explain statements for distributed queries ");
- appendStringInfo(es->str, "are currently unsupported\n");
+ appendStringInfo(es->str, "are not enabled\n");
+ return;
+ }
+
+ if (es->format == EXPLAIN_FORMAT_TEXT)
+ {
+ appendStringInfoSpaces(es->str, es->indent * 2);
+ appendStringInfo(es->str, "Distributed Query");
+
+ if (multiPlan->masterTableName != NULL)
+ {
+ appendStringInfo(es->str, " into %s", multiPlan->masterTableName);
+ }
+
+ appendStringInfo(es->str, "\n");
+
+ es->indent += 1;
+ }
+
+ routerExecutablePlan = RouterExecutablePlan(multiPlan, TaskExecutorType);
+
+ if (routerExecutablePlan)
+ {
+ ExplainPropertyText("Executor", "Router", es);
+ }
+ else
+ {
+ switch (TaskExecutorType)
+ {
+ case MULTI_EXECUTOR_REAL_TIME:
+ {
+ ExplainPropertyText("Executor", "Real-Time", es);
+ }
+ break;
+
+ case MULTI_EXECUTOR_TASK_TRACKER:
+ {
+ ExplainPropertyText("Executor", "Task-Tracker", es);
+ }
+ break;
+
+ default:
+ {
+ ExplainPropertyText("Executor", "Other", es);
+ }
+ break;
+ }
+ }
+
+ workerJob = multiPlan->workerJob;
+ ExplainJob(workerJob, es);
+
+ if (es->format == EXPLAIN_FORMAT_TEXT)
+ {
+ es->indent -= 1;
+ }
+
+ if (!routerExecutablePlan)
+ {
+ PlannedStmt *masterPlan = MultiQueryContainerNode(initialPlan, multiPlan);
+
+ if (es->format == EXPLAIN_FORMAT_TEXT)
+ {
+ appendStringInfoSpaces(es->str, es->indent * 2);
+ appendStringInfo(es->str, "Master Query\n");
+ es->indent += 1;
+ }
+
+ ExplainMasterPlan(masterPlan, into, es, queryString, params, &planDuration);
+
+ if (es->format == EXPLAIN_FORMAT_TEXT)
+ {
+ es->indent -= 1;
+ }
+ }
+}
+
+
+/*
+ * ExplainMasterPlan generates EXPLAIN output for the master query that merges results.
+ * When using EXPLAIN ANALYZE, this function shows the execution time of the master query
+ * in isolation. Calling ExplainOnePlan directly would show the overall execution time of
+ * the distributed query, which makes it hard to determine how much time the master query
+ * took.
+ *
+ * Parts of this function are copied directly from ExplainOnePlan.
+ */
+static void
+ExplainMasterPlan(PlannedStmt *masterPlan, IntoClause *into,
+ ExplainState *es, const char *queryString,
+ ParamListInfo params, const instr_time *planDuration)
+{
+ DestReceiver *dest = NULL;
+ int eflags = 0;
+ QueryDesc *queryDesc = NULL;
+ int instrument_option = 0;
+
+ if (es->analyze && es->timing)
+ {
+ instrument_option |= INSTRUMENT_TIMER;
+ }
+ else if (es->analyze)
+ {
+ instrument_option |= INSTRUMENT_ROWS;
+ }
+
+ if (es->buffers)
+ {
+ instrument_option |= INSTRUMENT_BUFFERS;
+ }
+
+ /*
+ * Use a snapshot with an updated command ID to ensure this query sees
+ * results of any previously executed queries.
+ */
+ PushCopiedSnapshot(GetActiveSnapshot());
+ UpdateActiveSnapshotCommandId();
+
+ /*
+ * Normally we discard the query's output, but if explaining CREATE TABLE
+ * AS, we'd better use the appropriate tuple receiver.
+ */
+ if (into)
+ {
+ dest = CreateIntoRelDestReceiver(into);
+ }
+ else
+ {
+ dest = None_Receiver;
+ }
+
+ /* Create a QueryDesc for the query */
+ queryDesc = CreateQueryDesc(masterPlan, queryString,
+ GetActiveSnapshot(), InvalidSnapshot,
+ dest, params, instrument_option);
+
+ /* Select execution options */
+ if (es->analyze)
+ {
+ eflags = 0; /* default run-to-completion flags */
+ }
+ else
+ {
+ eflags = EXEC_FLAG_EXPLAIN_ONLY;
+ }
+ if (into)
+ {
+ eflags |= GetIntoRelEFlags(into);
+ }
+
+ /*
+ * ExecutorStart creates the merge table. If using ANALYZE, it also executes the
+ * worker job and populates the merge table.
+ */
+ ExecutorStart(queryDesc, eflags);
+
+ if (es->analyze)
+ {
+ ScanDirection dir;
+
+ /* if using analyze, then finish query execution */
+
+ /* EXPLAIN ANALYZE CREATE TABLE AS WITH NO DATA is weird */
+ if (into && into->skipData)
+ {
+ dir = NoMovementScanDirection;
+ }
+ else
+ {
+ dir = ForwardScanDirection;
+ }
+
+ /* run the plan */
+ ExecutorRun(queryDesc, dir, 0L);
+
+ /* run cleanup too */
+ ExecutorFinish(queryDesc);
+ }
+
+ /*
+ * ExplainOnePlan executes the master query again, which ensures that the execution
+ * time only shows the execution time of the master query itself, instead of the
+ * overall execution time.
+ */
+ ExplainOnePlan(queryDesc->plannedstmt, into, es, queryString, params, planDuration);
+
+ /*
+ * ExecutorEnd for the distributed query is deferred until after the master query
+ * is executed again, otherwise the merge table would be dropped.
+ */
+ ExecutorEnd(queryDesc);
+
+ FreeQueryDesc(queryDesc);
+
+ PopActiveSnapshot();
+}
+
+
+/*
+ * ExplainJob shows the EXPLAIN output for a Job in the physical plan of
+ * a distributed query by showing the remote EXPLAIN for the first task,
+ * or all tasks if citus.explain_all_tasks is on.
+ */
+static void
+ExplainJob(Job *job, ExplainState *es)
+{
+ List *dependedJobList = job->dependedJobList;
+ int dependedJobCount = list_length(dependedJobList);
+ ListCell *dependedJobCell = NULL;
+ List *taskList = job->taskList;
+ int taskCount = list_length(taskList);
+
+ ExplainOpenGroup("Job", NULL, true, es);
+
+ ExplainPropertyInteger("Task Count", taskCount, es);
+
+ if (dependedJobCount > 0)
+ {
+ ExplainPropertyText("Tasks Shown", "None, not supported for re-partition "
+ "queries", es);
+ }
+ else if (ExplainAllTasks || taskCount <= 1)
+ {
+ ExplainPropertyText("Tasks Shown", "All", es);
+ }
+ else
+ {
+ StringInfo tasksShownText = makeStringInfo();
+ appendStringInfo(tasksShownText, "One of %d", taskCount);
+
+ ExplainPropertyText("Tasks Shown", tasksShownText->data, es);
+ }
+
+ /*
+ * We cannot fetch EXPLAIN plans for jobs that have dependencies, since the
+ * intermediate tables have not been created.
+ */
+ if (dependedJobCount == 0)
+ {
+ ExplainOpenGroup("Tasks", "Tasks", false, es);
+
+ ExplainTaskList(taskList, es);
+
+ ExplainCloseGroup("Tasks", "Tasks", false, es);
+ }
+
+ ExplainCloseGroup("Job", NULL, true, es);
+
+ /* show explain output for depended jobs, if any */
+ foreach(dependedJobCell, dependedJobList)
+ {
+ Job *dependedJob = (Job *) lfirst(dependedJobCell);
+
+ if (CitusIsA(dependedJob, MapMergeJob))
+ {
+ ExplainMapMergeJob((MapMergeJob *) dependedJob, es);
+ }
+ }
+}
+
+
+/*
+ * ExplainMapMergeJob shows a very basic EXPLAIN plan for a MapMergeJob. It does
+ * not yet show the EXPLAIN plan for the individual tasks, because this requires
+ * specific logic for getting the query (which is wrapped in a UDF), and the
+ * queries may use intermediate tables that have not been created.
+ */
+static void
+ExplainMapMergeJob(MapMergeJob *mapMergeJob, ExplainState *es)
+{
+ List *dependedJobList = mapMergeJob->job.dependedJobList;
+ ListCell *dependedJobCell = NULL;
+ int mapTaskCount = list_length(mapMergeJob->mapTaskList);
+ int mergeTaskCount = list_length(mapMergeJob->mergeTaskList);
+
+ if (es->format == EXPLAIN_FORMAT_TEXT)
+ {
+ appendStringInfoSpaces(es->str, es->indent * 2);
+ appendStringInfo(es->str, "-> MapMergeJob\n");
+ es->indent += 3;
+ }
+
+ ExplainOpenGroup("MapMergeJob", NULL, true, es);
+
+ ExplainPropertyInteger("Map Task Count", mapTaskCount, es);
+ ExplainPropertyInteger("Merge Task Count", mergeTaskCount, es);
+
+ ExplainCloseGroup("Job", NULL, true, es);
+
+ foreach(dependedJobCell, dependedJobList)
+ {
+ Job *dependedJob = (Job *) lfirst(dependedJobCell);
+
+ if (CitusIsA(dependedJob, MapMergeJob))
+ {
+ ExplainMapMergeJob((MapMergeJob *) dependedJob, es);
+ }
+ }
+
+ if (es->format == EXPLAIN_FORMAT_TEXT)
+ {
+ es->indent -= 3;
+ }
+}
+
+
+/*
+ * ExplainTaskList shows the remote EXPLAIN for the first task in taskList,
+ * or all tasks if citus.explain_all_tasks is on.
+ */
+static void
+ExplainTaskList(List *taskList, ExplainState *es)
+{
+ ListCell *taskCell = NULL;
+ ListCell *remoteExplainCell = NULL;
+ List *remoteExplainList = NIL;
+
+ foreach(taskCell, taskList)
+ {
+ Task *task = (Task *) lfirst(taskCell);
+ RemoteExplainPlan *remoteExplain = NULL;
+
+ remoteExplain = RemoteExplain(task, es);
+ remoteExplainList = lappend(remoteExplainList, remoteExplain);
+
+ if (!ExplainAllTasks)
+ {
+ break;
+ }
+ }
+
+ forboth(taskCell, taskList, remoteExplainCell, remoteExplainList)
+ {
+ Task *task = (Task *) lfirst(taskCell);
+ RemoteExplainPlan *remoteExplain =
+ (RemoteExplainPlan *) lfirst(remoteExplainCell);
+
+ ExplainTask(task, remoteExplain->placementIndex,
+ remoteExplain->explainOutputList, es);
+ }
+}
+
+
+/*
+ * RemoteExplain fetches the the remote EXPLAIN output for a single
+ * task. It tries each shard placement until one succeeds or all
+ * failed.
+ */
+static RemoteExplainPlan *
+RemoteExplain(Task *task, ExplainState *es)
+{
+ StringInfo explainQuery = NULL;
+ List *taskPlacementList = task->taskPlacementList;
+ int placementCount = list_length(taskPlacementList);
+ int placementIndex = 0;
+ RemoteExplainPlan *remotePlan = NULL;
+
+ remotePlan = (RemoteExplainPlan *) palloc0(sizeof(RemoteExplainPlan));
+ explainQuery = BuildRemoteExplainQuery(task->queryString, es);
+
+ for (placementIndex = 0; placementIndex < placementCount; placementIndex++)
+ {
+ ShardPlacement *taskPlacement = list_nth(taskPlacementList, placementIndex);
+ char *nodeName = taskPlacement->nodeName;
+ uint32 nodePort = taskPlacement->nodePort;
+
+ remotePlan->placementIndex = placementIndex;
+ remotePlan->explainOutputList = ExecuteRemoteQuery(nodeName, nodePort,
+ NULL, explainQuery);
+ if (remotePlan->explainOutputList != NIL)
+ {
+ break;
+ }
+ }
+
+ return remotePlan;
+}
+
+
+/*
+ * ExplainTask shows the EXPLAIN output for an single task. The output has been
+ * fetched from the placement at index placementIndex. If explainOutputList is NIL,
+ * then the EXPLAIN output could not be fetched from any placement.
+ */
+static void
+ExplainTask(Task *task, int placementIndex, List *explainOutputList, ExplainState *es)
+{
+ ExplainOpenGroup("Task", NULL, true, es);
+
+ if (es->format == EXPLAIN_FORMAT_TEXT)
+ {
+ appendStringInfoSpaces(es->str, es->indent * 2);
+ appendStringInfo(es->str, "-> Task\n");
+ es->indent += 3;
+ }
+
+ if (explainOutputList != NIL)
+ {
+ List *taskPlacementList = task->taskPlacementList;
+ ShardPlacement *taskPlacement = list_nth(taskPlacementList, placementIndex);
+
+ ExplainTaskPlacement(taskPlacement, explainOutputList, es);
+ }
+ else
+ {
+ ExplainPropertyText("Error", "Could not get remote plan.", es);
+ }
+
+ ExplainCloseGroup("Task", NULL, true, es);
+
+ if (es->format == EXPLAIN_FORMAT_TEXT)
+ {
+ es->indent -= 3;
+ }
+}
+
+
+/*
+ * ExplainTaskPlacement shows the EXPLAIN output for an individual task placement.
+ * It corrects the indentation of the remote explain output to match the local
+ * output.
+ */
+static void
+ExplainTaskPlacement(ShardPlacement *taskPlacement, List *explainOutputList,
+ ExplainState *es)
+{
+ int savedIndentation = es->indent;
+ StringInfo nodeAddress = makeStringInfo();
+ char *nodeName = taskPlacement->nodeName;
+ uint32 nodePort = taskPlacement->nodePort;
+ char *nodeDatabase = get_database_name(MyDatabaseId);
+ ListCell *explainOutputCell = NULL;
+ int rowIndex = 0;
+
+ appendStringInfo(nodeAddress, "host=%s port=%d dbname=%s", nodeName, nodePort,
+ nodeDatabase);
+ ExplainPropertyText("Node", nodeAddress->data, es);
+
+ ExplainOpenGroup("Remote Plan", "Remote Plan", false, es);
+
+ if (es->format == EXPLAIN_FORMAT_JSON || es->format == EXPLAIN_FORMAT_YAML)
+ {
+ /* prevent appending the remote EXPLAIN on the same line */
+ appendStringInfoChar(es->str, '\n');
+ }
+
+ foreach(explainOutputCell, explainOutputList)
+ {
+ StringInfo rowString = (StringInfo) lfirst(explainOutputCell);
+ int rowLength = 0;
+ char *lineStart = NULL;
+
+ rowLength = strlen(rowString->data);
+ lineStart = rowString->data;
+
+ /* parse the lines in the remote EXPLAIN for proper indentation */
+ while (lineStart < rowString->data + rowLength)
+ {
+ /* find the end-of-line */
+ char *lineEnd = strchr(lineStart, '\n');
+
+ if (lineEnd == NULL)
+ {
+ /* no end-of-line, use end of row string instead */
+ lineEnd = rowString->data + rowLength;
+ }
+
+ /* convert line to a separate string */
+ *lineEnd = '\0';
+
+ /* indentation that is applied to all lines */
+ appendStringInfoSpaces(es->str, es->indent * 2);
+
+ if (es->format == EXPLAIN_FORMAT_TEXT && rowIndex == 0)
+ {
+ /* indent the first line of the remote plan with an arrow */
+ appendStringInfoString(es->str, "-> ");
+ es->indent += 2;
+ }
+
+ /* show line in the output */
+ appendStringInfo(es->str, "%s\n", lineStart);
+
+ /* continue at the start of the next line */
+ lineStart = lineEnd + 1;
+ }
+
+ rowIndex++;
+ }
+
+ ExplainCloseGroup("Remote Plan", "Remote Plan", false, es);
+
+ if (es->format == EXPLAIN_FORMAT_TEXT)
+ {
+ es->indent = savedIndentation;
+ }
+}
+
+
+/*
+ * BuildRemoteExplainQuery returns an EXPLAIN query string
+ * to run on a worker node which explicitly contains all
+ * the options in the explain state.
+ */
+static StringInfo
+BuildRemoteExplainQuery(char *queryString, ExplainState *es)
+{
+ StringInfo explainQuery = makeStringInfo();
+ char *formatStr = NULL;
+
+ switch (es->format)
+ {
+ case EXPLAIN_FORMAT_XML:
+ {
+ formatStr = "XML";
+ }
+ break;
+
+ case EXPLAIN_FORMAT_JSON:
+ {
+ formatStr = "JSON";
+ }
+ break;
+
+ case EXPLAIN_FORMAT_YAML:
+ {
+ formatStr = "YAML";
+ }
+ break;
+
+ default:
+ {
+ formatStr = "TEXT";
+ }
+ break;
+ }
+
+ appendStringInfo(explainQuery,
+ "EXPLAIN (ANALYZE %s, VERBOSE %s, "
+ "COSTS %s, BUFFERS %s, TIMING %s, "
+ "FORMAT %s) %s",
+ es->analyze ? "TRUE" : "FALSE",
+ es->verbose ? "TRUE" : "FALSE",
+ es->costs ? "TRUE" : "FALSE",
+ es->buffers ? "TRUE" : "FALSE",
+ es->timing ? "TRUE" : "FALSE",
+ formatStr,
+ queryString);
+
+ return explainQuery;
+}
+
+
+/* below are private functions copied from explain.c */
+
+
+/* *INDENT-OFF* */
+/*
+ * Open a group of related objects.
+ *
+ * objtype is the type of the group object, labelname is its label within
+ * a containing object (if any).
+ *
+ * If labeled is true, the group members will be labeled properties,
+ * while if it's false, they'll be unlabeled objects.
+ */
+static void
+ExplainOpenGroup(const char *objtype, const char *labelname,
+ bool labeled, ExplainState *es)
+{
+ switch (es->format)
+ {
+ case EXPLAIN_FORMAT_TEXT:
+ /* nothing to do */
+ break;
+
+ case EXPLAIN_FORMAT_XML:
+ ExplainXMLTag(objtype, X_OPENING, es);
+ es->indent++;
+ break;
+
+ case EXPLAIN_FORMAT_JSON:
+ ExplainJSONLineEnding(es);
+ appendStringInfoSpaces(es->str, 2 * es->indent);
+ if (labelname)
+ {
+ escape_json(es->str, labelname);
+ appendStringInfoString(es->str, ": ");
+ }
+ appendStringInfoChar(es->str, labeled ? '{' : '[');
+
+ /*
+ * In JSON format, the grouping_stack is an integer list. 0 means
+ * we've emitted nothing at this grouping level, 1 means we've
+ * emitted something (and so the next item needs a comma). See
+ * ExplainJSONLineEnding().
+ */
+ es->grouping_stack = lcons_int(0, es->grouping_stack);
+ es->indent++;
+ break;
+
+ case EXPLAIN_FORMAT_YAML:
+
+ /*
+ * In YAML format, the grouping stack is an integer list. 0 means
+ * we've emitted nothing at this grouping level AND this grouping
+ * level is unlabelled and must be marked with "- ". See
+ * ExplainYAMLLineStarting().
+ */
+ ExplainYAMLLineStarting(es);
+ if (labelname)
+ {
+ appendStringInfo(es->str, "%s: ", labelname);
+ es->grouping_stack = lcons_int(1, es->grouping_stack);
+ }
+ else
+ {
+ appendStringInfoString(es->str, "- ");
+ es->grouping_stack = lcons_int(0, es->grouping_stack);
+ }
+ es->indent++;
+ break;
+ }
+}
+
+
+/*
+ * Close a group of related objects.
+ * Parameters must match the corresponding ExplainOpenGroup call.
+ */
+static void
+ExplainCloseGroup(const char *objtype, const char *labelname,
+ bool labeled, ExplainState *es)
+{
+ switch (es->format)
+ {
+ case EXPLAIN_FORMAT_TEXT:
+ /* nothing to do */
+ break;
+
+ case EXPLAIN_FORMAT_XML:
+ es->indent--;
+ ExplainXMLTag(objtype, X_CLOSING, es);
+ break;
+
+ case EXPLAIN_FORMAT_JSON:
+ es->indent--;
+ appendStringInfoChar(es->str, '\n');
+ appendStringInfoSpaces(es->str, 2 * es->indent);
+ appendStringInfoChar(es->str, labeled ? '}' : ']');
+ es->grouping_stack = list_delete_first(es->grouping_stack);
+ break;
+
+ case EXPLAIN_FORMAT_YAML:
+ es->indent--;
+ es->grouping_stack = list_delete_first(es->grouping_stack);
+ break;
+ }
+}
+
+
+/*
+ * Emit opening or closing XML tag.
+ *
+ * "flags" must contain X_OPENING, X_CLOSING, or X_CLOSE_IMMEDIATE.
+ * Optionally, OR in X_NOWHITESPACE to suppress the whitespace we'd normally
+ * add.
+ *
+ * XML tag names can't contain white space, so we replace any spaces in
+ * "tagname" with dashes.
+ */
+static void
+ExplainXMLTag(const char *tagname, int flags, ExplainState *es)
+{
+ const char *s;
+
+ if ((flags & X_NOWHITESPACE) == 0)
+ appendStringInfoSpaces(es->str, 2 * es->indent);
+ appendStringInfoCharMacro(es->str, '<');
+ if ((flags & X_CLOSING) != 0)
+ appendStringInfoCharMacro(es->str, '/');
+ for (s = tagname; *s; s++)
+ appendStringInfoCharMacro(es->str, (*s == ' ') ? '-' : *s);
+ if ((flags & X_CLOSE_IMMEDIATE) != 0)
+ appendStringInfoString(es->str, " /");
+ appendStringInfoCharMacro(es->str, '>');
+ if ((flags & X_NOWHITESPACE) == 0)
+ appendStringInfoCharMacro(es->str, '\n');
+}
+
+
+/*
+ * Emit a JSON line ending.
+ *
+ * JSON requires a comma after each property but the last. To facilitate this,
+ * in JSON format, the text emitted for each property begins just prior to the
+ * preceding line-break (and comma, if applicable).
+ */
+static void
+ExplainJSONLineEnding(ExplainState *es)
+{
+ Assert(es->format == EXPLAIN_FORMAT_JSON);
+ if (linitial_int(es->grouping_stack) != 0)
+ appendStringInfoChar(es->str, ',');
+ else
+ linitial_int(es->grouping_stack) = 1;
+ appendStringInfoChar(es->str, '\n');
+}
+
+
+/*
+ * Indent a YAML line.
+ *
+ * YAML lines are ordinarily indented by two spaces per indentation level.
+ * The text emitted for each property begins just prior to the preceding
+ * line-break, except for the first property in an unlabelled group, for which
+ * it begins immediately after the "- " that introduces the group. The first
+ * property of the group appears on the same line as the opening "- ".
+ */
+static void
+ExplainYAMLLineStarting(ExplainState *es)
+{
+ Assert(es->format == EXPLAIN_FORMAT_YAML);
+ if (linitial_int(es->grouping_stack) == 0)
+ {
+ linitial_int(es->grouping_stack) = 1;
+ }
+ else
+ {
+ appendStringInfoChar(es->str, '\n');
+ appendStringInfoSpaces(es->str, es->indent * 2);
}
}
diff --git a/src/backend/distributed/planner/multi_planner.c b/src/backend/distributed/planner/multi_planner.c
index 7a073392d..2ca71bf7f 100644
--- a/src/backend/distributed/planner/multi_planner.c
+++ b/src/backend/distributed/planner/multi_planner.c
@@ -34,9 +34,7 @@ static void CheckNodeIsDumpable(Node *node);
/* local function forward declarations */
-static MultiPlan * CreatePhysicalPlan(Query *parse);
static char * GetMultiPlanString(PlannedStmt *result);
-static PlannedStmt * MultiQueryContainerNode(PlannedStmt *result, MultiPlan *multiPlan);
/* Distributed planner hook */
@@ -70,7 +68,7 @@ multi_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
* target shards. SELECT queries go through the full logical plan/optimize/
* physical plan process needed to produce distributed query plans.
*/
-static MultiPlan *
+MultiPlan *
CreatePhysicalPlan(Query *parse)
{
Query *parseCopy = copyObject(parse);
@@ -162,7 +160,7 @@ HasCitusToplevelNode(PlannedStmt *result)
* function, which has to be removed from the really executed plan tree before
* query execution.
*/
-static PlannedStmt *
+PlannedStmt *
MultiQueryContainerNode(PlannedStmt *result, MultiPlan *multiPlan)
{
FunctionScan *fauxFunctionScan = NULL;
diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c
index 3f021bdf1..5d7e94dc5 100644
--- a/src/backend/distributed/shared_library_init.c
+++ b/src/backend/distributed/shared_library_init.c
@@ -272,6 +272,31 @@ RegisterCitusConfigVariables(void)
GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
+ DefineCustomBoolVariable(
+ "citus.explain_distributed_queries",
+ gettext_noop("Enables Explain for distributed queries."),
+ gettext_noop("When enabled, the Explain command shows remote and local "
+ "plans when used with a distributed query. It is enabled "
+ "by default, but can be disabled for regression tests."),
+ &ExplainDistributedQueries,
+ true,
+ PGC_USERSET,
+ GUC_NO_SHOW_ALL,
+ NULL, NULL, NULL);
+
+ DefineCustomBoolVariable(
+ "citus.explain_all_tasks",
+ gettext_noop("Enables showing output for all tasks in Explain."),
+ gettext_noop("The Explain command for distributed queries shows "
+ "the remote plan for a single task by default. When "
+ "this configuration entry is enabled, the plan for "
+ "all tasks is shown, but the Explain takes longer."),
+ &ExplainAllTasks,
+ false,
+ PGC_USERSET,
+ 0,
+ NULL, NULL, NULL);
+
DefineCustomBoolVariable(
"citus.all_modifications_commutative",
gettext_noop("Bypasses commutativity checks when enabled"),
diff --git a/src/include/distributed/multi_explain.h b/src/include/distributed/multi_explain.h
index c6349201a..984b8e2ab 100644
--- a/src/include/distributed/multi_explain.h
+++ b/src/include/distributed/multi_explain.h
@@ -15,6 +15,8 @@
/* Config variables managed via guc.c to explain distributed query plans */
extern bool ExplainMultiLogicalPlan;
extern bool ExplainMultiPhysicalPlan;
+extern bool ExplainDistributedQueries;
+extern bool ExplainAllTasks;
extern void MultiExplainOneQuery(Query *query, IntoClause *into, ExplainState *es,
const char *queryString, ParamListInfo params);
diff --git a/src/include/distributed/multi_planner.h b/src/include/distributed/multi_planner.h
index c06af63d1..129ca1204 100644
--- a/src/include/distributed/multi_planner.h
+++ b/src/include/distributed/multi_planner.h
@@ -18,6 +18,9 @@ extern PlannedStmt * multi_planner(Query *parse, int cursorOptions,
extern bool HasCitusToplevelNode(PlannedStmt *planStatement);
struct MultiPlan;
+extern struct MultiPlan * CreatePhysicalPlan(Query *parse);
extern struct MultiPlan * GetMultiPlan(PlannedStmt *planStatement);
+extern PlannedStmt * MultiQueryContainerNode(PlannedStmt *result,
+ struct MultiPlan *multiPlan);
#endif /* MULTI_PLANNER_H */
diff --git a/src/include/distributed/multi_server_executor.h b/src/include/distributed/multi_server_executor.h
index 69e23415a..96647de64 100644
--- a/src/include/distributed/multi_server_executor.h
+++ b/src/include/distributed/multi_server_executor.h
@@ -191,6 +191,7 @@ extern void MultiRealTimeExecute(Job *job);
extern void MultiTaskTrackerExecute(Job *job);
/* Function declarations common to more than one executor */
+extern bool RouterExecutablePlan(MultiPlan *multiPlan, MultiExecutorType executorType);
extern MultiExecutorType JobExecutorType(MultiPlan *multiPlan);
extern void RemoveJobDirectory(uint64 jobId);
extern TaskExecution * InitTaskExecution(Task *task, TaskExecStatus initialStatus);
diff --git a/src/test/regress/expected/multi_copy.out b/src/test/regress/expected/multi_copy.out
new file mode 100644
index 000000000..25582c440
--- /dev/null
+++ b/src/test/regress/expected/multi_copy.out
@@ -0,0 +1,306 @@
+--
+-- MULTI_COPY
+--
+-- Create a new hash-partitioned table into which to COPY
+CREATE TABLE customer_copy_hash (
+ c_custkey integer,
+ c_name varchar(25) not null,
+ c_address varchar(40),
+ c_nationkey integer,
+ c_phone char(15),
+ c_acctbal decimal(15,2),
+ c_mktsegment char(10),
+ c_comment varchar(117),
+ primary key (c_custkey));
+SELECT master_create_distributed_table('customer_copy_hash', 'c_custkey', 'hash');
+ master_create_distributed_table
+---------------------------------
+
+(1 row)
+
+-- Test COPY into empty hash-partitioned table
+COPY customer_copy_hash FROM '/home/marco/citus/citus-explain4/src/test/regress/data/customer.1.data' WITH (DELIMITER '|');
+ERROR: could not find any shards into which to copy
+DETAIL: No shards exist for distributed table "customer_copy_hash".
+HINT: Run master_create_worker_shards to create shards and try again.
+SELECT master_create_worker_shards('customer_copy_hash', 64, 1);
+ master_create_worker_shards
+-----------------------------
+
+(1 row)
+
+-- Test empty copy
+COPY customer_copy_hash FROM STDIN;
+-- Test syntax error
+COPY customer_copy_hash (c_custkey,c_name) FROM STDIN;
+ERROR: invalid input syntax for integer: "1,customer1"
+CONTEXT: COPY customer_copy_hash, line 1, column c_custkey: "1,customer1"
+-- Confirm that no data was copied
+SELECT count(*) FROM customer_copy_hash;
+ count
+-------
+ 0
+(1 row)
+
+-- Test primary key violation
+COPY customer_copy_hash (c_custkey, c_name) FROM STDIN
+WITH (FORMAT 'csv');
+ERROR: duplicate key value violates unique constraint "customer_copy_hash_pkey_103160"
+DETAIL: Key (c_custkey)=(2) already exists.
+-- Confirm that no data was copied
+SELECT count(*) FROM customer_copy_hash;
+ count
+-------
+ 0
+(1 row)
+
+-- Test headers option
+COPY customer_copy_hash (c_custkey, c_name) FROM STDIN
+WITH (FORMAT 'csv', HEADER true, FORCE_NULL (c_custkey));
+-- Confirm that only first row was skipped
+SELECT count(*) FROM customer_copy_hash;
+ count
+-------
+ 3
+(1 row)
+
+-- Test force_not_null option
+COPY customer_copy_hash (c_custkey, c_name, c_address) FROM STDIN
+WITH (FORMAT 'csv', QUOTE '"', FORCE_NOT_NULL (c_address));
+-- Confirm that value is not null
+SELECT count(c_address) FROM customer_copy_hash WHERE c_custkey = 4;
+ count
+-------
+ 1
+(1 row)
+
+-- Test force_null option
+COPY customer_copy_hash (c_custkey, c_name, c_address) FROM STDIN
+WITH (FORMAT 'csv', QUOTE '"', FORCE_NULL (c_address));
+-- Confirm that value is null
+SELECT count(c_address) FROM customer_copy_hash WHERE c_custkey = 5;
+ count
+-------
+ 0
+(1 row)
+
+-- Test null violation
+COPY customer_copy_hash (c_custkey, c_name) FROM STDIN
+WITH (FORMAT 'csv');
+ERROR: null value in column "c_name" violates not-null constraint
+DETAIL: Failing row contains (8, null, null, null, null, null, null, null).
+-- Confirm that no data was copied
+SELECT count(*) FROM customer_copy_hash;
+ count
+-------
+ 5
+(1 row)
+
+-- Test server-side copy from program
+COPY customer_copy_hash (c_custkey, c_name) FROM PROGRAM 'echo 9 customer9'
+WITH (DELIMITER ' ');
+-- Confirm that data was copied
+SELECT count(*) FROM customer_copy_hash WHERE c_custkey = 9;
+ count
+-------
+ 1
+(1 row)
+
+-- Test server-side copy from file
+COPY customer_copy_hash FROM '/home/marco/citus/citus-explain4/src/test/regress/data/customer.2.data' WITH (DELIMITER '|');
+-- Confirm that data was copied
+SELECT count(*) FROM customer_copy_hash;
+ count
+-------
+ 1006
+(1 row)
+
+-- Test client-side copy from file
+\COPY customer_copy_hash FROM '/home/marco/citus/citus-explain4/src/test/regress/data/customer.3.data' WITH (DELIMITER '|');
+-- Confirm that data was copied
+SELECT count(*) FROM customer_copy_hash;
+ count
+-------
+ 2006
+(1 row)
+
+-- Create a new hash-partitioned table with default now() function
+CREATE TABLE customer_with_default(
+ c_custkey integer,
+ c_name varchar(25) not null,
+ c_time timestamp default now());
+SELECT master_create_distributed_table('customer_with_default', 'c_custkey', 'hash');
+ master_create_distributed_table
+---------------------------------
+
+(1 row)
+
+SELECT master_create_worker_shards('customer_with_default', 64, 1);
+ master_create_worker_shards
+-----------------------------
+
+(1 row)
+
+-- Test with default values for now() function
+COPY customer_with_default (c_custkey, c_name) FROM STDIN
+WITH (FORMAT 'csv');
+-- Confirm that data was copied with now() function
+SELECT count(*) FROM customer_with_default where c_time IS NOT NULL;
+ count
+-------
+ 2
+(1 row)
+
+-- Add columns to the table and perform a COPY
+ALTER TABLE customer_copy_hash ADD COLUMN extra1 INT DEFAULT 0;
+ALTER TABLE customer_copy_hash ADD COLUMN extra2 INT DEFAULT 0;
+COPY customer_copy_hash (c_custkey, c_name, extra1, extra2) FROM STDIN CSV;
+SELECT * FROM customer_copy_hash WHERE extra1 = 1;
+ c_custkey | c_name | c_address | c_nationkey | c_phone | c_acctbal | c_mktsegment | c_comment | extra1 | extra2
+-----------+------------+-----------+-------------+---------+-----------+--------------+-----------+--------+--------
+ 10 | customer10 | | | | | | | 1 | 5
+(1 row)
+
+-- Test dropping an intermediate column
+ALTER TABLE customer_copy_hash DROP COLUMN extra1;
+COPY customer_copy_hash (c_custkey, c_name, extra2) FROM STDIN CSV;
+SELECT * FROM customer_copy_hash WHERE c_custkey = 11;
+ c_custkey | c_name | c_address | c_nationkey | c_phone | c_acctbal | c_mktsegment | c_comment | extra2
+-----------+------------+-----------+-------------+---------+-----------+--------------+-----------+--------
+ 11 | customer11 | | | | | | | 5
+(1 row)
+
+-- Test dropping the last column
+ALTER TABLE customer_copy_hash DROP COLUMN extra2;
+COPY customer_copy_hash (c_custkey, c_name) FROM STDIN CSV;
+SELECT * FROM customer_copy_hash WHERE c_custkey = 12;
+ c_custkey | c_name | c_address | c_nationkey | c_phone | c_acctbal | c_mktsegment | c_comment
+-----------+------------+-----------+-------------+---------+-----------+--------------+-----------
+ 12 | customer12 | | | | | |
+(1 row)
+
+-- Create a new range-partitioned table into which to COPY
+CREATE TABLE customer_copy_range (
+ c_custkey integer,
+ c_name varchar(25),
+ c_address varchar(40),
+ c_nationkey integer,
+ c_phone char(15),
+ c_acctbal decimal(15,2),
+ c_mktsegment char(10),
+ c_comment varchar(117),
+ primary key (c_custkey));
+SELECT master_create_distributed_table('customer_copy_range', 'c_custkey', 'range');
+ master_create_distributed_table
+---------------------------------
+
+(1 row)
+
+-- Test COPY into empty range-partitioned table
+COPY customer_copy_range FROM '/home/marco/citus/citus-explain4/src/test/regress/data/customer.1.data' WITH (DELIMITER '|');
+ERROR: could not find any shards into which to copy
+DETAIL: No shards exist for distributed table "customer_copy_range".
+SELECT master_create_empty_shard('customer_copy_range') AS new_shard_id
+\gset
+UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 500
+WHERE shardid = :new_shard_id;
+SELECT master_create_empty_shard('customer_copy_range') AS new_shard_id
+\gset
+UPDATE pg_dist_shard SET shardminvalue = 501, shardmaxvalue = 1000
+WHERE shardid = :new_shard_id;
+-- Test copy into range-partitioned table
+COPY customer_copy_range FROM '/home/marco/citus/citus-explain4/src/test/regress/data/customer.1.data' WITH (DELIMITER '|');
+-- Check whether data went into the right shard (maybe)
+SELECT min(c_custkey), max(c_custkey), avg(c_custkey), count(*)
+FROM customer_copy_range WHERE c_custkey <= 500;
+ min | max | avg | count
+-----+-----+----------------------+-------
+ 1 | 500 | 250.5000000000000000 | 500
+(1 row)
+
+-- Check whether data was copied
+SELECT count(*) FROM customer_copy_range;
+ count
+-------
+ 1000
+(1 row)
+
+-- Create a new append-partitioned table into which to COPY
+CREATE TABLE customer_copy_append (
+ c_custkey integer,
+ c_name varchar(25) not null,
+ c_address varchar(40),
+ c_nationkey integer,
+ c_phone char(15),
+ c_acctbal decimal(15,2),
+ c_mktsegment char(10),
+ c_comment varchar(117));
+SELECT master_create_distributed_table('customer_copy_append', 'c_custkey', 'append');
+ master_create_distributed_table
+---------------------------------
+
+(1 row)
+
+-- Test syntax error
+COPY customer_copy_append(c_custkey, c_name) FROM STDIN WITH (FORMAT 'csv');
+ERROR: invalid input syntax for integer: "notinteger"
+CONTEXT: COPY customer_copy_append, line 3, column c_custkey: "notinteger"
+-- Test that no shard is created for failing copy
+SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'customer_copy_append'::regclass;
+ count
+-------
+ 0
+(1 row)
+
+-- Test empty copy
+COPY customer_copy_append FROM STDIN;
+-- Test that no shard is created for copying zero rows
+SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'customer_copy_append'::regclass;
+ count
+-------
+ 0
+(1 row)
+
+-- Test proper copy
+COPY customer_copy_append(c_custkey, c_name) FROM STDIN WITH (FORMAT 'csv');
+-- Check whether data was copied properly
+SELECT * FROM customer_copy_append;
+ c_custkey | c_name | c_address | c_nationkey | c_phone | c_acctbal | c_mktsegment | c_comment
+-----------+-----------+-----------+-------------+---------+-----------+--------------+-----------
+ 1 | customer1 | | | | | |
+ 2 | customer2 | | | | | |
+(2 rows)
+
+-- Create lineitem table
+CREATE TABLE lineitem_copy_append (
+ l_orderkey bigint not null,
+ l_partkey integer not null,
+ l_suppkey integer not null,
+ l_linenumber integer not null,
+ l_quantity decimal(15, 2) not null,
+ l_extendedprice decimal(15, 2) not null,
+ l_discount decimal(15, 2) not null,
+ l_tax decimal(15, 2) not null,
+ l_returnflag char(1) not null,
+ l_linestatus char(1) not null,
+ l_shipdate date not null,
+ l_commitdate date not null,
+ l_receiptdate date not null,
+ l_shipinstruct char(25) not null,
+ l_shipmode char(10) not null,
+ l_comment varchar(44) not null);
+SELECT master_create_distributed_table('lineitem_copy_append', 'l_orderkey', 'append');
+ master_create_distributed_table
+---------------------------------
+
+(1 row)
+
+-- Test multiple shard creation
+SET citus.shard_max_size TO '256kB';
+COPY lineitem_copy_append FROM '/home/marco/citus/citus-explain4/src/test/regress/data/lineitem.1.data' with delimiter '|';
+SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'lineitem_copy_append'::regclass;
+ count
+-------
+ 5
+(1 row)
+
diff --git a/src/test/regress/expected/multi_explain.out b/src/test/regress/expected/multi_explain.out
new file mode 100644
index 000000000..fdabbd6cf
--- /dev/null
+++ b/src/test/regress/expected/multi_explain.out
@@ -0,0 +1,378 @@
+--
+-- MULTI_EXPLAIN
+--
+\a\t
+SET citus.explain_distributed_queries TO on;
+-- Test Text format
+EXPLAIN (COSTS FALSE, FORMAT TEXT)
+ SELECT l_quantity, count(*) count_quantity FROM lineitem
+ GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
+Distributed Query into pg_merge_job_0040
+ Executor: Real-Time
+ Task Count: 6
+ Tasks Shown: One of 6
+ -> Task
+ Node: host=localhost port=57637 dbname=regression
+ -> HashAggregate
+ Group Key: l_quantity
+ -> Seq Scan on lineitem_102010 lineitem
+Master Query
+ -> Sort
+ Sort Key: (sum(((sum(intermediate_column_40_1))::bigint)))::bigint, intermediate_column_40_0
+ -> HashAggregate
+ Group Key: intermediate_column_40_0
+ -> Seq Scan on pg_merge_job_0040
+-- Test JSON format
+EXPLAIN (COSTS FALSE, FORMAT JSON)
+ SELECT l_quantity, count(*) count_quantity FROM lineitem
+ GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
+[
+ "Executor": "Real-Time",
+ {
+ "Task Count": 6,
+ "Tasks Shown": "One of 6",
+ "Tasks": [
+ {
+ "Node": "host=localhost port=57637 dbname=regression",
+ "Remote Plan": [
+ [
+ {
+ "Plan": {
+ "Node Type": "Aggregate",
+ "Strategy": "Hashed",
+ "Group Key": ["l_quantity"],
+ "Plans": [
+ {
+ "Node Type": "Seq Scan",
+ "Parent Relationship": "Outer",
+ "Relation Name": "lineitem_102010",
+ "Alias": "lineitem"
+ }
+ ]
+ }
+ }
+ ]
+
+ ]
+ }
+ ]
+ },
+ {
+ "Plan": {
+ "Node Type": "Sort",
+ "Sort Key": ["(sum(((sum(intermediate_column_41_1))::bigint)))::bigint", "intermediate_column_41_0"],
+ "Plans": [
+ {
+ "Node Type": "Aggregate",
+ "Strategy": "Hashed",
+ "Parent Relationship": "Outer",
+ "Group Key": ["intermediate_column_41_0"],
+ "Plans": [
+ {
+ "Node Type": "Seq Scan",
+ "Parent Relationship": "Outer",
+ "Relation Name": "pg_merge_job_0041",
+ "Alias": "pg_merge_job_0041"
+ }
+ ]
+ }
+ ]
+ }
+ }
+]
+-- Test XML format
+EXPLAIN (COSTS FALSE, FORMAT XML)
+ SELECT l_quantity, count(*) count_quantity FROM lineitem
+ GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
+
+ Real-Time
+
+ 6
+ One of 6
+
+
+ host=localhost port=57637 dbname=regression
+
+
+
+
+ Aggregate
+ Hashed
+
+ - l_quantity
+
+
+
+ Seq Scan
+ Outer
+ lineitem_102010
+ lineitem
+
+
+
+
+
+
+
+
+
+
+
+ Sort
+
+ - (sum(((sum(intermediate_column_42_1))::bigint)))::bigint
+ - intermediate_column_42_0
+
+
+
+ Aggregate
+ Hashed
+ Outer
+
+ - intermediate_column_42_0
+
+
+
+ Seq Scan
+ Outer
+ pg_merge_job_0042
+ pg_merge_job_0042
+
+
+
+
+
+
+
+-- Test YAML format
+EXPLAIN (COSTS FALSE, FORMAT YAML)
+ SELECT l_quantity, count(*) count_quantity FROM lineitem
+ GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
+Executor: "Real-Time"
+- Task Count: 6
+ Tasks Shown: "One of 6"
+ Tasks:
+ - Node: "host=localhost port=57637 dbname=regression"
+ Remote Plan:
+ - Plan:
+ Node Type: "Aggregate"
+ Strategy: "Hashed"
+ Group Key:
+ - "l_quantity"
+ Plans:
+ - Node Type: "Seq Scan"
+ Parent Relationship: "Outer"
+ Relation Name: "lineitem_102010"
+ Alias: "lineitem"
+
+- Plan:
+ Node Type: "Sort"
+ Sort Key:
+ - "(sum(((sum(intermediate_column_43_1))::bigint)))::bigint"
+ - "intermediate_column_43_0"
+ Plans:
+ - Node Type: "Aggregate"
+ Strategy: "Hashed"
+ Parent Relationship: "Outer"
+ Group Key:
+ - "intermediate_column_43_0"
+ Plans:
+ - Node Type: "Seq Scan"
+ Parent Relationship: "Outer"
+ Relation Name: "pg_merge_job_0043"
+ Alias: "pg_merge_job_0043"
+-- Test Text format
+EXPLAIN (COSTS FALSE, FORMAT TEXT)
+ SELECT l_quantity, count(*) count_quantity FROM lineitem
+ GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
+Distributed Query into pg_merge_job_0044
+ Executor: Real-Time
+ Task Count: 6
+ Tasks Shown: One of 6
+ -> Task
+ Node: host=localhost port=57637 dbname=regression
+ -> HashAggregate
+ Group Key: l_quantity
+ -> Seq Scan on lineitem_102010 lineitem
+Master Query
+ -> Sort
+ Sort Key: (sum(((sum(intermediate_column_44_1))::bigint)))::bigint, intermediate_column_44_0
+ -> HashAggregate
+ Group Key: intermediate_column_44_0
+ -> Seq Scan on pg_merge_job_0044
+-- Test verbose
+EXPLAIN (COSTS FALSE, VERBOSE TRUE)
+ SELECT sum(l_quantity) / avg(l_quantity) FROM lineitem;
+Distributed Query into pg_merge_job_0045
+ Executor: Real-Time
+ Task Count: 6
+ Tasks Shown: One of 6
+ -> Task
+ Node: host=localhost port=57637 dbname=regression
+ -> Aggregate
+ Output: sum(l_quantity), sum(l_quantity), count(l_quantity)
+ -> Seq Scan on public.lineitem_102010 lineitem
+ Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment
+Master Query
+ -> Aggregate
+ Output: (sum(intermediate_column_45_0) / (sum(intermediate_column_45_1) / sum(intermediate_column_45_2)))
+ -> Seq Scan on pg_temp_2.pg_merge_job_0045
+ Output: intermediate_column_45_0, intermediate_column_45_1, intermediate_column_45_2
+-- Test join
+EXPLAIN (COSTS FALSE)
+ SELECT * FROM lineitem
+ JOIN orders ON l_orderkey = o_orderkey AND l_quantity < 5
+ ORDER BY l_quantity DESC LIMIT 10;
+Distributed Query into pg_merge_job_0046
+ Executor: Real-Time
+ Task Count: 6
+ Tasks Shown: One of 6
+ -> Task
+ Node: host=localhost port=57637 dbname=regression
+ -> Limit
+ -> Sort
+ Sort Key: lineitem.l_quantity DESC
+ -> Hash Join
+ Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
+ -> Seq Scan on lineitem_102010 lineitem
+ Filter: (l_quantity < '5'::numeric)
+ -> Hash
+ -> Seq Scan on orders_102015 orders
+Master Query
+ -> Limit
+ -> Sort
+ Sort Key: intermediate_column_46_4 DESC
+ -> Seq Scan on pg_merge_job_0046
+-- Test insert
+EXPLAIN (COSTS FALSE)
+ INSERT INTO lineitem VALUES(1,0);
+Distributed Query
+ Executor: Router
+ Task Count: 1
+ Tasks Shown: All
+ -> Task
+ Node: host=localhost port=57638 dbname=regression
+ -> Insert on lineitem_102009
+ -> Result
+-- Test update
+EXPLAIN (COSTS FALSE)
+ UPDATE lineitem
+ SET l_suppkey = 12
+ WHERE l_orderkey = 1 AND l_partkey = 0;
+Distributed Query
+ Executor: Router
+ Task Count: 1
+ Tasks Shown: All
+ -> Task
+ Node: host=localhost port=57638 dbname=regression
+ -> Update on lineitem_102009
+ -> Bitmap Heap Scan on lineitem_102009
+ Recheck Cond: (l_orderkey = 1)
+ Filter: (l_partkey = 0)
+ -> Bitmap Index Scan on lineitem_pkey_102009
+ Index Cond: (l_orderkey = 1)
+-- Test delete
+EXPLAIN (COSTS FALSE)
+ DELETE FROM lineitem
+ WHERE l_orderkey = 1 AND l_partkey = 0;
+Distributed Query
+ Executor: Router
+ Task Count: 1
+ Tasks Shown: All
+ -> Task
+ Node: host=localhost port=57638 dbname=regression
+ -> Delete on lineitem_102009
+ -> Bitmap Heap Scan on lineitem_102009
+ Recheck Cond: (l_orderkey = 1)
+ Filter: (l_partkey = 0)
+ -> Bitmap Index Scan on lineitem_pkey_102009
+ Index Cond: (l_orderkey = 1)
+-- Test single-shard SELECT
+EXPLAIN
+ SELECT l_quantity FROM lineitem WHERE l_orderkey = 5;
+Distributed Query into pg_merge_job_0047
+ Executor: Router
+ Task Count: 1
+ Tasks Shown: All
+ -> Task
+ Node: host=localhost port=57637 dbname=regression
+ -> Bitmap Heap Scan on lineitem_102009 lineitem (cost=4.31..19.58 rows=5 width=18)
+ Recheck Cond: (l_orderkey = 5)
+ -> Bitmap Index Scan on lineitem_pkey_102009 (cost=0.00..4.31 rows=5 width=0)
+ Index Cond: (l_orderkey = 5)
+-- Test CREATE TABLE ... AS
+EXPLAIN CREATE TABLE explain_result AS
+ SELECT * FROM lineitem;
+Distributed Query into pg_merge_job_0048
+ Executor: Real-Time
+ Task Count: 6
+ Tasks Shown: One of 6
+ -> Task
+ Node: host=localhost port=57637 dbname=regression
+ -> Seq Scan on lineitem_102010 lineitem (cost=0.00..58.80 rows=980 width=374)
+Master Query
+ -> Seq Scan on pg_merge_job_0048 (cost=0.00..0.00 rows=0 width=0)
+-- Test all tasks output
+SET citus.explain_all_tasks TO on;
+EXPLAIN (COSTS FALSE)
+ SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
+Distributed Query into pg_merge_job_0049
+ Executor: Real-Time
+ Task Count: 3
+ Tasks Shown: All
+ -> Task
+ Node: host=localhost port=57637 dbname=regression
+ -> Aggregate
+ -> Seq Scan on lineitem_102012 lineitem
+ Filter: (l_orderkey > 9030)
+ -> Task
+ Node: host=localhost port=57638 dbname=regression
+ -> Aggregate
+ -> Seq Scan on lineitem_102013 lineitem
+ Filter: (l_orderkey > 9030)
+ -> Task
+ Node: host=localhost port=57637 dbname=regression
+ -> Aggregate
+ -> Seq Scan on lineitem_102014 lineitem
+ Filter: (l_orderkey > 9030)
+Master Query
+ -> Aggregate
+ -> Seq Scan on pg_merge_job_0049
+-- Test track tracker
+SET citus.task_executor_type TO 'task-tracker';
+SET citus.explain_all_tasks TO off;
+EXPLAIN (COSTS FALSE)
+ SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
+Distributed Query into pg_merge_job_0050
+ Executor: Task-Tracker
+ Task Count: 3
+ Tasks Shown: One of 3
+ -> Task
+ Node: host=localhost port=57637 dbname=regression
+ -> Aggregate
+ -> Seq Scan on lineitem_102012 lineitem
+ Filter: (l_orderkey > 9030)
+Master Query
+ -> Aggregate
+ -> Seq Scan on pg_merge_job_0050
+-- Test re-partition join
+SET citus.large_table_shard_count TO 1;
+EXPLAIN (COSTS FALSE)
+ SELECT count(*)
+ FROM lineitem, orders, customer, supplier
+ WHERE l_orderkey = o_orderkey
+ AND o_custkey = c_custkey
+ AND l_suppkey = s_suppkey;
+Distributed Query into pg_merge_job_0053
+ Executor: Task-Tracker
+ Task Count: 1
+ Tasks Shown: None, not supported for re-partition queries
+ -> MapMergeJob
+ Map Task Count: 1
+ Merge Task Count: 1
+ -> MapMergeJob
+ Map Task Count: 6
+ Merge Task Count: 1
+Master Query
+ -> Aggregate
+ -> Seq Scan on pg_merge_job_0053
diff --git a/src/test/regress/expected/multi_explain_0.out b/src/test/regress/expected/multi_explain_0.out
new file mode 100644
index 000000000..07a9762e7
--- /dev/null
+++ b/src/test/regress/expected/multi_explain_0.out
@@ -0,0 +1,378 @@
+--
+-- MULTI_EXPLAIN
+--
+\a\t
+SET citus.explain_distributed_queries TO on;
+-- Test Text format
+EXPLAIN (COSTS FALSE, FORMAT TEXT)
+ SELECT l_quantity, count(*) count_quantity FROM lineitem
+ GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
+Distributed Query into pg_merge_job_0040
+ Executor: Real-Time
+ Task Count: 6
+ Tasks Shown: One of 6
+ -> Task
+ Node: host=localhost port=57637 dbname=regression
+ -> HashAggregate
+ Group Key: l_quantity
+ -> Seq Scan on lineitem_102010 lineitem
+Master Query
+ -> Sort
+ Sort Key: (sum(((sum(intermediate_column_40_1))::bigint)))::bigint, intermediate_column_40_0
+ -> HashAggregate
+ Group Key: intermediate_column_40_0
+ -> Seq Scan on pg_merge_job_0040
+-- Test JSON format
+EXPLAIN (COSTS FALSE, FORMAT JSON)
+ SELECT l_quantity, count(*) count_quantity FROM lineitem
+ GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
+[
+ "Executor": "Real-Time",
+ {
+ "Task Count": 6,
+ "Tasks Shown": "One of 6",
+ "Tasks": [
+ {
+ "Node": "host=localhost port=57637 dbname=regression",
+ "Remote Plan": [
+ [
+ {
+ "Plan": {
+ "Node Type": "Aggregate",
+ "Strategy": "Hashed",
+ "Group Key": ["l_quantity"],
+ "Plans": [
+ {
+ "Node Type": "Seq Scan",
+ "Parent Relationship": "Outer",
+ "Relation Name": "lineitem_102010",
+ "Alias": "lineitem"
+ }
+ ]
+ }
+ }
+ ]
+
+ ]
+ }
+ ]
+ },
+ {
+ "Plan": {
+ "Node Type": "Sort",
+ "Sort Key": ["(sum(((sum(intermediate_column_41_1))::bigint)))::bigint", "intermediate_column_41_0"],
+ "Plans": [
+ {
+ "Node Type": "Aggregate",
+ "Strategy": "Hashed",
+ "Parent Relationship": "Outer",
+ "Group Key": ["intermediate_column_41_0"],
+ "Plans": [
+ {
+ "Node Type": "Seq Scan",
+ "Parent Relationship": "Outer",
+ "Relation Name": "pg_merge_job_0041",
+ "Alias": "pg_merge_job_0041"
+ }
+ ]
+ }
+ ]
+ }
+ }
+]
+-- Test XML format
+EXPLAIN (COSTS FALSE, FORMAT XML)
+ SELECT l_quantity, count(*) count_quantity FROM lineitem
+ GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
+
+ Real-Time
+
+ 6
+ One of 6
+
+
+ host=localhost port=57637 dbname=regression
+
+
+
+
+ Aggregate
+ Hashed
+
+ - l_quantity
+
+
+
+ Seq Scan
+ Outer
+ lineitem_102010
+ lineitem
+
+
+
+
+
+
+
+
+
+
+
+ Sort
+
+ - (sum(((sum(intermediate_column_42_1))::bigint)))::bigint
+ - intermediate_column_42_0
+
+
+
+ Aggregate
+ Hashed
+ Outer
+
+ - intermediate_column_42_0
+
+
+
+ Seq Scan
+ Outer
+ pg_merge_job_0042
+ pg_merge_job_0042
+
+
+
+
+
+
+
+-- Test YAML format
+EXPLAIN (COSTS FALSE, FORMAT YAML)
+ SELECT l_quantity, count(*) count_quantity FROM lineitem
+ GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
+Executor: "Real-Time"
+- Task Count: 6
+ Tasks Shown: "One of 6"
+ Tasks:
+ - Node: "host=localhost port=57637 dbname=regression"
+ Remote Plan:
+ - Plan:
+ Node Type: "Aggregate"
+ Strategy: "Hashed"
+ Group Key:
+ - "l_quantity"
+ Plans:
+ - Node Type: "Seq Scan"
+ Parent Relationship: "Outer"
+ Relation Name: "lineitem_102010"
+ Alias: "lineitem"
+
+- Plan:
+ Node Type: "Sort"
+ Sort Key:
+ - "(sum(((sum(intermediate_column_43_1))::bigint)))::bigint"
+ - "intermediate_column_43_0"
+ Plans:
+ - Node Type: "Aggregate"
+ Strategy: "Hashed"
+ Parent Relationship: "Outer"
+ Group Key:
+ - "intermediate_column_43_0"
+ Plans:
+ - Node Type: "Seq Scan"
+ Parent Relationship: "Outer"
+ Relation Name: "pg_merge_job_0043"
+ Alias: "pg_merge_job_0043"
+-- Test Text format
+EXPLAIN (COSTS FALSE, FORMAT TEXT)
+ SELECT l_quantity, count(*) count_quantity FROM lineitem
+ GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
+Distributed Query into pg_merge_job_0044
+ Executor: Real-Time
+ Task Count: 6
+ Tasks Shown: One of 6
+ -> Task
+ Node: host=localhost port=57637 dbname=regression
+ -> HashAggregate
+ Group Key: l_quantity
+ -> Seq Scan on lineitem_102010 lineitem
+Master Query
+ -> Sort
+ Sort Key: (sum(((sum(intermediate_column_44_1))::bigint)))::bigint, intermediate_column_44_0
+ -> HashAggregate
+ Group Key: intermediate_column_44_0
+ -> Seq Scan on pg_merge_job_0044
+-- Test verbose
+EXPLAIN (COSTS FALSE, VERBOSE TRUE)
+ SELECT sum(l_quantity) / avg(l_quantity) FROM lineitem;
+Distributed Query into pg_merge_job_0045
+ Executor: Real-Time
+ Task Count: 6
+ Tasks Shown: One of 6
+ -> Task
+ Node: host=localhost port=57637 dbname=regression
+ -> Aggregate
+ Output: sum(l_quantity), sum(l_quantity), count(l_quantity)
+ -> Seq Scan on public.lineitem_102010 lineitem
+ Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment
+Master Query
+ -> Aggregate
+ Output: (sum(intermediate_column_45_0) / (sum(intermediate_column_45_1) / sum(intermediate_column_45_2)))
+ -> Seq Scan on pg_temp_2.pg_merge_job_0045
+ Output: intermediate_column_45_0, intermediate_column_45_1, intermediate_column_45_2
+-- Test join
+EXPLAIN (COSTS FALSE)
+ SELECT * FROM lineitem
+ JOIN orders ON l_orderkey = o_orderkey AND l_quantity < 5
+ ORDER BY l_quantity DESC LIMIT 10;
+Distributed Query into pg_merge_job_0046
+ Executor: Real-Time
+ Task Count: 6
+ Tasks Shown: One of 6
+ -> Task
+ Node: host=localhost port=57637 dbname=regression
+ -> Limit
+ -> Sort
+ Sort Key: lineitem.l_quantity
+ -> Hash Join
+ Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
+ -> Seq Scan on lineitem_102010 lineitem
+ Filter: (l_quantity < 5::numeric)
+ -> Hash
+ -> Seq Scan on orders_102015 orders
+Master Query
+ -> Limit
+ -> Sort
+ Sort Key: intermediate_column_46_4
+ -> Seq Scan on pg_merge_job_0046
+-- Test insert
+EXPLAIN (COSTS FALSE)
+ INSERT INTO lineitem VALUES(1,0);
+Distributed Query
+ Executor: Router
+ Task Count: 1
+ Tasks Shown: All
+ -> Task
+ Node: host=localhost port=57638 dbname=regression
+ -> Insert on lineitem_102009
+ -> Result
+-- Test update
+EXPLAIN (COSTS FALSE)
+ UPDATE lineitem
+ SET l_suppkey = 12
+ WHERE l_orderkey = 1 AND l_partkey = 0;
+Distributed Query
+ Executor: Router
+ Task Count: 1
+ Tasks Shown: All
+ -> Task
+ Node: host=localhost port=57638 dbname=regression
+ -> Update on lineitem_102009
+ -> Bitmap Heap Scan on lineitem_102009
+ Recheck Cond: (l_orderkey = 1)
+ Filter: (l_partkey = 0)
+ -> Bitmap Index Scan on lineitem_pkey_102009
+ Index Cond: (l_orderkey = 1)
+-- Test delete
+EXPLAIN (COSTS FALSE)
+ DELETE FROM lineitem
+ WHERE l_orderkey = 1 AND l_partkey = 0;
+Distributed Query
+ Executor: Router
+ Task Count: 1
+ Tasks Shown: All
+ -> Task
+ Node: host=localhost port=57638 dbname=regression
+ -> Delete on lineitem_102009
+ -> Bitmap Heap Scan on lineitem_102009
+ Recheck Cond: (l_orderkey = 1)
+ Filter: (l_partkey = 0)
+ -> Bitmap Index Scan on lineitem_pkey_102009
+ Index Cond: (l_orderkey = 1)
+-- Test single-shard SELECT
+EXPLAIN
+ SELECT l_quantity FROM lineitem WHERE l_orderkey = 5;
+Distributed Query into pg_merge_job_0047
+ Executor: Router
+ Task Count: 1
+ Tasks Shown: All
+ -> Task
+ Node: host=localhost port=57637 dbname=regression
+ -> Bitmap Heap Scan on lineitem_102009 lineitem (cost=4.31..19.58 rows=5 width=18)
+ Recheck Cond: (l_orderkey = 5)
+ -> Bitmap Index Scan on lineitem_pkey_102009 (cost=0.00..4.31 rows=5 width=0)
+ Index Cond: (l_orderkey = 5)
+-- Test CREATE TABLE ... AS
+EXPLAIN CREATE TABLE explain_result AS
+ SELECT * FROM lineitem;
+Distributed Query into pg_merge_job_0048
+ Executor: Real-Time
+ Task Count: 6
+ Tasks Shown: One of 6
+ -> Task
+ Node: host=localhost port=57637 dbname=regression
+ -> Seq Scan on lineitem_102010 lineitem (cost=0.00..58.80 rows=980 width=374)
+Master Query
+ -> Seq Scan on pg_merge_job_0048 (cost=0.00..0.00 rows=0 width=0)
+-- Test all tasks output
+SET citus.explain_all_tasks TO on;
+EXPLAIN (COSTS FALSE)
+ SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
+Distributed Query into pg_merge_job_0049
+ Executor: Real-Time
+ Task Count: 3
+ Tasks Shown: All
+ -> Task
+ Node: host=localhost port=57637 dbname=regression
+ -> Aggregate
+ -> Seq Scan on lineitem_102012 lineitem
+ Filter: (l_orderkey > 9030)
+ -> Task
+ Node: host=localhost port=57638 dbname=regression
+ -> Aggregate
+ -> Seq Scan on lineitem_102013 lineitem
+ Filter: (l_orderkey > 9030)
+ -> Task
+ Node: host=localhost port=57637 dbname=regression
+ -> Aggregate
+ -> Seq Scan on lineitem_102014 lineitem
+ Filter: (l_orderkey > 9030)
+Master Query
+ -> Aggregate
+ -> Seq Scan on pg_merge_job_0049
+-- Test track tracker
+SET citus.task_executor_type TO 'task-tracker';
+SET citus.explain_all_tasks TO off;
+EXPLAIN (COSTS FALSE)
+ SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
+Distributed Query into pg_merge_job_0050
+ Executor: Task-Tracker
+ Task Count: 3
+ Tasks Shown: One of 3
+ -> Task
+ Node: host=localhost port=57637 dbname=regression
+ -> Aggregate
+ -> Seq Scan on lineitem_102012 lineitem
+ Filter: (l_orderkey > 9030)
+Master Query
+ -> Aggregate
+ -> Seq Scan on pg_merge_job_0050
+-- Test re-partition join
+SET citus.large_table_shard_count TO 1;
+EXPLAIN (COSTS FALSE)
+ SELECT count(*)
+ FROM lineitem, orders, customer, supplier
+ WHERE l_orderkey = o_orderkey
+ AND o_custkey = c_custkey
+ AND l_suppkey = s_suppkey;
+Distributed Query into pg_merge_job_0053
+ Executor: Task-Tracker
+ Task Count: 1
+ Tasks Shown: None, not supported for re-partition queries
+ -> MapMergeJob
+ Map Task Count: 1
+ Merge Task Count: 1
+ -> MapMergeJob
+ Map Task Count: 6
+ Merge Task Count: 1
+Master Query
+ -> Aggregate
+ -> Seq Scan on pg_merge_job_0053
diff --git a/src/test/regress/expected/multi_hash_pruning.out b/src/test/regress/expected/multi_hash_pruning.out
index 12b5317d3..458372e91 100644
--- a/src/test/regress/expected/multi_hash_pruning.out
+++ b/src/test/regress/expected/multi_hash_pruning.out
@@ -16,248 +16,247 @@ CREATE TABLE orders_hash_partitioned (
o_clerk char(15),
o_shippriority integer,
o_comment varchar(79) );
-SELECT master_create_distributed_table('orders_hash_partitioned', 'o_orderkey', 'append');
+SELECT master_create_distributed_table('orders_hash_partitioned', 'o_orderkey', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
-UPDATE pg_dist_partition SET partmethod = 'h'
- WHERE logicalrelid = 'orders_hash_partitioned'::regclass;
--- Create logical shards with shardids 110, 111, 112 and 113
-INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue)
- VALUES ('orders_hash_partitioned'::regclass, 110, 't', -1905060026, -1905060026),
- ('orders_hash_partitioned'::regclass, 111, 't', 1134484726, 1134484726),
- ('orders_hash_partitioned'::regclass, 112, 't', -1905060026, -28094569),
- ('orders_hash_partitioned'::regclass, 113, 't', -1011077333, 0);
--- Create shard placements for shards 110, 111, 112 and 113
-INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport)
- SELECT 110, 1, 1, nodename, nodeport
- FROM pg_dist_shard_placement
- GROUP BY nodename, nodeport
- ORDER BY nodename, nodeport ASC
- LIMIT 1;
-INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport)
- SELECT 111, 1, 1, nodename, nodeport
- FROM pg_dist_shard_placement
- GROUP BY nodename, nodeport
- ORDER BY nodename, nodeport ASC
- LIMIT 1;
-INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport)
- SELECT 112, 1, 1, nodename, nodeport
- FROM pg_dist_shard_placement
- GROUP BY nodename, nodeport
- ORDER BY nodename, nodeport ASC
- LIMIT 1;
-INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport)
- SELECT 113, 1, 1, nodename, nodeport
- FROM pg_dist_shard_placement
- GROUP BY nodename, nodeport
- ORDER BY nodename, nodeport ASC
- LIMIT 1;
+SELECT master_create_worker_shards('orders_hash_partitioned', 4, 1);
+ master_create_worker_shards
+-----------------------------
+
+(1 row)
+
SET client_min_messages TO DEBUG2;
-- Check that we can prune shards for simple cases, boolean expressions and
-- immutable functions.
-EXPLAIN SELECT count(*) FROM orders_hash_partitioned;
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+SELECT count(*) FROM orders_hash_partitioned;
+ count
+-------
+ 0
(1 row)
-EXPLAIN SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 1;
-DEBUG: predicate pruning for shardId 113
-DEBUG: predicate pruning for shardId 111
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 1;
+DEBUG: Creating router plan
+DEBUG: predicate pruning for shardId 102034
+DEBUG: predicate pruning for shardId 102035
+DEBUG: predicate pruning for shardId 102036
+DEBUG: Plan is router executable
+ count
+-------
+ 0
(1 row)
-EXPLAIN SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 2;
-DEBUG: predicate pruning for shardId 112
-DEBUG: predicate pruning for shardId 110
-DEBUG: predicate pruning for shardId 113
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 2;
+DEBUG: Creating router plan
+DEBUG: predicate pruning for shardId 102033
+DEBUG: predicate pruning for shardId 102034
+DEBUG: predicate pruning for shardId 102035
+DEBUG: Plan is router executable
+ count
+-------
+ 0
(1 row)
-EXPLAIN SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 3;
-DEBUG: predicate pruning for shardId 110
-DEBUG: predicate pruning for shardId 111
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 3;
+DEBUG: Creating router plan
+DEBUG: predicate pruning for shardId 102033
+DEBUG: predicate pruning for shardId 102035
+DEBUG: predicate pruning for shardId 102036
+DEBUG: Plan is router executable
+ count
+-------
+ 0
(1 row)
-EXPLAIN SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 4;
-DEBUG: predicate pruning for shardId 110
-DEBUG: predicate pruning for shardId 111
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 4;
+DEBUG: Creating router plan
+DEBUG: predicate pruning for shardId 102033
+DEBUG: predicate pruning for shardId 102035
+DEBUG: predicate pruning for shardId 102036
+DEBUG: Plan is router executable
+ count
+-------
+ 0
(1 row)
-EXPLAIN SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey is NULL;
-DEBUG: predicate pruning for shardId 112
-DEBUG: predicate pruning for shardId 110
-DEBUG: predicate pruning for shardId 111
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey is NULL;
+DEBUG: predicate pruning for shardId 102033
+DEBUG: predicate pruning for shardId 102034
+DEBUG: predicate pruning for shardId 102036
+ count
+-------
+ 0
(1 row)
-EXPLAIN SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey is not NULL;
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey is not NULL;
+ count
+-------
+ 0
(1 row)
-EXPLAIN SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey > 2;
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey > 2;
+ count
+-------
+ 0
(1 row)
-EXPLAIN SELECT count(*) FROM orders_hash_partitioned
+SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = 1 OR o_orderkey = 2;
-DEBUG: predicate pruning for shardId 113
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+DEBUG: predicate pruning for shardId 102034
+DEBUG: predicate pruning for shardId 102035
+ count
+-------
+ 0
(1 row)
-EXPLAIN SELECT count(*) FROM orders_hash_partitioned
+SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = 1 OR o_clerk = 'aaa';
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+ count
+-------
+ 0
(1 row)
-EXPLAIN SELECT count(*) FROM orders_hash_partitioned
+SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = 1 AND o_clerk = 'aaa';
-DEBUG: predicate pruning for shardId 113
-DEBUG: predicate pruning for shardId 111
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+DEBUG: Creating router plan
+DEBUG: predicate pruning for shardId 102034
+DEBUG: predicate pruning for shardId 102035
+DEBUG: predicate pruning for shardId 102036
+DEBUG: Plan is router executable
+ count
+-------
+ 0
(1 row)
-EXPLAIN SELECT count(*) FROM orders_hash_partitioned
+SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = 1 OR (o_orderkey = 3 AND o_clerk = 'aaa');
-DEBUG: predicate pruning for shardId 111
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+DEBUG: predicate pruning for shardId 102035
+DEBUG: predicate pruning for shardId 102036
+ count
+-------
+ 0
(1 row)
-EXPLAIN SELECT count(*) FROM orders_hash_partitioned
+SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = 1 OR o_orderkey is NULL;
-DEBUG: predicate pruning for shardId 111
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+DEBUG: predicate pruning for shardId 102034
+DEBUG: predicate pruning for shardId 102036
+ count
+-------
+ 0
(1 row)
-EXPLAIN SELECT count(*) FROM
+SELECT count(*) FROM
(SELECT o_orderkey FROM orders_hash_partitioned WHERE o_orderkey = 1) AS orderkeys;
-DEBUG: predicate pruning for shardId 113
-DEBUG: predicate pruning for shardId 111
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+DEBUG: predicate pruning for shardId 102034
+DEBUG: predicate pruning for shardId 102035
+DEBUG: predicate pruning for shardId 102036
+ count
+-------
+ 0
(1 row)
-EXPLAIN SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = abs(-1);
-DEBUG: predicate pruning for shardId 113
-DEBUG: predicate pruning for shardId 111
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = abs(-1);
+DEBUG: Creating router plan
+DEBUG: predicate pruning for shardId 102034
+DEBUG: predicate pruning for shardId 102035
+DEBUG: predicate pruning for shardId 102036
+DEBUG: Plan is router executable
+ count
+-------
+ 0
(1 row)
-- Check that we don't support pruning for ANY (array expression) and give
-- a notice message when used with the partition column
-EXPLAIN SELECT count(*) FROM orders_hash_partitioned
+SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = ANY ('{1,2,3}');
NOTICE: cannot use shard pruning with ANY/ALL (array expression)
HINT: Consider rewriting the expression with OR/AND clauses.
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+ count
+-------
+ 0
(1 row)
-- Check that we don't show the message if the operator is not
-- equality operator
-EXPLAIN SELECT count(*) FROM orders_hash_partitioned
+SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey < ALL ('{1,2,3}');
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+ count
+-------
+ 0
(1 row)
-- Check that we don't give a spurious hint message when non-partition
-- columns are used with ANY/IN/ALL
-EXPLAIN SELECT count(*) FROM orders_hash_partitioned
+SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = 1 OR o_totalprice IN (2, 5);
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+ count
+-------
+ 0
(1 row)
-- Check that we cannot prune for mutable functions.
-EXPLAIN SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = random();
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = random();
+ count
+-------
+ 0
(1 row)
-EXPLAIN SELECT count(*) FROM orders_hash_partitioned
+SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = random() OR o_orderkey = 1;
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+ count
+-------
+ 0
(1 row)
-EXPLAIN SELECT count(*) FROM orders_hash_partitioned
+SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = random() AND o_orderkey = 1;
-DEBUG: predicate pruning for shardId 113
-DEBUG: predicate pruning for shardId 111
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+DEBUG: predicate pruning for shardId 102034
+DEBUG: predicate pruning for shardId 102035
+DEBUG: predicate pruning for shardId 102036
+ count
+-------
+ 0
(1 row)
-- Check that we can do join pruning.
-EXPLAIN SELECT count(*)
+SELECT count(*)
FROM orders_hash_partitioned orders1, orders_hash_partitioned orders2
WHERE orders1.o_orderkey = orders2.o_orderkey;
-DEBUG: join prunable for intervals [-1905060026,-28094569] and [1134484726,1134484726]
-DEBUG: join prunable for intervals [-1905060026,-1905060026] and [-1011077333,0]
-DEBUG: join prunable for intervals [-1905060026,-1905060026] and [1134484726,1134484726]
-DEBUG: join prunable for intervals [-1011077333,0] and [-1905060026,-1905060026]
-DEBUG: join prunable for intervals [-1011077333,0] and [1134484726,1134484726]
-DEBUG: join prunable for intervals [1134484726,1134484726] and [-1905060026,-28094569]
-DEBUG: join prunable for intervals [1134484726,1134484726] and [-1905060026,-1905060026]
-DEBUG: join prunable for intervals [1134484726,1134484726] and [-1011077333,0]
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1]
+DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823]
+DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647]
+DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825]
+DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823]
+DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647]
+DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825]
+DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1]
+DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647]
+DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825]
+DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1]
+DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823]
+ count
+-------
+ 0
(1 row)
-EXPLAIN SELECT count(*)
+SELECT count(*)
FROM orders_hash_partitioned orders1, orders_hash_partitioned orders2
WHERE orders1.o_orderkey = orders2.o_orderkey
AND orders1.o_orderkey = 1
AND orders2.o_orderkey is NULL;
-DEBUG: predicate pruning for shardId 113
-DEBUG: predicate pruning for shardId 111
-DEBUG: predicate pruning for shardId 112
-DEBUG: predicate pruning for shardId 110
-DEBUG: predicate pruning for shardId 111
-DEBUG: join prunable for intervals [-1905060026,-1905060026] and [-1011077333,0]
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+DEBUG: predicate pruning for shardId 102034
+DEBUG: predicate pruning for shardId 102035
+DEBUG: predicate pruning for shardId 102036
+DEBUG: predicate pruning for shardId 102033
+DEBUG: predicate pruning for shardId 102034
+DEBUG: predicate pruning for shardId 102036
+DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823]
+ count
+-------
+
(1 row)
-SET client_min_messages TO NOTICE;
diff --git a/src/test/regress/expected/multi_join_order_additional.out b/src/test/regress/expected/multi_join_order_additional.out
index ffc9ae245..3df390d33 100644
--- a/src/test/regress/expected/multi_join_order_additional.out
+++ b/src/test/regress/expected/multi_join_order_additional.out
@@ -2,6 +2,7 @@
-- MULTI_JOIN_ORDER_ADDITIONAL
--
-- Set configuration to print table join order and pruned shards
+SET citus.explain_distributed_queries TO off;
SET citus.log_multi_join_order TO TRUE;
SET client_min_messages TO DEBUG2;
-- The following query checks that we can correctly handle self-joins
@@ -36,9 +37,9 @@ DEBUG: join prunable for intervals [13921,14947] and [2497,4964]
DEBUG: join prunable for intervals [13921,14947] and [4965,5986]
DEBUG: join prunable for intervals [13921,14947] and [8997,11554]
DEBUG: join prunable for intervals [13921,14947] and [11554,13920]
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+ QUERY PLAN
+------------------------------------------------------------
+ explain statements for distributed queries are not enabled
(1 row)
-- Update configuration to treat lineitem and orders tables as large
@@ -51,9 +52,9 @@ EXPLAIN SELECT count(*) FROM lineitem, orders
WHERE (l_orderkey = o_orderkey AND l_quantity > 5)
OR (l_orderkey = o_orderkey AND l_quantity < 10);
LOG: join order: [ "lineitem" ][ local partition join "orders" ]
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+ QUERY PLAN
+------------------------------------------------------------
+ explain statements for distributed queries are not enabled
(1 row)
EXPLAIN SELECT l_quantity FROM lineitem, orders
@@ -72,9 +73,9 @@ UPDATE pg_dist_partition SET partmethod = 'h' WHERE
EXPLAIN SELECT count(*) FROM orders, lineitem
WHERE o_orderkey = l_orderkey;
LOG: join order: [ "orders" ][ single partition join "lineitem" ]
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+ QUERY PLAN
+------------------------------------------------------------
+ explain statements for distributed queries are not enabled
(1 row)
-- Verify we handle local joins between two hash-partitioned tables.
@@ -83,9 +84,9 @@ UPDATE pg_dist_partition SET partmethod = 'h' WHERE
EXPLAIN SELECT count(*) FROM orders, lineitem
WHERE o_orderkey = l_orderkey;
LOG: join order: [ "orders" ][ local partition join "lineitem" ]
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+ QUERY PLAN
+------------------------------------------------------------
+ explain statements for distributed queries are not enabled
(1 row)
UPDATE pg_dist_partition SET partmethod = 'a' WHERE
@@ -98,9 +99,9 @@ UPDATE pg_dist_partition SET partmethod = 'h' WHERE
EXPLAIN SELECT count(*) FROM customer, nation
WHERE c_nationkey = n_nationkey;
LOG: join order: [ "customer" ][ broadcast join "nation" ]
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+ QUERY PLAN
+------------------------------------------------------------
+ explain statements for distributed queries are not enabled
(1 row)
UPDATE pg_dist_partition SET partmethod = 'a' WHERE
@@ -112,9 +113,9 @@ SET citus.large_table_shard_count TO 1;
EXPLAIN SELECT count(*) FROM orders, lineitem, customer
WHERE o_custkey = l_partkey AND o_custkey = c_nationkey;
LOG: join order: [ "orders" ][ dual partition join "lineitem" ][ dual partition join "customer" ]
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+ QUERY PLAN
+------------------------------------------------------------
+ explain statements for distributed queries are not enabled
(1 row)
-- Validate that we don't chose a single-partition join method with a
@@ -124,9 +125,9 @@ UPDATE pg_dist_partition SET partmethod = 'h' WHERE
EXPLAIN SELECT count(*) FROM orders, customer
WHERE c_custkey = o_custkey;
LOG: join order: [ "orders" ][ dual partition join "customer" ]
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+ QUERY PLAN
+------------------------------------------------------------
+ explain statements for distributed queries are not enabled
(1 row)
UPDATE pg_dist_partition SET partmethod = 'a' WHERE
@@ -138,9 +139,9 @@ UPDATE pg_dist_partition SET partmethod = 'h' WHERE
EXPLAIN SELECT count(*) FROM orders, customer
WHERE c_custkey = o_custkey;
LOG: join order: [ "orders" ][ single partition join "customer" ]
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+ QUERY PLAN
+------------------------------------------------------------
+ explain statements for distributed queries are not enabled
(1 row)
UPDATE pg_dist_partition SET partmethod = 'a' WHERE
diff --git a/src/test/regress/expected/multi_join_order_tpch_large.out b/src/test/regress/expected/multi_join_order_tpch_large.out
index deb1f20da..04759db3b 100644
--- a/src/test/regress/expected/multi_join_order_tpch_large.out
+++ b/src/test/regress/expected/multi_join_order_tpch_large.out
@@ -2,6 +2,7 @@
-- MULTI_JOIN_ORDER_TPCH_LARGE
--
-- Enable configuration to print table join order
+SET citus.explain_distributed_queries TO off;
SET citus.log_multi_join_order TO TRUE;
SET client_min_messages TO LOG;
-- Change configuration to treat lineitem, orders, customer, and part tables as
@@ -20,9 +21,9 @@ WHERE
and l_discount between 0.06 - 0.01 and 0.06 + 0.01
and l_quantity < 24;
LOG: join order: [ "lineitem" ]
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+ QUERY PLAN
+------------------------------------------------------------
+ explain statements for distributed queries are not enabled
(1 row)
-- Query #3 from the TPC-H decision support benchmark
@@ -49,9 +50,9 @@ ORDER BY
revenue DESC,
o_orderdate;
LOG: join order: [ "orders" ][ local partition join "lineitem" ][ single partition join "customer" ]
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+ QUERY PLAN
+------------------------------------------------------------
+ explain statements for distributed queries are not enabled
(1 row)
-- Query #10 from the TPC-H decision support benchmark
@@ -87,9 +88,9 @@ GROUP BY
ORDER BY
revenue DESC;
LOG: join order: [ "orders" ][ local partition join "lineitem" ][ single partition join "customer" ][ broadcast join "nation" ]
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+ QUERY PLAN
+------------------------------------------------------------
+ explain statements for distributed queries are not enabled
(1 row)
-- Query #19 from the TPC-H decision support benchmark (modified)
@@ -123,9 +124,9 @@ WHERE
AND l_shipinstruct = 'DELIVER IN PERSON'
);
LOG: join order: [ "lineitem" ][ single partition join "part" ]
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+ QUERY PLAN
+------------------------------------------------------------
+ explain statements for distributed queries are not enabled
(1 row)
-- Query to test multiple re-partition jobs in a single query
@@ -140,9 +141,9 @@ WHERE
GROUP BY
l_partkey;
LOG: join order: [ "lineitem" ][ local partition join "orders" ][ single partition join "part" ][ single partition join "customer" ]
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+ QUERY PLAN
+------------------------------------------------------------
+ explain statements for distributed queries are not enabled
(1 row)
-- Reset client logging level to its previous value
diff --git a/src/test/regress/expected/multi_join_order_tpch_small.out b/src/test/regress/expected/multi_join_order_tpch_small.out
index 6601fb53c..1c506136c 100644
--- a/src/test/regress/expected/multi_join_order_tpch_small.out
+++ b/src/test/regress/expected/multi_join_order_tpch_small.out
@@ -2,6 +2,7 @@
-- MULTI_JOIN_ORDER_TPCH_SMALL
--
-- Enable configuration to print table join order
+SET citus.explain_distributed_queries TO off;
SET citus.log_multi_join_order TO TRUE;
SET client_min_messages TO LOG;
-- Change configuration to treat lineitem and orders tables as large
@@ -17,9 +18,9 @@ WHERE
and l_discount between 0.06 - 0.01 and 0.06 + 0.01
and l_quantity < 24;
LOG: join order: [ "lineitem" ]
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+ QUERY PLAN
+------------------------------------------------------------
+ explain statements for distributed queries are not enabled
(1 row)
-- Query #3 from the TPC-H decision support benchmark
@@ -46,9 +47,9 @@ ORDER BY
revenue DESC,
o_orderdate;
LOG: join order: [ "orders" ][ broadcast join "customer" ][ local partition join "lineitem" ]
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+ QUERY PLAN
+------------------------------------------------------------
+ explain statements for distributed queries are not enabled
(1 row)
-- Query #10 from the TPC-H decision support benchmark
@@ -84,9 +85,9 @@ GROUP BY
ORDER BY
revenue DESC;
LOG: join order: [ "orders" ][ broadcast join "customer" ][ broadcast join "nation" ][ local partition join "lineitem" ]
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+ QUERY PLAN
+------------------------------------------------------------
+ explain statements for distributed queries are not enabled
(1 row)
-- Query #19 from the TPC-H decision support benchmark (modified)
@@ -120,9 +121,9 @@ WHERE
AND l_shipinstruct = 'DELIVER IN PERSON'
);
LOG: join order: [ "lineitem" ][ broadcast join "part" ]
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+ QUERY PLAN
+------------------------------------------------------------
+ explain statements for distributed queries are not enabled
(1 row)
-- Reset client logging level to its previous value
diff --git a/src/test/regress/expected/multi_join_pruning.out b/src/test/regress/expected/multi_join_pruning.out
index 6299768f3..2b2968321 100644
--- a/src/test/regress/expected/multi_join_pruning.out
+++ b/src/test/regress/expected/multi_join_pruning.out
@@ -4,6 +4,7 @@
-- Check that join-pruning works for joins between two large relations. For now
-- we only check for join-pruning between locally partitioned relations. In the
-- future we want to check for pruning between re-partitioned relations as well.
+SET citus.explain_distributed_queries TO off;
SET client_min_messages TO DEBUG2;
-- Change configuration to treat all tables as large
SET citus.large_table_shard_count TO 2;
@@ -74,9 +75,9 @@ EXPLAIN SELECT count(*)
WHERE table1.array_column = table2.array_column;
DEBUG: join prunable for intervals [{},{AZZXSP27F21T6,AZZXSP27F21T6}] and [{BA1000U2AMO4ZGX,BZZXSP27F21T6},{CA1000U2AMO4ZGX,CZZXSP27F21T6}]
DEBUG: join prunable for intervals [{BA1000U2AMO4ZGX,BZZXSP27F21T6},{CA1000U2AMO4ZGX,CZZXSP27F21T6}] and [{},{AZZXSP27F21T6,AZZXSP27F21T6}]
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+ QUERY PLAN
+------------------------------------------------------------
+ explain statements for distributed queries are not enabled
(1 row)
EXPLAIN SELECT count(*)
@@ -84,9 +85,9 @@ EXPLAIN SELECT count(*)
WHERE table1.composite_column = table2.composite_column;
DEBUG: join prunable for intervals [(a,3,b),(b,4,c)] and [(c,5,d),(d,6,e)]
DEBUG: join prunable for intervals [(c,5,d),(d,6,e)] and [(a,3,b),(b,4,c)]
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+ QUERY PLAN
+------------------------------------------------------------
+ explain statements for distributed queries are not enabled
(1 row)
-- Test that large table joins on partition varchar columns work
@@ -95,9 +96,9 @@ EXPLAIN SELECT count(*)
WHERE table1.varchar_column = table2.varchar_column;
DEBUG: join prunable for intervals [AA1000U2AMO4ZGX,AZZXSP27F21T6] and [BA1000U2AMO4ZGX,BZZXSP27F21T6]
DEBUG: join prunable for intervals [BA1000U2AMO4ZGX,BZZXSP27F21T6] and [AA1000U2AMO4ZGX,AZZXSP27F21T6]
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+ QUERY PLAN
+------------------------------------------------------------
+ explain statements for distributed queries are not enabled
(1 row)
SET client_min_messages TO NOTICE;
diff --git a/src/test/regress/expected/multi_large_table_join_planning.out b/src/test/regress/expected/multi_large_table_join_planning.out
index 4573c457f..413ac9ca4 100644
--- a/src/test/regress/expected/multi_large_table_join_planning.out
+++ b/src/test/regress/expected/multi_large_table_join_planning.out
@@ -74,7 +74,7 @@ DEBUG: join prunable for intervals [6001,7000] and [1,1000]
DEBUG: generated sql query for job 1251 and task 3
DETAIL: query string: "SELECT "pg_merge_job_1250.task_000019".intermediate_column_1250_0, "pg_merge_job_1250.task_000019".intermediate_column_1250_1, "pg_merge_job_1250.task_000019".intermediate_column_1250_2, "pg_merge_job_1250.task_000019".intermediate_column_1250_3, "pg_merge_job_1250.task_000019".intermediate_column_1250_4 FROM (pg_merge_job_1250.task_000019 "pg_merge_job_1250.task_000019" JOIN part_102019 part ON (("pg_merge_job_1250.task_000019".intermediate_column_1250_0 = part.p_partkey))) WHERE (part.p_size > 8)"
DEBUG: generated sql query for job 1251 and task 6
-DETAIL: query string: "SELECT "pg_merge_job_1250.task_000026".intermediate_column_1250_0, "pg_merge_job_1250.task_000026".intermediate_column_1250_1, "pg_merge_job_1250.task_000026".intermediate_column_1250_2, "pg_merge_job_1250.task_000026".intermediate_column_1250_3, "pg_merge_job_1250.task_000026".intermediate_column_1250_4 FROM (pg_merge_job_1250.task_000026 "pg_merge_job_1250.task_000026" JOIN part_102035 part ON (("pg_merge_job_1250.task_000026".intermediate_column_1250_0 = part.p_partkey))) WHERE (part.p_size > 8)"
+DETAIL: query string: "SELECT "pg_merge_job_1250.task_000026".intermediate_column_1250_0, "pg_merge_job_1250.task_000026".intermediate_column_1250_1, "pg_merge_job_1250.task_000026".intermediate_column_1250_2, "pg_merge_job_1250.task_000026".intermediate_column_1250_3, "pg_merge_job_1250.task_000026".intermediate_column_1250_4 FROM (pg_merge_job_1250.task_000026 "pg_merge_job_1250.task_000026" JOIN part_102039 part ON (("pg_merge_job_1250.task_000026".intermediate_column_1250_0 = part.p_partkey))) WHERE (part.p_size > 8)"
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 19
DEBUG: pruning merge fetch taskId 4
@@ -90,9 +90,9 @@ DEBUG: join prunable for intervals [6001,7000] and [1001,2000]
DEBUG: generated sql query for job 1252 and task 3
DETAIL: query string: "SELECT "pg_merge_job_1251.task_000007".intermediate_column_1251_0 AS l_partkey, "pg_merge_job_1251.task_000007".intermediate_column_1251_1 AS o_orderkey, count(*) AS count FROM (pg_merge_job_1251.task_000007 "pg_merge_job_1251.task_000007" JOIN customer_102017 customer ON ((customer.c_custkey = "pg_merge_job_1251.task_000007".intermediate_column_1251_4))) WHERE ((("pg_merge_job_1251.task_000007".intermediate_column_1251_2 > 5.0) OR ("pg_merge_job_1251.task_000007".intermediate_column_1251_3 > 1200.0)) AND (customer.c_acctbal < 5000.0)) GROUP BY "pg_merge_job_1251.task_000007".intermediate_column_1251_0, "pg_merge_job_1251.task_000007".intermediate_column_1251_1 ORDER BY "pg_merge_job_1251.task_000007".intermediate_column_1251_0, "pg_merge_job_1251.task_000007".intermediate_column_1251_1, "pg_merge_job_1251.task_000007".intermediate_column_1251_0, "pg_merge_job_1251.task_000007".intermediate_column_1251_1 LIMIT '30'::bigint"
DEBUG: generated sql query for job 1252 and task 6
-DETAIL: query string: "SELECT "pg_merge_job_1251.task_000010".intermediate_column_1251_0 AS l_partkey, "pg_merge_job_1251.task_000010".intermediate_column_1251_1 AS o_orderkey, count(*) AS count FROM (pg_merge_job_1251.task_000010 "pg_merge_job_1251.task_000010" JOIN customer_102034 customer ON ((customer.c_custkey = "pg_merge_job_1251.task_000010".intermediate_column_1251_4))) WHERE ((("pg_merge_job_1251.task_000010".intermediate_column_1251_2 > 5.0) OR ("pg_merge_job_1251.task_000010".intermediate_column_1251_3 > 1200.0)) AND (customer.c_acctbal < 5000.0)) GROUP BY "pg_merge_job_1251.task_000010".intermediate_column_1251_0, "pg_merge_job_1251.task_000010".intermediate_column_1251_1 ORDER BY "pg_merge_job_1251.task_000010".intermediate_column_1251_0, "pg_merge_job_1251.task_000010".intermediate_column_1251_1, "pg_merge_job_1251.task_000010".intermediate_column_1251_0, "pg_merge_job_1251.task_000010".intermediate_column_1251_1 LIMIT '30'::bigint"
+DETAIL: query string: "SELECT "pg_merge_job_1251.task_000010".intermediate_column_1251_0 AS l_partkey, "pg_merge_job_1251.task_000010".intermediate_column_1251_1 AS o_orderkey, count(*) AS count FROM (pg_merge_job_1251.task_000010 "pg_merge_job_1251.task_000010" JOIN customer_102038 customer ON ((customer.c_custkey = "pg_merge_job_1251.task_000010".intermediate_column_1251_4))) WHERE ((("pg_merge_job_1251.task_000010".intermediate_column_1251_2 > 5.0) OR ("pg_merge_job_1251.task_000010".intermediate_column_1251_3 > 1200.0)) AND (customer.c_acctbal < 5000.0)) GROUP BY "pg_merge_job_1251.task_000010".intermediate_column_1251_0, "pg_merge_job_1251.task_000010".intermediate_column_1251_1 ORDER BY "pg_merge_job_1251.task_000010".intermediate_column_1251_0, "pg_merge_job_1251.task_000010".intermediate_column_1251_1, "pg_merge_job_1251.task_000010".intermediate_column_1251_0, "pg_merge_job_1251.task_000010".intermediate_column_1251_1 LIMIT '30'::bigint"
DEBUG: generated sql query for job 1252 and task 9
-DETAIL: query string: "SELECT "pg_merge_job_1251.task_000013".intermediate_column_1251_0 AS l_partkey, "pg_merge_job_1251.task_000013".intermediate_column_1251_1 AS o_orderkey, count(*) AS count FROM (pg_merge_job_1251.task_000013 "pg_merge_job_1251.task_000013" JOIN customer_102033 customer ON ((customer.c_custkey = "pg_merge_job_1251.task_000013".intermediate_column_1251_4))) WHERE ((("pg_merge_job_1251.task_000013".intermediate_column_1251_2 > 5.0) OR ("pg_merge_job_1251.task_000013".intermediate_column_1251_3 > 1200.0)) AND (customer.c_acctbal < 5000.0)) GROUP BY "pg_merge_job_1251.task_000013".intermediate_column_1251_0, "pg_merge_job_1251.task_000013".intermediate_column_1251_1 ORDER BY "pg_merge_job_1251.task_000013".intermediate_column_1251_0, "pg_merge_job_1251.task_000013".intermediate_column_1251_1, "pg_merge_job_1251.task_000013".intermediate_column_1251_0, "pg_merge_job_1251.task_000013".intermediate_column_1251_1 LIMIT '30'::bigint"
+DETAIL: query string: "SELECT "pg_merge_job_1251.task_000013".intermediate_column_1251_0 AS l_partkey, "pg_merge_job_1251.task_000013".intermediate_column_1251_1 AS o_orderkey, count(*) AS count FROM (pg_merge_job_1251.task_000013 "pg_merge_job_1251.task_000013" JOIN customer_102037 customer ON ((customer.c_custkey = "pg_merge_job_1251.task_000013".intermediate_column_1251_4))) WHERE ((("pg_merge_job_1251.task_000013".intermediate_column_1251_2 > 5.0) OR ("pg_merge_job_1251.task_000013".intermediate_column_1251_3 > 1200.0)) AND (customer.c_acctbal < 5000.0)) GROUP BY "pg_merge_job_1251.task_000013".intermediate_column_1251_0, "pg_merge_job_1251.task_000013".intermediate_column_1251_1 ORDER BY "pg_merge_job_1251.task_000013".intermediate_column_1251_0, "pg_merge_job_1251.task_000013".intermediate_column_1251_1, "pg_merge_job_1251.task_000013".intermediate_column_1251_0, "pg_merge_job_1251.task_000013".intermediate_column_1251_1 LIMIT '30'::bigint"
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 7
DEBUG: pruning merge fetch taskId 4
diff --git a/src/test/regress/expected/multi_large_table_join_planning_0.out b/src/test/regress/expected/multi_large_table_join_planning_0.out
index 9296d52e1..f254c40ca 100644
--- a/src/test/regress/expected/multi_large_table_join_planning_0.out
+++ b/src/test/regress/expected/multi_large_table_join_planning_0.out
@@ -74,7 +74,7 @@ DEBUG: join prunable for intervals [6001,7000] and [1,1000]
DEBUG: generated sql query for job 1251 and task 3
DETAIL: query string: "SELECT "pg_merge_job_1250.task_000019".intermediate_column_1250_0, "pg_merge_job_1250.task_000019".intermediate_column_1250_1, "pg_merge_job_1250.task_000019".intermediate_column_1250_2, "pg_merge_job_1250.task_000019".intermediate_column_1250_3, "pg_merge_job_1250.task_000019".intermediate_column_1250_4 FROM (pg_merge_job_1250.task_000019 "pg_merge_job_1250.task_000019" JOIN part_102019 part ON (("pg_merge_job_1250.task_000019".intermediate_column_1250_0 = part.p_partkey))) WHERE (part.p_size > 8)"
DEBUG: generated sql query for job 1251 and task 6
-DETAIL: query string: "SELECT "pg_merge_job_1250.task_000026".intermediate_column_1250_0, "pg_merge_job_1250.task_000026".intermediate_column_1250_1, "pg_merge_job_1250.task_000026".intermediate_column_1250_2, "pg_merge_job_1250.task_000026".intermediate_column_1250_3, "pg_merge_job_1250.task_000026".intermediate_column_1250_4 FROM (pg_merge_job_1250.task_000026 "pg_merge_job_1250.task_000026" JOIN part_102035 part ON (("pg_merge_job_1250.task_000026".intermediate_column_1250_0 = part.p_partkey))) WHERE (part.p_size > 8)"
+DETAIL: query string: "SELECT "pg_merge_job_1250.task_000026".intermediate_column_1250_0, "pg_merge_job_1250.task_000026".intermediate_column_1250_1, "pg_merge_job_1250.task_000026".intermediate_column_1250_2, "pg_merge_job_1250.task_000026".intermediate_column_1250_3, "pg_merge_job_1250.task_000026".intermediate_column_1250_4 FROM (pg_merge_job_1250.task_000026 "pg_merge_job_1250.task_000026" JOIN part_102039 part ON (("pg_merge_job_1250.task_000026".intermediate_column_1250_0 = part.p_partkey))) WHERE (part.p_size > 8)"
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 19
DEBUG: pruning merge fetch taskId 4
@@ -90,9 +90,9 @@ DEBUG: join prunable for intervals [6001,7000] and [1001,2000]
DEBUG: generated sql query for job 1252 and task 3
DETAIL: query string: "SELECT "pg_merge_job_1251.task_000007".intermediate_column_1251_0 AS l_partkey, "pg_merge_job_1251.task_000007".intermediate_column_1251_1 AS o_orderkey, count(*) AS count FROM (pg_merge_job_1251.task_000007 "pg_merge_job_1251.task_000007" JOIN customer_102017 customer ON ((customer.c_custkey = "pg_merge_job_1251.task_000007".intermediate_column_1251_4))) WHERE ((("pg_merge_job_1251.task_000007".intermediate_column_1251_2 > 5.0) OR ("pg_merge_job_1251.task_000007".intermediate_column_1251_3 > 1200.0)) AND (customer.c_acctbal < 5000.0)) GROUP BY "pg_merge_job_1251.task_000007".intermediate_column_1251_0, "pg_merge_job_1251.task_000007".intermediate_column_1251_1 ORDER BY "pg_merge_job_1251.task_000007".intermediate_column_1251_0, "pg_merge_job_1251.task_000007".intermediate_column_1251_1, "pg_merge_job_1251.task_000007".intermediate_column_1251_0, "pg_merge_job_1251.task_000007".intermediate_column_1251_1 LIMIT 30::bigint"
DEBUG: generated sql query for job 1252 and task 6
-DETAIL: query string: "SELECT "pg_merge_job_1251.task_000010".intermediate_column_1251_0 AS l_partkey, "pg_merge_job_1251.task_000010".intermediate_column_1251_1 AS o_orderkey, count(*) AS count FROM (pg_merge_job_1251.task_000010 "pg_merge_job_1251.task_000010" JOIN customer_102034 customer ON ((customer.c_custkey = "pg_merge_job_1251.task_000010".intermediate_column_1251_4))) WHERE ((("pg_merge_job_1251.task_000010".intermediate_column_1251_2 > 5.0) OR ("pg_merge_job_1251.task_000010".intermediate_column_1251_3 > 1200.0)) AND (customer.c_acctbal < 5000.0)) GROUP BY "pg_merge_job_1251.task_000010".intermediate_column_1251_0, "pg_merge_job_1251.task_000010".intermediate_column_1251_1 ORDER BY "pg_merge_job_1251.task_000010".intermediate_column_1251_0, "pg_merge_job_1251.task_000010".intermediate_column_1251_1, "pg_merge_job_1251.task_000010".intermediate_column_1251_0, "pg_merge_job_1251.task_000010".intermediate_column_1251_1 LIMIT 30::bigint"
+DETAIL: query string: "SELECT "pg_merge_job_1251.task_000010".intermediate_column_1251_0 AS l_partkey, "pg_merge_job_1251.task_000010".intermediate_column_1251_1 AS o_orderkey, count(*) AS count FROM (pg_merge_job_1251.task_000010 "pg_merge_job_1251.task_000010" JOIN customer_102038 customer ON ((customer.c_custkey = "pg_merge_job_1251.task_000010".intermediate_column_1251_4))) WHERE ((("pg_merge_job_1251.task_000010".intermediate_column_1251_2 > 5.0) OR ("pg_merge_job_1251.task_000010".intermediate_column_1251_3 > 1200.0)) AND (customer.c_acctbal < 5000.0)) GROUP BY "pg_merge_job_1251.task_000010".intermediate_column_1251_0, "pg_merge_job_1251.task_000010".intermediate_column_1251_1 ORDER BY "pg_merge_job_1251.task_000010".intermediate_column_1251_0, "pg_merge_job_1251.task_000010".intermediate_column_1251_1, "pg_merge_job_1251.task_000010".intermediate_column_1251_0, "pg_merge_job_1251.task_000010".intermediate_column_1251_1 LIMIT 30::bigint"
DEBUG: generated sql query for job 1252 and task 9
-DETAIL: query string: "SELECT "pg_merge_job_1251.task_000013".intermediate_column_1251_0 AS l_partkey, "pg_merge_job_1251.task_000013".intermediate_column_1251_1 AS o_orderkey, count(*) AS count FROM (pg_merge_job_1251.task_000013 "pg_merge_job_1251.task_000013" JOIN customer_102033 customer ON ((customer.c_custkey = "pg_merge_job_1251.task_000013".intermediate_column_1251_4))) WHERE ((("pg_merge_job_1251.task_000013".intermediate_column_1251_2 > 5.0) OR ("pg_merge_job_1251.task_000013".intermediate_column_1251_3 > 1200.0)) AND (customer.c_acctbal < 5000.0)) GROUP BY "pg_merge_job_1251.task_000013".intermediate_column_1251_0, "pg_merge_job_1251.task_000013".intermediate_column_1251_1 ORDER BY "pg_merge_job_1251.task_000013".intermediate_column_1251_0, "pg_merge_job_1251.task_000013".intermediate_column_1251_1, "pg_merge_job_1251.task_000013".intermediate_column_1251_0, "pg_merge_job_1251.task_000013".intermediate_column_1251_1 LIMIT 30::bigint"
+DETAIL: query string: "SELECT "pg_merge_job_1251.task_000013".intermediate_column_1251_0 AS l_partkey, "pg_merge_job_1251.task_000013".intermediate_column_1251_1 AS o_orderkey, count(*) AS count FROM (pg_merge_job_1251.task_000013 "pg_merge_job_1251.task_000013" JOIN customer_102037 customer ON ((customer.c_custkey = "pg_merge_job_1251.task_000013".intermediate_column_1251_4))) WHERE ((("pg_merge_job_1251.task_000013".intermediate_column_1251_2 > 5.0) OR ("pg_merge_job_1251.task_000013".intermediate_column_1251_3 > 1200.0)) AND (customer.c_acctbal < 5000.0)) GROUP BY "pg_merge_job_1251.task_000013".intermediate_column_1251_0, "pg_merge_job_1251.task_000013".intermediate_column_1251_1 ORDER BY "pg_merge_job_1251.task_000013".intermediate_column_1251_0, "pg_merge_job_1251.task_000013".intermediate_column_1251_1, "pg_merge_job_1251.task_000013".intermediate_column_1251_0, "pg_merge_job_1251.task_000013".intermediate_column_1251_1 LIMIT 30::bigint"
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 7
DEBUG: pruning merge fetch taskId 4
diff --git a/src/test/regress/expected/multi_large_table_pruning.out b/src/test/regress/expected/multi_large_table_pruning.out
index 27908a6ca..ad13f7109 100644
--- a/src/test/regress/expected/multi_large_table_pruning.out
+++ b/src/test/regress/expected/multi_large_table_pruning.out
@@ -57,8 +57,8 @@ WHERE
o_custkey = c_custkey AND
c_custkey < 0;
DEBUG: predicate pruning for shardId 102017
-DEBUG: predicate pruning for shardId 102034
-DEBUG: predicate pruning for shardId 102033
+DEBUG: predicate pruning for shardId 102038
+DEBUG: predicate pruning for shardId 102037
count
-------
diff --git a/src/test/regress/expected/multi_partition_pruning.out b/src/test/regress/expected/multi_partition_pruning.out
index 189b10594..0a3cade99 100644
--- a/src/test/regress/expected/multi_partition_pruning.out
+++ b/src/test/regress/expected/multi_partition_pruning.out
@@ -3,6 +3,7 @@
--
-- Tests to verify that we correctly prune unreferenced shards. For this, we
-- need to increase the logging verbosity of messages displayed on the client.
+SET citus.explain_distributed_queries TO off;
SET client_min_messages TO DEBUG2;
-- Adding additional l_orderkey = 1 to make this query not router executable
SELECT l_orderkey, l_linenumber, l_shipdate FROM lineitem WHERE l_orderkey = 9030 or l_orderkey = 1;
@@ -162,25 +163,25 @@ INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename,
-- one shard.
EXPLAIN SELECT count(*) FROM varchar_partitioned_table WHERE varchar_column = 'BA2';
DEBUG: predicate pruning for shardId 100
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+ QUERY PLAN
+------------------------------------------------------------
+ explain statements for distributed queries are not enabled
(1 row)
EXPLAIN SELECT count(*) FROM array_partitioned_table
WHERE array_column > '{BA1000U2AMO4ZGX, BZZXSP27F21T6}';
DEBUG: predicate pruning for shardId 102
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+ QUERY PLAN
+------------------------------------------------------------
+ explain statements for distributed queries are not enabled
(1 row)
EXPLAIN SELECT count(*) FROM composite_partitioned_table
WHERE composite_column < '(b,5,c)'::composite_type;
DEBUG: predicate pruning for shardId 105
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+ QUERY PLAN
+------------------------------------------------------------
+ explain statements for distributed queries are not enabled
(1 row)
SET client_min_messages TO NOTICE;
diff --git a/src/test/regress/expected/multi_task_assignment_policy.out b/src/test/regress/expected/multi_task_assignment_policy.out
index 381d57b50..9b6cce828 100644
--- a/src/test/regress/expected/multi_task_assignment_policy.out
+++ b/src/test/regress/expected/multi_task_assignment_policy.out
@@ -1,6 +1,7 @@
--
-- MULTI_TASK_ASSIGNMENT
--
+SET citus.explain_distributed_queries TO off;
-- Check that our policies for assigning tasks to worker nodes run as expected.
-- To test this, we first create a shell table, and then manually insert shard
-- and shard placement data into system catalogs. We next run Explain command,
@@ -57,9 +58,9 @@ DEBUG: assigned task 6 to node localhost:57637
DEBUG: assigned task 2 to node localhost:57638
DEBUG: assigned task 4 to node localhost:57637
DEBUG: CommitTransactionCommand
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+ QUERY PLAN
+------------------------------------------------------------
+ explain statements for distributed queries are not enabled
(1 row)
EXPLAIN SELECT count(*) FROM task_assignment_test_table;
@@ -69,9 +70,9 @@ DEBUG: assigned task 6 to node localhost:57637
DEBUG: assigned task 2 to node localhost:57638
DEBUG: assigned task 4 to node localhost:57637
DEBUG: CommitTransactionCommand
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+ QUERY PLAN
+------------------------------------------------------------
+ explain statements for distributed queries are not enabled
(1 row)
-- Next test the first-replica task assignment policy
@@ -86,9 +87,9 @@ DEBUG: assigned task 6 to node localhost:57637
DEBUG: assigned task 4 to node localhost:57637
DEBUG: assigned task 2 to node localhost:57638
DEBUG: CommitTransactionCommand
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+ QUERY PLAN
+------------------------------------------------------------
+ explain statements for distributed queries are not enabled
(1 row)
EXPLAIN SELECT count(*) FROM task_assignment_test_table;
@@ -98,9 +99,9 @@ DEBUG: assigned task 6 to node localhost:57637
DEBUG: assigned task 4 to node localhost:57637
DEBUG: assigned task 2 to node localhost:57638
DEBUG: CommitTransactionCommand
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+ QUERY PLAN
+------------------------------------------------------------
+ explain statements for distributed queries are not enabled
(1 row)
-- Round-robin task assignment relies on the current jobId. We therefore need to
@@ -129,9 +130,9 @@ DEBUG: assigned task 6 to node localhost:57637
DEBUG: assigned task 4 to node localhost:57637
DEBUG: assigned task 2 to node localhost:57638
DEBUG: CommitTransactionCommand
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+ QUERY PLAN
+------------------------------------------------------------
+ explain statements for distributed queries are not enabled
(1 row)
EXPLAIN SELECT count(*) FROM task_assignment_test_table;
@@ -141,9 +142,9 @@ DEBUG: assigned task 6 to node localhost:57638
DEBUG: assigned task 4 to node localhost:57638
DEBUG: assigned task 2 to node localhost:57637
DEBUG: CommitTransactionCommand
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+ QUERY PLAN
+------------------------------------------------------------
+ explain statements for distributed queries are not enabled
(1 row)
EXPLAIN SELECT count(*) FROM task_assignment_test_table;
@@ -153,9 +154,9 @@ DEBUG: assigned task 6 to node localhost:57637
DEBUG: assigned task 4 to node localhost:57637
DEBUG: assigned task 2 to node localhost:57638
DEBUG: CommitTransactionCommand
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
+ QUERY PLAN
+------------------------------------------------------------
+ explain statements for distributed queries are not enabled
(1 row)
RESET citus.task_assignment_policy;
diff --git a/src/test/regress/expected/multi_utilities.out b/src/test/regress/expected/multi_utilities.out
index 6aa4fa18e..6279c4806 100644
--- a/src/test/regress/expected/multi_utilities.out
+++ b/src/test/regress/expected/multi_utilities.out
@@ -21,17 +21,6 @@ COPY (SELECT COUNT(*) FROM sharded_table) TO STDOUT;
-- cursors may not involve distributed tables
DECLARE all_sharded_rows CURSOR FOR SELECT * FROM sharded_table;
ERROR: DECLARE CURSOR can only be used in transaction blocks
--- EXPLAIN support isn't implemented for distributed queries...
-EXPLAIN SELECT * FROM sharded_table;
- QUERY PLAN
-----------------------------------------------------------------------
- explain statements for distributed queries are currently unsupported
-(1 row)
-
--- ... or for distributed modifications
-EXPLAIN INSERT INTO sharded_table VALUES ('dan', 4);
-ERROR: cannot show execution plan for distributed modification
-DETAIL: EXPLAIN commands are unsupported for distributed modifications.
-- verify PREPARE functionality
PREPARE sharded_insert AS INSERT INTO sharded_table VALUES ('adam', 1);
PREPARE sharded_update AS UPDATE sharded_table SET name = 'bob' WHERE id = 1;
diff --git a/src/test/regress/input/multi_master_delete_protocol.source b/src/test/regress/input/multi_master_delete_protocol.source
index c98febba4..b9fca8695 100644
--- a/src/test/regress/input/multi_master_delete_protocol.source
+++ b/src/test/regress/input/multi_master_delete_protocol.source
@@ -42,7 +42,7 @@ SELECT master_apply_delete_command('DELETE FROM customer_delete_protocol');
SELECT count(*) FROM customer_delete_protocol;
-- Verify that empty shards are deleted if no condition is provided
-SELECT master_create_empty_shard('customer_delete_protocol');
+SELECT 1 AS one FROM master_create_empty_shard('customer_delete_protocol');
SELECT master_apply_delete_command('DELETE FROM customer_delete_protocol
WHERE c_custkey > 1000');
SELECT master_apply_delete_command('DELETE FROM customer_delete_protocol');
diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule
index 546d69b11..310836ba3 100644
--- a/src/test/regress/multi_schedule
+++ b/src/test/regress/multi_schedule
@@ -26,6 +26,7 @@ test: multi_stage_data
# Miscellaneous tests to check our query planning behavior
# ----------
test: multi_basic_queries multi_complex_expressions multi_verify_no_subquery
+test: multi_explain
test: multi_subquery
test: multi_single_relation_subquery
test: multi_agg_distinct multi_limit_clause multi_limit_clause_approximate
diff --git a/src/test/regress/output/multi_master_delete_protocol.source b/src/test/regress/output/multi_master_delete_protocol.source
index 84fd2579c..3ec22f2c2 100644
--- a/src/test/regress/output/multi_master_delete_protocol.source
+++ b/src/test/regress/output/multi_master_delete_protocol.source
@@ -67,10 +67,10 @@ SELECT count(*) FROM customer_delete_protocol;
(1 row)
-- Verify that empty shards are deleted if no condition is provided
-SELECT master_create_empty_shard('customer_delete_protocol');
- master_create_empty_shard
----------------------------
- 102041
+SELECT 1 AS one FROM master_create_empty_shard('customer_delete_protocol');
+ one
+-----
+ 1
(1 row)
SELECT master_apply_delete_command('DELETE FROM customer_delete_protocol
diff --git a/src/test/regress/sql/multi_copy.sql b/src/test/regress/sql/multi_copy.sql
new file mode 100644
index 000000000..baf9c37eb
--- /dev/null
+++ b/src/test/regress/sql/multi_copy.sql
@@ -0,0 +1,256 @@
+--
+-- MULTI_COPY
+--
+
+-- Create a new hash-partitioned table into which to COPY
+CREATE TABLE customer_copy_hash (
+ c_custkey integer,
+ c_name varchar(25) not null,
+ c_address varchar(40),
+ c_nationkey integer,
+ c_phone char(15),
+ c_acctbal decimal(15,2),
+ c_mktsegment char(10),
+ c_comment varchar(117),
+ primary key (c_custkey));
+SELECT master_create_distributed_table('customer_copy_hash', 'c_custkey', 'hash');
+
+-- Test COPY into empty hash-partitioned table
+COPY customer_copy_hash FROM '/home/marco/citus/citus-explain4/src/test/regress/data/customer.1.data' WITH (DELIMITER '|');
+
+SELECT master_create_worker_shards('customer_copy_hash', 64, 1);
+
+-- Test empty copy
+COPY customer_copy_hash FROM STDIN;
+\.
+
+-- Test syntax error
+COPY customer_copy_hash (c_custkey,c_name) FROM STDIN;
+1,customer1
+2,customer2,
+notinteger,customernot
+\.
+
+-- Confirm that no data was copied
+SELECT count(*) FROM customer_copy_hash;
+
+-- Test primary key violation
+COPY customer_copy_hash (c_custkey, c_name) FROM STDIN
+WITH (FORMAT 'csv');
+1,customer1
+2,customer2
+2,customer2
+\.
+
+-- Confirm that no data was copied
+SELECT count(*) FROM customer_copy_hash;
+
+-- Test headers option
+COPY customer_copy_hash (c_custkey, c_name) FROM STDIN
+WITH (FORMAT 'csv', HEADER true, FORCE_NULL (c_custkey));
+# header
+1,customer1
+2,customer2
+3,customer3
+\.
+
+-- Confirm that only first row was skipped
+SELECT count(*) FROM customer_copy_hash;
+
+-- Test force_not_null option
+COPY customer_copy_hash (c_custkey, c_name, c_address) FROM STDIN
+WITH (FORMAT 'csv', QUOTE '"', FORCE_NOT_NULL (c_address));
+"4","customer4",""
+\.
+
+-- Confirm that value is not null
+SELECT count(c_address) FROM customer_copy_hash WHERE c_custkey = 4;
+
+-- Test force_null option
+COPY customer_copy_hash (c_custkey, c_name, c_address) FROM STDIN
+WITH (FORMAT 'csv', QUOTE '"', FORCE_NULL (c_address));
+"5","customer5",""
+\.
+
+-- Confirm that value is null
+SELECT count(c_address) FROM customer_copy_hash WHERE c_custkey = 5;
+
+-- Test null violation
+COPY customer_copy_hash (c_custkey, c_name) FROM STDIN
+WITH (FORMAT 'csv');
+6,customer6
+7,customer7
+8,
+\.
+
+-- Confirm that no data was copied
+SELECT count(*) FROM customer_copy_hash;
+
+-- Test server-side copy from program
+COPY customer_copy_hash (c_custkey, c_name) FROM PROGRAM 'echo 9 customer9'
+WITH (DELIMITER ' ');
+
+-- Confirm that data was copied
+SELECT count(*) FROM customer_copy_hash WHERE c_custkey = 9;
+
+-- Test server-side copy from file
+COPY customer_copy_hash FROM '/home/marco/citus/citus-explain4/src/test/regress/data/customer.2.data' WITH (DELIMITER '|');
+
+-- Confirm that data was copied
+SELECT count(*) FROM customer_copy_hash;
+
+-- Test client-side copy from file
+\COPY customer_copy_hash FROM '/home/marco/citus/citus-explain4/src/test/regress/data/customer.3.data' WITH (DELIMITER '|');
+
+-- Confirm that data was copied
+SELECT count(*) FROM customer_copy_hash;
+
+-- Create a new hash-partitioned table with default now() function
+CREATE TABLE customer_with_default(
+ c_custkey integer,
+ c_name varchar(25) not null,
+ c_time timestamp default now());
+
+SELECT master_create_distributed_table('customer_with_default', 'c_custkey', 'hash');
+
+SELECT master_create_worker_shards('customer_with_default', 64, 1);
+
+-- Test with default values for now() function
+COPY customer_with_default (c_custkey, c_name) FROM STDIN
+WITH (FORMAT 'csv');
+1,customer1
+2,customer2
+\.
+
+-- Confirm that data was copied with now() function
+SELECT count(*) FROM customer_with_default where c_time IS NOT NULL;
+
+-- Add columns to the table and perform a COPY
+ALTER TABLE customer_copy_hash ADD COLUMN extra1 INT DEFAULT 0;
+ALTER TABLE customer_copy_hash ADD COLUMN extra2 INT DEFAULT 0;
+
+COPY customer_copy_hash (c_custkey, c_name, extra1, extra2) FROM STDIN CSV;
+10,customer10,1,5
+\.
+
+SELECT * FROM customer_copy_hash WHERE extra1 = 1;
+
+-- Test dropping an intermediate column
+ALTER TABLE customer_copy_hash DROP COLUMN extra1;
+
+COPY customer_copy_hash (c_custkey, c_name, extra2) FROM STDIN CSV;
+11,customer11,5
+\.
+
+SELECT * FROM customer_copy_hash WHERE c_custkey = 11;
+
+-- Test dropping the last column
+ALTER TABLE customer_copy_hash DROP COLUMN extra2;
+
+COPY customer_copy_hash (c_custkey, c_name) FROM STDIN CSV;
+12,customer12
+\.
+
+SELECT * FROM customer_copy_hash WHERE c_custkey = 12;
+
+-- Create a new range-partitioned table into which to COPY
+CREATE TABLE customer_copy_range (
+ c_custkey integer,
+ c_name varchar(25),
+ c_address varchar(40),
+ c_nationkey integer,
+ c_phone char(15),
+ c_acctbal decimal(15,2),
+ c_mktsegment char(10),
+ c_comment varchar(117),
+ primary key (c_custkey));
+
+SELECT master_create_distributed_table('customer_copy_range', 'c_custkey', 'range');
+
+-- Test COPY into empty range-partitioned table
+COPY customer_copy_range FROM '/home/marco/citus/citus-explain4/src/test/regress/data/customer.1.data' WITH (DELIMITER '|');
+
+SELECT master_create_empty_shard('customer_copy_range') AS new_shard_id
+\gset
+UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 500
+WHERE shardid = :new_shard_id;
+
+SELECT master_create_empty_shard('customer_copy_range') AS new_shard_id
+\gset
+UPDATE pg_dist_shard SET shardminvalue = 501, shardmaxvalue = 1000
+WHERE shardid = :new_shard_id;
+
+-- Test copy into range-partitioned table
+COPY customer_copy_range FROM '/home/marco/citus/citus-explain4/src/test/regress/data/customer.1.data' WITH (DELIMITER '|');
+
+-- Check whether data went into the right shard (maybe)
+SELECT min(c_custkey), max(c_custkey), avg(c_custkey), count(*)
+FROM customer_copy_range WHERE c_custkey <= 500;
+
+-- Check whether data was copied
+SELECT count(*) FROM customer_copy_range;
+
+-- Create a new append-partitioned table into which to COPY
+CREATE TABLE customer_copy_append (
+ c_custkey integer,
+ c_name varchar(25) not null,
+ c_address varchar(40),
+ c_nationkey integer,
+ c_phone char(15),
+ c_acctbal decimal(15,2),
+ c_mktsegment char(10),
+ c_comment varchar(117));
+SELECT master_create_distributed_table('customer_copy_append', 'c_custkey', 'append');
+
+-- Test syntax error
+COPY customer_copy_append(c_custkey, c_name) FROM STDIN WITH (FORMAT 'csv');
+1,customer1
+2,customer2
+notinteger,customernot
+\.
+
+-- Test that no shard is created for failing copy
+SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'customer_copy_append'::regclass;
+
+-- Test empty copy
+COPY customer_copy_append FROM STDIN;
+\.
+
+-- Test that no shard is created for copying zero rows
+SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'customer_copy_append'::regclass;
+
+-- Test proper copy
+COPY customer_copy_append(c_custkey, c_name) FROM STDIN WITH (FORMAT 'csv');
+1,customer1
+2,customer2
+\.
+
+-- Check whether data was copied properly
+SELECT * FROM customer_copy_append;
+
+-- Create lineitem table
+CREATE TABLE lineitem_copy_append (
+ l_orderkey bigint not null,
+ l_partkey integer not null,
+ l_suppkey integer not null,
+ l_linenumber integer not null,
+ l_quantity decimal(15, 2) not null,
+ l_extendedprice decimal(15, 2) not null,
+ l_discount decimal(15, 2) not null,
+ l_tax decimal(15, 2) not null,
+ l_returnflag char(1) not null,
+ l_linestatus char(1) not null,
+ l_shipdate date not null,
+ l_commitdate date not null,
+ l_receiptdate date not null,
+ l_shipinstruct char(25) not null,
+ l_shipmode char(10) not null,
+ l_comment varchar(44) not null);
+SELECT master_create_distributed_table('lineitem_copy_append', 'l_orderkey', 'append');
+
+-- Test multiple shard creation
+SET citus.shard_max_size TO '256kB';
+
+COPY lineitem_copy_append FROM '/home/marco/citus/citus-explain4/src/test/regress/data/lineitem.1.data' with delimiter '|';
+
+SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'lineitem_copy_append'::regclass;
diff --git a/src/test/regress/sql/multi_explain.sql b/src/test/regress/sql/multi_explain.sql
new file mode 100644
index 000000000..479c4c82a
--- /dev/null
+++ b/src/test/regress/sql/multi_explain.sql
@@ -0,0 +1,88 @@
+--
+-- MULTI_EXPLAIN
+--
+
+\a\t
+
+SET citus.explain_distributed_queries TO on;
+
+-- Test Text format
+EXPLAIN (COSTS FALSE, FORMAT TEXT)
+ SELECT l_quantity, count(*) count_quantity FROM lineitem
+ GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
+
+-- Test JSON format
+EXPLAIN (COSTS FALSE, FORMAT JSON)
+ SELECT l_quantity, count(*) count_quantity FROM lineitem
+ GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
+
+-- Test XML format
+EXPLAIN (COSTS FALSE, FORMAT XML)
+ SELECT l_quantity, count(*) count_quantity FROM lineitem
+ GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
+
+-- Test YAML format
+EXPLAIN (COSTS FALSE, FORMAT YAML)
+ SELECT l_quantity, count(*) count_quantity FROM lineitem
+ GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
+
+-- Test Text format
+EXPLAIN (COSTS FALSE, FORMAT TEXT)
+ SELECT l_quantity, count(*) count_quantity FROM lineitem
+ GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
+
+-- Test verbose
+EXPLAIN (COSTS FALSE, VERBOSE TRUE)
+ SELECT sum(l_quantity) / avg(l_quantity) FROM lineitem;
+
+-- Test join
+EXPLAIN (COSTS FALSE)
+ SELECT * FROM lineitem
+ JOIN orders ON l_orderkey = o_orderkey AND l_quantity < 5
+ ORDER BY l_quantity DESC LIMIT 10;
+
+-- Test insert
+EXPLAIN (COSTS FALSE)
+ INSERT INTO lineitem VALUES(1,0);
+
+-- Test update
+EXPLAIN (COSTS FALSE)
+ UPDATE lineitem
+ SET l_suppkey = 12
+ WHERE l_orderkey = 1 AND l_partkey = 0;
+
+-- Test delete
+EXPLAIN (COSTS FALSE)
+ DELETE FROM lineitem
+ WHERE l_orderkey = 1 AND l_partkey = 0;
+
+-- Test single-shard SELECT
+EXPLAIN
+ SELECT l_quantity FROM lineitem WHERE l_orderkey = 5;
+
+-- Test CREATE TABLE ... AS
+EXPLAIN CREATE TABLE explain_result AS
+ SELECT * FROM lineitem;
+
+-- Test all tasks output
+SET citus.explain_all_tasks TO on;
+
+EXPLAIN (COSTS FALSE)
+ SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
+
+-- Test track tracker
+SET citus.task_executor_type TO 'task-tracker';
+SET citus.explain_all_tasks TO off;
+
+EXPLAIN (COSTS FALSE)
+ SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
+
+-- Test re-partition join
+SET citus.large_table_shard_count TO 1;
+
+EXPLAIN (COSTS FALSE)
+ SELECT count(*)
+ FROM lineitem, orders, customer, supplier
+ WHERE l_orderkey = o_orderkey
+ AND o_custkey = c_custkey
+ AND l_suppkey = s_suppkey;
diff --git a/src/test/regress/sql/multi_hash_pruning.sql b/src/test/regress/sql/multi_hash_pruning.sql
index c70ede12b..9ad2f6038 100644
--- a/src/test/regress/sql/multi_hash_pruning.sql
+++ b/src/test/regress/sql/multi_hash_pruning.sql
@@ -19,111 +19,69 @@ CREATE TABLE orders_hash_partitioned (
o_clerk char(15),
o_shippriority integer,
o_comment varchar(79) );
-SELECT master_create_distributed_table('orders_hash_partitioned', 'o_orderkey', 'append');
-
-UPDATE pg_dist_partition SET partmethod = 'h'
- WHERE logicalrelid = 'orders_hash_partitioned'::regclass;
-
--- Create logical shards with shardids 110, 111, 112 and 113
-
-INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue)
- VALUES ('orders_hash_partitioned'::regclass, 110, 't', -1905060026, -1905060026),
- ('orders_hash_partitioned'::regclass, 111, 't', 1134484726, 1134484726),
- ('orders_hash_partitioned'::regclass, 112, 't', -1905060026, -28094569),
- ('orders_hash_partitioned'::regclass, 113, 't', -1011077333, 0);
-
--- Create shard placements for shards 110, 111, 112 and 113
-
-INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport)
- SELECT 110, 1, 1, nodename, nodeport
- FROM pg_dist_shard_placement
- GROUP BY nodename, nodeport
- ORDER BY nodename, nodeport ASC
- LIMIT 1;
-
-INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport)
- SELECT 111, 1, 1, nodename, nodeport
- FROM pg_dist_shard_placement
- GROUP BY nodename, nodeport
- ORDER BY nodename, nodeport ASC
- LIMIT 1;
-
-INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport)
- SELECT 112, 1, 1, nodename, nodeport
- FROM pg_dist_shard_placement
- GROUP BY nodename, nodeport
- ORDER BY nodename, nodeport ASC
- LIMIT 1;
-
-INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport)
- SELECT 113, 1, 1, nodename, nodeport
- FROM pg_dist_shard_placement
- GROUP BY nodename, nodeport
- ORDER BY nodename, nodeport ASC
- LIMIT 1;
+SELECT master_create_distributed_table('orders_hash_partitioned', 'o_orderkey', 'hash');
+SELECT master_create_worker_shards('orders_hash_partitioned', 4, 1);
SET client_min_messages TO DEBUG2;
-- Check that we can prune shards for simple cases, boolean expressions and
-- immutable functions.
-EXPLAIN SELECT count(*) FROM orders_hash_partitioned;
-EXPLAIN SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 1;
-EXPLAIN SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 2;
-EXPLAIN SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 3;
-EXPLAIN SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 4;
-EXPLAIN SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey is NULL;
-EXPLAIN SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey is not NULL;
-EXPLAIN SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey > 2;
+SELECT count(*) FROM orders_hash_partitioned;
+SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 1;
+SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 2;
+SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 3;
+SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 4;
+SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey is NULL;
+SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey is not NULL;
+SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey > 2;
-EXPLAIN SELECT count(*) FROM orders_hash_partitioned
+SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = 1 OR o_orderkey = 2;
-EXPLAIN SELECT count(*) FROM orders_hash_partitioned
+SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = 1 OR o_clerk = 'aaa';
-EXPLAIN SELECT count(*) FROM orders_hash_partitioned
+SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = 1 AND o_clerk = 'aaa';
-EXPLAIN SELECT count(*) FROM orders_hash_partitioned
+SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = 1 OR (o_orderkey = 3 AND o_clerk = 'aaa');
-EXPLAIN SELECT count(*) FROM orders_hash_partitioned
+SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = 1 OR o_orderkey is NULL;
-EXPLAIN SELECT count(*) FROM
+SELECT count(*) FROM
(SELECT o_orderkey FROM orders_hash_partitioned WHERE o_orderkey = 1) AS orderkeys;
-EXPLAIN SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = abs(-1);
+SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = abs(-1);
-- Check that we don't support pruning for ANY (array expression) and give
-- a notice message when used with the partition column
-EXPLAIN SELECT count(*) FROM orders_hash_partitioned
+SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = ANY ('{1,2,3}');
-- Check that we don't show the message if the operator is not
-- equality operator
-EXPLAIN SELECT count(*) FROM orders_hash_partitioned
+SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey < ALL ('{1,2,3}');
-- Check that we don't give a spurious hint message when non-partition
-- columns are used with ANY/IN/ALL
-EXPLAIN SELECT count(*) FROM orders_hash_partitioned
+SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = 1 OR o_totalprice IN (2, 5);
-- Check that we cannot prune for mutable functions.
-EXPLAIN SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = random();
-EXPLAIN SELECT count(*) FROM orders_hash_partitioned
+SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = random();
+SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = random() OR o_orderkey = 1;
-EXPLAIN SELECT count(*) FROM orders_hash_partitioned
+SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = random() AND o_orderkey = 1;
-- Check that we can do join pruning.
-EXPLAIN SELECT count(*)
+SELECT count(*)
FROM orders_hash_partitioned orders1, orders_hash_partitioned orders2
WHERE orders1.o_orderkey = orders2.o_orderkey;
-EXPLAIN SELECT count(*)
+SELECT count(*)
FROM orders_hash_partitioned orders1, orders_hash_partitioned orders2
WHERE orders1.o_orderkey = orders2.o_orderkey
AND orders1.o_orderkey = 1
AND orders2.o_orderkey is NULL;
-
-SET client_min_messages TO NOTICE;
diff --git a/src/test/regress/sql/multi_join_order_additional.sql b/src/test/regress/sql/multi_join_order_additional.sql
index c1668f061..902ed4739 100644
--- a/src/test/regress/sql/multi_join_order_additional.sql
+++ b/src/test/regress/sql/multi_join_order_additional.sql
@@ -4,6 +4,7 @@
-- Set configuration to print table join order and pruned shards
+SET citus.explain_distributed_queries TO off;
SET citus.log_multi_join_order TO TRUE;
SET client_min_messages TO DEBUG2;
diff --git a/src/test/regress/sql/multi_join_order_tpch_large.sql b/src/test/regress/sql/multi_join_order_tpch_large.sql
index a0d1e5a9f..ed8ca08bd 100644
--- a/src/test/regress/sql/multi_join_order_tpch_large.sql
+++ b/src/test/regress/sql/multi_join_order_tpch_large.sql
@@ -4,6 +4,7 @@
-- Enable configuration to print table join order
+SET citus.explain_distributed_queries TO off;
SET citus.log_multi_join_order TO TRUE;
SET client_min_messages TO LOG;
diff --git a/src/test/regress/sql/multi_join_order_tpch_small.sql b/src/test/regress/sql/multi_join_order_tpch_small.sql
index b4663cbe9..6b99ee7ea 100644
--- a/src/test/regress/sql/multi_join_order_tpch_small.sql
+++ b/src/test/regress/sql/multi_join_order_tpch_small.sql
@@ -4,6 +4,7 @@
-- Enable configuration to print table join order
+SET citus.explain_distributed_queries TO off;
SET citus.log_multi_join_order TO TRUE;
SET client_min_messages TO LOG;
diff --git a/src/test/regress/sql/multi_join_pruning.sql b/src/test/regress/sql/multi_join_pruning.sql
index f2794d8d9..f43632b5f 100644
--- a/src/test/regress/sql/multi_join_pruning.sql
+++ b/src/test/regress/sql/multi_join_pruning.sql
@@ -6,6 +6,7 @@
-- we only check for join-pruning between locally partitioned relations. In the
-- future we want to check for pruning between re-partitioned relations as well.
+SET citus.explain_distributed_queries TO off;
SET client_min_messages TO DEBUG2;
-- Change configuration to treat all tables as large
diff --git a/src/test/regress/sql/multi_partition_pruning.sql b/src/test/regress/sql/multi_partition_pruning.sql
index 3c99516ab..128ec266a 100644
--- a/src/test/regress/sql/multi_partition_pruning.sql
+++ b/src/test/regress/sql/multi_partition_pruning.sql
@@ -5,6 +5,7 @@
-- Tests to verify that we correctly prune unreferenced shards. For this, we
-- need to increase the logging verbosity of messages displayed on the client.
+SET citus.explain_distributed_queries TO off;
SET client_min_messages TO DEBUG2;
-- Adding additional l_orderkey = 1 to make this query not router executable
diff --git a/src/test/regress/sql/multi_task_assignment_policy.sql b/src/test/regress/sql/multi_task_assignment_policy.sql
index 5ee4cddfc..1fd30b716 100644
--- a/src/test/regress/sql/multi_task_assignment_policy.sql
+++ b/src/test/regress/sql/multi_task_assignment_policy.sql
@@ -2,6 +2,8 @@
-- MULTI_TASK_ASSIGNMENT
--
+SET citus.explain_distributed_queries TO off;
+
-- Check that our policies for assigning tasks to worker nodes run as expected.
-- To test this, we first create a shell table, and then manually insert shard
-- and shard placement data into system catalogs. We next run Explain command,
diff --git a/src/test/regress/sql/multi_utilities.sql b/src/test/regress/sql/multi_utilities.sql
index d2161f246..7070311f7 100644
--- a/src/test/regress/sql/multi_utilities.sql
+++ b/src/test/regress/sql/multi_utilities.sql
@@ -13,12 +13,6 @@ COPY (SELECT COUNT(*) FROM sharded_table) TO STDOUT;
-- cursors may not involve distributed tables
DECLARE all_sharded_rows CURSOR FOR SELECT * FROM sharded_table;
--- EXPLAIN support isn't implemented for distributed queries...
-EXPLAIN SELECT * FROM sharded_table;
-
--- ... or for distributed modifications
-EXPLAIN INSERT INTO sharded_table VALUES ('dan', 4);
-
-- verify PREPARE functionality
PREPARE sharded_insert AS INSERT INTO sharded_table VALUES ('adam', 1);
PREPARE sharded_update AS UPDATE sharded_table SET name = 'bob' WHERE id = 1;