mirror of https://github.com/citusdata/citus.git
Merge pull request #5256 from citusdata/marcocitus/worker_append_table_to_shard
Perform copy command as regular user in worker_append_table_to_shardpull/5234/head
commit
b3f1a94688
|
@ -577,8 +577,6 @@ LockShardResource(uint64 shardId, LOCKMODE lockmode)
|
||||||
const bool sessionLock = false;
|
const bool sessionLock = false;
|
||||||
const bool dontWait = false;
|
const bool dontWait = false;
|
||||||
|
|
||||||
AssertArg(shardId != INVALID_SHARD_ID);
|
|
||||||
|
|
||||||
SET_LOCKTAG_SHARD_RESOURCE(tag, MyDatabaseId, shardId);
|
SET_LOCKTAG_SHARD_RESOURCE(tag, MyDatabaseId, shardId);
|
||||||
|
|
||||||
(void) LockAcquire(&tag, lockmode, sessionLock, dontWait);
|
(void) LockAcquire(&tag, lockmode, sessionLock, dontWait);
|
||||||
|
|
|
@ -28,13 +28,14 @@
|
||||||
#include "commands/extension.h"
|
#include "commands/extension.h"
|
||||||
#include "commands/sequence.h"
|
#include "commands/sequence.h"
|
||||||
#include "distributed/citus_ruleutils.h"
|
#include "distributed/citus_ruleutils.h"
|
||||||
|
#include "distributed/commands/multi_copy.h"
|
||||||
#include "distributed/commands/utility_hook.h"
|
#include "distributed/commands/utility_hook.h"
|
||||||
#include "distributed/connection_management.h"
|
#include "distributed/connection_management.h"
|
||||||
#include "distributed/listutils.h"
|
|
||||||
#include "distributed/coordinator_protocol.h"
|
#include "distributed/coordinator_protocol.h"
|
||||||
|
#include "distributed/intermediate_results.h"
|
||||||
|
#include "distributed/listutils.h"
|
||||||
#include "distributed/metadata_cache.h"
|
#include "distributed/metadata_cache.h"
|
||||||
#include "distributed/multi_client_executor.h"
|
#include "distributed/multi_client_executor.h"
|
||||||
#include "distributed/commands/multi_copy.h"
|
|
||||||
#include "distributed/multi_logical_optimizer.h"
|
#include "distributed/multi_logical_optimizer.h"
|
||||||
#include "distributed/multi_partitioning_utils.h"
|
#include "distributed/multi_partitioning_utils.h"
|
||||||
#include "distributed/multi_server_executor.h"
|
#include "distributed/multi_server_executor.h"
|
||||||
|
@ -45,6 +46,7 @@
|
||||||
#include "distributed/worker_protocol.h"
|
#include "distributed/worker_protocol.h"
|
||||||
#include "distributed/version_compat.h"
|
#include "distributed/version_compat.h"
|
||||||
#include "nodes/makefuncs.h"
|
#include "nodes/makefuncs.h"
|
||||||
|
#include "parser/parse_relation.h"
|
||||||
#include "storage/lmgr.h"
|
#include "storage/lmgr.h"
|
||||||
#include "tcop/tcopprot.h"
|
#include "tcop/tcopprot.h"
|
||||||
#include "tcop/utility.h"
|
#include "tcop/utility.h"
|
||||||
|
@ -594,9 +596,6 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS)
|
||||||
char *sourceSchemaName = NULL;
|
char *sourceSchemaName = NULL;
|
||||||
char *sourceTableName = NULL;
|
char *sourceTableName = NULL;
|
||||||
|
|
||||||
Oid savedUserId = InvalidOid;
|
|
||||||
int savedSecurityContext = 0;
|
|
||||||
|
|
||||||
/* We extract schema names and table names from qualified names */
|
/* We extract schema names and table names from qualified names */
|
||||||
DeconstructQualifiedName(shardQualifiedNameList, &shardSchemaName, &shardTableName);
|
DeconstructQualifiedName(shardQualifiedNameList, &shardSchemaName, &shardTableName);
|
||||||
|
|
||||||
|
@ -611,10 +610,13 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS)
|
||||||
uint64 shardId = ExtractShardIdFromTableName(shardTableName, false);
|
uint64 shardId = ExtractShardIdFromTableName(shardTableName, false);
|
||||||
LockShardResource(shardId, AccessExclusiveLock);
|
LockShardResource(shardId, AccessExclusiveLock);
|
||||||
|
|
||||||
/* copy remote table's data to this node */
|
/*
|
||||||
|
* Copy into intermediate results directory, which is automatically cleaned on
|
||||||
|
* error.
|
||||||
|
*/
|
||||||
StringInfo localFilePath = makeStringInfo();
|
StringInfo localFilePath = makeStringInfo();
|
||||||
appendStringInfo(localFilePath, "base/%s/%s" UINT64_FORMAT,
|
appendStringInfo(localFilePath, "%s/worker_append_table_to_shard_" UINT64_FORMAT,
|
||||||
PG_JOB_CACHE_DIR, TABLE_FILE_PREFIX, shardId);
|
CreateIntermediateResultsDirectory(), shardId);
|
||||||
|
|
||||||
char *sourceQualifiedName = quote_qualified_identifier(sourceSchemaName,
|
char *sourceQualifiedName = quote_qualified_identifier(sourceSchemaName,
|
||||||
sourceTableName);
|
sourceTableName);
|
||||||
|
@ -639,7 +641,8 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS)
|
||||||
appendStringInfo(sourceCopyCommand, COPY_OUT_COMMAND, sourceQualifiedName);
|
appendStringInfo(sourceCopyCommand, COPY_OUT_COMMAND, sourceQualifiedName);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool received = ReceiveRegularFile(sourceNodeName, sourceNodePort, NULL,
|
char *userName = CurrentUserName();
|
||||||
|
bool received = ReceiveRegularFile(sourceNodeName, sourceNodePort, userName,
|
||||||
sourceCopyCommand,
|
sourceCopyCommand,
|
||||||
localFilePath);
|
localFilePath);
|
||||||
if (!received)
|
if (!received)
|
||||||
|
@ -662,17 +665,36 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS)
|
||||||
/* make sure we are allowed to execute the COPY command */
|
/* make sure we are allowed to execute the COPY command */
|
||||||
CheckCopyPermissions(localCopyCommand);
|
CheckCopyPermissions(localCopyCommand);
|
||||||
|
|
||||||
/* need superuser to copy from files */
|
Relation shardRelation = table_openrv(localCopyCommand->relation, RowExclusiveLock);
|
||||||
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
|
|
||||||
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
|
|
||||||
|
|
||||||
ProcessUtilityParseTree((Node *) localCopyCommand, queryString->data,
|
/* mimic check from copy.c */
|
||||||
PROCESS_UTILITY_QUERY, NULL, None_Receiver, NULL);
|
if (XactReadOnly && !shardRelation->rd_islocaltemp)
|
||||||
|
{
|
||||||
|
PreventCommandIfReadOnly("COPY FROM");
|
||||||
|
}
|
||||||
|
|
||||||
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
|
ParseState *parseState = make_parsestate(NULL);
|
||||||
|
(void) addRangeTableEntryForRelation(parseState, shardRelation, RowExclusiveLock,
|
||||||
|
NULL, false, false);
|
||||||
|
|
||||||
|
CopyFromState copyState = BeginCopyFrom_compat(parseState,
|
||||||
|
shardRelation,
|
||||||
|
NULL,
|
||||||
|
localCopyCommand->filename,
|
||||||
|
localCopyCommand->is_program,
|
||||||
|
NULL,
|
||||||
|
localCopyCommand->attlist,
|
||||||
|
localCopyCommand->options);
|
||||||
|
|
||||||
|
|
||||||
|
CopyFrom(copyState);
|
||||||
|
EndCopyFrom(copyState);
|
||||||
|
|
||||||
|
free_parsestate(parseState);
|
||||||
|
|
||||||
/* finally delete the temporary file we created */
|
/* finally delete the temporary file we created */
|
||||||
CitusDeleteFile(localFilePath->data);
|
CitusDeleteFile(localFilePath->data);
|
||||||
|
table_close(shardRelation, NoLock);
|
||||||
|
|
||||||
PG_RETURN_VOID();
|
PG_RETURN_VOID();
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,6 +35,7 @@
|
||||||
|
|
||||||
#include "executor/spi.h"
|
#include "executor/spi.h"
|
||||||
#include "nodes/makefuncs.h"
|
#include "nodes/makefuncs.h"
|
||||||
|
#include "parser/parse_relation.h"
|
||||||
#include "parser/parse_type.h"
|
#include "parser/parse_type.h"
|
||||||
#include "storage/lmgr.h"
|
#include "storage/lmgr.h"
|
||||||
#include "utils/acl.h"
|
#include "utils/acl.h"
|
||||||
|
@ -183,8 +184,6 @@ worker_merge_files_into_table(PG_FUNCTION_ARGS)
|
||||||
StringInfo jobSchemaName = JobSchemaName(jobId);
|
StringInfo jobSchemaName = JobSchemaName(jobId);
|
||||||
StringInfo taskTableName = TaskTableName(taskId);
|
StringInfo taskTableName = TaskTableName(taskId);
|
||||||
StringInfo taskDirectoryName = TaskDirectoryName(jobId, taskId);
|
StringInfo taskDirectoryName = TaskDirectoryName(jobId, taskId);
|
||||||
Oid savedUserId = InvalidOid;
|
|
||||||
int savedSecurityContext = 0;
|
|
||||||
Oid userId = GetUserId();
|
Oid userId = GetUserId();
|
||||||
|
|
||||||
/* we should have the same number of column names and types */
|
/* we should have the same number of column names and types */
|
||||||
|
@ -231,14 +230,9 @@ worker_merge_files_into_table(PG_FUNCTION_ARGS)
|
||||||
|
|
||||||
CreateTaskTable(jobSchemaName, taskTableName, columnNameList, columnTypeList);
|
CreateTaskTable(jobSchemaName, taskTableName, columnNameList, columnTypeList);
|
||||||
|
|
||||||
/* need superuser to copy from files */
|
|
||||||
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
|
|
||||||
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
|
|
||||||
|
|
||||||
CopyTaskFilesFromDirectory(jobSchemaName, taskTableName, taskDirectoryName,
|
CopyTaskFilesFromDirectory(jobSchemaName, taskTableName, taskDirectoryName,
|
||||||
userId);
|
userId);
|
||||||
|
|
||||||
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
|
|
||||||
PG_RETURN_VOID();
|
PG_RETURN_VOID();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -557,8 +551,8 @@ CopyTaskFilesFromDirectory(StringInfo schemaName, StringInfo relationName,
|
||||||
appendStringInfo(fullFilename, "%s/%s", directoryName, baseFilename);
|
appendStringInfo(fullFilename, "%s/%s", directoryName, baseFilename);
|
||||||
|
|
||||||
/* build relation object and copy statement */
|
/* build relation object and copy statement */
|
||||||
RangeVar *relation = makeRangeVar(schemaName->data, relationName->data, -1);
|
RangeVar *rangeVar = makeRangeVar(schemaName->data, relationName->data, -1);
|
||||||
CopyStmt *copyStatement = CopyStatement(relation, fullFilename->data);
|
CopyStmt *copyStatement = CopyStatement(rangeVar, fullFilename->data);
|
||||||
if (BinaryWorkerCopyFormat)
|
if (BinaryWorkerCopyFormat)
|
||||||
{
|
{
|
||||||
DefElem *copyOption = makeDefElem("format", (Node *) makeString("binary"),
|
DefElem *copyOption = makeDefElem("format", (Node *) makeString("binary"),
|
||||||
|
@ -567,12 +561,26 @@ CopyTaskFilesFromDirectory(StringInfo schemaName, StringInfo relationName,
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
ParseState *pstate = make_parsestate(NULL);
|
ParseState *parseState = make_parsestate(NULL);
|
||||||
pstate->p_sourcetext = queryString;
|
parseState->p_sourcetext = queryString;
|
||||||
|
|
||||||
DoCopy(pstate, copyStatement, -1, -1, &copiedRowCount);
|
Relation relation = table_openrv(rangeVar, RowExclusiveLock);
|
||||||
|
(void) addRangeTableEntryForRelation(parseState, relation, RowExclusiveLock,
|
||||||
|
NULL, false, false);
|
||||||
|
|
||||||
free_parsestate(pstate);
|
CopyFromState copyState = BeginCopyFrom_compat(parseState,
|
||||||
|
relation,
|
||||||
|
NULL,
|
||||||
|
copyStatement->filename,
|
||||||
|
copyStatement->is_program,
|
||||||
|
NULL,
|
||||||
|
copyStatement->attlist,
|
||||||
|
copyStatement->options);
|
||||||
|
copiedRowCount = CopyFrom(copyState);
|
||||||
|
EndCopyFrom(copyState);
|
||||||
|
|
||||||
|
free_parsestate(parseState);
|
||||||
|
table_close(relation, NoLock);
|
||||||
}
|
}
|
||||||
|
|
||||||
copiedRowTotal += copiedRowCount;
|
copiedRowTotal += copiedRowCount;
|
||||||
|
|
|
@ -35,7 +35,6 @@
|
||||||
#define JOB_SCHEMA_PREFIX "pg_merge_job_"
|
#define JOB_SCHEMA_PREFIX "pg_merge_job_"
|
||||||
#define TASK_FILE_PREFIX "task_"
|
#define TASK_FILE_PREFIX "task_"
|
||||||
#define TASK_TABLE_PREFIX "task_"
|
#define TASK_TABLE_PREFIX "task_"
|
||||||
#define TABLE_FILE_PREFIX "table_"
|
|
||||||
#define PARTITION_FILE_PREFIX "p_"
|
#define PARTITION_FILE_PREFIX "p_"
|
||||||
#define ATTEMPT_FILE_SUFFIX ".attempt"
|
#define ATTEMPT_FILE_SUFFIX ".attempt"
|
||||||
#define MERGE_TABLE_SUFFIX "_merge"
|
#define MERGE_TABLE_SUFFIX "_merge"
|
||||||
|
|
|
@ -668,6 +668,39 @@ SELECT worker_fetch_partition_file(42, 1, 1, 1, 'localhost', :worker_1_port);
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
RESET ROLE;
|
RESET ROLE;
|
||||||
|
-- non-superuser should be able to use worker_append_table_to_shard on their own shard
|
||||||
|
SET ROLE full_access;
|
||||||
|
CREATE TABLE full_access_user_schema.source_table (id int);
|
||||||
|
INSERT INTO full_access_user_schema.source_table VALUES (1);
|
||||||
|
CREATE TABLE full_access_user_schema.shard_0 (id int);
|
||||||
|
SELECT worker_append_table_to_shard('full_access_user_schema.shard_0', 'full_access_user_schema.source_table', 'localhost', :worker_2_port);
|
||||||
|
worker_append_table_to_shard
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM full_access_user_schema.shard_0;
|
||||||
|
id
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
RESET ROLE;
|
||||||
|
-- other users should not be able to read from a table they have no access to via worker_append_table_to_shard
|
||||||
|
SET ROLE usage_access;
|
||||||
|
SELECT worker_append_table_to_shard('full_access_user_schema.shard_0', 'full_access_user_schema.source_table', 'localhost', :worker_2_port);
|
||||||
|
WARNING: permission denied for table source_table
|
||||||
|
CONTEXT: while executing command on localhost:xxxxx
|
||||||
|
ERROR: could not copy table "source_table" from "localhost:xxxxx"
|
||||||
|
RESET ROLE;
|
||||||
|
-- allow usage_access to read from table
|
||||||
|
GRANT SELECT ON full_access_user_schema.source_table TO usage_access;
|
||||||
|
-- other users should not be able to write to a table they do not have write access to
|
||||||
|
SET ROLE usage_access;
|
||||||
|
SELECT worker_append_table_to_shard('full_access_user_schema.shard_0', 'full_access_user_schema.source_table', 'localhost', :worker_2_port);
|
||||||
|
ERROR: permission denied for table shard_0
|
||||||
|
RESET ROLE;
|
||||||
|
DROP TABLE full_access_user_schema.source_table, full_access_user_schema.shard_0;
|
||||||
-- now we will test that only the user who owns the fetched file is able to merge it into
|
-- now we will test that only the user who owns the fetched file is able to merge it into
|
||||||
-- a table
|
-- a table
|
||||||
-- test that no other user can merge the downloaded file before the task is being tracked
|
-- test that no other user can merge the downloaded file before the task is being tracked
|
||||||
|
|
|
@ -409,6 +409,30 @@ SET ROLE full_access;
|
||||||
SELECT worker_fetch_partition_file(42, 1, 1, 1, 'localhost', :worker_1_port);
|
SELECT worker_fetch_partition_file(42, 1, 1, 1, 'localhost', :worker_1_port);
|
||||||
RESET ROLE;
|
RESET ROLE;
|
||||||
|
|
||||||
|
-- non-superuser should be able to use worker_append_table_to_shard on their own shard
|
||||||
|
SET ROLE full_access;
|
||||||
|
CREATE TABLE full_access_user_schema.source_table (id int);
|
||||||
|
INSERT INTO full_access_user_schema.source_table VALUES (1);
|
||||||
|
CREATE TABLE full_access_user_schema.shard_0 (id int);
|
||||||
|
SELECT worker_append_table_to_shard('full_access_user_schema.shard_0', 'full_access_user_schema.source_table', 'localhost', :worker_2_port);
|
||||||
|
SELECT * FROM full_access_user_schema.shard_0;
|
||||||
|
RESET ROLE;
|
||||||
|
|
||||||
|
-- other users should not be able to read from a table they have no access to via worker_append_table_to_shard
|
||||||
|
SET ROLE usage_access;
|
||||||
|
SELECT worker_append_table_to_shard('full_access_user_schema.shard_0', 'full_access_user_schema.source_table', 'localhost', :worker_2_port);
|
||||||
|
RESET ROLE;
|
||||||
|
|
||||||
|
-- allow usage_access to read from table
|
||||||
|
GRANT SELECT ON full_access_user_schema.source_table TO usage_access;
|
||||||
|
|
||||||
|
-- other users should not be able to write to a table they do not have write access to
|
||||||
|
SET ROLE usage_access;
|
||||||
|
SELECT worker_append_table_to_shard('full_access_user_schema.shard_0', 'full_access_user_schema.source_table', 'localhost', :worker_2_port);
|
||||||
|
RESET ROLE;
|
||||||
|
|
||||||
|
DROP TABLE full_access_user_schema.source_table, full_access_user_schema.shard_0;
|
||||||
|
|
||||||
-- now we will test that only the user who owns the fetched file is able to merge it into
|
-- now we will test that only the user who owns the fetched file is able to merge it into
|
||||||
-- a table
|
-- a table
|
||||||
-- test that no other user can merge the downloaded file before the task is being tracked
|
-- test that no other user can merge the downloaded file before the task is being tracked
|
||||||
|
|
Loading…
Reference in New Issue