mirror of https://github.com/citusdata/citus.git
Fix an issue with subquery map merge jobs as non-root
Also automated all manual tests around multi user isolation for internal citus udf's automate upgrade_to_reference_table tests add negative tests for lock_relation_if_exists add tests for permissions on worker_cleanup_job_schema_cache add tests for worker_fetch_partition_file add tests for worker_merge_files_into_table fix problem with worker_merge_files_and_run_query when run as non-super user and add tests for behaviourpull/2831/head
parent
9453645860
commit
791cc26a86
|
@ -732,7 +732,7 @@ LockModeToLockModeText(LOCKMODE lockMode)
|
||||||
*
|
*
|
||||||
* The relation name should be qualified with the schema name.
|
* The relation name should be qualified with the schema name.
|
||||||
*
|
*
|
||||||
* The function errors out of the lockmode isn't defined in the PostgreSQL's
|
* The function errors out if the lockmode isn't defined in the PostgreSQL's
|
||||||
* explicit locking table.
|
* explicit locking table.
|
||||||
*/
|
*/
|
||||||
Datum
|
Datum
|
||||||
|
|
|
@ -175,6 +175,8 @@ worker_merge_files_and_run_query(PG_FUNCTION_ARGS)
|
||||||
int createMergeTableResult = 0;
|
int createMergeTableResult = 0;
|
||||||
int createIntermediateTableResult = 0;
|
int createIntermediateTableResult = 0;
|
||||||
int finished = 0;
|
int finished = 0;
|
||||||
|
Oid savedUserId = InvalidOid;
|
||||||
|
int savedSecurityContext = 0;
|
||||||
Oid userId = GetUserId();
|
Oid userId = GetUserId();
|
||||||
|
|
||||||
CheckCitusVersion(ERROR);
|
CheckCitusVersion(ERROR);
|
||||||
|
@ -221,11 +223,17 @@ worker_merge_files_and_run_query(PG_FUNCTION_ARGS)
|
||||||
createMergeTableQuery)));
|
createMergeTableQuery)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* need superuser to copy from files */
|
||||||
|
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
|
||||||
|
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
|
||||||
|
|
||||||
appendStringInfo(mergeTableName, "%s%s", intermediateTableName->data,
|
appendStringInfo(mergeTableName, "%s%s", intermediateTableName->data,
|
||||||
MERGE_TABLE_SUFFIX);
|
MERGE_TABLE_SUFFIX);
|
||||||
CopyTaskFilesFromDirectory(jobSchemaName, mergeTableName, taskDirectoryName,
|
CopyTaskFilesFromDirectory(jobSchemaName, mergeTableName, taskDirectoryName,
|
||||||
userId);
|
userId);
|
||||||
|
|
||||||
|
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
|
||||||
|
|
||||||
createIntermediateTableResult = SPI_exec(createIntermediateTableQuery, 0);
|
createIntermediateTableResult = SPI_exec(createIntermediateTableQuery, 0);
|
||||||
if (createIntermediateTableResult < 0)
|
if (createIntermediateTableResult < 0)
|
||||||
{
|
{
|
||||||
|
|
|
@ -56,3 +56,7 @@ s/ERROR: failed to execute task [0-9]+/ERROR: failed to execute task X/g
|
||||||
|
|
||||||
# ignore WAL warnings
|
# ignore WAL warnings
|
||||||
/DEBUG: .+creating and filling new WAL file/d
|
/DEBUG: .+creating and filling new WAL file/d
|
||||||
|
|
||||||
|
# normalize file names for partitioned files
|
||||||
|
s/(task_[0-9]+\.)[0-9]+/\1xxxx/g
|
||||||
|
s/(job_[0-9]+\/task_[0-9]+\/p_[0-9]+\.)[0-9]+/\1xxxx/g
|
||||||
|
|
|
@ -300,6 +300,15 @@ ERROR: must be owner of table singleshard
|
||||||
-- should not be allowed to co-located tables
|
-- should not be allowed to co-located tables
|
||||||
SELECT mark_tables_colocated('test', ARRAY['test_coloc'::regclass]);
|
SELECT mark_tables_colocated('test', ARRAY['test_coloc'::regclass]);
|
||||||
ERROR: must be owner of table test
|
ERROR: must be owner of table test
|
||||||
|
-- should not be allowed to take any locks
|
||||||
|
BEGIN;
|
||||||
|
SELECT lock_relation_if_exists('test', 'ACCESS SHARE');
|
||||||
|
ERROR: permission denied for table test
|
||||||
|
ABORT;
|
||||||
|
BEGIN;
|
||||||
|
SELECT lock_relation_if_exists('test', 'EXCLUSIVE');
|
||||||
|
ERROR: permission denied for table test
|
||||||
|
ABORT;
|
||||||
-- table owner should be the same on the shards, even when distributing the table as superuser
|
-- table owner should be the same on the shards, even when distributing the table as superuser
|
||||||
SET ROLE full_access;
|
SET ROLE full_access;
|
||||||
CREATE TABLE my_table (id integer, val integer);
|
CREATE TABLE my_table (id integer, val integer);
|
||||||
|
@ -427,10 +436,238 @@ SELECT create_distributed_table('full_access_user_schema.t2', 'id');
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
RESET ROLE;
|
RESET ROLE;
|
||||||
|
-- a user with all privileges on a schema should be able to upgrade a distributed table to
|
||||||
|
-- a reference table
|
||||||
|
SET ROLE full_access;
|
||||||
|
BEGIN;
|
||||||
|
CREATE TABLE full_access_user_schema.r1(id int);
|
||||||
|
SET LOCAL citus.shard_count TO 1;
|
||||||
|
SELECT create_distributed_table('full_access_user_schema.r1', 'id');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT upgrade_to_reference_table('full_access_user_schema.r1');
|
||||||
|
upgrade_to_reference_table
|
||||||
|
----------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
RESET ROLE;
|
||||||
|
-- the super user should be able to upgrade a distributed table to a reference table, even
|
||||||
|
-- if it is owned by another user
|
||||||
|
SET ROLE full_access;
|
||||||
|
BEGIN;
|
||||||
|
CREATE TABLE full_access_user_schema.r2(id int);
|
||||||
|
SET LOCAL citus.shard_count TO 1;
|
||||||
|
SELECT create_distributed_table('full_access_user_schema.r2', 'id');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
RESET ROLE;
|
||||||
|
-- the usage_access should not be able to upgrade the table
|
||||||
|
SET ROLE usage_access;
|
||||||
|
SELECT upgrade_to_reference_table('full_access_user_schema.r2');
|
||||||
|
ERROR: must be owner of table r2
|
||||||
|
RESET ROLE;
|
||||||
|
-- the super user should be able
|
||||||
|
SELECT upgrade_to_reference_table('full_access_user_schema.r2');
|
||||||
|
upgrade_to_reference_table
|
||||||
|
----------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- verify the owner of the shards for the reference table
|
||||||
|
SELECT result FROM run_command_on_workers($cmd$
|
||||||
|
SELECT tableowner FROM pg_tables WHERE
|
||||||
|
true
|
||||||
|
AND schemaname = 'full_access_user_schema'
|
||||||
|
AND tablename LIKE 'r2_%'
|
||||||
|
LIMIT 1;
|
||||||
|
$cmd$);
|
||||||
|
result
|
||||||
|
-------------
|
||||||
|
full_access
|
||||||
|
full_access
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- super user should be the only one being able to call worker_cleanup_job_schema_cache
|
||||||
|
SELECT worker_cleanup_job_schema_cache();
|
||||||
|
worker_cleanup_job_schema_cache
|
||||||
|
---------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SET ROLE full_access;
|
||||||
|
SELECT worker_cleanup_job_schema_cache();
|
||||||
|
ERROR: permission denied for function worker_cleanup_job_schema_cache
|
||||||
|
SET ROLE usage_access;
|
||||||
|
SELECT worker_cleanup_job_schema_cache();
|
||||||
|
ERROR: permission denied for function worker_cleanup_job_schema_cache
|
||||||
|
SET ROLE read_access;
|
||||||
|
SELECT worker_cleanup_job_schema_cache();
|
||||||
|
ERROR: permission denied for function worker_cleanup_job_schema_cache
|
||||||
|
SET ROLE no_access;
|
||||||
|
SELECT worker_cleanup_job_schema_cache();
|
||||||
|
ERROR: permission denied for function worker_cleanup_job_schema_cache
|
||||||
|
RESET ROLE;
|
||||||
|
-- to test access to files created during repartition we will create some on worker 1
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
SET ROLE full_access;
|
||||||
|
SELECT worker_hash_partition_table(42,1,'SELECT a FROM generate_series(1,100) AS a', 'a', 23, ARRAY[-2147483648, -1073741824, 0, 1073741824]::int4[]);
|
||||||
|
worker_hash_partition_table
|
||||||
|
-----------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
RESET ROLE;
|
||||||
|
-- all attempts for transfer are initiated from other workers
|
||||||
|
\c - - - :worker_2_port
|
||||||
|
-- super user should not be able to copy files created by a user
|
||||||
|
SELECT worker_fetch_partition_file(42, 1, 1, 1, 'localhost', :worker_1_port);
|
||||||
|
WARNING: could not open file "base/pgsql_job_cache/job_0042/task_000001/p_00001.10": No such file or directory
|
||||||
|
CONTEXT: while executing command on localhost:57637
|
||||||
|
ERROR: could not receive file "base/pgsql_job_cache/job_0042/task_000001/p_00001" from localhost:57637
|
||||||
|
-- different user should not be able to fetch partition file
|
||||||
|
SET ROLE usage_access;
|
||||||
|
SELECT worker_fetch_partition_file(42, 1, 1, 1, 'localhost', :worker_1_port);
|
||||||
|
WARNING: could not open file "base/pgsql_job_cache/job_0042/task_000001/p_00001.18007": No such file or directory
|
||||||
|
CONTEXT: while executing command on localhost:57637
|
||||||
|
ERROR: could not receive file "base/pgsql_job_cache/job_0042/task_000001/p_00001" from localhost:57637
|
||||||
|
-- only the user whom created the files should be able to fetch
|
||||||
|
SET ROLE full_access;
|
||||||
|
SELECT worker_fetch_partition_file(42, 1, 1, 1, 'localhost', :worker_1_port);
|
||||||
|
worker_fetch_partition_file
|
||||||
|
-----------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
RESET ROLE;
|
||||||
|
-- now we will test that only the user who owns the fetched file is able to merge it into
|
||||||
|
-- a table
|
||||||
|
-- test that no other user can merge the downloaded file before the task is being tracked
|
||||||
|
SET ROLE usage_access;
|
||||||
|
SELECT worker_merge_files_into_table(42, 1, ARRAY['a'], ARRAY['integer']);
|
||||||
|
ERROR: job schema does not exist
|
||||||
|
DETAIL: must be superuser to use public schema
|
||||||
|
RESET ROLE;
|
||||||
|
SET ROLE full_access;
|
||||||
|
-- use the side effect of this function to have a schema to use, otherwise only the super
|
||||||
|
-- user could call worker_merge_files_into_table and store the results in public, which is
|
||||||
|
-- not what we want
|
||||||
|
SELECT task_tracker_assign_task(42, 1, 'SELECT 1');
|
||||||
|
task_tracker_assign_task
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
RESET ROLE;
|
||||||
|
-- test that no other user can merge the downloaded file after the task is being tracked
|
||||||
|
SET ROLE usage_access;
|
||||||
|
SELECT worker_merge_files_into_table(42, 1, ARRAY['a'], ARRAY['integer']);
|
||||||
|
ERROR: must be owner of schema pg_merge_job_0042
|
||||||
|
RESET ROLE;
|
||||||
|
-- test that the super user is unable to read the contents of the intermediate file,
|
||||||
|
-- although it does create the table
|
||||||
|
SELECT worker_merge_files_into_table(42, 1, ARRAY['a'], ARRAY['integer']);
|
||||||
|
WARNING: Task file "task_000001.18003" does not have expected suffix ".10"
|
||||||
|
worker_merge_files_into_table
|
||||||
|
-------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM pg_merge_job_0042.task_000001;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE pg_merge_job_0042.task_000001; -- drop table so we can reuse the same files for more tests
|
||||||
|
SET ROLE full_access;
|
||||||
|
SELECT worker_merge_files_into_table(42, 1, ARRAY['a'], ARRAY['integer']);
|
||||||
|
worker_merge_files_into_table
|
||||||
|
-------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM pg_merge_job_0042.task_000001;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
25
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE pg_merge_job_0042.task_000001; -- drop table so we can reuse the same files for more tests
|
||||||
|
RESET ROLE;
|
||||||
|
-- test that no other user can merge files and run query on the already fetched files
|
||||||
|
SET ROLE usage_access;
|
||||||
|
SELECT worker_merge_files_and_run_query(42, 1,
|
||||||
|
'CREATE TABLE task_000001_merge(merge_column_0 int)',
|
||||||
|
'CREATE TABLE task_000001 (a) AS SELECT sum(merge_column_0) FROM task_000001_merge'
|
||||||
|
);
|
||||||
|
ERROR: must be owner of schema pg_merge_job_0042
|
||||||
|
RESET ROLE;
|
||||||
|
-- test that the super user is unable to read the contents of the partitioned files after
|
||||||
|
-- trying to merge with run query
|
||||||
|
SELECT worker_merge_files_and_run_query(42, 1,
|
||||||
|
'CREATE TABLE task_000001_merge(merge_column_0 int)',
|
||||||
|
'CREATE TABLE task_000001 (a) AS SELECT sum(merge_column_0) FROM task_000001_merge'
|
||||||
|
);
|
||||||
|
WARNING: Task file "task_000001.18003" does not have expected suffix ".10"
|
||||||
|
worker_merge_files_and_run_query
|
||||||
|
----------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM pg_merge_job_0042.task_000001_merge;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM pg_merge_job_0042.task_000001;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE pg_merge_job_0042.task_000001, pg_merge_job_0042.task_000001_merge; -- drop table so we can reuse the same files for more tests
|
||||||
|
-- test that the owner of the task can merge files and run query correctly
|
||||||
|
SET ROLE full_access;
|
||||||
|
SELECT worker_merge_files_and_run_query(42, 1,
|
||||||
|
'CREATE TABLE task_000001_merge(merge_column_0 int)',
|
||||||
|
'CREATE TABLE task_000001 (a) AS SELECT sum(merge_column_0) FROM task_000001_merge'
|
||||||
|
);
|
||||||
|
worker_merge_files_and_run_query
|
||||||
|
----------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM pg_merge_job_0042.task_000001_merge;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
25
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM pg_merge_job_0042.task_000001;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE pg_merge_job_0042.task_000001, pg_merge_job_0042.task_000001_merge; -- drop table so we can reuse the same files for more tests
|
||||||
|
RESET ROLE;
|
||||||
|
\c - - - :master_port
|
||||||
DROP SCHEMA full_access_user_schema CASCADE;
|
DROP SCHEMA full_access_user_schema CASCADE;
|
||||||
NOTICE: drop cascades to 2 other objects
|
NOTICE: drop cascades to 4 other objects
|
||||||
DETAIL: drop cascades to table full_access_user_schema.t1
|
DETAIL: drop cascades to table full_access_user_schema.t1
|
||||||
drop cascades to table full_access_user_schema.t2
|
drop cascades to table full_access_user_schema.t2
|
||||||
|
drop cascades to table full_access_user_schema.r1
|
||||||
|
drop cascades to table full_access_user_schema.r2
|
||||||
DROP TABLE
|
DROP TABLE
|
||||||
my_table,
|
my_table,
|
||||||
my_table_with_data,
|
my_table_with_data,
|
||||||
|
|
|
@ -300,6 +300,15 @@ ERROR: must be owner of relation singleshard
|
||||||
-- should not be allowed to co-located tables
|
-- should not be allowed to co-located tables
|
||||||
SELECT mark_tables_colocated('test', ARRAY['test_coloc'::regclass]);
|
SELECT mark_tables_colocated('test', ARRAY['test_coloc'::regclass]);
|
||||||
ERROR: must be owner of relation test
|
ERROR: must be owner of relation test
|
||||||
|
-- should not be allowed to take any locks
|
||||||
|
BEGIN;
|
||||||
|
SELECT lock_relation_if_exists('test', 'ACCESS SHARE');
|
||||||
|
ERROR: permission denied for relation test
|
||||||
|
ABORT;
|
||||||
|
BEGIN;
|
||||||
|
SELECT lock_relation_if_exists('test', 'EXCLUSIVE');
|
||||||
|
ERROR: permission denied for relation test
|
||||||
|
ABORT;
|
||||||
-- table owner should be the same on the shards, even when distributing the table as superuser
|
-- table owner should be the same on the shards, even when distributing the table as superuser
|
||||||
SET ROLE full_access;
|
SET ROLE full_access;
|
||||||
CREATE TABLE my_table (id integer, val integer);
|
CREATE TABLE my_table (id integer, val integer);
|
||||||
|
@ -427,10 +436,238 @@ SELECT create_distributed_table('full_access_user_schema.t2', 'id');
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
RESET ROLE;
|
RESET ROLE;
|
||||||
|
-- a user with all privileges on a schema should be able to upgrade a distributed table to
|
||||||
|
-- a reference table
|
||||||
|
SET ROLE full_access;
|
||||||
|
BEGIN;
|
||||||
|
CREATE TABLE full_access_user_schema.r1(id int);
|
||||||
|
SET LOCAL citus.shard_count TO 1;
|
||||||
|
SELECT create_distributed_table('full_access_user_schema.r1', 'id');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT upgrade_to_reference_table('full_access_user_schema.r1');
|
||||||
|
upgrade_to_reference_table
|
||||||
|
----------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
RESET ROLE;
|
||||||
|
-- the super user should be able to upgrade a distributed table to a reference table, even
|
||||||
|
-- if it is owned by another user
|
||||||
|
SET ROLE full_access;
|
||||||
|
BEGIN;
|
||||||
|
CREATE TABLE full_access_user_schema.r2(id int);
|
||||||
|
SET LOCAL citus.shard_count TO 1;
|
||||||
|
SELECT create_distributed_table('full_access_user_schema.r2', 'id');
|
||||||
|
create_distributed_table
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
RESET ROLE;
|
||||||
|
-- the usage_access should not be able to upgrade the table
|
||||||
|
SET ROLE usage_access;
|
||||||
|
SELECT upgrade_to_reference_table('full_access_user_schema.r2');
|
||||||
|
ERROR: must be owner of relation r2
|
||||||
|
RESET ROLE;
|
||||||
|
-- the super user should be able
|
||||||
|
SELECT upgrade_to_reference_table('full_access_user_schema.r2');
|
||||||
|
upgrade_to_reference_table
|
||||||
|
----------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- verify the owner of the shards for the reference table
|
||||||
|
SELECT result FROM run_command_on_workers($cmd$
|
||||||
|
SELECT tableowner FROM pg_tables WHERE
|
||||||
|
true
|
||||||
|
AND schemaname = 'full_access_user_schema'
|
||||||
|
AND tablename LIKE 'r2_%'
|
||||||
|
LIMIT 1;
|
||||||
|
$cmd$);
|
||||||
|
result
|
||||||
|
-------------
|
||||||
|
full_access
|
||||||
|
full_access
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
-- super user should be the only one being able to call worker_cleanup_job_schema_cache
|
||||||
|
SELECT worker_cleanup_job_schema_cache();
|
||||||
|
worker_cleanup_job_schema_cache
|
||||||
|
---------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SET ROLE full_access;
|
||||||
|
SELECT worker_cleanup_job_schema_cache();
|
||||||
|
ERROR: permission denied for function worker_cleanup_job_schema_cache
|
||||||
|
SET ROLE usage_access;
|
||||||
|
SELECT worker_cleanup_job_schema_cache();
|
||||||
|
ERROR: permission denied for function worker_cleanup_job_schema_cache
|
||||||
|
SET ROLE read_access;
|
||||||
|
SELECT worker_cleanup_job_schema_cache();
|
||||||
|
ERROR: permission denied for function worker_cleanup_job_schema_cache
|
||||||
|
SET ROLE no_access;
|
||||||
|
SELECT worker_cleanup_job_schema_cache();
|
||||||
|
ERROR: permission denied for function worker_cleanup_job_schema_cache
|
||||||
|
RESET ROLE;
|
||||||
|
-- to test access to files created during repartition we will create some on worker 1
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
SET ROLE full_access;
|
||||||
|
SELECT worker_hash_partition_table(42,1,'SELECT a FROM generate_series(1,100) AS a', 'a', 23, ARRAY[-2147483648, -1073741824, 0, 1073741824]::int4[]);
|
||||||
|
worker_hash_partition_table
|
||||||
|
-----------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
RESET ROLE;
|
||||||
|
-- all attempts for transfer are initiated from other workers
|
||||||
|
\c - - - :worker_2_port
|
||||||
|
-- super user should not be able to copy files created by a user
|
||||||
|
SELECT worker_fetch_partition_file(42, 1, 1, 1, 'localhost', :worker_1_port);
|
||||||
|
WARNING: could not open file "base/pgsql_job_cache/job_0042/task_000001/p_00001.10": No such file or directory
|
||||||
|
CONTEXT: while executing command on localhost:57637
|
||||||
|
ERROR: could not receive file "base/pgsql_job_cache/job_0042/task_000001/p_00001" from localhost:57637
|
||||||
|
-- different user should not be able to fetch partition file
|
||||||
|
SET ROLE usage_access;
|
||||||
|
SELECT worker_fetch_partition_file(42, 1, 1, 1, 'localhost', :worker_1_port);
|
||||||
|
WARNING: could not open file "base/pgsql_job_cache/job_0042/task_000001/p_00001.18058": No such file or directory
|
||||||
|
CONTEXT: while executing command on localhost:57637
|
||||||
|
ERROR: could not receive file "base/pgsql_job_cache/job_0042/task_000001/p_00001" from localhost:57637
|
||||||
|
-- only the user whom created the files should be able to fetch
|
||||||
|
SET ROLE full_access;
|
||||||
|
SELECT worker_fetch_partition_file(42, 1, 1, 1, 'localhost', :worker_1_port);
|
||||||
|
worker_fetch_partition_file
|
||||||
|
-----------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
RESET ROLE;
|
||||||
|
-- now we will test that only the user who owns the fetched file is able to merge it into
|
||||||
|
-- a table
|
||||||
|
-- test that no other user can merge the downloaded file before the task is being tracked
|
||||||
|
SET ROLE usage_access;
|
||||||
|
SELECT worker_merge_files_into_table(42, 1, ARRAY['a'], ARRAY['integer']);
|
||||||
|
ERROR: job schema does not exist
|
||||||
|
DETAIL: must be superuser to use public schema
|
||||||
|
RESET ROLE;
|
||||||
|
SET ROLE full_access;
|
||||||
|
-- use the side effect of this function to have a schema to use, otherwise only the super
|
||||||
|
-- user could call worker_merge_files_into_table and store the results in public, which is
|
||||||
|
-- not what we want
|
||||||
|
SELECT task_tracker_assign_task(42, 1, 'SELECT 1');
|
||||||
|
task_tracker_assign_task
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
RESET ROLE;
|
||||||
|
-- test that no other user can merge the downloaded file after the task is being tracked
|
||||||
|
SET ROLE usage_access;
|
||||||
|
SELECT worker_merge_files_into_table(42, 1, ARRAY['a'], ARRAY['integer']);
|
||||||
|
ERROR: must be owner of schema pg_merge_job_0042
|
||||||
|
RESET ROLE;
|
||||||
|
-- test that the super user is unable to read the contents of the intermediate file,
|
||||||
|
-- although it does create the table
|
||||||
|
SELECT worker_merge_files_into_table(42, 1, ARRAY['a'], ARRAY['integer']);
|
||||||
|
WARNING: Task file "task_000001.18054" does not have expected suffix ".10"
|
||||||
|
worker_merge_files_into_table
|
||||||
|
-------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM pg_merge_job_0042.task_000001;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE pg_merge_job_0042.task_000001; -- drop table so we can reuse the same files for more tests
|
||||||
|
SET ROLE full_access;
|
||||||
|
SELECT worker_merge_files_into_table(42, 1, ARRAY['a'], ARRAY['integer']);
|
||||||
|
worker_merge_files_into_table
|
||||||
|
-------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM pg_merge_job_0042.task_000001;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
25
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE pg_merge_job_0042.task_000001; -- drop table so we can reuse the same files for more tests
|
||||||
|
RESET ROLE;
|
||||||
|
-- test that no other user can merge files and run query on the already fetched files
|
||||||
|
SET ROLE usage_access;
|
||||||
|
SELECT worker_merge_files_and_run_query(42, 1,
|
||||||
|
'CREATE TABLE task_000001_merge(merge_column_0 int)',
|
||||||
|
'CREATE TABLE task_000001 (a) AS SELECT sum(merge_column_0) FROM task_000001_merge'
|
||||||
|
);
|
||||||
|
ERROR: must be owner of schema pg_merge_job_0042
|
||||||
|
RESET ROLE;
|
||||||
|
-- test that the super user is unable to read the contents of the partitioned files after
|
||||||
|
-- trying to merge with run query
|
||||||
|
SELECT worker_merge_files_and_run_query(42, 1,
|
||||||
|
'CREATE TABLE task_000001_merge(merge_column_0 int)',
|
||||||
|
'CREATE TABLE task_000001 (a) AS SELECT sum(merge_column_0) FROM task_000001_merge'
|
||||||
|
);
|
||||||
|
WARNING: Task file "task_000001.18054" does not have expected suffix ".10"
|
||||||
|
worker_merge_files_and_run_query
|
||||||
|
----------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM pg_merge_job_0042.task_000001_merge;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM pg_merge_job_0042.task_000001;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE pg_merge_job_0042.task_000001, pg_merge_job_0042.task_000001_merge; -- drop table so we can reuse the same files for more tests
|
||||||
|
-- test that the owner of the task can merge files and run query correctly
|
||||||
|
SET ROLE full_access;
|
||||||
|
SELECT worker_merge_files_and_run_query(42, 1,
|
||||||
|
'CREATE TABLE task_000001_merge(merge_column_0 int)',
|
||||||
|
'CREATE TABLE task_000001 (a) AS SELECT sum(merge_column_0) FROM task_000001_merge'
|
||||||
|
);
|
||||||
|
worker_merge_files_and_run_query
|
||||||
|
----------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM pg_merge_job_0042.task_000001_merge;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
25
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM pg_merge_job_0042.task_000001;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE pg_merge_job_0042.task_000001, pg_merge_job_0042.task_000001_merge; -- drop table so we can reuse the same files for more tests
|
||||||
|
RESET ROLE;
|
||||||
|
\c - - - :master_port
|
||||||
DROP SCHEMA full_access_user_schema CASCADE;
|
DROP SCHEMA full_access_user_schema CASCADE;
|
||||||
NOTICE: drop cascades to 2 other objects
|
NOTICE: drop cascades to 4 other objects
|
||||||
DETAIL: drop cascades to table full_access_user_schema.t1
|
DETAIL: drop cascades to table full_access_user_schema.t1
|
||||||
drop cascades to table full_access_user_schema.t2
|
drop cascades to table full_access_user_schema.t2
|
||||||
|
drop cascades to table full_access_user_schema.r1
|
||||||
|
drop cascades to table full_access_user_schema.r2
|
||||||
DROP TABLE
|
DROP TABLE
|
||||||
my_table,
|
my_table,
|
||||||
my_table_with_data,
|
my_table_with_data,
|
||||||
|
|
|
@ -197,6 +197,14 @@ SELECT upgrade_to_reference_table('singleshard');
|
||||||
-- should not be allowed to co-located tables
|
-- should not be allowed to co-located tables
|
||||||
SELECT mark_tables_colocated('test', ARRAY['test_coloc'::regclass]);
|
SELECT mark_tables_colocated('test', ARRAY['test_coloc'::regclass]);
|
||||||
|
|
||||||
|
-- should not be allowed to take any locks
|
||||||
|
BEGIN;
|
||||||
|
SELECT lock_relation_if_exists('test', 'ACCESS SHARE');
|
||||||
|
ABORT;
|
||||||
|
BEGIN;
|
||||||
|
SELECT lock_relation_if_exists('test', 'EXCLUSIVE');
|
||||||
|
ABORT;
|
||||||
|
|
||||||
-- table owner should be the same on the shards, even when distributing the table as superuser
|
-- table owner should be the same on the shards, even when distributing the table as superuser
|
||||||
SET ROLE full_access;
|
SET ROLE full_access;
|
||||||
CREATE TABLE my_table (id integer, val integer);
|
CREATE TABLE my_table (id integer, val integer);
|
||||||
|
@ -277,6 +285,140 @@ CREATE TABLE full_access_user_schema.t2(id int);
|
||||||
SELECT create_distributed_table('full_access_user_schema.t2', 'id');
|
SELECT create_distributed_table('full_access_user_schema.t2', 'id');
|
||||||
RESET ROLE;
|
RESET ROLE;
|
||||||
|
|
||||||
|
-- a user with all privileges on a schema should be able to upgrade a distributed table to
|
||||||
|
-- a reference table
|
||||||
|
SET ROLE full_access;
|
||||||
|
BEGIN;
|
||||||
|
CREATE TABLE full_access_user_schema.r1(id int);
|
||||||
|
SET LOCAL citus.shard_count TO 1;
|
||||||
|
SELECT create_distributed_table('full_access_user_schema.r1', 'id');
|
||||||
|
SELECT upgrade_to_reference_table('full_access_user_schema.r1');
|
||||||
|
COMMIT;
|
||||||
|
RESET ROLE;
|
||||||
|
|
||||||
|
-- the super user should be able to upgrade a distributed table to a reference table, even
|
||||||
|
-- if it is owned by another user
|
||||||
|
SET ROLE full_access;
|
||||||
|
BEGIN;
|
||||||
|
CREATE TABLE full_access_user_schema.r2(id int);
|
||||||
|
SET LOCAL citus.shard_count TO 1;
|
||||||
|
SELECT create_distributed_table('full_access_user_schema.r2', 'id');
|
||||||
|
COMMIT;
|
||||||
|
RESET ROLE;
|
||||||
|
|
||||||
|
-- the usage_access should not be able to upgrade the table
|
||||||
|
SET ROLE usage_access;
|
||||||
|
SELECT upgrade_to_reference_table('full_access_user_schema.r2');
|
||||||
|
RESET ROLE;
|
||||||
|
|
||||||
|
-- the super user should be able
|
||||||
|
SELECT upgrade_to_reference_table('full_access_user_schema.r2');
|
||||||
|
|
||||||
|
-- verify the owner of the shards for the reference table
|
||||||
|
SELECT result FROM run_command_on_workers($cmd$
|
||||||
|
SELECT tableowner FROM pg_tables WHERE
|
||||||
|
true
|
||||||
|
AND schemaname = 'full_access_user_schema'
|
||||||
|
AND tablename LIKE 'r2_%'
|
||||||
|
LIMIT 1;
|
||||||
|
$cmd$);
|
||||||
|
|
||||||
|
-- super user should be the only one being able to call worker_cleanup_job_schema_cache
|
||||||
|
SELECT worker_cleanup_job_schema_cache();
|
||||||
|
SET ROLE full_access;
|
||||||
|
SELECT worker_cleanup_job_schema_cache();
|
||||||
|
SET ROLE usage_access;
|
||||||
|
SELECT worker_cleanup_job_schema_cache();
|
||||||
|
SET ROLE read_access;
|
||||||
|
SELECT worker_cleanup_job_schema_cache();
|
||||||
|
SET ROLE no_access;
|
||||||
|
SELECT worker_cleanup_job_schema_cache();
|
||||||
|
RESET ROLE;
|
||||||
|
|
||||||
|
-- to test access to files created during repartition we will create some on worker 1
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
SET ROLE full_access;
|
||||||
|
SELECT worker_hash_partition_table(42,1,'SELECT a FROM generate_series(1,100) AS a', 'a', 23, ARRAY[-2147483648, -1073741824, 0, 1073741824]::int4[]);
|
||||||
|
RESET ROLE;
|
||||||
|
|
||||||
|
-- all attempts for transfer are initiated from other workers
|
||||||
|
|
||||||
|
\c - - - :worker_2_port
|
||||||
|
-- super user should not be able to copy files created by a user
|
||||||
|
SELECT worker_fetch_partition_file(42, 1, 1, 1, 'localhost', :worker_1_port);
|
||||||
|
|
||||||
|
-- different user should not be able to fetch partition file
|
||||||
|
SET ROLE usage_access;
|
||||||
|
SELECT worker_fetch_partition_file(42, 1, 1, 1, 'localhost', :worker_1_port);
|
||||||
|
|
||||||
|
-- only the user whom created the files should be able to fetch
|
||||||
|
SET ROLE full_access;
|
||||||
|
SELECT worker_fetch_partition_file(42, 1, 1, 1, 'localhost', :worker_1_port);
|
||||||
|
RESET ROLE;
|
||||||
|
|
||||||
|
-- now we will test that only the user who owns the fetched file is able to merge it into
|
||||||
|
-- a table
|
||||||
|
-- test that no other user can merge the downloaded file before the task is being tracked
|
||||||
|
SET ROLE usage_access;
|
||||||
|
SELECT worker_merge_files_into_table(42, 1, ARRAY['a'], ARRAY['integer']);
|
||||||
|
RESET ROLE;
|
||||||
|
|
||||||
|
SET ROLE full_access;
|
||||||
|
-- use the side effect of this function to have a schema to use, otherwise only the super
|
||||||
|
-- user could call worker_merge_files_into_table and store the results in public, which is
|
||||||
|
-- not what we want
|
||||||
|
SELECT task_tracker_assign_task(42, 1, 'SELECT 1');
|
||||||
|
RESET ROLE;
|
||||||
|
|
||||||
|
-- test that no other user can merge the downloaded file after the task is being tracked
|
||||||
|
SET ROLE usage_access;
|
||||||
|
SELECT worker_merge_files_into_table(42, 1, ARRAY['a'], ARRAY['integer']);
|
||||||
|
RESET ROLE;
|
||||||
|
|
||||||
|
-- test that the super user is unable to read the contents of the intermediate file,
|
||||||
|
-- although it does create the table
|
||||||
|
SELECT worker_merge_files_into_table(42, 1, ARRAY['a'], ARRAY['integer']);
|
||||||
|
SELECT count(*) FROM pg_merge_job_0042.task_000001;
|
||||||
|
DROP TABLE pg_merge_job_0042.task_000001; -- drop table so we can reuse the same files for more tests
|
||||||
|
|
||||||
|
SET ROLE full_access;
|
||||||
|
SELECT worker_merge_files_into_table(42, 1, ARRAY['a'], ARRAY['integer']);
|
||||||
|
SELECT count(*) FROM pg_merge_job_0042.task_000001;
|
||||||
|
DROP TABLE pg_merge_job_0042.task_000001; -- drop table so we can reuse the same files for more tests
|
||||||
|
RESET ROLE;
|
||||||
|
|
||||||
|
-- test that no other user can merge files and run query on the already fetched files
|
||||||
|
SET ROLE usage_access;
|
||||||
|
SELECT worker_merge_files_and_run_query(42, 1,
|
||||||
|
'CREATE TABLE task_000001_merge(merge_column_0 int)',
|
||||||
|
'CREATE TABLE task_000001 (a) AS SELECT sum(merge_column_0) FROM task_000001_merge'
|
||||||
|
);
|
||||||
|
RESET ROLE;
|
||||||
|
|
||||||
|
-- test that the super user is unable to read the contents of the partitioned files after
|
||||||
|
-- trying to merge with run query
|
||||||
|
SELECT worker_merge_files_and_run_query(42, 1,
|
||||||
|
'CREATE TABLE task_000001_merge(merge_column_0 int)',
|
||||||
|
'CREATE TABLE task_000001 (a) AS SELECT sum(merge_column_0) FROM task_000001_merge'
|
||||||
|
);
|
||||||
|
SELECT count(*) FROM pg_merge_job_0042.task_000001_merge;
|
||||||
|
SELECT count(*) FROM pg_merge_job_0042.task_000001;
|
||||||
|
DROP TABLE pg_merge_job_0042.task_000001, pg_merge_job_0042.task_000001_merge; -- drop table so we can reuse the same files for more tests
|
||||||
|
|
||||||
|
-- test that the owner of the task can merge files and run query correctly
|
||||||
|
SET ROLE full_access;
|
||||||
|
SELECT worker_merge_files_and_run_query(42, 1,
|
||||||
|
'CREATE TABLE task_000001_merge(merge_column_0 int)',
|
||||||
|
'CREATE TABLE task_000001 (a) AS SELECT sum(merge_column_0) FROM task_000001_merge'
|
||||||
|
);
|
||||||
|
SELECT count(*) FROM pg_merge_job_0042.task_000001_merge;
|
||||||
|
SELECT count(*) FROM pg_merge_job_0042.task_000001;
|
||||||
|
DROP TABLE pg_merge_job_0042.task_000001, pg_merge_job_0042.task_000001_merge; -- drop table so we can reuse the same files for more tests
|
||||||
|
RESET ROLE;
|
||||||
|
|
||||||
|
\c - - - :master_port
|
||||||
|
|
||||||
|
|
||||||
DROP SCHEMA full_access_user_schema CASCADE;
|
DROP SCHEMA full_access_user_schema CASCADE;
|
||||||
DROP TABLE
|
DROP TABLE
|
||||||
my_table,
|
my_table,
|
||||||
|
|
Loading…
Reference in New Issue