From 42ff472721c352a07ddc06ad045acc387559419e Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Fri, 16 Dec 2016 12:04:28 +0100 Subject: [PATCH 1/2] Set user as pg_merge_job_* schema owner --- .../worker/task_tracker_protocol.c | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/src/backend/distributed/worker/task_tracker_protocol.c b/src/backend/distributed/worker/task_tracker_protocol.c index 0718487f7..aea2cdbd5 100644 --- a/src/backend/distributed/worker/task_tracker_protocol.c +++ b/src/backend/distributed/worker/task_tracker_protocol.c @@ -261,12 +261,8 @@ CreateJobSchema(StringInfo schemaName) Oid savedUserId = InvalidOid; int savedSecurityContext = 0; - - /* build a CREATE SCHEMA statement */ - CreateSchemaStmt *createSchemaStmt = makeNode(CreateSchemaStmt); - createSchemaStmt->schemaname = schemaName->data; - createSchemaStmt->authrole = NULL; - createSchemaStmt->schemaElts = NIL; + CreateSchemaStmt *createSchemaStmt = NULL; + RoleSpec currentUserRole = { 0 }; /* allow schema names that start with pg_ */ oldAllowSystemTableMods = allowSystemTableMods; @@ -276,7 +272,18 @@ CreateJobSchema(StringInfo schemaName) GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); - /* actually create schema, and make it visible */ + /* build a CREATE SCHEMA statement */ + currentUserRole.type = T_RoleSpec; + currentUserRole.roletype = ROLESPEC_CSTRING; + currentUserRole.rolename = GetUserNameFromId(savedUserId, false); + currentUserRole.location = -1; + + createSchemaStmt = makeNode(CreateSchemaStmt); + createSchemaStmt->schemaname = schemaName->data; + createSchemaStmt->authrole = (Node *) ¤tUserRole; + createSchemaStmt->schemaElts = NIL; + + /* actually create schema with the current user as owner */ CreateSchemaCommand(createSchemaStmt, queryString); CommandCounterIncrement(); From dd094bc3727792579ee88b2fcc9ff67665a44edf Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Fri, 16 Dec 2016 12:05:08 +0100 Subject: [PATCH 2/2] Run copy commands in worker_merge_files_into_table as superuser --- src/backend/distributed/worker/worker_merge_protocol.c | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/backend/distributed/worker/worker_merge_protocol.c b/src/backend/distributed/worker/worker_merge_protocol.c index 38d9cbc45..6754f4acd 100644 --- a/src/backend/distributed/worker/worker_merge_protocol.c +++ b/src/backend/distributed/worker/worker_merge_protocol.c @@ -23,6 +23,7 @@ #include "catalog/pg_namespace.h" #include "commands/copy.h" #include "commands/tablecmds.h" +#include "distributed/metadata_cache.h" #include "distributed/worker_protocol.h" #include "executor/spi.h" #include "nodes/makefuncs.h" @@ -74,6 +75,8 @@ worker_merge_files_into_table(PG_FUNCTION_ARGS) bool schemaExists = false; List *columnNameList = NIL; List *columnTypeList = NIL; + Oid savedUserId = InvalidOid; + int savedSecurityContext = 0; /* we should have the same number of column names and types */ int32 columnNameCount = ArrayObjectCount(columnNameObject); @@ -101,8 +104,14 @@ worker_merge_files_into_table(PG_FUNCTION_ARGS) CreateTaskTable(jobSchemaName, taskTableName, columnNameList, columnTypeList); + /* need superuser to copy from files */ + GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); + SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); + CopyTaskFilesFromDirectory(jobSchemaName, taskTableName, taskDirectoryName); + SetUserIdAndSecContext(savedUserId, savedSecurityContext); + PG_RETURN_VOID(); }