From 86cca54857b24a861191aba69258b527d8eba1ad Mon Sep 17 00:00:00 2001 From: Metin Doslu Date: Thu, 15 Dec 2016 15:46:59 +0200 Subject: [PATCH] Add colocate_with option to create_distributed_table() With this commit, we support three versions of colocate_with: i.default, ii.none and iii. a specific table name. --- src/backend/distributed/Makefile | 4 +- .../distributed/citus--6.1-4--6.1-5.sql | 20 ++ src/backend/distributed/citus.control | 2 +- .../commands/create_distributed_table.c | 145 ++++++--- .../distributed/utils/colocation_utils.c | 88 ++++- .../distributed/utils/metadata_cache.c | 5 +- src/include/distributed/colocation_utils.h | 1 + .../expected/multi_colocation_utils.out | 300 +++++++++++++++--- .../expected/multi_distribution_metadata.out | 2 +- src/test/regress/expected/multi_extension.out | 1 + .../regress/sql/multi_colocation_utils.sql | 90 +++++- src/test/regress/sql/multi_extension.sql | 1 + 12 files changed, 554 insertions(+), 105 deletions(-) create mode 100644 src/backend/distributed/citus--6.1-4--6.1-5.sql diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index 0feecd06c..5230b1394 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -9,7 +9,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \ 5.2-1 5.2-2 5.2-3 5.2-4 \ 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \ - 6.1-1 6.1-2 6.1-3 6.1-4 + 6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -103,6 +103,8 @@ $(EXTENSION)--6.1-3.sql: $(EXTENSION)--6.1-2.sql $(EXTENSION)--6.1-2--6.1-3.sql cat $^ > $@ $(EXTENSION)--6.1-4.sql: $(EXTENSION)--6.1-3.sql $(EXTENSION)--6.1-3--6.1-4.sql cat $^ > $@ +$(EXTENSION)--6.1-5.sql: $(EXTENSION)--6.1-4.sql $(EXTENSION)--6.1-4--6.1-5.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--6.1-4--6.1-5.sql b/src/backend/distributed/citus--6.1-4--6.1-5.sql new file mode 100644 index 000000000..95451b0c1 --- /dev/null +++ b/src/backend/distributed/citus--6.1-4--6.1-5.sql @@ -0,0 +1,20 @@ +/* citus--6.1-4--6.1-5.sql */ + +SET search_path = 'pg_catalog'; + +DROP FUNCTION create_distributed_table(regclass, text, citus.distribution_type); + +CREATE FUNCTION create_distributed_table(table_name regclass, + distribution_column text, + distribution_type citus.distribution_type DEFAULT 'hash', + colocate_with text DEFAULT 'default') + RETURNS void + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$create_distributed_table$$; +COMMENT ON FUNCTION create_distributed_table(table_name regclass, + distribution_column text, + distribution_type citus.distribution_type, + colocate_with text) + IS 'creates a distributed table'; + +RESET search_path; diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index 04ef361d3..399f6d7a0 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '6.1-4' +default_version = '6.1-5' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 9f1de23ac..a2fedd372 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -75,7 +75,9 @@ static void ErrorIfNotSupportedForeignConstraint(Relation relation, static void InsertIntoPgDistPartition(Oid relationId, char distributionMethod, Var *distributionColumn, uint32 colocationId); static void CreateHashDistributedTable(Oid relationId, char *distributionColumnName, + char *colocateWithTableName, int shardCount, int replicationFactor); +static Oid ColumnType(Oid relationId, char *columnName); /* exports for SQL callable functions */ @@ -109,8 +111,9 @@ master_create_distributed_table(PG_FUNCTION_ARGS) /* - * create_distributed_table accepts a table, distribution column and - * distribution method, then it creates a distributed table. + * create_distributed_table gets a table name, distribution column, + * distribution method and colocate_with option, then it creates a + * distributed table. */ Datum create_distributed_table(PG_FUNCTION_ARGS) @@ -121,6 +124,36 @@ create_distributed_table(PG_FUNCTION_ARGS) char *distributionColumnName = text_to_cstring(distributionColumnText); char distributionMethod = LookupDistributionMethod(distributionMethodOid); + text *colocateWithTableNameText = NULL; + char *colocateWithTableName = NULL; + + /* guard against a binary update without a function update */ + if (PG_NARGS() >= 4) + { + colocateWithTableNameText = PG_GETARG_TEXT_P(3); + colocateWithTableName = text_to_cstring(colocateWithTableNameText); + } + else + { + colocateWithTableName = "default"; + } + + /* check if we try to colocate with hash distributed tables */ + if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0 && + pg_strncasecmp(colocateWithTableName, "none", NAMEDATALEN) != 0) + { + Oid colocateWithTableOid = ResolveRelationId(colocateWithTableNameText); + char colocateWithTableDistributionMethod = PartitionMethod(colocateWithTableOid); + + if (colocateWithTableDistributionMethod != DISTRIBUTE_BY_HASH || + distributionMethod != DISTRIBUTE_BY_HASH) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot distribute relation"), + errdetail("Currently, colocate_with option is only supported " + "for hash distributed tables."))); + } + } /* if distribution method is not hash, just create partition metadata */ if (distributionMethod != DISTRIBUTE_BY_HASH) @@ -130,8 +163,9 @@ create_distributed_table(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } - /* use configuration values for shard count and shard replication factor*/ - CreateHashDistributedTable(relationId, distributionColumnName, ShardCount, + /* use configuration values for shard count and shard replication factor */ + CreateHashDistributedTable(relationId, distributionColumnName, + colocateWithTableName, ShardCount, ShardReplicationFactor); PG_RETURN_VOID(); @@ -149,6 +183,7 @@ create_reference_table(PG_FUNCTION_ARGS) Oid relationId = PG_GETARG_OID(0); int shardCount = 1; AttrNumber firstColumnAttrNumber = 1; + char *colocateWithTableName = "default"; char *firstColumnName = get_attname(relationId, firstColumnAttrNumber); if (firstColumnName == NULL) @@ -159,8 +194,8 @@ create_reference_table(PG_FUNCTION_ARGS) "least one column", relationName))); } - CreateHashDistributedTable(relationId, firstColumnName, shardCount, - ShardReplicationFactor); + CreateHashDistributedTable(relationId, firstColumnName, colocateWithTableName, + shardCount, ShardReplicationFactor); PG_RETURN_VOID(); } @@ -844,24 +879,21 @@ CreateTruncateTrigger(Oid relationId) /* - * CreateHashDistributedTable creates a hash distributed table with given - * shard count and shard replication factor. + * CreateHashDistributedTable creates a hash distributed table. */ static void CreateHashDistributedTable(Oid relationId, char *distributionColumnName, + char *colocateWithTableName, int shardCount, int replicationFactor) { Relation distributedRelation = NULL; Relation pgDistColocation = NULL; - Var *distributionColumn = NULL; - Oid distributionColumnType = 0; uint32 colocationId = INVALID_COLOCATION_ID; + Oid colocationTableId = InvalidOid; + Oid distributionColumnType = InvalidOid; - /* get distribution column type */ + /* get an access lock on the relation to prevent DROP TABLE and ALTER TABLE */ distributedRelation = relation_open(relationId, AccessShareLock); - distributionColumn = BuildDistributionKeyFromColumnName(distributedRelation, - distributionColumnName); - distributionColumnType = distributionColumn->vartype; /* * Get an exclusive lock on the colocation system catalog. Therefore, we @@ -870,38 +902,77 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName, */ pgDistColocation = heap_open(DistColocationRelationId(), ExclusiveLock); - /* check for existing colocations */ - colocationId = ColocationId(shardCount, replicationFactor, distributionColumnType); + /* get distribution column data type */ + distributionColumnType = ColumnType(relationId, distributionColumnName); - /* - * If there is a colocation group for the current configuration, get a - * colocated table from the group and use its shards as a reference to - * create new shards. Otherwise, create a new colocation group and create - * shards with the default round robin algorithm. - */ - if (colocationId != INVALID_COLOCATION_ID) + if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) == 0) { - char *relationName = get_rel_name(relationId); - - Oid colocatedTableId = ColocatedTableId(colocationId); - ConvertToDistributedTable(relationId, distributionColumnName, - DISTRIBUTE_BY_HASH, colocationId); - - CreateColocatedShards(relationId, colocatedTableId); - ereport(DEBUG2, (errmsg("table %s is added to colocation group: %d", - relationName, colocationId))); + /* check for default colocation group */ + colocationId = ColocationId(shardCount, replicationFactor, + distributionColumnType); + if (colocationId == INVALID_COLOCATION_ID) + { + colocationId = CreateColocationGroup(shardCount, replicationFactor, + distributionColumnType); + } + else + { + colocationTableId = ColocatedTableId(colocationId); + } + } + else if (pg_strncasecmp(colocateWithTableName, "none", NAMEDATALEN) == 0) + { + colocationId = GetNextColocationId(); } else { - colocationId = CreateColocationGroup(shardCount, replicationFactor, - distributionColumnType); - ConvertToDistributedTable(relationId, distributionColumnName, - DISTRIBUTE_BY_HASH, colocationId); + Var *colocationTablePartitionColumn = NULL; + Oid colocationTablePartitionColumnType = InvalidOid; - /* use the default way to create shards */ + /* get colocation group of the target table */ + text *colocateWithTableNameText = cstring_to_text(colocateWithTableName); + colocationTableId = ResolveRelationId(colocateWithTableNameText); + + colocationId = TableColocationId(colocationTableId); + + colocationTablePartitionColumn = PartitionKey(colocationTableId); + colocationTablePartitionColumnType = colocationTablePartitionColumn->vartype; + + if (colocationTablePartitionColumnType != distributionColumnType) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot colocate with %s", colocateWithTableName), + errdetail("Distribution column types are different."))); + } + } + + /* create distributed table metadata */ + ConvertToDistributedTable(relationId, distributionColumnName, DISTRIBUTE_BY_HASH, + colocationId); + + /* create shards */ + if (colocationTableId != InvalidOid) + { + CreateColocatedShards(relationId, colocationTableId); + } + else + { CreateShardsWithRoundRobinPolicy(relationId, shardCount, replicationFactor); } heap_close(pgDistColocation, NoLock); relation_close(distributedRelation, NoLock); } + + +/* + * ColumnType returns the column type of the given column. + */ +static Oid +ColumnType(Oid relationId, char *columnName) +{ + AttrNumber columnIndex = get_attnum(relationId, columnName); + Oid columnType = get_atttype(relationId, columnIndex); + + return columnType; +} diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c index 6c3485894..328367bfa 100644 --- a/src/backend/distributed/utils/colocation_utils.c +++ b/src/backend/distributed/utils/colocation_utils.c @@ -40,8 +40,9 @@ static bool ShardsIntervalsEqual(ShardInterval *leftShardInterval, ShardInterval *rightShardInterval); static int CompareShardPlacementsByNode(const void *leftElement, const void *rightElement); -static uint32 GetNextColocationId(void); static void UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId); +static List * ColocationGroupTableList(Oid colocationId); +static void DeleteColocationGroup(uint32 colocationId); /* exports for SQL callable functions */ @@ -91,6 +92,7 @@ static void MarkTablesColocated(Oid sourceRelationId, Oid targetRelationId) { uint32 sourceColocationId = INVALID_COLOCATION_ID; + uint32 targetColocationId = INVALID_COLOCATION_ID; Relation pgDistColocation = NULL; Var *sourceDistributionColumn = NULL; Var *targetDistributionColumn = NULL; @@ -143,9 +145,23 @@ MarkTablesColocated(Oid sourceRelationId, Oid targetRelationId) UpdateRelationColocationGroup(sourceRelationId, sourceColocationId); } + targetColocationId = TableColocationId(targetRelationId); + /* finally set colocation group for the target relation */ UpdateRelationColocationGroup(targetRelationId, sourceColocationId); + /* if there is not any remaining table in the colocation group, delete it */ + if (targetColocationId != INVALID_COLOCATION_ID) + { + List *colocatedTableList = ColocationGroupTableList(targetColocationId); + int colocatedTableCount = list_length(colocatedTableList); + + if (colocatedTableCount == 0) + { + DeleteColocationGroup(targetColocationId); + } + } + heap_close(pgDistColocation, NoLock); } @@ -442,7 +458,7 @@ CreateColocationGroup(int shardCount, int replicationFactor, Oid distributionCol * with the master node. Further note that this function relies on an internal * sequence created in initdb to generate unique identifiers. */ -static uint32 +uint32 GetNextColocationId() { text *sequenceName = cstring_to_text(COLOCATIONID_SEQUENCE_NAME); @@ -603,6 +619,30 @@ ColocatedTableList(Oid distributedTableId) uint32 tableColocationId = TableColocationId(distributedTableId); List *colocatedTableList = NIL; + /* + * If distribution type of the table is not hash, the table is only co-located + * with itself. + */ + if (tableColocationId == INVALID_COLOCATION_ID) + { + colocatedTableList = lappend_oid(colocatedTableList, distributedTableId); + return colocatedTableList; + } + + colocatedTableList = ColocationGroupTableList(tableColocationId); + + return colocatedTableList; +} + + +/* + * ColocationGroupTableList returns the list of tables in the given colocation + * group. If the colocation group is INVALID_COLOCATION_ID, it returns NIL. + */ +static List * +ColocationGroupTableList(Oid colocationId) +{ + List *colocatedTableList = NIL; Relation pgDistPartition = NULL; TupleDesc tupleDescriptor = NULL; SysScanDesc scanDescriptor = NULL; @@ -615,14 +655,13 @@ ColocatedTableList(Oid distributedTableId) * If distribution type of the table is not hash, the table is only co-located * with itself. */ - if (tableColocationId == INVALID_COLOCATION_ID) + if (colocationId == INVALID_COLOCATION_ID) { - colocatedTableList = lappend_oid(colocatedTableList, distributedTableId); - return colocatedTableList; + return NIL; } ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_colocationid, - BTEqualStrategyNumber, F_INT4EQ, ObjectIdGetDatum(tableColocationId)); + BTEqualStrategyNumber, F_INT4EQ, ObjectIdGetDatum(colocationId)); pgDistPartition = heap_open(DistPartitionRelationId(), AccessShareLock); tupleDescriptor = RelationGetDescr(pgDistPartition); @@ -761,3 +800,40 @@ ColocatedShardIdInRelation(Oid relationId, int shardIndex) return tableCacheEntry->sortedShardIntervalArray[shardIndex]->shardId; } + + +/* + * DeleteColocationGroup deletes the colocation group from pg_dist_colocation. + */ +static void +DeleteColocationGroup(uint32 colocationId) +{ + Relation pgDistColocation = NULL; + SysScanDesc scanDescriptor = NULL; + int scanKeyCount = 1; + ScanKeyData scanKey[scanKeyCount]; + bool indexOK = false; + HeapTuple heapTuple = NULL; + + pgDistColocation = heap_open(DistColocationRelationId(), RowExclusiveLock); + + ScanKeyInit(&scanKey[0], Anum_pg_dist_colocation_colocationid, + BTEqualStrategyNumber, F_INT4EQ, UInt32GetDatum(colocationId)); + + scanDescriptor = systable_beginscan(pgDistColocation, InvalidOid, indexOK, + NULL, scanKeyCount, scanKey); + + /* if a record id found, delete it */ + heapTuple = systable_getnext(scanDescriptor); + if (HeapTupleIsValid(heapTuple)) + { + simple_heap_delete(pgDistColocation, &(heapTuple->t_self)); + + CatalogUpdateIndexes(pgDistColocation, heapTuple); + CitusInvalidateRelcacheByRelid(DistColocationRelationId()); + CommandCounterIncrement(); + } + + systable_endscan(scanDescriptor); + heap_close(pgDistColocation, RowExclusiveLock); +} diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index b48c62fa8..063a6f4fc 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -260,8 +260,9 @@ DistributedTableCacheEntry(Oid distributedRelationId) } else { - ereport(ERROR, (errmsg("relation %u is not distributed", - distributedRelationId))); + char *relationName = get_rel_name(distributedRelationId); + ereport(ERROR, (errmsg("relation %s is not distributed", + relationName))); } } diff --git a/src/include/distributed/colocation_utils.h b/src/include/distributed/colocation_utils.h index dc3754ab3..63fa301c1 100644 --- a/src/include/distributed/colocation_utils.h +++ b/src/include/distributed/colocation_utils.h @@ -28,6 +28,7 @@ extern uint64 ColocatedShardIdInRelation(Oid relationId, int shardIndex); uint32 ColocationId(int shardCount, int replicationFactor, Oid distributionColumnType); extern uint32 CreateColocationGroup(int shardCount, int replicationFactor, Oid distributionColumnType); +extern uint32 GetNextColocationId(void); #endif /* COLOCATION_UTILS_H_ */ diff --git a/src/test/regress/expected/multi_colocation_utils.out b/src/test/regress/expected/multi_colocation_utils.out index 7bd235aeb..a007ba040 100644 --- a/src/test/regress/expected/multi_colocation_utils.out +++ b/src/test/regress/expected/multi_colocation_utils.out @@ -504,6 +504,115 @@ SELECT create_distributed_table('schema_collocation.table4_groupE', 'id'); (1 row) +-- test colocate_with option +CREATE TABLE table1_group_none_1 ( id int ); +SELECT create_distributed_table('table1_group_none_1', 'id', colocate_with => 'none'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE table2_group_none_1 ( id int ); +SELECT create_distributed_table('table2_group_none_1', 'id', colocate_with => 'table1_group_none_1'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE table1_group_none_2 ( id int ); +SELECT create_distributed_table('table1_group_none_2', 'id', colocate_with => 'none'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE table4_groupE ( id int ); +SELECT create_distributed_table('table4_groupE', 'id', colocate_with => 'default'); + create_distributed_table +-------------------------- + +(1 row) + +SET citus.shard_count = 3; +-- check that this new configuration does not have a default group +CREATE TABLE table1_group_none_3 ( id int ); +SELECT create_distributed_table('table1_group_none_3', 'id', colocate_with => 'NONE'); + create_distributed_table +-------------------------- + +(1 row) + +-- a new table does not use a non-default group +CREATE TABLE table1_group_default ( id int ); +SELECT create_distributed_table('table1_group_default', 'id', colocate_with => 'DEFAULT'); + create_distributed_table +-------------------------- + +(1 row) + +-- check metadata +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid; + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 2 | 2 | 1 | 23 + 3 | 2 | 2 | 25 + 4 | 4 | 2 | 23 + 5 | 2 | 2 | 23 + 9 | 3 | 2 | 23 +(5 rows) + +SELECT logicalrelid, colocationid FROM pg_dist_partition + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid, logicalrelid; + logicalrelid | colocationid +----------------------------------+-------------- + table1_groupb | 2 + table2_groupb | 2 + table1_groupc | 3 + table2_groupc | 3 + table1_groupd | 4 + table2_groupd | 4 + table3_groupd | 4 + table1_groupe | 5 + table2_groupe | 5 + table3_groupe | 5 + schema_collocation.table4_groupe | 5 + table4_groupe | 5 + table1_group_none_1 | 6 + table2_group_none_1 | 6 + table1_group_none_2 | 7 + table1_group_none_3 | 8 + table1_group_default | 9 +(17 rows) + +-- check failing colocate_with options +CREATE TABLE table_postgresql( id int ); +CREATE TABLE table_failing ( id int ); +SELECT create_distributed_table('table_failing', 'id', colocate_with => 'table_append'); +ERROR: cannot distribute relation +DETAIL: Currently, colocate_with option is only supported for hash distributed tables. +SELECT create_distributed_table('table_failing', 'id', 'append', 'table1_groupE'); +ERROR: cannot distribute relation +DETAIL: Currently, colocate_with option is only supported for hash distributed tables. +SELECT create_distributed_table('table_failing', 'id', colocate_with => 'table_postgresql'); +ERROR: relation table_postgresql is not distributed +SELECT create_distributed_table('table_failing', 'id', colocate_with => 'no_table'); +ERROR: relation "no_table" does not exist +SELECT create_distributed_table('table_failing', 'id', colocate_with => ''); +ERROR: invalid name syntax +SELECT create_distributed_table('table_failing', 'id', colocate_with => NULL); + create_distributed_table +-------------------------- + +(1 row) + +-- check with different distribution column types +CREATE TABLE table_bigint ( id bigint ); +SELECT create_distributed_table('table_bigint', 'id', colocate_with => 'table1_groupE'); +ERROR: cannot colocate with table1_groupE +DETAIL: Distribution column types are different. -- check worker table schemas \c - - - :worker_1_port \d table3_groupE_1300050 @@ -535,8 +644,8 @@ SELECT create_reference_table('table2_groupF'); (1 row) -- check metadata -SELECT * FROM pg_dist_colocation - WHERE colocationid >= 1 AND colocationid < 1000 +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 ORDER BY colocationid; colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ @@ -544,8 +653,9 @@ SELECT * FROM pg_dist_colocation 3 | 2 | 2 | 25 4 | 4 | 2 | 23 5 | 2 | 2 | 23 - 6 | 1 | 2 | 23 -(5 rows) + 9 | 3 | 2 | 23 + 10 | 1 | 2 | 23 +(6 rows) -- cross check with internal colocation API SELECT @@ -563,22 +673,27 @@ WHERE ORDER BY table1, table2; - table1 | table2 | colocated ----------------+----------------------------------+----------- - table1_group1 | table2_group1 | t - table1_groupb | table2_groupb | t - table1_groupc | table2_groupc | t - table1_groupd | table2_groupd | t - table1_groupd | table3_groupd | t - table2_groupd | table3_groupd | t - table1_groupe | table2_groupe | t - table1_groupe | table3_groupe | t - table1_groupe | schema_collocation.table4_groupe | t - table2_groupe | table3_groupe | t - table2_groupe | schema_collocation.table4_groupe | t - table3_groupe | schema_collocation.table4_groupe | t - table1_groupf | table2_groupf | t -(13 rows) + table1 | table2 | colocated +----------------------------------+----------------------------------+----------- + table1_group1 | table2_group1 | t + table1_groupb | table2_groupb | t + table1_groupc | table2_groupc | t + table1_groupd | table2_groupd | t + table1_groupd | table3_groupd | t + table2_groupd | table3_groupd | t + table1_groupe | table2_groupe | t + table1_groupe | table3_groupe | t + table1_groupe | schema_collocation.table4_groupe | t + table1_groupe | table4_groupe | t + table2_groupe | table3_groupe | t + table2_groupe | schema_collocation.table4_groupe | t + table2_groupe | table4_groupe | t + table3_groupe | schema_collocation.table4_groupe | t + table3_groupe | table4_groupe | t + schema_collocation.table4_groupe | table4_groupe | t + table1_group_none_1 | table2_group_none_1 | t + table1_groupf | table2_groupf | t +(18 rows) -- check created shards SELECT @@ -653,11 +768,39 @@ ORDER BY schema_collocation.table4_groupe | 1300052 | t | 57637 | -2147483648 | -1 schema_collocation.table4_groupe | 1300053 | t | 57637 | 0 | 2147483647 schema_collocation.table4_groupe | 1300053 | t | 57638 | 0 | 2147483647 - table1_groupf | 1300054 | t | 57637 | -2147483648 | 2147483647 - table1_groupf | 1300054 | t | 57638 | -2147483648 | 2147483647 - table2_groupf | 1300055 | t | 57638 | -2147483648 | 2147483647 - table2_groupf | 1300055 | t | 57637 | -2147483648 | 2147483647 -(56 rows) + table1_group_none_1 | 1300054 | t | 57637 | -2147483648 | -1 + table1_group_none_1 | 1300054 | t | 57638 | -2147483648 | -1 + table1_group_none_1 | 1300055 | t | 57638 | 0 | 2147483647 + table1_group_none_1 | 1300055 | t | 57637 | 0 | 2147483647 + table2_group_none_1 | 1300056 | t | 57638 | -2147483648 | -1 + table2_group_none_1 | 1300056 | t | 57637 | -2147483648 | -1 + table2_group_none_1 | 1300057 | t | 57637 | 0 | 2147483647 + table2_group_none_1 | 1300057 | t | 57638 | 0 | 2147483647 + table1_group_none_2 | 1300058 | t | 57637 | -2147483648 | -1 + table1_group_none_2 | 1300058 | t | 57638 | -2147483648 | -1 + table1_group_none_2 | 1300059 | t | 57638 | 0 | 2147483647 + table1_group_none_2 | 1300059 | t | 57637 | 0 | 2147483647 + table4_groupe | 1300060 | t | 57637 | -2147483648 | -1 + table4_groupe | 1300060 | t | 57638 | -2147483648 | -1 + table4_groupe | 1300061 | t | 57638 | 0 | 2147483647 + table4_groupe | 1300061 | t | 57637 | 0 | 2147483647 + table1_group_none_3 | 1300062 | t | 57637 | -2147483648 | -715827884 + table1_group_none_3 | 1300062 | t | 57638 | -2147483648 | -715827884 + table1_group_none_3 | 1300063 | t | 57638 | -715827883 | 715827881 + table1_group_none_3 | 1300063 | t | 57637 | -715827883 | 715827881 + table1_group_none_3 | 1300064 | t | 57637 | 715827882 | 2147483647 + table1_group_none_3 | 1300064 | t | 57638 | 715827882 | 2147483647 + table1_group_default | 1300065 | t | 57637 | -2147483648 | -715827884 + table1_group_default | 1300065 | t | 57638 | -2147483648 | -715827884 + table1_group_default | 1300066 | t | 57638 | -715827883 | 715827881 + table1_group_default | 1300066 | t | 57637 | -715827883 | 715827881 + table1_group_default | 1300067 | t | 57637 | 715827882 | 2147483647 + table1_group_default | 1300067 | t | 57638 | 715827882 | 2147483647 + table1_groupf | 1300068 | t | 57637 | -2147483648 | 2147483647 + table1_groupf | 1300068 | t | 57638 | -2147483648 | 2147483647 + table2_groupf | 1300069 | t | 57638 | -2147483648 | 2147483647 + table2_groupf | 1300069 | t | 57637 | -2147483648 | 2147483647 +(84 rows) -- reset colocation ids to test mark_tables_colocated ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1; @@ -666,16 +809,16 @@ DELETE FROM pg_dist_colocation UPDATE pg_dist_partition SET colocationid = 0 WHERE colocationid >= 1 AND colocationid < 1000; -- check metadata -SELECT * FROM pg_dist_colocation - WHERE colocationid >= 1 AND colocationid < 1000 +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 ORDER BY colocationid; colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ (0 rows) SELECT logicalrelid, colocationid FROM pg_dist_partition - WHERE colocationid >= 1 AND colocationid < 1000 - ORDER BY logicalrelid; + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid, logicalrelid; logicalrelid | colocationid --------------+-------------- (0 rows) @@ -697,16 +840,16 @@ SELECT mark_tables_colocated('table1_groupB', ARRAY['table2_groupB', 'table1_gro ERROR: cannot colocate tables table1_groupb and table1_groupd DETAIL: Shard counts don't match for table1_groupb and table1_groupd. -- check metadata to see failing calls didn't have any side effects -SELECT * FROM pg_dist_colocation - WHERE colocationid >= 1 AND colocationid < 1000 +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 ORDER BY colocationid; colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ (0 rows) SELECT logicalrelid, colocationid FROM pg_dist_partition - WHERE colocationid >= 1 AND colocationid < 1000 - ORDER BY logicalrelid; + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid, logicalrelid; logicalrelid | colocationid --------------+-------------- (0 rows) @@ -749,6 +892,21 @@ SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupB']); (1 row) +SET citus.shard_count = 2; +CREATE TABLE table1_group_none ( id int ); +SELECT create_distributed_table('table1_group_none', 'id', colocate_with => 'NONE'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE table2_group_none ( id int ); +SELECT create_distributed_table('table2_group_none', 'id', colocate_with => 'NONE'); + create_distributed_table +-------------------------- + +(1 row) + -- check metadata to see colocation groups are created successfully SELECT * FROM pg_dist_colocation WHERE colocationid >= 1 AND colocationid < 1000 @@ -765,18 +923,66 @@ SELECT * FROM pg_dist_colocation SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE colocationid >= 1 AND colocationid < 1000 ORDER BY logicalrelid; - logicalrelid | colocationid ----------------+-------------- - table1_groupb | 2 - table2_groupb | 2 - table1_groupc | 3 - table2_groupc | 3 - table1_groupd | 4 - table2_groupd | 4 - table1_groupe | 5 - table2_groupe | 5 - table3_groupe | 5 - table1_groupf | 6 - table2_groupf | 6 -(11 rows) + logicalrelid | colocationid +-------------------+-------------- + table1_groupb | 2 + table2_groupb | 2 + table1_groupc | 3 + table2_groupc | 3 + table1_groupd | 4 + table2_groupd | 4 + table1_groupe | 5 + table2_groupe | 5 + table3_groupe | 5 + table1_groupf | 6 + table2_groupf | 6 + table1_group_none | 7 + table2_group_none | 8 +(13 rows) + +-- move the all tables in colocation group 5 to colocation group 7 +SELECT mark_tables_colocated('table1_group_none', ARRAY['table1_groupE', 'table2_groupE', 'table3_groupE']); + mark_tables_colocated +----------------------- + +(1 row) + +-- move a table with a colocation id which is already not in pg_dist_colocation +SELECT mark_tables_colocated('table1_group_none', ARRAY['table2_group_none']); + mark_tables_colocated +----------------------- + +(1 row) + +-- check metadata to see that unused colocation group is deleted +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid; + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 2 | 2 | 1 | 23 + 3 | 2 | 2 | 25 + 4 | 4 | 2 | 23 + 6 | 1 | 2 | 23 +(4 rows) + +SELECT logicalrelid, colocationid FROM pg_dist_partition + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid, logicalrelid; + logicalrelid | colocationid +-------------------+-------------- + table1_groupb | 2 + table2_groupb | 2 + table1_groupc | 3 + table2_groupc | 3 + table1_groupd | 4 + table2_groupd | 4 + table1_groupf | 6 + table2_groupf | 6 + table1_groupe | 7 + table2_groupe | 7 + table3_groupe | 7 + table1_group_none | 7 + table2_group_none | 7 +(13 rows) diff --git a/src/test/regress/expected/multi_distribution_metadata.out b/src/test/regress/expected/multi_distribution_metadata.out index 5831d849e..22740e09f 100644 --- a/src/test/regress/expected/multi_distribution_metadata.out +++ b/src/test/regress/expected/multi_distribution_metadata.out @@ -156,7 +156,7 @@ SELECT partition_type('events_hash'); (1 row) SELECT partition_type('pg_type'); -ERROR: relation 1247 is not distributed +ERROR: relation pg_type is not distributed -- should see true for events_hash, false for others SELECT is_distributed_table('events_hash'); is_distributed_table diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index ae79c879c..8ffcce4c3 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -62,6 +62,7 @@ ALTER EXTENSION citus UPDATE TO '6.1-1'; ALTER EXTENSION citus UPDATE TO '6.1-2'; ALTER EXTENSION citus UPDATE TO '6.1-3'; ALTER EXTENSION citus UPDATE TO '6.1-4'; +ALTER EXTENSION citus UPDATE TO '6.1-5'; -- ensure no objects were created outside pg_catalog SELECT COUNT(*) FROM pg_depend AS pgd, diff --git a/src/test/regress/sql/multi_colocation_utils.sql b/src/test/regress/sql/multi_colocation_utils.sql index 184d21173..ad2182406 100644 --- a/src/test/regress/sql/multi_colocation_utils.sql +++ b/src/test/regress/sql/multi_colocation_utils.sql @@ -241,6 +241,53 @@ CREATE SCHEMA schema_collocation; CREATE TABLE schema_collocation.table4_groupE ( id int ); SELECT create_distributed_table('schema_collocation.table4_groupE', 'id'); +-- test colocate_with option +CREATE TABLE table1_group_none_1 ( id int ); +SELECT create_distributed_table('table1_group_none_1', 'id', colocate_with => 'none'); + +CREATE TABLE table2_group_none_1 ( id int ); +SELECT create_distributed_table('table2_group_none_1', 'id', colocate_with => 'table1_group_none_1'); + +CREATE TABLE table1_group_none_2 ( id int ); +SELECT create_distributed_table('table1_group_none_2', 'id', colocate_with => 'none'); + +CREATE TABLE table4_groupE ( id int ); +SELECT create_distributed_table('table4_groupE', 'id', colocate_with => 'default'); + +SET citus.shard_count = 3; + +-- check that this new configuration does not have a default group +CREATE TABLE table1_group_none_3 ( id int ); +SELECT create_distributed_table('table1_group_none_3', 'id', colocate_with => 'NONE'); + +-- a new table does not use a non-default group +CREATE TABLE table1_group_default ( id int ); +SELECT create_distributed_table('table1_group_default', 'id', colocate_with => 'DEFAULT'); + +-- check metadata +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid; + +SELECT logicalrelid, colocationid FROM pg_dist_partition + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid, logicalrelid; + +-- check failing colocate_with options +CREATE TABLE table_postgresql( id int ); +CREATE TABLE table_failing ( id int ); + +SELECT create_distributed_table('table_failing', 'id', colocate_with => 'table_append'); +SELECT create_distributed_table('table_failing', 'id', 'append', 'table1_groupE'); +SELECT create_distributed_table('table_failing', 'id', colocate_with => 'table_postgresql'); +SELECT create_distributed_table('table_failing', 'id', colocate_with => 'no_table'); +SELECT create_distributed_table('table_failing', 'id', colocate_with => ''); +SELECT create_distributed_table('table_failing', 'id', colocate_with => NULL); + +-- check with different distribution column types +CREATE TABLE table_bigint ( id bigint ); +SELECT create_distributed_table('table_bigint', 'id', colocate_with => 'table1_groupE'); + -- check worker table schemas \c - - - :worker_1_port \d table3_groupE_1300050 @@ -255,8 +302,8 @@ CREATE TABLE table2_groupF ( id int ); SELECT create_reference_table('table2_groupF'); -- check metadata -SELECT * FROM pg_dist_colocation - WHERE colocationid >= 1 AND colocationid < 1000 +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 ORDER BY colocationid; -- cross check with internal colocation API @@ -304,13 +351,13 @@ UPDATE pg_dist_partition SET colocationid = 0 WHERE colocationid >= 1 AND colocationid < 1000; -- check metadata -SELECT * FROM pg_dist_colocation - WHERE colocationid >= 1 AND colocationid < 1000 +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 ORDER BY colocationid; SELECT logicalrelid, colocationid FROM pg_dist_partition - WHERE colocationid >= 1 AND colocationid < 1000 - ORDER BY logicalrelid; + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid, logicalrelid; -- first check failing cases SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupC']); @@ -320,13 +367,13 @@ SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupF']); SELECT mark_tables_colocated('table1_groupB', ARRAY['table2_groupB', 'table1_groupD']); -- check metadata to see failing calls didn't have any side effects -SELECT * FROM pg_dist_colocation - WHERE colocationid >= 1 AND colocationid < 1000 +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 ORDER BY colocationid; SELECT logicalrelid, colocationid FROM pg_dist_partition - WHERE colocationid >= 1 AND colocationid < 1000 - ORDER BY logicalrelid; + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid, logicalrelid; -- check successfully cololated tables SELECT mark_tables_colocated('table1_groupB', ARRAY['table2_groupB']); @@ -338,6 +385,14 @@ SELECT mark_tables_colocated('table1_groupF', ARRAY['table2_groupF']); -- check to colocate with itself SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupB']); +SET citus.shard_count = 2; + +CREATE TABLE table1_group_none ( id int ); +SELECT create_distributed_table('table1_group_none', 'id', colocate_with => 'NONE'); + +CREATE TABLE table2_group_none ( id int ); +SELECT create_distributed_table('table2_group_none', 'id', colocate_with => 'NONE'); + -- check metadata to see colocation groups are created successfully SELECT * FROM pg_dist_colocation WHERE colocationid >= 1 AND colocationid < 1000 @@ -346,3 +401,18 @@ SELECT * FROM pg_dist_colocation SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE colocationid >= 1 AND colocationid < 1000 ORDER BY logicalrelid; + +-- move the all tables in colocation group 5 to colocation group 7 +SELECT mark_tables_colocated('table1_group_none', ARRAY['table1_groupE', 'table2_groupE', 'table3_groupE']); + +-- move a table with a colocation id which is already not in pg_dist_colocation +SELECT mark_tables_colocated('table1_group_none', ARRAY['table2_group_none']); + +-- check metadata to see that unused colocation group is deleted +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid; + +SELECT logicalrelid, colocationid FROM pg_dist_partition + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid, logicalrelid; diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index 209955668..314d17ade 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -62,6 +62,7 @@ ALTER EXTENSION citus UPDATE TO '6.1-1'; ALTER EXTENSION citus UPDATE TO '6.1-2'; ALTER EXTENSION citus UPDATE TO '6.1-3'; ALTER EXTENSION citus UPDATE TO '6.1-4'; +ALTER EXTENSION citus UPDATE TO '6.1-5'; -- ensure no objects were created outside pg_catalog SELECT COUNT(*)