From 95d59189676f8b790054add89cd6f2925916c13b Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Wed, 16 Feb 2022 16:20:16 +0100 Subject: [PATCH] Properly set worker_query and use --- .../test/run_from_same_connection.c | 29 +++++++++++++++ .../distributed/transaction/backend_data.c | 36 ++++++++++++------- .../transaction/citus_dist_stat_activity.c | 7 ++-- src/include/distributed/backend_data.h | 4 ++- .../spec/isolation_mx_common.include.spec | 10 ++++++ 5 files changed, 68 insertions(+), 18 deletions(-) diff --git a/src/backend/distributed/test/run_from_same_connection.c b/src/backend/distributed/test/run_from_same_connection.c index 3b5f804b4..ad39664bf 100644 --- a/src/backend/distributed/test/run_from_same_connection.c +++ b/src/backend/distributed/test/run_from_same_connection.c @@ -58,6 +58,7 @@ static int64 GetRemoteProcessId(void); PG_FUNCTION_INFO_V1(start_session_level_connection_to_node); PG_FUNCTION_INFO_V1(run_commands_on_session_level_connection_to_node); PG_FUNCTION_INFO_V1(stop_session_level_connection_to_node); +PG_FUNCTION_INFO_V1(override_backend_data_command_originator); /* @@ -119,6 +120,17 @@ start_session_level_connection_to_node(PG_FUNCTION_ARGS) ExecuteCriticalRemoteCommand(singleConnection, setAppName); + /* + * We are hackily overriding the remote processes' worker_query to be false + * such that relevant observibility UDFs work fine. + */ + StringInfo overrideBackendDataCommandOriginator = makeStringInfo(); + appendStringInfo(overrideBackendDataCommandOriginator, + "SELECT override_backend_data_command_originator(true);"); + ExecuteCriticalRemoteCommand(singleConnection, + overrideBackendDataCommandOriginator->data); + + PG_RETURN_VOID(); } @@ -174,6 +186,23 @@ run_commands_on_session_level_connection_to_node(PG_FUNCTION_ARGS) } +/* + * override_backend_data_command_originator is a wrapper around + * OverrideBackendDataDistributedCommandOriginator(). + */ +Datum +override_backend_data_command_originator(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + + bool distributedCommandOriginator = PG_GETARG_BOOL(0); + + OverrideBackendDataDistributedCommandOriginator(distributedCommandOriginator); + + PG_RETURN_VOID(); +} + + /* * stop_session_level_connection_to_node closes the connection opened by the * start_session_level_connection_to_node and set the flag to false which diff --git a/src/backend/distributed/transaction/backend_data.c b/src/backend/distributed/transaction/backend_data.c index 90f7b719a..5ca517199 100644 --- a/src/backend/distributed/transaction/backend_data.c +++ b/src/backend/distributed/transaction/backend_data.c @@ -154,7 +154,6 @@ assign_distributed_transaction_id(PG_FUNCTION_ARGS) MyBackendData->citusBackend.initiatorNodeIdentifier = MyBackendData->transactionId.initiatorNodeIdentifier; - MyBackendData->citusBackend.transactionOriginator = false; SpinLockRelease(&MyBackendData->mutex); @@ -412,15 +411,12 @@ StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc tupleDescripto initiatorNodeIdentifier = currentBackend->citusBackend.initiatorNodeIdentifier; /* - * We prefer to use worker_query instead of transactionOriginator in the user facing - * functions since its more intuitive. Thus, we negate the result before returning. - * - * We prefer to use citusBackend's transactionOriginator field over transactionId's - * field with the same name. The reason is that it also covers backends that are not - * inside a distributed transaction. + * We prefer to use worker_query instead of distributedCommandOriginator in + * the user facing functions since its more intuitive. Thus, + * we negate the result before returning. */ - bool coordinatorOriginatedQuery = - currentBackend->citusBackend.transactionOriginator; + bool distributedCommandOriginator = + currentBackend->distributedCommandOriginator; transactionNumber = currentBackend->transactionId.transactionNumber; TimestampTz transactionIdTimestamp = currentBackend->transactionId.timestamp; @@ -430,7 +426,7 @@ StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc tupleDescripto values[0] = ObjectIdGetDatum(databaseId); values[1] = Int32GetDatum(backendPid); values[2] = Int32GetDatum(initiatorNodeIdentifier); - values[3] = !coordinatorOriginatedQuery; + values[3] = !distributedCommandOriginator; values[4] = UInt64GetDatum(transactionNumber); values[5] = TimestampTzGetDatum(transactionIdTimestamp); values[6] = UInt64GetDatum(currentBackend->globalPID); @@ -665,7 +661,6 @@ UnSetDistributedTransactionId(void) MyBackendData->transactionId.timestamp = 0; MyBackendData->citusBackend.initiatorNodeIdentifier = -1; - MyBackendData->citusBackend.transactionOriginator = false; SpinLockRelease(&MyBackendData->mutex); } @@ -778,7 +773,6 @@ AssignDistributedTransactionId(void) MyBackendData->transactionId.timestamp = currentTimestamp; MyBackendData->citusBackend.initiatorNodeIdentifier = localGroupId; - MyBackendData->citusBackend.transactionOriginator = true; SpinLockRelease(&MyBackendData->mutex); } @@ -800,7 +794,6 @@ MarkCitusInitiatedCoordinatorBackend(void) SpinLockAcquire(&MyBackendData->mutex); MyBackendData->citusBackend.initiatorNodeIdentifier = localGroupId; - MyBackendData->citusBackend.transactionOriginator = true; SpinLockRelease(&MyBackendData->mutex); } @@ -816,10 +809,12 @@ void AssignGlobalPID(void) { uint64 globalPID = INVALID_CITUS_INTERNAL_BACKEND_GPID; + bool distributedCommandOriginator = false; if (!IsCitusInternalBackend()) { globalPID = GenerateGlobalPID(); + distributedCommandOriginator = true; } else { @@ -828,6 +823,21 @@ AssignGlobalPID(void) SpinLockAcquire(&MyBackendData->mutex); MyBackendData->globalPID = globalPID; + MyBackendData->distributedCommandOriginator = distributedCommandOriginator; + SpinLockRelease(&MyBackendData->mutex); +} + + +/* + * OverrideBackendDataDistributedCommandOriginator should only be used for isolation testing. + * See how it is used in the relevant functions. + */ +void +OverrideBackendDataDistributedCommandOriginator(bool distributedCommandOriginator) +{ + SpinLockAcquire(&MyBackendData->mutex); + MyBackendData->distributedCommandOriginator = + distributedCommandOriginator; SpinLockRelease(&MyBackendData->mutex); } diff --git a/src/backend/distributed/transaction/citus_dist_stat_activity.c b/src/backend/distributed/transaction/citus_dist_stat_activity.c index 3fb15295f..ec4f6e8a3 100644 --- a/src/backend/distributed/transaction/citus_dist_stat_activity.c +++ b/src/backend/distributed/transaction/citus_dist_stat_activity.c @@ -157,10 +157,9 @@ FROM \ WHERE \ backend_type = 'client backend' \ AND \ - pg_stat_activity.query NOT ILIKE '%stat_activity%' \ + worker_query = False \ AND \ - pg_stat_activity.application_name NOT SIMILAR TO 'citus_internal gpid=\\d+'; \ -" + pg_stat_activity.query NOT ILIKE '%stat_activity%';" #define CITUS_WORKER_STAT_ACTIVITY_QUERY \ "\ @@ -195,7 +194,7 @@ FROM \ get_all_active_transactions() AS dist_txs(database_id, process_id, initiator_node_identifier, worker_query, transaction_number, transaction_stamp, global_id) \ ON pg_stat_activity.pid = dist_txs.process_id \ WHERE \ - pg_stat_activity.application_name SIMILAR TO 'citus_internal gpid=\\d+' \ + worker_query = True \ AND \ pg_stat_activity.query NOT ILIKE '%stat_activity%';" diff --git a/src/include/distributed/backend_data.h b/src/include/distributed/backend_data.h index f01358407..b463b89f5 100644 --- a/src/include/distributed/backend_data.h +++ b/src/include/distributed/backend_data.h @@ -29,7 +29,6 @@ typedef struct CitusInitiatedBackend { int initiatorNodeIdentifier; - bool transactionOriginator; } CitusInitiatedBackend; @@ -51,6 +50,7 @@ typedef struct BackendData slock_t mutex; bool cancelledDueToDeadlock; uint64 globalPID; + bool distributedCommandOriginator; CitusInitiatedBackend citusBackend; DistributedTransactionId transactionId; } BackendData; @@ -67,6 +67,8 @@ extern void AssignDistributedTransactionId(void); extern void MarkCitusInitiatedCoordinatorBackend(void); extern void AssignGlobalPID(void); extern uint64 GetGlobalPID(void); +extern void OverrideBackendDataDistributedCommandOriginator(bool + distributedCommandOriginator); extern uint64 ExtractGlobalPID(char *applicationName); extern int ExtractNodeIdFromGlobalPID(uint64 globalPID); extern int ExtractProcessIdFromGlobalPID(uint64 globalPID); diff --git a/src/test/regress/spec/isolation_mx_common.include.spec b/src/test/regress/spec/isolation_mx_common.include.spec index 497d57fc4..4749e334c 100644 --- a/src/test/regress/spec/isolation_mx_common.include.spec +++ b/src/test/regress/spec/isolation_mx_common.include.spec @@ -7,6 +7,16 @@ setup LANGUAGE C STRICT VOLATILE AS 'citus', $$start_session_level_connection_to_node$$; + CREATE OR REPLACE FUNCTION override_backend_data_command_originator(bool) + RETURNS void + LANGUAGE C STRICT IMMUTABLE + AS 'citus', $$override_backend_data_command_originator$$; + + SELECT run_command_on_workers($$SET citus.enable_metadata_sync TO off;CREATE OR REPLACE FUNCTION override_backend_data_command_originator(bool) + RETURNS void + LANGUAGE C STRICT IMMUTABLE + AS 'citus'$$); + CREATE OR REPLACE FUNCTION run_commands_on_session_level_connection_to_node(text) RETURNS void LANGUAGE C STRICT VOLATILE