mirror of https://github.com/citusdata/citus.git
Removes support for old protocols in Copy functions from PG14
Some Copy related functions copied from Postgres had support for both old and new protocols Postgres removed support for old version so we remove it too Relevant PG commit: 3174d69fb96a66173224e60ec7053b988d5ed4d9talha_pg14_support
parent
7fe3e96850
commit
812dd1be1c
|
@ -67,6 +67,7 @@
|
||||||
#include "catalog/pg_type.h"
|
#include "catalog/pg_type.h"
|
||||||
#include "commands/copy.h"
|
#include "commands/copy.h"
|
||||||
#include "commands/defrem.h"
|
#include "commands/defrem.h"
|
||||||
|
#include "commands/progress.h"
|
||||||
#include "distributed/citus_safe_lib.h"
|
#include "distributed/citus_safe_lib.h"
|
||||||
#include "distributed/commands/multi_copy.h"
|
#include "distributed/commands/multi_copy.h"
|
||||||
#include "distributed/commands/utility_hook.h"
|
#include "distributed/commands/utility_hook.h"
|
||||||
|
@ -1806,9 +1807,11 @@ CreateEmptyShard(char *relationName)
|
||||||
static void
|
static void
|
||||||
SendCopyBegin(CopyOutState cstate)
|
SendCopyBegin(CopyOutState cstate)
|
||||||
{
|
{
|
||||||
|
#if PG_VERSION_NUM < PG_VERSION_14
|
||||||
if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
|
if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
|
||||||
{
|
{
|
||||||
/* new way */
|
/* new way */
|
||||||
|
#endif
|
||||||
StringInfoData buf;
|
StringInfoData buf;
|
||||||
int natts = list_length(cstate->attnumlist);
|
int natts = list_length(cstate->attnumlist);
|
||||||
int16 format = (cstate->binary ? 1 : 0);
|
int16 format = (cstate->binary ? 1 : 0);
|
||||||
|
@ -1820,7 +1823,8 @@ SendCopyBegin(CopyOutState cstate)
|
||||||
for (i = 0; i < natts; i++)
|
for (i = 0; i < natts; i++)
|
||||||
pq_sendint16(&buf, format); /* per-column formats */
|
pq_sendint16(&buf, format); /* per-column formats */
|
||||||
pq_endmessage(&buf);
|
pq_endmessage(&buf);
|
||||||
cstate->copy_dest = COPY_NEW_FE;
|
cstate->copy_dest = COPY_FRONTEND_COMPAT;
|
||||||
|
#if PG_VERSION_NUM < PG_VERSION_14
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -1834,6 +1838,7 @@ SendCopyBegin(CopyOutState cstate)
|
||||||
pq_startcopyout();
|
pq_startcopyout();
|
||||||
cstate->copy_dest = COPY_OLD_FE;
|
cstate->copy_dest = COPY_OLD_FE;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -1841,12 +1846,15 @@ SendCopyBegin(CopyOutState cstate)
|
||||||
static void
|
static void
|
||||||
SendCopyEnd(CopyOutState cstate)
|
SendCopyEnd(CopyOutState cstate)
|
||||||
{
|
{
|
||||||
|
#if PG_VERSION_NUM < PG_VERSION_14
|
||||||
if (cstate->copy_dest == COPY_NEW_FE)
|
if (cstate->copy_dest == COPY_NEW_FE)
|
||||||
{
|
{
|
||||||
|
#endif
|
||||||
/* Shouldn't have any unsent data */
|
/* Shouldn't have any unsent data */
|
||||||
Assert(cstate->fe_msgbuf->len == 0);
|
Assert(cstate->fe_msgbuf->len == 0);
|
||||||
/* Send Copy Done message */
|
/* Send Copy Done message */
|
||||||
pq_putemptymessage('c');
|
pq_putemptymessage('c');
|
||||||
|
#if PG_VERSION_NUM < PG_VERSION_14
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
@ -1855,6 +1863,7 @@ SendCopyEnd(CopyOutState cstate)
|
||||||
CopySendEndOfRow(cstate, true);
|
CopySendEndOfRow(cstate, true);
|
||||||
pq_endcopyout(false);
|
pq_endcopyout(false);
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -1908,6 +1917,7 @@ CopySendEndOfRow(CopyOutState cstate, bool includeEndOfLine)
|
||||||
|
|
||||||
switch (cstate->copy_dest)
|
switch (cstate->copy_dest)
|
||||||
{
|
{
|
||||||
|
#if PG_VERSION_NUM < PG_VERSION_14
|
||||||
case COPY_OLD_FE:
|
case COPY_OLD_FE:
|
||||||
/* The FE/BE protocol uses \n as newline for all platforms */
|
/* The FE/BE protocol uses \n as newline for all platforms */
|
||||||
if (!cstate->binary && includeEndOfLine)
|
if (!cstate->binary && includeEndOfLine)
|
||||||
|
@ -1921,7 +1931,8 @@ CopySendEndOfRow(CopyOutState cstate, bool includeEndOfLine)
|
||||||
errmsg("connection lost during COPY to stdout")));
|
errmsg("connection lost during COPY to stdout")));
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case COPY_NEW_FE:
|
#endif
|
||||||
|
case COPY_FRONTEND_COMPAT:
|
||||||
/* The FE/BE protocol uses \n as newline for all platforms */
|
/* The FE/BE protocol uses \n as newline for all platforms */
|
||||||
if (!cstate->binary && includeEndOfLine)
|
if (!cstate->binary && includeEndOfLine)
|
||||||
CopySendChar(cstate, '\n');
|
CopySendChar(cstate, '\n');
|
||||||
|
|
|
@ -31,8 +31,12 @@
|
||||||
typedef enum CitusCopyDest
|
typedef enum CitusCopyDest
|
||||||
{
|
{
|
||||||
COPY_FILE, /* to/from file (or a piped program) */
|
COPY_FILE, /* to/from file (or a piped program) */
|
||||||
|
#if PG_VERSION_NUM >= PG_VERSION_14
|
||||||
|
COPY_FRONTEND, /* to frontend */
|
||||||
|
#else
|
||||||
COPY_OLD_FE, /* to/from frontend (2.0 protocol) */
|
COPY_OLD_FE, /* to/from frontend (2.0 protocol) */
|
||||||
COPY_NEW_FE, /* to/from frontend (3.0 protocol) */
|
COPY_NEW_FE, /* to/from frontend (3.0 protocol) */
|
||||||
|
#endif
|
||||||
COPY_CALLBACK /* to/from callback function */
|
COPY_CALLBACK /* to/from callback function */
|
||||||
} CitusCopyDest;
|
} CitusCopyDest;
|
||||||
|
|
||||||
|
|
|
@ -54,6 +54,7 @@
|
||||||
standard_ProcessUtility(a, b, c, d, e, f, g, h)
|
standard_ProcessUtility(a, b, c, d, e, f, g, h)
|
||||||
#define ProcessUtility_compat(a, b, c, d, e, f, g, h) \
|
#define ProcessUtility_compat(a, b, c, d, e, f, g, h) \
|
||||||
ProcessUtility(a, b, c, d, e, f, g, h)
|
ProcessUtility(a, b, c, d, e, f, g, h)
|
||||||
|
#define COPY_FRONTEND_COMPAT COPY_FRONTEND
|
||||||
#else
|
#else
|
||||||
#define AlterTableStmtObjType(a) ((a)->relkind)
|
#define AlterTableStmtObjType(a) ((a)->relkind)
|
||||||
#define F_NEXTVAL_COMPAT F_NEXTVAL_OID
|
#define F_NEXTVAL_COMPAT F_NEXTVAL_OID
|
||||||
|
@ -80,6 +81,7 @@
|
||||||
#define standard_ProcessUtility_compat(a, b, c, d, e, f, g, h) \
|
#define standard_ProcessUtility_compat(a, b, c, d, e, f, g, h) \
|
||||||
standard_ProcessUtility(a, b, d, e, f, g, h)
|
standard_ProcessUtility(a, b, d, e, f, g, h)
|
||||||
#define ProcessUtility_compat(a, b, c, d, e, f, g, h) ProcessUtility(a, b, d, e, f, g, h)
|
#define ProcessUtility_compat(a, b, c, d, e, f, g, h) ProcessUtility(a, b, d, e, f, g, h)
|
||||||
|
#define COPY_FRONTEND_COMPAT COPY_NEW_FE
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#if PG_VERSION_NUM >= PG_VERSION_13
|
#if PG_VERSION_NUM >= PG_VERSION_13
|
||||||
|
|
Loading…
Reference in New Issue