From 14e71b1b0a91ebfee8bde68fbc620d4fecc868be Mon Sep 17 00:00:00 2001 From: Metin Doslu Date: Tue, 22 Mar 2016 14:17:40 -0700 Subject: [PATCH] Create an executor state, and use its memory context --- src/backend/distributed/commands/multi_copy.c | 39 ++++++++----------- 1 file changed, 16 insertions(+), 23 deletions(-) diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 6ce4cd926..6a2db3e49 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -149,6 +149,9 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag) RangeVar *relation = copyStatement->relation; Oid tableId = RangeVarGetRelid(relation, NoLock, false); char *relationName = get_rel_name(tableId); + EState *executorState = NULL; + MemoryContext executorTupleContext = NULL; + ExprContext *executorExpressionContext = NULL; List *shardIntervalList = NULL; ListCell *shardIntervalCell = NULL; char partitionMethod = '\0'; @@ -157,7 +160,6 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag) HASH_SEQ_STATUS status; ShardConnections *shardConnections = NULL; List *connectionList = NIL; - MemoryContext tupleContext = NULL; CopyState copyState = NULL; TupleDesc tupleDescriptor = NULL; uint32 columnCount = 0; @@ -174,7 +176,6 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag) ErrorContextCallback errorCallback; CopyOutState copyOutState = NULL; FmgrInfo *columnOutputFunctions = NULL; - ExprContext *expressionContext = NULL; /* disallow COPY to/from file or program except for superusers */ if (copyStatement->filename != NULL && !superuser()) @@ -283,26 +284,14 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag) errorCallback.previous = error_context_stack; error_context_stack = &errorCallback; - /* - * We create a new memory context called tuple context, and read and write - * each row's values within this memory context. After each read and write, - * we reset the memory context. That way, we immediately release memory - * allocated for each row, and don't bloat memory usage with large input - * files. - */ - tupleContext = AllocSetContextCreate(CurrentMemoryContext, - "COPY Row Memory Context", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); - - expressionContext = CreateStandaloneExprContext(); - expressionContext->ecxt_per_tuple_memory = tupleContext; + executorState = CreateExecutorState(); + executorTupleContext = GetPerTupleMemoryContext(executorState); + executorExpressionContext = GetPerTupleExprContext(executorState); copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData)); copyOutState->binary = true; copyOutState->fe_msgbuf = makeStringInfo(); - copyOutState->rowcontext = tupleContext; + copyOutState->rowcontext = executorTupleContext; columnOutputFunctions = ColumnOutputFunctions(tupleDescriptor, copyOutState->binary); @@ -318,17 +307,18 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag) bool found = false; MemoryContext oldContext = NULL; - oldContext = MemoryContextSwitchTo(tupleContext); + ResetPerTupleExprContext(executorState); + + oldContext = MemoryContextSwitchTo(executorTupleContext); /* parse a row from the input */ - nextRowFound = NextCopyFrom(copyState, expressionContext, + nextRowFound = NextCopyFrom(copyState, executorExpressionContext, columnValues,columnNulls, NULL); MemoryContextSwitchTo(oldContext); if (!nextRowFound) { - MemoryContextReset(tupleContext); break; } @@ -381,8 +371,6 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag) SendCopyDataToPlacements(copyOutState->fe_msgbuf, shardConnections); processedRowCount += 1; - - MemoryContextReset(tupleContext); } /* send binary footers to all shards */ @@ -406,6 +394,11 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag) PrepareTransactions(connectionList); } + pfree(columnValues); + pfree(columnNulls); + + FreeExecutorState(executorState); + CHECK_FOR_INTERRUPTS(); } PG_CATCH();