diff --git a/src/backend/distributed/commands/call.c b/src/backend/distributed/commands/call.c index 4251de854..1f3cf58b4 100644 --- a/src/backend/distributed/commands/call.c +++ b/src/backend/distributed/commands/call.c @@ -14,6 +14,7 @@ #include "catalog/pg_proc.h" #include "commands/defrem.h" +#include "distributed/citus_ruleutils.h" #include "distributed/colocation_utils.h" #include "distributed/commands.h" #include "distributed/commands/multi_copy.h" @@ -38,15 +39,13 @@ static bool CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, - FuncExpr *funcExpr, const char *queryString, - DestReceiver *dest); + FuncExpr *funcExpr, DestReceiver *dest); /* * CallDistributedProcedureRemotely calls a stored procedure on the worker if possible. */ bool -CallDistributedProcedureRemotely(CallStmt *callStmt, const char *queryString, - DestReceiver *dest) +CallDistributedProcedureRemotely(CallStmt *callStmt, DestReceiver *dest) { DistObjectCacheEntry *procedure = NULL; FuncExpr *funcExpr = callStmt->funcexpr; @@ -58,7 +57,7 @@ CallDistributedProcedureRemotely(CallStmt *callStmt, const char *queryString, return false; } - return CallFuncExprRemotely(callStmt, procedure, funcExpr, queryString, dest); + return CallFuncExprRemotely(callStmt, procedure, funcExpr, dest); } @@ -67,7 +66,7 @@ CallDistributedProcedureRemotely(CallStmt *callStmt, const char *queryString, */ static bool CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, - FuncExpr *funcExpr, const char *queryString, DestReceiver *dest) + FuncExpr *funcExpr, DestReceiver *dest) { Oid colocatedRelationId = InvalidOid; Const *partitionValue = NULL; @@ -78,6 +77,7 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, Var *partitionColumn = NULL; ShardPlacement *placement = NULL; WorkerNode *workerNode = NULL; + StringInfo callCommand = NULL; if (IsMultiStatementTransaction()) { @@ -160,6 +160,10 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, ereport(DEBUG1, (errmsg("pushing down the procedure"))); + /* build remote command with fully qualified names */ + callCommand = makeStringInfo(); + appendStringInfo(callCommand, "CALL %s", pg_get_rule_expr((Node *) funcExpr)); + { Tuplestorestate *tupleStore = tuplestore_begin_heap(true, false, work_mem); TupleDesc tupleDesc = CallStmtResultDesc(callStmt); @@ -170,7 +174,7 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, task->jobId = INVALID_JOB_ID; task->taskId = 0; task->taskType = DDL_TASK; - task->queryString = pstrdup(queryString); + task->queryString = callCommand->data; task->replicationModel = REPLICATION_MODEL_INVALID; task->dependedTaskList = NIL; task->anchorShardId = placement->shardId; diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index 8f2664312..e1f90aa22 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -209,7 +209,7 @@ multi_ProcessUtility(PlannedStmt *pstmt, * the worker this can avoid making many network round trips. */ if (context == PROCESS_UTILITY_TOPLEVEL && - CallDistributedProcedureRemotely(callStmt, queryString, dest)) + CallDistributedProcedureRemotely(callStmt, dest)) { return; } diff --git a/src/backend/distributed/utils/ruleutils_10.c b/src/backend/distributed/utils/ruleutils_10.c index 79af4d086..31d6ab509 100644 --- a/src/backend/distributed/utils/ruleutils_10.c +++ b/src/backend/distributed/utils/ruleutils_10.c @@ -451,6 +451,48 @@ pg_get_query_def(Query *query, StringInfo buffer) } +/* + * pg_get_rule_expr deparses an expression and returns the result as a string. + */ +char * +pg_get_rule_expr(Node *expression) +{ + bool showImplicitCasts = true; + deparse_context context; + OverrideSearchPath *overridePath = NULL; + StringInfo buffer = makeStringInfo(); + + /* + * Set search_path to NIL so that all objects outside of pg_catalog will be + * schema-prefixed. pg_catalog will be added automatically when we call + * PushOverrideSearchPath(), since we set addCatalog to true; + */ + overridePath = GetOverrideSearchPath(CurrentMemoryContext); + overridePath->schemas = NIL; + overridePath->addCatalog = true; + PushOverrideSearchPath(overridePath); + + context.buf = buffer; + context.namespaces = NIL; + context.windowClause = NIL; + context.windowTList = NIL; + context.varprefix = false; + context.prettyFlags = 0; + context.wrapColumn = WRAP_COLUMN_DEFAULT; + context.indentLevel = 0; + context.special_exprkind = EXPR_KIND_NONE; + context.distrelid = InvalidOid; + context.shardid = INVALID_SHARD_ID; + + get_rule_expr(expression, &context, showImplicitCasts); + + /* revert back to original search_path */ + PopOverrideSearchPath(); + + return buffer->data; +} + + /* * set_rtable_names: select RTE aliases to be used in printing a query * diff --git a/src/backend/distributed/utils/ruleutils_11.c b/src/backend/distributed/utils/ruleutils_11.c index 13a7adbed..4b3e3ddba 100644 --- a/src/backend/distributed/utils/ruleutils_11.c +++ b/src/backend/distributed/utils/ruleutils_11.c @@ -451,6 +451,48 @@ pg_get_query_def(Query *query, StringInfo buffer) } +/* + * pg_get_rule_expr deparses an expression and returns the result as a string. + */ +char * +pg_get_rule_expr(Node *expression) +{ + bool showImplicitCasts = true; + deparse_context context; + OverrideSearchPath *overridePath = NULL; + StringInfo buffer = makeStringInfo(); + + /* + * Set search_path to NIL so that all objects outside of pg_catalog will be + * schema-prefixed. pg_catalog will be added automatically when we call + * PushOverrideSearchPath(), since we set addCatalog to true; + */ + overridePath = GetOverrideSearchPath(CurrentMemoryContext); + overridePath->schemas = NIL; + overridePath->addCatalog = true; + PushOverrideSearchPath(overridePath); + + context.buf = buffer; + context.namespaces = NIL; + context.windowClause = NIL; + context.windowTList = NIL; + context.varprefix = false; + context.prettyFlags = 0; + context.wrapColumn = WRAP_COLUMN_DEFAULT; + context.indentLevel = 0; + context.special_exprkind = EXPR_KIND_NONE; + context.distrelid = InvalidOid; + context.shardid = INVALID_SHARD_ID; + + get_rule_expr(expression, &context, showImplicitCasts); + + /* revert back to original search_path */ + PopOverrideSearchPath(); + + return buffer->data; +} + + /* * set_rtable_names: select RTE aliases to be used in printing a query * diff --git a/src/backend/distributed/utils/ruleutils_12.c b/src/backend/distributed/utils/ruleutils_12.c index aa95cd3fa..b786515ee 100644 --- a/src/backend/distributed/utils/ruleutils_12.c +++ b/src/backend/distributed/utils/ruleutils_12.c @@ -452,6 +452,48 @@ pg_get_query_def(Query *query, StringInfo buffer) } +/* + * pg_get_rule_expr deparses an expression and returns the result as a string. + */ +char * +pg_get_rule_expr(Node *expression) +{ + bool showImplicitCasts = true; + deparse_context context; + OverrideSearchPath *overridePath = NULL; + StringInfo buffer = makeStringInfo(); + + /* + * Set search_path to NIL so that all objects outside of pg_catalog will be + * schema-prefixed. pg_catalog will be added automatically when we call + * PushOverrideSearchPath(), since we set addCatalog to true; + */ + overridePath = GetOverrideSearchPath(CurrentMemoryContext); + overridePath->schemas = NIL; + overridePath->addCatalog = true; + PushOverrideSearchPath(overridePath); + + context.buf = buffer; + context.namespaces = NIL; + context.windowClause = NIL; + context.windowTList = NIL; + context.varprefix = false; + context.prettyFlags = 0; + context.wrapColumn = WRAP_COLUMN_DEFAULT; + context.indentLevel = 0; + context.special_exprkind = EXPR_KIND_NONE; + context.distrelid = InvalidOid; + context.shardid = INVALID_SHARD_ID; + + get_rule_expr(expression, &context, showImplicitCasts); + + /* revert back to original search_path */ + PopOverrideSearchPath(); + + return buffer->data; +} + + /* * set_rtable_names: select RTE aliases to be used in printing a query * diff --git a/src/include/distributed/citus_ruleutils.h b/src/include/distributed/citus_ruleutils.h index 19b4b4118..204173671 100644 --- a/src/include/distributed/citus_ruleutils.h +++ b/src/include/distributed/citus_ruleutils.h @@ -43,6 +43,7 @@ extern const char * RoleSpecString(RoleSpec *spec); /* Function declarations for version dependent PostgreSQL ruleutils functions */ extern void pg_get_query_def(Query *query, StringInfo buffer); +char * pg_get_rule_expr(Node *expression); extern void deparse_shard_query(Query *query, Oid distrelid, int64 shardid, StringInfo buffer); extern char * generate_relation_name(Oid relid, List *namespaces); diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h index c6285f911..94befe461 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -26,8 +26,7 @@ extern List * PlanClusterStmt(ClusterStmt *clusterStmt, const char *clusterComma #if PG_VERSION_NUM >= 110000 /* call.c */ -extern bool CallDistributedProcedureRemotely(CallStmt *callStmt, const char *callCommand, - DestReceiver *dest); +extern bool CallDistributedProcedureRemotely(CallStmt *callStmt, DestReceiver *dest); #endif /* PG_VERSION_NUM >= 110000 */ /* extension.c - forward declarations */