mirror of https://github.com/citusdata/citus.git
Removes support for old protocols in Copy functions from PG14
Some Copy related functions copied from Postgres had support for both old and new protocols Postgres removed support for old version so we remove it too Relevant PG commit: 3174d69fb96a66173224e60ec7053b988d5ed4d9pull/5209/head
parent
82858ca8fe
commit
1d5053b652
|
@ -67,6 +67,7 @@
|
|||
#include "catalog/pg_type.h"
|
||||
#include "commands/copy.h"
|
||||
#include "commands/defrem.h"
|
||||
#include "commands/progress.h"
|
||||
#include "distributed/citus_safe_lib.h"
|
||||
#include "distributed/commands/multi_copy.h"
|
||||
#include "distributed/commands/utility_hook.h"
|
||||
|
@ -1806,24 +1807,8 @@ CreateEmptyShard(char *relationName)
|
|||
static void
|
||||
SendCopyBegin(CopyOutState cstate)
|
||||
{
|
||||
if (PG_PROTOCOL_MAJOR(FrontendProtocol) >= 3)
|
||||
{
|
||||
/* new way */
|
||||
StringInfoData buf;
|
||||
int natts = list_length(cstate->attnumlist);
|
||||
int16 format = (cstate->binary ? 1 : 0);
|
||||
int i;
|
||||
|
||||
pq_beginmessage(&buf, 'H');
|
||||
pq_sendbyte(&buf, format); /* overall format */
|
||||
pq_sendint16(&buf, natts);
|
||||
for (i = 0; i < natts; i++)
|
||||
pq_sendint16(&buf, format); /* per-column formats */
|
||||
pq_endmessage(&buf);
|
||||
cstate->copy_dest = COPY_NEW_FE;
|
||||
}
|
||||
else
|
||||
{
|
||||
#if PG_VERSION_NUM < PG_VERSION_14
|
||||
if (PG_PROTOCOL_MAJOR(FrontendProtocol) < 3) {
|
||||
/* old way */
|
||||
if (cstate->binary)
|
||||
ereport(ERROR,
|
||||
|
@ -1833,7 +1818,21 @@ SendCopyBegin(CopyOutState cstate)
|
|||
/* grottiness needed for old COPY OUT protocol */
|
||||
pq_startcopyout();
|
||||
cstate->copy_dest = COPY_OLD_FE;
|
||||
return;
|
||||
}
|
||||
#endif
|
||||
StringInfoData buf;
|
||||
int natts = list_length(cstate->attnumlist);
|
||||
int16 format = (cstate->binary ? 1 : 0);
|
||||
int i;
|
||||
|
||||
pq_beginmessage(&buf, 'H');
|
||||
pq_sendbyte(&buf, format); /* overall format */
|
||||
pq_sendint16(&buf, natts);
|
||||
for (i = 0; i < natts; i++)
|
||||
pq_sendint16(&buf, format); /* per-column formats */
|
||||
pq_endmessage(&buf);
|
||||
cstate->copy_dest = COPY_FRONTEND;
|
||||
}
|
||||
|
||||
|
||||
|
@ -1841,20 +1840,20 @@ SendCopyBegin(CopyOutState cstate)
|
|||
static void
|
||||
SendCopyEnd(CopyOutState cstate)
|
||||
{
|
||||
if (cstate->copy_dest == COPY_NEW_FE)
|
||||
{
|
||||
/* Shouldn't have any unsent data */
|
||||
Assert(cstate->fe_msgbuf->len == 0);
|
||||
/* Send Copy Done message */
|
||||
pq_putemptymessage('c');
|
||||
}
|
||||
else
|
||||
#if PG_VERSION_NUM < PG_VERSION_14
|
||||
if (cstate->copy_dest != COPY_NEW_FE)
|
||||
{
|
||||
CopySendData(cstate, "\\.", 2);
|
||||
/* Need to flush out the trailer (this also appends a newline) */
|
||||
CopySendEndOfRow(cstate, true);
|
||||
pq_endcopyout(false);
|
||||
return;
|
||||
}
|
||||
#endif
|
||||
/* Shouldn't have any unsent data */
|
||||
Assert(cstate->fe_msgbuf->len == 0);
|
||||
/* Send Copy Done message */
|
||||
pq_putemptymessage('c');
|
||||
}
|
||||
|
||||
|
||||
|
@ -1908,6 +1907,7 @@ CopySendEndOfRow(CopyOutState cstate, bool includeEndOfLine)
|
|||
|
||||
switch (cstate->copy_dest)
|
||||
{
|
||||
#if PG_VERSION_NUM < PG_VERSION_14
|
||||
case COPY_OLD_FE:
|
||||
/* The FE/BE protocol uses \n as newline for all platforms */
|
||||
if (!cstate->binary && includeEndOfLine)
|
||||
|
@ -1921,7 +1921,8 @@ CopySendEndOfRow(CopyOutState cstate, bool includeEndOfLine)
|
|||
errmsg("connection lost during COPY to stdout")));
|
||||
}
|
||||
break;
|
||||
case COPY_NEW_FE:
|
||||
#endif
|
||||
case COPY_FRONTEND:
|
||||
/* The FE/BE protocol uses \n as newline for all platforms */
|
||||
if (!cstate->binary && includeEndOfLine)
|
||||
CopySendChar(cstate, '\n');
|
||||
|
|
|
@ -31,8 +31,12 @@
|
|||
typedef enum CitusCopyDest
|
||||
{
|
||||
COPY_FILE, /* to/from file (or a piped program) */
|
||||
#if PG_VERSION_NUM >= PG_VERSION_14
|
||||
COPY_FRONTEND, /* to frontend */
|
||||
#else
|
||||
COPY_OLD_FE, /* to/from frontend (2.0 protocol) */
|
||||
COPY_NEW_FE, /* to/from frontend (3.0 protocol) */
|
||||
#endif
|
||||
COPY_CALLBACK /* to/from callback function */
|
||||
} CitusCopyDest;
|
||||
|
||||
|
|
|
@ -54,6 +54,7 @@
|
|||
standard_ProcessUtility(a, b, c, d, e, f, g, h)
|
||||
#define ProcessUtility_compat(a, b, c, d, e, f, g, h) \
|
||||
ProcessUtility(a, b, c, d, e, f, g, h)
|
||||
#define COPY_FRONTEND_COMPAT COPY_FRONTEND
|
||||
#else
|
||||
#define AlterTableStmtObjType(a) ((a)->relkind)
|
||||
#define F_NEXTVAL_COMPAT F_NEXTVAL_OID
|
||||
|
@ -80,6 +81,7 @@
|
|||
#define standard_ProcessUtility_compat(a, b, c, d, e, f, g, h) \
|
||||
standard_ProcessUtility(a, b, d, e, f, g, h)
|
||||
#define ProcessUtility_compat(a, b, c, d, e, f, g, h) ProcessUtility(a, b, d, e, f, g, h)
|
||||
#define COPY_FRONTEND_COMPAT COPY_NEW_FE
|
||||
#endif
|
||||
|
||||
#if PG_VERSION_NUM >= PG_VERSION_13
|
||||
|
|
Loading…
Reference in New Issue