From d7daea153391279f7b86bf500116e2eb1bce45f7 Mon Sep 17 00:00:00 2001 From: Halil Ozan Akgul Date: Sat, 14 Aug 2021 12:22:40 +0300 Subject: [PATCH] Removes support for old protocols in Copy functions from PG14 Some Copy related functions copied from Postgres had support for both old and new protocols Postgres removed support for old version so we remove it too Relevant PG commit: 3174d69fb96a66173224e60ec7053b988d5ed4d9 --- src/backend/distributed/commands/multi_copy.c | 15 +++++++++++++-- src/include/distributed/commands/multi_copy.h | 4 ++++ src/include/distributed/version_compat.h | 2 ++ 3 files changed, 19 insertions(+), 2 deletions(-) diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index a144ff193..4437450e4 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -67,6 +67,7 @@ #include "catalog/pg_type.h" #include "commands/copy.h" #include "commands/defrem.h" +#include "commands/progress.h" #include "distributed/citus_safe_lib.h" #include "distributed/commands/multi_copy.h" #include "distributed/commands/utility_hook.h" @@ -1806,9 +1807,11 @@ CreateEmptyShard(char *relationName) static void SendCopyBegin(CopyOutState cstate) { +#if PG_VERSION_NUM < PG_VERSION_14 if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3) { /* new way */ +#endif StringInfoData buf; int natts = list_length(cstate->attnumlist); int16 format = (cstate->binary ? 1 : 0); @@ -1820,7 +1823,8 @@ SendCopyBegin(CopyOutState cstate) for (i = 0; i < natts; i++) pq_sendint16(&buf, format); /* per-column formats */ pq_endmessage(&buf); - cstate->copy_dest = COPY_NEW_FE; + cstate->copy_dest = COPY_FRONTEND_COMPAT; +#if PG_VERSION_NUM < PG_VERSION_14 } else { @@ -1834,6 +1838,7 @@ SendCopyBegin(CopyOutState cstate) pq_startcopyout(); cstate->copy_dest = COPY_OLD_FE; } +#endif } @@ -1841,12 +1846,15 @@ SendCopyBegin(CopyOutState cstate) static void SendCopyEnd(CopyOutState cstate) { +#if PG_VERSION_NUM < PG_VERSION_14 if (cstate->copy_dest == COPY_NEW_FE) { +#endif /* Shouldn't have any unsent data */ Assert(cstate->fe_msgbuf->len == 0); /* Send Copy Done message */ pq_putemptymessage('c'); +#if PG_VERSION_NUM < PG_VERSION_14 } else { @@ -1855,6 +1863,7 @@ SendCopyEnd(CopyOutState cstate) CopySendEndOfRow(cstate, true); pq_endcopyout(false); } +#endif } @@ -1908,6 +1917,7 @@ CopySendEndOfRow(CopyOutState cstate, bool includeEndOfLine) switch (cstate->copy_dest) { +#if PG_VERSION_NUM < PG_VERSION_14 case COPY_OLD_FE: /* The FE/BE protocol uses \n as newline for all platforms */ if (!cstate->binary && includeEndOfLine) @@ -1921,7 +1931,8 @@ CopySendEndOfRow(CopyOutState cstate, bool includeEndOfLine) errmsg("connection lost during COPY to stdout"))); } break; - case COPY_NEW_FE: +#endif + case COPY_FRONTEND_COMPAT: /* The FE/BE protocol uses \n as newline for all platforms */ if (!cstate->binary && includeEndOfLine) CopySendChar(cstate, '\n'); diff --git a/src/include/distributed/commands/multi_copy.h b/src/include/distributed/commands/multi_copy.h index a5f414208..4d1988347 100644 --- a/src/include/distributed/commands/multi_copy.h +++ b/src/include/distributed/commands/multi_copy.h @@ -31,8 +31,12 @@ typedef enum CitusCopyDest { COPY_FILE, /* to/from file (or a piped program) */ +#if PG_VERSION_NUM >= PG_VERSION_14 + COPY_FRONTEND, /* to frontend */ +#else COPY_OLD_FE, /* to/from frontend (2.0 protocol) */ COPY_NEW_FE, /* to/from frontend (3.0 protocol) */ +#endif COPY_CALLBACK /* to/from callback function */ } CitusCopyDest; diff --git a/src/include/distributed/version_compat.h b/src/include/distributed/version_compat.h index 1f97ff901..e5df7a264 100644 --- a/src/include/distributed/version_compat.h +++ b/src/include/distributed/version_compat.h @@ -54,6 +54,7 @@ standard_ProcessUtility(a, b, c, d, e, f, g, h) #define ProcessUtility_compat(a, b, c, d, e, f, g, h) \ ProcessUtility(a, b, c, d, e, f, g, h) +#define COPY_FRONTEND_COMPAT COPY_FRONTEND #else #define AlterTableStmtObjType(a) ((a)->relkind) #define F_NEXTVAL_COMPAT F_NEXTVAL_OID @@ -80,6 +81,7 @@ #define standard_ProcessUtility_compat(a, b, c, d, e, f, g, h) \ standard_ProcessUtility(a, b, d, e, f, g, h) #define ProcessUtility_compat(a, b, c, d, e, f, g, h) ProcessUtility(a, b, d, e, f, g, h) +#define COPY_FRONTEND_COMPAT COPY_NEW_FE #endif #if PG_VERSION_NUM >= PG_VERSION_13