mirror of https://github.com/citusdata/citus.git
Prototype of create/drop index via worker
parent
57d9cc1975
commit
914137ab4a
|
@ -151,6 +151,17 @@ PreprocessIndexStmt(Node *node, const char *createIndexCommand,
|
|||
* index statements.
|
||||
*/
|
||||
LOCKMODE lockMode = GetCreateIndexRelationLockMode(createIndexStatement);
|
||||
|
||||
/*
|
||||
* Acquire global lock to prevent concurrent writes or DDL. However,
|
||||
* we do not bother for CREATE INDEX CONCURRENTLY, since we'll have
|
||||
* to release the lock.
|
||||
*/
|
||||
if (!createIndexStatement->concurrent)
|
||||
{
|
||||
AcquireDistributedLockOnRelations(list_make1(relationRangeVar), lockMode, 0);
|
||||
}
|
||||
|
||||
Relation relation = table_openrv(relationRangeVar, lockMode);
|
||||
|
||||
/*
|
||||
|
@ -231,6 +242,10 @@ PreprocessIndexStmt(Node *node, const char *createIndexCommand,
|
|||
SwitchToSequentialAndLocalExecutionIfIndexNameTooLong(createIndexStatement);
|
||||
|
||||
DDLJob *ddlJob = GenerateCreateIndexDDLJob(createIndexStatement, createIndexCommand);
|
||||
|
||||
/* we have taken appropriate locks */
|
||||
ddlJob->allowedOnWorkerNode = true;
|
||||
|
||||
return list_make1(ddlJob);
|
||||
}
|
||||
|
||||
|
@ -555,6 +570,17 @@ PreprocessReindexStmt(Node *node, const char *reindexCommand,
|
|||
"concurrently");
|
||||
state.locked_table_oid = InvalidOid;
|
||||
|
||||
/*
|
||||
* Acquire global lock to prevent concurrent writes or DDL. However,
|
||||
* we do not bother for REINDEX CONCURRENTLY, since we'll have
|
||||
* to release the lock.
|
||||
*/
|
||||
if (!state.concurrent)
|
||||
{
|
||||
AcquireDistributedLockOnRelations(list_make1(reindexStatement->relation),
|
||||
lockmode, 0);
|
||||
}
|
||||
|
||||
Oid indOid = RangeVarGetRelidExtended(reindexStatement->relation,
|
||||
lockmode, 0,
|
||||
RangeVarCallbackForReindexIndex,
|
||||
|
@ -564,6 +590,9 @@ PreprocessReindexStmt(Node *node, const char *reindexCommand,
|
|||
}
|
||||
else
|
||||
{
|
||||
AcquireDistributedLockOnRelations(list_make1(reindexStatement->relation),
|
||||
lockmode, 0);
|
||||
|
||||
RangeVarGetRelidExtended(reindexStatement->relation, lockmode, 0,
|
||||
RangeVarCallbackOwnsTable, NULL);
|
||||
|
||||
|
@ -614,6 +643,7 @@ PreprocessReindexStmt(Node *node, const char *reindexCommand,
|
|||
"concurrently");
|
||||
ddlJob->metadataSyncCommand = reindexCommand;
|
||||
ddlJob->taskList = CreateReindexTaskList(relationId, reindexStatement);
|
||||
ddlJob->allowedOnWorkerNode = true;
|
||||
|
||||
ddlJobs = list_make1(ddlJob);
|
||||
}
|
||||
|
@ -662,6 +692,16 @@ PreprocessDropIndexStmt(Node *node, const char *dropIndexCommand,
|
|||
lockmode = ShareUpdateExclusiveLock;
|
||||
}
|
||||
|
||||
/*
|
||||
* Acquire global lock to prevent concurrent writes or DDL. However,
|
||||
* we do not bother for DROP INDEX CONCURRENTLY, since we'll have
|
||||
* to release the lock.
|
||||
*/
|
||||
if (!dropIndexStatement->concurrent)
|
||||
{
|
||||
AcquireDistributedLockOnRelations(list_make1(rangeVar), lockmode, 0);
|
||||
}
|
||||
|
||||
/*
|
||||
* The next few statements are based on RemoveRelations() in
|
||||
* commands/tablecmds.c in Postgres source.
|
||||
|
@ -718,6 +758,7 @@ PreprocessDropIndexStmt(Node *node, const char *dropIndexCommand,
|
|||
ddlJob->metadataSyncCommand = dropIndexCommand;
|
||||
ddlJob->taskList = DropIndexTaskList(distributedRelationId, distributedIndexId,
|
||||
dropIndexStatement);
|
||||
ddlJob->allowedOnWorkerNode = true;
|
||||
|
||||
ddlJobs = list_make1(ddlJob);
|
||||
}
|
||||
|
@ -737,12 +778,6 @@ PostprocessIndexStmt(Node *node, const char *queryString)
|
|||
{
|
||||
IndexStmt *indexStmt = castNode(IndexStmt, node);
|
||||
|
||||
/* this logic only applies to the coordinator */
|
||||
if (!IsCoordinator())
|
||||
{
|
||||
return NIL;
|
||||
}
|
||||
|
||||
/*
|
||||
* We make sure schema name is not null in the PreprocessIndexStmt
|
||||
*/
|
||||
|
@ -1265,7 +1300,6 @@ void
|
|||
MarkIndexValid(IndexStmt *indexStmt)
|
||||
{
|
||||
Assert(indexStmt->concurrent);
|
||||
Assert(IsCoordinator());
|
||||
|
||||
/*
|
||||
* We make sure schema name is not null in the PreprocessIndexStmt
|
||||
|
|
|
@ -315,7 +315,7 @@ PreprocessDropSequenceStmt(Node *node, const char *queryString,
|
|||
(void *) dropStmtSql,
|
||||
ENABLE_DDL_PROPAGATION);
|
||||
|
||||
return NodeDDLTaskList(NON_COORDINATOR_METADATA_NODES, commands);
|
||||
return NodeDDLTaskList(OTHER_METADATA_NODES, commands);
|
||||
}
|
||||
|
||||
|
||||
|
@ -350,7 +350,7 @@ PreprocessRenameSequenceStmt(Node *node, const char *queryString, ProcessUtility
|
|||
List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql,
|
||||
ENABLE_DDL_PROPAGATION);
|
||||
|
||||
return NodeDDLTaskList(NON_COORDINATOR_METADATA_NODES, commands);
|
||||
return NodeDDLTaskList(OTHER_METADATA_NODES, commands);
|
||||
}
|
||||
|
||||
|
||||
|
@ -513,7 +513,7 @@ PreprocessAlterSequenceSchemaStmt(Node *node, const char *queryString,
|
|||
List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql,
|
||||
ENABLE_DDL_PROPAGATION);
|
||||
|
||||
return NodeDDLTaskList(NON_COORDINATOR_METADATA_NODES, commands);
|
||||
return NodeDDLTaskList(OTHER_METADATA_NODES, commands);
|
||||
}
|
||||
|
||||
|
||||
|
@ -615,7 +615,7 @@ PreprocessAlterSequenceOwnerStmt(Node *node, const char *queryString,
|
|||
List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql,
|
||||
ENABLE_DDL_PROPAGATION);
|
||||
|
||||
return NodeDDLTaskList(NON_COORDINATOR_METADATA_NODES, commands);
|
||||
return NodeDDLTaskList(OTHER_METADATA_NODES, commands);
|
||||
}
|
||||
|
||||
|
||||
|
@ -718,7 +718,7 @@ PreprocessGrantOnSequenceStmt(Node *node, const char *queryString,
|
|||
List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql,
|
||||
ENABLE_DDL_PROPAGATION);
|
||||
|
||||
return NodeDDLTaskList(NON_COORDINATOR_METADATA_NODES, commands);
|
||||
return NodeDDLTaskList(OTHER_METADATA_NODES, commands);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -1130,7 +1130,10 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
|
|||
{
|
||||
bool shouldSyncMetadata = false;
|
||||
|
||||
if (!ddlJob->allowedOnWorkerNode)
|
||||
{
|
||||
EnsureCoordinator();
|
||||
}
|
||||
|
||||
ObjectAddress targetObjectAddress = ddlJob->targetObjectAddress;
|
||||
|
||||
|
|
|
@ -2216,7 +2216,7 @@ SetWorkerColumnOptional(WorkerNode *workerNode, int columnIndex, Datum value)
|
|||
columnIndex,
|
||||
value);
|
||||
|
||||
List *workerNodeList = TargetWorkerSetNodeList(NON_COORDINATOR_METADATA_NODES,
|
||||
List *workerNodeList = TargetWorkerSetNodeList(OTHER_METADATA_NODES,
|
||||
ShareLock);
|
||||
|
||||
/* open connections in parallel */
|
||||
|
|
|
@ -148,24 +148,32 @@ SendCommandToWorkersWithMetadataViaSuperUser(const char *command)
|
|||
List *
|
||||
TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode)
|
||||
{
|
||||
List *workerNodeList = NIL;
|
||||
if (targetWorkerSet == ALL_SHARD_NODES || targetWorkerSet == METADATA_NODES)
|
||||
{
|
||||
workerNodeList = ActivePrimaryNodeList(lockMode);
|
||||
}
|
||||
else
|
||||
{
|
||||
workerNodeList = ActivePrimaryNonCoordinatorNodeList(lockMode);
|
||||
}
|
||||
List *workerNodeList = ActivePrimaryNodeList(lockMode);
|
||||
List *result = NIL;
|
||||
int localGroupId = GetLocalGroupId();
|
||||
|
||||
WorkerNode *workerNode = NULL;
|
||||
foreach_ptr(workerNode, workerNodeList)
|
||||
{
|
||||
if ((targetWorkerSet == NON_COORDINATOR_METADATA_NODES || targetWorkerSet ==
|
||||
METADATA_NODES) &&
|
||||
if ((targetWorkerSet == OTHER_METADATA_NODES ||
|
||||
targetWorkerSet == METADATA_NODES) &&
|
||||
!workerNode->hasMetadata)
|
||||
{
|
||||
/* only interested in metadata nodes */
|
||||
continue;
|
||||
}
|
||||
|
||||
if (targetWorkerSet == OTHER_METADATA_NODES &&
|
||||
workerNode->groupId == localGroupId)
|
||||
{
|
||||
/* only interested in other metadata nodes */
|
||||
continue;
|
||||
}
|
||||
|
||||
if (targetWorkerSet == NON_COORDINATOR_NODES &&
|
||||
workerNode->groupId == COORDINATOR_GROUP_ID)
|
||||
{
|
||||
/* only interested in worker nodes */
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -186,7 +194,7 @@ TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode)
|
|||
void
|
||||
SendBareCommandListToMetadataWorkers(List *commandList)
|
||||
{
|
||||
TargetWorkerSet targetWorkerSet = NON_COORDINATOR_METADATA_NODES;
|
||||
TargetWorkerSet targetWorkerSet = OTHER_METADATA_NODES;
|
||||
List *workerNodeList = TargetWorkerSetNodeList(targetWorkerSet, RowShareLock);
|
||||
char *nodeUser = CurrentUserName();
|
||||
|
||||
|
@ -227,12 +235,12 @@ SendCommandToMetadataWorkersParams(const char *command,
|
|||
const Oid *parameterTypes,
|
||||
const char *const *parameterValues)
|
||||
{
|
||||
List *workerNodeList = TargetWorkerSetNodeList(NON_COORDINATOR_METADATA_NODES,
|
||||
List *workerNodeList = TargetWorkerSetNodeList(OTHER_METADATA_NODES,
|
||||
RowShareLock);
|
||||
|
||||
ErrorIfAnyMetadataNodeOutOfSync(workerNodeList);
|
||||
|
||||
SendCommandToWorkersParamsInternal(NON_COORDINATOR_METADATA_NODES, command, user,
|
||||
SendCommandToWorkersParamsInternal(OTHER_METADATA_NODES, command, user,
|
||||
parameterCount, parameterTypes,
|
||||
parameterValues);
|
||||
}
|
||||
|
|
|
@ -74,6 +74,9 @@ typedef struct DDLJob
|
|||
const char *metadataSyncCommand;
|
||||
|
||||
List *taskList; /* worker DDL tasks to execute */
|
||||
|
||||
/* whether the DDL can be executed from a worker */
|
||||
bool allowedOnWorkerNode;
|
||||
} DDLJob;
|
||||
|
||||
extern ProcessUtility_hook_type PrevProcessUtility;
|
||||
|
|
|
@ -23,10 +23,9 @@
|
|||
typedef enum TargetWorkerSet
|
||||
{
|
||||
/*
|
||||
* All the active primary nodes in the metadata which have metadata
|
||||
* except the coordinator
|
||||
* All other active primary nodes in the metadata which have metadata.
|
||||
*/
|
||||
NON_COORDINATOR_METADATA_NODES,
|
||||
OTHER_METADATA_NODES,
|
||||
|
||||
/*
|
||||
* All the active primary nodes in the metadata except the coordinator
|
||||
|
|
Loading…
Reference in New Issue