diff --git a/src/backend/distributed/citus--5.0--5.0-1.sql b/src/backend/distributed/citus--5.0--5.0-1.sql index c4b18cff6..249ae23fe 100644 --- a/src/backend/distributed/citus--5.0--5.0-1.sql +++ b/src/backend/distributed/citus--5.0--5.0-1.sql @@ -22,3 +22,6 @@ CREATE FUNCTION pg_catalog.master_stage_shard_placement_row(shardid int8, AS 'MODULE_PATHNAME', $$master_stage_shard_placement_row$$; COMMENT ON FUNCTION pg_catalog.master_stage_shard_placement_row(int8, int4, int8, text, int4) IS 'deprecated function to insert a row into pg_dist_shard_placement'; + + +ALTER FUNCTION pg_catalog.citus_drop_trigger() SECURITY DEFINER; diff --git a/src/backend/distributed/citus.sql b/src/backend/distributed/citus.sql index 9fb23bb7f..beff1d468 100644 --- a/src/backend/distributed/citus.sql +++ b/src/backend/distributed/citus.sql @@ -327,6 +327,7 @@ CREATE OR REPLACE FUNCTION citus_drop_trigger() RETURNS event_trigger LANGUAGE plpgsql SET search_path = pg_catalog + /* declared as SECURITY DEFINER in upgrade script */ AS $cdbdt$ DECLARE v_obj record; BEGIN diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index caa416d7f..929af593c 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -49,6 +49,8 @@ multi_ExecutorStart(QueryDesc *queryDesc, int eflags) MultiExecutorType executorType = MULTI_EXECUTOR_INVALID_FIRST; Job *workerJob = multiPlan->workerJob; + ExecCheckRTPerms(planStatement->rtable, true); + executorType = JobExecutorType(multiPlan); if (executorType == MULTI_EXECUTOR_ROUTER) { diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index 3e21b1ac6..731bf773d 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -10,6 +10,7 @@ #include "miscadmin.h" #include "access/htup_details.h" +#include "access/sysattr.h" #include "catalog/catalog.h" #include "catalog/index.h" #include "catalog/namespace.h" @@ -23,6 +24,7 @@ #include "distributed/transmit.h" #include "distributed/worker_manager.h" #include "distributed/worker_protocol.h" +#include "executor/executor.h" #include "parser/parser.h" #include "parser/parse_utilcmd.h" #include "storage/lmgr.h" @@ -51,7 +53,8 @@ static bool IsTransmitStmt(Node *parsetree); static void VerifyTransmitStmt(CopyStmt *copyStatement); /* Local functions forward declarations for processing distributed table commands */ -static Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag); +static Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, + bool *commandMustRunAsOwner); static Node * ProcessIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand); static Node * ProcessDropIndexStmt(DropStmt *dropIndexStatement, @@ -73,6 +76,8 @@ static bool ExecuteCommandOnWorkerShards(Oid relationId, const char *commandStri static bool AllFinalizedPlacementsAccessible(Oid relationId); static void RangeVarCallbackForDropIndex(const RangeVar *rel, Oid relOid, Oid oldRelOid, void *arg); +static void CheckCopyPermissions(CopyStmt *copyStatement); +static List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist); /* @@ -95,6 +100,10 @@ multi_ProcessUtility(Node *parsetree, DestReceiver *dest, char *completionTag) { + bool commandMustRunAsOwner = false; + Oid savedUserId = InvalidOid; + int savedSecurityContext = 0; + /* * TRANSMIT used to be separate command, but to avoid patching the grammar * it's no overlaid onto COPY, but with FORMAT = 'transmit' instead of the @@ -122,7 +131,8 @@ multi_ProcessUtility(Node *parsetree, if (IsA(parsetree, CopyStmt)) { - parsetree = ProcessCopyStmt((CopyStmt *) parsetree, completionTag); + parsetree = ProcessCopyStmt((CopyStmt *) parsetree, completionTag, + &commandMustRunAsOwner); if (parsetree == NULL) { @@ -191,15 +201,26 @@ multi_ProcessUtility(Node *parsetree, } else if (IsA(parsetree, CreateRoleStmt) && CitusHasBeenLoaded()) { - ereport(NOTICE, (errmsg("Citus does not support CREATE ROLE/USER " - "for distributed databases"), - errdetail("Multiple roles are currently supported " - "only for local tables"))); + ereport(NOTICE, (errmsg("not propagating CREATE ROLE/USER commands to worker" + " nodes"), + errhint("Connect to worker nodes directly to manually create all" + " necessary users and roles."))); + } + + if (commandMustRunAsOwner) + { + GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); + SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); } /* now drop into standard process utility */ standard_ProcessUtility(parsetree, queryString, context, params, dest, completionTag); + + if (commandMustRunAsOwner) + { + SetUserIdAndSecContext(savedUserId, savedSecurityContext); + } } @@ -270,27 +291,17 @@ VerifyTransmitStmt(CopyStmt *copyStatement) * COPYing from distributed tables and preventing unsupported actions. The * function returns a modified COPY statement to be executed, or NULL if no * further processing is needed. + * + * commandMustRunAsOwner is an output parameter used to communicate to the caller whether + * the copy statement should be executed using elevated privileges. If + * ProcessCopyStmt that is required, a call to CheckCopyPermissions will take + * care of verifying the current user's permissions before ProcessCopyStmt + * returns. */ static Node * -ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag) +ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustRunAsOwner) { - /* - * We first check if we have a "COPY (query) TO filename". If we do, copy doesn't - * accept relative file paths. However, SQL tasks that get assigned to worker nodes - * have relative paths. We therefore convert relative paths to absolute ones here. - */ - if (copyStatement->relation == NULL && - !copyStatement->is_from && - !copyStatement->is_program && - copyStatement->filename != NULL) - { - const char *filename = copyStatement->filename; - - if (!is_absolute_path(filename) && JobDirectoryElement(filename)) - { - copyStatement->filename = make_absolute_path(filename); - } - } + *commandMustRunAsOwner = false; /* make sure variable is initialized */ /* * We check whether a distributed relation is affected. For that, we need to open the @@ -301,8 +312,10 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag) { Relation copiedRelation = NULL; bool isDistributedRelation = false; + bool isFrom = copyStatement->is_from; - copiedRelation = heap_openrv(copyStatement->relation, AccessShareLock); + copiedRelation = heap_openrv(copyStatement->relation, + isFrom ? RowExclusiveLock : AccessShareLock); isDistributedRelation = IsDistributedTable(RelationGetRelid(copiedRelation)); @@ -349,6 +362,50 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag) } } + + if (copyStatement->filename != NULL && !copyStatement->is_program) + { + const char *filename = copyStatement->filename; + + if (CacheDirectoryElement(filename)) + { + /* + * Only superusers are allowed to copy from a file, so we have to + * become superuser to execute copies to/from files used by citus' + * query execution. + * + * XXX: This is a decidedly suboptimal solution, as that means + * that triggers, input functions, etc. run with elevated + * privileges. But this is better than not being able to run + * queries as normal user. + */ + *commandMustRunAsOwner = true; + + /* + * Have to manually check permissions here as the COPY is will be + * run as a superuser. + */ + if (copyStatement->relation != NULL) + { + CheckCopyPermissions(copyStatement); + } + + /* + * Check if we have a "COPY (query) TO filename". If we do, copy + * doesn't accept relative file paths. However, SQL tasks that get + * assigned to worker nodes have relative paths. We therefore + * convert relative paths to absolute ones here. + */ + if (copyStatement->relation == NULL && + !copyStatement->is_from && + !is_absolute_path(filename)) + { + copyStatement->filename = make_absolute_path(filename); + } + } + } + + return (Node *) copyStatement; } @@ -1088,3 +1145,149 @@ RangeVarCallbackForDropIndex(const RangeVar *rel, Oid relOid, Oid oldRelOid, voi } /* *INDENT-ON* */ } + + +/* + * Check whether the current user has the permission to execute a COPY + * statement, raise ERROR if not. In some cases we have to do this separately + * from postgres' copy.c, because we have to execute the copy with elevated + * privileges. + * + * Copied from postgres, where it's part of DoCopy(). + */ +static void +CheckCopyPermissions(CopyStmt *copyStatement) +{ + /* *INDENT-OFF* */ + bool is_from = copyStatement->is_from; + Relation rel; + Oid relid; + List *range_table = NIL; + TupleDesc tupDesc; + AclMode required_access = (is_from ? ACL_INSERT : ACL_SELECT); + List *attnums; + ListCell *cur; + RangeTblEntry *rte; + + rel = heap_openrv(copyStatement->relation, + is_from ? RowExclusiveLock : AccessShareLock); + + relid = RelationGetRelid(rel); + + rte = makeNode(RangeTblEntry); + rte->rtekind = RTE_RELATION; + rte->relid = relid; + rte->relkind = rel->rd_rel->relkind; + rte->requiredPerms = required_access; + range_table = list_make1(rte); + + tupDesc = RelationGetDescr(rel); + + attnums = CopyGetAttnums(tupDesc, rel, copyStatement->attlist); + foreach(cur, attnums) + { + int attno = lfirst_int(cur) - FirstLowInvalidHeapAttributeNumber; + + if (is_from) + { +#if (PG_VERSION_NUM >= 90500) + rte->insertedCols = bms_add_member(rte->insertedCols, attno); +#else + rte->modifiedCols = bms_add_member(rte->modifiedCols, attno); +#endif + } + else + { + rte->selectedCols = bms_add_member(rte->selectedCols, attno); + } + } + + ExecCheckRTPerms(range_table, true); + + /* TODO: Perform RLS checks once supported */ + + heap_close(rel, NoLock); + /* *INDENT-ON* */ +} + + +/* Helper for CheckCopyPermissions(), copied from postgres */ +static List * +CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist) +{ + /* *INDENT-OFF* */ + List *attnums = NIL; + + if (attnamelist == NIL) + { + /* Generate default column list */ + Form_pg_attribute *attr = tupDesc->attrs; + int attr_count = tupDesc->natts; + int i; + + for (i = 0; i < attr_count; i++) + { + if (attr[i]->attisdropped) + { + continue; + } + attnums = lappend_int(attnums, i + 1); + } + } + else + { + /* Validate the user-supplied list and extract attnums */ + ListCell *l; + + foreach(l, attnamelist) + { + char *name = strVal(lfirst(l)); + int attnum; + int i; + + /* Lookup column name */ + attnum = InvalidAttrNumber; + for (i = 0; i < tupDesc->natts; i++) + { + if (tupDesc->attrs[i]->attisdropped) + { + continue; + } + if (namestrcmp(&(tupDesc->attrs[i]->attname), name) == 0) + { + attnum = tupDesc->attrs[i]->attnum; + break; + } + } + if (attnum == InvalidAttrNumber) + { + if (rel != NULL) + { + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_COLUMN), + errmsg("column \"%s\" of relation \"%s\" does not exist", + name, RelationGetRelationName(rel)))); + } + else + { + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_COLUMN), + errmsg("column \"%s\" does not exist", + name))); + } + } + /* Check for duplicates */ + if (list_member_int(attnums, attnum)) + { + ereport(ERROR, + (errcode(ERRCODE_DUPLICATE_COLUMN), + errmsg("column \"%s\" specified more than once", + name))); + } + attnums = lappend_int(attnums, attnum); + } + } + + return attnums; + /* *INDENT-ON* */ +} diff --git a/src/backend/distributed/master/master_node_protocol.c b/src/backend/distributed/master/master_node_protocol.c index b36f2503e..2351bf838 100644 --- a/src/backend/distributed/master/master_node_protocol.c +++ b/src/backend/distributed/master/master_node_protocol.c @@ -247,6 +247,10 @@ master_get_table_ddl_events(PG_FUNCTION_ARGS) * Please note that the caller is still responsible for finalizing shard data * and the shardId with the master node. Further note that this function relies * on an internal sequence created in initdb to generate unique identifiers. + * + * NB: This can be called by any user; for now we have decided that that's + * ok. We might want to restrict this to users part of a specific role or such + * at some later point. */ Datum master_get_new_shardid(PG_FUNCTION_ARGS) @@ -254,12 +258,19 @@ master_get_new_shardid(PG_FUNCTION_ARGS) text *sequenceName = cstring_to_text(SHARDID_SEQUENCE_NAME); Oid sequenceId = ResolveRelationId(sequenceName); Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId); + Oid savedUserId = InvalidOid; + int savedSecurityContext = 0; + Datum shardIdDatum = 0; + + GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); + SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); /* generate new and unique shardId from sequence */ - Datum shardIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum); - int64 shardId = DatumGetInt64(shardIdDatum); + shardIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum); - PG_RETURN_INT64(shardId); + SetUserIdAndSecContext(savedUserId, savedSecurityContext); + + PG_RETURN_DATUM(shardIdDatum); } diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index 62eb14a1a..7943968bd 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -1650,11 +1650,19 @@ UniqueJobId(void) text *sequenceName = cstring_to_text(JOBID_SEQUENCE_NAME); Oid sequenceId = ResolveRelationId(sequenceName); Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId); + Datum jobIdDatum = 0; + int64 jobId = 0; + Oid savedUserId = InvalidOid; + int savedSecurityContext = 0; + + GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); + SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); /* generate new and unique jobId from sequence */ - Datum jobIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum); - int64 jobId = DatumGetInt64(jobIdDatum); + jobIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum); + jobId = DatumGetInt64(jobIdDatum); + SetUserIdAndSecContext(savedUserId, savedSecurityContext); return jobId; } diff --git a/src/backend/distributed/worker/task_tracker_protocol.c b/src/backend/distributed/worker/task_tracker_protocol.c index a2b2628ea..d167ccfaa 100644 --- a/src/backend/distributed/worker/task_tracker_protocol.c +++ b/src/backend/distributed/worker/task_tracker_protocol.c @@ -259,6 +259,10 @@ CreateJobSchema(StringInfo schemaName) const char *queryString = NULL; bool oldAllowSystemTableMods = false; + Oid savedUserId = InvalidOid; + int savedSecurityContext = 0; + + /* build a CREATE SCHEMA statement */ CreateSchemaStmt *createSchemaStmt = makeNode(CreateSchemaStmt); createSchemaStmt->schemaname = schemaName->data; #if (PG_VERSION_NUM >= 90500) @@ -272,8 +276,16 @@ CreateJobSchema(StringInfo schemaName) oldAllowSystemTableMods = allowSystemTableMods; allowSystemTableMods = true; + /* ensure we're allowed to create this schema */ + GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); + SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); + + /* actually create schema, and make it visible */ CreateSchemaCommand(createSchemaStmt, queryString); CommandCounterIncrement(); + + /* and reset environment */ + SetUserIdAndSecContext(savedUserId, savedSecurityContext); allowSystemTableMods = oldAllowSystemTableMods; } diff --git a/src/backend/distributed/worker/worker_partition_protocol.c b/src/backend/distributed/worker/worker_partition_protocol.c index d391fd1fa..7eee958b2 100644 --- a/src/backend/distributed/worker/worker_partition_protocol.c +++ b/src/backend/distributed/worker/worker_partition_protocol.c @@ -522,6 +522,31 @@ JobDirectoryElement(const char *filename) } +/* + * CacheDirectoryElement takes in a filename, and checks if this name lives in + * the directory path that is used for job, task, table etc. files. + */ +bool +CacheDirectoryElement(const char *filename) +{ + bool directoryElement = false; + char *directoryPathFound = NULL; + + StringInfo directoryPath = makeStringInfo(); + appendStringInfo(directoryPath, "base/%s/", PG_JOB_CACHE_DIR); + + directoryPathFound = strstr(filename, directoryPath->data); + if (directoryPathFound != NULL) + { + directoryElement = true; + } + + pfree(directoryPath); + + return directoryElement; +} + + /* Checks if a directory exists for the given directory name. */ bool DirectoryExists(StringInfo directoryName) diff --git a/src/include/distributed/worker_protocol.h b/src/include/distributed/worker_protocol.h index ac526a2be..a09b27fbf 100644 --- a/src/include/distributed/worker_protocol.h +++ b/src/include/distributed/worker_protocol.h @@ -106,6 +106,7 @@ extern bool JobSchemaExists(StringInfo schemaName); extern StringInfo JobDirectoryName(uint64 jobId); extern StringInfo TaskDirectoryName(uint64 jobId, uint32 taskId); extern StringInfo PartitionFilename(StringInfo directoryName, uint32 partitionId); +extern bool CacheDirectoryElement(const char *filename); extern bool JobDirectoryElement(const char *filename); extern bool DirectoryExists(StringInfo directoryName); extern void CreateDirectory(StringInfo directoryName); diff --git a/src/test/regress/expected/multi_utility_warnings.out b/src/test/regress/expected/multi_utility_warnings.out index b10cc7598..b5ce4ec44 100644 --- a/src/test/regress/expected/multi_utility_warnings.out +++ b/src/test/regress/expected/multi_utility_warnings.out @@ -11,8 +11,8 @@ CREATE SCHEMA new_schema; NOTICE: Citus partially supports CREATE SCHEMA for distributed databases DETAIL: schema usage in joins and in some UDFs provided by Citus are not supported yet CREATE ROLE new_role; -NOTICE: Citus does not support CREATE ROLE/USER for distributed databases -DETAIL: Multiple roles are currently supported only for local tables +NOTICE: not propagating CREATE ROLE/USER commands to worker nodes +HINT: Connect to worker nodes directly to manually create all necessary users and roles. CREATE USER new_user; -NOTICE: Citus does not support CREATE ROLE/USER for distributed databases -DETAIL: Multiple roles are currently supported only for local tables +NOTICE: not propagating CREATE ROLE/USER commands to worker nodes +HINT: Connect to worker nodes directly to manually create all necessary users and roles.