mirror of https://github.com/citusdata/citus.git
Handle BeginCopyFrom changes
BeginCopyFrom now accepts a ParseState and callback argument, but it's again fine to just NULL out both arguments.pull/1439/head
parent
caaaa2c9f7
commit
300349e646
|
@ -356,11 +356,21 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
|
||||||
dest->rStartup(dest, 0, tupleDescriptor);
|
dest->rStartup(dest, 0, tupleDescriptor);
|
||||||
|
|
||||||
/* initialize copy state to read from COPY data source */
|
/* initialize copy state to read from COPY data source */
|
||||||
|
#if (PG_VERSION_NUM >= 100000)
|
||||||
|
copyState = BeginCopyFrom(NULL,
|
||||||
|
distributedRelation,
|
||||||
|
copyStatement->filename,
|
||||||
|
copyStatement->is_program,
|
||||||
|
NULL,
|
||||||
|
copyStatement->attlist,
|
||||||
|
copyStatement->options);
|
||||||
|
#else
|
||||||
copyState = BeginCopyFrom(distributedRelation,
|
copyState = BeginCopyFrom(distributedRelation,
|
||||||
copyStatement->filename,
|
copyStatement->filename,
|
||||||
copyStatement->is_program,
|
copyStatement->is_program,
|
||||||
copyStatement->attlist,
|
copyStatement->attlist,
|
||||||
copyStatement->options);
|
copyStatement->options);
|
||||||
|
#endif
|
||||||
|
|
||||||
/* set up callback to identify error line number */
|
/* set up callback to identify error line number */
|
||||||
errorCallback.callback = CopyFromErrorCallback;
|
errorCallback.callback = CopyFromErrorCallback;
|
||||||
|
@ -455,11 +465,21 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
|
||||||
(ShardConnections *) palloc0(sizeof(ShardConnections));
|
(ShardConnections *) palloc0(sizeof(ShardConnections));
|
||||||
|
|
||||||
/* initialize copy state to read from COPY data source */
|
/* initialize copy state to read from COPY data source */
|
||||||
|
#if (PG_VERSION_NUM >= 100000)
|
||||||
|
CopyState copyState = BeginCopyFrom(NULL,
|
||||||
|
distributedRelation,
|
||||||
|
copyStatement->filename,
|
||||||
|
copyStatement->is_program,
|
||||||
|
NULL,
|
||||||
|
copyStatement->attlist,
|
||||||
|
copyStatement->options);
|
||||||
|
#else
|
||||||
CopyState copyState = BeginCopyFrom(distributedRelation,
|
CopyState copyState = BeginCopyFrom(distributedRelation,
|
||||||
copyStatement->filename,
|
copyStatement->filename,
|
||||||
copyStatement->is_program,
|
copyStatement->is_program,
|
||||||
copyStatement->attlist,
|
copyStatement->attlist,
|
||||||
copyStatement->options);
|
copyStatement->options);
|
||||||
|
#endif
|
||||||
|
|
||||||
CopyOutState copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData));
|
CopyOutState copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData));
|
||||||
copyOutState->delim = (char *) delimiterCharacter;
|
copyOutState->delim = (char *) delimiterCharacter;
|
||||||
|
|
|
@ -310,8 +310,13 @@ LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob)
|
||||||
jobDirectoryName = MasterJobDirectoryName(workerTask->jobId);
|
jobDirectoryName = MasterJobDirectoryName(workerTask->jobId);
|
||||||
taskFilename = TaskFilename(jobDirectoryName, workerTask->taskId);
|
taskFilename = TaskFilename(jobDirectoryName, workerTask->taskId);
|
||||||
|
|
||||||
|
#if (PG_VERSION_NUM >= 100000)
|
||||||
|
copyState = BeginCopyFrom(NULL, stubRelation, taskFilename->data, false, NULL,
|
||||||
|
NULL, copyOptions);
|
||||||
|
#else
|
||||||
copyState = BeginCopyFrom(stubRelation, taskFilename->data, false, NULL,
|
copyState = BeginCopyFrom(stubRelation, taskFilename->data, false, NULL,
|
||||||
copyOptions);
|
copyOptions);
|
||||||
|
#endif
|
||||||
|
|
||||||
while (true)
|
while (true)
|
||||||
{
|
{
|
||||||
|
|
Loading…
Reference in New Issue