Merge pull request #1278 from citusdata/master_ddl_first

Execute DDL on coordinator before workers

cr: @metdos, @anarazel
Jason Petersen 2017-03-22 17:45:06 -06:00 committed by GitHub
commit dc8c12f8b0
11 changed files with 174 additions and 179 deletions
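Before this change, the Process* handlers shipped DDL to the workers from inside the utility hook, before the coordinator had executed the command locally. They are now renamed to Plan* and only build a list of DDLJobs; standard_ProcessUtility runs the command on the coordinator first, and the planned jobs are executed on the workers only afterwards. A condensed sketch of the resulting control flow (simplified from the multi_ProcessUtility hunks below; other command types and per-command checks are elided):

    List *ddlJobs = NIL;

    if (EnableDDLPropagation && IsA(parsetree, IndexStmt))
    {
        /* planning only: build worker DDLJobs, but touch no remote node yet */
        ddlJobs = PlanIndexStmt((IndexStmt *) parsetree, queryString);
    }

    /* the coordinator executes the DDL first... */
    standard_ProcessUtility(parsetree, queryString, context,
                            params, dest, completionTag);

    /* ...workers only see commands whose local execution succeeded */
    if (ddlJobs != NIL)
    {
        ListCell *ddlJobCell = NULL;

        foreach(ddlJobCell, ddlJobs)
        {
            ExecuteDistributedDDLJob((DDLJob *) lfirst(ddlJobCell));
        }
    }

One effect visible in the expected-output changes below: errors the coordinator can raise itself (bad types, missing columns, duplicate index names) no longer arrive with a worker CONTEXT line, and the one-phase-commit NOTICE now appears after any coordinator-side messages.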

View File

@@ -101,17 +101,16 @@ static void VerifyTransmitStmt(CopyStmt *copyStatement);
 /* Local functions forward declarations for processing distributed table commands */
 static Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag,
                               bool *commandMustRunAsOwner);
-static Node * ProcessIndexStmt(IndexStmt *createIndexStatement,
-                               const char *createIndexCommand, bool isTopLevel);
-static Node * ProcessDropIndexStmt(DropStmt *dropIndexStatement,
-                                   const char *dropIndexCommand, bool isTopLevel);
-static Node * ProcessAlterTableStmt(AlterTableStmt *alterTableStatement,
-                                    const char *alterTableCommand, bool isTopLevel);
+static List * PlanIndexStmt(IndexStmt *createIndexStatement,
+                            const char *createIndexCommand);
+static List * PlanDropIndexStmt(DropStmt *dropIndexStatement,
+                                const char *dropIndexCommand);
+static List * PlanAlterTableStmt(AlterTableStmt *alterTableStatement,
+                                 const char *alterTableCommand);
 static Node * WorkerProcessAlterTableStmt(AlterTableStmt *alterTableStatement,
                                           const char *alterTableCommand);
-static Node * ProcessAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt,
-                                           const char *alterObjectSchemaCommand,
-                                           bool isTopLevel);
+static List * PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt,
+                                        const char *alterObjectSchemaCommand);
 static void ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand);
 static bool IsSupportedDistributedVacuumStmt(Oid relationId, VacuumStmt *vacuumStmt);
 static List * VacuumTaskList(Oid relationId, VacuumStmt *vacuumStmt);
@@ -132,11 +131,7 @@ static void ErrorIfDistributedRenameStmt(RenameStmt *renameStatement);
 /* Local functions forward declarations for helper functions */
 static void CreateLocalTable(RangeVar *relation, char *nodeName, int32 nodePort);
 static bool IsAlterTableRenameStmt(RenameStmt *renameStatement);
-static void ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString,
-                                         bool isTopLevel);
-static void ExecuteDistributedForeignKeyCommand(Oid leftRelationId, Oid rightRelationId,
-                                                const char *ddlCommandString,
-                                                bool isTopLevel);
+static void ExecuteDistributedDDLJob(DDLJob *ddlJob);
 static void ShowNoticeIfNotUsing2PC(void);
 static List * DDLTaskList(Oid relationId, const char *commandString);
 static List * ForeignKeyTaskList(Oid leftRelationId, Oid rightRelationId,
@@ -151,16 +146,13 @@ static bool warnedUserAbout2PC = false;
 /*
- * Utility for handling citus specific concerns around utility statements.
- *
- * There's two basic types of concerns here:
- * 1) Intercept utility statements that run after distributed query
- *    execution. At this stage, the Create Table command for the master node's
- *    temporary table has been executed, and this table's relationId is
- *    visible to us. We can therefore update the relationId in master node's
- *    select query.
- * 2) Handle utility statements on distributed tables that the core code can't
- *    handle.
+ * multi_ProcessUtility is the main entry hook for implementing Citus-specific
+ * utility behavior. Its primary responsibilities are intercepting COPY and DDL
+ * commands and augmenting the coordinator's command with corresponding tasks
+ * to be run on worker nodes, after suitably ensuring said commands' options
+ * are fully supported by Citus. Much of the DDL behavior is toggled by Citus'
+ * enable_ddl_propagation GUC. In addition to DDL and COPY, utilities such as
+ * TRUNCATE and VACUUM are also supported.
  */
 void
 multi_ProcessUtility(Node *parsetree,
@@ -173,6 +165,7 @@ multi_ProcessUtility(Node *parsetree,
     bool commandMustRunAsOwner = false;
     Oid savedUserId = InvalidOid;
     int savedSecurityContext = 0;
+    List *ddlJobs = NIL;
 
     if (IsA(parsetree, TransactionStmt))
     {
@@ -236,6 +229,8 @@ multi_ProcessUtility(Node *parsetree,
         }
     }
 
+    /* we're mostly in DDL (and VACUUM/TRUNCATE) territory at this point... */
+
     if (IsA(parsetree, CreateSeqStmt))
     {
         ErrorIfUnsupportedSeqStmt((CreateSeqStmt *) parsetree);
@@ -251,18 +246,12 @@ multi_ProcessUtility(Node *parsetree,
         ErrorIfUnsupportedTruncateStmt((TruncateStmt *) parsetree);
     }
 
-    /*
-     * DDL commands are propagated to workers only if EnableDDLPropagation is
-     * set to true and the current node is the coordinator
-     */
+    /* only generate worker DDLJobs if propagation is enabled */
     if (EnableDDLPropagation)
     {
-        bool isTopLevel = (context == PROCESS_UTILITY_TOPLEVEL);
-
         if (IsA(parsetree, IndexStmt))
         {
-            parsetree = ProcessIndexStmt((IndexStmt *) parsetree, queryString,
-                                         isTopLevel);
+            ddlJobs = PlanIndexStmt((IndexStmt *) parsetree, queryString);
         }
 
         if (IsA(parsetree, DropStmt))
@@ -270,7 +259,7 @@ multi_ProcessUtility(Node *parsetree,
             DropStmt *dropStatement = (DropStmt *) parsetree;
             if (dropStatement->removeType == OBJECT_INDEX)
             {
-                parsetree = ProcessDropIndexStmt(dropStatement, queryString, isTopLevel);
+                ddlJobs = PlanDropIndexStmt(dropStatement, queryString);
             }
         }
@@ -279,8 +268,7 @@ multi_ProcessUtility(Node *parsetree,
             AlterTableStmt *alterTableStmt = (AlterTableStmt *) parsetree;
             if (alterTableStmt->relkind == OBJECT_TABLE)
             {
-                parsetree = ProcessAlterTableStmt(alterTableStmt, queryString,
-                                                  isTopLevel);
+                ddlJobs = PlanAlterTableStmt(alterTableStmt, queryString);
             }
         }
@@ -304,8 +292,7 @@ multi_ProcessUtility(Node *parsetree,
         if (IsA(parsetree, AlterObjectSchemaStmt))
         {
             AlterObjectSchemaStmt *setSchemaStmt = (AlterObjectSchemaStmt *) parsetree;
-            parsetree = ProcessAlterObjectSchemaStmt(setSchemaStmt, queryString,
-                                                     isTopLevel);
+            ddlJobs = PlanAlterObjectSchemaStmt(setSchemaStmt, queryString);
         }
 
         /*
@@ -344,7 +331,8 @@ multi_ProcessUtility(Node *parsetree,
              * Therefore, we check whether the given ALTER TABLE statement is a
              * FOREIGN KEY constraint and if so disable the validation step.
              * Note that validation is done on the shard level when DDL
-             * propagation is enabled.
+             * propagation is enabled. Unlike the preceding Plan* calls, the
+             * following eagerly executes some tasks on workers.
              */
             parsetree = WorkerProcessAlterTableStmt(alterTableStmt, queryString);
         }
@@ -369,13 +357,13 @@ multi_ProcessUtility(Node *parsetree,
                                " necessary users and roles.")));
     }
 
-    /* set user if needed and go ahead and run local utility using standard hook */
     if (commandMustRunAsOwner)
     {
         GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
         SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
     }
 
+    /* now drop into standard process utility */
     standard_ProcessUtility(parsetree, queryString, context,
                             params, dest, completionTag);
@@ -384,7 +372,20 @@ multi_ProcessUtility(Node *parsetree,
         SetUserIdAndSecContext(savedUserId, savedSecurityContext);
     }
 
-    /* we run VacuumStmt after standard hook to benefit from its checks and locking */
+    /* after local command has completed, finish by executing worker DDLJobs, if any */
+    if (ddlJobs != NIL)
+    {
+        ListCell *ddlJobCell = NULL;
+
+        foreach(ddlJobCell, ddlJobs)
+        {
+            DDLJob *ddlJob = (DDLJob *) lfirst(ddlJobCell);
+
+            ExecuteDistributedDDLJob(ddlJob);
+        }
+    }
+
+    /* TODO: fold VACUUM's processing into the above block */
     if (IsA(parsetree, VacuumStmt))
     {
         VacuumStmt *vacuumStmt = (VacuumStmt *) parsetree;
@@ -608,17 +609,18 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustR
 /*
- * ProcessIndexStmt processes create index statements for distributed tables.
- * The function first checks if the statement belongs to a distributed table
- * or not. If it does, then it executes distributed logic for the command.
- *
- * The function returns the IndexStmt node for the command to be executed on the
- * master node table.
+ * PlanIndexStmt determines whether a given CREATE INDEX statement involves
+ * a distributed table. If so (and if the statement does not use unsupported
+ * options), it modifies the input statement to ensure proper execution against
+ * the master node table and creates a DDLJob to encapsulate information needed
+ * during the worker node portion of DDL execution before returning that DDLJob
+ * in a List. If no distributed table is involved, this function returns NIL.
  */
-static Node *
-ProcessIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand,
-                 bool isTopLevel)
+static List *
+PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand)
 {
+    List *ddlJobs = NIL;
+
     /*
      * We first check whether a distributed relation is affected. For that, we need to
      * open the relation. To prevent race conditions with later lookups, lock the table,
@@ -673,33 +675,32 @@ ProcessIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand
         /* if index does not exist, send the command to workers */
         if (!OidIsValid(indexRelationId))
         {
-            ExecuteDistributedDDLCommand(relationId, createIndexCommand, isTopLevel);
-        }
-        else if (!createIndexStatement->if_not_exists)
-        {
-            /* if the index exists and there is no IF NOT EXISTS clause, error */
-            ereport(ERROR, (errcode(ERRCODE_DUPLICATE_TABLE),
-                            errmsg("relation \"%s\" already exists", indexName)));
+            DDLJob *ddlJob = palloc0(sizeof(DDLJob));
+
+            ddlJob->targetRelationId = relationId;
+            ddlJob->commandString = createIndexCommand;
+            ddlJob->taskList = DDLTaskList(relationId, createIndexCommand);
+
+            ddlJobs = list_make1(ddlJob);
         }
     }
 }
 
-    return (Node *) createIndexStatement;
+    return ddlJobs;
 }
 
 
 /*
- * ProcessDropIndexStmt processes drop index statements for distributed tables.
- * The function first checks if the statement belongs to a distributed table
- * or not. If it does, then it executes distributed logic for the command.
- *
- * The function returns the DropStmt node for the command to be executed on the
- * master node table.
+ * PlanDropIndexStmt determines whether a given DROP INDEX statement involves
+ * a distributed table. If so (and if the statement does not use unsupported
+ * options), it modifies the input statement to ensure proper execution against
+ * the master node table and creates a DDLJob to encapsulate information needed
+ * during the worker node portion of DDL execution before returning that DDLJob
+ * in a List. If no distributed table is involved, this function returns NIL.
  */
-static Node *
-ProcessDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand,
-                     bool isTopLevel)
+static List *
+PlanDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand)
 {
+    List *ddlJobs = NIL;
     ListCell *dropObjectCell = NULL;
     Oid distributedIndexId = InvalidOid;
     Oid distributedRelationId = InvalidOid;
@@ -764,28 +765,34 @@ ProcessDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand,
     if (OidIsValid(distributedIndexId))
     {
+        DDLJob *ddlJob = palloc0(sizeof(DDLJob));
+
         ErrorIfUnsupportedDropIndexStmt(dropIndexStatement);
 
-        /* if it is supported, go ahead and execute the command */
-        ExecuteDistributedDDLCommand(distributedRelationId, dropIndexCommand, isTopLevel);
+        ddlJob->targetRelationId = distributedRelationId;
+        ddlJob->commandString = dropIndexCommand;
+        ddlJob->taskList = DDLTaskList(distributedRelationId, dropIndexCommand);
+
+        ddlJobs = list_make1(ddlJob);
     }
 
-    return (Node *) dropIndexStatement;
+    return ddlJobs;
 }
 
 
 /*
- * ProcessAlterTableStmt processes alter table statements for distributed tables.
- * The function first checks if the statement belongs to a distributed table
- * or not. If it does, then it executes distributed logic for the command.
- *
- * The function returns the AlterTableStmt node for the command to be executed on the
- * master node table.
+ * PlanAlterTableStmt determines whether a given ALTER TABLE statement involves
+ * a distributed table. If so (and if the statement does not use unsupported
+ * options), it modifies the input statement to ensure proper execution against
+ * the master node table and creates a DDLJob to encapsulate information needed
+ * during the worker node portion of DDL execution before returning that DDLJob
+ * in a List. If no distributed table is involved, this function returns NIL.
  */
-static Node *
-ProcessAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCommand,
-                      bool isTopLevel)
+static List *
+PlanAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCommand)
 {
+    List *ddlJobs = NIL;
+    DDLJob *ddlJob = NULL;
     LOCKMODE lockmode = 0;
     Oid leftRelationId = InvalidOid;
     Oid rightRelationId = InvalidOid;
@@ -796,20 +803,20 @@ ProcessAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTabl
     /* first check whether a distributed relation is affected */
     if (alterTableStatement->relation == NULL)
     {
-        return (Node *) alterTableStatement;
+        return NIL;
     }
 
     lockmode = AlterTableGetLockLevel(alterTableStatement->cmds);
     leftRelationId = AlterTableLookupRelation(alterTableStatement, lockmode);
     if (!OidIsValid(leftRelationId))
     {
-        return (Node *) alterTableStatement;
+        return NIL;
     }
 
     isDistributedRelation = IsDistributedTable(leftRelationId);
     if (!isDistributedRelation)
     {
-        return (Node *) alterTableStatement;
+        return NIL;
     }
 
     ErrorIfUnsupportedAlterTableStmt(alterTableStatement);
@@ -854,17 +861,25 @@ ProcessAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTabl
         }
     }
 
+    ddlJob = palloc0(sizeof(DDLJob));
+    ddlJob->targetRelationId = leftRelationId;
+    ddlJob->commandString = alterTableCommand;
+
     if (rightRelationId)
     {
-        ExecuteDistributedForeignKeyCommand(leftRelationId, rightRelationId,
-                                            alterTableCommand, isTopLevel);
+        /* if foreign key related, use specialized task list function ... */
+        ddlJob->taskList = ForeignKeyTaskList(leftRelationId, rightRelationId,
+                                              alterTableCommand);
     }
     else
     {
-        ExecuteDistributedDDLCommand(leftRelationId, alterTableCommand, isTopLevel);
+        /* ... otherwise use standard DDL task list function */
+        ddlJob->taskList = DDLTaskList(leftRelationId, alterTableCommand);
     }
 
-    return (Node *) alterTableStatement;
+    ddlJobs = list_make1(ddlJob);
+
+    return ddlJobs;
 }
@@ -931,21 +946,21 @@ WorkerProcessAlterTableStmt(AlterTableStmt *alterTableStatement,
 /*
- * ProcessAlterObjectSchemaStmt processes ALTER ... SET SCHEMA statements for distributed
- * objects. The function first checks if the statement belongs to a distributed objects
- * or not. If it does, then it checks whether given object is a table. If it is, we warn
- * out, since we do not support ALTER ... SET SCHEMA
+ * PlanAlterObjectSchemaStmt determines whether a given ALTER ... SET SCHEMA
+ * statement involves a distributed table and issues a warning if so. Because
+ * we do not support distributed ALTER ... SET SCHEMA, this function always
+ * returns NIL.
  */
-static Node *
-ProcessAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt,
-                             const char *alterObjectSchemaCommand, bool isTopLevel)
+static List *
+PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt,
+                          const char *alterObjectSchemaCommand)
 {
     Oid relationId = InvalidOid;
     bool noWait = false;
 
     if (alterObjectSchemaStmt->relation == NULL)
     {
-        return (Node *) alterObjectSchemaStmt;
+        return NIL;
     }
 
     relationId = RangeVarGetRelidExtended(alterObjectSchemaStmt->relation,
@@ -956,16 +971,16 @@ ProcessAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt,
     /* first check whether a distributed relation is affected */
     if (!OidIsValid(relationId) || !IsDistributedTable(relationId))
     {
-        return (Node *) alterObjectSchemaStmt;
+        return NIL;
     }
 
-    /* warn out if a distributed relation is affected */
+    /* emit a warning if a distributed relation is affected */
     ereport(WARNING, (errmsg("not propagating ALTER ... SET SCHEMA commands to "
                              "worker nodes"),
                       errhint("Connect to worker nodes directly to manually "
                               "change schemas of affected objects.")));
 
-    return (Node *) alterObjectSchemaStmt;
+    return NIL;
 }
@@ -1951,19 +1966,17 @@ IsAlterTableRenameStmt(RenameStmt *renameStmt)
 /*
- * ExecuteDistributedDDLCommand applies a given DDL command to the given
- * distributed table in a distributed transaction. If the multi shard commit protocol is
+ * ExecuteDistributedDDLJob simply executes a provided DDLJob in a distributed trans-
+ * action, including metadata sync if needed. If the multi shard commit protocol is
  * in its default value of '1pc', then a notice message indicating that '2pc' might be
  * used for extra safety. In the commit protocol, a BEGIN is sent after connection to
  * each shard placement and COMMIT/ROLLBACK is handled by
  * CompleteShardPlacementTransactions function.
  */
 static void
-ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString,
-                             bool isTopLevel)
+ExecuteDistributedDDLJob(DDLJob *ddlJob)
 {
-    List *taskList = NIL;
-    bool shouldSyncMetadata = ShouldSyncTableMetadata(relationId);
+    bool shouldSyncMetadata = ShouldSyncTableMetadata(ddlJob->targetRelationId);
 
     if (XactModificationLevel == XACT_MODIFICATION_DATA)
     {
@@ -1979,60 +1992,10 @@ ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString,
     if (shouldSyncMetadata)
     {
         SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION);
-        SendCommandToWorkers(WORKERS_WITH_METADATA, (char *) ddlCommandString);
+        SendCommandToWorkers(WORKERS_WITH_METADATA, (char *) ddlJob->commandString);
     }
 
-    taskList = DDLTaskList(relationId, ddlCommandString);
-
-    ExecuteModifyTasksWithoutResults(taskList);
-}
-
-
-/*
- * ExecuteDistributedForeignKeyCommand applies a given foreign key command to the given
- * distributed table in a distributed transaction. If the multi shard commit protocol is
- * in its default value of '1pc', then a notice message indicating that '2pc' might be
- * used for extra safety. In the commit protocol, a BEGIN is sent after connection to
- * each shard placement and COMMIT/ROLLBACK is handled by
- * CompleteShardPlacementTransactions function.
- *
- * leftRelationId is the relation id of actual distributed table which given foreign key
- * command is applied. rightRelationId is the relation id of distributed table which
- * foreign key refers to.
- */
-static void
-ExecuteDistributedForeignKeyCommand(Oid leftRelationId, Oid rightRelationId,
-                                    const char *ddlCommandString, bool isTopLevel)
-{
-    List *taskList = NIL;
-    bool shouldSyncMetadata = false;
-
-    if (XactModificationLevel == XACT_MODIFICATION_DATA)
-    {
-        ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
-                        errmsg("distributed DDL commands must not appear within "
-                               "transaction blocks containing single-shard data "
-                               "modifications")));
-    }
-
-    EnsureCoordinator();
-    ShowNoticeIfNotUsing2PC();
-
-    /*
-     * It is sufficient to check only one of the tables for metadata syncing on workers,
-     * since the colocation of two tables implies that either both or none of them have
-     * metadata on workers.
-     */
-    shouldSyncMetadata = ShouldSyncTableMetadata(leftRelationId);
-
-    if (shouldSyncMetadata)
-    {
-        SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION);
-        SendCommandToWorkers(WORKERS_WITH_METADATA, (char *) ddlCommandString);
-    }
-
-    taskList = ForeignKeyTaskList(leftRelationId, rightRelationId, ddlCommandString);
-
-    ExecuteModifyTasksWithoutResults(taskList);
+    ExecuteModifyTasksWithoutResults(ddlJob->taskList);
 }
@@ -2393,15 +2356,16 @@ CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
 /*
- * ReplicateGrantStmt replicates GRANT/REVOKE command to worker nodes if the
- * the statement affects distributed tables.
+ * PlanGrantStmt determines whether a given GRANT/REVOKE statement involves
+ * a distributed table. If so, it creates DDLJobs to encapsulate information
+ * needed during the worker node portion of DDL execution before returning the
+ * DDLJobs in a List. If no distributed table is involved, this returns NIL.
  *
  * NB: So far column level privileges are not supported.
  */
-void
-ReplicateGrantStmt(Node *parsetree)
+List *
+PlanGrantStmt(GrantStmt *grantStmt)
 {
-    GrantStmt *grantStmt = (GrantStmt *) parsetree;
     StringInfoData privsString;
     StringInfoData granteesString;
     StringInfoData targetString;
@@ -2409,6 +2373,7 @@ ReplicateGrantStmt(Node *parsetree)
     ListCell *granteeCell = NULL;
     ListCell *objectCell = NULL;
     bool isFirst = true;
+    List *ddlJobs = NIL;
 
     initStringInfo(&privsString);
     initStringInfo(&granteesString);
@@ -2422,7 +2387,7 @@ ReplicateGrantStmt(Node *parsetree)
     if (grantStmt->targtype != ACL_TARGET_OBJECT ||
         grantStmt->objtype != ACL_OBJECT_RELATION)
     {
-        return;
+        return NIL;
     }
 
     /* deparse the privileges */
@@ -2493,7 +2458,7 @@ ReplicateGrantStmt(Node *parsetree)
         RangeVar *relvar = (RangeVar *) lfirst(objectCell);
         Oid relOid = RangeVarGetRelid(relvar, NoLock, false);
         const char *grantOption = "";
-        bool isTopLevel = true;
+        DDLJob *ddlJob = NULL;
 
         if (!IsDistributedTable(relOid))
         {
@@ -2526,7 +2491,15 @@ ReplicateGrantStmt(Node *parsetree)
                              granteesString.data);
         }
 
-        ExecuteDistributedDDLCommand(relOid, ddlString.data, isTopLevel);
+        ddlJob = palloc0(sizeof(DDLJob));
+        ddlJob->targetRelationId = relOid;
+        ddlJob->commandString = pstrdup(ddlString.data);
+        ddlJob->taskList = DDLTaskList(relOid, ddlString.data);
+
+        ddlJobs = lappend(ddlJobs, ddlJob);
 
         resetStringInfo(&ddlString);
     }
+
+    return ddlJobs;
 }

View File

@@ -14,10 +14,23 @@
 extern bool EnableDDLPropagation;
 
+/*
+ * A DDLJob encapsulates the remote tasks and commands needed to process all or
+ * part of a distributed DDL command. It holds the distributed relation's oid,
+ * the original DDL command string (for MX DDL propagation), and a task list of
+ * DDL_TASK-type Tasks to be executed.
+ */
+typedef struct DDLJob
+{
+    Oid targetRelationId;      /* oid of the target distributed relation */
+    const char *commandString; /* initial (coordinator) DDL command string */
+    List *taskList;            /* worker DDL tasks to execute */
+} DDLJob;
+
 extern void multi_ProcessUtility(Node *parsetree, const char *queryString,
                                  ProcessUtilityContext context, ParamListInfo params,
                                  DestReceiver *dest, char *completionTag);
-extern void ReplicateGrantStmt(Node *parsetree);
+extern List * PlanGrantStmt(GrantStmt *grantStmt);
 
 #endif /* MULTI_UTILITY_H */
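The two payload fields serve different audiences: commandString is replayed verbatim on metadata-syncing (MX) workers, while taskList carries the per-shard DDL commands. The consumer side, condensed from ExecuteDistributedDDLJob in the first file above:

    /* one job, two audiences (transaction-state checks elided) */
    if (ShouldSyncTableMetadata(ddlJob->targetRelationId))
    {
        /* MX workers replay the original command, with propagation disabled */
        SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION);
        SendCommandToWorkers(WORKERS_WITH_METADATA, (char *) ddlJob->commandString);
    }

    /* shard placements receive the per-shard task list */
    ExecuteModifyTasksWithoutResults(ddlJob->taskList);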

View File

@@ -163,14 +163,10 @@ ERROR: creating unique indexes on append-partitioned tables is currently unsupp
 CREATE INDEX lineitem_orderkey_index ON lineitem (l_orderkey);
 ERROR: relation "lineitem_orderkey_index" already exists
 CREATE INDEX try_index ON lineitem USING gist (l_orderkey);
-NOTICE: using one-phase commit for distributed DDL commands
-HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
 ERROR: data type bigint has no default operator class for access method "gist"
 HINT: You must specify an operator class for the index or define a default operator class for the data type.
-CONTEXT: while executing command on localhost:57638
 CREATE INDEX try_index ON lineitem (non_existent_column);
 ERROR: column "non_existent_column" does not exist
-CONTEXT: while executing command on localhost:57638
 CREATE INDEX ON lineitem (l_orderkey);
 ERROR: creating index without a name on a distributed table is currently unsupported
 -- Verify that none of failed indexes got created on the master node
@@ -205,6 +201,8 @@ DROP INDEX CONCURRENTLY lineitem_orderkey_index;
 ERROR: dropping indexes concurrently on distributed tables is currently unsupported
 -- Verify that we can succesfully drop indexes
 DROP INDEX lineitem_orderkey_index;
+NOTICE: using one-phase commit for distributed DDL commands
+HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
 DROP INDEX lineitem_orderkey_index_new;
 DROP INDEX lineitem_partkey_desc_index;
 DROP INDEX lineitem_partial_index;

View File

@@ -43,9 +43,9 @@ SELECT master_create_worker_shards('lineitem_hash', 2, 1);
 (1 row)
 
 CREATE INDEX lineitem_hash_time_index ON lineitem_hash (l_shipdate);
-DEBUG: building index "lineitem_hash_time_index" on table "lineitem_hash"
 NOTICE: using one-phase commit for distributed DDL commands
 HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
+DEBUG: building index "lineitem_hash_time_index" on table "lineitem_hash"
 CREATE TABLE orders_hash (
     o_orderkey bigint not null,
     o_custkey integer not null,

View File

@@ -142,9 +142,9 @@ Indexes:
 \c - - - :master_port
 SET client_min_messages TO debug2;
 CREATE INDEX ddl_test_index ON mx_ddl_table(value);
-DEBUG: building index "ddl_test_index" on table "mx_ddl_table"
 NOTICE: using one-phase commit for distributed DDL commands
 HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
+DEBUG: building index "ddl_test_index" on table "mx_ddl_table"
 RESET client_min_messages;
 DROP INDEX ddl_test_index;
 -- show that sequences owned by mx tables result in unique values

View File

@@ -183,7 +183,7 @@ ABORT;
 -- applies to DDL
 BEGIN;
 INSERT INTO labs_mx VALUES (6, 'Bell labs_mx');
-ALTER TABLE labs_mx ADD COLUMN text motto;
+ALTER TABLE labs_mx ADD COLUMN motto text;
 ERROR: distributed DDL commands must not appear within transaction blocks containing single-shard data modifications
 COMMIT;
 -- doesn't apply to COPY after modifications

View File

@@ -610,9 +610,9 @@ HINT: You can enable two-phase commit for extra safety with: SET citus.multi_sh
 \c - - - :master_port
 ALTER TABLE test_schema_support.nation_hash DROP COLUMN IF EXISTS non_existent_column;
-NOTICE: column "non_existent_column" of relation "nation_hash" does not exist, skipping
 NOTICE: using one-phase commit for distributed DDL commands
 HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
+NOTICE: column "non_existent_column" of relation "nation_hash" does not exist, skipping
 ALTER TABLE test_schema_support.nation_hash DROP COLUMN IF EXISTS new_col;
 -- verify column is dropped
 \d test_schema_support.nation_hash;
@@ -665,9 +665,9 @@ HINT: You can enable two-phase commit for extra safety with: SET citus.multi_sh
 \c - - - :master_port
 SET search_path TO test_schema_support;
 ALTER TABLE nation_hash DROP COLUMN IF EXISTS non_existent_column;
-NOTICE: column "non_existent_column" of relation "nation_hash" does not exist, skipping
 NOTICE: using one-phase commit for distributed DDL commands
 HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
+NOTICE: column "non_existent_column" of relation "nation_hash" does not exist, skipping
 ALTER TABLE nation_hash DROP COLUMN IF EXISTS new_col;
 -- verify column is dropped
 \d test_schema_support.nation_hash;

View File

@@ -140,6 +140,9 @@ SELECT * FROM mx_ref_table ORDER BY col_1;
 \c - - - :master_port
 DROP TABLE mx_ref_table;
+CREATE UNIQUE INDEX mx_test_uniq_index ON mx_table(col_1);
+NOTICE: using one-phase commit for distributed DDL commands
+HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
 \c - - - :worker_1_port
 -- DDL commands
 \d mx_table
@@ -149,8 +152,10 @@ DROP TABLE mx_ref_table;
  col_1 | integer |
  col_2 | text |
  col_3 | bigint | not null default nextval('mx_table_col_3_seq'::regclass)
+Indexes:
+    "mx_test_uniq_index" UNIQUE, btree (col_1)
 
-CREATE INDEX mx_test_index ON mx_table(col_1);
+CREATE INDEX mx_test_index ON mx_table(col_2);
 ERROR: operation is not allowed on this node
 HINT: Connect to the coordinator and run it again.
 ALTER TABLE mx_table ADD COLUMN col_4 int;
@@ -166,6 +171,8 @@ HINT: Connect to the coordinator and run it again.
  col_1 | integer |
  col_2 | text |
  col_3 | bigint | not null default nextval('mx_table_col_3_seq'::regclass)
+Indexes:
+    "mx_test_uniq_index" UNIQUE, btree (col_1)
 
 -- master_modify_multiple_shards
 SELECT master_modify_multiple_shards('UPDATE mx_table SET col_2=''none''');
@@ -223,6 +230,9 @@ SELECT * FROM pg_dist_node WHERE nodename='localhost' AND nodeport=5432;
 -- master_remove_node
 \c - - - :master_port
+DROP INDEX mx_test_uniq_index;
+NOTICE: using one-phase commit for distributed DDL commands
+HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
 SELECT master_add_node('localhost', 5432);
  master_add_node
 --------------------------------------------

View File

@@ -261,8 +261,7 @@ ALTER TABLE IF EXISTS non_existent_table ADD COLUMN new_column INTEGER;
 NOTICE: relation "non_existent_table" does not exist, skipping
 ALTER TABLE IF EXISTS lineitem_alter ALTER COLUMN int_column2 SET DATA TYPE INTEGER;
 ALTER TABLE lineitem_alter DROP COLUMN non_existent_column;
-ERROR: column "non_existent_column" of relation "lineitem_alter_220000" does not exist
-CONTEXT: while executing command on localhost:57638
+ERROR: column "non_existent_column" of relation "lineitem_alter" does not exist
 ALTER TABLE lineitem_alter DROP COLUMN IF EXISTS non_existent_column;
 NOTICE: column "non_existent_column" of relation "lineitem_alter" does not exist, skipping
 ALTER TABLE lineitem_alter DROP COLUMN IF EXISTS int_column2;
@@ -360,13 +359,13 @@ DETAIL: Only ADD|DROP COLUMN, SET|DROP NOT NULL, SET|DROP DEFAULT, ADD|DROP CON
 -- types
 ALTER TABLE lineitem_alter ADD COLUMN new_column non_existent_type;
 ERROR: type "non_existent_type" does not exist
-CONTEXT: while executing command on localhost:57638
+LINE 1: ALTER TABLE lineitem_alter ADD COLUMN new_column non_existen...
+                                                          ^
 ALTER TABLE lineitem_alter ALTER COLUMN null_column SET NOT NULL;
 ERROR: column "null_column" contains null values
 CONTEXT: while executing command on localhost:57638
 ALTER TABLE lineitem_alter ALTER COLUMN l_partkey SET DEFAULT 'a';
 ERROR: invalid input syntax for integer: "a"
-CONTEXT: while executing command on localhost:57638
 -- Verify that we error out on statements involving RENAME
 ALTER TABLE lineitem_alter RENAME TO lineitem_renamed;
 ERROR: renaming distributed tables or their objects is currently unsupported

View File

@@ -154,7 +154,7 @@ ABORT;
 -- applies to DDL
 BEGIN;
 INSERT INTO labs_mx VALUES (6, 'Bell labs_mx');
-ALTER TABLE labs_mx ADD COLUMN text motto;
+ALTER TABLE labs_mx ADD COLUMN motto text;
 COMMIT;
 -- doesn't apply to COPY after modifications

View File

@@ -91,11 +91,12 @@ SELECT * FROM mx_ref_table ORDER BY col_1;
 \c - - - :master_port
 DROP TABLE mx_ref_table;
+CREATE UNIQUE INDEX mx_test_uniq_index ON mx_table(col_1);
 
 \c - - - :worker_1_port
 -- DDL commands
 \d mx_table
-CREATE INDEX mx_test_index ON mx_table(col_1);
+CREATE INDEX mx_test_index ON mx_table(col_2);
 ALTER TABLE mx_table ADD COLUMN col_4 int;
 ALTER TABLE mx_table_2 ADD CONSTRAINT mx_fk_constraint FOREIGN KEY(col_1) REFERENCES mx_table(col_1);
 \d mx_table
@@ -122,6 +123,7 @@ SELECT * FROM pg_dist_node WHERE nodename='localhost' AND nodeport=5432;
 -- master_remove_node
 \c - - - :master_port
+DROP INDEX mx_test_uniq_index;
 SELECT master_add_node('localhost', 5432);
 
 \c - - - :worker_1_port