Final refactoring

pull/867/head
Metin Doslu 2016-10-18 11:50:05 +03:00
parent 58ac477ffb
commit d3e7d9dc8d
6 changed files with 87 additions and 80 deletions

View File

@ -1,3 +1,5 @@
/* citus--6.0-10--6.0-11.sql */
SET search_path = 'pg_catalog'; SET search_path = 'pg_catalog';
CREATE SEQUENCE citus.pg_dist_colocationid_seq CREATE SEQUENCE citus.pg_dist_colocationid_seq
@ -77,11 +79,13 @@ BEGIN
-- ensure all shards are dropped -- ensure all shards are dropped
PERFORM master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name); PERFORM master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name);
-- get colocation group
SELECT colocationid INTO table_colocation_id FROM pg_dist_partition WHERE logicalrelid = v_obj.objid; SELECT colocationid INTO table_colocation_id FROM pg_dist_partition WHERE logicalrelid = v_obj.objid;
-- delete partition entry -- delete partition entry
DELETE FROM pg_dist_partition WHERE logicalrelid = v_obj.objid; DELETE FROM pg_dist_partition WHERE logicalrelid = v_obj.objid;
-- drop colocation group if all referencing tables are dropped
IF NOT EXISTS(SELECT * FROM pg_dist_partition WHERE colocationId = table_colocation_id) THEN IF NOT EXISTS(SELECT * FROM pg_dist_partition WHERE colocationId = table_colocation_id) THEN
DELETE FROM pg_dist_colocation WHERE colocationId = table_colocation_id; DELETE FROM pg_dist_colocation WHERE colocationId = table_colocation_id;
END IF; END IF;

View File

@ -66,7 +66,7 @@ static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId,
static bool LocalTableEmpty(Oid tableId); static bool LocalTableEmpty(Oid tableId);
static void ErrorIfNotSupportedConstraint(Relation relation, char distributionMethod, static void ErrorIfNotSupportedConstraint(Relation relation, char distributionMethod,
Var *distributionColumn); Var *distributionColumn);
static void InsertPgDistPartition(Oid relationId, char distributionMethod, static void InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
Var *distributionColumn, uint32 colocationId); Var *distributionColumn, uint32 colocationId);
static void CreateTruncateTrigger(Oid relationId); static void CreateTruncateTrigger(Oid relationId);
static uint32 ColocationId(int shardCount, int replicationFactor, static uint32 ColocationId(int shardCount, int replicationFactor,
@ -84,6 +84,9 @@ PG_FUNCTION_INFO_V1(create_distributed_table);
/* /*
* master_create_distributed_table accepts a table, distribution column and * master_create_distributed_table accepts a table, distribution column and
* method and performs the corresponding catalog changes. * method and performs the corresponding catalog changes.
*
* Note that this udf is deprecated and cannot create colocated tables, so we
* always use INVALID_COLOCATION_ID.
*/ */
Datum Datum
master_create_distributed_table(PG_FUNCTION_ARGS) master_create_distributed_table(PG_FUNCTION_ARGS)
@ -134,8 +137,8 @@ create_distributed_table(PG_FUNCTION_ARGS)
/* /*
* Get an exclusive lock on the colocation system catalog. Therefore, we * Get an exclusive lock on the colocation system catalog. Therefore, we
* can be sure that there will be no modifications on the table until this * can be sure that there will be no modifications on the colocation table
* transaction is committed. * until this transaction is committed.
*/ */
pgDistColocation = heap_open(DistColocationRelationId(), ExclusiveLock); pgDistColocation = heap_open(DistColocationRelationId(), ExclusiveLock);
@ -204,11 +207,11 @@ ConvertToDistributedTable(Oid relationId, text *distributionColumnText,
Var *distributionColumn = NULL; Var *distributionColumn = NULL;
/* /*
* Lock target relation with an access exclusive lock - there's no way to * Lock target relation with an exclusive lock - there's no way to make
* make sense of this table until we've committed, and we don't want * sense of this table until we've committed, and we don't want multiple
* multiple backends manipulating this relation. * backends manipulating this relation.
*/ */
relation = relation_open(relationId, AccessExclusiveLock); relation = relation_open(relationId, ExclusiveLock);
relationDesc = RelationGetDescr(relation); relationDesc = RelationGetDescr(relation);
relationName = RelationGetRelationName(relation); relationName = RelationGetRelationName(relation);
@ -289,7 +292,7 @@ ConvertToDistributedTable(Oid relationId, text *distributionColumnText,
ErrorIfNotSupportedConstraint(relation, distributionMethod, distributionColumn); ErrorIfNotSupportedConstraint(relation, distributionMethod, distributionColumn);
InsertPgDistPartition(relationId, distributionMethod, distributionColumn, InsertIntoPgDistPartition(relationId, distributionMethod, distributionColumn,
colocationId); colocationId);
relation_close(relation, NoLock); relation_close(relation, NoLock);
@ -305,65 +308,20 @@ ConvertToDistributedTable(Oid relationId, text *distributionColumnText,
} }
/*
* InsertPgDistPartition inserts a new tuple into pg_dist_partition.
*
* The tuple stores relationId as the logical relation id, distributionMethod
* as the partition method, the nodeToString() form of distributionColumn as
* the partition key, colocationId as the colocation id and a fixed
* replication model of 'c'. After the insert, the function updates the
* catalog indexes, invalidates the relcache entry of relationId, records the
* dependencies of the distributed relation and increments the command counter.
*/
static void
InsertPgDistPartition(Oid relationId, char distributionMethod, Var *distributionColumn,
uint32 colocationId)
{
Relation pgDistPartition = NULL;
/* every tuple written by this function uses replication model 'c' */
const char replicationModel = 'c';
char *distributionColumnString = NULL;
HeapTuple newTuple = NULL;
Datum newValues[Natts_pg_dist_partition];
bool newNulls[Natts_pg_dist_partition];
/* open system catalog and insert new tuple */
pgDistPartition = heap_open(DistPartitionRelationId(), RowExclusiveLock);
/* the distribution column is stored in its serialized node form */
distributionColumnString = nodeToString((Node *) distributionColumn);
/* form new tuple for pg_dist_partition; no attribute is marked as null */
memset(newValues, 0, sizeof(newValues));
memset(newNulls, false, sizeof(newNulls));
newValues[Anum_pg_dist_partition_logicalrelid - 1] =
ObjectIdGetDatum(relationId);
newValues[Anum_pg_dist_partition_partmethod - 1] =
CharGetDatum(distributionMethod);
newValues[Anum_pg_dist_partition_partkey - 1] =
CStringGetTextDatum(distributionColumnString);
/* NOTE(review): assigned without a *GetDatum macro, unlike the other attributes */
newValues[Anum_pg_dist_partition_colocationid - 1] = colocationId;
newValues[Anum_pg_dist_partition_repmodel - 1] = CharGetDatum(replicationModel);
newTuple = heap_form_tuple(RelationGetDescr(pgDistPartition), newValues, newNulls);
/* finally insert tuple, build index entries & register cache invalidation */
simple_heap_insert(pgDistPartition, newTuple);
CatalogUpdateIndexes(pgDistPartition, newTuple);
CitusInvalidateRelcacheByRelid(relationId);
RecordDistributedRelationDependencies(relationId, (Node *) distributionColumn);
/* increment the counter so that later commands can see the new row */
CommandCounterIncrement();
/* close the catalog relation; NoLock is passed, so no lock is released here */
heap_close(pgDistPartition, NoLock);
}
/* /*
* ErrorIfNotSupportedConstraint runs checks related to unique index / exclude * ErrorIfNotSupportedConstraint runs checks related to unique index / exclude
* constraints. * constraints.
* *
* Forbid UNIQUE, PRIMARY KEY, or EXCLUDE constraints on append partitioned tables, * Forbid UNIQUE, PRIMARY KEY, or EXCLUDE constraints on append partitioned
* since currently there is no way of enforcing uniqueness for overlapping shards. * tables, since currently there is no way of enforcing uniqueness for
* overlapping shards.
* *
* Similarly, do not allow such constraints if they do not include partition column. * Similarly, do not allow such constraints if they do not include partition
* This check is important for two reasons. First, currently Citus does not enforce * column. This check is important for two reasons:
* uniqueness constraint on multiple shards. * i. First, currently Citus does not enforce uniqueness constraint on multiple
* Second, INSERT INTO .. ON CONFLICT (i.e., UPSERT) queries can be executed with no * shards.
* further check for constraints. * ii. Second, INSERT INTO .. ON CONFLICT (i.e., UPSERT) queries can be executed
* with no further check for constraints.
*/ */
static void static void
ErrorIfNotSupportedConstraint(Relation relation, char distributionMethod, ErrorIfNotSupportedConstraint(Relation relation, char distributionMethod,
@ -452,6 +410,53 @@ ErrorIfNotSupportedConstraint(Relation relation, char distributionMethod,
} }
/*
 * InsertIntoPgDistPartition inserts a new tuple into pg_dist_partition.
 *
 * The tuple stores relationId as the logical relation id, distributionMethod
 * as the partition method, the nodeToString() form of distributionColumn as
 * the partition key, colocationId as the colocation id and a fixed
 * replication model of 'c'.
 *
 * After the insert, the function updates the catalog indexes, invalidates the
 * relcache entry of relationId, records the dependencies of the distributed
 * relation and increments the command counter so that the new row is visible
 * to the commands that follow.
 */
static void
InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
						  Var *distributionColumn, uint32 colocationId)
{
	Relation pgDistPartition = NULL;
	const char replicationModel = 'c';
	char *distributionColumnString = NULL;
	HeapTuple newTuple = NULL;
	Datum newValues[Natts_pg_dist_partition];
	bool newNulls[Natts_pg_dist_partition];

	/* open system catalog and insert new tuple */
	pgDistPartition = heap_open(DistPartitionRelationId(), RowExclusiveLock);

	/* the distribution column is stored in its serialized node form */
	distributionColumnString = nodeToString((Node *) distributionColumn);

	/* form new tuple for pg_dist_partition; no attribute is marked as null */
	memset(newValues, 0, sizeof(newValues));
	memset(newNulls, false, sizeof(newNulls));

	newValues[Anum_pg_dist_partition_logicalrelid - 1] =
		ObjectIdGetDatum(relationId);
	newValues[Anum_pg_dist_partition_partmethod - 1] =
		CharGetDatum(distributionMethod);
	newValues[Anum_pg_dist_partition_partkey - 1] =
		CStringGetTextDatum(distributionColumnString);

	/* convert explicitly, as is done for every other attribute of the tuple */
	newValues[Anum_pg_dist_partition_colocationid - 1] =
		UInt32GetDatum(colocationId);
	newValues[Anum_pg_dist_partition_repmodel - 1] =
		CharGetDatum(replicationModel);

	newTuple = heap_form_tuple(RelationGetDescr(pgDistPartition), newValues,
							   newNulls);

	/* finally insert tuple, build index entries & register cache invalidation */
	simple_heap_insert(pgDistPartition, newTuple);
	CatalogUpdateIndexes(pgDistPartition, newTuple);
	CitusInvalidateRelcacheByRelid(relationId);

	RecordDistributedRelationDependencies(relationId, (Node *) distributionColumn);

	/* increment the counter so that later commands can see the new row */
	CommandCounterIncrement();

	/* close the catalog relation; NoLock is passed, so no lock is released here */
	heap_close(pgDistPartition, NoLock);
}
/* /*
* RecordDistributedRelationDependencies creates the dependency entries * RecordDistributedRelationDependencies creates the dependency entries
* necessary for a distributed relation in addition to the preexisting ones * necessary for a distributed relation in addition to the preexisting ones
@ -669,11 +674,10 @@ ColocationId(int shardCount, int replicationFactor, Oid distributionColumnType)
uint32 colocationId = INVALID_COLOCATION_ID; uint32 colocationId = INVALID_COLOCATION_ID;
HeapTuple colocationTuple = NULL; HeapTuple colocationTuple = NULL;
SysScanDesc scanDescriptor; SysScanDesc scanDescriptor;
ScanKeyData scanKey[3]; const int scanKeyCount = 3;
int scanKeyCount = 3; ScanKeyData scanKey[scanKeyCount];
bool indexOK = true; bool indexOK = true;
/* acquire a lock, so that no one can do this concurrently */
Relation pgDistColocation = heap_open(DistColocationRelationId(), AccessShareLock); Relation pgDistColocation = heap_open(DistColocationRelationId(), AccessShareLock);
/* set scan arguments */ /* set scan arguments */
@ -706,12 +710,13 @@ ColocationId(int shardCount, int replicationFactor, Oid distributionColumnType)
/* /*
* CreateColocationGroup creates a new colocation id and writes it into * CreateColocationGroup creates a new colocation id and writes it into
* pg_dist_colocation with the given configuration. * pg_dist_colocation with the given configuration. It also returns the created
* colocation id.
*/ */
static uint32 static uint32
CreateColocationGroup(int shardCount, int replicationFactor, Oid distributionColumnType) CreateColocationGroup(int shardCount, int replicationFactor, Oid distributionColumnType)
{ {
uint32 colocationId = INVALID_COLOCATION_ID; uint32 colocationId = GetNextColocationId();
Relation pgDistColocation = NULL; Relation pgDistColocation = NULL;
TupleDesc tupleDescriptor = NULL; TupleDesc tupleDescriptor = NULL;
HeapTuple heapTuple = NULL; HeapTuple heapTuple = NULL;
@ -722,8 +727,6 @@ CreateColocationGroup(int shardCount, int replicationFactor, Oid distributionCol
memset(values, 0, sizeof(values)); memset(values, 0, sizeof(values));
memset(isNulls, false, sizeof(isNulls)); memset(isNulls, false, sizeof(isNulls));
colocationId = GetNextColocationId();
values[Anum_pg_dist_colocation_colocationid - 1] = UInt32GetDatum(colocationId); values[Anum_pg_dist_colocation_colocationid - 1] = UInt32GetDatum(colocationId);
values[Anum_pg_dist_colocation_shardcount - 1] = UInt32GetDatum(shardCount); values[Anum_pg_dist_colocation_shardcount - 1] = UInt32GetDatum(shardCount);
values[Anum_pg_dist_colocation_replicationfactor - 1] = values[Anum_pg_dist_colocation_replicationfactor - 1] =
@ -742,7 +745,7 @@ CreateColocationGroup(int shardCount, int replicationFactor, Oid distributionCol
/* increment the counter so that next command can see the row */ /* increment the counter so that next command can see the row */
CommandCounterIncrement(); CommandCounterIncrement();
heap_close(pgDistColocation, NoLock); heap_close(pgDistColocation, RowExclusiveLock);
return colocationId; return colocationId;
} }

View File

@ -249,7 +249,7 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId)
/* a share metadata lock is enough on the source relation */ /* a share metadata lock is enough on the source relation */
LockRelationDistributionMetadata(sourceRelationId, ShareLock); LockRelationDistributionMetadata(sourceRelationId, ShareLock);
/* prevent concurrent placement changes */ /* prevent placement changes of the source relation until we colocate with them */
sourceShardIntervalList = LoadShardIntervalList(sourceRelationId); sourceShardIntervalList = LoadShardIntervalList(sourceRelationId);
LockShardListMetadata(sourceShardIntervalList, ShareLock); LockShardListMetadata(sourceShardIntervalList, ShareLock);
@ -271,15 +271,15 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId)
{ {
ShardInterval *sourceShardInterval = (ShardInterval *) lfirst(sourceShardCell); ShardInterval *sourceShardInterval = (ShardInterval *) lfirst(sourceShardCell);
uint64 sourceShardId = sourceShardInterval->shardId; uint64 sourceShardId = sourceShardInterval->shardId;
List *sourceShardPlacementList = ShardPlacementList(sourceShardId);
uint64 newShardId = GetNextShardId(); uint64 newShardId = GetNextShardId();
ListCell *sourceShardPlacementCell = NULL;
int32 shardMinValue = DatumGetInt32(sourceShardInterval->minValue); int32 shardMinValue = DatumGetInt32(sourceShardInterval->minValue);
int32 shardMaxValue = DatumGetInt32(sourceShardInterval->maxValue); int32 shardMaxValue = DatumGetInt32(sourceShardInterval->maxValue);
text *shardMinValueText = IntegerToText(shardMinValue); text *shardMinValueText = IntegerToText(shardMinValue);
text *shardMaxValueText = IntegerToText(shardMaxValue); text *shardMaxValueText = IntegerToText(shardMaxValue);
ListCell *sourceShardPlacementCell = NULL; List *sourceShardPlacementList = ShardPlacementList(sourceShardId);
foreach(sourceShardPlacementCell, sourceShardPlacementList) foreach(sourceShardPlacementCell, sourceShardPlacementList)
{ {
ShardPlacement *sourcePlacement = ShardPlacement *sourcePlacement =

View File

@ -4,7 +4,7 @@
* definition of the relation that holds the colocation information on the * definition of the relation that holds the colocation information on the
* cluster (pg_dist_colocation). * cluster (pg_dist_colocation).
* *
* Copyright (c) 2012-2016, Citus Data, Inc. * Copyright (c) 2016, Citus Data, Inc.
* *
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
@ -19,8 +19,8 @@
typedef struct FormData_pg_dist_colocation typedef struct FormData_pg_dist_colocation
{ {
uint32 colocationid; uint32 colocationid;
int shardcount; uint32 shardcount;
int replicationfactor; uint32 replicationfactor;
Oid distributioncolumntype; Oid distributioncolumntype;
} FormData_pg_dist_colocation; } FormData_pg_dist_colocation;

View File

@ -37,7 +37,7 @@ $colocate_tables$;
-- create test functions -- create test functions
-- =================================================================== -- ===================================================================
CREATE FUNCTION get_table_colocation_id(regclass) CREATE FUNCTION get_table_colocation_id(regclass)
RETURNS BIGINT RETURNS INTEGER
AS 'citus' AS 'citus'
LANGUAGE C STRICT; LANGUAGE C STRICT;
CREATE FUNCTION tables_colocated(regclass, regclass) CREATE FUNCTION tables_colocated(regclass, regclass)

View File

@ -43,7 +43,7 @@ $colocate_tables$;
-- =================================================================== -- ===================================================================
CREATE FUNCTION get_table_colocation_id(regclass) CREATE FUNCTION get_table_colocation_id(regclass)
RETURNS BIGINT RETURNS INTEGER
AS 'citus' AS 'citus'
LANGUAGE C STRICT; LANGUAGE C STRICT;