mirror of https://github.com/citusdata/citus.git
Merge pull request #2491 from citusdata/backport_copytt_80
Backport #2487 to release-8.0release-8.0
commit
606e2b18d7
1
Makefile
1
Makefile
|
@ -116,6 +116,7 @@ OBJS = src/backend/distributed/shared_library_init.o \
|
|||
src/backend/distributed/worker/worker_file_access_protocol.o \
|
||||
src/backend/distributed/worker/worker_merge_protocol.o \
|
||||
src/backend/distributed/worker/worker_partition_protocol.o \
|
||||
src/backend/distributed/worker/worker_sql_task_protocol.o \
|
||||
src/backend/distributed/worker/worker_truncate_trigger_protocol.o \
|
||||
$(WIN32RES)
|
||||
|
||||
|
|
|
@ -338,6 +338,18 @@ StubRelation(TupleDesc tupleDescriptor)
|
|||
void
|
||||
ExecuteQueryStringIntoDestReceiver(const char *queryString, ParamListInfo params,
|
||||
DestReceiver *dest)
|
||||
{
|
||||
Query *query = ParseQueryString(queryString);
|
||||
|
||||
ExecuteQueryIntoDestReceiver(query, params, dest);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ParseQuery parses query string and returns a Query struct.
|
||||
*/
|
||||
Query *
|
||||
ParseQueryString(const char *queryString)
|
||||
{
|
||||
Query *query = NULL;
|
||||
|
||||
|
@ -356,7 +368,7 @@ ExecuteQueryStringIntoDestReceiver(const char *queryString, ParamListInfo params
|
|||
|
||||
query = (Query *) linitial(queryTreeList);
|
||||
|
||||
ExecuteQueryIntoDestReceiver(query, params, dest);
|
||||
return query;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -117,10 +117,11 @@ static bool IsCitusExtensionStmt(Node *parsetree);
|
|||
static bool IsTransmitStmt(Node *parsetree);
|
||||
static void VerifyTransmitStmt(CopyStmt *copyStatement);
|
||||
static bool IsCopyResultStmt(CopyStmt *copyStatement);
|
||||
static bool CopyStatementHasFormat(CopyStmt *copyStatement, char *formatName);
|
||||
|
||||
/* Local functions forward declarations for processing distributed table commands */
|
||||
static Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag,
|
||||
bool *commandMustRunAsOwner);
|
||||
const char *queryString);
|
||||
static void ProcessCreateTableStmtPartitionOf(CreateStmt *createStatement);
|
||||
static void ProcessAlterTableStmtAttachPartition(AlterTableStmt *alterTableStatement);
|
||||
static List * PlanIndexStmt(IndexStmt *createIndexStatement,
|
||||
|
@ -176,7 +177,6 @@ static List * InterShardDDLTaskList(Oid leftRelationId, Oid rightRelationId,
|
|||
const char *commandString);
|
||||
static void RangeVarCallbackForDropIndex(const RangeVar *rel, Oid relOid, Oid oldRelOid,
|
||||
void *arg);
|
||||
static void CheckCopyPermissions(CopyStmt *copyStatement);
|
||||
static List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist);
|
||||
static void PostProcessUtility(Node *parsetree);
|
||||
static List * CollectGrantTableIdList(GrantStmt *grantStmt);
|
||||
|
@ -257,9 +257,6 @@ multi_ProcessUtility(PlannedStmt *pstmt,
|
|||
char *completionTag)
|
||||
{
|
||||
Node *parsetree = pstmt->utilityStmt;
|
||||
bool commandMustRunAsOwner = false;
|
||||
Oid savedUserId = InvalidOid;
|
||||
int savedSecurityContext = 0;
|
||||
List *ddlJobs = NIL;
|
||||
bool checkExtensionVersion = false;
|
||||
|
||||
|
@ -369,7 +366,7 @@ multi_ProcessUtility(PlannedStmt *pstmt,
|
|||
|
||||
parsetree = copyObject(parsetree);
|
||||
parsetree = ProcessCopyStmt((CopyStmt *) parsetree, completionTag,
|
||||
&commandMustRunAsOwner);
|
||||
queryString);
|
||||
|
||||
previousContext = MemoryContextSwitchTo(planContext);
|
||||
parsetree = copyObject(parsetree);
|
||||
|
@ -560,13 +557,6 @@ multi_ProcessUtility(PlannedStmt *pstmt,
|
|||
}
|
||||
}
|
||||
|
||||
/* set user if needed and go ahead and run local utility using standard hook */
|
||||
if (commandMustRunAsOwner)
|
||||
{
|
||||
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
|
||||
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
|
||||
}
|
||||
|
||||
#if (PG_VERSION_NUM >= 100000)
|
||||
pstmt->utilityStmt = parsetree;
|
||||
standard_ProcessUtility(pstmt, queryString, context,
|
||||
|
@ -605,11 +595,6 @@ multi_ProcessUtility(PlannedStmt *pstmt,
|
|||
PostProcessUtility(parsetree);
|
||||
}
|
||||
|
||||
if (commandMustRunAsOwner)
|
||||
{
|
||||
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
|
||||
}
|
||||
|
||||
/*
|
||||
* Re-forming the foreign key graph relies on the command being executed
|
||||
* on the local table first. However, in order to decide whether the
|
||||
|
@ -971,9 +956,20 @@ VerifyTransmitStmt(CopyStmt *copyStatement)
|
|||
*/
|
||||
static bool
|
||||
IsCopyResultStmt(CopyStmt *copyStatement)
|
||||
{
|
||||
return CopyStatementHasFormat(copyStatement, "result");
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CopyStatementHasFormat checks whether the COPY statement has the given
|
||||
* format.
|
||||
*/
|
||||
static bool
|
||||
CopyStatementHasFormat(CopyStmt *copyStatement, char *formatName)
|
||||
{
|
||||
ListCell *optionCell = NULL;
|
||||
bool hasFormatReceive = false;
|
||||
bool hasFormat = false;
|
||||
|
||||
/* extract WITH (...) options from the COPY statement */
|
||||
foreach(optionCell, copyStatement->options)
|
||||
|
@ -981,14 +977,14 @@ IsCopyResultStmt(CopyStmt *copyStatement)
|
|||
DefElem *defel = (DefElem *) lfirst(optionCell);
|
||||
|
||||
if (strncmp(defel->defname, "format", NAMEDATALEN) == 0 &&
|
||||
strncmp(defGetString(defel), "result", NAMEDATALEN) == 0)
|
||||
strncmp(defGetString(defel), formatName, NAMEDATALEN) == 0)
|
||||
{
|
||||
hasFormatReceive = true;
|
||||
hasFormat = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return hasFormatReceive;
|
||||
return hasFormat;
|
||||
}
|
||||
|
||||
|
||||
|
@ -997,18 +993,10 @@ IsCopyResultStmt(CopyStmt *copyStatement)
|
|||
* COPYing from distributed tables and preventing unsupported actions. The
|
||||
* function returns a modified COPY statement to be executed, or NULL if no
|
||||
* further processing is needed.
|
||||
*
|
||||
* commandMustRunAsOwner is an output parameter used to communicate to the caller whether
|
||||
* the copy statement should be executed using elevated privileges. If
|
||||
* ProcessCopyStmt that is required, a call to CheckCopyPermissions will take
|
||||
* care of verifying the current user's permissions before ProcessCopyStmt
|
||||
* returns.
|
||||
*/
|
||||
static Node *
|
||||
ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustRunAsOwner)
|
||||
ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, const char *queryString)
|
||||
{
|
||||
*commandMustRunAsOwner = false; /* make sure variable is initialized */
|
||||
|
||||
/*
|
||||
* Handle special COPY "resultid" FROM STDIN WITH (format result) commands
|
||||
* for sending intermediate results to workers.
|
||||
|
@ -1114,49 +1102,47 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustR
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
if (copyStatement->filename != NULL && !copyStatement->is_program)
|
||||
{
|
||||
const char *filename = copyStatement->filename;
|
||||
|
||||
if (CacheDirectoryElement(filename))
|
||||
{
|
||||
/*
|
||||
* Only superusers are allowed to copy from a file, so we have to
|
||||
* become superuser to execute copies to/from files used by citus'
|
||||
* query execution.
|
||||
*
|
||||
* XXX: This is a decidedly suboptimal solution, as that means
|
||||
* that triggers, input functions, etc. run with elevated
|
||||
* privileges. But this is better than not being able to run
|
||||
* queries as normal user.
|
||||
*/
|
||||
*commandMustRunAsOwner = true;
|
||||
char *filename = copyStatement->filename;
|
||||
|
||||
/*
|
||||
* Have to manually check permissions here as the COPY is will be
|
||||
* run as a superuser.
|
||||
* We execute COPY commands issued by the task-tracker executor here
|
||||
* because we're not normally allowed to write to a file as a regular
|
||||
* user and we don't want to execute the query as superuser.
|
||||
*/
|
||||
if (copyStatement->relation != NULL)
|
||||
if (CacheDirectoryElement(filename) && copyStatement->query != NULL &&
|
||||
!copyStatement->is_from && !is_absolute_path(filename))
|
||||
{
|
||||
CheckCopyPermissions(copyStatement);
|
||||
bool binaryCopyFormat = CopyStatementHasFormat(copyStatement, "binary");
|
||||
int64 tuplesSent = 0;
|
||||
Query *query = NULL;
|
||||
Node *queryNode = copyStatement->query;
|
||||
List *queryTreeList = NIL;
|
||||
|
||||
#if (PG_VERSION_NUM >= 100000)
|
||||
RawStmt *rawStmt = makeNode(RawStmt);
|
||||
rawStmt->stmt = queryNode;
|
||||
|
||||
queryTreeList = pg_analyze_and_rewrite(rawStmt, queryString, NULL, 0, NULL);
|
||||
#else
|
||||
queryTreeList = pg_analyze_and_rewrite(queryNode, queryString, NULL, 0);
|
||||
#endif
|
||||
|
||||
if (list_length(queryTreeList) != 1)
|
||||
{
|
||||
ereport(ERROR, (errmsg("can only execute a single query")));
|
||||
}
|
||||
|
||||
/*
|
||||
* Check if we have a "COPY (query) TO filename". If we do, copy
|
||||
* doesn't accept relative file paths. However, SQL tasks that get
|
||||
* assigned to worker nodes have relative paths. We therefore
|
||||
* convert relative paths to absolute ones here.
|
||||
*/
|
||||
if (copyStatement->relation == NULL &&
|
||||
!copyStatement->is_from &&
|
||||
!is_absolute_path(filename))
|
||||
{
|
||||
copyStatement->filename = make_absolute_path(filename);
|
||||
}
|
||||
}
|
||||
}
|
||||
query = (Query *) linitial(queryTreeList);
|
||||
tuplesSent = WorkerExecuteSqlTask(query, filename, binaryCopyFormat);
|
||||
|
||||
snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
|
||||
"COPY " UINT64_FORMAT, tuplesSent);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
return (Node *) copyStatement;
|
||||
}
|
||||
|
@ -3917,7 +3903,7 @@ RangeVarCallbackForDropIndex(const RangeVar *rel, Oid relOid, Oid oldRelOid, voi
|
|||
*
|
||||
* Copied from postgres, where it's part of DoCopy().
|
||||
*/
|
||||
static void
|
||||
void
|
||||
CheckCopyPermissions(CopyStmt *copyStatement)
|
||||
{
|
||||
/* *INDENT-OFF* */
|
||||
|
|
|
@ -31,6 +31,7 @@
|
|||
#include "distributed/master_protocol.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/multi_client_executor.h"
|
||||
#include "distributed/multi_copy.h"
|
||||
#include "distributed/multi_logical_optimizer.h"
|
||||
#include "distributed/multi_partitioning_utils.h"
|
||||
#include "distributed/multi_server_executor.h"
|
||||
|
@ -766,6 +767,8 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS)
|
|||
StringInfo queryString = NULL;
|
||||
Oid sourceShardRelationId = InvalidOid;
|
||||
Oid sourceSchemaId = InvalidOid;
|
||||
Oid savedUserId = InvalidOid;
|
||||
int savedSecurityContext = 0;
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
|
@ -829,9 +832,18 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS)
|
|||
appendStringInfo(queryString, COPY_IN_COMMAND, shardQualifiedName,
|
||||
localFilePath->data);
|
||||
|
||||
/* make sure we are allowed to execute the COPY command */
|
||||
CheckCopyPermissions(localCopyCommand);
|
||||
|
||||
/* need superuser to copy from files */
|
||||
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
|
||||
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
|
||||
|
||||
CitusProcessUtility((Node *) localCopyCommand, queryString->data,
|
||||
PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL);
|
||||
|
||||
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
|
||||
|
||||
/* finally delete the temporary file we created */
|
||||
CitusDeleteFile(localFilePath->data);
|
||||
|
||||
|
|
|
@ -0,0 +1,280 @@
|
|||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* worker_sql_task_protocol.c
|
||||
*
|
||||
* Routines for executing SQL tasks during task-tracker execution.
|
||||
*
|
||||
* Copyright (c) 2012-2018, Citus Data, Inc.
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#include "postgres.h"
|
||||
#include "funcapi.h"
|
||||
#include "pgstat.h"
|
||||
|
||||
#include "distributed/multi_copy.h"
|
||||
#include "distributed/multi_executor.h"
|
||||
#include "distributed/transmit.h"
|
||||
#include "distributed/worker_protocol.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/memutils.h"
|
||||
|
||||
|
||||
/* TaskFileDestReceiver can be used to stream results into a file */
|
||||
typedef struct TaskFileDestReceiver
|
||||
{
|
||||
/* public DestReceiver interface */
|
||||
DestReceiver pub;
|
||||
|
||||
/* descriptor of the tuples that are sent to the worker */
|
||||
TupleDesc tupleDescriptor;
|
||||
|
||||
/* EState for per-tuple memory allocation */
|
||||
EState *executorState;
|
||||
|
||||
/* MemoryContext for DestReceiver session */
|
||||
MemoryContext memoryContext;
|
||||
|
||||
/* output file */
|
||||
char *filePath;
|
||||
File fileDesc;
|
||||
bool binaryCopyFormat;
|
||||
|
||||
/* state on how to copy out data types */
|
||||
CopyOutState copyOutState;
|
||||
FmgrInfo *columnOutputFunctions;
|
||||
|
||||
/* number of tuples sent */
|
||||
uint64 tuplesSent;
|
||||
} TaskFileDestReceiver;
|
||||
|
||||
|
||||
static DestReceiver * CreateTaskFileDestReceiver(char *filePath, EState *executorState,
|
||||
bool binaryCopyFormat);
|
||||
static void TaskFileDestReceiverStartup(DestReceiver *dest, int operation,
|
||||
TupleDesc inputTupleDescriptor);
|
||||
static bool TaskFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest);
|
||||
static void WriteToLocalFile(StringInfo copyData, File fileDesc);
|
||||
static void TaskFileDestReceiverShutdown(DestReceiver *destReceiver);
|
||||
static void TaskFileDestReceiverDestroy(DestReceiver *destReceiver);
|
||||
|
||||
|
||||
/*
|
||||
* WorkerExecuteSqlTask executes an already-parsed query and writes the result
|
||||
* to the given task file.
|
||||
*/
|
||||
int64
|
||||
WorkerExecuteSqlTask(Query *query, char *taskFilename, bool binaryCopyFormat)
|
||||
{
|
||||
EState *estate = NULL;
|
||||
TaskFileDestReceiver *taskFileDest = NULL;
|
||||
ParamListInfo paramListInfo = NULL;
|
||||
int64 tuplesSent = 0L;
|
||||
|
||||
estate = CreateExecutorState();
|
||||
taskFileDest =
|
||||
(TaskFileDestReceiver *) CreateTaskFileDestReceiver(taskFilename, estate,
|
||||
binaryCopyFormat);
|
||||
|
||||
ExecuteQueryIntoDestReceiver(query, paramListInfo, (DestReceiver *) taskFileDest);
|
||||
|
||||
tuplesSent = taskFileDest->tuplesSent;
|
||||
|
||||
taskFileDest->pub.rDestroy((DestReceiver *) taskFileDest);
|
||||
FreeExecutorState(estate);
|
||||
|
||||
return tuplesSent;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CreateTaskFileDestReceiver creates a DestReceiver for writing query results
|
||||
* to a task file.
|
||||
*/
|
||||
static DestReceiver *
|
||||
CreateTaskFileDestReceiver(char *filePath, EState *executorState, bool binaryCopyFormat)
|
||||
{
|
||||
TaskFileDestReceiver *taskFileDest = NULL;
|
||||
|
||||
taskFileDest = (TaskFileDestReceiver *) palloc0(sizeof(TaskFileDestReceiver));
|
||||
|
||||
/* set up the DestReceiver function pointers */
|
||||
taskFileDest->pub.receiveSlot = TaskFileDestReceiverReceive;
|
||||
taskFileDest->pub.rStartup = TaskFileDestReceiverStartup;
|
||||
taskFileDest->pub.rShutdown = TaskFileDestReceiverShutdown;
|
||||
taskFileDest->pub.rDestroy = TaskFileDestReceiverDestroy;
|
||||
taskFileDest->pub.mydest = DestCopyOut;
|
||||
|
||||
/* set up output parameters */
|
||||
taskFileDest->executorState = executorState;
|
||||
taskFileDest->memoryContext = CurrentMemoryContext;
|
||||
taskFileDest->filePath = pstrdup(filePath);
|
||||
taskFileDest->binaryCopyFormat = binaryCopyFormat;
|
||||
|
||||
return (DestReceiver *) taskFileDest;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* TaskFileDestReceiverStartup implements the rStartup interface of
|
||||
* TaskFileDestReceiver. It opens the destination file and sets up
|
||||
* the CopyOutState.
|
||||
*/
|
||||
static void
|
||||
TaskFileDestReceiverStartup(DestReceiver *dest, int operation,
|
||||
TupleDesc inputTupleDescriptor)
|
||||
{
|
||||
TaskFileDestReceiver *taskFileDest = (TaskFileDestReceiver *) dest;
|
||||
|
||||
CopyOutState copyOutState = NULL;
|
||||
const char *delimiterCharacter = "\t";
|
||||
const char *nullPrintCharacter = "\\N";
|
||||
|
||||
const int fileFlags = (O_APPEND | O_CREAT | O_RDWR | O_TRUNC | PG_BINARY);
|
||||
const int fileMode = (S_IRUSR | S_IWUSR);
|
||||
|
||||
/* use the memory context that was in place when the DestReceiver was created */
|
||||
MemoryContext oldContext = MemoryContextSwitchTo(taskFileDest->memoryContext);
|
||||
|
||||
taskFileDest->tupleDescriptor = inputTupleDescriptor;
|
||||
|
||||
/* define how tuples will be serialised */
|
||||
copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData));
|
||||
copyOutState->delim = (char *) delimiterCharacter;
|
||||
copyOutState->null_print = (char *) nullPrintCharacter;
|
||||
copyOutState->null_print_client = (char *) nullPrintCharacter;
|
||||
copyOutState->binary = taskFileDest->binaryCopyFormat;
|
||||
copyOutState->fe_msgbuf = makeStringInfo();
|
||||
copyOutState->rowcontext = GetPerTupleMemoryContext(taskFileDest->executorState);
|
||||
taskFileDest->copyOutState = copyOutState;
|
||||
|
||||
taskFileDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor,
|
||||
copyOutState->binary);
|
||||
|
||||
taskFileDest->fileDesc = FileOpenForTransmit(taskFileDest->filePath, fileFlags,
|
||||
fileMode);
|
||||
|
||||
if (copyOutState->binary)
|
||||
{
|
||||
/* write headers when using binary encoding */
|
||||
resetStringInfo(copyOutState->fe_msgbuf);
|
||||
AppendCopyBinaryHeaders(copyOutState);
|
||||
|
||||
WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest->fileDesc);
|
||||
}
|
||||
|
||||
MemoryContextSwitchTo(oldContext);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* TaskFileDestReceiverReceive implements the receiveSlot function of
|
||||
* TaskFileDestReceiver. It takes a TupleTableSlot and writes the contents
|
||||
* to a local file.
|
||||
*/
|
||||
static bool
|
||||
TaskFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
|
||||
{
|
||||
TaskFileDestReceiver *taskFileDest = (TaskFileDestReceiver *) dest;
|
||||
|
||||
TupleDesc tupleDescriptor = taskFileDest->tupleDescriptor;
|
||||
|
||||
CopyOutState copyOutState = taskFileDest->copyOutState;
|
||||
FmgrInfo *columnOutputFunctions = taskFileDest->columnOutputFunctions;
|
||||
|
||||
Datum *columnValues = NULL;
|
||||
bool *columnNulls = NULL;
|
||||
StringInfo copyData = copyOutState->fe_msgbuf;
|
||||
|
||||
EState *executorState = taskFileDest->executorState;
|
||||
MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState);
|
||||
MemoryContext oldContext = MemoryContextSwitchTo(executorTupleContext);
|
||||
|
||||
slot_getallattrs(slot);
|
||||
|
||||
columnValues = slot->tts_values;
|
||||
columnNulls = slot->tts_isnull;
|
||||
|
||||
resetStringInfo(copyData);
|
||||
|
||||
/* construct row in COPY format */
|
||||
AppendCopyRowData(columnValues, columnNulls, tupleDescriptor,
|
||||
copyOutState, columnOutputFunctions, NULL);
|
||||
|
||||
WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest->fileDesc);
|
||||
|
||||
MemoryContextSwitchTo(oldContext);
|
||||
|
||||
taskFileDest->tuplesSent++;
|
||||
|
||||
ResetPerTupleExprContext(executorState);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* WriteToLocalResultsFile writes the bytes in a StringInfo to a local file.
|
||||
*/
|
||||
static void
|
||||
WriteToLocalFile(StringInfo copyData, File fileDesc)
|
||||
{
|
||||
#if (PG_VERSION_NUM >= 100000)
|
||||
int bytesWritten = FileWrite(fileDesc, copyData->data, copyData->len, PG_WAIT_IO);
|
||||
#else
|
||||
int bytesWritten = FileWrite(fileDesc, copyData->data, copyData->len);
|
||||
#endif
|
||||
if (bytesWritten < 0)
|
||||
{
|
||||
ereport(ERROR, (errcode_for_file_access(),
|
||||
errmsg("could not append to file: %m")));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* TaskFileDestReceiverShutdown implements the rShutdown interface of
|
||||
* TaskFileDestReceiver. It writes the footer and closes the file.
|
||||
* the relation.
|
||||
*/
|
||||
static void
|
||||
TaskFileDestReceiverShutdown(DestReceiver *destReceiver)
|
||||
{
|
||||
TaskFileDestReceiver *taskFileDest = (TaskFileDestReceiver *) destReceiver;
|
||||
CopyOutState copyOutState = taskFileDest->copyOutState;
|
||||
|
||||
if (copyOutState->binary)
|
||||
{
|
||||
/* write footers when using binary encoding */
|
||||
resetStringInfo(copyOutState->fe_msgbuf);
|
||||
AppendCopyBinaryFooters(copyOutState);
|
||||
WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest->fileDesc);
|
||||
}
|
||||
|
||||
FileClose(taskFileDest->fileDesc);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* TaskFileDestReceiverDestroy frees memory allocated as part of the
|
||||
* TaskFileDestReceiver and closes file descriptors.
|
||||
*/
|
||||
static void
|
||||
TaskFileDestReceiverDestroy(DestReceiver *destReceiver)
|
||||
{
|
||||
TaskFileDestReceiver *taskFileDest = (TaskFileDestReceiver *) destReceiver;
|
||||
|
||||
if (taskFileDest->copyOutState)
|
||||
{
|
||||
pfree(taskFileDest->copyOutState);
|
||||
}
|
||||
|
||||
if (taskFileDest->columnOutputFunctions)
|
||||
{
|
||||
pfree(taskFileDest->columnOutputFunctions);
|
||||
}
|
||||
|
||||
pfree(taskFileDest->filePath);
|
||||
pfree(taskFileDest);
|
||||
}
|
|
@ -131,6 +131,7 @@ extern void EndRemoteCopy(int64 shardId, List *connectionList, bool stopOnFailur
|
|||
extern void CitusCopyFrom(CopyStmt *copyStatement, char *completionTag);
|
||||
extern bool IsCopyFromWorker(CopyStmt *copyStatement);
|
||||
extern NodeAddress * MasterNodeAddress(CopyStmt *copyStatement);
|
||||
extern void CheckCopyPermissions(CopyStmt *copyStatement);
|
||||
|
||||
|
||||
#endif /* MULTI_COPY_H */
|
||||
|
|
|
@ -36,6 +36,7 @@ extern TupleTableSlot * ReturnTupleFromTuplestore(CitusScanState *scanState);
|
|||
extern void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob);
|
||||
extern void ReadFileIntoTupleStore(char *fileName, char *copyFormat, TupleDesc
|
||||
tupleDescriptor, Tuplestorestate *tupstore);
|
||||
extern Query * ParseQueryString(const char *queryString);
|
||||
extern void ExecuteQueryStringIntoDestReceiver(const char *queryString, ParamListInfo
|
||||
params,
|
||||
DestReceiver *dest);
|
||||
|
|
|
@ -122,6 +122,8 @@ extern FmgrInfo * GetFunctionInfo(Oid typeId, Oid accessMethodId, int16 procedur
|
|||
extern uint64 ExtractShardIdFromTableName(const char *tableName, bool missingOk);
|
||||
extern List * TableDDLCommandList(const char *nodeName, uint32 nodePort,
|
||||
const char *tableName);
|
||||
extern int64 WorkerExecuteSqlTask(Query *query, char *taskFilename,
|
||||
bool binaryCopyFormat);
|
||||
|
||||
|
||||
/* Function declarations shared with the master planner */
|
||||
|
|
|
@ -9,3 +9,6 @@
|
|||
# Regression test output
|
||||
/regression.diffs
|
||||
/regression.out
|
||||
|
||||
# Failure test side effets
|
||||
/proxy.output
|
||||
|
|
|
@ -122,17 +122,28 @@ SELECT citus.mitmproxy('conn.onQuery(query="^PREPARE TRANSACTION").kill()');
|
|||
|
||||
(1 row)
|
||||
|
||||
-- hide the error message (it has the PID)...
|
||||
-- this transaction block will be sent to the coordinator as a remote command to hide the
|
||||
-- error message that is caused during commit.
|
||||
-- we'll test for the txn side-effects to ensure it didn't run
|
||||
SET client_min_messages TO FATAL;
|
||||
SELECT master_run_on_worker(
|
||||
ARRAY['localhost']::text[],
|
||||
ARRAY[:master_port]::int[],
|
||||
ARRAY['
|
||||
BEGIN;
|
||||
DELETE FROM dml_test WHERE id = 1;
|
||||
DELETE FROM dml_test WHERE id = 2;
|
||||
INSERT INTO dml_test VALUES (5, 'Epsilon');
|
||||
UPDATE dml_test SET name = 'alpha' WHERE id = 1;
|
||||
UPDATE dml_test SET name = 'gamma' WHERE id = 3;
|
||||
INSERT INTO dml_test VALUES (5, ''Epsilon'');
|
||||
UPDATE dml_test SET name = ''alpha'' WHERE id = 1;
|
||||
UPDATE dml_test SET name = ''gamma'' WHERE id = 3;
|
||||
COMMIT;
|
||||
SET client_min_messages TO DEFAULT;
|
||||
'],
|
||||
false
|
||||
);
|
||||
master_run_on_worker
|
||||
---------------------------
|
||||
(localhost,57636,t,BEGIN)
|
||||
(1 row)
|
||||
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
mitmproxy
|
||||
-----------
|
||||
|
|
|
@ -23,16 +23,6 @@ SELECT create_distributed_table('test_table','id');
|
|||
|
||||
-- Populate data to the table
|
||||
INSERT INTO test_table VALUES(1,1,1),(1,2,2),(2,1,1),(2,2,2),(3,1,1),(3,2,2);
|
||||
-- Create a function to make sure that queries returning the same result
|
||||
CREATE FUNCTION raise_failed_execution(query text) RETURNS void AS $$
|
||||
BEGIN
|
||||
EXECUTE query;
|
||||
EXCEPTION WHEN OTHERS THEN
|
||||
IF SQLERRM LIKE 'failed to execute task%' THEN
|
||||
RAISE 'Task failed to execute';
|
||||
END IF;
|
||||
END;
|
||||
$$LANGUAGE plpgsql;
|
||||
-- Kill when the first COPY command arrived, since we have a single placement
|
||||
-- it is expected to error out.
|
||||
SET client_min_messages TO ERROR;
|
||||
|
@ -42,9 +32,9 @@ SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()');
|
|||
|
||||
(1 row)
|
||||
|
||||
SELECT raise_failed_execution('SELECT count(*) FROM test_table');
|
||||
SELECT public.raise_failed_execution('SELECT count(*) FROM test_table');
|
||||
ERROR: Task failed to execute
|
||||
CONTEXT: PL/pgSQL function raise_failed_execution(text) line 6 at RAISE
|
||||
CONTEXT: PL/pgSQL function public.raise_failed_execution(text) line 6 at RAISE
|
||||
SET client_min_messages TO DEFAULT;
|
||||
-- Kill the connection with a CTE
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()');
|
||||
|
@ -70,12 +60,12 @@ SELECT citus.mitmproxy('conn.onQuery(query="^COPY").after(1).kill()');
|
|||
|
||||
(1 row)
|
||||
|
||||
SELECT raise_failed_execution('WITH
|
||||
SELECT public.raise_failed_execution('WITH
|
||||
results AS (SELECT * FROM test_table)
|
||||
SELECT * FROM test_table, results
|
||||
WHERE test_table.id = results.id');
|
||||
ERROR: Task failed to execute
|
||||
CONTEXT: PL/pgSQL function raise_failed_execution(text) line 6 at RAISE
|
||||
CONTEXT: PL/pgSQL function public.raise_failed_execution(text) line 6 at RAISE
|
||||
SET client_min_messages TO DEFAULT;
|
||||
-- In parallel execution mode Citus opens separate connections for each shard
|
||||
-- so killing the connection after the first copy does not break it.
|
||||
|
@ -297,7 +287,5 @@ WARNING: could not consume data from worker node
|
|||
|
||||
COMMIT;
|
||||
DROP SCHEMA real_time_select_failure CASCADE;
|
||||
NOTICE: drop cascades to 2 other objects
|
||||
DETAIL: drop cascades to function raise_failed_execution(text)
|
||||
drop cascades to table test_table
|
||||
NOTICE: drop cascades to table test_table
|
||||
SET search_path TO default;
|
||||
|
|
|
@ -93,10 +93,10 @@ SELECT count(*) FROM test WHERE id = 1;
|
|||
(1 row)
|
||||
|
||||
SET citus.task_executor_type TO 'task-tracker';
|
||||
SELECT count(*) FROM test;
|
||||
count
|
||||
-------
|
||||
2
|
||||
SELECT count(*), min(current_user) FROM test;
|
||||
count | min
|
||||
-------+-------------
|
||||
2 | full_access
|
||||
(1 row)
|
||||
|
||||
-- test re-partition query (needs to transmit intermediate results)
|
||||
|
@ -140,10 +140,10 @@ SELECT count(*) FROM test WHERE id = 1;
|
|||
(1 row)
|
||||
|
||||
SET citus.task_executor_type TO 'task-tracker';
|
||||
SELECT count(*) FROM test;
|
||||
count
|
||||
-------
|
||||
2
|
||||
SELECT count(*), min(current_user) FROM test;
|
||||
count | min
|
||||
-------+-------------
|
||||
2 | read_access
|
||||
(1 row)
|
||||
|
||||
-- test re-partition query (needs to transmit intermediate results)
|
||||
|
@ -171,7 +171,7 @@ ERROR: permission denied for table test
|
|||
SELECT count(*) FROM test WHERE id = 1;
|
||||
ERROR: permission denied for table test
|
||||
SET citus.task_executor_type TO 'task-tracker';
|
||||
SELECT count(*) FROM test;
|
||||
SELECT count(*), min(current_user) FROM test;
|
||||
ERROR: permission denied for table test
|
||||
-- test re-partition query
|
||||
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
|
||||
|
|
|
@ -93,10 +93,10 @@ SELECT count(*) FROM test WHERE id = 1;
|
|||
(1 row)
|
||||
|
||||
SET citus.task_executor_type TO 'task-tracker';
|
||||
SELECT count(*) FROM test;
|
||||
count
|
||||
-------
|
||||
2
|
||||
SELECT count(*), min(current_user) FROM test;
|
||||
count | min
|
||||
-------+-------------
|
||||
2 | full_access
|
||||
(1 row)
|
||||
|
||||
-- test re-partition query (needs to transmit intermediate results)
|
||||
|
@ -140,10 +140,10 @@ SELECT count(*) FROM test WHERE id = 1;
|
|||
(1 row)
|
||||
|
||||
SET citus.task_executor_type TO 'task-tracker';
|
||||
SELECT count(*) FROM test;
|
||||
count
|
||||
-------
|
||||
2
|
||||
SELECT count(*), min(current_user) FROM test;
|
||||
count | min
|
||||
-------+-------------
|
||||
2 | read_access
|
||||
(1 row)
|
||||
|
||||
-- test re-partition query (needs to transmit intermediate results)
|
||||
|
@ -171,7 +171,7 @@ ERROR: permission denied for relation test
|
|||
SELECT count(*) FROM test WHERE id = 1;
|
||||
ERROR: permission denied for relation test
|
||||
SET citus.task_executor_type TO 'task-tracker';
|
||||
SELECT count(*) FROM test;
|
||||
SELECT count(*), min(current_user) FROM test;
|
||||
ERROR: permission denied for relation test
|
||||
-- test re-partition query
|
||||
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
|
||||
|
|
|
@ -225,8 +225,12 @@ ERROR: parameter "citus.max_task_string_size" cannot be changed without restart
|
|||
-- error message may vary between executions
|
||||
-- hiding warning and error message
|
||||
-- no output means the query has failed
|
||||
SET client_min_messages to FATAL;
|
||||
SET client_min_messages to ERROR;
|
||||
SELECT raise_failed_execution('
|
||||
SELECT u.* FROM wide_table u JOIN wide_table v ON (u.long_column_002 = v.long_column_003);
|
||||
');
|
||||
ERROR: Task failed to execute
|
||||
CONTEXT: PL/pgSQL function raise_failed_execution(text) line 6 at RAISE
|
||||
-- following will succeed since it fetches few columns
|
||||
SELECT u.long_column_001, u.long_column_002, u.long_column_003 FROM wide_table u JOIN wide_table v ON (u.long_column_002 = v.long_column_003);
|
||||
long_column_001 | long_column_002 | long_column_003
|
||||
|
|
|
@ -109,3 +109,13 @@ $desc_views$
|
|||
|
||||
(1 row)
|
||||
|
||||
-- Create a function to make sure that queries returning the same result
|
||||
CREATE FUNCTION raise_failed_execution(query text) RETURNS void AS $$
|
||||
BEGIN
|
||||
EXECUTE query;
|
||||
EXCEPTION WHEN OTHERS THEN
|
||||
IF SQLERRM LIKE 'failed to execute task%' THEN
|
||||
RAISE 'Task failed to execute';
|
||||
END IF;
|
||||
END;
|
||||
$$LANGUAGE plpgsql;
|
||||
|
|
|
@ -65,19 +65,23 @@ SELECT * FROM dml_test ORDER BY id ASC;
|
|||
-- fail at PREPARE TRANSACTION
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^PREPARE TRANSACTION").kill()');
|
||||
|
||||
-- hide the error message (it has the PID)...
|
||||
-- this transaction block will be sent to the coordinator as a remote command to hide the
|
||||
-- error message that is caused during commit.
|
||||
-- we'll test for the txn side-effects to ensure it didn't run
|
||||
SET client_min_messages TO FATAL;
|
||||
|
||||
SELECT master_run_on_worker(
|
||||
ARRAY['localhost']::text[],
|
||||
ARRAY[:master_port]::int[],
|
||||
ARRAY['
|
||||
BEGIN;
|
||||
DELETE FROM dml_test WHERE id = 1;
|
||||
DELETE FROM dml_test WHERE id = 2;
|
||||
INSERT INTO dml_test VALUES (5, 'Epsilon');
|
||||
UPDATE dml_test SET name = 'alpha' WHERE id = 1;
|
||||
UPDATE dml_test SET name = 'gamma' WHERE id = 3;
|
||||
INSERT INTO dml_test VALUES (5, ''Epsilon'');
|
||||
UPDATE dml_test SET name = ''alpha'' WHERE id = 1;
|
||||
UPDATE dml_test SET name = ''gamma'' WHERE id = 3;
|
||||
COMMIT;
|
||||
|
||||
SET client_min_messages TO DEFAULT;
|
||||
'],
|
||||
false
|
||||
);
|
||||
|
||||
SELECT citus.mitmproxy('conn.allow()');
|
||||
SELECT shardid FROM pg_dist_shard_placement WHERE shardstate = 3;
|
||||
|
|
|
@ -17,22 +17,11 @@ SELECT create_distributed_table('test_table','id');
|
|||
-- Populate data to the table
|
||||
INSERT INTO test_table VALUES(1,1,1),(1,2,2),(2,1,1),(2,2,2),(3,1,1),(3,2,2);
|
||||
|
||||
-- Create a function to make sure that queries returning the same result
|
||||
CREATE FUNCTION raise_failed_execution(query text) RETURNS void AS $$
|
||||
BEGIN
|
||||
EXECUTE query;
|
||||
EXCEPTION WHEN OTHERS THEN
|
||||
IF SQLERRM LIKE 'failed to execute task%' THEN
|
||||
RAISE 'Task failed to execute';
|
||||
END IF;
|
||||
END;
|
||||
$$LANGUAGE plpgsql;
|
||||
|
||||
-- Kill when the first COPY command arrived, since we have a single placement
|
||||
-- it is expected to error out.
|
||||
SET client_min_messages TO ERROR;
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()');
|
||||
SELECT raise_failed_execution('SELECT count(*) FROM test_table');
|
||||
SELECT public.raise_failed_execution('SELECT count(*) FROM test_table');
|
||||
SET client_min_messages TO DEFAULT;
|
||||
|
||||
-- Kill the connection with a CTE
|
||||
|
@ -46,7 +35,7 @@ WHERE test_table.id = results.id;
|
|||
-- killing connection after first successful query should break.
|
||||
SET client_min_messages TO ERROR;
|
||||
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").after(1).kill()');
|
||||
SELECT raise_failed_execution('WITH
|
||||
SELECT public.raise_failed_execution('WITH
|
||||
results AS (SELECT * FROM test_table)
|
||||
SELECT * FROM test_table, results
|
||||
WHERE test_table.id = results.id');
|
||||
|
|
|
@ -70,7 +70,7 @@ SELECT count(*) FROM test;
|
|||
SELECT count(*) FROM test WHERE id = 1;
|
||||
|
||||
SET citus.task_executor_type TO 'task-tracker';
|
||||
SELECT count(*) FROM test;
|
||||
SELECT count(*), min(current_user) FROM test;
|
||||
|
||||
-- test re-partition query (needs to transmit intermediate results)
|
||||
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
|
||||
|
@ -94,7 +94,7 @@ SELECT count(*) FROM test;
|
|||
SELECT count(*) FROM test WHERE id = 1;
|
||||
|
||||
SET citus.task_executor_type TO 'task-tracker';
|
||||
SELECT count(*) FROM test;
|
||||
SELECT count(*), min(current_user) FROM test;
|
||||
|
||||
-- test re-partition query (needs to transmit intermediate results)
|
||||
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
|
||||
|
@ -115,7 +115,7 @@ SELECT count(*) FROM test;
|
|||
SELECT count(*) FROM test WHERE id = 1;
|
||||
|
||||
SET citus.task_executor_type TO 'task-tracker';
|
||||
SELECT count(*) FROM test;
|
||||
SELECT count(*), min(current_user) FROM test;
|
||||
|
||||
-- test re-partition query
|
||||
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
|
||||
|
|
|
@ -220,9 +220,11 @@ SET citus.max_task_string_size TO 20000;
|
|||
-- error message may vary between executions
|
||||
-- hiding warning and error message
|
||||
-- no output means the query has failed
|
||||
SET client_min_messages to FATAL;
|
||||
SET client_min_messages to ERROR;
|
||||
|
||||
SELECT raise_failed_execution('
|
||||
SELECT u.* FROM wide_table u JOIN wide_table v ON (u.long_column_002 = v.long_column_003);
|
||||
');
|
||||
|
||||
-- following will succeed since it fetches few columns
|
||||
SELECT u.long_column_001, u.long_column_002, u.long_column_003 FROM wide_table u JOIN wide_table v ON (u.long_column_002 = v.long_column_003);
|
||||
|
|
|
@ -106,3 +106,14 @@ ORDER BY a.attrelid, a.attnum;
|
|||
|
||||
$desc_views$
|
||||
);
|
||||
|
||||
-- Create a function to make sure that queries returning the same result
|
||||
CREATE FUNCTION raise_failed_execution(query text) RETURNS void AS $$
|
||||
BEGIN
|
||||
EXECUTE query;
|
||||
EXCEPTION WHEN OTHERS THEN
|
||||
IF SQLERRM LIKE 'failed to execute task%' THEN
|
||||
RAISE 'Task failed to execute';
|
||||
END IF;
|
||||
END;
|
||||
$$LANGUAGE plpgsql;
|
||||
|
|
Loading…
Reference in New Issue