From 003c77593b2e3366bec0cd12e53d16a920e3e2ce Mon Sep 17 00:00:00 2001 From: Mehmet YILMAZ Date: Mon, 9 Sep 2024 14:10:35 +0000 Subject: [PATCH] more debug line added --- src/backend/distributed/commands/call.c | 128 +++++++++++++++++- .../test/test_prepared_statements.py | 15 +- 2 files changed, 133 insertions(+), 10 deletions(-) diff --git a/src/backend/distributed/commands/call.c b/src/backend/distributed/commands/call.c index 45dd059fc..ed63b3a5b 100644 --- a/src/backend/distributed/commands/call.c +++ b/src/backend/distributed/commands/call.c @@ -89,6 +89,33 @@ CallDistributedProcedureRemotely(CallStmt *callStmt, DestReceiver *dest) elog(DEBUG1, "No arguments in the function expression"); } + ListCell *argCell1; + int argIndex1 = 0; + + /* Iterate over the arguments and log them */ + foreach(argCell1, callStmt->funcexpr->args) + { + Node *argNode = (Node *) lfirst(argCell1); + + // Check if the node is valid + if (argNode != NULL) + { + // Use nodeToString() to convert the node into a string representation for debugging + char *argStr = nodeToString(argNode); + + // Log the argument index and its string representation + elog(DEBUG1, "Argument %d: %s", argIndex1, argStr); + + // Free the string memory after logging (it's a good practice to avoid memory leaks) + pfree(argStr); + } + else + { + elog(DEBUG1, "Argument %d: (null)", argIndex1); + } + argIndex1++; + } + DistObjectCacheEntry *procedure = LookupDistObjectCacheEntry(ProcedureRelationId, functionId, 0); @@ -236,7 +263,30 @@ CallDistributedProcedureRemotely(CallStmt *callStmt, DestReceiver *dest) ROW_MODIFY_NONE, list_make1(task), MaxAdaptiveExecutorPoolSize, localExecutionSupported ); - + + const char* NodeTagToString(NodeTag tag) + { + switch (tag) + { + case T_Var: return "Var"; + case T_Const: return "Const"; + case T_Param: return "Param"; + case T_FuncExpr: return "FuncExpr"; + case T_OpExpr: return "OpExpr"; + case T_BoolExpr: return "BoolExpr"; + case T_Aggref: return "Aggref"; + case T_WindowFunc: return "WindowFunc"; + case T_SubLink: return "SubLink"; + case T_CoalesceExpr: return "CoalesceExpr"; + case T_CaseExpr: return "CaseExpr"; + case T_NullTest: return "NullTest"; + case T_CollateExpr: return "CollateExpr"; + case T_FieldSelect: return "FieldSelect"; + case T_FieldStore: return "FieldStore"; + case T_SubPlan: return "SubPlan"; + default: return "Unknown"; + } + } // Create a ParamListInfo structure List *argList = funcExpr->args; // Extract arguments from the function expression @@ -250,12 +300,22 @@ CallDistributedProcedureRemotely(CallStmt *callStmt, DestReceiver *dest) { Node *argNode = (Node *) lfirst(argCell); + // Log the type of the argument + NodeTag nodeType = nodeTag(argNode); + elog(DEBUG1, "Processing argument at index %d of type: %s", paramIndex, NodeTagToString(nodeType)); + + if (IsA(argNode, Const)) { Const *constArg = (Const *) argNode; paramListInfo->params[paramIndex].ptype = constArg->consttype; // Set parameter type paramListInfo->params[paramIndex].value = constArg->constvalue; // Set parameter value paramListInfo->params[paramIndex].isnull = constArg->constisnull; // Set if the parameter is null + + // Log the constant parameter's type, value, and null status + elog(DEBUG1, "Populating ParamListInfo with constant parameter: paramIndex: %d, paramType: %d, isNull: %s", + paramIndex, paramListInfo->params[paramIndex].ptype, + constArg->constisnull ? "true" : "false"); } else if (IsA(argNode, Param)) { @@ -270,6 +330,11 @@ CallDistributedProcedureRemotely(CallStmt *callStmt, DestReceiver *dest) ParamExternData paramData; paramListInfo->paramFetch(paramListInfo, paramArg->paramid, true, ¶mData); + // Log the fetched parameter details + elog(DEBUG1, "paramFetch for paramId: %d returned value: %d, type: %d, isNull: %s", + paramArg->paramid, DatumGetInt32(paramData.value), paramData.ptype, + paramData.isnull ? "true" : "false"); + paramListInfo->params[paramIndex].value = paramData.value; paramListInfo->params[paramIndex].isnull = paramData.isnull; @@ -280,13 +345,68 @@ CallDistributedProcedureRemotely(CallStmt *callStmt, DestReceiver *dest) else { // Handle the case where paramFetch is NULL - elog(ERROR, "Could not fetch value for parameter: %d", paramArg->paramid); + elog(DEBUG1, "Could not fetch value for parameter: %d", paramArg->paramid); } } + else if (IsA(argNode, FuncExpr)) + { + // FuncExpr *funcExpr = (FuncExpr *) argNode; + + // Log function expression details + elog(DEBUG1, "Processing function expression: funcid: %d", funcExpr->funcid); + + // Iterate through the arguments of the function expression + ListCell *funcArgCell; + foreach(funcArgCell, funcExpr->args) + { + Node *funcArgNode = (Node *) lfirst(funcArgCell); + + // Check if the argument is a Param or Const + if (IsA(funcArgNode, Param)) + { + Param *paramArg = (Param *) funcArgNode; + + // Fetch the parameter value (same as your param-fetch logic) + ParamExternData paramData; + paramListInfo->paramFetch(paramListInfo, paramArg->paramid, true, ¶mData); + + // Populate ParamListInfo with fetched param + paramListInfo->params[paramIndex].ptype = paramArg->paramtype; + paramListInfo->params[paramIndex].value = paramData.value; + paramListInfo->params[paramIndex].isnull = paramData.isnull; + + // Log fetched parameter details + elog(DEBUG1, "Populating ParamListInfo with fetched parameter: paramIndex: %d, paramType: %d, paramValue: %d", + paramIndex, paramListInfo->params[paramIndex].ptype, DatumGetInt32(paramListInfo->params[paramIndex].value)); + } + else if (IsA(funcArgNode, Const)) + { + Const *constArg = (Const *) funcArgNode; + + // Handle Const values within the function expression + paramListInfo->params[paramIndex].ptype = constArg->consttype; + paramListInfo->params[paramIndex].value = constArg->constvalue; + paramListInfo->params[paramIndex].isnull = constArg->constisnull; + + // Log constant parameter + elog(DEBUG1, "Populating ParamListInfo with constant parameter inside function expression: paramIndex: %d, paramType: %d", + paramIndex, paramListInfo->params[paramIndex].ptype); + } + else + { + elog(DEBUG1, "Unsupported argument type in function expression at paramIndex: %d", paramIndex); + } + } + } + else + { + // Handle other cases if necessary + elog(DEBUG1, "Unsupported argument type at paramIndex: %d", paramIndex); + } // Log populated parameters - elog(DEBUG1, "Populating ParamListInfo, paramIndex: %d, paramType: %d, paramValue: %d", - paramIndex, paramListInfo->params[paramIndex].ptype, DatumGetInt32(paramListInfo->params[paramIndex].value)); + // elog(DEBUG1, "Populating ParamListInfo, paramIndex: %d, paramType: %d, paramValue: %d", + // paramIndex, paramListInfo->params[paramIndex].ptype, DatumGetInt32(paramListInfo->params[paramIndex].value)); paramIndex++; } diff --git a/src/test/regress/citus_tests/test/test_prepared_statements.py b/src/test/regress/citus_tests/test/test_prepared_statements.py index e67f2982a..73b286191 100644 --- a/src/test/regress/citus_tests/test/test_prepared_statements.py +++ b/src/test/regress/citus_tests/test/test_prepared_statements.py @@ -36,30 +36,33 @@ def test_call_param2(cluster): # Get the coordinator node from the Citus cluster coord = cluster.coordinator + coord.sql("DROP TABLE IF EXISTS t CASCADE") coord.sql("CREATE TABLE t (p int, i int)") + coord.sql("SELECT create_distributed_table('t', 'p')") coord.sql( """ CREATE PROCEDURE f(_p INT, _i INT) LANGUAGE plpgsql AS $$ BEGIN - -- Example logic that uses the parameters (you can add your own logic) INSERT INTO t (p, i) VALUES (_p, _i); + PERFORM _i; END; $$ """ ) - coord.sql("SELECT create_distributed_table('t', 'p')") coord.sql( "SELECT create_distributed_function('f(int, int)', distribution_arg_name := '_p', colocate_with := 't')" ) - sql_insert_and_call = "CALL f(1, %s);" + sql_insert_and_call = "CALL f(1, 33);" + # sql_insert_and_call = "CALL f(%s, 1);" # cluster.coordinator.psql_debug() # cluster.debug() # After distributing the table, insert more data and call the procedure again - coord.sql_prepared(sql_insert_and_call, (2,)) + # coord.sql_prepared(sql_insert_and_call, (33,)) + coord.sql(sql_insert_and_call) # Step 6: Check the result - sum_i = coord.sql_value("SELECT count(*) FROM t;") + sum_i = coord.sql_value("SELECT i FROM t limit 1;") - assert sum_i == 1 \ No newline at end of file + assert sum_i == 33 \ No newline at end of file