diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index 84801a1c5..0feecd06c 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -9,7 +9,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \ 5.2-1 5.2-2 5.2-3 5.2-4 \ 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \ - 6.1-1 6.1-2 6.1-3 + 6.1-1 6.1-2 6.1-3 6.1-4 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -101,6 +101,8 @@ $(EXTENSION)--6.1-2.sql: $(EXTENSION)--6.1-1.sql $(EXTENSION)--6.1-1--6.1-2.sql cat $^ > $@ $(EXTENSION)--6.1-3.sql: $(EXTENSION)--6.1-2.sql $(EXTENSION)--6.1-2--6.1-3.sql cat $^ > $@ +$(EXTENSION)--6.1-4.sql: $(EXTENSION)--6.1-3.sql $(EXTENSION)--6.1-3--6.1-4.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--6.1-3--6.1-4.sql b/src/backend/distributed/citus--6.1-3--6.1-4.sql new file mode 100644 index 000000000..89d7c741e --- /dev/null +++ b/src/backend/distributed/citus--6.1-3--6.1-4.sql @@ -0,0 +1,76 @@ +/* citus--6.1-3--6.1-4.sql */ + +SET search_path = 'pg_catalog'; + +ALTER TABLE pg_dist_colocation ADD COLUMN defaultgroup BOOLEAN; + +UPDATE pg_dist_colocation SET defaultgroup = TRUE; + +DROP INDEX pg_dist_colocation_configuration_index; + +CREATE INDEX pg_dist_colocation_configuration_index +ON pg_dist_colocation USING btree(shardcount, replicationfactor, distributioncolumntype, defaultgroup); + +DROP FUNCTION create_distributed_table(regclass, text, citus.distribution_type); + +CREATE FUNCTION create_distributed_table(table_name regclass, + distribution_column text, + distribution_type citus.distribution_type DEFAULT 'hash', + colocate_with text DEFAULT 'default') + RETURNS void + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$create_distributed_table$$; +COMMENT ON FUNCTION create_distributed_table(table_name regclass, + distribution_column text, + distribution_type citus.distribution_type, + colocate_with text) + IS 'creates a distributed table'; + +CREATE FUNCTION mark_colocation_group_default(colocation_id integer) + RETURNS void + LANGUAGE plpgsql + SET search_path = pg_catalog + AS $$ +DECLARE + shard_count integer; + replication_factor integer; + distribution_column_type oid; +BEGIN + -- get colocation group configuration + SELECT + shardcount, + replicationfactor, + distributioncolumntype + INTO + shard_count, + replication_factor, + distribution_column_type + FROM + pg_dist_colocation + WHERE + colocationid = colocation_id; + + -- set all defaults to false + UPDATE + pg_dist_colocation + SET + defaultgroup = false + WHERE + shardcount = shard_count AND + replicationfactor = replication_factor AND + distributioncolumntype = distribution_column_type; + + -- set new default colocation group + UPDATE + pg_dist_colocation + SET + defaultgroup = true + WHERE + colocationid = colocation_id; +END; +$$; + +COMMENT ON FUNCTION mark_colocation_group_default(colocation_id integer) + IS 'marks given colocation group as default'; + +RESET search_path; diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index 27e416df9..04ef361d3 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '6.1-3' +default_version = '6.1-4' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index b59add3fe..e99e1c9f3 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -9,7 +9,6 @@ */ #include "postgres.h" -#include "miscadmin.h" #include "access/genam.h" #include "access/hash.h" @@ -33,7 +32,6 @@ #include "catalog/pg_type.h" #include "commands/defrem.h" #include "commands/extension.h" -#include "commands/sequence.h" #include "commands/trigger.h" #include "distributed/colocation_utils.h" #include "distributed/distribution_column.h" @@ -41,7 +39,6 @@ #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" #include "distributed/multi_logical_planner.h" -#include "distributed/pg_dist_colocation.h" #include "distributed/pg_dist_partition.h" #include "executor/spi.h" #include "nodes/execnodes.h" @@ -63,6 +60,7 @@ static void ConvertToDistributedTable(Oid relationId, char *distributionColumnName, char distributionMethod, uint32 colocationId); static char LookupDistributionMethod(Oid distributionMethodOid); +static Oid ColumnType(Oid relationId, char *columnName); static void RecordDistributedRelationDependencies(Oid distributedRelationId, Node *distributionKey); static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId, @@ -76,10 +74,8 @@ static void ErrorIfNotSupportedForeignConstraint(Relation relation, uint32 colocationId); static void InsertIntoPgDistPartition(Oid relationId, char distributionMethod, Var *distributionColumn, uint32 colocationId); -static uint32 ColocationId(int shardCount, int replicationFactor, - Oid distributionColumnType); -static uint32 GetNextColocationId(void); static void CreateHashDistributedTable(Oid relationId, char *distributionColumnName, + char *colocateWithTableName, int shardCount, int replicationFactor); @@ -123,10 +119,29 @@ create_distributed_table(PG_FUNCTION_ARGS) Oid relationId = PG_GETARG_OID(0); text *distributionColumnText = PG_GETARG_TEXT_P(1); Oid distributionMethodOid = PG_GETARG_OID(2); + text *colocateWithTableNameText = PG_GETARG_TEXT_P(3); char *distributionColumnName = text_to_cstring(distributionColumnText); char distributionMethod = LookupDistributionMethod(distributionMethodOid); + /* check if we try to colocate with hash distributed tables */ + char *colocateWithTableName = text_to_cstring(colocateWithTableNameText); + if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0 && + pg_strncasecmp(colocateWithTableName, "none", NAMEDATALEN) != 0) + { + Oid colocateWithTableOid = ResolveRelationId(colocateWithTableNameText); + char colocateWithTableDistributionMethod = PartitionMethod(colocateWithTableOid); + + if (colocateWithTableDistributionMethod != DISTRIBUTE_BY_HASH || + distributionMethod != DISTRIBUTE_BY_HASH) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot distribute relation"), + errdetail("Currently, colocate_with option is only supported " + "for hash distributed tables."))); + } + } + /* if distribution method is not hash, just create partition metadata */ if (distributionMethod != DISTRIBUTE_BY_HASH) { @@ -135,8 +150,9 @@ create_distributed_table(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } - /* use configuration values for shard count and shard replication factor*/ - CreateHashDistributedTable(relationId, distributionColumnName, ShardCount, + /* use configuration values for shard count and shard replication factor */ + CreateHashDistributedTable(relationId, distributionColumnName, + colocateWithTableName, ShardCount, ShardReplicationFactor); PG_RETURN_VOID(); @@ -154,6 +170,7 @@ create_reference_table(PG_FUNCTION_ARGS) Oid relationId = PG_GETARG_OID(0); int shardCount = 1; AttrNumber firstColumnAttrNumber = 1; + char *colocateWithTableName = "default"; char *firstColumnName = get_attname(relationId, firstColumnAttrNumber); if (firstColumnName == NULL) @@ -164,8 +181,8 @@ create_reference_table(PG_FUNCTION_ARGS) "least one column", relationName))); } - CreateHashDistributedTable(relationId, firstColumnName, shardCount, - ShardReplicationFactor); + CreateHashDistributedTable(relationId, firstColumnName, colocateWithTableName, + shardCount, ShardReplicationFactor); PG_RETURN_VOID(); } @@ -725,6 +742,19 @@ LookupDistributionMethod(Oid distributionMethodOid) } +/* + * ColumnType returns the column type of the given column. + */ +static Oid +ColumnType(Oid relationId, char *columnName) +{ + AttrNumber columnIndex = get_attnum(relationId, columnName); + Oid columnType = get_atttype(relationId, columnIndex); + + return columnType; +} + + /* * SupportFunctionForColumn locates a support function given a column, an access method, * and and id of a support function. This function returns InvalidOid if there is no @@ -848,148 +878,23 @@ CreateTruncateTrigger(Oid relationId) } -/* - * ColocationId searches pg_dist_colocation for shard count, replication factor - * and distribution column type. If a matching entry is found, it returns the - * colocation id, otherwise it returns INVALID_COLOCATION_ID. - */ -static uint32 -ColocationId(int shardCount, int replicationFactor, Oid distributionColumnType) -{ - uint32 colocationId = INVALID_COLOCATION_ID; - HeapTuple colocationTuple = NULL; - SysScanDesc scanDescriptor; - const int scanKeyCount = 3; - ScanKeyData scanKey[scanKeyCount]; - bool indexOK = true; - - Relation pgDistColocation = heap_open(DistColocationRelationId(), AccessShareLock); - - /* set scan arguments */ - ScanKeyInit(&scanKey[0], Anum_pg_dist_colocation_shardcount, - BTEqualStrategyNumber, F_INT4EQ, UInt32GetDatum(shardCount)); - ScanKeyInit(&scanKey[1], Anum_pg_dist_colocation_replicationfactor, - BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(replicationFactor)); - ScanKeyInit(&scanKey[2], Anum_pg_dist_colocation_distributioncolumntype, - BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(distributionColumnType)); - - scanDescriptor = systable_beginscan(pgDistColocation, - DistColocationConfigurationIndexId(), - indexOK, NULL, scanKeyCount, scanKey); - - colocationTuple = systable_getnext(scanDescriptor); - if (HeapTupleIsValid(colocationTuple)) - { - Form_pg_dist_colocation colocationForm = - (Form_pg_dist_colocation) GETSTRUCT(colocationTuple); - - colocationId = colocationForm->colocationid; - } - - systable_endscan(scanDescriptor); - heap_close(pgDistColocation, AccessShareLock); - - return colocationId; -} - - -/* - * CreateColocationGroup creates a new colocation id and writes it into - * pg_dist_colocation with the given configuration. It also returns the created - * colocation id. - */ -uint32 -CreateColocationGroup(int shardCount, int replicationFactor, Oid distributionColumnType) -{ - uint32 colocationId = GetNextColocationId(); - Relation pgDistColocation = NULL; - TupleDesc tupleDescriptor = NULL; - HeapTuple heapTuple = NULL; - Datum values[Natts_pg_dist_colocation]; - bool isNulls[Natts_pg_dist_colocation]; - - /* form new colocation tuple */ - memset(values, 0, sizeof(values)); - memset(isNulls, false, sizeof(isNulls)); - - values[Anum_pg_dist_colocation_colocationid - 1] = UInt32GetDatum(colocationId); - values[Anum_pg_dist_colocation_shardcount - 1] = UInt32GetDatum(shardCount); - values[Anum_pg_dist_colocation_replicationfactor - 1] = - UInt32GetDatum(replicationFactor); - values[Anum_pg_dist_colocation_distributioncolumntype - 1] = - ObjectIdGetDatum(distributionColumnType); - - /* open colocation relation and insert the new tuple */ - pgDistColocation = heap_open(DistColocationRelationId(), RowExclusiveLock); - - tupleDescriptor = RelationGetDescr(pgDistColocation); - heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls); - - simple_heap_insert(pgDistColocation, heapTuple); - CatalogUpdateIndexes(pgDistColocation, heapTuple); - - /* increment the counter so that next command can see the row */ - CommandCounterIncrement(); - heap_close(pgDistColocation, RowExclusiveLock); - - return colocationId; -} - - -/* - * GetNextColocationId allocates and returns a unique colocationId for the - * colocation group to be created. This allocation occurs both in shared memory - * and in write ahead logs; writing to logs avoids the risk of having - * colocationId collisions. - * - * Please note that the caller is still responsible for finalizing colocationId - * with the master node. Further note that this function relies on an internal - * sequence created in initdb to generate unique identifiers. - */ -static uint32 -GetNextColocationId() -{ - text *sequenceName = cstring_to_text(COLOCATIONID_SEQUENCE_NAME); - Oid sequenceId = ResolveRelationId(sequenceName); - Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId); - Oid savedUserId = InvalidOid; - int savedSecurityContext = 0; - Datum colocationIdDatum = 0; - uint32 colocationId = INVALID_COLOCATION_ID; - - GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); - SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); - - /* generate new and unique colocation id from sequence */ - colocationIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum); - - SetUserIdAndSecContext(savedUserId, savedSecurityContext); - - colocationId = DatumGetUInt32(colocationIdDatum); - - return colocationId; -} - - /* * CreateHashDistributedTable creates a hash distributed table with given * shard count and shard replication factor. */ static void CreateHashDistributedTable(Oid relationId, char *distributionColumnName, + char *colocateWithTableName, int shardCount, int replicationFactor) { Relation distributedRelation = NULL; Relation pgDistColocation = NULL; - Var *distributionColumn = NULL; - Oid distributionColumnType = 0; uint32 colocationId = INVALID_COLOCATION_ID; + Oid colocationTableId = InvalidOid; + Oid distributionColumnType = InvalidOid; - /* get distribution column type */ + /* get an access lock on the relation to prevent DROP TABLE and ALTER TABLE */ distributedRelation = relation_open(relationId, AccessShareLock); - distributionColumn = BuildDistributionKeyFromColumnName(distributedRelation, - distributionColumnName); - distributionColumnType = distributionColumn->vartype; /* * Get an exclusive lock on the colocation system catalog. Therefore, we @@ -998,35 +903,66 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName, */ pgDistColocation = heap_open(DistColocationRelationId(), ExclusiveLock); - /* check for existing colocations */ - colocationId = ColocationId(shardCount, replicationFactor, distributionColumnType); + /* get distribution column data type */ + distributionColumnType = ColumnType(relationId, distributionColumnName); - /* - * If there is a colocation group for the current configuration, get a - * colocated table from the group and use its shards as a reference to - * create new shards. Otherwise, create a new colocation group and create - * shards with the default round robin algorithm. - */ - if (colocationId != INVALID_COLOCATION_ID) + if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) == 0) { - char *relationName = get_rel_name(relationId); - - Oid colocatedTableId = ColocatedTableId(colocationId); - ConvertToDistributedTable(relationId, distributionColumnName, - DISTRIBUTE_BY_HASH, colocationId); - - CreateColocatedShards(relationId, colocatedTableId); - ereport(DEBUG2, (errmsg("table %s is added to colocation group: %d", - relationName, colocationId))); + /* check for default colocation group */ + colocationId = DefaultColocationGroupId(shardCount, replicationFactor, + distributionColumnType); + if (colocationId == INVALID_COLOCATION_ID) + { + bool defaultColocationGroup = true; + colocationId = CreateColocationGroup(shardCount, replicationFactor, + distributionColumnType, + defaultColocationGroup); + } + else + { + colocationTableId = ColocatedTableId(colocationId); + } + } + else if (pg_strncasecmp(colocateWithTableName, "none", NAMEDATALEN) == 0) + { + bool defaultColocationGroup = false; + colocationId = CreateColocationGroup(shardCount, replicationFactor, + distributionColumnType, + defaultColocationGroup); } else { - colocationId = CreateColocationGroup(shardCount, replicationFactor, - distributionColumnType); - ConvertToDistributedTable(relationId, distributionColumnName, - DISTRIBUTE_BY_HASH, colocationId); + Var *colocationTablePartitionColumn = NULL; + Oid colocationTablePartitionColumnType = InvalidOid; - /* use the default way to create shards */ + /* get colocation group of the target table */ + text *colocateWithTableNameText = cstring_to_text(colocateWithTableName); + colocationTableId = ResolveRelationId(colocateWithTableNameText); + + colocationId = TableColocationId(colocationTableId); + + colocationTablePartitionColumn = PartitionKey(colocationTableId); + colocationTablePartitionColumnType = colocationTablePartitionColumn->vartype; + + if (colocationTablePartitionColumnType != distributionColumnType) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot colocate with %s", colocateWithTableName), + errdetail("Distribution column types are different."))); + } + } + + /* create distributed table metadata */ + ConvertToDistributedTable(relationId, distributionColumnName, DISTRIBUTE_BY_HASH, + colocationId); + + /* create shards */ + if (colocationTableId != InvalidOid) + { + CreateColocatedShards(relationId, colocationTableId); + } + else + { CreateShardsWithRoundRobinPolicy(relationId, shardCount, replicationFactor); } diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c index 432fedd27..2949e1ee6 100644 --- a/src/backend/distributed/utils/colocation_utils.c +++ b/src/backend/distributed/utils/colocation_utils.c @@ -10,20 +10,24 @@ */ #include "postgres.h" +#include "miscadmin.h" #include "access/genam.h" #include "access/heapam.h" #include "access/htup_details.h" #include "access/xact.h" #include "catalog/indexing.h" +#include "commands/sequence.h" #include "distributed/colocation_utils.h" #include "distributed/listutils.h" #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" #include "distributed/multi_logical_planner.h" +#include "distributed/pg_dist_colocation.h" #include "distributed/resource_lock.h" #include "distributed/shardinterval_utils.h" #include "distributed/worker_protocol.h" +#include "utils/builtins.h" #include "utils/fmgroids.h" #include "utils/lsyscache.h" #include "utils/rel.h" @@ -36,7 +40,10 @@ static bool ShardsIntervalsEqual(ShardInterval *leftShardInterval, ShardInterval *rightShardInterval); static int CompareShardPlacementsByNode(const void *leftElement, const void *rightElement); +static uint32 GetNextColocationId(void); static void UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId); +static List * ColocationGroupTableList(Oid colocationId); +static void DeleteColocationGroup(uint32 colocationId); /* exports for SQL callable functions */ @@ -86,11 +93,13 @@ static void MarkTablesColocated(Oid sourceRelationId, Oid targetRelationId) { uint32 sourceColocationId = INVALID_COLOCATION_ID; + uint32 targetColocationId = INVALID_COLOCATION_ID; Relation pgDistColocation = NULL; Var *sourceDistributionColumn = NULL; Var *targetDistributionColumn = NULL; Oid sourceDistributionColumnType = InvalidOid; Oid targetDistributionColumnType = InvalidOid; + bool defaultColocationGroup = false; CheckHashPartitionedTable(sourceRelationId); CheckHashPartitionedTable(targetRelationId); @@ -132,15 +141,39 @@ MarkTablesColocated(Oid sourceRelationId, Oid targetRelationId) { uint32 shardCount = ShardIntervalCount(sourceRelationId); uint32 shardReplicationFactor = TableShardReplicationFactor(sourceRelationId); + uint32 defaultColocationId = INVALID_COLOCATION_ID; + + /* check if there is a default colocation group */ + defaultColocationId = DefaultColocationGroupId(shardCount, shardReplicationFactor, + sourceDistributionColumnType); + if (defaultColocationId == INVALID_COLOCATION_ID) + { + defaultColocationGroup = true; + } sourceColocationId = CreateColocationGroup(shardCount, shardReplicationFactor, - sourceDistributionColumnType); + sourceDistributionColumnType, + defaultColocationGroup); UpdateRelationColocationGroup(sourceRelationId, sourceColocationId); } + targetColocationId = TableColocationId(targetRelationId); + /* finally set colocation group for the target relation */ UpdateRelationColocationGroup(targetRelationId, sourceColocationId); + /* if there is not any remaining table in the colocation group, delete it */ + if (targetColocationId != INVALID_COLOCATION_ID) + { + List *colocatedTableList = ColocationGroupTableList(targetColocationId); + int colocatedTableCount = list_length(colocatedTableList); + + if (colocatedTableCount == 0) + { + DeleteColocationGroup(targetColocationId); + } + } + heap_close(pgDistColocation, NoLock); } @@ -339,6 +372,150 @@ CompareShardPlacementsByNode(const void *leftElement, const void *rightElement) } +/* + * TableColocationId function returns co-location id of given table. This function + * errors out if given table is not distributed. + */ +uint32 +TableColocationId(Oid distributedTableId) +{ + DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId); + + return cacheEntry->colocationId; +} + + +/* + * DefaultColocationId searches pg_dist_colocation for the default colocation group + * with the given configuration: shard count, replication factor and distribution + * column type. If a matching entry is found, it returns the colocation id, + * otherwise it returns INVALID_COLOCATION_ID. + */ +uint32 +DefaultColocationGroupId(int shardCount, int replicationFactor, + Oid distributionColumnType) +{ + uint32 colocationId = INVALID_COLOCATION_ID; + HeapTuple colocationTuple = NULL; + SysScanDesc scanDescriptor; + const int scanKeyCount = 4; + ScanKeyData scanKey[scanKeyCount]; + bool indexOK = true; + bool defaultColocationGroup = true; + + Relation pgDistColocation = heap_open(DistColocationRelationId(), AccessShareLock); + + /* set scan arguments */ + ScanKeyInit(&scanKey[0], Anum_pg_dist_colocation_shardcount, + BTEqualStrategyNumber, F_INT4EQ, UInt32GetDatum(shardCount)); + ScanKeyInit(&scanKey[1], Anum_pg_dist_colocation_replicationfactor, + BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(replicationFactor)); + ScanKeyInit(&scanKey[2], Anum_pg_dist_colocation_distributioncolumntype, + BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(distributionColumnType)); + ScanKeyInit(&scanKey[3], Anum_pg_dist_colocation_defaultgroup, + BTEqualStrategyNumber, F_BOOLEQ, BoolGetDatum(defaultColocationGroup)); + + scanDescriptor = systable_beginscan(pgDistColocation, + DistColocationConfigurationIndexId(), + indexOK, NULL, scanKeyCount, scanKey); + + colocationTuple = systable_getnext(scanDescriptor); + if (HeapTupleIsValid(colocationTuple)) + { + Form_pg_dist_colocation colocationForm = + (Form_pg_dist_colocation) GETSTRUCT(colocationTuple); + + colocationId = colocationForm->colocationid; + } + + systable_endscan(scanDescriptor); + heap_close(pgDistColocation, AccessShareLock); + + return colocationId; +} + + +/* + * CreateColocationGroup creates a new colocation id and writes it into + * pg_dist_colocation with the given configuration. It also returns the created + * colocation id. + */ +uint32 +CreateColocationGroup(int shardCount, int replicationFactor, Oid distributionColumnType, + bool defaultColocationGroup) +{ + uint32 colocationId = GetNextColocationId(); + Relation pgDistColocation = NULL; + TupleDesc tupleDescriptor = NULL; + HeapTuple heapTuple = NULL; + Datum values[Natts_pg_dist_colocation]; + bool isNulls[Natts_pg_dist_colocation]; + + /* form new colocation tuple */ + memset(values, 0, sizeof(values)); + memset(isNulls, false, sizeof(isNulls)); + + values[Anum_pg_dist_colocation_colocationid - 1] = UInt32GetDatum(colocationId); + values[Anum_pg_dist_colocation_shardcount - 1] = UInt32GetDatum(shardCount); + values[Anum_pg_dist_colocation_replicationfactor - 1] = + UInt32GetDatum(replicationFactor); + values[Anum_pg_dist_colocation_distributioncolumntype - 1] = + ObjectIdGetDatum(distributionColumnType); + values[Anum_pg_dist_colocation_defaultgroup - 1] = + BoolGetDatum(defaultColocationGroup); + + /* open colocation relation and insert the new tuple */ + pgDistColocation = heap_open(DistColocationRelationId(), RowExclusiveLock); + + tupleDescriptor = RelationGetDescr(pgDistColocation); + heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls); + + simple_heap_insert(pgDistColocation, heapTuple); + CatalogUpdateIndexes(pgDistColocation, heapTuple); + + /* increment the counter so that next command can see the row */ + CommandCounterIncrement(); + heap_close(pgDistColocation, RowExclusiveLock); + + return colocationId; +} + + +/* + * GetNextColocationId allocates and returns a unique colocationId for the + * colocation group to be created. This allocation occurs both in shared memory + * and in write ahead logs; writing to logs avoids the risk of having + * colocationId collisions. + * + * Please note that the caller is still responsible for finalizing colocationId + * with the master node. Further note that this function relies on an internal + * sequence created in initdb to generate unique identifiers. + */ +static uint32 +GetNextColocationId(void) +{ + text *sequenceName = cstring_to_text(COLOCATIONID_SEQUENCE_NAME); + Oid sequenceId = ResolveRelationId(sequenceName); + Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId); + Oid savedUserId = InvalidOid; + int savedSecurityContext = 0; + Datum colocationIdDatum = 0; + uint32 colocationId = INVALID_COLOCATION_ID; + + GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); + SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); + + /* generate new and unique colocation id from sequence */ + colocationIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum); + + SetUserIdAndSecContext(savedUserId, savedSecurityContext); + + colocationId = DatumGetUInt32(colocationIdDatum); + + return colocationId; +} + + /* * UpdateRelationColocationGroup updates colocation group in pg_dist_partition * for the given relation. @@ -396,19 +573,6 @@ UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId) } -/* - * TableColocationId function returns co-location id of given table. This function - * errors out if given table is not distributed. - */ -uint32 -TableColocationId(Oid distributedTableId) -{ - DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId); - - return cacheEntry->colocationId; -} - - /* * TablesColocated function checks whether given two tables are co-located and * returns true if they are co-located. A table is always co-located with itself. @@ -475,6 +639,30 @@ ColocatedTableList(Oid distributedTableId) uint32 tableColocationId = TableColocationId(distributedTableId); List *colocatedTableList = NIL; + /* + * If distribution type of the table is not hash, the table is only co-located + * with itself. + */ + if (tableColocationId == INVALID_COLOCATION_ID) + { + colocatedTableList = lappend_oid(colocatedTableList, distributedTableId); + return colocatedTableList; + } + + colocatedTableList = ColocationGroupTableList(tableColocationId); + + return colocatedTableList; +} + + +/* + * ColocationGroupTableList returns the list of tables in the given colocation + * group. If the colocation group is INVALID_COLOCATION_ID, it returns NIL. + */ +static List * +ColocationGroupTableList(Oid colocationId) +{ + List *colocatedTableList = NIL; Relation pgDistPartition = NULL; TupleDesc tupleDescriptor = NULL; SysScanDesc scanDescriptor = NULL; @@ -487,14 +675,13 @@ ColocatedTableList(Oid distributedTableId) * If distribution type of the table is not hash, the table is only co-located * with itself. */ - if (tableColocationId == INVALID_COLOCATION_ID) + if (colocationId == INVALID_COLOCATION_ID) { - colocatedTableList = lappend_oid(colocatedTableList, distributedTableId); - return colocatedTableList; + return NIL; } ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_colocationid, - BTEqualStrategyNumber, F_INT4EQ, ObjectIdGetDatum(tableColocationId)); + BTEqualStrategyNumber, F_INT4EQ, ObjectIdGetDatum(colocationId)); pgDistPartition = heap_open(DistPartitionRelationId(), AccessShareLock); tupleDescriptor = RelationGetDescr(pgDistPartition); @@ -633,3 +820,44 @@ ColocatedShardIdInRelation(Oid relationId, int shardIndex) return tableCacheEntry->sortedShardIntervalArray[shardIndex]->shardId; } + + +/* + * DeleteColocationGroup deletes the colocation group from pg_dist_colocation. + */ +static void +DeleteColocationGroup(uint32 colocationId) +{ + Relation pgDistColocation = NULL; + SysScanDesc scanDescriptor = NULL; + int scanKeyCount = 1; + ScanKeyData scanKey[scanKeyCount]; + bool indexOK = false; + HeapTuple heapTuple = NULL; + + pgDistColocation = heap_open(DistColocationRelationId(), RowExclusiveLock); + + ScanKeyInit(&scanKey[0], Anum_pg_dist_colocation_colocationid, + BTEqualStrategyNumber, F_INT4EQ, UInt32GetDatum(colocationId)); + + scanDescriptor = systable_beginscan(pgDistColocation, InvalidOid, indexOK, + NULL, scanKeyCount, scanKey); + + heapTuple = systable_getnext(scanDescriptor); + if (!HeapTupleIsValid(heapTuple)) + { + ereport(ERROR, (errmsg("could not find valid entry for colocation group %d", + colocationId))); + } + + simple_heap_delete(pgDistColocation, &(heapTuple->t_self)); + + CatalogUpdateIndexes(pgDistColocation, heapTuple); + CitusInvalidateRelcacheByRelid(DistColocationRelationId()); + + systable_endscan(scanDescriptor); + heap_close(pgDistColocation, RowExclusiveLock); + + /* increment the counter so that next command can see the row */ + CommandCounterIncrement(); +} diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index b48c62fa8..063a6f4fc 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -260,8 +260,9 @@ DistributedTableCacheEntry(Oid distributedRelationId) } else { - ereport(ERROR, (errmsg("relation %u is not distributed", - distributedRelationId))); + char *relationName = get_rel_name(distributedRelationId); + ereport(ERROR, (errmsg("relation %s is not distributed", + relationName))); } } diff --git a/src/include/distributed/colocation_utils.h b/src/include/distributed/colocation_utils.h index 09de91ee3..de81b2d3b 100644 --- a/src/include/distributed/colocation_utils.h +++ b/src/include/distributed/colocation_utils.h @@ -18,6 +18,11 @@ #define INVALID_COLOCATION_ID 0 extern uint32 TableColocationId(Oid distributedTableId); +extern uint32 DefaultColocationGroupId(int shardCount, int replicationFactor, + Oid distributionColumnType); +extern uint32 CreateColocationGroup(int shardCount, int replicationFactor, + Oid distributionColumnType, + bool defaultColocationGroup); extern bool TablesColocated(Oid leftDistributedTableId, Oid rightDistributedTableId); extern bool ShardsColocated(ShardInterval *leftShardInterval, ShardInterval *rightShardInterval); @@ -25,8 +30,6 @@ extern List * ColocatedTableList(Oid distributedTableId); extern List * ColocatedShardIntervalList(ShardInterval *shardInterval); extern Oid ColocatedTableId(Oid colocationId); extern uint64 ColocatedShardIdInRelation(Oid relationId, int shardIndex); -extern uint32 CreateColocationGroup(int shardCount, int replicationFactor, - Oid distributionColumnType); #endif /* COLOCATION_UTILS_H_ */ diff --git a/src/include/distributed/pg_dist_colocation.h b/src/include/distributed/pg_dist_colocation.h index 985c1332d..8ef60fe8a 100644 --- a/src/include/distributed/pg_dist_colocation.h +++ b/src/include/distributed/pg_dist_colocation.h @@ -22,6 +22,7 @@ typedef struct FormData_pg_dist_colocation uint32 shardcount; uint32 replicationfactor; Oid distributioncolumntype; + bool defaultgroup; } FormData_pg_dist_colocation; /* ---------------- @@ -35,11 +36,12 @@ typedef FormData_pg_dist_colocation *Form_pg_dist_colocation; * compiler constants for pg_dist_colocation * ---------------- */ -#define Natts_pg_dist_colocation 4 +#define Natts_pg_dist_colocation 5 #define Anum_pg_dist_colocation_colocationid 1 #define Anum_pg_dist_colocation_shardcount 2 #define Anum_pg_dist_colocation_replicationfactor 3 #define Anum_pg_dist_colocation_distributioncolumntype 4 +#define Anum_pg_dist_colocation_defaultgroup 5 #define COLOCATIONID_SEQUENCE_NAME "pg_dist_colocationid_seq" diff --git a/src/test/regress/expected/multi_colocation_utils.out b/src/test/regress/expected/multi_colocation_utils.out index 7bd235aeb..8896cfd27 100644 --- a/src/test/regress/expected/multi_colocation_utils.out +++ b/src/test/regress/expected/multi_colocation_utils.out @@ -432,12 +432,12 @@ NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined SELECT * FROM pg_dist_colocation WHERE colocationid >= 1 AND colocationid < 1000 ORDER BY colocationid; - colocationid | shardcount | replicationfactor | distributioncolumntype ---------------+------------+-------------------+------------------------ - 1 | 2 | 2 | 23 - 2 | 2 | 1 | 23 - 3 | 2 | 2 | 25 - 4 | 4 | 2 | 23 + colocationid | shardcount | replicationfactor | distributioncolumntype | defaultgroup +--------------+------------+-------------------+------------------------+-------------- + 1 | 2 | 2 | 23 | t + 2 | 2 | 1 | 23 | t + 3 | 2 | 2 | 25 | t + 4 | 4 | 2 | 23 | t (4 rows) SELECT logicalrelid, colocationid FROM pg_dist_partition @@ -459,16 +459,16 @@ SELECT logicalrelid, colocationid FROM pg_dist_partition -- check effects of dropping tables DROP TABLE table1_groupA; SELECT * FROM pg_dist_colocation WHERE colocationid = 1; - colocationid | shardcount | replicationfactor | distributioncolumntype ---------------+------------+-------------------+------------------------ - 1 | 2 | 2 | 23 + colocationid | shardcount | replicationfactor | distributioncolumntype | defaultgroup +--------------+------------+-------------------+------------------------+-------------- + 1 | 2 | 2 | 23 | t (1 row) -- dropping all tables in a colocation group also deletes the colocation group DROP TABLE table2_groupA; SELECT * FROM pg_dist_colocation WHERE colocationid = 1; - colocationid | shardcount | replicationfactor | distributioncolumntype ---------------+------------+-------------------+------------------------ + colocationid | shardcount | replicationfactor | distributioncolumntype | defaultgroup +--------------+------------+-------------------+------------------------+-------------- (0 rows) -- create dropped colocation group again @@ -504,6 +504,118 @@ SELECT create_distributed_table('schema_collocation.table4_groupE', 'id'); (1 row) +-- test colocate_with option +CREATE TABLE table1_group_none_1 ( id int ); +SELECT create_distributed_table('table1_group_none_1', 'id', colocate_with => 'none'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE table2_group_none_1 ( id int ); +SELECT create_distributed_table('table2_group_none_1', 'id', colocate_with => 'table1_group_none_1'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE table1_group_none_2 ( id int ); +SELECT create_distributed_table('table1_group_none_2', 'id', colocate_with => 'none'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE table4_groupE ( id int ); +SELECT create_distributed_table('table4_groupE', 'id', colocate_with => 'default'); + create_distributed_table +-------------------------- + +(1 row) + +SET citus.shard_count = 3; +-- check that this new configuration does not have a default group +CREATE TABLE table1_group_none_3 ( id int ); +SELECT create_distributed_table('table1_group_none_3', 'id', colocate_with => 'NONE'); + create_distributed_table +-------------------------- + +(1 row) + +-- a new table does not use a non-default group +CREATE TABLE table1_group_default ( id int ); +SELECT create_distributed_table('table1_group_default', 'id', colocate_with => 'DEFAULT'); + create_distributed_table +-------------------------- + +(1 row) + +-- check metadata +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid; + colocationid | shardcount | replicationfactor | distributioncolumntype | defaultgroup +--------------+------------+-------------------+------------------------+-------------- + 2 | 2 | 1 | 23 | t + 3 | 2 | 2 | 25 | t + 4 | 4 | 2 | 23 | t + 5 | 2 | 2 | 23 | t + 6 | 2 | 2 | 23 | f + 7 | 2 | 2 | 23 | f + 8 | 3 | 2 | 23 | f + 9 | 3 | 2 | 23 | t +(8 rows) + +SELECT logicalrelid, colocationid FROM pg_dist_partition + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid, logicalrelid; + logicalrelid | colocationid +----------------------------------+-------------- + table1_groupb | 2 + table2_groupb | 2 + table1_groupc | 3 + table2_groupc | 3 + table1_groupd | 4 + table2_groupd | 4 + table3_groupd | 4 + table1_groupe | 5 + table2_groupe | 5 + table3_groupe | 5 + schema_collocation.table4_groupe | 5 + table4_groupe | 5 + table1_group_none_1 | 6 + table2_group_none_1 | 6 + table1_group_none_2 | 7 + table1_group_none_3 | 8 + table1_group_default | 9 +(17 rows) + +-- check failing colocate_with options +CREATE TABLE table_postgresql( id int ); +CREATE TABLE table_failing ( id int ); +SELECT create_distributed_table('table_failing', 'id', colocate_with => 'table_append'); +ERROR: cannot distribute relation +DETAIL: Currently, colocate_with option is only supported for hash distributed tables. +SELECT create_distributed_table('table_failing', 'id', 'append', 'table1_groupE'); +ERROR: cannot distribute relation +DETAIL: Currently, colocate_with option is only supported for hash distributed tables. +SELECT create_distributed_table('table_failing', 'id', colocate_with => 'table_postgresql'); +ERROR: relation table_postgresql is not distributed +SELECT create_distributed_table('table_failing', 'id', colocate_with => 'no_table'); +ERROR: relation "no_table" does not exist +SELECT create_distributed_table('table_failing', 'id', colocate_with => ''); +ERROR: invalid name syntax +SELECT create_distributed_table('table_failing', 'id', colocate_with => NULL); + create_distributed_table +-------------------------- + +(1 row) + +-- check with different distribution column types +CREATE TABLE table_bigint ( id bigint ); +SELECT create_distributed_table('table_bigint', 'id', colocate_with => 'table1_groupE'); +ERROR: cannot colocate with table1_groupE +DETAIL: Distribution column types are different. -- check worker table schemas \c - - - :worker_1_port \d table3_groupE_1300050 @@ -535,17 +647,77 @@ SELECT create_reference_table('table2_groupF'); (1 row) -- check metadata -SELECT * FROM pg_dist_colocation - WHERE colocationid >= 1 AND colocationid < 1000 +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 ORDER BY colocationid; - colocationid | shardcount | replicationfactor | distributioncolumntype ---------------+------------+-------------------+------------------------ - 2 | 2 | 1 | 23 - 3 | 2 | 2 | 25 - 4 | 4 | 2 | 23 - 5 | 2 | 2 | 23 - 6 | 1 | 2 | 23 -(5 rows) + colocationid | shardcount | replicationfactor | distributioncolumntype | defaultgroup +--------------+------------+-------------------+------------------------+-------------- + 2 | 2 | 1 | 23 | t + 3 | 2 | 2 | 25 | t + 4 | 4 | 2 | 23 | t + 5 | 2 | 2 | 23 | t + 6 | 2 | 2 | 23 | f + 7 | 2 | 2 | 23 | f + 8 | 3 | 2 | 23 | f + 9 | 3 | 2 | 23 | t + 10 | 1 | 2 | 23 | t +(9 rows) + +-- test mark_colocation_group_default() +SELECT mark_colocation_group_default(7); + mark_colocation_group_default +------------------------------- + +(1 row) + +SELECT mark_colocation_group_default(8); + mark_colocation_group_default +------------------------------- + +(1 row) + +-- check metadata after mark_colocation_group_default() is run +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid; + colocationid | shardcount | replicationfactor | distributioncolumntype | defaultgroup +--------------+------------+-------------------+------------------------+-------------- + 2 | 2 | 1 | 23 | t + 3 | 2 | 2 | 25 | t + 4 | 4 | 2 | 23 | t + 5 | 2 | 2 | 23 | f + 6 | 2 | 2 | 23 | f + 7 | 2 | 2 | 23 | t + 8 | 3 | 2 | 23 | t + 9 | 3 | 2 | 23 | f + 10 | 1 | 2 | 23 | t +(9 rows) + +SELECT logicalrelid, colocationid FROM pg_dist_partition + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid, logicalrelid; + logicalrelid | colocationid +----------------------------------+-------------- + table1_groupb | 2 + table2_groupb | 2 + table1_groupc | 3 + table2_groupc | 3 + table1_groupd | 4 + table2_groupd | 4 + table3_groupd | 4 + table1_groupe | 5 + table2_groupe | 5 + table3_groupe | 5 + schema_collocation.table4_groupe | 5 + table4_groupe | 5 + table1_group_none_1 | 6 + table2_group_none_1 | 6 + table1_group_none_2 | 7 + table1_group_none_3 | 8 + table1_group_default | 9 + table1_groupf | 10 + table2_groupf | 10 +(19 rows) -- cross check with internal colocation API SELECT @@ -563,22 +735,27 @@ WHERE ORDER BY table1, table2; - table1 | table2 | colocated ----------------+----------------------------------+----------- - table1_group1 | table2_group1 | t - table1_groupb | table2_groupb | t - table1_groupc | table2_groupc | t - table1_groupd | table2_groupd | t - table1_groupd | table3_groupd | t - table2_groupd | table3_groupd | t - table1_groupe | table2_groupe | t - table1_groupe | table3_groupe | t - table1_groupe | schema_collocation.table4_groupe | t - table2_groupe | table3_groupe | t - table2_groupe | schema_collocation.table4_groupe | t - table3_groupe | schema_collocation.table4_groupe | t - table1_groupf | table2_groupf | t -(13 rows) + table1 | table2 | colocated +----------------------------------+----------------------------------+----------- + table1_group1 | table2_group1 | t + table1_groupb | table2_groupb | t + table1_groupc | table2_groupc | t + table1_groupd | table2_groupd | t + table1_groupd | table3_groupd | t + table2_groupd | table3_groupd | t + table1_groupe | table2_groupe | t + table1_groupe | table3_groupe | t + table1_groupe | schema_collocation.table4_groupe | t + table1_groupe | table4_groupe | t + table2_groupe | table3_groupe | t + table2_groupe | schema_collocation.table4_groupe | t + table2_groupe | table4_groupe | t + table3_groupe | schema_collocation.table4_groupe | t + table3_groupe | table4_groupe | t + schema_collocation.table4_groupe | table4_groupe | t + table1_group_none_1 | table2_group_none_1 | t + table1_groupf | table2_groupf | t +(18 rows) -- check created shards SELECT @@ -653,11 +830,39 @@ ORDER BY schema_collocation.table4_groupe | 1300052 | t | 57637 | -2147483648 | -1 schema_collocation.table4_groupe | 1300053 | t | 57637 | 0 | 2147483647 schema_collocation.table4_groupe | 1300053 | t | 57638 | 0 | 2147483647 - table1_groupf | 1300054 | t | 57637 | -2147483648 | 2147483647 - table1_groupf | 1300054 | t | 57638 | -2147483648 | 2147483647 - table2_groupf | 1300055 | t | 57638 | -2147483648 | 2147483647 - table2_groupf | 1300055 | t | 57637 | -2147483648 | 2147483647 -(56 rows) + table1_group_none_1 | 1300054 | t | 57637 | -2147483648 | -1 + table1_group_none_1 | 1300054 | t | 57638 | -2147483648 | -1 + table1_group_none_1 | 1300055 | t | 57638 | 0 | 2147483647 + table1_group_none_1 | 1300055 | t | 57637 | 0 | 2147483647 + table2_group_none_1 | 1300056 | t | 57638 | -2147483648 | -1 + table2_group_none_1 | 1300056 | t | 57637 | -2147483648 | -1 + table2_group_none_1 | 1300057 | t | 57637 | 0 | 2147483647 + table2_group_none_1 | 1300057 | t | 57638 | 0 | 2147483647 + table1_group_none_2 | 1300058 | t | 57637 | -2147483648 | -1 + table1_group_none_2 | 1300058 | t | 57638 | -2147483648 | -1 + table1_group_none_2 | 1300059 | t | 57638 | 0 | 2147483647 + table1_group_none_2 | 1300059 | t | 57637 | 0 | 2147483647 + table4_groupe | 1300060 | t | 57637 | -2147483648 | -1 + table4_groupe | 1300060 | t | 57638 | -2147483648 | -1 + table4_groupe | 1300061 | t | 57638 | 0 | 2147483647 + table4_groupe | 1300061 | t | 57637 | 0 | 2147483647 + table1_group_none_3 | 1300062 | t | 57637 | -2147483648 | -715827884 + table1_group_none_3 | 1300062 | t | 57638 | -2147483648 | -715827884 + table1_group_none_3 | 1300063 | t | 57638 | -715827883 | 715827881 + table1_group_none_3 | 1300063 | t | 57637 | -715827883 | 715827881 + table1_group_none_3 | 1300064 | t | 57637 | 715827882 | 2147483647 + table1_group_none_3 | 1300064 | t | 57638 | 715827882 | 2147483647 + table1_group_default | 1300065 | t | 57637 | -2147483648 | -715827884 + table1_group_default | 1300065 | t | 57638 | -2147483648 | -715827884 + table1_group_default | 1300066 | t | 57638 | -715827883 | 715827881 + table1_group_default | 1300066 | t | 57637 | -715827883 | 715827881 + table1_group_default | 1300067 | t | 57637 | 715827882 | 2147483647 + table1_group_default | 1300067 | t | 57638 | 715827882 | 2147483647 + table1_groupf | 1300068 | t | 57637 | -2147483648 | 2147483647 + table1_groupf | 1300068 | t | 57638 | -2147483648 | 2147483647 + table2_groupf | 1300069 | t | 57638 | -2147483648 | 2147483647 + table2_groupf | 1300069 | t | 57637 | -2147483648 | 2147483647 +(84 rows) -- reset colocation ids to test mark_tables_colocated ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1; @@ -666,16 +871,16 @@ DELETE FROM pg_dist_colocation UPDATE pg_dist_partition SET colocationid = 0 WHERE colocationid >= 1 AND colocationid < 1000; -- check metadata -SELECT * FROM pg_dist_colocation - WHERE colocationid >= 1 AND colocationid < 1000 +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 ORDER BY colocationid; - colocationid | shardcount | replicationfactor | distributioncolumntype ---------------+------------+-------------------+------------------------ + colocationid | shardcount | replicationfactor | distributioncolumntype | defaultgroup +--------------+------------+-------------------+------------------------+-------------- (0 rows) SELECT logicalrelid, colocationid FROM pg_dist_partition - WHERE colocationid >= 1 AND colocationid < 1000 - ORDER BY logicalrelid; + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid, logicalrelid; logicalrelid | colocationid --------------+-------------- (0 rows) @@ -697,16 +902,16 @@ SELECT mark_tables_colocated('table1_groupB', ARRAY['table2_groupB', 'table1_gro ERROR: cannot colocate tables table1_groupb and table1_groupd DETAIL: Shard counts don't match for table1_groupb and table1_groupd. -- check metadata to see failing calls didn't have any side effects -SELECT * FROM pg_dist_colocation - WHERE colocationid >= 1 AND colocationid < 1000 +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 ORDER BY colocationid; - colocationid | shardcount | replicationfactor | distributioncolumntype ---------------+------------+-------------------+------------------------ + colocationid | shardcount | replicationfactor | distributioncolumntype | defaultgroup +--------------+------------+-------------------+------------------------+-------------- (0 rows) SELECT logicalrelid, colocationid FROM pg_dist_partition - WHERE colocationid >= 1 AND colocationid < 1000 - ORDER BY logicalrelid; + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid, logicalrelid; logicalrelid | colocationid --------------+-------------- (0 rows) @@ -749,18 +954,27 @@ SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupB']); (1 row) +SET citus.shard_count = 2; +CREATE TABLE table5_groupE ( id int ); +SELECT create_distributed_table('table5_groupE', 'id', colocate_with => 'NONE'); + create_distributed_table +-------------------------- + +(1 row) + -- check metadata to see colocation groups are created successfully SELECT * FROM pg_dist_colocation WHERE colocationid >= 1 AND colocationid < 1000 ORDER BY colocationid; - colocationid | shardcount | replicationfactor | distributioncolumntype ---------------+------------+-------------------+------------------------ - 2 | 2 | 1 | 23 - 3 | 2 | 2 | 25 - 4 | 4 | 2 | 23 - 5 | 2 | 2 | 23 - 6 | 1 | 2 | 23 -(5 rows) + colocationid | shardcount | replicationfactor | distributioncolumntype | defaultgroup +--------------+------------+-------------------+------------------------+-------------- + 2 | 2 | 1 | 23 | t + 3 | 2 | 2 | 25 | t + 4 | 4 | 2 | 23 | t + 5 | 2 | 2 | 23 | t + 6 | 1 | 2 | 23 | t + 7 | 2 | 2 | 23 | f +(6 rows) SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE colocationid >= 1 AND colocationid < 1000 @@ -778,5 +992,45 @@ SELECT logicalrelid, colocationid FROM pg_dist_partition table3_groupe | 5 table1_groupf | 6 table2_groupf | 6 -(11 rows) + table5_groupe | 7 +(12 rows) + +-- move the only table in colocation group 7 to colocation group 5 +SELECT mark_tables_colocated('table1_groupE', ARRAY['table5_groupE']); + mark_tables_colocated +----------------------- + +(1 row) + +-- check metadata to see that unused colocation group is deleted +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid; + colocationid | shardcount | replicationfactor | distributioncolumntype | defaultgroup +--------------+------------+-------------------+------------------------+-------------- + 2 | 2 | 1 | 23 | t + 3 | 2 | 2 | 25 | t + 4 | 4 | 2 | 23 | t + 5 | 2 | 2 | 23 | t + 6 | 1 | 2 | 23 | t +(5 rows) + +SELECT logicalrelid, colocationid FROM pg_dist_partition + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid, logicalrelid; + logicalrelid | colocationid +---------------+-------------- + table1_groupb | 2 + table2_groupb | 2 + table1_groupc | 3 + table2_groupc | 3 + table1_groupd | 4 + table2_groupd | 4 + table1_groupe | 5 + table2_groupe | 5 + table3_groupe | 5 + table5_groupe | 5 + table1_groupf | 6 + table2_groupf | 6 +(12 rows) diff --git a/src/test/regress/expected/multi_distribution_metadata.out b/src/test/regress/expected/multi_distribution_metadata.out index 31ada84da..0e664db86 100644 --- a/src/test/regress/expected/multi_distribution_metadata.out +++ b/src/test/regress/expected/multi_distribution_metadata.out @@ -156,7 +156,7 @@ SELECT partition_type('events_hash'); (1 row) SELECT partition_type('pg_type'); -ERROR: relation 1247 is not distributed +ERROR: relation pg_type is not distributed -- should see true for events_hash, false for others SELECT is_distributed_table('events_hash'); is_distributed_table diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 237a6808d..ae79c879c 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -61,6 +61,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-18'; ALTER EXTENSION citus UPDATE TO '6.1-1'; ALTER EXTENSION citus UPDATE TO '6.1-2'; ALTER EXTENSION citus UPDATE TO '6.1-3'; +ALTER EXTENSION citus UPDATE TO '6.1-4'; -- ensure no objects were created outside pg_catalog SELECT COUNT(*) FROM pg_depend AS pgd, diff --git a/src/test/regress/expected/multi_metadata_snapshot.out b/src/test/regress/expected/multi_metadata_snapshot.out index f3ef9c6aa..ea018492c 100644 --- a/src/test/regress/expected/multi_metadata_snapshot.out +++ b/src/test/regress/expected/multi_metadata_snapshot.out @@ -238,8 +238,8 @@ Indexes: -- Check that pg_dist_colocation is not synced SELECT * FROM pg_dist_colocation ORDER BY colocationid; - colocationid | shardcount | replicationfactor | distributioncolumntype ---------------+------------+-------------------+------------------------ + colocationid | shardcount | replicationfactor | distributioncolumntype | defaultgroup +--------------+------------+-------------------+------------------------+-------------- (0 rows) -- Make sure that truncate trigger has been set for the MX table on worker diff --git a/src/test/regress/sql/multi_colocation_utils.sql b/src/test/regress/sql/multi_colocation_utils.sql index 184d21173..7e5a7992f 100644 --- a/src/test/regress/sql/multi_colocation_utils.sql +++ b/src/test/regress/sql/multi_colocation_utils.sql @@ -241,6 +241,53 @@ CREATE SCHEMA schema_collocation; CREATE TABLE schema_collocation.table4_groupE ( id int ); SELECT create_distributed_table('schema_collocation.table4_groupE', 'id'); +-- test colocate_with option +CREATE TABLE table1_group_none_1 ( id int ); +SELECT create_distributed_table('table1_group_none_1', 'id', colocate_with => 'none'); + +CREATE TABLE table2_group_none_1 ( id int ); +SELECT create_distributed_table('table2_group_none_1', 'id', colocate_with => 'table1_group_none_1'); + +CREATE TABLE table1_group_none_2 ( id int ); +SELECT create_distributed_table('table1_group_none_2', 'id', colocate_with => 'none'); + +CREATE TABLE table4_groupE ( id int ); +SELECT create_distributed_table('table4_groupE', 'id', colocate_with => 'default'); + +SET citus.shard_count = 3; + +-- check that this new configuration does not have a default group +CREATE TABLE table1_group_none_3 ( id int ); +SELECT create_distributed_table('table1_group_none_3', 'id', colocate_with => 'NONE'); + +-- a new table does not use a non-default group +CREATE TABLE table1_group_default ( id int ); +SELECT create_distributed_table('table1_group_default', 'id', colocate_with => 'DEFAULT'); + +-- check metadata +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid; + +SELECT logicalrelid, colocationid FROM pg_dist_partition + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid, logicalrelid; + +-- check failing colocate_with options +CREATE TABLE table_postgresql( id int ); +CREATE TABLE table_failing ( id int ); + +SELECT create_distributed_table('table_failing', 'id', colocate_with => 'table_append'); +SELECT create_distributed_table('table_failing', 'id', 'append', 'table1_groupE'); +SELECT create_distributed_table('table_failing', 'id', colocate_with => 'table_postgresql'); +SELECT create_distributed_table('table_failing', 'id', colocate_with => 'no_table'); +SELECT create_distributed_table('table_failing', 'id', colocate_with => ''); +SELECT create_distributed_table('table_failing', 'id', colocate_with => NULL); + +-- check with different distribution column types +CREATE TABLE table_bigint ( id bigint ); +SELECT create_distributed_table('table_bigint', 'id', colocate_with => 'table1_groupE'); + -- check worker table schemas \c - - - :worker_1_port \d table3_groupE_1300050 @@ -255,10 +302,23 @@ CREATE TABLE table2_groupF ( id int ); SELECT create_reference_table('table2_groupF'); -- check metadata -SELECT * FROM pg_dist_colocation - WHERE colocationid >= 1 AND colocationid < 1000 +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 ORDER BY colocationid; +-- test mark_colocation_group_default() +SELECT mark_colocation_group_default(7); +SELECT mark_colocation_group_default(8); + +-- check metadata after mark_colocation_group_default() is run +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid; + +SELECT logicalrelid, colocationid FROM pg_dist_partition + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid, logicalrelid; + -- cross check with internal colocation API SELECT p1.logicalrelid::regclass AS table1, @@ -304,13 +364,13 @@ UPDATE pg_dist_partition SET colocationid = 0 WHERE colocationid >= 1 AND colocationid < 1000; -- check metadata -SELECT * FROM pg_dist_colocation - WHERE colocationid >= 1 AND colocationid < 1000 +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 ORDER BY colocationid; SELECT logicalrelid, colocationid FROM pg_dist_partition - WHERE colocationid >= 1 AND colocationid < 1000 - ORDER BY logicalrelid; + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid, logicalrelid; -- first check failing cases SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupC']); @@ -320,13 +380,13 @@ SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupF']); SELECT mark_tables_colocated('table1_groupB', ARRAY['table2_groupB', 'table1_groupD']); -- check metadata to see failing calls didn't have any side effects -SELECT * FROM pg_dist_colocation - WHERE colocationid >= 1 AND colocationid < 1000 +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 ORDER BY colocationid; SELECT logicalrelid, colocationid FROM pg_dist_partition - WHERE colocationid >= 1 AND colocationid < 1000 - ORDER BY logicalrelid; + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid, logicalrelid; -- check successfully cololated tables SELECT mark_tables_colocated('table1_groupB', ARRAY['table2_groupB']); @@ -338,6 +398,11 @@ SELECT mark_tables_colocated('table1_groupF', ARRAY['table2_groupF']); -- check to colocate with itself SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupB']); +SET citus.shard_count = 2; + +CREATE TABLE table5_groupE ( id int ); +SELECT create_distributed_table('table5_groupE', 'id', colocate_with => 'NONE'); + -- check metadata to see colocation groups are created successfully SELECT * FROM pg_dist_colocation WHERE colocationid >= 1 AND colocationid < 1000 @@ -346,3 +411,15 @@ SELECT * FROM pg_dist_colocation SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE colocationid >= 1 AND colocationid < 1000 ORDER BY logicalrelid; + +-- move the only table in colocation group 7 to colocation group 5 +SELECT mark_tables_colocated('table1_groupE', ARRAY['table5_groupE']); + +-- check metadata to see that unused colocation group is deleted +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid; + +SELECT logicalrelid, colocationid FROM pg_dist_partition + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid, logicalrelid; diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index c67ec9a6d..209955668 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -61,6 +61,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-18'; ALTER EXTENSION citus UPDATE TO '6.1-1'; ALTER EXTENSION citus UPDATE TO '6.1-2'; ALTER EXTENSION citus UPDATE TO '6.1-3'; +ALTER EXTENSION citus UPDATE TO '6.1-4'; -- ensure no objects were created outside pg_catalog SELECT COUNT(*)