From 603b9cf3c44d442ba10e158180657735f3b53383 Mon Sep 17 00:00:00 2001 From: Jason Petersen Date: Wed, 19 Apr 2017 13:39:46 -0600 Subject: [PATCH] Handle ProcessUtility changes PostgreSQL 10's ProcessUtility now requires a PlannedStmt with its utilityStmt field set to what we used to pass directly. In addition, it receives a QueryEnvironment reference, altogether new but apparently OK to NULL out for now. This adds a wrapper to adapt to the new style and fixes all callers. --- .../distributed/executor/multi_utility.c | 69 ++++++++++++++++--- src/backend/distributed/shared_library_init.c | 4 ++ .../worker/worker_data_fetch_protocol.c | 33 ++++----- src/include/distributed/multi_utility.h | 11 +++ 4 files changed, 93 insertions(+), 24 deletions(-) diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index cb14d8dcf..25d6b53aa 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -163,6 +163,40 @@ static void PostProcessUtility(Node *parsetree); static bool warnedUserAbout2PC = false; +void +multi_ProcessUtility(Node *parsetree, + const char *queryString, + ProcessUtilityContext context, + ParamListInfo params, + DestReceiver *dest, + char *completionTag) +{ + PlannedStmt *plannedStmt = makeNode(PlannedStmt); + plannedStmt->commandType = CMD_UTILITY; + plannedStmt->utilityStmt = parsetree; + + multi_ProcessUtility10(plannedStmt, queryString, context, params, NULL, dest, + completionTag); +} + + +void +CitusProcessUtility(Node *node, const char *queryString, ProcessUtilityContext context, + ParamListInfo params, DestReceiver *dest, char *completionTag) +{ +#if (PG_VERSION_NUM >= 100000) + PlannedStmt *plannedStmt = makeNode(PlannedStmt); + plannedStmt->commandType = CMD_UTILITY; + plannedStmt->utilityStmt = node; + + ProcessUtility(plannedStmt, queryString, context, params, NULL, dest, + completionTag); +#else + ProcessUtility(node, queryString, context, params, dest, completionTag); +#endif +} + + /* * multi_ProcessUtility is the main entry hook for implementing Citus-specific * utility behavior. Its primary responsibilities are intercepting COPY and DDL @@ -173,13 +207,15 @@ static bool warnedUserAbout2PC = false; * TRUNCATE and VACUUM are also supported. */ void -multi_ProcessUtility(Node *parsetree, - const char *queryString, - ProcessUtilityContext context, - ParamListInfo params, - DestReceiver *dest, - char *completionTag) +multi_ProcessUtility10(PlannedStmt *pstmt, + const char *queryString, + ProcessUtilityContext context, + ParamListInfo params, + struct QueryEnvironment *queryEnv, + DestReceiver *dest, + char *completionTag) { + Node *parsetree = pstmt->utilityStmt; bool commandMustRunAsOwner = false; Oid savedUserId = InvalidOid; int savedSecurityContext = 0; @@ -194,8 +230,13 @@ multi_ProcessUtility(Node *parsetree, * that state. Since we never need to intercept transaction statements, * skip our checks and immediately fall into standard_ProcessUtility. */ +#if (PG_VERSION_NUM >= 100000) + standard_ProcessUtility(pstmt, queryString, context, + params, queryEnv, dest, completionTag); +#else standard_ProcessUtility(parsetree, queryString, context, params, dest, completionTag); +#endif return; } @@ -213,8 +254,13 @@ multi_ProcessUtility(Node *parsetree, * Ensure that utility commands do not behave any differently until CREATE * EXTENSION is invoked. */ +#if (PG_VERSION_NUM >= 100000) + standard_ProcessUtility(pstmt, queryString, context, + params, queryEnv, dest, completionTag); +#else standard_ProcessUtility(parsetree, queryString, context, params, dest, completionTag); +#endif return; } @@ -392,8 +438,14 @@ multi_ProcessUtility(Node *parsetree, SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); } +#if (PG_VERSION_NUM >= 100000) + pstmt->utilityStmt = parsetree; + standard_ProcessUtility(pstmt, queryString, context, + params, queryEnv, dest, completionTag); +#else standard_ProcessUtility(parsetree, queryString, context, params, dest, completionTag); +#endif PostProcessUtility(parsetree); @@ -2340,8 +2392,9 @@ CreateLocalTable(RangeVar *relation, char *nodeName, int32 nodePort) /* run only a selected set of DDL commands */ if (applyDDLCommand) { - ProcessUtility(ddlCommandNode, CreateCommandTag(ddlCommandNode), - PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL); + CitusProcessUtility(ddlCommandNode, CreateCommandTag(ddlCommandNode), + PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL); + CommandCounterIncrement(); } } diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 181242e8b..08f4e42f1 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -154,7 +154,11 @@ _PG_init(void) planner_hook = multi_planner; /* register utility hook */ +#if (PG_VERSION_NUM >= 100000) + ProcessUtility_hook = multi_ProcessUtility10; +#else ProcessUtility_hook = multi_ProcessUtility; +#endif /* register for planner hook */ set_rel_pathlist_hook = multi_relation_restriction_hook; diff --git a/src/backend/distributed/worker/worker_data_fetch_protocol.c b/src/backend/distributed/worker/worker_data_fetch_protocol.c index 4e1045f57..0422de4ec 100644 --- a/src/backend/distributed/worker/worker_data_fetch_protocol.c +++ b/src/backend/distributed/worker/worker_data_fetch_protocol.c @@ -31,6 +31,7 @@ #include "distributed/multi_client_executor.h" #include "distributed/multi_logical_optimizer.h" #include "distributed/multi_server_executor.h" +#include "distributed/multi_utility.h" #include "distributed/relay_utility.h" #include "distributed/resource_lock.h" #include "distributed/task_tracker.h" @@ -425,8 +426,8 @@ worker_apply_shard_ddl_command(PG_FUNCTION_ARGS) /* extend names in ddl command and apply extended command */ RelayEventExtendNames(ddlCommandNode, schemaName, shardId); - ProcessUtility(ddlCommandNode, ddlCommand, PROCESS_UTILITY_TOPLEVEL, - NULL, None_Receiver, NULL); + CitusProcessUtility(ddlCommandNode, ddlCommand, PROCESS_UTILITY_TOPLEVEL, NULL, + None_Receiver, NULL); PG_RETURN_VOID(); } @@ -457,8 +458,8 @@ worker_apply_inter_shard_ddl_command(PG_FUNCTION_ARGS) RelayEventExtendNamesForInterShardCommands(ddlCommandNode, leftShardId, leftShardSchemaName, rightShardId, rightShardSchemaName); - ProcessUtility(ddlCommandNode, ddlCommand, PROCESS_UTILITY_TOPLEVEL, NULL, - None_Receiver, NULL); + CitusProcessUtility(ddlCommandNode, ddlCommand, PROCESS_UTILITY_TOPLEVEL, NULL, + None_Receiver, NULL); PG_RETURN_VOID(); } @@ -493,8 +494,8 @@ worker_apply_sequence_command(PG_FUNCTION_ARGS) } /* run the CREATE SEQUENCE command */ - ProcessUtility(commandNode, commandString, PROCESS_UTILITY_TOPLEVEL, - NULL, None_Receiver, NULL); + CitusProcessUtility(commandNode, commandString, PROCESS_UTILITY_TOPLEVEL, NULL, + None_Receiver, NULL); CommandCounterIncrement(); createSequenceStatement = (CreateSeqStmt *) commandNode; @@ -848,8 +849,8 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, const char *tableName) StringInfo ddlCommand = (StringInfo) lfirst(ddlCommandCell); Node *ddlCommandNode = ParseTreeNode(ddlCommand->data); - ProcessUtility(ddlCommandNode, ddlCommand->data, PROCESS_UTILITY_TOPLEVEL, - NULL, None_Receiver, NULL); + CitusProcessUtility(ddlCommandNode, ddlCommand->data, PROCESS_UTILITY_TOPLEVEL, + NULL, None_Receiver, NULL); CommandCounterIncrement(); } @@ -867,8 +868,8 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, const char *tableName) queryString = makeStringInfo(); appendStringInfo(queryString, COPY_IN_COMMAND, tableName, localFilePath->data); - ProcessUtility((Node *) localCopyCommand, queryString->data, - PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL); + CitusProcessUtility((Node *) localCopyCommand, queryString->data, + PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL); /* finally delete the temporary file we created */ DeleteFile(localFilePath->data); @@ -942,8 +943,8 @@ FetchForeignTable(const char *nodeName, uint32 nodePort, const char *tableName) StringInfo ddlCommand = (StringInfo) lfirst(ddlCommandCell); Node *ddlCommandNode = ParseTreeNode(ddlCommand->data); - ProcessUtility(ddlCommandNode, ddlCommand->data, PROCESS_UTILITY_TOPLEVEL, - NULL, None_Receiver, NULL); + CitusProcessUtility(ddlCommandNode, ddlCommand->data, PROCESS_UTILITY_TOPLEVEL, + NULL, None_Receiver, NULL); CommandCounterIncrement(); } @@ -1272,8 +1273,8 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS) appendStringInfo(queryString, COPY_IN_COMMAND, shardQualifiedName, localFilePath->data); - ProcessUtility((Node *) localCopyCommand, queryString->data, - PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL); + CitusProcessUtility((Node *) localCopyCommand, queryString->data, + PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL); /* finally delete the temporary file we created */ DeleteFile(localFilePath->data); @@ -1372,8 +1373,8 @@ AlterSequenceMinMax(Oid sequenceId, char *schemaName, char *sequenceName) SetDefElemArg(alterSequenceStatement, "restart", startFloatArg); /* since the command is an AlterSeqStmt, a dummy command string works fine */ - ProcessUtility((Node *) alterSequenceStatement, dummyString, - PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL); + CitusProcessUtility((Node *) alterSequenceStatement, dummyString, + PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL); } } diff --git a/src/include/distributed/multi_utility.h b/src/include/distributed/multi_utility.h index b81262c4d..b46108489 100644 --- a/src/include/distributed/multi_utility.h +++ b/src/include/distributed/multi_utility.h @@ -29,9 +29,20 @@ typedef struct DDLJob List *taskList; /* worker DDL tasks to execute */ } DDLJob; +#if (PG_VERSION_NUM < 100000) +struct QueryEnvironment; /* forward-declare to appease compiler */ +#endif + +extern void multi_ProcessUtility10(PlannedStmt *pstmt, const char *queryString, + ProcessUtilityContext context, ParamListInfo params, + struct QueryEnvironment *queryEnv, DestReceiver *dest, + char *completionTag); extern void multi_ProcessUtility(Node *parsetree, const char *queryString, ProcessUtilityContext context, ParamListInfo params, DestReceiver *dest, char *completionTag); +extern void CitusProcessUtility(Node *node, const char *queryString, + ProcessUtilityContext context, ParamListInfo params, + DestReceiver *dest, char *completionTag); extern List * PlanGrantStmt(GrantStmt *grantStmt); extern void ErrorIfUnsupportedConstraint(Relation relation, char distributionMethod, Var *distributionColumn, uint32 colocationId);