From 3f44d6ea726b431009e2147e00ec4d119efb0634 Mon Sep 17 00:00:00 2001 From: Ahmet Gedemenli Date: Wed, 22 Sep 2021 18:48:10 +0300 Subject: [PATCH] Introduce new column for pg_dist_partition: autoconverted --- .../citus_add_local_table_to_metadata.c | 2 +- .../commands/create_distributed_table.c | 3 +- .../distributed/metadata/metadata_cache.c | 44 +++++++++++++++++++ .../distributed/metadata/metadata_sync.c | 3 +- .../distributed/metadata/metadata_utility.c | 3 +- .../utils/foreign_key_relationship.c | 12 +++++ src/include/distributed/metadata_cache.h | 2 + src/include/distributed/metadata_utility.h | 2 +- src/include/distributed/pg_dist_partition.h | 4 +- 9 files changed, 69 insertions(+), 6 deletions(-) diff --git a/src/backend/distributed/commands/citus_add_local_table_to_metadata.c b/src/backend/distributed/commands/citus_add_local_table_to_metadata.c index 8218c68da..06e6bd29b 100644 --- a/src/backend/distributed/commands/citus_add_local_table_to_metadata.c +++ b/src/backend/distributed/commands/citus_add_local_table_to_metadata.c @@ -1102,7 +1102,7 @@ InsertMetadataForCitusLocalTable(Oid citusLocalTableId, uint64 shardId) Var *distributionColumn = NULL; InsertIntoPgDistPartition(citusLocalTableId, distributionMethod, distributionColumn, colocationId, - replicationModel); + replicationModel, false); /* set shard storage type according to relation type */ char shardStorageType = ShardStorageType(citusLocalTableId); diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 89d14f2af..c620af6c8 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -482,10 +482,11 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio /* we need to calculate these variables before creating distributed metadata */ bool localTableEmpty = TableEmpty(relationId); Oid colocatedTableId = ColocatedTableId(colocationId); + bool autoConverted = false; /* create an entry for distributed table in pg_dist_partition */ InsertIntoPgDistPartition(relationId, distributionMethod, distributionColumn, - colocationId, replicationModel); + colocationId, replicationModel, autoConverted); /* * Ensure that the sequences used in column defaults of the table diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index a84009a1a..2dd26c776 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -517,6 +517,44 @@ PartitionMethodViaCatalog(Oid relationId) } +/* + * AutoConvertedViaCatalog gets a relationId and returns the auto converted + * column from pg_dist_partition via reading from catalog. + */ +bool +AutoConvertedViaCatalog(Oid relationId) +{ + HeapTuple partitionTuple = PgDistPartitionTupleViaCatalog(relationId); + if (!HeapTupleIsValid(partitionTuple)) + { + ereport(ERROR, (errmsg("relation not found with oid: %u", relationId))); + } + + Datum datumArray[Natts_pg_dist_partition]; + bool isNullArray[Natts_pg_dist_partition]; + + Relation pgDistPartition = table_open(DistPartitionRelationId(), AccessShareLock); + + TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); + heap_deform_tuple(partitionTuple, tupleDescriptor, datumArray, isNullArray); + + if (isNullArray[Anum_pg_dist_partition_autoconverted - 1]) + { + heap_freetuple(partitionTuple); + table_close(pgDistPartition, NoLock); + return false; + } + + Datum autoConvertedDatum = datumArray[Anum_pg_dist_partition_autoconverted - 1]; + bool autoConvertedBool = DatumGetBool(autoConvertedDatum); + + heap_freetuple(partitionTuple); + table_close(pgDistPartition, NoLock); + + return autoConvertedBool; +} + + /* * PgDistPartitionTupleViaCatalog is a helper function that searches * pg_dist_partition for the given relationId. The caller is responsible @@ -1387,6 +1425,12 @@ BuildCitusTableCacheEntry(Oid relationId) cacheEntry->replicationModel = DatumGetChar(replicationModelDatum); } + cacheEntry->autoConverted = datumArray[Anum_pg_dist_partition_autoconverted - 1]; + if (isNullArray[Anum_pg_dist_partition_autoconverted - 1]) + { + cacheEntry->autoConverted = false; + } + heap_freetuple(distPartitionTuple); BuildCachedShardList(cacheEntry); diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 2e9f6489b..e2db63e52 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -2074,6 +2074,7 @@ citus_internal_add_partition_metadata(PG_FUNCTION_ARGS) text *distributionColumnText = NULL; char *distributionColumnString = NULL; Var *distributionColumnVar = NULL; + bool autoConverted = false; /* only owner of the table (or superuser) is allowed to add the Citus metadata */ EnsureTableOwner(relationId); @@ -2123,7 +2124,7 @@ citus_internal_add_partition_metadata(PG_FUNCTION_ARGS) } InsertIntoPgDistPartition(relationId, distributionMethod, distributionColumnVar, - colocationId, replicationModel); + colocationId, replicationModel, autoConverted); PG_RETURN_VOID(); } diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index ddd8b49bc..9063deb3c 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -1744,7 +1744,7 @@ InsertShardPlacementRow(uint64 shardId, uint64 placementId, void InsertIntoPgDistPartition(Oid relationId, char distributionMethod, Var *distributionColumn, uint32 colocationId, - char replicationModel) + char replicationModel, bool autoConverted) { char *distributionColumnString = NULL; @@ -1764,6 +1764,7 @@ InsertIntoPgDistPartition(Oid relationId, char distributionMethod, CharGetDatum(distributionMethod); newValues[Anum_pg_dist_partition_colocationid - 1] = UInt32GetDatum(colocationId); newValues[Anum_pg_dist_partition_repmodel - 1] = CharGetDatum(replicationModel); + newValues[Anum_pg_dist_partition_autoconverted - 1] = BoolGetDatum(autoConverted); /* set partkey column to NULL for reference tables */ if (distributionMethod != DISTRIBUTE_BY_NONE) diff --git a/src/backend/distributed/utils/foreign_key_relationship.c b/src/backend/distributed/utils/foreign_key_relationship.c index dd085191d..ac0cbb059 100644 --- a/src/backend/distributed/utils/foreign_key_relationship.c +++ b/src/backend/distributed/utils/foreign_key_relationship.c @@ -158,6 +158,18 @@ ShouldUndistributeCitusLocalTable(Oid relationId) { if (IsCitusTableType(relationOid, REFERENCE_TABLE)) { + /* + * The relation is connected to a reference table via foreign keys, + * we shouldn't undistribute it. + */ + return false; + } + if (!AutoConvertedViaCatalog(relationId)) + { + /* + * The relation is connected to a (or, is a) Citus Local Table created + * by the user. We shouldn't undistribute it. + */ return false; } } diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index fc76c47e4..89bc094a7 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -63,6 +63,7 @@ typedef struct char partitionMethod; uint32 colocationId; char replicationModel; + bool autoConverted; /* pg_dist_shard metadata (variable-length ShardInterval array) for this table */ int shardIntervalArrayLength; @@ -146,6 +147,7 @@ extern bool IsCitusTable(Oid relationId); extern char PgDistPartitionViaCatalog(Oid relationId); extern List * LookupDistShardTuples(Oid relationId); extern char PartitionMethodViaCatalog(Oid relationId); +extern bool AutoConvertedViaCatalog(Oid relationId); extern bool IsCitusLocalTableByDistParams(char partitionMethod, char replicationModel); extern List * CitusTableList(void); extern ShardInterval * LoadShardInterval(uint64 shardId); diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 0e4a6e6c8..f6e8a81a5 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -234,7 +234,7 @@ extern uint64 InsertShardPlacementRow(uint64 shardId, uint64 placementId, int32 groupId); extern void InsertIntoPgDistPartition(Oid relationId, char distributionMethod, Var *distributionColumn, uint32 colocationId, - char replicationModel); + char replicationModel, bool autoConverted); extern void DeletePartitionRow(Oid distributedRelationId); extern void DeleteShardRow(uint64 shardId); extern void UpdateShardPlacementState(uint64 placementId, char shardState); diff --git a/src/include/distributed/pg_dist_partition.h b/src/include/distributed/pg_dist_partition.h index 36a94da7a..032cc195a 100644 --- a/src/include/distributed/pg_dist_partition.h +++ b/src/include/distributed/pg_dist_partition.h @@ -28,6 +28,7 @@ typedef struct FormData_pg_dist_partition uint32 colocationid; /* id of the co-location group of particular table belongs to */ char repmodel; /* replication model; see codes below */ #endif + bool autoconverted; } FormData_pg_dist_partition; /* ---------------- @@ -41,12 +42,13 @@ typedef FormData_pg_dist_partition *Form_pg_dist_partition; * compiler constants for pg_dist_partitions * ---------------- */ -#define Natts_pg_dist_partition 5 +#define Natts_pg_dist_partition 6 #define Anum_pg_dist_partition_logicalrelid 1 #define Anum_pg_dist_partition_partmethod 2 #define Anum_pg_dist_partition_partkey 3 #define Anum_pg_dist_partition_colocationid 4 #define Anum_pg_dist_partition_repmodel 5 +#define Anum_pg_dist_partition_autoconverted 6 /* valid values for partmethod include append, hash, and range */ #define DISTRIBUTE_BY_APPEND 'a'