From 20763398431394eeccb573eb11f4d76d5af17895 Mon Sep 17 00:00:00 2001 From: naisila Date: Mon, 8 Jul 2024 22:19:23 +0200 Subject: [PATCH] Remove ReorderBufferTupleBuf structure. Relevant PG commit: 08e6344fd6423210b339e92c069bb979ba4e7cd6 https://github.com/postgres/postgres/commit/08e6344fd6423210b339e92c069bb979ba4e7cd6 --- src/backend/distributed/cdc/cdc_decoder.c | 71 ++++++++++++++ .../shardsplit/shardsplit_decoder.c | 97 +++++++++++++++++++ 2 files changed, 168 insertions(+) diff --git a/src/backend/distributed/cdc/cdc_decoder.c b/src/backend/distributed/cdc/cdc_decoder.c index cf9f4963b..1e71a82a1 100644 --- a/src/backend/distributed/cdc/cdc_decoder.c +++ b/src/backend/distributed/cdc/cdc_decoder.c @@ -22,6 +22,8 @@ #include "utils/rel.h" #include "utils/typcache.h" +#include "pg_version_constants.h" + PG_MODULE_MAGIC; extern void _PG_output_plugin_init(OutputPluginCallbacks *cb); @@ -435,6 +437,74 @@ TranslateChangesIfSchemaChanged(Relation sourceRelation, Relation targetRelation return; } +#if PG_VERSION_NUM >= PG_VERSION_17 + + /* Check the ReorderBufferChange's action type and handle them accordingly.*/ + switch (change->action) + { + case REORDER_BUFFER_CHANGE_INSERT: + { + /* For insert action, only new tuple should always be translated*/ + HeapTuple sourceRelationNewTuple = change->data.tp.newtuple; + HeapTuple targetRelationNewTuple = GetTupleForTargetSchemaForCdc( + sourceRelationNewTuple, sourceRelationDesc, targetRelationDesc); + change->data.tp.newtuple = targetRelationNewTuple; + break; + } + + /* + * For update changes both old and new tuples need to be translated for target relation + * if the REPLICA IDENTITY is set to FULL. Otherwise, only the new tuple needs to be + * translated for target relation. + */ + case REORDER_BUFFER_CHANGE_UPDATE: + { + /* For update action, new tuple should always be translated*/ + /* Get the new tuple from the ReorderBufferChange, and translate it to target relation. */ + HeapTuple sourceRelationNewTuple = change->data.tp.newtuple; + HeapTuple targetRelationNewTuple = GetTupleForTargetSchemaForCdc( + sourceRelationNewTuple, sourceRelationDesc, targetRelationDesc); + change->data.tp.newtuple = targetRelationNewTuple; + + /* + * Format oldtuple according to the target relation. If the column values of replica + * identiy change, then the old tuple is non-null and needs to be formatted according + * to the target relation schema. + */ + if (change->data.tp.oldtuple != NULL) + { + HeapTuple sourceRelationOldTuple = change->data.tp.oldtuple; + HeapTuple targetRelationOldTuple = GetTupleForTargetSchemaForCdc( + sourceRelationOldTuple, + sourceRelationDesc, + targetRelationDesc); + + change->data.tp.oldtuple = targetRelationOldTuple; + } + break; + } + + case REORDER_BUFFER_CHANGE_DELETE: + { + /* For delete action, only old tuple should be translated*/ + HeapTuple sourceRelationOldTuple = change->data.tp.oldtuple; + HeapTuple targetRelationOldTuple = GetTupleForTargetSchemaForCdc( + sourceRelationOldTuple, + sourceRelationDesc, + targetRelationDesc); + + change->data.tp.oldtuple = targetRelationOldTuple; + break; + } + + default: + { + /* Do nothing for other action types. */ + break; + } + } +#else + /* Check the ReorderBufferChange's action type and handle them accordingly.*/ switch (change->action) { @@ -499,4 +569,5 @@ TranslateChangesIfSchemaChanged(Relation sourceRelation, Relation targetRelation break; } } +#endif } diff --git a/src/backend/distributed/shardsplit/shardsplit_decoder.c b/src/backend/distributed/shardsplit/shardsplit_decoder.c index 0c3c76510..20dd01b0c 100644 --- a/src/backend/distributed/shardsplit/shardsplit_decoder.c +++ b/src/backend/distributed/shardsplit/shardsplit_decoder.c @@ -14,6 +14,8 @@ #include "utils/lsyscache.h" #include "utils/typcache.h" +#include "pg_version_constants.h" + #include "distributed/listutils.h" #include "distributed/metadata/distobject.h" #include "distributed/shardinterval_utils.h" @@ -180,6 +182,43 @@ shard_split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } Oid targetRelationOid = InvalidOid; + +#if PG_VERSION_NUM >= PG_VERSION_17 + switch (change->action) + { + case REORDER_BUFFER_CHANGE_INSERT: + { + HeapTuple newTuple = change->data.tp.newtuple; + targetRelationOid = FindTargetRelationOid(relation, newTuple, + replicationSlotName); + break; + } + + /* updating non-partition column value */ + case REORDER_BUFFER_CHANGE_UPDATE: + { + HeapTuple newTuple = change->data.tp.newtuple; + targetRelationOid = FindTargetRelationOid(relation, newTuple, + replicationSlotName); + break; + } + + case REORDER_BUFFER_CHANGE_DELETE: + { + HeapTuple oldTuple = change->data.tp.oldtuple; + targetRelationOid = FindTargetRelationOid(relation, oldTuple, + replicationSlotName); + + break; + } + + /* Only INSERT/DELETE/UPDATE actions are visible in the replication path of split shard */ + default: + ereport(ERROR, errmsg( + "Unexpected Action :%d. Expected action is INSERT/DELETE/UPDATE", + change->action)); + } +#else switch (change->action) { case REORDER_BUFFER_CHANGE_INSERT: @@ -214,6 +253,7 @@ shard_split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, "Unexpected Action :%d. Expected action is INSERT/DELETE/UPDATE", change->action)); } +#endif /* Current replication slot is not responsible for handling the change */ if (targetRelationOid == InvalidOid) @@ -231,6 +271,62 @@ shard_split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, TupleDesc targetRelationDesc = RelationGetDescr(targetRelation); if (sourceRelationDesc->natts > targetRelationDesc->natts) { +#if PG_VERSION_NUM >= PG_VERSION_17 + switch (change->action) + { + case REORDER_BUFFER_CHANGE_INSERT: + { + HeapTuple sourceRelationNewTuple = change->data.tp.newtuple; + HeapTuple targetRelationNewTuple = GetTupleForTargetSchema( + sourceRelationNewTuple, sourceRelationDesc, targetRelationDesc); + + change->data.tp.newtuple = targetRelationNewTuple; + break; + } + + case REORDER_BUFFER_CHANGE_UPDATE: + { + HeapTuple sourceRelationNewTuple = change->data.tp.newtuple; + HeapTuple targetRelationNewTuple = GetTupleForTargetSchema( + sourceRelationNewTuple, sourceRelationDesc, targetRelationDesc); + + change->data.tp.newtuple = targetRelationNewTuple; + + /* + * Format oldtuple according to the target relation. If the column values of replica + * identiy change, then the old tuple is non-null and needs to be formatted according + * to the target relation schema. + */ + if (change->data.tp.oldtuple != NULL) + { + HeapTuple sourceRelationOldTuple = change->data.tp.oldtuple; + HeapTuple targetRelationOldTuple = GetTupleForTargetSchema( + sourceRelationOldTuple, + sourceRelationDesc, + targetRelationDesc); + + change->data.tp.oldtuple = targetRelationOldTuple; + } + break; + } + + case REORDER_BUFFER_CHANGE_DELETE: + { + HeapTuple sourceRelationOldTuple = change->data.tp.oldtuple; + HeapTuple targetRelationOldTuple = GetTupleForTargetSchema( + sourceRelationOldTuple, sourceRelationDesc, targetRelationDesc); + + change->data.tp.oldtuple = targetRelationOldTuple; + break; + } + + /* Only INSERT/DELETE/UPDATE actions are visible in the replication path of split shard */ + default: + ereport(ERROR, errmsg( + "Unexpected Action :%d. Expected action is INSERT/DELETE/UPDATE", + change->action)); + } +#else switch (change->action) { case REORDER_BUFFER_CHANGE_INSERT: @@ -285,6 +381,7 @@ shard_split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, "Unexpected Action :%d. Expected action is INSERT/DELETE/UPDATE", change->action)); } +#endif } pgOutputPluginChangeCB(ctx, txn, targetRelation, change);