Move colocation related functions to colocation_utils.c

pull/1012/head
Metin Doslu 2016-12-06 12:02:44 +02:00
parent be5b633e30
commit b681843d84
3 changed files with 152 additions and 151 deletions

View File

@ -9,7 +9,6 @@
*/
#include "postgres.h"
#include "miscadmin.h"
#include "access/genam.h"
#include "access/hash.h"
@ -33,7 +32,6 @@
#include "catalog/pg_type.h"
#include "commands/defrem.h"
#include "commands/extension.h"
#include "commands/sequence.h"
#include "commands/trigger.h"
#include "distributed/colocation_utils.h"
#include "distributed/distribution_column.h"
@ -41,7 +39,6 @@
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/pg_dist_colocation.h"
#include "distributed/pg_dist_partition.h"
#include "executor/spi.h"
#include "nodes/execnodes.h"
@ -77,7 +74,6 @@ static void ErrorIfNotSupportedForeignConstraint(Relation relation,
uint32 colocationId);
static void InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
Var *distributionColumn, uint32 colocationId);
static uint32 GetNextColocationId(void);
static void CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
char *colocateWithTableName,
int shardCount, int replicationFactor);
@ -882,137 +878,6 @@ CreateTruncateTrigger(Oid relationId)
}
/*
 * DefaultColocationGroupId searches pg_dist_colocation for the default
 * colocation group with the given configuration: shard count, replication
 * factor and distribution column type. If a matching entry is found, it
 * returns the colocation id of that entry, otherwise it returns
 * INVALID_COLOCATION_ID.
 */
uint32
DefaultColocationGroupId(int shardCount, int replicationFactor,
						 Oid distributionColumnType)
{
	uint32 colocationId = INVALID_COLOCATION_ID;
	HeapTuple colocationTuple = NULL;
	SysScanDesc scanDescriptor;

	/*
	 * Size the array with a literal: a "const int" bound is not a constant
	 * expression in C and would turn this into a variable length array.
	 */
	ScanKeyData scanKey[4];
	const int scanKeyCount = (int) (sizeof(scanKey) / sizeof(scanKey[0]));
	bool indexOK = true;
	bool defaultColocationGroup = true;

	Relation pgDistColocation = heap_open(DistColocationRelationId(), AccessShareLock);

	/*
	 * Set scan arguments. Both integer keys are signed ints compared with
	 * int4eq, so both are wrapped with Int32GetDatum.
	 */
	ScanKeyInit(&scanKey[0], Anum_pg_dist_colocation_shardcount,
				BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(shardCount));
	ScanKeyInit(&scanKey[1], Anum_pg_dist_colocation_replicationfactor,
				BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(replicationFactor));
	ScanKeyInit(&scanKey[2], Anum_pg_dist_colocation_distributioncolumntype,
				BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(distributionColumnType));
	ScanKeyInit(&scanKey[3], Anum_pg_dist_colocation_defaultgroup,
				BTEqualStrategyNumber, F_BOOLEQ, BoolGetDatum(defaultColocationGroup));

	scanDescriptor = systable_beginscan(pgDistColocation,
										DistColocationConfigurationIndexId(),
										indexOK, NULL, scanKeyCount, scanKey);

	/* only the first tuple returned by the scan is consulted */
	colocationTuple = systable_getnext(scanDescriptor);
	if (HeapTupleIsValid(colocationTuple))
	{
		Form_pg_dist_colocation colocationForm =
			(Form_pg_dist_colocation) GETSTRUCT(colocationTuple);

		colocationId = colocationForm->colocationid;
	}

	systable_endscan(scanDescriptor);
	heap_close(pgDistColocation, AccessShareLock);

	return colocationId;
}
/*
 * CreateColocationGroup obtains a new colocation id, stores a row for it in
 * pg_dist_colocation carrying the given shard count, replication factor,
 * distribution column type and default-group flag, and returns the new id
 * to the caller.
 */
uint32
CreateColocationGroup(int shardCount, int replicationFactor, Oid distributionColumnType,
					  bool defaultColocationGroup)
{
	Datum columnValues[Natts_pg_dist_colocation];
	bool columnIsNull[Natts_pg_dist_colocation];
	Relation colocationRelation = NULL;
	HeapTuple newColocationTuple = NULL;

	/* the id is reserved before the catalog is opened */
	uint32 newColocationId = GetNextColocationId();

	/* start from zeroed, non-null columns and fill in the ones set below */
	memset(columnValues, 0, sizeof(columnValues));
	memset(columnIsNull, false, sizeof(columnIsNull));

	columnValues[Anum_pg_dist_colocation_colocationid - 1] =
		UInt32GetDatum(newColocationId);
	columnValues[Anum_pg_dist_colocation_shardcount - 1] =
		UInt32GetDatum(shardCount);
	columnValues[Anum_pg_dist_colocation_replicationfactor - 1] =
		UInt32GetDatum(replicationFactor);
	columnValues[Anum_pg_dist_colocation_distributioncolumntype - 1] =
		ObjectIdGetDatum(distributionColumnType);
	columnValues[Anum_pg_dist_colocation_defaultgroup - 1] =
		BoolGetDatum(defaultColocationGroup);

	/* build the tuple against the catalog's descriptor and store it */
	colocationRelation = heap_open(DistColocationRelationId(), RowExclusiveLock);
	newColocationTuple = heap_form_tuple(RelationGetDescr(colocationRelation),
										 columnValues, columnIsNull);

	simple_heap_insert(colocationRelation, newColocationTuple);
	CatalogUpdateIndexes(colocationRelation, newColocationTuple);

	/* bump the command counter so that the next command can see the row */
	CommandCounterIncrement();

	heap_close(colocationRelation, RowExclusiveLock);

	return newColocationId;
}
/*
* GetNextColocationId allocates and returns a unique colocationId for the
* colocation group to be created. This allocation occurs both in shared memory
* and in write ahead logs; writing to logs avoids the risk of having
* colocationId collisions.
*
* Please note that the caller is still responsible for finalizing colocationId
* with the master node. Further note that this function relies on an internal
* sequence created in initdb to generate unique identifiers.
*/
static uint32
GetNextColocationId(void)
{
text *sequenceName = cstring_to_text(COLOCATIONID_SEQUENCE_NAME);
Oid sequenceId = ResolveRelationId(sequenceName);
Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId);
Oid savedUserId = InvalidOid;
int savedSecurityContext = 0;
Datum colocationIdDatum = 0;
uint32 colocationId = INVALID_COLOCATION_ID;
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
/* generate new and unique colocation id from sequence */
colocationIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum);
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
colocationId = DatumGetUInt32(colocationIdDatum);
return colocationId;
}
/*
* CreateHashDistributedTable creates a hash distributed table with given
* shard count and shard replication factor.

View File

@ -10,20 +10,24 @@
*/
#include "postgres.h"
#include "miscadmin.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/xact.h"
#include "catalog/indexing.h"
#include "commands/sequence.h"
#include "distributed/colocation_utils.h"
#include "distributed/listutils.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/pg_dist_colocation.h"
#include "distributed/resource_lock.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/worker_protocol.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
@ -36,6 +40,7 @@ static bool ShardsIntervalsEqual(ShardInterval *leftShardInterval,
ShardInterval *rightShardInterval);
static int CompareShardPlacementsByNode(const void *leftElement,
const void *rightElement);
static uint32 GetNextColocationId(void);
static void UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId);
@ -350,6 +355,150 @@ CompareShardPlacementsByNode(const void *leftElement, const void *rightElement)
}
/*
 * TableColocationId fetches the cache entry of the given distributed table
 * and returns the co-location id stored in that entry. This function errors
 * out if the given table is not distributed.
 */
uint32
TableColocationId(Oid distributedTableId)
{
	DistTableCacheEntry *tableEntry = NULL;

	tableEntry = DistributedTableCacheEntry(distributedTableId);

	return tableEntry->colocationId;
}
/*
 * DefaultColocationGroupId searches pg_dist_colocation for the default
 * colocation group with the given configuration: shard count, replication
 * factor and distribution column type. If a matching entry is found, it
 * returns the colocation id of that entry, otherwise it returns
 * INVALID_COLOCATION_ID.
 */
uint32
DefaultColocationGroupId(int shardCount, int replicationFactor,
						 Oid distributionColumnType)
{
	uint32 colocationId = INVALID_COLOCATION_ID;
	HeapTuple colocationTuple = NULL;
	SysScanDesc scanDescriptor;

	/*
	 * Size the array with a literal: a "const int" bound is not a constant
	 * expression in C and would turn this into a variable length array.
	 */
	ScanKeyData scanKey[4];
	const int scanKeyCount = (int) (sizeof(scanKey) / sizeof(scanKey[0]));
	bool indexOK = true;
	bool defaultColocationGroup = true;

	Relation pgDistColocation = heap_open(DistColocationRelationId(), AccessShareLock);

	/*
	 * Set scan arguments. Both integer keys are signed ints compared with
	 * int4eq, so both are wrapped with Int32GetDatum.
	 */
	ScanKeyInit(&scanKey[0], Anum_pg_dist_colocation_shardcount,
				BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(shardCount));
	ScanKeyInit(&scanKey[1], Anum_pg_dist_colocation_replicationfactor,
				BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(replicationFactor));
	ScanKeyInit(&scanKey[2], Anum_pg_dist_colocation_distributioncolumntype,
				BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(distributionColumnType));
	ScanKeyInit(&scanKey[3], Anum_pg_dist_colocation_defaultgroup,
				BTEqualStrategyNumber, F_BOOLEQ, BoolGetDatum(defaultColocationGroup));

	scanDescriptor = systable_beginscan(pgDistColocation,
										DistColocationConfigurationIndexId(),
										indexOK, NULL, scanKeyCount, scanKey);

	/* only the first tuple returned by the scan is consulted */
	colocationTuple = systable_getnext(scanDescriptor);
	if (HeapTupleIsValid(colocationTuple))
	{
		Form_pg_dist_colocation colocationForm =
			(Form_pg_dist_colocation) GETSTRUCT(colocationTuple);

		colocationId = colocationForm->colocationid;
	}

	systable_endscan(scanDescriptor);
	heap_close(pgDistColocation, AccessShareLock);

	return colocationId;
}
/*
 * CreateColocationGroup obtains a new colocation id, stores a row for it in
 * pg_dist_colocation carrying the given shard count, replication factor,
 * distribution column type and default-group flag, and returns the new id
 * to the caller.
 */
uint32
CreateColocationGroup(int shardCount, int replicationFactor, Oid distributionColumnType,
					  bool defaultColocationGroup)
{
	Datum columnValues[Natts_pg_dist_colocation];
	bool columnIsNull[Natts_pg_dist_colocation];
	Relation colocationRelation = NULL;
	HeapTuple newColocationTuple = NULL;

	/* the id is reserved before the catalog is opened */
	uint32 newColocationId = GetNextColocationId();

	/* start from zeroed, non-null columns and fill in the ones set below */
	memset(columnValues, 0, sizeof(columnValues));
	memset(columnIsNull, false, sizeof(columnIsNull));

	columnValues[Anum_pg_dist_colocation_colocationid - 1] =
		UInt32GetDatum(newColocationId);
	columnValues[Anum_pg_dist_colocation_shardcount - 1] =
		UInt32GetDatum(shardCount);
	columnValues[Anum_pg_dist_colocation_replicationfactor - 1] =
		UInt32GetDatum(replicationFactor);
	columnValues[Anum_pg_dist_colocation_distributioncolumntype - 1] =
		ObjectIdGetDatum(distributionColumnType);
	columnValues[Anum_pg_dist_colocation_defaultgroup - 1] =
		BoolGetDatum(defaultColocationGroup);

	/* build the tuple against the catalog's descriptor and store it */
	colocationRelation = heap_open(DistColocationRelationId(), RowExclusiveLock);
	newColocationTuple = heap_form_tuple(RelationGetDescr(colocationRelation),
										 columnValues, columnIsNull);

	simple_heap_insert(colocationRelation, newColocationTuple);
	CatalogUpdateIndexes(colocationRelation, newColocationTuple);

	/* bump the command counter so that the next command can see the row */
	CommandCounterIncrement();

	heap_close(colocationRelation, RowExclusiveLock);

	return newColocationId;
}
/*
* GetNextColocationId allocates and returns a unique colocationId for the
* colocation group to be created. This allocation occurs both in shared memory
* and in write ahead logs; writing to logs avoids the risk of having
* colocationId collisions.
*
* Please note that the caller is still responsible for finalizing colocationId
* with the master node. Further note that this function relies on an internal
* sequence created in initdb to generate unique identifiers.
*/
static uint32
GetNextColocationId(void)
{
text *sequenceName = cstring_to_text(COLOCATIONID_SEQUENCE_NAME);
Oid sequenceId = ResolveRelationId(sequenceName);
Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId);
Oid savedUserId = InvalidOid;
int savedSecurityContext = 0;
Datum colocationIdDatum = 0;
uint32 colocationId = INVALID_COLOCATION_ID;
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
/* generate new and unique colocation id from sequence */
colocationIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum);
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
colocationId = DatumGetUInt32(colocationIdDatum);
return colocationId;
}
/*
* UpdateRelationColocationGroup updates colocation group in pg_dist_partition
* for the given relation.
@ -407,19 +556,6 @@ UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId)
}
/*
 * TableColocationId fetches the cache entry of the given distributed table
 * and returns the co-location id stored in that entry. This function errors
 * out if the given table is not distributed.
 */
uint32
TableColocationId(Oid distributedTableId)
{
	DistTableCacheEntry *tableEntry = NULL;

	tableEntry = DistributedTableCacheEntry(distributedTableId);

	return tableEntry->colocationId;
}
/*
* TablesColocated function checks whether given two tables are co-located and
* returns true if they are co-located. A table is always co-located with itself.

View File

@ -20,6 +20,9 @@
/* returns the co-location id of the given distributed table */
extern uint32 TableColocationId(Oid distributedTableId);
/*
 * returns the id of the default colocation group matching the given shard
 * count, replication factor and distribution column type, or
 * INVALID_COLOCATION_ID when there is no such group
 */
extern uint32 DefaultColocationGroupId(int shardCount, int replicationFactor,
Oid distributionColumnType);
/*
 * writes a new colocation group with the given configuration into
 * pg_dist_colocation and returns the created colocation id
 */
extern uint32 CreateColocationGroup(int shardCount, int replicationFactor,
Oid distributionColumnType,
bool defaultColocationGroup);
/* checks whether the two given tables are co-located */
extern bool TablesColocated(Oid leftDistributedTableId, Oid rightDistributedTableId);
extern bool ShardsColocated(ShardInterval *leftShardInterval,
ShardInterval *rightShardInterval);
@ -27,9 +30,6 @@ extern List * ColocatedTableList(Oid distributedTableId);
extern List * ColocatedShardIntervalList(ShardInterval *shardInterval);
extern Oid ColocatedTableId(Oid colocationId);
extern uint64 ColocatedShardIdInRelation(Oid relationId, int shardIndex);
/*
 * writes a new colocation group with the given configuration into
 * pg_dist_colocation and returns the created colocation id
 */
extern uint32 CreateColocationGroup(int shardCount, int replicationFactor,
Oid distributionColumnType,
bool defaultColocationGroup);
#endif /* COLOCATION_UTILS_H_ */