From ecc67255ec9adf8efcbd4f3e3b8c6e4412b43cec Mon Sep 17 00:00:00 2001 From: Mehmet Yilmaz Date: Mon, 21 Jul 2025 07:39:03 +0000 Subject: [PATCH] restore --- src/backend/distributed/commands/call.c | 228 +--------------- .../distributed/executor/adaptive_executor.c | 255 ++++-------------- .../executor/executor_util_params.c | 35 ++- .../distributed/executor/local_executor.c | 34 --- 4 files changed, 78 insertions(+), 474 deletions(-) diff --git a/src/backend/distributed/commands/call.c b/src/backend/distributed/commands/call.c index ed63b3a5b..9e54513c6 100644 --- a/src/backend/distributed/commands/call.c +++ b/src/backend/distributed/commands/call.c @@ -48,7 +48,6 @@ #include "distributed/version_compat.h" #include "distributed/worker_log_messages.h" #include "distributed/worker_manager.h" -#include "miscadmin.h" // for elog functions /* global variable tracking whether we are in a delegated procedure call */ @@ -64,59 +63,6 @@ CallDistributedProcedureRemotely(CallStmt *callStmt, DestReceiver *dest) FuncExpr *funcExpr = callStmt->funcexpr; Oid functionId = funcExpr->funcid; - /* Log the function ID being called */ - elog(DEBUG1, "Calling distributed procedure with functionId: %u", functionId); - - /* Log additional details from CallStmt */ - if (callStmt->funcexpr != NULL) - { - elog(DEBUG1, "Function expression type: %d", nodeTag(callStmt->funcexpr)); - } - - if (funcExpr->args != NIL) - { - ListCell *lc; - int argIndex = 0; - foreach(lc, funcExpr->args) - { - Node *arg = (Node *) lfirst(lc); - elog(DEBUG1, "Argument %d: NodeTag: %d", argIndex, nodeTag(arg)); - argIndex++; - } - } - else - { - elog(DEBUG1, "No arguments in the function expression"); - } - - ListCell *argCell1; - int argIndex1 = 0; - - /* Iterate over the arguments and log them */ - foreach(argCell1, callStmt->funcexpr->args) - { - Node *argNode = (Node *) lfirst(argCell1); - - // Check if the node is valid - if (argNode != NULL) - { - // Use nodeToString() to convert the node into a string representation for debugging - char *argStr = nodeToString(argNode); - - // Log the argument index and its string representation - elog(DEBUG1, "Argument %d: %s", argIndex1, argStr); - - // Free the string memory after logging (it's a good practice to avoid memory leaks) - pfree(argStr); - } - else - { - elog(DEBUG1, "Argument %d: (null)", argIndex1); - } - argIndex1++; - } - - DistObjectCacheEntry *procedure = LookupDistObjectCacheEntry(ProcedureRelationId, functionId, 0); if (procedure == NULL || !procedure->isDistributed) @@ -221,7 +167,6 @@ CallDistributedProcedureRemotely(CallStmt *callStmt, DestReceiver *dest) appendStringInfo(callCommand, "CALL %s", pg_get_rule_expr((Node *) callStmt)); { - elog(DEBUG1, "Generated CALL statement: %s", callCommand->data); Tuplestorestate *tupleStore = tuplestore_begin_heap(true, false, work_mem); TupleDesc tupleDesc = CallStmtResultDesc(callStmt); TupleTableSlot *slot = MakeSingleTupleTableSlot(tupleDesc, @@ -250,186 +195,19 @@ CallDistributedProcedureRemotely(CallStmt *callStmt, DestReceiver *dest) }; EnableWorkerMessagePropagation(); - elog(DEBUG1, "Worker message propagation enabled"); bool localExecutionSupported = true; - elog(DEBUG1, "Local execution supported: %d", localExecutionSupported); - - /* Log task details */ - elog(DEBUG1, "Creating execution params for task"); - - /* Create execution parameters */ ExecutionParams *executionParams = CreateBasicExecutionParams( ROW_MODIFY_NONE, list_make1(task), MaxAdaptiveExecutorPoolSize, localExecutionSupported - ); - - const char* NodeTagToString(NodeTag tag) - { - switch (tag) - { - case T_Var: return "Var"; - case T_Const: return "Const"; - case T_Param: return "Param"; - case T_FuncExpr: return "FuncExpr"; - case T_OpExpr: return "OpExpr"; - case T_BoolExpr: return "BoolExpr"; - case T_Aggref: return "Aggref"; - case T_WindowFunc: return "WindowFunc"; - case T_SubLink: return "SubLink"; - case T_CoalesceExpr: return "CoalesceExpr"; - case T_CaseExpr: return "CaseExpr"; - case T_NullTest: return "NullTest"; - case T_CollateExpr: return "CollateExpr"; - case T_FieldSelect: return "FieldSelect"; - case T_FieldStore: return "FieldStore"; - case T_SubPlan: return "SubPlan"; - default: return "Unknown"; - } - } - - // Create a ParamListInfo structure - List *argList = funcExpr->args; // Extract arguments from the function expression - int paramCount = list_length(argList); // Get the number of arguments - ParamListInfo paramListInfo = makeParamList(paramCount); // Create ParamListInfo structure - - // Loop through the argument list and populate ParamListInfo - int paramIndex = 0; - ListCell *argCell; - foreach(argCell, argList) - { - Node *argNode = (Node *) lfirst(argCell); - - // Log the type of the argument - NodeTag nodeType = nodeTag(argNode); - elog(DEBUG1, "Processing argument at index %d of type: %s", paramIndex, NodeTagToString(nodeType)); - - - if (IsA(argNode, Const)) - { - Const *constArg = (Const *) argNode; - paramListInfo->params[paramIndex].ptype = constArg->consttype; // Set parameter type - paramListInfo->params[paramIndex].value = constArg->constvalue; // Set parameter value - paramListInfo->params[paramIndex].isnull = constArg->constisnull; // Set if the parameter is null - - // Log the constant parameter's type, value, and null status - elog(DEBUG1, "Populating ParamListInfo with constant parameter: paramIndex: %d, paramType: %d, isNull: %s", - paramIndex, paramListInfo->params[paramIndex].ptype, - constArg->constisnull ? "true" : "false"); - } - else if (IsA(argNode, Param)) - { - Param *paramArg = (Param *) argNode; - - // Set the parameter type - paramListInfo->params[paramIndex].ptype = paramArg->paramtype; - - // Fetch the value of the parameter if necessary - if (paramListInfo->paramFetch != NULL) - { - ParamExternData paramData; - paramListInfo->paramFetch(paramListInfo, paramArg->paramid, true, ¶mData); - - // Log the fetched parameter details - elog(DEBUG1, "paramFetch for paramId: %d returned value: %d, type: %d, isNull: %s", - paramArg->paramid, DatumGetInt32(paramData.value), paramData.ptype, - paramData.isnull ? "true" : "false"); - - paramListInfo->params[paramIndex].value = paramData.value; - paramListInfo->params[paramIndex].isnull = paramData.isnull; - - // Log fetched value and type - elog(DEBUG1, "Fetched dynamic parameter: paramIndex: %d, paramType: %d, paramValue: %d", - paramIndex, paramListInfo->params[paramIndex].ptype, DatumGetInt32(paramListInfo->params[paramIndex].value)); - } - else - { - // Handle the case where paramFetch is NULL - elog(DEBUG1, "Could not fetch value for parameter: %d", paramArg->paramid); - } - } - else if (IsA(argNode, FuncExpr)) - { - // FuncExpr *funcExpr = (FuncExpr *) argNode; - - // Log function expression details - elog(DEBUG1, "Processing function expression: funcid: %d", funcExpr->funcid); - - // Iterate through the arguments of the function expression - ListCell *funcArgCell; - foreach(funcArgCell, funcExpr->args) - { - Node *funcArgNode = (Node *) lfirst(funcArgCell); - - // Check if the argument is a Param or Const - if (IsA(funcArgNode, Param)) - { - Param *paramArg = (Param *) funcArgNode; - - // Fetch the parameter value (same as your param-fetch logic) - ParamExternData paramData; - paramListInfo->paramFetch(paramListInfo, paramArg->paramid, true, ¶mData); - - // Populate ParamListInfo with fetched param - paramListInfo->params[paramIndex].ptype = paramArg->paramtype; - paramListInfo->params[paramIndex].value = paramData.value; - paramListInfo->params[paramIndex].isnull = paramData.isnull; - - // Log fetched parameter details - elog(DEBUG1, "Populating ParamListInfo with fetched parameter: paramIndex: %d, paramType: %d, paramValue: %d", - paramIndex, paramListInfo->params[paramIndex].ptype, DatumGetInt32(paramListInfo->params[paramIndex].value)); - } - else if (IsA(funcArgNode, Const)) - { - Const *constArg = (Const *) funcArgNode; - - // Handle Const values within the function expression - paramListInfo->params[paramIndex].ptype = constArg->consttype; - paramListInfo->params[paramIndex].value = constArg->constvalue; - paramListInfo->params[paramIndex].isnull = constArg->constisnull; - - // Log constant parameter - elog(DEBUG1, "Populating ParamListInfo with constant parameter inside function expression: paramIndex: %d, paramType: %d", - paramIndex, paramListInfo->params[paramIndex].ptype); - } - else - { - elog(DEBUG1, "Unsupported argument type in function expression at paramIndex: %d", paramIndex); - } - } - } - else - { - // Handle other cases if necessary - elog(DEBUG1, "Unsupported argument type at paramIndex: %d", paramIndex); - } - - // Log populated parameters - // elog(DEBUG1, "Populating ParamListInfo, paramIndex: %d, paramType: %d, paramValue: %d", - // paramIndex, paramListInfo->params[paramIndex].ptype, DatumGetInt32(paramListInfo->params[paramIndex].value)); - - paramIndex++; - } - - - - /* Set tuple destination and execution properties */ - executionParams->tupleDestination = CreateTupleStoreTupleDest(tupleStore, tupleDesc); + ); + executionParams->tupleDestination = CreateTupleStoreTupleDest(tupleStore, + tupleDesc); executionParams->expectResults = expectResults; executionParams->xactProperties = xactProperties; executionParams->isUtilityCommand = true; - executionParams->paramListInfo = paramListInfo; - - /* Log before executing task list */ - elog(DEBUG1, "Executing task list with ExecuteTaskListExtended"); - - /* Execute the task list */ ExecuteTaskListExtended(executionParams); - /* Log after task execution */ - elog(DEBUG1, "Task list execution completed"); - - DisableWorkerMessagePropagation(); while (tuplestore_gettupleslot(tupleStore, true, false, slot)) diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 3c4a68125..895f01ae7 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -769,51 +769,16 @@ AdaptiveExecutorPreExecutorRun(CitusScanState *scanState) /* * AdaptiveExecutor is called via CitusExecScan on the * first call of CitusExecScan. The function fills the tupleStore - * of the input scanState. + * of the input scanScate. */ TupleTableSlot * AdaptiveExecutor(CitusScanState *scanState) { - /* Log entry into the function */ - elog(DEBUG1, "Entering AdaptiveExecutor"); - TupleTableSlot *resultSlot = NULL; DistributedPlan *distributedPlan = scanState->distributedPlan; EState *executorState = ScanStateGetExecutorState(scanState); ParamListInfo paramListInfo = executorState->es_param_list_info; - - /* Log details about the distributed plan */ - if (distributedPlan != NULL) - { - elog(DEBUG1, "Distributed plan modLevel: %d", distributedPlan->modLevel); - } - else - { - elog(DEBUG1, "Distributed plan is NULL"); - } - - /* Log details about paramListInfo */ - if (paramListInfo != NULL) - { - elog(DEBUG1, "paramListInfo is populated with %d parameters", paramListInfo->numParams); - - /* Log each parameter */ - for (int i = 0; i < paramListInfo->numParams; i++) - { - ParamExternData *param = ¶mListInfo->params[i]; - elog(DEBUG1, "Parameter %d: ptype = %d, isnull = %d", - i + 1, param->ptype, param->isnull); - if (!param->isnull) - { - elog(DEBUG1, "Parameter %d: value = %ld", i + 1, param->value); - } - } - } - else - { - elog(DEBUG1, "paramListInfo is NULL"); - } bool randomAccess = true; bool interTransactions = false; int targetPoolSize = MaxAdaptiveExecutorPoolSize; @@ -881,53 +846,16 @@ AdaptiveExecutor(CitusScanState *scanState) distributedPlan->modLevel, taskList, excludeFromXact); /* - * In some rare cases, we have prepared statements that pass a parameter - * and never used in the query, mark such parameters' type as Invalid(0), - * which will be used later in ExtractParametersFromParamList() to map them - * to a generic datatype. Skip for dynamic parameters. - */ + * In some rare cases, we have prepared statements that pass a parameter + * and never used in the query, mark such parameters' type as Invalid(0), + * which will be used later in ExtractParametersFromParamList() to map them + * to a generic datatype. Skip for dynamic parameters. + */ if (paramListInfo && !paramListInfo->paramFetch) { - /* Copy the ParamListInfo for safety */ paramListInfo = copyParamList(paramListInfo); - - /* Mark any unreferenced parameters */ MarkUnreferencedExternParams((Node *) job->jobQuery, paramListInfo); - - /* Debug: Log the parameters before proceeding */ - elog(DEBUG1, "paramListInfo: number of parameters = %d", paramListInfo->numParams); - - /* Log details of each parameter */ - for (int i = 0; i < paramListInfo->numParams; i++) - { - ParamExternData *param = ¶mListInfo->params[i]; - elog(DEBUG1, "Parameter %d: ptype = %d, isnull = %d", - i + 1, param->ptype, param->isnull); - if (!param->isnull) - { - elog(DEBUG1, "Parameter %d: value = %ld", i + 1, param->value); - } - } } - else - { - /* If paramListInfo is NULL or paramFetch is true, log that information */ - if (paramListInfo == NULL) - { - elog(DEBUG1, "paramListInfo is NULL"); - } - else if (paramListInfo->paramFetch) - { - elog(DEBUG1, "paramListInfo uses dynamic parameter fetching"); - } - } - - /* Log the parameters before creating the distributed execution */ - elog(DEBUG1, "Creating DistributedExecution with modLevel: %d, taskList size: %d", - distributedPlan->modLevel, list_length(taskList)); - - elog(DEBUG1, "Target pool size: %d, Local execution supported: %d", - targetPoolSize, localExecutionSupported); DistributedExecution *execution = CreateDistributedExecution( distributedPlan->modLevel, @@ -939,9 +867,6 @@ AdaptiveExecutor(CitusScanState *scanState) jobIdList, localExecutionSupported); - - - /* * Make sure that we acquire the appropriate locks even if the local tasks * are going to be executed with local execution. @@ -992,13 +917,6 @@ AdaptiveExecutor(CitusScanState *scanState) static void RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution) { - /* Log entry into the function */ - elog(DEBUG1, "Entering RunLocalExecution"); - - /* Log scan state and execution details */ - elog(DEBUG1, "ScanState: distributedPlan type: %d", nodeTag(scanState->distributedPlan)); - elog(DEBUG1, "Local task list length: %d", list_length(execution->localTaskList)); - EState *estate = ScanStateGetExecutorState(scanState); bool isUtilityCommand = false; uint64 rowsProcessed = ExecuteLocalTaskListExtended(execution->localTaskList, @@ -1167,83 +1085,52 @@ ExecuteTaskListIntoTupleDest(RowModifyLevel modLevel, List *taskList, uint64 ExecuteTaskListExtended(ExecutionParams *executionParams) { - /* Log task list length */ - elog(DEBUG1, "Starting ExecuteTaskListExtended, number of tasks: %d", list_length(executionParams->taskList)); - /* if there are no tasks to execute, we can return early */ if (list_length(executionParams->taskList) == 0) { - elog(DEBUG1, "No tasks to execute, returning early"); return 0; } uint64 locallyProcessedRows = 0; - TupleDestination *defaultTupleDest = executionParams->tupleDestination; - /* Log connection type */ - elog(DEBUG1, "MultiShardConnectionType: %d", MultiShardConnectionType); + TupleDestination *defaultTupleDest = executionParams->tupleDestination; if (MultiShardConnectionType == SEQUENTIAL_CONNECTION) { - elog(DEBUG1, "Switching to sequential connection mode"); executionParams->targetPoolSize = 1; } - /* Log before creating distributed execution */ - elog(DEBUG1, "Creating distributed execution for task list"); + DistributedExecution *execution = + CreateDistributedExecution( + executionParams->modLevel, executionParams->taskList, + executionParams->paramListInfo, executionParams->targetPoolSize, + defaultTupleDest, &executionParams->xactProperties, + executionParams->jobIdList, executionParams->localExecutionSupported); - DistributedExecution *execution = CreateDistributedExecution( - executionParams->modLevel, executionParams->taskList, - executionParams->paramListInfo, executionParams->targetPoolSize, - defaultTupleDest, &executionParams->xactProperties, - executionParams->jobIdList, executionParams->localExecutionSupported); - - /* Log details of created execution */ - elog(DEBUG1, "DistributedExecution created: %d tasks, local execution supported: %d", - list_length(execution->remoteTaskList), execution->localExecutionSupported); - - /* Ensure local execution state is compatible */ + /* + * If current transaction accessed local placements and task list includes + * tasks that should be executed locally (accessing any of the local placements), + * then we should error out as it would cause inconsistencies across the + * remote connection and local execution. + */ EnsureCompatibleLocalExecutionState(execution->remoteTaskList); - /* Log before running distributed execution */ - elog(DEBUG1, "Starting distributed execution"); - - /* Start the distributed execution */ + /* run the remote execution */ StartDistributedExecution(execution); - - /* Log after StartDistributedExecution */ - elog(DEBUG1, "StartDistributedExecution completed"); - - /* Run the distributed execution */ RunDistributedExecution(execution); - - /* Log after RunDistributedExecution */ - elog(DEBUG1, "RunDistributedExecution completed"); - - /* Finish the distributed execution */ FinishDistributedExecution(execution); - /* Log after FinishDistributedExecution */ - elog(DEBUG1, "FinishDistributedExecution completed"); - - /* Log that the entire distributed execution process is complete */ -elog(DEBUG1, "Distributed execution completed"); - - - /* Log before running local execution */ + /* now, switch back to the local execution */ if (executionParams->isUtilityCommand) { - elog(DEBUG1, "Running local utility task list"); locallyProcessedRows += ExecuteLocalUtilityTaskList(execution->localTaskList); } else { - elog(DEBUG1, "Running local task list"); - locallyProcessedRows += ExecuteLocalTaskList(execution->localTaskList, defaultTupleDest); + locallyProcessedRows += ExecuteLocalTaskList(execution->localTaskList, + defaultTupleDest); } - elog(DEBUG1, "Task execution completed, total rows processed: %lu", execution->rowsProcessed + locallyProcessedRows); - return execution->rowsProcessed + locallyProcessedRows; } @@ -2010,17 +1897,10 @@ SequentialRunDistributedExecution(DistributedExecution *execution) void RunDistributedExecution(DistributedExecution *execution) { - elog(DEBUG1, "Starting RunDistributedExecution with %d unfinished tasks", execution->unfinishedTaskCount); - - /* Assign tasks to connections or worker pool */ AssignTasksToConnectionsOrWorkerPool(execution); - elog(DEBUG1, "Assigned tasks to connections or worker pool"); PG_TRY(); { - /* Log before stepping state machines */ - elog(DEBUG1, "Stepping state machines for all sessions"); - /* Preemptively step state machines in case of immediate errors */ WorkerSession *session = NULL; foreach_declared_ptr(session, execution->sessionList) @@ -2033,9 +1913,23 @@ RunDistributedExecution(DistributedExecution *execution) /* always (re)build the wait event set the first time */ execution->rebuildWaitEventSet = true; - elog(DEBUG1, "Entering task/event loop with unfinishedTaskCount: %d", execution->unfinishedTaskCount); - - /* Iterate until all the tasks are finished */ + /* + * Iterate until all the tasks are finished. Once all the tasks + * are finished, ensure that all the connection initializations + * are also finished. Otherwise, those connections are terminated + * abruptly before they are established (or failed). Instead, we let + * the ConnectionStateMachine() to properly handle them. + * + * Note that we could have the connections that are not established + * as a side effect of slow-start algorithm. At the time the algorithm + * decides to establish new connections, the execution might have tasks + * to finish. But, the execution might finish before the new connections + * are established. + * + * Note that the rules explained above could be overriden by any + * cancellation to the query. In that case, we terminate the execution + * irrespective of the current status of the tasks or the connections. + */ while (!cancellationReceived && (execution->unfinishedTaskCount > 0 || HasIncompleteConnectionEstablishment(execution))) @@ -2049,13 +1943,14 @@ RunDistributedExecution(DistributedExecution *execution) bool skipWaitEvents = false; if (execution->remoteTaskList == NIL) { - /* Log when all tasks have failed over to local execution */ - elog(DEBUG1, "All tasks failed over to local execution"); + /* + * All the tasks are failed over to the local execution, no need + * to wait for any connection activity. + */ continue; } else if (execution->rebuildWaitEventSet) { - elog(DEBUG1, "Rebuilding wait event set"); RebuildWaitEventSet(execution); skipWaitEvents = @@ -2063,7 +1958,6 @@ RunDistributedExecution(DistributedExecution *execution) } else if (execution->waitFlagsChanged) { - elog(DEBUG1, "Rebuilding wait event set flags"); RebuildWaitEventSetFlags(execution->waitEventSet, execution->sessionList); execution->waitFlagsChanged = false; @@ -2073,35 +1967,35 @@ RunDistributedExecution(DistributedExecution *execution) if (skipWaitEvents) { - elog(DEBUG1, "Skipping wait events due to failure, retrying"); + /* + * Some operation on the wait event set is failed, retry + * as we already removed the problematic connections. + */ execution->rebuildWaitEventSet = true; continue; } - /* Log before waiting for I/O events */ + /* wait for I/O events */ long timeout = NextEventTimeout(execution); - int eventCount = WaitEventSetWait(execution->waitEventSet, timeout, execution->events, execution->eventSetSize, WAIT_EVENT_CLIENT_READ); - ProcessWaitEvents(execution, execution->events, eventCount, &cancellationReceived); } - elog(DEBUG1, "Finished task/event loop, cleaning up"); - - /* Clean up after distributed execution */ FreeExecutionWaitEvents(execution); + CleanUpSessions(execution); } PG_CATCH(); { - /* Log in case of errors */ - elog(DEBUG1, "Error occurred, unclaiming all session connections"); - + /* + * We can still recover from error using ROLLBACK TO SAVEPOINT, + * unclaim all connections to allow that. + */ UnclaimAllSessionConnections(execution->sessionList); FreeExecutionWaitEvents(execution); @@ -2112,7 +2006,6 @@ RunDistributedExecution(DistributedExecution *execution) } - /* * ProcessSessionsWithFailedWaitEventSetOperations goes over the session list * and processes sessions with failed wait event set operations. @@ -4011,6 +3904,10 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution, } +/* + * SendNextQuery sends the next query for placementExecution on the given + * session. + */ static bool SendNextQuery(TaskPlacementExecution *placementExecution, WorkerSession *session) @@ -4026,62 +3923,26 @@ SendNextQuery(TaskPlacementExecution *placementExecution, int querySent = 0; uint32 queryIndex = placementExecution->queryIndex; - elog(DEBUG1, "Sending next query: queryIndex = %d, task->queryCount = %d", queryIndex, task->queryCount); - Assert(queryIndex < task->queryCount); char *queryString = TaskQueryStringAtIndex(task, queryIndex); - elog(DEBUG1, "Query to be sent: %s", queryString); - - if (paramListInfo != NULL && !task->parametersInQueryStringResolved) - { - elog(DEBUG1, "ParamListInfo is not null and parameters not resolved"); - } - else if (paramListInfo == NULL) - { - elog(DEBUG1, "ParamListInfo is NULL"); - } - else - { - elog(DEBUG1, "Task parameters are already resolved"); - } - - if (paramListInfo != NULL && !task->parametersInQueryStringResolved) { int parameterCount = paramListInfo->numParams; Oid *parameterTypes = NULL; const char **parameterValues = NULL; - elog(DEBUG1, "ParamListInfo is not null, parameterCount: %d", parameterCount); - /* force evaluation of bound params */ paramListInfo = copyParamList(paramListInfo); - /* Log the start of parameter extraction */ - elog(DEBUG1, "Extracting parameters for remote execution"); - ExtractParametersForRemoteExecution(paramListInfo, ¶meterTypes, ¶meterValues); - - /* Log extracted parameter values and types */ - for (int i = 0; i < parameterCount; i++) - { - elog(DEBUG1, "Parameter %d: Type = %d, Value = %s", i + 1, parameterTypes[i], parameterValues[i]); - } - - /* Send the remote command with parameters */ querySent = SendRemoteCommandParams(connection, queryString, parameterCount, parameterTypes, parameterValues, binaryResults); - - elog(DEBUG1, "Query sent with parameters, result = %d", querySent); } else { - /* If no parameters, send the query without params */ - elog(DEBUG1, "Sending query without parameters"); - /* * We only need to use SendRemoteCommandParams when we desire * binaryResults. One downside of SendRemoteCommandParams is that it diff --git a/src/backend/distributed/executor/executor_util_params.c b/src/backend/distributed/executor/executor_util_params.c index 2c3c0cb53..975654f22 100644 --- a/src/backend/distributed/executor/executor_util_params.c +++ b/src/backend/distributed/executor/executor_util_params.c @@ -45,8 +45,6 @@ ExtractParametersFromParamList(ParamListInfo paramListInfo, { int parameterCount = paramListInfo->numParams; - elog(DEBUG1, "Extracting %d parameters from ParamListInfo", parameterCount); - *parameterTypes = (Oid *) palloc0(parameterCount * sizeof(Oid)); *parameterValues = (const char **) palloc0(parameterCount * sizeof(char *)); @@ -57,48 +55,49 @@ ExtractParametersFromParamList(ParamListInfo paramListInfo, Oid typeOutputFunctionId = InvalidOid; bool variableLengthType = false; - /* Log parameter type */ - elog(DEBUG1, "Processing parameter %d, type: %d", parameterIndex + 1, parameterData->ptype); - /* * Use 0 for data types where the oid values can be different on - * the coordinator and worker nodes. + * the coordinator and worker nodes. Therefore, the worker nodes can + * infer the correct oid. */ if (parameterData->ptype >= FirstNormalObjectId && !useOriginalCustomTypeOids) { (*parameterTypes)[parameterIndex] = 0; - elog(DEBUG1, "Using default OID (0) for parameter %d", parameterIndex + 1); } else { (*parameterTypes)[parameterIndex] = parameterData->ptype; } - /* Handle unreferenced parameter */ + /* + * If the parameter is not referenced / used (ptype == 0) and + * would otherwise have errored out inside standard_planner()), + * don't pass a value to the remote side, and pass text oid to prevent + * undetermined data type errors on workers. + */ if (parameterData->ptype == 0) { (*parameterValues)[parameterIndex] = NULL; (*parameterTypes)[parameterIndex] = TEXTOID; - elog(DEBUG1, "Parameter %d has ptype 0, setting TEXTOID", parameterIndex + 1); continue; } - /* Handle NULL parameter */ + /* + * If the parameter is NULL then we preserve its type, but + * don't need to evaluate its value. + */ if (parameterData->isnull) { (*parameterValues)[parameterIndex] = NULL; - elog(DEBUG1, "Parameter %d is NULL", parameterIndex + 1); + continue; } - /* Log the type output function */ - getTypeOutputInfo(parameterData->ptype, &typeOutputFunctionId, &variableLengthType); - elog(DEBUG1, "Type output function ID for parameter %d: %u", parameterIndex + 1, typeOutputFunctionId); + getTypeOutputInfo(parameterData->ptype, &typeOutputFunctionId, + &variableLengthType); - /* Log the parameter value */ - (*parameterValues)[parameterIndex] = OidOutputFunctionCall(typeOutputFunctionId, parameterData->value); - elog(DEBUG1, "Parameter %d value after output function call: %s", parameterIndex + 1, (*parameterValues)[parameterIndex]); + (*parameterValues)[parameterIndex] = OidOutputFunctionCall(typeOutputFunctionId, + parameterData->value); } } - diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index 31178d163..2ced0a43f 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -108,7 +108,6 @@ #include "distributed/transaction_management.h" #include "distributed/version_compat.h" #include "distributed/worker_protocol.h" -#include "miscadmin.h" // for elog functions /* controlled via a GUC */ bool EnableLocalExecution = true; @@ -212,39 +211,6 @@ ExecuteLocalTaskListExtended(List *taskList, TupleDestination *defaultTupleDest, bool isUtilityCommand) { - /* Log the size of the task list */ - elog(DEBUG1, "Executing local task list, number of tasks: %d", list_length(taskList)); - - /* Log the distributed plan type */ - if (distributedPlan != NULL) - { - elog(DEBUG1, "Distributed plan type: %d", nodeTag(distributedPlan)); - } - else - { - elog(DEBUG1, "Distributed plan is NULL"); - } - - /* Log if the command is a utility command */ - elog(DEBUG1, "Is Utility Command: %d", isUtilityCommand); - - /* Log the parameters */ - if (orig_paramListInfo != NULL) - { - elog(DEBUG1, "Original ParamListInfo has %d params", orig_paramListInfo->numParams); - - /* Optionally log details of each parameter */ - for (int i = 0; i < orig_paramListInfo->numParams; i++) - { - ParamExternData param = orig_paramListInfo->params[i]; - elog(DEBUG1, "Param %d has type OID %d", i + 1, param.ptype); - } - } - else - { - elog(DEBUG1, "Original ParamListInfo is NULL"); - } - ParamListInfo paramListInfo = copyParamList(orig_paramListInfo); uint64 totalRowsProcessed = 0; int numParams = 0;