From c431bb2e45c284aac87d2252a86d833b748018c2 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Tue, 31 Aug 2021 11:02:42 +0200 Subject: [PATCH] Add support for "COPY dist/ref tables FROM" progress report Simply call Postgres' function to report the progress on each row recieved. Note that we currently do not support "COPY dist/ref TO .." progress report nicely. Citus has some specialized logic to support "COPY dist/ref TO .." such that it either converts the underlying command into "COPY (SELECT * FROM dist/ref ) ..." or sends COPY command to shards directly. In the former case, "tuples_processed" is only updated when the executor returns all the tuples, so the progress is not accurate. In the latter case, Citus can actually implement the progress report. But, for the sake of consistency, we prefer to not implement at all. Added to PG 14 with https://git.postgresql.org/gitweb/?p=postgresql.git;a=commitdiff;h=8a4f618e7ae3cb11b0b37d0f06f05c8ff905833f --- src/backend/distributed/commands/multi_copy.c | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index ed616780c..6e32de61e 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -50,6 +50,7 @@ #include "postgres.h" #include "libpq-fe.h" #include "miscadmin.h" +#include "pgstat.h" #include /* for htons */ #include /* for htons */ @@ -560,7 +561,11 @@ CopyToExistingShards(CopyStmt *copyStatement, QueryCompletionCompat *completionT dest->receiveSlot(tupleTableSlot, dest); - processedRowCount += 1; + ++processedRowCount; + +#if PG_VERSION_NUM >= PG_VERSION_14 + pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED, processedRowCount); +#endif } EndCopyFrom(copyState); @@ -741,6 +746,10 @@ CopyToNewShards(CopyStmt *copyStatement, QueryCompletionCompat *completionTag, O } processedRowCount += 1; + +#if PG_VERSION_NUM >= PG_VERSION_14 + pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED, processedRowCount); +#endif } /* @@ -2984,6 +2993,13 @@ ProcessCopyStmt(CopyStmt *copyStatement, QueryCompletionCompat *completionTag, c { if (copyStatement->whereClause) { + /* + * Update progress reporting for tuples progressed so that the + * progress is reflected on pg_stat_progress_copy. Citus currently + * does not support COPY .. WHERE clause so TUPLES_EXCLUDED is not + * handled. When we remove this check, we should implement progress + * reporting as well. + */ ereport(ERROR, (errmsg( "Citus does not support COPY FROM with WHERE"))); } @@ -3148,6 +3164,7 @@ CitusCopyTo(CopyStmt *copyStatement, QueryCompletionCompat *completionTag) PQclear(result); tuplesSent += ForwardCopyDataFromConnection(copyOutState, connection); + break; }