From 73e1d81bb3f33fbac23f2cd543e9cf33fe7751c6 Mon Sep 17 00:00:00 2001 From: Brian Cloutier Date: Tue, 10 Apr 2018 22:43:30 +0000 Subject: [PATCH] likely unnecessary hardening of COPY --- src/backend/distributed/commands/multi_copy.c | 94 +++++++++++-------- 1 file changed, 54 insertions(+), 40 deletions(-) diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 66610c45b..8a7b19276 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -323,6 +323,7 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) uint64 processedRowCount = 0; ErrorContextCallback errorCallback; + volatile MemoryContext oldContext; /* allocate column values and nulls arrays */ distributedRelation = heap_open(tableId, RowExclusiveLock); @@ -405,56 +406,69 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) copiedDistributedRelationTuple->relkind = RELKIND_RELATION; } - /* initialize copy state to read from COPY data source */ + PG_TRY(); + { + + /* initialize copy state to read from COPY data source */ #if (PG_VERSION_NUM >= 100000) - copyState = BeginCopyFrom(NULL, - copiedDistributedRelation, - copyStatement->filename, - copyStatement->is_program, - NULL, - copyStatement->attlist, - copyStatement->options); + copyState = BeginCopyFrom(NULL, + copiedDistributedRelation, + copyStatement->filename, + copyStatement->is_program, + NULL, + copyStatement->attlist, + copyStatement->options); #else - copyState = BeginCopyFrom(copiedDistributedRelation, - copyStatement->filename, - copyStatement->is_program, - copyStatement->attlist, - copyStatement->options); + copyState = BeginCopyFrom(copiedDistributedRelation, + copyStatement->filename, + copyStatement->is_program, + copyStatement->attlist, + copyStatement->options); #endif - /* set up callback to identify error line number */ - errorCallback.callback = CopyFromErrorCallback; - errorCallback.arg = (void *) copyState; - errorCallback.previous = error_context_stack; - error_context_stack = &errorCallback; + /* set up callback to identify error line number */ + errorCallback.callback = CopyFromErrorCallback; + errorCallback.arg = (void *) copyState; + errorCallback.previous = error_context_stack; + error_context_stack = &errorCallback; - while (true) + while (true) + { + bool nextRowFound = false; + + ResetPerTupleExprContext(executorState); + + oldContext = MemoryContextSwitchTo(executorTupleContext); + + /* parse a row from the input */ + nextRowFound = NextCopyFrom(copyState, executorExpressionContext, + columnValues, columnNulls, NULL); + + if (!nextRowFound) + { + MemoryContextSwitchTo(oldContext); + break; + } + + CHECK_FOR_INTERRUPTS(); + + MemoryContextSwitchTo(oldContext); + + dest->receiveSlot(tupleTableSlot, dest); + + processedRowCount += 1; + } + } + PG_CATCH(); { - bool nextRowFound = false; - MemoryContext oldContext = NULL; - - ResetPerTupleExprContext(executorState); - - oldContext = MemoryContextSwitchTo(executorTupleContext); - - /* parse a row from the input */ - nextRowFound = NextCopyFrom(copyState, executorExpressionContext, - columnValues, columnNulls, NULL); - - if (!nextRowFound) + if (oldContext != NULL) { MemoryContextSwitchTo(oldContext); - break; } - - CHECK_FOR_INTERRUPTS(); - - MemoryContextSwitchTo(oldContext); - - dest->receiveSlot(tupleTableSlot, dest); - - processedRowCount += 1; + AtEOXact_Files(); + PG_RE_THROW(); } + PG_END_TRY(); EndCopyFrom(copyState);