Refactor ExecuteDistDDLCommand to expect struct

Will let us separate out the determination of what to execute from its
actual execution.
pull/1278/head
Jason Petersen 2017-03-11 13:21:28 -07:00
parent 2cb34406d1
commit a02a2a90c7
No known key found for this signature in database
GPG Key ID: 9F1D3510D110ABA9
2 changed files with 47 additions and 70 deletions

View File

@ -132,11 +132,7 @@ static void ErrorIfDistributedRenameStmt(RenameStmt *renameStatement);
/* Local functions forward declarations for helper functions */ /* Local functions forward declarations for helper functions */
static void CreateLocalTable(RangeVar *relation, char *nodeName, int32 nodePort); static void CreateLocalTable(RangeVar *relation, char *nodeName, int32 nodePort);
static bool IsAlterTableRenameStmt(RenameStmt *renameStatement); static bool IsAlterTableRenameStmt(RenameStmt *renameStatement);
static void ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString, static void ExecuteDistributedDDLJob(DDLJob *ddlJob);
bool isTopLevel);
static void ExecuteDistributedForeignKeyCommand(Oid leftRelationId, Oid rightRelationId,
const char *ddlCommandString,
bool isTopLevel);
static void ShowNoticeIfNotUsing2PC(void); static void ShowNoticeIfNotUsing2PC(void);
static List * DDLTaskList(Oid relationId, const char *commandString); static List * DDLTaskList(Oid relationId, const char *commandString);
static List * ForeignKeyTaskList(Oid leftRelationId, Oid rightRelationId, static List * ForeignKeyTaskList(Oid leftRelationId, Oid rightRelationId,
@ -673,7 +669,12 @@ ProcessIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand
/* if index does not exist, send the command to workers */ /* if index does not exist, send the command to workers */
if (!OidIsValid(indexRelationId)) if (!OidIsValid(indexRelationId))
{ {
ExecuteDistributedDDLCommand(relationId, createIndexCommand, isTopLevel); DDLJob *ddlJob = palloc0(sizeof(DDLJob));
ddlJob->targetRelationId = relationId;
ddlJob->commandString = createIndexCommand;
ddlJob->taskList = DDLTaskList(relationId, createIndexCommand);
ExecuteDistributedDDLJob(ddlJob);
} }
else if (!createIndexStatement->if_not_exists) else if (!createIndexStatement->if_not_exists)
{ {
@ -764,10 +765,16 @@ ProcessDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand,
if (OidIsValid(distributedIndexId)) if (OidIsValid(distributedIndexId))
{ {
DDLJob *ddlJob = palloc0(sizeof(DDLJob));
ErrorIfUnsupportedDropIndexStmt(dropIndexStatement); ErrorIfUnsupportedDropIndexStmt(dropIndexStatement);
/* if it is supported, go ahead and execute the command */ /* if it is supported, go ahead and execute the command */
ExecuteDistributedDDLCommand(distributedRelationId, dropIndexCommand, isTopLevel); ddlJob->targetRelationId = distributedRelationId;
ddlJob->commandString = dropIndexCommand;
ddlJob->taskList = DDLTaskList(distributedRelationId, dropIndexCommand);
ExecuteDistributedDDLJob(ddlJob);
} }
return (Node *) dropIndexStatement; return (Node *) dropIndexStatement;
@ -856,12 +863,22 @@ ProcessAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTabl
if (rightRelationId) if (rightRelationId)
{ {
ExecuteDistributedForeignKeyCommand(leftRelationId, rightRelationId, DDLJob *ddlJob = palloc0(sizeof(DDLJob));
alterTableCommand, isTopLevel); ddlJob->targetRelationId = leftRelationId;
ddlJob->commandString = alterTableCommand;
ddlJob->taskList = ForeignKeyTaskList(leftRelationId, rightRelationId,
alterTableCommand);
ExecuteDistributedDDLJob(ddlJob);
} }
else else
{ {
ExecuteDistributedDDLCommand(leftRelationId, alterTableCommand, isTopLevel); DDLJob *ddlJob = palloc0(sizeof(DDLJob));
ddlJob->targetRelationId = leftRelationId;
ddlJob->commandString = alterTableCommand;
ddlJob->taskList = DDLTaskList(leftRelationId, alterTableCommand);
ExecuteDistributedDDLJob(ddlJob);
} }
return (Node *) alterTableStatement; return (Node *) alterTableStatement;
@ -1951,19 +1968,17 @@ IsAlterTableRenameStmt(RenameStmt *renameStmt)
/* /*
* ExecuteDistributedDDLCommand applies a given DDL command to the given * ExecuteDistributedDDLJob simply executes a provided DDLJob in a distributed
* distributed table in a distributed transaction. If the multi shard commit protocol is * transaction, including metadata sync if needed. If the multi shard commit protocol is
* in its default value of '1pc', then a notice message indicating that '2pc' might be * in its default value of '1pc', then a notice message indicating that '2pc' might be
* used for extra safety. In the commit protocol, a BEGIN is sent after connection to * used for extra safety. In the commit protocol, a BEGIN is sent after connection to
* each shard placement and COMMIT/ROLLBACK is handled by * each shard placement and COMMIT/ROLLBACK is handled by
* CompleteShardPlacementTransactions function. * CompleteShardPlacementTransactions function.
*/ */
static void static void
ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString, ExecuteDistributedDDLJob(DDLJob *ddlJob)
bool isTopLevel)
{ {
List *taskList = NIL; bool shouldSyncMetadata = ShouldSyncTableMetadata(ddlJob->targetRelationId);
bool shouldSyncMetadata = ShouldSyncTableMetadata(relationId);
if (XactModificationLevel == XACT_MODIFICATION_DATA) if (XactModificationLevel == XACT_MODIFICATION_DATA)
{ {
@ -1979,60 +1994,10 @@ ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString,
if (shouldSyncMetadata) if (shouldSyncMetadata)
{ {
SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION); SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION);
SendCommandToWorkers(WORKERS_WITH_METADATA, (char *) ddlCommandString); SendCommandToWorkers(WORKERS_WITH_METADATA, (char *) ddlJob->commandString);
} }
taskList = DDLTaskList(relationId, ddlCommandString); ExecuteModifyTasksWithoutResults(ddlJob->taskList);
ExecuteModifyTasksWithoutResults(taskList);
}
/*
* ExecuteDistributedForeignKeyCommand applies a given foreign key command to the given
* distributed table in a distributed transaction. If the multi shard commit protocol is
* in its default value of '1pc', then a notice message indicating that '2pc' might be
* used for extra safety. In the commit protocol, a BEGIN is sent after connection to
* each shard placement and COMMIT/ROLLBACK is handled by
* CompleteShardPlacementTransactions function.
*
* leftRelationId is the relation id of actual distributed table which given foreign key
* command is applied. rightRelationId is the relation id of distributed table which
* foreign key refers to.
*/
static void
ExecuteDistributedForeignKeyCommand(Oid leftRelationId, Oid rightRelationId,
const char *ddlCommandString, bool isTopLevel)
{
List *taskList = NIL;
bool shouldSyncMetadata = false;
if (XactModificationLevel == XACT_MODIFICATION_DATA)
{
ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
errmsg("distributed DDL commands must not appear within "
"transaction blocks containing single-shard data "
"modifications")));
}
EnsureCoordinator();
ShowNoticeIfNotUsing2PC();
/*
* It is sufficient to check only one of the tables for metadata syncing on workers,
* since the colocation of two tables implies that either both or none of them have
* metadata on workers.
*/
shouldSyncMetadata = ShouldSyncTableMetadata(leftRelationId);
if (shouldSyncMetadata)
{
SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION);
SendCommandToWorkers(WORKERS_WITH_METADATA, (char *) ddlCommandString);
}
taskList = ForeignKeyTaskList(leftRelationId, rightRelationId, ddlCommandString);
ExecuteModifyTasksWithoutResults(taskList);
} }
@ -2409,6 +2374,7 @@ ReplicateGrantStmt(Node *parsetree)
ListCell *granteeCell = NULL; ListCell *granteeCell = NULL;
ListCell *objectCell = NULL; ListCell *objectCell = NULL;
bool isFirst = true; bool isFirst = true;
DDLJob *ddlJob = palloc(sizeof(DDLJob));
initStringInfo(&privsString); initStringInfo(&privsString);
initStringInfo(&granteesString); initStringInfo(&granteesString);
@ -2493,7 +2459,6 @@ ReplicateGrantStmt(Node *parsetree)
RangeVar *relvar = (RangeVar *) lfirst(objectCell); RangeVar *relvar = (RangeVar *) lfirst(objectCell);
Oid relOid = RangeVarGetRelid(relvar, NoLock, false); Oid relOid = RangeVarGetRelid(relvar, NoLock, false);
const char *grantOption = ""; const char *grantOption = "";
bool isTopLevel = true;
if (!IsDistributedTable(relOid)) if (!IsDistributedTable(relOid))
{ {
@ -2526,7 +2491,12 @@ ReplicateGrantStmt(Node *parsetree)
granteesString.data); granteesString.data);
} }
ExecuteDistributedDDLCommand(relOid, ddlString.data, isTopLevel); ddlJob->targetRelationId = relOid;
ddlJob->commandString = ddlString.data;
ddlJob->taskList = DDLTaskList(relOid, ddlString.data);
ExecuteDistributedDDLJob(ddlJob);
resetStringInfo(&ddlString); resetStringInfo(&ddlString);
} }
} }

View File

@ -14,6 +14,13 @@
extern bool EnableDDLPropagation; extern bool EnableDDLPropagation;
typedef struct DDLJob
{
Oid targetRelationId;
const char *commandString;
List *taskList;
} DDLJob;
extern void multi_ProcessUtility(Node *parsetree, const char *queryString, extern void multi_ProcessUtility(Node *parsetree, const char *queryString,
ProcessUtilityContext context, ParamListInfo params, ProcessUtilityContext context, ParamListInfo params,
DestReceiver *dest, char *completionTag); DestReceiver *dest, char *completionTag);