Change DropStmt to generate worker DDL on master

Because we can't execute DROP INDEX CONCURRENTLY during transactions,
worker_apply_shard_ddl_command is insufficient.
pull/1287/head
Jason Petersen 2017-03-21 17:38:16 -06:00
parent 95d8d27c4f
commit 0b6c4e756e
No known key found for this signature in database
GPG Key ID: 9F1D3510D110ABA9
1 changed files with 59 additions and 1 deletions

View File

@ -135,6 +135,7 @@ static void ExecuteDistributedDDLJob(DDLJob *ddlJob);
static void ShowNoticeIfNotUsing2PC(void); static void ShowNoticeIfNotUsing2PC(void);
static List * DDLTaskList(Oid relationId, const char *commandString); static List * DDLTaskList(Oid relationId, const char *commandString);
static List * IndexTaskList(Oid relationId, IndexStmt *indexStmt); static List * IndexTaskList(Oid relationId, IndexStmt *indexStmt);
static List * DropIndexTaskList(Oid relationId, Oid indexId, DropStmt *dropStmt);
static List * ForeignKeyTaskList(Oid leftRelationId, Oid rightRelationId, static List * ForeignKeyTaskList(Oid leftRelationId, Oid rightRelationId,
const char *commandString); const char *commandString);
static void RangeVarCallbackForDropIndex(const RangeVar *rel, Oid relOid, Oid oldRelOid, static void RangeVarCallbackForDropIndex(const RangeVar *rel, Oid relOid, Oid oldRelOid,
@ -772,7 +773,8 @@ PlanDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand)
ddlJob->targetRelationId = distributedRelationId; ddlJob->targetRelationId = distributedRelationId;
ddlJob->commandString = dropIndexCommand; ddlJob->commandString = dropIndexCommand;
ddlJob->taskList = DDLTaskList(distributedRelationId, dropIndexCommand); ddlJob->taskList = DropIndexTaskList(distributedRelationId, distributedIndexId,
dropIndexStatement);
ddlJobs = list_make1(ddlJob); ddlJobs = list_make1(ddlJob);
} }
@ -2126,6 +2128,62 @@ IndexTaskList(Oid relationId, IndexStmt *indexStmt)
} }
/*
* DDLTaskList builds a list of tasks to execute a DDL command on a
* given list of shards.
*/
static List *
DropIndexTaskList(Oid relationId, Oid indexId, DropStmt *dropStmt)
{
List *taskList = NIL;
List *shardIntervalList = LoadShardIntervalList(relationId);
ListCell *shardIntervalCell = NULL;
char *indexName = get_rel_name(indexId);
Oid schemaId = get_rel_namespace(indexId);
char *schemaName = get_namespace_name(schemaId);
StringInfoData ddlString;
uint64 jobId = INVALID_JOB_ID;
int taskId = 1;
initStringInfo(&ddlString);
/* lock metadata before getting placement lists */
LockShardListMetadata(shardIntervalList, ShareLock);
foreach(shardIntervalCell, shardIntervalList)
{
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
uint64 shardId = shardInterval->shardId;
char *shardIndexName = pstrdup(indexName);
Task *task = NULL;
AppendShardIdToName(&shardIndexName, shardId);
appendStringInfo(&ddlString, "DROP INDEX %s %s %s %s",
(dropStmt->concurrent ? "CONCURRENTLY" : ""),
(dropStmt->missing_ok ? "IF EXISTS" : ""),
quote_qualified_identifier(schemaName, shardIndexName),
(dropStmt->behavior == DROP_RESTRICT ? "RESTRICT" : "CASCADE"));
task = CitusMakeNode(Task);
task->jobId = jobId;
task->taskId = taskId++;
task->taskType = DDL_TASK;
task->queryString = pstrdup(ddlString.data);
task->replicationModel = REPLICATION_MODEL_INVALID;
task->dependedTaskList = NULL;
task->anchorShardId = shardId;
task->taskPlacementList = FinalizedShardPlacementList(shardId);
taskList = lappend(taskList, task);
resetStringInfo(&ddlString);
}
return taskList;
}
/* /*
* ForeignKeyTaskList builds a list of tasks to execute a foreign key command on a * ForeignKeyTaskList builds a list of tasks to execute a foreign key command on a
* shards of given list of distributed table. * shards of given list of distributed table.