From 0be2e2f0bc7abb4312f425aec8be7ad154944faf Mon Sep 17 00:00:00 2001 From: EmelSimsek Date: Wed, 20 Sep 2023 04:55:23 +0300 Subject: [PATCH] Update Progress --- .../shardsplit/shardsplit_decoder.c | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/src/backend/distributed/shardsplit/shardsplit_decoder.c b/src/backend/distributed/shardsplit/shardsplit_decoder.c index 51a56b36e..21927774f 100644 --- a/src/backend/distributed/shardsplit/shardsplit_decoder.c +++ b/src/backend/distributed/shardsplit/shardsplit_decoder.c @@ -65,6 +65,32 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) } +static void +update_replication_progress(LogicalDecodingContext *ctx, bool skipped_xact) +{ + static int changes_count = 0; + + /* + * We don't want to try sending a keepalive message after processing each + * change as that can have overhead. Tests revealed that there is no + * noticeable overhead in doing it after continuously processing 100 or so + * changes. + */ +#define CHANGES_THRESHOLD 100 + + /* + * If we are at the end of transaction LSN, update progress tracking. + * Otherwise, after continuously processing CHANGES_THRESHOLD changes, we + * try to send a keepalive message if required. + */ + if (ctx->end_xact || ++changes_count >= CHANGES_THRESHOLD) + { + OutputPluginUpdateProgress(ctx, skipped_xact); + changes_count = 0; + } +} + + /* * split_change function emits the incoming tuple change * to the appropriate destination shard. @@ -73,6 +99,8 @@ static void split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change) { + update_replication_progress(ctx, false); + if (!is_publishable_relation(relation)) { return;