mirror of https://github.com/citusdata/citus.git
Perform copy command as regular user in worker_append_table_to_shard
parent
30b46975b8
commit
28a503fad9
|
@ -533,8 +533,6 @@ LockShardResource(uint64 shardId, LOCKMODE lockmode)
|
||||||
const bool sessionLock = false;
|
const bool sessionLock = false;
|
||||||
const bool dontWait = false;
|
const bool dontWait = false;
|
||||||
|
|
||||||
AssertArg(shardId != INVALID_SHARD_ID);
|
|
||||||
|
|
||||||
SET_LOCKTAG_SHARD_RESOURCE(tag, MyDatabaseId, shardId);
|
SET_LOCKTAG_SHARD_RESOURCE(tag, MyDatabaseId, shardId);
|
||||||
|
|
||||||
(void) LockAcquire(&tag, lockmode, sessionLock, dontWait);
|
(void) LockAcquire(&tag, lockmode, sessionLock, dontWait);
|
||||||
|
|
|
@ -28,13 +28,14 @@
|
||||||
#include "commands/extension.h"
|
#include "commands/extension.h"
|
||||||
#include "commands/sequence.h"
|
#include "commands/sequence.h"
|
||||||
#include "distributed/citus_ruleutils.h"
|
#include "distributed/citus_ruleutils.h"
|
||||||
|
#include "distributed/commands/multi_copy.h"
|
||||||
#include "distributed/commands/utility_hook.h"
|
#include "distributed/commands/utility_hook.h"
|
||||||
#include "distributed/connection_management.h"
|
#include "distributed/connection_management.h"
|
||||||
#include "distributed/listutils.h"
|
|
||||||
#include "distributed/coordinator_protocol.h"
|
#include "distributed/coordinator_protocol.h"
|
||||||
|
#include "distributed/intermediate_results.h"
|
||||||
|
#include "distributed/listutils.h"
|
||||||
#include "distributed/metadata_cache.h"
|
#include "distributed/metadata_cache.h"
|
||||||
#include "distributed/multi_client_executor.h"
|
#include "distributed/multi_client_executor.h"
|
||||||
#include "distributed/commands/multi_copy.h"
|
|
||||||
#include "distributed/multi_logical_optimizer.h"
|
#include "distributed/multi_logical_optimizer.h"
|
||||||
#include "distributed/multi_partitioning_utils.h"
|
#include "distributed/multi_partitioning_utils.h"
|
||||||
#include "distributed/multi_server_executor.h"
|
#include "distributed/multi_server_executor.h"
|
||||||
|
@ -45,6 +46,7 @@
|
||||||
#include "distributed/worker_protocol.h"
|
#include "distributed/worker_protocol.h"
|
||||||
#include "distributed/version_compat.h"
|
#include "distributed/version_compat.h"
|
||||||
#include "nodes/makefuncs.h"
|
#include "nodes/makefuncs.h"
|
||||||
|
#include "parser/parse_relation.h"
|
||||||
#include "storage/lmgr.h"
|
#include "storage/lmgr.h"
|
||||||
#include "tcop/tcopprot.h"
|
#include "tcop/tcopprot.h"
|
||||||
#include "tcop/utility.h"
|
#include "tcop/utility.h"
|
||||||
|
@ -594,9 +596,6 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS)
|
||||||
char *sourceSchemaName = NULL;
|
char *sourceSchemaName = NULL;
|
||||||
char *sourceTableName = NULL;
|
char *sourceTableName = NULL;
|
||||||
|
|
||||||
Oid savedUserId = InvalidOid;
|
|
||||||
int savedSecurityContext = 0;
|
|
||||||
|
|
||||||
CheckCitusVersion(ERROR);
|
CheckCitusVersion(ERROR);
|
||||||
|
|
||||||
/* We extract schema names and table names from qualified names */
|
/* We extract schema names and table names from qualified names */
|
||||||
|
@ -613,10 +612,13 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS)
|
||||||
uint64 shardId = ExtractShardIdFromTableName(shardTableName, false);
|
uint64 shardId = ExtractShardIdFromTableName(shardTableName, false);
|
||||||
LockShardResource(shardId, AccessExclusiveLock);
|
LockShardResource(shardId, AccessExclusiveLock);
|
||||||
|
|
||||||
/* copy remote table's data to this node */
|
/*
|
||||||
|
* Copy into intermediate results directory, which is automatically cleaned on
|
||||||
|
* error.
|
||||||
|
*/
|
||||||
StringInfo localFilePath = makeStringInfo();
|
StringInfo localFilePath = makeStringInfo();
|
||||||
appendStringInfo(localFilePath, "base/%s/%s" UINT64_FORMAT,
|
appendStringInfo(localFilePath, "%s/worker_append_table_to_shard_" UINT64_FORMAT,
|
||||||
PG_JOB_CACHE_DIR, TABLE_FILE_PREFIX, shardId);
|
CreateIntermediateResultsDirectory(), shardId);
|
||||||
|
|
||||||
char *sourceQualifiedName = quote_qualified_identifier(sourceSchemaName,
|
char *sourceQualifiedName = quote_qualified_identifier(sourceSchemaName,
|
||||||
sourceTableName);
|
sourceTableName);
|
||||||
|
@ -641,7 +643,8 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS)
|
||||||
appendStringInfo(sourceCopyCommand, COPY_OUT_COMMAND, sourceQualifiedName);
|
appendStringInfo(sourceCopyCommand, COPY_OUT_COMMAND, sourceQualifiedName);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool received = ReceiveRegularFile(sourceNodeName, sourceNodePort, NULL,
|
char *userName = CurrentUserName();
|
||||||
|
bool received = ReceiveRegularFile(sourceNodeName, sourceNodePort, userName,
|
||||||
sourceCopyCommand,
|
sourceCopyCommand,
|
||||||
localFilePath);
|
localFilePath);
|
||||||
if (!received)
|
if (!received)
|
||||||
|
@ -664,17 +667,36 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS)
|
||||||
/* make sure we are allowed to execute the COPY command */
|
/* make sure we are allowed to execute the COPY command */
|
||||||
CheckCopyPermissions(localCopyCommand);
|
CheckCopyPermissions(localCopyCommand);
|
||||||
|
|
||||||
/* need superuser to copy from files */
|
Relation shardRelation = table_openrv(localCopyCommand->relation, RowExclusiveLock);
|
||||||
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
|
|
||||||
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
|
|
||||||
|
|
||||||
ProcessUtilityParseTree((Node *) localCopyCommand, queryString->data,
|
/* mimic check from copy.c */
|
||||||
PROCESS_UTILITY_QUERY, NULL, None_Receiver, NULL);
|
if (XactReadOnly && !shardRelation->rd_islocaltemp)
|
||||||
|
{
|
||||||
|
PreventCommandIfReadOnly("COPY FROM");
|
||||||
|
}
|
||||||
|
|
||||||
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
|
ParseState *parseState = make_parsestate(NULL);
|
||||||
|
(void) addRangeTableEntryForRelation(parseState, shardRelation,
|
||||||
|
#if PG_VERSION_NUM >= PG_VERSION_12
|
||||||
|
RowExclusiveLock,
|
||||||
|
#endif
|
||||||
|
NULL, false, false);
|
||||||
|
|
||||||
|
CopyState copyState = BeginCopyFrom(parseState,
|
||||||
|
shardRelation,
|
||||||
|
localCopyCommand->filename,
|
||||||
|
localCopyCommand->is_program,
|
||||||
|
NULL,
|
||||||
|
localCopyCommand->attlist,
|
||||||
|
localCopyCommand->options);
|
||||||
|
CopyFrom(copyState);
|
||||||
|
EndCopyFrom(copyState);
|
||||||
|
|
||||||
|
free_parsestate(parseState);
|
||||||
|
|
||||||
/* finally delete the temporary file we created */
|
/* finally delete the temporary file we created */
|
||||||
CitusDeleteFile(localFilePath->data);
|
CitusDeleteFile(localFilePath->data);
|
||||||
|
table_close(shardRelation, NoLock);
|
||||||
|
|
||||||
PG_RETURN_VOID();
|
PG_RETURN_VOID();
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,7 +35,6 @@
|
||||||
#define JOB_SCHEMA_PREFIX "pg_merge_job_"
|
#define JOB_SCHEMA_PREFIX "pg_merge_job_"
|
||||||
#define TASK_FILE_PREFIX "task_"
|
#define TASK_FILE_PREFIX "task_"
|
||||||
#define TASK_TABLE_PREFIX "task_"
|
#define TASK_TABLE_PREFIX "task_"
|
||||||
#define TABLE_FILE_PREFIX "table_"
|
|
||||||
#define PARTITION_FILE_PREFIX "p_"
|
#define PARTITION_FILE_PREFIX "p_"
|
||||||
#define ATTEMPT_FILE_SUFFIX ".attempt"
|
#define ATTEMPT_FILE_SUFFIX ".attempt"
|
||||||
#define MERGE_TABLE_SUFFIX "_merge"
|
#define MERGE_TABLE_SUFFIX "_merge"
|
||||||
|
|
Loading…
Reference in New Issue