Handle ProcessUtility changes

PostgreSQL 10's ProcessUtility now requires a PlannedStmt with its
utilityStmt field set to what we used to pass directly.

In addition, it receives a QueryEnvironment reference, altogether new
but apparently OK to NULL out for now.

This adds a wrapper to adapt to the new style and fixes all callers.
pull/1439/head
Jason Petersen 2017-04-19 13:39:46 -06:00
parent a34e9e30d7
commit 603b9cf3c4
No known key found for this signature in database
GPG Key ID: 9F1D3510D110ABA9
4 changed files with 93 additions and 24 deletions

View File

@ -163,6 +163,40 @@ static void PostProcessUtility(Node *parsetree);
static bool warnedUserAbout2PC = false; static bool warnedUserAbout2PC = false;
void
multi_ProcessUtility(Node *parsetree,
const char *queryString,
ProcessUtilityContext context,
ParamListInfo params,
DestReceiver *dest,
char *completionTag)
{
PlannedStmt *plannedStmt = makeNode(PlannedStmt);
plannedStmt->commandType = CMD_UTILITY;
plannedStmt->utilityStmt = parsetree;
multi_ProcessUtility10(plannedStmt, queryString, context, params, NULL, dest,
completionTag);
}
void
CitusProcessUtility(Node *node, const char *queryString, ProcessUtilityContext context,
ParamListInfo params, DestReceiver *dest, char *completionTag)
{
#if (PG_VERSION_NUM >= 100000)
PlannedStmt *plannedStmt = makeNode(PlannedStmt);
plannedStmt->commandType = CMD_UTILITY;
plannedStmt->utilityStmt = node;
ProcessUtility(plannedStmt, queryString, context, params, NULL, dest,
completionTag);
#else
ProcessUtility(node, queryString, context, params, dest, completionTag);
#endif
}
/* /*
* multi_ProcessUtility is the main entry hook for implementing Citus-specific * multi_ProcessUtility is the main entry hook for implementing Citus-specific
* utility behavior. Its primary responsibilities are intercepting COPY and DDL * utility behavior. Its primary responsibilities are intercepting COPY and DDL
@ -173,13 +207,15 @@ static bool warnedUserAbout2PC = false;
* TRUNCATE and VACUUM are also supported. * TRUNCATE and VACUUM are also supported.
*/ */
void void
multi_ProcessUtility(Node *parsetree, multi_ProcessUtility10(PlannedStmt *pstmt,
const char *queryString, const char *queryString,
ProcessUtilityContext context, ProcessUtilityContext context,
ParamListInfo params, ParamListInfo params,
struct QueryEnvironment *queryEnv,
DestReceiver *dest, DestReceiver *dest,
char *completionTag) char *completionTag)
{ {
Node *parsetree = pstmt->utilityStmt;
bool commandMustRunAsOwner = false; bool commandMustRunAsOwner = false;
Oid savedUserId = InvalidOid; Oid savedUserId = InvalidOid;
int savedSecurityContext = 0; int savedSecurityContext = 0;
@ -194,8 +230,13 @@ multi_ProcessUtility(Node *parsetree,
* that state. Since we never need to intercept transaction statements, * that state. Since we never need to intercept transaction statements,
* skip our checks and immediately fall into standard_ProcessUtility. * skip our checks and immediately fall into standard_ProcessUtility.
*/ */
#if (PG_VERSION_NUM >= 100000)
standard_ProcessUtility(pstmt, queryString, context,
params, queryEnv, dest, completionTag);
#else
standard_ProcessUtility(parsetree, queryString, context, standard_ProcessUtility(parsetree, queryString, context,
params, dest, completionTag); params, dest, completionTag);
#endif
return; return;
} }
@ -213,8 +254,13 @@ multi_ProcessUtility(Node *parsetree,
* Ensure that utility commands do not behave any differently until CREATE * Ensure that utility commands do not behave any differently until CREATE
* EXTENSION is invoked. * EXTENSION is invoked.
*/ */
#if (PG_VERSION_NUM >= 100000)
standard_ProcessUtility(pstmt, queryString, context,
params, queryEnv, dest, completionTag);
#else
standard_ProcessUtility(parsetree, queryString, context, standard_ProcessUtility(parsetree, queryString, context,
params, dest, completionTag); params, dest, completionTag);
#endif
return; return;
} }
@ -392,8 +438,14 @@ multi_ProcessUtility(Node *parsetree,
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
} }
#if (PG_VERSION_NUM >= 100000)
pstmt->utilityStmt = parsetree;
standard_ProcessUtility(pstmt, queryString, context,
params, queryEnv, dest, completionTag);
#else
standard_ProcessUtility(parsetree, queryString, context, standard_ProcessUtility(parsetree, queryString, context,
params, dest, completionTag); params, dest, completionTag);
#endif
PostProcessUtility(parsetree); PostProcessUtility(parsetree);
@ -2340,8 +2392,9 @@ CreateLocalTable(RangeVar *relation, char *nodeName, int32 nodePort)
/* run only a selected set of DDL commands */ /* run only a selected set of DDL commands */
if (applyDDLCommand) if (applyDDLCommand)
{ {
ProcessUtility(ddlCommandNode, CreateCommandTag(ddlCommandNode), CitusProcessUtility(ddlCommandNode, CreateCommandTag(ddlCommandNode),
PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL); PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL);
CommandCounterIncrement(); CommandCounterIncrement();
} }
} }

View File

@ -154,7 +154,11 @@ _PG_init(void)
planner_hook = multi_planner; planner_hook = multi_planner;
/* register utility hook */ /* register utility hook */
#if (PG_VERSION_NUM >= 100000)
ProcessUtility_hook = multi_ProcessUtility10;
#else
ProcessUtility_hook = multi_ProcessUtility; ProcessUtility_hook = multi_ProcessUtility;
#endif
/* register for planner hook */ /* register for planner hook */
set_rel_pathlist_hook = multi_relation_restriction_hook; set_rel_pathlist_hook = multi_relation_restriction_hook;

View File

@ -31,6 +31,7 @@
#include "distributed/multi_client_executor.h" #include "distributed/multi_client_executor.h"
#include "distributed/multi_logical_optimizer.h" #include "distributed/multi_logical_optimizer.h"
#include "distributed/multi_server_executor.h" #include "distributed/multi_server_executor.h"
#include "distributed/multi_utility.h"
#include "distributed/relay_utility.h" #include "distributed/relay_utility.h"
#include "distributed/resource_lock.h" #include "distributed/resource_lock.h"
#include "distributed/task_tracker.h" #include "distributed/task_tracker.h"
@ -425,8 +426,8 @@ worker_apply_shard_ddl_command(PG_FUNCTION_ARGS)
/* extend names in ddl command and apply extended command */ /* extend names in ddl command and apply extended command */
RelayEventExtendNames(ddlCommandNode, schemaName, shardId); RelayEventExtendNames(ddlCommandNode, schemaName, shardId);
ProcessUtility(ddlCommandNode, ddlCommand, PROCESS_UTILITY_TOPLEVEL, CitusProcessUtility(ddlCommandNode, ddlCommand, PROCESS_UTILITY_TOPLEVEL, NULL,
NULL, None_Receiver, NULL); None_Receiver, NULL);
PG_RETURN_VOID(); PG_RETURN_VOID();
} }
@ -457,7 +458,7 @@ worker_apply_inter_shard_ddl_command(PG_FUNCTION_ARGS)
RelayEventExtendNamesForInterShardCommands(ddlCommandNode, leftShardId, RelayEventExtendNamesForInterShardCommands(ddlCommandNode, leftShardId,
leftShardSchemaName, rightShardId, leftShardSchemaName, rightShardId,
rightShardSchemaName); rightShardSchemaName);
ProcessUtility(ddlCommandNode, ddlCommand, PROCESS_UTILITY_TOPLEVEL, NULL, CitusProcessUtility(ddlCommandNode, ddlCommand, PROCESS_UTILITY_TOPLEVEL, NULL,
None_Receiver, NULL); None_Receiver, NULL);
PG_RETURN_VOID(); PG_RETURN_VOID();
@ -493,8 +494,8 @@ worker_apply_sequence_command(PG_FUNCTION_ARGS)
} }
/* run the CREATE SEQUENCE command */ /* run the CREATE SEQUENCE command */
ProcessUtility(commandNode, commandString, PROCESS_UTILITY_TOPLEVEL, CitusProcessUtility(commandNode, commandString, PROCESS_UTILITY_TOPLEVEL, NULL,
NULL, None_Receiver, NULL); None_Receiver, NULL);
CommandCounterIncrement(); CommandCounterIncrement();
createSequenceStatement = (CreateSeqStmt *) commandNode; createSequenceStatement = (CreateSeqStmt *) commandNode;
@ -848,7 +849,7 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, const char *tableName)
StringInfo ddlCommand = (StringInfo) lfirst(ddlCommandCell); StringInfo ddlCommand = (StringInfo) lfirst(ddlCommandCell);
Node *ddlCommandNode = ParseTreeNode(ddlCommand->data); Node *ddlCommandNode = ParseTreeNode(ddlCommand->data);
ProcessUtility(ddlCommandNode, ddlCommand->data, PROCESS_UTILITY_TOPLEVEL, CitusProcessUtility(ddlCommandNode, ddlCommand->data, PROCESS_UTILITY_TOPLEVEL,
NULL, None_Receiver, NULL); NULL, None_Receiver, NULL);
CommandCounterIncrement(); CommandCounterIncrement();
} }
@ -867,7 +868,7 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, const char *tableName)
queryString = makeStringInfo(); queryString = makeStringInfo();
appendStringInfo(queryString, COPY_IN_COMMAND, tableName, localFilePath->data); appendStringInfo(queryString, COPY_IN_COMMAND, tableName, localFilePath->data);
ProcessUtility((Node *) localCopyCommand, queryString->data, CitusProcessUtility((Node *) localCopyCommand, queryString->data,
PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL); PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL);
/* finally delete the temporary file we created */ /* finally delete the temporary file we created */
@ -942,7 +943,7 @@ FetchForeignTable(const char *nodeName, uint32 nodePort, const char *tableName)
StringInfo ddlCommand = (StringInfo) lfirst(ddlCommandCell); StringInfo ddlCommand = (StringInfo) lfirst(ddlCommandCell);
Node *ddlCommandNode = ParseTreeNode(ddlCommand->data); Node *ddlCommandNode = ParseTreeNode(ddlCommand->data);
ProcessUtility(ddlCommandNode, ddlCommand->data, PROCESS_UTILITY_TOPLEVEL, CitusProcessUtility(ddlCommandNode, ddlCommand->data, PROCESS_UTILITY_TOPLEVEL,
NULL, None_Receiver, NULL); NULL, None_Receiver, NULL);
CommandCounterIncrement(); CommandCounterIncrement();
} }
@ -1272,7 +1273,7 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS)
appendStringInfo(queryString, COPY_IN_COMMAND, shardQualifiedName, appendStringInfo(queryString, COPY_IN_COMMAND, shardQualifiedName,
localFilePath->data); localFilePath->data);
ProcessUtility((Node *) localCopyCommand, queryString->data, CitusProcessUtility((Node *) localCopyCommand, queryString->data,
PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL); PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL);
/* finally delete the temporary file we created */ /* finally delete the temporary file we created */
@ -1372,7 +1373,7 @@ AlterSequenceMinMax(Oid sequenceId, char *schemaName, char *sequenceName)
SetDefElemArg(alterSequenceStatement, "restart", startFloatArg); SetDefElemArg(alterSequenceStatement, "restart", startFloatArg);
/* since the command is an AlterSeqStmt, a dummy command string works fine */ /* since the command is an AlterSeqStmt, a dummy command string works fine */
ProcessUtility((Node *) alterSequenceStatement, dummyString, CitusProcessUtility((Node *) alterSequenceStatement, dummyString,
PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL); PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL);
} }
} }

View File

@ -29,9 +29,20 @@ typedef struct DDLJob
List *taskList; /* worker DDL tasks to execute */ List *taskList; /* worker DDL tasks to execute */
} DDLJob; } DDLJob;
#if (PG_VERSION_NUM < 100000)
struct QueryEnvironment; /* forward-declare to appease compiler */
#endif
extern void multi_ProcessUtility10(PlannedStmt *pstmt, const char *queryString,
ProcessUtilityContext context, ParamListInfo params,
struct QueryEnvironment *queryEnv, DestReceiver *dest,
char *completionTag);
extern void multi_ProcessUtility(Node *parsetree, const char *queryString, extern void multi_ProcessUtility(Node *parsetree, const char *queryString,
ProcessUtilityContext context, ParamListInfo params, ProcessUtilityContext context, ParamListInfo params,
DestReceiver *dest, char *completionTag); DestReceiver *dest, char *completionTag);
extern void CitusProcessUtility(Node *node, const char *queryString,
ProcessUtilityContext context, ParamListInfo params,
DestReceiver *dest, char *completionTag);
extern List * PlanGrantStmt(GrantStmt *grantStmt); extern List * PlanGrantStmt(GrantStmt *grantStmt);
extern void ErrorIfUnsupportedConstraint(Relation relation, char distributionMethod, extern void ErrorIfUnsupportedConstraint(Relation relation, char distributionMethod,
Var *distributionColumn, uint32 colocationId); Var *distributionColumn, uint32 colocationId);