diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 0aeec0165..0a783f49b 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -29,6 +29,7 @@ #include "commands/defrem.h" #include "commands/extension.h" #include "commands/trigger.h" +#include "distributed/citus_ruleutils.h" #include "distributed/colocation_utils.h" #include "distributed/distribution_column.h" #include "distributed/master_metadata_utility.h" @@ -69,28 +70,30 @@ int ReplicationModel = REPLICATION_MODEL_COORDINATOR; /* local function forward declarations */ -static void CreateReferenceTable(Oid distributedRelationId); -static void ConvertToDistributedTable(Oid relationId, char *distributionColumnName, +static char AppropriateReplicationModel(char distributionMethod, bool viaDeprecatedAPI); +static void CreateHashDistributedTableShards(Oid relationId, Oid colocatedTableId, + bool localTableEmpty); +static uint32 ColocationIdForNewTable(Oid relationId, Var *distributionColumn, char distributionMethod, char replicationModel, - uint32 colocationId); -static char LookupDistributionMethod(Oid distributionMethodOid); -static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId, - int16 supportFunctionNumber); -static bool LocalTableEmpty(Oid tableId); -static void CreateHashDistributedTable(Oid relationId, char *distributionColumnName, - char *colocateWithTableName, - int shardCount, int replicationFactor); -static Oid ColumnType(Oid relationId, char *columnName); -static void CopyLocalDataIntoShards(Oid relationId); -static List * TupleDescColumnNameList(TupleDesc tupleDescriptor); -#if (PG_VERSION_NUM >= 100000) -static bool RelationUsesIdentityColumns(TupleDesc relationDesc); -#endif - + char *colocateWithTableName, bool viaDeprecatedAPI); +static void EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn, + char distributionMethod, uint32 colocationId, + char replicationModel, bool viaDeprecatedAPI); +static void EnsureTableCanBeColocatedWith(Oid relationId, char replicationModel, + Oid distributionColumnType, + Oid sourceRelationId); static void EnsureSchemaExistsOnAllNodes(Oid relationId); static void EnsureLocalTableEmpty(Oid relationId); static void EnsureTableNotDistributed(Oid relationId); -static void EnsureIsTableId(Oid relationId); +static char LookupDistributionMethod(Oid distributionMethodOid); +static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId, + int16 supportFunctionNumber); +static void EnsureLocalTableEmptyIfNecessary(Oid relationId, char distributionMethod, + bool viaDepracatedAPI); +static bool LocalTableEmpty(Oid tableId); +static void CopyLocalDataIntoShards(Oid relationId); +static List * TupleDescColumnNameList(TupleDesc tupleDescriptor); +static bool RelationUsesIdentityColumns(TupleDesc relationDesc); /* exports for SQL callable functions */ PG_FUNCTION_INFO_V1(master_create_distributed_table); @@ -102,37 +105,50 @@ PG_FUNCTION_INFO_V1(create_reference_table); * master_create_distributed_table accepts a table, distribution column and * method and performs the corresponding catalog changes. * - * Note that this udf is depreciated and cannot create colocated tables, so we + * Note that this UDF is deprecated and cannot create colocated tables, so we * always use INVALID_COLOCATION_ID. 
*/ Datum master_create_distributed_table(PG_FUNCTION_ARGS) { - Oid distributedRelationId = PG_GETARG_OID(0); + Oid relationId = PG_GETARG_OID(0); text *distributionColumnText = PG_GETARG_TEXT_P(1); Oid distributionMethodOid = PG_GETARG_OID(2); - char *distributionColumnName = text_to_cstring(distributionColumnText); - char distributionMethod = LookupDistributionMethod(distributionMethodOid); + char *distributionColumnName = NULL; + Var *distributionColumn = NULL; + char distributionMethod = 0; + char *colocateWithTableName = NULL; + bool viaDeprecatedAPI = true; + + Relation relation = NULL; - EnsureTableNotDistributed(distributedRelationId); - EnsureLocalTableEmpty(distributedRelationId); - EnsureCoordinator(); CheckCitusVersion(ERROR); + EnsureCoordinator(); - if (ReplicationModel != REPLICATION_MODEL_COORDINATOR) - { - ereport(NOTICE, (errmsg("using statement-based replication"), - errdetail("The current replication_model setting is " - "'streaming', which is not supported by " - "master_create_distributed_table."), - errhint("Use create_distributed_table to use the streaming " - "replication model."))); - } + /* + * Lock target relation with an exclusive lock - there's no way to make + * sense of this table until we've committed, and we don't want multiple + * backends manipulating this relation. + */ + relation = relation_open(relationId, ExclusiveLock); - ConvertToDistributedTable(distributedRelationId, distributionColumnName, - distributionMethod, REPLICATION_MODEL_COORDINATOR, - INVALID_COLOCATION_ID); + /* + * We should do this check here since the codes in the following lines rely + * on this relation to have a supported relation kind. More extensive checks + * will be performed in CreateDistributedTable. + */ + EnsureRelationKindSupported(relationId); + + distributionColumnName = text_to_cstring(distributionColumnText); + distributionColumn = BuildDistributionKeyFromColumnName(relation, + distributionColumnName); + distributionMethod = LookupDistributionMethod(distributionMethodOid); + + CreateDistributedTable(relationId, distributionColumn, distributionMethod, + colocateWithTableName, viaDeprecatedAPI); + + relation_close(relation, NoLock); PG_RETURN_VOID(); } @@ -146,90 +162,53 @@ master_create_distributed_table(PG_FUNCTION_ARGS) Datum create_distributed_table(PG_FUNCTION_ARGS) { - Oid relationId = PG_GETARG_OID(0); - text *distributionColumnText = PG_GETARG_TEXT_P(1); - Oid distributionMethodOid = PG_GETARG_OID(2); - - char *distributionColumnName = text_to_cstring(distributionColumnText); - char distributionMethod = LookupDistributionMethod(distributionMethodOid); + Oid relationId = InvalidOid; + text *distributionColumnText = NULL; + Oid distributionMethodOid = InvalidOid; text *colocateWithTableNameText = NULL; + + Relation relation = NULL; + char *distributionColumnName = NULL; + Var *distributionColumn = NULL; + char distributionMethod = 0; + char *colocateWithTableName = NULL; - EnsureCoordinator(); + bool viaDeprecatedAPI = false; + CheckCitusVersion(ERROR); + EnsureCoordinator(); - /* guard against a binary update without a function update */ - if (PG_NARGS() >= 4) - { - colocateWithTableNameText = PG_GETARG_TEXT_P(3); - colocateWithTableName = text_to_cstring(colocateWithTableNameText); - } - else - { - colocateWithTableName = "default"; - } + relationId = PG_GETARG_OID(0); + distributionColumnText = PG_GETARG_TEXT_P(1); + distributionMethodOid = PG_GETARG_OID(2); + colocateWithTableNameText = PG_GETARG_TEXT_P(3); - /* check if we try to colocate with hash 
distributed tables */ - if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0 && - pg_strncasecmp(colocateWithTableName, "none", NAMEDATALEN) != 0) - { - Oid colocateWithTableOid = ResolveRelationId(colocateWithTableNameText); - char colocateWithTableDistributionMethod = PartitionMethod(colocateWithTableOid); + /* + * Lock target relation with an exclusive lock - there's no way to make + * sense of this table until we've committed, and we don't want multiple + * backends manipulating this relation. + */ + relation = relation_open(relationId, ExclusiveLock); - if (colocateWithTableDistributionMethod != DISTRIBUTE_BY_HASH || - distributionMethod != DISTRIBUTE_BY_HASH) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot distribute relation"), - errdetail("Currently, colocate_with option is only supported " - "for hash distributed tables."))); - } - } + /* + * We should do this check here since the codes in the following lines rely + * on this relation to have a supported relation kind. More extensive checks + * will be performed in CreateDistributedTable. + */ + EnsureRelationKindSupported(relationId); - /* if distribution method is not hash, just create partition metadata */ - if (distributionMethod != DISTRIBUTE_BY_HASH) - { - EnsureTableNotDistributed(relationId); - EnsureLocalTableEmpty(relationId); + distributionColumnName = text_to_cstring(distributionColumnText); + distributionColumn = BuildDistributionKeyFromColumnName(relation, + distributionColumnName); + distributionMethod = LookupDistributionMethod(distributionMethodOid); - if (ReplicationModel != REPLICATION_MODEL_COORDINATOR) - { - ereport(NOTICE, (errmsg("using statement-based replication"), - errdetail("Streaming replication is supported only for " - "hash-distributed tables."))); - } + colocateWithTableName = text_to_cstring(colocateWithTableNameText); - ConvertToDistributedTable(relationId, distributionColumnName, - distributionMethod, REPLICATION_MODEL_COORDINATOR, - INVALID_COLOCATION_ID); - PG_RETURN_VOID(); - } + CreateDistributedTable(relationId, distributionColumn, distributionMethod, + colocateWithTableName, viaDeprecatedAPI); - /* use configuration values for shard count and shard replication factor */ - CreateHashDistributedTable(relationId, distributionColumnName, - colocateWithTableName, ShardCount, - ShardReplicationFactor); - - if (ShouldSyncTableMetadata(relationId)) - { - CreateTableMetadataOnWorkers(relationId); - } - - PG_RETURN_VOID(); -} - - -/* - * create_reference_table accepts a table and then it creates a distributed - * table which has one shard and replication factor is set to - * the worker count. - */ -Datum -create_reference_table(PG_FUNCTION_ARGS) -{ - Oid relationId = PG_GETARG_OID(0); - - CreateReferenceTable(relationId); + relation_close(relation, NoLock); PG_RETURN_VOID(); } @@ -240,23 +219,41 @@ create_reference_table(PG_FUNCTION_ARGS) * created table has one shard and replication factor is set to the active worker * count. In fact, the above is the definition of a reference table in Citus. 
*/ -static void -CreateReferenceTable(Oid relationId) +Datum +create_reference_table(PG_FUNCTION_ARGS) { - uint32 colocationId = INVALID_COLOCATION_ID; + Oid relationId = PG_GETARG_OID(0); + + Relation relation = NULL; + char *colocateWithTableName = NULL; List *workerNodeList = NIL; - int replicationFactor = 0; - char *distributionColumnName = NULL; - char relationKind = 0; + int workerCount = 0; + Var *distributionColumn = NULL; + + bool viaDeprecatedAPI = false; EnsureCoordinator(); CheckCitusVersion(ERROR); + /* + * Lock target relation with an exclusive lock - there's no way to make + * sense of this table until we've committed, and we don't want multiple + * backends manipulating this relation. + */ + relation = relation_open(relationId, ExclusiveLock); + + /* + * We should do this check here since the codes in the following lines rely + * on this relation to have a supported relation kind. More extensive checks + * will be performed in CreateDistributedTable. + */ + EnsureRelationKindSupported(relationId); + workerNodeList = ActivePrimaryNodeList(); - replicationFactor = list_length(workerNodeList); + workerCount = list_length(workerNodeList); /* if there are no workers, error out */ - if (replicationFactor == 0) + if (workerCount == 0) { char *relationName = get_rel_name(relationId); @@ -265,79 +262,320 @@ CreateReferenceTable(Oid relationId) errdetail("There are no active worker nodes."))); } - /* relax empty table requirement for regular (non-foreign) tables */ - relationKind = get_rel_relkind(relationId); - if (relationKind == RELKIND_RELATION) + + CreateDistributedTable(relationId, distributionColumn, DISTRIBUTE_BY_NONE, + colocateWithTableName, viaDeprecatedAPI); + + relation_close(relation, NoLock); + + PG_RETURN_VOID(); +} + + +/* + * CreateDistributedTable creates a distributed table with the given configuration. + * This function contains all the necessary logic to create distributed tables. It + * performs the necessary checks to ensure distributing the table is safe. If it is + * safe to distribute the table, this function creates the distributed table metadata, + * creates shards, and copies local data to the shards. + * + * The viaDeprecatedAPI boolean flag is not the optimal way to implement this function, + * but it helps reduce code duplication a lot. We hope to remove that flag one + * day, once we deprecate master_create_distributed_table completely. + */ +void +CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributionMethod, + char *colocateWithTableName, bool viaDeprecatedAPI) +{ + char replicationModel = REPLICATION_MODEL_INVALID; + uint32 colocationId = INVALID_COLOCATION_ID; + Oid colocatedTableId = InvalidOid; + bool localTableEmpty = false; + + Relation colocatedRelation = NULL; + + replicationModel = AppropriateReplicationModel(distributionMethod, viaDeprecatedAPI); + + /* + * ColocationIdForNewTable assumes that the caller acquires a lock on relationId. In + * our case, our caller has already acquired a lock on relationId. 
+ */ + colocationId = ColocationIdForNewTable(relationId, distributionColumn, + distributionMethod, replicationModel, + colocateWithTableName, viaDeprecatedAPI); + + EnsureRelationCanBeDistributed(relationId, distributionColumn, distributionMethod, + colocationId, replicationModel, viaDeprecatedAPI); + + /* we need to calculate these variables before creating distributed metadata */ + localTableEmpty = LocalTableEmpty(relationId); + colocatedTableId = ColocatedTableId(colocationId); + if (colocatedTableId != InvalidOid) { - EnsureTableNotDistributed(relationId); - } - else - { - EnsureTableNotDistributed(relationId); - EnsureLocalTableEmpty(relationId); + /* + * We take a lock on colocatedTableId because we want to ensure that the + * colocated table is not dropped until we create all colocated shards. + */ + colocatedRelation = relation_open(colocatedTableId, AccessShareLock); } - colocationId = CreateReferenceTableColocationId(); + /* create an entry for the distributed table in pg_dist_partition */ + InsertIntoPgDistPartition(relationId, distributionMethod, distributionColumn, + colocationId, replicationModel); - /* first, convert the relation into distributed relation */ - ConvertToDistributedTable(relationId, distributionColumnName, - DISTRIBUTE_BY_NONE, REPLICATION_MODEL_2PC, colocationId); - - /* now, create the single shard replicated to all nodes */ - CreateReferenceTableShard(relationId); - - CreateTableMetadataOnWorkers(relationId); - - /* copy over data for regular relations */ - if (relationKind == RELKIND_RELATION) + /* foreign tables do not support TRUNCATE triggers */ + if (RegularTable(relationId)) { - CopyLocalDataIntoShards(relationId); + CreateTruncateTrigger(relationId); + } + + /* + * If we are using master_create_distributed_table, we don't need to continue, + * because the deprecated API does not support the following features. + */ + if (viaDeprecatedAPI) + { + /* + * We exit early, but there is no need to close colocatedRelation because, + * if viaDeprecatedAPI is true, we never opened colocatedRelation in the + * first place. + */ + Assert(colocatedRelation == NULL); + + return; + } + + /* create shards for hash distributed and reference tables */ + if (distributionMethod == DISTRIBUTE_BY_HASH) + { + CreateHashDistributedTableShards(relationId, colocatedTableId, localTableEmpty); + } + else if (distributionMethod == DISTRIBUTE_BY_NONE) + { + CreateReferenceTableShard(relationId); + } + + /* copy over data for hash distributed and reference tables */ + if (distributionMethod == DISTRIBUTE_BY_HASH || + distributionMethod == DISTRIBUTE_BY_NONE) + { + if (RegularTable(relationId)) + { + CopyLocalDataIntoShards(relationId); + } + } + + if (colocatedRelation != NULL) + { + relation_close(colocatedRelation, NoLock); + } + + if (ShouldSyncTableMetadata(relationId)) + { + CreateTableMetadataOnWorkers(relationId); } } /* - * ConvertToDistributedTable converts the given regular PostgreSQL table into a - * distributed table. First, it checks if the given table can be distributed, - * then it creates related tuple in pg_dist_partition. - * - * XXX: We should perform more checks here to see if this table is fit for - * partitioning. At a minimum, we should validate the following: (i) this node - * runs as the master node, (ii) table does not make use of the inheritance - * mechanism and (iii) table does not have collated columns. 
+ * AppropriateReplicationModel function decides which replication model should be + * used depending on the given distribution configuration and the global + * ReplicationModel variable. If ReplicationModel conflicts with the distribution + * configuration, this function errors out. + */ +static char +AppropriateReplicationModel(char distributionMethod, bool viaDeprecatedAPI) +{ + if (viaDeprecatedAPI) + { + if (ReplicationModel != REPLICATION_MODEL_COORDINATOR) + { + ereport(NOTICE, (errmsg("using statement-based replication"), + errdetail("The current replication_model setting is " + "'streaming', which is not supported by " + "master_create_distributed_table."), + errhint("Use create_distributed_table to use the streaming " + "replication model."))); + } + + return REPLICATION_MODEL_COORDINATOR; + } + else if (distributionMethod == DISTRIBUTE_BY_NONE) + { + return REPLICATION_MODEL_2PC; + } + else if (distributionMethod == DISTRIBUTE_BY_HASH) + { + return ReplicationModel; + } + else + { + if (ReplicationModel != REPLICATION_MODEL_COORDINATOR) + { + ereport(NOTICE, (errmsg("using statement-based replication"), + errdetail("Streaming replication is supported only for " + "hash-distributed tables."))); + } + + return REPLICATION_MODEL_COORDINATOR; + } + + /* we should not reach this point */ + return REPLICATION_MODEL_INVALID; +} + + +/* + * CreateHashDistributedTableShards creates shards of the given hash distributed table. */ static void -ConvertToDistributedTable(Oid relationId, char *distributionColumnName, - char distributionMethod, char replicationModel, - uint32 colocationId) +CreateHashDistributedTableShards(Oid relationId, Oid colocatedTableId, + bool localTableEmpty) +{ + bool useExclusiveConnection = false; + + /* + * Ensure schema exists on each worker node. We can not run this function + * transactionally, since we may create shards over separate sessions and + * shard creation depends on the schema being present and visible from all + * sessions. + */ + EnsureSchemaExistsOnAllNodes(relationId); + + if (RegularTable(relationId)) + { + useExclusiveConnection = IsTransactionBlock() || !localTableEmpty; + } + + if (colocatedTableId != InvalidOid) + { + CreateColocatedShards(relationId, colocatedTableId, useExclusiveConnection); + } + else + { + /* + * This path is only reached by create_distributed_table for the distributed + * tables which will not be part of an existing colocation group. Therefore, + * we can directly use ShardCount and ShardReplicationFactor global variables + * here. + */ + CreateShardsWithRoundRobinPolicy(relationId, ShardCount, ShardReplicationFactor, + useExclusiveConnection); + } +} + + +/* + * ColocationIdForNewTable returns a colocation id for the hash-distributed table + * according to the given configuration. If there is no such configuration, it + * creates one and returns the colocation id of the newly created colocation group. + * For append and range distributed tables, this function errors out if the + * colocateWithTableName parameter is not its default value, otherwise it directly + * returns INVALID_COLOCATION_ID. + * + * This function assumes its caller takes the necessary lock on relationId to + * prevent possible changes to it. 
+ */ +static uint32 +ColocationIdForNewTable(Oid relationId, Var *distributionColumn, + char distributionMethod, char replicationModel, + char *colocateWithTableName, bool viaDeprecatedAPI) +{ + uint32 colocationId = INVALID_COLOCATION_ID; + + if (viaDeprecatedAPI) + { + return colocationId; + } + else if (distributionMethod == DISTRIBUTE_BY_APPEND || + distributionMethod == DISTRIBUTE_BY_RANGE) + { + if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot distribute relation"), + errdetail("Currently, colocate_with option is only supported " + "for hash distributed tables."))); + } + + return colocationId; + } + else if (distributionMethod == DISTRIBUTE_BY_NONE) + { + return CreateReferenceTableColocationId(); + } + else + { + /* + * Get an exclusive lock on the colocation system catalog. Therefore, we + * can be sure that there will be no modifications on the colocation table + * until this transaction is committed. + */ + Relation pgDistColocation = heap_open(DistColocationRelationId(), ExclusiveLock); + + Oid distributionColumnType = distributionColumn->vartype; + + if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) == 0) + { + /* check for default colocation group */ + colocationId = ColocationId(ShardCount, ShardReplicationFactor, + distributionColumnType); + + if (colocationId == INVALID_COLOCATION_ID) + { + colocationId = CreateColocationGroup(ShardCount, ShardReplicationFactor, + distributionColumnType); + } + } + else if (pg_strncasecmp(colocateWithTableName, "none", NAMEDATALEN) == 0) + { + colocationId = GetNextColocationId(); + } + else + { + text *colocateWithTableNameText = cstring_to_text(colocateWithTableName); + Oid sourceRelationId = ResolveRelationId(colocateWithTableNameText); + + EnsureTableCanBeColocatedWith(relationId, replicationModel, + distributionColumnType, sourceRelationId); + + colocationId = TableColocationId(sourceRelationId); + } + + heap_close(pgDistColocation, NoLock); + } + + return colocationId; +} + + +/* + * EnsureRelationCanBeDistributed checks whether Citus can safely distribute the given + * relation with the given configuration. We perform almost all safety checks for + * distributing the table here. If there is an unsatisfied requirement, we error out + * and do not distribute the table. + * + * This function assumes callers have already acquired the necessary locks to ensure + * there will not be any changes to the given relation. + */ +static void +EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn, + char distributionMethod, uint32 colocationId, + char replicationModel, bool viaDeprecatedAPI) { Relation relation = NULL; TupleDesc relationDesc = NULL; char *relationName = NULL; - char relationKind = 0; - Var *distributionColumn = NULL; - - /* check global replication settings before continuing */ - EnsureReplicationSettings(InvalidOid, replicationModel); - - /* - * Lock target relation with an exclusive lock - there's no way to make - * sense of this table until we've committed, and we don't want multiple - * backends manipulating this relation. 
- */ - relation = relation_open(relationId, ExclusiveLock); - relationDesc = RelationGetDescr(relation); - relationName = RelationGetRelationName(relation); EnsureTableOwner(relationId); + EnsureTableNotDistributed(relationId); + EnsureLocalTableEmptyIfNecessary(relationId, distributionMethod, viaDeprecatedAPI); + EnsureReplicationSettings(InvalidOid, replicationModel); - /* check that the relation is not already distributed */ - if (IsDistributedTable(relationId)) - { - ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("table \"%s\" is already distributed", - relationName))); - } + /* we assume callers took necessary locks */ + relation = relation_open(relationId, NoLock); + relationDesc = RelationGetDescr(relation); + relationName = RelationGetRelationName(relation); /* verify target relation does not use WITH (OIDS) PostgreSQL feature */ if (relationDesc->tdhasoid) @@ -348,25 +586,7 @@ ConvertToDistributedTable(Oid relationId, char *distributionColumnName, "(OIDS) option in their definitions."))); } - /* verify target relation is either regular or foreign table */ - relationKind = relation->rd_rel->relkind; - if (relationKind != RELKIND_RELATION && relationKind != RELKIND_FOREIGN_TABLE) - { - ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("cannot distribute relation: %s", - relationName), - errdetail("Distributed relations must be regular or " - "foreign tables."))); - } - -#if (PG_VERSION_NUM >= 100000) - if (relation->rd_rel->relispartition) - { - ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("cannot distribute relation: %s", relationName), - errdetail("Distributing partition tables is unsupported."))); - } - + /* verify target relation does not use identity columns */ if (RelationUsesIdentityColumns(relationDesc)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), @@ -374,14 +594,6 @@ ConvertToDistributedTable(Oid relationId, char *distributionColumnName, errdetail("Distributed relations must not use GENERATED " "... AS IDENTITY."))); } -#endif - - /* - * Distribution column returns NULL for reference tables, - * but it is not used below for reference tables. - */ - distributionColumn = BuildDistributionKeyFromColumnName(relation, - distributionColumnName); /* check for support function needed by specified partition method */ if (distributionMethod == DISTRIBUTE_BY_HASH) @@ -417,18 +629,193 @@ ConvertToDistributedTable(Oid relationId, char *distributionColumnName, ErrorIfUnsupportedConstraint(relation, distributionMethod, distributionColumn, colocationId); - InsertIntoPgDistPartition(relationId, distributionMethod, distributionColumn, - colocationId, replicationModel); - relation_close(relation, NoLock); +} - /* - * PostgreSQL supports truncate trigger for regular relations only. - * Truncate on foreign tables is not supported. - */ - if (relationKind == RELKIND_RELATION) + +/* + * EnsureTableCanBeColocatedWith checks whether a given replication model and + * distribution column type is suitable to distribute a table to be colocated + * with given source table. + * + * We only pass relationId to provide meaningful error messages. 
*/ +static void +EnsureTableCanBeColocatedWith(Oid relationId, char replicationModel, + Oid distributionColumnType, Oid sourceRelationId) +{ + DistTableCacheEntry *sourceTableEntry = DistributedTableCacheEntry(sourceRelationId); + char sourceDistributionMethod = sourceTableEntry->partitionMethod; + char sourceReplicationModel = sourceTableEntry->replicationModel; + Var *sourceDistributionColumn = DistPartitionKey(sourceRelationId); + Oid sourceDistributionColumnType = InvalidOid; + + if (sourceDistributionMethod != DISTRIBUTE_BY_HASH) { - CreateTruncateTrigger(relationId); + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot distribute relation"), + errdetail("Currently, colocate_with option is only supported " + "for hash distributed tables."))); + } + + if (sourceReplicationModel != replicationModel) + { + char *relationName = get_rel_name(relationId); + char *sourceRelationName = get_rel_name(sourceRelationId); + + ereport(ERROR, (errmsg("cannot colocate tables %s and %s", + sourceRelationName, relationName), + errdetail("Replication models don't match for %s and %s.", + sourceRelationName, relationName))); + } + + sourceDistributionColumnType = sourceDistributionColumn->vartype; + if (sourceDistributionColumnType != distributionColumnType) + { + char *relationName = get_rel_name(relationId); + char *sourceRelationName = get_rel_name(sourceRelationId); + + ereport(ERROR, (errmsg("cannot colocate tables %s and %s", + sourceRelationName, relationName), + errdetail("Distribution column types don't match for " + "%s and %s.", sourceRelationName, + relationName))); + } +} + + +/* + * EnsureSchemaExistsOnAllNodes connects to all nodes with the citus extension user + * and creates the schema of the given relationId. The function errors out if the + * command cannot be executed in any of the worker nodes. + */ +static void +EnsureSchemaExistsOnAllNodes(Oid relationId) +{ + List *workerNodeList = ActivePrimaryNodeList(); + ListCell *workerNodeCell = NULL; + StringInfo applySchemaCreationDDL = makeStringInfo(); + + Oid schemaId = get_rel_namespace(relationId); + const char *createSchemaDDL = CreateSchemaDDLCommand(schemaId); + uint64 connectionFlag = FORCE_NEW_CONNECTION; + + if (createSchemaDDL == NULL) + { + return; + } + + appendStringInfo(applySchemaCreationDDL, "%s", createSchemaDDL); + + foreach(workerNodeCell, workerNodeList) + { + WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); + char *nodeName = workerNode->workerName; + uint32 nodePort = workerNode->workerPort; + MultiConnection *connection = + GetNodeUserDatabaseConnection(connectionFlag, nodeName, nodePort, NULL, + NULL); + + ExecuteCriticalRemoteCommand(connection, applySchemaCreationDDL->data); + } +} + + +/* + * EnsureLocalTableEmptyIfNecessary only checks for emptiness if only an empty + * relation can be distributed in the given configuration. + * + * In some cases, it is possible and safe to send local data to shards while + * distributing the table. In those cases, we can distribute non-empty local + * tables. This function checks the distributionMethod and relation kind to + * see whether we need to ensure the emptiness of the local table. If we need to + * be sure, this function calls the EnsureLocalTableEmpty function to ensure + * that the local table does not contain any data. 
+ */ +static void +EnsureLocalTableEmptyIfNecessary(Oid relationId, char distributionMethod, + bool viaDepracatedAPI) +{ + if (viaDepracatedAPI) + { + EnsureLocalTableEmpty(relationId); + } + else if (distributionMethod != DISTRIBUTE_BY_HASH && + distributionMethod != DISTRIBUTE_BY_NONE) + { + EnsureLocalTableEmpty(relationId); + } + else if (!RegularTable(relationId)) + { + EnsureLocalTableEmpty(relationId); + } +} + + +/* + * EnsureLocalTableEmpty errors out if the local table is not empty. + */ +static void +EnsureLocalTableEmpty(Oid relationId) +{ + char *relationName = get_rel_name(relationId); + bool localTableEmpty = LocalTableEmpty(relationId); + + if (!localTableEmpty) + { + ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("cannot distribute relation \"%s\"", relationName), + errdetail("Relation \"%s\" contains data.", relationName), + errhint("Empty your table before distributing it."))); + } +} + + +/* + * EnsureTableNotDistributed errors out if the table is distributed. + */ +static void +EnsureTableNotDistributed(Oid relationId) +{ + char *relationName = get_rel_name(relationId); + bool isDistributedTable = false; + + isDistributedTable = IsDistributedTable(relationId); + + if (isDistributedTable) + { + ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("table \"%s\" is already distributed", + relationName))); + } +} + + +/* + * EnsureReplicationSettings checks whether the current replication factor + * setting is compatible with the replication model. This function errors + * out if caller tries to use streaming replication with more than one + * replication factor. + */ +void +EnsureReplicationSettings(Oid relationId, char replicationModel) +{ + char *msgSuffix = "the streaming replication model"; + char *extraHint = " or setting \"citus.replication_model\" to \"statement\""; + + if (relationId != InvalidOid) + { + msgSuffix = "tables which use the streaming replication model"; + extraHint = ""; + } + + if (replicationModel == REPLICATION_MODEL_STREAMING && ShardReplicationFactor != 1) + { + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("replication factors above one are incompatible with %s", + msgSuffix), + errhint("Try again after reducing \"citus.shard_replication_" + "factor\" to one%s.", extraHint))); } } @@ -604,256 +991,24 @@ CreateTruncateTrigger(Oid relationId) /* - * CreateHashDistributedTable creates a hash distributed table. + * RegularTable function returns true if given table's relation kind is RELKIND_RELATION + * (or RELKIND_PARTITIONED_TABLE for PG >= 10), otherwise it returns false. */ -static void -CreateHashDistributedTable(Oid relationId, char *distributionColumnName, - char *colocateWithTableName, int shardCount, - int replicationFactor) +bool +RegularTable(Oid relationId) { - Relation distributedRelation = NULL; - Relation pgDistColocation = NULL; - uint32 colocationId = INVALID_COLOCATION_ID; - Oid sourceRelationId = InvalidOid; - Oid distributionColumnType = InvalidOid; - bool useExclusiveConnection = false; - char relationKind = 0; + char relationKind = get_rel_relkind(relationId); - /* get an access lock on the relation to prevent DROP TABLE and ALTER TABLE */ - distributedRelation = relation_open(relationId, AccessShareLock); - - /* - * Get an exclusive lock on the colocation system catalog. Therefore, we - * can be sure that there will no modifications on the colocation table - * until this transaction is committed. 
- */ - pgDistColocation = heap_open(DistColocationRelationId(), ExclusiveLock); - - /* get distribution column data type */ - distributionColumnType = ColumnType(relationId, distributionColumnName); - - if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) == 0) - { - /* check for default colocation group */ - colocationId = ColocationId(shardCount, replicationFactor, - distributionColumnType); - if (colocationId == INVALID_COLOCATION_ID) - { - colocationId = CreateColocationGroup(shardCount, replicationFactor, - distributionColumnType); - } - else - { - sourceRelationId = ColocatedTableId(colocationId); - } - } - else if (pg_strncasecmp(colocateWithTableName, "none", NAMEDATALEN) == 0) - { - colocationId = GetNextColocationId(); - } - else - { - /* get colocation group of the target table */ - text *colocateWithTableNameText = cstring_to_text(colocateWithTableName); - sourceRelationId = ResolveRelationId(colocateWithTableNameText); - - colocationId = TableColocationId(sourceRelationId); - } - - /* relax empty table requirement for regular (non-foreign) tables */ - relationKind = get_rel_relkind(relationId); +#if (PG_VERSION_NUM >= 100000) + if (relationKind == RELKIND_RELATION || relationKind == RELKIND_PARTITIONED_TABLE) +#else if (relationKind == RELKIND_RELATION) +#endif { - EnsureTableNotDistributed(relationId); - useExclusiveConnection = IsTransactionBlock() || !LocalTableEmpty(relationId); - } - else - { - EnsureTableNotDistributed(relationId); - EnsureLocalTableEmpty(relationId); + return true; } - /* create distributed table metadata */ - ConvertToDistributedTable(relationId, distributionColumnName, DISTRIBUTE_BY_HASH, - ReplicationModel, colocationId); - - /* - * Ensure schema exists on each worker node. We can not run this function - * transactionally, since we may create shards over separate sessions and - * shard creation depends on the schema being present and visible from all - * sessions. - */ - EnsureSchemaExistsOnAllNodes(relationId); - - /* create shards */ - if (sourceRelationId != InvalidOid) - { - /* first run checks */ - CheckReplicationModel(sourceRelationId, relationId); - CheckDistributionColumnType(sourceRelationId, relationId); - - - CreateColocatedShards(relationId, sourceRelationId, useExclusiveConnection); - } - else - { - CreateShardsWithRoundRobinPolicy(relationId, shardCount, replicationFactor, - useExclusiveConnection); - } - - /* copy over data for regular relations */ - if (relationKind == RELKIND_RELATION) - { - CopyLocalDataIntoShards(relationId); - } - - heap_close(pgDistColocation, NoLock); - relation_close(distributedRelation, NoLock); -} - - -/* - * EnsureSchemaExistsOnAllNodes connects to all nodes with citus extension user - * and creates the schema of the given relationId. The function errors out if the - * command cannot be executed in any of the worker nodes. 
- */ -static void -EnsureSchemaExistsOnAllNodes(Oid relationId) -{ - List *workerNodeList = ActivePrimaryNodeList(); - ListCell *workerNodeCell = NULL; - StringInfo applySchemaCreationDDL = makeStringInfo(); - - Oid schemaId = get_rel_namespace(relationId); - const char *createSchemaDDL = CreateSchemaDDLCommand(schemaId); - uint64 connectionFlag = FORCE_NEW_CONNECTION; - - if (createSchemaDDL == NULL) - { - return; - } - - appendStringInfo(applySchemaCreationDDL, "%s", createSchemaDDL); - - foreach(workerNodeCell, workerNodeList) - { - WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); - char *nodeName = workerNode->workerName; - uint32 nodePort = workerNode->workerPort; - MultiConnection *connection = - GetNodeUserDatabaseConnection(connectionFlag, nodeName, nodePort, NULL, - NULL); - - ExecuteCriticalRemoteCommand(connection, applySchemaCreationDDL->data); - } -} - - -/* - * EnsureLocalTableEmpty errors out if the local table is not empty. - */ -static void -EnsureLocalTableEmpty(Oid relationId) -{ - bool localTableEmpty = false; - char *relationName = get_rel_name(relationId); - - localTableEmpty = LocalTableEmpty(relationId); - - if (!localTableEmpty) - { - ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("cannot distribute relation \"%s\"", relationName), - errdetail("Relation \"%s\" contains data.", relationName), - errhint("Empty your table before distributing it."))); - } -} - - -/* - * EnsureIsTableId errors out if the id is not belong to a regular of foreign table. - */ -static void -EnsureIsTableId(Oid relationId) -{ - Relation relation = relation_open(relationId, AccessShareLock); - char *relationName = get_rel_name(relationId); - char relationKind = 0; - - /* verify target relation is either regular or foreign table */ - relationKind = relation->rd_rel->relkind; - if (relationKind != RELKIND_RELATION && relationKind != RELKIND_FOREIGN_TABLE) - { - ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("%s is not a regular or foreign table", - relationName))); - } - - relation_close(relation, NoLock); -} - - -/* - * EnsureTableNotDistributed errors out if the relationId doesn't belong to regular or foreign table - * or the table is distributed. - */ -static void -EnsureTableNotDistributed(Oid relationId) -{ - char *relationName = get_rel_name(relationId); - bool isDistributedTable = false; - - EnsureIsTableId(relationId); - isDistributedTable = IsDistributedTable(relationId); - - if (isDistributedTable) - { - ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("table \"%s\" is already distributed", - relationName))); - } -} - - -/* - * ColumnType returns the column type of the given column. - */ -static Oid -ColumnType(Oid relationId, char *columnName) -{ - AttrNumber columnIndex = get_attnum(relationId, columnName); - Oid columnType = get_atttype(relationId, columnIndex); - - return columnType; -} - - -/* - * Check that the current replication factor setting is compatible with the - * replication model of relationId, if valid. If InvalidOid, check that the - * global replication model setting instead. Errors out if an invalid state - * is detected. 
- */ -void -EnsureReplicationSettings(Oid relationId, char replicationModel) -{ - char *msgSuffix = "the streaming replication model"; - char *extraHint = " or setting \"citus.replication_model\" to \"statement\""; - - if (relationId != InvalidOid) - { - msgSuffix = "tables which use the streaming replication model"; - extraHint = ""; - } - - if (replicationModel == REPLICATION_MODEL_STREAMING && ShardReplicationFactor != 1) - { - ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("replication factors above one are incompatible with %s", - msgSuffix), - errhint("Try again after reducing \"citus.shard_replication_" - "factor\" to one%s.", extraHint))); - } + return false; } @@ -1014,10 +1169,10 @@ TupleDescColumnNameList(TupleDesc tupleDescriptor) * RelationUsesIdentityColumns returns whether a given relation uses the SQL * GENERATED ... AS IDENTITY features supported as of PostgreSQL 10. */ -#if (PG_VERSION_NUM >= 100000) static bool RelationUsesIdentityColumns(TupleDesc relationDesc) { +#if (PG_VERSION_NUM >= 100000) int attributeIndex = 0; for (attributeIndex = 0; attributeIndex < relationDesc->natts; attributeIndex++) @@ -1029,9 +1184,7 @@ RelationUsesIdentityColumns(TupleDesc relationDesc) return true; } } +#endif return false; } - - -#endif diff --git a/src/backend/distributed/master/master_expire_table_cache.c b/src/backend/distributed/master/master_expire_table_cache.c index d983325b7..3ef550891 100644 --- a/src/backend/distributed/master/master_expire_table_cache.c +++ b/src/backend/distributed/master/master_expire_table_cache.c @@ -183,7 +183,7 @@ DropShardsFromWorker(WorkerNode *workerNode, Oid relationId, List *shardInterval } } - if (relationKind == RELKIND_RELATION) + if (RegularTable(relationId)) { appendStringInfo(workerCommand, DROP_REGULAR_TABLE_COMMAND, shardNames->data); } diff --git a/src/backend/distributed/master/master_node_protocol.c b/src/backend/distributed/master/master_node_protocol.c index 699de72db..c2a763c1c 100644 --- a/src/backend/distributed/master/master_node_protocol.c +++ b/src/backend/distributed/master/master_node_protocol.c @@ -721,7 +721,7 @@ ShardStorageType(Oid relationId) char shardStorageType = 0; char relationType = get_rel_relkind(relationId); - if (relationType == RELKIND_RELATION) + if (RegularTable(relationId)) { shardStorageType = SHARD_STORAGE_TABLE; } diff --git a/src/backend/distributed/master/master_repair_shards.c b/src/backend/distributed/master/master_repair_shards.c index 1774895f8..1de50bdab 100644 --- a/src/backend/distributed/master/master_repair_shards.c +++ b/src/backend/distributed/master/master_repair_shards.c @@ -389,7 +389,7 @@ RecreateTableDDLCommandList(Oid relationId) bool includeSequenceDefaults = false; /* build appropriate DROP command based on relation kind */ - if (relationKind == RELKIND_RELATION) + if (RegularTable(relationId)) { appendStringInfo(dropCommand, DROP_REGULAR_TABLE_COMMAND, qualifiedRelationName); diff --git a/src/backend/distributed/utils/citus_ruleutils.c b/src/backend/distributed/utils/citus_ruleutils.c index a543669e6..19d340187 100644 --- a/src/backend/distributed/utils/citus_ruleutils.c +++ b/src/backend/distributed/utils/citus_ruleutils.c @@ -36,6 +36,7 @@ #include "distributed/citus_ruleutils.h" #include "distributed/multi_partitioning_utils.h" #include "distributed/relay_utility.h" +#include "distributed/master_metadata_utility.h" #include "foreign/foreign.h" #include "lib/stringinfo.h" #include "nodes/nodes.h" @@ -59,7 +60,6 @@ static void AppendOptionListToString(StringInfo 
stringData, List *options); -static bool SupportedRelationKind(Relation relation); static const char * convert_aclright_to_string(int aclright); @@ -291,7 +291,6 @@ pg_get_tableschemadef_string(Oid tableRelationId, bool includeSequenceDefaults) AttrNumber constraintIndex = 0; AttrNumber constraintCount = 0; StringInfoData buffer = { NULL, 0, 0, 0 }; - bool supportedRelationKind = false; /* * Instead of retrieving values from system catalogs as other functions in @@ -304,13 +303,7 @@ pg_get_tableschemadef_string(Oid tableRelationId, bool includeSequenceDefaults) relation = relation_open(tableRelationId, AccessShareLock); relationName = generate_relation_name(tableRelationId, NIL); - supportedRelationKind = SupportedRelationKind(relation); - if (!supportedRelationKind) - { - ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("%s is not a regular, foreign, or partitioned table", - relationName))); - } + EnsureRelationKindSupported(tableRelationId); initStringInfo(&buffer); @@ -491,25 +484,33 @@ pg_get_tableschemadef_string(Oid tableRelationId, bool includeSequenceDefaults) /* - * SupportedRelationKind returns true if the given relation is supported as a - * distributed relation. + * EnsureRelationKindSupported errors out if the given relation is not supported + * as a distributed relation. */ -static bool -SupportedRelationKind(Relation relation) +void +EnsureRelationKindSupported(Oid relationId) { - char relationKind = relation->rd_rel->relkind; - bool supportedRelationKind = (relationKind == RELKIND_RELATION || relationKind == - RELKIND_FOREIGN_TABLE); -#if (PG_VERSION_NUM >= 100000) - supportedRelationKind = supportedRelationKind || relationKind == - RELKIND_PARTITIONED_TABLE; -#endif + char relationKind = get_rel_relkind(relationId); + bool supportedRelationKind = false; - /* Citus doesn't support bare inhereted tables (i.e., not a partition or partitioned table) */ - supportedRelationKind = supportedRelationKind && !(IsChildTable(relation->rd_id) || - IsParentTable(relation->rd_id)); + supportedRelationKind = RegularTable(relationId) || + relationKind == RELKIND_FOREIGN_TABLE; - return supportedRelationKind; + /* + * Citus doesn't support bare inherited tables (i.e., not a partition or + * partitioned table) + */ + supportedRelationKind = supportedRelationKind && !(IsChildTable(relationId) || + IsParentTable(relationId)); + + if (!supportedRelationKind) + { + char *relationName = get_rel_name(relationId); + + ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("%s is not a regular, foreign or partitioned table", + relationName))); + } } @@ -523,15 +524,12 @@ char * pg_get_tablecolumnoptionsdef_string(Oid tableRelationId) { Relation relation = NULL; - char *relationName = NULL; - char relationKind = 0; TupleDesc tupleDescriptor = NULL; AttrNumber attributeIndex = 0; List *columnOptionList = NIL; ListCell *columnOptionCell = NULL; bool firstOptionPrinted = false; StringInfoData buffer = { NULL, 0, 0, 0 }; - bool supportedRelationKind = false; /* * Instead of retrieving values from system catalogs, we open the relation, @@ -539,22 +537,8 @@ pg_get_tablecolumnoptionsdef_string(Oid tableRelationId) * This is primarily to maintain symmetry with pg_get_tableschemadef. 
*/ relation = relation_open(tableRelationId, AccessShareLock); - relationName = generate_relation_name(tableRelationId, NIL); - relationKind = relation->rd_rel->relkind; - supportedRelationKind = (relationKind == RELKIND_RELATION || relationKind == - RELKIND_FOREIGN_TABLE); -#if (PG_VERSION_NUM >= 100000) - supportedRelationKind = supportedRelationKind || relationKind == - RELKIND_PARTITIONED_TABLE; -#endif - - if (!supportedRelationKind) - { - ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("%s is not a regular or foreign table or partitioned", - relationName))); - } + EnsureRelationKindSupported(tableRelationId); /* * Iterate over the table's columns. If a particular column is not dropped diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c index 53e59b8fc..e12ce4313 100644 --- a/src/backend/distributed/utils/colocation_utils.c +++ b/src/backend/distributed/utils/colocation_utils.c @@ -904,6 +904,15 @@ ColocatedTableId(Oid colocationId) ScanKeyData scanKey[1]; int scanKeyCount = 1; + /* + * We may have a distributed table whose colocation id is INVALID_COLOCATION_ID. + * In this case, we do not want to send that table's id as colocated table id. + */ + if (colocationId == INVALID_COLOCATION_ID) + { + return colocatedTableId; + } + ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_colocationid, BTEqualStrategyNumber, F_INT4EQ, ObjectIdGetDatum(colocationId)); diff --git a/src/include/distributed/citus_ruleutils.h b/src/include/distributed/citus_ruleutils.h index 213dec2fa..0e14c3ed7 100644 --- a/src/include/distributed/citus_ruleutils.h +++ b/src/include/distributed/citus_ruleutils.h @@ -34,6 +34,7 @@ extern char * pg_get_serverdef_string(Oid tableRelationId); extern char * pg_get_sequencedef_string(Oid sequenceRelid); extern Form_pg_sequence pg_get_sequencedef(Oid sequenceRelationId); extern char * pg_get_tableschemadef_string(Oid tableRelationId, bool forShardCreation); +extern void EnsureRelationKindSupported(Oid relationId); extern char * pg_get_tablecolumnoptionsdef_string(Oid tableRelationId); extern void deparse_shard_index_statement(IndexStmt *origStmt, Oid distrelid, int64 shardid, StringInfo buffer); diff --git a/src/include/distributed/master_metadata_utility.h b/src/include/distributed/master_metadata_utility.h index c305cc0ba..8f0eba6c3 100644 --- a/src/include/distributed/master_metadata_utility.h +++ b/src/include/distributed/master_metadata_utility.h @@ -139,6 +139,9 @@ extern void UpdateShardPlacementState(uint64 placementId, char shardState); extern void DeleteShardPlacementRow(uint64 placementId); extern void UpdateColocationGroupReplicationFactor(uint32 colocationId, int replicationFactor); +extern void CreateDistributedTable(Oid relationId, Var *distributionColumn, + char distributionMethod, char *colocateWithTableName, + bool viaDeprecatedAPI); extern void CreateTruncateTrigger(Oid relationId); /* Remaining metadata utility functions */ @@ -147,6 +150,7 @@ extern void EnsureTablePermissions(Oid relationId, AclMode mode); extern void EnsureTableOwner(Oid relationId); extern void EnsureSuperUser(void); extern void EnsureReplicationSettings(Oid relationId, char replicationModel); +extern bool RegularTable(Oid relationId); extern bool TableReferenced(Oid relationId); extern char * ConstructQualifiedShardName(ShardInterval *shardInterval); extern Datum StringToDatum(char *inputString, Oid dataType); diff --git a/src/test/regress/expected/isolation_concurrent_dml.out 
b/src/test/regress/expected/isolation_concurrent_dml.out index a5e6a1737..f7f8fc9ee 100644 --- a/src/test/regress/expected/isolation_concurrent_dml.out +++ b/src/test/regress/expected/isolation_concurrent_dml.out @@ -8,10 +8,10 @@ step s1-begin: BEGIN; step s1-insert: - INSERT INTO test_table VALUES(1); + INSERT INTO test_concurrent_dml VALUES(1); step s2-update: - UPDATE test_table SET data = 'blarg' WHERE test_id = 1; + UPDATE test_concurrent_dml SET data = 'blarg' WHERE test_id = 1; step s1-commit: COMMIT; @@ -23,8 +23,8 @@ master_create_worker_shards step s1-insert: - INSERT INTO test_table VALUES(1); + INSERT INTO test_concurrent_dml VALUES(1); step s2-update: - UPDATE test_table SET data = 'blarg' WHERE test_id = 1; + UPDATE test_concurrent_dml SET data = 'blarg' WHERE test_id = 1; diff --git a/src/test/regress/expected/isolation_copy_placement_vs_modification.out b/src/test/regress/expected/isolation_copy_placement_vs_modification.out index 6a6fbfd83..523f87f80 100644 --- a/src/test/regress/expected/isolation_copy_placement_vs_modification.out +++ b/src/test/regress/expected/isolation_copy_placement_vs_modification.out @@ -2,16 +2,16 @@ Parsed test spec with 2 sessions starting permutation: s1-load-cache s1-insert s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-update s2-commit s1-commit s2-print-content step s1-load-cache: - TRUNCATE test_table; + TRUNCATE test_copy_placement_vs_modification; step s1-insert: - INSERT INTO test_table VALUES (5, 10); + INSERT INTO test_copy_placement_vs_modification VALUES (5, 10); step s1-begin: BEGIN; step s1-select: - SELECT count(*) FROM test_table WHERE x = 5; + SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; count @@ -29,7 +29,7 @@ master_copy_shard_placement step s1-update: - UPDATE test_table SET y = 5 WHERE x = 5; + UPDATE test_copy_placement_vs_modification SET y = 5 WHERE x = 5; step s2-commit: COMMIT; @@ -42,7 +42,7 @@ step s2-print-content: SELECT nodeport, success, result FROM - run_command_on_placements('test_table', 'select y from %s WHERE x = 5') + run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5') WHERE shardid IN (SELECT * FROM selected_shard) ORDER BY @@ -55,16 +55,16 @@ nodeport success result starting permutation: s1-load-cache s1-insert s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-delete s2-commit s1-commit s2-print-content step s1-load-cache: - TRUNCATE test_table; + TRUNCATE test_copy_placement_vs_modification; step s1-insert: - INSERT INTO test_table VALUES (5, 10); + INSERT INTO test_copy_placement_vs_modification VALUES (5, 10); step s1-begin: BEGIN; step s1-select: - SELECT count(*) FROM test_table WHERE x = 5; + SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; count @@ -82,7 +82,7 @@ master_copy_shard_placement step s1-delete: - DELETE FROM test_table WHERE x = 5; + DELETE FROM test_copy_placement_vs_modification WHERE x = 5; step s2-commit: COMMIT; @@ -95,7 +95,7 @@ step s2-print-content: SELECT nodeport, success, result FROM - run_command_on_placements('test_table', 'select y from %s WHERE x = 5') + run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5') WHERE shardid IN (SELECT * FROM selected_shard) ORDER BY @@ -108,13 +108,13 @@ nodeport success result starting permutation: s1-load-cache s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-insert s2-commit s1-commit s2-print-content step s1-load-cache: - 
TRUNCATE test_table; + TRUNCATE test_copy_placement_vs_modification; step s1-begin: BEGIN; step s1-select: - SELECT count(*) FROM test_table WHERE x = 5; + SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; count @@ -132,7 +132,7 @@ master_copy_shard_placement step s1-insert: - INSERT INTO test_table VALUES (5, 10); + INSERT INTO test_copy_placement_vs_modification VALUES (5, 10); step s2-commit: COMMIT; @@ -145,7 +145,7 @@ step s2-print-content: SELECT nodeport, success, result FROM - run_command_on_placements('test_table', 'select y from %s WHERE x = 5') + run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5') WHERE shardid IN (SELECT * FROM selected_shard) ORDER BY @@ -158,13 +158,13 @@ nodeport success result starting permutation: s1-load-cache s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-copy s2-commit s1-commit s2-print-content step s1-load-cache: - TRUNCATE test_table; + TRUNCATE test_copy_placement_vs_modification; step s1-begin: BEGIN; step s1-select: - SELECT count(*) FROM test_table WHERE x = 5; + SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; count @@ -182,7 +182,7 @@ master_copy_shard_placement step s1-copy: - COPY test_table FROM PROGRAM 'echo "1,1\n2,2\n3,3\n4,4\n5,5"' WITH CSV; + COPY test_copy_placement_vs_modification FROM PROGRAM 'echo "1,1\n2,2\n3,3\n4,4\n5,5"' WITH CSV; step s2-commit: COMMIT; @@ -195,7 +195,7 @@ step s2-print-content: SELECT nodeport, success, result FROM - run_command_on_placements('test_table', 'select y from %s WHERE x = 5') + run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5') WHERE shardid IN (SELECT * FROM selected_shard) ORDER BY @@ -208,13 +208,13 @@ nodeport success result starting permutation: s1-load-cache s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-ddl s2-commit s1-commit s2-print-index-count step s1-load-cache: - TRUNCATE test_table; + TRUNCATE test_copy_placement_vs_modification; step s1-begin: BEGIN; step s1-select: - SELECT count(*) FROM test_table WHERE x = 5; + SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; count @@ -232,7 +232,7 @@ master_copy_shard_placement step s1-ddl: - CREATE INDEX test_table_index ON test_table(x); + CREATE INDEX test_copy_placement_vs_modification_index ON test_copy_placement_vs_modification(x); step s2-commit: COMMIT; @@ -245,7 +245,7 @@ step s2-print-index-count: SELECT nodeport, success, result FROM - run_command_on_placements('test_table', 'select count(*) from pg_indexes WHERE schemaname || ''.'' || tablename = ''%s''') + run_command_on_placements('test_copy_placement_vs_modification', 'select count(*) from pg_indexes WHERE schemaname || ''.'' || tablename = ''%s''') ORDER BY nodeport; @@ -258,13 +258,13 @@ nodeport success result starting permutation: s1-insert s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-update s2-commit s1-commit s2-print-content step s1-insert: - INSERT INTO test_table VALUES (5, 10); + INSERT INTO test_copy_placement_vs_modification VALUES (5, 10); step s1-begin: BEGIN; step s1-select: - SELECT count(*) FROM test_table WHERE x = 5; + SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; count @@ -282,7 +282,7 @@ master_copy_shard_placement step s1-update: - UPDATE test_table SET y = 5 WHERE x = 5; + UPDATE test_copy_placement_vs_modification SET y = 5 WHERE x = 5; step s2-commit: COMMIT; @@ -295,7 +295,7 @@ step 
s2-print-content: SELECT nodeport, success, result FROM - run_command_on_placements('test_table', 'select y from %s WHERE x = 5') + run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5') WHERE shardid IN (SELECT * FROM selected_shard) ORDER BY @@ -308,13 +308,13 @@ nodeport success result starting permutation: s1-insert s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-delete s2-commit s1-commit s2-print-content step s1-insert: - INSERT INTO test_table VALUES (5, 10); + INSERT INTO test_copy_placement_vs_modification VALUES (5, 10); step s1-begin: BEGIN; step s1-select: - SELECT count(*) FROM test_table WHERE x = 5; + SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; count @@ -332,7 +332,7 @@ master_copy_shard_placement step s1-delete: - DELETE FROM test_table WHERE x = 5; + DELETE FROM test_copy_placement_vs_modification WHERE x = 5; step s2-commit: COMMIT; @@ -345,7 +345,7 @@ step s2-print-content: SELECT nodeport, success, result FROM - run_command_on_placements('test_table', 'select y from %s WHERE x = 5') + run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5') WHERE shardid IN (SELECT * FROM selected_shard) ORDER BY @@ -361,7 +361,7 @@ step s1-begin: BEGIN; step s1-select: - SELECT count(*) FROM test_table WHERE x = 5; + SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; count @@ -379,7 +379,7 @@ master_copy_shard_placement step s1-insert: - INSERT INTO test_table VALUES (5, 10); + INSERT INTO test_copy_placement_vs_modification VALUES (5, 10); step s2-commit: COMMIT; @@ -392,7 +392,7 @@ step s2-print-content: SELECT nodeport, success, result FROM - run_command_on_placements('test_table', 'select y from %s WHERE x = 5') + run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5') WHERE shardid IN (SELECT * FROM selected_shard) ORDER BY @@ -408,7 +408,7 @@ step s1-begin: BEGIN; step s1-select: - SELECT count(*) FROM test_table WHERE x = 5; + SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; count @@ -426,7 +426,7 @@ master_copy_shard_placement step s1-copy: - COPY test_table FROM PROGRAM 'echo "1,1\n2,2\n3,3\n4,4\n5,5"' WITH CSV; + COPY test_copy_placement_vs_modification FROM PROGRAM 'echo "1,1\n2,2\n3,3\n4,4\n5,5"' WITH CSV; step s2-commit: COMMIT; @@ -439,7 +439,7 @@ step s2-print-content: SELECT nodeport, success, result FROM - run_command_on_placements('test_table', 'select y from %s WHERE x = 5') + run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5') WHERE shardid IN (SELECT * FROM selected_shard) ORDER BY @@ -455,7 +455,7 @@ step s1-begin: BEGIN; step s1-select: - SELECT count(*) FROM test_table WHERE x = 5; + SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; count @@ -473,7 +473,7 @@ master_copy_shard_placement step s1-ddl: - CREATE INDEX test_table_index ON test_table(x); + CREATE INDEX test_copy_placement_vs_modification_index ON test_copy_placement_vs_modification(x); step s2-commit: COMMIT; @@ -486,7 +486,7 @@ step s2-print-index-count: SELECT nodeport, success, result FROM - run_command_on_placements('test_table', 'select count(*) from pg_indexes WHERE schemaname || ''.'' || tablename = ''%s''') + run_command_on_placements('test_copy_placement_vs_modification', 'select count(*) from pg_indexes WHERE schemaname || ''.'' || tablename = ''%s''') ORDER BY nodeport; diff --git 
a/src/test/regress/expected/isolation_dml_vs_repair.out b/src/test/regress/expected/isolation_dml_vs_repair.out index 543929531..19c01a48d 100644 --- a/src/test/regress/expected/isolation_dml_vs_repair.out +++ b/src/test/regress/expected/isolation_dml_vs_repair.out @@ -5,16 +5,16 @@ master_create_worker_shards step s2-invalidate-57637: - UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57637; + UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57637; step s1-begin: BEGIN; step s1-insertone: - INSERT INTO test_table VALUES(1, 1); + INSERT INTO test_dml_vs_repair VALUES(1, 1); step s2-repair: - SELECT master_copy_shard_placement((SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass), 'localhost', 57638, 'localhost', 57637); + SELECT master_copy_shard_placement((SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass), 'localhost', 57638, 'localhost', 57637); step s1-commit: COMMIT; @@ -29,19 +29,19 @@ master_create_worker_shards step s1-insertone: - INSERT INTO test_table VALUES(1, 1); + INSERT INTO test_dml_vs_repair VALUES(1, 1); step s2-invalidate-57637: - UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57637; + UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57637; step s1-begin: BEGIN; step s1-insertall: - INSERT INTO test_table SELECT test_id, data+1 FROM test_table; + INSERT INTO test_dml_vs_repair SELECT test_id, data+1 FROM test_dml_vs_repair; step s2-repair: - SELECT master_copy_shard_placement((SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass), 'localhost', 57638, 'localhost', 57637); + SELECT master_copy_shard_placement((SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass), 'localhost', 57638, 'localhost', 57637); step s1-commit: COMMIT; @@ -56,41 +56,41 @@ master_create_worker_shards step s2-invalidate-57637: - UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57637; + UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57637; step s2-begin: BEGIN; step s2-repair: - SELECT master_copy_shard_placement((SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass), 'localhost', 57638, 'localhost', 57637); + SELECT master_copy_shard_placement((SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass), 'localhost', 57638, 'localhost', 57637); master_copy_shard_placement step s1-insertone: - INSERT INTO test_table VALUES(1, 1); + INSERT INTO test_dml_vs_repair VALUES(1, 1); step s2-commit: COMMIT; step s1-insertone: <... 
completed> step s2-invalidate-57638: - UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57638; + UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57638; step s1-display: - SELECT * FROM test_table WHERE test_id = 1; + SELECT * FROM test_dml_vs_repair WHERE test_id = 1; test_id data 1 1 step s2-invalidate-57637: - UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57637; + UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57637; step s2-revalidate-57638: - UPDATE pg_dist_shard_placement SET shardstate = '1' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57638; + UPDATE pg_dist_shard_placement SET shardstate = '1' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57638; step s1-display: - SELECT * FROM test_table WHERE test_id = 1; + SELECT * FROM test_dml_vs_repair WHERE test_id = 1; test_id data @@ -101,7 +101,7 @@ master_create_worker_shards step s2-invalidate-57637: - UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57637; + UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57637; step s1-prepared-insertone: EXECUTE insertone; @@ -110,7 +110,7 @@ step s2-begin: BEGIN; step s2-repair: - SELECT master_copy_shard_placement((SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass), 'localhost', 57638, 'localhost', 57637); + SELECT master_copy_shard_placement((SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass), 'localhost', 57638, 'localhost', 57637); master_copy_shard_placement @@ -124,22 +124,22 @@ step s2-commit: step s1-prepared-insertone: <... 
completed> error in steps s2-commit s1-prepared-insertone: ERROR: prepared modifications cannot be executed on a shard while it is being copied step s2-invalidate-57638: - UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57638; + UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57638; step s1-display: - SELECT * FROM test_table WHERE test_id = 1; + SELECT * FROM test_dml_vs_repair WHERE test_id = 1; test_id data 1 1 step s2-invalidate-57637: - UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57637; + UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57637; step s2-revalidate-57638: - UPDATE pg_dist_shard_placement SET shardstate = '1' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57638; + UPDATE pg_dist_shard_placement SET shardstate = '1' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57638; step s1-display: - SELECT * FROM test_table WHERE test_id = 1; + SELECT * FROM test_dml_vs_repair WHERE test_id = 1; test_id data @@ -150,10 +150,10 @@ master_create_worker_shards step s2-invalidate-57637: - UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57637; + UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57637; step s1-insertone: - INSERT INTO test_table VALUES(1, 1); + INSERT INTO test_dml_vs_repair VALUES(1, 1); step s1-prepared-insertall: EXECUTE insertall; @@ -162,7 +162,7 @@ step s2-begin: BEGIN; step s2-repair: - SELECT master_copy_shard_placement((SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass), 'localhost', 57638, 'localhost', 57637); + SELECT master_copy_shard_placement((SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass), 'localhost', 57638, 'localhost', 57637); master_copy_shard_placement @@ -176,23 +176,23 @@ step s2-commit: step s1-prepared-insertall: <... 
completed> error in steps s2-commit s1-prepared-insertall: ERROR: prepared modifications cannot be executed on a shard while it is being copied step s2-invalidate-57638: - UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57638; + UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57638; step s1-display: - SELECT * FROM test_table WHERE test_id = 1; + SELECT * FROM test_dml_vs_repair WHERE test_id = 1; test_id data 1 1 1 2 step s2-invalidate-57637: - UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57637; + UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57637; step s2-revalidate-57638: - UPDATE pg_dist_shard_placement SET shardstate = '1' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57638; + UPDATE pg_dist_shard_placement SET shardstate = '1' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57638; step s1-display: - SELECT * FROM test_table WHERE test_id = 1; + SELECT * FROM test_dml_vs_repair WHERE test_id = 1; test_id data diff --git a/src/test/regress/expected/multi_create_shards.out b/src/test/regress/expected/multi_create_shards.out index 558a7584e..a3f221e70 100644 --- a/src/test/regress/expected/multi_create_shards.out +++ b/src/test/regress/expected/multi_create_shards.out @@ -48,7 +48,7 @@ DETAIL: Distributed relations must not specify the WITH (OIDS) option in their ALTER TABLE table_to_distribute SET WITHOUT OIDS; -- use an index instead of table name SELECT master_create_distributed_table('table_to_distribute_pkey', 'id', 'hash'); -ERROR: table_to_distribute_pkey is not a regular or foreign table +ERROR: table_to_distribute_pkey is not a regular, foreign or partitioned table -- use a bad column name SELECT master_create_distributed_table('table_to_distribute', 'bad_column', 'hash'); ERROR: column "bad_column" of relation "table_to_distribute" does not exist diff --git a/src/test/regress/expected/multi_generate_ddl_commands.out b/src/test/regress/expected/multi_generate_ddl_commands.out index d993abf29..829e8b8f7 100644 --- a/src/test/regress/expected/multi_generate_ddl_commands.out +++ b/src/test/regress/expected/multi_generate_ddl_commands.out @@ -146,7 +146,7 @@ NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined -- propagating views is not supported CREATE VIEW local_view AS SELECT * FROM simple_table; SELECT table_ddl_command_array('local_view'); -ERROR: public.local_view is not a regular, foreign, or partitioned table +ERROR: local_view is not a regular, foreign or partitioned table -- clean up DROP VIEW IF EXISTS local_view; DROP FOREIGN TABLE IF EXISTS foreign_table; diff --git a/src/test/regress/expected/multi_partitioning_utils.out b/src/test/regress/expected/multi_partitioning_utils.out index 39a36c805..00f849b71 100644 --- a/src/test/regress/expected/multi_partitioning_utils.out +++ b/src/test/regress/expected/multi_partitioning_utils.out @@ -415,8 +415,8 @@ SELECT table_inherited('date_partitioned_table'); -- also these are not supported SELECT 
master_get_table_ddl_events('capitals');
-ERROR: public.capitals is not a regular, foreign, or partitioned table
+ERROR: capitals is not a regular, foreign or partitioned table
 SELECT master_get_table_ddl_events('cities');
-ERROR: public.cities is not a regular, foreign, or partitioned table
+ERROR: cities is not a regular, foreign or partitioned table
 -- dropping parents frop the partitions
 DROP TABLE date_partitioned_table, multi_column_partitioned, list_partitioned, partition_parent_schema.parent_table, cities, capitals;
diff --git a/src/test/regress/expected/multi_partitioning_utils_0.out b/src/test/regress/expected/multi_partitioning_utils_0.out
index a15d197a5..8f4906f48 100644
--- a/src/test/regress/expected/multi_partitioning_utils_0.out
+++ b/src/test/regress/expected/multi_partitioning_utils_0.out
@@ -355,9 +355,9 @@ LINE 1: SELECT table_inherited('date_partitioned_table');
 ^
 -- also these are not supported
 SELECT master_get_table_ddl_events('capitals');
-ERROR: public.capitals is not a regular, foreign, or partitioned table
+ERROR: capitals is not a regular, foreign or partitioned table
 SELECT master_get_table_ddl_events('cities');
-ERROR: public.cities is not a regular, foreign, or partitioned table
+ERROR: cities is not a regular, foreign or partitioned table
 -- dropping parents frop the partitions
 DROP TABLE date_partitioned_table, multi_column_partitioned, list_partitioned, partition_parent_schema.parent_table, cities, capitals;
 ERROR: table "date_partitioned_table" does not exist
diff --git a/src/test/regress/isolation_schedule b/src/test/regress/isolation_schedule
index e373f8544..f7a665fd7 100644
--- a/src/test/regress/isolation_schedule
+++ b/src/test/regress/isolation_schedule
@@ -5,7 +5,8 @@ test: isolation_add_node_vs_reference_table_operations

 # that come later can be parallelized
 test: isolation_cluster_management
-test: isolation_dml_vs_repair isolation_copy_placement_vs_copy_placement isolation_cancellation
+test: isolation_dml_vs_repair
+test: isolation_copy_placement_vs_copy_placement isolation_cancellation
 test: isolation_concurrent_dml isolation_data_migration
 test: isolation_drop_shards isolation_copy_placement_vs_modification
 test: isolation_insert_vs_vacuum isolation_transaction_recovery
diff --git a/src/test/regress/specs/isolation_concurrent_dml.spec b/src/test/regress/specs/isolation_concurrent_dml.spec
index 3288d7521..74c86583f 100644
--- a/src/test/regress/specs/isolation_concurrent_dml.spec
+++ b/src/test/regress/specs/isolation_concurrent_dml.spec
@@ -1,13 +1,13 @@
 setup
 {
-    CREATE TABLE test_table (test_id integer NOT NULL, data text);
-    SELECT master_create_distributed_table('test_table', 'test_id', 'hash');
-    SELECT master_create_worker_shards('test_table', 4, 2);
+    CREATE TABLE test_concurrent_dml (test_id integer NOT NULL, data text);
+    SELECT master_create_distributed_table('test_concurrent_dml', 'test_id', 'hash');
+    SELECT master_create_worker_shards('test_concurrent_dml', 4, 2);
 }

 teardown
 {
-    DROP TABLE IF EXISTS test_table CASCADE;
+    DROP TABLE IF EXISTS test_concurrent_dml CASCADE;
 }

 session "s1"
@@ -19,7 +19,7 @@ step "s1-begin"

 step "s1-insert"
 {
-    INSERT INTO test_table VALUES(1);
+    INSERT INTO test_concurrent_dml VALUES(1);
 }

 step "s1-commit"
@@ -31,7 +31,7 @@ session "s2"

 step "s2-update"
 {
-    UPDATE test_table SET data = 'blarg' WHERE test_id = 1;
+    UPDATE test_concurrent_dml SET data = 'blarg' WHERE test_id = 1;
 }

 # verify that an in-progress insert blocks concurrent updates
diff --git a/src/test/regress/specs/isolation_copy_placement_vs_modification.spec b/src/test/regress/specs/isolation_copy_placement_vs_modification.spec
index 201d4eca8..8ab9fe84b 100644
--- a/src/test/regress/specs/isolation_copy_placement_vs_modification.spec
+++ b/src/test/regress/specs/isolation_copy_placement_vs_modification.spec
@@ -4,15 +4,15 @@ setup
 {
     SET citus.shard_count TO 2;
     SET citus.shard_replication_factor TO 2;
-    CREATE TABLE test_table (x int, y int);
-    SELECT create_distributed_table('test_table', 'x');
+    CREATE TABLE test_copy_placement_vs_modification (x int, y int);
+    SELECT create_distributed_table('test_copy_placement_vs_modification', 'x');

-    SELECT get_shard_id_for_distribution_column('test_table', 5) INTO selected_shard;
+    SELECT get_shard_id_for_distribution_column('test_copy_placement_vs_modification', 5) INTO selected_shard;
 }

 teardown
 {
-    DROP TABLE test_table;
+    DROP TABLE test_copy_placement_vs_modification;
     DROP TABLE selected_shard;
 }
@@ -23,41 +23,41 @@ step "s1-begin"
     BEGIN;
 }

-# since test_table has rep > 1 simple select query doesn't hit all placements
+# since test_copy_placement_vs_modification has rep > 1 simple select query doesn't hit all placements
 # hence not all placements are cached
 step "s1-load-cache"
 {
-    TRUNCATE test_table;
+    TRUNCATE test_copy_placement_vs_modification;
 }

 step "s1-insert"
 {
-    INSERT INTO test_table VALUES (5, 10);
+    INSERT INTO test_copy_placement_vs_modification VALUES (5, 10);
 }

 step "s1-update"
 {
-    UPDATE test_table SET y = 5 WHERE x = 5;
+    UPDATE test_copy_placement_vs_modification SET y = 5 WHERE x = 5;
 }

 step "s1-delete"
 {
-    DELETE FROM test_table WHERE x = 5;
+    DELETE FROM test_copy_placement_vs_modification WHERE x = 5;
 }

 step "s1-select"
 {
-    SELECT count(*) FROM test_table WHERE x = 5;
+    SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5;
 }

 step "s1-ddl"
 {
-    CREATE INDEX test_table_index ON test_table(x);
+    CREATE INDEX test_copy_placement_vs_modification_index ON test_copy_placement_vs_modification(x);
 }

 step "s1-copy"
 {
-    COPY test_table FROM PROGRAM 'echo "1,1\n2,2\n3,3\n4,4\n5,5"' WITH CSV;
+    COPY test_copy_placement_vs_modification FROM PROGRAM 'echo "1,1\n2,2\n3,3\n4,4\n5,5"' WITH CSV;
 }

 step "s1-commit"
@@ -92,7 +92,7 @@ step "s2-print-content"
     SELECT
         nodeport, success, result
     FROM
-        run_command_on_placements('test_table', 'select y from %s WHERE x = 5')
+        run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5')
     WHERE
         shardid IN (SELECT * FROM selected_shard)
     ORDER BY
@@ -104,7 +104,7 @@ step "s2-print-index-count"
     SELECT
         nodeport, success, result
     FROM
-        run_command_on_placements('test_table', 'select count(*) from pg_indexes WHERE schemaname || ''.'' || tablename = ''%s''')
+        run_command_on_placements('test_copy_placement_vs_modification', 'select count(*) from pg_indexes WHERE schemaname || ''.'' || tablename = ''%s''')
     ORDER BY
         nodeport;
 }
diff --git a/src/test/regress/specs/isolation_dml_vs_repair.spec b/src/test/regress/specs/isolation_dml_vs_repair.spec
index 6da20e7d9..7a89b41df 100644
--- a/src/test/regress/specs/isolation_dml_vs_repair.spec
+++ b/src/test/regress/specs/isolation_dml_vs_repair.spec
@@ -1,13 +1,13 @@
 setup
 {
-    CREATE TABLE test_table (test_id integer NOT NULL, data int);
-    SELECT master_create_distributed_table('test_table', 'test_id', 'hash');
-    SELECT master_create_worker_shards('test_table', 1, 2);
+    CREATE TABLE test_dml_vs_repair (test_id integer NOT NULL, data int);
+    SELECT master_create_distributed_table('test_dml_vs_repair', 'test_id', 'hash');
+    SELECT master_create_worker_shards('test_dml_vs_repair', 1, 2);
 }

 teardown
 {
-    DROP TABLE IF EXISTS test_table CASCADE;
+    DROP TABLE IF EXISTS test_dml_vs_repair CASCADE;
 }

 session "s1"
@@ -15,9 +15,9 @@ session "s1"
 setup
 {
     DEALLOCATE all;
-    TRUNCATE test_table;
-    PREPARE insertone AS INSERT INTO test_table VALUES(1, 1);
-    PREPARE insertall AS INSERT INTO test_table SELECT test_id, data+1 FROM test_table;
+    TRUNCATE test_dml_vs_repair;
+    PREPARE insertone AS INSERT INTO test_dml_vs_repair VALUES(1, 1);
+    PREPARE insertall AS INSERT INTO test_dml_vs_repair SELECT test_id, data+1 FROM test_dml_vs_repair;
 }

 step "s1-begin"
@@ -27,7 +27,7 @@ step "s1-begin"

 step "s1-insertone"
 {
-    INSERT INTO test_table VALUES(1, 1);
+    INSERT INTO test_dml_vs_repair VALUES(1, 1);
 }

 step "s1-prepared-insertone"
@@ -37,7 +37,7 @@ step "s1-prepared-insertone"

 step "s1-insertall"
 {
-    INSERT INTO test_table SELECT test_id, data+1 FROM test_table;
+    INSERT INTO test_dml_vs_repair SELECT test_id, data+1 FROM test_dml_vs_repair;
 }

 step "s1-prepared-insertall"
@@ -47,7 +47,7 @@ step "s1-prepared-insertall"

 step "s1-display"
 {
-    SELECT * FROM test_table WHERE test_id = 1;
+    SELECT * FROM test_dml_vs_repair WHERE test_id = 1;
 }

 step "s1-commit"
@@ -65,27 +65,27 @@ step "s2-begin"

 step "s2-invalidate-57637"
 {
-    UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57637;
+    UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57637;
 }

 step "s2-revalidate-57637"
 {
-    UPDATE pg_dist_shard_placement SET shardstate = '1' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57637;
+    UPDATE pg_dist_shard_placement SET shardstate = '1' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57637;
 }

 step "s2-invalidate-57638"
 {
-    UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57638;
+    UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57638;
 }

 step "s2-revalidate-57638"
 {
-    UPDATE pg_dist_shard_placement SET shardstate = '1' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57638;
+    UPDATE pg_dist_shard_placement SET shardstate = '1' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57638;
 }

 step "s2-repair"
 {
-    SELECT master_copy_shard_placement((SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass), 'localhost', 57638, 'localhost', 57637);
+    SELECT master_copy_shard_placement((SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass), 'localhost', 57638, 'localhost', 57637);
 }

 step "s2-commit"