diff --git a/src/backend/distributed/citus--8.2-2--8.2-3.sql b/src/backend/distributed/citus--8.2-2--8.2-3.sql new file mode 100644 index 000000000..55bd730aa --- /dev/null +++ b/src/backend/distributed/citus--8.2-2--8.2-3.sql @@ -0,0 +1,27 @@ +/* citus--8.2-2--8.2-3 */ + +SET search_path = 'pg_catalog'; + +DROP FUNCTION master_update_node(node_id int, + new_node_name text, + new_node_port int); + +CREATE OR REPLACE FUNCTION master_update_node(node_id int, + new_node_name text, + new_node_port int, + force bool DEFAULT false, + lock_cooldown int DEFAULT 10000) + RETURNS void + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$master_update_node$$; + +COMMENT ON FUNCTION master_update_node(node_id int, + new_node_name text, + new_node_port int, + force bool, + lock_cooldown int) + IS 'change the location of a node. when force => true it will wait lock_cooldown ms before killing competing locks'; + +REVOKE ALL ON FUNCTION master_update_node(int,text,int,bool,int) FROM PUBLIC; + +RESET search_path; diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index 643009f15..8570537f0 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '8.2-2' +default_version = '8.2-3' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/utils/acquire_lock.c b/src/backend/distributed/utils/acquire_lock.c new file mode 100644 index 000000000..59ea9fd20 --- /dev/null +++ b/src/backend/distributed/utils/acquire_lock.c @@ -0,0 +1,330 @@ +/*------------------------------------------------------------------------- + * + * acquire_lock.c + * A dynamic background worker that can help your backend to acquire its locks. This is + * an intrusive way of getting your way. The primary use of this will be to allow + * master_update_node to make progress during failure. When the system cannot possibly + * finish a transaction due to the host required to finish the transaction has failed + * it might be better to actively cancel the backend instead of waiting for it to fail. + * + * This file provides infrastructure for launching exactly one a background + * worker for every database in which citus is used. That background worker + * can then perform work like deadlock detection, prepared transaction + * recovery, and cleanup. + * + * Copyright (c) 2019, Citus Data, Inc. 
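The migration at the top of this patch replaces the three-argument master_update_node with a five-argument version that adds optional force and lock_cooldown (milliseconds) parameters. A minimal usage sketch, assuming the extension has already been upgraded to the new default_version from citus.control, and with node id 3 / localhost / 5432 standing in for a real pg_dist_node entry:

    -- pick up the new version added by this patch
    ALTER EXTENSION citus UPDATE TO '8.2-3';

    -- default behaviour: wait for conflicting locks, exactly as before
    SELECT master_update_node(3, 'localhost', 5432);

    -- forced failover: after 5000 ms, terminate backends holding conflicting locks
    SELECT master_update_node(3, 'localhost', 5432, force => true, lock_cooldown => 5000);

The named-argument form mirrors the invocation used in the new isolation test further down in this patch.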
+ * + *------------------------------------------------------------------------- + */ + + +#include + +#include "postgres.h" + + +#include "access/xact.h" +#include "catalog/pg_type.h" +#include "executor/spi.h" +#include "miscadmin.h" +#include "pgstat.h" +#include "storage/ipc.h" +#include "storage/latch.h" +#include "utils/snapmgr.h" + +#include "distributed/citus_acquire_lock.h" +#include "distributed/version_compat.h" + +/* forward declaration of background worker entrypoint */ +extern void LockAcquireHelperMain(Datum main_arg); + +/* forward declaration of helper functions */ +static void lock_acquire_helper_sigterm(SIGNAL_ARGS); +static void EnsureStopLockAcquireHelper(void *arg); +static long DeadlineTimestampTzToTimeout(TimestampTz deadline); + +/* LockAcquireHelperArgs contains extra arguments to be used to start the worker */ +typedef struct LockAcquireHelperArgs +{ + Oid DatabaseId; + int32 lock_cooldown; +} LockAcquireHelperArgs; + +static bool got_sigterm = false; + + +/* + * StartLockAcquireHelperBackgroundWorker creates a background worker that will help the + * backend passed in as an argument to complete. The worker that is started will be + * terminated once the current memory context gets reset, to make sure it is cleaned up in + * all situations. It is however advised to call TerminateBackgroundWorker on the handle + * returned on the first possible moment the help is no longer required. + */ +BackgroundWorkerHandle * +StartLockAcquireHelperBackgroundWorker(int backendToHelp, int32 lock_cooldown) +{ + BackgroundWorkerHandle *handle = NULL; + LockAcquireHelperArgs args; + BackgroundWorker worker; + MemoryContextCallback *workerCleanup = NULL; + memset(&args, 0, sizeof(args)); + memset(&worker, 0, sizeof(worker)); + + /* collect the extra arguments required for the background worker */ + args.DatabaseId = MyDatabaseId; + args.lock_cooldown = lock_cooldown; + + /* construct the background worker and start it */ + snprintf(worker.bgw_name, BGW_MAXLEN, + "Citus Lock Acquire Helper: %d/%u", + backendToHelp, MyDatabaseId); +#if PG_VERSION_NUM >= 110000 + snprintf(worker.bgw_type, BGW_MAXLEN, "citus_lock_aqcuire"); +#endif + + worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; + worker.bgw_start_time = BgWorkerStart_RecoveryFinished; + worker.bgw_restart_time = BGW_NEVER_RESTART; + + snprintf(worker.bgw_library_name, BGW_MAXLEN, "citus"); + snprintf(worker.bgw_function_name, BGW_MAXLEN, "LockAcquireHelperMain"); + worker.bgw_main_arg = Int32GetDatum(backendToHelp); + worker.bgw_notify_pid = 0; + + /* + * we check if args fits in bgw_extra to make sure it is safe to copy the data. Once + * we exceed the size of data to copy this way we need to look into a different way of + * passing the arguments to the worker. + */ + Assert(sizeof(worker.bgw_extra) >= sizeof(args)); + memcpy(worker.bgw_extra, &args, sizeof(args)); + + if (!RegisterDynamicBackgroundWorker(&worker, &handle)) + { + ereport(ERROR, (errmsg("could not start lock acquiring background worker to " + "force the update"), + errhint("Increasing max_worker_processes might help."))); + } + + workerCleanup = palloc0(sizeof(MemoryContextCallback)); + workerCleanup->func = EnsureStopLockAcquireHelper; + workerCleanup->arg = handle; + + MemoryContextRegisterResetCallback(CurrentMemoryContext, workerCleanup); + + return handle; +} + + +/* + * EnsureStopLockAcquireHelper is designed to be called as a MemoryContextCallback. It + * takes a handle to the background worker and Terminates it. 
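The cleanup mechanism used by StartLockAcquireHelperBackgroundWorker above deserves a closer look: the worker's lifetime is tied to the caller's memory context, so it is terminated even when the backend errors out before reaching its explicit TerminateBackgroundWorker call. A minimal sketch of that pattern, not taken verbatim from the patch (the helper names here are illustrative):

    #include "postgres.h"

    #include "postmaster/bgworker.h"

    /* invoked when the registered memory context is reset or deleted */
    static void
    terminate_worker_on_reset(void *arg)
    {
        /* safe to call even if the worker has already exited on its own */
        TerminateBackgroundWorker((BackgroundWorkerHandle *) arg);
    }

    static void
    TieWorkerToCurrentContext(BackgroundWorkerHandle *handle)
    {
        /* the callback struct must live in the context it is registered with */
        MemoryContextCallback *workerCleanup = palloc0(sizeof(MemoryContextCallback));

        workerCleanup->func = terminate_worker_on_reset;
        workerCleanup->arg = handle;

        MemoryContextRegisterResetCallback(CurrentMemoryContext, workerCleanup);
    }
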
It is safe to be called on a + * handle that has already been terminated due to the guard around the generation number + * implemented in the handle by postgres. + */ +static void +EnsureStopLockAcquireHelper(void *arg) +{ + BackgroundWorkerHandle *handle = (BackgroundWorkerHandle *) arg; + TerminateBackgroundWorker(handle); +} + + +/* + * Signal handler for SIGTERM + * Set a flag to let the main loop to terminate, and set our latch to wake + * it up. + */ +static void +lock_acquire_helper_sigterm(SIGNAL_ARGS) +{ + int save_errno = errno; + + got_sigterm = true; + SetLatch(MyLatch); + + errno = save_errno; +} + + +/* + * ShouldAcquireLock tests if our backend should still proceed with acquiring the lock, + * and thus keep terminating conflicting backends. This function returns true until a + * SIGTERM, background worker termination signal, has been received. + * + * The function blocks for at most sleepms when called. During operation without being + * terminated this is the time between invocations to the backend termination logic. + */ +static bool +ShouldAcquireLock(long sleepms) +{ + int rc; + + /* early escape in case we already got the signal to stop acquiring the lock */ + if (got_sigterm) + { + return false; + } + + rc = WaitLatch(MyLatch, WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + sleepms * 1L, PG_WAIT_EXTENSION); + ResetLatch(MyLatch); + + /* emergency bailout if postmaster has died */ + if (rc & WL_POSTMASTER_DEATH) + { + proc_exit(1); + } + + CHECK_FOR_INTERRUPTS(); + + return !got_sigterm; +} + + +/* + * LockAcquireHelperMain runs in a dynamic background worker to help master_update_node to + * acquire its locks. + */ +void +LockAcquireHelperMain(Datum main_arg) +{ + int backendPid = DatumGetInt32(main_arg); + StringInfoData sql; + LockAcquireHelperArgs *args = (LockAcquireHelperArgs *) MyBgworkerEntry->bgw_extra; + long timeout = 0; + const TimestampTz connectionStart = GetCurrentTimestamp(); + const TimestampTz deadline = TimestampTzPlusMilliseconds(connectionStart, + args->lock_cooldown); + + /* parameters for sql query to be executed */ + const int paramCount = 1; + Oid paramTypes[1] = { INT4OID }; + Datum paramValues[1]; + + pqsignal(SIGTERM, lock_acquire_helper_sigterm); + + BackgroundWorkerUnblockSignals(); + + elog(LOG, "lock acquiring backend started for backend %d (cooldown %dms)", backendPid, + args->lock_cooldown); + + /* + * this loop waits till the deadline is reached (eg. lock_cooldown has passed) OR we + * no longer need to acquire the lock due to the termination of this backend. + * Only after the timeout the code will continue with the section that will acquire + * the lock. + */ + do { + timeout = DeadlineTimestampTzToTimeout(deadline); + } while (timeout > 0 && ShouldAcquireLock(timeout)); + + /* connecting to the database */ + BackgroundWorkerInitializeConnectionByOid(args->DatabaseId, InvalidOid, 0); + + /* + * The query below does a self join on pg_locks to find backends that are granted a + * lock our target backend (backendPid) is waiting for. Once it found such a backend + * it terminates the backend with pg_terminate_pid. + * + * The result is are rows of pid,bool indicating backend that is terminated and the + * success of the termination. These will be logged accordingly below for an + * administrator to correlate in the logs with the termination message. 
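Run by hand, the same pg_locks self-join can be used to preview which backends a forced update would terminate before actually forcing it. A hedged, inspection-only sketch, with 12345 standing in for the pid of the backend that is waiting on the lock, and pg_terminate_backend deliberately left out so the query only lists the conflicting pids:

    SELECT DISTINCT conflicting.pid
      FROM pg_locks AS blocked
      JOIN pg_locks AS conflicting
        ON conflicting.database = blocked.database
       AND conflicting.objid = blocked.objid
     WHERE conflicting.granted
       AND NOT blocked.granted
       AND blocked.pid = 12345;
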
+ */ + initStringInfo(&sql); + appendStringInfo(&sql, + "SELECT \n" + " DISTINCT conflicting.pid,\n" + " pg_terminate_backend(conflicting.pid)\n" + " FROM pg_locks AS blocked\n" + " JOIN pg_locks AS conflicting\n" + " ON (conflicting.database = blocked.database\n" + " AND conflicting.objid = blocked.objid)\n" + " WHERE conflicting.granted = true\n" + " AND blocked.granted = false\n" + " AND blocked.pid = $1;"); + paramValues[0] = Int32GetDatum(backendPid); + + while (ShouldAcquireLock(100)) + { + int row = 0; + int spiStatus = 0; + + elog(LOG, "canceling competing backends for backend %d", backendPid); + + /* + * Begin our transaction + */ + SetCurrentStatementStartTimestamp(); + StartTransactionCommand(); + SPI_connect(); + PushActiveSnapshot(GetTransactionSnapshot()); + pgstat_report_activity(STATE_RUNNING, sql.data); + + spiStatus = SPI_execute_with_args(sql.data, paramCount, paramTypes, paramValues, + NULL, false, 0); + + if (spiStatus == SPI_OK_SELECT) + { + for (row = 0; row < SPI_processed; row++) + { + int terminatedPid = 0; + bool isTerminated = false; + bool isnull = false; + + terminatedPid = DatumGetInt32(SPI_getbinval(SPI_tuptable->vals[0], + SPI_tuptable->tupdesc, + 1, &isnull)); + + isTerminated = DatumGetBool(SPI_getbinval(SPI_tuptable->vals[0], + SPI_tuptable->tupdesc, + 2, &isnull)); + + if (isTerminated) + { + elog(WARNING, "terminated conflicting backend %d", terminatedPid); + } + else + { + elog(INFO, + "attempt to terminate conflicting backend %d was unsuccessful", + terminatedPid); + } + } + } + else + { + elog(FATAL, "cannot cancel competing backends for backend %d", backendPid); + } + + /* + * And finish our transaction. + */ + SPI_finish(); + PopActiveSnapshot(); + CommitTransactionCommand(); + pgstat_report_stat(false); + pgstat_report_activity(STATE_IDLE, NULL); + } + + + elog(LOG, "lock acquiring backend finished for backend %d", backendPid); + + /* safely got to the end, exit without problem */ + proc_exit(0); +} + + +/* + * DeadlineTimestampTzToTimeout returns the numer of miliseconds that still need to elapse + * before the deadline provided as an argument will be reached. The outcome can be used to + * pass to the Wait of an EventSet to make sure it returns after the timeout has passed. + */ +static long +DeadlineTimestampTzToTimeout(TimestampTz deadline) +{ + long secs = 0; + int msecs = 0; + TimestampDifference(GetCurrentTimestamp(), deadline, &secs, &msecs); + return secs * 1000 + msecs / 1000; +} diff --git a/src/backend/distributed/utils/node_metadata.c b/src/backend/distributed/utils/node_metadata.c index 435bb0f19..090a57ebc 100644 --- a/src/backend/distributed/utils/node_metadata.c +++ b/src/backend/distributed/utils/node_metadata.c @@ -20,6 +20,7 @@ #include "catalog/indexing.h" #include "catalog/namespace.h" #include "commands/sequence.h" +#include "distributed/citus_acquire_lock.h" #include "distributed/colocation_utils.h" #include "distributed/connection_management.h" #include "distributed/master_protocol.h" @@ -470,10 +471,23 @@ master_update_node(PG_FUNCTION_ARGS) text *newNodeName = PG_GETARG_TEXT_P(1); int32 newNodePort = PG_GETARG_INT32(2); + /* + * force is used when an update needs to happen regardless of conflicting locks. This + * feature is important to force the update during a failover due to failure, eg. by + * a highavailability system such as pg_auto_failover. The strategy is a to start a + * background worker that actively cancels backends holding conflicting locks with + * this backend. 
+ * + * Defaults to false + */ + bool force = PG_GETARG_BOOL(3); + int32 lock_cooldown = PG_GETARG_INT32(4); + char *newNodeNameString = text_to_cstring(newNodeName); WorkerNode *workerNode = NULL; WorkerNode *workerNodeWithSameAddress = NULL; List *placementList = NIL; + BackgroundWorkerHandle *handle = NULL; CheckCitusVersion(ERROR); @@ -518,18 +532,42 @@ master_update_node(PG_FUNCTION_ARGS) * - This function blocks until all previous queries have finished. This * means that long-running queries will prevent failover. * + * In case of node failure said long-running queries will fail in the end + * anyway as they will be unable to commit successfully on the failed + * machine. To cause quick failure of these queries use force => true + * during the invocation of master_update_node to terminate conflicting + * backends proactively. + * * It might be worth blocking reads to a secondary for the same reasons, * though we currently only query secondaries on follower clusters * where these locks will have no effect. */ if (WorkerNodeIsPrimary(workerNode)) { + /* + * before acquiring the locks check if we want a background worker to help us to + * aggressively obtain the locks. + */ + if (force) + { + handle = StartLockAcquireHelperBackgroundWorker(MyProcPid, lock_cooldown); + } + placementList = AllShardPlacementsOnNodeGroup(workerNode->groupId); LockShardsInPlacementListMetadata(placementList, AccessExclusiveLock); } UpdateNodeLocation(nodeId, newNodeNameString, newNodePort); + if (handle != NULL) + { + /* + * this will be called on memory context cleanup as well, if the worker has been + * terminated already this will be a noop + */ + TerminateBackgroundWorker(handle); + } + PG_RETURN_VOID(); } diff --git a/src/include/distributed/citus_acquire_lock.h b/src/include/distributed/citus_acquire_lock.h new file mode 100644 index 000000000..f19a5d8d0 --- /dev/null +++ b/src/include/distributed/citus_acquire_lock.h @@ -0,0 +1,20 @@ +/*------------------------------------------------------------------------- + * + * citus_acquire_lock.h + * Background worker to help with acquiering locks by canceling competing backends. + * + * Copyright (c) 2019, Citus Data, Inc. 
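Because the new citus_acquire_lock.h header exposes StartLockAcquireHelperBackgroundWorker, other call sites could adopt the same pattern master_update_node follows above. An illustrative wrapper, not part of the patch (the function name and the elided locking step are placeholders):

    #include "postgres.h"

    #include "miscadmin.h"                      /* MyProcPid */
    #include "postmaster/bgworker.h"

    #include "distributed/citus_acquire_lock.h"

    static void
    UpdateNodeWithOptionalForce(bool force, int32 lockCooldown)
    {
        BackgroundWorkerHandle *handle = NULL;

        if (force)
        {
            /* helper starts terminating conflicting backends after lockCooldown ms */
            handle = StartLockAcquireHelperBackgroundWorker(MyProcPid, lockCooldown);
        }

        /* ... acquire the shard placement locks and update pg_dist_node here ... */

        if (handle != NULL)
        {
            /* also runs via the memory context reset callback; a second call is harmless */
            TerminateBackgroundWorker(handle);
        }
    }
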
+ * + *------------------------------------------------------------------------- + */ + +#ifndef CITUS_ACQUIRE_LOCK_H +#define CITUS_ACQUIRE_LOCK_H + + +#include "postmaster/bgworker.h" + +BackgroundWorkerHandle * StartLockAcquireHelperBackgroundWorker(int backendToHelp, + int32 lock_cooldown); + +#endif /* CITUS_ACQUIRE_LOCK_H */ diff --git a/src/test/regress/Makefile b/src/test/regress/Makefile index b046e496d..3de412d8d 100644 --- a/src/test/regress/Makefile +++ b/src/test/regress/Makefile @@ -73,6 +73,10 @@ check-isolation: all tempinstall-main $(pg_regress_multi_check) --load-extension=citus --isolationtester \ -- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/isolation_schedule $(EXTRA_TESTS) +check-isolation-base: all tempinstall-main + $(pg_regress_multi_check) --load-extension=citus --isolationtester \ + -- $(MULTI_REGRESS_OPTS) $(EXTRA_TESTS) + check-vanilla: all tempinstall-main $(pg_regress_multi_check) --load-extension=citus --vanillatest diff --git a/src/test/regress/expected/isolation_citus_dist_activity.out b/src/test/regress/expected/isolation_citus_dist_activity.out index 251de2973..46617f54a 100644 --- a/src/test/regress/expected/isolation_citus_dist_activity.out +++ b/src/test/regress/expected/isolation_citus_dist_activity.out @@ -45,16 +45,16 @@ step s3-view-worker: query query_hostname query_hostport master_query_host_namemaster_query_host_portstate wait_event_typewait_event usename datname -SELECT worker_apply_shard_ddl_command (105941, 'public', ' +SELECT worker_apply_shard_ddl_command (105949, 'public', ' ALTER TABLE test_table ADD COLUMN x INT; ')localhost 57638 coordinator_host57636 idle in transactionClient ClientRead postgres regression -SELECT worker_apply_shard_ddl_command (105940, 'public', ' +SELECT worker_apply_shard_ddl_command (105948, 'public', ' ALTER TABLE test_table ADD COLUMN x INT; ')localhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression -SELECT worker_apply_shard_ddl_command (105939, 'public', ' +SELECT worker_apply_shard_ddl_command (105947, 'public', ' ALTER TABLE test_table ADD COLUMN x INT; ')localhost 57638 coordinator_host57636 idle in transactionClient ClientRead postgres regression -SELECT worker_apply_shard_ddl_command (105938, 'public', ' +SELECT worker_apply_shard_ddl_command (105946, 'public', ' ALTER TABLE test_table ADD COLUMN x INT; ')localhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression SELECT gid FROM pg_prepared_xacts WHERE gid LIKE 'citus\_0\_%'localhost 57638 0 idle Client ClientRead postgres regression @@ -116,7 +116,7 @@ query query_hostname query_hostport master_query_host_namemaster_query_ SELECT gid FROM pg_prepared_xacts WHERE gid LIKE 'citus\_0\_%'localhost 57638 0 idle Client ClientRead postgres regression SELECT gid FROM pg_prepared_xacts WHERE gid LIKE 'citus\_0\_%'localhost 57637 0 idle Client ClientRead postgres regression -INSERT INTO public.test_table_105944 (column1, column2) VALUES (100, 100)localhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression +INSERT INTO public.test_table_105952 (column1, column2) VALUES (100, 100)localhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression step s2-rollback: ROLLBACK; @@ -177,10 +177,10 @@ query query_hostname query_hostport master_query_host_namemaster_query_ SELECT gid FROM pg_prepared_xacts WHERE gid LIKE 'citus\_0\_%'localhost 57638 0 idle Client ClientRead postgres regression SELECT gid FROM pg_prepared_xacts 
WHERE gid LIKE 'citus\_0\_%'localhost 57637 0 idle Client ClientRead postgres regression -COPY (SELECT count(*) AS count FROM test_table_105949 test_table WHERE true) TO STDOUTlocalhost 57638 coordinator_host57636 idle in transactionClient ClientRead postgres regression -COPY (SELECT count(*) AS count FROM test_table_105948 test_table WHERE true) TO STDOUTlocalhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression -COPY (SELECT count(*) AS count FROM test_table_105947 test_table WHERE true) TO STDOUTlocalhost 57638 coordinator_host57636 idle in transactionClient ClientRead postgres regression -COPY (SELECT count(*) AS count FROM test_table_105946 test_table WHERE true) TO STDOUTlocalhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression +COPY (SELECT count(*) AS count FROM test_table_105957 test_table WHERE true) TO STDOUTlocalhost 57638 coordinator_host57636 idle in transactionClient ClientRead postgres regression +COPY (SELECT count(*) AS count FROM test_table_105956 test_table WHERE true) TO STDOUTlocalhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression +COPY (SELECT count(*) AS count FROM test_table_105955 test_table WHERE true) TO STDOUTlocalhost 57638 coordinator_host57636 idle in transactionClient ClientRead postgres regression +COPY (SELECT count(*) AS count FROM test_table_105954 test_table WHERE true) TO STDOUTlocalhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression step s2-rollback: ROLLBACK; @@ -241,7 +241,7 @@ query query_hostname query_hostport master_query_host_namemaster_query_ SELECT gid FROM pg_prepared_xacts WHERE gid LIKE 'citus\_0\_%'localhost 57638 0 idle Client ClientRead postgres regression SELECT gid FROM pg_prepared_xacts WHERE gid LIKE 'citus\_0\_%'localhost 57637 0 idle Client ClientRead postgres regression -SELECT count(*) AS count FROM public.test_table_105951 test_table WHERE (column1 OPERATOR(pg_catalog.=) 55)localhost 57638 0 idle Client ClientRead postgres regression +SELECT count(*) AS count FROM public.test_table_105959 test_table WHERE (column1 OPERATOR(pg_catalog.=) 55)localhost 57638 0 idle Client ClientRead postgres regression step s2-rollback: ROLLBACK; diff --git a/src/test/regress/expected/isolation_distributed_transaction_id.out b/src/test/regress/expected/isolation_distributed_transaction_id.out index f60650315..211fc10fa 100644 --- a/src/test/regress/expected/isolation_distributed_transaction_id.out +++ b/src/test/regress/expected/isolation_distributed_transaction_id.out @@ -77,7 +77,7 @@ step s1-get-current-transaction-id: row -(0,299) +(0,305) step s2-get-first-worker-active-transactions: SELECT * FROM run_command_on_workers('SELECT row(initiator_node_identifier, transaction_number) FROM diff --git a/src/test/regress/expected/isolation_dump_global_wait_edges.out b/src/test/regress/expected/isolation_dump_global_wait_edges.out index 3361dec60..9d3d7c2f5 100644 --- a/src/test/regress/expected/isolation_dump_global_wait_edges.out +++ b/src/test/regress/expected/isolation_dump_global_wait_edges.out @@ -29,11 +29,11 @@ step detector-dump-wait-edges: waiting_transaction_numblocking_transaction_numblocking_transaction_waiting -302 301 f +308 307 f transactionnumberwaitingtransactionnumbers -301 -302 301 +307 +308 307 step s1-abort: ABORT; @@ -77,14 +77,14 @@ step detector-dump-wait-edges: waiting_transaction_numblocking_transaction_numblocking_transaction_waiting -306 305 f -307 305 f -307 306 t +312 311 f 
+313 311 f +313 312 t transactionnumberwaitingtransactionnumbers -305 -306 305 -307 305,306 +311 +312 311 +313 311,312 step s1-abort: ABORT; diff --git a/src/test/regress/expected/isolation_master_update_node.out b/src/test/regress/expected/isolation_master_update_node.out new file mode 100644 index 000000000..09d9bf781 --- /dev/null +++ b/src/test/regress/expected/isolation_master_update_node.out @@ -0,0 +1,57 @@ +Parsed test spec with 2 sessions + +starting permutation: s1-begin s1-insert s2-begin s2-update-node-1 s1-abort s2-abort +create_distributed_table + + +step s1-begin: BEGIN; +step s1-insert: INSERT INTO t1 SELECT generate_series(1, 100); +step s2-begin: BEGIN; +step s2-update-node-1: + -- update a specific node by address + SELECT master_update_node(nodeid, 'localhost', nodeport + 10) + FROM pg_dist_node + WHERE nodename = 'localhost' + AND nodeport = 57637; + +step s1-abort: ABORT; +step s2-update-node-1: <... completed> +master_update_node + + +step s2-abort: ABORT; +master_remove_node + + + + +starting permutation: s1-begin s1-insert s2-begin s2-update-node-1-force s2-abort s1-abort +create_distributed_table + + +step s1-begin: BEGIN; +step s1-insert: INSERT INTO t1 SELECT generate_series(1, 100); +step s2-begin: BEGIN; +step s2-update-node-1-force: + -- update a specific node by address (force) + SELECT master_update_node(nodeid, 'localhost', nodeport + 10, force => true, lock_cooldown => 100) + FROM pg_dist_node + WHERE nodename = 'localhost' + AND nodeport = 57637; + +step s2-update-node-1-force: <... completed> +master_update_node + + +step s2-abort: ABORT; +step s1-abort: ABORT; +WARNING: this step had a leftover error message +FATAL: terminating connection due to administrator command +server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. + +master_remove_node + + + diff --git a/src/test/regress/expected/isolation_master_update_node_0.out b/src/test/regress/expected/isolation_master_update_node_0.out new file mode 100644 index 000000000..eb450d715 --- /dev/null +++ b/src/test/regress/expected/isolation_master_update_node_0.out @@ -0,0 +1,55 @@ +Parsed test spec with 2 sessions + +starting permutation: s1-begin s1-insert s2-begin s2-update-node-1 s1-abort s2-abort +create_distributed_table + + +step s1-begin: BEGIN; +step s1-insert: INSERT INTO t1 SELECT generate_series(1, 100); +step s2-begin: BEGIN; +step s2-update-node-1: + -- update a specific node by address + SELECT master_update_node(nodeid, 'localhost', nodeport + 10) + FROM pg_dist_node + WHERE nodename = 'localhost' + AND nodeport = 57637; + +step s1-abort: ABORT; +step s2-update-node-1: <... completed> +master_update_node + + +step s2-abort: ABORT; +master_remove_node + + + + +starting permutation: s1-begin s1-insert s2-begin s2-update-node-1-force s2-abort s1-abort +create_distributed_table + + +step s1-begin: BEGIN; +step s1-insert: INSERT INTO t1 SELECT generate_series(1, 100); +step s2-begin: BEGIN; +step s2-update-node-1-force: + -- update a specific node by address (force) + SELECT master_update_node(nodeid, 'localhost', nodeport + 10, force => true, lock_cooldown => 100) + FROM pg_dist_node + WHERE nodename = 'localhost' + AND nodeport = 57637; + +step s2-update-node-1-force: <... 
completed> +master_update_node + + +step s2-abort: ABORT; +step s1-abort: ABORT; +WARNING: this step had a leftover error message +FATAL: terminating connection due to administrator command +SSL connection has been closed unexpectedly + +master_remove_node + + + diff --git a/src/test/regress/expected/isolation_replace_wait_function.out b/src/test/regress/expected/isolation_replace_wait_function.out index 60a142d11..ac2c36096 100644 --- a/src/test/regress/expected/isolation_replace_wait_function.out +++ b/src/test/regress/expected/isolation_replace_wait_function.out @@ -16,7 +16,7 @@ step s1-finish: COMMIT; step s2-insert: <... completed> -error in steps s1-finish s2-insert: ERROR: duplicate key value violates unique constraint "test_locking_a_key_102329" +error in steps s1-finish s2-insert: ERROR: duplicate key value violates unique constraint "test_locking_a_key_102337" step s2-finish: COMMIT; diff --git a/src/test/regress/expected/multi_cluster_management.out b/src/test/regress/expected/multi_cluster_management.out index 98d413f85..e5f1d6778 100644 --- a/src/test/regress/expected/multi_cluster_management.out +++ b/src/test/regress/expected/multi_cluster_management.out @@ -147,7 +147,7 @@ GRANT EXECUTE ON FUNCTION master_add_node(text,int,int,noderole,name) TO node_me GRANT EXECUTE ON FUNCTION master_add_secondary_node(text,int,text,int,name) TO node_metadata_user; GRANT EXECUTE ON FUNCTION master_disable_node(text,int) TO node_metadata_user; GRANT EXECUTE ON FUNCTION master_remove_node(text,int) TO node_metadata_user; -GRANT EXECUTE ON FUNCTION master_update_node(int,text,int) TO node_metadata_user; +GRANT EXECUTE ON FUNCTION master_update_node(int,text,int,bool,int) TO node_metadata_user; -- try to manipulate node metadata via non-super user SET ROLE non_super_user; SELECT 1 FROM master_initialize_node_metadata(); diff --git a/src/test/regress/isolation_schedule b/src/test/regress/isolation_schedule index a457c145d..85870b9c8 100644 --- a/src/test/regress/isolation_schedule +++ b/src/test/regress/isolation_schedule @@ -3,6 +3,7 @@ test: isolation_update_node test: isolation_update_node_lock_writes test: isolation_add_node_vs_reference_table_operations test: isolation_create_table_vs_add_remove_node +test: isolation_master_update_node # tests that change node metadata should precede # isolation_cluster_management such that tests diff --git a/src/test/regress/specs/isolation_citus_dist_activity.spec b/src/test/regress/specs/isolation_citus_dist_activity.spec index c50d46582..79edf2584 100644 --- a/src/test/regress/specs/isolation_citus_dist_activity.spec +++ b/src/test/regress/specs/isolation_citus_dist_activity.spec @@ -1,5 +1,11 @@ setup { + -- would be great if we could SET citus.next_shard_id TO 107000; + -- unfortunately that will cause messages like + -- ERROR: cached metadata for shard 107000 is inconsistent + -- to show up. This test is therefore subject to change due to + -- addition of tests or permutations prior to this test. 
+ SET citus.shard_replication_factor TO 1; SET citus.shard_count TO 4; -- we don't want to see any entries related to 2PC recovery diff --git a/src/test/regress/specs/isolation_master_update_node.spec b/src/test/regress/specs/isolation_master_update_node.spec new file mode 100644 index 000000000..a7fdf3652 --- /dev/null +++ b/src/test/regress/specs/isolation_master_update_node.spec @@ -0,0 +1,43 @@ +setup +{ + SELECT 1 FROM master_add_node('localhost', 57637); + SELECT 1 FROM master_add_node('localhost', 57638); + + CREATE TABLE t1(a int); + SELECT create_distributed_table('t1','a'); +} + +teardown +{ + DROP TABLE t1; + + -- remove the nodes again + SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node; +} + +session "s1" +step "s1-begin" { BEGIN; } +step "s1-insert" { INSERT INTO t1 SELECT generate_series(1, 100); } +step "s1-verify-terminated" { -- verify the connection has been terminated } +step "s1-abort" { ABORT; } + +session "s2" +step "s2-begin" { BEGIN; } +step "s2-update-node-1" { + -- update a specific node by address + SELECT master_update_node(nodeid, 'localhost', nodeport + 10) + FROM pg_dist_node + WHERE nodename = 'localhost' + AND nodeport = 57637; +} +step "s2-update-node-1-force" { + -- update a specific node by address (force) + SELECT master_update_node(nodeid, 'localhost', nodeport + 10, force => true, lock_cooldown => 100) + FROM pg_dist_node + WHERE nodename = 'localhost' + AND nodeport = 57637; +} +step "s2-abort" { ABORT; } + +permutation "s1-begin" "s1-insert" "s2-begin" "s2-update-node-1" "s1-abort" "s2-abort" +permutation "s1-begin" "s1-insert" "s2-begin" "s2-update-node-1-force" "s2-abort" "s1-abort" diff --git a/src/test/regress/sql/multi_cluster_management.sql b/src/test/regress/sql/multi_cluster_management.sql index 01d43c839..77a54707d 100644 --- a/src/test/regress/sql/multi_cluster_management.sql +++ b/src/test/regress/sql/multi_cluster_management.sql @@ -65,7 +65,7 @@ GRANT EXECUTE ON FUNCTION master_add_node(text,int,int,noderole,name) TO node_me GRANT EXECUTE ON FUNCTION master_add_secondary_node(text,int,text,int,name) TO node_metadata_user; GRANT EXECUTE ON FUNCTION master_disable_node(text,int) TO node_metadata_user; GRANT EXECUTE ON FUNCTION master_remove_node(text,int) TO node_metadata_user; -GRANT EXECUTE ON FUNCTION master_update_node(int,text,int) TO node_metadata_user; +GRANT EXECUTE ON FUNCTION master_update_node(int,text,int,bool,int) TO node_metadata_user; -- try to manipulate node metadata via non-super user SET ROLE non_super_user;
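
The new check-isolation-base Makefile target runs the isolation tester without the fixed schedule, which is convenient when iterating on a single spec such as the one added here. A hedged example invocation, assuming EXTRA_TESTS is passed the spec name the same way the existing targets consume it and that make is run from the repository root:

    make -C src/test/regress check-isolation-base EXTRA_TESTS=isolation_master_update_node

The two expected output files for the new test differ only in the client-side message printed when the terminated connection drops (plain TCP versus SSL), which is why both isolation_master_update_node.out and isolation_master_update_node_0.out are added.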