From 4faa49775bc05ce5ed58c1fc5998234e37bd4d5b Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Tue, 7 Sep 2021 17:53:15 +0200 Subject: [PATCH 1/3] Perform copy command as regular user in worker_append_table_to_shard --- src/backend/distributed/utils/resource_lock.c | 2 - .../worker/worker_data_fetch_protocol.c | 52 +++++++++++++------ src/include/distributed/worker_protocol.h | 1 - 3 files changed, 37 insertions(+), 18 deletions(-) diff --git a/src/backend/distributed/utils/resource_lock.c b/src/backend/distributed/utils/resource_lock.c index 668436092..f3c6b449f 100644 --- a/src/backend/distributed/utils/resource_lock.c +++ b/src/backend/distributed/utils/resource_lock.c @@ -577,8 +577,6 @@ LockShardResource(uint64 shardId, LOCKMODE lockmode) const bool sessionLock = false; const bool dontWait = false; - AssertArg(shardId != INVALID_SHARD_ID); - SET_LOCKTAG_SHARD_RESOURCE(tag, MyDatabaseId, shardId); (void) LockAcquire(&tag, lockmode, sessionLock, dontWait); diff --git a/src/backend/distributed/worker/worker_data_fetch_protocol.c b/src/backend/distributed/worker/worker_data_fetch_protocol.c index 6a7581bef..187eeee05 100644 --- a/src/backend/distributed/worker/worker_data_fetch_protocol.c +++ b/src/backend/distributed/worker/worker_data_fetch_protocol.c @@ -28,13 +28,14 @@ #include "commands/extension.h" #include "commands/sequence.h" #include "distributed/citus_ruleutils.h" +#include "distributed/commands/multi_copy.h" #include "distributed/commands/utility_hook.h" #include "distributed/connection_management.h" -#include "distributed/listutils.h" #include "distributed/coordinator_protocol.h" +#include "distributed/intermediate_results.h" +#include "distributed/listutils.h" #include "distributed/metadata_cache.h" #include "distributed/multi_client_executor.h" -#include "distributed/commands/multi_copy.h" #include "distributed/multi_logical_optimizer.h" #include "distributed/multi_partitioning_utils.h" #include "distributed/multi_server_executor.h" @@ -45,6 +46,7 @@ #include "distributed/worker_protocol.h" #include "distributed/version_compat.h" #include "nodes/makefuncs.h" +#include "parser/parse_relation.h" #include "storage/lmgr.h" #include "tcop/tcopprot.h" #include "tcop/utility.h" @@ -594,9 +596,6 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS) char *sourceSchemaName = NULL; char *sourceTableName = NULL; - Oid savedUserId = InvalidOid; - int savedSecurityContext = 0; - /* We extract schema names and table names from qualified names */ DeconstructQualifiedName(shardQualifiedNameList, &shardSchemaName, &shardTableName); @@ -611,10 +610,13 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS) uint64 shardId = ExtractShardIdFromTableName(shardTableName, false); LockShardResource(shardId, AccessExclusiveLock); - /* copy remote table's data to this node */ + /* + * Copy into intermediate results directory, which is automatically cleaned on + * error. + */ StringInfo localFilePath = makeStringInfo(); - appendStringInfo(localFilePath, "base/%s/%s" UINT64_FORMAT, - PG_JOB_CACHE_DIR, TABLE_FILE_PREFIX, shardId); + appendStringInfo(localFilePath, "%s/worker_append_table_to_shard_" UINT64_FORMAT, + CreateIntermediateResultsDirectory(), shardId); char *sourceQualifiedName = quote_qualified_identifier(sourceSchemaName, sourceTableName); @@ -639,7 +641,8 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS) appendStringInfo(sourceCopyCommand, COPY_OUT_COMMAND, sourceQualifiedName); } - bool received = ReceiveRegularFile(sourceNodeName, sourceNodePort, NULL, + char *userName = CurrentUserName(); + bool received = ReceiveRegularFile(sourceNodeName, sourceNodePort, userName, sourceCopyCommand, localFilePath); if (!received) @@ -662,17 +665,36 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS) /* make sure we are allowed to execute the COPY command */ CheckCopyPermissions(localCopyCommand); - /* need superuser to copy from files */ - GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); - SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); + Relation shardRelation = table_openrv(localCopyCommand->relation, RowExclusiveLock); - ProcessUtilityParseTree((Node *) localCopyCommand, queryString->data, - PROCESS_UTILITY_QUERY, NULL, None_Receiver, NULL); + /* mimic check from copy.c */ + if (XactReadOnly && !shardRelation->rd_islocaltemp) + { + PreventCommandIfReadOnly("COPY FROM"); + } - SetUserIdAndSecContext(savedUserId, savedSecurityContext); + ParseState *parseState = make_parsestate(NULL); + (void) addRangeTableEntryForRelation(parseState, shardRelation, RowExclusiveLock, + NULL, false, false); + + CopyFromState copyState = BeginCopyFrom_compat(parseState, + shardRelation, + NULL, + localCopyCommand->filename, + localCopyCommand->is_program, + NULL, + localCopyCommand->attlist, + localCopyCommand->options); + + + CopyFrom(copyState); + EndCopyFrom(copyState); + + free_parsestate(parseState); /* finally delete the temporary file we created */ CitusDeleteFile(localFilePath->data); + table_close(shardRelation, NoLock); PG_RETURN_VOID(); } diff --git a/src/include/distributed/worker_protocol.h b/src/include/distributed/worker_protocol.h index e9d88f411..b91d64184 100644 --- a/src/include/distributed/worker_protocol.h +++ b/src/include/distributed/worker_protocol.h @@ -35,7 +35,6 @@ #define JOB_SCHEMA_PREFIX "pg_merge_job_" #define TASK_FILE_PREFIX "task_" #define TASK_TABLE_PREFIX "task_" -#define TABLE_FILE_PREFIX "table_" #define PARTITION_FILE_PREFIX "p_" #define ATTEMPT_FILE_SUFFIX ".attempt" #define MERGE_TABLE_SUFFIX "_merge" From 04388e13b0b2154a4a625e67d9e0d2ab68d6bba4 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Wed, 8 Sep 2021 15:52:47 +0200 Subject: [PATCH 2/3] Add worker_append_table_to_shard permissions tests --- src/test/regress/expected/multi_multiuser.out | 33 +++++++++++++++++++ src/test/regress/sql/multi_multiuser.sql | 24 ++++++++++++++ 2 files changed, 57 insertions(+) diff --git a/src/test/regress/expected/multi_multiuser.out b/src/test/regress/expected/multi_multiuser.out index 5853670aa..b3b60e8dd 100644 --- a/src/test/regress/expected/multi_multiuser.out +++ b/src/test/regress/expected/multi_multiuser.out @@ -668,6 +668,39 @@ SELECT worker_fetch_partition_file(42, 1, 1, 1, 'localhost', :worker_1_port); (1 row) RESET ROLE; +-- non-superuser should be able to use worker_append_table_to_shard on their own shard +SET ROLE full_access; +CREATE TABLE full_access_user_schema.source_table (id int); +INSERT INTO full_access_user_schema.source_table VALUES (1); +CREATE TABLE full_access_user_schema.shard_0 (id int); +SELECT worker_append_table_to_shard('full_access_user_schema.shard_0', 'full_access_user_schema.source_table', 'localhost', :worker_2_port); + worker_append_table_to_shard +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM full_access_user_schema.shard_0; + id +--------------------------------------------------------------------- + 1 +(1 row) + +RESET ROLE; +-- other users should not be able to read from a table they have no access to via worker_append_table_to_shard +SET ROLE usage_access; +SELECT worker_append_table_to_shard('full_access_user_schema.shard_0', 'full_access_user_schema.source_table', 'localhost', :worker_2_port); +WARNING: permission denied for table source_table +CONTEXT: while executing command on localhost:xxxxx +ERROR: could not copy table "source_table" from "localhost:xxxxx" +RESET ROLE; +-- allow usage_access to read from table +GRANT SELECT ON full_access_user_schema.source_table TO usage_access; +-- other users should not be able to write to a table they do not have write access to +SET ROLE usage_access; +SELECT worker_append_table_to_shard('full_access_user_schema.shard_0', 'full_access_user_schema.source_table', 'localhost', :worker_2_port); +ERROR: permission denied for table shard_0 +RESET ROLE; +DROP TABLE full_access_user_schema.source_table, full_access_user_schema.shard_0; -- now we will test that only the user who owns the fetched file is able to merge it into -- a table -- test that no other user can merge the downloaded file before the task is being tracked diff --git a/src/test/regress/sql/multi_multiuser.sql b/src/test/regress/sql/multi_multiuser.sql index 4ea32e09c..a6cd9e94c 100644 --- a/src/test/regress/sql/multi_multiuser.sql +++ b/src/test/regress/sql/multi_multiuser.sql @@ -409,6 +409,30 @@ SET ROLE full_access; SELECT worker_fetch_partition_file(42, 1, 1, 1, 'localhost', :worker_1_port); RESET ROLE; +-- non-superuser should be able to use worker_append_table_to_shard on their own shard +SET ROLE full_access; +CREATE TABLE full_access_user_schema.source_table (id int); +INSERT INTO full_access_user_schema.source_table VALUES (1); +CREATE TABLE full_access_user_schema.shard_0 (id int); +SELECT worker_append_table_to_shard('full_access_user_schema.shard_0', 'full_access_user_schema.source_table', 'localhost', :worker_2_port); +SELECT * FROM full_access_user_schema.shard_0; +RESET ROLE; + +-- other users should not be able to read from a table they have no access to via worker_append_table_to_shard +SET ROLE usage_access; +SELECT worker_append_table_to_shard('full_access_user_schema.shard_0', 'full_access_user_schema.source_table', 'localhost', :worker_2_port); +RESET ROLE; + +-- allow usage_access to read from table +GRANT SELECT ON full_access_user_schema.source_table TO usage_access; + +-- other users should not be able to write to a table they do not have write access to +SET ROLE usage_access; +SELECT worker_append_table_to_shard('full_access_user_schema.shard_0', 'full_access_user_schema.source_table', 'localhost', :worker_2_port); +RESET ROLE; + +DROP TABLE full_access_user_schema.source_table, full_access_user_schema.shard_0; + -- now we will test that only the user who owns the fetched file is able to merge it into -- a table -- test that no other user can merge the downloaded file before the task is being tracked From f84164a000565a437467b10fb6d3a3235a53cfa9 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Thu, 9 Sep 2021 10:59:05 +0200 Subject: [PATCH 3/3] Avoid switch to superuser in worker_merge_files_into_table --- .../worker/worker_merge_protocol.c | 34 ++++++++++++------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/src/backend/distributed/worker/worker_merge_protocol.c b/src/backend/distributed/worker/worker_merge_protocol.c index a539a9c90..577a42a9b 100644 --- a/src/backend/distributed/worker/worker_merge_protocol.c +++ b/src/backend/distributed/worker/worker_merge_protocol.c @@ -35,6 +35,7 @@ #include "executor/spi.h" #include "nodes/makefuncs.h" +#include "parser/parse_relation.h" #include "parser/parse_type.h" #include "storage/lmgr.h" #include "utils/acl.h" @@ -183,8 +184,6 @@ worker_merge_files_into_table(PG_FUNCTION_ARGS) StringInfo jobSchemaName = JobSchemaName(jobId); StringInfo taskTableName = TaskTableName(taskId); StringInfo taskDirectoryName = TaskDirectoryName(jobId, taskId); - Oid savedUserId = InvalidOid; - int savedSecurityContext = 0; Oid userId = GetUserId(); /* we should have the same number of column names and types */ @@ -231,14 +230,9 @@ worker_merge_files_into_table(PG_FUNCTION_ARGS) CreateTaskTable(jobSchemaName, taskTableName, columnNameList, columnTypeList); - /* need superuser to copy from files */ - GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); - SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); - CopyTaskFilesFromDirectory(jobSchemaName, taskTableName, taskDirectoryName, userId); - SetUserIdAndSecContext(savedUserId, savedSecurityContext); PG_RETURN_VOID(); } @@ -557,8 +551,8 @@ CopyTaskFilesFromDirectory(StringInfo schemaName, StringInfo relationName, appendStringInfo(fullFilename, "%s/%s", directoryName, baseFilename); /* build relation object and copy statement */ - RangeVar *relation = makeRangeVar(schemaName->data, relationName->data, -1); - CopyStmt *copyStatement = CopyStatement(relation, fullFilename->data); + RangeVar *rangeVar = makeRangeVar(schemaName->data, relationName->data, -1); + CopyStmt *copyStatement = CopyStatement(rangeVar, fullFilename->data); if (BinaryWorkerCopyFormat) { DefElem *copyOption = makeDefElem("format", (Node *) makeString("binary"), @@ -567,12 +561,26 @@ CopyTaskFilesFromDirectory(StringInfo schemaName, StringInfo relationName, } { - ParseState *pstate = make_parsestate(NULL); - pstate->p_sourcetext = queryString; + ParseState *parseState = make_parsestate(NULL); + parseState->p_sourcetext = queryString; - DoCopy(pstate, copyStatement, -1, -1, &copiedRowCount); + Relation relation = table_openrv(rangeVar, RowExclusiveLock); + (void) addRangeTableEntryForRelation(parseState, relation, RowExclusiveLock, + NULL, false, false); - free_parsestate(pstate); + CopyFromState copyState = BeginCopyFrom_compat(parseState, + relation, + NULL, + copyStatement->filename, + copyStatement->is_program, + NULL, + copyStatement->attlist, + copyStatement->options); + copiedRowCount = CopyFrom(copyState); + EndCopyFrom(copyState); + + free_parsestate(parseState); + table_close(relation, NoLock); } copiedRowTotal += copiedRowCount;