diff --git a/src/backend/distributed/commands/local_multi_copy.c b/src/backend/distributed/commands/local_multi_copy.c index c280bf7d0..24b2440f7 100644 --- a/src/backend/distributed/commands/local_multi_copy.c +++ b/src/backend/distributed/commands/local_multi_copy.c @@ -35,18 +35,9 @@ #include "distributed/local_multi_copy.h" #include "distributed/shard_utils.h" -/* - * LOCAL_COPY_BUFFER_SIZE is buffer size for local copy. - * There will be one buffer for each local placement, therefore - * the maximum amount of memory that might be alocated is - * LOCAL_COPY_BUFFER_SIZE * #local_placement - */ -#define LOCAL_COPY_BUFFER_SIZE (1 * 512 * 1024) - - static int ReadFromLocalBufferCallback(void *outbuf, int minread, int maxread); static void AddSlotToBuffer(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, - bool isBinary); + CopyOutState localCopyOutState); static bool ShouldSendCopyNow(StringInfo buffer); static void DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId, @@ -54,71 +45,83 @@ static void DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId, static bool ShouldAddBinaryHeaders(StringInfo buffer, bool isBinary); /* - * localCopyBuffer is used in copy callback to return the copied rows. + * LocalCopyBuffer is used in copy callback to return the copied rows. * The reason this is a global variable is that we cannot pass an additional * argument to the copy callback. */ -StringInfo localCopyBuffer; +static StringInfo LocalCopyBuffer; /* - * ProcessLocalCopy adds the given slot and does a local copy if + * WriteTupleToLocalShard adds the given slot and does a local copy if * this is the end of copy, or the buffer size exceeds the threshold. */ void -ProcessLocalCopy(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, int64 shardId, - StringInfo buffer, bool isEndOfCopy) +WriteTupleToLocalShard(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, int64 + shardId, + CopyOutState localCopyOutState) { - /* - * Here we save the previous buffer, and put the local shard's buffer - * into copyOutState. The motivation is to use the existing logic to - * serialize a row slot into buffer. - */ - StringInfo previousBuffer = copyDest->copyOutState->fe_msgbuf; - copyDest->copyOutState->fe_msgbuf = buffer; - /* since we are doing a local copy, the following statements should use local execution to see the changes */ TransactionAccessedLocalPlacement = true; - bool isBinaryCopy = copyDest->copyOutState->binary; - AddSlotToBuffer(slot, copyDest, isBinaryCopy); + bool isBinaryCopy = localCopyOutState->binary; + if (ShouldAddBinaryHeaders(localCopyOutState->fe_msgbuf, isBinaryCopy)) + { + AppendCopyBinaryHeaders(localCopyOutState); + } - if (isEndOfCopy || ShouldSendCopyNow(buffer)) + AddSlotToBuffer(slot, copyDest, localCopyOutState); + + if (ShouldSendCopyNow(localCopyOutState->fe_msgbuf)) { if (isBinaryCopy) { - AppendCopyBinaryFooters(copyDest->copyOutState); + /* + * We're going to flush the buffer to disk by effectively doing a full COPY command. + * Hence we also need to add footers to the current buffer. + */ + AppendCopyBinaryFooters(localCopyOutState); } - - DoLocalCopy(buffer, copyDest->distributedRelationId, shardId, + bool isEndOfCopy = false; + DoLocalCopy(localCopyOutState->fe_msgbuf, copyDest->distributedRelationId, + shardId, copyDest->copyStatement, isEndOfCopy); } - copyDest->copyOutState->fe_msgbuf = previousBuffer; } /* - * AddSlotToBuffer serializes the given slot and adds it to the buffer in copyDest. - * If the copy format is binary, it adds binary headers as well. + * FinishLocalCopyToShard finishes local copy for the given shard with the shard id. + */ +void +FinishLocalCopyToShard(CitusCopyDestReceiver *copyDest, int64 shardId, + CopyOutState localCopyOutState) +{ + bool isBinaryCopy = localCopyOutState->binary; + if (isBinaryCopy) + { + AppendCopyBinaryFooters(localCopyOutState); + } + bool isEndOfCopy = true; + DoLocalCopy(localCopyOutState->fe_msgbuf, copyDest->distributedRelationId, shardId, + copyDest->copyStatement, isEndOfCopy); +} + + +/* + * AddSlotToBuffer serializes the given slot and adds it to the buffer in localCopyOutState. */ static void -AddSlotToBuffer(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, bool isBinary) +AddSlotToBuffer(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, CopyOutState + localCopyOutState) { - if (ShouldAddBinaryHeaders(copyDest->copyOutState->fe_msgbuf, isBinary)) - { - AppendCopyBinaryHeaders(copyDest->copyOutState); - } + Datum *columnValues = slot->tts_values; + bool *columnNulls = slot->tts_isnull; + FmgrInfo *columnOutputFunctions = copyDest->columnOutputFunctions; + CopyCoercionData *columnCoercionPaths = copyDest->columnCoercionPaths; - if (slot != NULL) - { - Datum *columnValues = slot->tts_values; - bool *columnNulls = slot->tts_isnull; - FmgrInfo *columnOutputFunctions = copyDest->columnOutputFunctions; - CopyCoercionData *columnCoercionPaths = copyDest->columnCoercionPaths; - - AppendCopyRowData(columnValues, columnNulls, copyDest->tupleDescriptor, - copyDest->copyOutState, columnOutputFunctions, - columnCoercionPaths); - } + AppendCopyRowData(columnValues, columnNulls, copyDest->tupleDescriptor, + localCopyOutState, columnOutputFunctions, + columnCoercionPaths); } @@ -129,19 +132,25 @@ AddSlotToBuffer(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, bool isBi static bool ShouldSendCopyNow(StringInfo buffer) { - return buffer->len > LOCAL_COPY_BUFFER_SIZE; + return buffer->len > LOCAL_COPY_FLUSH_THRESHOLD; } /* * DoLocalCopy finds the shard table from the distributed relation id, and copies the given * buffer into the shard. + * CopyFrom calls ReadFromLocalBufferCallback to read bytes from the buffer as though + * it was reading from stdin. It then parses the tuples and writes them to the shardOid table. */ static void DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId, CopyStmt *copyStatement, bool isEndOfCopy) { - localCopyBuffer = buffer; + /* + * Set the buffer as a global variable to allow ReadFromLocalBufferCallback to read from it. + * We cannot pass additional arguments to ReadFromLocalBufferCallback. + */ + LocalCopyBuffer = buffer; Oid shardOid = GetShardLocalTableOid(relationId, shardId); Relation shard = heap_open(shardOid, RowExclusiveLock); @@ -189,15 +198,15 @@ static int ReadFromLocalBufferCallback(void *outbuf, int minread, int maxread) { int bytesread = 0; - int avail = localCopyBuffer->len - localCopyBuffer->cursor; + int avail = LocalCopyBuffer->len - LocalCopyBuffer->cursor; int bytesToRead = Min(avail, maxread); if (bytesToRead > 0) { memcpy_s(outbuf, bytesToRead + strlen((char *) outbuf), - &localCopyBuffer->data[localCopyBuffer->cursor], bytesToRead); + &LocalCopyBuffer->data[LocalCopyBuffer->cursor], bytesToRead); } bytesread += bytesToRead; - localCopyBuffer->cursor += bytesToRead; + LocalCopyBuffer->cursor += bytesToRead; return bytesread; } diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 0e698fe24..34852becb 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -164,6 +164,7 @@ struct CopyPlacementState /* State of shard to which the placement belongs to. */ CopyShardState *shardState; + /* node group ID of the placement */ int32 groupId; /* @@ -183,7 +184,7 @@ struct CopyShardState uint64 shardId; /* used for doing local copy */ - StringInfo localCopyBuffer; + CopyOutState copyOutState; /* containsLocalPlacement is true if we have a local placement for the shard id of this state */ bool containsLocalPlacement; @@ -242,15 +243,16 @@ static CopyConnectionState * GetConnectionState(HTAB *connectionStateHash, MultiConnection *connection); static CopyShardState * GetShardState(uint64 shardId, HTAB *shardStateHash, HTAB *connectionStateHash, bool stopOnFailure, - bool *found, bool shouldUseLocalCopy, MemoryContext - context); + bool *found, bool shouldUseLocalCopy, CopyOutState + copyOutState); static MultiConnection * CopyGetPlacementConnection(ShardPlacement *placement, bool stopOnFailure); static List * ConnectionStateList(HTAB *connectionStateHash); static void InitializeCopyShardState(CopyShardState *shardState, HTAB *connectionStateHash, uint64 shardId, bool stopOnFailure, bool - canUseLocalCopy, MemoryContext context); + canUseLocalCopy, + CopyOutState copyOutState); static void StartPlacementStateCopyCommand(CopyPlacementState *placementState, CopyStmt *copyStatement, CopyOutState copyOutState); @@ -288,6 +290,7 @@ static void CitusCopyDestReceiverShutdown(DestReceiver *destReceiver); static void CitusCopyDestReceiverDestroy(DestReceiver *destReceiver); static bool ContainsLocalPlacement(int64 shardId); static void FinishLocalCopy(CitusCopyDestReceiver *copyDest); +static void CloneCopyOutStateForLocalCopy(CopyOutState from, CopyOutState to); static bool ShouldExecuteCopyLocally(void); static void LogLocalCopyExecution(uint64 shardId); @@ -2285,7 +2288,7 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest stopOnFailure, &cachedShardStateFound, copyDest->shouldUseLocalCopy, - copyDest->memoryContext); + copyDest->copyOutState); if (!cachedShardStateFound) { firstTupleInShard = true; @@ -2308,9 +2311,7 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest if (copyDest->shouldUseLocalCopy && shardState->containsLocalPlacement) { - bool isEndOfCopy = false; - ProcessLocalCopy(slot, copyDest, shardId, shardState->localCopyBuffer, - isEndOfCopy); + WriteTupleToLocalShard(slot, copyDest, shardId, shardState->copyOutState); } @@ -2535,13 +2536,13 @@ FinishLocalCopy(CitusCopyDestReceiver *copyDest) HASH_SEQ_STATUS status; CopyShardState *copyShardState; - bool isEndOfCopy = true; foreach_htab(copyShardState, &status, shardStateHash) { - if (copyShardState->localCopyBuffer->len > 0) + if (copyShardState->copyOutState != NULL && + copyShardState->copyOutState->fe_msgbuf->len > 0) { - ProcessLocalCopy(NULL, copyDest, copyShardState->shardId, - copyShardState->localCopyBuffer, isEndOfCopy); + FinishLocalCopyToShard(copyDest, copyShardState->shardId, + copyShardState->copyOutState); } } } @@ -3208,14 +3209,15 @@ ConnectionStateList(HTAB *connectionStateHash) static CopyShardState * GetShardState(uint64 shardId, HTAB *shardStateHash, HTAB *connectionStateHash, bool stopOnFailure, bool *found, bool - shouldUseLocalCopy, MemoryContext context) + shouldUseLocalCopy, CopyOutState copyOutState) { CopyShardState *shardState = (CopyShardState *) hash_search(shardStateHash, &shardId, HASH_ENTER, found); if (!*found) { InitializeCopyShardState(shardState, connectionStateHash, - shardId, stopOnFailure, shouldUseLocalCopy, context); + shardId, stopOnFailure, shouldUseLocalCopy, + copyOutState); } return shardState; @@ -3230,16 +3232,12 @@ GetShardState(uint64 shardId, HTAB *shardStateHash, static void InitializeCopyShardState(CopyShardState *shardState, HTAB *connectionStateHash, uint64 shardId, - bool stopOnFailure, bool shouldUseLocalCopy, MemoryContext - context) + bool stopOnFailure, bool shouldUseLocalCopy, CopyOutState + copyOutState) { ListCell *placementCell = NULL; int failedPlacementCount = 0; - MemoryContext oldContext = MemoryContextSwitchTo(context); - - MemoryContextSwitchTo(oldContext); - MemoryContext localContext = AllocSetContextCreateExtended(CurrentMemoryContext, "InitializeCopyShardState", @@ -3249,7 +3247,7 @@ InitializeCopyShardState(CopyShardState *shardState, /* release active placement list at the end of this function */ - oldContext = MemoryContextSwitchTo(localContext); + MemoryContext oldContext = MemoryContextSwitchTo(localContext); List *activePlacementList = ActiveShardPlacementList(shardId); @@ -3257,7 +3255,7 @@ InitializeCopyShardState(CopyShardState *shardState, shardState->shardId = shardId; shardState->placementStateList = NIL; - shardState->localCopyBuffer = makeStringInfo(); + shardState->copyOutState = NULL; shardState->containsLocalPlacement = ContainsLocalPlacement(shardId); @@ -3267,6 +3265,8 @@ InitializeCopyShardState(CopyShardState *shardState, if (shouldUseLocalCopy && placement->groupId == GetLocalGroupId()) { + shardState->copyOutState = (CopyOutState) palloc0(sizeof(*copyOutState)); + CloneCopyOutStateForLocalCopy(copyOutState, shardState->copyOutState); LogLocalCopyExecution(shardId); continue; } @@ -3325,6 +3325,28 @@ InitializeCopyShardState(CopyShardState *shardState, } +/* + * CloneCopyOutStateForLocalCopy creates a shallow copy of the CopyOutState with a new + * fe_msgbuf. We keep a separate CopyOutState for every local shard placement, because + * in case of local copy we serialize and buffer incoming tuples into fe_msgbuf for each + * placement and the serialization functions take a CopyOutState as a parameter. + */ +static void +CloneCopyOutStateForLocalCopy(CopyOutState from, CopyOutState to) +{ + to->attnumlist = from->attnumlist; + to->binary = from->binary; + to->copy_dest = from->copy_dest; + to->delim = from->delim; + to->file_encoding = from->file_encoding; + to->need_transcoding = from->need_transcoding; + to->null_print = from->null_print; + to->null_print_client = from->null_print_client; + to->rowcontext = from->rowcontext; + to->fe_msgbuf = makeStringInfo(); +} + + /* * LogLocalCopyExecution logs that the copy will be done locally for * the given shard. @@ -3336,7 +3358,7 @@ LogLocalCopyExecution(uint64 shardId) { return; } - ereport(NOTICE, (errmsg("executing the copy locally for shard"))); + ereport(NOTICE, (errmsg("executing the copy locally for shard %lu", shardId))); } diff --git a/src/include/distributed/local_multi_copy.h b/src/include/distributed/local_multi_copy.h index 83dc6a32b..a4e46f015 100644 --- a/src/include/distributed/local_multi_copy.h +++ b/src/include/distributed/local_multi_copy.h @@ -2,8 +2,18 @@ #ifndef LOCAL_MULTI_COPY #define LOCAL_MULTI_COPY -extern void ProcessLocalCopy(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, int64 - shardId, - StringInfo buffer, bool isEndOfCopy); +/* + * LOCAL_COPY_FLUSH_THRESHOLD is the threshold for local copy to be flushed. + * There will be one buffer for each local placement, when the buffer size + * exceeds this threshold, it will be flushed. + */ +#define LOCAL_COPY_FLUSH_THRESHOLD (1 * 512 * 1024) + +extern void WriteTupleToLocalShard(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, + int64 + shardId, + CopyOutState localCopyOutState); +extern void FinishLocalCopyToShard(CitusCopyDestReceiver *copyDest, int64 shardId, + CopyOutState localCopyOutState); #endif /* LOCAL_MULTI_COPY */ diff --git a/src/test/regress/expected/local_shard_copy.out b/src/test/regress/expected/local_shard_copy.out index 3eeefb812..d42403016 100644 --- a/src/test/regress/expected/local_shard_copy.out +++ b/src/test/regress/expected/local_shard_copy.out @@ -1,6 +1,7 @@ CREATE SCHEMA local_shard_copy; SET search_path TO local_shard_copy; SET client_min_messages TO DEBUG; +SET citus.next_shard_id TO 1570000; SELECT * FROM master_add_node('localhost', :master_port, groupid := 0); DEBUG: schema "public" already exists, skipping DETAIL: NOTICE from localhost:xxxxx @@ -144,15 +145,15 @@ SELECT shard_of_distribution_column_is_local(12); BEGIN; -- run select with local execution SELECT count(*) FROM distributed_table WHERE key = 1; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) count --------------------------------------------------------------------- 0 (1 row) SELECT count(*) FROM distributed_table; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true count --------------------------------------------------------------------- 21 @@ -160,12 +161,12 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar -- the local placements should be executed locally COPY distributed_table FROM STDIN WITH delimiter ','; -NOTICE: executing the copy locally for shard +NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY distributed_table, line 1: "1, 100" -- verify that the copy is successful. SELECT count(*) FROM distributed_table; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true count --------------------------------------------------------------------- 26 @@ -175,15 +176,15 @@ ROLLBACK; BEGIN; -- run select with local execution SELECT count(*) FROM distributed_table WHERE key = 1; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) count --------------------------------------------------------------------- 0 (1 row) SELECT count(*) FROM distributed_table; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true count --------------------------------------------------------------------- 21 @@ -191,12 +192,12 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar -- the local placements should be executed locally COPY distributed_table FROM STDIN WITH delimiter ','; -NOTICE: executing the copy locally for shard +NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY distributed_table, line 1: "1, 100" -- verify the put ages. SELECT * FROM distributed_table; -NOTICE: executing the command locally: SELECT key, age FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true -NOTICE: executing the command locally: SELECT key, age FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true +NOTICE: executing the command locally: SELECT key, age FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT key, age FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true key | age --------------------------------------------------------------------- 20 | 20 @@ -231,15 +232,15 @@ ROLLBACK; BEGIN; -- run select with local execution SELECT count(*) FROM distributed_table WHERE key = 1; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) count --------------------------------------------------------------------- 0 (1 row) SELECT count(*) FROM distributed_table; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true count --------------------------------------------------------------------- 21 @@ -247,12 +248,12 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar -- the local placements should be executed locally COPY distributed_table FROM STDIN WITH delimiter ','; -NOTICE: executing the copy locally for shard +NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY distributed_table, line 1: "1, 100" -- verify that the copy is successful. SELECT count(*) FROM distributed_table; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true count --------------------------------------------------------------------- 26 @@ -262,14 +263,14 @@ ROLLBACK; BEGIN; -- run select with local execution SELECT age FROM distributed_table WHERE key = 1; -NOTICE: executing the command locally: SELECT age FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) +NOTICE: executing the command locally: SELECT age FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) age --------------------------------------------------------------------- (0 rows) SELECT count(*) FROM collections_list; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.collections_list_1330005 collections_list WHERE true -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.collections_list_1330007 collections_list WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.collections_list_1570005 collections_list WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.collections_list_1570007 collections_list WHERE true count --------------------------------------------------------------------- 0 @@ -277,12 +278,12 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar -- the local placements should be executed locally COPY collections_list FROM STDIN WITH delimiter ','; -NOTICE: executing the copy locally for shard +NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY collections_list, line 1: "1, 0" -- verify that the copy is successful. SELECT count(*) FROM collections_list; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.collections_list_1330005 collections_list WHERE true -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.collections_list_1330007 collections_list WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.collections_list_1570005 collections_list WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.collections_list_1570007 collections_list WHERE true count --------------------------------------------------------------------- 5 @@ -292,14 +293,14 @@ ROLLBACK; BEGIN; -- run select with local execution SELECT age FROM distributed_table WHERE key = 1; -NOTICE: executing the command locally: SELECT age FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) +NOTICE: executing the command locally: SELECT age FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) age --------------------------------------------------------------------- (0 rows) SELECT count(*) FROM distributed_table; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true count --------------------------------------------------------------------- 21 @@ -307,12 +308,12 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar -- the local placements should be executed locally COPY distributed_table FROM STDIN WITH delimiter ','; -NOTICE: executing the copy locally for shard +NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY distributed_table, line 1: "1, 100" -- verify that the copy is successful. SELECT count(*) FROM distributed_table; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true count --------------------------------------------------------------------- 26 @@ -322,7 +323,7 @@ ROLLBACK; BEGIN; -- Since we are in a transaction, the copy should be locally executed. COPY distributed_table FROM STDIN WITH delimiter ','; -NOTICE: executing the copy locally for shard +NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY distributed_table, line 1: "1, 100" ROLLBACK; -- Since we are not in a transaction, the copy should not be locally executed. @@ -331,32 +332,32 @@ BEGIN; -- Since we are in a transaction, the copy should be locally executed. But -- we are putting duplicate key, so it should error. COPY distributed_table FROM STDIN WITH delimiter ','; -NOTICE: executing the copy locally for shard +NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY distributed_table, line 1: "1, 100" -ERROR: duplicate key value violates unique constraint "distributed_table_pkey_1330001" +ERROR: duplicate key value violates unique constraint "distributed_table_pkey_1570001" DETAIL: Key (key)=(1) already exists. -CONTEXT: COPY distributed_table_1330001, line 1 +CONTEXT: COPY distributed_table_1570001, line 1 ROLLBACK; TRUNCATE distributed_table; COPY distributed_table FROM STDIN WITH delimiter ','; -ERROR: new row for relation "distributed_table_1330001" violates check constraint "distributed_table_age_check" +ERROR: new row for relation "distributed_table_1570001" violates check constraint "distributed_table_age_check" DETAIL: Failing row contains (1, 9). BEGIN; -- Since we are in a transaction, the execution will be local, however we are putting invalid age. -- The constaints should give an error COPY distributed_table FROM STDIN WITH delimiter ','; -NOTICE: executing the copy locally for shard +NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY distributed_table, line 1: "1,9" -ERROR: new row for relation "distributed_table_1330001" violates check constraint "distributed_table_age_check" +ERROR: new row for relation "distributed_table_1570001" violates check constraint "distributed_table_age_check" DETAIL: Failing row contains (1, 9). -CONTEXT: COPY distributed_table_1330001, line 1 +CONTEXT: COPY distributed_table_1570001, line 1 ROLLBACK; TRUNCATE distributed_table; -- different delimiters BEGIN; -- run select with local execution SELECT count(*) FROM distributed_table WHERE key = 1; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) count --------------------------------------------------------------------- 0 @@ -364,20 +365,20 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar -- initial size SELECT count(*) FROM distributed_table; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true count --------------------------------------------------------------------- 0 (1 row) COPY distributed_table FROM STDIN WITH delimiter '|'; -NOTICE: executing the copy locally for shard +NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY distributed_table, line 1: "1|10" -- new size SELECT count(*) FROM distributed_table; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true count --------------------------------------------------------------------- 3 @@ -387,7 +388,7 @@ ROLLBACK; BEGIN; -- run select with local execution SELECT count(*) FROM distributed_table WHERE key = 1; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) count --------------------------------------------------------------------- 0 @@ -395,20 +396,20 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar -- initial size SELECT count(*) FROM distributed_table; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true count --------------------------------------------------------------------- 0 (1 row) COPY distributed_table FROM STDIN WITH delimiter '['; -NOTICE: executing the copy locally for shard +NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY distributed_table, line 1: "1[10" -- new size SELECT count(*) FROM distributed_table; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true count --------------------------------------------------------------------- 3 @@ -418,38 +419,38 @@ ROLLBACK; -- multiple local copies BEGIN; COPY distributed_table FROM STDIN WITH delimiter ','; -NOTICE: executing the copy locally for shard +NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY distributed_table, line 1: "1,15" COPY distributed_table FROM STDIN WITH delimiter ','; -NOTICE: executing the copy locally for shard +NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY distributed_table, line 1: "10,15" COPY distributed_table FROM STDIN WITH delimiter ','; -NOTICE: executing the copy locally for shard +NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY distributed_table, line 1: "100,15" -NOTICE: executing the copy locally for shard +NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY distributed_table, line 2: "200,20" ROLLBACK; -- local copy followed by local copy should see the changes -- and error since it is a duplicate primary key. BEGIN; COPY distributed_table FROM STDIN WITH delimiter ','; -NOTICE: executing the copy locally for shard +NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY distributed_table, line 1: "1,15" COPY distributed_table FROM STDIN WITH delimiter ','; -NOTICE: executing the copy locally for shard +NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY distributed_table, line 1: "1,16" -ERROR: duplicate key value violates unique constraint "distributed_table_pkey_1330001" +ERROR: duplicate key value violates unique constraint "distributed_table_pkey_1570001" DETAIL: Key (key)=(1) already exists. -CONTEXT: COPY distributed_table_1330001, line 1 +CONTEXT: COPY distributed_table_1570001, line 1 ROLLBACK; -- local copy followed by local copy should see the changes BEGIN; COPY distributed_table FROM STDIN WITH delimiter ','; -NOTICE: executing the copy locally for shard +NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY distributed_table, line 1: "1,15" -- select should see the change SELECT key FROM distributed_table WHERE key = 1; -NOTICE: executing the command locally: SELECT key FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) +NOTICE: executing the command locally: SELECT key FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) key --------------------------------------------------------------------- 1 @@ -471,7 +472,7 @@ SET citus.enable_local_execution = 'on'; BEGIN; -- copy should be executed locally COPY reference_table FROM STDIN; -NOTICE: executing the copy locally for shard +NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY reference_table, line 1: "1" ROLLBACK; SET citus.enable_local_execution = 'off'; diff --git a/src/test/regress/sql/local_shard_copy.sql b/src/test/regress/sql/local_shard_copy.sql index af8e3932e..f07a346e9 100644 --- a/src/test/regress/sql/local_shard_copy.sql +++ b/src/test/regress/sql/local_shard_copy.sql @@ -2,6 +2,7 @@ CREATE SCHEMA local_shard_copy; SET search_path TO local_shard_copy; SET client_min_messages TO DEBUG; +SET citus.next_shard_id TO 1570000; SELECT * FROM master_add_node('localhost', :master_port, groupid := 0);