/*-------------------------------------------------------------------------
 *
 * create_distributed_table.c
 *    Routines related to the creation of distributed relations.
 *
 * Copyright (c) Citus Data, Inc.
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "miscadmin.h"

#include "access/genam.h"
#include "access/hash.h"
#include "access/heapam.h"
#include "access/htup.h"
#include "access/htup_details.h"
#include "access/nbtree.h"
#include "access/xact.h"
#include "catalog/dependency.h"
#include "catalog/index.h"
#include "catalog/pg_am.h"
#include "catalog/pg_attrdef.h"
#include "catalog/pg_attribute.h"
#include "catalog/pg_enum.h"
#include "catalog/pg_extension.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_opclass.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_trigger.h"
#include "catalog/pg_type.h"
#include "commands/defrem.h"
#include "commands/extension.h"
#include "commands/sequence.h"
#include "commands/tablecmds.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/spi.h"
#include "nodes/execnodes.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "nodes/pg_list.h"
#include "parser/parse_expr.h"
#include "parser/parse_node.h"
#include "parser/parse_relation.h"
#include "parser/parser.h"
#include "postmaster/postmaster.h"
#include "storage/lmgr.h"
#include "tcop/pquery.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"

#include "pg_version_constants.h"

#include "distributed/citus_ruleutils.h"
#include "distributed/colocation_utils.h"
#include "distributed/commands.h"
#include "distributed/commands/multi_copy.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/deparser.h"
#include "distributed/distributed_execution_locks.h"
#include "distributed/distribution_column.h"
#include "distributed/listutils.h"
#include "distributed/local_executor.h"
#include "distributed/metadata/dependency.h"
#include "distributed/metadata/distobject.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/metadata_utility.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/pg_dist_colocation.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/reference_table_utils.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h"
#include "distributed/replicate_none_dist_table_shard.h"
#include "distributed/resource_lock.h"
#include "distributed/shard_cleaner.h"
#include "distributed/shard_rebalancer.h"
#include "distributed/shard_split.h"
#include "distributed/shard_transfer.h"
#include "distributed/shared_library_init.h"
#include "distributed/utils/distribution_column_map.h"
#include "distributed/version_compat.h"
#include "distributed/worker_protocol.h"
#include "distributed/worker_shard_visibility.h"
#include "distributed/worker_transaction.h"


/* common params that apply to all Citus table types */
typedef struct
{
    char distributionMethod;
    char replicationModel;
} CitusTableParams;


/*
 * Params that only apply to distributed tables, i.e., the ones that are
 * known as DISTRIBUTED_TABLE by Citus metadata.
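 *
 * As an illustrative sketch only (table and column names assumed, mirroring
 * how CreateDistributedTable() below fills this struct), a typical
 * hash-distributed table ends up with params along the lines of:
 *
 *   DistributedTableParams params = {
 *       .colocationParam = {
 *           .colocationParamType = COLOCATE_WITH_TABLE_LIKE_OPT,
 *           .colocateWithTableName = "default",
 *       },
 *       .shardCount = 32,
 *       .shardCountIsStrict = false,
 *       .distributionColumnName = "tenant_id",
 *   };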
*/ typedef struct { int shardCount; bool shardCountIsStrict; char *distributionColumnName; ColocationParam colocationParam; } DistributedTableParams; /* * once every LOG_PER_TUPLE_AMOUNT, the copy will be logged. */ #define LOG_PER_TUPLE_AMOUNT 1000000 /* local function forward declarations */ static void CreateDistributedTableConcurrently(Oid relationId, char *distributionColumnName, char distributionMethod, char *colocateWithTableName, int shardCount, bool shardCountIsStrict); static char DecideDistTableReplicationModel(char distributionMethod, char *colocateWithTableName); static List * HashSplitPointsForShardList(List *shardList); static List * HashSplitPointsForShardCount(int shardCount); static List * WorkerNodesForShardList(List *shardList); static List * RoundRobinWorkerNodeList(List *workerNodeList, int listLength); static CitusTableParams DecideCitusTableParams(CitusTableType tableType, DistributedTableParams * distributedTableParams); static void CreateCitusTable(Oid relationId, CitusTableType tableType, DistributedTableParams *distributedTableParams); static void ConvertCitusLocalTableToTableType(Oid relationId, CitusTableType tableType, DistributedTableParams * distributedTableParams); static void CreateHashDistributedTableShards(Oid relationId, int shardCount, Oid colocatedTableId, bool localTableEmpty); static void CreateSingleShardTableShard(Oid relationId, Oid colocatedTableId, uint32 colocationId); static uint32 ColocationIdForNewTable(Oid relationId, CitusTableType tableType, DistributedTableParams *distributedTableParams, Var *distributionColumn); static void EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn, char distributionMethod, uint32 colocationId, char replicationModel); static void EnsureLocalTableEmpty(Oid relationId); static void EnsureRelationHasNoTriggers(Oid relationId); static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId, int16 supportFunctionNumber); static void EnsureLocalTableEmptyIfNecessary(Oid relationId, char distributionMethod); static bool ShouldLocalTableBeEmpty(Oid relationId, char distributionMethod); static void EnsureCitusTableCanBeCreated(Oid relationOid); static void PropagatePrerequisiteObjectsForDistributedTable(Oid relationId); static void EnsureDistributedSequencesHaveOneType(Oid relationId, List *seqInfoList); static void CopyLocalDataIntoShards(Oid distributedTableId); static List * TupleDescColumnNameList(TupleDesc tupleDescriptor); static bool DistributionColumnUsesNumericColumnNegativeScale(TupleDesc relationDesc, Var *distributionColumn); static int numeric_typmod_scale(int32 typmod); static bool is_valid_numeric_typmod(int32 typmod); static bool DistributionColumnUsesGeneratedStoredColumn(TupleDesc relationDesc, Var *distributionColumn); static bool CanUseExclusiveConnections(Oid relationId, bool localTableEmpty); static uint64 DoCopyFromLocalTableIntoShards(Relation distributedRelation, DestReceiver *copyDest, TupleTableSlot *slot, EState *estate); static void ErrorIfTemporaryTable(Oid relationId); static void ErrorIfForeignTable(Oid relationOid); static void SendAddLocalTableToMetadataCommandOutsideTransaction(Oid relationId); static void EnsureDistributableTable(Oid relationId); static void EnsureForeignKeysForDistributedTableConcurrently(Oid relationId); static void EnsureColocateWithTableIsValid(Oid relationId, char distributionMethod, char *distributionColumnName, char *colocateWithTableName); static void WarnIfTableHaveNoReplicaIdentity(Oid relationId); /* exports for SQL 
callable functions */ PG_FUNCTION_INFO_V1(master_create_distributed_table); PG_FUNCTION_INFO_V1(create_distributed_table_concurrently); PG_FUNCTION_INFO_V1(create_distributed_table); PG_FUNCTION_INFO_V1(create_reference_table); /* * master_create_distributed_table is a deprecated predecessor to * create_distributed_table */ Datum master_create_distributed_table(PG_FUNCTION_ARGS) { ereport(ERROR, (errmsg("master_create_distributed_table has been deprecated"))); } /* * create_distributed_table gets a table name, distribution column, * distribution method and colocate_with option, then it creates a * distributed table. */ Datum create_distributed_table(PG_FUNCTION_ARGS) { CheckCitusVersion(ERROR); if (PG_ARGISNULL(0) || PG_ARGISNULL(3)) { PG_RETURN_VOID(); } Oid relationId = PG_GETARG_OID(0); text *distributionColumnText = PG_ARGISNULL(1) ? NULL : PG_GETARG_TEXT_P(1); Oid distributionMethodOid = PG_GETARG_OID(2); text *colocateWithTableNameText = PG_GETARG_TEXT_P(3); char *colocateWithTableName = text_to_cstring(colocateWithTableNameText); bool shardCountIsStrict = false; if (distributionColumnText) { if (PG_ARGISNULL(2)) { PG_RETURN_VOID(); } int shardCount = ShardCount; if (!PG_ARGISNULL(4)) { if (!IsColocateWithDefault(colocateWithTableName) && !IsColocateWithNone(colocateWithTableName)) { ereport(ERROR, (errmsg("Cannot use colocate_with with a table " "and shard_count at the same time"))); } shardCount = PG_GETARG_INT32(4); /* * If shard_count parameter is given, then we have to * make sure table has that many shards. */ shardCountIsStrict = true; } char *distributionColumnName = text_to_cstring(distributionColumnText); Assert(distributionColumnName != NULL); char distributionMethod = LookupDistributionMethod(distributionMethodOid); if (shardCount < 1 || shardCount > MAX_SHARD_COUNT) { ereport(ERROR, (errmsg("%d is outside the valid range for " "parameter \"shard_count\" (1 .. %d)", shardCount, MAX_SHARD_COUNT))); } CreateDistributedTable(relationId, distributionColumnName, distributionMethod, shardCount, shardCountIsStrict, colocateWithTableName); } else { if (!PG_ARGISNULL(4)) { ereport(ERROR, (errmsg("shard_count can't be specified when the " "distribution column is null because in " "that case it's automatically set to 1"))); } if (!PG_ARGISNULL(2) && LookupDistributionMethod(PG_GETARG_OID(2)) != DISTRIBUTE_BY_HASH) { /* * As we do for shard_count parameter, we could throw an error if * distribution_type is not NULL when creating a single-shard table. * However, this requires changing the default value of distribution_type * parameter to NULL and this would mean a breaking change for most * users because they're mostly using this API to create sharded * tables. For this reason, here we instead do nothing if the distribution * method is DISTRIBUTE_BY_HASH. */ ereport(ERROR, (errmsg("distribution_type can't be specified " "when the distribution column is null "))); } ColocationParam colocationParam = { .colocationParamType = COLOCATE_WITH_TABLE_LIKE_OPT, .colocateWithTableName = colocateWithTableName, }; CreateSingleShardTable(relationId, colocationParam); } PG_RETURN_VOID(); } /* * create_distributed_concurrently gets a table name, distribution column, * distribution method and colocate_with option, then it creates a * distributed table. 
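 *
 * For illustration (table and column names assumed), the SQL-level call to
 * create_distributed_table_concurrently looks like:
 *
 *   SELECT create_distributed_table_concurrently('events', 'tenant_id',
 *                                                colocate_with => 'companies');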
*/ Datum create_distributed_table_concurrently(PG_FUNCTION_ARGS) { CheckCitusVersion(ERROR); if (PG_ARGISNULL(0) || PG_ARGISNULL(2) || PG_ARGISNULL(3)) { PG_RETURN_VOID(); } if (PG_ARGISNULL(1)) { ereport(ERROR, (errmsg("cannot use create_distributed_table_concurrently " "to create a distributed table with a null shard " "key, consider using create_distributed_table()"))); } Oid relationId = PG_GETARG_OID(0); text *distributionColumnText = PG_GETARG_TEXT_P(1); char *distributionColumnName = text_to_cstring(distributionColumnText); Oid distributionMethodOid = PG_GETARG_OID(2); char distributionMethod = LookupDistributionMethod(distributionMethodOid); text *colocateWithTableNameText = PG_GETARG_TEXT_P(3); char *colocateWithTableName = text_to_cstring(colocateWithTableNameText); bool shardCountIsStrict = false; int shardCount = ShardCount; if (!PG_ARGISNULL(4)) { if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0 && pg_strncasecmp(colocateWithTableName, "none", NAMEDATALEN) != 0) { ereport(ERROR, (errmsg("Cannot use colocate_with with a table " "and shard_count at the same time"))); } shardCount = PG_GETARG_INT32(4); /* * if shard_count parameter is given than we have to * make sure table has that many shards */ shardCountIsStrict = true; } CreateDistributedTableConcurrently(relationId, distributionColumnName, distributionMethod, colocateWithTableName, shardCount, shardCountIsStrict); PG_RETURN_VOID(); } /* * CreateDistributedTableConcurrently distributes a table by first converting * it to a Citus local table and then splitting the shard of the Citus local * table. * * If anything goes wrong during the second phase, the table is left as a * Citus local table. */ static void CreateDistributedTableConcurrently(Oid relationId, char *distributionColumnName, char distributionMethod, char *colocateWithTableName, int shardCount, bool shardCountIsStrict) { /* * We disallow create_distributed_table_concurrently in transaction blocks * because we cannot handle preceding writes, and we block writes at the * very end of the operation so the transaction should end immediately after. */ PreventInTransactionBlock(true, "create_distributed_table_concurrently"); /* * do not allow multiple create_distributed_table_concurrently in the same * transaction. We should do that check just here because concurrent local table * conversion can cause issues. */ ErrorIfMultipleNonblockingMoveSplitInTheSameTransaction(); /* do not allow concurrent CreateDistributedTableConcurrently operations */ AcquireCreateDistributedTableConcurrentlyLock(relationId); if (distributionMethod != DISTRIBUTE_BY_HASH) { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("only hash-distributed tables can be distributed " "without blocking writes"))); } if (ShardReplicationFactor > 1) { ereport(ERROR, (errmsg("cannot distribute a table concurrently when " "citus.shard_replication_factor > 1"))); } DropOrphanedResourcesInSeparateTransaction(); EnsureCitusTableCanBeCreated(relationId); EnsureValidDistributionColumn(relationId, distributionColumnName); /* * Ensure table type is valid to be distributed. It should be either regular or citus local table. */ EnsureDistributableTable(relationId); /* * we rely on citus_add_local_table_to_metadata, so it can generate irrelevant messages. * we want to error with a user friendly message if foreign keys are not supported. 
* We can miss foreign key violations because we are not holding locks, so relation * can be modified until we acquire the lock for the relation, but we do as much as we can * to be user friendly on foreign key violation messages. */ EnsureForeignKeysForDistributedTableConcurrently(relationId); char replicationModel = DecideDistTableReplicationModel(distributionMethod, colocateWithTableName); /* * we fail transaction before local table conversion if the table could not be colocated with * given table. We should make those checks after local table conversion by acquiring locks to * the relation because the distribution column can be modified in that period. */ if (!IsColocateWithDefault(colocateWithTableName) && !IsColocateWithNone( colocateWithTableName)) { if (replicationModel != REPLICATION_MODEL_STREAMING) { ereport(ERROR, (errmsg("cannot create distributed table " "concurrently because Citus allows " "concurrent table distribution only when " "citus.shard_replication_factor = 1"), errhint("table %s is requested to be colocated " "with %s which has " "citus.shard_replication_factor > 1", get_rel_name(relationId), colocateWithTableName))); } EnsureColocateWithTableIsValid(relationId, distributionMethod, distributionColumnName, colocateWithTableName); } /* * Get name of the table before possibly replacing it in * citus_add_local_table_to_metadata. */ char *tableName = get_rel_name(relationId); Oid schemaId = get_rel_namespace(relationId); char *schemaName = get_namespace_name(schemaId); RangeVar *rangeVar = makeRangeVar(schemaName, tableName, -1); /* If table is a regular table, then we need to add it into metadata. */ if (!IsCitusTable(relationId)) { /* * Before taking locks, convert the table into a Citus local table and commit * to allow shard split to see the shard. */ SendAddLocalTableToMetadataCommandOutsideTransaction(relationId); } /* * Lock target relation with a shard update exclusive lock to * block DDL, but not writes. * * If there was a concurrent drop/rename, error out by setting missingOK = false. */ bool missingOK = false; relationId = RangeVarGetRelid(rangeVar, ShareUpdateExclusiveLock, missingOK); if (PartitionedTableNoLock(relationId)) { /* also lock partitions */ LockPartitionRelations(relationId, ShareUpdateExclusiveLock); } WarnIfTableHaveNoReplicaIdentity(relationId); List *shardList = LoadShardIntervalList(relationId); /* * It's technically possible for the table to have been concurrently * distributed just after citus_add_local_table_to_metadata and just * before acquiring the lock, so double check. */ if (list_length(shardList) != 1 || !IsCitusTableType(relationId, CITUS_LOCAL_TABLE)) { ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("table was concurrently modified"))); } /* * The table currently has one shard, we will split that shard to match the * target distribution. */ ShardInterval *shardToSplit = (ShardInterval *) linitial(shardList); PropagatePrerequisiteObjectsForDistributedTable(relationId); /* * we should re-evaluate distribution column values. It may have changed, * because we did not lock the relation at the previous check before local * table conversion. 
*/ Var *distributionColumn = BuildDistributionKeyFromColumnName(relationId, distributionColumnName, NoLock); Oid distributionColumnType = distributionColumn->vartype; Oid distributionColumnCollation = distributionColumn->varcollid; /* get an advisory lock to serialize concurrent default group creations */ if (IsColocateWithDefault(colocateWithTableName)) { AcquireColocationDefaultLock(); } /* * At this stage, we only want to check for an existing co-location group. * We cannot create a new co-location group until after replication slot * creation in NonBlockingShardSplit. */ uint32 colocationId = FindColocateWithColocationId(relationId, replicationModel, distributionColumnType, distributionColumnCollation, shardCount, shardCountIsStrict, colocateWithTableName); if (IsColocateWithDefault(colocateWithTableName) && (colocationId != INVALID_COLOCATION_ID)) { /* * we can release advisory lock if there is already a default entry for given params; * else, we should keep it to prevent different default coloc entry creation by * concurrent operations. */ ReleaseColocationDefaultLock(); } EnsureRelationCanBeDistributed(relationId, distributionColumn, distributionMethod, colocationId, replicationModel); Oid colocatedTableId = InvalidOid; if (colocationId != INVALID_COLOCATION_ID) { colocatedTableId = ColocatedTableId(colocationId); } List *workerNodeList = DistributedTablePlacementNodeList(NoLock); if (workerNodeList == NIL) { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("no worker nodes are available for placing shards"), errhint("Add more worker nodes."))); } List *workersForPlacementList; List *shardSplitPointsList; if (colocatedTableId != InvalidOid) { List *colocatedShardList = LoadShardIntervalList(colocatedTableId); /* * Match the shard ranges of an existing table. */ shardSplitPointsList = HashSplitPointsForShardList(colocatedShardList); /* * Find the node IDs of the shard placements. */ workersForPlacementList = WorkerNodesForShardList(colocatedShardList); } else { /* * Generate a new set of #shardCount shards. */ shardSplitPointsList = HashSplitPointsForShardCount(shardCount); /* * Place shards in a round-robin fashion across all data nodes. */ workersForPlacementList = RoundRobinWorkerNodeList(workerNodeList, shardCount); } /* * Make sure that existing reference tables have been replicated to all the nodes * such that we can create foreign keys and joins work immediately after creation. * We do this after applying all essential checks to error out early in case of * user error. * * Use force_logical since this function is meant to not block writes. */ EnsureReferenceTablesExistOnAllNodesExtended(TRANSFER_MODE_FORCE_LOGICAL); /* * At this point, the table is a Citus local table, which means it does * not have a partition column in the metadata. However, we cannot update * the metadata here because that would prevent us from creating a replication * slot to copy ongoing changes. Instead, we pass a hash that maps relation * IDs to partition column vars. 
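 *
 * Illustratively (names assumed): for a table "events" being distributed on
 * "tenant_id", the code below puts a relationId -> Var(tenant_id) entry into
 * the DistributionColumnMap, and SplitShard() consults that map instead of
 * the partition column recorded in pg_dist_partition, which is still unset
 * for the Citus local table at this point.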
*/ DistributionColumnMap *distributionColumnOverrides = CreateDistributionColumnMap(); AddDistributionColumnForRelation(distributionColumnOverrides, relationId, distributionColumnName); /* * there is no colocation entries yet for local table, so we should * check if table has any partition and add them to same colocation * group */ List *sourceColocatedShardIntervalList = ListShardsUnderParentRelation(relationId); SplitMode splitMode = NON_BLOCKING_SPLIT; SplitOperation splitOperation = CREATE_DISTRIBUTED_TABLE; SplitShard( splitMode, splitOperation, shardToSplit->shardId, shardSplitPointsList, workersForPlacementList, distributionColumnOverrides, sourceColocatedShardIntervalList, colocationId ); } /* * EnsureForeignKeysForDistributedTableConcurrently ensures that referenced and referencing foreign * keys for the given table are supported. * * We allow distributed -> reference * distributed -> citus local * * We disallow reference -> distributed * citus local -> distributed * regular -> distributed * * Normally regular -> distributed is allowed but it is not allowed when we create the * distributed table concurrently because we rely on conversion of regular table to citus local table, * which errors with an unfriendly message. */ static void EnsureForeignKeysForDistributedTableConcurrently(Oid relationId) { /* * disallow citus local -> distributed fkeys. * disallow reference -> distributed fkeys. * disallow regular -> distributed fkeys. */ EnsureNoFKeyFromTableType(relationId, INCLUDE_CITUS_LOCAL_TABLES | INCLUDE_REFERENCE_TABLES | INCLUDE_LOCAL_TABLES); /* * disallow distributed -> regular fkeys. */ EnsureNoFKeyToTableType(relationId, INCLUDE_LOCAL_TABLES); } /* * EnsureColocateWithTableIsValid ensures given relation can be colocated with the table of given name. */ static void EnsureColocateWithTableIsValid(Oid relationId, char distributionMethod, char *distributionColumnName, char *colocateWithTableName) { char replicationModel = DecideDistTableReplicationModel(distributionMethod, colocateWithTableName); /* * we fail transaction before local table conversion if the table could not be colocated with * given table. We should make those checks after local table conversion by acquiring locks to * the relation because the distribution column can be modified in that period. */ Oid distributionColumnType = ColumnTypeIdForRelationColumnName(relationId, distributionColumnName); text *colocateWithTableNameText = cstring_to_text(colocateWithTableName); Oid colocateWithTableId = ResolveRelationId(colocateWithTableNameText, false); EnsureTableCanBeColocatedWith(relationId, replicationModel, distributionColumnType, colocateWithTableId); } /* * AcquireCreateDistributedTableConcurrentlyLock does not allow concurrent create_distributed_table_concurrently * operations. */ void AcquireCreateDistributedTableConcurrentlyLock(Oid relationId) { LOCKTAG tag; const bool sessionLock = false; const bool dontWait = true; SET_LOCKTAG_CITUS_OPERATION(tag, CITUS_CREATE_DISTRIBUTED_TABLE_CONCURRENTLY); LockAcquireResult lockAcquired = LockAcquire(&tag, ExclusiveLock, sessionLock, dontWait); if (!lockAcquired) { ereport(ERROR, (errmsg("another create_distributed_table_concurrently " "operation is in progress"), errhint("Make sure that the concurrent operation has " "finished and re-run the command"))); } } /* * SendAddLocalTableToMetadataCommandOutsideTransaction executes metadata add local * table command locally to avoid deadlock. 
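 *
 * As an illustration (relation name assumed), for a table public.events the
 * command list sent over the outside-transaction connection is roughly:
 *
 *   SET LOCAL citus.allow_nested_distributed_execution to ON;
 *   SELECT pg_catalog.citus_add_local_table_to_metadata('public.events');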
*/ static void SendAddLocalTableToMetadataCommandOutsideTransaction(Oid relationId) { char *qualifiedRelationName = generate_qualified_relation_name(relationId); /* * we need to allow nested distributed execution, because we start a new distributed * execution inside the pushed-down UDF citus_add_local_table_to_metadata. Normally * citus does not allow that because it cannot guarantee correctness. */ StringInfo allowNestedDistributionCommand = makeStringInfo(); appendStringInfo(allowNestedDistributionCommand, "SET LOCAL citus.allow_nested_distributed_execution to ON"); StringInfo addLocalTableToMetadataCommand = makeStringInfo(); appendStringInfo(addLocalTableToMetadataCommand, "SELECT pg_catalog.citus_add_local_table_to_metadata(%s)", quote_literal_cstr(qualifiedRelationName)); List *commands = list_make2(allowNestedDistributionCommand->data, addLocalTableToMetadataCommand->data); char *username = NULL; SendCommandListToWorkerOutsideTransaction(LocalHostName, PostPortNumber, username, commands); } /* * WarnIfTableHaveNoReplicaIdentity notices user if the given table or its partitions (if any) * do not have a replica identity which is required for logical replication to replicate * UPDATE and DELETE commands during create_distributed_table_concurrently. */ void WarnIfTableHaveNoReplicaIdentity(Oid relationId) { bool foundRelationWithNoReplicaIdentity = false; /* * Check for source relation's partitions if any. We do not need to check for the source relation * because we can replicate partitioned table even if it does not have replica identity. * Source table will have no data if it has partitions. */ if (PartitionedTable(relationId)) { List *partitionList = PartitionList(relationId); ListCell *partitionCell = NULL; foreach(partitionCell, partitionList) { Oid partitionTableId = lfirst_oid(partitionCell); if (!RelationCanPublishAllModifications(partitionTableId)) { foundRelationWithNoReplicaIdentity = true; break; } } } /* check for source relation if it is not partitioned */ else { if (!RelationCanPublishAllModifications(relationId)) { foundRelationWithNoReplicaIdentity = true; } } if (foundRelationWithNoReplicaIdentity) { char *relationName = get_rel_name(relationId); ereport(NOTICE, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("relation %s does not have a REPLICA " "IDENTITY or PRIMARY KEY", relationName), errdetail("UPDATE and DELETE commands on the relation will " "error out during create_distributed_table_concurrently unless " "there is a REPLICA IDENTITY or PRIMARY KEY. " "INSERT commands will still work."))); } } /* * HashSplitPointsForShardList returns a list of split points which match * the shard ranges of the given list of shards; */ static List * HashSplitPointsForShardList(List *shardList) { List *splitPointList = NIL; ShardInterval *shardInterval = NULL; foreach_declared_ptr(shardInterval, shardList) { int32 shardMaxValue = DatumGetInt32(shardInterval->maxValue); splitPointList = lappend_int(splitPointList, shardMaxValue); } /* * Split point lists only include the upper boundaries. */ splitPointList = list_delete_last(splitPointList); return splitPointList; } /* * HashSplitPointsForShardCount returns a list of split points for a given * shard count with roughly equal hash ranges. 
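 *
 * As a worked example (assuming HASH_TOKEN_COUNT covers the full 2^32 hash
 * space): for shardCount = 4 the increment is 1073741824 and the returned
 * split points are
 *
 *   [-1073741825, -1, 1073741823]
 *
 * i.e. only the upper bounds of the first shardCount - 1 ranges; the last
 * range implicitly ends at PG_INT32_MAX.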
*/
static List *
HashSplitPointsForShardCount(int shardCount)
{
    List *splitPointList = NIL;

    /* calculate the split of the hash space */
    uint64 hashTokenIncrement = HASH_TOKEN_COUNT / shardCount;

    /*
     * Split points lists only include the upper boundaries, so we only
     * go up to shardCount - 1 and do not have to apply the correction
     * for the last shardmaxvalue.
     */
    for (int64 shardIndex = 0; shardIndex < shardCount - 1; shardIndex++)
    {
        /* initialize the hash token space for this shard */
        int32 shardMinValue = PG_INT32_MIN + (shardIndex * hashTokenIncrement);
        int32 shardMaxValue = shardMinValue + (hashTokenIncrement - 1);

        splitPointList = lappend_int(splitPointList, shardMaxValue);
    }

    return splitPointList;
}


/*
 * WorkerNodesForShardList returns a list of node ids reflecting the locations of
 * the given list of shards.
 */
static List *
WorkerNodesForShardList(List *shardList)
{
    List *nodeIdList = NIL;

    ShardInterval *shardInterval = NULL;
    foreach_declared_ptr(shardInterval, shardList)
    {
        WorkerNode *workerNode = ActiveShardPlacementWorkerNode(shardInterval->shardId);
        nodeIdList = lappend_int(nodeIdList, workerNode->nodeId);
    }

    return nodeIdList;
}


/*
 * RoundRobinWorkerNodeList round robins over the workers in the worker node list
 * and adds node ids to a list of length listLength.
 */
static List *
RoundRobinWorkerNodeList(List *workerNodeList, int listLength)
{
    Assert(workerNodeList != NIL);
    List *nodeIdList = NIL;

    for (int idx = 0; idx < listLength; idx++)
    {
        int nodeIdx = idx % list_length(workerNodeList);
        WorkerNode *workerNode = (WorkerNode *) list_nth(workerNodeList, nodeIdx);
        nodeIdList = lappend_int(nodeIdList, workerNode->nodeId);
    }

    return nodeIdList;
}


/*
 * create_reference_table creates a distributed table with the given relationId. The
 * created table has one shard and replication factor is set to the active worker
 * count. In fact, the above is the definition of a reference table in Citus.
 */
Datum
create_reference_table(PG_FUNCTION_ARGS)
{
    CheckCitusVersion(ERROR);
    Oid relationId = PG_GETARG_OID(0);

    CreateReferenceTable(relationId);
    PG_RETURN_VOID();
}


/*
 * EnsureCitusTableCanBeCreated checks if
 * - we are on the coordinator
 * - the current user is the owner of the table
 * - relation kind is supported
 * - relation is not a shard
 */
static void
EnsureCitusTableCanBeCreated(Oid relationOid)
{
    EnsureCoordinator();
    EnsureRelationExists(relationOid);
    EnsureTableOwner(relationOid);
    ErrorIfTemporaryTable(relationOid);
    ErrorIfForeignTable(relationOid);

    /*
     * We should do this check here since the code in the following lines relies
     * on this relation to have a supported relation kind. More extensive checks
     * will be performed in CreateDistributedTable.
     */
    EnsureRelationKindSupported(relationOid);

    /*
     * When coordinator is added to the metadata, or on the workers,
     * some of the relations of the coordinator node may/will be shards.
     * We disallow creating distributed tables from shard relations, by
     * erroring out here.
     */
    ErrorIfRelationIsAKnownShard(relationOid);
}


/*
 * EnsureRelationExists does a basic check on whether the OID belongs to
 * an existing relation.
 */
void
EnsureRelationExists(Oid relationId)
{
    if (!RelationExists(relationId))
    {
        ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
                        errmsg("relation with OID %d does not exist",
                               relationId)));
    }
}


/*
 * CreateDistributedTable is a wrapper around CreateCitusTable that creates a
 * hash-, append-, or range-distributed table.
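 *
 * For illustration (argument values assumed, mirroring the call made from
 * the create_distributed_table UDF above):
 *
 *   CreateDistributedTable(relationId, "tenant_id", DISTRIBUTE_BY_HASH,
 *                          32, false, "default");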
*/ void CreateDistributedTable(Oid relationId, char *distributionColumnName, char distributionMethod, int shardCount, bool shardCountIsStrict, char *colocateWithTableName) { CitusTableType tableType; switch (distributionMethod) { case DISTRIBUTE_BY_HASH: { tableType = HASH_DISTRIBUTED; break; } case DISTRIBUTE_BY_APPEND: { tableType = APPEND_DISTRIBUTED; break; } case DISTRIBUTE_BY_RANGE: { tableType = RANGE_DISTRIBUTED; break; } default: { ereport(ERROR, (errmsg("unexpected distribution method when " "deciding Citus table type"))); break; } } DistributedTableParams distributedTableParams = { .colocationParam = { .colocateWithTableName = colocateWithTableName, .colocationParamType = COLOCATE_WITH_TABLE_LIKE_OPT }, .shardCount = shardCount, .shardCountIsStrict = shardCountIsStrict, .distributionColumnName = distributionColumnName }; CreateCitusTable(relationId, tableType, &distributedTableParams); } /* * CreateReferenceTable creates a reference table. */ void CreateReferenceTable(Oid relationId) { if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE)) { /* * Create the shard of given Citus local table on workers to convert * it into a reference table. */ ConvertCitusLocalTableToTableType(relationId, REFERENCE_TABLE, NULL); } else { CreateCitusTable(relationId, REFERENCE_TABLE, NULL); } } /* * CreateSingleShardTable creates a single shard distributed table that * doesn't have a shard key. */ void CreateSingleShardTable(Oid relationId, ColocationParam colocationParam) { DistributedTableParams distributedTableParams = { .colocationParam = colocationParam, .shardCount = 1, .shardCountIsStrict = true, .distributionColumnName = NULL }; if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE)) { /* * Create the shard of given Citus local table on appropriate node * and drop the local one to convert it into a single-shard distributed * table. */ ConvertCitusLocalTableToTableType(relationId, SINGLE_SHARD_DISTRIBUTED, &distributedTableParams); } else { CreateCitusTable(relationId, SINGLE_SHARD_DISTRIBUTED, &distributedTableParams); } } /* * CreateCitusTable is the internal method that creates a Citus table in * given configuration. * * DistributedTableParams should be non-null only if we're creating a distributed * table. * * This functions contains all necessary logic to create distributed tables. It * performs necessary checks to ensure distributing the table is safe. If it is * safe to distribute the table, this function creates distributed table metadata, * creates shards and copies local data to shards. This function also handles * partitioned tables by distributing its partitions as well. 
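 *
 * For illustration (summarizing how the wrappers above call it, rather than
 * adding new behavior), the expected call shapes are:
 *
 *   CreateCitusTable(relationId, HASH_DISTRIBUTED, &distributedTableParams);
 *   CreateCitusTable(relationId, REFERENCE_TABLE, NULL);
 *
 * i.e. distributedTableParams is non-NULL exactly for the distributed table
 * types, which the function checks on entry.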
*/ static void CreateCitusTable(Oid relationId, CitusTableType tableType, DistributedTableParams *distributedTableParams) { if ((tableType == HASH_DISTRIBUTED || tableType == APPEND_DISTRIBUTED || tableType == RANGE_DISTRIBUTED || tableType == SINGLE_SHARD_DISTRIBUTED) != (distributedTableParams != NULL)) { ereport(ERROR, (errmsg("distributed table params must be provided " "when creating a distributed table and must " "not be otherwise"))); } EnsureCitusTableCanBeCreated(relationId); /* allow creating a Citus table on an empty cluster */ InsertCoordinatorIfClusterEmpty(); Relation relation = try_relation_open(relationId, ExclusiveLock); if (relation == NULL) { ereport(ERROR, (errmsg("could not create Citus table: " "relation does not exist"))); } relation_close(relation, NoLock); if (tableType == SINGLE_SHARD_DISTRIBUTED && ShardReplicationFactor > 1) { ereport(ERROR, (errmsg("could not create single shard table: " "citus.shard_replication_factor is greater than 1"), errhint("Consider setting citus.shard_replication_factor to 1 " "and try again"))); } /* * EnsureTableNotDistributed errors out when relation is a citus table but * we don't want to ask user to first undistribute their citus local tables * when creating distributed tables from them. * For this reason, here we undistribute citus local tables beforehand. * But since UndistributeTable does not support undistributing relations * involved in foreign key relationships, we first drop foreign keys that * given relation is involved, then we undistribute the relation and finally * we re-create dropped foreign keys at the end of this function. */ List *originalForeignKeyRecreationCommands = NIL; if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE)) { /* * We use ConvertCitusLocalTableToTableType instead of CreateCitusTable * to create a reference table or a single-shard table from a Citus * local table. */ Assert(tableType != REFERENCE_TABLE && tableType != SINGLE_SHARD_DISTRIBUTED); /* store foreign key creation commands that relation is involved */ originalForeignKeyRecreationCommands = GetFKeyCreationCommandsRelationInvolvedWithTableType(relationId, INCLUDE_ALL_TABLE_TYPES); relationId = DropFKeysAndUndistributeTable(relationId); } /* * To support foreign keys between reference tables and local tables, * we drop & re-define foreign keys at the end of this function so * that ALTER TABLE hook does the necessary job, which means converting * local tables to citus local tables to properly support such foreign * keys. */ else if (tableType == REFERENCE_TABLE && ShouldEnableLocalReferenceForeignKeys() && HasForeignKeyWithLocalTable(relationId)) { /* * Store foreign key creation commands for foreign key relationships * that relation has with postgres tables. */ originalForeignKeyRecreationCommands = GetFKeyCreationCommandsRelationInvolvedWithTableType(relationId, INCLUDE_LOCAL_TABLES); /* * Soon we will convert local tables to citus local tables. As * CreateCitusLocalTable needs to use local execution, now we * switch to local execution beforehand so that reference table * creation doesn't use remote execution and we don't error out * in CreateCitusLocalTable. 
*/ SetLocalExecutionStatus(LOCAL_EXECUTION_REQUIRED); DropFKeysRelationInvolvedWithTableType(relationId, INCLUDE_LOCAL_TABLES); } LockRelationOid(relationId, ExclusiveLock); EnsureTableNotDistributed(relationId); PropagatePrerequisiteObjectsForDistributedTable(relationId); Var *distributionColumn = NULL; if (distributedTableParams && distributedTableParams->distributionColumnName) { distributionColumn = BuildDistributionKeyFromColumnName(relationId, distributedTableParams-> distributionColumnName, NoLock); } CitusTableParams citusTableParams = DecideCitusTableParams(tableType, distributedTableParams); /* * ColocationIdForNewTable assumes caller acquires lock on relationId. In our case, * our caller already acquired lock on relationId. */ uint32 colocationId = INVALID_COLOCATION_ID; if (distributedTableParams && distributedTableParams->colocationParam.colocationParamType == COLOCATE_WITH_COLOCATION_ID) { colocationId = distributedTableParams->colocationParam.colocationId; } else { /* * ColocationIdForNewTable assumes caller acquires lock on relationId. In our case, * our caller already acquired lock on relationId. */ colocationId = ColocationIdForNewTable(relationId, tableType, distributedTableParams, distributionColumn); } EnsureRelationCanBeDistributed(relationId, distributionColumn, citusTableParams.distributionMethod, colocationId, citusTableParams.replicationModel); /* * Make sure that existing reference tables have been replicated to all the nodes * such that we can create foreign keys and joins work immediately after creation. * * This will take a lock on the nodes to make sure no nodes are added after we have * verified and ensured the reference tables are copied everywhere. * Although copying reference tables here for anything but creating a new colocation * group, it requires significant refactoring which we don't want to perform now. */ EnsureReferenceTablesExistOnAllNodes(); /* * While adding tables to a colocation group we need to make sure no concurrent * mutations happen on the colocation group with regards to its placements. It is * important that we have already copied any reference tables before acquiring this * lock as these are competing operations. 
*/ LockColocationId(colocationId, ShareLock); /* we need to calculate these variables before creating distributed metadata */ bool localTableEmpty = TableEmpty(relationId); Oid colocatedTableId = ColocatedTableId(colocationId); /* setting to false since this flag is only valid for citus local tables */ bool autoConverted = false; /* create an entry for distributed table in pg_dist_partition */ InsertIntoPgDistPartition(relationId, citusTableParams.distributionMethod, distributionColumn, colocationId, citusTableParams.replicationModel, autoConverted); #if PG_VERSION_NUM >= PG_VERSION_16 /* * PG16+ supports truncate triggers on foreign tables */ if (RegularTable(relationId) || IsForeignTable(relationId)) #else /* foreign tables do not support TRUNCATE trigger */ if (RegularTable(relationId)) #endif { CreateTruncateTrigger(relationId); } if (tableType == HASH_DISTRIBUTED) { /* create shards for hash distributed table */ CreateHashDistributedTableShards(relationId, distributedTableParams->shardCount, colocatedTableId, localTableEmpty); } else if (tableType == REFERENCE_TABLE) { /* create shards for reference table */ CreateReferenceTableShard(relationId); } else if (tableType == SINGLE_SHARD_DISTRIBUTED) { /* create the shard of given single-shard distributed table */ CreateSingleShardTableShard(relationId, colocatedTableId, colocationId); } if (ShouldSyncTableMetadata(relationId)) { SyncCitusTableMetadata(relationId); } /* * We've a custom way of foreign key graph invalidation, * see InvalidateForeignKeyGraph(). */ if (TableReferenced(relationId) || TableReferencing(relationId)) { InvalidateForeignKeyGraph(); } /* if this table is partitioned table, distribute its partitions too */ if (PartitionedTable(relationId)) { List *partitionList = PartitionList(relationId); Oid partitionRelationId = InvalidOid; char *parentRelationName = generate_qualified_relation_name(relationId); /* * when there are many partitions, each call to CreateDistributedTable * accumulates used memory. Create and free context for each call. */ MemoryContext citusPartitionContext = AllocSetContextCreate(CurrentMemoryContext, "citus_per_partition_context", ALLOCSET_DEFAULT_SIZES); MemoryContext oldContext = MemoryContextSwitchTo(citusPartitionContext); foreach_declared_oid(partitionRelationId, partitionList) { MemoryContextReset(citusPartitionContext); DistributedTableParams childDistributedTableParams = { .colocationParam = { .colocationParamType = COLOCATE_WITH_TABLE_LIKE_OPT, .colocateWithTableName = parentRelationName, }, .shardCount = distributedTableParams->shardCount, .shardCountIsStrict = false, .distributionColumnName = distributedTableParams->distributionColumnName, }; CreateCitusTable(partitionRelationId, tableType, &childDistributedTableParams); } MemoryContextSwitchTo(oldContext); MemoryContextDelete(citusPartitionContext); } /* copy over data for hash distributed and reference tables */ if (tableType == HASH_DISTRIBUTED || tableType == SINGLE_SHARD_DISTRIBUTED || tableType == REFERENCE_TABLE) { if (RegularTable(relationId)) { CopyLocalDataIntoShards(relationId); } } /* * Now recreate foreign keys that we dropped beforehand. As modifications are not * allowed on the relations that are involved in the foreign key relationship, * we can skip the validation of the foreign keys. */ bool skip_validation = true; ExecuteForeignKeyCreateCommandList(originalForeignKeyRecreationCommands, skip_validation); } /* * ConvertCitusLocalTableToTableType converts given Citus local table to * given table type. 
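 *
 * For illustration (mirroring the callers above, CreateReferenceTable() and
 * CreateSingleShardTable()):
 *
 *   ConvertCitusLocalTableToTableType(relationId, REFERENCE_TABLE, NULL);
 *   ConvertCitusLocalTableToTableType(relationId, SINGLE_SHARD_DISTRIBUTED,
 *                                     &distributedTableParams);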
* * This only supports converting Citus local tables to reference tables * (by replicating the shard to workers) and single-shard distributed * tables (by replicating the shard to the appropriate worker and dropping * the local one). */ static void ConvertCitusLocalTableToTableType(Oid relationId, CitusTableType tableType, DistributedTableParams *distributedTableParams) { if (!IsCitusTableType(relationId, CITUS_LOCAL_TABLE)) { ereport(ERROR, (errmsg("table is not a local table added to metadata"))); } if (tableType != REFERENCE_TABLE && tableType != SINGLE_SHARD_DISTRIBUTED) { ereport(ERROR, (errmsg("table type is not supported for conversion"))); } if ((tableType == SINGLE_SHARD_DISTRIBUTED) != (distributedTableParams != NULL)) { ereport(ERROR, (errmsg("distributed table params must be provided " "when creating a distributed table and must " "not be otherwise"))); } EnsureCitusTableCanBeCreated(relationId); Relation relation = try_relation_open(relationId, ExclusiveLock); if (relation == NULL) { ereport(ERROR, (errmsg("could not create Citus table: " "relation does not exist"))); } relation_close(relation, NoLock); if (tableType == SINGLE_SHARD_DISTRIBUTED && ShardReplicationFactor > 1) { ereport(ERROR, (errmsg("could not create single shard table: " "citus.shard_replication_factor is greater than 1"), errhint("Consider setting citus.shard_replication_factor to 1 " "and try again"))); } LockRelationOid(relationId, ExclusiveLock); Var *distributionColumn = NULL; CitusTableParams citusTableParams = DecideCitusTableParams(tableType, distributedTableParams); uint32 colocationId = INVALID_COLOCATION_ID; if (distributedTableParams && distributedTableParams->colocationParam.colocationParamType == COLOCATE_WITH_COLOCATION_ID) { colocationId = distributedTableParams->colocationParam.colocationId; } else { colocationId = ColocationIdForNewTable(relationId, tableType, distributedTableParams, distributionColumn); } /* check constraints etc. on table based on new distribution params */ EnsureRelationCanBeDistributed(relationId, distributionColumn, citusTableParams.distributionMethod, colocationId, citusTableParams.replicationModel); /* * Regarding the foreign key relationships that given relation is involved, * EnsureRelationCanBeDistributed() only checks the ones where the relation * is the referencing table. And given that the table at hand is a Citus * local table, right now it may only be referenced by a reference table * or a Citus local table. However, given that neither of those two cases * are not applicable for a distributed table, here we throw an error if * that's the case. * * Note that we don't need to check the same if we're creating a reference * table from a Citus local table because all the foreign keys referencing * Citus local tables are supported by reference tables. */ if (tableType == SINGLE_SHARD_DISTRIBUTED) { EnsureNoFKeyFromTableType(relationId, INCLUDE_CITUS_LOCAL_TABLES | INCLUDE_REFERENCE_TABLES); } EnsureReferenceTablesExistOnAllNodes(); LockColocationId(colocationId, ShareLock); /* * When converting to a single shard table, we want to drop the placement * on the coordinator, but only if transferring to a different node. In that * case, shouldDropLocalPlacement is true. When converting to a reference * table, we always keep the placement on the coordinator, so for reference * tables shouldDropLocalPlacement is always false. 
*/ bool shouldDropLocalPlacement = false; List *targetNodeList = NIL; if (tableType == SINGLE_SHARD_DISTRIBUTED) { uint32 targetNodeId = SingleShardTableColocationNodeId(colocationId); if (targetNodeId != CoordinatorNodeIfAddedAsWorkerOrError()->nodeId) { bool missingOk = false; WorkerNode *targetNode = FindNodeWithNodeId(targetNodeId, missingOk); targetNodeList = list_make1(targetNode); shouldDropLocalPlacement = true; } } else if (tableType == REFERENCE_TABLE) { targetNodeList = ActivePrimaryNonCoordinatorNodeList(ShareLock); targetNodeList = SortList(targetNodeList, CompareWorkerNodes); } bool autoConverted = false; UpdateNoneDistTableMetadataGlobally( relationId, citusTableParams.replicationModel, colocationId, autoConverted); /* create the shard placement on workers and insert into pg_dist_placement globally */ if (list_length(targetNodeList) > 0) { NoneDistTableReplicateCoordinatorPlacement(relationId, targetNodeList); } if (shouldDropLocalPlacement) { /* * We don't yet drop the local placement before handling partitions. * Otherewise, local shard placements of the partitions will be gone * before we create them on workers. * * However, we need to delete the related entry from pg_dist_placement * before distributing partitions (if any) because we need a sane metadata * state before doing so. */ NoneDistTableDeleteCoordinatorPlacement(relationId); } /* if this table is partitioned table, distribute its partitions too */ if (PartitionedTable(relationId)) { /* right now we don't allow partitioned reference tables */ Assert(tableType == SINGLE_SHARD_DISTRIBUTED); List *partitionList = PartitionList(relationId); char *parentRelationName = generate_qualified_relation_name(relationId); /* * When there are many partitions, each call to * ConvertCitusLocalTableToTableType accumulates used memory. * Create and free citus_per_partition_context for each call. */ MemoryContext citusPartitionContext = AllocSetContextCreate(CurrentMemoryContext, "citus_per_partition_context", ALLOCSET_DEFAULT_SIZES); MemoryContext oldContext = MemoryContextSwitchTo(citusPartitionContext); Oid partitionRelationId = InvalidOid; foreach_declared_oid(partitionRelationId, partitionList) { MemoryContextReset(citusPartitionContext); DistributedTableParams childDistributedTableParams = { .colocationParam = { .colocationParamType = COLOCATE_WITH_TABLE_LIKE_OPT, .colocateWithTableName = parentRelationName, }, .shardCount = distributedTableParams->shardCount, .shardCountIsStrict = false, .distributionColumnName = distributedTableParams->distributionColumnName, }; ConvertCitusLocalTableToTableType(partitionRelationId, tableType, &childDistributedTableParams); } MemoryContextSwitchTo(oldContext); MemoryContextDelete(citusPartitionContext); } if (shouldDropLocalPlacement) { NoneDistTableDropCoordinatorPlacementTable(relationId); } } /* * DecideCitusTableParams decides CitusTableParams based on given CitusTableType * and DistributedTableParams if it's a distributed table. * * DistributedTableParams should be non-null only if CitusTableType corresponds * to a distributed table. 
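 *
 * As a quick summary of the switch below (no new rules added here):
 *
 *   HASH/APPEND/RANGE_DISTRIBUTED -> DISTRIBUTE_BY_HASH/APPEND/RANGE,
 *                                    replication model chosen by
 *                                    DecideDistTableReplicationModel()
 *   SINGLE_SHARD_DISTRIBUTED      -> DISTRIBUTE_BY_NONE, streaming replication
 *   REFERENCE_TABLE               -> DISTRIBUTE_BY_NONE, 2PC replication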
*/ static CitusTableParams DecideCitusTableParams(CitusTableType tableType, DistributedTableParams *distributedTableParams) { CitusTableParams citusTableParams = { 0 }; switch (tableType) { case HASH_DISTRIBUTED: { Assert(distributedTableParams->colocationParam.colocationParamType == COLOCATE_WITH_TABLE_LIKE_OPT); citusTableParams.distributionMethod = DISTRIBUTE_BY_HASH; citusTableParams.replicationModel = DecideDistTableReplicationModel(DISTRIBUTE_BY_HASH, distributedTableParams->colocationParam. colocateWithTableName); break; } case APPEND_DISTRIBUTED: { Assert(distributedTableParams->colocationParam.colocationParamType == COLOCATE_WITH_TABLE_LIKE_OPT); citusTableParams.distributionMethod = DISTRIBUTE_BY_APPEND; citusTableParams.replicationModel = DecideDistTableReplicationModel(APPEND_DISTRIBUTED, distributedTableParams->colocationParam. colocateWithTableName); break; } case RANGE_DISTRIBUTED: { Assert(distributedTableParams->colocationParam.colocationParamType == COLOCATE_WITH_TABLE_LIKE_OPT); citusTableParams.distributionMethod = DISTRIBUTE_BY_RANGE; citusTableParams.replicationModel = DecideDistTableReplicationModel(RANGE_DISTRIBUTED, distributedTableParams->colocationParam. colocateWithTableName); break; } case SINGLE_SHARD_DISTRIBUTED: { citusTableParams.distributionMethod = DISTRIBUTE_BY_NONE; citusTableParams.replicationModel = REPLICATION_MODEL_STREAMING; break; } case REFERENCE_TABLE: { citusTableParams.distributionMethod = DISTRIBUTE_BY_NONE; citusTableParams.replicationModel = REPLICATION_MODEL_2PC; break; } default: { ereport(ERROR, (errmsg("unexpected table type when deciding Citus " "table params"))); break; } } return citusTableParams; } /* * PropagatePrerequisiteObjectsForDistributedTable ensures we can create shards * on all nodes by ensuring all dependent objects exist on all node. */ static void PropagatePrerequisiteObjectsForDistributedTable(Oid relationId) { /* * Ensure that the sequences used in column defaults of the table * have proper types */ EnsureRelationHasCompatibleSequenceTypes(relationId); /* * distributed tables might have dependencies on different objects, since we create * shards for a distributed table via multiple sessions these objects will be created * via their own connection and committed immediately so they become visible to all * sessions creating shards. */ ObjectAddress *tableAddress = palloc0(sizeof(ObjectAddress)); ObjectAddressSet(*tableAddress, RelationRelationId, relationId); EnsureAllObjectDependenciesExistOnAllNodes(list_make1(tableAddress)); TrackPropagatedTableAndSequences(relationId); } /* * EnsureSequenceTypeSupported ensures that the type of the column that uses * a sequence on its DEFAULT is consistent with previous uses (if any) of the * sequence in distributed tables. * If any other distributed table uses the input sequence, it checks whether * the types of the columns using the sequence match. If they don't, it errors out. * Otherwise, the condition is ensured. * Since the owner of the sequence may not distributed yet, it should be added * explicitly. */ void EnsureSequenceTypeSupported(Oid seqOid, Oid attributeTypeId, Oid ownerRelationId) { Oid attrDefOid; List *attrDefOids = GetAttrDefsFromSequence(seqOid); foreach_declared_oid(attrDefOid, attrDefOids) { ObjectAddress columnAddress = GetAttrDefaultColumnAddress(attrDefOid); /* * If another distributed table is using the same sequence * in one of its column defaults, make sure the types of the * columns match. 
* * We skip non-distributed tables, but we need to check the current * table as it might reference the same sequence multiple times. */ if (columnAddress.objectId != ownerRelationId && !IsCitusTable(columnAddress.objectId)) { continue; } Oid currentAttributeTypId = GetAttributeTypeOid(columnAddress.objectId, columnAddress.objectSubId); if (attributeTypeId != currentAttributeTypId) { char *sequenceName = generate_qualified_relation_name( seqOid); char *citusTableName = generate_qualified_relation_name(columnAddress.objectId); ereport(ERROR, (errmsg( "The sequence %s is already used for a different" " type in column %d of the table %s", sequenceName, columnAddress.objectSubId, citusTableName))); } } } /* * AlterSequenceType alters the given sequence's type to the given type. */ void AlterSequenceType(Oid seqOid, Oid typeOid) { Form_pg_sequence sequenceData = pg_get_sequencedef(seqOid); Oid currentSequenceTypeOid = sequenceData->seqtypid; if (currentSequenceTypeOid != typeOid) { AlterSeqStmt *alterSequenceStatement = makeNode(AlterSeqStmt); char *seqNamespace = get_namespace_name(get_rel_namespace(seqOid)); char *seqName = get_rel_name(seqOid); alterSequenceStatement->sequence = makeRangeVar(seqNamespace, seqName, -1); Node *asTypeNode = (Node *) makeTypeNameFromOid(typeOid, -1); SetDefElemArg(alterSequenceStatement, "as", asTypeNode); ParseState *pstate = make_parsestate(NULL); AlterSequence(pstate, alterSequenceStatement); CommandCounterIncrement(); } } /* * EnsureRelationHasCompatibleSequenceTypes ensures that sequences used for columns * of the table have compatible types both with the column type on that table and * all other distributed tables' columns they have used for */ void EnsureRelationHasCompatibleSequenceTypes(Oid relationId) { List *seqInfoList = NIL; GetDependentSequencesWithRelation(relationId, &seqInfoList, 0, DEPENDENCY_AUTO); EnsureDistributedSequencesHaveOneType(relationId, seqInfoList); } /* * EnsureDistributedSequencesHaveOneType first ensures that the type of the column * in which the sequence is used as default is supported for each sequence in input * dependentSequenceList, and then alters the sequence type if not the same with the column type. */ static void EnsureDistributedSequencesHaveOneType(Oid relationId, List *seqInfoList) { SequenceInfo *seqInfo = NULL; foreach_declared_ptr(seqInfo, seqInfoList) { if (!seqInfo->isNextValDefault) { /* * If a sequence is not on the nextval, we don't need any check. * This is a dependent sequence via ALTER SEQUENCE .. OWNED BY col */ continue; } /* * We should make sure that the type of the column that uses * that sequence is supported */ Oid sequenceOid = seqInfo->sequenceOid; AttrNumber attnum = seqInfo->attributeNumber; Oid attributeTypeId = GetAttributeTypeOid(relationId, attnum); EnsureSequenceTypeSupported(sequenceOid, attributeTypeId, relationId); /* * Alter the sequence's data type in the coordinator if needed. * * First, we should only change the sequence type if the column * is a supported sequence type. For example, if a sequence is used * in an expression which then becomes a text, we should not try to * alter the sequence type to text. Postgres only supports int2, int4 * and int8 as the sequence type. * * A sequence's type is bigint by default and it doesn't change even if * it's used in an int column. We should change the type if needed, * and not allow future ALTER SEQUENCE ... TYPE ... commands for * sequences used as defaults in distributed tables. 
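 *
 * For example (illustrative DDL, names assumed): given
 *
 *   CREATE SEQUENCE user_id_seq;
 *   CREATE TABLE users (id int DEFAULT nextval('user_id_seq'));
 *
 * the sequence is created with the default bigint type, so the check below
 * alters it to int4 to match the column before the table is distributed.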
*/ if (attributeTypeId == INT2OID || attributeTypeId == INT4OID || attributeTypeId == INT8OID) { AlterSequenceType(sequenceOid, attributeTypeId); } } } /* * DecideDistTableReplicationModel function decides which replication model should be * used for a distributed table depending on given distribution configuration. */ static char DecideDistTableReplicationModel(char distributionMethod, char *colocateWithTableName) { Assert(distributionMethod != DISTRIBUTE_BY_NONE); if (!IsColocateWithDefault(colocateWithTableName) && !IsColocateWithNone(colocateWithTableName)) { text *colocateWithTableNameText = cstring_to_text(colocateWithTableName); Oid colocatedRelationId = ResolveRelationId(colocateWithTableNameText, false); CitusTableCacheEntry *targetTableEntry = GetCitusTableCacheEntry( colocatedRelationId); char replicationModel = targetTableEntry->replicationModel; return replicationModel; } else if (distributionMethod == DISTRIBUTE_BY_HASH && !DistributedTableReplicationIsEnabled()) { return REPLICATION_MODEL_STREAMING; } else { return REPLICATION_MODEL_COORDINATOR; } /* we should not reach to this point */ return REPLICATION_MODEL_INVALID; } /* * CreateHashDistributedTableShards creates shards of given hash distributed table. */ static void CreateHashDistributedTableShards(Oid relationId, int shardCount, Oid colocatedTableId, bool localTableEmpty) { bool useExclusiveConnection = false; /* * Decide whether to use exclusive connections per placement or not. Note that * if the local table is not empty, we cannot use sequential mode since the COPY * operation that'd load the data into shards currently requires exclusive * connections. */ if (RegularTable(relationId)) { useExclusiveConnection = CanUseExclusiveConnections(relationId, localTableEmpty); } if (colocatedTableId != InvalidOid) { /* * We currently allow concurrent distribution of colocated tables (which * we probably should not be allowing because of foreign keys / * partitioning etc). * * We also prevent concurrent shard moves / copy / splits) while creating * a colocated table. */ AcquirePlacementColocationLock(colocatedTableId, ShareLock, "colocate distributed table"); CreateColocatedShards(relationId, colocatedTableId, useExclusiveConnection); } else { /* * This path is only reached by create_distributed_table for the distributed * tables which will not be part of an existing colocation group. Therefore, * we can directly use ShardReplicationFactor global variable here. */ CreateShardsWithRoundRobinPolicy(relationId, shardCount, ShardReplicationFactor, useExclusiveConnection); } } /* * CreateSingleShardTableShard creates the shard of given single-shard * distributed table. */ static void CreateSingleShardTableShard(Oid relationId, Oid colocatedTableId, uint32 colocationId) { if (colocatedTableId != InvalidOid) { /* * We currently allow concurrent distribution of colocated tables (which * we probably should not be allowing because of foreign keys / * partitioning etc). * * We also prevent concurrent shard moves / copy / splits) while creating * a colocated table. */ AcquirePlacementColocationLock(colocatedTableId, ShareLock, "colocate distributed table"); /* * We don't need to force using exclusive connections because we're anyway * creating a single shard. 
		 */
		bool useExclusiveConnection = false;
		CreateColocatedShards(relationId, colocatedTableId, useExclusiveConnection);
	}
	else
	{
		CreateSingleShardTableShardWithRoundRobinPolicy(relationId, colocationId);
	}
}


/*
 * ColocationIdForNewTable returns a colocation id for the given table
 * according to the given configuration. If there is no such configuration, it
 * creates one and returns the colocation id of the newly created colocation
 * group. Note that DistributedTableParams and the distribution column Var
 * should be non-null only if CitusTableType corresponds to a distributed table.
 *
 * For append and range distributed tables, this function errors out if the
 * colocateWithTableName parameter is set to something other than the default,
 * otherwise it directly returns INVALID_COLOCATION_ID.
 *
 * For reference tables, returns the common reference table colocation id.
 *
 * This function assumes its caller takes the necessary lock on relationId to
 * prevent possible changes to it.
 */
static uint32
ColocationIdForNewTable(Oid relationId, CitusTableType tableType,
						DistributedTableParams *distributedTableParams,
						Var *distributionColumn)
{
	CitusTableParams citusTableParams = DecideCitusTableParams(tableType,
															   distributedTableParams);

	uint32 colocationId = INVALID_COLOCATION_ID;

	if (tableType == APPEND_DISTRIBUTED || tableType == RANGE_DISTRIBUTED)
	{
		Assert(distributedTableParams->colocationParam.colocationParamType ==
			   COLOCATE_WITH_TABLE_LIKE_OPT);

		char *colocateWithTableName =
			distributedTableParams->colocationParam.colocateWithTableName;
		if (!IsColocateWithDefault(colocateWithTableName))
		{
			ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
							errmsg("cannot distribute relation"),
							errdetail("Currently, colocate_with option is not supported "
									  "for append / range distributed tables.")));
		}

		return colocationId;
	}
	else if (tableType == REFERENCE_TABLE)
	{
		return CreateReferenceTableColocationId();
	}
	else
	{
		/*
		 * Get an exclusive lock on the colocation system catalog. Therefore, we
		 * can be sure that there will be no modifications to the colocation table
		 * until this transaction is committed.
		 */
		Oid distributionColumnType =
			distributionColumn ? distributionColumn->vartype : InvalidOid;
		Oid distributionColumnCollation =
			distributionColumn ? get_typcollation(distributionColumnType) : InvalidOid;

		Assert(distributedTableParams->colocationParam.colocationParamType ==
			   COLOCATE_WITH_TABLE_LIKE_OPT);
		char *colocateWithTableName =
			distributedTableParams->colocationParam.colocateWithTableName;

		/* get an advisory lock to serialize concurrent default group creations */
		if (IsColocateWithDefault(colocateWithTableName))
		{
			AcquireColocationDefaultLock();
		}

		colocationId = FindColocateWithColocationId(relationId,
													citusTableParams.replicationModel,
													distributionColumnType,
													distributionColumnCollation,
													distributedTableParams->shardCount,
													distributedTableParams->
													shardCountIsStrict,
													colocateWithTableName);

		if (IsColocateWithDefault(colocateWithTableName) &&
			(colocationId != INVALID_COLOCATION_ID))
		{
			/*
			 * We can release the advisory lock if there is already a default entry
			 * for the given params; otherwise, we keep it to prevent concurrent
			 * operations from creating a different default colocation entry.
			 */
			ReleaseColocationDefaultLock();
		}

		if (colocationId == INVALID_COLOCATION_ID)
		{
			if (IsColocateWithDefault(colocateWithTableName))
			{
				/*
				 * Generate a new colocation ID and insert a pg_dist_colocation
				 * record.
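				 * The new group records the shard count, replication factor and
				 * the distribution column's type and collation, so that tables
				 * created later with matching parameters can be placed into the
				 * same group.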
*/ colocationId = CreateColocationGroup(distributedTableParams->shardCount, ShardReplicationFactor, distributionColumnType, distributionColumnCollation); } else if (IsColocateWithNone(colocateWithTableName)) { /* * Generate a new colocation ID and insert a pg_dist_colocation * record. */ colocationId = CreateColocationGroup(distributedTableParams->shardCount, ShardReplicationFactor, distributionColumnType, distributionColumnCollation); } } } return colocationId; } /* * EnsureRelationCanBeDistributed checks whether Citus can safely distribute given * relation with the given configuration. We perform almost all safety checks for * distributing table here. If there is an unsatisfied requirement, we error out * and do not distribute the table. * * This function assumes, callers have already acquired necessary locks to ensure * there will not be any change in the given relation. */ static void EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn, char distributionMethod, uint32 colocationId, char replicationModel) { Oid parentRelationId = InvalidOid; EnsureLocalTableEmptyIfNecessary(relationId, distributionMethod); /* user really wants triggers? */ if (EnableUnsafeTriggers) { ErrorIfRelationHasUnsupportedTrigger(relationId); } else { EnsureRelationHasNoTriggers(relationId); } /* we assume callers took necessary locks */ Relation relation = relation_open(relationId, NoLock); TupleDesc relationDesc = RelationGetDescr(relation); char *relationName = RelationGetRelationName(relation); ErrorIfTableIsACatalogTable(relation); /* verify target relation is not distributed by a generated stored column */ if (distributionMethod != DISTRIBUTE_BY_NONE && DistributionColumnUsesGeneratedStoredColumn(relationDesc, distributionColumn)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot distribute relation: %s", relationName), errdetail("Distribution column must not use GENERATED ALWAYS " "AS (...) 
STORED."))); } /* verify target relation is not distributed by a column of type numeric with negative scale */ if (distributionMethod != DISTRIBUTE_BY_NONE && DistributionColumnUsesNumericColumnNegativeScale(relationDesc, distributionColumn)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot distribute relation: %s", relationName), errdetail("Distribution column must not use numeric type " "with negative scale"))); } /* check for support function needed by specified partition method */ if (distributionMethod == DISTRIBUTE_BY_HASH) { Oid hashSupportFunction = SupportFunctionForColumn(distributionColumn, HASH_AM_OID, HASHSTANDARD_PROC); if (hashSupportFunction == InvalidOid) { ereport(ERROR, (errcode(ERRCODE_UNDEFINED_FUNCTION), errmsg("could not identify a hash function for type %s", format_type_be(distributionColumn->vartype)), errdatatype(distributionColumn->vartype), errdetail("Partition column types must have a hash function " "defined to use hash partitioning."))); } if (distributionColumn->varcollid != InvalidOid && !get_collation_isdeterministic(distributionColumn->varcollid)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("Hash distributed partition columns may not use " "a non deterministic collation"))); } } else if (distributionMethod == DISTRIBUTE_BY_RANGE) { Oid btreeSupportFunction = SupportFunctionForColumn(distributionColumn, BTREE_AM_OID, BTORDER_PROC); if (btreeSupportFunction == InvalidOid) { ereport(ERROR, (errcode(ERRCODE_UNDEFINED_FUNCTION), errmsg("could not identify a comparison function for type %s", format_type_be(distributionColumn->vartype)), errdatatype(distributionColumn->vartype), errdetail("Partition column types must have a comparison function " "defined to use range partitioning."))); } } if (PartitionTableNoLock(relationId)) { parentRelationId = PartitionParentOid(relationId); } /* partitions cannot be distributed if their parent is not distributed */ if (PartitionTableNoLock(relationId) && !IsCitusTable(parentRelationId)) { char *parentRelationName = get_rel_name(parentRelationId); ereport(ERROR, (errmsg("cannot distribute relation \"%s\" which is partition of " "\"%s\"", relationName, parentRelationName), errdetail("Citus does not support distributing partitions " "if their parent is not distributed table."), errhint("Distribute the partitioned table \"%s\" instead.", parentRelationName))); } /* * These checks are mostly for partitioned tables not partitions because we prevent * distributing partitions directly in the above check. However, partitions can still * reach this point because, we call CreateDistributedTable for partitions if their * parent table is distributed. */ if (PartitionedTableNoLock(relationId)) { /* * Distributing partitioned tables is only supported for hash-distribution * or single-shard tables. 
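		 *
		 * Here, "single-shard" is detected as: no distribution column
		 * (DISTRIBUTE_BY_NONE), streaming replication and a valid colocation id,
		 * which is exactly what the flag below checks.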
		 */
		bool isSingleShardTable =
			distributionMethod == DISTRIBUTE_BY_NONE &&
			replicationModel == REPLICATION_MODEL_STREAMING &&
			colocationId != INVALID_COLOCATION_ID;
		if (distributionMethod != DISTRIBUTE_BY_HASH && !isSingleShardTable)
		{
			ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
							errmsg("distributing partitioned tables is only supported "
								   "for hash-distributed tables")));
		}

		/* we don't support distributing tables with multi-level partitioning */
		if (PartitionTableNoLock(relationId))
		{
			char *parentRelationName = get_rel_name(parentRelationId);

			ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
							errmsg("distributing multi-level partitioned tables "
								   "is not supported"),
							errdetail("Relation \"%s\" is a partitioned table itself "
									  "and it is also a partition of relation \"%s\".",
									  relationName, parentRelationName)));
		}
	}

	ErrorIfUnsupportedConstraint(relation, distributionMethod, replicationModel,
								 distributionColumn, colocationId);

	ErrorIfUnsupportedPolicy(relation);
	relation_close(relation, NoLock);
}


/*
 * ErrorIfTemporaryTable errors out if the given table is a temporary table.
 */
static void
ErrorIfTemporaryTable(Oid relationId)
{
	if (get_rel_persistence(relationId) == RELPERSISTENCE_TEMP)
	{
		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
						errmsg("cannot distribute a temporary table")));
	}
}


/*
 * ErrorIfTableIsACatalogTable is a helper function to error out when trying
 * to create a citus table from a catalog table.
 */
void
ErrorIfTableIsACatalogTable(Relation relation)
{
	if (relation->rd_rel->relnamespace != PG_CATALOG_NAMESPACE)
	{
		return;
	}

	ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					errmsg("cannot create a citus table from a catalog table")));
}


/*
 * EnsureLocalTableEmptyIfNecessary errors out if the given relation should be
 * empty according to ShouldLocalTableBeEmpty but it is not.
 */
static void
EnsureLocalTableEmptyIfNecessary(Oid relationId, char distributionMethod)
{
	if (ShouldLocalTableBeEmpty(relationId, distributionMethod))
	{
		EnsureLocalTableEmpty(relationId);
	}
}


/*
 * ShouldLocalTableBeEmpty returns true if the local table should be empty
 * before creating a citus table.
 * In some cases, it is possible and safe to send local data to shards while
 * distributing the table. In those cases, we can distribute non-empty local
 * tables. This function checks the distributionMethod and relation kind to
 * see whether we need to ensure the emptiness of the local table.
 */
static bool
ShouldLocalTableBeEmpty(Oid relationId, char distributionMethod)
{
	bool shouldLocalTableBeEmpty = false;
	if (distributionMethod != DISTRIBUTE_BY_HASH &&
		distributionMethod != DISTRIBUTE_BY_NONE)
	{
		/*
		 * We only support hash distributed tables and reference tables
		 * for initial data loading
		 */
		shouldLocalTableBeEmpty = true;
	}
	else if (!RegularTable(relationId))
	{
		/*
		 * We only support tables and partitioned tables for initial
		 * data loading
		 */
		shouldLocalTableBeEmpty = true;
	}

	return shouldLocalTableBeEmpty;
}


/*
 * EnsureLocalTableEmpty errors out if the local table is not empty.
 */
static void
EnsureLocalTableEmpty(Oid relationId)
{
	char *relationName = get_rel_name(relationId);
	bool localTableEmpty = TableEmpty(relationId);

	if (!localTableEmpty)
	{
		ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
						errmsg("cannot distribute relation \"%s\"", relationName),
						errdetail("Relation \"%s\" contains data.", relationName),
						errhint("Empty your table before distributing it.")));
	}
}


/*
 * EnsureDistributableTable ensures the given table type is appropriate to
 * be distributed.
 * The table should be a regular table or a citus local table.
 */
static void
EnsureDistributableTable(Oid relationId)
{
	bool isLocalTable = IsCitusTableType(relationId, CITUS_LOCAL_TABLE);
	bool isRegularTable = !IsCitusTableType(relationId, ANY_CITUS_TABLE_TYPE);

	if (!isLocalTable && !isRegularTable)
	{
		char *relationName = get_rel_name(relationId);
		ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
						errmsg("table \"%s\" is already distributed",
							   relationName)));
	}
}


/*
 * EnsureTableNotDistributed errors out if the table is distributed.
 */
void
EnsureTableNotDistributed(Oid relationId)
{
	char *relationName = get_rel_name(relationId);
	bool isCitusTable = IsCitusTable(relationId);

	if (isCitusTable)
	{
		ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
						errmsg("table \"%s\" is already distributed",
							   relationName)));
	}
}


/*
 * EnsureRelationHasNoTriggers errors out if the given table has triggers on
 * it. See also the comment of GetExplicitTriggerIdList for the triggers that
 * make this function error out.
 */
static void
EnsureRelationHasNoTriggers(Oid relationId)
{
	List *explicitTriggerIds = GetExplicitTriggerIdList(relationId);

	if (list_length(explicitTriggerIds) > 0)
	{
		char *relationName = get_rel_name(relationId);

		Assert(relationName != NULL);
		ereport(ERROR, (errmsg("cannot distribute relation \"%s\" because it "
							   "has triggers", relationName),
						errhint("Consider dropping all the triggers on \"%s\" "
								"and retry.", relationName)));
	}
}


/*
 * LookupDistributionMethod maps the oids of citus.distribution_type enum
 * values to pg_dist_partition.partmethod values.
 *
 * The passed in oid has to belong to a value of citus.distribution_type.
 */
char
LookupDistributionMethod(Oid distributionMethodOid)
{
	char distributionMethod = 0;

	HeapTuple enumTuple = SearchSysCache1(ENUMOID,
										  ObjectIdGetDatum(distributionMethodOid));
	if (!HeapTupleIsValid(enumTuple))
	{
		ereport(ERROR, (errmsg("invalid internal value for enum: %u",
							   distributionMethodOid)));
	}

	Form_pg_enum enumForm = (Form_pg_enum) GETSTRUCT(enumTuple);
	const char *enumLabel = NameStr(enumForm->enumlabel);

	if (strncmp(enumLabel, "append", NAMEDATALEN) == 0)
	{
		distributionMethod = DISTRIBUTE_BY_APPEND;
	}
	else if (strncmp(enumLabel, "hash", NAMEDATALEN) == 0)
	{
		distributionMethod = DISTRIBUTE_BY_HASH;
	}
	else if (strncmp(enumLabel, "range", NAMEDATALEN) == 0)
	{
		distributionMethod = DISTRIBUTE_BY_RANGE;
	}
	else
	{
		ereport(ERROR, (errmsg("invalid label for enum: %s", enumLabel)));
	}

	ReleaseSysCache(enumTuple);

	return distributionMethod;
}


/*
 * SupportFunctionForColumn locates a support function given a column, an access
 * method, and the id of a support function. This function returns InvalidOid if
 * there is no support function for the operator class family of the column, but
 * if the data type of the column has no default operator class whatsoever, this
 * function errors out.
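 *
 * For example (based on PostgreSQL's built-in operator classes, not on anything
 * defined in this file): for an int4 distribution column, the default hash
 * operator class resolves HASHSTANDARD_PROC to hashint4, while the default
 * btree operator class resolves BTORDER_PROC to btint4cmp.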
*/ static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId, int16 supportFunctionNumber) { Oid columnOid = partitionColumn->vartype; Oid operatorClassId = GetDefaultOpClass(columnOid, accessMethodId); /* currently only support using the default operator class */ if (operatorClassId == InvalidOid) { ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("data type %s has no default operator class for specified" " partition method", format_type_be(columnOid)), errdatatype(columnOid), errdetail("Partition column types must have a default operator" " class defined."))); } Oid operatorFamilyId = get_opclass_family(operatorClassId); Oid operatorClassInputType = get_opclass_input_type(operatorClassId); Oid supportFunctionOid = get_opfamily_proc(operatorFamilyId, operatorClassInputType, operatorClassInputType, supportFunctionNumber); return supportFunctionOid; } /* * TableEmpty function checks whether given table contains any row and * returns false if there is any data. */ bool TableEmpty(Oid tableId) { Oid schemaId = get_rel_namespace(tableId); char *schemaName = get_namespace_name(schemaId); char *tableName = get_rel_name(tableId); char *tableQualifiedName = quote_qualified_identifier(schemaName, tableName); StringInfo selectTrueQueryString = makeStringInfo(); bool readOnly = true; int spiConnectionResult = SPI_connect(); if (spiConnectionResult != SPI_OK_CONNECT) { ereport(ERROR, (errmsg("could not connect to SPI manager"))); } appendStringInfo(selectTrueQueryString, SELECT_TRUE_QUERY, tableQualifiedName); int spiQueryResult = SPI_execute(selectTrueQueryString->data, readOnly, 0); if (spiQueryResult != SPI_OK_SELECT) { ereport(ERROR, (errmsg("execution was not successful \"%s\"", selectTrueQueryString->data))); } /* we expect that SELECT TRUE query will return single value in a single row OR empty set */ Assert(SPI_processed == 1 || SPI_processed == 0); bool localTableEmpty = !SPI_processed; SPI_finish(); return localTableEmpty; } /* * CanUseExclusiveConnections checks if we can open parallel connections * while creating shards. We simply error out if we need to execute * sequentially but there is data in the table, since we cannot copy the * data to shards sequentially. */ static bool CanUseExclusiveConnections(Oid relationId, bool localTableEmpty) { bool hasForeignKeyToReferenceTable = HasForeignKeyToReferenceTable(relationId); bool shouldRunSequential = MultiShardConnectionType == SEQUENTIAL_CONNECTION || hasForeignKeyToReferenceTable; if (shouldRunSequential && ParallelQueryExecutedInTransaction()) { /* * We decided to use sequential execution. It's either because relation * has a pre-existing foreign key to a reference table or because we * decided to use sequential execution due to a query executed in the * current xact beforehand. * We have specific error messages for either cases. */ char *relationName = get_rel_name(relationId); if (hasForeignKeyToReferenceTable) { /* * If there has already been a parallel query executed, the sequential mode * would still use the already opened parallel connections to the workers, * thus contradicting our purpose of using sequential mode. 
			 */
			ereport(ERROR, (errmsg("cannot distribute relation \"%s\" in this "
								   "transaction because it has a foreign key to "
								   "a reference table", relationName),
							errdetail("If a hash distributed table has a foreign key "
									  "to a reference table, it has to be created "
									  "in sequential mode before any parallel commands "
									  "have been executed in the same transaction"),
							errhint("Try re-running the transaction with "
									"\"SET LOCAL citus.multi_shard_modify_mode TO "
									"\'sequential\';\"")));
		}
		else if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
		{
			ereport(ERROR, (errmsg("cannot distribute \"%s\" in sequential mode "
								   "because a parallel query was executed in this "
								   "transaction", relationName),
							errhint("If you have manually set "
									"citus.multi_shard_modify_mode to 'sequential', "
									"try with the 'parallel' option.")));
		}
	}
	else if (shouldRunSequential)
	{
		return false;
	}
	else if (!localTableEmpty || IsMultiStatementTransaction())
	{
		return true;
	}

	return false;
}


/*
 * CreateTruncateTrigger creates a truncate trigger on the table identified by
 * relationId and assigns citus_truncate_trigger() as its handler.
 */
void
CreateTruncateTrigger(Oid relationId)
{
	StringInfo triggerName = makeStringInfo();
	bool internal = true;

	appendStringInfo(triggerName, "truncate_trigger");

	CreateTrigStmt *trigger = makeNode(CreateTrigStmt);
	trigger->trigname = triggerName->data;
	trigger->relation = NULL;
	trigger->funcname = SystemFuncName(CITUS_TRUNCATE_TRIGGER_NAME);
	trigger->args = NIL;
	trigger->row = false;
	trigger->timing = TRIGGER_TYPE_AFTER;
	trigger->events = TRIGGER_TYPE_TRUNCATE;
	trigger->columns = NIL;
	trigger->whenClause = NULL;
	trigger->isconstraint = false;

	CreateTrigger(trigger, NULL, relationId, InvalidOid, InvalidOid, InvalidOid,
				  InvalidOid, InvalidOid, NULL, internal, false);
}


/*
 * RegularTable returns true if the given table's relation kind is
 * RELKIND_RELATION or RELKIND_PARTITIONED_TABLE; otherwise it returns false.
 */
bool
RegularTable(Oid relationId)
{
	char relationKind = get_rel_relkind(relationId);

	if (relationKind == RELKIND_RELATION || relationKind == RELKIND_PARTITIONED_TABLE)
	{
		return true;
	}

	return false;
}


/*
 * CopyLocalDataIntoShards is a wrapper around CopyFromLocalTableIntoDistTable
 * to copy data from the local table, which is hidden after converting it to a
 * distributed table, into the shards of the distributed table.
 *
 * After copying local data into the distributed table, the local data remains
 * in place and should be truncated at a later time.
 */
static void
CopyLocalDataIntoShards(Oid distributedTableId)
{
	uint64 rowsCopied = CopyFromLocalTableIntoDistTable(distributedTableId,
														distributedTableId);
	if (rowsCopied > 0)
	{
		char *qualifiedRelationName =
			generate_qualified_relation_name(distributedTableId);
		ereport(NOTICE, (errmsg("copying the data has completed"),
						 errdetail("The local data in the table is no longer visible, "
								   "but is still on disk."),
						 errhint("To remove the local data, run: SELECT "
								 "truncate_local_data_after_distributing_table($$%s$$)",
								 qualifiedRelationName)));
	}
}


/*
 * CopyFromLocalTableIntoDistTable copies data from the given local table into
 * the shards of the given distributed table.
 *
 * For partitioned tables, this function returns without copying the data
 * because we call this function for both partitioned tables and their
 * partitions. Returning early saves us from copying data to workers twice.
 *
 * This function uses CitusCopyDestReceiver to invoke the distributed COPY logic.
 * We cannot use a regular COPY here since that cannot read from a table.
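 * (A plain COPY ... FROM reads its input from a file, a program, or STDIN, so
 * it cannot pull rows out of another relation.)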
Instead * we read from the table and pass each tuple to the CitusCopyDestReceiver which * opens a connection and starts a COPY for each shard placement that will have * data. * * We assume that the local table might indeed be a distributed table and the * caller would want to read the local data from the shell table in that case. * For this reason, to keep it simple, we perform a heap scan directly on the * table instead of using SELECT. * * We read from the table and pass each tuple to the CitusCopyDestReceiver which * opens a connection and starts a COPY for each shard placement that will have * data. */ uint64 CopyFromLocalTableIntoDistTable(Oid localTableId, Oid distributedTableId) { /* take an ExclusiveLock to block all operations except SELECT */ Relation localRelation = table_open(localTableId, ExclusiveLock); /* * Skip copying from partitioned tables, we will copy the data from * partition to partition's shards. */ if (PartitionedTable(distributedTableId)) { table_close(localRelation, NoLock); return 0; } /* * All writes have finished, make sure that we can see them by using the * latest snapshot. We use GetLatestSnapshot instead of * GetTransactionSnapshot since the latter would not reveal all writes * in serializable or repeatable read mode. Note that subsequent reads * from the distributed table would reveal those writes, temporarily * violating the isolation level. However, this seems preferable over * dropping the writes entirely. */ PushActiveSnapshot(GetLatestSnapshot()); Relation distributedRelation = RelationIdGetRelation(distributedTableId); /* get the table columns for distributed table */ TupleDesc destTupleDescriptor = RelationGetDescr(distributedRelation); List *columnNameList = TupleDescColumnNameList(destTupleDescriptor); RelationClose(distributedRelation); int partitionColumnIndex = INVALID_PARTITION_COLUMN_INDEX; /* determine the partition column in the tuple descriptor */ Var *partitionColumn = PartitionColumn(distributedTableId, 0); if (partitionColumn != NULL) { partitionColumnIndex = partitionColumn->varattno - 1; } /* create tuple slot for local relation */ TupleDesc sourceTupleDescriptor = RelationGetDescr(localRelation); TupleTableSlot *slot = table_slot_create(localRelation, NULL); /* initialise per-tuple memory context */ EState *estate = CreateExecutorState(); ExprContext *econtext = GetPerTupleExprContext(estate); econtext->ecxt_scantuple = slot; const bool nonPublishableData = false; /* we don't track query counters when distributing a table */ const bool trackQueryCounters = false; DestReceiver *copyDest = (DestReceiver *) CreateCitusCopyDestReceiver(distributedTableId, columnNameList, partitionColumnIndex, estate, NULL, nonPublishableData, trackQueryCounters); /* initialise state for writing to shards, we'll open connections on demand */ copyDest->rStartup(copyDest, 0, sourceTupleDescriptor); uint64 rowsCopied = DoCopyFromLocalTableIntoShards(localRelation, copyDest, slot, estate); /* finish writing into the shards */ copyDest->rShutdown(copyDest); copyDest->rDestroy(copyDest); /* free memory and close the relation */ ExecDropSingleTupleTableSlot(slot); FreeExecutorState(estate); table_close(localRelation, NoLock); PopActiveSnapshot(); return rowsCopied; } /* * DoCopyFromLocalTableIntoShards performs a copy operation * from local tables into shards. * * Returns the number of rows copied. 
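 *
 * A NOTICE is emitted when the first row is sent, and a DEBUG1 progress message
 * is logged after every LOG_PER_TUPLE_AMOUNT rows.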
*/ static uint64 DoCopyFromLocalTableIntoShards(Relation localRelation, DestReceiver *copyDest, TupleTableSlot *slot, EState *estate) { /* begin reading from local table */ TableScanDesc scan = table_beginscan(localRelation, GetActiveSnapshot(), 0, NULL); MemoryContext oldContext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); uint64 rowsCopied = 0; while (table_scan_getnextslot(scan, ForwardScanDirection, slot)) { /* send tuple it to a shard */ copyDest->receiveSlot(slot, copyDest); /* clear tuple memory */ ResetPerTupleExprContext(estate); /* make sure we roll back on cancellation */ CHECK_FOR_INTERRUPTS(); if (rowsCopied == 0) { ereport(NOTICE, (errmsg("Copying data from local table..."))); } rowsCopied++; if (rowsCopied % LOG_PER_TUPLE_AMOUNT == 0) { ereport(DEBUG1, (errmsg("Copied " UINT64_FORMAT " rows", rowsCopied))); } } if (rowsCopied % LOG_PER_TUPLE_AMOUNT != 0) { ereport(DEBUG1, (errmsg("Copied " UINT64_FORMAT " rows", rowsCopied))); } MemoryContextSwitchTo(oldContext); /* finish reading from the local table */ table_endscan(scan); return rowsCopied; } /* * TupleDescColumnNameList returns a list of column names for the given tuple * descriptor as plain strings. */ static List * TupleDescColumnNameList(TupleDesc tupleDescriptor) { List *columnNameList = NIL; for (int columnIndex = 0; columnIndex < tupleDescriptor->natts; columnIndex++) { Form_pg_attribute currentColumn = TupleDescAttr(tupleDescriptor, columnIndex); char *columnName = NameStr(currentColumn->attname); if (currentColumn->attisdropped || currentColumn->attgenerated == ATTRIBUTE_GENERATED_STORED ) { continue; } columnNameList = lappend(columnNameList, columnName); } return columnNameList; } /* * is_valid_numeric_typmod checks if the typmod value is valid * * Because of the offset, valid numeric typmods are at least VARHDRSZ * * Copied from PG. See numeric.c for understanding how this works. */ static bool is_valid_numeric_typmod(int32 typmod) { return typmod >= (int32) VARHDRSZ; } /* * numeric_typmod_scale extracts the scale from a numeric typmod. * * Copied from PG. See numeric.c for understanding how this works. * */ static int numeric_typmod_scale(int32 typmod) { return (((typmod - VARHDRSZ) & 0x7ff) ^ 1024) - 1024; } /* * DistributionColumnUsesNumericColumnNegativeScale returns whether a given relation uses * numeric data type with negative scale on distribution column */ static bool DistributionColumnUsesNumericColumnNegativeScale(TupleDesc relationDesc, Var *distributionColumn) { Form_pg_attribute attributeForm = TupleDescAttr(relationDesc, distributionColumn->varattno - 1); if (attributeForm->atttypid == NUMERICOID && is_valid_numeric_typmod(attributeForm->atttypmod) && numeric_typmod_scale(attributeForm->atttypmod) < 0) { return true; } return false; } /* * DistributionColumnUsesGeneratedStoredColumn returns whether a given relation uses * GENERATED ALWAYS AS (...) STORED on distribution column */ static bool DistributionColumnUsesGeneratedStoredColumn(TupleDesc relationDesc, Var *distributionColumn) { Form_pg_attribute attributeForm = TupleDescAttr(relationDesc, distributionColumn->varattno - 1); if (attributeForm->attgenerated == ATTRIBUTE_GENERATED_STORED) { return true; } return false; } /* * ErrorIfForeignTable errors out if the relation with given relationOid * is a foreign table. 
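 *
 * Foreign tables cannot be distributed; the error hints that the table can
 * instead be added to Citus metadata with citus_add_local_table_to_metadata().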
*/ static void ErrorIfForeignTable(Oid relationOid) { if (IsForeignTable(relationOid)) { char *relname = get_rel_name(relationOid); char *qualifiedRelname = generate_qualified_relation_name(relationOid); ereport(ERROR, (errmsg("foreign tables cannot be distributed"), (errhint("Can add foreign table \"%s\" to metadata by running: " "SELECT citus_add_local_table_to_metadata($$%s$$);", relname, qualifiedRelname)))); } }
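
/*
 * Illustrative usage sketch (not part of this file's API; table and column
 * names are hypothetical): the checks and helpers above back SQL-level calls
 * such as
 *
 *   SELECT create_distributed_table('orders', 'customer_id');
 *   SELECT truncate_local_data_after_distributing_table($$public.orders$$);
 *
 * where the second call removes the now-hidden local data after distribution.
 */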