mirror of https://github.com/citusdata/citus.git
Merge pull request #4666 from citusdata/write_to_local
Allow local execution for co-located intermediate results in COPYpull/4659/head
commit
c2480343c7
|
@ -30,20 +30,25 @@
|
|||
|
||||
#include "distributed/transmit.h"
|
||||
#include "distributed/commands/multi_copy.h"
|
||||
#include "distributed/intermediate_results.h"
|
||||
#include "distributed/multi_partitioning_utils.h"
|
||||
#include "distributed/local_executor.h"
|
||||
#include "distributed/local_multi_copy.h"
|
||||
#include "distributed/shard_utils.h"
|
||||
#include "distributed/version_compat.h"
|
||||
|
||||
static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead);
|
||||
/* managed via GUC, default is 512 kB */
|
||||
int LocalCopyFlushThresholdByte = 512 * 1024;
|
||||
|
||||
|
||||
static void AddSlotToBuffer(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest,
|
||||
CopyOutState localCopyOutState);
|
||||
|
||||
static bool ShouldAddBinaryHeaders(StringInfo buffer, bool isBinary);
|
||||
static bool ShouldSendCopyNow(StringInfo buffer);
|
||||
static void DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId,
|
||||
CopyStmt *copyStatement, bool isEndOfCopy);
|
||||
static bool ShouldAddBinaryHeaders(StringInfo buffer, bool isBinary);
|
||||
static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead);
|
||||
|
||||
|
||||
/*
|
||||
* LocalCopyBuffer is used in copy callback to return the copied rows.
|
||||
|
@ -52,6 +57,7 @@ static bool ShouldAddBinaryHeaders(StringInfo buffer, bool isBinary);
|
|||
*/
|
||||
static StringInfo LocalCopyBuffer;
|
||||
|
||||
|
||||
/*
|
||||
* WriteTupleToLocalShard adds the given slot and does a local copy if
|
||||
* this is the end of copy, or the buffer size exceeds the threshold.
|
||||
|
@ -94,6 +100,25 @@ WriteTupleToLocalShard(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, in
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* WriteTupleToLocalFile adds the given slot and does a local copy to the
|
||||
* file if the buffer size exceeds the threshold.
|
||||
*/
|
||||
void
|
||||
WriteTupleToLocalFile(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest,
|
||||
int64 shardId, CopyOutState localFileCopyOutState,
|
||||
FileCompat *fileCompat)
|
||||
{
|
||||
AddSlotToBuffer(slot, copyDest, localFileCopyOutState);
|
||||
|
||||
if (ShouldSendCopyNow(localFileCopyOutState->fe_msgbuf))
|
||||
{
|
||||
WriteToLocalFile(localFileCopyOutState->fe_msgbuf, fileCompat);
|
||||
resetStringInfo(localFileCopyOutState->fe_msgbuf);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* FinishLocalCopyToShard finishes local copy for the given shard with the shard id.
|
||||
*/
|
||||
|
@ -112,6 +137,26 @@ FinishLocalCopyToShard(CitusCopyDestReceiver *copyDest, int64 shardId,
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* FinishLocalCopyToFile finishes local copy for the given file.
|
||||
*/
|
||||
void
|
||||
FinishLocalCopyToFile(CopyOutState localFileCopyOutState, FileCompat *fileCompat)
|
||||
{
|
||||
StringInfo data = localFileCopyOutState->fe_msgbuf;
|
||||
|
||||
bool isBinaryCopy = localFileCopyOutState->binary;
|
||||
if (isBinaryCopy)
|
||||
{
|
||||
AppendCopyBinaryFooters(localFileCopyOutState);
|
||||
}
|
||||
WriteToLocalFile(data, fileCompat);
|
||||
resetStringInfo(localFileCopyOutState->fe_msgbuf);
|
||||
|
||||
FileClose(fileCompat->fd);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* AddSlotToBuffer serializes the given slot and adds it to
|
||||
* the buffer in localCopyOutState.
|
||||
|
@ -138,7 +183,8 @@ AddSlotToBuffer(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, CopyOutSt
|
|||
static bool
|
||||
ShouldSendCopyNow(StringInfo buffer)
|
||||
{
|
||||
return buffer->len > LOCAL_COPY_FLUSH_THRESHOLD;
|
||||
/* LocalCopyFlushThreshold is in bytes */
|
||||
return buffer->len > LocalCopyFlushThresholdByte;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -92,6 +92,7 @@
|
|||
#include "distributed/worker_protocol.h"
|
||||
#include "distributed/local_multi_copy.h"
|
||||
#include "distributed/hash_helpers.h"
|
||||
#include "distributed/transmit.h"
|
||||
#include "executor/executor.h"
|
||||
#include "foreign/foreign.h"
|
||||
|
||||
|
@ -122,6 +123,8 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
|
|||
*/
|
||||
#define COPY_SWITCH_OVER_THRESHOLD (4 * 1024 * 1024)
|
||||
|
||||
#define FILE_IS_OPEN(x) (x > -1)
|
||||
|
||||
typedef struct CopyShardState CopyShardState;
|
||||
typedef struct CopyPlacementState CopyPlacementState;
|
||||
|
||||
|
@ -195,9 +198,12 @@ struct CopyShardState
|
|||
/* Used as hash key. */
|
||||
uint64 shardId;
|
||||
|
||||
/* used for doing local copy */
|
||||
/* used for doing local copy, either for a shard or a co-located file */
|
||||
CopyOutState copyOutState;
|
||||
|
||||
/* used when copy is targeting co-located file */
|
||||
FileCompat fileDest;
|
||||
|
||||
/* containsLocalPlacement is true if we have a local placement for the shard id of this state */
|
||||
bool containsLocalPlacement;
|
||||
|
||||
|
@ -271,7 +277,7 @@ static CopyConnectionState * GetConnectionState(HTAB *connectionStateHash,
|
|||
static CopyShardState * GetShardState(uint64 shardId, HTAB *shardStateHash,
|
||||
HTAB *connectionStateHash, bool stopOnFailure,
|
||||
bool *found, bool shouldUseLocalCopy, CopyOutState
|
||||
copyOutState, bool isCopyToIntermediateFile);
|
||||
copyOutState, bool isColocatedIntermediateResult);
|
||||
static MultiConnection * CopyGetPlacementConnection(HTAB *connectionStateHash,
|
||||
ShardPlacement *placement,
|
||||
bool stopOnFailure);
|
||||
|
@ -283,10 +289,10 @@ static List * ConnectionStateListToNode(HTAB *connectionStateHash,
|
|||
const char *hostname, int32 port);
|
||||
static void InitializeCopyShardState(CopyShardState *shardState,
|
||||
HTAB *connectionStateHash,
|
||||
uint64 shardId, bool stopOnFailure, bool
|
||||
canUseLocalCopy,
|
||||
uint64 shardId, bool stopOnFailure,
|
||||
bool canUseLocalCopy,
|
||||
CopyOutState copyOutState,
|
||||
bool isCopyToIntermediateFile);
|
||||
bool colocatedIntermediateResult);
|
||||
static void StartPlacementStateCopyCommand(CopyPlacementState *placementState,
|
||||
CopyStmt *copyStatement,
|
||||
CopyOutState copyOutState);
|
||||
|
@ -335,11 +341,14 @@ static bool ContainsLocalPlacement(int64 shardId);
|
|||
static void CompleteCopyQueryTagCompat(QueryCompletionCompat *completionTag, uint64
|
||||
processedRowCount);
|
||||
static void FinishLocalCopy(CitusCopyDestReceiver *copyDest);
|
||||
static void CreateLocalColocatedIntermediateFile(CitusCopyDestReceiver *copyDest,
|
||||
CopyShardState *shardState);
|
||||
static void FinishLocalColocatedIntermediateFiles(CitusCopyDestReceiver *copyDest);
|
||||
static void CloneCopyOutStateForLocalCopy(CopyOutState from, CopyOutState to);
|
||||
static LocalCopyStatus GetLocalCopyStatus(List *shardIntervalList, bool
|
||||
isIntermediateResult);
|
||||
static LocalCopyStatus GetLocalCopyStatus(void);
|
||||
static bool ShardIntervalListHasLocalPlacements(List *shardIntervalList);
|
||||
static void LogLocalCopyExecution(uint64 shardId);
|
||||
static void LogLocalCopyToRelationExecution(uint64 shardId);
|
||||
static void LogLocalCopyToFileExecution(uint64 shardId);
|
||||
|
||||
|
||||
/* exports for SQL callable functions */
|
||||
|
@ -2083,7 +2092,7 @@ CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColu
|
|||
copyDest->partitionColumnIndex = partitionColumnIndex;
|
||||
copyDest->executorState = executorState;
|
||||
copyDest->stopOnFailure = stopOnFailure;
|
||||
copyDest->intermediateResultIdPrefix = intermediateResultIdPrefix;
|
||||
copyDest->colocatedIntermediateResultIdPrefix = intermediateResultIdPrefix;
|
||||
copyDest->memoryContext = CurrentMemoryContext;
|
||||
|
||||
return copyDest;
|
||||
|
@ -2097,22 +2106,13 @@ CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColu
|
|||
* execution depending on other information.
|
||||
*/
|
||||
static LocalCopyStatus
|
||||
GetLocalCopyStatus(List *shardIntervalList, bool isIntermediateResult)
|
||||
GetLocalCopyStatus(void)
|
||||
{
|
||||
if (!EnableLocalExecution ||
|
||||
GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_DISABLED)
|
||||
{
|
||||
return LOCAL_COPY_DISABLED;
|
||||
}
|
||||
else if (isIntermediateResult)
|
||||
{
|
||||
/*
|
||||
* Intermediate files are written to a file, and files are visible to all
|
||||
* transactions, and we use a custom copy format for copy therefore we will
|
||||
* use the existing logic for that.
|
||||
*/
|
||||
return LOCAL_COPY_DISABLED;
|
||||
}
|
||||
else if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED)
|
||||
{
|
||||
/*
|
||||
|
@ -2291,9 +2291,11 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
|
|||
/* define the template for the COPY statement that is sent to workers */
|
||||
CopyStmt *copyStatement = makeNode(CopyStmt);
|
||||
|
||||
if (copyDest->intermediateResultIdPrefix != NULL)
|
||||
if (copyDest->colocatedIntermediateResultIdPrefix != NULL)
|
||||
{
|
||||
copyStatement->relation = makeRangeVar(NULL, copyDest->intermediateResultIdPrefix,
|
||||
copyStatement->relation = makeRangeVar(NULL,
|
||||
copyDest->
|
||||
colocatedIntermediateResultIdPrefix,
|
||||
-1);
|
||||
|
||||
DefElem *formatResultOption = makeDefElem("format", (Node *) makeString("result"),
|
||||
|
@ -2342,9 +2344,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
|
|||
*/
|
||||
EnsureConnectionPossibilityForRemotePrimaryNodes();
|
||||
|
||||
bool isIntermediateResult = copyDest->intermediateResultIdPrefix != NULL;
|
||||
LocalCopyStatus localCopyStatus =
|
||||
GetLocalCopyStatus(shardIntervalList, isIntermediateResult);
|
||||
LocalCopyStatus localCopyStatus = GetLocalCopyStatus();
|
||||
if (localCopyStatus == LOCAL_COPY_DISABLED)
|
||||
{
|
||||
copyDest->shouldUseLocalCopy = false;
|
||||
|
@ -2443,15 +2443,16 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest
|
|||
|
||||
/* connections hash is kept in memory context */
|
||||
MemoryContextSwitchTo(copyDest->memoryContext);
|
||||
bool isColocatedIntermediateResult =
|
||||
copyDest->colocatedIntermediateResultIdPrefix != NULL;
|
||||
|
||||
bool isIntermediateResult = copyDest->intermediateResultIdPrefix != NULL;
|
||||
CopyShardState *shardState = GetShardState(shardId, copyDest->shardStateHash,
|
||||
copyDest->connectionStateHash,
|
||||
stopOnFailure,
|
||||
&cachedShardStateFound,
|
||||
copyDest->shouldUseLocalCopy,
|
||||
copyDest->copyOutState,
|
||||
isIntermediateResult);
|
||||
isColocatedIntermediateResult);
|
||||
if (!cachedShardStateFound)
|
||||
{
|
||||
firstTupleInShard = true;
|
||||
|
@ -2472,12 +2473,22 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest
|
|||
}
|
||||
}
|
||||
|
||||
if (copyDest->shouldUseLocalCopy && shardState->containsLocalPlacement)
|
||||
if (isColocatedIntermediateResult && copyDest->shouldUseLocalCopy &&
|
||||
shardState->containsLocalPlacement)
|
||||
{
|
||||
if (firstTupleInShard)
|
||||
{
|
||||
CreateLocalColocatedIntermediateFile(copyDest, shardState);
|
||||
}
|
||||
|
||||
WriteTupleToLocalFile(slot, copyDest, shardId,
|
||||
shardState->copyOutState, &shardState->fileDest);
|
||||
}
|
||||
else if (copyDest->shouldUseLocalCopy && shardState->containsLocalPlacement)
|
||||
{
|
||||
WriteTupleToLocalShard(slot, copyDest, shardId, shardState->copyOutState);
|
||||
}
|
||||
|
||||
|
||||
foreach(placementStateCell, shardState->placementStateList)
|
||||
{
|
||||
CopyPlacementState *currentPlacementState = lfirst(placementStateCell);
|
||||
|
@ -2693,6 +2704,8 @@ CitusCopyDestReceiverShutdown(DestReceiver *destReceiver)
|
|||
Relation distributedRelation = copyDest->distributedRelation;
|
||||
|
||||
List *connectionStateList = ConnectionStateList(connectionStateHash);
|
||||
|
||||
FinishLocalColocatedIntermediateFiles(copyDest);
|
||||
FinishLocalCopy(copyDest);
|
||||
|
||||
PG_TRY();
|
||||
|
@ -2743,6 +2756,61 @@ FinishLocalCopy(CitusCopyDestReceiver *copyDest)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* CreateLocalColocatedIntermediateFile creates a co-located file for the given
|
||||
* shard, and appends the binary headers if needed. The function also modifies
|
||||
* shardState to set the fileDest and copyOutState.
|
||||
*/
|
||||
static void
|
||||
CreateLocalColocatedIntermediateFile(CitusCopyDestReceiver *copyDest,
|
||||
CopyShardState *shardState)
|
||||
{
|
||||
/* make sure the directory exists */
|
||||
CreateIntermediateResultsDirectory();
|
||||
|
||||
const int fileFlags = (O_CREAT | O_RDWR | O_TRUNC);
|
||||
const int fileMode = (S_IRUSR | S_IWUSR);
|
||||
|
||||
StringInfo filePath = makeStringInfo();
|
||||
appendStringInfo(filePath, "%s_%ld", copyDest->colocatedIntermediateResultIdPrefix,
|
||||
shardState->shardId);
|
||||
|
||||
const char *fileName = QueryResultFileName(filePath->data);
|
||||
shardState->fileDest =
|
||||
FileCompatFromFileStart(FileOpenForTransmit(fileName, fileFlags, fileMode));
|
||||
|
||||
CopyOutState localFileCopyOutState = shardState->copyOutState;
|
||||
bool isBinaryCopy = localFileCopyOutState->binary;
|
||||
if (isBinaryCopy)
|
||||
{
|
||||
AppendCopyBinaryHeaders(localFileCopyOutState);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* FinishLocalColocatedIntermediateFiles iterates over all the colocated
|
||||
* intermediate files and finishes the COPY on all of them.
|
||||
*/
|
||||
static void
|
||||
FinishLocalColocatedIntermediateFiles(CitusCopyDestReceiver *copyDest)
|
||||
{
|
||||
HTAB *shardStateHash = copyDest->shardStateHash;
|
||||
HASH_SEQ_STATUS status;
|
||||
CopyShardState *copyShardState;
|
||||
|
||||
foreach_htab(copyShardState, &status, shardStateHash)
|
||||
{
|
||||
if (copyShardState->copyOutState != NULL &&
|
||||
FILE_IS_OPEN(copyShardState->fileDest.fd))
|
||||
{
|
||||
FinishLocalCopyToFile(copyShardState->copyOutState,
|
||||
©ShardState->fileDest);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ShutdownCopyConnectionState ends the copy command for the current active
|
||||
* placement on connection, and then sends the rest of the buffers over the
|
||||
|
@ -3438,8 +3506,8 @@ ConnectionStateListToNode(HTAB *connectionStateHash, const char *hostname, int32
|
|||
static CopyShardState *
|
||||
GetShardState(uint64 shardId, HTAB *shardStateHash,
|
||||
HTAB *connectionStateHash, bool stopOnFailure, bool *found, bool
|
||||
shouldUseLocalCopy, CopyOutState copyOutState, bool
|
||||
isCopyToIntermediateFile)
|
||||
shouldUseLocalCopy, CopyOutState copyOutState,
|
||||
bool isColocatedIntermediateResult)
|
||||
{
|
||||
CopyShardState *shardState = (CopyShardState *) hash_search(shardStateHash, &shardId,
|
||||
HASH_ENTER, found);
|
||||
|
@ -3447,7 +3515,7 @@ GetShardState(uint64 shardId, HTAB *shardStateHash,
|
|||
{
|
||||
InitializeCopyShardState(shardState, connectionStateHash,
|
||||
shardId, stopOnFailure, shouldUseLocalCopy,
|
||||
copyOutState, isCopyToIntermediateFile);
|
||||
copyOutState, isColocatedIntermediateResult);
|
||||
}
|
||||
|
||||
return shardState;
|
||||
|
@ -3462,8 +3530,9 @@ GetShardState(uint64 shardId, HTAB *shardStateHash,
|
|||
static void
|
||||
InitializeCopyShardState(CopyShardState *shardState,
|
||||
HTAB *connectionStateHash, uint64 shardId,
|
||||
bool stopOnFailure, bool shouldUseLocalCopy, CopyOutState
|
||||
copyOutState, bool isCopyToIntermediateFile)
|
||||
bool stopOnFailure, bool shouldUseLocalCopy,
|
||||
CopyOutState copyOutState,
|
||||
bool colocatedIntermediateResult)
|
||||
{
|
||||
ListCell *placementCell = NULL;
|
||||
int failedPlacementCount = 0;
|
||||
|
@ -3487,7 +3556,7 @@ InitializeCopyShardState(CopyShardState *shardState,
|
|||
shardState->placementStateList = NIL;
|
||||
shardState->copyOutState = NULL;
|
||||
shardState->containsLocalPlacement = ContainsLocalPlacement(shardId);
|
||||
|
||||
shardState->fileDest.fd = -1;
|
||||
|
||||
foreach(placementCell, activePlacementList)
|
||||
{
|
||||
|
@ -3497,22 +3566,17 @@ InitializeCopyShardState(CopyShardState *shardState,
|
|||
{
|
||||
shardState->copyOutState = (CopyOutState) palloc0(sizeof(*copyOutState));
|
||||
CloneCopyOutStateForLocalCopy(copyOutState, shardState->copyOutState);
|
||||
LogLocalCopyExecution(shardId);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
||||
if (placement->groupId == GetLocalGroupId())
|
||||
{
|
||||
/*
|
||||
* if we are copying into an intermediate file we won't use local copy.
|
||||
* Files are visible to all transactions so we can still use local execution, therefore
|
||||
* we shouldn't restrict only using connection in this case.
|
||||
*/
|
||||
if (!isCopyToIntermediateFile)
|
||||
if (colocatedIntermediateResult)
|
||||
{
|
||||
SetLocalExecutionStatus(LOCAL_EXECUTION_DISABLED);
|
||||
LogLocalCopyToFileExecution(shardId);
|
||||
}
|
||||
else
|
||||
{
|
||||
LogLocalCopyToRelationExecution(shardId);
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
MultiConnection *connection =
|
||||
|
@ -3591,11 +3655,11 @@ CloneCopyOutStateForLocalCopy(CopyOutState from, CopyOutState to)
|
|||
|
||||
|
||||
/*
|
||||
* LogLocalCopyExecution logs that the copy will be done locally for
|
||||
* the given shard.
|
||||
* LogLocalCopyToRelationExecution logs that the copy will be done
|
||||
* locally for the given shard.
|
||||
*/
|
||||
static void
|
||||
LogLocalCopyExecution(uint64 shardId)
|
||||
LogLocalCopyToRelationExecution(uint64 shardId)
|
||||
{
|
||||
if (!(LogRemoteCommands || LogLocalCommands))
|
||||
{
|
||||
|
@ -3605,6 +3669,22 @@ LogLocalCopyExecution(uint64 shardId)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* LogLocalCopyToFileExecution logs that the copy will be done locally for
|
||||
* a file colocated to the given shard.
|
||||
*/
|
||||
static void
|
||||
LogLocalCopyToFileExecution(uint64 shardId)
|
||||
{
|
||||
if (!(LogRemoteCommands || LogLocalCommands))
|
||||
{
|
||||
return;
|
||||
}
|
||||
ereport(NOTICE, (errmsg("executing the copy locally for colocated file with "
|
||||
"shard %lu", shardId)));
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CopyGetPlacementConnection assigns a connection to the given placement. If
|
||||
* a connection has already been assigned the placement in the current transaction
|
||||
|
|
|
@ -86,7 +86,6 @@ static void RemoteFileDestReceiverStartup(DestReceiver *dest, int operation,
|
|||
TupleDesc inputTupleDescriptor);
|
||||
static void PrepareIntermediateResultBroadcast(RemoteFileDestReceiver *resultDest);
|
||||
static StringInfo ConstructCopyResultStatement(const char *resultId);
|
||||
static void WriteToLocalFile(StringInfo copyData, FileCompat *fileCompat);
|
||||
static bool RemoteFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest);
|
||||
static void BroadcastCopyData(StringInfo dataBuffer, List *connectionList);
|
||||
static void SendCopyDataOverConnection(StringInfo dataBuffer,
|
||||
|
@ -438,7 +437,7 @@ RemoteFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
|
|||
/*
|
||||
* WriteToLocalResultsFile writes the bytes in a StringInfo to a local file.
|
||||
*/
|
||||
static void
|
||||
void
|
||||
WriteToLocalFile(StringInfo copyData, FileCompat *fileCompat)
|
||||
{
|
||||
int bytesWritten = FileWriteCompat(fileCompat, copyData->data,
|
||||
|
|
|
@ -37,6 +37,7 @@
|
|||
#include "distributed/distributed_deadlock_detection.h"
|
||||
#include "distributed/insert_select_executor.h"
|
||||
#include "distributed/intermediate_result_pruning.h"
|
||||
#include "distributed/local_multi_copy.h"
|
||||
#include "distributed/local_executor.h"
|
||||
#include "distributed/local_distributed_join_planner.h"
|
||||
#include "distributed/locally_reserved_shared_connections.h"
|
||||
|
@ -684,6 +685,16 @@ RegisterCitusConfigVariables(void)
|
|||
GUC_NO_SHOW_ALL,
|
||||
NoticeIfSubqueryPushdownEnabled, NULL, NULL);
|
||||
|
||||
DefineCustomIntVariable(
|
||||
"citus.local_copy_flush_threshold",
|
||||
gettext_noop("Sets the threshold for local copy to be flushed."),
|
||||
NULL,
|
||||
&LocalCopyFlushThresholdByte,
|
||||
512 * 1024, 1, INT_MAX,
|
||||
PGC_USERSET,
|
||||
GUC_UNIT_BYTE | GUC_NO_SHOW_ALL,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
DefineCustomIntVariable(
|
||||
"citus.local_shared_pool_size",
|
||||
gettext_noop(
|
||||
|
|
|
@ -131,8 +131,13 @@ typedef struct CitusCopyDestReceiver
|
|||
/* if true, should copy to local placements in the current session */
|
||||
bool shouldUseLocalCopy;
|
||||
|
||||
/* copy into intermediate result */
|
||||
char *intermediateResultIdPrefix;
|
||||
/*
|
||||
* Copy into colocated intermediate result. When this is set, the
|
||||
* COPY assumes there are hypothetical colocated shards to the
|
||||
* relation that are files. And, COPY writes the data to the
|
||||
* files as if they are shards.
|
||||
*/
|
||||
char *colocatedIntermediateResultIdPrefix;
|
||||
} CitusCopyDestReceiver;
|
||||
|
||||
|
||||
|
|
|
@ -53,6 +53,7 @@ extern DestReceiver * CreateRemoteFileDestReceiver(const char *resultId,
|
|||
EState *executorState,
|
||||
List *initialNodeList, bool
|
||||
writeLocalFile);
|
||||
extern void WriteToLocalFile(StringInfo copyData, FileCompat *fileCompat);
|
||||
extern uint64 RemoteFileDestReceiverBytesSent(DestReceiver *destReceiver);
|
||||
extern void SendQueryResultViaCopy(const char *resultId);
|
||||
extern void ReceiveQueryResultViaCopy(const char *resultId);
|
||||
|
|
|
@ -3,17 +3,24 @@
|
|||
#define LOCAL_MULTI_COPY
|
||||
|
||||
/*
|
||||
* LOCAL_COPY_FLUSH_THRESHOLD is the threshold for local copy to be flushed.
|
||||
* LocalCopyFlushThresholdByte is the threshold for local copy to be flushed.
|
||||
* There will be one buffer for each local placement, when the buffer size
|
||||
* exceeds this threshold, it will be flushed.
|
||||
*
|
||||
* Managed via GUC, the default is 512 kB.
|
||||
*/
|
||||
#define LOCAL_COPY_FLUSH_THRESHOLD (1 * 512 * 1024)
|
||||
extern int LocalCopyFlushThresholdByte;
|
||||
|
||||
extern void WriteTupleToLocalShard(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest,
|
||||
int64
|
||||
shardId,
|
||||
CopyOutState localCopyOutState);
|
||||
extern void WriteTupleToLocalFile(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest,
|
||||
int64 shardId, CopyOutState localFileCopyOutState,
|
||||
FileCompat *fileCompat);
|
||||
extern void FinishLocalCopyToShard(CitusCopyDestReceiver *copyDest, int64 shardId,
|
||||
CopyOutState localCopyOutState);
|
||||
extern void FinishLocalCopyToFile(CopyOutState localFileCopyOutState,
|
||||
FileCompat *fileCompat);
|
||||
|
||||
#endif /* LOCAL_MULTI_COPY */
|
||||
|
|
|
@ -401,6 +401,7 @@ BEGIN;
|
|||
WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETURNING *), c AS (INSERT INTO ref SELECT *,* FROM generate_series(1,10) RETURNING *), d AS (SELECT count(*) FROM ref JOIN local ON (a = x)) SELECT * FROM a, b, c, d ORDER BY x,y,a,b;
|
||||
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
|
||||
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
|
||||
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
||||
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503020 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_1503020'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) RETURNING citus_table_alias.a, citus_table_alias.b
|
||||
NOTICE: executing the command locally: SELECT count(*) AS count FROM (coordinator_shouldhaveshards.ref_1503020 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x)))
|
||||
NOTICE: executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.count FROM (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) a, (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) b, (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) c, (SELECT intermediate_result.count FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) d ORDER BY b.x, b.y, c.a, c.b
|
||||
|
@ -531,7 +532,7 @@ NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1
|
|||
|
||||
ROLLBACK;
|
||||
RESET citus.enable_cte_inlining;
|
||||
CREATE table ref_table(x int, y int);
|
||||
CREATE table ref_table(x int PRIMARY KEY, y int);
|
||||
-- this will be replicated to the coordinator because of add_coordinator test
|
||||
SELECT create_reference_table('ref_table');
|
||||
create_reference_table
|
||||
|
@ -555,6 +556,22 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM (coordinat
|
|||
(1 row)
|
||||
|
||||
ROLLBACK;
|
||||
-- writing to local file and remote intermediate files
|
||||
-- at the same time
|
||||
INSERT INTO ref_table SELECT *, * FROM generate_series(1, 100);
|
||||
NOTICE: executing the copy locally for shard xxxxx
|
||||
WITH cte_1 AS (
|
||||
INSERT INTO ref_table SELECT * FROM ref_table LIMIT 10000 ON CONFLICT (x) DO UPDATE SET y = EXCLUDED.y + 1 RETURNING *)
|
||||
SELECT count(*) FROM cte_1;
|
||||
NOTICE: executing the command locally: SELECT x, y FROM coordinator_shouldhaveshards.ref_table_1503039 ref_table LIMIT 10000
|
||||
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
||||
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_table_1503039 AS citus_table_alias (x, y) SELECT x, y FROM read_intermediate_result('insert_select_XXX_1503039'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer) ON CONFLICT(x) DO UPDATE SET y = (excluded.y OPERATOR(pg_catalog.+) 1) RETURNING citus_table_alias.x, citus_table_alias.y
|
||||
NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) cte_1
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
100
|
||||
(1 row)
|
||||
|
||||
-- issue #4237: preventing empty placement creation on coordinator
|
||||
CREATE TABLE test_append_table(a int);
|
||||
SELECT create_distributed_table('test_append_table', 'a', 'append');
|
||||
|
|
|
@ -283,6 +283,7 @@ RETURNING *;
|
|||
-- can be executed locally
|
||||
INSERT INTO distributed_table SELECT * FROM distributed_table WHERE key = 1 OFFSET 0 ON CONFLICT DO NOTHING;
|
||||
NOTICE: executing the command locally: SELECT key, value, age FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) OFFSET 0
|
||||
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
||||
NOTICE: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) SELECT key, value, age FROM read_intermediate_result('insert_select_XXX_1470001'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, age bigint) ON CONFLICT DO NOTHING
|
||||
INSERT INTO distributed_table SELECT 1, '1',15 FROM distributed_table WHERE key = 2 LIMIT 1 ON CONFLICT DO NOTHING;
|
||||
-- sanity check: multi-shard INSERT..SELECT pushdown goes through distributed execution
|
||||
|
|
|
@ -167,6 +167,14 @@ SELECT create_distributed_table('public.another_schema_table', 'a');
|
|||
|
||||
(1 row)
|
||||
|
||||
CREATE TABLE non_binary_copy_test (key int PRIMARY KEY, value new_type);
|
||||
SELECT create_distributed_table('non_binary_copy_test', 'key');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO non_binary_copy_test SELECT i, (i, 'citus9.5')::new_type FROM generate_series(0,1000)i;
|
||||
-- Confirm the basics work
|
||||
INSERT INTO test VALUES (1, 2), (3, 4), (5, 6), (2, 7), (4, 5);
|
||||
SELECT * FROM test WHERE x = 1;
|
||||
|
@ -236,21 +244,21 @@ INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1), (2, 2) RETURNING *;
|
|||
SET citus.log_remote_commands to true;
|
||||
-- observe that there is a conflict and the following query does nothing
|
||||
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT DO NOTHING RETURNING *;
|
||||
NOTICE: executing the command locally: INSERT INTO single_node.upsert_test_90630515 AS citus_table_alias (part_key, other_col) VALUES (1, 1) ON CONFLICT DO NOTHING RETURNING part_key, other_col, third_col
|
||||
NOTICE: executing the command locally: INSERT INTO single_node.upsert_test_90630519 AS citus_table_alias (part_key, other_col) VALUES (1, 1) ON CONFLICT DO NOTHING RETURNING part_key, other_col, third_col
|
||||
part_key | other_col | third_col
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
-- same as the above with different syntax
|
||||
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO NOTHING RETURNING *;
|
||||
NOTICE: executing the command locally: INSERT INTO single_node.upsert_test_90630515 AS citus_table_alias (part_key, other_col) VALUES (1, 1) ON CONFLICT(part_key) DO NOTHING RETURNING part_key, other_col, third_col
|
||||
NOTICE: executing the command locally: INSERT INTO single_node.upsert_test_90630519 AS citus_table_alias (part_key, other_col) VALUES (1, 1) ON CONFLICT(part_key) DO NOTHING RETURNING part_key, other_col, third_col
|
||||
part_key | other_col | third_col
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
-- again the same query with another syntax
|
||||
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT upsert_test_part_key_key DO NOTHING RETURNING *;
|
||||
NOTICE: executing the command locally: INSERT INTO single_node.upsert_test_90630515 AS citus_table_alias (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT upsert_test_part_key_key_90630515 DO NOTHING RETURNING part_key, other_col, third_col
|
||||
NOTICE: executing the command locally: INSERT INTO single_node.upsert_test_90630519 AS citus_table_alias (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT upsert_test_part_key_key_90630519 DO NOTHING RETURNING part_key, other_col, third_col
|
||||
part_key | other_col | third_col
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
@ -258,7 +266,7 @@ NOTICE: executing the command locally: INSERT INTO single_node.upsert_test_9063
|
|||
BEGIN;
|
||||
-- force local execution
|
||||
SELECT count(*) FROM upsert_test WHERE part_key = 1;
|
||||
NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.upsert_test_90630515 upsert_test WHERE (part_key OPERATOR(pg_catalog.=) 1)
|
||||
NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.upsert_test_90630519 upsert_test WHERE (part_key OPERATOR(pg_catalog.=) 1)
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
|
@ -344,10 +352,10 @@ SET search_path TO single_node;
|
|||
DROP SCHEMA "Quoed.Schema" CASCADE;
|
||||
NOTICE: drop cascades to 5 other objects
|
||||
DETAIL: drop cascades to table "Quoed.Schema".simple_table_name
|
||||
drop cascades to table "Quoed.Schema".simple_table_name_90630520
|
||||
drop cascades to table "Quoed.Schema".simple_table_name_90630521
|
||||
drop cascades to table "Quoed.Schema".simple_table_name_90630522
|
||||
drop cascades to table "Quoed.Schema".simple_table_name_90630523
|
||||
drop cascades to table "Quoed.Schema".simple_table_name_90630524
|
||||
drop cascades to table "Quoed.Schema".simple_table_name_90630525
|
||||
drop cascades to table "Quoed.Schema".simple_table_name_90630526
|
||||
drop cascades to table "Quoed.Schema".simple_table_name_90630527
|
||||
-- test partitioned index creation with long name
|
||||
CREATE TABLE test_index_creation1
|
||||
(
|
||||
|
@ -1591,6 +1599,12 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT in
|
|||
7
|
||||
(1 row)
|
||||
|
||||
-- this is to get ready for the next tests
|
||||
TRUNCATE another_schema_table;
|
||||
NOTICE: executing the command locally: TRUNCATE TABLE single_node.another_schema_table_xxxxx CASCADE
|
||||
NOTICE: executing the command locally: TRUNCATE TABLE single_node.another_schema_table_xxxxx CASCADE
|
||||
NOTICE: executing the command locally: TRUNCATE TABLE single_node.another_schema_table_xxxxx CASCADE
|
||||
NOTICE: executing the command locally: TRUNCATE TABLE single_node.another_schema_table_xxxxx CASCADE
|
||||
-- copy can use local execution even if there is no connection available
|
||||
COPY another_schema_table(a) FROM PROGRAM 'seq 32';
|
||||
NOTICE: executing the copy locally for shard xxxxx
|
||||
|
@ -1601,6 +1615,176 @@ NOTICE: executing the copy locally for shard xxxxx
|
|||
CONTEXT: COPY another_schema_table, line 3: "3"
|
||||
NOTICE: executing the copy locally for shard xxxxx
|
||||
CONTEXT: COPY another_schema_table, line 6: "6"
|
||||
-- INSERT .. SELECT with co-located intermediate results
|
||||
SET citus.log_remote_commands to false;
|
||||
CREATE UNIQUE INDEX another_schema_table_pk ON another_schema_table(a);
|
||||
SET citus.log_local_commands to true;
|
||||
INSERT INTO another_schema_table SELECT * FROM another_schema_table LIMIT 10000 ON CONFLICT(a) DO NOTHING;
|
||||
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630511 another_schema_table WHERE true LIMIT '10000'::bigint
|
||||
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630512 another_schema_table WHERE true LIMIT '10000'::bigint
|
||||
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630513 another_schema_table WHERE true LIMIT '10000'::bigint
|
||||
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630514 another_schema_table WHERE true LIMIT '10000'::bigint
|
||||
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
||||
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
||||
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
||||
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
||||
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630511 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_90630511'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO NOTHING
|
||||
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630512 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_90630512'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO NOTHING
|
||||
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630513 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_90630513'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO NOTHING
|
||||
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630514 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_90630514'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO NOTHING
|
||||
INSERT INTO another_schema_table SELECT * FROM another_schema_table ORDER BY a LIMIT 10 ON CONFLICT(a) DO UPDATE SET b = EXCLUDED.b + 1 RETURNING *;
|
||||
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630511 another_schema_table WHERE true ORDER BY a LIMIT '10'::bigint
|
||||
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630512 another_schema_table WHERE true ORDER BY a LIMIT '10'::bigint
|
||||
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630513 another_schema_table WHERE true ORDER BY a LIMIT '10'::bigint
|
||||
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630514 another_schema_table WHERE true ORDER BY a LIMIT '10'::bigint
|
||||
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
||||
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
||||
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
||||
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
||||
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630511 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_90630511'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO UPDATE SET b = (excluded.b OPERATOR(pg_catalog.+) 1) RETURNING citus_table_alias.a, citus_table_alias.b
|
||||
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630512 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_90630512'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO UPDATE SET b = (excluded.b OPERATOR(pg_catalog.+) 1) RETURNING citus_table_alias.a, citus_table_alias.b
|
||||
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630513 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_90630513'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO UPDATE SET b = (excluded.b OPERATOR(pg_catalog.+) 1) RETURNING citus_table_alias.a, citus_table_alias.b
|
||||
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630514 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_90630514'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO UPDATE SET b = (excluded.b OPERATOR(pg_catalog.+) 1) RETURNING citus_table_alias.a, citus_table_alias.b
|
||||
a | b
|
||||
---------------------------------------------------------------------
|
||||
1 |
|
||||
2 |
|
||||
3 |
|
||||
4 |
|
||||
5 |
|
||||
6 |
|
||||
7 |
|
||||
8 |
|
||||
9 |
|
||||
10 |
|
||||
(10 rows)
|
||||
|
||||
-- INSERT .. SELECT with co-located intermediate result for non-binary input
|
||||
WITH cte_1 AS
|
||||
(INSERT INTO non_binary_copy_test SELECT * FROM non_binary_copy_test LIMIT 10000 ON CONFLICT (key) DO UPDATE SET value = (0, 'citus0')::new_type RETURNING value)
|
||||
SELECT count(*) FROM cte_1;
|
||||
NOTICE: executing the command locally: SELECT key, value FROM single_node.non_binary_copy_test_90630515 non_binary_copy_test WHERE true LIMIT '10000'::bigint
|
||||
NOTICE: executing the command locally: SELECT key, value FROM single_node.non_binary_copy_test_90630516 non_binary_copy_test WHERE true LIMIT '10000'::bigint
|
||||
NOTICE: executing the command locally: SELECT key, value FROM single_node.non_binary_copy_test_90630517 non_binary_copy_test WHERE true LIMIT '10000'::bigint
|
||||
NOTICE: executing the command locally: SELECT key, value FROM single_node.non_binary_copy_test_90630518 non_binary_copy_test WHERE true LIMIT '10000'::bigint
|
||||
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
||||
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
||||
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
||||
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
||||
NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630515 AS citus_table_alias (key, value) SELECT key, value FROM read_intermediate_result('insert_select_XXX_90630515'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.value
|
||||
NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630516 AS citus_table_alias (key, value) SELECT key, value FROM read_intermediate_result('insert_select_XXX_90630516'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.value
|
||||
NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630517 AS citus_table_alias (key, value) SELECT key, value FROM read_intermediate_result('insert_select_XXX_90630517'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.value
|
||||
NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630518 AS citus_table_alias (key, value) SELECT key, value FROM read_intermediate_result('insert_select_XXX_90630518'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.value
|
||||
NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'text'::citus_copy_format) intermediate_result(value single_node.new_type)) cte_1
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
1001
|
||||
(1 row)
|
||||
|
||||
-- test with NULL columns
|
||||
ALTER TABLE non_binary_copy_test ADD COLUMN z INT;
|
||||
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90630515, 'single_node', 'ALTER TABLE non_binary_copy_test ADD COLUMN z INT;')
|
||||
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90630516, 'single_node', 'ALTER TABLE non_binary_copy_test ADD COLUMN z INT;')
|
||||
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90630517, 'single_node', 'ALTER TABLE non_binary_copy_test ADD COLUMN z INT;')
|
||||
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90630518, 'single_node', 'ALTER TABLE non_binary_copy_test ADD COLUMN z INT;')
|
||||
WITH cte_1 AS
|
||||
(INSERT INTO non_binary_copy_test SELECT * FROM non_binary_copy_test LIMIT 10000 ON CONFLICT (key) DO UPDATE SET value = (0, 'citus0')::new_type RETURNING z)
|
||||
SELECT bool_and(z is null) FROM cte_1;
|
||||
NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630515 non_binary_copy_test WHERE true LIMIT '10000'::bigint
|
||||
NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630516 non_binary_copy_test WHERE true LIMIT '10000'::bigint
|
||||
NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630517 non_binary_copy_test WHERE true LIMIT '10000'::bigint
|
||||
NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630518 non_binary_copy_test WHERE true LIMIT '10000'::bigint
|
||||
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
||||
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
||||
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
||||
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
||||
NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630515 AS citus_table_alias (key, value, z) SELECT key, value, z FROM read_intermediate_result('insert_select_XXX_90630515'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.z
|
||||
NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630516 AS citus_table_alias (key, value, z) SELECT key, value, z FROM read_intermediate_result('insert_select_XXX_90630516'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.z
|
||||
NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630517 AS citus_table_alias (key, value, z) SELECT key, value, z FROM read_intermediate_result('insert_select_XXX_90630517'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.z
|
||||
NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630518 AS citus_table_alias (key, value, z) SELECT key, value, z FROM read_intermediate_result('insert_select_XXX_90630518'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.z
|
||||
NOTICE: executing the command locally: SELECT bool_and((z IS NULL)) AS bool_and FROM (SELECT intermediate_result.z FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(z integer)) cte_1
|
||||
bool_and
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
-- test with type coersion (int -> text) and also NULL values with coersion
|
||||
WITH cte_1 AS
|
||||
(INSERT INTO non_binary_copy_test SELECT * FROM non_binary_copy_test LIMIT 10000 ON CONFLICT (key) DO UPDATE SET value = (0, 'citus0')::new_type RETURNING key, z)
|
||||
SELECT count(DISTINCT key::text), count(DISTINCT z::text) FROM cte_1;
|
||||
NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630515 non_binary_copy_test WHERE true LIMIT '10000'::bigint
|
||||
NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630516 non_binary_copy_test WHERE true LIMIT '10000'::bigint
|
||||
NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630517 non_binary_copy_test WHERE true LIMIT '10000'::bigint
|
||||
NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630518 non_binary_copy_test WHERE true LIMIT '10000'::bigint
|
||||
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
||||
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
||||
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
||||
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
||||
NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630515 AS citus_table_alias (key, value, z) SELECT key, value, z FROM read_intermediate_result('insert_select_XXX_90630515'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.key, citus_table_alias.z
|
||||
NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630516 AS citus_table_alias (key, value, z) SELECT key, value, z FROM read_intermediate_result('insert_select_XXX_90630516'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.key, citus_table_alias.z
|
||||
NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630517 AS citus_table_alias (key, value, z) SELECT key, value, z FROM read_intermediate_result('insert_select_XXX_90630517'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.key, citus_table_alias.z
|
||||
NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630518 AS citus_table_alias (key, value, z) SELECT key, value, z FROM read_intermediate_result('insert_select_XXX_90630518'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.key, citus_table_alias.z
|
||||
NOTICE: executing the command locally: SELECT count(DISTINCT (key)::text) AS count, count(DISTINCT (z)::text) AS count FROM (SELECT intermediate_result.key, intermediate_result.z FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, z integer)) cte_1
|
||||
count | count
|
||||
---------------------------------------------------------------------
|
||||
1001 | 0
|
||||
(1 row)
|
||||
|
||||
-- lets flush the copy often to make sure everyhing is fine
|
||||
SET citus.local_copy_flush_threshold TO 1;
|
||||
TRUNCATE another_schema_table;
|
||||
NOTICE: executing the command locally: TRUNCATE TABLE single_node.another_schema_table_xxxxx CASCADE
|
||||
NOTICE: executing the command locally: TRUNCATE TABLE single_node.another_schema_table_xxxxx CASCADE
|
||||
NOTICE: executing the command locally: TRUNCATE TABLE single_node.another_schema_table_xxxxx CASCADE
|
||||
NOTICE: executing the command locally: TRUNCATE TABLE single_node.another_schema_table_xxxxx CASCADE
|
||||
INSERT INTO another_schema_table(a) SELECT i from generate_Series(0,10000)i;
|
||||
NOTICE: executing the copy locally for shard xxxxx
|
||||
NOTICE: executing the copy locally for shard xxxxx
|
||||
NOTICE: executing the copy locally for shard xxxxx
|
||||
NOTICE: executing the copy locally for shard xxxxx
|
||||
WITH cte_1 AS
|
||||
(INSERT INTO another_schema_table SELECT * FROM another_schema_table ORDER BY a LIMIT 10000 ON CONFLICT(a) DO NOTHING RETURNING *)
|
||||
SELECT count(*) FROM cte_1;
|
||||
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630511 another_schema_table WHERE true ORDER BY a LIMIT '10000'::bigint
|
||||
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630512 another_schema_table WHERE true ORDER BY a LIMIT '10000'::bigint
|
||||
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630513 another_schema_table WHERE true ORDER BY a LIMIT '10000'::bigint
|
||||
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630514 another_schema_table WHERE true ORDER BY a LIMIT '10000'::bigint
|
||||
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
||||
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
||||
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
||||
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
||||
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630511 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_90630511'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO NOTHING RETURNING citus_table_alias.a, citus_table_alias.b
|
||||
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630512 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_90630512'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO NOTHING RETURNING citus_table_alias.a, citus_table_alias.b
|
||||
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630513 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_90630513'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO NOTHING RETURNING citus_table_alias.a, citus_table_alias.b
|
||||
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630514 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_90630514'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO NOTHING RETURNING citus_table_alias.a, citus_table_alias.b
|
||||
NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte_1
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
WITH cte_1 AS
|
||||
(INSERT INTO non_binary_copy_test SELECT * FROM non_binary_copy_test LIMIT 10000 ON CONFLICT (key) DO UPDATE SET value = (0, 'citus0')::new_type RETURNING z)
|
||||
SELECT bool_and(z is null) FROM cte_1;
|
||||
NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630515 non_binary_copy_test WHERE true LIMIT '10000'::bigint
|
||||
NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630516 non_binary_copy_test WHERE true LIMIT '10000'::bigint
|
||||
NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630517 non_binary_copy_test WHERE true LIMIT '10000'::bigint
|
||||
NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630518 non_binary_copy_test WHERE true LIMIT '10000'::bigint
|
||||
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
||||
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
||||
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
||||
NOTICE: executing the copy locally for colocated file with shard xxxxx
|
||||
NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630515 AS citus_table_alias (key, value, z) SELECT key, value, z FROM read_intermediate_result('insert_select_XXX_90630515'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.z
|
||||
NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630516 AS citus_table_alias (key, value, z) SELECT key, value, z FROM read_intermediate_result('insert_select_XXX_90630516'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.z
|
||||
NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630517 AS citus_table_alias (key, value, z) SELECT key, value, z FROM read_intermediate_result('insert_select_XXX_90630517'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.z
|
||||
NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630518 AS citus_table_alias (key, value, z) SELECT key, value, z FROM read_intermediate_result('insert_select_XXX_90630518'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.z
|
||||
NOTICE: executing the command locally: SELECT bool_and((z IS NULL)) AS bool_and FROM (SELECT intermediate_result.z FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(z integer)) cte_1
|
||||
bool_and
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
RESET citus.local_copy_flush_threshold;
|
||||
-- if the local execution is disabled, we cannot failover to
|
||||
-- local execution and the queries would fail
|
||||
SET citus.enable_local_execution TO false;
|
||||
|
|
|
@ -221,7 +221,7 @@ SELECT create_distributed_table('dist_table1', 'a');
|
|||
ROLLBACK;
|
||||
|
||||
RESET citus.enable_cte_inlining;
|
||||
CREATE table ref_table(x int, y int);
|
||||
CREATE table ref_table(x int PRIMARY KEY, y int);
|
||||
-- this will be replicated to the coordinator because of add_coordinator test
|
||||
SELECT create_reference_table('ref_table');
|
||||
|
||||
|
@ -233,6 +233,14 @@ INSERT INTO ref_table SELECT *, * FROM generate_series(1, 100);
|
|||
SELECT COUNT(*) FROM test JOIN ref_table USING(x);
|
||||
ROLLBACK;
|
||||
|
||||
-- writing to local file and remote intermediate files
|
||||
-- at the same time
|
||||
INSERT INTO ref_table SELECT *, * FROM generate_series(1, 100);
|
||||
|
||||
WITH cte_1 AS (
|
||||
INSERT INTO ref_table SELECT * FROM ref_table LIMIT 10000 ON CONFLICT (x) DO UPDATE SET y = EXCLUDED.y + 1 RETURNING *)
|
||||
SELECT count(*) FROM cte_1;
|
||||
|
||||
-- issue #4237: preventing empty placement creation on coordinator
|
||||
CREATE TABLE test_append_table(a int);
|
||||
SELECT create_distributed_table('test_append_table', 'a', 'append');
|
||||
|
|
|
@ -79,6 +79,10 @@ CREATE TABLE local(c int, d int);
|
|||
CREATE TABLE public.another_schema_table(a int, b int);
|
||||
SELECT create_distributed_table('public.another_schema_table', 'a');
|
||||
|
||||
CREATE TABLE non_binary_copy_test (key int PRIMARY KEY, value new_type);
|
||||
SELECT create_distributed_table('non_binary_copy_test', 'key');
|
||||
INSERT INTO non_binary_copy_test SELECT i, (i, 'citus9.5')::new_type FROM generate_series(0,1000)i;
|
||||
|
||||
-- Confirm the basics work
|
||||
INSERT INTO test VALUES (1, 2), (3, 4), (5, 6), (2, 7), (4, 5);
|
||||
SELECT * FROM test WHERE x = 1;
|
||||
|
@ -783,7 +787,6 @@ ROLLBACK;
|
|||
\c - - - :master_port
|
||||
SET search_path TO single_node;
|
||||
|
||||
|
||||
-- simulate that even if there is no connection slots
|
||||
-- to connect, Citus can switch to local execution
|
||||
SET citus.force_max_query_parallelization TO false;
|
||||
|
@ -817,9 +820,48 @@ ROLLBACK;
|
|||
WITH cte_1 AS (SELECT * FROM another_schema_table LIMIT 1000)
|
||||
SELECT count(*) FROM cte_1;
|
||||
|
||||
-- this is to get ready for the next tests
|
||||
TRUNCATE another_schema_table;
|
||||
|
||||
-- copy can use local execution even if there is no connection available
|
||||
COPY another_schema_table(a) FROM PROGRAM 'seq 32';
|
||||
|
||||
-- INSERT .. SELECT with co-located intermediate results
|
||||
SET citus.log_remote_commands to false;
|
||||
CREATE UNIQUE INDEX another_schema_table_pk ON another_schema_table(a);
|
||||
|
||||
SET citus.log_local_commands to true;
|
||||
INSERT INTO another_schema_table SELECT * FROM another_schema_table LIMIT 10000 ON CONFLICT(a) DO NOTHING;
|
||||
INSERT INTO another_schema_table SELECT * FROM another_schema_table ORDER BY a LIMIT 10 ON CONFLICT(a) DO UPDATE SET b = EXCLUDED.b + 1 RETURNING *;
|
||||
|
||||
-- INSERT .. SELECT with co-located intermediate result for non-binary input
|
||||
WITH cte_1 AS
|
||||
(INSERT INTO non_binary_copy_test SELECT * FROM non_binary_copy_test LIMIT 10000 ON CONFLICT (key) DO UPDATE SET value = (0, 'citus0')::new_type RETURNING value)
|
||||
SELECT count(*) FROM cte_1;
|
||||
|
||||
-- test with NULL columns
|
||||
ALTER TABLE non_binary_copy_test ADD COLUMN z INT;
|
||||
WITH cte_1 AS
|
||||
(INSERT INTO non_binary_copy_test SELECT * FROM non_binary_copy_test LIMIT 10000 ON CONFLICT (key) DO UPDATE SET value = (0, 'citus0')::new_type RETURNING z)
|
||||
SELECT bool_and(z is null) FROM cte_1;
|
||||
|
||||
-- test with type coersion (int -> text) and also NULL values with coersion
|
||||
WITH cte_1 AS
|
||||
(INSERT INTO non_binary_copy_test SELECT * FROM non_binary_copy_test LIMIT 10000 ON CONFLICT (key) DO UPDATE SET value = (0, 'citus0')::new_type RETURNING key, z)
|
||||
SELECT count(DISTINCT key::text), count(DISTINCT z::text) FROM cte_1;
|
||||
|
||||
-- lets flush the copy often to make sure everyhing is fine
|
||||
SET citus.local_copy_flush_threshold TO 1;
|
||||
TRUNCATE another_schema_table;
|
||||
INSERT INTO another_schema_table(a) SELECT i from generate_Series(0,10000)i;
|
||||
WITH cte_1 AS
|
||||
(INSERT INTO another_schema_table SELECT * FROM another_schema_table ORDER BY a LIMIT 10000 ON CONFLICT(a) DO NOTHING RETURNING *)
|
||||
SELECT count(*) FROM cte_1;
|
||||
WITH cte_1 AS
|
||||
(INSERT INTO non_binary_copy_test SELECT * FROM non_binary_copy_test LIMIT 10000 ON CONFLICT (key) DO UPDATE SET value = (0, 'citus0')::new_type RETURNING z)
|
||||
SELECT bool_and(z is null) FROM cte_1;
|
||||
|
||||
RESET citus.local_copy_flush_threshold;
|
||||
-- if the local execution is disabled, we cannot failover to
|
||||
-- local execution and the queries would fail
|
||||
SET citus.enable_local_execution TO false;
|
||||
|
|
Loading…
Reference in New Issue