diff --git a/src/backend/distributed/commands/alter_table.c b/src/backend/distributed/commands/alter_table.c index afac591a8..f109cdbd5 100644 --- a/src/backend/distributed/commands/alter_table.c +++ b/src/backend/distributed/commands/alter_table.c @@ -1072,7 +1072,7 @@ CreateDistributedTableLike(TableConversionState *con) } char partitionMethod = PartitionMethod(con->relationId); CreateDistributedTable(con->newRelationId, newDistributionKey, partitionMethod, - newShardCount, newColocateWith, false); + newShardCount, true, newColocateWith, false); } @@ -1089,7 +1089,7 @@ CreateCitusTableLike(TableConversionState *con) } else if (IsCitusTableType(con->relationId, REFERENCE_TABLE)) { - CreateDistributedTable(con->newRelationId, NULL, DISTRIBUTE_BY_NONE, 0, + CreateDistributedTable(con->newRelationId, NULL, DISTRIBUTE_BY_NONE, 0, false, NULL, false); } else if (IsCitusTableType(con->relationId, CITUS_LOCAL_TABLE)) diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 8d424a04f..dcdd292db 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -57,6 +57,7 @@ #include "distributed/reference_table_utils.h" #include "distributed/relation_access_tracking.h" #include "distributed/remote_commands.h" +#include "distributed/shared_library_init.h" #include "distributed/worker_protocol.h" #include "distributed/worker_transaction.h" #include "distributed/version_compat.h" @@ -96,7 +97,8 @@ static void CreateHashDistributedTableShards(Oid relationId, int shardCount, Oid colocatedTableId, bool localTableEmpty); static uint32 ColocationIdForNewTable(Oid relationId, Var *distributionColumn, char distributionMethod, char replicationModel, - int shardCount, char *colocateWithTableName, + int shardCount, bool shardCountIsStrict, + char *colocateWithTableName, bool viaDeprecatedAPI); static void EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn, char distributionMethod, uint32 colocationId, @@ -175,7 +177,7 @@ master_create_distributed_table(PG_FUNCTION_ARGS) char distributionMethod = LookupDistributionMethod(distributionMethodOid); CreateDistributedTable(relationId, distributionColumn, distributionMethod, - ShardCount, colocateWithTableName, viaDeprecatedAPI); + ShardCount, false, colocateWithTableName, viaDeprecatedAPI); relation_close(relation, NoLock); @@ -191,12 +193,37 @@ master_create_distributed_table(PG_FUNCTION_ARGS) Datum create_distributed_table(PG_FUNCTION_ARGS) { + if (PG_ARGISNULL(0) || PG_ARGISNULL(1) || PG_ARGISNULL(2) || PG_ARGISNULL(3)) + { + PG_RETURN_VOID(); + } bool viaDeprecatedAPI = false; Oid relationId = PG_GETARG_OID(0); text *distributionColumnText = PG_GETARG_TEXT_P(1); Oid distributionMethodOid = PG_GETARG_OID(2); text *colocateWithTableNameText = PG_GETARG_TEXT_P(3); + char *colocateWithTableName = text_to_cstring(colocateWithTableNameText); + + bool shardCountIsStrict = false; + int shardCount = ShardCount; + if (!PG_ARGISNULL(4)) + { + if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0 && + pg_strncasecmp(colocateWithTableName, "none", NAMEDATALEN) != 0) + { + ereport(ERROR, (errmsg("Cannot use colocate_with with a table " + "and shard_count at the same time"))); + } + + shardCount = PG_GETARG_INT32(4); + + /* + * if shard_count parameter is given than we have to + * make sure table has that many shards + */ + shardCountIsStrict = true; + } CheckCitusVersion(ERROR); @@ -225,10 +252,16 @@ create_distributed_table(PG_FUNCTION_ARGS) Assert(distributionColumn != NULL); char distributionMethod = LookupDistributionMethod(distributionMethodOid); - char *colocateWithTableName = text_to_cstring(colocateWithTableNameText); + if (shardCount < 1 || shardCount > MAX_SHARD_COUNT) + { + ereport(ERROR, (errmsg("%d is outside the valid range for " + "parameter \"shard_count\" (1 .. %d)", + shardCount, MAX_SHARD_COUNT))); + } CreateDistributedTable(relationId, distributionColumn, distributionMethod, - ShardCount, colocateWithTableName, viaDeprecatedAPI); + shardCount, shardCountIsStrict, colocateWithTableName, + viaDeprecatedAPI); PG_RETURN_VOID(); } @@ -284,7 +317,7 @@ create_reference_table(PG_FUNCTION_ARGS) } CreateDistributedTable(relationId, distributionColumn, DISTRIBUTE_BY_NONE, - ShardCount, colocateWithTableName, viaDeprecatedAPI); + ShardCount, false, colocateWithTableName, viaDeprecatedAPI); PG_RETURN_VOID(); } @@ -341,7 +374,8 @@ EnsureRelationExists(Oid relationId) */ void CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributionMethod, - int shardCount, char *colocateWithTableName, bool viaDeprecatedAPI) + int shardCount, bool shardCountIsStrict, + char *colocateWithTableName, bool viaDeprecatedAPI) { /* * EnsureTableNotDistributed errors out when relation is a citus table but @@ -416,7 +450,8 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio */ uint32 colocationId = ColocationIdForNewTable(relationId, distributionColumn, distributionMethod, replicationModel, - shardCount, colocateWithTableName, + shardCount, shardCountIsStrict, + colocateWithTableName, viaDeprecatedAPI); EnsureRelationCanBeDistributed(relationId, distributionColumn, distributionMethod, @@ -489,7 +524,7 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio foreach_oid(partitionRelationId, partitionList) { CreateDistributedTable(partitionRelationId, distributionColumn, - distributionMethod, shardCount, + distributionMethod, shardCount, false, colocateWithTableName, viaDeprecatedAPI); } } @@ -694,8 +729,8 @@ CreateHashDistributedTableShards(Oid relationId, int shardCount, static uint32 ColocationIdForNewTable(Oid relationId, Var *distributionColumn, char distributionMethod, char replicationModel, - int shardCount, char *colocateWithTableName, - bool viaDeprecatedAPI) + int shardCount, bool shardCountIsStrict, + char *colocateWithTableName, bool viaDeprecatedAPI) { uint32 colocationId = INVALID_COLOCATION_ID; @@ -742,6 +777,27 @@ ColocationIdForNewTable(Oid relationId, Var *distributionColumn, distributionColumnType, distributionColumnCollation); + /* + * if the shardCount is strict then we check if the shard count + * of the colocated table is actually shardCount + */ + if (shardCountIsStrict && colocationId != INVALID_COLOCATION_ID) + { + Oid colocatedTableId = ColocatedTableId(colocationId); + + if (colocatedTableId != InvalidOid) + { + CitusTableCacheEntry *cacheEntry = + GetCitusTableCacheEntry(colocatedTableId); + int colocatedTableShardCount = cacheEntry->shardIntervalArrayLength; + + if (colocatedTableShardCount != shardCount) + { + colocationId = INVALID_COLOCATION_ID; + } + } + } + if (colocationId == INVALID_COLOCATION_ID) { colocationId = CreateColocationGroup(shardCount, ShardReplicationFactor, diff --git a/src/backend/distributed/commands/table.c b/src/backend/distributed/commands/table.c index a0f5cb09b..41d9fc3ba 100644 --- a/src/backend/distributed/commands/table.c +++ b/src/backend/distributed/commands/table.c @@ -323,7 +323,7 @@ PostprocessCreateTableStmtPartitionOf(CreateStmt *createStatement, const bool viaDeprecatedAPI = false; CreateDistributedTable(relationId, parentDistributionColumn, - parentDistributionMethod, ShardCount, + parentDistributionMethod, ShardCount, false, parentRelationName, viaDeprecatedAPI); } } @@ -397,7 +397,7 @@ PostprocessAlterTableStmtAttachPartition(AlterTableStmt *alterTableStatement, bool viaDeprecatedAPI = false; CreateDistributedTable(partitionRelationId, distributionColumn, - distributionMethod, ShardCount, + distributionMethod, ShardCount, false, parentRelationName, viaDeprecatedAPI); } } diff --git a/src/backend/distributed/sql/citus--10.0-3--10.1-1.sql b/src/backend/distributed/sql/citus--10.0-3--10.1-1.sql index 867041e18..6784e6950 100644 --- a/src/backend/distributed/sql/citus--10.0-3--10.1-1.sql +++ b/src/backend/distributed/sql/citus--10.0-3--10.1-1.sql @@ -1,3 +1,4 @@ -- citus--10.0-3--10.1-1 #include "../../columnar/sql/columnar--10.0-3--10.1-1.sql" +#include "udfs/create_distributed_table/10.1-1.sql"; diff --git a/src/backend/distributed/sql/downgrades/citus--10.1-1--10.0-3.sql b/src/backend/distributed/sql/downgrades/citus--10.1-1--10.0-3.sql index 9612cf9d4..c4949049e 100644 --- a/src/backend/distributed/sql/downgrades/citus--10.1-1--10.0-3.sql +++ b/src/backend/distributed/sql/downgrades/citus--10.1-1--10.0-3.sql @@ -1,3 +1,17 @@ -- citus--10.1-1--10.0-2 #include "../../../columnar/sql/downgrades/columnar--10.1-1--10.0-3.sql" + +DROP FUNCTION pg_catalog.create_distributed_table(regclass, text, citus.distribution_type, text, int); +CREATE FUNCTION create_distributed_table(table_name regclass, + distribution_column text, + distribution_type citus.distribution_type DEFAULT 'hash', + colocate_with text DEFAULT 'default') + RETURNS void + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$create_distributed_table$$; +COMMENT ON FUNCTION create_distributed_table(table_name regclass, + distribution_column text, + distribution_type citus.distribution_type, + colocate_with text) + IS 'creates a distributed table'; diff --git a/src/backend/distributed/sql/udfs/create_distributed_table/10.1-1.sql b/src/backend/distributed/sql/udfs/create_distributed_table/10.1-1.sql new file mode 100644 index 000000000..dacb6bd93 --- /dev/null +++ b/src/backend/distributed/sql/udfs/create_distributed_table/10.1-1.sql @@ -0,0 +1,15 @@ +DROP FUNCTION create_distributed_table(regclass, text, citus.distribution_type, text); +CREATE OR REPLACE FUNCTION create_distributed_table(table_name regclass, + distribution_column text, + distribution_type citus.distribution_type DEFAULT 'hash', + colocate_with text DEFAULT 'default', + shard_count int DEFAULT NULL) + RETURNS void + LANGUAGE C + AS 'MODULE_PATHNAME', $$create_distributed_table$$; +COMMENT ON FUNCTION create_distributed_table(table_name regclass, + distribution_column text, + distribution_type citus.distribution_type, + colocate_with text, + shard_count int) + IS 'creates a distributed table'; diff --git a/src/backend/distributed/sql/udfs/create_distributed_table/latest.sql b/src/backend/distributed/sql/udfs/create_distributed_table/latest.sql new file mode 100644 index 000000000..dacb6bd93 --- /dev/null +++ b/src/backend/distributed/sql/udfs/create_distributed_table/latest.sql @@ -0,0 +1,15 @@ +DROP FUNCTION create_distributed_table(regclass, text, citus.distribution_type, text); +CREATE OR REPLACE FUNCTION create_distributed_table(table_name regclass, + distribution_column text, + distribution_type citus.distribution_type DEFAULT 'hash', + colocate_with text DEFAULT 'default', + shard_count int DEFAULT NULL) + RETURNS void + LANGUAGE C + AS 'MODULE_PATHNAME', $$create_distributed_table$$; +COMMENT ON FUNCTION create_distributed_table(table_name regclass, + distribution_column text, + distribution_type citus.distribution_type, + colocate_with text, + shard_count int) + IS 'creates a distributed table'; diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 4b453966f..e86dc2f73 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -226,7 +226,8 @@ extern void UpdateShardPlacementState(uint64 placementId, char shardState); extern void DeleteShardPlacementRow(uint64 placementId); extern void CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributionMethod, int shardCount, - char *colocateWithTableName, bool viaDeprecatedAPI); + bool shardCountIsStrict, char *colocateWithTableName, + bool viaDeprecatedAPI); extern void CreateTruncateTrigger(Oid relationId); extern TableConversionReturn * UndistributeTable(TableConversionParameters *params); diff --git a/src/test/regress/expected/multi_create_table.out b/src/test/regress/expected/multi_create_table.out index ba527c975..dddef65eb 100644 --- a/src/test/regress/expected/multi_create_table.out +++ b/src/test/regress/expected/multi_create_table.out @@ -339,3 +339,71 @@ SELECT create_distributed_table('pg_class', 'relname'); ERROR: cannot create a citus table from a catalog table SELECT create_reference_table('pg_class'); ERROR: cannot create a citus table from a catalog table +-- test shard_count parameter +-- first set citus.shard_count so we know the parameter works +SET citus.shard_count TO 10; +CREATE TABLE shard_count_table (a INT, b TEXT); +CREATE TABLE shard_count_table_2 (a INT, b TEXT); +SELECT create_distributed_table('shard_count_table', 'a', shard_count:=5); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT shard_count FROM citus_tables WHERE table_name::text = 'shard_count_table'; + shard_count +--------------------------------------------------------------------- + 5 +(1 row) + +SELECT create_distributed_table('shard_count_table_2', 'a', shard_count:=0); +ERROR: 0 is outside the valid range for parameter "shard_count" (1 .. 64000) +SELECT create_distributed_table('shard_count_table_2', 'a', shard_count:=-100); +ERROR: -100 is outside the valid range for parameter "shard_count" (1 .. 64000) +SELECT create_distributed_table('shard_count_table_2', 'a', shard_count:=64001); +ERROR: 64001 is outside the valid range for parameter "shard_count" (1 .. 64000) +-- shard count with colocate with table should error +SELECT create_distributed_table('shard_count_table_2', 'a', shard_count:=12, colocate_with:='shard_count'); +ERROR: Cannot use colocate_with with a table and shard_count at the same time +-- none should not error +SELECT create_distributed_table('shard_count_table_2', 'a', shard_count:=12, colocate_with:='none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +DROP TABLE shard_count_table, shard_count_table_2; +-- test a shard count with an empty default colocation group +-- ensure there is no colocation group with 13 shards +SELECT count(*) FROM pg_dist_colocation WHERE shardcount = 13; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SET citus.shard_count TO 13; +CREATE TABLE shard_count_drop_table (a int); +SELECT create_distributed_table('shard_count_drop_table', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +DROP TABLE shard_count_drop_table; +CREATE TABLE shard_count_table_3 (a int); +SELECT create_distributed_table('shard_count_table_3', 'a', shard_count:=13); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT shardcount FROM pg_dist_colocation WHERE colocationid IN +( + SELECT colocation_id FROM citus_tables WHERE table_name = 'shard_count_table_3'::regclass +); + shardcount +--------------------------------------------------------------------- + 13 +(1 row) + +DROP TABLE shard_count_table_3; diff --git a/src/test/regress/expected/multi_create_table_superuser.out b/src/test/regress/expected/multi_create_table_superuser.out index 5e8927546..ff34e8ef3 100644 --- a/src/test/regress/expected/multi_create_table_superuser.out +++ b/src/test/regress/expected/multi_create_table_superuser.out @@ -848,3 +848,41 @@ DROP SCHEMA sc5 CASCADE; NOTICE: drop cascades to table sc5.alter_replica_table DROP SCHEMA sc6 CASCADE; NOTICE: drop cascades to table sc6.alter_replica_table +CREATE TABLE shard_col_table (a INT, b INT); +CREATE TABLE shard_col_table_2 (a INT, b INT); +SELECT create_distributed_table('shard_col_table', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- ensure there are no colocation group with 11 shards +SELECT count(*) FROM pg_dist_colocation WHERE shardcount = 11; + count +--------------------------------------------------------------------- + 0 +(1 row) + +UPDATE pg_dist_colocation SET shardcount = 11 WHERE colocationid IN +( + SELECT colocation_id FROM citus_tables WHERE table_name = 'shard_col_table'::regclass +); +SELECT create_distributed_table('shard_col_table_2', 'a', shard_count:=11); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- ensure shard_col_table and shard_col_table_2 are not colocated +SELECT a.colocation_id = b.colocation_id FROM citus_tables a, citus_tables b + WHERE a.table_name = 'shard_col_table'::regclass AND b.table_name = 'shard_col_table_2'::regclass; + ?column? +--------------------------------------------------------------------- + f +(1 row) + +UPDATE pg_dist_colocation SET shardcount = 12 WHERE colocationid IN +( + SELECT colocation_id FROM citus_tables WHERE table_name = 'shard_col_table'::regclass +); +DROP TABLE shard_col_table, shard_col_table_2; diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index e51bdacf6..72256f545 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -562,7 +562,9 @@ ALTER EXTENSION citus UPDATE TO '10.1-1'; SELECT * FROM print_extension_changes(); previous_object | current_object --------------------------------------------------------------------- -(0 rows) + function create_distributed_table(regclass,text,citus.distribution_type,text) | + | function create_distributed_table(regclass,text,citus.distribution_type,text,integer) +(2 rows) DROP TABLE prev_objects, extension_diff; -- show running version diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index d25d2858e..d8f3a6998 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -106,7 +106,7 @@ ORDER BY 1; function coord_combine_agg_ffunc(internal,oid,cstring,anyelement) function coord_combine_agg_sfunc(internal,oid,cstring,anyelement) function create_distributed_function(regprocedure,text,text) - function create_distributed_table(regclass,text,citus.distribution_type,text) + function create_distributed_table(regclass,text,citus.distribution_type,text,integer) function create_intermediate_result(text,text) function create_reference_table(regclass) function distributed_tables_colocated(regclass,regclass) diff --git a/src/test/regress/expected/upgrade_list_citus_objects_0.out b/src/test/regress/expected/upgrade_list_citus_objects_0.out index 6df1b6a71..d69060a7d 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects_0.out +++ b/src/test/regress/expected/upgrade_list_citus_objects_0.out @@ -102,7 +102,7 @@ ORDER BY 1; function coord_combine_agg_ffunc(internal,oid,cstring,anyelement) function coord_combine_agg_sfunc(internal,oid,cstring,anyelement) function create_distributed_function(regprocedure,text,text) - function create_distributed_table(regclass,text,citus.distribution_type,text) + function create_distributed_table(regclass,text,citus.distribution_type,text,integer) function create_intermediate_result(text,text) function create_reference_table(regclass) function distributed_tables_colocated(regclass,regclass) diff --git a/src/test/regress/sql/multi_create_table.sql b/src/test/regress/sql/multi_create_table.sql index 974bc0696..b347987eb 100644 --- a/src/test/regress/sql/multi_create_table.sql +++ b/src/test/regress/sql/multi_create_table.sql @@ -224,3 +224,45 @@ END; -- distributing catalog tables is not supported SELECT create_distributed_table('pg_class', 'relname'); SELECT create_reference_table('pg_class'); + + +-- test shard_count parameter +-- first set citus.shard_count so we know the parameter works +SET citus.shard_count TO 10; + +CREATE TABLE shard_count_table (a INT, b TEXT); +CREATE TABLE shard_count_table_2 (a INT, b TEXT); + +SELECT create_distributed_table('shard_count_table', 'a', shard_count:=5); +SELECT shard_count FROM citus_tables WHERE table_name::text = 'shard_count_table'; + +SELECT create_distributed_table('shard_count_table_2', 'a', shard_count:=0); +SELECT create_distributed_table('shard_count_table_2', 'a', shard_count:=-100); +SELECT create_distributed_table('shard_count_table_2', 'a', shard_count:=64001); + +-- shard count with colocate with table should error +SELECT create_distributed_table('shard_count_table_2', 'a', shard_count:=12, colocate_with:='shard_count'); + +-- none should not error +SELECT create_distributed_table('shard_count_table_2', 'a', shard_count:=12, colocate_with:='none'); + +DROP TABLE shard_count_table, shard_count_table_2; + +-- test a shard count with an empty default colocation group +-- ensure there is no colocation group with 13 shards +SELECT count(*) FROM pg_dist_colocation WHERE shardcount = 13; +SET citus.shard_count TO 13; + +CREATE TABLE shard_count_drop_table (a int); +SELECT create_distributed_table('shard_count_drop_table', 'a'); +DROP TABLE shard_count_drop_table; + +CREATE TABLE shard_count_table_3 (a int); +SELECT create_distributed_table('shard_count_table_3', 'a', shard_count:=13); + +SELECT shardcount FROM pg_dist_colocation WHERE colocationid IN +( + SELECT colocation_id FROM citus_tables WHERE table_name = 'shard_count_table_3'::regclass +); + +DROP TABLE shard_count_table_3; diff --git a/src/test/regress/sql/multi_create_table_superuser.sql b/src/test/regress/sql/multi_create_table_superuser.sql index ce59f25e5..57a32878c 100644 --- a/src/test/regress/sql/multi_create_table_superuser.sql +++ b/src/test/regress/sql/multi_create_table_superuser.sql @@ -436,3 +436,28 @@ DROP SCHEMA sc3 CASCADE; DROP SCHEMA sc4 CASCADE; DROP SCHEMA sc5 CASCADE; DROP SCHEMA sc6 CASCADE; + +CREATE TABLE shard_col_table (a INT, b INT); +CREATE TABLE shard_col_table_2 (a INT, b INT); + +SELECT create_distributed_table('shard_col_table', 'a'); + +-- ensure there are no colocation group with 11 shards +SELECT count(*) FROM pg_dist_colocation WHERE shardcount = 11; +UPDATE pg_dist_colocation SET shardcount = 11 WHERE colocationid IN +( + SELECT colocation_id FROM citus_tables WHERE table_name = 'shard_col_table'::regclass +); + +SELECT create_distributed_table('shard_col_table_2', 'a', shard_count:=11); + +-- ensure shard_col_table and shard_col_table_2 are not colocated +SELECT a.colocation_id = b.colocation_id FROM citus_tables a, citus_tables b + WHERE a.table_name = 'shard_col_table'::regclass AND b.table_name = 'shard_col_table_2'::regclass; + +UPDATE pg_dist_colocation SET shardcount = 12 WHERE colocationid IN +( + SELECT colocation_id FROM citus_tables WHERE table_name = 'shard_col_table'::regclass +); + +DROP TABLE shard_col_table, shard_col_table_2;