diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index a144ff193..4437450e4 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -67,6 +67,7 @@ #include "catalog/pg_type.h" #include "commands/copy.h" #include "commands/defrem.h" +#include "commands/progress.h" #include "distributed/citus_safe_lib.h" #include "distributed/commands/multi_copy.h" #include "distributed/commands/utility_hook.h" @@ -1806,9 +1807,11 @@ CreateEmptyShard(char *relationName) static void SendCopyBegin(CopyOutState cstate) { +#if PG_VERSION_NUM < PG_VERSION_14 if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3) { /* new way */ +#endif StringInfoData buf; int natts = list_length(cstate->attnumlist); int16 format = (cstate->binary ? 1 : 0); @@ -1820,7 +1823,8 @@ SendCopyBegin(CopyOutState cstate) for (i = 0; i < natts; i++) pq_sendint16(&buf, format); /* per-column formats */ pq_endmessage(&buf); - cstate->copy_dest = COPY_NEW_FE; + cstate->copy_dest = COPY_FRONTEND_COMPAT; +#if PG_VERSION_NUM < PG_VERSION_14 } else { @@ -1834,6 +1838,7 @@ SendCopyBegin(CopyOutState cstate) pq_startcopyout(); cstate->copy_dest = COPY_OLD_FE; } +#endif } @@ -1841,12 +1846,15 @@ SendCopyBegin(CopyOutState cstate) static void SendCopyEnd(CopyOutState cstate) { +#if PG_VERSION_NUM < PG_VERSION_14 if (cstate->copy_dest == COPY_NEW_FE) { +#endif /* Shouldn't have any unsent data */ Assert(cstate->fe_msgbuf->len == 0); /* Send Copy Done message */ pq_putemptymessage('c'); +#if PG_VERSION_NUM < PG_VERSION_14 } else { @@ -1855,6 +1863,7 @@ SendCopyEnd(CopyOutState cstate) CopySendEndOfRow(cstate, true); pq_endcopyout(false); } +#endif } @@ -1908,6 +1917,7 @@ CopySendEndOfRow(CopyOutState cstate, bool includeEndOfLine) switch (cstate->copy_dest) { +#if PG_VERSION_NUM < PG_VERSION_14 case COPY_OLD_FE: /* The FE/BE protocol uses \n as newline for all platforms */ if (!cstate->binary && includeEndOfLine) @@ -1921,7 +1931,8 @@ CopySendEndOfRow(CopyOutState cstate, bool includeEndOfLine) errmsg("connection lost during COPY to stdout"))); } break; - case COPY_NEW_FE: +#endif + case COPY_FRONTEND_COMPAT: /* The FE/BE protocol uses \n as newline for all platforms */ if (!cstate->binary && includeEndOfLine) CopySendChar(cstate, '\n'); diff --git a/src/include/distributed/commands/multi_copy.h b/src/include/distributed/commands/multi_copy.h index a5f414208..4d1988347 100644 --- a/src/include/distributed/commands/multi_copy.h +++ b/src/include/distributed/commands/multi_copy.h @@ -31,8 +31,12 @@ typedef enum CitusCopyDest { COPY_FILE, /* to/from file (or a piped program) */ +#if PG_VERSION_NUM >= PG_VERSION_14 + COPY_FRONTEND, /* to frontend */ +#else COPY_OLD_FE, /* to/from frontend (2.0 protocol) */ COPY_NEW_FE, /* to/from frontend (3.0 protocol) */ +#endif COPY_CALLBACK /* to/from callback function */ } CitusCopyDest; diff --git a/src/include/distributed/version_compat.h b/src/include/distributed/version_compat.h index 1f97ff901..e5df7a264 100644 --- a/src/include/distributed/version_compat.h +++ b/src/include/distributed/version_compat.h @@ -54,6 +54,7 @@ standard_ProcessUtility(a, b, c, d, e, f, g, h) #define ProcessUtility_compat(a, b, c, d, e, f, g, h) \ ProcessUtility(a, b, c, d, e, f, g, h) +#define COPY_FRONTEND_COMPAT COPY_FRONTEND #else #define AlterTableStmtObjType(a) ((a)->relkind) #define F_NEXTVAL_COMPAT F_NEXTVAL_OID @@ -80,6 +81,7 @@ #define standard_ProcessUtility_compat(a, b, c, d, e, f, g, h) \ standard_ProcessUtility(a, b, d, e, f, g, h) #define ProcessUtility_compat(a, b, c, d, e, f, g, h) ProcessUtility(a, b, d, e, f, g, h) +#define COPY_FRONTEND_COMPAT COPY_NEW_FE #endif #if PG_VERSION_NUM >= PG_VERSION_13