mirror of https://github.com/citusdata/citus.git
Merge pull request #4634 from citusdata/citus_grep_command
Grep Remote/Local commandspull/5545/head
commit
695653911a
|
@ -22,6 +22,8 @@
|
|||
#include "lib/stringinfo.h"
|
||||
#include "miscadmin.h"
|
||||
#include "storage/latch.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/fmgrprotos.h"
|
||||
#include "utils/palloc.h"
|
||||
|
||||
|
||||
|
@ -34,6 +36,7 @@ int RemoteCopyFlushThreshold = 8 * 1024 * 1024;
|
|||
|
||||
/* GUC, determining whether statements sent to remote nodes are logged */
|
||||
bool LogRemoteCommands = false;
|
||||
char *GrepRemoteCommands = "";
|
||||
|
||||
|
||||
static bool ClearResultsInternal(MultiConnection *connection, bool raiseErrors,
|
||||
|
@ -328,7 +331,6 @@ ReportResultError(MultiConnection *connection, PGresult *result, int elevel)
|
|||
|
||||
/* *INDENT-ON* */
|
||||
|
||||
|
||||
/*
|
||||
* LogRemoteCommand logs commands send to remote nodes if
|
||||
* citus.log_remote_commands wants us to do so.
|
||||
|
@ -341,6 +343,11 @@ LogRemoteCommand(MultiConnection *connection, const char *command)
|
|||
return;
|
||||
}
|
||||
|
||||
if (!CommandMatchesLogGrepPattern(command))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
ereport(NOTICE, (errmsg("issuing %s", ApplyLogRedaction(command)),
|
||||
errdetail("on server %s@%s:%d connectionId: %ld", connection->user,
|
||||
connection->hostname,
|
||||
|
@ -348,6 +355,29 @@ LogRemoteCommand(MultiConnection *connection, const char *command)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* CommandMatchesLogGrepPattern returns true of the input command matches
|
||||
* the pattern specified by citus.grep_remote_commands.
|
||||
*
|
||||
* If citus.grep_remote_commands set to an empty string, all commands are
|
||||
* considered as a match.
|
||||
*/
|
||||
bool
|
||||
CommandMatchesLogGrepPattern(const char *command)
|
||||
{
|
||||
if (GrepRemoteCommands && strnlen(GrepRemoteCommands, NAMEDATALEN) > 0)
|
||||
{
|
||||
Datum boolDatum =
|
||||
DirectFunctionCall2(textlike, CStringGetTextDatum(command),
|
||||
CStringGetTextDatum(GrepRemoteCommands));
|
||||
|
||||
return DatumGetBool(boolDatum);
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
/* wrappers around libpq functions, with command logging support */
|
||||
|
||||
|
||||
|
|
|
@ -513,8 +513,14 @@ LogLocalCommand(Task *task)
|
|||
return;
|
||||
}
|
||||
|
||||
const char *command = TaskQueryString(task);
|
||||
if (!CommandMatchesLogGrepPattern(command))
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
ereport(NOTICE, (errmsg("executing the command locally: %s",
|
||||
ApplyLogRedaction(TaskQueryString(task)))));
|
||||
ApplyLogRedaction(command))));
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -1088,6 +1088,18 @@ RegisterCitusConfigVariables(void)
|
|||
GUC_NO_SHOW_ALL,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
DefineCustomStringVariable(
|
||||
"citus.grep_remote_commands",
|
||||
gettext_noop(
|
||||
"Applies \"command\" like citus.grep_remote_commands, if returns "
|
||||
"true, the command is logged."),
|
||||
NULL,
|
||||
&GrepRemoteCommands,
|
||||
"",
|
||||
PGC_USERSET,
|
||||
GUC_NO_SHOW_ALL,
|
||||
NULL, NULL, NULL);
|
||||
|
||||
DefineCustomIntVariable(
|
||||
"citus.isolation_test_session_process_id",
|
||||
NULL,
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
|
||||
/* GUC, determining whether statements sent to remote nodes are logged */
|
||||
extern bool LogRemoteCommands;
|
||||
extern char *GrepRemoteCommands;
|
||||
|
||||
/* GUC that determines the number of bytes after which remote COPY is flushed */
|
||||
extern int RemoteCopyFlushThreshold;
|
||||
|
@ -38,6 +39,7 @@ extern void ReportResultError(MultiConnection *connection, PGresult *result,
|
|||
int elevel);
|
||||
extern char * pchomp(const char *in);
|
||||
extern void LogRemoteCommand(MultiConnection *connection, const char *command);
|
||||
extern bool CommandMatchesLogGrepPattern(const char *command);
|
||||
|
||||
/* wrappers around libpq functions, with command logging support */
|
||||
extern void ExecuteCriticalRemoteCommandList(MultiConnection *connection,
|
||||
|
|
|
@ -267,6 +267,68 @@ WITH cte_1 AS (UPDATE test SET y = y - 1 RETURNING *) SELECT * FROM cte_1 ORDER
|
|||
5 | 6
|
||||
(5 rows)
|
||||
|
||||
-- show that we can filter remote commands
|
||||
-- given that citus.grep_remote_commands, we log all commands
|
||||
SET citus.log_local_commands to true;
|
||||
SELECT count(*) FROM public.another_schema_table WHERE a = 1;
|
||||
NOTICE: executing the command locally: SELECT count(*) AS count FROM public.another_schema_table_90630515 another_schema_table WHERE (a OPERATOR(pg_catalog.=) 1)
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- grep matches all commands
|
||||
SET citus.grep_remote_commands TO "%%";
|
||||
SELECT count(*) FROM public.another_schema_table WHERE a = 1;
|
||||
NOTICE: executing the command locally: SELECT count(*) AS count FROM public.another_schema_table_90630515 another_schema_table WHERE (a OPERATOR(pg_catalog.=) 1)
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- only filter a specific shard for the local execution
|
||||
BEGIN;
|
||||
SET LOCAL citus.grep_remote_commands TO "%90630515%";
|
||||
SELECT count(*) FROM public.another_schema_table;
|
||||
NOTICE: executing the command locally: SELECT count(*) AS count FROM public.another_schema_table_90630515 another_schema_table WHERE true
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- match nothing
|
||||
SET LOCAL citus.grep_remote_commands TO '%nothing%';
|
||||
SELECT count(*) FROM public.another_schema_table;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
COMMIT;
|
||||
-- only filter a specific shard for the remote execution
|
||||
BEGIN;
|
||||
SET LOCAL citus.enable_local_execution TO FALSE;
|
||||
SET LOCAL citus.grep_remote_commands TO '%90630515%';
|
||||
SET LOCAL citus.log_remote_commands TO ON;
|
||||
SELECT count(*) FROM public.another_schema_table;
|
||||
NOTICE: issuing SELECT count(*) AS count FROM public.another_schema_table_90630515 another_schema_table WHERE true
|
||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
-- match nothing
|
||||
SET LOCAL citus.grep_remote_commands TO '%nothing%';
|
||||
SELECT count(*) FROM public.another_schema_table;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
COMMIT;
|
||||
RESET citus.log_local_commands;
|
||||
RESET citus.grep_remote_commands;
|
||||
-- Test upsert with constraint
|
||||
CREATE TABLE upsert_test
|
||||
(
|
||||
|
|
|
@ -110,6 +110,38 @@ SELECT * FROM test ORDER BY x;
|
|||
UPDATE test SET y = y + 1 RETURNING *;
|
||||
WITH cte_1 AS (UPDATE test SET y = y - 1 RETURNING *) SELECT * FROM cte_1 ORDER BY 1,2;
|
||||
|
||||
-- show that we can filter remote commands
|
||||
-- given that citus.grep_remote_commands, we log all commands
|
||||
SET citus.log_local_commands to true;
|
||||
SELECT count(*) FROM public.another_schema_table WHERE a = 1;
|
||||
|
||||
-- grep matches all commands
|
||||
SET citus.grep_remote_commands TO "%%";
|
||||
SELECT count(*) FROM public.another_schema_table WHERE a = 1;
|
||||
|
||||
-- only filter a specific shard for the local execution
|
||||
BEGIN;
|
||||
SET LOCAL citus.grep_remote_commands TO "%90630515%";
|
||||
SELECT count(*) FROM public.another_schema_table;
|
||||
-- match nothing
|
||||
SET LOCAL citus.grep_remote_commands TO '%nothing%';
|
||||
SELECT count(*) FROM public.another_schema_table;
|
||||
COMMIT;
|
||||
|
||||
-- only filter a specific shard for the remote execution
|
||||
BEGIN;
|
||||
SET LOCAL citus.enable_local_execution TO FALSE;
|
||||
SET LOCAL citus.grep_remote_commands TO '%90630515%';
|
||||
SET LOCAL citus.log_remote_commands TO ON;
|
||||
SELECT count(*) FROM public.another_schema_table;
|
||||
-- match nothing
|
||||
SET LOCAL citus.grep_remote_commands TO '%nothing%';
|
||||
SELECT count(*) FROM public.another_schema_table;
|
||||
COMMIT;
|
||||
|
||||
RESET citus.log_local_commands;
|
||||
RESET citus.grep_remote_commands;
|
||||
|
||||
-- Test upsert with constraint
|
||||
CREATE TABLE upsert_test
|
||||
(
|
||||
|
@ -1023,6 +1055,8 @@ ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;
|
|||
ALTER SYSTEM RESET citus.local_shared_pool_size;
|
||||
SELECT pg_reload_conf();
|
||||
|
||||
|
||||
|
||||
-- suppress notices
|
||||
SET client_min_messages TO error;
|
||||
|
||||
|
|
Loading…
Reference in New Issue