From 40bdafa8d11e15cdd61fd6a0cee807664e92a4d1 Mon Sep 17 00:00:00 2001 From: Metin Doslu Date: Wed, 5 Oct 2016 17:59:24 +0300 Subject: [PATCH] Add create_distributed_table() create_distributed_table() creates a hash distributed table with default values of shard count and shard replication factor. --- src/backend/distributed/Makefile | 4 +- .../distributed/citus--6.0-10--6.0-11.sql | 106 +++++ src/backend/distributed/citus.control | 2 +- .../commands/create_distributed_table.c | 438 ++++++++++++++---- .../distributed/master/master_create_shards.c | 151 ++++-- .../distributed/master/master_node_protocol.c | 58 ++- .../master/master_stage_protocol.c | 4 +- .../distributed/utils/colocation_utils.c | 43 +- .../distributed/utils/metadata_cache.c | 25 + src/include/distributed/colocation_utils.h | 2 + src/include/distributed/master_protocol.h | 6 + src/include/distributed/metadata_cache.h | 2 + src/include/distributed/pg_dist_colocation.h | 47 ++ .../multi_colocated_shard_transfer.out | 128 ++--- .../expected/multi_colocation_utils.out | 308 +++++++++++- src/test/regress/expected/multi_extension.out | 1 + .../regress/sql/multi_colocation_utils.sql | 141 +++++- src/test/regress/sql/multi_extension.sql | 1 + 18 files changed, 1253 insertions(+), 214 deletions(-) create mode 100644 src/backend/distributed/citus--6.0-10--6.0-11.sql create mode 100644 src/include/distributed/pg_dist_colocation.h diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index b06e5a876..beb90d024 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -8,7 +8,7 @@ EXTENSION = citus EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \ 5.2-1 5.2-2 5.2-3 5.2-4 \ - 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 + 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -78,6 +78,8 @@ $(EXTENSION)--6.0-9.sql: $(EXTENSION)--6.0-8.sql $(EXTENSION)--6.0-8--6.0-9.sql cat $^ > $@ $(EXTENSION)--6.0-10.sql: $(EXTENSION)--6.0-9.sql $(EXTENSION)--6.0-9--6.0-10.sql cat $^ > $@ +$(EXTENSION)--6.0-11.sql: $(EXTENSION)--6.0-10.sql $(EXTENSION)--6.0-10--6.0-11.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--6.0-10--6.0-11.sql b/src/backend/distributed/citus--6.0-10--6.0-11.sql new file mode 100644 index 000000000..9c818c293 --- /dev/null +++ b/src/backend/distributed/citus--6.0-10--6.0-11.sql @@ -0,0 +1,106 @@ +SET search_path = 'pg_catalog'; + +CREATE SEQUENCE citus.pg_dist_colocationid_seq + MINVALUE 1 + MAXVALUE 4294967296; + +ALTER SEQUENCE citus.pg_dist_colocationid_seq SET SCHEMA pg_catalog; + +/* add pg_dist_colocation */ +CREATE TABLE citus.pg_dist_colocation( + colocationid int NOT NULL PRIMARY KEY, + shardcount int NOT NULL, + replicationfactor int NOT NULL, + distributioncolumntype oid NOT NULL +); + +ALTER TABLE citus.pg_dist_colocation SET SCHEMA pg_catalog; + +CREATE INDEX pg_dist_colocation_configuration_index +ON pg_dist_colocation USING btree(shardcount, replicationfactor, distributioncolumntype); + +CREATE FUNCTION create_distributed_table(table_name regclass, + distribution_column text, + distribution_type citus.distribution_type DEFAULT 'hash') + RETURNS void + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$create_distributed_table$$; +COMMENT ON FUNCTION create_distributed_table(table_name regclass, + distribution_column text, + 
distribution_type citus.distribution_type) + IS 'creates a distributed table'; + + +CREATE OR REPLACE FUNCTION pg_catalog.citus_drop_trigger() + RETURNS event_trigger + LANGUAGE plpgsql + SECURITY DEFINER + SET search_path = pg_catalog + AS $cdbdt$ +DECLARE + v_obj record; + sequence_names text[] := '{}'; + node_names text[] := '{}'; + node_ports bigint[] := '{}'; + node_name text; + node_port bigint; + table_colocation_id integer; +BEGIN + -- collect set of dropped sequences to drop on workers later + SELECT array_agg(object_identity) INTO sequence_names + FROM pg_event_trigger_dropped_objects() + WHERE object_type = 'sequence'; + + -- Must accumulate set of affected nodes before deleting placements, as + -- master_drop_all_shards will erase their rows, making it impossible for + -- us to know where to drop sequences (which must be dropped after shards, + -- since they have default value expressions which depend on sequences). + SELECT array_agg(sp.nodename), array_agg(sp.nodeport) + INTO node_names, node_ports + FROM pg_event_trigger_dropped_objects() AS dobj, + pg_dist_shard AS s, + pg_dist_shard_placement AS sp + WHERE dobj.object_type IN ('table', 'foreign table') + AND dobj.objid = s.logicalrelid + AND s.shardid = sp.shardid; + + FOR v_obj IN SELECT * FROM pg_event_trigger_dropped_objects() LOOP + IF v_obj.object_type NOT IN ('table', 'foreign table') THEN + CONTINUE; + END IF; + + -- nothing to do if not a distributed table + IF NOT EXISTS(SELECT * FROM pg_dist_partition WHERE logicalrelid = v_obj.objid) THEN + CONTINUE; + END IF; + + -- ensure all shards are dropped + PERFORM master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name); + + SELECT colocationid INTO table_colocation_id FROM pg_dist_partition WHERE logicalrelid = v_obj.objid; + + -- delete partition entry + DELETE FROM pg_dist_partition WHERE logicalrelid = v_obj.objid; + + IF NOT EXISTS(SELECT * FROM pg_dist_partition WHERE colocationId = table_colocation_id) THEN + DELETE FROM pg_dist_colocation WHERE colocationId = table_colocation_id; + END IF; + END LOOP; + + IF cardinality(sequence_names) = 0 THEN + RETURN; + END IF; + + FOR node_name, node_port IN + SELECT DISTINCT name, port + FROM unnest(node_names, node_ports) AS nodes(name, port) + LOOP + PERFORM master_drop_sequences(sequence_names, node_name, node_port); + END LOOP; +END; +$cdbdt$; + +COMMENT ON FUNCTION citus_drop_trigger() + IS 'perform checks and actions at the end of DROP actions'; + +RESET search_path; diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index da6f90cff..726af25ca 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '6.0-10' +default_version = '6.0-11' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 9fcf4ecc7..b8c810d55 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -9,6 +9,7 @@ */ #include "postgres.h" +#include "miscadmin.h" #include "access/genam.h" #include "access/hash.h" @@ -27,14 +28,17 @@ #include "catalog/pg_trigger.h" #include "commands/defrem.h" #include "commands/extension.h" +#include "commands/sequence.h" #include "commands/trigger.h" #include "distributed/colocation_utils.h" 
#include "distributed/distribution_column.h" #include "distributed/master_metadata_utility.h" +#include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" +#include "distributed/multi_logical_planner.h" +#include "distributed/pg_dist_colocation.h" #include "distributed/pg_dist_partition.h" #include "executor/spi.h" -#include "distributed/multi_logical_planner.h" #include "nodes/execnodes.h" #include "nodes/nodeFuncs.h" #include "nodes/pg_list.h" @@ -51,28 +55,35 @@ /* local function forward declarations */ +static void ConvertToDistributedTable(Oid relationId, + text *distributionColumnText, + Oid distributionMethodOid, uint32 colocationId); static char LookupDistributionMethod(Oid distributionMethodOid); static void RecordDistributedRelationDependencies(Oid distributedRelationId, Node *distributionKey); static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId, int16 supportFunctionNumber); static bool LocalTableEmpty(Oid tableId); +static void ErrorIfNotSupportedConstraint(Relation relation, char distributionMethod, + Var *distributionColumn); +static void InsertPgDistPartition(Oid relationId, char distributionMethod, + Node *distributionKey, uint32 colocationId); static void CreateTruncateTrigger(Oid relationId); +static uint32 ColocationId(int shardCount, int replicationFactor, + Oid distributionColumnType); +static uint32 CreateColocationGroup(int shardCount, int replicationFactor, + Oid distributionColumnType); +static uint32 GetNextColocationId(void); + /* exports for SQL callable functions */ PG_FUNCTION_INFO_V1(master_create_distributed_table); +PG_FUNCTION_INFO_V1(create_distributed_table); /* * master_create_distributed_table accepts a table, distribution column and * method and performs the corresponding catalog changes. - * - * XXX: We should perform more checks here to see if this table is fit for - * partitioning. At a minimum, we should validate the following: (i) this node - * runs as the master node, (ii) table does not make use of the inheritance - * mechanism, (iii) table does not own columns that are sequences, and (iv) - * table does not have collated columns. (v) table does not have - * preexisting content. */ Datum master_create_distributed_table(PG_FUNCTION_ARGS) @@ -81,82 +92,172 @@ master_create_distributed_table(PG_FUNCTION_ARGS) text *distributionColumnText = PG_GETARG_TEXT_P(1); Oid distributionMethodOid = PG_GETARG_OID(2); - Relation distributedRelation = NULL; - TupleDesc relationDesc = NULL; - char *distributedRelationName = NULL; - char relationKind = '\0'; + ConvertToDistributedTable(distributedRelationId, distributionColumnText, + distributionMethodOid, INVALID_COLOCATION_ID); + PG_RETURN_VOID(); +} + + +/* + * create_distributed_table accepts a table, distribution column and + * distribution method, then it creates a distributed table. 
+ */ +Datum +create_distributed_table(PG_FUNCTION_ARGS) +{ + Oid relationId = PG_GETARG_OID(0); + text *distributionColumnText = PG_GETARG_TEXT_P(1); + Oid distributionMethodOid = PG_GETARG_OID(2); + + Relation distributedRelation = NULL; + Relation pgDistColocation = NULL; + Node *distributionKey = NULL; + Var *distributionColumn = NULL; + char *distributionColumnName = NULL; + int distributionColumnType = 0; + uint32 colocationId = INVALID_COLOCATION_ID; + + /* if distribution method is not hash, just create partition metadata */ + char distributionMethod = LookupDistributionMethod(distributionMethodOid); + if (distributionMethod != DISTRIBUTE_BY_HASH) + { + ConvertToDistributedTable(relationId, distributionColumnText, + distributionMethodOid, INVALID_COLOCATION_ID); + PG_RETURN_VOID(); + } + + /* get distribution column type */ + distributionColumnName = text_to_cstring(distributionColumnText); + distributedRelation = relation_open(relationId, AccessShareLock); + distributionKey = BuildDistributionKeyFromColumnName(distributedRelation, + distributionColumnName); + distributionColumn = (Var *) distributionKey; + distributionColumnType = distributionColumn->vartype; + + /* + * Get an exclusive lock on the colocation system catalog. Therefore, we + * can be sure that there will no modifications on the table until this + * transaction is committed. + */ + pgDistColocation = heap_open(DistColocationRelationId(), ExclusiveLock); + + /* check for existing colocations */ + colocationId = ColocationId(ShardCount, ShardReplicationFactor, + distributionColumnType); + + /* + * If there is a colocation group for the current configuration, get a + * colocated table from the group and use its shards as a reference to + * create new shards. Otherwise, create a new colocation group and create + * shards with the default round robin algorithm. + */ + if (colocationId != INVALID_COLOCATION_ID) + { + char *relationName = get_rel_name(relationId); + + Oid colocatedTableId = ColocatedTableId(colocationId); + ConvertToDistributedTable(relationId, distributionColumnText, + distributionMethodOid, colocationId); + + CreateColocatedShards(relationId, colocatedTableId); + ereport(DEBUG2, (errmsg("table %s is added to colocation group: %d", + relationName, colocationId))); + } + else + { + colocationId = CreateColocationGroup(ShardCount, ShardReplicationFactor, + distributionColumnType); + ConvertToDistributedTable(relationId, distributionColumnText, + distributionMethodOid, colocationId); + + /* use the default way to create shards */ + CreateShardsWithRoundRobinPolicy(relationId, ShardCount, ShardReplicationFactor); + } + + heap_close(pgDistColocation, NoLock); + relation_close(distributedRelation, NoLock); + + PG_RETURN_VOID(); +} + + +/* + * ConvertToDistributedTable converts the given regular PostgreSQL table into a + * distributed table. First, it checks if the given table can be distributed, + * then it creates related tuple in pg_dist_partition. + * + * XXX: We should perform more checks here to see if this table is fit for + * partitioning. At a minimum, we should validate the following: (i) this node + * runs as the master node, (ii) table does not make use of the inheritance + * mechanism, (iii) table does not own columns that are sequences, and (iv) + * table does not have collated columns. 
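+ *
+ * After a successful call the relation shows up in pg_dist_partition; a
+ * sketch of how to inspect the new row ('events' is a hypothetical table,
+ * and the colocationid value depends on the caller):
+ *
+ *   SELECT logicalrelid, partmethod, colocationid, repmodel
+ *   FROM pg_dist_partition
+ *   WHERE logicalrelid = 'events'::regclass;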
+ */ +static void +ConvertToDistributedTable(Oid relationId, text *distributionColumnText, + Oid distributionMethodOid, uint32 colocationId) +{ + Relation relation = NULL; + TupleDesc relationDesc = NULL; + char *relationName = NULL; + char relationKind = 0; - Relation pgDistPartition = NULL; char distributionMethod = LookupDistributionMethod(distributionMethodOid); - const char replicationModel = 'c'; char *distributionColumnName = text_to_cstring(distributionColumnText); Node *distributionKey = NULL; Var *distributionColumn = NULL; - char *distributionKeyString = NULL; - - List *indexOidList = NIL; - ListCell *indexOidCell = NULL; - - HeapTuple newTuple = NULL; - Datum newValues[Natts_pg_dist_partition]; - bool newNulls[Natts_pg_dist_partition]; /* * Lock target relation with an access exclusive lock - there's no way to * make sense of this table until we've committed, and we don't want * multiple backends manipulating this relation. */ - distributedRelation = relation_open(distributedRelationId, AccessExclusiveLock); - relationDesc = RelationGetDescr(distributedRelation); - distributedRelationName = RelationGetRelationName(distributedRelation); + relation = relation_open(relationId, AccessExclusiveLock); + relationDesc = RelationGetDescr(relation); + relationName = RelationGetRelationName(relation); - EnsureTableOwner(distributedRelationId); - - /* open system catalog and insert new tuple */ - pgDistPartition = heap_open(DistPartitionRelationId(), RowExclusiveLock); + EnsureTableOwner(relationId); /* check that the relation is not already distributed */ - if (IsDistributedTable(distributedRelationId)) + if (IsDistributedTable(relationId)) { ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("table \"%s\" is already distributed", - distributedRelationName))); + relationName))); } /* verify target relation does not use WITH (OIDS) PostgreSQL feature */ if (relationDesc->tdhasoid) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot distribute relation: %s", distributedRelationName), + errmsg("cannot distribute relation: %s", relationName), errdetail("Distributed relations must not specify the WITH " "(OIDS) option in their definitions."))); } /* verify target relation is either regular or foreign table */ - relationKind = distributedRelation->rd_rel->relkind; + relationKind = relation->rd_rel->relkind; if (relationKind != RELKIND_RELATION && relationKind != RELKIND_FOREIGN_TABLE) { ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot distribute relation: %s", - distributedRelationName), + relationName), errdetail("Distributed relations must be regular or " "foreign tables."))); } /* check that the relation does not contain any rows */ - if (!LocalTableEmpty(distributedRelationId)) + if (!LocalTableEmpty(relationId)) { ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("cannot distribute relation \"%s\"", - distributedRelationName), + relationName), errdetail("Relation \"%s\" contains data.", - distributedRelationName), + relationName), errhint("Empty your table before distributing it."))); } - distributionKey = BuildDistributionKeyFromColumnName(distributedRelation, + distributionKey = BuildDistributionKeyFromColumnName(relation, distributionColumnName); - distributionKeyString = nodeToString(distributionKey); /* the distribution key should always be a Var for now */ Assert(IsA(distributionKey, Var)); @@ -193,17 +294,91 @@ master_create_distributed_table(PG_FUNCTION_ARGS) } } + ErrorIfNotSupportedConstraint(relation, 
distributionMethod, distributionColumn); + + InsertPgDistPartition(relationId, distributionMethod, distributionKey, colocationId); + + relation_close(relation, NoLock); + /* - * Forbid UNIQUE, PRIMARY KEY, or EXCLUDE constraints on append partitioned tables, - * since currently there is no way of enforcing uniqueness for overlapping shards. - * - * Similarly, do not allow such constraints it they do not - * include partition column. This check is important for two reasons. First, - * currently Citus does not enforce uniqueness constraint on multiple shards. - * Second, INSERT INTO .. ON CONFLICT (i.e., UPSERT) queries can be executed with no - * further check for constraints. + * PostgreSQL supports truncate trigger for regular relations only. + * Truncate on foreign tables is not supported. */ - indexOidList = RelationGetIndexList(distributedRelation); + if (relationKind == RELKIND_RELATION) + { + CreateTruncateTrigger(relationId); + } +} + + +/* + * InsertPgDistPartition inserts a new tuple into pg_dist_partition. + */ +static void +InsertPgDistPartition(Oid relationId, char distributionMethod, Node *distributionKey, + uint32 colocationId) +{ + Relation pgDistPartition = NULL; + const char replicationModel = 'c'; + char *distributionKeyString = NULL; + + HeapTuple newTuple = NULL; + Datum newValues[Natts_pg_dist_partition]; + bool newNulls[Natts_pg_dist_partition]; + + /* open system catalog and insert new tuple */ + pgDistPartition = heap_open(DistPartitionRelationId(), RowExclusiveLock); + + distributionKeyString = nodeToString(distributionKey); + + /* form new tuple for pg_dist_partition */ + memset(newValues, 0, sizeof(newValues)); + memset(newNulls, false, sizeof(newNulls)); + + newValues[Anum_pg_dist_partition_logicalrelid - 1] = + ObjectIdGetDatum(relationId); + newValues[Anum_pg_dist_partition_partmethod - 1] = + CharGetDatum(distributionMethod); + newValues[Anum_pg_dist_partition_partkey - 1] = + CStringGetTextDatum(distributionKeyString); + newValues[Anum_pg_dist_partition_colocationid - 1] = colocationId; + newValues[Anum_pg_dist_partition_repmodel - 1] = CharGetDatum(replicationModel); + + newTuple = heap_form_tuple(RelationGetDescr(pgDistPartition), newValues, newNulls); + + /* finally insert tuple, build index entries & register cache invalidation */ + simple_heap_insert(pgDistPartition, newTuple); + CatalogUpdateIndexes(pgDistPartition, newTuple); + CitusInvalidateRelcacheByRelid(relationId); + + RecordDistributedRelationDependencies(relationId, distributionKey); + + CommandCounterIncrement(); + heap_close(pgDistPartition, NoLock); +} + + +/* + * ErrorIfNotSupportedConstraint run checks related to unique index / exclude + * constraints. + * + * Forbid UNIQUE, PRIMARY KEY, or EXCLUDE constraints on append partitioned tables, + * since currently there is no way of enforcing uniqueness for overlapping shards. + * + * Similarly, do not allow such constraints if they do not include partition column. + * This check is important for two reasons. First, currently Citus does not enforce + * uniqueness constraint on multiple shards. + * Second, INSERT INTO .. ON CONFLICT (i.e., UPSERT) queries can be executed with no + * further check for constraints. 
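+ *
+ * Two illustrative cases for hash-partitioned tables (hypothetical table
+ * names, not taken from the regression tests):
+ *
+ *   CREATE TABLE good (id int PRIMARY KEY, value text);
+ *   SELECT create_distributed_table('good', 'id');   -- allowed, the key covers the partition column
+ *
+ *   CREATE TABLE bad (id int, value text UNIQUE);
+ *   SELECT create_distributed_table('bad', 'id');    -- rejected, UNIQUE omits the partition column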
+ */ +static void +ErrorIfNotSupportedConstraint(Relation relation, char distributionMethod, + Var *distributionColumn) +{ + char *relationName = RelationGetRelationName(relation); + List *indexOidList = RelationGetIndexList(relation); + ListCell *indexOidCell = NULL; + foreach(indexOidCell, indexOidList) { Oid indexOid = lfirst_oid(indexOidCell); @@ -233,7 +408,7 @@ master_create_distributed_table(PG_FUNCTION_ARGS) { ereport(WARNING, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("table \"%s\" has a UNIQUE or EXCLUDE constraint", - distributedRelationName), + relationName), errdetail("UNIQUE constraints, EXCLUDE constraints, " "and PRIMARY KEYs on " "append-partitioned tables cannot be enforced."), @@ -271,7 +446,7 @@ master_create_distributed_table(PG_FUNCTION_ARGS) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot distribute relation: \"%s\"", - distributedRelationName), + relationName), errdetail("Distributed relations cannot have UNIQUE, " "EXCLUDE, or PRIMARY KEY constraints that do not " "include the partition column (with an equality " @@ -280,44 +455,6 @@ master_create_distributed_table(PG_FUNCTION_ARGS) index_close(indexDesc, NoLock); } - - /* form new tuple for pg_dist_partition */ - memset(newValues, 0, sizeof(newValues)); - memset(newNulls, false, sizeof(newNulls)); - - newValues[Anum_pg_dist_partition_logicalrelid - 1] = - ObjectIdGetDatum(distributedRelationId); - newValues[Anum_pg_dist_partition_partmethod - 1] = - CharGetDatum(distributionMethod); - newValues[Anum_pg_dist_partition_partkey - 1] = - CStringGetTextDatum(distributionKeyString); - newValues[Anum_pg_dist_partition_colocationid - 1] = INVALID_COLOCATION_ID; - newValues[Anum_pg_dist_partition_repmodel - 1] = CharGetDatum(replicationModel); - - newTuple = heap_form_tuple(RelationGetDescr(pgDistPartition), newValues, newNulls); - - /* finally insert tuple, build index entries & register cache invalidation */ - simple_heap_insert(pgDistPartition, newTuple); - CatalogUpdateIndexes(pgDistPartition, newTuple); - CitusInvalidateRelcacheByRelid(distributedRelationId); - - RecordDistributedRelationDependencies(distributedRelationId, distributionKey); - - CommandCounterIncrement(); - - heap_close(pgDistPartition, NoLock); - relation_close(distributedRelation, NoLock); - - /* - * PostgreSQL supports truncate trigger for regular relations only. - * Truncate on foreign tables is not supported. - */ - if (relationKind == RELKIND_RELATION) - { - CreateTruncateTrigger(distributedRelationId); - } - - PG_RETURN_VOID(); } @@ -525,3 +662,128 @@ CreateTruncateTrigger(Oid relationId) CreateTrigger(trigger, NULL, relationId, InvalidOid, InvalidOid, InvalidOid, internal); } + + +/* + * ColocationId searches pg_dist_colocation for shard count, replication factor + * and distribution column type. If a matching entry is found, it returns the + * colocation id, otherwise it returns INVALID_COLOCATION_ID. 
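+ *
+ * Conceptually this is the following lookup, although the implementation
+ * below uses a systable scan over the configuration index rather than SPI
+ * (sketch only):
+ *
+ *   SELECT colocationid FROM pg_dist_colocation
+ *   WHERE shardcount = $1 AND replicationfactor = $2
+ *     AND distributioncolumntype = $3;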
+ */ +static uint32 +ColocationId(int shardCount, int replicationFactor, Oid distributionColumnType) +{ + uint32 colocationId = INVALID_COLOCATION_ID; + HeapTuple colocationTuple = NULL; + SysScanDesc scanDescriptor; + ScanKeyData scanKey[3]; + int scanKeyCount = 3; + bool indexOK = true; + + /* acquire a lock, so that no one can do this concurrently */ + Relation pgDistColocation = heap_open(DistColocationRelationId(), AccessShareLock); + + /* set scan arguments */ + ScanKeyInit(&scanKey[0], Anum_pg_dist_colocation_shardcount, + BTEqualStrategyNumber, F_INT4EQ, UInt32GetDatum(shardCount)); + ScanKeyInit(&scanKey[1], Anum_pg_dist_colocation_replicationfactor, + BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(replicationFactor)); + ScanKeyInit(&scanKey[2], Anum_pg_dist_colocation_distributioncolumntype, + BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(distributionColumnType)); + + scanDescriptor = systable_beginscan(pgDistColocation, + DistColocationConfigurationIndexId(), + indexOK, NULL, scanKeyCount, scanKey); + + colocationTuple = systable_getnext(scanDescriptor); + if (HeapTupleIsValid(colocationTuple)) + { + Form_pg_dist_colocation colocationForm = + (Form_pg_dist_colocation) GETSTRUCT(colocationTuple); + + colocationId = colocationForm->colocationid; + } + + systable_endscan(scanDescriptor); + heap_close(pgDistColocation, AccessShareLock); + + return colocationId; +} + + +/* + * CreateColocationGroup creates a new colocation id and writes it into + * pg_dist_colocation with the given configuration. + */ +static uint32 +CreateColocationGroup(int shardCount, int replicationFactor, Oid distributionColumnType) +{ + uint32 colocationId = INVALID_COLOCATION_ID; + Relation pgDistColocation = NULL; + TupleDesc tupleDescriptor = NULL; + HeapTuple heapTuple = NULL; + Datum values[Natts_pg_dist_colocation]; + bool isNulls[Natts_pg_dist_colocation]; + + /* form new colocation tuple */ + memset(values, 0, sizeof(values)); + memset(isNulls, false, sizeof(isNulls)); + + colocationId = GetNextColocationId(); + + values[Anum_pg_dist_colocation_colocationid - 1] = UInt32GetDatum(colocationId); + values[Anum_pg_dist_colocation_shardcount - 1] = UInt32GetDatum(shardCount); + values[Anum_pg_dist_colocation_replicationfactor - 1] = + UInt32GetDatum(replicationFactor); + values[Anum_pg_dist_colocation_distributioncolumntype - 1] = + ObjectIdGetDatum(distributionColumnType); + + /* open colocation relation and insert the new tuple */ + pgDistColocation = heap_open(DistColocationRelationId(), RowExclusiveLock); + + tupleDescriptor = RelationGetDescr(pgDistColocation); + heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls); + + simple_heap_insert(pgDistColocation, heapTuple); + CatalogUpdateIndexes(pgDistColocation, heapTuple); + + /* increment the counter so that next command can see the row */ + CommandCounterIncrement(); + heap_close(pgDistColocation, NoLock); + + return colocationId; +} + + +/* + * GetNextColocationId allocates and returns a unique colocationId for the + * colocation group to be created. This allocation occurs both in shared memory + * and in write ahead logs; writing to logs avoids the risk of having + * colocationId collisions. + * + * Please note that the caller is still responsible for finalizing colocationId + * with the master node. Further note that this function relies on an internal + * sequence created in initdb to generate unique identifiers. 
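+ *
+ * Conceptually this is equivalent to the following, executed as the
+ * extension owner (sketch only):
+ *
+ *   SELECT nextval('pg_catalog.pg_dist_colocationid_seq');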
+ */ +static uint32 +GetNextColocationId() +{ + text *sequenceName = cstring_to_text(COLOCATIONID_SEQUENCE_NAME); + Oid sequenceId = ResolveRelationId(sequenceName); + Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId); + Oid savedUserId = InvalidOid; + int savedSecurityContext = 0; + Datum colocationIdDatum = 0; + uint32 colocationId = INVALID_COLOCATION_ID; + + GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); + SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); + + /* generate new and unique colocation id from sequence */ + colocationIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum); + + SetUserIdAndSecContext(savedUserId, savedSecurityContext); + + colocationId = DatumGetUInt32(colocationIdDatum); + + return colocationId; +} diff --git a/src/backend/distributed/master/master_create_shards.c b/src/backend/distributed/master/master_create_shards.c index a55ed2842..19f70df0b 100644 --- a/src/backend/distributed/master/master_create_shards.c +++ b/src/backend/distributed/master/master_create_shards.c @@ -58,13 +58,8 @@ PG_FUNCTION_INFO_V1(master_create_worker_shards); /* - * master_create_worker_shards creates empty shards for the given table based - * on the specified number of initial shards. The function first gets a list of - * candidate nodes and issues DDL commands on the nodes to create empty shard - * placements on those nodes. The function then updates metadata on the master - * node to make this shard (and its placements) visible. Note that the function - * assumes the table is hash partitioned and calculates the min/max hash token - * ranges for each shard, giving them an equal split of the hash space. + * master_create_worker_shards is a user facing function to create worker shards + * for the given relation in round robin order. */ Datum master_create_worker_shards(PG_FUNCTION_ARGS) @@ -74,10 +69,27 @@ master_create_worker_shards(PG_FUNCTION_ARGS) int32 replicationFactor = PG_GETARG_INT32(2); Oid distributedTableId = ResolveRelationId(tableNameText); - char relationKind = get_rel_relkind(distributedTableId); - char *tableName = text_to_cstring(tableNameText); + CreateShardsWithRoundRobinPolicy(distributedTableId, shardCount, replicationFactor); + + PG_RETURN_VOID(); +} + + +/* + * CreateShardsWithRoundRobinPolicy creates empty shards for the given table + * based on the specified number of initial shards. The function first gets a + * list of candidate nodes and issues DDL commands on the nodes to create empty + * shard placements on those nodes. The function then updates metadata on the + * master node to make this shard (and its placements) visible. Note that the + * function assumes the table is hash partitioned and calculates the min/max + * hash token ranges for each shard, giving them an equal split of the hash space. 
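+ *
+ * For example, with a shard count of 4 the int4 hash space is split as
+ * follows (the same ranges appear in the multi_colocation_utils expected
+ * output):
+ *
+ *   shard 0: -2147483648 .. -1073741825
+ *   shard 1: -1073741824 .. -1
+ *   shard 2:           0 .. 1073741823
+ *   shard 3:  1073741824 .. 2147483647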
+ */ +void +CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount, + int32 replicationFactor) +{ char *relationOwner = NULL; - char shardStorageType = '\0'; + char shardStorageType = 0; List *workerNodeList = NIL; List *ddlCommandList = NIL; int32 workerNodeCount = 0; @@ -106,6 +118,7 @@ master_create_worker_shards(PG_FUNCTION_ARGS) existingShardList = LoadShardList(distributedTableId); if (existingShardList != NIL) { + char *tableName = get_rel_name(distributedTableId); ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("table \"%s\" has already had shards created for it", tableName))); @@ -156,22 +169,7 @@ master_create_worker_shards(PG_FUNCTION_ARGS) } /* set shard storage type according to relation type */ - if (relationKind == RELKIND_FOREIGN_TABLE) - { - bool cstoreTable = CStoreTable(distributedTableId); - if (cstoreTable) - { - shardStorageType = SHARD_STORAGE_COLUMNAR; - } - else - { - shardStorageType = SHARD_STORAGE_FOREIGN; - } - } - else - { - shardStorageType = SHARD_STORAGE_TABLE; - } + shardStorageType = ShardStorageType(distributedTableId); for (shardIndex = 0; shardIndex < shardCount; shardIndex++) { @@ -182,8 +180,7 @@ master_create_worker_shards(PG_FUNCTION_ARGS) text *maxHashTokenText = NULL; int32 shardMinHashToken = INT32_MIN + (shardIndex * hashTokenIncrement); int32 shardMaxHashToken = shardMinHashToken + (hashTokenIncrement - 1); - Datum shardIdDatum = master_get_new_shardid(NULL); - int64 shardId = DatumGetInt64(shardIdDatum); + uint64 shardId = GetNextShardId(); /* if we are at the last shard, make sure the max token value is INT_MAX */ if (shardIndex == (shardCount - 1)) @@ -217,8 +214,104 @@ master_create_worker_shards(PG_FUNCTION_ARGS) } RESUME_INTERRUPTS(); +} - PG_RETURN_VOID(); + +/* + * CreateColocatedShards creates shards for the target relation colocated with + * the source relation. + */ +void +CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId) +{ + char *targetTableRelationOwner = NULL; + char targetShardStorageType = 0; + List *existingShardList = NIL; + List *sourceShardIntervalList = NIL; + List *targetTableDDLEvents = NIL; + ListCell *sourceShardCell = NULL; + + /* make sure that tables are hash partitioned */ + CheckHashPartitionedTable(targetRelationId); + CheckHashPartitionedTable(sourceRelationId); + + /* + * In contrast to append/range partitioned tables it makes more sense to + * require ownership privileges - shards for hash-partitioned tables are + * only created once, not continually during ingest as for the other + * partitioning types. 
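+ *
+ * (A hedged way to verify colocation from SQL afterwards, with
+ * 'source_rel' and 'target_rel' as placeholder names: colocated tables
+ * show pairwise-identical shard ranges, as in the multi_colocation_utils
+ * expected output.
+ *
+ *   SELECT logicalrelid, shardminvalue, shardmaxvalue FROM pg_dist_shard
+ *   WHERE logicalrelid IN ('source_rel'::regclass, 'target_rel'::regclass)
+ *   ORDER BY shardminvalue::int, logicalrelid;
+ * )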
+ */ + EnsureTableOwner(targetRelationId); + + /* we plan to add shards: get an exclusive metadata lock on the target relation */ + LockRelationDistributionMetadata(targetRelationId, ExclusiveLock); + + /* a share metadata lock is enough on the source relation */ + LockRelationDistributionMetadata(sourceRelationId, ShareLock); + + /* prevent concurrent placement changes */ + sourceShardIntervalList = LoadShardIntervalList(sourceRelationId); + LockShardListMetadata(sourceShardIntervalList, ShareLock); + + /* validate that shards haven't already been created for this table */ + existingShardList = LoadShardList(targetRelationId); + if (existingShardList != NIL) + { + char *targetRelationName = get_rel_name(targetRelationId); + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("table \"%s\" has already had shards created for it", + targetRelationName))); + } + + targetTableRelationOwner = TableOwner(targetRelationId); + targetTableDDLEvents = GetTableDDLEvents(targetRelationId); + targetShardStorageType = ShardStorageType(targetRelationId); + + foreach(sourceShardCell, sourceShardIntervalList) + { + ShardInterval *sourceShardInterval = (ShardInterval *) lfirst(sourceShardCell); + uint64 sourceShardId = sourceShardInterval->shardId; + uint64 newShardId = GetNextShardId(); + ListCell *sourceShardPlacementCell = NULL; + + int32 shardMinValue = DatumGetInt32(sourceShardInterval->minValue); + int32 shardMaxValue = DatumGetInt32(sourceShardInterval->maxValue); + text *shardMinValueText = IntegerToText(shardMinValue); + text *shardMaxValueText = IntegerToText(shardMaxValue); + + ListCell *sourceShardPlacementCell = NULL; + foreach(sourceShardPlacementCell, sourceShardPlacementList) + { + ShardPlacement *sourcePlacement = + (ShardPlacement *) lfirst(sourceShardPlacementCell); + char *sourceNodeName = sourcePlacement->nodeName; + int32 sourceNodePort = sourcePlacement->nodePort; + + bool created = WorkerCreateShard(targetRelationId, sourceNodeName, + sourceNodePort, newShardId, + targetTableRelationOwner, + targetTableDDLEvents); + if (created) + { + const RelayFileState shardState = FILE_FINALIZED; + const uint64 shardSize = 0; + + InsertShardPlacementRow(newShardId, INVALID_PLACEMENT_ID, shardState, + shardSize, sourceNodeName, sourceNodePort); + } + else + { + char *targetRelationName = get_rel_name(targetRelationId); + char *sourceRelationName = get_rel_name(sourceRelationId); + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("table \"%s\" could not be colocated with %s", + targetRelationName, sourceRelationName))); + } + } + + InsertShardRow(targetRelationId, newShardId, targetShardStorageType, + shardMinValueText, shardMaxValueText); + } } diff --git a/src/backend/distributed/master/master_node_protocol.c b/src/backend/distributed/master/master_node_protocol.c index bdb55e20c..b018e7edc 100644 --- a/src/backend/distributed/master/master_node_protocol.c +++ b/src/backend/distributed/master/master_node_protocol.c @@ -94,8 +94,7 @@ master_get_table_metadata(PG_FUNCTION_ARGS) HeapTuple metadataTuple = NULL; TupleDesc metadataDescriptor = NULL; uint64 shardMaxSizeInBytes = 0; - char relationType = 0; - char storageType = 0; + char shardStorageType = 0; Datum values[TABLE_METADATA_FIELDS]; bool isNulls[TABLE_METADATA_FIELDS]; @@ -122,26 +121,10 @@ master_get_table_metadata(PG_FUNCTION_ARGS) shardMaxSizeInBytes = (int64) ShardMaxSize * 1024L; /* get storage type */ - relationType = get_rel_relkind(relationId); - if (relationType == RELKIND_RELATION) 
- { - storageType = SHARD_STORAGE_TABLE; - } - else if (relationType == RELKIND_FOREIGN_TABLE) - { - bool cstoreTable = CStoreTable(relationId); - if (cstoreTable) - { - storageType = SHARD_STORAGE_COLUMNAR; - } - else - { - storageType = SHARD_STORAGE_FOREIGN; - } - } + shardStorageType = ShardStorageType(relationId); values[0] = ObjectIdGetDatum(relationId); - values[1] = storageType; + values[1] = shardStorageType; values[2] = partitionEntry->partitionMethod; values[3] = partitionKey; values[4] = Int32GetDatum(ShardReplicationFactor); @@ -730,6 +713,41 @@ GetTableDDLEvents(Oid relationId) } +/* + * ShardStorageType returns the shard storage type according to relation type. + */ +char +ShardStorageType(Oid relationId) +{ + char shardStorageType = 0; + + char relationType = get_rel_relkind(relationId); + if (relationType == RELKIND_RELATION) + { + shardStorageType = SHARD_STORAGE_TABLE; + } + else if (relationType == RELKIND_FOREIGN_TABLE) + { + bool cstoreTable = CStoreTable(relationId); + if (cstoreTable) + { + shardStorageType = SHARD_STORAGE_COLUMNAR; + } + else + { + shardStorageType = SHARD_STORAGE_FOREIGN; + } + } + else + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("unexpected relation type: %c", relationType))); + } + + return shardStorageType; +} + + /* * WorkerNodeGetDatum converts the worker node passed to it into its datum * representation. To do this, the function first creates the heap tuple from diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index fa916c10a..d9852518f 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -41,8 +41,6 @@ /* Local functions forward declarations */ -static bool WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort, - uint64 shardId, char *newShardOwner, List *ddlCommandList); static bool WorkerShardStats(char *nodeName, uint32 nodePort, Oid relationId, char *shardName, uint64 *shardSize, text **shardMinValue, text **shardMaxValue); @@ -426,7 +424,7 @@ CreateShardPlacements(Oid relationId, int64 shardId, List *ddlEventList, * shard on the worker node. Note that this function opens a new connection for * each DDL command, and could leave the shard in an half-initialized state. */ -static bool +bool WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort, uint64 shardId, char *newShardOwner, List *ddlCommandList) { diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c index 776fbe089..f2d634ea2 100644 --- a/src/backend/distributed/utils/colocation_utils.c +++ b/src/backend/distributed/utils/colocation_utils.c @@ -134,7 +134,7 @@ ColocatedTableList(Oid distributedTableId) } ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_colocationid, - BTEqualStrategyNumber, F_INT8EQ, ObjectIdGetDatum(tableColocationId)); + BTEqualStrategyNumber, F_INT4EQ, ObjectIdGetDatum(tableColocationId)); pgDistPartition = heap_open(DistPartitionRelationId(), AccessShareLock); tupleDescriptor = RelationGetDescr(pgDistPartition); @@ -219,3 +219,44 @@ ColocatedShardIntervalList(ShardInterval *shardInterval) return colocatedShardList; } + + +/* + * ColocatedTableId returns an arbitrary table which belongs to given colocation + * group. If there is not such a colocation group, it returns invalid oid. 
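+ *
+ * Roughly the SQL equivalent (a sketch; the implementation scans the
+ * colocationid index and returns the first match):
+ *
+ *   SELECT logicalrelid FROM pg_dist_partition
+ *   WHERE colocationid = $1 LIMIT 1;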
+ */ +Oid +ColocatedTableId(Oid colocationId) +{ + Oid colocatedTableId = InvalidOid; + Relation pgDistPartition = NULL; + TupleDesc tupleDescriptor = NULL; + SysScanDesc scanDescriptor = NULL; + HeapTuple heapTuple = NULL; + bool indexOK = true; + bool isNull = false; + ScanKeyData scanKey[1]; + int scanKeyCount = 1; + + ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_colocationid, + BTEqualStrategyNumber, F_INT4EQ, ObjectIdGetDatum(colocationId)); + + /* prevent DELETE statements */ + pgDistPartition = heap_open(DistPartitionRelationId(), ShareLock); + tupleDescriptor = RelationGetDescr(pgDistPartition); + scanDescriptor = systable_beginscan(pgDistPartition, + DistPartitionColocationidIndexId(), + indexOK, NULL, scanKeyCount, scanKey); + + heapTuple = systable_getnext(scanDescriptor); + if (HeapTupleIsValid(heapTuple)) + { + colocatedTableId = heap_getattr(heapTuple, Anum_pg_dist_partition_logicalrelid, + tupleDescriptor, &isNull); + } + + systable_endscan(scanDescriptor); + heap_close(pgDistPartition, ShareLock); + + return colocatedTableId; +} diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index e276d0a7b..de2be6101 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -55,6 +55,8 @@ static Oid distShardRelationId = InvalidOid; static Oid distShardPlacementRelationId = InvalidOid; static Oid distNodeRelationId = InvalidOid; static Oid distLocalGroupRelationId = InvalidOid; +static Oid distColocationRelationId = InvalidOid; +static Oid distColocationConfigurationIndexId = InvalidOid; static Oid distPartitionRelationId = InvalidOid; static Oid distPartitionLogicalRelidIndexId = InvalidOid; static Oid distPartitionColocationidIndexId = InvalidOid; @@ -690,6 +692,27 @@ DistLocalGroupIdRelationId(void) } +/* return oid of pg_dist_colocation relation */ +Oid +DistColocationRelationId(void) +{ + CachedRelationLookup("pg_dist_colocation", &distColocationRelationId); + + return distColocationRelationId; +} + + +/* return oid of pg_dist_colocation_configuration_index index */ +Oid +DistColocationConfigurationIndexId(void) +{ + CachedRelationLookup("pg_dist_colocation_configuration_index", + &distColocationConfigurationIndexId); + + return distColocationConfigurationIndexId; +} + + /* return oid of pg_dist_partition relation */ Oid DistPartitionRelationId(void) @@ -1419,6 +1442,8 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId) distShardPlacementRelationId = InvalidOid; distLocalGroupRelationId = InvalidOid; distNodeRelationId = InvalidOid; + distColocationRelationId = InvalidOid; + distColocationConfigurationIndexId = InvalidOid; distPartitionRelationId = InvalidOid; distPartitionLogicalRelidIndexId = InvalidOid; distPartitionColocationidIndexId = InvalidOid; diff --git a/src/include/distributed/colocation_utils.h b/src/include/distributed/colocation_utils.h index bd983ed43..c32ed8a01 100644 --- a/src/include/distributed/colocation_utils.h +++ b/src/include/distributed/colocation_utils.h @@ -23,5 +23,7 @@ extern bool ShardsColocated(ShardInterval *leftShardInterval, ShardInterval *rightShardInterval); extern List * ColocatedTableList(Oid distributedTableId); extern List * ColocatedShardIntervalList(ShardInterval *shardInterval); +extern Oid ColocatedTableId(Oid colocationId); + #endif /* COLOCATION_UTILS_H_ */ diff --git a/src/include/distributed/master_protocol.h b/src/include/distributed/master_protocol.h index 9a08ebbb5..f323edafa 100644 --- 
a/src/include/distributed/master_protocol.h +++ b/src/include/distributed/master_protocol.h @@ -92,11 +92,17 @@ extern int ShardPlacementPolicy; extern bool CStoreTable(Oid relationId); extern Oid ResolveRelationId(text *relationName); extern List * GetTableDDLEvents(Oid relationId); +extern char ShardStorageType(Oid relationId); extern void CheckDistributedTable(Oid relationId); extern void CreateShardPlacements(Oid relationId, int64 shardId, List *ddlEventList, char *newPlacementOwner, List *workerNodeList, int workerStartIndex, int replicationFactor); extern uint64 UpdateShardStatistics(int64 shardId); +extern void CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount, + int32 replicationFactor); +extern void CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId); +extern bool WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort, + uint64 shardId, char *newShardOwner, List *ddlCommandList); /* Function declarations for generating metadata for shard and placement creation */ extern Datum master_get_table_metadata(PG_FUNCTION_ARGS); diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 35d7e835f..7cd38c39f 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -66,6 +66,8 @@ extern bool CitusHasBeenLoaded(void); extern HTAB * GetWorkerNodeHash(void); /* relation oids */ +extern Oid DistColocationRelationId(void); +extern Oid DistColocationConfigurationIndexId(void); extern Oid DistPartitionRelationId(void); extern Oid DistShardRelationId(void); extern Oid DistShardPlacementRelationId(void); diff --git a/src/include/distributed/pg_dist_colocation.h b/src/include/distributed/pg_dist_colocation.h new file mode 100644 index 000000000..e7acefa6c --- /dev/null +++ b/src/include/distributed/pg_dist_colocation.h @@ -0,0 +1,47 @@ +/*------------------------------------------------------------------------- + * + * pg_dist_colocation.h + * definition of the relation that holds the colocation information on the + * cluster (pg_dist_colocation). + * + * Copyright (c) 2012-2016, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef PG_DIST_COLOCATION_H +#define PG_DIST_COLOCATION_H + +/* ---------------- + * pg_dist_colocation definition. + * ---------------- + */ +typedef struct FormData_pg_dist_colocation +{ + int colocationid; + int shardcount; + int replicationfactor; + Oid distributioncolumntype; +} FormData_pg_dist_colocation; + +/* ---------------- + * Form_pg_dist_colocation corresponds to a pointer to a tuple with + * the format of pg_dist_colocation relation. 
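+ *
+ * The matching catalog definition added by citus--6.0-10--6.0-11.sql
+ * (created in the citus schema and then moved to pg_catalog):
+ *
+ *   CREATE TABLE citus.pg_dist_colocation(
+ *       colocationid int NOT NULL PRIMARY KEY,
+ *       shardcount int NOT NULL,
+ *       replicationfactor int NOT NULL,
+ *       distributioncolumntype oid NOT NULL
+ *   );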
+ * ---------------- + */ +typedef FormData_pg_dist_colocation *Form_pg_dist_colocation; + +/* ---------------- + * compiler constants for pg_dist_colocation + * ---------------- + */ +#define Natts_pg_dist_colocation 4 +#define Anum_pg_dist_colocation_colocationid 1 +#define Anum_pg_dist_colocation_shardcount 2 +#define Anum_pg_dist_colocation_replicationfactor 3 +#define Anum_pg_dist_colocation_distributioncolumntype 4 + +#define COLOCATIONID_SEQUENCE_NAME "pg_dist_colocationid_seq" + + +#endif /* PG_DIST_COLOCATION_H */ diff --git a/src/test/regress/expected/multi_colocated_shard_transfer.out b/src/test/regress/expected/multi_colocated_shard_transfer.out index dc7c7e125..0cda8e584 100644 --- a/src/test/regress/expected/multi_colocated_shard_transfer.out +++ b/src/test/regress/expected/multi_colocated_shard_transfer.out @@ -19,22 +19,22 @@ WHERE ORDER BY s.shardid; shardid | logicalrelid | nodeport | colocationid | shardstate ---------+---------------+----------+--------------+------------ - 1300000 | table1_group1 | 57638 | 1 | 3 - 1300000 | table1_group1 | 57637 | 1 | 1 - 1300001 | table1_group1 | 57637 | 1 | 1 - 1300001 | table1_group1 | 57638 | 1 | 1 - 1300002 | table1_group1 | 57638 | 1 | 1 - 1300002 | table1_group1 | 57637 | 1 | 1 - 1300003 | table1_group1 | 57637 | 1 | 1 - 1300003 | table1_group1 | 57638 | 1 | 1 - 1300004 | table2_group1 | 57638 | 1 | 3 - 1300004 | table2_group1 | 57637 | 1 | 1 - 1300005 | table2_group1 | 57637 | 1 | 1 - 1300005 | table2_group1 | 57638 | 1 | 1 - 1300006 | table2_group1 | 57638 | 1 | 1 - 1300006 | table2_group1 | 57637 | 1 | 1 - 1300007 | table2_group1 | 57637 | 1 | 1 - 1300007 | table2_group1 | 57638 | 1 | 1 + 1300000 | table1_group1 | 57638 | 1000 | 3 + 1300000 | table1_group1 | 57637 | 1000 | 1 + 1300001 | table1_group1 | 57637 | 1000 | 1 + 1300001 | table1_group1 | 57638 | 1000 | 1 + 1300002 | table1_group1 | 57638 | 1000 | 1 + 1300002 | table1_group1 | 57637 | 1000 | 1 + 1300003 | table1_group1 | 57637 | 1000 | 1 + 1300003 | table1_group1 | 57638 | 1000 | 1 + 1300004 | table2_group1 | 57638 | 1000 | 3 + 1300004 | table2_group1 | 57637 | 1000 | 1 + 1300005 | table2_group1 | 57637 | 1000 | 1 + 1300005 | table2_group1 | 57638 | 1000 | 1 + 1300006 | table2_group1 | 57638 | 1000 | 1 + 1300006 | table2_group1 | 57637 | 1000 | 1 + 1300007 | table2_group1 | 57637 | 1000 | 1 + 1300007 | table2_group1 | 57638 | 1000 | 1 (16 rows) -- repair colocated shards @@ -55,22 +55,22 @@ WHERE ORDER BY s.shardid; shardid | logicalrelid | nodeport | colocationid | shardstate ---------+---------------+----------+--------------+------------ - 1300000 | table1_group1 | 57638 | 1 | 1 - 1300000 | table1_group1 | 57637 | 1 | 1 - 1300001 | table1_group1 | 57637 | 1 | 1 - 1300001 | table1_group1 | 57638 | 1 | 1 - 1300002 | table1_group1 | 57638 | 1 | 1 - 1300002 | table1_group1 | 57637 | 1 | 1 - 1300003 | table1_group1 | 57637 | 1 | 1 - 1300003 | table1_group1 | 57638 | 1 | 1 - 1300004 | table2_group1 | 57638 | 1 | 1 - 1300004 | table2_group1 | 57637 | 1 | 1 - 1300005 | table2_group1 | 57637 | 1 | 1 - 1300005 | table2_group1 | 57638 | 1 | 1 - 1300006 | table2_group1 | 57638 | 1 | 1 - 1300006 | table2_group1 | 57637 | 1 | 1 - 1300007 | table2_group1 | 57637 | 1 | 1 - 1300007 | table2_group1 | 57638 | 1 | 1 + 1300000 | table1_group1 | 57638 | 1000 | 1 + 1300000 | table1_group1 | 57637 | 1000 | 1 + 1300001 | table1_group1 | 57637 | 1000 | 1 + 1300001 | table1_group1 | 57638 | 1000 | 1 + 1300002 | table1_group1 | 57638 | 1000 | 1 + 1300002 | table1_group1 | 57637 | 1000 | 1 + 
1300003 | table1_group1 | 57637 | 1000 | 1 + 1300003 | table1_group1 | 57638 | 1000 | 1 + 1300004 | table2_group1 | 57638 | 1000 | 1 + 1300004 | table2_group1 | 57637 | 1000 | 1 + 1300005 | table2_group1 | 57637 | 1000 | 1 + 1300005 | table2_group1 | 57638 | 1000 | 1 + 1300006 | table2_group1 | 57638 | 1000 | 1 + 1300006 | table2_group1 | 57637 | 1000 | 1 + 1300007 | table2_group1 | 57637 | 1000 | 1 + 1300007 | table2_group1 | 57638 | 1000 | 1 (16 rows) -- test repairing NOT colocated shard @@ -179,22 +179,22 @@ WHERE ORDER BY s.shardid; shardid | logicalrelid | nodeport | colocationid | shardstate ---------+---------------+----------+--------------+------------ - 1300000 | table1_group1 | 57638 | 1 | 3 - 1300000 | table1_group1 | 57637 | 1 | 3 - 1300001 | table1_group1 | 57637 | 1 | 1 - 1300001 | table1_group1 | 57638 | 1 | 1 - 1300002 | table1_group1 | 57638 | 1 | 1 - 1300002 | table1_group1 | 57637 | 1 | 1 - 1300003 | table1_group1 | 57637 | 1 | 1 - 1300003 | table1_group1 | 57638 | 1 | 1 - 1300004 | table2_group1 | 57638 | 1 | 1 - 1300004 | table2_group1 | 57637 | 1 | 1 - 1300005 | table2_group1 | 57637 | 1 | 1 - 1300005 | table2_group1 | 57638 | 1 | 1 - 1300006 | table2_group1 | 57638 | 1 | 1 - 1300006 | table2_group1 | 57637 | 1 | 1 - 1300007 | table2_group1 | 57637 | 1 | 1 - 1300007 | table2_group1 | 57638 | 1 | 1 + 1300000 | table1_group1 | 57638 | 1000 | 3 + 1300000 | table1_group1 | 57637 | 1000 | 3 + 1300001 | table1_group1 | 57637 | 1000 | 1 + 1300001 | table1_group1 | 57638 | 1000 | 1 + 1300002 | table1_group1 | 57638 | 1000 | 1 + 1300002 | table1_group1 | 57637 | 1000 | 1 + 1300003 | table1_group1 | 57637 | 1000 | 1 + 1300003 | table1_group1 | 57638 | 1000 | 1 + 1300004 | table2_group1 | 57638 | 1000 | 1 + 1300004 | table2_group1 | 57637 | 1000 | 1 + 1300005 | table2_group1 | 57637 | 1000 | 1 + 1300005 | table2_group1 | 57638 | 1000 | 1 + 1300006 | table2_group1 | 57638 | 1000 | 1 + 1300006 | table2_group1 | 57637 | 1000 | 1 + 1300007 | table2_group1 | 57637 | 1000 | 1 + 1300007 | table2_group1 | 57638 | 1000 | 1 (16 rows) -- repair while all placements of one shard in colocation group is unhealthy @@ -211,21 +211,21 @@ WHERE ORDER BY s.shardid; shardid | logicalrelid | nodeport | colocationid | shardstate ---------+---------------+----------+--------------+------------ - 1300000 | table1_group1 | 57638 | 1 | 3 - 1300000 | table1_group1 | 57637 | 1 | 3 - 1300001 | table1_group1 | 57637 | 1 | 1 - 1300001 | table1_group1 | 57638 | 1 | 1 - 1300002 | table1_group1 | 57638 | 1 | 1 - 1300002 | table1_group1 | 57637 | 1 | 1 - 1300003 | table1_group1 | 57637 | 1 | 1 - 1300003 | table1_group1 | 57638 | 1 | 1 - 1300004 | table2_group1 | 57638 | 1 | 1 - 1300004 | table2_group1 | 57637 | 1 | 1 - 1300005 | table2_group1 | 57637 | 1 | 1 - 1300005 | table2_group1 | 57638 | 1 | 1 - 1300006 | table2_group1 | 57638 | 1 | 1 - 1300006 | table2_group1 | 57637 | 1 | 1 - 1300007 | table2_group1 | 57637 | 1 | 1 - 1300007 | table2_group1 | 57638 | 1 | 1 + 1300000 | table1_group1 | 57638 | 1000 | 3 + 1300000 | table1_group1 | 57637 | 1000 | 3 + 1300001 | table1_group1 | 57637 | 1000 | 1 + 1300001 | table1_group1 | 57638 | 1000 | 1 + 1300002 | table1_group1 | 57638 | 1000 | 1 + 1300002 | table1_group1 | 57637 | 1000 | 1 + 1300003 | table1_group1 | 57637 | 1000 | 1 + 1300003 | table1_group1 | 57638 | 1000 | 1 + 1300004 | table2_group1 | 57638 | 1000 | 1 + 1300004 | table2_group1 | 57637 | 1000 | 1 + 1300005 | table2_group1 | 57637 | 1000 | 1 + 1300005 | table2_group1 | 57638 | 1000 | 1 + 1300006 | 
table2_group1 | 57638 | 1000 | 1 + 1300006 | table2_group1 | 57637 | 1000 | 1 + 1300007 | table2_group1 | 57637 | 1000 | 1 + 1300007 | table2_group1 | 57638 | 1000 | 1 (16 rows) diff --git a/src/test/regress/expected/multi_colocation_utils.out b/src/test/regress/expected/multi_colocation_utils.out index 1a15044b9..1a069702c 100644 --- a/src/test/regress/expected/multi_colocation_utils.out +++ b/src/test/regress/expected/multi_colocation_utils.out @@ -4,7 +4,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1300000; -- create test utility function -- =================================================================== CREATE SEQUENCE colocation_test_seq - MINVALUE 1 + MINVALUE 1000 NO CYCLE; /* a very simple UDF that only sets the colocation ids the same * DO NOT USE THIS FUNCTION IN PRODUCTION. It manually sets colocationid column of @@ -14,7 +14,7 @@ CREATE OR REPLACE FUNCTION colocation_test_colocate_tables(source_table regclass RETURNS BOOL LANGUAGE plpgsql AS $colocate_tables$ -DECLARE nextid BIGINT; +DECLARE nextid INTEGER; BEGIN SELECT nextval('colocation_test_seq') INTO nextid; @@ -155,7 +155,7 @@ SELECT colocation_test_colocate_tables('table1_group1', 'table2_group1'); SELECT get_table_colocation_id('table1_group1'); get_table_colocation_id ------------------------- - 1 + 1000 (1 row) SELECT get_table_colocation_id('table5_groupX'); @@ -189,7 +189,7 @@ SELECT tables_colocated('table6_append', 'table6_append'); t (1 row) --- check table co-location with same co-location group +-- check table co-location with same co-location group SELECT tables_colocated('table1_group1', 'table2_group1'); tables_colocated ------------------ @@ -339,3 +339,303 @@ SELECT find_shard_interval_index(1300016); 0 (1 row) +-- check external colocation API +SET citus.shard_count = 2; +CREATE TABLE table1_groupA ( id int ); +SELECT create_distributed_table('table1_groupA', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE table2_groupA ( id int ); +SELECT create_distributed_table('table2_groupA', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +-- change shard replication factor +SET citus.shard_replication_factor = 1; +CREATE TABLE table1_groupB ( id int ); +SELECT create_distributed_table('table1_groupB', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE table2_groupB ( id int ); +SELECT create_distributed_table('table2_groupB', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +-- revert back to default shard replication factor +SET citus.shard_replication_factor to DEFAULT; +-- change partition column type +CREATE TABLE table1_groupC ( id text ); +SELECT create_distributed_table('table1_groupC', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE table2_groupC ( id text ); +SELECT create_distributed_table('table2_groupC', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +-- change shard count +SET citus.shard_count = 4; +CREATE TABLE table1_groupD ( id int ); +SELECT create_distributed_table('table1_groupD', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE table2_groupD ( id int ); +SELECT create_distributed_table('table2_groupD', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +-- try other distribution methods +CREATE TABLE table_append ( id int ); +SELECT create_distributed_table('table_append', 'id', 'append'); + create_distributed_table 
+-------------------------- + +(1 row) + +CREATE TABLE table_range ( id int ); +SELECT create_distributed_table('table_range', 'id', 'range'); + create_distributed_table +-------------------------- + +(1 row) + +-- test foreign table creation +CREATE FOREIGN TABLE table3_groupD ( id int ) SERVER fake_fdw_server; +SELECT create_distributed_table('table3_groupD', 'id'); +NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined + create_distributed_table +-------------------------- + +(1 row) + +-- check metadata +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid; + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1 | 2 | 2 | 23 + 2 | 2 | 1 | 23 + 3 | 2 | 2 | 25 + 4 | 4 | 2 | 23 +(4 rows) + +SELECT logicalrelid, colocationid FROM pg_dist_partition + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid; + logicalrelid | colocationid +---------------+-------------- + table1_groupa | 1 + table2_groupa | 1 + table1_groupb | 2 + table2_groupb | 2 + table1_groupc | 3 + table2_groupc | 3 + table1_groupd | 4 + table2_groupd | 4 + table3_groupd | 4 +(9 rows) + +-- check effects of dropping tables +DROP TABLE table1_groupA; +SELECT * FROM pg_dist_colocation WHERE colocationid = 1; + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1 | 2 | 2 | 23 +(1 row) + +-- dropping all tables in a colocation group also deletes the colocation group +DROP TABLE table2_groupA; +SELECT * FROM pg_dist_colocation WHERE colocationid = 1; + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ +(0 rows) + +-- create dropped colocation group again +SET citus.shard_count = 2; +CREATE TABLE table1_groupE ( id int ); +SELECT create_distributed_table('table1_groupE', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE table2_groupE ( id int ); +SELECT create_distributed_table('table2_groupE', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +-- test different table DDL +CREATE TABLE table3_groupE ( dummy_column text, id int ); +SELECT create_distributed_table('table3_groupE', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +-- test different schema +CREATE SCHEMA schema_collocation; +CREATE TABLE schema_collocation.table4_groupE ( id int ); +SELECT create_distributed_table('schema_collocation.table4_groupE', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +-- check worker table schemas +\c - - - :worker_1_port +\d table3_groupE_1300050 +Table "public.table3_groupe_1300050" + Column | Type | Modifiers +--------------+---------+----------- + dummy_column | text | + id | integer | + +\d schema_collocation.table4_groupE_1300052 +Table "schema_collocation.table4_groupe_1300052" + Column | Type | Modifiers +--------+---------+----------- + id | integer | + +\c - - - :master_port +-- check metadata +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid; + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 2 | 2 | 1 | 23 + 3 | 2 | 2 | 25 + 4 | 4 | 2 | 23 + 5 | 2 | 2 | 23 +(4 rows) + +-- cross check 
with internal colocation API +SELECT + p1.logicalrelid::regclass AS table1, + p2.logicalrelid::regclass AS table2, + tables_colocated(p1.logicalrelid , p2.logicalrelid) AS colocated +FROM + pg_dist_partition p1, + pg_dist_partition p2 +WHERE + p1.logicalrelid < p2.logicalrelid AND + p1.colocationid != 0 AND + p2.colocationid != 0 AND + tables_colocated(p1.logicalrelid , p2.logicalrelid) is TRUE +ORDER BY + table1, + table2; + table1 | table2 | colocated +---------------+----------------------------------+----------- + table1_group1 | table2_group1 | t + table1_groupb | table2_groupb | t + table1_groupc | table2_groupc | t + table1_groupd | table2_groupd | t + table1_groupd | table3_groupd | t + table2_groupd | table3_groupd | t + table1_groupe | table2_groupe | t + table1_groupe | table3_groupe | t + table1_groupe | schema_collocation.table4_groupe | t + table2_groupe | table3_groupe | t + table2_groupe | schema_collocation.table4_groupe | t + table3_groupe | schema_collocation.table4_groupe | t +(12 rows) + +-- check created shards +SELECT + logicalrelid, + pg_dist_shard.shardid AS shardid, + shardstorage, + nodeport, + shardminvalue, + shardmaxvalue +FROM + pg_dist_shard, + pg_dist_shard_placement +WHERE + pg_dist_shard.shardid = pg_dist_shard_placement.shardid AND + pg_dist_shard.shardid >= 1300026 +ORDER BY + logicalrelid, + shardmaxvalue::integer, + shardid, + placementid; + logicalrelid | shardid | shardstorage | nodeport | shardminvalue | shardmaxvalue +----------------------------------+---------+--------------+----------+---------------+--------------- + table1_groupb | 1300026 | t | 57637 | -2147483648 | -1 + table1_groupb | 1300027 | t | 57638 | 0 | 2147483647 + table2_groupb | 1300028 | t | 57637 | -2147483648 | -1 + table2_groupb | 1300029 | t | 57638 | 0 | 2147483647 + table1_groupc | 1300030 | t | 57637 | -2147483648 | -1 + table1_groupc | 1300030 | t | 57638 | -2147483648 | -1 + table1_groupc | 1300031 | t | 57638 | 0 | 2147483647 + table1_groupc | 1300031 | t | 57637 | 0 | 2147483647 + table2_groupc | 1300032 | t | 57638 | -2147483648 | -1 + table2_groupc | 1300032 | t | 57637 | -2147483648 | -1 + table2_groupc | 1300033 | t | 57637 | 0 | 2147483647 + table2_groupc | 1300033 | t | 57638 | 0 | 2147483647 + table1_groupd | 1300034 | t | 57637 | -2147483648 | -1073741825 + table1_groupd | 1300034 | t | 57638 | -2147483648 | -1073741825 + table1_groupd | 1300035 | t | 57638 | -1073741824 | -1 + table1_groupd | 1300035 | t | 57637 | -1073741824 | -1 + table1_groupd | 1300036 | t | 57637 | 0 | 1073741823 + table1_groupd | 1300036 | t | 57638 | 0 | 1073741823 + table1_groupd | 1300037 | t | 57638 | 1073741824 | 2147483647 + table1_groupd | 1300037 | t | 57637 | 1073741824 | 2147483647 + table2_groupd | 1300038 | t | 57638 | -2147483648 | -1073741825 + table2_groupd | 1300038 | t | 57637 | -2147483648 | -1073741825 + table2_groupd | 1300039 | t | 57637 | -1073741824 | -1 + table2_groupd | 1300039 | t | 57638 | -1073741824 | -1 + table2_groupd | 1300040 | t | 57638 | 0 | 1073741823 + table2_groupd | 1300040 | t | 57637 | 0 | 1073741823 + table2_groupd | 1300041 | t | 57637 | 1073741824 | 2147483647 + table2_groupd | 1300041 | t | 57638 | 1073741824 | 2147483647 + table3_groupd | 1300042 | f | 57637 | -2147483648 | -1073741825 + table3_groupd | 1300042 | f | 57638 | -2147483648 | -1073741825 + table3_groupd | 1300043 | f | 57638 | -1073741824 | -1 + table3_groupd | 1300043 | f | 57637 | -1073741824 | -1 + table3_groupd | 1300044 | f | 57637 | 0 | 1073741823 + table3_groupd | 1300044 
| f | 57638 | 0 | 1073741823 + table3_groupd | 1300045 | f | 57638 | 1073741824 | 2147483647 + table3_groupd | 1300045 | f | 57637 | 1073741824 | 2147483647 + table1_groupe | 1300046 | t | 57637 | -2147483648 | -1 + table1_groupe | 1300046 | t | 57638 | -2147483648 | -1 + table1_groupe | 1300047 | t | 57638 | 0 | 2147483647 + table1_groupe | 1300047 | t | 57637 | 0 | 2147483647 + table2_groupe | 1300048 | t | 57638 | -2147483648 | -1 + table2_groupe | 1300048 | t | 57637 | -2147483648 | -1 + table2_groupe | 1300049 | t | 57637 | 0 | 2147483647 + table2_groupe | 1300049 | t | 57638 | 0 | 2147483647 + table3_groupe | 1300050 | t | 57637 | -2147483648 | -1 + table3_groupe | 1300050 | t | 57638 | -2147483648 | -1 + table3_groupe | 1300051 | t | 57638 | 0 | 2147483647 + table3_groupe | 1300051 | t | 57637 | 0 | 2147483647 + schema_collocation.table4_groupe | 1300052 | t | 57638 | -2147483648 | -1 + schema_collocation.table4_groupe | 1300052 | t | 57637 | -2147483648 | -1 + schema_collocation.table4_groupe | 1300053 | t | 57637 | 0 | 2147483647 + schema_collocation.table4_groupe | 1300053 | t | 57638 | 0 | 2147483647 +(52 rows) + diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 7ff35d2ee..ec7a73d5f 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -36,6 +36,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-7'; ALTER EXTENSION citus UPDATE TO '6.0-8'; ALTER EXTENSION citus UPDATE TO '6.0-9'; ALTER EXTENSION citus UPDATE TO '6.0-10'; +ALTER EXTENSION citus UPDATE TO '6.0-11'; -- drop extension an re-create in newest version DROP EXTENSION citus; \c diff --git a/src/test/regress/sql/multi_colocation_utils.sql b/src/test/regress/sql/multi_colocation_utils.sql index 93684c6b5..bd6d36116 100644 --- a/src/test/regress/sql/multi_colocation_utils.sql +++ b/src/test/regress/sql/multi_colocation_utils.sql @@ -7,7 +7,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1300000; -- =================================================================== CREATE SEQUENCE colocation_test_seq - MINVALUE 1 + MINVALUE 1000 NO CYCLE; /* a very simple UDF that only sets the colocation ids the same @@ -18,7 +18,7 @@ CREATE OR REPLACE FUNCTION colocation_test_colocate_tables(source_table regclass RETURNS BOOL LANGUAGE plpgsql AS $colocate_tables$ -DECLARE nextid BIGINT; +DECLARE nextid INTEGER; BEGIN SELECT nextval('colocation_test_seq') INTO nextid; @@ -110,7 +110,7 @@ SELECT tables_colocated('table1_group1', 'table1_group1'); SELECT tables_colocated('table5_groupX', 'table5_groupX'); SELECT tables_colocated('table6_append', 'table6_append'); --- check table co-location with same co-location group +-- check table co-location with same co-location group SELECT tables_colocated('table1_group1', 'table2_group1'); -- check table co-location with different co-location group @@ -154,3 +154,138 @@ SELECT find_shard_interval_index(1300001); SELECT find_shard_interval_index(1300002); SELECT find_shard_interval_index(1300003); SELECT find_shard_interval_index(1300016); + + +-- check external colocation API + +SET citus.shard_count = 2; + +CREATE TABLE table1_groupA ( id int ); +SELECT create_distributed_table('table1_groupA', 'id'); + +CREATE TABLE table2_groupA ( id int ); +SELECT create_distributed_table('table2_groupA', 'id'); + +-- change shard replication factor +SET citus.shard_replication_factor = 1; + +CREATE TABLE table1_groupB ( id int ); +SELECT create_distributed_table('table1_groupB', 'id'); + 
+CREATE TABLE table2_groupB ( id int ); +SELECT create_distributed_table('table2_groupB', 'id'); + +-- revert back to default shard replication factor +SET citus.shard_replication_factor to DEFAULT; + +-- change partition column type +CREATE TABLE table1_groupC ( id text ); +SELECT create_distributed_table('table1_groupC', 'id'); + +CREATE TABLE table2_groupC ( id text ); +SELECT create_distributed_table('table2_groupC', 'id'); + +-- change shard count +SET citus.shard_count = 4; + +CREATE TABLE table1_groupD ( id int ); +SELECT create_distributed_table('table1_groupD', 'id'); + +CREATE TABLE table2_groupD ( id int ); +SELECT create_distributed_table('table2_groupD', 'id'); + +-- try other distribution methods +CREATE TABLE table_append ( id int ); +SELECT create_distributed_table('table_append', 'id', 'append'); + +CREATE TABLE table_range ( id int ); +SELECT create_distributed_table('table_range', 'id', 'range'); + +-- test foreign table creation +CREATE FOREIGN TABLE table3_groupD ( id int ) SERVER fake_fdw_server; +SELECT create_distributed_table('table3_groupD', 'id'); + +-- check metadata +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid; + +SELECT logicalrelid, colocationid FROM pg_dist_partition + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid; + +-- check effects of dropping tables +DROP TABLE table1_groupA; +SELECT * FROM pg_dist_colocation WHERE colocationid = 1; + +-- dropping all tables in a colocation group also deletes the colocation group +DROP TABLE table2_groupA; +SELECT * FROM pg_dist_colocation WHERE colocationid = 1; + +-- create dropped colocation group again +SET citus.shard_count = 2; + +CREATE TABLE table1_groupE ( id int ); +SELECT create_distributed_table('table1_groupE', 'id'); + +CREATE TABLE table2_groupE ( id int ); +SELECT create_distributed_table('table2_groupE', 'id'); + +-- test different table DDL +CREATE TABLE table3_groupE ( dummy_column text, id int ); +SELECT create_distributed_table('table3_groupE', 'id'); + +-- test different schema +CREATE SCHEMA schema_collocation; + +CREATE TABLE schema_collocation.table4_groupE ( id int ); +SELECT create_distributed_table('schema_collocation.table4_groupE', 'id'); + +-- check worker table schemas +\c - - - :worker_1_port +\d table3_groupE_1300050 +\d schema_collocation.table4_groupE_1300052 + +\c - - - :master_port + +-- check metadata +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid; + +-- cross check with internal colocation API +SELECT + p1.logicalrelid::regclass AS table1, + p2.logicalrelid::regclass AS table2, + tables_colocated(p1.logicalrelid , p2.logicalrelid) AS colocated +FROM + pg_dist_partition p1, + pg_dist_partition p2 +WHERE + p1.logicalrelid < p2.logicalrelid AND + p1.colocationid != 0 AND + p2.colocationid != 0 AND + tables_colocated(p1.logicalrelid , p2.logicalrelid) is TRUE +ORDER BY + table1, + table2; + +-- check created shards +SELECT + logicalrelid, + pg_dist_shard.shardid AS shardid, + shardstorage, + nodeport, + shardminvalue, + shardmaxvalue +FROM + pg_dist_shard, + pg_dist_shard_placement +WHERE + pg_dist_shard.shardid = pg_dist_shard_placement.shardid AND + pg_dist_shard.shardid >= 1300026 +ORDER BY + logicalrelid, + shardmaxvalue::integer, + shardid, + placementid; diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index bf9a0e7c4..a7aaa0eef 100644 --- a/src/test/regress/sql/multi_extension.sql +++ 
b/src/test/regress/sql/multi_extension.sql
@@ -41,6 +41,7 @@
 ALTER EXTENSION citus UPDATE TO '6.0-7';
 ALTER EXTENSION citus UPDATE TO '6.0-8';
 ALTER EXTENSION citus UPDATE TO '6.0-9';
 ALTER EXTENSION citus UPDATE TO '6.0-10';
+ALTER EXTENSION citus UPDATE TO '6.0-11';
 -- drop extension and re-create in newest version
 DROP EXTENSION citus;
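
For illustration, a minimal sketch of the behavior the colocation tests above exercise, assuming the usual two-worker regression setup. The create_distributed_table() UDF, the citus.shard_count and citus.shard_replication_factor GUCs, and the pg_dist_partition / pg_dist_colocation catalogs are the ones added or used by this patch; the table names events and clicks and the column tenant_id are hypothetical.

-- two hash-distributed tables created with the same shard count,
-- replication factor, and distribution column type land in the
-- same colocation group
SET citus.shard_count = 2;
SET citus.shard_replication_factor = 1;

CREATE TABLE events ( tenant_id int, payload text );
SELECT create_distributed_table('events', 'tenant_id');

CREATE TABLE clicks ( tenant_id int, url text );
SELECT create_distributed_table('clicks', 'tenant_id');

-- both tables should report the same colocationid
SELECT logicalrelid, colocationid FROM pg_dist_partition
WHERE logicalrelid IN ('events'::regclass, 'clicks'::regclass);

-- remember the group, then drop both tables; as in the groupA drop
-- test above, the last drop also removes the pg_dist_colocation row
SELECT colocationid AS events_group
FROM pg_dist_partition
WHERE logicalrelid = 'events'::regclass \gset

DROP TABLE events;
DROP TABLE clicks;
SELECT * FROM pg_dist_colocation WHERE colocationid = :events_group;

This mirrors the groupA/groupB scenarios in multi_colocation_utils above; changing the shard count, replication factor, or distribution column type before calling create_distributed_table() records a separate pg_dist_colocation row instead of reusing the existing group.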