From d1f005daaced71eb22a1afc30118b8c154e53c3c Mon Sep 17 00:00:00 2001 From: velioglu Date: Tue, 4 Sep 2018 09:18:39 +0300 Subject: [PATCH] Adds UDFs for testing MX functionalities with isolation tests --- Makefile | 1 + citus.control | 2 +- src/backend/distributed/Makefile | 4 +- .../distributed/citus--8.0-5--8.0-6.sql | 84 +++ src/backend/distributed/citus.control | 2 +- .../connection/connection_management.c | 28 +- src/backend/distributed/shared_library_init.c | 22 + .../test/run_from_same_connection.c | 214 ++++++ .../distributed/transaction/backend_data.c | 217 +++++- src/include/distributed/lock_graph.h | 5 + .../distributed/run_from_same_connection.h | 23 + .../expected/isolation_reference_on_mx.out | 616 ++++++++++++++++++ src/test/regress/expected/multi_extension.out | 1 + src/test/regress/isolation_schedule | 3 + .../specs/isolation_reference_on_mx.spec | 160 +++++ src/test/regress/sql/multi_extension.sql | 1 + 16 files changed, 1354 insertions(+), 29 deletions(-) create mode 100644 src/backend/distributed/citus--8.0-5--8.0-6.sql create mode 100644 src/backend/distributed/test/run_from_same_connection.c create mode 100644 src/include/distributed/run_from_same_connection.h create mode 100644 src/test/regress/expected/isolation_reference_on_mx.out create mode 100644 src/test/regress/specs/isolation_reference_on_mx.spec diff --git a/Makefile b/Makefile index c47c5c69f..5558cc54a 100644 --- a/Makefile +++ b/Makefile @@ -72,6 +72,7 @@ OBJS = src/backend/distributed/shared_library_init.o \ src/backend/distributed/test/progress_utils.o \ src/backend/distributed/test/prune_shard_list.o \ src/backend/distributed/test/relation_access_tracking.o \ + src/backend/distributed/test/run_from_same_connection.o \ src/backend/distributed/test/sequential_execution.o \ src/backend/distributed/transaction/backend_data.o \ src/backend/distributed/transaction/citus_dist_stat_activity.o \ diff --git a/citus.control b/citus.control index 9e53b5012..aa6c5dda8 100644 --- a/citus.control +++ b/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '8.0-5' +default_version = '8.0-6' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index 514a78a49..d7cf63985 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -17,7 +17,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 7.3-1 7.3-2 7.3-3 \ 7.4-1 7.4-2 7.4-3 \ 7.5-1 7.5-2 7.5-3 7.5-4 7.5-5 7.5-6 7.5-7 \ - 8.0-1 8.0-2 8.0-3 8.0-4 8.0-5 + 8.0-1 8.0-2 8.0-3 8.0-4 8.0-5 8.0-6 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -225,6 +225,8 @@ $(EXTENSION)--8.0-4.sql: $(EXTENSION)--8.0-3.sql $(EXTENSION)--8.0-3--8.0-4.sql cat $^ > $@ $(EXTENSION)--8.0-5.sql: $(EXTENSION)--8.0-4.sql $(EXTENSION)--8.0-4--8.0-5.sql cat $^ > $@ +$(EXTENSION)--8.0-6.sql: $(EXTENSION)--8.0-5.sql $(EXTENSION)--8.0-5--8.0-6.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--8.0-5--8.0-6.sql b/src/backend/distributed/citus--8.0-5--8.0-6.sql new file mode 100644 index 000000000..4d5dbe038 --- /dev/null +++ b/src/backend/distributed/citus--8.0-5--8.0-6.sql @@ -0,0 +1,84 @@ +/* citus--8.0-5--8.0-6 */ +SET search_path = 'pg_catalog'; + +CREATE FUNCTION get_global_active_transactions(OUT datid oid, OUT process_id int, OUT initiator_node_identifier int4, OUT worker_query BOOL, OUT transaction_number int8, OUT transaction_stamp timestamptz) + RETURNS SETOF RECORD + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$get_global_active_transactions$$; + COMMENT ON FUNCTION get_global_active_transactions(OUT database_id oid, OUT process_id int, OUT initiator_node_identifier int4, OUT transaction_number int8, OUT transaction_stamp timestamptz) + IS 'returns distributed transaction ids of active distributed transactions from each node of the cluster'; + +CREATE OR REPLACE FUNCTION pg_catalog.citus_blocking_pids(pBlockedPid integer) +RETURNS int4[] AS $$ + DECLARE + mLocalBlockingPids int4[]; + mRemoteBlockingPids int4[]; + mLocalTransactionNum int8; + workerProcessId integer := current_setting('citus.isolation_test_session_remote_process_id'); + coordinatorProcessId integer := current_setting('citus.isolation_test_session_process_id'); + BEGIN + SELECT pg_catalog.old_pg_blocking_pids(pBlockedPid) INTO mLocalBlockingPids; + + IF (array_length(mLocalBlockingPids, 1) > 0) THEN + RETURN mLocalBlockingPids; + END IF; + + -- pg says we're not blocked locally; check whether we're blocked globally. + -- Note that worker process may be blocked or waiting for a lock. So we need to + -- get transaction number for both of them. Following IF provides the transaction + -- number when the worker process waiting for other session. + IF EXISTS (SELECT transaction_number FROM get_global_active_transactions() + WHERE process_id = workerProcessId AND pBlockedPid = coordinatorProcessId) THEN + + SELECT transaction_number INTO mLocalTransactionNum + FROM get_global_active_transactions() WHERE process_id = workerProcessId AND pBlockedPid = coordinatorProcessId; + ELSE + -- Check whether transactions initiated from the coordinator get locked + SELECT transaction_number INTO mLocalTransactionNum + FROM get_all_active_transactions() WHERE process_id = pBlockedPid; + END IF; + + IF EXISTS (SELECT waiting_transaction_num FROM dump_global_wait_edges() + WHERE waiting_transaction_num = mLocalTransactionNum) THEN + SELECT array_agg(pBlockedPid) INTO mRemoteBlockingPids; + END IF; + + RETURN mRemoteBlockingPids; + END; +$$ LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION pg_catalog.citus_isolation_test_session_is_blocked(pBlockedPid integer, pInterestingPids integer[]) +RETURNS boolean AS $$ + DECLARE + mBlockedTransactionNum int8; + workerProcessId integer := current_setting('citus.isolation_test_session_remote_process_id'); + coordinatorProcessId integer := current_setting('citus.isolation_test_session_process_id'); + BEGIN + IF pg_catalog.old_pg_isolation_test_session_is_blocked(pBlockedPid, pInterestingPids) THEN + RETURN true; + END IF; + + -- pg says we're not blocked locally; check whether we're blocked globally. + -- Note that worker process may be blocked or waiting for a lock. So we need to + -- get transaction number for both of them. Following IF provides the transaction + -- number when the worker process waiting for other session. + IF EXISTS (SELECT transaction_number FROM get_global_active_transactions() + WHERE process_id = workerProcessId AND pBlockedPid = coordinatorProcessId) THEN + SELECT transaction_number INTO mBlockedTransactionNum FROM get_global_active_transactions() + WHERE process_id = workerProcessId AND pBlockedPid = coordinatorProcessId; + ELSE + -- Check whether transactions initiated from the coordinator get locked + SELECT transaction_number INTO mBlockedTransactionNum + FROM get_all_active_transactions() WHERE process_id = pBlockedPid; + END IF; + + RETURN EXISTS ( + SELECT 1 FROM dump_global_wait_edges() + WHERE waiting_transaction_num = mBlockedTransactionNum + ); + + END; + +$$ LANGUAGE plpgsql; + +RESET search_path; diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index 9e53b5012..aa6c5dda8 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '8.0-5' +default_version = '8.0-6' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 38a28dde7..c8c509a70 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -26,6 +26,7 @@ #include "distributed/metadata_cache.h" #include "distributed/hash_helpers.h" #include "distributed/placement_connection.h" +#include "distributed/run_from_same_connection.h" #include "distributed/remote_commands.h" #include "distributed/version_compat.h" #include "mb/pg_wchar.h" @@ -44,6 +45,7 @@ static MultiConnection * StartConnectionEstablishment(ConnectionHashKey *key); static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit); static void DefaultCitusNoticeProcessor(void *arg, const char *message); static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags); +static bool RemoteTransactionIdle(MultiConnection *connection); static int CitusNoticeLogLevel = DEFAULT_CITUS_NOTICE_LEVEL; @@ -755,7 +757,7 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit) */ if (!connection->sessionLifespan || PQstatus(connection->pgConn) != CONNECTION_OK || - PQtransactionStatus(connection->pgConn) != PQTRANS_IDLE) + !RemoteTransactionIdle(connection)) { ShutdownConnection(connection); @@ -779,6 +781,30 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit) } +/* + * RemoteTransactionIdle function returns true if we manually + * set flag on run_commands_on_session_level_connection_to_node to true to + * force connection API keeping connection open or the status of the connection + * is idle. + */ +static bool +RemoteTransactionIdle(MultiConnection *connection) +{ + /* + * This is a very special case where we're running isolation tests on MX. + * We don't care whether the transaction is idle or not when we're + * running MX isolation tests. Thus, let the caller act as if the remote + * transactions is idle. + */ + if (AllowNonIdleTransactionOnXactHandling()) + { + return true; + } + + return PQtransactionStatus(connection->pgConn) == PQTRANS_IDLE; +} + + /* * SetCitusNoticeProcessor sets the NoticeProcessor to DefaultCitusNoticeProcessor */ diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index fbed3d27d..6b30c3f86 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -39,6 +39,7 @@ #include "distributed/pg_dist_partition.h" #include "distributed/placement_connection.h" #include "distributed/relation_access_tracking.h" +#include "distributed/run_from_same_connection.h" #include "distributed/query_pushdown_planning.h" #include "distributed/query_stats.h" #include "distributed/remote_commands.h" @@ -977,6 +978,27 @@ RegisterCitusConfigVariables(void) NodeConninfoGucCheckHook, NodeConninfoGucAssignHook, NULL); + + DefineCustomIntVariable( + "citus.isolation_test_session_remote_process_id", + NULL, + NULL, + &IsolationTestSessionRemoteProcessID, + -1, -1, INT_MAX, + PGC_USERSET, + GUC_NO_SHOW_ALL, + NULL, NULL, NULL); + + DefineCustomIntVariable( + "citus.isolation_test_session_process_id", + NULL, + NULL, + &IsolationTestSessionProcessID, + -1, -1, INT_MAX, + PGC_USERSET, + GUC_NO_SHOW_ALL, + NULL, NULL, NULL); + NormalizeWorkerListPath(); diff --git a/src/backend/distributed/test/run_from_same_connection.c b/src/backend/distributed/test/run_from_same_connection.c new file mode 100644 index 000000000..d1be9d208 --- /dev/null +++ b/src/backend/distributed/test/run_from_same_connection.c @@ -0,0 +1,214 @@ +/*------------------------------------------------------------------------- + * + * test/src/run_from_same_connection.c + * + * This file contains UDF to run consecutive commands on worker node from the + * same connection. UDFs will be used to test MX functionalities in isolation + * tests. + * + * Copyright (c) 2018, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "miscadmin.h" +#include "libpq-fe.h" + +#include "access/xact.h" +#include "distributed/connection_management.h" +#include "distributed/lock_graph.h" +#include "distributed/master_protocol.h" +#include "distributed/multi_logical_optimizer.h" +#include "distributed/metadata_cache.h" +#include "distributed/remote_commands.h" +#include "distributed/run_from_same_connection.h" +#include "distributed/task_tracker.h" +#include "distributed/version_compat.h" +#include "executor/spi.h" +#include "lib/stringinfo.h" +#include "postmaster/postmaster.h" +#include "utils/builtins.h" +#include "utils/memutils.h" + + +#define ALTER_CURRENT_PROCESS_ID \ + "ALTER SYSTEM SET citus.isolation_test_session_process_id TO %d" +#define ALTER_CURRENT_WORKER_PROCESS_ID \ + "ALTER SYSTEM SET citus.isolation_test_session_remote_process_id TO %ld" +#define GET_PROCESS_ID "SELECT process_id FROM get_current_transaction_id()" + + +static bool allowNonIdleRemoteTransactionOnXactHandling = false; +static MultiConnection *connection = NULL; + + +/* + * Config variables which will be used by isolation framework to check transactions + * initiated from worker nodes. + */ +int IsolationTestSessionRemoteProcessID = -1; +int IsolationTestSessionProcessID = -1; + + +static int64 GetRemoteProcessId(MultiConnection *connection); + +/* declarations for dynamic loading */ +PG_FUNCTION_INFO_V1(start_session_level_connection_to_node); +PG_FUNCTION_INFO_V1(run_commands_on_session_level_connection_to_node); +PG_FUNCTION_INFO_V1(stop_session_level_connection_to_node); + + +/* + * AllowNonIdleTransactionOnXactHandling allows connection opened with + * SESSION_LIFESPAN remain opened even if it is not idle. + */ +bool +AllowNonIdleTransactionOnXactHandling(void) +{ + return allowNonIdleRemoteTransactionOnXactHandling; +} + + +/* + * start_session_level_connection_to_node helps us to open and keep connections + * open while sending consecutive commands, even if they are outside the transaction. + * To use the connection opened with an open transaction, we have implemented a hacky + * solution by setting a static flag, allowNonIdleRemoteTransactionOnXactHandling, on + * this file to true. That gives us to chance to keep that connection open. + * + * Note that, this UDF shouldn't be used outside the isolation tests. + */ +Datum +start_session_level_connection_to_node(PG_FUNCTION_ARGS) +{ + text *nodeName = PG_GETARG_TEXT_P(0); + uint32 nodePort = PG_GETARG_UINT32(1); + char *nodeNameString = text_to_cstring(nodeName); + + CheckCitusVersion(ERROR); + + if (connection != NULL && (strcmp(connection->hostname, nodeNameString) != 0 || + connection->port != nodePort)) + { + elog(ERROR, + "can not connect different worker nodes from the same session using start_session_level_connection_to_node"); + } + + /* + * In order to keep connection open even with an open transaction, + * allowSessionLifeSpanWithOpenTransaction is set to true. + */ + if (connection == NULL) + { + connection = GetNodeConnection(SESSION_LIFESPAN, nodeNameString, nodePort); + allowNonIdleRemoteTransactionOnXactHandling = true; + } + + if (PQstatus(connection->pgConn) != CONNECTION_OK) + { + elog(ERROR, "failed to connect to %s:%d", nodeNameString, (int) nodePort); + } + + PG_RETURN_VOID(); +} + + +/* + * run_commands_on_session_level_connection_to_node runs to consecutive commands + * from the same connection opened by start_session_level_connection_to_node. + * + * Since transactions can be initiated from worker nodes with MX, we need to + * keep them open on the worker node to check whether there exist a waiting + * transaction in test steps. In order to release the locks taken in the + * transaction we need to send related unlock commands from the same connection + * as well. + */ +Datum +run_commands_on_session_level_connection_to_node(PG_FUNCTION_ARGS) +{ + text *queryText = PG_GETARG_TEXT_P(0); + char *queryString = text_to_cstring(queryText); + + StringInfo processStringInfo = makeStringInfo(); + StringInfo workerProcessStringInfo = makeStringInfo(); + MultiConnection *localConnection = GetNodeConnection(0, LOCAL_HOST_NAME, + PostPortNumber); + Oid pgReloadConfOid = InvalidOid; + + if (!connection) + { + elog(ERROR, + "start_session_level_connection_to_node must be called first to open a session level connection"); + } + + ExecuteCriticalRemoteCommand(connection, queryString); + + appendStringInfo(processStringInfo, ALTER_CURRENT_PROCESS_ID, MyProcPid); + appendStringInfo(workerProcessStringInfo, ALTER_CURRENT_WORKER_PROCESS_ID, + GetRemoteProcessId(connection)); + + /* + * Since we cannot run `ALTER SYSTEM` command within a transaction, we are + * calling it from a self-connected session. + */ + ExecuteCriticalRemoteCommand(localConnection, processStringInfo->data); + ExecuteCriticalRemoteCommand(localConnection, workerProcessStringInfo->data); + + CloseConnection(localConnection); + + /* Call pg_reload_conf UDF to update changed GUCs above on each backend */ + pgReloadConfOid = FunctionOid("pg_catalog", "pg_reload_conf", 0); + OidFunctionCall0(pgReloadConfOid); + + + PG_RETURN_VOID(); +} + + +/* + * stop_session_level_connection_to_node closes the connection opened by the + * start_session_level_connection_to_node and set the flag to false which + * allows connection API to keep connections with open transaction. + */ +Datum +stop_session_level_connection_to_node(PG_FUNCTION_ARGS) +{ + allowNonIdleRemoteTransactionOnXactHandling = false; + + if (connection != NULL) + { + CloseConnection(connection); + connection = NULL; + } + + PG_RETURN_VOID(); +} + + +/* + * GetRemoteProcessId() get the process id of remote transaction opened + * by the connection. + */ +static int64 +GetRemoteProcessId(MultiConnection *connection) +{ + StringInfo queryStringInfo = makeStringInfo(); + PGresult *result = NULL; + int64 rowCount = 0; + + appendStringInfo(queryStringInfo, GET_PROCESS_ID); + + ExecuteOptionalRemoteCommand(connection, queryStringInfo->data, &result); + + rowCount = PQntuples(result); + + if (rowCount != 1) + { + PG_RETURN_VOID(); + } + + ClearResults(connection, false); + + return ParseIntField(result, 0, 0); +} diff --git a/src/backend/distributed/transaction/backend_data.c b/src/backend/distributed/transaction/backend_data.c index 0ab031901..b78e694d9 100644 --- a/src/backend/distributed/transaction/backend_data.c +++ b/src/backend/distributed/transaction/backend_data.c @@ -18,12 +18,15 @@ #include "catalog/pg_type.h" #include "datatype/timestamp.h" #include "distributed/backend_data.h" +#include "distributed/connection_management.h" #include "distributed/listutils.h" #include "distributed/lock_graph.h" #include "distributed/metadata_cache.h" +#include "distributed/remote_commands.h" #include "distributed/transaction_identifier.h" #include "nodes/execnodes.h" #include "storage/ipc.h" +#include "storage/lmgr.h" #include "storage/lwlock.h" #include "storage/proc.h" #include "storage/spin.h" @@ -31,6 +34,9 @@ #include "utils/timestamp.h" +#define GET_ACTIVE_TRANSACTION_QUERY "SELECT * FROM get_all_active_transactions();" +#define ACTIVE_TRANSACTION_COLUMN_COUNT 6 + /* * Each backend's data reside in the shared memory * on the BackendManagementShmemData. @@ -56,6 +62,9 @@ typedef struct BackendManagementShmemData } BackendManagementShmemData; +static void StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc + tupleDescriptor); +static void CheckReturnSetInfo(ReturnSetInfo *returnSetInfo); static shmem_startup_hook_type prev_shmem_startup_hook = NULL; static BackendManagementShmemData *backendManagementShmemData = NULL; static BackendData *MyBackendData = NULL; @@ -67,6 +76,7 @@ static size_t BackendManagementShmemSize(void); PG_FUNCTION_INFO_V1(assign_distributed_transaction_id); PG_FUNCTION_INFO_V1(get_current_transaction_id); +PG_FUNCTION_INFO_V1(get_global_active_transactions); PG_FUNCTION_INFO_V1(get_all_active_transactions); @@ -180,6 +190,143 @@ get_current_transaction_id(PG_FUNCTION_ARGS) } +/* + * get_global_active_transactions returns all the available information about all + * the active backends from each node of the cluster. If you call that function from + * the coordinator, it will returns back active transaction from the coordinator as + * well. Yet, if you call it from the worker, result won't include the transactions + * on the coordinator node, since worker nodes do not aware of the coordinator. + */ +Datum +get_global_active_transactions(PG_FUNCTION_ARGS) +{ + ReturnSetInfo *returnSetInfo = (ReturnSetInfo *) fcinfo->resultinfo; + TupleDesc tupleDescriptor = NULL; + Tuplestorestate *tupleStore = NULL; + MemoryContext perQueryContext = NULL; + MemoryContext oldContext = NULL; + List *workerNodeList = ActivePrimaryNodeList(); + ListCell *workerNodeCell = NULL; + List *connectionList = NIL; + ListCell *connectionCell = NULL; + StringInfo queryToSend = makeStringInfo(); + + CheckCitusVersion(ERROR); + CheckReturnSetInfo(returnSetInfo); + + /* build a tuple descriptor for our result type */ + if (get_call_result_type(fcinfo, NULL, &tupleDescriptor) != TYPEFUNC_COMPOSITE) + { + elog(ERROR, "return type must be a row type"); + } + + appendStringInfo(queryToSend, GET_ACTIVE_TRANSACTION_QUERY); + + perQueryContext = returnSetInfo->econtext->ecxt_per_query_memory; + + oldContext = MemoryContextSwitchTo(perQueryContext); + + tupleStore = tuplestore_begin_heap(true, false, work_mem); + returnSetInfo->returnMode = SFRM_Materialize; + returnSetInfo->setResult = tupleStore; + returnSetInfo->setDesc = tupleDescriptor; + + MemoryContextSwitchTo(oldContext); + + /* add active transactions for local node */ + StoreAllActiveTransactions(tupleStore, tupleDescriptor); + + /* open connections in parallel */ + foreach(workerNodeCell, workerNodeList) + { + WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); + char *nodeName = workerNode->workerName; + int nodePort = workerNode->workerPort; + MultiConnection *connection = NULL; + int connectionFlags = 0; + + if (workerNode->groupId == GetLocalGroupId()) + { + /* we already get these transactions via GetAllActiveTransactions() */ + continue; + } + + connection = StartNodeConnection(connectionFlags, nodeName, nodePort); + + connectionList = lappend(connectionList, connection); + } + + FinishConnectionListEstablishment(connectionList); + + /* send commands in parallel */ + foreach(connectionCell, connectionList) + { + MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); + int querySent = false; + + querySent = SendRemoteCommand(connection, queryToSend->data); + if (querySent == 0) + { + ReportConnectionError(connection, WARNING); + } + } + + /* receive query results */ + foreach(connectionCell, connectionList) + { + MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); + PGresult *result = NULL; + bool raiseInterrupts = true; + Datum values[ACTIVE_TRANSACTION_COLUMN_COUNT]; + bool isNulls[ACTIVE_TRANSACTION_COLUMN_COUNT]; + int64 rowIndex = 0; + int64 rowCount = 0; + int64 colCount = 0; + + result = GetRemoteCommandResult(connection, raiseInterrupts); + if (!IsResponseOK(result)) + { + ReportResultError(connection, result, WARNING); + continue; + } + + rowCount = PQntuples(result); + colCount = PQnfields(result); + + /* Although it is not expected */ + if (colCount != ACTIVE_TRANSACTION_COLUMN_COUNT) + { + ereport(WARNING, (errmsg("unexpected number of columns from " + "get_all_active_transactions"))); + continue; + } + + for (rowIndex = 0; rowIndex < rowCount; rowIndex++) + { + memset(values, 0, sizeof(values)); + memset(isNulls, false, sizeof(isNulls)); + + values[0] = ParseIntField(result, rowIndex, 0); + values[1] = ParseIntField(result, rowIndex, 1); + values[2] = ParseIntField(result, rowIndex, 2); + values[3] = ParseBoolField(result, rowIndex, 3); + values[4] = ParseIntField(result, rowIndex, 4); + values[5] = ParseTimestampTzField(result, rowIndex, 5); + + tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls); + } + + PQclear(result); + ForgetResults(connection); + } + + /* clean up and return the tuplestore */ + tuplestore_donestoring(tupleStore); + + PG_RETURN_VOID(); +} + + /* * get_all_active_transactions returns all the avaliable information about all * the active backends. @@ -193,29 +340,8 @@ get_all_active_transactions(PG_FUNCTION_ARGS) MemoryContext perQueryContext = NULL; MemoryContext oldContext = NULL; - int backendIndex = 0; - - Datum values[6]; - bool isNulls[6]; - CheckCitusVersion(ERROR); - - /* check to see if caller supports us returning a tuplestore */ - if (returnSetInfo == NULL || !IsA(returnSetInfo, ReturnSetInfo)) - { - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("set-valued function called in context " \ - "that cannot accept a set"))); - } - - if (!(returnSetInfo->allowedModes & SFRM_Materialize)) - { - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("materialize mode required, but it is not " \ - "allowed in this context"))); - } + CheckReturnSetInfo(returnSetInfo); /* build a tuple descriptor for our result type */ if (get_call_result_type(fcinfo, NULL, &tupleDescriptor) != TYPEFUNC_COMPOSITE) @@ -233,6 +359,26 @@ get_all_active_transactions(PG_FUNCTION_ARGS) returnSetInfo->setDesc = tupleDescriptor; MemoryContextSwitchTo(oldContext); + StoreAllActiveTransactions(tupleStore, tupleDescriptor); + + /* clean up and return the tuplestore */ + tuplestore_donestoring(tupleStore); + + PG_RETURN_VOID(); +} + + +/* + * StoreAllActiveTransactions gets active transaction from the local node and inserts + * them into the given tuplestore. + */ +static void +StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor) +{ + int backendIndex = 0; + + Datum values[ACTIVE_TRANSACTION_COLUMN_COUNT]; + bool isNulls[ACTIVE_TRANSACTION_COLUMN_COUNT]; /* * We don't want to initialize memory while spinlock is held so we @@ -288,11 +434,32 @@ get_all_active_transactions(PG_FUNCTION_ARGS) } UnlockBackendSharedMemory(); +} - /* clean up and return the tuplestore */ - tuplestore_donestoring(tupleStore); - PG_RETURN_VOID(); +/* + * CheckReturnSetInfo checks whether the defined given returnSetInfo is + * proper for returning tuplestore. + */ +static void +CheckReturnSetInfo(ReturnSetInfo *returnSetInfo) +{ + /* check to see if caller supports us returning a tuplestore */ + if (returnSetInfo == NULL || !IsA(returnSetInfo, ReturnSetInfo)) + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("set-valued function called in context " \ + "that cannot accept a set"))); + } + + if (!(returnSetInfo->allowedModes & SFRM_Materialize)) + { + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("materialize mode required, but it is not " \ + "allowed in this context"))); + } } diff --git a/src/include/distributed/lock_graph.h b/src/include/distributed/lock_graph.h index 977e6dba8..eec0e20ce 100644 --- a/src/include/distributed/lock_graph.h +++ b/src/include/distributed/lock_graph.h @@ -15,7 +15,10 @@ #include "postgres.h" #include "libpq-fe.h" + #include "datatype/timestamp.h" +#include "distributed/backend_data.h" +#include "storage/lock.h" /* @@ -58,6 +61,8 @@ typedef struct WaitGraph extern WaitGraph * BuildGlobalWaitGraph(void); extern bool IsProcessWaitingForLock(PGPROC *proc); extern bool IsInDistributedTransaction(BackendData *backendData); +extern TimestampTz ParseTimestampTzField(PGresult *result, int rowIndex, int colIndex); +extern int64 ParseIntField(PGresult *result, int rowIndex, int colIndex); /* some utility function to parse results */ extern int64 ParseIntField(PGresult *result, int rowIndex, int colIndex); diff --git a/src/include/distributed/run_from_same_connection.h b/src/include/distributed/run_from_same_connection.h new file mode 100644 index 000000000..af9e6eade --- /dev/null +++ b/src/include/distributed/run_from_same_connection.h @@ -0,0 +1,23 @@ +/* + * run_from_same_connection.h + * + * Sending commands from same connection to test transactions initiated from + * worker nodes in the isolation framework. + * + * Copyright (c) 2018, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ +#ifndef RUN_FROM_SAME_CONNECTION_H +#define RUN_FROM_SAME_CONNECTION_H + +/* + * Config variables which will be used by isolation framework to check transactions + * initiated from worker nodes. + */ +extern int IsolationTestSessionRemoteProcessID; +extern int IsolationTestSessionProcessID; + +bool AllowNonIdleTransactionOnXactHandling(void); + +#endif /* RUN_FROM_SAME_CONNECTION_H */ diff --git a/src/test/regress/expected/isolation_reference_on_mx.out b/src/test/regress/expected/isolation_reference_on_mx.out new file mode 100644 index 000000000..d45f54659 --- /dev/null +++ b/src/test/regress/expected/isolation_reference_on_mx.out @@ -0,0 +1,616 @@ +Parsed test spec with 2 sessions + +starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-update-ref-table s2-start-session-level-connection s2-begin-on-worker s2-update-ref-table s1-commit-worker s2-commit-worker s1-stop-connection s2-stop-connection +step s1-start-session-level-connection: + SELECT start_session_level_connection_to_node('localhost', 57637); + +start_session_level_connection_to_node + + +step s1-begin-on-worker: + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + +run_commands_on_session_level_connection_to_node + + +step s1-update-ref-table: + SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); + +run_commands_on_session_level_connection_to_node + + +step s2-start-session-level-connection: + SELECT start_session_level_connection_to_node('localhost', 57638); + +start_session_level_connection_to_node + + +step s2-begin-on-worker: + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + +run_commands_on_session_level_connection_to_node + + +step s2-update-ref-table: + SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); + +step s1-commit-worker: + SELECT run_commands_on_session_level_connection_to_node('COMMIT'); + +run_commands_on_session_level_connection_to_node + + +step s2-update-ref-table: <... completed> +run_commands_on_session_level_connection_to_node + + +step s2-commit-worker: + SELECT run_commands_on_session_level_connection_to_node('COMMIT'); + +run_commands_on_session_level_connection_to_node + + +step s1-stop-connection: + SELECT stop_session_level_connection_to_node(); + +stop_session_level_connection_to_node + + +step s2-stop-connection: + SELECT stop_session_level_connection_to_node(); + +stop_session_level_connection_to_node + + +restore_isolation_tester_func + + + +starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-delete-from-ref-table s2-start-session-level-connection s2-begin-on-worker s2-update-ref-table s1-commit-worker s2-commit-worker s1-stop-connection s2-stop-connection +step s1-start-session-level-connection: + SELECT start_session_level_connection_to_node('localhost', 57637); + +start_session_level_connection_to_node + + +step s1-begin-on-worker: + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + +run_commands_on_session_level_connection_to_node + + +step s1-delete-from-ref-table: + SELECT run_commands_on_session_level_connection_to_node('DELETE FROM ref_table WHERE user_id = 1'); + +run_commands_on_session_level_connection_to_node + + +step s2-start-session-level-connection: + SELECT start_session_level_connection_to_node('localhost', 57638); + +start_session_level_connection_to_node + + +step s2-begin-on-worker: + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + +run_commands_on_session_level_connection_to_node + + +step s2-update-ref-table: + SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); + +step s1-commit-worker: + SELECT run_commands_on_session_level_connection_to_node('COMMIT'); + +run_commands_on_session_level_connection_to_node + + +step s2-update-ref-table: <... completed> +run_commands_on_session_level_connection_to_node + + +step s2-commit-worker: + SELECT run_commands_on_session_level_connection_to_node('COMMIT'); + +run_commands_on_session_level_connection_to_node + + +step s1-stop-connection: + SELECT stop_session_level_connection_to_node(); + +stop_session_level_connection_to_node + + +step s2-stop-connection: + SELECT stop_session_level_connection_to_node(); + +stop_session_level_connection_to_node + + +restore_isolation_tester_func + + + +starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-insert-into-ref-table s2-start-session-level-connection s2-begin-on-worker s2-update-ref-table s1-commit-worker s2-commit-worker s1-stop-connection s2-stop-connection +step s1-start-session-level-connection: + SELECT start_session_level_connection_to_node('localhost', 57637); + +start_session_level_connection_to_node + + +step s1-begin-on-worker: + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + +run_commands_on_session_level_connection_to_node + + +step s1-insert-into-ref-table: + SELECT run_commands_on_session_level_connection_to_node('INSERT INTO ref_table VALUES(8,81),(9,91)'); + +run_commands_on_session_level_connection_to_node + + +step s2-start-session-level-connection: + SELECT start_session_level_connection_to_node('localhost', 57638); + +start_session_level_connection_to_node + + +step s2-begin-on-worker: + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + +run_commands_on_session_level_connection_to_node + + +step s2-update-ref-table: + SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); + +step s1-commit-worker: + SELECT run_commands_on_session_level_connection_to_node('COMMIT'); + +run_commands_on_session_level_connection_to_node + + +step s2-update-ref-table: <... completed> +run_commands_on_session_level_connection_to_node + + +step s2-commit-worker: + SELECT run_commands_on_session_level_connection_to_node('COMMIT'); + +run_commands_on_session_level_connection_to_node + + +step s1-stop-connection: + SELECT stop_session_level_connection_to_node(); + +stop_session_level_connection_to_node + + +step s2-stop-connection: + SELECT stop_session_level_connection_to_node(); + +stop_session_level_connection_to_node + + +restore_isolation_tester_func + + + +starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-insert-into-ref-table s2-start-session-level-connection s2-begin-on-worker s2-insert-into-ref-table s1-commit-worker s2-commit-worker s1-stop-connection s2-stop-connection +step s1-start-session-level-connection: + SELECT start_session_level_connection_to_node('localhost', 57637); + +start_session_level_connection_to_node + + +step s1-begin-on-worker: + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + +run_commands_on_session_level_connection_to_node + + +step s1-insert-into-ref-table: + SELECT run_commands_on_session_level_connection_to_node('INSERT INTO ref_table VALUES(8,81),(9,91)'); + +run_commands_on_session_level_connection_to_node + + +step s2-start-session-level-connection: + SELECT start_session_level_connection_to_node('localhost', 57638); + +start_session_level_connection_to_node + + +step s2-begin-on-worker: + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + +run_commands_on_session_level_connection_to_node + + +step s2-insert-into-ref-table: + SELECT run_commands_on_session_level_connection_to_node('INSERT INTO ref_table VALUES(8,81),(9,91)'); + +run_commands_on_session_level_connection_to_node + + +step s1-commit-worker: + SELECT run_commands_on_session_level_connection_to_node('COMMIT'); + +run_commands_on_session_level_connection_to_node + + +step s2-commit-worker: + SELECT run_commands_on_session_level_connection_to_node('COMMIT'); + +run_commands_on_session_level_connection_to_node + + +step s1-stop-connection: + SELECT stop_session_level_connection_to_node(); + +stop_session_level_connection_to_node + + +step s2-stop-connection: + SELECT stop_session_level_connection_to_node(); + +stop_session_level_connection_to_node + + +restore_isolation_tester_func + + + +starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-copy-to-ref-table s2-start-session-level-connection s2-begin-on-worker s2-update-ref-table s1-commit-worker s2-commit-worker s1-stop-connection s2-stop-connection +step s1-start-session-level-connection: + SELECT start_session_level_connection_to_node('localhost', 57637); + +start_session_level_connection_to_node + + +step s1-begin-on-worker: + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + +run_commands_on_session_level_connection_to_node + + +step s1-copy-to-ref-table: + SELECT run_commands_on_session_level_connection_to_node('COPY ref_table FROM PROGRAM ''echo 10, 101 && echo 11, 111'' WITH CSV'); + +run_commands_on_session_level_connection_to_node + + +step s2-start-session-level-connection: + SELECT start_session_level_connection_to_node('localhost', 57638); + +start_session_level_connection_to_node + + +step s2-begin-on-worker: + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + +run_commands_on_session_level_connection_to_node + + +step s2-update-ref-table: + SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); + +step s1-commit-worker: + SELECT run_commands_on_session_level_connection_to_node('COMMIT'); + +run_commands_on_session_level_connection_to_node + + +step s2-update-ref-table: <... completed> +run_commands_on_session_level_connection_to_node + + +step s2-commit-worker: + SELECT run_commands_on_session_level_connection_to_node('COMMIT'); + +run_commands_on_session_level_connection_to_node + + +step s1-stop-connection: + SELECT stop_session_level_connection_to_node(); + +stop_session_level_connection_to_node + + +step s2-stop-connection: + SELECT stop_session_level_connection_to_node(); + +stop_session_level_connection_to_node + + +restore_isolation_tester_func + + + +starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-copy-to-ref-table s2-start-session-level-connection s2-begin-on-worker s2-insert-into-ref-table s1-commit-worker s2-commit-worker s1-stop-connection s2-stop-connection +step s1-start-session-level-connection: + SELECT start_session_level_connection_to_node('localhost', 57637); + +start_session_level_connection_to_node + + +step s1-begin-on-worker: + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + +run_commands_on_session_level_connection_to_node + + +step s1-copy-to-ref-table: + SELECT run_commands_on_session_level_connection_to_node('COPY ref_table FROM PROGRAM ''echo 10, 101 && echo 11, 111'' WITH CSV'); + +run_commands_on_session_level_connection_to_node + + +step s2-start-session-level-connection: + SELECT start_session_level_connection_to_node('localhost', 57638); + +start_session_level_connection_to_node + + +step s2-begin-on-worker: + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + +run_commands_on_session_level_connection_to_node + + +step s2-insert-into-ref-table: + SELECT run_commands_on_session_level_connection_to_node('INSERT INTO ref_table VALUES(8,81),(9,91)'); + +run_commands_on_session_level_connection_to_node + + +step s1-commit-worker: + SELECT run_commands_on_session_level_connection_to_node('COMMIT'); + +run_commands_on_session_level_connection_to_node + + +step s2-commit-worker: + SELECT run_commands_on_session_level_connection_to_node('COMMIT'); + +run_commands_on_session_level_connection_to_node + + +step s1-stop-connection: + SELECT stop_session_level_connection_to_node(); + +stop_session_level_connection_to_node + + +step s2-stop-connection: + SELECT stop_session_level_connection_to_node(); + +stop_session_level_connection_to_node + + +restore_isolation_tester_func + + + +starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-copy-to-ref-table s2-start-session-level-connection s2-begin-on-worker s2-copy-to-ref-table s1-commit-worker s2-commit-worker s1-stop-connection s2-stop-connection +step s1-start-session-level-connection: + SELECT start_session_level_connection_to_node('localhost', 57637); + +start_session_level_connection_to_node + + +step s1-begin-on-worker: + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + +run_commands_on_session_level_connection_to_node + + +step s1-copy-to-ref-table: + SELECT run_commands_on_session_level_connection_to_node('COPY ref_table FROM PROGRAM ''echo 10, 101 && echo 11, 111'' WITH CSV'); + +run_commands_on_session_level_connection_to_node + + +step s2-start-session-level-connection: + SELECT start_session_level_connection_to_node('localhost', 57638); + +start_session_level_connection_to_node + + +step s2-begin-on-worker: + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + +run_commands_on_session_level_connection_to_node + + +step s2-copy-to-ref-table: + SELECT run_commands_on_session_level_connection_to_node('COPY ref_table FROM PROGRAM ''echo 10, 101 && echo 11, 111'' WITH CSV'); + +run_commands_on_session_level_connection_to_node + + +step s1-commit-worker: + SELECT run_commands_on_session_level_connection_to_node('COMMIT'); + +run_commands_on_session_level_connection_to_node + + +step s2-commit-worker: + SELECT run_commands_on_session_level_connection_to_node('COMMIT'); + +run_commands_on_session_level_connection_to_node + + +step s1-stop-connection: + SELECT stop_session_level_connection_to_node(); + +stop_session_level_connection_to_node + + +step s2-stop-connection: + SELECT stop_session_level_connection_to_node(); + +stop_session_level_connection_to_node + + +restore_isolation_tester_func + + + +starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-select-for-update s2-start-session-level-connection s2-begin-on-worker s2-update-ref-table s1-commit-worker s2-commit-worker s1-stop-connection s2-stop-connection +step s1-start-session-level-connection: + SELECT start_session_level_connection_to_node('localhost', 57637); + +start_session_level_connection_to_node + + +step s1-begin-on-worker: + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + +run_commands_on_session_level_connection_to_node + + +step s1-select-for-update: + SELECT run_commands_on_session_level_connection_to_node('SELECT * FROM ref_table FOR UPDATE'); + +run_commands_on_session_level_connection_to_node + + +step s2-start-session-level-connection: + SELECT start_session_level_connection_to_node('localhost', 57638); + +start_session_level_connection_to_node + + +step s2-begin-on-worker: + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + +run_commands_on_session_level_connection_to_node + + +step s2-update-ref-table: + SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); + +step s1-commit-worker: + SELECT run_commands_on_session_level_connection_to_node('COMMIT'); + +run_commands_on_session_level_connection_to_node + + +step s2-update-ref-table: <... completed> +run_commands_on_session_level_connection_to_node + + +step s2-commit-worker: + SELECT run_commands_on_session_level_connection_to_node('COMMIT'); + +run_commands_on_session_level_connection_to_node + + +step s1-stop-connection: + SELECT stop_session_level_connection_to_node(); + +stop_session_level_connection_to_node + + +step s2-stop-connection: + SELECT stop_session_level_connection_to_node(); + +stop_session_level_connection_to_node + + +restore_isolation_tester_func + + + +starting permutation: s2-start-session-level-connection s2-begin-on-worker s2-insert-into-ref-table s1-begin s1-alter-table s2-commit-worker s1-commit s2-stop-connection +step s2-start-session-level-connection: + SELECT start_session_level_connection_to_node('localhost', 57638); + +start_session_level_connection_to_node + + +step s2-begin-on-worker: + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + +run_commands_on_session_level_connection_to_node + + +step s2-insert-into-ref-table: + SELECT run_commands_on_session_level_connection_to_node('INSERT INTO ref_table VALUES(8,81),(9,91)'); + +run_commands_on_session_level_connection_to_node + + +step s1-begin: + BEGIN; + +step s1-alter-table: + ALTER TABLE ref_table ADD CONSTRAINT rf_p_key PRIMARY KEY(user_id); + +step s2-commit-worker: + SELECT run_commands_on_session_level_connection_to_node('COMMIT'); + +run_commands_on_session_level_connection_to_node + + +step s1-alter-table: <... completed> +step s1-commit: + COMMIT; + +step s2-stop-connection: + SELECT stop_session_level_connection_to_node(); + +stop_session_level_connection_to_node + + +restore_isolation_tester_func + + + +starting permutation: s2-start-session-level-connection s2-begin-on-worker s2-select-from-ref-table s1-begin s1-alter-table s2-commit-worker s1-commit s2-stop-connection +step s2-start-session-level-connection: + SELECT start_session_level_connection_to_node('localhost', 57638); + +start_session_level_connection_to_node + + +step s2-begin-on-worker: + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + +run_commands_on_session_level_connection_to_node + + +step s2-select-from-ref-table: + SELECT run_commands_on_session_level_connection_to_node('SELECT count(*) FROM ref_table'); + +run_commands_on_session_level_connection_to_node + + +step s1-begin: + BEGIN; + +step s1-alter-table: + ALTER TABLE ref_table ADD CONSTRAINT rf_p_key PRIMARY KEY(user_id); + +step s2-commit-worker: + SELECT run_commands_on_session_level_connection_to_node('COMMIT'); + +run_commands_on_session_level_connection_to_node + + +step s1-alter-table: <... completed> +step s1-commit: + COMMIT; + +step s2-stop-connection: + SELECT stop_session_level_connection_to_node(); + +stop_session_level_connection_to_node + + +restore_isolation_tester_func + + diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 531b44f49..c6f3c676c 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -148,6 +148,7 @@ ALTER EXTENSION citus UPDATE TO '8.0-2'; ALTER EXTENSION citus UPDATE TO '8.0-3'; ALTER EXTENSION citus UPDATE TO '8.0-4'; ALTER EXTENSION citus UPDATE TO '8.0-5'; +ALTER EXTENSION citus UPDATE TO '8.0-6'; -- show running version SHOW citus.version; citus.version diff --git a/src/test/regress/isolation_schedule b/src/test/regress/isolation_schedule index 728f7dbb4..cc7ce58d2 100644 --- a/src/test/regress/isolation_schedule +++ b/src/test/regress/isolation_schedule @@ -43,3 +43,6 @@ test: isolation_truncate_vs_all test: isolation_drop_vs_all test: isolation_ddl_vs_all test: isolation_citus_dist_activity + +# MX tests +test: isolation_reference_on_mx diff --git a/src/test/regress/specs/isolation_reference_on_mx.spec b/src/test/regress/specs/isolation_reference_on_mx.spec new file mode 100644 index 000000000..828f5de5f --- /dev/null +++ b/src/test/regress/specs/isolation_reference_on_mx.spec @@ -0,0 +1,160 @@ +# Create and use UDF to send commands from the same connection. Also make the cluster +# ready for testing MX functionalities. +setup +{ + CREATE OR REPLACE FUNCTION start_session_level_connection_to_node(text, integer) + RETURNS void + LANGUAGE C STRICT VOLATILE + AS 'citus', $$start_session_level_connection_to_node$$; + + CREATE OR REPLACE FUNCTION run_commands_on_session_level_connection_to_node(text) + RETURNS void + LANGUAGE C STRICT VOLATILE + AS 'citus', $$run_commands_on_session_level_connection_to_node$$; + + CREATE OR REPLACE FUNCTION stop_session_level_connection_to_node() + RETURNS void + LANGUAGE C STRICT VOLATILE + AS 'citus', $$stop_session_level_connection_to_node$$; + + SELECT citus.replace_isolation_tester_func(); + SELECT citus.refresh_isolation_tester_prepared_statement(); + + SELECT start_metadata_sync_to_node('localhost', 57637); + SELECT start_metadata_sync_to_node('localhost', 57638); + SET citus.replication_model to streaming; + + CREATE TABLE ref_table(user_id int, value_1 int); + SELECT create_reference_table('ref_table'); + INSERT INTO ref_table VALUES (1, 11), (2, 21), (3, 31), (4, 41), (5, 51), (6, 61), (7, 71); +} + +# Create and use UDF to close the connection opened in the setup step. Also return the cluster +# back to the initial state. +teardown +{ + DROP TABLE ref_table; + SELECT citus.restore_isolation_tester_func(); +} + +session "s1" + +step "s1-begin" +{ + BEGIN; +} + +# We do not need to begin a transaction on coordinator, since it will be open on workers. + +step "s1-start-session-level-connection" +{ + SELECT start_session_level_connection_to_node('localhost', 57637); +} + +step "s1-begin-on-worker" +{ + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); +} + +step "s1-update-ref-table" +{ + SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); +} + +step "s1-delete-from-ref-table" +{ + SELECT run_commands_on_session_level_connection_to_node('DELETE FROM ref_table WHERE user_id = 1'); +} + +step "s1-insert-into-ref-table" +{ + SELECT run_commands_on_session_level_connection_to_node('INSERT INTO ref_table VALUES(8,81),(9,91)'); +} + +step "s1-copy-to-ref-table" +{ + SELECT run_commands_on_session_level_connection_to_node('COPY ref_table FROM PROGRAM ''echo 10, 101 && echo 11, 111'' WITH CSV'); +} + +step "s1-select-for-update" +{ + SELECT run_commands_on_session_level_connection_to_node('SELECT * FROM ref_table FOR UPDATE'); +} + +step "s1-commit-worker" +{ + SELECT run_commands_on_session_level_connection_to_node('COMMIT'); +} + +step "s1-alter-table" +{ + ALTER TABLE ref_table ADD CONSTRAINT rf_p_key PRIMARY KEY(user_id); +} + +step "s1-stop-connection" +{ + SELECT stop_session_level_connection_to_node(); +} + +step "s1-commit" +{ + COMMIT; +} + +session "s2" + +step "s2-start-session-level-connection" +{ + SELECT start_session_level_connection_to_node('localhost', 57638); +} + +step "s2-begin-on-worker" +{ + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); +} + +step "s2-update-ref-table" +{ + SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1'); +} + +step "s2-select-from-ref-table" +{ + SELECT run_commands_on_session_level_connection_to_node('SELECT count(*) FROM ref_table'); +} + +step "s2-delete-from-ref-table" +{ + SELECT run_commands_on_session_level_connection_to_node('DELETE FROM ref_table WHERE user_id = 2'); +} + +step "s2-insert-into-ref-table" +{ + SELECT run_commands_on_session_level_connection_to_node('INSERT INTO ref_table VALUES(8,81),(9,91)'); +} + +step "s2-copy-to-ref-table" +{ + SELECT run_commands_on_session_level_connection_to_node('COPY ref_table FROM PROGRAM ''echo 10, 101 && echo 11, 111'' WITH CSV'); +} + +step "s2-stop-connection" +{ + SELECT stop_session_level_connection_to_node(); +} + +step "s2-commit-worker" +{ + SELECT run_commands_on_session_level_connection_to_node('COMMIT'); +} + +permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-update-ref-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-update-ref-table" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection" +permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-delete-from-ref-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-update-ref-table" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection" +permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-insert-into-ref-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-update-ref-table" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection" +permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-insert-into-ref-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-insert-into-ref-table" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection" +permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-copy-to-ref-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-update-ref-table" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection" +permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-copy-to-ref-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-insert-into-ref-table" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection" +permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-copy-to-ref-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-copy-to-ref-table" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection" +permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-select-for-update" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-update-ref-table" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection" +permutation "s2-start-session-level-connection" "s2-begin-on-worker" "s2-insert-into-ref-table" "s1-begin" "s1-alter-table" "s2-commit-worker" "s1-commit" "s2-stop-connection" +permutation "s2-start-session-level-connection" "s2-begin-on-worker" "s2-select-from-ref-table" "s1-begin" "s1-alter-table" "s2-commit-worker" "s1-commit" "s2-stop-connection" diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index d6a6e1d1f..719cd5f9f 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -148,6 +148,7 @@ ALTER EXTENSION citus UPDATE TO '8.0-2'; ALTER EXTENSION citus UPDATE TO '8.0-3'; ALTER EXTENSION citus UPDATE TO '8.0-4'; ALTER EXTENSION citus UPDATE TO '8.0-5'; +ALTER EXTENSION citus UPDATE TO '8.0-6'; -- show running version SHOW citus.version;