mirror of https://github.com/citusdata/citus.git
restore
parent
11127b4938
commit
ecc67255ec
|
|
@ -48,7 +48,6 @@
|
||||||
#include "distributed/version_compat.h"
|
#include "distributed/version_compat.h"
|
||||||
#include "distributed/worker_log_messages.h"
|
#include "distributed/worker_log_messages.h"
|
||||||
#include "distributed/worker_manager.h"
|
#include "distributed/worker_manager.h"
|
||||||
#include "miscadmin.h" // for elog functions
|
|
||||||
|
|
||||||
|
|
||||||
/* global variable tracking whether we are in a delegated procedure call */
|
/* global variable tracking whether we are in a delegated procedure call */
|
||||||
|
|
@ -64,59 +63,6 @@ CallDistributedProcedureRemotely(CallStmt *callStmt, DestReceiver *dest)
|
||||||
FuncExpr *funcExpr = callStmt->funcexpr;
|
FuncExpr *funcExpr = callStmt->funcexpr;
|
||||||
Oid functionId = funcExpr->funcid;
|
Oid functionId = funcExpr->funcid;
|
||||||
|
|
||||||
/* Log the function ID being called */
|
|
||||||
elog(DEBUG1, "Calling distributed procedure with functionId: %u", functionId);
|
|
||||||
|
|
||||||
/* Log additional details from CallStmt */
|
|
||||||
if (callStmt->funcexpr != NULL)
|
|
||||||
{
|
|
||||||
elog(DEBUG1, "Function expression type: %d", nodeTag(callStmt->funcexpr));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (funcExpr->args != NIL)
|
|
||||||
{
|
|
||||||
ListCell *lc;
|
|
||||||
int argIndex = 0;
|
|
||||||
foreach(lc, funcExpr->args)
|
|
||||||
{
|
|
||||||
Node *arg = (Node *) lfirst(lc);
|
|
||||||
elog(DEBUG1, "Argument %d: NodeTag: %d", argIndex, nodeTag(arg));
|
|
||||||
argIndex++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
elog(DEBUG1, "No arguments in the function expression");
|
|
||||||
}
|
|
||||||
|
|
||||||
ListCell *argCell1;
|
|
||||||
int argIndex1 = 0;
|
|
||||||
|
|
||||||
/* Iterate over the arguments and log them */
|
|
||||||
foreach(argCell1, callStmt->funcexpr->args)
|
|
||||||
{
|
|
||||||
Node *argNode = (Node *) lfirst(argCell1);
|
|
||||||
|
|
||||||
// Check if the node is valid
|
|
||||||
if (argNode != NULL)
|
|
||||||
{
|
|
||||||
// Use nodeToString() to convert the node into a string representation for debugging
|
|
||||||
char *argStr = nodeToString(argNode);
|
|
||||||
|
|
||||||
// Log the argument index and its string representation
|
|
||||||
elog(DEBUG1, "Argument %d: %s", argIndex1, argStr);
|
|
||||||
|
|
||||||
// Free the string memory after logging (it's a good practice to avoid memory leaks)
|
|
||||||
pfree(argStr);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
elog(DEBUG1, "Argument %d: (null)", argIndex1);
|
|
||||||
}
|
|
||||||
argIndex1++;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
DistObjectCacheEntry *procedure = LookupDistObjectCacheEntry(ProcedureRelationId,
|
DistObjectCacheEntry *procedure = LookupDistObjectCacheEntry(ProcedureRelationId,
|
||||||
functionId, 0);
|
functionId, 0);
|
||||||
if (procedure == NULL || !procedure->isDistributed)
|
if (procedure == NULL || !procedure->isDistributed)
|
||||||
|
|
@ -221,7 +167,6 @@ CallDistributedProcedureRemotely(CallStmt *callStmt, DestReceiver *dest)
|
||||||
|
|
||||||
appendStringInfo(callCommand, "CALL %s", pg_get_rule_expr((Node *) callStmt));
|
appendStringInfo(callCommand, "CALL %s", pg_get_rule_expr((Node *) callStmt));
|
||||||
{
|
{
|
||||||
elog(DEBUG1, "Generated CALL statement: %s", callCommand->data);
|
|
||||||
Tuplestorestate *tupleStore = tuplestore_begin_heap(true, false, work_mem);
|
Tuplestorestate *tupleStore = tuplestore_begin_heap(true, false, work_mem);
|
||||||
TupleDesc tupleDesc = CallStmtResultDesc(callStmt);
|
TupleDesc tupleDesc = CallStmtResultDesc(callStmt);
|
||||||
TupleTableSlot *slot = MakeSingleTupleTableSlot(tupleDesc,
|
TupleTableSlot *slot = MakeSingleTupleTableSlot(tupleDesc,
|
||||||
|
|
@ -250,186 +195,19 @@ CallDistributedProcedureRemotely(CallStmt *callStmt, DestReceiver *dest)
|
||||||
};
|
};
|
||||||
|
|
||||||
EnableWorkerMessagePropagation();
|
EnableWorkerMessagePropagation();
|
||||||
elog(DEBUG1, "Worker message propagation enabled");
|
|
||||||
|
|
||||||
bool localExecutionSupported = true;
|
bool localExecutionSupported = true;
|
||||||
elog(DEBUG1, "Local execution supported: %d", localExecutionSupported);
|
|
||||||
|
|
||||||
/* Log task details */
|
|
||||||
elog(DEBUG1, "Creating execution params for task");
|
|
||||||
|
|
||||||
/* Create execution parameters */
|
|
||||||
ExecutionParams *executionParams = CreateBasicExecutionParams(
|
ExecutionParams *executionParams = CreateBasicExecutionParams(
|
||||||
ROW_MODIFY_NONE, list_make1(task), MaxAdaptiveExecutorPoolSize,
|
ROW_MODIFY_NONE, list_make1(task), MaxAdaptiveExecutorPoolSize,
|
||||||
localExecutionSupported
|
localExecutionSupported
|
||||||
);
|
);
|
||||||
|
executionParams->tupleDestination = CreateTupleStoreTupleDest(tupleStore,
|
||||||
const char* NodeTagToString(NodeTag tag)
|
tupleDesc);
|
||||||
{
|
|
||||||
switch (tag)
|
|
||||||
{
|
|
||||||
case T_Var: return "Var";
|
|
||||||
case T_Const: return "Const";
|
|
||||||
case T_Param: return "Param";
|
|
||||||
case T_FuncExpr: return "FuncExpr";
|
|
||||||
case T_OpExpr: return "OpExpr";
|
|
||||||
case T_BoolExpr: return "BoolExpr";
|
|
||||||
case T_Aggref: return "Aggref";
|
|
||||||
case T_WindowFunc: return "WindowFunc";
|
|
||||||
case T_SubLink: return "SubLink";
|
|
||||||
case T_CoalesceExpr: return "CoalesceExpr";
|
|
||||||
case T_CaseExpr: return "CaseExpr";
|
|
||||||
case T_NullTest: return "NullTest";
|
|
||||||
case T_CollateExpr: return "CollateExpr";
|
|
||||||
case T_FieldSelect: return "FieldSelect";
|
|
||||||
case T_FieldStore: return "FieldStore";
|
|
||||||
case T_SubPlan: return "SubPlan";
|
|
||||||
default: return "Unknown";
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create a ParamListInfo structure
|
|
||||||
List *argList = funcExpr->args; // Extract arguments from the function expression
|
|
||||||
int paramCount = list_length(argList); // Get the number of arguments
|
|
||||||
ParamListInfo paramListInfo = makeParamList(paramCount); // Create ParamListInfo structure
|
|
||||||
|
|
||||||
// Loop through the argument list and populate ParamListInfo
|
|
||||||
int paramIndex = 0;
|
|
||||||
ListCell *argCell;
|
|
||||||
foreach(argCell, argList)
|
|
||||||
{
|
|
||||||
Node *argNode = (Node *) lfirst(argCell);
|
|
||||||
|
|
||||||
// Log the type of the argument
|
|
||||||
NodeTag nodeType = nodeTag(argNode);
|
|
||||||
elog(DEBUG1, "Processing argument at index %d of type: %s", paramIndex, NodeTagToString(nodeType));
|
|
||||||
|
|
||||||
|
|
||||||
if (IsA(argNode, Const))
|
|
||||||
{
|
|
||||||
Const *constArg = (Const *) argNode;
|
|
||||||
paramListInfo->params[paramIndex].ptype = constArg->consttype; // Set parameter type
|
|
||||||
paramListInfo->params[paramIndex].value = constArg->constvalue; // Set parameter value
|
|
||||||
paramListInfo->params[paramIndex].isnull = constArg->constisnull; // Set if the parameter is null
|
|
||||||
|
|
||||||
// Log the constant parameter's type, value, and null status
|
|
||||||
elog(DEBUG1, "Populating ParamListInfo with constant parameter: paramIndex: %d, paramType: %d, isNull: %s",
|
|
||||||
paramIndex, paramListInfo->params[paramIndex].ptype,
|
|
||||||
constArg->constisnull ? "true" : "false");
|
|
||||||
}
|
|
||||||
else if (IsA(argNode, Param))
|
|
||||||
{
|
|
||||||
Param *paramArg = (Param *) argNode;
|
|
||||||
|
|
||||||
// Set the parameter type
|
|
||||||
paramListInfo->params[paramIndex].ptype = paramArg->paramtype;
|
|
||||||
|
|
||||||
// Fetch the value of the parameter if necessary
|
|
||||||
if (paramListInfo->paramFetch != NULL)
|
|
||||||
{
|
|
||||||
ParamExternData paramData;
|
|
||||||
paramListInfo->paramFetch(paramListInfo, paramArg->paramid, true, ¶mData);
|
|
||||||
|
|
||||||
// Log the fetched parameter details
|
|
||||||
elog(DEBUG1, "paramFetch for paramId: %d returned value: %d, type: %d, isNull: %s",
|
|
||||||
paramArg->paramid, DatumGetInt32(paramData.value), paramData.ptype,
|
|
||||||
paramData.isnull ? "true" : "false");
|
|
||||||
|
|
||||||
paramListInfo->params[paramIndex].value = paramData.value;
|
|
||||||
paramListInfo->params[paramIndex].isnull = paramData.isnull;
|
|
||||||
|
|
||||||
// Log fetched value and type
|
|
||||||
elog(DEBUG1, "Fetched dynamic parameter: paramIndex: %d, paramType: %d, paramValue: %d",
|
|
||||||
paramIndex, paramListInfo->params[paramIndex].ptype, DatumGetInt32(paramListInfo->params[paramIndex].value));
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
// Handle the case where paramFetch is NULL
|
|
||||||
elog(DEBUG1, "Could not fetch value for parameter: %d", paramArg->paramid);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else if (IsA(argNode, FuncExpr))
|
|
||||||
{
|
|
||||||
// FuncExpr *funcExpr = (FuncExpr *) argNode;
|
|
||||||
|
|
||||||
// Log function expression details
|
|
||||||
elog(DEBUG1, "Processing function expression: funcid: %d", funcExpr->funcid);
|
|
||||||
|
|
||||||
// Iterate through the arguments of the function expression
|
|
||||||
ListCell *funcArgCell;
|
|
||||||
foreach(funcArgCell, funcExpr->args)
|
|
||||||
{
|
|
||||||
Node *funcArgNode = (Node *) lfirst(funcArgCell);
|
|
||||||
|
|
||||||
// Check if the argument is a Param or Const
|
|
||||||
if (IsA(funcArgNode, Param))
|
|
||||||
{
|
|
||||||
Param *paramArg = (Param *) funcArgNode;
|
|
||||||
|
|
||||||
// Fetch the parameter value (same as your param-fetch logic)
|
|
||||||
ParamExternData paramData;
|
|
||||||
paramListInfo->paramFetch(paramListInfo, paramArg->paramid, true, ¶mData);
|
|
||||||
|
|
||||||
// Populate ParamListInfo with fetched param
|
|
||||||
paramListInfo->params[paramIndex].ptype = paramArg->paramtype;
|
|
||||||
paramListInfo->params[paramIndex].value = paramData.value;
|
|
||||||
paramListInfo->params[paramIndex].isnull = paramData.isnull;
|
|
||||||
|
|
||||||
// Log fetched parameter details
|
|
||||||
elog(DEBUG1, "Populating ParamListInfo with fetched parameter: paramIndex: %d, paramType: %d, paramValue: %d",
|
|
||||||
paramIndex, paramListInfo->params[paramIndex].ptype, DatumGetInt32(paramListInfo->params[paramIndex].value));
|
|
||||||
}
|
|
||||||
else if (IsA(funcArgNode, Const))
|
|
||||||
{
|
|
||||||
Const *constArg = (Const *) funcArgNode;
|
|
||||||
|
|
||||||
// Handle Const values within the function expression
|
|
||||||
paramListInfo->params[paramIndex].ptype = constArg->consttype;
|
|
||||||
paramListInfo->params[paramIndex].value = constArg->constvalue;
|
|
||||||
paramListInfo->params[paramIndex].isnull = constArg->constisnull;
|
|
||||||
|
|
||||||
// Log constant parameter
|
|
||||||
elog(DEBUG1, "Populating ParamListInfo with constant parameter inside function expression: paramIndex: %d, paramType: %d",
|
|
||||||
paramIndex, paramListInfo->params[paramIndex].ptype);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
elog(DEBUG1, "Unsupported argument type in function expression at paramIndex: %d", paramIndex);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
// Handle other cases if necessary
|
|
||||||
elog(DEBUG1, "Unsupported argument type at paramIndex: %d", paramIndex);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Log populated parameters
|
|
||||||
// elog(DEBUG1, "Populating ParamListInfo, paramIndex: %d, paramType: %d, paramValue: %d",
|
|
||||||
// paramIndex, paramListInfo->params[paramIndex].ptype, DatumGetInt32(paramListInfo->params[paramIndex].value));
|
|
||||||
|
|
||||||
paramIndex++;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/* Set tuple destination and execution properties */
|
|
||||||
executionParams->tupleDestination = CreateTupleStoreTupleDest(tupleStore, tupleDesc);
|
|
||||||
executionParams->expectResults = expectResults;
|
executionParams->expectResults = expectResults;
|
||||||
executionParams->xactProperties = xactProperties;
|
executionParams->xactProperties = xactProperties;
|
||||||
executionParams->isUtilityCommand = true;
|
executionParams->isUtilityCommand = true;
|
||||||
executionParams->paramListInfo = paramListInfo;
|
|
||||||
|
|
||||||
/* Log before executing task list */
|
|
||||||
elog(DEBUG1, "Executing task list with ExecuteTaskListExtended");
|
|
||||||
|
|
||||||
/* Execute the task list */
|
|
||||||
ExecuteTaskListExtended(executionParams);
|
ExecuteTaskListExtended(executionParams);
|
||||||
|
|
||||||
/* Log after task execution */
|
|
||||||
elog(DEBUG1, "Task list execution completed");
|
|
||||||
|
|
||||||
|
|
||||||
DisableWorkerMessagePropagation();
|
DisableWorkerMessagePropagation();
|
||||||
|
|
||||||
while (tuplestore_gettupleslot(tupleStore, true, false, slot))
|
while (tuplestore_gettupleslot(tupleStore, true, false, slot))
|
||||||
|
|
|
||||||
|
|
@ -769,51 +769,16 @@ AdaptiveExecutorPreExecutorRun(CitusScanState *scanState)
|
||||||
/*
|
/*
|
||||||
* AdaptiveExecutor is called via CitusExecScan on the
|
* AdaptiveExecutor is called via CitusExecScan on the
|
||||||
* first call of CitusExecScan. The function fills the tupleStore
|
* first call of CitusExecScan. The function fills the tupleStore
|
||||||
* of the input scanState.
|
* of the input scanScate.
|
||||||
*/
|
*/
|
||||||
TupleTableSlot *
|
TupleTableSlot *
|
||||||
AdaptiveExecutor(CitusScanState *scanState)
|
AdaptiveExecutor(CitusScanState *scanState)
|
||||||
{
|
{
|
||||||
/* Log entry into the function */
|
|
||||||
elog(DEBUG1, "Entering AdaptiveExecutor");
|
|
||||||
|
|
||||||
TupleTableSlot *resultSlot = NULL;
|
TupleTableSlot *resultSlot = NULL;
|
||||||
|
|
||||||
DistributedPlan *distributedPlan = scanState->distributedPlan;
|
DistributedPlan *distributedPlan = scanState->distributedPlan;
|
||||||
EState *executorState = ScanStateGetExecutorState(scanState);
|
EState *executorState = ScanStateGetExecutorState(scanState);
|
||||||
ParamListInfo paramListInfo = executorState->es_param_list_info;
|
ParamListInfo paramListInfo = executorState->es_param_list_info;
|
||||||
|
|
||||||
/* Log details about the distributed plan */
|
|
||||||
if (distributedPlan != NULL)
|
|
||||||
{
|
|
||||||
elog(DEBUG1, "Distributed plan modLevel: %d", distributedPlan->modLevel);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
elog(DEBUG1, "Distributed plan is NULL");
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Log details about paramListInfo */
|
|
||||||
if (paramListInfo != NULL)
|
|
||||||
{
|
|
||||||
elog(DEBUG1, "paramListInfo is populated with %d parameters", paramListInfo->numParams);
|
|
||||||
|
|
||||||
/* Log each parameter */
|
|
||||||
for (int i = 0; i < paramListInfo->numParams; i++)
|
|
||||||
{
|
|
||||||
ParamExternData *param = ¶mListInfo->params[i];
|
|
||||||
elog(DEBUG1, "Parameter %d: ptype = %d, isnull = %d",
|
|
||||||
i + 1, param->ptype, param->isnull);
|
|
||||||
if (!param->isnull)
|
|
||||||
{
|
|
||||||
elog(DEBUG1, "Parameter %d: value = %ld", i + 1, param->value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
elog(DEBUG1, "paramListInfo is NULL");
|
|
||||||
}
|
|
||||||
bool randomAccess = true;
|
bool randomAccess = true;
|
||||||
bool interTransactions = false;
|
bool interTransactions = false;
|
||||||
int targetPoolSize = MaxAdaptiveExecutorPoolSize;
|
int targetPoolSize = MaxAdaptiveExecutorPoolSize;
|
||||||
|
|
@ -881,53 +846,16 @@ AdaptiveExecutor(CitusScanState *scanState)
|
||||||
distributedPlan->modLevel, taskList, excludeFromXact);
|
distributedPlan->modLevel, taskList, excludeFromXact);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* In some rare cases, we have prepared statements that pass a parameter
|
* In some rare cases, we have prepared statements that pass a parameter
|
||||||
* and never used in the query, mark such parameters' type as Invalid(0),
|
* and never used in the query, mark such parameters' type as Invalid(0),
|
||||||
* which will be used later in ExtractParametersFromParamList() to map them
|
* which will be used later in ExtractParametersFromParamList() to map them
|
||||||
* to a generic datatype. Skip for dynamic parameters.
|
* to a generic datatype. Skip for dynamic parameters.
|
||||||
*/
|
*/
|
||||||
if (paramListInfo && !paramListInfo->paramFetch)
|
if (paramListInfo && !paramListInfo->paramFetch)
|
||||||
{
|
{
|
||||||
/* Copy the ParamListInfo for safety */
|
|
||||||
paramListInfo = copyParamList(paramListInfo);
|
paramListInfo = copyParamList(paramListInfo);
|
||||||
|
|
||||||
/* Mark any unreferenced parameters */
|
|
||||||
MarkUnreferencedExternParams((Node *) job->jobQuery, paramListInfo);
|
MarkUnreferencedExternParams((Node *) job->jobQuery, paramListInfo);
|
||||||
|
|
||||||
/* Debug: Log the parameters before proceeding */
|
|
||||||
elog(DEBUG1, "paramListInfo: number of parameters = %d", paramListInfo->numParams);
|
|
||||||
|
|
||||||
/* Log details of each parameter */
|
|
||||||
for (int i = 0; i < paramListInfo->numParams; i++)
|
|
||||||
{
|
|
||||||
ParamExternData *param = ¶mListInfo->params[i];
|
|
||||||
elog(DEBUG1, "Parameter %d: ptype = %d, isnull = %d",
|
|
||||||
i + 1, param->ptype, param->isnull);
|
|
||||||
if (!param->isnull)
|
|
||||||
{
|
|
||||||
elog(DEBUG1, "Parameter %d: value = %ld", i + 1, param->value);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
else
|
|
||||||
{
|
|
||||||
/* If paramListInfo is NULL or paramFetch is true, log that information */
|
|
||||||
if (paramListInfo == NULL)
|
|
||||||
{
|
|
||||||
elog(DEBUG1, "paramListInfo is NULL");
|
|
||||||
}
|
|
||||||
else if (paramListInfo->paramFetch)
|
|
||||||
{
|
|
||||||
elog(DEBUG1, "paramListInfo uses dynamic parameter fetching");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Log the parameters before creating the distributed execution */
|
|
||||||
elog(DEBUG1, "Creating DistributedExecution with modLevel: %d, taskList size: %d",
|
|
||||||
distributedPlan->modLevel, list_length(taskList));
|
|
||||||
|
|
||||||
elog(DEBUG1, "Target pool size: %d, Local execution supported: %d",
|
|
||||||
targetPoolSize, localExecutionSupported);
|
|
||||||
|
|
||||||
DistributedExecution *execution = CreateDistributedExecution(
|
DistributedExecution *execution = CreateDistributedExecution(
|
||||||
distributedPlan->modLevel,
|
distributedPlan->modLevel,
|
||||||
|
|
@ -939,9 +867,6 @@ AdaptiveExecutor(CitusScanState *scanState)
|
||||||
jobIdList,
|
jobIdList,
|
||||||
localExecutionSupported);
|
localExecutionSupported);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Make sure that we acquire the appropriate locks even if the local tasks
|
* Make sure that we acquire the appropriate locks even if the local tasks
|
||||||
* are going to be executed with local execution.
|
* are going to be executed with local execution.
|
||||||
|
|
@ -992,13 +917,6 @@ AdaptiveExecutor(CitusScanState *scanState)
|
||||||
static void
|
static void
|
||||||
RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution)
|
RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution)
|
||||||
{
|
{
|
||||||
/* Log entry into the function */
|
|
||||||
elog(DEBUG1, "Entering RunLocalExecution");
|
|
||||||
|
|
||||||
/* Log scan state and execution details */
|
|
||||||
elog(DEBUG1, "ScanState: distributedPlan type: %d", nodeTag(scanState->distributedPlan));
|
|
||||||
elog(DEBUG1, "Local task list length: %d", list_length(execution->localTaskList));
|
|
||||||
|
|
||||||
EState *estate = ScanStateGetExecutorState(scanState);
|
EState *estate = ScanStateGetExecutorState(scanState);
|
||||||
bool isUtilityCommand = false;
|
bool isUtilityCommand = false;
|
||||||
uint64 rowsProcessed = ExecuteLocalTaskListExtended(execution->localTaskList,
|
uint64 rowsProcessed = ExecuteLocalTaskListExtended(execution->localTaskList,
|
||||||
|
|
@ -1167,83 +1085,52 @@ ExecuteTaskListIntoTupleDest(RowModifyLevel modLevel, List *taskList,
|
||||||
uint64
|
uint64
|
||||||
ExecuteTaskListExtended(ExecutionParams *executionParams)
|
ExecuteTaskListExtended(ExecutionParams *executionParams)
|
||||||
{
|
{
|
||||||
/* Log task list length */
|
|
||||||
elog(DEBUG1, "Starting ExecuteTaskListExtended, number of tasks: %d", list_length(executionParams->taskList));
|
|
||||||
|
|
||||||
/* if there are no tasks to execute, we can return early */
|
/* if there are no tasks to execute, we can return early */
|
||||||
if (list_length(executionParams->taskList) == 0)
|
if (list_length(executionParams->taskList) == 0)
|
||||||
{
|
{
|
||||||
elog(DEBUG1, "No tasks to execute, returning early");
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64 locallyProcessedRows = 0;
|
uint64 locallyProcessedRows = 0;
|
||||||
TupleDestination *defaultTupleDest = executionParams->tupleDestination;
|
|
||||||
|
|
||||||
/* Log connection type */
|
TupleDestination *defaultTupleDest = executionParams->tupleDestination;
|
||||||
elog(DEBUG1, "MultiShardConnectionType: %d", MultiShardConnectionType);
|
|
||||||
|
|
||||||
if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
|
if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
|
||||||
{
|
{
|
||||||
elog(DEBUG1, "Switching to sequential connection mode");
|
|
||||||
executionParams->targetPoolSize = 1;
|
executionParams->targetPoolSize = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Log before creating distributed execution */
|
DistributedExecution *execution =
|
||||||
elog(DEBUG1, "Creating distributed execution for task list");
|
CreateDistributedExecution(
|
||||||
|
executionParams->modLevel, executionParams->taskList,
|
||||||
|
executionParams->paramListInfo, executionParams->targetPoolSize,
|
||||||
|
defaultTupleDest, &executionParams->xactProperties,
|
||||||
|
executionParams->jobIdList, executionParams->localExecutionSupported);
|
||||||
|
|
||||||
DistributedExecution *execution = CreateDistributedExecution(
|
/*
|
||||||
executionParams->modLevel, executionParams->taskList,
|
* If current transaction accessed local placements and task list includes
|
||||||
executionParams->paramListInfo, executionParams->targetPoolSize,
|
* tasks that should be executed locally (accessing any of the local placements),
|
||||||
defaultTupleDest, &executionParams->xactProperties,
|
* then we should error out as it would cause inconsistencies across the
|
||||||
executionParams->jobIdList, executionParams->localExecutionSupported);
|
* remote connection and local execution.
|
||||||
|
*/
|
||||||
/* Log details of created execution */
|
|
||||||
elog(DEBUG1, "DistributedExecution created: %d tasks, local execution supported: %d",
|
|
||||||
list_length(execution->remoteTaskList), execution->localExecutionSupported);
|
|
||||||
|
|
||||||
/* Ensure local execution state is compatible */
|
|
||||||
EnsureCompatibleLocalExecutionState(execution->remoteTaskList);
|
EnsureCompatibleLocalExecutionState(execution->remoteTaskList);
|
||||||
|
|
||||||
/* Log before running distributed execution */
|
/* run the remote execution */
|
||||||
elog(DEBUG1, "Starting distributed execution");
|
|
||||||
|
|
||||||
/* Start the distributed execution */
|
|
||||||
StartDistributedExecution(execution);
|
StartDistributedExecution(execution);
|
||||||
|
|
||||||
/* Log after StartDistributedExecution */
|
|
||||||
elog(DEBUG1, "StartDistributedExecution completed");
|
|
||||||
|
|
||||||
/* Run the distributed execution */
|
|
||||||
RunDistributedExecution(execution);
|
RunDistributedExecution(execution);
|
||||||
|
|
||||||
/* Log after RunDistributedExecution */
|
|
||||||
elog(DEBUG1, "RunDistributedExecution completed");
|
|
||||||
|
|
||||||
/* Finish the distributed execution */
|
|
||||||
FinishDistributedExecution(execution);
|
FinishDistributedExecution(execution);
|
||||||
|
|
||||||
/* Log after FinishDistributedExecution */
|
/* now, switch back to the local execution */
|
||||||
elog(DEBUG1, "FinishDistributedExecution completed");
|
|
||||||
|
|
||||||
/* Log that the entire distributed execution process is complete */
|
|
||||||
elog(DEBUG1, "Distributed execution completed");
|
|
||||||
|
|
||||||
|
|
||||||
/* Log before running local execution */
|
|
||||||
if (executionParams->isUtilityCommand)
|
if (executionParams->isUtilityCommand)
|
||||||
{
|
{
|
||||||
elog(DEBUG1, "Running local utility task list");
|
|
||||||
locallyProcessedRows += ExecuteLocalUtilityTaskList(execution->localTaskList);
|
locallyProcessedRows += ExecuteLocalUtilityTaskList(execution->localTaskList);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
elog(DEBUG1, "Running local task list");
|
locallyProcessedRows += ExecuteLocalTaskList(execution->localTaskList,
|
||||||
locallyProcessedRows += ExecuteLocalTaskList(execution->localTaskList, defaultTupleDest);
|
defaultTupleDest);
|
||||||
}
|
}
|
||||||
|
|
||||||
elog(DEBUG1, "Task execution completed, total rows processed: %lu", execution->rowsProcessed + locallyProcessedRows);
|
|
||||||
|
|
||||||
return execution->rowsProcessed + locallyProcessedRows;
|
return execution->rowsProcessed + locallyProcessedRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -2010,17 +1897,10 @@ SequentialRunDistributedExecution(DistributedExecution *execution)
|
||||||
void
|
void
|
||||||
RunDistributedExecution(DistributedExecution *execution)
|
RunDistributedExecution(DistributedExecution *execution)
|
||||||
{
|
{
|
||||||
elog(DEBUG1, "Starting RunDistributedExecution with %d unfinished tasks", execution->unfinishedTaskCount);
|
|
||||||
|
|
||||||
/* Assign tasks to connections or worker pool */
|
|
||||||
AssignTasksToConnectionsOrWorkerPool(execution);
|
AssignTasksToConnectionsOrWorkerPool(execution);
|
||||||
elog(DEBUG1, "Assigned tasks to connections or worker pool");
|
|
||||||
|
|
||||||
PG_TRY();
|
PG_TRY();
|
||||||
{
|
{
|
||||||
/* Log before stepping state machines */
|
|
||||||
elog(DEBUG1, "Stepping state machines for all sessions");
|
|
||||||
|
|
||||||
/* Preemptively step state machines in case of immediate errors */
|
/* Preemptively step state machines in case of immediate errors */
|
||||||
WorkerSession *session = NULL;
|
WorkerSession *session = NULL;
|
||||||
foreach_declared_ptr(session, execution->sessionList)
|
foreach_declared_ptr(session, execution->sessionList)
|
||||||
|
|
@ -2033,9 +1913,23 @@ RunDistributedExecution(DistributedExecution *execution)
|
||||||
/* always (re)build the wait event set the first time */
|
/* always (re)build the wait event set the first time */
|
||||||
execution->rebuildWaitEventSet = true;
|
execution->rebuildWaitEventSet = true;
|
||||||
|
|
||||||
elog(DEBUG1, "Entering task/event loop with unfinishedTaskCount: %d", execution->unfinishedTaskCount);
|
/*
|
||||||
|
* Iterate until all the tasks are finished. Once all the tasks
|
||||||
/* Iterate until all the tasks are finished */
|
* are finished, ensure that all the connection initializations
|
||||||
|
* are also finished. Otherwise, those connections are terminated
|
||||||
|
* abruptly before they are established (or failed). Instead, we let
|
||||||
|
* the ConnectionStateMachine() to properly handle them.
|
||||||
|
*
|
||||||
|
* Note that we could have the connections that are not established
|
||||||
|
* as a side effect of slow-start algorithm. At the time the algorithm
|
||||||
|
* decides to establish new connections, the execution might have tasks
|
||||||
|
* to finish. But, the execution might finish before the new connections
|
||||||
|
* are established.
|
||||||
|
*
|
||||||
|
* Note that the rules explained above could be overriden by any
|
||||||
|
* cancellation to the query. In that case, we terminate the execution
|
||||||
|
* irrespective of the current status of the tasks or the connections.
|
||||||
|
*/
|
||||||
while (!cancellationReceived &&
|
while (!cancellationReceived &&
|
||||||
(execution->unfinishedTaskCount > 0 ||
|
(execution->unfinishedTaskCount > 0 ||
|
||||||
HasIncompleteConnectionEstablishment(execution)))
|
HasIncompleteConnectionEstablishment(execution)))
|
||||||
|
|
@ -2049,13 +1943,14 @@ RunDistributedExecution(DistributedExecution *execution)
|
||||||
bool skipWaitEvents = false;
|
bool skipWaitEvents = false;
|
||||||
if (execution->remoteTaskList == NIL)
|
if (execution->remoteTaskList == NIL)
|
||||||
{
|
{
|
||||||
/* Log when all tasks have failed over to local execution */
|
/*
|
||||||
elog(DEBUG1, "All tasks failed over to local execution");
|
* All the tasks are failed over to the local execution, no need
|
||||||
|
* to wait for any connection activity.
|
||||||
|
*/
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
else if (execution->rebuildWaitEventSet)
|
else if (execution->rebuildWaitEventSet)
|
||||||
{
|
{
|
||||||
elog(DEBUG1, "Rebuilding wait event set");
|
|
||||||
RebuildWaitEventSet(execution);
|
RebuildWaitEventSet(execution);
|
||||||
|
|
||||||
skipWaitEvents =
|
skipWaitEvents =
|
||||||
|
|
@ -2063,7 +1958,6 @@ RunDistributedExecution(DistributedExecution *execution)
|
||||||
}
|
}
|
||||||
else if (execution->waitFlagsChanged)
|
else if (execution->waitFlagsChanged)
|
||||||
{
|
{
|
||||||
elog(DEBUG1, "Rebuilding wait event set flags");
|
|
||||||
RebuildWaitEventSetFlags(execution->waitEventSet, execution->sessionList);
|
RebuildWaitEventSetFlags(execution->waitEventSet, execution->sessionList);
|
||||||
execution->waitFlagsChanged = false;
|
execution->waitFlagsChanged = false;
|
||||||
|
|
||||||
|
|
@ -2073,35 +1967,35 @@ RunDistributedExecution(DistributedExecution *execution)
|
||||||
|
|
||||||
if (skipWaitEvents)
|
if (skipWaitEvents)
|
||||||
{
|
{
|
||||||
elog(DEBUG1, "Skipping wait events due to failure, retrying");
|
/*
|
||||||
|
* Some operation on the wait event set is failed, retry
|
||||||
|
* as we already removed the problematic connections.
|
||||||
|
*/
|
||||||
execution->rebuildWaitEventSet = true;
|
execution->rebuildWaitEventSet = true;
|
||||||
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Log before waiting for I/O events */
|
/* wait for I/O events */
|
||||||
long timeout = NextEventTimeout(execution);
|
long timeout = NextEventTimeout(execution);
|
||||||
|
|
||||||
int eventCount =
|
int eventCount =
|
||||||
WaitEventSetWait(execution->waitEventSet, timeout, execution->events,
|
WaitEventSetWait(execution->waitEventSet, timeout, execution->events,
|
||||||
execution->eventSetSize, WAIT_EVENT_CLIENT_READ);
|
execution->eventSetSize, WAIT_EVENT_CLIENT_READ);
|
||||||
|
|
||||||
|
|
||||||
ProcessWaitEvents(execution, execution->events, eventCount,
|
ProcessWaitEvents(execution, execution->events, eventCount,
|
||||||
&cancellationReceived);
|
&cancellationReceived);
|
||||||
}
|
}
|
||||||
|
|
||||||
elog(DEBUG1, "Finished task/event loop, cleaning up");
|
|
||||||
|
|
||||||
/* Clean up after distributed execution */
|
|
||||||
FreeExecutionWaitEvents(execution);
|
FreeExecutionWaitEvents(execution);
|
||||||
|
|
||||||
CleanUpSessions(execution);
|
CleanUpSessions(execution);
|
||||||
}
|
}
|
||||||
PG_CATCH();
|
PG_CATCH();
|
||||||
{
|
{
|
||||||
/* Log in case of errors */
|
/*
|
||||||
elog(DEBUG1, "Error occurred, unclaiming all session connections");
|
* We can still recover from error using ROLLBACK TO SAVEPOINT,
|
||||||
|
* unclaim all connections to allow that.
|
||||||
|
*/
|
||||||
UnclaimAllSessionConnections(execution->sessionList);
|
UnclaimAllSessionConnections(execution->sessionList);
|
||||||
|
|
||||||
FreeExecutionWaitEvents(execution);
|
FreeExecutionWaitEvents(execution);
|
||||||
|
|
@ -2112,7 +2006,6 @@ RunDistributedExecution(DistributedExecution *execution)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ProcessSessionsWithFailedWaitEventSetOperations goes over the session list
|
* ProcessSessionsWithFailedWaitEventSetOperations goes over the session list
|
||||||
* and processes sessions with failed wait event set operations.
|
* and processes sessions with failed wait event set operations.
|
||||||
|
|
@ -4011,6 +3904,10 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* SendNextQuery sends the next query for placementExecution on the given
|
||||||
|
* session.
|
||||||
|
*/
|
||||||
static bool
|
static bool
|
||||||
SendNextQuery(TaskPlacementExecution *placementExecution,
|
SendNextQuery(TaskPlacementExecution *placementExecution,
|
||||||
WorkerSession *session)
|
WorkerSession *session)
|
||||||
|
|
@ -4026,62 +3923,26 @@ SendNextQuery(TaskPlacementExecution *placementExecution,
|
||||||
int querySent = 0;
|
int querySent = 0;
|
||||||
uint32 queryIndex = placementExecution->queryIndex;
|
uint32 queryIndex = placementExecution->queryIndex;
|
||||||
|
|
||||||
elog(DEBUG1, "Sending next query: queryIndex = %d, task->queryCount = %d", queryIndex, task->queryCount);
|
|
||||||
|
|
||||||
Assert(queryIndex < task->queryCount);
|
Assert(queryIndex < task->queryCount);
|
||||||
char *queryString = TaskQueryStringAtIndex(task, queryIndex);
|
char *queryString = TaskQueryStringAtIndex(task, queryIndex);
|
||||||
|
|
||||||
elog(DEBUG1, "Query to be sent: %s", queryString);
|
|
||||||
|
|
||||||
if (paramListInfo != NULL && !task->parametersInQueryStringResolved)
|
|
||||||
{
|
|
||||||
elog(DEBUG1, "ParamListInfo is not null and parameters not resolved");
|
|
||||||
}
|
|
||||||
else if (paramListInfo == NULL)
|
|
||||||
{
|
|
||||||
elog(DEBUG1, "ParamListInfo is NULL");
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
elog(DEBUG1, "Task parameters are already resolved");
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
if (paramListInfo != NULL && !task->parametersInQueryStringResolved)
|
if (paramListInfo != NULL && !task->parametersInQueryStringResolved)
|
||||||
{
|
{
|
||||||
int parameterCount = paramListInfo->numParams;
|
int parameterCount = paramListInfo->numParams;
|
||||||
Oid *parameterTypes = NULL;
|
Oid *parameterTypes = NULL;
|
||||||
const char **parameterValues = NULL;
|
const char **parameterValues = NULL;
|
||||||
|
|
||||||
elog(DEBUG1, "ParamListInfo is not null, parameterCount: %d", parameterCount);
|
|
||||||
|
|
||||||
/* force evaluation of bound params */
|
/* force evaluation of bound params */
|
||||||
paramListInfo = copyParamList(paramListInfo);
|
paramListInfo = copyParamList(paramListInfo);
|
||||||
|
|
||||||
/* Log the start of parameter extraction */
|
|
||||||
elog(DEBUG1, "Extracting parameters for remote execution");
|
|
||||||
|
|
||||||
ExtractParametersForRemoteExecution(paramListInfo, ¶meterTypes,
|
ExtractParametersForRemoteExecution(paramListInfo, ¶meterTypes,
|
||||||
¶meterValues);
|
¶meterValues);
|
||||||
|
|
||||||
/* Log extracted parameter values and types */
|
|
||||||
for (int i = 0; i < parameterCount; i++)
|
|
||||||
{
|
|
||||||
elog(DEBUG1, "Parameter %d: Type = %d, Value = %s", i + 1, parameterTypes[i], parameterValues[i]);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Send the remote command with parameters */
|
|
||||||
querySent = SendRemoteCommandParams(connection, queryString, parameterCount,
|
querySent = SendRemoteCommandParams(connection, queryString, parameterCount,
|
||||||
parameterTypes, parameterValues,
|
parameterTypes, parameterValues,
|
||||||
binaryResults);
|
binaryResults);
|
||||||
|
|
||||||
elog(DEBUG1, "Query sent with parameters, result = %d", querySent);
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
/* If no parameters, send the query without params */
|
|
||||||
elog(DEBUG1, "Sending query without parameters");
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We only need to use SendRemoteCommandParams when we desire
|
* We only need to use SendRemoteCommandParams when we desire
|
||||||
* binaryResults. One downside of SendRemoteCommandParams is that it
|
* binaryResults. One downside of SendRemoteCommandParams is that it
|
||||||
|
|
|
||||||
|
|
@ -45,8 +45,6 @@ ExtractParametersFromParamList(ParamListInfo paramListInfo,
|
||||||
{
|
{
|
||||||
int parameterCount = paramListInfo->numParams;
|
int parameterCount = paramListInfo->numParams;
|
||||||
|
|
||||||
elog(DEBUG1, "Extracting %d parameters from ParamListInfo", parameterCount);
|
|
||||||
|
|
||||||
*parameterTypes = (Oid *) palloc0(parameterCount * sizeof(Oid));
|
*parameterTypes = (Oid *) palloc0(parameterCount * sizeof(Oid));
|
||||||
*parameterValues = (const char **) palloc0(parameterCount * sizeof(char *));
|
*parameterValues = (const char **) palloc0(parameterCount * sizeof(char *));
|
||||||
|
|
||||||
|
|
@ -57,48 +55,49 @@ ExtractParametersFromParamList(ParamListInfo paramListInfo,
|
||||||
Oid typeOutputFunctionId = InvalidOid;
|
Oid typeOutputFunctionId = InvalidOid;
|
||||||
bool variableLengthType = false;
|
bool variableLengthType = false;
|
||||||
|
|
||||||
/* Log parameter type */
|
|
||||||
elog(DEBUG1, "Processing parameter %d, type: %d", parameterIndex + 1, parameterData->ptype);
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Use 0 for data types where the oid values can be different on
|
* Use 0 for data types where the oid values can be different on
|
||||||
* the coordinator and worker nodes.
|
* the coordinator and worker nodes. Therefore, the worker nodes can
|
||||||
|
* infer the correct oid.
|
||||||
*/
|
*/
|
||||||
if (parameterData->ptype >= FirstNormalObjectId && !useOriginalCustomTypeOids)
|
if (parameterData->ptype >= FirstNormalObjectId && !useOriginalCustomTypeOids)
|
||||||
{
|
{
|
||||||
(*parameterTypes)[parameterIndex] = 0;
|
(*parameterTypes)[parameterIndex] = 0;
|
||||||
elog(DEBUG1, "Using default OID (0) for parameter %d", parameterIndex + 1);
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
(*parameterTypes)[parameterIndex] = parameterData->ptype;
|
(*parameterTypes)[parameterIndex] = parameterData->ptype;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Handle unreferenced parameter */
|
/*
|
||||||
|
* If the parameter is not referenced / used (ptype == 0) and
|
||||||
|
* would otherwise have errored out inside standard_planner()),
|
||||||
|
* don't pass a value to the remote side, and pass text oid to prevent
|
||||||
|
* undetermined data type errors on workers.
|
||||||
|
*/
|
||||||
if (parameterData->ptype == 0)
|
if (parameterData->ptype == 0)
|
||||||
{
|
{
|
||||||
(*parameterValues)[parameterIndex] = NULL;
|
(*parameterValues)[parameterIndex] = NULL;
|
||||||
(*parameterTypes)[parameterIndex] = TEXTOID;
|
(*parameterTypes)[parameterIndex] = TEXTOID;
|
||||||
|
|
||||||
elog(DEBUG1, "Parameter %d has ptype 0, setting TEXTOID", parameterIndex + 1);
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Handle NULL parameter */
|
/*
|
||||||
|
* If the parameter is NULL then we preserve its type, but
|
||||||
|
* don't need to evaluate its value.
|
||||||
|
*/
|
||||||
if (parameterData->isnull)
|
if (parameterData->isnull)
|
||||||
{
|
{
|
||||||
(*parameterValues)[parameterIndex] = NULL;
|
(*parameterValues)[parameterIndex] = NULL;
|
||||||
elog(DEBUG1, "Parameter %d is NULL", parameterIndex + 1);
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Log the type output function */
|
getTypeOutputInfo(parameterData->ptype, &typeOutputFunctionId,
|
||||||
getTypeOutputInfo(parameterData->ptype, &typeOutputFunctionId, &variableLengthType);
|
&variableLengthType);
|
||||||
elog(DEBUG1, "Type output function ID for parameter %d: %u", parameterIndex + 1, typeOutputFunctionId);
|
|
||||||
|
|
||||||
/* Log the parameter value */
|
(*parameterValues)[parameterIndex] = OidOutputFunctionCall(typeOutputFunctionId,
|
||||||
(*parameterValues)[parameterIndex] = OidOutputFunctionCall(typeOutputFunctionId, parameterData->value);
|
parameterData->value);
|
||||||
elog(DEBUG1, "Parameter %d value after output function call: %s", parameterIndex + 1, (*parameterValues)[parameterIndex]);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -108,7 +108,6 @@
|
||||||
#include "distributed/transaction_management.h"
|
#include "distributed/transaction_management.h"
|
||||||
#include "distributed/version_compat.h"
|
#include "distributed/version_compat.h"
|
||||||
#include "distributed/worker_protocol.h"
|
#include "distributed/worker_protocol.h"
|
||||||
#include "miscadmin.h" // for elog functions
|
|
||||||
|
|
||||||
/* controlled via a GUC */
|
/* controlled via a GUC */
|
||||||
bool EnableLocalExecution = true;
|
bool EnableLocalExecution = true;
|
||||||
|
|
@ -212,39 +211,6 @@ ExecuteLocalTaskListExtended(List *taskList,
|
||||||
TupleDestination *defaultTupleDest,
|
TupleDestination *defaultTupleDest,
|
||||||
bool isUtilityCommand)
|
bool isUtilityCommand)
|
||||||
{
|
{
|
||||||
/* Log the size of the task list */
|
|
||||||
elog(DEBUG1, "Executing local task list, number of tasks: %d", list_length(taskList));
|
|
||||||
|
|
||||||
/* Log the distributed plan type */
|
|
||||||
if (distributedPlan != NULL)
|
|
||||||
{
|
|
||||||
elog(DEBUG1, "Distributed plan type: %d", nodeTag(distributedPlan));
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
elog(DEBUG1, "Distributed plan is NULL");
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Log if the command is a utility command */
|
|
||||||
elog(DEBUG1, "Is Utility Command: %d", isUtilityCommand);
|
|
||||||
|
|
||||||
/* Log the parameters */
|
|
||||||
if (orig_paramListInfo != NULL)
|
|
||||||
{
|
|
||||||
elog(DEBUG1, "Original ParamListInfo has %d params", orig_paramListInfo->numParams);
|
|
||||||
|
|
||||||
/* Optionally log details of each parameter */
|
|
||||||
for (int i = 0; i < orig_paramListInfo->numParams; i++)
|
|
||||||
{
|
|
||||||
ParamExternData param = orig_paramListInfo->params[i];
|
|
||||||
elog(DEBUG1, "Param %d has type OID %d", i + 1, param.ptype);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
elog(DEBUG1, "Original ParamListInfo is NULL");
|
|
||||||
}
|
|
||||||
|
|
||||||
ParamListInfo paramListInfo = copyParamList(orig_paramListInfo);
|
ParamListInfo paramListInfo = copyParamList(orig_paramListInfo);
|
||||||
uint64 totalRowsProcessed = 0;
|
uint64 totalRowsProcessed = 0;
|
||||||
int numParams = 0;
|
int numParams = 0;
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue