diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 50ca597e3..4ee5ce7e8 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -79,6 +79,7 @@ #include "distributed/multi_physical_planner.h" #include "distributed/multi_router_planner.h" #include "distributed/multi_executor.h" +#include "distributed/listutils.h" #include "distributed/locally_reserved_shared_connections.h" #include "distributed/placement_connection.h" #include "distributed/relation_access_tracking.h" @@ -214,6 +215,18 @@ typedef struct ShardConnections } ShardConnections; +/* + * Represents the state for allowing copy via local + * execution. + */ +typedef enum LocalCopyStatus +{ + LOCAL_COPY_REQUIRED, + LOCAL_COPY_OPTIONAL, + LOCAL_COPY_DISABLED +} LocalCopyStatus; + + /* Local functions forward declarations */ static void CopyToExistingShards(CopyStmt *copyStatement, QueryCompletionCompat *completionTag); @@ -323,7 +336,9 @@ static void CompleteCopyQueryTagCompat(QueryCompletionCompat *completionTag, uin processedRowCount); static void FinishLocalCopy(CitusCopyDestReceiver *copyDest); static void CloneCopyOutStateForLocalCopy(CopyOutState from, CopyOutState to); -static bool ShouldExecuteCopyLocally(bool isIntermediateResult); +static LocalCopyStatus GetLocalCopyStatus(List *shardIntervalList, bool + isIntermediateResult); +static bool ShardIntervalListHasLocalPlacements(List *shardIntervalList); static void LogLocalCopyExecution(uint64 shardId); @@ -2076,28 +2091,29 @@ CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColu /* - * ShouldExecuteCopyLocally returns true if the current copy - * operation should be done locally for local placements. + * GetLocalCopyStatus returns the status for executing copy locally. + * If LOCAL_COPY_DISABLED or LOCAL_COPY_REQUIRED, the caller has to + * follow that. 
Else, the caller may decide to use local or remote + * execution depending on other information. */ -static bool -ShouldExecuteCopyLocally(bool isIntermediateResult) +static LocalCopyStatus +GetLocalCopyStatus(List *shardIntervalList, bool isIntermediateResult) { - if (!EnableLocalExecution) + if (!EnableLocalExecution || + GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_DISABLED) { - return false; + return LOCAL_COPY_DISABLED; } - - /* - * Intermediate files are written to a file, and files are visible to all - * transactions, and we use a custom copy format for copy therefore we will - * use the existing logic for that. - */ - if (isIntermediateResult) + else if (isIntermediateResult) { - return false; + /* + * Intermediate files are written to a file, and files are visible to all + * transactions, and we use a custom copy format for copy therefore we will + * use the existing logic for that. + */ + return LOCAL_COPY_DISABLED; } - - if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED) + else if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED) { /* * For various reasons, including the transaction visibility @@ -2116,12 +2132,35 @@ ShouldExecuteCopyLocally(bool isIntermediateResult) * those placements. That'd help to benefit more from parallelism. 
*/ - return true; + return LOCAL_COPY_REQUIRED; + } + else if (IsMultiStatementTransaction()) + { + return LOCAL_COPY_REQUIRED; } - /* if we connected to the localhost via a connection, we might not be able to see some previous changes that are done via the connection */ - return GetCurrentLocalExecutionStatus() != LOCAL_EXECUTION_DISABLED && - IsMultiStatementTransaction(); + return LOCAL_COPY_OPTIONAL; +} + + +/* + * ShardIntervalListHasLocalPlacements returns true if any of the input + * shard placement has a local placement; + */ +static bool +ShardIntervalListHasLocalPlacements(List *shardIntervalList) +{ + int32 localGroupId = GetLocalGroupId(); + ShardInterval *shardInterval = NULL; + foreach_ptr(shardInterval, shardIntervalList) + { + if (FindShardPlacementOnGroup(localGroupId, shardInterval->shardId) != NULL) + { + return true; + } + } + + return false; } @@ -2136,8 +2175,6 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, { CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) dest; - bool isIntermediateResult = copyDest->intermediateResultIdPrefix != NULL; - copyDest->shouldUseLocalCopy = ShouldExecuteCopyLocally(isIntermediateResult); Oid tableId = copyDest->distributedRelationId; char *relationName = get_rel_name(tableId); @@ -2291,13 +2328,53 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, RecordRelationAccessIfNonDistTable(tableId, PLACEMENT_ACCESS_DML); /* - * For all the primary (e.g., writable) nodes, reserve a shared connection. - * We do this upfront because we cannot know which nodes are going to be - * accessed. Since the order of the reservation is important, we need to - * do it right here. For the details on why the order important, see - * the function. + * For all the primary (e.g., writable) remote nodes, reserve a shared + * connection. We do this upfront because we cannot know which nodes + * are going to be accessed. 
Since the order of the reservation is + * important, we need to do it right here. For the details on why the + * order is important, see EnsureConnectionPossibilityForNodeList(). + * + * We don't need to care about the local node because we either get a + * connection or use local execution, so it cannot be part of + * the starvation. As an edge case, if it cannot get a connection + * and cannot switch to local execution (e.g., disabled by user), + * COPY would fail hinting the user to change the relevant setting. */ - EnsureConnectionPossibilityForPrimaryNodes(); + EnsureConnectionPossibilityForRemotePrimaryNodes(); + + bool isIntermediateResult = copyDest->intermediateResultIdPrefix != NULL; + LocalCopyStatus localCopyStatus = + GetLocalCopyStatus(shardIntervalList, isIntermediateResult); + if (localCopyStatus == LOCAL_COPY_DISABLED) + { + copyDest->shouldUseLocalCopy = false; + } + else if (localCopyStatus == LOCAL_COPY_REQUIRED) + { + copyDest->shouldUseLocalCopy = true; + } + else if (localCopyStatus == LOCAL_COPY_OPTIONAL) + { + /* + * At this point, there are no requirements for doing the copy locally. + * However, if there are local placements, we can try to reserve + * a connection to the local node. If we cannot reserve, we can still use + * local execution. + * + * NB: It is not advantageous to use remote execution just with a + * single remote connection. In other words, a single remote connection + * would not perform better than local execution. However, we prefer to + * do this because it is likely that the COPY would get more connections + * to parallelize the operation. In the future, we might relax this + * requirement and fail over to local execution on connection attempt + * failures, as the executor does.
+ */ + if (ShardIntervalListHasLocalPlacements(shardIntervalList)) + { + bool reservedConnection = TryConnectionPossibilityForLocalPrimaryNode(); + copyDest->shouldUseLocalCopy = !reservedConnection; + } + } } @@ -3424,6 +3501,7 @@ InitializeCopyShardState(CopyShardState *shardState, continue; } + if (placement->groupId == GetLocalGroupId()) { /* @@ -3445,7 +3523,6 @@ InitializeCopyShardState(CopyShardState *shardState, continue; } - CopyConnectionState *connectionState = GetConnectionState(connectionStateHash, connection); diff --git a/src/backend/distributed/connection/locally_reserved_shared_connections.c b/src/backend/distributed/connection/locally_reserved_shared_connections.c index a4bd95f4c..19bc93ae6 100644 --- a/src/backend/distributed/connection/locally_reserved_shared_connections.c +++ b/src/backend/distributed/connection/locally_reserved_shared_connections.c @@ -89,12 +89,15 @@ typedef struct ReservedConnectionHashEntry static void StoreAllReservedConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor); -static ReservedConnectionHashEntry * AllocateOrGetReservedConectionEntry(char *hostName, - int nodePort, Oid - userId, Oid - databaseOid, - bool *found); +static ReservedConnectionHashEntry * AllocateOrGetReservedConnectionEntry(char *hostName, + int nodePort, + Oid + userId, Oid + databaseOid, + bool *found); static void EnsureConnectionPossibilityForNodeList(List *nodeList); +static bool EnsureConnectionPossibilityForNode(WorkerNode *workerNode, + bool waitForConnection); static uint32 LocalConnectionReserveHashHash(const void *key, Size keysize); static int LocalConnectionReserveHashCompare(const void *a, const void *b, Size keysize); @@ -294,11 +297,11 @@ MarkReservedConnectionUsed(const char *hostName, int nodePort, Oid userId, /* - * EnsureConnectionPossibilityForPrimaryNodes is a wrapper around + * EnsureConnectionPossibilityForRemotePrimaryNodes is a wrapper around * EnsureConnectionPossibilityForNodeList. 
*/ void -EnsureConnectionPossibilityForPrimaryNodes(void) +EnsureConnectionPossibilityForRemotePrimaryNodes(void) { /* * By using NoLock there is a tiny risk of that we miss to reserve a @@ -306,17 +309,42 @@ EnsureConnectionPossibilityForPrimaryNodes(void) * seem to cause any problems as none of the placements that we are * going to access would be on the new node. */ - List *primaryNodeList = ActivePrimaryNodeList(NoLock); - + List *primaryNodeList = ActivePrimaryRemoteNodeList(NoLock); EnsureConnectionPossibilityForNodeList(primaryNodeList); } +/* + * TryConnectionPossibilityForLocalPrimaryNode returns true if the primary + * local node is in the metadata and we can reserve a connection for the node. + * If not, the function returns false. + */ +bool +TryConnectionPossibilityForLocalPrimaryNode(void) +{ + bool nodeIsInMetadata = false; + WorkerNode *localNode = + PrimaryNodeForGroup(GetLocalGroupId(), &nodeIsInMetadata); + + if (localNode == NULL) + { + /* + * If the local node is not a primary node, we should not try to + * reserve a connection as there cannot be any shards.
+ */ + return false; + } + + bool waitForConnection = false; + return EnsureConnectionPossibilityForNode(localNode, waitForConnection); +} + + /* * EnsureConnectionPossibilityForNodeList reserves a shared connection * counter per node in the nodeList unless: - * - Reservation is needed (see IsReservationPossible()) - * - there is at least one connection to the node so that we are guranteed + * - Reservation is not possible/allowed (see IsReservationPossible()) + * - there is at least one connection to the node so that we are guaranteed * to get a connection * - An earlier call already reserved a connection (e.g., we allow only a * single reservation per backend) @@ -324,11 +352,6 @@ EnsureConnectionPossibilityForPrimaryNodes(void) static void EnsureConnectionPossibilityForNodeList(List *nodeList) { - if (!IsReservationPossible()) - { - return; - } - /* * We sort the workerList because adaptive connection management * (e.g., OPTIONAL_CONNECTION) requires any concurrent executions @@ -342,62 +365,114 @@ EnsureConnectionPossibilityForNodeList(List *nodeList) */ nodeList = SortList(nodeList, CompareWorkerNodes); + WorkerNode *workerNode = NULL; + foreach_ptr(workerNode, nodeList) + { + bool waitForConnection = true; + EnsureConnectionPossibilityForNode(workerNode, waitForConnection); + } +} + + +/* + * EnsureConnectionPossibilityForNode reserves a shared connection + * counter for the given node unless: + * - Reservation is not possible/allowed (see IsReservationPossible()) + * - there is at least one connection to the node so that we are guaranteed + * to get a connection + * - An earlier call already reserved a connection (e.g., we allow only a + * single reservation per backend) + * - waitForConnection is false. When this is false, the function still tries + * to ensure connection possibility. If it fails (e.g., we + * reached max_shared_pool_size), it doesn't wait to get the connection. Instead, + * it returns false.
+ */ +static bool +EnsureConnectionPossibilityForNode(WorkerNode *workerNode, bool waitForConnection) +{ + if (!IsReservationPossible()) + { + return false; + } + char *databaseName = get_database_name(MyDatabaseId); Oid userId = GetUserId(); char *userName = GetUserNameFromId(userId, false); - WorkerNode *workerNode = NULL; - foreach_ptr(workerNode, nodeList) + if (ConnectionAvailableToNode(workerNode->workerName, workerNode->workerPort, + userName, databaseName) != NULL) { - if (ConnectionAvailableToNode(workerNode->workerName, workerNode->workerPort, - userName, databaseName) != NULL) - { - /* - * The same user has already an active connection for the node. It - * means that the execution can use the same connection, so reservation - * is not necessary. - */ - continue; - } - /* - * We are trying to be defensive here by ensuring that the required hash - * table entry can be allocated. The main goal is that we don't want to be - * in a situation where shared connection counter is incremented but not - * the local reserved counter due to out-of-memory. - * - * Note that shared connection stats operate on the shared memory, and we - * pre-allocate all the necessary memory. In other words, it would never - * throw out of memory error. + * The same user has already an active connection for the node. It + * means that the execution can use the same connection, so reservation + * is not necessary. */ - bool found = false; - ReservedConnectionHashEntry *hashEntry = - AllocateOrGetReservedConectionEntry(workerNode->workerName, - workerNode->workerPort, - userId, MyDatabaseId, &found); + return true; + } - if (found) - { - /* - * We have already reserved a connection for this user and database - * on the worker. We only allow a single reservation per - * transaction block. The reason is that the earlier command (either in - * a transaction block or a function call triggered by a single command) - * was able to reserve or establish a connection. 
That connection is - * guranteed to be avaliable for us. - */ - continue; - } + /* + * We are trying to be defensive here by ensuring that the required hash + * table entry can be allocated. The main goal is that we don't want to be + * in a situation where shared connection counter is incremented but not + * the local reserved counter due to out-of-memory. + * + * Note that shared connection stats operate on the shared memory, and we + * pre-allocate all the necessary memory. In other words, it would never + * throw out of memory error. + */ + bool found = false; + ReservedConnectionHashEntry *hashEntry = + AllocateOrGetReservedConnectionEntry(workerNode->workerName, + workerNode->workerPort, + userId, MyDatabaseId, &found); + if (found) + { + /* + * We have already reserved a connection for this user and database + * on the worker. We only allow a single reservation per + * transaction block. The reason is that the earlier command (either in + * a transaction block or a function call triggered by a single command) + * was able to reserve or establish a connection. That connection is + * guaranteed to be available for us. + */ + return true; + } + + if (waitForConnection) + { /* * Increment the shared counter, we may need to wait if there are * no space left. */ WaitLoopForSharedConnection(workerNode->workerName, workerNode->workerPort); - - /* locally mark that we have one connection reserved */ - hashEntry->usedReservation = false; } + else + { + bool incremented = + TryToIncrementSharedConnectionCounter(workerNode->workerName, + workerNode->workerPort); + if (!incremented) + { + /* + * We could not reserve a connection. First, remove the entry from the + * hash. The reason is that we allow a single reservation per transaction + * block and leaving the entry in the hash would be treated as if there were a + * reserved connection to the node.
+ */ + bool foundForRemove = false; + hash_search(SessionLocalReservedConnections, hashEntry, HASH_REMOVE, + &foundForRemove); + Assert(foundForRemove); + + return false; + } + } + + /* locally mark that we have one connection reserved */ + hashEntry->usedReservation = false; + + return true; } @@ -442,8 +517,8 @@ IsReservationPossible(void) * the entry. */ static ReservedConnectionHashEntry * -AllocateOrGetReservedConectionEntry(char *hostName, int nodePort, Oid userId, - Oid databaseOid, bool *found) +AllocateOrGetReservedConnectionEntry(char *hostName, int nodePort, Oid userId, + Oid databaseOid, bool *found) { ReservedConnectionHashKey key; diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 400fff505..c1f522ed1 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -105,6 +105,7 @@ static void SetUpDistributedTableDependencies(WorkerNode *workerNode); static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple); static void PropagateNodeWideObjects(WorkerNode *newWorkerNode); static WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort); +static bool NodeIsLocal(WorkerNode *worker); static void SetLockTimeoutLocally(int32 lock_cooldown); static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort); static bool UnsetMetadataSyncedForAll(void); @@ -695,6 +696,17 @@ GroupForNode(char *nodeName, int nodePort) } +/* + * NodeIsPrimaryAndRemote returns whether the argument represents a primary + * node that is not the local node. + */ +bool +NodeIsPrimaryAndRemote(WorkerNode *worker) +{ + return NodeIsPrimary(worker) && !NodeIsLocal(worker); +} + + /* * NodeIsPrimary returns whether the argument represents a primary node. */ @@ -713,6 +725,16 @@ NodeIsPrimary(WorkerNode *worker) } +/* + * NodeIsLocal returns whether the argument represents the local node.
+ */ +static bool +NodeIsLocal(WorkerNode *worker) +{ + return worker->groupId == GetLocalGroupId(); +} + + /* * NodeIsSecondary returns whether the argument represents a secondary node. */ diff --git a/src/backend/distributed/operations/worker_node_manager.c b/src/backend/distributed/operations/worker_node_manager.c index 57b4871c8..7fbc53e32 100644 --- a/src/backend/distributed/operations/worker_node_manager.c +++ b/src/backend/distributed/operations/worker_node_manager.c @@ -392,6 +392,18 @@ ActivePrimaryNodeList(LOCKMODE lockMode) } +/* + * ActivePrimaryRemoteNodeList returns a list of all active primary nodes, + * excluding the local node, in workerNodeHash. + */ +List * +ActivePrimaryRemoteNodeList(LOCKMODE lockMode) +{ + EnsureModificationsCanRun(); + return FilterActiveNodeListFunc(lockMode, NodeIsPrimaryAndRemote); +} + + /* * NodeIsPrimaryWorker returns true if the node is a primary worker node. */ diff --git a/src/include/distributed/locally_reserved_shared_connections.h b/src/include/distributed/locally_reserved_shared_connections.h index a282beac0..adec8c9c4 100644 --- a/src/include/distributed/locally_reserved_shared_connections.h +++ b/src/include/distributed/locally_reserved_shared_connections.h @@ -20,7 +20,8 @@ extern bool CanUseReservedConnection(const char *hostName, int nodePort, extern void MarkReservedConnectionUsed(const char *hostName, int nodePort, Oid userId, Oid databaseOid); extern void DeallocateReservedConnections(void); -extern void EnsureConnectionPossibilityForPrimaryNodes(void); +extern void EnsureConnectionPossibilityForRemotePrimaryNodes(void); +extern bool TryConnectionPossibilityForLocalPrimaryNode(void); extern bool IsReservationPossible(void); #endif /* LOCALLY_RESERVED_SHARED_CONNECTIONS_H_ */ diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index 047c01225..158c5a7ce 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -74,6 +74,7 @@ extern uint32
ActivePrimaryNonCoordinatorNodeCount(void); extern uint32 ActivePrimaryNodeCount(void); extern List * ActivePrimaryNonCoordinatorNodeList(LOCKMODE lockMode); extern List * ActivePrimaryNodeList(LOCKMODE lockMode); +extern List * ActivePrimaryRemoteNodeList(LOCKMODE lockMode); extern bool CoordinatorAddedAsWorkerNode(void); extern List * ReferenceTablePlacementNodeList(LOCKMODE lockMode); extern WorkerNode * CoordinatorNodeIfAddedAsWorkerOrError(void); @@ -90,6 +91,7 @@ extern void EnsureCoordinator(void); extern void InsertCoordinatorIfClusterEmpty(void); extern uint32 GroupForNode(char *nodeName, int32 nodePort); extern WorkerNode * PrimaryNodeForGroup(int32 groupId, bool *groupContainsNodes); +extern bool NodeIsPrimaryAndRemote(WorkerNode *worker); extern bool NodeIsPrimary(WorkerNode *worker); extern bool NodeIsSecondary(WorkerNode *worker); extern bool NodeIsReadable(WorkerNode *worker); diff --git a/src/test/regress/expected/single_node.out b/src/test/regress/expected/single_node.out index 16f3c37fd..bdaafa689 100644 --- a/src/test/regress/expected/single_node.out +++ b/src/test/regress/expected/single_node.out @@ -1591,6 +1591,16 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT in 7 (1 row) +-- copy can use local execution even if there is no connection available +COPY another_schema_table(a) FROM PROGRAM 'seq 32'; +NOTICE: executing the copy locally for shard xxxxx +CONTEXT: COPY another_schema_table, line 1: "1" +NOTICE: executing the copy locally for shard xxxxx +CONTEXT: COPY another_schema_table, line 2: "2" +NOTICE: executing the copy locally for shard xxxxx +CONTEXT: COPY another_schema_table, line 3: "3" +NOTICE: executing the copy locally for shard xxxxx +CONTEXT: COPY another_schema_table, line 6: "6" -- if the local execution is disabled, we cannot failover to -- local execution and the queries would fail SET citus.enable_local_execution TO false; @@ -1613,6 +1623,11 @@ HINT: Enable local execution via SET 
citus.enable_local_execution TO true; INSERT INTO another_schema_table VALUES (1,1), (2,2), (3,3), (4,4), (5,5),(6,6),(7,7); ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled. HINT: Enable local execution via SET citus.enable_local_execution TO true; +-- copy fails if local execution is disabled and there is no connection slot +COPY another_schema_table(a) FROM PROGRAM 'seq 32'; +ERROR: could not find an available connection +HINT: Set citus.max_shared_pool_size TO -1 to let COPY command finish +CONTEXT: COPY another_schema_table, line 1: "1" -- set the values to originals back ALTER SYSTEM RESET citus.max_cached_conns_per_worker; ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor; diff --git a/src/test/regress/input/multi_mx_copy_data.source b/src/test/regress/input/multi_mx_copy_data.source index f65bc42af..22ce69e6c 100644 --- a/src/test/regress/input/multi_mx_copy_data.source +++ b/src/test/regress/input/multi_mx_copy_data.source @@ -22,6 +22,25 @@ SET search_path TO public; \COPY orders_mx FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|' \COPY orders_mx FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|' +-- get ready for the next test +TRUNCATE orders_mx; + +\c - - - :worker_2_port +SET citus.log_local_commands TO ON; +-- simulate the case where there is no connection slots available +ALTER SYSTEM SET citus.local_shared_pool_size TO -1; +SELECT pg_reload_conf(); +SELECT pg_sleep(0.1); +show citus.local_shared_pool_size; +\COPY orders_mx FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|' +\COPY orders_mx FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|' + +-- set it back +ALTER SYSTEM RESET citus.local_shared_pool_size; +SELECT pg_reload_conf(); +SELECT pg_sleep(0.1); +show citus.local_shared_pool_size; + -- These copies were intended to test copying data to single sharded table from -- worker nodes, yet in order to remove broadcast logic related codes we 
change -- the table to reference table and copy data from master. Should be updated diff --git a/src/test/regress/output/multi_mx_copy_data.source b/src/test/regress/output/multi_mx_copy_data.source index 521ebca99..53a36f7dc 100644 --- a/src/test/regress/output/multi_mx_copy_data.source +++ b/src/test/regress/output/multi_mx_copy_data.source @@ -16,6 +16,84 @@ SET search_path TO public; -- and use second worker as well \COPY orders_mx FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|' \COPY orders_mx FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|' +-- get ready for the next test +TRUNCATE orders_mx; +\c - - - :worker_2_port +SET citus.log_local_commands TO ON; +-- simulate the case where there is no connection slots available +ALTER SYSTEM SET citus.local_shared_pool_size TO -1; +SELECT pg_reload_conf(); + pg_reload_conf +---------------- + t +(1 row) + +SELECT pg_sleep(0.1); + pg_sleep +---------- + +(1 row) + +show citus.local_shared_pool_size; + citus.local_shared_pool_size +------------------------------ + -1 +(1 row) + +\COPY orders_mx FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|' +NOTICE: executing the copy locally for shard 1220075 +CONTEXT: COPY orders_mx, line 3: "3|1234|F|205654.30|1993-10-14|5-LOW|Clerk#000000955|0|sly final accounts boost. carefully regular id..." +NOTICE: executing the copy locally for shard 1220071 +CONTEXT: COPY orders_mx, line 5: "5|445|F|105367.67|1994-07-30|5-LOW|Clerk#000000925|0|quickly. bold deposits sleep slyly. packages us..." +NOTICE: executing the copy locally for shard 1220069 +CONTEXT: COPY orders_mx, line 9: "33|670|F|146567.24|1993-10-27|3-MEDIUM|Clerk#000000409|0|uriously. 
furiously final request" +NOTICE: executing the copy locally for shard 1220079 +CONTEXT: COPY orders_mx, line 15: "39|818|O|326565.37|1996-09-20|3-MEDIUM|Clerk#000000659|0|ole express, ironic requests: ir" +NOTICE: executing the copy locally for shard 1220083 +CONTEXT: COPY orders_mx, line 19: "67|568|O|182481.16|1996-12-19|4-NOT SPECIFIED|Clerk#000000547|0|symptotes haggle slyly around the fu..." +NOTICE: executing the copy locally for shard 1220073 +CONTEXT: COPY orders_mx, line 24: "96|1078|F|64364.30|1994-04-17|2-HIGH|Clerk#000000395|0|oost furiously. pinto" +NOTICE: executing the copy locally for shard 1220077 +CONTEXT: COPY orders_mx, line 25: "97|211|F|100572.55|1993-01-29|3-MEDIUM|Clerk#000000547|0|hang blithely along the regular accounts. f..." +NOTICE: executing the copy locally for shard 1220081 +CONTEXT: COPY orders_mx, line 38: "134|62|F|208201.46|1992-05-01|4-NOT SPECIFIED|Clerk#000000711|0|lar theodolites boos" +\COPY orders_mx FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|' +NOTICE: executing the copy locally for shard 1220079 +CONTEXT: COPY orders_mx, line 2: "8998|80|F|147264.16|1993-01-04|5-LOW|Clerk#000000733|0| fluffily pending sauternes cajo" +NOTICE: executing the copy locally for shard 1220077 +CONTEXT: COPY orders_mx, line 4: "9024|1469|F|298241.36|1992-06-03|3-MEDIUM|Clerk#000000901|0|ar the theodolites. fluffily stealthy re..." +NOTICE: executing the copy locally for shard 1220073 +CONTEXT: COPY orders_mx, line 6: "9026|677|O|63256.87|1996-07-24|5-LOW|Clerk#000000320|0|ironic escapades would wake carefully " +NOTICE: executing the copy locally for shard 1220071 +CONTEXT: COPY orders_mx, line 9: "9029|1213|F|78703.86|1992-11-20|3-MEDIUM|Clerk#000000965|0| excuses nag quickly carefully unusual ex..." +NOTICE: executing the copy locally for shard 1220083 +CONTEXT: COPY orders_mx, line 14: "9058|403|F|63464.13|1993-06-29|2-HIGH|Clerk#000000376|0|ealthily special deposits. quickly regular r..." 
+NOTICE: executing the copy locally for shard 1220081 +CONTEXT: COPY orders_mx, line 16: "9060|463|O|45295.71|1996-06-09|1-URGENT|Clerk#000000438|0|iously. slyly regular dol" +NOTICE: executing the copy locally for shard 1220075 +CONTEXT: COPY orders_mx, line 43: "9159|1135|O|99594.61|1995-07-26|1-URGENT|Clerk#000000892|0|xcuses. quickly ironic deposits wake alon..." +NOTICE: executing the copy locally for shard 1220069 +CONTEXT: COPY orders_mx, line 69: "9281|904|F|173278.28|1992-02-24|1-URGENT|Clerk#000000530|0|eep furiously according to the requests; ..." +-- set it back +ALTER SYSTEM RESET citus.local_shared_pool_size; +SELECT pg_reload_conf(); + pg_reload_conf +---------------- + t +(1 row) + +SELECT pg_sleep(0.1); + pg_sleep +---------- + +(1 row) + +show citus.local_shared_pool_size; + citus.local_shared_pool_size +------------------------------ + 50 +(1 row) + -- These copies were intended to test copying data to single sharded table from -- worker nodes, yet in order to remove broadcast logic related codes we change -- the table to reference table and copy data from master. 
Should be updated diff --git a/src/test/regress/sql/single_node.sql b/src/test/regress/sql/single_node.sql index a837258fe..4d7343668 100644 --- a/src/test/regress/sql/single_node.sql +++ b/src/test/regress/sql/single_node.sql @@ -817,6 +817,9 @@ ROLLBACK; WITH cte_1 AS (SELECT * FROM another_schema_table LIMIT 1000) SELECT count(*) FROM cte_1; +-- copy can use local execution even if there is no connection available +COPY another_schema_table(a) FROM PROGRAM 'seq 32'; + -- if the local execution is disabled, we cannot failover to -- local execution and the queries would fail SET citus.enable_local_execution TO false; @@ -829,6 +832,9 @@ WITH cte_1 AS (SELECT * FROM another_schema_table LIMIT 1000) INSERT INTO another_schema_table VALUES (1,1), (2,2), (3,3), (4,4), (5,5),(6,6),(7,7); +-- copy fails if local execution is disabled and there is no connection slot +COPY another_schema_table(a) FROM PROGRAM 'seq 32'; + -- set the values to originals back ALTER SYSTEM RESET citus.max_cached_conns_per_worker; ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;