Merge pull request #867 from citusdata/add_create_distributed_table

Add create_distributed_table() udf
pull/892/head
Metin Döşlü 2016-10-20 11:43:25 +03:00 committed by GitHub
commit b6a9b61d32
25 changed files with 1308 additions and 238 deletions
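For context, a minimal usage sketch of the new UDF (not part of the diff; table and column names are hypothetical, and it assumes a coordinator with worker nodes already registered):

CREATE TABLE github_events (event_id bigint, repo_id int, payload jsonb);
-- 'hash' is the default distribution_type; shard count and replication factor
-- come from citus.shard_count and citus.shard_replication_factor
SELECT create_distributed_table('github_events', 'repo_id');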

View File

@ -8,7 +8,7 @@ EXTENSION = citus
EXTVERSIONS = 5.0 5.0-1 5.0-2 \
5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \
5.2-1 5.2-2 5.2-3 5.2-4 \
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11
# All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@ -78,6 +78,8 @@ $(EXTENSION)--6.0-9.sql: $(EXTENSION)--6.0-8.sql $(EXTENSION)--6.0-8--6.0-9.sql
cat $^ > $@
$(EXTENSION)--6.0-10.sql: $(EXTENSION)--6.0-9.sql $(EXTENSION)--6.0-9--6.0-10.sql
cat $^ > $@
$(EXTENSION)--6.0-11.sql: $(EXTENSION)--6.0-10.sql $(EXTENSION)--6.0-10--6.0-11.sql
cat $^ > $@
NO_PGXS = 1

View File

@ -0,0 +1,112 @@
/* citus--6.0-10--6.0-11.sql */
SET search_path = 'pg_catalog';
CREATE SEQUENCE citus.pg_dist_colocationid_seq
MINVALUE 1
MAXVALUE 4294967296;
ALTER SEQUENCE citus.pg_dist_colocationid_seq SET SCHEMA pg_catalog;
/* add pg_dist_colocation */
CREATE TABLE citus.pg_dist_colocation(
colocationid int NOT NULL PRIMARY KEY,
shardcount int NOT NULL,
replicationfactor int NOT NULL,
distributioncolumntype oid NOT NULL
);
ALTER TABLE citus.pg_dist_colocation SET SCHEMA pg_catalog;
CREATE INDEX pg_dist_colocation_configuration_index
ON pg_dist_colocation USING btree(shardcount, replicationfactor, distributioncolumntype);
CREATE FUNCTION create_distributed_table(table_name regclass,
distribution_column text,
distribution_type citus.distribution_type DEFAULT 'hash')
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$create_distributed_table$$;
COMMENT ON FUNCTION create_distributed_table(table_name regclass,
distribution_column text,
distribution_type citus.distribution_type)
IS 'creates a distributed table';
CREATE OR REPLACE FUNCTION pg_catalog.citus_drop_trigger()
RETURNS event_trigger
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = pg_catalog
AS $cdbdt$
DECLARE
v_obj record;
sequence_names text[] := '{}';
node_names text[] := '{}';
node_ports bigint[] := '{}';
node_name text;
node_port bigint;
table_colocation_id integer;
BEGIN
-- collect set of dropped sequences to drop on workers later
SELECT array_agg(object_identity) INTO sequence_names
FROM pg_event_trigger_dropped_objects()
WHERE object_type = 'sequence';
-- Must accumulate set of affected nodes before deleting placements, as
-- master_drop_all_shards will erase their rows, making it impossible for
-- us to know where to drop sequences (which must be dropped after shards,
-- since they have default value expressions which depend on sequences).
SELECT array_agg(sp.nodename), array_agg(sp.nodeport)
INTO node_names, node_ports
FROM pg_event_trigger_dropped_objects() AS dobj,
pg_dist_shard AS s,
pg_dist_shard_placement AS sp
WHERE dobj.object_type IN ('table', 'foreign table')
AND dobj.objid = s.logicalrelid
AND s.shardid = sp.shardid;
FOR v_obj IN SELECT * FROM pg_event_trigger_dropped_objects() LOOP
IF v_obj.object_type NOT IN ('table', 'foreign table') THEN
CONTINUE;
END IF;
-- nothing to do if not a distributed table
IF NOT EXISTS(SELECT * FROM pg_dist_partition WHERE logicalrelid = v_obj.objid) THEN
CONTINUE;
END IF;
-- ensure all shards are dropped
PERFORM master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name);
-- get colocation group
SELECT colocationid INTO table_colocation_id FROM pg_dist_partition WHERE logicalrelid = v_obj.objid;
-- delete partition entry
DELETE FROM pg_dist_partition WHERE logicalrelid = v_obj.objid;
-- drop colocation group if all referencing tables are dropped
IF NOT EXISTS(SELECT * FROM pg_dist_partition WHERE colocationId = table_colocation_id) THEN
DELETE FROM pg_dist_colocation WHERE colocationId = table_colocation_id;
END IF;
END LOOP;
IF cardinality(sequence_names) = 0 THEN
RETURN;
END IF;
FOR node_name, node_port IN
SELECT DISTINCT name, port
FROM unnest(node_names, node_ports) AS nodes(name, port)
LOOP
PERFORM master_drop_sequences(sequence_names, node_name, node_port);
END LOOP;
END;
$cdbdt$;
COMMENT ON FUNCTION citus_drop_trigger()
IS 'perform checks and actions at the end of DROP actions';
ALTER TABLE pg_dist_partition ALTER COLUMN colocationid TYPE integer;
RESET search_path;
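A sketch of the cleanup behaviour added by citus_drop_trigger above (hypothetical table name; counts are illustrative, following the function body):

SELECT count(*) FROM pg_dist_colocation;   -- say this returns 1
DROP TABLE github_events;                  -- last distributed table in its colocation group
-- shards are dropped on the workers, the pg_dist_partition row is deleted,
-- and the now-unreferenced pg_dist_colocation row is deleted as well
SELECT count(*) FROM pg_dist_colocation;   -- returns 0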

View File

@ -1,6 +1,6 @@
# Citus extension
comment = 'Citus distributed database'
default_version = '6.0-10'
default_version = '6.0-11'
module_pathname = '$libdir/citus'
relocatable = false
schema = pg_catalog

View File

@ -9,6 +9,7 @@
*/
#include "postgres.h"
#include "miscadmin.h"
#include "access/genam.h"
#include "access/hash.h"
@ -27,14 +28,17 @@
#include "catalog/pg_trigger.h"
#include "commands/defrem.h"
#include "commands/extension.h"
#include "commands/sequence.h"
#include "commands/trigger.h"
#include "distributed/colocation_utils.h"
#include "distributed/distribution_column.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/pg_dist_colocation.h"
#include "distributed/pg_dist_partition.h"
#include "executor/spi.h"
#include "distributed/multi_logical_planner.h"
#include "nodes/execnodes.h"
#include "nodes/nodeFuncs.h"
#include "nodes/pg_list.h"
@ -51,28 +55,38 @@
/* local function forward declarations */
static void ConvertToDistributedTable(Oid relationId,
text *distributionColumnText,
Oid distributionMethodOid, uint32 colocationId);
static char LookupDistributionMethod(Oid distributionMethodOid);
static void RecordDistributedRelationDependencies(Oid distributedRelationId,
Node *distributionKey);
static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId,
int16 supportFunctionNumber);
static bool LocalTableEmpty(Oid tableId);
static void ErrorIfNotSupportedConstraint(Relation relation, char distributionMethod,
Var *distributionColumn);
static void InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
Var *distributionColumn, uint32 colocationId);
static void CreateTruncateTrigger(Oid relationId);
static uint32 ColocationId(int shardCount, int replicationFactor,
Oid distributionColumnType);
static uint32 CreateColocationGroup(int shardCount, int replicationFactor,
Oid distributionColumnType);
static uint32 GetNextColocationId(void);
/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(master_create_distributed_table);
PG_FUNCTION_INFO_V1(create_distributed_table);
/*
* master_create_distributed_table accepts a table, distribution column and
* method and performs the corresponding catalog changes.
*
* XXX: We should perform more checks here to see if this table is fit for
* partitioning. At a minimum, we should validate the following: (i) this node
* runs as the master node, (ii) table does not make use of the inheritance
* mechanism, (iii) table does not own columns that are sequences, and (iv)
* table does not have collated columns. (v) table does not have
* preexisting content.
* Note that this udf is deprecated and cannot create colocated tables, so we
* always use INVALID_COLOCATION_ID.
*/
Datum
master_create_distributed_table(PG_FUNCTION_ARGS)
@ -81,86 +95,169 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
text *distributionColumnText = PG_GETARG_TEXT_P(1);
Oid distributionMethodOid = PG_GETARG_OID(2);
ConvertToDistributedTable(distributedRelationId, distributionColumnText,
distributionMethodOid, INVALID_COLOCATION_ID);
PG_RETURN_VOID();
}
/*
* create_distributed_table accepts a table, distribution column and
* distribution method, and then creates a distributed table.
*/
Datum
create_distributed_table(PG_FUNCTION_ARGS)
{
Oid relationId = PG_GETARG_OID(0);
text *distributionColumnText = PG_GETARG_TEXT_P(1);
Oid distributionMethodOid = PG_GETARG_OID(2);
Relation distributedRelation = NULL;
TupleDesc relationDesc = NULL;
char *distributedRelationName = NULL;
char relationKind = '\0';
Relation pgDistPartition = NULL;
char distributionMethod = LookupDistributionMethod(distributionMethodOid);
const char replicationModel = 'c';
char *distributionColumnName = text_to_cstring(distributionColumnText);
Node *distributionKey = NULL;
Relation pgDistColocation = NULL;
Var *distributionColumn = NULL;
char *distributionKeyString = NULL;
char *distributionColumnName = NULL;
int distributionColumnType = 0;
uint32 colocationId = INVALID_COLOCATION_ID;
List *indexOidList = NIL;
ListCell *indexOidCell = NULL;
/* if distribution method is not hash, just create partition metadata */
char distributionMethod = LookupDistributionMethod(distributionMethodOid);
if (distributionMethod != DISTRIBUTE_BY_HASH)
{
ConvertToDistributedTable(relationId, distributionColumnText,
distributionMethodOid, INVALID_COLOCATION_ID);
PG_RETURN_VOID();
}
HeapTuple newTuple = NULL;
Datum newValues[Natts_pg_dist_partition];
bool newNulls[Natts_pg_dist_partition];
/* get distribution column type */
distributionColumnName = text_to_cstring(distributionColumnText);
distributedRelation = relation_open(relationId, AccessShareLock);
distributionColumn = BuildDistributionKeyFromColumnName(distributedRelation,
distributionColumnName);
distributionColumnType = distributionColumn->vartype;
/*
* Lock target relation with an access exclusive lock - there's no way to
* make sense of this table until we've committed, and we don't want
* multiple backends manipulating this relation.
* Get an exclusive lock on the colocation system catalog. Therefore, we
* can be sure that there will be no modifications to the colocation table
* until this transaction is committed.
*/
distributedRelation = relation_open(distributedRelationId, AccessExclusiveLock);
relationDesc = RelationGetDescr(distributedRelation);
distributedRelationName = RelationGetRelationName(distributedRelation);
pgDistColocation = heap_open(DistColocationRelationId(), ExclusiveLock);
EnsureTableOwner(distributedRelationId);
/* check for existing colocations */
colocationId = ColocationId(ShardCount, ShardReplicationFactor,
distributionColumnType);
/* open system catalog and insert new tuple */
pgDistPartition = heap_open(DistPartitionRelationId(), RowExclusiveLock);
/*
* If there is a colocation group for the current configuration, get a
* colocated table from the group and use its shards as a reference to
* create new shards. Otherwise, create a new colocation group and create
* shards with the default round robin algorithm.
*/
if (colocationId != INVALID_COLOCATION_ID)
{
char *relationName = get_rel_name(relationId);
Oid colocatedTableId = ColocatedTableId(colocationId);
ConvertToDistributedTable(relationId, distributionColumnText,
distributionMethodOid, colocationId);
CreateColocatedShards(relationId, colocatedTableId);
ereport(DEBUG2, (errmsg("table %s is added to colocation group: %d",
relationName, colocationId)));
}
else
{
colocationId = CreateColocationGroup(ShardCount, ShardReplicationFactor,
distributionColumnType);
ConvertToDistributedTable(relationId, distributionColumnText,
distributionMethodOid, colocationId);
/* use the default way to create shards */
CreateShardsWithRoundRobinPolicy(relationId, ShardCount, ShardReplicationFactor);
}
heap_close(pgDistColocation, NoLock);
relation_close(distributedRelation, NoLock);
PG_RETURN_VOID();
}
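The two branches above can be observed from SQL roughly as follows (a sketch with hypothetical tables, not part of the diff): two hash-distributed tables created with the same shard count, replication factor, and distribution column type share one colocation group, and the second one gets colocated shards.

SET citus.shard_count = 4;
CREATE TABLE orders (customer_id int);
SELECT create_distributed_table('orders', 'customer_id');    -- creates a new colocation group
CREATE TABLE payments (customer_id int);
SELECT create_distributed_table('payments', 'customer_id');  -- reuses the existing group
SELECT logicalrelid, colocationid
FROM pg_dist_partition
WHERE logicalrelid IN ('orders'::regclass, 'payments'::regclass);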
/*
* ConvertToDistributedTable converts the given regular PostgreSQL table into a
* distributed table. First, it checks if the given table can be distributed,
* then it creates the related tuple in pg_dist_partition.
*
* XXX: We should perform more checks here to see if this table is fit for
* partitioning. At a minimum, we should validate the following: (i) this node
* runs as the master node, (ii) table does not make use of the inheritance
* mechanism, (iii) table does not own columns that are sequences, and (iv)
* table does not have collated columns.
*/
static void
ConvertToDistributedTable(Oid relationId, text *distributionColumnText,
Oid distributionMethodOid, uint32 colocationId)
{
Relation relation = NULL;
TupleDesc relationDesc = NULL;
char *relationName = NULL;
char relationKind = 0;
char distributionMethod = LookupDistributionMethod(distributionMethodOid);
char *distributionColumnName = text_to_cstring(distributionColumnText);
Var *distributionColumn = NULL;
/*
* Lock target relation with an exclusive lock - there's no way to make
* sense of this table until we've committed, and we don't want multiple
* backends manipulating this relation.
*/
relation = relation_open(relationId, ExclusiveLock);
relationDesc = RelationGetDescr(relation);
relationName = RelationGetRelationName(relation);
EnsureTableOwner(relationId);
/* check that the relation is not already distributed */
if (IsDistributedTable(distributedRelationId))
if (IsDistributedTable(relationId))
{
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("table \"%s\" is already distributed",
distributedRelationName)));
relationName)));
}
/* verify target relation does not use WITH (OIDS) PostgreSQL feature */
if (relationDesc->tdhasoid)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot distribute relation: %s", distributedRelationName),
errmsg("cannot distribute relation: %s", relationName),
errdetail("Distributed relations must not specify the WITH "
"(OIDS) option in their definitions.")));
}
/* verify target relation is either regular or foreign table */
relationKind = distributedRelation->rd_rel->relkind;
relationKind = relation->rd_rel->relkind;
if (relationKind != RELKIND_RELATION && relationKind != RELKIND_FOREIGN_TABLE)
{
ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot distribute relation: %s",
distributedRelationName),
relationName),
errdetail("Distributed relations must be regular or "
"foreign tables.")));
}
/* check that the relation does not contain any rows */
if (!LocalTableEmpty(distributedRelationId))
if (!LocalTableEmpty(relationId))
{
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("cannot distribute relation \"%s\"",
distributedRelationName),
relationName),
errdetail("Relation \"%s\" contains data.",
distributedRelationName),
relationName),
errhint("Empty your table before distributing it.")));
}
distributionKey = BuildDistributionKeyFromColumnName(distributedRelation,
distributionColumn = BuildDistributionKeyFromColumnName(relation,
distributionColumnName);
distributionKeyString = nodeToString(distributionKey);
/* the distribution key should always be a Var for now */
Assert(IsA(distributionKey, Var));
distributionColumn = (Var *) distributionKey;
/* check for support function needed by specified partition method */
if (distributionMethod == DISTRIBUTE_BY_HASH)
@ -193,17 +290,47 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
}
}
ErrorIfNotSupportedConstraint(relation, distributionMethod, distributionColumn);
InsertIntoPgDistPartition(relationId, distributionMethod, distributionColumn,
colocationId);
relation_close(relation, NoLock);
/*
* Forbid UNIQUE, PRIMARY KEY, or EXCLUDE constraints on append partitioned tables,
* since currently there is no way of enforcing uniqueness for overlapping shards.
*
* Similarly, do not allow such constraints if they do not
* include the partition column. This check is important for two reasons. First,
* currently Citus does not enforce uniqueness constraint on multiple shards.
* Second, INSERT INTO .. ON CONFLICT (i.e., UPSERT) queries can be executed with no
* further check for constraints.
* PostgreSQL supports truncate trigger for regular relations only.
* Truncate on foreign tables is not supported.
*/
indexOidList = RelationGetIndexList(distributedRelation);
if (relationKind == RELKIND_RELATION)
{
CreateTruncateTrigger(relationId);
}
}
/*
* ErrorIfNotSupportedConstraint runs checks related to unique index / exclude
* constraints.
*
* Forbid UNIQUE, PRIMARY KEY, or EXCLUDE constraints on append partitioned
* tables, since currently there is no way of enforcing uniqueness for
* overlapping shards.
*
* Similarly, do not allow such constraints if they do not include the
* partition column. This check is important for two reasons:
* i. First, currently Citus does not enforce uniqueness constraint on multiple
* shards.
* ii. Second, INSERT INTO .. ON CONFLICT (i.e., UPSERT) queries can be executed
* with no further check for constraints.
*/
static void
ErrorIfNotSupportedConstraint(Relation relation, char distributionMethod,
Var *distributionColumn)
{
char *relationName = RelationGetRelationName(relation);
List *indexOidList = RelationGetIndexList(relation);
ListCell *indexOidCell = NULL;
foreach(indexOidCell, indexOidList)
{
Oid indexOid = lfirst_oid(indexOidCell);
@ -233,7 +360,7 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
{
ereport(WARNING, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("table \"%s\" has a UNIQUE or EXCLUDE constraint",
distributedRelationName),
relationName),
errdetail("UNIQUE constraints, EXCLUDE constraints, "
"and PRIMARY KEYs on "
"append-partitioned tables cannot be enforced."),
@ -271,7 +398,7 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot distribute relation: \"%s\"",
distributedRelationName),
relationName),
errdetail("Distributed relations cannot have UNIQUE, "
"EXCLUDE, or PRIMARY KEY constraints that do not "
"include the partition column (with an equality "
@ -280,18 +407,40 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
index_close(indexDesc, NoLock);
}
}
/*
* InsertIntoPgDistPartition inserts a new tuple into pg_dist_partition.
*/
static void
InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
Var *distributionColumn, uint32 colocationId)
{
Relation pgDistPartition = NULL;
const char replicationModel = 'c';
char *distributionColumnString = NULL;
HeapTuple newTuple = NULL;
Datum newValues[Natts_pg_dist_partition];
bool newNulls[Natts_pg_dist_partition];
/* open system catalog and insert new tuple */
pgDistPartition = heap_open(DistPartitionRelationId(), RowExclusiveLock);
distributionColumnString = nodeToString((Node *) distributionColumn);
/* form new tuple for pg_dist_partition */
memset(newValues, 0, sizeof(newValues));
memset(newNulls, false, sizeof(newNulls));
newValues[Anum_pg_dist_partition_logicalrelid - 1] =
ObjectIdGetDatum(distributedRelationId);
ObjectIdGetDatum(relationId);
newValues[Anum_pg_dist_partition_partmethod - 1] =
CharGetDatum(distributionMethod);
newValues[Anum_pg_dist_partition_partkey - 1] =
CStringGetTextDatum(distributionKeyString);
newValues[Anum_pg_dist_partition_colocationid - 1] = INVALID_COLOCATION_ID;
CStringGetTextDatum(distributionColumnString);
newValues[Anum_pg_dist_partition_colocationid - 1] = colocationId;
newValues[Anum_pg_dist_partition_repmodel - 1] = CharGetDatum(replicationModel);
newTuple = heap_form_tuple(RelationGetDescr(pgDistPartition), newValues, newNulls);
@ -299,25 +448,12 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
/* finally insert tuple, build index entries & register cache invalidation */
simple_heap_insert(pgDistPartition, newTuple);
CatalogUpdateIndexes(pgDistPartition, newTuple);
CitusInvalidateRelcacheByRelid(distributedRelationId);
CitusInvalidateRelcacheByRelid(relationId);
RecordDistributedRelationDependencies(distributedRelationId, distributionKey);
RecordDistributedRelationDependencies(relationId, (Node *) distributionColumn);
CommandCounterIncrement();
heap_close(pgDistPartition, NoLock);
relation_close(distributedRelation, NoLock);
/*
* PostgreSQL supports truncate trigger for regular relations only.
* Truncate on foreign tables is not supported.
*/
if (relationKind == RELKIND_RELATION)
{
CreateTruncateTrigger(distributedRelationId);
}
PG_RETURN_VOID();
}
@ -525,3 +661,126 @@ CreateTruncateTrigger(Oid relationId)
CreateTrigger(trigger, NULL, relationId, InvalidOid, InvalidOid, InvalidOid,
internal);
}
/*
* ColocationId searches pg_dist_colocation for shard count, replication factor
* and distribution column type. If a matching entry is found, it returns the
* colocation id, otherwise it returns INVALID_COLOCATION_ID.
*/
static uint32
ColocationId(int shardCount, int replicationFactor, Oid distributionColumnType)
{
uint32 colocationId = INVALID_COLOCATION_ID;
HeapTuple colocationTuple = NULL;
SysScanDesc scanDescriptor;
const int scanKeyCount = 3;
ScanKeyData scanKey[scanKeyCount];
bool indexOK = true;
Relation pgDistColocation = heap_open(DistColocationRelationId(), AccessShareLock);
/* set scan arguments */
ScanKeyInit(&scanKey[0], Anum_pg_dist_colocation_shardcount,
BTEqualStrategyNumber, F_INT4EQ, UInt32GetDatum(shardCount));
ScanKeyInit(&scanKey[1], Anum_pg_dist_colocation_replicationfactor,
BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(replicationFactor));
ScanKeyInit(&scanKey[2], Anum_pg_dist_colocation_distributioncolumntype,
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(distributionColumnType));
scanDescriptor = systable_beginscan(pgDistColocation,
DistColocationConfigurationIndexId(),
indexOK, NULL, scanKeyCount, scanKey);
colocationTuple = systable_getnext(scanDescriptor);
if (HeapTupleIsValid(colocationTuple))
{
Form_pg_dist_colocation colocationForm =
(Form_pg_dist_colocation) GETSTRUCT(colocationTuple);
colocationId = colocationForm->colocationid;
}
systable_endscan(scanDescriptor);
heap_close(pgDistColocation, AccessShareLock);
return colocationId;
}
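The systable scan above is conceptually the query below (illustrative only; the values shown are the shipped defaults for citus.shard_count and citus.shard_replication_factor and an integer distribution column):

SELECT colocationid
FROM pg_dist_colocation
WHERE shardcount = 32                          -- default citus.shard_count
  AND replicationfactor = 2                    -- default citus.shard_replication_factor
  AND distributioncolumntype = 'integer'::regtype;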
/*
* CreateColocationGroup creates a new colocation id and writes it into
* pg_dist_colocation with the given configuration. It also returns the created
* colocation id.
*/
static uint32
CreateColocationGroup(int shardCount, int replicationFactor, Oid distributionColumnType)
{
uint32 colocationId = GetNextColocationId();
Relation pgDistColocation = NULL;
TupleDesc tupleDescriptor = NULL;
HeapTuple heapTuple = NULL;
Datum values[Natts_pg_dist_colocation];
bool isNulls[Natts_pg_dist_colocation];
/* form new colocation tuple */
memset(values, 0, sizeof(values));
memset(isNulls, false, sizeof(isNulls));
values[Anum_pg_dist_colocation_colocationid - 1] = UInt32GetDatum(colocationId);
values[Anum_pg_dist_colocation_shardcount - 1] = UInt32GetDatum(shardCount);
values[Anum_pg_dist_colocation_replicationfactor - 1] =
UInt32GetDatum(replicationFactor);
values[Anum_pg_dist_colocation_distributioncolumntype - 1] =
ObjectIdGetDatum(distributionColumnType);
/* open colocation relation and insert the new tuple */
pgDistColocation = heap_open(DistColocationRelationId(), RowExclusiveLock);
tupleDescriptor = RelationGetDescr(pgDistColocation);
heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);
simple_heap_insert(pgDistColocation, heapTuple);
CatalogUpdateIndexes(pgDistColocation, heapTuple);
/* increment the counter so that the next command can see the row */
CommandCounterIncrement();
heap_close(pgDistColocation, RowExclusiveLock);
return colocationId;
}
/*
* GetNextColocationId allocates and returns a unique colocationId for the
* colocation group to be created. This allocation occurs both in shared memory
* and in write ahead logs; writing to logs avoids the risk of having
* colocationId collisions.
*
* Please note that the caller is still responsible for finalizing colocationId
* with the master node. Further note that this function relies on an internal
* sequence created in initdb to generate unique identifiers.
*/
static uint32
GetNextColocationId()
{
text *sequenceName = cstring_to_text(COLOCATIONID_SEQUENCE_NAME);
Oid sequenceId = ResolveRelationId(sequenceName);
Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId);
Oid savedUserId = InvalidOid;
int savedSecurityContext = 0;
Datum colocationIdDatum = 0;
uint32 colocationId = INVALID_COLOCATION_ID;
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
/* generate new and unique colocation id from sequence */
colocationIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum);
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
colocationId = DatumGetUInt32(colocationIdDatum);
return colocationId;
}
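Executed with extension-owner privileges, the nextval_oid call above boils down to the following (illustrative equivalent):

SELECT nextval('pg_catalog.pg_dist_colocationid_seq');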

View File

@ -58,13 +58,8 @@ PG_FUNCTION_INFO_V1(master_create_worker_shards);
/*
* master_create_worker_shards creates empty shards for the given table based
* on the specified number of initial shards. The function first gets a list of
* candidate nodes and issues DDL commands on the nodes to create empty shard
* placements on those nodes. The function then updates metadata on the master
* node to make this shard (and its placements) visible. Note that the function
* assumes the table is hash partitioned and calculates the min/max hash token
* ranges for each shard, giving them an equal split of the hash space.
* master_create_worker_shards is a user-facing function to create worker shards
* for the given relation in round robin order.
*/
Datum
master_create_worker_shards(PG_FUNCTION_ARGS)
@ -74,10 +69,27 @@ master_create_worker_shards(PG_FUNCTION_ARGS)
int32 replicationFactor = PG_GETARG_INT32(2);
Oid distributedTableId = ResolveRelationId(tableNameText);
char relationKind = get_rel_relkind(distributedTableId);
char *tableName = text_to_cstring(tableNameText);
CreateShardsWithRoundRobinPolicy(distributedTableId, shardCount, replicationFactor);
PG_RETURN_VOID();
}
/*
* CreateShardsWithRoundRobinPolicy creates empty shards for the given table
* based on the specified number of initial shards. The function first gets a
* list of candidate nodes and issues DDL commands on the nodes to create empty
* shard placements on those nodes. The function then updates metadata on the
* master node to make this shard (and its placements) visible. Note that the
* function assumes the table is hash partitioned and calculates the min/max
* hash token ranges for each shard, giving them an equal split of the hash space.
*/
void
CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
int32 replicationFactor)
{
char *relationOwner = NULL;
char shardStorageType = '\0';
char shardStorageType = 0;
List *workerNodeList = NIL;
List *ddlCommandList = NIL;
int32 workerNodeCount = 0;
@ -106,6 +118,7 @@ master_create_worker_shards(PG_FUNCTION_ARGS)
existingShardList = LoadShardList(distributedTableId);
if (existingShardList != NIL)
{
char *tableName = get_rel_name(distributedTableId);
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("table \"%s\" has already had shards created for it",
tableName)));
@ -156,22 +169,7 @@ master_create_worker_shards(PG_FUNCTION_ARGS)
}
/* set shard storage type according to relation type */
if (relationKind == RELKIND_FOREIGN_TABLE)
{
bool cstoreTable = CStoreTable(distributedTableId);
if (cstoreTable)
{
shardStorageType = SHARD_STORAGE_COLUMNAR;
}
else
{
shardStorageType = SHARD_STORAGE_FOREIGN;
}
}
else
{
shardStorageType = SHARD_STORAGE_TABLE;
}
shardStorageType = ShardStorageType(distributedTableId);
for (shardIndex = 0; shardIndex < shardCount; shardIndex++)
{
@ -182,8 +180,7 @@ master_create_worker_shards(PG_FUNCTION_ARGS)
text *maxHashTokenText = NULL;
int32 shardMinHashToken = INT32_MIN + (shardIndex * hashTokenIncrement);
int32 shardMaxHashToken = shardMinHashToken + (hashTokenIncrement - 1);
Datum shardIdDatum = master_get_new_shardid(NULL);
int64 shardId = DatumGetInt64(shardIdDatum);
uint64 shardId = GetNextShardId();
/* if we are at the last shard, make sure the max token value is INT_MAX */
if (shardIndex == (shardCount - 1))
@ -217,8 +214,104 @@ master_create_worker_shards(PG_FUNCTION_ARGS)
}
RESUME_INTERRUPTS();
}
PG_RETURN_VOID();
/*
* CreateColocatedShards creates shards for the target relation colocated with
* the source relation.
*/
void
CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId)
{
char *targetTableRelationOwner = NULL;
char targetShardStorageType = 0;
List *existingShardList = NIL;
List *sourceShardIntervalList = NIL;
List *targetTableDDLEvents = NIL;
ListCell *sourceShardCell = NULL;
/* make sure that tables are hash partitioned */
CheckHashPartitionedTable(targetRelationId);
CheckHashPartitionedTable(sourceRelationId);
/*
* In contrast to append/range partitioned tables it makes more sense to
* require ownership privileges - shards for hash-partitioned tables are
* only created once, not continually during ingest as for the other
* partitioning types.
*/
EnsureTableOwner(targetRelationId);
/* we plan to add shards: get an exclusive metadata lock on the target relation */
LockRelationDistributionMetadata(targetRelationId, ExclusiveLock);
/* a share metadata lock is enough on the source relation */
LockRelationDistributionMetadata(sourceRelationId, ShareLock);
/* prevent placement changes of the source relation until we colocate with them */
sourceShardIntervalList = LoadShardIntervalList(sourceRelationId);
LockShardListMetadata(sourceShardIntervalList, ShareLock);
/* validate that shards haven't already been created for this table */
existingShardList = LoadShardList(targetRelationId);
if (existingShardList != NIL)
{
char *targetRelationName = get_rel_name(targetRelationId);
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("table \"%s\" has already had shards created for it",
targetRelationName)));
}
targetTableRelationOwner = TableOwner(targetRelationId);
targetTableDDLEvents = GetTableDDLEvents(targetRelationId);
targetShardStorageType = ShardStorageType(targetRelationId);
foreach(sourceShardCell, sourceShardIntervalList)
{
ShardInterval *sourceShardInterval = (ShardInterval *) lfirst(sourceShardCell);
uint64 sourceShardId = sourceShardInterval->shardId;
uint64 newShardId = GetNextShardId();
ListCell *sourceShardPlacementCell = NULL;
int32 shardMinValue = DatumGetInt32(sourceShardInterval->minValue);
int32 shardMaxValue = DatumGetInt32(sourceShardInterval->maxValue);
text *shardMinValueText = IntegerToText(shardMinValue);
text *shardMaxValueText = IntegerToText(shardMaxValue);
List *sourceShardPlacementList = ShardPlacementList(sourceShardId);
foreach(sourceShardPlacementCell, sourceShardPlacementList)
{
ShardPlacement *sourcePlacement =
(ShardPlacement *) lfirst(sourceShardPlacementCell);
char *sourceNodeName = sourcePlacement->nodeName;
int32 sourceNodePort = sourcePlacement->nodePort;
bool created = WorkerCreateShard(targetRelationId, sourceNodeName,
sourceNodePort, newShardId,
targetTableRelationOwner,
targetTableDDLEvents);
if (created)
{
const RelayFileState shardState = FILE_FINALIZED;
const uint64 shardSize = 0;
InsertShardPlacementRow(newShardId, INVALID_PLACEMENT_ID, shardState,
shardSize, sourceNodeName, sourceNodePort);
}
else
{
char *targetRelationName = get_rel_name(targetRelationId);
char *sourceRelationName = get_rel_name(sourceRelationId);
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("table \"%s\" could not be colocated with %s",
targetRelationName, sourceRelationName)));
}
}
InsertShardRow(targetRelationId, newShardId, targetShardStorageType,
shardMinValueText, shardMaxValueText);
}
}
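One way to see the effect is to compare the shard intervals of the source and target relations; the loop above copies min/max values and placements one-to-one (hypothetical relation names, sketch only):

SELECT s.shardid AS source_shard, t.shardid AS target_shard,
       s.shardminvalue, s.shardmaxvalue
FROM pg_dist_shard s
JOIN pg_dist_shard t
  ON s.shardminvalue = t.shardminvalue
 AND s.shardmaxvalue = t.shardmaxvalue
WHERE s.logicalrelid = 'orders'::regclass
  AND t.logicalrelid = 'payments'::regclass
ORDER BY s.shardid;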

View File

@ -55,6 +55,7 @@
/* Shard related configuration */
int ShardCount = 32;
int ShardReplicationFactor = 2; /* desired replication factor for shards */
int ShardMaxSize = 1048576; /* maximum size in KB one shard can grow to */
int ShardPlacementPolicy = SHARD_PLACEMENT_ROUND_ROBIN;
@ -93,8 +94,7 @@ master_get_table_metadata(PG_FUNCTION_ARGS)
HeapTuple metadataTuple = NULL;
TupleDesc metadataDescriptor = NULL;
uint64 shardMaxSizeInBytes = 0;
char relationType = 0;
char storageType = 0;
char shardStorageType = 0;
Datum values[TABLE_METADATA_FIELDS];
bool isNulls[TABLE_METADATA_FIELDS];
@ -121,26 +121,10 @@ master_get_table_metadata(PG_FUNCTION_ARGS)
shardMaxSizeInBytes = (int64) ShardMaxSize * 1024L;
/* get storage type */
relationType = get_rel_relkind(relationId);
if (relationType == RELKIND_RELATION)
{
storageType = SHARD_STORAGE_TABLE;
}
else if (relationType == RELKIND_FOREIGN_TABLE)
{
bool cstoreTable = CStoreTable(relationId);
if (cstoreTable)
{
storageType = SHARD_STORAGE_COLUMNAR;
}
else
{
storageType = SHARD_STORAGE_FOREIGN;
}
}
shardStorageType = ShardStorageType(relationId);
values[0] = ObjectIdGetDatum(relationId);
values[1] = storageType;
values[1] = shardStorageType;
values[2] = partitionEntry->partitionMethod;
values[3] = partitionKey;
values[4] = Int32GetDatum(ShardReplicationFactor);
@ -247,12 +231,8 @@ master_get_table_ddl_events(PG_FUNCTION_ARGS)
/*
* master_get_new_shardid allocates and returns a unique shardId for the shard
* to be created. This allocation occurs both in shared memory and in write
* ahead logs; writing to logs avoids the risk of having shardId collisions.
*
* Please note that the caller is still responsible for finalizing shard data
* and the shardId with the master node.
* master_get_new_shardid is a user-facing wrapper function around GetNextShardId()
* which allocates and returns a unique shardId for the shard to be created.
*
* NB: This can be called by any user; for now we have decided that that's
* ok. We might want to restrict this to users part of a specific role or such
@ -260,6 +240,24 @@ master_get_table_ddl_events(PG_FUNCTION_ARGS)
*/
Datum
master_get_new_shardid(PG_FUNCTION_ARGS)
{
uint64 shardId = GetNextShardId();
Datum shardIdDatum = Int64GetDatum(shardId);
PG_RETURN_DATUM(shardIdDatum);
}
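For completeness, the wrapper remains directly callable; it simply returns the next value of the shardid sequence (illustrative call):

SELECT master_get_new_shardid();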
/*
* GetNextShardId allocates and returns a unique shardId for the shard to be
* created. This allocation occurs both in shared memory and in write ahead
* logs; writing to logs avoids the risk of having shardId collisions.
*
* Please note that the caller is still responsible for finalizing shard data
* and the shardId with the master node.
*/
uint64
GetNextShardId()
{
text *sequenceName = cstring_to_text(SHARDID_SEQUENCE_NAME);
Oid sequenceId = ResolveRelationId(sequenceName);
@ -267,6 +265,7 @@ master_get_new_shardid(PG_FUNCTION_ARGS)
Oid savedUserId = InvalidOid;
int savedSecurityContext = 0;
Datum shardIdDatum = 0;
uint64 shardId = 0;
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
@ -276,7 +275,9 @@ master_get_new_shardid(PG_FUNCTION_ARGS)
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
PG_RETURN_DATUM(shardIdDatum);
shardId = DatumGetInt64(shardIdDatum);
return shardId;
}
@ -729,6 +730,41 @@ GetTableDDLEvents(Oid relationId)
}
/*
* ShardStorageType returns the shard storage type according to relation type.
*/
char
ShardStorageType(Oid relationId)
{
char shardStorageType = 0;
char relationType = get_rel_relkind(relationId);
if (relationType == RELKIND_RELATION)
{
shardStorageType = SHARD_STORAGE_TABLE;
}
else if (relationType == RELKIND_FOREIGN_TABLE)
{
bool cstoreTable = CStoreTable(relationId);
if (cstoreTable)
{
shardStorageType = SHARD_STORAGE_COLUMNAR;
}
else
{
shardStorageType = SHARD_STORAGE_FOREIGN;
}
}
else
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("unexpected relation type: %c", relationType)));
}
return shardStorageType;
}
/*
* WorkerNodeGetDatum converts the worker node passed to it into its datum
* representation. To do this, the function first creates the heap tuple from

View File

@ -41,8 +41,6 @@
/* Local functions forward declarations */
static bool WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort,
uint64 shardId, char *newShardOwner, List *ddlCommandList);
static bool WorkerShardStats(char *nodeName, uint32 nodePort, Oid relationId,
char *shardName, uint64 *shardSize,
text **shardMinValue, text **shardMaxValue);
@ -66,8 +64,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
text *relationNameText = PG_GETARG_TEXT_P(0);
char *relationName = text_to_cstring(relationNameText);
List *workerNodeList = WorkerNodeList();
Datum shardIdDatum = 0;
int64 shardId = INVALID_SHARD_ID;
uint64 shardId = INVALID_SHARD_ID;
List *ddlEventList = NULL;
uint32 attemptableNodeCount = 0;
uint32 liveNodeCount = 0;
@ -116,8 +113,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
}
/* generate new and unique shardId from sequence */
shardIdDatum = master_get_new_shardid(NULL);
shardId = DatumGetInt64(shardIdDatum);
shardId = GetNextShardId();
/* get table DDL commands to replay on the worker node */
ddlEventList = GetTableDDLEvents(relationId);
@ -426,7 +422,7 @@ CreateShardPlacements(Oid relationId, int64 shardId, List *ddlEventList,
* shard on the worker node. Note that this function opens a new connection for
* each DDL command, and could leave the shard in a half-initialized state.
*/
static bool
bool
WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort,
uint64 shardId, char *newShardOwner, List *ddlCommandList)
{

View File

@ -215,14 +215,14 @@ DistributionCreateCommand(DistTableCacheEntry *cacheEntry)
char *qualifiedRelationName =
generate_qualified_relation_name(relationId);
char *partitionKeyColumnName = ColumnNameToColumn(relationId, partitionKeyString);
uint64 colocationId = cacheEntry->colocationId;
uint32 colocationId = cacheEntry->colocationId;
char replicationModel = cacheEntry->replicationModel;
appendStringInfo(insertDistributionCommand,
"INSERT INTO pg_dist_partition "
"(logicalrelid, partmethod, partkey, colocationid, repmodel) "
"VALUES "
"(%s::regclass, '%c', column_name_to_column(%s,%s), %lu, '%c')",
"(%s::regclass, '%c', column_name_to_column(%s,%s), %d, '%c')",
quote_literal_cstr(qualifiedRelationName),
distributionMethod,
quote_literal_cstr(qualifiedRelationName),

View File

@ -332,6 +332,17 @@ RegisterCitusConfigVariables(void)
0,
NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.shard_count",
gettext_noop("Sets the number of shards for a new hash-partitioned table"
"created with create_distributed_table()."),
NULL,
&ShardCount,
32, 1, 64000,
PGC_USERSET,
0,
NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.shard_replication_factor",
gettext_noop("Sets the replication factor for shards."),

View File

@ -35,7 +35,7 @@ Datum
get_table_colocation_id(PG_FUNCTION_ARGS)
{
Oid distributedTableId = PG_GETARG_OID(0);
int colocationId = TableColocationId(distributedTableId);
uint32 colocationId = TableColocationId(distributedTableId);
PG_RETURN_INT32(colocationId);
}

View File

@ -224,8 +224,7 @@ create_monolithic_shard_row(PG_FUNCTION_ARGS)
Oid distributedTableId = PG_GETARG_OID(0);
StringInfo minInfo = makeStringInfo();
StringInfo maxInfo = makeStringInfo();
Datum newShardIdDatum = master_get_new_shardid(NULL);
int64 newShardId = DatumGetInt64(newShardIdDatum);
uint64 newShardId = GetNextShardId();
text *maxInfoText = NULL;
text *minInfoText = NULL;

View File

@ -23,10 +23,10 @@
/*
* TableColocationId function returns co-location id of given table. This function errors
* out if given table is not distributed.
* TableColocationId function returns co-location id of given table. This function
* errors out if given table is not distributed.
*/
uint64
uint32
TableColocationId(Oid distributedTableId)
{
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
@ -44,8 +44,8 @@ TableColocationId(Oid distributedTableId)
bool
TablesColocated(Oid leftDistributedTableId, Oid rightDistributedTableId)
{
uint64 leftColocationId = INVALID_COLOCATION_ID;
uint64 rightColocationId = INVALID_COLOCATION_ID;
uint32 leftColocationId = INVALID_COLOCATION_ID;
uint32 rightColocationId = INVALID_COLOCATION_ID;
if (leftDistributedTableId == rightDistributedTableId)
{
@ -112,7 +112,7 @@ ShardsColocated(ShardInterval *leftShardInterval, ShardInterval *rightShardInter
List *
ColocatedTableList(Oid distributedTableId)
{
int tableColocationId = TableColocationId(distributedTableId);
uint32 tableColocationId = TableColocationId(distributedTableId);
List *colocatedTableList = NIL;
Relation pgDistPartition = NULL;
@ -134,7 +134,7 @@ ColocatedTableList(Oid distributedTableId)
}
ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_colocationid,
BTEqualStrategyNumber, F_INT8EQ, ObjectIdGetDatum(tableColocationId));
BTEqualStrategyNumber, F_INT4EQ, ObjectIdGetDatum(tableColocationId));
pgDistPartition = heap_open(DistPartitionRelationId(), AccessShareLock);
tupleDescriptor = RelationGetDescr(pgDistPartition);
@ -219,3 +219,44 @@ ColocatedShardIntervalList(ShardInterval *shardInterval)
return colocatedShardList;
}
/*
* ColocatedTableId returns an arbitrary table which belongs to the given
* colocation group. If there is no such colocation group, it returns InvalidOid.
*/
Oid
ColocatedTableId(Oid colocationId)
{
Oid colocatedTableId = InvalidOid;
Relation pgDistPartition = NULL;
TupleDesc tupleDescriptor = NULL;
SysScanDesc scanDescriptor = NULL;
HeapTuple heapTuple = NULL;
bool indexOK = true;
bool isNull = false;
ScanKeyData scanKey[1];
int scanKeyCount = 1;
ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_colocationid,
BTEqualStrategyNumber, F_INT4EQ, ObjectIdGetDatum(colocationId));
/* prevent DELETE statements */
pgDistPartition = heap_open(DistPartitionRelationId(), ShareLock);
tupleDescriptor = RelationGetDescr(pgDistPartition);
scanDescriptor = systable_beginscan(pgDistPartition,
DistPartitionColocationidIndexId(),
indexOK, NULL, scanKeyCount, scanKey);
heapTuple = systable_getnext(scanDescriptor);
if (HeapTupleIsValid(heapTuple))
{
colocatedTableId = heap_getattr(heapTuple, Anum_pg_dist_partition_logicalrelid,
tupleDescriptor, &isNull);
}
systable_endscan(scanDescriptor);
heap_close(pgDistPartition, ShareLock);
return colocatedTableId;
}
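The index scan above corresponds roughly to this query (illustrative only; the colocation id value is hypothetical):

SELECT logicalrelid
FROM pg_dist_partition
WHERE colocationid = 1   -- any existing colocation id
LIMIT 1;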

View File

@ -57,7 +57,7 @@ column_name_to_column(PG_FUNCTION_ARGS)
relation = relation_open(relationId, AccessShareLock);
column = (Var *) BuildDistributionKeyFromColumnName(relation, columnName);
column = BuildDistributionKeyFromColumnName(relation, columnName);
columnNodeString = nodeToString(column);
columnNodeText = cstring_to_text(columnNodeString);
@ -83,7 +83,7 @@ column_name_to_column_id(PG_FUNCTION_ARGS)
relation = relation_open(distributedTableId, AccessExclusiveLock);
column = (Var *) BuildDistributionKeyFromColumnName(relation, columnName);
column = BuildDistributionKeyFromColumnName(relation, columnName);
relation_close(relation, NoLock);
@ -121,12 +121,12 @@ column_to_column_name(PG_FUNCTION_ARGS)
* specified column does not exist or is not suitable to be used as a
* distribution column.
*/
Node *
Var *
BuildDistributionKeyFromColumnName(Relation distributedRelation, char *columnName)
{
HeapTuple columnTuple = NULL;
Form_pg_attribute columnForm = NULL;
Var *column = NULL;
Var *distributionColumn = NULL;
char *tableName = RelationGetRelationName(distributedRelation);
/* it'd probably be better to downcase identifiers consistent with SQL case folding */
@ -153,12 +153,12 @@ BuildDistributionKeyFromColumnName(Relation distributedRelation, char *columnNam
}
/* build Var referencing only the chosen distribution column */
column = makeVar(1, columnForm->attnum, columnForm->atttypid,
distributionColumn = makeVar(1, columnForm->attnum, columnForm->atttypid,
columnForm->atttypmod, columnForm->attcollation, 0);
ReleaseSysCache(columnTuple);
return (Node *) column;
return distributionColumn;
}

View File

@ -55,6 +55,8 @@ static Oid distShardRelationId = InvalidOid;
static Oid distShardPlacementRelationId = InvalidOid;
static Oid distNodeRelationId = InvalidOid;
static Oid distLocalGroupRelationId = InvalidOid;
static Oid distColocationRelationId = InvalidOid;
static Oid distColocationConfigurationIndexId = InvalidOid;
static Oid distPartitionRelationId = InvalidOid;
static Oid distPartitionLogicalRelidIndexId = InvalidOid;
static Oid distPartitionColocationidIndexId = InvalidOid;
@ -274,7 +276,7 @@ LookupDistTableCacheEntry(Oid relationId)
HeapTuple distPartitionTuple = NULL;
char *partitionKeyString = NULL;
char partitionMethod = 0;
uint64 colocationId = INVALID_COLOCATION_ID;
uint32 colocationId = INVALID_COLOCATION_ID;
char replicationModel = 0;
List *distShardTupleList = NIL;
int shardIntervalArrayLength = 0;
@ -690,6 +692,27 @@ DistLocalGroupIdRelationId(void)
}
/* return oid of pg_dist_colocation relation */
Oid
DistColocationRelationId(void)
{
CachedRelationLookup("pg_dist_colocation", &distColocationRelationId);
return distColocationRelationId;
}
/* return oid of pg_dist_colocation_configuration_index index */
Oid
DistColocationConfigurationIndexId(void)
{
CachedRelationLookup("pg_dist_colocation_configuration_index",
&distColocationConfigurationIndexId);
return distColocationConfigurationIndexId;
}
/* return oid of pg_dist_partition relation */
Oid
DistPartitionRelationId(void)
@ -1419,6 +1442,8 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId)
distShardPlacementRelationId = InvalidOid;
distLocalGroupRelationId = InvalidOid;
distNodeRelationId = InvalidOid;
distColocationRelationId = InvalidOid;
distColocationConfigurationIndexId = InvalidOid;
distPartitionRelationId = InvalidOid;
distPartitionLogicalRelidIndexId = InvalidOid;
distPartitionColocationidIndexId = InvalidOid;

View File

@ -17,11 +17,13 @@
#define INVALID_COLOCATION_ID 0
extern uint64 TableColocationId(Oid distributedTableId);
extern uint32 TableColocationId(Oid distributedTableId);
extern bool TablesColocated(Oid leftDistributedTableId, Oid rightDistributedTableId);
extern bool ShardsColocated(ShardInterval *leftShardInterval,
ShardInterval *rightShardInterval);
extern List * ColocatedTableList(Oid distributedTableId);
extern List * ColocatedShardIntervalList(ShardInterval *shardInterval);
extern Oid ColocatedTableId(Oid colocationId);
#endif /* COLOCATION_UTILS_H_ */

View File

@ -19,7 +19,7 @@
/* Remaining metadata utility functions */
extern Node * BuildDistributionKeyFromColumnName(Relation distributedRelation,
extern Var * BuildDistributionKeyFromColumnName(Relation distributedRelation,
char *columnName);
extern char * ColumnNameToColumn(Oid relationId, char *columnNodeString);

View File

@ -82,6 +82,7 @@ typedef enum
/* Config variables managed via guc.c */
extern int ShardCount;
extern int ShardReplicationFactor;
extern int ShardMaxSize;
extern int ShardPlacementPolicy;
@ -89,13 +90,20 @@ extern int ShardPlacementPolicy;
/* Function declarations local to the distributed module */
extern bool CStoreTable(Oid relationId);
extern uint64 GetNextShardId(void);
extern Oid ResolveRelationId(text *relationName);
extern List * GetTableDDLEvents(Oid relationId);
extern char ShardStorageType(Oid relationId);
extern void CheckDistributedTable(Oid relationId);
extern void CreateShardPlacements(Oid relationId, int64 shardId, List *ddlEventList,
char *newPlacementOwner, List *workerNodeList,
int workerStartIndex, int replicationFactor);
extern uint64 UpdateShardStatistics(int64 shardId);
extern void CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
int32 replicationFactor);
extern void CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId);
extern bool WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort,
uint64 shardId, char *newShardOwner, List *ddlCommandList);
/* Function declarations for generating metadata for shard and placement creation */
extern Datum master_get_table_metadata(PG_FUNCTION_ARGS);

View File

@ -40,7 +40,7 @@ typedef struct
/* pg_dist_partition metadata for this table */
char *partitionKeyString;
char partitionMethod;
uint64 colocationId;
uint32 colocationId;
char replicationModel;
/* pg_dist_shard metadata (variable-length ShardInterval array) for this table */
@ -66,6 +66,8 @@ extern bool CitusHasBeenLoaded(void);
extern HTAB * GetWorkerNodeHash(void);
/* relation oids */
extern Oid DistColocationRelationId(void);
extern Oid DistColocationConfigurationIndexId(void);
extern Oid DistPartitionRelationId(void);
extern Oid DistShardRelationId(void);
extern Oid DistShardPlacementRelationId(void);

View File

@ -0,0 +1,47 @@
/*-------------------------------------------------------------------------
*
* pg_dist_colocation.h
* definition of the relation that holds the colocation information on the
* cluster (pg_dist_colocation).
*
* Copyright (c) 2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef PG_DIST_COLOCATION_H
#define PG_DIST_COLOCATION_H
/* ----------------
* pg_dist_colocation definition.
* ----------------
*/
typedef struct FormData_pg_dist_colocation
{
uint32 colocationid;
uint32 shardcount;
uint32 replicationfactor;
Oid distributioncolumntype;
} FormData_pg_dist_colocation;
/* ----------------
* Form_pg_dist_colocation corresponds to a pointer to a tuple with
* the format of pg_dist_colocation relation.
* ----------------
*/
typedef FormData_pg_dist_colocation *Form_pg_dist_colocation;
/* ----------------
* compiler constants for pg_dist_colocation
* ----------------
*/
#define Natts_pg_dist_colocation 4
#define Anum_pg_dist_colocation_colocationid 1
#define Anum_pg_dist_colocation_shardcount 2
#define Anum_pg_dist_colocation_replicationfactor 3
#define Anum_pg_dist_colocation_distributioncolumntype 4
#define COLOCATIONID_SEQUENCE_NAME "pg_dist_colocationid_seq"
#endif /* PG_DIST_COLOCATION_H */

View File

@ -25,7 +25,7 @@ typedef struct FormData_pg_dist_partition
char partmethod; /* partition method; see codes below */
#ifdef CATALOG_VARLEN /* variable-length fields start here */
text partkey; /* partition key expression */
uint64 colocationid; /* id of the co-location group of particular table belongs to */
uint32 colocationid; /* id of the co-location group of particular table belongs to */
char repmodel; /* replication model; see codes below */
#endif
} FormData_pg_dist_partition;

View File

@ -19,22 +19,22 @@ WHERE
ORDER BY s.shardid;
shardid | logicalrelid | nodeport | colocationid | shardstate
---------+---------------+----------+--------------+------------
1300000 | table1_group1 | 57638 | 1 | 3
1300000 | table1_group1 | 57637 | 1 | 1
1300001 | table1_group1 | 57637 | 1 | 1
1300001 | table1_group1 | 57638 | 1 | 1
1300002 | table1_group1 | 57638 | 1 | 1
1300002 | table1_group1 | 57637 | 1 | 1
1300003 | table1_group1 | 57637 | 1 | 1
1300003 | table1_group1 | 57638 | 1 | 1
1300004 | table2_group1 | 57638 | 1 | 3
1300004 | table2_group1 | 57637 | 1 | 1
1300005 | table2_group1 | 57637 | 1 | 1
1300005 | table2_group1 | 57638 | 1 | 1
1300006 | table2_group1 | 57638 | 1 | 1
1300006 | table2_group1 | 57637 | 1 | 1
1300007 | table2_group1 | 57637 | 1 | 1
1300007 | table2_group1 | 57638 | 1 | 1
1300000 | table1_group1 | 57638 | 1000 | 3
1300000 | table1_group1 | 57637 | 1000 | 1
1300001 | table1_group1 | 57637 | 1000 | 1
1300001 | table1_group1 | 57638 | 1000 | 1
1300002 | table1_group1 | 57638 | 1000 | 1
1300002 | table1_group1 | 57637 | 1000 | 1
1300003 | table1_group1 | 57637 | 1000 | 1
1300003 | table1_group1 | 57638 | 1000 | 1
1300004 | table2_group1 | 57638 | 1000 | 3
1300004 | table2_group1 | 57637 | 1000 | 1
1300005 | table2_group1 | 57637 | 1000 | 1
1300005 | table2_group1 | 57638 | 1000 | 1
1300006 | table2_group1 | 57638 | 1000 | 1
1300006 | table2_group1 | 57637 | 1000 | 1
1300007 | table2_group1 | 57637 | 1000 | 1
1300007 | table2_group1 | 57638 | 1000 | 1
(16 rows)
-- repair colocated shards
@ -55,22 +55,22 @@ WHERE
ORDER BY s.shardid;
shardid | logicalrelid | nodeport | colocationid | shardstate
---------+---------------+----------+--------------+------------
1300000 | table1_group1 | 57638 | 1 | 1
1300000 | table1_group1 | 57637 | 1 | 1
1300001 | table1_group1 | 57637 | 1 | 1
1300001 | table1_group1 | 57638 | 1 | 1
1300002 | table1_group1 | 57638 | 1 | 1
1300002 | table1_group1 | 57637 | 1 | 1
1300003 | table1_group1 | 57637 | 1 | 1
1300003 | table1_group1 | 57638 | 1 | 1
1300004 | table2_group1 | 57638 | 1 | 1
1300004 | table2_group1 | 57637 | 1 | 1
1300005 | table2_group1 | 57637 | 1 | 1
1300005 | table2_group1 | 57638 | 1 | 1
1300006 | table2_group1 | 57638 | 1 | 1
1300006 | table2_group1 | 57637 | 1 | 1
1300007 | table2_group1 | 57637 | 1 | 1
1300007 | table2_group1 | 57638 | 1 | 1
1300000 | table1_group1 | 57638 | 1000 | 1
1300000 | table1_group1 | 57637 | 1000 | 1
1300001 | table1_group1 | 57637 | 1000 | 1
1300001 | table1_group1 | 57638 | 1000 | 1
1300002 | table1_group1 | 57638 | 1000 | 1
1300002 | table1_group1 | 57637 | 1000 | 1
1300003 | table1_group1 | 57637 | 1000 | 1
1300003 | table1_group1 | 57638 | 1000 | 1
1300004 | table2_group1 | 57638 | 1000 | 1
1300004 | table2_group1 | 57637 | 1000 | 1
1300005 | table2_group1 | 57637 | 1000 | 1
1300005 | table2_group1 | 57638 | 1000 | 1
1300006 | table2_group1 | 57638 | 1000 | 1
1300006 | table2_group1 | 57637 | 1000 | 1
1300007 | table2_group1 | 57637 | 1000 | 1
1300007 | table2_group1 | 57638 | 1000 | 1
(16 rows)
-- test repairing NOT colocated shard
@ -179,22 +179,22 @@ WHERE
ORDER BY s.shardid;
shardid | logicalrelid | nodeport | colocationid | shardstate
---------+---------------+----------+--------------+------------
1300000 | table1_group1 | 57638 | 1 | 3
1300000 | table1_group1 | 57637 | 1 | 3
1300001 | table1_group1 | 57637 | 1 | 1
1300001 | table1_group1 | 57638 | 1 | 1
1300002 | table1_group1 | 57638 | 1 | 1
1300002 | table1_group1 | 57637 | 1 | 1
1300003 | table1_group1 | 57637 | 1 | 1
1300003 | table1_group1 | 57638 | 1 | 1
1300004 | table2_group1 | 57638 | 1 | 1
1300004 | table2_group1 | 57637 | 1 | 1
1300005 | table2_group1 | 57637 | 1 | 1
1300005 | table2_group1 | 57638 | 1 | 1
1300006 | table2_group1 | 57638 | 1 | 1
1300006 | table2_group1 | 57637 | 1 | 1
1300007 | table2_group1 | 57637 | 1 | 1
1300007 | table2_group1 | 57638 | 1 | 1
1300000 | table1_group1 | 57638 | 1000 | 3
1300000 | table1_group1 | 57637 | 1000 | 3
1300001 | table1_group1 | 57637 | 1000 | 1
1300001 | table1_group1 | 57638 | 1000 | 1
1300002 | table1_group1 | 57638 | 1000 | 1
1300002 | table1_group1 | 57637 | 1000 | 1
1300003 | table1_group1 | 57637 | 1000 | 1
1300003 | table1_group1 | 57638 | 1000 | 1
1300004 | table2_group1 | 57638 | 1000 | 1
1300004 | table2_group1 | 57637 | 1000 | 1
1300005 | table2_group1 | 57637 | 1000 | 1
1300005 | table2_group1 | 57638 | 1000 | 1
1300006 | table2_group1 | 57638 | 1000 | 1
1300006 | table2_group1 | 57637 | 1000 | 1
1300007 | table2_group1 | 57637 | 1000 | 1
1300007 | table2_group1 | 57638 | 1000 | 1
(16 rows)
-- repair while all placements of one shard in colocation group is unhealthy
@ -211,21 +211,21 @@ WHERE
ORDER BY s.shardid;
shardid | logicalrelid | nodeport | colocationid | shardstate
---------+---------------+----------+--------------+------------
1300000 | table1_group1 | 57638 | 1 | 3
1300000 | table1_group1 | 57637 | 1 | 3
1300001 | table1_group1 | 57637 | 1 | 1
1300001 | table1_group1 | 57638 | 1 | 1
1300002 | table1_group1 | 57638 | 1 | 1
1300002 | table1_group1 | 57637 | 1 | 1
1300003 | table1_group1 | 57637 | 1 | 1
1300003 | table1_group1 | 57638 | 1 | 1
1300004 | table2_group1 | 57638 | 1 | 1
1300004 | table2_group1 | 57637 | 1 | 1
1300005 | table2_group1 | 57637 | 1 | 1
1300005 | table2_group1 | 57638 | 1 | 1
1300006 | table2_group1 | 57638 | 1 | 1
1300006 | table2_group1 | 57637 | 1 | 1
1300007 | table2_group1 | 57637 | 1 | 1
1300007 | table2_group1 | 57638 | 1 | 1
1300000 | table1_group1 | 57638 | 1000 | 3
1300000 | table1_group1 | 57637 | 1000 | 3
1300001 | table1_group1 | 57637 | 1000 | 1
1300001 | table1_group1 | 57638 | 1000 | 1
1300002 | table1_group1 | 57638 | 1000 | 1
1300002 | table1_group1 | 57637 | 1000 | 1
1300003 | table1_group1 | 57637 | 1000 | 1
1300003 | table1_group1 | 57638 | 1000 | 1
1300004 | table2_group1 | 57638 | 1000 | 1
1300004 | table2_group1 | 57637 | 1000 | 1
1300005 | table2_group1 | 57637 | 1000 | 1
1300005 | table2_group1 | 57638 | 1000 | 1
1300006 | table2_group1 | 57638 | 1000 | 1
1300006 | table2_group1 | 57637 | 1000 | 1
1300007 | table2_group1 | 57637 | 1000 | 1
1300007 | table2_group1 | 57638 | 1000 | 1
(16 rows)

View File

@ -4,7 +4,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1300000;
-- create test utility function
-- ===================================================================
CREATE SEQUENCE colocation_test_seq
MINVALUE 1
MINVALUE 1000
NO CYCLE;
/* a very simple UDF that only sets the colocation ids the same
* DO NOT USE THIS FUNCTION IN PRODUCTION. It manually sets colocationid column of
@ -14,7 +14,7 @@ CREATE OR REPLACE FUNCTION colocation_test_colocate_tables(source_table regclass
RETURNS BOOL
LANGUAGE plpgsql
AS $colocate_tables$
DECLARE nextid BIGINT;
DECLARE nextid INTEGER;
BEGIN
SELECT nextval('colocation_test_seq') INTO nextid;
@ -37,7 +37,7 @@ $colocate_tables$;
-- create test functions
-- ===================================================================
CREATE FUNCTION get_table_colocation_id(regclass)
RETURNS BIGINT
RETURNS INTEGER
AS 'citus'
LANGUAGE C STRICT;
CREATE FUNCTION tables_colocated(regclass, regclass)
@ -155,7 +155,7 @@ SELECT colocation_test_colocate_tables('table1_group1', 'table2_group1');
SELECT get_table_colocation_id('table1_group1');
get_table_colocation_id
-------------------------
1
1000
(1 row)
SELECT get_table_colocation_id('table5_groupX');
@ -339,3 +339,303 @@ SELECT find_shard_interval_index(1300016);
0
(1 row)
-- check external colocation API
SET citus.shard_count = 2;
CREATE TABLE table1_groupA ( id int );
SELECT create_distributed_table('table1_groupA', 'id');
create_distributed_table
--------------------------
(1 row)
CREATE TABLE table2_groupA ( id int );
SELECT create_distributed_table('table2_groupA', 'id');
create_distributed_table
--------------------------
(1 row)
-- change shard replication factor
SET citus.shard_replication_factor = 1;
CREATE TABLE table1_groupB ( id int );
SELECT create_distributed_table('table1_groupB', 'id');
create_distributed_table
--------------------------
(1 row)
CREATE TABLE table2_groupB ( id int );
SELECT create_distributed_table('table2_groupB', 'id');
create_distributed_table
--------------------------
(1 row)
-- revert back to default shard replication factor
SET citus.shard_replication_factor to DEFAULT;
-- change partition column type
CREATE TABLE table1_groupC ( id text );
SELECT create_distributed_table('table1_groupC', 'id');
create_distributed_table
--------------------------
(1 row)
CREATE TABLE table2_groupC ( id text );
SELECT create_distributed_table('table2_groupC', 'id');
create_distributed_table
--------------------------
(1 row)
-- change shard count
SET citus.shard_count = 4;
CREATE TABLE table1_groupD ( id int );
SELECT create_distributed_table('table1_groupD', 'id');
create_distributed_table
--------------------------
(1 row)
CREATE TABLE table2_groupD ( id int );
SELECT create_distributed_table('table2_groupD', 'id');
create_distributed_table
--------------------------
(1 row)
-- try other distribution methods
CREATE TABLE table_append ( id int );
SELECT create_distributed_table('table_append', 'id', 'append');
create_distributed_table
--------------------------
(1 row)
CREATE TABLE table_range ( id int );
SELECT create_distributed_table('table_range', 'id', 'range');
create_distributed_table
--------------------------
(1 row)
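Append- and range-distributed tables are created here for contrast; they are not expected to join a hash colocation group. A minimal check, assuming non-colocated tables keep colocationid 0:
SELECT logicalrelid::regclass, partmethod, colocationid
FROM pg_dist_partition
WHERE logicalrelid IN ('table_append'::regclass, 'table_range'::regclass);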
-- test foreign table creation
CREATE FOREIGN TABLE table3_groupD ( id int ) SERVER fake_fdw_server;
SELECT create_distributed_table('table3_groupD', 'id');
NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
create_distributed_table
--------------------------
(1 row)
-- check metadata
SELECT * FROM pg_dist_colocation
WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY colocationid;
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1 | 2 | 2 | 23
2 | 2 | 1 | 23
3 | 2 | 2 | 25
4 | 4 | 2 | 23
(4 rows)
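Each distinct combination of shard count, replication factor, and distribution column type gets exactly one row in pg_dist_colocation (oid 23 is int4, 25 is text). A minimal sketch of the lookup a new table's configuration would match against, assuming an int distribution column:
SELECT colocationid
FROM pg_dist_colocation
WHERE shardcount = 2
  AND replicationfactor = 2
  AND distributioncolumntype = 'int4'::regtype;  -- oid 23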
SELECT logicalrelid, colocationid FROM pg_dist_partition
WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY colocationid;
logicalrelid | colocationid
---------------+--------------
table1_groupa | 1
table2_groupa | 1
table1_groupb | 2
table2_groupb | 2
table1_groupc | 3
table2_groupc | 3
table1_groupd | 4
table2_groupd | 4
table3_groupd | 4
(9 rows)
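To see the same grouping from the table side, the partition metadata can be aggregated per colocation group; a minimal sketch:
SELECT colocationid,
       array_agg(logicalrelid::regclass ORDER BY logicalrelid) AS colocated_tables
FROM pg_dist_partition
WHERE colocationid BETWEEN 1 AND 999
GROUP BY colocationid
ORDER BY colocationid;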
-- check effects of dropping tables
DROP TABLE table1_groupA;
SELECT * FROM pg_dist_colocation WHERE colocationid = 1;
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1 | 2 | 2 | 23
(1 row)
-- dropping all tables in a colocation group also deletes the colocation group
DROP TABLE table2_groupA;
SELECT * FROM pg_dist_colocation WHERE colocationid = 1;
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
(0 rows)
-- create dropped colocation group again
SET citus.shard_count = 2;
CREATE TABLE table1_groupE ( id int );
SELECT create_distributed_table('table1_groupE', 'id');
create_distributed_table
--------------------------
(1 row)
CREATE TABLE table2_groupE ( id int );
SELECT create_distributed_table('table2_groupE', 'id');
create_distributed_table
--------------------------
(1 row)
-- test different table DDL
CREATE TABLE table3_groupE ( dummy_column text, id int );
SELECT create_distributed_table('table3_groupE', 'id');
create_distributed_table
--------------------------
(1 row)
-- test different schema
CREATE SCHEMA schema_collocation;
CREATE TABLE schema_collocation.table4_groupE ( id int );
SELECT create_distributed_table('schema_collocation.table4_groupE', 'id');
create_distributed_table
--------------------------
(1 row)
-- check worker table schemas
\c - - - :worker_1_port
\d table3_groupE_1300050
Table "public.table3_groupe_1300050"
Column | Type | Modifiers
--------------+---------+-----------
dummy_column | text |
id | integer |
\d schema_collocation.table4_groupE_1300052
Table "schema_collocation.table4_groupe_1300052"
Column | Type | Modifiers
--------+---------+-----------
id | integer |
\c - - - :master_port
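Worker shard relations are simply the table name with the shard id appended, which is why the \d commands above target table3_groupE_1300050 and table4_groupE_1300052. A hedged way to list them from the worker, assuming that naming pattern:
\c - - - :worker_1_port
SELECT relname FROM pg_class WHERE relname LIKE 'table3_groupe_13%' ORDER BY relname;
\c - - - :master_port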
-- check metadata
SELECT * FROM pg_dist_colocation
WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY colocationid;
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
2 | 2 | 1 | 23
3 | 2 | 2 | 25
4 | 4 | 2 | 23
5 | 2 | 2 | 23
(4 rows)
-- cross check with internal colocation API
SELECT
p1.logicalrelid::regclass AS table1,
p2.logicalrelid::regclass AS table2,
tables_colocated(p1.logicalrelid , p2.logicalrelid) AS colocated
FROM
pg_dist_partition p1,
pg_dist_partition p2
WHERE
p1.logicalrelid < p2.logicalrelid AND
p1.colocationid != 0 AND
p2.colocationid != 0 AND
tables_colocated(p1.logicalrelid , p2.logicalrelid) is TRUE
ORDER BY
table1,
table2;
table1 | table2 | colocated
---------------+----------------------------------+-----------
table1_group1 | table2_group1 | t
table1_groupb | table2_groupb | t
table1_groupc | table2_groupc | t
table1_groupd | table2_groupd | t
table1_groupd | table3_groupd | t
table2_groupd | table3_groupd | t
table1_groupe | table2_groupe | t
table1_groupe | table3_groupe | t
table1_groupe | schema_collocation.table4_groupe | t
table2_groupe | table3_groupe | t
table2_groupe | schema_collocation.table4_groupe | t
table3_groupe | schema_collocation.table4_groupe | t
(12 rows)
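For a quick spot check of a single pair, the helper functions defined earlier in this test can be used directly; both statements below should agree:
SELECT tables_colocated('table1_groupb', 'table2_groupb');
SELECT get_table_colocation_id('table1_groupb') = get_table_colocation_id('table2_groupb');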
-- check created shards
SELECT
logicalrelid,
pg_dist_shard.shardid AS shardid,
shardstorage,
nodeport,
shardminvalue,
shardmaxvalue
FROM
pg_dist_shard,
pg_dist_shard_placement
WHERE
pg_dist_shard.shardid = pg_dist_shard_placement.shardid AND
pg_dist_shard.shardid >= 1300026
ORDER BY
logicalrelid,
shardmaxvalue::integer,
shardid,
placementid;
logicalrelid | shardid | shardstorage | nodeport | shardminvalue | shardmaxvalue
----------------------------------+---------+--------------+----------+---------------+---------------
table1_groupb | 1300026 | t | 57637 | -2147483648 | -1
table1_groupb | 1300027 | t | 57638 | 0 | 2147483647
table2_groupb | 1300028 | t | 57637 | -2147483648 | -1
table2_groupb | 1300029 | t | 57638 | 0 | 2147483647
table1_groupc | 1300030 | t | 57637 | -2147483648 | -1
table1_groupc | 1300030 | t | 57638 | -2147483648 | -1
table1_groupc | 1300031 | t | 57638 | 0 | 2147483647
table1_groupc | 1300031 | t | 57637 | 0 | 2147483647
table2_groupc | 1300032 | t | 57638 | -2147483648 | -1
table2_groupc | 1300032 | t | 57637 | -2147483648 | -1
table2_groupc | 1300033 | t | 57637 | 0 | 2147483647
table2_groupc | 1300033 | t | 57638 | 0 | 2147483647
table1_groupd | 1300034 | t | 57637 | -2147483648 | -1073741825
table1_groupd | 1300034 | t | 57638 | -2147483648 | -1073741825
table1_groupd | 1300035 | t | 57638 | -1073741824 | -1
table1_groupd | 1300035 | t | 57637 | -1073741824 | -1
table1_groupd | 1300036 | t | 57637 | 0 | 1073741823
table1_groupd | 1300036 | t | 57638 | 0 | 1073741823
table1_groupd | 1300037 | t | 57638 | 1073741824 | 2147483647
table1_groupd | 1300037 | t | 57637 | 1073741824 | 2147483647
table2_groupd | 1300038 | t | 57638 | -2147483648 | -1073741825
table2_groupd | 1300038 | t | 57637 | -2147483648 | -1073741825
table2_groupd | 1300039 | t | 57637 | -1073741824 | -1
table2_groupd | 1300039 | t | 57638 | -1073741824 | -1
table2_groupd | 1300040 | t | 57638 | 0 | 1073741823
table2_groupd | 1300040 | t | 57637 | 0 | 1073741823
table2_groupd | 1300041 | t | 57637 | 1073741824 | 2147483647
table2_groupd | 1300041 | t | 57638 | 1073741824 | 2147483647
table3_groupd | 1300042 | f | 57637 | -2147483648 | -1073741825
table3_groupd | 1300042 | f | 57638 | -2147483648 | -1073741825
table3_groupd | 1300043 | f | 57638 | -1073741824 | -1
table3_groupd | 1300043 | f | 57637 | -1073741824 | -1
table3_groupd | 1300044 | f | 57637 | 0 | 1073741823
table3_groupd | 1300044 | f | 57638 | 0 | 1073741823
table3_groupd | 1300045 | f | 57638 | 1073741824 | 2147483647
table3_groupd | 1300045 | f | 57637 | 1073741824 | 2147483647
table1_groupe | 1300046 | t | 57637 | -2147483648 | -1
table1_groupe | 1300046 | t | 57638 | -2147483648 | -1
table1_groupe | 1300047 | t | 57638 | 0 | 2147483647
table1_groupe | 1300047 | t | 57637 | 0 | 2147483647
table2_groupe | 1300048 | t | 57638 | -2147483648 | -1
table2_groupe | 1300048 | t | 57637 | -2147483648 | -1
table2_groupe | 1300049 | t | 57637 | 0 | 2147483647
table2_groupe | 1300049 | t | 57638 | 0 | 2147483647
table3_groupe | 1300050 | t | 57637 | -2147483648 | -1
table3_groupe | 1300050 | t | 57638 | -2147483648 | -1
table3_groupe | 1300051 | t | 57638 | 0 | 2147483647
table3_groupe | 1300051 | t | 57637 | 0 | 2147483647
schema_collocation.table4_groupe | 1300052 | t | 57638 | -2147483648 | -1
schema_collocation.table4_groupe | 1300052 | t | 57637 | -2147483648 | -1
schema_collocation.table4_groupe | 1300053 | t | 57637 | 0 | 2147483647
schema_collocation.table4_groupe | 1300053 | t | 57638 | 0 | 2147483647
(52 rows)
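The point of the shard listing is that colocated tables end up with identical hash ranges placed on the same nodes. A hedged sketch that pairs the shards of two colocated tables by range, assuming text min/max values that cast cleanly to integer:
SELECT s1.shardid AS shard1, s2.shardid AS shard2,
       s1.shardminvalue, s1.shardmaxvalue
FROM pg_dist_shard s1
JOIN pg_dist_shard s2
  ON s1.shardminvalue = s2.shardminvalue
 AND s1.shardmaxvalue = s2.shardmaxvalue
WHERE s1.logicalrelid = 'table1_groupd'::regclass
  AND s2.logicalrelid = 'table2_groupd'::regclass
ORDER BY s1.shardminvalue::integer;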
View File
@ -36,6 +36,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-7';
ALTER EXTENSION citus UPDATE TO '6.0-8';
ALTER EXTENSION citus UPDATE TO '6.0-9';
ALTER EXTENSION citus UPDATE TO '6.0-10';
ALTER EXTENSION citus UPDATE TO '6.0-11';
-- drop extension and re-create in newest version
DROP EXTENSION citus;
\c
View File
@ -7,7 +7,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1300000;
-- ===================================================================
CREATE SEQUENCE colocation_test_seq
MINVALUE 1
MINVALUE 1000
NO CYCLE;
/* a very simple UDF that only sets the colocation ids the same
@ -18,7 +18,7 @@ CREATE OR REPLACE FUNCTION colocation_test_colocate_tables(source_table regclass
RETURNS BOOL
LANGUAGE plpgsql
AS $colocate_tables$
DECLARE nextid BIGINT;
DECLARE nextid INTEGER;
BEGIN
SELECT nextval('colocation_test_seq') INTO nextid;
@ -43,7 +43,7 @@ $colocate_tables$;
-- ===================================================================
CREATE FUNCTION get_table_colocation_id(regclass)
RETURNS BIGINT
RETURNS INTEGER
AS 'citus'
LANGUAGE C STRICT;
@ -154,3 +154,138 @@ SELECT find_shard_interval_index(1300001);
SELECT find_shard_interval_index(1300002);
SELECT find_shard_interval_index(1300003);
SELECT find_shard_interval_index(1300016);
-- check external colocation API
SET citus.shard_count = 2;
CREATE TABLE table1_groupA ( id int );
SELECT create_distributed_table('table1_groupA', 'id');
CREATE TABLE table2_groupA ( id int );
SELECT create_distributed_table('table2_groupA', 'id');
-- change shard replication factor
SET citus.shard_replication_factor = 1;
CREATE TABLE table1_groupB ( id int );
SELECT create_distributed_table('table1_groupB', 'id');
CREATE TABLE table2_groupB ( id int );
SELECT create_distributed_table('table2_groupB', 'id');
-- revert back to default shard replication factor
SET citus.shard_replication_factor to DEFAULT;
-- change partition column type
CREATE TABLE table1_groupC ( id text );
SELECT create_distributed_table('table1_groupC', 'id');
CREATE TABLE table2_groupC ( id text );
SELECT create_distributed_table('table2_groupC', 'id');
-- change shard count
SET citus.shard_count = 4;
CREATE TABLE table1_groupD ( id int );
SELECT create_distributed_table('table1_groupD', 'id');
CREATE TABLE table2_groupD ( id int );
SELECT create_distributed_table('table2_groupD', 'id');
-- try other distribution methods
CREATE TABLE table_append ( id int );
SELECT create_distributed_table('table_append', 'id', 'append');
CREATE TABLE table_range ( id int );
SELECT create_distributed_table('table_range', 'id', 'range');
-- test foreign table creation
CREATE FOREIGN TABLE table3_groupD ( id int ) SERVER fake_fdw_server;
SELECT create_distributed_table('table3_groupD', 'id');
-- check metadata
SELECT * FROM pg_dist_colocation
WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY colocationid;
SELECT logicalrelid, colocationid FROM pg_dist_partition
WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY colocationid;
-- check effects of dropping tables
DROP TABLE table1_groupA;
SELECT * FROM pg_dist_colocation WHERE colocationid = 1;
-- dropping all tables in a colocation group also deletes the colocation group
DROP TABLE table2_groupA;
SELECT * FROM pg_dist_colocation WHERE colocationid = 1;
-- create dropped colocation group again
SET citus.shard_count = 2;
CREATE TABLE table1_groupE ( id int );
SELECT create_distributed_table('table1_groupE', 'id');
CREATE TABLE table2_groupE ( id int );
SELECT create_distributed_table('table2_groupE', 'id');
-- test different table DDL
CREATE TABLE table3_groupE ( dummy_column text, id int );
SELECT create_distributed_table('table3_groupE', 'id');
-- test different schema
CREATE SCHEMA schema_collocation;
CREATE TABLE schema_collocation.table4_groupE ( id int );
SELECT create_distributed_table('schema_collocation.table4_groupE', 'id');
-- check worker table schemas
\c - - - :worker_1_port
\d table3_groupE_1300050
\d schema_collocation.table4_groupE_1300052
\c - - - :master_port
-- check metadata
SELECT * FROM pg_dist_colocation
WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY colocationid;
-- cross check with internal colocation API
SELECT
p1.logicalrelid::regclass AS table1,
p2.logicalrelid::regclass AS table2,
tables_colocated(p1.logicalrelid , p2.logicalrelid) AS colocated
FROM
pg_dist_partition p1,
pg_dist_partition p2
WHERE
p1.logicalrelid < p2.logicalrelid AND
p1.colocationid != 0 AND
p2.colocationid != 0 AND
tables_colocated(p1.logicalrelid , p2.logicalrelid) is TRUE
ORDER BY
table1,
table2;
-- check created shards
SELECT
logicalrelid,
pg_dist_shard.shardid AS shardid,
shardstorage,
nodeport,
shardminvalue,
shardmaxvalue
FROM
pg_dist_shard,
pg_dist_shard_placement
WHERE
pg_dist_shard.shardid = pg_dist_shard_placement.shardid AND
pg_dist_shard.shardid >= 1300026
ORDER BY
logicalrelid,
shardmaxvalue::integer,
shardid,
placementid;
View File
@ -41,6 +41,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-7';
ALTER EXTENSION citus UPDATE TO '6.0-8';
ALTER EXTENSION citus UPDATE TO '6.0-9';
ALTER EXTENSION citus UPDATE TO '6.0-10';
ALTER EXTENSION citus UPDATE TO '6.0-11';
-- drop extension and re-create in newest version
DROP EXTENSION citus;