Add create_reference_table()

create_reference_table() creates a hash distributed table with shard count
equals to 1 and replication factor equals to shard_replication_factor
configuration value.
pull/892/head
Metin Doslu 2016-10-19 18:20:52 +03:00
parent b6a9b61d32
commit 405335fcee
8 changed files with 152 additions and 66 deletions

View File

@ -8,7 +8,7 @@ EXTENSION = citus
EXTVERSIONS = 5.0 5.0-1 5.0-2 \
5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \
5.2-1 5.2-2 5.2-3 5.2-4 \
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12
# All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@ -80,6 +80,8 @@ $(EXTENSION)--6.0-10.sql: $(EXTENSION)--6.0-9.sql $(EXTENSION)--6.0-9--6.0-10.sq
cat $^ > $@
$(EXTENSION)--6.0-11.sql: $(EXTENSION)--6.0-10.sql $(EXTENSION)--6.0-10--6.0-11.sql
cat $^ > $@
$(EXTENSION)--6.0-12.sql: $(EXTENSION)--6.0-11.sql $(EXTENSION)--6.0-11--6.0-12.sql
cat $^ > $@
NO_PGXS = 1

View File

@ -0,0 +1,12 @@
/* citus--6.0-11--6.0-12.sql */
SET search_path = 'pg_catalog';
CREATE FUNCTION create_reference_table(table_name regclass)
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$create_reference_table$$;
COMMENT ON FUNCTION create_reference_table(table_name regclass)
IS 'create a distributed reference table';
RESET search_path;

View File

@ -1,6 +1,6 @@
# Citus extension
comment = 'Citus distributed database'
default_version = '6.0-11'
default_version = '6.0-12'
module_pathname = '$libdir/citus'
relocatable = false
schema = pg_catalog

View File

@ -55,9 +55,8 @@
/* local function forward declarations */
static void ConvertToDistributedTable(Oid relationId,
text *distributionColumnText,
Oid distributionMethodOid, uint32 colocationId);
static void ConvertToDistributedTable(Oid relationId, char *distributionColumnName,
char distributionMethod, uint32 colocationId);
static char LookupDistributionMethod(Oid distributionMethodOid);
static void RecordDistributedRelationDependencies(Oid distributedRelationId,
Node *distributionKey);
@ -74,11 +73,14 @@ static uint32 ColocationId(int shardCount, int replicationFactor,
static uint32 CreateColocationGroup(int shardCount, int replicationFactor,
Oid distributionColumnType);
static uint32 GetNextColocationId(void);
static void CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
int shardCount, int replicationFactor);
/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(master_create_distributed_table);
PG_FUNCTION_INFO_V1(create_distributed_table);
PG_FUNCTION_INFO_V1(create_reference_table);
/*
@ -95,8 +97,12 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
text *distributionColumnText = PG_GETARG_TEXT_P(1);
Oid distributionMethodOid = PG_GETARG_OID(2);
ConvertToDistributedTable(distributedRelationId, distributionColumnText,
distributionMethodOid, INVALID_COLOCATION_ID);
char *distributionColumnName = text_to_cstring(distributionColumnText);
char distributionMethod = LookupDistributionMethod(distributionMethodOid);
ConvertToDistributedTable(distributedRelationId, distributionColumnName,
distributionMethod, INVALID_COLOCATION_ID);
PG_RETURN_VOID();
}
@ -112,71 +118,48 @@ create_distributed_table(PG_FUNCTION_ARGS)
text *distributionColumnText = PG_GETARG_TEXT_P(1);
Oid distributionMethodOid = PG_GETARG_OID(2);
Relation distributedRelation = NULL;
Relation pgDistColocation = NULL;
Var *distributionColumn = NULL;
char *distributionColumnName = NULL;
int distributionColumnType = 0;
uint32 colocationId = INVALID_COLOCATION_ID;
char *distributionColumnName = text_to_cstring(distributionColumnText);
char distributionMethod = LookupDistributionMethod(distributionMethodOid);
/* if distribution method is not hash, just create partition metadata */
char distributionMethod = LookupDistributionMethod(distributionMethodOid);
if (distributionMethod != DISTRIBUTE_BY_HASH)
{
ConvertToDistributedTable(relationId, distributionColumnText,
distributionMethodOid, INVALID_COLOCATION_ID);
ConvertToDistributedTable(relationId, distributionColumnName,
distributionMethod, INVALID_COLOCATION_ID);
PG_RETURN_VOID();
}
/* get distribution column type */
distributionColumnName = text_to_cstring(distributionColumnText);
distributedRelation = relation_open(relationId, AccessShareLock);
distributionColumn = BuildDistributionKeyFromColumnName(distributedRelation,
distributionColumnName);
distributionColumnType = distributionColumn->vartype;
/* use configuration values for shard count and shard replication factor*/
CreateHashDistributedTable(relationId, distributionColumnName, ShardCount,
ShardReplicationFactor);
/*
* Get an exclusive lock on the colocation system catalog. Therefore, we
* can be sure that there will no modifications on the colocation table
* until this transaction is committed.
PG_RETURN_VOID();
}
/*
* create_reference_table accepts a table and then it creates a distributed
* table which has one shard and replication factor is set to
* shard_replication_factor configuration value.
*/
pgDistColocation = heap_open(DistColocationRelationId(), ExclusiveLock);
Datum
create_reference_table(PG_FUNCTION_ARGS)
{
Oid relationId = PG_GETARG_OID(0);
int shardCount = 1;
AttrNumber firstColumnAttrNumber = 1;
/* check for existing colocations */
colocationId = ColocationId(ShardCount, ShardReplicationFactor,
distributionColumnType);
/*
* If there is a colocation group for the current configuration, get a
* colocated table from the group and use its shards as a reference to
* create new shards. Otherwise, create a new colocation group and create
* shards with the default round robin algorithm.
*/
if (colocationId != INVALID_COLOCATION_ID)
char *firstColumnName = get_attname(relationId, firstColumnAttrNumber);
if (firstColumnName == NULL)
{
char *relationName = get_rel_name(relationId);
Oid colocatedTableId = ColocatedTableId(colocationId);
ConvertToDistributedTable(relationId, distributionColumnText,
distributionMethodOid, colocationId);
CreateColocatedShards(relationId, colocatedTableId);
ereport(DEBUG2, (errmsg("table %s is added to colocation group: %d",
relationName, colocationId)));
}
else
{
colocationId = CreateColocationGroup(ShardCount, ShardReplicationFactor,
distributionColumnType);
ConvertToDistributedTable(relationId, distributionColumnText,
distributionMethodOid, colocationId);
/* use the default way to create shards */
CreateShardsWithRoundRobinPolicy(relationId, ShardCount, ShardReplicationFactor);
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("reference table candidate %s needs to have at"
"least one column", relationName)));
}
heap_close(pgDistColocation, NoLock);
relation_close(distributedRelation, NoLock);
CreateHashDistributedTable(relationId, firstColumnName, shardCount,
ShardReplicationFactor);
PG_RETURN_VOID();
}
@ -194,16 +177,13 @@ create_distributed_table(PG_FUNCTION_ARGS)
* table does not have collated columns.
*/
static void
ConvertToDistributedTable(Oid relationId, text *distributionColumnText,
Oid distributionMethodOid, uint32 colocationId)
ConvertToDistributedTable(Oid relationId, char *distributionColumnName,
char distributionMethod, uint32 colocationId)
{
Relation relation = NULL;
TupleDesc relationDesc = NULL;
char *relationName = NULL;
char relationKind = 0;
char distributionMethod = LookupDistributionMethod(distributionMethodOid);
char *distributionColumnName = text_to_cstring(distributionColumnText);
Var *distributionColumn = NULL;
/*
@ -784,3 +764,67 @@ GetNextColocationId()
return colocationId;
}
/*
* CreateHashDistributedTable creates a hash distributed table with given
* shard count and shard replication factor.
*/
static void
CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
int shardCount, int replicationFactor)
{
Relation distributedRelation = NULL;
Relation pgDistColocation = NULL;
Var *distributionColumn = NULL;
int distributionColumnType = 0;
uint32 colocationId = INVALID_COLOCATION_ID;
/* get distribution column type */
distributedRelation = relation_open(relationId, AccessShareLock);
distributionColumn = BuildDistributionKeyFromColumnName(distributedRelation,
distributionColumnName);
distributionColumnType = distributionColumn->vartype;
/*
* Get an exclusive lock on the colocation system catalog. Therefore, we
* can be sure that there will no modifications on the colocation table
* until this transaction is committed.
*/
pgDistColocation = heap_open(DistColocationRelationId(), ExclusiveLock);
/* check for existing colocations */
colocationId = ColocationId(shardCount, replicationFactor, distributionColumnType);
/*
* If there is a colocation group for the current configuration, get a
* colocated table from the group and use its shards as a reference to
* create new shards. Otherwise, create a new colocation group and create
* shards with the default round robin algorithm.
*/
if (colocationId != INVALID_COLOCATION_ID)
{
char *relationName = get_rel_name(relationId);
Oid colocatedTableId = ColocatedTableId(colocationId);
ConvertToDistributedTable(relationId, distributionColumnName,
DISTRIBUTE_BY_HASH, colocationId);
CreateColocatedShards(relationId, colocatedTableId);
ereport(DEBUG2, (errmsg("table %s is added to colocation group: %d",
relationName, colocationId)));
}
else
{
colocationId = CreateColocationGroup(shardCount, replicationFactor,
distributionColumnType);
ConvertToDistributedTable(relationId, distributionColumnName,
DISTRIBUTE_BY_HASH, colocationId);
/* use the default way to create shards */
CreateShardsWithRoundRobinPolicy(relationId, shardCount, replicationFactor);
}
heap_close(pgDistColocation, NoLock);
relation_close(distributedRelation, NoLock);
}

View File

@ -520,6 +520,20 @@ Table "schema_collocation.table4_groupe_1300052"
id | integer |
\c - - - :master_port
CREATE TABLE table1_groupF ( id int );
SELECT create_reference_table('table1_groupF');
create_reference_table
------------------------
(1 row)
CREATE TABLE table2_groupF ( id int );
SELECT create_reference_table('table2_groupF');
create_reference_table
------------------------
(1 row)
-- check metadata
SELECT * FROM pg_dist_colocation
WHERE colocationid >= 1 AND colocationid < 1000
@ -530,7 +544,8 @@ SELECT * FROM pg_dist_colocation
3 | 2 | 2 | 25
4 | 4 | 2 | 23
5 | 2 | 2 | 23
(4 rows)
6 | 1 | 2 | 23
(5 rows)
-- cross check with internal colocation API
SELECT
@ -562,7 +577,8 @@ ORDER BY
table2_groupe | table3_groupe | t
table2_groupe | schema_collocation.table4_groupe | t
table3_groupe | schema_collocation.table4_groupe | t
(12 rows)
table1_groupf | table2_groupf | t
(13 rows)
-- check created shards
SELECT
@ -637,5 +653,9 @@ ORDER BY
schema_collocation.table4_groupe | 1300052 | t | 57637 | -2147483648 | -1
schema_collocation.table4_groupe | 1300053 | t | 57637 | 0 | 2147483647
schema_collocation.table4_groupe | 1300053 | t | 57638 | 0 | 2147483647
(52 rows)
table1_groupf | 1300054 | t | 57637 | -2147483648 | 2147483647
table1_groupf | 1300054 | t | 57638 | -2147483648 | 2147483647
table2_groupf | 1300055 | t | 57638 | -2147483648 | 2147483647
table2_groupf | 1300055 | t | 57637 | -2147483648 | 2147483647
(56 rows)

View File

@ -37,6 +37,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-8';
ALTER EXTENSION citus UPDATE TO '6.0-9';
ALTER EXTENSION citus UPDATE TO '6.0-10';
ALTER EXTENSION citus UPDATE TO '6.0-11';
ALTER EXTENSION citus UPDATE TO '6.0-12';
-- drop extension an re-create in newest version
DROP EXTENSION citus;
\c

View File

@ -248,6 +248,12 @@ SELECT create_distributed_table('schema_collocation.table4_groupE', 'id');
\c - - - :master_port
CREATE TABLE table1_groupF ( id int );
SELECT create_reference_table('table1_groupF');
CREATE TABLE table2_groupF ( id int );
SELECT create_reference_table('table2_groupF');
-- check metadata
SELECT * FROM pg_dist_colocation
WHERE colocationid >= 1 AND colocationid < 1000

View File

@ -42,6 +42,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-8';
ALTER EXTENSION citus UPDATE TO '6.0-9';
ALTER EXTENSION citus UPDATE TO '6.0-10';
ALTER EXTENSION citus UPDATE TO '6.0-11';
ALTER EXTENSION citus UPDATE TO '6.0-12';
-- drop extension an re-create in newest version
DROP EXTENSION citus;