Address code review comments

pull/1278/head
Jason Petersen 2017-03-22 15:01:15 -06:00
parent d95b5bbad3
commit 34a62abb7d
No known key found for this signature in database
GPG Key ID: 9F1D3510D110ABA9
2 changed files with 55 additions and 54 deletions

View File

@ -146,16 +146,13 @@ static bool warnedUserAbout2PC = false;
/* /*
* Utility for handling citus specific concerns around utility statements. * multi_ProcessUtility is the main entry hook for implementing Citus-specific
* * utility behavior. Its primary responsibilities are intercepting COPY and DDL
* There's two basic types of concerns here: * commands and augmenting the coordinator's command with corresponding tasks
* 1) Intercept utility statements that run after distributed query * to be run on worker nodes, after suitably ensuring said commands' options
* execution. At this stage, the Create Table command for the master node's * are fully supported by Citus. Much of the DDL behavior is toggled by Citus'
* temporary table has been executed, and this table's relationId is * enable_ddl_propagation GUC. In addition to DDL and COPY, utilities such as
* visible to us. We can therefore update the relationId in master node's * TRUNCATE and VACUUM are also supported.
* select query.
* 2) Handle utility statements on distributed tables that the core code can't
* handle.
*/ */
void void
multi_ProcessUtility(Node *parsetree, multi_ProcessUtility(Node *parsetree,
@ -232,6 +229,8 @@ multi_ProcessUtility(Node *parsetree,
} }
} }
/* we're mostly in DDL (and VACUUM/TRUNCATE) territory at this point... */
if (IsA(parsetree, CreateSeqStmt)) if (IsA(parsetree, CreateSeqStmt))
{ {
ErrorIfUnsupportedSeqStmt((CreateSeqStmt *) parsetree); ErrorIfUnsupportedSeqStmt((CreateSeqStmt *) parsetree);
@ -247,10 +246,7 @@ multi_ProcessUtility(Node *parsetree,
ErrorIfUnsupportedTruncateStmt((TruncateStmt *) parsetree); ErrorIfUnsupportedTruncateStmt((TruncateStmt *) parsetree);
} }
/* /* only generate worker DDLJobs if propagation is enabled */
* DDL commands are propagated to workers only if EnableDDLPropagation is
* set to true and the current node is the coordinator
*/
if (EnableDDLPropagation) if (EnableDDLPropagation)
{ {
if (IsA(parsetree, IndexStmt)) if (IsA(parsetree, IndexStmt))
@ -335,7 +331,8 @@ multi_ProcessUtility(Node *parsetree,
* Therefore, we check whether the given ALTER TABLE statement is a * Therefore, we check whether the given ALTER TABLE statement is a
* FOREIGN KEY constraint and if so disable the validation step. * FOREIGN KEY constraint and if so disable the validation step.
* Note that validation is done on the shard level when DDL * Note that validation is done on the shard level when DDL
* propagation is enabled. * propagation is enabled. Unlike the preceeding Plan* calls, the
* following eagerly executes some tasks on workers.
*/ */
parsetree = WorkerProcessAlterTableStmt(alterTableStmt, queryString); parsetree = WorkerProcessAlterTableStmt(alterTableStmt, queryString);
} }
@ -360,13 +357,13 @@ multi_ProcessUtility(Node *parsetree,
" necessary users and roles."))); " necessary users and roles.")));
} }
/* set user if needed and go ahead and run local utility using standard hook */
if (commandMustRunAsOwner) if (commandMustRunAsOwner)
{ {
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
} }
/* now drop into standard process utility */
standard_ProcessUtility(parsetree, queryString, context, standard_ProcessUtility(parsetree, queryString, context,
params, dest, completionTag); params, dest, completionTag);
@ -375,6 +372,7 @@ multi_ProcessUtility(Node *parsetree,
SetUserIdAndSecContext(savedUserId, savedSecurityContext); SetUserIdAndSecContext(savedUserId, savedSecurityContext);
} }
/* after local command has completed, finish by executing worker DDLJobs, if any */
if (ddlJobs != NIL) if (ddlJobs != NIL)
{ {
ListCell *ddlJobCell = NULL; ListCell *ddlJobCell = NULL;
@ -387,7 +385,7 @@ multi_ProcessUtility(Node *parsetree,
} }
} }
/* we run VacuumStmt after standard hook to benefit from its checks and locking */ /* TODO: fold VACUUM's processing into the above block */
if (IsA(parsetree, VacuumStmt)) if (IsA(parsetree, VacuumStmt))
{ {
VacuumStmt *vacuumStmt = (VacuumStmt *) parsetree; VacuumStmt *vacuumStmt = (VacuumStmt *) parsetree;
@ -611,12 +609,12 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustR
/* /*
* PlanIndexStmt processes create index statements for distributed tables. * PlanIndexStmt determines whether a given CREATE INDEX statement involves
* The function first checks if the statement belongs to a distributed table * a distributed table. If so (and if the statement does not use unsupported
* or not. If it does, then it executes distributed logic for the command. * options), it modifies the input statement to ensure proper execution against
* * the master node table and creates a DDLJob to encapsulate information needed
* The function returns the IndexStmt node for the command to be executed on the * during the worker node portion of DDL execution before returning that DDLJob
* master node table. * in a List. If no distributed table is involved, this function returns NIL.
*/ */
static List * static List *
PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand) PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand)
@ -684,12 +682,6 @@ PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand)
ddlJobs = list_make1(ddlJob); ddlJobs = list_make1(ddlJob);
} }
else if (!createIndexStatement->if_not_exists)
{
/* if the index exists and there is no IF NOT EXISTS clause, error */
ereport(ERROR, (errcode(ERRCODE_DUPLICATE_TABLE),
errmsg("relation \"%s\" already exists", indexName)));
}
} }
} }
@ -698,12 +690,12 @@ PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand)
/* /*
* PlanDropIndexStmt processes drop index statements for distributed tables. * PlanDropIndexStmt determines whether a given DROP INDEX statement involves
* The function first checks if the statement belongs to a distributed table * a distributed table. If so (and if the statement does not use unsupported
* or not. If it does, then it executes distributed logic for the command. * options), it modifies the input statement to ensure proper execution against
* * the master node table and creates a DDLJob to encapsulate information needed
* The function returns the DropStmt node for the command to be executed on the * during the worker node portion of DDL execution before returning that DDLJob
* master node table. * in a List. If no distributed table is involved, this function returns NIL.
*/ */
static List * static List *
PlanDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand) PlanDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand)
@ -777,7 +769,6 @@ PlanDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand)
ErrorIfUnsupportedDropIndexStmt(dropIndexStatement); ErrorIfUnsupportedDropIndexStmt(dropIndexStatement);
/* if it is supported, go ahead and execute the command */
ddlJob->targetRelationId = distributedRelationId; ddlJob->targetRelationId = distributedRelationId;
ddlJob->commandString = dropIndexCommand; ddlJob->commandString = dropIndexCommand;
ddlJob->taskList = DDLTaskList(distributedRelationId, dropIndexCommand); ddlJob->taskList = DDLTaskList(distributedRelationId, dropIndexCommand);
@ -790,12 +781,12 @@ PlanDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand)
/* /*
* PlanAlterTableStmt processes alter table statements for distributed tables. * PlanAlterTableStmt determines whether a given ALTER TABLE statement involves
* The function first checks if the statement belongs to a distributed table * a distributed table. If so (and if the statement does not use unsupported
* or not. If it does, then it executes distributed logic for the command. * options), it modifies the input statement to ensure proper execution against
* * the master node table and creates a DDLJob to encapsulate information needed
* The function returns the AlterTableStmt node for the command to be executed on the * during the worker node portion of DDL execution before returning that DDLJob
* master node table. * in a List. If no distributed table is involved, this function returns NIL.
*/ */
static List * static List *
PlanAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCommand) PlanAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCommand)
@ -876,11 +867,13 @@ PlanAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCo
if (rightRelationId) if (rightRelationId)
{ {
/* if foreign key related, use specialized task list function ... */
ddlJob->taskList = ForeignKeyTaskList(leftRelationId, rightRelationId, ddlJob->taskList = ForeignKeyTaskList(leftRelationId, rightRelationId,
alterTableCommand); alterTableCommand);
} }
else else
{ {
/* ... otherwise use standard DDL task list function */
ddlJob->taskList = DDLTaskList(leftRelationId, alterTableCommand); ddlJob->taskList = DDLTaskList(leftRelationId, alterTableCommand);
} }
@ -953,10 +946,10 @@ WorkerProcessAlterTableStmt(AlterTableStmt *alterTableStatement,
/* /*
* PlanAlterObjectSchemaStmt processes ALTER ... SET SCHEMA statements for distributed * PlanAlterObjectSchemaStmt determines whether a given ALTER ... SET SCHEMA
* objects. The function first checks if the statement belongs to a distributed objects * statement involves a distributed table and issues a warning if so. Because
* or not. If it does, then it checks whether given object is a table. If it is, we warn * we do not support distributed ALTER ... SET SCHEMA, this function always
* out, since we do not support ALTER ... SET SCHEMA * returns NIL.
*/ */
static List * static List *
PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt, PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt,
@ -981,7 +974,7 @@ PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt,
return NIL; return NIL;
} }
/* warn out if a distributed relation is affected */ /* emit a warning if a distributed relation is affected */
ereport(WARNING, (errmsg("not propagating ALTER ... SET SCHEMA commands to " ereport(WARNING, (errmsg("not propagating ALTER ... SET SCHEMA commands to "
"worker nodes"), "worker nodes"),
errhint("Connect to worker nodes directly to manually " errhint("Connect to worker nodes directly to manually "
@ -1973,8 +1966,8 @@ IsAlterTableRenameStmt(RenameStmt *renameStmt)
/* /*
* ExecuteDistributedDDLJob simply executes a provided DDLJob in a distributed * ExecuteDistributedDDLJob simply executes a provided DDLJob in a distributed trans-
* transaction, including metadata sync if needed. If the multi shard commit protocol is * action, including metadata sync if needed. If the multi shard commit protocol is
* in its default value of '1pc', then a notice message indicating that '2pc' might be * in its default value of '1pc', then a notice message indicating that '2pc' might be
* used for extra safety. In the commit protocol, a BEGIN is sent after connection to * used for extra safety. In the commit protocol, a BEGIN is sent after connection to
* each shard placement and COMMIT/ROLLBACK is handled by * each shard placement and COMMIT/ROLLBACK is handled by
@ -2363,8 +2356,10 @@ CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
/* /*
* PlanGrantStmt replicates GRANT/REVOKE command to worker nodes if the * PlanGrantStmt determines whether a given GRANT/REVOKE statement involves
* the statement affects distributed tables. * a distributed table. If so, it creates DDLJobs to encapsulate information
* needed during the worker node portion of DDL execution before returning the
* DDLJobs in a List. If no distributed table is involved, this returns NIL.
* *
* NB: So far column level privileges are not supported. * NB: So far column level privileges are not supported.
*/ */

View File

@ -14,11 +14,17 @@
extern bool EnableDDLPropagation; extern bool EnableDDLPropagation;
/*
* A DDLJob encapsulates the remote tasks and commands needed to process all or
* part of a distributed DDL command. It hold the distributed relation's oid,
* the original DDL command string (for MX DDL propagation), and a task list of
* DDL_TASK-type Tasks to be executed.
*/
typedef struct DDLJob typedef struct DDLJob
{ {
Oid targetRelationId; Oid targetRelationId; /* oid of the target distributed relation */
const char *commandString; const char *commandString; /* initial (coordinator) DDL command string */
List *taskList; List *taskList; /* worker DDL tasks to execute */
} DDLJob; } DDLJob;
extern void multi_ProcessUtility(Node *parsetree, const char *queryString, extern void multi_ProcessUtility(Node *parsetree, const char *queryString,