diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c
index 10a0e93af..f85e9a996 100644
--- a/src/backend/distributed/connection/connection_management.c
+++ b/src/backend/distributed/connection/connection_management.c
@@ -9,11 +9,7 @@
  */
 #include "postgres.h"
-
-#ifdef HAVE_POLL_H
-#include <poll.h>
-#endif
-
+#include "pgstat.h"
 
 #include "libpq-fe.h"
 
@@ -23,6 +19,7 @@
 #include "commands/dbcommands.h"
 #include "distributed/connection_management.h"
 #include "distributed/errormessage.h"
+#include "distributed/memutils.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/hash_helpers.h"
 #include "distributed/placement_connection.h"
@@ -47,6 +44,30 @@ static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isC
 static void DefaultCitusNoticeProcessor(void *arg, const char *message);
 static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags);
 static bool RemoteTransactionIdle(MultiConnection *connection);
+static int EventSetSizeForConnectionList(List *connections);
+
+/* types for async connection management */
+enum MultiConnectionPhase
+{
+	MULTI_CONNECTION_PHASE_CONNECTING,
+	MULTI_CONNECTION_PHASE_CONNECTED,
+	MULTI_CONNECTION_PHASE_ERROR,
+};
+typedef struct MultiConnectionState
+{
+	enum MultiConnectionPhase phase;
+	MultiConnection *connection;
+	PostgresPollingStatusType pollmode;
+} MultiConnectionState;
+
+
+/* helper functions for async connection management */
+static bool MultiConnectionStatePoll(MultiConnectionState *connectionState);
+static WaitEventSet * WaitEventSetFromMultiConnectionStates(List *connections,
+															int *waitCount);
+static long DeadlineTimestampTzToTimeout(TimestampTz deadline);
+static void CloseNotReadyMultiConnectionStates(List *connectionStates);
+static uint32 MultiConnectionStateEventMask(MultiConnectionState *connectionState);
 
 
 static int CitusNoticeLogLevel = DEFAULT_CITUS_NOTICE_LEVEL;
@@ -459,183 +480,373 @@ ShutdownConnection(MultiConnection *connection)
 
 
 /*
- * FinishConnectionListEstablishment is a wrapper around FinishConnectionEstablishment.
- * The function iterates over the multiConnectionList and finishes the connection
- * establishment for each multi connection.
+ * MultiConnectionStatePoll executes PQconnectPoll on the connection to progress
+ * connection establishment. The return value indicates whether the
+ * MultiConnectionState changed, which could require a rebuild of the WaitEventSet.
+ */
+static bool
+MultiConnectionStatePoll(MultiConnectionState *connectionState)
+{
+	MultiConnection *connection = connectionState->connection;
+	ConnStatusType status = PQstatus(connection->pgConn);
+	PostgresPollingStatusType oldPollmode = connectionState->pollmode;
+
+	Assert(connectionState->phase == MULTI_CONNECTION_PHASE_CONNECTING);
+
+	if (status == CONNECTION_OK)
+	{
+		connectionState->phase = MULTI_CONNECTION_PHASE_CONNECTED;
+		return true;
+	}
+	else if (status == CONNECTION_BAD)
+	{
+		/* FIXME: retries? */
+		connectionState->phase = MULTI_CONNECTION_PHASE_ERROR;
+		return true;
+	}
+	else
+	{
+		connectionState->phase = MULTI_CONNECTION_PHASE_CONNECTING;
+	}
+
+	connectionState->pollmode = PQconnectPoll(connection->pgConn);
+
+	/*
+	 * FIXME: Do we want to add transparent retry support here?
+	 */
+	if (connectionState->pollmode == PGRES_POLLING_FAILED)
+	{
+		connectionState->phase = MULTI_CONNECTION_PHASE_ERROR;
+		return true;
+	}
+	else if (connectionState->pollmode == PGRES_POLLING_OK)
+	{
+		connectionState->phase = MULTI_CONNECTION_PHASE_CONNECTED;
+		return true;
+	}
+	else
+	{
+		Assert(connectionState->pollmode == PGRES_POLLING_WRITING ||
+			   connectionState->pollmode == PGRES_POLLING_READING);
+	}
+
+	return oldPollmode != connectionState->pollmode;
+}
+
+
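For reference, MultiConnectionStatePoll above advances libpq's non-blocking connect
state machine by exactly one step per socket-ready event. A minimal single-connection
version of the same contract looks roughly as follows (an illustrative sketch, not part
of this patch; it assumes a backend context where MyLatch and WaitLatchOrSocket are
available):

	static void
	ConnectPollSketch(PGconn *conn)
	{
		/* the caller created conn with PQconnectStart(); poll until a verdict */
		while (PQstatus(conn) != CONNECTION_BAD)
		{
			PostgresPollingStatusType pollmode = PQconnectPoll(conn);

			if (pollmode == PGRES_POLLING_OK)
			{
				return;		/* connected */
			}
			if (pollmode == PGRES_POLLING_FAILED)
			{
				return;		/* failed; PQerrorMessage(conn) has details */
			}

			/* wait in the direction PQconnectPoll asked for, then poll again */
			(void) WaitLatchOrSocket(MyLatch,
									 WL_LATCH_SET |
									 (pollmode == PGRES_POLLING_READING ?
									  WL_SOCKET_READABLE : WL_SOCKET_WRITEABLE),
									 PQsocket(conn), -1, WAIT_EVENT_CLIENT_READ);
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}
	}

The patch generalizes this loop to many connections at once by tracking each
connection's last requested pollmode in MultiConnectionState and multiplexing all the
sockets through a single WaitEventSet.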
+/*
+ * EventSetSizeForConnectionList calculates the space needed for a WaitEventSet based on a
+ * list of connections.
+ */
+inline static int
+EventSetSizeForConnectionList(List *connections)
+{
+	/* we need space for 2 postgres events in the waitset on top of the connections */
+	return list_length(connections) + 2;
+}
+
+
+/*
+ * WaitEventSetFromMultiConnectionStates takes a list of MultiConnectionStates and adds
+ * the sockets of all connections that are still in the connecting phase to a WaitSet,
+ * taking into account the maximum number of events the set has been sized for.
+ *
+ * If a non-NULL pointer is provided, *waitCount is set to the number of connections
+ * added to the WaitSet.
+ */
+static WaitEventSet *
+WaitEventSetFromMultiConnectionStates(List *connections, int *waitCount)
+{
+	WaitEventSet *waitEventSet = NULL;
+	ListCell *connectionCell = NULL;
+
+	const int eventSetSize = EventSetSizeForConnectionList(connections);
+	int numEventsAdded = 0;
+
+	if (waitCount)
+	{
+		*waitCount = 0;
+	}
+
+	waitEventSet = CreateWaitEventSet(CurrentMemoryContext, eventSetSize);
+	EnsureReleaseResource((MemoryContextCallbackFunction) (&FreeWaitEventSet),
+						  waitEventSet);
+
+	/*
+	 * Register the wait events for the signal latch and postmaster death first, so
+	 * that the socket events for the pending connections start at a fixed offset
+	 * of two in the event array.
+	 */
+	AddWaitEventToSet(waitEventSet, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, NULL);
+	AddWaitEventToSet(waitEventSet, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
+	numEventsAdded += 2;
+
+	foreach(connectionCell, connections)
+	{
+		MultiConnectionState *connectionState = (MultiConnectionState *) lfirst(
+			connectionCell);
+		int socket = 0;
+		int eventMask = 0;
+
+		if (numEventsAdded >= eventSetSize)
+		{
+			/* room for events to schedule is exhausted */
+			break;
+		}
+
+		if (connectionState->phase != MULTI_CONNECTION_PHASE_CONNECTING)
+		{
+			/* connections that are not connecting will not be added to the WaitSet */
+			continue;
+		}
+
+		socket = PQsocket(connectionState->connection->pgConn);
+
+		eventMask = MultiConnectionStateEventMask(connectionState);
+
+		AddWaitEventToSet(waitEventSet, eventMask, socket, NULL, connectionState);
+		numEventsAdded++;
+
+		if (waitCount)
+		{
+			*waitCount = *waitCount + 1;
+		}
+	}
+
+	return waitEventSet;
+}
+
+
+/*
+ * MultiConnectionStateEventMask returns the event mask used by the WaitEventSet for
+ * the socket associated with the connection, based on the pollmode PQconnectPoll
+ * returned in its last invocation.
+ */
+static uint32
+MultiConnectionStateEventMask(MultiConnectionState *connectionState)
+{
+	uint32 eventMask = 0;
+	if (connectionState->pollmode == PGRES_POLLING_READING)
+	{
+		eventMask |= WL_SOCKET_READABLE;
+	}
+	else
+	{
+		eventMask |= WL_SOCKET_WRITEABLE;
+	}
+	return eventMask;
+}
+
+
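Since the WaitEventSet API carries most of the weight below, here is a compressed tour
of its lifecycle (a sketch against the PostgreSQL 10/11 API, not part of this patch).
Note in particular that an event's mask can be modified in place but an event can never
be removed; that is why FinishConnectionListEstablishment below rebuilds the whole set
whenever a connection leaves the connecting phase:

	static void
	WaitEventSetSketch(pgsocket sock, void *userData)
	{
		WaitEventSet *set = CreateWaitEventSet(CurrentMemoryContext, 3);
		WaitEvent occurred[3];
		int socketPos = 0;
		int eventCount = 0;
		int eventIndex = 0;

		/* fixed, non-socket events */
		AddWaitEventToSet(set, WL_POSTMASTER_DEATH, PGINVALID_SOCKET, NULL, NULL);
		AddWaitEventToSet(set, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);

		/* socket event; user_data is handed back verbatim when the event fires */
		socketPos = AddWaitEventToSet(set, WL_SOCKET_READABLE, sock, NULL, userData);

		/* the direction can be flipped in place at the returned position */
		ModifyWaitEvent(set, socketPos, WL_SOCKET_WRITEABLE, NULL);

		/* block for at most 1000 ms; a return value of 0 means the timeout elapsed */
		eventCount = WaitEventSetWait(set, 1000, occurred, 3, WAIT_EVENT_CLIENT_READ);
		for (eventIndex = 0; eventIndex < eventCount; eventIndex++)
		{
			/* occurred[eventIndex].events, .pos and .user_data identify what fired */
		}

		FreeWaitEventSet(set);
	}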
+/*
+ * FinishConnectionListEstablishment takes a list of MultiConnections and finishes
+ * connection establishment asynchronously for all connections that are not already
+ * fully connected.
+ */
 void
 FinishConnectionListEstablishment(List *multiConnectionList)
 {
+	const TimestampTz connectionStart = GetCurrentTimestamp();
+	const TimestampTz deadline = TimestampTzPlusMilliseconds(connectionStart,
+															 NodeConnectionTimeout);
+	List *connectionStates = NULL;
 	ListCell *multiConnectionCell = NULL;
+	WaitEventSet *waitEventSet = NULL;
+	bool waitEventSetRebuild = true;
+	int waitCount = 0;
+	WaitEvent *events = NULL;
+	MemoryContext oldContext = NULL;
+
 	foreach(multiConnectionCell, multiConnectionList)
 	{
-		MultiConnection *multiConnection = (MultiConnection *) lfirst(
-			multiConnectionCell);
+		MultiConnection *connection = (MultiConnection *) lfirst(multiConnectionCell);
+		MultiConnectionState *connectionState = palloc0(sizeof(MultiConnectionState));
 
-		/* TODO: consider making connection establishment fully in parallel */
-		FinishConnectionEstablishment(multiConnection);
+		connectionState->connection = connection;
+
+		/*
+		 * Before we can build the waitset to wait for asynchronous IO we need to
+		 * know which poll mode to use for each socket. This is populated by
+		 * executing one round of PQconnectPoll, which updates the
+		 * MultiConnectionState struct with its phase and its next poll mode.
+		 */
+		MultiConnectionStatePoll(connectionState);
+
+		connectionStates = lappend(connectionStates, connectionState);
+		if (connectionState->phase == MULTI_CONNECTION_PHASE_CONNECTING)
+		{
+			waitCount++;
+		}
+	}
+
+	/* prepare space for socket events */
+	events = (WaitEvent *) palloc0(EventSetSizeForConnectionList(connectionStates) *
+								   sizeof(WaitEvent));
+
+	/*
+	 * For high connection counts with lots of round trips we could potentially have
+	 * a lot of (big) waitsets that we'd like to free right after we have used them.
+	 * To do this we switch to a temporary memory context for this loop, which is
+	 * reset at the end.
+	 */
+	oldContext = MemoryContextSwitchTo(
+		AllocSetContextCreate(CurrentMemoryContext,
+							  "connection establishment temporary context",
+							  ALLOCSET_DEFAULT_SIZES));
+	while (waitCount > 0)
+	{
+		long timeout = DeadlineTimestampTzToTimeout(deadline);
+		int eventCount = 0;
+		int eventIndex = 0;
+
+		if (waitEventSetRebuild)
+		{
+			MemoryContextReset(CurrentMemoryContext);
+			waitEventSet = WaitEventSetFromMultiConnectionStates(connectionStates,
+																 &waitCount);
+			waitEventSetRebuild = false;
+
+			if (waitCount <= 0)
+			{
+				break;
+			}
+		}
+
+		eventCount = WaitEventSetWait(waitEventSet, timeout, events, waitCount,
+									  WAIT_EVENT_CLIENT_READ);
+
+		for (eventIndex = 0; eventIndex < eventCount; eventIndex++)
+		{
+			WaitEvent *event = &events[eventIndex];
+			bool connectionStateChanged = false;
+			MultiConnectionState *connectionState =
+				(MultiConnectionState *) event->user_data;
+
+			if (event->events & WL_POSTMASTER_DEATH)
+			{
+				ereport(ERROR, (errmsg("postmaster was shut down, exiting")));
+			}
+
+			if (event->events & WL_LATCH_SET)
+			{
+				ResetLatch(MyLatch);
+
+				CHECK_FOR_INTERRUPTS();
+
+				if (InterruptHoldoffCount > 0 && (QueryCancelPending || ProcDiePending))
+				{
+					/*
+					 * We cannot easily break out of both loops here, so clean up
+					 * the temporary memory context before returning.
+					 */
+					MemoryContextDelete(MemoryContextSwitchTo(oldContext));
+					return;
+				}
+
+				continue;
+			}
+
+			connectionStateChanged = MultiConnectionStatePoll(connectionState);
+			if (connectionStateChanged)
+			{
+				if (connectionState->phase != MULTI_CONNECTION_PHASE_CONNECTING)
+				{
+					/*
+					 * The connection no longer needs to be waited on, but a
+					 * WaitEventSet does not support removing events, so rebuild it.
+					 */
+					waitEventSetRebuild = true;
+				}
+				else
+				{
+					/* connection state changed, reset the event mask */
+					uint32 eventMask = MultiConnectionStateEventMask(connectionState);
+					ModifyWaitEvent(waitEventSet, event->pos, eventMask, NULL);
+				}
+			}
+		}
+
+		if (eventCount == 0)
+		{
+			/*
+			 * A timeout occurred on the waitset; double-check the time elapsed
+			 * since connectionStart and, if the timeout has indeed passed, close
+			 * all connections that have not finished connecting.
+			 */
+			TimestampTz now = GetCurrentTimestamp();
+			if (TimestampDifferenceExceeds(connectionStart, now, NodeConnectionTimeout))
+			{
+				/*
+				 * This shows as a warning rather than an error: in some cases
+				 * queries can proceed with only a subset of the connections fully
+				 * established, and queries that cannot will error out on their own.
+				 */
+				ereport(WARNING, (errmsg("could not establish connection after %u ms",
+										 NodeConnectionTimeout)));
+
+				/*
+				 * Close all connections that have not been fully established.
+				 */
+				CloseNotReadyMultiConnectionStates(connectionStates);
+
+				break;
+			}
+		}
+	}
+	MemoryContextDelete(MemoryContextSwitchTo(oldContext));
+}
+
+
+/*
+ * DeadlineTimestampTzToTimeout returns the number of milliseconds that still need to
+ * elapse before the deadline provided as an argument is reached. The result can be
+ * passed as the timeout of WaitEventSetWait to make sure the wait returns once the
+ * deadline has passed.
+ */
+static long
+DeadlineTimestampTzToTimeout(TimestampTz deadline)
+{
+	long secs = 0;
+	int microsecs = 0;
+	TimestampDifference(GetCurrentTimestamp(), deadline, &secs, &microsecs);
+	return secs * 1000 + microsecs / 1000;
+}
+
+
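A short usage illustration (editorial, not part of this patch): TimestampDifference()
clamps a deadline that has already passed to zero seconds and zero microseconds, so the
computed timeout is never negative. WaitEventSetWait() then returns immediately with an
event count of zero, which the loop above treats as a timeout:

	TimestampTz deadline = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
													   NodeConnectionTimeout);
	long timeout = DeadlineTimestampTzToTimeout(deadline);	/* remaining ms, >= 0 */
	int eventCount = WaitEventSetWait(waitEventSet, timeout, events, waitCount,
									  WAIT_EVENT_CLIENT_READ);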
+/*
+ * CloseNotReadyMultiConnectionStates closes (via PQfinish) every connection in the
+ * MultiConnectionState list passed in that has not yet been fully established.
+ *
+ * It resets the connection's pgConn pointer to NULL after closing, since the
+ * connection should not be used anymore.
+ */
+static void
+CloseNotReadyMultiConnectionStates(List *connectionStates)
+{
+	ListCell *connectionStateCell = NULL;
+	foreach(connectionStateCell, connectionStates)
+	{
+		MultiConnectionState *connectionState = lfirst(connectionStateCell);
+		MultiConnection *connection = connectionState->connection;
+
+		if (connectionState->phase != MULTI_CONNECTION_PHASE_CONNECTING)
+		{
+			continue;
+		}
+
+		/* close connection, otherwise we take up resource on the other side */
+		PQfinish(connection->pgConn);
+		connection->pgConn = NULL;
 	}
 }
 
 
 /*
  * Synchronously finish connection establishment of an individual connection.
- *
- * TODO: Replace with variant waiting for multiple connections.
+ * This function is a convenience wrapper around FinishConnectionListEstablishment,
+ * which closes the connection when establishment times out.
  */
 void
 FinishConnectionEstablishment(MultiConnection *connection)
 {
-	static int checkIntervalMS = 200;
-
-	/*
-	 * Loop until connection is established, or failed (possibly just timed
-	 * out).
-	 */
-	while (true)
-	{
-		ConnStatusType status = PQstatus(connection->pgConn);
-		PostgresPollingStatusType pollmode;
-
-		if (status == CONNECTION_OK)
-		{
-			return;
-		}
-
-		/* FIXME: retries? */
-		if (status == CONNECTION_BAD)
-		{
-			return;
-		}
-
-		pollmode = PQconnectPoll(connection->pgConn);
-
-		/*
-		 * FIXME: Do we want to add transparent retry support here?
-		 */
-		if (pollmode == PGRES_POLLING_FAILED)
-		{
-			return;
-		}
-		else if (pollmode == PGRES_POLLING_OK)
-		{
-			return;
-		}
-		else
-		{
-			Assert(pollmode == PGRES_POLLING_WRITING ||
-				   pollmode == PGRES_POLLING_READING);
-		}
-
-		/* Loop, to handle poll() being interrupted by signals (EINTR) */
-		while (true)
-		{
-			int pollResult = 0;
-
-			/* we use poll(2) if available, otherwise select(2) */
-#ifdef HAVE_POLL
-			struct pollfd pollFileDescriptor;
-
-			pollFileDescriptor.fd = PQsocket(connection->pgConn);
-			if (pollmode == PGRES_POLLING_READING)
-			{
-				pollFileDescriptor.events = POLLIN;
-			}
-			else
-			{
-				pollFileDescriptor.events = POLLOUT;
-			}
-			pollFileDescriptor.revents = 0;
-
-			/*
-			 * Only sleep for a limited amount of time, so we can react to
-			 * interrupts in time, even if the platform doesn't interrupt
-			 * poll() after signal arrival.
-			 */
-			pollResult = poll(&pollFileDescriptor, 1, checkIntervalMS);
-#else /* !HAVE_POLL */
-			fd_set readFileDescriptorSet;
-			fd_set writeFileDescriptorSet;
-			fd_set exceptionFileDescriptorSet;
-			int selectTimeoutUS = checkIntervalMS * 1000;
-			struct timeval selectTimeout = { 0, selectTimeoutUS };
-			int selectFileDescriptor = PQsocket(connection->pgConn);
-
-			FD_ZERO(&readFileDescriptorSet);
-			FD_ZERO(&writeFileDescriptorSet);
-			FD_ZERO(&exceptionFileDescriptorSet);
-
-			if (pollmode == PGRES_POLLING_READING)
-			{
-				FD_SET(selectFileDescriptor, &readFileDescriptorSet);
-			}
-			else if (pollmode == PGRES_POLLING_WRITING)
-			{
-				FD_SET(selectFileDescriptor, &writeFileDescriptorSet);
-			}
-
-			pollResult = (select) (selectFileDescriptor + 1, &readFileDescriptorSet,
-								   &writeFileDescriptorSet, &exceptionFileDescriptorSet,
-								   &selectTimeout);
-#endif /* HAVE_POLL */
-
-			if (pollResult == 0)
-			{
-				/*
-				 * Timeout exceeded. Two things to do:
-				 * - check whether any interrupts arrived and handle them
-				 * - check whether establishment for connection already has
-				 *   lasted for too long, stop waiting if so.
-				 */
-				CHECK_FOR_INTERRUPTS();
-
-				if (TimestampDifferenceExceeds(connection->connectionStart,
-											   GetCurrentTimestamp(),
-											   NodeConnectionTimeout))
-				{
-					ereport(WARNING, (errmsg("could not establish connection after %u ms",
-											 NodeConnectionTimeout)));
-
-					/* close connection, otherwise we take up resource on the other side */
-					PQfinish(connection->pgConn);
-					connection->pgConn = NULL;
-					break;
-				}
-			}
-			else if (pollResult > 0)
-			{
-				/*
-				 * IO possible, continue connection establishment. We could
-				 * check for timeouts here as well, but if there's progress
-				 * there seems little point.
-				 */
-				break;
-			}
-			else
-			{
-				int errorCode = errno;
-#ifdef WIN32
-				errorCode = WSAGetLastError();
-#endif
-				if (errorCode == EINTR)
-				{
-					/* Retrying, signal interrupted. So check. */
-					CHECK_FOR_INTERRUPTS();
-				}
-				else
-				{
-					/*
-					 * We ERROR here, instead of just returning a failed
-					 * connection, because this shouldn't happen, and indicates a
-					 * programming error somewhere, not a network etc. issue.
-					 */
-					ereport(ERROR, (errcode_for_socket_access(),
-									errmsg("poll()/select() failed: %m")));
-				}
-			}
-		}
-	}
+	FinishConnectionListEstablishment(list_make1(connection));
 }
diff --git a/src/backend/distributed/transaction/backend_data.c b/src/backend/distributed/transaction/backend_data.c
index e790bd80b..cbc5d2d14 100644
--- a/src/backend/distributed/transaction/backend_data.c
+++ b/src/backend/distributed/transaction/backend_data.c
@@ -294,6 +294,11 @@ get_global_active_transactions(PG_FUNCTION_ARGS)
 		int64 rowCount = 0;
 		int64 colCount = 0;
 
+		if (PQstatus(connection->pgConn) != CONNECTION_OK)
+		{
+			continue;
+		}
+
 		result = GetRemoteCommandResult(connection, raiseInterrupts);
 		if (!IsResponseOK(result))
 		{
diff --git a/src/backend/distributed/utils/enable_ssl.c b/src/backend/distributed/utils/enable_ssl.c
index 5f32e8a47..f07b9ee99 100644
--- a/src/backend/distributed/utils/enable_ssl.c
+++ b/src/backend/distributed/utils/enable_ssl.c
@@ -11,6 +11,7 @@
 #include "postgres.h"
 
 #include "distributed/connection_management.h"
+#include "distributed/memutils.h"
 #include "distributed/worker_protocol.h"
 #include "libpq/libpq.h"
 #include "miscadmin.h"
@@ -48,8 +49,6 @@ static void GloballyReloadConfig(void);
 
 #ifdef USE_SSL
 /* forward declaration of functions used when compiled with ssl */
-static void EnsureReleaseOpenSSLResource(MemoryContextCallbackFunction callback,
-										 void *arg);
 static bool ShouldUseAutoSSL(void);
 static bool CreateCertificatesWhenNeeded(void);
 static EVP_PKEY * GeneratePrivateKey(void);
@@ -183,20 +182,6 @@ GloballyReloadConfig()
 
 #ifdef USE_SSL
 
-/*
- * EnsureReleaseOpenSSLResource registers the openssl allocated resource to be freed when the
- * current memory context is reset.
- */ -static void -EnsureReleaseOpenSSLResource(MemoryContextCallbackFunction callback, void *arg) -{ - MemoryContextCallback *cb = MemoryContextAllocZero(CurrentMemoryContext, - sizeof(MemoryContextCallback)); - cb->func = callback; - cb->arg = arg; - MemoryContextRegisterResetCallback(CurrentMemoryContext, cb); -} - /* * ShouldUseAutoSSL checks if citus should enable ssl based on the connection settings it @@ -258,8 +243,8 @@ CreateCertificatesWhenNeeded() "correctly."))); return false; } - EnsureReleaseOpenSSLResource((MemoryContextCallbackFunction) (&SSL_CTX_free), - sslContext); + EnsureReleaseResource((MemoryContextCallbackFunction) (&SSL_CTX_free), + sslContext); /* * check if we can load the certificate, when we can we assume the certificates are in @@ -315,11 +300,11 @@ GeneratePrivateKey() { ereport(ERROR, (errmsg("unable to allocate space for private key"))); } - EnsureReleaseOpenSSLResource((MemoryContextCallbackFunction) (&EVP_PKEY_free), - privateKey); + EnsureReleaseResource((MemoryContextCallbackFunction) (&EVP_PKEY_free), + privateKey); exponent = BN_new(); - EnsureReleaseOpenSSLResource((MemoryContextCallbackFunction) (&BN_free), exponent); + EnsureReleaseResource((MemoryContextCallbackFunction) (&BN_free), exponent); /* load the exponent to use for the generation of the key */ success = BN_set_word(exponent, RSA_F4); @@ -361,8 +346,8 @@ CreateCertificate(EVP_PKEY *privateKey) { ereport(ERROR, (errmsg("unable to allocate space for the x509 certificate"))); } - EnsureReleaseOpenSSLResource((MemoryContextCallbackFunction) (&X509_free), - certificate); + EnsureReleaseResource((MemoryContextCallbackFunction) (&X509_free), + certificate); /* Set the serial number. */ ASN1_INTEGER_set(X509_get_serialNumber(certificate), 1); diff --git a/src/include/distributed/memutils.h b/src/include/distributed/memutils.h new file mode 100644 index 000000000..724eb2910 --- /dev/null +++ b/src/include/distributed/memutils.h @@ -0,0 +1,29 @@ +/* + * memutils.h + * utility functions to help with postgres' memory management primitives + */ + +#ifndef CITUS_MEMUTILS_H +#define CITUS_MEMUTILS_H + +#include "utils/palloc.h" + + +/* + * EnsureReleaseResource is an abstraction on MemoryContextRegisterResetCallback that + * allocates the space for the MemoryContextCallback and registers it to the current + * MemoryContext, ensuring the call of callback with arg as its argument during either the + * Reset of Delete of a MemoryContext. 
+ */ +static inline void +EnsureReleaseResource(MemoryContextCallbackFunction callback, void *arg) +{ + MemoryContextCallback *cb = MemoryContextAllocZero(CurrentMemoryContext, + sizeof(MemoryContextCallback)); + cb->func = callback; + cb->arg = arg; + MemoryContextRegisterResetCallback(CurrentMemoryContext, cb); +} + + +#endif /*CITUS_MEMUTILS_H */ diff --git a/src/test/regress/Makefile b/src/test/regress/Makefile index 1e5ffa8bf..f64774ea8 100644 --- a/src/test/regress/Makefile +++ b/src/test/regress/Makefile @@ -88,6 +88,10 @@ check-failure: all $(pg_regress_multi_check) --load-extension=citus --mitmproxy \ -- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/failure_schedule $(EXTRA_TESTS) +check-failure-base: all + $(pg_regress_multi_check) --load-extension=citus --mitmproxy \ + -- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/failure_base_schedule $(EXTRA_TESTS) + clean distclean maintainer-clean: rm -f $(output_files) $(input_files) rm -rf tmp_check/ diff --git a/src/test/regress/expected/failure_connection_establishment.out b/src/test/regress/expected/failure_connection_establishment.out new file mode 100644 index 000000000..869d9eeb3 --- /dev/null +++ b/src/test/regress/expected/failure_connection_establishment.out @@ -0,0 +1,129 @@ +-- +-- failure_connection_establishment.sql tests some behaviour of connection management when +-- it fails to connect. +-- +-- Failure cases covered: +-- - timeout +-- +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +CREATE SCHEMA fail_connect; +SET search_path TO 'fail_connect'; +SET citus.shard_count TO 4; +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1450000; +ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 1450000; +CREATE TABLE products ( + product_no integer, + name text, + price numeric +); +SELECT create_distributed_table('products', 'product_no'); + create_distributed_table +-------------------------- + +(1 row) + +-- Can only add primary key constraint on distribution column (or group of columns +-- including distribution column) +-- Command below should error out since 'name' is not a distribution column +ALTER TABLE products ADD CONSTRAINT p_key PRIMARY KEY(name); +ERROR: cannot create constraint on "products" +DETAIL: Distributed relations cannot have UNIQUE, EXCLUDE, or PRIMARY KEY constraints that do not include the partition column (with an equality operator if EXCLUDE). +-- we will insert a connection delay here as this query was the cause for an investigation +-- into connection establishment problems +SET citus.node_connection_timeout TO 400; +SELECT citus.mitmproxy('conn.delay(500)'); + mitmproxy +----------- + +(1 row) + +ALTER TABLE products ADD CONSTRAINT p_key PRIMARY KEY(product_no); +WARNING: could not establish connection after 400 ms +ERROR: connection error: localhost:9060 +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +----------- + +(1 row) + +CREATE TABLE r1 ( + id int PRIMARY KEY, + name text +); +INSERT INTO r1 (id, name) VALUES +(1,'foo'), +(2,'bar'), +(3,'baz'); +SELECT create_reference_table('r1'); +NOTICE: Copying data from local table... + create_reference_table +------------------------ + +(1 row) + +SELECT citus.clear_network_traffic(); + clear_network_traffic +----------------------- + +(1 row) + +SELECT citus.mitmproxy('conn.delay(500)'); + mitmproxy +----------- + +(1 row) + +-- we cannot control which replica of the reference table will be queried and there is +-- only one specific client we can control the connection for. 
+-- by using the round-robin task_assignment_policy we can force the query to hit both
+-- machines. We will use two output files to match both orders, verifying that one
+-- attempt times out and falls through to read from the other machine
+SET citus.task_assignment_policy TO 'round-robin';
+-- suppress the warning since we can't control which shard is chosen first. This test
+-- fails if one of the queries returns an error instead of the result.
+SET client_min_messages TO ERROR;
+SELECT name FROM r1 WHERE id = 2;
+ name
+------
+ bar
+(1 row)

+SELECT name FROM r1 WHERE id = 2;
+ name
+------
+ bar
+(1 row)
+
+-- verify a connection attempt was made to the intercepted node; that attempt would
+-- have been delayed and thus caused a timeout
+SELECT citus.dump_network_traffic();
+        dump_network_traffic
+-------------------------------------
+ (0,coordinator,"[initial message]")
+(1 row)
+
+RESET client_min_messages;
+-- verify get_global_active_transactions works when a timeout happens on a connection
+SELECT get_global_active_transactions();
+WARNING: could not establish connection after 400 ms
+WARNING: connection error: localhost:9060
+ get_global_active_transactions
+--------------------------------
+(0 rows)
+
+SELECT citus.mitmproxy('conn.allow()');
+ mitmproxy
+-----------
+
+(1 row)
+
+DROP SCHEMA fail_connect CASCADE;
+NOTICE: drop cascades to 2 other objects
+DETAIL: drop cascades to table products
+drop cascades to table r1
+SET search_path TO default;
diff --git a/src/test/regress/failure_base_schedule b/src/test/regress/failure_base_schedule
new file mode 100644
index 000000000..6ecbe588b
--- /dev/null
+++ b/src/test/regress/failure_base_schedule
@@ -0,0 +1,6 @@
+# import this file (from psql you can use \i) to use mitmproxy manually
+test: failure_test_helpers
+
+# this should only be run by pg_regress_multi, you don't need it
+test: failure_setup
+test: multi_test_helpers
diff --git a/src/test/regress/failure_schedule b/src/test/regress/failure_schedule
index 7afdd3f26..51af5fabb 100644
--- a/src/test/regress/failure_schedule
+++ b/src/test/regress/failure_schedule
@@ -30,3 +30,4 @@ test: failure_single_mod
 test: failure_savepoints
 test: failure_multi_row_insert
 test: failure_mx_metadata_sync
+test: failure_connection_establishment
diff --git a/src/test/regress/mitmscripts/fluent.py b/src/test/regress/mitmscripts/fluent.py
index c441e3e7d..8b4c9681e 100644
--- a/src/test/regress/mitmscripts/fluent.py
+++ b/src/test/regress/mitmscripts/fluent.py
@@ -125,6 +125,10 @@ class ActionsMixin:
         self.next = CancelHandler(self.root, pid)
         return self.next
 
+    def delay(self, timeMs):
+        self.next = DelayHandler(self.root, timeMs)
+        return self.next
+
 class AcceptHandler(Handler):
     def __init__(self, root):
         super().__init__(root)
@@ -181,6 +185,15 @@ class CancelHandler(Handler):
         time.sleep(0.1)
         return 'done'
 
+class DelayHandler(Handler):
+    'Delay a packet by sleeping before deciding what to do'
+    def __init__(self, root, timeMs):
+        super().__init__(root)
+        self.timeMs = timeMs
+    def _handle(self, flow, message):
+        time.sleep(self.timeMs / 1000.0)
+        return 'done'
+
 class Contains(Handler, ActionsMixin, FilterableMixin):
     def __init__(self, root, pattern):
         super().__init__(root)
diff --git a/src/test/regress/sql/failure_connection_establishment.sql b/src/test/regress/sql/failure_connection_establishment.sql
new file mode 100644
index 000000000..2c6fa7ecd
--- /dev/null
+++ b/src/test/regress/sql/failure_connection_establishment.sql
@@ -0,0 +1,78 @@
+--
+-- failure_connection_establishment.sql tests some behaviour of connection management when
+-- it fails to connect.
+--
+-- Failure cases covered:
+--  - timeout
+--
+
+SELECT citus.mitmproxy('conn.allow()');
+
+CREATE SCHEMA fail_connect;
+SET search_path TO 'fail_connect';
+
+SET citus.shard_count TO 4;
+ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1450000;
+ALTER SEQUENCE pg_catalog.pg_dist_placement_placementid_seq RESTART 1450000;
+
+CREATE TABLE products (
+    product_no integer,
+    name text,
+    price numeric
+);
+SELECT create_distributed_table('products', 'product_no');
+
+-- Can only add primary key constraint on distribution column (or group of columns
+-- including distribution column)
+-- Command below should error out since 'name' is not a distribution column
+ALTER TABLE products ADD CONSTRAINT p_key PRIMARY KEY(name);
+
+
+-- we will insert a connection delay here as this query was the cause of an investigation
+-- into connection establishment problems
+SET citus.node_connection_timeout TO 400;
+SELECT citus.mitmproxy('conn.delay(500)');
+
+ALTER TABLE products ADD CONSTRAINT p_key PRIMARY KEY(product_no);
+
+SELECT citus.mitmproxy('conn.allow()');
+
+CREATE TABLE r1 (
+    id int PRIMARY KEY,
+    name text
+);
+INSERT INTO r1 (id, name) VALUES
+(1,'foo'),
+(2,'bar'),
+(3,'baz');
+
+SELECT create_reference_table('r1');
+
+SELECT citus.clear_network_traffic();
+SELECT citus.mitmproxy('conn.delay(500)');
+
+-- we cannot control which replica of the reference table will be queried, and we can
+-- only control the connection of one specific client (the one through the proxy).
+-- by using the round-robin task_assignment_policy we can force the query to hit both
+-- machines. We will use two output files to match both orders, verifying that one
+-- attempt times out and falls through to read from the other machine
+SET citus.task_assignment_policy TO 'round-robin';
+-- suppress the warning since we can't control which shard is chosen first. This test
+-- fails if one of the queries returns an error instead of the result.
+SET client_min_messages TO ERROR;
+SELECT name FROM r1 WHERE id = 2;
+SELECT name FROM r1 WHERE id = 2;
+
+-- verify a connection attempt was made to the intercepted node; that attempt would
+-- have been delayed and thus caused a timeout
+SELECT citus.dump_network_traffic();
+
+RESET client_min_messages;
+
+-- verify get_global_active_transactions works when a timeout happens on a connection
+SELECT get_global_active_transactions();
+
+
+SELECT citus.mitmproxy('conn.allow()');
+DROP SCHEMA fail_connect CASCADE;
+SET search_path TO default;