Add keepalive messages

pull/7229/head
EmelSimsek 2023-09-25 16:14:46 +03:00 committed by Emel Şimşek
parent 76fdfa3c0f
commit b45a5fffc3
1 changed files with 41 additions and 0 deletions

View File

@ -90,6 +90,40 @@ replication_origin_filter_cb(LogicalDecodingContext *ctx, RepOriginId origin_id)
}
/*
* update_replication_progress is copied from Postgres 15. We use it to send keepalive
* messages when we are filtering out the wal changes resulting from the initial copy.
* If we do not send out messages long enough, wal reciever will time out.
*/
static void
update_replication_progress(LogicalDecodingContext *ctx, bool skipped_xact)
{
static int changes_count = 0;
/*
* We don't want to try sending a keepalive message after processing each
* change as that can have overhead. Tests revealed that there is no
* noticeable overhead in doing it after continuously processing 100 or so
* changes.
*/
#define CHANGES_THRESHOLD 100
/*
* After continuously processing CHANGES_THRESHOLD changes, we
* try to send a keepalive message if required.
*/
if (ctx->end_xact || ++changes_count >= CHANGES_THRESHOLD)
{
#if (PG_VERSION_NUM >= PG_VERSION_15)
OutputPluginUpdateProgress(ctx, skipped_xact);
#else
OutputPluginUpdateProgress(ctx);
#endif
changes_count = 0;
}
}
/*
* shard_split_change_cb function emits the incoming tuple change
* to the appropriate destination shard.
@ -108,6 +142,13 @@ shard_split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
return;
}
/*
* Send replication keepalive. Keepalive is needed to prevent logical replication
* worker from timing out when decoder spends a long time filtering out
* change records without communicating with the wal receiver.
*/
update_replication_progress(ctx, false);
/* check if the relation is publishable.*/
if (!is_publishable_relation(relation))
{