diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index 0feecd06c..5230b1394 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -9,7 +9,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \ 5.2-1 5.2-2 5.2-3 5.2-4 \ 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \ - 6.1-1 6.1-2 6.1-3 6.1-4 + 6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -103,6 +103,8 @@ $(EXTENSION)--6.1-3.sql: $(EXTENSION)--6.1-2.sql $(EXTENSION)--6.1-2--6.1-3.sql cat $^ > $@ $(EXTENSION)--6.1-4.sql: $(EXTENSION)--6.1-3.sql $(EXTENSION)--6.1-3--6.1-4.sql cat $^ > $@ +$(EXTENSION)--6.1-5.sql: $(EXTENSION)--6.1-4.sql $(EXTENSION)--6.1-4--6.1-5.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--6.1-4--6.1-5.sql b/src/backend/distributed/citus--6.1-4--6.1-5.sql new file mode 100644 index 000000000..95451b0c1 --- /dev/null +++ b/src/backend/distributed/citus--6.1-4--6.1-5.sql @@ -0,0 +1,20 @@ +/* citus--6.1-4--6.1-5.sql */ + +SET search_path = 'pg_catalog'; + +DROP FUNCTION create_distributed_table(regclass, text, citus.distribution_type); + +CREATE FUNCTION create_distributed_table(table_name regclass, + distribution_column text, + distribution_type citus.distribution_type DEFAULT 'hash', + colocate_with text DEFAULT 'default') + RETURNS void + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$create_distributed_table$$; +COMMENT ON FUNCTION create_distributed_table(table_name regclass, + distribution_column text, + distribution_type citus.distribution_type, + colocate_with text) + IS 'creates a distributed table'; + +RESET search_path; diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index 04ef361d3..399f6d7a0 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '6.1-4' +default_version = '6.1-5' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index b59add3fe..b931d1a36 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -9,7 +9,6 @@ */ #include "postgres.h" -#include "miscadmin.h" #include "access/genam.h" #include "access/hash.h" @@ -33,7 +32,6 @@ #include "catalog/pg_type.h" #include "commands/defrem.h" #include "commands/extension.h" -#include "commands/sequence.h" #include "commands/trigger.h" #include "distributed/colocation_utils.h" #include "distributed/distribution_column.h" @@ -76,11 +74,10 @@ static void ErrorIfNotSupportedForeignConstraint(Relation relation, uint32 colocationId); static void InsertIntoPgDistPartition(Oid relationId, char distributionMethod, Var *distributionColumn, uint32 colocationId); -static uint32 ColocationId(int shardCount, int replicationFactor, - Oid distributionColumnType); -static uint32 GetNextColocationId(void); static void CreateHashDistributedTable(Oid relationId, char *distributionColumnName, + char *colocateWithTableName, int shardCount, int replicationFactor); +static Oid ColumnType(Oid relationId, char *columnName); /* exports for SQL callable functions */ @@ -114,8 +111,9 @@ master_create_distributed_table(PG_FUNCTION_ARGS) /* - * create_distributed_table accepts a table, distribution column and - * distribution method, then it creates a distributed table. + * create_distributed_table gets a table name, distribution column, + * distribution method and colocate_with option, then it creates a + * distributed table. */ Datum create_distributed_table(PG_FUNCTION_ARGS) @@ -126,6 +124,36 @@ create_distributed_table(PG_FUNCTION_ARGS) char *distributionColumnName = text_to_cstring(distributionColumnText); char distributionMethod = LookupDistributionMethod(distributionMethodOid); + text *colocateWithTableNameText = NULL; + char *colocateWithTableName = NULL; + + /* guard against a binary update without a function update */ + if (PG_NARGS() >= 4) + { + colocateWithTableNameText = PG_GETARG_TEXT_P(3); + colocateWithTableName = text_to_cstring(colocateWithTableNameText); + } + else + { + colocateWithTableName = "default"; + } + + /* check if we try to colocate with hash distributed tables */ + if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0 && + pg_strncasecmp(colocateWithTableName, "none", NAMEDATALEN) != 0) + { + Oid colocateWithTableOid = ResolveRelationId(colocateWithTableNameText); + char colocateWithTableDistributionMethod = PartitionMethod(colocateWithTableOid); + + if (colocateWithTableDistributionMethod != DISTRIBUTE_BY_HASH || + distributionMethod != DISTRIBUTE_BY_HASH) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot distribute relation"), + errdetail("Currently, colocate_with option is only supported " + "for hash distributed tables."))); + } + } /* if distribution method is not hash, just create partition metadata */ if (distributionMethod != DISTRIBUTE_BY_HASH) @@ -135,8 +163,9 @@ create_distributed_table(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } - /* use configuration values for shard count and shard replication factor*/ - CreateHashDistributedTable(relationId, distributionColumnName, ShardCount, + /* use configuration values for shard count and shard replication factor */ + CreateHashDistributedTable(relationId, distributionColumnName, + colocateWithTableName, ShardCount, ShardReplicationFactor); PG_RETURN_VOID(); @@ -154,6 +183,7 @@ create_reference_table(PG_FUNCTION_ARGS) Oid relationId = PG_GETARG_OID(0); int shardCount = 1; AttrNumber firstColumnAttrNumber = 1; + char *colocateWithTableName = "default"; char *firstColumnName = get_attname(relationId, firstColumnAttrNumber); if (firstColumnName == NULL) @@ -164,8 +194,8 @@ create_reference_table(PG_FUNCTION_ARGS) "least one column", relationName))); } - CreateHashDistributedTable(relationId, firstColumnName, shardCount, - ShardReplicationFactor); + CreateHashDistributedTable(relationId, firstColumnName, colocateWithTableName, + shardCount, ShardReplicationFactor); PG_RETURN_VOID(); } @@ -849,147 +879,21 @@ CreateTruncateTrigger(Oid relationId) /* - * ColocationId searches pg_dist_colocation for shard count, replication factor - * and distribution column type. If a matching entry is found, it returns the - * colocation id, otherwise it returns INVALID_COLOCATION_ID. - */ -static uint32 -ColocationId(int shardCount, int replicationFactor, Oid distributionColumnType) -{ - uint32 colocationId = INVALID_COLOCATION_ID; - HeapTuple colocationTuple = NULL; - SysScanDesc scanDescriptor; - const int scanKeyCount = 3; - ScanKeyData scanKey[scanKeyCount]; - bool indexOK = true; - - Relation pgDistColocation = heap_open(DistColocationRelationId(), AccessShareLock); - - /* set scan arguments */ - ScanKeyInit(&scanKey[0], Anum_pg_dist_colocation_shardcount, - BTEqualStrategyNumber, F_INT4EQ, UInt32GetDatum(shardCount)); - ScanKeyInit(&scanKey[1], Anum_pg_dist_colocation_replicationfactor, - BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(replicationFactor)); - ScanKeyInit(&scanKey[2], Anum_pg_dist_colocation_distributioncolumntype, - BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(distributionColumnType)); - - scanDescriptor = systable_beginscan(pgDistColocation, - DistColocationConfigurationIndexId(), - indexOK, NULL, scanKeyCount, scanKey); - - colocationTuple = systable_getnext(scanDescriptor); - if (HeapTupleIsValid(colocationTuple)) - { - Form_pg_dist_colocation colocationForm = - (Form_pg_dist_colocation) GETSTRUCT(colocationTuple); - - colocationId = colocationForm->colocationid; - } - - systable_endscan(scanDescriptor); - heap_close(pgDistColocation, AccessShareLock); - - return colocationId; -} - - -/* - * CreateColocationGroup creates a new colocation id and writes it into - * pg_dist_colocation with the given configuration. It also returns the created - * colocation id. - */ -uint32 -CreateColocationGroup(int shardCount, int replicationFactor, Oid distributionColumnType) -{ - uint32 colocationId = GetNextColocationId(); - Relation pgDistColocation = NULL; - TupleDesc tupleDescriptor = NULL; - HeapTuple heapTuple = NULL; - Datum values[Natts_pg_dist_colocation]; - bool isNulls[Natts_pg_dist_colocation]; - - /* form new colocation tuple */ - memset(values, 0, sizeof(values)); - memset(isNulls, false, sizeof(isNulls)); - - values[Anum_pg_dist_colocation_colocationid - 1] = UInt32GetDatum(colocationId); - values[Anum_pg_dist_colocation_shardcount - 1] = UInt32GetDatum(shardCount); - values[Anum_pg_dist_colocation_replicationfactor - 1] = - UInt32GetDatum(replicationFactor); - values[Anum_pg_dist_colocation_distributioncolumntype - 1] = - ObjectIdGetDatum(distributionColumnType); - - /* open colocation relation and insert the new tuple */ - pgDistColocation = heap_open(DistColocationRelationId(), RowExclusiveLock); - - tupleDescriptor = RelationGetDescr(pgDistColocation); - heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls); - - simple_heap_insert(pgDistColocation, heapTuple); - CatalogUpdateIndexes(pgDistColocation, heapTuple); - - /* increment the counter so that next command can see the row */ - CommandCounterIncrement(); - heap_close(pgDistColocation, RowExclusiveLock); - - return colocationId; -} - - -/* - * GetNextColocationId allocates and returns a unique colocationId for the - * colocation group to be created. This allocation occurs both in shared memory - * and in write ahead logs; writing to logs avoids the risk of having - * colocationId collisions. - * - * Please note that the caller is still responsible for finalizing colocationId - * with the master node. Further note that this function relies on an internal - * sequence created in initdb to generate unique identifiers. - */ -static uint32 -GetNextColocationId() -{ - text *sequenceName = cstring_to_text(COLOCATIONID_SEQUENCE_NAME); - Oid sequenceId = ResolveRelationId(sequenceName); - Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId); - Oid savedUserId = InvalidOid; - int savedSecurityContext = 0; - Datum colocationIdDatum = 0; - uint32 colocationId = INVALID_COLOCATION_ID; - - GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); - SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); - - /* generate new and unique colocation id from sequence */ - colocationIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum); - - SetUserIdAndSecContext(savedUserId, savedSecurityContext); - - colocationId = DatumGetUInt32(colocationIdDatum); - - return colocationId; -} - - -/* - * CreateHashDistributedTable creates a hash distributed table with given - * shard count and shard replication factor. + * CreateHashDistributedTable creates a hash distributed table. */ static void CreateHashDistributedTable(Oid relationId, char *distributionColumnName, + char *colocateWithTableName, int shardCount, int replicationFactor) { Relation distributedRelation = NULL; Relation pgDistColocation = NULL; - Var *distributionColumn = NULL; - Oid distributionColumnType = 0; uint32 colocationId = INVALID_COLOCATION_ID; + Oid sourceRelationId = InvalidOid; + Oid distributionColumnType = InvalidOid; - /* get distribution column type */ + /* get an access lock on the relation to prevent DROP TABLE and ALTER TABLE */ distributedRelation = relation_open(relationId, AccessShareLock); - distributionColumn = BuildDistributionKeyFromColumnName(distributedRelation, - distributionColumnName); - distributionColumnType = distributionColumn->vartype; /* * Get an exclusive lock on the colocation system catalog. Therefore, we @@ -998,38 +902,68 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName, */ pgDistColocation = heap_open(DistColocationRelationId(), ExclusiveLock); - /* check for existing colocations */ - colocationId = ColocationId(shardCount, replicationFactor, distributionColumnType); + /* get distribution column data type */ + distributionColumnType = ColumnType(relationId, distributionColumnName); - /* - * If there is a colocation group for the current configuration, get a - * colocated table from the group and use its shards as a reference to - * create new shards. Otherwise, create a new colocation group and create - * shards with the default round robin algorithm. - */ - if (colocationId != INVALID_COLOCATION_ID) + if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) == 0) { - char *relationName = get_rel_name(relationId); - - Oid colocatedTableId = ColocatedTableId(colocationId); - ConvertToDistributedTable(relationId, distributionColumnName, - DISTRIBUTE_BY_HASH, colocationId); - - CreateColocatedShards(relationId, colocatedTableId); - ereport(DEBUG2, (errmsg("table %s is added to colocation group: %d", - relationName, colocationId))); + /* check for default colocation group */ + colocationId = ColocationId(shardCount, replicationFactor, + distributionColumnType); + if (colocationId == INVALID_COLOCATION_ID) + { + colocationId = CreateColocationGroup(shardCount, replicationFactor, + distributionColumnType); + } + else + { + sourceRelationId = ColocatedTableId(colocationId); + } + } + else if (pg_strncasecmp(colocateWithTableName, "none", NAMEDATALEN) == 0) + { + colocationId = GetNextColocationId(); } else { - colocationId = CreateColocationGroup(shardCount, replicationFactor, - distributionColumnType); - ConvertToDistributedTable(relationId, distributionColumnName, - DISTRIBUTE_BY_HASH, colocationId); + /* get colocation group of the target table */ + text *colocateWithTableNameText = cstring_to_text(colocateWithTableName); + sourceRelationId = ResolveRelationId(colocateWithTableNameText); - /* use the default way to create shards */ + colocationId = TableColocationId(sourceRelationId); + } + + /* create distributed table metadata */ + ConvertToDistributedTable(relationId, distributionColumnName, DISTRIBUTE_BY_HASH, + colocationId); + + /* create shards */ + if (sourceRelationId != InvalidOid) + { + /* first run checks */ + CheckReplicationModel(sourceRelationId, relationId); + CheckDistributionColumnType(sourceRelationId, relationId); + + CreateColocatedShards(relationId, sourceRelationId); + } + else + { CreateShardsWithRoundRobinPolicy(relationId, shardCount, replicationFactor); } heap_close(pgDistColocation, NoLock); relation_close(distributedRelation, NoLock); } + + +/* + * ColumnType returns the column type of the given column. + */ +static Oid +ColumnType(Oid relationId, char *columnName) +{ + AttrNumber columnIndex = get_attnum(relationId, columnName); + Oid columnType = get_atttype(relationId, columnIndex); + + return columnType; +} diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c index 432fedd27..c9e246219 100644 --- a/src/backend/distributed/utils/colocation_utils.c +++ b/src/backend/distributed/utils/colocation_utils.c @@ -10,20 +10,24 @@ */ #include "postgres.h" +#include "miscadmin.h" #include "access/genam.h" #include "access/heapam.h" #include "access/htup_details.h" #include "access/xact.h" #include "catalog/indexing.h" +#include "commands/sequence.h" #include "distributed/colocation_utils.h" #include "distributed/listutils.h" #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" #include "distributed/multi_logical_planner.h" +#include "distributed/pg_dist_colocation.h" #include "distributed/resource_lock.h" #include "distributed/shardinterval_utils.h" #include "distributed/worker_protocol.h" +#include "utils/builtins.h" #include "utils/fmgroids.h" #include "utils/lsyscache.h" #include "utils/rel.h" @@ -37,6 +41,8 @@ static bool ShardsIntervalsEqual(ShardInterval *leftShardInterval, static int CompareShardPlacementsByNode(const void *leftElement, const void *rightElement); static void UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId); +static List * ColocationGroupTableList(Oid colocationId); +static void DeleteColocationGroup(uint32 colocationId); /* exports for SQL callable functions */ @@ -69,6 +75,7 @@ mark_tables_colocated(PG_FUNCTION_ARGS) for (relationIndex = 0; relationIndex < relationCount; relationIndex++) { Oid nextRelationOid = DatumGetObjectId(relationIdDatumArray[relationIndex]); + MarkTablesColocated(sourceRelationId, nextRelationOid); } @@ -86,32 +93,14 @@ static void MarkTablesColocated(Oid sourceRelationId, Oid targetRelationId) { uint32 sourceColocationId = INVALID_COLOCATION_ID; + uint32 targetColocationId = INVALID_COLOCATION_ID; Relation pgDistColocation = NULL; - Var *sourceDistributionColumn = NULL; - Var *targetDistributionColumn = NULL; - Oid sourceDistributionColumnType = InvalidOid; - Oid targetDistributionColumnType = InvalidOid; CheckHashPartitionedTable(sourceRelationId); CheckHashPartitionedTable(targetRelationId); - sourceDistributionColumn = PartitionKey(sourceRelationId); - sourceDistributionColumnType = sourceDistributionColumn->vartype; - - targetDistributionColumn = PartitionKey(targetRelationId); - targetDistributionColumnType = targetDistributionColumn->vartype; - - if (sourceDistributionColumnType != targetDistributionColumnType) - { - char *sourceRelationName = get_rel_name(sourceRelationId); - char *targetRelationName = get_rel_name(targetRelationId); - - ereport(ERROR, (errmsg("cannot colocate tables %s and %s", - sourceRelationName, targetRelationName), - errdetail("Distribution column types don't match for " - "%s and %s.", sourceRelationName, - targetRelationName))); - } + CheckReplicationModel(sourceRelationId, targetRelationId); + CheckDistributionColumnType(sourceRelationId, targetRelationId); /* * Get an exclusive lock on the colocation system catalog. Therefore, we @@ -133,14 +122,31 @@ MarkTablesColocated(Oid sourceRelationId, Oid targetRelationId) uint32 shardCount = ShardIntervalCount(sourceRelationId); uint32 shardReplicationFactor = TableShardReplicationFactor(sourceRelationId); + Var *sourceDistributionColumn = PartitionKey(sourceRelationId); + Oid sourceDistributionColumnType = sourceDistributionColumn->vartype; + sourceColocationId = CreateColocationGroup(shardCount, shardReplicationFactor, sourceDistributionColumnType); UpdateRelationColocationGroup(sourceRelationId, sourceColocationId); } + targetColocationId = TableColocationId(targetRelationId); + /* finally set colocation group for the target relation */ UpdateRelationColocationGroup(targetRelationId, sourceColocationId); + /* if there is not any remaining table in the colocation group, delete it */ + if (targetColocationId != INVALID_COLOCATION_ID) + { + List *colocatedTableList = ColocationGroupTableList(targetColocationId); + int colocatedTableCount = list_length(colocatedTableList); + + if (colocatedTableCount == 0) + { + DeleteColocationGroup(targetColocationId); + } + } + heap_close(pgDistColocation, NoLock); } @@ -339,6 +345,192 @@ CompareShardPlacementsByNode(const void *leftElement, const void *rightElement) } +/* + * ColocationId searches pg_dist_colocation for shard count, replication factor + * and distribution column type. If a matching entry is found, it returns the + * colocation id, otherwise it returns INVALID_COLOCATION_ID. + */ +uint32 +ColocationId(int shardCount, int replicationFactor, Oid distributionColumnType) +{ + uint32 colocationId = INVALID_COLOCATION_ID; + HeapTuple colocationTuple = NULL; + SysScanDesc scanDescriptor; + const int scanKeyCount = 3; + ScanKeyData scanKey[scanKeyCount]; + bool indexOK = true; + + Relation pgDistColocation = heap_open(DistColocationRelationId(), AccessShareLock); + + /* set scan arguments */ + ScanKeyInit(&scanKey[0], Anum_pg_dist_colocation_shardcount, + BTEqualStrategyNumber, F_INT4EQ, UInt32GetDatum(shardCount)); + ScanKeyInit(&scanKey[1], Anum_pg_dist_colocation_replicationfactor, + BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(replicationFactor)); + ScanKeyInit(&scanKey[2], Anum_pg_dist_colocation_distributioncolumntype, + BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(distributionColumnType)); + + scanDescriptor = systable_beginscan(pgDistColocation, + DistColocationConfigurationIndexId(), + indexOK, NULL, scanKeyCount, scanKey); + + colocationTuple = systable_getnext(scanDescriptor); + if (HeapTupleIsValid(colocationTuple)) + { + Form_pg_dist_colocation colocationForm = + (Form_pg_dist_colocation) GETSTRUCT(colocationTuple); + + colocationId = colocationForm->colocationid; + } + + systable_endscan(scanDescriptor); + heap_close(pgDistColocation, AccessShareLock); + + return colocationId; +} + + +/* + * CreateColocationGroup creates a new colocation id and writes it into + * pg_dist_colocation with the given configuration. It also returns the created + * colocation id. + */ +uint32 +CreateColocationGroup(int shardCount, int replicationFactor, Oid distributionColumnType) +{ + uint32 colocationId = GetNextColocationId(); + Relation pgDistColocation = NULL; + TupleDesc tupleDescriptor = NULL; + HeapTuple heapTuple = NULL; + Datum values[Natts_pg_dist_colocation]; + bool isNulls[Natts_pg_dist_colocation]; + + /* form new colocation tuple */ + memset(values, 0, sizeof(values)); + memset(isNulls, false, sizeof(isNulls)); + + values[Anum_pg_dist_colocation_colocationid - 1] = UInt32GetDatum(colocationId); + values[Anum_pg_dist_colocation_shardcount - 1] = UInt32GetDatum(shardCount); + values[Anum_pg_dist_colocation_replicationfactor - 1] = + UInt32GetDatum(replicationFactor); + values[Anum_pg_dist_colocation_distributioncolumntype - 1] = + ObjectIdGetDatum(distributionColumnType); + + /* open colocation relation and insert the new tuple */ + pgDistColocation = heap_open(DistColocationRelationId(), RowExclusiveLock); + + tupleDescriptor = RelationGetDescr(pgDistColocation); + heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls); + + simple_heap_insert(pgDistColocation, heapTuple); + CatalogUpdateIndexes(pgDistColocation, heapTuple); + + /* increment the counter so that next command can see the row */ + CommandCounterIncrement(); + heap_close(pgDistColocation, RowExclusiveLock); + + return colocationId; +} + + +/* + * GetNextColocationId allocates and returns a unique colocationId for the + * colocation group to be created. This allocation occurs both in shared memory + * and in write ahead logs; writing to logs avoids the risk of having + * colocationId collisions. + * + * Please note that the caller is still responsible for finalizing colocationId + * with the master node. Further note that this function relies on an internal + * sequence created in initdb to generate unique identifiers. + */ +uint32 +GetNextColocationId() +{ + text *sequenceName = cstring_to_text(COLOCATIONID_SEQUENCE_NAME); + Oid sequenceId = ResolveRelationId(sequenceName); + Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId); + Oid savedUserId = InvalidOid; + int savedSecurityContext = 0; + Datum colocationIdDatum = 0; + uint32 colocationId = INVALID_COLOCATION_ID; + + GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); + SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); + + /* generate new and unique colocation id from sequence */ + colocationIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum); + + SetUserIdAndSecContext(savedUserId, savedSecurityContext); + + colocationId = DatumGetUInt32(colocationIdDatum); + + return colocationId; +} + + +/* + * CheckReplicationModel checks if given relations are from the same + * replication model. Otherwise, it errors out. + */ +void +CheckReplicationModel(Oid sourceRelationId, Oid targetRelationId) +{ + DistTableCacheEntry *sourceTableEntry = NULL; + DistTableCacheEntry *targetTableEntry = NULL; + char sourceReplicationModel = 0; + char targetReplicationModel = 0; + + sourceTableEntry = DistributedTableCacheEntry(sourceRelationId); + sourceReplicationModel = sourceTableEntry->replicationModel; + + targetTableEntry = DistributedTableCacheEntry(targetRelationId); + targetReplicationModel = targetTableEntry->replicationModel; + + if (sourceReplicationModel != targetReplicationModel) + { + char *sourceRelationName = get_rel_name(sourceRelationId); + char *targetRelationName = get_rel_name(targetRelationId); + + ereport(ERROR, (errmsg("cannot colocate tables %s and %s", + sourceRelationName, targetRelationName), + errdetail("Replication models don't match for %s and %s.", + sourceRelationName, targetRelationName))); + } +} + + +/* + * CheckDistributionColumnType checks if distribution column types of relations + * are same. Otherwise, it errors out. + */ +void +CheckDistributionColumnType(Oid sourceRelationId, Oid targetRelationId) +{ + Var *sourceDistributionColumn = NULL; + Var *targetDistributionColumn = NULL; + Oid sourceDistributionColumnType = InvalidOid; + Oid targetDistributionColumnType = InvalidOid; + + sourceDistributionColumn = PartitionKey(sourceRelationId); + sourceDistributionColumnType = sourceDistributionColumn->vartype; + + targetDistributionColumn = PartitionKey(targetRelationId); + targetDistributionColumnType = targetDistributionColumn->vartype; + + if (sourceDistributionColumnType != targetDistributionColumnType) + { + char *sourceRelationName = get_rel_name(sourceRelationId); + char *targetRelationName = get_rel_name(targetRelationId); + + ereport(ERROR, (errmsg("cannot colocate tables %s and %s", + sourceRelationName, targetRelationName), + errdetail("Distribution column types don't match for " + "%s and %s.", sourceRelationName, + targetRelationName))); + } +} + + /* * UpdateRelationColocationGroup updates colocation group in pg_dist_partition * for the given relation. @@ -475,6 +667,30 @@ ColocatedTableList(Oid distributedTableId) uint32 tableColocationId = TableColocationId(distributedTableId); List *colocatedTableList = NIL; + /* + * If distribution type of the table is not hash, the table is only co-located + * with itself. + */ + if (tableColocationId == INVALID_COLOCATION_ID) + { + colocatedTableList = lappend_oid(colocatedTableList, distributedTableId); + return colocatedTableList; + } + + colocatedTableList = ColocationGroupTableList(tableColocationId); + + return colocatedTableList; +} + + +/* + * ColocationGroupTableList returns the list of tables in the given colocation + * group. If the colocation group is INVALID_COLOCATION_ID, it returns NIL. + */ +static List * +ColocationGroupTableList(Oid colocationId) +{ + List *colocatedTableList = NIL; Relation pgDistPartition = NULL; TupleDesc tupleDescriptor = NULL; SysScanDesc scanDescriptor = NULL; @@ -487,14 +703,13 @@ ColocatedTableList(Oid distributedTableId) * If distribution type of the table is not hash, the table is only co-located * with itself. */ - if (tableColocationId == INVALID_COLOCATION_ID) + if (colocationId == INVALID_COLOCATION_ID) { - colocatedTableList = lappend_oid(colocatedTableList, distributedTableId); - return colocatedTableList; + return NIL; } ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_colocationid, - BTEqualStrategyNumber, F_INT4EQ, ObjectIdGetDatum(tableColocationId)); + BTEqualStrategyNumber, F_INT4EQ, ObjectIdGetDatum(colocationId)); pgDistPartition = heap_open(DistPartitionRelationId(), AccessShareLock); tupleDescriptor = RelationGetDescr(pgDistPartition); @@ -633,3 +848,40 @@ ColocatedShardIdInRelation(Oid relationId, int shardIndex) return tableCacheEntry->sortedShardIntervalArray[shardIndex]->shardId; } + + +/* + * DeleteColocationGroup deletes the colocation group from pg_dist_colocation. + */ +static void +DeleteColocationGroup(uint32 colocationId) +{ + Relation pgDistColocation = NULL; + SysScanDesc scanDescriptor = NULL; + int scanKeyCount = 1; + ScanKeyData scanKey[scanKeyCount]; + bool indexOK = false; + HeapTuple heapTuple = NULL; + + pgDistColocation = heap_open(DistColocationRelationId(), RowExclusiveLock); + + ScanKeyInit(&scanKey[0], Anum_pg_dist_colocation_colocationid, + BTEqualStrategyNumber, F_INT4EQ, UInt32GetDatum(colocationId)); + + scanDescriptor = systable_beginscan(pgDistColocation, InvalidOid, indexOK, + NULL, scanKeyCount, scanKey); + + /* if a record is found, delete it */ + heapTuple = systable_getnext(scanDescriptor); + if (HeapTupleIsValid(heapTuple)) + { + simple_heap_delete(pgDistColocation, &(heapTuple->t_self)); + + CatalogUpdateIndexes(pgDistColocation, heapTuple); + CitusInvalidateRelcacheByRelid(DistColocationRelationId()); + CommandCounterIncrement(); + } + + systable_endscan(scanDescriptor); + heap_close(pgDistColocation, RowExclusiveLock); +} diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index b48c62fa8..063a6f4fc 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -260,8 +260,9 @@ DistributedTableCacheEntry(Oid distributedRelationId) } else { - ereport(ERROR, (errmsg("relation %u is not distributed", - distributedRelationId))); + char *relationName = get_rel_name(distributedRelationId); + ereport(ERROR, (errmsg("relation %s is not distributed", + relationName))); } } diff --git a/src/include/distributed/colocation_utils.h b/src/include/distributed/colocation_utils.h index 09de91ee3..461ddbb72 100644 --- a/src/include/distributed/colocation_utils.h +++ b/src/include/distributed/colocation_utils.h @@ -25,8 +25,12 @@ extern List * ColocatedTableList(Oid distributedTableId); extern List * ColocatedShardIntervalList(ShardInterval *shardInterval); extern Oid ColocatedTableId(Oid colocationId); extern uint64 ColocatedShardIdInRelation(Oid relationId, int shardIndex); +uint32 ColocationId(int shardCount, int replicationFactor, Oid distributionColumnType); extern uint32 CreateColocationGroup(int shardCount, int replicationFactor, Oid distributionColumnType); +extern uint32 GetNextColocationId(void); +extern void CheckReplicationModel(Oid sourceRelationId, Oid targetRelationId); +extern void CheckDistributionColumnType(Oid sourceRelationId, Oid targetRelationId); #endif /* COLOCATION_UTILS_H_ */ diff --git a/src/test/regress/expected/multi_colocation_utils.out b/src/test/regress/expected/multi_colocation_utils.out index 7bd235aeb..945d03fff 100644 --- a/src/test/regress/expected/multi_colocation_utils.out +++ b/src/test/regress/expected/multi_colocation_utils.out @@ -504,6 +504,115 @@ SELECT create_distributed_table('schema_collocation.table4_groupE', 'id'); (1 row) +-- test colocate_with option +CREATE TABLE table1_group_none_1 ( id int ); +SELECT create_distributed_table('table1_group_none_1', 'id', colocate_with => 'none'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE table2_group_none_1 ( id int ); +SELECT create_distributed_table('table2_group_none_1', 'id', colocate_with => 'table1_group_none_1'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE table1_group_none_2 ( id int ); +SELECT create_distributed_table('table1_group_none_2', 'id', colocate_with => 'none'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE table4_groupE ( id int ); +SELECT create_distributed_table('table4_groupE', 'id', colocate_with => 'default'); + create_distributed_table +-------------------------- + +(1 row) + +SET citus.shard_count = 3; +-- check that this new configuration does not have a default group +CREATE TABLE table1_group_none_3 ( id int ); +SELECT create_distributed_table('table1_group_none_3', 'id', colocate_with => 'NONE'); + create_distributed_table +-------------------------- + +(1 row) + +-- a new table does not use a non-default group +CREATE TABLE table1_group_default ( id int ); +SELECT create_distributed_table('table1_group_default', 'id', colocate_with => 'DEFAULT'); + create_distributed_table +-------------------------- + +(1 row) + +-- check metadata +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid; + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 2 | 2 | 1 | 23 + 3 | 2 | 2 | 25 + 4 | 4 | 2 | 23 + 5 | 2 | 2 | 23 + 9 | 3 | 2 | 23 +(5 rows) + +SELECT logicalrelid, colocationid FROM pg_dist_partition + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid, logicalrelid; + logicalrelid | colocationid +----------------------------------+-------------- + table1_groupb | 2 + table2_groupb | 2 + table1_groupc | 3 + table2_groupc | 3 + table1_groupd | 4 + table2_groupd | 4 + table3_groupd | 4 + table1_groupe | 5 + table2_groupe | 5 + table3_groupe | 5 + schema_collocation.table4_groupe | 5 + table4_groupe | 5 + table1_group_none_1 | 6 + table2_group_none_1 | 6 + table1_group_none_2 | 7 + table1_group_none_3 | 8 + table1_group_default | 9 +(17 rows) + +-- check failing colocate_with options +CREATE TABLE table_postgresql( id int ); +CREATE TABLE table_failing ( id int ); +SELECT create_distributed_table('table_failing', 'id', colocate_with => 'table_append'); +ERROR: cannot distribute relation +DETAIL: Currently, colocate_with option is only supported for hash distributed tables. +SELECT create_distributed_table('table_failing', 'id', 'append', 'table1_groupE'); +ERROR: cannot distribute relation +DETAIL: Currently, colocate_with option is only supported for hash distributed tables. +SELECT create_distributed_table('table_failing', 'id', colocate_with => 'table_postgresql'); +ERROR: relation table_postgresql is not distributed +SELECT create_distributed_table('table_failing', 'id', colocate_with => 'no_table'); +ERROR: relation "no_table" does not exist +SELECT create_distributed_table('table_failing', 'id', colocate_with => ''); +ERROR: invalid name syntax +SELECT create_distributed_table('table_failing', 'id', colocate_with => NULL); + create_distributed_table +-------------------------- + +(1 row) + +-- check with different distribution column types +CREATE TABLE table_bigint ( id bigint ); +SELECT create_distributed_table('table_bigint', 'id', colocate_with => 'table1_groupE'); +ERROR: cannot colocate tables table1_groupe and table_bigint +DETAIL: Distribution column types don't match for table1_groupe and table_bigint. -- check worker table schemas \c - - - :worker_1_port \d table3_groupE_1300050 @@ -535,8 +644,8 @@ SELECT create_reference_table('table2_groupF'); (1 row) -- check metadata -SELECT * FROM pg_dist_colocation - WHERE colocationid >= 1 AND colocationid < 1000 +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 ORDER BY colocationid; colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ @@ -544,8 +653,9 @@ SELECT * FROM pg_dist_colocation 3 | 2 | 2 | 25 4 | 4 | 2 | 23 5 | 2 | 2 | 23 - 6 | 1 | 2 | 23 -(5 rows) + 9 | 3 | 2 | 23 + 10 | 1 | 2 | 23 +(6 rows) -- cross check with internal colocation API SELECT @@ -563,22 +673,27 @@ WHERE ORDER BY table1, table2; - table1 | table2 | colocated ----------------+----------------------------------+----------- - table1_group1 | table2_group1 | t - table1_groupb | table2_groupb | t - table1_groupc | table2_groupc | t - table1_groupd | table2_groupd | t - table1_groupd | table3_groupd | t - table2_groupd | table3_groupd | t - table1_groupe | table2_groupe | t - table1_groupe | table3_groupe | t - table1_groupe | schema_collocation.table4_groupe | t - table2_groupe | table3_groupe | t - table2_groupe | schema_collocation.table4_groupe | t - table3_groupe | schema_collocation.table4_groupe | t - table1_groupf | table2_groupf | t -(13 rows) + table1 | table2 | colocated +----------------------------------+----------------------------------+----------- + table1_group1 | table2_group1 | t + table1_groupb | table2_groupb | t + table1_groupc | table2_groupc | t + table1_groupd | table2_groupd | t + table1_groupd | table3_groupd | t + table2_groupd | table3_groupd | t + table1_groupe | table2_groupe | t + table1_groupe | table3_groupe | t + table1_groupe | schema_collocation.table4_groupe | t + table1_groupe | table4_groupe | t + table2_groupe | table3_groupe | t + table2_groupe | schema_collocation.table4_groupe | t + table2_groupe | table4_groupe | t + table3_groupe | schema_collocation.table4_groupe | t + table3_groupe | table4_groupe | t + schema_collocation.table4_groupe | table4_groupe | t + table1_group_none_1 | table2_group_none_1 | t + table1_groupf | table2_groupf | t +(18 rows) -- check created shards SELECT @@ -653,11 +768,39 @@ ORDER BY schema_collocation.table4_groupe | 1300052 | t | 57637 | -2147483648 | -1 schema_collocation.table4_groupe | 1300053 | t | 57637 | 0 | 2147483647 schema_collocation.table4_groupe | 1300053 | t | 57638 | 0 | 2147483647 - table1_groupf | 1300054 | t | 57637 | -2147483648 | 2147483647 - table1_groupf | 1300054 | t | 57638 | -2147483648 | 2147483647 - table2_groupf | 1300055 | t | 57638 | -2147483648 | 2147483647 - table2_groupf | 1300055 | t | 57637 | -2147483648 | 2147483647 -(56 rows) + table1_group_none_1 | 1300054 | t | 57637 | -2147483648 | -1 + table1_group_none_1 | 1300054 | t | 57638 | -2147483648 | -1 + table1_group_none_1 | 1300055 | t | 57638 | 0 | 2147483647 + table1_group_none_1 | 1300055 | t | 57637 | 0 | 2147483647 + table2_group_none_1 | 1300056 | t | 57638 | -2147483648 | -1 + table2_group_none_1 | 1300056 | t | 57637 | -2147483648 | -1 + table2_group_none_1 | 1300057 | t | 57637 | 0 | 2147483647 + table2_group_none_1 | 1300057 | t | 57638 | 0 | 2147483647 + table1_group_none_2 | 1300058 | t | 57637 | -2147483648 | -1 + table1_group_none_2 | 1300058 | t | 57638 | -2147483648 | -1 + table1_group_none_2 | 1300059 | t | 57638 | 0 | 2147483647 + table1_group_none_2 | 1300059 | t | 57637 | 0 | 2147483647 + table4_groupe | 1300060 | t | 57637 | -2147483648 | -1 + table4_groupe | 1300060 | t | 57638 | -2147483648 | -1 + table4_groupe | 1300061 | t | 57638 | 0 | 2147483647 + table4_groupe | 1300061 | t | 57637 | 0 | 2147483647 + table1_group_none_3 | 1300062 | t | 57637 | -2147483648 | -715827884 + table1_group_none_3 | 1300062 | t | 57638 | -2147483648 | -715827884 + table1_group_none_3 | 1300063 | t | 57638 | -715827883 | 715827881 + table1_group_none_3 | 1300063 | t | 57637 | -715827883 | 715827881 + table1_group_none_3 | 1300064 | t | 57637 | 715827882 | 2147483647 + table1_group_none_3 | 1300064 | t | 57638 | 715827882 | 2147483647 + table1_group_default | 1300065 | t | 57637 | -2147483648 | -715827884 + table1_group_default | 1300065 | t | 57638 | -2147483648 | -715827884 + table1_group_default | 1300066 | t | 57638 | -715827883 | 715827881 + table1_group_default | 1300066 | t | 57637 | -715827883 | 715827881 + table1_group_default | 1300067 | t | 57637 | 715827882 | 2147483647 + table1_group_default | 1300067 | t | 57638 | 715827882 | 2147483647 + table1_groupf | 1300068 | t | 57637 | -2147483648 | 2147483647 + table1_groupf | 1300068 | t | 57638 | -2147483648 | 2147483647 + table2_groupf | 1300069 | t | 57638 | -2147483648 | 2147483647 + table2_groupf | 1300069 | t | 57637 | -2147483648 | 2147483647 +(84 rows) -- reset colocation ids to test mark_tables_colocated ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1; @@ -666,16 +809,16 @@ DELETE FROM pg_dist_colocation UPDATE pg_dist_partition SET colocationid = 0 WHERE colocationid >= 1 AND colocationid < 1000; -- check metadata -SELECT * FROM pg_dist_colocation - WHERE colocationid >= 1 AND colocationid < 1000 +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 ORDER BY colocationid; colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ (0 rows) SELECT logicalrelid, colocationid FROM pg_dist_partition - WHERE colocationid >= 1 AND colocationid < 1000 - ORDER BY logicalrelid; + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid, logicalrelid; logicalrelid | colocationid --------------+-------------- (0 rows) @@ -697,16 +840,16 @@ SELECT mark_tables_colocated('table1_groupB', ARRAY['table2_groupB', 'table1_gro ERROR: cannot colocate tables table1_groupb and table1_groupd DETAIL: Shard counts don't match for table1_groupb and table1_groupd. -- check metadata to see failing calls didn't have any side effects -SELECT * FROM pg_dist_colocation - WHERE colocationid >= 1 AND colocationid < 1000 +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 ORDER BY colocationid; colocationid | shardcount | replicationfactor | distributioncolumntype --------------+------------+-------------------+------------------------ (0 rows) SELECT logicalrelid, colocationid FROM pg_dist_partition - WHERE colocationid >= 1 AND colocationid < 1000 - ORDER BY logicalrelid; + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid, logicalrelid; logicalrelid | colocationid --------------+-------------- (0 rows) @@ -749,6 +892,21 @@ SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupB']); (1 row) +SET citus.shard_count = 2; +CREATE TABLE table1_group_none ( id int ); +SELECT create_distributed_table('table1_group_none', 'id', colocate_with => 'NONE'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE table2_group_none ( id int ); +SELECT create_distributed_table('table2_group_none', 'id', colocate_with => 'NONE'); + create_distributed_table +-------------------------- + +(1 row) + -- check metadata to see colocation groups are created successfully SELECT * FROM pg_dist_colocation WHERE colocationid >= 1 AND colocationid < 1000 @@ -763,20 +921,114 @@ SELECT * FROM pg_dist_colocation (5 rows) SELECT logicalrelid, colocationid FROM pg_dist_partition - WHERE colocationid >= 1 AND colocationid < 1000 - ORDER BY logicalrelid; - logicalrelid | colocationid ----------------+-------------- - table1_groupb | 2 - table2_groupb | 2 - table1_groupc | 3 - table2_groupc | 3 - table1_groupd | 4 - table2_groupd | 4 - table1_groupe | 5 - table2_groupe | 5 - table3_groupe | 5 - table1_groupf | 6 - table2_groupf | 6 -(11 rows) + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid, logicalrelid; + logicalrelid | colocationid +-------------------+-------------- + table1_groupb | 2 + table2_groupb | 2 + table1_groupc | 3 + table2_groupc | 3 + table1_groupd | 4 + table2_groupd | 4 + table1_groupe | 5 + table2_groupe | 5 + table3_groupe | 5 + table1_groupf | 6 + table2_groupf | 6 + table1_group_none | 7 + table2_group_none | 8 +(13 rows) +-- move the all tables in colocation group 5 to colocation group 7 +SELECT mark_tables_colocated('table1_group_none', ARRAY['table1_groupE', 'table2_groupE', 'table3_groupE']); + mark_tables_colocated +----------------------- + +(1 row) + +-- move a table with a colocation id which is already not in pg_dist_colocation +SELECT mark_tables_colocated('table1_group_none', ARRAY['table2_group_none']); + mark_tables_colocated +----------------------- + +(1 row) + +-- check metadata to see that unused colocation group is deleted +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid; + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 2 | 2 | 1 | 23 + 3 | 2 | 2 | 25 + 4 | 4 | 2 | 23 + 6 | 1 | 2 | 23 +(4 rows) + +SELECT logicalrelid, colocationid FROM pg_dist_partition + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid, logicalrelid; + logicalrelid | colocationid +-------------------+-------------- + table1_groupb | 2 + table2_groupb | 2 + table1_groupc | 3 + table2_groupc | 3 + table1_groupd | 4 + table2_groupd | 4 + table1_groupf | 6 + table2_groupf | 6 + table1_groupe | 7 + table2_groupe | 7 + table3_groupe | 7 + table1_group_none | 7 + table2_group_none | 7 +(13 rows) + +-- try to colocate different replication models +CREATE TABLE table1_groupG ( id int ); +SELECT create_distributed_table('table1_groupG', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +-- update replication model +UPDATE pg_dist_partition SET repmodel = 's' WHERE logicalrelid = 'table1_groupG'::regclass; +CREATE TABLE table2_groupG ( id int ); +SELECT create_distributed_table('table2_groupG', 'id', colocate_with => 'table1_groupG'); +ERROR: cannot colocate tables table1_groupg and table2_groupg +DETAIL: Replication models don't match for table1_groupg and table2_groupg. +CREATE TABLE table2_groupG ( id int ); +ERROR: relation "table2_groupg" already exists +SELECT create_distributed_table('table2_groupG', 'id', colocate_with => 'NONE'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT mark_tables_colocated('table1_groupG', ARRAY['table2_groupG']); +ERROR: cannot colocate tables table1_groupg and table2_groupg +DETAIL: Replication models don't match for table1_groupg and table2_groupg. +-- drop tables to clean test space +DROP TABLE table1_groupb; +DROP TABLE table2_groupb; +DROP TABLE table1_groupc; +DROP TABLE table2_groupc; +DROP TABLE table1_groupd; +DROP TABLE table2_groupd; +DROP TABLE table1_groupf; +DROP TABLE table2_groupf; +DROP TABLE table1_groupe; +DROP TABLE table2_groupe; +DROP TABLE table3_groupe; +DROP TABLE table4_groupe; +DROP TABLE schema_collocation.table4_groupe; +DROP TABLE table1_group_none_1; +DROP TABLE table2_group_none_1; +DROP TABLE table1_group_none_2; +DROP TABLE table1_group_none_3; +DROP TABLE table1_group_none; +DROP TABLE table2_group_none; +DROP TABLE table1_group_default; diff --git a/src/test/regress/expected/multi_distribution_metadata.out b/src/test/regress/expected/multi_distribution_metadata.out index 5831d849e..22740e09f 100644 --- a/src/test/regress/expected/multi_distribution_metadata.out +++ b/src/test/regress/expected/multi_distribution_metadata.out @@ -156,7 +156,7 @@ SELECT partition_type('events_hash'); (1 row) SELECT partition_type('pg_type'); -ERROR: relation 1247 is not distributed +ERROR: relation pg_type is not distributed -- should see true for events_hash, false for others SELECT is_distributed_table('events_hash'); is_distributed_table diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index ae79c879c..8ffcce4c3 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -62,6 +62,7 @@ ALTER EXTENSION citus UPDATE TO '6.1-1'; ALTER EXTENSION citus UPDATE TO '6.1-2'; ALTER EXTENSION citus UPDATE TO '6.1-3'; ALTER EXTENSION citus UPDATE TO '6.1-4'; +ALTER EXTENSION citus UPDATE TO '6.1-5'; -- ensure no objects were created outside pg_catalog SELECT COUNT(*) FROM pg_depend AS pgd, diff --git a/src/test/regress/sql/multi_colocation_utils.sql b/src/test/regress/sql/multi_colocation_utils.sql index 184d21173..c1dfaeedb 100644 --- a/src/test/regress/sql/multi_colocation_utils.sql +++ b/src/test/regress/sql/multi_colocation_utils.sql @@ -241,6 +241,53 @@ CREATE SCHEMA schema_collocation; CREATE TABLE schema_collocation.table4_groupE ( id int ); SELECT create_distributed_table('schema_collocation.table4_groupE', 'id'); +-- test colocate_with option +CREATE TABLE table1_group_none_1 ( id int ); +SELECT create_distributed_table('table1_group_none_1', 'id', colocate_with => 'none'); + +CREATE TABLE table2_group_none_1 ( id int ); +SELECT create_distributed_table('table2_group_none_1', 'id', colocate_with => 'table1_group_none_1'); + +CREATE TABLE table1_group_none_2 ( id int ); +SELECT create_distributed_table('table1_group_none_2', 'id', colocate_with => 'none'); + +CREATE TABLE table4_groupE ( id int ); +SELECT create_distributed_table('table4_groupE', 'id', colocate_with => 'default'); + +SET citus.shard_count = 3; + +-- check that this new configuration does not have a default group +CREATE TABLE table1_group_none_3 ( id int ); +SELECT create_distributed_table('table1_group_none_3', 'id', colocate_with => 'NONE'); + +-- a new table does not use a non-default group +CREATE TABLE table1_group_default ( id int ); +SELECT create_distributed_table('table1_group_default', 'id', colocate_with => 'DEFAULT'); + +-- check metadata +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid; + +SELECT logicalrelid, colocationid FROM pg_dist_partition + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid, logicalrelid; + +-- check failing colocate_with options +CREATE TABLE table_postgresql( id int ); +CREATE TABLE table_failing ( id int ); + +SELECT create_distributed_table('table_failing', 'id', colocate_with => 'table_append'); +SELECT create_distributed_table('table_failing', 'id', 'append', 'table1_groupE'); +SELECT create_distributed_table('table_failing', 'id', colocate_with => 'table_postgresql'); +SELECT create_distributed_table('table_failing', 'id', colocate_with => 'no_table'); +SELECT create_distributed_table('table_failing', 'id', colocate_with => ''); +SELECT create_distributed_table('table_failing', 'id', colocate_with => NULL); + +-- check with different distribution column types +CREATE TABLE table_bigint ( id bigint ); +SELECT create_distributed_table('table_bigint', 'id', colocate_with => 'table1_groupE'); + -- check worker table schemas \c - - - :worker_1_port \d table3_groupE_1300050 @@ -255,8 +302,8 @@ CREATE TABLE table2_groupF ( id int ); SELECT create_reference_table('table2_groupF'); -- check metadata -SELECT * FROM pg_dist_colocation - WHERE colocationid >= 1 AND colocationid < 1000 +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 ORDER BY colocationid; -- cross check with internal colocation API @@ -304,13 +351,13 @@ UPDATE pg_dist_partition SET colocationid = 0 WHERE colocationid >= 1 AND colocationid < 1000; -- check metadata -SELECT * FROM pg_dist_colocation - WHERE colocationid >= 1 AND colocationid < 1000 +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 ORDER BY colocationid; SELECT logicalrelid, colocationid FROM pg_dist_partition - WHERE colocationid >= 1 AND colocationid < 1000 - ORDER BY logicalrelid; + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid, logicalrelid; -- first check failing cases SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupC']); @@ -320,13 +367,13 @@ SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupF']); SELECT mark_tables_colocated('table1_groupB', ARRAY['table2_groupB', 'table1_groupD']); -- check metadata to see failing calls didn't have any side effects -SELECT * FROM pg_dist_colocation - WHERE colocationid >= 1 AND colocationid < 1000 +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 ORDER BY colocationid; SELECT logicalrelid, colocationid FROM pg_dist_partition - WHERE colocationid >= 1 AND colocationid < 1000 - ORDER BY logicalrelid; + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid, logicalrelid; -- check successfully cololated tables SELECT mark_tables_colocated('table1_groupB', ARRAY['table2_groupB']); @@ -338,11 +385,71 @@ SELECT mark_tables_colocated('table1_groupF', ARRAY['table2_groupF']); -- check to colocate with itself SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupB']); +SET citus.shard_count = 2; + +CREATE TABLE table1_group_none ( id int ); +SELECT create_distributed_table('table1_group_none', 'id', colocate_with => 'NONE'); + +CREATE TABLE table2_group_none ( id int ); +SELECT create_distributed_table('table2_group_none', 'id', colocate_with => 'NONE'); + -- check metadata to see colocation groups are created successfully SELECT * FROM pg_dist_colocation WHERE colocationid >= 1 AND colocationid < 1000 ORDER BY colocationid; SELECT logicalrelid, colocationid FROM pg_dist_partition - WHERE colocationid >= 1 AND colocationid < 1000 - ORDER BY logicalrelid; + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid, logicalrelid; + +-- move the all tables in colocation group 5 to colocation group 7 +SELECT mark_tables_colocated('table1_group_none', ARRAY['table1_groupE', 'table2_groupE', 'table3_groupE']); + +-- move a table with a colocation id which is already not in pg_dist_colocation +SELECT mark_tables_colocated('table1_group_none', ARRAY['table2_group_none']); + +-- check metadata to see that unused colocation group is deleted +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid; + +SELECT logicalrelid, colocationid FROM pg_dist_partition + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid, logicalrelid; + +-- try to colocate different replication models +CREATE TABLE table1_groupG ( id int ); +SELECT create_distributed_table('table1_groupG', 'id'); + +-- update replication model +UPDATE pg_dist_partition SET repmodel = 's' WHERE logicalrelid = 'table1_groupG'::regclass; + +CREATE TABLE table2_groupG ( id int ); +SELECT create_distributed_table('table2_groupG', 'id', colocate_with => 'table1_groupG'); + +CREATE TABLE table2_groupG ( id int ); +SELECT create_distributed_table('table2_groupG', 'id', colocate_with => 'NONE'); + +SELECT mark_tables_colocated('table1_groupG', ARRAY['table2_groupG']); + +-- drop tables to clean test space +DROP TABLE table1_groupb; +DROP TABLE table2_groupb; +DROP TABLE table1_groupc; +DROP TABLE table2_groupc; +DROP TABLE table1_groupd; +DROP TABLE table2_groupd; +DROP TABLE table1_groupf; +DROP TABLE table2_groupf; +DROP TABLE table1_groupe; +DROP TABLE table2_groupe; +DROP TABLE table3_groupe; +DROP TABLE table4_groupe; +DROP TABLE schema_collocation.table4_groupe; +DROP TABLE table1_group_none_1; +DROP TABLE table2_group_none_1; +DROP TABLE table1_group_none_2; +DROP TABLE table1_group_none_3; +DROP TABLE table1_group_none; +DROP TABLE table2_group_none; +DROP TABLE table1_group_default; diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index 209955668..314d17ade 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -62,6 +62,7 @@ ALTER EXTENSION citus UPDATE TO '6.1-1'; ALTER EXTENSION citus UPDATE TO '6.1-2'; ALTER EXTENSION citus UPDATE TO '6.1-3'; ALTER EXTENSION citus UPDATE TO '6.1-4'; +ALTER EXTENSION citus UPDATE TO '6.1-5'; -- ensure no objects were created outside pg_catalog SELECT COUNT(*)