From f9c443188568cd927ed510b3b61636787fcf00f1 Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Mon, 9 Mar 2020 15:55:57 +0300 Subject: [PATCH 1/7] add the support to execute copy locally A copy will be executed locally if - Local execution is enabled and the current transaction accessed a local placement - Local execution is enabled and we are inside a transaction block. So even if local execution is enabled but we are not in a transaction block, the copy will not be run locally. This will not run locally: ``` COPY distributed_table FROM STDIN; .... ``` This will run locally: ``` SET citus.enable_local_execution to 'on'; BEGIN; COPY distributed_table FROM STDIN; COMMIT; .... ``` There are 3 ways to do a copy in Postgres programmatically: - from a file - from a program - from a callback function I have chosen to implement it with a callback function, which means that we write the rows of the copy from a callback function to the output buffer, which is then used to insert tuples into the actual table. For each shard id, we have a buffer that keeps the current rows to be written; we perform the actual copy operation when either: - the copy buffer for the given shard id reaches a threshold, which is currently 512KB - we reach the end of the copy The buffer size (512KB) is debatable. At a given time, we might allocate at most (number of local placements * buffer size) memory. The local copy uses the same copy format as the remote copy, which means that we serialize the data in the same format as the remote copy and send it locally. There was also the option to use ExecSimpleRelationInsert to insert slots one by one, which would avoid the extra serialization/deserialization, but benchmarks showed that using buffers is significantly better in terms of performance.
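To make the mechanism concrete, here is a condensed sketch of the per-tuple flow (illustrative only: it compresses the ProcessLocalCopy/DoLocalCopy functions from the patch below into one function, and `shardRelation` stands in for the shard relation that DoLocalCopy actually opens; AppendCopyRowData, BeginCopyFrom, CopyFrom, and EndCopyFrom are the existing Citus/PostgreSQL functions the patch uses):

```
/*
 * Condensed sketch of the buffered local copy flow; see
 * ProcessLocalCopy/DoLocalCopy in local_multi_copy.c for the real code.
 */
static void
LocalCopyFlowSketch(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest,
					Relation shardRelation, StringInfo buffer, bool isEndOfCopy)
{
	/* serialize the slot into the shard's buffer, in COPY wire format */
	AppendCopyRowData(slot->tts_values, slot->tts_isnull,
					  copyDest->tupleDescriptor, copyDest->copyOutState,
					  copyDest->columnOutputFunctions,
					  copyDest->columnCoercionPaths);

	/* flush once the buffer passes the 512KB threshold, or at end of copy */
	if (isEndOfCopy || buffer->len > LOCAL_COPY_BUFFER_SIZE)
	{
		/*
		 * CopyFrom() pulls the buffered bytes back out through the
		 * ReadFromLocalBufferCallback data-source callback.
		 */
		CopyState cstate = BeginCopyFrom(NULL, shardRelation, NULL, false,
										 ReadFromLocalBufferCallback,
										 NIL, NIL);
		CopyFrom(cstate);
		EndCopyFrom(cstate);
	}
}
```

In other words, the local path produces exactly the bytes that would have been sent over a COPY connection, then consumes them in-process through the callback.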
You can see this comment for more details: https://github.com/citusdata/citus/pull/3557#discussion_r389499054 --- .../commands/create_distributed_table.c | 5 +- .../distributed/commands/local_multi_copy.c | 241 ++++++++++++++++++ src/backend/distributed/commands/multi_copy.c | 198 ++++++++++++-- .../executor/insert_select_executor.c | 10 +- .../distributed/planner/deparse_shard_query.c | 7 +- src/backend/distributed/utils/shard_utils.c | 4 +- src/include/distributed/commands/multi_copy.h | 7 +- src/include/distributed/local_multi_copy.h | 9 + src/include/distributed/shard_utils.h | 2 +- 9 files changed, 453 insertions(+), 30 deletions(-) create mode 100644 src/backend/distributed/commands/local_multi_copy.c create mode 100644 src/include/distributed/local_multi_copy.h diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 60caa58a5..aa76f338d 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -1258,12 +1258,15 @@ CopyLocalDataIntoShards(Oid distributedRelationId) ExprContext *econtext = GetPerTupleExprContext(estate); econtext->ecxt_scantuple = slot; + /* here we already have the data locally */ + bool hasCopyDataLocally = true; copyDest = (DestReceiver *) CreateCitusCopyDestReceiver(distributedRelationId, columnNameList, partitionColumnIndex, estate, stopOnFailure, - NULL); + NULL, + hasCopyDataLocally); /* initialise state for writing to shards, we'll open connections on demand */ copyDest->rStartup(copyDest, 0, tupleDescriptor); diff --git a/src/backend/distributed/commands/local_multi_copy.c b/src/backend/distributed/commands/local_multi_copy.c new file mode 100644 index 000000000..54fedc2a5 --- /dev/null +++ b/src/backend/distributed/commands/local_multi_copy.c @@ -0,0 +1,241 @@ +/*------------------------------------------------------------------------- + * + * local_multi_copy.c + * Commands for running a copy locally + * + * For each local placement, we have a buffer. When we receive a slot + * from a copy, the slot will be put to the corresponding buffer based + * on the shard id. When the buffer size exceeds the threshold, a local + * copy will be done. Also, if we reach the end of the copy, we will send + * the current buffer for local copy. + * + * The existing logic and format from multi_copy.c are used, therefore + * even if the user did not do a copy with binary format, it is possible that + * we are going to be using binary format internally. + * + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "commands/copy.h" +#include "catalog/namespace.h" +#include "parser/parse_relation.h" +#include "utils/lsyscache.h" +#include "nodes/makefuncs.h" +#include "safe_lib.h" +#include <arpa/inet.h> /* for htons */ +
+#include "distributed/transmit.h" +#include "distributed/commands/multi_copy.h" +#include "distributed/multi_partitioning_utils.h" +#include "distributed/local_executor.h" +#include "distributed/local_multi_copy.h" +#include "distributed/shard_utils.h" +
+/* + * LOCAL_COPY_BUFFER_SIZE is the buffer size for local copy.
+ * There will be one buffer for each local placement, therefore + * the maximum amount of memory that might be allocated is + * LOCAL_COPY_BUFFER_SIZE * #local_placement + */ +#define LOCAL_COPY_BUFFER_SIZE (1 * 512 * 1024) + + +static int ReadFromLocalBufferCallback(void *outbuf, int minread, int maxread); +static Relation CreateCopiedShard(RangeVar *distributedRel, Relation shard); +static void AddSlotToBuffer(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, + bool isBinary); + +static bool ShouldSendCopyNow(StringInfo buffer); +static void DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId, + CopyStmt *copyStatement, bool isEndOfCopy); +static bool ShouldAddBinaryHeaders(StringInfo buffer, bool isBinary); + +/* + * localCopyBuffer is used in the copy callback to return the copied rows. + * The reason this is a global variable is that we cannot pass an additional + * argument to the copy callback. + */ +StringInfo localCopyBuffer; + +/* + * ProcessLocalCopy adds the given slot and does a local copy if + * this is the end of the copy, or the buffer size exceeds the threshold. + */ +void +ProcessLocalCopy(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, int64 shardId, + StringInfo buffer, bool isEndOfCopy) +{ + /* + * Here we save the previous buffer, and put the local shard's buffer + * into copyOutState. The motivation is to use the existing logic to + * serialize a row slot into buffer. + */ + StringInfo previousBuffer = copyDest->copyOutState->fe_msgbuf; + copyDest->copyOutState->fe_msgbuf = buffer; + + /* since we are doing a local copy, the following statements should use local execution to see the changes */ + TransactionAccessedLocalPlacement = true; + + bool isBinaryCopy = copyDest->copyOutState->binary; + AddSlotToBuffer(slot, copyDest, isBinaryCopy); + + if (isEndOfCopy || ShouldSendCopyNow(buffer)) + { + if (isBinaryCopy) + { + AppendCopyBinaryFooters(copyDest->copyOutState); + } + + DoLocalCopy(buffer, copyDest->distributedRelationId, shardId, + copyDest->copyStatement, isEndOfCopy); + } + copyDest->copyOutState->fe_msgbuf = previousBuffer; +} + + +/* + * AddSlotToBuffer serializes the given slot and adds it to the buffer in copyDest. + * If the copy format is binary, it adds binary headers as well. + */ +static void +AddSlotToBuffer(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, bool isBinary) +{ + if (ShouldAddBinaryHeaders(copyDest->copyOutState->fe_msgbuf, isBinary)) + { + AppendCopyBinaryHeaders(copyDest->copyOutState); + } + + if (slot != NULL) + { + Datum *columnValues = slot->tts_values; + bool *columnNulls = slot->tts_isnull; + FmgrInfo *columnOutputFunctions = copyDest->columnOutputFunctions; + CopyCoercionData *columnCoercionPaths = copyDest->columnCoercionPaths; + + AppendCopyRowData(columnValues, columnNulls, copyDest->tupleDescriptor, + copyDest->copyOutState, columnOutputFunctions, + columnCoercionPaths); + } +} + + +/* + * ShouldSendCopyNow returns true if the given buffer size exceeds the + * local copy buffer size threshold. + */ +static bool +ShouldSendCopyNow(StringInfo buffer) +{ + return buffer->len > LOCAL_COPY_BUFFER_SIZE; +} + + +/* + * DoLocalCopy finds the shard table from the distributed relation id, and copies the given + * buffer into the shard.
+ */ +static void +DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId, CopyStmt *copyStatement, + bool isEndOfCopy) +{ + localCopyBuffer = buffer; + + Oid shardOid = GetShardLocalTableOid(relationId, shardId); + Relation shard = heap_open(shardOid, RowExclusiveLock); + Relation copiedShard = CreateCopiedShard(copyStatement->relation, shard); + ParseState *pState = make_parsestate(NULL); + + /* p_rtable of pState is set so that we can check constraints. */ + pState->p_rtable = CreateRangeTable(copiedShard, ACL_INSERT); + + CopyState cstate = BeginCopyFrom(pState, copiedShard, NULL, false, + ReadFromLocalBufferCallback, + copyStatement->attlist, copyStatement->options); + CopyFrom(cstate); + EndCopyFrom(cstate); + + heap_close(shard, NoLock); + free_parsestate(pState); + FreeStringInfo(buffer); + if (!isEndOfCopy) + { + buffer = makeStringInfo(); + } +} + + +/* + * ShouldAddBinaryHeaders returns true if the given buffer + * is empty and the format is binary. + */ +static bool +ShouldAddBinaryHeaders(StringInfo buffer, bool isBinary) +{ + if (!isBinary) + { + return false; + } + return buffer->len == 0; +} + + +/* + * CreateCopiedShard deep copies the necessary fields of the given + * relation. + */ +Relation +CreateCopiedShard(RangeVar *distributedRel, Relation shard) +{ + TupleDesc tupleDescriptor = RelationGetDescr(shard); + + Relation copiedDistributedRelation = (Relation) palloc(sizeof(RelationData)); + Form_pg_class copiedDistributedRelationTuple = (Form_pg_class) palloc( + CLASS_TUPLE_SIZE); + + *copiedDistributedRelation = *shard; + *copiedDistributedRelationTuple = *shard->rd_rel; + + copiedDistributedRelation->rd_rel = copiedDistributedRelationTuple; + copiedDistributedRelation->rd_att = CreateTupleDescCopyConstr(tupleDescriptor); + + Oid tableId = RangeVarGetRelid(distributedRel, NoLock, false); + + /* + * BeginCopyFrom opens all partitions of given partitioned table with relation_open + * and it expects its caller to close those relations. We do not have direct access + * to opened relations, thus we are changing relkind of partitioned tables so that + * Postgres will treat those tables as regular relations and will not open its + * partitions. + */ + if (PartitionedTable(tableId)) + { + copiedDistributedRelationTuple->relkind = RELKIND_RELATION; + } + return copiedDistributedRelation; +} + + +/* + * ReadFromLocalBufferCallback is the copy callback. + * It copies up to maxread bytes from localCopyBuffer.
+ */ +static int +ReadFromLocalBufferCallback(void *outbuf, int minread, int maxread) +{ + int bytesread = 0; + int avail = localCopyBuffer->len - localCopyBuffer->cursor; + int bytesToRead = Min(avail, maxread); + if (bytesToRead > 0) + { + memcpy_s(outbuf, bytesToRead + strlen((char *) outbuf), + &localCopyBuffer->data[localCopyBuffer->cursor], bytesToRead); + } + bytesread += bytesToRead; + localCopyBuffer->cursor += bytesToRead; + + return bytesread; +} diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 0c5d52d0e..8d2dd9928 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -85,6 +85,8 @@ #include "distributed/shard_pruning.h" #include "distributed/version_compat.h" #include "distributed/worker_protocol.h" +#include "distributed/local_multi_copy.h" +#include "distributed/hash_helpers.h" #include "executor/executor.h" #include "foreign/foreign.h" #include "libpq/libpq.h" @@ -162,6 +164,8 @@ struct CopyPlacementState /* State of shard to which the placement belongs to. */ CopyShardState *shardState; + int32 groupId; + /* * Buffered COPY data. When the placement is activePlacementState of * some connection, this is empty. Because in that case we directly @@ -178,6 +182,12 @@ struct CopyShardState /* Used as hash key. */ uint64 shardId; + /* used for doing local copy */ + StringInfo localCopyBuffer; + + /* containsLocalPlacement is true if we have a local placement for the shard id of this state */ + bool containsLocalPlacement; + /* List of CopyPlacementStates for all active placements of the shard. */ List *placementStateList; }; @@ -232,13 +242,15 @@ static CopyConnectionState * GetConnectionState(HTAB *connectionStateHash, MultiConnection *connection); static CopyShardState * GetShardState(uint64 shardId, HTAB *shardStateHash, HTAB *connectionStateHash, bool stopOnFailure, - bool *found); + bool *found, bool shouldUseLocalCopy, MemoryContext + context); static MultiConnection * CopyGetPlacementConnection(ShardPlacement *placement, bool stopOnFailure); static List * ConnectionStateList(HTAB *connectionStateHash); static void InitializeCopyShardState(CopyShardState *shardState, HTAB *connectionStateHash, - uint64 shardId, bool stopOnFailure); + uint64 shardId, bool stopOnFailure, bool + canUseLocalCopy, MemoryContext context); static void StartPlacementStateCopyCommand(CopyPlacementState *placementState, CopyStmt *copyStatement, CopyOutState copyOutState); @@ -274,6 +286,10 @@ static bool CitusCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *copyDest); static void CitusCopyDestReceiverShutdown(DestReceiver *destReceiver); static void CitusCopyDestReceiverDestroy(DestReceiver *destReceiver); +static bool ContainsLocalPlacement(int64 shardId); +static void FinishLocalCopy(CitusCopyDestReceiver *copyDest); +static bool ShouldExecuteCopyLocally(void); +static void LogLocalCopyExecution(uint64 shardId); /* exports for SQL callable functions */ @@ -415,9 +431,12 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) stopOnFailure = true; } + bool hasCopyDataLocally = false; + /* set up the destination for the COPY */ copyDest = CreateCitusCopyDestReceiver(tableId, columnNameList, partitionColumnIndex, - executorState, stopOnFailure, NULL); + executorState, stopOnFailure, NULL, + hasCopyDataLocally); dest = (DestReceiver *) copyDest; dest->rStartup(dest, 0, tupleDescriptor); @@ -1960,11 +1979,13 @@ CopyFlushOutput(CopyOutState cstate, char 
*start, char *pointer) CitusCopyDestReceiver * CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColumnIndex, EState *executorState, bool stopOnFailure, - char *intermediateResultIdPrefix) + char *intermediateResultIdPrefix, bool hasCopyDataLocally) { CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) palloc0( sizeof(CitusCopyDestReceiver)); + copyDest->shouldUseLocalCopy = !hasCopyDataLocally && ShouldExecuteCopyLocally(); + /* set up the DestReceiver function pointers */ copyDest->pub.receiveSlot = CitusCopyDestReceiverReceive; copyDest->pub.rStartup = CitusCopyDestReceiverStartup; @@ -1985,6 +2006,44 @@ CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColu } +/* + * ShouldExecuteCopyLocally returns true if the current copy + * operation should be done locally for local placements. + */ +static bool +ShouldExecuteCopyLocally() +{ + if (!EnableLocalExecution) + { + return false; + } + + if (TransactionAccessedLocalPlacement) + { + /* + * For various reasons, including the transaction visibility + * rules (e.g., read-your-own-writes), we have to use local + * execution again if it has already happened within this + * transaction block. + * + * We might error out later in the execution if it is not suitable + * to execute the tasks locally. + */ + Assert(IsMultiStatementTransaction() || InCoordinatedTransaction()); + + /* + * TODO: A future improvement could be to keep track of which placements + * have been locally executed. At this point, only use local execution for + * those placements. That'd help to benefit more from parallelism. + */ + + return true; + } + + return IsMultiStatementTransaction(); +} + + /* * CitusCopyDestReceiverStartup implements the rStartup interface of * CitusCopyDestReceiver. 
It opens the relation, acquires necessary @@ -2013,9 +2072,6 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, const char *delimiterCharacter = "\t"; const char *nullPrintCharacter = "\\N"; - /* Citus currently doesn't know how to handle COPY command locally */ - ErrorIfTransactionAccessedPlacementsLocally(); - /* look up table properties */ Relation distributedRelation = heap_open(tableId, RowExclusiveLock); CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(tableId); @@ -2145,6 +2201,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, } } + copyStatement->query = NULL; copyStatement->attlist = attributeList; copyStatement->is_from = true; @@ -2228,7 +2285,9 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest CopyShardState *shardState = GetShardState(shardId, copyDest->shardStateHash, copyDest->connectionStateHash, stopOnFailure, - &cachedShardStateFound); + &cachedShardStateFound, + copyDest->shouldUseLocalCopy, + copyDest->memoryContext); if (!cachedShardStateFound) { firstTupleInShard = true; @@ -2249,6 +2308,14 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest } } + if (copyDest->shouldUseLocalCopy && shardState->containsLocalPlacement) + { + bool isEndOfCopy = false; + ProcessLocalCopy(slot, copyDest, shardId, shardState->localCopyBuffer, + isEndOfCopy); + } + + foreach(placementStateCell, shardState->placementStateList) { CopyPlacementState *currentPlacementState = lfirst(placementStateCell); @@ -2276,6 +2343,7 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest { StartPlacementStateCopyCommand(currentPlacementState, copyStatement, copyOutState); + dlist_delete(&currentPlacementState->bufferedPlacementNode); connectionState->activePlacementState = currentPlacementState; @@ -2330,6 +2398,30 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest } +/* + * ContainsLocalPlacement returns true if the current node has + * a local placement for the given shard id. + */ +static bool +ContainsLocalPlacement(int64 shardId) +{ + ListCell *placementCell = NULL; + List *activePlacementList = ActiveShardPlacementList(shardId); + int32 localGroupId = GetLocalGroupId(); + + foreach(placementCell, activePlacementList) + { + ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell); + + if (placement->groupId == localGroupId) + { + return true; + } + } + return false; +} + + /* * ShardIdForTuple returns id of the shard to which the given tuple belongs to. */ @@ -2407,6 +2499,7 @@ CitusCopyDestReceiverShutdown(DestReceiver *destReceiver) Relation distributedRelation = copyDest->distributedRelation; List *connectionStateList = ConnectionStateList(connectionStateHash); + FinishLocalCopy(copyDest); PG_TRY(); { @@ -2434,6 +2527,28 @@ CitusCopyDestReceiverShutdown(DestReceiver *destReceiver) } +/* + * FinishLocalCopy sends the remaining copies for local placements.
+ */ +static void +FinishLocalCopy(CitusCopyDestReceiver *copyDest) +{ + HTAB *shardStateHash = copyDest->shardStateHash; + HASH_SEQ_STATUS status; + CopyShardState *copyShardState; + + bool isEndOfCopy = true; + foreach_htab(copyShardState, &status, shardStateHash) + { + if (copyShardState->localCopyBuffer->len > 0) + { + ProcessLocalCopy(NULL, copyDest, copyShardState->shardId, + copyShardState->localCopyBuffer, isEndOfCopy); + } + } +} + + /* * ShutdownCopyConnectionState ends the copy command for the current active * placement on connection, and then sends the rest of the buffers over the @@ -2864,7 +2979,6 @@ CheckCopyPermissions(CopyStmt *copyStatement) /* *INDENT-OFF* */ bool is_from = copyStatement->is_from; Relation rel; - Oid relid; List *range_table = NIL; TupleDesc tupDesc; AclMode required_access = (is_from ? ACL_INSERT : ACL_SELECT); @@ -2874,15 +2988,8 @@ CheckCopyPermissions(CopyStmt *copyStatement) rel = heap_openrv(copyStatement->relation, is_from ? RowExclusiveLock : AccessShareLock); - relid = RelationGetRelid(rel); - - RangeTblEntry *rte = makeNode(RangeTblEntry); - rte->rtekind = RTE_RELATION; - rte->relid = relid; - rte->relkind = rel->rd_rel->relkind; - rte->requiredPerms = required_access; - range_table = list_make1(rte); - + range_table = CreateRangeTable(rel, required_access); + RangeTblEntry *rte = (RangeTblEntry*) linitial(range_table); tupDesc = RelationGetDescr(rel); attnums = CopyGetAttnums(tupDesc, rel, copyStatement->attlist); @@ -2909,6 +3016,21 @@ CheckCopyPermissions(CopyStmt *copyStatement) } +/* + * CreateRangeTable creates a range table with the given relation. + */ +List * +CreateRangeTable(Relation rel, AclMode requiredAccess) +{ + RangeTblEntry *rte = makeNode(RangeTblEntry); + rte->rtekind = RTE_RELATION; + rte->relid = rel->rd_id; + rte->relkind = rel->rd_rel->relkind; + rte->requiredPerms = requiredAccess; + return list_make1(rte); +} + + /* Helper for CheckCopyPermissions(), copied from postgres */ static List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist) @@ -3087,14 +3209,15 @@ ConnectionStateList(HTAB *connectionStateHash) */ static CopyShardState * GetShardState(uint64 shardId, HTAB *shardStateHash, - HTAB *connectionStateHash, bool stopOnFailure, bool *found) + HTAB *connectionStateHash, bool stopOnFailure, bool *found, bool + shouldUseLocalCopy, MemoryContext context) { CopyShardState *shardState = (CopyShardState *) hash_search(shardStateHash, &shardId, HASH_ENTER, found); if (!*found) { InitializeCopyShardState(shardState, connectionStateHash, - shardId, stopOnFailure); + shardId, stopOnFailure, shouldUseLocalCopy, context); } return shardState; @@ -3109,11 +3232,16 @@ GetShardState(uint64 shardId, HTAB *shardStateHash, static void InitializeCopyShardState(CopyShardState *shardState, HTAB *connectionStateHash, uint64 shardId, - bool stopOnFailure) + bool stopOnFailure, bool shouldUseLocalCopy, MemoryContext + context) { ListCell *placementCell = NULL; int failedPlacementCount = 0; + MemoryContext oldContext = MemoryContextSwitchTo(context); + + MemoryContextSwitchTo(oldContext); + MemoryContext localContext = AllocSetContextCreateExtended(CurrentMemoryContext, "InitializeCopyShardState", @@ -3121,8 +3249,9 @@ InitializeCopyShardState(CopyShardState *shardState, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); + /* release active placement list at the end of this function */ - MemoryContext oldContext = MemoryContextSwitchTo(localContext); + oldContext = MemoryContextSwitchTo(localContext); List 
*activePlacementList = ActiveShardPlacementList(shardId); @@ -3130,11 +3259,20 @@ InitializeCopyShardState(CopyShardState *shardState, shardState->shardId = shardId; shardState->placementStateList = NIL; + shardState->localCopyBuffer = makeStringInfo(); + shardState->containsLocalPlacement = ContainsLocalPlacement(shardId); + foreach(placementCell, activePlacementList) { ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell); + if (shouldUseLocalCopy && placement->groupId == GetLocalGroupId()) + { + LogLocalCopyExecution(shardId); + continue; + } + MultiConnection *connection = CopyGetPlacementConnection(placement, stopOnFailure); if (connection == NULL) @@ -3158,6 +3296,7 @@ InitializeCopyShardState(CopyShardState *shardState, CopyPlacementState *placementState = palloc0(sizeof(CopyPlacementState)); placementState->shardState = shardState; placementState->data = makeStringInfo(); + placementState->groupId = placement->groupId; placementState->connectionState = connectionState; /* @@ -3188,6 +3327,21 @@ InitializeCopyShardState(CopyShardState *shardState, } +/* + * LogLocalCopyExecution logs that the copy will be done locally for + * the given shard. + */ +static void +LogLocalCopyExecution(uint64 shardId) +{ + if (!(LogRemoteCommands || LogLocalCommands)) + { + return; + } + ereport(NOTICE, (errmsg("executing the copy locally for shard"))); +} + + /* * CopyGetPlacementConnection assigns a connection to the given placement. If * a connection has already been assigned the placement in the current transaction diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index 6663acc49..f5b3d2cce 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -581,13 +581,16 @@ ExecutePlanIntoColocatedIntermediateResults(Oid targetRelationId, int partitionColumnIndex = PartitionColumnIndexFromColumnList(targetRelationId, columnNameList); + bool hasCopyDataLocally = true; + /* set up a DestReceiver that copies into the intermediate table */ CitusCopyDestReceiver *copyDest = CreateCitusCopyDestReceiver(targetRelationId, columnNameList, partitionColumnIndex, executorState, stopOnFailure, - intermediateResultIdPrefix); + intermediateResultIdPrefix, + hasCopyDataLocally); ExecutePlanIntoDestReceiver(selectPlan, paramListInfo, (DestReceiver *) copyDest); @@ -623,12 +626,15 @@ ExecutePlanIntoRelation(Oid targetRelationId, List *insertTargetList, int partitionColumnIndex = PartitionColumnIndexFromColumnList(targetRelationId, columnNameList); + bool hasCopyDataLocally = true; + /* set up a DestReceiver that copies into the distributed table */ CitusCopyDestReceiver *copyDest = CreateCitusCopyDestReceiver(targetRelationId, columnNameList, partitionColumnIndex, executorState, - stopOnFailure, NULL); + stopOnFailure, NULL, + hasCopyDataLocally); ExecutePlanIntoDestReceiver(selectPlan, paramListInfo, (DestReceiver *) copyDest); diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index 5187e5094..9d26a8702 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -335,7 +335,12 @@ UpdateRelationsToLocalShardTables(Node *node, List *relationShardList) return true; } - Oid shardOid = GetShardOid(relationShard->relationId, relationShard->shardId); +<<<<<<< HEAD + Oid shardOid = 
GetShardLocalTableOid(relationShard->relationId, relationShard->shardId); +======= + Oid shardOid = GetShardLocalTableOid(relationShard->relationId, + relationShard->shardId); +>>>>>>> add the support to execute copy locally newRte->relid = shardOid; diff --git a/src/backend/distributed/utils/shard_utils.c b/src/backend/distributed/utils/shard_utils.c index ad3acac67..aa0a17921 100644 --- a/src/backend/distributed/utils/shard_utils.c +++ b/src/backend/distributed/utils/shard_utils.c @@ -16,11 +16,11 @@ #include "distributed/shard_utils.h" /* - * GetShardOid returns the oid of the shard from the given distributed relation + * GetShardLocalTableOid returns the oid of the shard from the given distributed relation * with the shardid. */ Oid -GetShardOid(Oid distRelId, uint64 shardId) +GetShardLocalTableOid(Oid distRelId, uint64 shardId) { char *relationName = get_rel_name(distRelId); AppendShardIdToName(&relationName, shardId); diff --git a/src/include/distributed/commands/multi_copy.h b/src/include/distributed/commands/multi_copy.h index d91358839..b887cc5ad 100644 --- a/src/include/distributed/commands/multi_copy.h +++ b/src/include/distributed/commands/multi_copy.h @@ -130,6 +130,9 @@ typedef struct CitusCopyDestReceiver /* useful for tracking multi shard accesses */ bool multiShardCopy; + /* if true, should copy to local placements in the current session */ + bool shouldUseLocalCopy; + /* copy into intermediate result */ char *intermediateResultIdPrefix; } CitusCopyDestReceiver; @@ -141,7 +144,8 @@ extern CitusCopyDestReceiver * CreateCitusCopyDestReceiver(Oid relationId, int partitionColumnIndex, EState *executorState, bool stopOnFailure, - char *intermediateResultPrefix); + char *intermediateResultPrefix, + bool hasCopyDataLocally); extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat); extern bool CanUseBinaryCopyFormat(TupleDesc tupleDescription); extern bool CanUseBinaryCopyFormatForTargetList(List *targetEntryList); @@ -154,6 +158,7 @@ extern void AppendCopyRowData(Datum *valueArray, bool *isNullArray, extern void AppendCopyBinaryHeaders(CopyOutState headerOutputState); extern void AppendCopyBinaryFooters(CopyOutState footerOutputState); extern void EndRemoteCopy(int64 shardId, List *connectionList); +extern List * CreateRangeTable(Relation rel, AclMode requiredAccess); extern Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, const char *queryString); extern void CheckCopyPermissions(CopyStmt *copyStatement); diff --git a/src/include/distributed/local_multi_copy.h b/src/include/distributed/local_multi_copy.h new file mode 100644 index 000000000..83dc6a32b --- /dev/null +++ b/src/include/distributed/local_multi_copy.h @@ -0,0 +1,9 @@ + +#ifndef LOCAL_MULTI_COPY +#define LOCAL_MULTI_COPY + +extern void ProcessLocalCopy(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, int64 + shardId, + StringInfo buffer, bool isEndOfCopy); + +#endif /* LOCAL_MULTI_COPY */ diff --git a/src/include/distributed/shard_utils.h b/src/include/distributed/shard_utils.h index e4fd9a2f2..28addce15 100644 --- a/src/include/distributed/shard_utils.h +++ b/src/include/distributed/shard_utils.h @@ -13,6 +13,6 @@ #include "postgres.h" -extern Oid GetShardOid(Oid distRelId, uint64 shardId); +extern Oid GetShardLocalTableOid(Oid distRelId, uint64 shardId); #endif /* SHARD_UTILS_H */ From 39bbec0f302852e57ec59370e484c51a6fec4aaa Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Mon, 9 Mar 2020 15:59:09 +0300 Subject: [PATCH 2/7] add tests for local copy 
execution --- .../distributed/planner/deparse_shard_query.c | 4 - src/test/regress/data/orders.2.data | 2 +- .../regress/expected/local_shard_copy.out | 418 ++++++++++++++++++ .../expected/local_shard_execution.out | 20 +- .../locally_execute_intermediate_results.out | 5 + .../multi_replicate_reference_table.out | 2 + src/test/regress/multi_mx_schedule | 2 +- src/test/regress/sql/local_shard_copy.sql | 298 +++++++++++++ .../regress/sql/local_shard_execution.sql | 10 +- .../sql/multi_replicate_reference_table.sql | 3 + 10 files changed, 730 insertions(+), 34 deletions(-) create mode 100644 src/test/regress/expected/local_shard_copy.out create mode 100644 src/test/regress/sql/local_shard_copy.sql diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index 9d26a8702..3d76f25c0 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -335,12 +335,8 @@ UpdateRelationsToLocalShardTables(Node *node, List *relationShardList) return true; } -<<<<<<< HEAD - Oid shardOid = GetShardLocalTableOid(relationShard->relationId, relationShard->shardId); -======= Oid shardOid = GetShardLocalTableOid(relationShard->relationId, relationShard->shardId); ->>>>>>> add the support to execute copy locally newRte->relid = shardOid; diff --git a/src/test/regress/data/orders.2.data b/src/test/regress/data/orders.2.data index 264c368df..43dbde8de 100644 --- a/src/test/regress/data/orders.2.data +++ b/src/test/regress/data/orders.2.data @@ -1484,4 +1484,4 @@ 14944|535|O|119586.69|1997-10-14|2-HIGH|Clerk#000000962|0|lly. even instructions against 14945|68|O|210519.05|1996-03-30|1-URGENT|Clerk#000000467|0|nts? fluffily bold grouches after 14946|580|O|100402.47|1996-11-12|1-URGENT|Clerk#000000116|0|ffily bold dependencies wake. furiously regular instructions aro -14947|580|O|100402.47|1996-11-12|1-URGENT|Clerk#000000116|0|ffily bold dependencies wake. furiously regular instructions aro +14947|580|O|100402.47|1996-11-12|1-URGENT|Clerk#000000116|0|ffily bold dependencies wake. 
furiously regular instructions aro \ No newline at end of file diff --git a/src/test/regress/expected/local_shard_copy.out b/src/test/regress/expected/local_shard_copy.out new file mode 100644 index 000000000..b7521f9da --- /dev/null +++ b/src/test/regress/expected/local_shard_copy.out @@ -0,0 +1,418 @@ +CREATE SCHEMA local_shard_copy; +SET search_path TO local_shard_copy; +SET client_min_messages TO DEBUG; +SELECT * FROM master_add_node('localhost', :master_port, groupid := 0); +DEBUG: schema "public" already exists, skipping +DETAIL: NOTICE from localhost:xxxxx +DEBUG: extension "plpgsql" already exists, skipping +DETAIL: NOTICE from localhost:xxxxx +DEBUG: schema "citus_mx_test_schema" already exists, skipping +DETAIL: NOTICE from localhost:xxxxx +DEBUG: schema "citus_mx_test_schema_join_1" already exists, skipping +DETAIL: NOTICE from localhost:xxxxx +DEBUG: schema "citus_mx_test_schema_join_2" already exists, skipping +DETAIL: NOTICE from localhost:xxxxx +DEBUG: schema "citus_mx_schema_for_xacts" already exists, skipping +DETAIL: NOTICE from localhost:xxxxx +NOTICE: Replicating reference table "customer_mx" to the node localhost:xxxxx +NOTICE: Replicating reference table "nation_mx" to the node localhost:xxxxx +NOTICE: Replicating reference table "part_mx" to the node localhost:xxxxx +NOTICE: Replicating reference table "supplier_mx" to the node localhost:xxxxx + master_add_node +--------------------------------------------------------------------- + 32 +(1 row) + +SET citus.shard_count TO 4; +SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; +CREATE TABLE reference_table (key int PRIMARY KEY); +DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "reference_table_pkey" for table "reference_table" +DEBUG: building index "reference_table_pkey" on table "reference_table" serially +SELECT create_reference_table('reference_table'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE distributed_table (key int PRIMARY KEY, age bigint CHECK (age >= 10)); +DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "distributed_table_pkey" for table "distributed_table" +DEBUG: building index "distributed_table_pkey" on table "distributed_table" serially +SELECT create_distributed_table('distributed_table','key'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO distributed_table SELECT *,* FROM generate_series(20, 40); +DEBUG: distributed INSERT ... SELECT can only select from distributed tables +DEBUG: Collecting INSERT ... SELECT results on coordinator +INSERT INTO reference_table SELECT * FROM generate_series(1, 10); +DEBUG: distributed INSERT ... SELECT can only select from distributed tables +DEBUG: Collecting INSERT ... 
SELECT results on coordinator +CREATE TABLE local_table (key int PRIMARY KEY); +DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "local_table_pkey" for table "local_table" +DEBUG: building index "local_table_pkey" on table "local_table" serially +INSERT INTO local_table SELECT * from generate_series(1, 10); +-- connect to the worker and get ready for the tests +\c - - - :worker_1_port +SET search_path TO local_shard_copy; +SET citus.log_local_commands TO ON; +-- returns true if, given the distribution key filter +-- on the distributed table (e.g., WHERE key = 1), we'll hit a shard +-- placement which is local to this node +CREATE OR REPLACE FUNCTION shard_of_distribution_column_is_local(dist_key int) RETURNS bool AS $$ + + DECLARE shard_is_local BOOLEAN := FALSE; + + BEGIN + + WITH local_shard_ids AS (SELECT get_shard_id_for_distribution_column('local_shard_copy.distributed_table', dist_key)), + all_local_shard_ids_on_node AS (SELECT shardid FROM pg_dist_placement WHERE groupid IN (SELECT groupid FROM pg_dist_local_group)) + SELECT + true INTO shard_is_local + FROM + local_shard_ids + WHERE + get_shard_id_for_distribution_column IN (SELECT * FROM all_local_shard_ids_on_node); + + IF shard_is_local IS NULL THEN + shard_is_local = FALSE; + END IF; + + RETURN shard_is_local; + END; +$$ LANGUAGE plpgsql; +-- pick some example values that reside on the shards locally and remotely +-- distribution key values of 1, 6, 500 and 701 are LOCAL to shards, +-- we'll use these values in the tests +SELECT shard_of_distribution_column_is_local(1); + shard_of_distribution_column_is_local +--------------------------------------------------------------------- + t +(1 row) + +SELECT shard_of_distribution_column_is_local(6); + shard_of_distribution_column_is_local +--------------------------------------------------------------------- + t +(1 row) + +SELECT shard_of_distribution_column_is_local(500); + shard_of_distribution_column_is_local +--------------------------------------------------------------------- + t +(1 row) + +SELECT shard_of_distribution_column_is_local(701); + shard_of_distribution_column_is_local +--------------------------------------------------------------------- + t +(1 row) + +-- distribution key values of 11 and 12 are REMOTE to shards +SELECT shard_of_distribution_column_is_local(11); + shard_of_distribution_column_is_local +--------------------------------------------------------------------- + f +(1 row) + +SELECT shard_of_distribution_column_is_local(12); + shard_of_distribution_column_is_local +--------------------------------------------------------------------- + f +(1 row) + +BEGIN; + -- run select with local execution + SELECT count(*) FROM distributed_table WHERE key = 1; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) + count +--------------------------------------------------------------------- + 0 +(1 row) + + SELECT count(*) FROM distributed_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true + count +--------------------------------------------------------------------- + 21 +(1 row) + + -- the local placements should be executed locally + COPY distributed_table FROM STDIN WITH delimiter ','; +NOTICE: executing the copy
locally for shard +CONTEXT: COPY distributed_table, line 1: "1, 100" + -- verify that the copy is successful. + SELECT count(*) FROM distributed_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true + count +--------------------------------------------------------------------- + 26 +(1 row) + +ROLLBACK; +BEGIN; + -- run select with local execution + SELECT count(*) FROM distributed_table WHERE key = 1; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) + count +--------------------------------------------------------------------- + 0 +(1 row) + + SELECT count(*) FROM distributed_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true + count +--------------------------------------------------------------------- + 21 +(1 row) + + -- the local placements should be executed locally + COPY distributed_table FROM STDIN WITH delimiter ','; +NOTICE: executing the copy locally for shard +CONTEXT: COPY distributed_table, line 1: "1, 100" + -- verify the inserted ages. + SELECT * FROM distributed_table; +NOTICE: executing the command locally: SELECT key, age FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT key, age FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true + key | age +--------------------------------------------------------------------- + 20 | 20 + 24 | 24 + 25 | 25 + 26 | 26 + 31 | 31 + 33 | 33 + 35 | 35 + 1 | 100 + 5 | 500 + 21 | 21 + 28 | 28 + 34 | 34 + 38 | 38 + 39 | 39 + 36 | 36 + 37 | 37 + 40 | 40 + 3 | 300 + 4 | 400 + 22 | 22 + 23 | 23 + 27 | 27 + 29 | 29 + 30 | 30 + 32 | 32 + 2 | 200 +(26 rows) + +ROLLBACK; +BEGIN; + -- run select with local execution + SELECT count(*) FROM distributed_table WHERE key = 1; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) + count +--------------------------------------------------------------------- + 0 +(1 row) + + SELECT count(*) FROM distributed_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true + count +--------------------------------------------------------------------- + 21 +(1 row) + + -- the local placements should be executed locally + COPY distributed_table FROM STDIN WITH delimiter ','; +NOTICE: executing the copy locally for shard +CONTEXT: COPY distributed_table, line 1: "1, 100" + -- verify that the copy is successful.
+ SELECT count(*) FROM distributed_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true + count +--------------------------------------------------------------------- + 26 +(1 row) + +ROLLBACK; +BEGIN; + -- the local placements should be executed locally + COPY distributed_table FROM STDIN WITH delimiter ','; +NOTICE: executing the copy locally for shard +CONTEXT: COPY distributed_table, line 1: "1, 100" + -- run select with local execution + SELECT age FROM distributed_table WHERE key = 1; +NOTICE: executing the command locally: SELECT age FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) + age +--------------------------------------------------------------------- + 100 +(1 row) + + SELECT count(*) FROM distributed_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true + count +--------------------------------------------------------------------- + 26 +(1 row) + + -- verify that the copy is successful. + SELECT count(*) FROM distributed_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true + count +--------------------------------------------------------------------- + 26 +(1 row) + +ROLLBACK; +BEGIN; +-- Since we are in a transaction, the copy should be locally executed. +COPY distributed_table FROM STDIN WITH delimiter ','; +NOTICE: executing the copy locally for shard +CONTEXT: COPY distributed_table, line 1: "1, 100" +ROLLBACK; +-- Since we are not in a transaction, the copy should not be locally executed. +COPY distributed_table FROM STDIN WITH delimiter ','; +BEGIN; +-- Since we are in a transaction, the copy should be locally executed. But +-- we are putting a duplicate key, so it should error. +COPY distributed_table FROM STDIN WITH delimiter ','; +NOTICE: executing the copy locally for shard +CONTEXT: COPY distributed_table, line 1: "1, 100" +ERROR: duplicate key value violates unique constraint "distributed_table_pkey_1330001" +DETAIL: Key (key)=(1) already exists. +CONTEXT: COPY distributed_table_1330001, line 1 +ROLLBACK; +TRUNCATE distributed_table; +COPY distributed_table FROM STDIN WITH delimiter ','; +ERROR: new row for relation "distributed_table_1330001" violates check constraint "distributed_table_age_check" +DETAIL: Failing row contains (1, 9). +BEGIN; +-- Since we are in a transaction, the execution will be local, however we are putting an invalid age. +-- The constraints should give an error +COPY distributed_table FROM STDIN WITH delimiter ','; +NOTICE: executing the copy locally for shard +CONTEXT: COPY distributed_table, line 1: "1,9" +ERROR: new row for relation "distributed_table_1330001" violates check constraint "distributed_table_age_check" +DETAIL: Failing row contains (1, 9).
+CONTEXT: COPY distributed_table_1330001, line 1 +ROLLBACK; +TRUNCATE distributed_table; +-- different delimiters +BEGIN; +-- initial size +SELECT count(*) FROM distributed_table; + count +--------------------------------------------------------------------- + 0 +(1 row) + +COPY distributed_table FROM STDIN WITH delimiter '|'; +NOTICE: executing the copy locally for shard +CONTEXT: COPY distributed_table, line 1: "1|10" +-- new size +SELECT count(*) FROM distributed_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true + count +--------------------------------------------------------------------- + 3 +(1 row) + +ROLLBACK; +BEGIN; +-- initial size +SELECT count(*) FROM distributed_table; + count +--------------------------------------------------------------------- + 0 +(1 row) + +COPY distributed_table FROM STDIN WITH delimiter '['; +NOTICE: executing the copy locally for shard +CONTEXT: COPY distributed_table, line 1: "1[10" +-- new size +SELECT count(*) FROM distributed_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true + count +--------------------------------------------------------------------- + 3 +(1 row) + +ROLLBACK; +-- multiple local copies +BEGIN; +COPY distributed_table FROM STDIN WITH delimiter ','; +NOTICE: executing the copy locally for shard +CONTEXT: COPY distributed_table, line 1: "1,15" +COPY distributed_table FROM STDIN WITH delimiter ','; +NOTICE: executing the copy locally for shard +CONTEXT: COPY distributed_table, line 1: "10,15" +COPY distributed_table FROM STDIN WITH delimiter ','; +NOTICE: executing the copy locally for shard +CONTEXT: COPY distributed_table, line 1: "100,15" +NOTICE: executing the copy locally for shard +CONTEXT: COPY distributed_table, line 2: "200,20" +ROLLBACK; +-- local copy followed by local copy should see the changes +-- and error since it is a duplicate primary key. +BEGIN; +COPY distributed_table FROM STDIN WITH delimiter ','; +NOTICE: executing the copy locally for shard +CONTEXT: COPY distributed_table, line 1: "1,15" +COPY distributed_table FROM STDIN WITH delimiter ','; +NOTICE: executing the copy locally for shard +CONTEXT: COPY distributed_table, line 1: "1,16" +ERROR: duplicate key value violates unique constraint "distributed_table_pkey_1330001" +DETAIL: Key (key)=(1) already exists. 
+CONTEXT: COPY distributed_table_1330001, line 1 +ROLLBACK; +-- local copy followed by local copy should see the changes +BEGIN; +COPY distributed_table FROM STDIN WITH delimiter ','; +NOTICE: executing the copy locally for shard +CONTEXT: COPY distributed_table, line 1: "1,15" +-- select should see the change +SELECT key FROM distributed_table WHERE key = 1; +NOTICE: executing the command locally: SELECT key FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) + key +--------------------------------------------------------------------- + 1 +(1 row) + +ROLLBACK; +\c - - - :master_port +SET search_path TO local_shard_copy; +SET citus.log_local_commands TO ON; +TRUNCATE TABLE reference_table; +TRUNCATE TABLE local_table; +SELECT count(*) FROM reference_table, local_table WHERE reference_table.key = local_table.key; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SET citus.enable_local_execution = 'on'; +BEGIN; +-- copy should be executed locally +COPY reference_table FROM STDIN; +NOTICE: executing the copy locally for shard +CONTEXT: COPY reference_table, line 1: "1" +ROLLBACK; +SET citus.enable_local_execution = 'off'; +BEGIN; +-- copy should not be executed locally as citus.enable_local_execution = off +COPY reference_table FROM STDIN; +ROLLBACK; +SET citus.enable_local_execution = 'on'; +SET client_min_messages TO ERROR; +SET search_path TO public; +DROP SCHEMA local_shard_copy CASCADE; diff --git a/src/test/regress/expected/local_shard_execution.out b/src/test/regress/expected/local_shard_execution.out index e926dd31e..8d3b35ca6 100644 --- a/src/test/regress/expected/local_shard_execution.out +++ b/src/test/regress/expected/local_shard_execution.out @@ -618,25 +618,7 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar 0 (1 row) - -- even no need to supply any data - COPY distributed_table FROM STDIN WITH CSV; -ERROR: cannot execute command because a local execution has accessed a placement in the transaction -DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally -HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;" -ROLLBACK; --- a local query is followed by a command that cannot be executed locally -BEGIN; - SELECT count(*) FROM distributed_table WHERE key = 1; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) - count ---------------------------------------------------------------------- - 0 -(1 row) - - INSERT INTO distributed_table (key) SELECT i FROM generate_series(1,10)i; -ERROR: cannot execute command because a local execution has accessed a placement in the transaction -DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally -HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;" + INSERT INTO distributed_table (key) SELECT i FROM generate_series(1,1)i; ROLLBACK; -- a local query is followed by a command that cannot be executed locally BEGIN; diff --git a/src/test/regress/expected/locally_execute_intermediate_results.out b/src/test/regress/expected/locally_execute_intermediate_results.out index d245c324b..c235ed8ef 100644 --- a/src/test/regress/expected/locally_execute_intermediate_results.out +++ 
b/src/test/regress/expected/locally_execute_intermediate_results.out @@ -35,6 +35,7 @@ SELECT create_reference_table('ref_table'); INSERT INTO table_1 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'); INSERT INTO table_2 VALUES (3, '3'), (4, '4'), (5, '5'), (6, '6'); INSERT INTO ref_table VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5'), (6, '6'); +NOTICE: executing the command locally: INSERT INTO locally_execute_intermediate_results.ref_table_1580008 AS citus_table_alias (key, value) VALUES (1,'1'::text), (2,'2'::text), (3,'3'::text), (4,'4'::text), (5,'5'::text), (6,'6'::text) -- prevent PG 11 - PG 12 outputs to diverge -- and have a lot more CTEs recursively planned for the -- sake of increasing the test coverage @@ -270,6 +271,7 @@ key FROM a JOIN ref_table USING (key) GROUP BY key HAVING (max(ref_table.value) <= (SELECT value FROM a)); +NOTICE: executing the command locally: WITH a AS (SELECT max(ref_table_1.key) AS key, max(ref_table_1.value) AS value FROM locally_execute_intermediate_results.ref_table_1580008 ref_table_1) SELECT count(*) AS count, a.key FROM (a JOIN locally_execute_intermediate_results.ref_table_1580008 ref_table(key, value) USING (key)) GROUP BY a.key HAVING (max(ref_table.value) OPERATOR(pg_catalog.<=) (SELECT a_1.value FROM a a_1)) count | key --------------------------------------------------------------------- 1 | 6 @@ -328,7 +330,9 @@ DEBUG: Subplan XXX_2 will be written to local file NOTICE: executing the command locally: SELECT key, value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_1 DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT max(key) AS key FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 +NOTICE: executing the command locally: SELECT cte_3.key, ref_table.value FROM ((SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_3 JOIN locally_execute_intermediate_results.ref_table_1580008 ref_table(key, value) USING (key)) key | value --------------------------------------------------------------------- 4 | 4 @@ -767,6 +771,7 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT cte_3.key, re DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_2 will be written to local file DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx key | value --------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_replicate_reference_table.out b/src/test/regress/expected/multi_replicate_reference_table.out index 714e8353e..e23342e00 100644 --- a/src/test/regress/expected/multi_replicate_reference_table.out +++ b/src/test/regress/expected/multi_replicate_reference_table.out @@ -499,11 +499,13 @@ SELECT create_reference_table('replicate_reference_table_copy'); (1 row) +SET citus.enable_local_execution = 'off'; BEGIN; COPY replicate_reference_table_copy FROM STDIN; SELECT 1 FROM master_add_node('localhost', :worker_2_port); ERROR: cannot open new connections 
after the first modification command within a transaction ROLLBACK; +RESET citus.enable_local_execution; DROP TABLE replicate_reference_table_copy; -- test executing DDL command then adding a new node in a transaction CREATE TABLE replicate_reference_table_ddl(column1 int); diff --git a/src/test/regress/multi_mx_schedule b/src/test/regress/multi_mx_schedule index d3083367f..6c9a6c01b 100644 --- a/src/test/regress/multi_mx_schedule +++ b/src/test/regress/multi_mx_schedule @@ -39,7 +39,7 @@ test: multi_mx_metadata test: master_evaluation master_evaluation_modify master_evaluation_select test: multi_mx_call test: multi_mx_function_call_delegation -test: multi_mx_modifications local_shard_execution +test: multi_mx_modifications local_shard_execution local_shard_copy test: multi_mx_transaction_recovery test: multi_mx_modifying_xacts test: multi_mx_explain diff --git a/src/test/regress/sql/local_shard_copy.sql b/src/test/regress/sql/local_shard_copy.sql new file mode 100644 index 000000000..e9b548a94 --- /dev/null +++ b/src/test/regress/sql/local_shard_copy.sql @@ -0,0 +1,298 @@ +CREATE SCHEMA local_shard_copy; +SET search_path TO local_shard_copy; + +SET client_min_messages TO DEBUG; + +SELECT * FROM master_add_node('localhost', :master_port, groupid := 0); + +SET citus.shard_count TO 4; +SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; + + +CREATE TABLE reference_table (key int PRIMARY KEY); +SELECT create_reference_table('reference_table'); + +CREATE TABLE distributed_table (key int PRIMARY KEY, age bigint CHECK (age >= 10)); +SELECT create_distributed_table('distributed_table','key'); + +INSERT INTO distributed_table SELECT *,* FROM generate_series(20, 40); +INSERT INTO reference_table SELECT * FROM generate_series(1, 10); + +CREATE TABLE local_table (key int PRIMARY KEY); +INSERT INTO local_table SELECT * from generate_series(1, 10); + + +-- connect to the worker and get ready for the tests +\c - - - :worker_1_port + +SET search_path TO local_shard_copy; +SET citus.log_local_commands TO ON; + +-- returns true if, given the distribution key filter +-- on the distributed table (e.g., WHERE key = 1), we'll hit a shard +-- placement which is local to this node +CREATE OR REPLACE FUNCTION shard_of_distribution_column_is_local(dist_key int) RETURNS bool AS $$ + + DECLARE shard_is_local BOOLEAN := FALSE; + + BEGIN + + WITH local_shard_ids AS (SELECT get_shard_id_for_distribution_column('local_shard_copy.distributed_table', dist_key)), + all_local_shard_ids_on_node AS (SELECT shardid FROM pg_dist_placement WHERE groupid IN (SELECT groupid FROM pg_dist_local_group)) + SELECT + true INTO shard_is_local + FROM + local_shard_ids + WHERE + get_shard_id_for_distribution_column IN (SELECT * FROM all_local_shard_ids_on_node); + + IF shard_is_local IS NULL THEN + shard_is_local = FALSE; + END IF; + + RETURN shard_is_local; + END; +$$ LANGUAGE plpgsql; + +-- pick some example values that reside on the shards locally and remotely + +-- distribution key values of 1, 6, 500 and 701 are LOCAL to shards, +-- we'll use these values in the tests +SELECT shard_of_distribution_column_is_local(1); +SELECT shard_of_distribution_column_is_local(6); +SELECT shard_of_distribution_column_is_local(500); +SELECT shard_of_distribution_column_is_local(701); + +-- distribution key values of 11 and 12 are REMOTE to shards +SELECT shard_of_distribution_column_is_local(11); +SELECT shard_of_distribution_column_is_local(12); + +BEGIN; + -- run select with local execution + SELECT count(*) FROM
distributed_table WHERE key = 1; + + SELECT count(*) FROM distributed_table; + -- the local placements should be executed locally + COPY distributed_table FROM STDIN WITH delimiter ','; +1, 100 +2, 200 +3, 300 +4, 400 +5, 500 +\. + -- verify that the copy is successful. + SELECT count(*) FROM distributed_table; + +ROLLBACK; + +BEGIN; + -- run select with local execution + SELECT count(*) FROM distributed_table WHERE key = 1; + + SELECT count(*) FROM distributed_table; + -- the local placements should be executed locally + COPY distributed_table FROM STDIN WITH delimiter ','; +1, 100 +2, 200 +3, 300 +4, 400 +5, 500 +\. + -- verify the inserted ages. + SELECT * FROM distributed_table; + +ROLLBACK; + + +BEGIN; + -- run select with local execution + SELECT count(*) FROM distributed_table WHERE key = 1; + + SELECT count(*) FROM distributed_table; + -- the local placements should be executed locally + COPY distributed_table FROM STDIN WITH delimiter ','; +1, 100 +2, 200 +3, 300 +4, 400 +5, 500 +\. + -- verify that the copy is successful. + SELECT count(*) FROM distributed_table; + +ROLLBACK; + +BEGIN; + -- the local placements should be executed locally + COPY distributed_table FROM STDIN WITH delimiter ','; +1, 100 +2, 200 +3, 300 +4, 400 +5, 500 +\. + -- run select with local execution + SELECT age FROM distributed_table WHERE key = 1; + + SELECT count(*) FROM distributed_table; + + -- verify that the copy is successful. + SELECT count(*) FROM distributed_table; + +ROLLBACK; + +BEGIN; +-- Since we are in a transaction, the copy should be locally executed. +COPY distributed_table FROM STDIN WITH delimiter ','; +1, 100 +2, 200 +3, 300 +4, 400 +5, 500 +\. +ROLLBACK; + +-- Since we are not in a transaction, the copy should not be locally executed. +COPY distributed_table FROM STDIN WITH delimiter ','; +1, 100 +2, 200 +3, 300 +4, 400 +5, 500 +\. + +BEGIN; +-- Since we are in a transaction, the copy should be locally executed. But +-- we are inserting a duplicate key, so it should error. +COPY distributed_table FROM STDIN WITH delimiter ','; +1, 100 +2, 200 +3, 300 +4, 400 +5, 500 +\. +ROLLBACK; + +TRUNCATE distributed_table; + +COPY distributed_table FROM STDIN WITH delimiter ','; +1, 9 +\. + +BEGIN; +-- Since we are in a transaction, the execution will be local, however we are inserting an invalid age. +-- The constraint should give an error +COPY distributed_table FROM STDIN WITH delimiter ','; +1,9 +\. +ROLLBACK; + +TRUNCATE distributed_table; + + +-- different delimiters +BEGIN; +-- initial size +SELECT count(*) FROM distributed_table; +COPY distributed_table FROM STDIN WITH delimiter '|'; +1|10 +2|30 +3|40 +\. +-- new size +SELECT count(*) FROM distributed_table; +ROLLBACK; + +BEGIN; +-- initial size +SELECT count(*) FROM distributed_table; +COPY distributed_table FROM STDIN WITH delimiter '['; +1[10 +2[30 +3[40 +\. +-- new size +SELECT count(*) FROM distributed_table; +ROLLBACK; + + +-- multiple local copies +BEGIN; +COPY distributed_table FROM STDIN WITH delimiter ','; +1,15 +2,20 +3,30 +\. +COPY distributed_table FROM STDIN WITH delimiter ','; +10,15 +20,20 +30,30 +\. +COPY distributed_table FROM STDIN WITH delimiter ','; +100,15 +200,20 +300,30 +\. +ROLLBACK; + +-- local copy followed by local copy should see the changes +-- and error since it is a duplicate primary key. +BEGIN; +COPY distributed_table FROM STDIN WITH delimiter ','; +1,15 +\. +COPY distributed_table FROM STDIN WITH delimiter ','; +1,16 +\.
+ROLLBACK; + + +-- local copy followed by local copy should see the changes +BEGIN; +COPY distributed_table FROM STDIN WITH delimiter ','; +1,15 +\. +-- select should see the change +SELECT key FROM distributed_table WHERE key = 1; +ROLLBACK; + +\c - - - :master_port + +SET search_path TO local_shard_copy; +SET citus.log_local_commands TO ON; + +TRUNCATE TABLE reference_table; +TRUNCATE TABLE local_table; + +SELECT count(*) FROM reference_table, local_table WHERE reference_table.key = local_table.key; + +SET citus.enable_local_execution = 'on'; + +BEGIN; +-- copy should be executed locally +COPY reference_table FROM STDIN; +1 +2 +3 +4 +\. +ROLLBACK; + +SET citus.enable_local_execution = 'off'; + +BEGIN; +-- copy should not be executed locally as citus.enable_local_execution = off +COPY reference_table FROM STDIN; +1 +2 +3 +4 +\. +ROLLBACK; + +SET citus.enable_local_execution = 'on'; + +SET client_min_messages TO ERROR; +SET search_path TO public; +DROP SCHEMA local_shard_copy CASCADE; diff --git a/src/test/regress/sql/local_shard_execution.sql b/src/test/regress/sql/local_shard_execution.sql index b5ba49caa..e3688bd9a 100644 --- a/src/test/regress/sql/local_shard_execution.sql +++ b/src/test/regress/sql/local_shard_execution.sql @@ -356,15 +356,7 @@ ROLLBACK; BEGIN; SELECT count(*) FROM distributed_table WHERE key = 1; - -- even no need to supply any data - COPY distributed_table FROM STDIN WITH CSV; -ROLLBACK; - --- a local query is followed by a command that cannot be executed locally -BEGIN; - SELECT count(*) FROM distributed_table WHERE key = 1; - - INSERT INTO distributed_table (key) SELECT i FROM generate_series(1,10)i; + INSERT INTO distributed_table (key) SELECT i FROM generate_series(1,1) i; ROLLBACK; -- a local query is followed by a command that cannot be executed locally diff --git a/src/test/regress/sql/multi_replicate_reference_table.sql b/src/test/regress/sql/multi_replicate_reference_table.sql index ec476e706..8edfa40b7 100644 --- a/src/test/regress/sql/multi_replicate_reference_table.sql +++ b/src/test/regress/sql/multi_replicate_reference_table.sql @@ -323,6 +323,7 @@ DROP TABLE replicate_reference_table_insert; CREATE TABLE replicate_reference_table_copy(column1 int); SELECT create_reference_table('replicate_reference_table_copy'); +SET citus.enable_local_execution = 'off'; BEGIN; COPY replicate_reference_table_copy FROM STDIN; 1 @@ -334,6 +335,8 @@ COPY replicate_reference_table_copy FROM STDIN; SELECT 1 FROM master_add_node('localhost', :worker_2_port); ROLLBACK; +RESET citus.enable_local_execution; + DROP TABLE replicate_reference_table_copy; From 1df9601e1326bd78be2e83e5f3b2eff366d3e1c6 Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Tue, 10 Mar 2020 10:34:23 +0300 Subject: [PATCH 3/7] do not use local copy if current transaction is connected to local group If the current transaction is connected to the local group, we should not use local copy, because we might not see some of the changes that were made over the connection to the local group.
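A hedged sketch of the scenario this guards against (the exact commands that mark a transaction as connected to the local group depend on the execution path; table names follow the regression tests):

```
BEGIN;
-- suppose this multi-shard command runs over regular connections,
-- including one to the placements on the local node; the transaction
-- is then connected to the local group
UPDATE distributed_table SET age = age + 1;
-- the COPY below must not use local execution: a buffered local copy
-- might not see the uncommitted change made over that connection
COPY distributed_table FROM STDIN WITH delimiter ',';
COMMIT;
```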
--- .../commands/create_distributed_table.c | 5 +-- src/backend/distributed/commands/multi_copy.c | 12 +++---- .../executor/insert_select_executor.c | 10 ++---- src/include/distributed/commands/multi_copy.h | 3 +- .../regress/expected/local_shard_copy.out | 33 +++++++++++++++---- .../expected/local_shard_execution.out | 2 +- src/test/regress/sql/local_shard_copy.sql | 11 +++++-- 7 files changed, 44 insertions(+), 32 deletions(-) diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index aa76f338d..60caa58a5 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -1258,15 +1258,12 @@ CopyLocalDataIntoShards(Oid distributedRelationId) ExprContext *econtext = GetPerTupleExprContext(estate); econtext->ecxt_scantuple = slot; - /* here we already have the data locally */ - bool hasCopyDataLocally = true; copyDest = (DestReceiver *) CreateCitusCopyDestReceiver(distributedRelationId, columnNameList, partitionColumnIndex, estate, stopOnFailure, - NULL, - hasCopyDataLocally); + NULL); /* initialise state for writing to shards, we'll open connections on demand */ copyDest->rStartup(copyDest, 0, tupleDescriptor); diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 8d2dd9928..0e698fe24 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -431,12 +431,9 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) stopOnFailure = true; } - bool hasCopyDataLocally = false; - /* set up the destination for the COPY */ copyDest = CreateCitusCopyDestReceiver(tableId, columnNameList, partitionColumnIndex, - executorState, stopOnFailure, NULL, - hasCopyDataLocally); + executorState, stopOnFailure, NULL); dest = (DestReceiver *) copyDest; dest->rStartup(dest, 0, tupleDescriptor); @@ -1979,12 +1976,12 @@ CopyFlushOutput(CopyOutState cstate, char *start, char *pointer) CitusCopyDestReceiver * CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColumnIndex, EState *executorState, bool stopOnFailure, - char *intermediateResultIdPrefix, bool hasCopyDataLocally) + char *intermediateResultIdPrefix) { CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) palloc0( sizeof(CitusCopyDestReceiver)); - copyDest->shouldUseLocalCopy = !hasCopyDataLocally && ShouldExecuteCopyLocally(); + copyDest->shouldUseLocalCopy = ShouldExecuteCopyLocally(); /* set up the DestReceiver function pointers */ copyDest->pub.receiveSlot = CitusCopyDestReceiverReceive; @@ -2040,7 +2037,8 @@ ShouldExecuteCopyLocally() return true; } - return IsMultiStatementTransaction(); + /* if we connected to the local node via a connection, we might not be able to see some previous changes that were made over that connection */ + return !TransactionConnectedToLocalGroup && IsMultiStatementTransaction(); } diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index f5b3d2cce..6663acc49 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -581,16 +581,13 @@ ExecutePlanIntoColocatedIntermediateResults(Oid targetRelationId, int partitionColumnIndex = PartitionColumnIndexFromColumnList(targetRelationId, columnNameList); - bool hasCopyDataLocally = true; - /* set up a DestReceiver that
copies into the intermediate table */ CitusCopyDestReceiver *copyDest = CreateCitusCopyDestReceiver(targetRelationId, columnNameList, partitionColumnIndex, executorState, stopOnFailure, - intermediateResultIdPrefix, - hasCopyDataLocally); + intermediateResultIdPrefix); ExecutePlanIntoDestReceiver(selectPlan, paramListInfo, (DestReceiver *) copyDest); @@ -626,15 +623,12 @@ ExecutePlanIntoRelation(Oid targetRelationId, List *insertTargetList, int partitionColumnIndex = PartitionColumnIndexFromColumnList(targetRelationId, columnNameList); - bool hasCopyDataLocally = true; - /* set up a DestReceiver that copies into the distributed table */ CitusCopyDestReceiver *copyDest = CreateCitusCopyDestReceiver(targetRelationId, columnNameList, partitionColumnIndex, executorState, - stopOnFailure, NULL, - hasCopyDataLocally); + stopOnFailure, NULL); ExecutePlanIntoDestReceiver(selectPlan, paramListInfo, (DestReceiver *) copyDest); diff --git a/src/include/distributed/commands/multi_copy.h b/src/include/distributed/commands/multi_copy.h index b887cc5ad..9ec8f280b 100644 --- a/src/include/distributed/commands/multi_copy.h +++ b/src/include/distributed/commands/multi_copy.h @@ -144,8 +144,7 @@ extern CitusCopyDestReceiver * CreateCitusCopyDestReceiver(Oid relationId, int partitionColumnIndex, EState *executorState, bool stopOnFailure, - char *intermediateResultPrefix, - bool hasCopyDataLocally); + char *intermediateResultPrefix); extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat); extern bool CanUseBinaryCopyFormat(TupleDesc tupleDescription); extern bool CanUseBinaryCopyFormatForTargetList(List *targetEntryList); diff --git a/src/test/regress/expected/local_shard_copy.out b/src/test/regress/expected/local_shard_copy.out index b7521f9da..fbf9dc43b 100644 --- a/src/test/regress/expected/local_shard_copy.out +++ b/src/test/regress/expected/local_shard_copy.out @@ -242,26 +242,25 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar ROLLBACK; BEGIN; - -- the local placements should be executed locally - COPY distributed_table FROM STDIN WITH delimiter ','; -NOTICE: executing the copy locally for shard -CONTEXT: COPY distributed_table, line 1: "1, 100" -- run select with local execution SELECT age FROM distributed_table WHERE key = 1; NOTICE: executing the command locally: SELECT age FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) age --------------------------------------------------------------------- - 100 -(1 row) +(0 rows) SELECT count(*) FROM distributed_table; NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true count --------------------------------------------------------------------- - 26 + 21 (1 row) + -- the local placements should be executed locally + COPY distributed_table FROM STDIN WITH delimiter ','; +NOTICE: executing the copy locally for shard +CONTEXT: COPY distributed_table, line 1: "1, 100" -- verify that the copy is successful. 
SELECT count(*) FROM distributed_table; NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true @@ -307,8 +306,18 @@ ROLLBACK; TRUNCATE distributed_table; -- different delimiters BEGIN; +-- run select with local execution +SELECT count(*) FROM distributed_table WHERE key = 1; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) + count +--------------------------------------------------------------------- + 0 +(1 row) + -- initial size SELECT count(*) FROM distributed_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true count --------------------------------------------------------------------- 0 @@ -328,8 +337,18 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar ROLLBACK; BEGIN; +-- run select with local execution +SELECT count(*) FROM distributed_table WHERE key = 1; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) + count +--------------------------------------------------------------------- + 0 +(1 row) + -- initial size SELECT count(*) FROM distributed_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true count --------------------------------------------------------------------- 0 diff --git a/src/test/regress/expected/local_shard_execution.out b/src/test/regress/expected/local_shard_execution.out index 8d3b35ca6..e2ed1dedb 100644 --- a/src/test/regress/expected/local_shard_execution.out +++ b/src/test/regress/expected/local_shard_execution.out @@ -618,7 +618,7 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar 0 (1 row) - INSERT INTO distributed_table (key) SELECT i FROM generate_series(1,1)i; + INSERT INTO distributed_table (key) SELECT i FROM generate_series(1,1) i; ROLLBACK; -- a local query is followed by a command that cannot be executed locally BEGIN; diff --git a/src/test/regress/sql/local_shard_copy.sql b/src/test/regress/sql/local_shard_copy.sql index e9b548a94..3b2dc902c 100644 --- a/src/test/regress/sql/local_shard_copy.sql +++ b/src/test/regress/sql/local_shard_copy.sql @@ -124,6 +124,10 @@ BEGIN; ROLLBACK; BEGIN; + -- run select with local execution + SELECT age FROM distributed_table WHERE key = 1; + + SELECT count(*) FROM distributed_table; -- the local placements should be executed locally COPY distributed_table FROM STDIN WITH delimiter ','; 1, 100 @@ -132,10 +136,7 @@ BEGIN; 4, 400 5, 500 \. - -- run select with local execution - SELECT age FROM distributed_table WHERE key = 1; - SELECT count(*) FROM distributed_table; -- verify that the copy is successful. 
SELECT count(*) FROM distributed_table; @@ -193,6 +194,8 @@ TRUNCATE distributed_table; -- different delimiters BEGIN; +-- run select with local execution +SELECT count(*) FROM distributed_table WHERE key = 1; -- initial size SELECT count(*) FROM distributed_table; COPY distributed_table FROM STDIN WITH delimiter '|'; @@ -205,6 +208,8 @@ SELECT count(*) FROM distributed_table; ROLLBACK; BEGIN; +-- run select with local execution +SELECT count(*) FROM distributed_table WHERE key = 1; -- initial size SELECT count(*) FROM distributed_table; COPY distributed_table FROM STDIN WITH delimiter '['; From c22068e75ab57a782a105b72c65979dd6b880d86 Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Tue, 10 Mar 2020 15:32:11 +0300 Subject: [PATCH 4/7] use the right partition for partitioned tables --- .../distributed/commands/local_multi_copy.c | 42 +--------------- .../regress/expected/local_shard_copy.out | 48 +++++++++++++++++++ src/test/regress/sql/local_shard_copy.sql | 34 +++++++++++++ 3 files changed, 84 insertions(+), 40 deletions(-) diff --git a/src/backend/distributed/commands/local_multi_copy.c b/src/backend/distributed/commands/local_multi_copy.c index 54fedc2a5..c280bf7d0 100644 --- a/src/backend/distributed/commands/local_multi_copy.c +++ b/src/backend/distributed/commands/local_multi_copy.c @@ -45,7 +45,6 @@ static int ReadFromLocalBufferCallback(void *outbuf, int minread, int maxread); -static Relation CreateCopiedShard(RangeVar *distributedRel, Relation shard); static void AddSlotToBuffer(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, bool isBinary); @@ -146,13 +145,12 @@ DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId, CopyStmt *copyStat Oid shardOid = GetShardLocalTableOid(relationId, shardId); Relation shard = heap_open(shardOid, RowExclusiveLock); - Relation copiedShard = CreateCopiedShard(copyStatement->relation, shard); ParseState *pState = make_parsestate(NULL); /* p_rtable of pState is set so that we can check constraints. */ - pState->p_rtable = CreateRangeTable(copiedShard, ACL_INSERT); + pState->p_rtable = CreateRangeTable(shard, ACL_INSERT); - CopyState cstate = BeginCopyFrom(pState, copiedShard, NULL, false, + CopyState cstate = BeginCopyFrom(pState, shard, NULL, false, ReadFromLocalBufferCallback, copyStatement->attlist, copyStatement->options); CopyFrom(cstate); @@ -183,42 +181,6 @@ ShouldAddBinaryHeaders(StringInfo buffer, bool isBinary) } -/* - * CreateCopiedShard clones deep copies the necessary fields of the given - * relation. - */ -Relation -CreateCopiedShard(RangeVar *distributedRel, Relation shard) -{ - TupleDesc tupleDescriptor = RelationGetDescr(shard); - - Relation copiedDistributedRelation = (Relation) palloc(sizeof(RelationData)); - Form_pg_class copiedDistributedRelationTuple = (Form_pg_class) palloc( - CLASS_TUPLE_SIZE); - - *copiedDistributedRelation = *shard; - *copiedDistributedRelationTuple = *shard->rd_rel; - - copiedDistributedRelation->rd_rel = copiedDistributedRelationTuple; - copiedDistributedRelation->rd_att = CreateTupleDescCopyConstr(tupleDescriptor); - - Oid tableId = RangeVarGetRelid(distributedRel, NoLock, false); - - /* - * BeginCopyFrom opens all partitions of given partitioned table with relation_open - * and it expects its caller to close those relations. We do not have direct access - * to opened relations, thus we are changing relkind of partitioned tables so that - * Postgres will treat those tables as regular relations and will not open its - * partitions. 
- */ - if (PartitionedTable(tableId)) - { - copiedDistributedRelationTuple->relkind = RELKIND_RELATION; - } - return copiedDistributedRelation; -} - - /* * ReadFromLocalBufferCallback is the copy callback. * It always tries to copy maxread bytes. diff --git a/src/test/regress/expected/local_shard_copy.out b/src/test/regress/expected/local_shard_copy.out index fbf9dc43b..3eeefb812 100644 --- a/src/test/regress/expected/local_shard_copy.out +++ b/src/test/regress/expected/local_shard_copy.out @@ -54,6 +54,24 @@ CREATE TABLE local_table (key int PRIMARY KEY); DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "local_table_pkey" for table "local_table" DEBUG: building index "local_table_pkey" on table "local_table" serially INSERT INTO local_table SELECT * from generate_series(1, 10); +-- partitioned table +CREATE TABLE collections_list ( + key bigserial, + collection_id integer +) PARTITION BY LIST (collection_id ); +DEBUG: CREATE TABLE will create implicit sequence "collections_list_key_seq" for serial column "collections_list.key" +SELECT create_distributed_table('collections_list', 'key'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE collections_list_0 + PARTITION OF collections_list (key, collection_id) + FOR VALUES IN ( 0 ); +CREATE TABLE collections_list_1 + PARTITION OF collections_list (key, collection_id) + FOR VALUES IN ( 1 ); -- connection worker and get ready for the tests \c - - - :worker_1_port SET search_path TO local_shard_copy; @@ -240,6 +258,36 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar 26 (1 row) +ROLLBACK; +BEGIN; + -- run select with local execution + SELECT age FROM distributed_table WHERE key = 1; +NOTICE: executing the command locally: SELECT age FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) + age +--------------------------------------------------------------------- +(0 rows) + + SELECT count(*) FROM collections_list; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.collections_list_1330005 collections_list WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.collections_list_1330007 collections_list WHERE true + count +--------------------------------------------------------------------- + 0 +(1 row) + + -- the local placements should be executed locally + COPY collections_list FROM STDIN WITH delimiter ','; +NOTICE: executing the copy locally for shard +CONTEXT: COPY collections_list, line 1: "1, 0" + -- verify that the copy is successful. 
+ SELECT count(*) FROM collections_list; NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.collections_list_1330005 collections_list WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.collections_list_1330007 collections_list WHERE true count --------------------------------------------------------------------- 5 (1 row) ROLLBACK; BEGIN; -- run select with local execution diff --git a/src/test/regress/sql/local_shard_copy.sql b/src/test/regress/sql/local_shard_copy.sql index 3b2dc902c..af8e3932e 100644 --- a/src/test/regress/sql/local_shard_copy.sql +++ b/src/test/regress/sql/local_shard_copy.sql @@ -22,6 +22,22 @@ INSERT INTO reference_table SELECT * FROM generate_series(1, 10); CREATE TABLE local_table (key int PRIMARY KEY); INSERT INTO local_table SELECT * from generate_series(1, 10); +-- partitioned table +CREATE TABLE collections_list ( + key bigserial, + collection_id integer +) PARTITION BY LIST (collection_id ); + +SELECT create_distributed_table('collections_list', 'key'); + +CREATE TABLE collections_list_0 + PARTITION OF collections_list (key, collection_id) + FOR VALUES IN ( 0 ); + +CREATE TABLE collections_list_1 + PARTITION OF collections_list (key, collection_id) + FOR VALUES IN ( 1 ); + -- connect to the worker and get ready for the tests \c - - - :worker_1_port @@ -123,6 +139,24 @@ BEGIN; ROLLBACK; +BEGIN; + -- run select with local execution + SELECT age FROM distributed_table WHERE key = 1; + + SELECT count(*) FROM collections_list; + -- the local placements should be executed locally + COPY collections_list FROM STDIN WITH delimiter ','; +1, 0 +2, 0 +3, 0 +4, 1 +5, 1 +\. + -- verify that the copy is successful. + SELECT count(*) FROM collections_list; + +ROLLBACK; + BEGIN; -- run select with local execution SELECT age FROM distributed_table WHERE key = 1; From 42cfc4c0e937dd6678dd9a80f62199208c2bee4f Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Tue, 10 Mar 2020 23:47:27 +0300 Subject: [PATCH 5/7] apply review items log shard id in local copy and add more comments --- .../distributed/commands/local_multi_copy.c | 115 ++++++++-------- src/backend/distributed/commands/multi_copy.c | 68 ++++++---- src/include/distributed/local_multi_copy.h | 16 ++- .../regress/expected/local_shard_copy.out | 123 +++++++++--------- src/test/regress/sql/local_shard_copy.sql | 1 + 5 files changed, 183 insertions(+), 140 deletions(-) diff --git a/src/backend/distributed/commands/local_multi_copy.c b/src/backend/distributed/commands/local_multi_copy.c index c280bf7d0..24b2440f7 100644 --- a/src/backend/distributed/commands/local_multi_copy.c +++ b/src/backend/distributed/commands/local_multi_copy.c @@ -35,18 +35,9 @@ #include "distributed/local_multi_copy.h" #include "distributed/shard_utils.h" -/* - * LOCAL_COPY_BUFFER_SIZE is buffer size for local copy.
- * There will be one buffer for each local placement, therefore - * the maximum amount of memory that might be alocated is - * LOCAL_COPY_BUFFER_SIZE * #local_placement - */ -#define LOCAL_COPY_BUFFER_SIZE (1 * 512 * 1024) - - static int ReadFromLocalBufferCallback(void *outbuf, int minread, int maxread); static void AddSlotToBuffer(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, - bool isBinary); + CopyOutState localCopyOutState); static bool ShouldSendCopyNow(StringInfo buffer); static void DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId, @@ -54,71 +45,83 @@ static void DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId, CopyStmt *copyStat static bool ShouldAddBinaryHeaders(StringInfo buffer, bool isBinary); /* - * localCopyBuffer is used in copy callback to return the copied rows. + * LocalCopyBuffer is used in copy callback to return the copied rows. * The reason this is a global variable is that we cannot pass an additional * argument to the copy callback. */ -StringInfo localCopyBuffer; +static StringInfo LocalCopyBuffer; /* - * ProcessLocalCopy adds the given slot and does a local copy if - * this is the end of copy, or the buffer size exceeds the threshold. + * WriteTupleToLocalShard adds the given slot and does a local copy if + * the buffer size exceeds the threshold. */ void -ProcessLocalCopy(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, int64 shardId, - StringInfo buffer, bool isEndOfCopy) +WriteTupleToLocalShard(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, int64 + shardId, + CopyOutState localCopyOutState) { - /* - * Here we save the previous buffer, and put the local shard's buffer - * into copyOutState. The motivation is to use the existing logic to - * serialize a row slot into buffer. - */ - StringInfo previousBuffer = copyDest->copyOutState->fe_msgbuf; - copyDest->copyOutState->fe_msgbuf = buffer; - /* since we are doing a local copy, the following statements should use local execution to see the changes */ TransactionAccessedLocalPlacement = true; - bool isBinaryCopy = copyDest->copyOutState->binary; - AddSlotToBuffer(slot, copyDest, isBinaryCopy); + bool isBinaryCopy = localCopyOutState->binary; + if (ShouldAddBinaryHeaders(localCopyOutState->fe_msgbuf, isBinaryCopy)) + { + AppendCopyBinaryHeaders(localCopyOutState); + } - if (isEndOfCopy || ShouldSendCopyNow(buffer)) + AddSlotToBuffer(slot, copyDest, localCopyOutState); + + if (ShouldSendCopyNow(localCopyOutState->fe_msgbuf)) { if (isBinaryCopy) { - AppendCopyBinaryFooters(copyDest->copyOutState); + /* + * We're going to flush the buffer to disk by effectively doing a full COPY command. + * Hence we also need to add footers to the current buffer. + */ + AppendCopyBinaryFooters(localCopyOutState); } - - DoLocalCopy(buffer, copyDest->distributedRelationId, shardId, + bool isEndOfCopy = false; + DoLocalCopy(localCopyOutState->fe_msgbuf, copyDest->distributedRelationId, + shardId, copyDest->copyStatement, isEndOfCopy); } - copyDest->copyOutState->fe_msgbuf = previousBuffer; } /* - * AddSlotToBuffer serializes the given slot and adds it to the buffer in copyDest. - * If the copy format is binary, it adds binary headers as well. + * FinishLocalCopyToShard finishes the local copy for the given shard, flushing any remaining buffered rows.
+ */ +void +FinishLocalCopyToShard(CitusCopyDestReceiver *copyDest, int64 shardId, + CopyOutState localCopyOutState) +{ + bool isBinaryCopy = localCopyOutState->binary; + if (isBinaryCopy) + { + AppendCopyBinaryFooters(localCopyOutState); + } + bool isEndOfCopy = true; + DoLocalCopy(localCopyOutState->fe_msgbuf, copyDest->distributedRelationId, shardId, + copyDest->copyStatement, isEndOfCopy); +} + + +/* + * AddSlotToBuffer serializes the given slot and adds it to the buffer in localCopyOutState. */ static void -AddSlotToBuffer(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, bool isBinary) +AddSlotToBuffer(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, CopyOutState + localCopyOutState) { - if (ShouldAddBinaryHeaders(copyDest->copyOutState->fe_msgbuf, isBinary)) - { - AppendCopyBinaryHeaders(copyDest->copyOutState); - } + Datum *columnValues = slot->tts_values; + bool *columnNulls = slot->tts_isnull; + FmgrInfo *columnOutputFunctions = copyDest->columnOutputFunctions; + CopyCoercionData *columnCoercionPaths = copyDest->columnCoercionPaths; - if (slot != NULL) - { - Datum *columnValues = slot->tts_values; - bool *columnNulls = slot->tts_isnull; - FmgrInfo *columnOutputFunctions = copyDest->columnOutputFunctions; - CopyCoercionData *columnCoercionPaths = copyDest->columnCoercionPaths; - - AppendCopyRowData(columnValues, columnNulls, copyDest->tupleDescriptor, - copyDest->copyOutState, columnOutputFunctions, - columnCoercionPaths); - } + AppendCopyRowData(columnValues, columnNulls, copyDest->tupleDescriptor, + localCopyOutState, columnOutputFunctions, + columnCoercionPaths); } @@ -129,19 +132,25 @@ AddSlotToBuffer(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, bool isBi static bool ShouldSendCopyNow(StringInfo buffer) { - return buffer->len > LOCAL_COPY_BUFFER_SIZE; + return buffer->len > LOCAL_COPY_FLUSH_THRESHOLD; } /* * DoLocalCopy finds the shard table from the distributed relation id, and copies the given * buffer into the shard. + * CopyFrom calls ReadFromLocalBufferCallback to read bytes from the buffer as though + * it was reading from stdin. It then parses the tuples and writes them to the shardOid table. */ static void DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId, CopyStmt *copyStatement, bool isEndOfCopy) { - localCopyBuffer = buffer; + /* + * Set the buffer as a global variable to allow ReadFromLocalBufferCallback to read from it. + * We cannot pass additional arguments to ReadFromLocalBufferCallback. 
+ */ + LocalCopyBuffer = buffer; Oid shardOid = GetShardLocalTableOid(relationId, shardId); Relation shard = heap_open(shardOid, RowExclusiveLock); @@ -189,15 +198,15 @@ static int ReadFromLocalBufferCallback(void *outbuf, int minread, int maxread) { int bytesread = 0; - int avail = localCopyBuffer->len - localCopyBuffer->cursor; + int avail = LocalCopyBuffer->len - LocalCopyBuffer->cursor; int bytesToRead = Min(avail, maxread); if (bytesToRead > 0) { memcpy_s(outbuf, bytesToRead + strlen((char *) outbuf), - &localCopyBuffer->data[localCopyBuffer->cursor], bytesToRead); + &LocalCopyBuffer->data[LocalCopyBuffer->cursor], bytesToRead); } bytesread += bytesToRead; - localCopyBuffer->cursor += bytesToRead; + LocalCopyBuffer->cursor += bytesToRead; return bytesread; } diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 0e698fe24..34852becb 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -164,6 +164,7 @@ struct CopyPlacementState /* State of shard to which the placement belongs to. */ CopyShardState *shardState; + /* node group ID of the placement */ int32 groupId; /* @@ -183,7 +184,7 @@ struct CopyShardState uint64 shardId; /* used for doing local copy */ - StringInfo localCopyBuffer; + CopyOutState copyOutState; /* containsLocalPlacement is true if we have a local placement for the shard id of this state */ bool containsLocalPlacement; @@ -242,15 +243,16 @@ static CopyConnectionState * GetConnectionState(HTAB *connectionStateHash, MultiConnection *connection); static CopyShardState * GetShardState(uint64 shardId, HTAB *shardStateHash, HTAB *connectionStateHash, bool stopOnFailure, - bool *found, bool shouldUseLocalCopy, MemoryContext - context); + bool *found, bool shouldUseLocalCopy, CopyOutState + copyOutState); static MultiConnection * CopyGetPlacementConnection(ShardPlacement *placement, bool stopOnFailure); static List * ConnectionStateList(HTAB *connectionStateHash); static void InitializeCopyShardState(CopyShardState *shardState, HTAB *connectionStateHash, uint64 shardId, bool stopOnFailure, bool - canUseLocalCopy, MemoryContext context); + canUseLocalCopy, + CopyOutState copyOutState); static void StartPlacementStateCopyCommand(CopyPlacementState *placementState, CopyStmt *copyStatement, CopyOutState copyOutState); @@ -288,6 +290,7 @@ static void CitusCopyDestReceiverShutdown(DestReceiver *destReceiver); static void CitusCopyDestReceiverDestroy(DestReceiver *destReceiver); static bool ContainsLocalPlacement(int64 shardId); static void FinishLocalCopy(CitusCopyDestReceiver *copyDest); +static void CloneCopyOutStateForLocalCopy(CopyOutState from, CopyOutState to); static bool ShouldExecuteCopyLocally(void); static void LogLocalCopyExecution(uint64 shardId); @@ -2285,7 +2288,7 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest stopOnFailure, &cachedShardStateFound, copyDest->shouldUseLocalCopy, - copyDest->memoryContext); + copyDest->copyOutState); if (!cachedShardStateFound) { firstTupleInShard = true; @@ -2308,9 +2311,7 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest if (copyDest->shouldUseLocalCopy && shardState->containsLocalPlacement) { - bool isEndOfCopy = false; - ProcessLocalCopy(slot, copyDest, shardId, shardState->localCopyBuffer, - isEndOfCopy); + WriteTupleToLocalShard(slot, copyDest, shardId, shardState->copyOutState); } @@ -2535,13 +2536,13 @@ FinishLocalCopy(CitusCopyDestReceiver 
*copyDest) HASH_SEQ_STATUS status; CopyShardState *copyShardState; - bool isEndOfCopy = true; foreach_htab(copyShardState, &status, shardStateHash) { - if (copyShardState->localCopyBuffer->len > 0) + if (copyShardState->copyOutState != NULL && + copyShardState->copyOutState->fe_msgbuf->len > 0) { - ProcessLocalCopy(NULL, copyDest, copyShardState->shardId, - copyShardState->localCopyBuffer, isEndOfCopy); + FinishLocalCopyToShard(copyDest, copyShardState->shardId, + copyShardState->copyOutState); } } } @@ -3208,14 +3209,15 @@ ConnectionStateList(HTAB *connectionStateHash) static CopyShardState * GetShardState(uint64 shardId, HTAB *shardStateHash, HTAB *connectionStateHash, bool stopOnFailure, bool *found, bool - shouldUseLocalCopy, MemoryContext context) + shouldUseLocalCopy, CopyOutState copyOutState) { CopyShardState *shardState = (CopyShardState *) hash_search(shardStateHash, &shardId, HASH_ENTER, found); if (!*found) { InitializeCopyShardState(shardState, connectionStateHash, - shardId, stopOnFailure, shouldUseLocalCopy, context); + shardId, stopOnFailure, shouldUseLocalCopy, + copyOutState); } return shardState; @@ -3230,16 +3232,12 @@ GetShardState(uint64 shardId, HTAB *shardStateHash, static void InitializeCopyShardState(CopyShardState *shardState, HTAB *connectionStateHash, uint64 shardId, - bool stopOnFailure, bool shouldUseLocalCopy, MemoryContext - context) + bool stopOnFailure, bool shouldUseLocalCopy, CopyOutState + copyOutState) { ListCell *placementCell = NULL; int failedPlacementCount = 0; - MemoryContext oldContext = MemoryContextSwitchTo(context); - - MemoryContextSwitchTo(oldContext); - MemoryContext localContext = AllocSetContextCreateExtended(CurrentMemoryContext, "InitializeCopyShardState", @@ -3249,7 +3247,7 @@ InitializeCopyShardState(CopyShardState *shardState, /* release active placement list at the end of this function */ - oldContext = MemoryContextSwitchTo(localContext); + MemoryContext oldContext = MemoryContextSwitchTo(localContext); List *activePlacementList = ActiveShardPlacementList(shardId); @@ -3257,7 +3255,7 @@ InitializeCopyShardState(CopyShardState *shardState, shardState->shardId = shardId; shardState->placementStateList = NIL; - shardState->localCopyBuffer = makeStringInfo(); + shardState->copyOutState = NULL; shardState->containsLocalPlacement = ContainsLocalPlacement(shardId); @@ -3267,6 +3265,8 @@ InitializeCopyShardState(CopyShardState *shardState, if (shouldUseLocalCopy && placement->groupId == GetLocalGroupId()) { + shardState->copyOutState = (CopyOutState) palloc0(sizeof(*copyOutState)); + CloneCopyOutStateForLocalCopy(copyOutState, shardState->copyOutState); LogLocalCopyExecution(shardId); continue; } @@ -3325,6 +3325,28 @@ InitializeCopyShardState(CopyShardState *shardState, } +/* + * CloneCopyOutStateForLocalCopy creates a shallow copy of the CopyOutState with a new + * fe_msgbuf. We keep a separate CopyOutState for every local shard placement, because + * in case of local copy we serialize and buffer incoming tuples into fe_msgbuf for each + * placement and the serialization functions take a CopyOutState as a parameter. 
+ */ +static void +CloneCopyOutStateForLocalCopy(CopyOutState from, CopyOutState to) +{ + to->attnumlist = from->attnumlist; + to->binary = from->binary; + to->copy_dest = from->copy_dest; + to->delim = from->delim; + to->file_encoding = from->file_encoding; + to->need_transcoding = from->need_transcoding; + to->null_print = from->null_print; + to->null_print_client = from->null_print_client; + to->rowcontext = from->rowcontext; + to->fe_msgbuf = makeStringInfo(); +} + + /* * LogLocalCopyExecution logs that the copy will be done locally for * the given shard. @@ -3336,7 +3358,7 @@ LogLocalCopyExecution(uint64 shardId) { return; } - ereport(NOTICE, (errmsg("executing the copy locally for shard"))); + ereport(NOTICE, (errmsg("executing the copy locally for shard " UINT64_FORMAT, shardId))); } diff --git a/src/include/distributed/local_multi_copy.h b/src/include/distributed/local_multi_copy.h index 83dc6a32b..a4e46f015 100644 --- a/src/include/distributed/local_multi_copy.h +++ b/src/include/distributed/local_multi_copy.h @@ -2,8 +2,18 @@ #ifndef LOCAL_MULTI_COPY #define LOCAL_MULTI_COPY -extern void ProcessLocalCopy(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, int64 - shardId, - StringInfo buffer, bool isEndOfCopy); +/* + * LOCAL_COPY_FLUSH_THRESHOLD is the threshold at which the local copy buffer + * is flushed. There is one buffer for each local placement; when the buffer + * size exceeds this threshold, it will be flushed. + */ +#define LOCAL_COPY_FLUSH_THRESHOLD (1 * 512 * 1024) + +extern void WriteTupleToLocalShard(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, + int64 + shardId, + CopyOutState localCopyOutState); +extern void FinishLocalCopyToShard(CitusCopyDestReceiver *copyDest, int64 shardId, + CopyOutState localCopyOutState); #endif /* LOCAL_MULTI_COPY */ diff --git a/src/test/regress/expected/local_shard_copy.out b/src/test/regress/expected/local_shard_copy.out index 3eeefb812..d42403016 100644 --- a/src/test/regress/expected/local_shard_copy.out +++ b/src/test/regress/expected/local_shard_copy.out @@ -1,6 +1,7 @@ CREATE SCHEMA local_shard_copy; SET search_path TO local_shard_copy; SET client_min_messages TO DEBUG; +SET citus.next_shard_id TO 1570000; SELECT * FROM master_add_node('localhost', :master_port, groupid := 0); DEBUG: schema "public" already exists, skipping DETAIL: NOTICE from localhost:xxxxx @@ -144,15 +145,15 @@ SELECT shard_of_distribution_column_is_local(12); BEGIN; -- run select with local execution SELECT count(*) FROM distributed_table WHERE key = 1; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) count --------------------------------------------------------------------- 0 (1 row) SELECT count(*) FROM distributed_table; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true
count --------------------------------------------------------------------- 21 @@ -160,12 +161,12 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar -- the local placements should be executed locally COPY distributed_table FROM STDIN WITH delimiter ','; -NOTICE: executing the copy locally for shard +NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY distributed_table, line 1: "1, 100" -- verify that the copy is successful. SELECT count(*) FROM distributed_table; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true count --------------------------------------------------------------------- 26 @@ -175,15 +176,15 @@ ROLLBACK; BEGIN; -- run select with local execution SELECT count(*) FROM distributed_table WHERE key = 1; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) count --------------------------------------------------------------------- 0 (1 row) SELECT count(*) FROM distributed_table; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true count --------------------------------------------------------------------- 21 @@ -191,12 +192,12 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar -- the local placements should be executed locally COPY distributed_table FROM STDIN WITH delimiter ','; -NOTICE: executing the copy locally for shard +NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY distributed_table, line 1: "1, 100" -- verify the put ages. 
SELECT * FROM distributed_table; -NOTICE: executing the command locally: SELECT key, age FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true -NOTICE: executing the command locally: SELECT key, age FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true +NOTICE: executing the command locally: SELECT key, age FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT key, age FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true key | age --------------------------------------------------------------------- 20 | 20 @@ -231,15 +232,15 @@ ROLLBACK; BEGIN; -- run select with local execution SELECT count(*) FROM distributed_table WHERE key = 1; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) count --------------------------------------------------------------------- 0 (1 row) SELECT count(*) FROM distributed_table; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true count --------------------------------------------------------------------- 21 @@ -247,12 +248,12 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar -- the local placements should be executed locally COPY distributed_table FROM STDIN WITH delimiter ','; -NOTICE: executing the copy locally for shard +NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY distributed_table, line 1: "1, 100" -- verify that the copy is successful. 
SELECT count(*) FROM distributed_table; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true count --------------------------------------------------------------------- 26 @@ -262,14 +263,14 @@ ROLLBACK; BEGIN; -- run select with local execution SELECT age FROM distributed_table WHERE key = 1; -NOTICE: executing the command locally: SELECT age FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) +NOTICE: executing the command locally: SELECT age FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) age --------------------------------------------------------------------- (0 rows) SELECT count(*) FROM collections_list; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.collections_list_1330005 collections_list WHERE true -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.collections_list_1330007 collections_list WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.collections_list_1570005 collections_list WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.collections_list_1570007 collections_list WHERE true count --------------------------------------------------------------------- 0 @@ -277,12 +278,12 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar -- the local placements should be executed locally COPY collections_list FROM STDIN WITH delimiter ','; -NOTICE: executing the copy locally for shard +NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY collections_list, line 1: "1, 0" -- verify that the copy is successful. 
SELECT count(*) FROM collections_list; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.collections_list_1330005 collections_list WHERE true -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.collections_list_1330007 collections_list WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.collections_list_1570005 collections_list WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.collections_list_1570007 collections_list WHERE true count --------------------------------------------------------------------- 5 @@ -292,14 +293,14 @@ ROLLBACK; BEGIN; -- run select with local execution SELECT age FROM distributed_table WHERE key = 1; -NOTICE: executing the command locally: SELECT age FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) +NOTICE: executing the command locally: SELECT age FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) age --------------------------------------------------------------------- (0 rows) SELECT count(*) FROM distributed_table; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true count --------------------------------------------------------------------- 21 @@ -307,12 +308,12 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar -- the local placements should be executed locally COPY distributed_table FROM STDIN WITH delimiter ','; -NOTICE: executing the copy locally for shard +NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY distributed_table, line 1: "1, 100" -- verify that the copy is successful. SELECT count(*) FROM distributed_table; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true count --------------------------------------------------------------------- 26 @@ -322,7 +323,7 @@ ROLLBACK; BEGIN; -- Since we are in a transaction, the copy should be locally executed. COPY distributed_table FROM STDIN WITH delimiter ','; -NOTICE: executing the copy locally for shard +NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY distributed_table, line 1: "1, 100" ROLLBACK; -- Since we are not in a transaction, the copy should not be locally executed. @@ -331,32 +332,32 @@ BEGIN; -- Since we are in a transaction, the copy should be locally executed. But -- we are putting duplicate key, so it should error. 
COPY distributed_table FROM STDIN WITH delimiter ','; -NOTICE: executing the copy locally for shard +NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY distributed_table, line 1: "1, 100" -ERROR: duplicate key value violates unique constraint "distributed_table_pkey_1330001" +ERROR: duplicate key value violates unique constraint "distributed_table_pkey_1570001" DETAIL: Key (key)=(1) already exists. -CONTEXT: COPY distributed_table_1330001, line 1 +CONTEXT: COPY distributed_table_1570001, line 1 ROLLBACK; TRUNCATE distributed_table; COPY distributed_table FROM STDIN WITH delimiter ','; -ERROR: new row for relation "distributed_table_1330001" violates check constraint "distributed_table_age_check" +ERROR: new row for relation "distributed_table_1570001" violates check constraint "distributed_table_age_check" DETAIL: Failing row contains (1, 9). BEGIN; -- Since we are in a transaction, the execution will be local, however we are putting invalid age. -- The constaints should give an error COPY distributed_table FROM STDIN WITH delimiter ','; -NOTICE: executing the copy locally for shard +NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY distributed_table, line 1: "1,9" -ERROR: new row for relation "distributed_table_1330001" violates check constraint "distributed_table_age_check" +ERROR: new row for relation "distributed_table_1570001" violates check constraint "distributed_table_age_check" DETAIL: Failing row contains (1, 9). -CONTEXT: COPY distributed_table_1330001, line 1 +CONTEXT: COPY distributed_table_1570001, line 1 ROLLBACK; TRUNCATE distributed_table; -- different delimiters BEGIN; -- run select with local execution SELECT count(*) FROM distributed_table WHERE key = 1; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) count --------------------------------------------------------------------- 0 @@ -364,20 +365,20 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar -- initial size SELECT count(*) FROM distributed_table; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true count --------------------------------------------------------------------- 0 (1 row) COPY distributed_table FROM STDIN WITH delimiter '|'; -NOTICE: executing the copy locally for shard +NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY distributed_table, line 1: "1|10" -- new size SELECT count(*) FROM distributed_table; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS 
count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true count --------------------------------------------------------------------- 3 @@ -387,7 +388,7 @@ ROLLBACK; BEGIN; -- run select with local execution SELECT count(*) FROM distributed_table WHERE key = 1; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) count --------------------------------------------------------------------- 0 @@ -395,20 +396,20 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar -- initial size SELECT count(*) FROM distributed_table; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true count --------------------------------------------------------------------- 0 (1 row) COPY distributed_table FROM STDIN WITH delimiter '['; -NOTICE: executing the copy locally for shard +NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY distributed_table, line 1: "1[10" -- new size SELECT count(*) FROM distributed_table; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE true -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1330003 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true count --------------------------------------------------------------------- 3 @@ -418,38 +419,38 @@ ROLLBACK; -- multiple local copies BEGIN; COPY distributed_table FROM STDIN WITH delimiter ','; -NOTICE: executing the copy locally for shard +NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY distributed_table, line 1: "1,15" COPY distributed_table FROM STDIN WITH delimiter ','; -NOTICE: executing the copy locally for shard +NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY distributed_table, line 1: "10,15" COPY distributed_table FROM STDIN WITH delimiter ','; -NOTICE: executing the copy locally for shard +NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY distributed_table, line 1: "100,15" -NOTICE: executing the copy locally for shard +NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY distributed_table, line 2: "200,20" ROLLBACK; -- local copy followed by local copy should see the changes -- and error since it is a duplicate primary key. 
BEGIN; COPY distributed_table FROM STDIN WITH delimiter ','; -NOTICE: executing the copy locally for shard +NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY distributed_table, line 1: "1,15" COPY distributed_table FROM STDIN WITH delimiter ','; -NOTICE: executing the copy locally for shard +NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY distributed_table, line 1: "1,16" -ERROR: duplicate key value violates unique constraint "distributed_table_pkey_1330001" +ERROR: duplicate key value violates unique constraint "distributed_table_pkey_1570001" DETAIL: Key (key)=(1) already exists. -CONTEXT: COPY distributed_table_1330001, line 1 +CONTEXT: COPY distributed_table_1570001, line 1 ROLLBACK; -- local copy followed by local copy should see the changes BEGIN; COPY distributed_table FROM STDIN WITH delimiter ','; -NOTICE: executing the copy locally for shard +NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY distributed_table, line 1: "1,15" -- select should see the change SELECT key FROM distributed_table WHERE key = 1; -NOTICE: executing the command locally: SELECT key FROM local_shard_copy.distributed_table_1330001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) +NOTICE: executing the command locally: SELECT key FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) key --------------------------------------------------------------------- 1 @@ -471,7 +472,7 @@ SET citus.enable_local_execution = 'on'; BEGIN; -- copy should be executed locally COPY reference_table FROM STDIN; -NOTICE: executing the copy locally for shard +NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY reference_table, line 1: "1" ROLLBACK; SET citus.enable_local_execution = 'off'; diff --git a/src/test/regress/sql/local_shard_copy.sql b/src/test/regress/sql/local_shard_copy.sql index af8e3932e..f07a346e9 100644 --- a/src/test/regress/sql/local_shard_copy.sql +++ b/src/test/regress/sql/local_shard_copy.sql @@ -2,6 +2,7 @@ CREATE SCHEMA local_shard_copy; SET search_path TO local_shard_copy; SET client_min_messages TO DEBUG; +SET citus.next_shard_id TO 1570000; SELECT * FROM master_add_node('localhost', :master_port, groupid := 0); From 9d2f3c392a56e56ac04eff2575bef460503b1164 Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Thu, 12 Mar 2020 14:34:38 +0300 Subject: [PATCH 6/7] enable local execution in INSERT..SELECT and add more tests We can use local copy in INSERT..SELECT, so the check that disables local execution is removed. Also a test for local copy where the data size > LOCAL_COPY_FLUSH_THRESHOLD is added. 
use local execution with insert..select --- .../distributed/commands/local_multi_copy.c | 44 ++++++++++--------- .../distributed/executor/citus_custom_scan.c | 4 ++ .../executor/insert_select_executor.c | 28 ++++-------- .../distributed/executor/local_executor.c | 42 ++++++++++++++++++ src/include/distributed/local_executor.h | 3 ++ .../expected/coordinator_shouldhaveshards.out | 2 + .../regress/expected/local_shard_copy.out | 8 ++++ .../expected/local_shard_execution.out | 29 +++--------- .../expected/master_evaluation_modify.out | 2 + .../multi_mx_insert_select_repartition.out | 6 +-- .../multi_mx_transaction_recovery.out | 2 +- .../multi_mx_truncate_from_worker.out | 2 + ...licate_reference_tables_to_coordinator.out | 1 + src/test/regress/sql/local_shard_copy.sql | 8 ++++ .../regress/sql/local_shard_execution.sql | 16 ------- .../sql/multi_mx_truncate_from_worker.sql | 2 + 16 files changed, 116 insertions(+), 83 deletions(-) diff --git a/src/backend/distributed/commands/local_multi_copy.c b/src/backend/distributed/commands/local_multi_copy.c index 24b2440f7..efa8aa954 100644 --- a/src/backend/distributed/commands/local_multi_copy.c +++ b/src/backend/distributed/commands/local_multi_copy.c @@ -35,7 +35,7 @@ #include "distributed/local_multi_copy.h" #include "distributed/shard_utils.h" -static int ReadFromLocalBufferCallback(void *outbuf, int minread, int maxread); +static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead); static void AddSlotToBuffer(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, CopyOutState localCopyOutState); @@ -60,7 +60,10 @@ WriteTupleToLocalShard(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, in shardId, CopyOutState localCopyOutState) { - /* since we are doing a local copy, the following statements should use local execution to see the changes */ + /* + * Since we are doing a local copy, the following statements should + * use local execution to see the changes + */ TransactionAccessedLocalPlacement = true; bool isBinaryCopy = localCopyOutState->binary; @@ -76,8 +79,8 @@ WriteTupleToLocalShard(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, in if (isBinaryCopy) { /* - * We're going to flush the buffer to disk by effectively doing a full COPY command. - * Hence we also need to add footers to the current buffer. + * We're going to flush the buffer to disk by effectively doing a full + * COPY command. Hence we also need to add footers to the current buffer. */ AppendCopyBinaryFooters(localCopyOutState); } @@ -108,7 +111,8 @@ FinishLocalCopyToShard(CitusCopyDestReceiver *copyDest, int64 shardId, /* - * AddSlotToBuffer serializes the given slot and adds it to the buffer in localCopyOutState. + * AddSlotToBuffer serializes the given slot and adds it to + * the buffer in localCopyOutState. */ static void AddSlotToBuffer(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, CopyOutState @@ -139,16 +143,18 @@ ShouldSendCopyNow(StringInfo buffer) /* * DoLocalCopy finds the shard table from the distributed relation id, and copies the given * buffer into the shard. - * CopyFrom calls ReadFromLocalBufferCallback to read bytes from the buffer as though - * it was reading from stdin. It then parses the tuples and writes them to the shardOid table. + * CopyFrom calls ReadFromLocalBufferCallback to read bytes from the buffer + * as though it was reading from stdin. It then parses the tuples and + * writes them to the shardOid table. 
*/ static void DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId, CopyStmt *copyStatement, bool isEndOfCopy) { /* - * Set the buffer as a global variable to allow ReadFromLocalBufferCallback to read from it. - * We cannot pass additional arguments to ReadFromLocalBufferCallback. + * Set the buffer as a global variable to allow ReadFromLocalBufferCallback + * to read from it. We cannot pass additional arguments to + * ReadFromLocalBufferCallback. */ LocalCopyBuffer = buffer; @@ -167,11 +173,7 @@ DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId, CopyStmt *copyStat heap_close(shard, NoLock); free_parsestate(pState); - FreeStringInfo(buffer); - if (!isEndOfCopy) - { - buffer = makeStringInfo(); - } + resetStringInfo(buffer); } @@ -192,21 +194,21 @@ ShouldAddBinaryHeaders(StringInfo buffer, bool isBinary) /* * ReadFromLocalBufferCallback is the copy callback. - * It always tries to copy maxread bytes. + * It always tries to copy maxRead bytes. */ static int -ReadFromLocalBufferCallback(void *outbuf, int minread, int maxread) +ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead) { - int bytesread = 0; + int bytesRead = 0; int avail = LocalCopyBuffer->len - LocalCopyBuffer->cursor; - int bytesToRead = Min(avail, maxread); + int bytesToRead = Min(avail, maxRead); if (bytesToRead > 0) { - memcpy_s(outbuf, bytesToRead + strlen((char *) outbuf), + memcpy_s(outBuf, bytesToRead, &LocalCopyBuffer->data[LocalCopyBuffer->cursor], bytesToRead); } - bytesread += bytesToRead; + bytesRead += bytesToRead; LocalCopyBuffer->cursor += bytesToRead; - return bytesread; + return bytesRead; } diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index c0de40978..c9a02fb50 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -553,6 +553,10 @@ CacheLocalPlanForShardQuery(Task *task, DistributedPlan *originalDistributedPlan PlannedStmt * GetCachedLocalPlan(Task *task, DistributedPlan *distributedPlan) { + if (distributedPlan->workerJob == NULL) + { + return NULL; + } List *cachedPlanList = distributedPlan->workerJob->localPlannedStatements; LocalPlannedStatement *localPlannedStatement = NULL; diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index 6663acc49..4b7c7cc32 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -26,6 +26,7 @@ #include "distributed/listutils.h" #include "distributed/metadata_cache.h" #include "distributed/multi_router_planner.h" +#include "distributed/local_executor.h" #include "distributed/distributed_planner.h" #include "distributed/recursive_planning.h" #include "distributed/relation_access_tracking.h" @@ -135,15 +136,6 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) bool hasReturning = distributedPlan->hasReturning; HTAB *shardStateHash = NULL; - /* - * INSERT .. SELECT via coordinator consists of two steps, a SELECT is - * followd by a COPY. If the SELECT is executed locally, then the COPY - * would fail since Citus currently doesn't know how to handle COPY - * locally. So, to prevent the command fail, we simply disable local - * execution. 
- */ - DisableLocalExecution(); - /* select query to execute */ Query *selectQuery = BuildSelectForInsertSelect(insertSelectQuery); @@ -198,7 +190,6 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) GetDistributedPlan((CustomScan *) selectPlan->planTree); Job *distSelectJob = distSelectPlan->workerJob; List *distSelectTaskList = distSelectJob->taskList; - TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState); bool randomAccess = true; bool interTransactions = false; bool binaryFormat = @@ -280,11 +271,10 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) scanState->tuplestorestate = tuplestore_begin_heap(randomAccess, interTransactions, work_mem); - uint64 rowsInserted = ExecuteTaskListIntoTupleStore(ROW_MODIFY_COMMUTATIVE, - taskList, - tupleDescriptor, - scanState->tuplestorestate, - hasReturning); + uint64 rowsInserted = ExtractAndExecuteLocalAndRemoteTasks(scanState, + taskList, + ROW_MODIFY_COMMUTATIVE, + hasReturning); executorState->es_processed = rowsInserted; } @@ -335,17 +325,15 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) if (prunedTaskList != NIL) { - TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState); bool randomAccess = true; bool interTransactions = false; Assert(scanState->tuplestorestate == NULL); scanState->tuplestorestate = tuplestore_begin_heap(randomAccess, interTransactions, work_mem); - - ExecuteTaskListIntoTupleStore(ROW_MODIFY_COMMUTATIVE, prunedTaskList, - tupleDescriptor, scanState->tuplestorestate, - hasReturning); + ExtractAndExecuteLocalAndRemoteTasks(scanState, prunedTaskList, + ROW_MODIFY_COMMUTATIVE, + hasReturning); if (SortReturning && hasReturning) { diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index 7003ea0b3..4930ad7af 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -231,6 +231,48 @@ ExecuteLocalTaskList(CitusScanState *scanState, List *taskList) } +/* + * ExtractAndExecuteLocalAndRemoteTasks extracts local and remote tasks + * if local execution can be used and executes them. 
+ */ +uint64 +ExtractAndExecuteLocalAndRemoteTasks(CitusScanState *scanState, + List *taskList, RowModifyLevel rowModifyLevel, bool + hasReturning) +{ + uint64 processedRows = 0; + List *localTaskList = NIL; + List *remoteTaskList = NIL; + TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState); + + if (ShouldExecuteTasksLocally(taskList)) + { + bool readOnlyPlan = false; + + /* set local (if any) & remote tasks */ + ExtractLocalAndRemoteTasks(readOnlyPlan, taskList, &localTaskList, + &remoteTaskList); + processedRows += ExecuteLocalTaskList(scanState, localTaskList); + } + else + { + /* all tasks should be executed via remote connections */ + remoteTaskList = taskList; + } + + /* execute remote tasks if any */ + if (list_length(remoteTaskList) > 0) + { + processedRows += ExecuteTaskListIntoTupleStore(rowModifyLevel, remoteTaskList, + tupleDescriptor, + scanState->tuplestorestate, + hasReturning); + } + + return processedRows; +} + + /* * ExtractParametersForLocalExecution extracts parameter types and values * from the given ParamListInfo structure, and fills parameter type and diff --git a/src/include/distributed/local_executor.h b/src/include/distributed/local_executor.h index 8b11e096c..265434b89 100644 --- a/src/include/distributed/local_executor.h +++ b/src/include/distributed/local_executor.h @@ -21,6 +21,9 @@ extern bool TransactionAccessedLocalPlacement; extern bool TransactionConnectedToLocalGroup; /* extern function declarations */ +extern uint64 ExtractAndExecuteLocalAndRemoteTasks(CitusScanState *scanState, + List *taskList, RowModifyLevel + rowModifyLevel, bool hasReturning); extern uint64 ExecuteLocalTaskList(CitusScanState *scanState, List *taskList); extern void ExecuteLocalUtilityTaskList(List *localTaskList); extern void ExtractLocalAndRemoteTasks(bool readOnlyPlan, List *taskList, diff --git a/src/test/regress/expected/coordinator_shouldhaveshards.out b/src/test/regress/expected/coordinator_shouldhaveshards.out index 0af6f175a..023f58720 100644 --- a/src/test/regress/expected/coordinator_shouldhaveshards.out +++ b/src/test/regress/expected/coordinator_shouldhaveshards.out @@ -37,6 +37,8 @@ SET client_min_messages TO LOG; SET citus.log_local_commands TO ON; -- INSERT..SELECT with COPY under the covers INSERT INTO test SELECT s,s FROM generate_series(2,100) s; +NOTICE: executing the copy locally for shard xxxxx +NOTICE: executing the copy locally for shard xxxxx -- router queries execute locally INSERT INTO test VALUES (1, 1); NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.test_1503000 (x, y) VALUES (1, 1) diff --git a/src/test/regress/expected/local_shard_copy.out b/src/test/regress/expected/local_shard_copy.out index d42403016..375f4200a 100644 --- a/src/test/regress/expected/local_shard_copy.out +++ b/src/test/regress/expected/local_shard_copy.out @@ -339,6 +339,13 @@ DETAIL: Key (key)=(1) already exists. CONTEXT: COPY distributed_table_1570001, line 1 ROLLBACK; TRUNCATE distributed_table; +BEGIN; +-- insert a lot of data ( around 8MB), +-- this should use local copy and it will exceed the LOCAL_COPY_FLUSH_THRESHOLD (512KB) +INSERT INTO distributed_table SELECT * , * FROM generate_series(20, 1000000); +NOTICE: executing the copy locally for shard xxxxx +NOTICE: executing the copy locally for shard xxxxx +ROLLBACK; COPY distributed_table FROM STDIN WITH delimiter ','; ERROR: new row for relation "distributed_table_1570001" violates check constraint "distributed_table_age_check" DETAIL: Failing row contains (1, 9). 
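A note on the flush behavior the 8MB test above exercises: rows are serialized into a per-shard buffer, and a local COPY is issued whenever the buffer crosses LOCAL_COPY_FLUSH_THRESHOLD (512KB), so an insert of this size flushes several times before the end of the copy. Below is a minimal standalone sketch of that decision; ShardCopyBuffer, AppendRow, and BufferShouldFlush are illustrative names standing in for the real StringInfo-based copy state and the ShouldSendCopyNow/DoLocalCopy pair, not the Citus functions themselves.

```
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define LOCAL_COPY_FLUSH_THRESHOLD (512 * 1024)

/* simplified stand-in for the per-shard copy buffer (a StringInfo in Citus) */
typedef struct ShardCopyBuffer
{
	char *data;
	int len;
	int capacity;
} ShardCopyBuffer;

/* same idea as ShouldSendCopyNow(): flush once past the threshold */
static bool
BufferShouldFlush(ShardCopyBuffer *buffer)
{
	return buffer->len > LOCAL_COPY_FLUSH_THRESHOLD;
}

/* append one serialized row, flushing when the threshold is crossed */
static void
AppendRow(ShardCopyBuffer *buffer, const char *row, int *flushCount)
{
	int rowLen = (int) strlen(row);

	if (buffer->len + rowLen > buffer->capacity)
	{
		buffer->capacity = (buffer->len + rowLen) * 2;
		buffer->data = realloc(buffer->data, buffer->capacity);
	}
	memcpy(buffer->data + buffer->len, row, rowLen);
	buffer->len += rowLen;

	if (BufferShouldFlush(buffer))
	{
		/* this is where DoLocalCopy() would COPY the buffer into the shard */
		(*flushCount)++;
		buffer->len = 0;    /* the resetStringInfo() equivalent */
	}
}

int
main(void)
{
	ShardCopyBuffer buffer = { NULL, 0, 0 };
	int flushCount = 0;

	/* ~7MB of rows, as in the test above, forces repeated flushes */
	for (int i = 0; i < 500000; i++)
	{
		AppendRow(&buffer, "123456,123456\n", &flushCount);
	}

	printf("flushed %d times before the end of copy\n", flushCount);
	free(buffer.data);
	return 0;
}
```

At most one buffer's worth of data is held per local placement, which is why the flush threshold bounds memory use even for multi-megabyte inserts.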
@@ -461,6 +468,7 @@ ROLLBACK; SET search_path TO local_shard_copy; SET citus.log_local_commands TO ON; TRUNCATE TABLE reference_table; +NOTICE: executing the command locally: TRUNCATE TABLE local_shard_copy.reference_table_xxxxx CASCADE TRUNCATE TABLE local_table; SELECT count(*) FROM reference_table, local_table WHERE reference_table.key = local_table.key; count diff --git a/src/test/regress/expected/local_shard_execution.out b/src/test/regress/expected/local_shard_execution.out index e2ed1dedb..9dbf9975a 100644 --- a/src/test/regress/expected/local_shard_execution.out +++ b/src/test/regress/expected/local_shard_execution.out @@ -276,6 +276,8 @@ RETURNING *; -- that's why it is disallowed to use local execution even if the SELECT -- can be executed locally INSERT INTO distributed_table SELECT * FROM distributed_table WHERE key = 1 OFFSET 0 ON CONFLICT DO NOTHING; +NOTICE: executing the command locally: SELECT key, value, age FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) OFFSET 0 +NOTICE: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) SELECT key, value, age FROM read_intermediate_result('insert_select_XXX_1470001'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, age bigint) ON CONFLICT DO NOTHING INSERT INTO distributed_table SELECT 1, '1',15 FROM distributed_table WHERE key = 2 LIMIT 1 ON CONFLICT DO NOTHING; -- sanity check: multi-shard INSERT..SELECT pushdown goes through distributed execution INSERT INTO distributed_table SELECT * FROM distributed_table ON CONFLICT DO NOTHING; @@ -507,28 +509,7 @@ NOTICE: truncate cascades to table "second_distributed_table" ROLLBACK; -- load some data so that foreign keys won't complain with the next tests INSERT INTO reference_table SELECT i FROM generate_series(500, 600) i; --- a very similar set of operation, but this time use --- COPY as the first command -BEGIN; - INSERT INTO distributed_table SELECT i, i::text, i % 10 + 25 FROM generate_series(500, 600) i; - -- this could go through local execution, but doesn't because we've already - -- done distributed execution - SELECT * FROM distributed_table WHERE key = 500 ORDER BY 1,2,3; - key | value | age ---------------------------------------------------------------------- - 500 | 500 | 25 -(1 row) - - -- utility commands could still use distributed execution - TRUNCATE distributed_table CASCADE; -NOTICE: truncate cascades to table "second_distributed_table" - -- ensure that TRUNCATE made it - SELECT * FROM distributed_table WHERE key = 500 ORDER BY 1,2,3; - key | value | age ---------------------------------------------------------------------- -(0 rows) - -ROLLBACK; +NOTICE: executing the copy locally for shard xxxxx -- show that cascading foreign keys just works fine with local execution BEGIN; INSERT INTO reference_table VALUES (701); @@ -619,6 +600,7 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar (1 row) INSERT INTO distributed_table (key) SELECT i FROM generate_series(1,1) i; +NOTICE: executing the copy locally for shard xxxxx ROLLBACK; -- a local query is followed by a command that cannot be executed locally BEGIN; @@ -1331,7 +1313,10 @@ NOTICE: truncate cascades to table "second_distributed_table_xxxxx" NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.second_distributed_table_xxxxx CASCADE NOTICE: executing the command locally: TRUNCATE TABLE 
local_shard_execution.second_distributed_table_xxxxx CASCADE INSERT INTO reference_table SELECT i FROM generate_series(500, 600) i; +NOTICE: executing the copy locally for shard xxxxx INSERT INTO distributed_table SELECT i, i::text, i % 10 + 25 FROM generate_series(500, 600) i; +NOTICE: executing the copy locally for shard xxxxx +NOTICE: executing the copy locally for shard xxxxx -- show that both local, and mixed local-distributed executions -- calculate rows processed correctly BEGIN; diff --git a/src/test/regress/expected/master_evaluation_modify.out b/src/test/regress/expected/master_evaluation_modify.out index a8123b94c..5571bce2e 100644 --- a/src/test/regress/expected/master_evaluation_modify.out +++ b/src/test/regress/expected/master_evaluation_modify.out @@ -951,6 +951,7 @@ NOTICE: executing the command locally: DELETE FROM master_evaluation_combinatio (1 row) INSERT INTO user_info_data SELECT 3, ('test', get_local_node_id_stable() > 0)::user_data, i FROM generate_series(0,7)i; +NOTICE: executing the copy locally for shard xxxxx PREPARE fast_path_router_with_param_and_func_on_non_dist_key(int) AS DELETE FROM user_info_data WHERE user_id = 3 AND user_index = $1 AND u_data = ('test', (get_local_node_id_stable() > 0)::int)::user_data RETURNING user_id, user_index; EXECUTE fast_path_router_with_param_and_func_on_non_dist_key(0); @@ -1438,6 +1439,7 @@ NOTICE: executing the command locally: DELETE FROM master_evaluation_combinatio (1 row) INSERT INTO user_info_data SELECT 3, ('test', get_local_node_id_stable() > 0)::user_data, i FROM generate_series(0,7)i; +NOTICE: executing the copy locally for shard xxxxx PREPARE router_with_param_and_func_on_non_dist_key(int) AS DELETE FROM user_info_data WHERE user_id = 3 AND user_id = 3 AND user_index = $1 AND u_data = ('test', (get_local_node_id_stable() > 0)::int)::user_data RETURNING user_id, user_index; EXECUTE router_with_param_and_func_on_non_dist_key(0); diff --git a/src/test/regress/expected/multi_mx_insert_select_repartition.out b/src/test/regress/expected/multi_mx_insert_select_repartition.out index d61e046f2..469b08b3a 100644 --- a/src/test/regress/expected/multi_mx_insert_select_repartition.out +++ b/src/test/regress/expected/multi_mx_insert_select_repartition.out @@ -102,9 +102,9 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM multi_mx_i (1 row) insert into target_table SELECT a FROM source_table LIMIT 10; -ERROR: cannot execute command because a local execution has accessed a placement in the transaction -DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally -HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;" +NOTICE: executing the command locally: SELECT a FROM multi_mx_insert_select_repartition.source_table_4213581 source_table WHERE true LIMIT '10'::bigint +NOTICE: executing the command locally: SELECT a FROM multi_mx_insert_select_repartition.source_table_4213583 source_table WHERE true LIMIT '10'::bigint +NOTICE: executing the copy locally for shard xxxxx ROLLBACK; \c - - - :master_port SET search_path TO multi_mx_insert_select_repartition; diff --git a/src/test/regress/expected/multi_mx_transaction_recovery.out b/src/test/regress/expected/multi_mx_transaction_recovery.out index 6ef2ba445..d108c61ec 100644 --- a/src/test/regress/expected/multi_mx_transaction_recovery.out +++ b/src/test/regress/expected/multi_mx_transaction_recovery.out @@ -142,7 +142,7 @@ INSERT INTO test_recovery (x) SELECT 'hello-'||s FROM 
generate_series(1,100) s; SELECT count(*) FROM pg_dist_transaction; count --------------------------------------------------------------------- - 4 + 2 (1 row) SELECT recover_prepared_transactions(); diff --git a/src/test/regress/expected/multi_mx_truncate_from_worker.out b/src/test/regress/expected/multi_mx_truncate_from_worker.out index abb59b761..9825cd69b 100644 --- a/src/test/regress/expected/multi_mx_truncate_from_worker.out +++ b/src/test/regress/expected/multi_mx_truncate_from_worker.out @@ -118,6 +118,8 @@ INSERT INTO "refer'ence_table" SELECT i FROM generate_series(0, 100) i; SET search_path TO 'truncate_from_workers'; -- make sure that DMLs-SELECTs works along with TRUNCATE worker fine BEGIN; + -- we can enable local execution when truncate can be executed locally. + SET citus.enable_local_execution = 'off'; INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i; SELECT count(*) FROM on_update_fkey_table; count diff --git a/src/test/regress/expected/replicate_reference_tables_to_coordinator.out b/src/test/regress/expected/replicate_reference_tables_to_coordinator.out index c0292606c..f0fb28056 100644 --- a/src/test/regress/expected/replicate_reference_tables_to_coordinator.out +++ b/src/test/regress/expected/replicate_reference_tables_to_coordinator.out @@ -17,6 +17,7 @@ SELECT create_reference_table('squares'); (1 row) INSERT INTO squares SELECT i, i * i FROM generate_series(1, 10) i; +NOTICE: executing the copy locally for shard xxxxx -- should be executed locally SELECT count(*) FROM squares; NOTICE: executing the command locally: SELECT count(*) AS count FROM replicate_ref_to_coordinator.squares_8000000 squares diff --git a/src/test/regress/sql/local_shard_copy.sql b/src/test/regress/sql/local_shard_copy.sql index f07a346e9..28bdffd40 100644 --- a/src/test/regress/sql/local_shard_copy.sql +++ b/src/test/regress/sql/local_shard_copy.sql @@ -212,6 +212,14 @@ ROLLBACK; TRUNCATE distributed_table; +BEGIN; + +-- insert a lot of data ( around 8MB), +-- this should use local copy and it will exceed the LOCAL_COPY_FLUSH_THRESHOLD (512KB) +INSERT INTO distributed_table SELECT * , * FROM generate_series(20, 1000000); + +ROLLBACK; + COPY distributed_table FROM STDIN WITH delimiter ','; 1, 9 \. 
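The pg_dist_transaction count dropping from 4 to 2 above follows from the local/remote task split introduced by ExtractAndExecuteLocalAndRemoteTasks earlier in this patch: tasks whose placements live on the local node group now run in-process without opening a connection, so only the remaining remote tasks register 2PC records. A standalone sketch of that partitioning, using a hypothetical simplified Task struct with a single placement rather than the real Citus task lists:

```
#include <stdbool.h>
#include <stdio.h>

#define LOCAL_GROUP_ID 0
#define MAX_TASKS 8

/* hypothetical, simplified stand-in for a Citus Task with one placement */
typedef struct Task
{
	long shardId;
	int placementGroupId;   /* node group that holds the placement */
} Task;

static bool
TaskIsLocal(Task task)
{
	return task.placementGroupId == LOCAL_GROUP_ID;
}

int
main(void)
{
	Task tasks[MAX_TASKS] = {
		{ 1570001, 0 }, { 1570002, 1 }, { 1570003, 0 }, { 1570004, 2 }
	};
	Task localTasks[MAX_TASKS];
	Task remoteTasks[MAX_TASKS];
	int taskCount = 4;
	int localCount = 0;
	int remoteCount = 0;

	/* the split: local placements run in-process, the rest over connections */
	for (int i = 0; i < taskCount; i++)
	{
		if (TaskIsLocal(tasks[i]))
		{
			localTasks[localCount++] = tasks[i];
		}
		else
		{
			remoteTasks[remoteCount++] = tasks[i];
		}
	}

	/* only remote tasks open connections, so only they leave 2PC records */
	printf("%d local task(s), %d remote task(s)\n", localCount, remoteCount);
	return 0;
}
```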
diff --git a/src/test/regress/sql/local_shard_execution.sql b/src/test/regress/sql/local_shard_execution.sql index e3688bd9a..22b46abed 100644 --- a/src/test/regress/sql/local_shard_execution.sql +++ b/src/test/regress/sql/local_shard_execution.sql @@ -305,22 +305,6 @@ ROLLBACK; -- load some data so that foreign keys won't complain with the next tests INSERT INTO reference_table SELECT i FROM generate_series(500, 600) i; --- a very similar set of operation, but this time use --- COPY as the first command -BEGIN; - INSERT INTO distributed_table SELECT i, i::text, i % 10 + 25 FROM generate_series(500, 600) i; - - -- this could go through local execution, but doesn't because we've already - -- done distributed execution - SELECT * FROM distributed_table WHERE key = 500 ORDER BY 1,2,3; - - -- utility commands could still use distributed execution - TRUNCATE distributed_table CASCADE; - - -- ensure that TRUNCATE made it - SELECT * FROM distributed_table WHERE key = 500 ORDER BY 1,2,3; -ROLLBACK; - -- show that cascading foreign keys just works fine with local execution BEGIN; INSERT INTO reference_table VALUES (701); diff --git a/src/test/regress/sql/multi_mx_truncate_from_worker.sql b/src/test/regress/sql/multi_mx_truncate_from_worker.sql index aa05aa02a..00492246b 100644 --- a/src/test/regress/sql/multi_mx_truncate_from_worker.sql +++ b/src/test/regress/sql/multi_mx_truncate_from_worker.sql @@ -85,6 +85,8 @@ SET search_path TO 'truncate_from_workers'; -- make sure that DMLs-SELECTs works along with TRUNCATE worker fine BEGIN; + -- we can enable local execution when truncate can be executed locally. + SET citus.enable_local_execution = 'off'; INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i; SELECT count(*) FROM on_update_fkey_table; TRUNCATE on_update_fkey_table; From 2eaf7bba6969475e609fcfe75fe9a1b7e60c23c3 Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Fri, 13 Mar 2020 12:18:31 +0300 Subject: [PATCH 7/7] do not use local copy when copying into an intermediate results file Citus uses special logic and a custom copy format, the "result" format, when copying into intermediate results. Postgres does not know this format, so running such a copy through the local path would fail with an unknown-format error. Intermediate results are written to files, and files are visible to all transactions, so any connection can be used to access them. Therefore, when a copy targets an intermediate result, we preserve the existing behavior of opening connections to localhost, and ShouldExecuteCopyLocally returns false.
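The guard order this yields can be summarized standalone; the sketch below uses plain bool arguments in place of the real GUC and transaction-state globals, elides the remaining transaction checks, and uses an illustrative name (the actual change follows in the diff below).

```
#include <stdbool.h>

/*
 * Sketch of the guard order in ShouldExecuteCopyLocally after this patch.
 * Arguments are illustrative stand-ins for EnableLocalExecution, the
 * intermediate-result check, and TransactionAccessedLocalPlacement.
 */
static bool
ShouldExecuteCopyLocallySketch(bool enableLocalExecution,
							   bool isIntermediateResult,
							   bool transactionAccessedLocalPlacement)
{
	if (!enableLocalExecution)
	{
		return false;
	}

	/*
	 * Intermediate results are files in the custom "result" copy format,
	 * which the local copy path cannot parse, so keep using connections.
	 */
	if (isIntermediateResult)
	{
		return false;
	}

	/* stay local if this transaction already accessed a local placement */
	if (transactionAccessedLocalPlacement)
	{
		return true;
	}

	return false;   /* further transaction-state checks elided here */
}

int
main(void)
{
	/* a copy into an intermediate result never goes local */
	bool intermediateGoesLocal = ShouldExecuteCopyLocallySketch(true, true, true);
	/* a copy after a local placement access in the same transaction does */
	bool followUpGoesLocal = ShouldExecuteCopyLocallySketch(true, false, true);

	return (!intermediateGoesLocal && followUpGoesLocal) ? 0 : 1;
}
```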
--- .../distributed/commands/local_multi_copy.c | 2 +- src/backend/distributed/commands/multi_copy.c | 18 ++++++++++++++---- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/src/backend/distributed/commands/local_multi_copy.c b/src/backend/distributed/commands/local_multi_copy.c index efa8aa954..60eaef3ce 100644 --- a/src/backend/distributed/commands/local_multi_copy.c +++ b/src/backend/distributed/commands/local_multi_copy.c @@ -88,6 +88,7 @@ WriteTupleToLocalShard(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, in DoLocalCopy(localCopyOutState->fe_msgbuf, copyDest->distributedRelationId, shardId, copyDest->copyStatement, isEndOfCopy); + resetStringInfo(localCopyOutState->fe_msgbuf); } } @@ -173,7 +174,6 @@ DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId, CopyStmt *copyStat heap_close(shard, NoLock); free_parsestate(pState); - resetStringInfo(buffer); } diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 34852becb..0c17a32e4 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -291,7 +291,7 @@ static void CitusCopyDestReceiverDestroy(DestReceiver *destReceiver); static bool ContainsLocalPlacement(int64 shardId); static void FinishLocalCopy(CitusCopyDestReceiver *copyDest); static void CloneCopyOutStateForLocalCopy(CopyOutState from, CopyOutState to); -static bool ShouldExecuteCopyLocally(void); +static bool ShouldExecuteCopyLocally(bool isIntermediateResult); static void LogLocalCopyExecution(uint64 shardId); @@ -1984,8 +1984,6 @@ CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColu CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) palloc0( sizeof(CitusCopyDestReceiver)); - copyDest->shouldUseLocalCopy = ShouldExecuteCopyLocally(); - /* set up the DestReceiver function pointers */ copyDest->pub.receiveSlot = CitusCopyDestReceiverReceive; copyDest->pub.rStartup = CitusCopyDestReceiverStartup; @@ -2011,13 +2009,23 @@ CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColu * operation should be done locally for local placements. */ static bool -ShouldExecuteCopyLocally() +ShouldExecuteCopyLocally(bool isIntermediateResult) { if (!EnableLocalExecution) { return false; } + /* + * Intermediate results are written to files, files are visible to all + * transactions, and they use a custom copy format that the local copy + * path cannot parse, so we keep the existing connection-based logic. + */ + if (isIntermediateResult) + { + return false; + } + if (TransactionAccessedLocalPlacement) { /* @@ -2056,6 +2064,8 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, { CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) dest; + bool isIntermediateResult = copyDest->intermediateResultIdPrefix != NULL; + copyDest->shouldUseLocalCopy = ShouldExecuteCopyLocally(isIntermediateResult); Oid tableId = copyDest->distributedRelationId; char *relationName = get_rel_name(tableId);
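One subtle fix earlier in this series deserves a callout: the destination-size argument to memcpy_s in ReadFromLocalBufferCallback was previously computed with strlen() on the output buffer, whose contents are not meaningful at that point, and is now simply the number of bytes being copied. Below is a standalone sketch of the callback contract, with a simplified cursor-tracked buffer and plain memcpy in place of memcpy_s; CopyBuffer and the Sketch-suffixed function name are illustrative, not the Citus definitions.

```
#include <stdio.h>
#include <string.h>

#define Min(a, b) ((a) < (b) ? (a) : (b))

/* simplified stand-in for the global StringInfo LocalCopyBuffer */
typedef struct CopyBuffer
{
	const char *data;
	int len;
	int cursor;
} CopyBuffer;

static CopyBuffer LocalCopyBuffer;

/*
 * Copy up to maxRead bytes from the buffer into outBuf, advance the cursor,
 * and return the byte count; 0 signals end of input, as COPY expects.
 */
static int
ReadFromLocalBufferCallbackSketch(void *outBuf, int minRead, int maxRead)
{
	int avail = LocalCopyBuffer.len - LocalCopyBuffer.cursor;
	int bytesToRead = Min(avail, maxRead);

	(void) minRead;     /* like the real callback, always try maxRead bytes */

	if (bytesToRead > 0)
	{
		/* the size to copy is bytesToRead, never strlen() of outBuf */
		memcpy(outBuf, LocalCopyBuffer.data + LocalCopyBuffer.cursor,
			   bytesToRead);
	}
	LocalCopyBuffer.cursor += bytesToRead;
	return bytesToRead;
}

int
main(void)
{
	char out[8];
	int n;

	LocalCopyBuffer = (CopyBuffer) { "1,100\n2,200\n", 12, 0 };

	/* drain the buffer in maxRead-sized chunks, as CopyFrom would */
	while ((n = ReadFromLocalBufferCallbackSketch(out, 1, sizeof(out))) > 0)
	{
		fwrite(out, 1, n, stdout);
	}
	return 0;
}
```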