diff --git a/src/backend/distributed/transaction/transaction_recovery.c b/src/backend/distributed/transaction/transaction_recovery.c index a4ad3e094..1f8ba949e 100644 --- a/src/backend/distributed/transaction/transaction_recovery.c +++ b/src/backend/distributed/transaction/transaction_recovery.c @@ -88,30 +88,33 @@ recover_prepared_transactions(PG_FUNCTION_ARGS) void LogTransactionRecord(int32 groupId, char *transactionName, FullTransactionId outerXid) { - Datum values[Natts_pg_dist_transaction]; - bool isNulls[Natts_pg_dist_transaction]; - - /* form new transaction tuple */ - memset(values, 0, sizeof(values)); - memset(isNulls, false, sizeof(isNulls)); - - values[Anum_pg_dist_transaction_groupid - 1] = Int32GetDatum(groupId); - values[Anum_pg_dist_transaction_gid - 1] = CStringGetTextDatum(transactionName); - values[Anum_pg_dist_transaction_outerxid - 1] = FullTransactionIdGetDatum(outerXid); - - /* open transaction relation and insert new tuple */ + /* open transaction relation */ Relation pgDistTransaction = table_open(DistTransactionRelationId(), RowExclusiveLock); TupleDesc tupleDescriptor = RelationGetDescr(pgDistTransaction); + + /* form new transaction tuple */ + Datum *values = (Datum *) palloc0(tupleDescriptor->natts * sizeof(Datum)); + bool *isNulls = (bool *) palloc0(tupleDescriptor->natts * sizeof(bool)); + + values[Anum_pg_dist_transaction_groupid - 1] = Int32GetDatum(groupId); + values[Anum_pg_dist_transaction_gid - 1] = CStringGetTextDatum(transactionName); + values[GetOuterXidAttrIndexInPgDistTransaction(tupleDescriptor)] = + FullTransactionIdGetDatum(outerXid); + HeapTuple heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls); + /* insert new tuple */ CATALOG_INSERT_WITH_SNAPSHOT(pgDistTransaction, heapTuple); CommandCounterIncrement(); /* close relation and invalidate previous cache entry */ table_close(pgDistTransaction, NoLock); + + pfree(values); + pfree(isNulls); } @@ -309,9 +312,10 @@ RecoverWorkerTransactions(WorkerNode *workerNode, MultiConnection *connection) SearchSysCacheExistsAttName(DistTransactionRelationId(), "outer_xid")) { /* Check if the transaction is created by an outer transaction from a non-main database */ - outerXidDatum = heap_getattr(heapTuple, - Anum_pg_dist_transaction_outerxid, - tupleDescriptor, &outerXidIsNull); + outerXidDatum = + heap_getattr(heapTuple, + GetOuterXidAttrIndexInPgDistTransaction(tupleDescriptor) + 1, + tupleDescriptor, &outerXidIsNull); } else { @@ -665,3 +669,23 @@ DeleteWorkerTransactions(WorkerNode *workerNode) systable_endscan(scanDescriptor); table_close(pgDistTransaction, NoLock); } + + +/* + * GetOuterXidAttrIndexInPgDistTransaction returns attrnum for outer_xid attr. + * + * outer_xid attr was added to table pg_dist_transaction using alter operation after + * the version where Citus started supporting downgrades, and it's only column that we've + * introduced to pg_dist_transaction since then. + * + * And in case of a downgrade + upgrade, tupleDesc->natts becomes greater than + * Natts_pg_dist_transaction and when this happens, then we know that attrnum outer_xid is + * not Anum_pg_dist_transaction_outerxid anymore but tupleDesc->natts - 1. + */ +int +GetOuterXidAttrIndexInPgDistTransaction(TupleDesc tupleDesc) +{ + return TupleDescSize(tupleDesc) == Natts_pg_dist_transaction + ? (Anum_pg_dist_transaction_outerxid - 1) + : tupleDesc->natts - 1; +} diff --git a/src/include/distributed/pg_dist_transaction.h b/src/include/distributed/pg_dist_transaction.h index 95658f782..50b8518dd 100644 --- a/src/include/distributed/pg_dist_transaction.h +++ b/src/include/distributed/pg_dist_transaction.h @@ -40,5 +40,7 @@ typedef FormData_pg_dist_transaction *Form_pg_dist_transaction; #define Anum_pg_dist_transaction_gid 2 #define Anum_pg_dist_transaction_outerxid 3 +extern int GetOuterXidAttrIndexInPgDistTransaction(TupleDesc tupleDesc); + #endif /* PG_DIST_TRANSACTION_H */