From 59616eb41488407799ec6397ce739425ebab917d Mon Sep 17 00:00:00 2001 From: Jelte Fennema Date: Thu, 19 Mar 2020 15:28:26 +0100 Subject: [PATCH] Revert "Merge pull request #3557 from citusdata/enh/localExecutionCopy" This reverts commit e5a2bbb2bd86d32d24af301fd33941bddb4187de, reversing changes made to 99c5b0add788e0454b0b59c4339d2747e96e4447. --- .../distributed/commands/local_multi_copy.c | 214 -------- src/backend/distributed/commands/multi_copy.c | 222 +------- .../distributed/executor/citus_custom_scan.c | 4 - .../executor/insert_select_executor.c | 28 +- .../distributed/executor/local_executor.c | 42 -- .../distributed/planner/deparse_shard_query.c | 3 +- src/backend/distributed/utils/shard_utils.c | 4 +- src/include/distributed/commands/multi_copy.h | 4 - src/include/distributed/local_executor.h | 3 - src/include/distributed/local_multi_copy.h | 19 - src/include/distributed/shard_utils.h | 2 +- src/test/regress/data/orders.2.data | 2 +- .../expected/coordinator_shouldhaveshards.out | 2 - .../regress/expected/local_shard_copy.out | 494 ------------------ .../expected/local_shard_execution.out | 49 +- .../locally_execute_intermediate_results.out | 5 - .../expected/master_evaluation_modify.out | 2 - .../multi_mx_insert_select_repartition.out | 6 +- .../multi_mx_transaction_recovery.out | 2 +- .../multi_mx_truncate_from_worker.out | 2 - .../multi_replicate_reference_table.out | 2 - ...licate_reference_tables_to_coordinator.out | 1 - src/test/regress/multi_mx_schedule | 2 +- src/test/regress/sql/local_shard_copy.sql | 346 ------------ .../regress/sql/local_shard_execution.sql | 26 +- .../sql/multi_mx_truncate_from_worker.sql | 2 - .../sql/multi_replicate_reference_table.sql | 3 - 27 files changed, 115 insertions(+), 1376 deletions(-) delete mode 100644 src/backend/distributed/commands/local_multi_copy.c delete mode 100644 src/include/distributed/local_multi_copy.h delete mode 100644 src/test/regress/expected/local_shard_copy.out delete mode 100644 src/test/regress/sql/local_shard_copy.sql diff --git a/src/backend/distributed/commands/local_multi_copy.c b/src/backend/distributed/commands/local_multi_copy.c deleted file mode 100644 index 60eaef3ce..000000000 --- a/src/backend/distributed/commands/local_multi_copy.c +++ /dev/null @@ -1,214 +0,0 @@ -/*------------------------------------------------------------------------- - * - * local_multi_copy.c - * Commands for running a copy locally - * - * For each local placement, we have a buffer. When we receive a slot - * from a copy, the slot will be put to the corresponding buffer based - * on the shard id. When the buffer size exceeds the threshold a local - * copy will be done. Also If we reach to the end of copy, we will send - * the current buffer for local copy. - * - * The existing logic from multi_copy.c and format are used, therefore - * even if user did not do a copy with binary format, it is possible that - * we are going to be using binary format internally. - * - * - * Copyright (c) Citus Data, Inc. 
- *
- *-------------------------------------------------------------------------
- */
-
-#include "postgres.h"
-#include "commands/copy.h"
-#include "catalog/namespace.h"
-#include "parser/parse_relation.h"
-#include "utils/lsyscache.h"
-#include "nodes/makefuncs.h"
-#include "safe_lib.h"
-#include <arpa/inet.h> /* for htons */
-
-#include "distributed/transmit.h"
-#include "distributed/commands/multi_copy.h"
-#include "distributed/multi_partitioning_utils.h"
-#include "distributed/local_executor.h"
-#include "distributed/local_multi_copy.h"
-#include "distributed/shard_utils.h"
-
-static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead);
-static void AddSlotToBuffer(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest,
- CopyOutState localCopyOutState);
-
-static bool ShouldSendCopyNow(StringInfo buffer);
-static void DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId,
- CopyStmt *copyStatement, bool isEndOfCopy);
-static bool ShouldAddBinaryHeaders(StringInfo buffer, bool isBinary);
-
-/*
- * LocalCopyBuffer is used in copy callback to return the copied rows.
- * The reason this is a global variable is that we cannot pass an additional
- * argument to the copy callback.
- */
-static StringInfo LocalCopyBuffer;
-
-/*
- * WriteTupleToLocalShard adds the given slot and does a local copy if
- * this is the end of copy, or the buffer size exceeds the threshold.
- */
-void
-WriteTupleToLocalShard(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, int64
- shardId,
- CopyOutState localCopyOutState)
-{
- /*
- * Since we are doing a local copy, the following statements should
- * use local execution to see the changes
- */
- TransactionAccessedLocalPlacement = true;
-
- bool isBinaryCopy = localCopyOutState->binary;
- if (ShouldAddBinaryHeaders(localCopyOutState->fe_msgbuf, isBinaryCopy))
- {
- AppendCopyBinaryHeaders(localCopyOutState);
- }
-
- AddSlotToBuffer(slot, copyDest, localCopyOutState);
-
- if (ShouldSendCopyNow(localCopyOutState->fe_msgbuf))
- {
- if (isBinaryCopy)
- {
- /*
- * We're going to flush the buffer to disk by effectively doing a full
- * COPY command. Hence we also need to add footers to the current buffer.
- */
- AppendCopyBinaryFooters(localCopyOutState);
- }
- bool isEndOfCopy = false;
- DoLocalCopy(localCopyOutState->fe_msgbuf, copyDest->distributedRelationId,
- shardId,
- copyDest->copyStatement, isEndOfCopy);
- resetStringInfo(localCopyOutState->fe_msgbuf);
- }
-}
-
-
-/*
- * FinishLocalCopyToShard finishes local copy for the given shard with the shard id.
- */
-void
-FinishLocalCopyToShard(CitusCopyDestReceiver *copyDest, int64 shardId,
- CopyOutState localCopyOutState)
-{
- bool isBinaryCopy = localCopyOutState->binary;
- if (isBinaryCopy)
- {
- AppendCopyBinaryFooters(localCopyOutState);
- }
- bool isEndOfCopy = true;
- DoLocalCopy(localCopyOutState->fe_msgbuf, copyDest->distributedRelationId, shardId,
- copyDest->copyStatement, isEndOfCopy);
-}
-
-
-/*
- * AddSlotToBuffer serializes the given slot and adds it to
- * the buffer in localCopyOutState.
- */ -static void -AddSlotToBuffer(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, CopyOutState - localCopyOutState) -{ - Datum *columnValues = slot->tts_values; - bool *columnNulls = slot->tts_isnull; - FmgrInfo *columnOutputFunctions = copyDest->columnOutputFunctions; - CopyCoercionData *columnCoercionPaths = copyDest->columnCoercionPaths; - - AppendCopyRowData(columnValues, columnNulls, copyDest->tupleDescriptor, - localCopyOutState, columnOutputFunctions, - columnCoercionPaths); -} - - -/* - * ShouldSendCopyNow returns true if the given buffer size exceeds the - * local copy buffer size threshold. - */ -static bool -ShouldSendCopyNow(StringInfo buffer) -{ - return buffer->len > LOCAL_COPY_FLUSH_THRESHOLD; -} - - -/* - * DoLocalCopy finds the shard table from the distributed relation id, and copies the given - * buffer into the shard. - * CopyFrom calls ReadFromLocalBufferCallback to read bytes from the buffer - * as though it was reading from stdin. It then parses the tuples and - * writes them to the shardOid table. - */ -static void -DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId, CopyStmt *copyStatement, - bool isEndOfCopy) -{ - /* - * Set the buffer as a global variable to allow ReadFromLocalBufferCallback - * to read from it. We cannot pass additional arguments to - * ReadFromLocalBufferCallback. - */ - LocalCopyBuffer = buffer; - - Oid shardOid = GetShardLocalTableOid(relationId, shardId); - Relation shard = heap_open(shardOid, RowExclusiveLock); - ParseState *pState = make_parsestate(NULL); - - /* p_rtable of pState is set so that we can check constraints. */ - pState->p_rtable = CreateRangeTable(shard, ACL_INSERT); - - CopyState cstate = BeginCopyFrom(pState, shard, NULL, false, - ReadFromLocalBufferCallback, - copyStatement->attlist, copyStatement->options); - CopyFrom(cstate); - EndCopyFrom(cstate); - - heap_close(shard, NoLock); - free_parsestate(pState); -} - - -/* - * ShouldAddBinaryHeaders returns true if the given buffer - * is empty and the format is binary. - */ -static bool -ShouldAddBinaryHeaders(StringInfo buffer, bool isBinary) -{ - if (!isBinary) - { - return false; - } - return buffer->len == 0; -} - - -/* - * ReadFromLocalBufferCallback is the copy callback. - * It always tries to copy maxRead bytes. - */ -static int -ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead) -{ - int bytesRead = 0; - int avail = LocalCopyBuffer->len - LocalCopyBuffer->cursor; - int bytesToRead = Min(avail, maxRead); - if (bytesToRead > 0) - { - memcpy_s(outBuf, bytesToRead, - &LocalCopyBuffer->data[LocalCopyBuffer->cursor], bytesToRead); - } - bytesRead += bytesToRead; - LocalCopyBuffer->cursor += bytesToRead; - - return bytesRead; -} diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 0c17a32e4..0c5d52d0e 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -85,8 +85,6 @@ #include "distributed/shard_pruning.h" #include "distributed/version_compat.h" #include "distributed/worker_protocol.h" -#include "distributed/local_multi_copy.h" -#include "distributed/hash_helpers.h" #include "executor/executor.h" #include "foreign/foreign.h" #include "libpq/libpq.h" @@ -164,9 +162,6 @@ struct CopyPlacementState /* State of shard to which the placement belongs to. */ CopyShardState *shardState; - /* node group ID of the placement */ - int32 groupId; - /* * Buffered COPY data. 
When the placement is activePlacementState of * some connection, this is empty. Because in that case we directly @@ -183,12 +178,6 @@ struct CopyShardState /* Used as hash key. */ uint64 shardId; - /* used for doing local copy */ - CopyOutState copyOutState; - - /* containsLocalPlacement is true if we have a local placement for the shard id of this state */ - bool containsLocalPlacement; - /* List of CopyPlacementStates for all active placements of the shard. */ List *placementStateList; }; @@ -243,16 +232,13 @@ static CopyConnectionState * GetConnectionState(HTAB *connectionStateHash, MultiConnection *connection); static CopyShardState * GetShardState(uint64 shardId, HTAB *shardStateHash, HTAB *connectionStateHash, bool stopOnFailure, - bool *found, bool shouldUseLocalCopy, CopyOutState - copyOutState); + bool *found); static MultiConnection * CopyGetPlacementConnection(ShardPlacement *placement, bool stopOnFailure); static List * ConnectionStateList(HTAB *connectionStateHash); static void InitializeCopyShardState(CopyShardState *shardState, HTAB *connectionStateHash, - uint64 shardId, bool stopOnFailure, bool - canUseLocalCopy, - CopyOutState copyOutState); + uint64 shardId, bool stopOnFailure); static void StartPlacementStateCopyCommand(CopyPlacementState *placementState, CopyStmt *copyStatement, CopyOutState copyOutState); @@ -288,11 +274,6 @@ static bool CitusCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *copyDest); static void CitusCopyDestReceiverShutdown(DestReceiver *destReceiver); static void CitusCopyDestReceiverDestroy(DestReceiver *destReceiver); -static bool ContainsLocalPlacement(int64 shardId); -static void FinishLocalCopy(CitusCopyDestReceiver *copyDest); -static void CloneCopyOutStateForLocalCopy(CopyOutState from, CopyOutState to); -static bool ShouldExecuteCopyLocally(bool isIntermediateResult); -static void LogLocalCopyExecution(uint64 shardId); /* exports for SQL callable functions */ @@ -2004,55 +1985,6 @@ CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColu } -/* - * ShouldExecuteCopyLocally returns true if the current copy - * operation should be done locally for local placements. - */ -static bool -ShouldExecuteCopyLocally(bool isIntermediateResult) -{ - if (!EnableLocalExecution) - { - return false; - } - - /* - * Intermediate files are written to a file, and files are visible to all - * transactions, and we use a custom copy format for copy therefore we will - * use the existing logic for that. - */ - if (isIntermediateResult) - { - return false; - } - - if (TransactionAccessedLocalPlacement) - { - /* - * For various reasons, including the transaction visibility - * rules (e.g., read-your-own-writes), we have to use local - * execution again if it has already happened within this - * transaction block. - * - * We might error out later in the execution if it is not suitable - * to execute the tasks locally. - */ - Assert(IsMultiStatementTransaction() || InCoordinatedTransaction()); - - /* - * TODO: A future improvement could be to keep track of which placements - * have been locally executed. At this point, only use local execution for - * those placements. That'd help to benefit more from parallelism. 
- */ - - return true; - } - - /* if we connected to the localhost via a connection, we might not be able to see some previous changes that are done via the connection */ - return !TransactionConnectedToLocalGroup && IsMultiStatementTransaction(); -} - - /* * CitusCopyDestReceiverStartup implements the rStartup interface of * CitusCopyDestReceiver. It opens the relation, acquires necessary @@ -2064,8 +1996,6 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, { CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) dest; - bool isIntermediateResult = copyDest->intermediateResultIdPrefix != NULL; - copyDest->shouldUseLocalCopy = ShouldExecuteCopyLocally(isIntermediateResult); Oid tableId = copyDest->distributedRelationId; char *relationName = get_rel_name(tableId); @@ -2083,6 +2013,9 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, const char *delimiterCharacter = "\t"; const char *nullPrintCharacter = "\\N"; + /* Citus currently doesn't know how to handle COPY command locally */ + ErrorIfTransactionAccessedPlacementsLocally(); + /* look up table properties */ Relation distributedRelation = heap_open(tableId, RowExclusiveLock); CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(tableId); @@ -2212,7 +2145,6 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, } } - copyStatement->query = NULL; copyStatement->attlist = attributeList; copyStatement->is_from = true; @@ -2296,9 +2228,7 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest CopyShardState *shardState = GetShardState(shardId, copyDest->shardStateHash, copyDest->connectionStateHash, stopOnFailure, - &cachedShardStateFound, - copyDest->shouldUseLocalCopy, - copyDest->copyOutState); + &cachedShardStateFound); if (!cachedShardStateFound) { firstTupleInShard = true; @@ -2319,12 +2249,6 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest } } - if (copyDest->shouldUseLocalCopy && shardState->containsLocalPlacement) - { - WriteTupleToLocalShard(slot, copyDest, shardId, shardState->copyOutState); - } - - foreach(placementStateCell, shardState->placementStateList) { CopyPlacementState *currentPlacementState = lfirst(placementStateCell); @@ -2352,7 +2276,6 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest { StartPlacementStateCopyCommand(currentPlacementState, copyStatement, copyOutState); - dlist_delete(¤tPlacementState->bufferedPlacementNode); connectionState->activePlacementState = currentPlacementState; @@ -2407,30 +2330,6 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest } -/* - * ContainsLocalPlacement returns true if the current node has - * a local placement for the given shard id. - */ -static bool -ContainsLocalPlacement(int64 shardId) -{ - ListCell *placementCell = NULL; - List *activePlacementList = ActiveShardPlacementList(shardId); - int32 localGroupId = GetLocalGroupId(); - - foreach(placementCell, activePlacementList) - { - ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell); - - if (placement->groupId == localGroupId) - { - return true; - } - } - return false; -} - - /* * ShardIdForTuple returns id of the shard to which the given tuple belongs to. 
*/ @@ -2508,7 +2407,6 @@ CitusCopyDestReceiverShutdown(DestReceiver *destReceiver) Relation distributedRelation = copyDest->distributedRelation; List *connectionStateList = ConnectionStateList(connectionStateHash); - FinishLocalCopy(copyDest); PG_TRY(); { @@ -2536,28 +2434,6 @@ CitusCopyDestReceiverShutdown(DestReceiver *destReceiver) } -/* - * FinishLocalCopy sends the remaining copies for local placements. - */ -static void -FinishLocalCopy(CitusCopyDestReceiver *copyDest) -{ - HTAB *shardStateHash = copyDest->shardStateHash; - HASH_SEQ_STATUS status; - CopyShardState *copyShardState; - - foreach_htab(copyShardState, &status, shardStateHash) - { - if (copyShardState->copyOutState != NULL && - copyShardState->copyOutState->fe_msgbuf->len > 0) - { - FinishLocalCopyToShard(copyDest, copyShardState->shardId, - copyShardState->copyOutState); - } - } -} - - /* * ShutdownCopyConnectionState ends the copy command for the current active * placement on connection, and then sends the rest of the buffers over the @@ -2988,6 +2864,7 @@ CheckCopyPermissions(CopyStmt *copyStatement) /* *INDENT-OFF* */ bool is_from = copyStatement->is_from; Relation rel; + Oid relid; List *range_table = NIL; TupleDesc tupDesc; AclMode required_access = (is_from ? ACL_INSERT : ACL_SELECT); @@ -2997,8 +2874,15 @@ CheckCopyPermissions(CopyStmt *copyStatement) rel = heap_openrv(copyStatement->relation, is_from ? RowExclusiveLock : AccessShareLock); - range_table = CreateRangeTable(rel, required_access); - RangeTblEntry *rte = (RangeTblEntry*) linitial(range_table); + relid = RelationGetRelid(rel); + + RangeTblEntry *rte = makeNode(RangeTblEntry); + rte->rtekind = RTE_RELATION; + rte->relid = relid; + rte->relkind = rel->rd_rel->relkind; + rte->requiredPerms = required_access; + range_table = list_make1(rte); + tupDesc = RelationGetDescr(rel); attnums = CopyGetAttnums(tupDesc, rel, copyStatement->attlist); @@ -3025,21 +2909,6 @@ CheckCopyPermissions(CopyStmt *copyStatement) } -/* - * CreateRangeTable creates a range table with the given relation. 
- */ -List * -CreateRangeTable(Relation rel, AclMode requiredAccess) -{ - RangeTblEntry *rte = makeNode(RangeTblEntry); - rte->rtekind = RTE_RELATION; - rte->relid = rel->rd_id; - rte->relkind = rel->rd_rel->relkind; - rte->requiredPerms = requiredAccess; - return list_make1(rte); -} - - /* Helper for CheckCopyPermissions(), copied from postgres */ static List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist) @@ -3218,16 +3087,14 @@ ConnectionStateList(HTAB *connectionStateHash) */ static CopyShardState * GetShardState(uint64 shardId, HTAB *shardStateHash, - HTAB *connectionStateHash, bool stopOnFailure, bool *found, bool - shouldUseLocalCopy, CopyOutState copyOutState) + HTAB *connectionStateHash, bool stopOnFailure, bool *found) { CopyShardState *shardState = (CopyShardState *) hash_search(shardStateHash, &shardId, HASH_ENTER, found); if (!*found) { InitializeCopyShardState(shardState, connectionStateHash, - shardId, stopOnFailure, shouldUseLocalCopy, - copyOutState); + shardId, stopOnFailure); } return shardState; @@ -3242,8 +3109,7 @@ GetShardState(uint64 shardId, HTAB *shardStateHash, static void InitializeCopyShardState(CopyShardState *shardState, HTAB *connectionStateHash, uint64 shardId, - bool stopOnFailure, bool shouldUseLocalCopy, CopyOutState - copyOutState) + bool stopOnFailure) { ListCell *placementCell = NULL; int failedPlacementCount = 0; @@ -3255,7 +3121,6 @@ InitializeCopyShardState(CopyShardState *shardState, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); - /* release active placement list at the end of this function */ MemoryContext oldContext = MemoryContextSwitchTo(localContext); @@ -3265,22 +3130,11 @@ InitializeCopyShardState(CopyShardState *shardState, shardState->shardId = shardId; shardState->placementStateList = NIL; - shardState->copyOutState = NULL; - shardState->containsLocalPlacement = ContainsLocalPlacement(shardId); - foreach(placementCell, activePlacementList) { ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell); - if (shouldUseLocalCopy && placement->groupId == GetLocalGroupId()) - { - shardState->copyOutState = (CopyOutState) palloc0(sizeof(*copyOutState)); - CloneCopyOutStateForLocalCopy(copyOutState, shardState->copyOutState); - LogLocalCopyExecution(shardId); - continue; - } - MultiConnection *connection = CopyGetPlacementConnection(placement, stopOnFailure); if (connection == NULL) @@ -3304,7 +3158,6 @@ InitializeCopyShardState(CopyShardState *shardState, CopyPlacementState *placementState = palloc0(sizeof(CopyPlacementState)); placementState->shardState = shardState; placementState->data = makeStringInfo(); - placementState->groupId = placement->groupId; placementState->connectionState = connectionState; /* @@ -3335,43 +3188,6 @@ InitializeCopyShardState(CopyShardState *shardState, } -/* - * CloneCopyOutStateForLocalCopy creates a shallow copy of the CopyOutState with a new - * fe_msgbuf. We keep a separate CopyOutState for every local shard placement, because - * in case of local copy we serialize and buffer incoming tuples into fe_msgbuf for each - * placement and the serialization functions take a CopyOutState as a parameter. 
- */
-static void
-CloneCopyOutStateForLocalCopy(CopyOutState from, CopyOutState to)
-{
- to->attnumlist = from->attnumlist;
- to->binary = from->binary;
- to->copy_dest = from->copy_dest;
- to->delim = from->delim;
- to->file_encoding = from->file_encoding;
- to->need_transcoding = from->need_transcoding;
- to->null_print = from->null_print;
- to->null_print_client = from->null_print_client;
- to->rowcontext = from->rowcontext;
- to->fe_msgbuf = makeStringInfo();
-}
-
-
-/*
- * LogLocalCopyExecution logs that the copy will be done locally for
- * the given shard.
- */
-static void
-LogLocalCopyExecution(uint64 shardId)
-{
- if (!(LogRemoteCommands || LogLocalCommands))
- {
- return;
- }
- ereport(NOTICE, (errmsg("executing the copy locally for shard %lu", shardId)));
-}
-
-
 /*
  * CopyGetPlacementConnection assigns a connection to the given placement. If
  * a connection has already been assigned the placement in the current transaction
diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c
index c9a02fb50..c0de40978 100644
--- a/src/backend/distributed/executor/citus_custom_scan.c
+++ b/src/backend/distributed/executor/citus_custom_scan.c
@@ -553,10 +553,6 @@ CacheLocalPlanForShardQuery(Task *task, DistributedPlan *originalDistributedPlan
 PlannedStmt *
 GetCachedLocalPlan(Task *task, DistributedPlan *distributedPlan)
 {
- if (distributedPlan->workerJob == NULL)
- {
- return NULL;
- }
 List *cachedPlanList = distributedPlan->workerJob->localPlannedStatements;
 LocalPlannedStatement *localPlannedStatement = NULL;
diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c
index 4b7c7cc32..6663acc49 100644
--- a/src/backend/distributed/executor/insert_select_executor.c
+++ b/src/backend/distributed/executor/insert_select_executor.c
@@ -26,7 +26,6 @@
 #include "distributed/listutils.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/multi_router_planner.h"
-#include "distributed/local_executor.h"
 #include "distributed/distributed_planner.h"
 #include "distributed/recursive_planning.h"
 #include "distributed/relation_access_tracking.h"
@@ -136,6 +135,15 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node)
 bool hasReturning = distributedPlan->hasReturning;
 HTAB *shardStateHash = NULL;
 
+ /*
+ * INSERT .. SELECT via coordinator consists of two steps, a SELECT is
+ * followed by a COPY. If the SELECT is executed locally, then the COPY
+ * would fail since Citus currently doesn't know how to handle COPY
+ * locally. So, to prevent the command from failing, we simply disable
+ * local execution.
+ */ + DisableLocalExecution(); + /* select query to execute */ Query *selectQuery = BuildSelectForInsertSelect(insertSelectQuery); @@ -190,6 +198,7 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) GetDistributedPlan((CustomScan *) selectPlan->planTree); Job *distSelectJob = distSelectPlan->workerJob; List *distSelectTaskList = distSelectJob->taskList; + TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState); bool randomAccess = true; bool interTransactions = false; bool binaryFormat = @@ -271,10 +280,11 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) scanState->tuplestorestate = tuplestore_begin_heap(randomAccess, interTransactions, work_mem); - uint64 rowsInserted = ExtractAndExecuteLocalAndRemoteTasks(scanState, - taskList, - ROW_MODIFY_COMMUTATIVE, - hasReturning); + uint64 rowsInserted = ExecuteTaskListIntoTupleStore(ROW_MODIFY_COMMUTATIVE, + taskList, + tupleDescriptor, + scanState->tuplestorestate, + hasReturning); executorState->es_processed = rowsInserted; } @@ -325,15 +335,17 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) if (prunedTaskList != NIL) { + TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState); bool randomAccess = true; bool interTransactions = false; Assert(scanState->tuplestorestate == NULL); scanState->tuplestorestate = tuplestore_begin_heap(randomAccess, interTransactions, work_mem); - ExtractAndExecuteLocalAndRemoteTasks(scanState, prunedTaskList, - ROW_MODIFY_COMMUTATIVE, - hasReturning); + + ExecuteTaskListIntoTupleStore(ROW_MODIFY_COMMUTATIVE, prunedTaskList, + tupleDescriptor, scanState->tuplestorestate, + hasReturning); if (SortReturning && hasReturning) { diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index 4930ad7af..7003ea0b3 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -231,48 +231,6 @@ ExecuteLocalTaskList(CitusScanState *scanState, List *taskList) } -/* - * ExtractAndExecuteLocalAndRemoteTasks extracts local and remote tasks - * if local execution can be used and executes them. 
- */ -uint64 -ExtractAndExecuteLocalAndRemoteTasks(CitusScanState *scanState, - List *taskList, RowModifyLevel rowModifyLevel, bool - hasReturning) -{ - uint64 processedRows = 0; - List *localTaskList = NIL; - List *remoteTaskList = NIL; - TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState); - - if (ShouldExecuteTasksLocally(taskList)) - { - bool readOnlyPlan = false; - - /* set local (if any) & remote tasks */ - ExtractLocalAndRemoteTasks(readOnlyPlan, taskList, &localTaskList, - &remoteTaskList); - processedRows += ExecuteLocalTaskList(scanState, localTaskList); - } - else - { - /* all tasks should be executed via remote connections */ - remoteTaskList = taskList; - } - - /* execute remote tasks if any */ - if (list_length(remoteTaskList) > 0) - { - processedRows += ExecuteTaskListIntoTupleStore(rowModifyLevel, remoteTaskList, - tupleDescriptor, - scanState->tuplestorestate, - hasReturning); - } - - return processedRows; -} - - /* * ExtractParametersForLocalExecution extracts parameter types and values * from the given ParamListInfo structure, and fills parameter type and diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index 3d76f25c0..5187e5094 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -335,8 +335,7 @@ UpdateRelationsToLocalShardTables(Node *node, List *relationShardList) return true; } - Oid shardOid = GetShardLocalTableOid(relationShard->relationId, - relationShard->shardId); + Oid shardOid = GetShardOid(relationShard->relationId, relationShard->shardId); newRte->relid = shardOid; diff --git a/src/backend/distributed/utils/shard_utils.c b/src/backend/distributed/utils/shard_utils.c index aa0a17921..ad3acac67 100644 --- a/src/backend/distributed/utils/shard_utils.c +++ b/src/backend/distributed/utils/shard_utils.c @@ -16,11 +16,11 @@ #include "distributed/shard_utils.h" /* - * GetShardLocalTableOid returns the oid of the shard from the given distributed relation + * GetShardOid returns the oid of the shard from the given distributed relation * with the shardid. 
*/ Oid -GetShardLocalTableOid(Oid distRelId, uint64 shardId) +GetShardOid(Oid distRelId, uint64 shardId) { char *relationName = get_rel_name(distRelId); AppendShardIdToName(&relationName, shardId); diff --git a/src/include/distributed/commands/multi_copy.h b/src/include/distributed/commands/multi_copy.h index 9ec8f280b..d91358839 100644 --- a/src/include/distributed/commands/multi_copy.h +++ b/src/include/distributed/commands/multi_copy.h @@ -130,9 +130,6 @@ typedef struct CitusCopyDestReceiver /* useful for tracking multi shard accesses */ bool multiShardCopy; - /* if true, should copy to local placements in the current session */ - bool shouldUseLocalCopy; - /* copy into intermediate result */ char *intermediateResultIdPrefix; } CitusCopyDestReceiver; @@ -157,7 +154,6 @@ extern void AppendCopyRowData(Datum *valueArray, bool *isNullArray, extern void AppendCopyBinaryHeaders(CopyOutState headerOutputState); extern void AppendCopyBinaryFooters(CopyOutState footerOutputState); extern void EndRemoteCopy(int64 shardId, List *connectionList); -extern List * CreateRangeTable(Relation rel, AclMode requiredAccess); extern Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, const char *queryString); extern void CheckCopyPermissions(CopyStmt *copyStatement); diff --git a/src/include/distributed/local_executor.h b/src/include/distributed/local_executor.h index 265434b89..8b11e096c 100644 --- a/src/include/distributed/local_executor.h +++ b/src/include/distributed/local_executor.h @@ -21,9 +21,6 @@ extern bool TransactionAccessedLocalPlacement; extern bool TransactionConnectedToLocalGroup; /* extern function declarations */ -extern uint64 ExtractAndExecuteLocalAndRemoteTasks(CitusScanState *scanState, - List *taskList, RowModifyLevel - rowModifyLevel, bool hasReturning); extern uint64 ExecuteLocalTaskList(CitusScanState *scanState, List *taskList); extern void ExecuteLocalUtilityTaskList(List *localTaskList); extern void ExtractLocalAndRemoteTasks(bool readOnlyPlan, List *taskList, diff --git a/src/include/distributed/local_multi_copy.h b/src/include/distributed/local_multi_copy.h deleted file mode 100644 index a4e46f015..000000000 --- a/src/include/distributed/local_multi_copy.h +++ /dev/null @@ -1,19 +0,0 @@ - -#ifndef LOCAL_MULTI_COPY -#define LOCAL_MULTI_COPY - -/* - * LOCAL_COPY_FLUSH_THRESHOLD is the threshold for local copy to be flushed. - * There will be one buffer for each local placement, when the buffer size - * exceeds this threshold, it will be flushed. 
- */ -#define LOCAL_COPY_FLUSH_THRESHOLD (1 * 512 * 1024) - -extern void WriteTupleToLocalShard(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, - int64 - shardId, - CopyOutState localCopyOutState); -extern void FinishLocalCopyToShard(CitusCopyDestReceiver *copyDest, int64 shardId, - CopyOutState localCopyOutState); - -#endif /* LOCAL_MULTI_COPY */ diff --git a/src/include/distributed/shard_utils.h b/src/include/distributed/shard_utils.h index 28addce15..e4fd9a2f2 100644 --- a/src/include/distributed/shard_utils.h +++ b/src/include/distributed/shard_utils.h @@ -13,6 +13,6 @@ #include "postgres.h" -extern Oid GetShardLocalTableOid(Oid distRelId, uint64 shardId); +extern Oid GetShardOid(Oid distRelId, uint64 shardId); #endif /* SHARD_UTILS_H */ diff --git a/src/test/regress/data/orders.2.data b/src/test/regress/data/orders.2.data index 43dbde8de..264c368df 100644 --- a/src/test/regress/data/orders.2.data +++ b/src/test/regress/data/orders.2.data @@ -1484,4 +1484,4 @@ 14944|535|O|119586.69|1997-10-14|2-HIGH|Clerk#000000962|0|lly. even instructions against 14945|68|O|210519.05|1996-03-30|1-URGENT|Clerk#000000467|0|nts? fluffily bold grouches after 14946|580|O|100402.47|1996-11-12|1-URGENT|Clerk#000000116|0|ffily bold dependencies wake. furiously regular instructions aro -14947|580|O|100402.47|1996-11-12|1-URGENT|Clerk#000000116|0|ffily bold dependencies wake. furiously regular instructions aro \ No newline at end of file +14947|580|O|100402.47|1996-11-12|1-URGENT|Clerk#000000116|0|ffily bold dependencies wake. furiously regular instructions aro diff --git a/src/test/regress/expected/coordinator_shouldhaveshards.out b/src/test/regress/expected/coordinator_shouldhaveshards.out index 023f58720..0af6f175a 100644 --- a/src/test/regress/expected/coordinator_shouldhaveshards.out +++ b/src/test/regress/expected/coordinator_shouldhaveshards.out @@ -37,8 +37,6 @@ SET client_min_messages TO LOG; SET citus.log_local_commands TO ON; -- INSERT..SELECT with COPY under the covers INSERT INTO test SELECT s,s FROM generate_series(2,100) s; -NOTICE: executing the copy locally for shard xxxxx -NOTICE: executing the copy locally for shard xxxxx -- router queries execute locally INSERT INTO test VALUES (1, 1); NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.test_1503000 (x, y) VALUES (1, 1) diff --git a/src/test/regress/expected/local_shard_copy.out b/src/test/regress/expected/local_shard_copy.out deleted file mode 100644 index 375f4200a..000000000 --- a/src/test/regress/expected/local_shard_copy.out +++ /dev/null @@ -1,494 +0,0 @@ -CREATE SCHEMA local_shard_copy; -SET search_path TO local_shard_copy; -SET client_min_messages TO DEBUG; -SET citus.next_shard_id TO 1570000; -SELECT * FROM master_add_node('localhost', :master_port, groupid := 0); -DEBUG: schema "public" already exists, skipping -DETAIL: NOTICE from localhost:xxxxx -DEBUG: extension "plpgsql" already exists, skipping -DETAIL: NOTICE from localhost:xxxxx -DEBUG: schema "citus_mx_test_schema" already exists, skipping -DETAIL: NOTICE from localhost:xxxxx -DEBUG: schema "citus_mx_test_schema_join_1" already exists, skipping -DETAIL: NOTICE from localhost:xxxxx -DEBUG: schema "citus_mx_test_schema_join_2" already exists, skipping -DETAIL: NOTICE from localhost:xxxxx -DEBUG: schema "citus_mx_schema_for_xacts" already exists, skipping -DETAIL: NOTICE from localhost:xxxxx -NOTICE: Replicating reference table "customer_mx" to the node localhost:xxxxx -NOTICE: Replicating reference table "nation_mx" to the node 
localhost:xxxxx -NOTICE: Replicating reference table "part_mx" to the node localhost:xxxxx -NOTICE: Replicating reference table "supplier_mx" to the node localhost:xxxxx - master_add_node ---------------------------------------------------------------------- - 32 -(1 row) - -SET citus.shard_count TO 4; -SET citus.shard_replication_factor TO 1; -SET citus.replication_model TO 'streaming'; -CREATE TABLE reference_table (key int PRIMARY KEY); -DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "reference_table_pkey" for table "reference_table" -DEBUG: building index "reference_table_pkey" on table "reference_table" serially -SELECT create_reference_table('reference_table'); - create_reference_table ---------------------------------------------------------------------- - -(1 row) - -CREATE TABLE distributed_table (key int PRIMARY KEY, age bigint CHECK (age >= 10)); -DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "distributed_table_pkey" for table "distributed_table" -DEBUG: building index "distributed_table_pkey" on table "distributed_table" serially -SELECT create_distributed_table('distributed_table','key'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -INSERT INTO distributed_table SELECT *,* FROM generate_series(20, 40); -DEBUG: distributed INSERT ... SELECT can only select from distributed tables -DEBUG: Collecting INSERT ... SELECT results on coordinator -INSERT INTO reference_table SELECT * FROM generate_series(1, 10); -DEBUG: distributed INSERT ... SELECT can only select from distributed tables -DEBUG: Collecting INSERT ... SELECT results on coordinator -CREATE TABLE local_table (key int PRIMARY KEY); -DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "local_table_pkey" for table "local_table" -DEBUG: building index "local_table_pkey" on table "local_table" serially -INSERT INTO local_table SELECT * from generate_series(1, 10); --- partitioned table -CREATE TABLE collections_list ( - key bigserial, - collection_id integer -) PARTITION BY LIST (collection_id ); -DEBUG: CREATE TABLE will create implicit sequence "collections_list_key_seq" for serial column "collections_list.key" -SELECT create_distributed_table('collections_list', 'key'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -CREATE TABLE collections_list_0 - PARTITION OF collections_list (key, collection_id) - FOR VALUES IN ( 0 ); -CREATE TABLE collections_list_1 - PARTITION OF collections_list (key, collection_id) - FOR VALUES IN ( 1 ); --- connection worker and get ready for the tests -\c - - - :worker_1_port -SET search_path TO local_shard_copy; -SET citus.log_local_commands TO ON; --- returns true of the distribution key filter --- on the distributed tables (e.g., WHERE key = 1), we'll hit a shard --- placement which is local to this not -CREATE OR REPLACE FUNCTION shard_of_distribution_column_is_local(dist_key int) RETURNS bool AS $$ - - DECLARE shard_is_local BOOLEAN := FALSE; - - BEGIN - - WITH local_shard_ids AS (SELECT get_shard_id_for_distribution_column('local_shard_copy.distributed_table', dist_key)), - all_local_shard_ids_on_node AS (SELECT shardid FROM pg_dist_placement WHERE groupid IN (SELECT groupid FROM pg_dist_local_group)) - SELECT - true INTO shard_is_local - FROM - local_shard_ids - WHERE - get_shard_id_for_distribution_column IN (SELECT * FROM all_local_shard_ids_on_node); - - IF shard_is_local IS NULL THEN - shard_is_local = FALSE; - END IF; 
- - RETURN shard_is_local; - END; -$$ LANGUAGE plpgsql; --- pick some example values that reside on the shards locally and remote --- distribution key values of 1,6, 500 and 701 are LOCAL to shards, --- we'll use these values in the tests -SELECT shard_of_distribution_column_is_local(1); - shard_of_distribution_column_is_local ---------------------------------------------------------------------- - t -(1 row) - -SELECT shard_of_distribution_column_is_local(6); - shard_of_distribution_column_is_local ---------------------------------------------------------------------- - t -(1 row) - -SELECT shard_of_distribution_column_is_local(500); - shard_of_distribution_column_is_local ---------------------------------------------------------------------- - t -(1 row) - -SELECT shard_of_distribution_column_is_local(701); - shard_of_distribution_column_is_local ---------------------------------------------------------------------- - t -(1 row) - --- distribution key values of 11 and 12 are REMOTE to shards -SELECT shard_of_distribution_column_is_local(11); - shard_of_distribution_column_is_local ---------------------------------------------------------------------- - f -(1 row) - -SELECT shard_of_distribution_column_is_local(12); - shard_of_distribution_column_is_local ---------------------------------------------------------------------- - f -(1 row) - -BEGIN; - -- run select with local execution - SELECT count(*) FROM distributed_table WHERE key = 1; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) - count ---------------------------------------------------------------------- - 0 -(1 row) - - SELECT count(*) FROM distributed_table; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true - count ---------------------------------------------------------------------- - 21 -(1 row) - - -- the local placements should be executed locally - COPY distributed_table FROM STDIN WITH delimiter ','; -NOTICE: executing the copy locally for shard xxxxx -CONTEXT: COPY distributed_table, line 1: "1, 100" - -- verify that the copy is successful. 
- SELECT count(*) FROM distributed_table; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true - count ---------------------------------------------------------------------- - 26 -(1 row) - -ROLLBACK; -BEGIN; - -- run select with local execution - SELECT count(*) FROM distributed_table WHERE key = 1; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) - count ---------------------------------------------------------------------- - 0 -(1 row) - - SELECT count(*) FROM distributed_table; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true - count ---------------------------------------------------------------------- - 21 -(1 row) - - -- the local placements should be executed locally - COPY distributed_table FROM STDIN WITH delimiter ','; -NOTICE: executing the copy locally for shard xxxxx -CONTEXT: COPY distributed_table, line 1: "1, 100" - -- verify the put ages. - SELECT * FROM distributed_table; -NOTICE: executing the command locally: SELECT key, age FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true -NOTICE: executing the command locally: SELECT key, age FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true - key | age ---------------------------------------------------------------------- - 20 | 20 - 24 | 24 - 25 | 25 - 26 | 26 - 31 | 31 - 33 | 33 - 35 | 35 - 1 | 100 - 5 | 500 - 21 | 21 - 28 | 28 - 34 | 34 - 38 | 38 - 39 | 39 - 36 | 36 - 37 | 37 - 40 | 40 - 3 | 300 - 4 | 400 - 22 | 22 - 23 | 23 - 27 | 27 - 29 | 29 - 30 | 30 - 32 | 32 - 2 | 200 -(26 rows) - -ROLLBACK; -BEGIN; - -- run select with local execution - SELECT count(*) FROM distributed_table WHERE key = 1; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) - count ---------------------------------------------------------------------- - 0 -(1 row) - - SELECT count(*) FROM distributed_table; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true - count ---------------------------------------------------------------------- - 21 -(1 row) - - -- the local placements should be executed locally - COPY distributed_table FROM STDIN WITH delimiter ','; -NOTICE: executing the copy locally for shard xxxxx -CONTEXT: COPY distributed_table, line 1: "1, 100" - -- verify that the copy is successful. 
- SELECT count(*) FROM distributed_table; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true - count ---------------------------------------------------------------------- - 26 -(1 row) - -ROLLBACK; -BEGIN; - -- run select with local execution - SELECT age FROM distributed_table WHERE key = 1; -NOTICE: executing the command locally: SELECT age FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) - age ---------------------------------------------------------------------- -(0 rows) - - SELECT count(*) FROM collections_list; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.collections_list_1570005 collections_list WHERE true -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.collections_list_1570007 collections_list WHERE true - count ---------------------------------------------------------------------- - 0 -(1 row) - - -- the local placements should be executed locally - COPY collections_list FROM STDIN WITH delimiter ','; -NOTICE: executing the copy locally for shard xxxxx -CONTEXT: COPY collections_list, line 1: "1, 0" - -- verify that the copy is successful. - SELECT count(*) FROM collections_list; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.collections_list_1570005 collections_list WHERE true -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.collections_list_1570007 collections_list WHERE true - count ---------------------------------------------------------------------- - 5 -(1 row) - -ROLLBACK; -BEGIN; - -- run select with local execution - SELECT age FROM distributed_table WHERE key = 1; -NOTICE: executing the command locally: SELECT age FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) - age ---------------------------------------------------------------------- -(0 rows) - - SELECT count(*) FROM distributed_table; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true - count ---------------------------------------------------------------------- - 21 -(1 row) - - -- the local placements should be executed locally - COPY distributed_table FROM STDIN WITH delimiter ','; -NOTICE: executing the copy locally for shard xxxxx -CONTEXT: COPY distributed_table, line 1: "1, 100" - -- verify that the copy is successful. - SELECT count(*) FROM distributed_table; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true - count ---------------------------------------------------------------------- - 26 -(1 row) - -ROLLBACK; -BEGIN; --- Since we are in a transaction, the copy should be locally executed. 
-COPY distributed_table FROM STDIN WITH delimiter ','; -NOTICE: executing the copy locally for shard xxxxx -CONTEXT: COPY distributed_table, line 1: "1, 100" -ROLLBACK; --- Since we are not in a transaction, the copy should not be locally executed. -COPY distributed_table FROM STDIN WITH delimiter ','; -BEGIN; --- Since we are in a transaction, the copy should be locally executed. But --- we are putting duplicate key, so it should error. -COPY distributed_table FROM STDIN WITH delimiter ','; -NOTICE: executing the copy locally for shard xxxxx -CONTEXT: COPY distributed_table, line 1: "1, 100" -ERROR: duplicate key value violates unique constraint "distributed_table_pkey_1570001" -DETAIL: Key (key)=(1) already exists. -CONTEXT: COPY distributed_table_1570001, line 1 -ROLLBACK; -TRUNCATE distributed_table; -BEGIN; --- insert a lot of data ( around 8MB), --- this should use local copy and it will exceed the LOCAL_COPY_FLUSH_THRESHOLD (512KB) -INSERT INTO distributed_table SELECT * , * FROM generate_series(20, 1000000); -NOTICE: executing the copy locally for shard xxxxx -NOTICE: executing the copy locally for shard xxxxx -ROLLBACK; -COPY distributed_table FROM STDIN WITH delimiter ','; -ERROR: new row for relation "distributed_table_1570001" violates check constraint "distributed_table_age_check" -DETAIL: Failing row contains (1, 9). -BEGIN; --- Since we are in a transaction, the execution will be local, however we are putting invalid age. --- The constaints should give an error -COPY distributed_table FROM STDIN WITH delimiter ','; -NOTICE: executing the copy locally for shard xxxxx -CONTEXT: COPY distributed_table, line 1: "1,9" -ERROR: new row for relation "distributed_table_1570001" violates check constraint "distributed_table_age_check" -DETAIL: Failing row contains (1, 9). 
-CONTEXT: COPY distributed_table_1570001, line 1 -ROLLBACK; -TRUNCATE distributed_table; --- different delimiters -BEGIN; --- run select with local execution -SELECT count(*) FROM distributed_table WHERE key = 1; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) - count ---------------------------------------------------------------------- - 0 -(1 row) - --- initial size -SELECT count(*) FROM distributed_table; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true - count ---------------------------------------------------------------------- - 0 -(1 row) - -COPY distributed_table FROM STDIN WITH delimiter '|'; -NOTICE: executing the copy locally for shard xxxxx -CONTEXT: COPY distributed_table, line 1: "1|10" --- new size -SELECT count(*) FROM distributed_table; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true - count ---------------------------------------------------------------------- - 3 -(1 row) - -ROLLBACK; -BEGIN; --- run select with local execution -SELECT count(*) FROM distributed_table WHERE key = 1; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) - count ---------------------------------------------------------------------- - 0 -(1 row) - --- initial size -SELECT count(*) FROM distributed_table; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true - count ---------------------------------------------------------------------- - 0 -(1 row) - -COPY distributed_table FROM STDIN WITH delimiter '['; -NOTICE: executing the copy locally for shard xxxxx -CONTEXT: COPY distributed_table, line 1: "1[10" --- new size -SELECT count(*) FROM distributed_table; -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE true -NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_copy.distributed_table_1570003 distributed_table WHERE true - count ---------------------------------------------------------------------- - 3 -(1 row) - -ROLLBACK; --- multiple local copies -BEGIN; -COPY distributed_table FROM STDIN WITH delimiter ','; -NOTICE: executing the copy locally for shard xxxxx -CONTEXT: COPY distributed_table, line 1: "1,15" -COPY distributed_table FROM STDIN WITH delimiter ','; -NOTICE: executing the copy locally for shard xxxxx -CONTEXT: COPY distributed_table, line 1: "10,15" -COPY distributed_table FROM STDIN WITH delimiter ','; -NOTICE: executing the copy locally for shard xxxxx -CONTEXT: COPY distributed_table, line 1: "100,15" -NOTICE: executing the copy locally for shard xxxxx -CONTEXT: COPY distributed_table, line 2: "200,20" -ROLLBACK; --- local copy followed by 
local copy should see the changes --- and error since it is a duplicate primary key. -BEGIN; -COPY distributed_table FROM STDIN WITH delimiter ','; -NOTICE: executing the copy locally for shard xxxxx -CONTEXT: COPY distributed_table, line 1: "1,15" -COPY distributed_table FROM STDIN WITH delimiter ','; -NOTICE: executing the copy locally for shard xxxxx -CONTEXT: COPY distributed_table, line 1: "1,16" -ERROR: duplicate key value violates unique constraint "distributed_table_pkey_1570001" -DETAIL: Key (key)=(1) already exists. -CONTEXT: COPY distributed_table_1570001, line 1 -ROLLBACK; --- local copy followed by local copy should see the changes -BEGIN; -COPY distributed_table FROM STDIN WITH delimiter ','; -NOTICE: executing the copy locally for shard xxxxx -CONTEXT: COPY distributed_table, line 1: "1,15" --- select should see the change -SELECT key FROM distributed_table WHERE key = 1; -NOTICE: executing the command locally: SELECT key FROM local_shard_copy.distributed_table_1570001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) - key ---------------------------------------------------------------------- - 1 -(1 row) - -ROLLBACK; -\c - - - :master_port -SET search_path TO local_shard_copy; -SET citus.log_local_commands TO ON; -TRUNCATE TABLE reference_table; -NOTICE: executing the command locally: TRUNCATE TABLE local_shard_copy.reference_table_xxxxx CASCADE -TRUNCATE TABLE local_table; -SELECT count(*) FROM reference_table, local_table WHERE reference_table.key = local_table.key; - count ---------------------------------------------------------------------- - 0 -(1 row) - -SET citus.enable_local_execution = 'on'; -BEGIN; --- copy should be executed locally -COPY reference_table FROM STDIN; -NOTICE: executing the copy locally for shard xxxxx -CONTEXT: COPY reference_table, line 1: "1" -ROLLBACK; -SET citus.enable_local_execution = 'off'; -BEGIN; --- copy should not be executed locally as citus.enable_local_execution = off -COPY reference_table FROM STDIN; -ROLLBACK; -SET citus.enable_local_execution = 'on'; -SET client_min_messages TO ERROR; -SET search_path TO public; -DROP SCHEMA local_shard_copy CASCADE; diff --git a/src/test/regress/expected/local_shard_execution.out b/src/test/regress/expected/local_shard_execution.out index 9dbf9975a..e926dd31e 100644 --- a/src/test/regress/expected/local_shard_execution.out +++ b/src/test/regress/expected/local_shard_execution.out @@ -276,8 +276,6 @@ RETURNING *; -- that's why it is disallowed to use local execution even if the SELECT -- can be executed locally INSERT INTO distributed_table SELECT * FROM distributed_table WHERE key = 1 OFFSET 0 ON CONFLICT DO NOTHING; -NOTICE: executing the command locally: SELECT key, value, age FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) OFFSET 0 -NOTICE: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) SELECT key, value, age FROM read_intermediate_result('insert_select_XXX_1470001'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, age bigint) ON CONFLICT DO NOTHING INSERT INTO distributed_table SELECT 1, '1',15 FROM distributed_table WHERE key = 2 LIMIT 1 ON CONFLICT DO NOTHING; -- sanity check: multi-shard INSERT..SELECT pushdown goes through distributed execution INSERT INTO distributed_table SELECT * FROM distributed_table ON CONFLICT DO NOTHING; @@ -509,7 +507,28 @@ NOTICE: truncate cascades to table "second_distributed_table" 
ROLLBACK; -- load some data so that foreign keys won't complain with the next tests INSERT INTO reference_table SELECT i FROM generate_series(500, 600) i; -NOTICE: executing the copy locally for shard xxxxx +-- a very similar set of operation, but this time use +-- COPY as the first command +BEGIN; + INSERT INTO distributed_table SELECT i, i::text, i % 10 + 25 FROM generate_series(500, 600) i; + -- this could go through local execution, but doesn't because we've already + -- done distributed execution + SELECT * FROM distributed_table WHERE key = 500 ORDER BY 1,2,3; + key | value | age +--------------------------------------------------------------------- + 500 | 500 | 25 +(1 row) + + -- utility commands could still use distributed execution + TRUNCATE distributed_table CASCADE; +NOTICE: truncate cascades to table "second_distributed_table" + -- ensure that TRUNCATE made it + SELECT * FROM distributed_table WHERE key = 500 ORDER BY 1,2,3; + key | value | age +--------------------------------------------------------------------- +(0 rows) + +ROLLBACK; -- show that cascading foreign keys just works fine with local execution BEGIN; INSERT INTO reference_table VALUES (701); @@ -599,8 +618,25 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar 0 (1 row) - INSERT INTO distributed_table (key) SELECT i FROM generate_series(1,1) i; -NOTICE: executing the copy locally for shard xxxxx + -- even no need to supply any data + COPY distributed_table FROM STDIN WITH CSV; +ERROR: cannot execute command because a local execution has accessed a placement in the transaction +DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally +HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;" +ROLLBACK; +-- a local query is followed by a command that cannot be executed locally +BEGIN; + SELECT count(*) FROM distributed_table WHERE key = 1; +NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) + count +--------------------------------------------------------------------- + 0 +(1 row) + + INSERT INTO distributed_table (key) SELECT i FROM generate_series(1,10)i; +ERROR: cannot execute command because a local execution has accessed a placement in the transaction +DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally +HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;" ROLLBACK; -- a local query is followed by a command that cannot be executed locally BEGIN; @@ -1313,10 +1349,7 @@ NOTICE: truncate cascades to table "second_distributed_table_xxxxx" NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.second_distributed_table_xxxxx CASCADE NOTICE: executing the command locally: TRUNCATE TABLE local_shard_execution.second_distributed_table_xxxxx CASCADE INSERT INTO reference_table SELECT i FROM generate_series(500, 600) i; -NOTICE: executing the copy locally for shard xxxxx INSERT INTO distributed_table SELECT i, i::text, i % 10 + 25 FROM generate_series(500, 600) i; -NOTICE: executing the copy locally for shard xxxxx -NOTICE: executing the copy locally for shard xxxxx -- show that both local, and mixed local-distributed executions -- calculate rows processed correctly BEGIN; diff --git a/src/test/regress/expected/locally_execute_intermediate_results.out 
b/src/test/regress/expected/locally_execute_intermediate_results.out index c235ed8ef..d245c324b 100644 --- a/src/test/regress/expected/locally_execute_intermediate_results.out +++ b/src/test/regress/expected/locally_execute_intermediate_results.out @@ -35,7 +35,6 @@ SELECT create_reference_table('ref_table'); INSERT INTO table_1 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'); INSERT INTO table_2 VALUES (3, '3'), (4, '4'), (5, '5'), (6, '6'); INSERT INTO ref_table VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5'), (6, '6'); -NOTICE: executing the command locally: INSERT INTO locally_execute_intermediate_results.ref_table_1580008 AS citus_table_alias (key, value) VALUES (1,'1'::text), (2,'2'::text), (3,'3'::text), (4,'4'::text), (5,'5'::text), (6,'6'::text) -- prevent PG 11 and PG 12 outputs from diverging -- and have a lot more CTEs recursively planned for the -- sake of increasing the test coverage @@ -271,7 +270,6 @@ key FROM a JOIN ref_table USING (key) GROUP BY key HAVING (max(ref_table.value) <= (SELECT value FROM a)); -NOTICE: executing the command locally: WITH a AS (SELECT max(ref_table_1.key) AS key, max(ref_table_1.value) AS value FROM locally_execute_intermediate_results.ref_table_1580008 ref_table_1) SELECT count(*) AS count, a.key FROM (a JOIN locally_execute_intermediate_results.ref_table_1580008 ref_table(key, value) USING (key)) GROUP BY a.key HAVING (max(ref_table.value) OPERATOR(pg_catalog.<=) (SELECT a_1.value FROM a a_1)) count | key --------------------------------------------------------------------- 1 | 6 @@ -330,9 +328,7 @@ DEBUG: Subplan XXX_2 will be written to local file NOTICE: executing the command locally: SELECT key, value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_1 DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx -DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx NOTICE: executing the command locally: SELECT max(key) AS key FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) cte_2 -NOTICE: executing the command locally: SELECT cte_3.key, ref_table.value FROM ((SELECT intermediate_result.key FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(key integer)) cte_3 JOIN locally_execute_intermediate_results.ref_table_1580008 ref_table(key, value) USING (key)) key | value --------------------------------------------------------------------- 4 | 4 @@ -771,7 +767,6 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT cte_3.key, re DEBUG: Subplan XXX_1 will be written to local file DEBUG: Subplan XXX_2 will be written to local file DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx -DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx DEBUG: Subplan XXX_3 will be sent to localhost:xxxxx key | value --------------------------------------------------------------------- diff --git a/src/test/regress/expected/master_evaluation_modify.out b/src/test/regress/expected/master_evaluation_modify.out index 5571bce2e..a8123b94c 100644 --- a/src/test/regress/expected/master_evaluation_modify.out +++ b/src/test/regress/expected/master_evaluation_modify.out @@ -951,7 +951,6 @@ NOTICE: executing the command locally: DELETE FROM master_evaluation_combinatio (1 row) INSERT INTO user_info_data
SELECT 3, ('test', get_local_node_id_stable() > 0)::user_data, i FROM generate_series(0,7)i; -NOTICE: executing the copy locally for shard xxxxx PREPARE fast_path_router_with_param_and_func_on_non_dist_key(int) AS DELETE FROM user_info_data WHERE user_id = 3 AND user_index = $1 AND u_data = ('test', (get_local_node_id_stable() > 0)::int)::user_data RETURNING user_id, user_index; EXECUTE fast_path_router_with_param_and_func_on_non_dist_key(0); @@ -1439,7 +1438,6 @@ NOTICE: executing the command locally: DELETE FROM master_evaluation_combinatio (1 row) INSERT INTO user_info_data SELECT 3, ('test', get_local_node_id_stable() > 0)::user_data, i FROM generate_series(0,7)i; -NOTICE: executing the copy locally for shard xxxxx PREPARE router_with_param_and_func_on_non_dist_key(int) AS DELETE FROM user_info_data WHERE user_id = 3 AND user_id = 3 AND user_index = $1 AND u_data = ('test', (get_local_node_id_stable() > 0)::int)::user_data RETURNING user_id, user_index; EXECUTE router_with_param_and_func_on_non_dist_key(0); diff --git a/src/test/regress/expected/multi_mx_insert_select_repartition.out b/src/test/regress/expected/multi_mx_insert_select_repartition.out index 469b08b3a..d61e046f2 100644 --- a/src/test/regress/expected/multi_mx_insert_select_repartition.out +++ b/src/test/regress/expected/multi_mx_insert_select_repartition.out @@ -102,9 +102,9 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM multi_mx_i (1 row) insert into target_table SELECT a FROM source_table LIMIT 10; -NOTICE: executing the command locally: SELECT a FROM multi_mx_insert_select_repartition.source_table_4213581 source_table WHERE true LIMIT '10'::bigint -NOTICE: executing the command locally: SELECT a FROM multi_mx_insert_select_repartition.source_table_4213583 source_table WHERE true LIMIT '10'::bigint -NOTICE: executing the copy locally for shard xxxxx +ERROR: cannot execute command because a local execution has accessed a placement in the transaction +DETAIL: Some parallel commands cannot be executed if a previous command has already been executed locally +HINT: Try re-running the transaction with "SET LOCAL citus.enable_local_execution TO OFF;" ROLLBACK; \c - - - :master_port SET search_path TO multi_mx_insert_select_repartition; diff --git a/src/test/regress/expected/multi_mx_transaction_recovery.out b/src/test/regress/expected/multi_mx_transaction_recovery.out index d108c61ec..6ef2ba445 100644 --- a/src/test/regress/expected/multi_mx_transaction_recovery.out +++ b/src/test/regress/expected/multi_mx_transaction_recovery.out @@ -142,7 +142,7 @@ INSERT INTO test_recovery (x) SELECT 'hello-'||s FROM generate_series(1,100) s; SELECT count(*) FROM pg_dist_transaction; count --------------------------------------------------------------------- - 2 + 4 (1 row) SELECT recover_prepared_transactions(); diff --git a/src/test/regress/expected/multi_mx_truncate_from_worker.out b/src/test/regress/expected/multi_mx_truncate_from_worker.out index 9825cd69b..abb59b761 100644 --- a/src/test/regress/expected/multi_mx_truncate_from_worker.out +++ b/src/test/regress/expected/multi_mx_truncate_from_worker.out @@ -118,8 +118,6 @@ INSERT INTO "refer'ence_table" SELECT i FROM generate_series(0, 100) i; SET search_path TO 'truncate_from_workers'; -- make sure that DMLs and SELECTs work fine along with TRUNCATE on the worker BEGIN; - -- we can enable local execution when truncate can be executed locally.
- SET citus.enable_local_execution = 'off'; INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i; SELECT count(*) FROM on_update_fkey_table; count diff --git a/src/test/regress/expected/multi_replicate_reference_table.out b/src/test/regress/expected/multi_replicate_reference_table.out index e23342e00..714e8353e 100644 --- a/src/test/regress/expected/multi_replicate_reference_table.out +++ b/src/test/regress/expected/multi_replicate_reference_table.out @@ -499,13 +499,11 @@ SELECT create_reference_table('replicate_reference_table_copy'); (1 row) -SET citus.enable_local_execution = 'off'; BEGIN; COPY replicate_reference_table_copy FROM STDIN; SELECT 1 FROM master_add_node('localhost', :worker_2_port); ERROR: cannot open new connections after the first modification command within a transaction ROLLBACK; -RESET citus.enable_local_execution; DROP TABLE replicate_reference_table_copy; -- test executing DDL command then adding a new node in a transaction CREATE TABLE replicate_reference_table_ddl(column1 int); diff --git a/src/test/regress/expected/replicate_reference_tables_to_coordinator.out b/src/test/regress/expected/replicate_reference_tables_to_coordinator.out index f0fb28056..c0292606c 100644 --- a/src/test/regress/expected/replicate_reference_tables_to_coordinator.out +++ b/src/test/regress/expected/replicate_reference_tables_to_coordinator.out @@ -17,7 +17,6 @@ SELECT create_reference_table('squares'); (1 row) INSERT INTO squares SELECT i, i * i FROM generate_series(1, 10) i; -NOTICE: executing the copy locally for shard xxxxx -- should be executed locally SELECT count(*) FROM squares; NOTICE: executing the command locally: SELECT count(*) AS count FROM replicate_ref_to_coordinator.squares_8000000 squares diff --git a/src/test/regress/multi_mx_schedule b/src/test/regress/multi_mx_schedule index 6c9a6c01b..d3083367f 100644 --- a/src/test/regress/multi_mx_schedule +++ b/src/test/regress/multi_mx_schedule @@ -39,7 +39,7 @@ test: multi_mx_metadata test: master_evaluation master_evaluation_modify master_evaluation_select test: multi_mx_call test: multi_mx_function_call_delegation -test: multi_mx_modifications local_shard_execution local_shard_copy +test: multi_mx_modifications local_shard_execution test: multi_mx_transaction_recovery test: multi_mx_modifying_xacts test: multi_mx_explain diff --git a/src/test/regress/sql/local_shard_copy.sql b/src/test/regress/sql/local_shard_copy.sql deleted file mode 100644 index 28bdffd40..000000000 --- a/src/test/regress/sql/local_shard_copy.sql +++ /dev/null @@ -1,346 +0,0 @@ -CREATE SCHEMA local_shard_copy; -SET search_path TO local_shard_copy; - -SET client_min_messages TO DEBUG; -SET citus.next_shard_id TO 1570000; - -SELECT * FROM master_add_node('localhost', :master_port, groupid := 0); - -SET citus.shard_count TO 4; -SET citus.shard_replication_factor TO 1; -SET citus.replication_model TO 'streaming'; - - -CREATE TABLE reference_table (key int PRIMARY KEY); -SELECT create_reference_table('reference_table'); - -CREATE TABLE distributed_table (key int PRIMARY KEY, age bigint CHECK (age >= 10)); -SELECT create_distributed_table('distributed_table','key'); - -INSERT INTO distributed_table SELECT *,* FROM generate_series(20, 40); -INSERT INTO reference_table SELECT * FROM generate_series(1, 10); - -CREATE TABLE local_table (key int PRIMARY KEY); -INSERT INTO local_table SELECT * from generate_series(1, 10); - --- partitioned table -CREATE TABLE collections_list ( - key bigserial, - collection_id integer -) PARTITION BY LIST 
(collection_id); - -SELECT create_distributed_table('collections_list', 'key'); - -CREATE TABLE collections_list_0 - PARTITION OF collections_list (key, collection_id) - FOR VALUES IN ( 0 ); - -CREATE TABLE collections_list_1 - PARTITION OF collections_list (key, collection_id) - FOR VALUES IN ( 1 ); - - --- connect to the worker and get ready for the tests -\c - - - :worker_1_port - -SET search_path TO local_shard_copy; -SET citus.log_local_commands TO ON; - --- returns true if, with the distribution key filter --- on the distributed table (e.g., WHERE key = 1), we'll hit a shard --- placement which is local to this node -CREATE OR REPLACE FUNCTION shard_of_distribution_column_is_local(dist_key int) RETURNS bool AS $$ - - DECLARE shard_is_local BOOLEAN := FALSE; - - BEGIN - - WITH local_shard_ids AS (SELECT get_shard_id_for_distribution_column('local_shard_copy.distributed_table', dist_key)), - all_local_shard_ids_on_node AS (SELECT shardid FROM pg_dist_placement WHERE groupid IN (SELECT groupid FROM pg_dist_local_group)) - SELECT - true INTO shard_is_local - FROM - local_shard_ids - WHERE - get_shard_id_for_distribution_column IN (SELECT * FROM all_local_shard_ids_on_node); - - IF shard_is_local IS NULL THEN - shard_is_local = FALSE; - END IF; - - RETURN shard_is_local; - END; -$$ LANGUAGE plpgsql; - --- pick some example values whose shards reside locally and remotely - --- distribution key values of 1, 6, 500 and 701 are LOCAL to shards, --- we'll use these values in the tests -SELECT shard_of_distribution_column_is_local(1); -SELECT shard_of_distribution_column_is_local(6); -SELECT shard_of_distribution_column_is_local(500); -SELECT shard_of_distribution_column_is_local(701); - --- distribution key values of 11 and 12 are REMOTE to shards -SELECT shard_of_distribution_column_is_local(11); -SELECT shard_of_distribution_column_is_local(12); - -BEGIN; - -- run select with local execution - SELECT count(*) FROM distributed_table WHERE key = 1; - - SELECT count(*) FROM distributed_table; - -- the local placements should be executed locally - COPY distributed_table FROM STDIN WITH delimiter ','; -1, 100 -2, 200 -3, 300 -4, 400 -5, 500 -\. - -- verify that the copy is successful. - SELECT count(*) FROM distributed_table; - -ROLLBACK; - -BEGIN; - -- run select with local execution - SELECT count(*) FROM distributed_table WHERE key = 1; - - SELECT count(*) FROM distributed_table; - -- the local placements should be executed locally - COPY distributed_table FROM STDIN WITH delimiter ','; -1, 100 -2, 200 -3, 300 -4, 400 -5, 500 -\. - -- verify the inserted ages. - SELECT * FROM distributed_table; - -ROLLBACK; - - -BEGIN; - -- run select with local execution - SELECT count(*) FROM distributed_table WHERE key = 1; - - SELECT count(*) FROM distributed_table; - -- the local placements should be executed locally - COPY distributed_table FROM STDIN WITH delimiter ','; -1, 100 -2, 200 -3, 300 -4, 400 -5, 500 -\. - -- verify that the copy is successful. - SELECT count(*) FROM distributed_table; - -ROLLBACK; - -BEGIN; - -- run select with local execution - SELECT age FROM distributed_table WHERE key = 1; - - SELECT count(*) FROM collections_list; - -- the local placements should be executed locally - COPY collections_list FROM STDIN WITH delimiter ','; -1, 0 -2, 0 -3, 0 -4, 1 -5, 1 -\. - -- verify that the copy is successful.
- SELECT count(*) FROM collections_list; - -ROLLBACK; - -BEGIN; - -- run select with local execution - SELECT age FROM distributed_table WHERE key = 1; - - SELECT count(*) FROM distributed_table; - -- the local placements should be executed locally - COPY distributed_table FROM STDIN WITH delimiter ','; -1, 100 -2, 200 -3, 300 -4, 400 -5, 500 -\. - - - -- verify that the copy is successful. - SELECT count(*) FROM distributed_table; - -ROLLBACK; - -BEGIN; --- Since we are in a transaction, the copy should be locally executed. -COPY distributed_table FROM STDIN WITH delimiter ','; -1, 100 -2, 200 -3, 300 -4, 400 -5, 500 -\. -ROLLBACK; - --- Since we are not in a transaction, the copy should not be locally executed. -COPY distributed_table FROM STDIN WITH delimiter ','; -1, 100 -2, 200 -3, 300 -4, 400 -5, 500 -\. - -BEGIN; --- Since we are in a transaction, the copy should be locally executed. But --- we are inserting a duplicate key, so it should error. -COPY distributed_table FROM STDIN WITH delimiter ','; -1, 100 -2, 200 -3, 300 -4, 400 -5, 500 -\. -ROLLBACK; - -TRUNCATE distributed_table; - -BEGIN; - --- insert a lot of data (around 8MB), --- this should use local copy and will exceed the LOCAL_COPY_FLUSH_THRESHOLD (512KB) -INSERT INTO distributed_table SELECT * , * FROM generate_series(20, 1000000); - -ROLLBACK; - -COPY distributed_table FROM STDIN WITH delimiter ','; -1, 9 -\. - -BEGIN; --- Since we are in a transaction, the execution will be local; however, we are inserting an invalid age. --- The constraints should give an error -COPY distributed_table FROM STDIN WITH delimiter ','; -1,9 -\. -ROLLBACK; - -TRUNCATE distributed_table; - - --- different delimiters -BEGIN; --- run select with local execution -SELECT count(*) FROM distributed_table WHERE key = 1; --- initial size -SELECT count(*) FROM distributed_table; -COPY distributed_table FROM STDIN WITH delimiter '|'; -1|10 -2|30 -3|40 -\. --- new size -SELECT count(*) FROM distributed_table; -ROLLBACK; - -BEGIN; --- run select with local execution -SELECT count(*) FROM distributed_table WHERE key = 1; --- initial size -SELECT count(*) FROM distributed_table; -COPY distributed_table FROM STDIN WITH delimiter '['; -1[10 -2[30 -3[40 -\. --- new size -SELECT count(*) FROM distributed_table; -ROLLBACK; - - --- multiple local copies -BEGIN; -COPY distributed_table FROM STDIN WITH delimiter ','; -1,15 -2,20 -3,30 -\. -COPY distributed_table FROM STDIN WITH delimiter ','; -10,15 -20,20 -30,30 -\. -COPY distributed_table FROM STDIN WITH delimiter ','; -100,15 -200,20 -300,30 -\. -ROLLBACK; - --- local copy followed by local copy should see the changes --- and error since it is a duplicate primary key. -BEGIN; -COPY distributed_table FROM STDIN WITH delimiter ','; -1,15 -\. -COPY distributed_table FROM STDIN WITH delimiter ','; -1,16 -\. -ROLLBACK; - - --- local copy followed by local copy should see the changes -BEGIN; -COPY distributed_table FROM STDIN WITH delimiter ','; -1,15 -\. --- select should see the change -SELECT key FROM distributed_table WHERE key = 1; -ROLLBACK; - -\c - - - :master_port - -SET search_path TO local_shard_copy; -SET citus.log_local_commands TO ON; - -TRUNCATE TABLE reference_table; -TRUNCATE TABLE local_table; - -SELECT count(*) FROM reference_table, local_table WHERE reference_table.key = local_table.key; - -SET citus.enable_local_execution = 'on'; - -BEGIN; --- copy should be executed locally -COPY reference_table FROM STDIN; -1 -2 -3 -4 -\.
-ROLLBACK; - -SET citus.enable_local_execution = 'off'; - -BEGIN; --- copy should not be executed locally as citus.enable_local_execution = off -COPY reference_table FROM STDIN; -1 -2 -3 -4 -\. -ROLLBACK; - -SET citus.enable_local_execution = 'on'; - -SET client_min_messages TO ERROR; -SET search_path TO public; -DROP SCHEMA local_shard_copy CASCADE; diff --git a/src/test/regress/sql/local_shard_execution.sql b/src/test/regress/sql/local_shard_execution.sql index 22b46abed..b5ba49caa 100644 --- a/src/test/regress/sql/local_shard_execution.sql +++ b/src/test/regress/sql/local_shard_execution.sql @@ -305,6 +305,22 @@ ROLLBACK; -- load some data so that foreign keys won't complain with the next tests INSERT INTO reference_table SELECT i FROM generate_series(500, 600) i; +-- a very similar set of operations, but this time use +-- COPY as the first command +BEGIN; + INSERT INTO distributed_table SELECT i, i::text, i % 10 + 25 FROM generate_series(500, 600) i; + + -- this could go through local execution, but doesn't because we've already + -- done distributed execution + SELECT * FROM distributed_table WHERE key = 500 ORDER BY 1,2,3; + + -- utility commands could still use distributed execution + TRUNCATE distributed_table CASCADE; + + -- ensure that TRUNCATE made it + SELECT * FROM distributed_table WHERE key = 500 ORDER BY 1,2,3; +ROLLBACK; + -- show that cascading foreign keys just works fine with local execution BEGIN; INSERT INTO reference_table VALUES (701); @@ -340,7 +356,15 @@ ROLLBACK; BEGIN; SELECT count(*) FROM distributed_table WHERE key = 1; - INSERT INTO distributed_table (key) SELECT i FROM generate_series(1,1) i; + -- no need to even supply any data + COPY distributed_table FROM STDIN WITH CSV; +ROLLBACK; + +-- a local query is followed by a command that cannot be executed locally +BEGIN; + SELECT count(*) FROM distributed_table WHERE key = 1; + + INSERT INTO distributed_table (key) SELECT i FROM generate_series(1,10)i; ROLLBACK; -- a local query is followed by a command that cannot be executed locally diff --git a/src/test/regress/sql/multi_mx_truncate_from_worker.sql b/src/test/regress/sql/multi_mx_truncate_from_worker.sql index 00492246b..aa05aa02a 100644 --- a/src/test/regress/sql/multi_mx_truncate_from_worker.sql +++ b/src/test/regress/sql/multi_mx_truncate_from_worker.sql @@ -85,8 +85,6 @@ SET search_path TO 'truncate_from_workers'; -- make sure that DMLs and SELECTs work fine along with TRUNCATE on the worker BEGIN; - -- we can enable local execution when truncate can be executed locally. - SET citus.enable_local_execution = 'off'; INSERT INTO on_update_fkey_table SELECT i, i % 100 FROM generate_series(0, 1000) i; SELECT count(*) FROM on_update_fkey_table; TRUNCATE on_update_fkey_table; diff --git a/src/test/regress/sql/multi_replicate_reference_table.sql b/src/test/regress/sql/multi_replicate_reference_table.sql index 8edfa40b7..ec476e706 100644 --- a/src/test/regress/sql/multi_replicate_reference_table.sql +++ b/src/test/regress/sql/multi_replicate_reference_table.sql @@ -323,7 +323,6 @@ DROP TABLE replicate_reference_table_insert; CREATE TABLE replicate_reference_table_copy(column1 int); SELECT create_reference_table('replicate_reference_table_copy'); -SET citus.enable_local_execution = 'off'; BEGIN; COPY replicate_reference_table_copy FROM STDIN; 1 @@ -335,8 +334,6 @@ COPY replicate_reference_table_copy FROM STDIN; SELECT 1 FROM master_add_node('localhost', :worker_2_port); ROLLBACK; -RESET citus.enable_local_execution; - DROP TABLE replicate_reference_table_copy;
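
For reviewers who want to see the behavior change this revert makes, a minimal sketch follows, written in the style of the deleted local_shard_copy tests and assuming the schema, distributed_table definition, and worker setup those tests create:

-- on a worker node that holds a local placement of distributed_table
SET citus.log_local_commands TO ON;
BEGIN;
-- executes locally against the local shard placement
SELECT count(*) FROM distributed_table WHERE key = 1;
-- before this revert, the COPY below also ran locally and emitted
--   NOTICE: executing the copy locally for shard xxxxx
-- after this revert, it instead fails with
--   ERROR: cannot execute command because a local execution has accessed a placement in the transaction
COPY distributed_table FROM STDIN WITH delimiter ',';
1,15
\.
ROLLBACK;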