Merge pull request #414 from citusdata/feature/explain

Add EXPLAIN for simple distributed queries
pull/476/head
Marco Slot 2016-04-30 00:35:47 +02:00
commit 1859a285a0
36 changed files with 2615 additions and 402 deletions

View File

@@ -29,6 +29,9 @@
#include "utils/snapmgr.h"
static void CopyQueryResults(List *masterCopyStmtList);
/*
* multi_ExecutorStart is a hook called at the beginning of any execution
* of any query plan.
@@ -77,7 +80,6 @@ multi_ExecutorStart(QueryDesc *queryDesc, int eflags)
PlannedStmt *masterSelectPlan = MasterNodeSelectPlan(multiPlan);
CreateStmt *masterCreateStmt = MasterNodeCreateStatement(multiPlan);
List *masterCopyStmtList = MasterNodeCopyStatementList(multiPlan);
ListCell *masterCopyStmtCell = NULL;
RangeTblEntry *masterRangeTableEntry = NULL;
StringInfo jobDirectoryName = NULL;
@@ -92,7 +94,11 @@ multi_ExecutorStart(QueryDesc *queryDesc, int eflags)
ResourceOwnerRememberJobDirectory(CurrentResourceOwner, workerJob->jobId);
/* pick distributed executor to use */
if (executorType == MULTI_EXECUTOR_REAL_TIME)
if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
{
/* skip distributed query execution for EXPLAIN commands */
}
else if (executorType == MULTI_EXECUTOR_REAL_TIME)
{
MultiRealTimeExecute(workerJob);
}
@@ -112,24 +118,11 @@ multi_ExecutorStart(QueryDesc *queryDesc, int eflags)
/* make the temporary table visible */
CommandCounterIncrement();
/* now copy data from all the remote nodes into temp table */
foreach(masterCopyStmtCell, masterCopyStmtList)
if (!(eflags & EXEC_FLAG_EXPLAIN_ONLY))
{
Node *masterCopyStmt = (Node *) lfirst(masterCopyStmtCell);
Assert(IsA(masterCopyStmt, CopyStmt));
ProcessUtility(masterCopyStmt,
"(copy job)",
PROCESS_UTILITY_QUERY,
NULL,
None_Receiver,
NULL);
CopyQueryResults(masterCopyStmtList);
}
/* make the copied contents visible */
CommandCounterIncrement();
/*
* Update the QueryDesc's snapshot so it sees the table. That's not
* particularly pretty, but we don't have much of a choice. One might
@@ -176,6 +169,35 @@ multi_ExecutorStart(QueryDesc *queryDesc, int eflags)
}
/*
* CopyQueryResults executes the commands that copy query results into a
* temporary table.
*/
static void
CopyQueryResults(List *masterCopyStmtList)
{
ListCell *masterCopyStmtCell = NULL;
/* now copy data from all the remote nodes into temp table */
foreach(masterCopyStmtCell, masterCopyStmtList)
{
Node *masterCopyStmt = (Node *) lfirst(masterCopyStmtCell);
Assert(IsA(masterCopyStmt, CopyStmt));
ProcessUtility(masterCopyStmt,
"(copy job)",
PROCESS_UTILITY_QUERY,
NULL,
None_Receiver,
NULL);
}
/* make the copied contents visible */
CommandCounterIncrement();
}
/* Execute query plan. */
void
multi_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
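Taken together, the hunks above make multi_ExecutorStart EXPLAIN-aware: a plain EXPLAIN still sets up all master-side state, but performs no remote work. Below is a condensed sketch of the resulting control flow (not the verbatim function; setup, error handling, and job-directory bookkeeping are elided, and the task-tracker branch is assumed to mirror the real-time one shown in the hunk).

/* condensed sketch of multi_ExecutorStart after this change */
static void
multi_ExecutorStart_sketch(QueryDesc *queryDesc, int eflags)
{
	MultiPlan *multiPlan = GetMultiPlan(queryDesc->plannedstmt);
	Job *workerJob = multiPlan->workerJob;
	List *masterCopyStmtList = MasterNodeCopyStatementList(multiPlan);
	MultiExecutorType executorType = JobExecutorType(multiPlan);

	if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
	{
		/* EXPLAIN without ANALYZE: plan only, run nothing on the workers */
	}
	else if (executorType == MULTI_EXECUTOR_REAL_TIME)
	{
		MultiRealTimeExecute(workerJob);
	}
	else
	{
		MultiTaskTrackerExecute(workerJob);
	}

	/* ... create the master temp table, then CommandCounterIncrement() ... */

	if (!(eflags & EXEC_FLAG_EXPLAIN_ONLY))
	{
		/* pull worker results into the temp table */
		CopyQueryResults(masterCopyStmtList);
	}
}

This gives EXPLAIN a fully initialized QueryDesc (including the merge table) to describe, while EXPLAIN ANALYZE, which does not pass EXEC_FLAG_EXPLAIN_ONLY, executes the distributed query as before.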

View File

@@ -586,7 +586,6 @@ RouterExecutorEnd(QueryDesc *queryDesc)
}
Assert(estate != NULL);
Assert(estate->es_finished);
FreeExecutorState(estate);
queryDesc->estate = NULL;

View File

@@ -30,8 +30,6 @@ int RemoteTaskCheckInterval = 100; /* per cycle sleep interval in millisecs */
int TaskExecutorType = MULTI_EXECUTOR_REAL_TIME; /* distributed executor type */
bool BinaryMasterCopyFormat = false; /* copy data from workers in binary format */
static bool RouterExecutablePlan(MultiPlan *multiPlan, MultiExecutorType executorType);
/*
* JobExecutorType selects the executor type for the given multiPlan using the task
@@ -116,7 +114,7 @@ JobExecutorType(MultiPlan *multiPlan)
* router executor. Modify queries are always router executable; SELECT queries
* are router executable only if executorType is real time.
*/
static bool
bool
RouterExecutablePlan(MultiPlan *multiPlan, MultiExecutorType executorType)
{
Job *job = multiPlan->workerJob;

View File

@@ -8,21 +8,91 @@
*/
#include "postgres.h"
#include "miscadmin.h"
#include "commands/prepare.h"
#include "access/xact.h"
#include "catalog/namespace.h"
#include "catalog/pg_class.h"
#include "commands/copy.h"
#include "commands/createas.h"
#include "commands/dbcommands.h"
#include "commands/explain.h"
#include "commands/tablecmds.h"
#include "distributed/citus_nodefuncs.h"
#include "distributed/multi_client_executor.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_explain.h"
#include "distributed/multi_planner.h"
#include "distributed/multi_logical_optimizer.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/multi_master_planner.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_planner.h"
#include "distributed/multi_server_executor.h"
#include "distributed/worker_protocol.h"
#include "lib/stringinfo.h"
#include "nodes/plannodes.h"
#include "nodes/primnodes.h"
#include "nodes/print.h"
#include "optimizer/clauses.h"
#include "optimizer/planner.h"
#include "portability/instr_time.h"
#include "tcop/dest.h"
#include "tcop/tcopprot.h"
#include "tcop/utility.h"
#include "utils/json.h"
#include "utils/snapmgr.h"
#if (PG_VERSION_NUM >= 90400 && PG_VERSION_NUM < 90500)
/* Crude hack to avoid changing sizeof(ExplainState) in released branches (explain.c) */
#define grouping_stack extra->groupingstack
#endif
/* OR-able flags for ExplainXMLTag() (explain.c) */
#define X_OPENING 0
#define X_CLOSING 1
#define X_CLOSE_IMMEDIATE 2
#define X_NOWHITESPACE 4
/* Config variables that enable printing distributed query plans */
bool ExplainMultiLogicalPlan = false;
bool ExplainMultiPhysicalPlan = false;
bool ExplainDistributedQueries = true;
bool ExplainAllTasks = false;
/* Result for a single remote EXPLAIN command */
typedef struct RemoteExplainPlan
{
int placementIndex;
List *explainOutputList;
} RemoteExplainPlan;
/* Explain functions for distributed queries */
static void ExplainMasterPlan(PlannedStmt *masterPlan, IntoClause *into,
ExplainState *es, const char *queryString,
ParamListInfo params, const instr_time *planDuration);
static void ExplainJob(Job *job, ExplainState *es);
static void ExplainMapMergeJob(MapMergeJob *mapMergeJob, ExplainState *es);
static void ExplainTaskList(List *taskList, ExplainState *es);
static RemoteExplainPlan * RemoteExplain(Task *task, ExplainState *es);
static void ExplainTask(Task *task, int placementIndex, List *explainOutputList,
ExplainState *es);
static void ExplainTaskPlacement(ShardPlacement *taskPlacement, List *explainOutputList,
ExplainState *es);
static StringInfo BuildRemoteExplainQuery(char *queryString, ExplainState *es);
/* Static Explain functions copied from explain.c */
static void ExplainOpenGroup(const char *objtype, const char *labelname,
bool labeled, ExplainState *es);
static void ExplainCloseGroup(const char *objtype, const char *labelname,
bool labeled, ExplainState *es);
static void ExplainXMLTag(const char *tagname, int flags, ExplainState *es);
static void ExplainJSONLineEnding(ExplainState *es);
static void ExplainYAMLLineStarting(ExplainState *es);
/*
@@ -35,54 +105,60 @@ void
MultiExplainOneQuery(Query *query, IntoClause *into, ExplainState *es,
const char *queryString, ParamListInfo params)
{
MultiTreeRoot *multiTree = NULL;
MultiPlan *multiPlan = NULL;
Query *queryCopy = NULL;
CmdType commandType = query->commandType;
CmdType commandType = CMD_UNKNOWN;
PlannedStmt *initialPlan = NULL;
Job *workerJob = NULL;
bool routerExecutablePlan = false;
instr_time planStart;
instr_time planDuration;
/* if local query, run the standard explain and return */
bool localQuery = !NeedsDistributedPlanning(query);
if (localQuery)
{
PlannedStmt *plan = NULL;
instr_time planstart;
instr_time planduration;
INSTR_TIME_SET_CURRENT(planstart);
INSTR_TIME_SET_CURRENT(planStart);
/* plan the query */
plan = pg_plan_query(query, 0, params);
INSTR_TIME_SET_CURRENT(planduration);
INSTR_TIME_SUBTRACT(planduration, planstart);
INSTR_TIME_SET_CURRENT(planDuration);
INSTR_TIME_SUBTRACT(planDuration, planStart);
/* run it (if needed) and produce output */
ExplainOnePlan(plan, into, es, queryString, params, &planduration);
ExplainOnePlan(plan, into, es, queryString, params, &planDuration);
return;
}
/* error out early if the query is a modification */
/* measure the full planning time to display in EXPLAIN ANALYZE */
INSTR_TIME_SET_CURRENT(planStart);
/* call standard planner to modify the query structure before multi planning */
initialPlan = standard_planner(query, 0, params);
commandType = initialPlan->commandType;
if (commandType == CMD_INSERT || commandType == CMD_UPDATE ||
commandType == CMD_DELETE)
{
if (es->analyze)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot show execution plan for distributed modification"),
errdetail("EXPLAIN commands are unsupported for distributed "
"modifications.")));
errmsg("Using ANALYZE for INSERT/UPDATE/DELETE on "
"distributed tables is not supported.")));
}
}
/* call standard planner to modify the query structure before multi planning */
standard_planner(query, 0, params);
queryCopy = copyObject(query);
multiPlan = CreatePhysicalPlan(query);
/* create the logical and physical plan */
multiTree = MultiLogicalPlanCreate(queryCopy);
MultiLogicalPlanOptimize(multiTree);
multiPlan = MultiPhysicalPlanCreate(multiTree);
INSTR_TIME_SET_CURRENT(planDuration);
INSTR_TIME_SUBTRACT(planDuration, planStart);
if (ExplainMultiLogicalPlan)
{
MultiTreeRoot *multiTree = MultiLogicalPlanCreate(query);
char *logicalPlanString = CitusNodeToString(multiTree);
char *formattedPlanString = pretty_format_node_dump(logicalPlanString);
@@ -99,10 +175,739 @@ MultiExplainOneQuery(Query *query, IntoClause *into, ExplainState *es,
appendStringInfo(es->str, "%s\n", formattedPlanString);
}
/* if explain printing isn't enabled, print error only after planning */
if (!ExplainMultiLogicalPlan && !ExplainMultiPhysicalPlan)
if (!ExplainDistributedQueries)
{
appendStringInfo(es->str, "explain statements for distributed queries ");
appendStringInfo(es->str, "are currently unsupported\n");
appendStringInfo(es->str, "are not enabled\n");
return;
}
if (es->format == EXPLAIN_FORMAT_TEXT)
{
appendStringInfoSpaces(es->str, es->indent * 2);
appendStringInfo(es->str, "Distributed Query");
if (multiPlan->masterTableName != NULL)
{
appendStringInfo(es->str, " into %s", multiPlan->masterTableName);
}
appendStringInfo(es->str, "\n");
es->indent += 1;
}
routerExecutablePlan = RouterExecutablePlan(multiPlan, TaskExecutorType);
if (routerExecutablePlan)
{
ExplainPropertyText("Executor", "Router", es);
}
else
{
switch (TaskExecutorType)
{
case MULTI_EXECUTOR_REAL_TIME:
{
ExplainPropertyText("Executor", "Real-Time", es);
}
break;
case MULTI_EXECUTOR_TASK_TRACKER:
{
ExplainPropertyText("Executor", "Task-Tracker", es);
}
break;
default:
{
ExplainPropertyText("Executor", "Other", es);
}
break;
}
}
workerJob = multiPlan->workerJob;
ExplainJob(workerJob, es);
if (es->format == EXPLAIN_FORMAT_TEXT)
{
es->indent -= 1;
}
if (!routerExecutablePlan)
{
PlannedStmt *masterPlan = MultiQueryContainerNode(initialPlan, multiPlan);
if (es->format == EXPLAIN_FORMAT_TEXT)
{
appendStringInfoSpaces(es->str, es->indent * 2);
appendStringInfo(es->str, "Master Query\n");
es->indent += 1;
}
ExplainMasterPlan(masterPlan, into, es, queryString, params, &planDuration);
if (es->format == EXPLAIN_FORMAT_TEXT)
{
es->indent -= 1;
}
}
}
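For orientation, the TEXT-format skeleton that MultiExplainOneQuery emits (matching the expected regression output later in this diff) looks like:

Distributed Query into pg_merge_job_XXXX
  Executor: Real-Time
  Task Count: 6
  Tasks Shown: One of 6
  -> Task
       Node: host=... port=... dbname=...
       -> (remote plan fetched from a worker)
Master Query
  -> (local plan over the merge table)

The Master Query section is skipped for router-executable plans, which consist of a single task that runs entirely on one worker.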
/*
* ExplainMasterPlan generates EXPLAIN output for the master query that merges results.
* When using EXPLAIN ANALYZE, this function shows the execution time of the master query
* in isolation. Calling ExplainOnePlan directly would show the overall execution time of
* the distributed query, which makes it hard to determine how much time the master query
* took.
*
* Parts of this function are copied directly from ExplainOnePlan.
*/
static void
ExplainMasterPlan(PlannedStmt *masterPlan, IntoClause *into,
ExplainState *es, const char *queryString,
ParamListInfo params, const instr_time *planDuration)
{
DestReceiver *dest = NULL;
int eflags = 0;
QueryDesc *queryDesc = NULL;
int instrument_option = 0;
if (es->analyze && es->timing)
{
instrument_option |= INSTRUMENT_TIMER;
}
else if (es->analyze)
{
instrument_option |= INSTRUMENT_ROWS;
}
if (es->buffers)
{
instrument_option |= INSTRUMENT_BUFFERS;
}
/*
* Use a snapshot with an updated command ID to ensure this query sees
* results of any previously executed queries.
*/
PushCopiedSnapshot(GetActiveSnapshot());
UpdateActiveSnapshotCommandId();
/*
* Normally we discard the query's output, but if explaining CREATE TABLE
* AS, we'd better use the appropriate tuple receiver.
*/
if (into)
{
dest = CreateIntoRelDestReceiver(into);
}
else
{
dest = None_Receiver;
}
/* Create a QueryDesc for the query */
queryDesc = CreateQueryDesc(masterPlan, queryString,
GetActiveSnapshot(), InvalidSnapshot,
dest, params, instrument_option);
/* Select execution options */
if (es->analyze)
{
eflags = 0; /* default run-to-completion flags */
}
else
{
eflags = EXEC_FLAG_EXPLAIN_ONLY;
}
if (into)
{
eflags |= GetIntoRelEFlags(into);
}
/*
* ExecutorStart creates the merge table. If using ANALYZE, it also executes the
* worker job and populates the merge table.
*/
ExecutorStart(queryDesc, eflags);
if (es->analyze)
{
ScanDirection dir;
/* if using analyze, then finish query execution */
/* EXPLAIN ANALYZE CREATE TABLE AS WITH NO DATA is weird */
if (into && into->skipData)
{
dir = NoMovementScanDirection;
}
else
{
dir = ForwardScanDirection;
}
/* run the plan */
ExecutorRun(queryDesc, dir, 0L);
/* run cleanup too */
ExecutorFinish(queryDesc);
}
/*
* ExplainOnePlan executes the master query again, which ensures that the execution
* time only shows the execution time of the master query itself, instead of the
* overall execution time.
*/
ExplainOnePlan(queryDesc->plannedstmt, into, es, queryString, params, planDuration);
/*
* ExecutorEnd for the distributed query is deferred until after the master query
* is executed again, otherwise the merge table would be dropped.
*/
ExecutorEnd(queryDesc);
FreeQueryDesc(queryDesc);
PopActiveSnapshot();
}
/*
* ExplainJob shows the EXPLAIN output for a Job in the physical plan of
* a distributed query, including the remote EXPLAIN for the first task,
* or for all tasks if citus.explain_all_tasks is on.
*/
static void
ExplainJob(Job *job, ExplainState *es)
{
List *dependedJobList = job->dependedJobList;
int dependedJobCount = list_length(dependedJobList);
ListCell *dependedJobCell = NULL;
List *taskList = job->taskList;
int taskCount = list_length(taskList);
ExplainOpenGroup("Job", NULL, true, es);
ExplainPropertyInteger("Task Count", taskCount, es);
if (dependedJobCount > 0)
{
ExplainPropertyText("Tasks Shown", "None, not supported for re-partition "
"queries", es);
}
else if (ExplainAllTasks || taskCount <= 1)
{
ExplainPropertyText("Tasks Shown", "All", es);
}
else
{
StringInfo tasksShownText = makeStringInfo();
appendStringInfo(tasksShownText, "One of %d", taskCount);
ExplainPropertyText("Tasks Shown", tasksShownText->data, es);
}
/*
* We cannot fetch EXPLAIN plans for jobs that have dependencies, since the
* intermediate tables have not been created.
*/
if (dependedJobCount == 0)
{
ExplainOpenGroup("Tasks", "Tasks", false, es);
ExplainTaskList(taskList, es);
ExplainCloseGroup("Tasks", "Tasks", false, es);
}
ExplainCloseGroup("Job", NULL, true, es);
/* show explain output for depended jobs, if any */
foreach(dependedJobCell, dependedJobList)
{
Job *dependedJob = (Job *) lfirst(dependedJobCell);
if (CitusIsA(dependedJob, MapMergeJob))
{
ExplainMapMergeJob((MapMergeJob *) dependedJob, es);
}
}
}
/*
* ExplainMapMergeJob shows a very basic EXPLAIN plan for a MapMergeJob. It does
* not yet show the EXPLAIN plan for the individual tasks, because this requires
* specific logic for getting the query (which is wrapped in a UDF), and the
* queries may use intermediate tables that have not been created.
*/
static void
ExplainMapMergeJob(MapMergeJob *mapMergeJob, ExplainState *es)
{
List *dependedJobList = mapMergeJob->job.dependedJobList;
ListCell *dependedJobCell = NULL;
int mapTaskCount = list_length(mapMergeJob->mapTaskList);
int mergeTaskCount = list_length(mapMergeJob->mergeTaskList);
if (es->format == EXPLAIN_FORMAT_TEXT)
{
appendStringInfoSpaces(es->str, es->indent * 2);
appendStringInfo(es->str, "-> MapMergeJob\n");
es->indent += 3;
}
ExplainOpenGroup("MapMergeJob", NULL, true, es);
ExplainPropertyInteger("Map Task Count", mapTaskCount, es);
ExplainPropertyInteger("Merge Task Count", mergeTaskCount, es);
ExplainCloseGroup("Job", NULL, true, es);
foreach(dependedJobCell, dependedJobList)
{
Job *dependedJob = (Job *) lfirst(dependedJobCell);
if (CitusIsA(dependedJob, MapMergeJob))
{
ExplainMapMergeJob((MapMergeJob *) dependedJob, es);
}
}
if (es->format == EXPLAIN_FORMAT_TEXT)
{
es->indent -= 3;
}
}
/*
* ExplainTaskList shows the remote EXPLAIN for the first task in taskList,
* or all tasks if citus.explain_all_tasks is on.
*/
static void
ExplainTaskList(List *taskList, ExplainState *es)
{
ListCell *taskCell = NULL;
ListCell *remoteExplainCell = NULL;
List *remoteExplainList = NIL;
foreach(taskCell, taskList)
{
Task *task = (Task *) lfirst(taskCell);
RemoteExplainPlan *remoteExplain = NULL;
remoteExplain = RemoteExplain(task, es);
remoteExplainList = lappend(remoteExplainList, remoteExplain);
if (!ExplainAllTasks)
{
break;
}
}
forboth(taskCell, taskList, remoteExplainCell, remoteExplainList)
{
Task *task = (Task *) lfirst(taskCell);
RemoteExplainPlan *remoteExplain =
(RemoteExplainPlan *) lfirst(remoteExplainCell);
ExplainTask(task, remoteExplain->placementIndex,
remoteExplain->explainOutputList, es);
}
}
/*
* RemoteExplain fetches the remote EXPLAIN output for a single
* task. It tries each shard placement until one succeeds or all
* have failed.
*/
static RemoteExplainPlan *
RemoteExplain(Task *task, ExplainState *es)
{
StringInfo explainQuery = NULL;
List *taskPlacementList = task->taskPlacementList;
int placementCount = list_length(taskPlacementList);
int placementIndex = 0;
RemoteExplainPlan *remotePlan = NULL;
remotePlan = (RemoteExplainPlan *) palloc0(sizeof(RemoteExplainPlan));
explainQuery = BuildRemoteExplainQuery(task->queryString, es);
for (placementIndex = 0; placementIndex < placementCount; placementIndex++)
{
ShardPlacement *taskPlacement = list_nth(taskPlacementList, placementIndex);
char *nodeName = taskPlacement->nodeName;
uint32 nodePort = taskPlacement->nodePort;
remotePlan->placementIndex = placementIndex;
remotePlan->explainOutputList = ExecuteRemoteQuery(nodeName, nodePort,
NULL, explainQuery);
if (remotePlan->explainOutputList != NIL)
{
break;
}
}
return remotePlan;
}
/*
* ExplainTask shows the EXPLAIN output for a single task. The output has been
* fetched from the placement at index placementIndex. If explainOutputList is NIL,
* then the EXPLAIN output could not be fetched from any placement.
*/
static void
ExplainTask(Task *task, int placementIndex, List *explainOutputList, ExplainState *es)
{
ExplainOpenGroup("Task", NULL, true, es);
if (es->format == EXPLAIN_FORMAT_TEXT)
{
appendStringInfoSpaces(es->str, es->indent * 2);
appendStringInfo(es->str, "-> Task\n");
es->indent += 3;
}
if (explainOutputList != NIL)
{
List *taskPlacementList = task->taskPlacementList;
ShardPlacement *taskPlacement = list_nth(taskPlacementList, placementIndex);
ExplainTaskPlacement(taskPlacement, explainOutputList, es);
}
else
{
ExplainPropertyText("Error", "Could not get remote plan.", es);
}
ExplainCloseGroup("Task", NULL, true, es);
if (es->format == EXPLAIN_FORMAT_TEXT)
{
es->indent -= 3;
}
}
/*
* ExplainTaskPlacement shows the EXPLAIN output for an individual task placement.
* It corrects the indentation of the remote explain output to match the local
* output.
*/
static void
ExplainTaskPlacement(ShardPlacement *taskPlacement, List *explainOutputList,
ExplainState *es)
{
int savedIndentation = es->indent;
StringInfo nodeAddress = makeStringInfo();
char *nodeName = taskPlacement->nodeName;
uint32 nodePort = taskPlacement->nodePort;
char *nodeDatabase = get_database_name(MyDatabaseId);
ListCell *explainOutputCell = NULL;
int rowIndex = 0;
appendStringInfo(nodeAddress, "host=%s port=%d dbname=%s", nodeName, nodePort,
nodeDatabase);
ExplainPropertyText("Node", nodeAddress->data, es);
ExplainOpenGroup("Remote Plan", "Remote Plan", false, es);
if (es->format == EXPLAIN_FORMAT_JSON || es->format == EXPLAIN_FORMAT_YAML)
{
/* prevent appending the remote EXPLAIN on the same line */
appendStringInfoChar(es->str, '\n');
}
foreach(explainOutputCell, explainOutputList)
{
StringInfo rowString = (StringInfo) lfirst(explainOutputCell);
int rowLength = 0;
char *lineStart = NULL;
rowLength = strlen(rowString->data);
lineStart = rowString->data;
/* parse the lines in the remote EXPLAIN for proper indentation */
while (lineStart < rowString->data + rowLength)
{
/* find the end-of-line */
char *lineEnd = strchr(lineStart, '\n');
if (lineEnd == NULL)
{
/* no end-of-line, use end of row string instead */
lineEnd = rowString->data + rowLength;
}
/* convert line to a separate string */
*lineEnd = '\0';
/* indentation that is applied to all lines */
appendStringInfoSpaces(es->str, es->indent * 2);
if (es->format == EXPLAIN_FORMAT_TEXT && rowIndex == 0)
{
/* indent the first line of the remote plan with an arrow */
appendStringInfoString(es->str, "-> ");
es->indent += 2;
}
/* show line in the output */
appendStringInfo(es->str, "%s\n", lineStart);
/* continue at the start of the next line */
lineStart = lineEnd + 1;
}
rowIndex++;
}
ExplainCloseGroup("Remote Plan", "Remote Plan", false, es);
if (es->format == EXPLAIN_FORMAT_TEXT)
{
es->indent = savedIndentation;
}
}
/*
* BuildRemoteExplainQuery returns an EXPLAIN query string
* to run on a worker node, explicitly spelling out all
* the options in the explain state.
*/
static StringInfo
BuildRemoteExplainQuery(char *queryString, ExplainState *es)
{
StringInfo explainQuery = makeStringInfo();
char *formatStr = NULL;
switch (es->format)
{
case EXPLAIN_FORMAT_XML:
{
formatStr = "XML";
}
break;
case EXPLAIN_FORMAT_JSON:
{
formatStr = "JSON";
}
break;
case EXPLAIN_FORMAT_YAML:
{
formatStr = "YAML";
}
break;
default:
{
formatStr = "TEXT";
}
break;
}
appendStringInfo(explainQuery,
"EXPLAIN (ANALYZE %s, VERBOSE %s, "
"COSTS %s, BUFFERS %s, TIMING %s, "
"FORMAT %s) %s",
es->analyze ? "TRUE" : "FALSE",
es->verbose ? "TRUE" : "FALSE",
es->costs ? "TRUE" : "FALSE",
es->buffers ? "TRUE" : "FALSE",
es->timing ? "TRUE" : "FALSE",
formatStr,
queryString);
return explainQuery;
}
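As an illustration of the string this builds: assuming the master runs a plain EXPLAIN (COSTS FALSE, FORMAT JSON) over a shard of lineitem, each explained worker task would receive a query of the form

EXPLAIN (ANALYZE FALSE, VERBOSE FALSE, COSTS FALSE, BUFFERS FALSE, TIMING FALSE, FORMAT JSON) SELECT ... FROM lineitem_102010 lineitem WHERE ...

Spelling out every option, rather than relying on worker-side defaults, keeps the remote output consistent with the options of the master-side EXPLAIN invocation.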
/* below are private functions copied from explain.c */
/* *INDENT-OFF* */
/*
* Open a group of related objects.
*
* objtype is the type of the group object, labelname is its label within
* a containing object (if any).
*
* If labeled is true, the group members will be labeled properties,
* while if it's false, they'll be unlabeled objects.
*/
static void
ExplainOpenGroup(const char *objtype, const char *labelname,
bool labeled, ExplainState *es)
{
switch (es->format)
{
case EXPLAIN_FORMAT_TEXT:
/* nothing to do */
break;
case EXPLAIN_FORMAT_XML:
ExplainXMLTag(objtype, X_OPENING, es);
es->indent++;
break;
case EXPLAIN_FORMAT_JSON:
ExplainJSONLineEnding(es);
appendStringInfoSpaces(es->str, 2 * es->indent);
if (labelname)
{
escape_json(es->str, labelname);
appendStringInfoString(es->str, ": ");
}
appendStringInfoChar(es->str, labeled ? '{' : '[');
/*
* In JSON format, the grouping_stack is an integer list. 0 means
* we've emitted nothing at this grouping level, 1 means we've
* emitted something (and so the next item needs a comma). See
* ExplainJSONLineEnding().
*/
es->grouping_stack = lcons_int(0, es->grouping_stack);
es->indent++;
break;
case EXPLAIN_FORMAT_YAML:
/*
* In YAML format, the grouping stack is an integer list. 0 means
* we've emitted nothing at this grouping level AND this grouping
* level is unlabelled and must be marked with "- ". See
* ExplainYAMLLineStarting().
*/
ExplainYAMLLineStarting(es);
if (labelname)
{
appendStringInfo(es->str, "%s: ", labelname);
es->grouping_stack = lcons_int(1, es->grouping_stack);
}
else
{
appendStringInfoString(es->str, "- ");
es->grouping_stack = lcons_int(0, es->grouping_stack);
}
es->indent++;
break;
}
}
/*
* Close a group of related objects.
* Parameters must match the corresponding ExplainOpenGroup call.
*/
static void
ExplainCloseGroup(const char *objtype, const char *labelname,
bool labeled, ExplainState *es)
{
switch (es->format)
{
case EXPLAIN_FORMAT_TEXT:
/* nothing to do */
break;
case EXPLAIN_FORMAT_XML:
es->indent--;
ExplainXMLTag(objtype, X_CLOSING, es);
break;
case EXPLAIN_FORMAT_JSON:
es->indent--;
appendStringInfoChar(es->str, '\n');
appendStringInfoSpaces(es->str, 2 * es->indent);
appendStringInfoChar(es->str, labeled ? '}' : ']');
es->grouping_stack = list_delete_first(es->grouping_stack);
break;
case EXPLAIN_FORMAT_YAML:
es->indent--;
es->grouping_stack = list_delete_first(es->grouping_stack);
break;
}
}
/*
* Emit opening or closing XML tag.
*
* "flags" must contain X_OPENING, X_CLOSING, or X_CLOSE_IMMEDIATE.
* Optionally, OR in X_NOWHITESPACE to suppress the whitespace we'd normally
* add.
*
* XML tag names can't contain white space, so we replace any spaces in
* "tagname" with dashes.
*/
static void
ExplainXMLTag(const char *tagname, int flags, ExplainState *es)
{
const char *s;
if ((flags & X_NOWHITESPACE) == 0)
appendStringInfoSpaces(es->str, 2 * es->indent);
appendStringInfoCharMacro(es->str, '<');
if ((flags & X_CLOSING) != 0)
appendStringInfoCharMacro(es->str, '/');
for (s = tagname; *s; s++)
appendStringInfoCharMacro(es->str, (*s == ' ') ? '-' : *s);
if ((flags & X_CLOSE_IMMEDIATE) != 0)
appendStringInfoString(es->str, " /");
appendStringInfoCharMacro(es->str, '>');
if ((flags & X_NOWHITESPACE) == 0)
appendStringInfoCharMacro(es->str, '\n');
}
/*
* Emit a JSON line ending.
*
* JSON requires a comma after each property but the last. To facilitate this,
* in JSON format, the text emitted for each property begins just prior to the
* preceding line-break (and comma, if applicable).
*/
static void
ExplainJSONLineEnding(ExplainState *es)
{
Assert(es->format == EXPLAIN_FORMAT_JSON);
if (linitial_int(es->grouping_stack) != 0)
appendStringInfoChar(es->str, ',');
else
linitial_int(es->grouping_stack) = 1;
appendStringInfoChar(es->str, '\n');
}
/*
* Indent a YAML line.
*
* YAML lines are ordinarily indented by two spaces per indentation level.
* The text emitted for each property begins just prior to the preceding
* line-break, except for the first property in an unlabelled group, for which
* it begins immediately after the "- " that introduces the group. The first
* property of the group appears on the same line as the opening "- ".
*/
static void
ExplainYAMLLineStarting(ExplainState *es)
{
Assert(es->format == EXPLAIN_FORMAT_YAML);
if (linitial_int(es->grouping_stack) == 0)
{
linitial_int(es->grouping_stack) = 1;
}
else
{
appendStringInfoChar(es->str, '\n');
appendStringInfoSpaces(es->str, es->indent * 2);
}
}
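To see how the copied helpers and the grouping_stack cooperate, consider a minimal call sequence of the kind ExplainJob performs, annotated for JSON format (a sketch; es is assumed to be an ExplainState whose output explain.c has already opened):

ExplainOpenGroup("Job", NULL, true, es);            /* emits "{", pushes 0 */
ExplainPropertyInteger("Task Count", 6, es);        /* stack top is 0: no comma, top set to 1 */
ExplainPropertyText("Tasks Shown", "One of 6", es); /* stack top is 1: comma emitted first */
ExplainCloseGroup("Job", NULL, true, es);           /* emits "}", pops the stack */

which renders roughly as

{
  "Task Count": 6,
  "Tasks Shown": "One of 6"
}

The grouping_stack bookkeeping is what decides, at every line, whether a separating comma (JSON) or a "- " marker (YAML) is needed.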

View File

@@ -34,9 +34,7 @@ static void CheckNodeIsDumpable(Node *node);
/* local function forward declarations */
static MultiPlan * CreatePhysicalPlan(Query *parse);
static char * GetMultiPlanString(PlannedStmt *result);
static PlannedStmt * MultiQueryContainerNode(PlannedStmt *result, MultiPlan *multiPlan);
/* Distributed planner hook */
@@ -70,7 +68,7 @@ multi_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
* target shards. SELECT queries go through the full logical plan/optimize/
* physical plan process needed to produce distributed query plans.
*/
static MultiPlan *
MultiPlan *
CreatePhysicalPlan(Query *parse)
{
Query *parseCopy = copyObject(parse);
@@ -162,7 +160,7 @@ HasCitusToplevelNode(PlannedStmt *result)
* function, which has to be removed from the really executed plan tree before
* query execution.
*/
static PlannedStmt *
PlannedStmt *
MultiQueryContainerNode(PlannedStmt *result, MultiPlan *multiPlan)
{
FunctionScan *fauxFunctionScan = NULL;

View File

@@ -272,6 +272,31 @@ RegisterCitusConfigVariables(void)
GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.explain_distributed_queries",
gettext_noop("Enables Explain for distributed queries."),
gettext_noop("When enabled, the Explain command shows remote and local "
"plans when used with a distributed query. It is enabled "
"by default, but can be disabled for regression tests."),
&ExplainDistributedQueries,
true,
PGC_USERSET,
GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.explain_all_tasks",
gettext_noop("Enables showing output for all tasks in Explain."),
gettext_noop("The Explain command for distributed queries shows "
"the remote plan for a single task by default. When "
"this configuration entry is enabled, the plan for "
"all tasks is shown, but the Explain takes longer."),
&ExplainAllTasks,
false,
PGC_USERSET,
0,
NULL, NULL, NULL);
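Both GUCs are exercised in the regression tests added below; the multi_explain test toggles them per session:

SET citus.explain_distributed_queries TO on;
SET citus.explain_all_tasks TO on;

Note that citus.explain_distributed_queries is registered with GUC_NO_SHOW_ALL while citus.explain_all_tasks is not, so only the latter appears in SHOW ALL output.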
DefineCustomBoolVariable(
"citus.all_modifications_commutative",
gettext_noop("Bypasses commutativity checks when enabled"),

View File

@@ -15,6 +15,8 @@
/* Config variables managed via guc.c to explain distributed query plans */
extern bool ExplainMultiLogicalPlan;
extern bool ExplainMultiPhysicalPlan;
extern bool ExplainDistributedQueries;
extern bool ExplainAllTasks;
extern void MultiExplainOneQuery(Query *query, IntoClause *into, ExplainState *es,
const char *queryString, ParamListInfo params);

View File

@@ -18,6 +18,9 @@ extern PlannedStmt * multi_planner(Query *parse, int cursorOptions,
extern bool HasCitusToplevelNode(PlannedStmt *planStatement);
struct MultiPlan;
extern struct MultiPlan * CreatePhysicalPlan(Query *parse);
extern struct MultiPlan * GetMultiPlan(PlannedStmt *planStatement);
extern PlannedStmt * MultiQueryContainerNode(PlannedStmt *result,
struct MultiPlan *multiPlan);
#endif /* MULTI_PLANNER_H */

View File

@@ -191,6 +191,7 @@ extern void MultiRealTimeExecute(Job *job);
extern void MultiTaskTrackerExecute(Job *job);
/* Function declarations common to more than one executor */
extern bool RouterExecutablePlan(MultiPlan *multiPlan, MultiExecutorType executorType);
extern MultiExecutorType JobExecutorType(MultiPlan *multiPlan);
extern void RemoveJobDirectory(uint64 jobId);
extern TaskExecution * InitTaskExecution(Task *task, TaskExecStatus initialStatus);

View File

@@ -0,0 +1,306 @@
--
-- MULTI_COPY
--
-- Create a new hash-partitioned table into which to COPY
CREATE TABLE customer_copy_hash (
c_custkey integer,
c_name varchar(25) not null,
c_address varchar(40),
c_nationkey integer,
c_phone char(15),
c_acctbal decimal(15,2),
c_mktsegment char(10),
c_comment varchar(117),
primary key (c_custkey));
SELECT master_create_distributed_table('customer_copy_hash', 'c_custkey', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
-- Test COPY into empty hash-partitioned table
COPY customer_copy_hash FROM '/home/marco/citus/citus-explain4/src/test/regress/data/customer.1.data' WITH (DELIMITER '|');
ERROR: could not find any shards into which to copy
DETAIL: No shards exist for distributed table "customer_copy_hash".
HINT: Run master_create_worker_shards to create shards and try again.
SELECT master_create_worker_shards('customer_copy_hash', 64, 1);
master_create_worker_shards
-----------------------------
(1 row)
-- Test empty copy
COPY customer_copy_hash FROM STDIN;
-- Test syntax error
COPY customer_copy_hash (c_custkey,c_name) FROM STDIN;
ERROR: invalid input syntax for integer: "1,customer1"
CONTEXT: COPY customer_copy_hash, line 1, column c_custkey: "1,customer1"
-- Confirm that no data was copied
SELECT count(*) FROM customer_copy_hash;
count
-------
0
(1 row)
-- Test primary key violation
COPY customer_copy_hash (c_custkey, c_name) FROM STDIN
WITH (FORMAT 'csv');
ERROR: duplicate key value violates unique constraint "customer_copy_hash_pkey_103160"
DETAIL: Key (c_custkey)=(2) already exists.
-- Confirm that no data was copied
SELECT count(*) FROM customer_copy_hash;
count
-------
0
(1 row)
-- Test headers option
COPY customer_copy_hash (c_custkey, c_name) FROM STDIN
WITH (FORMAT 'csv', HEADER true, FORCE_NULL (c_custkey));
-- Confirm that only first row was skipped
SELECT count(*) FROM customer_copy_hash;
count
-------
3
(1 row)
-- Test force_not_null option
COPY customer_copy_hash (c_custkey, c_name, c_address) FROM STDIN
WITH (FORMAT 'csv', QUOTE '"', FORCE_NOT_NULL (c_address));
-- Confirm that value is not null
SELECT count(c_address) FROM customer_copy_hash WHERE c_custkey = 4;
count
-------
1
(1 row)
-- Test force_null option
COPY customer_copy_hash (c_custkey, c_name, c_address) FROM STDIN
WITH (FORMAT 'csv', QUOTE '"', FORCE_NULL (c_address));
-- Confirm that value is null
SELECT count(c_address) FROM customer_copy_hash WHERE c_custkey = 5;
count
-------
0
(1 row)
-- Test null violation
COPY customer_copy_hash (c_custkey, c_name) FROM STDIN
WITH (FORMAT 'csv');
ERROR: null value in column "c_name" violates not-null constraint
DETAIL: Failing row contains (8, null, null, null, null, null, null, null).
-- Confirm that no data was copied
SELECT count(*) FROM customer_copy_hash;
count
-------
5
(1 row)
-- Test server-side copy from program
COPY customer_copy_hash (c_custkey, c_name) FROM PROGRAM 'echo 9 customer9'
WITH (DELIMITER ' ');
-- Confirm that data was copied
SELECT count(*) FROM customer_copy_hash WHERE c_custkey = 9;
count
-------
1
(1 row)
-- Test server-side copy from file
COPY customer_copy_hash FROM '/home/marco/citus/citus-explain4/src/test/regress/data/customer.2.data' WITH (DELIMITER '|');
-- Confirm that data was copied
SELECT count(*) FROM customer_copy_hash;
count
-------
1006
(1 row)
-- Test client-side copy from file
\COPY customer_copy_hash FROM '/home/marco/citus/citus-explain4/src/test/regress/data/customer.3.data' WITH (DELIMITER '|');
-- Confirm that data was copied
SELECT count(*) FROM customer_copy_hash;
count
-------
2006
(1 row)
-- Create a new hash-partitioned table with default now() function
CREATE TABLE customer_with_default(
c_custkey integer,
c_name varchar(25) not null,
c_time timestamp default now());
SELECT master_create_distributed_table('customer_with_default', 'c_custkey', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_worker_shards('customer_with_default', 64, 1);
master_create_worker_shards
-----------------------------
(1 row)
-- Test with default values for now() function
COPY customer_with_default (c_custkey, c_name) FROM STDIN
WITH (FORMAT 'csv');
-- Confirm that data was copied with now() function
SELECT count(*) FROM customer_with_default where c_time IS NOT NULL;
count
-------
2
(1 row)
-- Add columns to the table and perform a COPY
ALTER TABLE customer_copy_hash ADD COLUMN extra1 INT DEFAULT 0;
ALTER TABLE customer_copy_hash ADD COLUMN extra2 INT DEFAULT 0;
COPY customer_copy_hash (c_custkey, c_name, extra1, extra2) FROM STDIN CSV;
SELECT * FROM customer_copy_hash WHERE extra1 = 1;
c_custkey | c_name | c_address | c_nationkey | c_phone | c_acctbal | c_mktsegment | c_comment | extra1 | extra2
-----------+------------+-----------+-------------+---------+-----------+--------------+-----------+--------+--------
10 | customer10 | | | | | | | 1 | 5
(1 row)
-- Test dropping an intermediate column
ALTER TABLE customer_copy_hash DROP COLUMN extra1;
COPY customer_copy_hash (c_custkey, c_name, extra2) FROM STDIN CSV;
SELECT * FROM customer_copy_hash WHERE c_custkey = 11;
c_custkey | c_name | c_address | c_nationkey | c_phone | c_acctbal | c_mktsegment | c_comment | extra2
-----------+------------+-----------+-------------+---------+-----------+--------------+-----------+--------
11 | customer11 | | | | | | | 5
(1 row)
-- Test dropping the last column
ALTER TABLE customer_copy_hash DROP COLUMN extra2;
COPY customer_copy_hash (c_custkey, c_name) FROM STDIN CSV;
SELECT * FROM customer_copy_hash WHERE c_custkey = 12;
c_custkey | c_name | c_address | c_nationkey | c_phone | c_acctbal | c_mktsegment | c_comment
-----------+------------+-----------+-------------+---------+-----------+--------------+-----------
12 | customer12 | | | | | |
(1 row)
-- Create a new range-partitioned table into which to COPY
CREATE TABLE customer_copy_range (
c_custkey integer,
c_name varchar(25),
c_address varchar(40),
c_nationkey integer,
c_phone char(15),
c_acctbal decimal(15,2),
c_mktsegment char(10),
c_comment varchar(117),
primary key (c_custkey));
SELECT master_create_distributed_table('customer_copy_range', 'c_custkey', 'range');
master_create_distributed_table
---------------------------------
(1 row)
-- Test COPY into empty range-partitioned table
COPY customer_copy_range FROM '/home/marco/citus/citus-explain4/src/test/regress/data/customer.1.data' WITH (DELIMITER '|');
ERROR: could not find any shards into which to copy
DETAIL: No shards exist for distributed table "customer_copy_range".
SELECT master_create_empty_shard('customer_copy_range') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 500
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('customer_copy_range') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = 501, shardmaxvalue = 1000
WHERE shardid = :new_shard_id;
-- Test copy into range-partitioned table
COPY customer_copy_range FROM '/home/marco/citus/citus-explain4/src/test/regress/data/customer.1.data' WITH (DELIMITER '|');
-- Check whether data went into the right shard (maybe)
SELECT min(c_custkey), max(c_custkey), avg(c_custkey), count(*)
FROM customer_copy_range WHERE c_custkey <= 500;
min | max | avg | count
-----+-----+----------------------+-------
1 | 500 | 250.5000000000000000 | 500
(1 row)
-- Check whether data was copied
SELECT count(*) FROM customer_copy_range;
count
-------
1000
(1 row)
-- Create a new append-partitioned table into which to COPY
CREATE TABLE customer_copy_append (
c_custkey integer,
c_name varchar(25) not null,
c_address varchar(40),
c_nationkey integer,
c_phone char(15),
c_acctbal decimal(15,2),
c_mktsegment char(10),
c_comment varchar(117));
SELECT master_create_distributed_table('customer_copy_append', 'c_custkey', 'append');
master_create_distributed_table
---------------------------------
(1 row)
-- Test syntax error
COPY customer_copy_append(c_custkey, c_name) FROM STDIN WITH (FORMAT 'csv');
ERROR: invalid input syntax for integer: "notinteger"
CONTEXT: COPY customer_copy_append, line 3, column c_custkey: "notinteger"
-- Test that no shard is created for failing copy
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'customer_copy_append'::regclass;
count
-------
0
(1 row)
-- Test empty copy
COPY customer_copy_append FROM STDIN;
-- Test that no shard is created for copying zero rows
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'customer_copy_append'::regclass;
count
-------
0
(1 row)
-- Test proper copy
COPY customer_copy_append(c_custkey, c_name) FROM STDIN WITH (FORMAT 'csv');
-- Check whether data was copied properly
SELECT * FROM customer_copy_append;
c_custkey | c_name | c_address | c_nationkey | c_phone | c_acctbal | c_mktsegment | c_comment
-----------+-----------+-----------+-------------+---------+-----------+--------------+-----------
1 | customer1 | | | | | |
2 | customer2 | | | | | |
(2 rows)
-- Create lineitem table
CREATE TABLE lineitem_copy_append (
l_orderkey bigint not null,
l_partkey integer not null,
l_suppkey integer not null,
l_linenumber integer not null,
l_quantity decimal(15, 2) not null,
l_extendedprice decimal(15, 2) not null,
l_discount decimal(15, 2) not null,
l_tax decimal(15, 2) not null,
l_returnflag char(1) not null,
l_linestatus char(1) not null,
l_shipdate date not null,
l_commitdate date not null,
l_receiptdate date not null,
l_shipinstruct char(25) not null,
l_shipmode char(10) not null,
l_comment varchar(44) not null);
SELECT master_create_distributed_table('lineitem_copy_append', 'l_orderkey', 'append');
master_create_distributed_table
---------------------------------
(1 row)
-- Test multiple shard creation
SET citus.shard_max_size TO '256kB';
COPY lineitem_copy_append FROM '/home/marco/citus/citus-explain4/src/test/regress/data/lineitem.1.data' with delimiter '|';
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'lineitem_copy_append'::regclass;
count
-------
5
(1 row)

View File

@@ -0,0 +1,378 @@
--
-- MULTI_EXPLAIN
--
\a\t
SET citus.explain_distributed_queries TO on;
-- Test Text format
EXPLAIN (COSTS FALSE, FORMAT TEXT)
SELECT l_quantity, count(*) count_quantity FROM lineitem
GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
Distributed Query into pg_merge_job_0040
Executor: Real-Time
Task Count: 6
Tasks Shown: One of 6
-> Task
Node: host=localhost port=57637 dbname=regression
-> HashAggregate
Group Key: l_quantity
-> Seq Scan on lineitem_102010 lineitem
Master Query
-> Sort
Sort Key: (sum(((sum(intermediate_column_40_1))::bigint)))::bigint, intermediate_column_40_0
-> HashAggregate
Group Key: intermediate_column_40_0
-> Seq Scan on pg_merge_job_0040
-- Test JSON format
EXPLAIN (COSTS FALSE, FORMAT JSON)
SELECT l_quantity, count(*) count_quantity FROM lineitem
GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
[
"Executor": "Real-Time",
{
"Task Count": 6,
"Tasks Shown": "One of 6",
"Tasks": [
{
"Node": "host=localhost port=57637 dbname=regression",
"Remote Plan": [
[
{
"Plan": {
"Node Type": "Aggregate",
"Strategy": "Hashed",
"Group Key": ["l_quantity"],
"Plans": [
{
"Node Type": "Seq Scan",
"Parent Relationship": "Outer",
"Relation Name": "lineitem_102010",
"Alias": "lineitem"
}
]
}
}
]
]
}
]
},
{
"Plan": {
"Node Type": "Sort",
"Sort Key": ["(sum(((sum(intermediate_column_41_1))::bigint)))::bigint", "intermediate_column_41_0"],
"Plans": [
{
"Node Type": "Aggregate",
"Strategy": "Hashed",
"Parent Relationship": "Outer",
"Group Key": ["intermediate_column_41_0"],
"Plans": [
{
"Node Type": "Seq Scan",
"Parent Relationship": "Outer",
"Relation Name": "pg_merge_job_0041",
"Alias": "pg_merge_job_0041"
}
]
}
]
}
}
]
-- Test XML format
EXPLAIN (COSTS FALSE, FORMAT XML)
SELECT l_quantity, count(*) count_quantity FROM lineitem
GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
<explain xmlns="http://www.postgresql.org/2009/explain">
<Executor>Real-Time</Executor>
<Job>
<Task-Count>6</Task-Count>
<Tasks-Shown>One of 6</Tasks-Shown>
<Tasks>
<Task>
<Node>host=localhost port=57637 dbname=regression</Node>
<Remote-Plan>
<explain xmlns="http://www.postgresql.org/2009/explain">
<Query>
<Plan>
<Node-Type>Aggregate</Node-Type>
<Strategy>Hashed</Strategy>
<Group-Key>
<Item>l_quantity</Item>
</Group-Key>
<Plans>
<Plan>
<Node-Type>Seq Scan</Node-Type>
<Parent-Relationship>Outer</Parent-Relationship>
<Relation-Name>lineitem_102010</Relation-Name>
<Alias>lineitem</Alias>
</Plan>
</Plans>
</Plan>
</Query>
</explain>
</Remote-Plan>
</Task>
</Tasks>
</Job>
<Query>
<Plan>
<Node-Type>Sort</Node-Type>
<Sort-Key>
<Item>(sum(((sum(intermediate_column_42_1))::bigint)))::bigint</Item>
<Item>intermediate_column_42_0</Item>
</Sort-Key>
<Plans>
<Plan>
<Node-Type>Aggregate</Node-Type>
<Strategy>Hashed</Strategy>
<Parent-Relationship>Outer</Parent-Relationship>
<Group-Key>
<Item>intermediate_column_42_0</Item>
</Group-Key>
<Plans>
<Plan>
<Node-Type>Seq Scan</Node-Type>
<Parent-Relationship>Outer</Parent-Relationship>
<Relation-Name>pg_merge_job_0042</Relation-Name>
<Alias>pg_merge_job_0042</Alias>
</Plan>
</Plans>
</Plan>
</Plans>
</Plan>
</Query>
</explain>
-- Test YAML format
EXPLAIN (COSTS FALSE, FORMAT YAML)
SELECT l_quantity, count(*) count_quantity FROM lineitem
GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
Executor: "Real-Time"
- Task Count: 6
Tasks Shown: "One of 6"
Tasks:
- Node: "host=localhost port=57637 dbname=regression"
Remote Plan:
- Plan:
Node Type: "Aggregate"
Strategy: "Hashed"
Group Key:
- "l_quantity"
Plans:
- Node Type: "Seq Scan"
Parent Relationship: "Outer"
Relation Name: "lineitem_102010"
Alias: "lineitem"
- Plan:
Node Type: "Sort"
Sort Key:
- "(sum(((sum(intermediate_column_43_1))::bigint)))::bigint"
- "intermediate_column_43_0"
Plans:
- Node Type: "Aggregate"
Strategy: "Hashed"
Parent Relationship: "Outer"
Group Key:
- "intermediate_column_43_0"
Plans:
- Node Type: "Seq Scan"
Parent Relationship: "Outer"
Relation Name: "pg_merge_job_0043"
Alias: "pg_merge_job_0043"
-- Test Text format
EXPLAIN (COSTS FALSE, FORMAT TEXT)
SELECT l_quantity, count(*) count_quantity FROM lineitem
GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
Distributed Query into pg_merge_job_0044
Executor: Real-Time
Task Count: 6
Tasks Shown: One of 6
-> Task
Node: host=localhost port=57637 dbname=regression
-> HashAggregate
Group Key: l_quantity
-> Seq Scan on lineitem_102010 lineitem
Master Query
-> Sort
Sort Key: (sum(((sum(intermediate_column_44_1))::bigint)))::bigint, intermediate_column_44_0
-> HashAggregate
Group Key: intermediate_column_44_0
-> Seq Scan on pg_merge_job_0044
-- Test verbose
EXPLAIN (COSTS FALSE, VERBOSE TRUE)
SELECT sum(l_quantity) / avg(l_quantity) FROM lineitem;
Distributed Query into pg_merge_job_0045
Executor: Real-Time
Task Count: 6
Tasks Shown: One of 6
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
Output: sum(l_quantity), sum(l_quantity), count(l_quantity)
-> Seq Scan on public.lineitem_102010 lineitem
Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment
Master Query
-> Aggregate
Output: (sum(intermediate_column_45_0) / (sum(intermediate_column_45_1) / sum(intermediate_column_45_2)))
-> Seq Scan on pg_temp_2.pg_merge_job_0045
Output: intermediate_column_45_0, intermediate_column_45_1, intermediate_column_45_2
-- Test join
EXPLAIN (COSTS FALSE)
SELECT * FROM lineitem
JOIN orders ON l_orderkey = o_orderkey AND l_quantity < 5
ORDER BY l_quantity DESC LIMIT 10;
Distributed Query into pg_merge_job_0046
Executor: Real-Time
Task Count: 6
Tasks Shown: One of 6
-> Task
Node: host=localhost port=57637 dbname=regression
-> Limit
-> Sort
Sort Key: lineitem.l_quantity DESC
-> Hash Join
Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
-> Seq Scan on lineitem_102010 lineitem
Filter: (l_quantity < '5'::numeric)
-> Hash
-> Seq Scan on orders_102015 orders
Master Query
-> Limit
-> Sort
Sort Key: intermediate_column_46_4 DESC
-> Seq Scan on pg_merge_job_0046
-- Test insert
EXPLAIN (COSTS FALSE)
INSERT INTO lineitem VALUES(1,0);
Distributed Query
Executor: Router
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=57638 dbname=regression
-> Insert on lineitem_102009
-> Result
-- Test update
EXPLAIN (COSTS FALSE)
UPDATE lineitem
SET l_suppkey = 12
WHERE l_orderkey = 1 AND l_partkey = 0;
Distributed Query
Executor: Router
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=57638 dbname=regression
-> Update on lineitem_102009
-> Bitmap Heap Scan on lineitem_102009
Recheck Cond: (l_orderkey = 1)
Filter: (l_partkey = 0)
-> Bitmap Index Scan on lineitem_pkey_102009
Index Cond: (l_orderkey = 1)
-- Test delete
EXPLAIN (COSTS FALSE)
DELETE FROM lineitem
WHERE l_orderkey = 1 AND l_partkey = 0;
Distributed Query
Executor: Router
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=57638 dbname=regression
-> Delete on lineitem_102009
-> Bitmap Heap Scan on lineitem_102009
Recheck Cond: (l_orderkey = 1)
Filter: (l_partkey = 0)
-> Bitmap Index Scan on lineitem_pkey_102009
Index Cond: (l_orderkey = 1)
-- Test single-shard SELECT
EXPLAIN
SELECT l_quantity FROM lineitem WHERE l_orderkey = 5;
Distributed Query into pg_merge_job_0047
Executor: Router
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=57637 dbname=regression
-> Bitmap Heap Scan on lineitem_102009 lineitem (cost=4.31..19.58 rows=5 width=18)
Recheck Cond: (l_orderkey = 5)
-> Bitmap Index Scan on lineitem_pkey_102009 (cost=0.00..4.31 rows=5 width=0)
Index Cond: (l_orderkey = 5)
-- Test CREATE TABLE ... AS
EXPLAIN CREATE TABLE explain_result AS
SELECT * FROM lineitem;
Distributed Query into pg_merge_job_0048
Executor: Real-Time
Task Count: 6
Tasks Shown: One of 6
-> Task
Node: host=localhost port=57637 dbname=regression
-> Seq Scan on lineitem_102010 lineitem (cost=0.00..58.80 rows=980 width=374)
Master Query
-> Seq Scan on pg_merge_job_0048 (cost=0.00..0.00 rows=0 width=0)
-- Test all tasks output
SET citus.explain_all_tasks TO on;
EXPLAIN (COSTS FALSE)
SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
Distributed Query into pg_merge_job_0049
Executor: Real-Time
Task Count: 3
Tasks Shown: All
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_102012 lineitem
Filter: (l_orderkey > 9030)
-> Task
Node: host=localhost port=57638 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_102013 lineitem
Filter: (l_orderkey > 9030)
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_102014 lineitem
Filter: (l_orderkey > 9030)
Master Query
-> Aggregate
-> Seq Scan on pg_merge_job_0049
-- Test task tracker
SET citus.task_executor_type TO 'task-tracker';
SET citus.explain_all_tasks TO off;
EXPLAIN (COSTS FALSE)
SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
Distributed Query into pg_merge_job_0050
Executor: Task-Tracker
Task Count: 3
Tasks Shown: One of 3
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_102012 lineitem
Filter: (l_orderkey > 9030)
Master Query
-> Aggregate
-> Seq Scan on pg_merge_job_0050
-- Test re-partition join
SET citus.large_table_shard_count TO 1;
EXPLAIN (COSTS FALSE)
SELECT count(*)
FROM lineitem, orders, customer, supplier
WHERE l_orderkey = o_orderkey
AND o_custkey = c_custkey
AND l_suppkey = s_suppkey;
Distributed Query into pg_merge_job_0053
Executor: Task-Tracker
Task Count: 1
Tasks Shown: None, not supported for re-partition queries
-> MapMergeJob
Map Task Count: 1
Merge Task Count: 1
-> MapMergeJob
Map Task Count: 6
Merge Task Count: 1
Master Query
-> Aggregate
-> Seq Scan on pg_merge_job_0053

View File

@@ -0,0 +1,378 @@
--
-- MULTI_EXPLAIN
--
\a\t
SET citus.explain_distributed_queries TO on;
-- Test Text format
EXPLAIN (COSTS FALSE, FORMAT TEXT)
SELECT l_quantity, count(*) count_quantity FROM lineitem
GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
Distributed Query into pg_merge_job_0040
Executor: Real-Time
Task Count: 6
Tasks Shown: One of 6
-> Task
Node: host=localhost port=57637 dbname=regression
-> HashAggregate
Group Key: l_quantity
-> Seq Scan on lineitem_102010 lineitem
Master Query
-> Sort
Sort Key: (sum(((sum(intermediate_column_40_1))::bigint)))::bigint, intermediate_column_40_0
-> HashAggregate
Group Key: intermediate_column_40_0
-> Seq Scan on pg_merge_job_0040
-- Test JSON format
EXPLAIN (COSTS FALSE, FORMAT JSON)
SELECT l_quantity, count(*) count_quantity FROM lineitem
GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
[
"Executor": "Real-Time",
{
"Task Count": 6,
"Tasks Shown": "One of 6",
"Tasks": [
{
"Node": "host=localhost port=57637 dbname=regression",
"Remote Plan": [
[
{
"Plan": {
"Node Type": "Aggregate",
"Strategy": "Hashed",
"Group Key": ["l_quantity"],
"Plans": [
{
"Node Type": "Seq Scan",
"Parent Relationship": "Outer",
"Relation Name": "lineitem_102010",
"Alias": "lineitem"
}
]
}
}
]
]
}
]
},
{
"Plan": {
"Node Type": "Sort",
"Sort Key": ["(sum(((sum(intermediate_column_41_1))::bigint)))::bigint", "intermediate_column_41_0"],
"Plans": [
{
"Node Type": "Aggregate",
"Strategy": "Hashed",
"Parent Relationship": "Outer",
"Group Key": ["intermediate_column_41_0"],
"Plans": [
{
"Node Type": "Seq Scan",
"Parent Relationship": "Outer",
"Relation Name": "pg_merge_job_0041",
"Alias": "pg_merge_job_0041"
}
]
}
]
}
}
]
-- Test XML format
EXPLAIN (COSTS FALSE, FORMAT XML)
SELECT l_quantity, count(*) count_quantity FROM lineitem
GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
<explain xmlns="http://www.postgresql.org/2009/explain">
<Executor>Real-Time</Executor>
<Job>
<Task-Count>6</Task-Count>
<Tasks-Shown>One of 6</Tasks-Shown>
<Tasks>
<Task>
<Node>host=localhost port=57637 dbname=regression</Node>
<Remote-Plan>
<explain xmlns="http://www.postgresql.org/2009/explain">
<Query>
<Plan>
<Node-Type>Aggregate</Node-Type>
<Strategy>Hashed</Strategy>
<Group-Key>
<Item>l_quantity</Item>
</Group-Key>
<Plans>
<Plan>
<Node-Type>Seq Scan</Node-Type>
<Parent-Relationship>Outer</Parent-Relationship>
<Relation-Name>lineitem_102010</Relation-Name>
<Alias>lineitem</Alias>
</Plan>
</Plans>
</Plan>
</Query>
</explain>
</Remote-Plan>
</Task>
</Tasks>
</Job>
<Query>
<Plan>
<Node-Type>Sort</Node-Type>
<Sort-Key>
<Item>(sum(((sum(intermediate_column_42_1))::bigint)))::bigint</Item>
<Item>intermediate_column_42_0</Item>
</Sort-Key>
<Plans>
<Plan>
<Node-Type>Aggregate</Node-Type>
<Strategy>Hashed</Strategy>
<Parent-Relationship>Outer</Parent-Relationship>
<Group-Key>
<Item>intermediate_column_42_0</Item>
</Group-Key>
<Plans>
<Plan>
<Node-Type>Seq Scan</Node-Type>
<Parent-Relationship>Outer</Parent-Relationship>
<Relation-Name>pg_merge_job_0042</Relation-Name>
<Alias>pg_merge_job_0042</Alias>
</Plan>
</Plans>
</Plan>
</Plans>
</Plan>
</Query>
</explain>
-- Test YAML format
EXPLAIN (COSTS FALSE, FORMAT YAML)
SELECT l_quantity, count(*) count_quantity FROM lineitem
GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
Executor: "Real-Time"
- Task Count: 6
Tasks Shown: "One of 6"
Tasks:
- Node: "host=localhost port=57637 dbname=regression"
Remote Plan:
- Plan:
Node Type: "Aggregate"
Strategy: "Hashed"
Group Key:
- "l_quantity"
Plans:
- Node Type: "Seq Scan"
Parent Relationship: "Outer"
Relation Name: "lineitem_102010"
Alias: "lineitem"
- Plan:
Node Type: "Sort"
Sort Key:
- "(sum(((sum(intermediate_column_43_1))::bigint)))::bigint"
- "intermediate_column_43_0"
Plans:
- Node Type: "Aggregate"
Strategy: "Hashed"
Parent Relationship: "Outer"
Group Key:
- "intermediate_column_43_0"
Plans:
- Node Type: "Seq Scan"
Parent Relationship: "Outer"
Relation Name: "pg_merge_job_0043"
Alias: "pg_merge_job_0043"
-- Test Text format
EXPLAIN (COSTS FALSE, FORMAT TEXT)
SELECT l_quantity, count(*) count_quantity FROM lineitem
GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
Distributed Query into pg_merge_job_0044
Executor: Real-Time
Task Count: 6
Tasks Shown: One of 6
-> Task
Node: host=localhost port=57637 dbname=regression
-> HashAggregate
Group Key: l_quantity
-> Seq Scan on lineitem_102010 lineitem
Master Query
-> Sort
Sort Key: (sum(((sum(intermediate_column_44_1))::bigint)))::bigint, intermediate_column_44_0
-> HashAggregate
Group Key: intermediate_column_44_0
-> Seq Scan on pg_merge_job_0044
-- Test verbose
EXPLAIN (COSTS FALSE, VERBOSE TRUE)
SELECT sum(l_quantity) / avg(l_quantity) FROM lineitem;
Distributed Query into pg_merge_job_0045
Executor: Real-Time
Task Count: 6
Tasks Shown: One of 6
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
Output: sum(l_quantity), sum(l_quantity), count(l_quantity)
-> Seq Scan on public.lineitem_102010 lineitem
Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment
Master Query
-> Aggregate
Output: (sum(intermediate_column_45_0) / (sum(intermediate_column_45_1) / sum(intermediate_column_45_2)))
-> Seq Scan on pg_temp_2.pg_merge_job_0045
Output: intermediate_column_45_0, intermediate_column_45_1, intermediate_column_45_2
-- Test join
EXPLAIN (COSTS FALSE)
SELECT * FROM lineitem
JOIN orders ON l_orderkey = o_orderkey AND l_quantity < 5
ORDER BY l_quantity DESC LIMIT 10;
Distributed Query into pg_merge_job_0046
Executor: Real-Time
Task Count: 6
Tasks Shown: One of 6
-> Task
Node: host=localhost port=57637 dbname=regression
-> Limit
-> Sort
Sort Key: lineitem.l_quantity
-> Hash Join
Hash Cond: (lineitem.l_orderkey = orders.o_orderkey)
-> Seq Scan on lineitem_102010 lineitem
Filter: (l_quantity < 5::numeric)
-> Hash
-> Seq Scan on orders_102015 orders
Master Query
-> Limit
-> Sort
Sort Key: intermediate_column_46_4
-> Seq Scan on pg_merge_job_0046
-- Test insert
EXPLAIN (COSTS FALSE)
INSERT INTO lineitem VALUES(1,0);
Distributed Query
Executor: Router
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=57638 dbname=regression
-> Insert on lineitem_102009
-> Result
-- Test update
EXPLAIN (COSTS FALSE)
UPDATE lineitem
SET l_suppkey = 12
WHERE l_orderkey = 1 AND l_partkey = 0;
Distributed Query
Executor: Router
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=57638 dbname=regression
-> Update on lineitem_102009
-> Bitmap Heap Scan on lineitem_102009
Recheck Cond: (l_orderkey = 1)
Filter: (l_partkey = 0)
-> Bitmap Index Scan on lineitem_pkey_102009
Index Cond: (l_orderkey = 1)
-- Test delete
EXPLAIN (COSTS FALSE)
DELETE FROM lineitem
WHERE l_orderkey = 1 AND l_partkey = 0;
Distributed Query
Executor: Router
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=57638 dbname=regression
-> Delete on lineitem_102009
-> Bitmap Heap Scan on lineitem_102009
Recheck Cond: (l_orderkey = 1)
Filter: (l_partkey = 0)
-> Bitmap Index Scan on lineitem_pkey_102009
Index Cond: (l_orderkey = 1)
-- Test single-shard SELECT
EXPLAIN
SELECT l_quantity FROM lineitem WHERE l_orderkey = 5;
Distributed Query into pg_merge_job_0047
Executor: Router
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=57637 dbname=regression
-> Bitmap Heap Scan on lineitem_102009 lineitem (cost=4.31..19.58 rows=5 width=18)
Recheck Cond: (l_orderkey = 5)
-> Bitmap Index Scan on lineitem_pkey_102009 (cost=0.00..4.31 rows=5 width=0)
Index Cond: (l_orderkey = 5)
-- Test CREATE TABLE ... AS
EXPLAIN CREATE TABLE explain_result AS
SELECT * FROM lineitem;
Distributed Query into pg_merge_job_0048
Executor: Real-Time
Task Count: 6
Tasks Shown: One of 6
-> Task
Node: host=localhost port=57637 dbname=regression
-> Seq Scan on lineitem_102010 lineitem (cost=0.00..58.80 rows=980 width=374)
Master Query
-> Seq Scan on pg_merge_job_0048 (cost=0.00..0.00 rows=0 width=0)
-- Test all tasks output
SET citus.explain_all_tasks TO on;
EXPLAIN (COSTS FALSE)
SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
Distributed Query into pg_merge_job_0049
Executor: Real-Time
Task Count: 3
Tasks Shown: All
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_102012 lineitem
Filter: (l_orderkey > 9030)
-> Task
Node: host=localhost port=57638 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_102013 lineitem
Filter: (l_orderkey > 9030)
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_102014 lineitem
Filter: (l_orderkey > 9030)
Master Query
-> Aggregate
-> Seq Scan on pg_merge_job_0049
-- Test task tracker
SET citus.task_executor_type TO 'task-tracker';
SET citus.explain_all_tasks TO off;
EXPLAIN (COSTS FALSE)
SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
Distributed Query into pg_merge_job_0050
Executor: Task-Tracker
Task Count: 3
Tasks Shown: One of 3
-> Task
Node: host=localhost port=57637 dbname=regression
-> Aggregate
-> Seq Scan on lineitem_102012 lineitem
Filter: (l_orderkey > 9030)
Master Query
-> Aggregate
-> Seq Scan on pg_merge_job_0050
-- Test re-partition join
SET citus.large_table_shard_count TO 1;
EXPLAIN (COSTS FALSE)
SELECT count(*)
FROM lineitem, orders, customer, supplier
WHERE l_orderkey = o_orderkey
AND o_custkey = c_custkey
AND l_suppkey = s_suppkey;
Distributed Query into pg_merge_job_0053
Executor: Task-Tracker
Task Count: 1
Tasks Shown: None, not supported for re-partition queries
-> MapMergeJob
Map Task Count: 1
Merge Task Count: 1
-> MapMergeJob
Map Task Count: 6
Merge Task Count: 1
Master Query
-> Aggregate
-> Seq Scan on pg_merge_job_0053

View File

@ -16,248 +16,247 @@ CREATE TABLE orders_hash_partitioned (
o_clerk char(15),
o_shippriority integer,
o_comment varchar(79) );
SELECT master_create_distributed_table('orders_hash_partitioned', 'o_orderkey', 'append');
SELECT master_create_distributed_table('orders_hash_partitioned', 'o_orderkey', 'hash');
master_create_distributed_table
---------------------------------
 
(1 row)
UPDATE pg_dist_partition SET partmethod = 'h'
WHERE logicalrelid = 'orders_hash_partitioned'::regclass;
-- Create logical shards with shardids 110, 111, 112 and 113
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue)
VALUES ('orders_hash_partitioned'::regclass, 110, 't', -1905060026, -1905060026),
('orders_hash_partitioned'::regclass, 111, 't', 1134484726, 1134484726),
('orders_hash_partitioned'::regclass, 112, 't', -1905060026, -28094569),
('orders_hash_partitioned'::regclass, 113, 't', -1011077333, 0);
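-- The single-point intervals above pin down the hash values of individual
-- keys: the pruning output below keeps only shards 110 and 112 for
-- o_orderkey = 1 and only shard 111 for o_orderkey = 2. Assuming the
-- pruning logic uses PostgreSQL's default integer hash function (hashint4),
-- the same values can be computed directly as a sanity check:
SELECT hashint4(1);  -- -1905060026, shard 110's single-point interval
SELECT hashint4(2);  --  1134484726, shard 111's single-point interval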
-- Create shard placements for shards 110, 111, 112 and 113
INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport)
SELECT 110, 1, 1, nodename, nodeport
FROM pg_dist_shard_placement
GROUP BY nodename, nodeport
ORDER BY nodename, nodeport ASC
LIMIT 1;
INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport)
SELECT 111, 1, 1, nodename, nodeport
FROM pg_dist_shard_placement
GROUP BY nodename, nodeport
ORDER BY nodename, nodeport ASC
LIMIT 1;
INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport)
SELECT 112, 1, 1, nodename, nodeport
FROM pg_dist_shard_placement
GROUP BY nodename, nodeport
ORDER BY nodename, nodeport ASC
LIMIT 1;
INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport)
SELECT 113, 1, 1, nodename, nodeport
FROM pg_dist_shard_placement
GROUP BY nodename, nodeport
ORDER BY nodename, nodeport ASC
LIMIT 1;
SELECT master_create_worker_shards('orders_hash_partitioned', 4, 1);
master_create_worker_shards
-----------------------------
 
(1 row)
SET client_min_messages TO DEBUG2;
-- Check that we can prune shards for simple cases, boolean expressions and
-- immutable functions.
EXPLAIN SELECT count(*) FROM orders_hash_partitioned;
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
SELECT count(*) FROM orders_hash_partitioned;
count
-------
0
(1 row)
EXPLAIN SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 1;
DEBUG: predicate pruning for shardId 113
DEBUG: predicate pruning for shardId 111
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 1;
DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 102034
DEBUG: predicate pruning for shardId 102035
DEBUG: predicate pruning for shardId 102036
DEBUG: Plan is router executable
count
-------
0
(1 row)
EXPLAIN SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 2;
DEBUG: predicate pruning for shardId 112
DEBUG: predicate pruning for shardId 110
DEBUG: predicate pruning for shardId 113
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 2;
DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 102033
DEBUG: predicate pruning for shardId 102034
DEBUG: predicate pruning for shardId 102035
DEBUG: Plan is router executable
count
-------
0
(1 row)
EXPLAIN SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 3;
DEBUG: predicate pruning for shardId 110
DEBUG: predicate pruning for shardId 111
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 3;
DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 102033
DEBUG: predicate pruning for shardId 102035
DEBUG: predicate pruning for shardId 102036
DEBUG: Plan is router executable
count
-------
0
(1 row)
EXPLAIN SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 4;
DEBUG: predicate pruning for shardId 110
DEBUG: predicate pruning for shardId 111
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 4;
DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 102033
DEBUG: predicate pruning for shardId 102035
DEBUG: predicate pruning for shardId 102036
DEBUG: Plan is router executable
count
-------
0
(1 row)
EXPLAIN SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey is NULL;
DEBUG: predicate pruning for shardId 112
DEBUG: predicate pruning for shardId 110
DEBUG: predicate pruning for shardId 111
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey is NULL;
DEBUG: predicate pruning for shardId 102033
DEBUG: predicate pruning for shardId 102034
DEBUG: predicate pruning for shardId 102036
count
-------
0
(1 row)
EXPLAIN SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey is not NULL;
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey is not NULL;
count
-------
0
(1 row)
EXPLAIN SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey > 2;
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey > 2;
count
-------
0
(1 row)
EXPLAIN SELECT count(*) FROM orders_hash_partitioned
SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = 1 OR o_orderkey = 2;
DEBUG: predicate pruning for shardId 113
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
DEBUG: predicate pruning for shardId 102034
DEBUG: predicate pruning for shardId 102035
count
-------
0
(1 row)
EXPLAIN SELECT count(*) FROM orders_hash_partitioned
SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = 1 OR o_clerk = 'aaa';
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
count
-------
0
(1 row)
EXPLAIN SELECT count(*) FROM orders_hash_partitioned
SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = 1 AND o_clerk = 'aaa';
DEBUG: predicate pruning for shardId 113
DEBUG: predicate pruning for shardId 111
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 102034
DEBUG: predicate pruning for shardId 102035
DEBUG: predicate pruning for shardId 102036
DEBUG: Plan is router executable
count
-------
0
(1 row)
EXPLAIN SELECT count(*) FROM orders_hash_partitioned
SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = 1 OR (o_orderkey = 3 AND o_clerk = 'aaa');
DEBUG: predicate pruning for shardId 111
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
DEBUG: predicate pruning for shardId 102035
DEBUG: predicate pruning for shardId 102036
count
-------
0
(1 row)
EXPLAIN SELECT count(*) FROM orders_hash_partitioned
SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = 1 OR o_orderkey is NULL;
DEBUG: predicate pruning for shardId 111
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
DEBUG: predicate pruning for shardId 102034
DEBUG: predicate pruning for shardId 102036
count
-------
0
(1 row)
EXPLAIN SELECT count(*) FROM
SELECT count(*) FROM
(SELECT o_orderkey FROM orders_hash_partitioned WHERE o_orderkey = 1) AS orderkeys;
DEBUG: predicate pruning for shardId 113
DEBUG: predicate pruning for shardId 111
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
DEBUG: predicate pruning for shardId 102034
DEBUG: predicate pruning for shardId 102035
DEBUG: predicate pruning for shardId 102036
count
-------
0
(1 row)
EXPLAIN SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = abs(-1);
DEBUG: predicate pruning for shardId 113
DEBUG: predicate pruning for shardId 111
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = abs(-1);
DEBUG: Creating router plan
DEBUG: predicate pruning for shardId 102034
DEBUG: predicate pruning for shardId 102035
DEBUG: predicate pruning for shardId 102036
DEBUG: Plan is router executable
count
-------
0
(1 row)
-- Check that we don't support pruning for ANY (array expression) and that we
-- give a notice message when it is used with the partition column
EXPLAIN SELECT count(*) FROM orders_hash_partitioned
SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = ANY ('{1,2,3}');
NOTICE: cannot use shard pruning with ANY/ALL (array expression)
HINT: Consider rewriting the expression with OR/AND clauses.
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
count
-------
0
(1 row)
-- Check that we don't show the message if the operator is not the
-- equality operator
EXPLAIN SELECT count(*) FROM orders_hash_partitioned
SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey < ALL ('{1,2,3}');
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
count
-------
0
(1 row)
-- Check that we don't give a spurious hint message when non-partition
-- columns are used with ANY/IN/ALL
EXPLAIN SELECT count(*) FROM orders_hash_partitioned
SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = 1 OR o_totalprice IN (2, 5);
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
count
-------
0
(1 row)
-- Check that we cannot prune for mutable functions.
EXPLAIN SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = random();
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = random();
count
-------
0
(1 row)
EXPLAIN SELECT count(*) FROM orders_hash_partitioned
SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = random() OR o_orderkey = 1;
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
count
-------
0
(1 row)
EXPLAIN SELECT count(*) FROM orders_hash_partitioned
SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = random() AND o_orderkey = 1;
DEBUG: predicate pruning for shardId 113
DEBUG: predicate pruning for shardId 111
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
DEBUG: predicate pruning for shardId 102034
DEBUG: predicate pruning for shardId 102035
DEBUG: predicate pruning for shardId 102036
count
-------
0
(1 row)
-- Check that we can do join pruning.
EXPLAIN SELECT count(*)
SELECT count(*)
FROM orders_hash_partitioned orders1, orders_hash_partitioned orders2
WHERE orders1.o_orderkey = orders2.o_orderkey;
DEBUG: join prunable for intervals [-1905060026,-28094569] and [1134484726,1134484726]
DEBUG: join prunable for intervals [-1905060026,-1905060026] and [-1011077333,0]
DEBUG: join prunable for intervals [-1905060026,-1905060026] and [1134484726,1134484726]
DEBUG: join prunable for intervals [-1011077333,0] and [-1905060026,-1905060026]
DEBUG: join prunable for intervals [-1011077333,0] and [1134484726,1134484726]
DEBUG: join prunable for intervals [1134484726,1134484726] and [-1905060026,-28094569]
DEBUG: join prunable for intervals [1134484726,1134484726] and [-1905060026,-1905060026]
DEBUG: join prunable for intervals [1134484726,1134484726] and [-1011077333,0]
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [-1073741824,-1]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823]
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [1073741824,2147483647]
DEBUG: join prunable for intervals [-1073741824,-1] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [-1073741824,-1] and [0,1073741823]
DEBUG: join prunable for intervals [-1073741824,-1] and [1073741824,2147483647]
DEBUG: join prunable for intervals [0,1073741823] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [0,1073741823] and [-1073741824,-1]
DEBUG: join prunable for intervals [0,1073741823] and [1073741824,2147483647]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-1073741825]
DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1]
DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823]
count
-------
0
(1 row)
EXPLAIN SELECT count(*)
SELECT count(*)
FROM orders_hash_partitioned orders1, orders_hash_partitioned orders2
WHERE orders1.o_orderkey = orders2.o_orderkey
AND orders1.o_orderkey = 1
AND orders2.o_orderkey is NULL;
DEBUG: predicate pruning for shardId 113
DEBUG: predicate pruning for shardId 111
DEBUG: predicate pruning for shardId 112
DEBUG: predicate pruning for shardId 110
DEBUG: predicate pruning for shardId 111
DEBUG: join prunable for intervals [-1905060026,-1905060026] and [-1011077333,0]
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
DEBUG: predicate pruning for shardId 102034
DEBUG: predicate pruning for shardId 102035
DEBUG: predicate pruning for shardId 102036
DEBUG: predicate pruning for shardId 102033
DEBUG: predicate pruning for shardId 102034
DEBUG: predicate pruning for shardId 102036
DEBUG: join prunable for intervals [-2147483648,-1073741825] and [0,1073741823]
count
-------
 
(1 row)
SET client_min_messages TO NOTICE;

View File

@ -2,6 +2,7 @@
-- MULTI_JOIN_ORDER_ADDITIONAL
--
-- Set configuration to print table join order and pruned shards
SET citus.explain_distributed_queries TO off;
SET citus.log_multi_join_order TO TRUE;
SET client_min_messages TO DEBUG2;
-- The following query checks that we can correctly handle self-joins
@ -37,8 +38,8 @@ DEBUG: join prunable for intervals [13921,14947] and [4965,5986]
DEBUG: join prunable for intervals [13921,14947] and [8997,11554]
DEBUG: join prunable for intervals [13921,14947] and [11554,13920]
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
------------------------------------------------------------
explain statements for distributed queries are not enabled
(1 row)
-- Update configuration to treat lineitem and orders tables as large
@ -52,8 +53,8 @@ EXPLAIN SELECT count(*) FROM lineitem, orders
OR (l_orderkey = o_orderkey AND l_quantity < 10);
LOG: join order: [ "lineitem" ][ local partition join "orders" ]
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
------------------------------------------------------------
explain statements for distributed queries are not enabled
(1 row)
EXPLAIN SELECT l_quantity FROM lineitem, orders
@ -73,8 +74,8 @@ EXPLAIN SELECT count(*) FROM orders, lineitem
WHERE o_orderkey = l_orderkey;
LOG: join order: [ "orders" ][ single partition join "lineitem" ]
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
------------------------------------------------------------
explain statements for distributed queries are not enabled
(1 row)
-- Verify we handle local joins between two hash-partitioned tables.
@ -84,8 +85,8 @@ EXPLAIN SELECT count(*) FROM orders, lineitem
WHERE o_orderkey = l_orderkey;
LOG: join order: [ "orders" ][ local partition join "lineitem" ]
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
------------------------------------------------------------
explain statements for distributed queries are not enabled
(1 row)
UPDATE pg_dist_partition SET partmethod = 'a' WHERE
@ -99,8 +100,8 @@ EXPLAIN SELECT count(*) FROM customer, nation
WHERE c_nationkey = n_nationkey;
LOG: join order: [ "customer" ][ broadcast join "nation" ]
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
------------------------------------------------------------
explain statements for distributed queries are not enabled
(1 row)
UPDATE pg_dist_partition SET partmethod = 'a' WHERE
@ -113,8 +114,8 @@ EXPLAIN SELECT count(*) FROM orders, lineitem, customer
WHERE o_custkey = l_partkey AND o_custkey = c_nationkey;
LOG: join order: [ "orders" ][ dual partition join "lineitem" ][ dual partition join "customer" ]
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
------------------------------------------------------------
explain statements for distributed queries are not enabled
(1 row)
-- Validate that we don't choose a single-partition join method with a
@ -125,8 +126,8 @@ EXPLAIN SELECT count(*) FROM orders, customer
WHERE c_custkey = o_custkey;
LOG: join order: [ "orders" ][ dual partition join "customer" ]
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
------------------------------------------------------------
explain statements for distributed queries are not enabled
(1 row)
UPDATE pg_dist_partition SET partmethod = 'a' WHERE
@ -139,8 +140,8 @@ EXPLAIN SELECT count(*) FROM orders, customer
WHERE c_custkey = o_custkey;
LOG: join order: [ "orders" ][ single partition join "customer" ]
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
------------------------------------------------------------
explain statements for distributed queries are not enabled
(1 row)
UPDATE pg_dist_partition SET partmethod = 'a' WHERE

View File

@ -2,6 +2,7 @@
-- MULTI_JOIN_ORDER_TPCH_LARGE
--
-- Enable configuration to print table join order
SET citus.explain_distributed_queries TO off;
SET citus.log_multi_join_order TO TRUE;
SET client_min_messages TO LOG;
-- Change configuration to treat lineitem, orders, customer, and part tables as
@ -21,8 +22,8 @@ WHERE
and l_quantity < 24;
LOG: join order: [ "lineitem" ]
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
------------------------------------------------------------
explain statements for distributed queries are not enabled
(1 row)
-- Query #3 from the TPC-H decision support benchmark
@ -50,8 +51,8 @@ ORDER BY
o_orderdate;
LOG: join order: [ "orders" ][ local partition join "lineitem" ][ single partition join "customer" ]
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
------------------------------------------------------------
explain statements for distributed queries are not enabled
(1 row)
-- Query #10 from the TPC-H decision support benchmark
@ -88,8 +89,8 @@ ORDER BY
revenue DESC;
LOG: join order: [ "orders" ][ local partition join "lineitem" ][ single partition join "customer" ][ broadcast join "nation" ]
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
------------------------------------------------------------
explain statements for distributed queries are not enabled
(1 row)
-- Query #19 from the TPC-H decision support benchmark (modified)
@ -124,8 +125,8 @@ WHERE
);
LOG: join order: [ "lineitem" ][ single partition join "part" ]
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
------------------------------------------------------------
explain statements for distributed queries are not enabled
(1 row)
-- Query to test multiple re-partition jobs in a single query
@ -141,8 +142,8 @@ GROUP BY
l_partkey;
LOG: join order: [ "lineitem" ][ local partition join "orders" ][ single partition join "part" ][ single partition join "customer" ]
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
------------------------------------------------------------
explain statements for distributed queries are not enabled
(1 row)
-- Reset client logging level to its previous value

View File

@ -2,6 +2,7 @@
-- MULTI_JOIN_ORDER_TPCH_SMALL
--
-- Enable configuration to print table join order
SET citus.explain_distributed_queries TO off;
SET citus.log_multi_join_order TO TRUE;
SET client_min_messages TO LOG;
-- Change configuration to treat lineitem and orders tables as large
@ -18,8 +19,8 @@ WHERE
and l_quantity < 24;
LOG: join order: [ "lineitem" ]
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
------------------------------------------------------------
explain statements for distributed queries are not enabled
(1 row)
-- Query #3 from the TPC-H decision support benchmark
@ -47,8 +48,8 @@ ORDER BY
o_orderdate;
LOG: join order: [ "orders" ][ broadcast join "customer" ][ local partition join "lineitem" ]
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
------------------------------------------------------------
explain statements for distributed queries are not enabled
(1 row)
-- Query #10 from the TPC-H decision support benchmark
@ -85,8 +86,8 @@ ORDER BY
revenue DESC;
LOG: join order: [ "orders" ][ broadcast join "customer" ][ broadcast join "nation" ][ local partition join "lineitem" ]
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
------------------------------------------------------------
explain statements for distributed queries are not enabled
(1 row)
-- Query #19 from the TPC-H decision support benchmark (modified)
@ -121,8 +122,8 @@ WHERE
);
LOG: join order: [ "lineitem" ][ broadcast join "part" ]
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
------------------------------------------------------------
explain statements for distributed queries are not enabled
(1 row)
-- Reset client logging level to its previous value

View File

@ -4,6 +4,7 @@
-- Check that join-pruning works for joins between two large relations. For now
-- we only check for join-pruning between locally partitioned relations. In the
-- future we want to check for pruning between re-partitioned relations as well.
SET citus.explain_distributed_queries TO off;
SET client_min_messages TO DEBUG2;
-- Change configuration to treat all tables as large
SET citus.large_table_shard_count TO 2;
@ -75,8 +76,8 @@ EXPLAIN SELECT count(*)
DEBUG: join prunable for intervals [{},{AZZXSP27F21T6,AZZXSP27F21T6}] and [{BA1000U2AMO4ZGX,BZZXSP27F21T6},{CA1000U2AMO4ZGX,CZZXSP27F21T6}]
DEBUG: join prunable for intervals [{BA1000U2AMO4ZGX,BZZXSP27F21T6},{CA1000U2AMO4ZGX,CZZXSP27F21T6}] and [{},{AZZXSP27F21T6,AZZXSP27F21T6}]
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
------------------------------------------------------------
explain statements for distributed queries are not enabled
(1 row)
EXPLAIN SELECT count(*)
@ -85,8 +86,8 @@ EXPLAIN SELECT count(*)
DEBUG: join prunable for intervals [(a,3,b),(b,4,c)] and [(c,5,d),(d,6,e)]
DEBUG: join prunable for intervals [(c,5,d),(d,6,e)] and [(a,3,b),(b,4,c)]
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
------------------------------------------------------------
explain statements for distributed queries are not enabled
(1 row)
-- Test that large table joins on partition varchar columns work
@ -96,8 +97,8 @@ EXPLAIN SELECT count(*)
DEBUG: join prunable for intervals [AA1000U2AMO4ZGX,AZZXSP27F21T6] and [BA1000U2AMO4ZGX,BZZXSP27F21T6]
DEBUG: join prunable for intervals [BA1000U2AMO4ZGX,BZZXSP27F21T6] and [AA1000U2AMO4ZGX,AZZXSP27F21T6]
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
------------------------------------------------------------
explain statements for distributed queries are not enabled
(1 row)
SET client_min_messages TO NOTICE;

View File

@ -74,7 +74,7 @@ DEBUG: join prunable for intervals [6001,7000] and [1,1000]
DEBUG: generated sql query for job 1251 and task 3
DETAIL: query string: "SELECT "pg_merge_job_1250.task_000019".intermediate_column_1250_0, "pg_merge_job_1250.task_000019".intermediate_column_1250_1, "pg_merge_job_1250.task_000019".intermediate_column_1250_2, "pg_merge_job_1250.task_000019".intermediate_column_1250_3, "pg_merge_job_1250.task_000019".intermediate_column_1250_4 FROM (pg_merge_job_1250.task_000019 "pg_merge_job_1250.task_000019" JOIN part_102019 part ON (("pg_merge_job_1250.task_000019".intermediate_column_1250_0 = part.p_partkey))) WHERE (part.p_size > 8)"
DEBUG: generated sql query for job 1251 and task 6
DETAIL: query string: "SELECT "pg_merge_job_1250.task_000026".intermediate_column_1250_0, "pg_merge_job_1250.task_000026".intermediate_column_1250_1, "pg_merge_job_1250.task_000026".intermediate_column_1250_2, "pg_merge_job_1250.task_000026".intermediate_column_1250_3, "pg_merge_job_1250.task_000026".intermediate_column_1250_4 FROM (pg_merge_job_1250.task_000026 "pg_merge_job_1250.task_000026" JOIN part_102035 part ON (("pg_merge_job_1250.task_000026".intermediate_column_1250_0 = part.p_partkey))) WHERE (part.p_size > 8)"
DETAIL: query string: "SELECT "pg_merge_job_1250.task_000026".intermediate_column_1250_0, "pg_merge_job_1250.task_000026".intermediate_column_1250_1, "pg_merge_job_1250.task_000026".intermediate_column_1250_2, "pg_merge_job_1250.task_000026".intermediate_column_1250_3, "pg_merge_job_1250.task_000026".intermediate_column_1250_4 FROM (pg_merge_job_1250.task_000026 "pg_merge_job_1250.task_000026" JOIN part_102039 part ON (("pg_merge_job_1250.task_000026".intermediate_column_1250_0 = part.p_partkey))) WHERE (part.p_size > 8)"
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 19
DEBUG: pruning merge fetch taskId 4
@ -90,9 +90,9 @@ DEBUG: join prunable for intervals [6001,7000] and [1001,2000]
DEBUG: generated sql query for job 1252 and task 3
DETAIL: query string: "SELECT "pg_merge_job_1251.task_000007".intermediate_column_1251_0 AS l_partkey, "pg_merge_job_1251.task_000007".intermediate_column_1251_1 AS o_orderkey, count(*) AS count FROM (pg_merge_job_1251.task_000007 "pg_merge_job_1251.task_000007" JOIN customer_102017 customer ON ((customer.c_custkey = "pg_merge_job_1251.task_000007".intermediate_column_1251_4))) WHERE ((("pg_merge_job_1251.task_000007".intermediate_column_1251_2 > 5.0) OR ("pg_merge_job_1251.task_000007".intermediate_column_1251_3 > 1200.0)) AND (customer.c_acctbal < 5000.0)) GROUP BY "pg_merge_job_1251.task_000007".intermediate_column_1251_0, "pg_merge_job_1251.task_000007".intermediate_column_1251_1 ORDER BY "pg_merge_job_1251.task_000007".intermediate_column_1251_0, "pg_merge_job_1251.task_000007".intermediate_column_1251_1, "pg_merge_job_1251.task_000007".intermediate_column_1251_0, "pg_merge_job_1251.task_000007".intermediate_column_1251_1 LIMIT '30'::bigint"
DEBUG: generated sql query for job 1252 and task 6
DETAIL: query string: "SELECT "pg_merge_job_1251.task_000010".intermediate_column_1251_0 AS l_partkey, "pg_merge_job_1251.task_000010".intermediate_column_1251_1 AS o_orderkey, count(*) AS count FROM (pg_merge_job_1251.task_000010 "pg_merge_job_1251.task_000010" JOIN customer_102034 customer ON ((customer.c_custkey = "pg_merge_job_1251.task_000010".intermediate_column_1251_4))) WHERE ((("pg_merge_job_1251.task_000010".intermediate_column_1251_2 > 5.0) OR ("pg_merge_job_1251.task_000010".intermediate_column_1251_3 > 1200.0)) AND (customer.c_acctbal < 5000.0)) GROUP BY "pg_merge_job_1251.task_000010".intermediate_column_1251_0, "pg_merge_job_1251.task_000010".intermediate_column_1251_1 ORDER BY "pg_merge_job_1251.task_000010".intermediate_column_1251_0, "pg_merge_job_1251.task_000010".intermediate_column_1251_1, "pg_merge_job_1251.task_000010".intermediate_column_1251_0, "pg_merge_job_1251.task_000010".intermediate_column_1251_1 LIMIT '30'::bigint"
DETAIL: query string: "SELECT "pg_merge_job_1251.task_000010".intermediate_column_1251_0 AS l_partkey, "pg_merge_job_1251.task_000010".intermediate_column_1251_1 AS o_orderkey, count(*) AS count FROM (pg_merge_job_1251.task_000010 "pg_merge_job_1251.task_000010" JOIN customer_102038 customer ON ((customer.c_custkey = "pg_merge_job_1251.task_000010".intermediate_column_1251_4))) WHERE ((("pg_merge_job_1251.task_000010".intermediate_column_1251_2 > 5.0) OR ("pg_merge_job_1251.task_000010".intermediate_column_1251_3 > 1200.0)) AND (customer.c_acctbal < 5000.0)) GROUP BY "pg_merge_job_1251.task_000010".intermediate_column_1251_0, "pg_merge_job_1251.task_000010".intermediate_column_1251_1 ORDER BY "pg_merge_job_1251.task_000010".intermediate_column_1251_0, "pg_merge_job_1251.task_000010".intermediate_column_1251_1, "pg_merge_job_1251.task_000010".intermediate_column_1251_0, "pg_merge_job_1251.task_000010".intermediate_column_1251_1 LIMIT '30'::bigint"
DEBUG: generated sql query for job 1252 and task 9
DETAIL: query string: "SELECT "pg_merge_job_1251.task_000013".intermediate_column_1251_0 AS l_partkey, "pg_merge_job_1251.task_000013".intermediate_column_1251_1 AS o_orderkey, count(*) AS count FROM (pg_merge_job_1251.task_000013 "pg_merge_job_1251.task_000013" JOIN customer_102033 customer ON ((customer.c_custkey = "pg_merge_job_1251.task_000013".intermediate_column_1251_4))) WHERE ((("pg_merge_job_1251.task_000013".intermediate_column_1251_2 > 5.0) OR ("pg_merge_job_1251.task_000013".intermediate_column_1251_3 > 1200.0)) AND (customer.c_acctbal < 5000.0)) GROUP BY "pg_merge_job_1251.task_000013".intermediate_column_1251_0, "pg_merge_job_1251.task_000013".intermediate_column_1251_1 ORDER BY "pg_merge_job_1251.task_000013".intermediate_column_1251_0, "pg_merge_job_1251.task_000013".intermediate_column_1251_1, "pg_merge_job_1251.task_000013".intermediate_column_1251_0, "pg_merge_job_1251.task_000013".intermediate_column_1251_1 LIMIT '30'::bigint"
DETAIL: query string: "SELECT "pg_merge_job_1251.task_000013".intermediate_column_1251_0 AS l_partkey, "pg_merge_job_1251.task_000013".intermediate_column_1251_1 AS o_orderkey, count(*) AS count FROM (pg_merge_job_1251.task_000013 "pg_merge_job_1251.task_000013" JOIN customer_102037 customer ON ((customer.c_custkey = "pg_merge_job_1251.task_000013".intermediate_column_1251_4))) WHERE ((("pg_merge_job_1251.task_000013".intermediate_column_1251_2 > 5.0) OR ("pg_merge_job_1251.task_000013".intermediate_column_1251_3 > 1200.0)) AND (customer.c_acctbal < 5000.0)) GROUP BY "pg_merge_job_1251.task_000013".intermediate_column_1251_0, "pg_merge_job_1251.task_000013".intermediate_column_1251_1 ORDER BY "pg_merge_job_1251.task_000013".intermediate_column_1251_0, "pg_merge_job_1251.task_000013".intermediate_column_1251_1, "pg_merge_job_1251.task_000013".intermediate_column_1251_0, "pg_merge_job_1251.task_000013".intermediate_column_1251_1 LIMIT '30'::bigint"
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 7
DEBUG: pruning merge fetch taskId 4

View File

@ -74,7 +74,7 @@ DEBUG: join prunable for intervals [6001,7000] and [1,1000]
DEBUG: generated sql query for job 1251 and task 3
DETAIL: query string: "SELECT "pg_merge_job_1250.task_000019".intermediate_column_1250_0, "pg_merge_job_1250.task_000019".intermediate_column_1250_1, "pg_merge_job_1250.task_000019".intermediate_column_1250_2, "pg_merge_job_1250.task_000019".intermediate_column_1250_3, "pg_merge_job_1250.task_000019".intermediate_column_1250_4 FROM (pg_merge_job_1250.task_000019 "pg_merge_job_1250.task_000019" JOIN part_102019 part ON (("pg_merge_job_1250.task_000019".intermediate_column_1250_0 = part.p_partkey))) WHERE (part.p_size > 8)"
DEBUG: generated sql query for job 1251 and task 6
DETAIL: query string: "SELECT "pg_merge_job_1250.task_000026".intermediate_column_1250_0, "pg_merge_job_1250.task_000026".intermediate_column_1250_1, "pg_merge_job_1250.task_000026".intermediate_column_1250_2, "pg_merge_job_1250.task_000026".intermediate_column_1250_3, "pg_merge_job_1250.task_000026".intermediate_column_1250_4 FROM (pg_merge_job_1250.task_000026 "pg_merge_job_1250.task_000026" JOIN part_102035 part ON (("pg_merge_job_1250.task_000026".intermediate_column_1250_0 = part.p_partkey))) WHERE (part.p_size > 8)"
DETAIL: query string: "SELECT "pg_merge_job_1250.task_000026".intermediate_column_1250_0, "pg_merge_job_1250.task_000026".intermediate_column_1250_1, "pg_merge_job_1250.task_000026".intermediate_column_1250_2, "pg_merge_job_1250.task_000026".intermediate_column_1250_3, "pg_merge_job_1250.task_000026".intermediate_column_1250_4 FROM (pg_merge_job_1250.task_000026 "pg_merge_job_1250.task_000026" JOIN part_102039 part ON (("pg_merge_job_1250.task_000026".intermediate_column_1250_0 = part.p_partkey))) WHERE (part.p_size > 8)"
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 19
DEBUG: pruning merge fetch taskId 4
@ -90,9 +90,9 @@ DEBUG: join prunable for intervals [6001,7000] and [1001,2000]
DEBUG: generated sql query for job 1252 and task 3
DETAIL: query string: "SELECT "pg_merge_job_1251.task_000007".intermediate_column_1251_0 AS l_partkey, "pg_merge_job_1251.task_000007".intermediate_column_1251_1 AS o_orderkey, count(*) AS count FROM (pg_merge_job_1251.task_000007 "pg_merge_job_1251.task_000007" JOIN customer_102017 customer ON ((customer.c_custkey = "pg_merge_job_1251.task_000007".intermediate_column_1251_4))) WHERE ((("pg_merge_job_1251.task_000007".intermediate_column_1251_2 > 5.0) OR ("pg_merge_job_1251.task_000007".intermediate_column_1251_3 > 1200.0)) AND (customer.c_acctbal < 5000.0)) GROUP BY "pg_merge_job_1251.task_000007".intermediate_column_1251_0, "pg_merge_job_1251.task_000007".intermediate_column_1251_1 ORDER BY "pg_merge_job_1251.task_000007".intermediate_column_1251_0, "pg_merge_job_1251.task_000007".intermediate_column_1251_1, "pg_merge_job_1251.task_000007".intermediate_column_1251_0, "pg_merge_job_1251.task_000007".intermediate_column_1251_1 LIMIT 30::bigint"
DEBUG: generated sql query for job 1252 and task 6
DETAIL: query string: "SELECT "pg_merge_job_1251.task_000010".intermediate_column_1251_0 AS l_partkey, "pg_merge_job_1251.task_000010".intermediate_column_1251_1 AS o_orderkey, count(*) AS count FROM (pg_merge_job_1251.task_000010 "pg_merge_job_1251.task_000010" JOIN customer_102034 customer ON ((customer.c_custkey = "pg_merge_job_1251.task_000010".intermediate_column_1251_4))) WHERE ((("pg_merge_job_1251.task_000010".intermediate_column_1251_2 > 5.0) OR ("pg_merge_job_1251.task_000010".intermediate_column_1251_3 > 1200.0)) AND (customer.c_acctbal < 5000.0)) GROUP BY "pg_merge_job_1251.task_000010".intermediate_column_1251_0, "pg_merge_job_1251.task_000010".intermediate_column_1251_1 ORDER BY "pg_merge_job_1251.task_000010".intermediate_column_1251_0, "pg_merge_job_1251.task_000010".intermediate_column_1251_1, "pg_merge_job_1251.task_000010".intermediate_column_1251_0, "pg_merge_job_1251.task_000010".intermediate_column_1251_1 LIMIT 30::bigint"
DETAIL: query string: "SELECT "pg_merge_job_1251.task_000010".intermediate_column_1251_0 AS l_partkey, "pg_merge_job_1251.task_000010".intermediate_column_1251_1 AS o_orderkey, count(*) AS count FROM (pg_merge_job_1251.task_000010 "pg_merge_job_1251.task_000010" JOIN customer_102038 customer ON ((customer.c_custkey = "pg_merge_job_1251.task_000010".intermediate_column_1251_4))) WHERE ((("pg_merge_job_1251.task_000010".intermediate_column_1251_2 > 5.0) OR ("pg_merge_job_1251.task_000010".intermediate_column_1251_3 > 1200.0)) AND (customer.c_acctbal < 5000.0)) GROUP BY "pg_merge_job_1251.task_000010".intermediate_column_1251_0, "pg_merge_job_1251.task_000010".intermediate_column_1251_1 ORDER BY "pg_merge_job_1251.task_000010".intermediate_column_1251_0, "pg_merge_job_1251.task_000010".intermediate_column_1251_1, "pg_merge_job_1251.task_000010".intermediate_column_1251_0, "pg_merge_job_1251.task_000010".intermediate_column_1251_1 LIMIT 30::bigint"
DEBUG: generated sql query for job 1252 and task 9
DETAIL: query string: "SELECT "pg_merge_job_1251.task_000013".intermediate_column_1251_0 AS l_partkey, "pg_merge_job_1251.task_000013".intermediate_column_1251_1 AS o_orderkey, count(*) AS count FROM (pg_merge_job_1251.task_000013 "pg_merge_job_1251.task_000013" JOIN customer_102033 customer ON ((customer.c_custkey = "pg_merge_job_1251.task_000013".intermediate_column_1251_4))) WHERE ((("pg_merge_job_1251.task_000013".intermediate_column_1251_2 > 5.0) OR ("pg_merge_job_1251.task_000013".intermediate_column_1251_3 > 1200.0)) AND (customer.c_acctbal < 5000.0)) GROUP BY "pg_merge_job_1251.task_000013".intermediate_column_1251_0, "pg_merge_job_1251.task_000013".intermediate_column_1251_1 ORDER BY "pg_merge_job_1251.task_000013".intermediate_column_1251_0, "pg_merge_job_1251.task_000013".intermediate_column_1251_1, "pg_merge_job_1251.task_000013".intermediate_column_1251_0, "pg_merge_job_1251.task_000013".intermediate_column_1251_1 LIMIT 30::bigint"
DETAIL: query string: "SELECT "pg_merge_job_1251.task_000013".intermediate_column_1251_0 AS l_partkey, "pg_merge_job_1251.task_000013".intermediate_column_1251_1 AS o_orderkey, count(*) AS count FROM (pg_merge_job_1251.task_000013 "pg_merge_job_1251.task_000013" JOIN customer_102037 customer ON ((customer.c_custkey = "pg_merge_job_1251.task_000013".intermediate_column_1251_4))) WHERE ((("pg_merge_job_1251.task_000013".intermediate_column_1251_2 > 5.0) OR ("pg_merge_job_1251.task_000013".intermediate_column_1251_3 > 1200.0)) AND (customer.c_acctbal < 5000.0)) GROUP BY "pg_merge_job_1251.task_000013".intermediate_column_1251_0, "pg_merge_job_1251.task_000013".intermediate_column_1251_1 ORDER BY "pg_merge_job_1251.task_000013".intermediate_column_1251_0, "pg_merge_job_1251.task_000013".intermediate_column_1251_1, "pg_merge_job_1251.task_000013".intermediate_column_1251_0, "pg_merge_job_1251.task_000013".intermediate_column_1251_1 LIMIT 30::bigint"
DEBUG: pruning merge fetch taskId 1
DETAIL: Creating dependency on merge taskId 7
DEBUG: pruning merge fetch taskId 4

View File

@ -57,8 +57,8 @@ WHERE
o_custkey = c_custkey AND
c_custkey < 0;
DEBUG: predicate pruning for shardId 102017
DEBUG: predicate pruning for shardId 102034
DEBUG: predicate pruning for shardId 102033
DEBUG: predicate pruning for shardId 102038
DEBUG: predicate pruning for shardId 102037
count
-------

View File

@ -3,6 +3,7 @@
--
-- Tests to verify that we correctly prune unreferenced shards. For this, we
-- need to increase the logging verbosity of messages displayed on the client.
SET citus.explain_distributed_queries TO off;
SET client_min_messages TO DEBUG2;
-- Add an extra l_orderkey = 1 clause to make this query not router executable
SELECT l_orderkey, l_linenumber, l_shipdate FROM lineitem WHERE l_orderkey = 9030 or l_orderkey = 1;
@ -163,24 +164,24 @@ INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename,
EXPLAIN SELECT count(*) FROM varchar_partitioned_table WHERE varchar_column = 'BA2';
DEBUG: predicate pruning for shardId 100
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
------------------------------------------------------------
explain statements for distributed queries are not enabled
(1 row)
EXPLAIN SELECT count(*) FROM array_partitioned_table
WHERE array_column > '{BA1000U2AMO4ZGX, BZZXSP27F21T6}';
DEBUG: predicate pruning for shardId 102
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
------------------------------------------------------------
explain statements for distributed queries are not enabled
(1 row)
EXPLAIN SELECT count(*) FROM composite_partitioned_table
WHERE composite_column < '(b,5,c)'::composite_type;
DEBUG: predicate pruning for shardId 105
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
------------------------------------------------------------
explain statements for distributed queries are not enabled
(1 row)
SET client_min_messages TO NOTICE;

View File

@ -1,6 +1,7 @@
--
-- MULTI_TASK_ASSIGNMENT
--
SET citus.explain_distributed_queries TO off;
-- Check that our policies for assigning tasks to worker nodes run as expected.
-- To test this, we first create a shell table, and then manually insert shard
-- and shard placement data into system catalogs. We next run the Explain command,
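-- A minimal sketch of that setup, following the catalog columns used in the
-- pruning tests above; the shardids (200, 201) and their ranges are
-- illustrative, not the values the test actually uses:
CREATE TABLE task_assignment_test_table (test_id integer);
SELECT master_create_distributed_table('task_assignment_test_table', 'test_id', 'append');
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue)
    VALUES ('task_assignment_test_table'::regclass, 200, 't', 1, 1000),
           ('task_assignment_test_table'::regclass, 201, 't', 1001, 2000);
INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport)
    VALUES (200, 1, 1, 'localhost', 57637), (200, 1, 1, 'localhost', 57638),
           (201, 1, 1, 'localhost', 57637), (201, 1, 1, 'localhost', 57638);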
@ -58,8 +59,8 @@ DEBUG: assigned task 2 to node localhost:57638
DEBUG: assigned task 4 to node localhost:57637
DEBUG: CommitTransactionCommand
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
------------------------------------------------------------
explain statements for distributed queries are not enabled
(1 row)
EXPLAIN SELECT count(*) FROM task_assignment_test_table;
@ -70,8 +71,8 @@ DEBUG: assigned task 2 to node localhost:57638
DEBUG: assigned task 4 to node localhost:57637
DEBUG: CommitTransactionCommand
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
------------------------------------------------------------
explain statements for distributed queries are not enabled
(1 row)
-- Next test the first-replica task assignment policy
@ -87,8 +88,8 @@ DEBUG: assigned task 4 to node localhost:57637
DEBUG: assigned task 2 to node localhost:57638
DEBUG: CommitTransactionCommand
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
------------------------------------------------------------
explain statements for distributed queries are not enabled
(1 row)
EXPLAIN SELECT count(*) FROM task_assignment_test_table;
@ -99,8 +100,8 @@ DEBUG: assigned task 4 to node localhost:57637
DEBUG: assigned task 2 to node localhost:57638
DEBUG: CommitTransactionCommand
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
------------------------------------------------------------
explain statements for distributed queries are not enabled
(1 row)
-- Round-robin task assignment relies on the current jobId. We therefore need to
@ -130,8 +131,8 @@ DEBUG: assigned task 4 to node localhost:57637
DEBUG: assigned task 2 to node localhost:57638
DEBUG: CommitTransactionCommand
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
------------------------------------------------------------
explain statements for distributed queries are not enabled
(1 row)
EXPLAIN SELECT count(*) FROM task_assignment_test_table;
@ -142,8 +143,8 @@ DEBUG: assigned task 4 to node localhost:57638
DEBUG: assigned task 2 to node localhost:57637
DEBUG: CommitTransactionCommand
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
------------------------------------------------------------
explain statements for distributed queries are not enabled
(1 row)
EXPLAIN SELECT count(*) FROM task_assignment_test_table;
@ -154,8 +155,8 @@ DEBUG: assigned task 4 to node localhost:57637
DEBUG: assigned task 2 to node localhost:57638
DEBUG: CommitTransactionCommand
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
------------------------------------------------------------
explain statements for distributed queries are not enabled
(1 row)
RESET citus.task_assignment_policy;

View File

@ -21,17 +21,6 @@ COPY (SELECT COUNT(*) FROM sharded_table) TO STDOUT;
-- cursors may not involve distributed tables
DECLARE all_sharded_rows CURSOR FOR SELECT * FROM sharded_table;
ERROR: DECLARE CURSOR can only be used in transaction blocks
-- EXPLAIN support isn't implemented for distributed queries...
EXPLAIN SELECT * FROM sharded_table;
QUERY PLAN
----------------------------------------------------------------------
explain statements for distributed queries are currently unsupported
(1 row)
-- ... or for distributed modifications
EXPLAIN INSERT INTO sharded_table VALUES ('dan', 4);
ERROR: cannot show execution plan for distributed modification
DETAIL: EXPLAIN commands are unsupported for distributed modifications.
-- verify PREPARE functionality
PREPARE sharded_insert AS INSERT INTO sharded_table VALUES ('adam', 1);
PREPARE sharded_update AS UPDATE sharded_table SET name = 'bob' WHERE id = 1;

View File

@ -42,7 +42,7 @@ SELECT master_apply_delete_command('DELETE FROM customer_delete_protocol');
SELECT count(*) FROM customer_delete_protocol;
-- Verify that empty shards are deleted if no condition is provided
SELECT master_create_empty_shard('customer_delete_protocol');
SELECT 1 AS one FROM master_create_empty_shard('customer_delete_protocol');
SELECT master_apply_delete_command('DELETE FROM customer_delete_protocol
WHERE c_custkey > 1000');
SELECT master_apply_delete_command('DELETE FROM customer_delete_protocol');

View File

@ -26,6 +26,7 @@ test: multi_stage_data
# Miscellaneous tests to check our query planning behavior
# ----------
test: multi_basic_queries multi_complex_expressions multi_verify_no_subquery
test: multi_explain
test: multi_subquery
test: multi_single_relation_subquery
test: multi_agg_distinct multi_limit_clause multi_limit_clause_approximate

View File

@ -67,10 +67,10 @@ SELECT count(*) FROM customer_delete_protocol;
(1 row)
-- Verify that empty shards are deleted if no condition is provided
SELECT master_create_empty_shard('customer_delete_protocol');
master_create_empty_shard
---------------------------
102041
SELECT 1 AS one FROM master_create_empty_shard('customer_delete_protocol');
one
-----
1
(1 row)
SELECT master_apply_delete_command('DELETE FROM customer_delete_protocol

View File

@ -0,0 +1,256 @@
--
-- MULTI_COPY
--
-- Create a new hash-partitioned table into which to COPY
CREATE TABLE customer_copy_hash (
c_custkey integer,
c_name varchar(25) not null,
c_address varchar(40),
c_nationkey integer,
c_phone char(15),
c_acctbal decimal(15,2),
c_mktsegment char(10),
c_comment varchar(117),
primary key (c_custkey));
SELECT master_create_distributed_table('customer_copy_hash', 'c_custkey', 'hash');
-- Test COPY into empty hash-partitioned table
COPY customer_copy_hash FROM '/home/marco/citus/citus-explain4/src/test/regress/data/customer.1.data' WITH (DELIMITER '|');
SELECT master_create_worker_shards('customer_copy_hash', 64, 1);
-- Test empty copy
COPY customer_copy_hash FROM STDIN;
\.
-- Test syntax error
COPY customer_copy_hash (c_custkey,c_name) FROM STDIN;
1,customer1
2,customer2,
notinteger,customernot
\.
-- Confirm that no data was copied
SELECT count(*) FROM customer_copy_hash;
-- Test primary key violation
COPY customer_copy_hash (c_custkey, c_name) FROM STDIN
WITH (FORMAT 'csv');
1,customer1
2,customer2
2,customer2
\.
-- Confirm that no data was copied
SELECT count(*) FROM customer_copy_hash;
-- Test headers option
COPY customer_copy_hash (c_custkey, c_name) FROM STDIN
WITH (FORMAT 'csv', HEADER true, FORCE_NULL (c_custkey));
# header
1,customer1
2,customer2
3,customer3
\.
-- Confirm that only the first row was skipped
SELECT count(*) FROM customer_copy_hash;
-- Test force_not_null option
COPY customer_copy_hash (c_custkey, c_name, c_address) FROM STDIN
WITH (FORMAT 'csv', QUOTE '"', FORCE_NOT_NULL (c_address));
"4","customer4",""
\.
-- Confirm that value is not null
SELECT count(c_address) FROM customer_copy_hash WHERE c_custkey = 4;
-- Test force_null option
COPY customer_copy_hash (c_custkey, c_name, c_address) FROM STDIN
WITH (FORMAT 'csv', QUOTE '"', FORCE_NULL (c_address));
"5","customer5",""
\.
-- Confirm that value is null
SELECT count(c_address) FROM customer_copy_hash WHERE c_custkey = 5;
-- Test null violation
COPY customer_copy_hash (c_custkey, c_name) FROM STDIN
WITH (FORMAT 'csv');
6,customer6
7,customer7
8,
\.
-- Confirm that no data was copied
SELECT count(*) FROM customer_copy_hash;
-- Test server-side copy from program
COPY customer_copy_hash (c_custkey, c_name) FROM PROGRAM 'echo 9 customer9'
WITH (DELIMITER ' ');
-- Confirm that data was copied
SELECT count(*) FROM customer_copy_hash WHERE c_custkey = 9;
-- Test server-side copy from file
COPY customer_copy_hash FROM '/home/marco/citus/citus-explain4/src/test/regress/data/customer.2.data' WITH (DELIMITER '|');
-- Confirm that data was copied
SELECT count(*) FROM customer_copy_hash;
-- Test client-side copy from file
\COPY customer_copy_hash FROM '/home/marco/citus/citus-explain4/src/test/regress/data/customer.3.data' WITH (DELIMITER '|');
-- Confirm that data was copied
SELECT count(*) FROM customer_copy_hash;
-- Create a new hash-partitioned table with default now() function
CREATE TABLE customer_with_default(
c_custkey integer,
c_name varchar(25) not null,
c_time timestamp default now());
SELECT master_create_distributed_table('customer_with_default', 'c_custkey', 'hash');
SELECT master_create_worker_shards('customer_with_default', 64, 1);
-- Test with default values for now() function
COPY customer_with_default (c_custkey, c_name) FROM STDIN
WITH (FORMAT 'csv');
1,customer1
2,customer2
\.
-- Confirm that data was copied with now() function
SELECT count(*) FROM customer_with_default where c_time IS NOT NULL;
-- Add columns to the table and perform a COPY
ALTER TABLE customer_copy_hash ADD COLUMN extra1 INT DEFAULT 0;
ALTER TABLE customer_copy_hash ADD COLUMN extra2 INT DEFAULT 0;
COPY customer_copy_hash (c_custkey, c_name, extra1, extra2) FROM STDIN CSV;
10,customer10,1,5
\.
SELECT * FROM customer_copy_hash WHERE extra1 = 1;
-- Test dropping an intermediate column
ALTER TABLE customer_copy_hash DROP COLUMN extra1;
COPY customer_copy_hash (c_custkey, c_name, extra2) FROM STDIN CSV;
11,customer11,5
\.
SELECT * FROM customer_copy_hash WHERE c_custkey = 11;
-- Test dropping the last column
ALTER TABLE customer_copy_hash DROP COLUMN extra2;
COPY customer_copy_hash (c_custkey, c_name) FROM STDIN CSV;
12,customer12
\.
SELECT * FROM customer_copy_hash WHERE c_custkey = 12;
-- Create a new range-partitioned table into which to COPY
CREATE TABLE customer_copy_range (
c_custkey integer,
c_name varchar(25),
c_address varchar(40),
c_nationkey integer,
c_phone char(15),
c_acctbal decimal(15,2),
c_mktsegment char(10),
c_comment varchar(117),
primary key (c_custkey));
SELECT master_create_distributed_table('customer_copy_range', 'c_custkey', 'range');
-- Test COPY into empty range-partitioned table
COPY customer_copy_range FROM '/home/marco/citus/citus-explain4/src/test/regress/data/customer.1.data' WITH (DELIMITER '|');
SELECT master_create_empty_shard('customer_copy_range') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 500
WHERE shardid = :new_shard_id;
SELECT master_create_empty_shard('customer_copy_range') AS new_shard_id
\gset
UPDATE pg_dist_shard SET shardminvalue = 501, shardmaxvalue = 1000
WHERE shardid = :new_shard_id;
-- Test copy into range-partitioned table
COPY customer_copy_range FROM '/home/marco/citus/citus-explain4/src/test/regress/data/customer.1.data' WITH (DELIMITER '|');
-- Check whether data went into the right shard (maybe)
SELECT min(c_custkey), max(c_custkey), avg(c_custkey), count(*)
FROM customer_copy_range WHERE c_custkey <= 500;
-- Check whether data was copied
SELECT count(*) FROM customer_copy_range;
-- Create a new append-partitioned table into which to COPY
CREATE TABLE customer_copy_append (
c_custkey integer,
c_name varchar(25) not null,
c_address varchar(40),
c_nationkey integer,
c_phone char(15),
c_acctbal decimal(15,2),
c_mktsegment char(10),
c_comment varchar(117));
SELECT master_create_distributed_table('customer_copy_append', 'c_custkey', 'append');
-- Test syntax error
COPY customer_copy_append(c_custkey, c_name) FROM STDIN WITH (FORMAT 'csv');
1,customer1
2,customer2
notinteger,customernot
\.
-- Test that no shard is created for a failing COPY
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'customer_copy_append'::regclass;
-- Test empty copy
COPY customer_copy_append FROM STDIN;
\.
-- Test that no shard is created for copying zero rows
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'customer_copy_append'::regclass;
-- Test proper copy
COPY customer_copy_append(c_custkey, c_name) FROM STDIN WITH (FORMAT 'csv');
1,customer1
2,customer2
\.
-- Check whether data was copied properly
SELECT * FROM customer_copy_append;
-- Create lineitem table
CREATE TABLE lineitem_copy_append (
l_orderkey bigint not null,
l_partkey integer not null,
l_suppkey integer not null,
l_linenumber integer not null,
l_quantity decimal(15, 2) not null,
l_extendedprice decimal(15, 2) not null,
l_discount decimal(15, 2) not null,
l_tax decimal(15, 2) not null,
l_returnflag char(1) not null,
l_linestatus char(1) not null,
l_shipdate date not null,
l_commitdate date not null,
l_receiptdate date not null,
l_shipinstruct char(25) not null,
l_shipmode char(10) not null,
l_comment varchar(44) not null);
SELECT master_create_distributed_table('lineitem_copy_append', 'l_orderkey', 'append');
-- Test multiple shard creation
SET citus.shard_max_size TO '256kB';
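-- (a low shard_max_size should make the COPY below roll over into several
-- append shards rather than a single one)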
COPY lineitem_copy_append FROM '/home/marco/citus/citus-explain4/src/test/regress/data/lineitem.1.data' with delimiter '|';
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'lineitem_copy_append'::regclass;

View File

@ -0,0 +1,88 @@
--
-- MULTI_EXPLAIN
--
\a\t
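-- (\a and \t switch psql to unaligned, tuples-only output, keeping the
-- EXPLAIN output terse)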
SET citus.explain_distributed_queries TO on;
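-- (with this GUC off, Citus presumably prints a placeholder instead of the
-- distributed plan; it is enabled here so the plans appear in the output)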
-- Test Text format
EXPLAIN (COSTS FALSE, FORMAT TEXT)
SELECT l_quantity, count(*) count_quantity FROM lineitem
GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
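-- The distributed plan should come out roughly in this shape (illustrative
-- sketch, not verbatim expected output):
--   Distributed Query into pg_merge_job_...
--     Executor: Real-Time
--     Task Count: ...
--     Tasks Shown: One of ...
--     ->  Task
--           Node: host=... port=... dbname=...
--           (worker plan: aggregate over a sequential scan of one shard)
--   Master Query
--     (sort over the merged per-task results)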
-- Test JSON format
EXPLAIN (COSTS FALSE, FORMAT JSON)
SELECT l_quantity, count(*) count_quantity FROM lineitem
GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
-- Test XML format
EXPLAIN (COSTS FALSE, FORMAT XML)
SELECT l_quantity, count(*) count_quantity FROM lineitem
GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
-- Test YAML format
EXPLAIN (COSTS FALSE, FORMAT YAML)
SELECT l_quantity, count(*) count_quantity FROM lineitem
GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
-- Test Text format again, after cycling through the other formats
EXPLAIN (COSTS FALSE, FORMAT TEXT)
SELECT l_quantity, count(*) count_quantity FROM lineitem
GROUP BY l_quantity ORDER BY count_quantity, l_quantity;
-- Test verbose
EXPLAIN (COSTS FALSE, VERBOSE TRUE)
SELECT sum(l_quantity) / avg(l_quantity) FROM lineitem;
-- Test join
EXPLAIN (COSTS FALSE)
SELECT * FROM lineitem
JOIN orders ON l_orderkey = o_orderkey AND l_quantity < 5
ORDER BY l_quantity DESC LIMIT 10;
-- Test insert
EXPLAIN (COSTS FALSE)
INSERT INTO lineitem VALUES(1,0);
-- Test update
EXPLAIN (COSTS FALSE)
UPDATE lineitem
SET l_suppkey = 12
WHERE l_orderkey = 1 AND l_partkey = 0;
-- Test delete
EXPLAIN (COSTS FALSE)
DELETE FROM lineitem
WHERE l_orderkey = 1 AND l_partkey = 0;
-- Test single-shard SELECT
EXPLAIN
SELECT l_quantity FROM lineitem WHERE l_orderkey = 5;
-- Test CREATE TABLE ... AS
EXPLAIN CREATE TABLE explain_result AS
SELECT * FROM lineitem;
-- Test all-tasks output
SET citus.explain_all_tasks TO on;
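-- (by default EXPLAIN shows a single representative task; this setting prints
-- the plan for every task)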
EXPLAIN (COSTS FALSE)
SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
-- Test the task-tracker executor
SET citus.task_executor_type TO 'task-tracker';
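-- (the task-tracker executor hands tasks to the task tracker daemon on each
-- worker instead of executing them over per-task connections)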
SET citus.explain_all_tasks TO off;
EXPLAIN (COSTS FALSE)
SELECT avg(l_linenumber) FROM lineitem WHERE l_orderkey > 9030;
-- Test re-partition join
SET citus.large_table_shard_count TO 1;
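-- (with the large-table threshold at one shard, every table counts as large,
-- so the join below has to be planned as a re-partition join)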
EXPLAIN (COSTS FALSE)
SELECT count(*)
FROM lineitem, orders, customer, supplier
WHERE l_orderkey = o_orderkey
AND o_custkey = c_custkey
AND l_suppkey = s_suppkey;

View File

@ -19,111 +19,69 @@ CREATE TABLE orders_hash_partitioned (
o_clerk char(15),
o_shippriority integer,
o_comment varchar(79) );
SELECT master_create_distributed_table('orders_hash_partitioned', 'o_orderkey', 'append');
UPDATE pg_dist_partition SET partmethod = 'h'
WHERE logicalrelid = 'orders_hash_partitioned'::regclass;
-- Create logical shards with shardids 110, 111, 112 and 113
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue)
VALUES ('orders_hash_partitioned'::regclass, 110, 't', -1905060026, -1905060026),
('orders_hash_partitioned'::regclass, 111, 't', 1134484726, 1134484726),
('orders_hash_partitioned'::regclass, 112, 't', -1905060026, -28094569),
('orders_hash_partitioned'::regclass, 113, 't', -1011077333, 0);
-- Create shard placements for shards 110, 111, 112 and 113
INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport)
SELECT 110, 1, 1, nodename, nodeport
FROM pg_dist_shard_placement
GROUP BY nodename, nodeport
ORDER BY nodename, nodeport ASC
LIMIT 1;
INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport)
SELECT 111, 1, 1, nodename, nodeport
FROM pg_dist_shard_placement
GROUP BY nodename, nodeport
ORDER BY nodename, nodeport ASC
LIMIT 1;
INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport)
SELECT 112, 1, 1, nodename, nodeport
FROM pg_dist_shard_placement
GROUP BY nodename, nodeport
ORDER BY nodename, nodeport ASC
LIMIT 1;
INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport)
SELECT 113, 1, 1, nodename, nodeport
FROM pg_dist_shard_placement
GROUP BY nodename, nodeport
ORDER BY nodename, nodeport ASC
LIMIT 1;
SELECT master_create_distributed_table('orders_hash_partitioned', 'o_orderkey', 'hash');
SELECT master_create_worker_shards('orders_hash_partitioned', 4, 1);
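-- (master_create_worker_shards splits the int32 hash space evenly across the
-- four shards)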
SET client_min_messages TO DEBUG2;
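-- (DEBUG2 makes the shard-pruning messages visible, which is what the
-- expected output asserts on)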
-- Check that we can prune shards for simple cases, boolean expressions and
-- immutable functions.
EXPLAIN SELECT count(*) FROM orders_hash_partitioned;
EXPLAIN SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 1;
EXPLAIN SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 2;
EXPLAIN SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 3;
EXPLAIN SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 4;
EXPLAIN SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey is NULL;
EXPLAIN SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey is not NULL;
EXPLAIN SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey > 2;
SELECT count(*) FROM orders_hash_partitioned;
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 1;
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 2;
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 3;
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = 4;
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey is NULL;
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey is not NULL;
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey > 2;
EXPLAIN SELECT count(*) FROM orders_hash_partitioned
SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = 1 OR o_orderkey = 2;
EXPLAIN SELECT count(*) FROM orders_hash_partitioned
SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = 1 OR o_clerk = 'aaa';
EXPLAIN SELECT count(*) FROM orders_hash_partitioned
SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = 1 AND o_clerk = 'aaa';
EXPLAIN SELECT count(*) FROM orders_hash_partitioned
SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = 1 OR (o_orderkey = 3 AND o_clerk = 'aaa');
EXPLAIN SELECT count(*) FROM orders_hash_partitioned
SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = 1 OR o_orderkey is NULL;
EXPLAIN SELECT count(*) FROM
SELECT count(*) FROM
(SELECT o_orderkey FROM orders_hash_partitioned WHERE o_orderkey = 1) AS orderkeys;
EXPLAIN SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = abs(-1);
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = abs(-1);
-- Check that we do not prune for ANY (array expression), and that we give
-- a notice message when it is used with the partition column
EXPLAIN SELECT count(*) FROM orders_hash_partitioned
SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = ANY ('{1,2,3}');
-- Check that we don't show the message if the operator is not
-- the equality operator
EXPLAIN SELECT count(*) FROM orders_hash_partitioned
SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey < ALL ('{1,2,3}');
-- Check that we don't give a spurious hint message when non-partition
-- columns are used with ANY/IN/ALL
EXPLAIN SELECT count(*) FROM orders_hash_partitioned
SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = 1 OR o_totalprice IN (2, 5);
-- Check that we cannot prune for mutable functions.
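-- (random() has no fixed value at planning time, so no shard can be pruned)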
EXPLAIN SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = random();
EXPLAIN SELECT count(*) FROM orders_hash_partitioned
SELECT count(*) FROM orders_hash_partitioned WHERE o_orderkey = random();
SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = random() OR o_orderkey = 1;
EXPLAIN SELECT count(*) FROM orders_hash_partitioned
SELECT count(*) FROM orders_hash_partitioned
WHERE o_orderkey = random() AND o_orderkey = 1;
-- Check that we can do join pruning.
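-- (shard pairs whose hash ranges cannot intersect should be pruned from the
-- join)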
EXPLAIN SELECT count(*)
SELECT count(*)
FROM orders_hash_partitioned orders1, orders_hash_partitioned orders2
WHERE orders1.o_orderkey = orders2.o_orderkey;
EXPLAIN SELECT count(*)
SELECT count(*)
FROM orders_hash_partitioned orders1, orders_hash_partitioned orders2
WHERE orders1.o_orderkey = orders2.o_orderkey
AND orders1.o_orderkey = 1
AND orders2.o_orderkey is NULL;
SET client_min_messages TO NOTICE;

View File

@ -4,6 +4,7 @@
-- Set configuration to print table join order and pruned shards
SET citus.explain_distributed_queries TO off;
SET citus.log_multi_join_order TO TRUE;
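-- (citus.log_multi_join_order logs the join order chosen by the planner, so
-- the expected output can assert on it)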
SET client_min_messages TO DEBUG2;

View File

@ -4,6 +4,7 @@
-- Enable configuration to print table join order
SET citus.explain_distributed_queries TO off;
SET citus.log_multi_join_order TO TRUE;
SET client_min_messages TO LOG;

View File

@ -4,6 +4,7 @@
-- Enable configuration to print table join order
SET citus.explain_distributed_queries TO off;
SET citus.log_multi_join_order TO TRUE;
SET client_min_messages TO LOG;

View File

@ -6,6 +6,7 @@
-- we only check for join-pruning between locally partitioned relations. In the
-- future we want to check for pruning between re-partitioned relations as well.
SET citus.explain_distributed_queries TO off;
SET client_min_messages TO DEBUG2;
-- Change configuration to treat all tables as large

View File

@ -5,6 +5,7 @@
-- Tests to verify that we correctly prune unreferenced shards. For this, we
-- need to increase the logging verbosity of messages displayed on the client.
SET citus.explain_distributed_queries TO off;
SET client_min_messages TO DEBUG2;
-- Add an extra l_orderkey = 1 clause to make this query not router-executable

View File

@ -2,6 +2,8 @@
-- MULTI_TASK_ASSIGNMENT
--
SET citus.explain_distributed_queries TO off;
-- Check that our policies for assigning tasks to worker nodes run as expected.
-- To test this, we first create a shell table, and then manually insert shard
-- and shard placement data into system catalogs. We next run the EXPLAIN command,

View File

@ -13,12 +13,6 @@ COPY (SELECT COUNT(*) FROM sharded_table) TO STDOUT;
-- cursors may not involve distributed tables
DECLARE all_sharded_rows CURSOR FOR SELECT * FROM sharded_table;
-- EXPLAIN support isn't implemented for distributed queries...
EXPLAIN SELECT * FROM sharded_table;
-- ... or for distributed modifications
EXPLAIN INSERT INTO sharded_table VALUES ('dan', 4);
-- verify PREPARE functionality
PREPARE sharded_insert AS INSERT INTO sharded_table VALUES ('adam', 1);
PREPARE sharded_update AS UPDATE sharded_table SET name = 'bob' WHERE id = 1;