Merge pull request #4338 from citusdata/single_node_conn_mngmt_main

Local node connection management
pull/4380/head
Önder Kalacı 2020-12-03 14:35:35 +03:00 committed by GitHub
commit 9789a7005f
13 changed files with 802 additions and 55 deletions

View File

@ -23,6 +23,7 @@
#include "access/htup_details.h"
#include "catalog/pg_authid.h"
#include "commands/dbcommands.h"
#include "distributed/backend_data.h"
#include "distributed/cancel_utils.h"
#include "distributed/connection_management.h"
#include "distributed/listutils.h"
@ -97,6 +98,14 @@ typedef struct SharedConnStatsHashEntry
*/
int MaxSharedPoolSize = 0;
/*
* Controlled via a GUC, never access directly, use GetLocalSharedPoolSize().
* "0" means adjust LocalSharedPoolSize automatically by using MaxConnections.
* "-1" means do not use any remote connections for local tasks
* Anything else means use that number
*/
int LocalSharedPoolSize = 0;
/* the following two structs are used for accessing shared memory */
static HTAB *SharedConnStatsHash = NULL;
@ -205,6 +214,25 @@ GetMaxSharedPoolSize(void)
}
/*
* GetLocalSharedPoolSize is a wrapper around LocalSharedPoolSize which is
* controlled via a GUC.
* "0" means adjust MaxSharedPoolSize automatically by using MaxConnections
* "-1" means do not use any remote connections for local tasks
* Anything else means use that number
*/
int
GetLocalSharedPoolSize(void)
{
if (LocalSharedPoolSize == ADJUST_POOLSIZE_AUTOMATICALLY)
{
return MaxConnections * 0.5;
}
return LocalSharedPoolSize;
}
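
For context (not part of the diff): since citus.local_shared_pool_size is registered in the guc.c hunk further down as a PGC_SIGHUP, superuser-only setting, it is typically changed the way the regression tests in this PR do, via ALTER SYSTEM plus a reload. A minimal sketch, assuming a superuser session on the coordinator:

-- disable remote connections for tasks on the local node entirely
ALTER SYSTEM SET citus.local_shared_pool_size TO -1;
SELECT pg_reload_conf();
-- with the default of 0, the value auto-adjusts to half of max_connections;
-- the show hook added in this PR reports the computed value
SHOW citus.local_shared_pool_size;
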
/*
* WaitLoopForSharedConnection tries to increment the shared connection
* counter for the given hostname/port and the current database in
@ -270,6 +298,32 @@ TryToIncrementSharedConnectionCounter(const char *hostname, int port)
connKey.port = port;
connKey.databaseOid = MyDatabaseId;
/*
* Handle adaptive connection management for the local node slightly differently,
* as the local node can fail over to local execution.
*/
bool connectionToLocalNode = false;
int activeBackendCount = 0;
WorkerNode *workerNode = FindWorkerNode(hostname, port);
if (workerNode)
{
connectionToLocalNode = (workerNode->groupId == GetLocalGroupId());
if (connectionToLocalNode &&
GetLocalSharedPoolSize() == DISABLE_REMOTE_CONNECTIONS_FOR_LOCAL_QUERIES)
{
/*
* This early return is required as LocalNodeParallelExecutionFactor
* is ignored for the first connection below. This check makes the
* user experience more accurate and also makes it easier to write
* regression tests that emulate the local node adaptive
* connection management.
*/
return false;
}
activeBackendCount = GetAllActiveClientBackendCount();
}
LockConnectionSharedMemory(LW_EXCLUSIVE);
/*
@ -300,6 +354,34 @@ TryToIncrementSharedConnectionCounter(const char *hostname, int port)
counterIncremented = true;
}
else if (connectionToLocalNode)
{
/*
* For the local node, solely relying on citus.max_shared_pool_size or
* max_connections might not be sufficient. The former gives us
* a preview of the future (e.g., we allow new connections to be established,
* but they are not established yet). The latter gives us a close-to-precise
* view of the past (e.g., the number of active client backends).
*
* Overall, we want to limit both metrics. The former limit typically
* kicks in under regular load, where the load on the database increases at
* a reasonable pace. The latter limit typically kicks in when the database
* is hit by many concurrent sessions at once, such as during benchmarks.
*/
if (activeBackendCount + 1 > GetLocalSharedPoolSize())
{
counterIncremented = false;
}
else if (connectionEntry->connectionCount + 1 > GetLocalSharedPoolSize())
{
counterIncremented = false;
}
else
{
connectionEntry->connectionCount++;
counterIncremented = true;
}
}
else if (connectionEntry->connectionCount + 1 > GetMaxSharedPoolSize())
{
/* there is no space left for this connection */
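
As a side note (not in the diff): the active client backend count consulted in the local-node branch above is the same figure the regression tests in this PR read through pg_catalog.get_all_active_client_backend_count(). A rough way to eyeball both limits from SQL, assuming that test helper is installed as in the tests:

SELECT pg_catalog.get_all_active_client_backend_count();
SHOW citus.local_shared_pool_size;
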
@ -618,7 +700,7 @@ SharedConnectionStatsShmemInit(void)
* optional connections.
*/
int
AdaptiveConnectionManagementFlag(int activeConnectionCount)
AdaptiveConnectionManagementFlag(bool connectToLocalNode, int activeConnectionCount)
{
if (UseConnectionPerPlacement())
{
@ -633,6 +715,14 @@ AdaptiveConnectionManagementFlag(int activeConnectionCount)
*/
return 0;
}
else if (connectToLocalNode)
{
/*
* A connection to the local node is always optional because the executor is capable
* of falling back to local execution.
*/
return OPTIONAL_CONNECTION;
}
else if (ShouldWaitForConnection(activeConnectionCount))
{
/*

View File

@ -298,6 +298,25 @@ typedef struct DistributedExecution
} DistributedExecution;
/*
* WorkerPoolFailureState indicates the current state of the
* pool.
*/
typedef enum WorkerPoolFailureState
{
/* safe to continue execution */
WORKER_POOL_NOT_FAILED,
/* if a pool fails, the execution fails */
WORKER_POOL_FAILED,
/*
* The remote execution over the pool failed, but we failed over
* to local execution and can still finish the execution.
*/
WORKER_POOL_FAILED_OVER_TO_LOCAL
} WorkerPoolFailureState;
/*
* WorkerPool represents a pool of sessions on the same worker.
*
@ -383,11 +402,17 @@ typedef struct WorkerPool
/* maximum number of connections we are allowed to open at once */
uint32 maxNewConnectionsPerCycle;
/*
* Set to true if the pool is for the local node. We cache this value to
* avoid recalculating it often.
*/
bool poolToLocalNode;
/*
* This is only set in the WorkerPoolFailed() function. Once a pool fails, we do not
* use it anymore.
*/
bool failed;
WorkerPoolFailureState failureState;
} WorkerPool;
struct TaskPlacementExecution;
@ -457,7 +482,8 @@ typedef enum TaskExecutionState
{
TASK_EXECUTION_NOT_FINISHED,
TASK_EXECUTION_FINISHED,
TASK_EXECUTION_FAILED
TASK_EXECUTION_FAILED,
TASK_EXECUTION_FAILOVER_TO_LOCAL_EXECUTION
} TaskExecutionState;
/*
@ -515,6 +541,7 @@ typedef enum TaskPlacementExecutionState
PLACEMENT_EXECUTION_READY,
PLACEMENT_EXECUTION_RUNNING,
PLACEMENT_EXECUTION_FINISHED,
PLACEMENT_EXECUTION_FAILOVER_TO_LOCAL_EXECUTION,
PLACEMENT_EXECUTION_FAILED
} TaskPlacementExecutionState;
@ -582,7 +609,6 @@ static void StartDistributedExecution(DistributedExecution *execution);
static void RunLocalExecution(CitusScanState *scanState, DistributedExecution *execution);
static void RunDistributedExecution(DistributedExecution *execution);
static void SequentialRunDistributedExecution(DistributedExecution *execution);
static void FinishDistributedExecution(DistributedExecution *execution);
static void CleanUpSessions(DistributedExecution *execution);
@ -634,6 +660,8 @@ static void PlacementExecutionDone(TaskPlacementExecution *placementExecution,
bool succeeded);
static void ScheduleNextPlacementExecution(TaskPlacementExecution *placementExecution,
bool succeeded);
static bool CanFailoverPlacementExecutionToLocalExecution(TaskPlacementExecution *
placementExecution);
static bool ShouldMarkPlacementsInvalidOnFailure(DistributedExecution *execution);
static void PlacementExecutionReady(TaskPlacementExecution *placementExecution);
static TaskExecutionState TaskExecutionStateMachine(ShardCommandExecution *
@ -1080,7 +1108,11 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList,
}
else
{
execution->remoteTaskList = execution->remoteAndLocalTaskList;
/*
* Get a shallow copy of the list as we rely on remoteAndLocalTaskList
* across the execution.
*/
execution->remoteTaskList = list_copy(execution->remoteAndLocalTaskList);
}
execution->totalTaskCount = list_length(execution->remoteTaskList);
@ -1630,7 +1662,6 @@ CleanUpSessions(DistributedExecution *execution)
* changing any states in the ConnectionStateMachine.
*
*/
CloseConnection(connection);
}
else if (connection->connectionState == MULTI_CONNECTION_CONNECTED)
@ -1708,8 +1739,6 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution)
RowModifyLevel modLevel = execution->modLevel;
List *taskList = execution->remoteTaskList;
int32 localGroupId = GetLocalGroupId();
Task *task = NULL;
foreach_ptr(task, taskList)
{
@ -1849,11 +1878,6 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution)
*/
placementExecutionReady = false;
}
if (taskPlacement->groupId == localGroupId)
{
SetLocalExecutionStatus(LOCAL_EXECUTION_DISABLED);
}
}
}
@ -2020,6 +2044,13 @@ FindOrCreateWorkerPool(DistributedExecution *execution, char *nodeName, int node
workerPool->nodeName = pstrdup(nodeName);
workerPool->nodePort = nodePort;
WorkerNode *workerNode = FindWorkerNode(nodeName, nodePort);
if (workerNode)
{
workerPool->poolToLocalNode =
workerNode->groupId == GetLocalGroupId();
}
/* "open" connections aggressively when there are cached connections */
int nodeConnectionCount = MaxCachedConnectionsPerWorker;
workerPool->maxNewConnectionsPerCycle = Max(1, nodeConnectionCount);
@ -2142,13 +2173,11 @@ SequentialRunDistributedExecution(DistributedExecution *execution)
* executions, so make sure to set it.
*/
MultiShardConnectionType = SEQUENTIAL_CONNECTION;
Task *taskToExecute = NULL;
foreach_ptr(taskToExecute, taskList)
{
/* execute each task one by one */
execution->remoteAndLocalTaskList = list_make1(taskToExecute);
execution->remoteTaskList = execution->remoteAndLocalTaskList;
execution->remoteTaskList = list_make1(taskToExecute);
execution->totalTaskCount = 1;
execution->unfinishedTaskCount = 1;
@ -2201,15 +2230,21 @@ RunDistributedExecution(DistributedExecution *execution)
while (execution->unfinishedTaskCount > 0 && !cancellationReceived)
{
long timeout = NextEventTimeout(execution);
WorkerPool *workerPool = NULL;
foreach_ptr(workerPool, execution->workerList)
{
ManageWorkerPool(workerPool);
}
if (execution->rebuildWaitEventSet)
if (execution->remoteTaskList == NIL)
{
/*
* All the tasks have been failed over to local execution; no need
* to wait for any connection activity.
*/
continue;
}
else if (execution->rebuildWaitEventSet)
{
if (events != NULL)
{
@ -2221,7 +2256,6 @@ RunDistributedExecution(DistributedExecution *execution)
events = NULL;
}
eventSetSize = RebuildWaitEventSet(execution);
events = palloc0(eventSetSize * sizeof(WaitEvent));
}
else if (execution->waitFlagsChanged)
@ -2231,6 +2265,7 @@ RunDistributedExecution(DistributedExecution *execution)
}
/* wait for I/O events */
long timeout = NextEventTimeout(execution);
int eventCount = WaitEventSetWait(execution->waitEventSet, timeout, events,
eventSetSize, WAIT_EVENT_CLIENT_READ);
ProcessWaitEvents(execution, events, eventCount, &cancellationReceived);
@ -2369,7 +2404,6 @@ ManageWorkerPool(WorkerPool *workerPool)
}
int newConnectionCount = CalculateNewConnectionCount(workerPool);
if (newConnectionCount <= 0)
{
return;
@ -2377,6 +2411,46 @@ ManageWorkerPool(WorkerPool *workerPool)
OpenNewConnections(workerPool, newConnectionCount, execution->transactionProperties);
/*
* Cannot establish new connections to the local host, most probably because the
* local node cannot accept new connections (e.g., it hit max_connections). Switch
* the tasks to local execution.
*
* We prefer initiatedConnectionCount over the number of new connections established
* in this iteration via OpenNewConnections(). The reason is that OpenNewConnections()
* is expected not to open any new connections as long as the connections
* are optional (e.g., the second or later connections in the pool). But, for
* initiatedConnectionCount to be zero, the connection to the local pool must
* have failed.
*/
int initiatedConnectionCount = list_length(workerPool->sessionList);
if (initiatedConnectionCount == 0)
{
/*
* Only pools to the local node are allowed to have an optional
* first connection. Hence, initiatedConnectionCount
* can only be zero for poolToLocalNode. For other pools, the connection
* manager waits until it gets at least one connection.
*/
Assert(workerPool->poolToLocalNode);
WorkerPoolFailed(workerPool);
if (execution->failed)
{
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg(
"could not establish any connections to the node %s:%d "
"when local execution is also disabled.",
workerPool->nodeName,
workerPool->nodePort),
errhint("Enable local execution via SET "
"citus.enable_local_execution TO true;")));
}
return;
}
INSTR_TIME_SET_CURRENT(workerPool->lastConnectionOpenTime);
execution->rebuildWaitEventSet = true;
}
@ -2389,7 +2463,8 @@ ManageWorkerPool(WorkerPool *workerPool)
static bool
HasAnyConnectionFailure(WorkerPool *workerPool)
{
if (workerPool->failed)
if (workerPool->failureState == WORKER_POOL_FAILED ||
workerPool->failureState == WORKER_POOL_FAILED_OVER_TO_LOCAL)
{
/* connection pool failed */
return true;
@ -2520,7 +2595,6 @@ CalculateNewConnectionCount(WorkerPool *workerPool)
* than the target pool size.
*/
newConnectionCount = Min(newConnectionsForReadyTasks, maxNewConnectionCount);
if (newConnectionCount > 0)
{
/* increase the open rate every cycle (like TCP slow start) */
@ -2557,7 +2631,8 @@ OpenNewConnections(WorkerPool *workerPool, int newConnectionCount,
* throttle connections if citus.max_shared_pool_size reached)
*/
int adaptiveConnectionManagementFlag =
AdaptiveConnectionManagementFlag(list_length(workerPool->sessionList));
AdaptiveConnectionManagementFlag(workerPool->poolToLocalNode,
list_length(workerPool->sessionList));
connectionFlags |= adaptiveConnectionManagementFlag;
/* open a new connection to the worker */
@ -2660,13 +2735,24 @@ CheckConnectionTimeout(WorkerPool *workerPool)
*/
WorkerPoolFailed(workerPool);
/*
* The enforcement does not always error out. For example, if a SELECT task
* has two different placements, we'd warn the user, fail the pool and continue
* with the next placement.
*/
if (execution->transactionProperties->errorOnAnyFailure || execution->failed)
if (workerPool->failureState == WORKER_POOL_FAILED_OVER_TO_LOCAL)
{
/*
* When the pool is failed over to local execution, warning
* the user just creates chatter, as the executor is capable of
* finishing the execution.
*/
logLevel = DEBUG1;
}
else if (execution->transactionProperties->errorOnAnyFailure ||
execution->failed)
{
/*
* The enforcement does not always error out. For example, if a SELECT task
* has two different placements, we'd warn the user, fail the pool and continue
* with the next placement.
*/
logLevel = ERROR;
}
@ -2758,7 +2844,7 @@ NextEventTimeout(DistributedExecution *execution)
WorkerPool *workerPool = NULL;
foreach_ptr(workerPool, execution->workerList)
{
if (workerPool->failed)
if (workerPool->failureState == WORKER_POOL_FAILED)
{
/* worker pool may have already timed out */
continue;
@ -2974,14 +3060,28 @@ ConnectionStateMachine(WorkerSession *session)
* or WorkerPoolFailed.
*/
if (execution->failed ||
execution->transactionProperties->errorOnAnyFailure)
(execution->transactionProperties->errorOnAnyFailure &&
workerPool->failureState != WORKER_POOL_FAILED_OVER_TO_LOCAL))
{
/* a task has failed due to this connection failure */
ReportConnectionError(connection, ERROR);
}
else if (workerPool->activeConnectionCount > 0 ||
workerPool->failureState == WORKER_POOL_FAILED_OVER_TO_LOCAL)
{
/*
* We already have active connection(s) to the node, and the
* executor is capable of using those connections to successfully
* finish the execution. So, there is not much value in warning
* the user.
*
* Similarly when the pool is failed over to local execution, warning
* the user just creates chatter.
*/
ReportConnectionError(connection, DEBUG1);
}
else
{
/* can continue with the remaining nodes */
ReportConnectionError(connection, WARNING);
}
@ -3558,6 +3658,17 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution,
if (querySent)
{
session->commandsSent++;
if (workerPool->poolToLocalNode)
{
/*
* As we started remote execution to the local node,
* we cannot switch back to local execution, as that
* would cause self-deadlocks and break
* read-your-own-writes consistency.
*/
SetLocalExecutionStatus(LOCAL_EXECUTION_DISABLED);
}
}
return querySent;
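
To make the concern above concrete, here is a hypothetical sketch (not part of this PR) of why switching to local execution after remote execution has started against the local node would be unsafe; the table name is borrowed from the single_node test:

BEGIN;
-- routed over a remote connection to the local node's shard
INSERT INTO another_schema_table VALUES (1, 1);
-- if the executor now fell back to local execution, this SELECT could miss
-- the row written over that (still uncommitted) remote connection and could
-- self-deadlock on conflicting locks, so local execution stays disabled
SELECT count(*) FROM another_schema_table WHERE a = 1;
COMMIT;
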
@ -4004,7 +4115,8 @@ WorkerPoolFailed(WorkerPool *workerPool)
* A pool cannot fail multiple times; the necessary actions
* have already been taken, so bail out.
*/
if (workerPool->failed)
if (workerPool->failureState == WORKER_POOL_FAILED ||
workerPool->failureState == WORKER_POOL_FAILED_OVER_TO_LOCAL)
{
return;
}
@ -4033,7 +4145,11 @@ WorkerPoolFailed(WorkerPool *workerPool)
/* we do not want more connections in this pool */
workerPool->readyTaskCount = 0;
workerPool->failed = true;
if (workerPool->failureState != WORKER_POOL_FAILED_OVER_TO_LOCAL)
{
/* we prefer not to override WORKER_POOL_FAILED_OVER_TO_LOCAL */
workerPool->failureState = WORKER_POOL_FAILED;
}
/*
* The reason is that when replication factor is > 1 and we are performing
@ -4050,7 +4166,8 @@ WorkerPoolFailed(WorkerPool *workerPool)
foreach_ptr(pool, workerList)
{
/* failed pools or pools without any connection attempts ignored */
if (pool->failed || INSTR_TIME_IS_ZERO(pool->poolStartTime))
if (pool->failureState == WORKER_POOL_FAILED ||
INSTR_TIME_IS_ZERO(pool->poolStartTime))
{
continue;
}
@ -4126,11 +4243,20 @@ PlacementExecutionDone(TaskPlacementExecution *placementExecution, bool succeede
return;
}
/* mark the placement execution as finished */
if (succeeded)
{
/* mark the placement execution as finished */
placementExecution->executionState = PLACEMENT_EXECUTION_FINISHED;
}
else if (CanFailoverPlacementExecutionToLocalExecution(placementExecution))
{
/*
* The placement execution can be done over local execution, so it is a soft
* failure for now.
*/
placementExecution->executionState =
PLACEMENT_EXECUTION_FAILOVER_TO_LOCAL_EXECUTION;
}
else
{
if (ShouldMarkPlacementsInvalidOnFailure(execution))
@ -4174,13 +4300,36 @@ PlacementExecutionDone(TaskPlacementExecution *placementExecution, bool succeede
* Update unfinishedTaskCount only when state changes from not finished to
* finished or failed state.
*/
TaskExecutionState newExecutionState = TaskExecutionStateMachine(
shardCommandExecution);
TaskExecutionState newExecutionState =
TaskExecutionStateMachine(shardCommandExecution);
if (newExecutionState == TASK_EXECUTION_FINISHED)
{
execution->unfinishedTaskCount--;
return;
}
else if (newExecutionState == TASK_EXECUTION_FAILOVER_TO_LOCAL_EXECUTION)
{
execution->unfinishedTaskCount--;
/* move the task to the local execution */
Task *task = shardCommandExecution->task;
execution->localTaskList = lappend(execution->localTaskList, task);
/* remove the task from the remote execution list */
execution->remoteTaskList = list_delete_ptr(execution->remoteTaskList, task);
/*
* As we decided to fail over this task to local execution, we cannot
* allow remote execution to this pool during this distributedExecution.
*/
SetLocalExecutionStatus(LOCAL_EXECUTION_REQUIRED);
workerPool->failureState = WORKER_POOL_FAILED_OVER_TO_LOCAL;
ereport(DEBUG4, (errmsg("Task %d execution is failed over to local execution",
task->taskId)));
return;
}
else if (newExecutionState == TASK_EXECUTION_FAILED)
{
execution->unfinishedTaskCount--;
@ -4199,6 +4348,60 @@ PlacementExecutionDone(TaskPlacementExecution *placementExecution, bool succeede
}
/*
* CanFailoverPlacementExecutionToLocalExecution returns true if the input
* TaskPlacementExecution can be failed over to local execution. In other words,
* the execution can be deferred to local execution.
*/
static bool
CanFailoverPlacementExecutionToLocalExecution(TaskPlacementExecution *placementExecution)
{
if (!EnableLocalExecution)
{
/* the user explicitly disabled local execution */
return false;
}
if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_DISABLED)
{
/*
* If the current transaction accessed the local node over a connection
* then we can't use local execution because of visibility issues.
*/
return false;
}
WorkerPool *workerPool = placementExecution->workerPool;
if (!workerPool->poolToLocalNode)
{
/* we can only fail over tasks to local execution for local pools */
return false;
}
if (workerPool->activeConnectionCount > 0)
{
/*
* The pool has already active connections, the executor is capable
* of using those active connections. So, no need to failover
* to the local execution.
*/
return false;
}
if (placementExecution->assignedSession != NULL)
{
/*
* If the placement execution has been assigned to a specific session,
* it has to be executed over that session. Otherwise, it would cause
* self-deadlocks and break read-your-own-writes consistency.
*/
return false;
}
return true;
}
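
The checks above are what the new single_node test exercises end to end; a condensed sketch of the observable behavior on a coordinator that cannot open remote connections to itself (citus.local_shared_pool_size set to -1 via ALTER SYSTEM, as in the test):

-- the executor fails over to local execution and the query still succeeds
SELECT count(*) FROM another_schema_table;
-- with local execution disabled there is nothing to fail over to, so it errors
SET citus.enable_local_execution TO false;
SELECT count(*) FROM another_schema_table;
-- ERROR:  could not establish any connections to the node ... when local execution is also disabled.
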
/*
* ScheduleNextPlacementExecution is triggered if the query needs to be
* executed on any or all placements in order and there is a placement on
@ -4361,6 +4564,7 @@ TaskExecutionStateMachine(ShardCommandExecution *shardCommandExecution)
PlacementExecutionOrder executionOrder = shardCommandExecution->executionOrder;
int donePlacementCount = 0;
int failedPlacementCount = 0;
int failedOverPlacementCount = 0;
int placementCount = 0;
int placementExecutionIndex = 0;
int placementExecutionCount = shardCommandExecution->placementExecutionCount;
@ -4386,6 +4590,10 @@ TaskExecutionStateMachine(ShardCommandExecution *shardCommandExecution)
{
failedPlacementCount++;
}
else if (executionState == PLACEMENT_EXECUTION_FAILOVER_TO_LOCAL_EXECUTION)
{
failedOverPlacementCount++;
}
placementCount++;
}
@ -4402,6 +4610,21 @@ TaskExecutionStateMachine(ShardCommandExecution *shardCommandExecution)
{
currentTaskExecutionState = TASK_EXECUTION_FINISHED;
}
else if (failedOverPlacementCount + donePlacementCount + failedPlacementCount ==
placementCount)
{
/*
* For any given task, we could have 3 end states:
* - "donePlacementCount" indicates the successful placement executions
* - "failedPlacementCount" indicates the failed placement executions
* - "failedOverPlacementCount" indicates the placement executions that
* failed during remote execution due to connection errors,
* but may still succeed via
* local execution. So, for now they are considered soft
* errors.
*/
currentTaskExecutionState = TASK_EXECUTION_FAILOVER_TO_LOCAL_EXECUTION;
}
else
{
currentTaskExecutionState = TASK_EXECUTION_NOT_FINISHED;

View File

@ -113,6 +113,7 @@ static bool NoticeIfSubqueryPushdownEnabled(bool *newval, void **extra, GucSourc
static bool NodeConninfoGucCheckHook(char **newval, void **extra, GucSource source);
static void NodeConninfoGucAssignHook(const char *newval, void *extra);
static const char * MaxSharedPoolSizeGucShowHook(void);
static const char * LocalPoolSizeGucShowHook(void);
static bool StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource
source);
static void CitusAuthHook(Port *port, int status);
@ -693,6 +694,21 @@ RegisterCitusConfigVariables(void)
GUC_NO_SHOW_ALL,
NoticeIfSubqueryPushdownEnabled, NULL, NULL);
DefineCustomIntVariable(
"citus.local_shared_pool_size",
gettext_noop(
"Sets the maximum number of connections allowed for the shards on the "
"local node across all the backends from this node. Setting to -1 disables "
"connections throttling. Setting to 0 makes it auto-adjust, meaning "
"equal to the half of max_connections on the coordinator."),
gettext_noop("As a rule of thumb, the value should be at most equal to the "
"max_connections on the local node."),
&LocalSharedPoolSize,
0, -1, INT_MAX,
PGC_SIGHUP,
GUC_SUPERUSER_ONLY,
NULL, NULL, LocalPoolSizeGucShowHook);
DefineCustomBoolVariable(
"citus.log_multi_join_order",
gettext_noop("Logs the distributed join order to the server log."),
@ -1764,6 +1780,21 @@ MaxSharedPoolSizeGucShowHook(void)
}
/*
* LocalPoolSizeGucShowHook overrides the value that is shown to the
* user when the default value has not been set.
*/
static const char *
LocalPoolSizeGucShowHook(void)
{
StringInfo newvalue = makeStringInfo();
appendStringInfo(newvalue, "%d", GetLocalSharedPoolSize());
return (const char *) newvalue->data;
}
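
A worked illustration (values depend on the server configuration and are not asserted by this PR): with the default setting of 0 and max_connections = 100, GetLocalSharedPoolSize() returns 50, so:

SHOW max_connections;               -- 100
SHOW citus.local_shared_pool_size;  -- reported as 50 rather than the raw 0
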
static bool
StatisticsCollectionGucCheckHook(bool *newval, void **extra, GucSource source)
{

View File

@ -13,19 +13,23 @@
#define ADJUST_POOLSIZE_AUTOMATICALLY 0
#define DISABLE_CONNECTION_THROTTLING -1
#define DISABLE_REMOTE_CONNECTIONS_FOR_LOCAL_QUERIES -1
extern int MaxSharedPoolSize;
extern int LocalSharedPoolSize;
extern void InitializeSharedConnectionStats(void);
extern void WaitForSharedConnection(void);
extern void WakeupWaiterBackendsForSharedConnection(void);
extern int GetMaxSharedPoolSize(void);
extern int GetLocalSharedPoolSize(void);
extern bool TryToIncrementSharedConnectionCounter(const char *hostname, int port);
extern void WaitLoopForSharedConnection(const char *hostname, int port);
extern void DecrementSharedConnectionCounter(const char *hostname, int port);
extern void IncrementSharedConnectionCounter(const char *hostname, int port);
extern int AdaptiveConnectionManagementFlag(int activeConnectionCount);
extern int AdaptiveConnectionManagementFlag(bool connectToLocalNode, int
activeConnectionCount);
#endif /* SHARED_CONNECTION_STATS_H */

View File

@ -81,11 +81,12 @@ SELECT citus.mitmproxy('conn.onQuery(query="worker_partition_query_result.*test_
(1 row)
BEGIN;
SET LOCAL client_min_messages TO DEBUG1;
CREATE TABLE distributed_result_info AS
SELECT resultId, nodeport, rowcount, targetShardId, targetShardIndex
FROM partition_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table')
NATURAL JOIN pg_dist_node;
WARNING: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly
DEBUG: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
SELECT * FROM distributed_result_info ORDER BY resultId;

View File

@ -0,0 +1,120 @@
CREATE SCHEMA failure_failover_to_local_execution;
SET search_path TO failure_failover_to_local_execution;
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SET citus.next_shard_id TO 1980000;
-- on Citus-mx, we can have
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
CREATE TABLE failover_to_local (key int PRIMARY KEY, value varchar(10));
SELECT create_distributed_table('failover_to_local', 'key');
create_distributed_table
---------------------------------------------------------------------
(1 row)
\c - - - :worker_2_port
SET search_path TO failure_failover_to_local_execution;
-- we don't want any cached connections
SET citus.max_cached_conns_per_worker to 0;
INSERT INTO failover_to_local SELECT i, i::text FROM generate_series(0,20)i;
-- even if the connection establishment fails, Citus can
-- fail over to local execution
SET citus.node_connection_timeout TO 400;
SELECT citus.mitmproxy('conn.delay(500)');
mitmproxy
---------------------------------------------------------------------
(1 row)
SET citus.log_local_commands TO ON;
SET client_min_messages TO DEBUG1;
SELECT count(*) FROM failover_to_local;
DEBUG: could not establish any connections to the node localhost:xxxxx after 400 ms
NOTICE: executing the command locally: SELECT count(*) AS count FROM failure_failover_to_local_execution.failover_to_local_1980000 failover_to_local WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM failure_failover_to_local_execution.failover_to_local_1980002 failover_to_local WHERE true
count
---------------------------------------------------------------------
21
(1 row)
RESET client_min_messages;
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
-- if the remote query execution fails, Citus
-- does not try to fall back to local execution
SELECT key / 0 FROM failover_to_local;
ERROR: division by zero
CONTEXT: while executing command on localhost:xxxxx
-- if the local execution is disabled, Citus does
-- not try to fall back to local execution
SET citus.enable_local_execution TO false;
SELECT citus.mitmproxy('conn.delay(500)');
mitmproxy
---------------------------------------------------------------------
(1 row)
SET citus.log_local_commands TO ON;
SELECT count(*) FROM failover_to_local;
ERROR: could not establish any connections to the node localhost:xxxxx after 400 ms
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
RESET citus.enable_local_execution;
-- even if we are on a multi-shard command, Citus can
-- fail over to local execution if no connection attempts succeed
SELECT citus.mitmproxy('conn.onAuthenticationOk().kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SET citus.log_remote_commands TO ON;
SELECT count(*) FROM failover_to_local;
NOTICE: issuing SELECT count(*) AS count FROM failure_failover_to_local_execution.failover_to_local_1980001 failover_to_local WHERE true
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
NOTICE: issuing SELECT count(*) AS count FROM failure_failover_to_local_execution.failover_to_local_1980003 failover_to_local WHERE true
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
WARNING: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
NOTICE: executing the command locally: SELECT count(*) AS count FROM failure_failover_to_local_execution.failover_to_local_1980000 failover_to_local WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM failure_failover_to_local_execution.failover_to_local_1980002 failover_to_local WHERE true
count
---------------------------------------------------------------------
21
(1 row)
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
\c - - - :master_port
SET client_min_messages TO ERROR;
DROP SCHEMA failure_failover_to_local_execution CASCADE;

View File

@ -48,3 +48,42 @@ BEGIN
RETURN QUERY SELECT * FROM mitmproxy_result;
END;
$$ LANGUAGE plpgsql;
\c - - - :worker_2_port
-- Add some helper functions for sending commands to mitmproxy
CREATE FUNCTION citus.mitmproxy(text) RETURNS TABLE(result text) AS $$
DECLARE
command ALIAS FOR $1;
BEGIN
CREATE TEMPORARY TABLE mitmproxy_command (command text) ON COMMIT DROP;
CREATE TEMPORARY TABLE mitmproxy_result (res text) ON COMMIT DROP;
INSERT INTO mitmproxy_command VALUES (command);
EXECUTE format('COPY mitmproxy_command TO %L', current_setting('citus.mitmfifo'));
EXECUTE format('COPY mitmproxy_result FROM %L', current_setting('citus.mitmfifo'));
RETURN QUERY SELECT * FROM mitmproxy_result;
END;
$$ LANGUAGE plpgsql;
CREATE FUNCTION citus.clear_network_traffic() RETURNS void AS $$
BEGIN
PERFORM citus.mitmproxy('recorder.reset()');
RETURN; -- return void
END;
$$ LANGUAGE plpgsql;
CREATE FUNCTION citus.dump_network_traffic()
RETURNS TABLE(conn int, source text, message text) AS $$
BEGIN
CREATE TEMPORARY TABLE mitmproxy_command (command text) ON COMMIT DROP;
CREATE TEMPORARY TABLE mitmproxy_result (
conn int, source text, message text
) ON COMMIT DROP;
INSERT INTO mitmproxy_command VALUES ('recorder.dump()');
EXECUTE format('COPY mitmproxy_command TO %L', current_setting('citus.mitmfifo'));
EXECUTE format('COPY mitmproxy_result FROM %L', current_setting('citus.mitmfifo'));
RETURN QUERY SELECT * FROM mitmproxy_result;
END;
$$ LANGUAGE plpgsql;

View File

@ -49,7 +49,7 @@ SELECT create_reference_table('ref');
(1 row)
CREATE TABLE local(c int, d int);
CREATE TABLE public.another_schema_table(a int);
CREATE TABLE public.another_schema_table(a int, b int);
SELECT create_distributed_table('public.another_schema_table', 'a');
create_distributed_table
---------------------------------------------------------------------
@ -265,37 +265,37 @@ BEGIN;
ERROR: citus size functions cannot be called in transaction blocks which contain multi-shard data modifications
ROLLBACK;
BEGIN;
-- single shard insert with citus_table_size
-- single shard insert with citus_table_size
INSERT INTO test_citus_size_func VALUES (3);
SELECT citus_table_size('test_citus_size_func');
ERROR: citus size functions cannot be called in transaction blocks which contain multi-shard data modifications
ROLLBACK;
BEGIN;
-- multi shard modification with citus_table_size
-- multi shard modification with citus_table_size
INSERT INTO test_citus_size_func SELECT * FROM test_citus_size_func;
SELECT citus_table_size('test_citus_size_func');
ERROR: citus size functions cannot be called in transaction blocks which contain multi-shard data modifications
ROLLBACK;
BEGIN;
-- single shard insert with citus_relation_size
-- single shard insert with citus_relation_size
INSERT INTO test_citus_size_func VALUES (3);
SELECT citus_relation_size('test_citus_size_func');
ERROR: citus size functions cannot be called in transaction blocks which contain multi-shard data modifications
ROLLBACK;
BEGIN;
-- multi shard modification with citus_relation_size
-- multi shard modification with citus_relation_size
INSERT INTO test_citus_size_func SELECT * FROM test_citus_size_func;
SELECT citus_relation_size('test_citus_size_func');
ERROR: citus size functions cannot be called in transaction blocks which contain multi-shard data modifications
ROLLBACK;
BEGIN;
-- single shard insert with citus_total_relation_size
-- single shard insert with citus_total_relation_size
INSERT INTO test_citus_size_func VALUES (3);
SELECT citus_total_relation_size('test_citus_size_func');
ERROR: citus size functions cannot be called in transaction blocks which contain multi-shard data modifications
ROLLBACK;
BEGIN;
-- multi shard modification with citus_total_relation_size
-- multi shard modification with citus_total_relation_size
INSERT INTO test_citus_size_func SELECT * FROM test_citus_size_func;
SELECT citus_total_relation_size('test_citus_size_func');
ERROR: citus size functions cannot be called in transaction blocks which contain multi-shard data modifications
@ -1097,9 +1097,9 @@ SELECT pg_catalog.get_all_active_client_backend_count();
(1 row)
BEGIN;
SET citus.shard_count TO 32;
SET citus.force_max_query_parallelization TO ON;
SET citus.enable_local_execution TO false;
SET LOCAL citus.shard_count TO 32;
SET LOCAL citus.force_max_query_parallelization TO ON;
SET LOCAL citus.enable_local_execution TO false;
CREATE TABLE test (a int);
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('test', 'a');
@ -1122,11 +1122,101 @@ BEGIN;
(1 row)
ROLLBACK;
\c - - - :master_port
SET search_path TO single_node;
-- simulate that even if there are no connection slots
-- to connect, Citus can switch to local execution
SET citus.force_max_query_parallelization TO false;
SET citus.log_remote_commands TO ON;
ALTER SYSTEM SET citus.local_shared_pool_size TO -1;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT pg_sleep(0.1);
pg_sleep
---------------------------------------------------------------------
(1 row)
SET citus.executor_slow_start_interval TO 10;
SELECT count(*) from another_schema_table;
NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630509 another_schema_table WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630510 another_schema_table WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630511 another_schema_table WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.another_schema_table_90630512 another_schema_table WHERE true
count
---------------------------------------------------------------------
0
(1 row)
UPDATE another_schema_table SET b = b;
NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630509 another_schema_table SET b = b
NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630510 another_schema_table SET b = b
NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630511 another_schema_table SET b = b
NOTICE: executing the command locally: UPDATE single_node.another_schema_table_90630512 another_schema_table SET b = b
-- INSERT .. SELECT pushdown and INSERT .. SELECT via repartitioning
-- note that we ignore INSERT .. SELECT via the coordinator as it relies on
-- COPY command
INSERT INTO another_schema_table SELECT * FROM another_schema_table;
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630509 AS citus_table_alias (a, b) SELECT a, b FROM single_node.another_schema_table_90630509 another_schema_table WHERE (a IS NOT NULL)
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630510 AS citus_table_alias (a, b) SELECT a, b FROM single_node.another_schema_table_90630510 another_schema_table WHERE (a IS NOT NULL)
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630511 AS citus_table_alias (a, b) SELECT a, b FROM single_node.another_schema_table_90630511 another_schema_table WHERE (a IS NOT NULL)
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630512 AS citus_table_alias (a, b) SELECT a, b FROM single_node.another_schema_table_90630512 another_schema_table WHERE (a IS NOT NULL)
INSERT INTO another_schema_table SELECT b::int, a::int FROM another_schema_table;
NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630509_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630509_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630509 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0
NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630510_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630510_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630510 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0
NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630511_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630511_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630511 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0
NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_90630512_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_90630512_to','SELECT b AS a, a AS b FROM single_node.another_schema_table_90630512 another_schema_table WHERE true',0,'hash','{-2147483648,-1073741824,0,1073741824}'::text[],'{-1073741825,-1,1073741823,2147483647}'::text[],true) WHERE rows_written > 0
-- multi-row INSERTs
INSERT INTO another_schema_table VALUES (1,1), (2,2), (3,3), (4,4), (5,5),(6,6),(7,7);
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630509 AS citus_table_alias (a, b) VALUES (1,1), (5,5)
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630510 AS citus_table_alias (a, b) VALUES (3,3), (4,4), (7,7)
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630511 AS citus_table_alias (a, b) VALUES (6,6)
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630512 AS citus_table_alias (a, b) VALUES (2,2)
-- intermediate results
WITH cte_1 AS (SELECT * FROM another_schema_table LIMIT 1000)
SELECT count(*) FROM cte_1;
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630509 another_schema_table WHERE true LIMIT '1000'::bigint
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630510 another_schema_table WHERE true LIMIT '1000'::bigint
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630511 another_schema_table WHERE true LIMIT '1000'::bigint
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630512 another_schema_table WHERE true LIMIT '1000'::bigint
NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte_1
count
---------------------------------------------------------------------
7
(1 row)
-- if local execution is disabled, we cannot fail over to
-- local execution and the queries would fail
SET citus.enable_local_execution TO false;
SELECT count(*) from another_schema_table;
ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled.
HINT: Enable local execution via SET citus.enable_local_execution TO true;
UPDATE another_schema_table SET b = b;
ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled.
HINT: Enable local execution via SET citus.enable_local_execution TO true;
INSERT INTO another_schema_table SELECT * FROM another_schema_table;
ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled.
HINT: Enable local execution via SET citus.enable_local_execution TO true;
INSERT INTO another_schema_table SELECT b::int, a::int FROM another_schema_table;
ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled.
HINT: Enable local execution via SET citus.enable_local_execution TO true;
WITH cte_1 AS (SELECT * FROM another_schema_table LIMIT 1000)
SELECT count(*) FROM cte_1;
ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled.
HINT: Enable local execution via SET citus.enable_local_execution TO true;
INSERT INTO another_schema_table VALUES (1,1), (2,2), (3,3), (4,4), (5,5),(6,6),(7,7);
ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled.
HINT: Enable local execution via SET citus.enable_local_execution TO true;
-- set the values back to the originals
ALTER SYSTEM RESET citus.max_cached_conns_per_worker;
ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
ALTER SYSTEM RESET citus.recover_2pc_interval;
ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
ALTER SYSTEM RESET citus.local_shared_pool_size;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------

View File

@ -35,6 +35,9 @@ test: failure_multi_row_insert
test: failure_mx_metadata_sync
test: failure_connection_establishment
# this test syncs metadata to the workers
test: failure_failover_to_local_execution
# test that no tests leaked intermediate results. This should always be last
test: ensure_no_intermediate_data_leak

View File

@ -50,6 +50,7 @@ ROLLBACk;
-- with failure, results from 100802 should be retried and succeed on 57637
SELECT citus.mitmproxy('conn.onQuery(query="worker_partition_query_result.*test_from_100802").kill()');
BEGIN;
SET LOCAL client_min_messages TO DEBUG1;
CREATE TABLE distributed_result_info AS
SELECT resultId, nodeport, rowcount, targetShardId, targetShardIndex
FROM partition_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table')

View File

@ -0,0 +1,58 @@
CREATE SCHEMA failure_failover_to_local_execution;
SET search_path TO failure_failover_to_local_execution;
SELECT citus.mitmproxy('conn.allow()');
SET citus.next_shard_id TO 1980000;
-- on Citus-mx, we can have
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
CREATE TABLE failover_to_local (key int PRIMARY KEY, value varchar(10));
SELECT create_distributed_table('failover_to_local', 'key');
\c - - - :worker_2_port
SET search_path TO failure_failover_to_local_execution;
-- we don't want any cached connections
SET citus.max_cached_conns_per_worker to 0;
INSERT INTO failover_to_local SELECT i, i::text FROM generate_series(0,20)i;
-- even if the connection establishment fails, Citus can
-- fail over to local execution
SET citus.node_connection_timeout TO 400;
SELECT citus.mitmproxy('conn.delay(500)');
SET citus.log_local_commands TO ON;
SET client_min_messages TO DEBUG1;
SELECT count(*) FROM failover_to_local;
RESET client_min_messages;
SELECT citus.mitmproxy('conn.allow()');
-- if the remote query execution fails, Citus
-- does not try to fall back to local execution
SELECT key / 0 FROM failover_to_local;
-- if the local execution is disabled, Citus does
-- not try to fall back to local execution
SET citus.enable_local_execution TO false;
SELECT citus.mitmproxy('conn.delay(500)');
SET citus.log_local_commands TO ON;
SELECT count(*) FROM failover_to_local;
SELECT citus.mitmproxy('conn.allow()');
RESET citus.enable_local_execution;
-- even if we are on a multi-shard command, Citus can
-- fail over to local execution if no connection attempts succeed
SELECT citus.mitmproxy('conn.onAuthenticationOk().kill()');
SET citus.log_remote_commands TO ON;
SELECT count(*) FROM failover_to_local;
SELECT citus.mitmproxy('conn.allow()');
\c - - - :master_port
SET client_min_messages TO ERROR;
DROP SCHEMA failure_failover_to_local_execution CASCADE;

View File

@ -47,3 +47,47 @@ BEGIN
RETURN QUERY SELECT * FROM mitmproxy_result;
END;
$$ LANGUAGE plpgsql;
\c - - - :worker_2_port
-- Add some helper functions for sending commands to mitmproxy
CREATE FUNCTION citus.mitmproxy(text) RETURNS TABLE(result text) AS $$
DECLARE
command ALIAS FOR $1;
BEGIN
CREATE TEMPORARY TABLE mitmproxy_command (command text) ON COMMIT DROP;
CREATE TEMPORARY TABLE mitmproxy_result (res text) ON COMMIT DROP;
INSERT INTO mitmproxy_command VALUES (command);
EXECUTE format('COPY mitmproxy_command TO %L', current_setting('citus.mitmfifo'));
EXECUTE format('COPY mitmproxy_result FROM %L', current_setting('citus.mitmfifo'));
RETURN QUERY SELECT * FROM mitmproxy_result;
END;
$$ LANGUAGE plpgsql;
CREATE FUNCTION citus.clear_network_traffic() RETURNS void AS $$
BEGIN
PERFORM citus.mitmproxy('recorder.reset()');
RETURN; -- return void
END;
$$ LANGUAGE plpgsql;
CREATE FUNCTION citus.dump_network_traffic()
RETURNS TABLE(conn int, source text, message text) AS $$
BEGIN
CREATE TEMPORARY TABLE mitmproxy_command (command text) ON COMMIT DROP;
CREATE TEMPORARY TABLE mitmproxy_result (
conn int, source text, message text
) ON COMMIT DROP;
INSERT INTO mitmproxy_command VALUES ('recorder.dump()');
EXECUTE format('COPY mitmproxy_command TO %L', current_setting('citus.mitmfifo'));
EXECUTE format('COPY mitmproxy_result FROM %L', current_setting('citus.mitmfifo'));
RETURN QUERY SELECT * FROM mitmproxy_result;
END;
$$ LANGUAGE plpgsql;

View File

@ -30,7 +30,7 @@ CREATE TABLE ref(a int, b int);
SELECT create_reference_table('ref');
CREATE TABLE local(c int, d int);
CREATE TABLE public.another_schema_table(a int);
CREATE TABLE public.another_schema_table(a int, b int);
SELECT create_distributed_table('public.another_schema_table', 'a');
-- Confirm the basics work
@ -611,9 +611,9 @@ SET search_path TO single_node;
SELECT count(*) from should_commit;
SELECT pg_catalog.get_all_active_client_backend_count();
BEGIN;
SET citus.shard_count TO 32;
SET citus.force_max_query_parallelization TO ON;
SET citus.enable_local_execution TO false;
SET LOCAL citus.shard_count TO 32;
SET LOCAL citus.force_max_query_parallelization TO ON;
SET LOCAL citus.enable_local_execution TO false;
CREATE TABLE test (a int);
SET citus.shard_replication_factor TO 1;
@ -624,11 +624,54 @@ BEGIN;
SELECT pg_catalog.get_all_active_client_backend_count();
ROLLBACK;
\c - - - :master_port
SET search_path TO single_node;
-- simulate that even if there are no connection slots
-- to connect, Citus can switch to local execution
SET citus.force_max_query_parallelization TO false;
SET citus.log_remote_commands TO ON;
ALTER SYSTEM SET citus.local_shared_pool_size TO -1;
SELECT pg_reload_conf();
SELECT pg_sleep(0.1);
SET citus.executor_slow_start_interval TO 10;
SELECT count(*) from another_schema_table;
UPDATE another_schema_table SET b = b;
-- INSERT .. SELECT pushdown and INSERT .. SELECT via repartitioning
-- note that we ignore INSERT .. SELECT via the coordinator as it relies on
-- COPY command
INSERT INTO another_schema_table SELECT * FROM another_schema_table;
INSERT INTO another_schema_table SELECT b::int, a::int FROM another_schema_table;
-- multi-row INSERTs
INSERT INTO another_schema_table VALUES (1,1), (2,2), (3,3), (4,4), (5,5),(6,6),(7,7);
-- intermediate results
WITH cte_1 AS (SELECT * FROM another_schema_table LIMIT 1000)
SELECT count(*) FROM cte_1;
-- if local execution is disabled, we cannot fail over to
-- local execution and the queries would fail
SET citus.enable_local_execution TO false;
SELECT count(*) from another_schema_table;
UPDATE another_schema_table SET b = b;
INSERT INTO another_schema_table SELECT * FROM another_schema_table;
INSERT INTO another_schema_table SELECT b::int, a::int FROM another_schema_table;
WITH cte_1 AS (SELECT * FROM another_schema_table LIMIT 1000)
SELECT count(*) FROM cte_1;
INSERT INTO another_schema_table VALUES (1,1), (2,2), (3,3), (4,4), (5,5),(6,6),(7,7);
-- set the values back to the originals
ALTER SYSTEM RESET citus.max_cached_conns_per_worker;
ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
ALTER SYSTEM RESET citus.recover_2pc_interval;
ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
ALTER SYSTEM RESET citus.local_shared_pool_size;
SELECT pg_reload_conf();
-- suppress notices