diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index b06e5a876..beb90d024 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -8,7 +8,7 @@ EXTENSION = citus EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \ 5.2-1 5.2-2 5.2-3 5.2-4 \ - 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 + 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -78,6 +78,8 @@ $(EXTENSION)--6.0-9.sql: $(EXTENSION)--6.0-8.sql $(EXTENSION)--6.0-8--6.0-9.sql cat $^ > $@ $(EXTENSION)--6.0-10.sql: $(EXTENSION)--6.0-9.sql $(EXTENSION)--6.0-9--6.0-10.sql cat $^ > $@ +$(EXTENSION)--6.0-11.sql: $(EXTENSION)--6.0-10.sql $(EXTENSION)--6.0-10--6.0-11.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--6.0-10--6.0-11.sql b/src/backend/distributed/citus--6.0-10--6.0-11.sql new file mode 100644 index 000000000..48fdca26c --- /dev/null +++ b/src/backend/distributed/citus--6.0-10--6.0-11.sql @@ -0,0 +1,112 @@ +/* citus--6.0-10--6.0-11.sql */ + +SET search_path = 'pg_catalog'; + +CREATE SEQUENCE citus.pg_dist_colocationid_seq + MINVALUE 1 + MAXVALUE 4294967296; + +ALTER SEQUENCE citus.pg_dist_colocationid_seq SET SCHEMA pg_catalog; + +/* add pg_dist_colocation */ +CREATE TABLE citus.pg_dist_colocation( + colocationid int NOT NULL PRIMARY KEY, + shardcount int NOT NULL, + replicationfactor int NOT NULL, + distributioncolumntype oid NOT NULL +); + +ALTER TABLE citus.pg_dist_colocation SET SCHEMA pg_catalog; + +CREATE INDEX pg_dist_colocation_configuration_index +ON pg_dist_colocation USING btree(shardcount, replicationfactor, distributioncolumntype); + +CREATE FUNCTION create_distributed_table(table_name regclass, + distribution_column text, + distribution_type citus.distribution_type DEFAULT 'hash') + RETURNS void + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$create_distributed_table$$; +COMMENT ON FUNCTION create_distributed_table(table_name regclass, + distribution_column text, + distribution_type citus.distribution_type) + IS 'creates a distributed table'; + + +CREATE OR REPLACE FUNCTION pg_catalog.citus_drop_trigger() + RETURNS event_trigger + LANGUAGE plpgsql + SECURITY DEFINER + SET search_path = pg_catalog + AS $cdbdt$ +DECLARE + v_obj record; + sequence_names text[] := '{}'; + node_names text[] := '{}'; + node_ports bigint[] := '{}'; + node_name text; + node_port bigint; + table_colocation_id integer; +BEGIN + -- collect set of dropped sequences to drop on workers later + SELECT array_agg(object_identity) INTO sequence_names + FROM pg_event_trigger_dropped_objects() + WHERE object_type = 'sequence'; + + -- Must accumulate set of affected nodes before deleting placements, as + -- master_drop_all_shards will erase their rows, making it impossible for + -- us to know where to drop sequences (which must be dropped after shards, + -- since they have default value expressions which depend on sequences). 
+ SELECT array_agg(sp.nodename), array_agg(sp.nodeport) + INTO node_names, node_ports + FROM pg_event_trigger_dropped_objects() AS dobj, + pg_dist_shard AS s, + pg_dist_shard_placement AS sp + WHERE dobj.object_type IN ('table', 'foreign table') + AND dobj.objid = s.logicalrelid + AND s.shardid = sp.shardid; + + FOR v_obj IN SELECT * FROM pg_event_trigger_dropped_objects() LOOP + IF v_obj.object_type NOT IN ('table', 'foreign table') THEN + CONTINUE; + END IF; + + -- nothing to do if not a distributed table + IF NOT EXISTS(SELECT * FROM pg_dist_partition WHERE logicalrelid = v_obj.objid) THEN + CONTINUE; + END IF; + + -- ensure all shards are dropped + PERFORM master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name); + + -- get colocation group + SELECT colocationid INTO table_colocation_id FROM pg_dist_partition WHERE logicalrelid = v_obj.objid; + + -- delete partition entry + DELETE FROM pg_dist_partition WHERE logicalrelid = v_obj.objid; + + -- drop colocation group if all referencing tables are dropped + IF NOT EXISTS(SELECT * FROM pg_dist_partition WHERE colocationId = table_colocation_id) THEN + DELETE FROM pg_dist_colocation WHERE colocationId = table_colocation_id; + END IF; + END LOOP; + + IF cardinality(sequence_names) = 0 THEN + RETURN; + END IF; + + FOR node_name, node_port IN + SELECT DISTINCT name, port + FROM unnest(node_names, node_ports) AS nodes(name, port) + LOOP + PERFORM master_drop_sequences(sequence_names, node_name, node_port); + END LOOP; +END; +$cdbdt$; + +COMMENT ON FUNCTION citus_drop_trigger() + IS 'perform checks and actions at the end of DROP actions'; + +ALTER TABLE pg_dist_partition ALTER COLUMN colocationid TYPE integer; + +RESET search_path; diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index da6f90cff..726af25ca 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '6.0-10' +default_version = '6.0-11' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 9fcf4ecc7..3ac7a5f75 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -9,6 +9,7 @@ */ #include "postgres.h" +#include "miscadmin.h" #include "access/genam.h" #include "access/hash.h" @@ -27,14 +28,17 @@ #include "catalog/pg_trigger.h" #include "commands/defrem.h" #include "commands/extension.h" +#include "commands/sequence.h" #include "commands/trigger.h" #include "distributed/colocation_utils.h" #include "distributed/distribution_column.h" #include "distributed/master_metadata_utility.h" +#include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" +#include "distributed/multi_logical_planner.h" +#include "distributed/pg_dist_colocation.h" #include "distributed/pg_dist_partition.h" #include "executor/spi.h" -#include "distributed/multi_logical_planner.h" #include "nodes/execnodes.h" #include "nodes/nodeFuncs.h" #include "nodes/pg_list.h" @@ -51,28 +55,38 @@ /* local function forward declarations */ +static void ConvertToDistributedTable(Oid relationId, + text *distributionColumnText, + Oid distributionMethodOid, uint32 colocationId); static char LookupDistributionMethod(Oid distributionMethodOid); static void 
RecordDistributedRelationDependencies(Oid distributedRelationId, Node *distributionKey);
 static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId, int16 supportFunctionNumber);
 static bool LocalTableEmpty(Oid tableId);
+static void ErrorIfNotSupportedConstraint(Relation relation, char distributionMethod,
+ Var *distributionColumn);
+static void InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
+ Var *distributionColumn, uint32 colocationId);
 static void CreateTruncateTrigger(Oid relationId);
+static uint32 ColocationId(int shardCount, int replicationFactor,
+ Oid distributionColumnType);
+static uint32 CreateColocationGroup(int shardCount, int replicationFactor,
+ Oid distributionColumnType);
+static uint32 GetNextColocationId(void);
+
 /* exports for SQL callable functions */
 PG_FUNCTION_INFO_V1(master_create_distributed_table);
+PG_FUNCTION_INFO_V1(create_distributed_table);
 /*
  * master_create_distributed_table accepts a table, distribution column and
  * method and performs the corresponding catalog changes.
  *
- * XXX: We should perform more checks here to see if this table is fit for
- * partitioning. At a minimum, we should validate the following: (i) this node
- * runs as the master node, (ii) table does not make use of the inheritance
- * mechanism, (iii) table does not own columns that are sequences, and (iv)
- * table does not have collated columns. (v) table does not have
- * preexisting content.
+ * Note that this UDF is deprecated and cannot create colocated tables, so we
+ * always use INVALID_COLOCATION_ID.
  */
 Datum
 master_create_distributed_table(PG_FUNCTION_ARGS)
@@ -81,86 +95,169 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
 text *distributionColumnText = PG_GETARG_TEXT_P(1);
 Oid distributionMethodOid = PG_GETARG_OID(2);
+ ConvertToDistributedTable(distributedRelationId, distributionColumnText,
+ distributionMethodOid, INVALID_COLOCATION_ID);
+ PG_RETURN_VOID();
+}
+
+
+/*
+ * create_distributed_table accepts a table, distribution column and
+ * distribution method, then it creates a distributed table.
+ */
+Datum
+create_distributed_table(PG_FUNCTION_ARGS)
+{
+ Oid relationId = PG_GETARG_OID(0);
+ text *distributionColumnText = PG_GETARG_TEXT_P(1);
+ Oid distributionMethodOid = PG_GETARG_OID(2);
+
 Relation distributedRelation = NULL;
- TupleDesc relationDesc = NULL;
- char *distributedRelationName = NULL;
- char relationKind = '\0';
-
- Relation pgDistPartition = NULL;
- char distributionMethod = LookupDistributionMethod(distributionMethodOid);
- const char replicationModel = 'c';
- char *distributionColumnName = text_to_cstring(distributionColumnText);
- Node *distributionKey = NULL;
+ Relation pgDistColocation = NULL;
 Var *distributionColumn = NULL;
- char *distributionKeyString = NULL;
+ char *distributionColumnName = NULL;
+ int distributionColumnType = 0;
+ uint32 colocationId = INVALID_COLOCATION_ID;
- List *indexOidList = NIL;
- ListCell *indexOidCell = NULL;
+ /* if distribution method is not hash, just create partition metadata */
+ char distributionMethod = LookupDistributionMethod(distributionMethodOid);
+ if (distributionMethod != DISTRIBUTE_BY_HASH)
+ {
+ ConvertToDistributedTable(relationId, distributionColumnText,
+ distributionMethodOid, INVALID_COLOCATION_ID);
+ PG_RETURN_VOID();
+ }
- HeapTuple newTuple = NULL;
- Datum newValues[Natts_pg_dist_partition];
- bool newNulls[Natts_pg_dist_partition];
+ /* get distribution column type */
+ distributionColumnName = text_to_cstring(distributionColumnText);
+ distributedRelation = relation_open(relationId, AccessShareLock);
+ distributionColumn = BuildDistributionKeyFromColumnName(distributedRelation,
+ distributionColumnName);
+ distributionColumnType = distributionColumn->vartype;
 /*
- * Lock target relation with an access exclusive lock - there's no way to
- * make sense of this table until we've committed, and we don't want
- * multiple backends manipulating this relation.
+ * Get an exclusive lock on the colocation system catalog. Therefore, we
+ * can be sure that there will be no modifications on the colocation table
+ * until this transaction is committed.
 */
- distributedRelation = relation_open(distributedRelationId, AccessExclusiveLock);
- relationDesc = RelationGetDescr(distributedRelation);
- distributedRelationName = RelationGetRelationName(distributedRelation);
+ pgDistColocation = heap_open(DistColocationRelationId(), ExclusiveLock);
- EnsureTableOwner(distributedRelationId);
+ /* check for existing colocations */
+ colocationId = ColocationId(ShardCount, ShardReplicationFactor,
+ distributionColumnType);
- /* open system catalog and insert new tuple */
- pgDistPartition = heap_open(DistPartitionRelationId(), RowExclusiveLock);
+ /*
+ * If there is a colocation group for the current configuration, get a
+ * colocated table from the group and use its shards as a reference to
+ * create new shards. Otherwise, create a new colocation group and create
+ * shards with the default round robin algorithm.
+ */ + if (colocationId != INVALID_COLOCATION_ID) + { + char *relationName = get_rel_name(relationId); + + Oid colocatedTableId = ColocatedTableId(colocationId); + ConvertToDistributedTable(relationId, distributionColumnText, + distributionMethodOid, colocationId); + + CreateColocatedShards(relationId, colocatedTableId); + ereport(DEBUG2, (errmsg("table %s is added to colocation group: %d", + relationName, colocationId))); + } + else + { + colocationId = CreateColocationGroup(ShardCount, ShardReplicationFactor, + distributionColumnType); + ConvertToDistributedTable(relationId, distributionColumnText, + distributionMethodOid, colocationId); + + /* use the default way to create shards */ + CreateShardsWithRoundRobinPolicy(relationId, ShardCount, ShardReplicationFactor); + } + + heap_close(pgDistColocation, NoLock); + relation_close(distributedRelation, NoLock); + + PG_RETURN_VOID(); +} + + +/* + * ConvertToDistributedTable converts the given regular PostgreSQL table into a + * distributed table. First, it checks if the given table can be distributed, + * then it creates related tuple in pg_dist_partition. + * + * XXX: We should perform more checks here to see if this table is fit for + * partitioning. At a minimum, we should validate the following: (i) this node + * runs as the master node, (ii) table does not make use of the inheritance + * mechanism, (iii) table does not own columns that are sequences, and (iv) + * table does not have collated columns. + */ +static void +ConvertToDistributedTable(Oid relationId, text *distributionColumnText, + Oid distributionMethodOid, uint32 colocationId) +{ + Relation relation = NULL; + TupleDesc relationDesc = NULL; + char *relationName = NULL; + char relationKind = 0; + + char distributionMethod = LookupDistributionMethod(distributionMethodOid); + char *distributionColumnName = text_to_cstring(distributionColumnText); + Var *distributionColumn = NULL; + + /* + * Lock target relation with an exclusive lock - there's no way to make + * sense of this table until we've committed, and we don't want multiple + * backends manipulating this relation. 
+ */ + relation = relation_open(relationId, ExclusiveLock); + relationDesc = RelationGetDescr(relation); + relationName = RelationGetRelationName(relation); + + EnsureTableOwner(relationId); /* check that the relation is not already distributed */ - if (IsDistributedTable(distributedRelationId)) + if (IsDistributedTable(relationId)) { ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("table \"%s\" is already distributed", - distributedRelationName))); + relationName))); } /* verify target relation does not use WITH (OIDS) PostgreSQL feature */ if (relationDesc->tdhasoid) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot distribute relation: %s", distributedRelationName), + errmsg("cannot distribute relation: %s", relationName), errdetail("Distributed relations must not specify the WITH " "(OIDS) option in their definitions."))); } /* verify target relation is either regular or foreign table */ - relationKind = distributedRelation->rd_rel->relkind; + relationKind = relation->rd_rel->relkind; if (relationKind != RELKIND_RELATION && relationKind != RELKIND_FOREIGN_TABLE) { ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE), errmsg("cannot distribute relation: %s", - distributedRelationName), + relationName), errdetail("Distributed relations must be regular or " "foreign tables."))); } /* check that the relation does not contain any rows */ - if (!LocalTableEmpty(distributedRelationId)) + if (!LocalTableEmpty(relationId)) { ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("cannot distribute relation \"%s\"", - distributedRelationName), + relationName), errdetail("Relation \"%s\" contains data.", - distributedRelationName), + relationName), errhint("Empty your table before distributing it."))); } - distributionKey = BuildDistributionKeyFromColumnName(distributedRelation, - distributionColumnName); - distributionKeyString = nodeToString(distributionKey); - - /* the distribution key should always be a Var for now */ - Assert(IsA(distributionKey, Var)); - distributionColumn = (Var *) distributionKey; + distributionColumn = BuildDistributionKeyFromColumnName(relation, + distributionColumnName); /* check for support function needed by specified partition method */ if (distributionMethod == DISTRIBUTE_BY_HASH) @@ -193,17 +290,47 @@ master_create_distributed_table(PG_FUNCTION_ARGS) } } + ErrorIfNotSupportedConstraint(relation, distributionMethod, distributionColumn); + + InsertIntoPgDistPartition(relationId, distributionMethod, distributionColumn, + colocationId); + + relation_close(relation, NoLock); + /* - * Forbid UNIQUE, PRIMARY KEY, or EXCLUDE constraints on append partitioned tables, - * since currently there is no way of enforcing uniqueness for overlapping shards. - * - * Similarly, do not allow such constraints it they do not - * include partition column. This check is important for two reasons. First, - * currently Citus does not enforce uniqueness constraint on multiple shards. - * Second, INSERT INTO .. ON CONFLICT (i.e., UPSERT) queries can be executed with no - * further check for constraints. + * PostgreSQL supports truncate trigger for regular relations only. + * Truncate on foreign tables is not supported. */ - indexOidList = RelationGetIndexList(distributedRelation); + if (relationKind == RELKIND_RELATION) + { + CreateTruncateTrigger(relationId); + } +} + + +/* + * ErrorIfNotSupportedConstraint run checks related to unique index / exclude + * constraints. 
+ * + * Forbid UNIQUE, PRIMARY KEY, or EXCLUDE constraints on append partitioned + * tables, since currently there is no way of enforcing uniqueness for + * overlapping shards. + * + * Similarly, do not allow such constraints if they do not include partition + * column. This check is important for two reasons: + * i. First, currently Citus does not enforce uniqueness constraint on multiple + * shards. + * ii. Second, INSERT INTO .. ON CONFLICT (i.e., UPSERT) queries can be executed + * with no further check for constraints. + */ +static void +ErrorIfNotSupportedConstraint(Relation relation, char distributionMethod, + Var *distributionColumn) +{ + char *relationName = RelationGetRelationName(relation); + List *indexOidList = RelationGetIndexList(relation); + ListCell *indexOidCell = NULL; + foreach(indexOidCell, indexOidList) { Oid indexOid = lfirst_oid(indexOidCell); @@ -233,7 +360,7 @@ master_create_distributed_table(PG_FUNCTION_ARGS) { ereport(WARNING, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("table \"%s\" has a UNIQUE or EXCLUDE constraint", - distributedRelationName), + relationName), errdetail("UNIQUE constraints, EXCLUDE constraints, " "and PRIMARY KEYs on " "append-partitioned tables cannot be enforced."), @@ -271,7 +398,7 @@ master_create_distributed_table(PG_FUNCTION_ARGS) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot distribute relation: \"%s\"", - distributedRelationName), + relationName), errdetail("Distributed relations cannot have UNIQUE, " "EXCLUDE, or PRIMARY KEY constraints that do not " "include the partition column (with an equality " @@ -280,18 +407,40 @@ master_create_distributed_table(PG_FUNCTION_ARGS) index_close(indexDesc, NoLock); } +} + + +/* + * InsertIntoPgDistPartition inserts a new tuple into pg_dist_partition. 
+ */ +static void +InsertIntoPgDistPartition(Oid relationId, char distributionMethod, + Var *distributionColumn, uint32 colocationId) +{ + Relation pgDistPartition = NULL; + const char replicationModel = 'c'; + char *distributionColumnString = NULL; + + HeapTuple newTuple = NULL; + Datum newValues[Natts_pg_dist_partition]; + bool newNulls[Natts_pg_dist_partition]; + + /* open system catalog and insert new tuple */ + pgDistPartition = heap_open(DistPartitionRelationId(), RowExclusiveLock); + + distributionColumnString = nodeToString((Node *) distributionColumn); /* form new tuple for pg_dist_partition */ memset(newValues, 0, sizeof(newValues)); memset(newNulls, false, sizeof(newNulls)); newValues[Anum_pg_dist_partition_logicalrelid - 1] = - ObjectIdGetDatum(distributedRelationId); + ObjectIdGetDatum(relationId); newValues[Anum_pg_dist_partition_partmethod - 1] = CharGetDatum(distributionMethod); newValues[Anum_pg_dist_partition_partkey - 1] = - CStringGetTextDatum(distributionKeyString); - newValues[Anum_pg_dist_partition_colocationid - 1] = INVALID_COLOCATION_ID; + CStringGetTextDatum(distributionColumnString); + newValues[Anum_pg_dist_partition_colocationid - 1] = colocationId; newValues[Anum_pg_dist_partition_repmodel - 1] = CharGetDatum(replicationModel); newTuple = heap_form_tuple(RelationGetDescr(pgDistPartition), newValues, newNulls); @@ -299,25 +448,12 @@ master_create_distributed_table(PG_FUNCTION_ARGS) /* finally insert tuple, build index entries & register cache invalidation */ simple_heap_insert(pgDistPartition, newTuple); CatalogUpdateIndexes(pgDistPartition, newTuple); - CitusInvalidateRelcacheByRelid(distributedRelationId); + CitusInvalidateRelcacheByRelid(relationId); - RecordDistributedRelationDependencies(distributedRelationId, distributionKey); + RecordDistributedRelationDependencies(relationId, (Node *) distributionColumn); CommandCounterIncrement(); - heap_close(pgDistPartition, NoLock); - relation_close(distributedRelation, NoLock); - - /* - * PostgreSQL supports truncate trigger for regular relations only. - * Truncate on foreign tables is not supported. - */ - if (relationKind == RELKIND_RELATION) - { - CreateTruncateTrigger(distributedRelationId); - } - - PG_RETURN_VOID(); } @@ -525,3 +661,126 @@ CreateTruncateTrigger(Oid relationId) CreateTrigger(trigger, NULL, relationId, InvalidOid, InvalidOid, InvalidOid, internal); } + + +/* + * ColocationId searches pg_dist_colocation for shard count, replication factor + * and distribution column type. If a matching entry is found, it returns the + * colocation id, otherwise it returns INVALID_COLOCATION_ID. 
+ */ +static uint32 +ColocationId(int shardCount, int replicationFactor, Oid distributionColumnType) +{ + uint32 colocationId = INVALID_COLOCATION_ID; + HeapTuple colocationTuple = NULL; + SysScanDesc scanDescriptor; + const int scanKeyCount = 3; + ScanKeyData scanKey[scanKeyCount]; + bool indexOK = true; + + Relation pgDistColocation = heap_open(DistColocationRelationId(), AccessShareLock); + + /* set scan arguments */ + ScanKeyInit(&scanKey[0], Anum_pg_dist_colocation_shardcount, + BTEqualStrategyNumber, F_INT4EQ, UInt32GetDatum(shardCount)); + ScanKeyInit(&scanKey[1], Anum_pg_dist_colocation_replicationfactor, + BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(replicationFactor)); + ScanKeyInit(&scanKey[2], Anum_pg_dist_colocation_distributioncolumntype, + BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(distributionColumnType)); + + scanDescriptor = systable_beginscan(pgDistColocation, + DistColocationConfigurationIndexId(), + indexOK, NULL, scanKeyCount, scanKey); + + colocationTuple = systable_getnext(scanDescriptor); + if (HeapTupleIsValid(colocationTuple)) + { + Form_pg_dist_colocation colocationForm = + (Form_pg_dist_colocation) GETSTRUCT(colocationTuple); + + colocationId = colocationForm->colocationid; + } + + systable_endscan(scanDescriptor); + heap_close(pgDistColocation, AccessShareLock); + + return colocationId; +} + + +/* + * CreateColocationGroup creates a new colocation id and writes it into + * pg_dist_colocation with the given configuration. It also returns the created + * colocation id. + */ +static uint32 +CreateColocationGroup(int shardCount, int replicationFactor, Oid distributionColumnType) +{ + uint32 colocationId = GetNextColocationId(); + Relation pgDistColocation = NULL; + TupleDesc tupleDescriptor = NULL; + HeapTuple heapTuple = NULL; + Datum values[Natts_pg_dist_colocation]; + bool isNulls[Natts_pg_dist_colocation]; + + /* form new colocation tuple */ + memset(values, 0, sizeof(values)); + memset(isNulls, false, sizeof(isNulls)); + + values[Anum_pg_dist_colocation_colocationid - 1] = UInt32GetDatum(colocationId); + values[Anum_pg_dist_colocation_shardcount - 1] = UInt32GetDatum(shardCount); + values[Anum_pg_dist_colocation_replicationfactor - 1] = + UInt32GetDatum(replicationFactor); + values[Anum_pg_dist_colocation_distributioncolumntype - 1] = + ObjectIdGetDatum(distributionColumnType); + + /* open colocation relation and insert the new tuple */ + pgDistColocation = heap_open(DistColocationRelationId(), RowExclusiveLock); + + tupleDescriptor = RelationGetDescr(pgDistColocation); + heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls); + + simple_heap_insert(pgDistColocation, heapTuple); + CatalogUpdateIndexes(pgDistColocation, heapTuple); + + /* increment the counter so that next command can see the row */ + CommandCounterIncrement(); + heap_close(pgDistColocation, RowExclusiveLock); + + return colocationId; +} + + +/* + * GetNextColocationId allocates and returns a unique colocationId for the + * colocation group to be created. This allocation occurs both in shared memory + * and in write ahead logs; writing to logs avoids the risk of having + * colocationId collisions. + * + * Please note that the caller is still responsible for finalizing colocationId + * with the master node. Further note that this function relies on an internal + * sequence created in initdb to generate unique identifiers. 
+ */ +static uint32 +GetNextColocationId() +{ + text *sequenceName = cstring_to_text(COLOCATIONID_SEQUENCE_NAME); + Oid sequenceId = ResolveRelationId(sequenceName); + Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId); + Oid savedUserId = InvalidOid; + int savedSecurityContext = 0; + Datum colocationIdDatum = 0; + uint32 colocationId = INVALID_COLOCATION_ID; + + GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); + SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); + + /* generate new and unique colocation id from sequence */ + colocationIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum); + + SetUserIdAndSecContext(savedUserId, savedSecurityContext); + + colocationId = DatumGetUInt32(colocationIdDatum); + + return colocationId; +} diff --git a/src/backend/distributed/master/master_create_shards.c b/src/backend/distributed/master/master_create_shards.c index a55ed2842..abf89ff22 100644 --- a/src/backend/distributed/master/master_create_shards.c +++ b/src/backend/distributed/master/master_create_shards.c @@ -58,13 +58,8 @@ PG_FUNCTION_INFO_V1(master_create_worker_shards); /* - * master_create_worker_shards creates empty shards for the given table based - * on the specified number of initial shards. The function first gets a list of - * candidate nodes and issues DDL commands on the nodes to create empty shard - * placements on those nodes. The function then updates metadata on the master - * node to make this shard (and its placements) visible. Note that the function - * assumes the table is hash partitioned and calculates the min/max hash token - * ranges for each shard, giving them an equal split of the hash space. + * master_create_worker_shards is a user facing function to create worker shards + * for the given relation in round robin order. */ Datum master_create_worker_shards(PG_FUNCTION_ARGS) @@ -74,10 +69,27 @@ master_create_worker_shards(PG_FUNCTION_ARGS) int32 replicationFactor = PG_GETARG_INT32(2); Oid distributedTableId = ResolveRelationId(tableNameText); - char relationKind = get_rel_relkind(distributedTableId); - char *tableName = text_to_cstring(tableNameText); + CreateShardsWithRoundRobinPolicy(distributedTableId, shardCount, replicationFactor); + + PG_RETURN_VOID(); +} + + +/* + * CreateShardsWithRoundRobinPolicy creates empty shards for the given table + * based on the specified number of initial shards. The function first gets a + * list of candidate nodes and issues DDL commands on the nodes to create empty + * shard placements on those nodes. The function then updates metadata on the + * master node to make this shard (and its placements) visible. Note that the + * function assumes the table is hash partitioned and calculates the min/max + * hash token ranges for each shard, giving them an equal split of the hash space. 
+ */ +void +CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount, + int32 replicationFactor) +{ char *relationOwner = NULL; - char shardStorageType = '\0'; + char shardStorageType = 0; List *workerNodeList = NIL; List *ddlCommandList = NIL; int32 workerNodeCount = 0; @@ -106,6 +118,7 @@ master_create_worker_shards(PG_FUNCTION_ARGS) existingShardList = LoadShardList(distributedTableId); if (existingShardList != NIL) { + char *tableName = get_rel_name(distributedTableId); ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("table \"%s\" has already had shards created for it", tableName))); @@ -156,22 +169,7 @@ master_create_worker_shards(PG_FUNCTION_ARGS) } /* set shard storage type according to relation type */ - if (relationKind == RELKIND_FOREIGN_TABLE) - { - bool cstoreTable = CStoreTable(distributedTableId); - if (cstoreTable) - { - shardStorageType = SHARD_STORAGE_COLUMNAR; - } - else - { - shardStorageType = SHARD_STORAGE_FOREIGN; - } - } - else - { - shardStorageType = SHARD_STORAGE_TABLE; - } + shardStorageType = ShardStorageType(distributedTableId); for (shardIndex = 0; shardIndex < shardCount; shardIndex++) { @@ -182,8 +180,7 @@ master_create_worker_shards(PG_FUNCTION_ARGS) text *maxHashTokenText = NULL; int32 shardMinHashToken = INT32_MIN + (shardIndex * hashTokenIncrement); int32 shardMaxHashToken = shardMinHashToken + (hashTokenIncrement - 1); - Datum shardIdDatum = master_get_new_shardid(NULL); - int64 shardId = DatumGetInt64(shardIdDatum); + uint64 shardId = GetNextShardId(); /* if we are at the last shard, make sure the max token value is INT_MAX */ if (shardIndex == (shardCount - 1)) @@ -217,8 +214,104 @@ master_create_worker_shards(PG_FUNCTION_ARGS) } RESUME_INTERRUPTS(); +} - PG_RETURN_VOID(); + +/* + * CreateColocatedShards creates shards for the target relation colocated with + * the source relation. + */ +void +CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId) +{ + char *targetTableRelationOwner = NULL; + char targetShardStorageType = 0; + List *existingShardList = NIL; + List *sourceShardIntervalList = NIL; + List *targetTableDDLEvents = NIL; + ListCell *sourceShardCell = NULL; + + /* make sure that tables are hash partitioned */ + CheckHashPartitionedTable(targetRelationId); + CheckHashPartitionedTable(sourceRelationId); + + /* + * In contrast to append/range partitioned tables it makes more sense to + * require ownership privileges - shards for hash-partitioned tables are + * only created once, not continually during ingest as for the other + * partitioning types. 
+ */ + EnsureTableOwner(targetRelationId); + + /* we plan to add shards: get an exclusive metadata lock on the target relation */ + LockRelationDistributionMetadata(targetRelationId, ExclusiveLock); + + /* a share metadata lock is enough on the source relation */ + LockRelationDistributionMetadata(sourceRelationId, ShareLock); + + /* prevent placement changes of the source relation until we colocate with them */ + sourceShardIntervalList = LoadShardIntervalList(sourceRelationId); + LockShardListMetadata(sourceShardIntervalList, ShareLock); + + /* validate that shards haven't already been created for this table */ + existingShardList = LoadShardList(targetRelationId); + if (existingShardList != NIL) + { + char *targetRelationName = get_rel_name(targetRelationId); + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("table \"%s\" has already had shards created for it", + targetRelationName))); + } + + targetTableRelationOwner = TableOwner(targetRelationId); + targetTableDDLEvents = GetTableDDLEvents(targetRelationId); + targetShardStorageType = ShardStorageType(targetRelationId); + + foreach(sourceShardCell, sourceShardIntervalList) + { + ShardInterval *sourceShardInterval = (ShardInterval *) lfirst(sourceShardCell); + uint64 sourceShardId = sourceShardInterval->shardId; + uint64 newShardId = GetNextShardId(); + ListCell *sourceShardPlacementCell = NULL; + + int32 shardMinValue = DatumGetInt32(sourceShardInterval->minValue); + int32 shardMaxValue = DatumGetInt32(sourceShardInterval->maxValue); + text *shardMinValueText = IntegerToText(shardMinValue); + text *shardMaxValueText = IntegerToText(shardMaxValue); + + List *sourceShardPlacementList = ShardPlacementList(sourceShardId); + foreach(sourceShardPlacementCell, sourceShardPlacementList) + { + ShardPlacement *sourcePlacement = + (ShardPlacement *) lfirst(sourceShardPlacementCell); + char *sourceNodeName = sourcePlacement->nodeName; + int32 sourceNodePort = sourcePlacement->nodePort; + + bool created = WorkerCreateShard(targetRelationId, sourceNodeName, + sourceNodePort, newShardId, + targetTableRelationOwner, + targetTableDDLEvents); + if (created) + { + const RelayFileState shardState = FILE_FINALIZED; + const uint64 shardSize = 0; + + InsertShardPlacementRow(newShardId, INVALID_PLACEMENT_ID, shardState, + shardSize, sourceNodeName, sourceNodePort); + } + else + { + char *targetRelationName = get_rel_name(targetRelationId); + char *sourceRelationName = get_rel_name(sourceRelationId); + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("table \"%s\" could not be colocated with %s", + targetRelationName, sourceRelationName))); + } + } + + InsertShardRow(targetRelationId, newShardId, targetShardStorageType, + shardMinValueText, shardMaxValueText); + } } diff --git a/src/backend/distributed/master/master_node_protocol.c b/src/backend/distributed/master/master_node_protocol.c index 625f9202a..830b56324 100644 --- a/src/backend/distributed/master/master_node_protocol.c +++ b/src/backend/distributed/master/master_node_protocol.c @@ -55,6 +55,7 @@ /* Shard related configuration */ +int ShardCount = 32; int ShardReplicationFactor = 2; /* desired replication factor for shards */ int ShardMaxSize = 1048576; /* maximum size in KB one shard can grow to */ int ShardPlacementPolicy = SHARD_PLACEMENT_ROUND_ROBIN; @@ -93,8 +94,7 @@ master_get_table_metadata(PG_FUNCTION_ARGS) HeapTuple metadataTuple = NULL; TupleDesc metadataDescriptor = NULL; uint64 shardMaxSizeInBytes = 0; - char relationType = 0; - char 
storageType = 0; + char shardStorageType = 0; Datum values[TABLE_METADATA_FIELDS]; bool isNulls[TABLE_METADATA_FIELDS]; @@ -121,26 +121,10 @@ master_get_table_metadata(PG_FUNCTION_ARGS) shardMaxSizeInBytes = (int64) ShardMaxSize * 1024L; /* get storage type */ - relationType = get_rel_relkind(relationId); - if (relationType == RELKIND_RELATION) - { - storageType = SHARD_STORAGE_TABLE; - } - else if (relationType == RELKIND_FOREIGN_TABLE) - { - bool cstoreTable = CStoreTable(relationId); - if (cstoreTable) - { - storageType = SHARD_STORAGE_COLUMNAR; - } - else - { - storageType = SHARD_STORAGE_FOREIGN; - } - } + shardStorageType = ShardStorageType(relationId); values[0] = ObjectIdGetDatum(relationId); - values[1] = storageType; + values[1] = shardStorageType; values[2] = partitionEntry->partitionMethod; values[3] = partitionKey; values[4] = Int32GetDatum(ShardReplicationFactor); @@ -247,12 +231,8 @@ master_get_table_ddl_events(PG_FUNCTION_ARGS) /* - * master_get_new_shardid allocates and returns a unique shardId for the shard - * to be created. This allocation occurs both in shared memory and in write - * ahead logs; writing to logs avoids the risk of having shardId collisions. - * - * Please note that the caller is still responsible for finalizing shard data - * and the shardId with the master node. + * master_get_new_shardid is a user facing wrapper function around GetNextShardId() + * which allocates and returns a unique shardId for the shard to be created. * * NB: This can be called by any user; for now we have decided that that's * ok. We might want to restrict this to users part of a specific role or such @@ -260,6 +240,24 @@ master_get_table_ddl_events(PG_FUNCTION_ARGS) */ Datum master_get_new_shardid(PG_FUNCTION_ARGS) +{ + uint64 shardId = GetNextShardId(); + Datum shardIdDatum = Int64GetDatum(shardId); + + PG_RETURN_DATUM(shardIdDatum); +} + + +/* + * GetNextShardId allocates and returns a unique shardId for the shard to be + * created. This allocation occurs both in shared memory and in write ahead + * logs; writing to logs avoids the risk of having shardId collisions. + * + * Please note that the caller is still responsible for finalizing shard data + * and the shardId with the master node. + */ +uint64 +GetNextShardId() { text *sequenceName = cstring_to_text(SHARDID_SEQUENCE_NAME); Oid sequenceId = ResolveRelationId(sequenceName); @@ -267,6 +265,7 @@ master_get_new_shardid(PG_FUNCTION_ARGS) Oid savedUserId = InvalidOid; int savedSecurityContext = 0; Datum shardIdDatum = 0; + uint64 shardId = 0; GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); @@ -276,7 +275,9 @@ master_get_new_shardid(PG_FUNCTION_ARGS) SetUserIdAndSecContext(savedUserId, savedSecurityContext); - PG_RETURN_DATUM(shardIdDatum); + shardId = DatumGetInt64(shardIdDatum); + + return shardId; } @@ -729,6 +730,41 @@ GetTableDDLEvents(Oid relationId) } +/* + * ShardStorageType returns the shard storage type according to relation type. 
+ */ +char +ShardStorageType(Oid relationId) +{ + char shardStorageType = 0; + + char relationType = get_rel_relkind(relationId); + if (relationType == RELKIND_RELATION) + { + shardStorageType = SHARD_STORAGE_TABLE; + } + else if (relationType == RELKIND_FOREIGN_TABLE) + { + bool cstoreTable = CStoreTable(relationId); + if (cstoreTable) + { + shardStorageType = SHARD_STORAGE_COLUMNAR; + } + else + { + shardStorageType = SHARD_STORAGE_FOREIGN; + } + } + else + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("unexpected relation type: %c", relationType))); + } + + return shardStorageType; +} + + /* * WorkerNodeGetDatum converts the worker node passed to it into its datum * representation. To do this, the function first creates the heap tuple from diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index fa916c10a..b162838a3 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -41,8 +41,6 @@ /* Local functions forward declarations */ -static bool WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort, - uint64 shardId, char *newShardOwner, List *ddlCommandList); static bool WorkerShardStats(char *nodeName, uint32 nodePort, Oid relationId, char *shardName, uint64 *shardSize, text **shardMinValue, text **shardMaxValue); @@ -66,8 +64,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS) text *relationNameText = PG_GETARG_TEXT_P(0); char *relationName = text_to_cstring(relationNameText); List *workerNodeList = WorkerNodeList(); - Datum shardIdDatum = 0; - int64 shardId = INVALID_SHARD_ID; + uint64 shardId = INVALID_SHARD_ID; List *ddlEventList = NULL; uint32 attemptableNodeCount = 0; uint32 liveNodeCount = 0; @@ -116,8 +113,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS) } /* generate new and unique shardId from sequence */ - shardIdDatum = master_get_new_shardid(NULL); - shardId = DatumGetInt64(shardIdDatum); + shardId = GetNextShardId(); /* get table DDL commands to replay on the worker node */ ddlEventList = GetTableDDLEvents(relationId); @@ -426,7 +422,7 @@ CreateShardPlacements(Oid relationId, int64 shardId, List *ddlEventList, * shard on the worker node. Note that this function opens a new connection for * each DDL command, and could leave the shard in an half-initialized state. 
 */
-static bool
+bool
 WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort,
 uint64 shardId, char *newShardOwner, List *ddlCommandList)
 {
diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c
index 5980b6425..11f794e0c 100644
--- a/src/backend/distributed/metadata/metadata_sync.c
+++ b/src/backend/distributed/metadata/metadata_sync.c
@@ -215,14 +215,14 @@ DistributionCreateCommand(DistTableCacheEntry *cacheEntry)
 char *qualifiedRelationName = generate_qualified_relation_name(relationId);
 char *partitionKeyColumnName = ColumnNameToColumn(relationId, partitionKeyString);
- uint64 colocationId = cacheEntry->colocationId;
+ uint32 colocationId = cacheEntry->colocationId;
 char replicationModel = cacheEntry->replicationModel;
 appendStringInfo(insertDistributionCommand,
 "INSERT INTO pg_dist_partition "
 "(logicalrelid, partmethod, partkey, colocationid, repmodel) "
 "VALUES "
- "(%s::regclass, '%c', column_name_to_column(%s,%s), %lu, '%c')",
+ "(%s::regclass, '%c', column_name_to_column(%s,%s), %d, '%c')",
 quote_literal_cstr(qualifiedRelationName),
 distributionMethod,
 quote_literal_cstr(qualifiedRelationName),
diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c
index 149cd3d13..8d8a9b125 100644
--- a/src/backend/distributed/shared_library_init.c
+++ b/src/backend/distributed/shared_library_init.c
@@ -332,6 +332,17 @@ RegisterCitusConfigVariables(void)
 0,
 NULL, NULL, NULL);
+ DefineCustomIntVariable(
+ "citus.shard_count",
+ gettext_noop("Sets the number of shards for a new hash-partitioned table "
+ "created with create_distributed_table()."),
+ NULL,
+ &ShardCount,
+ 32, 1, 64000,
+ PGC_USERSET,
+ 0,
+ NULL, NULL, NULL);
+
 DefineCustomIntVariable(
 "citus.shard_replication_factor",
 gettext_noop("Sets the replication factor for shards."),
diff --git a/src/backend/distributed/test/colocation_utils.c b/src/backend/distributed/test/colocation_utils.c
index 1e538edc7..ad83d5304 100644
--- a/src/backend/distributed/test/colocation_utils.c
+++ b/src/backend/distributed/test/colocation_utils.c
@@ -35,7 +35,7 @@ Datum
 get_table_colocation_id(PG_FUNCTION_ARGS)
 {
 Oid distributedTableId = PG_GETARG_OID(0);
- int colocationId = TableColocationId(distributedTableId);
+ uint32 colocationId = TableColocationId(distributedTableId);
 PG_RETURN_INT32(colocationId);
 }
diff --git a/src/backend/distributed/test/distribution_metadata.c b/src/backend/distributed/test/distribution_metadata.c
index 5ec894774..ec9496e42 100644
--- a/src/backend/distributed/test/distribution_metadata.c
+++ b/src/backend/distributed/test/distribution_metadata.c
@@ -224,8 +224,7 @@ create_monolithic_shard_row(PG_FUNCTION_ARGS)
 Oid distributedTableId = PG_GETARG_OID(0);
 StringInfo minInfo = makeStringInfo();
 StringInfo maxInfo = makeStringInfo();
- Datum newShardIdDatum = master_get_new_shardid(NULL);
- int64 newShardId = DatumGetInt64(newShardIdDatum);
+ uint64 newShardId = GetNextShardId();
 text *maxInfoText = NULL;
 text *minInfoText = NULL;
diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c
index 776fbe089..f74791513 100644
--- a/src/backend/distributed/utils/colocation_utils.c
+++ b/src/backend/distributed/utils/colocation_utils.c
@@ -23,10 +23,10 @@
 /*
- * TableColocationId function returns co-location id of given table. This function errors
- * out if given table is not distributed.
+ * TableColocationId function returns co-location id of given table. 
This function + * errors out if given table is not distributed. */ -uint64 +uint32 TableColocationId(Oid distributedTableId) { DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId); @@ -44,8 +44,8 @@ TableColocationId(Oid distributedTableId) bool TablesColocated(Oid leftDistributedTableId, Oid rightDistributedTableId) { - uint64 leftColocationId = INVALID_COLOCATION_ID; - uint64 rightColocationId = INVALID_COLOCATION_ID; + uint32 leftColocationId = INVALID_COLOCATION_ID; + uint32 rightColocationId = INVALID_COLOCATION_ID; if (leftDistributedTableId == rightDistributedTableId) { @@ -112,7 +112,7 @@ ShardsColocated(ShardInterval *leftShardInterval, ShardInterval *rightShardInter List * ColocatedTableList(Oid distributedTableId) { - int tableColocationId = TableColocationId(distributedTableId); + uint32 tableColocationId = TableColocationId(distributedTableId); List *colocatedTableList = NIL; Relation pgDistPartition = NULL; @@ -134,7 +134,7 @@ ColocatedTableList(Oid distributedTableId) } ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_colocationid, - BTEqualStrategyNumber, F_INT8EQ, ObjectIdGetDatum(tableColocationId)); + BTEqualStrategyNumber, F_INT4EQ, ObjectIdGetDatum(tableColocationId)); pgDistPartition = heap_open(DistPartitionRelationId(), AccessShareLock); tupleDescriptor = RelationGetDescr(pgDistPartition); @@ -219,3 +219,44 @@ ColocatedShardIntervalList(ShardInterval *shardInterval) return colocatedShardList; } + + +/* + * ColocatedTableId returns an arbitrary table which belongs to given colocation + * group. If there is not such a colocation group, it returns invalid oid. + */ +Oid +ColocatedTableId(Oid colocationId) +{ + Oid colocatedTableId = InvalidOid; + Relation pgDistPartition = NULL; + TupleDesc tupleDescriptor = NULL; + SysScanDesc scanDescriptor = NULL; + HeapTuple heapTuple = NULL; + bool indexOK = true; + bool isNull = false; + ScanKeyData scanKey[1]; + int scanKeyCount = 1; + + ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_colocationid, + BTEqualStrategyNumber, F_INT4EQ, ObjectIdGetDatum(colocationId)); + + /* prevent DELETE statements */ + pgDistPartition = heap_open(DistPartitionRelationId(), ShareLock); + tupleDescriptor = RelationGetDescr(pgDistPartition); + scanDescriptor = systable_beginscan(pgDistPartition, + DistPartitionColocationidIndexId(), + indexOK, NULL, scanKeyCount, scanKey); + + heapTuple = systable_getnext(scanDescriptor); + if (HeapTupleIsValid(heapTuple)) + { + colocatedTableId = heap_getattr(heapTuple, Anum_pg_dist_partition_logicalrelid, + tupleDescriptor, &isNull); + } + + systable_endscan(scanDescriptor); + heap_close(pgDistPartition, ShareLock); + + return colocatedTableId; +} diff --git a/src/backend/distributed/utils/distribution_column.c b/src/backend/distributed/utils/distribution_column.c index 8a351c016..e5ecdce5f 100644 --- a/src/backend/distributed/utils/distribution_column.c +++ b/src/backend/distributed/utils/distribution_column.c @@ -57,7 +57,7 @@ column_name_to_column(PG_FUNCTION_ARGS) relation = relation_open(relationId, AccessShareLock); - column = (Var *) BuildDistributionKeyFromColumnName(relation, columnName); + column = BuildDistributionKeyFromColumnName(relation, columnName); columnNodeString = nodeToString(column); columnNodeText = cstring_to_text(columnNodeString); @@ -83,7 +83,7 @@ column_name_to_column_id(PG_FUNCTION_ARGS) relation = relation_open(distributedTableId, AccessExclusiveLock); - column = (Var *) BuildDistributionKeyFromColumnName(relation, columnName); + column = 
BuildDistributionKeyFromColumnName(relation, columnName); relation_close(relation, NoLock); @@ -121,12 +121,12 @@ column_to_column_name(PG_FUNCTION_ARGS) * specified column does not exist or is not suitable to be used as a * distribution column. */ -Node * +Var * BuildDistributionKeyFromColumnName(Relation distributedRelation, char *columnName) { HeapTuple columnTuple = NULL; Form_pg_attribute columnForm = NULL; - Var *column = NULL; + Var *distributionColumn = NULL; char *tableName = RelationGetRelationName(distributedRelation); /* it'd probably better to downcase identifiers consistent with SQL case folding */ @@ -153,12 +153,12 @@ BuildDistributionKeyFromColumnName(Relation distributedRelation, char *columnNam } /* build Var referencing only the chosen distribution column */ - column = makeVar(1, columnForm->attnum, columnForm->atttypid, - columnForm->atttypmod, columnForm->attcollation, 0); + distributionColumn = makeVar(1, columnForm->attnum, columnForm->atttypid, + columnForm->atttypmod, columnForm->attcollation, 0); ReleaseSysCache(columnTuple); - return (Node *) column; + return distributionColumn; } diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index e276d0a7b..8ae60fd89 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -55,6 +55,8 @@ static Oid distShardRelationId = InvalidOid; static Oid distShardPlacementRelationId = InvalidOid; static Oid distNodeRelationId = InvalidOid; static Oid distLocalGroupRelationId = InvalidOid; +static Oid distColocationRelationId = InvalidOid; +static Oid distColocationConfigurationIndexId = InvalidOid; static Oid distPartitionRelationId = InvalidOid; static Oid distPartitionLogicalRelidIndexId = InvalidOid; static Oid distPartitionColocationidIndexId = InvalidOid; @@ -274,7 +276,7 @@ LookupDistTableCacheEntry(Oid relationId) HeapTuple distPartitionTuple = NULL; char *partitionKeyString = NULL; char partitionMethod = 0; - uint64 colocationId = INVALID_COLOCATION_ID; + uint32 colocationId = INVALID_COLOCATION_ID; char replicationModel = 0; List *distShardTupleList = NIL; int shardIntervalArrayLength = 0; @@ -690,6 +692,27 @@ DistLocalGroupIdRelationId(void) } +/* return oid of pg_dist_colocation relation */ +Oid +DistColocationRelationId(void) +{ + CachedRelationLookup("pg_dist_colocation", &distColocationRelationId); + + return distColocationRelationId; +} + + +/* return oid of pg_dist_colocation_configuration_index index */ +Oid +DistColocationConfigurationIndexId(void) +{ + CachedRelationLookup("pg_dist_colocation_configuration_index", + &distColocationConfigurationIndexId); + + return distColocationConfigurationIndexId; +} + + /* return oid of pg_dist_partition relation */ Oid DistPartitionRelationId(void) @@ -1419,6 +1442,8 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId) distShardPlacementRelationId = InvalidOid; distLocalGroupRelationId = InvalidOid; distNodeRelationId = InvalidOid; + distColocationRelationId = InvalidOid; + distColocationConfigurationIndexId = InvalidOid; distPartitionRelationId = InvalidOid; distPartitionLogicalRelidIndexId = InvalidOid; distPartitionColocationidIndexId = InvalidOid; diff --git a/src/include/distributed/colocation_utils.h b/src/include/distributed/colocation_utils.h index bd983ed43..4ca8b54e1 100644 --- a/src/include/distributed/colocation_utils.h +++ b/src/include/distributed/colocation_utils.h @@ -17,11 +17,13 @@ #define INVALID_COLOCATION_ID 0 -extern uint64 
TableColocationId(Oid distributedTableId); +extern uint32 TableColocationId(Oid distributedTableId); extern bool TablesColocated(Oid leftDistributedTableId, Oid rightDistributedTableId); extern bool ShardsColocated(ShardInterval *leftShardInterval, ShardInterval *rightShardInterval); extern List * ColocatedTableList(Oid distributedTableId); extern List * ColocatedShardIntervalList(ShardInterval *shardInterval); +extern Oid ColocatedTableId(Oid colocationId); + #endif /* COLOCATION_UTILS_H_ */ diff --git a/src/include/distributed/distribution_column.h b/src/include/distributed/distribution_column.h index d8b7aadc2..63d34cea6 100644 --- a/src/include/distributed/distribution_column.h +++ b/src/include/distributed/distribution_column.h @@ -19,8 +19,8 @@ /* Remaining metadata utility functions */ -extern Node * BuildDistributionKeyFromColumnName(Relation distributedRelation, - char *columnName); +extern Var * BuildDistributionKeyFromColumnName(Relation distributedRelation, + char *columnName); extern char * ColumnNameToColumn(Oid relationId, char *columnNodeString); #endif /* DISTRIBUTION_COLUMN_H */ diff --git a/src/include/distributed/master_protocol.h b/src/include/distributed/master_protocol.h index 18c5451a4..77161e5e7 100644 --- a/src/include/distributed/master_protocol.h +++ b/src/include/distributed/master_protocol.h @@ -82,6 +82,7 @@ typedef enum /* Config variables managed via guc.c */ +extern int ShardCount; extern int ShardReplicationFactor; extern int ShardMaxSize; extern int ShardPlacementPolicy; @@ -89,13 +90,20 @@ extern int ShardPlacementPolicy; /* Function declarations local to the distributed module */ extern bool CStoreTable(Oid relationId); +extern uint64 GetNextShardId(void); extern Oid ResolveRelationId(text *relationName); extern List * GetTableDDLEvents(Oid relationId); +extern char ShardStorageType(Oid relationId); extern void CheckDistributedTable(Oid relationId); extern void CreateShardPlacements(Oid relationId, int64 shardId, List *ddlEventList, char *newPlacementOwner, List *workerNodeList, int workerStartIndex, int replicationFactor); extern uint64 UpdateShardStatistics(int64 shardId); +extern void CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount, + int32 replicationFactor); +extern void CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId); +extern bool WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort, + uint64 shardId, char *newShardOwner, List *ddlCommandList); /* Function declarations for generating metadata for shard and placement creation */ extern Datum master_get_table_metadata(PG_FUNCTION_ARGS); diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 35d7e835f..3e64c80c5 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -40,7 +40,7 @@ typedef struct /* pg_dist_partition metadata for this table */ char *partitionKeyString; char partitionMethod; - uint64 colocationId; + uint32 colocationId; char replicationModel; /* pg_dist_shard metadata (variable-length ShardInterval array) for this table */ @@ -66,6 +66,8 @@ extern bool CitusHasBeenLoaded(void); extern HTAB * GetWorkerNodeHash(void); /* relation oids */ +extern Oid DistColocationRelationId(void); +extern Oid DistColocationConfigurationIndexId(void); extern Oid DistPartitionRelationId(void); extern Oid DistShardRelationId(void); extern Oid DistShardPlacementRelationId(void); diff --git a/src/include/distributed/pg_dist_colocation.h 
b/src/include/distributed/pg_dist_colocation.h new file mode 100644 index 000000000..985c1332d --- /dev/null +++ b/src/include/distributed/pg_dist_colocation.h @@ -0,0 +1,47 @@ +/*------------------------------------------------------------------------- + * + * pg_dist_colocation.h + * definition of the relation that holds the colocation information on the + * cluster (pg_dist_colocation). + * + * Copyright (c) 2016, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef PG_DIST_COLOCATION_H +#define PG_DIST_COLOCATION_H + +/* ---------------- + * pg_dist_colocation definition. + * ---------------- + */ +typedef struct FormData_pg_dist_colocation +{ + uint32 colocationid; + uint32 shardcount; + uint32 replicationfactor; + Oid distributioncolumntype; +} FormData_pg_dist_colocation; + +/* ---------------- + * Form_pg_dist_colocation corresponds to a pointer to a tuple with + * the format of pg_dist_colocation relation. + * ---------------- + */ +typedef FormData_pg_dist_colocation *Form_pg_dist_colocation; + +/* ---------------- + * compiler constants for pg_dist_colocation + * ---------------- + */ +#define Natts_pg_dist_colocation 4 +#define Anum_pg_dist_colocation_colocationid 1 +#define Anum_pg_dist_colocation_shardcount 2 +#define Anum_pg_dist_colocation_replicationfactor 3 +#define Anum_pg_dist_colocation_distributioncolumntype 4 + +#define COLOCATIONID_SEQUENCE_NAME "pg_dist_colocationid_seq" + + +#endif /* PG_DIST_COLOCATION_H */ diff --git a/src/include/distributed/pg_dist_partition.h b/src/include/distributed/pg_dist_partition.h index b80b89fcb..11e2063c1 100644 --- a/src/include/distributed/pg_dist_partition.h +++ b/src/include/distributed/pg_dist_partition.h @@ -25,7 +25,7 @@ typedef struct FormData_pg_dist_partition char partmethod; /* partition method; see codes below */ #ifdef CATALOG_VARLEN /* variable-length fields start here */ text partkey; /* partition key expression */ - uint64 colocationid; /* id of the co-location group of particular table belongs to */ + uint32 colocationid; /* id of the co-location group of particular table belongs to */ char repmodel; /* replication model; see codes below */ #endif } FormData_pg_dist_partition; diff --git a/src/test/regress/expected/multi_colocated_shard_transfer.out b/src/test/regress/expected/multi_colocated_shard_transfer.out index dc7c7e125..0cda8e584 100644 --- a/src/test/regress/expected/multi_colocated_shard_transfer.out +++ b/src/test/regress/expected/multi_colocated_shard_transfer.out @@ -19,22 +19,22 @@ WHERE ORDER BY s.shardid; shardid | logicalrelid | nodeport | colocationid | shardstate ---------+---------------+----------+--------------+------------ - 1300000 | table1_group1 | 57638 | 1 | 3 - 1300000 | table1_group1 | 57637 | 1 | 1 - 1300001 | table1_group1 | 57637 | 1 | 1 - 1300001 | table1_group1 | 57638 | 1 | 1 - 1300002 | table1_group1 | 57638 | 1 | 1 - 1300002 | table1_group1 | 57637 | 1 | 1 - 1300003 | table1_group1 | 57637 | 1 | 1 - 1300003 | table1_group1 | 57638 | 1 | 1 - 1300004 | table2_group1 | 57638 | 1 | 3 - 1300004 | table2_group1 | 57637 | 1 | 1 - 1300005 | table2_group1 | 57637 | 1 | 1 - 1300005 | table2_group1 | 57638 | 1 | 1 - 1300006 | table2_group1 | 57638 | 1 | 1 - 1300006 | table2_group1 | 57637 | 1 | 1 - 1300007 | table2_group1 | 57637 | 1 | 1 - 1300007 | table2_group1 | 57638 | 1 | 1 + 1300000 | table1_group1 | 57638 | 1000 | 3 + 1300000 | table1_group1 | 57637 | 1000 | 1 + 1300001 | table1_group1 | 57637 | 1000 | 1 + 1300001 | 
table1_group1 | 57638 | 1000 | 1 + 1300002 | table1_group1 | 57638 | 1000 | 1 + 1300002 | table1_group1 | 57637 | 1000 | 1 + 1300003 | table1_group1 | 57637 | 1000 | 1 + 1300003 | table1_group1 | 57638 | 1000 | 1 + 1300004 | table2_group1 | 57638 | 1000 | 3 + 1300004 | table2_group1 | 57637 | 1000 | 1 + 1300005 | table2_group1 | 57637 | 1000 | 1 + 1300005 | table2_group1 | 57638 | 1000 | 1 + 1300006 | table2_group1 | 57638 | 1000 | 1 + 1300006 | table2_group1 | 57637 | 1000 | 1 + 1300007 | table2_group1 | 57637 | 1000 | 1 + 1300007 | table2_group1 | 57638 | 1000 | 1 (16 rows) -- repair colocated shards @@ -55,22 +55,22 @@ WHERE ORDER BY s.shardid; shardid | logicalrelid | nodeport | colocationid | shardstate ---------+---------------+----------+--------------+------------ - 1300000 | table1_group1 | 57638 | 1 | 1 - 1300000 | table1_group1 | 57637 | 1 | 1 - 1300001 | table1_group1 | 57637 | 1 | 1 - 1300001 | table1_group1 | 57638 | 1 | 1 - 1300002 | table1_group1 | 57638 | 1 | 1 - 1300002 | table1_group1 | 57637 | 1 | 1 - 1300003 | table1_group1 | 57637 | 1 | 1 - 1300003 | table1_group1 | 57638 | 1 | 1 - 1300004 | table2_group1 | 57638 | 1 | 1 - 1300004 | table2_group1 | 57637 | 1 | 1 - 1300005 | table2_group1 | 57637 | 1 | 1 - 1300005 | table2_group1 | 57638 | 1 | 1 - 1300006 | table2_group1 | 57638 | 1 | 1 - 1300006 | table2_group1 | 57637 | 1 | 1 - 1300007 | table2_group1 | 57637 | 1 | 1 - 1300007 | table2_group1 | 57638 | 1 | 1 + 1300000 | table1_group1 | 57638 | 1000 | 1 + 1300000 | table1_group1 | 57637 | 1000 | 1 + 1300001 | table1_group1 | 57637 | 1000 | 1 + 1300001 | table1_group1 | 57638 | 1000 | 1 + 1300002 | table1_group1 | 57638 | 1000 | 1 + 1300002 | table1_group1 | 57637 | 1000 | 1 + 1300003 | table1_group1 | 57637 | 1000 | 1 + 1300003 | table1_group1 | 57638 | 1000 | 1 + 1300004 | table2_group1 | 57638 | 1000 | 1 + 1300004 | table2_group1 | 57637 | 1000 | 1 + 1300005 | table2_group1 | 57637 | 1000 | 1 + 1300005 | table2_group1 | 57638 | 1000 | 1 + 1300006 | table2_group1 | 57638 | 1000 | 1 + 1300006 | table2_group1 | 57637 | 1000 | 1 + 1300007 | table2_group1 | 57637 | 1000 | 1 + 1300007 | table2_group1 | 57638 | 1000 | 1 (16 rows) -- test repairing NOT colocated shard @@ -179,22 +179,22 @@ WHERE ORDER BY s.shardid; shardid | logicalrelid | nodeport | colocationid | shardstate ---------+---------------+----------+--------------+------------ - 1300000 | table1_group1 | 57638 | 1 | 3 - 1300000 | table1_group1 | 57637 | 1 | 3 - 1300001 | table1_group1 | 57637 | 1 | 1 - 1300001 | table1_group1 | 57638 | 1 | 1 - 1300002 | table1_group1 | 57638 | 1 | 1 - 1300002 | table1_group1 | 57637 | 1 | 1 - 1300003 | table1_group1 | 57637 | 1 | 1 - 1300003 | table1_group1 | 57638 | 1 | 1 - 1300004 | table2_group1 | 57638 | 1 | 1 - 1300004 | table2_group1 | 57637 | 1 | 1 - 1300005 | table2_group1 | 57637 | 1 | 1 - 1300005 | table2_group1 | 57638 | 1 | 1 - 1300006 | table2_group1 | 57638 | 1 | 1 - 1300006 | table2_group1 | 57637 | 1 | 1 - 1300007 | table2_group1 | 57637 | 1 | 1 - 1300007 | table2_group1 | 57638 | 1 | 1 + 1300000 | table1_group1 | 57638 | 1000 | 3 + 1300000 | table1_group1 | 57637 | 1000 | 3 + 1300001 | table1_group1 | 57637 | 1000 | 1 + 1300001 | table1_group1 | 57638 | 1000 | 1 + 1300002 | table1_group1 | 57638 | 1000 | 1 + 1300002 | table1_group1 | 57637 | 1000 | 1 + 1300003 | table1_group1 | 57637 | 1000 | 1 + 1300003 | table1_group1 | 57638 | 1000 | 1 + 1300004 | table2_group1 | 57638 | 1000 | 1 + 1300004 | table2_group1 | 57637 | 1000 | 1 + 1300005 | table2_group1 | 57637 
| 1000 | 1 + 1300005 | table2_group1 | 57638 | 1000 | 1 + 1300006 | table2_group1 | 57638 | 1000 | 1 + 1300006 | table2_group1 | 57637 | 1000 | 1 + 1300007 | table2_group1 | 57637 | 1000 | 1 + 1300007 | table2_group1 | 57638 | 1000 | 1 (16 rows) -- repair while all placements of one shard in colocation group is unhealthy @@ -211,21 +211,21 @@ WHERE ORDER BY s.shardid; shardid | logicalrelid | nodeport | colocationid | shardstate ---------+---------------+----------+--------------+------------ - 1300000 | table1_group1 | 57638 | 1 | 3 - 1300000 | table1_group1 | 57637 | 1 | 3 - 1300001 | table1_group1 | 57637 | 1 | 1 - 1300001 | table1_group1 | 57638 | 1 | 1 - 1300002 | table1_group1 | 57638 | 1 | 1 - 1300002 | table1_group1 | 57637 | 1 | 1 - 1300003 | table1_group1 | 57637 | 1 | 1 - 1300003 | table1_group1 | 57638 | 1 | 1 - 1300004 | table2_group1 | 57638 | 1 | 1 - 1300004 | table2_group1 | 57637 | 1 | 1 - 1300005 | table2_group1 | 57637 | 1 | 1 - 1300005 | table2_group1 | 57638 | 1 | 1 - 1300006 | table2_group1 | 57638 | 1 | 1 - 1300006 | table2_group1 | 57637 | 1 | 1 - 1300007 | table2_group1 | 57637 | 1 | 1 - 1300007 | table2_group1 | 57638 | 1 | 1 + 1300000 | table1_group1 | 57638 | 1000 | 3 + 1300000 | table1_group1 | 57637 | 1000 | 3 + 1300001 | table1_group1 | 57637 | 1000 | 1 + 1300001 | table1_group1 | 57638 | 1000 | 1 + 1300002 | table1_group1 | 57638 | 1000 | 1 + 1300002 | table1_group1 | 57637 | 1000 | 1 + 1300003 | table1_group1 | 57637 | 1000 | 1 + 1300003 | table1_group1 | 57638 | 1000 | 1 + 1300004 | table2_group1 | 57638 | 1000 | 1 + 1300004 | table2_group1 | 57637 | 1000 | 1 + 1300005 | table2_group1 | 57637 | 1000 | 1 + 1300005 | table2_group1 | 57638 | 1000 | 1 + 1300006 | table2_group1 | 57638 | 1000 | 1 + 1300006 | table2_group1 | 57637 | 1000 | 1 + 1300007 | table2_group1 | 57637 | 1000 | 1 + 1300007 | table2_group1 | 57638 | 1000 | 1 (16 rows) diff --git a/src/test/regress/expected/multi_colocation_utils.out b/src/test/regress/expected/multi_colocation_utils.out index 1a15044b9..fdff3970c 100644 --- a/src/test/regress/expected/multi_colocation_utils.out +++ b/src/test/regress/expected/multi_colocation_utils.out @@ -4,7 +4,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1300000; -- create test utility function -- =================================================================== CREATE SEQUENCE colocation_test_seq - MINVALUE 1 + MINVALUE 1000 NO CYCLE; /* a very simple UDF that only sets the colocation ids the same * DO NOT USE THIS FUNCTION IN PRODUCTION. 
It manually sets colocationid column of @@ -14,7 +14,7 @@ CREATE OR REPLACE FUNCTION colocation_test_colocate_tables(source_table regclass RETURNS BOOL LANGUAGE plpgsql AS $colocate_tables$ -DECLARE nextid BIGINT; +DECLARE nextid INTEGER; BEGIN SELECT nextval('colocation_test_seq') INTO nextid; @@ -37,7 +37,7 @@ $colocate_tables$; -- create test functions -- =================================================================== CREATE FUNCTION get_table_colocation_id(regclass) - RETURNS BIGINT + RETURNS INTEGER AS 'citus' LANGUAGE C STRICT; CREATE FUNCTION tables_colocated(regclass, regclass) @@ -155,7 +155,7 @@ SELECT colocation_test_colocate_tables('table1_group1', 'table2_group1'); SELECT get_table_colocation_id('table1_group1'); get_table_colocation_id ------------------------- - 1 + 1000 (1 row) SELECT get_table_colocation_id('table5_groupX'); @@ -189,7 +189,7 @@ SELECT tables_colocated('table6_append', 'table6_append'); t (1 row) --- check table co-location with same co-location group +-- check table co-location with same co-location group SELECT tables_colocated('table1_group1', 'table2_group1'); tables_colocated ------------------ @@ -339,3 +339,303 @@ SELECT find_shard_interval_index(1300016); 0 (1 row) +-- check external colocation API +SET citus.shard_count = 2; +CREATE TABLE table1_groupA ( id int ); +SELECT create_distributed_table('table1_groupA', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE table2_groupA ( id int ); +SELECT create_distributed_table('table2_groupA', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +-- change shard replication factor +SET citus.shard_replication_factor = 1; +CREATE TABLE table1_groupB ( id int ); +SELECT create_distributed_table('table1_groupB', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE table2_groupB ( id int ); +SELECT create_distributed_table('table2_groupB', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +-- revert back to default shard replication factor +SET citus.shard_replication_factor to DEFAULT; +-- change partition column type +CREATE TABLE table1_groupC ( id text ); +SELECT create_distributed_table('table1_groupC', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE table2_groupC ( id text ); +SELECT create_distributed_table('table2_groupC', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +-- change shard count +SET citus.shard_count = 4; +CREATE TABLE table1_groupD ( id int ); +SELECT create_distributed_table('table1_groupD', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE table2_groupD ( id int ); +SELECT create_distributed_table('table2_groupD', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +-- try other distribution methods +CREATE TABLE table_append ( id int ); +SELECT create_distributed_table('table_append', 'id', 'append'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE table_range ( id int ); +SELECT create_distributed_table('table_range', 'id', 'range'); + create_distributed_table +-------------------------- + +(1 row) + +-- test foreign table creation +CREATE FOREIGN TABLE table3_groupD ( id int ) SERVER fake_fdw_server; +SELECT create_distributed_table('table3_groupD', 'id'); +NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined + create_distributed_table +-------------------------- + +(1 row) 
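-- Illustrative aside, not part of the patch: the metadata checks that follow suggest a
-- colocation group is identified by its shard count, replication factor, and distribution
-- column type. A rough SQL approximation of that lookup (presumably done internally by the
-- extension in C), with example values matching the test settings above
-- (citus.shard_count = 2, default replication factor 2, integer distribution column):
SELECT colocationid
FROM pg_dist_colocation
WHERE shardcount = 2
  AND replicationfactor = 2
  AND distributioncolumntype = 'integer'::regtype;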
+ +-- check metadata +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid; + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1 | 2 | 2 | 23 + 2 | 2 | 1 | 23 + 3 | 2 | 2 | 25 + 4 | 4 | 2 | 23 +(4 rows) + +SELECT logicalrelid, colocationid FROM pg_dist_partition + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid; + logicalrelid | colocationid +---------------+-------------- + table1_groupa | 1 + table2_groupa | 1 + table1_groupb | 2 + table2_groupb | 2 + table1_groupc | 3 + table2_groupc | 3 + table1_groupd | 4 + table2_groupd | 4 + table3_groupd | 4 +(9 rows) + +-- check effects of dropping tables +DROP TABLE table1_groupA; +SELECT * FROM pg_dist_colocation WHERE colocationid = 1; + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 1 | 2 | 2 | 23 +(1 row) + +-- dropping all tables in a colocation group also deletes the colocation group +DROP TABLE table2_groupA; +SELECT * FROM pg_dist_colocation WHERE colocationid = 1; + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ +(0 rows) + +-- create dropped colocation group again +SET citus.shard_count = 2; +CREATE TABLE table1_groupE ( id int ); +SELECT create_distributed_table('table1_groupE', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE table2_groupE ( id int ); +SELECT create_distributed_table('table2_groupE', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +-- test different table DDL +CREATE TABLE table3_groupE ( dummy_column text, id int ); +SELECT create_distributed_table('table3_groupE', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +-- test different schema +CREATE SCHEMA schema_collocation; +CREATE TABLE schema_collocation.table4_groupE ( id int ); +SELECT create_distributed_table('schema_collocation.table4_groupE', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +-- check worker table schemas +\c - - - :worker_1_port +\d table3_groupE_1300050 +Table "public.table3_groupe_1300050" + Column | Type | Modifiers +--------------+---------+----------- + dummy_column | text | + id | integer | + +\d schema_collocation.table4_groupE_1300052 +Table "schema_collocation.table4_groupe_1300052" + Column | Type | Modifiers +--------+---------+----------- + id | integer | + +\c - - - :master_port +-- check metadata +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid; + colocationid | shardcount | replicationfactor | distributioncolumntype +--------------+------------+-------------------+------------------------ + 2 | 2 | 1 | 23 + 3 | 2 | 2 | 25 + 4 | 4 | 2 | 23 + 5 | 2 | 2 | 23 +(4 rows) + +-- cross check with internal colocation API +SELECT + p1.logicalrelid::regclass AS table1, + p2.logicalrelid::regclass AS table2, + tables_colocated(p1.logicalrelid , p2.logicalrelid) AS colocated +FROM + pg_dist_partition p1, + pg_dist_partition p2 +WHERE + p1.logicalrelid < p2.logicalrelid AND + p1.colocationid != 0 AND + p2.colocationid != 0 AND + tables_colocated(p1.logicalrelid , p2.logicalrelid) is TRUE +ORDER BY + table1, + table2; + table1 | table2 | colocated 
+---------------+----------------------------------+----------- + table1_group1 | table2_group1 | t + table1_groupb | table2_groupb | t + table1_groupc | table2_groupc | t + table1_groupd | table2_groupd | t + table1_groupd | table3_groupd | t + table2_groupd | table3_groupd | t + table1_groupe | table2_groupe | t + table1_groupe | table3_groupe | t + table1_groupe | schema_collocation.table4_groupe | t + table2_groupe | table3_groupe | t + table2_groupe | schema_collocation.table4_groupe | t + table3_groupe | schema_collocation.table4_groupe | t +(12 rows) + +-- check created shards +SELECT + logicalrelid, + pg_dist_shard.shardid AS shardid, + shardstorage, + nodeport, + shardminvalue, + shardmaxvalue +FROM + pg_dist_shard, + pg_dist_shard_placement +WHERE + pg_dist_shard.shardid = pg_dist_shard_placement.shardid AND + pg_dist_shard.shardid >= 1300026 +ORDER BY + logicalrelid, + shardmaxvalue::integer, + shardid, + placementid; + logicalrelid | shardid | shardstorage | nodeport | shardminvalue | shardmaxvalue +----------------------------------+---------+--------------+----------+---------------+--------------- + table1_groupb | 1300026 | t | 57637 | -2147483648 | -1 + table1_groupb | 1300027 | t | 57638 | 0 | 2147483647 + table2_groupb | 1300028 | t | 57637 | -2147483648 | -1 + table2_groupb | 1300029 | t | 57638 | 0 | 2147483647 + table1_groupc | 1300030 | t | 57637 | -2147483648 | -1 + table1_groupc | 1300030 | t | 57638 | -2147483648 | -1 + table1_groupc | 1300031 | t | 57638 | 0 | 2147483647 + table1_groupc | 1300031 | t | 57637 | 0 | 2147483647 + table2_groupc | 1300032 | t | 57638 | -2147483648 | -1 + table2_groupc | 1300032 | t | 57637 | -2147483648 | -1 + table2_groupc | 1300033 | t | 57637 | 0 | 2147483647 + table2_groupc | 1300033 | t | 57638 | 0 | 2147483647 + table1_groupd | 1300034 | t | 57637 | -2147483648 | -1073741825 + table1_groupd | 1300034 | t | 57638 | -2147483648 | -1073741825 + table1_groupd | 1300035 | t | 57638 | -1073741824 | -1 + table1_groupd | 1300035 | t | 57637 | -1073741824 | -1 + table1_groupd | 1300036 | t | 57637 | 0 | 1073741823 + table1_groupd | 1300036 | t | 57638 | 0 | 1073741823 + table1_groupd | 1300037 | t | 57638 | 1073741824 | 2147483647 + table1_groupd | 1300037 | t | 57637 | 1073741824 | 2147483647 + table2_groupd | 1300038 | t | 57638 | -2147483648 | -1073741825 + table2_groupd | 1300038 | t | 57637 | -2147483648 | -1073741825 + table2_groupd | 1300039 | t | 57637 | -1073741824 | -1 + table2_groupd | 1300039 | t | 57638 | -1073741824 | -1 + table2_groupd | 1300040 | t | 57638 | 0 | 1073741823 + table2_groupd | 1300040 | t | 57637 | 0 | 1073741823 + table2_groupd | 1300041 | t | 57637 | 1073741824 | 2147483647 + table2_groupd | 1300041 | t | 57638 | 1073741824 | 2147483647 + table3_groupd | 1300042 | f | 57637 | -2147483648 | -1073741825 + table3_groupd | 1300042 | f | 57638 | -2147483648 | -1073741825 + table3_groupd | 1300043 | f | 57638 | -1073741824 | -1 + table3_groupd | 1300043 | f | 57637 | -1073741824 | -1 + table3_groupd | 1300044 | f | 57637 | 0 | 1073741823 + table3_groupd | 1300044 | f | 57638 | 0 | 1073741823 + table3_groupd | 1300045 | f | 57638 | 1073741824 | 2147483647 + table3_groupd | 1300045 | f | 57637 | 1073741824 | 2147483647 + table1_groupe | 1300046 | t | 57637 | -2147483648 | -1 + table1_groupe | 1300046 | t | 57638 | -2147483648 | -1 + table1_groupe | 1300047 | t | 57638 | 0 | 2147483647 + table1_groupe | 1300047 | t | 57637 | 0 | 2147483647 + table2_groupe | 1300048 | t | 57638 | -2147483648 | -1 + table2_groupe | 
1300048 | t | 57637 | -2147483648 | -1 + table2_groupe | 1300049 | t | 57637 | 0 | 2147483647 + table2_groupe | 1300049 | t | 57638 | 0 | 2147483647 + table3_groupe | 1300050 | t | 57637 | -2147483648 | -1 + table3_groupe | 1300050 | t | 57638 | -2147483648 | -1 + table3_groupe | 1300051 | t | 57638 | 0 | 2147483647 + table3_groupe | 1300051 | t | 57637 | 0 | 2147483647 + schema_collocation.table4_groupe | 1300052 | t | 57638 | -2147483648 | -1 + schema_collocation.table4_groupe | 1300052 | t | 57637 | -2147483648 | -1 + schema_collocation.table4_groupe | 1300053 | t | 57637 | 0 | 2147483647 + schema_collocation.table4_groupe | 1300053 | t | 57638 | 0 | 2147483647 +(52 rows) + diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 7ff35d2ee..ec7a73d5f 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -36,6 +36,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-7'; ALTER EXTENSION citus UPDATE TO '6.0-8'; ALTER EXTENSION citus UPDATE TO '6.0-9'; ALTER EXTENSION citus UPDATE TO '6.0-10'; +ALTER EXTENSION citus UPDATE TO '6.0-11'; -- drop extension an re-create in newest version DROP EXTENSION citus; \c diff --git a/src/test/regress/sql/multi_colocation_utils.sql b/src/test/regress/sql/multi_colocation_utils.sql index 93684c6b5..803280f5d 100644 --- a/src/test/regress/sql/multi_colocation_utils.sql +++ b/src/test/regress/sql/multi_colocation_utils.sql @@ -7,7 +7,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1300000; -- =================================================================== CREATE SEQUENCE colocation_test_seq - MINVALUE 1 + MINVALUE 1000 NO CYCLE; /* a very simple UDF that only sets the colocation ids the same @@ -18,7 +18,7 @@ CREATE OR REPLACE FUNCTION colocation_test_colocate_tables(source_table regclass RETURNS BOOL LANGUAGE plpgsql AS $colocate_tables$ -DECLARE nextid BIGINT; +DECLARE nextid INTEGER; BEGIN SELECT nextval('colocation_test_seq') INTO nextid; @@ -43,7 +43,7 @@ $colocate_tables$; -- =================================================================== CREATE FUNCTION get_table_colocation_id(regclass) - RETURNS BIGINT + RETURNS INTEGER AS 'citus' LANGUAGE C STRICT; @@ -110,7 +110,7 @@ SELECT tables_colocated('table1_group1', 'table1_group1'); SELECT tables_colocated('table5_groupX', 'table5_groupX'); SELECT tables_colocated('table6_append', 'table6_append'); --- check table co-location with same co-location group +-- check table co-location with same co-location group SELECT tables_colocated('table1_group1', 'table2_group1'); -- check table co-location with different co-location group @@ -154,3 +154,138 @@ SELECT find_shard_interval_index(1300001); SELECT find_shard_interval_index(1300002); SELECT find_shard_interval_index(1300003); SELECT find_shard_interval_index(1300016); + + +-- check external colocation API + +SET citus.shard_count = 2; + +CREATE TABLE table1_groupA ( id int ); +SELECT create_distributed_table('table1_groupA', 'id'); + +CREATE TABLE table2_groupA ( id int ); +SELECT create_distributed_table('table2_groupA', 'id'); + +-- change shard replication factor +SET citus.shard_replication_factor = 1; + +CREATE TABLE table1_groupB ( id int ); +SELECT create_distributed_table('table1_groupB', 'id'); + +CREATE TABLE table2_groupB ( id int ); +SELECT create_distributed_table('table2_groupB', 'id'); + +-- revert back to default shard replication factor +SET citus.shard_replication_factor to DEFAULT; + +-- change partition column type 
+CREATE TABLE table1_groupC ( id text ); +SELECT create_distributed_table('table1_groupC', 'id'); + +CREATE TABLE table2_groupC ( id text ); +SELECT create_distributed_table('table2_groupC', 'id'); + +-- change shard count +SET citus.shard_count = 4; + +CREATE TABLE table1_groupD ( id int ); +SELECT create_distributed_table('table1_groupD', 'id'); + +CREATE TABLE table2_groupD ( id int ); +SELECT create_distributed_table('table2_groupD', 'id'); + +-- try other distribution methods +CREATE TABLE table_append ( id int ); +SELECT create_distributed_table('table_append', 'id', 'append'); + +CREATE TABLE table_range ( id int ); +SELECT create_distributed_table('table_range', 'id', 'range'); + +-- test foreign table creation +CREATE FOREIGN TABLE table3_groupD ( id int ) SERVER fake_fdw_server; +SELECT create_distributed_table('table3_groupD', 'id'); + +-- check metadata +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid; + +SELECT logicalrelid, colocationid FROM pg_dist_partition + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid; + +-- check effects of dropping tables +DROP TABLE table1_groupA; +SELECT * FROM pg_dist_colocation WHERE colocationid = 1; + +-- dropping all tables in a colocation group also deletes the colocation group +DROP TABLE table2_groupA; +SELECT * FROM pg_dist_colocation WHERE colocationid = 1; + +-- create dropped colocation group again +SET citus.shard_count = 2; + +CREATE TABLE table1_groupE ( id int ); +SELECT create_distributed_table('table1_groupE', 'id'); + +CREATE TABLE table2_groupE ( id int ); +SELECT create_distributed_table('table2_groupE', 'id'); + +-- test different table DDL +CREATE TABLE table3_groupE ( dummy_column text, id int ); +SELECT create_distributed_table('table3_groupE', 'id'); + +-- test different schema +CREATE SCHEMA schema_collocation; + +CREATE TABLE schema_collocation.table4_groupE ( id int ); +SELECT create_distributed_table('schema_collocation.table4_groupE', 'id'); + +-- check worker table schemas +\c - - - :worker_1_port +\d table3_groupE_1300050 +\d schema_collocation.table4_groupE_1300052 + +\c - - - :master_port + +-- check metadata +SELECT * FROM pg_dist_colocation + WHERE colocationid >= 1 AND colocationid < 1000 + ORDER BY colocationid; + +-- cross check with internal colocation API +SELECT + p1.logicalrelid::regclass AS table1, + p2.logicalrelid::regclass AS table2, + tables_colocated(p1.logicalrelid , p2.logicalrelid) AS colocated +FROM + pg_dist_partition p1, + pg_dist_partition p2 +WHERE + p1.logicalrelid < p2.logicalrelid AND + p1.colocationid != 0 AND + p2.colocationid != 0 AND + tables_colocated(p1.logicalrelid , p2.logicalrelid) is TRUE +ORDER BY + table1, + table2; + +-- check created shards +SELECT + logicalrelid, + pg_dist_shard.shardid AS shardid, + shardstorage, + nodeport, + shardminvalue, + shardmaxvalue +FROM + pg_dist_shard, + pg_dist_shard_placement +WHERE + pg_dist_shard.shardid = pg_dist_shard_placement.shardid AND + pg_dist_shard.shardid >= 1300026 +ORDER BY + logicalrelid, + shardmaxvalue::integer, + shardid, + placementid; diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index bf9a0e7c4..a7aaa0eef 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -41,6 +41,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-7'; ALTER EXTENSION citus UPDATE TO '6.0-8'; ALTER EXTENSION citus UPDATE TO '6.0-9'; ALTER EXTENSION citus UPDATE TO '6.0-10'; +ALTER 
EXTENSION citus UPDATE TO '6.0-11'; -- drop extension an re-create in newest version DROP EXTENSION citus;
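For reference, a minimal usage sketch of the workflow the new regression tests exercise, assuming a coordinator with the 6.0-11 extension installed; the table names ("events", "clicks") and GUC values below are placeholders chosen for illustration and do not appear in the patch.

SET citus.shard_count = 2;
SET citus.shard_replication_factor = 1;

CREATE TABLE events (event_id int);
-- two-argument form uses the default hash distribution, as in the tests above
SELECT create_distributed_table('events', 'event_id');

CREATE TABLE clicks (click_id int);
SELECT create_distributed_table('clicks', 'click_id');

-- With identical shard count, replication factor, and distribution column type,
-- both tables should end up in the same colocation group:
SELECT logicalrelid, colocationid
FROM pg_dist_partition
WHERE logicalrelid IN ('events'::regclass, 'clicks'::regclass);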