diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index bdfccda23..ed616780c 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -67,6 +67,7 @@ #include "catalog/pg_type.h" #include "commands/copy.h" #include "commands/defrem.h" +#include "commands/progress.h" #include "distributed/citus_safe_lib.h" #include "distributed/commands/multi_copy.h" #include "distributed/commands/utility_hook.h" @@ -1806,24 +1807,8 @@ CreateEmptyShard(char *relationName) static void SendCopyBegin(CopyOutState cstate) { - if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3) - { - /* new way */ - StringInfoData buf; - int natts = list_length(cstate->attnumlist); - int16 format = (cstate->binary ? 1 : 0); - int i; - - pq_beginmessage(&buf, 'H'); - pq_sendbyte(&buf, format); /* overall format */ - pq_sendint16(&buf, natts); - for (i = 0; i < natts; i++) - pq_sendint16(&buf, format); /* per-column formats */ - pq_endmessage(&buf); - cstate->copy_dest = COPY_NEW_FE; - } - else - { +#if PG_VERSION_NUM < PG_VERSION_14 + if (PG_PROTOCOL_MAJOR(FrontendProtocol) < 3) { /* old way */ if (cstate->binary) ereport(ERROR, @@ -1833,7 +1818,21 @@ SendCopyBegin(CopyOutState cstate) /* grottiness needed for old COPY OUT protocol */ pq_startcopyout(); cstate->copy_dest = COPY_OLD_FE; + return; } +#endif + StringInfoData buf; + int natts = list_length(cstate->attnumlist); + int16 format = (cstate->binary ? 1 : 0); + int i; + + pq_beginmessage(&buf, 'H'); + pq_sendbyte(&buf, format); /* overall format */ + pq_sendint16(&buf, natts); + for (i = 0; i < natts; i++) + pq_sendint16(&buf, format); /* per-column formats */ + pq_endmessage(&buf); + cstate->copy_dest = COPY_FRONTEND; } @@ -1841,20 +1840,20 @@ SendCopyBegin(CopyOutState cstate) static void SendCopyEnd(CopyOutState cstate) { - if (cstate->copy_dest == COPY_NEW_FE) - { - /* Shouldn't have any unsent data */ - Assert(cstate->fe_msgbuf->len == 0); - /* Send Copy Done message */ - pq_putemptymessage('c'); - } - else +#if PG_VERSION_NUM < PG_VERSION_14 + if (cstate->copy_dest != COPY_NEW_FE) { CopySendData(cstate, "\\.", 2); /* Need to flush out the trailer (this also appends a newline) */ CopySendEndOfRow(cstate, true); pq_endcopyout(false); + return; } +#endif + /* Shouldn't have any unsent data */ + Assert(cstate->fe_msgbuf->len == 0); + /* Send Copy Done message */ + pq_putemptymessage('c'); } @@ -1908,6 +1907,7 @@ CopySendEndOfRow(CopyOutState cstate, bool includeEndOfLine) switch (cstate->copy_dest) { +#if PG_VERSION_NUM < PG_VERSION_14 case COPY_OLD_FE: /* The FE/BE protocol uses \n as newline for all platforms */ if (!cstate->binary && includeEndOfLine) @@ -1921,7 +1921,8 @@ CopySendEndOfRow(CopyOutState cstate, bool includeEndOfLine) errmsg("connection lost during COPY to stdout"))); } break; - case COPY_NEW_FE: +#endif + case COPY_FRONTEND: /* The FE/BE protocol uses \n as newline for all platforms */ if (!cstate->binary && includeEndOfLine) CopySendChar(cstate, '\n'); diff --git a/src/include/distributed/commands/multi_copy.h b/src/include/distributed/commands/multi_copy.h index a5f414208..4d1988347 100644 --- a/src/include/distributed/commands/multi_copy.h +++ b/src/include/distributed/commands/multi_copy.h @@ -31,8 +31,12 @@ typedef enum CitusCopyDest { COPY_FILE, /* to/from file (or a piped program) */ +#if PG_VERSION_NUM >= PG_VERSION_14 + COPY_FRONTEND, /* to frontend */ +#else COPY_OLD_FE, /* to/from frontend (2.0 protocol) */ COPY_NEW_FE, /* to/from frontend (3.0 protocol) */ +#endif COPY_CALLBACK /* to/from callback function */ } CitusCopyDest; diff --git a/src/include/distributed/version_compat.h b/src/include/distributed/version_compat.h index 1f97ff901..e5df7a264 100644 --- a/src/include/distributed/version_compat.h +++ b/src/include/distributed/version_compat.h @@ -54,6 +54,7 @@ standard_ProcessUtility(a, b, c, d, e, f, g, h) #define ProcessUtility_compat(a, b, c, d, e, f, g, h) \ ProcessUtility(a, b, c, d, e, f, g, h) +#define COPY_FRONTEND_COMPAT COPY_FRONTEND #else #define AlterTableStmtObjType(a) ((a)->relkind) #define F_NEXTVAL_COMPAT F_NEXTVAL_OID @@ -80,6 +81,7 @@ #define standard_ProcessUtility_compat(a, b, c, d, e, f, g, h) \ standard_ProcessUtility(a, b, d, e, f, g, h) #define ProcessUtility_compat(a, b, c, d, e, f, g, h) ProcessUtility(a, b, d, e, f, g, h) +#define COPY_FRONTEND_COMPAT COPY_NEW_FE #endif #if PG_VERSION_NUM >= PG_VERSION_13