diff --git a/src/backend/distributed/commands/local_multi_copy.c b/src/backend/distributed/commands/local_multi_copy.c
new file mode 100644
index 000000000..60eaef3ce
--- /dev/null
+++ b/src/backend/distributed/commands/local_multi_copy.c
@@ -0,0 +1,214 @@
+/*-------------------------------------------------------------------------
+ *
+ * local_multi_copy.c
+ *    Commands for running a copy locally
+ *
+ * For each local placement, we have a buffer. When we receive a slot
+ * from a copy, the slot is appended to the buffer of the corresponding
+ * shard. When the buffer size exceeds the threshold, a local copy is
+ * done. When we reach the end of the copy, the remaining buffer
+ * contents are also flushed with a local copy.
+ *
+ * The existing serialization logic and formats from multi_copy.c are
+ * reused, so even if the user did not request a binary COPY, we may
+ * still use the binary format internally.
+ *
+ *
+ * Copyright (c) Citus Data, Inc.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include "commands/copy.h"
+#include "catalog/namespace.h"
+#include "parser/parse_relation.h"
+#include "utils/lsyscache.h"
+#include "nodes/makefuncs.h"
+#include "safe_lib.h"
+#include <arpa/inet.h> /* for htons */
+
+#include "distributed/transmit.h"
+#include "distributed/commands/multi_copy.h"
+#include "distributed/multi_partitioning_utils.h"
+#include "distributed/local_executor.h"
+#include "distributed/local_multi_copy.h"
+#include "distributed/shard_utils.h"
+
+static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead);
+static void AddSlotToBuffer(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest,
+                            CopyOutState localCopyOutState);
+
+static bool ShouldSendCopyNow(StringInfo buffer);
+static void DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId,
+                        CopyStmt *copyStatement, bool isEndOfCopy);
+static bool ShouldAddBinaryHeaders(StringInfo buffer, bool isBinary);
+
+/*
+ * LocalCopyBuffer is used in the copy callback to return the copied rows.
+ * The reason this is a global variable is that we cannot pass an additional
+ * argument to the copy callback.
+ */
+static StringInfo LocalCopyBuffer;
+
+/*
+ * WriteTupleToLocalShard adds the given slot to the local copy buffer and
+ * does a local copy if the buffer size exceeds the threshold.
+ */
+void
+WriteTupleToLocalShard(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, int64
+                       shardId,
+                       CopyOutState localCopyOutState)
+{
+    /*
+     * Since we are doing a local copy, the following statements should
+     * use local execution to see the changes.
+     */
+    TransactionAccessedLocalPlacement = true;
+
+    bool isBinaryCopy = localCopyOutState->binary;
+    if (ShouldAddBinaryHeaders(localCopyOutState->fe_msgbuf, isBinaryCopy))
+    {
+        AppendCopyBinaryHeaders(localCopyOutState);
+    }
+
+    AddSlotToBuffer(slot, copyDest, localCopyOutState);
+
+    if (ShouldSendCopyNow(localCopyOutState->fe_msgbuf))
+    {
+        if (isBinaryCopy)
+        {
+            /*
+             * We're going to flush the buffer to disk by effectively doing a full
+             * COPY command. Hence we also need to add footers to the current buffer.
+             */
+            AppendCopyBinaryFooters(localCopyOutState);
+        }
+        bool isEndOfCopy = false;
+        DoLocalCopy(localCopyOutState->fe_msgbuf, copyDest->distributedRelationId,
+                    shardId,
+                    copyDest->copyStatement, isEndOfCopy);
+        resetStringInfo(localCopyOutState->fe_msgbuf);
+    }
+}
+
+
+/*
+ * FinishLocalCopyToShard finishes the local copy for the shard with the given shard id.
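For readers skimming the patch, the per-shard buffering flow above boils down to an append / check-threshold / flush / reset loop, with one final flush at end of copy. Below is a minimal, self-contained sketch of that loop in plain C, deliberately independent of the PostgreSQL APIs used by the patch; the ShardBuffer struct, the FlushShardBuffer() stand-in, the buffer size, and the 1 KB threshold are all illustrative only (the patch itself uses a 512 KB threshold and CopyFrom()).

#include <stdio.h>
#include <string.h>

#define FLUSH_THRESHOLD 1024    /* illustrative; the patch uses 512 KB */

typedef struct ShardBuffer
{
    char data[4096];
    int len;
} ShardBuffer;

/* stand-in for DoLocalCopy(): here we only report what would be flushed */
static void
FlushShardBuffer(ShardBuffer *buf, int shardId)
{
    printf("flushing %d bytes for shard %d\n", buf->len, shardId);
    buf->len = 0;               /* mirrors resetStringInfo() */
}

/* mirrors WriteTupleToLocalShard(): append a serialized row, flush on threshold */
static void
AppendRow(ShardBuffer *buf, int shardId, const char *row)
{
    int rowLen = (int) strlen(row);

    memcpy(buf->data + buf->len, row, rowLen);
    buf->len += rowLen;

    if (buf->len > FLUSH_THRESHOLD)
    {
        FlushShardBuffer(buf, shardId);
    }
}

int
main(void)
{
    ShardBuffer buf = { .len = 0 };
    char row[64];

    for (int i = 0; i < 100; i++)
    {
        snprintf(row, sizeof(row), "%d|value-%d\n", i, i);
        AppendRow(&buf, 102008, row);
    }

    /* mirrors FinishLocalCopyToShard(): flush whatever remains at end of copy */
    if (buf.len > 0)
    {
        FlushShardBuffer(&buf, 102008);
    }
    return 0;
}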
+ */ +void +FinishLocalCopyToShard(CitusCopyDestReceiver *copyDest, int64 shardId, + CopyOutState localCopyOutState) +{ + bool isBinaryCopy = localCopyOutState->binary; + if (isBinaryCopy) + { + AppendCopyBinaryFooters(localCopyOutState); + } + bool isEndOfCopy = true; + DoLocalCopy(localCopyOutState->fe_msgbuf, copyDest->distributedRelationId, shardId, + copyDest->copyStatement, isEndOfCopy); +} + + +/* + * AddSlotToBuffer serializes the given slot and adds it to + * the buffer in localCopyOutState. + */ +static void +AddSlotToBuffer(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, CopyOutState + localCopyOutState) +{ + Datum *columnValues = slot->tts_values; + bool *columnNulls = slot->tts_isnull; + FmgrInfo *columnOutputFunctions = copyDest->columnOutputFunctions; + CopyCoercionData *columnCoercionPaths = copyDest->columnCoercionPaths; + + AppendCopyRowData(columnValues, columnNulls, copyDest->tupleDescriptor, + localCopyOutState, columnOutputFunctions, + columnCoercionPaths); +} + + +/* + * ShouldSendCopyNow returns true if the given buffer size exceeds the + * local copy buffer size threshold. + */ +static bool +ShouldSendCopyNow(StringInfo buffer) +{ + return buffer->len > LOCAL_COPY_FLUSH_THRESHOLD; +} + + +/* + * DoLocalCopy finds the shard table from the distributed relation id, and copies the given + * buffer into the shard. + * CopyFrom calls ReadFromLocalBufferCallback to read bytes from the buffer + * as though it was reading from stdin. It then parses the tuples and + * writes them to the shardOid table. + */ +static void +DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId, CopyStmt *copyStatement, + bool isEndOfCopy) +{ + /* + * Set the buffer as a global variable to allow ReadFromLocalBufferCallback + * to read from it. We cannot pass additional arguments to + * ReadFromLocalBufferCallback. + */ + LocalCopyBuffer = buffer; + + Oid shardOid = GetShardLocalTableOid(relationId, shardId); + Relation shard = heap_open(shardOid, RowExclusiveLock); + ParseState *pState = make_parsestate(NULL); + + /* p_rtable of pState is set so that we can check constraints. */ + pState->p_rtable = CreateRangeTable(shard, ACL_INSERT); + + CopyState cstate = BeginCopyFrom(pState, shard, NULL, false, + ReadFromLocalBufferCallback, + copyStatement->attlist, copyStatement->options); + CopyFrom(cstate); + EndCopyFrom(cstate); + + heap_close(shard, NoLock); + free_parsestate(pState); +} + + +/* + * ShouldAddBinaryHeaders returns true if the given buffer + * is empty and the format is binary. + */ +static bool +ShouldAddBinaryHeaders(StringInfo buffer, bool isBinary) +{ + if (!isBinary) + { + return false; + } + return buffer->len == 0; +} + + +/* + * ReadFromLocalBufferCallback is the copy callback. + * It always tries to copy maxRead bytes. 
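A rough, self-contained illustration of the callback contract described above, in plain C rather than the copy.c API: the callback copies at most maxRead bytes from an in-memory buffer into the caller's output buffer and returns the number of bytes copied, with 0 signalling end of data. The buffer contents, the cursor global, and the Min() macro here are stand-ins for the StringInfo used by the patch.

#include <stdio.h>
#include <string.h>

/* simplified stand-in for the global StringInfo: data plus a read cursor */
static const char copyData[] = "1|foo\n2|bar\n3|baz\n";
static int copyCursor = 0;

#define Min(a, b) ((a) < (b) ? (a) : (b))

/*
 * Mirrors the callback contract: copy at most maxRead bytes into outBuf and
 * return how many were copied; returning 0 signals end of data.
 */
static int
ReadFromBufferCallback(void *outBuf, int minRead, int maxRead)
{
    int avail = (int) strlen(copyData) - copyCursor;
    int bytesToRead = Min(avail, maxRead);

    (void) minRead;             /* the patch's callback also ignores minRead */

    if (bytesToRead > 0)
    {
        memcpy(outBuf, copyData + copyCursor, bytesToRead);
        copyCursor += bytesToRead;
    }
    return bytesToRead;
}

int
main(void)
{
    char chunk[8];
    int n;

    /* the consumer keeps calling the callback until it reports 0 bytes */
    while ((n = ReadFromBufferCallback(chunk, 1, (int) sizeof(chunk))) > 0)
    {
        printf("read %d bytes\n", n);
    }
    return 0;
}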
+ */ +static int +ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead) +{ + int bytesRead = 0; + int avail = LocalCopyBuffer->len - LocalCopyBuffer->cursor; + int bytesToRead = Min(avail, maxRead); + if (bytesToRead > 0) + { + memcpy_s(outBuf, bytesToRead, + &LocalCopyBuffer->data[LocalCopyBuffer->cursor], bytesToRead); + } + bytesRead += bytesToRead; + LocalCopyBuffer->cursor += bytesToRead; + + return bytesRead; +} diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 0c5d52d0e..0c17a32e4 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -85,6 +85,8 @@ #include "distributed/shard_pruning.h" #include "distributed/version_compat.h" #include "distributed/worker_protocol.h" +#include "distributed/local_multi_copy.h" +#include "distributed/hash_helpers.h" #include "executor/executor.h" #include "foreign/foreign.h" #include "libpq/libpq.h" @@ -162,6 +164,9 @@ struct CopyPlacementState /* State of shard to which the placement belongs to. */ CopyShardState *shardState; + /* node group ID of the placement */ + int32 groupId; + /* * Buffered COPY data. When the placement is activePlacementState of * some connection, this is empty. Because in that case we directly @@ -178,6 +183,12 @@ struct CopyShardState /* Used as hash key. */ uint64 shardId; + /* used for doing local copy */ + CopyOutState copyOutState; + + /* containsLocalPlacement is true if we have a local placement for the shard id of this state */ + bool containsLocalPlacement; + /* List of CopyPlacementStates for all active placements of the shard. */ List *placementStateList; }; @@ -232,13 +243,16 @@ static CopyConnectionState * GetConnectionState(HTAB *connectionStateHash, MultiConnection *connection); static CopyShardState * GetShardState(uint64 shardId, HTAB *shardStateHash, HTAB *connectionStateHash, bool stopOnFailure, - bool *found); + bool *found, bool shouldUseLocalCopy, CopyOutState + copyOutState); static MultiConnection * CopyGetPlacementConnection(ShardPlacement *placement, bool stopOnFailure); static List * ConnectionStateList(HTAB *connectionStateHash); static void InitializeCopyShardState(CopyShardState *shardState, HTAB *connectionStateHash, - uint64 shardId, bool stopOnFailure); + uint64 shardId, bool stopOnFailure, bool + canUseLocalCopy, + CopyOutState copyOutState); static void StartPlacementStateCopyCommand(CopyPlacementState *placementState, CopyStmt *copyStatement, CopyOutState copyOutState); @@ -274,6 +288,11 @@ static bool CitusCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *copyDest); static void CitusCopyDestReceiverShutdown(DestReceiver *destReceiver); static void CitusCopyDestReceiverDestroy(DestReceiver *destReceiver); +static bool ContainsLocalPlacement(int64 shardId); +static void FinishLocalCopy(CitusCopyDestReceiver *copyDest); +static void CloneCopyOutStateForLocalCopy(CopyOutState from, CopyOutState to); +static bool ShouldExecuteCopyLocally(bool isIntermediateResult); +static void LogLocalCopyExecution(uint64 shardId); /* exports for SQL callable functions */ @@ -1985,6 +2004,55 @@ CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColu } +/* + * ShouldExecuteCopyLocally returns true if the current copy + * operation should be done locally for local placements. 
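The decision above can be summarized as a small predicate. The sketch below is a hedged, self-contained rendering in plain C; the boolean parameters are simplified stand-ins for the GUC and transaction-state globals the real function consults, but the ordering of the checks mirrors the function added in this patch.

#include <stdbool.h>
#include <stdio.h>

/* parameters are stand-ins for EnableLocalExecution and the transaction flags */
static bool
ShouldCopyLocally(bool enableLocalExecution, bool isIntermediateResult,
                  bool accessedLocalPlacement, bool connectedToLocalGroup,
                  bool inMultiStatementTransaction)
{
    if (!enableLocalExecution)
    {
        return false;
    }
    if (isIntermediateResult)
    {
        /* intermediate results keep using the existing file-based path */
        return false;
    }
    if (accessedLocalPlacement)
    {
        /* stay local to preserve read-your-own-writes visibility */
        return true;
    }

    /* otherwise only go local if no local connection was used in a tx block */
    return !connectedToLocalGroup && inMultiStatementTransaction;
}

int
main(void)
{
    printf("%d\n", ShouldCopyLocally(true, false, true, false, true));  /* 1 */
    printf("%d\n", ShouldCopyLocally(true, true, true, false, true));   /* 0 */
    printf("%d\n", ShouldCopyLocally(true, false, false, true, true));  /* 0 */
    return 0;
}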
+ */ +static bool +ShouldExecuteCopyLocally(bool isIntermediateResult) +{ + if (!EnableLocalExecution) + { + return false; + } + + /* + * Intermediate files are written to a file, and files are visible to all + * transactions, and we use a custom copy format for copy therefore we will + * use the existing logic for that. + */ + if (isIntermediateResult) + { + return false; + } + + if (TransactionAccessedLocalPlacement) + { + /* + * For various reasons, including the transaction visibility + * rules (e.g., read-your-own-writes), we have to use local + * execution again if it has already happened within this + * transaction block. + * + * We might error out later in the execution if it is not suitable + * to execute the tasks locally. + */ + Assert(IsMultiStatementTransaction() || InCoordinatedTransaction()); + + /* + * TODO: A future improvement could be to keep track of which placements + * have been locally executed. At this point, only use local execution for + * those placements. That'd help to benefit more from parallelism. + */ + + return true; + } + + /* if we connected to the localhost via a connection, we might not be able to see some previous changes that are done via the connection */ + return !TransactionConnectedToLocalGroup && IsMultiStatementTransaction(); +} + + /* * CitusCopyDestReceiverStartup implements the rStartup interface of * CitusCopyDestReceiver. It opens the relation, acquires necessary @@ -1996,6 +2064,8 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, { CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) dest; + bool isIntermediateResult = copyDest->intermediateResultIdPrefix != NULL; + copyDest->shouldUseLocalCopy = ShouldExecuteCopyLocally(isIntermediateResult); Oid tableId = copyDest->distributedRelationId; char *relationName = get_rel_name(tableId); @@ -2013,9 +2083,6 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, const char *delimiterCharacter = "\t"; const char *nullPrintCharacter = "\\N"; - /* Citus currently doesn't know how to handle COPY command locally */ - ErrorIfTransactionAccessedPlacementsLocally(); - /* look up table properties */ Relation distributedRelation = heap_open(tableId, RowExclusiveLock); CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(tableId); @@ -2145,6 +2212,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, } } + copyStatement->query = NULL; copyStatement->attlist = attributeList; copyStatement->is_from = true; @@ -2228,7 +2296,9 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest CopyShardState *shardState = GetShardState(shardId, copyDest->shardStateHash, copyDest->connectionStateHash, stopOnFailure, - &cachedShardStateFound); + &cachedShardStateFound, + copyDest->shouldUseLocalCopy, + copyDest->copyOutState); if (!cachedShardStateFound) { firstTupleInShard = true; @@ -2249,6 +2319,12 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest } } + if (copyDest->shouldUseLocalCopy && shardState->containsLocalPlacement) + { + WriteTupleToLocalShard(slot, copyDest, shardId, shardState->copyOutState); + } + + foreach(placementStateCell, shardState->placementStateList) { CopyPlacementState *currentPlacementState = lfirst(placementStateCell); @@ -2276,6 +2352,7 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest { StartPlacementStateCopyCommand(currentPlacementState, copyStatement, copyOutState); + dlist_delete(¤tPlacementState->bufferedPlacementNode); 
connectionState->activePlacementState = currentPlacementState; @@ -2330,6 +2407,30 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest } +/* + * ContainsLocalPlacement returns true if the current node has + * a local placement for the given shard id. + */ +static bool +ContainsLocalPlacement(int64 shardId) +{ + ListCell *placementCell = NULL; + List *activePlacementList = ActiveShardPlacementList(shardId); + int32 localGroupId = GetLocalGroupId(); + + foreach(placementCell, activePlacementList) + { + ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell); + + if (placement->groupId == localGroupId) + { + return true; + } + } + return false; +} + + /* * ShardIdForTuple returns id of the shard to which the given tuple belongs to. */ @@ -2407,6 +2508,7 @@ CitusCopyDestReceiverShutdown(DestReceiver *destReceiver) Relation distributedRelation = copyDest->distributedRelation; List *connectionStateList = ConnectionStateList(connectionStateHash); + FinishLocalCopy(copyDest); PG_TRY(); { @@ -2434,6 +2536,28 @@ CitusCopyDestReceiverShutdown(DestReceiver *destReceiver) } +/* + * FinishLocalCopy sends the remaining copies for local placements. + */ +static void +FinishLocalCopy(CitusCopyDestReceiver *copyDest) +{ + HTAB *shardStateHash = copyDest->shardStateHash; + HASH_SEQ_STATUS status; + CopyShardState *copyShardState; + + foreach_htab(copyShardState, &status, shardStateHash) + { + if (copyShardState->copyOutState != NULL && + copyShardState->copyOutState->fe_msgbuf->len > 0) + { + FinishLocalCopyToShard(copyDest, copyShardState->shardId, + copyShardState->copyOutState); + } + } +} + + /* * ShutdownCopyConnectionState ends the copy command for the current active * placement on connection, and then sends the rest of the buffers over the @@ -2864,7 +2988,6 @@ CheckCopyPermissions(CopyStmt *copyStatement) /* *INDENT-OFF* */ bool is_from = copyStatement->is_from; Relation rel; - Oid relid; List *range_table = NIL; TupleDesc tupDesc; AclMode required_access = (is_from ? ACL_INSERT : ACL_SELECT); @@ -2874,15 +2997,8 @@ CheckCopyPermissions(CopyStmt *copyStatement) rel = heap_openrv(copyStatement->relation, is_from ? RowExclusiveLock : AccessShareLock); - relid = RelationGetRelid(rel); - - RangeTblEntry *rte = makeNode(RangeTblEntry); - rte->rtekind = RTE_RELATION; - rte->relid = relid; - rte->relkind = rel->rd_rel->relkind; - rte->requiredPerms = required_access; - range_table = list_make1(rte); - + range_table = CreateRangeTable(rel, required_access); + RangeTblEntry *rte = (RangeTblEntry*) linitial(range_table); tupDesc = RelationGetDescr(rel); attnums = CopyGetAttnums(tupDesc, rel, copyStatement->attlist); @@ -2909,6 +3025,21 @@ CheckCopyPermissions(CopyStmt *copyStatement) } +/* + * CreateRangeTable creates a range table with the given relation. 
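CreateRangeTable, introduced further down in this file, tags a single range table entry with the access mode the command needs, so the subsequent permission check can compare it against what the role was granted. The sketch below is a loose, self-contained analogue in plain C of that "required versus granted" bitmask check; the ACL_*_BIT values and the RangeEntry struct are illustrative only and do not match PostgreSQL's real AclMode constants or RangeTblEntry.

#include <stdbool.h>
#include <stdio.h>

/* illustrative permission bits, loosely modeled on AclMode flags */
#define ACL_INSERT_BIT (1u << 0)
#define ACL_SELECT_BIT (1u << 1)

typedef struct RangeEntry
{
    unsigned int relationId;
    unsigned int requiredPerms;     /* what the command needs, e.g. ACL_INSERT_BIT */
} RangeEntry;

/* mirrors the shape of CreateRangeTable(): one entry tagged with required access */
static RangeEntry
MakeRangeEntry(unsigned int relationId, unsigned int requiredPerms)
{
    RangeEntry entry = { relationId, requiredPerms };
    return entry;
}

/* the permission check is then a bitwise test against the granted permissions */
static bool
HasRequiredAccess(RangeEntry entry, unsigned int grantedPerms)
{
    return (entry.requiredPerms & ~grantedPerms) == 0;
}

int
main(void)
{
    RangeEntry entry = MakeRangeEntry(16384, ACL_INSERT_BIT);

    printf("%d\n", HasRequiredAccess(entry, ACL_INSERT_BIT | ACL_SELECT_BIT));  /* 1 */
    printf("%d\n", HasRequiredAccess(entry, ACL_SELECT_BIT));                   /* 0 */
    return 0;
}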
+ */ +List * +CreateRangeTable(Relation rel, AclMode requiredAccess) +{ + RangeTblEntry *rte = makeNode(RangeTblEntry); + rte->rtekind = RTE_RELATION; + rte->relid = rel->rd_id; + rte->relkind = rel->rd_rel->relkind; + rte->requiredPerms = requiredAccess; + return list_make1(rte); +} + + /* Helper for CheckCopyPermissions(), copied from postgres */ static List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist) @@ -3087,14 +3218,16 @@ ConnectionStateList(HTAB *connectionStateHash) */ static CopyShardState * GetShardState(uint64 shardId, HTAB *shardStateHash, - HTAB *connectionStateHash, bool stopOnFailure, bool *found) + HTAB *connectionStateHash, bool stopOnFailure, bool *found, bool + shouldUseLocalCopy, CopyOutState copyOutState) { CopyShardState *shardState = (CopyShardState *) hash_search(shardStateHash, &shardId, HASH_ENTER, found); if (!*found) { InitializeCopyShardState(shardState, connectionStateHash, - shardId, stopOnFailure); + shardId, stopOnFailure, shouldUseLocalCopy, + copyOutState); } return shardState; @@ -3109,7 +3242,8 @@ GetShardState(uint64 shardId, HTAB *shardStateHash, static void InitializeCopyShardState(CopyShardState *shardState, HTAB *connectionStateHash, uint64 shardId, - bool stopOnFailure) + bool stopOnFailure, bool shouldUseLocalCopy, CopyOutState + copyOutState) { ListCell *placementCell = NULL; int failedPlacementCount = 0; @@ -3121,6 +3255,7 @@ InitializeCopyShardState(CopyShardState *shardState, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); + /* release active placement list at the end of this function */ MemoryContext oldContext = MemoryContextSwitchTo(localContext); @@ -3130,11 +3265,22 @@ InitializeCopyShardState(CopyShardState *shardState, shardState->shardId = shardId; shardState->placementStateList = NIL; + shardState->copyOutState = NULL; + shardState->containsLocalPlacement = ContainsLocalPlacement(shardId); + foreach(placementCell, activePlacementList) { ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell); + if (shouldUseLocalCopy && placement->groupId == GetLocalGroupId()) + { + shardState->copyOutState = (CopyOutState) palloc0(sizeof(*copyOutState)); + CloneCopyOutStateForLocalCopy(copyOutState, shardState->copyOutState); + LogLocalCopyExecution(shardId); + continue; + } + MultiConnection *connection = CopyGetPlacementConnection(placement, stopOnFailure); if (connection == NULL) @@ -3158,6 +3304,7 @@ InitializeCopyShardState(CopyShardState *shardState, CopyPlacementState *placementState = palloc0(sizeof(CopyPlacementState)); placementState->shardState = shardState; placementState->data = makeStringInfo(); + placementState->groupId = placement->groupId; placementState->connectionState = connectionState; /* @@ -3188,6 +3335,43 @@ InitializeCopyShardState(CopyShardState *shardState, } +/* + * CloneCopyOutStateForLocalCopy creates a shallow copy of the CopyOutState with a new + * fe_msgbuf. We keep a separate CopyOutState for every local shard placement, because + * in case of local copy we serialize and buffer incoming tuples into fe_msgbuf for each + * placement and the serialization functions take a CopyOutState as a parameter. 
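A self-contained sketch of the clone-with-fresh-buffer idea in plain C: the format settings are copied, but each clone gets its own empty output buffer, which mirrors why every local placement needs its own fe_msgbuf. The OutState struct and the buffer size here are illustrative, not the real CopyOutState.

#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

/* simplified analogue of CopyOutState: format settings plus an output buffer */
typedef struct OutState
{
    bool binary;
    char delimiter;
    char *buffer;       /* analogue of fe_msgbuf */
    size_t bufferLen;
} OutState;

/*
 * Mirrors CloneCopyOutStateForLocalCopy(): copy the format settings, but give
 * the clone its own empty buffer so each placement buffers rows separately.
 */
static void
CloneOutState(const OutState *from, OutState *to)
{
    to->binary = from->binary;
    to->delimiter = from->delimiter;
    to->buffer = calloc(1, 1024);
    to->bufferLen = 0;
}

int
main(void)
{
    OutState shared = { .binary = false, .delimiter = '|',
                        .buffer = calloc(1, 1024), .bufferLen = 0 };
    OutState placementState;

    CloneOutState(&shared, &placementState);
    printf("same settings: %d, distinct buffers: %d\n",
           shared.delimiter == placementState.delimiter,
           shared.buffer != placementState.buffer);

    free(shared.buffer);
    free(placementState.buffer);
    return 0;
}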
+ */ +static void +CloneCopyOutStateForLocalCopy(CopyOutState from, CopyOutState to) +{ + to->attnumlist = from->attnumlist; + to->binary = from->binary; + to->copy_dest = from->copy_dest; + to->delim = from->delim; + to->file_encoding = from->file_encoding; + to->need_transcoding = from->need_transcoding; + to->null_print = from->null_print; + to->null_print_client = from->null_print_client; + to->rowcontext = from->rowcontext; + to->fe_msgbuf = makeStringInfo(); +} + + +/* + * LogLocalCopyExecution logs that the copy will be done locally for + * the given shard. + */ +static void +LogLocalCopyExecution(uint64 shardId) +{ + if (!(LogRemoteCommands || LogLocalCommands)) + { + return; + } + ereport(NOTICE, (errmsg("executing the copy locally for shard %lu", shardId))); +} + + /* * CopyGetPlacementConnection assigns a connection to the given placement. If * a connection has already been assigned the placement in the current transaction diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index c0de40978..c9a02fb50 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -553,6 +553,10 @@ CacheLocalPlanForShardQuery(Task *task, DistributedPlan *originalDistributedPlan PlannedStmt * GetCachedLocalPlan(Task *task, DistributedPlan *distributedPlan) { + if (distributedPlan->workerJob == NULL) + { + return NULL; + } List *cachedPlanList = distributedPlan->workerJob->localPlannedStatements; LocalPlannedStatement *localPlannedStatement = NULL; diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index 6663acc49..4b7c7cc32 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -26,6 +26,7 @@ #include "distributed/listutils.h" #include "distributed/metadata_cache.h" #include "distributed/multi_router_planner.h" +#include "distributed/local_executor.h" #include "distributed/distributed_planner.h" #include "distributed/recursive_planning.h" #include "distributed/relation_access_tracking.h" @@ -135,15 +136,6 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) bool hasReturning = distributedPlan->hasReturning; HTAB *shardStateHash = NULL; - /* - * INSERT .. SELECT via coordinator consists of two steps, a SELECT is - * followd by a COPY. If the SELECT is executed locally, then the COPY - * would fail since Citus currently doesn't know how to handle COPY - * locally. So, to prevent the command fail, we simply disable local - * execution. 
- */ - DisableLocalExecution(); - /* select query to execute */ Query *selectQuery = BuildSelectForInsertSelect(insertSelectQuery); @@ -198,7 +190,6 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) GetDistributedPlan((CustomScan *) selectPlan->planTree); Job *distSelectJob = distSelectPlan->workerJob; List *distSelectTaskList = distSelectJob->taskList; - TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState); bool randomAccess = true; bool interTransactions = false; bool binaryFormat = @@ -280,11 +271,10 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) scanState->tuplestorestate = tuplestore_begin_heap(randomAccess, interTransactions, work_mem); - uint64 rowsInserted = ExecuteTaskListIntoTupleStore(ROW_MODIFY_COMMUTATIVE, - taskList, - tupleDescriptor, - scanState->tuplestorestate, - hasReturning); + uint64 rowsInserted = ExtractAndExecuteLocalAndRemoteTasks(scanState, + taskList, + ROW_MODIFY_COMMUTATIVE, + hasReturning); executorState->es_processed = rowsInserted; } @@ -335,17 +325,15 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) if (prunedTaskList != NIL) { - TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState); bool randomAccess = true; bool interTransactions = false; Assert(scanState->tuplestorestate == NULL); scanState->tuplestorestate = tuplestore_begin_heap(randomAccess, interTransactions, work_mem); - - ExecuteTaskListIntoTupleStore(ROW_MODIFY_COMMUTATIVE, prunedTaskList, - tupleDescriptor, scanState->tuplestorestate, - hasReturning); + ExtractAndExecuteLocalAndRemoteTasks(scanState, prunedTaskList, + ROW_MODIFY_COMMUTATIVE, + hasReturning); if (SortReturning && hasReturning) { diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index 7003ea0b3..4930ad7af 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -231,6 +231,48 @@ ExecuteLocalTaskList(CitusScanState *scanState, List *taskList) } +/* + * ExtractAndExecuteLocalAndRemoteTasks extracts local and remote tasks + * if local execution can be used and executes them. 
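A rough, self-contained sketch of the split-then-execute flow in plain C. The Task struct, the hasLocalPlacement flag, and the two Execute* stand-ins are illustrative; the real code partitions the task list with ExtractLocalAndRemoteTasks() and sums the processed row counts from both paths, which is the shape this sketch follows.

#include <stdbool.h>
#include <stdio.h>

typedef struct Task
{
    int taskId;
    bool hasLocalPlacement;     /* illustrative stand-in for placement locality */
} Task;

/* stand-ins for ExecuteLocalTaskList() / ExecuteTaskListIntoTupleStore() */
static unsigned long
ExecuteLocally(const Task *tasks, int count)
{
    (void) tasks;
    printf("executing %d task(s) locally\n", count);
    return (unsigned long) count;   /* pretend each task processes one row */
}

static unsigned long
ExecuteRemotely(const Task *tasks, int count)
{
    (void) tasks;
    printf("executing %d task(s) over connections\n", count);
    return (unsigned long) count;
}

int
main(void)
{
    Task tasks[] = { { 1, true }, { 2, false }, { 3, true }, { 4, false } };
    Task localTasks[4], remoteTasks[4];
    int localCount = 0, remoteCount = 0;
    unsigned long processed = 0;

    /* mirrors ExtractLocalAndRemoteTasks(): partition by placement locality */
    for (int i = 0; i < 4; i++)
    {
        if (tasks[i].hasLocalPlacement)
        {
            localTasks[localCount++] = tasks[i];
        }
        else
        {
            remoteTasks[remoteCount++] = tasks[i];
        }
    }

    processed += ExecuteLocally(localTasks, localCount);
    if (remoteCount > 0)
    {
        processed += ExecuteRemotely(remoteTasks, remoteCount);
    }
    printf("processed %lu row(s)\n", processed);
    return 0;
}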
+ */ +uint64 +ExtractAndExecuteLocalAndRemoteTasks(CitusScanState *scanState, + List *taskList, RowModifyLevel rowModifyLevel, bool + hasReturning) +{ + uint64 processedRows = 0; + List *localTaskList = NIL; + List *remoteTaskList = NIL; + TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState); + + if (ShouldExecuteTasksLocally(taskList)) + { + bool readOnlyPlan = false; + + /* set local (if any) & remote tasks */ + ExtractLocalAndRemoteTasks(readOnlyPlan, taskList, &localTaskList, + &remoteTaskList); + processedRows += ExecuteLocalTaskList(scanState, localTaskList); + } + else + { + /* all tasks should be executed via remote connections */ + remoteTaskList = taskList; + } + + /* execute remote tasks if any */ + if (list_length(remoteTaskList) > 0) + { + processedRows += ExecuteTaskListIntoTupleStore(rowModifyLevel, remoteTaskList, + tupleDescriptor, + scanState->tuplestorestate, + hasReturning); + } + + return processedRows; +} + + /* * ExtractParametersForLocalExecution extracts parameter types and values * from the given ParamListInfo structure, and fills parameter type and diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index 5187e5094..3d76f25c0 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -335,7 +335,8 @@ UpdateRelationsToLocalShardTables(Node *node, List *relationShardList) return true; } - Oid shardOid = GetShardOid(relationShard->relationId, relationShard->shardId); + Oid shardOid = GetShardLocalTableOid(relationShard->relationId, + relationShard->shardId); newRte->relid = shardOid; diff --git a/src/backend/distributed/utils/shard_utils.c b/src/backend/distributed/utils/shard_utils.c index ad3acac67..aa0a17921 100644 --- a/src/backend/distributed/utils/shard_utils.c +++ b/src/backend/distributed/utils/shard_utils.c @@ -16,11 +16,11 @@ #include "distributed/shard_utils.h" /* - * GetShardOid returns the oid of the shard from the given distributed relation + * GetShardLocalTableOid returns the oid of the shard from the given distributed relation * with the shardid. 
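The renamed function resolves the local shard relation by name: the shard table follows the <relation name>_<shard id> convention before being looked up in the shard's schema. Below is a minimal plain-C sketch of just that naming step; the OID lookup is omitted, and the real AppendShardIdToName also handles overlong relation names, which this sketch ignores.

#include <stdio.h>

/*
 * Mirrors the naming convention behind GetShardLocalTableOid(): the local
 * shard table is named <relation name>_<shard id>.
 */
static void
BuildShardRelationName(char *out, size_t outSize, const char *relationName,
                       unsigned long shardId)
{
    snprintf(out, outSize, "%s_%lu", relationName, shardId);
}

int
main(void)
{
    char shardName[128];

    BuildShardRelationName(shardName, sizeof(shardName),
                           "distributed_table", 1570001UL);
    printf("%s\n", shardName);      /* distributed_table_1570001 */
    return 0;
}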
*/ Oid -GetShardOid(Oid distRelId, uint64 shardId) +GetShardLocalTableOid(Oid distRelId, uint64 shardId) { char *relationName = get_rel_name(distRelId); AppendShardIdToName(&relationName, shardId); diff --git a/src/include/distributed/commands/multi_copy.h b/src/include/distributed/commands/multi_copy.h index d91358839..9ec8f280b 100644 --- a/src/include/distributed/commands/multi_copy.h +++ b/src/include/distributed/commands/multi_copy.h @@ -130,6 +130,9 @@ typedef struct CitusCopyDestReceiver /* useful for tracking multi shard accesses */ bool multiShardCopy; + /* if true, should copy to local placements in the current session */ + bool shouldUseLocalCopy; + /* copy into intermediate result */ char *intermediateResultIdPrefix; } CitusCopyDestReceiver; @@ -154,6 +157,7 @@ extern void AppendCopyRowData(Datum *valueArray, bool *isNullArray, extern void AppendCopyBinaryHeaders(CopyOutState headerOutputState); extern void AppendCopyBinaryFooters(CopyOutState footerOutputState); extern void EndRemoteCopy(int64 shardId, List *connectionList); +extern List * CreateRangeTable(Relation rel, AclMode requiredAccess); extern Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, const char *queryString); extern void CheckCopyPermissions(CopyStmt *copyStatement); diff --git a/src/include/distributed/local_executor.h b/src/include/distributed/local_executor.h index 8b11e096c..265434b89 100644 --- a/src/include/distributed/local_executor.h +++ b/src/include/distributed/local_executor.h @@ -21,6 +21,9 @@ extern bool TransactionAccessedLocalPlacement; extern bool TransactionConnectedToLocalGroup; /* extern function declarations */ +extern uint64 ExtractAndExecuteLocalAndRemoteTasks(CitusScanState *scanState, + List *taskList, RowModifyLevel + rowModifyLevel, bool hasReturning); extern uint64 ExecuteLocalTaskList(CitusScanState *scanState, List *taskList); extern void ExecuteLocalUtilityTaskList(List *localTaskList); extern void ExtractLocalAndRemoteTasks(bool readOnlyPlan, List *taskList, diff --git a/src/include/distributed/local_multi_copy.h b/src/include/distributed/local_multi_copy.h new file mode 100644 index 000000000..a4e46f015 --- /dev/null +++ b/src/include/distributed/local_multi_copy.h @@ -0,0 +1,19 @@ + +#ifndef LOCAL_MULTI_COPY +#define LOCAL_MULTI_COPY + +/* + * LOCAL_COPY_FLUSH_THRESHOLD is the threshold for local copy to be flushed. + * There will be one buffer for each local placement, when the buffer size + * exceeds this threshold, it will be flushed. 
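The threshold defined just below is 512 KB (524288 bytes); a buffer is flushed once its length exceeds that value. A tiny plain-C sketch of the ShouldSendCopyNow-style check, with the same constant, for illustration only:

#include <stdbool.h>
#include <stdio.h>

/* same value as the header below: 512 KB, i.e. 524288 bytes */
#define LOCAL_COPY_FLUSH_THRESHOLD (1 * 512 * 1024)

/* mirrors ShouldSendCopyNow(): flush once the buffered bytes exceed the threshold */
static bool
ShouldFlush(int bufferLen)
{
    return bufferLen > LOCAL_COPY_FLUSH_THRESHOLD;
}

int
main(void)
{
    printf("%d %d\n", ShouldFlush(1024), ShouldFlush(600 * 1024));  /* 0 1 */
    return 0;
}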
+ */ +#define LOCAL_COPY_FLUSH_THRESHOLD (1 * 512 * 1024) + +extern void WriteTupleToLocalShard(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, + int64 + shardId, + CopyOutState localCopyOutState); +extern void FinishLocalCopyToShard(CitusCopyDestReceiver *copyDest, int64 shardId, + CopyOutState localCopyOutState); + +#endif /* LOCAL_MULTI_COPY */ diff --git a/src/include/distributed/shard_utils.h b/src/include/distributed/shard_utils.h index e4fd9a2f2..28addce15 100644 --- a/src/include/distributed/shard_utils.h +++ b/src/include/distributed/shard_utils.h @@ -13,6 +13,6 @@ #include "postgres.h" -extern Oid GetShardOid(Oid distRelId, uint64 shardId); +extern Oid GetShardLocalTableOid(Oid distRelId, uint64 shardId); #endif /* SHARD_UTILS_H */ diff --git a/src/test/regress/data/orders.2.data b/src/test/regress/data/orders.2.data index 264c368df..43dbde8de 100644 --- a/src/test/regress/data/orders.2.data +++ b/src/test/regress/data/orders.2.data @@ -1484,4 +1484,4 @@ 14944|535|O|119586.69|1997-10-14|2-HIGH|Clerk#000000962|0|lly. even instructions against 14945|68|O|210519.05|1996-03-30|1-URGENT|Clerk#000000467|0|nts? fluffily bold grouches after 14946|580|O|100402.47|1996-11-12|1-URGENT|Clerk#000000116|0|ffily bold dependencies wake. furiously regular instructions aro -14947|580|O|100402.47|1996-11-12|1-URGENT|Clerk#000000116|0|ffily bold dependencies wake. furiously regular instructions aro +14947|580|O|100402.47|1996-11-12|1-URGENT|Clerk#000000116|0|ffily bold dependencies wake. furiously regular instructions aro \ No newline at end of file diff --git a/src/test/regress/expected/coordinator_shouldhaveshards.out b/src/test/regress/expected/coordinator_shouldhaveshards.out index 0af6f175a..023f58720 100644 --- a/src/test/regress/expected/coordinator_shouldhaveshards.out +++ b/src/test/regress/expected/coordinator_shouldhaveshards.out @@ -37,6 +37,8 @@ SET client_min_messages TO LOG; SET citus.log_local_commands TO ON; -- INSERT..SELECT with COPY under the covers INSERT INTO test SELECT s,s FROM generate_series(2,100) s; +NOTICE: executing the copy locally for shard xxxxx +NOTICE: executing the copy locally for shard xxxxx -- router queries execute locally INSERT INTO test VALUES (1, 1); NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.test_1503000 (x, y) VALUES (1, 1) diff --git a/src/test/regress/expected/local_shard_copy.out b/src/test/regress/expected/local_shard_copy.out new file mode 100644 index 000000000..375f4200a --- /dev/null +++ b/src/test/regress/expected/local_shard_copy.out @@ -0,0 +1,494 @@ +CREATE SCHEMA local_shard_copy; +SET search_path TO local_shard_copy; +SET client_min_messages TO DEBUG; +SET citus.next_shard_id TO 1570000; +SELECT * FROM master_add_node('localhost', :master_port, groupid := 0); +DEBUG: schema "public" already exists, skipping +DETAIL: NOTICE from localhost:xxxxx +DEBUG: extension "plpgsql" already exists, skipping +DETAIL: NOTICE from localhost:xxxxx +DEBUG: schema "citus_mx_test_schema" already exists, skipping +DETAIL: NOTICE from localhost:xxxxx +DEBUG: schema "citus_mx_test_schema_join_1" already exists, skipping +DETAIL: NOTICE from localhost:xxxxx +DEBUG: schema "citus_mx_test_schema_join_2" already exists, skipping +DETAIL: NOTICE from localhost:xxxxx +DEBUG: schema "citus_mx_schema_for_xacts" already exists, skipping +DETAIL: NOTICE from localhost:xxxxx +NOTICE: Replicating reference table "customer_mx" to the node localhost:xxxxx +NOTICE: Replicating reference table "nation_mx" to the node localhost:xxxxx 
+NOTICE: Replicating reference table "part_mx" to the node localhost:xxxxx +NOTICE: Replicating reference table "supplier_mx" to the node localhost:xxxxx + master_add_node +--------------------------------------------------------------------- + 32 +(1 row) + +SET citus.shard_count TO 4; +SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; +CREATE TABLE reference_table (key int PRIMARY KEY); +DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "reference_table_pkey" for table "reference_table" +DEBUG: building index "reference_table_pkey" on table "reference_table" serially +SELECT create_reference_table('reference_table'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE distributed_table (key int PRIMARY KEY, age bigint CHECK (age >= 10)); +DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "distributed_table_pkey" for table "distributed_table" +DEBUG: building index "distributed_table_pkey" on table "distributed_table" serially +SELECT create_distributed_table('distributed_table','key'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO distributed_table SELECT *,* FROM generate_series(20, 40); +DEBUG: distributed INSERT ... SELECT can only select from distributed tables +DEBUG: Collecting INSERT ... SELECT results on coordinator +INSERT INTO reference_table SELECT * FROM generate_series(1, 10); +DEBUG: distributed INSERT ... SELECT can only select from distributed tables +DEBUG: Collecting INSERT ... SELECT results on coordinator +CREATE TABLE local_table (key int PRIMARY KEY); +DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "local_table_pkey" for table "local_table" +DEBUG: building index "local_table_pkey" on table "local_table" serially +INSERT INTO local_table SELECT * from generate_series(1, 10); +-- partitioned table +CREATE TABLE collections_list ( + key bigserial, + collection_id integer +) PARTITION BY LIST (collection_id ); +DEBUG: CREATE TABLE will create implicit sequence "collections_list_key_seq" for serial column "collections_list.key" +SELECT create_distributed_table('collections_list', 'key'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE collections_list_0 + PARTITION OF collections_list (key, collection_id) + FOR VALUES IN ( 0 ); +CREATE TABLE collections_list_1 + PARTITION OF collections_list (key, collection_id) + FOR VALUES IN ( 1 ); +-- connection worker and get ready for the tests +\c - - - :worker_1_port +SET search_path TO local_shard_copy; +SET citus.log_local_commands TO ON; +-- returns true of the distribution key filter +-- on the distributed tables (e.g., WHERE key = 1), we'll hit a shard +-- placement which is local to this not +CREATE OR REPLACE FUNCTION shard_of_distribution_column_is_local(dist_key int) RETURNS bool AS $$ + + DECLARE shard_is_local BOOLEAN := FALSE; + + BEGIN + + WITH local_shard_ids AS (SELECT get_shard_id_for_distribution_column('local_shard_copy.distributed_table', dist_key)), + all_local_shard_ids_on_node AS (SELECT shardid FROM pg_dist_placement WHERE groupid IN (SELECT groupid FROM pg_dist_local_group)) + SELECT + true INTO shard_is_local + FROM + local_shard_ids + WHERE + get_shard_id_for_distribution_column IN (SELECT * FROM all_local_shard_ids_on_node); + + IF shard_is_local IS NULL THEN + shard_is_local = FALSE; + END IF; + + RETURN 
shard_is_local; + END; +$$ LANGUAGE plpgsql; +-- pick some example values that reside on the shards locally and remote +-- distribution key values of 1,6, 500 and 701 are LOCAL to shards, +-- we'll use these values in the tests +SELECT shard_of_distribution_column_is_local(1); + shard_of_distribution_column_is_local +--------------------------------------------------------------------- + t +(1 row) + +SELECT shard_of_distribution_column_is_local(6); + shard_of_distribution_column_is_local +--------------------------------------------------------------------- + t +(1 row) + +SELECT shard_of_distribution_column_is_local(500); + shard_of_distribution_column_is_local +--------------------------------------------------------------------- + t +(1 row) + +SELECT shard_of_distribution_column_is_local(701); + shard_of_distribution_column_is_local +--------------------------------------------------------------------- + t +(1 row) + +-- distribution key values of 11 and 12 are REMOTE to shards +SELECT shard_of_distribution_column_is_local(11); + shard_of_distribution_column_is_local +--------------------------------------------------------------------- + f +(1 row) + +SELECT shard_of_distribution_column_is_local(12); + shard_of_distribution_column_is_local +--------------------------------------------------------------------- + f +(1 row) + +BEGIN; + -- run select with local execution + SELECT count(*) FROM distributed_table WHERE key = 1; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) + count +--------------------------------------------------------------------- + 0 +(1 row) + + SELECT count(*) FROM distributed_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true + count +--------------------------------------------------------------------- + 21 +(1 row) + + -- the local placements should be executed locally + COPY distributed_table FROM STDIN WITH delimiter ','; +NOTICE: executing the copy locally for shard xxxxx +CONTEXT: COPY distributed_table, line 1: "1, 100" + -- verify that the copy is successful. 
+ SELECT count(*) FROM distributed_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true + count +--------------------------------------------------------------------- + 26 +(1 row) + +ROLLBACK; +BEGIN; + -- run select with local execution + SELECT count(*) FROM distributed_table WHERE key = 1; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) + count +--------------------------------------------------------------------- + 0 +(1 row) + + SELECT count(*) FROM distributed_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true + count +--------------------------------------------------------------------- + 21 +(1 row) + + -- the local placements should be executed locally + COPY distributed_table FROM STDIN WITH delimiter ','; +NOTICE: executing the copy locally for shard xxxxx +CONTEXT: COPY distributed_table, line 1: "1, 100" + -- verify the put ages. + SELECT * FROM distributed_table; +NOTICE: executing the command locally: SELECT key, age FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT key, age FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true + key | age +--------------------------------------------------------------------- + 20 | 20 + 24 | 24 + 25 | 25 + 26 | 26 + 31 | 31 + 33 | 33 + 35 | 35 + 1 | 100 + 5 | 500 + 21 | 21 + 28 | 28 + 34 | 34 + 38 | 38 + 39 | 39 + 36 | 36 + 37 | 37 + 40 | 40 + 3 | 300 + 4 | 400 + 22 | 22 + 23 | 23 + 27 | 27 + 29 | 29 + 30 | 30 + 32 | 32 + 2 | 200 +(26 rows) + +ROLLBACK; +BEGIN; + -- run select with local execution + SELECT count(*) FROM distributed_table WHERE key = 1; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) + count +--------------------------------------------------------------------- + 0 +(1 row) + + SELECT count(*) FROM distributed_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true + count +--------------------------------------------------------------------- + 21 +(1 row) + + -- the local placements should be executed locally + COPY distributed_table FROM STDIN WITH delimiter ','; +NOTICE: executing the copy locally for shard xxxxx +CONTEXT: COPY distributed_table, line 1: "1, 100" + -- verify that the copy is successful. 
+ SELECT count(*) FROM distributed_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true + count +--------------------------------------------------------------------- + 26 +(1 row) + +ROLLBACK; +BEGIN; + -- run select with local execution + SELECT age FROM distributed_table WHERE key = 1; +NOTICE: executing the command locally: SELECT age FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) + age +--------------------------------------------------------------------- +(0 rows) + + SELECT count(*) FROM collections_list; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.collections_list_1570005 collections_list WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.collections_list_1570007 collections_list WHERE true + count +--------------------------------------------------------------------- + 0 +(1 row) + + -- the local placements should be executed locally + COPY collections_list FROM STDIN WITH delimiter ','; +NOTICE: executing the copy locally for shard xxxxx +CONTEXT: COPY collections_list, line 1: "1, 0" + -- verify that the copy is successful. + SELECT count(*) FROM collections_list; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.collections_list_1570005 collections_list WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.collections_list_1570007 collections_list WHERE true + count +--------------------------------------------------------------------- + 5 +(1 row) + +ROLLBACK; +BEGIN; + -- run select with local execution + SELECT age FROM distributed_table WHERE key = 1; +NOTICE: executing the command locally: SELECT age FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) + age +--------------------------------------------------------------------- +(0 rows) + + SELECT count(*) FROM distributed_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true + count +--------------------------------------------------------------------- + 21 +(1 row) + + -- the local placements should be executed locally + COPY distributed_table FROM STDIN WITH delimiter ','; +NOTICE: executing the copy locally for shard xxxxx +CONTEXT: COPY distributed_table, line 1: "1, 100" + -- verify that the copy is successful. + SELECT count(*) FROM distributed_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true + count +--------------------------------------------------------------------- + 26 +(1 row) + +ROLLBACK; +BEGIN; +-- Since we are in a transaction, the copy should be locally executed. 
+COPY distributed_table FROM STDIN WITH delimiter ','; +NOTICE: executing the copy locally for shard xxxxx +CONTEXT: COPY distributed_table, line 1: "1, 100" +ROLLBACK; +-- Since we are not in a transaction, the copy should not be locally executed. +COPY distributed_table FROM STDIN WITH delimiter ','; +BEGIN; +-- Since we are in a transaction, the copy should be locally executed. But +-- we are putting duplicate key, so it should error. +COPY distributed_table FROM STDIN WITH delimiter ','; +NOTICE: executing the copy locally for shard xxxxx +CONTEXT: COPY distributed_table, line 1: "1, 100" +ERROR: duplicate key value violates unique constraint "distributed_table_pkey_1570001" +DETAIL: Key (key)=(1) already exists. +CONTEXT: COPY distributed_table_1570001, line 1 +ROLLBACK; +TRUNCATE distributed_table; +BEGIN; +-- insert a lot of data ( around 8MB), +-- this should use local copy and it will exceed the LOCAL_COPY_FLUSH_THRESHOLD (512KB) +INSERT INTO distributed_table SELECT * , * FROM generate_series(20, 1000000); +NOTICE: executing the copy locally for shard xxxxx +NOTICE: executing the copy locally for shard xxxxx +ROLLBACK; +COPY distributed_table FROM STDIN WITH delimiter ','; +ERROR: new row for relation "distributed_table_1570001" violates check constraint "distributed_table_age_check" +DETAIL: Failing row contains (1, 9). +BEGIN; +-- Since we are in a transaction, the execution will be local, however we are putting invalid age. +-- The constaints should give an error +COPY distributed_table FROM STDIN WITH delimiter ','; +NOTICE: executing the copy locally for shard xxxxx +CONTEXT: COPY distributed_table, line 1: "1,9" +ERROR: new row for relation "distributed_table_1570001" violates check constraint "distributed_table_age_check" +DETAIL: Failing row contains (1, 9). 
+CONTEXT: COPY distributed_table_1570001, line 1 +ROLLBACK; +TRUNCATE distributed_table; +-- different delimiters +BEGIN; +-- run select with local execution +SELECT count(*) FROM distributed_table WHERE key = 1; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) + count +--------------------------------------------------------------------- + 0 +(1 row) + +-- initial size +SELECT count(*) FROM distributed_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true + count +--------------------------------------------------------------------- + 0 +(1 row) + +COPY distributed_table FROM STDIN WITH delimiter '|'; +NOTICE: executing the copy locally for shard xxxxx +CONTEXT: COPY distributed_table, line 1: "1|10" +-- new size +SELECT count(*) FROM distributed_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true + count +--------------------------------------------------------------------- + 3 +(1 row) + +ROLLBACK; +BEGIN; +-- run select with local execution +SELECT count(*) FROM distributed_table WHERE key = 1; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) + count +--------------------------------------------------------------------- + 0 +(1 row) + +-- initial size +SELECT count(*) FROM distributed_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true + count +--------------------------------------------------------------------- + 0 +(1 row) + +COPY distributed_table FROM STDIN WITH delimiter '['; +NOTICE: executing the copy locally for shard xxxxx +CONTEXT: COPY distributed_table, line 1: "1[10" +-- new size +SELECT count(*) FROM distributed_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true + count +--------------------------------------------------------------------- + 3 +(1 row) + +ROLLBACK; +-- multiple local copies +BEGIN; +COPY distributed_table FROM STDIN WITH delimiter ','; +NOTICE: executing the copy locally for shard xxxxx +CONTEXT: COPY distributed_table, line 1: "1,15" +COPY distributed_table FROM STDIN WITH delimiter ','; +NOTICE: executing the copy locally for shard xxxxx +CONTEXT: COPY distributed_table, line 1: "10,15" +COPY distributed_table FROM STDIN WITH delimiter ','; +NOTICE: executing the copy locally for shard xxxxx +CONTEXT: COPY distributed_table, line 1: "100,15" +NOTICE: executing the copy locally for shard xxxxx +CONTEXT: COPY distributed_table, line 2: "200,20" +ROLLBACK; +-- local copy followed by 
local copy should see the changes +-- and error since it is a duplicate primary key. +BEGIN; +COPY distributed_table FROM STDIN WITH delimiter ','; +NOTICE: executing the copy locally for shard xxxxx +CONTEXT: COPY distributed_table, line 1: "1,15" +COPY distributed_table FROM STDIN WITH delimiter ','; +NOTICE: executing the copy locally for shard xxxxx +CONTEXT: COPY distributed_table, line 1: "1,16" +ERROR: duplicate key value violates unique constraint "distributed_table_pkey_1570001" +DETAIL: Key (key)=(1) already exists. +CONTEXT: COPY distributed_table_1570001, line 1 +ROLLBACK; +-- local copy followed by local copy should see the changes +BEGIN; +COPY distributed_table FROM STDIN WITH delimiter ','; +NOTICE: executing the copy locally for shard xxxxx +CONTEXT: COPY distributed_table, line 1: "1,15" +-- select should see the change +SELECT key FROM distributed_table WHERE key = 1; +NOTICE: executing the command locally: SELECT key FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) + key +--------------------------------------------------------------------- + 1 +(1 row) + +ROLLBACK; +\c - - - :master_port +SET search_path TO local_shard_copy; +SET citus.log_local_commands TO ON; +TRUNCATE TABLE reference_table; +NOTICE: executing the command locally: TRUNCATE TABLE local_shard_copy.reference_table_xxxxx CASCADE +TRUNCATE TABLE local_table; +SELECT count(*) FROM reference_table, local_table WHERE reference_table.key = local_table.key; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SET citus.enable_local_execution = 'on'; +BEGIN; +-- copy should be executed locally +COPY reference_table FROM STDIN; +NOTICE: executing the copy locally for shard xxxxx +CONTEXT: COPY reference_table, line 1: "1" +ROLLBACK; +SET citus.enable_local_execution = 'off'; +BEGIN; +-- copy should not be executed locally as citus.enable_local_execution = off +COPY reference_table FROM STDIN; +ROLLBACK; +SET citus.enable_local_execution = 'on'; +SET client_min_messages TO ERROR; +SET search_path TO public; +DROP SCHEMA local_shard_copy CASCADE; diff --git a/src/test/regress/expected/local_shard_execution.out b/src/test/regress/expected/local_shard_execution.out index e926dd31e..9dbf9975a 100644 --- a/src/test/regress/expected/local_shard_execution.out +++ b/src/test/regress/expected/local_shard_execution.out @@ -276,6 +276,8 @@ RETURNING *; -- that's why it is disallowed to use local execution even if the SELECT -- can be executed locally INSERT INTO distributed_table SELECT * FROM distributed_table WHERE key = 1 OFFSET 0 ON CONFLICT DO NOTHING; +NOTICE: executing the command locally: SELECT key, value, age FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) OFFSET 0 +NOTICE: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) SELECT key, value, age FROM read_intermediate_result('insert_select_XXX_1470001'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, age bigint) ON CONFLICT DO NOTHING INSERT INTO distributed_table SELECT 1, '1',15 FROM distributed_table WHERE key = 2 LIMIT 1 ON CONFLICT DO NOTHING; -- sanity check: multi-shard INSERT..SELECT pushdown goes through distributed execution INSERT INTO distributed_table SELECT * FROM distributed_table ON CONFLICT DO NOTHING; @@ -507,28 +509,7 @@ NOTICE: truncate cascades to table "second_distributed_table" 
ROLLBACK; -- load some data so that foreign keys won't complain with the next tests INSERT INTO reference_table SELECT i FROM generate_series(500, 600) i; --- a very similar set of operation, but this time use --- COPY as the first command -BEGIN; - INSERT INTO distributed_table SELECT i, i::text, i % 10 + 25 FROM generate_series(500, 600) i; - -- this could go through local execution, but doesn't because we've already - -- done distributed execution - SELECT * FROM distributed_table WHERE key = 500 ORDER BY 1,2,3; - key | value | age ---------------------------------------------------------------------- - 500 | 500 | 25 -(1 row) - - -- utility commands could still use distributed execution - TRUNCATE distributed_table CASCADE; -NOTICE: truncate cascades to table "second_distributed_table" - -- ensure that TRUNCATE made it - SELECT * FROM distributed_table WHERE key = 500 ORDER BY 1,2,3; - key | value | age ---------------------------------------------------------------------- -(0 rows) - -ROLLBACK; +NOTICE: executing the copy locally for shard xxxxx -- show that cascading foreign keys just works fine with local execution BEGIN; INSERT INTO reference_table VALUES (701); @@ -618,25 +599,8 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar 0 (1 row) - -- even no need to supply any data - COPY distributed_table FROM STDIN WITH CSV; -ERROR: cannot execute command because a local execution has accessed a placement in the transaction -DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally -HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;" -ROLLBACK; --- a local query is followed by a command that cannot be executed locally -BEGIN; - SELECT count(*) FROM distributed_table WHERE key = 1; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) - count ---------------------------------------------------------------------- - 0 -(1 row) - - INSERT INTO distributed_table (key) SELECT i FROM generate_series(1,10)i; -ERROR: cannot execute command because a local execution has accessed a placement in the transaction -DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally -HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;" + INSERT INTO distributed_table (key) SELECT i FROM generate_series(1,1) i; +NOTICE: executing the copy locally for shard xxxxx ROLLBACK; -- a local query is followed by a command that cannot be executed locally BEGIN; @@ -1349,7 +1313,10 @@ NOTICE: truncate cascades to table "second_distributed_table_xxxxx" NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.second_distributed_table_xxxxx CASCADE NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.second_distributed_table_xxxxx CASCADE INSERT INTO reference_table SELECT i FROM generate_series(500, 600) i; +NOTICE: executing the copy locally for shard xxxxx INSERT INTO distributed_table SELECT i, i::text, i % 10 + 25 FROM generate_series(500, 600) i; +NOTICE: executing the copy locally for shard xxxxx +NOTICE: executing the copy locally for shard xxxxx -- show that both local, and mixed local-distributed executions -- calculate rows processed correctly BEGIN; diff --git a/src/test/regress/expected/locally_execute_intermediate_results.out 
b/src/test/regress/expected/locally_execute_intermediate_results.out index d245c324b..c235ed8ef 100644 --- a/src/test/regress/expected/locally_execute_intermediate_results.out +++ b/src/test/regress/expected/locally_execute_intermediate_results.out @@ -35,6 +35,7 @@ SELECT create_reference_table('ref_table'); INSERT INTO table_1 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'); INSERT INTO table_2 VALUES (3, '3'), (4, '4'), (5, '5'), (6, '6'); INSERT INTO ref_table VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5'), (6, '6'); +NOTICE: executing the command locally: INSERT INTO locally_execute_intermediate_results.ref_table_1580008 AS citus_table_alias (key, value) VALUES (1,'1'::text), (2,'2'::text), (3,'3'::text), (4,'4'::text), (5,'5'::text), (6,'6'::text) -- prevent PG 11 - PG 12 outputs to diverge -- and have a lot more CTEs recursively planned for the -- sake of increasing the test coverage @@ -270,6 +271,7 @@ key FROM a JOIN ref_table USING (key) GROUP BY key HAVING (max(ref_table.value) <= (SELECT value FROM a)); +NOTICE: executing the command locally: WITH a AS (SELECT max(ref_table_1.key) AS key, max(ref_table_1.value) AS value FROM locally_execute_intermediate_results.ref_table_1580008 ref_table_1) SELECT count(*) AS count, a.key FROM (a JOIN locally_execute_intermediate_results.ref_table_1580008 ref_table(key, value) USING (key)) GROUP BY a.key HAVING (max(ref_table.value) OPERATOR(pg_catalog.<=) (SELECT a_1.value FROM a a_1)) count | key --------------------------------------------------------------------- 1 | 6 @@ -328,7 +330,9 @@ DEBUG: Subplan XXX_2 will be written to local file NOTICE: executing the command locally: SELECT key, value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_1 DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT max(key) AS key FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 +NOTICE: executing the command locally: SELECT cte_3.key, ref_table.value FROM ((SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_3 JOIN locally_execute_intermediate_results.ref_table_1580008 ref_table(key, value) USING (key)) key | value --------------------------------------------------------------------- 4 | 4 @@ -767,6 +771,7 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT cte_3.key, re DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_2 will be written to local file DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx key | value --------------------------------------------------------------------- diff --git a/src/test/regress/expected/master_evaluation_modify.out b/src/test/regress/expected/master_evaluation_modify.out index a8123b94c..5571bce2e 100644 --- a/src/test/regress/expected/master_evaluation_modify.out +++ b/src/test/regress/expected/master_evaluation_modify.out @@ -951,6 +951,7 @@ NOTICE: executing the command locally: DELETE FROM master_evaluation_combinatio (1 row) INSERT INTO user_info_data 
SELECT 3, ('test', get_local_node_id_stable() > 0)::user_data, i FROM generate_series(0,7)i; +NOTICE: executing the copy locally for shard xxxxx PREPARE fast_path_router_with_param_and_func_on_non_dist_key(int) AS DELETE FROM user_info_data WHERE user_id = 3 AND user_index = $1 AND u_data = ('test', (get_local_node_id_stable() > 0)::int)::user_data RETURNING user_id, user_index; EXECUTE fast_path_router_with_param_and_func_on_non_dist_key(0); @@ -1438,6 +1439,7 @@ NOTICE: executing the command locally: DELETE FROM master_evaluation_combinatio (1 row) INSERT INTO user_info_data SELECT 3, ('test', get_local_node_id_stable() > 0)::user_data, i FROM generate_series(0,7)i; +NOTICE: executing the copy locally for shard xxxxx PREPARE router_with_param_and_func_on_non_dist_key(int) AS DELETE FROM user_info_data WHERE user_id = 3 AND user_id = 3 AND user_index = $1 AND u_data = ('test', (get_local_node_id_stable() > 0)::int)::user_data RETURNING user_id, user_index; EXECUTE router_with_param_and_func_on_non_dist_key(0); diff --git a/src/test/regress/expected/multi_mx_insert_select_repartition.out b/src/test/regress/expected/multi_mx_insert_select_repartition.out index d61e046f2..469b08b3a 100644 --- a/src/test/regress/expected/multi_mx_insert_select_repartition.out +++ b/src/test/regress/expected/multi_mx_insert_select_repartition.out @@ -102,9 +102,9 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM multi_mx_i (1 row) insert into target_table SELECT a FROM source_table LIMIT 10; -ERROR: cannot execute command because a local execution has accessed a placement in the transaction -DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally -HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;" +NOTICE: executing the command locally: SELECT a FROM multi_mx_insert_select_repartition.source_table_4213581 source_table WHERE true LIMIT '10'::bigint +NOTICE: executing the command locally: SELECT a FROM multi_mx_insert_select_repartition.source_table_4213583 source_table WHERE true LIMIT '10'::bigint +NOTICE: executing the copy locally for shard xxxxx ROLLBACK; \c - - - :master_port SET search_path TO multi_mx_insert_select_repartition; diff --git a/src/test/regress/expected/multi_mx_transaction_recovery.out b/src/test/regress/expected/multi_mx_transaction_recovery.out index 6ef2ba445..d108c61ec 100644 --- a/src/test/regress/expected/multi_mx_transaction_recovery.out +++ b/src/test/regress/expected/multi_mx_transaction_recovery.out @@ -142,7 +142,7 @@ INSERT INTO test_recovery (x) SELECT 'hello-'||s FROM generate_series(1,100) s; SELECT count(*) FROM pg_dist_transaction; count --------------------------------------------------------------------- - 4 + 2 (1 row) SELECT recover_prepared_transactions(); diff --git a/src/test/regress/expected/multi_mx_truncate_from_worker.out b/src/test/regress/expected/multi_mx_truncate_from_worker.out index abb59b761..9825cd69b 100644 --- a/src/test/regress/expected/multi_mx_truncate_from_worker.out +++ b/src/test/regress/expected/multi_mx_truncate_from_worker.out @@ -118,6 +118,8 @@ INSERT INTO "refer'ence_table" SELECT i FROM generate_series(0, 100) i; SET search_path TO 'truncate_from_workers'; -- make sure that DMLs-SELECTs works along with TRUNCATE worker fine BEGIN; + -- we can enable local execution when truncate can be executed locally. 
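The pg_dist_transaction count dropping from 4 to 2 in multi_mx_transaction_recovery appears to be a side effect of the same change: shards placed on the node running the command are now written through local execution, so only the remotely written placements leave 2PC bookkeeping behind. A rough way to observe this, reusing the test_recovery table from that test (the exact counts depend on how many placements are remote to the session's node):

-- baseline
SELECT count(*) FROM pg_dist_transaction;

-- multi-shard write; local placements are written locally, remote ones via 2PC
INSERT INTO test_recovery (x) SELECT 'hello-'||s FROM generate_series(1,100) s;

-- writes to shards on this node do not go through a tracked remote
-- transaction, so fewer rows show up here than before the change
SELECT count(*) FROM pg_dist_transaction;

-- clean up any leftover prepared transactions
SELECT recover_prepared_transactions();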
+ SET citus.enable_local_execution = 'off'; INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i; SELECT count(*) FROM on_update_fkey_table; count diff --git a/src/test/regress/expected/multi_replicate_reference_table.out b/src/test/regress/expected/multi_replicate_reference_table.out index 714e8353e..e23342e00 100644 --- a/src/test/regress/expected/multi_replicate_reference_table.out +++ b/src/test/regress/expected/multi_replicate_reference_table.out @@ -499,11 +499,13 @@ SELECT create_reference_table('replicate_reference_table_copy'); (1 row) +SET citus.enable_local_execution = 'off'; BEGIN; COPY replicate_reference_table_copy FROM STDIN; SELECT 1 FROM master_add_node('localhost', :worker_2_port); ERROR: cannot open new connections after the first modification command within a transaction ROLLBACK; +RESET citus.enable_local_execution; DROP TABLE replicate_reference_table_copy; -- test executing DDL command then adding a new node in a transaction CREATE TABLE replicate_reference_table_ddl(column1 int); diff --git a/src/test/regress/expected/replicate_reference_tables_to_coordinator.out b/src/test/regress/expected/replicate_reference_tables_to_coordinator.out index c0292606c..f0fb28056 100644 --- a/src/test/regress/expected/replicate_reference_tables_to_coordinator.out +++ b/src/test/regress/expected/replicate_reference_tables_to_coordinator.out @@ -17,6 +17,7 @@ SELECT create_reference_table('squares'); (1 row) INSERT INTO squares SELECT i, i * i FROM generate_series(1, 10) i; +NOTICE: executing the copy locally for shard xxxxx -- should be executed locally SELECT count(*) FROM squares; NOTICE: executing the command locally: SELECT count(*) AS count FROM replicate_ref_to_coordinator.squares_8000000 squares diff --git a/src/test/regress/multi_mx_schedule b/src/test/regress/multi_mx_schedule index d3083367f..6c9a6c01b 100644 --- a/src/test/regress/multi_mx_schedule +++ b/src/test/regress/multi_mx_schedule @@ -39,7 +39,7 @@ test: multi_mx_metadata test: master_evaluation master_evaluation_modify master_evaluation_select test: multi_mx_call test: multi_mx_function_call_delegation -test: multi_mx_modifications local_shard_execution +test: multi_mx_modifications local_shard_execution local_shard_copy test: multi_mx_transaction_recovery test: multi_mx_modifying_xacts test: multi_mx_explain diff --git a/src/test/regress/sql/local_shard_copy.sql b/src/test/regress/sql/local_shard_copy.sql new file mode 100644 index 000000000..28bdffd40 --- /dev/null +++ b/src/test/regress/sql/local_shard_copy.sql @@ -0,0 +1,346 @@ +CREATE SCHEMA local_shard_copy; +SET search_path TO local_shard_copy; + +SET client_min_messages TO DEBUG; +SET citus.next_shard_id TO 1570000; + +SELECT * FROM master_add_node('localhost', :master_port, groupid := 0); + +SET citus.shard_count TO 4; +SET citus.shard_replication_factor TO 1; +SET citus.replication_model TO 'streaming'; + + +CREATE TABLE reference_table (key int PRIMARY KEY); +SELECT create_reference_table('reference_table'); + +CREATE TABLE distributed_table (key int PRIMARY KEY, age bigint CHECK (age >= 10)); +SELECT create_distributed_table('distributed_table','key'); + +INSERT INTO distributed_table SELECT *,* FROM generate_series(20, 40); +INSERT INTO reference_table SELECT * FROM generate_series(1, 10); + +CREATE TABLE local_table (key int PRIMARY KEY); +INSERT INTO local_table SELECT * from generate_series(1, 10); + +-- partitioned table +CREATE TABLE collections_list ( + key bigserial, + collection_id integer +) PARTITION BY LIST 
(collection_id ); + +SELECT create_distributed_table('collections_list', 'key'); + +CREATE TABLE collections_list_0 + PARTITION OF collections_list (key, collection_id) + FOR VALUES IN ( 0 ); + +CREATE TABLE collections_list_1 + PARTITION OF collections_list (key, collection_id) + FOR VALUES IN ( 1 ); + + +-- connection worker and get ready for the tests +\c - - - :worker_1_port + +SET search_path TO local_shard_copy; +SET citus.log_local_commands TO ON; + +-- returns true of the distribution key filter +-- on the distributed tables (e.g., WHERE key = 1), we'll hit a shard +-- placement which is local to this not +CREATE OR REPLACE FUNCTION shard_of_distribution_column_is_local(dist_key int) RETURNS bool AS $$ + + DECLARE shard_is_local BOOLEAN := FALSE; + + BEGIN + + WITH local_shard_ids AS (SELECT get_shard_id_for_distribution_column('local_shard_copy.distributed_table', dist_key)), + all_local_shard_ids_on_node AS (SELECT shardid FROM pg_dist_placement WHERE groupid IN (SELECT groupid FROM pg_dist_local_group)) + SELECT + true INTO shard_is_local + FROM + local_shard_ids + WHERE + get_shard_id_for_distribution_column IN (SELECT * FROM all_local_shard_ids_on_node); + + IF shard_is_local IS NULL THEN + shard_is_local = FALSE; + END IF; + + RETURN shard_is_local; + END; +$$ LANGUAGE plpgsql; + +-- pick some example values that reside on the shards locally and remote + +-- distribution key values of 1,6, 500 and 701 are LOCAL to shards, +-- we'll use these values in the tests +SELECT shard_of_distribution_column_is_local(1); +SELECT shard_of_distribution_column_is_local(6); +SELECT shard_of_distribution_column_is_local(500); +SELECT shard_of_distribution_column_is_local(701); + +-- distribution key values of 11 and 12 are REMOTE to shards +SELECT shard_of_distribution_column_is_local(11); +SELECT shard_of_distribution_column_is_local(12); + +BEGIN; + -- run select with local execution + SELECT count(*) FROM distributed_table WHERE key = 1; + + SELECT count(*) FROM distributed_table; + -- the local placements should be executed locally + COPY distributed_table FROM STDIN WITH delimiter ','; +1, 100 +2, 200 +3, 300 +4, 400 +5, 500 +\. + -- verify that the copy is successful. + SELECT count(*) FROM distributed_table; + +ROLLBACK; + +BEGIN; + -- run select with local execution + SELECT count(*) FROM distributed_table WHERE key = 1; + + SELECT count(*) FROM distributed_table; + -- the local placements should be executed locally + COPY distributed_table FROM STDIN WITH delimiter ','; +1, 100 +2, 200 +3, 300 +4, 400 +5, 500 +\. + -- verify the put ages. + SELECT * FROM distributed_table; + +ROLLBACK; + + +BEGIN; + -- run select with local execution + SELECT count(*) FROM distributed_table WHERE key = 1; + + SELECT count(*) FROM distributed_table; + -- the local placements should be executed locally + COPY distributed_table FROM STDIN WITH delimiter ','; +1, 100 +2, 200 +3, 300 +4, 400 +5, 500 +\. + -- verify that the copy is successful. + SELECT count(*) FROM distributed_table; + +ROLLBACK; + +BEGIN; + -- run select with local execution + SELECT age FROM distributed_table WHERE key = 1; + + SELECT count(*) FROM collections_list; + -- the local placements should be executed locally + COPY collections_list FROM STDIN WITH delimiter ','; +1, 0 +2, 0 +3, 0 +4, 1 +5, 1 +\. + -- verify that the copy is successful. 
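The shard_of_distribution_column_is_local helper above reduces to a single catalog lookup: map the distribution key to its shard id, then check whether that shard has a placement in this node's group. The same check can be written as one query; the sketch below assumes the local_shard_copy.distributed_table created earlier in this file:

-- does the shard for key = 1 have a placement on the current node?
SELECT EXISTS (
    SELECT 1
    FROM pg_dist_placement
    WHERE shardid = get_shard_id_for_distribution_column(
                        'local_shard_copy.distributed_table', 1)
      AND groupid IN (SELECT groupid FROM pg_dist_local_group)
) AS shard_is_local;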
+ SELECT count(*) FROM collections_list; + +ROLLBACK; + +BEGIN; + -- run select with local execution + SELECT age FROM distributed_table WHERE key = 1; + + SELECT count(*) FROM distributed_table; + -- the local placements should be executed locally + COPY distributed_table FROM STDIN WITH delimiter ','; +1, 100 +2, 200 +3, 300 +4, 400 +5, 500 +\. + + + -- verify that the copy is successful. + SELECT count(*) FROM distributed_table; + +ROLLBACK; + +BEGIN; +-- Since we are in a transaction, the copy should be locally executed. +COPY distributed_table FROM STDIN WITH delimiter ','; +1, 100 +2, 200 +3, 300 +4, 400 +5, 500 +\. +ROLLBACK; + +-- Since we are not in a transaction, the copy should not be locally executed. +COPY distributed_table FROM STDIN WITH delimiter ','; +1, 100 +2, 200 +3, 300 +4, 400 +5, 500 +\. + +BEGIN; +-- Since we are in a transaction, the copy should be locally executed. But +-- we are putting duplicate key, so it should error. +COPY distributed_table FROM STDIN WITH delimiter ','; +1, 100 +2, 200 +3, 300 +4, 400 +5, 500 +\. +ROLLBACK; + +TRUNCATE distributed_table; + +BEGIN; + +-- insert a lot of data ( around 8MB), +-- this should use local copy and it will exceed the LOCAL_COPY_FLUSH_THRESHOLD (512KB) +INSERT INTO distributed_table SELECT * , * FROM generate_series(20, 1000000); + +ROLLBACK; + +COPY distributed_table FROM STDIN WITH delimiter ','; +1, 9 +\. + +BEGIN; +-- Since we are in a transaction, the execution will be local, however we are putting invalid age. +-- The constaints should give an error +COPY distributed_table FROM STDIN WITH delimiter ','; +1,9 +\. +ROLLBACK; + +TRUNCATE distributed_table; + + +-- different delimiters +BEGIN; +-- run select with local execution +SELECT count(*) FROM distributed_table WHERE key = 1; +-- initial size +SELECT count(*) FROM distributed_table; +COPY distributed_table FROM STDIN WITH delimiter '|'; +1|10 +2|30 +3|40 +\. +-- new size +SELECT count(*) FROM distributed_table; +ROLLBACK; + +BEGIN; +-- run select with local execution +SELECT count(*) FROM distributed_table WHERE key = 1; +-- initial size +SELECT count(*) FROM distributed_table; +COPY distributed_table FROM STDIN WITH delimiter '['; +1[10 +2[30 +3[40 +\. +-- new size +SELECT count(*) FROM distributed_table; +ROLLBACK; + + +-- multiple local copies +BEGIN; +COPY distributed_table FROM STDIN WITH delimiter ','; +1,15 +2,20 +3,30 +\. +COPY distributed_table FROM STDIN WITH delimiter ','; +10,15 +20,20 +30,30 +\. +COPY distributed_table FROM STDIN WITH delimiter ','; +100,15 +200,20 +300,30 +\. +ROLLBACK; + +-- local copy followed by local copy should see the changes +-- and error since it is a duplicate primary key. +BEGIN; +COPY distributed_table FROM STDIN WITH delimiter ','; +1,15 +\. +COPY distributed_table FROM STDIN WITH delimiter ','; +1,16 +\. +ROLLBACK; + + +-- local copy followed by local copy should see the changes +BEGIN; +COPY distributed_table FROM STDIN WITH delimiter ','; +1,15 +\. +-- select should see the change +SELECT key FROM distributed_table WHERE key = 1; +ROLLBACK; + +\c - - - :master_port + +SET search_path TO local_shard_copy; +SET citus.log_local_commands TO ON; + +TRUNCATE TABLE reference_table; +TRUNCATE TABLE local_table; + +SELECT count(*) FROM reference_table, local_table WHERE reference_table.key = local_table.key; + +SET citus.enable_local_execution = 'on'; + +BEGIN; +-- copy should be executed locally +COPY reference_table FROM STDIN; +1 +2 +3 +4 +\. 
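The roughly 8MB INSERT above is there for the reason its comment gives: enough data that the per-shard copy buffer exceeds the 512KB LOCAL_COPY_FLUSH_THRESHOLD, so rows presumably reach the local shard in several flushes rather than only once at the end of the statement. The same idea with the transaction-visibility check made explicit (row count copied from the test; the exact number only needs to comfortably cross the threshold):

BEGIN;
    -- key and age both come from generate_series, so age stays >= 10 and the
    -- CHECK constraint is satisfied; the volume pushes each local shard's
    -- buffer past the 512KB flush threshold
    INSERT INTO distributed_table SELECT i, i FROM generate_series(20, 1000000) i;

    -- all inserted rows should be visible within the same transaction
    SELECT count(*) FROM distributed_table;
ROLLBACK;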
+ROLLBACK; + +SET citus.enable_local_execution = 'off'; + +BEGIN; +-- copy should not be executed locally as citus.enable_local_execution = off +COPY reference_table FROM STDIN; +1 +2 +3 +4 +\. +ROLLBACK; + +SET citus.enable_local_execution = 'on'; + +SET client_min_messages TO ERROR; +SET search_path TO public; +DROP SCHEMA local_shard_copy CASCADE; diff --git a/src/test/regress/sql/local_shard_execution.sql b/src/test/regress/sql/local_shard_execution.sql index b5ba49caa..22b46abed 100644 --- a/src/test/regress/sql/local_shard_execution.sql +++ b/src/test/regress/sql/local_shard_execution.sql @@ -305,22 +305,6 @@ ROLLBACK; -- load some data so that foreign keys won't complain with the next tests INSERT INTO reference_table SELECT i FROM generate_series(500, 600) i; --- a very similar set of operation, but this time use --- COPY as the first command -BEGIN; - INSERT INTO distributed_table SELECT i, i::text, i % 10 + 25 FROM generate_series(500, 600) i; - - -- this could go through local execution, but doesn't because we've already - -- done distributed execution - SELECT * FROM distributed_table WHERE key = 500 ORDER BY 1,2,3; - - -- utility commands could still use distributed execution - TRUNCATE distributed_table CASCADE; - - -- ensure that TRUNCATE made it - SELECT * FROM distributed_table WHERE key = 500 ORDER BY 1,2,3; -ROLLBACK; - -- show that cascading foreign keys just works fine with local execution BEGIN; INSERT INTO reference_table VALUES (701); @@ -356,15 +340,7 @@ ROLLBACK; BEGIN; SELECT count(*) FROM distributed_table WHERE key = 1; - -- even no need to supply any data - COPY distributed_table FROM STDIN WITH CSV; -ROLLBACK; - --- a local query is followed by a command that cannot be executed locally -BEGIN; - SELECT count(*) FROM distributed_table WHERE key = 1; - - INSERT INTO distributed_table (key) SELECT i FROM generate_series(1,10)i; + INSERT INTO distributed_table (key) SELECT i FROM generate_series(1,1) i; ROLLBACK; -- a local query is followed by a command that cannot be executed locally diff --git a/src/test/regress/sql/multi_mx_truncate_from_worker.sql b/src/test/regress/sql/multi_mx_truncate_from_worker.sql index aa05aa02a..00492246b 100644 --- a/src/test/regress/sql/multi_mx_truncate_from_worker.sql +++ b/src/test/regress/sql/multi_mx_truncate_from_worker.sql @@ -85,6 +85,8 @@ SET search_path TO 'truncate_from_workers'; -- make sure that DMLs-SELECTs works along with TRUNCATE worker fine BEGIN; + -- we can enable local execution when truncate can be executed locally. + SET citus.enable_local_execution = 'off'; INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i; SELECT count(*) FROM on_update_fkey_table; TRUNCATE on_update_fkey_table; diff --git a/src/test/regress/sql/multi_replicate_reference_table.sql b/src/test/regress/sql/multi_replicate_reference_table.sql index ec476e706..8edfa40b7 100644 --- a/src/test/regress/sql/multi_replicate_reference_table.sql +++ b/src/test/regress/sql/multi_replicate_reference_table.sql @@ -323,6 +323,7 @@ DROP TABLE replicate_reference_table_insert; CREATE TABLE replicate_reference_table_copy(column1 int); SELECT create_reference_table('replicate_reference_table_copy'); +SET citus.enable_local_execution = 'off'; BEGIN; COPY replicate_reference_table_copy FROM STDIN; 1 @@ -334,6 +335,8 @@ COPY replicate_reference_table_copy FROM STDIN; SELECT 1 FROM master_add_node('localhost', :worker_2_port); ROLLBACK; +RESET citus.enable_local_execution; + DROP TABLE replicate_reference_table_copy;
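The multi_replicate_reference_table hunk at the end works around the new behavior rather than exercising it: presumably a locally executed COPY would no longer open the remote connection that the test relies on, so the subsequent master_add_node would not hit the "cannot open new connections after the first modification command within a transaction" error being checked. Turning local execution off around just that block keeps the original failure mode, and RESET restores the default for the rest of the file. The pattern, reusing the test's own table and psql variable:

-- keep the COPY on a connection so master_add_node still conflicts with it
SET citus.enable_local_execution = 'off';

BEGIN;
COPY replicate_reference_table_copy FROM STDIN;
1
\.
-- expected to fail: a connection-based modification already ran in this transaction
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
ROLLBACK;

RESET citus.enable_local_execution;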