mirror of https://github.com/citusdata/citus.git
Merge pull request #892 from citusdata/add_create_reference_table_udf
Add create_reference_table() (pull/897/head)
commit
2dcca0939b
|
@ -8,7 +8,7 @@ EXTENSION = citus
|
||||||
EXTVERSIONS = 5.0 5.0-1 5.0-2 \
|
EXTVERSIONS = 5.0 5.0-1 5.0-2 \
|
||||||
5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \
|
5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \
|
||||||
5.2-1 5.2-2 5.2-3 5.2-4 \
|
5.2-1 5.2-2 5.2-3 5.2-4 \
|
||||||
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11
|
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12
|
||||||
|
|
||||||
# All citus--*.sql files in the source directory
|
# All citus--*.sql files in the source directory
|
||||||
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
|
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
|
||||||
|
@ -80,6 +80,8 @@ $(EXTENSION)--6.0-10.sql: $(EXTENSION)--6.0-9.sql $(EXTENSION)--6.0-9--6.0-10.sq
|
||||||
cat $^ > $@
|
cat $^ > $@
|
||||||
$(EXTENSION)--6.0-11.sql: $(EXTENSION)--6.0-10.sql $(EXTENSION)--6.0-10--6.0-11.sql
|
$(EXTENSION)--6.0-11.sql: $(EXTENSION)--6.0-10.sql $(EXTENSION)--6.0-10--6.0-11.sql
|
||||||
cat $^ > $@
|
cat $^ > $@
|
||||||
|
$(EXTENSION)--6.0-12.sql: $(EXTENSION)--6.0-11.sql $(EXTENSION)--6.0-11--6.0-12.sql
|
||||||
|
cat $^ > $@
|
||||||
|
|
||||||
NO_PGXS = 1
|
NO_PGXS = 1
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,12 @@
|
||||||
|
/* citus--6.0-11--6.0-12.sql */
|
||||||
|
|
||||||
|
SET search_path = 'pg_catalog';
|
||||||
|
|
||||||
|
CREATE FUNCTION create_reference_table(table_name regclass)
|
||||||
|
RETURNS void
|
||||||
|
LANGUAGE C STRICT
|
||||||
|
AS 'MODULE_PATHNAME', $$create_reference_table$$;
|
||||||
|
COMMENT ON FUNCTION create_reference_table(table_name regclass)
|
||||||
|
IS 'create a distributed reference table';
|
||||||
|
|
||||||
|
RESET search_path;
|
|
@ -1,6 +1,6 @@
|
||||||
# Citus extension
|
# Citus extension
|
||||||
comment = 'Citus distributed database'
|
comment = 'Citus distributed database'
|
||||||
default_version = '6.0-11'
|
default_version = '6.0-12'
|
||||||
module_pathname = '$libdir/citus'
|
module_pathname = '$libdir/citus'
|
||||||
relocatable = false
|
relocatable = false
|
||||||
schema = pg_catalog
|
schema = pg_catalog
|
||||||
|
|
|
@ -55,9 +55,8 @@
|
||||||
|
|
||||||
|
|
||||||
/* local function forward declarations */
|
/* local function forward declarations */
|
||||||
static void ConvertToDistributedTable(Oid relationId,
|
static void ConvertToDistributedTable(Oid relationId, char *distributionColumnName,
|
||||||
text *distributionColumnText,
|
char distributionMethod, uint32 colocationId);
|
||||||
Oid distributionMethodOid, uint32 colocationId);
|
|
||||||
static char LookupDistributionMethod(Oid distributionMethodOid);
|
static char LookupDistributionMethod(Oid distributionMethodOid);
|
||||||
static void RecordDistributedRelationDependencies(Oid distributedRelationId,
|
static void RecordDistributedRelationDependencies(Oid distributedRelationId,
|
||||||
Node *distributionKey);
|
Node *distributionKey);
|
||||||
|
@ -74,11 +73,14 @@ static uint32 ColocationId(int shardCount, int replicationFactor,
|
||||||
static uint32 CreateColocationGroup(int shardCount, int replicationFactor,
|
static uint32 CreateColocationGroup(int shardCount, int replicationFactor,
|
||||||
Oid distributionColumnType);
|
Oid distributionColumnType);
|
||||||
static uint32 GetNextColocationId(void);
|
static uint32 GetNextColocationId(void);
|
||||||
|
static void CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
|
||||||
|
int shardCount, int replicationFactor);
|
||||||
|
|
||||||
|
|
||||||
/* exports for SQL callable functions */
|
/* exports for SQL callable functions */
|
||||||
PG_FUNCTION_INFO_V1(master_create_distributed_table);
|
PG_FUNCTION_INFO_V1(master_create_distributed_table);
|
||||||
PG_FUNCTION_INFO_V1(create_distributed_table);
|
PG_FUNCTION_INFO_V1(create_distributed_table);
|
||||||
|
PG_FUNCTION_INFO_V1(create_reference_table);
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -95,8 +97,12 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
|
||||||
text *distributionColumnText = PG_GETARG_TEXT_P(1);
|
text *distributionColumnText = PG_GETARG_TEXT_P(1);
|
||||||
Oid distributionMethodOid = PG_GETARG_OID(2);
|
Oid distributionMethodOid = PG_GETARG_OID(2);
|
||||||
|
|
||||||
ConvertToDistributedTable(distributedRelationId, distributionColumnText,
|
char *distributionColumnName = text_to_cstring(distributionColumnText);
|
||||||
distributionMethodOid, INVALID_COLOCATION_ID);
|
char distributionMethod = LookupDistributionMethod(distributionMethodOid);
|
||||||
|
|
||||||
|
ConvertToDistributedTable(distributedRelationId, distributionColumnName,
|
||||||
|
distributionMethod, INVALID_COLOCATION_ID);
|
||||||
|
|
||||||
PG_RETURN_VOID();
|
PG_RETURN_VOID();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -112,71 +118,48 @@ create_distributed_table(PG_FUNCTION_ARGS)
|
||||||
text *distributionColumnText = PG_GETARG_TEXT_P(1);
|
text *distributionColumnText = PG_GETARG_TEXT_P(1);
|
||||||
Oid distributionMethodOid = PG_GETARG_OID(2);
|
Oid distributionMethodOid = PG_GETARG_OID(2);
|
||||||
|
|
||||||
Relation distributedRelation = NULL;
|
char *distributionColumnName = text_to_cstring(distributionColumnText);
|
||||||
Relation pgDistColocation = NULL;
|
char distributionMethod = LookupDistributionMethod(distributionMethodOid);
|
||||||
Var *distributionColumn = NULL;
|
|
||||||
char *distributionColumnName = NULL;
|
|
||||||
int distributionColumnType = 0;
|
|
||||||
uint32 colocationId = INVALID_COLOCATION_ID;
|
|
||||||
|
|
||||||
/* if distribution method is not hash, just create partition metadata */
|
/* if distribution method is not hash, just create partition metadata */
|
||||||
char distributionMethod = LookupDistributionMethod(distributionMethodOid);
|
|
||||||
if (distributionMethod != DISTRIBUTE_BY_HASH)
|
if (distributionMethod != DISTRIBUTE_BY_HASH)
|
||||||
{
|
{
|
||||||
ConvertToDistributedTable(relationId, distributionColumnText,
|
ConvertToDistributedTable(relationId, distributionColumnName,
|
||||||
distributionMethodOid, INVALID_COLOCATION_ID);
|
distributionMethod, INVALID_COLOCATION_ID);
|
||||||
PG_RETURN_VOID();
|
PG_RETURN_VOID();
|
||||||
}
|
}
|
||||||
|
|
||||||
/* get distribution column type */
|
/* use configuration values for shard count and shard replication factor */
|
||||||
distributionColumnName = text_to_cstring(distributionColumnText);
|
CreateHashDistributedTable(relationId, distributionColumnName, ShardCount,
|
||||||
distributedRelation = relation_open(relationId, AccessShareLock);
|
ShardReplicationFactor);
|
||||||
distributionColumn = BuildDistributionKeyFromColumnName(distributedRelation,
|
|
||||||
distributionColumnName);
|
|
||||||
distributionColumnType = distributionColumn->vartype;
|
|
||||||
|
|
||||||
/*
|
PG_RETURN_VOID();
|
||||||
* Get an exclusive lock on the colocation system catalog. Therefore, we
|
}
|
||||||
* can be sure that there will be no modifications on the colocation table
|
|
||||||
* until this transaction is committed.
|
|
||||||
|
/*
|
||||||
|
* create_reference_table accepts a table and converts it into a hash
|
||||||
|
* distributed table with a single shard, keyed on the table's first column;
|
||||||
|
* the replication factor is the shard_replication_factor configuration value.
|
||||||
*/
|
*/
|
||||||
pgDistColocation = heap_open(DistColocationRelationId(), ExclusiveLock);
|
Datum
|
||||||
|
create_reference_table(PG_FUNCTION_ARGS)
|
||||||
|
{
|
||||||
|
Oid relationId = PG_GETARG_OID(0);
|
||||||
|
int shardCount = 1;
|
||||||
|
AttrNumber firstColumnAttrNumber = 1;
|
||||||
|
|
||||||
/* check for existing colocations */
|
char *firstColumnName = get_attname(relationId, firstColumnAttrNumber);
|
||||||
colocationId = ColocationId(ShardCount, ShardReplicationFactor,
|
if (firstColumnName == NULL)
|
||||||
distributionColumnType);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* If there is a colocation group for the current configuration, get a
|
|
||||||
* colocated table from the group and use its shards as a reference to
|
|
||||||
* create new shards. Otherwise, create a new colocation group and create
|
|
||||||
* shards with the default round robin algorithm.
|
|
||||||
*/
|
|
||||||
if (colocationId != INVALID_COLOCATION_ID)
|
|
||||||
{
|
{
|
||||||
char *relationName = get_rel_name(relationId);
|
char *relationName = get_rel_name(relationId);
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
Oid colocatedTableId = ColocatedTableId(colocationId);
|
errmsg("reference table candidate %s needs to have at"
|
||||||
ConvertToDistributedTable(relationId, distributionColumnText,
|
"least one column", relationName)));
|
||||||
distributionMethodOid, colocationId);
|
|
||||||
|
|
||||||
CreateColocatedShards(relationId, colocatedTableId);
|
|
||||||
ereport(DEBUG2, (errmsg("table %s is added to colocation group: %d",
|
|
||||||
relationName, colocationId)));
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
colocationId = CreateColocationGroup(ShardCount, ShardReplicationFactor,
|
|
||||||
distributionColumnType);
|
|
||||||
ConvertToDistributedTable(relationId, distributionColumnText,
|
|
||||||
distributionMethodOid, colocationId);
|
|
||||||
|
|
||||||
/* use the default way to create shards */
|
|
||||||
CreateShardsWithRoundRobinPolicy(relationId, ShardCount, ShardReplicationFactor);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
heap_close(pgDistColocation, NoLock);
|
CreateHashDistributedTable(relationId, firstColumnName, shardCount,
|
||||||
relation_close(distributedRelation, NoLock);
|
ShardReplicationFactor);
|
||||||
|
|
||||||
PG_RETURN_VOID();
|
PG_RETURN_VOID();
|
||||||
}
|
}
|
||||||
|
@ -194,16 +177,13 @@ create_distributed_table(PG_FUNCTION_ARGS)
|
||||||
* table does not have collated columns.
|
* table does not have collated columns.
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
ConvertToDistributedTable(Oid relationId, text *distributionColumnText,
|
ConvertToDistributedTable(Oid relationId, char *distributionColumnName,
|
||||||
Oid distributionMethodOid, uint32 colocationId)
|
char distributionMethod, uint32 colocationId)
|
||||||
{
|
{
|
||||||
Relation relation = NULL;
|
Relation relation = NULL;
|
||||||
TupleDesc relationDesc = NULL;
|
TupleDesc relationDesc = NULL;
|
||||||
char *relationName = NULL;
|
char *relationName = NULL;
|
||||||
char relationKind = 0;
|
char relationKind = 0;
|
||||||
|
|
||||||
char distributionMethod = LookupDistributionMethod(distributionMethodOid);
|
|
||||||
char *distributionColumnName = text_to_cstring(distributionColumnText);
|
|
||||||
Var *distributionColumn = NULL;
|
Var *distributionColumn = NULL;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -784,3 +764,67 @@ GetNextColocationId()
|
||||||
|
|
||||||
return colocationId;
|
return colocationId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CreateHashDistributedTable creates a hash distributed table with given
|
||||||
|
* shard count and shard replication factor.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
|
||||||
|
int shardCount, int replicationFactor)
|
||||||
|
{
|
||||||
|
Relation distributedRelation = NULL;
|
||||||
|
Relation pgDistColocation = NULL;
|
||||||
|
Var *distributionColumn = NULL;
|
||||||
|
int distributionColumnType = 0;
|
||||||
|
uint32 colocationId = INVALID_COLOCATION_ID;
|
||||||
|
|
||||||
|
/* get distribution column type */
|
||||||
|
distributedRelation = relation_open(relationId, AccessShareLock);
|
||||||
|
distributionColumn = BuildDistributionKeyFromColumnName(distributedRelation,
|
||||||
|
distributionColumnName);
|
||||||
|
distributionColumnType = distributionColumn->vartype;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Get an exclusive lock on the colocation system catalog. Therefore, we
|
||||||
|
* can be sure that there will be no modifications on the colocation table
|
||||||
|
* until this transaction is committed.
|
||||||
|
*/
|
||||||
|
pgDistColocation = heap_open(DistColocationRelationId(), ExclusiveLock);
|
||||||
|
|
||||||
|
/* check for existing colocations */
|
||||||
|
colocationId = ColocationId(shardCount, replicationFactor, distributionColumnType);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If there is a colocation group for the current configuration, get a
|
||||||
|
* colocated table from the group and use its shards as a reference to
|
||||||
|
* create new shards. Otherwise, create a new colocation group and create
|
||||||
|
* shards with the default round robin algorithm.
|
||||||
|
*/
|
||||||
|
if (colocationId != INVALID_COLOCATION_ID)
|
||||||
|
{
|
||||||
|
char *relationName = get_rel_name(relationId);
|
||||||
|
|
||||||
|
Oid colocatedTableId = ColocatedTableId(colocationId);
|
||||||
|
ConvertToDistributedTable(relationId, distributionColumnName,
|
||||||
|
DISTRIBUTE_BY_HASH, colocationId);
|
||||||
|
|
||||||
|
CreateColocatedShards(relationId, colocatedTableId);
|
||||||
|
ereport(DEBUG2, (errmsg("table %s is added to colocation group: %d",
|
||||||
|
relationName, colocationId)));
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
colocationId = CreateColocationGroup(shardCount, replicationFactor,
|
||||||
|
distributionColumnType);
|
||||||
|
ConvertToDistributedTable(relationId, distributionColumnName,
|
||||||
|
DISTRIBUTE_BY_HASH, colocationId);
|
||||||
|
|
||||||
|
/* use the default way to create shards */
|
||||||
|
CreateShardsWithRoundRobinPolicy(relationId, shardCount, replicationFactor);
|
||||||
|
}
|
||||||
|
|
||||||
|
heap_close(pgDistColocation, NoLock);
|
||||||
|
relation_close(distributedRelation, NoLock);
|
||||||
|
}
|
||||||
|
|
|
@ -520,6 +520,20 @@ Table "schema_collocation.table4_groupe_1300052"
|
||||||
id | integer |
|
id | integer |
|
||||||
|
|
||||||
\c - - - :master_port
|
\c - - - :master_port
|
||||||
|
CREATE TABLE table1_groupF ( id int );
|
||||||
|
SELECT create_reference_table('table1_groupF');
|
||||||
|
create_reference_table
|
||||||
|
------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE table2_groupF ( id int );
|
||||||
|
SELECT create_reference_table('table2_groupF');
|
||||||
|
create_reference_table
|
||||||
|
------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
-- check metadata
|
-- check metadata
|
||||||
SELECT * FROM pg_dist_colocation
|
SELECT * FROM pg_dist_colocation
|
||||||
WHERE colocationid >= 1 AND colocationid < 1000
|
WHERE colocationid >= 1 AND colocationid < 1000
|
||||||
|
@ -530,7 +544,8 @@ SELECT * FROM pg_dist_colocation
|
||||||
3 | 2 | 2 | 25
|
3 | 2 | 2 | 25
|
||||||
4 | 4 | 2 | 23
|
4 | 4 | 2 | 23
|
||||||
5 | 2 | 2 | 23
|
5 | 2 | 2 | 23
|
||||||
(4 rows)
|
6 | 1 | 2 | 23
|
||||||
|
(5 rows)
|
||||||
|
|
||||||
-- cross check with internal colocation API
|
-- cross check with internal colocation API
|
||||||
SELECT
|
SELECT
|
||||||
|
@ -562,7 +577,8 @@ ORDER BY
|
||||||
table2_groupe | table3_groupe | t
|
table2_groupe | table3_groupe | t
|
||||||
table2_groupe | schema_collocation.table4_groupe | t
|
table2_groupe | schema_collocation.table4_groupe | t
|
||||||
table3_groupe | schema_collocation.table4_groupe | t
|
table3_groupe | schema_collocation.table4_groupe | t
|
||||||
(12 rows)
|
table1_groupf | table2_groupf | t
|
||||||
|
(13 rows)
|
||||||
|
|
||||||
-- check created shards
|
-- check created shards
|
||||||
SELECT
|
SELECT
|
||||||
|
@ -637,5 +653,9 @@ ORDER BY
|
||||||
schema_collocation.table4_groupe | 1300052 | t | 57637 | -2147483648 | -1
|
schema_collocation.table4_groupe | 1300052 | t | 57637 | -2147483648 | -1
|
||||||
schema_collocation.table4_groupe | 1300053 | t | 57637 | 0 | 2147483647
|
schema_collocation.table4_groupe | 1300053 | t | 57637 | 0 | 2147483647
|
||||||
schema_collocation.table4_groupe | 1300053 | t | 57638 | 0 | 2147483647
|
schema_collocation.table4_groupe | 1300053 | t | 57638 | 0 | 2147483647
|
||||||
(52 rows)
|
table1_groupf | 1300054 | t | 57637 | -2147483648 | 2147483647
|
||||||
|
table1_groupf | 1300054 | t | 57638 | -2147483648 | 2147483647
|
||||||
|
table2_groupf | 1300055 | t | 57638 | -2147483648 | 2147483647
|
||||||
|
table2_groupf | 1300055 | t | 57637 | -2147483648 | 2147483647
|
||||||
|
(56 rows)
|
||||||
|
|
||||||
|
|
|
@ -37,6 +37,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-8';
|
||||||
ALTER EXTENSION citus UPDATE TO '6.0-9';
|
ALTER EXTENSION citus UPDATE TO '6.0-9';
|
||||||
ALTER EXTENSION citus UPDATE TO '6.0-10';
|
ALTER EXTENSION citus UPDATE TO '6.0-10';
|
||||||
ALTER EXTENSION citus UPDATE TO '6.0-11';
|
ALTER EXTENSION citus UPDATE TO '6.0-11';
|
||||||
|
ALTER EXTENSION citus UPDATE TO '6.0-12';
|
||||||
-- drop extension an re-create in newest version
|
-- drop extension an re-create in newest version
|
||||||
DROP EXTENSION citus;
|
DROP EXTENSION citus;
|
||||||
\c
|
\c
|
||||||
|
|
|
@ -248,6 +248,12 @@ SELECT create_distributed_table('schema_collocation.table4_groupE', 'id');
|
||||||
|
|
||||||
\c - - - :master_port
|
\c - - - :master_port
|
||||||
|
|
||||||
|
CREATE TABLE table1_groupF ( id int );
|
||||||
|
SELECT create_reference_table('table1_groupF');
|
||||||
|
|
||||||
|
CREATE TABLE table2_groupF ( id int );
|
||||||
|
SELECT create_reference_table('table2_groupF');
|
||||||
|
|
||||||
-- check metadata
|
-- check metadata
|
||||||
SELECT * FROM pg_dist_colocation
|
SELECT * FROM pg_dist_colocation
|
||||||
WHERE colocationid >= 1 AND colocationid < 1000
|
WHERE colocationid >= 1 AND colocationid < 1000
|
||||||
|
|
|
@ -42,6 +42,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-8';
|
||||||
ALTER EXTENSION citus UPDATE TO '6.0-9';
|
ALTER EXTENSION citus UPDATE TO '6.0-9';
|
||||||
ALTER EXTENSION citus UPDATE TO '6.0-10';
|
ALTER EXTENSION citus UPDATE TO '6.0-10';
|
||||||
ALTER EXTENSION citus UPDATE TO '6.0-11';
|
ALTER EXTENSION citus UPDATE TO '6.0-11';
|
||||||
|
ALTER EXTENSION citus UPDATE TO '6.0-12';
|
||||||
|
|
||||||
-- drop extension an re-create in newest version
|
-- drop extension an re-create in newest version
|
||||||
DROP EXTENSION citus;
|
DROP EXTENSION citus;
|
||||||
|
|
Loading…
Reference in New Issue