mirror of https://github.com/citusdata/citus.git

Merge pull request #915 from citusdata/add_mark_tables_colocated

Add mark_tables_colocated() to update colocation groups

commit 9969594e10
@@ -8,7 +8,7 @@ EXTENSION = citus
 EXTVERSIONS = 5.0 5.0-1 5.0-2 \
     5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \
     5.2-1 5.2-2 5.2-3 5.2-4 \
-    6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15
+    6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16

 # All citus--*.sql files in the source directory
 DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))

@@ -88,6 +88,8 @@ $(EXTENSION)--6.0-14.sql: $(EXTENSION)--6.0-13.sql $(EXTENSION)--6.0-13--6.0-14.
     cat $^ > $@
 $(EXTENSION)--6.0-15.sql: $(EXTENSION)--6.0-14.sql $(EXTENSION)--6.0-14--6.0-15.sql
     cat $^ > $@
+$(EXTENSION)--6.0-16.sql: $(EXTENSION)--6.0-15.sql $(EXTENSION)--6.0-15--6.0-16.sql
+    cat $^ > $@

 NO_PGXS = 1

@@ -0,0 +1,12 @@
+/* citus--6.0-15--6.0-16.sql */
+
+SET search_path = 'pg_catalog';
+
+CREATE FUNCTION mark_tables_colocated(source_table_name regclass, target_table_names regclass[])
+    RETURNS void
+    LANGUAGE C STRICT
+    AS 'MODULE_PATHNAME', $$mark_tables_colocated$$;
+COMMENT ON FUNCTION mark_tables_colocated(source_table_name regclass, target_table_names regclass[])
+    IS 'mark target distributed tables as colocated with the source table';
+
+RESET search_path;
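
The new UDF takes one source table and an array of target tables. A minimal usage sketch (the table names here are hypothetical, not part of this commit; all tables must already be hash distributed with matching shard layouts):

    SELECT mark_tables_colocated('orders', ARRAY['order_items']);
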
@@ -1,6 +1,6 @@
 # Citus extension
 comment = 'Citus distributed database'
-default_version = '6.0-15'
+default_version = '6.0-16'
 module_pathname = '$libdir/citus'
 relocatable = false
 schema = pg_catalog
@@ -79,8 +79,6 @@ static void InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
 static void CreateTruncateTrigger(Oid relationId);
 static uint32 ColocationId(int shardCount, int replicationFactor,
                            Oid distributionColumnType);
-static uint32 CreateColocationGroup(int shardCount, int replicationFactor,
-                                    Oid distributionColumnType);
 static uint32 GetNextColocationId(void);
 static void CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
                                        int shardCount, int replicationFactor);
@@ -610,7 +608,7 @@ InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
         CharGetDatum(distributionMethod);
     newValues[Anum_pg_dist_partition_partkey - 1] =
         CStringGetTextDatum(distributionColumnString);
-    newValues[Anum_pg_dist_partition_colocationid - 1] = Int32GetDatum(colocationId);
+    newValues[Anum_pg_dist_partition_colocationid - 1] = UInt32GetDatum(colocationId);
     newValues[Anum_pg_dist_partition_repmodel - 1] = CharGetDatum(replicationModel);

     newTuple = heap_form_tuple(RelationGetDescr(pgDistPartition), newValues, newNulls);
@@ -883,7 +881,7 @@ ColocationId(int shardCount, int replicationFactor, Oid distributionColumnType)
  * pg_dist_colocation with the given configuration. It also returns the created
  * colocation id.
  */
-static uint32
+uint32
 CreateColocationGroup(int shardCount, int replicationFactor, Oid distributionColumnType)
 {
     uint32 colocationId = GetNextColocationId();
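
CreateColocationGroup loses its static qualifier here so that the new colocation code can call it from colocation_utils.c (the matching extern declaration is added to colocation_utils.h below). Each group it creates is one row in pg_dist_colocation, which can be inspected directly, as the regression tests in this commit do:

    SELECT * FROM pg_dist_colocation ORDER BY colocationid;
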
@@ -967,7 +965,7 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
     Relation distributedRelation = NULL;
     Relation pgDistColocation = NULL;
     Var *distributionColumn = NULL;
-    int distributionColumnType = 0;
+    Oid distributionColumnType = 0;
     uint32 colocationId = INVALID_COLOCATION_ID;

     /* get distribution column type */
@@ -50,7 +50,6 @@


 /* local function forward declarations */
-static void CheckHashPartitionedTable(Oid distributedTableId);
 static text * IntegerToText(int32 value);


@@ -326,7 +325,7 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId)
  * tableId and checks if the table is hash partitioned. If not, the function
  * throws an error.
  */
-static void
+void
 CheckHashPartitionedTable(Oid distributedTableId)
 {
     char partitionType = PartitionMethod(distributedTableId);
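
CheckHashPartitionedTable is likewise exported so MarkTablesColocated can reuse it. The property it verifies is visible in the partmethod column of pg_dist_partition, where 'h' denotes hash partitioning (a sketch for manual inspection):

    SELECT logicalrelid::regclass, partmethod FROM pg_dist_partition;
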
@@ -52,6 +52,61 @@ PG_FUNCTION_INFO_V1(master_stage_shard_row);
 PG_FUNCTION_INFO_V1(master_stage_shard_placement_row);


+/*
+ * TableShardReplicationFactor returns the current replication factor of the
+ * given relation by looking into shard placements. It errors out if there
+ * are different number of shard placements for different shards. It also
+ * errors out if the table does not have any shards.
+ */
+uint32
+TableShardReplicationFactor(Oid relationId)
+{
+    uint32 replicationCount = 0;
+    ListCell *shardCell = NULL;
+
+    List *shardIntervalList = LoadShardIntervalList(relationId);
+    foreach(shardCell, shardIntervalList)
+    {
+        ShardInterval *shardInterval = (ShardInterval *) lfirst(shardCell);
+        uint64 shardId = shardInterval->shardId;
+
+        List *shardPlacementList = ShardPlacementList(shardId);
+        uint32 shardPlacementCount = list_length(shardPlacementList);
+
+        /*
+         * Get the replication count of the first shard in the list, and error
+         * out if there is a shard with different replication count.
+         */
+        if (replicationCount == 0)
+        {
+            replicationCount = shardPlacementCount;
+        }
+        else if (replicationCount != shardPlacementCount)
+        {
+            char *relationName = get_rel_name(relationId);
+            ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                            errmsg("cannot find the replication factor of the "
+                                   "table %s", relationName),
+                            errdetail("The shard %ld has different shards replication "
+                                      "counts from other shards.", shardId)));
+        }
+    }
+
+    /* error out if the table does not have any shards */
+    if (replicationCount == 0)
+    {
+        char *relationName = get_rel_name(relationId);
+        ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                        errmsg("cannot find the replication factor of the "
+                               "table %s", relationName),
+                        errdetail("The table %s does not have any shards.",
+                                  relationName)));
+    }
+
+    return replicationCount;
+}
+
+
 /*
  * LoadShardIntervalList returns a list of shard intervals related for a given
  * distributed table. The function returns an empty list if no shards can be
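
A rough SQL analogue of what TableShardReplicationFactor computes (a sketch, assuming the pg_dist_shard and pg_dist_shard_placement catalogs of this Citus version; the C function errors out where this query would report more than one distinct count for a table):

    SELECT s.logicalrelid::regclass AS relation,
           count(DISTINCT c.placement_count) AS distinct_placement_counts
    FROM pg_dist_shard s
    JOIN LATERAL (
        SELECT count(*) AS placement_count
        FROM pg_dist_shard_placement p
        WHERE p.shardid = s.shardid
    ) c ON true
    GROUP BY s.logicalrelid;
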
@@ -14,14 +14,377 @@
 #include "access/genam.h"
 #include "access/heapam.h"
 #include "access/htup_details.h"
+#include "access/xact.h"
+#include "catalog/indexing.h"
 #include "distributed/colocation_utils.h"
+#include "distributed/listutils.h"
+#include "distributed/master_protocol.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/multi_logical_planner.h"
+#include "distributed/resource_lock.h"
 #include "distributed/shardinterval_utils.h"
+#include "distributed/worker_protocol.h"
 #include "utils/fmgroids.h"
+#include "utils/lsyscache.h"
 #include "utils/rel.h"


+/* local function forward declarations */
+static void MarkTablesColocated(Oid sourceRelationId, Oid targetRelationId);
+static void ErrorIfShardPlacementsNotColocated(Oid leftRelationId, Oid rightRelationId);
+static bool ShardsIntervalsEqual(ShardInterval *leftShardInterval,
+                                 ShardInterval *rightShardInterval);
+static int CompareShardPlacementsByNode(const void *leftElement,
+                                        const void *rightElement);
+static void UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId);
+
+
+/* exports for SQL callable functions */
+PG_FUNCTION_INFO_V1(mark_tables_colocated);
+
+
+/*
+ * mark_tables_colocated puts target tables to same colocation group with the
+ * source table. If the source table is in INVALID_COLOCATION_ID group, then it
+ * creates a new colocation group and assigns all tables to this new colocation
+ * group.
+ */
+Datum
+mark_tables_colocated(PG_FUNCTION_ARGS)
+{
+    Oid sourceRelationId = PG_GETARG_OID(0);
+    ArrayType *relationIdArrayObject = PG_GETARG_ARRAYTYPE_P(1);
+    Datum *relationIdDatumArray = NULL;
+    int relationIndex = 0;
+
+    int relationCount = ArrayObjectCount(relationIdArrayObject);
+    if (relationCount < 1)
+    {
+        ereport(ERROR, (errmsg("at least one target table is required for this "
+                               "operation")));
+    }
+
+    relationIdDatumArray = DeconstructArrayObject(relationIdArrayObject);
+
+    for (relationIndex = 0; relationIndex < relationCount; relationIndex++)
+    {
+        Oid nextRelationOid = DatumGetObjectId(relationIdDatumArray[relationIndex]);
+        MarkTablesColocated(sourceRelationId, nextRelationOid);
+    }
+
+    PG_RETURN_VOID();
+}
+
+
+/*
+ * MarkTablesColocated puts both tables to same colocation group. If the
+ * source table is in INVALID_COLOCATION_ID group, then it creates a new
+ * colocation group and assigns both tables to same colocation group. Otherwise,
+ * it adds the target table to colocation group of the source table.
+ */
+static void
+MarkTablesColocated(Oid sourceRelationId, Oid targetRelationId)
+{
+    uint32 sourceColocationId = INVALID_COLOCATION_ID;
+    Relation pgDistColocation = NULL;
+    Var *sourceDistributionColumn = NULL;
+    Var *targetDistributionColumn = NULL;
+    Oid sourceDistributionColumnType = InvalidOid;
+    Oid targetDistributionColumnType = InvalidOid;
+
+    CheckHashPartitionedTable(sourceRelationId);
+    CheckHashPartitionedTable(targetRelationId);
+
+    sourceDistributionColumn = PartitionKey(sourceRelationId);
+    sourceDistributionColumnType = sourceDistributionColumn->vartype;
+
+    targetDistributionColumn = PartitionKey(targetRelationId);
+    targetDistributionColumnType = targetDistributionColumn->vartype;
+
+    if (sourceDistributionColumnType != targetDistributionColumnType)
+    {
+        char *sourceRelationName = get_rel_name(sourceRelationId);
+        char *targetRelationName = get_rel_name(targetRelationId);
+
+        ereport(ERROR, (errmsg("cannot colocate tables %s and %s",
+                               sourceRelationName, targetRelationName),
+                        errdetail("Distribution column types don't match for "
+                                  "%s and %s.", sourceRelationName,
+                                  targetRelationName)));
+    }
+
+    /*
+     * Get an exclusive lock on the colocation system catalog. Therefore, we
+     * can be sure that there will no modifications on the colocation table
+     * until this transaction is committed.
+     */
+    pgDistColocation = heap_open(DistColocationRelationId(), ExclusiveLock);
+
+    /* check if shard placements are colocated */
+    ErrorIfShardPlacementsNotColocated(sourceRelationId, targetRelationId);
+
+    /*
+     * Get colocation group of the source table, if the source table does not
+     * have a colocation group, create a new one, and set it for the source table.
+     */
+    sourceColocationId = TableColocationId(sourceRelationId);
+    if (sourceColocationId == INVALID_COLOCATION_ID)
+    {
+        uint32 shardCount = ShardIntervalCount(sourceRelationId);
+        uint32 shardReplicationFactor = TableShardReplicationFactor(sourceRelationId);
+
+        sourceColocationId = CreateColocationGroup(shardCount, shardReplicationFactor,
+                                                   sourceDistributionColumnType);
+        UpdateRelationColocationGroup(sourceRelationId, sourceColocationId);
+    }
+
+    /* finally set colocation group for the target relation */
+    UpdateRelationColocationGroup(targetRelationId, sourceColocationId);
+
+    heap_close(pgDistColocation, NoLock);
+}
+
+
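
The observable effect of a successful call is that the target row in pg_dist_partition picks up the source table's colocation id; a quick verification sketch (t1 and t2 are hypothetical tables):

    SELECT mark_tables_colocated('t1', ARRAY['t2']);
    SELECT logicalrelid, colocationid FROM pg_dist_partition
        WHERE logicalrelid IN ('t1'::regclass, 't2'::regclass);
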
+/*
+ * ErrorIfShardPlacementsNotColocated checks if the shard placements of the
+ * given two relations are physically colocated. It errors out in any of
+ * following cases:
+ * 1. Shard counts are different,
+ * 2. Shard intervals don't match,
+ * 3. Shard placements are not colocated (not on the same node),
+ * 4. Shard placements have different health states.
+ *
+ * Note that, this function assumes that both tables are hash distributed.
+ */
+static void
+ErrorIfShardPlacementsNotColocated(Oid leftRelationId, Oid rightRelationId)
+{
+    List *leftShardIntervalList = NIL;
+    List *rightShardIntervalList = NIL;
+    ListCell *leftShardIntervalCell = NULL;
+    ListCell *rightShardIntervalCell = NULL;
+    char *leftRelationName = NULL;
+    char *rightRelationName = NULL;
+    uint32 leftShardCount = 0;
+    uint32 rightShardCount = 0;
+
+    /* get sorted shard interval lists for both tables */
+    leftShardIntervalList = LoadShardIntervalList(leftRelationId);
+    rightShardIntervalList = LoadShardIntervalList(rightRelationId);
+
+    /* prevent concurrent placement changes */
+    LockShardListMetadata(leftShardIntervalList, ShareLock);
+    LockShardListMetadata(rightShardIntervalList, ShareLock);
+
+    leftRelationName = get_rel_name(leftRelationId);
+    rightRelationName = get_rel_name(rightRelationId);
+
+    leftShardCount = list_length(leftShardIntervalList);
+    rightShardCount = list_length(rightShardIntervalList);
+
+    if (leftShardCount != rightShardCount)
+    {
+        ereport(ERROR, (errmsg("cannot colocate tables %s and %s",
+                               leftRelationName, rightRelationName),
+                        errdetail("Shard counts don't match for %s and %s.",
+                                  leftRelationName, rightRelationName)));
+    }
+
+    /* compare shard intervals one by one */
+    forboth(leftShardIntervalCell, leftShardIntervalList,
+            rightShardIntervalCell, rightShardIntervalList)
+    {
+        ShardInterval *leftInterval = (ShardInterval *) lfirst(leftShardIntervalCell);
+        ShardInterval *rightInterval = (ShardInterval *) lfirst(rightShardIntervalCell);
+
+        List *leftPlacementList = NIL;
+        List *rightPlacementList = NIL;
+        List *sortedLeftPlacementList = NIL;
+        List *sortedRightPlacementList = NIL;
+        ListCell *leftPlacementCell = NULL;
+        ListCell *rightPlacementCell = NULL;
+
+        uint64 leftShardId = leftInterval->shardId;
+        uint64 rightShardId = rightInterval->shardId;
+
+        bool shardsIntervalsEqual = ShardsIntervalsEqual(leftInterval, rightInterval);
+        if (!shardsIntervalsEqual)
+        {
+            ereport(ERROR, (errmsg("cannot colocate tables %s and %s",
+                                   leftRelationName, rightRelationName),
+                            errdetail("Shard intervals don't match for %s and %s.",
+                                      leftRelationName, rightRelationName)));
+        }
+
+        leftPlacementList = ShardPlacementList(leftShardId);
+        rightPlacementList = ShardPlacementList(rightShardId);
+
+        /* sort shard placements according to the node */
+        sortedLeftPlacementList = SortList(leftPlacementList,
+                                           CompareShardPlacementsByNode);
+        sortedRightPlacementList = SortList(rightPlacementList,
+                                            CompareShardPlacementsByNode);
+
+        /* compare shard placements one by one */
+        forboth(leftPlacementCell, sortedLeftPlacementList,
+                rightPlacementCell, sortedRightPlacementList)
+        {
+            ShardPlacement *leftPlacement =
+                (ShardPlacement *) lfirst(leftPlacementCell);
+            ShardPlacement *rightPlacement =
+                (ShardPlacement *) lfirst(rightPlacementCell);
+            int nodeCompare = 0;
+
+            /*
+             * If shard placements are on different nodes, these shard
+             * placements are not colocated.
+             */
+            nodeCompare = CompareShardPlacementsByNode((void *) &leftPlacement,
+                                                       (void *) &rightPlacement);
+            if (nodeCompare != 0)
+            {
+                ereport(ERROR, (errmsg("cannot colocate tables %s and %s",
+                                       leftRelationName, rightRelationName),
+                                errdetail("Shard %ld of %s and shard %ld of %s "
+                                          "are not colocated.",
+                                          leftShardId, leftRelationName,
+                                          rightShardId, rightRelationName)));
+            }
+
+            /* we also don't allow colocated shards to be in different shard states */
+            if (leftPlacement->shardState != rightPlacement->shardState)
+            {
+                ereport(ERROR, (errmsg("cannot colocate tables %s and %s",
+                                       leftRelationName, rightRelationName),
+                                errdetail("%s and %s have shard placements in "
+                                          "different shard states.",
+                                          leftRelationName, rightRelationName)));
+            }
+        }
+    }
+}
+
+
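
For manual troubleshooting of the interval check above, the shard ranges of a hash-distributed table can be listed straight from pg_dist_shard (a sketch; 't1' is hypothetical). Two colocatable tables should produce identical lists:

    SELECT shardminvalue, shardmaxvalue FROM pg_dist_shard
        WHERE logicalrelid = 't1'::regclass
        ORDER BY shardminvalue::integer;
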
+/*
+ * ShardsIntervalsEqual checks if two shard intervals of hash distributed
+ * tables are equal. Note that this function doesn't work with non-hash
+ * partitioned tables' shards.
+ *
+ * We do a min/max value check here to decide whether two shards are colocated;
+ * we could instead call FindShardIntervalIndex on both shards and compare the
+ * resulting indexes, but the min/max check is cheaper.
+ */
+static bool
+ShardsIntervalsEqual(ShardInterval *leftShardInterval, ShardInterval *rightShardInterval)
+{
+    int32 leftShardMinValue = DatumGetInt32(leftShardInterval->minValue);
+    int32 leftShardMaxValue = DatumGetInt32(leftShardInterval->maxValue);
+    int32 rightShardMinValue = DatumGetInt32(rightShardInterval->minValue);
+    int32 rightShardMaxValue = DatumGetInt32(rightShardInterval->maxValue);
+
+    bool minValuesEqual = leftShardMinValue == rightShardMinValue;
+    bool maxValuesEqual = leftShardMaxValue == rightShardMaxValue;
+
+    return minValuesEqual && maxValuesEqual;
+}
+
+
+/*
+ * CompareShardPlacementsByNode compares two shard placements by their nodename
+ * and nodeport.
+ */
+static int
+CompareShardPlacementsByNode(const void *leftElement, const void *rightElement)
+{
+    const ShardPlacement *leftPlacement = *((const ShardPlacement **) leftElement);
+    const ShardPlacement *rightPlacement = *((const ShardPlacement **) rightElement);
+
+    char *leftNodeName = leftPlacement->nodeName;
+    char *rightNodeName = rightPlacement->nodeName;
+
+    uint32 leftNodePort = leftPlacement->nodePort;
+    uint32 rightNodePort = rightPlacement->nodePort;
+
+    /* first compare node names */
+    int nodeNameCompare = strncmp(leftNodeName, rightNodeName, WORKER_LENGTH);
+    if (nodeNameCompare != 0)
+    {
+        return nodeNameCompare;
+    }
+
+    /* if node names are same, check node ports */
+    if (leftNodePort < rightNodePort)
+    {
+        return -1;
+    }
+    else if (leftNodePort > rightNodePort)
+    {
+        return 1;
+    }
+    else
+    {
+        return 0;
+    }
+}
+
+
+/*
+ * UpdateRelationColocationGroup updates colocation group in pg_dist_partition
+ * for the given relation.
+ */
+static void
+UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId)
+{
+    Relation pgDistPartition = NULL;
+    HeapTuple heapTuple = NULL;
+    TupleDesc tupleDescriptor = NULL;
+    SysScanDesc scanDescriptor = NULL;
+    bool indexOK = true;
+    int scanKeyCount = 1;
+    ScanKeyData scanKey[scanKeyCount];
+    Datum values[Natts_pg_dist_partition];
+    bool isNull[Natts_pg_dist_partition];
+    bool replace[Natts_pg_dist_partition];
+
+    pgDistPartition = heap_open(DistPartitionRelationId(), RowExclusiveLock);
+    tupleDescriptor = RelationGetDescr(pgDistPartition);
+
+    ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid,
+                BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(distributedRelationId));
+
+    scanDescriptor = systable_beginscan(pgDistPartition,
+                                        DistPartitionLogicalRelidIndexId(), indexOK,
+                                        NULL, scanKeyCount, scanKey);
+
+    heapTuple = systable_getnext(scanDescriptor);
+    if (!HeapTupleIsValid(heapTuple))
+    {
+        char *distributedRelationName = get_rel_name(distributedRelationId);
+        ereport(ERROR, (errmsg("could not find valid entry for relation %s",
+                               distributedRelationName)));
+    }
+
+    memset(values, 0, sizeof(replace));
+    memset(isNull, false, sizeof(isNull));
+    memset(replace, false, sizeof(replace));
+
+    values[Anum_pg_dist_partition_colocationid - 1] = UInt32GetDatum(colocationId);
+    isNull[Anum_pg_dist_partition_colocationid - 1] = false;
+    replace[Anum_pg_dist_partition_colocationid - 1] = true;
+
+    heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isNull, replace);
+    simple_heap_update(pgDistPartition, &heapTuple->t_self, heapTuple);
+
+    CatalogUpdateIndexes(pgDistPartition, heapTuple);
+    CitusInvalidateRelcacheByRelid(distributedRelationId);
+
+    CommandCounterIncrement();
+
+    systable_endscan(scanDescriptor);
+    heap_close(pgDistPartition, NoLock);
+}
+
+
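
At the catalog level this amounts to the following (a sketch with a hypothetical relation and group id; the C version additionally maintains the index and invalidates the relation's relcache entry):

    UPDATE pg_dist_partition SET colocationid = 5
        WHERE logicalrelid = 't1'::regclass;
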
 /*
  * TableColocationId function returns co-location id of given table. This function
  * errors out if given table is not distributed.
@@ -81,23 +444,9 @@ ShardsColocated(ShardInterval *leftShardInterval, ShardInterval *rightShardInter

     if (tablesColocated)
     {
-        /*
-         * We do min/max value check here to decide whether two shards are co-located,
-         * instead we can simply use FindShardIntervalIndex function on both shards then
-         * but do index check, but we avoid it because this way it is more cheaper.
-         *
-         * Having co-located tables implies that tables are partitioned by hash partition
-         * therefore it is safe to use DatumGetInt32 here.
-         */
-        int32 leftShardMinValue = DatumGetInt32(leftShardInterval->minValue);
-        int32 leftShardMaxValue = DatumGetInt32(leftShardInterval->maxValue);
-        int32 rightShardMinValue = DatumGetInt32(rightShardInterval->minValue);
-        int32 rightShardMaxValue = DatumGetInt32(rightShardInterval->maxValue);
-
-        bool minValuesEqual = leftShardMinValue == rightShardMinValue;
-        bool maxValuesEqual = leftShardMaxValue == rightShardMaxValue;
-
-        return minValuesEqual && maxValuesEqual;
+        bool shardIntervalEqual = ShardsIntervalsEqual(leftShardInterval,
+                                                       rightShardInterval);
+        return shardIntervalEqual;
     }

     return false;
@@ -25,5 +25,8 @@ extern List * ColocatedTableList(Oid distributedTableId);
 extern List * ColocatedShardIntervalList(ShardInterval *shardInterval);
 extern Oid ColocatedTableId(Oid colocationId);
 extern uint64 ColocatedShardIdInRelation(Oid relationId, int shardIndex);
+extern uint32 CreateColocationGroup(int shardCount, int replicationFactor,
+                                    Oid distributionColumnType);
+

 #endif /* COLOCATION_UTILS_H_ */
@@ -57,6 +57,7 @@ typedef struct ShardPlacement


 /* Function declarations to read shard and shard placement data */
+extern uint32 TableShardReplicationFactor(Oid relationId);
 extern List * LoadShardIntervalList(Oid relationId);
 extern int ShardIntervalCount(Oid relationId);
 extern List * LoadShardList(Oid relationId);
@@ -110,6 +110,7 @@ extern bool WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort,
                               int shardIndex, uint64 shardId, char *newShardOwner,
                               List *ddlCommandList, List *foreignConstraintCommadList);
 extern Oid ForeignConstraintGetReferencedTableId(char *queryString);
+extern void CheckHashPartitionedTable(Oid distributedTableId);

 /* Function declarations for generating metadata for shard and placement creation */
 extern Datum master_get_table_metadata(PG_FUNCTION_ARGS);
@@ -442,7 +442,7 @@ SELECT * FROM pg_dist_colocation

 SELECT logicalrelid, colocationid FROM pg_dist_partition
     WHERE colocationid >= 1 AND colocationid < 1000
-    ORDER BY colocationid;
+    ORDER BY logicalrelid;
  logicalrelid  | colocationid
 ---------------+--------------
  table1_groupa |            1
@@ -659,3 +659,124 @@ ORDER BY
 table2_groupf | 1300055 | t | 57637 | -2147483648 | 2147483647
 (56 rows)

+-- reset colocation ids to test mark_tables_colocated
+ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1;
+DELETE FROM pg_dist_colocation
+    WHERE colocationid >= 1 AND colocationid < 1000;
+UPDATE pg_dist_partition SET colocationid = 0
+    WHERE colocationid >= 1 AND colocationid < 1000;
+-- check metadata
+SELECT * FROM pg_dist_colocation
+    WHERE colocationid >= 1 AND colocationid < 1000
+    ORDER BY colocationid;
+ colocationid | shardcount | replicationfactor | distributioncolumntype
+--------------+------------+-------------------+------------------------
+(0 rows)
+
+SELECT logicalrelid, colocationid FROM pg_dist_partition
+    WHERE colocationid >= 1 AND colocationid < 1000
+    ORDER BY logicalrelid;
+ logicalrelid | colocationid
+--------------+--------------
+(0 rows)
+
+-- first check failing cases
+SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupC']);
+ERROR:  cannot colocate tables table1_groupb and table1_groupc
+DETAIL:  Distribution column types don't match for table1_groupb and table1_groupc.
+SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupD']);
+ERROR:  cannot colocate tables table1_groupb and table1_groupd
+DETAIL:  Shard counts don't match for table1_groupb and table1_groupd.
+SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupE']);
+ERROR:  cannot colocate tables table1_groupb and table1_groupe
+DETAIL:  Shard 1300027 of table1_groupb and shard 1300047 of table1_groupe are not colocated.
+SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupF']);
+ERROR:  cannot colocate tables table1_groupb and table1_groupf
+DETAIL:  Shard counts don't match for table1_groupb and table1_groupf.
+SELECT mark_tables_colocated('table1_groupB', ARRAY['table2_groupB', 'table1_groupD']);
+ERROR:  cannot colocate tables table1_groupb and table1_groupd
+DETAIL:  Shard counts don't match for table1_groupb and table1_groupd.
+-- check metadata to see failing calls didn't have any side effects
+SELECT * FROM pg_dist_colocation
+    WHERE colocationid >= 1 AND colocationid < 1000
+    ORDER BY colocationid;
+ colocationid | shardcount | replicationfactor | distributioncolumntype
+--------------+------------+-------------------+------------------------
+(0 rows)
+
+SELECT logicalrelid, colocationid FROM pg_dist_partition
+    WHERE colocationid >= 1 AND colocationid < 1000
+    ORDER BY logicalrelid;
+ logicalrelid | colocationid
+--------------+--------------
+(0 rows)
+
+-- check successfully colocated tables
+SELECT mark_tables_colocated('table1_groupB', ARRAY['table2_groupB']);
+ mark_tables_colocated
+-----------------------
+
+(1 row)
+
+SELECT mark_tables_colocated('table1_groupC', ARRAY['table2_groupC']);
+ mark_tables_colocated
+-----------------------
+
+(1 row)
+
+SELECT mark_tables_colocated('table1_groupD', ARRAY['table2_groupD']);
+ mark_tables_colocated
+-----------------------
+
+(1 row)
+
+SELECT mark_tables_colocated('table1_groupE', ARRAY['table2_groupE', 'table3_groupE']);
+ mark_tables_colocated
+-----------------------
+
+(1 row)
+
+SELECT mark_tables_colocated('table1_groupF', ARRAY['table2_groupF']);
+ mark_tables_colocated
+-----------------------
+
+(1 row)
+
+-- check to colocate with itself
+SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupB']);
+ mark_tables_colocated
+-----------------------
+
+(1 row)
+
+-- check metadata to see colocation groups are created successfully
+SELECT * FROM pg_dist_colocation
+    WHERE colocationid >= 1 AND colocationid < 1000
+    ORDER BY colocationid;
+ colocationid | shardcount | replicationfactor | distributioncolumntype
+--------------+------------+-------------------+------------------------
+            2 |          2 |                 1 |                     23
+            3 |          2 |                 2 |                     25
+            4 |          4 |                 2 |                     23
+            5 |          2 |                 2 |                     23
+            6 |          1 |                 2 |                     23
+(5 rows)
+
+SELECT logicalrelid, colocationid FROM pg_dist_partition
+    WHERE colocationid >= 1 AND colocationid < 1000
+    ORDER BY logicalrelid;
+ logicalrelid  | colocationid
+---------------+--------------
+ table1_groupb |            2
+ table2_groupb |            2
+ table1_groupc |            3
+ table2_groupc |            3
+ table1_groupd |            4
+ table2_groupd |            4
+ table1_groupe |            5
+ table2_groupe |            5
+ table3_groupe |            5
+ table1_groupf |            6
+ table2_groupf |            6
+(11 rows)
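
The distributioncolumntype values in the expected output above are pg_type OIDs; 23 is integer and 25 is text, which matches the test tables and can be confirmed with:

    SELECT 23::oid::regtype AS int_type, 25::oid::regtype AS text_type;
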
@@ -55,6 +55,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-12';
 ALTER EXTENSION citus UPDATE TO '6.0-13';
 ALTER EXTENSION citus UPDATE TO '6.0-14';
 ALTER EXTENSION citus UPDATE TO '6.0-15';
+ALTER EXTENSION citus UPDATE TO '6.0-16';
 -- ensure no objects were created outside pg_catalog
 SELECT COUNT(*)
 FROM pg_depend AS pgd,
@@ -212,7 +212,7 @@ SELECT * FROM pg_dist_colocation

 SELECT logicalrelid, colocationid FROM pg_dist_partition
     WHERE colocationid >= 1 AND colocationid < 1000
-    ORDER BY colocationid;
+    ORDER BY logicalrelid;

 -- check effects of dropping tables
 DROP TABLE table1_groupA;
@@ -295,3 +295,54 @@ ORDER BY
     shardmaxvalue::integer,
     shardid,
     placementid;
+
+-- reset colocation ids to test mark_tables_colocated
+ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1;
+DELETE FROM pg_dist_colocation
+    WHERE colocationid >= 1 AND colocationid < 1000;
+UPDATE pg_dist_partition SET colocationid = 0
+    WHERE colocationid >= 1 AND colocationid < 1000;
+
+-- check metadata
+SELECT * FROM pg_dist_colocation
+    WHERE colocationid >= 1 AND colocationid < 1000
+    ORDER BY colocationid;
+
+SELECT logicalrelid, colocationid FROM pg_dist_partition
+    WHERE colocationid >= 1 AND colocationid < 1000
+    ORDER BY logicalrelid;
+
+-- first check failing cases
+SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupC']);
+SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupD']);
+SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupE']);
+SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupF']);
+SELECT mark_tables_colocated('table1_groupB', ARRAY['table2_groupB', 'table1_groupD']);
+
+-- check metadata to see failing calls didn't have any side effects
+SELECT * FROM pg_dist_colocation
+    WHERE colocationid >= 1 AND colocationid < 1000
+    ORDER BY colocationid;
+
+SELECT logicalrelid, colocationid FROM pg_dist_partition
+    WHERE colocationid >= 1 AND colocationid < 1000
+    ORDER BY logicalrelid;
+
+-- check successfully colocated tables
+SELECT mark_tables_colocated('table1_groupB', ARRAY['table2_groupB']);
+SELECT mark_tables_colocated('table1_groupC', ARRAY['table2_groupC']);
+SELECT mark_tables_colocated('table1_groupD', ARRAY['table2_groupD']);
+SELECT mark_tables_colocated('table1_groupE', ARRAY['table2_groupE', 'table3_groupE']);
+SELECT mark_tables_colocated('table1_groupF', ARRAY['table2_groupF']);
+
+-- check to colocate with itself
+SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupB']);
+
+-- check metadata to see colocation groups are created successfully
+SELECT * FROM pg_dist_colocation
+    WHERE colocationid >= 1 AND colocationid < 1000
+    ORDER BY colocationid;
+
+SELECT logicalrelid, colocationid FROM pg_dist_partition
+    WHERE colocationid >= 1 AND colocationid < 1000
+    ORDER BY logicalrelid;
@@ -55,6 +55,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-12';
 ALTER EXTENSION citus UPDATE TO '6.0-13';
 ALTER EXTENSION citus UPDATE TO '6.0-14';
 ALTER EXTENSION citus UPDATE TO '6.0-15';
+ALTER EXTENSION citus UPDATE TO '6.0-16';

 -- ensure no objects were created outside pg_catalog
 SELECT COUNT(*)