mirror of https://github.com/citusdata/citus.git
Perform copy command as regular user in worker_append_table_to_shard
parent
9ae912a8c8
commit
4faa49775b
|
@ -577,8 +577,6 @@ LockShardResource(uint64 shardId, LOCKMODE lockmode)
|
|||
const bool sessionLock = false;
|
||||
const bool dontWait = false;
|
||||
|
||||
AssertArg(shardId != INVALID_SHARD_ID);
|
||||
|
||||
SET_LOCKTAG_SHARD_RESOURCE(tag, MyDatabaseId, shardId);
|
||||
|
||||
(void) LockAcquire(&tag, lockmode, sessionLock, dontWait);
|
||||
|
|
|
@ -28,13 +28,14 @@
|
|||
#include "commands/extension.h"
|
||||
#include "commands/sequence.h"
|
||||
#include "distributed/citus_ruleutils.h"
|
||||
#include "distributed/commands/multi_copy.h"
|
||||
#include "distributed/commands/utility_hook.h"
|
||||
#include "distributed/connection_management.h"
|
||||
#include "distributed/listutils.h"
|
||||
#include "distributed/coordinator_protocol.h"
|
||||
#include "distributed/intermediate_results.h"
|
||||
#include "distributed/listutils.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/multi_client_executor.h"
|
||||
#include "distributed/commands/multi_copy.h"
|
||||
#include "distributed/multi_logical_optimizer.h"
|
||||
#include "distributed/multi_partitioning_utils.h"
|
||||
#include "distributed/multi_server_executor.h"
|
||||
|
@ -45,6 +46,7 @@
|
|||
#include "distributed/worker_protocol.h"
|
||||
#include "distributed/version_compat.h"
|
||||
#include "nodes/makefuncs.h"
|
||||
#include "parser/parse_relation.h"
|
||||
#include "storage/lmgr.h"
|
||||
#include "tcop/tcopprot.h"
|
||||
#include "tcop/utility.h"
|
||||
|
@ -594,9 +596,6 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS)
|
|||
char *sourceSchemaName = NULL;
|
||||
char *sourceTableName = NULL;
|
||||
|
||||
Oid savedUserId = InvalidOid;
|
||||
int savedSecurityContext = 0;
|
||||
|
||||
/* We extract schema names and table names from qualified names */
|
||||
DeconstructQualifiedName(shardQualifiedNameList, &shardSchemaName, &shardTableName);
|
||||
|
||||
|
@ -611,10 +610,13 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS)
|
|||
uint64 shardId = ExtractShardIdFromTableName(shardTableName, false);
|
||||
LockShardResource(shardId, AccessExclusiveLock);
|
||||
|
||||
/* copy remote table's data to this node */
|
||||
/*
|
||||
* Copy into intermediate results directory, which is automatically cleaned on
|
||||
* error.
|
||||
*/
|
||||
StringInfo localFilePath = makeStringInfo();
|
||||
appendStringInfo(localFilePath, "base/%s/%s" UINT64_FORMAT,
|
||||
PG_JOB_CACHE_DIR, TABLE_FILE_PREFIX, shardId);
|
||||
appendStringInfo(localFilePath, "%s/worker_append_table_to_shard_" UINT64_FORMAT,
|
||||
CreateIntermediateResultsDirectory(), shardId);
|
||||
|
||||
char *sourceQualifiedName = quote_qualified_identifier(sourceSchemaName,
|
||||
sourceTableName);
|
||||
|
@ -639,7 +641,8 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS)
|
|||
appendStringInfo(sourceCopyCommand, COPY_OUT_COMMAND, sourceQualifiedName);
|
||||
}
|
||||
|
||||
bool received = ReceiveRegularFile(sourceNodeName, sourceNodePort, NULL,
|
||||
char *userName = CurrentUserName();
|
||||
bool received = ReceiveRegularFile(sourceNodeName, sourceNodePort, userName,
|
||||
sourceCopyCommand,
|
||||
localFilePath);
|
||||
if (!received)
|
||||
|
@ -662,17 +665,36 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS)
|
|||
/* make sure we are allowed to execute the COPY command */
|
||||
CheckCopyPermissions(localCopyCommand);
|
||||
|
||||
/* need superuser to copy from files */
|
||||
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
|
||||
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
|
||||
Relation shardRelation = table_openrv(localCopyCommand->relation, RowExclusiveLock);
|
||||
|
||||
ProcessUtilityParseTree((Node *) localCopyCommand, queryString->data,
|
||||
PROCESS_UTILITY_QUERY, NULL, None_Receiver, NULL);
|
||||
/* mimic check from copy.c */
|
||||
if (XactReadOnly && !shardRelation->rd_islocaltemp)
|
||||
{
|
||||
PreventCommandIfReadOnly("COPY FROM");
|
||||
}
|
||||
|
||||
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
|
||||
ParseState *parseState = make_parsestate(NULL);
|
||||
(void) addRangeTableEntryForRelation(parseState, shardRelation, RowExclusiveLock,
|
||||
NULL, false, false);
|
||||
|
||||
CopyFromState copyState = BeginCopyFrom_compat(parseState,
|
||||
shardRelation,
|
||||
NULL,
|
||||
localCopyCommand->filename,
|
||||
localCopyCommand->is_program,
|
||||
NULL,
|
||||
localCopyCommand->attlist,
|
||||
localCopyCommand->options);
|
||||
|
||||
|
||||
CopyFrom(copyState);
|
||||
EndCopyFrom(copyState);
|
||||
|
||||
free_parsestate(parseState);
|
||||
|
||||
/* finally delete the temporary file we created */
|
||||
CitusDeleteFile(localFilePath->data);
|
||||
table_close(shardRelation, NoLock);
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
|
|
@ -35,7 +35,6 @@
|
|||
#define JOB_SCHEMA_PREFIX "pg_merge_job_"
|
||||
#define TASK_FILE_PREFIX "task_"
|
||||
#define TASK_TABLE_PREFIX "task_"
|
||||
#define TABLE_FILE_PREFIX "table_"
|
||||
#define PARTITION_FILE_PREFIX "p_"
|
||||
#define ATTEMPT_FILE_SUFFIX ".attempt"
|
||||
#define MERGE_TABLE_SUFFIX "_merge"
|
||||
|
|
Loading…
Reference in New Issue