Local node connection management
When Citus needs to parallelize queries on the local node (i.e., the node executing the distributed query also holds the shards), we have to be mindful about connection management. The client backends that are running distributed queries compete for slots within max_connections with the client backends that Citus itself initiates to parallelize those queries. For that reason, we implemented a "failover" mechanism: if a distributed query cannot get a connection, the execution fails over the affected tasks to local execution.

The failover logic is as follows:
- Ask the connection manager if it is OK to get a connection.
- If yes, we are good.
- If no, we fail the workerPool, and that failure triggers the failover of its tasks to the local execution queue.

The decision of whether to get a connection is made as follows:

/*
 * For local nodes, solely relying on citus.max_shared_pool_size or
 * max_connections might not be sufficient. The former gives us
 * a preview of the future (e.g., we let the new connections to establish,
 * but they are not established yet). The latter gives us the close to
 * precise view of the past (e.g., the active number of client backends).
 *
 * Overall, we want to limit both of the metrics. The former limit typically
 * kicks in under regular loads, where the load of the database increases in
 * a reasonable pace. The latter limit typically kicks in when the database
 * is issued lots of concurrent sessions at the same time, such as benchmarks.
 */
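Below is a minimal sketch of that admission check, only to make the two limits concrete. The function name and parameters are illustrative; in the actual change this logic lives inside TryToIncrementSharedConnectionCounter() and runs while holding the shared connection-stats lock.

/*
 * Illustrative sketch only -- not the exact Citus function. It mirrors the two
 * limits described above: established client backends (the "past") and the
 * connections already admitted for the local node (the "future").
 */
static bool
LocalNodeConnectionAllowed(int activeBackendCount, int poolConnectionCount,
                           int localSharedPoolSize)
{
    /* too many client backends are already established on the local node */
    if (activeBackendCount + 1 > localSharedPoolSize)
    {
        return false;
    }

    /* too many connections have already been admitted for the local node */
    if (poolConnectionCount + 1 > localSharedPoolSize)
    {
        return false;
    }

    return true;
}

When this check says no, the executor does not error out: the worker pool is failed and the affected tasks are queued for local execution instead.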
parent c2f60b6422
commit c546ec5e78
@@ -23,6 +23,7 @@
 #include "access/htup_details.h"
 #include "catalog/pg_authid.h"
 #include "commands/dbcommands.h"
+#include "distributed/backend_data.h"
 #include "distributed/cancel_utils.h"
 #include "distributed/connection_management.h"
 #include "distributed/listutils.h"

@@ -97,6 +98,14 @@ typedef struct SharedConnStatsHashEntry
  */
 int MaxSharedPoolSize = 0;
 
+/*
+ * Controlled via a GUC, never access directly, use GetLocalSharedPoolSize().
+ * "0" means adjust LocalSharedPoolSize automatically by using MaxConnections.
+ * "-1" means do not use any remote connections for local tasks
+ * Anything else means use that number
+ */
+int LocalSharedPoolSize = 0;
+
 
 /* the following two structs are used for accessing shared memory */
 static HTAB *SharedConnStatsHash = NULL;

@@ -205,6 +214,25 @@ GetMaxSharedPoolSize(void)
 }
 
 
+/*
+ * GetLocalSharedPoolSize is a wrapper around LocalSharedPoolSize which is
+ * controlled via a GUC.
+ * "0" means adjust MaxSharedPoolSize automatically by using MaxConnections
+ * "-1" means do not use any remote connections for local tasks
+ * Anything else means use that number
+ */
+int
+GetLocalSharedPoolSize(void)
+{
+    if (LocalSharedPoolSize == ADJUST_POOLSIZE_AUTOMATICALLY)
+    {
+        return MaxConnections * 0.5;
+    }
+
+    return LocalSharedPoolSize;
+}
+
+
 /*
  * WaitLoopForSharedConnection tries to increment the shared connection
  * counter for the given hostname/port and the current database in

@@ -270,6 +298,32 @@ TryToIncrementSharedConnectionCounter(const char *hostname, int port)
     connKey.port = port;
     connKey.databaseOid = MyDatabaseId;
 
+    /*
+     * Handle adaptive connection management for the local node slightly different
+     * as local node can failover to local execution.
+     */
+    bool connectionToLocalNode = false;
+    int activeBackendCount = 0;
+    WorkerNode *workerNode = FindWorkerNode(hostname, port);
+    if (workerNode)
+    {
+        connectionToLocalNode = (workerNode->groupId == GetLocalGroupId());
+        if (connectionToLocalNode &&
+            GetLocalSharedPoolSize() == DISABLE_REMOTE_CONNECTIONS_FOR_LOCAL_QUERIES)
+        {
+            /*
+             * This early return is required as LocalNodeParallelExecutionFactor
+             * is ignored for the first connection below. This check makes the
+             * user experience is more accurate and also makes it easy for
+             * having regression tests which emulates the local node adaptive
+             * connection management.
+             */
+            return false;
+        }
+
+        activeBackendCount = GetAllActiveClientBackendCount();
+    }
+
     LockConnectionSharedMemory(LW_EXCLUSIVE);
 
     /*

@@ -300,6 +354,34 @@ TryToIncrementSharedConnectionCounter(const char *hostname, int port)
 
         counterIncremented = true;
     }
+    else if (connectionToLocalNode)
+    {
+        /*
+         * For local nodes, solely relying on citus.max_shared_pool_size or
+         * max_connections might not be sufficient. The former gives us
+         * a preview of the future (e.g., we let the new connections to establish,
+         * but they are not established yet). The latter gives us the close to
+         * precise view of the past (e.g., the active number of client backends).
+         *
+         * Overall, we want to limit both of the metrics. The former limit typically
+         * kicks in under regular loads, where the load of the database increases in
+         * a reasonable pace. The latter limit typically kicks in when the database
+         * is issued lots of concurrent sessions at the same time, such as benchmarks.
+         */
+        if (activeBackendCount + 1 > GetLocalSharedPoolSize())
+        {
+            counterIncremented = false;
+        }
+        else if (connectionEntry->connectionCount + 1 > GetLocalSharedPoolSize())
+        {
+            counterIncremented = false;
+        }
+        else
+        {
+            connectionEntry->connectionCount++;
+            counterIncremented = true;
+        }
+    }
     else if (connectionEntry->connectionCount + 1 > GetMaxSharedPoolSize())
     {
         /* there is no space left for this connection */

@@ -618,7 +700,7 @@ SharedConnectionStatsShmemInit(void)
  * optional connections.
  */
 int
-AdaptiveConnectionManagementFlag(int activeConnectionCount)
+AdaptiveConnectionManagementFlag(bool connectToLocalNode, int activeConnectionCount)
 {
     if (UseConnectionPerPlacement())
     {

@@ -633,6 +715,14 @@ AdaptiveConnectionManagementFlag(int activeConnectionCount)
         */
        return 0;
     }
+    else if (connectToLocalNode)
+    {
+        /*
+         * Connection to local node is always optional because the executor is capable
+         * of falling back to local execution.
+         */
+        return OPTIONAL_CONNECTION;
+    }
     else if (ShouldWaitForConnection(activeConnectionCount))
     {
        /*
@@ -298,6 +298,25 @@ typedef struct DistributedExecution
 } DistributedExecution;
 
 
+/*
+ * WorkerPoolFailureState indicates the current state of the
+ * pool.
+ */
+typedef enum WorkerPoolFailureState
+{
+    /* safe to continue execution*/
+    WORKER_POOL_NOT_FAILED,
+
+    /* if a pool fails, the execution fails */
+    WORKER_POOL_FAILED,
+
+    /*
+     * The remote execution over the pool failed, but we failed over
+     * to the local execution and still finish the execution.
+     */
+    WORKER_POOL_FAILED_OVER_TO_LOCAL
+} WorkerPoolFailureState;
+
 /*
  * WorkerPool represents a pool of sessions on the same worker.
  *

@@ -383,11 +402,17 @@ typedef struct WorkerPool
     /* maximum number of connections we are allowed to open at once */
     uint32 maxNewConnectionsPerCycle;
 
+    /*
+     * Set to true if the pool is to local node. We use this value to
+     * avoid re-calculating often.
+     */
+    bool poolToLocalNode;
+
     /*
      * This is only set in WorkerPoolFailed() function. Once a pool fails, we do not
      * use it anymore.
      */
-    bool failed;
+    WorkerPoolFailureState failureState;
 } WorkerPool;
 
 struct TaskPlacementExecution;

@@ -457,7 +482,8 @@ typedef enum TaskExecutionState
 {
     TASK_EXECUTION_NOT_FINISHED,
     TASK_EXECUTION_FINISHED,
-    TASK_EXECUTION_FAILED
+    TASK_EXECUTION_FAILED,
+    TASK_EXECUTION_FAILOVER_TO_LOCAL_EXECUTION
 } TaskExecutionState;
 
 /*

@@ -515,6 +541,7 @@ typedef enum TaskPlacementExecutionState
     PLACEMENT_EXECUTION_READY,
     PLACEMENT_EXECUTION_RUNNING,
     PLACEMENT_EXECUTION_FINISHED,
+    PLACEMENT_EXECUTION_FAILOVER_TO_LOCAL_EXECUTION,
     PLACEMENT_EXECUTION_FAILED
 } TaskPlacementExecutionState;
 

@@ -582,7 +609,6 @@ static void StartDistributedExecution(DistributedExecution *execution);
 static void RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution);
 static void RunDistributedExecution(DistributedExecution *execution);
 static void SequentialRunDistributedExecution(DistributedExecution *execution);
-
 static void FinishDistributedExecution(DistributedExecution *execution);
 static void CleanUpSessions(DistributedExecution *execution);
 

@@ -634,6 +660,8 @@ static void PlacementExecutionDone(TaskPlacementExecution *placementExecution,
                                    bool succeeded);
 static void ScheduleNextPlacementExecution(TaskPlacementExecution *placementExecution,
                                            bool succeeded);
+static bool CanFailoverPlacementExecutionToLocalExecution(TaskPlacementExecution *
+                                                          placementExecution);
 static bool ShouldMarkPlacementsInvalidOnFailure(DistributedExecution *execution);
 static void PlacementExecutionReady(TaskPlacementExecution *placementExecution);
 static TaskExecutionState TaskExecutionStateMachine(ShardCommandExecution *

@@ -1080,7 +1108,11 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList,
     }
     else
     {
-        execution->remoteTaskList = execution->remoteAndLocalTaskList;
+        /*
+         * Get a shallow copy of the list as we rely on remoteAndLocalTaskList
+         * across the execution.
+         */
+        execution->remoteTaskList = list_copy(execution->remoteAndLocalTaskList);
     }
 
     execution->totalTaskCount = list_length(execution->remoteTaskList);

@@ -1630,7 +1662,6 @@ CleanUpSessions(DistributedExecution *execution)
             * changing any states in the ConnectionStateMachine.
             *
             */
-
            CloseConnection(connection);
         }
         else if (connection->connectionState == MULTI_CONNECTION_CONNECTED)

@@ -1708,8 +1739,6 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution)
     RowModifyLevel modLevel = execution->modLevel;
     List *taskList = execution->remoteTaskList;
 
-    int32 localGroupId = GetLocalGroupId();
-
     Task *task = NULL;
     foreach_ptr(task, taskList)
     {

@@ -1849,11 +1878,6 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution)
                 */
                placementExecutionReady = false;
             }
-
-            if (taskPlacement->groupId == localGroupId)
-            {
-                SetLocalExecutionStatus(LOCAL_EXECUTION_DISABLED);
-            }
         }
     }
 

@@ -2020,6 +2044,13 @@ FindOrCreateWorkerPool(DistributedExecution *execution, char *nodeName, int node
     workerPool->nodeName = pstrdup(nodeName);
     workerPool->nodePort = nodePort;
 
+    WorkerNode *workerNode = FindWorkerNode(nodeName, nodePort);
+    if (workerNode)
+    {
+        workerPool->poolToLocalNode =
+            workerNode->groupId == GetLocalGroupId();
+    }
+
     /* "open" connections aggressively when there are cached connections */
     int nodeConnectionCount = MaxCachedConnectionsPerWorker;
     workerPool->maxNewConnectionsPerCycle = Max(1, nodeConnectionCount);

@@ -2142,13 +2173,11 @@ SequentialRunDistributedExecution(DistributedExecution *execution)
     * executions, so make sure to set it.
     */
    MultiShardConnectionType = SEQUENTIAL_CONNECTION;
 
    Task *taskToExecute = NULL;
    foreach_ptr(taskToExecute, taskList)
    {
-       /* execute each task one by one */
        execution->remoteAndLocalTaskList = list_make1(taskToExecute);
-       execution->remoteTaskList = execution->remoteAndLocalTaskList;
+       execution->remoteTaskList = list_make1(taskToExecute);
        execution->totalTaskCount = 1;
        execution->unfinishedTaskCount = 1;
-

@@ -2201,15 +2230,21 @@ RunDistributedExecution(DistributedExecution *execution)
 
        while (execution->unfinishedTaskCount > 0 && !cancellationReceived)
        {
-           long timeout = NextEventTimeout(execution);
-
            WorkerPool *workerPool = NULL;
            foreach_ptr(workerPool, execution->workerList)
            {
                ManageWorkerPool(workerPool);
            }
 
-           if (execution->rebuildWaitEventSet)
+           if (execution->remoteTaskList == NIL)
+           {
+               /*
+                * All the tasks are failed over to the local execution, no need
+                * to wait for any connection activity.
+                */
+               continue;
+           }
+           else if (execution->rebuildWaitEventSet)
            {
                if (events != NULL)
                {

@@ -2221,7 +2256,6 @@ RunDistributedExecution(DistributedExecution *execution)
                    events = NULL;
                }
                eventSetSize = RebuildWaitEventSet(execution);
-
                events = palloc0(eventSetSize * sizeof(WaitEvent));
            }
            else if (execution->waitFlagsChanged)

@@ -2231,6 +2265,7 @@ RunDistributedExecution(DistributedExecution *execution)
            }
 
            /* wait for I/O events */
+           long timeout = NextEventTimeout(execution);
            int eventCount = WaitEventSetWait(execution->waitEventSet, timeout, events,
                                              eventSetSize, WAIT_EVENT_CLIENT_READ);
            ProcessWaitEvents(execution, events, eventCount, &cancellationReceived);

@@ -2369,7 +2404,6 @@ ManageWorkerPool(WorkerPool *workerPool)
    }
 
    int newConnectionCount = CalculateNewConnectionCount(workerPool);
-
    if (newConnectionCount <= 0)
    {
        return;
@@ -2377,6 +2411,46 @@ ManageWorkerPool(WorkerPool *workerPool)
 
    OpenNewConnections(workerPool, newConnectionCount, execution->transactionProperties);
 
+   /*
+    * Cannot establish new connections to the local host, most probably because the
+    * local node cannot accept new connections (e.g., hit max_connections). Switch
+    * the tasks to the local execution.
+    *
+    * We prefer initiatedConnectionCount over the new connection establishments happen
+    * in this iteration via OpenNewConnections(). The reason is that it is expected for
+    * OpenNewConnections() to not open any new connections as long as the connections
+    * are optional (e.g., the second or later connections in the pool). But, for
+    * initiatedConnectionCount to be zero, the connection to the local pool should have
+    * been failed.
+    */
+   int initiatedConnectionCount = list_length(workerPool->sessionList);
+   if (initiatedConnectionCount == 0)
+   {
+       /*
+        * Only the pools to the local node are allowed to have optional
+        * connections for the first connection. Hence, initiatedConnectionCount
+        * could only be zero for poolToLocalNode. For other pools, the connection
+        * manager would wait until it gets at least one connection.
+        */
+       Assert(workerPool->poolToLocalNode);
+
+       WorkerPoolFailed(workerPool);
+
+       if (execution->failed)
+       {
+           ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
+                           errmsg(
+                               "could not establish any connections to the node %s:%d "
+                               "when local execution is also disabled.",
+                               workerPool->nodeName,
+                               workerPool->nodePort),
+                           errhint("Enable local execution via SET "
+                                   "citus.enable_local_execution TO true;")));
+       }
+
+       return;
+   }
+
    INSTR_TIME_SET_CURRENT(workerPool->lastConnectionOpenTime);
    execution->rebuildWaitEventSet = true;
 }

@@ -2389,7 +2463,8 @@ ManageWorkerPool(WorkerPool *workerPool)
 static bool
 HasAnyConnectionFailure(WorkerPool *workerPool)
 {
-   if (workerPool->failed)
+   if (workerPool->failureState == WORKER_POOL_FAILED ||
+       workerPool->failureState == WORKER_POOL_FAILED_OVER_TO_LOCAL)
    {
        /* connection pool failed */
        return true;

@@ -2520,7 +2595,6 @@ CalculateNewConnectionCount(WorkerPool *workerPool)
        * than the target pool size.
        */
       newConnectionCount = Min(newConnectionsForReadyTasks, maxNewConnectionCount);
-
       if (newConnectionCount > 0)
       {
           /* increase the open rate every cycle (like TCP slow start) */

@@ -2557,7 +2631,8 @@ OpenNewConnections(WorkerPool *workerPool, int newConnectionCount,
        * throttle connections if citus.max_shared_pool_size reached)
        */
       int adaptiveConnectionManagementFlag =
-          AdaptiveConnectionManagementFlag(list_length(workerPool->sessionList));
+          AdaptiveConnectionManagementFlag(workerPool->poolToLocalNode,
+                                           list_length(workerPool->sessionList));
       connectionFlags |= adaptiveConnectionManagementFlag;
 
       /* open a new connection to the worker */

@@ -2660,13 +2735,24 @@ CheckConnectionTimeout(WorkerPool *workerPool)
        */
       WorkerPoolFailed(workerPool);
 
+      if (workerPool->failureState == WORKER_POOL_FAILED_OVER_TO_LOCAL)
+      {
+          /*
+           *
+           * When the pool is failed over to local execution, warning
+           * the user just creates chatter as the executor is capable of
+           * finishing the execution.
+           */
+          logLevel = DEBUG1;
+      }
+      else if (execution->transactionProperties->errorOnAnyFailure ||
+               execution->failed)
+      {
       /*
        * The enforcement is not always erroring out. For example, if a SELECT task
        * has two different placements, we'd warn the user, fail the pool and continue
        * with the next placement.
        */
-      if (execution->transactionProperties->errorOnAnyFailure || execution->failed)
-      {
          logLevel = ERROR;
      }
 

@@ -2758,7 +2844,7 @@ NextEventTimeout(DistributedExecution *execution)
    WorkerPool *workerPool = NULL;
    foreach_ptr(workerPool, execution->workerList)
    {
-       if (workerPool->failed)
+       if (workerPool->failureState == WORKER_POOL_FAILED)
        {
            /* worker pool may have already timed out */
            continue;

@@ -2974,14 +3060,28 @@ ConnectionStateMachine(WorkerSession *session)
                * or WorkerPoolFailed.
                */
               if (execution->failed ||
-                  execution->transactionProperties->errorOnAnyFailure)
+                  (execution->transactionProperties->errorOnAnyFailure &&
+                   workerPool->failureState != WORKER_POOL_FAILED_OVER_TO_LOCAL))
               {
                   /* a task has failed due to this connection failure */
                   ReportConnectionError(connection, ERROR);
               }
+              else if (workerPool->activeConnectionCount > 0 ||
+                       workerPool->failureState == WORKER_POOL_FAILED_OVER_TO_LOCAL)
+              {
+                  /*
+                   * We already have active connection(s) to the node, and the
+                   * executor is capable of using those connections to successfully
+                   * finish the execution. So, there is not much value in warning
+                   * the user.
+                   *
+                   * Similarly when the pool is failed over to local execution, warning
+                   * the user just creates chatter.
+                   */
+                  ReportConnectionError(connection, DEBUG1);
+              }
               else
               {
-                  /* can continue with the remaining nodes */
                   ReportConnectionError(connection, WARNING);
               }
 
@@ -3558,6 +3658,17 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution,
    if (querySent)
    {
        session->commandsSent++;
+
+       if (workerPool->poolToLocalNode)
+       {
+           /*
+            * As we started remote execution to the local node,
+            * we cannot switch back to local execution as that
+            * would cause self-deadlocks and breaking
+            * read-your-own-writes consistency.
+            */
+           SetLocalExecutionStatus(LOCAL_EXECUTION_DISABLED);
+       }
    }
 
    return querySent;

@@ -4004,7 +4115,8 @@ WorkerPoolFailed(WorkerPool *workerPool)
     * A pool cannot fail multiple times, the necessary actions
     * has already be taken, so bail out.
     */
-   if (workerPool->failed)
+   if (workerPool->failureState == WORKER_POOL_FAILED ||
+       workerPool->failureState == WORKER_POOL_FAILED_OVER_TO_LOCAL)
    {
        return;
    }

@@ -4033,7 +4145,11 @@ WorkerPoolFailed(WorkerPool *workerPool)
 
    /* we do not want more connections in this pool */
    workerPool->readyTaskCount = 0;
-   workerPool->failed = true;
+   if (workerPool->failureState != WORKER_POOL_FAILED_OVER_TO_LOCAL)
+   {
+       /* we prefer not to override WORKER_POOL_FAILED_OVER_TO_LOCAL */
+       workerPool->failureState = WORKER_POOL_FAILED;
+   }
 
    /*
     * The reason is that when replication factor is > 1 and we are performing

@@ -4050,7 +4166,8 @@ WorkerPoolFailed(WorkerPool *workerPool)
    foreach_ptr(pool, workerList)
    {
        /* failed pools or pools without any connection attempts ignored */
-       if (pool->failed || INSTR_TIME_IS_ZERO(pool->poolStartTime))
+       if (pool->failureState == WORKER_POOL_FAILED ||
+           INSTR_TIME_IS_ZERO(pool->poolStartTime))
        {
            continue;
        }

@@ -4126,11 +4243,20 @@ PlacementExecutionDone(TaskPlacementExecution *placementExecution, bool succeede
        return;
    }
 
-   /* mark the placement execution as finished */
    if (succeeded)
    {
+       /* mark the placement execution as finished */
        placementExecution->executionState = PLACEMENT_EXECUTION_FINISHED;
    }
+   else if (CanFailoverPlacementExecutionToLocalExecution(placementExecution))
+   {
+       /*
+        * The placement execution can be done over local execution, so it is a soft
+        * failure for now.
+        */
+       placementExecution->executionState =
+           PLACEMENT_EXECUTION_FAILOVER_TO_LOCAL_EXECUTION;
+   }
    else
    {
        if (ShouldMarkPlacementsInvalidOnFailure(execution))

@@ -4174,13 +4300,36 @@ PlacementExecutionDone(TaskPlacementExecution *placementExecution, bool succeede
     * Update unfinishedTaskCount only when state changes from not finished to
     * finished or failed state.
     */
-   TaskExecutionState newExecutionState = TaskExecutionStateMachine(
-       shardCommandExecution);
+   TaskExecutionState newExecutionState =
+       TaskExecutionStateMachine(shardCommandExecution);
    if (newExecutionState == TASK_EXECUTION_FINISHED)
    {
        execution->unfinishedTaskCount--;
        return;
    }
+   else if (newExecutionState == TASK_EXECUTION_FAILOVER_TO_LOCAL_EXECUTION)
+   {
+       execution->unfinishedTaskCount--;
+
+       /* move the task to the local execution */
+       Task *task = shardCommandExecution->task;
+       execution->localTaskList = lappend(execution->localTaskList, task);
+
+       /* remove the task from the remote execution list */
+       execution->remoteTaskList = list_delete_ptr(execution->remoteTaskList, task);
+
+       /*
+        * As we decided to failover this task to local execution, we cannot
+        * allow remote execution to this pool during this distributedExecution.
+        */
+       SetLocalExecutionStatus(LOCAL_EXECUTION_REQUIRED);
+       workerPool->failureState = WORKER_POOL_FAILED_OVER_TO_LOCAL;
+
+       ereport(DEBUG4, (errmsg("Task %d execution is failed over to local execution",
+                               task->taskId)));
+
+       return;
+   }
    else if (newExecutionState == TASK_EXECUTION_FAILED)
    {
        execution->unfinishedTaskCount--;

@@ -4199,6 +4348,60 @@ PlacementExecutionDone(TaskPlacementExecution *placementExecution, bool succeede
 }
 
 
+/*
+ * CanFailoverPlacementExecutionToLocalExecution returns true if the input
+ * TaskPlacementExecution can be fail overed to local execution. In other words,
+ * the execution can be deferred to local execution.
+ */
+static bool
+CanFailoverPlacementExecutionToLocalExecution(TaskPlacementExecution *placementExecution)
+{
+   if (!EnableLocalExecution)
+   {
+       /* the user explicitly disabled local execution */
+       return false;
+   }
+
+   if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_DISABLED)
+   {
+       /*
+        * If the current transaction accessed the local node over a connection
+        * then we can't use local execution because of visibility issues.
+        */
+       return false;
+   }
+
+   WorkerPool *workerPool = placementExecution->workerPool;
+   if (!workerPool->poolToLocalNode)
+   {
+       /* we can only fail over tasks to local execution for local pools */
+       return false;
+   }
+
+   if (workerPool->activeConnectionCount > 0)
+   {
+       /*
+        * The pool has already active connections, the executor is capable
+        * of using those active connections. So, no need to failover
+        * to the local execution.
+        */
+       return false;
+   }
+
+   if (placementExecution->assignedSession != NULL)
+   {
+       /*
+        * If the placement execution has been assigned to a specific session,
+        * it has to be executed over that session. Otherwise, it would cause
+        * self-deadlocks and break read-your-own-writes consistency.
+        */
+       return false;
+   }
+
+   return true;
+}
+
+
 /*
  * ScheduleNextPlacementExecution is triggered if the query needs to be
  * executed on any or all placements in order and there is a placement on

@@ -4361,6 +4564,7 @@ TaskExecutionStateMachine(ShardCommandExecution *shardCommandExecution)
    PlacementExecutionOrder executionOrder = shardCommandExecution->executionOrder;
    int donePlacementCount = 0;
    int failedPlacementCount = 0;
+   int failedOverPlacementCount = 0;
    int placementCount = 0;
    int placementExecutionIndex = 0;
    int placementExecutionCount = shardCommandExecution->placementExecutionCount;

@@ -4386,6 +4590,10 @@ TaskExecutionStateMachine(ShardCommandExecution *shardCommandExecution)
        {
            failedPlacementCount++;
        }
+       else if (executionState == PLACEMENT_EXECUTION_FAILOVER_TO_LOCAL_EXECUTION)
+       {
+           failedOverPlacementCount++;
+       }
 
        placementCount++;
    }

@@ -4402,6 +4610,21 @@ TaskExecutionStateMachine(ShardCommandExecution *shardCommandExecution)
    {
        currentTaskExecutionState = TASK_EXECUTION_FINISHED;
    }
+   else if (failedOverPlacementCount + donePlacementCount + failedPlacementCount ==
+            placementCount)
+   {
+       /*
+        * For any given task, we could have 3 end states:
+        *  - "donePlacementCount" indicates the successful placement executions
+        *  - "failedPlacementCount" indicates the failed placement executions
+        *  - "failedOverPlacementCount" indicates the placement executions that
+        *    are failed when using remote execution due to connection errors,
+        *    but there is still a possibility of being successful via
+        *    local execution. So, for now they are considered as soft
+        *    errors.
+        */
+       currentTaskExecutionState = TASK_EXECUTION_FAILOVER_TO_LOCAL_EXECUTION;
+   }
    else
    {
        currentTaskExecutionState = TASK_EXECUTION_NOT_FINISHED;
@@ -113,6 +113,7 @@ static bool NoticeIfSubqueryPushdownEnabled(bool *newval, void **extra, GucSourc
 static bool NodeConninfoGucCheckHook(char **newval, void **extra, GucSource source);
 static void NodeConninfoGucAssignHook(const char *newval, void *extra);
 static const char * MaxSharedPoolSizeGucShowHook(void);
+static const char * LocalPoolSizeGucShowHook(void);
 static bool StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource
                                              source);
 static void CitusAuthHook(Port *port, int status);

@@ -693,6 +694,21 @@ RegisterCitusConfigVariables(void)
        GUC_NO_SHOW_ALL,
        NoticeIfSubqueryPushdownEnabled, NULL, NULL);
 
+   DefineCustomIntVariable(
+       "citus.local_shared_pool_size",
+       gettext_noop(
+           "Sets the maximum number of connections allowed for the shards on the "
+           "local node across all the backends from this node. Setting to -1 disables "
+           "connections throttling. Setting to 0 makes it auto-adjust, meaning "
+           "equal to the half of max_connections on the coordinator."),
+       gettext_noop("As a rule of thumb, the value should be at most equal to the "
+                    "max_connections on the local node."),
+       &LocalSharedPoolSize,
+       0, -1, INT_MAX,
+       PGC_SIGHUP,
+       GUC_SUPERUSER_ONLY,
+       NULL, NULL, LocalPoolSizeGucShowHook);
+
    DefineCustomBoolVariable(
        "citus.log_multi_join_order",
        gettext_noop("Logs the distributed join order to the server log."),

@@ -1764,6 +1780,21 @@ MaxSharedPoolSizeGucShowHook(void)
 }
 
 
+/*
+ * LocalPoolSizeGucShowHook overrides the value that is shown to the
+ * user when the default value has not been set.
+ */
+static const char *
+LocalPoolSizeGucShowHook(void)
+{
+   StringInfo newvalue = makeStringInfo();
+
+   appendStringInfo(newvalue, "%d", GetLocalSharedPoolSize());
+
+   return (const char *) newvalue->data;
+}
+
+
 static bool
 StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource source)
 {
@@ -13,19 +13,23 @@
 
 #define ADJUST_POOLSIZE_AUTOMATICALLY 0
 #define DISABLE_CONNECTION_THROTTLING -1
+#define DISABLE_REMOTE_CONNECTIONS_FOR_LOCAL_QUERIES -1
 
 
 extern int MaxSharedPoolSize;
+extern int LocalSharedPoolSize;
 
 
 extern void InitializeSharedConnectionStats(void);
 extern void WaitForSharedConnection(void);
 extern void WakeupWaiterBackendsForSharedConnection(void);
 extern int GetMaxSharedPoolSize(void);
+extern int GetLocalSharedPoolSize(void);
 extern bool TryToIncrementSharedConnectionCounter(const char *hostname, int port);
 extern void WaitLoopForSharedConnection(const char *hostname, int port);
 extern void DecrementSharedConnectionCounter(const char *hostname, int port);
 extern void IncrementSharedConnectionCounter(const char *hostname, int port);
-extern int AdaptiveConnectionManagementFlag(int activeConnectionCount);
+extern int AdaptiveConnectionManagementFlag(bool connectToLocalNode, int
+                                            activeConnectionCount);
 
 #endif /* SHARED_CONNECTION_STATS_H */
@@ -81,11 +81,12 @@ SELECT citus.mitmproxy('conn.onQuery(query="worker_partition_query_result.*test_
 (1 row)
 
 BEGIN;
+SET LOCAL client_min_messages TO DEBUG1;
 CREATE TABLE distributed_result_info AS
   SELECT resultId, nodeport, rowcount, targetShardId, targetShardIndex
   FROM partition_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table')
     NATURAL JOIN pg_dist_node;
-WARNING: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly
+DEBUG: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly
 	This probably means the server terminated abnormally
 	before or while processing the request.
 SELECT * FROM distributed_result_info ORDER BY resultId;

@@ -0,0 +1,120 @@
+CREATE SCHEMA failure_failover_to_local_execution;
+SET search_path TO failure_failover_to_local_execution;
+SELECT citus.mitmproxy('conn.allow()');
+ mitmproxy
+---------------------------------------------------------------------
+
+(1 row)
+
+SET citus.next_shard_id TO 1980000;
+-- on Citus-mx, we can have
+SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
+ start_metadata_sync_to_node
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
+ start_metadata_sync_to_node
+---------------------------------------------------------------------
+
+(1 row)
+
+SET citus.replication_model TO 'streaming';
+SET citus.shard_replication_factor TO 1;
+CREATE TABLE failover_to_local (key int PRIMARY KEY, value varchar(10));
+SELECT create_distributed_table('failover_to_local', 'key');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+\c - - - :worker_2_port
+SET search_path TO failure_failover_to_local_execution;
+-- we don't want any cached connections
+SET citus.max_cached_conns_per_worker to 0;
+INSERT INTO failover_to_local SELECT i, i::text FROM generate_series(0,20)i;
+-- even if the connection establishment fails, Citus can
+-- failover to local exection
+SET citus.node_connection_timeout TO 400;
+SELECT citus.mitmproxy('conn.delay(500)');
+ mitmproxy
+---------------------------------------------------------------------
+
+(1 row)
+
+SET citus.log_local_commands TO ON;
+SET client_min_messages TO DEBUG1;
+SELECT count(*) FROM failover_to_local;
+DEBUG: could not establish any connections to the node localhost:xxxxx after 400 ms
+NOTICE: executing the command locally: SELECT count(*) AS count FROM failure_failover_to_local_execution.failover_to_local_1980000 failover_to_local WHERE true
+NOTICE: executing the command locally: SELECT count(*) AS count FROM failure_failover_to_local_execution.failover_to_local_1980002 failover_to_local WHERE true
+ count
+---------------------------------------------------------------------
+    21
+(1 row)
+
+RESET client_min_messages;
+SELECT citus.mitmproxy('conn.allow()');
+ mitmproxy
+---------------------------------------------------------------------
+
+(1 row)
+
+-- if the remote query execution fails, Citus
+-- does not try to fallback to local execution
+SELECT key / 0 FROM failover_to_local;
+ERROR: division by zero
+CONTEXT: while executing command on localhost:xxxxx
+-- if the local execution is disabled, Citus does
+-- not try to fallback to local execution
+SET citus.enable_local_execution TO false;
+SELECT citus.mitmproxy('conn.delay(500)');
+ mitmproxy
+---------------------------------------------------------------------
+
+(1 row)
+
+SET citus.log_local_commands TO ON;
+SELECT count(*) FROM failover_to_local;
+ERROR: could not establish any connections to the node localhost:xxxxx after 400 ms
+SELECT citus.mitmproxy('conn.allow()');
+ mitmproxy
+---------------------------------------------------------------------
+
+(1 row)
+
+RESET citus.enable_local_execution;
+-- even if we are on a multi-shard command, Citus can
+-- failover to the local execution if no connection attempts succeed
+SELECT citus.mitmproxy('conn.onAuthenticationOk().kill()');
+ mitmproxy
+---------------------------------------------------------------------
+
+(1 row)
+
+SET citus.log_remote_commands TO ON;
+SELECT count(*) FROM failover_to_local;
+NOTICE: issuing SELECT count(*) AS count FROM failure_failover_to_local_execution.failover_to_local_1980001 failover_to_local WHERE true
+DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
+NOTICE: issuing SELECT count(*) AS count FROM failure_failover_to_local_execution.failover_to_local_1980003 failover_to_local WHERE true
+DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
+WARNING: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly
+	This probably means the server terminated abnormally
+	before or while processing the request.
+NOTICE: executing the command locally: SELECT count(*) AS count FROM failure_failover_to_local_execution.failover_to_local_1980000 failover_to_local WHERE true
+NOTICE: executing the command locally: SELECT count(*) AS count FROM failure_failover_to_local_execution.failover_to_local_1980002 failover_to_local WHERE true
+ count
+---------------------------------------------------------------------
+    21
+(1 row)
+
+SELECT citus.mitmproxy('conn.allow()');
+ mitmproxy
+---------------------------------------------------------------------
+
+(1 row)
+
+\c - - - :master_port
+SET client_min_messages TO ERROR;
+DROP SCHEMA failure_failover_to_local_execution CASCADE;
@@ -48,3 +48,42 @@ BEGIN
     RETURN QUERY SELECT * FROM mitmproxy_result;
 END;
 $$ LANGUAGE plpgsql;
+\c - - - :worker_2_port
+-- Add some helper functions for sending commands to mitmproxy
+CREATE FUNCTION citus.mitmproxy(text) RETURNS TABLE(result text) AS $$
+DECLARE
+  command ALIAS FOR $1;
+BEGIN
+  CREATE TEMPORARY TABLE mitmproxy_command (command text) ON COMMIT DROP;
+  CREATE TEMPORARY TABLE mitmproxy_result (res text) ON COMMIT DROP;
+
+  INSERT INTO mitmproxy_command VALUES (command);
+
+  EXECUTE format('COPY mitmproxy_command TO %L', current_setting('citus.mitmfifo'));
+  EXECUTE format('COPY mitmproxy_result FROM %L', current_setting('citus.mitmfifo'));
+
+  RETURN QUERY SELECT * FROM mitmproxy_result;
+END;
+$$ LANGUAGE plpgsql;
+CREATE FUNCTION citus.clear_network_traffic() RETURNS void AS $$
+BEGIN
+  PERFORM citus.mitmproxy('recorder.reset()');
+  RETURN; -- return void
+END;
+$$ LANGUAGE plpgsql;
+CREATE FUNCTION citus.dump_network_traffic()
+RETURNS TABLE(conn int, source text, message text) AS $$
+BEGIN
+  CREATE TEMPORARY TABLE mitmproxy_command (command text) ON COMMIT DROP;
+  CREATE TEMPORARY TABLE mitmproxy_result (
+    conn int, source text, message text
+  ) ON COMMIT DROP;
+
+  INSERT INTO mitmproxy_command VALUES ('recorder.dump()');
+
+  EXECUTE format('COPY mitmproxy_command TO %L', current_setting('citus.mitmfifo'));
+  EXECUTE format('COPY mitmproxy_result FROM %L', current_setting('citus.mitmfifo'));
+
+  RETURN QUERY SELECT * FROM mitmproxy_result;
+END;
+$$ LANGUAGE plpgsql;
@@ -49,7 +49,7 @@ SELECT create_reference_table('ref');
 (1 row)
 
 CREATE TABLE local(c int, d int);
-CREATE TABLE public.another_schema_table(a int);
+CREATE TABLE public.another_schema_table(a int, b int);
 SELECT create_distributed_table('public.another_schema_table', 'a');
  create_distributed_table
 ---------------------------------------------------------------------

@@ -1097,9 +1097,9 @@ SELECT pg_catalog.get_all_active_client_backend_count();
 (1 row)
 
 BEGIN;
-SET citus.shard_count TO 32;
-SET citus.force_max_query_parallelization TO ON;
-SET citus.enable_local_execution TO false;
+SET LOCAL citus.shard_count TO 32;
+SET LOCAL citus.force_max_query_parallelization TO ON;
+SET LOCAL citus.enable_local_execution TO false;
 CREATE TABLE test (a int);
 SET citus.shard_replication_factor TO 1;
 SELECT create_distributed_table('test', 'a');

@@ -1122,11 +1122,101 @@ BEGIN;
 (1 row)
 
 ROLLBACK;
+\c - - - :master_port
+SET search_path TO single_node;
+-- simulate that even if there is no connection slots
+-- to connect, Citus can switch to local execution
+SET citus.force_max_query_parallelization TO false;
+SET citus.log_remote_commands TO ON;
+ALTER SYSTEM SET citus.local_shared_pool_size TO -1;
+SELECT pg_reload_conf();
+ pg_reload_conf
+---------------------------------------------------------------------
+ t
+(1 row)
+
+SELECT pg_sleep(0.1);
+ pg_sleep
+---------------------------------------------------------------------
+
+(1 row)
+
+SET citus.executor_slow_start_interval TO 10;
+SELECT count(*) from another_schema_table;
+NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630509 another_schema_table WHERE true
+NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630510 another_schema_table WHERE true
+NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630511 another_schema_table WHERE true
+NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630512 another_schema_table WHERE true
+ count
+---------------------------------------------------------------------
+     0
+(1 row)
+
+UPDATE another_schema_table SET b = b;
+NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630509 another_schema_table SET b = b
+NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630510 another_schema_table SET b = b
+NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630511 another_schema_table SET b = b
+NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630512 another_schema_table SET b = b
+-- INSERT .. SELECT pushdown and INSERT .. SELECT via repartitioning
+-- not that we ignore INSERT .. SELECT via coordinator as it relies on
+-- COPY command
+INSERT INTO another_schema_table SELECT * FROM another_schema_table;
+NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630509 AS citus_table_alias (a, b) SELECT a, b FROM single_node.another_schema_table_90630509 another_schema_table WHERE (a IS NOT NULL)
+NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630510 AS citus_table_alias (a, b) SELECT a, b FROM single_node.another_schema_table_90630510 another_schema_table WHERE (a IS NOT NULL)
+NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630511 AS citus_table_alias (a, b) SELECT a, b FROM single_node.another_schema_table_90630511 another_schema_table WHERE (a IS NOT NULL)
+NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630512 AS citus_table_alias (a, b) SELECT a, b FROM single_node.another_schema_table_90630512 another_schema_table WHERE (a IS NOT NULL)
+INSERT INTO another_schema_table SELECT b::int, a::int FROM another_schema_table;
+NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630509_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630509_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630509 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0
+NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630510_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630510_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630510 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0
+NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630511_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630511_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630511 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0
+NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630512_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630512_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630512 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0
+-- multi-row INSERTs
+INSERT INTO another_schema_table VALUES (1,1), (2,2), (3,3), (4,4), (5,5),(6,6),(7,7);
+NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630509 AS citus_table_alias (a, b) VALUES (1,1), (5,5)
+NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630510 AS citus_table_alias (a, b) VALUES (3,3), (4,4), (7,7)
+NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630511 AS citus_table_alias (a, b) VALUES (6,6)
+NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630512 AS citus_table_alias (a, b) VALUES (2,2)
+-- intermediate results
+WITH cte_1 AS (SELECT * FROM another_schema_table LIMIT 1000)
+SELECT count(*) FROM cte_1;
+NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630509 another_schema_table WHERE true LIMIT '1000'::bigint
+NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630510 another_schema_table WHERE true LIMIT '1000'::bigint
+NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630511 another_schema_table WHERE true LIMIT '1000'::bigint
+NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630512 another_schema_table WHERE true LIMIT '1000'::bigint
+NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte_1
+ count
+---------------------------------------------------------------------
+     7
+(1 row)
+
+-- if the local execution is disabled, we cannot failover to
+-- local execution and the queries would fail
+SET citus.enable_local_execution TO false;
+SELECT count(*) from another_schema_table;
+ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled.
+HINT: Enable local execution via SET citus.enable_local_execution TO true;
+UPDATE another_schema_table SET b = b;
+ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled.
+HINT: Enable local execution via SET citus.enable_local_execution TO true;
|
||||||
|
INSERT INTO another_schema_table SELECT * FROM another_schema_table;
|
||||||
|
ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled.
|
||||||
|
HINT: Enable local execution via SET citus.enable_local_execution TO true;
|
||||||
|
INSERT INTO another_schema_table SELECT b::int, a::int FROM another_schema_table;
|
||||||
|
ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled.
|
||||||
|
HINT: Enable local execution via SET citus.enable_local_execution TO true;
|
||||||
|
WITH cte_1 AS (SELECT * FROM another_schema_table LIMIT 1000)
|
||||||
|
SELECT count(*) FROM cte_1;
|
||||||
|
ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled.
|
||||||
|
HINT: Enable local execution via SET citus.enable_local_execution TO true;
|
||||||
|
INSERT INTO another_schema_table VALUES (1,1), (2,2), (3,3), (4,4), (5,5),(6,6),(7,7);
|
||||||
|
ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled.
|
||||||
|
HINT: Enable local execution via SET citus.enable_local_execution TO true;
|
||||||
-- set the values to originals back
|
-- set the values to originals back
|
||||||
ALTER SYSTEM RESET citus.max_cached_conns_per_worker;
|
ALTER SYSTEM RESET citus.max_cached_conns_per_worker;
|
||||||
ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
|
ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
|
||||||
ALTER SYSTEM RESET citus.recover_2pc_interval;
|
ALTER SYSTEM RESET citus.recover_2pc_interval;
|
||||||
ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
|
ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
|
||||||
|
ALTER SYSTEM RESET citus.local_shared_pool_size;
|
||||||
SELECT pg_reload_conf();
|
SELECT pg_reload_conf();
|
||||||
pg_reload_conf
|
pg_reload_conf
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
|
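For readers who want to try the failover path shown in the expected output above outside the regression harness, here is a minimal sketch. It assumes a single-node Citus cluster that already contains the distributed table another_schema_table used by these tests, and it only relies on settings that appear in this patch:

-- with no shared connection slots left for the local node, Citus switches to local execution
ALTER SYSTEM SET citus.local_shared_pool_size TO -1;
SELECT pg_reload_conf();
-- surface the "executing the command locally" notices
SET citus.log_local_commands TO on;
SELECT count(*) FROM another_schema_table;
-- with local execution disabled as well, the query has nowhere left to run
SET citus.enable_local_execution TO false;
SELECT count(*) FROM another_schema_table;  -- ERROR: could not establish any connections ... when local execution is also disabled
RESET citus.enable_local_execution;
ALTER SYSTEM RESET citus.local_shared_pool_size;
SELECT pg_reload_conf();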
@@ -35,6 +35,9 @@ test: failure_multi_row_insert
 test: failure_mx_metadata_sync
 test: failure_connection_establishment

+# this test syncs metadata to the workers
+test: failure_failover_to_local_execution
+
 # test that no tests leaked intermediate results. This should always be last
 test: ensure_no_intermediate_data_leak
@@ -50,6 +50,7 @@ ROLLBACk;
 -- with failure, results from 100802 should be retried and succeed on 57637
 SELECT citus.mitmproxy('conn.onQuery(query="worker_partition_query_result.*test_from_100802").kill()');
 BEGIN;
+SET LOCAL client_min_messages TO DEBUG1;
 CREATE TABLE distributed_result_info AS
 SELECT resultId, nodeport, rowcount, targetShardId, targetShardIndex
 FROM partition_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table')
@@ -0,0 +1,58 @@
+CREATE SCHEMA failure_failover_to_local_execution;
+SET search_path TO failure_failover_to_local_execution;
+SELECT citus.mitmproxy('conn.allow()');
+
+SET citus.next_shard_id TO 1980000;
+
+-- on Citus-mx, we can have
+SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
+SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
+
+SET citus.replication_model TO 'streaming';
+SET citus.shard_replication_factor TO 1;
+CREATE TABLE failover_to_local (key int PRIMARY KEY, value varchar(10));
+SELECT create_distributed_table('failover_to_local', 'key');
+
+\c - - - :worker_2_port
+SET search_path TO failure_failover_to_local_execution;
+
+-- we don't want any cached connections
+SET citus.max_cached_conns_per_worker to 0;
+INSERT INTO failover_to_local SELECT i, i::text FROM generate_series(0,20)i;
+
+
+-- even if the connection establishment fails, Citus can
+-- fail over to local execution
+SET citus.node_connection_timeout TO 400;
+SELECT citus.mitmproxy('conn.delay(500)');
+SET citus.log_local_commands TO ON;
+SET client_min_messages TO DEBUG1;
+SELECT count(*) FROM failover_to_local;
+RESET client_min_messages;
+SELECT citus.mitmproxy('conn.allow()');
+
+-- if the remote query execution fails, Citus
+-- does not try to fall back to local execution
+SELECT key / 0 FROM failover_to_local;
+
+-- if the local execution is disabled, Citus does
+-- not try to fall back to local execution
+SET citus.enable_local_execution TO false;
+SELECT citus.mitmproxy('conn.delay(500)');
+SET citus.log_local_commands TO ON;
+SELECT count(*) FROM failover_to_local;
+SELECT citus.mitmproxy('conn.allow()');
+RESET citus.enable_local_execution;
+
+-- even if we are on a multi-shard command, Citus can
+-- fail over to local execution if no connection attempts succeed
+SELECT citus.mitmproxy('conn.onAuthenticationOk().kill()');
+SET citus.log_remote_commands TO ON;
+SELECT count(*) FROM failover_to_local;
+
+SELECT citus.mitmproxy('conn.allow()');
+
+\c - - - :master_port
+SET client_min_messages TO ERROR;
+DROP SCHEMA failure_failover_to_local_execution CASCADE;
@@ -47,3 +47,47 @@ BEGIN
 RETURN QUERY SELECT * FROM mitmproxy_result;
 END;
 $$ LANGUAGE plpgsql;
+
+\c - - - :worker_2_port
+
+-- Add some helper functions for sending commands to mitmproxy
+
+CREATE FUNCTION citus.mitmproxy(text) RETURNS TABLE(result text) AS $$
+DECLARE
+    command ALIAS FOR $1;
+BEGIN
+    CREATE TEMPORARY TABLE mitmproxy_command (command text) ON COMMIT DROP;
+    CREATE TEMPORARY TABLE mitmproxy_result (res text) ON COMMIT DROP;
+
+    INSERT INTO mitmproxy_command VALUES (command);
+
+    EXECUTE format('COPY mitmproxy_command TO %L', current_setting('citus.mitmfifo'));
+    EXECUTE format('COPY mitmproxy_result FROM %L', current_setting('citus.mitmfifo'));
+
+    RETURN QUERY SELECT * FROM mitmproxy_result;
+END;
+$$ LANGUAGE plpgsql;
+
+CREATE FUNCTION citus.clear_network_traffic() RETURNS void AS $$
+BEGIN
+    PERFORM citus.mitmproxy('recorder.reset()');
+    RETURN; -- return void
+END;
+$$ LANGUAGE plpgsql;
+
+CREATE FUNCTION citus.dump_network_traffic()
+RETURNS TABLE(conn int, source text, message text) AS $$
+BEGIN
+    CREATE TEMPORARY TABLE mitmproxy_command (command text) ON COMMIT DROP;
+    CREATE TEMPORARY TABLE mitmproxy_result (
+        conn int, source text, message text
+    ) ON COMMIT DROP;
+
+    INSERT INTO mitmproxy_command VALUES ('recorder.dump()');
+
+    EXECUTE format('COPY mitmproxy_command TO %L', current_setting('citus.mitmfifo'));
+    EXECUTE format('COPY mitmproxy_result FROM %L', current_setting('citus.mitmfifo'));
+
+    RETURN QUERY SELECT * FROM mitmproxy_result;
+END;
+$$ LANGUAGE plpgsql;
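To make the intent of these helpers concrete, here is a hedged usage sketch. It assumes the mitmproxy fifo is wired up through the citus.mitmfifo setting, exactly as the failure-test harness does, and it reuses the failover_to_local table from the new test above:

SET citus.node_connection_timeout TO 400;
SELECT citus.mitmproxy('conn.delay(500)');   -- delay every new connection beyond the timeout
SELECT count(*) FROM failover_to_local;      -- connection establishment fails, Citus falls back to local execution
SELECT citus.mitmproxy('conn.allow()');      -- restore normal traffic
SELECT * FROM citus.dump_network_traffic();  -- inspect what the proxy recorded
SELECT citus.clear_network_traffic();        -- reset the recorder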
@@ -30,7 +30,7 @@ CREATE TABLE ref(a int, b int);
 SELECT create_reference_table('ref');
 CREATE TABLE local(c int, d int);

-CREATE TABLE public.another_schema_table(a int);
+CREATE TABLE public.another_schema_table(a int, b int);
 SELECT create_distributed_table('public.another_schema_table', 'a');

 -- Confirm the basics work
@@ -611,9 +611,9 @@ SET search_path TO single_node;
 SELECT count(*) from should_commit;
 SELECT pg_catalog.get_all_active_client_backend_count();
 BEGIN;
-SET citus.shard_count TO 32;
-SET citus.force_max_query_parallelization TO ON;
-SET citus.enable_local_execution TO false;
+SET LOCAL citus.shard_count TO 32;
+SET LOCAL citus.force_max_query_parallelization TO ON;
+SET LOCAL citus.enable_local_execution TO false;

 CREATE TABLE test (a int);
 SET citus.shard_replication_factor TO 1;
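One note on the hunk above, which is plain PostgreSQL behavior rather than something this patch introduces: SET LOCAL scopes a setting to the enclosing transaction, so the values tweaked for this test stop applying as soon as the block ends. A minimal sketch:

BEGIN;
SET LOCAL citus.enable_local_execution TO false;
SHOW citus.enable_local_execution;  -- off, inside the transaction only
ROLLBACK;
SHOW citus.enable_local_execution;  -- back to the session's previous value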
@@ -624,11 +624,54 @@ BEGIN;
 SELECT pg_catalog.get_all_active_client_backend_count();
 ROLLBACK;

+
+\c - - - :master_port
+SET search_path TO single_node;
+
+-- simulate that even if there are no connection slots
+-- to connect, Citus can switch to local execution
+SET citus.force_max_query_parallelization TO false;
+SET citus.log_remote_commands TO ON;
+ALTER SYSTEM SET citus.local_shared_pool_size TO -1;
+SELECT pg_reload_conf();
+SELECT pg_sleep(0.1);
+SET citus.executor_slow_start_interval TO 10;
+SELECT count(*) from another_schema_table;
+
+UPDATE another_schema_table SET b = b;
+
+-- INSERT .. SELECT pushdown and INSERT .. SELECT via repartitioning
+-- note that we ignore INSERT .. SELECT via coordinator as it relies on
+-- COPY command
+INSERT INTO another_schema_table SELECT * FROM another_schema_table;
+INSERT INTO another_schema_table SELECT b::int, a::int FROM another_schema_table;
+
+-- multi-row INSERTs
+INSERT INTO another_schema_table VALUES (1,1), (2,2), (3,3), (4,4), (5,5),(6,6),(7,7);
+
+-- intermediate results
+WITH cte_1 AS (SELECT * FROM another_schema_table LIMIT 1000)
+SELECT count(*) FROM cte_1;
+
+-- if the local execution is disabled, we cannot fail over to
+-- local execution and the queries would fail
+SET citus.enable_local_execution TO false;
+SELECT count(*) from another_schema_table;
+UPDATE another_schema_table SET b = b;
+INSERT INTO another_schema_table SELECT * FROM another_schema_table;
+INSERT INTO another_schema_table SELECT b::int, a::int FROM another_schema_table;
+WITH cte_1 AS (SELECT * FROM another_schema_table LIMIT 1000)
+SELECT count(*) FROM cte_1;
+
+INSERT INTO another_schema_table VALUES (1,1), (2,2), (3,3), (4,4), (5,5),(6,6),(7,7);
+
 -- set the values to originals back
 ALTER SYSTEM RESET citus.max_cached_conns_per_worker;
 ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
 ALTER SYSTEM RESET citus.recover_2pc_interval;
 ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
+ALTER SYSTEM RESET citus.local_shared_pool_size;
 SELECT pg_reload_conf();

 -- suppress notices