Properly set worker_query and use

pull/5716/head
Onder Kalaci 2022-02-16 16:20:16 +01:00
parent 0dd2ddaa70
commit 95d5918967
5 changed files with 68 additions and 18 deletions

View File

@ -58,6 +58,7 @@ static int64 GetRemoteProcessId(void);
PG_FUNCTION_INFO_V1(start_session_level_connection_to_node); PG_FUNCTION_INFO_V1(start_session_level_connection_to_node);
PG_FUNCTION_INFO_V1(run_commands_on_session_level_connection_to_node); PG_FUNCTION_INFO_V1(run_commands_on_session_level_connection_to_node);
PG_FUNCTION_INFO_V1(stop_session_level_connection_to_node); PG_FUNCTION_INFO_V1(stop_session_level_connection_to_node);
PG_FUNCTION_INFO_V1(override_backend_data_command_originator);
/* /*
@ -119,6 +120,17 @@ start_session_level_connection_to_node(PG_FUNCTION_ARGS)
ExecuteCriticalRemoteCommand(singleConnection, setAppName); ExecuteCriticalRemoteCommand(singleConnection, setAppName);
/*
* We are hackily overriding the remote processes' worker_query to be false
* such that relevant observibility UDFs work fine.
*/
StringInfo overrideBackendDataCommandOriginator = makeStringInfo();
appendStringInfo(overrideBackendDataCommandOriginator,
"SELECT override_backend_data_command_originator(true);");
ExecuteCriticalRemoteCommand(singleConnection,
overrideBackendDataCommandOriginator->data);
PG_RETURN_VOID(); PG_RETURN_VOID();
} }
@ -174,6 +186,23 @@ run_commands_on_session_level_connection_to_node(PG_FUNCTION_ARGS)
} }
/*
* override_backend_data_command_originator is a wrapper around
* OverrideBackendDataDistributedCommandOriginator().
*/
Datum
override_backend_data_command_originator(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
bool distributedCommandOriginator = PG_GETARG_BOOL(0);
OverrideBackendDataDistributedCommandOriginator(distributedCommandOriginator);
PG_RETURN_VOID();
}
/* /*
* stop_session_level_connection_to_node closes the connection opened by the * stop_session_level_connection_to_node closes the connection opened by the
* start_session_level_connection_to_node and set the flag to false which * start_session_level_connection_to_node and set the flag to false which

View File

@ -154,7 +154,6 @@ assign_distributed_transaction_id(PG_FUNCTION_ARGS)
MyBackendData->citusBackend.initiatorNodeIdentifier = MyBackendData->citusBackend.initiatorNodeIdentifier =
MyBackendData->transactionId.initiatorNodeIdentifier; MyBackendData->transactionId.initiatorNodeIdentifier;
MyBackendData->citusBackend.transactionOriginator = false;
SpinLockRelease(&MyBackendData->mutex); SpinLockRelease(&MyBackendData->mutex);
@ -412,15 +411,12 @@ StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc tupleDescripto
initiatorNodeIdentifier = currentBackend->citusBackend.initiatorNodeIdentifier; initiatorNodeIdentifier = currentBackend->citusBackend.initiatorNodeIdentifier;
/* /*
* We prefer to use worker_query instead of transactionOriginator in the user facing * We prefer to use worker_query instead of distributedCommandOriginator in
* functions since its more intuitive. Thus, we negate the result before returning. * the user facing functions since its more intuitive. Thus,
* * we negate the result before returning.
* We prefer to use citusBackend's transactionOriginator field over transactionId's
* field with the same name. The reason is that it also covers backends that are not
* inside a distributed transaction.
*/ */
bool coordinatorOriginatedQuery = bool distributedCommandOriginator =
currentBackend->citusBackend.transactionOriginator; currentBackend->distributedCommandOriginator;
transactionNumber = currentBackend->transactionId.transactionNumber; transactionNumber = currentBackend->transactionId.transactionNumber;
TimestampTz transactionIdTimestamp = currentBackend->transactionId.timestamp; TimestampTz transactionIdTimestamp = currentBackend->transactionId.timestamp;
@ -430,7 +426,7 @@ StoreAllActiveTransactions(Tuplestorestate *tupleStore, TupleDesc tupleDescripto
values[0] = ObjectIdGetDatum(databaseId); values[0] = ObjectIdGetDatum(databaseId);
values[1] = Int32GetDatum(backendPid); values[1] = Int32GetDatum(backendPid);
values[2] = Int32GetDatum(initiatorNodeIdentifier); values[2] = Int32GetDatum(initiatorNodeIdentifier);
values[3] = !coordinatorOriginatedQuery; values[3] = !distributedCommandOriginator;
values[4] = UInt64GetDatum(transactionNumber); values[4] = UInt64GetDatum(transactionNumber);
values[5] = TimestampTzGetDatum(transactionIdTimestamp); values[5] = TimestampTzGetDatum(transactionIdTimestamp);
values[6] = UInt64GetDatum(currentBackend->globalPID); values[6] = UInt64GetDatum(currentBackend->globalPID);
@ -665,7 +661,6 @@ UnSetDistributedTransactionId(void)
MyBackendData->transactionId.timestamp = 0; MyBackendData->transactionId.timestamp = 0;
MyBackendData->citusBackend.initiatorNodeIdentifier = -1; MyBackendData->citusBackend.initiatorNodeIdentifier = -1;
MyBackendData->citusBackend.transactionOriginator = false;
SpinLockRelease(&MyBackendData->mutex); SpinLockRelease(&MyBackendData->mutex);
} }
@ -778,7 +773,6 @@ AssignDistributedTransactionId(void)
MyBackendData->transactionId.timestamp = currentTimestamp; MyBackendData->transactionId.timestamp = currentTimestamp;
MyBackendData->citusBackend.initiatorNodeIdentifier = localGroupId; MyBackendData->citusBackend.initiatorNodeIdentifier = localGroupId;
MyBackendData->citusBackend.transactionOriginator = true;
SpinLockRelease(&MyBackendData->mutex); SpinLockRelease(&MyBackendData->mutex);
} }
@ -800,7 +794,6 @@ MarkCitusInitiatedCoordinatorBackend(void)
SpinLockAcquire(&MyBackendData->mutex); SpinLockAcquire(&MyBackendData->mutex);
MyBackendData->citusBackend.initiatorNodeIdentifier = localGroupId; MyBackendData->citusBackend.initiatorNodeIdentifier = localGroupId;
MyBackendData->citusBackend.transactionOriginator = true;
SpinLockRelease(&MyBackendData->mutex); SpinLockRelease(&MyBackendData->mutex);
} }
@ -816,10 +809,12 @@ void
AssignGlobalPID(void) AssignGlobalPID(void)
{ {
uint64 globalPID = INVALID_CITUS_INTERNAL_BACKEND_GPID; uint64 globalPID = INVALID_CITUS_INTERNAL_BACKEND_GPID;
bool distributedCommandOriginator = false;
if (!IsCitusInternalBackend()) if (!IsCitusInternalBackend())
{ {
globalPID = GenerateGlobalPID(); globalPID = GenerateGlobalPID();
distributedCommandOriginator = true;
} }
else else
{ {
@ -828,6 +823,21 @@ AssignGlobalPID(void)
SpinLockAcquire(&MyBackendData->mutex); SpinLockAcquire(&MyBackendData->mutex);
MyBackendData->globalPID = globalPID; MyBackendData->globalPID = globalPID;
MyBackendData->distributedCommandOriginator = distributedCommandOriginator;
SpinLockRelease(&MyBackendData->mutex);
}
/*
* OverrideBackendDataDistributedCommandOriginator should only be used for isolation testing.
* See how it is used in the relevant functions.
*/
void
OverrideBackendDataDistributedCommandOriginator(bool distributedCommandOriginator)
{
SpinLockAcquire(&MyBackendData->mutex);
MyBackendData->distributedCommandOriginator =
distributedCommandOriginator;
SpinLockRelease(&MyBackendData->mutex); SpinLockRelease(&MyBackendData->mutex);
} }

View File

@ -157,10 +157,9 @@ FROM \
WHERE \ WHERE \
backend_type = 'client backend' \ backend_type = 'client backend' \
AND \ AND \
pg_stat_activity.query NOT ILIKE '%stat_activity%' \ worker_query = False \
AND \ AND \
pg_stat_activity.application_name NOT SIMILAR TO 'citus_internal gpid=\\d+'; \ pg_stat_activity.query NOT ILIKE '%stat_activity%';"
"
#define CITUS_WORKER_STAT_ACTIVITY_QUERY \ #define CITUS_WORKER_STAT_ACTIVITY_QUERY \
"\ "\
@ -195,7 +194,7 @@ FROM \
get_all_active_transactions() AS dist_txs(database_id, process_id, initiator_node_identifier, worker_query, transaction_number, transaction_stamp, global_id) \ get_all_active_transactions() AS dist_txs(database_id, process_id, initiator_node_identifier, worker_query, transaction_number, transaction_stamp, global_id) \
ON pg_stat_activity.pid = dist_txs.process_id \ ON pg_stat_activity.pid = dist_txs.process_id \
WHERE \ WHERE \
pg_stat_activity.application_name SIMILAR TO 'citus_internal gpid=\\d+' \ worker_query = True \
AND \ AND \
pg_stat_activity.query NOT ILIKE '%stat_activity%';" pg_stat_activity.query NOT ILIKE '%stat_activity%';"

View File

@ -29,7 +29,6 @@
typedef struct CitusInitiatedBackend typedef struct CitusInitiatedBackend
{ {
int initiatorNodeIdentifier; int initiatorNodeIdentifier;
bool transactionOriginator;
} CitusInitiatedBackend; } CitusInitiatedBackend;
@ -51,6 +50,7 @@ typedef struct BackendData
slock_t mutex; slock_t mutex;
bool cancelledDueToDeadlock; bool cancelledDueToDeadlock;
uint64 globalPID; uint64 globalPID;
bool distributedCommandOriginator;
CitusInitiatedBackend citusBackend; CitusInitiatedBackend citusBackend;
DistributedTransactionId transactionId; DistributedTransactionId transactionId;
} BackendData; } BackendData;
@ -67,6 +67,8 @@ extern void AssignDistributedTransactionId(void);
extern void MarkCitusInitiatedCoordinatorBackend(void); extern void MarkCitusInitiatedCoordinatorBackend(void);
extern void AssignGlobalPID(void); extern void AssignGlobalPID(void);
extern uint64 GetGlobalPID(void); extern uint64 GetGlobalPID(void);
extern void OverrideBackendDataDistributedCommandOriginator(bool
distributedCommandOriginator);
extern uint64 ExtractGlobalPID(char *applicationName); extern uint64 ExtractGlobalPID(char *applicationName);
extern int ExtractNodeIdFromGlobalPID(uint64 globalPID); extern int ExtractNodeIdFromGlobalPID(uint64 globalPID);
extern int ExtractProcessIdFromGlobalPID(uint64 globalPID); extern int ExtractProcessIdFromGlobalPID(uint64 globalPID);

View File

@ -7,6 +7,16 @@ setup
LANGUAGE C STRICT VOLATILE LANGUAGE C STRICT VOLATILE
AS 'citus', $$start_session_level_connection_to_node$$; AS 'citus', $$start_session_level_connection_to_node$$;
CREATE OR REPLACE FUNCTION override_backend_data_command_originator(bool)
RETURNS void
LANGUAGE C STRICT IMMUTABLE
AS 'citus', $$override_backend_data_command_originator$$;
SELECT run_command_on_workers($$SET citus.enable_metadata_sync TO off;CREATE OR REPLACE FUNCTION override_backend_data_command_originator(bool)
RETURNS void
LANGUAGE C STRICT IMMUTABLE
AS 'citus'$$);
CREATE OR REPLACE FUNCTION run_commands_on_session_level_connection_to_node(text) CREATE OR REPLACE FUNCTION run_commands_on_session_level_connection_to_node(text)
RETURNS void RETURNS void
LANGUAGE C STRICT VOLATILE LANGUAGE C STRICT VOLATILE