debug lines added

m3hm3t/dist_func_parameter
Mehmet YILMAZ 2024-09-09 07:34:46 +00:00
parent 036d47565f
commit 31003204c6
9 changed files with 400 additions and 94 deletions

citus-tools Submodule

@ -0,0 +1 @@
Subproject commit 3376bd6845f0614908ed304f5033bd644c82d3bf

View File

@ -48,6 +48,7 @@
#include "distributed/version_compat.h"
#include "distributed/worker_log_messages.h"
#include "distributed/worker_manager.h"
#include "miscadmin.h" // for elog functions
/* global variable tracking whether we are in a delegated procedure call */
@ -63,6 +64,32 @@ CallDistributedProcedureRemotely(CallStmt *callStmt, DestReceiver *dest)
FuncExpr *funcExpr = callStmt->funcexpr;
Oid functionId = funcExpr->funcid;
/* Log the function ID being called */
elog(DEBUG1, "Calling distributed procedure with functionId: %u", functionId);
/* Log additional details from CallStmt */
if (callStmt->funcexpr != NULL)
{
elog(DEBUG1, "Function expression type: %d", nodeTag(callStmt->funcexpr));
}
if (funcExpr->args != NIL)
{
ListCell *lc;
int argIndex = 0;
foreach(lc, funcExpr->args)
{
Node *arg = (Node *) lfirst(lc);
elog(DEBUG1, "Argument %d: NodeTag: %d", argIndex, nodeTag(arg));
argIndex++;
}
}
else
{
elog(DEBUG1, "No arguments in the function expression");
}
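/*
 * Note: nodeTag() logs only a bare enum value. When a readable dump of the
 * argument is needed, nodeToString() (used later in this commit for the
 * parse tree) can be logged instead, for example:
 *
 *     elog(DEBUG1, "Argument %d: %s", argIndex, nodeToString(arg));
 */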
DistObjectCacheEntry *procedure = LookupDistObjectCacheEntry(ProcedureRelationId,
functionId, 0);
if (procedure == NULL || !procedure->isDistributed)
@ -167,6 +194,7 @@ CallDistributedProcedureRemotely(CallStmt *callStmt, DestReceiver *dest)
appendStringInfo(callCommand, "CALL %s", pg_get_rule_expr((Node *) callStmt));
{
elog(DEBUG1, "Generated CALL statement: %s", callCommand->data);
Tuplestorestate *tupleStore = tuplestore_begin_heap(true, false, work_mem);
TupleDesc tupleDesc = CallStmtResultDesc(callStmt);
TupleTableSlot *slot = MakeSingleTupleTableSlot(tupleDesc,
@ -195,19 +223,93 @@ CallDistributedProcedureRemotely(CallStmt *callStmt, DestReceiver *dest)
};
EnableWorkerMessagePropagation();
elog(DEBUG1, "Worker message propagation enabled");
bool localExecutionSupported = true;
elog(DEBUG1, "Local execution supported: %d", localExecutionSupported);
/* Log task details */
elog(DEBUG1, "Creating execution params for task");
/* Create execution parameters */
ExecutionParams *executionParams = CreateBasicExecutionParams(
ROW_MODIFY_NONE, list_make1(task), MaxAdaptiveExecutorPoolSize,
localExecutionSupported
);
// Create a ParamListInfo structure
List *argList = funcExpr->args; // Extract arguments from the function expression
int paramCount = list_length(argList); // Get the number of arguments
ParamListInfo paramListInfo = makeParamList(paramCount); // Create ParamListInfo structure
// Loop through the argument list and populate ParamListInfo
int paramIndex = 0;
ListCell *argCell;
foreach(argCell, argList)
{
Node *argNode = (Node *) lfirst(argCell);
if (IsA(argNode, Const))
{
Const *constArg = (Const *) argNode;
paramListInfo->params[paramIndex].ptype = constArg->consttype; // Set parameter type
paramListInfo->params[paramIndex].value = constArg->constvalue; // Set parameter value
paramListInfo->params[paramIndex].isnull = constArg->constisnull; // Set if the parameter is null
}
else if (IsA(argNode, Param))
{
Param *paramArg = (Param *) argNode;
// Set the parameter type
paramListInfo->params[paramIndex].ptype = paramArg->paramtype;
// Fetch the value of the parameter if necessary
if (paramListInfo->paramFetch != NULL)
{
ParamExternData paramData;
// paramFetch may return its own storage instead of filling the workspace,
// so use the returned pointer (see nodes/params.h); false = not speculative
ParamExternData *fetchedParam =
	paramListInfo->paramFetch(paramListInfo, paramArg->paramid, false, &paramData);
paramListInfo->params[paramIndex].value = fetchedParam->value;
paramListInfo->params[paramIndex].isnull = fetchedParam->isnull;
// Log fetched value and type
elog(DEBUG1, "Fetched dynamic parameter: paramIndex: %d, paramType: %u, paramValue: %d",
	paramIndex, paramListInfo->params[paramIndex].ptype,
	DatumGetInt32(paramListInfo->params[paramIndex].value));
}
else
{
// Handle the case where paramFetch is NULL
elog(ERROR, "Could not fetch value for parameter: %d", paramArg->paramid);
}
}
// Log populated parameters
elog(DEBUG1, "Populating ParamListInfo, paramIndex: %d, paramType: %d, paramValue: %d",
paramIndex, paramListInfo->params[paramIndex].ptype, DatumGetInt32(paramListInfo->params[paramIndex].value));
paramIndex++;
}
/* Set tuple destination and execution properties */
executionParams->tupleDestination = CreateTupleStoreTupleDest(tupleStore, tupleDesc);
executionParams->expectResults = expectResults;
executionParams->xactProperties = xactProperties;
executionParams->isUtilityCommand = true;
executionParams->paramListInfo = paramListInfo;
/* Log before executing task list */
elog(DEBUG1, "Executing task list with ExecuteTaskListExtended");
/* Execute the task list */
ExecuteTaskListExtended(executionParams);
/* Log after task execution */
elog(DEBUG1, "Task list execution completed");
DisableWorkerMessagePropagation();
while (tuplestore_gettupleslot(tupleStore, true, false, slot))
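A side note on the paramFetch hook used in the loop above: per PostgreSQL's nodes/params.h (PostgreSQL 11 and later), the hook may return a pointer to its own storage rather than filling the supplied workspace, so callers should use the returned ParamExternData. A minimal sketch of the conventional pattern, where params is any ParamListInfo and paramId stands in for the 1-based parameter number:

ParamExternData workspace;
ParamExternData *prm;
if (params->paramFetch != NULL)
{
	/* dynamic parameter: ask the hook; false means the fetch is not speculative */
	prm = params->paramFetch(params, paramId, false, &workspace);
}
else
{
	/* statically bound parameter: read it straight from the array */
	prm = &params->params[paramId - 1];
}
/* prm->ptype, prm->value and prm->isnull now describe the parameter */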

View File

@ -273,6 +273,13 @@ citus_ProcessUtility(PlannedStmt *pstmt,
{
CallStmt *callStmt = (CallStmt *) parsetree;
/* Log the entire parsetree as a string */
elog(DEBUG1, "Parse Tree: %s", nodeToString(parsetree));
/* Log other information as before */
elog(DEBUG1, "Processing CallStmt for procedure");
elog(DEBUG1, "Procedure context: %d", context);
/*
* If the procedure is distributed and we are using MX then we have the
* possibility of calling it on the worker. If the data is located on

View File

@ -770,16 +770,51 @@ AdaptiveExecutorPreExecutorRun(CitusScanState *scanState)
/*
* AdaptiveExecutor is called via CitusExecScan on the
* first call of CitusExecScan. The function fills the tupleStore
* of the input scanScate.
* of the input scanState.
*/
TupleTableSlot *
AdaptiveExecutor(CitusScanState *scanState)
{
/* Log entry into the function */
elog(DEBUG1, "Entering AdaptiveExecutor");
TupleTableSlot *resultSlot = NULL;
DistributedPlan *distributedPlan = scanState->distributedPlan;
EState *executorState = ScanStateGetExecutorState(scanState);
ParamListInfo paramListInfo = executorState->es_param_list_info;
/* Log details about the distributed plan */
if (distributedPlan != NULL)
{
elog(DEBUG1, "Distributed plan modLevel: %d", distributedPlan->modLevel);
}
else
{
elog(DEBUG1, "Distributed plan is NULL");
}
/* Log details about paramListInfo */
if (paramListInfo != NULL)
{
elog(DEBUG1, "paramListInfo is populated with %d parameters", paramListInfo->numParams);
/* Log each parameter */
for (int i = 0; i < paramListInfo->numParams; i++)
{
ParamExternData *param = &paramListInfo->params[i];
elog(DEBUG1, "Parameter %d: ptype = %d, isnull = %d",
i + 1, param->ptype, param->isnull);
if (!param->isnull)
{
elog(DEBUG1, "Parameter %d: value = %ld", i + 1, param->value);
}
}
}
else
{
elog(DEBUG1, "paramListInfo is NULL");
}
bool randomAccess = true;
bool interTransactions = false;
int targetPoolSize = MaxAdaptiveExecutorPoolSize;
@ -847,16 +882,53 @@ AdaptiveExecutor(CitusScanState *scanState)
distributedPlan->modLevel, taskList, excludeFromXact);
/*
 * In some rare cases, we have prepared statements that pass a parameter
 * and never used in the query, mark such parameters' type as Invalid(0),
 * which will be used later in ExtractParametersFromParamList() to map them
 * to a generic datatype. Skip for dynamic parameters.
 */
if (paramListInfo && !paramListInfo->paramFetch)
{
/* Copy the ParamListInfo for safety */
paramListInfo = copyParamList(paramListInfo);
/* Mark any unreferenced parameters */
MarkUnreferencedExternParams((Node *) job->jobQuery, paramListInfo);
/* Debug: Log the parameters before proceeding */
elog(DEBUG1, "paramListInfo: number of parameters = %d", paramListInfo->numParams);
/* Log details of each parameter */
for (int i = 0; i < paramListInfo->numParams; i++)
{
ParamExternData *param = &paramListInfo->params[i];
elog(DEBUG1, "Parameter %d: ptype = %d, isnull = %d",
i + 1, param->ptype, param->isnull);
if (!param->isnull)
{
elog(DEBUG1, "Parameter %d: value = %ld", i + 1, param->value);
}
}
}
else
{
/* If paramListInfo is NULL or paramFetch is true, log that information */
if (paramListInfo == NULL)
{
elog(DEBUG1, "paramListInfo is NULL");
}
else if (paramListInfo->paramFetch)
{
elog(DEBUG1, "paramListInfo uses dynamic parameter fetching");
}
}
/* Log the parameters before creating the distributed execution */
elog(DEBUG1, "Creating DistributedExecution with modLevel: %d, taskList size: %d",
distributedPlan->modLevel, list_length(taskList));
elog(DEBUG1, "Target pool size: %d, Local execution supported: %d",
targetPoolSize, localExecutionSupported);
DistributedExecution *execution = CreateDistributedExecution(
distributedPlan->modLevel,
@ -868,6 +940,9 @@ AdaptiveExecutor(CitusScanState *scanState)
jobIdList,
localExecutionSupported);
/*
* Make sure that we acquire the appropriate locks even if the local tasks
* are going to be executed with local execution.
@ -918,6 +993,13 @@ AdaptiveExecutor(CitusScanState *scanState)
static void
RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution)
{
/* Log entry into the function */
elog(DEBUG1, "Entering RunLocalExecution");
/* Log scan state and execution details */
elog(DEBUG1, "ScanState: distributedPlan type: %d", nodeTag(scanState->distributedPlan));
elog(DEBUG1, "Local task list length: %d", list_length(execution->localTaskList));
EState *estate = ScanStateGetExecutorState(scanState);
bool isUtilityCommand = false;
uint64 rowsProcessed = ExecuteLocalTaskListExtended(execution->localTaskList,
@ -1086,52 +1168,83 @@ ExecuteTaskListIntoTupleDest(RowModifyLevel modLevel, List *taskList,
uint64
ExecuteTaskListExtended(ExecutionParams *executionParams)
{
/* Log task list length */
elog(DEBUG1, "Starting ExecuteTaskListExtended, number of tasks: %d", list_length(executionParams->taskList));
/* if there are no tasks to execute, we can return early */
if (list_length(executionParams->taskList) == 0)
{
elog(DEBUG1, "No tasks to execute, returning early");
return 0;
}
uint64 locallyProcessedRows = 0;
TupleDestination *defaultTupleDest = executionParams->tupleDestination;
/* Log connection type */
elog(DEBUG1, "MultiShardConnectionType: %d", MultiShardConnectionType);
if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
{
elog(DEBUG1, "Switching to sequential connection mode");
executionParams->targetPoolSize = 1;
}
/* Log before creating distributed execution */
elog(DEBUG1, "Creating distributed execution for task list");
/*
* If current transaction accessed local placements and task list includes
* tasks that should be executed locally (accessing any of the local placements),
* then we should error out as it would cause inconsistencies across the
* remote connection and local execution.
*/
DistributedExecution *execution = CreateDistributedExecution(
executionParams->modLevel, executionParams->taskList,
executionParams->paramListInfo, executionParams->targetPoolSize,
defaultTupleDest, &executionParams->xactProperties,
executionParams->jobIdList, executionParams->localExecutionSupported);
/* Log details of created execution */
elog(DEBUG1, "DistributedExecution created: %d tasks, local execution supported: %d",
list_length(execution->remoteTaskList), execution->localExecutionSupported);
/* Ensure local execution state is compatible */
EnsureCompatibleLocalExecutionState(execution->remoteTaskList);
/* run the remote execution */
/* Log before running distributed execution */
elog(DEBUG1, "Starting distributed execution");
/* Start the distributed execution */
StartDistributedExecution(execution);
/* Log after StartDistributedExecution */
elog(DEBUG1, "StartDistributedExecution completed");
/* Run the distributed execution */
RunDistributedExecution(execution);
/* Log after RunDistributedExecution */
elog(DEBUG1, "RunDistributedExecution completed");
/* Finish the distributed execution */
FinishDistributedExecution(execution);
/* now, switch back to the local execution */
/* Log after FinishDistributedExecution */
elog(DEBUG1, "FinishDistributedExecution completed");
/* Log that the entire distributed execution process is complete */
elog(DEBUG1, "Distributed execution completed");
/* Log before running local execution */
if (executionParams->isUtilityCommand)
{
elog(DEBUG1, "Running local utility task list");
locallyProcessedRows += ExecuteLocalUtilityTaskList(execution->localTaskList);
}
else
{
elog(DEBUG1, "Running local task list");
locallyProcessedRows += ExecuteLocalTaskList(execution->localTaskList, defaultTupleDest);
}
elog(DEBUG1, "Task execution completed, total rows processed: %lu", execution->rowsProcessed + locallyProcessedRows);
return execution->rowsProcessed + locallyProcessedRows;
}
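For orientation, a hedged sketch of a minimal caller of ExecuteTaskListExtended, using only fields and helpers that appear in this diff; task, tupleStore, tupleDesc, xactProperties and paramListInfo are assumed to be set up as in the CALL path earlier in the commit:

ExecutionParams *executionParams = CreateBasicExecutionParams(
	ROW_MODIFY_NONE, list_make1(task), MaxAdaptiveExecutorPoolSize,
	true /* localExecutionSupported */);

/* where results go and how the statement behaves */
executionParams->tupleDestination = CreateTupleStoreTupleDest(tupleStore, tupleDesc);
executionParams->expectResults = true;
executionParams->xactProperties = xactProperties;
executionParams->isUtilityCommand = true;
executionParams->paramListInfo = paramListInfo;

/* runs the remote tasks, then any local tasks, and returns total rows processed */
uint64 rowsProcessed = ExecuteTaskListExtended(executionParams);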
@ -1905,10 +2018,17 @@ SequentialRunDistributedExecution(DistributedExecution *execution)
void
RunDistributedExecution(DistributedExecution *execution)
{
elog(DEBUG1, "Starting RunDistributedExecution with %d unfinished tasks", execution->unfinishedTaskCount);
/* Assign tasks to connections or worker pool */
AssignTasksToConnectionsOrWorkerPool(execution);
elog(DEBUG1, "Assigned tasks to connections or worker pool");
PG_TRY();
{
/* Log before stepping state machines */
elog(DEBUG1, "Stepping state machines for all sessions");
/* Preemptively step state machines in case of immediate errors */
WorkerSession *session = NULL;
foreach_ptr(session, execution->sessionList)
@ -1921,23 +2041,9 @@ RunDistributedExecution(DistributedExecution *execution)
/* always (re)build the wait event set the first time */
execution->rebuildWaitEventSet = true;
/*
* Iterate until all the tasks are finished. Once all the tasks
* are finished, ensure that all the connection initializations
* are also finished. Otherwise, those connections are terminated
* abruptly before they are established (or failed). Instead, we let
* the ConnectionStateMachine() to properly handle them.
*
* Note that we could have the connections that are not established
* as a side effect of slow-start algorithm. At the time the algorithm
* decides to establish new connections, the execution might have tasks
* to finish. But, the execution might finish before the new connections
* are established.
*
* Note that the rules explained above could be overriden by any
* cancellation to the query. In that case, we terminate the execution
* irrespective of the current status of the tasks or the connections.
*/
elog(DEBUG1, "Entering task/event loop with unfinishedTaskCount: %d", execution->unfinishedTaskCount);
/* Iterate until all the tasks are finished */
while (!cancellationReceived &&
(execution->unfinishedTaskCount > 0 ||
HasIncompleteConnectionEstablishment(execution)))
@ -1951,14 +2057,13 @@ RunDistributedExecution(DistributedExecution *execution)
bool skipWaitEvents = false;
if (execution->remoteTaskList == NIL)
{
/*
* All the tasks are failed over to the local execution, no need
* to wait for any connection activity.
*/
/* Log when all tasks have failed over to local execution */
elog(DEBUG1, "All tasks failed over to local execution");
continue;
}
else if (execution->rebuildWaitEventSet)
{
elog(DEBUG1, "Rebuilding wait event set");
RebuildWaitEventSet(execution);
skipWaitEvents =
@ -1966,6 +2071,7 @@ RunDistributedExecution(DistributedExecution *execution)
}
else if (execution->waitFlagsChanged)
{
elog(DEBUG1, "Rebuilding wait event set flags");
RebuildWaitEventSetFlags(execution->waitEventSet, execution->sessionList);
execution->waitFlagsChanged = false;
@ -1975,35 +2081,35 @@ RunDistributedExecution(DistributedExecution *execution)
if (skipWaitEvents)
{
/*
* Some operation on the wait event set is failed, retry
* as we already removed the problematic connections.
*/
elog(DEBUG1, "Skipping wait events due to failure, retrying");
execution->rebuildWaitEventSet = true;
continue;
}
/* wait for I/O events */
/* Log before waiting for I/O events */
long timeout = NextEventTimeout(execution);
int eventCount =
WaitEventSetWait(execution->waitEventSet, timeout, execution->events,
execution->eventSetSize, WAIT_EVENT_CLIENT_READ);
ProcessWaitEvents(execution, execution->events, eventCount,
&cancellationReceived);
}
elog(DEBUG1, "Finished task/event loop, cleaning up");
/* Clean up after distributed execution */
FreeExecutionWaitEvents(execution);
CleanUpSessions(execution);
}
PG_CATCH();
{
/*
* We can still recover from error using ROLLBACK TO SAVEPOINT,
* unclaim all connections to allow that.
*/
/* Log in case of errors */
elog(DEBUG1, "Error occurred, unclaiming all session connections");
UnclaimAllSessionConnections(execution->sessionList);
FreeExecutionWaitEvents(execution);
@ -2014,6 +2120,7 @@ RunDistributedExecution(DistributedExecution *execution)
}
/*
* ProcessSessionsWithFailedWaitEventSetOperations goes over the session list
* and processes sessions with failed wait event set operations.
@ -3905,10 +4012,6 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution,
}
/*
* SendNextQuery sends the next query for placementExecution on the given
* session.
*/
static bool
SendNextQuery(TaskPlacementExecution *placementExecution,
WorkerSession *session)
@ -3924,26 +4027,62 @@ SendNextQuery(TaskPlacementExecution *placementExecution,
int querySent = 0;
uint32 queryIndex = placementExecution->queryIndex;
elog(DEBUG1, "Sending next query: queryIndex = %d, task->queryCount = %d", queryIndex, task->queryCount);
Assert(queryIndex < task->queryCount);
char *queryString = TaskQueryStringAtIndex(task, queryIndex);
elog(DEBUG1, "Query to be sent: %s", queryString);
if (paramListInfo != NULL && !task->parametersInQueryStringResolved)
{
elog(DEBUG1, "ParamListInfo is not null and parameters not resolved");
}
else if (paramListInfo == NULL)
{
elog(DEBUG1, "ParamListInfo is NULL");
}
else
{
elog(DEBUG1, "Task parameters are already resolved");
}
if (paramListInfo != NULL && !task->parametersInQueryStringResolved)
{
int parameterCount = paramListInfo->numParams;
Oid *parameterTypes = NULL;
const char **parameterValues = NULL;
elog(DEBUG1, "ParamListInfo is not null, parameterCount: %d", parameterCount);
/* force evaluation of bound params */
paramListInfo = copyParamList(paramListInfo);
/* Log the start of parameter extraction */
elog(DEBUG1, "Extracting parameters for remote execution");
ExtractParametersForRemoteExecution(paramListInfo, &parameterTypes,
&parameterValues);
/* Log extracted parameter values and types */
for (int i = 0; i < parameterCount; i++)
{
elog(DEBUG1, "Parameter %d: Type = %d, Value = %s", i + 1, parameterTypes[i], parameterValues[i]);
}
/* Send the remote command with parameters */
querySent = SendRemoteCommandParams(connection, queryString, parameterCount,
parameterTypes, parameterValues,
binaryResults);
elog(DEBUG1, "Query sent with parameters, result = %d", querySent);
}
else
{
/* If no parameters, send the query without params */
elog(DEBUG1, "Sending query without parameters");
/*
* We only need to use SendRemoteCommandParams when we desire
* binaryResults. One downside of SendRemoteCommandParams is that it

View File

@ -45,6 +45,8 @@ ExtractParametersFromParamList(ParamListInfo paramListInfo,
{
int parameterCount = paramListInfo->numParams;
elog(DEBUG1, "Extracting %d parameters from ParamListInfo", parameterCount);
*parameterTypes = (Oid *) palloc0(parameterCount * sizeof(Oid));
*parameterValues = (const char **) palloc0(parameterCount * sizeof(char *));
@ -55,49 +57,48 @@ ExtractParametersFromParamList(ParamListInfo paramListInfo,
Oid typeOutputFunctionId = InvalidOid;
bool variableLengthType = false;
/* Log parameter type */
elog(DEBUG1, "Processing parameter %d, type: %d", parameterIndex + 1, parameterData->ptype);
/*
 * Use 0 for data types where the oid values can be different on
 * the coordinator and worker nodes. Therefore, the worker nodes can
 * infer the correct oid.
 */
if (parameterData->ptype >= FirstNormalObjectId && !useOriginalCustomTypeOids)
{
(*parameterTypes)[parameterIndex] = 0;
elog(DEBUG1, "Using default OID (0) for parameter %d", parameterIndex + 1);
}
else
{
(*parameterTypes)[parameterIndex] = parameterData->ptype;
}
/*
* If the parameter is not referenced / used (ptype == 0) and
* would otherwise have errored out inside standard_planner()),
* don't pass a value to the remote side, and pass text oid to prevent
* undetermined data type errors on workers.
*/
/* Handle unreferenced parameter */
if (parameterData->ptype == 0)
{
(*parameterValues)[parameterIndex] = NULL;
(*parameterTypes)[parameterIndex] = TEXTOID;
elog(DEBUG1, "Parameter %d has ptype 0, setting TEXTOID", parameterIndex + 1);
continue;
}
/*
* If the parameter is NULL then we preserve its type, but
* don't need to evaluate its value.
*/
/* Handle NULL parameter */
if (parameterData->isnull)
{
(*parameterValues)[parameterIndex] = NULL;
elog(DEBUG1, "Parameter %d is NULL", parameterIndex + 1);
continue;
}
/* Log the type output function */
getTypeOutputInfo(parameterData->ptype, &typeOutputFunctionId, &variableLengthType);
elog(DEBUG1, "Type output function ID for parameter %d: %u", parameterIndex + 1, typeOutputFunctionId);
(*parameterValues)[parameterIndex] = OidOutputFunctionCall(typeOutputFunctionId, parameterData->value);
/* Log the parameter value */
elog(DEBUG1, "Parameter %d value after output function call: %s", parameterIndex + 1, (*parameterValues)[parameterIndex]);
}
}
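As background for the conversion above: getTypeOutputInfo() and OidOutputFunctionCall() are the standard PostgreSQL pair for turning a Datum into its text representation. A minimal, illustrative sketch with a hard-coded int4 value:

Oid typeOutputFunctionId = InvalidOid;
bool variableLengthType = false;

/* look up the text output function for int4 */
getTypeOutputInfo(INT4OID, &typeOutputFunctionId, &variableLengthType);

/* produces a palloc'd string "42", the form shipped to the worker */
char *textValue = OidOutputFunctionCall(typeOutputFunctionId, Int32GetDatum(42));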

View File

@ -108,6 +108,7 @@
#include "distributed/utils/citus_stat_tenants.h"
#include "distributed/version_compat.h"
#include "distributed/worker_protocol.h"
#include "miscadmin.h" // for elog functions
/* controlled via a GUC */
bool EnableLocalExecution = true;
@ -211,6 +212,39 @@ ExecuteLocalTaskListExtended(List *taskList,
TupleDestination *defaultTupleDest,
bool isUtilityCommand)
{
/* Log the size of the task list */
elog(DEBUG1, "Executing local task list, number of tasks: %d", list_length(taskList));
/* Log the distributed plan type */
if (distributedPlan != NULL)
{
elog(DEBUG1, "Distributed plan type: %d", nodeTag(distributedPlan));
}
else
{
elog(DEBUG1, "Distributed plan is NULL");
}
/* Log if the command is a utility command */
elog(DEBUG1, "Is Utility Command: %d", isUtilityCommand);
/* Log the parameters */
if (orig_paramListInfo != NULL)
{
elog(DEBUG1, "Original ParamListInfo has %d params", orig_paramListInfo->numParams);
/* Optionally log details of each parameter */
for (int i = 0; i < orig_paramListInfo->numParams; i++)
{
ParamExternData param = orig_paramListInfo->params[i];
elog(DEBUG1, "Param %d has type OID %d", i + 1, param.ptype);
}
}
else
{
elog(DEBUG1, "Original ParamListInfo is NULL");
}
ParamListInfo paramListInfo = copyParamList(orig_paramListInfo);
uint64 totalRowsProcessed = 0;
int numParams = 0;

View File

@ -514,17 +514,32 @@ ShardPlacementForFunctionColocatedWithDistTable(DistObjectCacheEntry *procedure,
CitusTableCacheEntry *cacheEntry,
PlannedStmt *plan)
{
/* Log distribution argument index and argument list size */
elog(DEBUG1, "Distribution Argument Index: %d, Argument List Length: %d",
procedure->distributionArgIndex, list_length(argumentList));
if (procedure->distributionArgIndex < 0 ||
procedure->distributionArgIndex >= list_length(argumentList))
{
ereport(DEBUG1, (errmsg("cannot push down invalid distribution_argument_index")));
/* Add more detailed log for invalid distribution argument index */
ereport(DEBUG1, (errmsg("Invalid distribution argument index: %d",
procedure->distributionArgIndex)));
return NULL;
}
/* Get the partition value node */
Node *partitionValueNode = (Node *) list_nth(argumentList, procedure->distributionArgIndex);
/* Log the partition value node before stripping implicit coercions */
elog(DEBUG1, "Partition Value Node before stripping: %s", nodeToString(partitionValueNode));
/* Strip implicit coercions */
partitionValueNode = strip_implicit_coercions(partitionValueNode);
/* Log the partition value node after stripping implicit coercions */
elog(DEBUG1, "Partition Value Node after stripping: %s", nodeToString(partitionValueNode));
if (IsA(partitionValueNode, Param))
{
Param *partitionParam = (Param *) partitionValueNode;

View File

@ -826,6 +826,21 @@ class Postgres(QueryRunner):
pgconf.write("max_worker_processes = 50\n")
pgconf.write("max_replication_slots = 50\n")
pgconf.write("log_min_messages = DEBUG1\n")
pgconf.write("citus.work_min_messages = DEBUG1\n")
# Enable logging of all SQL statements
# pgconf.write("log_statement = 'all'\n")
# # Enable logging of query execution time (duration)
# pgconf.write("log_duration = on\n")
# # Enable logging of statement execution statistics
# pgconf.write("log_statement_stats = on\n")
# # Optionally, log queries that take 0 ms (this logs every query)
# pgconf.write("log_min_duration_statement = 0\n")
# We need to make the log go to stderr so that the tests can
# check what is being logged. This should be the default, but
# some packagings change the default configuration.

View File

@ -36,10 +36,7 @@ def test_call_param2(cluster):
# Get the coordinator node from the Citus cluster
coord = cluster.coordinator
# Step 1: Create a distributed table `t`
coord.sql("CREATE TABLE t (p int, i int)")
# Step 2: Create a stored procedure `f` similar to the one in the C# code
coord.sql(
"""
CREATE PROCEDURE f(_p INT, _i INT) LANGUAGE plpgsql AS $$
@ -49,20 +46,15 @@ def test_call_param2(cluster):
END; $$
"""
)
# Step 3: Insert data and call the procedure, simulating parameterized queries
sql_insert_and_call = "CALL f(1, %s);"
# Step 4: Distribute the table
coord.sql("SELECT create_distributed_table('t', 'p')")
# Step 5: Distribute the procedure
coord.sql(
"SELECT create_distributed_function('f(int, int)', distribution_arg_name := '_p', colocate_with := 't')"
)
# time.sleep(10)
cluster.coordinator.psql_debug()
sql_insert_and_call = "CALL f(1, %s);"
# cluster.coordinator.psql_debug()
# cluster.debug()
# After distributing the table, insert more data and call the procedure again
coord.sql_prepared(sql_insert_and_call, (2,))
@ -70,4 +62,4 @@ def test_call_param2(cluster):
# Step 6: Check the result
sum_i = coord.sql_value("SELECT count(*) FROM t;")
assert sum_i == 0
assert sum_i == 1