Merge pull request #1046 from citusdata/feature/colocate_with

Add colocate_with option to create_distributed_table()
pull/1027/head
Metin Döşlü 2016-12-16 14:33:22 +02:00 committed by GitHub
commit 6c333d464f
12 changed files with 830 additions and 256 deletions

View File

@ -9,7 +9,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \
5.2-1 5.2-2 5.2-3 5.2-4 \
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \
6.1-1 6.1-2 6.1-3 6.1-4
6.1-1 6.1-2 6.1-3 6.1-4 6.1-5
# All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@ -103,6 +103,8 @@ $(EXTENSION)--6.1-3.sql: $(EXTENSION)--6.1-2.sql $(EXTENSION)--6.1-2--6.1-3.sql
cat $^ > $@
$(EXTENSION)--6.1-4.sql: $(EXTENSION)--6.1-3.sql $(EXTENSION)--6.1-3--6.1-4.sql
cat $^ > $@
$(EXTENSION)--6.1-5.sql: $(EXTENSION)--6.1-4.sql $(EXTENSION)--6.1-4--6.1-5.sql
cat $^ > $@
NO_PGXS = 1

View File

@ -0,0 +1,20 @@
/* citus--6.1-4--6.1-5.sql */
SET search_path = 'pg_catalog';
DROP FUNCTION create_distributed_table(regclass, text, citus.distribution_type);
CREATE FUNCTION create_distributed_table(table_name regclass,
distribution_column text,
distribution_type citus.distribution_type DEFAULT 'hash',
colocate_with text DEFAULT 'default')
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$create_distributed_table$$;
COMMENT ON FUNCTION create_distributed_table(table_name regclass,
distribution_column text,
distribution_type citus.distribution_type,
colocate_with text)
IS 'creates a distributed table';
RESET search_path;

View File

@ -1,6 +1,6 @@
# Citus extension
comment = 'Citus distributed database'
default_version = '6.1-4'
default_version = '6.1-5'
module_pathname = '$libdir/citus'
relocatable = false
schema = pg_catalog

View File

@ -9,7 +9,6 @@
*/
#include "postgres.h"
#include "miscadmin.h"
#include "access/genam.h"
#include "access/hash.h"
@ -33,7 +32,6 @@
#include "catalog/pg_type.h"
#include "commands/defrem.h"
#include "commands/extension.h"
#include "commands/sequence.h"
#include "commands/trigger.h"
#include "distributed/colocation_utils.h"
#include "distributed/distribution_column.h"
@ -76,11 +74,10 @@ static void ErrorIfNotSupportedForeignConstraint(Relation relation,
uint32 colocationId);
static void InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
Var *distributionColumn, uint32 colocationId);
static uint32 ColocationId(int shardCount, int replicationFactor,
Oid distributionColumnType);
static uint32 GetNextColocationId(void);
static void CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
char *colocateWithTableName,
int shardCount, int replicationFactor);
static Oid ColumnType(Oid relationId, char *columnName);
/* exports for SQL callable functions */
@ -114,8 +111,9 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
/*
* create_distributed_table accepts a table, distribution column and
* distribution method, then it creates a distributed table.
* create_distributed_table gets a table name, distribution column,
* distribution method and colocate_with option, then it creates a
* distributed table.
*/
Datum
create_distributed_table(PG_FUNCTION_ARGS)
@ -126,6 +124,36 @@ create_distributed_table(PG_FUNCTION_ARGS)
char *distributionColumnName = text_to_cstring(distributionColumnText);
char distributionMethod = LookupDistributionMethod(distributionMethodOid);
text *colocateWithTableNameText = NULL;
char *colocateWithTableName = NULL;
/* guard against a binary update without a function update */
if (PG_NARGS() >= 4)
{
colocateWithTableNameText = PG_GETARG_TEXT_P(3);
colocateWithTableName = text_to_cstring(colocateWithTableNameText);
}
else
{
colocateWithTableName = "default";
}
/* check if we try to colocate with hash distributed tables */
if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0 &&
pg_strncasecmp(colocateWithTableName, "none", NAMEDATALEN) != 0)
{
Oid colocateWithTableOid = ResolveRelationId(colocateWithTableNameText);
char colocateWithTableDistributionMethod = PartitionMethod(colocateWithTableOid);
if (colocateWithTableDistributionMethod != DISTRIBUTE_BY_HASH ||
distributionMethod != DISTRIBUTE_BY_HASH)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot distribute relation"),
errdetail("Currently, colocate_with option is only supported "
"for hash distributed tables.")));
}
}
/* if distribution method is not hash, just create partition metadata */
if (distributionMethod != DISTRIBUTE_BY_HASH)
@ -136,7 +164,8 @@ create_distributed_table(PG_FUNCTION_ARGS)
}
/* use configuration values for shard count and shard replication factor */
CreateHashDistributedTable(relationId, distributionColumnName, ShardCount,
CreateHashDistributedTable(relationId, distributionColumnName,
colocateWithTableName, ShardCount,
ShardReplicationFactor);
PG_RETURN_VOID();
@ -154,6 +183,7 @@ create_reference_table(PG_FUNCTION_ARGS)
Oid relationId = PG_GETARG_OID(0);
int shardCount = 1;
AttrNumber firstColumnAttrNumber = 1;
char *colocateWithTableName = "default";
char *firstColumnName = get_attname(relationId, firstColumnAttrNumber);
if (firstColumnName == NULL)
@ -164,8 +194,8 @@ create_reference_table(PG_FUNCTION_ARGS)
"least one column", relationName)));
}
CreateHashDistributedTable(relationId, firstColumnName, shardCount,
ShardReplicationFactor);
CreateHashDistributedTable(relationId, firstColumnName, colocateWithTableName,
shardCount, ShardReplicationFactor);
PG_RETURN_VOID();
}
@ -849,147 +879,21 @@ CreateTruncateTrigger(Oid relationId)
/*
* ColocationId searches pg_dist_colocation for shard count, replication factor
* and distribution column type. If a matching entry is found, it returns the
* colocation id, otherwise it returns INVALID_COLOCATION_ID.
*/
static uint32
ColocationId(int shardCount, int replicationFactor, Oid distributionColumnType)
{
uint32 colocationId = INVALID_COLOCATION_ID;
HeapTuple colocationTuple = NULL;
SysScanDesc scanDescriptor;
const int scanKeyCount = 3;
ScanKeyData scanKey[scanKeyCount];
bool indexOK = true;
Relation pgDistColocation = heap_open(DistColocationRelationId(), AccessShareLock);
/* set scan arguments */
ScanKeyInit(&scanKey[0], Anum_pg_dist_colocation_shardcount,
BTEqualStrategyNumber, F_INT4EQ, UInt32GetDatum(shardCount));
ScanKeyInit(&scanKey[1], Anum_pg_dist_colocation_replicationfactor,
BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(replicationFactor));
ScanKeyInit(&scanKey[2], Anum_pg_dist_colocation_distributioncolumntype,
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(distributionColumnType));
scanDescriptor = systable_beginscan(pgDistColocation,
DistColocationConfigurationIndexId(),
indexOK, NULL, scanKeyCount, scanKey);
colocationTuple = systable_getnext(scanDescriptor);
if (HeapTupleIsValid(colocationTuple))
{
Form_pg_dist_colocation colocationForm =
(Form_pg_dist_colocation) GETSTRUCT(colocationTuple);
colocationId = colocationForm->colocationid;
}
systable_endscan(scanDescriptor);
heap_close(pgDistColocation, AccessShareLock);
return colocationId;
}
/*
* CreateColocationGroup creates a new colocation id and writes it into
* pg_dist_colocation with the given configuration. It also returns the created
* colocation id.
*/
uint32
CreateColocationGroup(int shardCount, int replicationFactor, Oid distributionColumnType)
{
uint32 colocationId = GetNextColocationId();
Relation pgDistColocation = NULL;
TupleDesc tupleDescriptor = NULL;
HeapTuple heapTuple = NULL;
Datum values[Natts_pg_dist_colocation];
bool isNulls[Natts_pg_dist_colocation];
/* form new colocation tuple */
memset(values, 0, sizeof(values));
memset(isNulls, false, sizeof(isNulls));
values[Anum_pg_dist_colocation_colocationid - 1] = UInt32GetDatum(colocationId);
values[Anum_pg_dist_colocation_shardcount - 1] = UInt32GetDatum(shardCount);
values[Anum_pg_dist_colocation_replicationfactor - 1] =
UInt32GetDatum(replicationFactor);
values[Anum_pg_dist_colocation_distributioncolumntype - 1] =
ObjectIdGetDatum(distributionColumnType);
/* open colocation relation and insert the new tuple */
pgDistColocation = heap_open(DistColocationRelationId(), RowExclusiveLock);
tupleDescriptor = RelationGetDescr(pgDistColocation);
heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);
simple_heap_insert(pgDistColocation, heapTuple);
CatalogUpdateIndexes(pgDistColocation, heapTuple);
/* increment the counter so that next command can see the row */
CommandCounterIncrement();
heap_close(pgDistColocation, RowExclusiveLock);
return colocationId;
}
/*
* GetNextColocationId allocates and returns a unique colocationId for the
* colocation group to be created. This allocation occurs both in shared memory
* and in write ahead logs; writing to logs avoids the risk of having
* colocationId collisions.
*
* Please note that the caller is still responsible for finalizing colocationId
* with the master node. Further note that this function relies on an internal
* sequence created in initdb to generate unique identifiers.
*/
static uint32
GetNextColocationId()
{
text *sequenceName = cstring_to_text(COLOCATIONID_SEQUENCE_NAME);
Oid sequenceId = ResolveRelationId(sequenceName);
Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId);
Oid savedUserId = InvalidOid;
int savedSecurityContext = 0;
Datum colocationIdDatum = 0;
uint32 colocationId = INVALID_COLOCATION_ID;
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
/* generate new and unique colocation id from sequence */
colocationIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum);
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
colocationId = DatumGetUInt32(colocationIdDatum);
return colocationId;
}
/*
* CreateHashDistributedTable creates a hash distributed table with given
* shard count and shard replication factor.
* CreateHashDistributedTable creates a hash distributed table.
*/
static void
CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
char *colocateWithTableName,
int shardCount, int replicationFactor)
{
Relation distributedRelation = NULL;
Relation pgDistColocation = NULL;
Var *distributionColumn = NULL;
Oid distributionColumnType = 0;
uint32 colocationId = INVALID_COLOCATION_ID;
Oid sourceRelationId = InvalidOid;
Oid distributionColumnType = InvalidOid;
/* get distribution column type */
/* get an access lock on the relation to prevent DROP TABLE and ALTER TABLE */
distributedRelation = relation_open(relationId, AccessShareLock);
distributionColumn = BuildDistributionKeyFromColumnName(distributedRelation,
distributionColumnName);
distributionColumnType = distributionColumn->vartype;
/*
* Get an exclusive lock on the colocation system catalog. Therefore, we
@ -998,38 +902,68 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
*/
pgDistColocation = heap_open(DistColocationRelationId(), ExclusiveLock);
/* check for existing colocations */
colocationId = ColocationId(shardCount, replicationFactor, distributionColumnType);
/* get distribution column data type */
distributionColumnType = ColumnType(relationId, distributionColumnName);
/*
* If there is a colocation group for the current configuration, get a
* colocated table from the group and use its shards as a reference to
* create new shards. Otherwise, create a new colocation group and create
* shards with the default round robin algorithm.
*/
if (colocationId != INVALID_COLOCATION_ID)
if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) == 0)
{
char *relationName = get_rel_name(relationId);
Oid colocatedTableId = ColocatedTableId(colocationId);
ConvertToDistributedTable(relationId, distributionColumnName,
DISTRIBUTE_BY_HASH, colocationId);
CreateColocatedShards(relationId, colocatedTableId);
ereport(DEBUG2, (errmsg("table %s is added to colocation group: %d",
relationName, colocationId)));
}
else
/* check for default colocation group */
colocationId = ColocationId(shardCount, replicationFactor,
distributionColumnType);
if (colocationId == INVALID_COLOCATION_ID)
{
colocationId = CreateColocationGroup(shardCount, replicationFactor,
distributionColumnType);
ConvertToDistributedTable(relationId, distributionColumnName,
DISTRIBUTE_BY_HASH, colocationId);
}
else
{
sourceRelationId = ColocatedTableId(colocationId);
}
}
else if (pg_strncasecmp(colocateWithTableName, "none", NAMEDATALEN) == 0)
{
colocationId = GetNextColocationId();
}
else
{
/* get colocation group of the target table */
text *colocateWithTableNameText = cstring_to_text(colocateWithTableName);
sourceRelationId = ResolveRelationId(colocateWithTableNameText);
/* use the default way to create shards */
colocationId = TableColocationId(sourceRelationId);
}
/* create distributed table metadata */
ConvertToDistributedTable(relationId, distributionColumnName, DISTRIBUTE_BY_HASH,
colocationId);
/* create shards */
if (sourceRelationId != InvalidOid)
{
/* first run checks */
CheckReplicationModel(sourceRelationId, relationId);
CheckDistributionColumnType(sourceRelationId, relationId);
CreateColocatedShards(relationId, sourceRelationId);
}
else
{
CreateShardsWithRoundRobinPolicy(relationId, shardCount, replicationFactor);
}
heap_close(pgDistColocation, NoLock);
relation_close(distributedRelation, NoLock);
}
/*
* ColumnType returns the column type of the given column.
*/
static Oid
ColumnType(Oid relationId, char *columnName)
{
AttrNumber columnIndex = get_attnum(relationId, columnName);
Oid columnType = get_atttype(relationId, columnIndex);
return columnType;
}

View File

@ -10,20 +10,24 @@
*/
#include "postgres.h"
#include "miscadmin.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/xact.h"
#include "catalog/indexing.h"
#include "commands/sequence.h"
#include "distributed/colocation_utils.h"
#include "distributed/listutils.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/pg_dist_colocation.h"
#include "distributed/resource_lock.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/worker_protocol.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
@ -37,6 +41,8 @@ static bool ShardsIntervalsEqual(ShardInterval *leftShardInterval,
static int CompareShardPlacementsByNode(const void *leftElement,
const void *rightElement);
static void UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId);
static List * ColocationGroupTableList(Oid colocationId);
static void DeleteColocationGroup(uint32 colocationId);
/* exports for SQL callable functions */
@ -69,6 +75,7 @@ mark_tables_colocated(PG_FUNCTION_ARGS)
for (relationIndex = 0; relationIndex < relationCount; relationIndex++)
{
Oid nextRelationOid = DatumGetObjectId(relationIdDatumArray[relationIndex]);
MarkTablesColocated(sourceRelationId, nextRelationOid);
}
@ -86,32 +93,14 @@ static void
MarkTablesColocated(Oid sourceRelationId, Oid targetRelationId)
{
uint32 sourceColocationId = INVALID_COLOCATION_ID;
uint32 targetColocationId = INVALID_COLOCATION_ID;
Relation pgDistColocation = NULL;
Var *sourceDistributionColumn = NULL;
Var *targetDistributionColumn = NULL;
Oid sourceDistributionColumnType = InvalidOid;
Oid targetDistributionColumnType = InvalidOid;
CheckHashPartitionedTable(sourceRelationId);
CheckHashPartitionedTable(targetRelationId);
sourceDistributionColumn = PartitionKey(sourceRelationId);
sourceDistributionColumnType = sourceDistributionColumn->vartype;
targetDistributionColumn = PartitionKey(targetRelationId);
targetDistributionColumnType = targetDistributionColumn->vartype;
if (sourceDistributionColumnType != targetDistributionColumnType)
{
char *sourceRelationName = get_rel_name(sourceRelationId);
char *targetRelationName = get_rel_name(targetRelationId);
ereport(ERROR, (errmsg("cannot colocate tables %s and %s",
sourceRelationName, targetRelationName),
errdetail("Distribution column types don't match for "
"%s and %s.", sourceRelationName,
targetRelationName)));
}
CheckReplicationModel(sourceRelationId, targetRelationId);
CheckDistributionColumnType(sourceRelationId, targetRelationId);
/*
* Get an exclusive lock on the colocation system catalog. Therefore, we
@ -133,14 +122,31 @@ MarkTablesColocated(Oid sourceRelationId, Oid targetRelationId)
uint32 shardCount = ShardIntervalCount(sourceRelationId);
uint32 shardReplicationFactor = TableShardReplicationFactor(sourceRelationId);
Var *sourceDistributionColumn = PartitionKey(sourceRelationId);
Oid sourceDistributionColumnType = sourceDistributionColumn->vartype;
sourceColocationId = CreateColocationGroup(shardCount, shardReplicationFactor,
sourceDistributionColumnType);
UpdateRelationColocationGroup(sourceRelationId, sourceColocationId);
}
targetColocationId = TableColocationId(targetRelationId);
/* finally set colocation group for the target relation */
UpdateRelationColocationGroup(targetRelationId, sourceColocationId);
/* if there is not any remaining table in the colocation group, delete it */
if (targetColocationId != INVALID_COLOCATION_ID)
{
List *colocatedTableList = ColocationGroupTableList(targetColocationId);
int colocatedTableCount = list_length(colocatedTableList);
if (colocatedTableCount == 0)
{
DeleteColocationGroup(targetColocationId);
}
}
heap_close(pgDistColocation, NoLock);
}
@ -339,6 +345,192 @@ CompareShardPlacementsByNode(const void *leftElement, const void *rightElement)
}
/*
* ColocationId searches pg_dist_colocation for shard count, replication factor
* and distribution column type. If a matching entry is found, it returns the
* colocation id, otherwise it returns INVALID_COLOCATION_ID.
*/
uint32
ColocationId(int shardCount, int replicationFactor, Oid distributionColumnType)
{
uint32 colocationId = INVALID_COLOCATION_ID;
HeapTuple colocationTuple = NULL;
SysScanDesc scanDescriptor;
const int scanKeyCount = 3;
ScanKeyData scanKey[scanKeyCount];
bool indexOK = true;
Relation pgDistColocation = heap_open(DistColocationRelationId(), AccessShareLock);
/* set scan arguments */
ScanKeyInit(&scanKey[0], Anum_pg_dist_colocation_shardcount,
BTEqualStrategyNumber, F_INT4EQ, UInt32GetDatum(shardCount));
ScanKeyInit(&scanKey[1], Anum_pg_dist_colocation_replicationfactor,
BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(replicationFactor));
ScanKeyInit(&scanKey[2], Anum_pg_dist_colocation_distributioncolumntype,
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(distributionColumnType));
scanDescriptor = systable_beginscan(pgDistColocation,
DistColocationConfigurationIndexId(),
indexOK, NULL, scanKeyCount, scanKey);
colocationTuple = systable_getnext(scanDescriptor);
if (HeapTupleIsValid(colocationTuple))
{
Form_pg_dist_colocation colocationForm =
(Form_pg_dist_colocation) GETSTRUCT(colocationTuple);
colocationId = colocationForm->colocationid;
}
systable_endscan(scanDescriptor);
heap_close(pgDistColocation, AccessShareLock);
return colocationId;
}
/*
* CreateColocationGroup creates a new colocation id and writes it into
* pg_dist_colocation with the given configuration. It also returns the created
* colocation id.
*/
uint32
CreateColocationGroup(int shardCount, int replicationFactor, Oid distributionColumnType)
{
uint32 colocationId = GetNextColocationId();
Relation pgDistColocation = NULL;
TupleDesc tupleDescriptor = NULL;
HeapTuple heapTuple = NULL;
Datum values[Natts_pg_dist_colocation];
bool isNulls[Natts_pg_dist_colocation];
/* form new colocation tuple */
memset(values, 0, sizeof(values));
memset(isNulls, false, sizeof(isNulls));
values[Anum_pg_dist_colocation_colocationid - 1] = UInt32GetDatum(colocationId);
values[Anum_pg_dist_colocation_shardcount - 1] = UInt32GetDatum(shardCount);
values[Anum_pg_dist_colocation_replicationfactor - 1] =
UInt32GetDatum(replicationFactor);
values[Anum_pg_dist_colocation_distributioncolumntype - 1] =
ObjectIdGetDatum(distributionColumnType);
/* open colocation relation and insert the new tuple */
pgDistColocation = heap_open(DistColocationRelationId(), RowExclusiveLock);
tupleDescriptor = RelationGetDescr(pgDistColocation);
heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);
simple_heap_insert(pgDistColocation, heapTuple);
CatalogUpdateIndexes(pgDistColocation, heapTuple);
/* increment the counter so that next command can see the row */
CommandCounterIncrement();
heap_close(pgDistColocation, RowExclusiveLock);
return colocationId;
}
/*
* GetNextColocationId allocates and returns a unique colocationId for the
* colocation group to be created. This allocation occurs both in shared memory
* and in write ahead logs; writing to logs avoids the risk of having
* colocationId collisions.
*
* Please note that the caller is still responsible for finalizing colocationId
* with the master node. Further note that this function relies on an internal
* sequence created in initdb to generate unique identifiers.
*/
uint32
GetNextColocationId()
{
text *sequenceName = cstring_to_text(COLOCATIONID_SEQUENCE_NAME);
Oid sequenceId = ResolveRelationId(sequenceName);
Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId);
Oid savedUserId = InvalidOid;
int savedSecurityContext = 0;
Datum colocationIdDatum = 0;
uint32 colocationId = INVALID_COLOCATION_ID;
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
/* generate new and unique colocation id from sequence */
colocationIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum);
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
colocationId = DatumGetUInt32(colocationIdDatum);
return colocationId;
}
/*
* CheckReplicationModel checks if given relations are from the same
* replication model. Otherwise, it errors out.
*/
void
CheckReplicationModel(Oid sourceRelationId, Oid targetRelationId)
{
DistTableCacheEntry *sourceTableEntry = NULL;
DistTableCacheEntry *targetTableEntry = NULL;
char sourceReplicationModel = 0;
char targetReplicationModel = 0;
sourceTableEntry = DistributedTableCacheEntry(sourceRelationId);
sourceReplicationModel = sourceTableEntry->replicationModel;
targetTableEntry = DistributedTableCacheEntry(targetRelationId);
targetReplicationModel = targetTableEntry->replicationModel;
if (sourceReplicationModel != targetReplicationModel)
{
char *sourceRelationName = get_rel_name(sourceRelationId);
char *targetRelationName = get_rel_name(targetRelationId);
ereport(ERROR, (errmsg("cannot colocate tables %s and %s",
sourceRelationName, targetRelationName),
errdetail("Replication models don't match for %s and %s.",
sourceRelationName, targetRelationName)));
}
}
/*
* CheckDistributionColumnType checks if distribution column types of relations
* are same. Otherwise, it errors out.
*/
void
CheckDistributionColumnType(Oid sourceRelationId, Oid targetRelationId)
{
Var *sourceDistributionColumn = NULL;
Var *targetDistributionColumn = NULL;
Oid sourceDistributionColumnType = InvalidOid;
Oid targetDistributionColumnType = InvalidOid;
sourceDistributionColumn = PartitionKey(sourceRelationId);
sourceDistributionColumnType = sourceDistributionColumn->vartype;
targetDistributionColumn = PartitionKey(targetRelationId);
targetDistributionColumnType = targetDistributionColumn->vartype;
if (sourceDistributionColumnType != targetDistributionColumnType)
{
char *sourceRelationName = get_rel_name(sourceRelationId);
char *targetRelationName = get_rel_name(targetRelationId);
ereport(ERROR, (errmsg("cannot colocate tables %s and %s",
sourceRelationName, targetRelationName),
errdetail("Distribution column types don't match for "
"%s and %s.", sourceRelationName,
targetRelationName)));
}
}
/*
* UpdateRelationColocationGroup updates colocation group in pg_dist_partition
* for the given relation.
@ -475,6 +667,30 @@ ColocatedTableList(Oid distributedTableId)
uint32 tableColocationId = TableColocationId(distributedTableId);
List *colocatedTableList = NIL;
/*
* If distribution type of the table is not hash, the table is only co-located
* with itself.
*/
if (tableColocationId == INVALID_COLOCATION_ID)
{
colocatedTableList = lappend_oid(colocatedTableList, distributedTableId);
return colocatedTableList;
}
colocatedTableList = ColocationGroupTableList(tableColocationId);
return colocatedTableList;
}
/*
* ColocationGroupTableList returns the list of tables in the given colocation
* group. If the colocation group is INVALID_COLOCATION_ID, it returns NIL.
*/
static List *
ColocationGroupTableList(Oid colocationId)
{
List *colocatedTableList = NIL;
Relation pgDistPartition = NULL;
TupleDesc tupleDescriptor = NULL;
SysScanDesc scanDescriptor = NULL;
@ -487,14 +703,13 @@ ColocatedTableList(Oid distributedTableId)
* If distribution type of the table is not hash, the table is only co-located
* with itself.
*/
if (tableColocationId == INVALID_COLOCATION_ID)
if (colocationId == INVALID_COLOCATION_ID)
{
colocatedTableList = lappend_oid(colocatedTableList, distributedTableId);
return colocatedTableList;
return NIL;
}
ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_colocationid,
BTEqualStrategyNumber, F_INT4EQ, ObjectIdGetDatum(tableColocationId));
BTEqualStrategyNumber, F_INT4EQ, ObjectIdGetDatum(colocationId));
pgDistPartition = heap_open(DistPartitionRelationId(), AccessShareLock);
tupleDescriptor = RelationGetDescr(pgDistPartition);
@ -633,3 +848,40 @@ ColocatedShardIdInRelation(Oid relationId, int shardIndex)
return tableCacheEntry->sortedShardIntervalArray[shardIndex]->shardId;
}
/*
* DeleteColocationGroup deletes the colocation group from pg_dist_colocation.
*/
static void
DeleteColocationGroup(uint32 colocationId)
{
Relation pgDistColocation = NULL;
SysScanDesc scanDescriptor = NULL;
int scanKeyCount = 1;
ScanKeyData scanKey[scanKeyCount];
bool indexOK = false;
HeapTuple heapTuple = NULL;
pgDistColocation = heap_open(DistColocationRelationId(), RowExclusiveLock);
ScanKeyInit(&scanKey[0], Anum_pg_dist_colocation_colocationid,
BTEqualStrategyNumber, F_INT4EQ, UInt32GetDatum(colocationId));
scanDescriptor = systable_beginscan(pgDistColocation, InvalidOid, indexOK,
NULL, scanKeyCount, scanKey);
/* if a record is found, delete it */
heapTuple = systable_getnext(scanDescriptor);
if (HeapTupleIsValid(heapTuple))
{
simple_heap_delete(pgDistColocation, &(heapTuple->t_self));
CatalogUpdateIndexes(pgDistColocation, heapTuple);
CitusInvalidateRelcacheByRelid(DistColocationRelationId());
CommandCounterIncrement();
}
systable_endscan(scanDescriptor);
heap_close(pgDistColocation, RowExclusiveLock);
}

View File

@ -260,8 +260,9 @@ DistributedTableCacheEntry(Oid distributedRelationId)
}
else
{
ereport(ERROR, (errmsg("relation %u is not distributed",
distributedRelationId)));
char *relationName = get_rel_name(distributedRelationId);
ereport(ERROR, (errmsg("relation %s is not distributed",
relationName)));
}
}

View File

@ -25,8 +25,12 @@ extern List * ColocatedTableList(Oid distributedTableId);
extern List * ColocatedShardIntervalList(ShardInterval *shardInterval);
extern Oid ColocatedTableId(Oid colocationId);
extern uint64 ColocatedShardIdInRelation(Oid relationId, int shardIndex);
uint32 ColocationId(int shardCount, int replicationFactor, Oid distributionColumnType);
extern uint32 CreateColocationGroup(int shardCount, int replicationFactor,
Oid distributionColumnType);
extern uint32 GetNextColocationId(void);
extern void CheckReplicationModel(Oid sourceRelationId, Oid targetRelationId);
extern void CheckDistributionColumnType(Oid sourceRelationId, Oid targetRelationId);
#endif /* COLOCATION_UTILS_H_ */

View File

@ -504,6 +504,115 @@ SELECT create_distributed_table('schema_collocation.table4_groupE', 'id');
(1 row)
-- test colocate_with option
CREATE TABLE table1_group_none_1 ( id int );
SELECT create_distributed_table('table1_group_none_1', 'id', colocate_with => 'none');
create_distributed_table
--------------------------
(1 row)
CREATE TABLE table2_group_none_1 ( id int );
SELECT create_distributed_table('table2_group_none_1', 'id', colocate_with => 'table1_group_none_1');
create_distributed_table
--------------------------
(1 row)
CREATE TABLE table1_group_none_2 ( id int );
SELECT create_distributed_table('table1_group_none_2', 'id', colocate_with => 'none');
create_distributed_table
--------------------------
(1 row)
CREATE TABLE table4_groupE ( id int );
SELECT create_distributed_table('table4_groupE', 'id', colocate_with => 'default');
create_distributed_table
--------------------------
(1 row)
SET citus.shard_count = 3;
-- check that this new configuration does not have a default group
CREATE TABLE table1_group_none_3 ( id int );
SELECT create_distributed_table('table1_group_none_3', 'id', colocate_with => 'NONE');
create_distributed_table
--------------------------
(1 row)
-- a new table does not use a non-default group
CREATE TABLE table1_group_default ( id int );
SELECT create_distributed_table('table1_group_default', 'id', colocate_with => 'DEFAULT');
create_distributed_table
--------------------------
(1 row)
-- check metadata
SELECT * FROM pg_dist_colocation
WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY colocationid;
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
2 | 2 | 1 | 23
3 | 2 | 2 | 25
4 | 4 | 2 | 23
5 | 2 | 2 | 23
9 | 3 | 2 | 23
(5 rows)
SELECT logicalrelid, colocationid FROM pg_dist_partition
WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY colocationid, logicalrelid;
logicalrelid | colocationid
----------------------------------+--------------
table1_groupb | 2
table2_groupb | 2
table1_groupc | 3
table2_groupc | 3
table1_groupd | 4
table2_groupd | 4
table3_groupd | 4
table1_groupe | 5
table2_groupe | 5
table3_groupe | 5
schema_collocation.table4_groupe | 5
table4_groupe | 5
table1_group_none_1 | 6
table2_group_none_1 | 6
table1_group_none_2 | 7
table1_group_none_3 | 8
table1_group_default | 9
(17 rows)
-- check failing colocate_with options
CREATE TABLE table_postgresql( id int );
CREATE TABLE table_failing ( id int );
SELECT create_distributed_table('table_failing', 'id', colocate_with => 'table_append');
ERROR: cannot distribute relation
DETAIL: Currently, colocate_with option is only supported for hash distributed tables.
SELECT create_distributed_table('table_failing', 'id', 'append', 'table1_groupE');
ERROR: cannot distribute relation
DETAIL: Currently, colocate_with option is only supported for hash distributed tables.
SELECT create_distributed_table('table_failing', 'id', colocate_with => 'table_postgresql');
ERROR: relation table_postgresql is not distributed
SELECT create_distributed_table('table_failing', 'id', colocate_with => 'no_table');
ERROR: relation "no_table" does not exist
SELECT create_distributed_table('table_failing', 'id', colocate_with => '');
ERROR: invalid name syntax
SELECT create_distributed_table('table_failing', 'id', colocate_with => NULL);
create_distributed_table
--------------------------
(1 row)
-- check with different distribution column types
CREATE TABLE table_bigint ( id bigint );
SELECT create_distributed_table('table_bigint', 'id', colocate_with => 'table1_groupE');
ERROR: cannot colocate tables table1_groupe and table_bigint
DETAIL: Distribution column types don't match for table1_groupe and table_bigint.
-- check worker table schemas
\c - - - :worker_1_port
\d table3_groupE_1300050
@ -544,8 +653,9 @@ SELECT * FROM pg_dist_colocation
3 | 2 | 2 | 25
4 | 4 | 2 | 23
5 | 2 | 2 | 23
6 | 1 | 2 | 23
(5 rows)
9 | 3 | 2 | 23
10 | 1 | 2 | 23
(6 rows)
-- cross check with internal colocation API
SELECT
@ -564,7 +674,7 @@ ORDER BY
table1,
table2;
table1 | table2 | colocated
---------------+----------------------------------+-----------
----------------------------------+----------------------------------+-----------
table1_group1 | table2_group1 | t
table1_groupb | table2_groupb | t
table1_groupc | table2_groupc | t
@ -574,11 +684,16 @@ ORDER BY
table1_groupe | table2_groupe | t
table1_groupe | table3_groupe | t
table1_groupe | schema_collocation.table4_groupe | t
table1_groupe | table4_groupe | t
table2_groupe | table3_groupe | t
table2_groupe | schema_collocation.table4_groupe | t
table2_groupe | table4_groupe | t
table3_groupe | schema_collocation.table4_groupe | t
table3_groupe | table4_groupe | t
schema_collocation.table4_groupe | table4_groupe | t
table1_group_none_1 | table2_group_none_1 | t
table1_groupf | table2_groupf | t
(13 rows)
(18 rows)
-- check created shards
SELECT
@ -653,11 +768,39 @@ ORDER BY
schema_collocation.table4_groupe | 1300052 | t | 57637 | -2147483648 | -1
schema_collocation.table4_groupe | 1300053 | t | 57637 | 0 | 2147483647
schema_collocation.table4_groupe | 1300053 | t | 57638 | 0 | 2147483647
table1_groupf | 1300054 | t | 57637 | -2147483648 | 2147483647
table1_groupf | 1300054 | t | 57638 | -2147483648 | 2147483647
table2_groupf | 1300055 | t | 57638 | -2147483648 | 2147483647
table2_groupf | 1300055 | t | 57637 | -2147483648 | 2147483647
(56 rows)
table1_group_none_1 | 1300054 | t | 57637 | -2147483648 | -1
table1_group_none_1 | 1300054 | t | 57638 | -2147483648 | -1
table1_group_none_1 | 1300055 | t | 57638 | 0 | 2147483647
table1_group_none_1 | 1300055 | t | 57637 | 0 | 2147483647
table2_group_none_1 | 1300056 | t | 57638 | -2147483648 | -1
table2_group_none_1 | 1300056 | t | 57637 | -2147483648 | -1
table2_group_none_1 | 1300057 | t | 57637 | 0 | 2147483647
table2_group_none_1 | 1300057 | t | 57638 | 0 | 2147483647
table1_group_none_2 | 1300058 | t | 57637 | -2147483648 | -1
table1_group_none_2 | 1300058 | t | 57638 | -2147483648 | -1
table1_group_none_2 | 1300059 | t | 57638 | 0 | 2147483647
table1_group_none_2 | 1300059 | t | 57637 | 0 | 2147483647
table4_groupe | 1300060 | t | 57637 | -2147483648 | -1
table4_groupe | 1300060 | t | 57638 | -2147483648 | -1
table4_groupe | 1300061 | t | 57638 | 0 | 2147483647
table4_groupe | 1300061 | t | 57637 | 0 | 2147483647
table1_group_none_3 | 1300062 | t | 57637 | -2147483648 | -715827884
table1_group_none_3 | 1300062 | t | 57638 | -2147483648 | -715827884
table1_group_none_3 | 1300063 | t | 57638 | -715827883 | 715827881
table1_group_none_3 | 1300063 | t | 57637 | -715827883 | 715827881
table1_group_none_3 | 1300064 | t | 57637 | 715827882 | 2147483647
table1_group_none_3 | 1300064 | t | 57638 | 715827882 | 2147483647
table1_group_default | 1300065 | t | 57637 | -2147483648 | -715827884
table1_group_default | 1300065 | t | 57638 | -2147483648 | -715827884
table1_group_default | 1300066 | t | 57638 | -715827883 | 715827881
table1_group_default | 1300066 | t | 57637 | -715827883 | 715827881
table1_group_default | 1300067 | t | 57637 | 715827882 | 2147483647
table1_group_default | 1300067 | t | 57638 | 715827882 | 2147483647
table1_groupf | 1300068 | t | 57637 | -2147483648 | 2147483647
table1_groupf | 1300068 | t | 57638 | -2147483648 | 2147483647
table2_groupf | 1300069 | t | 57638 | -2147483648 | 2147483647
table2_groupf | 1300069 | t | 57637 | -2147483648 | 2147483647
(84 rows)
-- reset colocation ids to test mark_tables_colocated
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1;
@ -675,7 +818,7 @@ SELECT * FROM pg_dist_colocation
SELECT logicalrelid, colocationid FROM pg_dist_partition
WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY logicalrelid;
ORDER BY colocationid, logicalrelid;
logicalrelid | colocationid
--------------+--------------
(0 rows)
@ -706,7 +849,7 @@ SELECT * FROM pg_dist_colocation
SELECT logicalrelid, colocationid FROM pg_dist_partition
WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY logicalrelid;
ORDER BY colocationid, logicalrelid;
logicalrelid | colocationid
--------------+--------------
(0 rows)
@ -749,6 +892,21 @@ SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupB']);
(1 row)
SET citus.shard_count = 2;
CREATE TABLE table1_group_none ( id int );
SELECT create_distributed_table('table1_group_none', 'id', colocate_with => 'NONE');
create_distributed_table
--------------------------
(1 row)
CREATE TABLE table2_group_none ( id int );
SELECT create_distributed_table('table2_group_none', 'id', colocate_with => 'NONE');
create_distributed_table
--------------------------
(1 row)
-- check metadata to see colocation groups are created successfully
SELECT * FROM pg_dist_colocation
WHERE colocationid >= 1 AND colocationid < 1000
@ -764,9 +922,9 @@ SELECT * FROM pg_dist_colocation
SELECT logicalrelid, colocationid FROM pg_dist_partition
WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY logicalrelid;
ORDER BY colocationid, logicalrelid;
logicalrelid | colocationid
---------------+--------------
-------------------+--------------
table1_groupb | 2
table2_groupb | 2
table1_groupc | 3
@ -778,5 +936,99 @@ SELECT logicalrelid, colocationid FROM pg_dist_partition
table3_groupe | 5
table1_groupf | 6
table2_groupf | 6
(11 rows)
table1_group_none | 7
table2_group_none | 8
(13 rows)
-- move the all tables in colocation group 5 to colocation group 7
SELECT mark_tables_colocated('table1_group_none', ARRAY['table1_groupE', 'table2_groupE', 'table3_groupE']);
mark_tables_colocated
-----------------------
(1 row)
-- move a table with a colocation id which is already not in pg_dist_colocation
SELECT mark_tables_colocated('table1_group_none', ARRAY['table2_group_none']);
mark_tables_colocated
-----------------------
(1 row)
-- check metadata to see that unused colocation group is deleted
SELECT * FROM pg_dist_colocation
WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY colocationid;
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
2 | 2 | 1 | 23
3 | 2 | 2 | 25
4 | 4 | 2 | 23
6 | 1 | 2 | 23
(4 rows)
SELECT logicalrelid, colocationid FROM pg_dist_partition
WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY colocationid, logicalrelid;
logicalrelid | colocationid
-------------------+--------------
table1_groupb | 2
table2_groupb | 2
table1_groupc | 3
table2_groupc | 3
table1_groupd | 4
table2_groupd | 4
table1_groupf | 6
table2_groupf | 6
table1_groupe | 7
table2_groupe | 7
table3_groupe | 7
table1_group_none | 7
table2_group_none | 7
(13 rows)
-- try to colocate different replication models
CREATE TABLE table1_groupG ( id int );
SELECT create_distributed_table('table1_groupG', 'id');
create_distributed_table
--------------------------
(1 row)
-- update replication model
UPDATE pg_dist_partition SET repmodel = 's' WHERE logicalrelid = 'table1_groupG'::regclass;
CREATE TABLE table2_groupG ( id int );
SELECT create_distributed_table('table2_groupG', 'id', colocate_with => 'table1_groupG');
ERROR: cannot colocate tables table1_groupg and table2_groupg
DETAIL: Replication models don't match for table1_groupg and table2_groupg.
CREATE TABLE table2_groupG ( id int );
ERROR: relation "table2_groupg" already exists
SELECT create_distributed_table('table2_groupG', 'id', colocate_with => 'NONE');
create_distributed_table
--------------------------
(1 row)
SELECT mark_tables_colocated('table1_groupG', ARRAY['table2_groupG']);
ERROR: cannot colocate tables table1_groupg and table2_groupg
DETAIL: Replication models don't match for table1_groupg and table2_groupg.
-- drop tables to clean test space
DROP TABLE table1_groupb;
DROP TABLE table2_groupb;
DROP TABLE table1_groupc;
DROP TABLE table2_groupc;
DROP TABLE table1_groupd;
DROP TABLE table2_groupd;
DROP TABLE table1_groupf;
DROP TABLE table2_groupf;
DROP TABLE table1_groupe;
DROP TABLE table2_groupe;
DROP TABLE table3_groupe;
DROP TABLE table4_groupe;
DROP TABLE schema_collocation.table4_groupe;
DROP TABLE table1_group_none_1;
DROP TABLE table2_group_none_1;
DROP TABLE table1_group_none_2;
DROP TABLE table1_group_none_3;
DROP TABLE table1_group_none;
DROP TABLE table2_group_none;
DROP TABLE table1_group_default;

View File

@ -156,7 +156,7 @@ SELECT partition_type('events_hash');
(1 row)
SELECT partition_type('pg_type');
ERROR: relation 1247 is not distributed
ERROR: relation pg_type is not distributed
-- should see true for events_hash, false for others
SELECT is_distributed_table('events_hash');
is_distributed_table

View File

@ -62,6 +62,7 @@ ALTER EXTENSION citus UPDATE TO '6.1-1';
ALTER EXTENSION citus UPDATE TO '6.1-2';
ALTER EXTENSION citus UPDATE TO '6.1-3';
ALTER EXTENSION citus UPDATE TO '6.1-4';
ALTER EXTENSION citus UPDATE TO '6.1-5';
-- ensure no objects were created outside pg_catalog
SELECT COUNT(*)
FROM pg_depend AS pgd,

View File

@ -241,6 +241,53 @@ CREATE SCHEMA schema_collocation;
CREATE TABLE schema_collocation.table4_groupE ( id int );
SELECT create_distributed_table('schema_collocation.table4_groupE', 'id');
-- test colocate_with option
CREATE TABLE table1_group_none_1 ( id int );
SELECT create_distributed_table('table1_group_none_1', 'id', colocate_with => 'none');
CREATE TABLE table2_group_none_1 ( id int );
SELECT create_distributed_table('table2_group_none_1', 'id', colocate_with => 'table1_group_none_1');
CREATE TABLE table1_group_none_2 ( id int );
SELECT create_distributed_table('table1_group_none_2', 'id', colocate_with => 'none');
CREATE TABLE table4_groupE ( id int );
SELECT create_distributed_table('table4_groupE', 'id', colocate_with => 'default');
SET citus.shard_count = 3;
-- check that this new configuration does not have a default group
CREATE TABLE table1_group_none_3 ( id int );
SELECT create_distributed_table('table1_group_none_3', 'id', colocate_with => 'NONE');
-- a new table does not use a non-default group
CREATE TABLE table1_group_default ( id int );
SELECT create_distributed_table('table1_group_default', 'id', colocate_with => 'DEFAULT');
-- check metadata
SELECT * FROM pg_dist_colocation
WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY colocationid;
SELECT logicalrelid, colocationid FROM pg_dist_partition
WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY colocationid, logicalrelid;
-- check failing colocate_with options
CREATE TABLE table_postgresql( id int );
CREATE TABLE table_failing ( id int );
SELECT create_distributed_table('table_failing', 'id', colocate_with => 'table_append');
SELECT create_distributed_table('table_failing', 'id', 'append', 'table1_groupE');
SELECT create_distributed_table('table_failing', 'id', colocate_with => 'table_postgresql');
SELECT create_distributed_table('table_failing', 'id', colocate_with => 'no_table');
SELECT create_distributed_table('table_failing', 'id', colocate_with => '');
SELECT create_distributed_table('table_failing', 'id', colocate_with => NULL);
-- check with different distribution column types
CREATE TABLE table_bigint ( id bigint );
SELECT create_distributed_table('table_bigint', 'id', colocate_with => 'table1_groupE');
-- check worker table schemas
\c - - - :worker_1_port
\d table3_groupE_1300050
@ -310,7 +357,7 @@ SELECT * FROM pg_dist_colocation
SELECT logicalrelid, colocationid FROM pg_dist_partition
WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY logicalrelid;
ORDER BY colocationid, logicalrelid;
-- first check failing cases
SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupC']);
@ -326,7 +373,7 @@ SELECT * FROM pg_dist_colocation
SELECT logicalrelid, colocationid FROM pg_dist_partition
WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY logicalrelid;
ORDER BY colocationid, logicalrelid;
-- check successfully cololated tables
SELECT mark_tables_colocated('table1_groupB', ARRAY['table2_groupB']);
@ -338,6 +385,14 @@ SELECT mark_tables_colocated('table1_groupF', ARRAY['table2_groupF']);
-- check to colocate with itself
SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupB']);
SET citus.shard_count = 2;
CREATE TABLE table1_group_none ( id int );
SELECT create_distributed_table('table1_group_none', 'id', colocate_with => 'NONE');
CREATE TABLE table2_group_none ( id int );
SELECT create_distributed_table('table2_group_none', 'id', colocate_with => 'NONE');
-- check metadata to see colocation groups are created successfully
SELECT * FROM pg_dist_colocation
WHERE colocationid >= 1 AND colocationid < 1000
@ -345,4 +400,56 @@ SELECT * FROM pg_dist_colocation
SELECT logicalrelid, colocationid FROM pg_dist_partition
WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY logicalrelid;
ORDER BY colocationid, logicalrelid;
-- move the all tables in colocation group 5 to colocation group 7
SELECT mark_tables_colocated('table1_group_none', ARRAY['table1_groupE', 'table2_groupE', 'table3_groupE']);
-- move a table with a colocation id which is already not in pg_dist_colocation
SELECT mark_tables_colocated('table1_group_none', ARRAY['table2_group_none']);
-- check metadata to see that unused colocation group is deleted
SELECT * FROM pg_dist_colocation
WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY colocationid;
SELECT logicalrelid, colocationid FROM pg_dist_partition
WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY colocationid, logicalrelid;
-- try to colocate different replication models
CREATE TABLE table1_groupG ( id int );
SELECT create_distributed_table('table1_groupG', 'id');
-- update replication model
UPDATE pg_dist_partition SET repmodel = 's' WHERE logicalrelid = 'table1_groupG'::regclass;
CREATE TABLE table2_groupG ( id int );
SELECT create_distributed_table('table2_groupG', 'id', colocate_with => 'table1_groupG');
CREATE TABLE table2_groupG ( id int );
SELECT create_distributed_table('table2_groupG', 'id', colocate_with => 'NONE');
SELECT mark_tables_colocated('table1_groupG', ARRAY['table2_groupG']);
-- drop tables to clean test space
DROP TABLE table1_groupb;
DROP TABLE table2_groupb;
DROP TABLE table1_groupc;
DROP TABLE table2_groupc;
DROP TABLE table1_groupd;
DROP TABLE table2_groupd;
DROP TABLE table1_groupf;
DROP TABLE table2_groupf;
DROP TABLE table1_groupe;
DROP TABLE table2_groupe;
DROP TABLE table3_groupe;
DROP TABLE table4_groupe;
DROP TABLE schema_collocation.table4_groupe;
DROP TABLE table1_group_none_1;
DROP TABLE table2_group_none_1;
DROP TABLE table1_group_none_2;
DROP TABLE table1_group_none_3;
DROP TABLE table1_group_none;
DROP TABLE table2_group_none;
DROP TABLE table1_group_default;

View File

@ -62,6 +62,7 @@ ALTER EXTENSION citus UPDATE TO '6.1-1';
ALTER EXTENSION citus UPDATE TO '6.1-2';
ALTER EXTENSION citus UPDATE TO '6.1-3';
ALTER EXTENSION citus UPDATE TO '6.1-4';
ALTER EXTENSION citus UPDATE TO '6.1-5';
-- ensure no objects were created outside pg_catalog
SELECT COUNT(*)