mirror of https://github.com/citusdata/citus.git
Adds UDFs for testing MX functionalities with isolation tests
parent
470ee0b4d9
commit
d1f005daac
1
Makefile
1
Makefile
|
@ -72,6 +72,7 @@ OBJS = src/backend/distributed/shared_library_init.o \
|
|||
src/backend/distributed/test/progress_utils.o \
|
||||
src/backend/distributed/test/prune_shard_list.o \
|
||||
src/backend/distributed/test/relation_access_tracking.o \
|
||||
src/backend/distributed/test/run_from_same_connection.o \
|
||||
src/backend/distributed/test/sequential_execution.o \
|
||||
src/backend/distributed/transaction/backend_data.o \
|
||||
src/backend/distributed/transaction/citus_dist_stat_activity.o \
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
# Citus extension
|
||||
comment = 'Citus distributed database'
|
||||
default_version = '8.0-5'
|
||||
default_version = '8.0-6'
|
||||
module_pathname = '$libdir/citus'
|
||||
relocatable = false
|
||||
schema = pg_catalog
|
||||
|
|
|
@ -17,7 +17,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
|
|||
7.3-1 7.3-2 7.3-3 \
|
||||
7.4-1 7.4-2 7.4-3 \
|
||||
7.5-1 7.5-2 7.5-3 7.5-4 7.5-5 7.5-6 7.5-7 \
|
||||
8.0-1 8.0-2 8.0-3 8.0-4 8.0-5
|
||||
8.0-1 8.0-2 8.0-3 8.0-4 8.0-5 8.0-6
|
||||
|
||||
# All citus--*.sql files in the source directory
|
||||
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
|
||||
|
@ -225,6 +225,8 @@ $(EXTENSION)--8.0-4.sql: $(EXTENSION)--8.0-3.sql $(EXTENSION)--8.0-3--8.0-4.sql
|
|||
cat $^ > $@
|
||||
$(EXTENSION)--8.0-5.sql: $(EXTENSION)--8.0-4.sql $(EXTENSION)--8.0-4--8.0-5.sql
|
||||
cat $^ > $@
|
||||
$(EXTENSION)--8.0-6.sql: $(EXTENSION)--8.0-5.sql $(EXTENSION)--8.0-5--8.0-6.sql
|
||||
cat $^ > $@
|
||||
|
||||
NO_PGXS = 1
|
||||
|
||||
|
|
|
@ -0,0 +1,84 @@
|
|||
/* citus--8.0-5--8.0-6 */
|
||||
SET search_path = 'pg_catalog';
|
||||
|
||||
CREATE FUNCTION get_global_active_transactions(OUT datid oid, OUT process_id int, OUT initiator_node_identifier int4, OUT worker_query BOOL, OUT transaction_number int8, OUT transaction_stamp timestamptz)
|
||||
RETURNS SETOF RECORD
|
||||
LANGUAGE C STRICT
|
||||
AS 'MODULE_PATHNAME', $$get_global_active_transactions$$;
|
||||
COMMENT ON FUNCTION get_global_active_transactions(OUT database_id oid, OUT process_id int, OUT initiator_node_identifier int4, OUT transaction_number int8, OUT transaction_stamp timestamptz)
|
||||
IS 'returns distributed transaction ids of active distributed transactions from each node of the cluster';
|
||||
|
||||
CREATE OR REPLACE FUNCTION pg_catalog.citus_blocking_pids(pBlockedPid integer)
|
||||
RETURNS int4[] AS $$
|
||||
DECLARE
|
||||
mLocalBlockingPids int4[];
|
||||
mRemoteBlockingPids int4[];
|
||||
mLocalTransactionNum int8;
|
||||
workerProcessId integer := current_setting('citus.isolation_test_session_remote_process_id');
|
||||
coordinatorProcessId integer := current_setting('citus.isolation_test_session_process_id');
|
||||
BEGIN
|
||||
SELECT pg_catalog.old_pg_blocking_pids(pBlockedPid) INTO mLocalBlockingPids;
|
||||
|
||||
IF (array_length(mLocalBlockingPids, 1) > 0) THEN
|
||||
RETURN mLocalBlockingPids;
|
||||
END IF;
|
||||
|
||||
-- pg says we're not blocked locally; check whether we're blocked globally.
|
||||
-- Note that worker process may be blocked or waiting for a lock. So we need to
|
||||
-- get transaction number for both of them. Following IF provides the transaction
|
||||
-- number when the worker process waiting for other session.
|
||||
IF EXISTS (SELECT transaction_number FROM get_global_active_transactions()
|
||||
WHERE process_id = workerProcessId AND pBlockedPid = coordinatorProcessId) THEN
|
||||
|
||||
SELECT transaction_number INTO mLocalTransactionNum
|
||||
FROM get_global_active_transactions() WHERE process_id = workerProcessId AND pBlockedPid = coordinatorProcessId;
|
||||
ELSE
|
||||
-- Check whether transactions initiated from the coordinator get locked
|
||||
SELECT transaction_number INTO mLocalTransactionNum
|
||||
FROM get_all_active_transactions() WHERE process_id = pBlockedPid;
|
||||
END IF;
|
||||
|
||||
IF EXISTS (SELECT waiting_transaction_num FROM dump_global_wait_edges()
|
||||
WHERE waiting_transaction_num = mLocalTransactionNum) THEN
|
||||
SELECT array_agg(pBlockedPid) INTO mRemoteBlockingPids;
|
||||
END IF;
|
||||
|
||||
RETURN mRemoteBlockingPids;
|
||||
END;
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
CREATE OR REPLACE FUNCTION pg_catalog.citus_isolation_test_session_is_blocked(pBlockedPid integer, pInterestingPids integer[])
|
||||
RETURNS boolean AS $$
|
||||
DECLARE
|
||||
mBlockedTransactionNum int8;
|
||||
workerProcessId integer := current_setting('citus.isolation_test_session_remote_process_id');
|
||||
coordinatorProcessId integer := current_setting('citus.isolation_test_session_process_id');
|
||||
BEGIN
|
||||
IF pg_catalog.old_pg_isolation_test_session_is_blocked(pBlockedPid, pInterestingPids) THEN
|
||||
RETURN true;
|
||||
END IF;
|
||||
|
||||
-- pg says we're not blocked locally; check whether we're blocked globally.
|
||||
-- Note that worker process may be blocked or waiting for a lock. So we need to
|
||||
-- get transaction number for both of them. Following IF provides the transaction
|
||||
-- number when the worker process waiting for other session.
|
||||
IF EXISTS (SELECT transaction_number FROM get_global_active_transactions()
|
||||
WHERE process_id = workerProcessId AND pBlockedPid = coordinatorProcessId) THEN
|
||||
SELECT transaction_number INTO mBlockedTransactionNum FROM get_global_active_transactions()
|
||||
WHERE process_id = workerProcessId AND pBlockedPid = coordinatorProcessId;
|
||||
ELSE
|
||||
-- Check whether transactions initiated from the coordinator get locked
|
||||
SELECT transaction_number INTO mBlockedTransactionNum
|
||||
FROM get_all_active_transactions() WHERE process_id = pBlockedPid;
|
||||
END IF;
|
||||
|
||||
RETURN EXISTS (
|
||||
SELECT 1 FROM dump_global_wait_edges()
|
||||
WHERE waiting_transaction_num = mBlockedTransactionNum
|
||||
);
|
||||
|
||||
END;
|
||||
|
||||
$$ LANGUAGE plpgsql;
|
||||
|
||||
RESET search_path;
|
|
@ -1,6 +1,6 @@
|
|||
# Citus extension
|
||||
comment = 'Citus distributed database'
|
||||
default_version = '8.0-5'
|
||||
default_version = '8.0-6'
|
||||
module_pathname = '$libdir/citus'
|
||||
relocatable = false
|
||||
schema = pg_catalog
|
||||
|
|
|
@ -26,6 +26,7 @@
|
|||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/hash_helpers.h"
|
||||
#include "distributed/placement_connection.h"
|
||||
#include "distributed/run_from_same_connection.h"
|
||||
#include "distributed/remote_commands.h"
|
||||
#include "distributed/version_compat.h"
|
||||
#include "mb/pg_wchar.h"
|
||||
|
@ -44,6 +45,7 @@ static MultiConnection * StartConnectionEstablishment(ConnectionHashKey *key);
|
|||
static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit);
|
||||
static void DefaultCitusNoticeProcessor(void *arg, const char *message);
|
||||
static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags);
|
||||
static bool RemoteTransactionIdle(MultiConnection *connection);
|
||||
|
||||
|
||||
static int CitusNoticeLogLevel = DEFAULT_CITUS_NOTICE_LEVEL;
|
||||
|
@ -755,7 +757,7 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit)
|
|||
*/
|
||||
if (!connection->sessionLifespan ||
|
||||
PQstatus(connection->pgConn) != CONNECTION_OK ||
|
||||
PQtransactionStatus(connection->pgConn) != PQTRANS_IDLE)
|
||||
!RemoteTransactionIdle(connection))
|
||||
{
|
||||
ShutdownConnection(connection);
|
||||
|
||||
|
@ -779,6 +781,30 @@ AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* RemoteTransactionIdle function returns true if we manually
|
||||
* set flag on run_commands_on_session_level_connection_to_node to true to
|
||||
* force connection API keeping connection open or the status of the connection
|
||||
* is idle.
|
||||
*/
|
||||
static bool
|
||||
RemoteTransactionIdle(MultiConnection *connection)
|
||||
{
|
||||
/*
|
||||
* This is a very special case where we're running isolation tests on MX.
|
||||
* We don't care whether the transaction is idle or not when we're
|
||||
* running MX isolation tests. Thus, let the caller act as if the remote
|
||||
* transactions is idle.
|
||||
*/
|
||||
if (AllowNonIdleTransactionOnXactHandling())
|
||||
{
|
||||
return true;
|
||||
}
|
||||
|
||||
return PQtransactionStatus(connection->pgConn) == PQTRANS_IDLE;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* SetCitusNoticeProcessor sets the NoticeProcessor to DefaultCitusNoticeProcessor
|
||||
*/
|
||||
|
|
|
@ -39,6 +39,7 @@
|
|||
#include "distributed/pg_dist_partition.h"
|
||||
#include "distributed/placement_connection.h"
|
||||
#include "distributed/relation_access_tracking.h"
|
||||
#include "distributed/run_from_same_connection.h"
|
||||
#include "distributed/query_pushdown_planning.h"
|
||||
#include "distributed/query_stats.h"
|
||||
#include "distributed/remote_commands.h"
|
||||
|
@ -977,6 +978,27 @@ RegisterCitusConfigVariables(void)
|
|||
NodeConninfoGucCheckHook,
|
||||
NodeConninfoGucAssignHook,
|
||||
NULL);
|
||||
|
||||
DefineCustomIntVariable(
|
||||
"citus.isolation_test_session_remote_process_id",
|
||||
NULL,
|
||||
NULL,
|
||||
&IsolationTestSessionRemoteProcessID,
|
||||
-1, -1, INT_MAX,
|
||||
PGC_USERSET,
|
||||
GUC_NO_SHOW_ALL,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
DefineCustomIntVariable(
|
||||
"citus.isolation_test_session_process_id",
|
||||
NULL,
|
||||
NULL,
|
||||
&IsolationTestSessionProcessID,
|
||||
-1, -1, INT_MAX,
|
||||
PGC_USERSET,
|
||||
GUC_NO_SHOW_ALL,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
NormalizeWorkerListPath();
|
||||
|
||||
|
||||
|
|
|
@ -0,0 +1,214 @@
|
|||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* test/src/run_from_same_connection.c
|
||||
*
|
||||
* This file contains UDF to run consecutive commands on worker node from the
|
||||
* same connection. UDFs will be used to test MX functionalities in isolation
|
||||
* tests.
|
||||
*
|
||||
* Copyright (c) 2018, Citus Data, Inc.
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#include "postgres.h"
|
||||
#include "miscadmin.h"
|
||||
#include "libpq-fe.h"
|
||||
|
||||
#include "access/xact.h"
|
||||
#include "distributed/connection_management.h"
|
||||
#include "distributed/lock_graph.h"
|
||||
#include "distributed/master_protocol.h"
|
||||
#include "distributed/multi_logical_optimizer.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/remote_commands.h"
|
||||
#include "distributed/run_from_same_connection.h"
|
||||
#include "distributed/task_tracker.h"
|
||||
#include "distributed/version_compat.h"
|
||||
#include "executor/spi.h"
|
||||
#include "lib/stringinfo.h"
|
||||
#include "postmaster/postmaster.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/memutils.h"
|
||||
|
||||
|
||||
#define ALTER_CURRENT_PROCESS_ID \
|
||||
"ALTER SYSTEM SET citus.isolation_test_session_process_id TO %d"
|
||||
#define ALTER_CURRENT_WORKER_PROCESS_ID \
|
||||
"ALTER SYSTEM SET citus.isolation_test_session_remote_process_id TO %ld"
|
||||
#define GET_PROCESS_ID "SELECT process_id FROM get_current_transaction_id()"
|
||||
|
||||
|
||||
static bool allowNonIdleRemoteTransactionOnXactHandling = false;
|
||||
static MultiConnection *connection = NULL;
|
||||
|
||||
|
||||
/*
|
||||
* Config variables which will be used by isolation framework to check transactions
|
||||
* initiated from worker nodes.
|
||||
*/
|
||||
int IsolationTestSessionRemoteProcessID = -1;
|
||||
int IsolationTestSessionProcessID = -1;
|
||||
|
||||
|
||||
static int64 GetRemoteProcessId(MultiConnection *connection);
|
||||
|
||||
/* declarations for dynamic loading */
|
||||
PG_FUNCTION_INFO_V1(start_session_level_connection_to_node);
|
||||
PG_FUNCTION_INFO_V1(run_commands_on_session_level_connection_to_node);
|
||||
PG_FUNCTION_INFO_V1(stop_session_level_connection_to_node);
|
||||
|
||||
|
||||
/*
|
||||
* AllowNonIdleTransactionOnXactHandling allows connection opened with
|
||||
* SESSION_LIFESPAN remain opened even if it is not idle.
|
||||
*/
|
||||
bool
|
||||
AllowNonIdleTransactionOnXactHandling(void)
|
||||
{
|
||||
return allowNonIdleRemoteTransactionOnXactHandling;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* start_session_level_connection_to_node helps us to open and keep connections
|
||||
* open while sending consecutive commands, even if they are outside the transaction.
|
||||
* To use the connection opened with an open transaction, we have implemented a hacky
|
||||
* solution by setting a static flag, allowNonIdleRemoteTransactionOnXactHandling, on
|
||||
* this file to true. That gives us to chance to keep that connection open.
|
||||
*
|
||||
* Note that, this UDF shouldn't be used outside the isolation tests.
|
||||
*/
|
||||
Datum
|
||||
start_session_level_connection_to_node(PG_FUNCTION_ARGS)
|
||||
{
|
||||
text *nodeName = PG_GETARG_TEXT_P(0);
|
||||
uint32 nodePort = PG_GETARG_UINT32(1);
|
||||
char *nodeNameString = text_to_cstring(nodeName);
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
if (connection != NULL && (strcmp(connection->hostname, nodeNameString) != 0 ||
|
||||
connection->port != nodePort))
|
||||
{
|
||||
elog(ERROR,
|
||||
"can not connect different worker nodes from the same session using start_session_level_connection_to_node");
|
||||
}
|
||||
|
||||
/*
|
||||
* In order to keep connection open even with an open transaction,
|
||||
* allowSessionLifeSpanWithOpenTransaction is set to true.
|
||||
*/
|
||||
if (connection == NULL)
|
||||
{
|
||||
connection = GetNodeConnection(SESSION_LIFESPAN, nodeNameString, nodePort);
|
||||
allowNonIdleRemoteTransactionOnXactHandling = true;
|
||||
}
|
||||
|
||||
if (PQstatus(connection->pgConn) != CONNECTION_OK)
|
||||
{
|
||||
elog(ERROR, "failed to connect to %s:%d", nodeNameString, (int) nodePort);
|
||||
}
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* run_commands_on_session_level_connection_to_node runs to consecutive commands
|
||||
* from the same connection opened by start_session_level_connection_to_node.
|
||||
*
|
||||
* Since transactions can be initiated from worker nodes with MX, we need to
|
||||
* keep them open on the worker node to check whether there exist a waiting
|
||||
* transaction in test steps. In order to release the locks taken in the
|
||||
* transaction we need to send related unlock commands from the same connection
|
||||
* as well.
|
||||
*/
|
||||
Datum
|
||||
run_commands_on_session_level_connection_to_node(PG_FUNCTION_ARGS)
|
||||
{
|
||||
text *queryText = PG_GETARG_TEXT_P(0);
|
||||
char *queryString = text_to_cstring(queryText);
|
||||
|
||||
StringInfo processStringInfo = makeStringInfo();
|
||||
StringInfo workerProcessStringInfo = makeStringInfo();
|
||||
MultiConnection *localConnection = GetNodeConnection(0, LOCAL_HOST_NAME,
|
||||
PostPortNumber);
|
||||
Oid pgReloadConfOid = InvalidOid;
|
||||
|
||||
if (!connection)
|
||||
{
|
||||
elog(ERROR,
|
||||
"start_session_level_connection_to_node must be called first to open a session level connection");
|
||||
}
|
||||
|
||||
ExecuteCriticalRemoteCommand(connection, queryString);
|
||||
|
||||
appendStringInfo(processStringInfo, ALTER_CURRENT_PROCESS_ID, MyProcPid);
|
||||
appendStringInfo(workerProcessStringInfo, ALTER_CURRENT_WORKER_PROCESS_ID,
|
||||
GetRemoteProcessId(connection));
|
||||
|
||||
/*
|
||||
* Since we cannot run `ALTER SYSTEM` command within a transaction, we are
|
||||
* calling it from a self-connected session.
|
||||
*/
|
||||
ExecuteCriticalRemoteCommand(localConnection, processStringInfo->data);
|
||||
ExecuteCriticalRemoteCommand(localConnection, workerProcessStringInfo->data);
|
||||
|
||||
CloseConnection(localConnection);
|
||||
|
||||
/* Call pg_reload_conf UDF to update changed GUCs above on each backend */
|
||||
pgReloadConfOid = FunctionOid("pg_catalog", "pg_reload_conf", 0);
|
||||
OidFunctionCall0(pgReloadConfOid);
|
||||
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* stop_session_level_connection_to_node closes the connection opened by the
|
||||
* start_session_level_connection_to_node and set the flag to false which
|
||||
* allows connection API to keep connections with open transaction.
|
||||
*/
|
||||
Datum
|
||||
stop_session_level_connection_to_node(PG_FUNCTION_ARGS)
|
||||
{
|
||||
allowNonIdleRemoteTransactionOnXactHandling = false;
|
||||
|
||||
if (connection != NULL)
|
||||
{
|
||||
CloseConnection(connection);
|
||||
connection = NULL;
|
||||
}
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetRemoteProcessId() get the process id of remote transaction opened
|
||||
* by the connection.
|
||||
*/
|
||||
static int64
|
||||
GetRemoteProcessId(MultiConnection *connection)
|
||||
{
|
||||
StringInfo queryStringInfo = makeStringInfo();
|
||||
PGresult *result = NULL;
|
||||
int64 rowCount = 0;
|
||||
|
||||
appendStringInfo(queryStringInfo, GET_PROCESS_ID);
|
||||
|
||||
ExecuteOptionalRemoteCommand(connection, queryStringInfo->data, &result);
|
||||
|
||||
rowCount = PQntuples(result);
|
||||
|
||||
if (rowCount != 1)
|
||||
{
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
ClearResults(connection, false);
|
||||
|
||||
return ParseIntField(result, 0, 0);
|
||||
}
|
|
@ -18,12 +18,15 @@
|
|||
#include "catalog/pg_type.h"
|
||||
#include "datatype/timestamp.h"
|
||||
#include "distributed/backend_data.h"
|
||||
#include "distributed/connection_management.h"
|
||||
#include "distributed/listutils.h"
|
||||
#include "distributed/lock_graph.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/remote_commands.h"
|
||||
#include "distributed/transaction_identifier.h"
|
||||
#include "nodes/execnodes.h"
|
||||
#include "storage/ipc.h"
|
||||
#include "storage/lmgr.h"
|
||||
#include "storage/lwlock.h"
|
||||
#include "storage/proc.h"
|
||||
#include "storage/spin.h"
|
||||
|
@ -31,6 +34,9 @@
|
|||
#include "utils/timestamp.h"
|
||||
|
||||
|
||||
#define GET_ACTIVE_TRANSACTION_QUERY "SELECT * FROM get_all_active_transactions();"
|
||||
#define ACTIVE_TRANSACTION_COLUMN_COUNT 6
|
||||
|
||||
/*
|
||||
* Each backend's data reside in the shared memory
|
||||
* on the BackendManagementShmemData.
|
||||
|
@ -56,6 +62,9 @@ typedef struct BackendManagementShmemData
|
|||
} BackendManagementShmemData;
|
||||
|
||||
|
||||
static void StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc
|
||||
tupleDescriptor);
|
||||
static void CheckReturnSetInfo(ReturnSetInfo *returnSetInfo);
|
||||
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
|
||||
static BackendManagementShmemData *backendManagementShmemData = NULL;
|
||||
static BackendData *MyBackendData = NULL;
|
||||
|
@ -67,6 +76,7 @@ static size_t BackendManagementShmemSize(void);
|
|||
|
||||
PG_FUNCTION_INFO_V1(assign_distributed_transaction_id);
|
||||
PG_FUNCTION_INFO_V1(get_current_transaction_id);
|
||||
PG_FUNCTION_INFO_V1(get_global_active_transactions);
|
||||
PG_FUNCTION_INFO_V1(get_all_active_transactions);
|
||||
|
||||
|
||||
|
@ -180,6 +190,143 @@ get_current_transaction_id(PG_FUNCTION_ARGS)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* get_global_active_transactions returns all the available information about all
|
||||
* the active backends from each node of the cluster. If you call that function from
|
||||
* the coordinator, it will returns back active transaction from the coordinator as
|
||||
* well. Yet, if you call it from the worker, result won't include the transactions
|
||||
* on the coordinator node, since worker nodes do not aware of the coordinator.
|
||||
*/
|
||||
Datum
|
||||
get_global_active_transactions(PG_FUNCTION_ARGS)
|
||||
{
|
||||
ReturnSetInfo *returnSetInfo = (ReturnSetInfo *) fcinfo->resultinfo;
|
||||
TupleDesc tupleDescriptor = NULL;
|
||||
Tuplestorestate *tupleStore = NULL;
|
||||
MemoryContext perQueryContext = NULL;
|
||||
MemoryContext oldContext = NULL;
|
||||
List *workerNodeList = ActivePrimaryNodeList();
|
||||
ListCell *workerNodeCell = NULL;
|
||||
List *connectionList = NIL;
|
||||
ListCell *connectionCell = NULL;
|
||||
StringInfo queryToSend = makeStringInfo();
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
CheckReturnSetInfo(returnSetInfo);
|
||||
|
||||
/* build a tuple descriptor for our result type */
|
||||
if (get_call_result_type(fcinfo, NULL, &tupleDescriptor) != TYPEFUNC_COMPOSITE)
|
||||
{
|
||||
elog(ERROR, "return type must be a row type");
|
||||
}
|
||||
|
||||
appendStringInfo(queryToSend, GET_ACTIVE_TRANSACTION_QUERY);
|
||||
|
||||
perQueryContext = returnSetInfo->econtext->ecxt_per_query_memory;
|
||||
|
||||
oldContext = MemoryContextSwitchTo(perQueryContext);
|
||||
|
||||
tupleStore = tuplestore_begin_heap(true, false, work_mem);
|
||||
returnSetInfo->returnMode = SFRM_Materialize;
|
||||
returnSetInfo->setResult = tupleStore;
|
||||
returnSetInfo->setDesc = tupleDescriptor;
|
||||
|
||||
MemoryContextSwitchTo(oldContext);
|
||||
|
||||
/* add active transactions for local node */
|
||||
StoreAllActiveTransactions(tupleStore, tupleDescriptor);
|
||||
|
||||
/* open connections in parallel */
|
||||
foreach(workerNodeCell, workerNodeList)
|
||||
{
|
||||
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
|
||||
char *nodeName = workerNode->workerName;
|
||||
int nodePort = workerNode->workerPort;
|
||||
MultiConnection *connection = NULL;
|
||||
int connectionFlags = 0;
|
||||
|
||||
if (workerNode->groupId == GetLocalGroupId())
|
||||
{
|
||||
/* we already get these transactions via GetAllActiveTransactions() */
|
||||
continue;
|
||||
}
|
||||
|
||||
connection = StartNodeConnection(connectionFlags, nodeName, nodePort);
|
||||
|
||||
connectionList = lappend(connectionList, connection);
|
||||
}
|
||||
|
||||
FinishConnectionListEstablishment(connectionList);
|
||||
|
||||
/* send commands in parallel */
|
||||
foreach(connectionCell, connectionList)
|
||||
{
|
||||
MultiConnection *connection = (MultiConnection *) lfirst(connectionCell);
|
||||
int querySent = false;
|
||||
|
||||
querySent = SendRemoteCommand(connection, queryToSend->data);
|
||||
if (querySent == 0)
|
||||
{
|
||||
ReportConnectionError(connection, WARNING);
|
||||
}
|
||||
}
|
||||
|
||||
/* receive query results */
|
||||
foreach(connectionCell, connectionList)
|
||||
{
|
||||
MultiConnection *connection = (MultiConnection *) lfirst(connectionCell);
|
||||
PGresult *result = NULL;
|
||||
bool raiseInterrupts = true;
|
||||
Datum values[ACTIVE_TRANSACTION_COLUMN_COUNT];
|
||||
bool isNulls[ACTIVE_TRANSACTION_COLUMN_COUNT];
|
||||
int64 rowIndex = 0;
|
||||
int64 rowCount = 0;
|
||||
int64 colCount = 0;
|
||||
|
||||
result = GetRemoteCommandResult(connection, raiseInterrupts);
|
||||
if (!IsResponseOK(result))
|
||||
{
|
||||
ReportResultError(connection, result, WARNING);
|
||||
continue;
|
||||
}
|
||||
|
||||
rowCount = PQntuples(result);
|
||||
colCount = PQnfields(result);
|
||||
|
||||
/* Although it is not expected */
|
||||
if (colCount != ACTIVE_TRANSACTION_COLUMN_COUNT)
|
||||
{
|
||||
ereport(WARNING, (errmsg("unexpected number of columns from "
|
||||
"get_all_active_transactions")));
|
||||
continue;
|
||||
}
|
||||
|
||||
for (rowIndex = 0; rowIndex < rowCount; rowIndex++)
|
||||
{
|
||||
memset(values, 0, sizeof(values));
|
||||
memset(isNulls, false, sizeof(isNulls));
|
||||
|
||||
values[0] = ParseIntField(result, rowIndex, 0);
|
||||
values[1] = ParseIntField(result, rowIndex, 1);
|
||||
values[2] = ParseIntField(result, rowIndex, 2);
|
||||
values[3] = ParseBoolField(result, rowIndex, 3);
|
||||
values[4] = ParseIntField(result, rowIndex, 4);
|
||||
values[5] = ParseTimestampTzField(result, rowIndex, 5);
|
||||
|
||||
tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
|
||||
}
|
||||
|
||||
PQclear(result);
|
||||
ForgetResults(connection);
|
||||
}
|
||||
|
||||
/* clean up and return the tuplestore */
|
||||
tuplestore_donestoring(tupleStore);
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* get_all_active_transactions returns all the avaliable information about all
|
||||
* the active backends.
|
||||
|
@ -193,29 +340,8 @@ get_all_active_transactions(PG_FUNCTION_ARGS)
|
|||
MemoryContext perQueryContext = NULL;
|
||||
MemoryContext oldContext = NULL;
|
||||
|
||||
int backendIndex = 0;
|
||||
|
||||
Datum values[6];
|
||||
bool isNulls[6];
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
/* check to see if caller supports us returning a tuplestore */
|
||||
if (returnSetInfo == NULL || !IsA(returnSetInfo, ReturnSetInfo))
|
||||
{
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("set-valued function called in context " \
|
||||
"that cannot accept a set")));
|
||||
}
|
||||
|
||||
if (!(returnSetInfo->allowedModes & SFRM_Materialize))
|
||||
{
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("materialize mode required, but it is not " \
|
||||
"allowed in this context")));
|
||||
}
|
||||
CheckReturnSetInfo(returnSetInfo);
|
||||
|
||||
/* build a tuple descriptor for our result type */
|
||||
if (get_call_result_type(fcinfo, NULL, &tupleDescriptor) != TYPEFUNC_COMPOSITE)
|
||||
|
@ -233,6 +359,26 @@ get_all_active_transactions(PG_FUNCTION_ARGS)
|
|||
returnSetInfo->setDesc = tupleDescriptor;
|
||||
|
||||
MemoryContextSwitchTo(oldContext);
|
||||
StoreAllActiveTransactions(tupleStore, tupleDescriptor);
|
||||
|
||||
/* clean up and return the tuplestore */
|
||||
tuplestore_donestoring(tupleStore);
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* StoreAllActiveTransactions gets active transaction from the local node and inserts
|
||||
* them into the given tuplestore.
|
||||
*/
|
||||
static void
|
||||
StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor)
|
||||
{
|
||||
int backendIndex = 0;
|
||||
|
||||
Datum values[ACTIVE_TRANSACTION_COLUMN_COUNT];
|
||||
bool isNulls[ACTIVE_TRANSACTION_COLUMN_COUNT];
|
||||
|
||||
/*
|
||||
* We don't want to initialize memory while spinlock is held so we
|
||||
|
@ -288,11 +434,32 @@ get_all_active_transactions(PG_FUNCTION_ARGS)
|
|||
}
|
||||
|
||||
UnlockBackendSharedMemory();
|
||||
}
|
||||
|
||||
/* clean up and return the tuplestore */
|
||||
tuplestore_donestoring(tupleStore);
|
||||
|
||||
PG_RETURN_VOID();
|
||||
/*
|
||||
* CheckReturnSetInfo checks whether the defined given returnSetInfo is
|
||||
* proper for returning tuplestore.
|
||||
*/
|
||||
static void
|
||||
CheckReturnSetInfo(ReturnSetInfo *returnSetInfo)
|
||||
{
|
||||
/* check to see if caller supports us returning a tuplestore */
|
||||
if (returnSetInfo == NULL || !IsA(returnSetInfo, ReturnSetInfo))
|
||||
{
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("set-valued function called in context " \
|
||||
"that cannot accept a set")));
|
||||
}
|
||||
|
||||
if (!(returnSetInfo->allowedModes & SFRM_Materialize))
|
||||
{
|
||||
ereport(ERROR,
|
||||
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("materialize mode required, but it is not " \
|
||||
"allowed in this context")));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -15,7 +15,10 @@
|
|||
|
||||
#include "postgres.h"
|
||||
#include "libpq-fe.h"
|
||||
|
||||
#include "datatype/timestamp.h"
|
||||
#include "distributed/backend_data.h"
|
||||
#include "storage/lock.h"
|
||||
|
||||
|
||||
/*
|
||||
|
@ -58,6 +61,8 @@ typedef struct WaitGraph
|
|||
extern WaitGraph * BuildGlobalWaitGraph(void);
|
||||
extern bool IsProcessWaitingForLock(PGPROC *proc);
|
||||
extern bool IsInDistributedTransaction(BackendData *backendData);
|
||||
extern TimestampTz ParseTimestampTzField(PGresult *result, int rowIndex, int colIndex);
|
||||
extern int64 ParseIntField(PGresult *result, int rowIndex, int colIndex);
|
||||
|
||||
/* some utility function to parse results */
|
||||
extern int64 ParseIntField(PGresult *result, int rowIndex, int colIndex);
|
||||
|
|
|
@ -0,0 +1,23 @@
|
|||
/*
|
||||
* run_from_same_connection.h
|
||||
*
|
||||
* Sending commands from same connection to test transactions initiated from
|
||||
* worker nodes in the isolation framework.
|
||||
*
|
||||
* Copyright (c) 2018, Citus Data, Inc.
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#ifndef RUN_FROM_SAME_CONNECTION_H
|
||||
#define RUN_FROM_SAME_CONNECTION_H
|
||||
|
||||
/*
|
||||
* Config variables which will be used by isolation framework to check transactions
|
||||
* initiated from worker nodes.
|
||||
*/
|
||||
extern int IsolationTestSessionRemoteProcessID;
|
||||
extern int IsolationTestSessionProcessID;
|
||||
|
||||
bool AllowNonIdleTransactionOnXactHandling(void);
|
||||
|
||||
#endif /* RUN_FROM_SAME_CONNECTION_H */
|
|
@ -0,0 +1,616 @@
|
|||
Parsed test spec with 2 sessions
|
||||
|
||||
starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-update-ref-table s2-start-session-level-connection s2-begin-on-worker s2-update-ref-table s1-commit-worker s2-commit-worker s1-stop-connection s2-stop-connection
|
||||
step s1-start-session-level-connection:
|
||||
SELECT start_session_level_connection_to_node('localhost', 57637);
|
||||
|
||||
start_session_level_connection_to_node
|
||||
|
||||
|
||||
step s1-begin-on-worker:
|
||||
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s1-update-ref-table:
|
||||
SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-start-session-level-connection:
|
||||
SELECT start_session_level_connection_to_node('localhost', 57638);
|
||||
|
||||
start_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-begin-on-worker:
|
||||
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-update-ref-table:
|
||||
SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1');
|
||||
<waiting ...>
|
||||
step s1-commit-worker:
|
||||
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-update-ref-table: <... completed>
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-commit-worker:
|
||||
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s1-stop-connection:
|
||||
SELECT stop_session_level_connection_to_node();
|
||||
|
||||
stop_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-stop-connection:
|
||||
SELECT stop_session_level_connection_to_node();
|
||||
|
||||
stop_session_level_connection_to_node
|
||||
|
||||
|
||||
restore_isolation_tester_func
|
||||
|
||||
|
||||
|
||||
starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-delete-from-ref-table s2-start-session-level-connection s2-begin-on-worker s2-update-ref-table s1-commit-worker s2-commit-worker s1-stop-connection s2-stop-connection
|
||||
step s1-start-session-level-connection:
|
||||
SELECT start_session_level_connection_to_node('localhost', 57637);
|
||||
|
||||
start_session_level_connection_to_node
|
||||
|
||||
|
||||
step s1-begin-on-worker:
|
||||
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s1-delete-from-ref-table:
|
||||
SELECT run_commands_on_session_level_connection_to_node('DELETE FROM ref_table WHERE user_id = 1');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-start-session-level-connection:
|
||||
SELECT start_session_level_connection_to_node('localhost', 57638);
|
||||
|
||||
start_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-begin-on-worker:
|
||||
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-update-ref-table:
|
||||
SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1');
|
||||
<waiting ...>
|
||||
step s1-commit-worker:
|
||||
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-update-ref-table: <... completed>
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-commit-worker:
|
||||
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s1-stop-connection:
|
||||
SELECT stop_session_level_connection_to_node();
|
||||
|
||||
stop_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-stop-connection:
|
||||
SELECT stop_session_level_connection_to_node();
|
||||
|
||||
stop_session_level_connection_to_node
|
||||
|
||||
|
||||
restore_isolation_tester_func
|
||||
|
||||
|
||||
|
||||
starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-insert-into-ref-table s2-start-session-level-connection s2-begin-on-worker s2-update-ref-table s1-commit-worker s2-commit-worker s1-stop-connection s2-stop-connection
|
||||
step s1-start-session-level-connection:
|
||||
SELECT start_session_level_connection_to_node('localhost', 57637);
|
||||
|
||||
start_session_level_connection_to_node
|
||||
|
||||
|
||||
step s1-begin-on-worker:
|
||||
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s1-insert-into-ref-table:
|
||||
SELECT run_commands_on_session_level_connection_to_node('INSERT INTO ref_table VALUES(8,81),(9,91)');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-start-session-level-connection:
|
||||
SELECT start_session_level_connection_to_node('localhost', 57638);
|
||||
|
||||
start_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-begin-on-worker:
|
||||
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-update-ref-table:
|
||||
SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1');
|
||||
<waiting ...>
|
||||
step s1-commit-worker:
|
||||
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-update-ref-table: <... completed>
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-commit-worker:
|
||||
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s1-stop-connection:
|
||||
SELECT stop_session_level_connection_to_node();
|
||||
|
||||
stop_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-stop-connection:
|
||||
SELECT stop_session_level_connection_to_node();
|
||||
|
||||
stop_session_level_connection_to_node
|
||||
|
||||
|
||||
restore_isolation_tester_func
|
||||
|
||||
|
||||
|
||||
starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-insert-into-ref-table s2-start-session-level-connection s2-begin-on-worker s2-insert-into-ref-table s1-commit-worker s2-commit-worker s1-stop-connection s2-stop-connection
|
||||
step s1-start-session-level-connection:
|
||||
SELECT start_session_level_connection_to_node('localhost', 57637);
|
||||
|
||||
start_session_level_connection_to_node
|
||||
|
||||
|
||||
step s1-begin-on-worker:
|
||||
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s1-insert-into-ref-table:
|
||||
SELECT run_commands_on_session_level_connection_to_node('INSERT INTO ref_table VALUES(8,81),(9,91)');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-start-session-level-connection:
|
||||
SELECT start_session_level_connection_to_node('localhost', 57638);
|
||||
|
||||
start_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-begin-on-worker:
|
||||
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-insert-into-ref-table:
|
||||
SELECT run_commands_on_session_level_connection_to_node('INSERT INTO ref_table VALUES(8,81),(9,91)');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s1-commit-worker:
|
||||
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-commit-worker:
|
||||
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s1-stop-connection:
|
||||
SELECT stop_session_level_connection_to_node();
|
||||
|
||||
stop_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-stop-connection:
|
||||
SELECT stop_session_level_connection_to_node();
|
||||
|
||||
stop_session_level_connection_to_node
|
||||
|
||||
|
||||
restore_isolation_tester_func
|
||||
|
||||
|
||||
|
||||
starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-copy-to-ref-table s2-start-session-level-connection s2-begin-on-worker s2-update-ref-table s1-commit-worker s2-commit-worker s1-stop-connection s2-stop-connection
|
||||
step s1-start-session-level-connection:
|
||||
SELECT start_session_level_connection_to_node('localhost', 57637);
|
||||
|
||||
start_session_level_connection_to_node
|
||||
|
||||
|
||||
step s1-begin-on-worker:
|
||||
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s1-copy-to-ref-table:
|
||||
SELECT run_commands_on_session_level_connection_to_node('COPY ref_table FROM PROGRAM ''echo 10, 101 && echo 11, 111'' WITH CSV');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-start-session-level-connection:
|
||||
SELECT start_session_level_connection_to_node('localhost', 57638);
|
||||
|
||||
start_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-begin-on-worker:
|
||||
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-update-ref-table:
|
||||
SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1');
|
||||
<waiting ...>
|
||||
step s1-commit-worker:
|
||||
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-update-ref-table: <... completed>
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-commit-worker:
|
||||
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s1-stop-connection:
|
||||
SELECT stop_session_level_connection_to_node();
|
||||
|
||||
stop_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-stop-connection:
|
||||
SELECT stop_session_level_connection_to_node();
|
||||
|
||||
stop_session_level_connection_to_node
|
||||
|
||||
|
||||
restore_isolation_tester_func
|
||||
|
||||
|
||||
|
||||
starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-copy-to-ref-table s2-start-session-level-connection s2-begin-on-worker s2-insert-into-ref-table s1-commit-worker s2-commit-worker s1-stop-connection s2-stop-connection
|
||||
step s1-start-session-level-connection:
|
||||
SELECT start_session_level_connection_to_node('localhost', 57637);
|
||||
|
||||
start_session_level_connection_to_node
|
||||
|
||||
|
||||
step s1-begin-on-worker:
|
||||
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s1-copy-to-ref-table:
|
||||
SELECT run_commands_on_session_level_connection_to_node('COPY ref_table FROM PROGRAM ''echo 10, 101 && echo 11, 111'' WITH CSV');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-start-session-level-connection:
|
||||
SELECT start_session_level_connection_to_node('localhost', 57638);
|
||||
|
||||
start_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-begin-on-worker:
|
||||
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-insert-into-ref-table:
|
||||
SELECT run_commands_on_session_level_connection_to_node('INSERT INTO ref_table VALUES(8,81),(9,91)');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s1-commit-worker:
|
||||
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-commit-worker:
|
||||
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s1-stop-connection:
|
||||
SELECT stop_session_level_connection_to_node();
|
||||
|
||||
stop_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-stop-connection:
|
||||
SELECT stop_session_level_connection_to_node();
|
||||
|
||||
stop_session_level_connection_to_node
|
||||
|
||||
|
||||
restore_isolation_tester_func
|
||||
|
||||
|
||||
|
||||
starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-copy-to-ref-table s2-start-session-level-connection s2-begin-on-worker s2-copy-to-ref-table s1-commit-worker s2-commit-worker s1-stop-connection s2-stop-connection
|
||||
step s1-start-session-level-connection:
|
||||
SELECT start_session_level_connection_to_node('localhost', 57637);
|
||||
|
||||
start_session_level_connection_to_node
|
||||
|
||||
|
||||
step s1-begin-on-worker:
|
||||
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s1-copy-to-ref-table:
|
||||
SELECT run_commands_on_session_level_connection_to_node('COPY ref_table FROM PROGRAM ''echo 10, 101 && echo 11, 111'' WITH CSV');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-start-session-level-connection:
|
||||
SELECT start_session_level_connection_to_node('localhost', 57638);
|
||||
|
||||
start_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-begin-on-worker:
|
||||
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-copy-to-ref-table:
|
||||
SELECT run_commands_on_session_level_connection_to_node('COPY ref_table FROM PROGRAM ''echo 10, 101 && echo 11, 111'' WITH CSV');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s1-commit-worker:
|
||||
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-commit-worker:
|
||||
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s1-stop-connection:
|
||||
SELECT stop_session_level_connection_to_node();
|
||||
|
||||
stop_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-stop-connection:
|
||||
SELECT stop_session_level_connection_to_node();
|
||||
|
||||
stop_session_level_connection_to_node
|
||||
|
||||
|
||||
restore_isolation_tester_func
|
||||
|
||||
|
||||
|
||||
starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-select-for-update s2-start-session-level-connection s2-begin-on-worker s2-update-ref-table s1-commit-worker s2-commit-worker s1-stop-connection s2-stop-connection
|
||||
step s1-start-session-level-connection:
|
||||
SELECT start_session_level_connection_to_node('localhost', 57637);
|
||||
|
||||
start_session_level_connection_to_node
|
||||
|
||||
|
||||
step s1-begin-on-worker:
|
||||
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s1-select-for-update:
|
||||
SELECT run_commands_on_session_level_connection_to_node('SELECT * FROM ref_table FOR UPDATE');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-start-session-level-connection:
|
||||
SELECT start_session_level_connection_to_node('localhost', 57638);
|
||||
|
||||
start_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-begin-on-worker:
|
||||
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-update-ref-table:
|
||||
SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1');
|
||||
<waiting ...>
|
||||
step s1-commit-worker:
|
||||
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-update-ref-table: <... completed>
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-commit-worker:
|
||||
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s1-stop-connection:
|
||||
SELECT stop_session_level_connection_to_node();
|
||||
|
||||
stop_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-stop-connection:
|
||||
SELECT stop_session_level_connection_to_node();
|
||||
|
||||
stop_session_level_connection_to_node
|
||||
|
||||
|
||||
restore_isolation_tester_func
|
||||
|
||||
|
||||
|
||||
starting permutation: s2-start-session-level-connection s2-begin-on-worker s2-insert-into-ref-table s1-begin s1-alter-table s2-commit-worker s1-commit s2-stop-connection
|
||||
step s2-start-session-level-connection:
|
||||
SELECT start_session_level_connection_to_node('localhost', 57638);
|
||||
|
||||
start_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-begin-on-worker:
|
||||
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-insert-into-ref-table:
|
||||
SELECT run_commands_on_session_level_connection_to_node('INSERT INTO ref_table VALUES(8,81),(9,91)');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-alter-table:
|
||||
ALTER TABLE ref_table ADD CONSTRAINT rf_p_key PRIMARY KEY(user_id);
|
||||
<waiting ...>
|
||||
step s2-commit-worker:
|
||||
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s1-alter-table: <... completed>
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
step s2-stop-connection:
|
||||
SELECT stop_session_level_connection_to_node();
|
||||
|
||||
stop_session_level_connection_to_node
|
||||
|
||||
|
||||
restore_isolation_tester_func
|
||||
|
||||
|
||||
|
||||
starting permutation: s2-start-session-level-connection s2-begin-on-worker s2-select-from-ref-table s1-begin s1-alter-table s2-commit-worker s1-commit s2-stop-connection
|
||||
step s2-start-session-level-connection:
|
||||
SELECT start_session_level_connection_to_node('localhost', 57638);
|
||||
|
||||
start_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-begin-on-worker:
|
||||
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s2-select-from-ref-table:
|
||||
SELECT run_commands_on_session_level_connection_to_node('SELECT count(*) FROM ref_table');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-alter-table:
|
||||
ALTER TABLE ref_table ADD CONSTRAINT rf_p_key PRIMARY KEY(user_id);
|
||||
<waiting ...>
|
||||
step s2-commit-worker:
|
||||
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
|
||||
|
||||
run_commands_on_session_level_connection_to_node
|
||||
|
||||
|
||||
step s1-alter-table: <... completed>
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
step s2-stop-connection:
|
||||
SELECT stop_session_level_connection_to_node();
|
||||
|
||||
stop_session_level_connection_to_node
|
||||
|
||||
|
||||
restore_isolation_tester_func
|
||||
|
||||
|
|
@ -148,6 +148,7 @@ ALTER EXTENSION citus UPDATE TO '8.0-2';
|
|||
ALTER EXTENSION citus UPDATE TO '8.0-3';
|
||||
ALTER EXTENSION citus UPDATE TO '8.0-4';
|
||||
ALTER EXTENSION citus UPDATE TO '8.0-5';
|
||||
ALTER EXTENSION citus UPDATE TO '8.0-6';
|
||||
-- show running version
|
||||
SHOW citus.version;
|
||||
citus.version
|
||||
|
|
|
@ -43,3 +43,6 @@ test: isolation_truncate_vs_all
|
|||
test: isolation_drop_vs_all
|
||||
test: isolation_ddl_vs_all
|
||||
test: isolation_citus_dist_activity
|
||||
|
||||
# MX tests
|
||||
test: isolation_reference_on_mx
|
||||
|
|
|
@ -0,0 +1,160 @@
|
|||
# Create and use UDF to send commands from the same connection. Also make the cluster
|
||||
# ready for testing MX functionalities.
|
||||
setup
|
||||
{
|
||||
CREATE OR REPLACE FUNCTION start_session_level_connection_to_node(text, integer)
|
||||
RETURNS void
|
||||
LANGUAGE C STRICT VOLATILE
|
||||
AS 'citus', $$start_session_level_connection_to_node$$;
|
||||
|
||||
CREATE OR REPLACE FUNCTION run_commands_on_session_level_connection_to_node(text)
|
||||
RETURNS void
|
||||
LANGUAGE C STRICT VOLATILE
|
||||
AS 'citus', $$run_commands_on_session_level_connection_to_node$$;
|
||||
|
||||
CREATE OR REPLACE FUNCTION stop_session_level_connection_to_node()
|
||||
RETURNS void
|
||||
LANGUAGE C STRICT VOLATILE
|
||||
AS 'citus', $$stop_session_level_connection_to_node$$;
|
||||
|
||||
SELECT citus.replace_isolation_tester_func();
|
||||
SELECT citus.refresh_isolation_tester_prepared_statement();
|
||||
|
||||
SELECT start_metadata_sync_to_node('localhost', 57637);
|
||||
SELECT start_metadata_sync_to_node('localhost', 57638);
|
||||
SET citus.replication_model to streaming;
|
||||
|
||||
CREATE TABLE ref_table(user_id int, value_1 int);
|
||||
SELECT create_reference_table('ref_table');
|
||||
INSERT INTO ref_table VALUES (1, 11), (2, 21), (3, 31), (4, 41), (5, 51), (6, 61), (7, 71);
|
||||
}
|
||||
|
||||
# Create and use UDF to close the connection opened in the setup step. Also return the cluster
|
||||
# back to the initial state.
|
||||
teardown
|
||||
{
|
||||
DROP TABLE ref_table;
|
||||
SELECT citus.restore_isolation_tester_func();
|
||||
}
|
||||
|
||||
session "s1"
|
||||
|
||||
step "s1-begin"
|
||||
{
|
||||
BEGIN;
|
||||
}
|
||||
|
||||
# We do not need to begin a transaction on coordinator, since it will be open on workers.
|
||||
|
||||
step "s1-start-session-level-connection"
|
||||
{
|
||||
SELECT start_session_level_connection_to_node('localhost', 57637);
|
||||
}
|
||||
|
||||
step "s1-begin-on-worker"
|
||||
{
|
||||
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
|
||||
}
|
||||
|
||||
step "s1-update-ref-table"
|
||||
{
|
||||
SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1');
|
||||
}
|
||||
|
||||
step "s1-delete-from-ref-table"
|
||||
{
|
||||
SELECT run_commands_on_session_level_connection_to_node('DELETE FROM ref_table WHERE user_id = 1');
|
||||
}
|
||||
|
||||
step "s1-insert-into-ref-table"
|
||||
{
|
||||
SELECT run_commands_on_session_level_connection_to_node('INSERT INTO ref_table VALUES(8,81),(9,91)');
|
||||
}
|
||||
|
||||
step "s1-copy-to-ref-table"
|
||||
{
|
||||
SELECT run_commands_on_session_level_connection_to_node('COPY ref_table FROM PROGRAM ''echo 10, 101 && echo 11, 111'' WITH CSV');
|
||||
}
|
||||
|
||||
step "s1-select-for-update"
|
||||
{
|
||||
SELECT run_commands_on_session_level_connection_to_node('SELECT * FROM ref_table FOR UPDATE');
|
||||
}
|
||||
|
||||
step "s1-commit-worker"
|
||||
{
|
||||
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
|
||||
}
|
||||
|
||||
step "s1-alter-table"
|
||||
{
|
||||
ALTER TABLE ref_table ADD CONSTRAINT rf_p_key PRIMARY KEY(user_id);
|
||||
}
|
||||
|
||||
step "s1-stop-connection"
|
||||
{
|
||||
SELECT stop_session_level_connection_to_node();
|
||||
}
|
||||
|
||||
step "s1-commit"
|
||||
{
|
||||
COMMIT;
|
||||
}
|
||||
|
||||
session "s2"
|
||||
|
||||
step "s2-start-session-level-connection"
|
||||
{
|
||||
SELECT start_session_level_connection_to_node('localhost', 57638);
|
||||
}
|
||||
|
||||
step "s2-begin-on-worker"
|
||||
{
|
||||
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
|
||||
}
|
||||
|
||||
step "s2-update-ref-table"
|
||||
{
|
||||
SELECT run_commands_on_session_level_connection_to_node('UPDATE ref_table SET value_1 = 12 WHERE user_id = 1');
|
||||
}
|
||||
|
||||
step "s2-select-from-ref-table"
|
||||
{
|
||||
SELECT run_commands_on_session_level_connection_to_node('SELECT count(*) FROM ref_table');
|
||||
}
|
||||
|
||||
step "s2-delete-from-ref-table"
|
||||
{
|
||||
SELECT run_commands_on_session_level_connection_to_node('DELETE FROM ref_table WHERE user_id = 2');
|
||||
}
|
||||
|
||||
step "s2-insert-into-ref-table"
|
||||
{
|
||||
SELECT run_commands_on_session_level_connection_to_node('INSERT INTO ref_table VALUES(8,81),(9,91)');
|
||||
}
|
||||
|
||||
step "s2-copy-to-ref-table"
|
||||
{
|
||||
SELECT run_commands_on_session_level_connection_to_node('COPY ref_table FROM PROGRAM ''echo 10, 101 && echo 11, 111'' WITH CSV');
|
||||
}
|
||||
|
||||
step "s2-stop-connection"
|
||||
{
|
||||
SELECT stop_session_level_connection_to_node();
|
||||
}
|
||||
|
||||
step "s2-commit-worker"
|
||||
{
|
||||
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
|
||||
}
|
||||
|
||||
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-update-ref-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-update-ref-table" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
|
||||
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-delete-from-ref-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-update-ref-table" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
|
||||
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-insert-into-ref-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-update-ref-table" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
|
||||
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-insert-into-ref-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-insert-into-ref-table" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
|
||||
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-copy-to-ref-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-update-ref-table" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
|
||||
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-copy-to-ref-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-insert-into-ref-table" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
|
||||
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-copy-to-ref-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-copy-to-ref-table" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
|
||||
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-select-for-update" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-update-ref-table" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
|
||||
permutation "s2-start-session-level-connection" "s2-begin-on-worker" "s2-insert-into-ref-table" "s1-begin" "s1-alter-table" "s2-commit-worker" "s1-commit" "s2-stop-connection"
|
||||
permutation "s2-start-session-level-connection" "s2-begin-on-worker" "s2-select-from-ref-table" "s1-begin" "s1-alter-table" "s2-commit-worker" "s1-commit" "s2-stop-connection"
|
|
@ -148,6 +148,7 @@ ALTER EXTENSION citus UPDATE TO '8.0-2';
|
|||
ALTER EXTENSION citus UPDATE TO '8.0-3';
|
||||
ALTER EXTENSION citus UPDATE TO '8.0-4';
|
||||
ALTER EXTENSION citus UPDATE TO '8.0-5';
|
||||
ALTER EXTENSION citus UPDATE TO '8.0-6';
|
||||
|
||||
-- show running version
|
||||
SHOW citus.version;
|
||||
|
|
Loading…
Reference in New Issue