Avoid switch to superuser in worker_merge_files_into_table

pull/5978/head
Marco Slot 2021-09-09 10:59:05 +02:00
parent 25c71fb3d0
commit bd245b5fbb
1 changed files with 22 additions and 13 deletions

View File

@ -37,6 +37,7 @@
#include "executor/spi.h"
#include "nodes/makefuncs.h"
#include "parser/parse_relation.h"
#include "parser/parse_type.h"
#include "storage/lmgr.h"
#include "utils/acl.h"
@ -183,8 +184,6 @@ worker_merge_files_into_table(PG_FUNCTION_ARGS)
StringInfo jobSchemaName = JobSchemaName(jobId);
StringInfo taskTableName = TaskTableName(taskId);
StringInfo taskDirectoryName = TaskDirectoryName(jobId, taskId);
Oid savedUserId = InvalidOid;
int savedSecurityContext = 0;
Oid userId = GetUserId();
/* we should have the same number of column names and types */
@ -233,14 +232,9 @@ worker_merge_files_into_table(PG_FUNCTION_ARGS)
CreateTaskTable(jobSchemaName, taskTableName, columnNameList, columnTypeList);
/* need superuser to copy from files */
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
CopyTaskFilesFromDirectory(jobSchemaName, taskTableName, taskDirectoryName,
userId);
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
PG_RETURN_VOID();
}
@ -569,8 +563,8 @@ CopyTaskFilesFromDirectory(StringInfo schemaName, StringInfo relationName,
appendStringInfo(fullFilename, "%s/%s", directoryName, baseFilename);
/* build relation object and copy statement */
RangeVar *relation = makeRangeVar(schemaName->data, relationName->data, -1);
CopyStmt *copyStatement = CopyStatement(relation, fullFilename->data);
RangeVar *rangeVar = makeRangeVar(schemaName->data, relationName->data, -1);
CopyStmt *copyStatement = CopyStatement(rangeVar, fullFilename->data);
if (BinaryWorkerCopyFormat)
{
DefElem *copyOption = makeDefElem("format", (Node *) makeString("binary"),
@ -579,12 +573,27 @@ CopyTaskFilesFromDirectory(StringInfo schemaName, StringInfo relationName,
}
{
ParseState *pstate = make_parsestate(NULL);
pstate->p_sourcetext = queryString;
ParseState *parseState = make_parsestate(NULL);
parseState->p_sourcetext = queryString;
DoCopy(pstate, copyStatement, -1, -1, &copiedRowCount);
Relation relation = table_openrv(rangeVar, RowExclusiveLock);
#if PG_VERSION_NUM >= PG_VERSION_12
(void) addRangeTableEntryForRelation(parseState, relation, RowExclusiveLock,
NULL, false, false);
#endif
free_parsestate(pstate);
CopyState copyState = BeginCopyFrom(parseState,
relation,
copyStatement->filename,
copyStatement->is_program,
NULL,
copyStatement->attlist,
copyStatement->options);
copiedRowCount = CopyFrom(copyState);
EndCopyFrom(copyState);
free_parsestate(parseState);
table_close(relation, NoLock);
}
copiedRowTotal += copiedRowCount;