mirror of https://github.com/citusdata/citus.git
Update Progress
parent
c71faad606
commit
0be2e2f0bc
|
@ -65,6 +65,32 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void
|
||||||
|
update_replication_progress(LogicalDecodingContext *ctx, bool skipped_xact)
|
||||||
|
{
|
||||||
|
static int changes_count = 0;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We don't want to try sending a keepalive message after processing each
|
||||||
|
* change as that can have overhead. Tests revealed that there is no
|
||||||
|
* noticeable overhead in doing it after continuously processing 100 or so
|
||||||
|
* changes.
|
||||||
|
*/
|
||||||
|
#define CHANGES_THRESHOLD 100
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If we are at the end of transaction LSN, update progress tracking.
|
||||||
|
* Otherwise, after continuously processing CHANGES_THRESHOLD changes, we
|
||||||
|
* try to send a keepalive message if required.
|
||||||
|
*/
|
||||||
|
if (ctx->end_xact || ++changes_count >= CHANGES_THRESHOLD)
|
||||||
|
{
|
||||||
|
OutputPluginUpdateProgress(ctx, skipped_xact);
|
||||||
|
changes_count = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* split_change function emits the incoming tuple change
|
* split_change function emits the incoming tuple change
|
||||||
* to the appropriate destination shard.
|
* to the appropriate destination shard.
|
||||||
|
@ -73,6 +99,8 @@ static void
|
||||||
split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
split_change_cb(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
||||||
Relation relation, ReorderBufferChange *change)
|
Relation relation, ReorderBufferChange *change)
|
||||||
{
|
{
|
||||||
|
update_replication_progress(ctx, false);
|
||||||
|
|
||||||
if (!is_publishable_relation(relation))
|
if (!is_publishable_relation(relation))
|
||||||
{
|
{
|
||||||
return;
|
return;
|
||||||
|
|
Loading…
Reference in New Issue