/*-------------------------------------------------------------------------
 *
 * create_distributed_relation.c
 *	  Routines relating to the creation of distributed relations.
 *
 * Copyright (c) 2012-2016, Citus Data, Inc.
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/genam.h"
#include "access/hash.h"
#include "access/heapam.h"
#include "access/htup.h"
#include "access/htup_details.h"
#include "access/nbtree.h"
#include "access/xact.h"
#include "catalog/dependency.h"
#include "catalog/index.h"
#include "catalog/indexing.h"
#include "catalog/pg_am.h"
#include "catalog/pg_constraint.h"
#if (PG_VERSION_NUM >= 90600)
#include "catalog/pg_constraint_fn.h"
#endif
#include "catalog/pg_enum.h"
#include "catalog/pg_extension.h"
#include "catalog/pg_opclass.h"
#include "catalog/pg_trigger.h"
#include "catalog/pg_type.h"
#include "commands/defrem.h"
#include "commands/extension.h"
#include "commands/trigger.h"
#include "distributed/colocation_utils.h"
#include "distributed/distribution_column.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/pg_dist_colocation.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/reference_table_utils.h"
#include "distributed/worker_transaction.h"
#include "executor/spi.h"
#include "nodes/execnodes.h"
#include "nodes/nodeFuncs.h"
#include "nodes/pg_list.h"
#include "parser/parse_expr.h"
#include "parser/parse_node.h"
#include "parser/parse_relation.h"
#include "parser/parser.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
#include "utils/syscache.h"
#include "utils/inval.h"


/*
 * Replication model to use when creating distributed tables. It is recorded in
 * pg_dist_partition for non-reference tables; reference tables (distribution
 * method 'none') always get the 2PC model instead (see ConvertToDistributedTable).
 * NOTE(review): presumably backed by the citus.replication_model GUC — the GUC
 * registration is not visible in this file.
 */
int ReplicationModel = REPLICATION_MODEL_COORDINATOR;


/* local function forward declarations */
static void CreateReferenceTable(Oid relationId);
static void ConvertToDistributedTable(Oid relationId, char *distributionColumnName,
									  char distributionMethod, uint32 colocationId);
static char LookupDistributionMethod(Oid distributionMethodOid);
static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId,
									int16 supportFunctionNumber);
static bool LocalTableEmpty(Oid tableId);
static void ErrorIfNotSupportedConstraint(Relation relation, char distributionMethod,
										  Var *distributionColumn, uint32 colocationId);
static void ErrorIfNotSupportedForeignConstraint(Relation relation,
												 char distributionMethod,
												 Var *distributionColumn,
												 uint32 colocationId);
static void CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
									   char *colocateWithTableName,
									   int shardCount, int replicationFactor);
static Oid ColumnType(Oid relationId, char *columnName);


/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(master_create_distributed_table);
PG_FUNCTION_INFO_V1(create_distributed_table);
PG_FUNCTION_INFO_V1(create_reference_table);


/*
 * master_create_distributed_table accepts a table, distribution column and
 * method and performs the corresponding catalog changes.
 *
 * Note that this udf is deprecated and cannot create colocated tables, so we
 * always use INVALID_COLOCATION_ID.
*/ Datum master_create_distributed_table(PG_FUNCTION_ARGS) { Oid distributedRelationId = PG_GETARG_OID(0); text *distributionColumnText = PG_GETARG_TEXT_P(1); Oid distributionMethodOid = PG_GETARG_OID(2); char *distributionColumnName = text_to_cstring(distributionColumnText); char distributionMethod = LookupDistributionMethod(distributionMethodOid); EnsureCoordinator(); ConvertToDistributedTable(distributedRelationId, distributionColumnName, distributionMethod, INVALID_COLOCATION_ID); PG_RETURN_VOID(); } /* * create_distributed_table gets a table name, distribution column, * distribution method and colocate_with option, then it creates a * distributed table. */ Datum create_distributed_table(PG_FUNCTION_ARGS) { Oid relationId = PG_GETARG_OID(0); text *distributionColumnText = PG_GETARG_TEXT_P(1); Oid distributionMethodOid = PG_GETARG_OID(2); char *distributionColumnName = text_to_cstring(distributionColumnText); char distributionMethod = LookupDistributionMethod(distributionMethodOid); text *colocateWithTableNameText = NULL; char *colocateWithTableName = NULL; EnsureCoordinator(); /* guard against a binary update without a function update */ if (PG_NARGS() >= 4) { colocateWithTableNameText = PG_GETARG_TEXT_P(3); colocateWithTableName = text_to_cstring(colocateWithTableNameText); } else { colocateWithTableName = "default"; } /* check if we try to colocate with hash distributed tables */ if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0 && pg_strncasecmp(colocateWithTableName, "none", NAMEDATALEN) != 0) { Oid colocateWithTableOid = ResolveRelationId(colocateWithTableNameText); char colocateWithTableDistributionMethod = PartitionMethod(colocateWithTableOid); if (colocateWithTableDistributionMethod != DISTRIBUTE_BY_HASH || distributionMethod != DISTRIBUTE_BY_HASH) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot distribute relation"), errdetail("Currently, colocate_with option is only supported " "for hash distributed 
tables."))); } } /* if distribution method is not hash, just create partition metadata */ if (distributionMethod != DISTRIBUTE_BY_HASH) { ConvertToDistributedTable(relationId, distributionColumnName, distributionMethod, INVALID_COLOCATION_ID); PG_RETURN_VOID(); } /* use configuration values for shard count and shard replication factor */ CreateHashDistributedTable(relationId, distributionColumnName, colocateWithTableName, ShardCount, ShardReplicationFactor); if (ShouldSyncTableMetadata(relationId)) { CreateTableMetadataOnWorkers(relationId); } PG_RETURN_VOID(); } /* * create_reference_table accepts a table and then it creates a distributed * table which has one shard and replication factor is set to * the worker count. */ Datum create_reference_table(PG_FUNCTION_ARGS) { Oid relationId = PG_GETARG_OID(0); CreateReferenceTable(relationId); PG_RETURN_VOID(); } /* * CreateReferenceTable creates a distributed table with the given relationId. The * created table has one shard and replication factor is set to the active worker * count. In fact, the above is the definition of a reference table in Citus. 
*/
static void
CreateReferenceTable(Oid relationId)
{
	int activeWorkerCount = list_length(WorkerNodeList());
	uint32 referenceColocationId = INVALID_COLOCATION_ID;

	EnsureCoordinator();

	/* every worker holds a replica, so at least one worker is required */
	if (activeWorkerCount == 0)
	{
		char *tableName = get_rel_name(relationId);

		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
						errmsg("cannot create reference table \"%s\"", tableName),
						errdetail("There are no active worker nodes.")));
	}

	referenceColocationId = CreateReferenceTableColocationId();

	/* reference tables are registered without a distribution column */
	ConvertToDistributedTable(relationId, NULL, DISTRIBUTE_BY_NONE,
							  referenceColocationId);

	/* then create the single shard, replicated to all nodes */
	CreateReferenceTableShard(relationId);

	CreateTableMetadataOnWorkers(relationId);
}


/*
 * ConvertToDistributedTable converts the given regular PostgreSQL table into a
 * distributed table. First, it checks if the given table can be distributed,
 * then it creates related tuple in pg_dist_partition.
 *
 * XXX: We should perform more checks here to see if this table is fit for
 * partitioning. At a minimum, we should validate the following: (i) this node
 * runs as the master node, (ii) table does not make use of the inheritance
 * mechanism, (iii) table does not own columns that are sequences, and (iv)
 * table does not have collated columns.
*/ static void ConvertToDistributedTable(Oid relationId, char *distributionColumnName, char distributionMethod, uint32 colocationId) { Relation relation = NULL; TupleDesc relationDesc = NULL; char *relationName = NULL; char relationKind = 0; Var *distributionColumn = NULL; char replicationModel = REPLICATION_MODEL_INVALID; /* check global replication settings before continuing */ EnsureReplicationSettings(InvalidOid); /* distribute by none tables use 2PC replication; otherwise use GUC setting */ if (distributionMethod == DISTRIBUTE_BY_NONE) { replicationModel = REPLICATION_MODEL_2PC; } else { replicationModel = ReplicationModel; } /* * Lock target relation with an exclusive lock - there's no way to make * sense of this table until we've committed, and we don't want multiple * backends manipulating this relation. */ relation = relation_open(relationId, ExclusiveLock); relationDesc = RelationGetDescr(relation); relationName = RelationGetRelationName(relation); EnsureTableOwner(relationId); /* check that the relation is not already distributed */ if (IsDistributedTable(relationId)) { ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("table \"%s\" is already distributed", relationName))); } /* verify target relation does not use WITH (OIDS) PostgreSQL feature */ if (relationDesc->tdhasoid) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot distribute relation: %s", relationName), errdetail("Distributed relations must not specify the WITH " "(OIDS) option in their definitions."))); } /* verify target relation is either regular or foreign table */ relationKind = relation->rd_rel->relkind; if (relationKind != RELKIND_RELATION && relationKind != RELKIND_FOREIGN_TABLE) { ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot distribute relation: %s", relationName), errdetail("Distributed relations must be regular or " "foreign tables."))); } /* check that the relation does not contain any rows */ if 
(!LocalTableEmpty(relationId)) { ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("cannot distribute relation \"%s\"", relationName), errdetail("Relation \"%s\" contains data.", relationName), errhint("Empty your table before distributing it."))); } /* * Distribution column returns NULL for reference tables, * but it is not used below for reference tables. */ distributionColumn = BuildDistributionKeyFromColumnName(relation, distributionColumnName); /* check for support function needed by specified partition method */ if (distributionMethod == DISTRIBUTE_BY_HASH) { Oid hashSupportFunction = SupportFunctionForColumn(distributionColumn, HASH_AM_OID, HASHPROC); if (hashSupportFunction == InvalidOid) { ereport(ERROR, (errcode(ERRCODE_UNDEFINED_FUNCTION), errmsg("could not identify a hash function for type %s", format_type_be(distributionColumn->vartype)), errdatatype(distributionColumn->vartype), errdetail("Partition column types must have a hash function " "defined to use hash partitioning."))); } } else if (distributionMethod == DISTRIBUTE_BY_RANGE) { Oid btreeSupportFunction = SupportFunctionForColumn(distributionColumn, BTREE_AM_OID, BTORDER_PROC); if (btreeSupportFunction == InvalidOid) { ereport(ERROR, (errcode(ERRCODE_UNDEFINED_FUNCTION), errmsg("could not identify a comparison function for type %s", format_type_be(distributionColumn->vartype)), errdatatype(distributionColumn->vartype), errdetail("Partition column types must have a comparison function " "defined to use range partitioning."))); } } ErrorIfNotSupportedConstraint(relation, distributionMethod, distributionColumn, colocationId); InsertIntoPgDistPartition(relationId, distributionMethod, distributionColumn, colocationId, replicationModel); relation_close(relation, NoLock); /* * PostgreSQL supports truncate trigger for regular relations only. * Truncate on foreign tables is not supported. 
*/ if (relationKind == RELKIND_RELATION) { CreateTruncateTrigger(relationId); } } /* * ErrorIfNotSupportedConstraint run checks related to unique index / exclude * constraints. * * The function skips the uniqeness checks for reference tables (i.e., distribution * method is 'none'). * * Forbid UNIQUE, PRIMARY KEY, or EXCLUDE constraints on append partitioned * tables, since currently there is no way of enforcing uniqueness for * overlapping shards. * * Similarly, do not allow such constraints if they do not include partition * column. This check is important for two reasons: * i. First, currently Citus does not enforce uniqueness constraint on multiple * shards. * ii. Second, INSERT INTO .. ON CONFLICT (i.e., UPSERT) queries can be executed * with no further check for constraints. */ static void ErrorIfNotSupportedConstraint(Relation relation, char distributionMethod, Var *distributionColumn, uint32 colocationId) { char *relationName = NULL; List *indexOidList = NULL; ListCell *indexOidCell = NULL; /* * We first perform check for foreign constraints. It is important to do this check * before next check, because other types of constraints are allowed on reference * tables and we return early for those constraints thanks to next check. Therefore, * for reference tables, we first check for foreing constraints and if they are OK, * we do not error out for other types of constraints. */ ErrorIfNotSupportedForeignConstraint(relation, distributionMethod, distributionColumn, colocationId); /* * Citus supports any kind of uniqueness constraints for reference tables * given that they only consist of a single shard and we can simply rely on * Postgres. 
*/ if (distributionMethod == DISTRIBUTE_BY_NONE) { return; } relationName = RelationGetRelationName(relation); indexOidList = RelationGetIndexList(relation); foreach(indexOidCell, indexOidList) { Oid indexOid = lfirst_oid(indexOidCell); Relation indexDesc = index_open(indexOid, RowExclusiveLock); IndexInfo *indexInfo = NULL; AttrNumber *attributeNumberArray = NULL; bool hasDistributionColumn = false; int attributeCount = 0; int attributeIndex = 0; /* extract index key information from the index's pg_index info */ indexInfo = BuildIndexInfo(indexDesc); /* only check unique indexes and exclusion constraints. */ if (indexInfo->ii_Unique == false && indexInfo->ii_ExclusionOps == NULL) { index_close(indexDesc, NoLock); continue; } /* * Citus cannot enforce uniqueness/exclusion constraints with overlapping shards. * Thus, emit a warning for unique indexes and exclusion constraints on * append partitioned tables. */ if (distributionMethod == DISTRIBUTE_BY_APPEND) { ereport(WARNING, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("table \"%s\" has a UNIQUE or EXCLUDE constraint", relationName), errdetail("UNIQUE constraints, EXCLUDE constraints, " "and PRIMARY KEYs on " "append-partitioned tables cannot be enforced."), errhint("Consider using hash partitioning."))); } attributeCount = indexInfo->ii_NumIndexAttrs; attributeNumberArray = indexInfo->ii_KeyAttrNumbers; for (attributeIndex = 0; attributeIndex < attributeCount; attributeIndex++) { AttrNumber attributeNumber = attributeNumberArray[attributeIndex]; bool uniqueConstraint = false; bool exclusionConstraintWithEquality = false; if (distributionColumn->varattno != attributeNumber) { continue; } uniqueConstraint = indexInfo->ii_Unique; exclusionConstraintWithEquality = (indexInfo->ii_ExclusionOps != NULL && OperatorImplementsEquality( indexInfo->ii_ExclusionOps[ attributeIndex])); if (uniqueConstraint || exclusionConstraintWithEquality) { hasDistributionColumn = true; break; } } if (!hasDistributionColumn) { 
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot distribute relation: \"%s\"", relationName), errdetail("Distributed relations cannot have UNIQUE, " "EXCLUDE, or PRIMARY KEY constraints that do not " "include the partition column (with an equality " "operator if EXCLUDE)."))); } index_close(indexDesc, NoLock); } } /* * ErrorIfNotSupportedForeignConstraint runs checks related to foreign constraints and * errors out if it is not possible to create one of the foreign constraint in distributed * environment. * * To support foreign constraints, we require that; * - Referencing and referenced tables are hash distributed. * - Referencing and referenced tables are co-located. * - Foreign constraint is defined over distribution column. * - ON DELETE/UPDATE SET NULL, ON DELETE/UPDATE SET DEFAULT and ON UPDATE CASCADE options * are not used. * - Replication factors of referencing and referenced table are 1. */ static void ErrorIfNotSupportedForeignConstraint(Relation relation, char distributionMethod, Var *distributionColumn, uint32 colocationId) { Relation pgConstraint = NULL; SysScanDesc scanDescriptor = NULL; ScanKeyData scanKey[1]; int scanKeyCount = 1; HeapTuple heapTuple = NULL; Oid referencedTableId = InvalidOid; uint32 referencedTableColocationId = INVALID_COLOCATION_ID; Var *referencedTablePartitionColumn = NULL; Datum referencingColumnsDatum; Datum *referencingColumnArray; int referencingColumnCount = 0; Datum referencedColumnsDatum; Datum *referencedColumnArray; int referencedColumnCount = 0; bool isNull = false; int attrIdx = 0; bool foreignConstraintOnPartitionColumn = false; bool selfReferencingTable = false; pgConstraint = heap_open(ConstraintRelationId, AccessShareLock); ScanKeyInit(&scanKey[0], Anum_pg_constraint_conrelid, BTEqualStrategyNumber, F_OIDEQ, relation->rd_id); scanDescriptor = systable_beginscan(pgConstraint, ConstraintRelidIndexId, true, NULL, scanKeyCount, scanKey); heapTuple = systable_getnext(scanDescriptor); while 
(HeapTupleIsValid(heapTuple)) { Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple); if (constraintForm->contype != CONSTRAINT_FOREIGN) { heapTuple = systable_getnext(scanDescriptor); continue; } referencedTableId = constraintForm->confrelid; selfReferencingTable = relation->rd_id == referencedTableId; /* * We do not support foreign keys for reference tables. Here we skip the second * part of check if the table is a self referencing table because; * - PartitionMethod only works for distributed tables and this table is not * distributed yet. * - Since referencing and referenced tables are same, it is OK to not checking * distribution method twice. */ if (distributionMethod == DISTRIBUTE_BY_NONE || (!selfReferencingTable && PartitionMethod(referencedTableId) == DISTRIBUTE_BY_NONE)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot create foreign key constraint"), errdetail("Foreign key constraints are not allowed from or " "to reference tables."))); } /* * ON DELETE SET NULL and ON DELETE SET DEFAULT is not supported. Because we do * not want to set partition column to NULL or default value. */ if (constraintForm->confdeltype == FKCONSTR_ACTION_SETNULL || constraintForm->confdeltype == FKCONSTR_ACTION_SETDEFAULT) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot create foreign key constraint"), errdetail("SET NULL or SET DEFAULT is not supported" " in ON DELETE operation."))); } /* * ON UPDATE SET NULL, ON UPDATE SET DEFAULT and UPDATE CASCADE is not supported. * Because we do not want to set partition column to NULL or default value. Also * cascading update operation would require re-partitioning. Updating partition * column value is not allowed anyway even outside of foreign key concept. 
*/ if (constraintForm->confupdtype == FKCONSTR_ACTION_SETNULL || constraintForm->confupdtype == FKCONSTR_ACTION_SETDEFAULT || constraintForm->confupdtype == FKCONSTR_ACTION_CASCADE) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot create foreign key constraint"), errdetail("SET NULL, SET DEFAULT or CASCADE is not" " supported in ON UPDATE operation."))); } /* * Some checks are not meaningful if foreign key references the table itself. * Therefore we will skip those checks. */ if (!selfReferencingTable) { if (!IsDistributedTable(referencedTableId)) { ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("cannot create foreign key constraint"), errdetail("Referenced table must be a distributed " "table."))); } /* to enforce foreign constraints, tables must be co-located */ referencedTableColocationId = TableColocationId(referencedTableId); if (relation->rd_id != referencedTableId && (colocationId == INVALID_COLOCATION_ID || colocationId != referencedTableColocationId)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot create foreign key constraint"), errdetail("Foreign key constraint can only be created" " on co-located tables."))); } /* * Partition column must exist in both referencing and referenced side of the * foreign key constraint. They also must be in same ordinal. */ referencedTablePartitionColumn = PartitionKey(referencedTableId); } else { /* * Partition column must exist in both referencing and referenced side of the * foreign key constraint. They also must be in same ordinal. */ referencedTablePartitionColumn = distributionColumn; } /* * Column attributes are not available in Form_pg_constraint, therefore we need * to find them in the system catalog. 
After finding them, we iterate over column * attributes together because partition column must be at the same place in both * referencing and referenced side of the foreign key constraint */ referencingColumnsDatum = SysCacheGetAttr(CONSTROID, heapTuple, Anum_pg_constraint_conkey, &isNull); referencedColumnsDatum = SysCacheGetAttr(CONSTROID, heapTuple, Anum_pg_constraint_confkey, &isNull); deconstruct_array(DatumGetArrayTypeP(referencingColumnsDatum), INT2OID, 2, true, 's', &referencingColumnArray, NULL, &referencingColumnCount); deconstruct_array(DatumGetArrayTypeP(referencedColumnsDatum), INT2OID, 2, true, 's', &referencedColumnArray, NULL, &referencedColumnCount); Assert(referencingColumnCount == referencedColumnCount); for (attrIdx = 0; attrIdx < referencingColumnCount; ++attrIdx) { AttrNumber referencingAttrNo = DatumGetInt16(referencingColumnArray[attrIdx]); AttrNumber referencedAttrNo = DatumGetInt16(referencedColumnArray[attrIdx]); if (distributionColumn->varattno == referencingAttrNo && referencedTablePartitionColumn->varattno == referencedAttrNo) { foreignConstraintOnPartitionColumn = true; } } if (!foreignConstraintOnPartitionColumn) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot create foreign key constraint"), errdetail("Partition column must exist both " "referencing and referenced side of the " "foreign constraint statement and it must " "be in the same ordinal in both sides."))); } /* * We do not allow to create foreign constraints if shard replication factor is * greater than 1. Because in our current design, multiple replicas may cause * locking problems and inconsistent shard contents. 
*/ if (ShardReplicationFactor > 1 || (referencedTableId != relation->rd_id && !SingleReplicatedTable(referencedTableId))) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot create foreign key constraint"), errdetail("Citus Community Edition currently supports " "foreign key constraints only for " "\"citus.shard_replication_factor = 1\"."), errhint("Please change \"citus.shard_replication_factor to " "1\". To learn more about using foreign keys with " "other replication factors, please contact us at " "https://citusdata.com/about/contact_us."))); } heapTuple = systable_getnext(scanDescriptor); } /* clean up scan and close system catalog */ systable_endscan(scanDescriptor); heap_close(pgConstraint, AccessShareLock); } /* * LookupDistributionMethod maps the oids of citus.distribution_type enum * values to pg_dist_partition.partmethod values. * * The passed in oid has to belong to a value of citus.distribution_type. */ static char LookupDistributionMethod(Oid distributionMethodOid) { HeapTuple enumTuple = NULL; Form_pg_enum enumForm = NULL; char distributionMethod = 0; const char *enumLabel = NULL; enumTuple = SearchSysCache1(ENUMOID, ObjectIdGetDatum(distributionMethodOid)); if (!HeapTupleIsValid(enumTuple)) { ereport(ERROR, (errmsg("invalid internal value for enum: %u", distributionMethodOid))); } enumForm = (Form_pg_enum) GETSTRUCT(enumTuple); enumLabel = NameStr(enumForm->enumlabel); if (strncmp(enumLabel, "append", NAMEDATALEN) == 0) { distributionMethod = DISTRIBUTE_BY_APPEND; } else if (strncmp(enumLabel, "hash", NAMEDATALEN) == 0) { distributionMethod = DISTRIBUTE_BY_HASH; } else if (strncmp(enumLabel, "range", NAMEDATALEN) == 0) { distributionMethod = DISTRIBUTE_BY_RANGE; } else { ereport(ERROR, (errmsg("invalid label for enum: %s", enumLabel))); } ReleaseSysCache(enumTuple); return distributionMethod; } /* * SupportFunctionForColumn locates a support function given a column, an access method, * and and id of a support function. 
This function returns InvalidOid if there is no * support function for the operator class family of the column, but if the data type * of the column has no default operator class whatsoever, this function errors out. */ static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId, int16 supportFunctionNumber) { Oid operatorFamilyId = InvalidOid; Oid supportFunctionOid = InvalidOid; Oid operatorClassInputType = InvalidOid; Oid columnOid = partitionColumn->vartype; Oid operatorClassId = GetDefaultOpClass(columnOid, accessMethodId); /* currently only support using the default operator class */ if (operatorClassId == InvalidOid) { ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("data type %s has no default operator class for specified" " partition method", format_type_be(columnOid)), errdatatype(columnOid), errdetail("Partition column types must have a default operator" " class defined."))); } operatorFamilyId = get_opclass_family(operatorClassId); operatorClassInputType = get_opclass_input_type(operatorClassId); supportFunctionOid = get_opfamily_proc(operatorFamilyId, operatorClassInputType, operatorClassInputType, supportFunctionNumber); return supportFunctionOid; } /* * LocalTableEmpty function checks whether given local table contains any row and * returns false if there is any data. This function is only for local tables and * should not be called for distributed tables. 
*/
static bool
LocalTableEmpty(Oid tableId)
{
	/* build the quoted, schema-qualified name before entering SPI */
	Oid namespaceId = get_rel_namespace(tableId);
	char *namespaceName = get_namespace_name(namespaceId);
	char *relationName = get_rel_name(tableId);
	char *qualifiedName = quote_qualified_identifier(namespaceName, relationName);
	StringInfo existsQuery = makeStringInfo();

	const bool readOnlyQuery = true;
	const int firstRow = 0;
	const int firstColumn = 1;
	Datum existsDatum = 0;
	bool existsIsNull = false;
	bool tableHasRows = false;

	AssertArg(!IsDistributedTable(tableId));

	if (SPI_connect() != SPI_OK_CONNECT)
	{
		ereport(ERROR, (errmsg("could not connect to SPI manager")));
	}

	/* SELECT_EXIST_QUERY is a format string taking the qualified table name */
	appendStringInfo(existsQuery, SELECT_EXIST_QUERY, qualifiedName);

	if (SPI_execute(existsQuery->data, readOnlyQuery, 0) != SPI_OK_SELECT)
	{
		ereport(ERROR, (errmsg("execution was not successful \"%s\"",
							   existsQuery->data)));
	}

	/* the EXISTS query yields a single value in a single row */
	Assert(SPI_processed == 1);

	existsDatum = SPI_getbinval(SPI_tuptable->vals[firstRow], SPI_tuptable->tupdesc,
								firstColumn, &existsIsNull);

	/* decode the result before SPI_finish releases the SPI tuple table */
	tableHasRows = DatumGetBool(existsDatum);

	SPI_finish();

	return !tableHasRows;
}


/*
 * CreateTruncateTrigger creates a truncate trigger on table identified by relationId
 * and assigns citus_truncate_trigger() as handler.
*/ void CreateTruncateTrigger(Oid relationId) { CreateTrigStmt *trigger = NULL; StringInfo triggerName = makeStringInfo(); bool internal = true; appendStringInfo(triggerName, "truncate_trigger"); trigger = makeNode(CreateTrigStmt); trigger->trigname = triggerName->data; trigger->relation = NULL; trigger->funcname = SystemFuncName("citus_truncate_trigger"); trigger->args = NIL; trigger->row = false; trigger->timing = TRIGGER_TYPE_BEFORE; trigger->events = TRIGGER_TYPE_TRUNCATE; trigger->columns = NIL; trigger->whenClause = NULL; trigger->isconstraint = false; CreateTrigger(trigger, NULL, relationId, InvalidOid, InvalidOid, InvalidOid, internal); } /* * CreateHashDistributedTable creates a hash distributed table. */ static void CreateHashDistributedTable(Oid relationId, char *distributionColumnName, char *colocateWithTableName, int shardCount, int replicationFactor) { Relation distributedRelation = NULL; Relation pgDistColocation = NULL; uint32 colocationId = INVALID_COLOCATION_ID; Oid sourceRelationId = InvalidOid; Oid distributionColumnType = InvalidOid; /* get an access lock on the relation to prevent DROP TABLE and ALTER TABLE */ distributedRelation = relation_open(relationId, AccessShareLock); /* * Get an exclusive lock on the colocation system catalog. Therefore, we * can be sure that there will no modifications on the colocation table * until this transaction is committed. 
*/ pgDistColocation = heap_open(DistColocationRelationId(), ExclusiveLock); /* get distribution column data type */ distributionColumnType = ColumnType(relationId, distributionColumnName); if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) == 0) { /* check for default colocation group */ colocationId = ColocationId(shardCount, replicationFactor, distributionColumnType); if (colocationId == INVALID_COLOCATION_ID) { colocationId = CreateColocationGroup(shardCount, replicationFactor, distributionColumnType); } else { sourceRelationId = ColocatedTableId(colocationId); } } else if (pg_strncasecmp(colocateWithTableName, "none", NAMEDATALEN) == 0) { colocationId = GetNextColocationId(); } else { /* get colocation group of the target table */ text *colocateWithTableNameText = cstring_to_text(colocateWithTableName); sourceRelationId = ResolveRelationId(colocateWithTableNameText); colocationId = TableColocationId(sourceRelationId); } /* create distributed table metadata */ ConvertToDistributedTable(relationId, distributionColumnName, DISTRIBUTE_BY_HASH, colocationId); /* create shards */ if (sourceRelationId != InvalidOid) { /* first run checks */ CheckReplicationModel(sourceRelationId, relationId); CheckDistributionColumnType(sourceRelationId, relationId); CreateColocatedShards(relationId, sourceRelationId); } else { CreateShardsWithRoundRobinPolicy(relationId, shardCount, replicationFactor); } heap_close(pgDistColocation, NoLock); relation_close(distributedRelation, NoLock); } /* * ColumnType returns the column type of the given column. */ static Oid ColumnType(Oid relationId, char *columnName) { AttrNumber columnIndex = get_attnum(relationId, columnName); Oid columnType = get_atttype(relationId, columnIndex); return columnType; } /* * Check that the current replication factor setting is compatible with the * replication model of relationId, if valid. If InvalidOid, check that the * global replication model setting instead. 
Errors out if an invalid state * is detected. */ void EnsureReplicationSettings(Oid relationId) { char replicationModel = (char) ReplicationModel; char *msgSuffix = "the streaming replication model"; char *extraHint = " or setting \"citus.replication_model\" to \"statement\""; if (relationId != InvalidOid) { replicationModel = TableReplicationModel(relationId); msgSuffix = "tables which use the streaming replication model"; extraHint = ""; } if (replicationModel == REPLICATION_MODEL_STREAMING && ShardReplicationFactor != 1) { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("replication factors above one are incompatible with %s", msgSuffix), errhint("Try again after reducing \"citus.shard_replication_" "factor\" to one%s.", extraHint))); } }