From bd245b5fbb3e0daa83def3ea55fffc05c27a3732 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Thu, 9 Sep 2021 10:59:05 +0200 Subject: [PATCH] Avoid switch to superuser in worker_merge_files_into_table --- .../worker/worker_merge_protocol.c | 35 ++++++++++++------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/src/backend/distributed/worker/worker_merge_protocol.c b/src/backend/distributed/worker/worker_merge_protocol.c index b32204aa6..4a794085d 100644 --- a/src/backend/distributed/worker/worker_merge_protocol.c +++ b/src/backend/distributed/worker/worker_merge_protocol.c @@ -37,6 +37,7 @@ #include "executor/spi.h" #include "nodes/makefuncs.h" +#include "parser/parse_relation.h" #include "parser/parse_type.h" #include "storage/lmgr.h" #include "utils/acl.h" @@ -183,8 +184,6 @@ worker_merge_files_into_table(PG_FUNCTION_ARGS) StringInfo jobSchemaName = JobSchemaName(jobId); StringInfo taskTableName = TaskTableName(taskId); StringInfo taskDirectoryName = TaskDirectoryName(jobId, taskId); - Oid savedUserId = InvalidOid; - int savedSecurityContext = 0; Oid userId = GetUserId(); /* we should have the same number of column names and types */ @@ -233,14 +232,9 @@ worker_merge_files_into_table(PG_FUNCTION_ARGS) CreateTaskTable(jobSchemaName, taskTableName, columnNameList, columnTypeList); - /* need superuser to copy from files */ - GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); - SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); - CopyTaskFilesFromDirectory(jobSchemaName, taskTableName, taskDirectoryName, userId); - SetUserIdAndSecContext(savedUserId, savedSecurityContext); PG_RETURN_VOID(); } @@ -569,8 +563,8 @@ CopyTaskFilesFromDirectory(StringInfo schemaName, StringInfo relationName, appendStringInfo(fullFilename, "%s/%s", directoryName, baseFilename); /* build relation object and copy statement */ - RangeVar *relation = makeRangeVar(schemaName->data, relationName->data, -1); - CopyStmt *copyStatement = CopyStatement(relation, fullFilename->data); + RangeVar *rangeVar = makeRangeVar(schemaName->data, relationName->data, -1); + CopyStmt *copyStatement = CopyStatement(rangeVar, fullFilename->data); if (BinaryWorkerCopyFormat) { DefElem *copyOption = makeDefElem("format", (Node *) makeString("binary"), @@ -579,12 +573,27 @@ CopyTaskFilesFromDirectory(StringInfo schemaName, StringInfo relationName, } { - ParseState *pstate = make_parsestate(NULL); - pstate->p_sourcetext = queryString; + ParseState *parseState = make_parsestate(NULL); + parseState->p_sourcetext = queryString; - DoCopy(pstate, copyStatement, -1, -1, &copiedRowCount); + Relation relation = table_openrv(rangeVar, RowExclusiveLock); +#if PG_VERSION_NUM >= PG_VERSION_12 + (void) addRangeTableEntryForRelation(parseState, relation, RowExclusiveLock, + NULL, false, false); +#endif - free_parsestate(pstate); + CopyState copyState = BeginCopyFrom(parseState, + relation, + copyStatement->filename, + copyStatement->is_program, + NULL, + copyStatement->attlist, + copyStatement->options); + copiedRowCount = CopyFrom(copyState); + EndCopyFrom(copyState); + + free_parsestate(parseState); + table_close(relation, NoLock); } copiedRowTotal += copiedRowCount;