Move colocation related functions to colocation_utils.c

pull/1046/head
Metin Doslu 2016-12-15 13:50:51 +02:00
parent b3593442c4
commit edbedbd744
3 changed files with 129 additions and 128 deletions

View File

@ -9,7 +9,6 @@
*/ */
#include "postgres.h" #include "postgres.h"
#include "miscadmin.h"
#include "access/genam.h" #include "access/genam.h"
#include "access/hash.h" #include "access/hash.h"
@ -33,7 +32,6 @@
#include "catalog/pg_type.h" #include "catalog/pg_type.h"
#include "commands/defrem.h" #include "commands/defrem.h"
#include "commands/extension.h" #include "commands/extension.h"
#include "commands/sequence.h"
#include "commands/trigger.h" #include "commands/trigger.h"
#include "distributed/colocation_utils.h" #include "distributed/colocation_utils.h"
#include "distributed/distribution_column.h" #include "distributed/distribution_column.h"
@ -76,9 +74,6 @@ static void ErrorIfNotSupportedForeignConstraint(Relation relation,
uint32 colocationId); uint32 colocationId);
static void InsertIntoPgDistPartition(Oid relationId, char distributionMethod, static void InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
Var *distributionColumn, uint32 colocationId); Var *distributionColumn, uint32 colocationId);
static uint32 ColocationId(int shardCount, int replicationFactor,
Oid distributionColumnType);
static uint32 GetNextColocationId(void);
static void CreateHashDistributedTable(Oid relationId, char *distributionColumnName, static void CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
int shardCount, int replicationFactor); int shardCount, int replicationFactor);
@ -848,129 +843,6 @@ CreateTruncateTrigger(Oid relationId)
} }
/*
* ColocationId searches pg_dist_colocation for shard count, replication factor
* and distribution column type. If a matching entry is found, it returns the
* colocation id, otherwise it returns INVALID_COLOCATION_ID.
*/
static uint32
ColocationId(int shardCount, int replicationFactor, Oid distributionColumnType)
{
uint32 colocationId = INVALID_COLOCATION_ID;
HeapTuple colocationTuple = NULL;
SysScanDesc scanDescriptor;
const int scanKeyCount = 3;
ScanKeyData scanKey[scanKeyCount];
bool indexOK = true;
Relation pgDistColocation = heap_open(DistColocationRelationId(), AccessShareLock);
/* set scan arguments */
ScanKeyInit(&scanKey[0], Anum_pg_dist_colocation_shardcount,
BTEqualStrategyNumber, F_INT4EQ, UInt32GetDatum(shardCount));
ScanKeyInit(&scanKey[1], Anum_pg_dist_colocation_replicationfactor,
BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(replicationFactor));
ScanKeyInit(&scanKey[2], Anum_pg_dist_colocation_distributioncolumntype,
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(distributionColumnType));
scanDescriptor = systable_beginscan(pgDistColocation,
DistColocationConfigurationIndexId(),
indexOK, NULL, scanKeyCount, scanKey);
colocationTuple = systable_getnext(scanDescriptor);
if (HeapTupleIsValid(colocationTuple))
{
Form_pg_dist_colocation colocationForm =
(Form_pg_dist_colocation) GETSTRUCT(colocationTuple);
colocationId = colocationForm->colocationid;
}
systable_endscan(scanDescriptor);
heap_close(pgDistColocation, AccessShareLock);
return colocationId;
}
/*
* CreateColocationGroup creates a new colocation id and writes it into
* pg_dist_colocation with the given configuration. It also returns the created
* colocation id.
*/
uint32
CreateColocationGroup(int shardCount, int replicationFactor, Oid distributionColumnType)
{
uint32 colocationId = GetNextColocationId();
Relation pgDistColocation = NULL;
TupleDesc tupleDescriptor = NULL;
HeapTuple heapTuple = NULL;
Datum values[Natts_pg_dist_colocation];
bool isNulls[Natts_pg_dist_colocation];
/* form new colocation tuple */
memset(values, 0, sizeof(values));
memset(isNulls, false, sizeof(isNulls));
values[Anum_pg_dist_colocation_colocationid - 1] = UInt32GetDatum(colocationId);
values[Anum_pg_dist_colocation_shardcount - 1] = UInt32GetDatum(shardCount);
values[Anum_pg_dist_colocation_replicationfactor - 1] =
UInt32GetDatum(replicationFactor);
values[Anum_pg_dist_colocation_distributioncolumntype - 1] =
ObjectIdGetDatum(distributionColumnType);
/* open colocation relation and insert the new tuple */
pgDistColocation = heap_open(DistColocationRelationId(), RowExclusiveLock);
tupleDescriptor = RelationGetDescr(pgDistColocation);
heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);
simple_heap_insert(pgDistColocation, heapTuple);
CatalogUpdateIndexes(pgDistColocation, heapTuple);
/* increment the counter so that next command can see the row */
CommandCounterIncrement();
heap_close(pgDistColocation, RowExclusiveLock);
return colocationId;
}
/*
* GetNextColocationId allocates and returns a unique colocationId for the
* colocation group to be created. This allocation occurs both in shared memory
* and in write ahead logs; writing to logs avoids the risk of having
* colocationId collisions.
*
* Please note that the caller is still responsible for finalizing colocationId
* with the master node. Further note that this function relies on an internal
* sequence created in initdb to generate unique identifiers.
*/
static uint32
GetNextColocationId()
{
text *sequenceName = cstring_to_text(COLOCATIONID_SEQUENCE_NAME);
Oid sequenceId = ResolveRelationId(sequenceName);
Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId);
Oid savedUserId = InvalidOid;
int savedSecurityContext = 0;
Datum colocationIdDatum = 0;
uint32 colocationId = INVALID_COLOCATION_ID;
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
/* generate new and unique colocation id from sequence */
colocationIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum);
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
colocationId = DatumGetUInt32(colocationIdDatum);
return colocationId;
}
/* /*
* CreateHashDistributedTable creates a hash distributed table with given * CreateHashDistributedTable creates a hash distributed table with given
* shard count and shard replication factor. * shard count and shard replication factor.

View File

@ -10,20 +10,24 @@
*/ */
#include "postgres.h" #include "postgres.h"
#include "miscadmin.h"
#include "access/genam.h" #include "access/genam.h"
#include "access/heapam.h" #include "access/heapam.h"
#include "access/htup_details.h" #include "access/htup_details.h"
#include "access/xact.h" #include "access/xact.h"
#include "catalog/indexing.h" #include "catalog/indexing.h"
#include "commands/sequence.h"
#include "distributed/colocation_utils.h" #include "distributed/colocation_utils.h"
#include "distributed/listutils.h" #include "distributed/listutils.h"
#include "distributed/master_protocol.h" #include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/multi_logical_planner.h" #include "distributed/multi_logical_planner.h"
#include "distributed/pg_dist_colocation.h"
#include "distributed/resource_lock.h" #include "distributed/resource_lock.h"
#include "distributed/shardinterval_utils.h" #include "distributed/shardinterval_utils.h"
#include "distributed/worker_protocol.h" #include "distributed/worker_protocol.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h" #include "utils/fmgroids.h"
#include "utils/lsyscache.h" #include "utils/lsyscache.h"
#include "utils/rel.h" #include "utils/rel.h"
@ -36,6 +40,7 @@ static bool ShardsIntervalsEqual(ShardInterval *leftShardInterval,
ShardInterval *rightShardInterval); ShardInterval *rightShardInterval);
static int CompareShardPlacementsByNode(const void *leftElement, static int CompareShardPlacementsByNode(const void *leftElement,
const void *rightElement); const void *rightElement);
static uint32 GetNextColocationId(void);
static void UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId); static void UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId);
@ -339,6 +344,129 @@ CompareShardPlacementsByNode(const void *leftElement, const void *rightElement)
} }
/*
* ColocationId searches pg_dist_colocation for shard count, replication factor
* and distribution column type. If a matching entry is found, it returns the
* colocation id, otherwise it returns INVALID_COLOCATION_ID.
*/
uint32
ColocationId(int shardCount, int replicationFactor, Oid distributionColumnType)
{
uint32 colocationId = INVALID_COLOCATION_ID;
HeapTuple colocationTuple = NULL;
SysScanDesc scanDescriptor;
const int scanKeyCount = 3;
ScanKeyData scanKey[scanKeyCount];
bool indexOK = true;
Relation pgDistColocation = heap_open(DistColocationRelationId(), AccessShareLock);
/* set scan arguments */
ScanKeyInit(&scanKey[0], Anum_pg_dist_colocation_shardcount,
BTEqualStrategyNumber, F_INT4EQ, UInt32GetDatum(shardCount));
ScanKeyInit(&scanKey[1], Anum_pg_dist_colocation_replicationfactor,
BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(replicationFactor));
ScanKeyInit(&scanKey[2], Anum_pg_dist_colocation_distributioncolumntype,
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(distributionColumnType));
scanDescriptor = systable_beginscan(pgDistColocation,
DistColocationConfigurationIndexId(),
indexOK, NULL, scanKeyCount, scanKey);
colocationTuple = systable_getnext(scanDescriptor);
if (HeapTupleIsValid(colocationTuple))
{
Form_pg_dist_colocation colocationForm =
(Form_pg_dist_colocation) GETSTRUCT(colocationTuple);
colocationId = colocationForm->colocationid;
}
systable_endscan(scanDescriptor);
heap_close(pgDistColocation, AccessShareLock);
return colocationId;
}
/*
* CreateColocationGroup creates a new colocation id and writes it into
* pg_dist_colocation with the given configuration. It also returns the created
* colocation id.
*/
uint32
CreateColocationGroup(int shardCount, int replicationFactor, Oid distributionColumnType)
{
uint32 colocationId = GetNextColocationId();
Relation pgDistColocation = NULL;
TupleDesc tupleDescriptor = NULL;
HeapTuple heapTuple = NULL;
Datum values[Natts_pg_dist_colocation];
bool isNulls[Natts_pg_dist_colocation];
/* form new colocation tuple */
memset(values, 0, sizeof(values));
memset(isNulls, false, sizeof(isNulls));
values[Anum_pg_dist_colocation_colocationid - 1] = UInt32GetDatum(colocationId);
values[Anum_pg_dist_colocation_shardcount - 1] = UInt32GetDatum(shardCount);
values[Anum_pg_dist_colocation_replicationfactor - 1] =
UInt32GetDatum(replicationFactor);
values[Anum_pg_dist_colocation_distributioncolumntype - 1] =
ObjectIdGetDatum(distributionColumnType);
/* open colocation relation and insert the new tuple */
pgDistColocation = heap_open(DistColocationRelationId(), RowExclusiveLock);
tupleDescriptor = RelationGetDescr(pgDistColocation);
heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);
simple_heap_insert(pgDistColocation, heapTuple);
CatalogUpdateIndexes(pgDistColocation, heapTuple);
/* increment the counter so that next command can see the row */
CommandCounterIncrement();
heap_close(pgDistColocation, RowExclusiveLock);
return colocationId;
}
/*
* GetNextColocationId allocates and returns a unique colocationId for the
* colocation group to be created. This allocation occurs both in shared memory
* and in write ahead logs; writing to logs avoids the risk of having
* colocationId collisions.
*
* Please note that the caller is still responsible for finalizing colocationId
* with the master node. Further note that this function relies on an internal
* sequence created in initdb to generate unique identifiers.
*/
static uint32
GetNextColocationId()
{
text *sequenceName = cstring_to_text(COLOCATIONID_SEQUENCE_NAME);
Oid sequenceId = ResolveRelationId(sequenceName);
Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId);
Oid savedUserId = InvalidOid;
int savedSecurityContext = 0;
Datum colocationIdDatum = 0;
uint32 colocationId = INVALID_COLOCATION_ID;
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
/* generate new and unique colocation id from sequence */
colocationIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum);
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
colocationId = DatumGetUInt32(colocationIdDatum);
return colocationId;
}
/* /*
* UpdateRelationColocationGroup updates colocation group in pg_dist_partition * UpdateRelationColocationGroup updates colocation group in pg_dist_partition
* for the given relation. * for the given relation.

View File

@ -25,6 +25,7 @@ extern List * ColocatedTableList(Oid distributedTableId);
extern List * ColocatedShardIntervalList(ShardInterval *shardInterval); extern List * ColocatedShardIntervalList(ShardInterval *shardInterval);
extern Oid ColocatedTableId(Oid colocationId); extern Oid ColocatedTableId(Oid colocationId);
extern uint64 ColocatedShardIdInRelation(Oid relationId, int shardIndex); extern uint64 ColocatedShardIdInRelation(Oid relationId, int shardIndex);
uint32 ColocationId(int shardCount, int replicationFactor, Oid distributionColumnType);
extern uint32 CreateColocationGroup(int shardCount, int replicationFactor, extern uint32 CreateColocationGroup(int shardCount, int replicationFactor,
Oid distributionColumnType); Oid distributionColumnType);