Fix Reference Table Check for CDC (#7025)

Previously reference table check only looked at `partition method =
'n'`. This PR adds `replication model = 't'` to that.
flaky-multi_sequence_default
Halil Ozan Akgül 2023-06-23 16:37:35 +03:00 committed by GitHub
parent 387b5f80f9
commit 03a4769c3a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 23 additions and 11 deletions

View File

@ -203,8 +203,7 @@ AddShardIdToHashTable(uint64 shardId, ShardIdHashEntry *entry)
{ {
entry->shardId = shardId; entry->shardId = shardId;
entry->distributedTableId = CdcLookupShardRelationFromCatalog(shardId, true); entry->distributedTableId = CdcLookupShardRelationFromCatalog(shardId, true);
entry->isReferenceTable = CdcPartitionMethodViaCatalog(entry->distributedTableId) == entry->isReferenceTable = CdcIsReferenceTableViaCatalog(entry->distributedTableId);
'n';
return entry->distributedTableId; return entry->distributedTableId;
} }
@ -361,12 +360,14 @@ GetTupleForTargetSchemaForCdc(HeapTuple sourceRelationTuple,
targetNulls[targetIndex] = true; targetNulls[targetIndex] = true;
targetIndex++; targetIndex++;
} }
/* If this source attribute has been dropped, just skip this source attribute.*/ /* If this source attribute has been dropped, just skip this source attribute.*/
else if (TupleDescAttr(sourceRelDesc, sourceIndex)->attisdropped) else if (TupleDescAttr(sourceRelDesc, sourceIndex)->attisdropped)
{ {
sourceIndex++; sourceIndex++;
continue; continue;
} }
/* If both source and target attributes are not dropped, add the attribute field to targetValues. */ /* If both source and target attributes are not dropped, add the attribute field to targetValues. */
else if (sourceIndex < sourceRelDesc->natts) else if (sourceIndex < sourceRelDesc->natts)
{ {

View File

@ -331,16 +331,16 @@ CdcPgDistPartitionTupleViaCatalog(Oid relationId)
/* /*
* CdcPartitionMethodViaCatalog gets a relationId and returns the partition * CdcIsReferenceTableViaCatalog gets a relationId and returns true if the relation
* method column from pg_dist_partition via reading from catalog. * is a reference table and false otherwise.
*/ */
char char
CdcPartitionMethodViaCatalog(Oid relationId) CdcIsReferenceTableViaCatalog(Oid relationId)
{ {
HeapTuple partitionTuple = CdcPgDistPartitionTupleViaCatalog(relationId); HeapTuple partitionTuple = CdcPgDistPartitionTupleViaCatalog(relationId);
if (!HeapTupleIsValid(partitionTuple)) if (!HeapTupleIsValid(partitionTuple))
{ {
return DISTRIBUTE_BY_INVALID; return false;
} }
Datum datumArray[Natts_pg_dist_partition]; Datum datumArray[Natts_pg_dist_partition];
@ -351,21 +351,32 @@ CdcPartitionMethodViaCatalog(Oid relationId)
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
heap_deform_tuple(partitionTuple, tupleDescriptor, datumArray, isNullArray); heap_deform_tuple(partitionTuple, tupleDescriptor, datumArray, isNullArray);
if (isNullArray[Anum_pg_dist_partition_partmethod - 1]) if (isNullArray[Anum_pg_dist_partition_partmethod - 1] ||
isNullArray[Anum_pg_dist_partition_repmodel - 1])
{ {
/* partition method cannot be NULL, still let's make sure */ /*
* partition method and replication model cannot be NULL,
* still let's make sure
*/
heap_freetuple(partitionTuple); heap_freetuple(partitionTuple);
table_close(pgDistPartition, NoLock); table_close(pgDistPartition, NoLock);
return DISTRIBUTE_BY_INVALID; return false;
} }
Datum partitionMethodDatum = datumArray[Anum_pg_dist_partition_partmethod - 1]; Datum partitionMethodDatum = datumArray[Anum_pg_dist_partition_partmethod - 1];
char partitionMethodChar = DatumGetChar(partitionMethodDatum); char partitionMethodChar = DatumGetChar(partitionMethodDatum);
Datum replicationModelDatum = datumArray[Anum_pg_dist_partition_repmodel - 1];
char replicationModelChar = DatumGetChar(replicationModelDatum);
heap_freetuple(partitionTuple); heap_freetuple(partitionTuple);
table_close(pgDistPartition, NoLock); table_close(pgDistPartition, NoLock);
return partitionMethodChar; /*
* A table is a reference table when its partition method is 'none'
* and replication model is 'two phase commit'
*/
return partitionMethodChar == 'n' && replicationModelChar == 't';
} }

View File

@ -25,7 +25,7 @@ uint64 CdcExtractShardIdFromTableName(const char *tableName, bool missingOk);
Oid CdcLookupShardRelationFromCatalog(int64 shardId, bool missingOk); Oid CdcLookupShardRelationFromCatalog(int64 shardId, bool missingOk);
char CdcPartitionMethodViaCatalog(Oid relationId); char CdcIsReferenceTableViaCatalog(Oid relationId);
bool CdcCitusHasBeenLoaded(void); bool CdcCitusHasBeenLoaded(void);