mirror of https://github.com/citusdata/citus.git
Merge pull request #2490 from citusdata/fix_tt_protocol
Check ownership in task-tracker protocol functionspull/2492/head
commit
2afbb89673
|
@ -1346,6 +1346,21 @@ EnsureTableOwner(Oid relationId)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Check that the current user has owner rights to the schema, error out if
|
||||||
|
* not. Superusers are regarded as owners.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
EnsureSchemaOwner(Oid schemaId)
|
||||||
|
{
|
||||||
|
if (!pg_namespace_ownercheck(schemaId, GetUserId()))
|
||||||
|
{
|
||||||
|
aclcheck_error(ACLCHECK_NOT_OWNER, ACLCHECK_OBJECT_SCHEMA,
|
||||||
|
get_namespace_name(schemaId));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Check that the current user has owner rights to sequenceRelationId, error out if
|
* Check that the current user has owner rights to sequenceRelationId, error out if
|
||||||
* not. Superusers are regarded as owners.
|
* not. Superusers are regarded as owners.
|
||||||
|
|
|
@ -19,7 +19,10 @@
|
||||||
|
|
||||||
#include <time.h>
|
#include <time.h>
|
||||||
|
|
||||||
|
#include "access/htup_details.h"
|
||||||
#include "access/xact.h"
|
#include "access/xact.h"
|
||||||
|
#include "catalog/pg_namespace.h"
|
||||||
|
#include "catalog/namespace.h"
|
||||||
#include "commands/dbcommands.h"
|
#include "commands/dbcommands.h"
|
||||||
#include "commands/schemacmds.h"
|
#include "commands/schemacmds.h"
|
||||||
#include "commands/trigger.h"
|
#include "commands/trigger.h"
|
||||||
|
@ -33,6 +36,8 @@
|
||||||
#include "storage/lwlock.h"
|
#include "storage/lwlock.h"
|
||||||
#include "storage/pmsignal.h"
|
#include "storage/pmsignal.h"
|
||||||
#include "utils/builtins.h"
|
#include "utils/builtins.h"
|
||||||
|
#include "utils/syscache.h"
|
||||||
|
#include "utils/lsyscache.h"
|
||||||
|
|
||||||
|
|
||||||
/* Local functions forward declarations */
|
/* Local functions forward declarations */
|
||||||
|
@ -105,6 +110,10 @@ task_tracker_assign_task(PG_FUNCTION_ARGS)
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
Oid schemaId = get_namespace_oid(jobSchemaName->data, false);
|
||||||
|
|
||||||
|
EnsureSchemaOwner(schemaId);
|
||||||
|
|
||||||
UnlockJobResource(jobId, AccessExclusiveLock);
|
UnlockJobResource(jobId, AccessExclusiveLock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -136,7 +145,7 @@ task_tracker_task_status(PG_FUNCTION_ARGS)
|
||||||
|
|
||||||
WorkerTask *workerTask = NULL;
|
WorkerTask *workerTask = NULL;
|
||||||
uint32 taskStatus = 0;
|
uint32 taskStatus = 0;
|
||||||
|
char *userName = CurrentUserName();
|
||||||
bool taskTrackerRunning = false;
|
bool taskTrackerRunning = false;
|
||||||
|
|
||||||
CheckCitusVersion(ERROR);
|
CheckCitusVersion(ERROR);
|
||||||
|
@ -148,7 +157,8 @@ task_tracker_task_status(PG_FUNCTION_ARGS)
|
||||||
LWLockAcquire(&WorkerTasksSharedState->taskHashLock, LW_SHARED);
|
LWLockAcquire(&WorkerTasksSharedState->taskHashLock, LW_SHARED);
|
||||||
|
|
||||||
workerTask = WorkerTasksHashFind(jobId, taskId);
|
workerTask = WorkerTasksHashFind(jobId, taskId);
|
||||||
if (workerTask == NULL)
|
if (workerTask == NULL ||
|
||||||
|
(!superuser() && strncmp(userName, workerTask->userName, NAMEDATALEN) != 0))
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errmsg("could not find the worker task"),
|
ereport(ERROR, (errmsg("could not find the worker task"),
|
||||||
errdetail("Task jobId: " UINT64_FORMAT " and taskId: %u",
|
errdetail("Task jobId: " UINT64_FORMAT " and taskId: %u",
|
||||||
|
@ -178,6 +188,7 @@ task_tracker_cleanup_job(PG_FUNCTION_ARGS)
|
||||||
{
|
{
|
||||||
uint64 jobId = PG_GETARG_INT64(0);
|
uint64 jobId = PG_GETARG_INT64(0);
|
||||||
|
|
||||||
|
bool schemaExists = false;
|
||||||
HASH_SEQ_STATUS status;
|
HASH_SEQ_STATUS status;
|
||||||
WorkerTask *currentTask = NULL;
|
WorkerTask *currentTask = NULL;
|
||||||
StringInfo jobDirectoryName = NULL;
|
StringInfo jobDirectoryName = NULL;
|
||||||
|
@ -185,6 +196,22 @@ task_tracker_cleanup_job(PG_FUNCTION_ARGS)
|
||||||
|
|
||||||
CheckCitusVersion(ERROR);
|
CheckCitusVersion(ERROR);
|
||||||
|
|
||||||
|
jobSchemaName = JobSchemaName(jobId);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We'll keep this lock for a while, but that's ok because nothing
|
||||||
|
* else should be happening on this job.
|
||||||
|
*/
|
||||||
|
LockJobResource(jobId, AccessExclusiveLock);
|
||||||
|
|
||||||
|
schemaExists = JobSchemaExists(jobSchemaName);
|
||||||
|
if (schemaExists)
|
||||||
|
{
|
||||||
|
Oid schemaId = get_namespace_oid(jobSchemaName->data, false);
|
||||||
|
|
||||||
|
EnsureSchemaOwner(schemaId);
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We first clean up any open connections, and remove tasks belonging to
|
* We first clean up any open connections, and remove tasks belonging to
|
||||||
* this job from the shared hash.
|
* this job from the shared hash.
|
||||||
|
@ -215,8 +242,6 @@ task_tracker_cleanup_job(PG_FUNCTION_ARGS)
|
||||||
jobDirectoryName = JobDirectoryName(jobId);
|
jobDirectoryName = JobDirectoryName(jobId);
|
||||||
CitusRemoveDirectory(jobDirectoryName);
|
CitusRemoveDirectory(jobDirectoryName);
|
||||||
|
|
||||||
LockJobResource(jobId, AccessExclusiveLock);
|
|
||||||
jobSchemaName = JobSchemaName(jobId);
|
|
||||||
RemoveJobSchema(jobSchemaName);
|
RemoveJobSchema(jobSchemaName);
|
||||||
UnlockJobResource(jobId, AccessExclusiveLock);
|
UnlockJobResource(jobId, AccessExclusiveLock);
|
||||||
|
|
||||||
|
|
|
@ -100,9 +100,26 @@ worker_merge_files_into_table(PG_FUNCTION_ARGS)
|
||||||
schemaExists = JobSchemaExists(jobSchemaName);
|
schemaExists = JobSchemaExists(jobSchemaName);
|
||||||
if (!schemaExists)
|
if (!schemaExists)
|
||||||
{
|
{
|
||||||
|
/*
|
||||||
|
* For testing purposes, we allow merging into a table in the public schema,
|
||||||
|
* but only when running as superuser.
|
||||||
|
*/
|
||||||
|
|
||||||
|
if (!superuser())
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errmsg("job schema does not exist"),
|
||||||
|
errdetail("must be superuser to use public schema")));
|
||||||
|
}
|
||||||
|
|
||||||
resetStringInfo(jobSchemaName);
|
resetStringInfo(jobSchemaName);
|
||||||
appendStringInfoString(jobSchemaName, "public");
|
appendStringInfoString(jobSchemaName, "public");
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
Oid schemaId = get_namespace_oid(jobSchemaName->data, false);
|
||||||
|
|
||||||
|
EnsureSchemaOwner(schemaId);
|
||||||
|
}
|
||||||
|
|
||||||
/* create the task table and copy files into the table */
|
/* create the task table and copy files into the table */
|
||||||
columnNameList = ArrayObjectToCStringList(columnNameObject);
|
columnNameList = ArrayObjectToCStringList(columnNameObject);
|
||||||
|
@ -172,6 +189,12 @@ worker_merge_files_and_run_query(PG_FUNCTION_ARGS)
|
||||||
resetStringInfo(jobSchemaName);
|
resetStringInfo(jobSchemaName);
|
||||||
appendStringInfoString(jobSchemaName, "public");
|
appendStringInfoString(jobSchemaName, "public");
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
Oid schemaId = get_namespace_oid(jobSchemaName->data, false);
|
||||||
|
|
||||||
|
EnsureSchemaOwner(schemaId);
|
||||||
|
}
|
||||||
|
|
||||||
appendStringInfo(setSearchPathString, SET_SEARCH_PATH_COMMAND, jobSchemaName->data);
|
appendStringInfo(setSearchPathString, SET_SEARCH_PATH_COMMAND, jobSchemaName->data);
|
||||||
|
|
||||||
|
|
|
@ -155,6 +155,7 @@ extern void CreateTruncateTrigger(Oid relationId);
|
||||||
extern char * TableOwner(Oid relationId);
|
extern char * TableOwner(Oid relationId);
|
||||||
extern void EnsureTablePermissions(Oid relationId, AclMode mode);
|
extern void EnsureTablePermissions(Oid relationId, AclMode mode);
|
||||||
extern void EnsureTableOwner(Oid relationId);
|
extern void EnsureTableOwner(Oid relationId);
|
||||||
|
extern void EnsureSchemaOwner(Oid schemaId);
|
||||||
extern void EnsureSequenceOwner(Oid sequenceOid);
|
extern void EnsureSequenceOwner(Oid sequenceOid);
|
||||||
extern void EnsureSuperUser(void);
|
extern void EnsureSuperUser(void);
|
||||||
extern void EnsureReplicationSettings(Oid relationId, char replicationModel);
|
extern void EnsureReplicationSettings(Oid relationId, char replicationModel);
|
||||||
|
|
|
@ -130,6 +130,13 @@ SET citus.task_executor_type TO 'real-time';
|
||||||
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
|
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
|
||||||
ERROR: operation is not allowed
|
ERROR: operation is not allowed
|
||||||
HINT: Run the command with a superuser.
|
HINT: Run the command with a superuser.
|
||||||
|
-- create a task that other users should not be able to inspect
|
||||||
|
SELECT task_tracker_assign_task(1, 1, 'SELECT 1');
|
||||||
|
task_tracker_assign_task
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
-- check read permission
|
-- check read permission
|
||||||
SET ROLE read_access;
|
SET ROLE read_access;
|
||||||
EXECUTE prepare_insert(1);
|
EXECUTE prepare_insert(1);
|
||||||
|
@ -172,6 +179,14 @@ SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.
|
||||||
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
|
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
|
||||||
ERROR: operation is not allowed
|
ERROR: operation is not allowed
|
||||||
HINT: Run the command with a superuser.
|
HINT: Run the command with a superuser.
|
||||||
|
-- should not be able to access tasks or jobs belonging to a different user
|
||||||
|
SELECT task_tracker_task_status(1, 1);
|
||||||
|
ERROR: could not find the worker task
|
||||||
|
DETAIL: Task jobId: 1 and taskId: 1
|
||||||
|
SELECT task_tracker_assign_task(1, 2, 'SELECT 1');
|
||||||
|
ERROR: must be owner of schema pg_merge_job_0001
|
||||||
|
SELECT task_tracker_cleanup_job(1);
|
||||||
|
ERROR: must be owner of schema pg_merge_job_0001
|
||||||
-- should not be allowed to take aggressive locks on table
|
-- should not be allowed to take aggressive locks on table
|
||||||
BEGIN;
|
BEGIN;
|
||||||
SELECT lock_relation_if_exists('test', 'ACCESS SHARE');
|
SELECT lock_relation_if_exists('test', 'ACCESS SHARE');
|
||||||
|
@ -261,6 +276,12 @@ SELECT result FROM run_command_on_workers($$SELECT tableowner FROM pg_tables WHE
|
||||||
full_access
|
full_access
|
||||||
(2 rows)
|
(2 rows)
|
||||||
|
|
||||||
|
SELECT task_tracker_cleanup_job(1);
|
||||||
|
task_tracker_cleanup_job
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
DROP TABLE my_table, singleshard, test, test_coloc;
|
DROP TABLE my_table, singleshard, test, test_coloc;
|
||||||
DROP USER full_access;
|
DROP USER full_access;
|
||||||
DROP USER read_access;
|
DROP USER read_access;
|
||||||
|
|
|
@ -130,6 +130,13 @@ SET citus.task_executor_type TO 'real-time';
|
||||||
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
|
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
|
||||||
ERROR: operation is not allowed
|
ERROR: operation is not allowed
|
||||||
HINT: Run the command with a superuser.
|
HINT: Run the command with a superuser.
|
||||||
|
-- create a task that other users should not be able to inspect
|
||||||
|
SELECT task_tracker_assign_task(1, 1, 'SELECT 1');
|
||||||
|
task_tracker_assign_task
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
-- check read permission
|
-- check read permission
|
||||||
SET ROLE read_access;
|
SET ROLE read_access;
|
||||||
EXECUTE prepare_insert(1);
|
EXECUTE prepare_insert(1);
|
||||||
|
@ -172,6 +179,14 @@ SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.
|
||||||
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
|
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
|
||||||
ERROR: operation is not allowed
|
ERROR: operation is not allowed
|
||||||
HINT: Run the command with a superuser.
|
HINT: Run the command with a superuser.
|
||||||
|
-- should not be able to access tasks or jobs belonging to a different user
|
||||||
|
SELECT task_tracker_task_status(1, 1);
|
||||||
|
ERROR: could not find the worker task
|
||||||
|
DETAIL: Task jobId: 1 and taskId: 1
|
||||||
|
SELECT task_tracker_assign_task(1, 2, 'SELECT 1');
|
||||||
|
ERROR: must be owner of schema pg_merge_job_0001
|
||||||
|
SELECT task_tracker_cleanup_job(1);
|
||||||
|
ERROR: must be owner of schema pg_merge_job_0001
|
||||||
-- should not be allowed to take aggressive locks on table
|
-- should not be allowed to take aggressive locks on table
|
||||||
BEGIN;
|
BEGIN;
|
||||||
SELECT lock_relation_if_exists('test', 'ACCESS SHARE');
|
SELECT lock_relation_if_exists('test', 'ACCESS SHARE');
|
||||||
|
@ -261,6 +276,12 @@ SELECT result FROM run_command_on_workers($$SELECT tableowner FROM pg_tables WHE
|
||||||
full_access
|
full_access
|
||||||
(2 rows)
|
(2 rows)
|
||||||
|
|
||||||
|
SELECT task_tracker_cleanup_job(1);
|
||||||
|
task_tracker_cleanup_job
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
DROP TABLE my_table, singleshard, test, test_coloc;
|
DROP TABLE my_table, singleshard, test, test_coloc;
|
||||||
DROP USER full_access;
|
DROP USER full_access;
|
||||||
DROP USER read_access;
|
DROP USER read_access;
|
||||||
|
|
|
@ -90,6 +90,9 @@ SET citus.task_executor_type TO 'real-time';
|
||||||
-- should not be able to transmit directly
|
-- should not be able to transmit directly
|
||||||
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
|
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
|
||||||
|
|
||||||
|
-- create a task that other users should not be able to inspect
|
||||||
|
SELECT task_tracker_assign_task(1, 1, 'SELECT 1');
|
||||||
|
|
||||||
-- check read permission
|
-- check read permission
|
||||||
SET ROLE read_access;
|
SET ROLE read_access;
|
||||||
|
|
||||||
|
@ -109,6 +112,11 @@ SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.
|
||||||
-- should not be able to transmit directly
|
-- should not be able to transmit directly
|
||||||
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
|
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
|
||||||
|
|
||||||
|
-- should not be able to access tasks or jobs belonging to a different user
|
||||||
|
SELECT task_tracker_task_status(1, 1);
|
||||||
|
SELECT task_tracker_assign_task(1, 2, 'SELECT 1');
|
||||||
|
SELECT task_tracker_cleanup_job(1);
|
||||||
|
|
||||||
-- should not be allowed to take aggressive locks on table
|
-- should not be allowed to take aggressive locks on table
|
||||||
BEGIN;
|
BEGIN;
|
||||||
SELECT lock_relation_if_exists('test', 'ACCESS SHARE');
|
SELECT lock_relation_if_exists('test', 'ACCESS SHARE');
|
||||||
|
@ -164,6 +172,7 @@ RESET ROLE;
|
||||||
SELECT create_distributed_table('my_table', 'id');
|
SELECT create_distributed_table('my_table', 'id');
|
||||||
SELECT result FROM run_command_on_workers($$SELECT tableowner FROM pg_tables WHERE tablename LIKE 'my_table_%' LIMIT 1$$);
|
SELECT result FROM run_command_on_workers($$SELECT tableowner FROM pg_tables WHERE tablename LIKE 'my_table_%' LIMIT 1$$);
|
||||||
|
|
||||||
|
SELECT task_tracker_cleanup_job(1);
|
||||||
DROP TABLE my_table, singleshard, test, test_coloc;
|
DROP TABLE my_table, singleshard, test, test_coloc;
|
||||||
DROP USER full_access;
|
DROP USER full_access;
|
||||||
DROP USER read_access;
|
DROP USER read_access;
|
||||||
|
|
Loading…
Reference in New Issue