implement parameterized inner loops

moonshot/custom-path
Nils Dijk 2021-11-09 19:12:09 +01:00
parent 9a4b1c2a49
commit 2a67d0057d
No known key found for this signature in database
GPG Key ID: CA1177EF9434F241
4 changed files with 210 additions and 13 deletions

View File

@ -209,6 +209,8 @@ typedef struct DistributedExecution
/* Parameters for parameterized plans. Can be NULL. */
ParamListInfo paramListInfo;
ParamExecData *paramExecData;
List *paramExecTypes;
/* list of workers involved in the execution */
List *workerList;
@ -611,6 +613,8 @@ typedef struct TaskPlacementExecution
static DistributedExecution * CreateDistributedExecution(RowModifyLevel modLevel,
List *taskList,
ParamListInfo paramListInfo,
ParamExecData *paramExecData,
List *paramExecTypes,
int targetPoolSize,
TupleDestination *
defaultTupleDest,
@ -821,6 +825,8 @@ AdaptiveExecutor(CitusScanState *scanState)
distributedPlan->modLevel,
taskList,
paramListInfo,
executorState->es_param_exec_vals,
executorState->es_plannedstmt->paramExecTypes,
targetPoolSize,
defaultTupleDest,
&xactProperties,
@ -1016,7 +1022,7 @@ ExecuteTaskListExtended(ExecutionParams *executionParams)
DistributedExecution *execution =
CreateDistributedExecution(
executionParams->modLevel, executionParams->taskList,
paramListInfo, executionParams->targetPoolSize,
paramListInfo, NULL, NIL, executionParams->targetPoolSize,
defaultTupleDest, &executionParams->xactProperties,
executionParams->jobIdList, executionParams->localExecutionSupported);
@ -1085,6 +1091,8 @@ CreateBasicExecutionParams(RowModifyLevel modLevel,
static DistributedExecution *
CreateDistributedExecution(RowModifyLevel modLevel, List *taskList,
ParamListInfo paramListInfo,
ParamExecData *paramExecData,
List *paramExecTypes,
int targetPoolSize, TupleDestination *defaultTupleDest,
TransactionProperties *xactProperties,
List *jobIdList, bool localExecutionSupported)
@ -1101,6 +1109,8 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList,
execution->remoteTaskList = NIL;
execution->paramListInfo = paramListInfo;
execution->paramExecData = paramExecData;
execution->paramExecTypes = paramExecTypes;
execution->workerList = NIL;
execution->sessionList = NIL;
execution->targetPoolSize = targetPoolSize;
@ -4355,23 +4365,101 @@ SendNextQuery(TaskPlacementExecution *placementExecution,
bool binaryResults = shardCommandExecution->binaryResults;
Task *task = shardCommandExecution->task;
ParamListInfo paramListInfo = execution->paramListInfo;
ParamExecData *paramExecData = execution->paramExecData;
List *paramExecTypes = execution->paramExecTypes;
int querySent = 0;
uint32 queryIndex = placementExecution->queryIndex;
Assert(queryIndex < task->queryCount);
char *queryString = TaskQueryStringAtIndex(task, queryIndex);
if (paramListInfo != NULL && !task->parametersInQueryStringResolved)
bool hasParameters = (paramListInfo != NULL)
|| (paramExecData != NULL && list_length(paramExecTypes) > 0);
if (hasParameters && !task->parametersInQueryStringResolved)
{
int parameterCount = paramListInfo->numParams;
Oid *parameterTypes = NULL;
const char **parameterValues = NULL;
int parameterCount = 0;
if (paramListInfo != NULL)
{
parameterCount += paramListInfo->numParams;
}
parameterCount += list_length(paramExecTypes);
/* force evaluation of bound params */
paramListInfo = copyParamList(paramListInfo);
ExtractParametersForRemoteExecution(paramListInfo, &parameterTypes,
&parameterValues);
Oid *parameterTypes = palloc0(parameterCount * sizeof(Oid));
const char **parameterValues = palloc0(parameterCount * sizeof(char *));
Index parameterIndex = 0;
if (paramListInfo)
{
/* force evaluation of bound params */
paramListInfo = copyParamList(paramListInfo);
ExtractParametersForRemoteExecution(paramListInfo, &parameterTypes,
&parameterValues);
parameterIndex += paramListInfo->numParams;
}
Oid execParamType = 0;
const bool useOriginalCustomTypeOids = false;
Index execParamIndex = 0;
foreach_oid(execParamType, paramExecTypes)
{
/*
* Use 0 for data types where the oid values can be different on
* the coordinator and worker nodes. Therefore, the worker nodes can
* infer the correct oid.
*/
if (execParamType >= FirstNormalObjectId && !useOriginalCustomTypeOids)
{
parameterTypes[parameterIndex] = 0;
}
else
{
parameterTypes[parameterIndex] = execParamType;
}
/*
* If the parameter is not referenced / used (ptype == 0) and
* would otherwise have errored out inside standard_planner()),
* don't pass a value to the remote side, and pass text oid to prevent
* undetermined data type errors on workers.
*/
if (execParamType == 0)
{
parameterValues[parameterIndex] = NULL;
parameterTypes[parameterIndex] = TEXTOID;
execParamIndex++;
parameterIndex++;
continue;
}
/*
* If the parameter is NULL then we preserve its type, but
* don't need to evaluate its value.
*/
if (paramExecData[execParamIndex].isnull)
{
parameterValues[parameterIndex] = NULL;
execParamIndex++;
parameterIndex++;
continue;
}
Oid typeOutputFunctionId = 0;
bool variableLengthType = 0;
getTypeOutputInfo(execParamType, &typeOutputFunctionId,
&variableLengthType);
parameterValues[parameterIndex] = OidOutputFunctionCall(typeOutputFunctionId,
paramExecData[execParamIndex].value);
execParamIndex++;
parameterIndex++;
}
querySent = SendRemoteCommandParams(connection, queryString, parameterCount,
parameterTypes, parameterValues,
binaryResults);
@ -5538,8 +5626,15 @@ ExtractParametersFromParamList(ParamListInfo paramListInfo,
{
int parameterCount = paramListInfo->numParams;
*parameterTypes = (Oid *) palloc0(parameterCount * sizeof(Oid));
*parameterValues = (const char **) palloc0(parameterCount * sizeof(char *));
if (*parameterTypes == NULL)
{
*parameterTypes = (Oid *) palloc0(parameterCount * sizeof(Oid));
}
if (*parameterValues == NULL)
{
*parameterValues = (const char **) palloc0(parameterCount * sizeof(char *));
}
/* get parameter types and values */
for (int parameterIndex = 0; parameterIndex < parameterCount; parameterIndex++)

View File

@ -28,16 +28,17 @@
#include "distributed/local_executor.h"
#include "distributed/local_plan_cache.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_server_executor.h"
#include "distributed/multi_router_planner.h"
#include "distributed/multi_server_executor.h"
#include "distributed/query_stats.h"
#include "distributed/subplan_execution.h"
#include "distributed/worker_log_messages.h"
#include "distributed/worker_protocol.h"
#include "executor/executor.h"
#include "nodes/makefuncs.h"
#include "optimizer/optimizer.h"
#include "nodes/nodeFuncs.h"
#include "optimizer/clauses.h"
#include "optimizer/optimizer.h"
#include "utils/memutils.h"
#include "utils/rel.h"
@ -565,6 +566,30 @@ RegenerateTaskForFasthPathQuery(Job *workerJob)
}
static bool
NumberOfParametersWalker(Node *node, int *count)
{
if (node == NULL)
{
return false;
}
else if (IsA(node, Param))
{
*count = *count + 1;
return false;
}
return expression_tree_walker(node, NumberOfParametersWalker, count);
}
static int
NumberOfParameters(Node *expr)
{
int count = 0;
NumberOfParametersWalker(expr, &count);
return count;
}
/*
* AdaptiveExecutorCreateScan creates the scan state for the adaptive executor.
*/
@ -583,6 +608,11 @@ AdaptiveExecutorCreateScan(CustomScan *scan)
scanState->finishedPreScan = false;
scanState->finishedRemoteScan = false;
int numParams = NumberOfParameters((Node *) scan->custom_exprs);
scanState->numParameters = numParams;
scanState->paramValues = palloc0(scanState->numParameters * sizeof(Datum));
scanState->paramNulls = palloc0(scanState->numParameters * sizeof(bool));
return (Node *) scanState;
}
@ -691,6 +721,21 @@ CitusReScan(CustomScanState *node)
EState *executorState = ScanStateGetExecutorState(scanState);
ParamListInfo paramListInfo = executorState->es_param_list_info;
/* for all the changed parameters, store them locally so we can use them during query dispatch */
int x = -1;
while ((x = bms_next_member(node->ss.ps.chgParam, x)) >= 0)
{
/* make sure the parameter that changed is within bounds */
Assert(scanState->numParameters > x);
/* copy the parameter information into local state*/
ParamExecData paramExecData = executorState->es_param_exec_vals[x];
scanState->paramValues[x] = paramExecData.value;
scanState->paramNulls[x] = paramExecData.isnull;
ereport(NOTICE, (errmsg("changed parameter: %d", x)));
}
if (paramListInfo != NULL && !workerJob->parametersInJobQueryResolved)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),

View File

@ -22,6 +22,7 @@
#include "nodes/pathnodes.h"
#include "nodes/pg_list.h"
#include "nodes/plannodes.h"
#include "optimizer/paramassign.h"
#include "optimizer/pathnode.h"
#include "optimizer/restrictinfo.h"
#include "optimizer/tlist.h"
@ -147,6 +148,55 @@ WrapTableAccessWithDistributedUnion(Path *originalPath, uint32 colocationId,
}
static Node *
TransformVarToParamExternMutator(Node *node, PlannerInfo *root)
{
if (node == NULL)
{
return NULL;
}
if (IsA(node, Var))
{
Var *var = castNode(Var, node);
/* If not to be replaced, we can just return the Var unmodified */
if (!bms_is_member(var->varno, root->curOuterRels))
{
return (Node *) var;
}
Param *paramExec = replace_nestloop_param_var(root, var);
/* TODO: figure out which Var's to replace by which parameters*/
/* TODO: hack - insert param 1 for now */
Param *paramExtern = makeNode(Param);
paramExtern->paramkind = PARAM_EXTERN;
/* Exec is 0-index, Extern is 1-indexed */
paramExtern->paramid = paramExec->paramid + 1;
paramExtern->paramtype = paramExec->paramtype;
paramExtern->paramtypmod = paramExec->paramtypmod;
paramExtern->paramcollid = paramExec->paramcollid;
paramExtern->location = paramExec->location;
return (Node *) paramExtern;
}
else if (IsA(node, Query))
{
return (Node *) query_tree_mutator((Query *) node,
TransformVarToParamExternMutator,
(void *) root,
0);
}
return expression_tree_mutator(node, TransformVarToParamExternMutator,
(void *) root);
}
static Plan *
CreateDistributedUnionPlan(PlannerInfo *root,
RelOptInfo *rel,
@ -183,6 +233,9 @@ CreateDistributedUnionPlan(PlannerInfo *root,
Query *qc = copyObject(q);
UpdateRelationToShardNames((Node *) qc, relationShardList);
/* transform Var's for other varno's to parameters */
qc = castNode(Query, TransformVarToParamExternMutator((Node *) qc, root));
StringInfoData buf;
initStringInfo(&buf);
pg_get_query_def(qc, &buf);

View File

@ -27,6 +27,10 @@ typedef struct CitusScanState
MultiExecutorType executorType; /* distributed executor type */
bool finishedRemoteScan; /* flag to check if remote scan is finished */
Tuplestorestate *tuplestorestate; /* tuple store to store distributed results */
int numParameters; /* number of parameters passed into node per rescan */
Datum *paramValues; /* array of last passed parameter values */
bool *paramNulls; /* array of last passed parameter null flags*/
} CitusScanState;