Add colocate_with option to create_distributed_table()

With this commit, colocate_with accepts three kinds of values: (i) 'default',
(ii) 'none', and (iii) the name of a specific table to colocate with.
pull/1012/head
Metin Doslu 2016-12-01 18:06:43 +02:00
parent 9d4e586457
commit be5b633e30
14 changed files with 506 additions and 120 deletions

View File

@ -9,7 +9,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \ 5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \
5.2-1 5.2-2 5.2-3 5.2-4 \ 5.2-1 5.2-2 5.2-3 5.2-4 \
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \ 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \
6.1-1 6.1-2 6.1-3 6.1-1 6.1-2 6.1-3 6.1-4
# All citus--*.sql files in the source directory # All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@ -101,6 +101,8 @@ $(EXTENSION)--6.1-2.sql: $(EXTENSION)--6.1-1.sql $(EXTENSION)--6.1-1--6.1-2.sql
cat $^ > $@ cat $^ > $@
$(EXTENSION)--6.1-3.sql: $(EXTENSION)--6.1-2.sql $(EXTENSION)--6.1-2--6.1-3.sql $(EXTENSION)--6.1-3.sql: $(EXTENSION)--6.1-2.sql $(EXTENSION)--6.1-2--6.1-3.sql
cat $^ > $@ cat $^ > $@
$(EXTENSION)--6.1-4.sql: $(EXTENSION)--6.1-3.sql $(EXTENSION)--6.1-3--6.1-4.sql
cat $^ > $@
NO_PGXS = 1 NO_PGXS = 1

View File

@ -0,0 +1,29 @@
/* citus--6.1-3--6.1-4.sql */

/*
 * Upgrade script from 6.1-3 to 6.1-4. It adds the defaultgroup flag to
 * pg_dist_colocation, rebuilds the configuration index so that it covers the
 * new flag, and replaces create_distributed_table() with a variant that takes
 * a colocate_with argument.
 */
SET search_path = 'pg_catalog';

-- Add the flag as a nullable column first so existing rows can be backfilled.
ALTER TABLE pg_dist_colocation ADD COLUMN defaultgroup BOOLEAN;

-- Intentionally no WHERE clause: every colocation group that exists at upgrade
-- time is marked as a default group.
UPDATE pg_dist_colocation SET defaultgroup = TRUE;

-- After the backfill every row has a value. Forbid NULLs from here on: the flag
-- is used as an equality key when looking up the default group, so a row with a
-- NULL flag could never be found by that lookup.
ALTER TABLE pg_dist_colocation ALTER COLUMN defaultgroup SET NOT NULL;

-- Recreate the configuration index with defaultgroup as an additional key.
DROP INDEX pg_dist_colocation_configuration_index;

CREATE INDEX pg_dist_colocation_configuration_index
    ON pg_dist_colocation
    USING btree(shardcount, replicationfactor, distributioncolumntype, defaultgroup);

-- The argument list changes, so the old three-argument function is dropped and
-- the four-argument one is created in its place. Existing three-argument calls
-- keep working because colocate_with has a default.
DROP FUNCTION create_distributed_table(regclass, text, citus.distribution_type);

-- NOTE: the function is STRICT, so a NULL in any argument (including
-- colocate_with) makes the call return NULL without invoking the C function;
-- the table is not distributed in that case.
CREATE FUNCTION create_distributed_table(table_name regclass,
                                         distribution_column text,
                                         distribution_type citus.distribution_type DEFAULT 'hash',
                                         colocate_with text DEFAULT 'default')
    RETURNS void
    LANGUAGE C STRICT
    AS 'MODULE_PATHNAME', $$create_distributed_table$$;

COMMENT ON FUNCTION create_distributed_table(table_name regclass,
                                             distribution_column text,
                                             distribution_type citus.distribution_type,
                                             colocate_with text)
    IS 'creates a distributed table';

RESET search_path;

View File

@ -1,6 +1,6 @@
# Citus extension # Citus extension
comment = 'Citus distributed database' comment = 'Citus distributed database'
default_version = '6.1-3' default_version = '6.1-4'
module_pathname = '$libdir/citus' module_pathname = '$libdir/citus'
relocatable = false relocatable = false
schema = pg_catalog schema = pg_catalog

View File

@ -63,6 +63,7 @@
static void ConvertToDistributedTable(Oid relationId, char *distributionColumnName, static void ConvertToDistributedTable(Oid relationId, char *distributionColumnName,
char distributionMethod, uint32 colocationId); char distributionMethod, uint32 colocationId);
static char LookupDistributionMethod(Oid distributionMethodOid); static char LookupDistributionMethod(Oid distributionMethodOid);
static Oid ColumnType(Oid relationId, char *columnName);
static void RecordDistributedRelationDependencies(Oid distributedRelationId, static void RecordDistributedRelationDependencies(Oid distributedRelationId,
Node *distributionKey); Node *distributionKey);
static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId, static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId,
@ -76,10 +77,9 @@ static void ErrorIfNotSupportedForeignConstraint(Relation relation,
uint32 colocationId); uint32 colocationId);
static void InsertIntoPgDistPartition(Oid relationId, char distributionMethod, static void InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
Var *distributionColumn, uint32 colocationId); Var *distributionColumn, uint32 colocationId);
static uint32 ColocationId(int shardCount, int replicationFactor,
Oid distributionColumnType);
static uint32 GetNextColocationId(void); static uint32 GetNextColocationId(void);
static void CreateHashDistributedTable(Oid relationId, char *distributionColumnName, static void CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
char *colocateWithTableName,
int shardCount, int replicationFactor); int shardCount, int replicationFactor);
@ -123,10 +123,29 @@ create_distributed_table(PG_FUNCTION_ARGS)
Oid relationId = PG_GETARG_OID(0); Oid relationId = PG_GETARG_OID(0);
text *distributionColumnText = PG_GETARG_TEXT_P(1); text *distributionColumnText = PG_GETARG_TEXT_P(1);
Oid distributionMethodOid = PG_GETARG_OID(2); Oid distributionMethodOid = PG_GETARG_OID(2);
text *colocateWithTableNameText = PG_GETARG_TEXT_P(3);
char *distributionColumnName = text_to_cstring(distributionColumnText); char *distributionColumnName = text_to_cstring(distributionColumnText);
char distributionMethod = LookupDistributionMethod(distributionMethodOid); char distributionMethod = LookupDistributionMethod(distributionMethodOid);
/* check if we try to colocate with hash distributed tables */
char *colocateWithTableName = text_to_cstring(colocateWithTableNameText);
if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0 &&
pg_strncasecmp(colocateWithTableName, "none", NAMEDATALEN) != 0)
{
Oid colocateWithTableOid = ResolveRelationId(colocateWithTableNameText);
char colocateWithTableDistributionMethod = PartitionMethod(colocateWithTableOid);
if (colocateWithTableDistributionMethod != DISTRIBUTE_BY_HASH ||
distributionMethod != DISTRIBUTE_BY_HASH)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot distribute relation"),
errdetail("Currently, colocate_with option is only supported "
"for hash distributed tables.")));
}
}
/* if distribution method is not hash, just create partition metadata */ /* if distribution method is not hash, just create partition metadata */
if (distributionMethod != DISTRIBUTE_BY_HASH) if (distributionMethod != DISTRIBUTE_BY_HASH)
{ {
@ -135,8 +154,9 @@ create_distributed_table(PG_FUNCTION_ARGS)
PG_RETURN_VOID(); PG_RETURN_VOID();
} }
/* use configuration values for shard count and shard replication factor*/ /* use configuration values for shard count and shard replication factor */
CreateHashDistributedTable(relationId, distributionColumnName, ShardCount, CreateHashDistributedTable(relationId, distributionColumnName,
colocateWithTableName, ShardCount,
ShardReplicationFactor); ShardReplicationFactor);
PG_RETURN_VOID(); PG_RETURN_VOID();
@ -154,6 +174,7 @@ create_reference_table(PG_FUNCTION_ARGS)
Oid relationId = PG_GETARG_OID(0); Oid relationId = PG_GETARG_OID(0);
int shardCount = 1; int shardCount = 1;
AttrNumber firstColumnAttrNumber = 1; AttrNumber firstColumnAttrNumber = 1;
char *colocateWithTableName = "default";
char *firstColumnName = get_attname(relationId, firstColumnAttrNumber); char *firstColumnName = get_attname(relationId, firstColumnAttrNumber);
if (firstColumnName == NULL) if (firstColumnName == NULL)
@ -164,8 +185,8 @@ create_reference_table(PG_FUNCTION_ARGS)
"least one column", relationName))); "least one column", relationName)));
} }
CreateHashDistributedTable(relationId, firstColumnName, shardCount, CreateHashDistributedTable(relationId, firstColumnName, colocateWithTableName,
ShardReplicationFactor); shardCount, ShardReplicationFactor);
PG_RETURN_VOID(); PG_RETURN_VOID();
} }
@ -725,6 +746,19 @@ LookupDistributionMethod(Oid distributionMethodOid)
} }
/*
 * ColumnType returns the type OID of the column called columnName in the
 * relation identified by relationId.
 *
 * The column name is first resolved to an attribute number. If the relation
 * has no such column, a regular "undefined column" error is raised here
 * instead of passing an invalid attribute number on to get_atttype(), which
 * would otherwise fail with an internal lookup error that is not meaningful
 * to the user.
 */
static Oid
ColumnType(Oid relationId, char *columnName)
{
	AttrNumber columnIndex = get_attnum(relationId, columnName);
	Oid columnType = InvalidOid;

	/* get_attnum() could not find a column with this name */
	if (columnIndex == InvalidAttrNumber)
	{
		char *relationName = get_rel_name(relationId);

		ereport(ERROR, (errcode(ERRCODE_UNDEFINED_COLUMN),
						errmsg("column \"%s\" of relation \"%s\" does not exist",
							   columnName, relationName)));
	}

	columnType = get_atttype(relationId, columnIndex);

	return columnType;
}
/* /*
* SupportFunctionForColumn locates a support function given a column, an access method, * SupportFunctionForColumn locates a support function given a column, an access method,
* and and id of a support function. This function returns InvalidOid if there is no * and and id of a support function. This function returns InvalidOid if there is no
@ -849,19 +883,22 @@ CreateTruncateTrigger(Oid relationId)
/* /*
* ColocationId searches pg_dist_colocation for shard count, replication factor * DefaultColocationGroupId searches pg_dist_colocation for the default colocation group
* and distribution column type. If a matching entry is found, it returns the * with the given configuration: shard count, replication factor and distribution
* colocation id, otherwise it returns INVALID_COLOCATION_ID. * column type. If a matching entry is found, it returns the colocation id,
* otherwise it returns INVALID_COLOCATION_ID.
*/ */
static uint32 uint32
ColocationId(int shardCount, int replicationFactor, Oid distributionColumnType) DefaultColocationGroupId(int shardCount, int replicationFactor,
Oid distributionColumnType)
{ {
uint32 colocationId = INVALID_COLOCATION_ID; uint32 colocationId = INVALID_COLOCATION_ID;
HeapTuple colocationTuple = NULL; HeapTuple colocationTuple = NULL;
SysScanDesc scanDescriptor; SysScanDesc scanDescriptor;
const int scanKeyCount = 3; const int scanKeyCount = 4;
ScanKeyData scanKey[scanKeyCount]; ScanKeyData scanKey[scanKeyCount];
bool indexOK = true; bool indexOK = true;
bool defaulColocationGroup = true;
Relation pgDistColocation = heap_open(DistColocationRelationId(), AccessShareLock); Relation pgDistColocation = heap_open(DistColocationRelationId(), AccessShareLock);
@ -872,6 +909,8 @@ ColocationId(int shardCount, int replicationFactor, Oid distributionColumnType)
BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(replicationFactor)); BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(replicationFactor));
ScanKeyInit(&scanKey[2], Anum_pg_dist_colocation_distributioncolumntype, ScanKeyInit(&scanKey[2], Anum_pg_dist_colocation_distributioncolumntype,
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(distributionColumnType)); BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(distributionColumnType));
ScanKeyInit(&scanKey[3], Anum_pg_dist_colocation_defaultgroup,
BTEqualStrategyNumber, F_BOOLEQ, BoolGetDatum(defaulColocationGroup));
scanDescriptor = systable_beginscan(pgDistColocation, scanDescriptor = systable_beginscan(pgDistColocation,
DistColocationConfigurationIndexId(), DistColocationConfigurationIndexId(),
@ -899,7 +938,8 @@ ColocationId(int shardCount, int replicationFactor, Oid distributionColumnType)
* colocation id. * colocation id.
*/ */
uint32 uint32
CreateColocationGroup(int shardCount, int replicationFactor, Oid distributionColumnType) CreateColocationGroup(int shardCount, int replicationFactor, Oid distributionColumnType,
bool defaultColocationGroup)
{ {
uint32 colocationId = GetNextColocationId(); uint32 colocationId = GetNextColocationId();
Relation pgDistColocation = NULL; Relation pgDistColocation = NULL;
@ -918,6 +958,8 @@ CreateColocationGroup(int shardCount, int replicationFactor, Oid distributionCol
UInt32GetDatum(replicationFactor); UInt32GetDatum(replicationFactor);
values[Anum_pg_dist_colocation_distributioncolumntype - 1] = values[Anum_pg_dist_colocation_distributioncolumntype - 1] =
ObjectIdGetDatum(distributionColumnType); ObjectIdGetDatum(distributionColumnType);
values[Anum_pg_dist_colocation_defaultgroup - 1] =
BoolGetDatum(defaultColocationGroup);
/* open colocation relation and insert the new tuple */ /* open colocation relation and insert the new tuple */
pgDistColocation = heap_open(DistColocationRelationId(), RowExclusiveLock); pgDistColocation = heap_open(DistColocationRelationId(), RowExclusiveLock);
@ -947,7 +989,7 @@ CreateColocationGroup(int shardCount, int replicationFactor, Oid distributionCol
* sequence created in initdb to generate unique identifiers. * sequence created in initdb to generate unique identifiers.
*/ */
static uint32 static uint32
GetNextColocationId() GetNextColocationId(void)
{ {
text *sequenceName = cstring_to_text(COLOCATIONID_SEQUENCE_NAME); text *sequenceName = cstring_to_text(COLOCATIONID_SEQUENCE_NAME);
Oid sequenceId = ResolveRelationId(sequenceName); Oid sequenceId = ResolveRelationId(sequenceName);
@ -977,19 +1019,17 @@ GetNextColocationId()
*/ */
static void static void
CreateHashDistributedTable(Oid relationId, char *distributionColumnName, CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
char *colocateWithTableName,
int shardCount, int replicationFactor) int shardCount, int replicationFactor)
{ {
Relation distributedRelation = NULL; Relation distributedRelation = NULL;
Relation pgDistColocation = NULL; Relation pgDistColocation = NULL;
Var *distributionColumn = NULL;
Oid distributionColumnType = 0;
uint32 colocationId = INVALID_COLOCATION_ID; uint32 colocationId = INVALID_COLOCATION_ID;
Oid colocationTableId = InvalidOid;
Oid distributionColumnType = InvalidOid;
/* get distribution column type */ /* get an access lock on the relation to prevent DROP TABLE and ALTER TABLE */
distributedRelation = relation_open(relationId, AccessShareLock); distributedRelation = relation_open(relationId, AccessShareLock);
distributionColumn = BuildDistributionKeyFromColumnName(distributedRelation,
distributionColumnName);
distributionColumnType = distributionColumn->vartype;
/* /*
* Get an exclusive lock on the colocation system catalog. Therefore, we * Get an exclusive lock on the colocation system catalog. Therefore, we
@ -998,35 +1038,66 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
*/ */
pgDistColocation = heap_open(DistColocationRelationId(), ExclusiveLock); pgDistColocation = heap_open(DistColocationRelationId(), ExclusiveLock);
/* check for existing colocations */ /* get distribution column data type */
colocationId = ColocationId(shardCount, replicationFactor, distributionColumnType); distributionColumnType = ColumnType(relationId, distributionColumnName);
/* if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) == 0)
* If there is a colocation group for the current configuration, get a
* colocated table from the group and use its shards as a reference to
* create new shards. Otherwise, create a new colocation group and create
* shards with the default round robin algorithm.
*/
if (colocationId != INVALID_COLOCATION_ID)
{ {
char *relationName = get_rel_name(relationId); /* check for default colocation group */
colocationId = DefaultColocationGroupId(shardCount, replicationFactor,
Oid colocatedTableId = ColocatedTableId(colocationId); distributionColumnType);
ConvertToDistributedTable(relationId, distributionColumnName, if (colocationId == INVALID_COLOCATION_ID)
DISTRIBUTE_BY_HASH, colocationId); {
bool defaultColocationGroup = true;
CreateColocatedShards(relationId, colocatedTableId); colocationId = CreateColocationGroup(shardCount, replicationFactor,
ereport(DEBUG2, (errmsg("table %s is added to colocation group: %d", distributionColumnType,
relationName, colocationId))); defaultColocationGroup);
}
else
{
colocationTableId = ColocatedTableId(colocationId);
}
}
else if (pg_strncasecmp(colocateWithTableName, "none", NAMEDATALEN) == 0)
{
bool defaultColocationGroup = false;
colocationId = CreateColocationGroup(shardCount, replicationFactor,
distributionColumnType,
defaultColocationGroup);
} }
else else
{ {
colocationId = CreateColocationGroup(shardCount, replicationFactor, Var *colocationTablePartitionColumn = NULL;
distributionColumnType); Oid colocationTablePartitionColumnType = InvalidOid;
ConvertToDistributedTable(relationId, distributionColumnName,
DISTRIBUTE_BY_HASH, colocationId);
/* use the default way to create shards */ /* get colocation group of the target table */
text *colocateWithTableNameText = cstring_to_text(colocateWithTableName);
colocationTableId = ResolveRelationId(colocateWithTableNameText);
colocationId = TableColocationId(colocationTableId);
colocationTablePartitionColumn = PartitionKey(colocationTableId);
colocationTablePartitionColumnType = colocationTablePartitionColumn->vartype;
if (colocationTablePartitionColumnType != distributionColumnType)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot colocate with %s", colocateWithTableName),
errdetail("Distribution column types are different.")));
}
}
/* create distributed table metadata */
ConvertToDistributedTable(relationId, distributionColumnName, DISTRIBUTE_BY_HASH,
colocationId);
/* create shards */
if (colocationTableId != InvalidOid)
{
CreateColocatedShards(relationId, colocationTableId);
}
else
{
CreateShardsWithRoundRobinPolicy(relationId, shardCount, replicationFactor); CreateShardsWithRoundRobinPolicy(relationId, shardCount, replicationFactor);
} }

View File

@ -91,6 +91,7 @@ MarkTablesColocated(Oid sourceRelationId, Oid targetRelationId)
Var *targetDistributionColumn = NULL; Var *targetDistributionColumn = NULL;
Oid sourceDistributionColumnType = InvalidOid; Oid sourceDistributionColumnType = InvalidOid;
Oid targetDistributionColumnType = InvalidOid; Oid targetDistributionColumnType = InvalidOid;
bool defaultColocationGroup = false;
CheckHashPartitionedTable(sourceRelationId); CheckHashPartitionedTable(sourceRelationId);
CheckHashPartitionedTable(targetRelationId); CheckHashPartitionedTable(targetRelationId);
@ -132,9 +133,19 @@ MarkTablesColocated(Oid sourceRelationId, Oid targetRelationId)
{ {
uint32 shardCount = ShardIntervalCount(sourceRelationId); uint32 shardCount = ShardIntervalCount(sourceRelationId);
uint32 shardReplicationFactor = TableShardReplicationFactor(sourceRelationId); uint32 shardReplicationFactor = TableShardReplicationFactor(sourceRelationId);
uint32 defaultColocationId = INVALID_COLOCATION_ID;
/* check if there is a default colocation group */
defaultColocationId = DefaultColocationGroupId(shardCount, shardReplicationFactor,
sourceDistributionColumnType);
if (defaultColocationId == INVALID_COLOCATION_ID)
{
defaultColocationGroup = true;
}
sourceColocationId = CreateColocationGroup(shardCount, shardReplicationFactor, sourceColocationId = CreateColocationGroup(shardCount, shardReplicationFactor,
sourceDistributionColumnType); sourceDistributionColumnType,
defaultColocationGroup);
UpdateRelationColocationGroup(sourceRelationId, sourceColocationId); UpdateRelationColocationGroup(sourceRelationId, sourceColocationId);
} }

View File

@ -260,8 +260,9 @@ DistributedTableCacheEntry(Oid distributedRelationId)
} }
else else
{ {
ereport(ERROR, (errmsg("relation %u is not distributed", char *relationName = get_rel_name(distributedRelationId);
distributedRelationId))); ereport(ERROR, (errmsg("relation %s is not distributed",
relationName)));
} }
} }

View File

@ -18,6 +18,8 @@
#define INVALID_COLOCATION_ID 0 #define INVALID_COLOCATION_ID 0
extern uint32 TableColocationId(Oid distributedTableId); extern uint32 TableColocationId(Oid distributedTableId);
extern uint32 DefaultColocationGroupId(int shardCount, int replicationFactor,
Oid distributionColumnType);
extern bool TablesColocated(Oid leftDistributedTableId, Oid rightDistributedTableId); extern bool TablesColocated(Oid leftDistributedTableId, Oid rightDistributedTableId);
extern bool ShardsColocated(ShardInterval *leftShardInterval, extern bool ShardsColocated(ShardInterval *leftShardInterval,
ShardInterval *rightShardInterval); ShardInterval *rightShardInterval);
@ -26,7 +28,8 @@ extern List * ColocatedShardIntervalList(ShardInterval *shardInterval);
extern Oid ColocatedTableId(Oid colocationId); extern Oid ColocatedTableId(Oid colocationId);
extern uint64 ColocatedShardIdInRelation(Oid relationId, int shardIndex); extern uint64 ColocatedShardIdInRelation(Oid relationId, int shardIndex);
extern uint32 CreateColocationGroup(int shardCount, int replicationFactor, extern uint32 CreateColocationGroup(int shardCount, int replicationFactor,
Oid distributionColumnType); Oid distributionColumnType,
bool defaultColocationGroup);
#endif /* COLOCATION_UTILS_H_ */ #endif /* COLOCATION_UTILS_H_ */

View File

@ -22,6 +22,7 @@ typedef struct FormData_pg_dist_colocation
uint32 shardcount; uint32 shardcount;
uint32 replicationfactor; uint32 replicationfactor;
Oid distributioncolumntype; Oid distributioncolumntype;
bool defaultgroup;
} FormData_pg_dist_colocation; } FormData_pg_dist_colocation;
/* ---------------- /* ----------------
@ -35,11 +36,12 @@ typedef FormData_pg_dist_colocation *Form_pg_dist_colocation;
* compiler constants for pg_dist_colocation * compiler constants for pg_dist_colocation
* ---------------- * ----------------
*/ */
#define Natts_pg_dist_colocation 4 #define Natts_pg_dist_colocation 5
#define Anum_pg_dist_colocation_colocationid 1 #define Anum_pg_dist_colocation_colocationid 1
#define Anum_pg_dist_colocation_shardcount 2 #define Anum_pg_dist_colocation_shardcount 2
#define Anum_pg_dist_colocation_replicationfactor 3 #define Anum_pg_dist_colocation_replicationfactor 3
#define Anum_pg_dist_colocation_distributioncolumntype 4 #define Anum_pg_dist_colocation_distributioncolumntype 4
#define Anum_pg_dist_colocation_defaultgroup 5
#define COLOCATIONID_SEQUENCE_NAME "pg_dist_colocationid_seq" #define COLOCATIONID_SEQUENCE_NAME "pg_dist_colocationid_seq"

View File

@ -432,12 +432,12 @@ NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
SELECT * FROM pg_dist_colocation SELECT * FROM pg_dist_colocation
WHERE colocationid >= 1 AND colocationid < 1000 WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY colocationid; ORDER BY colocationid;
colocationid | shardcount | replicationfactor | distributioncolumntype colocationid | shardcount | replicationfactor | distributioncolumntype | defaultgroup
--------------+------------+-------------------+------------------------ --------------+------------+-------------------+------------------------+--------------
1 | 2 | 2 | 23 1 | 2 | 2 | 23 | t
2 | 2 | 1 | 23 2 | 2 | 1 | 23 | t
3 | 2 | 2 | 25 3 | 2 | 2 | 25 | t
4 | 4 | 2 | 23 4 | 4 | 2 | 23 | t
(4 rows) (4 rows)
SELECT logicalrelid, colocationid FROM pg_dist_partition SELECT logicalrelid, colocationid FROM pg_dist_partition
@ -459,16 +459,16 @@ SELECT logicalrelid, colocationid FROM pg_dist_partition
-- check effects of dropping tables -- check effects of dropping tables
DROP TABLE table1_groupA; DROP TABLE table1_groupA;
SELECT * FROM pg_dist_colocation WHERE colocationid = 1; SELECT * FROM pg_dist_colocation WHERE colocationid = 1;
colocationid | shardcount | replicationfactor | distributioncolumntype colocationid | shardcount | replicationfactor | distributioncolumntype | defaultgroup
--------------+------------+-------------------+------------------------ --------------+------------+-------------------+------------------------+--------------
1 | 2 | 2 | 23 1 | 2 | 2 | 23 | t
(1 row) (1 row)
-- dropping all tables in a colocation group also deletes the colocation group -- dropping all tables in a colocation group also deletes the colocation group
DROP TABLE table2_groupA; DROP TABLE table2_groupA;
SELECT * FROM pg_dist_colocation WHERE colocationid = 1; SELECT * FROM pg_dist_colocation WHERE colocationid = 1;
colocationid | shardcount | replicationfactor | distributioncolumntype colocationid | shardcount | replicationfactor | distributioncolumntype | defaultgroup
--------------+------------+-------------------+------------------------ --------------+------------+-------------------+------------------------+--------------
(0 rows) (0 rows)
-- create dropped colocation group again -- create dropped colocation group again
@ -504,6 +504,118 @@ SELECT create_distributed_table('schema_collocation.table4_groupE', 'id');
(1 row) (1 row)
-- test colocate_with option
CREATE TABLE table1_group_none_1 ( id int );
SELECT create_distributed_table('table1_group_none_1', 'id', colocate_with => 'none');
create_distributed_table
--------------------------
(1 row)
CREATE TABLE table2_group_none_1 ( id int );
SELECT create_distributed_table('table2_group_none_1', 'id', colocate_with => 'table1_group_none_1');
create_distributed_table
--------------------------
(1 row)
CREATE TABLE table1_group_none_2 ( id int );
SELECT create_distributed_table('table1_group_none_2', 'id', colocate_with => 'none');
create_distributed_table
--------------------------
(1 row)
CREATE TABLE table4_groupE ( id int );
SELECT create_distributed_table('table4_groupE', 'id', colocate_with => 'default');
create_distributed_table
--------------------------
(1 row)
SET citus.shard_count = 3;
-- check that this new configuration does not have a default group
CREATE TABLE table1_group_none_3 ( id int );
SELECT create_distributed_table('table1_group_none_3', 'id', colocate_with => 'NONE');
create_distributed_table
--------------------------
(1 row)
-- a new table does not use a non-default group
CREATE TABLE table1_group_default ( id int );
SELECT create_distributed_table('table1_group_default', 'id', colocate_with => 'DEFAULT');
create_distributed_table
--------------------------
(1 row)
-- check metadata
SELECT * FROM pg_dist_colocation
WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY colocationid;
colocationid | shardcount | replicationfactor | distributioncolumntype | defaultgroup
--------------+------------+-------------------+------------------------+--------------
2 | 2 | 1 | 23 | t
3 | 2 | 2 | 25 | t
4 | 4 | 2 | 23 | t
5 | 2 | 2 | 23 | t
6 | 2 | 2 | 23 | f
7 | 2 | 2 | 23 | f
8 | 3 | 2 | 23 | f
9 | 3 | 2 | 23 | t
(8 rows)
SELECT logicalrelid, colocationid FROM pg_dist_partition
WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY colocationid, logicalrelid;
logicalrelid | colocationid
----------------------------------+--------------
table1_groupb | 2
table2_groupb | 2
table1_groupc | 3
table2_groupc | 3
table1_groupd | 4
table2_groupd | 4
table3_groupd | 4
table1_groupe | 5
table2_groupe | 5
table3_groupe | 5
schema_collocation.table4_groupe | 5
table4_groupe | 5
table1_group_none_1 | 6
table2_group_none_1 | 6
table1_group_none_2 | 7
table1_group_none_3 | 8
table1_group_default | 9
(17 rows)
-- check failing colocate_with options
CREATE TABLE table_postgresql( id int );
CREATE TABLE table_failing ( id int );
SELECT create_distributed_table('table_failing', 'id', colocate_with => 'table_append');
ERROR: cannot distribute relation
DETAIL: Currently, colocate_with option is only supported for hash distributed tables.
SELECT create_distributed_table('table_failing', 'id', 'append', 'table1_groupE');
ERROR: cannot distribute relation
DETAIL: Currently, colocate_with option is only supported for hash distributed tables.
SELECT create_distributed_table('table_failing', 'id', colocate_with => 'table_postgresql');
ERROR: relation table_postgresql is not distributed
SELECT create_distributed_table('table_failing', 'id', colocate_with => 'no_table');
ERROR: relation "no_table" does not exist
SELECT create_distributed_table('table_failing', 'id', colocate_with => '');
ERROR: invalid name syntax
SELECT create_distributed_table('table_failing', 'id', colocate_with => NULL);
create_distributed_table
--------------------------
(1 row)
-- check with different distribution column types
CREATE TABLE table_bigint ( id bigint );
SELECT create_distributed_table('table_bigint', 'id', colocate_with => 'table1_groupE');
ERROR: cannot colocate with table1_groupE
DETAIL: Distribution column types are different.
-- check worker table schemas -- check worker table schemas
\c - - - :worker_1_port \c - - - :worker_1_port
\d table3_groupE_1300050 \d table3_groupE_1300050
@ -538,14 +650,74 @@ SELECT create_reference_table('table2_groupF');
SELECT * FROM pg_dist_colocation SELECT * FROM pg_dist_colocation
WHERE colocationid >= 1 AND colocationid < 1000 WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY colocationid; ORDER BY colocationid;
colocationid | shardcount | replicationfactor | distributioncolumntype colocationid | shardcount | replicationfactor | distributioncolumntype | defaultgroup
--------------+------------+-------------------+------------------------ --------------+------------+-------------------+------------------------+--------------
2 | 2 | 1 | 23 2 | 2 | 1 | 23 | t
3 | 2 | 2 | 25 3 | 2 | 2 | 25 | t
4 | 4 | 2 | 23 4 | 4 | 2 | 23 | t
5 | 2 | 2 | 23 5 | 2 | 2 | 23 | t
6 | 1 | 2 | 23 6 | 2 | 2 | 23 | f
(5 rows) 7 | 2 | 2 | 23 | f
8 | 3 | 2 | 23 | f
9 | 3 | 2 | 23 | t
10 | 1 | 2 | 23 | t
(9 rows)
-- test mark_colocation_group_default()
SELECT mark_colocation_group_default(7);
mark_colocation_group_default
-------------------------------
(1 row)
SELECT mark_colocation_group_default(8);
mark_colocation_group_default
-------------------------------
(1 row)
-- check metadata after mark_colocation_group_default() is run
SELECT * FROM pg_dist_colocation
WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY colocationid;
colocationid | shardcount | replicationfactor | distributioncolumntype | defaultgroup
--------------+------------+-------------------+------------------------+--------------
2 | 2 | 1 | 23 | t
3 | 2 | 2 | 25 | t
4 | 4 | 2 | 23 | t
5 | 2 | 2 | 23 | f
6 | 2 | 2 | 23 | f
7 | 2 | 2 | 23 | t
8 | 3 | 2 | 23 | t
9 | 3 | 2 | 23 | f
10 | 1 | 2 | 23 | t
(9 rows)
SELECT logicalrelid, colocationid FROM pg_dist_partition
WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY colocationid, logicalrelid;
logicalrelid | colocationid
----------------------------------+--------------
table1_groupb | 2
table2_groupb | 2
table1_groupc | 3
table2_groupc | 3
table1_groupd | 4
table2_groupd | 4
table3_groupd | 4
table1_groupe | 5
table2_groupe | 5
table3_groupe | 5
schema_collocation.table4_groupe | 5
table4_groupe | 5
table1_group_none_1 | 6
table2_group_none_1 | 6
table1_group_none_2 | 7
table1_group_none_3 | 8
table1_group_default | 9
table1_groupf | 10
table2_groupf | 10
(19 rows)
-- cross check with internal colocation API -- cross check with internal colocation API
SELECT SELECT
@ -563,22 +735,27 @@ WHERE
ORDER BY ORDER BY
table1, table1,
table2; table2;
table1 | table2 | colocated table1 | table2 | colocated
---------------+----------------------------------+----------- ----------------------------------+----------------------------------+-----------
table1_group1 | table2_group1 | t table1_group1 | table2_group1 | t
table1_groupb | table2_groupb | t table1_groupb | table2_groupb | t
table1_groupc | table2_groupc | t table1_groupc | table2_groupc | t
table1_groupd | table2_groupd | t table1_groupd | table2_groupd | t
table1_groupd | table3_groupd | t table1_groupd | table3_groupd | t
table2_groupd | table3_groupd | t table2_groupd | table3_groupd | t
table1_groupe | table2_groupe | t table1_groupe | table2_groupe | t
table1_groupe | table3_groupe | t table1_groupe | table3_groupe | t
table1_groupe | schema_collocation.table4_groupe | t table1_groupe | schema_collocation.table4_groupe | t
table2_groupe | table3_groupe | t table1_groupe | table4_groupe | t
table2_groupe | schema_collocation.table4_groupe | t table2_groupe | table3_groupe | t
table3_groupe | schema_collocation.table4_groupe | t table2_groupe | schema_collocation.table4_groupe | t
table1_groupf | table2_groupf | t table2_groupe | table4_groupe | t
(13 rows) table3_groupe | schema_collocation.table4_groupe | t
table3_groupe | table4_groupe | t
schema_collocation.table4_groupe | table4_groupe | t
table1_group_none_1 | table2_group_none_1 | t
table1_groupf | table2_groupf | t
(18 rows)
-- check created shards -- check created shards
SELECT SELECT
@ -653,11 +830,39 @@ ORDER BY
schema_collocation.table4_groupe | 1300052 | t | 57637 | -2147483648 | -1 schema_collocation.table4_groupe | 1300052 | t | 57637 | -2147483648 | -1
schema_collocation.table4_groupe | 1300053 | t | 57637 | 0 | 2147483647 schema_collocation.table4_groupe | 1300053 | t | 57637 | 0 | 2147483647
schema_collocation.table4_groupe | 1300053 | t | 57638 | 0 | 2147483647 schema_collocation.table4_groupe | 1300053 | t | 57638 | 0 | 2147483647
table1_groupf | 1300054 | t | 57637 | -2147483648 | 2147483647 table1_group_none_1 | 1300054 | t | 57637 | -2147483648 | -1
table1_groupf | 1300054 | t | 57638 | -2147483648 | 2147483647 table1_group_none_1 | 1300054 | t | 57638 | -2147483648 | -1
table2_groupf | 1300055 | t | 57638 | -2147483648 | 2147483647 table1_group_none_1 | 1300055 | t | 57638 | 0 | 2147483647
table2_groupf | 1300055 | t | 57637 | -2147483648 | 2147483647 table1_group_none_1 | 1300055 | t | 57637 | 0 | 2147483647
(56 rows) table2_group_none_1 | 1300056 | t | 57638 | -2147483648 | -1
table2_group_none_1 | 1300056 | t | 57637 | -2147483648 | -1
table2_group_none_1 | 1300057 | t | 57637 | 0 | 2147483647
table2_group_none_1 | 1300057 | t | 57638 | 0 | 2147483647
table1_group_none_2 | 1300058 | t | 57637 | -2147483648 | -1
table1_group_none_2 | 1300058 | t | 57638 | -2147483648 | -1
table1_group_none_2 | 1300059 | t | 57638 | 0 | 2147483647
table1_group_none_2 | 1300059 | t | 57637 | 0 | 2147483647
table4_groupe | 1300060 | t | 57637 | -2147483648 | -1
table4_groupe | 1300060 | t | 57638 | -2147483648 | -1
table4_groupe | 1300061 | t | 57638 | 0 | 2147483647
table4_groupe | 1300061 | t | 57637 | 0 | 2147483647
table1_group_none_3 | 1300062 | t | 57637 | -2147483648 | -715827884
table1_group_none_3 | 1300062 | t | 57638 | -2147483648 | -715827884
table1_group_none_3 | 1300063 | t | 57638 | -715827883 | 715827881
table1_group_none_3 | 1300063 | t | 57637 | -715827883 | 715827881
table1_group_none_3 | 1300064 | t | 57637 | 715827882 | 2147483647
table1_group_none_3 | 1300064 | t | 57638 | 715827882 | 2147483647
table1_group_default | 1300065 | t | 57637 | -2147483648 | -715827884
table1_group_default | 1300065 | t | 57638 | -2147483648 | -715827884
table1_group_default | 1300066 | t | 57638 | -715827883 | 715827881
table1_group_default | 1300066 | t | 57637 | -715827883 | 715827881
table1_group_default | 1300067 | t | 57637 | 715827882 | 2147483647
table1_group_default | 1300067 | t | 57638 | 715827882 | 2147483647
table1_groupf | 1300068 | t | 57637 | -2147483648 | 2147483647
table1_groupf | 1300068 | t | 57638 | -2147483648 | 2147483647
table2_groupf | 1300069 | t | 57638 | -2147483648 | 2147483647
table2_groupf | 1300069 | t | 57637 | -2147483648 | 2147483647
(84 rows)
-- reset colocation ids to test mark_tables_colocated -- reset colocation ids to test mark_tables_colocated
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1; ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1;
@ -669,13 +874,13 @@ UPDATE pg_dist_partition SET colocationid = 0
SELECT * FROM pg_dist_colocation SELECT * FROM pg_dist_colocation
WHERE colocationid >= 1 AND colocationid < 1000 WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY colocationid; ORDER BY colocationid;
colocationid | shardcount | replicationfactor | distributioncolumntype colocationid | shardcount | replicationfactor | distributioncolumntype | defaultgroup
--------------+------------+-------------------+------------------------ --------------+------------+-------------------+------------------------+--------------
(0 rows) (0 rows)
SELECT logicalrelid, colocationid FROM pg_dist_partition SELECT logicalrelid, colocationid FROM pg_dist_partition
WHERE colocationid >= 1 AND colocationid < 1000 WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY logicalrelid; ORDER BY colocationid, logicalrelid;
logicalrelid | colocationid logicalrelid | colocationid
--------------+-------------- --------------+--------------
(0 rows) (0 rows)
@ -700,13 +905,13 @@ DETAIL: Shard counts don't match for table1_groupb and table1_groupd.
SELECT * FROM pg_dist_colocation SELECT * FROM pg_dist_colocation
WHERE colocationid >= 1 AND colocationid < 1000 WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY colocationid; ORDER BY colocationid;
colocationid | shardcount | replicationfactor | distributioncolumntype colocationid | shardcount | replicationfactor | distributioncolumntype | defaultgroup
--------------+------------+-------------------+------------------------ --------------+------------+-------------------+------------------------+--------------
(0 rows) (0 rows)
SELECT logicalrelid, colocationid FROM pg_dist_partition SELECT logicalrelid, colocationid FROM pg_dist_partition
WHERE colocationid >= 1 AND colocationid < 1000 WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY logicalrelid; ORDER BY colocationid, logicalrelid;
logicalrelid | colocationid logicalrelid | colocationid
--------------+-------------- --------------+--------------
(0 rows) (0 rows)
@ -753,13 +958,13 @@ SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupB']);
SELECT * FROM pg_dist_colocation SELECT * FROM pg_dist_colocation
WHERE colocationid >= 1 AND colocationid < 1000 WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY colocationid; ORDER BY colocationid;
colocationid | shardcount | replicationfactor | distributioncolumntype colocationid | shardcount | replicationfactor | distributioncolumntype | defaultgroup
--------------+------------+-------------------+------------------------ --------------+------------+-------------------+------------------------+--------------
2 | 2 | 1 | 23 2 | 2 | 1 | 23 | t
3 | 2 | 2 | 25 3 | 2 | 2 | 25 | t
4 | 4 | 2 | 23 4 | 4 | 2 | 23 | t
5 | 2 | 2 | 23 5 | 2 | 2 | 23 | t
6 | 1 | 2 | 23 6 | 1 | 2 | 23 | t
(5 rows) (5 rows)
SELECT logicalrelid, colocationid FROM pg_dist_partition SELECT logicalrelid, colocationid FROM pg_dist_partition

View File

@ -156,7 +156,7 @@ SELECT partition_type('events_hash');
(1 row) (1 row)
SELECT partition_type('pg_type'); SELECT partition_type('pg_type');
ERROR: relation 1247 is not distributed ERROR: relation pg_type is not distributed
-- should see true for events_hash, false for others -- should see true for events_hash, false for others
SELECT is_distributed_table('events_hash'); SELECT is_distributed_table('events_hash');
is_distributed_table is_distributed_table

View File

@ -61,6 +61,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-18';
ALTER EXTENSION citus UPDATE TO '6.1-1'; ALTER EXTENSION citus UPDATE TO '6.1-1';
ALTER EXTENSION citus UPDATE TO '6.1-2'; ALTER EXTENSION citus UPDATE TO '6.1-2';
ALTER EXTENSION citus UPDATE TO '6.1-3'; ALTER EXTENSION citus UPDATE TO '6.1-3';
ALTER EXTENSION citus UPDATE TO '6.1-4';
-- ensure no objects were created outside pg_catalog -- ensure no objects were created outside pg_catalog
SELECT COUNT(*) SELECT COUNT(*)
FROM pg_depend AS pgd, FROM pg_depend AS pgd,

View File

@ -238,8 +238,8 @@ Indexes:
-- Check that pg_dist_colocation is not synced -- Check that pg_dist_colocation is not synced
SELECT * FROM pg_dist_colocation ORDER BY colocationid; SELECT * FROM pg_dist_colocation ORDER BY colocationid;
colocationid | shardcount | replicationfactor | distributioncolumntype colocationid | shardcount | replicationfactor | distributioncolumntype | defaultgroup
--------------+------------+-------------------+------------------------ --------------+------------+-------------------+------------------------+--------------
(0 rows) (0 rows)
-- Make sure that truncate trigger has been set for the MX table on worker -- Make sure that truncate trigger has been set for the MX table on worker

View File

@ -241,6 +241,53 @@ CREATE SCHEMA schema_collocation;
CREATE TABLE schema_collocation.table4_groupE ( id int ); CREATE TABLE schema_collocation.table4_groupE ( id int );
SELECT create_distributed_table('schema_collocation.table4_groupE', 'id'); SELECT create_distributed_table('schema_collocation.table4_groupE', 'id');
-- test colocate_with option
CREATE TABLE table1_group_none_1 ( id int );
SELECT create_distributed_table('table1_group_none_1', 'id', colocate_with => 'none');
CREATE TABLE table2_group_none_1 ( id int );
SELECT create_distributed_table('table2_group_none_1', 'id', colocate_with => 'table1_group_none_1');
CREATE TABLE table1_group_none_2 ( id int );
SELECT create_distributed_table('table1_group_none_2', 'id', colocate_with => 'none');
CREATE TABLE table4_groupE ( id int );
SELECT create_distributed_table('table4_groupE', 'id', colocate_with => 'default');
SET citus.shard_count = 3;
-- check that this new configuration does not have a default group
CREATE TABLE table1_group_none_3 ( id int );
SELECT create_distributed_table('table1_group_none_3', 'id', colocate_with => 'NONE');
-- a new table does not use a non-default group
CREATE TABLE table1_group_default ( id int );
SELECT create_distributed_table('table1_group_default', 'id', colocate_with => 'DEFAULT');
-- check metadata
SELECT * FROM pg_dist_colocation
WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY colocationid;
SELECT logicalrelid, colocationid FROM pg_dist_partition
WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY colocationid, logicalrelid;
-- check failing colocate_with options
CREATE TABLE table_postgresql( id int );
CREATE TABLE table_failing ( id int );
SELECT create_distributed_table('table_failing', 'id', colocate_with => 'table_append');
SELECT create_distributed_table('table_failing', 'id', 'append', 'table1_groupE');
SELECT create_distributed_table('table_failing', 'id', colocate_with => 'table_postgresql');
SELECT create_distributed_table('table_failing', 'id', colocate_with => 'no_table');
SELECT create_distributed_table('table_failing', 'id', colocate_with => '');
SELECT create_distributed_table('table_failing', 'id', colocate_with => NULL);
-- check with different distribution column types
CREATE TABLE table_bigint ( id bigint );
SELECT create_distributed_table('table_bigint', 'id', colocate_with => 'table1_groupE');
-- check worker table schemas -- check worker table schemas
\c - - - :worker_1_port \c - - - :worker_1_port
\d table3_groupE_1300050 \d table3_groupE_1300050
@ -259,6 +306,19 @@ SELECT * FROM pg_dist_colocation
WHERE colocationid >= 1 AND colocationid < 1000 WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY colocationid; ORDER BY colocationid;
-- test mark_colocation_group_default()
SELECT mark_colocation_group_default(7);
SELECT mark_colocation_group_default(8);
-- check metadata after mark_colocation_group_default() is run
SELECT * FROM pg_dist_colocation
WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY colocationid;
SELECT logicalrelid, colocationid FROM pg_dist_partition
WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY colocationid, logicalrelid;
-- cross check with internal colocation API -- cross check with internal colocation API
SELECT SELECT
p1.logicalrelid::regclass AS table1, p1.logicalrelid::regclass AS table1,
@ -310,7 +370,7 @@ SELECT * FROM pg_dist_colocation
SELECT logicalrelid, colocationid FROM pg_dist_partition SELECT logicalrelid, colocationid FROM pg_dist_partition
WHERE colocationid >= 1 AND colocationid < 1000 WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY logicalrelid; ORDER BY colocationid, logicalrelid;
-- first check failing cases -- first check failing cases
SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupC']); SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupC']);
@ -326,7 +386,7 @@ SELECT * FROM pg_dist_colocation
SELECT logicalrelid, colocationid FROM pg_dist_partition SELECT logicalrelid, colocationid FROM pg_dist_partition
WHERE colocationid >= 1 AND colocationid < 1000 WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY logicalrelid; ORDER BY colocationid, logicalrelid;
-- check successfully colocated tables -- check successfully colocated tables
SELECT mark_tables_colocated('table1_groupB', ARRAY['table2_groupB']); SELECT mark_tables_colocated('table1_groupB', ARRAY['table2_groupB']);

View File

@ -61,6 +61,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-18';
ALTER EXTENSION citus UPDATE TO '6.1-1'; ALTER EXTENSION citus UPDATE TO '6.1-1';
ALTER EXTENSION citus UPDATE TO '6.1-2'; ALTER EXTENSION citus UPDATE TO '6.1-2';
ALTER EXTENSION citus UPDATE TO '6.1-3'; ALTER EXTENSION citus UPDATE TO '6.1-3';
ALTER EXTENSION citus UPDATE TO '6.1-4';
-- ensure no objects were created outside pg_catalog -- ensure no objects were created outside pg_catalog
SELECT COUNT(*) SELECT COUNT(*)