From 1b31b22635eeb1679372eceb56ae9e7a92a5d156 Mon Sep 17 00:00:00 2001 From: Onur Tirtir Date: Fri, 18 Sep 2020 16:28:33 +0300 Subject: [PATCH] Refactor the functions that return OID lists for citus tables --- src/backend/distributed/commands/extension.c | 10 +-- .../distributed/metadata/metadata_cache.c | 80 +++++-------------- .../distributed/utils/reference_table_utils.c | 6 +- .../distributed/utils/statistics_collection.c | 8 +- src/include/distributed/metadata_cache.h | 7 +- .../expected/insert_select_repartition.out | 1 - 6 files changed, 35 insertions(+), 77 deletions(-) diff --git a/src/backend/distributed/commands/extension.c b/src/backend/distributed/commands/extension.c index 7b91877c9..d7e167488 100644 --- a/src/backend/distributed/commands/extension.c +++ b/src/backend/distributed/commands/extension.c @@ -555,14 +555,14 @@ MarkExistingObjectDependenciesDistributedIfSupported() /* resulting object addresses to be marked as distributed */ List *resultingObjectAddresses = NIL; - /* resolve dependencies of distributed tables */ - List *distributedTableOidList = DistTableOidList(); + /* resolve dependencies of citus tables */ + List *citusTableIdList = CitusTableTypeIdList(ANY_CITUS_TABLE_TYPE); - Oid distributedTableOid = InvalidOid; - foreach_oid(distributedTableOid, distributedTableOidList) + Oid citusTableId = InvalidOid; + foreach_oid(citusTableId, citusTableIdList) { ObjectAddress tableAddress = { 0 }; - ObjectAddressSet(tableAddress, RelationRelationId, distributedTableOid); + ObjectAddressSet(tableAddress, RelationRelationId, citusTableId); List *distributableDependencyObjectAddresses = GetDistributableDependenciesForObject(&tableAddress); diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index 58045747d..62941966f 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -376,6 +376,11 @@ 
IsCitusTableTypeInternal(CitusTableCacheEntry *tableEntry, CitusTableType tableT return tableEntry->partitionMethod == DISTRIBUTE_BY_NONE; } + case ANY_CITUS_TABLE_TYPE: + { + return true; + } + default: { ereport(ERROR, (errmsg("Unknown table type %d", tableType))); @@ -468,10 +473,10 @@ CitusTableList(void) Assert(CitusHasBeenLoaded() && CheckCitusVersion(WARNING)); /* first, we need to iterate over pg_dist_partition */ - List *distTableOidList = DistTableOidList(); + List *citusTableIdList = CitusTableTypeIdList(ANY_CITUS_TABLE_TYPE); Oid relationId = InvalidOid; - foreach_oid(relationId, distTableOidList) + foreach_oid(relationId, citusTableIdList) { CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); @@ -3775,15 +3780,19 @@ InvalidateMetadataSystemCache(void) /* - * DistTableOidList iterates over the pg_dist_partition table and returns - * a list that consists of the logicalrelids. + * CitusTableTypeIdList function scans pg_dist_partition and returns a + * list of OIDs for the tables matching the given citusTableType. + * To create the list, it performs a sequential scan. Since it is not expected + * that this function will be called frequently, it is OK not to use an index + * scan. If this function becomes a performance bottleneck, it is possible to + * modify this function to perform an index scan. 
*/ List * -DistTableOidList(void) +CitusTableTypeIdList(CitusTableType citusTableType) { ScanKeyData scanKey[1]; int scanKeyCount = 0; - List *distTableOidList = NIL; + List *relationIdList = NIL; Relation pgDistPartition = table_open(DistPartitionRelationId(), AccessShareLock); @@ -3801,60 +3810,9 @@ DistTableOidList(void) Anum_pg_dist_partition_logicalrelid, tupleDescriptor, &isNull); Oid relationId = DatumGetObjectId(relationIdDatum); - distTableOidList = lappend_oid(distTableOidList, relationId); - - heapTuple = systable_getnext(scanDescriptor); - } - - systable_endscan(scanDescriptor); - table_close(pgDistPartition, AccessShareLock); - - return distTableOidList; -} - - -/* - * ReferenceTableOidList function scans pg_dist_partition to create a list of all - * reference tables. To create the list, it performs sequential scan. Since it is not - * expected that this function will be called frequently, it is OK not to use index scan. - * If this function becomes performance bottleneck, it is possible to modify this function - * to perform index scan. 
- */ -List * -ReferenceTableOidList() -{ - ScanKeyData scanKey[1]; - int scanKeyCount = 0; - List *referenceTableOidList = NIL; - - Relation pgDistPartition = table_open(DistPartitionRelationId(), AccessShareLock); - - SysScanDesc scanDescriptor = systable_beginscan(pgDistPartition, - InvalidOid, false, - NULL, scanKeyCount, scanKey); - - TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); - - HeapTuple heapTuple = systable_getnext(scanDescriptor); - while (HeapTupleIsValid(heapTuple)) - { - bool isNull = false; - char partitionMethod = heap_getattr(heapTuple, - Anum_pg_dist_partition_partmethod, - tupleDescriptor, &isNull); - char replicationModel = heap_getattr(heapTuple, - Anum_pg_dist_partition_repmodel, - tupleDescriptor, &isNull); - - if (IsReferenceTableByDistParams(partitionMethod, replicationModel)) + if (IsCitusTableType(relationId, citusTableType)) { - Datum relationIdDatum = heap_getattr(heapTuple, - Anum_pg_dist_partition_logicalrelid, - tupleDescriptor, &isNull); - - Oid relationId = DatumGetObjectId(relationIdDatum); - - referenceTableOidList = lappend_oid(referenceTableOidList, relationId); + relationIdList = lappend_oid(relationIdList, relationId); } heapTuple = systable_getnext(scanDescriptor); @@ -3863,7 +3821,7 @@ ReferenceTableOidList() systable_endscan(scanDescriptor); table_close(pgDistPartition, AccessShareLock); - return referenceTableOidList; + return relationIdList; } @@ -3874,7 +3832,7 @@ ReferenceTableOidList() bool ClusterHasReferenceTable(void) { - return list_length(ReferenceTableOidList()) > 0; + return list_length(CitusTableTypeIdList(REFERENCE_TABLE)) > 0; } diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index f75b0015e..db2a05d06 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -111,7 +111,7 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode) int 
colocationId = CreateReferenceTableColocationId(); LockColocationId(colocationId, ExclusiveLock); - List *referenceTableIdList = ReferenceTableOidList(); + List *referenceTableIdList = CitusTableTypeIdList(REFERENCE_TABLE); if (referenceTableIdList == NIL) { /* no reference tables exist */ @@ -620,7 +620,7 @@ CreateReferenceTableColocationId() void DeleteAllReferenceTablePlacementsFromNodeGroup(int32 groupId) { - List *referenceTableList = ReferenceTableOidList(); + List *referenceTableList = CitusTableTypeIdList(REFERENCE_TABLE); List *referenceShardIntervalList = NIL; /* if there are no reference tables, we do not need to do anything */ @@ -714,7 +714,7 @@ ReferenceTableReplicationFactor(void) void ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort) { - List *referenceTableList = ReferenceTableOidList(); + List *referenceTableList = CitusTableTypeIdList(REFERENCE_TABLE); /* if there is no reference table, we do not need to replicate anything */ if (list_length(referenceTableList) > 0) diff --git a/src/backend/distributed/utils/statistics_collection.c b/src/backend/distributed/utils/statistics_collection.c index 16df47da0..3783414e6 100644 --- a/src/backend/distributed/utils/statistics_collection.c +++ b/src/backend/distributed/utils/statistics_collection.c @@ -72,7 +72,7 @@ WarnIfSyncDNS(void) bool CollectBasicUsageStatistics(void) { - List *distTableOids = NIL; + List *citusTableIdList = NIL; uint64 roundedDistTableCount = 0; uint64 roundedClusterSize = 0; uint32 workerNodeCount = 0; @@ -93,9 +93,9 @@ CollectBasicUsageStatistics(void) PG_TRY(); { - distTableOids = DistTableOidList(); - roundedDistTableCount = NextPow2(list_length(distTableOids)); - roundedClusterSize = NextPow2(DistributedTablesSize(distTableOids)); + citusTableIdList = CitusTableTypeIdList(ANY_CITUS_TABLE_TYPE); + roundedDistTableCount = NextPow2(list_length(citusTableIdList)); + roundedClusterSize = NextPow2(DistributedTablesSize(citusTableIdList)); workerNodeCount = 
ActivePrimaryNonCoordinatorNodeCount(); metadataJsonbDatum = DistNodeMetadata(); metadataJsonbStr = DatumGetCString(DirectFunctionCall1(jsonb_out, diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 82e609abc..63e3296c1 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -129,7 +129,9 @@ typedef enum CITUS_LOCAL_TABLE, /* table without a dist key such as reference table */ - CITUS_TABLE_WITH_NO_DIST_KEY + CITUS_TABLE_WITH_NO_DIST_KEY, + + ANY_CITUS_TABLE_TYPE } CitusTableType; extern bool IsCitusTableType(Oid relationId, CitusTableType tableType); @@ -151,8 +153,6 @@ extern CitusTableCacheEntry * LookupCitusTableCacheEntry(Oid relationId); extern DistObjectCacheEntry * LookupDistObjectCacheEntry(Oid classid, Oid objid, int32 objsubid); extern int32 GetLocalGroupId(void); -extern List * DistTableOidList(void); -extern List * ReferenceTableOidList(void); extern void CitusTableCacheFlushInvalidatedEntries(void); extern Oid LookupShardRelationFromCatalog(int64 shardId, bool missing_ok); extern List * ShardPlacementList(uint64 shardId); @@ -161,6 +161,7 @@ extern void CitusInvalidateRelcacheByShardId(int64 shardId); extern void InvalidateForeignKeyGraph(void); extern void FlushDistTableCache(void); extern void InvalidateMetadataSystemCache(void); +extern List * CitusTableTypeIdList(CitusTableType citusTableType); extern Datum DistNodeMetadata(void); extern bool ClusterHasReferenceTable(void); extern bool HasUniformHashDistribution(ShardInterval **shardIntervalArray, diff --git a/src/test/regress/expected/insert_select_repartition.out b/src/test/regress/expected/insert_select_repartition.out index 8845e7001..e65c2266c 100644 --- a/src/test/regress/expected/insert_select_repartition.out +++ b/src/test/regress/expected/insert_select_repartition.out @@ -1216,7 +1216,6 @@ ON CONFLICT(c1, c2, c3, c4, c5, c6) DO UPDATE SET cardinality = enriched.cardinality + 
excluded.cardinality, sum = enriched.sum + excluded.sum; -DEBUG: rehashing catalog cache id 14 for pg_opclass; 17 tups, 8 buckets at character 224 DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT DEBUG: Router planner cannot handle multi-shard select queries DEBUG: performing repartitioned INSERT ... SELECT