mirror of https://github.com/citusdata/citus.git
likely unnecessary hardening of COPY
parent
e06e9842c0
commit
73e1d81bb3
|
@ -323,6 +323,7 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
|
||||||
uint64 processedRowCount = 0;
|
uint64 processedRowCount = 0;
|
||||||
|
|
||||||
ErrorContextCallback errorCallback;
|
ErrorContextCallback errorCallback;
|
||||||
|
volatile MemoryContext oldContext;
|
||||||
|
|
||||||
/* allocate column values and nulls arrays */
|
/* allocate column values and nulls arrays */
|
||||||
distributedRelation = heap_open(tableId, RowExclusiveLock);
|
distributedRelation = heap_open(tableId, RowExclusiveLock);
|
||||||
|
@ -405,56 +406,69 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
|
||||||
copiedDistributedRelationTuple->relkind = RELKIND_RELATION;
|
copiedDistributedRelationTuple->relkind = RELKIND_RELATION;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* initialize copy state to read from COPY data source */
|
PG_TRY();
|
||||||
|
{
|
||||||
|
|
||||||
|
/* initialize copy state to read from COPY data source */
|
||||||
#if (PG_VERSION_NUM >= 100000)
|
#if (PG_VERSION_NUM >= 100000)
|
||||||
copyState = BeginCopyFrom(NULL,
|
copyState = BeginCopyFrom(NULL,
|
||||||
copiedDistributedRelation,
|
copiedDistributedRelation,
|
||||||
copyStatement->filename,
|
copyStatement->filename,
|
||||||
copyStatement->is_program,
|
copyStatement->is_program,
|
||||||
NULL,
|
NULL,
|
||||||
copyStatement->attlist,
|
copyStatement->attlist,
|
||||||
copyStatement->options);
|
copyStatement->options);
|
||||||
#else
|
#else
|
||||||
copyState = BeginCopyFrom(copiedDistributedRelation,
|
copyState = BeginCopyFrom(copiedDistributedRelation,
|
||||||
copyStatement->filename,
|
copyStatement->filename,
|
||||||
copyStatement->is_program,
|
copyStatement->is_program,
|
||||||
copyStatement->attlist,
|
copyStatement->attlist,
|
||||||
copyStatement->options);
|
copyStatement->options);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
/* set up callback to identify error line number */
|
/* set up callback to identify error line number */
|
||||||
errorCallback.callback = CopyFromErrorCallback;
|
errorCallback.callback = CopyFromErrorCallback;
|
||||||
errorCallback.arg = (void *) copyState;
|
errorCallback.arg = (void *) copyState;
|
||||||
errorCallback.previous = error_context_stack;
|
errorCallback.previous = error_context_stack;
|
||||||
error_context_stack = &errorCallback;
|
error_context_stack = &errorCallback;
|
||||||
|
|
||||||
while (true)
|
while (true)
|
||||||
|
{
|
||||||
|
bool nextRowFound = false;
|
||||||
|
|
||||||
|
ResetPerTupleExprContext(executorState);
|
||||||
|
|
||||||
|
oldContext = MemoryContextSwitchTo(executorTupleContext);
|
||||||
|
|
||||||
|
/* parse a row from the input */
|
||||||
|
nextRowFound = NextCopyFrom(copyState, executorExpressionContext,
|
||||||
|
columnValues, columnNulls, NULL);
|
||||||
|
|
||||||
|
if (!nextRowFound)
|
||||||
|
{
|
||||||
|
MemoryContextSwitchTo(oldContext);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
CHECK_FOR_INTERRUPTS();
|
||||||
|
|
||||||
|
MemoryContextSwitchTo(oldContext);
|
||||||
|
|
||||||
|
dest->receiveSlot(tupleTableSlot, dest);
|
||||||
|
|
||||||
|
processedRowCount += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
PG_CATCH();
|
||||||
{
|
{
|
||||||
bool nextRowFound = false;
|
if (oldContext != NULL)
|
||||||
MemoryContext oldContext = NULL;
|
|
||||||
|
|
||||||
ResetPerTupleExprContext(executorState);
|
|
||||||
|
|
||||||
oldContext = MemoryContextSwitchTo(executorTupleContext);
|
|
||||||
|
|
||||||
/* parse a row from the input */
|
|
||||||
nextRowFound = NextCopyFrom(copyState, executorExpressionContext,
|
|
||||||
columnValues, columnNulls, NULL);
|
|
||||||
|
|
||||||
if (!nextRowFound)
|
|
||||||
{
|
{
|
||||||
MemoryContextSwitchTo(oldContext);
|
MemoryContextSwitchTo(oldContext);
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
AtEOXact_Files();
|
||||||
CHECK_FOR_INTERRUPTS();
|
PG_RE_THROW();
|
||||||
|
|
||||||
MemoryContextSwitchTo(oldContext);
|
|
||||||
|
|
||||||
dest->receiveSlot(tupleTableSlot, dest);
|
|
||||||
|
|
||||||
processedRowCount += 1;
|
|
||||||
}
|
}
|
||||||
|
PG_END_TRY();
|
||||||
|
|
||||||
EndCopyFrom(copyState);
|
EndCopyFrom(copyState);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue