Implementation for async FinishConnectionListEstablishment (#2584)

pull/2638/head
Nils Dijk 2019-03-22 17:30:42 +01:00 committed by GitHub
parent 7a094edc4c
commit feaac69769
10 changed files with 647 additions and 186 deletions

View File

@ -9,11 +9,7 @@
*/
#include "postgres.h"
#ifdef HAVE_POLL_H
#include <poll.h>
#endif
#include "pgstat.h"
#include "libpq-fe.h"
@ -23,6 +19,7 @@
#include "commands/dbcommands.h"
#include "distributed/connection_management.h"
#include "distributed/errormessage.h"
#include "distributed/memutils.h"
#include "distributed/metadata_cache.h"
#include "distributed/hash_helpers.h"
#include "distributed/placement_connection.h"
@ -47,6 +44,30 @@ static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isC
static void DefaultCitusNoticeProcessor(void *arg, const char *message);
static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags);
static bool RemoteTransactionIdle(MultiConnection *connection);
static int EventSetSizeForConnectionList(List *connections);
/* types for async connection management */
enum MultiConnectionPhase
{
MULTI_CONNECTION_PHASE_CONNECTING,
MULTI_CONNECTION_PHASE_CONNECTED,
MULTI_CONNECTION_PHASE_ERROR,
};
typedef struct MultiConnectionState
{
enum MultiConnectionPhase phase;
MultiConnection *connection;
PostgresPollingStatusType pollmode;
} MultiConnectionState;
/* helper functions for async connection management */
static bool MultiConnectionStatePoll(MultiConnectionState *connectionState);
static WaitEventSet * WaitEventSetFromMultiConnectionStates(List *connections,
int *waitCount);
static long DeadlineTimestampTzToTimeout(TimestampTz deadline);
static void CloseNotReadyMultiConnectionStates(List *connectionStates);
static uint32 MultiConnectionStateEventMask(MultiConnectionState *connectionState);
static int CitusNoticeLogLevel = DEFAULT_CITUS_NOTICE_LEVEL;
@ -459,183 +480,373 @@ ShutdownConnection(MultiConnection *connection)
/*
 * FinishConnectionListEstablishment is a wrapper around FinishConnectionEstablishment.
 * The function iterates over the multiConnectionList and finishes the connection
 * establishment for each multi connection.
 */
void
FinishConnectionListEstablishment(List *multiConnectionList)
{
	ListCell *multiConnectionCell = NULL;

	foreach(multiConnectionCell, multiConnectionList)
	{
		MultiConnection *multiConnection = (MultiConnection *) lfirst(
			multiConnectionCell);

		/* TODO: consider making connection establishment fully in parallel */
		FinishConnectionEstablishment(multiConnection);
	}
}


/*
 * Synchronously finish connection establishment of an individual connection.
 *
 * TODO: Replace with variant waiting for multiple connections.
 */
void
FinishConnectionEstablishment(MultiConnection *connection)
{
	static int checkIntervalMS = 200;

	/*
	 * Loop until connection is established, or failed (possibly just timed
	 * out).
	 */
	while (true)
	{
		ConnStatusType status = PQstatus(connection->pgConn);
		PostgresPollingStatusType pollmode;

		if (status == CONNECTION_OK)
		{
			return;
		}

		/* FIXME: retries? */
		if (status == CONNECTION_BAD)
		{
			return;
		}

		pollmode = PQconnectPoll(connection->pgConn);

		/*
		 * FIXME: Do we want to add transparent retry support here?
		 */
		if (pollmode == PGRES_POLLING_FAILED)
		{
			return;
		}
		else if (pollmode == PGRES_POLLING_OK)
		{
			return;
		}
		else
		{
			Assert(pollmode == PGRES_POLLING_WRITING ||
				   pollmode == PGRES_POLLING_READING);
		}

		/* Loop, to handle poll() being interrupted by signals (EINTR) */
		while (true)
		{
			int pollResult = 0;

			/* we use poll(2) if available, otherwise select(2) */
#ifdef HAVE_POLL
			struct pollfd pollFileDescriptor;

			pollFileDescriptor.fd = PQsocket(connection->pgConn);
			if (pollmode == PGRES_POLLING_READING)
			{
				pollFileDescriptor.events = POLLIN;
			}
			else
			{
				pollFileDescriptor.events = POLLOUT;
			}
			pollFileDescriptor.revents = 0;

			/*
			 * Only sleep for a limited amount of time, so we can react to
			 * interrupts in time, even if the platform doesn't interrupt
			 * poll() after signal arrival.
			 */
			pollResult = poll(&pollFileDescriptor, 1, checkIntervalMS);
#else /* !HAVE_POLL */
			fd_set readFileDescriptorSet;
			fd_set writeFileDescriptorSet;
			fd_set exceptionFileDescriptorSet;
			int selectTimeoutUS = checkIntervalMS * 1000;
			struct timeval selectTimeout = { 0, selectTimeoutUS };
			int selectFileDescriptor = PQsocket(connection->pgConn);

			FD_ZERO(&readFileDescriptorSet);
			FD_ZERO(&writeFileDescriptorSet);
			FD_ZERO(&exceptionFileDescriptorSet);

			if (pollmode == PGRES_POLLING_READING)
			{
				FD_SET(selectFileDescriptor, &readFileDescriptorSet);
			}
			else if (pollmode == PGRES_POLLING_WRITING)
			{
				FD_SET(selectFileDescriptor, &writeFileDescriptorSet);
			}

			pollResult = (select) (selectFileDescriptor + 1, &readFileDescriptorSet,
								   &writeFileDescriptorSet, &exceptionFileDescriptorSet,
								   &selectTimeout);
#endif /* HAVE_POLL */

			if (pollResult == 0)
			{
				/*
				 * Timeout exceeded. Two things to do:
				 * - check whether any interrupts arrived and handle them
				 * - check whether establishment for connection already has
				 *   lasted for too long, stop waiting if so.
				 */
				CHECK_FOR_INTERRUPTS();

				if (TimestampDifferenceExceeds(connection->connectionStart,
											   GetCurrentTimestamp(),
											   NodeConnectionTimeout))
				{
					ereport(WARNING, (errmsg("could not establish connection after %u ms",
											 NodeConnectionTimeout)));

					/* close connection, otherwise we take up resource on the other side */
					PQfinish(connection->pgConn);
					connection->pgConn = NULL;
					break;
				}
			}
			else if (pollResult > 0)
			{
				/*
				 * IO possible, continue connection establishment. We could
				 * check for timeouts here as well, but if there's progress
				 * there seems little point.
				 */
				break;
			}
			else
			{
				int errorCode = errno;
#ifdef WIN32
				errorCode = WSAGetLastError();
#endif
				if (errorCode == EINTR)
				{
					/* Retrying, signal interrupted. So check. */
					CHECK_FOR_INTERRUPTS();
				}
				else
				{
					/*
					 * We ERROR here, instead of just returning a failed
					 * connection, because this shouldn't happen, and indicates a
					 * programming error somewhere, not a network etc. issue.
					 */
					ereport(ERROR, (errcode_for_socket_access(),
									errmsg("poll()/select() failed: %m")));
				}
			}
		}
	}
}


/*
 * MultiConnectionStatePoll executes a PQconnectPoll on the connection to progress the
 * connection establishment. The return value of this function indicates if the
 * MultiConnectionState has been changed, which could require a change to the WaitEventSet.
 */
static bool
MultiConnectionStatePoll(MultiConnectionState *connectionState)
{
	MultiConnection *connection = connectionState->connection;
	ConnStatusType status = PQstatus(connection->pgConn);
	PostgresPollingStatusType oldPollmode = connectionState->pollmode;

	Assert(connectionState->phase == MULTI_CONNECTION_PHASE_CONNECTING);

	if (status == CONNECTION_OK)
	{
		connectionState->phase = MULTI_CONNECTION_PHASE_CONNECTED;
		return true;
	}
	else if (status == CONNECTION_BAD)
	{
		/* FIXME: retries? */
		connectionState->phase = MULTI_CONNECTION_PHASE_ERROR;
		return true;
	}
	else
	{
		connectionState->phase = MULTI_CONNECTION_PHASE_CONNECTING;
	}

	connectionState->pollmode = PQconnectPoll(connection->pgConn);

	/*
	 * FIXME: Do we want to add transparent retry support here?
	 */
	if (connectionState->pollmode == PGRES_POLLING_FAILED)
	{
		connectionState->phase = MULTI_CONNECTION_PHASE_ERROR;
		return true;
	}
	else if (connectionState->pollmode == PGRES_POLLING_OK)
	{
		connectionState->phase = MULTI_CONNECTION_PHASE_CONNECTED;
		return true;
	}
	else
	{
		Assert(connectionState->pollmode == PGRES_POLLING_WRITING ||
			   connectionState->pollmode == PGRES_POLLING_READING);
	}

	return (oldPollmode != connectionState->pollmode) ? true : false;
}
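For context, MultiConnectionStatePoll is a re-entrant wrapper around libpq's non-blocking connect state machine. Below is a minimal standalone sketch of that same state machine; the conninfo argument, the direct use of poll(2), and the fixed 1 second wait are illustrative assumptions, not part of this commit.

#include <poll.h>
#include <stdio.h>
#include "libpq-fe.h"

/* minimal sketch: drive PQconnectPoll until libpq reports success or failure */
static int
ConnectNonBlocking(const char *conninfo)
{
	PGconn *conn = PQconnectStart(conninfo);

	if (conn == NULL || PQstatus(conn) == CONNECTION_BAD)
	{
		return -1;
	}

	for (;;)
	{
		PostgresPollingStatusType pollmode = PQconnectPoll(conn);
		struct pollfd pollFileDescriptor = { 0 };

		if (pollmode == PGRES_POLLING_OK)
		{
			return 0; /* connected, conn is now usable */
		}
		if (pollmode == PGRES_POLLING_FAILED)
		{
			fprintf(stderr, "%s", PQerrorMessage(conn));
			return -1;
		}

		/* PQconnectPoll tells us which readiness to wait for before calling it again */
		pollFileDescriptor.fd = PQsocket(conn);
		pollFileDescriptor.events = (pollmode == PGRES_POLLING_READING) ? POLLIN : POLLOUT;
		(void) poll(&pollFileDescriptor, 1, 1000 /* ms */);
	}
}

The patch splits exactly this loop in two: MultiConnectionStatePoll advances the state machine one step, while the waiting is delegated to a shared WaitEventSet so many handshakes can progress concurrently.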
/*
 * EventSetSizeForConnectionList calculates the space needed for a WaitEventSet based on a
 * list of connections.
 */
inline static int
EventSetSizeForConnectionList(List *connections)
{
	/* we need space for 2 postgres events in the waitset on top of the connections */
	return list_length(connections) + 2;
}


/*
 * WaitEventSetFromMultiConnectionStates takes a list of MultiConnectionStates and adds
 * all sockets of the connections that are still in the connecting phase to a WaitSet,
 * taking into account the maximum number of connections that could be added in total to
 * a WaitSet.
 *
 * waitCount is populated with the number of connections added to the WaitSet when a
 * non-NULL pointer is provided.
 */
static WaitEventSet *
WaitEventSetFromMultiConnectionStates(List *connections, int *waitCount)
{
	WaitEventSet *waitEventSet = NULL;
	ListCell *connectionCell = NULL;
	const int eventSetSize = EventSetSizeForConnectionList(connections);
	int numEventsAdded = 0;

	if (waitCount)
	{
		*waitCount = 0;
	}

	waitEventSet = CreateWaitEventSet(CurrentMemoryContext, eventSetSize);
	EnsureReleaseResource((MemoryContextCallbackFunction) (&FreeWaitEventSet),
						  waitEventSet);

	/*
	 * Put the wait events for the signal latch and postmaster death at the end such that
	 * event index + pendingConnectionsStartIndex = the connection index in the array.
	 */
	AddWaitEventToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, NULL);
	AddWaitEventToSet(waitEventSet, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
	numEventsAdded += 2;

	foreach(connectionCell, connections)
	{
		MultiConnectionState *connectionState = (MultiConnectionState *) lfirst(
			connectionCell);
		int socket = 0;
		int eventMask = 0;

		if (numEventsAdded >= eventSetSize)
		{
			/* room for events to schedule is exhausted */
			break;
		}

		if (connectionState->phase != MULTI_CONNECTION_PHASE_CONNECTING)
		{
			/* connections that are not connecting will not be added to the WaitSet */
			continue;
		}

		socket = PQsocket(connectionState->connection->pgConn);
		eventMask = MultiConnectionStateEventMask(connectionState);

		AddWaitEventToSet(waitEventSet, eventMask, socket, NULL, connectionState);
		numEventsAdded++;

		if (waitCount)
		{
			*waitCount = *waitCount + 1;
		}
	}

	return waitEventSet;
}


/*
 * MultiConnectionStateEventMask returns the eventMask used by the WaitEventSet for the
 * socket associated with the connection, based on the pollmode PQconnectPoll returned
 * in its last invocation.
 */
static uint32
MultiConnectionStateEventMask(MultiConnectionState *connectionState)
{
	uint32 eventMask = 0;

	if (connectionState->pollmode == PGRES_POLLING_READING)
	{
		eventMask |= WL_SOCKET_READABLE;
	}
	else
	{
		eventMask |= WL_SOCKET_WRITEABLE;
	}

	return eventMask;
}
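The three helpers above lean on PostgreSQL's WaitEventSet API. As a rough refresher, here is a sketch of that API in isolation, assuming it runs inside a backend where MyLatch is initialized; sock and myUserData are hypothetical placeholders:

	WaitEventSet *set = CreateWaitEventSet(CurrentMemoryContext, 3);
	WaitEvent occurred;

	/* die if the postmaster goes away, wake up on our latch, watch one socket */
	AddWaitEventToSet(set, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, NULL);
	AddWaitEventToSet(set, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
	AddWaitEventToSet(set, WL_SOCKET_READABLE, sock, NULL, myUserData);

	/* returns the number of events stored in occurred; 0 means the timeout expired */
	if (WaitEventSetWait(set, 1000 /* ms */, &occurred, 1, WAIT_EVENT_CLIENT_READ) > 0 &&
		(occurred.events & WL_SOCKET_READABLE))
	{
		/* occurred.user_data points at myUserData; the socket is readable */
	}

	FreeWaitEventSet(set);

WaitEventSetFromMultiConnectionStates follows exactly this shape, with one socket event per still-connecting connection and user_data pointing at its MultiConnectionState.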
/*
 * FinishConnectionListEstablishment takes a list of MultiConnection and finishes the
 * connection establishment asynchronously for all connections not already fully
 * connected.
 */
void
FinishConnectionListEstablishment(List *multiConnectionList)
{
	const TimestampTz connectionStart = GetCurrentTimestamp();
	const TimestampTz deadline = TimestampTzPlusMilliseconds(connectionStart,
															 NodeConnectionTimeout);
	List *connectionStates = NULL;
	ListCell *multiConnectionCell = NULL;

	WaitEventSet *waitEventSet = NULL;
	bool waitEventSetRebuild = true;
	int waitCount = 0;
	WaitEvent *events = NULL;
	MemoryContext oldContext = NULL;

	foreach(multiConnectionCell, multiConnectionList)
	{
		MultiConnection *connection = (MultiConnection *) lfirst(multiConnectionCell);
		MultiConnectionState *connectionState = palloc0(sizeof(MultiConnectionState));

		connectionState->connection = connection;

		/*
		 * before we can build the waitset to wait for asynchronous IO we need to know
		 * the pollmode to use for the sockets. This is populated by executing one round
		 * of PQconnectPoll. This updates the MultiConnectionState struct with its phase
		 * and its next poll mode.
		 */
		MultiConnectionStatePoll(connectionState);

		connectionStates = lappend(connectionStates, connectionState);
		if (connectionState->phase == MULTI_CONNECTION_PHASE_CONNECTING)
		{
			waitCount++;
		}
	}

	/* prepare space for socket events */
	events = (WaitEvent *) palloc0(EventSetSizeForConnectionList(connectionStates) *
								   sizeof(WaitEvent));

	/*
	 * for high connection counts with lots of round trips we could potentially have a
	 * lot of (big) waitsets that we'd like to clean right after we have used them. To
	 * do this we switch to a temporary memory context for this loop which gets reset
	 * at the end.
	 */
	oldContext = MemoryContextSwitchTo(
		AllocSetContextCreate(CurrentMemoryContext,
							  "connection establishment temporary context",
							  ALLOCSET_DEFAULT_SIZES));

	while (waitCount > 0)
	{
		long timeout = DeadlineTimestampTzToTimeout(deadline);
		int eventCount = 0;
		int eventIndex = 0;

		if (waitEventSetRebuild)
		{
			MemoryContextReset(CurrentMemoryContext);
			waitEventSet = WaitEventSetFromMultiConnectionStates(connectionStates,
																 &waitCount);
			waitEventSetRebuild = false;

			if (waitCount <= 0)
			{
				break;
			}
		}

		eventCount = WaitEventSetWait(waitEventSet, timeout, events, waitCount,
									  WAIT_EVENT_CLIENT_READ);

		for (eventIndex = 0; eventIndex < eventCount; eventIndex++)
		{
			WaitEvent *event = &events[eventIndex];
			bool connectionStateChanged = false;
			MultiConnectionState *connectionState =
				(MultiConnectionState *) event->user_data;

			if (event->events & WL_POSTMASTER_DEATH)
			{
				ereport(ERROR, (errmsg("postmaster was shut down, exiting")));
			}

			if (event->events & WL_LATCH_SET)
			{
				ResetLatch(MyLatch);

				CHECK_FOR_INTERRUPTS();

				if (InterruptHoldoffCount > 0 && (QueryCancelPending || ProcDiePending))
				{
					/*
					 * because we can't break from 2 loops easily we need to not forget
					 * to reset the memory context
					 */
					MemoryContextDelete(MemoryContextSwitchTo(oldContext));
					return;
				}

				continue;
			}

			connectionStateChanged = MultiConnectionStatePoll(connectionState);
			if (connectionStateChanged)
			{
				if (connectionState->phase != MULTI_CONNECTION_PHASE_CONNECTING)
				{
					/* we can now stop waiting for this connection, so rebuild the event set */
					waitEventSetRebuild = true;
				}
				else
				{
					/* connection state changed, reset the event mask */
					uint32 eventMask = MultiConnectionStateEventMask(connectionState);
					ModifyWaitEvent(waitEventSet, event->pos, eventMask, NULL);
				}
			}
		}

		if (eventCount == 0)
		{
			/*
			 * a timeout has occurred on the waitset, double check the timeout since
			 * connectionStart and if passed close all non-finished connections
			 */
			TimestampTz now = GetCurrentTimestamp();
			if (TimestampDifferenceExceeds(connectionStart, now, NodeConnectionTimeout))
			{
				/*
				 * showing as a warning, can't be an error. In some cases queries can
				 * proceed with only some of the connections being fully established.
				 * Queries that can't will error then and there
				 */
				ereport(WARNING, (errmsg("could not establish connection after %u ms",
										 NodeConnectionTimeout)));

				/*
				 * Close all connections that have not been fully established.
				 */
				CloseNotReadyMultiConnectionStates(connectionStates);

				break;
			}
		}
	}

	MemoryContextDelete(MemoryContextSwitchTo(oldContext));
}
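The temporary-context dance in the loop above is a standard PostgreSQL pattern for bounding allocations whose lifetime is a single iteration. A generic sketch of the pattern, where loopCount is a hypothetical bound:

	MemoryContext scratchContext = AllocSetContextCreate(CurrentMemoryContext,
														 "scratch context",
														 ALLOCSET_DEFAULT_SIZES);
	MemoryContext savedContext = MemoryContextSwitchTo(scratchContext);
	int iteration = 0;

	for (iteration = 0; iteration < loopCount; iteration++)
	{
		char *buffer = NULL;

		/* frees everything palloc'd in scratchContext during the previous iteration */
		MemoryContextReset(CurrentMemoryContext);

		buffer = palloc(1024); /* lives in scratchContext, not in savedContext */
		/* ... use buffer ... */
	}

	/* switch back, then delete the scratch context the switch returns */
	MemoryContextDelete(MemoryContextSwitchTo(savedContext));

The closing MemoryContextDelete(MemoryContextSwitchTo(oldContext)) idiom used by the patch both restores the caller's context and reclaims the scratch context in a single expression.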
/*
 * DeadlineTimestampTzToTimeout returns the number of milliseconds that still need to
 * elapse before the deadline provided as an argument is reached. The outcome can be
 * passed as the timeout of a WaitEventSet's wait to make sure it returns once the
 * deadline has passed.
*/
static long
DeadlineTimestampTzToTimeout(TimestampTz deadline)
{
long secs = 0;
int msecs = 0;
TimestampDifference(GetCurrentTimestamp(), deadline, &secs, &msecs);
return secs * 1000 + msecs / 1000;
}
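One subtlety worth calling out: despite its name, the msecs output of TimestampDifference holds microseconds, which is why the function divides it by 1000. A worked example with assumed values:

	/* a deadline 2.5 seconds away: TimestampDifference() reports secs = 2 and
	 * msecs = 500000 (microseconds), so the function returns
	 * 2 * 1000 + 500000 / 1000 = 2500 milliseconds */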
/*
* CloseNotReadyMultiConnectionStates calls CloseConnection for all MultiConnection's
* tracked in the MultiConnectionState list passed in, only if the connection is not yet
* fully established.
*
* This function removes the pointer to the MultiConnection data after the Connections are
* closed since they should not be used anymore.
*/
static void
CloseNotReadyMultiConnectionStates(List *connectionStates)
{
ListCell *connectionStateCell = NULL;
foreach(connectionStateCell, connectionStates)
{
MultiConnectionState *connectionState = lfirst(connectionStateCell);
MultiConnection *connection = connectionState->connection;
if (connectionState->phase != MULTI_CONNECTION_PHASE_CONNECTING)
{
continue;
}
/* close connection, otherwise we take up resource on the other side */
PQfinish(connection->pgConn);
connection->pgConn = NULL;
}
}
/*
 * FinishConnectionEstablishment synchronously finishes connection establishment of an
 * individual connection. This function is a convenience wrapper around
 * FinishConnectionListEstablishment.
 */
void
FinishConnectionEstablishment(MultiConnection *connection)
{
	FinishConnectionListEstablishment(list_make1(connection));
}
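To see how the two entry points divide the work, a hypothetical call site might start all connections without blocking and then wait for the whole batch at once. StartNodeConnection, FORCE_NEW_CONNECTION, and the workerList fields below are assumptions about the surrounding Citus connection API, not part of this diff:

	List *connectionList = NIL;
	ListCell *workerCell = NULL;

	foreach(workerCell, workerList)
	{
		WorkerNode *worker = (WorkerNode *) lfirst(workerCell);

		/* returns immediately, with the connection attempt in progress */
		MultiConnection *connection = StartNodeConnection(FORCE_NEW_CONNECTION,
														  worker->workerName,
														  worker->workerPort);

		connectionList = lappend(connectionList, connection);
	}

	/* one call drives all handshakes concurrently, bounded by citus.node_connection_timeout */
	FinishConnectionListEstablishment(connectionList);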

View File

@ -294,6 +294,11 @@ get_global_active_transactions(PG_FUNCTION_ARGS)
int64 rowCount = 0;
int64 colCount = 0;
if (PQstatus(connection->pgConn) != CONNECTION_OK)
{
continue;
}
result = GetRemoteCommandResult(connection, raiseInterrupts);
if (!IsResponseOK(result))
{

View File

@ -11,6 +11,7 @@
#include "postgres.h"
#include "distributed/connection_management.h"
#include "distributed/memutils.h"
#include "distributed/worker_protocol.h"
#include "libpq/libpq.h"
#include "miscadmin.h"
@ -48,8 +49,6 @@ static void GloballyReloadConfig(void);
#ifdef USE_SSL
/* forward declaration of functions used when compiled with ssl */
static void EnsureReleaseOpenSSLResource(MemoryContextCallbackFunction callback,
void *arg);
static bool ShouldUseAutoSSL(void);
static bool CreateCertificatesWhenNeeded(void);
static EVP_PKEY * GeneratePrivateKey(void);
@ -183,20 +182,6 @@ GloballyReloadConfig()
#ifdef USE_SSL
/*
* EnsureReleaseOpenSSLResource registers the openssl allocated resource to be freed when the
* current memory context is reset.
*/
static void
EnsureReleaseOpenSSLResource(MemoryContextCallbackFunction callback, void *arg)
{
MemoryContextCallback *cb = MemoryContextAllocZero(CurrentMemoryContext,
sizeof(MemoryContextCallback));
cb->func = callback;
cb->arg = arg;
MemoryContextRegisterResetCallback(CurrentMemoryContext, cb);
}
/*
* ShouldUseAutoSSL checks if citus should enable ssl based on the connection settings it
@ -258,7 +243,7 @@ CreateCertificatesWhenNeeded()
"correctly.")));
return false;
}
EnsureReleaseOpenSSLResource((MemoryContextCallbackFunction) (&SSL_CTX_free),
EnsureReleaseResource((MemoryContextCallbackFunction) (&SSL_CTX_free),
sslContext);
/*
@ -315,11 +300,11 @@ GeneratePrivateKey()
{
ereport(ERROR, (errmsg("unable to allocate space for private key")));
}
EnsureReleaseOpenSSLResource((MemoryContextCallbackFunction) (&EVP_PKEY_free),
EnsureReleaseResource((MemoryContextCallbackFunction) (&EVP_PKEY_free),
privateKey);
exponent = BN_new();
EnsureReleaseOpenSSLResource((MemoryContextCallbackFunction) (&BN_free), exponent);
EnsureReleaseResource((MemoryContextCallbackFunction) (&BN_free), exponent);
/* load the exponent to use for the generation of the key */
success = BN_set_word(exponent, RSA_F4);
@ -361,7 +346,7 @@ CreateCertificate(EVP_PKEY *privateKey)
{
ereport(ERROR, (errmsg("unable to allocate space for the x509 certificate")));
}
EnsureReleaseOpenSSLResource((MemoryContextCallbackFunction) (&X509_free),
EnsureReleaseResource((MemoryContextCallbackFunction) (&X509_free),
certificate);
/* Set the serial number. */

View File

@ -0,0 +1,29 @@
/*
* memutils.h
* utility functions to help with postgres' memory management primitives
*/
#ifndef CITUS_MEMUTILS_H
#define CITUS_MEMUTILS_H
#include "utils/palloc.h"
/*
 * EnsureReleaseResource is an abstraction over MemoryContextRegisterResetCallback that
 * allocates the space for the MemoryContextCallback and registers it to the current
 * MemoryContext, ensuring callback is called with arg as its argument during either the
 * Reset or Delete of that MemoryContext.
*/
static inline void
EnsureReleaseResource(MemoryContextCallbackFunction callback, void *arg)
{
MemoryContextCallback *cb = MemoryContextAllocZero(CurrentMemoryContext,
sizeof(MemoryContextCallback));
cb->func = callback;
cb->arg = arg;
MemoryContextRegisterResetCallback(CurrentMemoryContext, cb);
}
#endif /* CITUS_MEMUTILS_H */
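A hypothetical usage sketch of the new helper, not taken from the patch: any resource with a one-argument destructor can be tied to the lifetime of the current memory context.

	/* open a scratch file; the path is purely illustrative */
	FILE *scratchFile = fopen("/tmp/scratch.bin", "wb");

	/* fclose(scratchFile) now runs when CurrentMemoryContext is reset or deleted */
	EnsureReleaseResource((MemoryContextCallbackFunction) (&fclose), scratchFile);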

View File

@ -88,6 +88,10 @@ check-failure: all
$(pg_regress_multi_check) --load-extension=citus --mitmproxy \
-- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/failure_schedule $(EXTRA_TESTS)
check-failure-base: all
$(pg_regress_multi_check) --load-extension=citus --mitmproxy \
-- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/failure_base_schedule $(EXTRA_TESTS)
clean distclean maintainer-clean:
rm -f $(output_files) $(input_files)
rm -rf tmp_check/
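The new check-failure-base target mirrors check-failure but runs the shorter failure_base_schedule; since the recipe already threads through $(EXTRA_TESTS), a single test can presumably be appended, e.g. make check-failure-base EXTRA_TESTS=failure_connection_establishment.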

View File

@ -0,0 +1,129 @@
--
-- failure_connection_establishment.sql tests some behaviour of connection management when
-- it fails to connect.
--
-- Failure cases covered:
-- - timeout
--
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
-----------
(1 row)
CREATE SCHEMA fail_connect;
SET search_path TO 'fail_connect';
SET citus.shard_count TO 4;
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1450000;
ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 1450000;
CREATE TABLE products (
product_no integer,
name text,
price numeric
);
SELECT create_distributed_table('products', 'product_no');
create_distributed_table
--------------------------
(1 row)
-- Can only add primary key constraint on distribution column (or group of columns
-- including distribution column)
-- Command below should error out since 'name' is not a distribution column
ALTER TABLE products ADD CONSTRAINT p_key PRIMARY KEY(name);
ERROR: cannot create constraint on "products"
DETAIL: Distributed relations cannot have UNIQUE, EXCLUDE, or PRIMARY KEY constraints that do not include the partition column (with an equality operator if EXCLUDE).
-- we will insert a connection delay here, as this query was the cause of an
-- investigation into connection establishment problems
SET citus.node_connection_timeout TO 400;
SELECT citus.mitmproxy('conn.delay(500)');
mitmproxy
-----------
(1 row)
ALTER TABLE products ADD CONSTRAINT p_key PRIMARY KEY(product_no);
WARNING: could not establish connection after 400 ms
ERROR: connection error: localhost:9060
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
-----------
(1 row)
CREATE TABLE r1 (
id int PRIMARY KEY,
name text
);
INSERT INTO r1 (id, name) VALUES
(1,'foo'),
(2,'bar'),
(3,'baz');
SELECT create_reference_table('r1');
NOTICE: Copying data from local table...
create_reference_table
------------------------
(1 row)
SELECT citus.clear_network_traffic();
clear_network_traffic
-----------------------
(1 row)
SELECT citus.mitmproxy('conn.delay(500)');
mitmproxy
-----------
(1 row)
-- we cannot control which replica of the reference table will be queried, and there is
-- only one specific connection whose traffic we can intercept. By using the round-robin
-- task_assignment_policy we can force the query to hit both machines. We will use two
-- output files to match both orders, to verify there is one query that times out and
-- falls through to read from the other machine
SET citus.task_assignment_policy TO 'round-robin';
-- suppress the warning since we can't control which shard is chosen first. This test
-- fails if one of the queries returns an error instead of the result.
SET client_min_messages TO ERROR;
SELECT name FROM r1 WHERE id = 2;
name
------
bar
(1 row)
SELECT name FROM r1 WHERE id = 2;
name
------
bar
(1 row)
-- verify a connection attempt was made to the intercepted node; it would have been
-- delayed and thus have caused a timeout
SELECT citus.dump_network_traffic();
dump_network_traffic
-------------------------------------
(0,coordinator,"[initial message]")
(1 row)
RESET client_min_messages;
-- verify get_global_active_transactions works when a timeout happens on a connection
SELECT get_global_active_transactions();
WARNING: could not establish connection after 400 ms
WARNING: connection error: localhost:9060
get_global_active_transactions
--------------------------------
(0 rows)
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
-----------
(1 row)
DROP SCHEMA fail_connect CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table products
drop cascades to table r1
SET search_path TO default;

View File

@ -0,0 +1,6 @@
# import this file (from psql you can use \i) to use mitmproxy manually
test: failure_test_helpers
# this should only be run by pg_regress_multi, you don't need it
test: failure_setup
test: multi_test_helpers

View File

@ -30,3 +30,4 @@ test: failure_single_mod
test: failure_savepoints
test: failure_multi_row_insert
test: failure_mx_metadata_sync
test: failure_connection_establishment

View File

@ -125,6 +125,10 @@ class ActionsMixin:
self.next = CancelHandler(self.root, pid)
return self.next
def delay(self, timeMs):
self.next = DelayHandler(self.root, timeMs)
return self.next
class AcceptHandler(Handler):
def __init__(self, root):
super().__init__(root)
@ -181,6 +185,15 @@ class CancelHandler(Handler):
time.sleep(0.1)
return 'done'
class DelayHandler(Handler):
'Delay a packet by sleeping before deciding what to do'
def __init__(self, root, timeMs):
super().__init__(root)
self.timeMs = timeMs
def _handle(self, flow, message):
time.sleep(self.timeMs/1000.0)
return 'done'
class Contains(Handler, ActionsMixin, FilterableMixin):
def __init__(self, root, pattern):
super().__init__(root)

View File

@ -0,0 +1,78 @@
--
-- failure_connection_establishment.sql tests some behaviour of connection management when
-- it fails to connect.
--
-- Failure cases covered:
-- - timeout
--
SELECT citus.mitmproxy('conn.allow()');
CREATE SCHEMA fail_connect;
SET search_path TO 'fail_connect';
SET citus.shard_count TO 4;
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1450000;
ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 1450000;
CREATE TABLE products (
product_no integer,
name text,
price numeric
);
SELECT create_distributed_table('products', 'product_no');
-- Can only add primary key constraint on distribution column (or group of columns
-- including distribution column)
-- Command below should error out since 'name' is not a distribution column
ALTER TABLE products ADD CONSTRAINT p_key PRIMARY KEY(name);
-- we will insert a connection delay here, as this query was the cause of an
-- investigation into connection establishment problems
SET citus.node_connection_timeout TO 400;
SELECT citus.mitmproxy('conn.delay(500)');
ALTER TABLE products ADD CONSTRAINT p_key PRIMARY KEY(product_no);
SELECT citus.mitmproxy('conn.allow()');
CREATE TABLE r1 (
id int PRIMARY KEY,
name text
);
INSERT INTO r1 (id, name) VALUES
(1,'foo'),
(2,'bar'),
(3,'baz');
SELECT create_reference_table('r1');
SELECT citus.clear_network_traffic();
SELECT citus.mitmproxy('conn.delay(500)');
-- we cannot control which replica of the reference table will be queried, and there is
-- only one specific connection whose traffic we can intercept. By using the round-robin
-- task_assignment_policy we can force the query to hit both machines. We will use two
-- output files to match both orders, to verify there is one query that times out and
-- falls through to read from the other machine
SET citus.task_assignment_policy TO 'round-robin';
-- suppress the warning since we can't control which shard is chosen first. This test
-- fails if one of the queries returns an error instead of the result.
SET client_min_messages TO ERROR;
SELECT name FROM r1 WHERE id = 2;
SELECT name FROM r1 WHERE id = 2;
-- verify a connection attempt was made to the intercepted node; it would have been
-- delayed and thus have caused a timeout
SELECT citus.dump_network_traffic();
RESET client_min_messages;
-- verify get_global_active_transactions works when a timeout happens on a connection
SELECT get_global_active_transactions();
SELECT citus.mitmproxy('conn.allow()');
DROP SCHEMA fail_connect CASCADE;
SET search_path TO default;