diff --git a/src/backend/distributed/cdc/cdc_decoder.c b/src/backend/distributed/cdc/cdc_decoder.c index 9dfb8bc12..2beb27772 100644 --- a/src/backend/distributed/cdc/cdc_decoder.c +++ b/src/backend/distributed/cdc/cdc_decoder.c @@ -203,8 +203,7 @@ AddShardIdToHashTable(uint64 shardId, ShardIdHashEntry *entry) { entry->shardId = shardId; entry->distributedTableId = CdcLookupShardRelationFromCatalog(shardId, true); - entry->isReferenceTable = CdcPartitionMethodViaCatalog(entry->distributedTableId) == - 'n'; + entry->isReferenceTable = CdcIsReferenceTableViaCatalog(entry->distributedTableId); return entry->distributedTableId; } @@ -361,12 +360,14 @@ GetTupleForTargetSchemaForCdc(HeapTuple sourceRelationTuple, targetNulls[targetIndex] = true; targetIndex++; } + /* If this source attribute has been dropped, just skip this source attribute.*/ else if (TupleDescAttr(sourceRelDesc, sourceIndex)->attisdropped) { sourceIndex++; continue; } + /* If both source and target attributes are not dropped, add the attribute field to targetValues. */ else if (sourceIndex < sourceRelDesc->natts) { diff --git a/src/backend/distributed/cdc/cdc_decoder_utils.c b/src/backend/distributed/cdc/cdc_decoder_utils.c index 272221a5f..a69f307ba 100644 --- a/src/backend/distributed/cdc/cdc_decoder_utils.c +++ b/src/backend/distributed/cdc/cdc_decoder_utils.c @@ -331,16 +331,16 @@ CdcPgDistPartitionTupleViaCatalog(Oid relationId) /* - * CdcPartitionMethodViaCatalog gets a relationId and returns the partition - * method column from pg_dist_partition via reading from catalog. + * CdcIsReferenceTableViaCatalog gets a relationId and returns true if the relation + * is a reference table and false otherwise. */ char -CdcPartitionMethodViaCatalog(Oid relationId) +CdcIsReferenceTableViaCatalog(Oid relationId) { HeapTuple partitionTuple = CdcPgDistPartitionTupleViaCatalog(relationId); if (!HeapTupleIsValid(partitionTuple)) { - return DISTRIBUTE_BY_INVALID; + return false; } Datum datumArray[Natts_pg_dist_partition]; @@ -351,21 +351,32 @@ CdcPartitionMethodViaCatalog(Oid relationId) TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition); heap_deform_tuple(partitionTuple, tupleDescriptor, datumArray, isNullArray); - if (isNullArray[Anum_pg_dist_partition_partmethod - 1]) + if (isNullArray[Anum_pg_dist_partition_partmethod - 1] || + isNullArray[Anum_pg_dist_partition_repmodel - 1]) { - /* partition method cannot be NULL, still let's make sure */ + /* + * partition method and replication model cannot be NULL, + * still let's make sure + */ heap_freetuple(partitionTuple); table_close(pgDistPartition, NoLock); - return DISTRIBUTE_BY_INVALID; + return false; } Datum partitionMethodDatum = datumArray[Anum_pg_dist_partition_partmethod - 1]; char partitionMethodChar = DatumGetChar(partitionMethodDatum); + Datum replicationModelDatum = datumArray[Anum_pg_dist_partition_repmodel - 1]; + char replicationModelChar = DatumGetChar(replicationModelDatum); + heap_freetuple(partitionTuple); table_close(pgDistPartition, NoLock); - return partitionMethodChar; + /* + * A table is a reference table when its partition method is 'none' + * and replication model is 'two phase commit' + */ + return partitionMethodChar == 'n' && replicationModelChar == 't'; } diff --git a/src/backend/distributed/cdc/cdc_decoder_utils.h b/src/backend/distributed/cdc/cdc_decoder_utils.h index d30500de4..46d1e4ae5 100644 --- a/src/backend/distributed/cdc/cdc_decoder_utils.h +++ b/src/backend/distributed/cdc/cdc_decoder_utils.h @@ -25,7 +25,7 @@ uint64 CdcExtractShardIdFromTableName(const char *tableName, bool missingOk); Oid CdcLookupShardRelationFromCatalog(int64 shardId, bool missingOk); -char CdcPartitionMethodViaCatalog(Oid relationId); +char CdcIsReferenceTableViaCatalog(Oid relationId); bool CdcCitusHasBeenLoaded(void);