Add create_distributed_table()

create_distributed_table() creates a hash-distributed table using the default
shard count and shard replication factor.
pull/867/head
Metin Doslu 2016-10-05 17:59:24 +03:00
parent d04f4f5935
commit 40bdafa8d1
18 changed files with 1253 additions and 214 deletions
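For orientation, a minimal usage sketch in the spirit of the regression tests added by this commit; the table names below are illustrative only and not part of the change. create_distributed_table() reads the citus.shard_count and citus.shard_replication_factor settings, defaults the distribution method to 'hash', and reuses an existing colocation group whenever the shard count, replication factor, and distribution column type all match.

-- illustrative usage only, not part of the commit
SET citus.shard_count = 4;
SET citus.shard_replication_factor = 2;
CREATE TABLE events (tenant_id int, payload text);
SELECT create_distributed_table('events', 'tenant_id');   -- defaults to 'hash' and creates shards right away
CREATE TABLE clicks (tenant_id int, url text);
SELECT create_distributed_table('clicks', 'tenant_id');   -- same shard count, replication factor, and column type, so it joins the same colocation group
CREATE TABLE raw_events (tenant_id int, line text);
SELECT create_distributed_table('raw_events', 'tenant_id', 'append');  -- non-hash methods only record metadata; no shards are created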


@ -8,7 +8,7 @@ EXTENSION = citus
EXTVERSIONS = 5.0 5.0-1 5.0-2 \
5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \
5.2-1 5.2-2 5.2-3 5.2-4 \
-6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11
# All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@ -78,6 +78,8 @@ $(EXTENSION)--6.0-9.sql: $(EXTENSION)--6.0-8.sql $(EXTENSION)--6.0-8--6.0-9.sql
cat $^ > $@
$(EXTENSION)--6.0-10.sql: $(EXTENSION)--6.0-9.sql $(EXTENSION)--6.0-9--6.0-10.sql
cat $^ > $@
$(EXTENSION)--6.0-11.sql: $(EXTENSION)--6.0-10.sql $(EXTENSION)--6.0-10--6.0-11.sql
cat $^ > $@
NO_PGXS = 1


@ -0,0 +1,106 @@
SET search_path = 'pg_catalog';
CREATE SEQUENCE citus.pg_dist_colocationid_seq
MINVALUE 1
MAXVALUE 4294967296;
ALTER SEQUENCE citus.pg_dist_colocationid_seq SET SCHEMA pg_catalog;
/* add pg_dist_colocation */
CREATE TABLE citus.pg_dist_colocation(
colocationid int NOT NULL PRIMARY KEY,
shardcount int NOT NULL,
replicationfactor int NOT NULL,
distributioncolumntype oid NOT NULL
);
ALTER TABLE citus.pg_dist_colocation SET SCHEMA pg_catalog;
CREATE INDEX pg_dist_colocation_configuration_index
ON pg_dist_colocation USING btree(shardcount, replicationfactor, distributioncolumntype);
CREATE FUNCTION create_distributed_table(table_name regclass,
distribution_column text,
distribution_type citus.distribution_type DEFAULT 'hash')
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$create_distributed_table$$;
COMMENT ON FUNCTION create_distributed_table(table_name regclass,
distribution_column text,
distribution_type citus.distribution_type)
IS 'creates a distributed table';
CREATE OR REPLACE FUNCTION pg_catalog.citus_drop_trigger()
RETURNS event_trigger
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = pg_catalog
AS $cdbdt$
DECLARE
v_obj record;
sequence_names text[] := '{}';
node_names text[] := '{}';
node_ports bigint[] := '{}';
node_name text;
node_port bigint;
table_colocation_id integer;
BEGIN
-- collect set of dropped sequences to drop on workers later
SELECT array_agg(object_identity) INTO sequence_names
FROM pg_event_trigger_dropped_objects()
WHERE object_type = 'sequence';
-- Must accumulate set of affected nodes before deleting placements, as
-- master_drop_all_shards will erase their rows, making it impossible for
-- us to know where to drop sequences (which must be dropped after shards,
-- since they have default value expressions which depend on sequences).
SELECT array_agg(sp.nodename), array_agg(sp.nodeport)
INTO node_names, node_ports
FROM pg_event_trigger_dropped_objects() AS dobj,
pg_dist_shard AS s,
pg_dist_shard_placement AS sp
WHERE dobj.object_type IN ('table', 'foreign table')
AND dobj.objid = s.logicalrelid
AND s.shardid = sp.shardid;
FOR v_obj IN SELECT * FROM pg_event_trigger_dropped_objects() LOOP
IF v_obj.object_type NOT IN ('table', 'foreign table') THEN
CONTINUE;
END IF;
-- nothing to do if not a distributed table
IF NOT EXISTS(SELECT * FROM pg_dist_partition WHERE logicalrelid = v_obj.objid) THEN
CONTINUE;
END IF;
-- ensure all shards are dropped
PERFORM master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name);
SELECT colocationid INTO table_colocation_id FROM pg_dist_partition WHERE logicalrelid = v_obj.objid;
-- delete partition entry
DELETE FROM pg_dist_partition WHERE logicalrelid = v_obj.objid;
IF NOT EXISTS(SELECT * FROM pg_dist_partition WHERE colocationId = table_colocation_id) THEN
DELETE FROM pg_dist_colocation WHERE colocationId = table_colocation_id;
END IF;
END LOOP;
IF cardinality(sequence_names) = 0 THEN
RETURN;
END IF;
FOR node_name, node_port IN
SELECT DISTINCT name, port
FROM unnest(node_names, node_ports) AS nodes(name, port)
LOOP
PERFORM master_drop_sequences(sequence_names, node_name, node_port);
END LOOP;
END;
$cdbdt$;
COMMENT ON FUNCTION citus_drop_trigger()
IS 'perform checks and actions at the end of DROP actions';
RESET search_path;
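A brief sketch of how an existing installation would pick up this schema version, assuming the rebuilt extension scripts are installed on the node; the query is only there to show the new, initially empty catalog table.

-- illustrative only, not part of the commit
ALTER EXTENSION citus UPDATE TO '6.0-11';
SELECT * FROM pg_catalog.pg_dist_colocation;  -- empty until the first hash-distributed table is created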


@ -1,6 +1,6 @@
# Citus extension
comment = 'Citus distributed database'
-default_version = '6.0-10'
default_version = '6.0-11'
module_pathname = '$libdir/citus'
relocatable = false
schema = pg_catalog


@ -9,6 +9,7 @@
*/
#include "postgres.h"
#include "miscadmin.h"
#include "access/genam.h"
#include "access/hash.h"
@ -27,14 +28,17 @@
#include "catalog/pg_trigger.h"
#include "commands/defrem.h"
#include "commands/extension.h"
#include "commands/sequence.h"
#include "commands/trigger.h"
#include "distributed/colocation_utils.h"
#include "distributed/distribution_column.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/pg_dist_colocation.h"
#include "distributed/pg_dist_partition.h"
#include "executor/spi.h"
-#include "distributed/multi_logical_planner.h"
#include "nodes/execnodes.h"
#include "nodes/nodeFuncs.h"
#include "nodes/pg_list.h"
@ -51,28 +55,35 @@
/* local function forward declarations */
static void ConvertToDistributedTable(Oid relationId,
text *distributionColumnText,
Oid distributionMethodOid, uint32 colocationId);
static char LookupDistributionMethod(Oid distributionMethodOid);
static void RecordDistributedRelationDependencies(Oid distributedRelationId,
Node *distributionKey);
static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId,
int16 supportFunctionNumber);
static bool LocalTableEmpty(Oid tableId);
static void ErrorIfNotSupportedConstraint(Relation relation, char distributionMethod,
Var *distributionColumn);
static void InsertPgDistPartition(Oid relationId, char distributionMethod,
Node *distributionKey, uint32 colocationId);
static void CreateTruncateTrigger(Oid relationId);
static uint32 ColocationId(int shardCount, int replicationFactor,
Oid distributionColumnType);
static uint32 CreateColocationGroup(int shardCount, int replicationFactor,
Oid distributionColumnType);
static uint32 GetNextColocationId(void);
/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(master_create_distributed_table);
PG_FUNCTION_INFO_V1(create_distributed_table);
/*
* master_create_distributed_table accepts a table, distribution column and
* method and performs the corresponding catalog changes.
-*
-* XXX: We should perform more checks here to see if this table is fit for
-* partitioning. At a minimum, we should validate the following: (i) this node
-* runs as the master node, (ii) table does not make use of the inheritance
-* mechanism, (iii) table does not own columns that are sequences, and (iv)
-* table does not have collated columns. (v) table does not have
-* preexisting content.
*/
Datum
master_create_distributed_table(PG_FUNCTION_ARGS)
@ -81,82 +92,172 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
text *distributionColumnText = PG_GETARG_TEXT_P(1);
Oid distributionMethodOid = PG_GETARG_OID(2);
-Relation distributedRelation = NULL;
-TupleDesc relationDesc = NULL;
-char *distributedRelationName = NULL;
-char relationKind = '\0';
ConvertToDistributedTable(distributedRelationId, distributionColumnText,
distributionMethodOid, INVALID_COLOCATION_ID);
PG_RETURN_VOID();
}
/*
* create_distributed_table accepts a table, distribution column and
* distribution method, then it creates a distributed table.
*/
Datum
create_distributed_table(PG_FUNCTION_ARGS)
{
Oid relationId = PG_GETARG_OID(0);
text *distributionColumnText = PG_GETARG_TEXT_P(1);
Oid distributionMethodOid = PG_GETARG_OID(2);
Relation distributedRelation = NULL;
Relation pgDistColocation = NULL;
Node *distributionKey = NULL;
Var *distributionColumn = NULL;
char *distributionColumnName = NULL;
int distributionColumnType = 0;
uint32 colocationId = INVALID_COLOCATION_ID;
/* if distribution method is not hash, just create partition metadata */
char distributionMethod = LookupDistributionMethod(distributionMethodOid);
if (distributionMethod != DISTRIBUTE_BY_HASH)
{
ConvertToDistributedTable(relationId, distributionColumnText,
distributionMethodOid, INVALID_COLOCATION_ID);
PG_RETURN_VOID();
}
/* get distribution column type */
distributionColumnName = text_to_cstring(distributionColumnText);
distributedRelation = relation_open(relationId, AccessShareLock);
distributionKey = BuildDistributionKeyFromColumnName(distributedRelation,
distributionColumnName);
distributionColumn = (Var *) distributionKey;
distributionColumnType = distributionColumn->vartype;
/*
* Get an exclusive lock on the colocation system catalog, so that we can
* be sure that there will be no modifications on the table until this
* transaction is committed.
*/
pgDistColocation = heap_open(DistColocationRelationId(), ExclusiveLock);
/* check for existing colocations */
colocationId = ColocationId(ShardCount, ShardReplicationFactor,
distributionColumnType);
/*
* If there is a colocation group for the current configuration, get a
* colocated table from the group and use its shards as a reference to
* create new shards. Otherwise, create a new colocation group and create
* shards with the default round robin algorithm.
*/
if (colocationId != INVALID_COLOCATION_ID)
{
char *relationName = get_rel_name(relationId);
Oid colocatedTableId = ColocatedTableId(colocationId);
ConvertToDistributedTable(relationId, distributionColumnText,
distributionMethodOid, colocationId);
CreateColocatedShards(relationId, colocatedTableId);
ereport(DEBUG2, (errmsg("table %s is added to colocation group: %d",
relationName, colocationId)));
}
else
{
colocationId = CreateColocationGroup(ShardCount, ShardReplicationFactor,
distributionColumnType);
ConvertToDistributedTable(relationId, distributionColumnText,
distributionMethodOid, colocationId);
/* use the default way to create shards */
CreateShardsWithRoundRobinPolicy(relationId, ShardCount, ShardReplicationFactor);
}
heap_close(pgDistColocation, NoLock);
relation_close(distributedRelation, NoLock);
PG_RETURN_VOID();
}
/*
* ConvertToDistributedTable converts the given regular PostgreSQL table into a
* distributed table. First, it checks if the given table can be distributed,
* then it creates the related tuple in pg_dist_partition.
*
* XXX: We should perform more checks here to see if this table is fit for
* partitioning. At a minimum, we should validate the following: (i) this node
* runs as the master node, (ii) table does not make use of the inheritance
* mechanism, (iii) table does not own columns that are sequences, and (iv)
* table does not have collated columns.
*/
static void
ConvertToDistributedTable(Oid relationId, text *distributionColumnText,
Oid distributionMethodOid, uint32 colocationId)
{
Relation relation = NULL;
TupleDesc relationDesc = NULL;
char *relationName = NULL;
char relationKind = 0;
Relation pgDistPartition = NULL;
char distributionMethod = LookupDistributionMethod(distributionMethodOid);
-const char replicationModel = 'c';
char *distributionColumnName = text_to_cstring(distributionColumnText);
Node *distributionKey = NULL;
Var *distributionColumn = NULL;
-char *distributionKeyString = NULL;
-List *indexOidList = NIL;
-ListCell *indexOidCell = NULL;
-HeapTuple newTuple = NULL;
-Datum newValues[Natts_pg_dist_partition];
-bool newNulls[Natts_pg_dist_partition];
/*
* Lock target relation with an access exclusive lock - there's no way to
* make sense of this table until we've committed, and we don't want
* multiple backends manipulating this relation.
*/
-distributedRelation = relation_open(distributedRelationId, AccessExclusiveLock);
-relationDesc = RelationGetDescr(distributedRelation);
-distributedRelationName = RelationGetRelationName(distributedRelation);
-EnsureTableOwner(distributedRelationId);
relation = relation_open(relationId, AccessExclusiveLock);
relationDesc = RelationGetDescr(relation);
relationName = RelationGetRelationName(relation);
EnsureTableOwner(relationId);
-/* open system catalog and insert new tuple */
-pgDistPartition = heap_open(DistPartitionRelationId(), RowExclusiveLock);
/* check that the relation is not already distributed */
-if (IsDistributedTable(distributedRelationId))
if (IsDistributedTable(relationId))
{
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("table \"%s\" is already distributed",
-distributedRelationName)));
relationName)));
}
/* verify target relation does not use WITH (OIDS) PostgreSQL feature */
if (relationDesc->tdhasoid)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-errmsg("cannot distribute relation: %s", distributedRelationName),
errmsg("cannot distribute relation: %s", relationName),
errdetail("Distributed relations must not specify the WITH "
"(OIDS) option in their definitions.")));
}
/* verify target relation is either regular or foreign table */
-relationKind = distributedRelation->rd_rel->relkind;
relationKind = relation->rd_rel->relkind;
if (relationKind != RELKIND_RELATION && relationKind != RELKIND_FOREIGN_TABLE)
{
ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("cannot distribute relation: %s",
-distributedRelationName),
relationName),
errdetail("Distributed relations must be regular or "
"foreign tables.")));
}
/* check that the relation does not contain any rows */
-if (!LocalTableEmpty(distributedRelationId))
if (!LocalTableEmpty(relationId))
{
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("cannot distribute relation \"%s\"",
-distributedRelationName),
relationName),
errdetail("Relation \"%s\" contains data.",
-distributedRelationName),
relationName),
errhint("Empty your table before distributing it.")));
}
-distributionKey = BuildDistributionKeyFromColumnName(distributedRelation,
distributionKey = BuildDistributionKeyFromColumnName(relation,
distributionColumnName);
-distributionKeyString = nodeToString(distributionKey);
/* the distribution key should always be a Var for now */
Assert(IsA(distributionKey, Var));
@ -193,17 +294,91 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
}
}
ErrorIfNotSupportedConstraint(relation, distributionMethod, distributionColumn);
InsertPgDistPartition(relationId, distributionMethod, distributionKey, colocationId);
relation_close(relation, NoLock);
/*
-* Forbid UNIQUE, PRIMARY KEY, or EXCLUDE constraints on append partitioned tables,
-* since currently there is no way of enforcing uniqueness for overlapping shards.
-*
-* Similarly, do not allow such constraints it they do not
-* include partition column. This check is important for two reasons. First,
-* currently Citus does not enforce uniqueness constraint on multiple shards.
-* Second, INSERT INTO .. ON CONFLICT (i.e., UPSERT) queries can be executed with no
-* further check for constraints.
* PostgreSQL supports truncate trigger for regular relations only.
* Truncate on foreign tables is not supported.
*/
-indexOidList = RelationGetIndexList(distributedRelation);
if (relationKind == RELKIND_RELATION)
{
CreateTruncateTrigger(relationId);
}
}
/*
* InsertPgDistPartition inserts a new tuple into pg_dist_partition.
*/
static void
InsertPgDistPartition(Oid relationId, char distributionMethod, Node *distributionKey,
uint32 colocationId)
{
Relation pgDistPartition = NULL;
const char replicationModel = 'c';
char *distributionKeyString = NULL;
HeapTuple newTuple = NULL;
Datum newValues[Natts_pg_dist_partition];
bool newNulls[Natts_pg_dist_partition];
/* open system catalog and insert new tuple */
pgDistPartition = heap_open(DistPartitionRelationId(), RowExclusiveLock);
distributionKeyString = nodeToString(distributionKey);
/* form new tuple for pg_dist_partition */
memset(newValues, 0, sizeof(newValues));
memset(newNulls, false, sizeof(newNulls));
newValues[Anum_pg_dist_partition_logicalrelid - 1] =
ObjectIdGetDatum(relationId);
newValues[Anum_pg_dist_partition_partmethod - 1] =
CharGetDatum(distributionMethod);
newValues[Anum_pg_dist_partition_partkey - 1] =
CStringGetTextDatum(distributionKeyString);
newValues[Anum_pg_dist_partition_colocationid - 1] = colocationId;
newValues[Anum_pg_dist_partition_repmodel - 1] = CharGetDatum(replicationModel);
newTuple = heap_form_tuple(RelationGetDescr(pgDistPartition), newValues, newNulls);
/* finally insert tuple, build index entries & register cache invalidation */
simple_heap_insert(pgDistPartition, newTuple);
CatalogUpdateIndexes(pgDistPartition, newTuple);
CitusInvalidateRelcacheByRelid(relationId);
RecordDistributedRelationDependencies(relationId, distributionKey);
CommandCounterIncrement();
heap_close(pgDistPartition, NoLock);
}
/*
* ErrorIfNotSupportedConstraint runs checks related to unique index / exclude
* constraints.
*
* Forbid UNIQUE, PRIMARY KEY, or EXCLUDE constraints on append partitioned tables,
* since currently there is no way of enforcing uniqueness for overlapping shards.
*
* Similarly, do not allow such constraints if they do not include partition column.
* This check is important for two reasons. First, currently Citus does not enforce
* uniqueness constraint on multiple shards.
* Second, INSERT INTO .. ON CONFLICT (i.e., UPSERT) queries can be executed with no
* further check for constraints.
*/
static void
ErrorIfNotSupportedConstraint(Relation relation, char distributionMethod,
Var *distributionColumn)
{
char *relationName = RelationGetRelationName(relation);
List *indexOidList = RelationGetIndexList(relation);
ListCell *indexOidCell = NULL;
foreach(indexOidCell, indexOidList)
{
Oid indexOid = lfirst_oid(indexOidCell);
@ -233,7 +408,7 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
{
ereport(WARNING, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("table \"%s\" has a UNIQUE or EXCLUDE constraint",
-distributedRelationName),
relationName),
errdetail("UNIQUE constraints, EXCLUDE constraints, "
"and PRIMARY KEYs on "
"append-partitioned tables cannot be enforced."),
@ -271,7 +446,7 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot distribute relation: \"%s\"",
-distributedRelationName),
relationName),
errdetail("Distributed relations cannot have UNIQUE, "
"EXCLUDE, or PRIMARY KEY constraints that do not "
"include the partition column (with an equality "
@ -280,44 +455,6 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
index_close(indexDesc, NoLock);
}
-/* form new tuple for pg_dist_partition */
-memset(newValues, 0, sizeof(newValues));
-memset(newNulls, false, sizeof(newNulls));
-newValues[Anum_pg_dist_partition_logicalrelid - 1] =
-ObjectIdGetDatum(distributedRelationId);
-newValues[Anum_pg_dist_partition_partmethod - 1] =
-CharGetDatum(distributionMethod);
-newValues[Anum_pg_dist_partition_partkey - 1] =
-CStringGetTextDatum(distributionKeyString);
-newValues[Anum_pg_dist_partition_colocationid - 1] = INVALID_COLOCATION_ID;
-newValues[Anum_pg_dist_partition_repmodel - 1] = CharGetDatum(replicationModel);
-newTuple = heap_form_tuple(RelationGetDescr(pgDistPartition), newValues, newNulls);
-/* finally insert tuple, build index entries & register cache invalidation */
-simple_heap_insert(pgDistPartition, newTuple);
-CatalogUpdateIndexes(pgDistPartition, newTuple);
-CitusInvalidateRelcacheByRelid(distributedRelationId);
-RecordDistributedRelationDependencies(distributedRelationId, distributionKey);
-CommandCounterIncrement();
-heap_close(pgDistPartition, NoLock);
-relation_close(distributedRelation, NoLock);
-/*
-* PostgreSQL supports truncate trigger for regular relations only.
-* Truncate on foreign tables is not supported.
-*/
-if (relationKind == RELKIND_RELATION)
-{
-CreateTruncateTrigger(distributedRelationId);
-}
-PG_RETURN_VOID();
}
@ -525,3 +662,128 @@ CreateTruncateTrigger(Oid relationId)
CreateTrigger(trigger, NULL, relationId, InvalidOid, InvalidOid, InvalidOid,
internal);
}
/*
* ColocationId searches pg_dist_colocation for shard count, replication factor
* and distribution column type. If a matching entry is found, it returns the
* colocation id, otherwise it returns INVALID_COLOCATION_ID.
*/
static uint32
ColocationId(int shardCount, int replicationFactor, Oid distributionColumnType)
{
uint32 colocationId = INVALID_COLOCATION_ID;
HeapTuple colocationTuple = NULL;
SysScanDesc scanDescriptor;
ScanKeyData scanKey[3];
int scanKeyCount = 3;
bool indexOK = true;
/* acquire a lock, so that no one can do this concurrently */
Relation pgDistColocation = heap_open(DistColocationRelationId(), AccessShareLock);
/* set scan arguments */
ScanKeyInit(&scanKey[0], Anum_pg_dist_colocation_shardcount,
BTEqualStrategyNumber, F_INT4EQ, UInt32GetDatum(shardCount));
ScanKeyInit(&scanKey[1], Anum_pg_dist_colocation_replicationfactor,
BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(replicationFactor));
ScanKeyInit(&scanKey[2], Anum_pg_dist_colocation_distributioncolumntype,
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(distributionColumnType));
scanDescriptor = systable_beginscan(pgDistColocation,
DistColocationConfigurationIndexId(),
indexOK, NULL, scanKeyCount, scanKey);
colocationTuple = systable_getnext(scanDescriptor);
if (HeapTupleIsValid(colocationTuple))
{
Form_pg_dist_colocation colocationForm =
(Form_pg_dist_colocation) GETSTRUCT(colocationTuple);
colocationId = colocationForm->colocationid;
}
systable_endscan(scanDescriptor);
heap_close(pgDistColocation, AccessShareLock);
return colocationId;
}
/*
* CreateColocationGroup creates a new colocation id and writes it into
* pg_dist_colocation with the given configuration.
*/
static uint32
CreateColocationGroup(int shardCount, int replicationFactor, Oid distributionColumnType)
{
uint32 colocationId = INVALID_COLOCATION_ID;
Relation pgDistColocation = NULL;
TupleDesc tupleDescriptor = NULL;
HeapTuple heapTuple = NULL;
Datum values[Natts_pg_dist_colocation];
bool isNulls[Natts_pg_dist_colocation];
/* form new colocation tuple */
memset(values, 0, sizeof(values));
memset(isNulls, false, sizeof(isNulls));
colocationId = GetNextColocationId();
values[Anum_pg_dist_colocation_colocationid - 1] = UInt32GetDatum(colocationId);
values[Anum_pg_dist_colocation_shardcount - 1] = UInt32GetDatum(shardCount);
values[Anum_pg_dist_colocation_replicationfactor - 1] =
UInt32GetDatum(replicationFactor);
values[Anum_pg_dist_colocation_distributioncolumntype - 1] =
ObjectIdGetDatum(distributionColumnType);
/* open colocation relation and insert the new tuple */
pgDistColocation = heap_open(DistColocationRelationId(), RowExclusiveLock);
tupleDescriptor = RelationGetDescr(pgDistColocation);
heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);
simple_heap_insert(pgDistColocation, heapTuple);
CatalogUpdateIndexes(pgDistColocation, heapTuple);
/* increment the counter so that next command can see the row */
CommandCounterIncrement();
heap_close(pgDistColocation, NoLock);
return colocationId;
}
/*
* GetNextColocationId allocates and returns a unique colocationId for the
* colocation group to be created. This allocation occurs both in shared memory
* and in write ahead logs; writing to logs avoids the risk of having
* colocationId collisions.
*
* Please note that the caller is still responsible for finalizing colocationId
* with the master node. Further note that this function relies on an internal
* sequence created in initdb to generate unique identifiers.
*/
static uint32
GetNextColocationId()
{
text *sequenceName = cstring_to_text(COLOCATIONID_SEQUENCE_NAME);
Oid sequenceId = ResolveRelationId(sequenceName);
Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId);
Oid savedUserId = InvalidOid;
int savedSecurityContext = 0;
Datum colocationIdDatum = 0;
uint32 colocationId = INVALID_COLOCATION_ID;
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
/* generate new and unique colocation id from sequence */
colocationIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum);
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
colocationId = DatumGetUInt32(colocationIdDatum);
return colocationId;
}
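For readers following the C code above, this is roughly what ColocationId() and CreateColocationGroup() amount to at the SQL level against the new catalog; the literal values (32 shards, replication factor 2, int4 distribution column) are made-up examples.

-- illustrative only, not part of the commit
SELECT colocationid
FROM pg_catalog.pg_dist_colocation
WHERE shardcount = 32
  AND replicationfactor = 2
  AND distributioncolumntype = 'int4'::regtype;
-- when no matching row exists, a new id is drawn from the sequence and a row is inserted
SELECT nextval('pg_catalog.pg_dist_colocationid_seq');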


@ -58,13 +58,8 @@ PG_FUNCTION_INFO_V1(master_create_worker_shards);
/*
-* master_create_worker_shards creates empty shards for the given table based
-* on the specified number of initial shards. The function first gets a list of
-* candidate nodes and issues DDL commands on the nodes to create empty shard
-* placements on those nodes. The function then updates metadata on the master
-* node to make this shard (and its placements) visible. Note that the function
-* assumes the table is hash partitioned and calculates the min/max hash token
-* ranges for each shard, giving them an equal split of the hash space.
* master_create_worker_shards is a user facing function to create worker shards
* for the given relation in round robin order.
*/
Datum
master_create_worker_shards(PG_FUNCTION_ARGS)
@ -74,10 +69,27 @@ master_create_worker_shards(PG_FUNCTION_ARGS)
int32 replicationFactor = PG_GETARG_INT32(2);
Oid distributedTableId = ResolveRelationId(tableNameText);
-char relationKind = get_rel_relkind(distributedTableId);
-char *tableName = text_to_cstring(tableNameText);
CreateShardsWithRoundRobinPolicy(distributedTableId, shardCount, replicationFactor);
PG_RETURN_VOID();
}
/*
* CreateShardsWithRoundRobinPolicy creates empty shards for the given table
* based on the specified number of initial shards. The function first gets a
* list of candidate nodes and issues DDL commands on the nodes to create empty
* shard placements on those nodes. The function then updates metadata on the
* master node to make this shard (and its placements) visible. Note that the
* function assumes the table is hash partitioned and calculates the min/max
* hash token ranges for each shard, giving them an equal split of the hash space.
*/
void
CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
int32 replicationFactor)
{
char *relationOwner = NULL;
-char shardStorageType = '\0';
char shardStorageType = 0;
List *workerNodeList = NIL;
List *ddlCommandList = NIL;
int32 workerNodeCount = 0;
@ -106,6 +118,7 @@ master_create_worker_shards(PG_FUNCTION_ARGS)
existingShardList = LoadShardList(distributedTableId);
if (existingShardList != NIL)
{
char *tableName = get_rel_name(distributedTableId);
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("table \"%s\" has already had shards created for it",
tableName)));
@ -156,22 +169,7 @@ master_create_worker_shards(PG_FUNCTION_ARGS)
}
/* set shard storage type according to relation type */
-if (relationKind == RELKIND_FOREIGN_TABLE)
-{
-bool cstoreTable = CStoreTable(distributedTableId);
-if (cstoreTable)
-{
-shardStorageType = SHARD_STORAGE_COLUMNAR;
-}
-else
-{
-shardStorageType = SHARD_STORAGE_FOREIGN;
-}
-}
-else
-{
-shardStorageType = SHARD_STORAGE_TABLE;
-}
shardStorageType = ShardStorageType(distributedTableId);
for (shardIndex = 0; shardIndex < shardCount; shardIndex++)
{
@ -182,8 +180,7 @@ master_create_worker_shards(PG_FUNCTION_ARGS)
text *maxHashTokenText = NULL;
int32 shardMinHashToken = INT32_MIN + (shardIndex * hashTokenIncrement);
int32 shardMaxHashToken = shardMinHashToken + (hashTokenIncrement - 1);
-Datum shardIdDatum = master_get_new_shardid(NULL);
-int64 shardId = DatumGetInt64(shardIdDatum);
uint64 shardId = GetNextShardId();
/* if we are at the last shard, make sure the max token value is INT_MAX */
if (shardIndex == (shardCount - 1))
@ -217,8 +214,104 @@ master_create_worker_shards(PG_FUNCTION_ARGS)
}
RESUME_INTERRUPTS();
}
-PG_RETURN_VOID();
/*
* CreateColocatedShards creates shards for the target relation colocated with
* the source relation.
*/
void
CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId)
{
char *targetTableRelationOwner = NULL;
char targetShardStorageType = 0;
List *existingShardList = NIL;
List *sourceShardIntervalList = NIL;
List *targetTableDDLEvents = NIL;
ListCell *sourceShardCell = NULL;
/* make sure that tables are hash partitioned */
CheckHashPartitionedTable(targetRelationId);
CheckHashPartitionedTable(sourceRelationId);
/*
* In contrast to append/range partitioned tables it makes more sense to
* require ownership privileges - shards for hash-partitioned tables are
* only created once, not continually during ingest as for the other
* partitioning types.
*/
EnsureTableOwner(targetRelationId);
/* we plan to add shards: get an exclusive metadata lock on the target relation */
LockRelationDistributionMetadata(targetRelationId, ExclusiveLock);
/* a share metadata lock is enough on the source relation */
LockRelationDistributionMetadata(sourceRelationId, ShareLock);
/* prevent concurrent placement changes */
sourceShardIntervalList = LoadShardIntervalList(sourceRelationId);
LockShardListMetadata(sourceShardIntervalList, ShareLock);
/* validate that shards haven't already been created for this table */
existingShardList = LoadShardList(targetRelationId);
if (existingShardList != NIL)
{
char *targetRelationName = get_rel_name(targetRelationId);
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("table \"%s\" has already had shards created for it",
targetRelationName)));
}
targetTableRelationOwner = TableOwner(targetRelationId);
targetTableDDLEvents = GetTableDDLEvents(targetRelationId);
targetShardStorageType = ShardStorageType(targetRelationId);
foreach(sourceShardCell, sourceShardIntervalList)
{
ShardInterval *sourceShardInterval = (ShardInterval *) lfirst(sourceShardCell);
uint64 sourceShardId = sourceShardInterval->shardId;
uint64 newShardId = GetNextShardId();
ListCell *sourceShardPlacementCell = NULL;
int32 shardMinValue = DatumGetInt32(sourceShardInterval->minValue);
int32 shardMaxValue = DatumGetInt32(sourceShardInterval->maxValue);
text *shardMinValueText = IntegerToText(shardMinValue);
text *shardMaxValueText = IntegerToText(shardMaxValue);
List *sourceShardPlacementList = ShardPlacementList(sourceShardId);
foreach(sourceShardPlacementCell, sourceShardPlacementList)
{
ShardPlacement *sourcePlacement =
(ShardPlacement *) lfirst(sourceShardPlacementCell);
char *sourceNodeName = sourcePlacement->nodeName;
int32 sourceNodePort = sourcePlacement->nodePort;
bool created = WorkerCreateShard(targetRelationId, sourceNodeName,
sourceNodePort, newShardId,
targetTableRelationOwner,
targetTableDDLEvents);
if (created)
{
const RelayFileState shardState = FILE_FINALIZED;
const uint64 shardSize = 0;
InsertShardPlacementRow(newShardId, INVALID_PLACEMENT_ID, shardState,
shardSize, sourceNodeName, sourceNodePort);
}
else
{
char *targetRelationName = get_rel_name(targetRelationId);
char *sourceRelationName = get_rel_name(sourceRelationId);
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("table \"%s\" could not be colocated with %s",
targetRelationName, sourceRelationName)));
}
}
InsertShardRow(targetRelationId, newShardId, targetShardStorageType,
shardMinValueText, shardMaxValueText);
}
}


@ -94,8 +94,7 @@ master_get_table_metadata(PG_FUNCTION_ARGS)
HeapTuple metadataTuple = NULL;
TupleDesc metadataDescriptor = NULL;
uint64 shardMaxSizeInBytes = 0;
-char relationType = 0;
-char storageType = 0;
char shardStorageType = 0;
Datum values[TABLE_METADATA_FIELDS];
bool isNulls[TABLE_METADATA_FIELDS];
@ -122,26 +121,10 @@
shardMaxSizeInBytes = (int64) ShardMaxSize * 1024L;
/* get storage type */
-relationType = get_rel_relkind(relationId);
-if (relationType == RELKIND_RELATION)
-{
-storageType = SHARD_STORAGE_TABLE;
-}
-else if (relationType == RELKIND_FOREIGN_TABLE)
-{
-bool cstoreTable = CStoreTable(relationId);
-if (cstoreTable)
-{
-storageType = SHARD_STORAGE_COLUMNAR;
-}
-else
-{
-storageType = SHARD_STORAGE_FOREIGN;
-}
-}
shardStorageType = ShardStorageType(relationId);
values[0] = ObjectIdGetDatum(relationId);
-values[1] = storageType;
values[1] = shardStorageType;
values[2] = partitionEntry->partitionMethod;
values[3] = partitionKey;
values[4] = Int32GetDatum(ShardReplicationFactor);
@ -730,6 +713,41 @@ GetTableDDLEvents(Oid relationId)
}
/*
* ShardStorageType returns the shard storage type according to relation type.
*/
char
ShardStorageType(Oid relationId)
{
char shardStorageType = 0;
char relationType = get_rel_relkind(relationId);
if (relationType == RELKIND_RELATION)
{
shardStorageType = SHARD_STORAGE_TABLE;
}
else if (relationType == RELKIND_FOREIGN_TABLE)
{
bool cstoreTable = CStoreTable(relationId);
if (cstoreTable)
{
shardStorageType = SHARD_STORAGE_COLUMNAR;
}
else
{
shardStorageType = SHARD_STORAGE_FOREIGN;
}
}
else
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("unexpected relation type: %c", relationType)));
}
return shardStorageType;
}
/*
* WorkerNodeGetDatum converts the worker node passed to it into its datum
* representation. To do this, the function first creates the heap tuple from


@ -41,8 +41,6 @@
/* Local functions forward declarations */
-static bool WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort,
-uint64 shardId, char *newShardOwner, List *ddlCommandList);
static bool WorkerShardStats(char *nodeName, uint32 nodePort, Oid relationId,
char *shardName, uint64 *shardSize,
text **shardMinValue, text **shardMaxValue);
@ -426,7 +424,7 @@ CreateShardPlacements(Oid relationId, int64 shardId, List *ddlEventList,
* shard on the worker node. Note that this function opens a new connection for
* each DDL command, and could leave the shard in an half-initialized state.
*/
-static bool
bool
WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort,
uint64 shardId, char *newShardOwner, List *ddlCommandList)
{


@ -134,7 +134,7 @@ ColocatedTableList(Oid distributedTableId)
}
ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_colocationid,
-BTEqualStrategyNumber, F_INT8EQ, ObjectIdGetDatum(tableColocationId));
BTEqualStrategyNumber, F_INT4EQ, ObjectIdGetDatum(tableColocationId));
pgDistPartition = heap_open(DistPartitionRelationId(), AccessShareLock);
tupleDescriptor = RelationGetDescr(pgDistPartition);
@ -219,3 +219,44 @@ ColocatedShardIntervalList(ShardInterval *shardInterval)
return colocatedShardList;
}
/*
* ColocatedTableId returns an arbitrary table which belongs to the given
* colocation group. If there is no such colocation group, it returns InvalidOid.
*/
Oid
ColocatedTableId(Oid colocationId)
{
Oid colocatedTableId = InvalidOid;
Relation pgDistPartition = NULL;
TupleDesc tupleDescriptor = NULL;
SysScanDesc scanDescriptor = NULL;
HeapTuple heapTuple = NULL;
bool indexOK = true;
bool isNull = false;
ScanKeyData scanKey[1];
int scanKeyCount = 1;
ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_colocationid,
BTEqualStrategyNumber, F_INT4EQ, ObjectIdGetDatum(colocationId));
/* prevent DELETE statements */
pgDistPartition = heap_open(DistPartitionRelationId(), ShareLock);
tupleDescriptor = RelationGetDescr(pgDistPartition);
scanDescriptor = systable_beginscan(pgDistPartition,
DistPartitionColocationidIndexId(),
indexOK, NULL, scanKeyCount, scanKey);
heapTuple = systable_getnext(scanDescriptor);
if (HeapTupleIsValid(heapTuple))
{
colocatedTableId = heap_getattr(heapTuple, Anum_pg_dist_partition_logicalrelid,
tupleDescriptor, &isNull);
}
systable_endscan(scanDescriptor);
heap_close(pgDistPartition, ShareLock);
return colocatedTableId;
}


@ -55,6 +55,8 @@ static Oid distShardRelationId = InvalidOid;
static Oid distShardPlacementRelationId = InvalidOid;
static Oid distNodeRelationId = InvalidOid;
static Oid distLocalGroupRelationId = InvalidOid;
static Oid distColocationRelationId = InvalidOid;
static Oid distColocationConfigurationIndexId = InvalidOid;
static Oid distPartitionRelationId = InvalidOid;
static Oid distPartitionLogicalRelidIndexId = InvalidOid;
static Oid distPartitionColocationidIndexId = InvalidOid;
@ -690,6 +692,27 @@ DistLocalGroupIdRelationId(void)
}
/* return oid of pg_dist_colocation relation */
Oid
DistColocationRelationId(void)
{
CachedRelationLookup("pg_dist_colocation", &distColocationRelationId);
return distColocationRelationId;
}
/* return oid of pg_dist_colocation_configuration_index index */
Oid
DistColocationConfigurationIndexId(void)
{
CachedRelationLookup("pg_dist_colocation_configuration_index",
&distColocationConfigurationIndexId);
return distColocationConfigurationIndexId;
}
/* return oid of pg_dist_partition relation */
Oid
DistPartitionRelationId(void)
@ -1419,6 +1442,8 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId)
distShardPlacementRelationId = InvalidOid;
distLocalGroupRelationId = InvalidOid;
distNodeRelationId = InvalidOid;
distColocationRelationId = InvalidOid;
distColocationConfigurationIndexId = InvalidOid;
distPartitionRelationId = InvalidOid;
distPartitionLogicalRelidIndexId = InvalidOid;
distPartitionColocationidIndexId = InvalidOid;


@ -23,5 +23,7 @@ extern bool ShardsColocated(ShardInterval *leftShardInterval,
ShardInterval *rightShardInterval);
extern List * ColocatedTableList(Oid distributedTableId);
extern List * ColocatedShardIntervalList(ShardInterval *shardInterval);
extern Oid ColocatedTableId(Oid colocationId);
#endif /* COLOCATION_UTILS_H_ */


@ -92,11 +92,17 @@ extern int ShardPlacementPolicy;
extern bool CStoreTable(Oid relationId);
extern Oid ResolveRelationId(text *relationName);
extern List * GetTableDDLEvents(Oid relationId);
extern char ShardStorageType(Oid relationId);
extern void CheckDistributedTable(Oid relationId);
extern void CreateShardPlacements(Oid relationId, int64 shardId, List *ddlEventList,
char *newPlacementOwner, List *workerNodeList,
int workerStartIndex, int replicationFactor);
extern uint64 UpdateShardStatistics(int64 shardId);
extern void CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
int32 replicationFactor);
extern void CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId);
extern bool WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort,
uint64 shardId, char *newShardOwner, List *ddlCommandList);
/* Function declarations for generating metadata for shard and placement creation */
extern Datum master_get_table_metadata(PG_FUNCTION_ARGS);


@ -66,6 +66,8 @@ extern bool CitusHasBeenLoaded(void);
extern HTAB * GetWorkerNodeHash(void);
/* relation oids */
extern Oid DistColocationRelationId(void);
extern Oid DistColocationConfigurationIndexId(void);
extern Oid DistPartitionRelationId(void);
extern Oid DistShardRelationId(void);
extern Oid DistShardPlacementRelationId(void);


@ -0,0 +1,47 @@
/*-------------------------------------------------------------------------
*
* pg_dist_colocation.h
* definition of the relation that holds the colocation information on the
* cluster (pg_dist_colocation).
*
* Copyright (c) 2012-2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef PG_DIST_COLOCATION_H
#define PG_DIST_COLOCATION_H
/* ----------------
* pg_dist_colocation definition.
* ----------------
*/
typedef struct FormData_pg_dist_colocation
{
int colocationid;
int shardcount;
int replicationfactor;
Oid distributioncolumntype;
} FormData_pg_dist_colocation;
/* ----------------
* Form_pg_dist_colocation corresponds to a pointer to a tuple with
* the format of pg_dist_colocation relation.
* ----------------
*/
typedef FormData_pg_dist_colocation *Form_pg_dist_colocation;
/* ----------------
* compiler constants for pg_dist_colocation
* ----------------
*/
#define Natts_pg_dist_colocation 4
#define Anum_pg_dist_colocation_colocationid 1
#define Anum_pg_dist_colocation_shardcount 2
#define Anum_pg_dist_colocation_replicationfactor 3
#define Anum_pg_dist_colocation_distributioncolumntype 4
#define COLOCATIONID_SEQUENCE_NAME "pg_dist_colocationid_seq"
#endif /* PG_DIST_COLOCATION_H */


@ -19,22 +19,22 @@ WHERE
ORDER BY s.shardid; ORDER BY s.shardid;
shardid | logicalrelid | nodeport | colocationid | shardstate shardid | logicalrelid | nodeport | colocationid | shardstate
---------+---------------+----------+--------------+------------ ---------+---------------+----------+--------------+------------
1300000 | table1_group1 | 57638 | 1 | 3 1300000 | table1_group1 | 57638 | 1000 | 3
1300000 | table1_group1 | 57637 | 1 | 1 1300000 | table1_group1 | 57637 | 1000 | 1
1300001 | table1_group1 | 57637 | 1 | 1 1300001 | table1_group1 | 57637 | 1000 | 1
1300001 | table1_group1 | 57638 | 1 | 1 1300001 | table1_group1 | 57638 | 1000 | 1
1300002 | table1_group1 | 57638 | 1 | 1 1300002 | table1_group1 | 57638 | 1000 | 1
1300002 | table1_group1 | 57637 | 1 | 1 1300002 | table1_group1 | 57637 | 1000 | 1
1300003 | table1_group1 | 57637 | 1 | 1 1300003 | table1_group1 | 57637 | 1000 | 1
1300003 | table1_group1 | 57638 | 1 | 1 1300003 | table1_group1 | 57638 | 1000 | 1
1300004 | table2_group1 | 57638 | 1 | 3 1300004 | table2_group1 | 57638 | 1000 | 3
1300004 | table2_group1 | 57637 | 1 | 1 1300004 | table2_group1 | 57637 | 1000 | 1
1300005 | table2_group1 | 57637 | 1 | 1 1300005 | table2_group1 | 57637 | 1000 | 1
1300005 | table2_group1 | 57638 | 1 | 1 1300005 | table2_group1 | 57638 | 1000 | 1
1300006 | table2_group1 | 57638 | 1 | 1 1300006 | table2_group1 | 57638 | 1000 | 1
1300006 | table2_group1 | 57637 | 1 | 1 1300006 | table2_group1 | 57637 | 1000 | 1
1300007 | table2_group1 | 57637 | 1 | 1 1300007 | table2_group1 | 57637 | 1000 | 1
1300007 | table2_group1 | 57638 | 1 | 1 1300007 | table2_group1 | 57638 | 1000 | 1
(16 rows) (16 rows)
-- repair colocated shards -- repair colocated shards
@ -55,22 +55,22 @@ WHERE
ORDER BY s.shardid; ORDER BY s.shardid;
shardid | logicalrelid | nodeport | colocationid | shardstate shardid | logicalrelid | nodeport | colocationid | shardstate
---------+---------------+----------+--------------+------------ ---------+---------------+----------+--------------+------------
1300000 | table1_group1 | 57638 | 1 | 1 1300000 | table1_group1 | 57638 | 1000 | 1
1300000 | table1_group1 | 57637 | 1 | 1 1300000 | table1_group1 | 57637 | 1000 | 1
1300001 | table1_group1 | 57637 | 1 | 1 1300001 | table1_group1 | 57637 | 1000 | 1
1300001 | table1_group1 | 57638 | 1 | 1 1300001 | table1_group1 | 57638 | 1000 | 1
1300002 | table1_group1 | 57638 | 1 | 1 1300002 | table1_group1 | 57638 | 1000 | 1
1300002 | table1_group1 | 57637 | 1 | 1 1300002 | table1_group1 | 57637 | 1000 | 1
1300003 | table1_group1 | 57637 | 1 | 1 1300003 | table1_group1 | 57637 | 1000 | 1
1300003 | table1_group1 | 57638 | 1 | 1 1300003 | table1_group1 | 57638 | 1000 | 1
1300004 | table2_group1 | 57638 | 1 | 1 1300004 | table2_group1 | 57638 | 1000 | 1
1300004 | table2_group1 | 57637 | 1 | 1 1300004 | table2_group1 | 57637 | 1000 | 1
1300005 | table2_group1 | 57637 | 1 | 1 1300005 | table2_group1 | 57637 | 1000 | 1
1300005 | table2_group1 | 57638 | 1 | 1 1300005 | table2_group1 | 57638 | 1000 | 1
1300006 | table2_group1 | 57638 | 1 | 1 1300006 | table2_group1 | 57638 | 1000 | 1
1300006 | table2_group1 | 57637 | 1 | 1 1300006 | table2_group1 | 57637 | 1000 | 1
1300007 | table2_group1 | 57637 | 1 | 1 1300007 | table2_group1 | 57637 | 1000 | 1
1300007 | table2_group1 | 57638 | 1 | 1 1300007 | table2_group1 | 57638 | 1000 | 1
(16 rows) (16 rows)
-- test repairing NOT colocated shard -- test repairing NOT colocated shard
@ -179,22 +179,22 @@ WHERE
ORDER BY s.shardid; ORDER BY s.shardid;
shardid | logicalrelid | nodeport | colocationid | shardstate shardid | logicalrelid | nodeport | colocationid | shardstate
---------+---------------+----------+--------------+------------ ---------+---------------+----------+--------------+------------
1300000 | table1_group1 | 57638 | 1 | 3 1300000 | table1_group1 | 57638 | 1000 | 3
1300000 | table1_group1 | 57637 | 1 | 3 1300000 | table1_group1 | 57637 | 1000 | 3
1300001 | table1_group1 | 57637 | 1 | 1 1300001 | table1_group1 | 57637 | 1000 | 1
1300001 | table1_group1 | 57638 | 1 | 1 1300001 | table1_group1 | 57638 | 1000 | 1
1300002 | table1_group1 | 57638 | 1 | 1 1300002 | table1_group1 | 57638 | 1000 | 1
1300002 | table1_group1 | 57637 | 1 | 1 1300002 | table1_group1 | 57637 | 1000 | 1
1300003 | table1_group1 | 57637 | 1 | 1 1300003 | table1_group1 | 57637 | 1000 | 1
1300003 | table1_group1 | 57638 | 1 | 1 1300003 | table1_group1 | 57638 | 1000 | 1
1300004 | table2_group1 | 57638 | 1 | 1 1300004 | table2_group1 | 57638 | 1000 | 1
1300004 | table2_group1 | 57637 | 1 | 1 1300004 | table2_group1 | 57637 | 1000 | 1
1300005 | table2_group1 | 57637 | 1 | 1 1300005 | table2_group1 | 57637 | 1000 | 1
1300005 | table2_group1 | 57638 | 1 | 1 1300005 | table2_group1 | 57638 | 1000 | 1
1300006 | table2_group1 | 57638 | 1 | 1 1300006 | table2_group1 | 57638 | 1000 | 1
1300006 | table2_group1 | 57637 | 1 | 1 1300006 | table2_group1 | 57637 | 1000 | 1
1300007 | table2_group1 | 57637 | 1 | 1 1300007 | table2_group1 | 57637 | 1000 | 1
1300007 | table2_group1 | 57638 | 1 | 1 1300007 | table2_group1 | 57638 | 1000 | 1
(16 rows) (16 rows)
-- repair while all placements of one shard in colocation group is unhealthy -- repair while all placements of one shard in colocation group is unhealthy
@ -211,21 +211,21 @@ WHERE
ORDER BY s.shardid; ORDER BY s.shardid;
shardid | logicalrelid | nodeport | colocationid | shardstate shardid | logicalrelid | nodeport | colocationid | shardstate
---------+---------------+----------+--------------+------------ ---------+---------------+----------+--------------+------------
1300000 | table1_group1 | 57638 | 1 | 3 1300000 | table1_group1 | 57638 | 1000 | 3
1300000 | table1_group1 | 57637 | 1 | 3 1300000 | table1_group1 | 57637 | 1000 | 3
1300001 | table1_group1 | 57637 | 1 | 1 1300001 | table1_group1 | 57637 | 1000 | 1
1300001 | table1_group1 | 57638 | 1 | 1 1300001 | table1_group1 | 57638 | 1000 | 1
1300002 | table1_group1 | 57638 | 1 | 1 1300002 | table1_group1 | 57638 | 1000 | 1
1300002 | table1_group1 | 57637 | 1 | 1 1300002 | table1_group1 | 57637 | 1000 | 1
1300003 | table1_group1 | 57637 | 1 | 1 1300003 | table1_group1 | 57637 | 1000 | 1
1300003 | table1_group1 | 57638 | 1 | 1 1300003 | table1_group1 | 57638 | 1000 | 1
1300004 | table2_group1 | 57638 | 1 | 1 1300004 | table2_group1 | 57638 | 1000 | 1
1300004 | table2_group1 | 57637 | 1 | 1 1300004 | table2_group1 | 57637 | 1000 | 1
1300005 | table2_group1 | 57637 | 1 | 1 1300005 | table2_group1 | 57637 | 1000 | 1
1300005 | table2_group1 | 57638 | 1 | 1 1300005 | table2_group1 | 57638 | 1000 | 1
1300006 | table2_group1 | 57638 | 1 | 1 1300006 | table2_group1 | 57638 | 1000 | 1
1300006 | table2_group1 | 57637 | 1 | 1 1300006 | table2_group1 | 57637 | 1000 | 1
1300007 | table2_group1 | 57637 | 1 | 1 1300007 | table2_group1 | 57637 | 1000 | 1
1300007 | table2_group1 | 57638 | 1 | 1 1300007 | table2_group1 | 57638 | 1000 | 1
(16 rows) (16 rows)


@ -4,7 +4,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1300000;
-- create test utility function
-- ===================================================================
CREATE SEQUENCE colocation_test_seq
-MINVALUE 1
MINVALUE 1000
NO CYCLE;
/* a very simple UDF that only sets the colocation ids the same
* DO NOT USE THIS FUNCTION IN PRODUCTION. It manually sets colocationid column of
@ -14,7 +14,7 @@ CREATE OR REPLACE FUNCTION colocation_test_colocate_tables(source_table regclass
RETURNS BOOL
LANGUAGE plpgsql
AS $colocate_tables$
-DECLARE nextid BIGINT;
DECLARE nextid INTEGER;
BEGIN
SELECT nextval('colocation_test_seq') INTO nextid;
@ -155,7 +155,7 @@ SELECT colocation_test_colocate_tables('table1_group1', 'table2_group1');
SELECT get_table_colocation_id('table1_group1');
get_table_colocation_id
-------------------------
-1
1000
(1 row)
SELECT get_table_colocation_id('table5_groupX');
@ -189,7 +189,7 @@ SELECT tables_colocated('table6_append', 'table6_append');
t
(1 row)
-- check table co-location with same co-location group
SELECT tables_colocated('table1_group1', 'table2_group1');
tables_colocated
------------------
@ -339,3 +339,303 @@ SELECT find_shard_interval_index(1300016);
0
(1 row)
-- check external colocation API
SET citus.shard_count = 2;
CREATE TABLE table1_groupA ( id int );
SELECT create_distributed_table('table1_groupA', 'id');
create_distributed_table
--------------------------
(1 row)
CREATE TABLE table2_groupA ( id int );
SELECT create_distributed_table('table2_groupA', 'id');
create_distributed_table
--------------------------
(1 row)
-- change shard replication factor
SET citus.shard_replication_factor = 1;
CREATE TABLE table1_groupB ( id int );
SELECT create_distributed_table('table1_groupB', 'id');
create_distributed_table
--------------------------
(1 row)
CREATE TABLE table2_groupB ( id int );
SELECT create_distributed_table('table2_groupB', 'id');
create_distributed_table
--------------------------
(1 row)
-- revert back to default shard replication factor
SET citus.shard_replication_factor to DEFAULT;
-- change partition column type
CREATE TABLE table1_groupC ( id text );
SELECT create_distributed_table('table1_groupC', 'id');
create_distributed_table
--------------------------
(1 row)
CREATE TABLE table2_groupC ( id text );
SELECT create_distributed_table('table2_groupC', 'id');
create_distributed_table
--------------------------
(1 row)
-- change shard count
SET citus.shard_count = 4;
CREATE TABLE table1_groupD ( id int );
SELECT create_distributed_table('table1_groupD', 'id');
create_distributed_table
--------------------------
(1 row)
CREATE TABLE table2_groupD ( id int );
SELECT create_distributed_table('table2_groupD', 'id');
create_distributed_table
--------------------------
(1 row)
-- try other distribution methods
CREATE TABLE table_append ( id int );
SELECT create_distributed_table('table_append', 'id', 'append');
create_distributed_table
--------------------------
(1 row)
CREATE TABLE table_range ( id int );
SELECT create_distributed_table('table_range', 'id', 'range');
create_distributed_table
--------------------------
(1 row)
-- test foreign table creation
CREATE FOREIGN TABLE table3_groupD ( id int ) SERVER fake_fdw_server;
SELECT create_distributed_table('table3_groupD', 'id');
NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
create_distributed_table
--------------------------
(1 row)
-- check metadata
SELECT * FROM pg_dist_colocation
WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY colocationid;
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1 | 2 | 2 | 23
2 | 2 | 1 | 23
3 | 2 | 2 | 25
4 | 4 | 2 | 23
(4 rows)
SELECT logicalrelid, colocationid FROM pg_dist_partition
WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY colocationid;
logicalrelid | colocationid
---------------+--------------
table1_groupa | 1
table2_groupa | 1
table1_groupb | 2
table2_groupb | 2
table1_groupc | 3
table2_groupc | 3
table1_groupd | 4
table2_groupd | 4
table3_groupd | 4
(9 rows)
-- check effects of dropping tables
DROP TABLE table1_groupA;
SELECT * FROM pg_dist_colocation WHERE colocationid = 1;
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1 | 2 | 2 | 23
(1 row)
-- dropping all tables in a colocation group also deletes the colocation group
DROP TABLE table2_groupA;
SELECT * FROM pg_dist_colocation WHERE colocationid = 1;
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
(0 rows)
-- create dropped colocation group again
SET citus.shard_count = 2;
CREATE TABLE table1_groupE ( id int );
SELECT create_distributed_table('table1_groupE', 'id');
create_distributed_table
--------------------------
(1 row)
CREATE TABLE table2_groupE ( id int );
SELECT create_distributed_table('table2_groupE', 'id');
create_distributed_table
--------------------------
(1 row)
-- test different table DDL
CREATE TABLE table3_groupE ( dummy_column text, id int );
SELECT create_distributed_table('table3_groupE', 'id');
create_distributed_table
--------------------------
(1 row)
-- test different schema
CREATE SCHEMA schema_collocation;
CREATE TABLE schema_collocation.table4_groupE ( id int );
SELECT create_distributed_table('schema_collocation.table4_groupE', 'id');
create_distributed_table
--------------------------
(1 row)
-- check worker table schemas
\c - - - :worker_1_port
\d table3_groupE_1300050
Table "public.table3_groupe_1300050"
Column | Type | Modifiers
--------------+---------+-----------
dummy_column | text |
id | integer |
\d schema_collocation.table4_groupE_1300052
Table "schema_collocation.table4_groupe_1300052"
Column | Type | Modifiers
--------+---------+-----------
id | integer |
\c - - - :master_port
-- check metadata
SELECT * FROM pg_dist_colocation
WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY colocationid;
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
2 | 2 | 1 | 23
3 | 2 | 2 | 25
4 | 4 | 2 | 23
5 | 2 | 2 | 23
(4 rows)
-- cross check with internal colocation API
SELECT
p1.logicalrelid::regclass AS table1,
p2.logicalrelid::regclass AS table2,
tables_colocated(p1.logicalrelid , p2.logicalrelid) AS colocated
FROM
pg_dist_partition p1,
pg_dist_partition p2
WHERE
p1.logicalrelid < p2.logicalrelid AND
p1.colocationid != 0 AND
p2.colocationid != 0 AND
tables_colocated(p1.logicalrelid , p2.logicalrelid) is TRUE
ORDER BY
table1,
table2;
table1 | table2 | colocated
---------------+----------------------------------+-----------
table1_group1 | table2_group1 | t
table1_groupb | table2_groupb | t
table1_groupc | table2_groupc | t
table1_groupd | table2_groupd | t
table1_groupd | table3_groupd | t
table2_groupd | table3_groupd | t
table1_groupe | table2_groupe | t
table1_groupe | table3_groupe | t
table1_groupe | schema_collocation.table4_groupe | t
table2_groupe | table3_groupe | t
table2_groupe | schema_collocation.table4_groupe | t
table3_groupe | schema_collocation.table4_groupe | t
(12 rows)
-- check created shards
SELECT
logicalrelid,
pg_dist_shard.shardid AS shardid,
shardstorage,
nodeport,
shardminvalue,
shardmaxvalue
FROM
pg_dist_shard,
pg_dist_shard_placement
WHERE
pg_dist_shard.shardid = pg_dist_shard_placement.shardid AND
pg_dist_shard.shardid >= 1300026
ORDER BY
logicalrelid,
shardmaxvalue::integer,
shardid,
placementid;
logicalrelid | shardid | shardstorage | nodeport | shardminvalue | shardmaxvalue
----------------------------------+---------+--------------+----------+---------------+---------------
table1_groupb | 1300026 | t | 57637 | -2147483648 | -1
table1_groupb | 1300027 | t | 57638 | 0 | 2147483647
table2_groupb | 1300028 | t | 57637 | -2147483648 | -1
table2_groupb | 1300029 | t | 57638 | 0 | 2147483647
table1_groupc | 1300030 | t | 57637 | -2147483648 | -1
table1_groupc | 1300030 | t | 57638 | -2147483648 | -1
table1_groupc | 1300031 | t | 57638 | 0 | 2147483647
table1_groupc | 1300031 | t | 57637 | 0 | 2147483647
table2_groupc | 1300032 | t | 57638 | -2147483648 | -1
table2_groupc | 1300032 | t | 57637 | -2147483648 | -1
table2_groupc | 1300033 | t | 57637 | 0 | 2147483647
table2_groupc | 1300033 | t | 57638 | 0 | 2147483647
table1_groupd | 1300034 | t | 57637 | -2147483648 | -1073741825
table1_groupd | 1300034 | t | 57638 | -2147483648 | -1073741825
table1_groupd | 1300035 | t | 57638 | -1073741824 | -1
table1_groupd | 1300035 | t | 57637 | -1073741824 | -1
table1_groupd | 1300036 | t | 57637 | 0 | 1073741823
table1_groupd | 1300036 | t | 57638 | 0 | 1073741823
table1_groupd | 1300037 | t | 57638 | 1073741824 | 2147483647
table1_groupd | 1300037 | t | 57637 | 1073741824 | 2147483647
table2_groupd | 1300038 | t | 57638 | -2147483648 | -1073741825
table2_groupd | 1300038 | t | 57637 | -2147483648 | -1073741825
table2_groupd | 1300039 | t | 57637 | -1073741824 | -1
table2_groupd | 1300039 | t | 57638 | -1073741824 | -1
table2_groupd | 1300040 | t | 57638 | 0 | 1073741823
table2_groupd | 1300040 | t | 57637 | 0 | 1073741823
table2_groupd | 1300041 | t | 57637 | 1073741824 | 2147483647
table2_groupd | 1300041 | t | 57638 | 1073741824 | 2147483647
table3_groupd | 1300042 | f | 57637 | -2147483648 | -1073741825
table3_groupd | 1300042 | f | 57638 | -2147483648 | -1073741825
table3_groupd | 1300043 | f | 57638 | -1073741824 | -1
table3_groupd | 1300043 | f | 57637 | -1073741824 | -1
table3_groupd | 1300044 | f | 57637 | 0 | 1073741823
table3_groupd | 1300044 | f | 57638 | 0 | 1073741823
table3_groupd | 1300045 | f | 57638 | 1073741824 | 2147483647
table3_groupd | 1300045 | f | 57637 | 1073741824 | 2147483647
table1_groupe | 1300046 | t | 57637 | -2147483648 | -1
table1_groupe | 1300046 | t | 57638 | -2147483648 | -1
table1_groupe | 1300047 | t | 57638 | 0 | 2147483647
table1_groupe | 1300047 | t | 57637 | 0 | 2147483647
table2_groupe | 1300048 | t | 57638 | -2147483648 | -1
table2_groupe | 1300048 | t | 57637 | -2147483648 | -1
table2_groupe | 1300049 | t | 57637 | 0 | 2147483647
table2_groupe | 1300049 | t | 57638 | 0 | 2147483647
table3_groupe | 1300050 | t | 57637 | -2147483648 | -1
table3_groupe | 1300050 | t | 57638 | -2147483648 | -1
table3_groupe | 1300051 | t | 57638 | 0 | 2147483647
table3_groupe | 1300051 | t | 57637 | 0 | 2147483647
schema_collocation.table4_groupe | 1300052 | t | 57638 | -2147483648 | -1
schema_collocation.table4_groupe | 1300052 | t | 57637 | -2147483648 | -1
schema_collocation.table4_groupe | 1300053 | t | 57637 | 0 | 2147483647
schema_collocation.table4_groupe | 1300053 | t | 57638 | 0 | 2147483647
(52 rows)
View File
@@ -36,6 +36,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-7';
ALTER EXTENSION citus UPDATE TO '6.0-8';
ALTER EXTENSION citus UPDATE TO '6.0-9';
ALTER EXTENSION citus UPDATE TO '6.0-10';
+ALTER EXTENSION citus UPDATE TO '6.0-11';
-- drop extension an re-create in newest version
DROP EXTENSION citus;
\c
View File
@@ -7,7 +7,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1300000;
-- ===================================================================
CREATE SEQUENCE colocation_test_seq
-MINVALUE 1
+MINVALUE 1000
NO CYCLE;
/* a very simple UDF that only sets the colocation ids the same
@@ -18,7 +18,7 @@ CREATE OR REPLACE FUNCTION colocation_test_colocate_tables(source_table regclass
RETURNS BOOL
LANGUAGE plpgsql
AS $colocate_tables$
-DECLARE nextid BIGINT;
+DECLARE nextid INTEGER;
BEGIN
SELECT nextval('colocation_test_seq') INTO nextid;
@@ -110,7 +110,7 @@ SELECT tables_colocated('table1_group1', 'table1_group1');
SELECT tables_colocated('table5_groupX', 'table5_groupX');
SELECT tables_colocated('table6_append', 'table6_append');
-- check table co-location with same co-location group
SELECT tables_colocated('table1_group1', 'table2_group1');
-- check table co-location with different co-location group
@@ -154,3 +154,138 @@ SELECT find_shard_interval_index(1300001);
SELECT find_shard_interval_index(1300002);
SELECT find_shard_interval_index(1300003);
SELECT find_shard_interval_index(1300016);
-- check external colocation API
SET citus.shard_count = 2;
CREATE TABLE table1_groupA ( id int );
SELECT create_distributed_table('table1_groupA', 'id');
CREATE TABLE table2_groupA ( id int );
SELECT create_distributed_table('table2_groupA', 'id');
-- change shard replication factor
SET citus.shard_replication_factor = 1;
CREATE TABLE table1_groupB ( id int );
SELECT create_distributed_table('table1_groupB', 'id');
CREATE TABLE table2_groupB ( id int );
SELECT create_distributed_table('table2_groupB', 'id');
-- revert back to default shard replication factor
SET citus.shard_replication_factor to DEFAULT;
-- change partition column type
CREATE TABLE table1_groupC ( id text );
SELECT create_distributed_table('table1_groupC', 'id');
CREATE TABLE table2_groupC ( id text );
SELECT create_distributed_table('table2_groupC', 'id');
-- change shard count
SET citus.shard_count = 4;
CREATE TABLE table1_groupD ( id int );
SELECT create_distributed_table('table1_groupD', 'id');
CREATE TABLE table2_groupD ( id int );
SELECT create_distributed_table('table2_groupD', 'id');
-- try other distribution methods
CREATE TABLE table_append ( id int );
SELECT create_distributed_table('table_append', 'id', 'append');
CREATE TABLE table_range ( id int );
SELECT create_distributed_table('table_range', 'id', 'range');
-- test foreign table creation
CREATE FOREIGN TABLE table3_groupD ( id int ) SERVER fake_fdw_server;
SELECT create_distributed_table('table3_groupD', 'id');
-- check metadata
SELECT * FROM pg_dist_colocation
WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY colocationid;
SELECT logicalrelid, colocationid FROM pg_dist_partition
WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY colocationid;
-- check effects of dropping tables
DROP TABLE table1_groupA;
SELECT * FROM pg_dist_colocation WHERE colocationid = 1;
-- dropping all tables in a colocation group also deletes the colocation group
DROP TABLE table2_groupA;
SELECT * FROM pg_dist_colocation WHERE colocationid = 1;
-- create dropped colocation group again
SET citus.shard_count = 2;
CREATE TABLE table1_groupE ( id int );
SELECT create_distributed_table('table1_groupE', 'id');
CREATE TABLE table2_groupE ( id int );
SELECT create_distributed_table('table2_groupE', 'id');
-- test different table DDL
CREATE TABLE table3_groupE ( dummy_column text, id int );
SELECT create_distributed_table('table3_groupE', 'id');
-- test different schema
CREATE SCHEMA schema_collocation;
CREATE TABLE schema_collocation.table4_groupE ( id int );
SELECT create_distributed_table('schema_collocation.table4_groupE', 'id');
-- check worker table schemas
\c - - - :worker_1_port
\d table3_groupE_1300050
\d schema_collocation.table4_groupE_1300052
\c - - - :master_port
-- check metadata
SELECT * FROM pg_dist_colocation
WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY colocationid;
-- cross check with internal colocation API
SELECT
p1.logicalrelid::regclass AS table1,
p2.logicalrelid::regclass AS table2,
tables_colocated(p1.logicalrelid , p2.logicalrelid) AS colocated
FROM
pg_dist_partition p1,
pg_dist_partition p2
WHERE
p1.logicalrelid < p2.logicalrelid AND
p1.colocationid != 0 AND
p2.colocationid != 0 AND
tables_colocated(p1.logicalrelid , p2.logicalrelid) is TRUE
ORDER BY
table1,
table2;
-- check created shards
SELECT
logicalrelid,
pg_dist_shard.shardid AS shardid,
shardstorage,
nodeport,
shardminvalue,
shardmaxvalue
FROM
pg_dist_shard,
pg_dist_shard_placement
WHERE
pg_dist_shard.shardid = pg_dist_shard_placement.shardid AND
pg_dist_shard.shardid >= 1300026
ORDER BY
logicalrelid,
shardmaxvalue::integer,
shardid,
placementid;
View File
@@ -41,6 +41,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-7';
ALTER EXTENSION citus UPDATE TO '6.0-8';
ALTER EXTENSION citus UPDATE TO '6.0-9';
ALTER EXTENSION citus UPDATE TO '6.0-10';
+ALTER EXTENSION citus UPDATE TO '6.0-11';
-- drop extension an re-create in newest version
DROP EXTENSION citus;