citus/src/backend/distributed/commands/create_distributed_table.c

/*-------------------------------------------------------------------------
*
* create_distributed_table.c
* Routines related to the creation of distributed relations.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "miscadmin.h"
#include "distributed/pg_version_constants.h"
#include "distributed/commands/utility_hook.h"
#include "access/genam.h"
#include "access/hash.h"
#include "access/heapam.h"
#include "access/htup.h"
#include "access/htup_details.h"
#include "access/nbtree.h"
#include "access/xact.h"
#include "catalog/dependency.h"
#include "catalog/index.h"
#include "catalog/pg_am.h"
#include "catalog/pg_attribute.h"
#include "catalog/pg_enum.h"
#include "catalog/pg_extension.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_opclass.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_trigger.h"
#include "commands/defrem.h"
#include "commands/extension.h"
#include "commands/sequence.h"
#include "commands/tablecmds.h"
#include "commands/trigger.h"
#include "distributed/commands/multi_copy.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/colocation_utils.h"
#include "distributed/commands.h"
#include "distributed/deparser.h"
#include "distributed/distribution_column.h"
#include "distributed/listutils.h"
#include "distributed/local_executor.h"
#include "distributed/metadata_utility.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/metadata/dependency.h"
#include "distributed/metadata/distobject.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/pg_dist_colocation.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/reference_table_utils.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h"
#include "distributed/shared_library_init.h"
#include "distributed/worker_protocol.h"
#include "distributed/worker_transaction.h"
#include "distributed/version_compat.h"
#include "executor/executor.h"
#include "executor/spi.h"
#include "nodes/execnodes.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "nodes/pg_list.h"
#include "parser/parse_expr.h"
#include "parser/parse_node.h"
#include "parser/parse_relation.h"
#include "parser/parser.h"
#include "storage/lmgr.h"
#include "tcop/pquery.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "utils/inval.h"
/*
* Copy progress is logged once every LOG_PER_TUPLE_AMOUNT tuples.
*/
#define LOG_PER_TUPLE_AMOUNT 1000000
/* local function forward declarations */
static char DecideReplicationModel(char distributionMethod, char *colocateWithTableName,
bool viaDeprecatedAPI);
static void CreateHashDistributedTableShards(Oid relationId, int shardCount,
Oid colocatedTableId, bool localTableEmpty);
static uint32 ColocationIdForNewTable(Oid relationId, Var *distributionColumn,
char distributionMethod, char replicationModel,
int shardCount, bool shardCountIsStrict,
char *colocateWithTableName,
bool viaDeprecatedAPI);
static void EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn,
char distributionMethod, uint32 colocationId,
char replicationModel, bool viaDeprecatedAPI);
static void EnsureTableCanBeColocatedWith(Oid relationId, char replicationModel,
Oid distributionColumnType,
Oid sourceRelationId);
static void EnsureLocalTableEmpty(Oid relationId);
static void EnsureRelationHasNoTriggers(Oid relationId);
static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId,
int16 supportFunctionNumber);
static void EnsureLocalTableEmptyIfNecessary(Oid relationId, char distributionMethod,
bool viaDeprecatedAPI);
static bool ShouldLocalTableBeEmpty(Oid relationId, char distributionMethod, bool
viaDeprecatedAPI);
static void EnsureCitusTableCanBeCreated(Oid relationOid);
static void EnsureSequenceExistOnMetadataWorkersForRelation(Oid relationId,
Oid sequenceOid);
static List * GetFKeyCreationCommandsRelationInvolvedWithTableType(Oid relationId,
int tableTypeFlag);
static Oid DropFKeysAndUndistributeTable(Oid relationId);
static void DropFKeysRelationInvolvedWithTableType(Oid relationId, int tableTypeFlag);
static void CopyLocalDataIntoShards(Oid relationId);
static List * TupleDescColumnNameList(TupleDesc tupleDescriptor);
static bool DistributionColumnUsesGeneratedStoredColumn(TupleDesc relationDesc,
Var *distributionColumn);
static bool CanUseExclusiveConnections(Oid relationId, bool localTableEmpty);
static void DoCopyFromLocalTableIntoShards(Relation distributedRelation,
DestReceiver *copyDest,
TupleTableSlot *slot,
EState *estate);
static void ErrorIfTemporaryTable(Oid relationId);
static void ErrorIfForeignTable(Oid relationOid);
/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(master_create_distributed_table);
PG_FUNCTION_INFO_V1(create_distributed_table);
PG_FUNCTION_INFO_V1(create_reference_table);
/*
* master_create_distributed_table accepts a table, a distribution column and
* a distribution method, and performs the corresponding catalog changes.
*
* Note that this UDF is deprecated and cannot create colocated tables, so we
* always use INVALID_COLOCATION_ID.
*/
Datum
master_create_distributed_table(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
Oid relationId = PG_GETARG_OID(0);
text *distributionColumnText = PG_GETARG_TEXT_P(1);
Oid distributionMethodOid = PG_GETARG_OID(2);
EnsureCitusTableCanBeCreated(relationId);
char *colocateWithTableName = NULL;
bool viaDeprecatedAPI = true;
/*
* Lock target relation with an exclusive lock - there's no way to make
* sense of this table until we've committed, and we don't want multiple
* backends manipulating this relation.
*/
Relation relation = try_relation_open(relationId, ExclusiveLock);
if (relation == NULL)
{
ereport(ERROR, (errmsg("could not create distributed table: "
"relation does not exist")));
}
char *distributionColumnName = text_to_cstring(distributionColumnText);
Var *distributionColumn = BuildDistributionKeyFromColumnName(relation,
distributionColumnName);
Assert(distributionColumn != NULL);
char distributionMethod = LookupDistributionMethod(distributionMethodOid);
CreateDistributedTable(relationId, distributionColumn, distributionMethod,
ShardCount, false, colocateWithTableName, viaDeprecatedAPI);
relation_close(relation, NoLock);
PG_RETURN_VOID();
}
/*
* create_distributed_table gets a table name, distribution column,
* distribution method and colocate_with option, then it creates a
* distributed table.
*/
Datum
create_distributed_table(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
if (PG_ARGISNULL(0) || PG_ARGISNULL(1) || PG_ARGISNULL(2) || PG_ARGISNULL(3))
{
PG_RETURN_VOID();
}
bool viaDeprecatedAPI = false;
Oid relationId = PG_GETARG_OID(0);
text *distributionColumnText = PG_GETARG_TEXT_P(1);
Oid distributionMethodOid = PG_GETARG_OID(2);
text *colocateWithTableNameText = PG_GETARG_TEXT_P(3);
char *colocateWithTableName = text_to_cstring(colocateWithTableNameText);
bool shardCountIsStrict = false;
int shardCount = ShardCount;
if (!PG_ARGISNULL(4))
{
if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0 &&
pg_strncasecmp(colocateWithTableName, "none", NAMEDATALEN) != 0)
{
ereport(ERROR, (errmsg("Cannot use colocate_with with a table "
"and shard_count at the same time")));
}
shardCount = PG_GETARG_INT32(4);
/*
* If the shard_count parameter is given, then we have to
* make sure the table ends up with that many shards.
*/
shardCountIsStrict = true;
}
EnsureCitusTableCanBeCreated(relationId);
/* enable create_distributed_table on an empty node */
InsertCoordinatorIfClusterEmpty();
/*
* Lock target relation with an exclusive lock - there's no way to make
* sense of this table until we've committed, and we don't want multiple
* backends manipulating this relation.
*/
Relation relation = try_relation_open(relationId, ExclusiveLock);
if (relation == NULL)
{
ereport(ERROR, (errmsg("could not create distributed table: "
"relation does not exist")));
}
char *distributionColumnName = text_to_cstring(distributionColumnText);
Var *distributionColumn = BuildDistributionKeyFromColumnName(relation,
distributionColumnName);
Assert(distributionColumn != NULL);
/* keep the lock until the end of the transaction; only close the relcache reference */
relation_close(relation, NoLock);
char distributionMethod = LookupDistributionMethod(distributionMethodOid);
if (shardCount < 1 || shardCount > MAX_SHARD_COUNT)
{
ereport(ERROR, (errmsg("%d is outside the valid range for "
"parameter \"shard_count\" (1 .. %d)",
shardCount, MAX_SHARD_COUNT)));
}
CreateDistributedTable(relationId, distributionColumn, distributionMethod,
shardCount, shardCountIsStrict, colocateWithTableName,
viaDeprecatedAPI);
PG_RETURN_VOID();
}
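/*
* For illustration only (a sketch with hypothetical table and column names):
* a typical invocation of this UDF from SQL looks like
*
*   SELECT create_distributed_table('events', 'tenant_id',
*                                   colocate_with => 'none',
*                                   shard_count => 32);
*
* which hash-distributes "events" on "tenant_id" into 32 shards without
* colocating it with any existing table.
*/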
/*
* create_reference_table creates a distributed table with the given
* relationId. The created table has one shard, and its replication factor is
* set to the active worker count. In fact, that is the definition of a
* reference table in Citus.
*/
Datum
create_reference_table(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
Oid relationId = PG_GETARG_OID(0);
char *colocateWithTableName = NULL;
Var *distributionColumn = NULL;
bool viaDeprecatedAPI = false;
EnsureCitusTableCanBeCreated(relationId);
/* enable create_reference_table on an empty node */
InsertCoordinatorIfClusterEmpty();
/*
* Lock target relation with an exclusive lock - there's no way to make
* sense of this table until we've committed, and we don't want multiple
* backends manipulating this relation.
*/
Relation relation = try_relation_open(relationId, ExclusiveLock);
if (relation == NULL)
{
ereport(ERROR, (errmsg("could not create reference table: "
"relation does not exist")));
}
relation_close(relation, NoLock);
List *workerNodeList = ActivePrimaryNodeList(ShareLock);
int workerCount = list_length(workerNodeList);
/* if there are no workers, error out */
if (workerCount == 0)
{
char *relationName = get_rel_name(relationId);
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot create reference table \"%s\"", relationName),
errdetail("There are no active worker nodes.")));
}
CreateDistributedTable(relationId, distributionColumn, DISTRIBUTE_BY_NONE,
ShardCount, false, colocateWithTableName, viaDeprecatedAPI);
PG_RETURN_VOID();
}
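/*
* For illustration only (hypothetical table name): a reference table is
* created from SQL with
*
*   SELECT create_reference_table('countries');
*
* which creates a single-shard table whose shard is replicated to every
* active worker node.
*/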
/*
* EnsureCitusTableCanBeCreated checks if
* - we are on the coordinator
* - the current user is the owner of the table
* - relation kind is supported
*/
static void
EnsureCitusTableCanBeCreated(Oid relationOid)
{
EnsureCoordinator();
EnsureRelationExists(relationOid);
EnsureTableOwner(relationOid);
ErrorIfTemporaryTable(relationOid);
ErrorIfForeignTable(relationOid);
/*
* We should do this check here since the code that follows relies on
* this relation having a supported relation kind. More extensive checks
* will be performed in CreateDistributedTable.
*/
EnsureRelationKindSupported(relationOid);
}
/*
* EnsureRelationExists does a basic check on whether the OID belongs to
* an existing relation.
*/
void
EnsureRelationExists(Oid relationId)
{
if (!RelationExists(relationId))
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("relation with OID %d does not exist",
relationId)));
}
}
/*
* CreateDistributedTable creates a distributed table with the given
* configuration. This function contains all the necessary logic to create
* distributed tables. It performs the checks required to ensure that
* distributing the table is safe. If it is safe to distribute the table,
* this function creates the distributed table metadata, creates shards and
* copies local data to the shards. This function also handles partitioned
* tables by distributing their partitions as well.
*
* The viaDeprecatedAPI boolean flag is not the optimal way to implement this
* function, but it helps reduce code duplication a lot. We hope to remove
* that flag one day, once we deprecate master_create_distributed_table
* completely.
*/
void
CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributionMethod,
int shardCount, bool shardCountIsStrict,
char *colocateWithTableName, bool viaDeprecatedAPI)
{
/*
* EnsureTableNotDistributed errors out when the relation is a citus table,
* but we don't want to ask users to first undistribute their citus local
* tables when creating reference or distributed tables from them.
* For this reason, here we undistribute citus local tables beforehand.
* But since UndistributeTable does not support undistributing relations
* involved in foreign key relationships, we first drop the foreign keys
* that the given relation is involved in, then we undistribute the relation,
* and finally we re-create the dropped foreign keys at the end of this function.
*/
List *originalForeignKeyRecreationCommands = NIL;
if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE))
{
/* store the creation commands of the foreign keys that the relation is involved in */
originalForeignKeyRecreationCommands =
GetFKeyCreationCommandsRelationInvolvedWithTableType(relationId,
INCLUDE_ALL_TABLE_TYPES);
relationId = DropFKeysAndUndistributeTable(relationId);
}
/*
* To support foreign keys between reference tables and local tables,
* we drop & re-define foreign keys at the end of this function so
* that the ALTER TABLE hook does the necessary job, which means
* converting local tables to citus local tables to properly support
* such foreign keys.
*
* This function does not expect to create a Citus local table, so we blindly
* create a reference table when the method is DISTRIBUTE_BY_NONE.
*/
else if (distributionMethod == DISTRIBUTE_BY_NONE &&
ShouldEnableLocalReferenceForeignKeys() &&
HasForeignKeyWithLocalTable(relationId))
{
/*
* Store foreign key creation commands for foreign key relationships
* that relation has with postgres tables.
*/
originalForeignKeyRecreationCommands =
GetFKeyCreationCommandsRelationInvolvedWithTableType(relationId,
INCLUDE_LOCAL_TABLES);
/*
* Soon we will convert local tables to citus local tables. As
* CreateCitusLocalTable needs to use local execution, now we
* switch to local execution beforehand so that reference table
* creation doesn't use remote execution and we don't error out
* in CreateCitusLocalTable.
*/
SetLocalExecutionStatus(LOCAL_EXECUTION_REQUIRED);
DropFKeysRelationInvolvedWithTableType(relationId, INCLUDE_LOCAL_TABLES);
}
/*
* Distributed tables might have dependencies on different objects. Since we
* create shards for a distributed table via multiple sessions, these objects
* will be created via their own connections and committed immediately, so they
* become visible to all sessions creating shards.
*/
ObjectAddress tableAddress = { 0 };
ObjectAddressSet(tableAddress, RelationRelationId, relationId);
EnsureDependenciesExistOnAllNodes(&tableAddress);
char replicationModel = DecideReplicationModel(distributionMethod,
colocateWithTableName,
viaDeprecatedAPI);
/*
* Due to dropping columns, the parent's distribution key may not match the
* partition's distribution key. The input distributionColumn belongs to
* the parent. That's why we override the distribution column of partitions
* here. See issue #5123 for details.
*/
if (PartitionTable(relationId))
{
Oid parentRelationId = PartitionParentOid(relationId);
char *distributionColumnName =
ColumnToColumnName(parentRelationId, nodeToString(distributionColumn));
distributionColumn =
FindColumnWithNameOnTargetRelation(parentRelationId, distributionColumnName,
relationId);
}
/*
* ColocationIdForNewTable assumes the caller acquires a lock on relationId.
* In our case, our caller has already acquired that lock.
*/
uint32 colocationId = ColocationIdForNewTable(relationId, distributionColumn,
distributionMethod, replicationModel,
shardCount, shardCountIsStrict,
colocateWithTableName,
viaDeprecatedAPI);
EnsureRelationCanBeDistributed(relationId, distributionColumn, distributionMethod,
colocationId, replicationModel, viaDeprecatedAPI);
/*
* Make sure that existing reference tables have been replicated to all the nodes
* such that we can create foreign keys and joins work immediately after creation.
*/
EnsureReferenceTablesExistOnAllNodes();
/* we need to calculate these variables before creating distributed metadata */
bool localTableEmpty = TableEmpty(relationId);
Oid colocatedTableId = ColocatedTableId(colocationId);
/* setting to false since this flag is only valid for citus local tables */
bool autoConverted = false;
/* create an entry for distributed table in pg_dist_partition */
InsertIntoPgDistPartition(relationId, distributionMethod, distributionColumn,
colocationId, replicationModel, autoConverted);
/*
* Ensure that the sequences used in column defaults of the table
* have proper types
*/
List *attnumList = NIL;
List *dependentSequenceList = NIL;
GetDependentSequencesWithRelation(relationId, &attnumList, &dependentSequenceList, 0);
EnsureDistributedSequencesHaveOneType(relationId, dependentSequenceList,
attnumList);
/* foreign tables do not support TRUNCATE trigger */
if (RegularTable(relationId))
{
CreateTruncateTrigger(relationId);
}
/*
* If we are using master_create_distributed_table, we don't need to continue,
* because the deprecated API does not support the following features.
*/
if (viaDeprecatedAPI)
{
Assert(colocateWithTableName == NULL);
return;
}
/* create shards for hash distributed and reference tables */
if (distributionMethod == DISTRIBUTE_BY_HASH)
{
CreateHashDistributedTableShards(relationId, shardCount, colocatedTableId,
localTableEmpty);
}
else if (distributionMethod == DISTRIBUTE_BY_NONE)
{
/*
* This function does not expect to create a Citus local table, so we blindly
* create a reference table when the method is DISTRIBUTE_BY_NONE.
*/
CreateReferenceTableShard(relationId);
}
if (ShouldSyncTableMetadata(relationId))
{
if (ClusterHasKnownMetadataWorkers())
{
/*
* Ensure that the sequences and their dependencies exist on all nodes and
* mark them as distributed before creating table metadata on workers.
*/
MarkSequenceListDistributedAndPropagateWithDependencies(relationId,
dependentSequenceList);
}
CreateTableMetadataOnWorkers(relationId);
}
/*
* We have a custom way of invalidating the foreign key graph; see
* InvalidateForeignKeyGraph().
*/
if (TableReferenced(relationId) || TableReferencing(relationId))
{
InvalidateForeignKeyGraph();
}
/* if this table is a partitioned table, distribute its partitions too */
if (PartitionedTable(relationId))
{
List *partitionList = PartitionList(relationId);
Oid partitionRelationId = InvalidOid;
Oid namespaceId = get_rel_namespace(relationId);
char *schemaName = get_namespace_name(namespaceId);
char *relationName = get_rel_name(relationId);
char *parentRelationName = quote_qualified_identifier(schemaName, relationName);
foreach_oid(partitionRelationId, partitionList)
{
CreateDistributedTable(partitionRelationId, distributionColumn,
distributionMethod, shardCount, false,
parentRelationName, viaDeprecatedAPI);
}
}
/* copy over data for hash distributed and reference tables */
if (distributionMethod == DISTRIBUTE_BY_HASH ||
distributionMethod == DISTRIBUTE_BY_NONE)
{
if (RegularTable(relationId))
{
CopyLocalDataIntoShards(relationId);
}
}
/*
* Now recreate foreign keys that we dropped beforehand. As modifications are not
* allowed on the relations that are involved in the foreign key relationship,
* we can skip the validation of the foreign keys.
*/
bool skip_validation = true;
ExecuteForeignKeyCreateCommandList(originalForeignKeyRecreationCommands,
skip_validation);
}
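/*
* For illustration only (a sketch with hypothetical names): distributing a
* citus local table "orders" that is involved in foreign key relationships,
*
*   SELECT create_distributed_table('orders', 'user_id');
*
* first drops the foreign keys that "orders" is involved in, undistributes
* it back to a postgres table, creates the distributed table, and finally
* recreates the dropped foreign keys with validation skipped, as described
* in the comments above.
*/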
/*
* EnsureSequenceTypeSupported ensures that the type of the column that uses
* a sequence in its DEFAULT expression is consistent with previous uses
* (if any) of the sequence in distributed tables.
* If any other distributed table uses the input sequence, it checks whether
* the types of the columns using the sequence match. If they don't, it
* errors out. Otherwise, the check passes.
*/
void
EnsureSequenceTypeSupported(Oid seqOid, Oid seqTypId)
{
List *citusTableIdList = CitusTableTypeIdList(ANY_CITUS_TABLE_TYPE);
Oid citusTableId = InvalidOid;
foreach_oid(citusTableId, citusTableIdList)
{
List *attnumList = NIL;
List *dependentSequenceList = NIL;
GetDependentSequencesWithRelation(citusTableId, &attnumList,
&dependentSequenceList, 0);
ListCell *attnumCell = NULL;
ListCell *dependentSequenceCell = NULL;
forboth(attnumCell, attnumList, dependentSequenceCell,
dependentSequenceList)
{
AttrNumber currentAttnum = lfirst_int(attnumCell);
Oid currentSeqOid = lfirst_oid(dependentSequenceCell);
/*
* If another distributed table is using the same sequence
* in one of its column defaults, make sure the types of the
* columns match
*/
if (currentSeqOid == seqOid)
{
Oid currentSeqTypId = GetAttributeTypeOid(citusTableId,
currentAttnum);
if (seqTypId != currentSeqTypId)
{
char *sequenceName = generate_qualified_relation_name(
seqOid);
char *citusTableName =
generate_qualified_relation_name(citusTableId);
ereport(ERROR, (errmsg(
"The sequence %s is already used for a different"
" type in column %d of the table %s",
sequenceName, currentAttnum,
citusTableName)));
}
}
}
}
}
/*
* AlterSequenceType alters the given sequence's type to the given type.
*/
void
AlterSequenceType(Oid seqOid, Oid typeOid)
{
Form_pg_sequence sequenceData = pg_get_sequencedef(seqOid);
Oid currentSequenceTypeOid = sequenceData->seqtypid;
if (currentSequenceTypeOid != typeOid)
{
AlterSeqStmt *alterSequenceStatement = makeNode(AlterSeqStmt);
char *seqNamespace = get_namespace_name(get_rel_namespace(seqOid));
char *seqName = get_rel_name(seqOid);
alterSequenceStatement->sequence = makeRangeVar(seqNamespace, seqName, -1);
Node *asTypeNode = (Node *) makeTypeNameFromOid(typeOid, -1);
SetDefElemArg(alterSequenceStatement, "as", asTypeNode);
ParseState *pstate = make_parsestate(NULL);
AlterSequence(pstate, alterSequenceStatement);
CommandCounterIncrement();
}
}
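/*
* For illustration only (hypothetical names): for a sequence "s" of the
* default bigint type that is used in an int column, the statement built
* above is equivalent to running
*
*   ALTER SEQUENCE public.s AS integer;
*
* on the coordinator.
*/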
/*
* MarkSequenceListDistributedAndPropagateWithDependencies ensures sequences and their
* dependencies for the given sequence list exist on all nodes and marks them as distributed.
*/
void
MarkSequenceListDistributedAndPropagateWithDependencies(Oid relationId,
List *sequenceList)
{
Oid sequenceOid = InvalidOid;
foreach_oid(sequenceOid, sequenceList)
{
MarkSequenceDistributedAndPropagateWithDependencies(relationId, sequenceOid);
}
}
/*
* MarkSequenceDistributedAndPropagateWithDependencies ensures that the given
* sequence and its dependencies exist on all nodes and marks them as distributed.
*/
void
MarkSequenceDistributedAndPropagateWithDependencies(Oid relationId, Oid sequenceOid)
{
/* get sequence address */
ObjectAddress sequenceAddress = { 0 };
ObjectAddressSet(sequenceAddress, RelationRelationId, sequenceOid);
EnsureDependenciesExistOnAllNodes(&sequenceAddress);
EnsureSequenceExistOnMetadataWorkersForRelation(relationId, sequenceOid);
MarkObjectDistributed(&sequenceAddress);
}
/*
* EnsureSequenceExistOnMetadataWorkersForRelation ensures that the given
* sequence of the given relation exists on each worker node with metadata.
*/
static void
EnsureSequenceExistOnMetadataWorkersForRelation(Oid relationId, Oid sequenceOid)
{
Assert(ShouldSyncTableMetadata(relationId));
char *ownerName = TableOwner(relationId);
List *sequenceDDLList = DDLCommandsForSequence(sequenceOid, ownerName);
/* prevent recursive propagation */
SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION);
const char *sequenceCommand = NULL;
foreach_ptr(sequenceCommand, sequenceDDLList)
{
SendCommandToWorkersWithMetadata(sequenceCommand);
}
SendCommandToWorkersWithMetadata(ENABLE_DDL_PROPAGATION);
}
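/*
* For illustration only (a sketch; the exact command strings are assumptions
* based on the macro names): the commands sent to metadata workers are
* wrapped roughly as
*
*   SET citus.enable_ddl_propagation TO 'off';
*   CREATE SEQUENCE ...; ALTER SEQUENCE ... OWNER TO ...;
*   SET citus.enable_ddl_propagation TO 'on';
*
* so that the workers do not try to propagate the sequence DDL further.
*/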
/*
* EnsureDistributedSequencesHaveOneType first ensures that, for each sequence
* in the input dependentSequenceList, the type of the column in which the
* sequence is used as a default is supported, and then alters the sequence
* type if it is not the same as the column type.
*/
void
EnsureDistributedSequencesHaveOneType(Oid relationId, List *dependentSequenceList,
List *attnumList)
{
ListCell *attnumCell = NULL;
ListCell *dependentSequenceCell = NULL;
forboth(attnumCell, attnumList, dependentSequenceCell, dependentSequenceList)
{
AttrNumber attnum = lfirst_int(attnumCell);
Oid sequenceOid = lfirst_oid(dependentSequenceCell);
/*
* We should make sure that the type of the column that uses
* that sequence is supported
*/
Oid seqTypId = GetAttributeTypeOid(relationId, attnum);
EnsureSequenceTypeSupported(sequenceOid, seqTypId);
/*
* Alter the sequence's data type in the coordinator if needed.
* A sequence's type is bigint by default and it doesn't change even if
* it's used in an int column. We should change the type if needed,
* and not allow future ALTER SEQUENCE ... TYPE ... commands for
* sequences used as defaults in distributed tables
*/
AlterSequenceType(sequenceOid, seqTypId);
}
}
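/*
* For illustration only (hypothetical names): given
*
*   CREATE SEQUENCE user_id_seq;
*   CREATE TABLE users (id int DEFAULT nextval('user_id_seq'));
*
* the sequence is created as bigint by default, so distributing "users"
* alters user_id_seq to "AS integer" to match the column type.
*/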
/*
* GetFKeyCreationCommandsRelationInvolvedWithTableType returns a list of DDL
* commands to recreate the foreign keys that the relation with relationId is
* involved in, for the given table type.
*/
static List *
GetFKeyCreationCommandsRelationInvolvedWithTableType(Oid relationId, int tableTypeFlag)
{
int referencingFKeysFlag = INCLUDE_REFERENCING_CONSTRAINTS |
tableTypeFlag;
List *referencingFKeyCreationCommands =
GetForeignConstraintCommandsInternal(relationId, referencingFKeysFlag);
/* already captured self referencing foreign keys, so use EXCLUDE_SELF_REFERENCES */
int referencedFKeysFlag = INCLUDE_REFERENCED_CONSTRAINTS |
EXCLUDE_SELF_REFERENCES |
tableTypeFlag;
List *referencedFKeyCreationCommands =
GetForeignConstraintCommandsInternal(relationId, referencedFKeysFlag);
return list_concat(referencingFKeyCreationCommands, referencedFKeyCreationCommands);
}
/*
* DropFKeysAndUndistributeTable drops all foreign keys that the relation
* with relationId is involved in, then undistributes it.
* Note that as UndistributeTable changes the relationId of the relation,
* this function also returns the new relationId.
* Also note that callers are responsible for storing & recreating the
* dropped foreign keys if needed.
*/
static Oid
DropFKeysAndUndistributeTable(Oid relationId)
{
DropFKeysRelationInvolvedWithTableType(relationId, INCLUDE_ALL_TABLE_TYPES);
/* store them before calling UndistributeTable as it changes relationId */
char *relationName = get_rel_name(relationId);
Oid schemaId = get_rel_namespace(relationId);
/* suppress notice messages to avoid being too verbose */
TableConversionParameters params = {
.relationId = relationId,
.cascadeViaForeignKeys = false,
.suppressNoticeMessages = true
};
UndistributeTable(&params);
Oid newRelationId = get_relname_relid(relationName, schemaId);
/*
* We don't expect this to happen but to be on the safe side let's error
* out here.
*/
EnsureRelationExists(newRelationId);
return newRelationId;
}
/*
* DropFKeysRelationInvolvedWithTableType drops the foreign keys that the
* relation with relationId is involved in, for the given table type.
*/
static void
DropFKeysRelationInvolvedWithTableType(Oid relationId, int tableTypeFlag)
{
int referencingFKeysFlag = INCLUDE_REFERENCING_CONSTRAINTS |
tableTypeFlag;
DropRelationForeignKeys(relationId, referencingFKeysFlag);
/* already captured self referencing foreign keys, so use EXCLUDE_SELF_REFERENCES */
int referencedFKeysFlag = INCLUDE_REFERENCED_CONSTRAINTS |
EXCLUDE_SELF_REFERENCES |
tableTypeFlag;
DropRelationForeignKeys(relationId, referencedFKeysFlag);
}
/*
* DecideReplicationModel decides which replication model should be
* used depending on the given distribution configuration.
*/
static char
DecideReplicationModel(char distributionMethod, char *colocateWithTableName, bool
viaDeprecatedAPI)
{
if (viaDeprecatedAPI)
{
return REPLICATION_MODEL_COORDINATOR;
}
else if (distributionMethod == DISTRIBUTE_BY_NONE)
{
return REPLICATION_MODEL_2PC;
}
else if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0 &&
!IsColocateWithNone(colocateWithTableName))
{
text *colocateWithTableNameText = cstring_to_text(colocateWithTableName);
Oid colocatedRelationId = ResolveRelationId(colocateWithTableNameText, false);
CitusTableCacheEntry *targetTableEntry = GetCitusTableCacheEntry(
colocatedRelationId);
char replicationModel = targetTableEntry->replicationModel;
return replicationModel;
}
else if (distributionMethod == DISTRIBUTE_BY_HASH &&
!DistributedTableReplicationIsEnabled())
{
return REPLICATION_MODEL_STREAMING;
}
else
{
return REPLICATION_MODEL_COORDINATOR;
}
/* we should never reach this point */
return REPLICATION_MODEL_INVALID;
}
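/*
* In summary, the branches above map as follows:
*
*   deprecated API                         -> REPLICATION_MODEL_COORDINATOR
*   DISTRIBUTE_BY_NONE (reference table)   -> REPLICATION_MODEL_2PC
*   colocate_with a specific table         -> that table's replication model
*   hash-distributed, table replication
*   disabled                               -> REPLICATION_MODEL_STREAMING
*   otherwise                              -> REPLICATION_MODEL_COORDINATOR
*/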
/*
* CreateHashDistributedTableShards creates shards of given hash distributed table.
*/
static void
CreateHashDistributedTableShards(Oid relationId, int shardCount,
Oid colocatedTableId, bool localTableEmpty)
{
bool useExclusiveConnection = false;
/*
* Decide whether to use exclusive connections per placement or not. Note that
* if the local table is not empty, we cannot use sequential mode since the COPY
* operation that'd load the data into shards currently requires exclusive
* connections.
*/
if (RegularTable(relationId))
{
useExclusiveConnection = CanUseExclusiveConnections(relationId,
localTableEmpty);
}
if (colocatedTableId != InvalidOid)
{
CreateColocatedShards(relationId, colocatedTableId, useExclusiveConnection);
}
else
{
/*
* This path is only reached by create_distributed_table for the distributed
* tables which will not be part of an existing colocation group. Therefore,
* we can directly use ShardReplicationFactor global variable here.
*/
CreateShardsWithRoundRobinPolicy(relationId, shardCount, ShardReplicationFactor,
useExclusiveConnection);
}
}
/*
* ColocationIdForNewTable returns a colocation id for a hash-distributed
* table according to the given configuration. If there is no such
* configuration, it creates one and returns the colocation id of the newly
* created colocation group.
* For append and range distributed tables, this function errors out if the
* colocateWithTableName parameter is not NULL, otherwise it directly returns
* INVALID_COLOCATION_ID.
*
* This function assumes its caller takes the necessary lock on relationId to
* prevent possible changes on it.
*/
static uint32
ColocationIdForNewTable(Oid relationId, Var *distributionColumn,
char distributionMethod, char replicationModel,
int shardCount, bool shardCountIsStrict,
char *colocateWithTableName, bool viaDeprecatedAPI)
{
uint32 colocationId = INVALID_COLOCATION_ID;
if (viaDeprecatedAPI)
{
return colocationId;
}
else if (distributionMethod == DISTRIBUTE_BY_APPEND ||
distributionMethod == DISTRIBUTE_BY_RANGE)
{
if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot distribute relation"),
errdetail("Currently, colocate_with option is only supported "
"for hash distributed tables.")));
}
return colocationId;
}
else if (distributionMethod == DISTRIBUTE_BY_NONE)
{
return CreateReferenceTableColocationId();
}
else
{
/*
* Get an exclusive lock on the colocation system catalog. Therefore, we
* can be sure that there will be no modifications on the colocation table
* until this transaction is committed.
*/
Assert(distributionMethod == DISTRIBUTE_BY_HASH);
Relation pgDistColocation = table_open(DistColocationRelationId(), ExclusiveLock);
Oid distributionColumnType = distributionColumn->vartype;
Oid distributionColumnCollation = get_typcollation(distributionColumnType);
bool createdColocationGroup = false;
if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) == 0)
{
/* check for default colocation group */
colocationId = ColocationId(shardCount, ShardReplicationFactor,
distributionColumnType,
distributionColumnCollation);
/*
* If the shardCount is strict, then we check whether the shard count
* of the colocated table actually equals shardCount.
*/
if (shardCountIsStrict && colocationId != INVALID_COLOCATION_ID)
{
Oid colocatedTableId = ColocatedTableId(colocationId);
if (colocatedTableId != InvalidOid)
{
CitusTableCacheEntry *cacheEntry =
GetCitusTableCacheEntry(colocatedTableId);
int colocatedTableShardCount = cacheEntry->shardIntervalArrayLength;
if (colocatedTableShardCount != shardCount)
{
colocationId = INVALID_COLOCATION_ID;
}
}
}
if (colocationId == INVALID_COLOCATION_ID)
{
colocationId = CreateColocationGroup(shardCount, ShardReplicationFactor,
distributionColumnType,
distributionColumnCollation);
createdColocationGroup = true;
}
}
else if (IsColocateWithNone(colocateWithTableName))
{
colocationId = GetNextColocationId();
createdColocationGroup = true;
}
else
{
text *colocateWithTableNameText = cstring_to_text(colocateWithTableName);
Oid sourceRelationId = ResolveRelationId(colocateWithTableNameText, false);
EnsureTableCanBeColocatedWith(relationId, replicationModel,
distributionColumnType, sourceRelationId);
colocationId = TableColocationId(sourceRelationId);
}
/*
* If we created a new colocation group then we need to keep the lock to
* prevent a concurrent create_distributed_table call from creating another
* colocation group with the same parameters. If we're using an existing
* colocation group then other transactions will use the same one.
*/
if (createdColocationGroup)
{
/* keep the exclusive lock */
table_close(pgDistColocation, NoLock);
}
else
{
/* release the exclusive lock */
table_close(pgDistColocation, ExclusiveLock);
}
}
return colocationId;
}
/*
* EnsureRelationCanBeDistributed checks whether Citus can safely distribute
* the given relation with the given configuration. We perform almost all
* safety checks for distributing the table here. If there is an unsatisfied
* requirement, we error out and do not distribute the table.
*
* This function assumes callers have already acquired the necessary locks to
* ensure that there will not be any changes to the given relation.
*/
static void
EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn,
char distributionMethod, uint32 colocationId,
char replicationModel, bool viaDeprecatedAPI)
{
Oid parentRelationId = InvalidOid;
EnsureTableNotDistributed(relationId);
EnsureLocalTableEmptyIfNecessary(relationId, distributionMethod, viaDeprecatedAPI);
EnsureRelationHasNoTriggers(relationId);
/* we assume callers took necessary locks */
Relation relation = relation_open(relationId, NoLock);
TupleDesc relationDesc = RelationGetDescr(relation);
char *relationName = RelationGetRelationName(relation);
ErrorIfTableIsACatalogTable(relation);
/* verify target relation does not use identity columns */
if (RelationUsesIdentityColumns(relationDesc))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot distribute relation: %s", relationName),
errdetail("Distributed relations must not use GENERATED "
"... AS IDENTITY.")));
}
/* verify target relation is not distributed by a generated column */
if (distributionMethod != DISTRIBUTE_BY_NONE &&
DistributionColumnUsesGeneratedStoredColumn(relationDesc, distributionColumn))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot distribute relation: %s", relationName),
errdetail("Distribution column must not use GENERATED ALWAYS "
"AS (...) STORED.")));
}
/* check for support function needed by specified partition method */
if (distributionMethod == DISTRIBUTE_BY_HASH)
{
Oid hashSupportFunction = SupportFunctionForColumn(distributionColumn,
HASH_AM_OID,
HASHSTANDARD_PROC);
if (hashSupportFunction == InvalidOid)
{
ereport(ERROR, (errcode(ERRCODE_UNDEFINED_FUNCTION),
errmsg("could not identify a hash function for type %s",
format_type_be(distributionColumn->vartype)),
errdatatype(distributionColumn->vartype),
errdetail("Partition column types must have a hash function "
"defined to use hash partitioning.")));
}
if (distributionColumn->varcollid != InvalidOid &&
!get_collation_isdeterministic(distributionColumn->varcollid))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("Hash distributed partition columns may not use "
"a non deterministic collation")));
}
}
else if (distributionMethod == DISTRIBUTE_BY_RANGE)
{
Oid btreeSupportFunction = SupportFunctionForColumn(distributionColumn,
BTREE_AM_OID, BTORDER_PROC);
if (btreeSupportFunction == InvalidOid)
{
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_FUNCTION),
errmsg("could not identify a comparison function for type %s",
format_type_be(distributionColumn->vartype)),
errdatatype(distributionColumn->vartype),
errdetail("Partition column types must have a comparison function "
"defined to use range partitioning.")));
}
}
if (PartitionTable(relationId))
{
parentRelationId = PartitionParentOid(relationId);
}
/* partitions cannot be distributed if their parent is not distributed */
if (PartitionTable(relationId) && !IsCitusTable(parentRelationId))
{
char *parentRelationName = get_rel_name(parentRelationId);
ereport(ERROR, (errmsg("cannot distribute relation \"%s\" which is partition of "
"\"%s\"", relationName, parentRelationName),
errdetail("Citus does not support distributing partitions "
"if their parent is not distributed table."),
errhint("Distribute the partitioned table \"%s\" instead.",
parentRelationName)));
}
/*
* These checks are mostly for partitioned tables, not partitions, because we
* prevent distributing partitions directly in the above check. However,
* partitions can still reach this point because we call CreateDistributedTable
* for partitions if their parent table is distributed.
*/
if (PartitionedTable(relationId))
{
/* we cannot distribute partitioned tables with master_create_distributed_table */
if (viaDeprecatedAPI)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("distributing partitioned tables in only supported "
"with create_distributed_table UDF")));
}
/* distributing partitioned tables is only supported for hash-distribution */
if (distributionMethod != DISTRIBUTE_BY_HASH)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("distributing partitioned tables in only supported "
"for hash-distributed tables")));
}
/* we don't support distributing tables with multi-level partitioning */
if (PartitionTable(relationId))
{
char *parentRelationName = get_rel_name(parentRelationId);
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("distributing multi-level partitioned tables "
"is not supported"),
errdetail("Relation \"%s\" is partitioned table itself and "
"it is also partition of relation \"%s\".",
relationName, parentRelationName)));
}
}
ErrorIfUnsupportedConstraint(relation, distributionMethod, replicationModel,
distributionColumn, colocationId);
ErrorIfUnsupportedPolicy(relation);
relation_close(relation, NoLock);
}
/*
* ErrorIfTemporaryTable errors out if the given table is a temporary table.
*/
static void
ErrorIfTemporaryTable(Oid relationId)
{
if (get_rel_persistence(relationId) == RELPERSISTENCE_TEMP)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot distribute a temporary table")));
}
}
/*
* ErrorIfTableIsACatalogTable is a helper function to error out for citus
* table creation from a catalog table.
*/
void
ErrorIfTableIsACatalogTable(Relation relation)
{
if (relation->rd_rel->relnamespace != PG_CATALOG_NAMESPACE)
{
return;
}
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot create a citus table from a catalog table")));
}
/*
* EnsureTableCanBeColocatedWith checks whether a given replication model and
* distribution column type is suitable to distribute a table to be colocated
* with given source table.
*
* We only pass relationId to provide meaningful error messages.
*/
static void
EnsureTableCanBeColocatedWith(Oid relationId, char replicationModel,
Oid distributionColumnType, Oid sourceRelationId)
{
CitusTableCacheEntry *sourceTableEntry = GetCitusTableCacheEntry(sourceRelationId);
char sourceReplicationModel = sourceTableEntry->replicationModel;
Var *sourceDistributionColumn = DistPartitionKeyOrError(sourceRelationId);
if (!IsCitusTableTypeCacheEntry(sourceTableEntry, HASH_DISTRIBUTED))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot distribute relation"),
errdetail("Currently, colocate_with option is only supported "
"for hash distributed tables.")));
}
if (sourceReplicationModel != replicationModel)
{
char *relationName = get_rel_name(relationId);
char *sourceRelationName = get_rel_name(sourceRelationId);
ereport(ERROR, (errmsg("cannot colocate tables %s and %s",
sourceRelationName, relationName),
errdetail("Replication models don't match for %s and %s.",
sourceRelationName, relationName)));
}
Oid sourceDistributionColumnType = sourceDistributionColumn->vartype;
if (sourceDistributionColumnType != distributionColumnType)
{
char *relationName = get_rel_name(relationId);
char *sourceRelationName = get_rel_name(sourceRelationId);
ereport(ERROR, (errmsg("cannot colocate tables %s and %s",
sourceRelationName, relationName),
errdetail("Distribution column types don't match for "
"%s and %s.", sourceRelationName,
relationName)));
}
}
/*
* EnsureLocalTableEmptyIfNecessary errors out if the local table should be
* empty according to ShouldLocalTableBeEmpty but is not.
*/
static void
EnsureLocalTableEmptyIfNecessary(Oid relationId, char distributionMethod,
bool viaDeprecatedAPI)
{
if (ShouldLocalTableBeEmpty(relationId, distributionMethod, viaDeprecatedAPI))
{
EnsureLocalTableEmpty(relationId);
}
}
/*
* ShouldLocalTableBeEmpty returns true if the local table should be empty
* before creating a citus table.
* In some cases, it is possible and safe to send local data to shards while
* distributing the table. In those cases, we can distribute non-empty local
* tables. This function checks the distributionMethod and relation kind to
* see whether we need to ensure the emptiness of the local table.
*/
static bool
ShouldLocalTableBeEmpty(Oid relationId, char distributionMethod,
bool viaDeprecatedAPI)
{
bool shouldLocalTableBeEmpty = false;
if (viaDeprecatedAPI)
{
/* we don't support copying local data via deprecated API */
shouldLocalTableBeEmpty = true;
}
else if (distributionMethod != DISTRIBUTE_BY_HASH &&
distributionMethod != DISTRIBUTE_BY_NONE)
{
/*
* We only support hash distributed tables and reference tables
* for initial data loading
*/
shouldLocalTableBeEmpty = true;
}
else if (!RegularTable(relationId))
{
/*
* We only support tables and partitioned tables for initial
* data loading
*/
shouldLocalTableBeEmpty = true;
}
return shouldLocalTableBeEmpty;
}
/*
* EnsureLocalTableEmpty errors out if the local table is not empty.
*/
static void
EnsureLocalTableEmpty(Oid relationId)
{
char *relationName = get_rel_name(relationId);
bool localTableEmpty = TableEmpty(relationId);
if (!localTableEmpty)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("cannot distribute relation \"%s\"", relationName),
errdetail("Relation \"%s\" contains data.", relationName),
errhint("Empty your table before distributing it.")));
}
}
/*
* EnsureTableNotDistributed errors out if the table is distributed.
*/
void
EnsureTableNotDistributed(Oid relationId)
{
char *relationName = get_rel_name(relationId);
bool isCitusTable = IsCitusTable(relationId);
if (isCitusTable)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("table \"%s\" is already distributed",
relationName)));
}
}
/*
* EnsureRelationHasNoTriggers errors out if the given table has triggers on
* it. See also the comment of GetExplicitTriggerIdList for the triggers on
* which this function errors out.
*/
static void
EnsureRelationHasNoTriggers(Oid relationId)
{
List *explicitTriggerIds = GetExplicitTriggerIdList(relationId);
if (list_length(explicitTriggerIds) > 0)
{
char *relationName = get_rel_name(relationId);
Assert(relationName != NULL);
ereport(ERROR, (errmsg("cannot distribute relation \"%s\" because it has "
"triggers ", relationName),
errdetail("Citus does not support distributing tables with "
"triggers."),
errhint("Drop all the triggers on \"%s\" and retry.",
relationName)));
}
}
/*
* LookupDistributionMethod maps the oids of citus.distribution_type enum
* values to pg_dist_partition.partmethod values.
*
* The passed in oid has to belong to a value of citus.distribution_type.
*/
char
LookupDistributionMethod(Oid distributionMethodOid)
{
char distributionMethod = 0;
HeapTuple enumTuple = SearchSysCache1(ENUMOID, ObjectIdGetDatum(
distributionMethodOid));
if (!HeapTupleIsValid(enumTuple))
{
ereport(ERROR, (errmsg("invalid internal value for enum: %u",
distributionMethodOid)));
}
Form_pg_enum enumForm = (Form_pg_enum) GETSTRUCT(enumTuple);
const char *enumLabel = NameStr(enumForm->enumlabel);
if (strncmp(enumLabel, "append", NAMEDATALEN) == 0)
{
distributionMethod = DISTRIBUTE_BY_APPEND;
}
else if (strncmp(enumLabel, "hash", NAMEDATALEN) == 0)
{
distributionMethod = DISTRIBUTE_BY_HASH;
}
else if (strncmp(enumLabel, "range", NAMEDATALEN) == 0)
{
distributionMethod = DISTRIBUTE_BY_RANGE;
}
else
{
ereport(ERROR, (errmsg("invalid label for enum: %s", enumLabel)));
}
ReleaseSysCache(enumTuple);
return distributionMethod;
}
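/*
* For illustration (assuming the standard pg_dist_partition.partmethod
* characters): the enum label 'append' maps to DISTRIBUTE_BY_APPEND ('a'),
* 'hash' to DISTRIBUTE_BY_HASH ('h') and 'range' to DISTRIBUTE_BY_RANGE
* ('r').
*/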
/*
* SupportFunctionForColumn locates a support function given a column, an
* access method, and the id of a support function. This function returns
* InvalidOid if there is no support function for the operator class family
* of the column, but if the data type of the column has no default operator
* class whatsoever, this function errors out.
*/
static Oid
SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId,
int16 supportFunctionNumber)
{
Oid columnOid = partitionColumn->vartype;
Oid operatorClassId = GetDefaultOpClass(columnOid, accessMethodId);
/* currently only support using the default operator class */
if (operatorClassId == InvalidOid)
{
ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("data type %s has no default operator class for specified"
" partition method", format_type_be(columnOid)),
errdatatype(columnOid),
errdetail("Partition column types must have a default operator"
" class defined.")));
}
Oid operatorFamilyId = get_opclass_family(operatorClassId);
Oid operatorClassInputType = get_opclass_input_type(operatorClassId);
Oid supportFunctionOid = get_opfamily_proc(operatorFamilyId, operatorClassInputType,
operatorClassInputType,
supportFunctionNumber);
return supportFunctionOid;
}
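/*
* For example (relying on PostgreSQL's built-in operator classes): for an
* int4 distribution column with HASH_AM_OID and HASHSTANDARD_PROC, the
* default hash operator class resolves to the hashint4 support function; a
* type without a default hash operator class cannot be hash-distributed.
*/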
/*
* TableEmpty checks whether the given table contains any rows and
* returns false if there is any data.
*/
bool
TableEmpty(Oid tableId)
{
Oid schemaId = get_rel_namespace(tableId);
char *schemaName = get_namespace_name(schemaId);
char *tableName = get_rel_name(tableId);
char *tableQualifiedName = quote_qualified_identifier(schemaName, tableName);
StringInfo selectTrueQueryString = makeStringInfo();
bool readOnly = true;
int spiConnectionResult = SPI_connect();
if (spiConnectionResult != SPI_OK_CONNECT)
{
ereport(ERROR, (errmsg("could not connect to SPI manager")));
}
appendStringInfo(selectTrueQueryString, SELECT_TRUE_QUERY, tableQualifiedName);
int spiQueryResult = SPI_execute(selectTrueQueryString->data, readOnly, 0);
if (spiQueryResult != SPI_OK_SELECT)
{
ereport(ERROR, (errmsg("execution was not successful \"%s\"",
selectTrueQueryString->data)));
}
/* we expect the SELECT TRUE query to return a single value in a single row, or an empty set */
Assert(SPI_processed == 1 || SPI_processed == 0);
bool localTableEmpty = !SPI_processed;
SPI_finish();
return localTableEmpty;
}
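/*
* For illustration (the exact macro text is an assumption): SELECT_TRUE_QUERY
* expands to a query of the form
*
*   SELECT TRUE FROM <table> LIMIT 1;
*
* so a single returned row means the table has data, and an empty result
* means it is empty.
*/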
/*
* CanUseExclusiveConnections checks if we can open parallel connections
* while creating shards. We simply error out if we need to execute
* sequentially but there is data in the table, since we cannot copy the
* data to shards sequentially.
*/
static bool
CanUseExclusiveConnections(Oid relationId, bool localTableEmpty)
{
bool hasForeignKeyToReferenceTable = HasForeignKeyToReferenceTable(relationId);
bool shouldRunSequential = MultiShardConnectionType == SEQUENTIAL_CONNECTION ||
hasForeignKeyToReferenceTable;
if (shouldRunSequential && ParallelQueryExecutedInTransaction())
{
/*
* We decided to use sequential execution. It's either because the relation
* has a pre-existing foreign key to a reference table or because we
* decided to use sequential execution due to a query executed in the
* current xact beforehand.
* We have specific error messages for each case.
*/
char *relationName = get_rel_name(relationId);
if (hasForeignKeyToReferenceTable)
{
/*
* If there has already been a parallel query executed, the sequential mode
* would still use the already opened parallel connections to the workers,
* thus contradicting our purpose of using sequential mode.
*/
ereport(ERROR, (errmsg("cannot distribute relation \"%s\" in this "
"transaction because it has a foreign key to "
"a reference table", relationName),
errdetail("If a hash distributed table has a foreign key "
"to a reference table, it has to be created "
"in sequential mode before any parallel commands "
"have been executed in the same transaction"),
errhint("Try re-running the transaction with "
"\"SET LOCAL citus.multi_shard_modify_mode TO "
"\'sequential\';\"")));
}
else if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
{
ereport(ERROR, (errmsg("cannot distribute \"%s\" in sequential mode because "
"a parallel query was executed in this transaction",
relationName),
errhint("If you have manually set "
"citus.multi_shard_modify_mode to 'sequential', "
"try with 'parallel' option. ")));
}
}
else if (shouldRunSequential)
{
return false;
}
else if (!localTableEmpty || IsMultiStatementTransaction())
{
return true;
}
return false;
}
/*
* CreateTruncateTrigger creates a truncate trigger on table identified by relationId
* and assigns citus_truncate_trigger() as handler.
*/
void
CreateTruncateTrigger(Oid relationId)
{
StringInfo triggerName = makeStringInfo();
bool internal = true;
appendStringInfo(triggerName, "truncate_trigger");
CreateTrigStmt *trigger = makeNode(CreateTrigStmt);
trigger->trigname = triggerName->data;
trigger->relation = NULL;
trigger->funcname = SystemFuncName(CITUS_TRUNCATE_TRIGGER_NAME);
trigger->args = NIL;
trigger->row = false;
trigger->timing = TRIGGER_TYPE_AFTER;
trigger->events = TRIGGER_TYPE_TRUNCATE;
trigger->columns = NIL;
trigger->whenClause = NULL;
trigger->isconstraint = false;
CreateTrigger(trigger, NULL, relationId, InvalidOid, InvalidOid, InvalidOid,
InvalidOid, InvalidOid, NULL,
internal, false);
}
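/*
* For illustration (a sketch of the equivalent DDL): the statement-level
* trigger created above corresponds to
*
*   CREATE TRIGGER truncate_trigger AFTER TRUNCATE ON <table>
*   FOR EACH STATEMENT EXECUTE PROCEDURE pg_catalog.citus_truncate_trigger();
*
* except that it is created as an internal trigger.
*/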
/*
* RegularTable returns true if the given table's relation kind is
* RELKIND_RELATION or RELKIND_PARTITIONED_TABLE; otherwise it returns false.
*/
bool
RegularTable(Oid relationId)
{
char relationKind = get_rel_relkind(relationId);
if (relationKind == RELKIND_RELATION || relationKind == RELKIND_PARTITIONED_TABLE)
{
return true;
}
return false;
}
/*
* CopyLocalDataIntoShards copies data from the local table, which is hidden
* after converting it to a distributed table, into the shards of the distributed
* table. For partitioned tables, this function returns without copying the data
* because we call this function for both partitioned tables and their partitions.
* Returning early saves us from copying the data to workers twice.
*
* This function uses CitusCopyDestReceiver to invoke the distributed COPY logic.
* We cannot use a regular COPY here since that cannot read from a table. Instead
* we read from the table and pass each tuple to the CitusCopyDestReceiver which
* opens a connection and starts a COPY for each shard placement that will have
* data.
*
* We could call the planner and executor here and send the output to the
* DestReceiver, but we are in a tricky spot here since Citus is already
* intercepting queries on this table in the planner and executor hooks and we
* want to read from the local table. To keep it simple, we perform a heap scan
* directly on the table.
*
* Any writes on the table that are started during this operation will be handled
* as distributed queries once the current transaction commits. SELECTs will
* continue to read from the local table until the current transaction commits,
* after which new SELECTs will be handled as distributed queries.
*
* After copying local data into the distributed table, the local data remains
* in place and should be truncated at a later time.
*/
static void
CopyLocalDataIntoShards(Oid distributedRelationId)
{
/* take an ExclusiveLock to block all operations except SELECT */
Relation distributedRelation = table_open(distributedRelationId, ExclusiveLock);
/*
* Skip copying from partitioned tables; we will copy the data from
* each partition to the partition's shards instead.
*/
if (PartitionedTable(distributedRelationId))
{
table_close(distributedRelation, NoLock);
return;
}
/*
* All writes have finished, make sure that we can see them by using the
* latest snapshot. We use GetLatestSnapshot instead of
* GetTransactionSnapshot since the latter would not reveal all writes
* in serializable or repeatable read mode. Note that subsequent reads
* from the distributed table would reveal those writes, temporarily
* violating the isolation level. However, this seems preferable over
* dropping the writes entirely.
*/
PushActiveSnapshot(GetLatestSnapshot());
/* get the table columns */
TupleDesc tupleDescriptor = RelationGetDescr(distributedRelation);
TupleTableSlot *slot = CreateTableSlotForRel(distributedRelation);
List *columnNameList = TupleDescColumnNameList(tupleDescriptor);
int partitionColumnIndex = INVALID_PARTITION_COLUMN_INDEX;
/* determine the partition column in the tuple descriptor */
Var *partitionColumn = PartitionColumn(distributedRelationId, 0);
if (partitionColumn != NULL)
{
partitionColumnIndex = partitionColumn->varattno - 1;
}
/* initialise per-tuple memory context */
EState *estate = CreateExecutorState();
ExprContext *econtext = GetPerTupleExprContext(estate);
econtext->ecxt_scantuple = slot;
DestReceiver *copyDest =
(DestReceiver *) CreateCitusCopyDestReceiver(distributedRelationId,
columnNameList,
partitionColumnIndex,
estate, NULL);
/* initialise state for writing to shards, we'll open connections on demand */
copyDest->rStartup(copyDest, 0, tupleDescriptor);
DoCopyFromLocalTableIntoShards(distributedRelation, copyDest, slot, estate);
/* finish writing into the shards */
copyDest->rShutdown(copyDest);
copyDest->rDestroy(copyDest);
/* free memory and close the relation */
ExecDropSingleTupleTableSlot(slot);
FreeExecutorState(estate);
table_close(distributedRelation, NoLock);
PopActiveSnapshot();
}
/*
* DoCopyFromLocalTableIntoShards performs a copy operation
* from local tables into shards.
*/
static void
DoCopyFromLocalTableIntoShards(Relation distributedRelation,
DestReceiver *copyDest,
TupleTableSlot *slot,
EState *estate)
{
/* begin reading from local table */
TableScanDesc scan = table_beginscan(distributedRelation, GetActiveSnapshot(), 0,
NULL);
MemoryContext oldContext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
uint64 rowsCopied = 0;
while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
{
/* send the tuple to a shard */
copyDest->receiveSlot(slot, copyDest);
/* clear tuple memory */
ResetPerTupleExprContext(estate);
/* make sure we roll back on cancellation */
CHECK_FOR_INTERRUPTS();
if (rowsCopied == 0)
{
ereport(NOTICE, (errmsg("Copying data from local table...")));
}
rowsCopied++;
if (rowsCopied % LOG_PER_TUPLE_AMOUNT == 0)
{
ereport(DEBUG1, (errmsg("Copied " UINT64_FORMAT " rows", rowsCopied)));
}
}
if (rowsCopied % LOG_PER_TUPLE_AMOUNT != 0)
{
ereport(DEBUG1, (errmsg("Copied " UINT64_FORMAT " rows", rowsCopied)));
}
if (rowsCopied > 0)
{
char *qualifiedRelationName =
generate_qualified_relation_name(RelationGetRelid(distributedRelation));
ereport(NOTICE, (errmsg("copying the data has completed"),
errdetail("The local data in the table is no longer visible, "
"but is still on disk."),
errhint("To remove the local data, run: SELECT "
"truncate_local_data_after_distributing_table($$%s$$)",
qualifiedRelationName)));
}
MemoryContextSwitchTo(oldContext);
/* finish reading from the local table */
table_endscan(scan);
}
/*
* TupleDescColumnNameList returns a list of column names for the given tuple
* descriptor as plain strings.
*/
static List *
TupleDescColumnNameList(TupleDesc tupleDescriptor)
{
List *columnNameList = NIL;
for (int columnIndex = 0; columnIndex < tupleDescriptor->natts; columnIndex++)
{
Form_pg_attribute currentColumn = TupleDescAttr(tupleDescriptor, columnIndex);
char *columnName = NameStr(currentColumn->attname);
if (currentColumn->attisdropped ||
currentColumn->attgenerated == ATTRIBUTE_GENERATED_STORED
)
{
continue;
}
columnNameList = lappend(columnNameList, columnName);
}
return columnNameList;
}
/*
* RelationUsesIdentityColumns returns whether a given relation uses
* GENERATED ... AS IDENTITY
*/
bool
RelationUsesIdentityColumns(TupleDesc relationDesc)
{
for (int attributeIndex = 0; attributeIndex < relationDesc->natts; attributeIndex++)
{
Form_pg_attribute attributeForm = TupleDescAttr(relationDesc, attributeIndex);
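/* attidentity is 'a' for GENERATED ALWAYS, 'd' for GENERATED BY DEFAULT, '\0' otherwise */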
if (attributeForm->attidentity != '\0')
{
return true;
}
}
return false;
}
/*
* DistributionColumnUsesGeneratedStoredColumn returns whether the given
* relation uses GENERATED ALWAYS AS (...) STORED on the distribution column.
*/
static bool
DistributionColumnUsesGeneratedStoredColumn(TupleDesc relationDesc,
Var *distributionColumn)
{
Form_pg_attribute attributeForm = TupleDescAttr(relationDesc,
distributionColumn->varattno - 1);
if (attributeForm->attgenerated == ATTRIBUTE_GENERATED_STORED)
{
return true;
}
return false;
}
/*
* ErrorIfForeignTable errors out if the relation with given relationOid
* is a foreign table.
*/
static void
ErrorIfForeignTable(Oid relationOid)
{
if (IsForeignTable(relationOid))
{
char *relname = get_rel_name(relationOid);
char *qualifiedRelname = generate_qualified_relation_name(relationOid);
ereport(ERROR, (errmsg("foreign tables cannot be distributed"),
(errhint("Can add foreign table \"%s\" to metadata by running: "
"SELECT citus_add_local_table_to_metadata($$%s$$);",
relname, qualifiedRelname))));
}
}