Introduce new column for pg_dist_partition: autoconverted

talha_tes1
Ahmet Gedemenli 2021-09-22 18:48:10 +03:00
parent 2cfac21382
commit 3f44d6ea72
9 changed files with 69 additions and 6 deletions

View File

@ -1102,7 +1102,7 @@ InsertMetadataForCitusLocalTable(Oid citusLocalTableId, uint64 shardId)
Var *distributionColumn = NULL;
InsertIntoPgDistPartition(citusLocalTableId, distributionMethod,
distributionColumn, colocationId,
replicationModel);
replicationModel, false);
/* set shard storage type according to relation type */
char shardStorageType = ShardStorageType(citusLocalTableId);

View File

@ -482,10 +482,11 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio
/* we need to calculate these variables before creating distributed metadata */
bool localTableEmpty = TableEmpty(relationId);
Oid colocatedTableId = ColocatedTableId(colocationId);
bool autoConverted = false;
/* create an entry for distributed table in pg_dist_partition */
InsertIntoPgDistPartition(relationId, distributionMethod, distributionColumn,
colocationId, replicationModel);
colocationId, replicationModel, autoConverted);
/*
* Ensure that the sequences used in column defaults of the table

View File

@ -517,6 +517,44 @@ PartitionMethodViaCatalog(Oid relationId)
}
/*
* AutoConvertedViaCatalog gets a relationId and returns the auto converted
* column from pg_dist_partition via reading from catalog.
*/
bool
AutoConvertedViaCatalog(Oid relationId)
{
HeapTuple partitionTuple = PgDistPartitionTupleViaCatalog(relationId);
if (!HeapTupleIsValid(partitionTuple))
{
ereport(ERROR, (errmsg("relation not found with oid: %u", relationId)));
}
Datum datumArray[Natts_pg_dist_partition];
bool isNullArray[Natts_pg_dist_partition];
Relation pgDistPartition = table_open(DistPartitionRelationId(), AccessShareLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
heap_deform_tuple(partitionTuple, tupleDescriptor, datumArray, isNullArray);
if (isNullArray[Anum_pg_dist_partition_autoconverted - 1])
{
heap_freetuple(partitionTuple);
table_close(pgDistPartition, NoLock);
return false;
}
Datum autoConvertedDatum = datumArray[Anum_pg_dist_partition_autoconverted - 1];
bool autoConvertedBool = DatumGetBool(autoConvertedDatum);
heap_freetuple(partitionTuple);
table_close(pgDistPartition, NoLock);
return autoConvertedBool;
}
/*
* PgDistPartitionTupleViaCatalog is a helper function that searches
* pg_dist_partition for the given relationId. The caller is responsible
@ -1387,6 +1425,12 @@ BuildCitusTableCacheEntry(Oid relationId)
cacheEntry->replicationModel = DatumGetChar(replicationModelDatum);
}
cacheEntry->autoConverted = datumArray[Anum_pg_dist_partition_autoconverted - 1];
if (isNullArray[Anum_pg_dist_partition_autoconverted - 1])
{
cacheEntry->autoConverted = false;
}
heap_freetuple(distPartitionTuple);
BuildCachedShardList(cacheEntry);

View File

@ -2074,6 +2074,7 @@ citus_internal_add_partition_metadata(PG_FUNCTION_ARGS)
text *distributionColumnText = NULL;
char *distributionColumnString = NULL;
Var *distributionColumnVar = NULL;
bool autoConverted = false;
/* only owner of the table (or superuser) is allowed to add the Citus metadata */
EnsureTableOwner(relationId);
@ -2123,7 +2124,7 @@ citus_internal_add_partition_metadata(PG_FUNCTION_ARGS)
}
InsertIntoPgDistPartition(relationId, distributionMethod, distributionColumnVar,
colocationId, replicationModel);
colocationId, replicationModel, autoConverted);
PG_RETURN_VOID();
}

View File

@ -1744,7 +1744,7 @@ InsertShardPlacementRow(uint64 shardId, uint64 placementId,
void
InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
Var *distributionColumn, uint32 colocationId,
char replicationModel)
char replicationModel, bool autoConverted)
{
char *distributionColumnString = NULL;
@ -1764,6 +1764,7 @@ InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
CharGetDatum(distributionMethod);
newValues[Anum_pg_dist_partition_colocationid - 1] = UInt32GetDatum(colocationId);
newValues[Anum_pg_dist_partition_repmodel - 1] = CharGetDatum(replicationModel);
newValues[Anum_pg_dist_partition_autoconverted - 1] = BoolGetDatum(autoConverted);
/* set partkey column to NULL for reference tables */
if (distributionMethod != DISTRIBUTE_BY_NONE)

View File

@ -158,6 +158,18 @@ ShouldUndistributeCitusLocalTable(Oid relationId)
{
if (IsCitusTableType(relationOid, REFERENCE_TABLE))
{
/*
* The relation is connected to a reference table via foreign keys,
* we shouldn't undistribute it.
*/
return false;
}
if (!AutoConvertedViaCatalog(relationId))
{
/*
* The relation is connected to a (or, is a) Citus Local Table created
* by the user. We shouldn't undistribute it.
*/
return false;
}
}

View File

@ -63,6 +63,7 @@ typedef struct
char partitionMethod;
uint32 colocationId;
char replicationModel;
bool autoConverted;
/* pg_dist_shard metadata (variable-length ShardInterval array) for this table */
int shardIntervalArrayLength;
@ -146,6 +147,7 @@ extern bool IsCitusTable(Oid relationId);
extern char PgDistPartitionViaCatalog(Oid relationId);
extern List * LookupDistShardTuples(Oid relationId);
extern char PartitionMethodViaCatalog(Oid relationId);
extern bool AutoConvertedViaCatalog(Oid relationId);
extern bool IsCitusLocalTableByDistParams(char partitionMethod, char replicationModel);
extern List * CitusTableList(void);
extern ShardInterval * LoadShardInterval(uint64 shardId);

View File

@ -234,7 +234,7 @@ extern uint64 InsertShardPlacementRow(uint64 shardId, uint64 placementId,
int32 groupId);
extern void InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
Var *distributionColumn, uint32 colocationId,
char replicationModel);
char replicationModel, bool autoConverted);
extern void DeletePartitionRow(Oid distributedRelationId);
extern void DeleteShardRow(uint64 shardId);
extern void UpdateShardPlacementState(uint64 placementId, char shardState);

View File

@ -28,6 +28,7 @@ typedef struct FormData_pg_dist_partition
uint32 colocationid; /* id of the co-location group of particular table belongs to */
char repmodel; /* replication model; see codes below */
#endif
bool autoconverted;
} FormData_pg_dist_partition;
/* ----------------
@ -41,12 +42,13 @@ typedef FormData_pg_dist_partition *Form_pg_dist_partition;
* compiler constants for pg_dist_partitions
* ----------------
*/
#define Natts_pg_dist_partition 5
#define Natts_pg_dist_partition 6
#define Anum_pg_dist_partition_logicalrelid 1
#define Anum_pg_dist_partition_partmethod 2
#define Anum_pg_dist_partition_partkey 3
#define Anum_pg_dist_partition_colocationid 4
#define Anum_pg_dist_partition_repmodel 5
#define Anum_pg_dist_partition_autoconverted 6
/* valid values for partmethod include append, hash, and range */
#define DISTRIBUTE_BY_APPEND 'a'