From d3e7d9dc8d6dd81a2d5560823e686f928e7f3e95 Mon Sep 17 00:00:00 2001 From: Metin Doslu Date: Tue, 18 Oct 2016 11:50:05 +0300 Subject: [PATCH] Final refactoring --- .../distributed/citus--6.0-10--6.0-11.sql | 4 + .../commands/create_distributed_table.c | 147 +++++++++--------- .../distributed/master/master_create_shards.c | 6 +- src/include/distributed/pg_dist_colocation.h | 6 +- .../expected/multi_colocation_utils.out | 2 +- .../regress/sql/multi_colocation_utils.sql | 2 +- 6 files changed, 87 insertions(+), 80 deletions(-) diff --git a/src/backend/distributed/citus--6.0-10--6.0-11.sql b/src/backend/distributed/citus--6.0-10--6.0-11.sql index 8dda70294..48fdca26c 100644 --- a/src/backend/distributed/citus--6.0-10--6.0-11.sql +++ b/src/backend/distributed/citus--6.0-10--6.0-11.sql @@ -1,3 +1,5 @@ +/* citus--6.0-10--6.0-11.sql */ + SET search_path = 'pg_catalog'; CREATE SEQUENCE citus.pg_dist_colocationid_seq @@ -77,11 +79,13 @@ BEGIN -- ensure all shards are dropped PERFORM master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name); + -- get colocation group SELECT colocationid INTO table_colocation_id FROM pg_dist_partition WHERE logicalrelid = v_obj.objid; -- delete partition entry DELETE FROM pg_dist_partition WHERE logicalrelid = v_obj.objid; + -- drop colocation group if all referencing tables are dropped IF NOT EXISTS(SELECT * FROM pg_dist_partition WHERE colocationId = table_colocation_id) THEN DELETE FROM pg_dist_colocation WHERE colocationId = table_colocation_id; END IF; diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index f66da0e86..3ac7a5f75 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -66,8 +66,8 @@ static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId, static bool LocalTableEmpty(Oid tableId); static void 
ErrorIfNotSupportedConstraint(Relation relation, char distributionMethod, Var *distributionColumn); -static void InsertPgDistPartition(Oid relationId, char distributionMethod, - Var *distributionColumn, uint32 colocationId); +static void InsertIntoPgDistPartition(Oid relationId, char distributionMethod, + Var *distributionColumn, uint32 colocationId); static void CreateTruncateTrigger(Oid relationId); static uint32 ColocationId(int shardCount, int replicationFactor, Oid distributionColumnType); @@ -84,6 +84,9 @@ PG_FUNCTION_INFO_V1(create_distributed_table); /* * master_create_distributed_table accepts a table, distribution column and * method and performs the corresponding catalog changes. + * + * Note that this udf is deprecated and cannot create colocated tables, so we + * always use INVALID_COLOCATION_ID. */ Datum master_create_distributed_table(PG_FUNCTION_ARGS) @@ -134,8 +137,8 @@ create_distributed_table(PG_FUNCTION_ARGS) /* * Get an exclusive lock on the colocation system catalog. Therefore, we - * can be sure that there will no modifications on the table until this - * transaction is committed. + * can be sure that there will be no modifications on the colocation table + * until this transaction is committed. */ pgDistColocation = heap_open(DistColocationRelationId(), ExclusiveLock); @@ -204,11 +207,11 @@ ConvertToDistributedTable(Oid relationId, text *distributionColumnText, Var *distributionColumn = NULL; /* - * Lock target relation with an access exclusive lock - there's no way to - * make sense of this table until we've committed, and we don't want - * multiple backends manipulating this relation. + * Lock target relation with an exclusive lock - there's no way to make + * sense of this table until we've committed, and we don't want multiple + * backends manipulating this relation. 
*/ - relation = relation_open(relationId, AccessExclusiveLock); + relation = relation_open(relationId, ExclusiveLock); relationDesc = RelationGetDescr(relation); relationName = RelationGetRelationName(relation); @@ -289,8 +292,8 @@ ConvertToDistributedTable(Oid relationId, text *distributionColumnText, ErrorIfNotSupportedConstraint(relation, distributionMethod, distributionColumn); - InsertPgDistPartition(relationId, distributionMethod, distributionColumn, - colocationId); + InsertIntoPgDistPartition(relationId, distributionMethod, distributionColumn, + colocationId); relation_close(relation, NoLock); @@ -305,65 +308,20 @@ ConvertToDistributedTable(Oid relationId, text *distributionColumnText, } -/* - * InsertPgDistPartition inserts a new tuple into pg_dist_partition. - */ -static void -InsertPgDistPartition(Oid relationId, char distributionMethod, Var *distributionColumn, - uint32 colocationId) -{ - Relation pgDistPartition = NULL; - const char replicationModel = 'c'; - char *distributionColumnString = NULL; - - HeapTuple newTuple = NULL; - Datum newValues[Natts_pg_dist_partition]; - bool newNulls[Natts_pg_dist_partition]; - - /* open system catalog and insert new tuple */ - pgDistPartition = heap_open(DistPartitionRelationId(), RowExclusiveLock); - - distributionColumnString = nodeToString((Node *) distributionColumn); - - /* form new tuple for pg_dist_partition */ - memset(newValues, 0, sizeof(newValues)); - memset(newNulls, false, sizeof(newNulls)); - - newValues[Anum_pg_dist_partition_logicalrelid - 1] = - ObjectIdGetDatum(relationId); - newValues[Anum_pg_dist_partition_partmethod - 1] = - CharGetDatum(distributionMethod); - newValues[Anum_pg_dist_partition_partkey - 1] = - CStringGetTextDatum(distributionColumnString); - newValues[Anum_pg_dist_partition_colocationid - 1] = colocationId; - newValues[Anum_pg_dist_partition_repmodel - 1] = CharGetDatum(replicationModel); - - newTuple = heap_form_tuple(RelationGetDescr(pgDistPartition), newValues, newNulls); - - 
/* finally insert tuple, build index entries & register cache invalidation */ - simple_heap_insert(pgDistPartition, newTuple); - CatalogUpdateIndexes(pgDistPartition, newTuple); - CitusInvalidateRelcacheByRelid(relationId); - - RecordDistributedRelationDependencies(relationId, (Node *) distributionColumn); - - CommandCounterIncrement(); - heap_close(pgDistPartition, NoLock); -} - - /* * ErrorIfNotSupportedConstraint run checks related to unique index / exclude * constraints. * - * Forbid UNIQUE, PRIMARY KEY, or EXCLUDE constraints on append partitioned tables, - * since currently there is no way of enforcing uniqueness for overlapping shards. + * Forbid UNIQUE, PRIMARY KEY, or EXCLUDE constraints on append partitioned + * tables, since currently there is no way of enforcing uniqueness for + * overlapping shards. * - * Similarly, do not allow such constraints if they do not include partition column. - * This check is important for two reasons. First, currently Citus does not enforce - * uniqueness constraint on multiple shards. - * Second, INSERT INTO .. ON CONFLICT (i.e., UPSERT) queries can be executed with no - * further check for constraints. + * Similarly, do not allow such constraints if they do not include partition + * column. This check is important for two reasons: + * i. First, currently Citus does not enforce uniqueness constraint on multiple + * shards. + * ii. Second, INSERT INTO .. ON CONFLICT (i.e., UPSERT) queries can be executed + * with no further check for constraints. */ static void ErrorIfNotSupportedConstraint(Relation relation, char distributionMethod, @@ -452,6 +410,53 @@ ErrorIfNotSupportedConstraint(Relation relation, char distributionMethod, } +/* + * InsertIntoPgDistPartition inserts a new tuple into pg_dist_partition. 
+ */ +static void +InsertIntoPgDistPartition(Oid relationId, char distributionMethod, + Var *distributionColumn, uint32 colocationId) +{ + Relation pgDistPartition = NULL; + const char replicationModel = 'c'; + char *distributionColumnString = NULL; + + HeapTuple newTuple = NULL; + Datum newValues[Natts_pg_dist_partition]; + bool newNulls[Natts_pg_dist_partition]; + + /* open system catalog and insert new tuple */ + pgDistPartition = heap_open(DistPartitionRelationId(), RowExclusiveLock); + + distributionColumnString = nodeToString((Node *) distributionColumn); + + /* form new tuple for pg_dist_partition */ + memset(newValues, 0, sizeof(newValues)); + memset(newNulls, false, sizeof(newNulls)); + + newValues[Anum_pg_dist_partition_logicalrelid - 1] = + ObjectIdGetDatum(relationId); + newValues[Anum_pg_dist_partition_partmethod - 1] = + CharGetDatum(distributionMethod); + newValues[Anum_pg_dist_partition_partkey - 1] = + CStringGetTextDatum(distributionColumnString); + newValues[Anum_pg_dist_partition_colocationid - 1] = colocationId; + newValues[Anum_pg_dist_partition_repmodel - 1] = CharGetDatum(replicationModel); + + newTuple = heap_form_tuple(RelationGetDescr(pgDistPartition), newValues, newNulls); + + /* finally insert tuple, build index entries & register cache invalidation */ + simple_heap_insert(pgDistPartition, newTuple); + CatalogUpdateIndexes(pgDistPartition, newTuple); + CitusInvalidateRelcacheByRelid(relationId); + + RecordDistributedRelationDependencies(relationId, (Node *) distributionColumn); + + CommandCounterIncrement(); + heap_close(pgDistPartition, NoLock); +} + + /* * RecordDistributedRelationDependencies creates the dependency entries * necessary for a distributed relation in addition to the preexisting ones @@ -669,11 +674,10 @@ ColocationId(int shardCount, int replicationFactor, Oid distributionColumnType) uint32 colocationId = INVALID_COLOCATION_ID; HeapTuple colocationTuple = NULL; SysScanDesc scanDescriptor; - ScanKeyData scanKey[3]; - int 
scanKeyCount = 3; + const int scanKeyCount = 3; + ScanKeyData scanKey[scanKeyCount]; bool indexOK = true; - /* acquire a lock, so that no one can do this concurrently */ Relation pgDistColocation = heap_open(DistColocationRelationId(), AccessShareLock); /* set scan arguments */ @@ -706,12 +710,13 @@ ColocationId(int shardCount, int replicationFactor, Oid distributionColumnType) /* * CreateColocationGroup creates a new colocation id and writes it into - * pg_dist_colocation with the given configuration. + * pg_dist_colocation with the given configuration. It also returns the created + * colocation id. */ static uint32 CreateColocationGroup(int shardCount, int replicationFactor, Oid distributionColumnType) { - uint32 colocationId = INVALID_COLOCATION_ID; + uint32 colocationId = GetNextColocationId(); Relation pgDistColocation = NULL; TupleDesc tupleDescriptor = NULL; HeapTuple heapTuple = NULL; @@ -722,8 +727,6 @@ CreateColocationGroup(int shardCount, int replicationFactor, Oid distributionCol memset(values, 0, sizeof(values)); memset(isNulls, false, sizeof(isNulls)); - colocationId = GetNextColocationId(); - values[Anum_pg_dist_colocation_colocationid - 1] = UInt32GetDatum(colocationId); values[Anum_pg_dist_colocation_shardcount - 1] = UInt32GetDatum(shardCount); values[Anum_pg_dist_colocation_replicationfactor - 1] = @@ -742,7 +745,7 @@ CreateColocationGroup(int shardCount, int replicationFactor, Oid distributionCol /* increment the counter so that next command can see the row */ CommandCounterIncrement(); - heap_close(pgDistColocation, NoLock); + heap_close(pgDistColocation, RowExclusiveLock); return colocationId; } diff --git a/src/backend/distributed/master/master_create_shards.c b/src/backend/distributed/master/master_create_shards.c index 311a1c857..abf89ff22 100644 --- a/src/backend/distributed/master/master_create_shards.c +++ b/src/backend/distributed/master/master_create_shards.c @@ -249,7 +249,7 @@ CreateColocatedShards(Oid targetRelationId, Oid 
sourceRelationId) /* a share metadata lock is enough on the source relation */ LockRelationDistributionMetadata(sourceRelationId, ShareLock); - /* prevent concurrent placement changes */ + /* prevent placement changes of the source relation until we colocate with them */ sourceShardIntervalList = LoadShardIntervalList(sourceRelationId); LockShardListMetadata(sourceShardIntervalList, ShareLock); @@ -271,15 +271,15 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId) { ShardInterval *sourceShardInterval = (ShardInterval *) lfirst(sourceShardCell); uint64 sourceShardId = sourceShardInterval->shardId; - List *sourceShardPlacementList = ShardPlacementList(sourceShardId); uint64 newShardId = GetNextShardId(); + ListCell *sourceShardPlacementCell = NULL; int32 shardMinValue = DatumGetInt32(sourceShardInterval->minValue); int32 shardMaxValue = DatumGetInt32(sourceShardInterval->maxValue); text *shardMinValueText = IntegerToText(shardMinValue); text *shardMaxValueText = IntegerToText(shardMaxValue); - ListCell *sourceShardPlacementCell = NULL; + List *sourceShardPlacementList = ShardPlacementList(sourceShardId); foreach(sourceShardPlacementCell, sourceShardPlacementList) { ShardPlacement *sourcePlacement = diff --git a/src/include/distributed/pg_dist_colocation.h b/src/include/distributed/pg_dist_colocation.h index a0250c643..985c1332d 100644 --- a/src/include/distributed/pg_dist_colocation.h +++ b/src/include/distributed/pg_dist_colocation.h @@ -4,7 +4,7 @@ * definition of the relation that holds the colocation information on the * cluster (pg_dist_colocation). * - * Copyright (c) 2012-2016, Citus Data, Inc. + * Copyright (c) 2016, Citus Data, Inc. 
* *------------------------------------------------------------------------- */ @@ -19,8 +19,8 @@ typedef struct FormData_pg_dist_colocation { uint32 colocationid; - int shardcount; - int replicationfactor; + uint32 shardcount; + uint32 replicationfactor; Oid distributioncolumntype; } FormData_pg_dist_colocation; diff --git a/src/test/regress/expected/multi_colocation_utils.out b/src/test/regress/expected/multi_colocation_utils.out index 1a069702c..fdff3970c 100644 --- a/src/test/regress/expected/multi_colocation_utils.out +++ b/src/test/regress/expected/multi_colocation_utils.out @@ -37,7 +37,7 @@ $colocate_tables$; -- create test functions -- =================================================================== CREATE FUNCTION get_table_colocation_id(regclass) - RETURNS BIGINT + RETURNS INTEGER AS 'citus' LANGUAGE C STRICT; CREATE FUNCTION tables_colocated(regclass, regclass) diff --git a/src/test/regress/sql/multi_colocation_utils.sql b/src/test/regress/sql/multi_colocation_utils.sql index bd6d36116..803280f5d 100644 --- a/src/test/regress/sql/multi_colocation_utils.sql +++ b/src/test/regress/sql/multi_colocation_utils.sql @@ -43,7 +43,7 @@ $colocate_tables$; -- =================================================================== CREATE FUNCTION get_table_colocation_id(regclass) - RETURNS BIGINT + RETURNS INTEGER AS 'citus' LANGUAGE C STRICT;