diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index 8db6ed6ce..070ea09ec 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -7,7 +7,7 @@ MODULE_big = citus EXTENSION = citus EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \ - 5.2-1 5.2-2 5.2-3 + 5.2-1 5.2-2 5.2-3 5.2-4 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -55,6 +55,8 @@ $(EXTENSION)--5.2-2.sql: $(EXTENSION)--5.2-1.sql $(EXTENSION)--5.2-1--5.2-2.sql cat $^ > $@ $(EXTENSION)--5.2-3.sql: $(EXTENSION)--5.2-2.sql $(EXTENSION)--5.2-2--5.2-3.sql cat $^ > $@ +$(EXTENSION)--5.2-4.sql: $(EXTENSION)--5.2-3.sql $(EXTENSION)--5.2-3--5.2-4.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--5.2-3--5.2-4.sql b/src/backend/distributed/citus--5.2-3--5.2-4.sql new file mode 100644 index 000000000..4f366b74b --- /dev/null +++ b/src/backend/distributed/citus--5.2-3--5.2-4.sql @@ -0,0 +1,7 @@ +/* citus--5.2-3--5.2-4.sql */ + +ALTER TABLE pg_dist_partition ADD COLUMN colocationid BIGINT DEFAULT 0 NOT NULL; + +CREATE INDEX pg_dist_partition_colocationid_index +ON pg_dist_partition using btree(colocationid); + diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index ddb8ccdfe..eecbc6d01 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '5.2-3' +default_version = '5.2-4' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index e3e641af8..4e3d91097 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ 
b/src/backend/distributed/commands/create_distributed_table.c
@@ -27,6 +27,7 @@
 #include "commands/defrem.h"
 #include "commands/extension.h"
 #include "commands/trigger.h"
+#include "distributed/colocation_utils.h"
 #include "distributed/master_metadata_utility.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/pg_dist_partition.h"
@@ -287,6 +288,9 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
 		CharGetDatum(distributionMethod);
 	newValues[Anum_pg_dist_partition_partkey - 1] =
 		CStringGetTextDatum(distributionKeyString);
+	/* colocationid is a bigint column, so the value has to be wrapped as an int8 datum */
+	newValues[Anum_pg_dist_partition_colocationid - 1] =
+		Int64GetDatum(INVALID_COLOCATION_ID);
 
 	newTuple = heap_form_tuple(RelationGetDescr(pgDistPartition), newValues, newNulls);
 
diff --git a/src/backend/distributed/test/colocation_utils.c b/src/backend/distributed/test/colocation_utils.c
new file mode 100644
index 000000000..235fcb6e3
--- /dev/null
+++ b/src/backend/distributed/test/colocation_utils.c
@@ -0,0 +1,159 @@
+/*-------------------------------------------------------------------------
+ *
+ * test/src/colocation_utils.c
+ *
+ * This file contains functions to test co-location functionality
+ * within Citus.
+ *
+ * Copyright (c) 2014-2016, Citus Data, Inc.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include "fmgr.h"
+
+#include "catalog/pg_type.h"
+#include "distributed/colocation_utils.h"
+#include "distributed/metadata_cache.h"
+#include "distributed/test_helper_functions.h" /* IWYU pragma: keep */
+
+
+/* declarations for dynamic loading */
+PG_FUNCTION_INFO_V1(get_table_colocation_id);
+PG_FUNCTION_INFO_V1(tables_colocated);
+PG_FUNCTION_INFO_V1(shards_colocated);
+PG_FUNCTION_INFO_V1(get_colocated_table_array);
+PG_FUNCTION_INFO_V1(get_colocated_shard_array);
+PG_FUNCTION_INFO_V1(find_shard_interval_index);
+
+
+/*
+ * get_table_colocation_id returns colocation id of given distributed table.
+ */
+Datum
+get_table_colocation_id(PG_FUNCTION_ARGS)
+{
+	Oid distributedTableId = PG_GETARG_OID(0);
+
+	/* keep all 64 bits: the regression test declares this function RETURNS BIGINT */
+	uint64 colocationId = TableColocationId(distributedTableId);
+
+	PG_RETURN_INT64(colocationId);
+}
+
+
+/*
+ * tables_colocated checks whether the two given tables are co-located and returns
+ * true if they are. The regression test declares this function without STRICT, so
+ * NULL arguments can reach the C code; NULL is returned for them instead of reading
+ * a missing argument as relation oid 0.
+ */
+Datum
+tables_colocated(PG_FUNCTION_ARGS)
+{
+	Oid leftDistributedTableId = InvalidOid;
+	Oid rightDistributedTableId = InvalidOid;
+	bool tablesColocated = false;
+
+	if (PG_ARGISNULL(0) || PG_ARGISNULL(1))
+	{
+		PG_RETURN_NULL();
+	}
+
+	leftDistributedTableId = PG_GETARG_OID(0);
+	rightDistributedTableId = PG_GETARG_OID(1);
+	tablesColocated = TablesColocated(leftDistributedTableId,
+									  rightDistributedTableId);
+
+	PG_RETURN_BOOL(tablesColocated);
+}
+
+
+/*
+ * shards_colocated checks whether the two given shards are co-located and returns
+ * true if they are. Both arguments are shard ids (SQL type bigint).
+ */
+Datum
+shards_colocated(PG_FUNCTION_ARGS)
+{
+	/* the SQL arguments are bigint; reading them as uint32 would drop the upper half */
+	uint64 leftShardId = PG_GETARG_INT64(0);
+	uint64 rightShardId = PG_GETARG_INT64(1);
+	ShardInterval *leftShard = LoadShardInterval(leftShardId);
+	ShardInterval *rightShard = LoadShardInterval(rightShardId);
+
+	bool shardsColocated = ShardsColocated(leftShard, rightShard);
+
+	PG_RETURN_BOOL(shardsColocated);
+}
+
+
+/*
+ * get_colocated_table_array returns array of table oids which are co-located with given
+ * distributed table.
+ */
+Datum
+get_colocated_table_array(PG_FUNCTION_ARGS)
+{
+	Oid relationId = PG_GETARG_OID(0);
+	List *tableList = ColocatedTableList(relationId);
+	int tableCount = list_length(tableList);
+	Datum *tableDatums = palloc0(tableCount * sizeof(Datum));
+	ArrayType *tableArray = NULL;
+	ListCell *tableCell = NULL;
+	int nextSlot = 0;
+
+	/* copy every co-located relation oid into the datum array, keeping list order */
+	foreach(tableCell, tableList)
+	{
+		tableDatums[nextSlot++] = ObjectIdGetDatum(lfirst_oid(tableCell));
+	}
+
+	tableArray = DatumArrayToArrayType(tableDatums, tableCount, OIDOID);
+
+	PG_RETURN_ARRAYTYPE_P(tableArray);
+}
+
+
+/*
+ * get_colocated_shard_array returns array of shard ids which are co-located with given
+ * shard.
+ */
+Datum
+get_colocated_shard_array(PG_FUNCTION_ARGS)
+{
+	uint64 shardId = PG_GETARG_INT64(0); /* SQL argument is bigint, not a 32-bit value */
+	ShardInterval *shardInterval = LoadShardInterval(shardId);
+
+	ArrayType *colocatedShardsArrayType = NULL;
+	List *colocatedShardList = ColocatedShardPlacementList(shardInterval);
+	ListCell *colocatedShardCell = NULL;
+	int colocatedShardCount = list_length(colocatedShardList);
+	Datum *colocatedShardsDatumArray = palloc0(colocatedShardCount * sizeof(Datum));
+	Oid arrayTypeId = INT8OID; /* elements are int8 datums; function returns bigint[] */
+	int colocatedShardIndex = 0;
+
+	foreach(colocatedShardCell, colocatedShardList)
+	{
+		ShardInterval *colocatedShardInterval = (ShardInterval *) lfirst(
+			colocatedShardCell);
+		uint64 colocatedShardId = colocatedShardInterval->shardId;
+
+		Datum colocatedShardDatum = Int64GetDatum(colocatedShardId);
+
+		colocatedShardsDatumArray[colocatedShardIndex] = colocatedShardDatum;
+		colocatedShardIndex++;
+	}
+
+	colocatedShardsArrayType = DatumArrayToArrayType(colocatedShardsDatumArray,
+													 colocatedShardCount, arrayTypeId);
+
+	PG_RETURN_ARRAYTYPE_P(colocatedShardsArrayType);
+}
+
+
+/*
+ * find_shard_interval_index finds index of given shard in sorted shard interval list.
+ */
+Datum
+find_shard_interval_index(PG_FUNCTION_ARGS)
+{
+	uint64 shardId = PG_GETARG_INT64(0); /* SQL argument is bigint */
+	ShardInterval *shardInterval = LoadShardInterval(shardId);
+	int shardIndex = FindShardIntervalIndex(shardInterval); /* callee returns int */
+
+	PG_RETURN_INT32(shardIndex);
+}
diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c
new file mode 100644
index 000000000..6fa822a17
--- /dev/null
+++ b/src/backend/distributed/utils/colocation_utils.c
@@ -0,0 +1,221 @@
+/*-------------------------------------------------------------------------
+ *
+ * colocation_utils.c
+ *
+ * This file contains functions to perform useful operations on co-located tables.
+ *
+ * Copyright (c) 2014-2016, Citus Data, Inc.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/genam.h"
+#include "access/heapam.h"
+#include "access/htup_details.h"
+#include "distributed/colocation_utils.h"
+#include "distributed/metadata_cache.h"
+#include "distributed/multi_logical_planner.h"
+#include "distributed/shardinterval_utils.h"
+#include "utils/fmgroids.h"
+#include "utils/rel.h"
+
+
+/*
+ * TableColocationId returns the co-location id stored in the metadata cache entry of
+ * the given table. The cache lookup is expected to error out if the given table is
+ * not distributed.
+ */
+uint64
+TableColocationId(Oid distributedTableId)
+{
+	return DistributedTableCacheEntry(distributedTableId)->colocationId;
+}
+
+
+/*
+ * TablesColocated tells whether the two given tables are co-located. A table is
+ * always co-located with itself; that case is answered before any metadata lookup.
+ * Two different tables are co-located only when both have a valid co-location id and
+ * the ids are equal. For two different tables the co-location id lookup is expected
+ * to error out if either of them is not distributed.
+ */
+bool
+TablesColocated(Oid leftDistributedTableId, Oid rightDistributedTableId)
+{
+	uint64 leftGroupId = INVALID_COLOCATION_ID;
+	uint64 rightGroupId = INVALID_COLOCATION_ID;
+	bool bothGroupsValid = false;
+
+	/* identity short-circuit: returns before any co-location id is looked up */
+	if (leftDistributedTableId == rightDistributedTableId)
+	{
+		return true;
+	}
+
+	leftGroupId = TableColocationId(leftDistributedTableId);
+	rightGroupId = TableColocationId(rightDistributedTableId);
+
+	/* a table without a co-location group is not co-located with any other table */
+	bothGroupsValid = (leftGroupId != INVALID_COLOCATION_ID &&
+					   rightGroupId != INVALID_COLOCATION_ID);
+
+	return bothGroupsValid && leftGroupId == rightGroupId;
+}
+
+
+/*
+ * ShardsColocated function checks whether given two shards are co-located and
+ * returns true if they are co-located. Two shards are co-located if either:
+ * - They are the same shard (a shard is always co-located with itself).
+ * OR
+ * - Tables are hash partitioned.
+ * - Tables containing the shards are co-located.
+ * - Min/Max values of the shards are same.
+ */
+bool
+ShardsColocated(ShardInterval *leftShardInterval, ShardInterval *rightShardInterval)
+{
+	DistTableCacheEntry *leftCacheEntry = NULL;
+	int32 leftShardMinValue = 0;
+	int32 leftShardMaxValue = 0;
+	int32 rightShardMinValue = 0;
+	int32 rightShardMaxValue = 0;
+
+	/* a shard is always co-located with itself, whatever its partition method is */
+	if (leftShardInterval->shardId == rightShardInterval->shardId)
+	{
+		return true;
+	}
+
+	if (!TablesColocated(leftShardInterval->relationId, rightShardInterval->relationId))
+	{
+		return false;
+	}
+
+	/*
+	 * TablesColocated also returns true when both shards belong to the same table,
+	 * whatever its partition method is. Min/max values of non-hash shards are not
+	 * int32 hash tokens, so distinct shards of such a table are never co-located.
+	 */
+	leftCacheEntry = DistributedTableCacheEntry(leftShardInterval->relationId);
+	if (leftCacheEntry->partitionMethod != DISTRIBUTE_BY_HASH)
+	{
+		return false;
+	}
+
+	/*
+	 * We compare min/max values to decide whether two shards are co-located. Calling
+	 * FindShardIntervalIndex on both shards and comparing the indexes would also work,
+	 * but this way is cheaper.
+	 *
+	 * Both tables are expected to be hash partitioned at this point (sharing a valid
+	 * co-location id implies hash partitioning), so DatumGetInt32 is safe here.
+	 */
+	leftShardMinValue = DatumGetInt32(leftShardInterval->minValue);
+	leftShardMaxValue = DatumGetInt32(leftShardInterval->maxValue);
+	rightShardMinValue = DatumGetInt32(rightShardInterval->minValue);
+	rightShardMaxValue = DatumGetInt32(rightShardInterval->maxValue);
+
+	return leftShardMinValue == rightShardMinValue &&
+		   leftShardMaxValue == rightShardMaxValue;
+}
+
+
+/*
+ * ColocatedTableList function returns list of relation ids which are co-located
+ * with given table. A table without a valid co-location id (e.g. a table that is not
+ * hash distributed) is only co-located with itself, so the returned list then holds
+ * just the given table. Otherwise pg_dist_partition is scanned through its
+ * colocationid index and every relation sharing the id is returned.
+ */
+List *
+ColocatedTableList(Oid distributedTableId)
+{
+	/* co-location ids are 64-bit (bigint column); do not narrow them to int */
+	uint64 tableColocationId = TableColocationId(distributedTableId);
+	List *colocatedTableList = NIL;
+
+	Relation pgDistPartition = NULL;
+	TupleDesc tupleDescriptor = NULL;
+	SysScanDesc scanDescriptor = NULL;
+	HeapTuple heapTuple = NULL;
+	bool indexOK = true;
+	int scanKeyCount = 1;
+	ScanKeyData scanKey[1];
+
+	/*
+	 * If the table does not belong to any co-location group, the table is only
+	 * co-located with itself.
+	 */
+	if (tableColocationId == INVALID_COLOCATION_ID)
+	{
+		colocatedTableList = lappend_oid(colocatedTableList, distributedTableId);
+		return colocatedTableList;
+	}
+
+	/* F_INT8EQ compares int8 datums, so the scan key argument must be an int8 datum */
+	ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_colocationid,
+				BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(tableColocationId));
+
+	pgDistPartition = heap_open(DistPartitionRelationId(), AccessShareLock);
+	tupleDescriptor = RelationGetDescr(pgDistPartition);
+	scanDescriptor = systable_beginscan(pgDistPartition,
+										DistPartitionColocationidIndexId(),
+										indexOK, NULL, scanKeyCount, scanKey);
+
+	heapTuple = systable_getnext(scanDescriptor);
+	while (HeapTupleIsValid(heapTuple))
+	{
+		bool isNull = false;
+		Datum colocatedTableDatum = heap_getattr(heapTuple,
+												 Anum_pg_dist_partition_logicalrelid,
+												 tupleDescriptor, &isNull);
+		Oid colocatedTableId = DatumGetObjectId(colocatedTableDatum);
+
+		colocatedTableList = lappend_oid(colocatedTableList, colocatedTableId);
+		heapTuple = systable_getnext(scanDescriptor);
+	}
+
+	systable_endscan(scanDescriptor);
+	heap_close(pgDistPartition, AccessShareLock);
+
+	return colocatedTableList;
+}
+
+
+/*
+ * ColocatedShardPlacementList function returns list of shard intervals which are
+ * co-located with given shard. Despite its name, the list elements are ShardInterval
+ * pointers. If the given shard belongs to an append or range distributed table,
+ * co-location is not valid for that shard. Therefore such a shard is only co-located
+ * with itself.
+ */
+List *
+ColocatedShardPlacementList(ShardInterval *shardInterval)
+{
+	Oid distributedTableId = shardInterval->relationId;
+	List *colocatedShardList = NIL;
+	int shardIntervalIndex = -1;
+	List *colocatedTableList = NIL;
+	ListCell *colocatedTableCell = NULL;
+
+	DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
+	char partitionMethod = cacheEntry->partitionMethod;
+
+	/*
+	 * If distribution type of the table is not hash, each shard of the table is only
+	 * co-located with itself.
+	 */
+	if (partitionMethod != DISTRIBUTE_BY_HASH)
+	{
+		colocatedShardList = lappend(colocatedShardList, shardInterval);
+		return colocatedShardList;
+	}
+
+	shardIntervalIndex = FindShardIntervalIndex(shardInterval);
+	colocatedTableList = ColocatedTableList(distributedTableId);
+
+	/* FindShardIntervalIndex has to find the index of the given shard */
+	Assert(shardIntervalIndex >= 0);
+
+	/* pick the shard at the same sorted position from every co-located table */
+	foreach(colocatedTableCell, colocatedTableList)
+	{
+		Oid colocatedTableId = lfirst_oid(colocatedTableCell);
+		DistTableCacheEntry *colocatedTableCacheEntry =
+			DistributedTableCacheEntry(colocatedTableId);
+		ShardInterval *colocatedShardInterval = NULL;
+
+		/*
+		 * Since we iterate over co-located tables, shard count of each table should be
+		 * same and greater than shardIntervalIndex.
+		 */
+		Assert(cacheEntry->shardIntervalArrayLength ==
+			   colocatedTableCacheEntry->shardIntervalArrayLength);
+
+		colocatedShardInterval =
+			colocatedTableCacheEntry->sortedShardIntervalArray[shardIntervalIndex];
+
+		colocatedShardList = lappend(colocatedShardList, colocatedShardInterval);
+	}
+
+	Assert(list_length(colocatedTableList) == list_length(colocatedShardList));
+
+	return colocatedShardList;
+}
diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c
index 08f27f4dc..06d27fe01 100644
--- a/src/backend/distributed/utils/metadata_cache.c
+++ b/src/backend/distributed/utils/metadata_cache.c
@@ -23,6 +23,7 @@
 #include "catalog/pg_type.h"
 #include "commands/extension.h"
 #include "commands/trigger.h"
+#include "distributed/colocation_utils.h"
 #include "distributed/master_metadata_utility.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/pg_dist_partition.h"
@@ -50,6 +51,7 @@ static Oid distShardRelationId = InvalidOid;
 static Oid distShardPlacementRelationId = InvalidOid;
 static Oid distPartitionRelationId = InvalidOid;
 static Oid distPartitionLogicalRelidIndexId = InvalidOid;
+static Oid distPartitionColocationidIndexId = InvalidOid;
static Oid distShardLogicalRelidIndexId = InvalidOid;
 static Oid distShardShardidIndexId = InvalidOid;
 static Oid distShardPlacementShardidIndexId = InvalidOid;
@@ -218,6 +220,7 @@ LookupDistTableCacheEntry(Oid relationId)
 	HeapTuple distPartitionTuple = NULL;
 	char *partitionKeyString = NULL;
 	char partitionMethod = 0;
+	uint64 colocationId = INVALID_COLOCATION_ID;
 	List *distShardTupleList = NIL;
 	int shardIntervalArrayLength = 0;
 	ShardInterval **shardIntervalArray = NULL;
@@ -256,14 +259,25 @@ LookupDistTableCacheEntry(Oid relationId)
 			(Form_pg_dist_partition) GETSTRUCT(distPartitionTuple);
 		Datum partitionKeyDatum = 0;
+		Datum colocationIdDatum = 0;
 		MemoryContext oldContext = NULL;
+		TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
 		bool isNull = false;
 
 		partitionKeyDatum = heap_getattr(distPartitionTuple,
 										 Anum_pg_dist_partition_partkey,
-										 RelationGetDescr(pgDistPartition),
+										 tupleDescriptor,
 										 &isNull);
 		Assert(!isNull);
 
+		/* colocationid is a bigint column; a NULL keeps the invalid id set above */
+		colocationIdDatum = heap_getattr(distPartitionTuple,
+										 Anum_pg_dist_partition_colocationid,
+										 tupleDescriptor, &isNull);
+		if (!isNull)
+		{
+			colocationId = DatumGetInt64(colocationIdDatum);
+		}
+
 		oldContext = MemoryContextSwitchTo(CacheMemoryContext);
 		partitionKeyString = TextDatumGetCString(partitionKeyDatum);
 		partitionMethod = partitionForm->partmethod;
@@ -378,6 +392,7 @@ LookupDistTableCacheEntry(Oid relationId)
 	cacheEntry->isDistributedTable = true;
 	cacheEntry->partitionKeyString = partitionKeyString;
 	cacheEntry->partitionMethod = partitionMethod;
+	cacheEntry->colocationId = colocationId;
 	cacheEntry->shardIntervalArrayLength = shardIntervalArrayLength;
 	cacheEntry->sortedShardIntervalArray = sortedShardIntervalArray;
 	cacheEntry->shardIntervalCompareFunction = shardIntervalCompareFunction;
@@ -614,6 +629,17 @@ DistPartitionLogicalRelidIndexId(void)
 }
 
 
+/* return oid of pg_dist_partition_colocationid_index index */
+Oid
+DistPartitionColocationidIndexId(void)
+{
+	CachedRelationLookup("pg_dist_partition_colocationid_index",
+						 &distPartitionColocationidIndexId);
+
+	return distPartitionColocationidIndexId;
+}
+
+
 /* return oid of pg_dist_shard_logical_relid_index index */
 Oid
 DistShardLogicalRelidIndexId(void)
@@ -1013,6 +1039,7 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId)
 		distShardPlacementRelationId = InvalidOid;
 		distPartitionRelationId = InvalidOid;
 		distPartitionLogicalRelidIndexId = InvalidOid;
+		distPartitionColocationidIndexId = InvalidOid;
 		distShardLogicalRelidIndexId = InvalidOid;
 		distShardShardidIndexId = InvalidOid;
 		distShardPlacementShardidIndexId = InvalidOid;
diff --git a/src/backend/distributed/utils/shardinterval_utils.c b/src/backend/distributed/utils/shardinterval_utils.c
index fe0d4899f..1bf104c43 100644
--- a/src/backend/distributed/utils/shardinterval_utils.c
+++ b/src/backend/distributed/utils/shardinterval_utils.c
@@ -14,6 +14,7 @@
 #include "catalog/pg_am.h"
 #include "catalog/pg_collation.h"
 #include "catalog/pg_type.h"
+#include "distributed/metadata_cache.h"
 #include "distributed/shardinterval_utils.h"
 #include "distributed/pg_dist_partition.h"
 #include "distributed/worker_protocol.h"
@@ -106,6 +107,63 @@ CompareShardIntervalsById(const void *leftElement, const void *rightElement)
 }
 
 
+/*
+ * FindShardIntervalIndex finds the index of the given shard in the sorted shard interval
+ * array of its table. The shard's min value is treated as a hash token and divided by
+ * the token range each shard covers, so no search is needed. Therefore this function
+ * only works for hash distributed tables and errors out for other partition methods.
+ */
+int
+FindShardIntervalIndex(ShardInterval *shardInterval)
+{
+	Oid distributedTableId = shardInterval->relationId;
+	DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
+	char partitionMethod = cacheEntry->partitionMethod;
+	int shardCount = 0;
+	int32 shardMinValue = 0;
+	uint64 hashTokenIncrement = 0;
+	uint64 hashTokenOffset = 0;
+	int shardIndex = -1;
+
+	/*
+	 * We can support it for other types of partitioned tables with simple binary scan
+	 * but it is not necessary at the moment. If we need that simply check algorithm in
+	 * FindShardInterval and SearchCachedShardInterval.
+	 */
+	if (partitionMethod != DISTRIBUTE_BY_HASH)
+	{
+		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+						errmsg("finding index of given shard is not supported for "
+							   "non-hash partitioned tables")));
+	}
+
+	shardCount = cacheEntry->shardIntervalArrayLength;
+	shardMinValue = DatumGetInt32(shardInterval->minValue);
+	hashTokenIncrement = HASH_TOKEN_COUNT / shardCount;
+
+	/*
+	 * Widen to 64 bits before subtracting INT32_MIN: for any non-negative min value
+	 * the int32 subtraction would overflow, which is undefined behavior in C.
+	 */
+	hashTokenOffset = (uint64) ((int64) shardMinValue - (int64) INT32_MIN);
+	shardIndex = (int) (hashTokenOffset / hashTokenIncrement);
+
+	Assert(shardIndex <= shardCount);
+
+	/*
+	 * If the shard count is not power of 2, the range of the last
+	 * shard becomes larger than others. For that extra piece of range,
+	 * we still need to use the last shard.
+	 */
+	if (shardIndex == shardCount)
+	{
+		shardIndex = shardCount - 1;
+	}
+
+	return shardIndex;
+}
+
+
 /*
  * FindShardInterval finds a single shard interval in the cache for the
  * given partition column value.
diff --git a/src/include/distributed/colocation_utils.h b/src/include/distributed/colocation_utils.h
new file mode 100644
index 000000000..9612d5f1a
--- /dev/null
+++ b/src/include/distributed/colocation_utils.h
@@ -0,0 +1,27 @@
+/*-------------------------------------------------------------------------
+ *
+ * colocation_utils.h
+ *
+ * Declarations for public utility functions related to co-located tables.
+ *
+ * Copyright (c) 2014-2016, Citus Data, Inc.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef COLOCATION_UTILS_H_
+#define COLOCATION_UTILS_H_
+
+#include "distributed/shardinterval_utils.h"
+#include "nodes/pg_list.h"
+
+#define INVALID_COLOCATION_ID 0 /* table does not belong to any co-location group */
+
+extern uint64 TableColocationId(Oid distributedTableId);
+extern bool TablesColocated(Oid leftDistributedTableId, Oid rightDistributedTableId);
+extern bool ShardsColocated(ShardInterval *leftShardInterval,
+							ShardInterval *rightShardInterval);
+extern List * ColocatedTableList(Oid distributedTableId);
+extern List * ColocatedShardPlacementList(ShardInterval *shardInterval);
+
+#endif /* COLOCATION_UTILS_H_ */
diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h
index d66268a98..7986fb9f1 100644
--- a/src/include/distributed/metadata_cache.h
+++ b/src/include/distributed/metadata_cache.h
@@ -38,6 +38,7 @@ typedef struct
 	/* pg_dist_partition metadata for this table */
 	char *partitionKeyString;
 	char partitionMethod;
+	uint64 colocationId;
 
 	/* pg_dist_shard metadata (variable-length ShardInterval array) for this table */
 	int shardIntervalArrayLength;
@@ -62,6 +63,7 @@ extern Oid DistShardPlacementRelationId(void);
 
 /* index oids */
 extern Oid DistPartitionLogicalRelidIndexId(void);
+extern Oid DistPartitionColocationidIndexId(void);
 extern Oid DistShardLogicalRelidIndexId(void);
 extern Oid DistShardShardidIndexId(void);
 extern Oid DistShardPlacementShardidIndexId(void);
diff --git a/src/include/distributed/pg_dist_partition.h b/src/include/distributed/pg_dist_partition.h
index 56799ac20..20f526702 100644
--- a/src/include/distributed/pg_dist_partition.h
+++ b/src/include/distributed/pg_dist_partition.h
@@ -21,11 +21,12 @@
  */
 typedef struct FormData_pg_dist_partition
 {
-	Oid logicalrelid; /* logical relation id; references pg_class oid */
-	char partmethod; /* partition method; see codes below */
-#ifdef CATALOG_VARLEN /* variable-length fields start here */
-	text partkey; /* partition key expression */
+	Oid logicalrelid;    /* logical relation id; references pg_class oid */
+	char partmethod;     /* partition method; see codes below */
+#ifdef CATALOG_VARLEN   /* variable-length fields start here */
+	text partkey;        /* partition key expression */
+	uint64 colocationid; /* co-location group id; follows partkey, use heap_getattr */
 #endif
 } FormData_pg_dist_partition;
 
 /* ----------------
@@ -39,10 +40,11 @@ typedef FormData_pg_dist_partition *Form_pg_dist_partition;
  *		compiler constants for pg_dist_partitions
  * ----------------
  */
-#define Natts_pg_dist_partition 3
+#define Natts_pg_dist_partition 4
 #define Anum_pg_dist_partition_logicalrelid 1
 #define Anum_pg_dist_partition_partmethod 2
 #define Anum_pg_dist_partition_partkey 3
+#define Anum_pg_dist_partition_colocationid 4
 
 /* valid values for partmethod include append, hash, and range */
 #define DISTRIBUTE_BY_APPEND 'a'
diff --git a/src/include/distributed/shardinterval_utils.h b/src/include/distributed/shardinterval_utils.h
index 1c745f014..3df8cf5e8 100644
--- a/src/include/distributed/shardinterval_utils.h
+++ b/src/include/distributed/shardinterval_utils.h
@@ -26,6 +26,7 @@ typedef struct ShardIntervalCompareFunctionCacheEntry
 extern int CompareShardIntervals(const void *leftElement, const void *rightElement,
 								 FmgrInfo *typeCompareFunction);
 extern int CompareShardIntervalsById(const void *leftElement, const void *rightElement);
+extern int FindShardIntervalIndex(ShardInterval *shardInterval);
 extern ShardInterval * FindShardInterval(Datum partitionColumnValue,
 										 ShardInterval **shardIntervalCache,
 										 int shardCount, char partitionMethod,
diff --git a/src/test/regress/expected/multi_colocation_utils.out b/src/test/regress/expected/multi_colocation_utils.out
new file mode 100644
index 000000000..33493d68d
--- /dev/null
+++ b/src/test/regress/expected/multi_colocation_utils.out
@@ -0,0 +1,345 @@
+ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1300000;
+ALTER SEQUENCE
pg_catalog.pg_dist_jobid_seq RESTART 1300000; +-- =================================================================== +-- create test utility function +-- =================================================================== +CREATE SEQUENCE colocation_test_seq + MINVALUE 1 + NO CYCLE; +/* a very simple UDF that only sets the colocation ids the same + * DO NOT USE THIS FUNCTION IN PRODUCTION. It manually sets colocationid column of + * pg_dist_partition and it does not check anything about pyshical state about shards. + */ +CREATE OR REPLACE FUNCTION colocation_test_colocate_tables(source_table regclass, target_table regclass) + RETURNS BOOL + LANGUAGE plpgsql + AS $colocate_tables$ +DECLARE nextid BIGINT; +BEGIN + SELECT nextval('colocation_test_seq') INTO nextid; + + UPDATE pg_dist_partition SET colocationId = nextid + WHERE logicalrelid IN + ( + (SELECT p1.logicalrelid + FROM pg_dist_partition p1, pg_dist_partition p2 + WHERE + p2.logicalrelid = source_table AND + (p1.logicalrelid = source_table OR + (p1.colocationId = p2.colocationId AND p1.colocationId != 0))) + UNION + (SELECT target_table) + ); + RETURN TRUE; +END; +$colocate_tables$; +-- =================================================================== +-- create test functions +-- =================================================================== +CREATE FUNCTION get_table_colocation_id(regclass) + RETURNS BIGINT + AS 'citus' + LANGUAGE C STRICT; +CREATE FUNCTION tables_colocated(regclass, regclass) + RETURNS bool + AS 'citus' + LANGUAGE C; +CREATE FUNCTION shards_colocated(bigint, bigint) + RETURNS bool + AS 'citus' + LANGUAGE C STRICT; +CREATE FUNCTION get_colocated_table_array(regclass) + RETURNS regclass[] + AS 'citus' + LANGUAGE C STRICT; +CREATE FUNCTION get_colocated_shard_array(bigint) + RETURNS BIGINT[] + AS 'citus' + LANGUAGE C STRICT; +CREATE FUNCTION find_shard_interval_index(bigint) + RETURNS int + AS 'citus' + LANGUAGE C STRICT; +-- 
=================================================================== +-- test co-location util functions +-- =================================================================== +-- create distributed table observe shard pruning +CREATE TABLE table1_group1 ( id int ); +SELECT master_create_distributed_table('table1_group1', 'id', 'hash'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_worker_shards('table1_group1', 4, 1); + master_create_worker_shards +----------------------------- + +(1 row) + +CREATE TABLE table2_group1 ( id int ); +SELECT master_create_distributed_table('table2_group1', 'id', 'hash'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_worker_shards('table2_group1', 4, 1); + master_create_worker_shards +----------------------------- + +(1 row) + +CREATE TABLE table3_group2 ( id int ); +SELECT master_create_distributed_table('table3_group2', 'id', 'hash'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_worker_shards('table3_group2', 4, 1); + master_create_worker_shards +----------------------------- + +(1 row) + +CREATE TABLE table4_group2 ( id int ); +SELECT master_create_distributed_table('table4_group2', 'id', 'hash'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_worker_shards('table4_group2', 4, 1); + master_create_worker_shards +----------------------------- + +(1 row) + +CREATE TABLE table5_groupX ( id int ); +SELECT master_create_distributed_table('table5_groupX', 'id', 'hash'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_worker_shards('table5_groupX', 4, 1); + master_create_worker_shards +----------------------------- + +(1 row) + +CREATE TABLE table6_append ( id int ); +SELECT master_create_distributed_table('table6_append', 'id', 'append'); + 
master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_empty_shard('table6_append'); + master_create_empty_shard +--------------------------- + 1300020 +(1 row) + +SELECT master_create_empty_shard('table6_append'); + master_create_empty_shard +--------------------------- + 1300021 +(1 row) + +-- make table1_group1 and table2_group1 co-located manually +SELECT colocation_test_colocate_tables('table1_group1', 'table2_group1'); + colocation_test_colocate_tables +--------------------------------- + t +(1 row) + +-- check co-location id +SELECT get_table_colocation_id('table1_group1'); + get_table_colocation_id +------------------------- + 1 +(1 row) + +SELECT get_table_colocation_id('table5_groupX'); + get_table_colocation_id +------------------------- + 0 +(1 row) + +SELECT get_table_colocation_id('table6_append'); + get_table_colocation_id +------------------------- + 0 +(1 row) + +-- check self table co-location +SELECT tables_colocated('table1_group1', 'table1_group1'); + tables_colocated +------------------ + t +(1 row) + +SELECT tables_colocated('table5_groupX', 'table5_groupX'); + tables_colocated +------------------ + t +(1 row) + +SELECT tables_colocated('table6_append', 'table6_append'); + tables_colocated +------------------ + t +(1 row) + +-- check table co-location with same co-location group +SELECT tables_colocated('table1_group1', 'table2_group1'); + tables_colocated +------------------ + t +(1 row) + +-- check table co-location with different co-location group +SELECT tables_colocated('table1_group1', 'table3_group2'); + tables_colocated +------------------ + f +(1 row) + +-- check table co-location with invalid co-location group +SELECT tables_colocated('table1_group1', 'table5_groupX'); + tables_colocated +------------------ + f +(1 row) + +SELECT tables_colocated('table1_group1', 'table6_append'); + tables_colocated +------------------ + f +(1 row) + +-- check self shard co-location +SELECT 
shards_colocated(1300000, 1300000); + shards_colocated +------------------ + t +(1 row) + +SELECT shards_colocated(1300016, 1300016); + shards_colocated +------------------ + t +(1 row) + +SELECT shards_colocated(1300020, 1300020); + shards_colocated +------------------ + t +(1 row) + +-- check shard co-location with same co-location group +SELECT shards_colocated(1300000, 1300004); + shards_colocated +------------------ + t +(1 row) + +-- check shard co-location with same table different co-location group +SELECT shards_colocated(1300000, 1300001); + shards_colocated +------------------ + f +(1 row) + +-- check shard co-location with different co-location group +SELECT shards_colocated(1300000, 1300005); + shards_colocated +------------------ + f +(1 row) + +-- check shard co-location with invalid co-location group +SELECT shards_colocated(1300000, 1300016); + shards_colocated +------------------ + f +(1 row) + +SELECT shards_colocated(1300000, 1300020); + shards_colocated +------------------ + f +(1 row) + +-- check co-located table list +SELECT UNNEST(get_colocated_table_array('table1_group1'))::regclass; + unnest +--------------- + table2_group1 + table1_group1 +(2 rows) + +SELECT UNNEST(get_colocated_table_array('table5_groupX'))::regclass; + unnest +--------------- + table5_groupx +(1 row) + +SELECT UNNEST(get_colocated_table_array('table6_append'))::regclass; + unnest +--------------- + table6_append +(1 row) + +-- check co-located shard list +SELECT get_colocated_shard_array(1300000); + get_colocated_shard_array +--------------------------- + {1300004,1300000} +(1 row) + +SELECT get_colocated_shard_array(1300016); + get_colocated_shard_array +--------------------------- + {1300016} +(1 row) + +SELECT get_colocated_shard_array(1300020); + get_colocated_shard_array +--------------------------- + {1300020} +(1 row) + +-- check FindShardIntervalIndex function +SELECT find_shard_interval_index(1300000); + find_shard_interval_index +--------------------------- + 
0 +(1 row) + +SELECT find_shard_interval_index(1300001); + find_shard_interval_index +--------------------------- + 1 +(1 row) + +SELECT find_shard_interval_index(1300002); + find_shard_interval_index +--------------------------- + 2 +(1 row) + +SELECT find_shard_interval_index(1300003); + find_shard_interval_index +--------------------------- + 3 +(1 row) + +SELECT find_shard_interval_index(1300016); + find_shard_interval_index +--------------------------- + 0 +(1 row) + diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 5a583be41..17c35dac6 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -23,6 +23,9 @@ ALTER EXTENSION citus UPDATE TO '5.1-6'; ALTER EXTENSION citus UPDATE TO '5.1-7'; ALTER EXTENSION citus UPDATE TO '5.1-8'; ALTER EXTENSION citus UPDATE TO '5.2-1'; +ALTER EXTENSION citus UPDATE TO '5.2-2'; +ALTER EXTENSION citus UPDATE TO '5.2-3'; +ALTER EXTENSION citus UPDATE TO '5.2-4'; -- drop extension an re-create in newest version DROP EXTENSION citus; \c diff --git a/src/test/regress/expected/multi_router_planner.out b/src/test/regress/expected/multi_router_planner.out index a97163883..4322f40c8 100644 --- a/src/test/regress/expected/multi_router_planner.out +++ b/src/test/regress/expected/multi_router_planner.out @@ -1797,7 +1797,7 @@ DROP MATERIALIZED VIEW mv_articles_hash; DEBUG: drop auto-cascades to type mv_articles_hash DEBUG: drop auto-cascades to type mv_articles_hash[] DEBUG: drop auto-cascades to rule _RETURN on materialized view mv_articles_hash -DEBUG: EventTriggerInvoke 16733 +DEBUG: EventTriggerInvoke 16761 CREATE MATERIALIZED VIEW mv_articles_hash_error AS SELECT * FROM articles_hash WHERE author_id in (1,2); NOTICE: cannot use shard pruning with ANY/ALL (array expression) diff --git a/src/test/regress/expected/multi_table_ddl.out b/src/test/regress/expected/multi_table_ddl.out index 59f398bb4..afa5dd065 100644 --- 
a/src/test/regress/expected/multi_table_ddl.out +++ b/src/test/regress/expected/multi_table_ddl.out @@ -48,8 +48,8 @@ SELECT 1 FROM master_create_empty_shard('testtableddl'); DROP TABLE testtableddl; -- ensure no metadata of distributed tables are remaining SELECT * FROM pg_dist_partition; - logicalrelid | partmethod | partkey ---------------+------------+--------- + logicalrelid | partmethod | partkey | colocationid +--------------+------------+---------+-------------- (0 rows) SELECT * FROM pg_dist_shard; diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index c21b5e5ac..043b75f1c 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -165,7 +165,13 @@ test: multi_function_evaluation # multi_truncate tests truncate functionality for distributed tables # ---------- test: multi_truncate + # ---------- # multi_expire_table_cache tests for broadcast tables # ---------- test: multi_expire_table_cache + +# ---------- +# multi_colocation_utils tests utility functions written for co-location feature & internal API +# ---------- +test: multi_colocation_utils diff --git a/src/test/regress/sql/multi_colocation_utils.sql b/src/test/regress/sql/multi_colocation_utils.sql new file mode 100644 index 000000000..5d0afe27b --- /dev/null +++ b/src/test/regress/sql/multi_colocation_utils.sql @@ -0,0 +1,161 @@ + +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1300000; +ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1300000; + +-- =================================================================== +-- create test utility function +-- =================================================================== + +CREATE SEQUENCE colocation_test_seq + MINVALUE 1 + NO CYCLE; + +/* a very simple UDF that only sets the colocation ids the same + * DO NOT USE THIS FUNCTION IN PRODUCTION. It manually sets colocationid column of + * pg_dist_partition and it does not check anything about pyshical state about shards. 
+ */ +CREATE OR REPLACE FUNCTION colocation_test_colocate_tables(source_table regclass, target_table regclass) + RETURNS BOOL + LANGUAGE plpgsql + AS $colocate_tables$ +DECLARE nextid BIGINT; +BEGIN + SELECT nextval('colocation_test_seq') INTO nextid; + + UPDATE pg_dist_partition SET colocationId = nextid + WHERE logicalrelid IN + ( + (SELECT p1.logicalrelid + FROM pg_dist_partition p1, pg_dist_partition p2 + WHERE + p2.logicalrelid = source_table AND + (p1.logicalrelid = source_table OR + (p1.colocationId = p2.colocationId AND p1.colocationId != 0))) + UNION + (SELECT target_table) + ); + RETURN TRUE; +END; +$colocate_tables$; + +-- =================================================================== +-- create test functions +-- =================================================================== + +CREATE FUNCTION get_table_colocation_id(regclass) + RETURNS BIGINT + AS 'citus' + LANGUAGE C STRICT; + +CREATE FUNCTION tables_colocated(regclass, regclass) + RETURNS bool + AS 'citus' + LANGUAGE C; + +CREATE FUNCTION shards_colocated(bigint, bigint) + RETURNS bool + AS 'citus' + LANGUAGE C STRICT; + +CREATE FUNCTION get_colocated_table_array(regclass) + RETURNS regclass[] + AS 'citus' + LANGUAGE C STRICT; + +CREATE FUNCTION get_colocated_shard_array(bigint) + RETURNS BIGINT[] + AS 'citus' + LANGUAGE C STRICT; + +CREATE FUNCTION find_shard_interval_index(bigint) + RETURNS int + AS 'citus' + LANGUAGE C STRICT; + +-- =================================================================== +-- test co-location util functions +-- =================================================================== + +-- create distributed table observe shard pruning +CREATE TABLE table1_group1 ( id int ); +SELECT master_create_distributed_table('table1_group1', 'id', 'hash'); +SELECT master_create_worker_shards('table1_group1', 4, 1); + +CREATE TABLE table2_group1 ( id int ); +SELECT master_create_distributed_table('table2_group1', 'id', 'hash'); +SELECT 
master_create_worker_shards('table2_group1', 4, 1); + +CREATE TABLE table3_group2 ( id int ); +SELECT master_create_distributed_table('table3_group2', 'id', 'hash'); +SELECT master_create_worker_shards('table3_group2', 4, 1); + +CREATE TABLE table4_group2 ( id int ); +SELECT master_create_distributed_table('table4_group2', 'id', 'hash'); +SELECT master_create_worker_shards('table4_group2', 4, 1); + +CREATE TABLE table5_groupX ( id int ); +SELECT master_create_distributed_table('table5_groupX', 'id', 'hash'); +SELECT master_create_worker_shards('table5_groupX', 4, 1); + +CREATE TABLE table6_append ( id int ); +SELECT master_create_distributed_table('table6_append', 'id', 'append'); +SELECT master_create_empty_shard('table6_append'); +SELECT master_create_empty_shard('table6_append'); + +-- make table1_group1 and table2_group1 co-located manually +SELECT colocation_test_colocate_tables('table1_group1', 'table2_group1'); + +-- check co-location id +SELECT get_table_colocation_id('table1_group1'); +SELECT get_table_colocation_id('table5_groupX'); +SELECT get_table_colocation_id('table6_append'); + +-- check self table co-location +SELECT tables_colocated('table1_group1', 'table1_group1'); +SELECT tables_colocated('table5_groupX', 'table5_groupX'); +SELECT tables_colocated('table6_append', 'table6_append'); + +-- check table co-location with same co-location group +SELECT tables_colocated('table1_group1', 'table2_group1'); + +-- check table co-location with different co-location group +SELECT tables_colocated('table1_group1', 'table3_group2'); + +-- check table co-location with invalid co-location group +SELECT tables_colocated('table1_group1', 'table5_groupX'); +SELECT tables_colocated('table1_group1', 'table6_append'); + +-- check self shard co-location +SELECT shards_colocated(1300000, 1300000); +SELECT shards_colocated(1300016, 1300016); +SELECT shards_colocated(1300020, 1300020); + +-- check shard co-location with same co-location group +SELECT 
shards_colocated(1300000, 1300004); + +-- check shard co-location with same table different co-location group +SELECT shards_colocated(1300000, 1300001); + +-- check shard co-location with different co-location group +SELECT shards_colocated(1300000, 1300005); + +-- check shard co-location with invalid co-location group +SELECT shards_colocated(1300000, 1300016); +SELECT shards_colocated(1300000, 1300020); + +-- check co-located table list +SELECT UNNEST(get_colocated_table_array('table1_group1'))::regclass; +SELECT UNNEST(get_colocated_table_array('table5_groupX'))::regclass; +SELECT UNNEST(get_colocated_table_array('table6_append'))::regclass; + +-- check co-located shard list +SELECT get_colocated_shard_array(1300000); +SELECT get_colocated_shard_array(1300016); +SELECT get_colocated_shard_array(1300020); + +-- check FindShardIntervalIndex function +SELECT find_shard_interval_index(1300000); +SELECT find_shard_interval_index(1300001); +SELECT find_shard_interval_index(1300002); +SELECT find_shard_interval_index(1300003); +SELECT find_shard_interval_index(1300016); diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index 745cc5f0a..ea19fbb11 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -28,6 +28,9 @@ ALTER EXTENSION citus UPDATE TO '5.1-6'; ALTER EXTENSION citus UPDATE TO '5.1-7'; ALTER EXTENSION citus UPDATE TO '5.1-8'; ALTER EXTENSION citus UPDATE TO '5.2-1'; +ALTER EXTENSION citus UPDATE TO '5.2-2'; +ALTER EXTENSION citus UPDATE TO '5.2-3'; +ALTER EXTENSION citus UPDATE TO '5.2-4'; -- drop extension an re-create in newest version DROP EXTENSION citus;