diff --git a/src/backend/distributed/connection/shared_connection_stats.c b/src/backend/distributed/connection/shared_connection_stats.c index 1c82f3889..649f2e9b5 100644 --- a/src/backend/distributed/connection/shared_connection_stats.c +++ b/src/backend/distributed/connection/shared_connection_stats.c @@ -23,6 +23,7 @@ #include "access/htup_details.h" #include "catalog/pg_authid.h" #include "commands/dbcommands.h" +#include "distributed/backend_data.h" #include "distributed/cancel_utils.h" #include "distributed/connection_management.h" #include "distributed/listutils.h" @@ -97,6 +98,14 @@ typedef struct SharedConnStatsHashEntry */ int MaxSharedPoolSize = 0; +/* + * Controlled via a GUC, never access directly, use GetLocalSharedPoolSize(). + * "0" means adjust LocalSharedPoolSize automatically by using MaxConnections. + * "-1" means do not use any remote connections for local tasks + * Anything else means use that number + */ +int LocalSharedPoolSize = 0; + /* the following two structs are used for accessing shared memory */ static HTAB *SharedConnStatsHash = NULL; @@ -205,6 +214,25 @@ GetMaxSharedPoolSize(void) } + +/* + * GetLocalSharedPoolSize is a wrapper around LocalSharedPoolSize which is + * controlled via a GUC. + * "0" means adjust LocalSharedPoolSize automatically by using MaxConnections + * "-1" means do not use any remote connections for local tasks + * Anything else means use that number + */ +int +GetLocalSharedPoolSize(void) +{ + if (LocalSharedPoolSize == ADJUST_POOLSIZE_AUTOMATICALLY) + { + return MaxConnections * 0.5; + } + + return LocalSharedPoolSize; +} + + /* * WaitLoopForSharedConnection tries to increment the shared connection * counter for the given hostname/port and the current database in @@ -270,6 +298,32 @@ TryToIncrementSharedConnectionCounter(const char *hostname, int port) connKey.port = port; connKey.databaseOid = MyDatabaseId; + /* + * Handle adaptive connection management for the local node slightly differently, + * as the local node can fail over to local execution. + */ + bool connectionToLocalNode = false; + int activeBackendCount = 0; + WorkerNode *workerNode = FindWorkerNode(hostname, port); + if (workerNode) + { + connectionToLocalNode = (workerNode->groupId == GetLocalGroupId()); + if (connectionToLocalNode && + GetLocalSharedPoolSize() == DISABLE_REMOTE_CONNECTIONS_FOR_LOCAL_QUERIES) + { + /* + * This early return is required as LocalNodeParallelExecutionFactor + * is ignored for the first connection below. This check makes the + * user experience more accurate and also makes it easy to write + * regression tests which emulate the local node adaptive + * connection management. + */ + return false; + } + + activeBackendCount = GetAllActiveClientBackendCount(); + } + LockConnectionSharedMemory(LW_EXCLUSIVE); /* @@ -300,6 +354,34 @@ TryToIncrementSharedConnectionCounter(const char *hostname, int port) counterIncremented = true; } + else if (connectionToLocalNode) + { + /* + * For local nodes, solely relying on citus.max_shared_pool_size or + * max_connections might not be sufficient. The former gives us + * a preview of the future (e.g., we let the new connections be established, + * but they are not established yet). The latter gives us a close to + * precise view of the past (e.g., the active number of client backends). + * + * Overall, we want to limit both of these metrics. The former limit typically + * kicks in under regular loads, where the load of the database increases at + * a reasonable pace.
The latter limit typically kicks in when the database + * is issued lots of concurrent sessions at the same time, such as benchmarks. + */ + if (activeBackendCount + 1 > GetLocalSharedPoolSize()) + { + counterIncremented = false; + } + else if (connectionEntry->connectionCount + 1 > GetLocalSharedPoolSize()) + { + counterIncremented = false; + } + else + { + connectionEntry->connectionCount++; + counterIncremented = true; + } + } else if (connectionEntry->connectionCount + 1 > GetMaxSharedPoolSize()) { /* there is no space left for this connection */ @@ -618,7 +700,7 @@ SharedConnectionStatsShmemInit(void) * optional connections. */ int -AdaptiveConnectionManagementFlag(int activeConnectionCount) +AdaptiveConnectionManagementFlag(bool connectToLocalNode, int activeConnectionCount) { if (UseConnectionPerPlacement()) { @@ -633,6 +715,14 @@ AdaptiveConnectionManagementFlag(int activeConnectionCount) */ return 0; } + else if (connectToLocalNode) + { + /* + * Connection to local node is always optional because the executor is capable + * of falling back to local execution. + */ + return OPTIONAL_CONNECTION; + } else if (ShouldWaitForConnection(activeConnectionCount)) { /* diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index b36cf93ae..8ad9d7e11 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -298,6 +298,25 @@ typedef struct DistributedExecution } DistributedExecution; +/* + * WorkerPoolFailureState indicates the current state of the + * pool. + */ +typedef enum WorkerPoolFailureState +{ + /* safe to continue execution*/ + WORKER_POOL_NOT_FAILED, + + /* if a pool fails, the execution fails */ + WORKER_POOL_FAILED, + + /* + * The remote execution over the pool failed, but we failed over + * to the local execution and still finish the execution. + */ + WORKER_POOL_FAILED_OVER_TO_LOCAL +} WorkerPoolFailureState; + /* * WorkerPool represents a pool of sessions on the same worker. * @@ -383,11 +402,17 @@ typedef struct WorkerPool /* maximum number of connections we are allowed to open at once */ uint32 maxNewConnectionsPerCycle; + /* + * Set to true if the pool is to local node. We use this value to + * avoid re-calculating often. + */ + bool poolToLocalNode; + /* * This is only set in WorkerPoolFailed() function. Once a pool fails, we do not * use it anymore. 
*/ - bool failed; + WorkerPoolFailureState failureState; } WorkerPool; struct TaskPlacementExecution; @@ -457,7 +482,8 @@ typedef enum TaskExecutionState { TASK_EXECUTION_NOT_FINISHED, TASK_EXECUTION_FINISHED, - TASK_EXECUTION_FAILED + TASK_EXECUTION_FAILED, + TASK_EXECUTION_FAILOVER_TO_LOCAL_EXECUTION } TaskExecutionState; /* @@ -515,6 +541,7 @@ typedef enum TaskPlacementExecutionState PLACEMENT_EXECUTION_READY, PLACEMENT_EXECUTION_RUNNING, PLACEMENT_EXECUTION_FINISHED, + PLACEMENT_EXECUTION_FAILOVER_TO_LOCAL_EXECUTION, PLACEMENT_EXECUTION_FAILED } TaskPlacementExecutionState; @@ -582,7 +609,6 @@ static void StartDistributedExecution(DistributedExecution *execution); static void RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution); static void RunDistributedExecution(DistributedExecution *execution); static void SequentialRunDistributedExecution(DistributedExecution *execution); - static void FinishDistributedExecution(DistributedExecution *execution); static void CleanUpSessions(DistributedExecution *execution); @@ -634,6 +660,8 @@ static void PlacementExecutionDone(TaskPlacementExecution *placementExecution, bool succeeded); static void ScheduleNextPlacementExecution(TaskPlacementExecution *placementExecution, bool succeeded); +static bool CanFailoverPlacementExecutionToLocalExecution(TaskPlacementExecution * + placementExecution); static bool ShouldMarkPlacementsInvalidOnFailure(DistributedExecution *execution); static void PlacementExecutionReady(TaskPlacementExecution *placementExecution); static TaskExecutionState TaskExecutionStateMachine(ShardCommandExecution * @@ -1080,7 +1108,11 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, } else { - execution->remoteTaskList = execution->remoteAndLocalTaskList; + /* + * Get a shallow copy of the list as we rely on remoteAndLocalTaskList + * across the execution. + */ + execution->remoteTaskList = list_copy(execution->remoteAndLocalTaskList); } execution->totalTaskCount = list_length(execution->remoteTaskList); @@ -1630,7 +1662,6 @@ CleanUpSessions(DistributedExecution *execution) * changing any states in the ConnectionStateMachine. * */ - CloseConnection(connection); } else if (connection->connectionState == MULTI_CONNECTION_CONNECTED) @@ -1708,8 +1739,6 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution) RowModifyLevel modLevel = execution->modLevel; List *taskList = execution->remoteTaskList; - int32 localGroupId = GetLocalGroupId(); - Task *task = NULL; foreach_ptr(task, taskList) { @@ -1849,11 +1878,6 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution) */ placementExecutionReady = false; } - - if (taskPlacement->groupId == localGroupId) - { - SetLocalExecutionStatus(LOCAL_EXECUTION_DISABLED); - } } } @@ -2020,6 +2044,13 @@ FindOrCreateWorkerPool(DistributedExecution *execution, char *nodeName, int node workerPool->nodeName = pstrdup(nodeName); workerPool->nodePort = nodePort; + WorkerNode *workerNode = FindWorkerNode(nodeName, nodePort); + if (workerNode) + { + workerPool->poolToLocalNode = + workerNode->groupId == GetLocalGroupId(); + } + /* "open" connections aggressively when there are cached connections */ int nodeConnectionCount = MaxCachedConnectionsPerWorker; workerPool->maxNewConnectionsPerCycle = Max(1, nodeConnectionCount); @@ -2142,13 +2173,11 @@ SequentialRunDistributedExecution(DistributedExecution *execution) * executions, so make sure to set it. 
*/ MultiShardConnectionType = SEQUENTIAL_CONNECTION; - Task *taskToExecute = NULL; foreach_ptr(taskToExecute, taskList) { - /* execute each task one by one */ execution->remoteAndLocalTaskList = list_make1(taskToExecute); - execution->remoteTaskList = execution->remoteAndLocalTaskList; + execution->remoteTaskList = list_make1(taskToExecute); execution->totalTaskCount = 1; execution->unfinishedTaskCount = 1; @@ -2201,15 +2230,21 @@ RunDistributedExecution(DistributedExecution *execution) while (execution->unfinishedTaskCount > 0 && !cancellationReceived) { - long timeout = NextEventTimeout(execution); - WorkerPool *workerPool = NULL; foreach_ptr(workerPool, execution->workerList) { ManageWorkerPool(workerPool); } - if (execution->rebuildWaitEventSet) + if (execution->remoteTaskList == NIL) + { + /* + * All the tasks are failed over to the local execution, no need + * to wait for any connection activity. + */ + continue; + } + else if (execution->rebuildWaitEventSet) { if (events != NULL) { @@ -2221,7 +2256,6 @@ RunDistributedExecution(DistributedExecution *execution) events = NULL; } eventSetSize = RebuildWaitEventSet(execution); - events = palloc0(eventSetSize * sizeof(WaitEvent)); } else if (execution->waitFlagsChanged) @@ -2231,6 +2265,7 @@ RunDistributedExecution(DistributedExecution *execution) } /* wait for I/O events */ + long timeout = NextEventTimeout(execution); int eventCount = WaitEventSetWait(execution->waitEventSet, timeout, events, eventSetSize, WAIT_EVENT_CLIENT_READ); ProcessWaitEvents(execution, events, eventCount, &cancellationReceived); @@ -2369,7 +2404,6 @@ ManageWorkerPool(WorkerPool *workerPool) } int newConnectionCount = CalculateNewConnectionCount(workerPool); - if (newConnectionCount <= 0) { return; @@ -2377,6 +2411,46 @@ ManageWorkerPool(WorkerPool *workerPool) OpenNewConnections(workerPool, newConnectionCount, execution->transactionProperties); + /* + * Cannot establish new connections to the local host, most probably because the + * local node cannot accept new connections (e.g., hit max_connections). Switch + * the tasks to the local execution. + * + * We prefer initiatedConnectionCount over the new connection establishments happen + * in this iteration via OpenNewConnections(). The reason is that it is expected for + * OpenNewConnections() to not open any new connections as long as the connections + * are optional (e.g., the second or later connections in the pool). But, for + * initiatedConnectionCount to be zero, the connection to the local pool should have + * been failed. + */ + int initiatedConnectionCount = list_length(workerPool->sessionList); + if (initiatedConnectionCount == 0) + { + /* + * Only the pools to the local node are allowed to have optional + * connections for the first connection. Hence, initiatedConnectionCount + * could only be zero for poolToLocalNode. For other pools, the connection + * manager would wait until it gets at least one connection. 
+ */ + Assert(workerPool->poolToLocalNode); + + WorkerPoolFailed(workerPool); + + if (execution->failed) + { + ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg( + "could not establish any connections to the node %s:%d " + "when local execution is also disabled.", + workerPool->nodeName, + workerPool->nodePort), + errhint("Enable local execution via SET " + "citus.enable_local_execution TO true;"))); + } + + return; + } + INSTR_TIME_SET_CURRENT(workerPool->lastConnectionOpenTime); execution->rebuildWaitEventSet = true; } @@ -2389,7 +2463,8 @@ ManageWorkerPool(WorkerPool *workerPool) static bool HasAnyConnectionFailure(WorkerPool *workerPool) { - if (workerPool->failed) + if (workerPool->failureState == WORKER_POOL_FAILED || + workerPool->failureState == WORKER_POOL_FAILED_OVER_TO_LOCAL) { /* connection pool failed */ return true; @@ -2520,7 +2595,6 @@ CalculateNewConnectionCount(WorkerPool *workerPool) * than the target pool size. */ newConnectionCount = Min(newConnectionsForReadyTasks, maxNewConnectionCount); - if (newConnectionCount > 0) { /* increase the open rate every cycle (like TCP slow start) */ @@ -2557,7 +2631,8 @@ OpenNewConnections(WorkerPool *workerPool, int newConnectionCount, * throttle connections if citus.max_shared_pool_size reached) */ int adaptiveConnectionManagementFlag = - AdaptiveConnectionManagementFlag(list_length(workerPool->sessionList)); + AdaptiveConnectionManagementFlag(workerPool->poolToLocalNode, + list_length(workerPool->sessionList)); connectionFlags |= adaptiveConnectionManagementFlag; /* open a new connection to the worker */ @@ -2660,13 +2735,24 @@ CheckConnectionTimeout(WorkerPool *workerPool) */ WorkerPoolFailed(workerPool); - /* - * The enforcement is not always erroring out. For example, if a SELECT task - * has two different placements, we'd warn the user, fail the pool and continue - * with the next placement. - */ - if (execution->transactionProperties->errorOnAnyFailure || execution->failed) + if (workerPool->failureState == WORKER_POOL_FAILED_OVER_TO_LOCAL) { + /* + * + * When the pool is failed over to local execution, warning + * the user just creates chatter as the executor is capable of + * finishing the execution. + */ + logLevel = DEBUG1; + } + else if (execution->transactionProperties->errorOnAnyFailure || + execution->failed) + { + /* + * The enforcement is not always erroring out. For example, if a SELECT task + * has two different placements, we'd warn the user, fail the pool and continue + * with the next placement. + */ logLevel = ERROR; } @@ -2758,7 +2844,7 @@ NextEventTimeout(DistributedExecution *execution) WorkerPool *workerPool = NULL; foreach_ptr(workerPool, execution->workerList) { - if (workerPool->failed) + if (workerPool->failureState == WORKER_POOL_FAILED) { /* worker pool may have already timed out */ continue; @@ -2974,14 +3060,28 @@ ConnectionStateMachine(WorkerSession *session) * or WorkerPoolFailed. */ if (execution->failed || - execution->transactionProperties->errorOnAnyFailure) + (execution->transactionProperties->errorOnAnyFailure && + workerPool->failureState != WORKER_POOL_FAILED_OVER_TO_LOCAL)) { /* a task has failed due to this connection failure */ ReportConnectionError(connection, ERROR); } + else if (workerPool->activeConnectionCount > 0 || + workerPool->failureState == WORKER_POOL_FAILED_OVER_TO_LOCAL) + { + /* + * We already have active connection(s) to the node, and the + * executor is capable of using those connections to successfully + * finish the execution. 
So, there is not much value in warning + * the user. + * + * Similarly when the pool is failed over to local execution, warning + * the user just creates chatter. + */ + ReportConnectionError(connection, DEBUG1); + } else { - /* can continue with the remaining nodes */ ReportConnectionError(connection, WARNING); } @@ -3558,6 +3658,17 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution, if (querySent) { session->commandsSent++; + + if (workerPool->poolToLocalNode) + { + /* + * As we started remote execution to the local node, + * we cannot switch back to local execution as that + * would cause self-deadlocks and breaking + * read-your-own-writes consistency. + */ + SetLocalExecutionStatus(LOCAL_EXECUTION_DISABLED); + } } return querySent; @@ -4004,7 +4115,8 @@ WorkerPoolFailed(WorkerPool *workerPool) * A pool cannot fail multiple times, the necessary actions * has already be taken, so bail out. */ - if (workerPool->failed) + if (workerPool->failureState == WORKER_POOL_FAILED || + workerPool->failureState == WORKER_POOL_FAILED_OVER_TO_LOCAL) { return; } @@ -4033,7 +4145,11 @@ WorkerPoolFailed(WorkerPool *workerPool) /* we do not want more connections in this pool */ workerPool->readyTaskCount = 0; - workerPool->failed = true; + if (workerPool->failureState != WORKER_POOL_FAILED_OVER_TO_LOCAL) + { + /* we prefer not to override WORKER_POOL_FAILED_OVER_TO_LOCAL */ + workerPool->failureState = WORKER_POOL_FAILED; + } /* * The reason is that when replication factor is > 1 and we are performing @@ -4050,7 +4166,8 @@ WorkerPoolFailed(WorkerPool *workerPool) foreach_ptr(pool, workerList) { /* failed pools or pools without any connection attempts ignored */ - if (pool->failed || INSTR_TIME_IS_ZERO(pool->poolStartTime)) + if (pool->failureState == WORKER_POOL_FAILED || + INSTR_TIME_IS_ZERO(pool->poolStartTime)) { continue; } @@ -4126,11 +4243,20 @@ PlacementExecutionDone(TaskPlacementExecution *placementExecution, bool succeede return; } - /* mark the placement execution as finished */ if (succeeded) { + /* mark the placement execution as finished */ placementExecution->executionState = PLACEMENT_EXECUTION_FINISHED; } + else if (CanFailoverPlacementExecutionToLocalExecution(placementExecution)) + { + /* + * The placement execution can be done over local execution, so it is a soft + * failure for now. + */ + placementExecution->executionState = + PLACEMENT_EXECUTION_FAILOVER_TO_LOCAL_EXECUTION; + } else { if (ShouldMarkPlacementsInvalidOnFailure(execution)) @@ -4174,13 +4300,36 @@ PlacementExecutionDone(TaskPlacementExecution *placementExecution, bool succeede * Update unfinishedTaskCount only when state changes from not finished to * finished or failed state. 
*/ - TaskExecutionState newExecutionState = TaskExecutionStateMachine( - shardCommandExecution); + TaskExecutionState newExecutionState = + TaskExecutionStateMachine(shardCommandExecution); if (newExecutionState == TASK_EXECUTION_FINISHED) { execution->unfinishedTaskCount--; return; } + else if (newExecutionState == TASK_EXECUTION_FAILOVER_TO_LOCAL_EXECUTION) + { + execution->unfinishedTaskCount--; + + /* move the task to the local execution */ + Task *task = shardCommandExecution->task; + execution->localTaskList = lappend(execution->localTaskList, task); + + /* remove the task from the remote execution list */ + execution->remoteTaskList = list_delete_ptr(execution->remoteTaskList, task); + + /* + * As we decided to fail over this task to local execution, we cannot + * allow remote execution to this pool during this distributedExecution. + */ + SetLocalExecutionStatus(LOCAL_EXECUTION_REQUIRED); + workerPool->failureState = WORKER_POOL_FAILED_OVER_TO_LOCAL; + + ereport(DEBUG4, (errmsg("Task %d execution is failed over to local execution", + task->taskId))); + + return; + } else if (newExecutionState == TASK_EXECUTION_FAILED) { execution->unfinishedTaskCount--; @@ -4199,6 +4348,60 @@ PlacementExecutionDone(TaskPlacementExecution *placementExecution, bool succeede } + +/* + * CanFailoverPlacementExecutionToLocalExecution returns true if the input + * TaskPlacementExecution can be failed over to local execution. In other words, + * the execution can be deferred to local execution. + */ +static bool +CanFailoverPlacementExecutionToLocalExecution(TaskPlacementExecution *placementExecution) +{ + if (!EnableLocalExecution) + { + /* the user explicitly disabled local execution */ + return false; + } + + if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_DISABLED) + { + /* + * If the current transaction accessed the local node over a connection, + * then we can't use local execution because of visibility issues. + */ + return false; + } + + WorkerPool *workerPool = placementExecution->workerPool; + if (!workerPool->poolToLocalNode) + { + /* we can only fail over tasks to local execution for local pools */ + return false; + } + + if (workerPool->activeConnectionCount > 0) + { + /* + * The pool already has active connections, and the executor is capable + * of using those active connections. So, there is no need to fail over + * to local execution. + */ + return false; + } + + if (placementExecution->assignedSession != NULL) + { + /* + * If the placement execution has been assigned to a specific session, + * it has to be executed over that session. Otherwise, it would cause + * self-deadlocks and break read-your-own-writes consistency.
+ */ + return false; + } + + return true; +} + + /* * ScheduleNextPlacementExecution is triggered if the query needs to be * executed on any or all placements in order and there is a placement on @@ -4361,6 +4564,7 @@ TaskExecutionStateMachine(ShardCommandExecution *shardCommandExecution) PlacementExecutionOrder executionOrder = shardCommandExecution->executionOrder; int donePlacementCount = 0; int failedPlacementCount = 0; + int failedOverPlacementCount = 0; int placementCount = 0; int placementExecutionIndex = 0; int placementExecutionCount = shardCommandExecution->placementExecutionCount; @@ -4386,6 +4590,10 @@ TaskExecutionStateMachine(ShardCommandExecution *shardCommandExecution) { failedPlacementCount++; } + else if (executionState == PLACEMENT_EXECUTION_FAILOVER_TO_LOCAL_EXECUTION) + { + failedOverPlacementCount++; + } placementCount++; } @@ -4402,6 +4610,21 @@ TaskExecutionStateMachine(ShardCommandExecution *shardCommandExecution) { currentTaskExecutionState = TASK_EXECUTION_FINISHED; } + else if (failedOverPlacementCount + donePlacementCount + failedPlacementCount == + placementCount) + { + /* + * For any given task, we could have 3 end states: + * - "donePlacementCount" indicates the successful placement executions + * - "failedPlacementCount" indicates the failed placement executions + * - "failedOverPlacementCount" indicates the placement executions that + * are failed when using remote execution due to connection errors, + * but there is still a possibility of being successful via + * local execution. So, for now they are considered as soft + * errors. + */ + currentTaskExecutionState = TASK_EXECUTION_FAILOVER_TO_LOCAL_EXECUTION; + } else { currentTaskExecutionState = TASK_EXECUTION_NOT_FINISHED; diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index d51de7202..6c4de8fe5 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -113,6 +113,7 @@ static bool NoticeIfSubqueryPushdownEnabled(bool *newval, void **extra, GucSourc static bool NodeConninfoGucCheckHook(char **newval, void **extra, GucSource source); static void NodeConninfoGucAssignHook(const char *newval, void *extra); static const char * MaxSharedPoolSizeGucShowHook(void); +static const char * LocalPoolSizeGucShowHook(void); static bool StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource source); static void CitusAuthHook(Port *port, int status); @@ -693,6 +694,21 @@ RegisterCitusConfigVariables(void) GUC_NO_SHOW_ALL, NoticeIfSubqueryPushdownEnabled, NULL, NULL); + DefineCustomIntVariable( + "citus.local_shared_pool_size", + gettext_noop( + "Sets the maximum number of connections allowed for the shards on the " + "local node across all the backends from this node. Setting to -1 disables " + "connections throttling. Setting to 0 makes it auto-adjust, meaning " + "equal to the half of max_connections on the coordinator."), + gettext_noop("As a rule of thumb, the value should be at most equal to the " + "max_connections on the local node."), + &LocalSharedPoolSize, + 0, -1, INT_MAX, + PGC_SIGHUP, + GUC_SUPERUSER_ONLY, + NULL, NULL, LocalPoolSizeGucShowHook); + DefineCustomBoolVariable( "citus.log_multi_join_order", gettext_noop("Logs the distributed join order to the server log."), @@ -1764,6 +1780,21 @@ MaxSharedPoolSizeGucShowHook(void) } +/* + * LocalPoolSizeGucShowHook overrides the value that is shown to the + * user when the default value has not been set. 
+ */ +static const char * +LocalPoolSizeGucShowHook(void) +{ + StringInfo newvalue = makeStringInfo(); + + appendStringInfo(newvalue, "%d", GetLocalSharedPoolSize()); + + return (const char *) newvalue->data; +} + + static bool StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource source) { diff --git a/src/include/distributed/shared_connection_stats.h b/src/include/distributed/shared_connection_stats.h index 9c31fdcf6..1efb37d28 100644 --- a/src/include/distributed/shared_connection_stats.h +++ b/src/include/distributed/shared_connection_stats.h @@ -13,19 +13,23 @@ #define ADJUST_POOLSIZE_AUTOMATICALLY 0 #define DISABLE_CONNECTION_THROTTLING -1 +#define DISABLE_REMOTE_CONNECTIONS_FOR_LOCAL_QUERIES -1 extern int MaxSharedPoolSize; +extern int LocalSharedPoolSize; extern void InitializeSharedConnectionStats(void); extern void WaitForSharedConnection(void); extern void WakeupWaiterBackendsForSharedConnection(void); extern int GetMaxSharedPoolSize(void); +extern int GetLocalSharedPoolSize(void); extern bool TryToIncrementSharedConnectionCounter(const char *hostname, int port); extern void WaitLoopForSharedConnection(const char *hostname, int port); extern void DecrementSharedConnectionCounter(const char *hostname, int port); extern void IncrementSharedConnectionCounter(const char *hostname, int port); -extern int AdaptiveConnectionManagementFlag(int activeConnectionCount); +extern int AdaptiveConnectionManagementFlag(bool connectToLocalNode, int + activeConnectionCount); #endif /* SHARED_CONNECTION_STATS_H */ diff --git a/src/test/regress/expected/failure_distributed_results.out b/src/test/regress/expected/failure_distributed_results.out index 4065b648a..bc1cfca00 100644 --- a/src/test/regress/expected/failure_distributed_results.out +++ b/src/test/regress/expected/failure_distributed_results.out @@ -81,11 +81,12 @@ SELECT citus.mitmproxy('conn.onQuery(query="worker_partition_query_result.*test_ (1 row) BEGIN; +SET LOCAL client_min_messages TO DEBUG1; CREATE TABLE distributed_result_info AS SELECT resultId, nodeport, rowcount, targetShardId, targetShardIndex FROM partition_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table') NATURAL JOIN pg_dist_node; -WARNING: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly +DEBUG: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly This probably means the server terminated abnormally before or while processing the request. 
SELECT * FROM distributed_result_info ORDER BY resultId; diff --git a/src/test/regress/expected/failure_failover_to_local_execution.out b/src/test/regress/expected/failure_failover_to_local_execution.out new file mode 100644 index 000000000..5427ca9ae --- /dev/null +++ b/src/test/regress/expected/failure_failover_to_local_execution.out @@ -0,0 +1,120 @@ +CREATE SCHEMA failure_failover_to_local_execution; +SET search_path TO failure_failover_to_local_execution; +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SET citus.next_shard_id TO 1980000; +-- on Citus-mx, we can have +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + +SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port); + start_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + +SET citus.replication_model TO 'streaming'; +SET citus.shard_replication_factor TO 1; +CREATE TABLE failover_to_local (key int PRIMARY KEY, value varchar(10)); +SELECT create_distributed_table('failover_to_local', 'key'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +\c - - - :worker_2_port +SET search_path TO failure_failover_to_local_execution; +-- we don't want any cached connections +SET citus.max_cached_conns_per_worker to 0; +INSERT INTO failover_to_local SELECT i, i::text FROM generate_series(0,20)i; +-- even if the connection establishment fails, Citus can +-- failover to local exection +SET citus.node_connection_timeout TO 400; +SELECT citus.mitmproxy('conn.delay(500)'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SET citus.log_local_commands TO ON; +SET client_min_messages TO DEBUG1; +SELECT count(*) FROM failover_to_local; +DEBUG: could not establish any connections to the node localhost:xxxxx after 400 ms +NOTICE: executing the command locally: SELECT count(*) AS count FROM failure_failover_to_local_execution.failover_to_local_1980000 failover_to_local WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM failure_failover_to_local_execution.failover_to_local_1980002 failover_to_local WHERE true + count +--------------------------------------------------------------------- + 21 +(1 row) + +RESET client_min_messages; +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +-- if the remote query execution fails, Citus +-- does not try to fallback to local execution +SELECT key / 0 FROM failover_to_local; +ERROR: division by zero +CONTEXT: while executing command on localhost:xxxxx +-- if the local execution is disabled, Citus does +-- not try to fallback to local execution +SET citus.enable_local_execution TO false; +SELECT citus.mitmproxy('conn.delay(500)'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SET citus.log_local_commands TO ON; +SELECT count(*) FROM failover_to_local; +ERROR: could not establish any connections to the node localhost:xxxxx after 400 ms +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +RESET citus.enable_local_execution; +-- even if we are on a multi-shard command, Citus can +-- 
failover to the local execution if no connection attempts succeed +SELECT citus.mitmproxy('conn.onAuthenticationOk().kill()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SET citus.log_remote_commands TO ON; +SELECT count(*) FROM failover_to_local; +NOTICE: issuing SELECT count(*) AS count FROM failure_failover_to_local_execution.failover_to_local_1980001 failover_to_local WHERE true +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SELECT count(*) AS count FROM failure_failover_to_local_execution.failover_to_local_1980003 failover_to_local WHERE true +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +WARNING: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. +NOTICE: executing the command locally: SELECT count(*) AS count FROM failure_failover_to_local_execution.failover_to_local_1980000 failover_to_local WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM failure_failover_to_local_execution.failover_to_local_1980002 failover_to_local WHERE true + count +--------------------------------------------------------------------- + 21 +(1 row) + +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +\c - - - :master_port +SET client_min_messages TO ERROR; +DROP SCHEMA failure_failover_to_local_execution CASCADE; diff --git a/src/test/regress/expected/failure_test_helpers.out b/src/test/regress/expected/failure_test_helpers.out index a66749dff..8c2be9825 100644 --- a/src/test/regress/expected/failure_test_helpers.out +++ b/src/test/regress/expected/failure_test_helpers.out @@ -48,3 +48,42 @@ BEGIN RETURN QUERY SELECT * FROM mitmproxy_result; END; $$ LANGUAGE plpgsql; +\c - - - :worker_2_port +-- Add some helper functions for sending commands to mitmproxy +CREATE FUNCTION citus.mitmproxy(text) RETURNS TABLE(result text) AS $$ +DECLARE + command ALIAS FOR $1; +BEGIN + CREATE TEMPORARY TABLE mitmproxy_command (command text) ON COMMIT DROP; + CREATE TEMPORARY TABLE mitmproxy_result (res text) ON COMMIT DROP; + + INSERT INTO mitmproxy_command VALUES (command); + + EXECUTE format('COPY mitmproxy_command TO %L', current_setting('citus.mitmfifo')); + EXECUTE format('COPY mitmproxy_result FROM %L', current_setting('citus.mitmfifo')); + + RETURN QUERY SELECT * FROM mitmproxy_result; +END; +$$ LANGUAGE plpgsql; +CREATE FUNCTION citus.clear_network_traffic() RETURNS void AS $$ +BEGIN + PERFORM citus.mitmproxy('recorder.reset()'); + RETURN; -- return void +END; +$$ LANGUAGE plpgsql; +CREATE FUNCTION citus.dump_network_traffic() +RETURNS TABLE(conn int, source text, message text) AS $$ +BEGIN + CREATE TEMPORARY TABLE mitmproxy_command (command text) ON COMMIT DROP; + CREATE TEMPORARY TABLE mitmproxy_result ( + conn int, source text, message text + ) ON COMMIT DROP; + + INSERT INTO mitmproxy_command VALUES ('recorder.dump()'); + + EXECUTE format('COPY mitmproxy_command TO %L', current_setting('citus.mitmfifo')); + EXECUTE format('COPY mitmproxy_result FROM %L', current_setting('citus.mitmfifo')); + + RETURN QUERY SELECT * FROM mitmproxy_result; +END; +$$ LANGUAGE plpgsql; diff --git a/src/test/regress/expected/single_node.out b/src/test/regress/expected/single_node.out index a1250150a..83050929f 100644 --- 
a/src/test/regress/expected/single_node.out +++ b/src/test/regress/expected/single_node.out @@ -49,7 +49,7 @@ SELECT create_reference_table('ref'); (1 row) CREATE TABLE local(c int, d int); -CREATE TABLE public.another_schema_table(a int); +CREATE TABLE public.another_schema_table(a int, b int); SELECT create_distributed_table('public.another_schema_table', 'a'); create_distributed_table --------------------------------------------------------------------- @@ -265,37 +265,37 @@ BEGIN; ERROR: citus size functions cannot be called in transaction blocks which contain multi-shard data modifications ROLLBACK; BEGIN; - -- single shard insert with citus_table_size + -- single shard insert with citus_table_size INSERT INTO test_citus_size_func VALUES (3); SELECT citus_table_size('test_citus_size_func'); ERROR: citus size functions cannot be called in transaction blocks which contain multi-shard data modifications ROLLBACK; BEGIN; - -- multi shard modification with citus_table_size + -- multi shard modification with citus_table_size INSERT INTO test_citus_size_func SELECT * FROM test_citus_size_func; SELECT citus_table_size('test_citus_size_func'); ERROR: citus size functions cannot be called in transaction blocks which contain multi-shard data modifications ROLLBACK; BEGIN; - -- single shard insert with citus_relation_size + -- single shard insert with citus_relation_size INSERT INTO test_citus_size_func VALUES (3); SELECT citus_relation_size('test_citus_size_func'); ERROR: citus size functions cannot be called in transaction blocks which contain multi-shard data modifications ROLLBACK; BEGIN; - -- multi shard modification with citus_relation_size + -- multi shard modification with citus_relation_size INSERT INTO test_citus_size_func SELECT * FROM test_citus_size_func; SELECT citus_relation_size('test_citus_size_func'); ERROR: citus size functions cannot be called in transaction blocks which contain multi-shard data modifications ROLLBACK; BEGIN; - -- single shard insert with citus_total_relation_size + -- single shard insert with citus_total_relation_size INSERT INTO test_citus_size_func VALUES (3); SELECT citus_total_relation_size('test_citus_size_func'); ERROR: citus size functions cannot be called in transaction blocks which contain multi-shard data modifications ROLLBACK; BEGIN; - -- multi shard modification with citus_total_relation_size + -- multi shard modification with citus_total_relation_size INSERT INTO test_citus_size_func SELECT * FROM test_citus_size_func; SELECT citus_total_relation_size('test_citus_size_func'); ERROR: citus size functions cannot be called in transaction blocks which contain multi-shard data modifications @@ -1097,9 +1097,9 @@ SELECT pg_catalog.get_all_active_client_backend_count(); (1 row) BEGIN; - SET citus.shard_count TO 32; - SET citus.force_max_query_parallelization TO ON; - SET citus.enable_local_execution TO false; + SET LOCAL citus.shard_count TO 32; + SET LOCAL citus.force_max_query_parallelization TO ON; + SET LOCAL citus.enable_local_execution TO false; CREATE TABLE test (a int); SET citus.shard_replication_factor TO 1; SELECT create_distributed_table('test', 'a'); @@ -1122,11 +1122,101 @@ BEGIN; (1 row) ROLLBACK; +\c - - - :master_port +SET search_path TO single_node; +-- simulate that even if there is no connection slots +-- to connect, Citus can switch to local execution +SET citus.force_max_query_parallelization TO false; +SET citus.log_remote_commands TO ON; +ALTER SYSTEM SET citus.local_shared_pool_size TO -1; +SELECT pg_reload_conf(); + 
pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +SELECT pg_sleep(0.1); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +SET citus.executor_slow_start_interval TO 10; +SELECT count(*) from another_schema_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630509 another_schema_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630510 another_schema_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630511 another_schema_table WHERE true +NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630512 another_schema_table WHERE true + count +--------------------------------------------------------------------- + 0 +(1 row) + +UPDATE another_schema_table SET b = b; +NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630509 another_schema_table SET b = b +NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630510 another_schema_table SET b = b +NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630511 another_schema_table SET b = b +NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630512 another_schema_table SET b = b +-- INSERT .. SELECT pushdown and INSERT .. SELECT via repartitioning +-- not that we ignore INSERT .. SELECT via coordinator as it relies on +-- COPY command +INSERT INTO another_schema_table SELECT * FROM another_schema_table; +NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630509 AS citus_table_alias (a, b) SELECT a, b FROM single_node.another_schema_table_90630509 another_schema_table WHERE (a IS NOT NULL) +NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630510 AS citus_table_alias (a, b) SELECT a, b FROM single_node.another_schema_table_90630510 another_schema_table WHERE (a IS NOT NULL) +NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630511 AS citus_table_alias (a, b) SELECT a, b FROM single_node.another_schema_table_90630511 another_schema_table WHERE (a IS NOT NULL) +NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630512 AS citus_table_alias (a, b) SELECT a, b FROM single_node.another_schema_table_90630512 another_schema_table WHERE (a IS NOT NULL) +INSERT INTO another_schema_table SELECT b::int, a::int FROM another_schema_table; +NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630509_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630509_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630509 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0 +NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630510_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630510_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630510 another_schema_table WHERE 
true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0 +NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630511_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630511_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630511 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0 +NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630512_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630512_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630512 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0 +-- multi-row INSERTs +INSERT INTO another_schema_table VALUES (1,1), (2,2), (3,3), (4,4), (5,5),(6,6),(7,7); +NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630509 AS citus_table_alias (a, b) VALUES (1,1), (5,5) +NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630510 AS citus_table_alias (a, b) VALUES (3,3), (4,4), (7,7) +NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630511 AS citus_table_alias (a, b) VALUES (6,6) +NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630512 AS citus_table_alias (a, b) VALUES (2,2) +-- intermediate results +WITH cte_1 AS (SELECT * FROM another_schema_table LIMIT 1000) + SELECT count(*) FROM cte_1; +NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630509 another_schema_table WHERE true LIMIT '1000'::bigint +NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630510 another_schema_table WHERE true LIMIT '1000'::bigint +NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630511 another_schema_table WHERE true LIMIT '1000'::bigint +NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630512 another_schema_table WHERE true LIMIT '1000'::bigint +NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte_1 + count +--------------------------------------------------------------------- + 7 +(1 row) + +-- if the local execution is disabled, we cannot failover to +-- local execution and the queries would fail +SET citus.enable_local_execution TO false; +SELECT count(*) from another_schema_table; +ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled. +HINT: Enable local execution via SET citus.enable_local_execution TO true; +UPDATE another_schema_table SET b = b; +ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled. 
+HINT: Enable local execution via SET citus.enable_local_execution TO true; +INSERT INTO another_schema_table SELECT * FROM another_schema_table; +ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled. +HINT: Enable local execution via SET citus.enable_local_execution TO true; +INSERT INTO another_schema_table SELECT b::int, a::int FROM another_schema_table; +ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled. +HINT: Enable local execution via SET citus.enable_local_execution TO true; +WITH cte_1 AS (SELECT * FROM another_schema_table LIMIT 1000) + SELECT count(*) FROM cte_1; +ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled. +HINT: Enable local execution via SET citus.enable_local_execution TO true; +INSERT INTO another_schema_table VALUES (1,1), (2,2), (3,3), (4,4), (5,5),(6,6),(7,7); +ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled. +HINT: Enable local execution via SET citus.enable_local_execution TO true; -- set the values to originals back ALTER SYSTEM RESET citus.max_cached_conns_per_worker; ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor; ALTER SYSTEM RESET citus.recover_2pc_interval; ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor; +ALTER SYSTEM RESET citus.local_shared_pool_size; SELECT pg_reload_conf(); pg_reload_conf --------------------------------------------------------------------- diff --git a/src/test/regress/failure_schedule b/src/test/regress/failure_schedule index e7d183399..d82d5236f 100644 --- a/src/test/regress/failure_schedule +++ b/src/test/regress/failure_schedule @@ -35,6 +35,9 @@ test: failure_multi_row_insert test: failure_mx_metadata_sync test: failure_connection_establishment +# this test syncs metadata to the workers +test: failure_failover_to_local_execution + # test that no tests leaked intermediate results. 
This should always be last test: ensure_no_intermediate_data_leak diff --git a/src/test/regress/sql/failure_distributed_results.sql b/src/test/regress/sql/failure_distributed_results.sql index 6f6e12805..95e4d5513 100644 --- a/src/test/regress/sql/failure_distributed_results.sql +++ b/src/test/regress/sql/failure_distributed_results.sql @@ -50,6 +50,7 @@ ROLLBACk; -- with failure, results from 100802 should be retried and succeed on 57637 SELECT citus.mitmproxy('conn.onQuery(query="worker_partition_query_result.*test_from_100802").kill()'); BEGIN; +SET LOCAL client_min_messages TO DEBUG1; CREATE TABLE distributed_result_info AS SELECT resultId, nodeport, rowcount, targetShardId, targetShardIndex FROM partition_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table') diff --git a/src/test/regress/sql/failure_failover_to_local_execution.sql b/src/test/regress/sql/failure_failover_to_local_execution.sql new file mode 100644 index 000000000..c43f2b41d --- /dev/null +++ b/src/test/regress/sql/failure_failover_to_local_execution.sql @@ -0,0 +1,58 @@ +CREATE SCHEMA failure_failover_to_local_execution; +SET search_path TO failure_failover_to_local_execution; +SELECT citus.mitmproxy('conn.allow()'); + +SET citus.next_shard_id TO 1980000; + +-- on Citus-mx, we can have +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); +SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port); + +SET citus.replication_model TO 'streaming'; +SET citus.shard_replication_factor TO 1; +CREATE TABLE failover_to_local (key int PRIMARY KEY, value varchar(10)); +SELECT create_distributed_table('failover_to_local', 'key'); + +\c - - - :worker_2_port +SET search_path TO failure_failover_to_local_execution; + +-- we don't want any cached connections +SET citus.max_cached_conns_per_worker to 0; +INSERT INTO failover_to_local SELECT i, i::text FROM generate_series(0,20)i; + + +-- even if the connection establishment fails, Citus can +-- failover to local exection +SET citus.node_connection_timeout TO 400; +SELECT citus.mitmproxy('conn.delay(500)'); +SET citus.log_local_commands TO ON; +SET client_min_messages TO DEBUG1; +SELECT count(*) FROM failover_to_local; +RESET client_min_messages; +SELECT citus.mitmproxy('conn.allow()'); + +-- if the remote query execution fails, Citus +-- does not try to fallback to local execution +SELECT key / 0 FROM failover_to_local; + +-- if the local execution is disabled, Citus does +-- not try to fallback to local execution +SET citus.enable_local_execution TO false; +SELECT citus.mitmproxy('conn.delay(500)'); +SET citus.log_local_commands TO ON; +SELECT count(*) FROM failover_to_local; +SELECT citus.mitmproxy('conn.allow()'); +RESET citus.enable_local_execution; + +-- even if we are on a multi-shard command, Citus can +-- failover to the local execution if no connection attempts succeed +SELECT citus.mitmproxy('conn.onAuthenticationOk().kill()'); +SET citus.log_remote_commands TO ON; +SELECT count(*) FROM failover_to_local; + +SELECT citus.mitmproxy('conn.allow()'); + +\c - - - :master_port +SET client_min_messages TO ERROR; +DROP SCHEMA failure_failover_to_local_execution CASCADE; + diff --git a/src/test/regress/sql/failure_test_helpers.sql b/src/test/regress/sql/failure_test_helpers.sql index 13a9ea24a..7053905ac 100644 --- a/src/test/regress/sql/failure_test_helpers.sql +++ b/src/test/regress/sql/failure_test_helpers.sql @@ -47,3 +47,47 @@ BEGIN RETURN QUERY SELECT * FROM mitmproxy_result; END; $$ LANGUAGE plpgsql; + +\c - - - :worker_2_port + 
+-- Add some helper functions for sending commands to mitmproxy + +CREATE FUNCTION citus.mitmproxy(text) RETURNS TABLE(result text) AS $$ +DECLARE + command ALIAS FOR $1; +BEGIN + CREATE TEMPORARY TABLE mitmproxy_command (command text) ON COMMIT DROP; + CREATE TEMPORARY TABLE mitmproxy_result (res text) ON COMMIT DROP; + + INSERT INTO mitmproxy_command VALUES (command); + + EXECUTE format('COPY mitmproxy_command TO %L', current_setting('citus.mitmfifo')); + EXECUTE format('COPY mitmproxy_result FROM %L', current_setting('citus.mitmfifo')); + + RETURN QUERY SELECT * FROM mitmproxy_result; +END; +$$ LANGUAGE plpgsql; + +CREATE FUNCTION citus.clear_network_traffic() RETURNS void AS $$ +BEGIN + PERFORM citus.mitmproxy('recorder.reset()'); + RETURN; -- return void +END; +$$ LANGUAGE plpgsql; + +CREATE FUNCTION citus.dump_network_traffic() +RETURNS TABLE(conn int, source text, message text) AS $$ +BEGIN + CREATE TEMPORARY TABLE mitmproxy_command (command text) ON COMMIT DROP; + CREATE TEMPORARY TABLE mitmproxy_result ( + conn int, source text, message text + ) ON COMMIT DROP; + + INSERT INTO mitmproxy_command VALUES ('recorder.dump()'); + + EXECUTE format('COPY mitmproxy_command TO %L', current_setting('citus.mitmfifo')); + EXECUTE format('COPY mitmproxy_result FROM %L', current_setting('citus.mitmfifo')); + + RETURN QUERY SELECT * FROM mitmproxy_result; +END; +$$ LANGUAGE plpgsql; diff --git a/src/test/regress/sql/single_node.sql b/src/test/regress/sql/single_node.sql index 1e78c3030..ce8c82cbd 100644 --- a/src/test/regress/sql/single_node.sql +++ b/src/test/regress/sql/single_node.sql @@ -30,7 +30,7 @@ CREATE TABLE ref(a int, b int); SELECT create_reference_table('ref'); CREATE TABLE local(c int, d int); -CREATE TABLE public.another_schema_table(a int); +CREATE TABLE public.another_schema_table(a int, b int); SELECT create_distributed_table('public.another_schema_table', 'a'); -- Confirm the basics work @@ -611,9 +611,9 @@ SET search_path TO single_node; SELECT count(*) from should_commit; SELECT pg_catalog.get_all_active_client_backend_count(); BEGIN; - SET citus.shard_count TO 32; - SET citus.force_max_query_parallelization TO ON; - SET citus.enable_local_execution TO false; + SET LOCAL citus.shard_count TO 32; + SET LOCAL citus.force_max_query_parallelization TO ON; + SET LOCAL citus.enable_local_execution TO false; CREATE TABLE test (a int); SET citus.shard_replication_factor TO 1; @@ -624,11 +624,54 @@ BEGIN; SELECT pg_catalog.get_all_active_client_backend_count(); ROLLBACK; + +\c - - - :master_port +SET search_path TO single_node; + + +-- simulate that even if there is no connection slots +-- to connect, Citus can switch to local execution +SET citus.force_max_query_parallelization TO false; +SET citus.log_remote_commands TO ON; +ALTER SYSTEM SET citus.local_shared_pool_size TO -1; +SELECT pg_reload_conf(); +SELECT pg_sleep(0.1); +SET citus.executor_slow_start_interval TO 10; +SELECT count(*) from another_schema_table; + +UPDATE another_schema_table SET b = b; + +-- INSERT .. SELECT pushdown and INSERT .. SELECT via repartitioning +-- not that we ignore INSERT .. 
SELECT via coordinator as it relies on +-- COPY command +INSERT INTO another_schema_table SELECT * FROM another_schema_table; +INSERT INTO another_schema_table SELECT b::int, a::int FROM another_schema_table; + +-- multi-row INSERTs +INSERT INTO another_schema_table VALUES (1,1), (2,2), (3,3), (4,4), (5,5),(6,6),(7,7); + +-- intermediate results +WITH cte_1 AS (SELECT * FROM another_schema_table LIMIT 1000) + SELECT count(*) FROM cte_1; + +-- if the local execution is disabled, we cannot failover to +-- local execution and the queries would fail +SET citus.enable_local_execution TO false; +SELECT count(*) from another_schema_table; +UPDATE another_schema_table SET b = b; +INSERT INTO another_schema_table SELECT * FROM another_schema_table; +INSERT INTO another_schema_table SELECT b::int, a::int FROM another_schema_table; +WITH cte_1 AS (SELECT * FROM another_schema_table LIMIT 1000) + SELECT count(*) FROM cte_1; + +INSERT INTO another_schema_table VALUES (1,1), (2,2), (3,3), (4,4), (5,5),(6,6),(7,7); + -- set the values to originals back ALTER SYSTEM RESET citus.max_cached_conns_per_worker; ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor; ALTER SYSTEM RESET citus.recover_2pc_interval; ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor; +ALTER SYSTEM RESET citus.local_shared_pool_size; SELECT pg_reload_conf(); -- suppress notices
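
A minimal hand-run sketch of the behaviour this patch adds, assuming the single-node setup from single_node.sql above (another_schema_table with its shards on the coordinator); shard names, notice output, and the auto-adjusted value reported by SHOW depend on the installation:

-- "0" (the default) auto-adjusts: the show hook reports half of max_connections
SHOW citus.local_shared_pool_size;

-- "-1" disables remote connections for shards on the local node, so the
-- adaptive executor falls back to local execution for them
ALTER SYSTEM SET citus.local_shared_pool_size TO -1;
SELECT pg_reload_conf();
SELECT pg_sleep(0.1);  -- give the reload a moment, as the regression test does

SET citus.log_local_commands TO on;
SELECT count(*) FROM single_node.another_schema_table;  -- expect "executing the command locally: ..." notices

-- with local execution disabled as well, the same query errors out with the new
-- "could not establish any connections ... when local execution is also disabled" message
SET citus.enable_local_execution TO off;
SELECT count(*) FROM single_node.another_schema_table;

-- restore the defaults
RESET citus.enable_local_execution;
RESET citus.log_local_commands;
ALTER SYSTEM RESET citus.local_shared_pool_size;
SELECT pg_reload_conf();

Only pools whose groupId matches GetLocalGroupId() take this failover path (the coordinator here, or a metadata-synced worker as in failure_failover_to_local_execution); pools to other nodes behave as before.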