mirror of https://github.com/citusdata/citus.git
more debug line added
parent
31003204c6
commit
003c77593b
|
@ -89,6 +89,33 @@ CallDistributedProcedureRemotely(CallStmt *callStmt, DestReceiver *dest)
|
||||||
elog(DEBUG1, "No arguments in the function expression");
|
elog(DEBUG1, "No arguments in the function expression");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ListCell *argCell1;
|
||||||
|
int argIndex1 = 0;
|
||||||
|
|
||||||
|
/* Iterate over the arguments and log them */
|
||||||
|
foreach(argCell1, callStmt->funcexpr->args)
|
||||||
|
{
|
||||||
|
Node *argNode = (Node *) lfirst(argCell1);
|
||||||
|
|
||||||
|
// Check if the node is valid
|
||||||
|
if (argNode != NULL)
|
||||||
|
{
|
||||||
|
// Use nodeToString() to convert the node into a string representation for debugging
|
||||||
|
char *argStr = nodeToString(argNode);
|
||||||
|
|
||||||
|
// Log the argument index and its string representation
|
||||||
|
elog(DEBUG1, "Argument %d: %s", argIndex1, argStr);
|
||||||
|
|
||||||
|
// Free the string memory after logging (it's a good practice to avoid memory leaks)
|
||||||
|
pfree(argStr);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
elog(DEBUG1, "Argument %d: (null)", argIndex1);
|
||||||
|
}
|
||||||
|
argIndex1++;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
DistObjectCacheEntry *procedure = LookupDistObjectCacheEntry(ProcedureRelationId,
|
DistObjectCacheEntry *procedure = LookupDistObjectCacheEntry(ProcedureRelationId,
|
||||||
functionId, 0);
|
functionId, 0);
|
||||||
|
@ -237,6 +264,29 @@ CallDistributedProcedureRemotely(CallStmt *callStmt, DestReceiver *dest)
|
||||||
localExecutionSupported
|
localExecutionSupported
|
||||||
);
|
);
|
||||||
|
|
||||||
|
const char* NodeTagToString(NodeTag tag)
|
||||||
|
{
|
||||||
|
switch (tag)
|
||||||
|
{
|
||||||
|
case T_Var: return "Var";
|
||||||
|
case T_Const: return "Const";
|
||||||
|
case T_Param: return "Param";
|
||||||
|
case T_FuncExpr: return "FuncExpr";
|
||||||
|
case T_OpExpr: return "OpExpr";
|
||||||
|
case T_BoolExpr: return "BoolExpr";
|
||||||
|
case T_Aggref: return "Aggref";
|
||||||
|
case T_WindowFunc: return "WindowFunc";
|
||||||
|
case T_SubLink: return "SubLink";
|
||||||
|
case T_CoalesceExpr: return "CoalesceExpr";
|
||||||
|
case T_CaseExpr: return "CaseExpr";
|
||||||
|
case T_NullTest: return "NullTest";
|
||||||
|
case T_CollateExpr: return "CollateExpr";
|
||||||
|
case T_FieldSelect: return "FieldSelect";
|
||||||
|
case T_FieldStore: return "FieldStore";
|
||||||
|
case T_SubPlan: return "SubPlan";
|
||||||
|
default: return "Unknown";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Create a ParamListInfo structure
|
// Create a ParamListInfo structure
|
||||||
List *argList = funcExpr->args; // Extract arguments from the function expression
|
List *argList = funcExpr->args; // Extract arguments from the function expression
|
||||||
|
@ -250,12 +300,22 @@ CallDistributedProcedureRemotely(CallStmt *callStmt, DestReceiver *dest)
|
||||||
{
|
{
|
||||||
Node *argNode = (Node *) lfirst(argCell);
|
Node *argNode = (Node *) lfirst(argCell);
|
||||||
|
|
||||||
|
// Log the type of the argument
|
||||||
|
NodeTag nodeType = nodeTag(argNode);
|
||||||
|
elog(DEBUG1, "Processing argument at index %d of type: %s", paramIndex, NodeTagToString(nodeType));
|
||||||
|
|
||||||
|
|
||||||
if (IsA(argNode, Const))
|
if (IsA(argNode, Const))
|
||||||
{
|
{
|
||||||
Const *constArg = (Const *) argNode;
|
Const *constArg = (Const *) argNode;
|
||||||
paramListInfo->params[paramIndex].ptype = constArg->consttype; // Set parameter type
|
paramListInfo->params[paramIndex].ptype = constArg->consttype; // Set parameter type
|
||||||
paramListInfo->params[paramIndex].value = constArg->constvalue; // Set parameter value
|
paramListInfo->params[paramIndex].value = constArg->constvalue; // Set parameter value
|
||||||
paramListInfo->params[paramIndex].isnull = constArg->constisnull; // Set if the parameter is null
|
paramListInfo->params[paramIndex].isnull = constArg->constisnull; // Set if the parameter is null
|
||||||
|
|
||||||
|
// Log the constant parameter's type, value, and null status
|
||||||
|
elog(DEBUG1, "Populating ParamListInfo with constant parameter: paramIndex: %d, paramType: %d, isNull: %s",
|
||||||
|
paramIndex, paramListInfo->params[paramIndex].ptype,
|
||||||
|
constArg->constisnull ? "true" : "false");
|
||||||
}
|
}
|
||||||
else if (IsA(argNode, Param))
|
else if (IsA(argNode, Param))
|
||||||
{
|
{
|
||||||
|
@ -270,6 +330,11 @@ CallDistributedProcedureRemotely(CallStmt *callStmt, DestReceiver *dest)
|
||||||
ParamExternData paramData;
|
ParamExternData paramData;
|
||||||
paramListInfo->paramFetch(paramListInfo, paramArg->paramid, true, ¶mData);
|
paramListInfo->paramFetch(paramListInfo, paramArg->paramid, true, ¶mData);
|
||||||
|
|
||||||
|
// Log the fetched parameter details
|
||||||
|
elog(DEBUG1, "paramFetch for paramId: %d returned value: %d, type: %d, isNull: %s",
|
||||||
|
paramArg->paramid, DatumGetInt32(paramData.value), paramData.ptype,
|
||||||
|
paramData.isnull ? "true" : "false");
|
||||||
|
|
||||||
paramListInfo->params[paramIndex].value = paramData.value;
|
paramListInfo->params[paramIndex].value = paramData.value;
|
||||||
paramListInfo->params[paramIndex].isnull = paramData.isnull;
|
paramListInfo->params[paramIndex].isnull = paramData.isnull;
|
||||||
|
|
||||||
|
@ -280,13 +345,68 @@ CallDistributedProcedureRemotely(CallStmt *callStmt, DestReceiver *dest)
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
// Handle the case where paramFetch is NULL
|
// Handle the case where paramFetch is NULL
|
||||||
elog(ERROR, "Could not fetch value for parameter: %d", paramArg->paramid);
|
elog(DEBUG1, "Could not fetch value for parameter: %d", paramArg->paramid);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
else if (IsA(argNode, FuncExpr))
|
||||||
|
{
|
||||||
|
// FuncExpr *funcExpr = (FuncExpr *) argNode;
|
||||||
|
|
||||||
|
// Log function expression details
|
||||||
|
elog(DEBUG1, "Processing function expression: funcid: %d", funcExpr->funcid);
|
||||||
|
|
||||||
|
// Iterate through the arguments of the function expression
|
||||||
|
ListCell *funcArgCell;
|
||||||
|
foreach(funcArgCell, funcExpr->args)
|
||||||
|
{
|
||||||
|
Node *funcArgNode = (Node *) lfirst(funcArgCell);
|
||||||
|
|
||||||
|
// Check if the argument is a Param or Const
|
||||||
|
if (IsA(funcArgNode, Param))
|
||||||
|
{
|
||||||
|
Param *paramArg = (Param *) funcArgNode;
|
||||||
|
|
||||||
|
// Fetch the parameter value (same as your param-fetch logic)
|
||||||
|
ParamExternData paramData;
|
||||||
|
paramListInfo->paramFetch(paramListInfo, paramArg->paramid, true, ¶mData);
|
||||||
|
|
||||||
|
// Populate ParamListInfo with fetched param
|
||||||
|
paramListInfo->params[paramIndex].ptype = paramArg->paramtype;
|
||||||
|
paramListInfo->params[paramIndex].value = paramData.value;
|
||||||
|
paramListInfo->params[paramIndex].isnull = paramData.isnull;
|
||||||
|
|
||||||
|
// Log fetched parameter details
|
||||||
|
elog(DEBUG1, "Populating ParamListInfo with fetched parameter: paramIndex: %d, paramType: %d, paramValue: %d",
|
||||||
|
paramIndex, paramListInfo->params[paramIndex].ptype, DatumGetInt32(paramListInfo->params[paramIndex].value));
|
||||||
|
}
|
||||||
|
else if (IsA(funcArgNode, Const))
|
||||||
|
{
|
||||||
|
Const *constArg = (Const *) funcArgNode;
|
||||||
|
|
||||||
|
// Handle Const values within the function expression
|
||||||
|
paramListInfo->params[paramIndex].ptype = constArg->consttype;
|
||||||
|
paramListInfo->params[paramIndex].value = constArg->constvalue;
|
||||||
|
paramListInfo->params[paramIndex].isnull = constArg->constisnull;
|
||||||
|
|
||||||
|
// Log constant parameter
|
||||||
|
elog(DEBUG1, "Populating ParamListInfo with constant parameter inside function expression: paramIndex: %d, paramType: %d",
|
||||||
|
paramIndex, paramListInfo->params[paramIndex].ptype);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
elog(DEBUG1, "Unsupported argument type in function expression at paramIndex: %d", paramIndex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
// Handle other cases if necessary
|
||||||
|
elog(DEBUG1, "Unsupported argument type at paramIndex: %d", paramIndex);
|
||||||
|
}
|
||||||
|
|
||||||
// Log populated parameters
|
// Log populated parameters
|
||||||
elog(DEBUG1, "Populating ParamListInfo, paramIndex: %d, paramType: %d, paramValue: %d",
|
// elog(DEBUG1, "Populating ParamListInfo, paramIndex: %d, paramType: %d, paramValue: %d",
|
||||||
paramIndex, paramListInfo->params[paramIndex].ptype, DatumGetInt32(paramListInfo->params[paramIndex].value));
|
// paramIndex, paramListInfo->params[paramIndex].ptype, DatumGetInt32(paramListInfo->params[paramIndex].value));
|
||||||
|
|
||||||
paramIndex++;
|
paramIndex++;
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,30 +36,33 @@ def test_call_param2(cluster):
|
||||||
# Get the coordinator node from the Citus cluster
|
# Get the coordinator node from the Citus cluster
|
||||||
coord = cluster.coordinator
|
coord = cluster.coordinator
|
||||||
|
|
||||||
|
coord.sql("DROP TABLE IF EXISTS t CASCADE")
|
||||||
coord.sql("CREATE TABLE t (p int, i int)")
|
coord.sql("CREATE TABLE t (p int, i int)")
|
||||||
|
coord.sql("SELECT create_distributed_table('t', 'p')")
|
||||||
coord.sql(
|
coord.sql(
|
||||||
"""
|
"""
|
||||||
CREATE PROCEDURE f(_p INT, _i INT) LANGUAGE plpgsql AS $$
|
CREATE PROCEDURE f(_p INT, _i INT) LANGUAGE plpgsql AS $$
|
||||||
BEGIN
|
BEGIN
|
||||||
-- Example logic that uses the parameters (you can add your own logic)
|
|
||||||
INSERT INTO t (p, i) VALUES (_p, _i);
|
INSERT INTO t (p, i) VALUES (_p, _i);
|
||||||
|
PERFORM _i;
|
||||||
END; $$
|
END; $$
|
||||||
"""
|
"""
|
||||||
)
|
)
|
||||||
coord.sql("SELECT create_distributed_table('t', 'p')")
|
|
||||||
coord.sql(
|
coord.sql(
|
||||||
"SELECT create_distributed_function('f(int, int)', distribution_arg_name := '_p', colocate_with := 't')"
|
"SELECT create_distributed_function('f(int, int)', distribution_arg_name := '_p', colocate_with := 't')"
|
||||||
)
|
)
|
||||||
|
|
||||||
sql_insert_and_call = "CALL f(1, %s);"
|
sql_insert_and_call = "CALL f(1, 33);"
|
||||||
|
# sql_insert_and_call = "CALL f(%s, 1);"
|
||||||
|
|
||||||
# cluster.coordinator.psql_debug()
|
# cluster.coordinator.psql_debug()
|
||||||
# cluster.debug()
|
# cluster.debug()
|
||||||
|
|
||||||
# After distributing the table, insert more data and call the procedure again
|
# After distributing the table, insert more data and call the procedure again
|
||||||
coord.sql_prepared(sql_insert_and_call, (2,))
|
# coord.sql_prepared(sql_insert_and_call, (33,))
|
||||||
|
coord.sql(sql_insert_and_call)
|
||||||
|
|
||||||
# Step 6: Check the result
|
# Step 6: Check the result
|
||||||
sum_i = coord.sql_value("SELECT count(*) FROM t;")
|
sum_i = coord.sql_value("SELECT i FROM t limit 1;")
|
||||||
|
|
||||||
assert sum_i == 1
|
assert sum_i == 33
|
Loading…
Reference in New Issue