mirror of https://github.com/citusdata/citus.git
Feature: optionally force master_update_node during failover (#2773)
When `master_update_node` is called to update a node's location it waits for appropriate locks to become available. This is useful during normal operation as new operations will be blocked till after the metadata update while running operations have time to finish. When `master_update_node` is called after a node failure it is less useful to wait for running operations to finish as they can't. The lock being held indicates an operation that once attempted to commit will fail as the machine already failed. Now the downside is the failover is postponed till the termination point of the operation. This has been observed by users to take a significant amount of time causing the rest of the system to be observed unavailable. With this patch it is possible in such situations to invoke `master_update_node` with 2 optional arguments: - `force` (bool defaults to `false`): When called with true the update of the metadata will be forced to proceed by terminating conflicting backends. A cancel is not enough as the backend might be in idle time (eg. an interactive session, or going back and forth between an appliaction), therefore a more intrusive solution of termination is used here. - `lock_cooldown` (int defaults to `10000`): This is the time in milliseconds before conflicting backends are terminated. This is to allow the backends to finish cleanly before terminating them. This allows the user to set an upperbound to the expected time to complete the metadata update, eg. performing the failover. The functionality is implemented by spawning a background worker that has the task of helping a certain backend in acquiring its locks. The backend is either terminated on successful execution of the metadata update, or once the memory context of the expression gets reset, eg. on a cancel of the statement.pull/2786/head
parent
70055098af
commit
5df1b49bed
|
@ -0,0 +1,27 @@
|
|||
/* citus--8.2-2--8.2-3 */
|
||||
|
||||
SET search_path = 'pg_catalog';
|
||||
|
||||
DROP FUNCTION master_update_node(node_id int,
|
||||
new_node_name text,
|
||||
new_node_port int);
|
||||
|
||||
CREATE OR REPLACE FUNCTION master_update_node(node_id int,
|
||||
new_node_name text,
|
||||
new_node_port int,
|
||||
force bool DEFAULT false,
|
||||
lock_cooldown int DEFAULT 10000)
|
||||
RETURNS void
|
||||
LANGUAGE C STRICT
|
||||
AS 'MODULE_PATHNAME', $$master_update_node$$;
|
||||
|
||||
COMMENT ON FUNCTION master_update_node(node_id int,
|
||||
new_node_name text,
|
||||
new_node_port int,
|
||||
force bool,
|
||||
lock_cooldown int)
|
||||
IS 'change the location of a node. when force => true it will wait lock_cooldown ms before killing competing locks';
|
||||
|
||||
REVOKE ALL ON FUNCTION master_update_node(int,text,int,bool,int) FROM PUBLIC;
|
||||
|
||||
RESET search_path;
|
|
@ -1,6 +1,6 @@
|
|||
# Citus extension
|
||||
comment = 'Citus distributed database'
|
||||
default_version = '8.2-2'
|
||||
default_version = '8.2-3'
|
||||
module_pathname = '$libdir/citus'
|
||||
relocatable = false
|
||||
schema = pg_catalog
|
||||
|
|
|
@ -0,0 +1,330 @@
|
|||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* acquire_lock.c
|
||||
* A dynamic background worker that can help your backend to acquire its locks. This is
|
||||
* an intrusive way of getting your way. The primary use of this will be to allow
|
||||
* master_update_node to make progress during failure. When the system cannot possibly
|
||||
* finish a transaction due to the host required to finish the transaction has failed
|
||||
* it might be better to actively cancel the backend instead of waiting for it to fail.
|
||||
*
|
||||
* This file provides infrastructure for launching exactly one a background
|
||||
* worker for every database in which citus is used. That background worker
|
||||
* can then perform work like deadlock detection, prepared transaction
|
||||
* recovery, and cleanup.
|
||||
*
|
||||
* Copyright (c) 2019, Citus Data, Inc.
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
|
||||
#include <unistd.h>
|
||||
|
||||
#include "postgres.h"
|
||||
|
||||
|
||||
#include "access/xact.h"
|
||||
#include "catalog/pg_type.h"
|
||||
#include "executor/spi.h"
|
||||
#include "miscadmin.h"
|
||||
#include "pgstat.h"
|
||||
#include "storage/ipc.h"
|
||||
#include "storage/latch.h"
|
||||
#include "utils/snapmgr.h"
|
||||
|
||||
#include "distributed/citus_acquire_lock.h"
|
||||
#include "distributed/version_compat.h"
|
||||
|
||||
/* forward declaration of background worker entrypoint */
|
||||
extern void LockAcquireHelperMain(Datum main_arg);
|
||||
|
||||
/* forward declaration of helper functions */
|
||||
static void lock_acquire_helper_sigterm(SIGNAL_ARGS);
|
||||
static void EnsureStopLockAcquireHelper(void *arg);
|
||||
static long DeadlineTimestampTzToTimeout(TimestampTz deadline);
|
||||
|
||||
/* LockAcquireHelperArgs contains extra arguments to be used to start the worker */
|
||||
typedef struct LockAcquireHelperArgs
|
||||
{
|
||||
Oid DatabaseId;
|
||||
int32 lock_cooldown;
|
||||
} LockAcquireHelperArgs;
|
||||
|
||||
static bool got_sigterm = false;
|
||||
|
||||
|
||||
/*
|
||||
* StartLockAcquireHelperBackgroundWorker creates a background worker that will help the
|
||||
* backend passed in as an argument to complete. The worker that is started will be
|
||||
* terminated once the current memory context gets reset, to make sure it is cleaned up in
|
||||
* all situations. It is however advised to call TerminateBackgroundWorker on the handle
|
||||
* returned on the first possible moment the help is no longer required.
|
||||
*/
|
||||
BackgroundWorkerHandle *
|
||||
StartLockAcquireHelperBackgroundWorker(int backendToHelp, int32 lock_cooldown)
|
||||
{
|
||||
BackgroundWorkerHandle *handle = NULL;
|
||||
LockAcquireHelperArgs args;
|
||||
BackgroundWorker worker;
|
||||
MemoryContextCallback *workerCleanup = NULL;
|
||||
memset(&args, 0, sizeof(args));
|
||||
memset(&worker, 0, sizeof(worker));
|
||||
|
||||
/* collect the extra arguments required for the background worker */
|
||||
args.DatabaseId = MyDatabaseId;
|
||||
args.lock_cooldown = lock_cooldown;
|
||||
|
||||
/* construct the background worker and start it */
|
||||
snprintf(worker.bgw_name, BGW_MAXLEN,
|
||||
"Citus Lock Acquire Helper: %d/%u",
|
||||
backendToHelp, MyDatabaseId);
|
||||
#if PG_VERSION_NUM >= 110000
|
||||
snprintf(worker.bgw_type, BGW_MAXLEN, "citus_lock_aqcuire");
|
||||
#endif
|
||||
|
||||
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
|
||||
worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
|
||||
worker.bgw_restart_time = BGW_NEVER_RESTART;
|
||||
|
||||
snprintf(worker.bgw_library_name, BGW_MAXLEN, "citus");
|
||||
snprintf(worker.bgw_function_name, BGW_MAXLEN, "LockAcquireHelperMain");
|
||||
worker.bgw_main_arg = Int32GetDatum(backendToHelp);
|
||||
worker.bgw_notify_pid = 0;
|
||||
|
||||
/*
|
||||
* we check if args fits in bgw_extra to make sure it is safe to copy the data. Once
|
||||
* we exceed the size of data to copy this way we need to look into a different way of
|
||||
* passing the arguments to the worker.
|
||||
*/
|
||||
Assert(sizeof(worker.bgw_extra) >= sizeof(args));
|
||||
memcpy(worker.bgw_extra, &args, sizeof(args));
|
||||
|
||||
if (!RegisterDynamicBackgroundWorker(&worker, &handle))
|
||||
{
|
||||
ereport(ERROR, (errmsg("could not start lock acquiring background worker to "
|
||||
"force the update"),
|
||||
errhint("Increasing max_worker_processes might help.")));
|
||||
}
|
||||
|
||||
workerCleanup = palloc0(sizeof(MemoryContextCallback));
|
||||
workerCleanup->func = EnsureStopLockAcquireHelper;
|
||||
workerCleanup->arg = handle;
|
||||
|
||||
MemoryContextRegisterResetCallback(CurrentMemoryContext, workerCleanup);
|
||||
|
||||
return handle;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* EnsureStopLockAcquireHelper is designed to be called as a MemoryContextCallback. It
|
||||
* takes a handle to the background worker and Terminates it. It is safe to be called on a
|
||||
* handle that has already been terminated due to the guard around the generation number
|
||||
* implemented in the handle by postgres.
|
||||
*/
|
||||
static void
|
||||
EnsureStopLockAcquireHelper(void *arg)
|
||||
{
|
||||
BackgroundWorkerHandle *handle = (BackgroundWorkerHandle *) arg;
|
||||
TerminateBackgroundWorker(handle);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Signal handler for SIGTERM
|
||||
* Set a flag to let the main loop to terminate, and set our latch to wake
|
||||
* it up.
|
||||
*/
|
||||
static void
|
||||
lock_acquire_helper_sigterm(SIGNAL_ARGS)
|
||||
{
|
||||
int save_errno = errno;
|
||||
|
||||
got_sigterm = true;
|
||||
SetLatch(MyLatch);
|
||||
|
||||
errno = save_errno;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ShouldAcquireLock tests if our backend should still proceed with acquiring the lock,
|
||||
* and thus keep terminating conflicting backends. This function returns true until a
|
||||
* SIGTERM, background worker termination signal, has been received.
|
||||
*
|
||||
* The function blocks for at most sleepms when called. During operation without being
|
||||
* terminated this is the time between invocations to the backend termination logic.
|
||||
*/
|
||||
static bool
|
||||
ShouldAcquireLock(long sleepms)
|
||||
{
|
||||
int rc;
|
||||
|
||||
/* early escape in case we already got the signal to stop acquiring the lock */
|
||||
if (got_sigterm)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
|
||||
sleepms * 1L, PG_WAIT_EXTENSION);
|
||||
ResetLatch(MyLatch);
|
||||
|
||||
/* emergency bailout if postmaster has died */
|
||||
if (rc & WL_POSTMASTER_DEATH)
|
||||
{
|
||||
proc_exit(1);
|
||||
}
|
||||
|
||||
CHECK_FOR_INTERRUPTS();
|
||||
|
||||
return !got_sigterm;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* LockAcquireHelperMain runs in a dynamic background worker to help master_update_node to
|
||||
* acquire its locks.
|
||||
*/
|
||||
void
|
||||
LockAcquireHelperMain(Datum main_arg)
|
||||
{
|
||||
int backendPid = DatumGetInt32(main_arg);
|
||||
StringInfoData sql;
|
||||
LockAcquireHelperArgs *args = (LockAcquireHelperArgs *) MyBgworkerEntry->bgw_extra;
|
||||
long timeout = 0;
|
||||
const TimestampTz connectionStart = GetCurrentTimestamp();
|
||||
const TimestampTz deadline = TimestampTzPlusMilliseconds(connectionStart,
|
||||
args->lock_cooldown);
|
||||
|
||||
/* parameters for sql query to be executed */
|
||||
const int paramCount = 1;
|
||||
Oid paramTypes[1] = { INT4OID };
|
||||
Datum paramValues[1];
|
||||
|
||||
pqsignal(SIGTERM, lock_acquire_helper_sigterm);
|
||||
|
||||
BackgroundWorkerUnblockSignals();
|
||||
|
||||
elog(LOG, "lock acquiring backend started for backend %d (cooldown %dms)", backendPid,
|
||||
args->lock_cooldown);
|
||||
|
||||
/*
|
||||
* this loop waits till the deadline is reached (eg. lock_cooldown has passed) OR we
|
||||
* no longer need to acquire the lock due to the termination of this backend.
|
||||
* Only after the timeout the code will continue with the section that will acquire
|
||||
* the lock.
|
||||
*/
|
||||
do {
|
||||
timeout = DeadlineTimestampTzToTimeout(deadline);
|
||||
} while (timeout > 0 && ShouldAcquireLock(timeout));
|
||||
|
||||
/* connecting to the database */
|
||||
BackgroundWorkerInitializeConnectionByOid(args->DatabaseId, InvalidOid, 0);
|
||||
|
||||
/*
|
||||
* The query below does a self join on pg_locks to find backends that are granted a
|
||||
* lock our target backend (backendPid) is waiting for. Once it found such a backend
|
||||
* it terminates the backend with pg_terminate_pid.
|
||||
*
|
||||
* The result is are rows of pid,bool indicating backend that is terminated and the
|
||||
* success of the termination. These will be logged accordingly below for an
|
||||
* administrator to correlate in the logs with the termination message.
|
||||
*/
|
||||
initStringInfo(&sql);
|
||||
appendStringInfo(&sql,
|
||||
"SELECT \n"
|
||||
" DISTINCT conflicting.pid,\n"
|
||||
" pg_terminate_backend(conflicting.pid)\n"
|
||||
" FROM pg_locks AS blocked\n"
|
||||
" JOIN pg_locks AS conflicting\n"
|
||||
" ON (conflicting.database = blocked.database\n"
|
||||
" AND conflicting.objid = blocked.objid)\n"
|
||||
" WHERE conflicting.granted = true\n"
|
||||
" AND blocked.granted = false\n"
|
||||
" AND blocked.pid = $1;");
|
||||
paramValues[0] = Int32GetDatum(backendPid);
|
||||
|
||||
while (ShouldAcquireLock(100))
|
||||
{
|
||||
int row = 0;
|
||||
int spiStatus = 0;
|
||||
|
||||
elog(LOG, "canceling competing backends for backend %d", backendPid);
|
||||
|
||||
/*
|
||||
* Begin our transaction
|
||||
*/
|
||||
SetCurrentStatementStartTimestamp();
|
||||
StartTransactionCommand();
|
||||
SPI_connect();
|
||||
PushActiveSnapshot(GetTransactionSnapshot());
|
||||
pgstat_report_activity(STATE_RUNNING, sql.data);
|
||||
|
||||
spiStatus = SPI_execute_with_args(sql.data, paramCount, paramTypes, paramValues,
|
||||
NULL, false, 0);
|
||||
|
||||
if (spiStatus == SPI_OK_SELECT)
|
||||
{
|
||||
for (row = 0; row < SPI_processed; row++)
|
||||
{
|
||||
int terminatedPid = 0;
|
||||
bool isTerminated = false;
|
||||
bool isnull = false;
|
||||
|
||||
terminatedPid = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0],
|
||||
SPI_tuptable->tupdesc,
|
||||
1, &isnull));
|
||||
|
||||
isTerminated = DatumGetBool(SPI_getbinval(SPI_tuptable->vals[0],
|
||||
SPI_tuptable->tupdesc,
|
||||
2, &isnull));
|
||||
|
||||
if (isTerminated)
|
||||
{
|
||||
elog(WARNING, "terminated conflicting backend %d", terminatedPid);
|
||||
}
|
||||
else
|
||||
{
|
||||
elog(INFO,
|
||||
"attempt to terminate conflicting backend %d was unsuccessful",
|
||||
terminatedPid);
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
elog(FATAL, "cannot cancel competing backends for backend %d", backendPid);
|
||||
}
|
||||
|
||||
/*
|
||||
* And finish our transaction.
|
||||
*/
|
||||
SPI_finish();
|
||||
PopActiveSnapshot();
|
||||
CommitTransactionCommand();
|
||||
pgstat_report_stat(false);
|
||||
pgstat_report_activity(STATE_IDLE, NULL);
|
||||
}
|
||||
|
||||
|
||||
elog(LOG, "lock acquiring backend finished for backend %d", backendPid);
|
||||
|
||||
/* safely got to the end, exit without problem */
|
||||
proc_exit(0);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* DeadlineTimestampTzToTimeout returns the numer of miliseconds that still need to elapse
|
||||
* before the deadline provided as an argument will be reached. The outcome can be used to
|
||||
* pass to the Wait of an EventSet to make sure it returns after the timeout has passed.
|
||||
*/
|
||||
static long
|
||||
DeadlineTimestampTzToTimeout(TimestampTz deadline)
|
||||
{
|
||||
long secs = 0;
|
||||
int msecs = 0;
|
||||
TimestampDifference(GetCurrentTimestamp(), deadline, &secs, &msecs);
|
||||
return secs * 1000 + msecs / 1000;
|
||||
}
|
|
@ -20,6 +20,7 @@
|
|||
#include "catalog/indexing.h"
|
||||
#include "catalog/namespace.h"
|
||||
#include "commands/sequence.h"
|
||||
#include "distributed/citus_acquire_lock.h"
|
||||
#include "distributed/colocation_utils.h"
|
||||
#include "distributed/connection_management.h"
|
||||
#include "distributed/master_protocol.h"
|
||||
|
@ -470,10 +471,23 @@ master_update_node(PG_FUNCTION_ARGS)
|
|||
text *newNodeName = PG_GETARG_TEXT_P(1);
|
||||
int32 newNodePort = PG_GETARG_INT32(2);
|
||||
|
||||
/*
|
||||
* force is used when an update needs to happen regardless of conflicting locks. This
|
||||
* feature is important to force the update during a failover due to failure, eg. by
|
||||
* a highavailability system such as pg_auto_failover. The strategy is a to start a
|
||||
* background worker that actively cancels backends holding conflicting locks with
|
||||
* this backend.
|
||||
*
|
||||
* Defaults to false
|
||||
*/
|
||||
bool force = PG_GETARG_BOOL(3);
|
||||
int32 lock_cooldown = PG_GETARG_INT32(4);
|
||||
|
||||
char *newNodeNameString = text_to_cstring(newNodeName);
|
||||
WorkerNode *workerNode = NULL;
|
||||
WorkerNode *workerNodeWithSameAddress = NULL;
|
||||
List *placementList = NIL;
|
||||
BackgroundWorkerHandle *handle = NULL;
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
|
@ -518,18 +532,42 @@ master_update_node(PG_FUNCTION_ARGS)
|
|||
* - This function blocks until all previous queries have finished. This
|
||||
* means that long-running queries will prevent failover.
|
||||
*
|
||||
* In case of node failure said long-running queries will fail in the end
|
||||
* anyway as they will be unable to commit successfully on the failed
|
||||
* machine. To cause quick failure of these queries use force => true
|
||||
* during the invocation of master_update_node to terminate conflicting
|
||||
* backends proactively.
|
||||
*
|
||||
* It might be worth blocking reads to a secondary for the same reasons,
|
||||
* though we currently only query secondaries on follower clusters
|
||||
* where these locks will have no effect.
|
||||
*/
|
||||
if (WorkerNodeIsPrimary(workerNode))
|
||||
{
|
||||
/*
|
||||
* before acquiring the locks check if we want a background worker to help us to
|
||||
* aggressively obtain the locks.
|
||||
*/
|
||||
if (force)
|
||||
{
|
||||
handle = StartLockAcquireHelperBackgroundWorker(MyProcPid, lock_cooldown);
|
||||
}
|
||||
|
||||
placementList = AllShardPlacementsOnNodeGroup(workerNode->groupId);
|
||||
LockShardsInPlacementListMetadata(placementList, AccessExclusiveLock);
|
||||
}
|
||||
|
||||
UpdateNodeLocation(nodeId, newNodeNameString, newNodePort);
|
||||
|
||||
if (handle != NULL)
|
||||
{
|
||||
/*
|
||||
* this will be called on memory context cleanup as well, if the worker has been
|
||||
* terminated already this will be a noop
|
||||
*/
|
||||
TerminateBackgroundWorker(handle);
|
||||
}
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,20 @@
|
|||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* citus_acquire_lock.h
|
||||
* Background worker to help with acquiering locks by canceling competing backends.
|
||||
*
|
||||
* Copyright (c) 2019, Citus Data, Inc.
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#ifndef CITUS_ACQUIRE_LOCK_H
|
||||
#define CITUS_ACQUIRE_LOCK_H
|
||||
|
||||
|
||||
#include "postmaster/bgworker.h"
|
||||
|
||||
BackgroundWorkerHandle * StartLockAcquireHelperBackgroundWorker(int backendToHelp,
|
||||
int32 lock_cooldown);
|
||||
|
||||
#endif /* CITUS_ACQUIRE_LOCK_H */
|
|
@ -73,6 +73,10 @@ check-isolation: all tempinstall-main
|
|||
$(pg_regress_multi_check) --load-extension=citus --isolationtester \
|
||||
-- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/isolation_schedule $(EXTRA_TESTS)
|
||||
|
||||
check-isolation-base: all tempinstall-main
|
||||
$(pg_regress_multi_check) --load-extension=citus --isolationtester \
|
||||
-- $(MULTI_REGRESS_OPTS) $(EXTRA_TESTS)
|
||||
|
||||
check-vanilla: all tempinstall-main
|
||||
$(pg_regress_multi_check) --load-extension=citus --vanillatest
|
||||
|
||||
|
|
|
@ -45,16 +45,16 @@ step s3-view-worker:
|
|||
|
||||
query query_hostname query_hostport master_query_host_namemaster_query_host_portstate wait_event_typewait_event usename datname
|
||||
|
||||
SELECT worker_apply_shard_ddl_command (105941, 'public', '
|
||||
SELECT worker_apply_shard_ddl_command (105949, 'public', '
|
||||
ALTER TABLE test_table ADD COLUMN x INT;
|
||||
')localhost 57638 coordinator_host57636 idle in transactionClient ClientRead postgres regression
|
||||
SELECT worker_apply_shard_ddl_command (105940, 'public', '
|
||||
SELECT worker_apply_shard_ddl_command (105948, 'public', '
|
||||
ALTER TABLE test_table ADD COLUMN x INT;
|
||||
')localhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression
|
||||
SELECT worker_apply_shard_ddl_command (105939, 'public', '
|
||||
SELECT worker_apply_shard_ddl_command (105947, 'public', '
|
||||
ALTER TABLE test_table ADD COLUMN x INT;
|
||||
')localhost 57638 coordinator_host57636 idle in transactionClient ClientRead postgres regression
|
||||
SELECT worker_apply_shard_ddl_command (105938, 'public', '
|
||||
SELECT worker_apply_shard_ddl_command (105946, 'public', '
|
||||
ALTER TABLE test_table ADD COLUMN x INT;
|
||||
')localhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression
|
||||
SELECT gid FROM pg_prepared_xacts WHERE gid LIKE 'citus\_0\_%'localhost 57638 0 idle Client ClientRead postgres regression
|
||||
|
@ -116,7 +116,7 @@ query query_hostname query_hostport master_query_host_namemaster_query_
|
|||
|
||||
SELECT gid FROM pg_prepared_xacts WHERE gid LIKE 'citus\_0\_%'localhost 57638 0 idle Client ClientRead postgres regression
|
||||
SELECT gid FROM pg_prepared_xacts WHERE gid LIKE 'citus\_0\_%'localhost 57637 0 idle Client ClientRead postgres regression
|
||||
INSERT INTO public.test_table_105944 (column1, column2) VALUES (100, 100)localhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression
|
||||
INSERT INTO public.test_table_105952 (column1, column2) VALUES (100, 100)localhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression
|
||||
step s2-rollback:
|
||||
ROLLBACK;
|
||||
|
||||
|
@ -177,10 +177,10 @@ query query_hostname query_hostport master_query_host_namemaster_query_
|
|||
|
||||
SELECT gid FROM pg_prepared_xacts WHERE gid LIKE 'citus\_0\_%'localhost 57638 0 idle Client ClientRead postgres regression
|
||||
SELECT gid FROM pg_prepared_xacts WHERE gid LIKE 'citus\_0\_%'localhost 57637 0 idle Client ClientRead postgres regression
|
||||
COPY (SELECT count(*) AS count FROM test_table_105949 test_table WHERE true) TO STDOUTlocalhost 57638 coordinator_host57636 idle in transactionClient ClientRead postgres regression
|
||||
COPY (SELECT count(*) AS count FROM test_table_105948 test_table WHERE true) TO STDOUTlocalhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression
|
||||
COPY (SELECT count(*) AS count FROM test_table_105947 test_table WHERE true) TO STDOUTlocalhost 57638 coordinator_host57636 idle in transactionClient ClientRead postgres regression
|
||||
COPY (SELECT count(*) AS count FROM test_table_105946 test_table WHERE true) TO STDOUTlocalhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression
|
||||
COPY (SELECT count(*) AS count FROM test_table_105957 test_table WHERE true) TO STDOUTlocalhost 57638 coordinator_host57636 idle in transactionClient ClientRead postgres regression
|
||||
COPY (SELECT count(*) AS count FROM test_table_105956 test_table WHERE true) TO STDOUTlocalhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression
|
||||
COPY (SELECT count(*) AS count FROM test_table_105955 test_table WHERE true) TO STDOUTlocalhost 57638 coordinator_host57636 idle in transactionClient ClientRead postgres regression
|
||||
COPY (SELECT count(*) AS count FROM test_table_105954 test_table WHERE true) TO STDOUTlocalhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression
|
||||
step s2-rollback:
|
||||
ROLLBACK;
|
||||
|
||||
|
@ -241,7 +241,7 @@ query query_hostname query_hostport master_query_host_namemaster_query_
|
|||
|
||||
SELECT gid FROM pg_prepared_xacts WHERE gid LIKE 'citus\_0\_%'localhost 57638 0 idle Client ClientRead postgres regression
|
||||
SELECT gid FROM pg_prepared_xacts WHERE gid LIKE 'citus\_0\_%'localhost 57637 0 idle Client ClientRead postgres regression
|
||||
SELECT count(*) AS count FROM public.test_table_105951 test_table WHERE (column1 OPERATOR(pg_catalog.=) 55)localhost 57638 0 idle Client ClientRead postgres regression
|
||||
SELECT count(*) AS count FROM public.test_table_105959 test_table WHERE (column1 OPERATOR(pg_catalog.=) 55)localhost 57638 0 idle Client ClientRead postgres regression
|
||||
step s2-rollback:
|
||||
ROLLBACK;
|
||||
|
||||
|
|
|
@ -77,7 +77,7 @@ step s1-get-current-transaction-id:
|
|||
|
||||
row
|
||||
|
||||
(0,299)
|
||||
(0,305)
|
||||
step s2-get-first-worker-active-transactions:
|
||||
SELECT * FROM run_command_on_workers('SELECT row(initiator_node_identifier, transaction_number)
|
||||
FROM
|
||||
|
|
|
@ -29,11 +29,11 @@ step detector-dump-wait-edges:
|
|||
|
||||
waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
|
||||
|
||||
302 301 f
|
||||
308 307 f
|
||||
transactionnumberwaitingtransactionnumbers
|
||||
|
||||
301
|
||||
302 301
|
||||
307
|
||||
308 307
|
||||
step s1-abort:
|
||||
ABORT;
|
||||
|
||||
|
@ -77,14 +77,14 @@ step detector-dump-wait-edges:
|
|||
|
||||
waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
|
||||
|
||||
306 305 f
|
||||
307 305 f
|
||||
307 306 t
|
||||
312 311 f
|
||||
313 311 f
|
||||
313 312 t
|
||||
transactionnumberwaitingtransactionnumbers
|
||||
|
||||
305
|
||||
306 305
|
||||
307 305,306
|
||||
311
|
||||
312 311
|
||||
313 311,312
|
||||
step s1-abort:
|
||||
ABORT;
|
||||
|
||||
|
|
|
@ -0,0 +1,57 @@
|
|||
Parsed test spec with 2 sessions
|
||||
|
||||
starting permutation: s1-begin s1-insert s2-begin s2-update-node-1 s1-abort s2-abort
|
||||
create_distributed_table
|
||||
|
||||
|
||||
step s1-begin: BEGIN;
|
||||
step s1-insert: INSERT INTO t1 SELECT generate_series(1, 100);
|
||||
step s2-begin: BEGIN;
|
||||
step s2-update-node-1:
|
||||
-- update a specific node by address
|
||||
SELECT master_update_node(nodeid, 'localhost', nodeport + 10)
|
||||
FROM pg_dist_node
|
||||
WHERE nodename = 'localhost'
|
||||
AND nodeport = 57637;
|
||||
<waiting ...>
|
||||
step s1-abort: ABORT;
|
||||
step s2-update-node-1: <... completed>
|
||||
master_update_node
|
||||
|
||||
|
||||
step s2-abort: ABORT;
|
||||
master_remove_node
|
||||
|
||||
|
||||
|
||||
|
||||
starting permutation: s1-begin s1-insert s2-begin s2-update-node-1-force s2-abort s1-abort
|
||||
create_distributed_table
|
||||
|
||||
|
||||
step s1-begin: BEGIN;
|
||||
step s1-insert: INSERT INTO t1 SELECT generate_series(1, 100);
|
||||
step s2-begin: BEGIN;
|
||||
step s2-update-node-1-force:
|
||||
-- update a specific node by address (force)
|
||||
SELECT master_update_node(nodeid, 'localhost', nodeport + 10, force => true, lock_cooldown => 100)
|
||||
FROM pg_dist_node
|
||||
WHERE nodename = 'localhost'
|
||||
AND nodeport = 57637;
|
||||
<waiting ...>
|
||||
step s2-update-node-1-force: <... completed>
|
||||
master_update_node
|
||||
|
||||
|
||||
step s2-abort: ABORT;
|
||||
step s1-abort: ABORT;
|
||||
WARNING: this step had a leftover error message
|
||||
FATAL: terminating connection due to administrator command
|
||||
server closed the connection unexpectedly
|
||||
This probably means the server terminated abnormally
|
||||
before or while processing the request.
|
||||
|
||||
master_remove_node
|
||||
|
||||
|
||||
|
|
@ -0,0 +1,55 @@
|
|||
Parsed test spec with 2 sessions
|
||||
|
||||
starting permutation: s1-begin s1-insert s2-begin s2-update-node-1 s1-abort s2-abort
|
||||
create_distributed_table
|
||||
|
||||
|
||||
step s1-begin: BEGIN;
|
||||
step s1-insert: INSERT INTO t1 SELECT generate_series(1, 100);
|
||||
step s2-begin: BEGIN;
|
||||
step s2-update-node-1:
|
||||
-- update a specific node by address
|
||||
SELECT master_update_node(nodeid, 'localhost', nodeport + 10)
|
||||
FROM pg_dist_node
|
||||
WHERE nodename = 'localhost'
|
||||
AND nodeport = 57637;
|
||||
<waiting ...>
|
||||
step s1-abort: ABORT;
|
||||
step s2-update-node-1: <... completed>
|
||||
master_update_node
|
||||
|
||||
|
||||
step s2-abort: ABORT;
|
||||
master_remove_node
|
||||
|
||||
|
||||
|
||||
|
||||
starting permutation: s1-begin s1-insert s2-begin s2-update-node-1-force s2-abort s1-abort
|
||||
create_distributed_table
|
||||
|
||||
|
||||
step s1-begin: BEGIN;
|
||||
step s1-insert: INSERT INTO t1 SELECT generate_series(1, 100);
|
||||
step s2-begin: BEGIN;
|
||||
step s2-update-node-1-force:
|
||||
-- update a specific node by address (force)
|
||||
SELECT master_update_node(nodeid, 'localhost', nodeport + 10, force => true, lock_cooldown => 100)
|
||||
FROM pg_dist_node
|
||||
WHERE nodename = 'localhost'
|
||||
AND nodeport = 57637;
|
||||
<waiting ...>
|
||||
step s2-update-node-1-force: <... completed>
|
||||
master_update_node
|
||||
|
||||
|
||||
step s2-abort: ABORT;
|
||||
step s1-abort: ABORT;
|
||||
WARNING: this step had a leftover error message
|
||||
FATAL: terminating connection due to administrator command
|
||||
SSL connection has been closed unexpectedly
|
||||
|
||||
master_remove_node
|
||||
|
||||
|
||||
|
|
@ -16,7 +16,7 @@ step s1-finish:
|
|||
COMMIT;
|
||||
|
||||
step s2-insert: <... completed>
|
||||
error in steps s1-finish s2-insert: ERROR: duplicate key value violates unique constraint "test_locking_a_key_102329"
|
||||
error in steps s1-finish s2-insert: ERROR: duplicate key value violates unique constraint "test_locking_a_key_102337"
|
||||
step s2-finish:
|
||||
COMMIT;
|
||||
|
||||
|
|
|
@ -147,7 +147,7 @@ GRANT EXECUTE ON FUNCTION master_add_node(text,int,int,noderole,name) TO node_me
|
|||
GRANT EXECUTE ON FUNCTION master_add_secondary_node(text,int,text,int,name) TO node_metadata_user;
|
||||
GRANT EXECUTE ON FUNCTION master_disable_node(text,int) TO node_metadata_user;
|
||||
GRANT EXECUTE ON FUNCTION master_remove_node(text,int) TO node_metadata_user;
|
||||
GRANT EXECUTE ON FUNCTION master_update_node(int,text,int) TO node_metadata_user;
|
||||
GRANT EXECUTE ON FUNCTION master_update_node(int,text,int,bool,int) TO node_metadata_user;
|
||||
-- try to manipulate node metadata via non-super user
|
||||
SET ROLE non_super_user;
|
||||
SELECT 1 FROM master_initialize_node_metadata();
|
||||
|
|
|
@ -3,6 +3,7 @@ test: isolation_update_node
|
|||
test: isolation_update_node_lock_writes
|
||||
test: isolation_add_node_vs_reference_table_operations
|
||||
test: isolation_create_table_vs_add_remove_node
|
||||
test: isolation_master_update_node
|
||||
|
||||
# tests that change node metadata should precede
|
||||
# isolation_cluster_management such that tests
|
||||
|
|
|
@ -1,5 +1,11 @@
|
|||
setup
|
||||
{
|
||||
-- would be great if we could SET citus.next_shard_id TO 107000;
|
||||
-- unfortunately that will cause messages like
|
||||
-- ERROR: cached metadata for shard 107000 is inconsistent
|
||||
-- to show up. This test is therefore subject to change due to
|
||||
-- addition of tests or permutations prior to this test.
|
||||
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.shard_count TO 4;
|
||||
-- we don't want to see any entries related to 2PC recovery
|
||||
|
|
|
@ -0,0 +1,43 @@
|
|||
setup
|
||||
{
|
||||
SELECT 1 FROM master_add_node('localhost', 57637);
|
||||
SELECT 1 FROM master_add_node('localhost', 57638);
|
||||
|
||||
CREATE TABLE t1(a int);
|
||||
SELECT create_distributed_table('t1','a');
|
||||
}
|
||||
|
||||
teardown
|
||||
{
|
||||
DROP TABLE t1;
|
||||
|
||||
-- remove the nodes again
|
||||
SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node;
|
||||
}
|
||||
|
||||
session "s1"
|
||||
step "s1-begin" { BEGIN; }
|
||||
step "s1-insert" { INSERT INTO t1 SELECT generate_series(1, 100); }
|
||||
step "s1-verify-terminated" { -- verify the connection has been terminated }
|
||||
step "s1-abort" { ABORT; }
|
||||
|
||||
session "s2"
|
||||
step "s2-begin" { BEGIN; }
|
||||
step "s2-update-node-1" {
|
||||
-- update a specific node by address
|
||||
SELECT master_update_node(nodeid, 'localhost', nodeport + 10)
|
||||
FROM pg_dist_node
|
||||
WHERE nodename = 'localhost'
|
||||
AND nodeport = 57637;
|
||||
}
|
||||
step "s2-update-node-1-force" {
|
||||
-- update a specific node by address (force)
|
||||
SELECT master_update_node(nodeid, 'localhost', nodeport + 10, force => true, lock_cooldown => 100)
|
||||
FROM pg_dist_node
|
||||
WHERE nodename = 'localhost'
|
||||
AND nodeport = 57637;
|
||||
}
|
||||
step "s2-abort" { ABORT; }
|
||||
|
||||
permutation "s1-begin" "s1-insert" "s2-begin" "s2-update-node-1" "s1-abort" "s2-abort"
|
||||
permutation "s1-begin" "s1-insert" "s2-begin" "s2-update-node-1-force" "s2-abort" "s1-abort"
|
|
@ -65,7 +65,7 @@ GRANT EXECUTE ON FUNCTION master_add_node(text,int,int,noderole,name) TO node_me
|
|||
GRANT EXECUTE ON FUNCTION master_add_secondary_node(text,int,text,int,name) TO node_metadata_user;
|
||||
GRANT EXECUTE ON FUNCTION master_disable_node(text,int) TO node_metadata_user;
|
||||
GRANT EXECUTE ON FUNCTION master_remove_node(text,int) TO node_metadata_user;
|
||||
GRANT EXECUTE ON FUNCTION master_update_node(int,text,int) TO node_metadata_user;
|
||||
GRANT EXECUTE ON FUNCTION master_update_node(int,text,int,bool,int) TO node_metadata_user;
|
||||
|
||||
-- try to manipulate node metadata via non-super user
|
||||
SET ROLE non_super_user;
|
||||
|
|
Loading…
Reference in New Issue