From 512d23934f3c99473485c704698812ab40d903d5 Mon Sep 17 00:00:00 2001 From: velioglu Date: Wed, 26 Sep 2018 14:35:39 +0300 Subject: [PATCH] Show router modify,select and real-time queries on MX views --- .../distributed/executor/citus_custom_scan.c | 6 +- .../executor/multi_router_executor.c | 5 + .../distributed/transaction/backend_data.c | 49 +++++- .../transaction/citus_dist_stat_activity.c | 149 +++++++++++++----- src/include/distributed/backend_data.h | 20 +++ .../isolation_citus_dist_activity.out | 13 ++ .../isolation_citus_dist_activity_0.out | 13 ++ .../isolation_distributed_transaction_id.out | 41 ++--- .../specs/isolation_citus_dist_activity.spec | 4 +- .../isolation_distributed_transaction_id.spec | 20 ++- 10 files changed, 231 insertions(+), 89 deletions(-) diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index ec3278c52..b660abd9f 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -12,6 +12,7 @@ #include "miscadmin.h" #include "commands/copy.h" +#include "distributed/backend_data.h" #include "distributed/citus_custom_scan.h" #include "distributed/insert_select_executor.h" #include "distributed/insert_select_planner.h" @@ -250,12 +251,13 @@ DelayedErrorCreateScan(CustomScan *scan) /* - * CitusSelectBeginScan is an empty function for BeginCustomScan callback. + * CitusSelectBeginScan sets the coordinator backend initiated by Citus for queries using + * that function as the BeginCustomScan callback. */ static void CitusSelectBeginScan(CustomScanState *node, EState *estate, int eflags) { - /* just an empty function */ + MarkCitusInitiatedCoordinatorBackend(); } diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index aa03caee1..32f62223a 100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -26,6 +26,7 @@ #include "access/tupdesc.h" #include "access/xact.h" #include "catalog/pg_type.h" +#include "distributed/backend_data.h" #include "distributed/citus_clauses.h" #include "distributed/citus_ruleutils.h" #include "distributed/connection_management.h" @@ -471,6 +472,8 @@ RequiresConsistentSnapshot(Task *task) * * The function also checks the validity of the given custom scan node and * gets locks on the shards involved in the task list of the distributed plan. + * + * It also sets the backend as initiated by Citus. */ void CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags) @@ -481,6 +484,8 @@ CitusModifyBeginScan(CustomScanState *node, EState *estate, int eflags) Query *jobQuery = NULL; List *taskList = NIL; + MarkCitusInitiatedCoordinatorBackend(); + /* * We must not change the distributed plan since it may be reused across multiple * executions of a prepared statement. Instead we create a deep copy that we only diff --git a/src/backend/distributed/transaction/backend_data.c b/src/backend/distributed/transaction/backend_data.c index 1f30878b9..7f89b805d 100644 --- a/src/backend/distributed/transaction/backend_data.c +++ b/src/backend/distributed/transaction/backend_data.c @@ -65,6 +65,7 @@ typedef struct BackendManagementShmemData static void StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor); static void CheckReturnSetInfo(ReturnSetInfo *returnSetInfo); + static shmem_startup_hook_type prev_shmem_startup_hook = NULL; static BackendManagementShmemData *backendManagementShmemData = NULL; static BackendData *MyBackendData = NULL; @@ -124,6 +125,10 @@ assign_distributed_transaction_id(PG_FUNCTION_ARGS) MyBackendData->transactionId.timestamp = PG_GETARG_TIMESTAMPTZ(2); MyBackendData->transactionId.transactionOriginator = false; + MyBackendData->citusBackend.initiatorNodeIdentifier = + MyBackendData->transactionId.initiatorNodeIdentifier; + MyBackendData->citusBackend.transactionOriginator = false; + SpinLockRelease(&MyBackendData->mutex); PG_RETURN_VOID(); @@ -399,8 +404,8 @@ StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc tupleDescripto SpinLockAcquire(¤tBackend->mutex); - /* we're only interested in active backends */ - if (currentBackend->transactionId.transactionNumber == 0) + /* we're only interested in backends initiated by Citus */ + if (currentBackend->citusBackend.initiatorNodeIdentifier < 0) { SpinLockRelease(¤tBackend->mutex); continue; @@ -408,13 +413,17 @@ StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc tupleDescripto values[0] = ObjectIdGetDatum(currentBackend->databaseId); values[1] = Int32GetDatum(ProcGlobal->allProcs[backendIndex].pid); - values[2] = Int32GetDatum(currentBackend->transactionId.initiatorNodeIdentifier); + values[2] = Int32GetDatum(currentBackend->citusBackend.initiatorNodeIdentifier); /* * We prefer to use worker_query instead of transactionOriginator in the user facing * functions since its more intuitive. Thus, we negate the result before returning. + * + * We prefer to use citusBackend's transactionOriginator field over transactionId's + * field with the same name. The reason is that it also covers backends that are not + * inside a distributed transaction. */ - coordinatorOriginatedQuery = currentBackend->transactionId.transactionOriginator; + coordinatorOriginatedQuery = currentBackend->citusBackend.transactionOriginator; values[3] = !coordinatorOriginatedQuery; values[4] = UInt64GetDatum(currentBackend->transactionId.transactionNumber); @@ -544,9 +553,14 @@ BackendManagementShmemInit(void) * starts its execution. Note that we initialize TotalProcs (e.g., not * MaxBackends) since some of the blocking processes could be prepared * transactions, which aren't covered by MaxBackends. + * + * We also initiate initiatorNodeIdentifier to -1, which can never be + * used as a node id. */ for (backendIndex = 0; backendIndex < TotalProcs; ++backendIndex) { + backendManagementShmemData->backends[backendIndex].citusBackend. + initiatorNodeIdentifier = -1; SpinLockInit(&backendManagementShmemData->backends[backendIndex].mutex); } } @@ -633,6 +647,9 @@ UnSetDistributedTransactionId(void) MyBackendData->transactionId.transactionNumber = 0; MyBackendData->transactionId.timestamp = 0; + MyBackendData->citusBackend.initiatorNodeIdentifier = -1; + MyBackendData->citusBackend.transactionOriginator = false; + SpinLockRelease(&MyBackendData->mutex); } } @@ -702,7 +719,7 @@ GetCurrentDistributedTransactionId(void) * processId fields. * * This function should only be called on BeginCoordinatedTransaction(). Any other - * callers is very likely to break the distributed transction management. + * callers is very likely to break the distributed transaction management. */ void AssignDistributedTransactionId(void) @@ -720,10 +737,28 @@ AssignDistributedTransactionId(void) MyBackendData->transactionId.initiatorNodeIdentifier = localGroupId; MyBackendData->transactionId.transactionOriginator = true; - MyBackendData->transactionId.transactionNumber = - nextTransactionNumber; + MyBackendData->transactionId.transactionNumber = nextTransactionNumber; MyBackendData->transactionId.timestamp = currentTimestamp; + MyBackendData->citusBackend.initiatorNodeIdentifier = localGroupId; + MyBackendData->citusBackend.transactionOriginator = true; + + SpinLockRelease(&MyBackendData->mutex); +} + + +/* + * MarkCitusInitiatedCoordinatorBackend sets that coordinator backend is + * initiated by Citus. + */ +void +MarkCitusInitiatedCoordinatorBackend(void) +{ + SpinLockAcquire(&MyBackendData->mutex); + + MyBackendData->citusBackend.initiatorNodeIdentifier = GetLocalGroupId(); + MyBackendData->citusBackend.transactionOriginator = true; + SpinLockRelease(&MyBackendData->mutex); } diff --git a/src/backend/distributed/transaction/citus_dist_stat_activity.c b/src/backend/distributed/transaction/citus_dist_stat_activity.c index 9952facda..8bbbd42a2 100644 --- a/src/backend/distributed/transaction/citus_dist_stat_activity.c +++ b/src/backend/distributed/transaction/citus_dist_stat_activity.c @@ -154,12 +154,48 @@ pg_stat_activity.query, \ pg_stat_activity.backend_type \ FROM \ - get_all_active_transactions() AS dist_txs(database_id, process_id, initiator_node_identifier, worker_query, transaction_number, transaction_stamp), \ pg_stat_activity \ + INNER JOIN \ + get_all_active_transactions() AS dist_txs(database_id, process_id, initiator_node_identifier, worker_query, transaction_number, transaction_stamp) \ + ON pg_stat_activity.pid = dist_txs.process_id \ WHERE \ - pg_stat_activity.pid = dist_txs.process_id \ - AND \ - dist_txs.worker_query = %s; " + dist_txs.worker_query = false;" + + #define CITUS_WORKER_STAT_ACTIVITY_QUERY \ + "\ + SELECT \ + dist_txs.initiator_node_identifier, \ + dist_txs.transaction_number, \ + dist_txs.transaction_stamp, \ + pg_stat_activity.datid, \ + pg_stat_activity.datname, \ + pg_stat_activity.pid, \ + pg_stat_activity.usesysid, \ + pg_stat_activity.usename, \ + pg_stat_activity.application_name, \ + pg_stat_activity.client_addr, \ + pg_stat_activity.client_hostname, \ + pg_stat_activity.client_port, \ + pg_stat_activity.backend_start, \ + pg_stat_activity.xact_start, \ + pg_stat_activity.query_start, \ + pg_stat_activity.state_change, \ + pg_stat_activity.wait_event_type, \ + pg_stat_activity.wait_event, \ + pg_stat_activity.state, \ + pg_stat_activity.backend_xid, \ + pg_stat_activity.backend_xmin, \ + pg_stat_activity.query, \ + pg_stat_activity.backend_type \ + FROM \ + pg_stat_activity \ + LEFT JOIN \ + get_all_active_transactions() AS dist_txs(database_id, process_id, initiator_node_identifier, worker_query, transaction_number, transaction_stamp) \ + ON pg_stat_activity.pid = dist_txs.process_id \ + WHERE \ + pg_stat_activity.application_name = 'citus' \ + AND \ + pg_stat_activity.query NOT ILIKE '%stat_activity%';" #else #define CITUS_DIST_STAT_ACTIVITY_QUERY \ "\ @@ -188,12 +224,48 @@ pg_stat_activity.query, \ null \ FROM \ - get_all_active_transactions() AS dist_txs(database_id, process_id, initiator_node_identifier, worker_query, transaction_number, transaction_stamp), \ pg_stat_activity \ + INNER JOIN \ + get_all_active_transactions() AS dist_txs(database_id, process_id, initiator_node_identifier, worker_query, transaction_number, transaction_stamp) \ + ON pg_stat_activity.pid = dist_txs.process_id \ WHERE \ - pg_stat_activity.pid = dist_txs.process_id \ - AND \ - dist_txs.worker_query = %s; " + dist_txs.worker_query = false;" + + #define CITUS_WORKER_STAT_ACTIVITY_QUERY \ + "\ + SELECT \ + dist_txs.initiator_node_identifier, \ + dist_txs.transaction_number, \ + dist_txs.transaction_stamp, \ + pg_stat_activity.datid, \ + pg_stat_activity.datname, \ + pg_stat_activity.pid, \ + pg_stat_activity.usesysid, \ + pg_stat_activity.usename, \ + pg_stat_activity.application_name, \ + pg_stat_activity.client_addr, \ + pg_stat_activity.client_hostname, \ + pg_stat_activity.client_port, \ + pg_stat_activity.backend_start, \ + pg_stat_activity.xact_start, \ + pg_stat_activity.query_start, \ + pg_stat_activity.state_change, \ + pg_stat_activity.wait_event_type, \ + pg_stat_activity.wait_event, \ + pg_stat_activity.state, \ + pg_stat_activity.backend_xid, \ + pg_stat_activity.backend_xmin, \ + pg_stat_activity.query, \ + null \ + FROM \ + pg_stat_activity \ + LEFT JOIN \ + get_all_active_transactions() AS dist_txs(database_id, process_id, initiator_node_identifier, worker_query, transaction_number, transaction_stamp) \ + ON pg_stat_activity.pid = dist_txs.process_id \ + WHERE \ + pg_stat_activity.application_name = 'citus' \ + AND \ + pg_stat_activity.query NOT ILIKE '%stat_activity%';" #endif @@ -232,7 +304,7 @@ typedef struct CitusDistStat /* local forward declarations */ -static List * CitusDistStatActivity(const char *statQuery); +static List * CitusStatActivity(const char *statQuery); static void ReturnCitusDistStats(List *citusStatsList, FunctionCallInfo fcinfo); static CitusDistStat * ParseCitusDistStat(PGresult *result, int64 rowIndex); @@ -273,17 +345,10 @@ Datum citus_dist_stat_activity(PG_FUNCTION_ARGS) { List *citusDistStatStatements = NIL; - StringInfo citusDistStatQuery = NULL; - const char *workerQuery = "false"; CheckCitusVersion(ERROR); - /* set the workerQuery to false in the query */ - citusDistStatQuery = makeStringInfo(); - appendStringInfo(citusDistStatQuery, CITUS_DIST_STAT_ACTIVITY_QUERY, - workerQuery); - - citusDistStatStatements = CitusDistStatActivity(citusDistStatQuery->data); + citusDistStatStatements = CitusStatActivity(CITUS_DIST_STAT_ACTIVITY_QUERY); ReturnCitusDistStats(citusDistStatStatements, fcinfo); @@ -300,17 +365,10 @@ Datum citus_worker_stat_activity(PG_FUNCTION_ARGS) { List *citusWorkerStatStatements = NIL; - StringInfo cituWorkerStatQuery = NULL; - const char *workerQuery = "true"; CheckCitusVersion(ERROR); - /* set the workerQuery to true in the query */ - cituWorkerStatQuery = makeStringInfo(); - appendStringInfo(cituWorkerStatQuery, CITUS_DIST_STAT_ACTIVITY_QUERY, - workerQuery); - - citusWorkerStatStatements = CitusDistStatActivity(cituWorkerStatQuery->data); + citusWorkerStatStatements = CitusStatActivity(CITUS_WORKER_STAT_ACTIVITY_QUERY); ReturnCitusDistStats(citusWorkerStatStatements, fcinfo); @@ -319,7 +377,7 @@ citus_worker_stat_activity(PG_FUNCTION_ARGS) /* - * CitusDistStatActivity gets the stats query, connects to each node in the + * CitusStatActivity gets the stats query, connects to each node in the * cluster, executes the query and parses the results. The function returns * list of CitusDistStat struct for further processing. * @@ -330,7 +388,7 @@ citus_worker_stat_activity(PG_FUNCTION_ARGS) * executed on the coordinator given that there is not metadata information about that. */ static List * -CitusDistStatActivity(const char *statQuery) +CitusStatActivity(const char *statQuery) { List *citusStatsList = NIL; @@ -521,9 +579,14 @@ ParseCitusDistStat(PGresult *result, int64 rowIndex) * - If the initiator_node_identifier belongs to the coordinator and * we're executing the function on a worker node, manually mark it * as "coordinator_host" given that we cannot know the host and port + * - If the initiator_node_identifier doesn't equal to zero, we know that + * it is a worker query initiated outside of a distributed + * transaction. However, we cannot know which node has initiated + * the worker query. */ - initiator_node_identifier = ParseIntField(result, rowIndex, 0); - if (initiator_node_identifier != 0) + initiator_node_identifier = + PQgetisnull(result, rowIndex, 0) ? -1 : ParseIntField(result, rowIndex, 0); + if (initiator_node_identifier > 0) { bool nodeExists = false; @@ -540,13 +603,14 @@ ParseCitusDistStat(PGresult *result, int64 rowIndex) citusDistStat->master_query_host_name = cstring_to_text(coordinator_host_name); citusDistStat->master_query_host_port = PostPortNumber; } + else if (initiator_node_identifier == 0) + { + citusDistStat->master_query_host_name = cstring_to_text(coordinator_host_name); + citusDistStat->master_query_host_port = 0; + } else { - /* - * We could only get here if the function is called from metadata workers and - * the query is initiated from the coordinator. - */ - citusDistStat->master_query_host_name = cstring_to_text(coordinator_host_name); + citusDistStat->master_query_host_name = NULL; citusDistStat->master_query_host_port = 0; } @@ -686,9 +750,13 @@ HeapTupleToCitusDistStat(HeapTuple result, TupleDesc rowDescriptor) * - If the initiator_node_identifier belongs to the coordinator and * we're executing the function on a worker node, manually mark it * as "coordinator_host" given that we cannot know the host and port + * - If the initiator_node_identifier doesn't equal to zero, we know that + * it is a worker query initiated outside of a distributed + * transaction. However, we cannot know which node has initiated + * the worker query. */ initiator_node_identifier = ParseIntFieldFromHeapTuple(result, rowDescriptor, 1); - if (initiator_node_identifier != 0) + if (initiator_node_identifier > 0) { bool nodeExists = false; @@ -705,13 +773,14 @@ HeapTupleToCitusDistStat(HeapTuple result, TupleDesc rowDescriptor) citusDistStat->master_query_host_name = cstring_to_text(coordinator_host_name); citusDistStat->master_query_host_port = PostPortNumber; } + else if (initiator_node_identifier == 0) + { + citusDistStat->master_query_host_name = cstring_to_text(coordinator_host_name); + citusDistStat->master_query_host_port = 0; + } else { - /* - * We could only get here if the function is called from metadata workers and - * the query is initiated from the coordinator. - */ - citusDistStat->master_query_host_name = cstring_to_text(coordinator_host_name); + citusDistStat->master_query_host_name = NULL; citusDistStat->master_query_host_port = 0; } diff --git a/src/include/distributed/backend_data.h b/src/include/distributed/backend_data.h index 8a4d64080..4a6eae152 100644 --- a/src/include/distributed/backend_data.h +++ b/src/include/distributed/backend_data.h @@ -25,15 +25,34 @@ #define TotalProcs (MaxBackends + NUM_AUXILIARY_PROCS + max_prepared_xacts) +/* + * CitusInitiatedBackend keeps some information about the backends that are + * initiated by Citus. + */ +typedef struct CitusInitiatedBackend +{ + int initiatorNodeIdentifier; + bool transactionOriginator; +} CitusInitiatedBackend; + + /* * Each backend's active distributed transaction information is tracked via * BackendData in shared memory. + * + * DistributedTransactionId already has the same fields that CitusInitiatedBackend + * has. However, we prefer to keep them seperate since CitusInitiatedBackend is a + * broader concept which covers the backends that are not initiated via a distributed + * transaction as well. In other words, we could have backends that + * CitusInitiatedBackend is set but DistributedTransactionId is not set such as an + * "INSERT" query which is not inside a transaction block. */ typedef struct BackendData { Oid databaseId; slock_t mutex; bool cancelledDueToDeadlock; + CitusInitiatedBackend citusBackend; DistributedTransactionId transactionId; } BackendData; @@ -44,6 +63,7 @@ extern void LockBackendSharedMemory(LWLockMode lockMode); extern void UnlockBackendSharedMemory(void); extern void UnSetDistributedTransactionId(void); extern void AssignDistributedTransactionId(void); +extern void MarkCitusInitiatedCoordinatorBackend(void); extern void GetBackendDataForProc(PGPROC *proc, BackendData *result); extern void CancelTransactionDueToDeadlock(PGPROC *proc); extern bool MyBackendGotCancelledDueToDeadlock(void); diff --git a/src/test/regress/expected/isolation_citus_dist_activity.out b/src/test/regress/expected/isolation_citus_dist_activity.out index c9284c513..1c83fb29a 100644 --- a/src/test/regress/expected/isolation_citus_dist_activity.out +++ b/src/test/regress/expected/isolation_citus_dist_activity.out @@ -42,6 +42,8 @@ SELECT worker_apply_shard_ddl_command (105831, 'public', ' SELECT worker_apply_shard_ddl_command (105830, 'public', ' ALTER TABLE test_table ADD COLUMN x INT; ')localhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression +SELECT gid FROM pg_prepared_xacts WHERE gid LIKE 'citus\_0\_%'localhost 57638 0 idle Client ClientRead postgres regression +SELECT gid FROM pg_prepared_xacts WHERE gid LIKE 'citus\_0\_%'localhost 57637 0 idle Client ClientRead postgres regression step s2-rollback: ROLLBACK; @@ -82,6 +84,8 @@ step s3-view-worker: query query_hostname query_hostport master_query_host_namemaster_query_host_portstate wait_event_typewait_event usename datname +SELECT gid FROM pg_prepared_xacts WHERE gid LIKE 'citus\_0\_%'localhost 57638 0 idle Client ClientRead postgres regression +SELECT gid FROM pg_prepared_xacts WHERE gid LIKE 'citus\_0\_%'localhost 57637 0 idle Client ClientRead postgres regression INSERT INTO public.test_table_105836 (column1, column2) VALUES (100, 100)localhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression step s2-rollback: ROLLBACK; @@ -126,6 +130,8 @@ step s3-view-worker: query query_hostname query_hostport master_query_host_namemaster_query_host_portstate wait_event_typewait_event usename datname +SELECT gid FROM pg_prepared_xacts WHERE gid LIKE 'citus\_0\_%'localhost 57638 0 idle Client ClientRead postgres regression +SELECT gid FROM pg_prepared_xacts WHERE gid LIKE 'citus\_0\_%'localhost 57637 0 idle Client ClientRead postgres regression COPY (SELECT count(*) AS count FROM test_table_105841 test_table WHERE true) TO STDOUTlocalhost 57638 coordinator_host57636 idle in transactionClient ClientRead postgres regression COPY (SELECT count(*) AS count FROM test_table_105840 test_table WHERE true) TO STDOUTlocalhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression COPY (SELECT count(*) AS count FROM test_table_105839 test_table WHERE true) TO STDOUTlocalhost 57638 coordinator_host57636 idle in transactionClient ClientRead postgres regression @@ -165,11 +171,18 @@ step s2-view-dist: query query_hostname query_hostport master_query_host_namemaster_query_host_portstate wait_event_typewait_event usename datname + + SELECT count(*) FROM test_table WHERE column1 = 55; +coordinator_host57636 coordinator_host57636 idle in transactionClient ClientRead postgres regression step s3-view-worker: SELECT query, query_hostname, query_hostport, master_query_host_name, master_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_worker_stat_activity ORDER BY query DESC; query query_hostname query_hostport master_query_host_namemaster_query_host_portstate wait_event_typewait_event usename datname +SELECT gid FROM pg_prepared_xacts WHERE gid LIKE 'citus\_0\_%'localhost 57638 0 idle Client ClientRead postgres regression +SELECT gid FROM pg_prepared_xacts WHERE gid LIKE 'citus\_0\_%'localhost 57637 0 idle Client ClientRead postgres regression +SELECT count(*) AS count FROM public.test_table_105843 test_table WHERE (column1 OPERATOR(pg_catalog.=) 55)localhost 57638 0 idle Client ClientRead postgres regression +COMMIT localhost 57637 0 idle Client ClientRead postgres regression step s2-rollback: ROLLBACK; diff --git a/src/test/regress/expected/isolation_citus_dist_activity_0.out b/src/test/regress/expected/isolation_citus_dist_activity_0.out index 66ebd59a6..ad4c30f9e 100644 --- a/src/test/regress/expected/isolation_citus_dist_activity_0.out +++ b/src/test/regress/expected/isolation_citus_dist_activity_0.out @@ -42,6 +42,8 @@ SELECT worker_apply_shard_ddl_command (105295, 'public', ' SELECT worker_apply_shard_ddl_command (105294, 'public', ' ALTER TABLE test_table ADD COLUMN x INT; ')localhost 57637 coordinator_host57636 idle in transaction postgres regression +SELECT gid FROM pg_prepared_xacts WHERE gid LIKE 'citus\_0\_%'localhost 57638 0 idle postgres regression +SELECT gid FROM pg_prepared_xacts WHERE gid LIKE 'citus\_0\_%'localhost 57637 0 idle postgres regression step s2-rollback: ROLLBACK; @@ -82,6 +84,8 @@ step s3-view-worker: query query_hostname query_hostport master_query_host_namemaster_query_host_portstate wait_event_typewait_event usename datname +SELECT gid FROM pg_prepared_xacts WHERE gid LIKE 'citus\_0\_%'localhost 57638 0 idle postgres regression +SELECT gid FROM pg_prepared_xacts WHERE gid LIKE 'citus\_0\_%'localhost 57637 0 idle postgres regression INSERT INTO public.test_table_105300 (column1, column2) VALUES (100, 100)localhost 57637 coordinator_host57636 idle in transaction postgres regression step s2-rollback: ROLLBACK; @@ -126,6 +130,8 @@ step s3-view-worker: query query_hostname query_hostport master_query_host_namemaster_query_host_portstate wait_event_typewait_event usename datname +SELECT gid FROM pg_prepared_xacts WHERE gid LIKE 'citus\_0\_%'localhost 57638 0 idle postgres regression +SELECT gid FROM pg_prepared_xacts WHERE gid LIKE 'citus\_0\_%'localhost 57637 0 idle postgres regression COPY (SELECT count(*) AS count FROM test_table_105305 test_table WHERE true) TO STDOUTlocalhost 57638 coordinator_host57636 idle in transaction postgres regression COPY (SELECT count(*) AS count FROM test_table_105304 test_table WHERE true) TO STDOUTlocalhost 57637 coordinator_host57636 idle in transaction postgres regression COPY (SELECT count(*) AS count FROM test_table_105303 test_table WHERE true) TO STDOUTlocalhost 57638 coordinator_host57636 idle in transaction postgres regression @@ -165,11 +171,18 @@ step s2-view-dist: query query_hostname query_hostport master_query_host_namemaster_query_host_portstate wait_event_typewait_event usename datname + + SELECT count(*) FROM test_table WHERE column1 = 55; +coordinator_host57636 coordinator_host57636 idle in transaction postgres regression step s3-view-worker: SELECT query, query_hostname, query_hostport, master_query_host_name, master_query_host_port, state, wait_event_type, wait_event, usename, datname FROM citus_worker_stat_activity ORDER BY query DESC; query query_hostname query_hostport master_query_host_namemaster_query_host_portstate wait_event_typewait_event usename datname +SELECT gid FROM pg_prepared_xacts WHERE gid LIKE 'citus\_0\_%'localhost 57638 0 idle postgres regression +SELECT gid FROM pg_prepared_xacts WHERE gid LIKE 'citus\_0\_%'localhost 57637 0 idle postgres regression +SELECT count(*) AS count FROM public.test_table_105307 test_table WHERE (column1 OPERATOR(pg_catalog.=) 55)localhost 57638 0 idle postgres regression +COMMIT localhost 57637 0 idle postgres regression step s2-rollback: ROLLBACK; diff --git a/src/test/regress/expected/isolation_distributed_transaction_id.out b/src/test/regress/expected/isolation_distributed_transaction_id.out index 17c71b9ed..f60650315 100644 --- a/src/test/regress/expected/isolation_distributed_transaction_id.out +++ b/src/test/regress/expected/isolation_distributed_transaction_id.out @@ -1,6 +1,6 @@ -Parsed test spec with 4 sessions +Parsed test spec with 3 sessions -starting permutation: s1-begin s1-assign-transaction-id s4-get-all-transactions s2-begin s2-assign-transaction-id s4-get-all-transactions s3-begin s3-assign-transaction-id s4-get-all-transactions s1-commit s4-get-all-transactions s2-commit s4-get-all-transactions s3-commit s4-get-all-transactions +starting permutation: s1-begin s1-assign-transaction-id s1-get-all-transactions s2-begin s2-assign-transaction-id s2-get-all-transactions s3-begin s3-assign-transaction-id s3-get-all-transactions s1-commit s2-commit s3-commit step s1-begin: BEGIN; @@ -10,8 +10,8 @@ step s1-assign-transaction-id: assign_distributed_transaction_id -step s4-get-all-transactions: - SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_all_active_transactions() ORDER BY 1,2,3; +step s1-get-all-transactions: + SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_current_transaction_id() ORDER BY 1,2,3; initiator_node_identifiertransaction_numbertransaction_stamp @@ -25,12 +25,11 @@ step s2-assign-transaction-id: assign_distributed_transaction_id -step s4-get-all-transactions: - SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_all_active_transactions() ORDER BY 1,2,3; +step s2-get-all-transactions: + SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_current_transaction_id() ORDER BY 1,2,3; initiator_node_identifiertransaction_numbertransaction_stamp -1 1 Wed Dec 31 16:00:00 2014 PST 2 2 Thu Jan 01 16:00:00 2015 PST step s3-begin: BEGIN; @@ -41,41 +40,21 @@ step s3-assign-transaction-id: assign_distributed_transaction_id -step s4-get-all-transactions: - SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_all_active_transactions() ORDER BY 1,2,3; +step s3-get-all-transactions: + SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_current_transaction_id() ORDER BY 1,2,3; initiator_node_identifiertransaction_numbertransaction_stamp -1 1 Wed Dec 31 16:00:00 2014 PST -2 2 Thu Jan 01 16:00:00 2015 PST 3 3 Fri Jan 02 16:00:00 2015 PST step s1-commit: COMMIT; -step s4-get-all-transactions: - SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_all_active_transactions() ORDER BY 1,2,3; - -initiator_node_identifiertransaction_numbertransaction_stamp - -2 2 Thu Jan 01 16:00:00 2015 PST -3 3 Fri Jan 02 16:00:00 2015 PST step s2-commit: COMMIT; -step s4-get-all-transactions: - SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_all_active_transactions() ORDER BY 1,2,3; - -initiator_node_identifiertransaction_numbertransaction_stamp - -3 3 Fri Jan 02 16:00:00 2015 PST step s3-commit: COMMIT; -step s4-get-all-transactions: - SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_all_active_transactions() ORDER BY 1,2,3; - -initiator_node_identifiertransaction_numbertransaction_stamp - starting permutation: s1-create-table s1-begin s1-insert s1-get-current-transaction-id s2-get-first-worker-active-transactions s1-commit step s1-create-table: @@ -102,14 +81,14 @@ row step s2-get-first-worker-active-transactions: SELECT * FROM run_command_on_workers('SELECT row(initiator_node_identifier, transaction_number) FROM - get_all_active_transactions(); + get_current_transaction_id(); ') WHERE nodeport = 57637; ; nodename nodeport success result -localhost 57637 t (0,299) +localhost 57637 t (0,0) step s1-commit: COMMIT; diff --git a/src/test/regress/specs/isolation_citus_dist_activity.spec b/src/test/regress/specs/isolation_citus_dist_activity.spec index 4a3edf0de..20040d87d 100644 --- a/src/test/regress/specs/isolation_citus_dist_activity.spec +++ b/src/test/regress/specs/isolation_citus_dist_activity.spec @@ -82,7 +82,5 @@ step "s3-view-worker" permutation "s1-begin" "s2-begin" "s3-begin" "s1-alter-table" "s2-view-dist" "s3-view-worker" "s2-rollback" "s1-commit" "s3-rollback" permutation "s1-begin" "s2-begin" "s3-begin" "s1-insert" "s2-view-dist" "s3-view-worker" "s2-rollback" "s1-commit" "s3-rollback" -permutation "s1-begin" "s2-begin" "s3-begin" "s1-select" "s2-view-dist" "s3-view-worker" "s2-rollback" "s1-commit" "s3-rollback" - -# router selects don't show up because BEGIN is not sent for performance reasons +permutation "s1-begin" "s2-begin" "s3-begin" "s1-select" "s2-view-dist" "s3-view-worker" "s2-rollback" "s1-commit" "s3-rollback" permutation "s1-begin" "s2-begin" "s3-begin" "s1-select-router" "s2-view-dist" "s3-view-worker" "s2-rollback" "s1-commit" "s3-rollback" diff --git a/src/test/regress/specs/isolation_distributed_transaction_id.spec b/src/test/regress/specs/isolation_distributed_transaction_id.spec index 7625d3266..4379592a9 100644 --- a/src/test/regress/specs/isolation_distributed_transaction_id.spec +++ b/src/test/regress/specs/isolation_distributed_transaction_id.spec @@ -50,6 +50,11 @@ step "s1-get-current-transaction-id" SELECT row(initiator_node_identifier, transaction_number) FROM get_current_transaction_id(); } +step "s1-get-all-transactions" +{ + SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_current_transaction_id() ORDER BY 1,2,3; +} + session "s2" step "s2-begin" @@ -77,12 +82,17 @@ step "s2-get-first-worker-active-transactions" { SELECT * FROM run_command_on_workers('SELECT row(initiator_node_identifier, transaction_number) FROM - get_all_active_transactions(); + get_current_transaction_id(); ') WHERE nodeport = 57637; ; } +step "s2-get-all-transactions" +{ + SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_current_transaction_id() ORDER BY 1,2,3; +} + session "s3" step "s3-begin" @@ -100,15 +110,13 @@ step "s3-commit" COMMIT; } -session "s4" - -step "s4-get-all-transactions" +step "s3-get-all-transactions" { - SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_all_active_transactions() ORDER BY 1,2,3; + SELECT initiator_node_identifier, transaction_number, transaction_stamp FROM get_current_transaction_id() ORDER BY 1,2,3; } # show that we could get all distributed transaction ids from seperate sessions -permutation "s1-begin" "s1-assign-transaction-id" "s4-get-all-transactions" "s2-begin" "s2-assign-transaction-id" "s4-get-all-transactions" "s3-begin" "s3-assign-transaction-id" "s4-get-all-transactions" "s1-commit" "s4-get-all-transactions" "s2-commit" "s4-get-all-transactions" "s3-commit" "s4-get-all-transactions" +permutation "s1-begin" "s1-assign-transaction-id" "s1-get-all-transactions" "s2-begin" "s2-assign-transaction-id" "s2-get-all-transactions" "s3-begin" "s3-assign-transaction-id" "s3-get-all-transactions" "s1-commit" "s2-commit" "s3-commit" # now show that distributed transaction id on the coordinator