Prevent cache usage on citus_drop_trigger codepaths

pull/5372/head
Hanefi Onaldi 2021-11-18 20:24:51 +03:00
parent e6160ad131
commit c0d43d4905
No known key found for this signature in database
GPG Key ID: F18CDB10BA0DFDC7
9 changed files with 137 additions and 14 deletions

View File

@ -74,7 +74,7 @@ master_remove_partition_metadata(PG_FUNCTION_ARGS)
* user-friendly, but this function is really only meant to be called
* from the trigger.
*/
if (!IsCitusTable(relationId) || !EnableDDLPropagation)
if (!IsCitusTableViaCatalog(relationId) || !EnableDDLPropagation)
{
PG_RETURN_VOID();
}
@ -134,14 +134,14 @@ MasterRemoveDistributedTableMetadataFromWorkers(Oid relationId, char *schemaName
* user-friendly, but this function is really only meant to be called
* from the trigger.
*/
if (!IsCitusTable(relationId) || !EnableDDLPropagation)
if (!IsCitusTableViaCatalog(relationId) || !EnableDDLPropagation)
{
return;
}
EnsureCoordinator();
if (!ShouldSyncTableMetadata(relationId))
if (!ShouldSyncTableMetadataViaCatalog(relationId))
{
return;
}

View File

@ -207,7 +207,6 @@ static ScanKeyData DistObjectScanKey[3];
/* local function forward declarations */
static bool IsCitusTableViaCatalog(Oid relationId);
static HeapTuple PgDistPartitionTupleViaCatalog(Oid relationId);
static ShardIdCacheEntry * LookupShardIdCacheEntry(int64 shardId);
static CitusTableCacheEntry * BuildCitusTableCacheEntry(Oid relationId);
@ -484,7 +483,7 @@ IsCitusTable(Oid relationId)
* offset and the corresponding index. If we ever come close to changing
* that, we'll have to work a bit harder.
*/
static bool
bool
IsCitusTableViaCatalog(Oid relationId)
{
HeapTuple partitionTuple = PgDistPartitionTupleViaCatalog(relationId);
@ -538,6 +537,51 @@ PartitionMethodViaCatalog(Oid relationId)
}
/*
 * PartitionColumnViaCatalog returns the partition key column (a Var node) of
 * the given relation by reading pg_dist_partition directly, bypassing the
 * CitusTableCache. It returns NULL when the relation has no pg_dist_partition
 * entry, or when the stored partkey is NULL (e.g. reference tables).
 */
Var *
PartitionColumnViaCatalog(Oid relationId)
{
	HeapTuple heapTuple = PgDistPartitionTupleViaCatalog(relationId);
	if (!HeapTupleIsValid(heapTuple))
	{
		return NULL;
	}

	Datum datumArray[Natts_pg_dist_partition];
	bool isNullArray[Natts_pg_dist_partition];

	/* open pg_dist_partition to obtain its tuple descriptor */
	Relation pgDistPartition = table_open(DistPartitionRelationId(), AccessShareLock);
	heap_deform_tuple(heapTuple, RelationGetDescr(pgDistPartition),
					  datumArray, isNullArray);

	Var *partitionColumn = NULL;
	if (!isNullArray[Anum_pg_dist_partition_partkey - 1])
	{
		char *partitionKeyString =
			TextDatumGetCString(datumArray[Anum_pg_dist_partition_partkey - 1]);

		/* deserialize the stored node string; it must be a Var */
		Node *partitionNode = stringToNode(partitionKeyString);
		Assert(IsA(partitionNode, Var));
		partitionColumn = (Var *) partitionNode;
	}

	heap_freetuple(heapTuple);

	/* NoLock: retain the AccessShareLock until transaction end */
	table_close(pgDistPartition, NoLock);

	return partitionColumn;
}
/*
* PgDistPartitionTupleViaCatalog is a helper function that searches
* pg_dist_partition for the given relationId. The caller is responsible

View File

@ -88,6 +88,8 @@ static char * TruncateTriggerCreateCommand(Oid relationId);
static char * SchemaOwnerName(Oid objectId);
static bool HasMetadataWorkers(void);
static List * DetachPartitionCommandList(void);
static bool ShouldSyncTableMetadataInternal(bool hashDistributed,
bool citusTableWithNoDistKey);
static bool SyncMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError);
static void DropMetadataSnapshotOnNode(WorkerNode *workerNode);
static char * CreateSequenceDependencyCommand(Oid relationId, Oid sequenceId,
@ -380,15 +382,51 @@ ShouldSyncTableMetadata(Oid relationId)
CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(relationId);
if (IsCitusTableTypeCacheEntry(tableEntry, HASH_DISTRIBUTED) ||
IsCitusTableTypeCacheEntry(tableEntry, CITUS_TABLE_WITH_NO_DIST_KEY))
{
return true;
}
else
bool hashDistributed = IsCitusTableTypeCacheEntry(tableEntry, HASH_DISTRIBUTED);
bool citusTableWithNoDistKey =
IsCitusTableTypeCacheEntry(tableEntry, CITUS_TABLE_WITH_NO_DIST_KEY);
return ShouldSyncTableMetadataInternal(hashDistributed, citusTableWithNoDistKey);
}
/*
 * ShouldSyncTableMetadataViaCatalog checks if the metadata of a distributed
 * table should be propagated to metadata workers, i.e. the table is an MX
 * table or a reference table. Hash-distributed tables are considered MX
 * tables, while tables with "none" distribution are reference tables.
 *
 * Unlike ShouldSyncTableMetadata, this function never touches the
 * CitusTableCache; all lookups go straight to the catalog tables.
 */
bool
ShouldSyncTableMetadataViaCatalog(Oid relationId)
{
	if (!OidIsValid(relationId) || !IsCitusTableViaCatalog(relationId))
	{
		return false;
	}

	char partitionMethod = PartitionMethodViaCatalog(relationId);

	/* share the decision logic with the cache-based variant */
	return ShouldSyncTableMetadataInternal(partitionMethod == DISTRIBUTE_BY_HASH,
										   partitionMethod == DISTRIBUTE_BY_NONE);
}
/*
 * ShouldSyncTableMetadataInternal decides whether metadata for a table should
 * be synced, given whether it is hash distributed or a citus table without a
 * distribution key. Either property alone is sufficient.
 *
 * Both ShouldSyncTableMetadata and ShouldSyncTableMetadataViaCatalog funnel
 * through this helper so the two code paths cannot diverge.
 */
static bool
ShouldSyncTableMetadataInternal(bool hashDistributed, bool citusTableWithNoDistKey)
{
	if (hashDistributed)
	{
		return true;
	}

	return citusTableWithNoDistKey;
}

View File

@ -1143,6 +1143,44 @@ LoadShardIntervalList(Oid relationId)
}
/*
 * LoadUnsortedShardIntervalListViaCatalog builds the list of shard intervals
 * of the given distributed table by reading pg_dist_shard directly, without
 * going through the CitusTableCache. The resulting list is in no particular
 * order; NIL is returned when the relation has no shards.
 */
List *
LoadUnsortedShardIntervalListViaCatalog(Oid relationId)
{
	List *distShardTuples = LookupDistShardTuples(relationId);

	Relation distShardRelation = table_open(DistShardRelationId(), AccessShareLock);
	TupleDesc distShardTupleDesc = RelationGetDescr(distShardRelation);

	/* resolve the shard interval type from the partition metadata */
	char partitionMethod = PartitionMethodViaCatalog(relationId);
	Var *partitionColumn = PartitionColumnViaCatalog(relationId);
	Oid intervalTypeId = InvalidOid;
	int32 intervalTypeMod = -1;
	GetIntervalTypeInfo(partitionMethod, partitionColumn, &intervalTypeId,
						&intervalTypeMod);

	List *shardIntervalList = NIL;
	HeapTuple distShardTuple = NULL;
	foreach_ptr(distShardTuple, distShardTuples)
	{
		ShardInterval *shardInterval =
			TupleToShardInterval(distShardTuple, distShardTupleDesc,
								 intervalTypeId, intervalTypeMod);
		shardIntervalList = lappend(shardIntervalList, shardInterval);
	}

	table_close(distShardRelation, AccessShareLock);

	return shardIntervalList;
}
/*
* LoadShardIntervalWithLongestShardName is a utility function that returns
* the shard interaval with the largest shardId for the given relationId. Note

View File

@ -123,7 +123,7 @@ citus_drop_all_shards(PG_FUNCTION_ARGS)
* The SQL_DROP trigger calls this function even for tables that are
* not distributed. In that case, silently ignore and return -1.
*/
if (!IsCitusTable(relationId) || !EnableDDLPropagation)
if (!IsCitusTableViaCatalog(relationId) || !EnableDDLPropagation)
{
PG_RETURN_INT32(-1);
}
@ -139,7 +139,7 @@ citus_drop_all_shards(PG_FUNCTION_ARGS)
*/
LockRelationOid(relationId, AccessExclusiveLock);
List *shardIntervalList = LoadShardIntervalList(relationId);
List *shardIntervalList = LoadUnsortedShardIntervalListViaCatalog(relationId);
int droppedShardCount = DropShards(relationId, schemaName, relationName,
shardIntervalList, dropShardsMetadataOnly);

View File

@ -144,9 +144,11 @@ extern bool IsCitusTableTypeCacheEntry(CitusTableCacheEntry *tableEtnry,
CitusTableType tableType);
extern bool IsCitusTable(Oid relationId);
extern bool IsCitusTableViaCatalog(Oid relationId);
extern char PgDistPartitionViaCatalog(Oid relationId);
extern List * LookupDistShardTuples(Oid relationId);
extern char PartitionMethodViaCatalog(Oid relationId);
extern Var * PartitionColumnViaCatalog(Oid relationId);
extern bool IsCitusLocalTableByDistParams(char partitionMethod, char replicationModel);
extern List * CitusTableList(void);
extern ShardInterval * LoadShardInterval(uint64 shardId);

View File

@ -31,6 +31,7 @@ typedef enum
extern void StartMetadataSyncToNode(const char *nodeNameString, int32 nodePort);
extern bool ClusterHasKnownMetadataWorkers(void);
extern bool ShouldSyncTableMetadata(Oid relationId);
extern bool ShouldSyncTableMetadataViaCatalog(Oid relationId);
extern List * MetadataCreateCommands(void);
extern List * MetadataDropCommands(void);
extern char * DistributionCreateCommand(CitusTableCacheEntry *cacheEntry);

View File

@ -202,6 +202,7 @@ extern Datum citus_relation_size(PG_FUNCTION_ARGS);
/* Function declarations to read shard and shard placement data */
extern uint32 TableShardReplicationFactor(Oid relationId);
extern List * LoadShardIntervalList(Oid relationId);
extern List * LoadUnsortedShardIntervalListViaCatalog(Oid relationId);
extern ShardInterval * LoadShardIntervalWithLongestShardName(Oid relationId);
extern int ShardIntervalCount(Oid relationId);
extern List * LoadShardList(Oid relationId);

View File

@ -14,4 +14,3 @@ SELECT create_distributed_table('range_dist_table_2', 'dist_col', 'range');
\set VERBOSITY TERSE
DROP SCHEMA issue_5099 CASCADE;
NOTICE: drop cascades to 2 other objects
ERROR: cache lookup failed for type 17048