Multi tenant monitoring (#6725)

DESCRIPTION: Adds views that monitor statistics on tenant usages

This PR adds `citus_stats_tenants` view that monitors the tenants on the
cluster.

`citus_stats_tenants` shows the node id, colocation id, tenant
attribute, read count in this period and last period, and query count in
this period and last period of the tenant.
Tenant attribute currently is the tenant's distribution column value,
later when schema based sharding is introduced, this meaning might
change.
A period is a time bucket the queries are counted by. Read and query
counts for this period can increase until the current period ends. After
that those counts are moved to last period's counts, which cannot
change. The period length can be set using 'citus.stats_tenants_period'.

`SELECT` queries are counted as _read_ queries, `INSERT`, `UPDATE` and
`DELETE` queries are counted as _write_ queries. So in the view read
counts are `SELECT` counts and query counts are `SELECT`, `INSERT`,
`UPDATE` and `DELETE` count.

The data is stored in shared memory, in a struct named
`MultiTenantMonitor`.

`citus_stats_tenants` shows the data from local tenants.

`citus_stats_tenants` shows up to `citus.stats_tenant_limit` tenants.
The tenants are scored based on the number of queries they run and the
recency of those queries. Every query that is run increases the tenant's
score by `ONE_QUERY_SCORE`, and the scores are halved each time a period ends.
Halving is done lazily.
To retain information for longer, the monitor keeps up to 3 times
`citus.stats_tenant_limit` tenants. When the tenant count hits `3 *
citus.stats_tenant_limit`, the last `citus.stats_tenant_limit` tenants are
removed. To see all stored tenants you can use
`citus_stats_tenants(return_all_tenants := true)`

- [x] Create collector view that gets data from all nodes. #6761 
- [x] Add monitoring log #6762 
- [x] Create enable/disable GUC #6769 
- [x] Parse the annotation string correctly #6796 
- [x] Add local queries and prepared statements #6797
- [x] Rename to citus_stat_statements #6821 
- [x] Run pgbench
- [x] Fix role permissions #6812

---------

Co-authored-by: Gokhan Gulbiz <ggulbiz@gmail.com>
Co-authored-by: Jelte Fennema <github-tech@jeltef.nl>
pull/6824/head
Halil Ozan Akgül 2023-04-05 17:44:17 +03:00 committed by GitHub
parent d04d32b314
commit 52ad2d08c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
43 changed files with 2332 additions and 56 deletions

View File

@ -66,7 +66,6 @@ static DistributedPlan * CopyDistributedPlanWithoutCache(
DistributedPlan *originalDistributedPlan);
static void CitusEndScan(CustomScanState *node);
static void CitusReScan(CustomScanState *node);
static void SetJobColocationId(Job *job);
static void EnsureForceDelegationDistributionKey(Job *job);
static void EnsureAnchorShardsInJobExist(Job *job);
static bool AnchorShardsInTaskListExist(List *taskList);
@ -892,7 +891,7 @@ IsCitusCustomScan(Plan *plan)
* colocation group, the Job's colocation ID is set to the group ID, else,
* it will be set to INVALID_COLOCATION_ID.
*/
static void
void
SetJobColocationId(Job *job)
{
uint32 jobColocationId = INVALID_COLOCATION_ID;

View File

@ -97,6 +97,7 @@
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h" /* to access LogRemoteCommands */
#include "distributed/transaction_management.h"
#include "distributed/utils/citus_stat_tenants.h"
#include "distributed/version_compat.h"
#include "distributed/worker_protocol.h"
#include "executor/tstoreReceiver.h"
@ -647,6 +648,16 @@ LocallyExecuteTaskPlan(PlannedStmt *taskPlan, char *queryString,
LocalExecutorShardId = task->anchorShardId;
}
char *partitionKeyValueString = NULL;
if (task->partitionKeyValue != NULL)
{
partitionKeyValueString = DatumToString(task->partitionKeyValue->constvalue,
task->partitionKeyValue->consttype);
}
AttributeTask(partitionKeyValueString, task->colocationId, taskPlan->commandType);
PG_TRY();
{
processedRows = ExecuteTaskPlan(taskPlan, queryString, tupleDest, task,

View File

@ -26,6 +26,7 @@
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_router_planner.h"
#include "distributed/shard_utils.h"
#include "distributed/utils/citus_stat_tenants.h"
#include "distributed/version_compat.h"
#include "lib/stringinfo.h"
#include "nodes/makefuncs.h"
@ -141,6 +142,10 @@ RebuildQueryStrings(Job *workerJob)
? "(null)"
: TaskQueryString(task))));
task->partitionKeyValue = workerJob->partitionKeyValue;
SetJobColocationId(workerJob);
task->colocationId = workerJob->colocationId;
UpdateTaskQueryString(query, task);
/*
@ -387,7 +392,8 @@ SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query)
return;
}
SetTaskQueryString(task, DeparseTaskQuery(task, query));
SetTaskQueryString(task, AnnotateQuery(DeparseTaskQuery(task, query),
task->partitionKeyValue, task->colocationId));
}

View File

@ -49,6 +49,7 @@
#include "distributed/recursive_planning.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/shard_utils.h"
#include "distributed/utils/citus_stat_tenants.h"
#include "distributed/version_compat.h"
#include "distributed/worker_shard_visibility.h"
#include "executor/executor.h"
@ -305,6 +306,11 @@ distributed_planner(Query *parse,
errhint("Consider using PL/pgSQL functions instead.")));
}
/*
* We annotate the query for tenant statistics.
*/
AttributeQueryIfAnnotated(query_string, parse->commandType);
return result;
}

View File

@ -164,7 +164,8 @@ static int CompareInsertValuesByShardId(const void *leftElement,
static List * SingleShardTaskList(Query *query, uint64 jobId,
List *relationShardList, List *placementList,
uint64 shardId, bool parametersInQueryResolved,
bool isLocalTableModification);
bool isLocalTableModification, Const *partitionKeyValue,
int colocationId);
static bool RowLocksOnRelations(Node *node, List **rtiLockList);
static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
TaskAssignmentPolicyType
@ -1939,11 +1940,14 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
if (originalQuery->commandType == CMD_SELECT)
{
SetJobColocationId(job);
job->taskList = SingleShardTaskList(originalQuery, job->jobId,
relationShardList, placementList,
shardId,
job->parametersInJobQueryResolved,
isLocalTableModification);
isLocalTableModification,
job->partitionKeyValue, job->colocationId);
/*
* Queries to reference tables, or distributed tables with multiple replica's have
@ -1967,11 +1971,14 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
}
else
{
SetJobColocationId(job);
job->taskList = SingleShardTaskList(originalQuery, job->jobId,
relationShardList, placementList,
shardId,
job->parametersInJobQueryResolved,
isLocalTableModification);
isLocalTableModification,
job->partitionKeyValue, job->colocationId);
}
}
@ -2065,7 +2072,8 @@ static List *
SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList,
List *placementList, uint64 shardId,
bool parametersInQueryResolved,
bool isLocalTableModification)
bool isLocalTableModification, Const *partitionKeyValue,
int colocationId)
{
TaskType taskType = READ_TASK;
char replicationModel = 0;
@ -2135,6 +2143,8 @@ SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList,
* that the query cannot be executed locally.
*/
task->taskPlacementList = placementList;
task->partitionKeyValue = partitionKeyValue;
task->colocationId = colocationId;
SetTaskQueryIfShouldLazyDeparse(task, query);
task->anchorShardId = shardId;
task->jobId = jobId;

View File

@ -91,6 +91,7 @@
#include "distributed/resource_lock.h"
#include "distributed/transaction_management.h"
#include "distributed/transaction_recovery.h"
#include "distributed/utils/citus_stat_tenants.h"
#include "distributed/utils/directory.h"
#include "distributed/worker_log_messages.h"
#include "distributed/worker_manager.h"
@ -228,6 +229,12 @@ static const struct config_enum_entry stat_statements_track_options[] = {
{ NULL, 0, false }
};
/*
 * Accepted values for the citus.stat_tenants_track GUC: "none" maps to
 * STAT_TENANTS_TRACK_NONE and "all" maps to STAT_TENANTS_TRACK_ALL. The
 * all-NULL entry terminates the list.
 */
static const struct config_enum_entry stat_tenants_track_options[] = {
{ "none", STAT_TENANTS_TRACK_NONE, false },
{ "all", STAT_TENANTS_TRACK_ALL, false },
{ NULL, 0, false }
};
static const struct config_enum_entry task_assignment_policy_options[] = {
{ "greedy", TASK_ASSIGNMENT_GREEDY, false },
{ "first-replica", TASK_ASSIGNMENT_FIRST_REPLICA, false },
@ -447,6 +454,8 @@ _PG_init(void)
ExecutorStart_hook = CitusExecutorStart;
ExecutorRun_hook = CitusExecutorRun;
ExplainOneQuery_hook = CitusExplainOneQuery;
prev_ExecutorEnd = ExecutorEnd_hook;
ExecutorEnd_hook = CitusAttributeToEnd;
/* register hook for error messages */
emit_log_hook = multi_log_hook;
@ -491,6 +500,8 @@ _PG_init(void)
/* initialize shard split shared memory handle management */
InitializeShardSplitSMHandleManagement();
InitializeMultiTenantMonitorSMHandleManagement();
/* enable modification of pg_catalog tables during pg_upgrade */
if (IsBinaryUpgrade)
{
@ -2363,6 +2374,50 @@ RegisterCitusConfigVariables(void)
GUC_STANDARD,
NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.stat_tenants_limit",
gettext_noop("Number of tenants to be shown in citus_stat_tenants."),
NULL,
&StatTenantsLimit,
100, 1, 10000,
PGC_POSTMASTER,
GUC_STANDARD,
NULL, NULL, NULL);
DefineCustomEnumVariable(
"citus.stat_tenants_log_level",
gettext_noop("Sets the level of citus_stat_tenants log messages"),
NULL,
&StatTenantsLogLevel,
CITUS_LOG_LEVEL_OFF, log_level_options,
PGC_USERSET,
GUC_STANDARD,
NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.stat_tenants_period",
gettext_noop("Period in seconds to be used for calculating the tenant "
"statistics in citus_stat_tenants."),
NULL,
&StatTenantsPeriod,
60, 1, 60 * 60 * 24,
PGC_USERSET,
GUC_STANDARD,
NULL, NULL, NULL);
DefineCustomEnumVariable(
"citus.stat_tenants_track",
gettext_noop("Enables/Disables the stats collection for citus_stat_tenants."),
gettext_noop("Enables the stats collection when set to 'all'. "
"Disables when set to 'none'. Disabling can be useful for "
"avoiding extra CPU cycles needed for the calculations."),
&StatTenantsTrack,
STAT_TENANTS_TRACK_NONE,
stat_tenants_track_options,
PGC_SUSET,
GUC_STANDARD,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.subquery_pushdown",
gettext_noop("Usage of this GUC is highly discouraged, please read the long "

View File

@ -10,3 +10,8 @@ ALTER TABLE pg_catalog.pg_dist_transaction REPLICA IDENTITY USING INDEX pg_dist_
#include "udfs/worker_drop_all_shell_tables/11.3-1.sql"
#include "udfs/citus_internal_mark_node_not_synced/11.3-1.sql"
#include "udfs/citus_stat_tenants_local/11.3-1.sql"
#include "udfs/citus_stat_tenants/11.3-1.sql"
#include "udfs/citus_stat_tenants_local_reset/11.3-1.sql"
#include "udfs/citus_stat_tenants_reset/11.3-1.sql"

View File

@ -20,3 +20,12 @@ ALTER TABLE pg_catalog.pg_dist_transaction REPLICA IDENTITY NOTHING;
DROP PROCEDURE pg_catalog.worker_drop_all_shell_tables(bool);
DROP FUNCTION pg_catalog.citus_internal_mark_node_not_synced(int, int);
DROP VIEW pg_catalog.citus_stat_tenants_local;
DROP FUNCTION pg_catalog.citus_stat_tenants_local(boolean);
DROP VIEW pg_catalog.citus_stat_tenants;
DROP FUNCTION pg_catalog.citus_stat_tenants(boolean);
DROP FUNCTION pg_catalog.citus_stat_tenants_local_reset();
DROP FUNCTION pg_catalog.citus_stat_tenants_reset();

View File

@ -0,0 +1,78 @@
-- cst in the query is an abbreviation for citus_stat_tenants
--
-- citus_stat_tenants(return_all_tenants) gathers tenant statistics from the
-- nodes of the cluster: it runs citus_stat_tenants_local() on every node via
-- run_command_on_all_nodes(), keeps only the results of commands that
-- succeeded, adds the nodeid each row came from, and returns the rows ordered
-- by score, highest first.
--
-- Unless return_all_tenants is true, at most citus.stat_tenants_limit rows are
-- returned. Depending on citus.stat_tenants_log_level, a
-- 'Generating citus_stat_tenants' message is raised at LOG level first.
CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_tenants (
return_all_tenants BOOLEAN DEFAULT FALSE,
OUT nodeid INT,
OUT colocation_id INT,
OUT tenant_attribute TEXT,
OUT read_count_in_this_period INT,
OUT read_count_in_last_period INT,
OUT query_count_in_this_period INT,
OUT query_count_in_last_period INT,
OUT score BIGINT
)
RETURNS SETOF record
LANGUAGE plpgsql
AS $function$
BEGIN
IF
array_position(enumvals, 'log') >= array_position(enumvals, setting)
AND setting != 'off'
FROM pg_settings
WHERE name = 'citus.stat_tenants_log_level'
THEN
RAISE LOG 'Generating citus_stat_tenants';
END IF;
RETURN QUERY
SELECT *
FROM jsonb_to_recordset((
SELECT
jsonb_agg(all_cst_rows_as_jsonb.cst_row_as_jsonb)::jsonb
FROM (
SELECT
jsonb_array_elements(run_command_on_all_nodes.result::jsonb)::jsonb ||
('{"nodeid":' || run_command_on_all_nodes.nodeid || '}')::jsonb AS cst_row_as_jsonb
FROM
run_command_on_all_nodes (
$$
SELECT
coalesce(to_jsonb (array_agg(cstl.*)), '[]'::jsonb)
FROM citus_stat_tenants_local($$||return_all_tenants||$$) cstl;
$$,
parallel:= TRUE,
give_warning_for_connection_errors:= TRUE)
WHERE
success = 't')
AS all_cst_rows_as_jsonb))
AS (
nodeid INT,
colocation_id INT,
tenant_attribute TEXT,
read_count_in_this_period INT,
read_count_in_last_period INT,
query_count_in_this_period INT,
query_count_in_last_period INT,
score BIGINT
)
ORDER BY score DESC
LIMIT CASE WHEN NOT return_all_tenants THEN current_setting('citus.stat_tenants_limit')::BIGINT END;
END;
$function$;
-- The citus_stat_tenants view exposes the rows of citus_stat_tenants(FALSE)
-- without the score column. It is created in the citus schema and then moved
-- to pg_catalog.
CREATE OR REPLACE VIEW citus.citus_stat_tenants AS
SELECT
nodeid,
colocation_id,
tenant_attribute,
read_count_in_this_period,
read_count_in_last_period,
query_count_in_this_period,
query_count_in_last_period
FROM pg_catalog.citus_stat_tenants(FALSE);
ALTER VIEW citus.citus_stat_tenants SET SCHEMA pg_catalog;
-- Take the function and the view away from PUBLIC and grant them to pg_monitor.
REVOKE ALL ON FUNCTION pg_catalog.citus_stat_tenants(BOOLEAN) FROM PUBLIC;
GRANT EXECUTE ON FUNCTION pg_catalog.citus_stat_tenants(BOOLEAN) TO pg_monitor;
REVOKE ALL ON pg_catalog.citus_stat_tenants FROM PUBLIC;
GRANT SELECT ON pg_catalog.citus_stat_tenants TO pg_monitor;

View File

@ -0,0 +1,78 @@
-- cst in the query is an abbreviation for citus_stat_tenants
--
-- citus_stat_tenants(return_all_tenants) gathers tenant statistics from the
-- nodes of the cluster: it runs citus_stat_tenants_local() on every node via
-- run_command_on_all_nodes(), keeps only the results of commands that
-- succeeded, adds the nodeid each row came from, and returns the rows ordered
-- by score, highest first.
--
-- Unless return_all_tenants is true, at most citus.stat_tenants_limit rows are
-- returned. Depending on citus.stat_tenants_log_level, a
-- 'Generating citus_stat_tenants' message is raised at LOG level first.
CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_tenants (
return_all_tenants BOOLEAN DEFAULT FALSE,
OUT nodeid INT,
OUT colocation_id INT,
OUT tenant_attribute TEXT,
OUT read_count_in_this_period INT,
OUT read_count_in_last_period INT,
OUT query_count_in_this_period INT,
OUT query_count_in_last_period INT,
OUT score BIGINT
)
RETURNS SETOF record
LANGUAGE plpgsql
AS $function$
BEGIN
IF
array_position(enumvals, 'log') >= array_position(enumvals, setting)
AND setting != 'off'
FROM pg_settings
WHERE name = 'citus.stat_tenants_log_level'
THEN
RAISE LOG 'Generating citus_stat_tenants';
END IF;
RETURN QUERY
SELECT *
FROM jsonb_to_recordset((
SELECT
jsonb_agg(all_cst_rows_as_jsonb.cst_row_as_jsonb)::jsonb
FROM (
SELECT
jsonb_array_elements(run_command_on_all_nodes.result::jsonb)::jsonb ||
('{"nodeid":' || run_command_on_all_nodes.nodeid || '}')::jsonb AS cst_row_as_jsonb
FROM
run_command_on_all_nodes (
$$
SELECT
coalesce(to_jsonb (array_agg(cstl.*)), '[]'::jsonb)
FROM citus_stat_tenants_local($$||return_all_tenants||$$) cstl;
$$,
parallel:= TRUE,
give_warning_for_connection_errors:= TRUE)
WHERE
success = 't')
AS all_cst_rows_as_jsonb))
AS (
nodeid INT,
colocation_id INT,
tenant_attribute TEXT,
read_count_in_this_period INT,
read_count_in_last_period INT,
query_count_in_this_period INT,
query_count_in_last_period INT,
score BIGINT
)
ORDER BY score DESC
LIMIT CASE WHEN NOT return_all_tenants THEN current_setting('citus.stat_tenants_limit')::BIGINT END;
END;
$function$;
-- The citus_stat_tenants view exposes the rows of citus_stat_tenants(FALSE)
-- without the score column. It is created in the citus schema and then moved
-- to pg_catalog.
CREATE OR REPLACE VIEW citus.citus_stat_tenants AS
SELECT
nodeid,
colocation_id,
tenant_attribute,
read_count_in_this_period,
read_count_in_last_period,
query_count_in_this_period,
query_count_in_last_period
FROM pg_catalog.citus_stat_tenants(FALSE);
ALTER VIEW citus.citus_stat_tenants SET SCHEMA pg_catalog;
-- Take the function and the view away from PUBLIC and grant them to pg_monitor.
REVOKE ALL ON FUNCTION pg_catalog.citus_stat_tenants(BOOLEAN) FROM PUBLIC;
GRANT EXECUTE ON FUNCTION pg_catalog.citus_stat_tenants(BOOLEAN) TO pg_monitor;
REVOKE ALL ON pg_catalog.citus_stat_tenants FROM PUBLIC;
GRANT SELECT ON pg_catalog.citus_stat_tenants TO pg_monitor;

View File

@ -0,0 +1,32 @@
-- citus_stat_tenants_local(return_all_tenants) is the C entry point that
-- returns the tenant statistics kept on the node it is called on: one row per
-- tenant with its colocation id, tenant attribute, read and query counts for
-- the current and the previous period, and its score.
CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_tenants_local(
return_all_tenants BOOLEAN DEFAULT FALSE,
OUT colocation_id INT,
OUT tenant_attribute TEXT,
OUT read_count_in_this_period INT,
OUT read_count_in_last_period INT,
OUT query_count_in_this_period INT,
OUT query_count_in_last_period INT,
OUT score BIGINT)
RETURNS SETOF RECORD
LANGUAGE C
AS 'citus', $$citus_stat_tenants_local$$;
-- The citus_stat_tenants_local view exposes citus_stat_tenants_local() called
-- with its default argument. Rows are ordered by score, highest first, but the
-- score column itself is not exposed. The view is created in the citus schema
-- and then moved to pg_catalog.
CREATE OR REPLACE VIEW citus.citus_stat_tenants_local AS
SELECT
colocation_id,
tenant_attribute,
read_count_in_this_period,
read_count_in_last_period,
query_count_in_this_period,
query_count_in_last_period
FROM pg_catalog.citus_stat_tenants_local()
ORDER BY score DESC;
ALTER VIEW citus.citus_stat_tenants_local SET SCHEMA pg_catalog;
-- Take the function and the view away from PUBLIC and grant them to pg_monitor.
REVOKE ALL ON FUNCTION pg_catalog.citus_stat_tenants_local(BOOLEAN) FROM PUBLIC;
GRANT EXECUTE ON FUNCTION pg_catalog.citus_stat_tenants_local(BOOLEAN) TO pg_monitor;
REVOKE ALL ON pg_catalog.citus_stat_tenants_local FROM PUBLIC;
GRANT SELECT ON pg_catalog.citus_stat_tenants_local TO pg_monitor;

View File

@ -0,0 +1,32 @@
-- citus_stat_tenants_local(return_all_tenants) is the C entry point that
-- returns the tenant statistics kept on the node it is called on: one row per
-- tenant with its colocation id, tenant attribute, read and query counts for
-- the current and the previous period, and its score.
CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_tenants_local(
return_all_tenants BOOLEAN DEFAULT FALSE,
OUT colocation_id INT,
OUT tenant_attribute TEXT,
OUT read_count_in_this_period INT,
OUT read_count_in_last_period INT,
OUT query_count_in_this_period INT,
OUT query_count_in_last_period INT,
OUT score BIGINT)
RETURNS SETOF RECORD
LANGUAGE C
AS 'citus', $$citus_stat_tenants_local$$;
-- The citus_stat_tenants_local view exposes citus_stat_tenants_local() called
-- with its default argument. Rows are ordered by score, highest first, but the
-- score column itself is not exposed. The view is created in the citus schema
-- and then moved to pg_catalog.
CREATE OR REPLACE VIEW citus.citus_stat_tenants_local AS
SELECT
colocation_id,
tenant_attribute,
read_count_in_this_period,
read_count_in_last_period,
query_count_in_this_period,
query_count_in_last_period
FROM pg_catalog.citus_stat_tenants_local()
ORDER BY score DESC;
ALTER VIEW citus.citus_stat_tenants_local SET SCHEMA pg_catalog;
-- Take the function and the view away from PUBLIC and grant them to pg_monitor.
REVOKE ALL ON FUNCTION pg_catalog.citus_stat_tenants_local(BOOLEAN) FROM PUBLIC;
GRANT EXECUTE ON FUNCTION pg_catalog.citus_stat_tenants_local(BOOLEAN) TO pg_monitor;
REVOKE ALL ON pg_catalog.citus_stat_tenants_local FROM PUBLIC;
GRANT SELECT ON pg_catalog.citus_stat_tenants_local TO pg_monitor;

View File

@ -0,0 +1,7 @@
-- citus_stat_tenants_local_reset() is the C entry point that resets the tenant
-- statistics kept on the node it is called on. It takes no arguments and
-- returns nothing.
CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_tenants_local_reset()
RETURNS VOID
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_stat_tenants_local_reset$$;
COMMENT ON FUNCTION pg_catalog.citus_stat_tenants_local_reset()
IS 'resets the local tenant statistics';

View File

@ -0,0 +1,7 @@
-- citus_stat_tenants_local_reset() is the C entry point that resets the tenant
-- statistics kept on the node it is called on. It takes no arguments and
-- returns nothing.
CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_tenants_local_reset()
RETURNS VOID
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_stat_tenants_local_reset$$;
COMMENT ON FUNCTION pg_catalog.citus_stat_tenants_local_reset()
IS 'resets the local tenant statistics';

View File

@ -0,0 +1,8 @@
-- citus_stat_tenants_reset() resets the tenant statistics across the cluster
-- by running citus_stat_tenants_local_reset() through
-- run_command_on_all_nodes(). The per-node results are discarded (PERFORM).
CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_tenants_reset()
RETURNS VOID
LANGUAGE plpgsql
AS $function$
BEGIN
PERFORM run_command_on_all_nodes($$SELECT citus_stat_tenants_local_reset()$$);
END;
$function$;

View File

@ -0,0 +1,8 @@
-- citus_stat_tenants_reset() resets the tenant statistics across the cluster
-- by running citus_stat_tenants_local_reset() through
-- run_command_on_all_nodes(). The per-node results are discarded (PERFORM).
CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_tenants_reset()
RETURNS VOID
LANGUAGE plpgsql
AS $function$
BEGIN
PERFORM run_command_on_all_nodes($$SELECT citus_stat_tenants_local_reset()$$);
END;
$function$;

View File

@ -0,0 +1,38 @@
/*-------------------------------------------------------------------------
*
* citus_stat_tenants.c
*
* This file contains functions to test citus_stat_tenants.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "fmgr.h"
#include "distributed/utils/citus_stat_tenants.h"
#include "sys/time.h"
PG_FUNCTION_INFO_V1(sleep_until_next_period);
/*
 * sleep_until_next_period is a test helper that blocks the caller until
 * shortly after the next StatTenantsPeriod boundary.
 */
Datum
sleep_until_next_period(PG_FUNCTION_ARGS)
{
	struct timeval now;
	gettimeofday(&now, NULL);

	/* wall-clock second at which the next period begins */
	long int upcomingBoundary = now.tv_sec -
								(now.tv_sec % StatTenantsPeriod) +
								StatTenantsPeriod;

	/*
	 * Microseconds left until that boundary, plus an extra 100000
	 * microseconds so that we wake up inside the new period.
	 */
	long int microsecondsToSleep = (upcomingBoundary - now.tv_sec) * 1000000 -
								   now.tv_usec + 100000;

	pg_usleep(microsecondsToSleep);

	PG_RETURN_VOID();
}

View File

@ -0,0 +1,771 @@
/*-------------------------------------------------------------------------
*
* citus_stat_tenants.c
* Routines related to the multi tenant monitor.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "unistd.h"
#include "distributed/citus_safe_lib.h"
#include "distributed/log_utils.h"
#include "distributed/listutils.h"
#include "distributed/metadata_cache.h"
#include "distributed/jsonbutils.h"
#include "distributed/colocation_utils.h"
#include "distributed/tuplestore.h"
#include "distributed/colocation_utils.h"
#include "distributed/utils/citus_stat_tenants.h"
#include "executor/execdesc.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
#include "storage/shmem.h"
#include "sys/time.h"
#include "utils/builtins.h"
#include "utils/datetime.h"
#include "utils/json.h"
#include <time.h>
static void AttributeMetricsIfApplicable(void);
/*
 * ExecutorEnd hook that was installed before CitusAttributeToEnd;
 * CitusAttributeToEnd chains to it when it is set.
 */
ExecutorEnd_hook_type prev_ExecutorEnd = NULL;
/* leading bytes that identify a query string carrying a tenant annotation */
#define ATTRIBUTE_PREFIX "/*{\"tId\":"
/*
 * printf-style format of the annotation comment that AnnotateQuery prepends:
 * a JSON object with the tenant id ("tId") and the colocation id ("cId").
 */
#define ATTRIBUTE_STRING_FORMAT "/*{\"tId\":%s,\"cId\":%d}*/"
/* number of columns in a row returned by citus_stat_tenants_local */
#define STAT_TENANTS_COLUMNS 7
/*
 * NOTE(review): not used in the visible part of this file; per the feature
 * description this is the amount a single query adds to a tenant's score.
 */
#define ONE_QUERY_SCORE 1000000000
/*
 * Attribution of the query currently being executed by this backend. The
 * values are set by AttributeTask; an empty AttributeToTenant means there is
 * nothing to attribute.
 */
static char AttributeToTenant[MAX_TENANT_ATTRIBUTE_LENGTH] = "";
static CmdType AttributeToCommandType = CMD_UNKNOWN;
static int AttributeToColocationGroupId = INVALID_COLOCATION_ID;
static const char *SharedMemoryNameForMultiTenantMonitor =
"Shared memory for multi tenant monitor";
static char *TenantTrancheName = "Tenant Tranche";
static char *MonitorTrancheName = "Multi Tenant Monitor Tranche";
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
static int CompareTenantScore(const void *leftElement, const void *rightElement);
static void UpdatePeriodsIfNecessary(TenantStats *tenantStats, TimestampTz queryTime);
static void ReduceScoreIfNecessary(TenantStats *tenantStats, TimestampTz queryTime);
static void EvictTenantsIfNecessary(TimestampTz queryTime);
static void RecordTenantStats(TenantStats *tenantStats);
static void CreateMultiTenantMonitor(void);
static MultiTenantMonitor * CreateSharedMemoryForMultiTenantMonitor(void);
static MultiTenantMonitor * GetMultiTenantMonitor(void);
static void MultiTenantMonitorSMInit(void);
static int CreateTenantStats(MultiTenantMonitor *monitor, TimestampTz queryTime);
static int FindTenantStats(MultiTenantMonitor *monitor);
static size_t MultiTenantMonitorshmemSize(void);
static char * ExtractTopComment(const char *inputString);
static char * EscapeCommentChars(const char *str);
static char * UnescapeCommentChars(const char *str);
/*
 * Variables behind the citus.stat_tenants_log_level, citus.stat_tenants_period
 * (in seconds), citus.stat_tenants_limit and citus.stat_tenants_track settings.
 */
int StatTenantsLogLevel = CITUS_LOG_LEVEL_OFF;
int StatTenantsPeriod = (time_t) 60;
int StatTenantsLimit = 100;
int StatTenantsTrack = STAT_TENANTS_TRACK_NONE;
PG_FUNCTION_INFO_V1(citus_stat_tenants_local);
PG_FUNCTION_INFO_V1(citus_stat_tenants_local_reset);
/*
* citus_stat_tenants_local finds, updates and returns the statistics for tenants.
*/
Datum
citus_stat_tenants_local(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
/*
* We keep more than StatTenantsLimit tenants in our monitor.
* We do this to not lose data if a tenant falls out of top StatTenantsLimit in case they need to return soon.
* Normally we return StatTenantsLimit tenants but if returnAllTenants is true we return all of them.
*/
bool returnAllTenants = PG_GETARG_BOOL(0);
TupleDesc tupleDescriptor = NULL;
Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);
TimestampTz monitoringTime = GetCurrentTimestamp();
Datum values[STAT_TENANTS_COLUMNS];
bool isNulls[STAT_TENANTS_COLUMNS];
MultiTenantMonitor *monitor = GetMultiTenantMonitor();
if (monitor == NULL)
{
PG_RETURN_VOID();
}
LWLockAcquire(&monitor->lock, LW_EXCLUSIVE);
int numberOfRowsToReturn = 0;
if (returnAllTenants)
{
numberOfRowsToReturn = monitor->tenantCount;
}
else
{
numberOfRowsToReturn = Min(monitor->tenantCount, StatTenantsLimit);
}
for (int tenantIndex = 0; tenantIndex < monitor->tenantCount; tenantIndex++)
{
UpdatePeriodsIfNecessary(&monitor->tenants[tenantIndex], monitoringTime);
ReduceScoreIfNecessary(&monitor->tenants[tenantIndex], monitoringTime);
}
SafeQsort(monitor->tenants, monitor->tenantCount, sizeof(TenantStats),
CompareTenantScore);
for (int i = 0; i < numberOfRowsToReturn; i++)
{
memset(values, 0, sizeof(values));
memset(isNulls, false, sizeof(isNulls));
TenantStats *tenantStats = &monitor->tenants[i];
values[0] = Int32GetDatum(tenantStats->colocationGroupId);
values[1] = PointerGetDatum(cstring_to_text(tenantStats->tenantAttribute));
values[2] = Int32GetDatum(tenantStats->readsInThisPeriod);
values[3] = Int32GetDatum(tenantStats->readsInLastPeriod);
values[4] = Int32GetDatum(tenantStats->readsInThisPeriod +
tenantStats->writesInThisPeriod);
values[5] = Int32GetDatum(tenantStats->readsInLastPeriod +
tenantStats->writesInLastPeriod);
values[6] = Int64GetDatum(tenantStats->score);
tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
}
LWLockRelease(&monitor->lock);
PG_RETURN_VOID();
}
/*
* citus_stat_tenants_local_reset resets monitor for tenant statistics
* on the local node.
*/
Datum
citus_stat_tenants_local_reset(PG_FUNCTION_ARGS)
{
MultiTenantMonitor *monitor = GetMultiTenantMonitor();
monitor->tenantCount = 0;
PG_RETURN_VOID();
}
/*
 * AttributeQueryIfAnnotated inspects the beginning of the query string and,
 * when it carries a tenant annotation comment, records the tenant id and the
 * colocation id found in it for the tenant statistics.
 */
void
AttributeQueryIfAnnotated(const char *query_string, CmdType commandType)
{
	if (StatTenantsTrack == STAT_TENANTS_TRACK_NONE)
	{
		return;
	}

	/* drop any attribution left over from an earlier query */
	strcpy_s(AttributeToTenant, sizeof(AttributeToTenant), "");

	bool startsWithAnnotation =
		query_string != NULL &&
		strncmp(ATTRIBUTE_PREFIX, query_string, strlen(ATTRIBUTE_PREFIX)) == 0;
	if (!startsWithAnnotation)
	{
		return;
	}

	char *annotation = ExtractTopComment(query_string);
	if (annotation == NULL)
	{
		return;
	}

	/* the annotation is a JSON object holding the "tId" and "cId" fields */
	Datum annotationJsonb = DirectFunctionCall1(jsonb_in,
												 PointerGetDatum(annotation));

	char *tenantId = NULL;
	text *tenantIdText = ExtractFieldTextP(annotationJsonb, "tId");
	if (tenantIdText != NULL)
	{
		tenantId = UnescapeCommentChars(text_to_cstring(tenantIdText));
	}

	int colocationId = ExtractFieldInt32(annotationJsonb, "cId",
										 INVALID_COLOCATION_ID);

	AttributeTask(tenantId, colocationId, commandType);
}
/*
 * AttributeTask remembers the tenant, colocation group and command type of the
 * task that is about to run, so that the statistics can be attributed to that
 * tenant later. Nothing is recorded when tracking is disabled, when there is
 * no tenant id, or when the colocation id is invalid.
 */
void
AttributeTask(char *tenantId, int colocationId, CmdType commandType)
{
	bool trackingEnabled = (StatTenantsTrack != STAT_TENANTS_TRACK_NONE);
	bool hasTenant = (tenantId != NULL && colocationId != INVALID_COLOCATION_ID);

	if (!trackingEnabled || !hasTenant)
	{
		return;
	}

	AttributeToCommandType = commandType;
	AttributeToColocationGroupId = colocationId;

	/* copies at most MAX_TENANT_ATTRIBUTE_LENGTH - 1 characters of the tenant id */
	strncpy_s(AttributeToTenant, MAX_TENANT_ATTRIBUTE_LENGTH, tenantId,
			  MAX_TENANT_ATTRIBUTE_LENGTH - 1);
}
/*
 * AnnotateQuery returns the query string prefixed with a comment that carries
 * the tenant attributes (partition key value and colocation id). The original
 * string is returned untouched when tracking is disabled or when there is no
 * partition key value.
 */
char *
AnnotateQuery(char *queryString, Const *partitionKeyValue, int colocationId)
{
	if (partitionKeyValue == NULL || StatTenantsTrack == STAT_TENANTS_TRACK_NONE)
	{
		return queryString;
	}

	char *tenantValue = DatumToString(partitionKeyValue->constvalue,
									  partitionKeyValue->consttype);

	/* escape comment characters first, then JSON-escape the result */
	StringInfo jsonTenantValue = makeStringInfo();
	escape_json(jsonTenantValue, EscapeCommentChars(tenantValue));

	StringInfo annotatedQuery = makeStringInfo();
	appendStringInfo(annotatedQuery, ATTRIBUTE_STRING_FORMAT, jsonTenantValue->data,
					 colocationId);
	appendStringInfoString(annotatedQuery, queryString);

	return annotatedQuery->data;
}
/*
 * CitusAttributeToEnd is the ExecutorEnd hook of the tenant monitor. It
 * records the statistics of the finished query for its tenant and then hands
 * over to the previously installed hook, or to the standard executor end
 * function when there is none.
 */
void
CitusAttributeToEnd(QueryDesc *queryDesc)
{
	/*
	 * Executor end is the last point at which the pending attribution can
	 * still be credited to a tenant, if there is one.
	 */
	AttributeMetricsIfApplicable();

	ExecutorEnd_hook_type nextExecutorEnd = standard_ExecutorEnd;
	if (prev_ExecutorEnd)
	{
		nextExecutorEnd = prev_ExecutorEnd;
	}

	nextExecutorEnd(queryDesc);
}
/*
 * CompareTenantScore is a qsort comparator that orders tenant statistics by
 * score, highest score first. It returns -1 when the left tenant has the
 * higher score, 1 when the right tenant has the higher score and 0 when the
 * scores are equal.
 */
static int
CompareTenantScore(const void *leftElement, const void *rightElement)
{
	const TenantStats *lhs = (const TenantStats *) leftElement;
	const TenantStats *rhs = (const TenantStats *) rightElement;

	if (lhs->score == rhs->score)
	{
		return 0;
	}

	return (lhs->score > rhs->score) ? -1 : 1;
}
/*
* AttributeMetricsIfApplicable updates the metrics for current tenant's statistics
*/
static void
AttributeMetricsIfApplicable()
{
if (StatTenantsTrack == STAT_TENANTS_TRACK_NONE ||
AttributeToTenant[0] == '\0')
{
return;
}
TimestampTz queryTime = GetCurrentTimestamp();
MultiTenantMonitor *monitor = GetMultiTenantMonitor();
/*
* We need to acquire the monitor lock in shared mode to check if the tenant is
* already in the monitor. If it is not, we need to acquire the lock in
* exclusive mode to add the tenant to the monitor.
*
* We need to check again if the tenant is in the monitor after acquiring the
* exclusive lock to avoid adding the tenant twice. Some other backend might
* have added the tenant while we were waiting for the lock.
*
* After releasing the exclusive lock, we need to acquire the lock in shared
* mode to update the tenant's statistics. We need to check again if the tenant
* is in the monitor after acquiring the shared lock because some other backend
* might have removed the tenant while we were waiting for the lock.
*/
LWLockAcquire(&monitor->lock, LW_SHARED);
int currentTenantIndex = FindTenantStats(monitor);
if (currentTenantIndex != -1)
{
TenantStats *tenantStats = &monitor->tenants[currentTenantIndex];
LWLockAcquire(&tenantStats->lock, LW_EXCLUSIVE);
UpdatePeriodsIfNecessary(tenantStats, queryTime);
ReduceScoreIfNecessary(tenantStats, queryTime);
RecordTenantStats(tenantStats);
LWLockRelease(&tenantStats->lock);
}
else
{
LWLockRelease(&monitor->lock);
LWLockAcquire(&monitor->lock, LW_EXCLUSIVE);
currentTenantIndex = FindTenantStats(monitor);
if (currentTenantIndex == -1)
{
currentTenantIndex = CreateTenantStats(monitor, queryTime);
}
LWLockRelease(&monitor->lock);
LWLockAcquire(&monitor->lock, LW_SHARED);
currentTenantIndex = FindTenantStats(monitor);
if (currentTenantIndex != -1)
{
TenantStats *tenantStats = &monitor->tenants[currentTenantIndex];
LWLockAcquire(&tenantStats->lock, LW_EXCLUSIVE);
UpdatePeriodsIfNecessary(tenantStats, queryTime);
ReduceScoreIfNecessary(tenantStats, queryTime);
RecordTenantStats(tenantStats);
LWLockRelease(&tenantStats->lock);
}
}
LWLockRelease(&monitor->lock);
strcpy_s(AttributeToTenant, sizeof(AttributeToTenant), "");
}
/*
 * UpdatePeriodsIfNecessary moves the query counts to the previous period if enough
 * time has passed.
 *
 * If 1 period has passed after the latest query, this function moves this period's
 * counts to the last period and cleans this period's statistics.
 *
 * If 2 or more periods have passed after the last query, this function cleans both
 * this and last period's statistics.
 *
 * In all cases the tenant's lastQueryTime is set to queryTime.
 */
static void
UpdatePeriodsIfNecessary(TenantStats *tenantStats, TimestampTz queryTime)
{
	long long int periodInMicroSeconds = StatTenantsPeriod * USECS_PER_SEC;
	TimestampTz periodStart = queryTime - (queryTime % periodInMicroSeconds);

	/*
	 * TimestampDifferenceExceeds() takes its threshold in milliseconds as an
	 * int, so the period must not be passed in microseconds. StatTenantsPeriod
	 * is in seconds.
	 */
	int periodInMilliSeconds = StatTenantsPeriod * 1000;

	/*
	 * If the last query in this tenant was before the start of current period
	 * but there are some query count for this period we move them to the last period.
	 */
	if (tenantStats->lastQueryTime < periodStart &&
		(tenantStats->writesInThisPeriod || tenantStats->readsInThisPeriod))
	{
		tenantStats->writesInLastPeriod = tenantStats->writesInThisPeriod;
		tenantStats->writesInThisPeriod = 0;

		tenantStats->readsInLastPeriod = tenantStats->readsInThisPeriod;
		tenantStats->readsInThisPeriod = 0;
	}

	/*
	 * If the last query is at least one full period older than the start of the
	 * current period, it did not happen in the last period either, so we clean
	 * the last period counts too.
	 */
	if (TimestampDifferenceExceeds(tenantStats->lastQueryTime, periodStart,
								   periodInMilliSeconds))
	{
		tenantStats->writesInLastPeriod = 0;
		tenantStats->readsInLastPeriod = 0;
	}

	tenantStats->lastQueryTime = queryTime;
}
/*
 * ReduceScoreIfNecessary reduces the tenant score only if it is necessary.
 *
 * We halve the tenants' scores after each period. This function checks the number of
 * periods that passed after the last score reduction and reduces the score accordingly.
 */
static void
ReduceScoreIfNecessary(TenantStats *tenantStats, TimestampTz queryTime)
{
	long long int periodInMicroSeconds = StatTenantsPeriod * USECS_PER_SEC;
	TimestampTz periodStart = queryTime - (queryTime % periodInMicroSeconds);

	/*
	 * With each query we increase the score of tenant by ONE_QUERY_SCORE.
	 * After one period we halve the scores.
	 *
	 * Here we calculate how many periods passed after the last time we did score reduction
	 * If the latest score reduction was in this period this number should be 0,
	 * if it was in the last period this number should be 1 and so on.
	 *
	 * The count is kept in a 64-bit integer so that a large gap is not
	 * truncated before it is checked below.
	 */
	long long int periodCountAfterLastScoreReduction =
		(periodStart - tenantStats->lastScoreReduction + periodInMicroSeconds - 1) /
		periodInMicroSeconds;

	/*
	 * A negative count should not happen; like a count of 0 it means that
	 * there is nothing to reduce.
	 */
	if (periodCountAfterLastScoreReduction <= 0)
	{
		return;
	}

	/*
	 * Shifting by the width of the type or more is undefined behaviour in C.
	 * Once that many periods have passed, repeated halving has driven the
	 * score to 0 anyway, so set it directly.
	 */
	const long long int largestValidShift =
		(long long int) (sizeof(tenantStats->score) * 8) - 1;

	if (periodCountAfterLastScoreReduction > largestValidShift)
	{
		tenantStats->score = 0;
	}
	else
	{
		tenantStats->score >>= periodCountAfterLastScoreReduction;
	}

	tenantStats->lastScoreReduction = queryTime;
}
/*
 * EvictTenantsIfNecessary shrinks the tenant array once it holds
 * 3 * StatTenantsLimit (or more) entries.
 *
 * The monitor deliberately keeps more than StatTenantsLimit tenants so that a
 * tenant which just dropped out of the top StatTenantsLimit is not forgotten
 * immediately. When the array is full, every tenant's score is brought up to
 * date for queryTime, the array is sorted with CompareTenantScore and only the
 * first 2 * StatTenantsLimit entries are kept.
 *
 * NOTE(review): this relies on CompareTenantScore ordering the tenants that
 * should survive first -- confirm against its definition.
 */
static void
EvictTenantsIfNecessary(TimestampTz queryTime)
{
	MultiTenantMonitor *monitor = GetMultiTenantMonitor();

	/* still room for new tenants, nothing to evict */
	if (monitor->tenantCount < StatTenantsLimit * 3)
	{
		return;
	}

	/* apply the pending score halvings so that the sort compares current scores */
	TenantStats *tenantsEnd = monitor->tenants + monitor->tenantCount;
	for (TenantStats *tenant = monitor->tenants; tenant < tenantsEnd; tenant++)
	{
		ReduceScoreIfNecessary(tenant, queryTime);
	}

	SafeQsort(monitor->tenants, monitor->tenantCount, sizeof(TenantStats),
			  CompareTenantScore);

	/* dropping the tail of the sorted array is the actual eviction */
	monitor->tenantCount = StatTenantsLimit * 2;
}
/*
 * RecordTenantStats accounts one query for the given tenant.
 *
 * The tenant's score grows by ONE_QUERY_SCORE, saturating at LLONG_MAX, and
 * depending on AttributeToCommandType either the read counter (SELECT) or the
 * write counter (INSERT, UPDATE, DELETE) of the current period is incremented.
 * Other command types only affect the score.
 */
static void
RecordTenantStats(TenantStats *tenantStats)
{
	/* saturate instead of overflowing the signed score */
	if (tenantStats->score >= LLONG_MAX - ONE_QUERY_SCORE)
	{
		tenantStats->score = LLONG_MAX;
	}
	else
	{
		tenantStats->score += ONE_QUERY_SCORE;
	}

	switch (AttributeToCommandType)
	{
		case CMD_SELECT:
		{
			tenantStats->readsInThisPeriod++;
			break;
		}

		case CMD_UPDATE:
		case CMD_INSERT:
		case CMD_DELETE:
		{
			tenantStats->writesInThisPeriod++;
			break;
		}

		default:
		{
			/* not counted as a read or a write */
			break;
		}
	}
}
/*
* CreateMultiTenantMonitor creates the data structure for multi tenant monitor.
*/
static void
CreateMultiTenantMonitor()
{
MultiTenantMonitor *monitor = CreateSharedMemoryForMultiTenantMonitor();
monitor->tenantCount = 0;
}
/*
* CreateSharedMemoryForMultiTenantMonitor creates a dynamic shared memory segment for multi tenant monitor.
*/
static MultiTenantMonitor *
CreateSharedMemoryForMultiTenantMonitor()
{
bool found = false;
MultiTenantMonitor *monitor = ShmemInitStruct(SharedMemoryNameForMultiTenantMonitor,
MultiTenantMonitorshmemSize(),
&found);
if (found)
{
return monitor;
}
monitor->namedLockTranche.trancheId = LWLockNewTrancheId();
monitor->namedLockTranche.trancheName = MonitorTrancheName;
LWLockRegisterTranche(monitor->namedLockTranche.trancheId,
monitor->namedLockTranche.trancheName);
LWLockInitialize(&monitor->lock, monitor->namedLockTranche.trancheId);
return monitor;
}
/*
 * GetMultiTenantMonitor looks up the shared multi tenant monitor struct.
 *
 * Returns the monitor when ShmemInitStruct reports that it already existed.
 * Otherwise a WARNING is emitted and NULL is returned, so callers have to be
 * prepared for a NULL result.
 */
static MultiTenantMonitor *
GetMultiTenantMonitor()
{
	bool monitorExists = false;
	MultiTenantMonitor *monitor = ShmemInitStruct(SharedMemoryNameForMultiTenantMonitor,
												  MultiTenantMonitorshmemSize(),
												  &monitorExists);

	if (monitorExists)
	{
		return monitor;
	}

	elog(WARNING, "monitor not found");
	return NULL;
}
/*
* InitializeMultiTenantMonitorSMHandleManagement installs MultiTenantMonitorSMInit
* as the shared memory startup hook so that the multi tenant monitor can be
* initialized and stored in shared memory.
*
* The hook that was installed before is remembered in prev_shmem_startup_hook;
* MultiTenantMonitorSMInit calls it after doing its own work, which keeps the
* hook chain intact.
*/
void
InitializeMultiTenantMonitorSMHandleManagement()
{
/* remember the current hook before overwriting it */
prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = MultiTenantMonitorSMInit;
}
/*
* MultiTenantMonitorSMInit is the shared memory startup hook of the multi
* tenant monitor. It creates the monitor via CreateMultiTenantMonitor and then
* calls the previously installed startup hook, if there was one.
*/
static void
MultiTenantMonitorSMInit()
{
CreateMultiTenantMonitor();
/* continue the hook chain saved by InitializeMultiTenantMonitorSMHandleManagement */
if (prev_shmem_startup_hook != NULL)
{
prev_shmem_startup_hook();
}
}
/*
 * CreateTenantStats appends a fresh, zeroed statistics entry for the current
 * tenant (AttributeToTenant / AttributeToColocationGroupId) to the monitor and
 * returns the index of the new entry in monitor->tenants.
 *
 * Before appending, EvictTenantsIfNecessary is given the chance to make room
 * by dropping the lowest ranked tenants when the array is full. Each new entry
 * gets its own LWLock, initialized in a newly allocated tranche registered
 * under TenantTrancheName.
 *
 * Calling this function should be protected by the monitor->lock in LW_EXCLUSIVE mode.
 */
static int
CreateTenantStats(MultiTenantMonitor *monitor, TimestampTz queryTime)
{
	/* may lower monitor->tenantCount, so it has to run before picking the slot */
	EvictTenantsIfNecessary(queryTime);

	int newTenantIndex = monitor->tenantCount;
	TenantStats *newTenant = &monitor->tenants[newTenantIndex];

	/* the slot may contain data of an evicted tenant, start from a clean state */
	memset(newTenant, 0, sizeof(*newTenant));

	strcpy_s(newTenant->tenantAttribute, sizeof(newTenant->tenantAttribute),
			 AttributeToTenant);
	newTenant->colocationGroupId = AttributeToColocationGroupId;

	newTenant->namedLockTranche.trancheId = LWLockNewTrancheId();
	newTenant->namedLockTranche.trancheName = TenantTrancheName;
	LWLockRegisterTranche(newTenant->namedLockTranche.trancheId,
						  newTenant->namedLockTranche.trancheName);
	LWLockInitialize(&newTenant->lock, newTenant->namedLockTranche.trancheId);

	monitor->tenantCount++;

	return newTenantIndex;
}
/*
 * FindTenantStats searches the monitor for the statistics entry of the current
 * tenant, i.e. the entry whose attribute equals AttributeToTenant and whose
 * colocation group id equals AttributeToColocationGroupId.
 *
 * Returns the index of the matching entry in monitor->tenants, or -1 when the
 * tenant is not tracked yet.
 */
static int
FindTenantStats(MultiTenantMonitor *monitor)
{
	int tenantIndex = 0;

	while (tenantIndex < monitor->tenantCount)
	{
		const TenantStats *candidate = &monitor->tenants[tenantIndex];

		/* cheap integer comparison first, string comparison only when needed */
		if (candidate->colocationGroupId == AttributeToColocationGroupId &&
			strcmp(candidate->tenantAttribute, AttributeToTenant) == 0)
		{
			return tenantIndex;
		}

		tenantIndex++;
	}

	return -1;
}
/*
 * MultiTenantMonitorshmemSize returns the number of bytes the multi tenant
 * monitor needs in shared memory: the MultiTenantMonitor header followed by
 * room for 3 * StatTenantsLimit TenantStats entries.
 */
static size_t
MultiTenantMonitorshmemSize(void)
{
	Size tenantArraySize = mul_size(sizeof(TenantStats), StatTenantsLimit * 3);

	return add_size(sizeof(MultiTenantMonitor), tenantArraySize);
}
/*
 * ExtractTopComment extracts the body of the multi-line comment a query string
 * starts with.
 *
 * inputString must be a NUL-terminated string. If it starts with the comment
 * opener and contains a comment terminator after it, the text between the two
 * markers (markers excluded) is returned as a newly allocated string.
 *
 * Returns NULL if the string does not start with a comment or if the comment
 * is never terminated.
 */
static char *
ExtractTopComment(const char *inputString)
{
	const char *commentStartChars = "/*";
	const char *commentEndChars = "*/";
	const size_t commentCharsLength = 2;

	/*
	 * Only the first two characters matter here, so compare just those rather
	 * than scanning the whole query. strncmp stops at the terminating NUL,
	 * hence inputs shorter than the marker simply do not match.
	 */
	if (strncmp(inputString, commentStartChars, commentCharsLength) != 0)
	{
		return NULL;
	}

	/* Skip the comment start characters */
	const char *commentStart = inputString + commentCharsLength;

	/* Find the first comment end character */
	const char *commentEnd = strstr(commentStart, commentEndChars);
	if (commentEnd == NULL)
	{
		return NULL;
	}

	/* Allocate only now that we know there is a comment to return */
	StringInfo commentData = makeStringInfo();
	int commentLength = (int) (commentEnd - commentStart);
	appendStringInfo(commentData, "%.*s", commentLength, commentStart);

	return commentData->data;
}
/*
 * EscapeCommentChars returns a newly allocated copy of str in which every
 * '*' and '/' is preceded by a backslash. All other characters are copied
 * unchanged.
 */
static char *
EscapeCommentChars(const char *str)
{
	StringInfo escaped = makeStringInfo();

	for (const char *cursor = str; *cursor != '\0'; cursor++)
	{
		char current = *cursor;

		if (current == '*' || current == '/')
		{
			appendStringInfoChar(escaped, '\\');
		}

		appendStringInfoChar(escaped, current);
	}

	return escaped->data;
}
/*
 * UnescapeCommentChars returns a newly allocated copy of str in which every
 * backslash that directly precedes a '*' or a '/' is dropped. Backslashes in
 * any other position, including a trailing one, are kept.
 */
static char *
UnescapeCommentChars(const char *str)
{
	StringInfo unescaped = makeStringInfo();

	for (const char *cursor = str; *cursor != '\0'; cursor++)
	{
		/*
		 * cursor[1] is at worst the terminating NUL, which is neither '*'
		 * nor '/', so looking one character ahead is always safe.
		 */
		if (*cursor == '\\' && (cursor[1] == '*' || cursor[1] == '/'))
		{
			/* skip the escaping backslash, the escaped character is copied below */
			cursor++;
		}

		appendStringInfoChar(unescaped, *cursor);
	}

	return unescaped->data;
}

View File

@ -83,6 +83,25 @@ ExtractFieldBoolean(Datum jsonbDoc, const char *fieldName, bool defaultValue)
}
/*
 * ExtractFieldInt32 looks up fieldName in jsonbDoc and returns its value
 * converted to int32 via jsonb_int4. When the field is not found,
 * defaultValue is returned instead.
 */
int32
ExtractFieldInt32(Datum jsonbDoc, const char *fieldName, int32 defaultValue)
{
	Datum fieldDatum = 0;

	if (!ExtractFieldJsonb(jsonbDoc, fieldName, &fieldDatum, false))
	{
		return defaultValue;
	}

	return DatumGetInt32(DirectFunctionCall1(jsonb_int4, fieldDatum));
}
/*
* ExtractFieldTextP gets value of fieldName as text* from jsonbDoc, or
* returns NULL if it doesn't exist.

View File

@ -46,4 +46,6 @@ extern CustomScan * FetchCitusCustomScanIfExists(Plan *plan);
extern bool IsCitusPlan(Plan *plan);
extern bool IsCitusCustomScan(Plan *plan);
extern void SetJobColocationId(Job *job);
#endif /* CITUS_CUSTOM_SCAN_H */

View File

@ -16,5 +16,6 @@
bool ExtractFieldJsonbDatum(Datum jsonbDoc, const char *fieldName, Datum *result);
text * ExtractFieldTextP(Datum jsonbDoc, const char *fieldName);
bool ExtractFieldBoolean(Datum jsonbDoc, const char *fieldName, bool defaultValue);
int32 ExtractFieldInt32(Datum jsonbDoc, const char *fieldName, int32 defaultValue);
#endif /* CITUS_JSONBUTILS_H */

View File

@ -330,6 +330,9 @@ typedef struct Task
* Vacuum, create/drop/reindex concurrently cannot be executed in a transaction.
*/
bool cannotBeExecutedInTransction;
Const *partitionKeyValue;
int colocationId;
} Task;

View File

@ -0,0 +1,113 @@
/*-------------------------------------------------------------------------
*
* citus_stat_tenants.h
* Routines related to the multi tenant monitor.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef CITUS_ATTRIBUTE_H
#define CITUS_ATTRIBUTE_H
#include "executor/execdesc.h"
#include "executor/executor.h"
#include "storage/lwlock.h"
#include "utils/datetime.h"
#define MAX_TENANT_ATTRIBUTE_LENGTH 100
/*
* TenantStats is the struct that keeps statistics about one tenant.
* Instances live in the tenants array of MultiTenantMonitor.
*/
typedef struct TenantStats
{
/*
* The attribute value, e.g. the distribution column value, and the colocation
* group id of the tenant. Together they identify the tenant.
* The attribute is stored as a NUL-terminated string in a buffer of
* MAX_TENANT_ATTRIBUTE_LENGTH bytes.
*/
char tenantAttribute[MAX_TENANT_ATTRIBUTE_LENGTH];
int colocationGroupId;
/*
* Number of SELECT queries this tenant ran in this and last periods.
*/
int readsInLastPeriod;
int readsInThisPeriod;
/*
* Number of INSERT, UPDATE, and DELETE queries this tenant ran in this and last periods.
*/
int writesInLastPeriod;
int writesInThisPeriod;
/*
* The latest time this tenant ran a query. This value is used to decide when
* the counters of this period have to be moved to the last period.
*/
TimestampTz lastQueryTime;
/*
* The tenant monitoring score of this tenant. This value is increased by ONE_QUERY_SCORE at every query
* and halved after every period. This custom scoring mechanism is used to rank the tenants based on
* the recency and frequency of their activity, and to decide which tenants should be removed
* from the monitor.
*/
long long score;
/*
* The latest time the score of this tenant was halved. Halving is done lazily, so this value is
* needed to calculate how many halvings are still pending.
*/
TimestampTz lastScoreReduction;
/*
* Lock needed to update this tenant's statistics, and the tranche it belongs to.
*/
NamedLWLockTranche namedLockTranche;
LWLock lock;
} TenantStats;
/*
* MultiTenantMonitor is the struct for keeping the statistics
* of the tenants. It is stored in shared memory and ends with a
* variable-length array of TenantStats.
*/
typedef struct MultiTenantMonitor
{
/*
* Lock mechanism for the monitor.
* Each tenant update acquires the lock in shared mode, while
* the tenant number reduction and the monitor view acquire it in exclusive mode.
*/
NamedLWLockTranche namedLockTranche;
LWLock lock;
/*
* tenantCount is the number of items currently used in the tenants array.
* The total length of the tenants array is set up at CreateSharedMemoryForMultiTenantMonitor
* and is 3 * citus.stat_tenants_limit.
*/
int tenantCount;
TenantStats tenants[FLEXIBLE_ARRAY_MEMBER];
} MultiTenantMonitor;
/*
* StatTenantsTrackType lists the tenant statistics tracking modes.
* NOTE(review): presumably these are the values StatTenantsTrack
* (citus.stat_tenants_track) can take -- confirm against the GUC definition.
*/
typedef enum
{
STAT_TENANTS_TRACK_NONE = 0,
STAT_TENANTS_TRACK_ALL = 1
} StatTenantsTrackType;
/* public entry points of the multi tenant monitor */
extern void CitusAttributeToEnd(QueryDesc *queryDesc);
extern void AttributeQueryIfAnnotated(const char *queryString, CmdType commandType);
extern char * AnnotateQuery(char *queryString, Const *partitionKeyValue,
int colocationId);
extern void InitializeMultiTenantMonitorSMHandleManagement(void);
extern void AttributeTask(char *tenantId, int colocationGroupId, CmdType commandType);
extern ExecutorEnd_hook_type prev_ExecutorEnd;
/*
* Settings of the multi tenant monitor. StatTenantsPeriod is the period length
* in seconds and StatTenantsLimit determines how many tenants are kept
* (the monitor stores up to 3 * StatTenantsLimit tenants).
*/
extern int StatTenantsLogLevel;
extern int StatTenantsPeriod;
extern int StatTenantsLimit;
extern int StatTenantsTrack;
#endif /*CITUS_ATTRIBUTE_H */

View File

@ -307,3 +307,5 @@ s/(NOTICE: issuing SET LOCAL application_name TO 'citus_rebalancer gpid=)[0-9]+
# shard_rebalancer output, flaky improvement number
s/improvement of 0.1[0-9]* is lower/improvement of 0.1xxxxx is lower/g
# normalize tenants statistics annotations
s/\/\*\{"tId":.*\*\///g

View File

@ -0,0 +1,720 @@
CREATE SCHEMA citus_stat_tenants;
SET search_path TO citus_stat_tenants;
SET citus.next_shard_id TO 5797500;
SET citus.shard_replication_factor TO 1;
-- make sure that we are tracking the tenant stats
SELECT result FROM run_command_on_all_nodes('SHOW citus.stat_tenants_track');
result
---------------------------------------------------------------------
all
all
all
(3 rows)
CREATE OR REPLACE FUNCTION pg_catalog.sleep_until_next_period()
RETURNS VOID
LANGUAGE C
AS 'citus', $$sleep_until_next_period$$;
SELECT citus_stat_tenants_reset();
citus_stat_tenants_reset
---------------------------------------------------------------------
(1 row)
-- set period to upper limit to prevent stats from being reset
SELECT result FROM run_command_on_all_nodes('ALTER SYSTEM SET citus.stat_tenants_period TO 86400');
result
---------------------------------------------------------------------
ALTER SYSTEM
ALTER SYSTEM
ALTER SYSTEM
(3 rows)
SELECT result FROM run_command_on_all_nodes('SELECT pg_reload_conf()');
result
---------------------------------------------------------------------
t
t
t
(3 rows)
CREATE TABLE dist_tbl (a INT, b TEXT);
SELECT create_distributed_table('dist_tbl', 'a', shard_count:=4, colocate_with:='none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE dist_tbl_2 (a INT, b INT);
SELECT create_distributed_table('dist_tbl_2', 'a', colocate_with:='dist_tbl');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE dist_tbl_text (a TEXT, b INT);
SELECT create_distributed_table('dist_tbl_text', 'a', shard_count:=4, colocate_with:='none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE ref_tbl (a INT, b INT);
SELECT create_reference_table('ref_tbl');
create_reference_table
---------------------------------------------------------------------
(1 row)
INSERT INTO dist_tbl VALUES (1, 'abcd');
INSERT INTO dist_tbl VALUES (2, 'abcd');
UPDATE dist_tbl SET b = a + 1 WHERE a = 3;
UPDATE dist_tbl SET b = a + 1 WHERE a = 4;
DELETE FROM dist_tbl WHERE a = 5;
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants(true) ORDER BY tenant_attribute;
tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period
---------------------------------------------------------------------
1 | 0 | 0 | 1 | 0
2 | 0 | 0 | 1 | 0
3 | 0 | 0 | 1 | 0
4 | 0 | 0 | 1 | 0
5 | 0 | 0 | 1 | 0
(5 rows)
SELECT citus_stat_tenants_reset();
citus_stat_tenants_reset
---------------------------------------------------------------------
(1 row)
-- queries with multiple tenants should not be counted
SELECT count(*)>=0 FROM dist_tbl WHERE a IN (1, 5);
?column?
---------------------------------------------------------------------
t
(1 row)
-- queries with reference tables should not be counted
SELECT count(*)>=0 FROM ref_tbl WHERE a = 1;
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants(true) ORDER BY tenant_attribute;
tenant_attribute | query_count_in_this_period
---------------------------------------------------------------------
(0 rows)
-- queries with multiple tables but one tenant should be counted
SELECT count(*)>=0 FROM dist_tbl, dist_tbl_2 WHERE dist_tbl.a = 1 AND dist_tbl_2.a = 1;
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM dist_tbl JOIN dist_tbl_2 ON dist_tbl.a = dist_tbl_2.a WHERE dist_tbl.a = 1;
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants(true) WHERE tenant_attribute = '1';
tenant_attribute | query_count_in_this_period
---------------------------------------------------------------------
1 | 2
(1 row)
-- test scoring
-- all of these distribution column values are from second worker
SELECT nodeid AS worker_2_nodeid FROM pg_dist_node WHERE nodeport = :worker_2_port \gset
SELECT count(*)>=0 FROM dist_tbl WHERE a = 2;
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM dist_tbl WHERE a = 3;
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM dist_tbl WHERE a = 4;
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'abcd';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stat_tenants(true) WHERE nodeid = :worker_2_nodeid ORDER BY score DESC, tenant_attribute;
tenant_attribute | query_count_in_this_period | score
---------------------------------------------------------------------
2 | 1 | 1000000000
3 | 1 | 1000000000
4 | 1 | 1000000000
abcd | 1 | 1000000000
(4 rows)
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'abcd';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'abcd';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'cdef';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stat_tenants(true) WHERE nodeid = :worker_2_nodeid ORDER BY score DESC, tenant_attribute;
tenant_attribute | query_count_in_this_period | score
---------------------------------------------------------------------
abcd | 3 | 3000000000
2 | 1 | 1000000000
3 | 1 | 1000000000
4 | 1 | 1000000000
bcde | 1 | 1000000000
cdef | 1 | 1000000000
(6 rows)
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'defg';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stat_tenants(true) WHERE nodeid = :worker_2_nodeid ORDER BY score DESC, tenant_attribute;
tenant_attribute | query_count_in_this_period | score
---------------------------------------------------------------------
abcd | 3 | 3000000000
bcde | 3 | 3000000000
2 | 1 | 1000000000
3 | 1 | 1000000000
4 | 1 | 1000000000
cdef | 1 | 1000000000
defg | 1 | 1000000000
(7 rows)
-- test period passing
SELECT citus_stat_tenants_reset();
citus_stat_tenants_reset
---------------------------------------------------------------------
(1 row)
SELECT count(*)>=0 FROM dist_tbl WHERE a = 1;
?column?
---------------------------------------------------------------------
t
(1 row)
INSERT INTO dist_tbl VALUES (5, 'abcd');
\c - - - :worker_1_port
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants_local ORDER BY tenant_attribute;
tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period
---------------------------------------------------------------------
1 | 1 | 0 | 1 | 0
5 | 0 | 0 | 1 | 0
(2 rows)
-- simulate passing the period
SET citus.stat_tenants_period TO 2;
SELECT sleep_until_next_period();
sleep_until_next_period
---------------------------------------------------------------------
(1 row)
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants_local ORDER BY tenant_attribute;
tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period
---------------------------------------------------------------------
1 | 0 | 1 | 0 | 1
5 | 0 | 0 | 0 | 1
(2 rows)
\c - - - :master_port
SET search_path TO citus_stat_tenants;
-- test logs
SET client_min_messages TO LOG;
SELECT count(*)>=0 FROM citus_stat_tenants;
?column?
---------------------------------------------------------------------
t
(1 row)
SET citus.stat_tenants_log_level TO ERROR;
SELECT count(*)>=0 FROM citus_stat_tenants;
?column?
---------------------------------------------------------------------
t
(1 row)
SET citus.stat_tenants_log_level TO OFF;
SELECT count(*)>=0 FROM citus_stat_tenants;
?column?
---------------------------------------------------------------------
t
(1 row)
SET citus.stat_tenants_log_level TO LOG;
SELECT count(*)>=0 FROM citus_stat_tenants;
LOG: Generating citus_stat_tenants
CONTEXT: PL/pgSQL function citus_stat_tenants(boolean) line XX at RAISE
?column?
---------------------------------------------------------------------
t
(1 row)
SET citus.stat_tenants_log_level TO DEBUG;
SELECT count(*)>=0 FROM citus_stat_tenants;
LOG: Generating citus_stat_tenants
CONTEXT: PL/pgSQL function citus_stat_tenants(boolean) line XX at RAISE
?column?
---------------------------------------------------------------------
t
(1 row)
RESET client_min_messages;
SELECT citus_stat_tenants_reset();
citus_stat_tenants_reset
---------------------------------------------------------------------
(1 row)
-- test turning monitoring on/off
SET citus.stat_tenants_track TO "NONE";
SELECT count(*)>=0 FROM dist_tbl WHERE a = 1;
?column?
---------------------------------------------------------------------
t
(1 row)
INSERT INTO dist_tbl VALUES (1, 1);
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants;
tenant_attribute | query_count_in_this_period
---------------------------------------------------------------------
(0 rows)
SET citus.stat_tenants_track TO "ALL";
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants;
tenant_attribute | query_count_in_this_period
---------------------------------------------------------------------
(0 rows)
SELECT count(*)>=0 FROM dist_tbl WHERE a = 1;
?column?
---------------------------------------------------------------------
t
(1 row)
INSERT INTO dist_tbl VALUES (1, 1);
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants;
tenant_attribute | query_count_in_this_period
---------------------------------------------------------------------
1 | 2
(1 row)
-- test special and multibyte characters in tenant attribute
SELECT citus_stat_tenants_reset();
citus_stat_tenants_reset
---------------------------------------------------------------------
(1 row)
TRUNCATE TABLE dist_tbl_text;
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/bcde';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/*bcde';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/b*cde';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/b*c/de';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'b/*//cde';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/b/*/cde';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/b/**/cde';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde*';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde*/';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = U&'\0061\0308bc';
?column?
---------------------------------------------------------------------
t
(1 row)
\c - - - :worker_1_port
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants ORDER BY tenant_attribute;
tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period
---------------------------------------------------------------------
/*bcde | 1 | 0 | 1 | 0
/b*c/de | 1 | 0 | 1 | 0
/b*cde | 1 | 0 | 1 | 0
/b/**/cde | 1 | 0 | 1 | 0
/b/*/cde | 1 | 0 | 1 | 0
/bcde | 1 | 0 | 1 | 0
äbc | 1 | 0 | 1 | 0
b/*//cde | 1 | 0 | 1 | 0
bcde* | 1 | 0 | 1 | 0
bcde*/ | 1 | 0 | 1 | 0
(10 rows)
\c - - - :worker_2_port
SET search_path TO citus_stat_tenants;
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants ORDER BY tenant_attribute;
tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period
---------------------------------------------------------------------
/*bcde | 1 | 0 | 1 | 0
/b*c/de | 1 | 0 | 1 | 0
/b*cde | 1 | 0 | 1 | 0
/b/**/cde | 1 | 0 | 1 | 0
/b/*/cde | 1 | 0 | 1 | 0
/bcde | 1 | 0 | 1 | 0
äbc | 1 | 0 | 1 | 0
b/*//cde | 1 | 0 | 1 | 0
bcde* | 1 | 0 | 1 | 0
bcde*/ | 1 | 0 | 1 | 0
(10 rows)
SELECT citus_stat_tenants_reset();
citus_stat_tenants_reset
---------------------------------------------------------------------
(1 row)
-- test local queries
-- all of these distribution column values are from second worker
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/b*c/de';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/bcde';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = U&'\0061\0308bc';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde*';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants_local ORDER BY tenant_attribute;
tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period
---------------------------------------------------------------------
/b*c/de | 1 | 0 | 1 | 0
/bcde | 1 | 0 | 1 | 0
äbc | 1 | 0 | 1 | 0
bcde* | 1 | 0 | 1 | 0
(4 rows)
-- test local cached queries & prepared statements
PREPARE dist_tbl_text_select_plan (text) AS SELECT count(*)>=0 FROM dist_tbl_text WHERE a = $1;
EXECUTE dist_tbl_text_select_plan('/b*c/de');
?column?
---------------------------------------------------------------------
t
(1 row)
EXECUTE dist_tbl_text_select_plan('/bcde');
?column?
---------------------------------------------------------------------
t
(1 row)
EXECUTE dist_tbl_text_select_plan(U&'\0061\0308bc');
?column?
---------------------------------------------------------------------
t
(1 row)
EXECUTE dist_tbl_text_select_plan('bcde*');
?column?
---------------------------------------------------------------------
t
(1 row)
EXECUTE dist_tbl_text_select_plan('/b*c/de');
?column?
---------------------------------------------------------------------
t
(1 row)
EXECUTE dist_tbl_text_select_plan('/bcde');
?column?
---------------------------------------------------------------------
t
(1 row)
EXECUTE dist_tbl_text_select_plan(U&'\0061\0308bc');
?column?
---------------------------------------------------------------------
t
(1 row)
EXECUTE dist_tbl_text_select_plan('bcde*');
?column?
---------------------------------------------------------------------
t
(1 row)
EXECUTE dist_tbl_text_select_plan('/b*c/de');
?column?
---------------------------------------------------------------------
t
(1 row)
EXECUTE dist_tbl_text_select_plan('/bcde');
?column?
---------------------------------------------------------------------
t
(1 row)
EXECUTE dist_tbl_text_select_plan(U&'\0061\0308bc');
?column?
---------------------------------------------------------------------
t
(1 row)
EXECUTE dist_tbl_text_select_plan('bcde*');
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants_local ORDER BY tenant_attribute;
tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period
---------------------------------------------------------------------
/b*c/de | 4 | 0 | 4 | 0
/bcde | 4 | 0 | 4 | 0
äbc | 4 | 0 | 4 | 0
bcde* | 4 | 0 | 4 | 0
(4 rows)
\c - - - :master_port
SET search_path TO citus_stat_tenants;
PREPARE dist_tbl_text_select_plan (text) AS SELECT count(*)>=0 FROM dist_tbl_text WHERE a = $1;
EXECUTE dist_tbl_text_select_plan('/b*c/de');
?column?
---------------------------------------------------------------------
t
(1 row)
EXECUTE dist_tbl_text_select_plan('/bcde');
?column?
---------------------------------------------------------------------
t
(1 row)
EXECUTE dist_tbl_text_select_plan(U&'\0061\0308bc');
?column?
---------------------------------------------------------------------
t
(1 row)
EXECUTE dist_tbl_text_select_plan('bcde*');
?column?
---------------------------------------------------------------------
t
(1 row)
EXECUTE dist_tbl_text_select_plan('/b*c/de');
?column?
---------------------------------------------------------------------
t
(1 row)
EXECUTE dist_tbl_text_select_plan('/bcde');
?column?
---------------------------------------------------------------------
t
(1 row)
EXECUTE dist_tbl_text_select_plan(U&'\0061\0308bc');
?column?
---------------------------------------------------------------------
t
(1 row)
EXECUTE dist_tbl_text_select_plan('bcde*');
?column?
---------------------------------------------------------------------
t
(1 row)
EXECUTE dist_tbl_text_select_plan('/b*c/de');
?column?
---------------------------------------------------------------------
t
(1 row)
EXECUTE dist_tbl_text_select_plan('/bcde');
?column?
---------------------------------------------------------------------
t
(1 row)
EXECUTE dist_tbl_text_select_plan(U&'\0061\0308bc');
?column?
---------------------------------------------------------------------
t
(1 row)
EXECUTE dist_tbl_text_select_plan('bcde*');
?column?
---------------------------------------------------------------------
t
(1 row)
\c - - - :worker_2_port
SET search_path TO citus_stat_tenants;
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants ORDER BY tenant_attribute;
tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period
---------------------------------------------------------------------
/b*c/de | 7 | 0 | 7 | 0
/bcde | 7 | 0 | 7 | 0
äbc | 7 | 0 | 7 | 0
bcde* | 7 | 0 | 7 | 0
(4 rows)
\c - - - :master_port
SET search_path TO citus_stat_tenants;
SELECT citus_stat_tenants_reset();
citus_stat_tenants_reset
---------------------------------------------------------------------
(1 row)
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'thisisaveryloooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooongname';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants ORDER BY tenant_attribute;
tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period
---------------------------------------------------------------------
thisisaverylooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo | 1 | 0 | 1 | 0
(1 row)
-- test role permissions
CREATE ROLE stats_non_superuser WITH LOGIN;
SET ROLE stats_non_superuser;
SELECT count(*)>=0 FROM citus_stat_tenants;
ERROR: permission denied for view citus_stat_tenants
SELECT count(*)>=0 FROM citus_stat_tenants_local;
ERROR: permission denied for view citus_stat_tenants_local
SELECT count(*)>=0 FROM citus_stat_tenants();
ERROR: permission denied for function citus_stat_tenants
SELECT count(*)>=0 FROM citus_stat_tenants_local();
ERROR: permission denied for function citus_stat_tenants_local
RESET ROLE;
GRANT pg_monitor TO stats_non_superuser;
SET ROLE stats_non_superuser;
SELECT count(*)>=0 FROM citus_stat_tenants;
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM citus_stat_tenants_local;
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM citus_stat_tenants();
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM citus_stat_tenants_local();
?column?
---------------------------------------------------------------------
t
(1 row)
RESET ROLE;
DROP ROLE stats_non_superuser;
SET client_min_messages TO ERROR;
DROP SCHEMA citus_stat_tenants CASCADE;

View File

@ -25,7 +25,7 @@ SELECT citus.clear_network_traffic();
---- test multiple statements spanning multiple shards,
---- at each significant point. These transactions are 2pc
-- fail at DELETE
SELECT citus.mitmproxy('conn.onQuery(query="^DELETE").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="DELETE").kill()');
mitmproxy
---------------------------------------------------------------------
@ -54,7 +54,7 @@ SELECT * FROM dml_test ORDER BY id ASC;
(4 rows)
-- cancel at DELETE
SELECT citus.mitmproxy('conn.onQuery(query="^DELETE").cancel(' || pg_backend_pid() || ')');
SELECT citus.mitmproxy('conn.onQuery(query="DELETE").cancel(' || pg_backend_pid() || ')');
mitmproxy
---------------------------------------------------------------------
@ -83,7 +83,7 @@ SELECT * FROM dml_test ORDER BY id ASC;
(4 rows)
-- fail at INSERT
SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()');
mitmproxy
---------------------------------------------------------------------
@ -110,7 +110,7 @@ SELECT * FROM dml_test ORDER BY id ASC;
(4 rows)
-- cancel at INSERT
SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").cancel(' || pg_backend_pid() || ')');
SELECT citus.mitmproxy('conn.onQuery(query="INSERT").cancel(' || pg_backend_pid() || ')');
mitmproxy
---------------------------------------------------------------------
@ -137,7 +137,7 @@ SELECT * FROM dml_test ORDER BY id ASC;
(4 rows)
-- fail at UPDATE
SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()');
mitmproxy
---------------------------------------------------------------------
@ -163,7 +163,7 @@ SELECT * FROM dml_test ORDER BY id ASC;
(4 rows)
-- cancel at UPDATE
SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").cancel(' || pg_backend_pid() || ')');
SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").cancel(' || pg_backend_pid() || ')');
mitmproxy
---------------------------------------------------------------------

View File

@ -36,7 +36,7 @@ SELECT create_reference_table('reference_table');
-- (d) multi-row INSERT that hits multiple shards in multiple workers
-- (e) multi-row INSERT to a reference table
-- Failure and cancellation on multi-row INSERT that hits the same shard with the same value
SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()');
mitmproxy
---------------------------------------------------------------------

View File

@ -26,7 +26,7 @@ SELECT COUNT(*) FROM ref_table;
(1 row)
-- verify behavior of single INSERT; should fail to execute
SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()');
mitmproxy
---------------------------------------------------------------------
@ -41,7 +41,7 @@ SELECT COUNT(*) FROM ref_table WHERE key=5;
(1 row)
-- verify behavior of UPDATE ... RETURNING; should not execute
SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()');
mitmproxy
---------------------------------------------------------------------
@ -56,7 +56,7 @@ SELECT COUNT(*) FROM ref_table WHERE key=7;
(1 row)
-- verify fix to #2214; should raise error and fail to execute
SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()');
mitmproxy
---------------------------------------------------------------------

View File

@ -21,7 +21,7 @@ CREATE TABLE partitioned_table_0
PARTITION OF partitioned_table (dist_key, partition_id)
FOR VALUES IN ( 0 );
INSERT INTO partitioned_table VALUES (0, 0);
SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()');
mitmproxy
---------------------------------------------------------------------

View File

@ -20,7 +20,7 @@ SELECT create_distributed_table('mod_test', 'key');
(1 row)
-- verify behavior of single INSERT; should mark shard as failed
SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()');
mitmproxy
---------------------------------------------------------------------
@ -52,7 +52,7 @@ SELECT citus.mitmproxy('conn.allow()');
(1 row)
INSERT INTO mod_test VALUES (2, 6);
SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()');
mitmproxy
---------------------------------------------------------------------
@ -78,7 +78,7 @@ WHERE shardid IN (
TRUNCATE mod_test;
-- verify behavior of multi-statement modifications to a single shard
-- should fail the transaction and never mark placements inactive
SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()');
mitmproxy
---------------------------------------------------------------------

View File

@ -23,7 +23,7 @@ SELECT create_distributed_table('select_test', 'key');
-- put data in shard for which mitm node is first placement
INSERT INTO select_test VALUES (3, 'test data');
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").kill()');
mitmproxy
---------------------------------------------------------------------
@ -45,7 +45,7 @@ WARNING: connection to the remote node localhost:xxxxx failed with the followin
-- kill after first SELECT; txn should fail as INSERT triggers
-- 2PC (and placement is not marked bad)
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").kill()');
mitmproxy
---------------------------------------------------------------------
@ -66,7 +66,7 @@ TRUNCATE select_test;
-- now the same tests with query cancellation
-- put data in shard for which mitm node is first placement
INSERT INTO select_test VALUES (3, 'test data');
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").cancel(' || pg_backend_pid() || ')');
SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").cancel(' || pg_backend_pid() || ')');
mitmproxy
---------------------------------------------------------------------
@ -77,7 +77,7 @@ ERROR: canceling statement due to user request
SELECT * FROM select_test WHERE key = 3;
ERROR: canceling statement due to user request
-- cancel after first SELECT; txn should fail and nothing should be marked as invalid
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").cancel(' || pg_backend_pid() || ')');
SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").cancel(' || pg_backend_pid() || ')');
mitmproxy
---------------------------------------------------------------------
@ -107,7 +107,7 @@ SELECT citus.mitmproxy('conn.allow()');
TRUNCATE select_test;
-- cancel the second query
-- error after second SELECT; txn should fail
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").after(1).cancel(' || pg_backend_pid() || ')');
SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").after(1).cancel(' || pg_backend_pid() || ')');
mitmproxy
---------------------------------------------------------------------
@ -126,7 +126,7 @@ SELECT * FROM select_test WHERE key = 3;
ERROR: canceling statement due to user request
COMMIT;
-- error after second SELECT; should fail the transaction
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").after(1).reset()');
SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").after(1).reset()');
mitmproxy
---------------------------------------------------------------------
@ -144,7 +144,7 @@ INSERT INTO select_test VALUES (3, 'even more data');
SELECT * FROM select_test WHERE key = 3;
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
COMMIT;
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*pg_prepared_xacts").after(2).kill()');
SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*pg_prepared_xacts").after(2).kill()');
mitmproxy
---------------------------------------------------------------------
@ -173,7 +173,7 @@ SELECT create_distributed_table('select_test', 'key');
SET citus.max_cached_conns_per_worker TO 1; -- allow connection to be cached
INSERT INTO select_test VALUES (1, 'test data');
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").after(1).kill()');
SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").after(1).kill()');
mitmproxy
---------------------------------------------------------------------
@ -188,7 +188,7 @@ SELECT * FROM select_test WHERE key = 1;
SELECT * FROM select_test WHERE key = 1;
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- now the same test with query cancellation
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").after(1).cancel(' || pg_backend_pid() || ')');
SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").after(1).cancel(' || pg_backend_pid() || ')');
mitmproxy
---------------------------------------------------------------------

View File

@ -1366,9 +1366,15 @@ SELECT * FROM multi_extension.print_extension_changes();
| function citus_internal_mark_node_not_synced(integer,integer) void
| function citus_internal_start_replication_origin_tracking() void
| function citus_internal_stop_replication_origin_tracking() void
| function citus_stat_tenants(boolean) SETOF record
| function citus_stat_tenants_local(boolean) SETOF record
| function citus_stat_tenants_local_reset() void
| function citus_stat_tenants_reset() void
| function worker_adjust_identity_column_seq_ranges(regclass) void
| function worker_drop_all_shell_tables(boolean)
(6 rows)
| view citus_stat_tenants
| view citus_stat_tenants_local
(12 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version

View File

@ -21,7 +21,9 @@ ORDER BY 1;
---------------------------------------------------------------------
pg_dist_authinfo
pg_dist_clock_logical_seq
(2 rows)
citus_stat_tenants_local
citus_stat_tenants
(4 rows)
RESET role;
DROP USER no_access;

View File

@ -125,6 +125,10 @@ ORDER BY 1;
function citus_stat_activity()
function citus_stat_statements()
function citus_stat_statements_reset()
function citus_stat_tenants(boolean)
function citus_stat_tenants_local(boolean)
function citus_stat_tenants_local_reset()
function citus_stat_tenants_reset()
function citus_table_is_visible(oid)
function citus_table_size(regclass)
function citus_task_wait(bigint,citus_task_status)
@ -322,7 +326,9 @@ ORDER BY 1;
view citus_shards_on_worker
view citus_stat_activity
view citus_stat_statements
view citus_stat_tenants
view citus_stat_tenants_local
view pg_dist_shard_placement
view time_partitions
(316 rows)
(322 rows)

View File

@ -102,6 +102,11 @@ test: pg13_propagate_statistics
# ----------
test: citus_update_table_statistics
# ----------
# Test for tenant statistics
# ----------
test: citus_stat_tenants
# ----------
# Parallel TPC-H tests to check our distributed execution behavior
# ----------

View File

@ -489,6 +489,8 @@ push(@pgOptions, "citus.enable_manual_changes_to_shards=on");
push(@pgOptions, "citus.allow_unsafe_locks_from_workers=on");
push(@pgOptions, "citus.stat_statements_track = 'all'");
push(@pgOptions, "citus.enable_change_data_capture=on");
push(@pgOptions, "citus.stat_tenants_limit = 10");
push(@pgOptions, "citus.stat_tenants_track = 'ALL'");
# Some tests look at shards in pg_class, make sure we can usually see them:
push(@pgOptions, "citus.show_shards_for_app_name_prefixes='pg_regress'");

View File

@ -0,0 +1,235 @@
CREATE SCHEMA citus_stat_tenants;
SET search_path TO citus_stat_tenants;
SET citus.next_shard_id TO 5797500;
SET citus.shard_replication_factor TO 1;
-- make sure that we are tracking the tenant stats
SELECT result FROM run_command_on_all_nodes('SHOW citus.stat_tenants_track');
CREATE OR REPLACE FUNCTION pg_catalog.sleep_until_next_period()
RETURNS VOID
LANGUAGE C
AS 'citus', $$sleep_until_next_period$$;
SELECT citus_stat_tenants_reset();
-- set period to upper limit to prevent stats from being reset
SELECT result FROM run_command_on_all_nodes('ALTER SYSTEM SET citus.stat_tenants_period TO 86400');
SELECT result FROM run_command_on_all_nodes('SELECT pg_reload_conf()');
CREATE TABLE dist_tbl (a INT, b TEXT);
SELECT create_distributed_table('dist_tbl', 'a', shard_count:=4, colocate_with:='none');
CREATE TABLE dist_tbl_2 (a INT, b INT);
SELECT create_distributed_table('dist_tbl_2', 'a', colocate_with:='dist_tbl');
CREATE TABLE dist_tbl_text (a TEXT, b INT);
SELECT create_distributed_table('dist_tbl_text', 'a', shard_count:=4, colocate_with:='none');
CREATE TABLE ref_tbl (a INT, b INT);
SELECT create_reference_table('ref_tbl');
INSERT INTO dist_tbl VALUES (1, 'abcd');
INSERT INTO dist_tbl VALUES (2, 'abcd');
UPDATE dist_tbl SET b = a + 1 WHERE a = 3;
UPDATE dist_tbl SET b = a + 1 WHERE a = 4;
DELETE FROM dist_tbl WHERE a = 5;
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants(true) ORDER BY tenant_attribute;
SELECT citus_stat_tenants_reset();
-- queries with multiple tenants should not be counted
SELECT count(*)>=0 FROM dist_tbl WHERE a IN (1, 5);
-- queries with reference tables should not be counted
SELECT count(*)>=0 FROM ref_tbl WHERE a = 1;
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants(true) ORDER BY tenant_attribute;
-- queries with multiple tables but one tenant should be counted
SELECT count(*)>=0 FROM dist_tbl, dist_tbl_2 WHERE dist_tbl.a = 1 AND dist_tbl_2.a = 1;
SELECT count(*)>=0 FROM dist_tbl JOIN dist_tbl_2 ON dist_tbl.a = dist_tbl_2.a WHERE dist_tbl.a = 1;
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants(true) WHERE tenant_attribute = '1';
-- test scoring
-- all of these distribution column values are from second worker
SELECT nodeid AS worker_2_nodeid FROM pg_dist_node WHERE nodeport = :worker_2_port \gset
SELECT count(*)>=0 FROM dist_tbl WHERE a = 2;
SELECT count(*)>=0 FROM dist_tbl WHERE a = 3;
SELECT count(*)>=0 FROM dist_tbl WHERE a = 4;
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'abcd';
SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stat_tenants(true) WHERE nodeid = :worker_2_nodeid ORDER BY score DESC, tenant_attribute;
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'abcd';
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'abcd';
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde';
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'cdef';
SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stat_tenants(true) WHERE nodeid = :worker_2_nodeid ORDER BY score DESC, tenant_attribute;
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde';
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde';
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'defg';
SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stat_tenants(true) WHERE nodeid = :worker_2_nodeid ORDER BY score DESC, tenant_attribute;
-- test period passing
SELECT citus_stat_tenants_reset();
SELECT count(*)>=0 FROM dist_tbl WHERE a = 1;
INSERT INTO dist_tbl VALUES (5, 'abcd');
\c - - - :worker_1_port
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants_local ORDER BY tenant_attribute;
-- simulate passing the period
SET citus.stat_tenants_period TO 2;
SELECT sleep_until_next_period();
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants_local ORDER BY tenant_attribute;
\c - - - :master_port
SET search_path TO citus_stat_tenants;
-- test logs
SET client_min_messages TO LOG;
SELECT count(*)>=0 FROM citus_stat_tenants;
SET citus.stat_tenants_log_level TO ERROR;
SELECT count(*)>=0 FROM citus_stat_tenants;
SET citus.stat_tenants_log_level TO OFF;
SELECT count(*)>=0 FROM citus_stat_tenants;
SET citus.stat_tenants_log_level TO LOG;
SELECT count(*)>=0 FROM citus_stat_tenants;
SET citus.stat_tenants_log_level TO DEBUG;
SELECT count(*)>=0 FROM citus_stat_tenants;
RESET client_min_messages;
SELECT citus_stat_tenants_reset();
-- test turning monitoring on/off
SET citus.stat_tenants_track TO "NONE";
SELECT count(*)>=0 FROM dist_tbl WHERE a = 1;
INSERT INTO dist_tbl VALUES (1, 1);
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants;
SET citus.stat_tenants_track TO "ALL";
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants;
SELECT count(*)>=0 FROM dist_tbl WHERE a = 1;
INSERT INTO dist_tbl VALUES (1, 1);
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants;
-- test special and multibyte characters in tenant attribute
SELECT citus_stat_tenants_reset();
TRUNCATE TABLE dist_tbl_text;
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/bcde';
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/*bcde';
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/b*cde';
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/b*c/de';
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'b/*//cde';
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/b/*/cde';
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/b/**/cde';
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde*';
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde*/';
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = U&'\0061\0308bc';
\c - - - :worker_1_port
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants ORDER BY tenant_attribute;
\c - - - :worker_2_port
SET search_path TO citus_stat_tenants;
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants ORDER BY tenant_attribute;
SELECT citus_stat_tenants_reset();
-- test local queries
-- all of these distribution column values are from second worker
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/b*c/de';
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/bcde';
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = U&'\0061\0308bc';
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde*';
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants_local ORDER BY tenant_attribute;
-- test local cached queries & prepared statements
PREPARE dist_tbl_text_select_plan (text) AS SELECT count(*)>=0 FROM dist_tbl_text WHERE a = $1;
EXECUTE dist_tbl_text_select_plan('/b*c/de');
EXECUTE dist_tbl_text_select_plan('/bcde');
EXECUTE dist_tbl_text_select_plan(U&'\0061\0308bc');
EXECUTE dist_tbl_text_select_plan('bcde*');
EXECUTE dist_tbl_text_select_plan('/b*c/de');
EXECUTE dist_tbl_text_select_plan('/bcde');
EXECUTE dist_tbl_text_select_plan(U&'\0061\0308bc');
EXECUTE dist_tbl_text_select_plan('bcde*');
EXECUTE dist_tbl_text_select_plan('/b*c/de');
EXECUTE dist_tbl_text_select_plan('/bcde');
EXECUTE dist_tbl_text_select_plan(U&'\0061\0308bc');
EXECUTE dist_tbl_text_select_plan('bcde*');
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants_local ORDER BY tenant_attribute;
\c - - - :master_port
SET search_path TO citus_stat_tenants;
PREPARE dist_tbl_text_select_plan (text) AS SELECT count(*)>=0 FROM dist_tbl_text WHERE a = $1;
EXECUTE dist_tbl_text_select_plan('/b*c/de');
EXECUTE dist_tbl_text_select_plan('/bcde');
EXECUTE dist_tbl_text_select_plan(U&'\0061\0308bc');
EXECUTE dist_tbl_text_select_plan('bcde*');
EXECUTE dist_tbl_text_select_plan('/b*c/de');
EXECUTE dist_tbl_text_select_plan('/bcde');
EXECUTE dist_tbl_text_select_plan(U&'\0061\0308bc');
EXECUTE dist_tbl_text_select_plan('bcde*');
EXECUTE dist_tbl_text_select_plan('/b*c/de');
EXECUTE dist_tbl_text_select_plan('/bcde');
EXECUTE dist_tbl_text_select_plan(U&'\0061\0308bc');
EXECUTE dist_tbl_text_select_plan('bcde*');
\c - - - :worker_2_port
SET search_path TO citus_stat_tenants;
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants ORDER BY tenant_attribute;
\c - - - :master_port
SET search_path TO citus_stat_tenants;
SELECT citus_stat_tenants_reset();
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'thisisaveryloooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooongname';
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants ORDER BY tenant_attribute;
-- test role permissions
CREATE ROLE stats_non_superuser WITH LOGIN;
SET ROLE stats_non_superuser;
SELECT count(*)>=0 FROM citus_stat_tenants;
SELECT count(*)>=0 FROM citus_stat_tenants_local;
SELECT count(*)>=0 FROM citus_stat_tenants();
SELECT count(*)>=0 FROM citus_stat_tenants_local();
RESET ROLE;
GRANT pg_monitor TO stats_non_superuser;
SET ROLE stats_non_superuser;
SELECT count(*)>=0 FROM citus_stat_tenants;
SELECT count(*)>=0 FROM citus_stat_tenants_local;
SELECT count(*)>=0 FROM citus_stat_tenants();
SELECT count(*)>=0 FROM citus_stat_tenants_local();
RESET ROLE;
DROP ROLE stats_non_superuser;
SET client_min_messages TO ERROR;
DROP SCHEMA citus_stat_tenants CASCADE;

View File

@ -21,7 +21,7 @@ SELECT citus.clear_network_traffic();
---- at each significant point. These transactions are 2pc
-- fail at DELETE
SELECT citus.mitmproxy('conn.onQuery(query="^DELETE").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="DELETE").kill()');
BEGIN;
DELETE FROM dml_test WHERE id = 1;
@ -35,7 +35,7 @@ COMMIT;
SELECT * FROM dml_test ORDER BY id ASC;
-- cancel at DELETE
SELECT citus.mitmproxy('conn.onQuery(query="^DELETE").cancel(' || pg_backend_pid() || ')');
SELECT citus.mitmproxy('conn.onQuery(query="DELETE").cancel(' || pg_backend_pid() || ')');
BEGIN;
DELETE FROM dml_test WHERE id = 1;
@ -49,7 +49,7 @@ COMMIT;
SELECT * FROM dml_test ORDER BY id ASC;
-- fail at INSERT
SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()');
BEGIN;
DELETE FROM dml_test WHERE id = 1;
@ -63,7 +63,7 @@ COMMIT;
SELECT * FROM dml_test ORDER BY id ASC;
-- cancel at INSERT
SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").cancel(' || pg_backend_pid() || ')');
SELECT citus.mitmproxy('conn.onQuery(query="INSERT").cancel(' || pg_backend_pid() || ')');
BEGIN;
DELETE FROM dml_test WHERE id = 1;
@ -77,7 +77,7 @@ COMMIT;
SELECT * FROM dml_test ORDER BY id ASC;
-- fail at UPDATE
SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()');
BEGIN;
DELETE FROM dml_test WHERE id = 1;
@ -91,7 +91,7 @@ COMMIT;
SELECT * FROM dml_test ORDER BY id ASC;
-- cancel at UPDATE
SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").cancel(' || pg_backend_pid() || ')');
SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").cancel(' || pg_backend_pid() || ')');
BEGIN;
DELETE FROM dml_test WHERE id = 1;

View File

@ -30,7 +30,7 @@ SELECT create_reference_table('reference_table');
-- Failure and cancellation on multi-row INSERT that hits the same shard with the same value
SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()');
INSERT INTO distributed_table VALUES (1,1), (1,2), (1,3);
-- this test is broken, see https://github.com/citusdata/citus/issues/2460

View File

@ -17,19 +17,19 @@ SELECT citus.clear_network_traffic();
SELECT COUNT(*) FROM ref_table;
-- verify behavior of single INSERT; should fail to execute
SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()');
INSERT INTO ref_table VALUES (5, 6);
SELECT COUNT(*) FROM ref_table WHERE key=5;
-- verify behavior of UPDATE ... RETURNING; should not execute
SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()');
UPDATE ref_table SET key=7 RETURNING value;
SELECT COUNT(*) FROM ref_table WHERE key=7;
-- verify fix to #2214; should raise error and fail to execute
SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()');
BEGIN;
DELETE FROM ref_table WHERE key=5;

View File

@ -19,7 +19,7 @@ CREATE TABLE partitioned_table_0
INSERT INTO partitioned_table VALUES (0, 0);
SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()');
INSERT INTO partitioned_table VALUES (0, 0);

View File

@ -8,7 +8,7 @@ CREATE TABLE mod_test (key int, value text);
SELECT create_distributed_table('mod_test', 'key');
-- verify behavior of single INSERT; should mark shard as failed
SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()');
INSERT INTO mod_test VALUES (2, 6);
SELECT COUNT(*) FROM mod_test WHERE key=2;
@ -24,7 +24,7 @@ TRUNCATE mod_test;
SELECT citus.mitmproxy('conn.allow()');
INSERT INTO mod_test VALUES (2, 6);
SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()');
UPDATE mod_test SET value='ok' WHERE key=2 RETURNING key;
SELECT COUNT(*) FROM mod_test WHERE value='ok';
@ -38,7 +38,7 @@ TRUNCATE mod_test;
-- verify behavior of multi-statement modifications to a single shard
-- should fail the transaction and never mark placements inactive
SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()');
BEGIN;
INSERT INTO mod_test VALUES (2, 6);

View File

@ -13,13 +13,13 @@ SELECT create_distributed_table('select_test', 'key');
-- put data in shard for which mitm node is first placement
INSERT INTO select_test VALUES (3, 'test data');
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").kill()');
SELECT * FROM select_test WHERE key = 3;
SELECT * FROM select_test WHERE key = 3;
-- kill after first SELECT; txn should fail as INSERT triggers
-- 2PC (and placement is not marked bad)
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").kill()');
BEGIN;
INSERT INTO select_test VALUES (3, 'more data');
@ -35,12 +35,12 @@ TRUNCATE select_test;
-- put data in shard for which mitm node is first placement
INSERT INTO select_test VALUES (3, 'test data');
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").cancel(' || pg_backend_pid() || ')');
SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").cancel(' || pg_backend_pid() || ')');
SELECT * FROM select_test WHERE key = 3;
SELECT * FROM select_test WHERE key = 3;
-- cancel after first SELECT; txn should fail and nothing should be marked as invalid
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").cancel(' || pg_backend_pid() || ')');
SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").cancel(' || pg_backend_pid() || ')');
BEGIN;
INSERT INTO select_test VALUES (3, 'more data');
@ -58,7 +58,7 @@ TRUNCATE select_test;
-- cancel the second query
-- error after second SELECT; txn should fail
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").after(1).cancel(' || pg_backend_pid() || ')');
SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").after(1).cancel(' || pg_backend_pid() || ')');
BEGIN;
INSERT INTO select_test VALUES (3, 'more data');
@ -68,7 +68,7 @@ SELECT * FROM select_test WHERE key = 3;
COMMIT;
-- error after second SELECT; should fail the transaction
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").after(1).reset()');
SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").after(1).reset()');
BEGIN;
INSERT INTO select_test VALUES (3, 'more data');
@ -77,7 +77,7 @@ INSERT INTO select_test VALUES (3, 'even more data');
SELECT * FROM select_test WHERE key = 3;
COMMIT;
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*pg_prepared_xacts").after(2).kill()');
SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*pg_prepared_xacts").after(2).kill()');
SELECT recover_prepared_transactions();
SELECT recover_prepared_transactions();
@ -93,12 +93,12 @@ SELECT create_distributed_table('select_test', 'key');
SET citus.max_cached_conns_per_worker TO 1; -- allow connection to be cached
INSERT INTO select_test VALUES (1, 'test data');
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").after(1).kill()');
SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").after(1).kill()');
SELECT * FROM select_test WHERE key = 1;
SELECT * FROM select_test WHERE key = 1;
-- now the same test with query cancellation
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").after(1).cancel(' || pg_backend_pid() || ')');
SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").after(1).cancel(' || pg_backend_pid() || ')');
SELECT * FROM select_test WHERE key = 1;
SELECT * FROM select_test WHERE key = 1;