From b45a5fffc3d93ae9231db2382a2aa2b4a59a9579 Mon Sep 17 00:00:00 2001 From: EmelSimsek Date: Mon, 25 Sep 2023 16:14:46 +0300 Subject: [PATCH] Add keepalive messages --- .../shardsplit/shardsplit_decoder.c | 41 +++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/src/backend/distributed/shardsplit/shardsplit_decoder.c b/src/backend/distributed/shardsplit/shardsplit_decoder.c index 1386a21b0..3c1fd5f1d 100644 --- a/src/backend/distributed/shardsplit/shardsplit_decoder.c +++ b/src/backend/distributed/shardsplit/shardsplit_decoder.c @@ -90,6 +90,40 @@ replication_origin_filter_cb(LogicalDecodingContext *ctx, RepOriginId origin_id) } +/* + * update_replication_progress is copied from Postgres 15. We use it to send keepalive + * messages when we are filtering out the wal changes resulting from the initial copy. + * If we do not send out messages long enough, wal receiver will time out. + */ +static void +update_replication_progress(LogicalDecodingContext *ctx, bool skipped_xact) +{ + static int changes_count = 0; + + /* + * We don't want to try sending a keepalive message after processing each + * change as that can have overhead. Tests revealed that there is no + * noticeable overhead in doing it after continuously processing 100 or so + * changes. + */ +#define CHANGES_THRESHOLD 100 + + /* + * After continuously processing CHANGES_THRESHOLD changes, we + * try to send a keepalive message if required. + */ + if (ctx->end_xact || ++changes_count >= CHANGES_THRESHOLD) + { +#if (PG_VERSION_NUM >= PG_VERSION_15) + OutputPluginUpdateProgress(ctx, skipped_xact); +#else + OutputPluginUpdateProgress(ctx); +#endif + changes_count = 0; + } +} + + /* * shard_split_change_cb function emits the incoming tuple change * to the appropriate destination shard. @@ -108,6 +142,13 @@ shard_split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, return; } + /* + * Send replication keepalive. 
Keepalive is needed to prevent logical replication + * worker from timing out when decoder spends a long time filtering out + * change records without communicating with the wal receiver. + */ + update_replication_progress(ctx, false); + /* check if the relation is publishable.*/ if (!is_publishable_relation(relation)) {