mirror of https://github.com/citusdata/citus.git
Refactor PreprocessIndexStmt (#4272)
parent
ba300dcad8
commit
46be63d76b
|
@ -34,6 +34,7 @@
|
|||
#include "distributed/multi_partitioning_utils.h"
|
||||
#include "distributed/resource_lock.h"
|
||||
#include "distributed/relation_access_tracking.h"
|
||||
#include "distributed/relation_utils.h"
|
||||
#include "distributed/version_compat.h"
|
||||
#include "lib/stringinfo.h"
|
||||
#include "miscadmin.h"
|
||||
|
@ -46,13 +47,19 @@
|
|||
|
||||
|
||||
/* Local functions forward declarations for helper functions */
|
||||
static List * CreateIndexTaskList(Oid relationId, IndexStmt *indexStmt);
|
||||
static void SwitchToSequentialExecutionIfIndexNameTooLong(Oid relationId, Oid
|
||||
namespaceId,
|
||||
static bool IndexAlreadyExists(IndexStmt *createIndexStatement);
|
||||
static Oid CreateIndexStmtGetIndexId(IndexStmt *createIndexStatement);
|
||||
static Oid CreateIndexStmtGetSchemaId(IndexStmt *createIndexStatement);
|
||||
static void SwitchToSequentialExecutionIfIndexNameTooLong(
|
||||
IndexStmt *createIndexStatement);
|
||||
static char * GenerateLongestShardPartitionIndexName(Oid relationId, Oid namespaceId,
|
||||
IndexStmt *createIndexStatement);
|
||||
|
||||
static char * GenerateLongestShardPartitionIndexName(IndexStmt *createIndexStatement);
|
||||
static char * GenerateDefaultIndexName(IndexStmt *createIndexStatement);
|
||||
static List * GenerateIndexParameters(IndexStmt *createIndexStatement);
|
||||
static DDLJob * GenerateCreateIndexDDLJob(IndexStmt *createIndexStatement,
|
||||
const char *createIndexCommand);
|
||||
static Oid CreateIndexStmtGetRelationId(IndexStmt *createIndexStatement);
|
||||
static LOCKMODE GetCreateIndexRelationLockMode(IndexStmt *createIndexStatement);
|
||||
static List * CreateIndexTaskList(IndexStmt *indexStmt);
|
||||
static List * CreateReindexTaskList(Oid relationId, ReindexStmt *reindexStmt);
|
||||
static void RangeVarCallbackForDropIndex(const RangeVar *rel, Oid relOid, Oid oldRelOid,
|
||||
void *arg);
|
||||
|
@ -120,79 +127,61 @@ List *
|
|||
PreprocessIndexStmt(Node *node, const char *createIndexCommand)
|
||||
{
|
||||
IndexStmt *createIndexStatement = castNode(IndexStmt, node);
|
||||
List *ddlJobs = NIL;
|
||||
|
||||
/*
|
||||
* We first check whether a distributed relation is affected. For that, we need to
|
||||
* open the relation. To prevent race conditions with later lookups, lock the table,
|
||||
* and modify the rangevar to include the schema.
|
||||
*/
|
||||
if (createIndexStatement->relation != NULL)
|
||||
RangeVar *relationRangeVar = createIndexStatement->relation;
|
||||
if (relationRangeVar == NULL)
|
||||
{
|
||||
LOCKMODE lockmode = ShareLock;
|
||||
MemoryContext relationContext = NULL;
|
||||
|
||||
/*
|
||||
* We don't support concurrently creating indexes for distributed
|
||||
* tables, but till this point, we don't know if it is a regular or a
|
||||
* distributed table.
|
||||
*/
|
||||
if (createIndexStatement->concurrent)
|
||||
{
|
||||
lockmode = ShareUpdateExclusiveLock;
|
||||
/* let's be on the safe side */
|
||||
return NIL;
|
||||
}
|
||||
|
||||
/*
|
||||
* We first check whether a distributed relation is affected. For that,
|
||||
* we need to open the relation. To prevent race conditions with later
|
||||
* lookups, lock the table.
|
||||
*
|
||||
* XXX: Consider using RangeVarGetRelidExtended with a permission
|
||||
* checking callback. Right now we'll acquire the lock before having
|
||||
* checked permissions, and will only fail when executing the actual
|
||||
* index statements.
|
||||
*/
|
||||
Relation relation = table_openrv(createIndexStatement->relation, lockmode);
|
||||
Oid relationId = RelationGetRelid(relation);
|
||||
LOCKMODE lockMode = GetCreateIndexRelationLockMode(createIndexStatement);
|
||||
Relation relation = table_openrv(relationRangeVar, lockMode);
|
||||
|
||||
bool isCitusRelation = IsCitusTable(relationId);
|
||||
|
||||
if (createIndexStatement->relation->schemaname == NULL)
|
||||
{
|
||||
/*
|
||||
* Before we do any further processing, fix the schema name to make sure
|
||||
* that a (distributed) table with the same name does not appear on the
|
||||
* search path in front of the current schema. We do this even if the
|
||||
* search_path in front of the current schema. We do this even if the
|
||||
* table is not distributed, since a distributed table may appear on the
|
||||
* search path by the time postgres starts processing this statement.
|
||||
* search_path by the time postgres starts processing this command.
|
||||
*/
|
||||
char *namespaceName = get_namespace_name(RelationGetNamespace(relation));
|
||||
|
||||
if (relationRangeVar->schemaname == NULL)
|
||||
{
|
||||
/* ensure we copy string into proper context */
|
||||
relationContext = GetMemoryChunkContext(createIndexStatement->relation);
|
||||
createIndexStatement->relation->schemaname = MemoryContextStrdup(
|
||||
relationContext, namespaceName);
|
||||
MemoryContext relationContext = GetMemoryChunkContext(relationRangeVar);
|
||||
char *namespaceName = RelationGetNamespaceName(relation);
|
||||
relationRangeVar->schemaname = MemoryContextStrdup(relationContext,
|
||||
namespaceName);
|
||||
}
|
||||
|
||||
table_close(relation, NoLock);
|
||||
|
||||
if (isCitusRelation)
|
||||
Oid relationId = CreateIndexStmtGetRelationId(createIndexStatement);
|
||||
if (!IsCitusTable(relationId))
|
||||
{
|
||||
char *indexName = createIndexStatement->idxname;
|
||||
char *namespaceName = createIndexStatement->relation->schemaname;
|
||||
return NIL;
|
||||
}
|
||||
|
||||
ErrorIfUnsupportedIndexStmt(createIndexStatement);
|
||||
|
||||
Oid namespaceId = get_namespace_oid(namespaceName, false);
|
||||
Oid indexRelationId = get_relname_relid(indexName, namespaceId);
|
||||
|
||||
/* if index does not exist, send the command to workers */
|
||||
if (!OidIsValid(indexRelationId))
|
||||
if (IndexAlreadyExists(createIndexStatement))
|
||||
{
|
||||
DDLJob *ddlJob = palloc0(sizeof(DDLJob));
|
||||
ddlJob->targetRelationId = relationId;
|
||||
ddlJob->concurrentIndexCmd = createIndexStatement->concurrent;
|
||||
ddlJob->startNewTransaction = createIndexStatement->concurrent;
|
||||
ddlJob->commandString = createIndexCommand;
|
||||
ddlJob->taskList = CreateIndexTaskList(relationId, createIndexStatement);
|
||||
|
||||
ddlJobs = list_make1(ddlJob);
|
||||
/*
|
||||
* Let standard_processUtility to error out or skip if command has
|
||||
* IF NOT EXISTS.
|
||||
*/
|
||||
return NIL;
|
||||
}
|
||||
|
||||
/*
|
||||
* Citus has the logic to truncate the long shard names to prevent
|
||||
|
@ -210,13 +199,52 @@ PreprocessIndexStmt(Node *node, const char *createIndexCommand)
|
|||
* the same, and thus forming a self-deadlock as these tables/
|
||||
* indexes are inserted into postgres' metadata tables, like pg_class.
|
||||
*/
|
||||
SwitchToSequentialExecutionIfIndexNameTooLong(relationId, namespaceId,
|
||||
createIndexStatement);
|
||||
}
|
||||
}
|
||||
}
|
||||
SwitchToSequentialExecutionIfIndexNameTooLong(createIndexStatement);
|
||||
|
||||
return ddlJobs;
|
||||
DDLJob *ddlJob = GenerateCreateIndexDDLJob(createIndexStatement, createIndexCommand);
|
||||
return list_make1(ddlJob);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* IndexAlreadyExists returns true if index to be created by given CREATE INDEX
|
||||
* command already exists.
|
||||
*/
|
||||
static bool
|
||||
IndexAlreadyExists(IndexStmt *createIndexStatement)
|
||||
{
|
||||
Oid indexRelationId = CreateIndexStmtGetIndexId(createIndexStatement);
|
||||
return OidIsValid(indexRelationId);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CreateIndexStmtGetIndexId returns OID of the index that given CREATE INDEX
|
||||
* command attempts to create if it's already created before. Otherwise, returns
|
||||
* InvalidOid.
|
||||
*/
|
||||
static Oid
|
||||
CreateIndexStmtGetIndexId(IndexStmt *createIndexStatement)
|
||||
{
|
||||
char *indexName = createIndexStatement->idxname;
|
||||
Oid namespaceId = CreateIndexStmtGetSchemaId(createIndexStatement);
|
||||
Oid indexRelationId = get_relname_relid(indexName, namespaceId);
|
||||
return indexRelationId;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CreateIndexStmtGetSchemaId returns schemaId of the schema that given
|
||||
* CREATE INDEX command operates on.
|
||||
*/
|
||||
static Oid
|
||||
CreateIndexStmtGetSchemaId(IndexStmt *createIndexStatement)
|
||||
{
|
||||
RangeVar *relationRangeVar = createIndexStatement->relation;
|
||||
char *schemaName = relationRangeVar->schemaname;
|
||||
bool missingOk = false;
|
||||
Oid namespaceId = get_namespace_oid(schemaName, missingOk);
|
||||
return namespaceId;
|
||||
}
|
||||
|
||||
|
||||
|
@ -226,9 +254,9 @@ PreprocessIndexStmt(Node *node, const char *createIndexCommand)
|
|||
* sequential execution to prevent self-deadlocks.
|
||||
*/
|
||||
static void
|
||||
SwitchToSequentialExecutionIfIndexNameTooLong(Oid relationId, Oid namespaceId,
|
||||
IndexStmt *createIndexStatement)
|
||||
SwitchToSequentialExecutionIfIndexNameTooLong(IndexStmt *createIndexStatement)
|
||||
{
|
||||
Oid relationId = CreateIndexStmtGetRelationId(createIndexStatement);
|
||||
if (!PartitionedTable(relationId))
|
||||
{
|
||||
/* Citus already handles long names for regular tables */
|
||||
|
@ -244,10 +272,7 @@ SwitchToSequentialExecutionIfIndexNameTooLong(Oid relationId, Oid namespaceId,
|
|||
return;
|
||||
}
|
||||
|
||||
char *indexName =
|
||||
GenerateLongestShardPartitionIndexName(relationId, namespaceId,
|
||||
createIndexStatement);
|
||||
|
||||
char *indexName = GenerateLongestShardPartitionIndexName(createIndexStatement);
|
||||
if (indexName &&
|
||||
strnlen(indexName, NAMEDATALEN) >= NAMEDATALEN - 1)
|
||||
{
|
||||
|
@ -282,9 +307,9 @@ SwitchToSequentialExecutionIfIndexNameTooLong(Oid relationId, Oid namespaceId,
|
|||
* possible index name.
|
||||
*/
|
||||
static char *
|
||||
GenerateLongestShardPartitionIndexName(Oid relationId, Oid namespaceId,
|
||||
IndexStmt *createIndexStatement)
|
||||
GenerateLongestShardPartitionIndexName(IndexStmt *createIndexStatement)
|
||||
{
|
||||
Oid relationId = CreateIndexStmtGetRelationId(createIndexStatement);
|
||||
char *longestPartitionName = LongestPartitionName(relationId);
|
||||
if (longestPartitionName == NULL)
|
||||
{
|
||||
|
@ -294,34 +319,103 @@ GenerateLongestShardPartitionIndexName(Oid relationId, Oid namespaceId,
|
|||
|
||||
char *longestPartitionShardName = pstrdup(longestPartitionName);
|
||||
ShardInterval *shardInterval = LoadShardIntervalWithLongestShardName(relationId);
|
||||
|
||||
AppendShardIdToName(&longestPartitionShardName, shardInterval->shardId);
|
||||
|
||||
/*
|
||||
* The rest of the code is copy & paste from DefineIndex()
|
||||
* postgres/src/backend/commands/indexcmds.c
|
||||
IndexStmt *createLongestShardIndexStmt = copyObject(createIndexStatement);
|
||||
createLongestShardIndexStmt->relation->relname = longestPartitionShardName;
|
||||
|
||||
char *choosenIndexName = GenerateDefaultIndexName(createLongestShardIndexStmt);
|
||||
return choosenIndexName;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GenerateDefaultIndexName is a wrapper around postgres function ChooseIndexName
|
||||
* that generates default index name for the index to be created by given CREATE
|
||||
* INDEX statement as postgres would do.
|
||||
*
|
||||
* (See DefineIndex at postgres/src/backend/commands/indexcmds.c)
|
||||
*/
|
||||
|
||||
/*
|
||||
* Calculate the new list of index columns including both key columns and
|
||||
* INCLUDE columns. Later we can determine which of these are key
|
||||
* columns, and which are just part of the INCLUDE list by checking the
|
||||
* list position. A list item in a position less than ii_NumIndexKeyAttrs
|
||||
* is part of the key columns, and anything equal to and over is part of
|
||||
* the INCLUDE columns.
|
||||
*/
|
||||
List *allIndexParams =
|
||||
list_concat(list_copy(createIndexStatement->indexParams),
|
||||
list_copy(createIndexStatement->indexIncludingParams));
|
||||
|
||||
List *indexColNames = ChooseIndexColumnNames(allIndexParams);
|
||||
|
||||
char *choosenIndexName = ChooseIndexName(longestPartitionShardName, namespaceId,
|
||||
indexColNames,
|
||||
static char *
|
||||
GenerateDefaultIndexName(IndexStmt *createIndexStatement)
|
||||
{
|
||||
char *relationName = createIndexStatement->relation->relname;
|
||||
Oid namespaceId = CreateIndexStmtGetSchemaId(createIndexStatement);
|
||||
List *indexParams = GenerateIndexParameters(createIndexStatement);
|
||||
List *indexColNames = ChooseIndexColumnNames(indexParams);
|
||||
char *indexName = ChooseIndexName(relationName, namespaceId, indexColNames,
|
||||
createIndexStatement->excludeOpNames,
|
||||
createIndexStatement->primary,
|
||||
createIndexStatement->isconstraint);
|
||||
return choosenIndexName;
|
||||
|
||||
return indexName;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GenerateIndexParameters is a helper function that creates a list of parameters
|
||||
* required to assign a default index name for the index to be created by given
|
||||
* CREATE INDEX command.
|
||||
*/
|
||||
static List *
|
||||
GenerateIndexParameters(IndexStmt *createIndexStatement)
|
||||
{
|
||||
List *indexParams = createIndexStatement->indexParams;
|
||||
List *indexIncludingParams = createIndexStatement->indexIncludingParams;
|
||||
List *allIndexParams = list_concat(list_copy(indexParams),
|
||||
list_copy(indexIncludingParams));
|
||||
return allIndexParams;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GenerateCreateIndexDDLJob returns DDLJob for given CREATE INDEX command.
|
||||
*/
|
||||
static DDLJob *
|
||||
GenerateCreateIndexDDLJob(IndexStmt *createIndexStatement, const char *createIndexCommand)
|
||||
{
|
||||
DDLJob *ddlJob = palloc0(sizeof(DDLJob));
|
||||
|
||||
ddlJob->targetRelationId = CreateIndexStmtGetRelationId(createIndexStatement);
|
||||
ddlJob->concurrentIndexCmd = createIndexStatement->concurrent;
|
||||
ddlJob->startNewTransaction = createIndexStatement->concurrent;
|
||||
ddlJob->commandString = createIndexCommand;
|
||||
ddlJob->taskList = CreateIndexTaskList(createIndexStatement);
|
||||
|
||||
return ddlJob;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CreateIndexStmtGetRelationId returns relationId for relation that given
|
||||
* CREATE INDEX command operates on.
|
||||
*/
|
||||
static Oid
|
||||
CreateIndexStmtGetRelationId(IndexStmt *createIndexStatement)
|
||||
{
|
||||
RangeVar *relationRangeVar = createIndexStatement->relation;
|
||||
LOCKMODE lockMode = GetCreateIndexRelationLockMode(createIndexStatement);
|
||||
bool missingOk = false;
|
||||
Oid relationId = RangeVarGetRelid(relationRangeVar, lockMode, missingOk);
|
||||
return relationId;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetCreateIndexRelationLockMode returns required lock mode to open the
|
||||
* relation that given CREATE INDEX command operates on.
|
||||
*/
|
||||
static LOCKMODE
|
||||
GetCreateIndexRelationLockMode(IndexStmt *createIndexStatement)
|
||||
{
|
||||
if (createIndexStatement->concurrent)
|
||||
{
|
||||
return ShareUpdateExclusiveLock;
|
||||
}
|
||||
else
|
||||
{
|
||||
return ShareLock;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -635,13 +729,13 @@ ErrorIfUnsupportedAlterIndexStmt(AlterTableStmt *alterTableStatement)
|
|||
|
||||
|
||||
/*
|
||||
* CreateIndexTaskList builds a list of tasks to execute a CREATE INDEX command
|
||||
* against a specified distributed table.
|
||||
* CreateIndexTaskList builds a list of tasks to execute a CREATE INDEX command.
|
||||
*/
|
||||
static List *
|
||||
CreateIndexTaskList(Oid relationId, IndexStmt *indexStmt)
|
||||
CreateIndexTaskList(IndexStmt *indexStmt)
|
||||
{
|
||||
List *taskList = NIL;
|
||||
Oid relationId = CreateIndexStmtGetRelationId(indexStmt);
|
||||
List *shardIntervalList = LoadShardIntervalList(relationId);
|
||||
StringInfoData ddlString;
|
||||
uint64 jobId = INVALID_JOB_ID;
|
||||
|
|
|
@ -0,0 +1,30 @@
|
|||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* relation_utils.c
|
||||
*
|
||||
* This file contains functions similar to rel.h to perform useful
|
||||
* operations on Relation objects.
|
||||
*
|
||||
* Copyright (c) Citus Data, Inc.
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#include "postgres.h"
|
||||
|
||||
#include "distributed/relation_utils.h"
|
||||
|
||||
#include "utils/lsyscache.h"
|
||||
#include "utils/rel.h"
|
||||
|
||||
|
||||
/*
|
||||
* RelationGetNamespaceName returns the relation's namespace name.
|
||||
*/
|
||||
char *
|
||||
RelationGetNamespaceName(Relation relation)
|
||||
{
|
||||
Oid namespaceId = RelationGetNamespace(relation);
|
||||
char *namespaceName = get_namespace_name(namespaceId);
|
||||
return namespaceName;
|
||||
}
|
|
@ -0,0 +1,20 @@
|
|||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* relation_utils.h
|
||||
* Utilities related to Relation objects.
|
||||
*
|
||||
* Copyright (c) Citus Data, Inc.
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#ifndef RELATION_UTILS_H
|
||||
#define RELATION_UTILS_H
|
||||
|
||||
#include "postgres.h"
|
||||
|
||||
#include "utils/relcache.h"
|
||||
|
||||
extern char * RelationGetNamespaceName(Relation relation);
|
||||
|
||||
#endif /* RELATION_UTILS_H */
|
Loading…
Reference in New Issue