mirror of https://github.com/citusdata/citus.git
Send keepalive messages in split decoder periodically to avoid wal receiver timeouts during large shard splits. (#7229)
DESCRIPTION: Send keepalive messages during the logical replication phase of large shard splits to avoid timeouts.

During the logical replication part of the shard split process, the split decoder filters out the WAL records produced by the initial copy. If the number of WAL records is large, the split decoder can process for a long time before sending any WAL records through pgoutput. The WAL receiver may then time out and restart repeatedly, causing the catch-up logic in our split driver code to fail.

Notes:
1. If `wal_receiver_timeout` is set to a very small value, e.g. 600ms, the receiver may still time out before the keepalives arrive. My tests show that this code works best when `wal_receiver_timeout` is set to 1 minute, which is the default value.
2. Once a logical replication worker times out, a new one gets launched. The new worker resets the `pg_stat_subscription` columns to their initial values; e.g. `latest_end_lsn` is set to 0. Our driver logic in `WaitForGroupedLogicalRepTargetsToCatchUp` cannot handle the LSN value going backwards, which is the main reason it gets stuck in an infinite loop (see the sketch below).
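To illustrate note 2, here is a minimal, self-contained C sketch of the kind of catch-up wait loop described above. It is not the actual `WaitForGroupedLogicalRepTargetsToCatchUp` implementation; `GetReportedLsn`, the simulated restart, and the poll limit are hypothetical. It only shows how a reported LSN that resets to 0 after a worker restart keeps the wait condition from ever being satisfied.

#include <stdint.h>
#include <stdio.h>

typedef uint64_t XLogRecPtr;    /* LSN type, as in PostgreSQL */

/*
 * Hypothetical stand-in for reading pg_stat_subscription.latest_end_lsn.
 * After a simulated worker restart (third call onwards), the reported
 * LSN drops back to 0, like a freshly launched apply worker would report.
 */
static XLogRecPtr
GetReportedLsn(void)
{
	static int calls = 0;
	calls++;
	return (calls < 3) ? (XLogRecPtr) 1000 : (XLogRecPtr) 0;
}

int
main(void)
{
	XLogRecPtr targetLsn = 2000;    /* LSN the split driver waits for */
	int polls = 0;

	/*
	 * Poll until the subscription reports an LSN past the target. Once the
	 * reported LSN goes backwards to 0, this condition can never become
	 * true, so the loop would spin forever; the poll limit below exists
	 * only for this demo.
	 */
	while (GetReportedLsn() < targetLsn)
	{
		if (++polls >= 10)
		{
			printf("reported LSN went backwards; still waiting after %d polls\n", polls);
			break;
		}
	}

	return 0;
}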
parent 76fdfa3c0f
commit e9035f6d32
@@ -90,6 +90,46 @@ replication_origin_filter_cb(LogicalDecodingContext *ctx, RepOriginId origin_id)
 }
 
 
+/*
+ * update_replication_progress is copied from Postgres 15. We use it to send keepalive
+ * messages when we are filtering out the wal changes resulting from the initial copy.
+ * If we do not send out messages long enough, wal reciever will time out.
+ * Postgres 16 has refactored this code such that keepalive messages are sent during
+ * reordering phase which is above change_cb. So we do not need to send keepalive in
+ * change_cb.
+ */
+#if (PG_VERSION_NUM < PG_VERSION_16)
+static void
+update_replication_progress(LogicalDecodingContext *ctx, bool skipped_xact)
+{
+	static int changes_count = 0;
+
+	/*
+	 * We don't want to try sending a keepalive message after processing each
+	 * change as that can have overhead. Tests revealed that there is no
+	 * noticeable overhead in doing it after continuously processing 100 or so
+	 * changes.
+	 */
+#define CHANGES_THRESHOLD 100
+
+	/*
+	 * After continuously processing CHANGES_THRESHOLD changes, we
+	 * try to send a keepalive message if required.
+	 */
+	if (ctx->end_xact || ++changes_count >= CHANGES_THRESHOLD)
+	{
+#if (PG_VERSION_NUM >= PG_VERSION_15)
+		OutputPluginUpdateProgress(ctx, skipped_xact);
+#else
+		OutputPluginUpdateProgress(ctx);
+#endif
+		changes_count = 0;
+	}
+}
+
+
+#endif
+
 /*
  * shard_split_change_cb function emits the incoming tuple change
  * to the appropriate destination shard.
@@ -108,6 +148,12 @@ shard_split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 		return;
 	}
 
+#if (PG_VERSION_NUM < PG_VERSION_16)
+
+	/* Send replication keepalive. */
+	update_replication_progress(ctx, false);
+#endif
+
 	/* check if the relation is publishable.*/
 	if (!is_publishable_relation(relation))
 	{