diff --git a/src/backend/distributed/shardsplit/shardsplit_decoder.c b/src/backend/distributed/shardsplit/shardsplit_decoder.c index 1386a21b0..7145b4dfa 100644 --- a/src/backend/distributed/shardsplit/shardsplit_decoder.c +++ b/src/backend/distributed/shardsplit/shardsplit_decoder.c @@ -90,6 +90,46 @@ replication_origin_filter_cb(LogicalDecodingContext *ctx, RepOriginId origin_id) } +/* + * update_replication_progress is copied from Postgres 15. We use it to send keepalive + * messages when we are filtering out the wal changes resulting from the initial copy. + * If we do not send out messages long enough, wal reciever will time out. + * Postgres 16 has refactored this code such that keepalive messages are sent during + * reordering phase which is above change_cb. So we do not need to send keepalive in + * change_cb. + */ +#if (PG_VERSION_NUM < PG_VERSION_16) +static void +update_replication_progress(LogicalDecodingContext *ctx, bool skipped_xact) +{ + static int changes_count = 0; + + /* + * We don't want to try sending a keepalive message after processing each + * change as that can have overhead. Tests revealed that there is no + * noticeable overhead in doing it after continuously processing 100 or so + * changes. + */ +#define CHANGES_THRESHOLD 100 + + /* + * After continuously processing CHANGES_THRESHOLD changes, we + * try to send a keepalive message if required. + */ + if (ctx->end_xact || ++changes_count >= CHANGES_THRESHOLD) + { +#if (PG_VERSION_NUM >= PG_VERSION_15) + OutputPluginUpdateProgress(ctx, skipped_xact); +#else + OutputPluginUpdateProgress(ctx); +#endif + changes_count = 0; + } +} + + +#endif + /* * shard_split_change_cb function emits the incoming tuple change * to the appropriate destination shard. @@ -108,6 +148,12 @@ shard_split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, return; } +#if (PG_VERSION_NUM < PG_VERSION_16) + + /* Send replication keepalive. */ + update_replication_progress(ctx, false); +#endif + /* check if the relation is publishable.*/ if (!is_publishable_relation(relation)) {