apply review items

log shard id in local copy and add more comments
pull/3557/head
SaitTalhaNisanci 2020-03-10 23:47:27 +03:00
parent c22068e75a
commit 42cfc4c0e9
5 changed files with 183 additions and 140 deletions

View File

@ -35,18 +35,9 @@
#include "distributed/local_multi_copy.h" #include "distributed/local_multi_copy.h"
#include "distributed/shard_utils.h" #include "distributed/shard_utils.h"
/*
* LOCAL_COPY_BUFFER_SIZE is buffer size for local copy.
* There will be one buffer for each local placement, therefore
* the maximum amount of memory that might be alocated is
* LOCAL_COPY_BUFFER_SIZE * #local_placement
*/
#define LOCAL_COPY_BUFFER_SIZE (1 * 512 * 1024)
static int ReadFromLocalBufferCallback(void *outbuf, int minread, int maxread); static int ReadFromLocalBufferCallback(void *outbuf, int minread, int maxread);
static void AddSlotToBuffer(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, static void AddSlotToBuffer(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest,
bool isBinary); CopyOutState localCopyOutState);
static bool ShouldSendCopyNow(StringInfo buffer); static bool ShouldSendCopyNow(StringInfo buffer);
static void DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId, static void DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId,
@ -54,71 +45,83 @@ static void DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId,
static bool ShouldAddBinaryHeaders(StringInfo buffer, bool isBinary); static bool ShouldAddBinaryHeaders(StringInfo buffer, bool isBinary);
/* /*
* localCopyBuffer is used in copy callback to return the copied rows. * LocalCopyBuffer is used in copy callback to return the copied rows.
* The reason this is a global variable is that we cannot pass an additional * The reason this is a global variable is that we cannot pass an additional
* argument to the copy callback. * argument to the copy callback.
*/ */
StringInfo localCopyBuffer; static StringInfo LocalCopyBuffer;
/* /*
* ProcessLocalCopy adds the given slot and does a local copy if * WriteTupleToLocalShard adds the given slot and does a local copy if
* this is the end of copy, or the buffer size exceeds the threshold. * this is the end of copy, or the buffer size exceeds the threshold.
*/ */
void void
ProcessLocalCopy(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, int64 shardId, WriteTupleToLocalShard(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, int64
StringInfo buffer, bool isEndOfCopy) shardId,
CopyOutState localCopyOutState)
{ {
/*
* Here we save the previous buffer, and put the local shard's buffer
* into copyOutState. The motivation is to use the existing logic to
* serialize a row slot into buffer.
*/
StringInfo previousBuffer = copyDest->copyOutState->fe_msgbuf;
copyDest->copyOutState->fe_msgbuf = buffer;
/* since we are doing a local copy, the following statements should use local execution to see the changes */ /* since we are doing a local copy, the following statements should use local execution to see the changes */
TransactionAccessedLocalPlacement = true; TransactionAccessedLocalPlacement = true;
bool isBinaryCopy = copyDest->copyOutState->binary; bool isBinaryCopy = localCopyOutState->binary;
AddSlotToBuffer(slot, copyDest, isBinaryCopy); if (ShouldAddBinaryHeaders(localCopyOutState->fe_msgbuf, isBinaryCopy))
{
AppendCopyBinaryHeaders(localCopyOutState);
}
if (isEndOfCopy || ShouldSendCopyNow(buffer)) AddSlotToBuffer(slot, copyDest, localCopyOutState);
if (ShouldSendCopyNow(localCopyOutState->fe_msgbuf))
{ {
if (isBinaryCopy) if (isBinaryCopy)
{ {
AppendCopyBinaryFooters(copyDest->copyOutState); /*
* We're going to flush the buffer to disk by effectively doing a full COPY command.
* Hence we also need to add footers to the current buffer.
*/
AppendCopyBinaryFooters(localCopyOutState);
} }
bool isEndOfCopy = false;
DoLocalCopy(buffer, copyDest->distributedRelationId, shardId, DoLocalCopy(localCopyOutState->fe_msgbuf, copyDest->distributedRelationId,
shardId,
copyDest->copyStatement, isEndOfCopy); copyDest->copyStatement, isEndOfCopy);
} }
copyDest->copyOutState->fe_msgbuf = previousBuffer;
} }
/* /*
* AddSlotToBuffer serializes the given slot and adds it to the buffer in copyDest. * FinishLocalCopyToShard finishes local copy for the given shard with the shard id.
* If the copy format is binary, it adds binary headers as well. */
void
FinishLocalCopyToShard(CitusCopyDestReceiver *copyDest, int64 shardId,
CopyOutState localCopyOutState)
{
bool isBinaryCopy = localCopyOutState->binary;
if (isBinaryCopy)
{
AppendCopyBinaryFooters(localCopyOutState);
}
bool isEndOfCopy = true;
DoLocalCopy(localCopyOutState->fe_msgbuf, copyDest->distributedRelationId, shardId,
copyDest->copyStatement, isEndOfCopy);
}
/*
* AddSlotToBuffer serializes the given slot and adds it to the buffer in localCopyOutState.
*/ */
static void static void
AddSlotToBuffer(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, bool isBinary) AddSlotToBuffer(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, CopyOutState
localCopyOutState)
{ {
if (ShouldAddBinaryHeaders(copyDest->copyOutState->fe_msgbuf, isBinary)) Datum *columnValues = slot->tts_values;
{ bool *columnNulls = slot->tts_isnull;
AppendCopyBinaryHeaders(copyDest->copyOutState); FmgrInfo *columnOutputFunctions = copyDest->columnOutputFunctions;
} CopyCoercionData *columnCoercionPaths = copyDest->columnCoercionPaths;
if (slot != NULL) AppendCopyRowData(columnValues, columnNulls, copyDest->tupleDescriptor,
{ localCopyOutState, columnOutputFunctions,
Datum *columnValues = slot->tts_values; columnCoercionPaths);
bool *columnNulls = slot->tts_isnull;
FmgrInfo *columnOutputFunctions = copyDest->columnOutputFunctions;
CopyCoercionData *columnCoercionPaths = copyDest->columnCoercionPaths;
AppendCopyRowData(columnValues, columnNulls, copyDest->tupleDescriptor,
copyDest->copyOutState, columnOutputFunctions,
columnCoercionPaths);
}
} }
@ -129,19 +132,25 @@ AddSlotToBuffer(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, bool isBi
static bool static bool
ShouldSendCopyNow(StringInfo buffer) ShouldSendCopyNow(StringInfo buffer)
{ {
return buffer->len > LOCAL_COPY_BUFFER_SIZE; return buffer->len > LOCAL_COPY_FLUSH_THRESHOLD;
} }
/* /*
* DoLocalCopy finds the shard table from the distributed relation id, and copies the given * DoLocalCopy finds the shard table from the distributed relation id, and copies the given
* buffer into the shard. * buffer into the shard.
* CopyFrom calls ReadFromLocalBufferCallback to read bytes from the buffer as though
* it was reading from stdin. It then parses the tuples and writes them to the shardOid table.
*/ */
static void static void
DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId, CopyStmt *copyStatement, DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId, CopyStmt *copyStatement,
bool isEndOfCopy) bool isEndOfCopy)
{ {
localCopyBuffer = buffer; /*
* Set the buffer as a global variable to allow ReadFromLocalBufferCallback to read from it.
* We cannot pass additional arguments to ReadFromLocalBufferCallback.
*/
LocalCopyBuffer = buffer;
Oid shardOid = GetShardLocalTableOid(relationId, shardId); Oid shardOid = GetShardLocalTableOid(relationId, shardId);
Relation shard = heap_open(shardOid, RowExclusiveLock); Relation shard = heap_open(shardOid, RowExclusiveLock);
@ -189,15 +198,15 @@ static int
ReadFromLocalBufferCallback(void *outbuf, int minread, int maxread) ReadFromLocalBufferCallback(void *outbuf, int minread, int maxread)
{ {
int bytesread = 0; int bytesread = 0;
int avail = localCopyBuffer->len - localCopyBuffer->cursor; int avail = LocalCopyBuffer->len - LocalCopyBuffer->cursor;
int bytesToRead = Min(avail, maxread); int bytesToRead = Min(avail, maxread);
if (bytesToRead > 0) if (bytesToRead > 0)
{ {
memcpy_s(outbuf, bytesToRead + strlen((char *) outbuf), memcpy_s(outbuf, bytesToRead + strlen((char *) outbuf),
&localCopyBuffer->data[localCopyBuffer->cursor], bytesToRead); &LocalCopyBuffer->data[LocalCopyBuffer->cursor], bytesToRead);
} }
bytesread += bytesToRead; bytesread += bytesToRead;
localCopyBuffer->cursor += bytesToRead; LocalCopyBuffer->cursor += bytesToRead;
return bytesread; return bytesread;
} }

View File

@ -164,6 +164,7 @@ struct CopyPlacementState
/* State of shard to which the placement belongs to. */ /* State of shard to which the placement belongs to. */
CopyShardState *shardState; CopyShardState *shardState;
/* node group ID of the placement */
int32 groupId; int32 groupId;
/* /*
@ -183,7 +184,7 @@ struct CopyShardState
uint64 shardId; uint64 shardId;
/* used for doing local copy */ /* used for doing local copy */
StringInfo localCopyBuffer; CopyOutState copyOutState;
/* containsLocalPlacement is true if we have a local placement for the shard id of this state */ /* containsLocalPlacement is true if we have a local placement for the shard id of this state */
bool containsLocalPlacement; bool containsLocalPlacement;
@ -242,15 +243,16 @@ static CopyConnectionState * GetConnectionState(HTAB *connectionStateHash,
MultiConnection *connection); MultiConnection *connection);
static CopyShardState * GetShardState(uint64 shardId, HTAB *shardStateHash, static CopyShardState * GetShardState(uint64 shardId, HTAB *shardStateHash,
HTAB *connectionStateHash, bool stopOnFailure, HTAB *connectionStateHash, bool stopOnFailure,
bool *found, bool shouldUseLocalCopy, MemoryContext bool *found, bool shouldUseLocalCopy, CopyOutState
context); copyOutState);
static MultiConnection * CopyGetPlacementConnection(ShardPlacement *placement, static MultiConnection * CopyGetPlacementConnection(ShardPlacement *placement,
bool stopOnFailure); bool stopOnFailure);
static List * ConnectionStateList(HTAB *connectionStateHash); static List * ConnectionStateList(HTAB *connectionStateHash);
static void InitializeCopyShardState(CopyShardState *shardState, static void InitializeCopyShardState(CopyShardState *shardState,
HTAB *connectionStateHash, HTAB *connectionStateHash,
uint64 shardId, bool stopOnFailure, bool uint64 shardId, bool stopOnFailure, bool
canUseLocalCopy, MemoryContext context); canUseLocalCopy,
CopyOutState copyOutState);
static void StartPlacementStateCopyCommand(CopyPlacementState *placementState, static void StartPlacementStateCopyCommand(CopyPlacementState *placementState,
CopyStmt *copyStatement, CopyStmt *copyStatement,
CopyOutState copyOutState); CopyOutState copyOutState);
@ -288,6 +290,7 @@ static void CitusCopyDestReceiverShutdown(DestReceiver *destReceiver);
static void CitusCopyDestReceiverDestroy(DestReceiver *destReceiver); static void CitusCopyDestReceiverDestroy(DestReceiver *destReceiver);
static bool ContainsLocalPlacement(int64 shardId); static bool ContainsLocalPlacement(int64 shardId);
static void FinishLocalCopy(CitusCopyDestReceiver *copyDest); static void FinishLocalCopy(CitusCopyDestReceiver *copyDest);
static void CloneCopyOutStateForLocalCopy(CopyOutState from, CopyOutState to);
static bool ShouldExecuteCopyLocally(void); static bool ShouldExecuteCopyLocally(void);
static void LogLocalCopyExecution(uint64 shardId); static void LogLocalCopyExecution(uint64 shardId);
@ -2285,7 +2288,7 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest
stopOnFailure, stopOnFailure,
&cachedShardStateFound, &cachedShardStateFound,
copyDest->shouldUseLocalCopy, copyDest->shouldUseLocalCopy,
copyDest->memoryContext); copyDest->copyOutState);
if (!cachedShardStateFound) if (!cachedShardStateFound)
{ {
firstTupleInShard = true; firstTupleInShard = true;
@ -2308,9 +2311,7 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest
if (copyDest->shouldUseLocalCopy && shardState->containsLocalPlacement) if (copyDest->shouldUseLocalCopy && shardState->containsLocalPlacement)
{ {
bool isEndOfCopy = false; WriteTupleToLocalShard(slot, copyDest, shardId, shardState->copyOutState);
ProcessLocalCopy(slot, copyDest, shardId, shardState->localCopyBuffer,
isEndOfCopy);
} }
@ -2535,13 +2536,13 @@ FinishLocalCopy(CitusCopyDestReceiver *copyDest)
HASH_SEQ_STATUS status; HASH_SEQ_STATUS status;
CopyShardState *copyShardState; CopyShardState *copyShardState;
bool isEndOfCopy = true;
foreach_htab(copyShardState, &status, shardStateHash) foreach_htab(copyShardState, &status, shardStateHash)
{ {
if (copyShardState->localCopyBuffer->len > 0) if (copyShardState->copyOutState != NULL &&
copyShardState->copyOutState->fe_msgbuf->len > 0)
{ {
ProcessLocalCopy(NULL, copyDest, copyShardState->shardId, FinishLocalCopyToShard(copyDest, copyShardState->shardId,
copyShardState->localCopyBuffer, isEndOfCopy); copyShardState->copyOutState);
} }
} }
} }
@ -3208,14 +3209,15 @@ ConnectionStateList(HTAB *connectionStateHash)
static CopyShardState * static CopyShardState *
GetShardState(uint64 shardId, HTAB *shardStateHash, GetShardState(uint64 shardId, HTAB *shardStateHash,
HTAB *connectionStateHash, bool stopOnFailure, bool *found, bool HTAB *connectionStateHash, bool stopOnFailure, bool *found, bool
shouldUseLocalCopy, MemoryContext context) shouldUseLocalCopy, CopyOutState copyOutState)
{ {
CopyShardState *shardState = (CopyShardState *) hash_search(shardStateHash, &shardId, CopyShardState *shardState = (CopyShardState *) hash_search(shardStateHash, &shardId,
HASH_ENTER, found); HASH_ENTER, found);
if (!*found) if (!*found)
{ {
InitializeCopyShardState(shardState, connectionStateHash, InitializeCopyShardState(shardState, connectionStateHash,
shardId, stopOnFailure, shouldUseLocalCopy, context); shardId, stopOnFailure, shouldUseLocalCopy,
copyOutState);
} }
return shardState; return shardState;
@ -3230,16 +3232,12 @@ GetShardState(uint64 shardId, HTAB *shardStateHash,
static void static void
InitializeCopyShardState(CopyShardState *shardState, InitializeCopyShardState(CopyShardState *shardState,
HTAB *connectionStateHash, uint64 shardId, HTAB *connectionStateHash, uint64 shardId,
bool stopOnFailure, bool shouldUseLocalCopy, MemoryContext bool stopOnFailure, bool shouldUseLocalCopy, CopyOutState
context) copyOutState)
{ {
ListCell *placementCell = NULL; ListCell *placementCell = NULL;
int failedPlacementCount = 0; int failedPlacementCount = 0;
MemoryContext oldContext = MemoryContextSwitchTo(context);
MemoryContextSwitchTo(oldContext);
MemoryContext localContext = MemoryContext localContext =
AllocSetContextCreateExtended(CurrentMemoryContext, AllocSetContextCreateExtended(CurrentMemoryContext,
"InitializeCopyShardState", "InitializeCopyShardState",
@ -3249,7 +3247,7 @@ InitializeCopyShardState(CopyShardState *shardState,
/* release active placement list at the end of this function */ /* release active placement list at the end of this function */
oldContext = MemoryContextSwitchTo(localContext); MemoryContext oldContext = MemoryContextSwitchTo(localContext);
List *activePlacementList = ActiveShardPlacementList(shardId); List *activePlacementList = ActiveShardPlacementList(shardId);
@ -3257,7 +3255,7 @@ InitializeCopyShardState(CopyShardState *shardState,
shardState->shardId = shardId; shardState->shardId = shardId;
shardState->placementStateList = NIL; shardState->placementStateList = NIL;
shardState->localCopyBuffer = makeStringInfo(); shardState->copyOutState = NULL;
shardState->containsLocalPlacement = ContainsLocalPlacement(shardId); shardState->containsLocalPlacement = ContainsLocalPlacement(shardId);
@ -3267,6 +3265,8 @@ InitializeCopyShardState(CopyShardState *shardState,
if (shouldUseLocalCopy && placement->groupId == GetLocalGroupId()) if (shouldUseLocalCopy && placement->groupId == GetLocalGroupId())
{ {
shardState->copyOutState = (CopyOutState) palloc0(sizeof(*copyOutState));
CloneCopyOutStateForLocalCopy(copyOutState, shardState->copyOutState);
LogLocalCopyExecution(shardId); LogLocalCopyExecution(shardId);
continue; continue;
} }
@ -3325,6 +3325,28 @@ InitializeCopyShardState(CopyShardState *shardState,
} }
/*
* CloneCopyOutStateForLocalCopy creates a shallow copy of the CopyOutState with a new
* fe_msgbuf. We keep a separate CopyOutState for every local shard placement, because
* in case of local copy we serialize and buffer incoming tuples into fe_msgbuf for each
* placement and the serialization functions take a CopyOutState as a parameter.
*/
static void
CloneCopyOutStateForLocalCopy(CopyOutState from, CopyOutState to)
{
to->attnumlist = from->attnumlist;
to->binary = from->binary;
to->copy_dest = from->copy_dest;
to->delim = from->delim;
to->file_encoding = from->file_encoding;
to->need_transcoding = from->need_transcoding;
to->null_print = from->null_print;
to->null_print_client = from->null_print_client;
to->rowcontext = from->rowcontext;
to->fe_msgbuf = makeStringInfo();
}
/* /*
* LogLocalCopyExecution logs that the copy will be done locally for * LogLocalCopyExecution logs that the copy will be done locally for
* the given shard. * the given shard.
@ -3336,7 +3358,7 @@ LogLocalCopyExecution(uint64 shardId)
{ {
return; return;
} }
ereport(NOTICE, (errmsg("executing the copy locally for shard"))); ereport(NOTICE, (errmsg("executing the copy locally for shard %lu", shardId)));
} }

View File

@ -2,8 +2,18 @@
#ifndef LOCAL_MULTI_COPY #ifndef LOCAL_MULTI_COPY
#define LOCAL_MULTI_COPY #define LOCAL_MULTI_COPY
extern void ProcessLocalCopy(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, int64 /*
shardId, * LOCAL_COPY_FLUSH_THRESHOLD is the threshold for local copy to be flushed.
StringInfo buffer, bool isEndOfCopy); * There will be one buffer for each local placement, when the buffer size
* exceeds this threshold, it will be flushed.
*/
#define LOCAL_COPY_FLUSH_THRESHOLD (1 * 512 * 1024)
extern void WriteTupleToLocalShard(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest,
int64
shardId,
CopyOutState localCopyOutState);
extern void FinishLocalCopyToShard(CitusCopyDestReceiver *copyDest, int64 shardId,
CopyOutState localCopyOutState);
#endif /* LOCAL_MULTI_COPY */ #endif /* LOCAL_MULTI_COPY */

View File

@ -1,6 +1,7 @@
CREATE SCHEMA local_shard_copy; CREATE SCHEMA local_shard_copy;
SET search_path TO local_shard_copy; SET search_path TO local_shard_copy;
SET client_min_messages TO DEBUG; SET client_min_messages TO DEBUG;
SET citus.next_shard_id TO 1570000;
SELECT * FROM master_add_node('localhost', :master_port, groupid := 0); SELECT * FROM master_add_node('localhost', :master_port, groupid := 0);
DEBUG: schema "public" already exists, skipping DEBUG: schema "public" already exists, skipping
DETAIL: NOTICE from localhost:xxxxx DETAIL: NOTICE from localhost:xxxxx
@ -144,15 +145,15 @@ SELECT shard_of_distribution_column_is_local(12);
BEGIN; BEGIN;
-- run select with local execution -- run select with local execution
SELECT count(*) FROM distributed_table WHERE key = 1; SELECT count(*) FROM distributed_table WHERE key = 1;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1)
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
0 0
(1 row) (1 row)
SELECT count(*) FROM distributed_table; SELECT count(*) FROM distributed_table;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
21 21
@ -160,12 +161,12 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar
-- the local placements should be executed locally -- the local placements should be executed locally
COPY distributed_table FROM STDIN WITH delimiter ','; COPY distributed_table FROM STDIN WITH delimiter ',';
NOTICE: executing the copy locally for shard NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY distributed_table, line 1: "1, 100" CONTEXT: COPY distributed_table, line 1: "1, 100"
-- verify that the copy is successful. -- verify that the copy is successful.
SELECT count(*) FROM distributed_table; SELECT count(*) FROM distributed_table;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
26 26
@ -175,15 +176,15 @@ ROLLBACK;
BEGIN; BEGIN;
-- run select with local execution -- run select with local execution
SELECT count(*) FROM distributed_table WHERE key = 1; SELECT count(*) FROM distributed_table WHERE key = 1;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1)
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
0 0
(1 row) (1 row)
SELECT count(*) FROM distributed_table; SELECT count(*) FROM distributed_table;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
21 21
@ -191,12 +192,12 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar
-- the local placements should be executed locally -- the local placements should be executed locally
COPY distributed_table FROM STDIN WITH delimiter ','; COPY distributed_table FROM STDIN WITH delimiter ',';
NOTICE: executing the copy locally for shard NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY distributed_table, line 1: "1, 100" CONTEXT: COPY distributed_table, line 1: "1, 100"
-- verify the put ages. -- verify the put ages.
SELECT * FROM distributed_table; SELECT * FROM distributed_table;
NOTICE: executing the command locally: SELECT key, age FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true NOTICE: executing the command locally: SELECT key, age FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true
NOTICE: executing the command locally: SELECT key, age FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true NOTICE: executing the command locally: SELECT key, age FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true
key | age key | age
--------------------------------------------------------------------- ---------------------------------------------------------------------
20 | 20 20 | 20
@ -231,15 +232,15 @@ ROLLBACK;
BEGIN; BEGIN;
-- run select with local execution -- run select with local execution
SELECT count(*) FROM distributed_table WHERE key = 1; SELECT count(*) FROM distributed_table WHERE key = 1;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1)
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
0 0
(1 row) (1 row)
SELECT count(*) FROM distributed_table; SELECT count(*) FROM distributed_table;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
21 21
@ -247,12 +248,12 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar
-- the local placements should be executed locally -- the local placements should be executed locally
COPY distributed_table FROM STDIN WITH delimiter ','; COPY distributed_table FROM STDIN WITH delimiter ',';
NOTICE: executing the copy locally for shard NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY distributed_table, line 1: "1, 100" CONTEXT: COPY distributed_table, line 1: "1, 100"
-- verify that the copy is successful. -- verify that the copy is successful.
SELECT count(*) FROM distributed_table; SELECT count(*) FROM distributed_table;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
26 26
@ -262,14 +263,14 @@ ROLLBACK;
BEGIN; BEGIN;
-- run select with local execution -- run select with local execution
SELECT age FROM distributed_table WHERE key = 1; SELECT age FROM distributed_table WHERE key = 1;
NOTICE: executing the command locally: SELECT age FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) NOTICE: executing the command locally: SELECT age FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1)
age age
--------------------------------------------------------------------- ---------------------------------------------------------------------
(0 rows) (0 rows)
SELECT count(*) FROM collections_list; SELECT count(*) FROM collections_list;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.collections_list_1330005 collections_list WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.collections_list_1570005 collections_list WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.collections_list_1330007 collections_list WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.collections_list_1570007 collections_list WHERE true
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
0 0
@ -277,12 +278,12 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar
-- the local placements should be executed locally -- the local placements should be executed locally
COPY collections_list FROM STDIN WITH delimiter ','; COPY collections_list FROM STDIN WITH delimiter ',';
NOTICE: executing the copy locally for shard NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY collections_list, line 1: "1, 0" CONTEXT: COPY collections_list, line 1: "1, 0"
-- verify that the copy is successful. -- verify that the copy is successful.
SELECT count(*) FROM collections_list; SELECT count(*) FROM collections_list;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.collections_list_1330005 collections_list WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.collections_list_1570005 collections_list WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.collections_list_1330007 collections_list WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.collections_list_1570007 collections_list WHERE true
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
5 5
@ -292,14 +293,14 @@ ROLLBACK;
BEGIN; BEGIN;
-- run select with local execution -- run select with local execution
SELECT age FROM distributed_table WHERE key = 1; SELECT age FROM distributed_table WHERE key = 1;
NOTICE: executing the command locally: SELECT age FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) NOTICE: executing the command locally: SELECT age FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1)
age age
--------------------------------------------------------------------- ---------------------------------------------------------------------
(0 rows) (0 rows)
SELECT count(*) FROM distributed_table; SELECT count(*) FROM distributed_table;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
21 21
@ -307,12 +308,12 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar
-- the local placements should be executed locally -- the local placements should be executed locally
COPY distributed_table FROM STDIN WITH delimiter ','; COPY distributed_table FROM STDIN WITH delimiter ',';
NOTICE: executing the copy locally for shard NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY distributed_table, line 1: "1, 100" CONTEXT: COPY distributed_table, line 1: "1, 100"
-- verify that the copy is successful. -- verify that the copy is successful.
SELECT count(*) FROM distributed_table; SELECT count(*) FROM distributed_table;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
26 26
@ -322,7 +323,7 @@ ROLLBACK;
BEGIN; BEGIN;
-- Since we are in a transaction, the copy should be locally executed. -- Since we are in a transaction, the copy should be locally executed.
COPY distributed_table FROM STDIN WITH delimiter ','; COPY distributed_table FROM STDIN WITH delimiter ',';
NOTICE: executing the copy locally for shard NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY distributed_table, line 1: "1, 100" CONTEXT: COPY distributed_table, line 1: "1, 100"
ROLLBACK; ROLLBACK;
-- Since we are not in a transaction, the copy should not be locally executed. -- Since we are not in a transaction, the copy should not be locally executed.
@ -331,32 +332,32 @@ BEGIN;
-- Since we are in a transaction, the copy should be locally executed. But -- Since we are in a transaction, the copy should be locally executed. But
-- we are putting duplicate key, so it should error. -- we are putting duplicate key, so it should error.
COPY distributed_table FROM STDIN WITH delimiter ','; COPY distributed_table FROM STDIN WITH delimiter ',';
NOTICE: executing the copy locally for shard NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY distributed_table, line 1: "1, 100" CONTEXT: COPY distributed_table, line 1: "1, 100"
ERROR: duplicate key value violates unique constraint "distributed_table_pkey_1330001" ERROR: duplicate key value violates unique constraint "distributed_table_pkey_1570001"
DETAIL: Key (key)=(1) already exists. DETAIL: Key (key)=(1) already exists.
CONTEXT: COPY distributed_table_1330001, line 1 CONTEXT: COPY distributed_table_1570001, line 1
ROLLBACK; ROLLBACK;
TRUNCATE distributed_table; TRUNCATE distributed_table;
COPY distributed_table FROM STDIN WITH delimiter ','; COPY distributed_table FROM STDIN WITH delimiter ',';
ERROR: new row for relation "distributed_table_1330001" violates check constraint "distributed_table_age_check" ERROR: new row for relation "distributed_table_1570001" violates check constraint "distributed_table_age_check"
DETAIL: Failing row contains (1, 9). DETAIL: Failing row contains (1, 9).
BEGIN; BEGIN;
-- Since we are in a transaction, the execution will be local, however we are putting invalid age. -- Since we are in a transaction, the execution will be local, however we are putting invalid age.
-- The constaints should give an error -- The constaints should give an error
COPY distributed_table FROM STDIN WITH delimiter ','; COPY distributed_table FROM STDIN WITH delimiter ',';
NOTICE: executing the copy locally for shard NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY distributed_table, line 1: "1,9" CONTEXT: COPY distributed_table, line 1: "1,9"
ERROR: new row for relation "distributed_table_1330001" violates check constraint "distributed_table_age_check" ERROR: new row for relation "distributed_table_1570001" violates check constraint "distributed_table_age_check"
DETAIL: Failing row contains (1, 9). DETAIL: Failing row contains (1, 9).
CONTEXT: COPY distributed_table_1330001, line 1 CONTEXT: COPY distributed_table_1570001, line 1
ROLLBACK; ROLLBACK;
TRUNCATE distributed_table; TRUNCATE distributed_table;
-- different delimiters -- different delimiters
BEGIN; BEGIN;
-- run select with local execution -- run select with local execution
SELECT count(*) FROM distributed_table WHERE key = 1; SELECT count(*) FROM distributed_table WHERE key = 1;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1)
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
0 0
@ -364,20 +365,20 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar
-- initial size -- initial size
SELECT count(*) FROM distributed_table; SELECT count(*) FROM distributed_table;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
0 0
(1 row) (1 row)
COPY distributed_table FROM STDIN WITH delimiter '|'; COPY distributed_table FROM STDIN WITH delimiter '|';
NOTICE: executing the copy locally for shard NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY distributed_table, line 1: "1|10" CONTEXT: COPY distributed_table, line 1: "1|10"
-- new size -- new size
SELECT count(*) FROM distributed_table; SELECT count(*) FROM distributed_table;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
3 3
@ -387,7 +388,7 @@ ROLLBACK;
BEGIN; BEGIN;
-- run select with local execution -- run select with local execution
SELECT count(*) FROM distributed_table WHERE key = 1; SELECT count(*) FROM distributed_table WHERE key = 1;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1)
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
0 0
@ -395,20 +396,20 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar
-- initial size -- initial size
SELECT count(*) FROM distributed_table; SELECT count(*) FROM distributed_table;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
0 0
(1 row) (1 row)
COPY distributed_table FROM STDIN WITH delimiter '['; COPY distributed_table FROM STDIN WITH delimiter '[';
NOTICE: executing the copy locally for shard NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY distributed_table, line 1: "1[10" CONTEXT: COPY distributed_table, line 1: "1[10"
-- new size -- new size
SELECT count(*) FROM distributed_table; SELECT count(*) FROM distributed_table;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
3 3
@ -418,38 +419,38 @@ ROLLBACK;
-- multiple local copies -- multiple local copies
BEGIN; BEGIN;
COPY distributed_table FROM STDIN WITH delimiter ','; COPY distributed_table FROM STDIN WITH delimiter ',';
NOTICE: executing the copy locally for shard NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY distributed_table, line 1: "1,15" CONTEXT: COPY distributed_table, line 1: "1,15"
COPY distributed_table FROM STDIN WITH delimiter ','; COPY distributed_table FROM STDIN WITH delimiter ',';
NOTICE: executing the copy locally for shard NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY distributed_table, line 1: "10,15" CONTEXT: COPY distributed_table, line 1: "10,15"
COPY distributed_table FROM STDIN WITH delimiter ','; COPY distributed_table FROM STDIN WITH delimiter ',';
NOTICE: executing the copy locally for shard NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY distributed_table, line 1: "100,15" CONTEXT: COPY distributed_table, line 1: "100,15"
NOTICE: executing the copy locally for shard NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY distributed_table, line 2: "200,20" CONTEXT: COPY distributed_table, line 2: "200,20"
ROLLBACK; ROLLBACK;
-- local copy followed by local copy should see the changes -- local copy followed by local copy should see the changes
-- and error since it is a duplicate primary key. -- and error since it is a duplicate primary key.
BEGIN; BEGIN;
COPY distributed_table FROM STDIN WITH delimiter ','; COPY distributed_table FROM STDIN WITH delimiter ',';
NOTICE: executing the copy locally for shard NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY distributed_table, line 1: "1,15" CONTEXT: COPY distributed_table, line 1: "1,15"
COPY distributed_table FROM STDIN WITH delimiter ','; COPY distributed_table FROM STDIN WITH delimiter ',';
NOTICE: executing the copy locally for shard NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY distributed_table, line 1: "1,16" CONTEXT: COPY distributed_table, line 1: "1,16"
ERROR: duplicate key value violates unique constraint "distributed_table_pkey_1330001" ERROR: duplicate key value violates unique constraint "distributed_table_pkey_1570001"
DETAIL: Key (key)=(1) already exists. DETAIL: Key (key)=(1) already exists.
CONTEXT: COPY distributed_table_1330001, line 1 CONTEXT: COPY distributed_table_1570001, line 1
ROLLBACK; ROLLBACK;
-- local copy followed by local copy should see the changes -- local copy followed by local copy should see the changes
BEGIN; BEGIN;
COPY distributed_table FROM STDIN WITH delimiter ','; COPY distributed_table FROM STDIN WITH delimiter ',';
NOTICE: executing the copy locally for shard NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY distributed_table, line 1: "1,15" CONTEXT: COPY distributed_table, line 1: "1,15"
-- select should see the change -- select should see the change
SELECT key FROM distributed_table WHERE key = 1; SELECT key FROM distributed_table WHERE key = 1;
NOTICE: executing the command locally: SELECT key FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) NOTICE: executing the command locally: SELECT key FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1)
key key
--------------------------------------------------------------------- ---------------------------------------------------------------------
1 1
@ -471,7 +472,7 @@ SET citus.enable_local_execution = 'on';
BEGIN; BEGIN;
-- copy should be executed locally -- copy should be executed locally
COPY reference_table FROM STDIN; COPY reference_table FROM STDIN;
NOTICE: executing the copy locally for shard NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY reference_table, line 1: "1" CONTEXT: COPY reference_table, line 1: "1"
ROLLBACK; ROLLBACK;
SET citus.enable_local_execution = 'off'; SET citus.enable_local_execution = 'off';

View File

@ -2,6 +2,7 @@ CREATE SCHEMA local_shard_copy;
SET search_path TO local_shard_copy; SET search_path TO local_shard_copy;
SET client_min_messages TO DEBUG; SET client_min_messages TO DEBUG;
SET citus.next_shard_id TO 1570000;
SELECT * FROM master_add_node('localhost', :master_port, groupid := 0); SELECT * FROM master_add_node('localhost', :master_port, groupid := 0);