Add support for "COPY dist/ref tables FROM" progress report

Simply call Postgres' function to report the progress on
each row recieved.

Note that we currently do not support "COPY dist/ref TO .." progress
report nicely. Citus has some specialized logic to support
"COPY dist/ref TO .." such that it either converts the underlying
command into "COPY (SELECT * FROM dist/ref ) ..." or sends COPY
command to shards directly. In the former case, "tuples_processed"
is only updated when the executor returns all the tuples, so the
progress is not accurate. In the latter case, Citus can actually
implement the progress report. But, for the sake of consistency,
we prefer to not implement at all.

Added to PG 14 with https://git.postgresql.org/gitweb/?p=postgresql.git;a=commitdiff;h=8a4f618e7ae3cb11b0b37d0f06f05c8ff905833f
pull/5209/head
Onder Kalaci 2021-08-31 11:02:42 +02:00 committed by Sait Talha Nisanci
parent 66303785f3
commit c431bb2e45
1 changed files with 18 additions and 1 deletions

View File

@ -50,6 +50,7 @@
#include "postgres.h" #include "postgres.h"
#include "libpq-fe.h" #include "libpq-fe.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "pgstat.h"
#include <arpa/inet.h> /* for htons */ #include <arpa/inet.h> /* for htons */
#include <netinet/in.h> /* for htons */ #include <netinet/in.h> /* for htons */
@ -560,7 +561,11 @@ CopyToExistingShards(CopyStmt *copyStatement, QueryCompletionCompat *completionT
dest->receiveSlot(tupleTableSlot, dest); dest->receiveSlot(tupleTableSlot, dest);
processedRowCount += 1; ++processedRowCount;
#if PG_VERSION_NUM >= PG_VERSION_14
pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED, processedRowCount);
#endif
} }
EndCopyFrom(copyState); EndCopyFrom(copyState);
@ -741,6 +746,10 @@ CopyToNewShards(CopyStmt *copyStatement, QueryCompletionCompat *completionTag, O
} }
processedRowCount += 1; processedRowCount += 1;
#if PG_VERSION_NUM >= PG_VERSION_14
pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED, processedRowCount);
#endif
} }
/* /*
@ -2984,6 +2993,13 @@ ProcessCopyStmt(CopyStmt *copyStatement, QueryCompletionCompat *completionTag, c
{ {
if (copyStatement->whereClause) if (copyStatement->whereClause)
{ {
/*
* Update progress reporting for tuples progressed so that the
* progress is reflected on pg_stat_progress_copy. Citus currently
* does not support COPY .. WHERE clause so TUPLES_EXCLUDED is not
* handled. When we remove this check, we should implement progress
* reporting as well.
*/
ereport(ERROR, (errmsg( ereport(ERROR, (errmsg(
"Citus does not support COPY FROM with WHERE"))); "Citus does not support COPY FROM with WHERE")));
} }
@ -3148,6 +3164,7 @@ CitusCopyTo(CopyStmt *copyStatement, QueryCompletionCompat *completionTag)
PQclear(result); PQclear(result);
tuplesSent += ForwardCopyDataFromConnection(copyOutState, connection); tuplesSent += ForwardCopyDataFromConnection(copyOutState, connection);
break; break;
} }