From 31003204c6d5a449a69983dd4b1e11f03f563a97 Mon Sep 17 00:00:00 2001
From: Mehmet YILMAZ
Date: Mon, 9 Sep 2024 07:34:46 +0000
Subject: [PATCH] debug lines added

---
 citus-tools                                   |   1 +
 src/backend/distributed/commands/call.c       | 108 +++++++-
 .../distributed/commands/utility_hook.c       |   7 +
 .../distributed/executor/adaptive_executor.c  | 255 ++++++++++++++----
 .../executor/executor_util_params.c           |  35 +--
 .../distributed/executor/local_executor.c     |  34 +++
 .../planner/function_call_delegation.c        |  21 +-
 src/test/regress/citus_tests/common.py        |  15 ++
 .../test/test_prepared_statements.py          |  18 +-
 9 files changed, 400 insertions(+), 94 deletions(-)
 create mode 160000 citus-tools

diff --git a/citus-tools b/citus-tools
new file mode 160000
index 000000000..3376bd684
--- /dev/null
+++ b/citus-tools
@@ -0,0 +1 @@
+Subproject commit 3376bd6845f0614908ed304f5033bd644c82d3bf
diff --git a/src/backend/distributed/commands/call.c b/src/backend/distributed/commands/call.c
index 9e54513c6..45dd059fc 100644
--- a/src/backend/distributed/commands/call.c
+++ b/src/backend/distributed/commands/call.c
@@ -48,6 +48,7 @@
 #include "distributed/version_compat.h"
 #include "distributed/worker_log_messages.h"
 #include "distributed/worker_manager.h"
+#include "miscadmin.h"
 
 
 /* global variable tracking whether we are in a delegated procedure call */
@@ -63,6 +64,32 @@ CallDistributedProcedureRemotely(CallStmt *callStmt, DestReceiver *dest)
 	FuncExpr *funcExpr = callStmt->funcexpr;
 	Oid functionId = funcExpr->funcid;
 
+	/* Log the function ID being called */
+	elog(DEBUG1, "Calling distributed procedure with functionId: %u", functionId);
+
+	/* Log additional details from CallStmt */
+	if (callStmt->funcexpr != NULL)
+	{
+		elog(DEBUG1, "Function expression type: %d", nodeTag(callStmt->funcexpr));
+	}
+
+	if (funcExpr->args != NIL)
+	{
+		ListCell *lc;
+		int argIndex = 0;
+		foreach(lc, funcExpr->args)
+		{
+			Node *arg = (Node *) lfirst(lc);
+			elog(DEBUG1, "Argument %d: NodeTag: %d", argIndex, nodeTag(arg));
+			argIndex++;
+		}
+	}
+	else
+	{
+		elog(DEBUG1, "No arguments in the function expression");
+	}
+
+
 	DistObjectCacheEntry *procedure = LookupDistObjectCacheEntry(ProcedureRelationId,
 																  functionId, 0);
 	if (procedure == NULL || !procedure->isDistributed)
@@ -167,6 +194,7 @@ CallDistributedProcedureRemotely(CallStmt *callStmt, DestReceiver *dest)
 	appendStringInfo(callCommand, "CALL %s",
 					 pg_get_rule_expr((Node *) callStmt));
 	{
+		elog(DEBUG1, "Generated CALL statement: %s", callCommand->data);
 		Tuplestorestate *tupleStore = tuplestore_begin_heap(true, false, work_mem);
 		TupleDesc tupleDesc = CallStmtResultDesc(callStmt);
 		TupleTableSlot *slot = MakeSingleTupleTableSlot(tupleDesc,
@@ -195,19 +223,93 @@ CallDistributedProcedureRemotely(CallStmt *callStmt, DestReceiver *dest)
 		};
 		EnableWorkerMessagePropagation();
+		elog(DEBUG1, "Worker message propagation enabled");
 
 		bool localExecutionSupported = true;
+		elog(DEBUG1, "Local execution supported: %d", localExecutionSupported);
+
+		/* Log task details */
+		elog(DEBUG1, "Creating execution params for task");
+
+		/* Create execution parameters */
 		ExecutionParams *executionParams = CreateBasicExecutionParams(
 			ROW_MODIFY_NONE, list_make1(task), MaxAdaptiveExecutorPoolSize,
 			localExecutionSupported
-			);
-		executionParams->tupleDestination = CreateTupleStoreTupleDest(tupleStore,
-																	  tupleDesc);
+		);
+
+
+		// Create a ParamListInfo structure
+		List *argList = funcExpr->args; // Extract arguments from the function expression
+		int paramCount = list_length(argList); // Get the number of arguments
+		ParamListInfo paramListInfo = makeParamList(paramCount); // Create ParamListInfo structure
+
+		// Loop through the argument list and populate ParamListInfo
+		int paramIndex = 0;
+		ListCell *argCell;
+		foreach(argCell, argList)
+		{
+			Node *argNode = (Node *) lfirst(argCell);
+
+			if (IsA(argNode, Const))
+			{
+				Const *constArg = (Const *) argNode;
+				paramListInfo->params[paramIndex].ptype = constArg->consttype; // Set parameter type
+				paramListInfo->params[paramIndex].value = constArg->constvalue; // Set parameter value
+				paramListInfo->params[paramIndex].isnull = constArg->constisnull; // Set if the parameter is null
+			}
+			else if (IsA(argNode, Param))
+			{
+				Param *paramArg = (Param *) argNode;
+
+				// Set the parameter type
+				paramListInfo->params[paramIndex].ptype = paramArg->paramtype;
+
+				// Fetch the value of the parameter if necessary
+				if (paramListInfo->paramFetch != NULL)
+				{
+					ParamExternData paramData;
+					ParamExternData *fetchedParam = paramListInfo->paramFetch(paramListInfo, paramArg->paramid, true, &paramData);
+
+					paramListInfo->params[paramIndex].value = fetchedParam->value;
+					paramListInfo->params[paramIndex].isnull = fetchedParam->isnull;
+
+					// Log fetched value and type
+					elog(DEBUG1, "Fetched dynamic parameter: paramIndex: %d, paramType: %d, paramValue: %d",
+						 paramIndex, paramListInfo->params[paramIndex].ptype, DatumGetInt32(paramListInfo->params[paramIndex].value));
+				}
+				else
+				{
+					// Handle the case where paramFetch is NULL
+					elog(ERROR, "could not fetch value for parameter: %d", paramArg->paramid);
+				}
+			}
+
+			// Log populated parameters
+			elog(DEBUG1, "Populating ParamListInfo, paramIndex: %d, paramType: %d, paramValue: %d",
+				 paramIndex, paramListInfo->params[paramIndex].ptype, DatumGetInt32(paramListInfo->params[paramIndex].value));
+
+			paramIndex++;
+		}
+
+
+
+		/* Set tuple destination and execution properties */
+		executionParams->tupleDestination = CreateTupleStoreTupleDest(tupleStore, tupleDesc);
 		executionParams->expectResults = expectResults;
 		executionParams->xactProperties = xactProperties;
 		executionParams->isUtilityCommand = true;
+		executionParams->paramListInfo = paramListInfo;
+
+		/* Log before executing task list */
+		elog(DEBUG1, "Executing task list with ExecuteTaskListExtended");
+
+		/* Execute the task list */
 		ExecuteTaskListExtended(executionParams);
+		/* Log after task execution */
+		elog(DEBUG1, "Task list execution completed");
+
+
 		DisableWorkerMessagePropagation();
 
 		while (tuplestore_gettupleslot(tupleStore, true, false, slot))
diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c
index 9426e13c0..d898d4ece 100644
--- a/src/backend/distributed/commands/utility_hook.c
+++ b/src/backend/distributed/commands/utility_hook.c
@@ -273,6 +273,13 @@ citus_ProcessUtility(PlannedStmt *pstmt,
 	{
 		CallStmt *callStmt = (CallStmt *) parsetree;
 
+		/* Log the entire parsetree as a string */
+		elog(DEBUG1, "Parse Tree: %s", nodeToString(parsetree));
+
+		/* Log other information as before */
+		elog(DEBUG1, "Processing CallStmt for procedure");
+		elog(DEBUG1, "Procedure context: %d", context);
+
 		/*
 		 * If the procedure is distributed and we are using MX then we have the
 		 * possibility of calling it on the worker. If the data is located on
diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c
index e912f418d..642f894ef 100644
--- a/src/backend/distributed/executor/adaptive_executor.c
+++ b/src/backend/distributed/executor/adaptive_executor.c
@@ -770,16 +770,51 @@ AdaptiveExecutorPreExecutorRun(CitusScanState *scanState)
 /*
  * AdaptiveExecutor is called via CitusExecScan on the
  * first call of CitusExecScan. The function fills the tupleStore
- * of the input scanScate.
+ * of the input scanState.
  */
 TupleTableSlot *
 AdaptiveExecutor(CitusScanState *scanState)
 {
+	/* Log entry into the function */
+	elog(DEBUG1, "Entering AdaptiveExecutor");
+
 	TupleTableSlot *resultSlot = NULL;
 
 	DistributedPlan *distributedPlan = scanState->distributedPlan;
 	EState *executorState = ScanStateGetExecutorState(scanState);
 	ParamListInfo paramListInfo = executorState->es_param_list_info;
+
+	/* Log details about the distributed plan */
+	if (distributedPlan != NULL)
+	{
+		elog(DEBUG1, "Distributed plan modLevel: %d", distributedPlan->modLevel);
+	}
+	else
+	{
+		elog(DEBUG1, "Distributed plan is NULL");
+	}
+
+	/* Log details about paramListInfo */
+	if (paramListInfo != NULL)
+	{
+		elog(DEBUG1, "paramListInfo is populated with %d parameters", paramListInfo->numParams);
+
+		/* Log each parameter */
+		for (int i = 0; i < paramListInfo->numParams; i++)
+		{
+			ParamExternData *param = &paramListInfo->params[i];
+			elog(DEBUG1, "Parameter %d: ptype = %d, isnull = %d",
+				 i + 1, param->ptype, param->isnull);
+			if (!param->isnull)
+			{
+				elog(DEBUG1, "Parameter %d: value = %ld", i + 1, param->value);
+			}
+		}
+	}
+	else
+	{
+		elog(DEBUG1, "paramListInfo is NULL");
+	}
 	bool randomAccess = true;
 	bool interTransactions = false;
 	int targetPoolSize = MaxAdaptiveExecutorPoolSize;
@@ -847,16 +882,53 @@ AdaptiveExecutor(CitusScanState *scanState)
 		distributedPlan->modLevel, taskList, excludeFromXact);
 
 	/*
-	 * In some rare cases, we have prepared statements that pass a parameter
-	 * and never used in the query, mark such parameters' type as Invalid(0),
-	 * which will be used later in ExtractParametersFromParamList() to map them
-	 * to a generic datatype. Skip for dynamic parameters.
-	 */
+	 * In some rare cases, prepared statements pass a parameter that is never
+	 * used in the query. Mark such parameters' type as Invalid (0), which
+	 * ExtractParametersFromParamList() later maps to a generic data type.
+	 * Skip this for dynamic parameters.
+	 */
 	if (paramListInfo && !paramListInfo->paramFetch)
 	{
+		/* Copy the ParamListInfo for safety */
 		paramListInfo = copyParamList(paramListInfo);
+
+		/* Mark any unreferenced parameters */
 		MarkUnreferencedExternParams((Node *) job->jobQuery, paramListInfo);
+
+		/* Debug: Log the parameters before proceeding */
+		elog(DEBUG1, "paramListInfo: number of parameters = %d", paramListInfo->numParams);
+
+		/* Log details of each parameter */
+		for (int i = 0; i < paramListInfo->numParams; i++)
+		{
+			ParamExternData *param = &paramListInfo->params[i];
+			elog(DEBUG1, "Parameter %d: ptype = %d, isnull = %d",
+				 i + 1, param->ptype, param->isnull);
+			if (!param->isnull)
+			{
+				elog(DEBUG1, "Parameter %d: value = %ld", i + 1, param->value);
+			}
+		}
 	}
+	else
+	{
+		/* If paramListInfo is NULL or paramFetch is true, log that information */
+		if (paramListInfo == NULL)
+		{
+			elog(DEBUG1, "paramListInfo is NULL");
+		}
+		else if (paramListInfo->paramFetch)
+		{
+			elog(DEBUG1, "paramListInfo uses dynamic parameter fetching");
+		}
+	}
+
+	/* Log the parameters before creating the distributed execution */
+	elog(DEBUG1, "Creating DistributedExecution with modLevel: %d, taskList size: %d",
+		 distributedPlan->modLevel, list_length(taskList));
+
+	elog(DEBUG1, "Target pool size: %d, Local execution supported: %d",
+		 targetPoolSize, localExecutionSupported);
 
 	DistributedExecution *execution = CreateDistributedExecution(
 		distributedPlan->modLevel,
@@ -868,6 +940,9 @@ AdaptiveExecutor(CitusScanState *scanState)
 		jobIdList, localExecutionSupported);
 
+
+
+
 	/*
 	 * Make sure that we acquire the appropriate locks even if the local tasks
 	 * are going to be executed with local execution.
@@ -918,6 +993,13 @@ AdaptiveExecutor(CitusScanState *scanState)
 static void
 RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution)
 {
+	/* Log entry into the function */
+	elog(DEBUG1, "Entering RunLocalExecution");
+
+	/* Log scan state and execution details */
+	elog(DEBUG1, "ScanState: distributedPlan type: %d", nodeTag(scanState->distributedPlan));
+	elog(DEBUG1, "Local task list length: %d", list_length(execution->localTaskList));
+
 	EState *estate = ScanStateGetExecutorState(scanState);
 	bool isUtilityCommand = false;
 	uint64 rowsProcessed = ExecuteLocalTaskListExtended(execution->localTaskList,
@@ -1086,52 +1168,83 @@ ExecuteTaskListIntoTupleDest(RowModifyLevel modLevel, List *taskList,
 uint64
 ExecuteTaskListExtended(ExecutionParams *executionParams)
 {
+	/* Log task list length */
+	elog(DEBUG1, "Starting ExecuteTaskListExtended, number of tasks: %d", list_length(executionParams->taskList));
+
 	/* if there are no tasks to execute, we can return early */
 	if (list_length(executionParams->taskList) == 0)
 	{
+		elog(DEBUG1, "No tasks to execute, returning early");
 		return 0;
 	}
 
 	uint64 locallyProcessedRows = 0;
-
 	TupleDestination *defaultTupleDest = executionParams->tupleDestination;
 
+	/* Log connection type */
+	elog(DEBUG1, "MultiShardConnectionType: %d", MultiShardConnectionType);
+
 	if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
 	{
+		elog(DEBUG1, "Switching to sequential connection mode");
 		executionParams->targetPoolSize = 1;
 	}
 
-	DistributedExecution *execution =
-		CreateDistributedExecution(
-			executionParams->modLevel, executionParams->taskList,
-			executionParams->paramListInfo, executionParams->targetPoolSize,
-			defaultTupleDest, &executionParams->xactProperties,
-			executionParams->jobIdList, executionParams->localExecutionSupported);
+	/* Log before creating distributed execution */
+	elog(DEBUG1, "Creating distributed 
execution for task list"); - /* - * If current transaction accessed local placements and task list includes - * tasks that should be executed locally (accessing any of the local placements), - * then we should error out as it would cause inconsistencies across the - * remote connection and local execution. - */ + DistributedExecution *execution = CreateDistributedExecution( + executionParams->modLevel, executionParams->taskList, + executionParams->paramListInfo, executionParams->targetPoolSize, + defaultTupleDest, &executionParams->xactProperties, + executionParams->jobIdList, executionParams->localExecutionSupported); + + /* Log details of created execution */ + elog(DEBUG1, "DistributedExecution created: %d tasks, local execution supported: %d", + list_length(execution->remoteTaskList), execution->localExecutionSupported); + + /* Ensure local execution state is compatible */ EnsureCompatibleLocalExecutionState(execution->remoteTaskList); - /* run the remote execution */ + /* Log before running distributed execution */ + elog(DEBUG1, "Starting distributed execution"); + + /* Start the distributed execution */ StartDistributedExecution(execution); + + /* Log after StartDistributedExecution */ + elog(DEBUG1, "StartDistributedExecution completed"); + + /* Run the distributed execution */ RunDistributedExecution(execution); + + /* Log after RunDistributedExecution */ + elog(DEBUG1, "RunDistributedExecution completed"); + + /* Finish the distributed execution */ FinishDistributedExecution(execution); - /* now, switch back to the local execution */ + /* Log after FinishDistributedExecution */ + elog(DEBUG1, "FinishDistributedExecution completed"); + + /* Log that the entire distributed execution process is complete */ +elog(DEBUG1, "Distributed execution completed"); + + + /* Log before running local execution */ if (executionParams->isUtilityCommand) { + elog(DEBUG1, "Running local utility task list"); locallyProcessedRows += ExecuteLocalUtilityTaskList(execution->localTaskList); } else { - locallyProcessedRows += ExecuteLocalTaskList(execution->localTaskList, - defaultTupleDest); + elog(DEBUG1, "Running local task list"); + locallyProcessedRows += ExecuteLocalTaskList(execution->localTaskList, defaultTupleDest); } + elog(DEBUG1, "Task execution completed, total rows processed: %lu", execution->rowsProcessed + locallyProcessedRows); + return execution->rowsProcessed + locallyProcessedRows; } @@ -1905,10 +2018,17 @@ SequentialRunDistributedExecution(DistributedExecution *execution) void RunDistributedExecution(DistributedExecution *execution) { + elog(DEBUG1, "Starting RunDistributedExecution with %d unfinished tasks", execution->unfinishedTaskCount); + + /* Assign tasks to connections or worker pool */ AssignTasksToConnectionsOrWorkerPool(execution); + elog(DEBUG1, "Assigned tasks to connections or worker pool"); PG_TRY(); { + /* Log before stepping state machines */ + elog(DEBUG1, "Stepping state machines for all sessions"); + /* Preemptively step state machines in case of immediate errors */ WorkerSession *session = NULL; foreach_ptr(session, execution->sessionList) @@ -1921,23 +2041,9 @@ RunDistributedExecution(DistributedExecution *execution) /* always (re)build the wait event set the first time */ execution->rebuildWaitEventSet = true; - /* - * Iterate until all the tasks are finished. Once all the tasks - * are finished, ensure that all the connection initializations - * are also finished. Otherwise, those connections are terminated - * abruptly before they are established (or failed). 
Instead, we let - * the ConnectionStateMachine() to properly handle them. - * - * Note that we could have the connections that are not established - * as a side effect of slow-start algorithm. At the time the algorithm - * decides to establish new connections, the execution might have tasks - * to finish. But, the execution might finish before the new connections - * are established. - * - * Note that the rules explained above could be overriden by any - * cancellation to the query. In that case, we terminate the execution - * irrespective of the current status of the tasks or the connections. - */ + elog(DEBUG1, "Entering task/event loop with unfinishedTaskCount: %d", execution->unfinishedTaskCount); + + /* Iterate until all the tasks are finished */ while (!cancellationReceived && (execution->unfinishedTaskCount > 0 || HasIncompleteConnectionEstablishment(execution))) @@ -1951,14 +2057,13 @@ RunDistributedExecution(DistributedExecution *execution) bool skipWaitEvents = false; if (execution->remoteTaskList == NIL) { - /* - * All the tasks are failed over to the local execution, no need - * to wait for any connection activity. - */ + /* Log when all tasks have failed over to local execution */ + elog(DEBUG1, "All tasks failed over to local execution"); continue; } else if (execution->rebuildWaitEventSet) { + elog(DEBUG1, "Rebuilding wait event set"); RebuildWaitEventSet(execution); skipWaitEvents = @@ -1966,6 +2071,7 @@ RunDistributedExecution(DistributedExecution *execution) } else if (execution->waitFlagsChanged) { + elog(DEBUG1, "Rebuilding wait event set flags"); RebuildWaitEventSetFlags(execution->waitEventSet, execution->sessionList); execution->waitFlagsChanged = false; @@ -1975,35 +2081,35 @@ RunDistributedExecution(DistributedExecution *execution) if (skipWaitEvents) { - /* - * Some operation on the wait event set is failed, retry - * as we already removed the problematic connections. - */ + elog(DEBUG1, "Skipping wait events due to failure, retrying"); execution->rebuildWaitEventSet = true; continue; } - /* wait for I/O events */ + /* Log before waiting for I/O events */ long timeout = NextEventTimeout(execution); + int eventCount = WaitEventSetWait(execution->waitEventSet, timeout, execution->events, execution->eventSetSize, WAIT_EVENT_CLIENT_READ); + ProcessWaitEvents(execution, execution->events, eventCount, &cancellationReceived); } - FreeExecutionWaitEvents(execution); + elog(DEBUG1, "Finished task/event loop, cleaning up"); + /* Clean up after distributed execution */ + FreeExecutionWaitEvents(execution); CleanUpSessions(execution); } PG_CATCH(); { - /* - * We can still recover from error using ROLLBACK TO SAVEPOINT, - * unclaim all connections to allow that. - */ + /* Log in case of errors */ + elog(DEBUG1, "Error occurred, unclaiming all session connections"); + UnclaimAllSessionConnections(execution->sessionList); FreeExecutionWaitEvents(execution); @@ -2014,6 +2120,7 @@ RunDistributedExecution(DistributedExecution *execution) } + /* * ProcessSessionsWithFailedWaitEventSetOperations goes over the session list * and processes sessions with failed wait event set operations. @@ -3905,10 +4012,6 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution, } -/* - * SendNextQuery sends the next query for placementExecution on the given - * session. 
- */
 static bool
 SendNextQuery(TaskPlacementExecution *placementExecution,
 			  WorkerSession *session)
@@ -3924,26 +4027,62 @@ SendNextQuery(TaskPlacementExecution *placementExecution,
 	int querySent = 0;
 	uint32 queryIndex = placementExecution->queryIndex;
 
+	elog(DEBUG1, "Sending next query: queryIndex = %d, task->queryCount = %d", queryIndex, task->queryCount);
+
 	Assert(queryIndex < task->queryCount);
 	char *queryString = TaskQueryStringAtIndex(task, queryIndex);
 
+	elog(DEBUG1, "Query to be sent: %s", queryString);
+
+	if (paramListInfo != NULL && !task->parametersInQueryStringResolved)
+	{
+		elog(DEBUG1, "ParamListInfo is not null and parameters not resolved");
+	}
+	else if (paramListInfo == NULL)
+	{
+		elog(DEBUG1, "ParamListInfo is NULL");
+	}
+	else
+	{
+		elog(DEBUG1, "Task parameters are already resolved");
+	}
+
+
 	if (paramListInfo != NULL && !task->parametersInQueryStringResolved)
 	{
 		int parameterCount = paramListInfo->numParams;
 		Oid *parameterTypes = NULL;
 		const char **parameterValues = NULL;
 
+		elog(DEBUG1, "ParamListInfo is not null, parameterCount: %d", parameterCount);
+
 		/* force evaluation of bound params */
 		paramListInfo = copyParamList(paramListInfo);
 
+		/* Log the start of parameter extraction */
+		elog(DEBUG1, "Extracting parameters for remote execution");
+
 		ExtractParametersForRemoteExecution(paramListInfo, &parameterTypes,
 											&parameterValues);
+
+		/* Log extracted parameter values and types */
+		for (int i = 0; i < parameterCount; i++)
+		{
+			elog(DEBUG1, "Parameter %d: Type = %d, Value = %s", i + 1, parameterTypes[i], parameterValues[i] ? parameterValues[i] : "NULL");
+		}
+
+		/* Send the remote command with parameters */
 		querySent = SendRemoteCommandParams(connection, queryString,
 											parameterCount, parameterTypes,
 											parameterValues, binaryResults);
+
+		elog(DEBUG1, "Query sent with parameters, result = %d", querySent);
 	}
 	else
 	{
+		/* If no parameters, send the query without params */
+		elog(DEBUG1, "Sending query without parameters");
+
 		/*
 		 * We only need to use SendRemoteCommandParams when we desire
 		 * binaryResults. One downside of SendRemoteCommandParams is that it
diff --git a/src/backend/distributed/executor/executor_util_params.c b/src/backend/distributed/executor/executor_util_params.c
index 975654f22..2c3c0cb53 100644
--- a/src/backend/distributed/executor/executor_util_params.c
+++ b/src/backend/distributed/executor/executor_util_params.c
@@ -45,6 +45,8 @@ ExtractParametersFromParamList(ParamListInfo paramListInfo,
 {
 	int parameterCount = paramListInfo->numParams;
 
+	elog(DEBUG1, "Extracting %d parameters from ParamListInfo", parameterCount);
+
 	*parameterTypes = (Oid *) palloc0(parameterCount * sizeof(Oid));
 	*parameterValues = (const char **) palloc0(parameterCount * sizeof(char *));
 
@@ -55,49 +57,48 @@ ExtractParametersFromParamList(ParamListInfo paramListInfo,
 		Oid typeOutputFunctionId = InvalidOid;
 		bool variableLengthType = false;
 
+		/* Log parameter type */
+		elog(DEBUG1, "Processing parameter %d, type: %d", parameterIndex + 1, parameterData->ptype);
+
 		/*
 		 * Use 0 for data types where the oid values can be different on
-		 * the coordinator and worker nodes. Therefore, the worker nodes can
-		 * infer the correct oid.
+		 * the coordinator and worker nodes.
*/ if (parameterData->ptype >= FirstNormalObjectId && !useOriginalCustomTypeOids) { (*parameterTypes)[parameterIndex] = 0; + elog(DEBUG1, "Using default OID (0) for parameter %d", parameterIndex + 1); } else { (*parameterTypes)[parameterIndex] = parameterData->ptype; } - /* - * If the parameter is not referenced / used (ptype == 0) and - * would otherwise have errored out inside standard_planner()), - * don't pass a value to the remote side, and pass text oid to prevent - * undetermined data type errors on workers. - */ + /* Handle unreferenced parameter */ if (parameterData->ptype == 0) { (*parameterValues)[parameterIndex] = NULL; (*parameterTypes)[parameterIndex] = TEXTOID; + elog(DEBUG1, "Parameter %d has ptype 0, setting TEXTOID", parameterIndex + 1); continue; } - /* - * If the parameter is NULL then we preserve its type, but - * don't need to evaluate its value. - */ + /* Handle NULL parameter */ if (parameterData->isnull) { (*parameterValues)[parameterIndex] = NULL; - + elog(DEBUG1, "Parameter %d is NULL", parameterIndex + 1); continue; } - getTypeOutputInfo(parameterData->ptype, &typeOutputFunctionId, - &variableLengthType); + /* Log the type output function */ + getTypeOutputInfo(parameterData->ptype, &typeOutputFunctionId, &variableLengthType); + elog(DEBUG1, "Type output function ID for parameter %d: %u", parameterIndex + 1, typeOutputFunctionId); - (*parameterValues)[parameterIndex] = OidOutputFunctionCall(typeOutputFunctionId, - parameterData->value); + /* Log the parameter value */ + (*parameterValues)[parameterIndex] = OidOutputFunctionCall(typeOutputFunctionId, parameterData->value); + elog(DEBUG1, "Parameter %d value after output function call: %s", parameterIndex + 1, (*parameterValues)[parameterIndex]); } } + diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index bedaa643e..c2a20bd52 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -108,6 +108,7 @@ #include "distributed/utils/citus_stat_tenants.h" #include "distributed/version_compat.h" #include "distributed/worker_protocol.h" +#include "miscadmin.h" // for elog functions /* controlled via a GUC */ bool EnableLocalExecution = true; @@ -211,6 +212,39 @@ ExecuteLocalTaskListExtended(List *taskList, TupleDestination *defaultTupleDest, bool isUtilityCommand) { + /* Log the size of the task list */ + elog(DEBUG1, "Executing local task list, number of tasks: %d", list_length(taskList)); + + /* Log the distributed plan type */ + if (distributedPlan != NULL) + { + elog(DEBUG1, "Distributed plan type: %d", nodeTag(distributedPlan)); + } + else + { + elog(DEBUG1, "Distributed plan is NULL"); + } + + /* Log if the command is a utility command */ + elog(DEBUG1, "Is Utility Command: %d", isUtilityCommand); + + /* Log the parameters */ + if (orig_paramListInfo != NULL) + { + elog(DEBUG1, "Original ParamListInfo has %d params", orig_paramListInfo->numParams); + + /* Optionally log details of each parameter */ + for (int i = 0; i < orig_paramListInfo->numParams; i++) + { + ParamExternData param = orig_paramListInfo->params[i]; + elog(DEBUG1, "Param %d has type OID %d", i + 1, param.ptype); + } + } + else + { + elog(DEBUG1, "Original ParamListInfo is NULL"); + } + ParamListInfo paramListInfo = copyParamList(orig_paramListInfo); uint64 totalRowsProcessed = 0; int numParams = 0; diff --git a/src/backend/distributed/planner/function_call_delegation.c 
b/src/backend/distributed/planner/function_call_delegation.c
index 4a79dc25a..711bc414c 100644
--- a/src/backend/distributed/planner/function_call_delegation.c
+++ b/src/backend/distributed/planner/function_call_delegation.c
@@ -514,17 +514,32 @@ ShardPlacementForFunctionColocatedWithDistTable(DistObjectCacheEntry *procedure,
 										   CitusTableCacheEntry *cacheEntry,
 										   PlannedStmt *plan)
 {
+	/* Log distribution argument index and argument list size */
+	elog(DEBUG1, "Distribution Argument Index: %d, Argument List Length: %d",
+		 procedure->distributionArgIndex, list_length(argumentList));
+
 	if (procedure->distributionArgIndex < 0 ||
 		procedure->distributionArgIndex >= list_length(argumentList))
 	{
-		ereport(DEBUG1, (errmsg("cannot push down invalid distribution_argument_index")));
+		/* Include the invalid distribution argument index in the log message */
+		ereport(DEBUG1, (errmsg("cannot push down invalid distribution_argument_index: %d",
+								procedure->distributionArgIndex)));
 		return NULL;
 	}
 
-	Node *partitionValueNode = (Node *) list_nth(argumentList,
-												 procedure->distributionArgIndex);
+	/* Get the partition value node */
+	Node *partitionValueNode = (Node *) list_nth(argumentList, procedure->distributionArgIndex);
+
+	/* Log the partition value node before stripping implicit coercions */
+	elog(DEBUG1, "Partition Value Node before stripping: %s", nodeToString(partitionValueNode));
+
+	/* Strip implicit coercions */
 	partitionValueNode = strip_implicit_coercions(partitionValueNode);
 
+	/* Log the partition value node after stripping implicit coercions */
+	elog(DEBUG1, "Partition Value Node after stripping: %s", nodeToString(partitionValueNode));
+
+
 	if (IsA(partitionValueNode, Param))
 	{
 		Param *partitionParam = (Param *) partitionValueNode;
diff --git a/src/test/regress/citus_tests/common.py b/src/test/regress/citus_tests/common.py
index 6c09e0b38..b02d58a51 100644
--- a/src/test/regress/citus_tests/common.py
+++ b/src/test/regress/citus_tests/common.py
@@ -826,6 +826,21 @@ class Postgres(QueryRunner):
         pgconf.write("max_worker_processes = 50\n")
         pgconf.write("max_replication_slots = 50\n")
 
+        pgconf.write("log_min_messages = DEBUG1\n")
+        pgconf.write("citus.worker_min_messages = DEBUG1\n")
+
+        # Enable logging of all SQL statements
+        # pgconf.write("log_statement = 'all'\n")
+
+        # # Enable logging of query execution time (duration)
+        # pgconf.write("log_duration = on\n")
+
+        # # Enable logging of statement execution statistics
+        # pgconf.write("log_statement_stats = on\n")
+
+        # # Optionally, log queries that take 0 ms (this logs every query)
+        # pgconf.write("log_min_duration_statement = 0\n")
+
         # We need to make the log go to stderr so that the tests can
         # check what is being logged. This should be the default, but
         # some packagings change the default configuration.
diff --git a/src/test/regress/citus_tests/test/test_prepared_statements.py b/src/test/regress/citus_tests/test/test_prepared_statements.py index 1beea8038..e67f2982a 100644 --- a/src/test/regress/citus_tests/test/test_prepared_statements.py +++ b/src/test/regress/citus_tests/test/test_prepared_statements.py @@ -36,10 +36,7 @@ def test_call_param2(cluster): # Get the coordinator node from the Citus cluster coord = cluster.coordinator - # Step 1: Create a distributed table `t` coord.sql("CREATE TABLE t (p int, i int)") - - # Step 2: Create a stored procedure `f` similar to the one in the C# code coord.sql( """ CREATE PROCEDURE f(_p INT, _i INT) LANGUAGE plpgsql AS $$ @@ -49,20 +46,15 @@ def test_call_param2(cluster): END; $$ """ ) - - # Step 3: Insert data and call the procedure, simulating parameterized queries - sql_insert_and_call = "CALL f(1, %s);" - - # Step 4: Distribute the table coord.sql("SELECT create_distributed_table('t', 'p')") - - # Step 5: Distribute the procedure coord.sql( "SELECT create_distributed_function('f(int, int)', distribution_arg_name := '_p', colocate_with := 't')" ) - # time.sleep(10) - cluster.coordinator.psql_debug() + sql_insert_and_call = "CALL f(1, %s);" + + # cluster.coordinator.psql_debug() + # cluster.debug() # After distributing the table, insert more data and call the procedure again coord.sql_prepared(sql_insert_and_call, (2,)) @@ -70,4 +62,4 @@ def test_call_param2(cluster): # Step 6: Check the result sum_i = coord.sql_value("SELECT count(*) FROM t;") - assert sum_i == 0 \ No newline at end of file + assert sum_i == 1 \ No newline at end of file