Adds shard_count parameter to create_distributed_table

pull/4827/head
Halil Ozan Akgul 2021-03-17 09:05:18 +03:00
parent 76a1ddac94
commit a5038046f9
15 changed files with 295 additions and 18 deletions

View File

@ -1072,7 +1072,7 @@ CreateDistributedTableLike(TableConversionState *con)
}
char partitionMethod = PartitionMethod(con->relationId);
CreateDistributedTable(con->newRelationId, newDistributionKey, partitionMethod,
newShardCount, newColocateWith, false);
newShardCount, true, newColocateWith, false);
}
@ -1089,7 +1089,7 @@ CreateCitusTableLike(TableConversionState *con)
}
else if (IsCitusTableType(con->relationId, REFERENCE_TABLE))
{
CreateDistributedTable(con->newRelationId, NULL, DISTRIBUTE_BY_NONE, 0,
CreateDistributedTable(con->newRelationId, NULL, DISTRIBUTE_BY_NONE, 0, false,
NULL, false);
}
else if (IsCitusTableType(con->relationId, CITUS_LOCAL_TABLE))

View File

@ -57,6 +57,7 @@
#include "distributed/reference_table_utils.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h"
#include "distributed/shared_library_init.h"
#include "distributed/worker_protocol.h"
#include "distributed/worker_transaction.h"
#include "distributed/version_compat.h"
@ -96,7 +97,8 @@ static void CreateHashDistributedTableShards(Oid relationId, int shardCount,
Oid colocatedTableId, bool localTableEmpty);
static uint32 ColocationIdForNewTable(Oid relationId, Var *distributionColumn,
char distributionMethod, char replicationModel,
int shardCount, char *colocateWithTableName,
int shardCount, bool shardCountIsStrict,
char *colocateWithTableName,
bool viaDeprecatedAPI);
static void EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn,
char distributionMethod, uint32 colocationId,
@ -175,7 +177,7 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
char distributionMethod = LookupDistributionMethod(distributionMethodOid);
CreateDistributedTable(relationId, distributionColumn, distributionMethod,
ShardCount, colocateWithTableName, viaDeprecatedAPI);
ShardCount, false, colocateWithTableName, viaDeprecatedAPI);
relation_close(relation, NoLock);
@ -191,12 +193,37 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
Datum
create_distributed_table(PG_FUNCTION_ARGS)
{
if (PG_ARGISNULL(0) || PG_ARGISNULL(1) || PG_ARGISNULL(2) || PG_ARGISNULL(3))
{
PG_RETURN_VOID();
}
bool viaDeprecatedAPI = false;
Oid relationId = PG_GETARG_OID(0);
text *distributionColumnText = PG_GETARG_TEXT_P(1);
Oid distributionMethodOid = PG_GETARG_OID(2);
text *colocateWithTableNameText = PG_GETARG_TEXT_P(3);
char *colocateWithTableName = text_to_cstring(colocateWithTableNameText);
bool shardCountIsStrict = false;
int shardCount = ShardCount;
if (!PG_ARGISNULL(4))
{
if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0 &&
pg_strncasecmp(colocateWithTableName, "none", NAMEDATALEN) != 0)
{
ereport(ERROR, (errmsg("Cannot use colocate_with with a table "
"and shard_count at the same time")));
}
shardCount = PG_GETARG_INT32(4);
/*
* if shard_count parameter is given than we have to
* make sure table has that many shards
*/
shardCountIsStrict = true;
}
CheckCitusVersion(ERROR);
@ -225,10 +252,16 @@ create_distributed_table(PG_FUNCTION_ARGS)
Assert(distributionColumn != NULL);
char distributionMethod = LookupDistributionMethod(distributionMethodOid);
char *colocateWithTableName = text_to_cstring(colocateWithTableNameText);
if (shardCount < 1 || shardCount > MAX_SHARD_COUNT)
{
ereport(ERROR, (errmsg("%d is outside the valid range for "
"parameter \"shard_count\" (1 .. %d)",
shardCount, MAX_SHARD_COUNT)));
}
CreateDistributedTable(relationId, distributionColumn, distributionMethod,
ShardCount, colocateWithTableName, viaDeprecatedAPI);
shardCount, shardCountIsStrict, colocateWithTableName,
viaDeprecatedAPI);
PG_RETURN_VOID();
}
@ -284,7 +317,7 @@ create_reference_table(PG_FUNCTION_ARGS)
}
CreateDistributedTable(relationId, distributionColumn, DISTRIBUTE_BY_NONE,
ShardCount, colocateWithTableName, viaDeprecatedAPI);
ShardCount, false, colocateWithTableName, viaDeprecatedAPI);
PG_RETURN_VOID();
}
@ -341,7 +374,8 @@ EnsureRelationExists(Oid relationId)
*/
void
CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributionMethod,
int shardCount, char *colocateWithTableName, bool viaDeprecatedAPI)
int shardCount, bool shardCountIsStrict,
char *colocateWithTableName, bool viaDeprecatedAPI)
{
/*
* EnsureTableNotDistributed errors out when relation is a citus table but
@ -416,7 +450,8 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio
*/
uint32 colocationId = ColocationIdForNewTable(relationId, distributionColumn,
distributionMethod, replicationModel,
shardCount, colocateWithTableName,
shardCount, shardCountIsStrict,
colocateWithTableName,
viaDeprecatedAPI);
EnsureRelationCanBeDistributed(relationId, distributionColumn, distributionMethod,
@ -489,7 +524,7 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio
foreach_oid(partitionRelationId, partitionList)
{
CreateDistributedTable(partitionRelationId, distributionColumn,
distributionMethod, shardCount,
distributionMethod, shardCount, false,
colocateWithTableName, viaDeprecatedAPI);
}
}
@ -694,8 +729,8 @@ CreateHashDistributedTableShards(Oid relationId, int shardCount,
static uint32
ColocationIdForNewTable(Oid relationId, Var *distributionColumn,
char distributionMethod, char replicationModel,
int shardCount, char *colocateWithTableName,
bool viaDeprecatedAPI)
int shardCount, bool shardCountIsStrict,
char *colocateWithTableName, bool viaDeprecatedAPI)
{
uint32 colocationId = INVALID_COLOCATION_ID;
@ -742,6 +777,27 @@ ColocationIdForNewTable(Oid relationId, Var *distributionColumn,
distributionColumnType,
distributionColumnCollation);
/*
* if the shardCount is strict then we check if the shard count
* of the colocated table is actually shardCount
*/
if (shardCountIsStrict && colocationId != INVALID_COLOCATION_ID)
{
Oid colocatedTableId = ColocatedTableId(colocationId);
if (colocatedTableId != InvalidOid)
{
CitusTableCacheEntry *cacheEntry =
GetCitusTableCacheEntry(colocatedTableId);
int colocatedTableShardCount = cacheEntry->shardIntervalArrayLength;
if (colocatedTableShardCount != shardCount)
{
colocationId = INVALID_COLOCATION_ID;
}
}
}
if (colocationId == INVALID_COLOCATION_ID)
{
colocationId = CreateColocationGroup(shardCount, ShardReplicationFactor,

View File

@ -323,7 +323,7 @@ PostprocessCreateTableStmtPartitionOf(CreateStmt *createStatement, const
bool viaDeprecatedAPI = false;
CreateDistributedTable(relationId, parentDistributionColumn,
parentDistributionMethod, ShardCount,
parentDistributionMethod, ShardCount, false,
parentRelationName, viaDeprecatedAPI);
}
}
@ -397,7 +397,7 @@ PostprocessAlterTableStmtAttachPartition(AlterTableStmt *alterTableStatement,
bool viaDeprecatedAPI = false;
CreateDistributedTable(partitionRelationId, distributionColumn,
distributionMethod, ShardCount,
distributionMethod, ShardCount, false,
parentRelationName, viaDeprecatedAPI);
}
}

View File

@ -1,3 +1,4 @@
-- citus--10.0-3--10.1-1
#include "../../columnar/sql/columnar--10.0-3--10.1-1.sql"
#include "udfs/create_distributed_table/10.1-1.sql";

View File

@ -1,3 +1,17 @@
-- citus--10.1-1--10.0-2
#include "../../../columnar/sql/downgrades/columnar--10.1-1--10.0-3.sql"
DROP FUNCTION pg_catalog.create_distributed_table(regclass, text, citus.distribution_type, text, int);
CREATE FUNCTION create_distributed_table(table_name regclass,
distribution_column text,
distribution_type citus.distribution_type DEFAULT 'hash',
colocate_with text DEFAULT 'default')
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$create_distributed_table$$;
COMMENT ON FUNCTION create_distributed_table(table_name regclass,
distribution_column text,
distribution_type citus.distribution_type,
colocate_with text)
IS 'creates a distributed table';

View File

@ -0,0 +1,15 @@
DROP FUNCTION create_distributed_table(regclass, text, citus.distribution_type, text);
CREATE OR REPLACE FUNCTION create_distributed_table(table_name regclass,
distribution_column text,
distribution_type citus.distribution_type DEFAULT 'hash',
colocate_with text DEFAULT 'default',
shard_count int DEFAULT NULL)
RETURNS void
LANGUAGE C
AS 'MODULE_PATHNAME', $$create_distributed_table$$;
COMMENT ON FUNCTION create_distributed_table(table_name regclass,
distribution_column text,
distribution_type citus.distribution_type,
colocate_with text,
shard_count int)
IS 'creates a distributed table';

View File

@ -0,0 +1,15 @@
DROP FUNCTION create_distributed_table(regclass, text, citus.distribution_type, text);
CREATE OR REPLACE FUNCTION create_distributed_table(table_name regclass,
distribution_column text,
distribution_type citus.distribution_type DEFAULT 'hash',
colocate_with text DEFAULT 'default',
shard_count int DEFAULT NULL)
RETURNS void
LANGUAGE C
AS 'MODULE_PATHNAME', $$create_distributed_table$$;
COMMENT ON FUNCTION create_distributed_table(table_name regclass,
distribution_column text,
distribution_type citus.distribution_type,
colocate_with text,
shard_count int)
IS 'creates a distributed table';

View File

@ -226,7 +226,8 @@ extern void UpdateShardPlacementState(uint64 placementId, char shardState);
extern void DeleteShardPlacementRow(uint64 placementId);
extern void CreateDistributedTable(Oid relationId, Var *distributionColumn,
char distributionMethod, int shardCount,
char *colocateWithTableName, bool viaDeprecatedAPI);
bool shardCountIsStrict, char *colocateWithTableName,
bool viaDeprecatedAPI);
extern void CreateTruncateTrigger(Oid relationId);
extern TableConversionReturn * UndistributeTable(TableConversionParameters *params);

View File

@ -339,3 +339,71 @@ SELECT create_distributed_table('pg_class', 'relname');
ERROR: cannot create a citus table from a catalog table
SELECT create_reference_table('pg_class');
ERROR: cannot create a citus table from a catalog table
-- test shard_count parameter
-- first set citus.shard_count so we know the parameter works
SET citus.shard_count TO 10;
CREATE TABLE shard_count_table (a INT, b TEXT);
CREATE TABLE shard_count_table_2 (a INT, b TEXT);
SELECT create_distributed_table('shard_count_table', 'a', shard_count:=5);
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT shard_count FROM citus_tables WHERE table_name::text = 'shard_count_table';
shard_count
---------------------------------------------------------------------
5
(1 row)
SELECT create_distributed_table('shard_count_table_2', 'a', shard_count:=0);
ERROR: 0 is outside the valid range for parameter "shard_count" (1 .. 64000)
SELECT create_distributed_table('shard_count_table_2', 'a', shard_count:=-100);
ERROR: -100 is outside the valid range for parameter "shard_count" (1 .. 64000)
SELECT create_distributed_table('shard_count_table_2', 'a', shard_count:=64001);
ERROR: 64001 is outside the valid range for parameter "shard_count" (1 .. 64000)
-- shard count with colocate with table should error
SELECT create_distributed_table('shard_count_table_2', 'a', shard_count:=12, colocate_with:='shard_count');
ERROR: Cannot use colocate_with with a table and shard_count at the same time
-- none should not error
SELECT create_distributed_table('shard_count_table_2', 'a', shard_count:=12, colocate_with:='none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
DROP TABLE shard_count_table, shard_count_table_2;
-- test a shard count with an empty default colocation group
-- ensure there is no colocation group with 13 shards
SELECT count(*) FROM pg_dist_colocation WHERE shardcount = 13;
count
---------------------------------------------------------------------
0
(1 row)
SET citus.shard_count TO 13;
CREATE TABLE shard_count_drop_table (a int);
SELECT create_distributed_table('shard_count_drop_table', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
DROP TABLE shard_count_drop_table;
CREATE TABLE shard_count_table_3 (a int);
SELECT create_distributed_table('shard_count_table_3', 'a', shard_count:=13);
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT shardcount FROM pg_dist_colocation WHERE colocationid IN
(
SELECT colocation_id FROM citus_tables WHERE table_name = 'shard_count_table_3'::regclass
);
shardcount
---------------------------------------------------------------------
13
(1 row)
DROP TABLE shard_count_table_3;

View File

@ -848,3 +848,41 @@ DROP SCHEMA sc5 CASCADE;
NOTICE: drop cascades to table sc5.alter_replica_table
DROP SCHEMA sc6 CASCADE;
NOTICE: drop cascades to table sc6.alter_replica_table
CREATE TABLE shard_col_table (a INT, b INT);
CREATE TABLE shard_col_table_2 (a INT, b INT);
SELECT create_distributed_table('shard_col_table', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- ensure there are no colocation group with 11 shards
SELECT count(*) FROM pg_dist_colocation WHERE shardcount = 11;
count
---------------------------------------------------------------------
0
(1 row)
UPDATE pg_dist_colocation SET shardcount = 11 WHERE colocationid IN
(
SELECT colocation_id FROM citus_tables WHERE table_name = 'shard_col_table'::regclass
);
SELECT create_distributed_table('shard_col_table_2', 'a', shard_count:=11);
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- ensure shard_col_table and shard_col_table_2 are not colocated
SELECT a.colocation_id = b.colocation_id FROM citus_tables a, citus_tables b
WHERE a.table_name = 'shard_col_table'::regclass AND b.table_name = 'shard_col_table_2'::regclass;
?column?
---------------------------------------------------------------------
f
(1 row)
UPDATE pg_dist_colocation SET shardcount = 12 WHERE colocationid IN
(
SELECT colocation_id FROM citus_tables WHERE table_name = 'shard_col_table'::regclass
);
DROP TABLE shard_col_table, shard_col_table_2;

View File

@ -562,7 +562,9 @@ ALTER EXTENSION citus UPDATE TO '10.1-1';
SELECT * FROM print_extension_changes();
previous_object | current_object
---------------------------------------------------------------------
(0 rows)
function create_distributed_table(regclass,text,citus.distribution_type,text) |
| function create_distributed_table(regclass,text,citus.distribution_type,text,integer)
(2 rows)
DROP TABLE prev_objects, extension_diff;
-- show running version

View File

@ -106,7 +106,7 @@ ORDER BY 1;
function coord_combine_agg_ffunc(internal,oid,cstring,anyelement)
function coord_combine_agg_sfunc(internal,oid,cstring,anyelement)
function create_distributed_function(regprocedure,text,text)
function create_distributed_table(regclass,text,citus.distribution_type,text)
function create_distributed_table(regclass,text,citus.distribution_type,text,integer)
function create_intermediate_result(text,text)
function create_reference_table(regclass)
function distributed_tables_colocated(regclass,regclass)

View File

@ -102,7 +102,7 @@ ORDER BY 1;
function coord_combine_agg_ffunc(internal,oid,cstring,anyelement)
function coord_combine_agg_sfunc(internal,oid,cstring,anyelement)
function create_distributed_function(regprocedure,text,text)
function create_distributed_table(regclass,text,citus.distribution_type,text)
function create_distributed_table(regclass,text,citus.distribution_type,text,integer)
function create_intermediate_result(text,text)
function create_reference_table(regclass)
function distributed_tables_colocated(regclass,regclass)

View File

@ -224,3 +224,45 @@ END;
-- distributing catalog tables is not supported
SELECT create_distributed_table('pg_class', 'relname');
SELECT create_reference_table('pg_class');
-- test shard_count parameter
-- first set citus.shard_count so we know the parameter works
SET citus.shard_count TO 10;
CREATE TABLE shard_count_table (a INT, b TEXT);
CREATE TABLE shard_count_table_2 (a INT, b TEXT);
SELECT create_distributed_table('shard_count_table', 'a', shard_count:=5);
SELECT shard_count FROM citus_tables WHERE table_name::text = 'shard_count_table';
SELECT create_distributed_table('shard_count_table_2', 'a', shard_count:=0);
SELECT create_distributed_table('shard_count_table_2', 'a', shard_count:=-100);
SELECT create_distributed_table('shard_count_table_2', 'a', shard_count:=64001);
-- shard count with colocate with table should error
SELECT create_distributed_table('shard_count_table_2', 'a', shard_count:=12, colocate_with:='shard_count');
-- none should not error
SELECT create_distributed_table('shard_count_table_2', 'a', shard_count:=12, colocate_with:='none');
DROP TABLE shard_count_table, shard_count_table_2;
-- test a shard count with an empty default colocation group
-- ensure there is no colocation group with 13 shards
SELECT count(*) FROM pg_dist_colocation WHERE shardcount = 13;
SET citus.shard_count TO 13;
CREATE TABLE shard_count_drop_table (a int);
SELECT create_distributed_table('shard_count_drop_table', 'a');
DROP TABLE shard_count_drop_table;
CREATE TABLE shard_count_table_3 (a int);
SELECT create_distributed_table('shard_count_table_3', 'a', shard_count:=13);
SELECT shardcount FROM pg_dist_colocation WHERE colocationid IN
(
SELECT colocation_id FROM citus_tables WHERE table_name = 'shard_count_table_3'::regclass
);
DROP TABLE shard_count_table_3;

View File

@ -436,3 +436,28 @@ DROP SCHEMA sc3 CASCADE;
DROP SCHEMA sc4 CASCADE;
DROP SCHEMA sc5 CASCADE;
DROP SCHEMA sc6 CASCADE;
CREATE TABLE shard_col_table (a INT, b INT);
CREATE TABLE shard_col_table_2 (a INT, b INT);
SELECT create_distributed_table('shard_col_table', 'a');
-- ensure there are no colocation group with 11 shards
SELECT count(*) FROM pg_dist_colocation WHERE shardcount = 11;
UPDATE pg_dist_colocation SET shardcount = 11 WHERE colocationid IN
(
SELECT colocation_id FROM citus_tables WHERE table_name = 'shard_col_table'::regclass
);
SELECT create_distributed_table('shard_col_table_2', 'a', shard_count:=11);
-- ensure shard_col_table and shard_col_table_2 are not colocated
SELECT a.colocation_id = b.colocation_id FROM citus_tables a, citus_tables b
WHERE a.table_name = 'shard_col_table'::regclass AND b.table_name = 'shard_col_table_2'::regclass;
UPDATE pg_dist_colocation SET shardcount = 12 WHERE colocationid IN
(
SELECT colocation_id FROM citus_tables WHERE table_name = 'shard_col_table'::regclass
);
DROP TABLE shard_col_table, shard_col_table_2;