From 7769f1d01216d90924b1e024e94846c283f42d73 Mon Sep 17 00:00:00 2001 From: Burak Yucesoy Date: Thu, 27 Jul 2017 12:25:18 +0300 Subject: [PATCH 1/2] Refactor distributed table creation logic This commit is preparation for introducing distributed partitioned table support. We want to clean up and refactor some code in the distributed table creation logic so that we can handle partitioned tables in a more robust way. --- .../commands/create_distributed_table.c | 1063 ++++++++++------- .../master/master_expire_table_cache.c | 2 +- .../distributed/master/master_node_protocol.c | 2 +- .../distributed/master/master_repair_shards.c | 2 +- .../distributed/utils/citus_ruleutils.c | 68 +- .../distributed/utils/colocation_utils.c | 9 + src/include/distributed/citus_ruleutils.h | 1 + .../distributed/master_metadata_utility.h | 4 + .../regress/expected/multi_create_shards.out | 2 +- .../expected/multi_generate_ddl_commands.out | 2 +- .../expected/multi_partitioning_utils.out | 4 +- .../expected/multi_partitioning_utils_0.out | 4 +- 12 files changed, 657 insertions(+), 506 deletions(-) diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 0aeec0165..0a783f49b 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -29,6 +29,7 @@ #include "commands/defrem.h" #include "commands/extension.h" #include "commands/trigger.h" +#include "distributed/citus_ruleutils.h" #include "distributed/colocation_utils.h" #include "distributed/distribution_column.h" #include "distributed/master_metadata_utility.h" @@ -69,28 +70,30 @@ int ReplicationModel = REPLICATION_MODEL_COORDINATOR; /* local function forward declarations */ -static void CreateReferenceTable(Oid distributedRelationId); -static void ConvertToDistributedTable(Oid relationId, char *distributionColumnName, +static char AppropriateReplicationModel(char distributionMethod, bool viaDeprecatedAPI); +static void CreateHashDistributedTableShards(Oid relationId, Oid colocatedTableId, + bool localTableEmpty); +static uint32 ColocationIdForNewTable(Oid relationId, Var *distributionColumn, char distributionMethod, char replicationModel, - uint32 colocationId); -static char LookupDistributionMethod(Oid distributionMethodOid); -static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId, - int16 supportFunctionNumber); -static bool LocalTableEmpty(Oid tableId); -static void CreateHashDistributedTable(Oid relationId, char *distributionColumnName, - char *colocateWithTableName, - int shardCount, int replicationFactor); -static Oid ColumnType(Oid relationId, char *columnName); -static void CopyLocalDataIntoShards(Oid relationId); -static List * TupleDescColumnNameList(TupleDesc tupleDescriptor); -#if (PG_VERSION_NUM >= 100000) -static bool RelationUsesIdentityColumns(TupleDesc relationDesc); -#endif - + char *colocateWithTableName, bool viaDeprecatedAPI); +static void EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn, + char distributionMethod, uint32 colocationId, + char replicationModel, bool viaDeprecatedAPI); +static void EnsureTableCanBeColocatedWith(Oid relationId, char replicationModel, + Oid distributionColumnType, + Oid sourceRelationId); static void EnsureSchemaExistsOnAllNodes(Oid relationId); static void EnsureLocalTableEmpty(Oid relationId); static void EnsureTableNotDistributed(Oid relationId); -static void EnsureIsTableId(Oid relationId); +static char 
LookupDistributionMethod(Oid distributionMethodOid); +static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId, + int16 supportFunctionNumber); +static void EnsureLocalTableEmptyIfNecessary(Oid relationId, char distributionMethod, + bool viaDepracatedAPI); +static bool LocalTableEmpty(Oid tableId); +static void CopyLocalDataIntoShards(Oid relationId); +static List * TupleDescColumnNameList(TupleDesc tupleDescriptor); +static bool RelationUsesIdentityColumns(TupleDesc relationDesc); /* exports for SQL callable functions */ PG_FUNCTION_INFO_V1(master_create_distributed_table); @@ -102,37 +105,50 @@ PG_FUNCTION_INFO_V1(create_reference_table); * master_create_distributed_table accepts a table, distribution column and * method and performs the corresponding catalog changes. * - * Note that this udf is depreciated and cannot create colocated tables, so we + * Note that this UDF is deprecated and cannot create colocated tables, so we * always use INVALID_COLOCATION_ID. */ Datum master_create_distributed_table(PG_FUNCTION_ARGS) { - Oid distributedRelationId = PG_GETARG_OID(0); + Oid relationId = PG_GETARG_OID(0); text *distributionColumnText = PG_GETARG_TEXT_P(1); Oid distributionMethodOid = PG_GETARG_OID(2); - char *distributionColumnName = text_to_cstring(distributionColumnText); - char distributionMethod = LookupDistributionMethod(distributionMethodOid); + char *distributionColumnName = NULL; + Var *distributionColumn = NULL; + char distributionMethod = 0; + char *colocateWithTableName = NULL; + bool viaDeprecatedAPI = true; + + Relation relation = NULL; - EnsureTableNotDistributed(distributedRelationId); - EnsureLocalTableEmpty(distributedRelationId); - EnsureCoordinator(); CheckCitusVersion(ERROR); + EnsureCoordinator(); - if (ReplicationModel != REPLICATION_MODEL_COORDINATOR) - { - ereport(NOTICE, (errmsg("using statement-based replication"), - errdetail("The current replication_model setting is " - "'streaming', which is not supported by " - "master_create_distributed_table."), - errhint("Use create_distributed_table to use the streaming " - "replication model."))); - } + /* + * Lock target relation with an exclusive lock - there's no way to make + * sense of this table until we've committed, and we don't want multiple + * backends manipulating this relation. + */ + relation = relation_open(relationId, ExclusiveLock); - ConvertToDistributedTable(distributedRelationId, distributionColumnName, - distributionMethod, REPLICATION_MODEL_COORDINATOR, - INVALID_COLOCATION_ID); + /* + * We should do this check here since the codes in the following lines rely + * on this relation to have a supported relation kind. More extensive checks + * will be performed in CreateDistributedTable. 
+ */ + EnsureRelationKindSupported(relationId); + + distributionColumnName = text_to_cstring(distributionColumnText); + distributionColumn = BuildDistributionKeyFromColumnName(relation, + distributionColumnName); + distributionMethod = LookupDistributionMethod(distributionMethodOid); + + CreateDistributedTable(relationId, distributionColumn, distributionMethod, + colocateWithTableName, viaDeprecatedAPI); + + relation_close(relation, NoLock); PG_RETURN_VOID(); } @@ -146,90 +162,53 @@ master_create_distributed_table(PG_FUNCTION_ARGS) Datum create_distributed_table(PG_FUNCTION_ARGS) { - Oid relationId = PG_GETARG_OID(0); - text *distributionColumnText = PG_GETARG_TEXT_P(1); - Oid distributionMethodOid = PG_GETARG_OID(2); - - char *distributionColumnName = text_to_cstring(distributionColumnText); - char distributionMethod = LookupDistributionMethod(distributionMethodOid); + Oid relationId = InvalidOid; + text *distributionColumnText = NULL; + Oid distributionMethodOid = InvalidOid; text *colocateWithTableNameText = NULL; + + Relation relation = NULL; + char *distributionColumnName = NULL; + Var *distributionColumn = NULL; + char distributionMethod = 0; + char *colocateWithTableName = NULL; - EnsureCoordinator(); + bool viaDeprecatedAPI = false; + CheckCitusVersion(ERROR); + EnsureCoordinator(); - /* guard against a binary update without a function update */ - if (PG_NARGS() >= 4) - { - colocateWithTableNameText = PG_GETARG_TEXT_P(3); - colocateWithTableName = text_to_cstring(colocateWithTableNameText); - } - else - { - colocateWithTableName = "default"; - } + relationId = PG_GETARG_OID(0); + distributionColumnText = PG_GETARG_TEXT_P(1); + distributionMethodOid = PG_GETARG_OID(2); + colocateWithTableNameText = PG_GETARG_TEXT_P(3); - /* check if we try to colocate with hash distributed tables */ - if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0 && - pg_strncasecmp(colocateWithTableName, "none", NAMEDATALEN) != 0) - { - Oid colocateWithTableOid = ResolveRelationId(colocateWithTableNameText); - char colocateWithTableDistributionMethod = PartitionMethod(colocateWithTableOid); + /* + * Lock target relation with an exclusive lock - there's no way to make + * sense of this table until we've committed, and we don't want multiple + * backends manipulating this relation. + */ + relation = relation_open(relationId, ExclusiveLock); - if (colocateWithTableDistributionMethod != DISTRIBUTE_BY_HASH || - distributionMethod != DISTRIBUTE_BY_HASH) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot distribute relation"), - errdetail("Currently, colocate_with option is only supported " - "for hash distributed tables."))); - } - } + /* + * We should do this check here since the codes in the following lines rely + * on this relation to have a supported relation kind. More extensive checks + * will be performed in CreateDistributedTable. 
+ */ + EnsureRelationKindSupported(relationId); - /* if distribution method is not hash, just create partition metadata */ - if (distributionMethod != DISTRIBUTE_BY_HASH) - { - EnsureTableNotDistributed(relationId); - EnsureLocalTableEmpty(relationId); + distributionColumnName = text_to_cstring(distributionColumnText); + distributionColumn = BuildDistributionKeyFromColumnName(relation, + distributionColumnName); + distributionMethod = LookupDistributionMethod(distributionMethodOid); - if (ReplicationModel != REPLICATION_MODEL_COORDINATOR) - { - ereport(NOTICE, (errmsg("using statement-based replication"), - errdetail("Streaming replication is supported only for " - "hash-distributed tables."))); - } + colocateWithTableName = text_to_cstring(colocateWithTableNameText); - ConvertToDistributedTable(relationId, distributionColumnName, - distributionMethod, REPLICATION_MODEL_COORDINATOR, - INVALID_COLOCATION_ID); - PG_RETURN_VOID(); - } + CreateDistributedTable(relationId, distributionColumn, distributionMethod, + colocateWithTableName, viaDeprecatedAPI); - /* use configuration values for shard count and shard replication factor */ - CreateHashDistributedTable(relationId, distributionColumnName, - colocateWithTableName, ShardCount, - ShardReplicationFactor); - - if (ShouldSyncTableMetadata(relationId)) - { - CreateTableMetadataOnWorkers(relationId); - } - - PG_RETURN_VOID(); -} - - -/* - * create_reference_table accepts a table and then it creates a distributed - * table which has one shard and replication factor is set to - * the worker count. - */ -Datum -create_reference_table(PG_FUNCTION_ARGS) -{ - Oid relationId = PG_GETARG_OID(0); - - CreateReferenceTable(relationId); + relation_close(relation, NoLock); PG_RETURN_VOID(); } @@ -240,23 +219,41 @@ create_reference_table(PG_FUNCTION_ARGS) * created table has one shard and replication factor is set to the active worker * count. In fact, the above is the definition of a reference table in Citus. */ -static void -CreateReferenceTable(Oid relationId) +Datum +create_reference_table(PG_FUNCTION_ARGS) { - uint32 colocationId = INVALID_COLOCATION_ID; + Oid relationId = PG_GETARG_OID(0); + + Relation relation = NULL; + char *colocateWithTableName = NULL; List *workerNodeList = NIL; - int replicationFactor = 0; - char *distributionColumnName = NULL; - char relationKind = 0; + int workerCount = 0; + Var *distributionColumn = NULL; + + bool viaDeprecatedAPI = false; EnsureCoordinator(); CheckCitusVersion(ERROR); + /* + * Lock target relation with an exclusive lock - there's no way to make + * sense of this table until we've committed, and we don't want multiple + * backends manipulating this relation. + */ + relation = relation_open(relationId, ExclusiveLock); + + /* + * We should do this check here since the codes in the following lines rely + * on this relation to have a supported relation kind. More extensive checks + * will be performed in CreateDistributedTable. 
+ */ + EnsureRelationKindSupported(relationId); + workerNodeList = ActivePrimaryNodeList(); - replicationFactor = list_length(workerNodeList); + workerCount = list_length(workerNodeList); /* if there are no workers, error out */ - if (replicationFactor == 0) + if (workerCount == 0) { char *relationName = get_rel_name(relationId); @@ -265,79 +262,320 @@ CreateReferenceTable(Oid relationId) errdetail("There are no active worker nodes."))); } - /* relax empty table requirement for regular (non-foreign) tables */ - relationKind = get_rel_relkind(relationId); - if (relationKind == RELKIND_RELATION) + + CreateDistributedTable(relationId, distributionColumn, DISTRIBUTE_BY_NONE, + colocateWithTableName, viaDeprecatedAPI); + + relation_close(relation, NoLock); + + PG_RETURN_VOID(); +} + + +/* + * CreateDistributedTable creates distributed table in the given configuration. + * This functions contains all necessary logic to create distributed tables. It + * perform necessary checks to ensure distributing the table is safe. If it is + * safe to distribute the table, this function creates distributed table metadata, + * creates shards and copies local data to shards. + * + * viaDeprecatedAPI boolean flag is not optimal way to implement this function, + * but it helps reducing code duplication a lot. We hope to remove that flag one + * day, once we deprecate master_create_distribute_table completely. + */ +void +CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributionMethod, + char *colocateWithTableName, bool viaDeprecatedAPI) +{ + char replicationModel = REPLICATION_MODEL_INVALID; + uint32 colocationId = INVALID_COLOCATION_ID; + Oid colocatedTableId = InvalidOid; + bool localTableEmpty = false; + + Relation colocatedRelation = NULL; + + replicationModel = AppropriateReplicationModel(distributionMethod, viaDeprecatedAPI); + + /* + * ColocationIdForNewTable assumes caller acquires lock on relationId. In our case, + * our caller already acquired lock on relationId. + */ + colocationId = ColocationIdForNewTable(relationId, distributionColumn, + distributionMethod, replicationModel, + colocateWithTableName, viaDeprecatedAPI); + + EnsureRelationCanBeDistributed(relationId, distributionColumn, distributionMethod, + colocationId, replicationModel, viaDeprecatedAPI); + + /* we need to calculate these variables before creating distributed metadata */ + localTableEmpty = LocalTableEmpty(relationId); + colocatedTableId = ColocatedTableId(colocationId); + if (colocatedTableId != InvalidOid) { - EnsureTableNotDistributed(relationId); - } - else - { - EnsureTableNotDistributed(relationId); - EnsureLocalTableEmpty(relationId); + /* + * We take lock on colocatedTableId, because we want to ensure that colocated + * table is not dropped until we create all colocated shards. 
+ */ + colocatedRelation = relation_open(colocatedTableId, AccessShareLock); } - colocationId = CreateReferenceTableColocationId(); + /* create an entry for distributed table in pg_dist_partition */ + InsertIntoPgDistPartition(relationId, distributionMethod, distributionColumn, + colocationId, replicationModel); - /* first, convert the relation into distributed relation */ - ConvertToDistributedTable(relationId, distributionColumnName, - DISTRIBUTE_BY_NONE, REPLICATION_MODEL_2PC, colocationId); - - /* now, create the single shard replicated to all nodes */ - CreateReferenceTableShard(relationId); - - CreateTableMetadataOnWorkers(relationId); - - /* copy over data for regular relations */ - if (relationKind == RELKIND_RELATION) + /* foreign tables does not support TRUNCATE trigger */ + if (RegularTable(relationId)) { - CopyLocalDataIntoShards(relationId); + CreateTruncateTrigger(relationId); + } + + /* + * If we are using master_create_distributed_table, we don't need to continue, + * because deprecated API does not supports the following features. + */ + if (viaDeprecatedAPI) + { + /* + * We exit early but there is no need to close colocatedRelation. Because + * if viaDeprecatedAPI is true, we never open colocatedRelation in the first + * place. + */ + Assert(colocatedRelation == NULL); + + return; + } + + /* create shards for hash distributed and reference tables */ + if (distributionMethod == DISTRIBUTE_BY_HASH) + { + CreateHashDistributedTableShards(relationId, colocatedTableId, localTableEmpty); + } + else if (distributionMethod == DISTRIBUTE_BY_NONE) + { + CreateReferenceTableShard(relationId); + } + + /* copy over data for hash distributed and reference tables */ + if (distributionMethod == DISTRIBUTE_BY_HASH || + distributionMethod == DISTRIBUTE_BY_NONE) + { + if (RegularTable(relationId)) + { + CopyLocalDataIntoShards(relationId); + } + } + + if (colocatedRelation != NULL) + { + relation_close(colocatedRelation, NoLock); + } + + if (ShouldSyncTableMetadata(relationId)) + { + CreateTableMetadataOnWorkers(relationId); } } /* - * ConvertToDistributedTable converts the given regular PostgreSQL table into a - * distributed table. First, it checks if the given table can be distributed, - * then it creates related tuple in pg_dist_partition. - * - * XXX: We should perform more checks here to see if this table is fit for - * partitioning. At a minimum, we should validate the following: (i) this node - * runs as the master node, (ii) table does not make use of the inheritance - * mechanism and (iii) table does not have collated columns. + * AppropriateReplicationModel function decides which replication model should be + * used depending on given distribution configuration and global ReplicationModel + * variable. If ReplicationModel conflicts with distribution configuration, this + * function errors out. 
+ */ +static char +AppropriateReplicationModel(char distributionMethod, bool viaDeprecatedAPI) +{ + if (viaDeprecatedAPI) + { + if (ReplicationModel != REPLICATION_MODEL_COORDINATOR) + { + ereport(NOTICE, (errmsg("using statement-based replication"), + errdetail("The current replication_model setting is " + "'streaming', which is not supported by " + "master_create_distributed_table."), + errhint("Use create_distributed_table to use the streaming " + "replication model."))); + } + + return REPLICATION_MODEL_COORDINATOR; + } + else if (distributionMethod == DISTRIBUTE_BY_NONE) + { + return REPLICATION_MODEL_2PC; + } + else if (distributionMethod == DISTRIBUTE_BY_HASH) + { + return ReplicationModel; + } + else + { + if (ReplicationModel != REPLICATION_MODEL_COORDINATOR) + { + ereport(NOTICE, (errmsg("using statement-based replication"), + errdetail("Streaming replication is supported only for " + "hash-distributed tables."))); + } + + return REPLICATION_MODEL_COORDINATOR; + } + + /* we should not reach to this point */ + return REPLICATION_MODEL_INVALID; +} + + +/* + * CreateHashDistributedTableShards creates shards of given hash distributed table. */ static void -ConvertToDistributedTable(Oid relationId, char *distributionColumnName, - char distributionMethod, char replicationModel, - uint32 colocationId) +CreateHashDistributedTableShards(Oid relationId, Oid colocatedTableId, + bool localTableEmpty) +{ + bool useExclusiveConnection = false; + + /* + * Ensure schema exists on each worker node. We can not run this function + * transactionally, since we may create shards over separate sessions and + * shard creation depends on the schema being present and visible from all + * sessions. + */ + EnsureSchemaExistsOnAllNodes(relationId); + + if (RegularTable(relationId)) + { + useExclusiveConnection = IsTransactionBlock() || !localTableEmpty; + } + + if (colocatedTableId != InvalidOid) + { + CreateColocatedShards(relationId, colocatedTableId, useExclusiveConnection); + } + else + { + /* + * This path is only reached by create_distributed_table for the distributed + * tables which will not be part of an existing colocation group. Therefore, + * we can directly use ShardCount and ShardReplicationFactor global variables + * here. + */ + CreateShardsWithRoundRobinPolicy(relationId, ShardCount, ShardReplicationFactor, + useExclusiveConnection); + } +} + + +/* + * ColocationIdForNewTable returns a colocation id for hash-distributed table + * according to given configuration. If there is no such configuration, it + * creates one and returns colocation id of newly the created colocation group. + * For append and range distributed tables, this function errors out if + * colocateWithTableName parameter is not NULL, otherwise directly returns + * INVALID_COLOCATION_ID. + * + * This function assumes its caller take necessary lock on relationId to + * prevent possible changes on it. 
+ */ +static uint32 +ColocationIdForNewTable(Oid relationId, Var *distributionColumn, + char distributionMethod, char replicationModel, + char *colocateWithTableName, bool viaDeprecatedAPI) +{ + uint32 colocationId = INVALID_COLOCATION_ID; + + if (viaDeprecatedAPI) + { + return colocationId; + } + else if (distributionMethod == DISTRIBUTE_BY_APPEND || + distributionMethod == DISTRIBUTE_BY_RANGE) + { + if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot distribute relation"), + errdetail("Currently, colocate_with option is only supported " + "for hash distributed tables."))); + } + + return colocationId; + } + else if (distributionMethod == DISTRIBUTE_BY_NONE) + { + return CreateReferenceTableColocationId(); + } + else + { + /* + * Get an exclusive lock on the colocation system catalog. Therefore, we + * can be sure that there will no modifications on the colocation table + * until this transaction is committed. + */ + Relation pgDistColocation = heap_open(DistColocationRelationId(), ExclusiveLock); + + Oid distributionColumnType = distributionColumn->vartype; + + if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) == 0) + { + /* check for default colocation group */ + colocationId = ColocationId(ShardCount, ShardReplicationFactor, + distributionColumnType); + + if (colocationId == INVALID_COLOCATION_ID) + { + colocationId = CreateColocationGroup(ShardCount, ShardReplicationFactor, + distributionColumnType); + } + } + else if (pg_strncasecmp(colocateWithTableName, "none", NAMEDATALEN) == 0) + { + colocationId = GetNextColocationId(); + } + else + { + text *colocateWithTableNameText = cstring_to_text(colocateWithTableName); + Oid sourceRelationId = ResolveRelationId(colocateWithTableNameText); + + EnsureTableCanBeColocatedWith(relationId, replicationModel, + distributionColumnType, sourceRelationId); + + colocationId = TableColocationId(sourceRelationId); + } + + heap_close(pgDistColocation, NoLock); + } + + return colocationId; +} + + +/* + * EnsureRelationCanBeDistributed checks whether Citus can safely distribute given + * relation with the given configuration. We perform almost all safety checks for + * distributing table here. If there is an unsatisfied requirement, we error out + * and do not distribute the table. + * + * This function assumes, callers have already acquried necessary locks to ensure + * there will not be any change in the given relation. + */ +static void +EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn, + char distributionMethod, uint32 colocationId, + char replicationModel, bool viaDeprecatedAPI) { Relation relation = NULL; TupleDesc relationDesc = NULL; char *relationName = NULL; - char relationKind = 0; - Var *distributionColumn = NULL; - - /* check global replication settings before continuing */ - EnsureReplicationSettings(InvalidOid, replicationModel); - - /* - * Lock target relation with an exclusive lock - there's no way to make - * sense of this table until we've committed, and we don't want multiple - * backends manipulating this relation. 
- */ - relation = relation_open(relationId, ExclusiveLock); - relationDesc = RelationGetDescr(relation); - relationName = RelationGetRelationName(relation); EnsureTableOwner(relationId); + EnsureTableNotDistributed(relationId); + EnsureLocalTableEmptyIfNecessary(relationId, distributionMethod, viaDeprecatedAPI); + EnsureReplicationSettings(InvalidOid, replicationModel); - /* check that the relation is not already distributed */ - if (IsDistributedTable(relationId)) - { - ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("table \"%s\" is already distributed", - relationName))); - } + /* we assume callers took necessary locks */ + relation = relation_open(relationId, NoLock); + relationDesc = RelationGetDescr(relation); + relationName = RelationGetRelationName(relation); /* verify target relation does not use WITH (OIDS) PostgreSQL feature */ if (relationDesc->tdhasoid) @@ -348,25 +586,7 @@ ConvertToDistributedTable(Oid relationId, char *distributionColumnName, "(OIDS) option in their definitions."))); } - /* verify target relation is either regular or foreign table */ - relationKind = relation->rd_rel->relkind; - if (relationKind != RELKIND_RELATION && relationKind != RELKIND_FOREIGN_TABLE) - { - ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("cannot distribute relation: %s", - relationName), - errdetail("Distributed relations must be regular or " - "foreign tables."))); - } - -#if (PG_VERSION_NUM >= 100000) - if (relation->rd_rel->relispartition) - { - ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("cannot distribute relation: %s", relationName), - errdetail("Distributing partition tables is unsupported."))); - } - + /* verify target relation does not use identity columns */ if (RelationUsesIdentityColumns(relationDesc)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), @@ -374,14 +594,6 @@ ConvertToDistributedTable(Oid relationId, char *distributionColumnName, errdetail("Distributed relations must not use GENERATED " "... AS IDENTITY."))); } -#endif - - /* - * Distribution column returns NULL for reference tables, - * but it is not used below for reference tables. - */ - distributionColumn = BuildDistributionKeyFromColumnName(relation, - distributionColumnName); /* check for support function needed by specified partition method */ if (distributionMethod == DISTRIBUTE_BY_HASH) @@ -417,18 +629,193 @@ ConvertToDistributedTable(Oid relationId, char *distributionColumnName, ErrorIfUnsupportedConstraint(relation, distributionMethod, distributionColumn, colocationId); - InsertIntoPgDistPartition(relationId, distributionMethod, distributionColumn, - colocationId, replicationModel); - relation_close(relation, NoLock); +} - /* - * PostgreSQL supports truncate trigger for regular relations only. - * Truncate on foreign tables is not supported. - */ - if (relationKind == RELKIND_RELATION) + +/* + * EnsureTableCanBeColocatedWith checks whether a given replication model and + * distribution column type is suitable to distribute a table to be colocated + * with given source table. + * + * We only pass relationId to provide meaningful error messages. 
+ */ +static void +EnsureTableCanBeColocatedWith(Oid relationId, char replicationModel, + Oid distributionColumnType, Oid sourceRelationId) +{ + DistTableCacheEntry *sourceTableEntry = DistributedTableCacheEntry(sourceRelationId); + char sourceDistributionMethod = sourceTableEntry->partitionMethod; + char sourceReplicationModel = sourceTableEntry->replicationModel; + Var *sourceDistributionColumn = DistPartitionKey(sourceRelationId); + Oid sourceDistributionColumnType = InvalidOid; + + if (sourceDistributionMethod != DISTRIBUTE_BY_HASH) { - CreateTruncateTrigger(relationId); + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot distribute relation"), + errdetail("Currently, colocate_with option is only supported " + "for hash distributed tables."))); + } + + if (sourceReplicationModel != replicationModel) + { + char *relationName = get_rel_name(relationId); + char *sourceRelationName = get_rel_name(sourceRelationId); + + ereport(ERROR, (errmsg("cannot colocate tables %s and %s", + sourceRelationName, relationName), + errdetail("Replication models don't match for %s and %s.", + sourceRelationName, relationName))); + } + + sourceDistributionColumnType = sourceDistributionColumn->vartype; + if (sourceDistributionColumnType != distributionColumnType) + { + char *relationName = get_rel_name(relationId); + char *sourceRelationName = get_rel_name(sourceRelationId); + + ereport(ERROR, (errmsg("cannot colocate tables %s and %s", + sourceRelationName, relationName), + errdetail("Distribution column types don't match for " + "%s and %s.", sourceRelationName, + relationName))); + } +} + + +/* + * EnsureSchemaExistsOnAllNodes connects to all nodes with citus extension user + * and creates the schema of the given relationId. The function errors out if the + * command cannot be executed in any of the worker nodes. + */ +static void +EnsureSchemaExistsOnAllNodes(Oid relationId) +{ + List *workerNodeList = ActivePrimaryNodeList(); + ListCell *workerNodeCell = NULL; + StringInfo applySchemaCreationDDL = makeStringInfo(); + + Oid schemaId = get_rel_namespace(relationId); + const char *createSchemaDDL = CreateSchemaDDLCommand(schemaId); + uint64 connectionFlag = FORCE_NEW_CONNECTION; + + if (createSchemaDDL == NULL) + { + return; + } + + appendStringInfo(applySchemaCreationDDL, "%s", createSchemaDDL); + + foreach(workerNodeCell, workerNodeList) + { + WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); + char *nodeName = workerNode->workerName; + uint32 nodePort = workerNode->workerPort; + MultiConnection *connection = + GetNodeUserDatabaseConnection(connectionFlag, nodeName, nodePort, NULL, + NULL); + + ExecuteCriticalRemoteCommand(connection, applySchemaCreationDDL->data); + } +} + + +/* + * EnsureLocalTableEmptyIfNecessary only checks for emptiness if only an empty + * relation can be distributed in given configuration. + * + * In some cases, it is possible and safe to send local data to shards while + * distributing the table. In those cases, we can distribute non-empty local + * tables. This function checks the distributionMethod and relation kind to + * see whether we need to be ensure emptiness of local table. If we need to + * be sure, this function calls EnsureLocalTableEmpty function to ensure + * that local table does not contain any data. 
+ */ +static void +EnsureLocalTableEmptyIfNecessary(Oid relationId, char distributionMethod, + bool viaDepracatedAPI) +{ + if (viaDepracatedAPI) + { + EnsureLocalTableEmpty(relationId); + } + else if (distributionMethod != DISTRIBUTE_BY_HASH && + distributionMethod != DISTRIBUTE_BY_NONE) + { + EnsureLocalTableEmpty(relationId); + } + else if (!RegularTable(relationId)) + { + EnsureLocalTableEmpty(relationId); + } +} + + +/* + * EnsureLocalTableEmpty errors out if the local table is not empty. + */ +static void +EnsureLocalTableEmpty(Oid relationId) +{ + char *relationName = get_rel_name(relationId); + bool localTableEmpty = LocalTableEmpty(relationId); + + if (!localTableEmpty) + { + ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("cannot distribute relation \"%s\"", relationName), + errdetail("Relation \"%s\" contains data.", relationName), + errhint("Empty your table before distributing it."))); + } +} + + +/* + * EnsureTableNotDistributed errors out if the table is distributed. + */ +static void +EnsureTableNotDistributed(Oid relationId) +{ + char *relationName = get_rel_name(relationId); + bool isDistributedTable = false; + + isDistributedTable = IsDistributedTable(relationId); + + if (isDistributedTable) + { + ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("table \"%s\" is already distributed", + relationName))); + } +} + + +/* + * EnsureReplicationSettings checks whether the current replication factor + * setting is compatible with the replication model. This function errors + * out if caller tries to use streaming replication with more than one + * replication factor. + */ +void +EnsureReplicationSettings(Oid relationId, char replicationModel) +{ + char *msgSuffix = "the streaming replication model"; + char *extraHint = " or setting \"citus.replication_model\" to \"statement\""; + + if (relationId != InvalidOid) + { + msgSuffix = "tables which use the streaming replication model"; + extraHint = ""; + } + + if (replicationModel == REPLICATION_MODEL_STREAMING && ShardReplicationFactor != 1) + { + ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("replication factors above one are incompatible with %s", + msgSuffix), + errhint("Try again after reducing \"citus.shard_replication_" + "factor\" to one%s.", extraHint))); } } @@ -604,256 +991,24 @@ CreateTruncateTrigger(Oid relationId) /* - * CreateHashDistributedTable creates a hash distributed table. + * RegularTable function returns true if given table's relation kind is RELKIND_RELATION + * (or RELKIND_PARTITIONED_TABLE for PG >= 10), otherwise it returns false. */ -static void -CreateHashDistributedTable(Oid relationId, char *distributionColumnName, - char *colocateWithTableName, int shardCount, - int replicationFactor) +bool +RegularTable(Oid relationId) { - Relation distributedRelation = NULL; - Relation pgDistColocation = NULL; - uint32 colocationId = INVALID_COLOCATION_ID; - Oid sourceRelationId = InvalidOid; - Oid distributionColumnType = InvalidOid; - bool useExclusiveConnection = false; - char relationKind = 0; + char relationKind = get_rel_relkind(relationId); - /* get an access lock on the relation to prevent DROP TABLE and ALTER TABLE */ - distributedRelation = relation_open(relationId, AccessShareLock); - - /* - * Get an exclusive lock on the colocation system catalog. Therefore, we - * can be sure that there will no modifications on the colocation table - * until this transaction is committed. 
- */ - pgDistColocation = heap_open(DistColocationRelationId(), ExclusiveLock); - - /* get distribution column data type */ - distributionColumnType = ColumnType(relationId, distributionColumnName); - - if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) == 0) - { - /* check for default colocation group */ - colocationId = ColocationId(shardCount, replicationFactor, - distributionColumnType); - if (colocationId == INVALID_COLOCATION_ID) - { - colocationId = CreateColocationGroup(shardCount, replicationFactor, - distributionColumnType); - } - else - { - sourceRelationId = ColocatedTableId(colocationId); - } - } - else if (pg_strncasecmp(colocateWithTableName, "none", NAMEDATALEN) == 0) - { - colocationId = GetNextColocationId(); - } - else - { - /* get colocation group of the target table */ - text *colocateWithTableNameText = cstring_to_text(colocateWithTableName); - sourceRelationId = ResolveRelationId(colocateWithTableNameText); - - colocationId = TableColocationId(sourceRelationId); - } - - /* relax empty table requirement for regular (non-foreign) tables */ - relationKind = get_rel_relkind(relationId); +#if (PG_VERSION_NUM >= 100000) + if (relationKind == RELKIND_RELATION || relationKind == RELKIND_PARTITIONED_TABLE) +#else if (relationKind == RELKIND_RELATION) +#endif { - EnsureTableNotDistributed(relationId); - useExclusiveConnection = IsTransactionBlock() || !LocalTableEmpty(relationId); - } - else - { - EnsureTableNotDistributed(relationId); - EnsureLocalTableEmpty(relationId); + return true; } - /* create distributed table metadata */ - ConvertToDistributedTable(relationId, distributionColumnName, DISTRIBUTE_BY_HASH, - ReplicationModel, colocationId); - - /* - * Ensure schema exists on each worker node. We can not run this function - * transactionally, since we may create shards over separate sessions and - * shard creation depends on the schema being present and visible from all - * sessions. - */ - EnsureSchemaExistsOnAllNodes(relationId); - - /* create shards */ - if (sourceRelationId != InvalidOid) - { - /* first run checks */ - CheckReplicationModel(sourceRelationId, relationId); - CheckDistributionColumnType(sourceRelationId, relationId); - - - CreateColocatedShards(relationId, sourceRelationId, useExclusiveConnection); - } - else - { - CreateShardsWithRoundRobinPolicy(relationId, shardCount, replicationFactor, - useExclusiveConnection); - } - - /* copy over data for regular relations */ - if (relationKind == RELKIND_RELATION) - { - CopyLocalDataIntoShards(relationId); - } - - heap_close(pgDistColocation, NoLock); - relation_close(distributedRelation, NoLock); -} - - -/* - * EnsureSchemaExistsOnAllNodes connects to all nodes with citus extension user - * and creates the schema of the given relationId. The function errors out if the - * command cannot be executed in any of the worker nodes. 
- */ -static void -EnsureSchemaExistsOnAllNodes(Oid relationId) -{ - List *workerNodeList = ActivePrimaryNodeList(); - ListCell *workerNodeCell = NULL; - StringInfo applySchemaCreationDDL = makeStringInfo(); - - Oid schemaId = get_rel_namespace(relationId); - const char *createSchemaDDL = CreateSchemaDDLCommand(schemaId); - uint64 connectionFlag = FORCE_NEW_CONNECTION; - - if (createSchemaDDL == NULL) - { - return; - } - - appendStringInfo(applySchemaCreationDDL, "%s", createSchemaDDL); - - foreach(workerNodeCell, workerNodeList) - { - WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); - char *nodeName = workerNode->workerName; - uint32 nodePort = workerNode->workerPort; - MultiConnection *connection = - GetNodeUserDatabaseConnection(connectionFlag, nodeName, nodePort, NULL, - NULL); - - ExecuteCriticalRemoteCommand(connection, applySchemaCreationDDL->data); - } -} - - -/* - * EnsureLocalTableEmpty errors out if the local table is not empty. - */ -static void -EnsureLocalTableEmpty(Oid relationId) -{ - bool localTableEmpty = false; - char *relationName = get_rel_name(relationId); - - localTableEmpty = LocalTableEmpty(relationId); - - if (!localTableEmpty) - { - ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("cannot distribute relation \"%s\"", relationName), - errdetail("Relation \"%s\" contains data.", relationName), - errhint("Empty your table before distributing it."))); - } -} - - -/* - * EnsureIsTableId errors out if the id is not belong to a regular of foreign table. - */ -static void -EnsureIsTableId(Oid relationId) -{ - Relation relation = relation_open(relationId, AccessShareLock); - char *relationName = get_rel_name(relationId); - char relationKind = 0; - - /* verify target relation is either regular or foreign table */ - relationKind = relation->rd_rel->relkind; - if (relationKind != RELKIND_RELATION && relationKind != RELKIND_FOREIGN_TABLE) - { - ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("%s is not a regular or foreign table", - relationName))); - } - - relation_close(relation, NoLock); -} - - -/* - * EnsureTableNotDistributed errors out if the relationId doesn't belong to regular or foreign table - * or the table is distributed. - */ -static void -EnsureTableNotDistributed(Oid relationId) -{ - char *relationName = get_rel_name(relationId); - bool isDistributedTable = false; - - EnsureIsTableId(relationId); - isDistributedTable = IsDistributedTable(relationId); - - if (isDistributedTable) - { - ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("table \"%s\" is already distributed", - relationName))); - } -} - - -/* - * ColumnType returns the column type of the given column. - */ -static Oid -ColumnType(Oid relationId, char *columnName) -{ - AttrNumber columnIndex = get_attnum(relationId, columnName); - Oid columnType = get_atttype(relationId, columnIndex); - - return columnType; -} - - -/* - * Check that the current replication factor setting is compatible with the - * replication model of relationId, if valid. If InvalidOid, check that the - * global replication model setting instead. Errors out if an invalid state - * is detected. 
- */ -void -EnsureReplicationSettings(Oid relationId, char replicationModel) -{ - char *msgSuffix = "the streaming replication model"; - char *extraHint = " or setting \"citus.replication_model\" to \"statement\""; - - if (relationId != InvalidOid) - { - msgSuffix = "tables which use the streaming replication model"; - extraHint = ""; - } - - if (replicationModel == REPLICATION_MODEL_STREAMING && ShardReplicationFactor != 1) - { - ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), - errmsg("replication factors above one are incompatible with %s", - msgSuffix), - errhint("Try again after reducing \"citus.shard_replication_" - "factor\" to one%s.", extraHint))); - } + return false; } @@ -1014,10 +1169,10 @@ TupleDescColumnNameList(TupleDesc tupleDescriptor) * RelationUsesIdentityColumns returns whether a given relation uses the SQL * GENERATED ... AS IDENTITY features supported as of PostgreSQL 10. */ -#if (PG_VERSION_NUM >= 100000) static bool RelationUsesIdentityColumns(TupleDesc relationDesc) { +#if (PG_VERSION_NUM >= 100000) int attributeIndex = 0; for (attributeIndex = 0; attributeIndex < relationDesc->natts; attributeIndex++) @@ -1029,9 +1184,7 @@ RelationUsesIdentityColumns(TupleDesc relationDesc) return true; } } +#endif return false; } - - -#endif diff --git a/src/backend/distributed/master/master_expire_table_cache.c b/src/backend/distributed/master/master_expire_table_cache.c index d983325b7..3ef550891 100644 --- a/src/backend/distributed/master/master_expire_table_cache.c +++ b/src/backend/distributed/master/master_expire_table_cache.c @@ -183,7 +183,7 @@ DropShardsFromWorker(WorkerNode *workerNode, Oid relationId, List *shardInterval } } - if (relationKind == RELKIND_RELATION) + if (RegularTable(relationId)) { appendStringInfo(workerCommand, DROP_REGULAR_TABLE_COMMAND, shardNames->data); } diff --git a/src/backend/distributed/master/master_node_protocol.c b/src/backend/distributed/master/master_node_protocol.c index 699de72db..c2a763c1c 100644 --- a/src/backend/distributed/master/master_node_protocol.c +++ b/src/backend/distributed/master/master_node_protocol.c @@ -721,7 +721,7 @@ ShardStorageType(Oid relationId) char shardStorageType = 0; char relationType = get_rel_relkind(relationId); - if (relationType == RELKIND_RELATION) + if (RegularTable(relationId)) { shardStorageType = SHARD_STORAGE_TABLE; } diff --git a/src/backend/distributed/master/master_repair_shards.c b/src/backend/distributed/master/master_repair_shards.c index 1774895f8..1de50bdab 100644 --- a/src/backend/distributed/master/master_repair_shards.c +++ b/src/backend/distributed/master/master_repair_shards.c @@ -389,7 +389,7 @@ RecreateTableDDLCommandList(Oid relationId) bool includeSequenceDefaults = false; /* build appropriate DROP command based on relation kind */ - if (relationKind == RELKIND_RELATION) + if (RegularTable(relationId)) { appendStringInfo(dropCommand, DROP_REGULAR_TABLE_COMMAND, qualifiedRelationName); diff --git a/src/backend/distributed/utils/citus_ruleutils.c b/src/backend/distributed/utils/citus_ruleutils.c index a543669e6..19d340187 100644 --- a/src/backend/distributed/utils/citus_ruleutils.c +++ b/src/backend/distributed/utils/citus_ruleutils.c @@ -36,6 +36,7 @@ #include "distributed/citus_ruleutils.h" #include "distributed/multi_partitioning_utils.h" #include "distributed/relay_utility.h" +#include "distributed/master_metadata_utility.h" #include "foreign/foreign.h" #include "lib/stringinfo.h" #include "nodes/nodes.h" @@ -59,7 +60,6 @@ static void AppendOptionListToString(StringInfo 
stringData, List *options); -static bool SupportedRelationKind(Relation relation); static const char * convert_aclright_to_string(int aclright); @@ -291,7 +291,6 @@ pg_get_tableschemadef_string(Oid tableRelationId, bool includeSequenceDefaults) AttrNumber constraintIndex = 0; AttrNumber constraintCount = 0; StringInfoData buffer = { NULL, 0, 0, 0 }; - bool supportedRelationKind = false; /* * Instead of retrieving values from system catalogs as other functions in @@ -304,13 +303,7 @@ pg_get_tableschemadef_string(Oid tableRelationId, bool includeSequenceDefaults) relation = relation_open(tableRelationId, AccessShareLock); relationName = generate_relation_name(tableRelationId, NIL); - supportedRelationKind = SupportedRelationKind(relation); - if (!supportedRelationKind) - { - ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("%s is not a regular, foreign, or partitioned table", - relationName))); - } + EnsureRelationKindSupported(tableRelationId); initStringInfo(&buffer); @@ -491,25 +484,33 @@ pg_get_tableschemadef_string(Oid tableRelationId, bool includeSequenceDefaults) /* - * SupportedRelationKind returns true if the given relation is supported as a - * distributed relation. + * EnsureRelationKindSupported errors out if the given relation is not supported + * as a distributed relation. */ -static bool -SupportedRelationKind(Relation relation) +void +EnsureRelationKindSupported(Oid relationId) { - char relationKind = relation->rd_rel->relkind; - bool supportedRelationKind = (relationKind == RELKIND_RELATION || relationKind == - RELKIND_FOREIGN_TABLE); -#if (PG_VERSION_NUM >= 100000) - supportedRelationKind = supportedRelationKind || relationKind == - RELKIND_PARTITIONED_TABLE; -#endif + char relationKind = get_rel_relkind(relationId); + bool supportedRelationKind = false; - /* Citus doesn't support bare inhereted tables (i.e., not a partition or partitioned table) */ - supportedRelationKind = supportedRelationKind && !(IsChildTable(relation->rd_id) || - IsParentTable(relation->rd_id)); + supportedRelationKind = RegularTable(relationId) || + relationKind == RELKIND_FOREIGN_TABLE; - return supportedRelationKind; + /* + * Citus doesn't support bare inherited tables (i.e., not a partition or + * partitioned table) + */ + supportedRelationKind = supportedRelationKind && !(IsChildTable(relationId) || + IsParentTable(relationId)); + + if (!supportedRelationKind) + { + char *relationName = get_rel_name(relationId); + + ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), + errmsg("%s is not a regular, foreign or partitioned table", + relationName))); + } } @@ -523,15 +524,12 @@ char * pg_get_tablecolumnoptionsdef_string(Oid tableRelationId) { Relation relation = NULL; - char *relationName = NULL; - char relationKind = 0; TupleDesc tupleDescriptor = NULL; AttrNumber attributeIndex = 0; List *columnOptionList = NIL; ListCell *columnOptionCell = NULL; bool firstOptionPrinted = false; StringInfoData buffer = { NULL, 0, 0, 0 }; - bool supportedRelationKind = false; /* * Instead of retrieving values from system catalogs, we open the relation, @@ -539,22 +537,8 @@ pg_get_tablecolumnoptionsdef_string(Oid tableRelationId) * This is primarily to maintain symmetry with pg_get_tableschemadef. 
*/ relation = relation_open(tableRelationId, AccessShareLock); - relationName = generate_relation_name(tableRelationId, NIL); - relationKind = relation->rd_rel->relkind; - supportedRelationKind = (relationKind == RELKIND_RELATION || relationKind == - RELKIND_FOREIGN_TABLE); -#if (PG_VERSION_NUM >= 100000) - supportedRelationKind = supportedRelationKind || relationKind == - RELKIND_PARTITIONED_TABLE; -#endif - - if (!supportedRelationKind) - { - ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), - errmsg("%s is not a regular or foreign table or partitioned", - relationName))); - } + EnsureRelationKindSupported(tableRelationId); /* * Iterate over the table's columns. If a particular column is not dropped diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c index 53e59b8fc..e12ce4313 100644 --- a/src/backend/distributed/utils/colocation_utils.c +++ b/src/backend/distributed/utils/colocation_utils.c @@ -904,6 +904,15 @@ ColocatedTableId(Oid colocationId) ScanKeyData scanKey[1]; int scanKeyCount = 1; + /* + * We may have a distributed table whose colocation id is INVALID_COLOCATION_ID. + * In this case, we do not want to send that table's id as colocated table id. + */ + if (colocationId == INVALID_COLOCATION_ID) + { + return colocatedTableId; + } + ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_colocationid, BTEqualStrategyNumber, F_INT4EQ, ObjectIdGetDatum(colocationId)); diff --git a/src/include/distributed/citus_ruleutils.h b/src/include/distributed/citus_ruleutils.h index 213dec2fa..0e14c3ed7 100644 --- a/src/include/distributed/citus_ruleutils.h +++ b/src/include/distributed/citus_ruleutils.h @@ -34,6 +34,7 @@ extern char * pg_get_serverdef_string(Oid tableRelationId); extern char * pg_get_sequencedef_string(Oid sequenceRelid); extern Form_pg_sequence pg_get_sequencedef(Oid sequenceRelationId); extern char * pg_get_tableschemadef_string(Oid tableRelationId, bool forShardCreation); +extern void EnsureRelationKindSupported(Oid relationId); extern char * pg_get_tablecolumnoptionsdef_string(Oid tableRelationId); extern void deparse_shard_index_statement(IndexStmt *origStmt, Oid distrelid, int64 shardid, StringInfo buffer); diff --git a/src/include/distributed/master_metadata_utility.h b/src/include/distributed/master_metadata_utility.h index c305cc0ba..8f0eba6c3 100644 --- a/src/include/distributed/master_metadata_utility.h +++ b/src/include/distributed/master_metadata_utility.h @@ -139,6 +139,9 @@ extern void UpdateShardPlacementState(uint64 placementId, char shardState); extern void DeleteShardPlacementRow(uint64 placementId); extern void UpdateColocationGroupReplicationFactor(uint32 colocationId, int replicationFactor); +extern void CreateDistributedTable(Oid relationId, Var *distributionColumn, + char distributionMethod, char *colocateWithTableName, + bool viaDeprecatedAPI); extern void CreateTruncateTrigger(Oid relationId); /* Remaining metadata utility functions */ @@ -147,6 +150,7 @@ extern void EnsureTablePermissions(Oid relationId, AclMode mode); extern void EnsureTableOwner(Oid relationId); extern void EnsureSuperUser(void); extern void EnsureReplicationSettings(Oid relationId, char replicationModel); +extern bool RegularTable(Oid relationId); extern bool TableReferenced(Oid relationId); extern char * ConstructQualifiedShardName(ShardInterval *shardInterval); extern Datum StringToDatum(char *inputString, Oid dataType); diff --git a/src/test/regress/expected/multi_create_shards.out 
b/src/test/regress/expected/multi_create_shards.out index 558a7584e..a3f221e70 100644 --- a/src/test/regress/expected/multi_create_shards.out +++ b/src/test/regress/expected/multi_create_shards.out @@ -48,7 +48,7 @@ DETAIL: Distributed relations must not specify the WITH (OIDS) option in their ALTER TABLE table_to_distribute SET WITHOUT OIDS; -- use an index instead of table name SELECT master_create_distributed_table('table_to_distribute_pkey', 'id', 'hash'); -ERROR: table_to_distribute_pkey is not a regular or foreign table +ERROR: table_to_distribute_pkey is not a regular, foreign or partitioned table -- use a bad column name SELECT master_create_distributed_table('table_to_distribute', 'bad_column', 'hash'); ERROR: column "bad_column" of relation "table_to_distribute" does not exist diff --git a/src/test/regress/expected/multi_generate_ddl_commands.out b/src/test/regress/expected/multi_generate_ddl_commands.out index d993abf29..829e8b8f7 100644 --- a/src/test/regress/expected/multi_generate_ddl_commands.out +++ b/src/test/regress/expected/multi_generate_ddl_commands.out @@ -146,7 +146,7 @@ NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined -- propagating views is not supported CREATE VIEW local_view AS SELECT * FROM simple_table; SELECT table_ddl_command_array('local_view'); -ERROR: public.local_view is not a regular, foreign, or partitioned table +ERROR: local_view is not a regular, foreign or partitioned table -- clean up DROP VIEW IF EXISTS local_view; DROP FOREIGN TABLE IF EXISTS foreign_table; diff --git a/src/test/regress/expected/multi_partitioning_utils.out b/src/test/regress/expected/multi_partitioning_utils.out index 39a36c805..00f849b71 100644 --- a/src/test/regress/expected/multi_partitioning_utils.out +++ b/src/test/regress/expected/multi_partitioning_utils.out @@ -415,8 +415,8 @@ SELECT table_inherited('date_partitioned_table'); -- also these are not supported SELECT master_get_table_ddl_events('capitals'); -ERROR: public.capitals is not a regular, foreign, or partitioned table +ERROR: capitals is not a regular, foreign or partitioned table SELECT master_get_table_ddl_events('cities'); -ERROR: public.cities is not a regular, foreign, or partitioned table +ERROR: cities is not a regular, foreign or partitioned table -- dropping parents frop the partitions DROP TABLE date_partitioned_table, multi_column_partitioned, list_partitioned, partition_parent_schema.parent_table, cities, capitals; diff --git a/src/test/regress/expected/multi_partitioning_utils_0.out b/src/test/regress/expected/multi_partitioning_utils_0.out index a15d197a5..8f4906f48 100644 --- a/src/test/regress/expected/multi_partitioning_utils_0.out +++ b/src/test/regress/expected/multi_partitioning_utils_0.out @@ -355,9 +355,9 @@ LINE 1: SELECT table_inherited('date_partitioned_table'); ^ -- also these are not supported SELECT master_get_table_ddl_events('capitals'); -ERROR: public.capitals is not a regular, foreign, or partitioned table +ERROR: capitals is not a regular, foreign or partitioned table SELECT master_get_table_ddl_events('cities'); -ERROR: public.cities is not a regular, foreign, or partitioned table +ERROR: cities is not a regular, foreign or partitioned table -- dropping parents frop the partitions DROP TABLE date_partitioned_table, multi_column_partitioned, list_partitioned, partition_parent_schema.parent_table, cities, capitals; ERROR: table "date_partitioned_table" does not exist From 37b200a52ec952933494aac379f183e7422c45c6 Mon Sep 17 00:00:00 2001 From: Burak Yucesoy 
Date: Mon, 31 Jul 2017 00:52:11 +0300 Subject: [PATCH 2/2] Fix broken isolation tests We try to run our isolation tests in parallel as much as possible. Some of those isolation tests used the same table name, which causes problems when running them in parallel. This commit changes the table names in those tests to ensure the tests can run in parallel. --- .../expected/isolation_concurrent_dml.out | 8 +- ...olation_copy_placement_vs_modification.out | 78 +++++++++---------- .../expected/isolation_dml_vs_repair.out | 60 +++++++------- src/test/regress/isolation_schedule | 3 +- .../specs/isolation_concurrent_dml.spec | 12 +-- ...lation_copy_placement_vs_modification.spec | 28 +++---- .../specs/isolation_dml_vs_repair.spec | 30 +++---- 7 files changed, 110 insertions(+), 109 deletions(-) diff --git a/src/test/regress/expected/isolation_concurrent_dml.out b/src/test/regress/expected/isolation_concurrent_dml.out index a5e6a1737..f7f8fc9ee 100644 --- a/src/test/regress/expected/isolation_concurrent_dml.out +++ b/src/test/regress/expected/isolation_concurrent_dml.out @@ -8,10 +8,10 @@ step s1-begin: BEGIN; step s1-insert: - INSERT INTO test_table VALUES(1); + INSERT INTO test_concurrent_dml VALUES(1); step s2-update: - UPDATE test_table SET data = 'blarg' WHERE test_id = 1; + UPDATE test_concurrent_dml SET data = 'blarg' WHERE test_id = 1; step s1-commit: COMMIT; @@ -23,8 +23,8 @@ master_create_worker_shards step s1-insert: - INSERT INTO test_table VALUES(1); + INSERT INTO test_concurrent_dml VALUES(1); step s2-update: - UPDATE test_table SET data = 'blarg' WHERE test_id = 1; + UPDATE test_concurrent_dml SET data = 'blarg' WHERE test_id = 1; diff --git a/src/test/regress/expected/isolation_copy_placement_vs_modification.out b/src/test/regress/expected/isolation_copy_placement_vs_modification.out index 6a6fbfd83..523f87f80 100644 --- a/src/test/regress/expected/isolation_copy_placement_vs_modification.out +++ b/src/test/regress/expected/isolation_copy_placement_vs_modification.out @@ -2,16 +2,16 @@ Parsed test spec with 2 sessions starting permutation: s1-load-cache s1-insert s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-update s2-commit s1-commit s2-print-content step s1-load-cache: - TRUNCATE test_table; + TRUNCATE test_copy_placement_vs_modification; step s1-insert: - INSERT INTO test_table VALUES (5, 10); + INSERT INTO test_copy_placement_vs_modification VALUES (5, 10); step s1-begin: BEGIN; step s1-select: - SELECT count(*) FROM test_table WHERE x = 5; + SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; count @@ -29,7 +29,7 @@ master_copy_shard_placement step s1-update: - UPDATE test_table SET y = 5 WHERE x = 5; + UPDATE test_copy_placement_vs_modification SET y = 5 WHERE x = 5; step s2-commit: COMMIT; @@ -42,7 +42,7 @@ step s2-print-content: SELECT nodeport, success, result FROM - run_command_on_placements('test_table', 'select y from %s WHERE x = 5') + run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5') WHERE shardid IN (SELECT * FROM selected_shard) ORDER BY nodeport; nodeport success result starting permutation: s1-load-cache s1-insert s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-delete s2-commit s1-commit s2-print-content step s1-load-cache: - TRUNCATE test_table; + TRUNCATE test_copy_placement_vs_modification; step s1-insert: - INSERT INTO test_table VALUES (5, 10); + INSERT INTO test_copy_placement_vs_modification VALUES (5, 10); step s1-begin: BEGIN; step
s1-select: - SELECT count(*) FROM test_table WHERE x = 5; + SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; count @@ -82,7 +82,7 @@ master_copy_shard_placement step s1-delete: - DELETE FROM test_table WHERE x = 5; + DELETE FROM test_copy_placement_vs_modification WHERE x = 5; step s2-commit: COMMIT; @@ -95,7 +95,7 @@ step s2-print-content: SELECT nodeport, success, result FROM - run_command_on_placements('test_table', 'select y from %s WHERE x = 5') + run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5') WHERE shardid IN (SELECT * FROM selected_shard) ORDER BY @@ -108,13 +108,13 @@ nodeport success result starting permutation: s1-load-cache s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-insert s2-commit s1-commit s2-print-content step s1-load-cache: - TRUNCATE test_table; + TRUNCATE test_copy_placement_vs_modification; step s1-begin: BEGIN; step s1-select: - SELECT count(*) FROM test_table WHERE x = 5; + SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; count @@ -132,7 +132,7 @@ master_copy_shard_placement step s1-insert: - INSERT INTO test_table VALUES (5, 10); + INSERT INTO test_copy_placement_vs_modification VALUES (5, 10); step s2-commit: COMMIT; @@ -145,7 +145,7 @@ step s2-print-content: SELECT nodeport, success, result FROM - run_command_on_placements('test_table', 'select y from %s WHERE x = 5') + run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5') WHERE shardid IN (SELECT * FROM selected_shard) ORDER BY @@ -158,13 +158,13 @@ nodeport success result starting permutation: s1-load-cache s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-copy s2-commit s1-commit s2-print-content step s1-load-cache: - TRUNCATE test_table; + TRUNCATE test_copy_placement_vs_modification; step s1-begin: BEGIN; step s1-select: - SELECT count(*) FROM test_table WHERE x = 5; + SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; count @@ -182,7 +182,7 @@ master_copy_shard_placement step s1-copy: - COPY test_table FROM PROGRAM 'echo "1,1\n2,2\n3,3\n4,4\n5,5"' WITH CSV; + COPY test_copy_placement_vs_modification FROM PROGRAM 'echo "1,1\n2,2\n3,3\n4,4\n5,5"' WITH CSV; step s2-commit: COMMIT; @@ -195,7 +195,7 @@ step s2-print-content: SELECT nodeport, success, result FROM - run_command_on_placements('test_table', 'select y from %s WHERE x = 5') + run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5') WHERE shardid IN (SELECT * FROM selected_shard) ORDER BY @@ -208,13 +208,13 @@ nodeport success result starting permutation: s1-load-cache s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-ddl s2-commit s1-commit s2-print-index-count step s1-load-cache: - TRUNCATE test_table; + TRUNCATE test_copy_placement_vs_modification; step s1-begin: BEGIN; step s1-select: - SELECT count(*) FROM test_table WHERE x = 5; + SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; count @@ -232,7 +232,7 @@ master_copy_shard_placement step s1-ddl: - CREATE INDEX test_table_index ON test_table(x); + CREATE INDEX test_copy_placement_vs_modification_index ON test_copy_placement_vs_modification(x); step s2-commit: COMMIT; @@ -245,7 +245,7 @@ step s2-print-index-count: SELECT nodeport, success, result FROM - run_command_on_placements('test_table', 'select count(*) from pg_indexes WHERE schemaname || ''.'' || tablename = ''%s''') + 
run_command_on_placements('test_copy_placement_vs_modification', 'select count(*) from pg_indexes WHERE schemaname || ''.'' || tablename = ''%s''') ORDER BY nodeport; @@ -258,13 +258,13 @@ nodeport success result starting permutation: s1-insert s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-update s2-commit s1-commit s2-print-content step s1-insert: - INSERT INTO test_table VALUES (5, 10); + INSERT INTO test_copy_placement_vs_modification VALUES (5, 10); step s1-begin: BEGIN; step s1-select: - SELECT count(*) FROM test_table WHERE x = 5; + SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; count @@ -282,7 +282,7 @@ master_copy_shard_placement step s1-update: - UPDATE test_table SET y = 5 WHERE x = 5; + UPDATE test_copy_placement_vs_modification SET y = 5 WHERE x = 5; step s2-commit: COMMIT; @@ -295,7 +295,7 @@ step s2-print-content: SELECT nodeport, success, result FROM - run_command_on_placements('test_table', 'select y from %s WHERE x = 5') + run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5') WHERE shardid IN (SELECT * FROM selected_shard) ORDER BY @@ -308,13 +308,13 @@ nodeport success result starting permutation: s1-insert s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-delete s2-commit s1-commit s2-print-content step s1-insert: - INSERT INTO test_table VALUES (5, 10); + INSERT INTO test_copy_placement_vs_modification VALUES (5, 10); step s1-begin: BEGIN; step s1-select: - SELECT count(*) FROM test_table WHERE x = 5; + SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; count @@ -332,7 +332,7 @@ master_copy_shard_placement step s1-delete: - DELETE FROM test_table WHERE x = 5; + DELETE FROM test_copy_placement_vs_modification WHERE x = 5; step s2-commit: COMMIT; @@ -345,7 +345,7 @@ step s2-print-content: SELECT nodeport, success, result FROM - run_command_on_placements('test_table', 'select y from %s WHERE x = 5') + run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5') WHERE shardid IN (SELECT * FROM selected_shard) ORDER BY @@ -361,7 +361,7 @@ step s1-begin: BEGIN; step s1-select: - SELECT count(*) FROM test_table WHERE x = 5; + SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; count @@ -379,7 +379,7 @@ master_copy_shard_placement step s1-insert: - INSERT INTO test_table VALUES (5, 10); + INSERT INTO test_copy_placement_vs_modification VALUES (5, 10); step s2-commit: COMMIT; @@ -392,7 +392,7 @@ step s2-print-content: SELECT nodeport, success, result FROM - run_command_on_placements('test_table', 'select y from %s WHERE x = 5') + run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5') WHERE shardid IN (SELECT * FROM selected_shard) ORDER BY @@ -408,7 +408,7 @@ step s1-begin: BEGIN; step s1-select: - SELECT count(*) FROM test_table WHERE x = 5; + SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; count @@ -426,7 +426,7 @@ master_copy_shard_placement step s1-copy: - COPY test_table FROM PROGRAM 'echo "1,1\n2,2\n3,3\n4,4\n5,5"' WITH CSV; + COPY test_copy_placement_vs_modification FROM PROGRAM 'echo "1,1\n2,2\n3,3\n4,4\n5,5"' WITH CSV; step s2-commit: COMMIT; @@ -439,7 +439,7 @@ step s2-print-content: SELECT nodeport, success, result FROM - run_command_on_placements('test_table', 'select y from %s WHERE x = 5') + run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5') WHERE shardid IN 
(SELECT * FROM selected_shard) ORDER BY @@ -455,7 +455,7 @@ step s1-begin: BEGIN; step s1-select: - SELECT count(*) FROM test_table WHERE x = 5; + SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; count @@ -473,7 +473,7 @@ master_copy_shard_placement step s1-ddl: - CREATE INDEX test_table_index ON test_table(x); + CREATE INDEX test_copy_placement_vs_modification_index ON test_copy_placement_vs_modification(x); step s2-commit: COMMIT; @@ -486,7 +486,7 @@ step s2-print-index-count: SELECT nodeport, success, result FROM - run_command_on_placements('test_table', 'select count(*) from pg_indexes WHERE schemaname || ''.'' || tablename = ''%s''') + run_command_on_placements('test_copy_placement_vs_modification', 'select count(*) from pg_indexes WHERE schemaname || ''.'' || tablename = ''%s''') ORDER BY nodeport; diff --git a/src/test/regress/expected/isolation_dml_vs_repair.out b/src/test/regress/expected/isolation_dml_vs_repair.out index 543929531..19c01a48d 100644 --- a/src/test/regress/expected/isolation_dml_vs_repair.out +++ b/src/test/regress/expected/isolation_dml_vs_repair.out @@ -5,16 +5,16 @@ master_create_worker_shards step s2-invalidate-57637: - UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57637; + UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57637; step s1-begin: BEGIN; step s1-insertone: - INSERT INTO test_table VALUES(1, 1); + INSERT INTO test_dml_vs_repair VALUES(1, 1); step s2-repair: - SELECT master_copy_shard_placement((SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass), 'localhost', 57638, 'localhost', 57637); + SELECT master_copy_shard_placement((SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass), 'localhost', 57638, 'localhost', 57637); step s1-commit: COMMIT; @@ -29,19 +29,19 @@ master_create_worker_shards step s1-insertone: - INSERT INTO test_table VALUES(1, 1); + INSERT INTO test_dml_vs_repair VALUES(1, 1); step s2-invalidate-57637: - UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57637; + UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57637; step s1-begin: BEGIN; step s1-insertall: - INSERT INTO test_table SELECT test_id, data+1 FROM test_table; + INSERT INTO test_dml_vs_repair SELECT test_id, data+1 FROM test_dml_vs_repair; step s2-repair: - SELECT master_copy_shard_placement((SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass), 'localhost', 57638, 'localhost', 57637); + SELECT master_copy_shard_placement((SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass), 'localhost', 57638, 'localhost', 57637); step s1-commit: COMMIT; @@ -56,41 +56,41 @@ master_create_worker_shards step s2-invalidate-57637: - UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57637; + UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57637; step 
s2-begin: BEGIN; step s2-repair: - SELECT master_copy_shard_placement((SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass), 'localhost', 57638, 'localhost', 57637); + SELECT master_copy_shard_placement((SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass), 'localhost', 57638, 'localhost', 57637); master_copy_shard_placement step s1-insertone: - INSERT INTO test_table VALUES(1, 1); + INSERT INTO test_dml_vs_repair VALUES(1, 1); step s2-commit: COMMIT; step s1-insertone: <... completed> step s2-invalidate-57638: - UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57638; + UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57638; step s1-display: - SELECT * FROM test_table WHERE test_id = 1; + SELECT * FROM test_dml_vs_repair WHERE test_id = 1; test_id data 1 1 step s2-invalidate-57637: - UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57637; + UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57637; step s2-revalidate-57638: - UPDATE pg_dist_shard_placement SET shardstate = '1' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57638; + UPDATE pg_dist_shard_placement SET shardstate = '1' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57638; step s1-display: - SELECT * FROM test_table WHERE test_id = 1; + SELECT * FROM test_dml_vs_repair WHERE test_id = 1; test_id data @@ -101,7 +101,7 @@ master_create_worker_shards step s2-invalidate-57637: - UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57637; + UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57637; step s1-prepared-insertone: EXECUTE insertone; @@ -110,7 +110,7 @@ step s2-begin: BEGIN; step s2-repair: - SELECT master_copy_shard_placement((SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass), 'localhost', 57638, 'localhost', 57637); + SELECT master_copy_shard_placement((SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass), 'localhost', 57638, 'localhost', 57637); master_copy_shard_placement @@ -124,22 +124,22 @@ step s2-commit: step s1-prepared-insertone: <... 
completed> error in steps s2-commit s1-prepared-insertone: ERROR: prepared modifications cannot be executed on a shard while it is being copied step s2-invalidate-57638: - UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57638; + UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57638; step s1-display: - SELECT * FROM test_table WHERE test_id = 1; + SELECT * FROM test_dml_vs_repair WHERE test_id = 1; test_id data 1 1 step s2-invalidate-57637: - UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57637; + UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57637; step s2-revalidate-57638: - UPDATE pg_dist_shard_placement SET shardstate = '1' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57638; + UPDATE pg_dist_shard_placement SET shardstate = '1' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57638; step s1-display: - SELECT * FROM test_table WHERE test_id = 1; + SELECT * FROM test_dml_vs_repair WHERE test_id = 1; test_id data @@ -150,10 +150,10 @@ master_create_worker_shards step s2-invalidate-57637: - UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57637; + UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57637; step s1-insertone: - INSERT INTO test_table VALUES(1, 1); + INSERT INTO test_dml_vs_repair VALUES(1, 1); step s1-prepared-insertall: EXECUTE insertall; @@ -162,7 +162,7 @@ step s2-begin: BEGIN; step s2-repair: - SELECT master_copy_shard_placement((SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass), 'localhost', 57638, 'localhost', 57637); + SELECT master_copy_shard_placement((SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass), 'localhost', 57638, 'localhost', 57637); master_copy_shard_placement @@ -176,23 +176,23 @@ step s2-commit: step s1-prepared-insertall: <... 
completed> error in steps s2-commit s1-prepared-insertall: ERROR: prepared modifications cannot be executed on a shard while it is being copied step s2-invalidate-57638: - UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57638; + UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57638; step s1-display: - SELECT * FROM test_table WHERE test_id = 1; + SELECT * FROM test_dml_vs_repair WHERE test_id = 1; test_id data 1 1 1 2 step s2-invalidate-57637: - UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57637; + UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57637; step s2-revalidate-57638: - UPDATE pg_dist_shard_placement SET shardstate = '1' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57638; + UPDATE pg_dist_shard_placement SET shardstate = '1' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57638; step s1-display: - SELECT * FROM test_table WHERE test_id = 1; + SELECT * FROM test_dml_vs_repair WHERE test_id = 1; test_id data diff --git a/src/test/regress/isolation_schedule b/src/test/regress/isolation_schedule index e373f8544..f7a665fd7 100644 --- a/src/test/regress/isolation_schedule +++ b/src/test/regress/isolation_schedule @@ -5,7 +5,8 @@ test: isolation_add_node_vs_reference_table_operations # that come later can be parallelized test: isolation_cluster_management -test: isolation_dml_vs_repair isolation_copy_placement_vs_copy_placement isolation_cancellation +test: isolation_dml_vs_repair +test: isolation_copy_placement_vs_copy_placement isolation_cancellation test: isolation_concurrent_dml isolation_data_migration test: isolation_drop_shards isolation_copy_placement_vs_modification test: isolation_insert_vs_vacuum isolation_transaction_recovery diff --git a/src/test/regress/specs/isolation_concurrent_dml.spec b/src/test/regress/specs/isolation_concurrent_dml.spec index 3288d7521..74c86583f 100644 --- a/src/test/regress/specs/isolation_concurrent_dml.spec +++ b/src/test/regress/specs/isolation_concurrent_dml.spec @@ -1,13 +1,13 @@ setup { - CREATE TABLE test_table (test_id integer NOT NULL, data text); - SELECT master_create_distributed_table('test_table', 'test_id', 'hash'); - SELECT master_create_worker_shards('test_table', 4, 2); + CREATE TABLE test_concurrent_dml (test_id integer NOT NULL, data text); + SELECT master_create_distributed_table('test_concurrent_dml', 'test_id', 'hash'); + SELECT master_create_worker_shards('test_concurrent_dml', 4, 2); } teardown { - DROP TABLE IF EXISTS test_table CASCADE; + DROP TABLE IF EXISTS test_concurrent_dml CASCADE; } session "s1" @@ -19,7 +19,7 @@ step "s1-begin" step "s1-insert" { - INSERT INTO test_table VALUES(1); + INSERT INTO test_concurrent_dml VALUES(1); } step "s1-commit" @@ -31,7 +31,7 @@ session "s2" step "s2-update" { - UPDATE test_table SET data = 'blarg' WHERE test_id = 1; + UPDATE test_concurrent_dml SET data = 'blarg' WHERE test_id = 1; } # verify that an in-progress insert blocks concurrent updates diff --git 
a/src/test/regress/specs/isolation_copy_placement_vs_modification.spec b/src/test/regress/specs/isolation_copy_placement_vs_modification.spec index 201d4eca8..8ab9fe84b 100644 --- a/src/test/regress/specs/isolation_copy_placement_vs_modification.spec +++ b/src/test/regress/specs/isolation_copy_placement_vs_modification.spec @@ -4,15 +4,15 @@ setup { SET citus.shard_count TO 2; SET citus.shard_replication_factor TO 2; - CREATE TABLE test_table (x int, y int); - SELECT create_distributed_table('test_table', 'x'); + CREATE TABLE test_copy_placement_vs_modification (x int, y int); + SELECT create_distributed_table('test_copy_placement_vs_modification', 'x'); - SELECT get_shard_id_for_distribution_column('test_table', 5) INTO selected_shard; + SELECT get_shard_id_for_distribution_column('test_copy_placement_vs_modification', 5) INTO selected_shard; } teardown { - DROP TABLE test_table; + DROP TABLE test_copy_placement_vs_modification; DROP TABLE selected_shard; } @@ -23,41 +23,41 @@ step "s1-begin" BEGIN; } -# since test_table has rep > 1 simple select query doesn't hit all placements +# since test_copy_placement_vs_modification has rep > 1 simple select query doesn't hit all placements # hence not all placements are cached step "s1-load-cache" { - TRUNCATE test_table; + TRUNCATE test_copy_placement_vs_modification; } step "s1-insert" { - INSERT INTO test_table VALUES (5, 10); + INSERT INTO test_copy_placement_vs_modification VALUES (5, 10); } step "s1-update" { - UPDATE test_table SET y = 5 WHERE x = 5; + UPDATE test_copy_placement_vs_modification SET y = 5 WHERE x = 5; } step "s1-delete" { - DELETE FROM test_table WHERE x = 5; + DELETE FROM test_copy_placement_vs_modification WHERE x = 5; } step "s1-select" { - SELECT count(*) FROM test_table WHERE x = 5; + SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5; } step "s1-ddl" { - CREATE INDEX test_table_index ON test_table(x); + CREATE INDEX test_copy_placement_vs_modification_index ON test_copy_placement_vs_modification(x); } step "s1-copy" { - COPY test_table FROM PROGRAM 'echo "1,1\n2,2\n3,3\n4,4\n5,5"' WITH CSV; + COPY test_copy_placement_vs_modification FROM PROGRAM 'echo "1,1\n2,2\n3,3\n4,4\n5,5"' WITH CSV; } step "s1-commit" @@ -92,7 +92,7 @@ step "s2-print-content" SELECT nodeport, success, result FROM - run_command_on_placements('test_table', 'select y from %s WHERE x = 5') + run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5') WHERE shardid IN (SELECT * FROM selected_shard) ORDER BY @@ -104,7 +104,7 @@ step "s2-print-index-count" SELECT nodeport, success, result FROM - run_command_on_placements('test_table', 'select count(*) from pg_indexes WHERE schemaname || ''.'' || tablename = ''%s''') + run_command_on_placements('test_copy_placement_vs_modification', 'select count(*) from pg_indexes WHERE schemaname || ''.'' || tablename = ''%s''') ORDER BY nodeport; } diff --git a/src/test/regress/specs/isolation_dml_vs_repair.spec b/src/test/regress/specs/isolation_dml_vs_repair.spec index 6da20e7d9..7a89b41df 100644 --- a/src/test/regress/specs/isolation_dml_vs_repair.spec +++ b/src/test/regress/specs/isolation_dml_vs_repair.spec @@ -1,13 +1,13 @@ setup { - CREATE TABLE test_table (test_id integer NOT NULL, data int); - SELECT master_create_distributed_table('test_table', 'test_id', 'hash'); - SELECT master_create_worker_shards('test_table', 1, 2); + CREATE TABLE test_dml_vs_repair (test_id integer NOT NULL, data int); + SELECT master_create_distributed_table('test_dml_vs_repair', 
'test_id', 'hash'); + SELECT master_create_worker_shards('test_dml_vs_repair', 1, 2); } teardown { - DROP TABLE IF EXISTS test_table CASCADE; + DROP TABLE IF EXISTS test_dml_vs_repair CASCADE; } session "s1" @@ -15,9 +15,9 @@ session "s1" setup { DEALLOCATE all; - TRUNCATE test_table; - PREPARE insertone AS INSERT INTO test_table VALUES(1, 1); - PREPARE insertall AS INSERT INTO test_table SELECT test_id, data+1 FROM test_table; + TRUNCATE test_dml_vs_repair; + PREPARE insertone AS INSERT INTO test_dml_vs_repair VALUES(1, 1); + PREPARE insertall AS INSERT INTO test_dml_vs_repair SELECT test_id, data+1 FROM test_dml_vs_repair; } step "s1-begin" @@ -27,7 +27,7 @@ step "s1-begin" step "s1-insertone" { - INSERT INTO test_table VALUES(1, 1); + INSERT INTO test_dml_vs_repair VALUES(1, 1); } step "s1-prepared-insertone" @@ -37,7 +37,7 @@ step "s1-prepared-insertone" step "s1-insertall" { - INSERT INTO test_table SELECT test_id, data+1 FROM test_table; + INSERT INTO test_dml_vs_repair SELECT test_id, data+1 FROM test_dml_vs_repair; } step "s1-prepared-insertall" @@ -47,7 +47,7 @@ step "s1-prepared-insertall" step "s1-display" { - SELECT * FROM test_table WHERE test_id = 1; + SELECT * FROM test_dml_vs_repair WHERE test_id = 1; } step "s1-commit" @@ -65,27 +65,27 @@ step "s2-begin" step "s2-invalidate-57637" { - UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57637; + UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57637; } step "s2-revalidate-57637" { - UPDATE pg_dist_shard_placement SET shardstate = '1' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57637; + UPDATE pg_dist_shard_placement SET shardstate = '1' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57637; } step "s2-invalidate-57638" { - UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57638; + UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57638; } step "s2-revalidate-57638" { - UPDATE pg_dist_shard_placement SET shardstate = '1' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57638; + UPDATE pg_dist_shard_placement SET shardstate = '1' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57638; } step "s2-repair" { - SELECT master_copy_shard_placement((SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass), 'localhost', 57638, 'localhost', 57637); + SELECT master_copy_shard_placement((SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass), 'localhost', 57638, 'localhost', 57637); } step "s2-commit"
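For reference, the convention this commit adopts is to name each spec's tables after the spec file itself, so that specs scheduled in the same parallel group of the isolation_schedule never collide on relation names. A minimal sketch of a new spec following that convention appears below; the spec name isolation_my_feature, its table test_my_feature, and the step names are hypothetical placeholders rather than files or objects touched by this patch.

# table is named after the spec file, so tests running in parallel never share a relation name
setup
{
    CREATE TABLE test_my_feature (test_id integer NOT NULL, data int);
    SELECT create_distributed_table('test_my_feature', 'test_id');
}

teardown
{
    DROP TABLE IF EXISTS test_my_feature CASCADE;
}

session "s1"

step "s1-insert"
{
    INSERT INTO test_my_feature VALUES (1, 1);
}

session "s2"

step "s2-update"
{
    UPDATE test_my_feature SET data = 2 WHERE test_id = 1;
}

permutation "s1-insert" "s2-update"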