diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index b59add3fe..9f1de23ac 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -9,7 +9,6 @@ */ #include "postgres.h" -#include "miscadmin.h" #include "access/genam.h" #include "access/hash.h" @@ -33,7 +32,6 @@ #include "catalog/pg_type.h" #include "commands/defrem.h" #include "commands/extension.h" -#include "commands/sequence.h" #include "commands/trigger.h" #include "distributed/colocation_utils.h" #include "distributed/distribution_column.h" @@ -76,9 +74,6 @@ static void ErrorIfNotSupportedForeignConstraint(Relation relation, uint32 colocationId); static void InsertIntoPgDistPartition(Oid relationId, char distributionMethod, Var *distributionColumn, uint32 colocationId); -static uint32 ColocationId(int shardCount, int replicationFactor, - Oid distributionColumnType); -static uint32 GetNextColocationId(void); static void CreateHashDistributedTable(Oid relationId, char *distributionColumnName, int shardCount, int replicationFactor); @@ -848,129 +843,6 @@ CreateTruncateTrigger(Oid relationId) } -/* - * ColocationId searches pg_dist_colocation for shard count, replication factor - * and distribution column type. If a matching entry is found, it returns the - * colocation id, otherwise it returns INVALID_COLOCATION_ID. - */ -static uint32 -ColocationId(int shardCount, int replicationFactor, Oid distributionColumnType) -{ - uint32 colocationId = INVALID_COLOCATION_ID; - HeapTuple colocationTuple = NULL; - SysScanDesc scanDescriptor; - const int scanKeyCount = 3; - ScanKeyData scanKey[scanKeyCount]; - bool indexOK = true; - - Relation pgDistColocation = heap_open(DistColocationRelationId(), AccessShareLock); - - /* set scan arguments */ - ScanKeyInit(&scanKey[0], Anum_pg_dist_colocation_shardcount, - BTEqualStrategyNumber, F_INT4EQ, UInt32GetDatum(shardCount)); - ScanKeyInit(&scanKey[1], Anum_pg_dist_colocation_replicationfactor, - BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(replicationFactor)); - ScanKeyInit(&scanKey[2], Anum_pg_dist_colocation_distributioncolumntype, - BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(distributionColumnType)); - - scanDescriptor = systable_beginscan(pgDistColocation, - DistColocationConfigurationIndexId(), - indexOK, NULL, scanKeyCount, scanKey); - - colocationTuple = systable_getnext(scanDescriptor); - if (HeapTupleIsValid(colocationTuple)) - { - Form_pg_dist_colocation colocationForm = - (Form_pg_dist_colocation) GETSTRUCT(colocationTuple); - - colocationId = colocationForm->colocationid; - } - - systable_endscan(scanDescriptor); - heap_close(pgDistColocation, AccessShareLock); - - return colocationId; -} - - -/* - * CreateColocationGroup creates a new colocation id and writes it into - * pg_dist_colocation with the given configuration. It also returns the created - * colocation id. - */ -uint32 -CreateColocationGroup(int shardCount, int replicationFactor, Oid distributionColumnType) -{ - uint32 colocationId = GetNextColocationId(); - Relation pgDistColocation = NULL; - TupleDesc tupleDescriptor = NULL; - HeapTuple heapTuple = NULL; - Datum values[Natts_pg_dist_colocation]; - bool isNulls[Natts_pg_dist_colocation]; - - /* form new colocation tuple */ - memset(values, 0, sizeof(values)); - memset(isNulls, false, sizeof(isNulls)); - - values[Anum_pg_dist_colocation_colocationid - 1] = UInt32GetDatum(colocationId); - values[Anum_pg_dist_colocation_shardcount - 1] = UInt32GetDatum(shardCount); - values[Anum_pg_dist_colocation_replicationfactor - 1] = - UInt32GetDatum(replicationFactor); - values[Anum_pg_dist_colocation_distributioncolumntype - 1] = - ObjectIdGetDatum(distributionColumnType); - - /* open colocation relation and insert the new tuple */ - pgDistColocation = heap_open(DistColocationRelationId(), RowExclusiveLock); - - tupleDescriptor = RelationGetDescr(pgDistColocation); - heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls); - - simple_heap_insert(pgDistColocation, heapTuple); - CatalogUpdateIndexes(pgDistColocation, heapTuple); - - /* increment the counter so that next command can see the row */ - CommandCounterIncrement(); - heap_close(pgDistColocation, RowExclusiveLock); - - return colocationId; -} - - -/* - * GetNextColocationId allocates and returns a unique colocationId for the - * colocation group to be created. This allocation occurs both in shared memory - * and in write ahead logs; writing to logs avoids the risk of having - * colocationId collisions. - * - * Please note that the caller is still responsible for finalizing colocationId - * with the master node. Further note that this function relies on an internal - * sequence created in initdb to generate unique identifiers. - */ -static uint32 -GetNextColocationId() -{ - text *sequenceName = cstring_to_text(COLOCATIONID_SEQUENCE_NAME); - Oid sequenceId = ResolveRelationId(sequenceName); - Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId); - Oid savedUserId = InvalidOid; - int savedSecurityContext = 0; - Datum colocationIdDatum = 0; - uint32 colocationId = INVALID_COLOCATION_ID; - - GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); - SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); - - /* generate new and unique colocation id from sequence */ - colocationIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum); - - SetUserIdAndSecContext(savedUserId, savedSecurityContext); - - colocationId = DatumGetUInt32(colocationIdDatum); - - return colocationId; -} - - /* * CreateHashDistributedTable creates a hash distributed table with given * shard count and shard replication factor. diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c index 432fedd27..6c3485894 100644 --- a/src/backend/distributed/utils/colocation_utils.c +++ b/src/backend/distributed/utils/colocation_utils.c @@ -10,20 +10,24 @@ */ #include "postgres.h" +#include "miscadmin.h" #include "access/genam.h" #include "access/heapam.h" #include "access/htup_details.h" #include "access/xact.h" #include "catalog/indexing.h" +#include "commands/sequence.h" #include "distributed/colocation_utils.h" #include "distributed/listutils.h" #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" #include "distributed/multi_logical_planner.h" +#include "distributed/pg_dist_colocation.h" #include "distributed/resource_lock.h" #include "distributed/shardinterval_utils.h" #include "distributed/worker_protocol.h" +#include "utils/builtins.h" #include "utils/fmgroids.h" #include "utils/lsyscache.h" #include "utils/rel.h" @@ -36,6 +40,7 @@ static bool ShardsIntervalsEqual(ShardInterval *leftShardInterval, ShardInterval *rightShardInterval); static int CompareShardPlacementsByNode(const void *leftElement, const void *rightElement); +static uint32 GetNextColocationId(void); static void UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId); @@ -339,6 +344,129 @@ CompareShardPlacementsByNode(const void *leftElement, const void *rightElement) } +/* + * ColocationId searches pg_dist_colocation for shard count, replication factor + * and distribution column type. If a matching entry is found, it returns the + * colocation id, otherwise it returns INVALID_COLOCATION_ID. + */ +uint32 +ColocationId(int shardCount, int replicationFactor, Oid distributionColumnType) +{ + uint32 colocationId = INVALID_COLOCATION_ID; + HeapTuple colocationTuple = NULL; + SysScanDesc scanDescriptor; + const int scanKeyCount = 3; + ScanKeyData scanKey[scanKeyCount]; + bool indexOK = true; + + Relation pgDistColocation = heap_open(DistColocationRelationId(), AccessShareLock); + + /* set scan arguments */ + ScanKeyInit(&scanKey[0], Anum_pg_dist_colocation_shardcount, + BTEqualStrategyNumber, F_INT4EQ, UInt32GetDatum(shardCount)); + ScanKeyInit(&scanKey[1], Anum_pg_dist_colocation_replicationfactor, + BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(replicationFactor)); + ScanKeyInit(&scanKey[2], Anum_pg_dist_colocation_distributioncolumntype, + BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(distributionColumnType)); + + scanDescriptor = systable_beginscan(pgDistColocation, + DistColocationConfigurationIndexId(), + indexOK, NULL, scanKeyCount, scanKey); + + colocationTuple = systable_getnext(scanDescriptor); + if (HeapTupleIsValid(colocationTuple)) + { + Form_pg_dist_colocation colocationForm = + (Form_pg_dist_colocation) GETSTRUCT(colocationTuple); + + colocationId = colocationForm->colocationid; + } + + systable_endscan(scanDescriptor); + heap_close(pgDistColocation, AccessShareLock); + + return colocationId; +} + + +/* + * CreateColocationGroup creates a new colocation id and writes it into + * pg_dist_colocation with the given configuration. It also returns the created + * colocation id. + */ +uint32 +CreateColocationGroup(int shardCount, int replicationFactor, Oid distributionColumnType) +{ + uint32 colocationId = GetNextColocationId(); + Relation pgDistColocation = NULL; + TupleDesc tupleDescriptor = NULL; + HeapTuple heapTuple = NULL; + Datum values[Natts_pg_dist_colocation]; + bool isNulls[Natts_pg_dist_colocation]; + + /* form new colocation tuple */ + memset(values, 0, sizeof(values)); + memset(isNulls, false, sizeof(isNulls)); + + values[Anum_pg_dist_colocation_colocationid - 1] = UInt32GetDatum(colocationId); + values[Anum_pg_dist_colocation_shardcount - 1] = UInt32GetDatum(shardCount); + values[Anum_pg_dist_colocation_replicationfactor - 1] = + UInt32GetDatum(replicationFactor); + values[Anum_pg_dist_colocation_distributioncolumntype - 1] = + ObjectIdGetDatum(distributionColumnType); + + /* open colocation relation and insert the new tuple */ + pgDistColocation = heap_open(DistColocationRelationId(), RowExclusiveLock); + + tupleDescriptor = RelationGetDescr(pgDistColocation); + heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls); + + simple_heap_insert(pgDistColocation, heapTuple); + CatalogUpdateIndexes(pgDistColocation, heapTuple); + + /* increment the counter so that next command can see the row */ + CommandCounterIncrement(); + heap_close(pgDistColocation, RowExclusiveLock); + + return colocationId; +} + + +/* + * GetNextColocationId allocates and returns a unique colocationId for the + * colocation group to be created. This allocation occurs both in shared memory + * and in write ahead logs; writing to logs avoids the risk of having + * colocationId collisions. + * + * Please note that the caller is still responsible for finalizing colocationId + * with the master node. Further note that this function relies on an internal + * sequence created in initdb to generate unique identifiers. + */ +static uint32 +GetNextColocationId() +{ + text *sequenceName = cstring_to_text(COLOCATIONID_SEQUENCE_NAME); + Oid sequenceId = ResolveRelationId(sequenceName); + Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId); + Oid savedUserId = InvalidOid; + int savedSecurityContext = 0; + Datum colocationIdDatum = 0; + uint32 colocationId = INVALID_COLOCATION_ID; + + GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); + SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); + + /* generate new and unique colocation id from sequence */ + colocationIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum); + + SetUserIdAndSecContext(savedUserId, savedSecurityContext); + + colocationId = DatumGetUInt32(colocationIdDatum); + + return colocationId; +} + + /* * UpdateRelationColocationGroup updates colocation group in pg_dist_partition * for the given relation. diff --git a/src/include/distributed/colocation_utils.h b/src/include/distributed/colocation_utils.h index 09de91ee3..dc3754ab3 100644 --- a/src/include/distributed/colocation_utils.h +++ b/src/include/distributed/colocation_utils.h @@ -25,6 +25,7 @@ extern List * ColocatedTableList(Oid distributedTableId); extern List * ColocatedShardIntervalList(ShardInterval *shardInterval); extern Oid ColocatedTableId(Oid colocationId); extern uint64 ColocatedShardIdInRelation(Oid relationId, int shardIndex); +uint32 ColocationId(int shardCount, int replicationFactor, Oid distributionColumnType); extern uint32 CreateColocationGroup(int shardCount, int replicationFactor, Oid distributionColumnType);