add command execution

replace_m_row_insert_select
Onder Kalaci 2020-05-08 15:22:00 +02:00
parent beb86c782a
commit 075940f9d2
1 changed files with 82 additions and 5 deletions

View File

@ -109,8 +109,9 @@ CoordinatorInsertSelectExecScan(CustomScanState *node)
insertSelectExecutorLevel--;
return result;
}
PlannedStmt *
GenerateValuesPlannedStmt(Query *parse);
#include "nodes/print.h"
/*
* CoordinatorInsertSelectExecScan executes an INSERT INTO distributed_table
* SELECT .. query by setting up a DestReceiver that copies tuples into the
@ -164,12 +165,26 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node)
* want it to be replanned every time if it is stored in a prepared
* statement.
*/
selectQuery = copyObject(selectQuery);
//selectQuery = copyObject(selectQuery);
/* plan the subquery, this may be another distributed query */
int cursorOptions = CURSOR_OPT_PARALLEL_OK;
PlannedStmt *selectPlan = pg_plan_query(selectQuery, cursorOptions,
paramListInfo);
//elog(WARNING, "selectQuery before: %s", pretty_format_node_dump(nodeToString(selectQuery)));
//PlannedStmt *origselectPlan = pg_plan_query(selectQuery, cursorOptions,
// paramListInfo);
//elog(ERROR, "selectQuery after: %s", pretty_format_node_dump(nodeToString(selectQuery)));
Form_pg_attribute att_tup0 = TupleDescAttr(node->ss.ss_ScanTupleSlot->tts_tupleDescriptor, 0);
Form_pg_attribute att_tup1 = TupleDescAttr(node->ss.ss_ScanTupleSlot->tts_tupleDescriptor, 1);
Form_pg_attribute att_tup2 = TupleDescAttr(node->ss.ss_ScanTupleSlot->tts_tupleDescriptor, 2);
//elog(INFO, "%d-%d-%d", att_tup0->atttypid,att_tup1->atttypid,att_tup2->atttypid);
PlannedStmt *selectPlan = GenerateValuesPlannedStmt(selectQuery);
//elog(INFO, "selectPlan: %s", pretty_format_node_dump(nodeToString(selectPlan)));
//elog(INFO, "origselectPlan: %s", pretty_format_node_dump(nodeToString(origselectPlan)));
/*
* If we are dealing with partitioned table, we also need to lock its
@ -388,6 +403,68 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node)
return resultSlot;
}
#if PG_VERSION_NUM >= PG_VERSION_12
#include "optimizer/optimizer.h"
#else
#include "optimizer/var.h"
#include "optimizer/predtest.h"
#include "optimizer/clauses.h"
#endif
PlannedStmt *
GenerateValuesPlannedStmt(Query *parse)
{
PlannedStmt *result = makeNode(PlannedStmt);
ValuesScan *valuesScanNode = makeNode(ValuesScan);
Plan *plan = &valuesScanNode->scan.plan;
plan->plan_width = 20;
plan->plan_rows = 2;
RangeTblEntry *rte_1 = linitial(parse->rtable);
Query *subquery = rte_1->subquery;
ListCell *lc = NULL;
foreach(lc, parse->targetList)
{
TargetEntry *t = lfirst(lc);
Var *v = t->expr;
v->varno = 2;
}
RangeTblEntry *rte2 = linitial(subquery->rtable);
/* there is only a single relation rte */
valuesScanNode->values_lists = rte2->values_lists;
valuesScanNode->values_lists = (List *) eval_const_expressions(NULL, (Node *) valuesScanNode->values_lists);
plan->targetlist =
copyObject(FetchStatementTargetList((Node *) parse));
plan->qual = NULL;
plan->lefttree = NULL;
plan->righttree = NULL;
plan->plan_node_id = 0;
valuesScanNode->scan.scanrelid = 2;
/* rtable is used for access permission checks */
result->commandType = parse->commandType;
result->queryId = parse->queryId;
result->stmt_len = parse->stmt_len;
rte_1->subquery = NULL;
rte2->values_lists = NIL;
result->rtable = list_make2(rte_1, rte2);
result->planTree = (Plan *) plan;
result->hasReturning = (parse->returningList != NIL);
return result;
}
/*