Refactor the functions that return OID lists for citus tables

pull/4176/head
Onur Tirtir 2020-09-18 16:28:33 +03:00
parent dae2c69fd7
commit 1b31b22635
6 changed files with 35 additions and 77 deletions

View File

@ -555,14 +555,14 @@ MarkExistingObjectDependenciesDistributedIfSupported()
/* resulting object addresses to be marked as distributed */ /* resulting object addresses to be marked as distributed */
List *resultingObjectAddresses = NIL; List *resultingObjectAddresses = NIL;
/* resolve dependencies of distributed tables */ /* resolve dependencies of citus tables */
List *distributedTableOidList = DistTableOidList(); List *citusTableIdList = CitusTableTypeIdList(ANY_CITUS_TABLE_TYPE);
Oid distributedTableOid = InvalidOid; Oid citusTableId = InvalidOid;
foreach_oid(distributedTableOid, distributedTableOidList) foreach_oid(citusTableId, citusTableIdList)
{ {
ObjectAddress tableAddress = { 0 }; ObjectAddress tableAddress = { 0 };
ObjectAddressSet(tableAddress, RelationRelationId, distributedTableOid); ObjectAddressSet(tableAddress, RelationRelationId, citusTableId);
List *distributableDependencyObjectAddresses = List *distributableDependencyObjectAddresses =
GetDistributableDependenciesForObject(&tableAddress); GetDistributableDependenciesForObject(&tableAddress);

View File

@ -376,6 +376,11 @@ IsCitusTableTypeInternal(CitusTableCacheEntry *tableEntry, CitusTableType tableT
return tableEntry->partitionMethod == DISTRIBUTE_BY_NONE; return tableEntry->partitionMethod == DISTRIBUTE_BY_NONE;
} }
case ANY_CITUS_TABLE_TYPE:
{
return true;
}
default: default:
{ {
ereport(ERROR, (errmsg("Unknown table type %d", tableType))); ereport(ERROR, (errmsg("Unknown table type %d", tableType)));
@ -468,10 +473,10 @@ CitusTableList(void)
Assert(CitusHasBeenLoaded() && CheckCitusVersion(WARNING)); Assert(CitusHasBeenLoaded() && CheckCitusVersion(WARNING));
/* first, we need to iterate over pg_dist_partition */ /* first, we need to iterate over pg_dist_partition */
List *distTableOidList = DistTableOidList(); List *citusTableIdList = CitusTableTypeIdList(ANY_CITUS_TABLE_TYPE);
Oid relationId = InvalidOid; Oid relationId = InvalidOid;
foreach_oid(relationId, distTableOidList) foreach_oid(relationId, citusTableIdList)
{ {
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
@ -3775,15 +3780,19 @@ InvalidateMetadataSystemCache(void)
/* /*
* DistTableOidList iterates over the pg_dist_partition table and returns * CitusTableTypeIdList function scans pg_dist_partition and returns a
* a list that consists of the logicalrelids. * list of OID's for the tables matching given citusTableType.
* To create the list, it performs sequential scan. Since it is not expected
* that this function will be called frequently, it is OK not to use index
* scan. If this function becomes performance bottleneck, it is possible to
* modify this function to perform index scan.
*/ */
List * List *
DistTableOidList(void) CitusTableTypeIdList(CitusTableType citusTableType)
{ {
ScanKeyData scanKey[1]; ScanKeyData scanKey[1];
int scanKeyCount = 0; int scanKeyCount = 0;
List *distTableOidList = NIL; List *relationIdList = NIL;
Relation pgDistPartition = table_open(DistPartitionRelationId(), AccessShareLock); Relation pgDistPartition = table_open(DistPartitionRelationId(), AccessShareLock);
@ -3801,60 +3810,9 @@ DistTableOidList(void)
Anum_pg_dist_partition_logicalrelid, Anum_pg_dist_partition_logicalrelid,
tupleDescriptor, &isNull); tupleDescriptor, &isNull);
Oid relationId = DatumGetObjectId(relationIdDatum); Oid relationId = DatumGetObjectId(relationIdDatum);
distTableOidList = lappend_oid(distTableOidList, relationId); if (IsCitusTableType(relationId, citusTableType))
heapTuple = systable_getnext(scanDescriptor);
}
systable_endscan(scanDescriptor);
table_close(pgDistPartition, AccessShareLock);
return distTableOidList;
}
/*
* ReferenceTableOidList function scans pg_dist_partition to create a list of all
* reference tables. To create the list, it performs sequential scan. Since it is not
* expected that this function will be called frequently, it is OK not to use index scan.
* If this function becomes performance bottleneck, it is possible to modify this function
* to perform index scan.
*/
List *
ReferenceTableOidList()
{
ScanKeyData scanKey[1];
int scanKeyCount = 0;
List *referenceTableOidList = NIL;
Relation pgDistPartition = table_open(DistPartitionRelationId(), AccessShareLock);
SysScanDesc scanDescriptor = systable_beginscan(pgDistPartition,
InvalidOid, false,
NULL, scanKeyCount, scanKey);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
HeapTuple heapTuple = systable_getnext(scanDescriptor);
while (HeapTupleIsValid(heapTuple))
{
bool isNull = false;
char partitionMethod = heap_getattr(heapTuple,
Anum_pg_dist_partition_partmethod,
tupleDescriptor, &isNull);
char replicationModel = heap_getattr(heapTuple,
Anum_pg_dist_partition_repmodel,
tupleDescriptor, &isNull);
if (IsReferenceTableByDistParams(partitionMethod, replicationModel))
{ {
Datum relationIdDatum = heap_getattr(heapTuple, relationIdList = lappend_oid(relationIdList, relationId);
Anum_pg_dist_partition_logicalrelid,
tupleDescriptor, &isNull);
Oid relationId = DatumGetObjectId(relationIdDatum);
referenceTableOidList = lappend_oid(referenceTableOidList, relationId);
} }
heapTuple = systable_getnext(scanDescriptor); heapTuple = systable_getnext(scanDescriptor);
@ -3863,7 +3821,7 @@ ReferenceTableOidList()
systable_endscan(scanDescriptor); systable_endscan(scanDescriptor);
table_close(pgDistPartition, AccessShareLock); table_close(pgDistPartition, AccessShareLock);
return referenceTableOidList; return relationIdList;
} }
@ -3874,7 +3832,7 @@ ReferenceTableOidList()
bool bool
ClusterHasReferenceTable(void) ClusterHasReferenceTable(void)
{ {
return list_length(ReferenceTableOidList()) > 0; return list_length(CitusTableTypeIdList(REFERENCE_TABLE)) > 0;
} }

View File

@ -111,7 +111,7 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode)
int colocationId = CreateReferenceTableColocationId(); int colocationId = CreateReferenceTableColocationId();
LockColocationId(colocationId, ExclusiveLock); LockColocationId(colocationId, ExclusiveLock);
List *referenceTableIdList = ReferenceTableOidList(); List *referenceTableIdList = CitusTableTypeIdList(REFERENCE_TABLE);
if (referenceTableIdList == NIL) if (referenceTableIdList == NIL)
{ {
/* no reference tables exist */ /* no reference tables exist */
@ -620,7 +620,7 @@ CreateReferenceTableColocationId()
void void
DeleteAllReferenceTablePlacementsFromNodeGroup(int32 groupId) DeleteAllReferenceTablePlacementsFromNodeGroup(int32 groupId)
{ {
List *referenceTableList = ReferenceTableOidList(); List *referenceTableList = CitusTableTypeIdList(REFERENCE_TABLE);
List *referenceShardIntervalList = NIL; List *referenceShardIntervalList = NIL;
/* if there are no reference tables, we do not need to do anything */ /* if there are no reference tables, we do not need to do anything */
@ -714,7 +714,7 @@ ReferenceTableReplicationFactor(void)
void void
ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort) ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort)
{ {
List *referenceTableList = ReferenceTableOidList(); List *referenceTableList = CitusTableTypeIdList(REFERENCE_TABLE);
/* if there is no reference table, we do not need to replicate anything */ /* if there is no reference table, we do not need to replicate anything */
if (list_length(referenceTableList) > 0) if (list_length(referenceTableList) > 0)

View File

@ -72,7 +72,7 @@ WarnIfSyncDNS(void)
bool bool
CollectBasicUsageStatistics(void) CollectBasicUsageStatistics(void)
{ {
List *distTableOids = NIL; List *citusTableIdList = NIL;
uint64 roundedDistTableCount = 0; uint64 roundedDistTableCount = 0;
uint64 roundedClusterSize = 0; uint64 roundedClusterSize = 0;
uint32 workerNodeCount = 0; uint32 workerNodeCount = 0;
@ -93,9 +93,9 @@ CollectBasicUsageStatistics(void)
PG_TRY(); PG_TRY();
{ {
distTableOids = DistTableOidList(); citusTableIdList = CitusTableTypeIdList(ANY_CITUS_TABLE_TYPE);
roundedDistTableCount = NextPow2(list_length(distTableOids)); roundedDistTableCount = NextPow2(list_length(citusTableIdList));
roundedClusterSize = NextPow2(DistributedTablesSize(distTableOids)); roundedClusterSize = NextPow2(DistributedTablesSize(citusTableIdList));
workerNodeCount = ActivePrimaryNonCoordinatorNodeCount(); workerNodeCount = ActivePrimaryNonCoordinatorNodeCount();
metadataJsonbDatum = DistNodeMetadata(); metadataJsonbDatum = DistNodeMetadata();
metadataJsonbStr = DatumGetCString(DirectFunctionCall1(jsonb_out, metadataJsonbStr = DatumGetCString(DirectFunctionCall1(jsonb_out,

View File

@ -129,7 +129,9 @@ typedef enum
CITUS_LOCAL_TABLE, CITUS_LOCAL_TABLE,
/* table without a dist key such as reference table */ /* table without a dist key such as reference table */
CITUS_TABLE_WITH_NO_DIST_KEY CITUS_TABLE_WITH_NO_DIST_KEY,
ANY_CITUS_TABLE_TYPE
} CitusTableType; } CitusTableType;
extern bool IsCitusTableType(Oid relationId, CitusTableType tableType); extern bool IsCitusTableType(Oid relationId, CitusTableType tableType);
@ -151,8 +153,6 @@ extern CitusTableCacheEntry * LookupCitusTableCacheEntry(Oid relationId);
extern DistObjectCacheEntry * LookupDistObjectCacheEntry(Oid classid, Oid objid, int32 extern DistObjectCacheEntry * LookupDistObjectCacheEntry(Oid classid, Oid objid, int32
objsubid); objsubid);
extern int32 GetLocalGroupId(void); extern int32 GetLocalGroupId(void);
extern List * DistTableOidList(void);
extern List * ReferenceTableOidList(void);
extern void CitusTableCacheFlushInvalidatedEntries(void); extern void CitusTableCacheFlushInvalidatedEntries(void);
extern Oid LookupShardRelationFromCatalog(int64 shardId, bool missing_ok); extern Oid LookupShardRelationFromCatalog(int64 shardId, bool missing_ok);
extern List * ShardPlacementList(uint64 shardId); extern List * ShardPlacementList(uint64 shardId);
@ -161,6 +161,7 @@ extern void CitusInvalidateRelcacheByShardId(int64 shardId);
extern void InvalidateForeignKeyGraph(void); extern void InvalidateForeignKeyGraph(void);
extern void FlushDistTableCache(void); extern void FlushDistTableCache(void);
extern void InvalidateMetadataSystemCache(void); extern void InvalidateMetadataSystemCache(void);
extern List * CitusTableTypeIdList(CitusTableType citusTableType);
extern Datum DistNodeMetadata(void); extern Datum DistNodeMetadata(void);
extern bool ClusterHasReferenceTable(void); extern bool ClusterHasReferenceTable(void);
extern bool HasUniformHashDistribution(ShardInterval **shardIntervalArray, extern bool HasUniformHashDistribution(ShardInterval **shardIntervalArray,

View File

@ -1216,7 +1216,6 @@ ON CONFLICT(c1, c2, c3, c4, c5, c6)
DO UPDATE SET DO UPDATE SET
cardinality = enriched.cardinality + excluded.cardinality, cardinality = enriched.cardinality + excluded.cardinality,
sum = enriched.sum + excluded.sum; sum = enriched.sum + excluded.sum;
DEBUG: rehashing catalog cache id 14 for pg_opclass; 17 tups, 8 buckets at character 224
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: Router planner cannot handle multi-shard select queries DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: performing repartitioned INSERT ... SELECT DEBUG: performing repartitioned INSERT ... SELECT