Experiment: create indexes on partitions in PreprocessIndexStmt

partitioned-index
naisila 2021-08-31 13:28:20 +03:00
parent ef2c6d51b2
commit 02991e7811
1 changed files with 680 additions and 0 deletions

View File

@ -10,6 +10,8 @@
#include "postgres.h" #include "postgres.h"
#include "access/amapi.h"
#include "access/reloptions.h"
#include "distributed/pg_version_constants.h" #include "distributed/pg_version_constants.h"
#include "access/genam.h" #include "access/genam.h"
#include "access/htup_details.h" #include "access/htup_details.h"
@ -17,7 +19,10 @@
#include "catalog/catalog.h" #include "catalog/catalog.h"
#include "catalog/index.h" #include "catalog/index.h"
#include "catalog/namespace.h" #include "catalog/namespace.h"
#include "catalog/pg_am.h"
#include "catalog/pg_class.h" #include "catalog/pg_class.h"
#include "catalog/pg_inherits.h"
#include "catalog/pg_opfamily.h"
#include "commands/defrem.h" #include "commands/defrem.h"
#include "commands/tablecmds.h" #include "commands/tablecmds.h"
#include "distributed/citus_ruleutils.h" #include "distributed/citus_ruleutils.h"
@ -40,12 +45,19 @@
#include "distributed/worker_manager.h" #include "distributed/worker_manager.h"
#include "lib/stringinfo.h" #include "lib/stringinfo.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "nodes/parsenodes.h" #include "nodes/parsenodes.h"
#include "parser/parse_oper.h"
#include "parser/parse_utilcmd.h"
#include "partitioning/partdesc.h"
#include "rewrite/rewriteManip.h"
#include "storage/lmgr.h" #include "storage/lmgr.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/fmgroids.h" #include "utils/fmgroids.h"
#include "utils/inval.h" #include "utils/inval.h"
#include "utils/lsyscache.h" #include "utils/lsyscache.h"
#include "utils/regproc.h"
#include "utils/syscache.h" #include "utils/syscache.h"
@ -72,6 +84,19 @@ static void RangeVarCallbackForReindexIndex(const RangeVar *rel, Oid relOid, Oid
void *arg); void *arg);
static void ErrorIfUnsupportedIndexStmt(IndexStmt *createIndexStatement); static void ErrorIfUnsupportedIndexStmt(IndexStmt *createIndexStatement);
static List * DropIndexTaskList(Oid relationId, Oid indexId, DropStmt *dropStmt); static List * DropIndexTaskList(Oid relationId, Oid indexId, DropStmt *dropStmt);
static void CreatePartitionIndexes(IndexStmt *stmt, Oid relationId, LOCKMODE lockMode);
static void ComputeIndexAttrs(IndexInfo *indexInfo,
Oid *typeOidP,
Oid *collationOidP,
Oid *classOidP,
int16 *colOptionP,
List *attList, /* list of IndexElem's */
List *exclusionOpNames,
Oid relId,
const char *accessMethodName,
Oid accessMethodId,
bool amcanorder,
bool isconstraint);
/* /*
@ -218,11 +243,666 @@ PreprocessIndexStmt(Node *node, const char *createIndexCommand,
*/ */
SwitchToSequentialAndLocalExecutionIfIndexNameTooLong(createIndexStatement); SwitchToSequentialAndLocalExecutionIfIndexNameTooLong(createIndexStatement);
/* if this table is partitioned table, and we're not using ONLY, create indexes on partitions */
if (PartitionedTable(relationId) && createIndexStatement->relation->inh &&
list_length(PartitionList(relationId)) > 0)
{
createIndexStatement = transformIndexStmt(relationId, createIndexStatement,
createIndexCommand);
CreatePartitionIndexes(createIndexStatement, relationId, lockMode);
}
DDLJob *ddlJob = GenerateCreateIndexDDLJob(createIndexStatement, createIndexCommand); DDLJob *ddlJob = GenerateCreateIndexDDLJob(createIndexStatement, createIndexCommand);
return list_make1(ddlJob); return list_make1(ddlJob);
} }
/*
* CreatePartitionIndexes
* Most of the code borrowed from Postgres
*/
static void
CreatePartitionIndexes(IndexStmt *stmt, Oid relationId, LOCKMODE lockMode)
{
Relation rel = table_open(relationId, lockMode);
/*
* look up the access method, verify it can handle the requested features
*/
char *accessMethodName = stmt->accessMethod;
HeapTuple tuple = SearchSysCache1(AMNAME, PointerGetDatum(accessMethodName));
if (!HeapTupleIsValid(tuple))
{
/*
* Hack to provide more-or-less-transparent updating of old RTREE
* indexes to GiST: if RTREE is requested and not found, use GIST.
*/
if (strcmp(accessMethodName, "rtree") == 0)
{
ereport(NOTICE,
(errmsg(
"substituting access method \"gist\" for obsolete method \"rtree\"")));
accessMethodName = "gist";
tuple = SearchSysCache1(AMNAME, PointerGetDatum(accessMethodName));
}
if (!HeapTupleIsValid(tuple))
{
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("access method \"%s\" does not exist",
accessMethodName)));
}
}
Form_pg_am accessMethodForm = (Form_pg_am) GETSTRUCT(tuple);
Oid accessMethodId = accessMethodForm->oid;
IndexAmRoutine *amRoutine = GetIndexAmRoutine(accessMethodForm->amhandler);
bool amcanorder = amRoutine->amcanorder;
ReleaseSysCache(tuple);
/*
* Force non-concurrent build on temporary relations, even if CONCURRENTLY
* was requested. Other backends can't access a temporary relation, so
* there's no harm in grabbing a stronger lock, and a non-concurrent DROP
* is more efficient. Do this before any use of the concurrent option is
* done.
*/
bool concurrent = false;
if (stmt->concurrent && get_rel_persistence(relationId) != RELPERSISTENCE_TEMP)
{
concurrent = true;
}
/*
* count key attributes in index
*/
int numberOfKeyAttributes = list_length(stmt->indexParams);
/*
* Calculate the new list of index columns including both key columns and
* INCLUDE columns. Later we can determine which of these are key
* columns, and which are just part of the INCLUDE list by checking the
* list position. A list item in a position less than ii_NumIndexKeyAttrs
* is part of the key columns, and anything equal to and over is part of
* the INCLUDE columns.
*/
List *allIndexParams = list_concat_copy(stmt->indexParams,
stmt->indexIncludingParams);
int numberOfAttributes = list_length(allIndexParams);
IndexInfo *indexInfo = makeIndexInfo(numberOfAttributes,
numberOfKeyAttributes,
accessMethodId,
NIL, /* expressions, NIL for now */
make_ands_implicit((Expr *) stmt->whereClause),
stmt->unique,
!concurrent,
concurrent);
Oid *typeObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
Oid *collationObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
Oid *classObjectId = (Oid *) palloc(numberOfAttributes * sizeof(Oid));
int16 *coloptions = (int16 *) palloc(numberOfAttributes * sizeof(int16));
/* this function is non-export in PG, how to handle this? */
ComputeIndexAttrs(indexInfo,
typeObjectId, collationObjectId, classObjectId,
coloptions, allIndexParams,
stmt->excludeOpNames, relationId,
accessMethodName, accessMethodId,
amcanorder, stmt->isconstraint);
PartitionDesc partdesc;
partdesc = RelationGetPartitionDesc(rel, true);
if (partdesc->nparts > 0)
{
int nparts = partdesc->nparts;
Oid *part_oids = palloc(sizeof(Oid) * nparts);
TupleDesc parentDesc;
Oid *opfamOids;
memcpy(part_oids, partdesc->oids, sizeof(Oid) * nparts);
parentDesc = RelationGetDescr(rel);
opfamOids = palloc(sizeof(Oid) * numberOfKeyAttributes);
for (int i = 0; i < numberOfKeyAttributes; i++)
{
opfamOids[i] = get_opclass_family(classObjectId[i]);
}
/*
* For each partition, scan all existing indexes; if one matches
* our index definition and is not already attached to some other
* parent index, we don't need to build a new one.
*
* If none matches, build a new index by calling ourselves
* recursively with the same options (except for the index name).
*/
for (int i = 0; i < nparts; i++)
{
Oid childRelid = part_oids[i];
Relation childrel;
List *childidxs;
ListCell *cell;
AttrMap *attmap;
bool found = false;
childrel = table_open(childRelid, lockMode);
/*
* Don't try to create indexes on foreign tables, though. Skip
* those if a regular index, or fail if trying to create a
* constraint index.
*/
if (childrel->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
{
if (stmt->unique || stmt->primary)
{
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg(
"cannot create unique index on partitioned table \"%s\"",
RelationGetRelationName(rel)),
errdetail(
"Table \"%s\" contains partitions that are foreign tables.",
RelationGetRelationName(rel))));
}
table_close(childrel, lockMode);
continue;
}
childidxs = RelationGetIndexList(childrel);
attmap =
build_attrmap_by_name(RelationGetDescr(childrel),
parentDesc);
foreach(cell, childidxs) /*ONLY is not used here */
{
Oid cldidxid = lfirst_oid(cell);
Relation cldidx;
IndexInfo *cldIdxInfo;
/* this index is already partition of another one */
if (has_superclass(cldidxid))
{
continue;
}
cldidx = index_open(cldidxid, lockMode);
cldIdxInfo = BuildIndexInfo(cldidx);
if (CompareIndexInfo(cldIdxInfo, indexInfo,
cldidx->rd_indcollation,
collationObjectId,
cldidx->rd_opfamily,
opfamOids,
attmap))
{
found = true;
/* keep lock till commit */
index_close(cldidx, NoLock);
break;
}
index_close(cldidx, lockMode);
}
list_free(childidxs);
table_close(childrel, NoLock);
/*
* If no matching index was found, create our own.
*/
if (!found)
{
IndexStmt *childStmt = copyObject(stmt);
bool found_whole_row;
ListCell *lc;
/*
* We can't use the same index name for the child index,
* so clear idxname to let the recursive invocation choose
* a new name. Likewise, the existing target relation
* field is wrong, and if indexOid or oldNode are set,
* they mustn't be applied to the child either.
*/
childStmt->idxname = NULL;
childStmt->relation = NULL;
childStmt->indexOid = InvalidOid;
childStmt->oldNode = InvalidOid;
childStmt->oldCreateSubid = InvalidSubTransactionId;
childStmt->oldFirstRelfilenodeSubid = InvalidSubTransactionId;
/*
* Adjust any Vars (both in expressions and in the index's
* WHERE clause) to match the partition's column numbering
* in case it's different from the parent's.
*/
foreach(lc, childStmt->indexParams)
{
IndexElem *ielem = lfirst(lc);
/*
* If the index parameter is an expression, we must
* translate it to contain child Vars.
*/
if (ielem->expr)
{
ielem->expr =
map_variable_attnos((Node *) ielem->expr,
1, 0, attmap,
InvalidOid,
&found_whole_row);
if (found_whole_row)
{
elog(ERROR, "cannot convert whole-row table reference");
}
}
}
childStmt->whereClause =
map_variable_attnos(stmt->whereClause, 1, 0,
attmap,
InvalidOid, &found_whole_row);
if (found_whole_row)
{
elog(ERROR, "cannot convert whole-row table reference");
}
char *partitionNamespace = get_namespace_name(get_rel_namespace(
childRelid));
char *partitionName = get_rel_name(childRelid);
childStmt->relation = makeRangeVar(partitionNamespace, partitionName, -1);
childStmt->relation->inh = false;
const char *dummyString = "-";
/* since the command is an IndexStmt, a dummy command string works fine */
ProcessUtilityParseTree((Node *) childStmt, dummyString,
PROCESS_UTILITY_QUERY, NULL, None_Receiver, NULL);
}
free_attrmap(attmap);
}
}
table_close(rel, NoLock);
}
/*
* Compute per-index-column information, including indexed column numbers
* or index expressions, opclasses and their options. Note, all output vectors
* should be allocated for all columns, including "including" ones.
*
* ALL METHOD BORROWED FROM POSTGRES SINCE IT'S STATIC
*/
static void
ComputeIndexAttrs(IndexInfo *indexInfo,
Oid *typeOidP,
Oid *collationOidP,
Oid *classOidP,
int16 *colOptionP,
List *attList, /* list of IndexElem's */
List *exclusionOpNames,
Oid relId,
const char *accessMethodName,
Oid accessMethodId,
bool amcanorder,
bool isconstraint)
{
ListCell *nextExclOp;
ListCell *lc;
int attn;
int nkeycols = indexInfo->ii_NumIndexKeyAttrs;
/* Allocate space for exclusion operator info, if needed */
if (exclusionOpNames)
{
Assert(list_length(exclusionOpNames) == nkeycols);
indexInfo->ii_ExclusionOps = (Oid *) palloc(sizeof(Oid) * nkeycols);
indexInfo->ii_ExclusionProcs = (Oid *) palloc(sizeof(Oid) * nkeycols);
indexInfo->ii_ExclusionStrats = (uint16 *) palloc(sizeof(uint16) * nkeycols);
nextExclOp = list_head(exclusionOpNames);
}
else
{
nextExclOp = NULL;
}
/*
* process attributeList
*/
attn = 0;
foreach(lc, attList)
{
IndexElem *attribute = (IndexElem *) lfirst(lc);
Oid atttype;
Oid attcollation;
/*
* Process the column-or-expression to be indexed.
*/
if (attribute->name != NULL)
{
/* Simple index attribute */
HeapTuple atttuple;
Form_pg_attribute attform;
Assert(attribute->expr == NULL);
atttuple = SearchSysCacheAttName(relId, attribute->name);
if (!HeapTupleIsValid(atttuple))
{
/* difference in error message spellings is historical */
if (isconstraint)
{
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_COLUMN),
errmsg("column \"%s\" named in key does not exist",
attribute->name)));
}
else
{
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_COLUMN),
errmsg("column \"%s\" does not exist",
attribute->name)));
}
}
attform = (Form_pg_attribute) GETSTRUCT(atttuple);
indexInfo->ii_IndexAttrNumbers[attn] = attform->attnum;
atttype = attform->atttypid;
attcollation = attform->attcollation;
ReleaseSysCache(atttuple);
}
else
{
/* Index expression */
Node *expr = attribute->expr;
Assert(expr != NULL);
if (attn >= nkeycols)
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("expressions are not supported in included columns")));
}
atttype = exprType(expr);
attcollation = exprCollation(expr);
/*
* Strip any top-level COLLATE clause. This ensures that we treat
* "x COLLATE y" and "(x COLLATE y)" alike.
*/
while (IsA(expr, CollateExpr))
{
expr = (Node *) ((CollateExpr *) expr)->arg;
}
if (IsA(expr, Var) &&
((Var *) expr)->varattno != InvalidAttrNumber)
{
/*
* User wrote "(column)" or "(column COLLATE something)".
* Treat it like simple attribute anyway.
*/
indexInfo->ii_IndexAttrNumbers[attn] = ((Var *) expr)->varattno;
}
else
{
indexInfo->ii_IndexAttrNumbers[attn] = 0; /* marks expression */
indexInfo->ii_Expressions = lappend(indexInfo->ii_Expressions,
expr);
/*
* transformExpr() should have already rejected subqueries,
* aggregates, and window functions, based on the EXPR_KIND_
* for an index expression.
*/
/*
* An expression using mutable functions is probably wrong,
* since if you aren't going to get the same result for the
* same data every time, it's not clear what the index entries
* mean at all.
*/
if (contain_mutable_functions((Node *) expression_planner((Expr *) expr)))
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg(
"functions in index expression must be marked IMMUTABLE")));
}
}
}
typeOidP[attn] = atttype;
/*
* Included columns have no collation, no opclass and no ordering
* options.
*/
if (attn >= nkeycols)
{
if (attribute->collation)
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("including column does not support a collation")));
}
if (attribute->opclass)
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("including column does not support an operator class")));
}
if (attribute->ordering != SORTBY_DEFAULT)
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("including column does not support ASC/DESC options")));
}
if (attribute->nulls_ordering != SORTBY_NULLS_DEFAULT)
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg(
"including column does not support NULLS FIRST/LAST options")));
}
classOidP[attn] = InvalidOid;
colOptionP[attn] = 0;
collationOidP[attn] = InvalidOid;
attn++;
continue;
}
/*
* Apply collation override if any
*/
if (attribute->collation)
{
attcollation = get_collation_oid(attribute->collation, false);
}
/*
* Check we have a collation iff it's a collatable type. The only
* expected failures here are (1) COLLATE applied to a noncollatable
* type, or (2) index expression had an unresolved collation. But we
* might as well code this to be a complete consistency check.
*/
if (type_is_collatable(atttype))
{
if (!OidIsValid(attcollation))
{
ereport(ERROR,
(errcode(ERRCODE_INDETERMINATE_COLLATION),
errmsg(
"could not determine which collation to use for index expression"),
errhint(
"Use the COLLATE clause to set the collation explicitly.")));
}
}
else
{
if (OidIsValid(attcollation))
{
ereport(ERROR,
(errcode(ERRCODE_DATATYPE_MISMATCH),
errmsg("collations are not supported by type %s",
format_type_be(atttype))));
}
}
collationOidP[attn] = attcollation;
/*
* Identify the opclass to use.
*/
classOidP[attn] = ResolveOpClass(attribute->opclass,
atttype,
accessMethodName,
accessMethodId);
/*
* Identify the exclusion operator, if any.
*/
if (nextExclOp)
{
List *opname = (List *) lfirst(nextExclOp);
Oid opid;
Oid opfamily;
int strat;
/*
* Find the operator --- it must accept the column datatype
* without runtime coercion (but binary compatibility is OK)
*/
opid = compatible_oper_opid(opname, atttype, atttype, false);
/*
* Only allow commutative operators to be used in exclusion
* constraints. If X conflicts with Y, but Y does not conflict
* with X, bad things will happen.
*/
if (get_commutator(opid) != opid)
{
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("operator %s is not commutative",
format_operator(opid)),
errdetail(
"Only commutative operators can be used in exclusion constraints.")));
}
/*
* Operator must be a member of the right opfamily, too
*/
opfamily = get_opclass_family(classOidP[attn]);
strat = get_op_opfamily_strategy(opid, opfamily);
if (strat == 0)
{
HeapTuple opftuple;
Form_pg_opfamily opfform;
/*
* attribute->opclass might not explicitly name the opfamily,
* so fetch the name of the selected opfamily for use in the
* error message.
*/
opftuple = SearchSysCache1(OPFAMILYOID,
ObjectIdGetDatum(opfamily));
if (!HeapTupleIsValid(opftuple))
{
elog(ERROR, "cache lookup failed for opfamily %u",
opfamily);
}
opfform = (Form_pg_opfamily) GETSTRUCT(opftuple);
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("operator %s is not a member of operator family \"%s\"",
format_operator(opid),
NameStr(opfform->opfname)),
errdetail(
"The exclusion operator must be related to the index operator class for the constraint.")));
}
indexInfo->ii_ExclusionOps[attn] = opid;
indexInfo->ii_ExclusionProcs[attn] = get_opcode(opid);
indexInfo->ii_ExclusionStrats[attn] = strat;
nextExclOp = lnext(exclusionOpNames, nextExclOp);
}
/*
* Set up the per-column options (indoption field). For now, this is
* zero for any un-ordered index, while ordered indexes have DESC and
* NULLS FIRST/LAST options.
*/
colOptionP[attn] = 0;
if (amcanorder)
{
/* default ordering is ASC */
if (attribute->ordering == SORTBY_DESC)
{
colOptionP[attn] |= INDOPTION_DESC;
}
/* default null ordering is LAST for ASC, FIRST for DESC */
if (attribute->nulls_ordering == SORTBY_NULLS_DEFAULT)
{
if (attribute->ordering == SORTBY_DESC)
{
colOptionP[attn] |= INDOPTION_NULLS_FIRST;
}
}
else if (attribute->nulls_ordering == SORTBY_NULLS_FIRST)
{
colOptionP[attn] |= INDOPTION_NULLS_FIRST;
}
}
else
{
/* index AM does not support ordering */
if (attribute->ordering != SORTBY_DEFAULT)
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("access method \"%s\" does not support ASC/DESC options",
accessMethodName)));
}
if (attribute->nulls_ordering != SORTBY_NULLS_DEFAULT)
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg(
"access method \"%s\" does not support NULLS FIRST/LAST options",
accessMethodName)));
}
}
/* Set up the per-column opclass options (attoptions field). */
if (attribute->opclassopts)
{
Assert(attn < nkeycols);
if (!indexInfo->ii_OpclassOptions)
{
indexInfo->ii_OpclassOptions =
palloc0(sizeof(Datum) * indexInfo->ii_NumIndexAttrs);
}
indexInfo->ii_OpclassOptions[attn] =
transformRelOptions((Datum) 0, attribute->opclassopts,
NULL, NULL, false, false);
}
attn++;
}
}
/* /*
* ErrorIfCreateIndexHasTooManyColumns errors out if given CREATE INDEX command * ErrorIfCreateIndexHasTooManyColumns errors out if given CREATE INDEX command
* would use more than INDEX_MAX_KEYS columns. * would use more than INDEX_MAX_KEYS columns.