From 791cc26a860640e4476ac906ec875baba3dd7359 Mon Sep 17 00:00:00 2001 From: Nils Dijk Date: Mon, 8 Jul 2019 14:47:27 +0200 Subject: [PATCH] Fix an issue with subquery map merge jobs as non-root Also automated all manual tests around multi user isolation for internal citus udf's automate upgrade_to_reference_table tests add negative tests for lock_relation_if_exists add tests for permissions on worker_cleanup_job_schema_cache add tests for worker_fetch_partition_file add tests for worker_merge_files_into_table fix problem with worker_merge_files_and_run_query when run as non-super user and add tests for behaviour --- src/backend/distributed/utils/resource_lock.c | 2 +- .../worker/worker_merge_protocol.c | 8 + src/test/regress/bin/normalize.sed | 4 + src/test/regress/expected/multi_multiuser.out | 239 +++++++++++++++++- .../regress/expected/multi_multiuser_0.out | 239 +++++++++++++++++- src/test/regress/sql/multi_multiuser.sql | 142 +++++++++++ 6 files changed, 631 insertions(+), 3 deletions(-) diff --git a/src/backend/distributed/utils/resource_lock.c b/src/backend/distributed/utils/resource_lock.c index 57a4d7ccc..b04ecc66b 100644 --- a/src/backend/distributed/utils/resource_lock.c +++ b/src/backend/distributed/utils/resource_lock.c @@ -732,7 +732,7 @@ LockModeToLockModeText(LOCKMODE lockMode) * * The relation name should be qualified with the schema name. * - * The function errors out of the lockmode isn't defined in the PostgreSQL's + * The function errors out if the lockmode isn't defined in the PostgreSQL's * explicit locking table. */ Datum diff --git a/src/backend/distributed/worker/worker_merge_protocol.c b/src/backend/distributed/worker/worker_merge_protocol.c index 3505fd246..07012516d 100644 --- a/src/backend/distributed/worker/worker_merge_protocol.c +++ b/src/backend/distributed/worker/worker_merge_protocol.c @@ -175,6 +175,8 @@ worker_merge_files_and_run_query(PG_FUNCTION_ARGS) int createMergeTableResult = 0; int createIntermediateTableResult = 0; int finished = 0; + Oid savedUserId = InvalidOid; + int savedSecurityContext = 0; Oid userId = GetUserId(); CheckCitusVersion(ERROR); @@ -221,11 +223,17 @@ worker_merge_files_and_run_query(PG_FUNCTION_ARGS) createMergeTableQuery))); } + /* need superuser to copy from files */ + GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); + SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); + appendStringInfo(mergeTableName, "%s%s", intermediateTableName->data, MERGE_TABLE_SUFFIX); CopyTaskFilesFromDirectory(jobSchemaName, mergeTableName, taskDirectoryName, userId); + SetUserIdAndSecContext(savedUserId, savedSecurityContext); + createIntermediateTableResult = SPI_exec(createIntermediateTableQuery, 0); if (createIntermediateTableResult < 0) { diff --git a/src/test/regress/bin/normalize.sed b/src/test/regress/bin/normalize.sed index 12f764caa..856c2146a 100644 --- a/src/test/regress/bin/normalize.sed +++ b/src/test/regress/bin/normalize.sed @@ -56,3 +56,7 @@ s/ERROR: failed to execute task [0-9]+/ERROR: failed to execute task X/g # ignore WAL warnings /DEBUG: .+creating and filling new WAL file/d + +# normalize file names for partitioned files +s/(task_[0-9]+\.)[0-9]+/\1xxxx/g +s/(job_[0-9]+\/task_[0-9]+\/p_[0-9]+\.)[0-9]+/\1xxxx/g diff --git a/src/test/regress/expected/multi_multiuser.out b/src/test/regress/expected/multi_multiuser.out index 966a0999f..bd4eb312f 100644 --- a/src/test/regress/expected/multi_multiuser.out +++ b/src/test/regress/expected/multi_multiuser.out @@ -300,6 +300,15 @@ ERROR: must be owner of table singleshard -- should not be allowed to co-located tables SELECT mark_tables_colocated('test', ARRAY['test_coloc'::regclass]); ERROR: must be owner of table test +-- should not be allowed to take any locks +BEGIN; +SELECT lock_relation_if_exists('test', 'ACCESS SHARE'); +ERROR: permission denied for table test +ABORT; +BEGIN; +SELECT lock_relation_if_exists('test', 'EXCLUSIVE'); +ERROR: permission denied for table test +ABORT; -- table owner should be the same on the shards, even when distributing the table as superuser SET ROLE full_access; CREATE TABLE my_table (id integer, val integer); @@ -427,10 +436,238 @@ SELECT create_distributed_table('full_access_user_schema.t2', 'id'); (1 row) RESET ROLE; +-- a user with all privileges on a schema should be able to upgrade a distributed table to +-- a reference table +SET ROLE full_access; +BEGIN; +CREATE TABLE full_access_user_schema.r1(id int); +SET LOCAL citus.shard_count TO 1; +SELECT create_distributed_table('full_access_user_schema.r1', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT upgrade_to_reference_table('full_access_user_schema.r1'); + upgrade_to_reference_table +---------------------------- + +(1 row) + +COMMIT; +RESET ROLE; +-- the super user should be able to upgrade a distributed table to a reference table, even +-- if it is owned by another user +SET ROLE full_access; +BEGIN; +CREATE TABLE full_access_user_schema.r2(id int); +SET LOCAL citus.shard_count TO 1; +SELECT create_distributed_table('full_access_user_schema.r2', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +COMMIT; +RESET ROLE; +-- the usage_access should not be able to upgrade the table +SET ROLE usage_access; +SELECT upgrade_to_reference_table('full_access_user_schema.r2'); +ERROR: must be owner of table r2 +RESET ROLE; +-- the super user should be able +SELECT upgrade_to_reference_table('full_access_user_schema.r2'); + upgrade_to_reference_table +---------------------------- + +(1 row) + +-- verify the owner of the shards for the reference table +SELECT result FROM run_command_on_workers($cmd$ + SELECT tableowner FROM pg_tables WHERE + true + AND schemaname = 'full_access_user_schema' + AND tablename LIKE 'r2_%' + LIMIT 1; +$cmd$); + result +------------- + full_access + full_access +(2 rows) + +-- super user should be the only one being able to call worker_cleanup_job_schema_cache +SELECT worker_cleanup_job_schema_cache(); + worker_cleanup_job_schema_cache +--------------------------------- + +(1 row) + +SET ROLE full_access; +SELECT worker_cleanup_job_schema_cache(); +ERROR: permission denied for function worker_cleanup_job_schema_cache +SET ROLE usage_access; +SELECT worker_cleanup_job_schema_cache(); +ERROR: permission denied for function worker_cleanup_job_schema_cache +SET ROLE read_access; +SELECT worker_cleanup_job_schema_cache(); +ERROR: permission denied for function worker_cleanup_job_schema_cache +SET ROLE no_access; +SELECT worker_cleanup_job_schema_cache(); +ERROR: permission denied for function worker_cleanup_job_schema_cache +RESET ROLE; +-- to test access to files created during repartition we will create some on worker 1 +\c - - - :worker_1_port +SET ROLE full_access; +SELECT worker_hash_partition_table(42,1,'SELECT a FROM generate_series(1,100) AS a', 'a', 23, ARRAY[-2147483648, -1073741824, 0, 1073741824]::int4[]); + worker_hash_partition_table +----------------------------- + +(1 row) + +RESET ROLE; +-- all attempts for transfer are initiated from other workers +\c - - - :worker_2_port +-- super user should not be able to copy files created by a user +SELECT worker_fetch_partition_file(42, 1, 1, 1, 'localhost', :worker_1_port); +WARNING: could not open file "base/pgsql_job_cache/job_0042/task_000001/p_00001.10": No such file or directory +CONTEXT: while executing command on localhost:57637 +ERROR: could not receive file "base/pgsql_job_cache/job_0042/task_000001/p_00001" from localhost:57637 +-- different user should not be able to fetch partition file +SET ROLE usage_access; +SELECT worker_fetch_partition_file(42, 1, 1, 1, 'localhost', :worker_1_port); +WARNING: could not open file "base/pgsql_job_cache/job_0042/task_000001/p_00001.18007": No such file or directory +CONTEXT: while executing command on localhost:57637 +ERROR: could not receive file "base/pgsql_job_cache/job_0042/task_000001/p_00001" from localhost:57637 +-- only the user whom created the files should be able to fetch +SET ROLE full_access; +SELECT worker_fetch_partition_file(42, 1, 1, 1, 'localhost', :worker_1_port); + worker_fetch_partition_file +----------------------------- + +(1 row) + +RESET ROLE; +-- now we will test that only the user who owns the fetched file is able to merge it into +-- a table +-- test that no other user can merge the downloaded file before the task is being tracked +SET ROLE usage_access; +SELECT worker_merge_files_into_table(42, 1, ARRAY['a'], ARRAY['integer']); +ERROR: job schema does not exist +DETAIL: must be superuser to use public schema +RESET ROLE; +SET ROLE full_access; +-- use the side effect of this function to have a schema to use, otherwise only the super +-- user could call worker_merge_files_into_table and store the results in public, which is +-- not what we want +SELECT task_tracker_assign_task(42, 1, 'SELECT 1'); + task_tracker_assign_task +-------------------------- + +(1 row) + +RESET ROLE; +-- test that no other user can merge the downloaded file after the task is being tracked +SET ROLE usage_access; +SELECT worker_merge_files_into_table(42, 1, ARRAY['a'], ARRAY['integer']); +ERROR: must be owner of schema pg_merge_job_0042 +RESET ROLE; +-- test that the super user is unable to read the contents of the intermediate file, +-- although it does create the table +SELECT worker_merge_files_into_table(42, 1, ARRAY['a'], ARRAY['integer']); +WARNING: Task file "task_000001.18003" does not have expected suffix ".10" + worker_merge_files_into_table +------------------------------- + +(1 row) + +SELECT count(*) FROM pg_merge_job_0042.task_000001; + count +------- + 0 +(1 row) + +DROP TABLE pg_merge_job_0042.task_000001; -- drop table so we can reuse the same files for more tests +SET ROLE full_access; +SELECT worker_merge_files_into_table(42, 1, ARRAY['a'], ARRAY['integer']); + worker_merge_files_into_table +------------------------------- + +(1 row) + +SELECT count(*) FROM pg_merge_job_0042.task_000001; + count +------- + 25 +(1 row) + +DROP TABLE pg_merge_job_0042.task_000001; -- drop table so we can reuse the same files for more tests +RESET ROLE; +-- test that no other user can merge files and run query on the already fetched files +SET ROLE usage_access; +SELECT worker_merge_files_and_run_query(42, 1, + 'CREATE TABLE task_000001_merge(merge_column_0 int)', + 'CREATE TABLE task_000001 (a) AS SELECT sum(merge_column_0) FROM task_000001_merge' +); +ERROR: must be owner of schema pg_merge_job_0042 +RESET ROLE; +-- test that the super user is unable to read the contents of the partitioned files after +-- trying to merge with run query +SELECT worker_merge_files_and_run_query(42, 1, + 'CREATE TABLE task_000001_merge(merge_column_0 int)', + 'CREATE TABLE task_000001 (a) AS SELECT sum(merge_column_0) FROM task_000001_merge' +); +WARNING: Task file "task_000001.18003" does not have expected suffix ".10" + worker_merge_files_and_run_query +---------------------------------- + +(1 row) + +SELECT count(*) FROM pg_merge_job_0042.task_000001_merge; + count +------- + 0 +(1 row) + +SELECT count(*) FROM pg_merge_job_0042.task_000001; + count +------- + 1 +(1 row) + +DROP TABLE pg_merge_job_0042.task_000001, pg_merge_job_0042.task_000001_merge; -- drop table so we can reuse the same files for more tests +-- test that the owner of the task can merge files and run query correctly +SET ROLE full_access; +SELECT worker_merge_files_and_run_query(42, 1, + 'CREATE TABLE task_000001_merge(merge_column_0 int)', + 'CREATE TABLE task_000001 (a) AS SELECT sum(merge_column_0) FROM task_000001_merge' +); + worker_merge_files_and_run_query +---------------------------------- + +(1 row) + +SELECT count(*) FROM pg_merge_job_0042.task_000001_merge; + count +------- + 25 +(1 row) + +SELECT count(*) FROM pg_merge_job_0042.task_000001; + count +------- + 1 +(1 row) + +DROP TABLE pg_merge_job_0042.task_000001, pg_merge_job_0042.task_000001_merge; -- drop table so we can reuse the same files for more tests +RESET ROLE; +\c - - - :master_port DROP SCHEMA full_access_user_schema CASCADE; -NOTICE: drop cascades to 2 other objects +NOTICE: drop cascades to 4 other objects DETAIL: drop cascades to table full_access_user_schema.t1 drop cascades to table full_access_user_schema.t2 +drop cascades to table full_access_user_schema.r1 +drop cascades to table full_access_user_schema.r2 DROP TABLE my_table, my_table_with_data, diff --git a/src/test/regress/expected/multi_multiuser_0.out b/src/test/regress/expected/multi_multiuser_0.out index 32723752c..9759d0fe6 100644 --- a/src/test/regress/expected/multi_multiuser_0.out +++ b/src/test/regress/expected/multi_multiuser_0.out @@ -300,6 +300,15 @@ ERROR: must be owner of relation singleshard -- should not be allowed to co-located tables SELECT mark_tables_colocated('test', ARRAY['test_coloc'::regclass]); ERROR: must be owner of relation test +-- should not be allowed to take any locks +BEGIN; +SELECT lock_relation_if_exists('test', 'ACCESS SHARE'); +ERROR: permission denied for relation test +ABORT; +BEGIN; +SELECT lock_relation_if_exists('test', 'EXCLUSIVE'); +ERROR: permission denied for relation test +ABORT; -- table owner should be the same on the shards, even when distributing the table as superuser SET ROLE full_access; CREATE TABLE my_table (id integer, val integer); @@ -427,10 +436,238 @@ SELECT create_distributed_table('full_access_user_schema.t2', 'id'); (1 row) RESET ROLE; +-- a user with all privileges on a schema should be able to upgrade a distributed table to +-- a reference table +SET ROLE full_access; +BEGIN; +CREATE TABLE full_access_user_schema.r1(id int); +SET LOCAL citus.shard_count TO 1; +SELECT create_distributed_table('full_access_user_schema.r1', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT upgrade_to_reference_table('full_access_user_schema.r1'); + upgrade_to_reference_table +---------------------------- + +(1 row) + +COMMIT; +RESET ROLE; +-- the super user should be able to upgrade a distributed table to a reference table, even +-- if it is owned by another user +SET ROLE full_access; +BEGIN; +CREATE TABLE full_access_user_schema.r2(id int); +SET LOCAL citus.shard_count TO 1; +SELECT create_distributed_table('full_access_user_schema.r2', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +COMMIT; +RESET ROLE; +-- the usage_access should not be able to upgrade the table +SET ROLE usage_access; +SELECT upgrade_to_reference_table('full_access_user_schema.r2'); +ERROR: must be owner of relation r2 +RESET ROLE; +-- the super user should be able +SELECT upgrade_to_reference_table('full_access_user_schema.r2'); + upgrade_to_reference_table +---------------------------- + +(1 row) + +-- verify the owner of the shards for the reference table +SELECT result FROM run_command_on_workers($cmd$ + SELECT tableowner FROM pg_tables WHERE + true + AND schemaname = 'full_access_user_schema' + AND tablename LIKE 'r2_%' + LIMIT 1; +$cmd$); + result +------------- + full_access + full_access +(2 rows) + +-- super user should be the only one being able to call worker_cleanup_job_schema_cache +SELECT worker_cleanup_job_schema_cache(); + worker_cleanup_job_schema_cache +--------------------------------- + +(1 row) + +SET ROLE full_access; +SELECT worker_cleanup_job_schema_cache(); +ERROR: permission denied for function worker_cleanup_job_schema_cache +SET ROLE usage_access; +SELECT worker_cleanup_job_schema_cache(); +ERROR: permission denied for function worker_cleanup_job_schema_cache +SET ROLE read_access; +SELECT worker_cleanup_job_schema_cache(); +ERROR: permission denied for function worker_cleanup_job_schema_cache +SET ROLE no_access; +SELECT worker_cleanup_job_schema_cache(); +ERROR: permission denied for function worker_cleanup_job_schema_cache +RESET ROLE; +-- to test access to files created during repartition we will create some on worker 1 +\c - - - :worker_1_port +SET ROLE full_access; +SELECT worker_hash_partition_table(42,1,'SELECT a FROM generate_series(1,100) AS a', 'a', 23, ARRAY[-2147483648, -1073741824, 0, 1073741824]::int4[]); + worker_hash_partition_table +----------------------------- + +(1 row) + +RESET ROLE; +-- all attempts for transfer are initiated from other workers +\c - - - :worker_2_port +-- super user should not be able to copy files created by a user +SELECT worker_fetch_partition_file(42, 1, 1, 1, 'localhost', :worker_1_port); +WARNING: could not open file "base/pgsql_job_cache/job_0042/task_000001/p_00001.10": No such file or directory +CONTEXT: while executing command on localhost:57637 +ERROR: could not receive file "base/pgsql_job_cache/job_0042/task_000001/p_00001" from localhost:57637 +-- different user should not be able to fetch partition file +SET ROLE usage_access; +SELECT worker_fetch_partition_file(42, 1, 1, 1, 'localhost', :worker_1_port); +WARNING: could not open file "base/pgsql_job_cache/job_0042/task_000001/p_00001.18058": No such file or directory +CONTEXT: while executing command on localhost:57637 +ERROR: could not receive file "base/pgsql_job_cache/job_0042/task_000001/p_00001" from localhost:57637 +-- only the user whom created the files should be able to fetch +SET ROLE full_access; +SELECT worker_fetch_partition_file(42, 1, 1, 1, 'localhost', :worker_1_port); + worker_fetch_partition_file +----------------------------- + +(1 row) + +RESET ROLE; +-- now we will test that only the user who owns the fetched file is able to merge it into +-- a table +-- test that no other user can merge the downloaded file before the task is being tracked +SET ROLE usage_access; +SELECT worker_merge_files_into_table(42, 1, ARRAY['a'], ARRAY['integer']); +ERROR: job schema does not exist +DETAIL: must be superuser to use public schema +RESET ROLE; +SET ROLE full_access; +-- use the side effect of this function to have a schema to use, otherwise only the super +-- user could call worker_merge_files_into_table and store the results in public, which is +-- not what we want +SELECT task_tracker_assign_task(42, 1, 'SELECT 1'); + task_tracker_assign_task +-------------------------- + +(1 row) + +RESET ROLE; +-- test that no other user can merge the downloaded file after the task is being tracked +SET ROLE usage_access; +SELECT worker_merge_files_into_table(42, 1, ARRAY['a'], ARRAY['integer']); +ERROR: must be owner of schema pg_merge_job_0042 +RESET ROLE; +-- test that the super user is unable to read the contents of the intermediate file, +-- although it does create the table +SELECT worker_merge_files_into_table(42, 1, ARRAY['a'], ARRAY['integer']); +WARNING: Task file "task_000001.18054" does not have expected suffix ".10" + worker_merge_files_into_table +------------------------------- + +(1 row) + +SELECT count(*) FROM pg_merge_job_0042.task_000001; + count +------- + 0 +(1 row) + +DROP TABLE pg_merge_job_0042.task_000001; -- drop table so we can reuse the same files for more tests +SET ROLE full_access; +SELECT worker_merge_files_into_table(42, 1, ARRAY['a'], ARRAY['integer']); + worker_merge_files_into_table +------------------------------- + +(1 row) + +SELECT count(*) FROM pg_merge_job_0042.task_000001; + count +------- + 25 +(1 row) + +DROP TABLE pg_merge_job_0042.task_000001; -- drop table so we can reuse the same files for more tests +RESET ROLE; +-- test that no other user can merge files and run query on the already fetched files +SET ROLE usage_access; +SELECT worker_merge_files_and_run_query(42, 1, + 'CREATE TABLE task_000001_merge(merge_column_0 int)', + 'CREATE TABLE task_000001 (a) AS SELECT sum(merge_column_0) FROM task_000001_merge' +); +ERROR: must be owner of schema pg_merge_job_0042 +RESET ROLE; +-- test that the super user is unable to read the contents of the partitioned files after +-- trying to merge with run query +SELECT worker_merge_files_and_run_query(42, 1, + 'CREATE TABLE task_000001_merge(merge_column_0 int)', + 'CREATE TABLE task_000001 (a) AS SELECT sum(merge_column_0) FROM task_000001_merge' +); +WARNING: Task file "task_000001.18054" does not have expected suffix ".10" + worker_merge_files_and_run_query +---------------------------------- + +(1 row) + +SELECT count(*) FROM pg_merge_job_0042.task_000001_merge; + count +------- + 0 +(1 row) + +SELECT count(*) FROM pg_merge_job_0042.task_000001; + count +------- + 1 +(1 row) + +DROP TABLE pg_merge_job_0042.task_000001, pg_merge_job_0042.task_000001_merge; -- drop table so we can reuse the same files for more tests +-- test that the owner of the task can merge files and run query correctly +SET ROLE full_access; +SELECT worker_merge_files_and_run_query(42, 1, + 'CREATE TABLE task_000001_merge(merge_column_0 int)', + 'CREATE TABLE task_000001 (a) AS SELECT sum(merge_column_0) FROM task_000001_merge' +); + worker_merge_files_and_run_query +---------------------------------- + +(1 row) + +SELECT count(*) FROM pg_merge_job_0042.task_000001_merge; + count +------- + 25 +(1 row) + +SELECT count(*) FROM pg_merge_job_0042.task_000001; + count +------- + 1 +(1 row) + +DROP TABLE pg_merge_job_0042.task_000001, pg_merge_job_0042.task_000001_merge; -- drop table so we can reuse the same files for more tests +RESET ROLE; +\c - - - :master_port DROP SCHEMA full_access_user_schema CASCADE; -NOTICE: drop cascades to 2 other objects +NOTICE: drop cascades to 4 other objects DETAIL: drop cascades to table full_access_user_schema.t1 drop cascades to table full_access_user_schema.t2 +drop cascades to table full_access_user_schema.r1 +drop cascades to table full_access_user_schema.r2 DROP TABLE my_table, my_table_with_data, diff --git a/src/test/regress/sql/multi_multiuser.sql b/src/test/regress/sql/multi_multiuser.sql index d709dc61d..6ed300ae2 100644 --- a/src/test/regress/sql/multi_multiuser.sql +++ b/src/test/regress/sql/multi_multiuser.sql @@ -197,6 +197,14 @@ SELECT upgrade_to_reference_table('singleshard'); -- should not be allowed to co-located tables SELECT mark_tables_colocated('test', ARRAY['test_coloc'::regclass]); +-- should not be allowed to take any locks +BEGIN; +SELECT lock_relation_if_exists('test', 'ACCESS SHARE'); +ABORT; +BEGIN; +SELECT lock_relation_if_exists('test', 'EXCLUSIVE'); +ABORT; + -- table owner should be the same on the shards, even when distributing the table as superuser SET ROLE full_access; CREATE TABLE my_table (id integer, val integer); @@ -277,6 +285,140 @@ CREATE TABLE full_access_user_schema.t2(id int); SELECT create_distributed_table('full_access_user_schema.t2', 'id'); RESET ROLE; +-- a user with all privileges on a schema should be able to upgrade a distributed table to +-- a reference table +SET ROLE full_access; +BEGIN; +CREATE TABLE full_access_user_schema.r1(id int); +SET LOCAL citus.shard_count TO 1; +SELECT create_distributed_table('full_access_user_schema.r1', 'id'); +SELECT upgrade_to_reference_table('full_access_user_schema.r1'); +COMMIT; +RESET ROLE; + +-- the super user should be able to upgrade a distributed table to a reference table, even +-- if it is owned by another user +SET ROLE full_access; +BEGIN; +CREATE TABLE full_access_user_schema.r2(id int); +SET LOCAL citus.shard_count TO 1; +SELECT create_distributed_table('full_access_user_schema.r2', 'id'); +COMMIT; +RESET ROLE; + +-- the usage_access should not be able to upgrade the table +SET ROLE usage_access; +SELECT upgrade_to_reference_table('full_access_user_schema.r2'); +RESET ROLE; + +-- the super user should be able +SELECT upgrade_to_reference_table('full_access_user_schema.r2'); + +-- verify the owner of the shards for the reference table +SELECT result FROM run_command_on_workers($cmd$ + SELECT tableowner FROM pg_tables WHERE + true + AND schemaname = 'full_access_user_schema' + AND tablename LIKE 'r2_%' + LIMIT 1; +$cmd$); + +-- super user should be the only one being able to call worker_cleanup_job_schema_cache +SELECT worker_cleanup_job_schema_cache(); +SET ROLE full_access; +SELECT worker_cleanup_job_schema_cache(); +SET ROLE usage_access; +SELECT worker_cleanup_job_schema_cache(); +SET ROLE read_access; +SELECT worker_cleanup_job_schema_cache(); +SET ROLE no_access; +SELECT worker_cleanup_job_schema_cache(); +RESET ROLE; + +-- to test access to files created during repartition we will create some on worker 1 +\c - - - :worker_1_port +SET ROLE full_access; +SELECT worker_hash_partition_table(42,1,'SELECT a FROM generate_series(1,100) AS a', 'a', 23, ARRAY[-2147483648, -1073741824, 0, 1073741824]::int4[]); +RESET ROLE; + +-- all attempts for transfer are initiated from other workers + +\c - - - :worker_2_port +-- super user should not be able to copy files created by a user +SELECT worker_fetch_partition_file(42, 1, 1, 1, 'localhost', :worker_1_port); + +-- different user should not be able to fetch partition file +SET ROLE usage_access; +SELECT worker_fetch_partition_file(42, 1, 1, 1, 'localhost', :worker_1_port); + +-- only the user whom created the files should be able to fetch +SET ROLE full_access; +SELECT worker_fetch_partition_file(42, 1, 1, 1, 'localhost', :worker_1_port); +RESET ROLE; + +-- now we will test that only the user who owns the fetched file is able to merge it into +-- a table +-- test that no other user can merge the downloaded file before the task is being tracked +SET ROLE usage_access; +SELECT worker_merge_files_into_table(42, 1, ARRAY['a'], ARRAY['integer']); +RESET ROLE; + +SET ROLE full_access; +-- use the side effect of this function to have a schema to use, otherwise only the super +-- user could call worker_merge_files_into_table and store the results in public, which is +-- not what we want +SELECT task_tracker_assign_task(42, 1, 'SELECT 1'); +RESET ROLE; + +-- test that no other user can merge the downloaded file after the task is being tracked +SET ROLE usage_access; +SELECT worker_merge_files_into_table(42, 1, ARRAY['a'], ARRAY['integer']); +RESET ROLE; + +-- test that the super user is unable to read the contents of the intermediate file, +-- although it does create the table +SELECT worker_merge_files_into_table(42, 1, ARRAY['a'], ARRAY['integer']); +SELECT count(*) FROM pg_merge_job_0042.task_000001; +DROP TABLE pg_merge_job_0042.task_000001; -- drop table so we can reuse the same files for more tests + +SET ROLE full_access; +SELECT worker_merge_files_into_table(42, 1, ARRAY['a'], ARRAY['integer']); +SELECT count(*) FROM pg_merge_job_0042.task_000001; +DROP TABLE pg_merge_job_0042.task_000001; -- drop table so we can reuse the same files for more tests +RESET ROLE; + +-- test that no other user can merge files and run query on the already fetched files +SET ROLE usage_access; +SELECT worker_merge_files_and_run_query(42, 1, + 'CREATE TABLE task_000001_merge(merge_column_0 int)', + 'CREATE TABLE task_000001 (a) AS SELECT sum(merge_column_0) FROM task_000001_merge' +); +RESET ROLE; + +-- test that the super user is unable to read the contents of the partitioned files after +-- trying to merge with run query +SELECT worker_merge_files_and_run_query(42, 1, + 'CREATE TABLE task_000001_merge(merge_column_0 int)', + 'CREATE TABLE task_000001 (a) AS SELECT sum(merge_column_0) FROM task_000001_merge' +); +SELECT count(*) FROM pg_merge_job_0042.task_000001_merge; +SELECT count(*) FROM pg_merge_job_0042.task_000001; +DROP TABLE pg_merge_job_0042.task_000001, pg_merge_job_0042.task_000001_merge; -- drop table so we can reuse the same files for more tests + +-- test that the owner of the task can merge files and run query correctly +SET ROLE full_access; +SELECT worker_merge_files_and_run_query(42, 1, + 'CREATE TABLE task_000001_merge(merge_column_0 int)', + 'CREATE TABLE task_000001 (a) AS SELECT sum(merge_column_0) FROM task_000001_merge' +); +SELECT count(*) FROM pg_merge_job_0042.task_000001_merge; +SELECT count(*) FROM pg_merge_job_0042.task_000001; +DROP TABLE pg_merge_job_0042.task_000001, pg_merge_job_0042.task_000001_merge; -- drop table so we can reuse the same files for more tests +RESET ROLE; + +\c - - - :master_port + + DROP SCHEMA full_access_user_schema CASCADE; DROP TABLE my_table,