diff --git a/src/backend/distributed/utils/resource_lock.c b/src/backend/distributed/utils/resource_lock.c index 3861c2ac1..fb33a128c 100644 --- a/src/backend/distributed/utils/resource_lock.c +++ b/src/backend/distributed/utils/resource_lock.c @@ -533,8 +533,6 @@ LockShardResource(uint64 shardId, LOCKMODE lockmode) const bool sessionLock = false; const bool dontWait = false; - AssertArg(shardId != INVALID_SHARD_ID); - SET_LOCKTAG_SHARD_RESOURCE(tag, MyDatabaseId, shardId); (void) LockAcquire(&tag, lockmode, sessionLock, dontWait); diff --git a/src/backend/distributed/worker/worker_data_fetch_protocol.c b/src/backend/distributed/worker/worker_data_fetch_protocol.c index 6a7581bef..3c219a085 100644 --- a/src/backend/distributed/worker/worker_data_fetch_protocol.c +++ b/src/backend/distributed/worker/worker_data_fetch_protocol.c @@ -28,13 +28,14 @@ #include "commands/extension.h" #include "commands/sequence.h" #include "distributed/citus_ruleutils.h" +#include "distributed/commands/multi_copy.h" #include "distributed/commands/utility_hook.h" #include "distributed/connection_management.h" -#include "distributed/listutils.h" #include "distributed/coordinator_protocol.h" +#include "distributed/intermediate_results.h" +#include "distributed/listutils.h" #include "distributed/metadata_cache.h" #include "distributed/multi_client_executor.h" -#include "distributed/commands/multi_copy.h" #include "distributed/multi_logical_optimizer.h" #include "distributed/multi_partitioning_utils.h" #include "distributed/multi_server_executor.h" @@ -45,6 +46,7 @@ #include "distributed/worker_protocol.h" #include "distributed/version_compat.h" #include "nodes/makefuncs.h" +#include "parser/parse_relation.h" #include "storage/lmgr.h" #include "tcop/tcopprot.h" #include "tcop/utility.h" @@ -594,9 +596,6 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS) char *sourceSchemaName = NULL; char *sourceTableName = NULL; - Oid savedUserId = InvalidOid; - int savedSecurityContext = 0; - /* We extract schema names and table names from qualified names */ DeconstructQualifiedName(shardQualifiedNameList, &shardSchemaName, &shardTableName); @@ -611,10 +610,13 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS) uint64 shardId = ExtractShardIdFromTableName(shardTableName, false); LockShardResource(shardId, AccessExclusiveLock); - /* copy remote table's data to this node */ + /* + * Copy into intermediate results directory, which is automatically cleaned on + * error. + */ StringInfo localFilePath = makeStringInfo(); - appendStringInfo(localFilePath, "base/%s/%s" UINT64_FORMAT, - PG_JOB_CACHE_DIR, TABLE_FILE_PREFIX, shardId); + appendStringInfo(localFilePath, "%s/worker_append_table_to_shard_" UINT64_FORMAT, + CreateIntermediateResultsDirectory(), shardId); char *sourceQualifiedName = quote_qualified_identifier(sourceSchemaName, sourceTableName); @@ -639,7 +641,8 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS) appendStringInfo(sourceCopyCommand, COPY_OUT_COMMAND, sourceQualifiedName); } - bool received = ReceiveRegularFile(sourceNodeName, sourceNodePort, NULL, + char *userName = CurrentUserName(); + bool received = ReceiveRegularFile(sourceNodeName, sourceNodePort, userName, sourceCopyCommand, localFilePath); if (!received) @@ -662,17 +665,33 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS) /* make sure we are allowed to execute the COPY command */ CheckCopyPermissions(localCopyCommand); - /* need superuser to copy from files */ - GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); - SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); + Relation shardRelation = table_openrv(localCopyCommand->relation, RowExclusiveLock); - ProcessUtilityParseTree((Node *) localCopyCommand, queryString->data, - PROCESS_UTILITY_QUERY, NULL, None_Receiver, NULL); + /* mimic check from copy.c */ + if (XactReadOnly && !shardRelation->rd_islocaltemp) + { + PreventCommandIfReadOnly("COPY FROM"); + } - SetUserIdAndSecContext(savedUserId, savedSecurityContext); + ParseState *parseState = make_parsestate(NULL); + (void) addRangeTableEntryForRelation(parseState, shardRelation, RowExclusiveLock, + NULL, false, false); + + CopyState copyState = BeginCopyFrom(parseState, + shardRelation, + localCopyCommand->filename, + localCopyCommand->is_program, + NULL, + localCopyCommand->attlist, + localCopyCommand->options); + CopyFrom(copyState); + EndCopyFrom(copyState); + + free_parsestate(parseState); /* finally delete the temporary file we created */ CitusDeleteFile(localFilePath->data); + table_close(shardRelation, NoLock); PG_RETURN_VOID(); } diff --git a/src/include/distributed/worker_protocol.h b/src/include/distributed/worker_protocol.h index e9d88f411..b91d64184 100644 --- a/src/include/distributed/worker_protocol.h +++ b/src/include/distributed/worker_protocol.h @@ -35,7 +35,6 @@ #define JOB_SCHEMA_PREFIX "pg_merge_job_" #define TASK_FILE_PREFIX "task_" #define TASK_TABLE_PREFIX "task_" -#define TABLE_FILE_PREFIX "table_" #define PARTITION_FILE_PREFIX "p_" #define ATTEMPT_FILE_SUFFIX ".attempt" #define MERGE_TABLE_SUFFIX "_merge"