Run some commands as superuser to allow normal users to execute queries.

Some small parts of citus currently require superuser privileges; which
is obviously not desirable for production scenarios. Run these small
parts under superuser privileges (we use the extension owner) to avoid
that.

This does not yet coordinate grants between master and workers. Thus it
allows to create shards, load data, and run queries as a non-superuser,
but it is not easily possible to allow differentiated accesses to
several users.
pull/471/head
Andres Freund 2016-02-25 18:40:11 -08:00
parent 25615ee9d7
commit a5b3dcddb3
10 changed files with 300 additions and 34 deletions

View File

@ -22,3 +22,6 @@ CREATE FUNCTION pg_catalog.master_stage_shard_placement_row(shardid int8,
AS 'MODULE_PATHNAME', $$master_stage_shard_placement_row$$; AS 'MODULE_PATHNAME', $$master_stage_shard_placement_row$$;
COMMENT ON FUNCTION pg_catalog.master_stage_shard_placement_row(int8, int4, int8, text, int4) COMMENT ON FUNCTION pg_catalog.master_stage_shard_placement_row(int8, int4, int8, text, int4)
IS 'deprecated function to insert a row into pg_dist_shard_placement'; IS 'deprecated function to insert a row into pg_dist_shard_placement';
ALTER FUNCTION pg_catalog.citus_drop_trigger() SECURITY DEFINER;

View File

@ -327,6 +327,7 @@ CREATE OR REPLACE FUNCTION citus_drop_trigger()
RETURNS event_trigger RETURNS event_trigger
LANGUAGE plpgsql LANGUAGE plpgsql
SET search_path = pg_catalog SET search_path = pg_catalog
/* declared as SECURITY DEFINER in upgrade script */
AS $cdbdt$ AS $cdbdt$
DECLARE v_obj record; DECLARE v_obj record;
BEGIN BEGIN

View File

@ -49,6 +49,8 @@ multi_ExecutorStart(QueryDesc *queryDesc, int eflags)
MultiExecutorType executorType = MULTI_EXECUTOR_INVALID_FIRST; MultiExecutorType executorType = MULTI_EXECUTOR_INVALID_FIRST;
Job *workerJob = multiPlan->workerJob; Job *workerJob = multiPlan->workerJob;
ExecCheckRTPerms(planStatement->rtable, true);
executorType = JobExecutorType(multiPlan); executorType = JobExecutorType(multiPlan);
if (executorType == MULTI_EXECUTOR_ROUTER) if (executorType == MULTI_EXECUTOR_ROUTER)
{ {

View File

@ -10,6 +10,7 @@
#include "miscadmin.h" #include "miscadmin.h"
#include "access/htup_details.h" #include "access/htup_details.h"
#include "access/sysattr.h"
#include "catalog/catalog.h" #include "catalog/catalog.h"
#include "catalog/index.h" #include "catalog/index.h"
#include "catalog/namespace.h" #include "catalog/namespace.h"
@ -23,6 +24,7 @@
#include "distributed/transmit.h" #include "distributed/transmit.h"
#include "distributed/worker_manager.h" #include "distributed/worker_manager.h"
#include "distributed/worker_protocol.h" #include "distributed/worker_protocol.h"
#include "executor/executor.h"
#include "parser/parser.h" #include "parser/parser.h"
#include "parser/parse_utilcmd.h" #include "parser/parse_utilcmd.h"
#include "storage/lmgr.h" #include "storage/lmgr.h"
@ -51,7 +53,8 @@ static bool IsTransmitStmt(Node *parsetree);
static void VerifyTransmitStmt(CopyStmt *copyStatement); static void VerifyTransmitStmt(CopyStmt *copyStatement);
/* Local functions forward declarations for processing distributed table commands */ /* Local functions forward declarations for processing distributed table commands */
static Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag); static Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag,
bool *commandMustRunAsOwner);
static Node * ProcessIndexStmt(IndexStmt *createIndexStatement, static Node * ProcessIndexStmt(IndexStmt *createIndexStatement,
const char *createIndexCommand); const char *createIndexCommand);
static Node * ProcessDropIndexStmt(DropStmt *dropIndexStatement, static Node * ProcessDropIndexStmt(DropStmt *dropIndexStatement,
@ -73,6 +76,8 @@ static bool ExecuteCommandOnWorkerShards(Oid relationId, const char *commandStri
static bool AllFinalizedPlacementsAccessible(Oid relationId); static bool AllFinalizedPlacementsAccessible(Oid relationId);
static void RangeVarCallbackForDropIndex(const RangeVar *rel, Oid relOid, Oid oldRelOid, static void RangeVarCallbackForDropIndex(const RangeVar *rel, Oid relOid, Oid oldRelOid,
void *arg); void *arg);
static void CheckCopyPermissions(CopyStmt *copyStatement);
static List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist);
/* /*
@ -95,6 +100,10 @@ multi_ProcessUtility(Node *parsetree,
DestReceiver *dest, DestReceiver *dest,
char *completionTag) char *completionTag)
{ {
bool commandMustRunAsOwner = false;
Oid savedUserId = InvalidOid;
int savedSecurityContext = 0;
/* /*
* TRANSMIT used to be separate command, but to avoid patching the grammar * TRANSMIT used to be separate command, but to avoid patching the grammar
* it's no overlaid onto COPY, but with FORMAT = 'transmit' instead of the * it's no overlaid onto COPY, but with FORMAT = 'transmit' instead of the
@ -122,7 +131,8 @@ multi_ProcessUtility(Node *parsetree,
if (IsA(parsetree, CopyStmt)) if (IsA(parsetree, CopyStmt))
{ {
parsetree = ProcessCopyStmt((CopyStmt *) parsetree, completionTag); parsetree = ProcessCopyStmt((CopyStmt *) parsetree, completionTag,
&commandMustRunAsOwner);
if (parsetree == NULL) if (parsetree == NULL)
{ {
@ -191,15 +201,26 @@ multi_ProcessUtility(Node *parsetree,
} }
else if (IsA(parsetree, CreateRoleStmt) && CitusHasBeenLoaded()) else if (IsA(parsetree, CreateRoleStmt) && CitusHasBeenLoaded())
{ {
ereport(NOTICE, (errmsg("Citus does not support CREATE ROLE/USER " ereport(NOTICE, (errmsg("not propagating CREATE ROLE/USER commands to worker"
"for distributed databases"), " nodes"),
errdetail("Multiple roles are currently supported " errhint("Connect to worker nodes directly to manually create all"
"only for local tables"))); " necessary users and roles.")));
}
if (commandMustRunAsOwner)
{
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
} }
/* now drop into standard process utility */ /* now drop into standard process utility */
standard_ProcessUtility(parsetree, queryString, context, standard_ProcessUtility(parsetree, queryString, context,
params, dest, completionTag); params, dest, completionTag);
if (commandMustRunAsOwner)
{
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
}
} }
@ -270,27 +291,17 @@ VerifyTransmitStmt(CopyStmt *copyStatement)
* COPYing from distributed tables and preventing unsupported actions. The * COPYing from distributed tables and preventing unsupported actions. The
* function returns a modified COPY statement to be executed, or NULL if no * function returns a modified COPY statement to be executed, or NULL if no
* further processing is needed. * further processing is needed.
*
* commandMustRunAsOwner is an output parameter used to communicate to the caller whether
* the copy statement should be executed using elevated privileges. If
* ProcessCopyStmt that is required, a call to CheckCopyPermissions will take
* care of verifying the current user's permissions before ProcessCopyStmt
* returns.
*/ */
static Node * static Node *
ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag) ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustRunAsOwner)
{ {
/* *commandMustRunAsOwner = false; /* make sure variable is initialized */
* We first check if we have a "COPY (query) TO filename". If we do, copy doesn't
* accept relative file paths. However, SQL tasks that get assigned to worker nodes
* have relative paths. We therefore convert relative paths to absolute ones here.
*/
if (copyStatement->relation == NULL &&
!copyStatement->is_from &&
!copyStatement->is_program &&
copyStatement->filename != NULL)
{
const char *filename = copyStatement->filename;
if (!is_absolute_path(filename) && JobDirectoryElement(filename))
{
copyStatement->filename = make_absolute_path(filename);
}
}
/* /*
* We check whether a distributed relation is affected. For that, we need to open the * We check whether a distributed relation is affected. For that, we need to open the
@ -301,8 +312,10 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag)
{ {
Relation copiedRelation = NULL; Relation copiedRelation = NULL;
bool isDistributedRelation = false; bool isDistributedRelation = false;
bool isFrom = copyStatement->is_from;
copiedRelation = heap_openrv(copyStatement->relation, AccessShareLock); copiedRelation = heap_openrv(copyStatement->relation,
isFrom ? RowExclusiveLock : AccessShareLock);
isDistributedRelation = IsDistributedTable(RelationGetRelid(copiedRelation)); isDistributedRelation = IsDistributedTable(RelationGetRelid(copiedRelation));
@ -349,6 +362,50 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag)
} }
} }
if (copyStatement->filename != NULL && !copyStatement->is_program)
{
const char *filename = copyStatement->filename;
if (CacheDirectoryElement(filename))
{
/*
* Only superusers are allowed to copy from a file, so we have to
* become superuser to execute copies to/from files used by citus'
* query execution.
*
* XXX: This is a decidedly suboptimal solution, as that means
* that triggers, input functions, etc. run with elevated
* privileges. But this is better than not being able to run
* queries as normal user.
*/
*commandMustRunAsOwner = true;
/*
* Have to manually check permissions here as the COPY is will be
* run as a superuser.
*/
if (copyStatement->relation != NULL)
{
CheckCopyPermissions(copyStatement);
}
/*
* Check if we have a "COPY (query) TO filename". If we do, copy
* doesn't accept relative file paths. However, SQL tasks that get
* assigned to worker nodes have relative paths. We therefore
* convert relative paths to absolute ones here.
*/
if (copyStatement->relation == NULL &&
!copyStatement->is_from &&
!is_absolute_path(filename))
{
copyStatement->filename = make_absolute_path(filename);
}
}
}
return (Node *) copyStatement; return (Node *) copyStatement;
} }
@ -1088,3 +1145,149 @@ RangeVarCallbackForDropIndex(const RangeVar *rel, Oid relOid, Oid oldRelOid, voi
} }
/* *INDENT-ON* */ /* *INDENT-ON* */
} }
/*
* Check whether the current user has the permission to execute a COPY
* statement, raise ERROR if not. In some cases we have to do this separately
* from postgres' copy.c, because we have to execute the copy with elevated
* privileges.
*
* Copied from postgres, where it's part of DoCopy().
*/
static void
CheckCopyPermissions(CopyStmt *copyStatement)
{
/* *INDENT-OFF* */
bool is_from = copyStatement->is_from;
Relation rel;
Oid relid;
List *range_table = NIL;
TupleDesc tupDesc;
AclMode required_access = (is_from ? ACL_INSERT : ACL_SELECT);
List *attnums;
ListCell *cur;
RangeTblEntry *rte;
rel = heap_openrv(copyStatement->relation,
is_from ? RowExclusiveLock : AccessShareLock);
relid = RelationGetRelid(rel);
rte = makeNode(RangeTblEntry);
rte->rtekind = RTE_RELATION;
rte->relid = relid;
rte->relkind = rel->rd_rel->relkind;
rte->requiredPerms = required_access;
range_table = list_make1(rte);
tupDesc = RelationGetDescr(rel);
attnums = CopyGetAttnums(tupDesc, rel, copyStatement->attlist);
foreach(cur, attnums)
{
int attno = lfirst_int(cur) - FirstLowInvalidHeapAttributeNumber;
if (is_from)
{
#if (PG_VERSION_NUM >= 90500)
rte->insertedCols = bms_add_member(rte->insertedCols, attno);
#else
rte->modifiedCols = bms_add_member(rte->modifiedCols, attno);
#endif
}
else
{
rte->selectedCols = bms_add_member(rte->selectedCols, attno);
}
}
ExecCheckRTPerms(range_table, true);
/* TODO: Perform RLS checks once supported */
heap_close(rel, NoLock);
/* *INDENT-ON* */
}
/* Helper for CheckCopyPermissions(), copied from postgres */
static List *
CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
{
/* *INDENT-OFF* */
List *attnums = NIL;
if (attnamelist == NIL)
{
/* Generate default column list */
Form_pg_attribute *attr = tupDesc->attrs;
int attr_count = tupDesc->natts;
int i;
for (i = 0; i < attr_count; i++)
{
if (attr[i]->attisdropped)
{
continue;
}
attnums = lappend_int(attnums, i + 1);
}
}
else
{
/* Validate the user-supplied list and extract attnums */
ListCell *l;
foreach(l, attnamelist)
{
char *name = strVal(lfirst(l));
int attnum;
int i;
/* Lookup column name */
attnum = InvalidAttrNumber;
for (i = 0; i < tupDesc->natts; i++)
{
if (tupDesc->attrs[i]->attisdropped)
{
continue;
}
if (namestrcmp(&(tupDesc->attrs[i]->attname), name) == 0)
{
attnum = tupDesc->attrs[i]->attnum;
break;
}
}
if (attnum == InvalidAttrNumber)
{
if (rel != NULL)
{
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_COLUMN),
errmsg("column \"%s\" of relation \"%s\" does not exist",
name, RelationGetRelationName(rel))));
}
else
{
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_COLUMN),
errmsg("column \"%s\" does not exist",
name)));
}
}
/* Check for duplicates */
if (list_member_int(attnums, attnum))
{
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_COLUMN),
errmsg("column \"%s\" specified more than once",
name)));
}
attnums = lappend_int(attnums, attnum);
}
}
return attnums;
/* *INDENT-ON* */
}

View File

@ -247,6 +247,10 @@ master_get_table_ddl_events(PG_FUNCTION_ARGS)
* Please note that the caller is still responsible for finalizing shard data * Please note that the caller is still responsible for finalizing shard data
* and the shardId with the master node. Further note that this function relies * and the shardId with the master node. Further note that this function relies
* on an internal sequence created in initdb to generate unique identifiers. * on an internal sequence created in initdb to generate unique identifiers.
*
* NB: This can be called by any user; for now we have decided that that's
* ok. We might want to restrict this to users part of a specific role or such
* at some later point.
*/ */
Datum Datum
master_get_new_shardid(PG_FUNCTION_ARGS) master_get_new_shardid(PG_FUNCTION_ARGS)
@ -254,12 +258,19 @@ master_get_new_shardid(PG_FUNCTION_ARGS)
text *sequenceName = cstring_to_text(SHARDID_SEQUENCE_NAME); text *sequenceName = cstring_to_text(SHARDID_SEQUENCE_NAME);
Oid sequenceId = ResolveRelationId(sequenceName); Oid sequenceId = ResolveRelationId(sequenceName);
Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId); Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId);
Oid savedUserId = InvalidOid;
int savedSecurityContext = 0;
Datum shardIdDatum = 0;
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
/* generate new and unique shardId from sequence */ /* generate new and unique shardId from sequence */
Datum shardIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum); shardIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum);
int64 shardId = DatumGetInt64(shardIdDatum);
PG_RETURN_INT64(shardId); SetUserIdAndSecContext(savedUserId, savedSecurityContext);
PG_RETURN_DATUM(shardIdDatum);
} }

View File

@ -1650,11 +1650,19 @@ UniqueJobId(void)
text *sequenceName = cstring_to_text(JOBID_SEQUENCE_NAME); text *sequenceName = cstring_to_text(JOBID_SEQUENCE_NAME);
Oid sequenceId = ResolveRelationId(sequenceName); Oid sequenceId = ResolveRelationId(sequenceName);
Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId); Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId);
Datum jobIdDatum = 0;
int64 jobId = 0;
Oid savedUserId = InvalidOid;
int savedSecurityContext = 0;
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
/* generate new and unique jobId from sequence */ /* generate new and unique jobId from sequence */
Datum jobIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum); jobIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum);
int64 jobId = DatumGetInt64(jobIdDatum); jobId = DatumGetInt64(jobIdDatum);
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
return jobId; return jobId;
} }

View File

@ -259,6 +259,10 @@ CreateJobSchema(StringInfo schemaName)
const char *queryString = NULL; const char *queryString = NULL;
bool oldAllowSystemTableMods = false; bool oldAllowSystemTableMods = false;
Oid savedUserId = InvalidOid;
int savedSecurityContext = 0;
/* build a CREATE SCHEMA statement */
CreateSchemaStmt *createSchemaStmt = makeNode(CreateSchemaStmt); CreateSchemaStmt *createSchemaStmt = makeNode(CreateSchemaStmt);
createSchemaStmt->schemaname = schemaName->data; createSchemaStmt->schemaname = schemaName->data;
#if (PG_VERSION_NUM >= 90500) #if (PG_VERSION_NUM >= 90500)
@ -272,8 +276,16 @@ CreateJobSchema(StringInfo schemaName)
oldAllowSystemTableMods = allowSystemTableMods; oldAllowSystemTableMods = allowSystemTableMods;
allowSystemTableMods = true; allowSystemTableMods = true;
/* ensure we're allowed to create this schema */
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
/* actually create schema, and make it visible */
CreateSchemaCommand(createSchemaStmt, queryString); CreateSchemaCommand(createSchemaStmt, queryString);
CommandCounterIncrement(); CommandCounterIncrement();
/* and reset environment */
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
allowSystemTableMods = oldAllowSystemTableMods; allowSystemTableMods = oldAllowSystemTableMods;
} }

View File

@ -522,6 +522,31 @@ JobDirectoryElement(const char *filename)
} }
/*
* CacheDirectoryElement takes in a filename, and checks if this name lives in
* the directory path that is used for job, task, table etc. files.
*/
bool
CacheDirectoryElement(const char *filename)
{
bool directoryElement = false;
char *directoryPathFound = NULL;
StringInfo directoryPath = makeStringInfo();
appendStringInfo(directoryPath, "base/%s/", PG_JOB_CACHE_DIR);
directoryPathFound = strstr(filename, directoryPath->data);
if (directoryPathFound != NULL)
{
directoryElement = true;
}
pfree(directoryPath);
return directoryElement;
}
/* Checks if a directory exists for the given directory name. */ /* Checks if a directory exists for the given directory name. */
bool bool
DirectoryExists(StringInfo directoryName) DirectoryExists(StringInfo directoryName)

View File

@ -106,6 +106,7 @@ extern bool JobSchemaExists(StringInfo schemaName);
extern StringInfo JobDirectoryName(uint64 jobId); extern StringInfo JobDirectoryName(uint64 jobId);
extern StringInfo TaskDirectoryName(uint64 jobId, uint32 taskId); extern StringInfo TaskDirectoryName(uint64 jobId, uint32 taskId);
extern StringInfo PartitionFilename(StringInfo directoryName, uint32 partitionId); extern StringInfo PartitionFilename(StringInfo directoryName, uint32 partitionId);
extern bool CacheDirectoryElement(const char *filename);
extern bool JobDirectoryElement(const char *filename); extern bool JobDirectoryElement(const char *filename);
extern bool DirectoryExists(StringInfo directoryName); extern bool DirectoryExists(StringInfo directoryName);
extern void CreateDirectory(StringInfo directoryName); extern void CreateDirectory(StringInfo directoryName);

View File

@ -11,8 +11,8 @@ CREATE SCHEMA new_schema;
NOTICE: Citus partially supports CREATE SCHEMA for distributed databases NOTICE: Citus partially supports CREATE SCHEMA for distributed databases
DETAIL: schema usage in joins and in some UDFs provided by Citus are not supported yet DETAIL: schema usage in joins and in some UDFs provided by Citus are not supported yet
CREATE ROLE new_role; CREATE ROLE new_role;
NOTICE: Citus does not support CREATE ROLE/USER for distributed databases NOTICE: not propagating CREATE ROLE/USER commands to worker nodes
DETAIL: Multiple roles are currently supported only for local tables HINT: Connect to worker nodes directly to manually create all necessary users and roles.
CREATE USER new_user; CREATE USER new_user;
NOTICE: Citus does not support CREATE ROLE/USER for distributed databases NOTICE: not propagating CREATE ROLE/USER commands to worker nodes
DETAIL: Multiple roles are currently supported only for local tables HINT: Connect to worker nodes directly to manually create all necessary users and roles.