Merge pull request #785 from citusdata/colocation_features

Internal co-location API
pull/818/head
Burak Yücesoy 2016-09-29 12:43:02 +03:00 committed by GitHub
commit e2f720dbe7
19 changed files with 1028 additions and 11 deletions

View File

@ -7,7 +7,7 @@ MODULE_big = citus
EXTENSION = citus
EXTVERSIONS = 5.0 5.0-1 5.0-2 \
5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \
5.2-1 5.2-2 5.2-3
5.2-1 5.2-2 5.2-3 5.2-4
# All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@ -55,6 +55,8 @@ $(EXTENSION)--5.2-2.sql: $(EXTENSION)--5.2-1.sql $(EXTENSION)--5.2-1--5.2-2.sql
cat $^ > $@
$(EXTENSION)--5.2-3.sql: $(EXTENSION)--5.2-2.sql $(EXTENSION)--5.2-2--5.2-3.sql
cat $^ > $@
$(EXTENSION)--5.2-4.sql: $(EXTENSION)--5.2-3.sql $(EXTENSION)--5.2-3--5.2-4.sql
cat $^ > $@
NO_PGXS = 1

View File

@ -0,0 +1,7 @@
/* citus--5.2-3--5.2-4.sql */
-- Add the co-location group id of each distributed table to pg_dist_partition.
-- Rows that already exist receive the default value 0, which the C code treats
-- as INVALID_COLOCATION_ID (the table is not part of any co-location group).
ALTER TABLE pg_dist_partition ADD COLUMN colocationid BIGINT DEFAULT 0 NOT NULL;
-- Index used when scanning pg_dist_partition for all tables that share a
-- co-location id.
CREATE INDEX pg_dist_partition_colocationid_index
ON pg_dist_partition using btree(colocationid);

View File

@ -1,6 +1,6 @@
# Citus extension
comment = 'Citus distributed database'
default_version = '5.2-3'
default_version = '5.2-4'
module_pathname = '$libdir/citus'
relocatable = false
schema = pg_catalog

View File

@ -27,6 +27,7 @@
#include "commands/defrem.h"
#include "commands/extension.h"
#include "commands/trigger.h"
#include "distributed/colocation_utils.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/metadata_cache.h"
#include "distributed/pg_dist_partition.h"
@ -287,6 +288,7 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
CharGetDatum(distributionMethod);
newValues[Anum_pg_dist_partition_partkey - 1] =
CStringGetTextDatum(distributionKeyString);
newValues[Anum_pg_dist_partition_colocationid - 1] = INVALID_COLOCATION_ID;
newTuple = heap_form_tuple(RelationGetDescr(pgDistPartition), newValues, newNulls);

View File

@ -0,0 +1,159 @@
/*-------------------------------------------------------------------------
*
* test/src/colocations_utils.c
*
* This file contains functions to test co-location functionality
* within Citus.
*
* Copyright (c) 2014-2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "fmgr.h"
#include "catalog/pg_type.h"
#include "distributed/colocation_utils.h"
#include "distributed/metadata_cache.h"
#include "distributed/test_helper_functions.h" /* IWYU pragma: keep */
/* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(get_table_colocation_id);
PG_FUNCTION_INFO_V1(tables_colocated);
PG_FUNCTION_INFO_V1(shards_colocated);
PG_FUNCTION_INFO_V1(get_colocated_table_array);
PG_FUNCTION_INFO_V1(get_colocated_shard_array);
PG_FUNCTION_INFO_V1(find_shard_interval_index);
/*
* get_table_colocation_id returns colocation id of given distributed table.
*/
Datum
get_table_colocation_id(PG_FUNCTION_ARGS)
{
Oid distributedTableId = PG_GETARG_OID(0);
int colocationId = TableColocationId(distributedTableId);
PG_RETURN_INT32(colocationId);
}
/*
 * tables_colocated is the SQL-callable wrapper around TablesColocated. It takes
 * the oids of two tables and returns true when TablesColocated reports them as
 * co-located, false otherwise.
 */
Datum
tables_colocated(PG_FUNCTION_ARGS)
{
	Oid leftTableId = PG_GETARG_OID(0);
	Oid rightTableId = PG_GETARG_OID(1);

	PG_RETURN_BOOL(TablesColocated(leftTableId, rightTableId));
}
/*
* shards_colocated checks if given two shards are co-located or not. If they are
* co-located, this function returns true.
*/
Datum
shards_colocated(PG_FUNCTION_ARGS)
{
uint32 leftShardId = PG_GETARG_UINT32(0);
uint32 rightShardId = PG_GETARG_UINT32(1);
ShardInterval *leftShard = LoadShardInterval(leftShardId);
ShardInterval *rightShard = LoadShardInterval(rightShardId);
bool shardsColocated = ShardsColocated(leftShard, rightShard);
PG_RETURN_BOOL(shardsColocated);
}
/*
 * get_colocated_table_array returns an oid array holding the tables that
 * ColocatedTableList reports as co-located with the given distributed table.
 * The array elements appear in the same order as in that list.
 */
Datum
get_colocated_table_array(PG_FUNCTION_ARGS)
{
	Oid relationId = PG_GETARG_OID(0);
	ArrayType *resultArray = NULL;
	List *tableIdList = ColocatedTableList(relationId);
	ListCell *tableIdCell = NULL;
	int tableCount = list_length(tableIdList);
	Datum *tableIdDatums = palloc0(tableCount * sizeof(Datum));
	int nextSlot = 0;

	/* copy every table oid of the list into the datum array */
	foreach(tableIdCell, tableIdList)
	{
		tableIdDatums[nextSlot] = ObjectIdGetDatum(lfirst_oid(tableIdCell));
		nextSlot++;
	}

	resultArray = DatumArrayToArrayType(tableIdDatums, tableCount, OIDOID);

	PG_RETURN_ARRAYTYPE_P(resultArray);
}
/*
* get_colocated_shards_array returns array of shards ids which are co-located with given
* shard.
*/
Datum
get_colocated_shard_array(PG_FUNCTION_ARGS)
{
uint32 shardId = PG_GETARG_UINT32(0);
ShardInterval *shardInterval = LoadShardInterval(shardId);
ArrayType *colocatedShardsArrayType = NULL;
List *colocatedShardList = ColocatedShardPlacementList(shardInterval);
ListCell *colocatedShardCell = NULL;
int colocatedShardCount = list_length(colocatedShardList);
Datum *colocatedShardsDatumArray = palloc0(colocatedShardCount * sizeof(Datum));
Oid arrayTypeId = OIDOID;
int colocatedShardIndex = 0;
foreach(colocatedShardCell, colocatedShardList)
{
ShardInterval *colocatedShardInterval = (ShardInterval *) lfirst(
colocatedShardCell);
uint64 colocatedShardId = colocatedShardInterval->shardId;
Datum colocatedShardDatum = Int64GetDatum(colocatedShardId);
colocatedShardsDatumArray[colocatedShardIndex] = colocatedShardDatum;
colocatedShardIndex++;
}
colocatedShardsArrayType = DatumArrayToArrayType(colocatedShardsDatumArray,
colocatedShardCount, arrayTypeId);
PG_RETURN_ARRAYTYPE_P(colocatedShardsArrayType);
}
/*
* find_shard_interval_index finds index of given shard in sorted shard interval list.
*/
Datum
find_shard_interval_index(PG_FUNCTION_ARGS)
{
uint32 shardId = PG_GETARG_UINT32(0);
ShardInterval *shardInterval = LoadShardInterval(shardId);
uint32 shardIndex = FindShardIntervalIndex(shardInterval);
PG_RETURN_INT32(shardIndex);
}

View File

@ -0,0 +1,221 @@
/*-------------------------------------------------------------------------
*
* colocation_utils.c
*
* This file contains functions to perform useful operations on co-located tables.
*
* Copyright (c) 2014-2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/htup_details.h"
#include "distributed/colocation_utils.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/shardinterval_utils.h"
#include "utils/fmgroids.h"
#include "utils/rel.h"
/*
 * TableColocationId reads the co-location id of the given table from its
 * distributed table cache entry and returns it. The cache lookup is done by
 * DistributedTableCacheEntry, which is relied upon to error out when the given
 * table is not distributed.
 */
uint64
TableColocationId(Oid distributedTableId)
{
	return DistributedTableCacheEntry(distributedTableId)->colocationId;
}
/*
 * TablesColocated tells whether the two given tables are co-located.
 *
 * A table is always co-located with itself; in that case no metadata lookup is
 * done at all. For two different tables the co-location ids of both tables are
 * looked up via TableColocationId (which is relied upon to error out for tables
 * that are not distributed). The tables are co-located only if both ids are
 * valid and equal to each other.
 */
bool
TablesColocated(Oid leftDistributedTableId, Oid rightDistributedTableId)
{
	uint64 leftId = INVALID_COLOCATION_ID;
	uint64 rightId = INVALID_COLOCATION_ID;
	bool bothIdsValid = false;

	if (leftDistributedTableId == rightDistributedTableId)
	{
		return true;
	}

	leftId = TableColocationId(leftDistributedTableId);
	rightId = TableColocationId(rightDistributedTableId);

	/* a table without a co-location group is not co-located with other tables */
	bothIdsValid = (leftId != INVALID_COLOCATION_ID &&
					rightId != INVALID_COLOCATION_ID);

	return bothIdsValid && leftId == rightId;
}
/*
 * ShardsColocated function checks whether given two shards are co-located and
 * returns true if they are co-located. Two shards are co-located either;
 * - They are same (A shard is always co-located with itself).
 * OR
 * - Tables are hash partitioned.
 * - Tables containing the shards are co-located.
 * - Min/Max values of the shards are same.
 */
bool
ShardsColocated(ShardInterval *leftShardInterval, ShardInterval *rightShardInterval)
{
	bool tablesColocated = false;

	/*
	 * A shard is always co-located with itself. We decide that on the shard id
	 * instead of falling through to the min/max comparison below, because
	 * TablesColocated also returns true when both shards belong to the same
	 * table regardless of its partition method, and for such a table the
	 * min/max datums are not guaranteed to be int32 values.
	 */
	if (leftShardInterval->shardId == rightShardInterval->shardId)
	{
		return true;
	}

	tablesColocated = TablesColocated(leftShardInterval->relationId,
									  rightShardInterval->relationId);
	if (tablesColocated)
	{
		/*
		 * We do a min/max value check here to decide whether two shards are
		 * co-located. Alternatively we could call FindShardIntervalIndex on both
		 * shards and compare the indexes, but comparing the values directly is
		 * cheaper.
		 *
		 * Two different co-located tables are expected to be hash partitioned,
		 * therefore the min/max values are read with DatumGetInt32 here.
		 */
		int32 leftShardMinValue = DatumGetInt32(leftShardInterval->minValue);
		int32 leftShardMaxValue = DatumGetInt32(leftShardInterval->maxValue);
		int32 rightShardMinValue = DatumGetInt32(rightShardInterval->minValue);
		int32 rightShardMaxValue = DatumGetInt32(rightShardInterval->maxValue);

		bool minValuesEqual = leftShardMinValue == rightShardMinValue;
		bool maxValuesEqual = leftShardMaxValue == rightShardMaxValue;

		return minValuesEqual && maxValuesEqual;
	}

	return false;
}
/*
 * ColocatedTableList function returns list of relation ids which are co-located
 * with given table. If the given table does not have a valid co-location id, it
 * is only co-located with itself and the returned list contains just that table.
 * Otherwise pg_dist_partition is scanned through its colocationid index and the
 * logicalrelid of every row with the same co-location id is returned.
 */
List *
ColocatedTableList(Oid distributedTableId)
{
	/* co-location ids are 64-bit (BIGINT column), do not narrow them */
	uint64 tableColocationId = TableColocationId(distributedTableId);
	List *colocatedTableList = NIL;

	Relation pgDistPartition = NULL;
	TupleDesc tupleDescriptor = NULL;
	SysScanDesc scanDescriptor = NULL;
	HeapTuple heapTuple = NULL;
	bool indexOK = true;
	int scanKeyCount = 1;
	ScanKeyData scanKey[1];

	/*
	 * If the table is not part of any co-location group, the table is only
	 * co-located with itself.
	 */
	if (tableColocationId == INVALID_COLOCATION_ID)
	{
		colocatedTableList = lappend_oid(colocatedTableList, distributedTableId);
		return colocatedTableList;
	}

	/* the scan key compares with F_INT8EQ, so it has to carry an int8 datum */
	ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_colocationid,
				BTEqualStrategyNumber, F_INT8EQ,
				Int64GetDatum((int64) tableColocationId));

	pgDistPartition = heap_open(DistPartitionRelationId(), AccessShareLock);
	tupleDescriptor = RelationGetDescr(pgDistPartition);
	scanDescriptor = systable_beginscan(pgDistPartition,
										DistPartitionColocationidIndexId(),
										indexOK, NULL, scanKeyCount, scanKey);

	heapTuple = systable_getnext(scanDescriptor);
	while (HeapTupleIsValid(heapTuple))
	{
		bool isNull = false;
		Datum colocatedTableDatum = heap_getattr(heapTuple,
												 Anum_pg_dist_partition_logicalrelid,
												 tupleDescriptor, &isNull);
		Oid colocatedTableId = DatumGetObjectId(colocatedTableDatum);

		colocatedTableList = lappend_oid(colocatedTableList, colocatedTableId);
		heapTuple = systable_getnext(scanDescriptor);
	}

	systable_endscan(scanDescriptor);
	heap_close(pgDistPartition, AccessShareLock);

	return colocatedTableList;
}
/*
 * ColocatedShardPlacementList returns the list of shard intervals which are
 * co-located with the given shard.
 *
 * When the table of the given shard is not hash partitioned, co-location does
 * not apply and the result only contains the given shard itself. Otherwise the
 * position of the shard within its table is determined, and the shard interval
 * at the same position of every co-located table's sorted shard interval array
 * is collected.
 */
List *
ColocatedShardPlacementList(ShardInterval *shardInterval)
{
	Oid sourceTableId = shardInterval->relationId;
	DistTableCacheEntry *sourceCacheEntry = DistributedTableCacheEntry(sourceTableId);
	char sourcePartitionMethod = sourceCacheEntry->partitionMethod;
	List *colocatedTableIdList = NIL;
	List *resultList = NIL;
	ListCell *tableIdCell = NULL;
	int shardPosition = -1;

	/* shards of non-hash partitioned tables are only co-located with themselves */
	if (sourcePartitionMethod != DISTRIBUTE_BY_HASH)
	{
		return list_make1(shardInterval);
	}

	shardPosition = FindShardIntervalIndex(shardInterval);
	colocatedTableIdList = ColocatedTableList(sourceTableId);

	/* the position of the given shard must have been found */
	Assert(shardPosition >= 0);

	foreach(tableIdCell, colocatedTableIdList)
	{
		Oid peerTableId = lfirst_oid(tableIdCell);
		DistTableCacheEntry *peerCacheEntry = DistributedTableCacheEntry(peerTableId);

		/*
		 * Co-located tables are expected to have the same number of shards, so
		 * shardPosition is expected to be a valid index for each of them.
		 */
		Assert(sourceCacheEntry->shardIntervalArrayLength ==
			   peerCacheEntry->shardIntervalArrayLength);

		resultList = lappend(resultList,
							 peerCacheEntry->sortedShardIntervalArray[shardPosition]);
	}

	Assert(list_length(colocatedTableIdList) == list_length(resultList));

	return resultList;
}

View File

@ -23,6 +23,7 @@
#include "catalog/pg_type.h"
#include "commands/extension.h"
#include "commands/trigger.h"
#include "distributed/colocation_utils.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/metadata_cache.h"
#include "distributed/pg_dist_partition.h"
@ -50,6 +51,7 @@ static Oid distShardRelationId = InvalidOid;
static Oid distShardPlacementRelationId = InvalidOid;
static Oid distPartitionRelationId = InvalidOid;
static Oid distPartitionLogicalRelidIndexId = InvalidOid;
static Oid distPartitionColocationidIndexId = InvalidOid;
static Oid distShardLogicalRelidIndexId = InvalidOid;
static Oid distShardShardidIndexId = InvalidOid;
static Oid distShardPlacementShardidIndexId = InvalidOid;
@ -218,6 +220,7 @@ LookupDistTableCacheEntry(Oid relationId)
HeapTuple distPartitionTuple = NULL;
char *partitionKeyString = NULL;
char partitionMethod = 0;
uint64 colocationId = INVALID_COLOCATION_ID;
List *distShardTupleList = NIL;
int shardIntervalArrayLength = 0;
ShardInterval **shardIntervalArray = NULL;
@ -256,14 +259,23 @@ LookupDistTableCacheEntry(Oid relationId)
(Form_pg_dist_partition) GETSTRUCT(distPartitionTuple);
Datum partitionKeyDatum = 0;
MemoryContext oldContext = NULL;
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
bool isNull = false;
partitionKeyDatum = heap_getattr(distPartitionTuple,
Anum_pg_dist_partition_partkey,
RelationGetDescr(pgDistPartition),
tupleDescriptor,
&isNull);
Assert(!isNull);
colocationId = heap_getattr(distPartitionTuple,
Anum_pg_dist_partition_colocationid, tupleDescriptor,
&isNull);
if (isNull)
{
colocationId = INVALID_COLOCATION_ID;
}
oldContext = MemoryContextSwitchTo(CacheMemoryContext);
partitionKeyString = TextDatumGetCString(partitionKeyDatum);
partitionMethod = partitionForm->partmethod;
@ -378,6 +390,7 @@ LookupDistTableCacheEntry(Oid relationId)
cacheEntry->isDistributedTable = true;
cacheEntry->partitionKeyString = partitionKeyString;
cacheEntry->partitionMethod = partitionMethod;
cacheEntry->colocationId = colocationId;
cacheEntry->shardIntervalArrayLength = shardIntervalArrayLength;
cacheEntry->sortedShardIntervalArray = sortedShardIntervalArray;
cacheEntry->shardIntervalCompareFunction = shardIntervalCompareFunction;
@ -614,6 +627,17 @@ DistPartitionLogicalRelidIndexId(void)
}
/*
 * DistPartitionColocationidIndexId returns the oid of the
 * pg_dist_partition_colocationid_index index. The oid is resolved through
 * CachedRelationLookup, which is handed the address of the file-level
 * distPartitionColocationidIndexId variable to store the result in.
 */
Oid
DistPartitionColocationidIndexId(void)
{
	Oid *cachedIndexId = &distPartitionColocationidIndexId;

	CachedRelationLookup("pg_dist_partition_colocationid_index", cachedIndexId);

	return *cachedIndexId;
}
/* return oid of pg_dist_shard_logical_relid_index index */
Oid
DistShardLogicalRelidIndexId(void)
@ -1013,6 +1037,7 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId)
distShardPlacementRelationId = InvalidOid;
distPartitionRelationId = InvalidOid;
distPartitionLogicalRelidIndexId = InvalidOid;
distPartitionColocationidIndexId = InvalidOid;
distShardLogicalRelidIndexId = InvalidOid;
distShardShardidIndexId = InvalidOid;
distShardPlacementShardidIndexId = InvalidOid;

View File

@ -14,6 +14,7 @@
#include "catalog/pg_am.h"
#include "catalog/pg_collation.h"
#include "catalog/pg_type.h"
#include "distributed/metadata_cache.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/worker_protocol.h"
@ -106,6 +107,56 @@ CompareShardIntervalsById(const void *leftElement, const void *rightElement)
}
/*
 * FindShardIntervalIndex finds index of given shard in sorted shard interval array. For
 * this purpose, it takes the min value of the shard's hash range and calculates
 * which slot of the evenly divided hash token space that value falls into.
 * Therefore this function only works for hash distributed tables and errors out
 * for tables with any other partition method.
 */
int
FindShardIntervalIndex(ShardInterval *shardInterval)
{
	Oid distributedTableId = shardInterval->relationId;
	DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
	char partitionMethod = cacheEntry->partitionMethod;
	int shardCount = 0;
	int32 shardMinValue = 0;
	uint32 hashRangeOffset = 0;
	uint64 hashTokenIncrement = 0;
	int shardIndex = -1;

	/*
	 * We can support it for other types of partitioned tables with simple binary scan
	 * but it is not necessary at the moment. If we need that simply check algorithm in
	 * FindShardInterval and SearchCachedShardInterval.
	 */
	if (partitionMethod != DISTRIBUTE_BY_HASH)
	{
		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
						errmsg("finding index of given shard is not supported for "
							   "non-hash partitioned tables")));
	}

	shardCount = cacheEntry->shardIntervalArrayLength;
	shardMinValue = DatumGetInt32(shardInterval->minValue);
	hashTokenIncrement = HASH_TOKEN_COUNT / shardCount;

	/*
	 * Distance of the min value from the start of the hash range. The
	 * subtraction is done on unsigned values: the signed form
	 * (shardMinValue - INT32_MIN) overflows for every non-negative min value,
	 * whereas unsigned arithmetic wraps around and yields the intended offset.
	 */
	hashRangeOffset = (uint32) shardMinValue - (uint32) INT32_MIN;
	shardIndex = hashRangeOffset / hashTokenIncrement;

	Assert(shardIndex <= shardCount);

	/*
	 * If the hash token space is not evenly divisible by the shard count, the
	 * range of the last shard becomes larger than others. For that extra piece
	 * of range, we still need to use the last shard.
	 */
	if (shardIndex == shardCount)
	{
		shardIndex = shardCount - 1;
	}

	return shardIndex;
}
/*
* FindShardInterval finds a single shard interval in the cache for the
* given partition column value.

View File

@ -0,0 +1,27 @@
/*-------------------------------------------------------------------------
*
* colocation_utils.h
*
* Declarations for public utility functions related to co-located tables.
*
* Copyright (c) 2014-2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef COLOCATION_UTILS_H_
#define COLOCATION_UTILS_H_
#include "distributed/shardinterval_utils.h"
#include "nodes/pg_list.h"
#define INVALID_COLOCATION_ID 0
extern uint64 TableColocationId(Oid distributedTableId);
extern bool TablesColocated(Oid leftDistributedTableId, Oid rightDistributedTableId);
extern bool ShardsColocated(ShardInterval *leftShardInterval,
ShardInterval *rightShardInterval);
extern List * ColocatedTableList(Oid distributedTableId);
extern List * ColocatedShardPlacementList(ShardInterval *shardInterval);
#endif /* COLOCATION_UTILS_H_ */

View File

@ -38,6 +38,7 @@ typedef struct
/* pg_dist_partition metadata for this table */
char *partitionKeyString;
char partitionMethod;
uint64 colocationId;
/* pg_dist_shard metadata (variable-length ShardInterval array) for this table */
int shardIntervalArrayLength;
@ -62,6 +63,7 @@ extern Oid DistShardPlacementRelationId(void);
/* index oids */
extern Oid DistPartitionLogicalRelidIndexId(void);
extern Oid DistPartitionColocationidIndexId(void);
extern Oid DistShardLogicalRelidIndexId(void);
extern Oid DistShardShardidIndexId(void);
extern Oid DistShardPlacementShardidIndexId(void);

View File

@ -21,11 +21,12 @@
*/
typedef struct FormData_pg_dist_partition
{
Oid logicalrelid; /* logical relation id; references pg_class oid */
char partmethod; /* partition method; see codes below */
#ifdef CATALOG_VARLEN /* variable-length fields start here */
text partkey; /* partition key expression */
Oid logicalrelid; /* logical relation id; references pg_class oid */
char partmethod; /* partition method; see codes below */
#ifdef CATALOG_VARLEN /* variable-length fields start here */
text partkey; /* partition key expression */
#endif
uint64 colocationid; /* id of the co-location group of particular table belongs to */
} FormData_pg_dist_partition;
/* ----------------
@ -39,10 +40,11 @@ typedef FormData_pg_dist_partition *Form_pg_dist_partition;
* compiler constants for pg_dist_partitions
* ----------------
*/
#define Natts_pg_dist_partition 3
#define Natts_pg_dist_partition 4
#define Anum_pg_dist_partition_logicalrelid 1
#define Anum_pg_dist_partition_partmethod 2
#define Anum_pg_dist_partition_partkey 3
#define Anum_pg_dist_partition_colocationid 4
/* valid values for partmethod include append, hash, and range */
#define DISTRIBUTE_BY_APPEND 'a'

View File

@ -26,6 +26,7 @@ typedef struct ShardIntervalCompareFunctionCacheEntry
extern int CompareShardIntervals(const void *leftElement, const void *rightElement,
FmgrInfo *typeCompareFunction);
extern int CompareShardIntervalsById(const void *leftElement, const void *rightElement);
extern int FindShardIntervalIndex(ShardInterval *shardInterval);
extern ShardInterval * FindShardInterval(Datum partitionColumnValue,
ShardInterval **shardIntervalCache,
int shardCount, char partitionMethod,

View File

@ -0,0 +1,345 @@
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1300000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1300000;
-- ===================================================================
-- create test utility function
-- ===================================================================
CREATE SEQUENCE colocation_test_seq
MINVALUE 1
NO CYCLE;
/* a very simple UDF that only sets the colocation ids the same
* DO NOT USE THIS FUNCTION IN PRODUCTION. It manually sets colocationid column of
* pg_dist_partition and it does not check anything about pyshical state about shards.
*/
CREATE OR REPLACE FUNCTION colocation_test_colocate_tables(source_table regclass, target_table regclass)
RETURNS BOOL
LANGUAGE plpgsql
AS $colocate_tables$
DECLARE nextid BIGINT;
BEGIN
SELECT nextval('colocation_test_seq') INTO nextid;
UPDATE pg_dist_partition SET colocationId = nextid
WHERE logicalrelid IN
(
(SELECT p1.logicalrelid
FROM pg_dist_partition p1, pg_dist_partition p2
WHERE
p2.logicalrelid = source_table AND
(p1.logicalrelid = source_table OR
(p1.colocationId = p2.colocationId AND p1.colocationId != 0)))
UNION
(SELECT target_table)
);
RETURN TRUE;
END;
$colocate_tables$;
-- ===================================================================
-- create test functions
-- ===================================================================
CREATE FUNCTION get_table_colocation_id(regclass)
RETURNS BIGINT
AS 'citus'
LANGUAGE C STRICT;
CREATE FUNCTION tables_colocated(regclass, regclass)
RETURNS bool
AS 'citus'
LANGUAGE C;
CREATE FUNCTION shards_colocated(bigint, bigint)
RETURNS bool
AS 'citus'
LANGUAGE C STRICT;
CREATE FUNCTION get_colocated_table_array(regclass)
RETURNS regclass[]
AS 'citus'
LANGUAGE C STRICT;
CREATE FUNCTION get_colocated_shard_array(bigint)
RETURNS BIGINT[]
AS 'citus'
LANGUAGE C STRICT;
CREATE FUNCTION find_shard_interval_index(bigint)
RETURNS int
AS 'citus'
LANGUAGE C STRICT;
-- ===================================================================
-- test co-location util functions
-- ===================================================================
-- create distributed table observe shard pruning
CREATE TABLE table1_group1 ( id int );
SELECT master_create_distributed_table('table1_group1', 'id', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_worker_shards('table1_group1', 4, 1);
master_create_worker_shards
-----------------------------
(1 row)
CREATE TABLE table2_group1 ( id int );
SELECT master_create_distributed_table('table2_group1', 'id', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_worker_shards('table2_group1', 4, 1);
master_create_worker_shards
-----------------------------
(1 row)
CREATE TABLE table3_group2 ( id int );
SELECT master_create_distributed_table('table3_group2', 'id', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_worker_shards('table3_group2', 4, 1);
master_create_worker_shards
-----------------------------
(1 row)
CREATE TABLE table4_group2 ( id int );
SELECT master_create_distributed_table('table4_group2', 'id', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_worker_shards('table4_group2', 4, 1);
master_create_worker_shards
-----------------------------
(1 row)
CREATE TABLE table5_groupX ( id int );
SELECT master_create_distributed_table('table5_groupX', 'id', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_worker_shards('table5_groupX', 4, 1);
master_create_worker_shards
-----------------------------
(1 row)
CREATE TABLE table6_append ( id int );
SELECT master_create_distributed_table('table6_append', 'id', 'append');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_empty_shard('table6_append');
master_create_empty_shard
---------------------------
1300020
(1 row)
SELECT master_create_empty_shard('table6_append');
master_create_empty_shard
---------------------------
1300021
(1 row)
-- make table1_group1 and table2_group1 co-located manually
SELECT colocation_test_colocate_tables('table1_group1', 'table2_group1');
colocation_test_colocate_tables
---------------------------------
t
(1 row)
-- check co-location id
SELECT get_table_colocation_id('table1_group1');
get_table_colocation_id
-------------------------
1
(1 row)
SELECT get_table_colocation_id('table5_groupX');
get_table_colocation_id
-------------------------
0
(1 row)
SELECT get_table_colocation_id('table6_append');
get_table_colocation_id
-------------------------
0
(1 row)
-- check self table co-location
SELECT tables_colocated('table1_group1', 'table1_group1');
tables_colocated
------------------
t
(1 row)
SELECT tables_colocated('table5_groupX', 'table5_groupX');
tables_colocated
------------------
t
(1 row)
SELECT tables_colocated('table6_append', 'table6_append');
tables_colocated
------------------
t
(1 row)
-- check table co-location with same co-location group
SELECT tables_colocated('table1_group1', 'table2_group1');
tables_colocated
------------------
t
(1 row)
-- check table co-location with different co-location group
SELECT tables_colocated('table1_group1', 'table3_group2');
tables_colocated
------------------
f
(1 row)
-- check table co-location with invalid co-location group
SELECT tables_colocated('table1_group1', 'table5_groupX');
tables_colocated
------------------
f
(1 row)
SELECT tables_colocated('table1_group1', 'table6_append');
tables_colocated
------------------
f
(1 row)
-- check self shard co-location
SELECT shards_colocated(1300000, 1300000);
shards_colocated
------------------
t
(1 row)
SELECT shards_colocated(1300016, 1300016);
shards_colocated
------------------
t
(1 row)
SELECT shards_colocated(1300020, 1300020);
shards_colocated
------------------
t
(1 row)
-- check shard co-location with same co-location group
SELECT shards_colocated(1300000, 1300004);
shards_colocated
------------------
t
(1 row)
-- check shard co-location with same table different co-location group
SELECT shards_colocated(1300000, 1300001);
shards_colocated
------------------
f
(1 row)
-- check shard co-location with different co-location group
SELECT shards_colocated(1300000, 1300005);
shards_colocated
------------------
f
(1 row)
-- check shard co-location with invalid co-location group
SELECT shards_colocated(1300000, 1300016);
shards_colocated
------------------
f
(1 row)
SELECT shards_colocated(1300000, 1300020);
shards_colocated
------------------
f
(1 row)
-- check co-located table list
SELECT UNNEST(get_colocated_table_array('table1_group1'))::regclass;
unnest
---------------
table2_group1
table1_group1
(2 rows)
SELECT UNNEST(get_colocated_table_array('table5_groupX'))::regclass;
unnest
---------------
table5_groupx
(1 row)
SELECT UNNEST(get_colocated_table_array('table6_append'))::regclass;
unnest
---------------
table6_append
(1 row)
-- check co-located shard list
SELECT get_colocated_shard_array(1300000);
get_colocated_shard_array
---------------------------
{1300004,1300000}
(1 row)
SELECT get_colocated_shard_array(1300016);
get_colocated_shard_array
---------------------------
{1300016}
(1 row)
SELECT get_colocated_shard_array(1300020);
get_colocated_shard_array
---------------------------
{1300020}
(1 row)
-- check FindShardIntervalIndex function
SELECT find_shard_interval_index(1300000);
find_shard_interval_index
---------------------------
0
(1 row)
SELECT find_shard_interval_index(1300001);
find_shard_interval_index
---------------------------
1
(1 row)
SELECT find_shard_interval_index(1300002);
find_shard_interval_index
---------------------------
2
(1 row)
SELECT find_shard_interval_index(1300003);
find_shard_interval_index
---------------------------
3
(1 row)
SELECT find_shard_interval_index(1300016);
find_shard_interval_index
---------------------------
0
(1 row)

View File

@ -23,6 +23,9 @@ ALTER EXTENSION citus UPDATE TO '5.1-6';
ALTER EXTENSION citus UPDATE TO '5.1-7';
ALTER EXTENSION citus UPDATE TO '5.1-8';
ALTER EXTENSION citus UPDATE TO '5.2-1';
ALTER EXTENSION citus UPDATE TO '5.2-2';
ALTER EXTENSION citus UPDATE TO '5.2-3';
ALTER EXTENSION citus UPDATE TO '5.2-4';
-- drop extension an re-create in newest version
DROP EXTENSION citus;
\c

View File

@ -1797,7 +1797,7 @@ DROP MATERIALIZED VIEW mv_articles_hash;
DEBUG: drop auto-cascades to type mv_articles_hash
DEBUG: drop auto-cascades to type mv_articles_hash[]
DEBUG: drop auto-cascades to rule _RETURN on materialized view mv_articles_hash
DEBUG: EventTriggerInvoke 16733
DEBUG: EventTriggerInvoke 16761
CREATE MATERIALIZED VIEW mv_articles_hash_error AS
SELECT * FROM articles_hash WHERE author_id in (1,2);
NOTICE: cannot use shard pruning with ANY/ALL (array expression)

View File

@ -48,8 +48,8 @@ SELECT 1 FROM master_create_empty_shard('testtableddl');
DROP TABLE testtableddl;
-- ensure no metadata of distributed tables are remaining
SELECT * FROM pg_dist_partition;
logicalrelid | partmethod | partkey
--------------+------------+---------
logicalrelid | partmethod | partkey | colocationid
--------------+------------+---------+--------------
(0 rows)
SELECT * FROM pg_dist_shard;

View File

@ -165,7 +165,13 @@ test: multi_function_evaluation
# multi_truncate tests truncate functionality for distributed tables
# ----------
test: multi_truncate
# ----------
# multi_expire_table_cache tests for broadcast tables
# ----------
test: multi_expire_table_cache
# ----------
# multi_colocation_utils tests utility functions written for co-location feature & internal API
# ----------
test: multi_colocation_utils

View File

@ -0,0 +1,161 @@
-- Restart the shard id and job id sequences at a fixed value. The shard-level
-- queries later in this script pass literal shard ids starting at 1300000, so
-- they depend on this restart.
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1300000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1300000;
-- ===================================================================
-- create test utility function
-- ===================================================================
-- Sequence that colocation_test_colocate_tables() draws new colocation ids
-- from. MINVALUE 1 keeps it from ever producing 0, the value that helper
-- treats as "not co-located".
CREATE SEQUENCE colocation_test_seq
MINVALUE 1
NO CYCLE;
/*
 * colocation_test_colocate_tables is a deliberately simple, test-only helper
 * that gives target_table the same colocation id as source_table.
 *
 * DO NOT USE THIS FUNCTION IN PRODUCTION. It rewrites the colocationid column
 * of pg_dist_partition by hand and does not check anything about the physical
 * state of the shards.
 *
 * A fresh id is taken from colocation_test_seq and written to the rows of
 * source_table, of every table that currently shares a non-zero colocation id
 * with source_table, and of target_table. The function always returns TRUE.
 */
CREATE OR REPLACE FUNCTION colocation_test_colocate_tables(source_table regclass, target_table regclass)
    RETURNS BOOL
    LANGUAGE plpgsql
AS $colocate_tables$
DECLARE
    new_colocation_id BIGINT := nextval('colocation_test_seq');
BEGIN
    UPDATE pg_dist_partition AS dist
    SET colocationId = new_colocation_id
    WHERE dist.logicalrelid = source_table
       OR dist.logicalrelid = target_table
       -- tables already grouped with source_table; id 0 means "no group"
       OR (dist.colocationId != 0 AND
           dist.colocationId IN (SELECT src.colocationId
                                 FROM pg_dist_partition src
                                 WHERE src.logicalrelid = source_table));

    RETURN TRUE;
END;
$colocate_tables$;
-- ===================================================================
-- create test functions
-- ===================================================================
-- SQL binding for a C function in the citus library; no link symbol is given,
-- so PostgreSQL looks up a symbol named like the SQL function.
-- STRICT: a NULL argument yields NULL without calling the C code.
CREATE FUNCTION get_table_colocation_id(regclass)
    RETURNS bigint
    LANGUAGE C STRICT
    AS 'citus';
-- SQL binding for the C test hook in the citus library that the checks in this
-- script use to compare two distributed tables. Declared STRICT like every
-- other binding in this file, so a NULL argument yields NULL instead of being
-- handed to the C code.
CREATE FUNCTION tables_colocated(regclass, regclass)
    RETURNS bool
    AS 'citus'
    LANGUAGE C STRICT;
-- SQL binding for a C function in the citus library; takes two shard ids.
-- STRICT: a NULL argument yields NULL without calling the C code.
CREATE FUNCTION shards_colocated(bigint, bigint)
    RETURNS bool
    LANGUAGE C STRICT
    AS 'citus';
-- SQL binding for a C function in the citus library; returns an array of
-- relations for the given table.
-- STRICT: a NULL argument yields NULL without calling the C code.
CREATE FUNCTION get_colocated_table_array(regclass)
    RETURNS regclass[]
    LANGUAGE C STRICT
    AS 'citus';
-- SQL binding for a C function in the citus library; returns an array of
-- bigint values for the given shard id.
-- STRICT: a NULL argument yields NULL without calling the C code.
CREATE FUNCTION get_colocated_shard_array(bigint)
    RETURNS bigint[]
    LANGUAGE C STRICT
    AS 'citus';
-- SQL binding for a C function in the citus library; takes a shard id and
-- returns an integer index.
-- STRICT: a NULL argument yields NULL without calling the C code.
CREATE FUNCTION find_shard_interval_index(bigint)
    RETURNS integer
    LANGUAGE C STRICT
    AS 'citus';
-- ===================================================================
-- test co-location util functions
-- ===================================================================
-- create the distributed tables used by the checks below: five hash
-- distributed tables (master_create_worker_shards called with 4, 1) and one
-- append distributed table with two empty shards
--
-- NOTE(review): assuming shard ids are handed out in creation order from the
-- sequence restarted at the top of this script, the literal ids used below
-- map to:
--   1300000-1300003 table1_group1    1300004-1300007 table2_group1
--   1300008-1300011 table3_group2    1300012-1300015 table4_group2
--   1300016-1300019 table5_groupX    1300020-1300021 table6_append
CREATE TABLE table1_group1 ( id int );
SELECT master_create_distributed_table('table1_group1', 'id', 'hash');
SELECT master_create_worker_shards('table1_group1', 4, 1);
CREATE TABLE table2_group1 ( id int );
SELECT master_create_distributed_table('table2_group1', 'id', 'hash');
SELECT master_create_worker_shards('table2_group1', 4, 1);
CREATE TABLE table3_group2 ( id int );
SELECT master_create_distributed_table('table3_group2', 'id', 'hash');
SELECT master_create_worker_shards('table3_group2', 4, 1);
CREATE TABLE table4_group2 ( id int );
SELECT master_create_distributed_table('table4_group2', 'id', 'hash');
SELECT master_create_worker_shards('table4_group2', 4, 1);
CREATE TABLE table5_groupX ( id int );
SELECT master_create_distributed_table('table5_groupX', 'id', 'hash');
SELECT master_create_worker_shards('table5_groupX', 4, 1);
CREATE TABLE table6_append ( id int );
SELECT master_create_distributed_table('table6_append', 'id', 'append');
SELECT master_create_empty_shard('table6_append');
SELECT master_create_empty_shard('table6_append');
-- make table1_group1 and table2_group1 co-located manually
-- NOTE(review): this is the only grouping call in the script; despite their
-- names, table3_group2 and table4_group2 are never put into a group here and
-- so keep the colocation id they were created with.
SELECT colocation_test_colocate_tables('table1_group1', 'table2_group1');
-- check co-location id: a manually grouped table, an ungrouped hash table and
-- an append table
SELECT get_table_colocation_id('table1_group1');
SELECT get_table_colocation_id('table5_groupX');
SELECT get_table_colocation_id('table6_append');
-- check self table co-location (each table compared with itself)
SELECT tables_colocated('table1_group1', 'table1_group1');
SELECT tables_colocated('table5_groupX', 'table5_groupX');
SELECT tables_colocated('table6_append', 'table6_append');
-- check table co-location with same co-location group
SELECT tables_colocated('table1_group1', 'table2_group1');
-- check table co-location with different co-location group
-- NOTE(review): table3_group2 was never grouped above, so this compares a
-- grouped table with an ungrouped one rather than two distinct groups.
SELECT tables_colocated('table1_group1', 'table3_group2');
-- check table co-location with invalid co-location group (tables that were
-- never grouped)
SELECT tables_colocated('table1_group1', 'table5_groupX');
SELECT tables_colocated('table1_group1', 'table6_append');
-- check self shard co-location (each shard compared with itself)
SELECT shards_colocated(1300000, 1300000);
SELECT shards_colocated(1300016, 1300016);
SELECT shards_colocated(1300020, 1300020);
-- check shard co-location with same co-location group
-- (presumably the first shard of table1_group1 vs the first shard of
-- table2_group1; see the id map above)
SELECT shards_colocated(1300000, 1300004);
-- check shard co-location within the same table
-- NOTE(review): under the id map above both ids belong to table1_group1, so
-- this exercises "same table, different shard" rather than a different
-- co-location group.
SELECT shards_colocated(1300000, 1300001);
-- check shard co-location across tables at different shard positions
-- NOTE(review): under the id map above 1300005 is the second shard of
-- table2_group1, i.e. the same co-location group as 1300000 but a different
-- shard position - not a different co-location group. Confirm the intent.
SELECT shards_colocated(1300000, 1300005);
-- check shard co-location with invalid co-location group (shards of tables
-- that were never grouped)
SELECT shards_colocated(1300000, 1300016);
SELECT shards_colocated(1300000, 1300020);
-- check co-located table list
SELECT UNNEST(get_colocated_table_array('table1_group1'))::regclass;
SELECT UNNEST(get_colocated_table_array('table5_groupX'))::regclass;
SELECT UNNEST(get_colocated_table_array('table6_append'))::regclass;
-- check co-located shard list
SELECT get_colocated_shard_array(1300000);
SELECT get_colocated_shard_array(1300016);
SELECT get_colocated_shard_array(1300020);
-- check FindShardIntervalIndex function
SELECT find_shard_interval_index(1300000);
SELECT find_shard_interval_index(1300001);
SELECT find_shard_interval_index(1300002);
SELECT find_shard_interval_index(1300003);
SELECT find_shard_interval_index(1300016);

View File

@ -28,6 +28,9 @@ ALTER EXTENSION citus UPDATE TO '5.1-6';
ALTER EXTENSION citus UPDATE TO '5.1-7';
ALTER EXTENSION citus UPDATE TO '5.1-8';
ALTER EXTENSION citus UPDATE TO '5.2-1';
ALTER EXTENSION citus UPDATE TO '5.2-2';
ALTER EXTENSION citus UPDATE TO '5.2-3';
ALTER EXTENSION citus UPDATE TO '5.2-4';
-- drop extension an re-create in newest version
DROP EXTENSION citus;