diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile
index beb90d024..564b3f702 100644
--- a/src/backend/distributed/Makefile
+++ b/src/backend/distributed/Makefile
@@ -8,7 +8,7 @@ EXTENSION = citus
 EXTVERSIONS = 5.0 5.0-1 5.0-2 \
 	5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \
 	5.2-1 5.2-2 5.2-3 5.2-4 \
-	6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11
+	6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12
 
 # All citus--*.sql files in the source directory
 DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@@ -80,6 +80,8 @@ $(EXTENSION)--6.0-10.sql: $(EXTENSION)--6.0-9.sql $(EXTENSION)--6.0-9--6.0-10.sq
 	cat $^ > $@
 $(EXTENSION)--6.0-11.sql: $(EXTENSION)--6.0-10.sql $(EXTENSION)--6.0-10--6.0-11.sql
 	cat $^ > $@
+$(EXTENSION)--6.0-12.sql: $(EXTENSION)--6.0-11.sql $(EXTENSION)--6.0-11--6.0-12.sql
+	cat $^ > $@
 
 NO_PGXS = 1
 
diff --git a/src/backend/distributed/citus--6.0-11--6.0-12.sql b/src/backend/distributed/citus--6.0-11--6.0-12.sql
new file mode 100644
index 000000000..be050dac7
--- /dev/null
+++ b/src/backend/distributed/citus--6.0-11--6.0-12.sql
@@ -0,0 +1,12 @@
+/* citus--6.0-11--6.0-12.sql */
+
+SET search_path = 'pg_catalog';
+
+CREATE FUNCTION create_reference_table(table_name regclass)
+    RETURNS void
+    LANGUAGE C STRICT
+    AS 'MODULE_PATHNAME', $$create_reference_table$$;
+COMMENT ON FUNCTION create_reference_table(table_name regclass)
+    IS 'create a distributed reference table';
+
+RESET search_path;
diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control
index 726af25ca..b2deb379c 100644
--- a/src/backend/distributed/citus.control
+++ b/src/backend/distributed/citus.control
@@ -1,6 +1,6 @@
 # Citus extension
 comment = 'Citus distributed database'
-default_version = '6.0-11'
+default_version = '6.0-12'
 module_pathname = '$libdir/citus'
 relocatable = false
 schema = pg_catalog
diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c
index 3ac7a5f75..1fe76da3d 100644
--- a/src/backend/distributed/commands/create_distributed_table.c
+++ b/src/backend/distributed/commands/create_distributed_table.c
@@ -55,9 +55,8 @@
 
 
 /* local function forward declarations */
-static void ConvertToDistributedTable(Oid relationId,
-									  text *distributionColumnText,
-									  Oid distributionMethodOid, uint32 colocationId);
+static void ConvertToDistributedTable(Oid relationId, char *distributionColumnName,
+									  char distributionMethod, uint32 colocationId);
 static char LookupDistributionMethod(Oid distributionMethodOid);
 static void RecordDistributedRelationDependencies(Oid distributedRelationId,
 												  Node *distributionKey);
@@ -74,11 +73,14 @@ static uint32 ColocationId(int shardCount, int replicationFactor,
 static uint32 CreateColocationGroup(int shardCount, int replicationFactor,
 									Oid distributionColumnType);
 static uint32 GetNextColocationId(void);
+static void CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
+									   int shardCount, int replicationFactor);
 
 
 /* exports for SQL callable functions */
 PG_FUNCTION_INFO_V1(master_create_distributed_table);
 PG_FUNCTION_INFO_V1(create_distributed_table);
+PG_FUNCTION_INFO_V1(create_reference_table);
 
 
 /*
@@ -95,8 +97,12 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
 	text *distributionColumnText = PG_GETARG_TEXT_P(1);
 	Oid distributionMethodOid = PG_GETARG_OID(2);
 
-	ConvertToDistributedTable(distributedRelationId, distributionColumnText,
-							  distributionMethodOid, INVALID_COLOCATION_ID);
+	char *distributionColumnName = text_to_cstring(distributionColumnText);
+	char distributionMethod = LookupDistributionMethod(distributionMethodOid);
+
+	ConvertToDistributedTable(distributedRelationId, distributionColumnName,
+							  distributionMethod, INVALID_COLOCATION_ID);
+
 	PG_RETURN_VOID();
 }
 
@@ -112,71 +118,48 @@ create_distributed_table(PG_FUNCTION_ARGS)
 	text *distributionColumnText = PG_GETARG_TEXT_P(1);
 	Oid distributionMethodOid = PG_GETARG_OID(2);
 
-	Relation distributedRelation = NULL;
-	Relation pgDistColocation = NULL;
-	Var *distributionColumn = NULL;
-	char *distributionColumnName = NULL;
-	int distributionColumnType = 0;
-	uint32 colocationId = INVALID_COLOCATION_ID;
+	char *distributionColumnName = text_to_cstring(distributionColumnText);
+	char distributionMethod = LookupDistributionMethod(distributionMethodOid);
 
 	/* if distribution method is not hash, just create partition metadata */
-	char distributionMethod = LookupDistributionMethod(distributionMethodOid);
 	if (distributionMethod != DISTRIBUTE_BY_HASH)
 	{
-		ConvertToDistributedTable(relationId, distributionColumnText,
-								  distributionMethodOid, INVALID_COLOCATION_ID);
+		ConvertToDistributedTable(relationId, distributionColumnName,
+								  distributionMethod, INVALID_COLOCATION_ID);
 		PG_RETURN_VOID();
 	}
 
-	/* get distribution column type */
-	distributionColumnName = text_to_cstring(distributionColumnText);
-	distributedRelation = relation_open(relationId, AccessShareLock);
-	distributionColumn = BuildDistributionKeyFromColumnName(distributedRelation,
-															distributionColumnName);
-	distributionColumnType = distributionColumn->vartype;
+	/* use configuration values for shard count and shard replication factor */
+	CreateHashDistributedTable(relationId, distributionColumnName, ShardCount,
+							   ShardReplicationFactor);
 
-	/*
-	 * Get an exclusive lock on the colocation system catalog. Therefore, we
-	 * can be sure that there will no modifications on the colocation table
-	 * until this transaction is committed.
-	 */
-	pgDistColocation = heap_open(DistColocationRelationId(), ExclusiveLock);
+	PG_RETURN_VOID();
+}
 
-	/* check for existing colocations */
-	colocationId = ColocationId(ShardCount, ShardReplicationFactor,
-								distributionColumnType);
 
-	/*
-	 * If there is a colocation group for the current configuration, get a
-	 * colocated table from the group and use its shards as a reference to
-	 * create new shards. Otherwise, create a new colocation group and create
-	 * shards with the default round robin algorithm.
-	 */
-	if (colocationId != INVALID_COLOCATION_ID)
+/*
+ * create_reference_table accepts a table and creates a hash distributed
+ * table with a single shard, using the table's first column as distribution
+ * column and the shard_replication_factor setting as replication factor.
+ */
+Datum
+create_reference_table(PG_FUNCTION_ARGS)
+{
+	Oid relationId = PG_GETARG_OID(0);
+	int shardCount = 1;
+	AttrNumber firstColumnAttrNumber = 1;
+
+	char *firstColumnName = get_attname(relationId, firstColumnAttrNumber);
+	if (firstColumnName == NULL)
 	{
 		char *relationName = get_rel_name(relationId);
-
-		Oid colocatedTableId = ColocatedTableId(colocationId);
-		ConvertToDistributedTable(relationId, distributionColumnText,
-								  distributionMethodOid, colocationId);
-
-		CreateColocatedShards(relationId, colocatedTableId);
-		ereport(DEBUG2, (errmsg("table %s is added to colocation group: %d",
-								relationName, colocationId)));
-	}
-	else
-	{
-		colocationId = CreateColocationGroup(ShardCount, ShardReplicationFactor,
-											 distributionColumnType);
-		ConvertToDistributedTable(relationId, distributionColumnText,
-								  distributionMethodOid, colocationId);
-
-		/* use the default way to create shards */
-		CreateShardsWithRoundRobinPolicy(relationId, ShardCount, ShardReplicationFactor);
+		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+						errmsg("reference table candidate %s needs to have at "
+							   "least one column", relationName)));
 	}
 
-	heap_close(pgDistColocation, NoLock);
-	relation_close(distributedRelation, NoLock);
+	CreateHashDistributedTable(relationId, firstColumnName, shardCount,
+							   ShardReplicationFactor);
 
 	PG_RETURN_VOID();
 }
@@ -194,16 +177,13 @@ create_distributed_table(PG_FUNCTION_ARGS)
  * table does not have collated columns.
  */
 static void
-ConvertToDistributedTable(Oid relationId, text *distributionColumnText,
-						  Oid distributionMethodOid, uint32 colocationId)
+ConvertToDistributedTable(Oid relationId, char *distributionColumnName,
+						  char distributionMethod, uint32 colocationId)
 {
 	Relation relation = NULL;
 	TupleDesc relationDesc = NULL;
 	char *relationName = NULL;
 	char relationKind = 0;
-
-	char distributionMethod = LookupDistributionMethod(distributionMethodOid);
-	char *distributionColumnName = text_to_cstring(distributionColumnText);
 	Var *distributionColumn = NULL;
 
 	/*
@@ -784,3 +764,67 @@ GetNextColocationId()
 
 	return colocationId;
 }
+
+
+/*
+ * CreateHashDistributedTable converts the relation into a hash distributed
+ * table and creates its shards, reusing a matching colocation group if any.
+ */
+static void
+CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
+						   int shardCount, int replicationFactor)
+{
+	Relation distributedRelation = NULL;
+	Relation pgDistColocation = NULL;
+	Var *distributionColumn = NULL;
+	int distributionColumnType = 0;
+	uint32 colocationId = INVALID_COLOCATION_ID;
+
+	/* get distribution column type */
+	distributedRelation = relation_open(relationId, AccessShareLock);
+	distributionColumn = BuildDistributionKeyFromColumnName(distributedRelation,
+															distributionColumnName);
+	distributionColumnType = distributionColumn->vartype;
+
+	/*
+	 * Get an exclusive lock on the colocation system catalog. Therefore, we
+	 * can be sure that there will be no modifications on the colocation table
+	 * until this transaction is committed.
+	 */
+	pgDistColocation = heap_open(DistColocationRelationId(), ExclusiveLock);
+
+	/* check for existing colocations */
+	colocationId = ColocationId(shardCount, replicationFactor, distributionColumnType);
+
+	/*
+	 * If there is a colocation group for the current configuration, get a
+	 * colocated table from the group and use its shards as a reference to
+	 * create new shards. Otherwise, create a new colocation group and create
+	 * shards with the default round robin algorithm.
+	 */
+	if (colocationId != INVALID_COLOCATION_ID)
+	{
+		char *relationName = get_rel_name(relationId);
+
+		Oid colocatedTableId = ColocatedTableId(colocationId);
+		ConvertToDistributedTable(relationId, distributionColumnName,
+								  DISTRIBUTE_BY_HASH, colocationId);
+
+		CreateColocatedShards(relationId, colocatedTableId);
+		ereport(DEBUG2, (errmsg("table %s is added to colocation group: %d",
+								relationName, colocationId)));
+	}
+	else
+	{
+		colocationId = CreateColocationGroup(shardCount, replicationFactor,
+											 distributionColumnType);
+		ConvertToDistributedTable(relationId, distributionColumnName,
+								  DISTRIBUTE_BY_HASH, colocationId);
+
+		/* use the default way to create shards */
+		CreateShardsWithRoundRobinPolicy(relationId, shardCount, replicationFactor);
+	}
+
+	heap_close(pgDistColocation, NoLock);
+	relation_close(distributedRelation, NoLock);
+}
diff --git a/src/test/regress/expected/multi_colocation_utils.out b/src/test/regress/expected/multi_colocation_utils.out
index fdff3970c..2b269ea82 100644
--- a/src/test/regress/expected/multi_colocation_utils.out
+++ b/src/test/regress/expected/multi_colocation_utils.out
@@ -520,6 +520,20 @@ Table "schema_collocation.table4_groupe_1300052"
  id     | integer | 
 
 \c - - - :master_port
+CREATE TABLE table1_groupF ( id int );
+SELECT create_reference_table('table1_groupF');
+ create_reference_table 
+------------------------
+ 
+(1 row)
+
+CREATE TABLE table2_groupF ( id int );
+SELECT create_reference_table('table2_groupF');
+ create_reference_table 
+------------------------
+ 
+(1 row)
+
 -- check metadata
 SELECT * FROM pg_dist_colocation
     WHERE colocationid >= 1 AND colocationid < 1000
@@ -530,7 +544,8 @@ SELECT * FROM pg_dist_colocation
             3 |          2 |                 2 |                     25
             4 |          4 |                 2 |                     23
             5 |          2 |                 2 |                     23
-(4 rows)
+            6 |          1 |                 2 |                     23
+(5 rows)
 
 -- cross check with internal colocation API
 SELECT
@@ -562,7 +577,8 @@ ORDER BY
  table2_groupe | table3_groupe                    | t
  table2_groupe | schema_collocation.table4_groupe | t
  table3_groupe | schema_collocation.table4_groupe | t
-(12 rows)
+ table1_groupf | table2_groupf                    | t
+(13 rows)
 
 -- check created shards
 SELECT
@@ -637,5 +653,9 @@ ORDER BY
  schema_collocation.table4_groupe | 1300052 | t            |    57637 | -2147483648   | -1
  schema_collocation.table4_groupe | 1300053 | t            |    57637 | 0             | 2147483647
  schema_collocation.table4_groupe | 1300053 | t            |    57638 | 0             | 2147483647
-(52 rows)
+ table1_groupf                    | 1300054 | t            |    57637 | -2147483648   | 2147483647
+ table1_groupf                    | 1300054 | t            |    57638 | -2147483648   | 2147483647
+ table2_groupf                    | 1300055 | t            |    57638 | -2147483648   | 2147483647
+ table2_groupf                    | 1300055 | t            |    57637 | -2147483648   | 2147483647
+(56 rows)
 
diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out
index ec7a73d5f..9c858fc6a 100644
--- a/src/test/regress/expected/multi_extension.out
+++ b/src/test/regress/expected/multi_extension.out
@@ -37,6 +37,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-8';
 ALTER EXTENSION citus UPDATE TO '6.0-9';
 ALTER EXTENSION citus UPDATE TO '6.0-10';
 ALTER EXTENSION citus UPDATE TO '6.0-11';
+ALTER EXTENSION citus UPDATE TO '6.0-12';
 -- drop extension an re-create in newest version
 DROP EXTENSION citus;
 \c
diff --git a/src/test/regress/sql/multi_colocation_utils.sql b/src/test/regress/sql/multi_colocation_utils.sql
index 803280f5d..8f714c4e8 100644
--- a/src/test/regress/sql/multi_colocation_utils.sql
+++ b/src/test/regress/sql/multi_colocation_utils.sql
@@ -248,6 +248,12 @@ SELECT create_distributed_table('schema_collocation.table4_groupE', 'id');
 
 \c - - - :master_port
 
+CREATE TABLE table1_groupF ( id int );
+SELECT create_reference_table('table1_groupF');
+
+CREATE TABLE table2_groupF ( id int );
+SELECT create_reference_table('table2_groupF');
+
 -- check metadata
 SELECT * FROM pg_dist_colocation
     WHERE colocationid >= 1 AND colocationid < 1000
diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql
index a7aaa0eef..e04b7cca0 100644
--- a/src/test/regress/sql/multi_extension.sql
+++ b/src/test/regress/sql/multi_extension.sql
@@ -42,6 +42,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-8';
 ALTER EXTENSION citus UPDATE TO '6.0-9';
 ALTER EXTENSION citus UPDATE TO '6.0-10';
 ALTER EXTENSION citus UPDATE TO '6.0-11';
+ALTER EXTENSION citus UPDATE TO '6.0-12';
 
 -- drop extension an re-create in newest version
 DROP EXTENSION citus;