Fix intermediate result pruning for INSERT..SELECT

pull/3720/head
Marco Slot 2020-04-06 17:29:22 +02:00
parent 84672c3dbd
commit 2632343f64
6 changed files with 123 additions and 41 deletions

View File

@ -26,15 +26,15 @@
bool LogIntermediateResults = false;
static List * FindSubPlansUsedInNode(Node *node);
static List * FindSubPlansUsedInNode(Node *node, SubPlanAccessType accessType);
static void AppendAllAccessedWorkerNodes(IntermediateResultsHashEntry *entry,
DistributedPlan *distributedPlan,
int workerNodeCount);
static void AppendAllWorkerNodes(IntermediateResultsHashEntry *entry);
static List * FindAllRemoteWorkerNodesUsingSubplan(IntermediateResultsHashEntry *entry);
static List * RemoveLocalNodeFromWorkerList(List *workerNodeList);
static void LogIntermediateResultMulticastSummary(IntermediateResultsHashEntry *entry,
List *workerNodeList);
static void UpdateUsedPlanListLocation(List *subPlanList, int locationMask);
/*
@ -53,18 +53,33 @@ FindSubPlanUsages(DistributedPlan *plan)
if (plan->masterQuery != NULL)
{
localSubPlans = FindSubPlansUsedInNode((Node *) plan->masterQuery);
UpdateUsedPlanListLocation(localSubPlans, SUBPLAN_ACCESS_LOCAL);
localSubPlans = FindSubPlansUsedInNode((Node *) plan->masterQuery,
SUBPLAN_ACCESS_LOCAL);
}
if (plan->workerJob != NULL)
{
/*
* Mark the subplans as needed on remote side. Note that this decision is revisited
* on execution, when the query only consists of intermediate results.
* Mark the subplans as needed on remote side. Note that this decision is
* revisited on execution, when the query only consists of intermediate
* results.
*/
remoteSubPlans = FindSubPlansUsedInNode((Node *) plan->workerJob->jobQuery);
UpdateUsedPlanListLocation(remoteSubPlans, SUBPLAN_ACCESS_REMOTE);
remoteSubPlans = FindSubPlansUsedInNode((Node *) plan->workerJob->jobQuery,
SUBPLAN_ACCESS_REMOTE);
}
if (plan->insertSelectQuery != NULL)
{
/* INSERT..SELECT plans currently do not have a workerJob */
Assert(plan->workerJob == NULL);
/*
* The SELECT in an INSERT..SELECT is not fully planned yet and we cannot
* perform pruning. We therefore require all subplans used in the
INSERT..SELECT to be available on all nodes.
*/
remoteSubPlans = FindSubPlansUsedInNode((Node *) plan->insertSelectQuery,
SUBPLAN_ACCESS_ANYWHERE);
}
/* merge the used subplans */
@ -77,7 +92,7 @@ FindSubPlanUsages(DistributedPlan *plan)
* the input node.
*/
static List *
FindSubPlansUsedInNode(Node *node)
FindSubPlansUsedInNode(Node *node, SubPlanAccessType accessType)
{
List *rangeTableList = NIL;
ListCell *rangeTableCell = NULL;
@ -106,9 +121,7 @@ FindSubPlansUsedInNode(Node *node)
UsedDistributedSubPlan *usedPlan = CitusMakeNode(UsedDistributedSubPlan);
usedPlan->subPlanId = pstrdup(resultId);
/* the callers are responsible for setting the accurate location */
usedPlan->locationMask = SUBPLAN_ACCESS_NONE;
usedPlan->accessType = accessType;
usedSubPlanList = lappend(usedSubPlanList, usedPlan);
}
@ -160,13 +173,12 @@ RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash,
continue;
}
if (usedPlan->locationMask & SUBPLAN_ACCESS_LOCAL)
if (usedPlan->accessType == SUBPLAN_ACCESS_LOCAL)
{
/* subPlan needs to be written locally as the planner decided */
entry->writeLocalFile = true;
}
if (usedPlan->locationMask & SUBPLAN_ACCESS_REMOTE)
else if (usedPlan->accessType == SUBPLAN_ACCESS_REMOTE)
{
/*
* traverse the plan and find all worker nodes
@ -179,6 +191,12 @@ RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash,
elog(DEBUG4, "Subplan %s is used in %lu", resultId, distributedPlan->planId);
}
else if (usedPlan->accessType == SUBPLAN_ACCESS_ANYWHERE)
{
/* subplan is needed on all nodes */
entry->writeLocalFile = true;
AppendAllWorkerNodes(entry);
}
}
/* descend into the subPlans */
@ -243,6 +261,25 @@ AppendAllAccessedWorkerNodes(IntermediateResultsHashEntry *entry,
}
/*
 * AppendAllWorkerNodes appends the node IDs of all active readable worker
 * nodes to the entry's nodeIdList, meaning the corresponding intermediate
 * result should be broadcast to every readable node (used when pruning is
 * not possible, e.g. for subplans referenced by an INSERT..SELECT).
 */
static void
AppendAllWorkerNodes(IntermediateResultsHashEntry *entry)
{
List *workerNodeList = ActiveReadableWorkerNodeList();
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodeList)
{
/* unique append: the node may already be recorded for this result */
entry->nodeIdList =
list_append_unique_int(entry->nodeIdList, workerNode->nodeId);
}
}
/*
* MakeIntermediateResultHTAB is a helper method that creates a Hash Table that
* stores information on the intermediate result.
@ -414,20 +451,3 @@ SearchIntermediateResult(HTAB *intermediateResultsHash, char *resultId)
return entry;
}
/*
 * UpdateUsedPlanListLocation is a utility function which iterates over the
 * list of UsedDistributedSubPlan nodes and ORs the given locationMask bits
 * into each subplan's locationMask (it accumulates locations rather than
 * overwriting them).
 */
static void
UpdateUsedPlanListLocation(List *subPlanList, int locationMask)
{
ListCell *subPlanCell = NULL;
foreach(subPlanCell, subPlanList)
{
UsedDistributedSubPlan *subPlan = lfirst(subPlanCell);
subPlan->locationMask |= locationMask;
}
}

View File

@ -141,7 +141,7 @@ CopyNodeUsedDistributedSubPlan(COPYFUNC_ARGS)
DECLARE_FROM_AND_NEW_NODE(UsedDistributedSubPlan);
COPY_STRING_FIELD(subPlanId);
COPY_SCALAR_FIELD(locationMask);
COPY_SCALAR_FIELD(accessType);
}

View File

@ -222,7 +222,7 @@ OutUsedDistributedSubPlan(OUTFUNC_ARGS)
WRITE_NODE_TYPE("USEDDISTRIBUTEDSUBPLAN");
WRITE_STRING_FIELD(subPlanId);
WRITE_INT_FIELD(locationMask);
WRITE_ENUM_FIELD(accessType, SubPlanAccessType);
}

View File

@ -48,10 +48,6 @@
#define MERGE_FILES_AND_RUN_QUERY_COMMAND \
"SELECT worker_merge_files_and_run_query(" UINT64_FORMAT ", %d, %s, %s)"
#define SUBPLAN_ACCESS_NONE 0
#define SUBPLAN_ACCESS_LOCAL 1
#define SUBPLAN_ACCESS_REMOTE 2
typedef enum CitusRTEKind
{
@ -407,10 +403,14 @@ typedef struct DistributedPlan
/*
* List of subPlans that are used in the DistributedPlan
* Note that this is different from the "subPlanList" field which
* contains the subplans generated part of the DistributedPlan.
* contains the subplans generated as part of the DistributedPlan.
*
* On the other hand, usedSubPlanNodeList keeps track of which subPlans
* are used within this distributed plan.
* are used within this distributed plan as a list of
* UsedDistributedSubPlan pointers.
*
* The list may contain duplicates if the subplan is referenced multiple
* times (e.g. a CTE appears in the query tree multiple times).
*/
List *usedSubPlanNodeList;
@ -444,6 +444,16 @@ typedef struct DistributedSubPlan
} DistributedSubPlan;
/* defines how a subplan is used by a distributed query */
typedef enum SubPlanAccessType
{
SUBPLAN_ACCESS_NONE,
SUBPLAN_ACCESS_LOCAL,
SUBPLAN_ACCESS_REMOTE,
SUBPLAN_ACCESS_ANYWHERE
} SubPlanAccessType;
/*
* UsedDistributedSubPlan contains information about a subPlan that is used in a
* distributed plan.
@ -452,8 +462,11 @@ typedef struct UsedDistributedSubPlan
{
CitusNode type;
/* subplan used by the distributed query */
char *subPlanId;
int locationMask;
/* how the subplan is used by a distributed query */
SubPlanAccessType accessType;
} UsedDistributedSubPlan;

View File

@ -1031,6 +1031,37 @@ DEBUG: Subplan XXX_4 will be sent to localhost:xxxxx
100
(1 row)
RESET citus.task_assignment_policy;
-- Insert..select is planned differently, make sure we have results everywhere.
-- We put the insert..select in a CTE here to prevent the CTE from being moved
-- into the select, which would follow the regular code path for select.
WITH stats AS (
SELECT count(key) m FROM table_3
),
inserts AS (
INSERT INTO table_2
SELECT key, count(*)
FROM table_1
WHERE key > (SELECT m FROM stats)
GROUP BY key
HAVING count(*) < (SELECT m FROM stats)
LIMIT 1
RETURNING *
) SELECT count(*) FROM inserts;
DEBUG: generating subplan XXX_1 for CTE stats: SELECT count(key) AS m FROM intermediate_result_pruning.table_3
DEBUG: generating subplan XXX_2 for CTE inserts: INSERT INTO intermediate_result_pruning.table_2 (key, value) SELECT key, count(*) AS count FROM intermediate_result_pruning.table_1 WHERE (key OPERATOR(pg_catalog.>) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) GROUP BY key HAVING (count(*) OPERATOR(pg_catalog.<) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) LIMIT 1 RETURNING table_2.key, table_2.value
DEBUG: LIMIT clauses are not allowed in distributed INSERT ... SELECT queries
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) inserts
DEBUG: Subplan XXX_1 will be written to local file
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_2 will be written to local file
DEBUG: Collecting INSERT ... SELECT results on coordinator
count
---------------------------------------------------------------------
1
(1 row)
SET citus.task_assignment_policy to DEFAULT;
SET client_min_messages TO DEFAULT;
DROP TABLE table_1, table_2, table_3, ref_table, accounts, stats, range_partitioned;

View File

@ -607,6 +607,24 @@ FROM
FROM accounts_cte
INNER JOIN joined_stats_cte_2 USING (account_id)
) inner_query;
RESET citus.task_assignment_policy;
-- Insert..select is planned differently, make sure we have results everywhere.
-- We put the insert..select in a CTE here to prevent the CTE from being moved
-- into the select, which would follow the regular code path for select.
WITH stats AS (
SELECT count(key) m FROM table_3
),
inserts AS (
INSERT INTO table_2
SELECT key, count(*)
FROM table_1
WHERE key > (SELECT m FROM stats)
GROUP BY key
HAVING count(*) < (SELECT m FROM stats)
LIMIT 1
RETURNING *
) SELECT count(*) FROM inserts;
SET citus.task_assignment_policy to DEFAULT;
SET client_min_messages TO DEFAULT;