From be5b633e3041c9674b933fdcad6af41b7050ccee Mon Sep 17 00:00:00 2001 From: Metin Doslu Date: Thu, 1 Dec 2016 18:06:43 +0200 Subject: [PATCH 1/4] Add colocate_with option to create_distributed_table() With this commit, we support three kinds of colocate_with values: (i) default, (ii) none, and (iii) a specific table name. --- src/backend/distributed/Makefile | 4 +- .../distributed/citus--6.1-3--6.1-4.sql | 29 ++ src/backend/distributed/citus.control | 2 +- .../commands/create_distributed_table.c | 157 ++++++--- .../distributed/utils/colocation_utils.c | 13 +- .../distributed/utils/metadata_cache.c | 5 +- src/include/distributed/colocation_utils.h | 5 +- src/include/distributed/pg_dist_colocation.h | 4 +- .../expected/multi_colocation_utils.out | 323 ++++++++++++++---- .../expected/multi_distribution_metadata.out | 2 +- src/test/regress/expected/multi_extension.out | 1 + .../expected/multi_metadata_snapshot.out | 4 +- .../regress/sql/multi_colocation_utils.sql | 76 ++++- src/test/regress/sql/multi_extension.sql | 1 + 14 files changed, 506 insertions(+), 120 deletions(-) create mode 100644 src/backend/distributed/citus--6.1-3--6.1-4.sql diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index 84801a1c5..0feecd06c 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -9,7 +9,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \ 5.2-1 5.2-2 5.2-3 5.2-4 \ 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \ - 6.1-1 6.1-2 6.1-3 + 6.1-1 6.1-2 6.1-3 6.1-4 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -101,6 +101,8 @@ $(EXTENSION)--6.1-2.sql: $(EXTENSION)--6.1-1.sql $(EXTENSION)--6.1-1--6.1-2.sql cat $^ > $@ $(EXTENSION)--6.1-3.sql: $(EXTENSION)--6.1-2.sql $(EXTENSION)--6.1-2--6.1-3.sql cat $^ > $@ 
+$(EXTENSION)--6.1-4.sql: $(EXTENSION)--6.1-3.sql $(EXTENSION)--6.1-3--6.1-4.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--6.1-3--6.1-4.sql b/src/backend/distributed/citus--6.1-3--6.1-4.sql new file mode 100644 index 000000000..15294e3b2 --- /dev/null +++ b/src/backend/distributed/citus--6.1-3--6.1-4.sql @@ -0,0 +1,29 @@ +/* citus--6.1-3--6.1-4.sql */ + +SET search_path = 'pg_catalog'; + +ALTER TABLE pg_dist_colocation ADD COLUMN defaultgroup BOOLEAN; + +UPDATE pg_dist_colocation SET defaultgroup = TRUE; + +DROP INDEX pg_dist_colocation_configuration_index; + +CREATE INDEX pg_dist_colocation_configuration_index +ON pg_dist_colocation USING btree(shardcount, replicationfactor, distributioncolumntype, defaultgroup); + +DROP FUNCTION create_distributed_table(regclass, text, citus.distribution_type); + +CREATE FUNCTION create_distributed_table(table_name regclass, + distribution_column text, + distribution_type citus.distribution_type DEFAULT 'hash', + colocate_with text DEFAULT 'default') + RETURNS void + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$create_distributed_table$$; +COMMENT ON FUNCTION create_distributed_table(table_name regclass, + distribution_column text, + distribution_type citus.distribution_type, + colocate_with text) + IS 'creates a distributed table'; + +RESET search_path; diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index 27e416df9..04ef361d3 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '6.1-3' +default_version = '6.1-4' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index b59add3fe..1cad39e28 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ 
b/src/backend/distributed/commands/create_distributed_table.c @@ -63,6 +63,7 @@ static void ConvertToDistributedTable(Oid relationId, char *distributionColumnName, char distributionMethod, uint32 colocationId); static char LookupDistributionMethod(Oid distributionMethodOid); +static Oid ColumnType(Oid relationId, char *columnName); static void RecordDistributedRelationDependencies(Oid distributedRelationId, Node *distributionKey); static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId, @@ -76,10 +77,9 @@ static void ErrorIfNotSupportedForeignConstraint(Relation relation, uint32 colocationId); static void InsertIntoPgDistPartition(Oid relationId, char distributionMethod, Var *distributionColumn, uint32 colocationId); -static uint32 ColocationId(int shardCount, int replicationFactor, - Oid distributionColumnType); static uint32 GetNextColocationId(void); static void CreateHashDistributedTable(Oid relationId, char *distributionColumnName, + char *colocateWithTableName, int shardCount, int replicationFactor); @@ -123,10 +123,29 @@ create_distributed_table(PG_FUNCTION_ARGS) Oid relationId = PG_GETARG_OID(0); text *distributionColumnText = PG_GETARG_TEXT_P(1); Oid distributionMethodOid = PG_GETARG_OID(2); + text *colocateWithTableNameText = PG_GETARG_TEXT_P(3); char *distributionColumnName = text_to_cstring(distributionColumnText); char distributionMethod = LookupDistributionMethod(distributionMethodOid); + /* check if we try to colocate with hash distributed tables */ + char *colocateWithTableName = text_to_cstring(colocateWithTableNameText); + if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0 && + pg_strncasecmp(colocateWithTableName, "none", NAMEDATALEN) != 0) + { + Oid colocateWithTableOid = ResolveRelationId(colocateWithTableNameText); + char colocateWithTableDistributionMethod = PartitionMethod(colocateWithTableOid); + + if (colocateWithTableDistributionMethod != DISTRIBUTE_BY_HASH || + distributionMethod != 
DISTRIBUTE_BY_HASH) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot distribute relation"), + errdetail("Currently, colocate_with option is only supported " + "for hash distributed tables."))); + } + } + /* if distribution method is not hash, just create partition metadata */ if (distributionMethod != DISTRIBUTE_BY_HASH) { @@ -135,8 +154,9 @@ create_distributed_table(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } - /* use configuration values for shard count and shard replication factor*/ - CreateHashDistributedTable(relationId, distributionColumnName, ShardCount, + /* use configuration values for shard count and shard replication factor */ + CreateHashDistributedTable(relationId, distributionColumnName, + colocateWithTableName, ShardCount, ShardReplicationFactor); PG_RETURN_VOID(); @@ -154,6 +174,7 @@ create_reference_table(PG_FUNCTION_ARGS) Oid relationId = PG_GETARG_OID(0); int shardCount = 1; AttrNumber firstColumnAttrNumber = 1; + char *colocateWithTableName = "default"; char *firstColumnName = get_attname(relationId, firstColumnAttrNumber); if (firstColumnName == NULL) @@ -164,8 +185,8 @@ create_reference_table(PG_FUNCTION_ARGS) "least one column", relationName))); } - CreateHashDistributedTable(relationId, firstColumnName, shardCount, - ShardReplicationFactor); + CreateHashDistributedTable(relationId, firstColumnName, colocateWithTableName, + shardCount, ShardReplicationFactor); PG_RETURN_VOID(); } @@ -725,6 +746,19 @@ LookupDistributionMethod(Oid distributionMethodOid) } +/* + * ColumnType returns the column type of the given column. + */ +static Oid +ColumnType(Oid relationId, char *columnName) +{ + AttrNumber columnIndex = get_attnum(relationId, columnName); + Oid columnType = get_atttype(relationId, columnIndex); + + return columnType; +} + + /* * SupportFunctionForColumn locates a support function given a column, an access method, * and and id of a support function. 
This function returns InvalidOid if there is no @@ -849,19 +883,22 @@ CreateTruncateTrigger(Oid relationId) /* - * ColocationId searches pg_dist_colocation for shard count, replication factor - * and distribution column type. If a matching entry is found, it returns the - * colocation id, otherwise it returns INVALID_COLOCATION_ID. + * DefaultColocationGroupId searches pg_dist_colocation for the default colocation group + * with the given configuration: shard count, replication factor and distribution + * column type. If a matching entry is found, it returns the colocation id, + * otherwise it returns INVALID_COLOCATION_ID. */ -static uint32 -ColocationId(int shardCount, int replicationFactor, Oid distributionColumnType) +uint32 +DefaultColocationGroupId(int shardCount, int replicationFactor, + Oid distributionColumnType) { uint32 colocationId = INVALID_COLOCATION_ID; HeapTuple colocationTuple = NULL; SysScanDesc scanDescriptor; - const int scanKeyCount = 3; + const int scanKeyCount = 4; ScanKeyData scanKey[scanKeyCount]; bool indexOK = true; + bool defaulColocationGroup = true; Relation pgDistColocation = heap_open(DistColocationRelationId(), AccessShareLock); @@ -872,6 +909,8 @@ ColocationId(int shardCount, int replicationFactor, Oid distributionColumnType) BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(replicationFactor)); ScanKeyInit(&scanKey[2], Anum_pg_dist_colocation_distributioncolumntype, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(distributionColumnType)); + ScanKeyInit(&scanKey[3], Anum_pg_dist_colocation_defaultgroup, + BTEqualStrategyNumber, F_BOOLEQ, BoolGetDatum(defaulColocationGroup)); scanDescriptor = systable_beginscan(pgDistColocation, DistColocationConfigurationIndexId(), @@ -899,7 +938,8 @@ ColocationId(int shardCount, int replicationFactor, Oid distributionColumnType) * colocation id. 
*/ uint32 -CreateColocationGroup(int shardCount, int replicationFactor, Oid distributionColumnType) +CreateColocationGroup(int shardCount, int replicationFactor, Oid distributionColumnType, + bool defaultColocationGroup) { uint32 colocationId = GetNextColocationId(); Relation pgDistColocation = NULL; @@ -918,6 +958,8 @@ CreateColocationGroup(int shardCount, int replicationFactor, Oid distributionCol UInt32GetDatum(replicationFactor); values[Anum_pg_dist_colocation_distributioncolumntype - 1] = ObjectIdGetDatum(distributionColumnType); + values[Anum_pg_dist_colocation_defaultgroup - 1] = + BoolGetDatum(defaultColocationGroup); /* open colocation relation and insert the new tuple */ pgDistColocation = heap_open(DistColocationRelationId(), RowExclusiveLock); @@ -947,7 +989,7 @@ CreateColocationGroup(int shardCount, int replicationFactor, Oid distributionCol * sequence created in initdb to generate unique identifiers. */ static uint32 -GetNextColocationId() +GetNextColocationId(void) { text *sequenceName = cstring_to_text(COLOCATIONID_SEQUENCE_NAME); Oid sequenceId = ResolveRelationId(sequenceName); @@ -977,19 +1019,17 @@ GetNextColocationId() */ static void CreateHashDistributedTable(Oid relationId, char *distributionColumnName, + char *colocateWithTableName, int shardCount, int replicationFactor) { Relation distributedRelation = NULL; Relation pgDistColocation = NULL; - Var *distributionColumn = NULL; - Oid distributionColumnType = 0; uint32 colocationId = INVALID_COLOCATION_ID; + Oid colocationTableId = InvalidOid; + Oid distributionColumnType = InvalidOid; - /* get distribution column type */ + /* get an access lock on the relation to prevent DROP TABLE and ALTER TABLE */ distributedRelation = relation_open(relationId, AccessShareLock); - distributionColumn = BuildDistributionKeyFromColumnName(distributedRelation, - distributionColumnName); - distributionColumnType = distributionColumn->vartype; /* * Get an exclusive lock on the colocation system catalog. 
Therefore, we @@ -998,35 +1038,66 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName, */ pgDistColocation = heap_open(DistColocationRelationId(), ExclusiveLock); - /* check for existing colocations */ - colocationId = ColocationId(shardCount, replicationFactor, distributionColumnType); + /* get distribution column data type */ + distributionColumnType = ColumnType(relationId, distributionColumnName); - /* - * If there is a colocation group for the current configuration, get a - * colocated table from the group and use its shards as a reference to - * create new shards. Otherwise, create a new colocation group and create - * shards with the default round robin algorithm. - */ - if (colocationId != INVALID_COLOCATION_ID) + if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) == 0) { - char *relationName = get_rel_name(relationId); - - Oid colocatedTableId = ColocatedTableId(colocationId); - ConvertToDistributedTable(relationId, distributionColumnName, - DISTRIBUTE_BY_HASH, colocationId); - - CreateColocatedShards(relationId, colocatedTableId); - ereport(DEBUG2, (errmsg("table %s is added to colocation group: %d", - relationName, colocationId))); + /* check for default colocation group */ + colocationId = DefaultColocationGroupId(shardCount, replicationFactor, + distributionColumnType); + if (colocationId == INVALID_COLOCATION_ID) + { + bool defaultColocationGroup = true; + colocationId = CreateColocationGroup(shardCount, replicationFactor, + distributionColumnType, + defaultColocationGroup); + } + else + { + colocationTableId = ColocatedTableId(colocationId); + } + } + else if (pg_strncasecmp(colocateWithTableName, "none", NAMEDATALEN) == 0) + { + bool defaultColocationGroup = false; + colocationId = CreateColocationGroup(shardCount, replicationFactor, + distributionColumnType, + defaultColocationGroup); } else { - colocationId = CreateColocationGroup(shardCount, replicationFactor, - distributionColumnType); - 
ConvertToDistributedTable(relationId, distributionColumnName, - DISTRIBUTE_BY_HASH, colocationId); + Var *colocationTablePartitionColumn = NULL; + Oid colocationTablePartitionColumnType = InvalidOid; - /* use the default way to create shards */ + /* get colocation group of the target table */ + text *colocateWithTableNameText = cstring_to_text(colocateWithTableName); + colocationTableId = ResolveRelationId(colocateWithTableNameText); + + colocationId = TableColocationId(colocationTableId); + + colocationTablePartitionColumn = PartitionKey(colocationTableId); + colocationTablePartitionColumnType = colocationTablePartitionColumn->vartype; + + if (colocationTablePartitionColumnType != distributionColumnType) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot colocate with %s", colocateWithTableName), + errdetail("Distribution column types are different."))); + } + } + + /* create distributed table metadata */ + ConvertToDistributedTable(relationId, distributionColumnName, DISTRIBUTE_BY_HASH, + colocationId); + + /* create shards */ + if (colocationTableId != InvalidOid) + { + CreateColocatedShards(relationId, colocationTableId); + } + else + { CreateShardsWithRoundRobinPolicy(relationId, shardCount, replicationFactor); } diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c index 432fedd27..b8d405967 100644 --- a/src/backend/distributed/utils/colocation_utils.c +++ b/src/backend/distributed/utils/colocation_utils.c @@ -91,6 +91,7 @@ MarkTablesColocated(Oid sourceRelationId, Oid targetRelationId) Var *targetDistributionColumn = NULL; Oid sourceDistributionColumnType = InvalidOid; Oid targetDistributionColumnType = InvalidOid; + bool defaultColocationGroup = false; CheckHashPartitionedTable(sourceRelationId); CheckHashPartitionedTable(targetRelationId); @@ -132,9 +133,19 @@ MarkTablesColocated(Oid sourceRelationId, Oid targetRelationId) { uint32 shardCount = 
ShardIntervalCount(sourceRelationId); uint32 shardReplicationFactor = TableShardReplicationFactor(sourceRelationId); + uint32 defaultColocationId = INVALID_COLOCATION_ID; + + /* check if there is a default colocation group */ + defaultColocationId = DefaultColocationGroupId(shardCount, shardReplicationFactor, + sourceDistributionColumnType); + if (defaultColocationId == INVALID_COLOCATION_ID) + { + defaultColocationGroup = true; + } sourceColocationId = CreateColocationGroup(shardCount, shardReplicationFactor, - sourceDistributionColumnType); + sourceDistributionColumnType, + defaultColocationGroup); UpdateRelationColocationGroup(sourceRelationId, sourceColocationId); } diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index b48c62fa8..063a6f4fc 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -260,8 +260,9 @@ DistributedTableCacheEntry(Oid distributedRelationId) } else { - ereport(ERROR, (errmsg("relation %u is not distributed", - distributedRelationId))); + char *relationName = get_rel_name(distributedRelationId); + ereport(ERROR, (errmsg("relation %s is not distributed", + relationName))); } } diff --git a/src/include/distributed/colocation_utils.h b/src/include/distributed/colocation_utils.h index 09de91ee3..c2f4b49ba 100644 --- a/src/include/distributed/colocation_utils.h +++ b/src/include/distributed/colocation_utils.h @@ -18,6 +18,8 @@ #define INVALID_COLOCATION_ID 0 extern uint32 TableColocationId(Oid distributedTableId); +extern uint32 DefaultColocationGroupId(int shardCount, int replicationFactor, + Oid distributionColumnType); extern bool TablesColocated(Oid leftDistributedTableId, Oid rightDistributedTableId); extern bool ShardsColocated(ShardInterval *leftShardInterval, ShardInterval *rightShardInterval); @@ -26,7 +28,8 @@ extern List * ColocatedShardIntervalList(ShardInterval *shardInterval); extern Oid ColocatedTableId(Oid 
colocationId); extern uint64 ColocatedShardIdInRelation(Oid relationId, int shardIndex); extern uint32 CreateColocationGroup(int shardCount, int replicationFactor, - Oid distributionColumnType); + Oid distributionColumnType, + bool defaultColocationGroup); #endif /* COLOCATION_UTILS_H_ */ diff --git a/src/include/distributed/pg_dist_colocation.h b/src/include/distributed/pg_dist_colocation.h index 985c1332d..8ef60fe8a 100644 --- a/src/include/distributed/pg_dist_colocation.h +++ b/src/include/distributed/pg_dist_colocation.h @@ -22,6 +22,7 @@ typedef struct FormData_pg_dist_colocation uint32 shardcount; uint32 replicationfactor; Oid distributioncolumntype; + bool defaultgroup; } FormData_pg_dist_colocation; /* ---------------- @@ -35,11 +36,12 @@ typedef FormData_pg_dist_colocation *Form_pg_dist_colocation; * compiler constants for pg_dist_colocation * ---------------- */ -#define Natts_pg_dist_colocation 4 +#define Natts_pg_dist_colocation 5 #define Anum_pg_dist_colocation_colocationid 1 #define Anum_pg_dist_colocation_shardcount 2 #define Anum_pg_dist_colocation_replicationfactor 3 #define Anum_pg_dist_colocation_distributioncolumntype 4 +#define Anum_pg_dist_colocation_defaultgroup 5 #define COLOCATIONID_SEQUENCE_NAME "pg_dist_colocationid_seq" diff --git a/src/test/regress/expected/multi_colocation_utils.out b/src/test/regress/expected/multi_colocation_utils.out index 7bd235aeb..933e8615d 100644 --- a/src/test/regress/expected/multi_colocation_utils.out +++ b/src/test/regress/expected/multi_colocation_utils.out @@ -432,12 +432,12 @@ NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined SELECT * FROM pg_dist_colocation WHERE colocationid >= 1 AND colocationid < 1000 ORDER BY colocationid; - colocationid | shardcount | replicationfactor | distributioncolumntype ---------------+------------+-------------------+------------------------ - 1 | 2 | 2 | 23 - 2 | 2 | 1 | 23 - 3 | 2 | 2 | 25 - 4 | 4 | 2 | 23 + colocationid | shardcount | 
replicationfactor | distributioncolumntype | defaultgroup +--------------+------------+-------------------+------------------------+-------------- + 1 | 2 | 2 | 23 | t + 2 | 2 | 1 | 23 | t + 3 | 2 | 2 | 25 | t + 4 | 4 | 2 | 23 | t (4 rows) SELECT logicalrelid, colocationid FROM pg_dist_partition @@ -459,16 +459,16 @@ SELECT logicalrelid, colocationid FROM pg_dist_partition -- check effects of dropping tables DROP TABLE table1_groupA; SELECT * FROM pg_dist_colocation WHERE colocationid = 1; - colocationid | shardcount | replicationfactor | distributioncolumntype ---------------+------------+-------------------+------------------------ - 1 | 2 | 2 | 23 + colocationid | shardcount | replicationfactor | distributioncolumntype | defaultgroup +--------------+------------+-------------------+------------------------+-------------- + 1 | 2 | 2 | 23 | t (1 row) -- dropping all tables in a colocation group also deletes the colocation group DROP TABLE table2_groupA; SELECT * FROM pg_dist_colocation WHERE colocationid = 1; - colocationid | shardcount | replicationfactor | distributioncolumntype ---------------+------------+-------------------+------------------------ + colocationid | shardcount | replicationfactor | distributioncolumntype | defaultgroup +--------------+------------+-------------------+------------------------+-------------- (0 rows) -- create dropped colocation group again @@ -504,6 +504,118 @@ SELECT create_distributed_table('schema_collocation.table4_groupE', 'id'); (1 row) +-- test colocate_with option +CREATE TABLE table1_group_none_1 ( id int ); +SELECT create_distributed_table('table1_group_none_1', 'id', colocate_with => 'none'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE table2_group_none_1 ( id int ); +SELECT create_distributed_table('table2_group_none_1', 'id', colocate_with => 'table1_group_none_1'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE table1_group_none_2 ( id int 
); +SELECT create_distributed_table('table1_group_none_2', 'id', colocate_with => 'none'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE table4_groupE ( id int ); +SELECT create_distributed_table('table4_groupE', 'id', colocate_with => 'default'); + create_distributed_table +-------------------------- + +(1 row) + +SET citus.shard_count = 3; +-- check that this new configuration does not have a default group +CREATE TABLE table1_group_none_3 ( id int ); +SELECT create_distributed_table('table1_group_none_3', 'id', colocate_with => 'NONE'); + create_distributed_table +-------------------------- + +(1 row) + +-- a new table does not use a non-default group +CREATE TABLE table1_group_default ( id int ); +SELECT create_distributed_table('table1_group_default', 'id', colocate_with => 'DEFAULT'); + create_distributed_table +-------------------------- + +(1 row) + +-- check metadata +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid; + colocationid | shardcount | replicationfactor | distributioncolumntype | defaultgroup +--------------+------------+-------------------+------------------------+-------------- + 2 | 2 | 1 | 23 | t + 3 | 2 | 2 | 25 | t + 4 | 4 | 2 | 23 | t + 5 | 2 | 2 | 23 | t + 6 | 2 | 2 | 23 | f + 7 | 2 | 2 | 23 | f + 8 | 3 | 2 | 23 | f + 9 | 3 | 2 | 23 | t +(8 rows) + +SELECT logicalrelid, colocationid FROM pg_dist_partition + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid, logicalrelid; + logicalrelid | colocationid +----------------------------------+-------------- + table1_groupb | 2 + table2_groupb | 2 + table1_groupc | 3 + table2_groupc | 3 + table1_groupd | 4 + table2_groupd | 4 + table3_groupd | 4 + table1_groupe | 5 + table2_groupe | 5 + table3_groupe | 5 + schema_collocation.table4_groupe | 5 + table4_groupe | 5 + table1_group_none_1 | 6 + table2_group_none_1 | 6 + table1_group_none_2 | 7 + table1_group_none_3 | 8 + 
table1_group_default | 9 +(17 rows) + +-- check failing colocate_with options +CREATE TABLE table_postgresql( id int ); +CREATE TABLE table_failing ( id int ); +SELECT create_distributed_table('table_failing', 'id', colocate_with => 'table_append'); +ERROR: cannot distribute relation +DETAIL: Currently, colocate_with option is only supported for hash distributed tables. +SELECT create_distributed_table('table_failing', 'id', 'append', 'table1_groupE'); +ERROR: cannot distribute relation +DETAIL: Currently, colocate_with option is only supported for hash distributed tables. +SELECT create_distributed_table('table_failing', 'id', colocate_with => 'table_postgresql'); +ERROR: relation table_postgresql is not distributed +SELECT create_distributed_table('table_failing', 'id', colocate_with => 'no_table'); +ERROR: relation "no_table" does not exist +SELECT create_distributed_table('table_failing', 'id', colocate_with => ''); +ERROR: invalid name syntax +SELECT create_distributed_table('table_failing', 'id', colocate_with => NULL); + create_distributed_table +-------------------------- + +(1 row) + +-- check with different distribution column types +CREATE TABLE table_bigint ( id bigint ); +SELECT create_distributed_table('table_bigint', 'id', colocate_with => 'table1_groupE'); +ERROR: cannot colocate with table1_groupE +DETAIL: Distribution column types are different. 
-- check worker table schemas \c - - - :worker_1_port \d table3_groupE_1300050 @@ -535,17 +647,77 @@ SELECT create_reference_table('table2_groupF'); (1 row) -- check metadata -SELECT * FROM pg_dist_colocation - WHERE colocationid >= 1 AND colocationid < 1000 +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 ORDER BY colocationid; - colocationid | shardcount | replicationfactor | distributioncolumntype ---------------+------------+-------------------+------------------------ - 2 | 2 | 1 | 23 - 3 | 2 | 2 | 25 - 4 | 4 | 2 | 23 - 5 | 2 | 2 | 23 - 6 | 1 | 2 | 23 -(5 rows) + colocationid | shardcount | replicationfactor | distributioncolumntype | defaultgroup +--------------+------------+-------------------+------------------------+-------------- + 2 | 2 | 1 | 23 | t + 3 | 2 | 2 | 25 | t + 4 | 4 | 2 | 23 | t + 5 | 2 | 2 | 23 | t + 6 | 2 | 2 | 23 | f + 7 | 2 | 2 | 23 | f + 8 | 3 | 2 | 23 | f + 9 | 3 | 2 | 23 | t + 10 | 1 | 2 | 23 | t +(9 rows) + +-- test mark_colocation_group_default() +SELECT mark_colocation_group_default(7); + mark_colocation_group_default +------------------------------- + +(1 row) + +SELECT mark_colocation_group_default(8); + mark_colocation_group_default +------------------------------- + +(1 row) + +-- check metadata after mark_colocation_group_default() is run +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid; + colocationid | shardcount | replicationfactor | distributioncolumntype | defaultgroup +--------------+------------+-------------------+------------------------+-------------- + 2 | 2 | 1 | 23 | t + 3 | 2 | 2 | 25 | t + 4 | 4 | 2 | 23 | t + 5 | 2 | 2 | 23 | f + 6 | 2 | 2 | 23 | f + 7 | 2 | 2 | 23 | t + 8 | 3 | 2 | 23 | t + 9 | 3 | 2 | 23 | f + 10 | 1 | 2 | 23 | t +(9 rows) + +SELECT logicalrelid, colocationid FROM pg_dist_partition + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid, logicalrelid; + logicalrelid | colocationid 
+----------------------------------+-------------- + table1_groupb | 2 + table2_groupb | 2 + table1_groupc | 3 + table2_groupc | 3 + table1_groupd | 4 + table2_groupd | 4 + table3_groupd | 4 + table1_groupe | 5 + table2_groupe | 5 + table3_groupe | 5 + schema_collocation.table4_groupe | 5 + table4_groupe | 5 + table1_group_none_1 | 6 + table2_group_none_1 | 6 + table1_group_none_2 | 7 + table1_group_none_3 | 8 + table1_group_default | 9 + table1_groupf | 10 + table2_groupf | 10 +(19 rows) -- cross check with internal colocation API SELECT @@ -563,22 +735,27 @@ WHERE ORDER BY table1, table2; - table1 | table2 | colocated ----------------+----------------------------------+----------- - table1_group1 | table2_group1 | t - table1_groupb | table2_groupb | t - table1_groupc | table2_groupc | t - table1_groupd | table2_groupd | t - table1_groupd | table3_groupd | t - table2_groupd | table3_groupd | t - table1_groupe | table2_groupe | t - table1_groupe | table3_groupe | t - table1_groupe | schema_collocation.table4_groupe | t - table2_groupe | table3_groupe | t - table2_groupe | schema_collocation.table4_groupe | t - table3_groupe | schema_collocation.table4_groupe | t - table1_groupf | table2_groupf | t -(13 rows) + table1 | table2 | colocated +----------------------------------+----------------------------------+----------- + table1_group1 | table2_group1 | t + table1_groupb | table2_groupb | t + table1_groupc | table2_groupc | t + table1_groupd | table2_groupd | t + table1_groupd | table3_groupd | t + table2_groupd | table3_groupd | t + table1_groupe | table2_groupe | t + table1_groupe | table3_groupe | t + table1_groupe | schema_collocation.table4_groupe | t + table1_groupe | table4_groupe | t + table2_groupe | table3_groupe | t + table2_groupe | schema_collocation.table4_groupe | t + table2_groupe | table4_groupe | t + table3_groupe | schema_collocation.table4_groupe | t + table3_groupe | table4_groupe | t + schema_collocation.table4_groupe | table4_groupe | t + 
table1_group_none_1 | table2_group_none_1 | t + table1_groupf | table2_groupf | t +(18 rows) -- check created shards SELECT @@ -653,11 +830,39 @@ ORDER BY schema_collocation.table4_groupe | 1300052 | t | 57637 | -2147483648 | -1 schema_collocation.table4_groupe | 1300053 | t | 57637 | 0 | 2147483647 schema_collocation.table4_groupe | 1300053 | t | 57638 | 0 | 2147483647 - table1_groupf | 1300054 | t | 57637 | -2147483648 | 2147483647 - table1_groupf | 1300054 | t | 57638 | -2147483648 | 2147483647 - table2_groupf | 1300055 | t | 57638 | -2147483648 | 2147483647 - table2_groupf | 1300055 | t | 57637 | -2147483648 | 2147483647 -(56 rows) + table1_group_none_1 | 1300054 | t | 57637 | -2147483648 | -1 + table1_group_none_1 | 1300054 | t | 57638 | -2147483648 | -1 + table1_group_none_1 | 1300055 | t | 57638 | 0 | 2147483647 + table1_group_none_1 | 1300055 | t | 57637 | 0 | 2147483647 + table2_group_none_1 | 1300056 | t | 57638 | -2147483648 | -1 + table2_group_none_1 | 1300056 | t | 57637 | -2147483648 | -1 + table2_group_none_1 | 1300057 | t | 57637 | 0 | 2147483647 + table2_group_none_1 | 1300057 | t | 57638 | 0 | 2147483647 + table1_group_none_2 | 1300058 | t | 57637 | -2147483648 | -1 + table1_group_none_2 | 1300058 | t | 57638 | -2147483648 | -1 + table1_group_none_2 | 1300059 | t | 57638 | 0 | 2147483647 + table1_group_none_2 | 1300059 | t | 57637 | 0 | 2147483647 + table4_groupe | 1300060 | t | 57637 | -2147483648 | -1 + table4_groupe | 1300060 | t | 57638 | -2147483648 | -1 + table4_groupe | 1300061 | t | 57638 | 0 | 2147483647 + table4_groupe | 1300061 | t | 57637 | 0 | 2147483647 + table1_group_none_3 | 1300062 | t | 57637 | -2147483648 | -715827884 + table1_group_none_3 | 1300062 | t | 57638 | -2147483648 | -715827884 + table1_group_none_3 | 1300063 | t | 57638 | -715827883 | 715827881 + table1_group_none_3 | 1300063 | t | 57637 | -715827883 | 715827881 + table1_group_none_3 | 1300064 | t | 57637 | 715827882 | 2147483647 + table1_group_none_3 | 1300064 | t | 
57638 | 715827882 | 2147483647 + table1_group_default | 1300065 | t | 57637 | -2147483648 | -715827884 + table1_group_default | 1300065 | t | 57638 | -2147483648 | -715827884 + table1_group_default | 1300066 | t | 57638 | -715827883 | 715827881 + table1_group_default | 1300066 | t | 57637 | -715827883 | 715827881 + table1_group_default | 1300067 | t | 57637 | 715827882 | 2147483647 + table1_group_default | 1300067 | t | 57638 | 715827882 | 2147483647 + table1_groupf | 1300068 | t | 57637 | -2147483648 | 2147483647 + table1_groupf | 1300068 | t | 57638 | -2147483648 | 2147483647 + table2_groupf | 1300069 | t | 57638 | -2147483648 | 2147483647 + table2_groupf | 1300069 | t | 57637 | -2147483648 | 2147483647 +(84 rows) -- reset colocation ids to test mark_tables_colocated ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1; @@ -669,13 +874,13 @@ UPDATE pg_dist_partition SET colocationid = 0 SELECT * FROM pg_dist_colocation WHERE colocationid >= 1 AND colocationid < 1000 ORDER BY colocationid; - colocationid | shardcount | replicationfactor | distributioncolumntype ---------------+------------+-------------------+------------------------ + colocationid | shardcount | replicationfactor | distributioncolumntype | defaultgroup +--------------+------------+-------------------+------------------------+-------------- (0 rows) SELECT logicalrelid, colocationid FROM pg_dist_partition - WHERE colocationid >= 1 AND colocationid < 1000 - ORDER BY logicalrelid; + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid, logicalrelid; logicalrelid | colocationid --------------+-------------- (0 rows) @@ -697,16 +902,16 @@ SELECT mark_tables_colocated('table1_groupB', ARRAY['table2_groupB', 'table1_gro ERROR: cannot colocate tables table1_groupb and table1_groupd DETAIL: Shard counts don't match for table1_groupb and table1_groupd. 
-- check metadata to see failing calls didn't have any side effects -SELECT * FROM pg_dist_colocation - WHERE colocationid >= 1 AND colocationid < 1000 +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 ORDER BY colocationid; - colocationid | shardcount | replicationfactor | distributioncolumntype ---------------+------------+-------------------+------------------------ + colocationid | shardcount | replicationfactor | distributioncolumntype | defaultgroup +--------------+------------+-------------------+------------------------+-------------- (0 rows) SELECT logicalrelid, colocationid FROM pg_dist_partition - WHERE colocationid >= 1 AND colocationid < 1000 - ORDER BY logicalrelid; + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid, logicalrelid; logicalrelid | colocationid --------------+-------------- (0 rows) @@ -753,13 +958,13 @@ SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupB']); SELECT * FROM pg_dist_colocation WHERE colocationid >= 1 AND colocationid < 1000 ORDER BY colocationid; - colocationid | shardcount | replicationfactor | distributioncolumntype ---------------+------------+-------------------+------------------------ - 2 | 2 | 1 | 23 - 3 | 2 | 2 | 25 - 4 | 4 | 2 | 23 - 5 | 2 | 2 | 23 - 6 | 1 | 2 | 23 + colocationid | shardcount | replicationfactor | distributioncolumntype | defaultgroup +--------------+------------+-------------------+------------------------+-------------- + 2 | 2 | 1 | 23 | t + 3 | 2 | 2 | 25 | t + 4 | 4 | 2 | 23 | t + 5 | 2 | 2 | 23 | t + 6 | 1 | 2 | 23 | t (5 rows) SELECT logicalrelid, colocationid FROM pg_dist_partition diff --git a/src/test/regress/expected/multi_distribution_metadata.out b/src/test/regress/expected/multi_distribution_metadata.out index 31ada84da..0e664db86 100644 --- a/src/test/regress/expected/multi_distribution_metadata.out +++ b/src/test/regress/expected/multi_distribution_metadata.out @@ -156,7 +156,7 @@ SELECT 
partition_type('events_hash'); (1 row) SELECT partition_type('pg_type'); -ERROR: relation 1247 is not distributed +ERROR: relation pg_type is not distributed -- should see true for events_hash, false for others SELECT is_distributed_table('events_hash'); is_distributed_table diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 237a6808d..ae79c879c 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -61,6 +61,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-18'; ALTER EXTENSION citus UPDATE TO '6.1-1'; ALTER EXTENSION citus UPDATE TO '6.1-2'; ALTER EXTENSION citus UPDATE TO '6.1-3'; +ALTER EXTENSION citus UPDATE TO '6.1-4'; -- ensure no objects were created outside pg_catalog SELECT COUNT(*) FROM pg_depend AS pgd, diff --git a/src/test/regress/expected/multi_metadata_snapshot.out b/src/test/regress/expected/multi_metadata_snapshot.out index f3ef9c6aa..ea018492c 100644 --- a/src/test/regress/expected/multi_metadata_snapshot.out +++ b/src/test/regress/expected/multi_metadata_snapshot.out @@ -238,8 +238,8 @@ Indexes: -- Check that pg_dist_colocation is not synced SELECT * FROM pg_dist_colocation ORDER BY colocationid; - colocationid | shardcount | replicationfactor | distributioncolumntype ---------------+------------+-------------------+------------------------ + colocationid | shardcount | replicationfactor | distributioncolumntype | defaultgroup +--------------+------------+-------------------+------------------------+-------------- (0 rows) -- Make sure that truncate trigger has been set for the MX table on worker diff --git a/src/test/regress/sql/multi_colocation_utils.sql b/src/test/regress/sql/multi_colocation_utils.sql index 184d21173..53d761b73 100644 --- a/src/test/regress/sql/multi_colocation_utils.sql +++ b/src/test/regress/sql/multi_colocation_utils.sql @@ -241,6 +241,53 @@ CREATE SCHEMA schema_collocation; CREATE TABLE 
schema_collocation.table4_groupE ( id int ); SELECT create_distributed_table('schema_collocation.table4_groupE', 'id'); +-- test colocate_with option +CREATE TABLE table1_group_none_1 ( id int ); +SELECT create_distributed_table('table1_group_none_1', 'id', colocate_with => 'none'); + +CREATE TABLE table2_group_none_1 ( id int ); +SELECT create_distributed_table('table2_group_none_1', 'id', colocate_with => 'table1_group_none_1'); + +CREATE TABLE table1_group_none_2 ( id int ); +SELECT create_distributed_table('table1_group_none_2', 'id', colocate_with => 'none'); + +CREATE TABLE table4_groupE ( id int ); +SELECT create_distributed_table('table4_groupE', 'id', colocate_with => 'default'); + +SET citus.shard_count = 3; + +-- check that this new configuration does not have a default group +CREATE TABLE table1_group_none_3 ( id int ); +SELECT create_distributed_table('table1_group_none_3', 'id', colocate_with => 'NONE'); + +-- a new table does not use a non-default group +CREATE TABLE table1_group_default ( id int ); +SELECT create_distributed_table('table1_group_default', 'id', colocate_with => 'DEFAULT'); + +-- check metadata +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid; + +SELECT logicalrelid, colocationid FROM pg_dist_partition + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid, logicalrelid; + +-- check failing colocate_with options +CREATE TABLE table_postgresql( id int ); +CREATE TABLE table_failing ( id int ); + +SELECT create_distributed_table('table_failing', 'id', colocate_with => 'table_append'); +SELECT create_distributed_table('table_failing', 'id', 'append', 'table1_groupE'); +SELECT create_distributed_table('table_failing', 'id', colocate_with => 'table_postgresql'); +SELECT create_distributed_table('table_failing', 'id', colocate_with => 'no_table'); +SELECT create_distributed_table('table_failing', 'id', colocate_with => ''); +SELECT 
create_distributed_table('table_failing', 'id', colocate_with => NULL); + +-- check with different distribution column types +CREATE TABLE table_bigint ( id bigint ); +SELECT create_distributed_table('table_bigint', 'id', colocate_with => 'table1_groupE'); + -- check worker table schemas \c - - - :worker_1_port \d table3_groupE_1300050 @@ -259,6 +306,19 @@ SELECT * FROM pg_dist_colocation WHERE colocationid >= 1 AND colocationid < 1000 ORDER BY colocationid; +-- test mark_colocation_group_default() +SELECT mark_colocation_group_default(7); +SELECT mark_colocation_group_default(8); + +-- check metadata after mark_colocation_group_default() is run +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid; + +SELECT logicalrelid, colocationid FROM pg_dist_partition + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid, logicalrelid; + -- cross check with internal colocation API SELECT p1.logicalrelid::regclass AS table1, @@ -304,13 +364,13 @@ UPDATE pg_dist_partition SET colocationid = 0 WHERE colocationid >= 1 AND colocationid < 1000; -- check metadata -SELECT * FROM pg_dist_colocation - WHERE colocationid >= 1 AND colocationid < 1000 +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 ORDER BY colocationid; SELECT logicalrelid, colocationid FROM pg_dist_partition - WHERE colocationid >= 1 AND colocationid < 1000 - ORDER BY logicalrelid; + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid, logicalrelid; -- first check failing cases SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupC']); @@ -320,13 +380,13 @@ SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupF']); SELECT mark_tables_colocated('table1_groupB', ARRAY['table2_groupB', 'table1_groupD']); -- check metadata to see failing calls didn't have any side effects -SELECT * FROM pg_dist_colocation - WHERE colocationid >= 1 AND colocationid < 1000 +SELECT * FROM 
pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 ORDER BY colocationid; SELECT logicalrelid, colocationid FROM pg_dist_partition - WHERE colocationid >= 1 AND colocationid < 1000 - ORDER BY logicalrelid; + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid, logicalrelid; -- check successfully cololated tables SELECT mark_tables_colocated('table1_groupB', ARRAY['table2_groupB']); diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index c67ec9a6d..209955668 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -61,6 +61,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-18'; ALTER EXTENSION citus UPDATE TO '6.1-1'; ALTER EXTENSION citus UPDATE TO '6.1-2'; ALTER EXTENSION citus UPDATE TO '6.1-3'; +ALTER EXTENSION citus UPDATE TO '6.1-4'; -- ensure no objects were created outside pg_catalog SELECT COUNT(*) From b681843d844d162b125119d6c39c2ef1390e8ae5 Mon Sep 17 00:00:00 2001 From: Metin Doslu Date: Tue, 6 Dec 2016 12:02:44 +0200 Subject: [PATCH 2/4] Move colocation related functions to colocation_utils.c --- .../commands/create_distributed_table.c | 135 --------------- .../distributed/utils/colocation_utils.c | 162 ++++++++++++++++-- src/include/distributed/colocation_utils.h | 6 +- 3 files changed, 152 insertions(+), 151 deletions(-) diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 1cad39e28..e99e1c9f3 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -9,7 +9,6 @@ */ #include "postgres.h" -#include "miscadmin.h" #include "access/genam.h" #include "access/hash.h" @@ -33,7 +32,6 @@ #include "catalog/pg_type.h" #include "commands/defrem.h" #include "commands/extension.h" -#include "commands/sequence.h" #include "commands/trigger.h" #include 
"distributed/colocation_utils.h" #include "distributed/distribution_column.h" @@ -41,7 +39,6 @@ #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" #include "distributed/multi_logical_planner.h" -#include "distributed/pg_dist_colocation.h" #include "distributed/pg_dist_partition.h" #include "executor/spi.h" #include "nodes/execnodes.h" @@ -77,7 +74,6 @@ static void ErrorIfNotSupportedForeignConstraint(Relation relation, uint32 colocationId); static void InsertIntoPgDistPartition(Oid relationId, char distributionMethod, Var *distributionColumn, uint32 colocationId); -static uint32 GetNextColocationId(void); static void CreateHashDistributedTable(Oid relationId, char *distributionColumnName, char *colocateWithTableName, int shardCount, int replicationFactor); @@ -882,137 +878,6 @@ CreateTruncateTrigger(Oid relationId) } -/* - * DefaultColocationId searches pg_dist_colocation for the default colocation group - * with the given configuration: shard count, replication factor and distribution - * column type. If a matching entry is found, it returns the colocation id, - * otherwise it returns INVALID_COLOCATION_ID. 
- */ -uint32 -DefaultColocationGroupId(int shardCount, int replicationFactor, - Oid distributionColumnType) -{ - uint32 colocationId = INVALID_COLOCATION_ID; - HeapTuple colocationTuple = NULL; - SysScanDesc scanDescriptor; - const int scanKeyCount = 4; - ScanKeyData scanKey[scanKeyCount]; - bool indexOK = true; - bool defaulColocationGroup = true; - - Relation pgDistColocation = heap_open(DistColocationRelationId(), AccessShareLock); - - /* set scan arguments */ - ScanKeyInit(&scanKey[0], Anum_pg_dist_colocation_shardcount, - BTEqualStrategyNumber, F_INT4EQ, UInt32GetDatum(shardCount)); - ScanKeyInit(&scanKey[1], Anum_pg_dist_colocation_replicationfactor, - BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(replicationFactor)); - ScanKeyInit(&scanKey[2], Anum_pg_dist_colocation_distributioncolumntype, - BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(distributionColumnType)); - ScanKeyInit(&scanKey[3], Anum_pg_dist_colocation_defaultgroup, - BTEqualStrategyNumber, F_BOOLEQ, BoolGetDatum(defaulColocationGroup)); - - scanDescriptor = systable_beginscan(pgDistColocation, - DistColocationConfigurationIndexId(), - indexOK, NULL, scanKeyCount, scanKey); - - colocationTuple = systable_getnext(scanDescriptor); - if (HeapTupleIsValid(colocationTuple)) - { - Form_pg_dist_colocation colocationForm = - (Form_pg_dist_colocation) GETSTRUCT(colocationTuple); - - colocationId = colocationForm->colocationid; - } - - systable_endscan(scanDescriptor); - heap_close(pgDistColocation, AccessShareLock); - - return colocationId; -} - - -/* - * CreateColocationGroup creates a new colocation id and writes it into - * pg_dist_colocation with the given configuration. It also returns the created - * colocation id. 
- */ -uint32 -CreateColocationGroup(int shardCount, int replicationFactor, Oid distributionColumnType, - bool defaultColocationGroup) -{ - uint32 colocationId = GetNextColocationId(); - Relation pgDistColocation = NULL; - TupleDesc tupleDescriptor = NULL; - HeapTuple heapTuple = NULL; - Datum values[Natts_pg_dist_colocation]; - bool isNulls[Natts_pg_dist_colocation]; - - /* form new colocation tuple */ - memset(values, 0, sizeof(values)); - memset(isNulls, false, sizeof(isNulls)); - - values[Anum_pg_dist_colocation_colocationid - 1] = UInt32GetDatum(colocationId); - values[Anum_pg_dist_colocation_shardcount - 1] = UInt32GetDatum(shardCount); - values[Anum_pg_dist_colocation_replicationfactor - 1] = - UInt32GetDatum(replicationFactor); - values[Anum_pg_dist_colocation_distributioncolumntype - 1] = - ObjectIdGetDatum(distributionColumnType); - values[Anum_pg_dist_colocation_defaultgroup - 1] = - BoolGetDatum(defaultColocationGroup); - - /* open colocation relation and insert the new tuple */ - pgDistColocation = heap_open(DistColocationRelationId(), RowExclusiveLock); - - tupleDescriptor = RelationGetDescr(pgDistColocation); - heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls); - - simple_heap_insert(pgDistColocation, heapTuple); - CatalogUpdateIndexes(pgDistColocation, heapTuple); - - /* increment the counter so that next command can see the row */ - CommandCounterIncrement(); - heap_close(pgDistColocation, RowExclusiveLock); - - return colocationId; -} - - -/* - * GetNextColocationId allocates and returns a unique colocationId for the - * colocation group to be created. This allocation occurs both in shared memory - * and in write ahead logs; writing to logs avoids the risk of having - * colocationId collisions. - * - * Please note that the caller is still responsible for finalizing colocationId - * with the master node. Further note that this function relies on an internal - * sequence created in initdb to generate unique identifiers. 
- */ -static uint32 -GetNextColocationId(void) -{ - text *sequenceName = cstring_to_text(COLOCATIONID_SEQUENCE_NAME); - Oid sequenceId = ResolveRelationId(sequenceName); - Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId); - Oid savedUserId = InvalidOid; - int savedSecurityContext = 0; - Datum colocationIdDatum = 0; - uint32 colocationId = INVALID_COLOCATION_ID; - - GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); - SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); - - /* generate new and unique colocation id from sequence */ - colocationIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum); - - SetUserIdAndSecContext(savedUserId, savedSecurityContext); - - colocationId = DatumGetUInt32(colocationIdDatum); - - return colocationId; -} - - /* * CreateHashDistributedTable creates a hash distributed table with given * shard count and shard replication factor. diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c index b8d405967..8154193ea 100644 --- a/src/backend/distributed/utils/colocation_utils.c +++ b/src/backend/distributed/utils/colocation_utils.c @@ -10,20 +10,24 @@ */ #include "postgres.h" +#include "miscadmin.h" #include "access/genam.h" #include "access/heapam.h" #include "access/htup_details.h" #include "access/xact.h" #include "catalog/indexing.h" +#include "commands/sequence.h" #include "distributed/colocation_utils.h" #include "distributed/listutils.h" #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" #include "distributed/multi_logical_planner.h" +#include "distributed/pg_dist_colocation.h" #include "distributed/resource_lock.h" #include "distributed/shardinterval_utils.h" #include "distributed/worker_protocol.h" +#include "utils/builtins.h" #include "utils/fmgroids.h" #include "utils/lsyscache.h" #include "utils/rel.h" @@ -36,6 +40,7 @@ static bool ShardsIntervalsEqual(ShardInterval *leftShardInterval, ShardInterval 
*rightShardInterval); static int CompareShardPlacementsByNode(const void *leftElement, const void *rightElement); +static uint32 GetNextColocationId(void); static void UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId); @@ -350,6 +355,150 @@ CompareShardPlacementsByNode(const void *leftElement, const void *rightElement) } +/* + * TableColocationId function returns co-location id of given table. This function + * errors out if given table is not distributed. + */ +uint32 +TableColocationId(Oid distributedTableId) +{ + DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId); + + return cacheEntry->colocationId; +} + + +/* + * DefaultColocationId searches pg_dist_colocation for the default colocation group + * with the given configuration: shard count, replication factor and distribution + * column type. If a matching entry is found, it returns the colocation id, + * otherwise it returns INVALID_COLOCATION_ID. + */ +uint32 +DefaultColocationGroupId(int shardCount, int replicationFactor, + Oid distributionColumnType) +{ + uint32 colocationId = INVALID_COLOCATION_ID; + HeapTuple colocationTuple = NULL; + SysScanDesc scanDescriptor; + const int scanKeyCount = 4; + ScanKeyData scanKey[scanKeyCount]; + bool indexOK = true; + bool defaultColocationGroup = true; + + Relation pgDistColocation = heap_open(DistColocationRelationId(), AccessShareLock); + + /* set scan arguments */ + ScanKeyInit(&scanKey[0], Anum_pg_dist_colocation_shardcount, + BTEqualStrategyNumber, F_INT4EQ, UInt32GetDatum(shardCount)); + ScanKeyInit(&scanKey[1], Anum_pg_dist_colocation_replicationfactor, + BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(replicationFactor)); + ScanKeyInit(&scanKey[2], Anum_pg_dist_colocation_distributioncolumntype, + BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(distributionColumnType)); + ScanKeyInit(&scanKey[3], Anum_pg_dist_colocation_defaultgroup, + BTEqualStrategyNumber, F_BOOLEQ, BoolGetDatum(defaultColocationGroup)); 
+ + scanDescriptor = systable_beginscan(pgDistColocation, + DistColocationConfigurationIndexId(), + indexOK, NULL, scanKeyCount, scanKey); + + colocationTuple = systable_getnext(scanDescriptor); + if (HeapTupleIsValid(colocationTuple)) + { + Form_pg_dist_colocation colocationForm = + (Form_pg_dist_colocation) GETSTRUCT(colocationTuple); + + colocationId = colocationForm->colocationid; + } + + systable_endscan(scanDescriptor); + heap_close(pgDistColocation, AccessShareLock); + + return colocationId; +} + + +/* + * CreateColocationGroup creates a new colocation id and writes it into + * pg_dist_colocation with the given configuration. It also returns the created + * colocation id. + */ +uint32 +CreateColocationGroup(int shardCount, int replicationFactor, Oid distributionColumnType, + bool defaultColocationGroup) +{ + uint32 colocationId = GetNextColocationId(); + Relation pgDistColocation = NULL; + TupleDesc tupleDescriptor = NULL; + HeapTuple heapTuple = NULL; + Datum values[Natts_pg_dist_colocation]; + bool isNulls[Natts_pg_dist_colocation]; + + /* form new colocation tuple */ + memset(values, 0, sizeof(values)); + memset(isNulls, false, sizeof(isNulls)); + + values[Anum_pg_dist_colocation_colocationid - 1] = UInt32GetDatum(colocationId); + values[Anum_pg_dist_colocation_shardcount - 1] = UInt32GetDatum(shardCount); + values[Anum_pg_dist_colocation_replicationfactor - 1] = + UInt32GetDatum(replicationFactor); + values[Anum_pg_dist_colocation_distributioncolumntype - 1] = + ObjectIdGetDatum(distributionColumnType); + values[Anum_pg_dist_colocation_defaultgroup - 1] = + BoolGetDatum(defaultColocationGroup); + + /* open colocation relation and insert the new tuple */ + pgDistColocation = heap_open(DistColocationRelationId(), RowExclusiveLock); + + tupleDescriptor = RelationGetDescr(pgDistColocation); + heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls); + + simple_heap_insert(pgDistColocation, heapTuple); + CatalogUpdateIndexes(pgDistColocation, heapTuple); 
+ + /* increment the counter so that next command can see the row */ + CommandCounterIncrement(); + heap_close(pgDistColocation, RowExclusiveLock); + + return colocationId; +} + + +/* + * GetNextColocationId allocates and returns a unique colocationId for the + * colocation group to be created. This allocation occurs both in shared memory + * and in write ahead logs; writing to logs avoids the risk of having + * colocationId collisions. + * + * Please note that the caller is still responsible for finalizing colocationId + * with the master node. Further note that this function relies on an internal + * sequence created in initdb to generate unique identifiers. + */ +static uint32 +GetNextColocationId(void) +{ + text *sequenceName = cstring_to_text(COLOCATIONID_SEQUENCE_NAME); + Oid sequenceId = ResolveRelationId(sequenceName); + Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId); + Oid savedUserId = InvalidOid; + int savedSecurityContext = 0; + Datum colocationIdDatum = 0; + uint32 colocationId = INVALID_COLOCATION_ID; + + GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); + SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); + + /* generate new and unique colocation id from sequence */ + colocationIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum); + + SetUserIdAndSecContext(savedUserId, savedSecurityContext); + + colocationId = DatumGetUInt32(colocationIdDatum); + + return colocationId; +} + + /* * UpdateRelationColocationGroup updates colocation group in pg_dist_partition * for the given relation. @@ -407,19 +556,6 @@ UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId) } -/* - * TableColocationId function returns co-location id of given table. This function - * errors out if given table is not distributed. 
- */ -uint32 -TableColocationId(Oid distributedTableId) -{ - DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId); - - return cacheEntry->colocationId; -} - - /* * TablesColocated function checks whether given two tables are co-located and * returns true if they are co-located. A table is always co-located with itself. diff --git a/src/include/distributed/colocation_utils.h b/src/include/distributed/colocation_utils.h index c2f4b49ba..de81b2d3b 100644 --- a/src/include/distributed/colocation_utils.h +++ b/src/include/distributed/colocation_utils.h @@ -20,6 +20,9 @@ extern uint32 TableColocationId(Oid distributedTableId); extern uint32 DefaultColocationGroupId(int shardCount, int replicationFactor, Oid distributionColumnType); +extern uint32 CreateColocationGroup(int shardCount, int replicationFactor, + Oid distributionColumnType, + bool defaultColocationGroup); extern bool TablesColocated(Oid leftDistributedTableId, Oid rightDistributedTableId); extern bool ShardsColocated(ShardInterval *leftShardInterval, ShardInterval *rightShardInterval); @@ -27,9 +30,6 @@ extern List * ColocatedTableList(Oid distributedTableId); extern List * ColocatedShardIntervalList(ShardInterval *shardInterval); extern Oid ColocatedTableId(Oid colocationId); extern uint64 ColocatedShardIdInRelation(Oid relationId, int shardIndex); -extern uint32 CreateColocationGroup(int shardCount, int replicationFactor, - Oid distributionColumnType, - bool defaultColocationGroup); #endif /* COLOCATION_UTILS_H_ */ From 18d8a72ca8da5916098320ae69cf15f7d9ed4735 Mon Sep 17 00:00:00 2001 From: Metin Doslu Date: Tue, 6 Dec 2016 15:40:21 +0200 Subject: [PATCH 3/4] Add mark_colocation_group_default() This udf helps to update default colocation group. 
--- .../distributed/citus--6.1-3--6.1-4.sql | 47 +++++++++++++++++++ .../expected/multi_colocation_utils.out | 4 +- .../regress/sql/multi_colocation_utils.sql | 4 +- 3 files changed, 51 insertions(+), 4 deletions(-) diff --git a/src/backend/distributed/citus--6.1-3--6.1-4.sql b/src/backend/distributed/citus--6.1-3--6.1-4.sql index 15294e3b2..89d7c741e 100644 --- a/src/backend/distributed/citus--6.1-3--6.1-4.sql +++ b/src/backend/distributed/citus--6.1-3--6.1-4.sql @@ -26,4 +26,51 @@ COMMENT ON FUNCTION create_distributed_table(table_name regclass, colocate_with text) IS 'creates a distributed table'; +CREATE FUNCTION mark_colocation_group_default(colocation_id integer) + RETURNS void + LANGUAGE plpgsql + SET search_path = pg_catalog + AS $$ +DECLARE + shard_count integer; + replication_factor integer; + distribution_column_type oid; +BEGIN + -- get colocation group configuration + SELECT + shardcount, + replicationfactor, + distributioncolumntype + INTO + shard_count, + replication_factor, + distribution_column_type + FROM + pg_dist_colocation + WHERE + colocationid = colocation_id; + + -- set all defaults to false + UPDATE + pg_dist_colocation + SET + defaultgroup = false + WHERE + shardcount = shard_count AND + replicationfactor = replication_factor AND + distributioncolumntype = distribution_column_type; + + -- set new default colocation group + UPDATE + pg_dist_colocation + SET + defaultgroup = true + WHERE + colocationid = colocation_id; +END; +$$; + +COMMENT ON FUNCTION mark_colocation_group_default(colocation_id integer) + IS 'marks given colocation group as default'; + RESET search_path; diff --git a/src/test/regress/expected/multi_colocation_utils.out b/src/test/regress/expected/multi_colocation_utils.out index 933e8615d..cc5d485af 100644 --- a/src/test/regress/expected/multi_colocation_utils.out +++ b/src/test/regress/expected/multi_colocation_utils.out @@ -871,8 +871,8 @@ DELETE FROM pg_dist_colocation UPDATE pg_dist_partition SET colocationid = 0 WHERE 
colocationid >= 1 AND colocationid < 1000; -- check metadata -SELECT * FROM pg_dist_colocation - WHERE colocationid >= 1 AND colocationid < 1000 +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 ORDER BY colocationid; colocationid | shardcount | replicationfactor | distributioncolumntype | defaultgroup --------------+------------+-------------------+------------------------+-------------- diff --git a/src/test/regress/sql/multi_colocation_utils.sql b/src/test/regress/sql/multi_colocation_utils.sql index 53d761b73..3f65de670 100644 --- a/src/test/regress/sql/multi_colocation_utils.sql +++ b/src/test/regress/sql/multi_colocation_utils.sql @@ -302,8 +302,8 @@ CREATE TABLE table2_groupF ( id int ); SELECT create_reference_table('table2_groupF'); -- check metadata -SELECT * FROM pg_dist_colocation - WHERE colocationid >= 1 AND colocationid < 1000 +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 ORDER BY colocationid; -- test mark_colocation_group_default() From 41bb6fb3beb75f6c37587e37c07bb7f84c42744e Mon Sep 17 00:00:00 2001 From: Metin Doslu Date: Tue, 13 Dec 2016 13:53:20 +0200 Subject: [PATCH 4/4] Delete unused colocation groups after mark_tables_colocated() --- .../distributed/utils/colocation_utils.c | 89 ++++++++++++++++++- .../expected/multi_colocation_utils.out | 53 ++++++++++- .../regress/sql/multi_colocation_utils.sql | 17 ++++ 3 files changed, 153 insertions(+), 6 deletions(-) diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c index 8154193ea..2949e1ee6 100644 --- a/src/backend/distributed/utils/colocation_utils.c +++ b/src/backend/distributed/utils/colocation_utils.c @@ -42,6 +42,8 @@ static int CompareShardPlacementsByNode(const void *leftElement, const void *rightElement); static uint32 GetNextColocationId(void); static void UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId); +static List * 
ColocationGroupTableList(Oid colocationId); +static void DeleteColocationGroup(uint32 colocationId); /* exports for SQL callable functions */ @@ -91,6 +93,7 @@ static void MarkTablesColocated(Oid sourceRelationId, Oid targetRelationId) { uint32 sourceColocationId = INVALID_COLOCATION_ID; + uint32 targetColocationId = INVALID_COLOCATION_ID; Relation pgDistColocation = NULL; Var *sourceDistributionColumn = NULL; Var *targetDistributionColumn = NULL; @@ -154,9 +157,23 @@ MarkTablesColocated(Oid sourceRelationId, Oid targetRelationId) UpdateRelationColocationGroup(sourceRelationId, sourceColocationId); } + targetColocationId = TableColocationId(targetRelationId); + /* finally set colocation group for the target relation */ UpdateRelationColocationGroup(targetRelationId, sourceColocationId); + /* if there is not any remaining table in the colocation group, delete it */ + if (targetColocationId != INVALID_COLOCATION_ID) + { + List *colocatedTableList = ColocationGroupTableList(targetColocationId); + int colocatedTableCount = list_length(colocatedTableList); + + if (colocatedTableCount == 0) + { + DeleteColocationGroup(targetColocationId); + } + } + heap_close(pgDistColocation, NoLock); } @@ -622,6 +639,30 @@ ColocatedTableList(Oid distributedTableId) uint32 tableColocationId = TableColocationId(distributedTableId); List *colocatedTableList = NIL; + /* + * If distribution type of the table is not hash, the table is only co-located + * with itself. + */ + if (tableColocationId == INVALID_COLOCATION_ID) + { + colocatedTableList = lappend_oid(colocatedTableList, distributedTableId); + return colocatedTableList; + } + + colocatedTableList = ColocationGroupTableList(tableColocationId); + + return colocatedTableList; +} + + +/* + * ColocationGroupTableList returns the list of tables in the given colocation + * group. If the colocation group is INVALID_COLOCATION_ID, it returns NIL. 
+ */ +static List * +ColocationGroupTableList(Oid colocationId) +{ + List *colocatedTableList = NIL; Relation pgDistPartition = NULL; TupleDesc tupleDescriptor = NULL; SysScanDesc scanDescriptor = NULL; @@ -634,14 +675,13 @@ ColocatedTableList(Oid distributedTableId) * If distribution type of the table is not hash, the table is only co-located * with itself. */ - if (tableColocationId == INVALID_COLOCATION_ID) + if (colocationId == INVALID_COLOCATION_ID) { - colocatedTableList = lappend_oid(colocatedTableList, distributedTableId); - return colocatedTableList; + return NIL; } ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_colocationid, - BTEqualStrategyNumber, F_INT4EQ, ObjectIdGetDatum(tableColocationId)); + BTEqualStrategyNumber, F_INT4EQ, ObjectIdGetDatum(colocationId)); pgDistPartition = heap_open(DistPartitionRelationId(), AccessShareLock); tupleDescriptor = RelationGetDescr(pgDistPartition); @@ -780,3 +820,44 @@ ColocatedShardIdInRelation(Oid relationId, int shardIndex) return tableCacheEntry->sortedShardIntervalArray[shardIndex]->shardId; } + + +/* + * DeleteColocationGroup deletes the colocation group from pg_dist_colocation. 
+ */ +static void +DeleteColocationGroup(uint32 colocationId) +{ + Relation pgDistColocation = NULL; + SysScanDesc scanDescriptor = NULL; + int scanKeyCount = 1; + ScanKeyData scanKey[scanKeyCount]; + bool indexOK = false; + HeapTuple heapTuple = NULL; + + pgDistColocation = heap_open(DistColocationRelationId(), RowExclusiveLock); + + ScanKeyInit(&scanKey[0], Anum_pg_dist_colocation_colocationid, + BTEqualStrategyNumber, F_INT4EQ, UInt32GetDatum(colocationId)); + + scanDescriptor = systable_beginscan(pgDistColocation, InvalidOid, indexOK, + NULL, scanKeyCount, scanKey); + + heapTuple = systable_getnext(scanDescriptor); + if (!HeapTupleIsValid(heapTuple)) + { + ereport(ERROR, (errmsg("could not find valid entry for colocation group %d", + colocationId))); + } + + simple_heap_delete(pgDistColocation, &(heapTuple->t_self)); + + CatalogUpdateIndexes(pgDistColocation, heapTuple); + CitusInvalidateRelcacheByRelid(DistColocationRelationId()); + + systable_endscan(scanDescriptor); + heap_close(pgDistColocation, RowExclusiveLock); + + /* increment the counter so that next command can see the row */ + CommandCounterIncrement(); +} diff --git a/src/test/regress/expected/multi_colocation_utils.out b/src/test/regress/expected/multi_colocation_utils.out index cc5d485af..8896cfd27 100644 --- a/src/test/regress/expected/multi_colocation_utils.out +++ b/src/test/regress/expected/multi_colocation_utils.out @@ -954,6 +954,14 @@ SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupB']); (1 row) +SET citus.shard_count = 2; +CREATE TABLE table5_groupE ( id int ); +SELECT create_distributed_table('table5_groupE', 'id', colocate_with => 'NONE'); + create_distributed_table +-------------------------- + +(1 row) + -- check metadata to see colocation groups are created successfully SELECT * FROM pg_dist_colocation WHERE colocationid >= 1 AND colocationid < 1000 @@ -965,7 +973,8 @@ SELECT * FROM pg_dist_colocation 4 | 4 | 2 | 23 | t 5 | 2 | 2 | 23 | t 6 | 1 | 2 | 23 | t -(5 rows) 
+ 7 | 2 | 2 | 23 | f +(6 rows) SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE colocationid >= 1 AND colocationid < 1000 @@ -983,5 +992,45 @@ SELECT logicalrelid, colocationid FROM pg_dist_partition table3_groupe | 5 table1_groupf | 6 table2_groupf | 6 -(11 rows) + table5_groupe | 7 +(12 rows) + +-- move the only table in colocation group 7 to colocation group 5 +SELECT mark_tables_colocated('table1_groupE', ARRAY['table5_groupE']); + mark_tables_colocated +----------------------- + +(1 row) + +-- check metadata to see that unused colocation group is deleted +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid; + colocationid | shardcount | replicationfactor | distributioncolumntype | defaultgroup +--------------+------------+-------------------+------------------------+-------------- + 2 | 2 | 1 | 23 | t + 3 | 2 | 2 | 25 | t + 4 | 4 | 2 | 23 | t + 5 | 2 | 2 | 23 | t + 6 | 1 | 2 | 23 | t +(5 rows) + +SELECT logicalrelid, colocationid FROM pg_dist_partition + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid, logicalrelid; + logicalrelid | colocationid +---------------+-------------- + table1_groupb | 2 + table2_groupb | 2 + table1_groupc | 3 + table2_groupc | 3 + table1_groupd | 4 + table2_groupd | 4 + table1_groupe | 5 + table2_groupe | 5 + table3_groupe | 5 + table5_groupe | 5 + table1_groupf | 6 + table2_groupf | 6 +(12 rows) diff --git a/src/test/regress/sql/multi_colocation_utils.sql b/src/test/regress/sql/multi_colocation_utils.sql index 3f65de670..7e5a7992f 100644 --- a/src/test/regress/sql/multi_colocation_utils.sql +++ b/src/test/regress/sql/multi_colocation_utils.sql @@ -398,6 +398,11 @@ SELECT mark_tables_colocated('table1_groupF', ARRAY['table2_groupF']); -- check to colocate with itself SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupB']); +SET citus.shard_count = 2; + +CREATE TABLE table5_groupE ( id int ); +SELECT 
create_distributed_table('table5_groupE', 'id', colocate_with => 'NONE'); + -- check metadata to see colocation groups are created successfully SELECT * FROM pg_dist_colocation WHERE colocationid >= 1 AND colocationid < 1000 @@ -406,3 +411,15 @@ SELECT * FROM pg_dist_colocation SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE colocationid >= 1 AND colocationid < 1000 ORDER BY logicalrelid; + +-- move the only table in colocation group 7 to colocation group 5 +SELECT mark_tables_colocated('table1_groupE', ARRAY['table5_groupE']); + +-- check metadata to see that unused colocation group is deleted +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid; + +SELECT logicalrelid, colocationid FROM pg_dist_partition + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid, logicalrelid;