mirror of https://github.com/citusdata/citus.git
Merge pull request #3557 from citusdata/enh/localExecutionCopy
add local copy execution (pull/3619/head^2)
commit e5a2bbb2bd

@@ -0,0 +1,214 @@
/*-------------------------------------------------------------------------
 *
 * local_multi_copy.c
 *    Commands for running a copy locally
 *
 * For each local placement, we have a buffer. When we receive a slot
 * from a copy, the slot is put into the corresponding buffer based
 * on the shard id. When the buffer size exceeds the threshold, a local
 * copy is done. Also, if we reach the end of the copy, we send the
 * current buffer for local copy.
 *
 * The existing logic and formats from multi_copy.c are used; therefore,
 * even if the user did not run COPY with the binary format, it is
 * possible that we are going to be using the binary format internally.
 *
 * Copyright (c) Citus Data, Inc.
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"
#include "commands/copy.h"
#include "catalog/namespace.h"
#include "parser/parse_relation.h"
#include "utils/lsyscache.h"
#include "nodes/makefuncs.h"
#include "safe_lib.h"
#include <netinet/in.h> /* for htons */

#include "distributed/transmit.h"
#include "distributed/commands/multi_copy.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/local_executor.h"
#include "distributed/local_multi_copy.h"
#include "distributed/shard_utils.h"

static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead);
static void AddSlotToBuffer(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest,
							CopyOutState localCopyOutState);

static bool ShouldSendCopyNow(StringInfo buffer);
static void DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId,
						CopyStmt *copyStatement, bool isEndOfCopy);
static bool ShouldAddBinaryHeaders(StringInfo buffer, bool isBinary);

/*
 * LocalCopyBuffer is used in the copy callback to return the copied rows.
 * The reason this is a global variable is that we cannot pass an additional
 * argument to the copy callback.
 */
static StringInfo LocalCopyBuffer;

/*
 * WriteTupleToLocalShard adds the given slot and does a local copy if
 * this is the end of the copy, or the buffer size exceeds the threshold.
 */
void
WriteTupleToLocalShard(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest,
					   int64 shardId, CopyOutState localCopyOutState)
{
	/*
	 * Since we are doing a local copy, the following statements should
	 * use local execution to see the changes.
	 */
	TransactionAccessedLocalPlacement = true;

	bool isBinaryCopy = localCopyOutState->binary;
	if (ShouldAddBinaryHeaders(localCopyOutState->fe_msgbuf, isBinaryCopy))
	{
		AppendCopyBinaryHeaders(localCopyOutState);
	}

	AddSlotToBuffer(slot, copyDest, localCopyOutState);

	if (ShouldSendCopyNow(localCopyOutState->fe_msgbuf))
	{
		if (isBinaryCopy)
		{
			/*
			 * We're going to flush the buffer to disk by effectively doing a full
			 * COPY command. Hence we also need to add footers to the current buffer.
			 */
			AppendCopyBinaryFooters(localCopyOutState);
		}
		bool isEndOfCopy = false;
		DoLocalCopy(localCopyOutState->fe_msgbuf, copyDest->distributedRelationId,
					shardId, copyDest->copyStatement, isEndOfCopy);
		resetStringInfo(localCopyOutState->fe_msgbuf);
	}
}


/*
 * FinishLocalCopyToShard finishes the local copy for the shard with the
 * given shard id.
 */
void
FinishLocalCopyToShard(CitusCopyDestReceiver *copyDest, int64 shardId,
					   CopyOutState localCopyOutState)
{
	bool isBinaryCopy = localCopyOutState->binary;
	if (isBinaryCopy)
	{
		AppendCopyBinaryFooters(localCopyOutState);
	}
	bool isEndOfCopy = true;
	DoLocalCopy(localCopyOutState->fe_msgbuf, copyDest->distributedRelationId, shardId,
				copyDest->copyStatement, isEndOfCopy);
}


/*
 * AddSlotToBuffer serializes the given slot and adds it to
 * the buffer in localCopyOutState.
 */
static void
AddSlotToBuffer(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest,
				CopyOutState localCopyOutState)
{
	Datum *columnValues = slot->tts_values;
	bool *columnNulls = slot->tts_isnull;
	FmgrInfo *columnOutputFunctions = copyDest->columnOutputFunctions;
	CopyCoercionData *columnCoercionPaths = copyDest->columnCoercionPaths;

	AppendCopyRowData(columnValues, columnNulls, copyDest->tupleDescriptor,
					  localCopyOutState, columnOutputFunctions,
					  columnCoercionPaths);
}


/*
 * ShouldSendCopyNow returns true if the given buffer size exceeds the
 * local copy buffer size threshold.
 */
static bool
ShouldSendCopyNow(StringInfo buffer)
{
	return buffer->len > LOCAL_COPY_FLUSH_THRESHOLD;
}


/*
 * DoLocalCopy finds the shard table from the distributed relation id and
 * copies the given buffer into the shard.
 * CopyFrom calls ReadFromLocalBufferCallback to read bytes from the buffer
 * as though it were reading from stdin. It then parses the tuples and
 * writes them to the shardOid table.
 */
static void
DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId, CopyStmt *copyStatement,
			bool isEndOfCopy)
{
	/*
	 * Set the buffer as a global variable to allow ReadFromLocalBufferCallback
	 * to read from it. We cannot pass additional arguments to
	 * ReadFromLocalBufferCallback.
	 */
	LocalCopyBuffer = buffer;

	Oid shardOid = GetShardLocalTableOid(relationId, shardId);
	Relation shard = heap_open(shardOid, RowExclusiveLock);
	ParseState *pState = make_parsestate(NULL);

	/* p_rtable of pState is set so that we can check constraints. */
	pState->p_rtable = CreateRangeTable(shard, ACL_INSERT);

	CopyState cstate = BeginCopyFrom(pState, shard, NULL, false,
									 ReadFromLocalBufferCallback,
									 copyStatement->attlist, copyStatement->options);
	CopyFrom(cstate);
	EndCopyFrom(cstate);

	heap_close(shard, NoLock);
	free_parsestate(pState);
}


/*
 * ShouldAddBinaryHeaders returns true if the given buffer
 * is empty and the format is binary.
 */
static bool
ShouldAddBinaryHeaders(StringInfo buffer, bool isBinary)
{
	if (!isBinary)
	{
		return false;
	}
	return buffer->len == 0;
}


/*
 * ReadFromLocalBufferCallback is the copy callback. It copies up to maxRead
 * bytes from LocalCopyBuffer into outBuf and returns the number of bytes read.
 */
static int
ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead)
{
	int bytesRead = 0;
	int avail = LocalCopyBuffer->len - LocalCopyBuffer->cursor;
	int bytesToRead = Min(avail, maxRead);
	if (bytesToRead > 0)
	{
		memcpy_s(outBuf, bytesToRead,
				 &LocalCopyBuffer->data[LocalCopyBuffer->cursor], bytesToRead);
	}
	bytesRead += bytesToRead;
	LocalCopyBuffer->cursor += bytesToRead;

	return bytesRead;
}
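The buffering scheme described in the file header above can be illustrated outside of PostgreSQL. Below is a minimal, self-contained C sketch of the same pattern: rows are appended to a per-shard buffer, the buffer is flushed whenever it crosses a size threshold, and a final flush happens at end of copy. This is not Citus code; ShardBuffer, append_row, and flush_buffer are hypothetical stand-ins for the CopyOutState fe_msgbuf, WriteTupleToLocalShard, and DoLocalCopy.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define FLUSH_THRESHOLD (512 * 1024)   /* mirrors LOCAL_COPY_FLUSH_THRESHOLD */

typedef struct ShardBuffer
{
	char *data;
	size_t len;
	size_t cap;
} ShardBuffer;

/* stand-in for DoLocalCopy: consume the buffered bytes, then reset */
static void
flush_buffer(ShardBuffer *buf, int isEndOfCopy)
{
	printf("flushing %zu bytes (end of copy: %d)\n", buf->len, isEndOfCopy);
	buf->len = 0;
}

/* stand-in for WriteTupleToLocalShard: append one serialized row, flush if large */
static void
append_row(ShardBuffer *buf, const char *row, size_t rowLen)
{
	if (buf->len + rowLen > buf->cap)
	{
		buf->cap = (buf->len + rowLen) * 2;
		buf->data = realloc(buf->data, buf->cap);
	}
	memcpy(buf->data + buf->len, row, rowLen);
	buf->len += rowLen;

	if (buf->len > FLUSH_THRESHOLD)
	{
		flush_buffer(buf, 0);
	}
}

int
main(void)
{
	ShardBuffer buf = { 0 };
	char row[1024];
	memset(row, 'x', sizeof(row));

	for (int i = 0; i < 1000; i++)
	{
		append_row(&buf, row, sizeof(row));
	}

	/* stand-in for FinishLocalCopyToShard: flush whatever remains */
	if (buf.len > 0)
	{
		flush_buffer(&buf, 1);
	}
	free(buf.data);
	return 0;
}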
@@ -85,6 +85,8 @@
 #include "distributed/shard_pruning.h"
 #include "distributed/version_compat.h"
 #include "distributed/worker_protocol.h"
+#include "distributed/local_multi_copy.h"
+#include "distributed/hash_helpers.h"
 #include "executor/executor.h"
 #include "foreign/foreign.h"
 #include "libpq/libpq.h"

@@ -162,6 +164,9 @@ struct CopyPlacementState
 	/* State of the shard to which the placement belongs. */
 	CopyShardState *shardState;

+	/* node group ID of the placement */
+	int32 groupId;
+
 	/*
 	 * Buffered COPY data. When the placement is the activePlacementState of
 	 * some connection, this is empty. Because in that case we directly

@@ -178,6 +183,12 @@ struct CopyShardState
 	/* Used as hash key. */
 	uint64 shardId;

+	/* used for doing local copy */
+	CopyOutState copyOutState;
+
+	/* containsLocalPlacement is true if we have a local placement for the shard id of this state */
+	bool containsLocalPlacement;
+
 	/* List of CopyPlacementStates for all active placements of the shard. */
 	List *placementStateList;
 };

@@ -232,13 +243,16 @@ static CopyConnectionState * GetConnectionState(HTAB *connectionStateHash,
 												MultiConnection *connection);
 static CopyShardState * GetShardState(uint64 shardId, HTAB *shardStateHash,
 									  HTAB *connectionStateHash, bool stopOnFailure,
-									  bool *found);
+									  bool *found, bool shouldUseLocalCopy,
+									  CopyOutState copyOutState);
 static MultiConnection * CopyGetPlacementConnection(ShardPlacement *placement,
 													bool stopOnFailure);
 static List * ConnectionStateList(HTAB *connectionStateHash);
 static void InitializeCopyShardState(CopyShardState *shardState,
 									 HTAB *connectionStateHash,
-									 uint64 shardId, bool stopOnFailure);
+									 uint64 shardId, bool stopOnFailure,
+									 bool canUseLocalCopy,
+									 CopyOutState copyOutState);
 static void StartPlacementStateCopyCommand(CopyPlacementState *placementState,
 										   CopyStmt *copyStatement,
 										   CopyOutState copyOutState);

@@ -274,6 +288,11 @@ static bool CitusCopyDestReceiverReceive(TupleTableSlot *slot,
 										 DestReceiver *copyDest);
 static void CitusCopyDestReceiverShutdown(DestReceiver *destReceiver);
 static void CitusCopyDestReceiverDestroy(DestReceiver *destReceiver);
+static bool ContainsLocalPlacement(int64 shardId);
+static void FinishLocalCopy(CitusCopyDestReceiver *copyDest);
+static void CloneCopyOutStateForLocalCopy(CopyOutState from, CopyOutState to);
+static bool ShouldExecuteCopyLocally(bool isIntermediateResult);
+static void LogLocalCopyExecution(uint64 shardId);


 /* exports for SQL callable functions */
@@ -1985,6 +2004,55 @@ CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColu
 }


+/*
+ * ShouldExecuteCopyLocally returns true if the current copy
+ * operation should be done locally for local placements.
+ */
+static bool
+ShouldExecuteCopyLocally(bool isIntermediateResult)
+{
+	if (!EnableLocalExecution)
+	{
+		return false;
+	}
+
+	/*
+	 * Intermediate results are written to a file, files are visible to all
+	 * transactions, and we use a custom copy format for them; therefore we
+	 * use the existing logic for intermediate results.
+	 */
+	if (isIntermediateResult)
+	{
+		return false;
+	}
+
+	if (TransactionAccessedLocalPlacement)
+	{
+		/*
+		 * For various reasons, including the transaction visibility
+		 * rules (e.g., read-your-own-writes), we have to use local
+		 * execution again if it has already happened within this
+		 * transaction block.
+		 *
+		 * We might error out later in the execution if it is not suitable
+		 * to execute the tasks locally.
+		 */
+		Assert(IsMultiStatementTransaction() || InCoordinatedTransaction());
+
+		/*
+		 * TODO: A future improvement could be to keep track of which placements
+		 * have been locally executed, and then only use local execution for
+		 * those placements. That'd help to benefit more from parallelism.
+		 */
+
+		return true;
+	}
+
+	/*
+	 * If we connected to the local node via a connection, we might not be able
+	 * to see some previous changes that were done via that connection.
+	 */
+	return !TransactionConnectedToLocalGroup && IsMultiStatementTransaction();
+}
+
+
 /*
  * CitusCopyDestReceiverStartup implements the rStartup interface of
  * CitusCopyDestReceiver. It opens the relation, acquires necessary

@@ -1996,6 +2064,8 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
 {
 	CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) dest;

+	bool isIntermediateResult = copyDest->intermediateResultIdPrefix != NULL;
+	copyDest->shouldUseLocalCopy = ShouldExecuteCopyLocally(isIntermediateResult);
 	Oid tableId = copyDest->distributedRelationId;

 	char *relationName = get_rel_name(tableId);
@@ -2013,9 +2083,6 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
 	const char *delimiterCharacter = "\t";
 	const char *nullPrintCharacter = "\\N";

-	/* Citus currently doesn't know how to handle COPY command locally */
-	ErrorIfTransactionAccessedPlacementsLocally();
-
 	/* look up table properties */
 	Relation distributedRelation = heap_open(tableId, RowExclusiveLock);
 	CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(tableId);

@@ -2145,6 +2212,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
 		}
 	}

+
 	copyStatement->query = NULL;
 	copyStatement->attlist = attributeList;
 	copyStatement->is_from = true;

@@ -2228,7 +2296,9 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest
 	CopyShardState *shardState = GetShardState(shardId, copyDest->shardStateHash,
 											   copyDest->connectionStateHash,
 											   stopOnFailure,
-											   &cachedShardStateFound);
+											   &cachedShardStateFound,
+											   copyDest->shouldUseLocalCopy,
+											   copyDest->copyOutState);
 	if (!cachedShardStateFound)
 	{
 		firstTupleInShard = true;

@@ -2249,6 +2319,12 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest
 		}
 	}

+	if (copyDest->shouldUseLocalCopy && shardState->containsLocalPlacement)
+	{
+		WriteTupleToLocalShard(slot, copyDest, shardId, shardState->copyOutState);
+	}
+
+
 	foreach(placementStateCell, shardState->placementStateList)
 	{
 		CopyPlacementState *currentPlacementState = lfirst(placementStateCell);

@@ -2276,6 +2352,7 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest
 		{
 			StartPlacementStateCopyCommand(currentPlacementState, copyStatement,
 										   copyOutState);
+
 			dlist_delete(&currentPlacementState->bufferedPlacementNode);
 			connectionState->activePlacementState = currentPlacementState;
@@ -2330,6 +2407,30 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest
 }


+/*
+ * ContainsLocalPlacement returns true if the current node has
+ * a local placement for the given shard id.
+ */
+static bool
+ContainsLocalPlacement(int64 shardId)
+{
+	ListCell *placementCell = NULL;
+	List *activePlacementList = ActiveShardPlacementList(shardId);
+	int32 localGroupId = GetLocalGroupId();
+
+	foreach(placementCell, activePlacementList)
+	{
+		ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell);
+
+		if (placement->groupId == localGroupId)
+		{
+			return true;
+		}
+	}
+	return false;
+}
+
+
 /*
  * ShardIdForTuple returns the id of the shard to which the given tuple belongs.
  */

@@ -2407,6 +2508,7 @@ CitusCopyDestReceiverShutdown(DestReceiver *destReceiver)
 	Relation distributedRelation = copyDest->distributedRelation;

 	List *connectionStateList = ConnectionStateList(connectionStateHash);
+	FinishLocalCopy(copyDest);

 	PG_TRY();
 	{

@@ -2434,6 +2536,28 @@ CitusCopyDestReceiverShutdown(DestReceiver *destReceiver)
 }


+/*
+ * FinishLocalCopy sends the remaining copies for local placements.
+ */
+static void
+FinishLocalCopy(CitusCopyDestReceiver *copyDest)
+{
+	HTAB *shardStateHash = copyDest->shardStateHash;
+	HASH_SEQ_STATUS status;
+	CopyShardState *copyShardState;
+
+	foreach_htab(copyShardState, &status, shardStateHash)
+	{
+		if (copyShardState->copyOutState != NULL &&
+			copyShardState->copyOutState->fe_msgbuf->len > 0)
+		{
+			FinishLocalCopyToShard(copyDest, copyShardState->shardId,
+								   copyShardState->copyOutState);
+		}
+	}
+}
+
+
 /*
  * ShutdownCopyConnectionState ends the copy command for the current active
  * placement on connection, and then sends the rest of the buffers over the
@@ -2864,7 +2988,6 @@ CheckCopyPermissions(CopyStmt *copyStatement)
 	/* *INDENT-OFF* */
 	bool is_from = copyStatement->is_from;
 	Relation rel;
-	Oid relid;
 	List *range_table = NIL;
 	TupleDesc tupDesc;
 	AclMode required_access = (is_from ? ACL_INSERT : ACL_SELECT);

@@ -2874,15 +2997,8 @@ CheckCopyPermissions(CopyStmt *copyStatement)
 	rel = heap_openrv(copyStatement->relation,
 					  is_from ? RowExclusiveLock : AccessShareLock);

-	relid = RelationGetRelid(rel);
-
-	RangeTblEntry *rte = makeNode(RangeTblEntry);
-	rte->rtekind = RTE_RELATION;
-	rte->relid = relid;
-	rte->relkind = rel->rd_rel->relkind;
-	rte->requiredPerms = required_access;
-	range_table = list_make1(rte);
-
+	range_table = CreateRangeTable(rel, required_access);
+	RangeTblEntry *rte = (RangeTblEntry *) linitial(range_table);
 	tupDesc = RelationGetDescr(rel);

 	attnums = CopyGetAttnums(tupDesc, rel, copyStatement->attlist);

@@ -2909,6 +3025,21 @@ CheckCopyPermissions(CopyStmt *copyStatement)
 }


+/*
+ * CreateRangeTable creates a range table with the given relation.
+ */
+List *
+CreateRangeTable(Relation rel, AclMode requiredAccess)
+{
+	RangeTblEntry *rte = makeNode(RangeTblEntry);
+	rte->rtekind = RTE_RELATION;
+	rte->relid = rel->rd_id;
+	rte->relkind = rel->rd_rel->relkind;
+	rte->requiredPerms = requiredAccess;
+	return list_make1(rte);
+}
+
+
 /* Helper for CheckCopyPermissions(), copied from postgres */
 static List *
 CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
@@ -3087,14 +3218,16 @@ ConnectionStateList(HTAB *connectionStateHash)
 */
static CopyShardState *
GetShardState(uint64 shardId, HTAB *shardStateHash,
-			  HTAB *connectionStateHash, bool stopOnFailure, bool *found)
+			  HTAB *connectionStateHash, bool stopOnFailure, bool *found,
+			  bool shouldUseLocalCopy, CopyOutState copyOutState)
{
	CopyShardState *shardState = (CopyShardState *) hash_search(shardStateHash, &shardId,
																HASH_ENTER, found);
	if (!*found)
	{
		InitializeCopyShardState(shardState, connectionStateHash,
-								 shardId, stopOnFailure);
+								 shardId, stopOnFailure, shouldUseLocalCopy,
+								 copyOutState);
	}

	return shardState;

@@ -3109,7 +3242,8 @@ GetShardState(uint64 shardId, HTAB *shardStateHash,
 static void
 InitializeCopyShardState(CopyShardState *shardState,
						  HTAB *connectionStateHash, uint64 shardId,
-						  bool stopOnFailure)
+						  bool stopOnFailure, bool shouldUseLocalCopy,
+						  CopyOutState copyOutState)
 {
	ListCell *placementCell = NULL;
	int failedPlacementCount = 0;

@@ -3121,6 +3255,7 @@ InitializeCopyShardState(CopyShardState *shardState,
		ALLOCSET_DEFAULT_INITSIZE,
		ALLOCSET_DEFAULT_MAXSIZE);

+
	/* release active placement list at the end of this function */
	MemoryContext oldContext = MemoryContextSwitchTo(localContext);

@@ -3130,11 +3265,22 @@ InitializeCopyShardState(CopyShardState *shardState,
	shardState->shardId = shardId;
	shardState->placementStateList = NIL;
+	shardState->copyOutState = NULL;
+	shardState->containsLocalPlacement = ContainsLocalPlacement(shardId);
+
	foreach(placementCell, activePlacementList)
	{
		ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell);

+		if (shouldUseLocalCopy && placement->groupId == GetLocalGroupId())
+		{
+			shardState->copyOutState = (CopyOutState) palloc0(sizeof(*copyOutState));
+			CloneCopyOutStateForLocalCopy(copyOutState, shardState->copyOutState);
+			LogLocalCopyExecution(shardId);
+			continue;
+		}
+
		MultiConnection *connection =
			CopyGetPlacementConnection(placement, stopOnFailure);
		if (connection == NULL)

@@ -3158,6 +3304,7 @@ InitializeCopyShardState(CopyShardState *shardState,
	CopyPlacementState *placementState = palloc0(sizeof(CopyPlacementState));
	placementState->shardState = shardState;
	placementState->data = makeStringInfo();
+	placementState->groupId = placement->groupId;
	placementState->connectionState = connectionState;

	/*

@@ -3188,6 +3335,43 @@ InitializeCopyShardState(CopyShardState *shardState,
 }


+/*
+ * CloneCopyOutStateForLocalCopy creates a shallow copy of the CopyOutState with a new
+ * fe_msgbuf. We keep a separate CopyOutState for every local shard placement, because
+ * in case of local copy we serialize and buffer incoming tuples into fe_msgbuf for each
+ * placement, and the serialization functions take a CopyOutState as a parameter.
+ */
+static void
+CloneCopyOutStateForLocalCopy(CopyOutState from, CopyOutState to)
+{
+	to->attnumlist = from->attnumlist;
+	to->binary = from->binary;
+	to->copy_dest = from->copy_dest;
+	to->delim = from->delim;
+	to->file_encoding = from->file_encoding;
+	to->need_transcoding = from->need_transcoding;
+	to->null_print = from->null_print;
+	to->null_print_client = from->null_print_client;
+	to->rowcontext = from->rowcontext;
+	to->fe_msgbuf = makeStringInfo();
+}
+
+
+/*
+ * LogLocalCopyExecution logs that the copy will be done locally for
+ * the given shard.
+ */
+static void
+LogLocalCopyExecution(uint64 shardId)
+{
+	if (!(LogRemoteCommands || LogLocalCommands))
+	{
+		return;
+	}
+	ereport(NOTICE, (errmsg("executing the copy locally for shard %lu", shardId)));
+}
+
+
 /*
  * CopyGetPlacementConnection assigns a connection to the given placement. If
  * a connection has already been assigned to the placement in the current transaction
@@ -553,6 +553,10 @@ CacheLocalPlanForShardQuery(Task *task, DistributedPlan *originalDistributedPlan
 PlannedStmt *
 GetCachedLocalPlan(Task *task, DistributedPlan *distributedPlan)
 {
+	if (distributedPlan->workerJob == NULL)
+	{
+		return NULL;
+	}
 	List *cachedPlanList = distributedPlan->workerJob->localPlannedStatements;
 	LocalPlannedStatement *localPlannedStatement = NULL;
@@ -26,6 +26,7 @@
 #include "distributed/listutils.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/multi_router_planner.h"
+#include "distributed/local_executor.h"
 #include "distributed/distributed_planner.h"
 #include "distributed/recursive_planning.h"
 #include "distributed/relation_access_tracking.h"

@@ -135,15 +136,6 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node)
 	bool hasReturning = distributedPlan->hasReturning;
 	HTAB *shardStateHash = NULL;

-	/*
-	 * INSERT .. SELECT via coordinator consists of two steps, a SELECT is
-	 * followd by a COPY. If the SELECT is executed locally, then the COPY
-	 * would fail since Citus currently doesn't know how to handle COPY
-	 * locally. So, to prevent the command fail, we simply disable local
-	 * execution.
-	 */
-	DisableLocalExecution();
-
 	/* select query to execute */
 	Query *selectQuery = BuildSelectForInsertSelect(insertSelectQuery);

@@ -198,7 +190,6 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node)
 		GetDistributedPlan((CustomScan *) selectPlan->planTree);
 	Job *distSelectJob = distSelectPlan->workerJob;
 	List *distSelectTaskList = distSelectJob->taskList;
-	TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState);
 	bool randomAccess = true;
 	bool interTransactions = false;
 	bool binaryFormat =

@@ -280,11 +271,10 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node)
 		scanState->tuplestorestate =
 			tuplestore_begin_heap(randomAccess, interTransactions, work_mem);

-		uint64 rowsInserted = ExecuteTaskListIntoTupleStore(ROW_MODIFY_COMMUTATIVE,
-															taskList,
-															tupleDescriptor,
-															scanState->tuplestorestate,
-															hasReturning);
+		uint64 rowsInserted = ExtractAndExecuteLocalAndRemoteTasks(scanState,
+																   taskList,
+																   ROW_MODIFY_COMMUTATIVE,
+																   hasReturning);

 		executorState->es_processed = rowsInserted;
 	}

@@ -335,17 +325,15 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node)

 	if (prunedTaskList != NIL)
 	{
-		TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState);
 		bool randomAccess = true;
 		bool interTransactions = false;

 		Assert(scanState->tuplestorestate == NULL);
 		scanState->tuplestorestate =
 			tuplestore_begin_heap(randomAccess, interTransactions, work_mem);

-		ExecuteTaskListIntoTupleStore(ROW_MODIFY_COMMUTATIVE, prunedTaskList,
-									  tupleDescriptor, scanState->tuplestorestate,
-									  hasReturning);
+		ExtractAndExecuteLocalAndRemoteTasks(scanState, prunedTaskList,
+											 ROW_MODIFY_COMMUTATIVE,
+											 hasReturning);

 		if (SortReturning && hasReturning)
 		{
@@ -231,6 +231,48 @@ ExecuteLocalTaskList(CitusScanState *scanState, List *taskList)
 }


+/*
+ * ExtractAndExecuteLocalAndRemoteTasks extracts local and remote tasks
+ * if local execution can be used, and executes them.
+ */
+uint64
+ExtractAndExecuteLocalAndRemoteTasks(CitusScanState *scanState,
+									 List *taskList, RowModifyLevel rowModifyLevel,
+									 bool hasReturning)
+{
+	uint64 processedRows = 0;
+	List *localTaskList = NIL;
+	List *remoteTaskList = NIL;
+	TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState);
+
+	if (ShouldExecuteTasksLocally(taskList))
+	{
+		bool readOnlyPlan = false;
+
+		/* set local (if any) & remote tasks */
+		ExtractLocalAndRemoteTasks(readOnlyPlan, taskList, &localTaskList,
+								   &remoteTaskList);
+		processedRows += ExecuteLocalTaskList(scanState, localTaskList);
+	}
+	else
+	{
+		/* all tasks should be executed via remote connections */
+		remoteTaskList = taskList;
+	}
+
+	/* execute remote tasks if any */
+	if (list_length(remoteTaskList) > 0)
+	{
+		processedRows += ExecuteTaskListIntoTupleStore(rowModifyLevel, remoteTaskList,
+													   tupleDescriptor,
+													   scanState->tuplestorestate,
+													   hasReturning);
+	}
+
+	return processedRows;
+}
+
+
 /*
  * ExtractParametersForLocalExecution extracts parameter types and values
  * from the given ParamListInfo structure, and fills parameter type and
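ExtractAndExecuteLocalAndRemoteTasks above splits one task list into a locally executed part and a remote part, then sums the processed row counts. A minimal, self-contained C sketch of that split-then-combine shape follows; Task, is-local flags, and the execute_* helpers are hypothetical stand-ins, not the Citus API.

#include <stdio.h>

typedef struct Task
{
	int taskId;
	int isLocal;   /* stand-in for "has a placement in the local group" */
} Task;

/* stand-in for ExecuteLocalTaskList: returns a processed row count */
static unsigned long
execute_locally(const Task *tasks, int count)
{
	(void) tasks;
	return (unsigned long) count; /* pretend each task processes one row */
}

/* stand-in for ExecuteTaskListIntoTupleStore over connections */
static unsigned long
execute_remotely(const Task *tasks, int count)
{
	(void) tasks;
	return (unsigned long) count;
}

int
main(void)
{
	Task taskList[] = { { 1, 1 }, { 2, 0 }, { 3, 1 }, { 4, 0 } };
	int taskCount = sizeof(taskList) / sizeof(taskList[0]);

	Task localTasks[4];
	Task remoteTasks[4];
	int localCount = 0;
	int remoteCount = 0;

	/* the split: each task goes to exactly one of the two lists */
	for (int i = 0; i < taskCount; i++)
	{
		if (taskList[i].isLocal)
		{
			localTasks[localCount++] = taskList[i];
		}
		else
		{
			remoteTasks[remoteCount++] = taskList[i];
		}
	}

	/* the combine: processed rows from both executions are summed */
	unsigned long processedRows = 0;
	processedRows += execute_locally(localTasks, localCount);
	processedRows += execute_remotely(remoteTasks, remoteCount);

	printf("processed %lu rows\n", processedRows);
	return 0;
}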
@@ -335,7 +335,8 @@ UpdateRelationsToLocalShardTables(Node *node, List *relationShardList)
 		return true;
 	}

-	Oid shardOid = GetShardOid(relationShard->relationId, relationShard->shardId);
+	Oid shardOid = GetShardLocalTableOid(relationShard->relationId,
+										 relationShard->shardId);

 	newRte->relid = shardOid;

@@ -16,11 +16,11 @@
 #include "distributed/shard_utils.h"

 /*
- * GetShardOid returns the oid of the shard from the given distributed relation
+ * GetShardLocalTableOid returns the oid of the shard from the given distributed relation
  * with the shardid.
  */
 Oid
-GetShardOid(Oid distRelId, uint64 shardId)
+GetShardLocalTableOid(Oid distRelId, uint64 shardId)
 {
 	char *relationName = get_rel_name(distRelId);
 	AppendShardIdToName(&relationName, shardId);
@@ -130,6 +130,9 @@ typedef struct CitusCopyDestReceiver
 	/* useful for tracking multi shard accesses */
 	bool multiShardCopy;

+	/* if true, should copy to local placements in the current session */
+	bool shouldUseLocalCopy;
+
 	/* copy into intermediate result */
 	char *intermediateResultIdPrefix;
 } CitusCopyDestReceiver;

@@ -154,6 +157,7 @@
 extern void AppendCopyBinaryHeaders(CopyOutState headerOutputState);
 extern void AppendCopyBinaryFooters(CopyOutState footerOutputState);
 extern void EndRemoteCopy(int64 shardId, List *connectionList);
+extern List * CreateRangeTable(Relation rel, AclMode requiredAccess);
 extern Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag,
 							  const char *queryString);
 extern void CheckCopyPermissions(CopyStmt *copyStatement);

@@ -21,6 +21,9 @@ extern bool TransactionAccessedLocalPlacement;
 extern bool TransactionConnectedToLocalGroup;

 /* extern function declarations */
+extern uint64 ExtractAndExecuteLocalAndRemoteTasks(CitusScanState *scanState,
+												   List *taskList, RowModifyLevel rowModifyLevel,
+												   bool hasReturning);
 extern uint64 ExecuteLocalTaskList(CitusScanState *scanState, List *taskList);
 extern void ExecuteLocalUtilityTaskList(List *localTaskList);
 extern void ExtractLocalAndRemoteTasks(bool readOnlyPlan, List *taskList,
@@ -0,0 +1,19 @@
#ifndef LOCAL_MULTI_COPY
#define LOCAL_MULTI_COPY

/*
 * LOCAL_COPY_FLUSH_THRESHOLD is the threshold at which a local copy buffer is
 * flushed. There is one buffer per local placement; when a buffer's size
 * exceeds this threshold, it is flushed.
 */
#define LOCAL_COPY_FLUSH_THRESHOLD (1 * 512 * 1024)

extern void WriteTupleToLocalShard(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest,
								   int64 shardId,
								   CopyOutState localCopyOutState);
extern void FinishLocalCopyToShard(CitusCopyDestReceiver *copyDest, int64 shardId,
								   CopyOutState localCopyOutState);

#endif /* LOCAL_MULTI_COPY */
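The API declared in this header feeds PostgreSQL's CopyFrom through a read callback (ReadFromLocalBufferCallback in local_multi_copy.c) that drains a buffer min(available, maxRead) bytes at a time. Below is a self-contained C sketch of that callback protocol, with a toy consumer standing in for CopyFrom; buffer, cursor, and read_callback are hypothetical names, not the PostgreSQL API.

#include <stdio.h>
#include <string.h>

#define MIN(a, b) ((a) < (b) ? (a) : (b))

/* the "global buffer" the callback reads from, as LocalCopyBuffer does */
static const char buffer[] = "1,100\n2,200\n3,300\n";
static int cursor = 0;

/* copies up to maxRead bytes into outBuf; returns 0 when the buffer is drained */
static int
read_callback(void *outBuf, int maxRead)
{
	int avail = (int) strlen(buffer) - cursor;
	int bytesToRead = MIN(avail, maxRead);
	if (bytesToRead > 0)
	{
		memcpy(outBuf, buffer + cursor, bytesToRead);
		cursor += bytesToRead;
	}
	return bytesToRead;
}

int
main(void)
{
	char chunk[8];
	int n;

	/* toy consumer, standing in for CopyFrom: pull until the callback returns 0 */
	while ((n = read_callback(chunk, sizeof(chunk))) > 0)
	{
		printf("read %d bytes: %.*s\n", n, n, chunk);
	}
	return 0;
}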
@@ -13,6 +13,6 @@

 #include "postgres.h"

-extern Oid GetShardOid(Oid distRelId, uint64 shardId);
+extern Oid GetShardLocalTableOid(Oid distRelId, uint64 shardId);

 #endif /* SHARD_UTILS_H */
@@ -1484,4 +1484,4 @@
 14944|535|O|119586.69|1997-10-14|2-HIGH|Clerk#000000962|0|lly. even instructions against
 14945|68|O|210519.05|1996-03-30|1-URGENT|Clerk#000000467|0|nts? fluffily bold grouches after
 14946|580|O|100402.47|1996-11-12|1-URGENT|Clerk#000000116|0|ffily bold dependencies wake. furiously regular instructions aro
-14947|580|O|100402.47|1996-11-12|1-URGENT|Clerk#000000116|0|ffily bold dependencies wake. furiously regular instructions aro
+14947|580|O|100402.47|1996-11-12|1-URGENT|Clerk#000000116|0|ffily bold dependencies wake. furiously regular instructions aro
@@ -37,6 +37,8 @@ SET client_min_messages TO LOG;
 SET citus.log_local_commands TO ON;
 -- INSERT..SELECT with COPY under the covers
 INSERT INTO test SELECT s,s FROM generate_series(2,100) s;
+NOTICE: executing the copy locally for shard xxxxx
+NOTICE: executing the copy locally for shard xxxxx
 -- router queries execute locally
 INSERT INTO test VALUES (1, 1);
 NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.test_1503000 (x, y) VALUES (1, 1)
@@ -0,0 +1,494 @@
CREATE SCHEMA local_shard_copy;
SET search_path TO local_shard_copy;
SET client_min_messages TO DEBUG;
SET citus.next_shard_id TO 1570000;
SELECT * FROM master_add_node('localhost', :master_port, groupid := 0);
DEBUG: schema "public" already exists, skipping
DETAIL: NOTICE from localhost:xxxxx
DEBUG: extension "plpgsql" already exists, skipping
DETAIL: NOTICE from localhost:xxxxx
DEBUG: schema "citus_mx_test_schema" already exists, skipping
DETAIL: NOTICE from localhost:xxxxx
DEBUG: schema "citus_mx_test_schema_join_1" already exists, skipping
DETAIL: NOTICE from localhost:xxxxx
DEBUG: schema "citus_mx_test_schema_join_2" already exists, skipping
DETAIL: NOTICE from localhost:xxxxx
DEBUG: schema "citus_mx_schema_for_xacts" already exists, skipping
DETAIL: NOTICE from localhost:xxxxx
NOTICE: Replicating reference table "customer_mx" to the node localhost:xxxxx
NOTICE: Replicating reference table "nation_mx" to the node localhost:xxxxx
NOTICE: Replicating reference table "part_mx" to the node localhost:xxxxx
NOTICE: Replicating reference table "supplier_mx" to the node localhost:xxxxx
 master_add_node
---------------------------------------------------------------------
 32
(1 row)

SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
SET citus.replication_model TO 'streaming';
CREATE TABLE reference_table (key int PRIMARY KEY);
DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "reference_table_pkey" for table "reference_table"
DEBUG: building index "reference_table_pkey" on table "reference_table" serially
SELECT create_reference_table('reference_table');
 create_reference_table
---------------------------------------------------------------------

(1 row)

CREATE TABLE distributed_table (key int PRIMARY KEY, age bigint CHECK (age >= 10));
DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "distributed_table_pkey" for table "distributed_table"
DEBUG: building index "distributed_table_pkey" on table "distributed_table" serially
SELECT create_distributed_table('distributed_table','key');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

INSERT INTO distributed_table SELECT *,* FROM generate_series(20, 40);
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
INSERT INTO reference_table SELECT * FROM generate_series(1, 10);
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
DEBUG: Collecting INSERT ... SELECT results on coordinator
CREATE TABLE local_table (key int PRIMARY KEY);
DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "local_table_pkey" for table "local_table"
DEBUG: building index "local_table_pkey" on table "local_table" serially
INSERT INTO local_table SELECT * from generate_series(1, 10);
-- partitioned table
CREATE TABLE collections_list (
	key bigserial,
	collection_id integer
) PARTITION BY LIST (collection_id );
DEBUG: CREATE TABLE will create implicit sequence "collections_list_key_seq" for serial column "collections_list.key"
SELECT create_distributed_table('collections_list', 'key');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

CREATE TABLE collections_list_0
	PARTITION OF collections_list (key, collection_id)
	FOR VALUES IN ( 0 );
CREATE TABLE collections_list_1
	PARTITION OF collections_list (key, collection_id)
	FOR VALUES IN ( 1 );
-- connect to the worker and get ready for the tests
\c - - - :worker_1_port
SET search_path TO local_shard_copy;
SET citus.log_local_commands TO ON;
-- returns true if, with the given distribution key filter
-- on the distributed table (e.g., WHERE key = 1), we'd hit a shard
-- placement which is local to this node
CREATE OR REPLACE FUNCTION shard_of_distribution_column_is_local(dist_key int) RETURNS bool AS $$

		DECLARE shard_is_local BOOLEAN := FALSE;

		BEGIN

			WITH local_shard_ids AS (SELECT get_shard_id_for_distribution_column('local_shard_copy.distributed_table', dist_key)),
				 all_local_shard_ids_on_node AS (SELECT shardid FROM pg_dist_placement WHERE groupid IN (SELECT groupid FROM pg_dist_local_group))
			SELECT
				true INTO shard_is_local
			FROM
				local_shard_ids
			WHERE
				get_shard_id_for_distribution_column IN (SELECT * FROM all_local_shard_ids_on_node);

			IF shard_is_local IS NULL THEN
				shard_is_local = FALSE;
			END IF;

			RETURN shard_is_local;
		END;
$$ LANGUAGE plpgsql;
-- pick some example values that reside on local and remote shards
-- distribution key values of 1, 6, 500 and 701 are LOCAL to shards;
-- we'll use these values in the tests
SELECT shard_of_distribution_column_is_local(1);
 shard_of_distribution_column_is_local
---------------------------------------------------------------------
 t
(1 row)

SELECT shard_of_distribution_column_is_local(6);
 shard_of_distribution_column_is_local
---------------------------------------------------------------------
 t
(1 row)

SELECT shard_of_distribution_column_is_local(500);
 shard_of_distribution_column_is_local
---------------------------------------------------------------------
 t
(1 row)

SELECT shard_of_distribution_column_is_local(701);
 shard_of_distribution_column_is_local
---------------------------------------------------------------------
 t
(1 row)

-- distribution key values of 11 and 12 are REMOTE to shards
SELECT shard_of_distribution_column_is_local(11);
 shard_of_distribution_column_is_local
---------------------------------------------------------------------
 f
(1 row)

SELECT shard_of_distribution_column_is_local(12);
 shard_of_distribution_column_is_local
---------------------------------------------------------------------
 f
(1 row)

BEGIN;
-- run select with local execution
SELECT count(*) FROM distributed_table WHERE key = 1;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1)
 count
---------------------------------------------------------------------
 0
(1 row)

SELECT count(*) FROM distributed_table;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true
 count
---------------------------------------------------------------------
 21
(1 row)

-- the local placements should be executed locally
COPY distributed_table FROM STDIN WITH delimiter ',';
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY distributed_table, line 1: "1, 100"
-- verify that the copy is successful.
SELECT count(*) FROM distributed_table;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true
 count
---------------------------------------------------------------------
 26
(1 row)

ROLLBACK;
BEGIN;
-- run select with local execution
SELECT count(*) FROM distributed_table WHERE key = 1;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1)
 count
---------------------------------------------------------------------
 0
(1 row)

SELECT count(*) FROM distributed_table;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true
 count
---------------------------------------------------------------------
 21
(1 row)

-- the local placements should be executed locally
COPY distributed_table FROM STDIN WITH delimiter ',';
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY distributed_table, line 1: "1, 100"
-- verify the inserted ages.
SELECT * FROM distributed_table;
NOTICE: executing the command locally: SELECT key, age FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true
NOTICE: executing the command locally: SELECT key, age FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true
 key | age
---------------------------------------------------------------------
  20 |  20
  24 |  24
  25 |  25
  26 |  26
  31 |  31
  33 |  33
  35 |  35
   1 | 100
   5 | 500
  21 |  21
  28 |  28
  34 |  34
  38 |  38
  39 |  39
  36 |  36
  37 |  37
  40 |  40
   3 | 300
   4 | 400
  22 |  22
  23 |  23
  27 |  27
  29 |  29
  30 |  30
  32 |  32
   2 | 200
(26 rows)

ROLLBACK;
BEGIN;
-- run select with local execution
SELECT count(*) FROM distributed_table WHERE key = 1;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1)
 count
---------------------------------------------------------------------
 0
(1 row)

SELECT count(*) FROM distributed_table;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true
 count
---------------------------------------------------------------------
 21
(1 row)

-- the local placements should be executed locally
COPY distributed_table FROM STDIN WITH delimiter ',';
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY distributed_table, line 1: "1, 100"
-- verify that the copy is successful.
SELECT count(*) FROM distributed_table;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true
 count
---------------------------------------------------------------------
 26
(1 row)

ROLLBACK;
BEGIN;
-- run select with local execution
SELECT age FROM distributed_table WHERE key = 1;
NOTICE: executing the command locally: SELECT age FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1)
 age
---------------------------------------------------------------------
(0 rows)

SELECT count(*) FROM collections_list;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.collections_list_1570005 collections_list WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.collections_list_1570007 collections_list WHERE true
 count
---------------------------------------------------------------------
 0
(1 row)

-- the local placements should be executed locally
COPY collections_list FROM STDIN WITH delimiter ',';
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY collections_list, line 1: "1, 0"
-- verify that the copy is successful.
SELECT count(*) FROM collections_list;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.collections_list_1570005 collections_list WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.collections_list_1570007 collections_list WHERE true
 count
---------------------------------------------------------------------
 5
(1 row)

ROLLBACK;
BEGIN;
-- run select with local execution
SELECT age FROM distributed_table WHERE key = 1;
NOTICE: executing the command locally: SELECT age FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1)
 age
---------------------------------------------------------------------
(0 rows)

SELECT count(*) FROM distributed_table;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true
 count
---------------------------------------------------------------------
 21
(1 row)

-- the local placements should be executed locally
COPY distributed_table FROM STDIN WITH delimiter ',';
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY distributed_table, line 1: "1, 100"
-- verify that the copy is successful.
SELECT count(*) FROM distributed_table;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true
 count
---------------------------------------------------------------------
 26
(1 row)

ROLLBACK;
BEGIN;
-- Since we are in a transaction, the copy should be locally executed.
COPY distributed_table FROM STDIN WITH delimiter ',';
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY distributed_table, line 1: "1, 100"
ROLLBACK;
-- Since we are not in a transaction, the copy should not be locally executed.
COPY distributed_table FROM STDIN WITH delimiter ',';
BEGIN;
-- Since we are in a transaction, the copy should be locally executed. But
-- we are inserting a duplicate key, so it should error.
COPY distributed_table FROM STDIN WITH delimiter ',';
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY distributed_table, line 1: "1, 100"
ERROR: duplicate key value violates unique constraint "distributed_table_pkey_1570001"
DETAIL: Key (key)=(1) already exists.
CONTEXT: COPY distributed_table_1570001, line 1
ROLLBACK;
TRUNCATE distributed_table;
BEGIN;
-- insert a lot of data (around 8MB);
-- this should use local copy, and it will exceed the LOCAL_COPY_FLUSH_THRESHOLD (512KB)
INSERT INTO distributed_table SELECT * , * FROM generate_series(20, 1000000);
NOTICE: executing the copy locally for shard xxxxx
NOTICE: executing the copy locally for shard xxxxx
ROLLBACK;
COPY distributed_table FROM STDIN WITH delimiter ',';
ERROR: new row for relation "distributed_table_1570001" violates check constraint "distributed_table_age_check"
DETAIL: Failing row contains (1, 9).
BEGIN;
-- Since we are in a transaction, the execution will be local; however, we are inserting an invalid age.
-- The constraint should give an error.
COPY distributed_table FROM STDIN WITH delimiter ',';
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY distributed_table, line 1: "1,9"
ERROR: new row for relation "distributed_table_1570001" violates check constraint "distributed_table_age_check"
DETAIL: Failing row contains (1, 9).
CONTEXT: COPY distributed_table_1570001, line 1
ROLLBACK;
TRUNCATE distributed_table;
-- different delimiters
BEGIN;
-- run select with local execution
SELECT count(*) FROM distributed_table WHERE key = 1;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1)
 count
---------------------------------------------------------------------
 0
(1 row)

-- initial size
SELECT count(*) FROM distributed_table;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true
 count
---------------------------------------------------------------------
 0
(1 row)

COPY distributed_table FROM STDIN WITH delimiter '|';
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY distributed_table, line 1: "1|10"
-- new size
SELECT count(*) FROM distributed_table;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true
 count
---------------------------------------------------------------------
 3
(1 row)

ROLLBACK;
BEGIN;
-- run select with local execution
SELECT count(*) FROM distributed_table WHERE key = 1;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1)
 count
---------------------------------------------------------------------
 0
(1 row)

-- initial size
SELECT count(*) FROM distributed_table;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true
 count
---------------------------------------------------------------------
 0
(1 row)

COPY distributed_table FROM STDIN WITH delimiter '[';
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY distributed_table, line 1: "1[10"
-- new size
SELECT count(*) FROM distributed_table;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true
 count
---------------------------------------------------------------------
 3
(1 row)

ROLLBACK;
-- multiple local copies
BEGIN;
COPY distributed_table FROM STDIN WITH delimiter ',';
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY distributed_table, line 1: "1,15"
COPY distributed_table FROM STDIN WITH delimiter ',';
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY distributed_table, line 1: "10,15"
COPY distributed_table FROM STDIN WITH delimiter ',';
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY distributed_table, line 1: "100,15"
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY distributed_table, line 2: "200,20"
ROLLBACK;
-- local copy followed by local copy should see the changes
-- and error since it is a duplicate primary key.
BEGIN;
COPY distributed_table FROM STDIN WITH delimiter ',';
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY distributed_table, line 1: "1,15"
COPY distributed_table FROM STDIN WITH delimiter ',';
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY distributed_table, line 1: "1,16"
ERROR: duplicate key value violates unique constraint "distributed_table_pkey_1570001"
DETAIL: Key (key)=(1) already exists.
CONTEXT: COPY distributed_table_1570001, line 1
ROLLBACK;
-- local copy followed by local copy should see the changes
BEGIN;
COPY distributed_table FROM STDIN WITH delimiter ',';
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY distributed_table, line 1: "1,15"
-- select should see the change
SELECT key FROM distributed_table WHERE key = 1;
NOTICE: executing the command locally: SELECT key FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1)
 key
---------------------------------------------------------------------
 1
(1 row)

ROLLBACK;
\c - - - :master_port
SET search_path TO local_shard_copy;
SET citus.log_local_commands TO ON;
TRUNCATE TABLE reference_table;
NOTICE: executing the command locally: TRUNCATE TABLE local_shard_copy.reference_table_xxxxx CASCADE
TRUNCATE TABLE local_table;
SELECT count(*) FROM reference_table, local_table WHERE reference_table.key = local_table.key;
 count
---------------------------------------------------------------------
 0
(1 row)

SET citus.enable_local_execution = 'on';
BEGIN;
-- copy should be executed locally
COPY reference_table FROM STDIN;
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY reference_table, line 1: "1"
ROLLBACK;
SET citus.enable_local_execution = 'off';
BEGIN;
-- copy should not be executed locally as citus.enable_local_execution = off
COPY reference_table FROM STDIN;
ROLLBACK;
SET citus.enable_local_execution = 'on';
SET client_min_messages TO ERROR;
SET search_path TO public;
DROP SCHEMA local_shard_copy CASCADE;
@@ -276,6 +276,8 @@ RETURNING *;
 -- that's why it is disallowed to use local execution even if the SELECT
 -- can be executed locally
 INSERT INTO distributed_table SELECT * FROM distributed_table WHERE key = 1 OFFSET 0 ON CONFLICT DO NOTHING;
+NOTICE: executing the command locally: SELECT key, value, age FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) OFFSET 0
+NOTICE: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) SELECT key, value, age FROM read_intermediate_result('insert_select_XXX_1470001'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, age bigint) ON CONFLICT DO NOTHING
 INSERT INTO distributed_table SELECT 1, '1',15 FROM distributed_table WHERE key = 2 LIMIT 1 ON CONFLICT DO NOTHING;
 -- sanity check: multi-shard INSERT..SELECT pushdown goes through distributed execution
 INSERT INTO distributed_table SELECT * FROM distributed_table ON CONFLICT DO NOTHING;
@ -507,28 +509,7 @@ NOTICE: truncate cascades to table "second_distributed_table"
|
|||
ROLLBACK;
|
||||
-- load some data so that foreign keys won't complain with the next tests
|
||||
INSERT INTO reference_table SELECT i FROM generate_series(500, 600) i;
|
||||
-- a very similar set of operations, but this time use
-- COPY as the first command
BEGIN;
INSERT INTO distributed_table SELECT i, i::text, i % 10 + 25 FROM generate_series(500, 600) i;
-- this could go through local execution, but doesn't because we've already
-- done distributed execution
SELECT * FROM distributed_table WHERE key = 500 ORDER BY 1,2,3;
key | value | age
---------------------------------------------------------------------
500 | 500 | 25
(1 row)

-- utility commands could still use distributed execution
TRUNCATE distributed_table CASCADE;
NOTICE: truncate cascades to table "second_distributed_table"
-- ensure that TRUNCATE made it
SELECT * FROM distributed_table WHERE key = 500 ORDER BY 1,2,3;
key | value | age
---------------------------------------------------------------------
(0 rows)

ROLLBACK;
NOTICE: executing the copy locally for shard xxxxx
-- show that cascading foreign keys just works fine with local execution
BEGIN;
INSERT INTO reference_table VALUES (701);

@ -618,25 +599,8 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar
0
(1 row)

-- no need to even supply any data
COPY distributed_table FROM STDIN WITH CSV;
ERROR: cannot execute command because a local execution has accessed a placement in the transaction
DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally
HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;"
ROLLBACK;
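A hedged sketch of the workaround the HINT above suggests (mirroring the HINT text, not part of the test file): disable local execution for the transaction before any placement is accessed locally, and subsequent commands proceed over regular connections.

BEGIN;
SET LOCAL citus.enable_local_execution TO OFF;  -- per the HINT above
SELECT count(*) FROM distributed_table WHERE key = 1;  -- now goes over a connection
COPY distributed_table FROM STDIN WITH CSV;  -- no longer conflicts with local execution
\.
ROLLBACK;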
-- a local query is followed by a command that cannot be executed locally
BEGIN;
SELECT count(*) FROM distributed_table WHERE key = 1;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1)
count
---------------------------------------------------------------------
0
(1 row)

INSERT INTO distributed_table (key) SELECT i FROM generate_series(1,10)i;
ERROR: cannot execute command because a local execution has accessed a placement in the transaction
DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally
HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;"
INSERT INTO distributed_table (key) SELECT i FROM generate_series(1,1) i;
NOTICE: executing the copy locally for shard xxxxx
ROLLBACK;
-- a local query is followed by a command that cannot be executed locally
BEGIN;

@ -1349,7 +1313,10 @@ NOTICE: truncate cascades to table "second_distributed_table_xxxxx"
NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.second_distributed_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.second_distributed_table_xxxxx CASCADE
INSERT INTO reference_table SELECT i FROM generate_series(500, 600) i;
NOTICE: executing the copy locally for shard xxxxx
INSERT INTO distributed_table SELECT i, i::text, i % 10 + 25 FROM generate_series(500, 600) i;
NOTICE: executing the copy locally for shard xxxxx
NOTICE: executing the copy locally for shard xxxxx
-- show that both local and mixed local-distributed executions
-- calculate rows processed correctly
BEGIN;

@ -35,6 +35,7 @@ SELECT create_reference_table('ref_table');
INSERT INTO table_1 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4');
INSERT INTO table_2 VALUES (3, '3'), (4, '4'), (5, '5'), (6, '6');
INSERT INTO ref_table VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5'), (6, '6');
NOTICE: executing the command locally: INSERT INTO locally_execute_intermediate_results.ref_table_1580008 AS citus_table_alias (key, value) VALUES (1,'1'::text), (2,'2'::text), (3,'3'::text), (4,'4'::text), (5,'5'::text), (6,'6'::text)
-- prevent PG 11 - PG 12 outputs from diverging,
-- and have a lot more CTEs recursively planned for the
-- sake of increasing the test coverage

@ -270,6 +271,7 @@ key
FROM a JOIN ref_table USING (key)
GROUP BY key
HAVING (max(ref_table.value) <= (SELECT value FROM a));
NOTICE: executing the command locally: WITH a AS (SELECT max(ref_table_1.key) AS key, max(ref_table_1.value) AS value FROM locally_execute_intermediate_results.ref_table_1580008 ref_table_1) SELECT count(*) AS count, a.key FROM (a JOIN locally_execute_intermediate_results.ref_table_1580008 ref_table(key, value) USING (key)) GROUP BY a.key HAVING (max(ref_table.value) OPERATOR(pg_catalog.<=) (SELECT a_1.value FROM a a_1))
count | key
---------------------------------------------------------------------
1 | 6

@ -328,7 +330,9 @@ DEBUG: Subplan XXX_2 will be written to local file
NOTICE: executing the command locally: SELECT key, value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_1
DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx
NOTICE: executing the command locally: SELECT max(key) AS key FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2
NOTICE: executing the command locally: SELECT cte_3.key, ref_table.value FROM ((SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_3 JOIN locally_execute_intermediate_results.ref_table_1580008 ref_table(key, value) USING (key))
key | value
---------------------------------------------------------------------
4 | 4

@ -767,6 +771,7 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT cte_3.key, re
DEBUG: Subplan XXX_1 will be written to local file
DEBUG: Subplan XXX_2 will be written to local file
DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx
key | value
---------------------------------------------------------------------

@ -951,6 +951,7 @@ NOTICE: executing the command locally: DELETE FROM master_evaluation_combinatio
(1 row)

INSERT INTO user_info_data SELECT 3, ('test', get_local_node_id_stable() > 0)::user_data, i FROM generate_series(0,7)i;
NOTICE: executing the copy locally for shard xxxxx
PREPARE fast_path_router_with_param_and_func_on_non_dist_key(int) AS
DELETE FROM user_info_data WHERE user_id = 3 AND user_index = $1 AND u_data = ('test', (get_local_node_id_stable() > 0)::int)::user_data RETURNING user_id, user_index;
EXECUTE fast_path_router_with_param_and_func_on_non_dist_key(0);

@ -1438,6 +1439,7 @@ NOTICE: executing the command locally: DELETE FROM master_evaluation_combinatio
(1 row)

INSERT INTO user_info_data SELECT 3, ('test', get_local_node_id_stable() > 0)::user_data, i FROM generate_series(0,7)i;
NOTICE: executing the copy locally for shard xxxxx
PREPARE router_with_param_and_func_on_non_dist_key(int) AS
DELETE FROM user_info_data WHERE user_id = 3 AND user_id = 3 AND user_index = $1 AND u_data = ('test', (get_local_node_id_stable() > 0)::int)::user_data RETURNING user_id, user_index;
EXECUTE router_with_param_and_func_on_non_dist_key(0);

@ -102,9 +102,9 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM multi_mx_i
(1 row)

insert into target_table SELECT a FROM source_table LIMIT 10;
ERROR: cannot execute command because a local execution has accessed a placement in the transaction
DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally
HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;"
NOTICE: executing the command locally: SELECT a FROM multi_mx_insert_select_repartition.source_table_4213581 source_table WHERE true LIMIT '10'::bigint
NOTICE: executing the command locally: SELECT a FROM multi_mx_insert_select_repartition.source_table_4213583 source_table WHERE true LIMIT '10'::bigint
NOTICE: executing the copy locally for shard xxxxx
ROLLBACK;
\c - - - :master_port
SET search_path TO multi_mx_insert_select_repartition;

@ -142,7 +142,7 @@ INSERT INTO test_recovery (x) SELECT 'hello-'||s FROM generate_series(1,100) s;
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
4
2
(1 row)

SELECT recover_prepared_transactions();
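A hedged reading of the expected-output change above (the diff swaps 4 for 2 without comment): once the copy runs via local execution, placements on the node itself no longer leave two-phase-commit records behind, so fewer rows remain in pg_dist_transaction. The pending 2PC bookkeeping can be listed with:

SELECT groupid, gid FROM pg_dist_transaction ORDER BY 1, 2;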

@ -118,6 +118,8 @@ INSERT INTO "refer'ence_table" SELECT i FROM generate_series(0, 100) i;
SET search_path TO 'truncate_from_workers';
-- make sure that DMLs and SELECTs work fine along with TRUNCATE on the worker
BEGIN;
-- we can enable local execution when truncate can be executed locally.
SET citus.enable_local_execution = 'off';
INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
SELECT count(*) FROM on_update_fkey_table;
count

@ -499,11 +499,13 @@ SELECT create_reference_table('replicate_reference_table_copy');

(1 row)

SET citus.enable_local_execution = 'off';
BEGIN;
COPY replicate_reference_table_copy FROM STDIN;
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
ERROR: cannot open new connections after the first modification command within a transaction
ROLLBACK;
RESET citus.enable_local_execution;
DROP TABLE replicate_reference_table_copy;
-- test executing DDL command then adding a new node in a transaction
CREATE TABLE replicate_reference_table_ddl(column1 int);

@ -17,6 +17,7 @@ SELECT create_reference_table('squares');
(1 row)

INSERT INTO squares SELECT i, i * i FROM generate_series(1, 10) i;
NOTICE: executing the copy locally for shard xxxxx
-- should be executed locally
SELECT count(*) FROM squares;
NOTICE: executing the command locally: SELECT count(*) AS count FROM replicate_ref_to_coordinator.squares_8000000 squares

@ -39,7 +39,7 @@ test: multi_mx_metadata
test: master_evaluation master_evaluation_modify master_evaluation_select
test: multi_mx_call
test: multi_mx_function_call_delegation
test: multi_mx_modifications local_shard_execution
test: multi_mx_modifications local_shard_execution local_shard_copy
test: multi_mx_transaction_recovery
test: multi_mx_modifying_xacts
test: multi_mx_explain

@ -0,0 +1,346 @@
CREATE SCHEMA local_shard_copy;
SET search_path TO local_shard_copy;

SET client_min_messages TO DEBUG;
SET citus.next_shard_id TO 1570000;

SELECT * FROM master_add_node('localhost', :master_port, groupid := 0);

SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
SET citus.replication_model TO 'streaming';


CREATE TABLE reference_table (key int PRIMARY KEY);
SELECT create_reference_table('reference_table');

CREATE TABLE distributed_table (key int PRIMARY KEY, age bigint CHECK (age >= 10));
SELECT create_distributed_table('distributed_table','key');

INSERT INTO distributed_table SELECT *,* FROM generate_series(20, 40);
INSERT INTO reference_table SELECT * FROM generate_series(1, 10);

CREATE TABLE local_table (key int PRIMARY KEY);
INSERT INTO local_table SELECT * from generate_series(1, 10);

-- partitioned table
CREATE TABLE collections_list (
key bigserial,
collection_id integer
) PARTITION BY LIST (collection_id );

SELECT create_distributed_table('collections_list', 'key');

CREATE TABLE collections_list_0
PARTITION OF collections_list (key, collection_id)
FOR VALUES IN ( 0 );

CREATE TABLE collections_list_1
PARTITION OF collections_list (key, collection_id)
FOR VALUES IN ( 1 );


-- connect to the worker and get ready for the tests
\c - - - :worker_1_port

SET search_path TO local_shard_copy;
SET citus.log_local_commands TO ON;

-- returns true if, given the distribution key filter
-- on the distributed table (e.g., WHERE key = 1), we'll hit a shard
-- placement which is local to this node
CREATE OR REPLACE FUNCTION shard_of_distribution_column_is_local(dist_key int) RETURNS bool AS $$

    DECLARE shard_is_local BOOLEAN := FALSE;

    BEGIN

        WITH local_shard_ids AS (SELECT get_shard_id_for_distribution_column('local_shard_copy.distributed_table', dist_key)),
        all_local_shard_ids_on_node AS (SELECT shardid FROM pg_dist_placement WHERE groupid IN (SELECT groupid FROM pg_dist_local_group))
        SELECT
            true INTO shard_is_local
        FROM
            local_shard_ids
        WHERE
            get_shard_id_for_distribution_column IN (SELECT * FROM all_local_shard_ids_on_node);

        IF shard_is_local IS NULL THEN
            shard_is_local = FALSE;
        END IF;

        RETURN shard_is_local;
    END;
$$ LANGUAGE plpgsql;
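In essence the helper is a single catalog lookup; a standalone equivalent (a hedged restatement of the function body above, not part of the test file) is:

SELECT get_shard_id_for_distribution_column('local_shard_copy.distributed_table', 1) IN
       (SELECT shardid FROM pg_dist_placement
        WHERE groupid IN (SELECT groupid FROM pg_dist_local_group)) AS shard_is_local;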

-- pick some example values that reside on the shards locally and remotely

-- distribution key values of 1, 6, 500 and 701 are LOCAL to shards,
-- we'll use these values in the tests
SELECT shard_of_distribution_column_is_local(1);
SELECT shard_of_distribution_column_is_local(6);
SELECT shard_of_distribution_column_is_local(500);
SELECT shard_of_distribution_column_is_local(701);

-- distribution key values of 11 and 12 are REMOTE to shards
SELECT shard_of_distribution_column_is_local(11);
SELECT shard_of_distribution_column_is_local(12);

BEGIN;
-- run select with local execution
SELECT count(*) FROM distributed_table WHERE key = 1;

SELECT count(*) FROM distributed_table;
-- the local placements should be executed locally
COPY distributed_table FROM STDIN WITH delimiter ',';
1, 100
2, 200
3, 300
4, 400
5, 500
\.
-- verify that the copy is successful.
SELECT count(*) FROM distributed_table;

ROLLBACK;

BEGIN;
-- run select with local execution
SELECT count(*) FROM distributed_table WHERE key = 1;

SELECT count(*) FROM distributed_table;
-- the local placements should be executed locally
COPY distributed_table FROM STDIN WITH delimiter ',';
1, 100
2, 200
3, 300
4, 400
5, 500
\.
-- verify the inserted ages.
SELECT * FROM distributed_table;

ROLLBACK;


BEGIN;
-- run select with local execution
SELECT count(*) FROM distributed_table WHERE key = 1;

SELECT count(*) FROM distributed_table;
-- the local placements should be executed locally
COPY distributed_table FROM STDIN WITH delimiter ',';
1, 100
2, 200
3, 300
4, 400
5, 500
\.
-- verify that the copy is successful.
SELECT count(*) FROM distributed_table;

ROLLBACK;

BEGIN;
-- run select with local execution
SELECT age FROM distributed_table WHERE key = 1;

SELECT count(*) FROM collections_list;
-- the local placements should be executed locally
COPY collections_list FROM STDIN WITH delimiter ',';
1, 0
2, 0
3, 0
4, 1
5, 1
\.
-- verify that the copy is successful.
SELECT count(*) FROM collections_list;

ROLLBACK;
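Since collections_list is LIST-partitioned on collection_id (partitions created above), a hedged follow-up, were the data not rolled back, could confirm where the copied rows landed:

SELECT count(*) FROM collections_list_0;  -- rows copied with collection_id = 0
SELECT count(*) FROM collections_list_1;  -- rows copied with collection_id = 1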

BEGIN;
-- run select with local execution
SELECT age FROM distributed_table WHERE key = 1;

SELECT count(*) FROM distributed_table;
-- the local placements should be executed locally
COPY distributed_table FROM STDIN WITH delimiter ',';
1, 100
2, 200
3, 300
4, 400
5, 500
\.


-- verify that the copy is successful.
SELECT count(*) FROM distributed_table;

ROLLBACK;

BEGIN;
-- Since we are in a transaction, the copy should be locally executed.
COPY distributed_table FROM STDIN WITH delimiter ',';
1, 100
2, 200
3, 300
4, 400
5, 500
\.
ROLLBACK;

-- Since we are not in a transaction, the copy should not be locally executed.
COPY distributed_table FROM STDIN WITH delimiter ',';
1, 100
2, 200
3, 300
4, 400
5, 500
\.

BEGIN;
-- Since we are in a transaction, the copy should be locally executed. But
-- we are inserting duplicate keys (the non-transactional COPY above persisted
-- keys 1-5), so it should error.
COPY distributed_table FROM STDIN WITH delimiter ',';
1, 100
2, 200
3, 300
4, 400
5, 500
\.
ROLLBACK;

TRUNCATE distributed_table;

BEGIN;

-- insert a lot of data (around 8MB);
-- this should use local copy, and it will exceed LOCAL_COPY_FLUSH_THRESHOLD (512KB)
INSERT INTO distributed_table SELECT * , * FROM generate_series(20, 1000000);

ROLLBACK;
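For scale, a hedged way to measure how much data such an insert represents (citus_total_relation_size() is the Citus size UDF for distributed tables; run it before the ROLLBACK, otherwise the rows are gone):

SELECT pg_size_pretty(citus_total_relation_size('distributed_table'));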

COPY distributed_table FROM STDIN WITH delimiter ',';
1, 9
\.

BEGIN;
-- Since we are in a transaction, the execution will be local; however, we are inserting an invalid age.
-- The constraint should give an error
COPY distributed_table FROM STDIN WITH delimiter ',';
1,9
\.
ROLLBACK;
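The failing row trips the CHECK (age >= 10) declared in the CREATE TABLE at the top of this file; as a hedged aside, the constraint definition can be pulled from the catalog with plain PostgreSQL:

SELECT conname, pg_get_constraintdef(oid)
FROM pg_constraint
WHERE conrelid = 'local_shard_copy.distributed_table'::regclass AND contype = 'c';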

TRUNCATE distributed_table;


-- different delimiters
BEGIN;
-- run select with local execution
SELECT count(*) FROM distributed_table WHERE key = 1;
-- initial size
SELECT count(*) FROM distributed_table;
COPY distributed_table FROM STDIN WITH delimiter '|';
1|10
2|30
3|40
\.
-- new size
SELECT count(*) FROM distributed_table;
ROLLBACK;

BEGIN;
-- run select with local execution
SELECT count(*) FROM distributed_table WHERE key = 1;
-- initial size
SELECT count(*) FROM distributed_table;
COPY distributed_table FROM STDIN WITH delimiter '[';
1[10
2[30
3[40
\.
-- new size
SELECT count(*) FROM distributed_table;
ROLLBACK;


-- multiple local copies
BEGIN;
COPY distributed_table FROM STDIN WITH delimiter ',';
1,15
2,20
3,30
\.
COPY distributed_table FROM STDIN WITH delimiter ',';
10,15
20,20
30,30
\.
COPY distributed_table FROM STDIN WITH delimiter ',';
100,15
200,20
300,30
\.
ROLLBACK;

-- local copy followed by local copy should see the changes
-- and error since it is a duplicate primary key.
BEGIN;
COPY distributed_table FROM STDIN WITH delimiter ',';
1,15
\.
COPY distributed_table FROM STDIN WITH delimiter ',';
1,16
\.
ROLLBACK;


-- local copy followed by local copy should see the changes
BEGIN;
COPY distributed_table FROM STDIN WITH delimiter ',';
1,15
\.
-- select should see the change
SELECT key FROM distributed_table WHERE key = 1;
ROLLBACK;

\c - - - :master_port

SET search_path TO local_shard_copy;
SET citus.log_local_commands TO ON;

TRUNCATE TABLE reference_table;
TRUNCATE TABLE local_table;

SELECT count(*) FROM reference_table, local_table WHERE reference_table.key = local_table.key;

SET citus.enable_local_execution = 'on';

BEGIN;
-- copy should be executed locally
COPY reference_table FROM STDIN;
1
2
3
4
\.
ROLLBACK;

SET citus.enable_local_execution = 'off';

BEGIN;
-- copy should not be executed locally as citus.enable_local_execution = off
COPY reference_table FROM STDIN;
1
2
3
4
\.
ROLLBACK;

SET citus.enable_local_execution = 'on';

SET client_min_messages TO ERROR;
SET search_path TO public;
DROP SCHEMA local_shard_copy CASCADE;

@ -305,22 +305,6 @@ ROLLBACK;
-- load some data so that foreign keys won't complain with the next tests
INSERT INTO reference_table SELECT i FROM generate_series(500, 600) i;

-- a very similar set of operations, but this time use
-- COPY as the first command
BEGIN;
INSERT INTO distributed_table SELECT i, i::text, i % 10 + 25 FROM generate_series(500, 600) i;

-- this could go through local execution, but doesn't because we've already
-- done distributed execution
SELECT * FROM distributed_table WHERE key = 500 ORDER BY 1,2,3;

-- utility commands could still use distributed execution
TRUNCATE distributed_table CASCADE;

-- ensure that TRUNCATE made it
SELECT * FROM distributed_table WHERE key = 500 ORDER BY 1,2,3;
ROLLBACK;

-- show that cascading foreign keys just works fine with local execution
BEGIN;
INSERT INTO reference_table VALUES (701);

@ -356,15 +340,7 @@ ROLLBACK;
BEGIN;
SELECT count(*) FROM distributed_table WHERE key = 1;

-- no need to even supply any data
COPY distributed_table FROM STDIN WITH CSV;
ROLLBACK;

-- a local query is followed by a command that cannot be executed locally
BEGIN;
SELECT count(*) FROM distributed_table WHERE key = 1;

INSERT INTO distributed_table (key) SELECT i FROM generate_series(1,10)i;
INSERT INTO distributed_table (key) SELECT i FROM generate_series(1,1) i;
ROLLBACK;

-- a local query is followed by a command that cannot be executed locally

@ -85,6 +85,8 @@ SET search_path TO 'truncate_from_workers';

-- make sure that DMLs and SELECTs work fine along with TRUNCATE on the worker
BEGIN;
-- we can enable local execution when truncate can be executed locally.
SET citus.enable_local_execution = 'off';
INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i;
SELECT count(*) FROM on_update_fkey_table;
TRUNCATE on_update_fkey_table;

@ -323,6 +323,7 @@ DROP TABLE replicate_reference_table_insert;
CREATE TABLE replicate_reference_table_copy(column1 int);
SELECT create_reference_table('replicate_reference_table_copy');

SET citus.enable_local_execution = 'off';
BEGIN;
COPY replicate_reference_table_copy FROM STDIN;
1

@ -334,6 +335,8 @@ COPY replicate_reference_table_copy FROM STDIN;
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
ROLLBACK;

RESET citus.enable_local_execution;

DROP TABLE replicate_reference_table_copy;