diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index 5e4afd1a7..28486f23d 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -66,7 +66,6 @@ static DistributedPlan * CopyDistributedPlanWithoutCache( DistributedPlan *originalDistributedPlan); static void CitusEndScan(CustomScanState *node); static void CitusReScan(CustomScanState *node); -static void SetJobColocationId(Job *job); static void EnsureForceDelegationDistributionKey(Job *job); static void EnsureAnchorShardsInJobExist(Job *job); static bool AnchorShardsInTaskListExist(List *taskList); @@ -892,7 +891,7 @@ IsCitusCustomScan(Plan *plan) * colocation group, the Job's colocation ID is set to the group ID, else, * it will be set to INVALID_COLOCATION_ID. */ -static void +void SetJobColocationId(Job *job) { uint32 jobColocationId = INVALID_COLOCATION_ID; diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index d946e15c8..7cdc896e3 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -97,6 +97,7 @@ #include "distributed/relation_access_tracking.h" #include "distributed/remote_commands.h" /* to access LogRemoteCommands */ #include "distributed/transaction_management.h" +#include "distributed/utils/citus_stat_tenants.h" #include "distributed/version_compat.h" #include "distributed/worker_protocol.h" #include "executor/tstoreReceiver.h" @@ -647,6 +648,16 @@ LocallyExecuteTaskPlan(PlannedStmt *taskPlan, char *queryString, LocalExecutorShardId = task->anchorShardId; } + + char *partitionKeyValueString = NULL; + if (task->partitionKeyValue != NULL) + { + partitionKeyValueString = DatumToString(task->partitionKeyValue->constvalue, + task->partitionKeyValue->consttype); + } + + AttributeTask(partitionKeyValueString, task->colocationId, taskPlan->commandType); + PG_TRY(); { processedRows = ExecuteTaskPlan(taskPlan, queryString, tupleDest, task, diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index e62821ad0..5743ab1c1 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -26,6 +26,7 @@ #include "distributed/multi_physical_planner.h" #include "distributed/multi_router_planner.h" #include "distributed/shard_utils.h" +#include "distributed/utils/citus_stat_tenants.h" #include "distributed/version_compat.h" #include "lib/stringinfo.h" #include "nodes/makefuncs.h" @@ -141,6 +142,10 @@ RebuildQueryStrings(Job *workerJob) ? 
"(null)" : TaskQueryString(task)))); + task->partitionKeyValue = workerJob->partitionKeyValue; + SetJobColocationId(workerJob); + task->colocationId = workerJob->colocationId; + UpdateTaskQueryString(query, task); /* @@ -387,7 +392,8 @@ SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query) return; } - SetTaskQueryString(task, DeparseTaskQuery(task, query)); + SetTaskQueryString(task, AnnotateQuery(DeparseTaskQuery(task, query), + task->partitionKeyValue, task->colocationId)); } diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 1fcc45585..38962b333 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -49,6 +49,7 @@ #include "distributed/recursive_planning.h" #include "distributed/shardinterval_utils.h" #include "distributed/shard_utils.h" +#include "distributed/utils/citus_stat_tenants.h" #include "distributed/version_compat.h" #include "distributed/worker_shard_visibility.h" #include "executor/executor.h" @@ -305,6 +306,11 @@ distributed_planner(Query *parse, errhint("Consider using PL/pgSQL functions instead."))); } + /* + * We annotate the query for tenant statisisics. + */ + AttributeQueryIfAnnotated(query_string, parse->commandType); + return result; } diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index c3677bb1a..94691bab9 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -164,7 +164,8 @@ static int CompareInsertValuesByShardId(const void *leftElement, static List * SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList, List *placementList, uint64 shardId, bool parametersInQueryResolved, - bool isLocalTableModification); + bool isLocalTableModification, Const *partitionKeyValue, + int colocationId); static bool RowLocksOnRelations(Node *node, List **rtiLockList); static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job, TaskAssignmentPolicyType @@ -1939,11 +1940,14 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList, if (originalQuery->commandType == CMD_SELECT) { + SetJobColocationId(job); + job->taskList = SingleShardTaskList(originalQuery, job->jobId, relationShardList, placementList, shardId, job->parametersInJobQueryResolved, - isLocalTableModification); + isLocalTableModification, + job->partitionKeyValue, job->colocationId); /* * Queries to reference tables, or distributed tables with multiple replica's have @@ -1967,11 +1971,14 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList, } else { + SetJobColocationId(job); + job->taskList = SingleShardTaskList(originalQuery, job->jobId, relationShardList, placementList, shardId, job->parametersInJobQueryResolved, - isLocalTableModification); + isLocalTableModification, + job->partitionKeyValue, job->colocationId); } } @@ -2065,7 +2072,8 @@ static List * SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList, List *placementList, uint64 shardId, bool parametersInQueryResolved, - bool isLocalTableModification) + bool isLocalTableModification, Const *partitionKeyValue, + int colocationId) { TaskType taskType = READ_TASK; char replicationModel = 0; @@ -2135,6 +2143,8 @@ SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList, * that the query cannot be executed locally. 
*/ task->taskPlacementList = placementList; + task->partitionKeyValue = partitionKeyValue; + task->colocationId = colocationId; SetTaskQueryIfShouldLazyDeparse(task, query); task->anchorShardId = shardId; task->jobId = jobId; diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index c779b0a8d..76e0ae9b9 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -91,6 +91,7 @@ #include "distributed/resource_lock.h" #include "distributed/transaction_management.h" #include "distributed/transaction_recovery.h" +#include "distributed/utils/citus_stat_tenants.h" #include "distributed/utils/directory.h" #include "distributed/worker_log_messages.h" #include "distributed/worker_manager.h" @@ -228,6 +229,12 @@ static const struct config_enum_entry stat_statements_track_options[] = { { NULL, 0, false } }; +static const struct config_enum_entry stat_tenants_track_options[] = { + { "none", STAT_TENANTS_TRACK_NONE, false }, + { "all", STAT_TENANTS_TRACK_ALL, false }, + { NULL, 0, false } +}; + static const struct config_enum_entry task_assignment_policy_options[] = { { "greedy", TASK_ASSIGNMENT_GREEDY, false }, { "first-replica", TASK_ASSIGNMENT_FIRST_REPLICA, false }, @@ -447,6 +454,8 @@ _PG_init(void) ExecutorStart_hook = CitusExecutorStart; ExecutorRun_hook = CitusExecutorRun; ExplainOneQuery_hook = CitusExplainOneQuery; + prev_ExecutorEnd = ExecutorEnd_hook; + ExecutorEnd_hook = CitusAttributeToEnd; /* register hook for error messages */ emit_log_hook = multi_log_hook; @@ -491,6 +500,8 @@ _PG_init(void) /* initialize shard split shared memory handle management */ InitializeShardSplitSMHandleManagement(); + InitializeMultiTenantMonitorSMHandleManagement(); + /* enable modification of pg_catalog tables during pg_upgrade */ if (IsBinaryUpgrade) { @@ -2363,6 +2374,50 @@ RegisterCitusConfigVariables(void) GUC_STANDARD, NULL, NULL, NULL); + DefineCustomIntVariable( + "citus.stat_tenants_limit", + gettext_noop("Number of tenants to be shown in citus_stat_tenants."), + NULL, + &StatTenantsLimit, + 100, 1, 10000, + PGC_POSTMASTER, + GUC_STANDARD, + NULL, NULL, NULL); + + DefineCustomEnumVariable( + "citus.stat_tenants_log_level", + gettext_noop("Sets the level of citus_stat_tenants log messages"), + NULL, + &StatTenantsLogLevel, + CITUS_LOG_LEVEL_OFF, log_level_options, + PGC_USERSET, + GUC_STANDARD, + NULL, NULL, NULL); + + DefineCustomIntVariable( + "citus.stat_tenants_period", + gettext_noop("Period in seconds to be used for calculating the tenant " + "statistics in citus_stat_tenants."), + NULL, + &StatTenantsPeriod, + 60, 1, 60 * 60 * 24, + PGC_USERSET, + GUC_STANDARD, + NULL, NULL, NULL); + + DefineCustomEnumVariable( + "citus.stat_tenants_track", + gettext_noop("Enables/Disables the stats collection for citus_stat_tenants."), + gettext_noop("Enables the stats collection when set to 'all'. " + "Disables when set to 'none'. 
Disabling can be useful for " + "avoiding extra CPU cycles needed for the calculations."), + &StatTenantsTrack, + STAT_TENANTS_TRACK_NONE, + stat_tenants_track_options, + PGC_SUSET, + GUC_STANDARD, + NULL, NULL, NULL); + DefineCustomBoolVariable( "citus.subquery_pushdown", gettext_noop("Usage of this GUC is highly discouraged, please read the long " diff --git a/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql b/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql index bbaf0ce4d..fd14cd90d 100644 --- a/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql +++ b/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql @@ -10,3 +10,8 @@ ALTER TABLE pg_catalog.pg_dist_transaction REPLICA IDENTITY USING INDEX pg_dist_ #include "udfs/worker_drop_all_shell_tables/11.3-1.sql" #include "udfs/citus_internal_mark_node_not_synced/11.3-1.sql" +#include "udfs/citus_stat_tenants_local/11.3-1.sql" +#include "udfs/citus_stat_tenants/11.3-1.sql" + +#include "udfs/citus_stat_tenants_local_reset/11.3-1.sql" +#include "udfs/citus_stat_tenants_reset/11.3-1.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-1.sql b/src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-1.sql index 322613e5f..15afec40a 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-1.sql @@ -20,3 +20,12 @@ ALTER TABLE pg_catalog.pg_dist_transaction REPLICA IDENTITY NOTHING; DROP PROCEDURE pg_catalog.worker_drop_all_shell_tables(bool); DROP FUNCTION pg_catalog.citus_internal_mark_node_not_synced(int, int); + +DROP VIEW pg_catalog.citus_stat_tenants_local; +DROP FUNCTION pg_catalog.citus_stat_tenants_local(boolean); + +DROP VIEW pg_catalog.citus_stat_tenants; +DROP FUNCTION pg_catalog.citus_stat_tenants(boolean); + +DROP FUNCTION pg_catalog.citus_stat_tenants_local_reset(); +DROP FUNCTION pg_catalog.citus_stat_tenants_reset(); diff --git a/src/backend/distributed/sql/udfs/citus_stat_tenants/11.3-1.sql b/src/backend/distributed/sql/udfs/citus_stat_tenants/11.3-1.sql new file mode 100644 index 000000000..bd294307c --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_stat_tenants/11.3-1.sql @@ -0,0 +1,78 @@ +-- cts in the query is an abbreviation for citus_stat_tenants +CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_tenants ( + return_all_tenants BOOLEAN DEFAULT FALSE, + OUT nodeid INT, + OUT colocation_id INT, + OUT tenant_attribute TEXT, + OUT read_count_in_this_period INT, + OUT read_count_in_last_period INT, + OUT query_count_in_this_period INT, + OUT query_count_in_last_period INT, + OUT score BIGINT +) + RETURNS SETOF record + LANGUAGE plpgsql + AS $function$ +BEGIN + IF + array_position(enumvals, 'log') >= array_position(enumvals, setting) + AND setting != 'off' + FROM pg_settings + WHERE name = 'citus.stat_tenants_log_level' + THEN + RAISE LOG 'Generating citus_stat_tenants'; + END IF; + RETURN QUERY + SELECT * + FROM jsonb_to_recordset(( + SELECT + jsonb_agg(all_cst_rows_as_jsonb.cst_row_as_jsonb)::jsonb + FROM ( + SELECT + jsonb_array_elements(run_command_on_all_nodes.result::jsonb)::jsonb || + ('{"nodeid":' || run_command_on_all_nodes.nodeid || '}')::jsonb AS cst_row_as_jsonb + FROM + run_command_on_all_nodes ( + $$ + SELECT + coalesce(to_jsonb (array_agg(cstl.*)), '[]'::jsonb) + FROM citus_stat_tenants_local($$||return_all_tenants||$$) cstl; + $$, + parallel:= TRUE, + give_warning_for_connection_errors:= TRUE) + WHERE + success = 't') + AS all_cst_rows_as_jsonb)) +AS ( + nodeid INT, + colocation_id INT, 
+ tenant_attribute TEXT, + read_count_in_this_period INT, + read_count_in_last_period INT, + query_count_in_this_period INT, + query_count_in_last_period INT, + score BIGINT +) + ORDER BY score DESC + LIMIT CASE WHEN NOT return_all_tenants THEN current_setting('citus.stat_tenants_limit')::BIGINT END; +END; +$function$; + +CREATE OR REPLACE VIEW citus.citus_stat_tenants AS +SELECT + nodeid, + colocation_id, + tenant_attribute, + read_count_in_this_period, + read_count_in_last_period, + query_count_in_this_period, + query_count_in_last_period +FROM pg_catalog.citus_stat_tenants(FALSE); + +ALTER VIEW citus.citus_stat_tenants SET SCHEMA pg_catalog; + +REVOKE ALL ON FUNCTION pg_catalog.citus_stat_tenants(BOOLEAN) FROM PUBLIC; +GRANT EXECUTE ON FUNCTION pg_catalog.citus_stat_tenants(BOOLEAN) TO pg_monitor; + +REVOKE ALL ON pg_catalog.citus_stat_tenants FROM PUBLIC; +GRANT SELECT ON pg_catalog.citus_stat_tenants TO pg_monitor; diff --git a/src/backend/distributed/sql/udfs/citus_stat_tenants/latest.sql b/src/backend/distributed/sql/udfs/citus_stat_tenants/latest.sql new file mode 100644 index 000000000..bd294307c --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_stat_tenants/latest.sql @@ -0,0 +1,78 @@ +-- cts in the query is an abbreviation for citus_stat_tenants +CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_tenants ( + return_all_tenants BOOLEAN DEFAULT FALSE, + OUT nodeid INT, + OUT colocation_id INT, + OUT tenant_attribute TEXT, + OUT read_count_in_this_period INT, + OUT read_count_in_last_period INT, + OUT query_count_in_this_period INT, + OUT query_count_in_last_period INT, + OUT score BIGINT +) + RETURNS SETOF record + LANGUAGE plpgsql + AS $function$ +BEGIN + IF + array_position(enumvals, 'log') >= array_position(enumvals, setting) + AND setting != 'off' + FROM pg_settings + WHERE name = 'citus.stat_tenants_log_level' + THEN + RAISE LOG 'Generating citus_stat_tenants'; + END IF; + RETURN QUERY + SELECT * + FROM jsonb_to_recordset(( + SELECT + jsonb_agg(all_cst_rows_as_jsonb.cst_row_as_jsonb)::jsonb + FROM ( + SELECT + jsonb_array_elements(run_command_on_all_nodes.result::jsonb)::jsonb || + ('{"nodeid":' || run_command_on_all_nodes.nodeid || '}')::jsonb AS cst_row_as_jsonb + FROM + run_command_on_all_nodes ( + $$ + SELECT + coalesce(to_jsonb (array_agg(cstl.*)), '[]'::jsonb) + FROM citus_stat_tenants_local($$||return_all_tenants||$$) cstl; + $$, + parallel:= TRUE, + give_warning_for_connection_errors:= TRUE) + WHERE + success = 't') + AS all_cst_rows_as_jsonb)) +AS ( + nodeid INT, + colocation_id INT, + tenant_attribute TEXT, + read_count_in_this_period INT, + read_count_in_last_period INT, + query_count_in_this_period INT, + query_count_in_last_period INT, + score BIGINT +) + ORDER BY score DESC + LIMIT CASE WHEN NOT return_all_tenants THEN current_setting('citus.stat_tenants_limit')::BIGINT END; +END; +$function$; + +CREATE OR REPLACE VIEW citus.citus_stat_tenants AS +SELECT + nodeid, + colocation_id, + tenant_attribute, + read_count_in_this_period, + read_count_in_last_period, + query_count_in_this_period, + query_count_in_last_period +FROM pg_catalog.citus_stat_tenants(FALSE); + +ALTER VIEW citus.citus_stat_tenants SET SCHEMA pg_catalog; + +REVOKE ALL ON FUNCTION pg_catalog.citus_stat_tenants(BOOLEAN) FROM PUBLIC; +GRANT EXECUTE ON FUNCTION pg_catalog.citus_stat_tenants(BOOLEAN) TO pg_monitor; + +REVOKE ALL ON pg_catalog.citus_stat_tenants FROM PUBLIC; +GRANT SELECT ON pg_catalog.citus_stat_tenants TO pg_monitor; diff --git 
a/src/backend/distributed/sql/udfs/citus_stat_tenants_local/11.3-1.sql b/src/backend/distributed/sql/udfs/citus_stat_tenants_local/11.3-1.sql new file mode 100644 index 000000000..103ca34b4 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_stat_tenants_local/11.3-1.sql @@ -0,0 +1,32 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_tenants_local( + return_all_tenants BOOLEAN DEFAULT FALSE, + OUT colocation_id INT, + OUT tenant_attribute TEXT, + OUT read_count_in_this_period INT, + OUT read_count_in_last_period INT, + OUT query_count_in_this_period INT, + OUT query_count_in_last_period INT, + OUT score BIGINT) +RETURNS SETOF RECORD +LANGUAGE C +AS 'citus', $$citus_stat_tenants_local$$; + + +CREATE OR REPLACE VIEW citus.citus_stat_tenants_local AS +SELECT + colocation_id, + tenant_attribute, + read_count_in_this_period, + read_count_in_last_period, + query_count_in_this_period, + query_count_in_last_period +FROM pg_catalog.citus_stat_tenants_local() +ORDER BY score DESC; + +ALTER VIEW citus.citus_stat_tenants_local SET SCHEMA pg_catalog; + +REVOKE ALL ON FUNCTION pg_catalog.citus_stat_tenants_local(BOOLEAN) FROM PUBLIC; +GRANT EXECUTE ON FUNCTION pg_catalog.citus_stat_tenants_local(BOOLEAN) TO pg_monitor; + +REVOKE ALL ON pg_catalog.citus_stat_tenants_local FROM PUBLIC; +GRANT SELECT ON pg_catalog.citus_stat_tenants_local TO pg_monitor; diff --git a/src/backend/distributed/sql/udfs/citus_stat_tenants_local/latest.sql b/src/backend/distributed/sql/udfs/citus_stat_tenants_local/latest.sql new file mode 100644 index 000000000..103ca34b4 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_stat_tenants_local/latest.sql @@ -0,0 +1,32 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_tenants_local( + return_all_tenants BOOLEAN DEFAULT FALSE, + OUT colocation_id INT, + OUT tenant_attribute TEXT, + OUT read_count_in_this_period INT, + OUT read_count_in_last_period INT, + OUT query_count_in_this_period INT, + OUT query_count_in_last_period INT, + OUT score BIGINT) +RETURNS SETOF RECORD +LANGUAGE C +AS 'citus', $$citus_stat_tenants_local$$; + + +CREATE OR REPLACE VIEW citus.citus_stat_tenants_local AS +SELECT + colocation_id, + tenant_attribute, + read_count_in_this_period, + read_count_in_last_period, + query_count_in_this_period, + query_count_in_last_period +FROM pg_catalog.citus_stat_tenants_local() +ORDER BY score DESC; + +ALTER VIEW citus.citus_stat_tenants_local SET SCHEMA pg_catalog; + +REVOKE ALL ON FUNCTION pg_catalog.citus_stat_tenants_local(BOOLEAN) FROM PUBLIC; +GRANT EXECUTE ON FUNCTION pg_catalog.citus_stat_tenants_local(BOOLEAN) TO pg_monitor; + +REVOKE ALL ON pg_catalog.citus_stat_tenants_local FROM PUBLIC; +GRANT SELECT ON pg_catalog.citus_stat_tenants_local TO pg_monitor; diff --git a/src/backend/distributed/sql/udfs/citus_stat_tenants_local_reset/11.3-1.sql b/src/backend/distributed/sql/udfs/citus_stat_tenants_local_reset/11.3-1.sql new file mode 100644 index 000000000..1419163f2 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_stat_tenants_local_reset/11.3-1.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_tenants_local_reset() + RETURNS VOID + LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$citus_stat_tenants_local_reset$$; + +COMMENT ON FUNCTION pg_catalog.citus_stat_tenants_local_reset() + IS 'resets the local tenant statistics'; diff --git a/src/backend/distributed/sql/udfs/citus_stat_tenants_local_reset/latest.sql b/src/backend/distributed/sql/udfs/citus_stat_tenants_local_reset/latest.sql new file mode 100644 index 
000000000..1419163f2 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_stat_tenants_local_reset/latest.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_tenants_local_reset() + RETURNS VOID + LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$citus_stat_tenants_local_reset$$; + +COMMENT ON FUNCTION pg_catalog.citus_stat_tenants_local_reset() + IS 'resets the local tenant statistics'; diff --git a/src/backend/distributed/sql/udfs/citus_stat_tenants_reset/11.3-1.sql b/src/backend/distributed/sql/udfs/citus_stat_tenants_reset/11.3-1.sql new file mode 100644 index 000000000..aedd3ad99 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_stat_tenants_reset/11.3-1.sql @@ -0,0 +1,8 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_tenants_reset() + RETURNS VOID + LANGUAGE plpgsql +AS $function$ +BEGIN + PERFORM run_command_on_all_nodes($$SELECT citus_stat_tenants_local_reset()$$); +END; +$function$; diff --git a/src/backend/distributed/sql/udfs/citus_stat_tenants_reset/latest.sql b/src/backend/distributed/sql/udfs/citus_stat_tenants_reset/latest.sql new file mode 100644 index 000000000..aedd3ad99 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_stat_tenants_reset/latest.sql @@ -0,0 +1,8 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_tenants_reset() + RETURNS VOID + LANGUAGE plpgsql +AS $function$ +BEGIN + PERFORM run_command_on_all_nodes($$SELECT citus_stat_tenants_local_reset()$$); +END; +$function$; diff --git a/src/backend/distributed/test/citus_stat_tenants.c b/src/backend/distributed/test/citus_stat_tenants.c new file mode 100644 index 000000000..2cfe0029b --- /dev/null +++ b/src/backend/distributed/test/citus_stat_tenants.c @@ -0,0 +1,38 @@ +/*------------------------------------------------------------------------- + * + * citus_stat_tenants.c + * + * This file contains functions to test citus_stat_tenants. + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "fmgr.h" + +#include "distributed/utils/citus_stat_tenants.h" +#include "sys/time.h" + +PG_FUNCTION_INFO_V1(sleep_until_next_period); + +/* + * sleep_until_next_period sleeps until the next monitoring period starts. + */ +Datum +sleep_until_next_period(PG_FUNCTION_ARGS) +{ + struct timeval currentTime; + gettimeofday(&currentTime, NULL); + + long int nextPeriodStart = currentTime.tv_sec - + (currentTime.tv_sec % StatTenantsPeriod) + + StatTenantsPeriod; + + long int sleepTime = (nextPeriodStart - currentTime.tv_sec) * 1000000 - + currentTime.tv_usec + 100000; + pg_usleep(sleepTime); + + PG_RETURN_VOID(); +} diff --git a/src/backend/distributed/utils/citus_stat_tenants.c b/src/backend/distributed/utils/citus_stat_tenants.c new file mode 100644 index 000000000..88e16e54d --- /dev/null +++ b/src/backend/distributed/utils/citus_stat_tenants.c @@ -0,0 +1,771 @@ +/*------------------------------------------------------------------------- + * + * citus_stat_tenants.c + * Routines related to the multi tenant monitor. + * + * Copyright (c) Citus Data, Inc. 
+ * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "unistd.h" + +#include "distributed/citus_safe_lib.h" +#include "distributed/log_utils.h" +#include "distributed/listutils.h" +#include "distributed/metadata_cache.h" +#include "distributed/jsonbutils.h" +#include "distributed/colocation_utils.h" +#include "distributed/tuplestore.h" +#include "distributed/colocation_utils.h" +#include "distributed/utils/citus_stat_tenants.h" +#include "executor/execdesc.h" +#include "storage/ipc.h" +#include "storage/lwlock.h" +#include "storage/shmem.h" +#include "sys/time.h" +#include "utils/builtins.h" +#include "utils/datetime.h" +#include "utils/json.h" + + +#include <limits.h> + +static void AttributeMetricsIfApplicable(void); + +ExecutorEnd_hook_type prev_ExecutorEnd = NULL; + +#define ATTRIBUTE_PREFIX "/*{\"tId\":" +#define ATTRIBUTE_STRING_FORMAT "/*{\"tId\":%s,\"cId\":%d}*/" +#define STAT_TENANTS_COLUMNS 7 +#define ONE_QUERY_SCORE 1000000000 + +static char AttributeToTenant[MAX_TENANT_ATTRIBUTE_LENGTH] = ""; +static CmdType AttributeToCommandType = CMD_UNKNOWN; +static int AttributeToColocationGroupId = INVALID_COLOCATION_ID; + +static const char *SharedMemoryNameForMultiTenantMonitor = + "Shared memory for multi tenant monitor"; +static char *TenantTrancheName = "Tenant Tranche"; +static char *MonitorTrancheName = "Multi Tenant Monitor Tranche"; + +static shmem_startup_hook_type prev_shmem_startup_hook = NULL; + +static int CompareTenantScore(const void *leftElement, const void *rightElement); +static void UpdatePeriodsIfNecessary(TenantStats *tenantStats, TimestampTz queryTime); +static void ReduceScoreIfNecessary(TenantStats *tenantStats, TimestampTz queryTime); +static void EvictTenantsIfNecessary(TimestampTz queryTime); +static void RecordTenantStats(TenantStats *tenantStats); +static void CreateMultiTenantMonitor(void); +static MultiTenantMonitor * CreateSharedMemoryForMultiTenantMonitor(void); +static MultiTenantMonitor * GetMultiTenantMonitor(void); +static void MultiTenantMonitorSMInit(void); +static int CreateTenantStats(MultiTenantMonitor *monitor, TimestampTz queryTime); +static int FindTenantStats(MultiTenantMonitor *monitor); +static size_t MultiTenantMonitorshmemSize(void); +static char * ExtractTopComment(const char *inputString); +static char * EscapeCommentChars(const char *str); +static char * UnescapeCommentChars(const char *str); + +int StatTenantsLogLevel = CITUS_LOG_LEVEL_OFF; +int StatTenantsPeriod = (time_t) 60; +int StatTenantsLimit = 100; +int StatTenantsTrack = STAT_TENANTS_TRACK_NONE; + + +PG_FUNCTION_INFO_V1(citus_stat_tenants_local); +PG_FUNCTION_INFO_V1(citus_stat_tenants_local_reset); + + +/* + * citus_stat_tenants_local finds, updates and returns the statistics for tenants. + */ +Datum +citus_stat_tenants_local(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + + /* + * We keep more than StatTenantsLimit tenants in our monitor, + * so that we do not lose a tenant's data if it temporarily falls out of the top StatTenantsLimit and becomes active again soon. + * Normally we return StatTenantsLimit tenants, but if returnAllTenants is true we return all of them. 
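+ * (The monitor itself can hold up to 3 * StatTenantsLimit entries before + * EvictTenantsIfNecessary trims it back; see CreateTenantStats below.)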
+ */ + bool returnAllTenants = PG_GETARG_BOOL(0); + + TupleDesc tupleDescriptor = NULL; + Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor); + TimestampTz monitoringTime = GetCurrentTimestamp(); + + Datum values[STAT_TENANTS_COLUMNS]; + bool isNulls[STAT_TENANTS_COLUMNS]; + + MultiTenantMonitor *monitor = GetMultiTenantMonitor(); + + if (monitor == NULL) + { + PG_RETURN_VOID(); + } + + LWLockAcquire(&monitor->lock, LW_EXCLUSIVE); + + int numberOfRowsToReturn = 0; + if (returnAllTenants) + { + numberOfRowsToReturn = monitor->tenantCount; + } + else + { + numberOfRowsToReturn = Min(monitor->tenantCount, StatTenantsLimit); + } + + for (int tenantIndex = 0; tenantIndex < monitor->tenantCount; tenantIndex++) + { + UpdatePeriodsIfNecessary(&monitor->tenants[tenantIndex], monitoringTime); + ReduceScoreIfNecessary(&monitor->tenants[tenantIndex], monitoringTime); + } + SafeQsort(monitor->tenants, monitor->tenantCount, sizeof(TenantStats), + CompareTenantScore); + + for (int i = 0; i < numberOfRowsToReturn; i++) + { + memset(values, 0, sizeof(values)); + memset(isNulls, false, sizeof(isNulls)); + + TenantStats *tenantStats = &monitor->tenants[i]; + + values[0] = Int32GetDatum(tenantStats->colocationGroupId); + values[1] = PointerGetDatum(cstring_to_text(tenantStats->tenantAttribute)); + values[2] = Int32GetDatum(tenantStats->readsInThisPeriod); + values[3] = Int32GetDatum(tenantStats->readsInLastPeriod); + values[4] = Int32GetDatum(tenantStats->readsInThisPeriod + + tenantStats->writesInThisPeriod); + values[5] = Int32GetDatum(tenantStats->readsInLastPeriod + + tenantStats->writesInLastPeriod); + values[6] = Int64GetDatum(tenantStats->score); + + tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls); + } + + LWLockRelease(&monitor->lock); + + PG_RETURN_VOID(); +} + + +/* + * citus_stat_tenants_local_reset resets the monitor for tenant statistics + * on the local node. + */ +Datum +citus_stat_tenants_local_reset(PG_FUNCTION_ARGS) +{ + MultiTenantMonitor *monitor = GetMultiTenantMonitor(); + monitor->tenantCount = 0; + + PG_RETURN_VOID(); +} + + +/* + * AttributeQueryIfAnnotated checks the query annotation, and if the query is + * annotated for tenant statistics monitoring, records the tenant attributes. 
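+ * + * The annotation is the JSON payload of a leading C-style comment; for example, a + * comment whose body is {"tId":"42","cId":3} (illustrative values) attributes the + * query to tenant attribute 42 in colocation group 3.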
+ */ +void +AttributeQueryIfAnnotated(const char *query_string, CmdType commandType) +{ + if (StatTenantsTrack == STAT_TENANTS_TRACK_NONE) + { + return; + } + + strcpy_s(AttributeToTenant, sizeof(AttributeToTenant), ""); + + if (query_string == NULL) + { + return; + } + + if (strncmp(ATTRIBUTE_PREFIX, query_string, strlen(ATTRIBUTE_PREFIX)) == 0) + { + char *annotation = ExtractTopComment(query_string); + if (annotation != NULL) + { + Datum jsonbDatum = DirectFunctionCall1(jsonb_in, PointerGetDatum(annotation)); + + text *tenantIdTextP = ExtractFieldTextP(jsonbDatum, "tId"); + char *tenantId = NULL; + if (tenantIdTextP != NULL) + { + tenantId = UnescapeCommentChars(text_to_cstring(tenantIdTextP)); + } + + int colocationId = ExtractFieldInt32(jsonbDatum, "cId", + INVALID_COLOCATION_ID); + + AttributeTask(tenantId, colocationId, commandType); + } + } +} + + +/* + * AttributeTask saves the given tenant attributes so that the query can be + * attributed to the tenant when the executor ends. + */ +void +AttributeTask(char *tenantId, int colocationId, CmdType commandType) +{ + if (StatTenantsTrack == STAT_TENANTS_TRACK_NONE || + tenantId == NULL || colocationId == INVALID_COLOCATION_ID) + { + return; + } + + AttributeToColocationGroupId = colocationId; + strncpy_s(AttributeToTenant, MAX_TENANT_ATTRIBUTE_LENGTH, tenantId, + MAX_TENANT_ATTRIBUTE_LENGTH - 1); + AttributeToCommandType = commandType; +} + + +/* + * AnnotateQuery annotates the query with tenant attributes. + */ +char * +AnnotateQuery(char *queryString, Const *partitionKeyValue, int colocationId) +{ + if (StatTenantsTrack == STAT_TENANTS_TRACK_NONE || partitionKeyValue == NULL) + { + return queryString; + } + + char *partitionKeyValueString = DatumToString(partitionKeyValue->constvalue, + partitionKeyValue->consttype); + + char *commentCharsEscaped = EscapeCommentChars(partitionKeyValueString); + StringInfo escapedSourceName = makeStringInfo(); + + escape_json(escapedSourceName, commentCharsEscaped); + + StringInfo newQuery = makeStringInfo(); + appendStringInfo(newQuery, ATTRIBUTE_STRING_FORMAT, escapedSourceName->data, + colocationId); + + appendStringInfoString(newQuery, queryString); + + return newQuery->data; +} + + +/* + * CitusAttributeToEnd keeps the statistics for the tenant and calls the previously installed end hook + * or the standard executor end function. + */ +void +CitusAttributeToEnd(QueryDesc *queryDesc) +{ + /* + * The end of the executor is the last moment we have to attribute the + * recorded query to a tenant, if applicable. + */ + AttributeMetricsIfApplicable(); + + /* now call in to the previously installed hook, or the standard implementation */ + if (prev_ExecutorEnd) + { + prev_ExecutorEnd(queryDesc); + } + else + { + standard_ExecutorEnd(queryDesc); + } +} + + +/* + * CompareTenantScore is used to sort the tenant statistics by score + * in descending order. 
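+ * It is used with SafeQsort both when returning statistics and when evicting + * low-scoring tenants.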
+ */ +static int +CompareTenantScore(const void *leftElement, const void *rightElement) +{ + const TenantStats *leftTenant = (const TenantStats *) leftElement; + const TenantStats *rightTenant = (const TenantStats *) rightElement; + + if (leftTenant->score > rightTenant->score) + { + return -1; + } + else if (leftTenant->score < rightTenant->score) + { + return 1; + } + return 0; +} + + +/* + * AttributeMetricsIfApplicable updates the metrics for the current tenant's statistics + */ +static void +AttributeMetricsIfApplicable() +{ + if (StatTenantsTrack == STAT_TENANTS_TRACK_NONE || + AttributeToTenant[0] == '\0') + { + return; + } + + TimestampTz queryTime = GetCurrentTimestamp(); + + MultiTenantMonitor *monitor = GetMultiTenantMonitor(); + + /* + * We need to acquire the monitor lock in shared mode to check if the tenant is + * already in the monitor. If it is not, we need to acquire the lock in + * exclusive mode to add the tenant to the monitor. + * + * We need to check again if the tenant is in the monitor after acquiring the + * exclusive lock to avoid adding the tenant twice. Some other backend might + * have added the tenant while we were waiting for the lock. + * + * After releasing the exclusive lock, we need to acquire the lock in shared + * mode to update the tenant's statistics. We need to check again if the tenant + * is in the monitor after acquiring the shared lock because some other backend + * might have removed the tenant while we were waiting for the lock. + */ + LWLockAcquire(&monitor->lock, LW_SHARED); + + int currentTenantIndex = FindTenantStats(monitor); + + if (currentTenantIndex != -1) + { + TenantStats *tenantStats = &monitor->tenants[currentTenantIndex]; + LWLockAcquire(&tenantStats->lock, LW_EXCLUSIVE); + + UpdatePeriodsIfNecessary(tenantStats, queryTime); + ReduceScoreIfNecessary(tenantStats, queryTime); + RecordTenantStats(tenantStats); + + LWLockRelease(&tenantStats->lock); + } + else + { + LWLockRelease(&monitor->lock); + + LWLockAcquire(&monitor->lock, LW_EXCLUSIVE); + currentTenantIndex = FindTenantStats(monitor); + + if (currentTenantIndex == -1) + { + currentTenantIndex = CreateTenantStats(monitor, queryTime); + } + + LWLockRelease(&monitor->lock); + + LWLockAcquire(&monitor->lock, LW_SHARED); + currentTenantIndex = FindTenantStats(monitor); + if (currentTenantIndex != -1) + { + TenantStats *tenantStats = &monitor->tenants[currentTenantIndex]; + LWLockAcquire(&tenantStats->lock, LW_EXCLUSIVE); + + UpdatePeriodsIfNecessary(tenantStats, queryTime); + ReduceScoreIfNecessary(tenantStats, queryTime); + RecordTenantStats(tenantStats); + + LWLockRelease(&tenantStats->lock); + } + } + LWLockRelease(&monitor->lock); + + strcpy_s(AttributeToTenant, sizeof(AttributeToTenant), ""); +} + + +/* + * UpdatePeriodsIfNecessary moves the query counts to previous periods if enough time has passed. + * + * If 1 period has passed after the latest query, this function moves this period's counts to the last period + * and cleans this period's statistics. + * + * If 2 or more periods have passed after the last query, this function cleans both this and the last period's + * statistics. 
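+ * + * For example, with citus.stat_tenants_period = 60, a query at t=125s falls in the + * period starting at t=120s; if the tenant's previous query ran before t=120s its + * current counts shift to the last period, and if it ran before t=60s the last + * period's counts are cleared as well (the timestamps are illustrative).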
+ */ +static void +UpdatePeriodsIfNecessary(TenantStats *tenantStats, TimestampTz queryTime) +{ + long long int periodInMicroSeconds = StatTenantsPeriod * USECS_PER_SEC; + TimestampTz periodStart = queryTime - (queryTime % periodInMicroSeconds); + + /* + * If the last query of this tenant was before the start of the current period, + * but there are query counts for this period, we move them to the last period. + */ + if (tenantStats->lastQueryTime < periodStart && + (tenantStats->writesInThisPeriod || tenantStats->readsInThisPeriod)) + { + tenantStats->writesInLastPeriod = tenantStats->writesInThisPeriod; + tenantStats->writesInThisPeriod = 0; + + tenantStats->readsInLastPeriod = tenantStats->readsInThisPeriod; + tenantStats->readsInThisPeriod = 0; + } + + /* + * If the last query is more than two periods ago, we clean the last period counts too. + */ + if (TimestampDifferenceExceeds(tenantStats->lastQueryTime, periodStart, + periodInMicroSeconds)) + { + tenantStats->writesInLastPeriod = 0; + + tenantStats->readsInLastPeriod = 0; + } + + tenantStats->lastQueryTime = queryTime; +} + + +/* + * ReduceScoreIfNecessary reduces the tenant score only if it is necessary. + * + * We halve the tenants' scores after each period. This function checks the number of + * periods that passed after the last score reduction and reduces the score accordingly. + */ +static void +ReduceScoreIfNecessary(TenantStats *tenantStats, TimestampTz queryTime) +{ + long long int periodInMicroSeconds = StatTenantsPeriod * USECS_PER_SEC; + TimestampTz periodStart = queryTime - (queryTime % periodInMicroSeconds); + + /* + * With each query we increase the score of tenant by ONE_QUERY_SCORE. + * After one period we halve the scores. + * + * Here we calculate how many periods passed after the last time we did score reduction. + * If the latest score reduction was in this period this number should be 0, + * if it was in the last period this number should be 1 and so on. + */ + int periodCountAfterLastScoreReduction = (periodStart - + tenantStats->lastScoreReduction + + periodInMicroSeconds - 1) / + periodInMicroSeconds; + + /* + * This should not happen but let's make sure + */ + if (periodCountAfterLastScoreReduction < 0) + { + periodCountAfterLastScoreReduction = 0; + } + + /* + * If the last score reduction was not in this period we do score reduction now. + */ + if (periodCountAfterLastScoreReduction > 0) + { + tenantStats->score >>= periodCountAfterLastScoreReduction; + tenantStats->lastScoreReduction = queryTime; + } +} + + +/* + * EvictTenantsIfNecessary sorts and evicts the tenants if the tenant count is more than or + * equal to 3 * StatTenantsLimit. + */ +static void +EvictTenantsIfNecessary(TimestampTz queryTime) +{ + MultiTenantMonitor *monitor = GetMultiTenantMonitor(); + + /* + * We keep up to StatTenantsLimit * 3 tenants instead of StatTenantsLimit, + * so we don't lose data immediately after a tenant falls out of the top StatTenantsLimit. + * + * Every time the tenant count hits StatTenantsLimit * 3, we reduce it back to StatTenantsLimit * 2. + */ + if (monitor->tenantCount >= StatTenantsLimit * 3) + { + for (int tenantIndex = 0; tenantIndex < monitor->tenantCount; tenantIndex++) + { + ReduceScoreIfNecessary(&monitor->tenants[tenantIndex], queryTime); + } + SafeQsort(monitor->tenants, monitor->tenantCount, sizeof(TenantStats), + CompareTenantScore); + monitor->tenantCount = StatTenantsLimit * 2; + } +} + + +/* + * RecordTenantStats records the query statistics for the tenant. 
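+ * + * Each query adds ONE_QUERY_SCORE to the tenant's score, clamped at LLONG_MAX; + * SELECT queries count as reads, while INSERT, UPDATE and DELETE count as writes.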
+ */ +static void +RecordTenantStats(TenantStats *tenantStats) +{ + if (tenantStats->score < LLONG_MAX - ONE_QUERY_SCORE) + { + tenantStats->score += ONE_QUERY_SCORE; + } + else + { + tenantStats->score = LLONG_MAX; + } + + if (AttributeToCommandType == CMD_SELECT) + { + tenantStats->readsInThisPeriod++; + } + else if (AttributeToCommandType == CMD_UPDATE || + AttributeToCommandType == CMD_INSERT || + AttributeToCommandType == CMD_DELETE) + { + tenantStats->writesInThisPeriod++; + } +} + + +/* + * CreateMultiTenantMonitor creates the data structure for multi tenant monitor. + */ +static void +CreateMultiTenantMonitor() +{ + MultiTenantMonitor *monitor = CreateSharedMemoryForMultiTenantMonitor(); + monitor->tenantCount = 0; +} + + +/* + * CreateSharedMemoryForMultiTenantMonitor creates a dynamic shared memory segment for multi tenant monitor. + */ +static MultiTenantMonitor * +CreateSharedMemoryForMultiTenantMonitor() +{ + bool found = false; + MultiTenantMonitor *monitor = ShmemInitStruct(SharedMemoryNameForMultiTenantMonitor, + MultiTenantMonitorshmemSize(), + &found); + if (found) + { + return monitor; + } + + monitor->namedLockTranche.trancheId = LWLockNewTrancheId(); + monitor->namedLockTranche.trancheName = MonitorTrancheName; + + LWLockRegisterTranche(monitor->namedLockTranche.trancheId, + monitor->namedLockTranche.trancheName); + LWLockInitialize(&monitor->lock, monitor->namedLockTranche.trancheId); + + return monitor; +} + + +/* + * GetMultiTenantMonitor returns the data structure for multi tenant monitor. + */ +static MultiTenantMonitor * +GetMultiTenantMonitor() +{ + bool found = false; + MultiTenantMonitor *monitor = ShmemInitStruct(SharedMemoryNameForMultiTenantMonitor, + MultiTenantMonitorshmemSize(), + &found); + + if (!found) + { + elog(WARNING, "monitor not found"); + return NULL; + } + + return monitor; +} + + +/* + * InitializeMultiTenantMonitorSMHandleManagement sets up the shared memory startup hook + * so that the multi tenant monitor can be initialized and stored in shared memory. + */ +void +InitializeMultiTenantMonitorSMHandleManagement() +{ + prev_shmem_startup_hook = shmem_startup_hook; + shmem_startup_hook = MultiTenantMonitorSMInit; +} + + +/* + * MultiTenantMonitorSMInit initializes the shared memory for MultiTenantMonitorSMData. + */ +static void +MultiTenantMonitorSMInit() +{ + CreateMultiTenantMonitor(); + + if (prev_shmem_startup_hook != NULL) + { + prev_shmem_startup_hook(); + } +} + + +/* + * CreateTenantStats creates the data structure for a tenant's statistics. + * + * Calling this function should be protected by the monitor->lock in LW_EXCLUSIVE mode. + */ +static int +CreateTenantStats(MultiTenantMonitor *monitor, TimestampTz queryTime) +{ + /* + * If the tenant count reached 3 * StatTenantsLimit, we evict the tenants + * with the lowest score. 
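+ * With the default citus.stat_tenants_limit of 100, for example, eviction kicks + * in once 300 tenants are tracked and trims the list back to the 200 highest + * scorers.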
+ */ + EvictTenantsIfNecessary(queryTime); + + int tenantIndex = monitor->tenantCount; + + memset(&monitor->tenants[tenantIndex], 0, sizeof(monitor->tenants[tenantIndex])); + + strcpy_s(monitor->tenants[tenantIndex].tenantAttribute, + sizeof(monitor->tenants[tenantIndex].tenantAttribute), AttributeToTenant); + monitor->tenants[tenantIndex].colocationGroupId = AttributeToColocationGroupId; + + monitor->tenants[tenantIndex].namedLockTranche.trancheId = LWLockNewTrancheId(); + monitor->tenants[tenantIndex].namedLockTranche.trancheName = TenantTrancheName; + + LWLockRegisterTranche(monitor->tenants[tenantIndex].namedLockTranche.trancheId, + monitor->tenants[tenantIndex].namedLockTranche.trancheName); + LWLockInitialize(&monitor->tenants[tenantIndex].lock, + monitor->tenants[tenantIndex].namedLockTranche.trancheId); + + monitor->tenantCount++; + + return tenantIndex; +} + + +/* + * FindTenantStats finds the index for the current tenant's statistics. + */ +static int +FindTenantStats(MultiTenantMonitor *monitor) +{ + for (int i = 0; i < monitor->tenantCount; i++) + { + TenantStats *tenantStats = &monitor->tenants[i]; + if (strcmp(tenantStats->tenantAttribute, AttributeToTenant) == 0 && + tenantStats->colocationGroupId == AttributeToColocationGroupId) + { + return i; + } + } + + return -1; +} + + +/* + * MultiTenantMonitorshmemSize calculates the size of the multi tenant monitor using + * StatTenantsLimit parameter. + */ +static size_t +MultiTenantMonitorshmemSize(void) +{ + Size size = sizeof(MultiTenantMonitor); + size = add_size(size, mul_size(sizeof(TenantStats), StatTenantsLimit * 3)); + + return size; +} + + +/* + * ExtractTopComment extracts the top-level multi-line comment from a given input string. + */ +static char * +ExtractTopComment(const char *inputString) +{ + int commentCharsLength = 2; + int inputStringLen = strlen(inputString); + if (inputStringLen < commentCharsLength) + { + return NULL; + } + + const char *commentStartChars = "/*"; + const char *commentEndChars = "*/"; + + /* If query doesn't start with a comment, return NULL */ + if (strstr(inputString, commentStartChars) != inputString) + { + return NULL; + } + + StringInfo commentData = makeStringInfo(); + + /* Skip the comment start characters */ + const char *commentStart = inputString + commentCharsLength; + + /* Find the first comment end character */ + const char *commentEnd = strstr(commentStart, commentEndChars); + if (commentEnd == NULL) + { + return NULL; + } + + /* Append the comment to the StringInfo buffer */ + int commentLength = commentEnd - commentStart; + appendStringInfo(commentData, "%.*s", commentLength, commentStart); + + /* Return the extracted comment */ + return commentData->data; +} + + +/* EscapeCommentChars adds a backslash before each occurrence of '*' or '/' in the input string */ +static char * +EscapeCommentChars(const char *str) +{ + int originalStringLength = strlen(str); + StringInfo escapedString = makeStringInfo(); + + for (int originalStringIndex = 0; originalStringIndex < originalStringLength; + originalStringIndex++) + { + if (str[originalStringIndex] == '*' || str[originalStringIndex] == '/') + { + appendStringInfoChar(escapedString, '\\'); + } + + appendStringInfoChar(escapedString, str[originalStringIndex]); + } + + return escapedString->data; +} + + +/* UnescapeCommentChars removes the backslash that precedes '*' or '/' in the input string. 
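For example, an escaped tenant value like a\*b\/c is restored to a form with bare '*' and '/' characters; this is the inverse of EscapeCommentChars above. 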
*/ +static char * +UnescapeCommentChars(const char *str) +{ + int originalStringLength = strlen(str); + StringInfo unescapedString = makeStringInfo(); + + for (int originalStringindex = 0; originalStringindex < originalStringLength; + originalStringindex++) + { + if (str[originalStringindex] == '\\' && + originalStringindex < originalStringLength - 1 && + (str[originalStringindex + 1] == '*' || + str[originalStringindex + 1] == '/')) + { + originalStringindex++; + } + appendStringInfoChar(unescapedString, str[originalStringindex]); + } + + return unescapedString->data; +} diff --git a/src/backend/distributed/utils/jsonbutils.c b/src/backend/distributed/utils/jsonbutils.c index 22fa4f568..4855ee004 100644 --- a/src/backend/distributed/utils/jsonbutils.c +++ b/src/backend/distributed/utils/jsonbutils.c @@ -83,6 +83,25 @@ ExtractFieldBoolean(Datum jsonbDoc, const char *fieldName, bool defaultValue) } +/* + * ExtractFieldInt32 gets value of fieldName from jsonbDoc, or returns + * defaultValue if it doesn't exist. + */ +int32 +ExtractFieldInt32(Datum jsonbDoc, const char *fieldName, int32 defaultValue) +{ + Datum jsonbDatum = 0; + bool found = ExtractFieldJsonb(jsonbDoc, fieldName, &jsonbDatum, false); + if (!found) + { + return defaultValue; + } + + Datum int32Datum = DirectFunctionCall1(jsonb_int4, jsonbDatum); + return DatumGetInt32(int32Datum); +} + + /* * ExtractFieldTextP gets value of fieldName as text* from jsonbDoc, or * returns NULL if it doesn't exist. diff --git a/src/include/distributed/citus_custom_scan.h b/src/include/distributed/citus_custom_scan.h index 92301fceb..f31138ac2 100644 --- a/src/include/distributed/citus_custom_scan.h +++ b/src/include/distributed/citus_custom_scan.h @@ -46,4 +46,6 @@ extern CustomScan * FetchCitusCustomScanIfExists(Plan *plan); extern bool IsCitusPlan(Plan *plan); extern bool IsCitusCustomScan(Plan *plan); +extern void SetJobColocationId(Job *job); + #endif /* CITUS_CUSTOM_SCAN_H */ diff --git a/src/include/distributed/jsonbutils.h b/src/include/distributed/jsonbutils.h index 3e37fa38e..d44044fcb 100644 --- a/src/include/distributed/jsonbutils.h +++ b/src/include/distributed/jsonbutils.h @@ -16,5 +16,6 @@ bool ExtractFieldJsonbDatum(Datum jsonbDoc, const char *fieldName, Datum *result); text * ExtractFieldTextP(Datum jsonbDoc, const char *fieldName); bool ExtractFieldBoolean(Datum jsonbDoc, const char *fieldName, bool defaultValue); +int32 ExtractFieldInt32(Datum jsonbDoc, const char *fieldName, int32 defaultValue); #endif /* CITUS_JSONBUTILS_H */ diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index d6ad4c248..ea5d15c83 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -330,6 +330,9 @@ typedef struct Task * Vacuum, create/drop/reindex concurrently cannot be executed in a transaction. */ bool cannotBeExecutedInTransction; + + Const *partitionKeyValue; + int colocationId; } Task; diff --git a/src/include/distributed/utils/citus_stat_tenants.h b/src/include/distributed/utils/citus_stat_tenants.h new file mode 100644 index 000000000..dbc867071 --- /dev/null +++ b/src/include/distributed/utils/citus_stat_tenants.h @@ -0,0 +1,113 @@ +/*------------------------------------------------------------------------- + * + * citus_stat_tenants.h + * Routines related to the multi tenant monitor. + * + * Copyright (c) Citus Data, Inc. 
+ * + *------------------------------------------------------------------------- + */ + +#ifndef CITUS_ATTRIBUTE_H +#define CITUS_ATTRIBUTE_H + +#include "executor/execdesc.h" +#include "executor/executor.h" +#include "storage/lwlock.h" +#include "utils/datetime.h" + +#define MAX_TENANT_ATTRIBUTE_LENGTH 100 + +/* + * TenantStats is the struct that keeps statistics about one tenant. + */ +typedef struct TenantStats +{ + /* + * The attribute value, e.g. distribution column, and colocation group id + * of the tenant. + */ + char tenantAttribute[MAX_TENANT_ATTRIBUTE_LENGTH]; + int colocationGroupId; + + /* + * Number of SELECT queries this tenant ran in this and last periods. + */ + int readsInLastPeriod; + int readsInThisPeriod; + + /* + * Number of INSERT, UPDATE, and DELETE queries this tenant ran in this and last periods. + */ + int writesInLastPeriod; + int writesInThisPeriod; + + /* + * The latest time this tenant ran a query. This value is used to update the score later. + */ + TimestampTz lastQueryTime; + + /* + * The tenant monitoring score of this tenant. This value is increased by ONE_QUERY_SCORE at every query + * and halved after every period. This custom scoring mechanism ranks the tenants based on + * the recency and frequency of their activity, and decides which + * tenants should be removed from the monitor. + */ + long long score; + + /* + * The latest time the score of this tenant was halved. This value is used to correctly calculate the reduction later. + */ + TimestampTz lastScoreReduction; + + /* + * Locks needed to update this tenant's statistics. + */ + NamedLWLockTranche namedLockTranche; + LWLock lock; +} TenantStats; + +/* + * MultiTenantMonitor is the struct for keeping the statistics + * of the tenants + */ +typedef struct MultiTenantMonitor +{ + /* + * Lock mechanism for the monitor. + * Each tenant update acquires the lock in shared mode, while + * the tenant number reduction and the monitor view acquire it in exclusive mode. + */ + NamedLWLockTranche namedLockTranche; + LWLock lock; + + /* + * tenantCount is the number of items in the tenants array. 
+ * The total length of tenants array is set up at CreateSharedMemoryForMultiTenantMonitor + * and is 3 * citus.stat_tenants_limit + */ + int tenantCount; + TenantStats tenants[FLEXIBLE_ARRAY_MEMBER]; +} MultiTenantMonitor; + +typedef enum +{ + STAT_TENANTS_TRACK_NONE = 0, + STAT_TENANTS_TRACK_ALL = 1 +} StatTenantsTrackType; + +extern void CitusAttributeToEnd(QueryDesc *queryDesc); +extern void AttributeQueryIfAnnotated(const char *queryString, CmdType commandType); +extern char * AnnotateQuery(char *queryString, Const *partitionKeyValue, + int colocationId); +extern void InitializeMultiTenantMonitorSMHandleManagement(void); +extern void AttributeTask(char *tenantId, int colocationGroupId, CmdType commandType); + +extern ExecutorEnd_hook_type prev_ExecutorEnd; + +extern int StatTenantsLogLevel; +extern int StatTenantsPeriod; +extern int StatTenantsLimit; +extern int StatTenantsTrack; + +#endif /*CITUS_ATTRIBUTE_H */ diff --git a/src/test/regress/bin/normalize.sed b/src/test/regress/bin/normalize.sed index 2ebb31f47..65692e1c9 100644 --- a/src/test/regress/bin/normalize.sed +++ b/src/test/regress/bin/normalize.sed @@ -307,3 +307,5 @@ s/(NOTICE: issuing SET LOCAL application_name TO 'citus_rebalancer gpid=)[0-9]+ # shard_rebalancer output, flaky improvement number s/improvement of 0.1[0-9]* is lower/improvement of 0.1xxxxx is lower/g +# normalize tenants statistics annotations +s/\/\*\{"tId":.*\*\///g diff --git a/src/test/regress/expected/citus_stat_tenants.out b/src/test/regress/expected/citus_stat_tenants.out new file mode 100644 index 000000000..debe8b4c8 --- /dev/null +++ b/src/test/regress/expected/citus_stat_tenants.out @@ -0,0 +1,720 @@ +CREATE SCHEMA citus_stat_tenants; +SET search_path TO citus_stat_tenants; +SET citus.next_shard_id TO 5797500; +SET citus.shard_replication_factor TO 1; +-- make sure that we are tracking the tenant stats +SELECT result FROM run_command_on_all_nodes('SHOW citus.stat_tenants_track'); + result +--------------------------------------------------------------------- + all + all + all +(3 rows) + +CREATE OR REPLACE FUNCTION pg_catalog.sleep_until_next_period() +RETURNS VOID +LANGUAGE C +AS 'citus', $$sleep_until_next_period$$; +SELECT citus_stat_tenants_reset(); + citus_stat_tenants_reset +--------------------------------------------------------------------- + +(1 row) + +-- set period to upper limit to prevent stats from being reset +SELECT result FROM run_command_on_all_nodes('ALTER SYSTEM SET citus.stat_tenants_period TO 86400'); + result +--------------------------------------------------------------------- + ALTER SYSTEM + ALTER SYSTEM + ALTER SYSTEM +(3 rows) + +SELECT result FROM run_command_on_all_nodes('SELECT pg_reload_conf()'); + result +--------------------------------------------------------------------- + t + t + t +(3 rows) + +CREATE TABLE dist_tbl (a INT, b TEXT); +SELECT create_distributed_table('dist_tbl', 'a', shard_count:=4, colocate_with:='none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE dist_tbl_2 (a INT, b INT); +SELECT create_distributed_table('dist_tbl_2', 'a', colocate_with:='dist_tbl'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE dist_tbl_text (a TEXT, b INT); +SELECT create_distributed_table('dist_tbl_text', 'a', shard_count:=4, colocate_with:='none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + 
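+-- ref_tbl is a reference table; queries on reference tables carry no tenant
+-- attribute and, as verified below, are not counted in the tenant statistics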
+CREATE TABLE ref_tbl (a INT, b INT); +SELECT create_reference_table('ref_tbl'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO dist_tbl VALUES (1, 'abcd'); +INSERT INTO dist_tbl VALUES (2, 'abcd'); +UPDATE dist_tbl SET b = a + 1 WHERE a = 3; +UPDATE dist_tbl SET b = a + 1 WHERE a = 4; +DELETE FROM dist_tbl WHERE a = 5; +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants(true) ORDER BY tenant_attribute; + tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period +--------------------------------------------------------------------- + 1 | 0 | 0 | 1 | 0 + 2 | 0 | 0 | 1 | 0 + 3 | 0 | 0 | 1 | 0 + 4 | 0 | 0 | 1 | 0 + 5 | 0 | 0 | 1 | 0 +(5 rows) + +SELECT citus_stat_tenants_reset(); + citus_stat_tenants_reset +--------------------------------------------------------------------- + +(1 row) + +-- queries with multiple tenants should not be counted +SELECT count(*)>=0 FROM dist_tbl WHERE a IN (1, 5); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +-- queries with reference tables should not be counted +SELECT count(*)>=0 FROM ref_tbl WHERE a = 1; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants(true) ORDER BY tenant_attribute; + tenant_attribute | query_count_in_this_period +--------------------------------------------------------------------- +(0 rows) + +-- queries with multiple tables but one tenant should be counted +SELECT count(*)>=0 FROM dist_tbl, dist_tbl_2 WHERE dist_tbl.a = 1 AND dist_tbl_2.a = 1; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl JOIN dist_tbl_2 ON dist_tbl.a = dist_tbl_2.a WHERE dist_tbl.a = 1; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants(true) WHERE tenant_attribute = '1'; + tenant_attribute | query_count_in_this_period +--------------------------------------------------------------------- + 1 | 2 +(1 row) + +-- test scoring +-- all of these distribution column values are from second worker +SELECT nodeid AS worker_2_nodeid FROM pg_dist_node WHERE nodeport = :worker_2_port \gset +SELECT count(*)>=0 FROM dist_tbl WHERE a = 2; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl WHERE a = 3; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl WHERE a = 4; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'abcd'; + ?column? 
+--------------------------------------------------------------------- + t +(1 row) + +SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stat_tenants(true) WHERE nodeid = :worker_2_nodeid ORDER BY score DESC, tenant_attribute; + tenant_attribute | query_count_in_this_period | score +--------------------------------------------------------------------- + 2 | 1 | 1000000000 + 3 | 1 | 1000000000 + 4 | 1 | 1000000000 + abcd | 1 | 1000000000 +(4 rows) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'abcd'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'abcd'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'cdef'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stat_tenants(true) WHERE nodeid = :worker_2_nodeid ORDER BY score DESC, tenant_attribute; + tenant_attribute | query_count_in_this_period | score +--------------------------------------------------------------------- + abcd | 3 | 3000000000 + 2 | 1 | 1000000000 + 3 | 1 | 1000000000 + 4 | 1 | 1000000000 + bcde | 1 | 1000000000 + cdef | 1 | 1000000000 +(6 rows) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'defg'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stat_tenants(true) WHERE nodeid = :worker_2_nodeid ORDER BY score DESC, tenant_attribute; + tenant_attribute | query_count_in_this_period | score +--------------------------------------------------------------------- + abcd | 3 | 3000000000 + bcde | 3 | 3000000000 + 2 | 1 | 1000000000 + 3 | 1 | 1000000000 + 4 | 1 | 1000000000 + cdef | 1 | 1000000000 + defg | 1 | 1000000000 +(7 rows) + +-- test period passing +SELECT citus_stat_tenants_reset(); + citus_stat_tenants_reset +--------------------------------------------------------------------- + +(1 row) + +SELECT count(*)>=0 FROM dist_tbl WHERE a = 1; + ?column? 
+--------------------------------------------------------------------- + t +(1 row) + +INSERT INTO dist_tbl VALUES (5, 'abcd'); +\c - - - :worker_1_port +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants_local ORDER BY tenant_attribute; + tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period +--------------------------------------------------------------------- + 1 | 1 | 0 | 1 | 0 + 5 | 0 | 0 | 1 | 0 +(2 rows) + +-- simulate passing the period +SET citus.stat_tenants_period TO 2; +SELECT sleep_until_next_period(); + sleep_until_next_period +--------------------------------------------------------------------- + +(1 row) + +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants_local ORDER BY tenant_attribute; + tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period +--------------------------------------------------------------------- + 1 | 0 | 1 | 0 | 1 + 5 | 0 | 0 | 0 | 1 +(2 rows) + +\c - - - :master_port +SET search_path TO citus_stat_tenants; +-- test logs +SET client_min_messages TO LOG; +SELECT count(*)>=0 FROM citus_stat_tenants; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SET citus.stat_tenants_log_level TO ERROR; +SELECT count(*)>=0 FROM citus_stat_tenants; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SET citus.stat_tenants_log_level TO OFF; +SELECT count(*)>=0 FROM citus_stat_tenants; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SET citus.stat_tenants_log_level TO LOG; +SELECT count(*)>=0 FROM citus_stat_tenants; +LOG: Generating citus_stat_tenants +CONTEXT: PL/pgSQL function citus_stat_tenants(boolean) line XX at RAISE + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SET citus.stat_tenants_log_level TO DEBUG; +SELECT count(*)>=0 FROM citus_stat_tenants; +LOG: Generating citus_stat_tenants +CONTEXT: PL/pgSQL function citus_stat_tenants(boolean) line XX at RAISE + ?column? +--------------------------------------------------------------------- + t +(1 row) + +RESET client_min_messages; +SELECT citus_stat_tenants_reset(); + citus_stat_tenants_reset +--------------------------------------------------------------------- + +(1 row) + +-- test turning monitoring on/off +SET citus.stat_tenants_track TO "NONE"; +SELECT count(*)>=0 FROM dist_tbl WHERE a = 1; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +INSERT INTO dist_tbl VALUES (1, 1); +SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants; + tenant_attribute | query_count_in_this_period +--------------------------------------------------------------------- +(0 rows) + +SET citus.stat_tenants_track TO "ALL"; +SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants; + tenant_attribute | query_count_in_this_period +--------------------------------------------------------------------- +(0 rows) + +SELECT count(*)>=0 FROM dist_tbl WHERE a = 1; + ?column? 
+--------------------------------------------------------------------- + t +(1 row) + +INSERT INTO dist_tbl VALUES (1, 1); +SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants; + tenant_attribute | query_count_in_this_period +--------------------------------------------------------------------- + 1 | 2 +(1 row) + +-- test special and multibyte characters in tenant attribute +SELECT citus_stat_tenants_reset(); + citus_stat_tenants_reset +--------------------------------------------------------------------- + +(1 row) + +TRUNCATE TABLE dist_tbl_text; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/bcde'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/*bcde'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/b*cde'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/b*c/de'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'b/*//cde'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/b/*/cde'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/b/**/cde'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde*'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde*/'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = U&'\0061\0308bc'; + ?column? 
+--------------------------------------------------------------------- + t +(1 row) + +\c - - - :worker_1_port +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants ORDER BY tenant_attribute; + tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period +--------------------------------------------------------------------- + /*bcde | 1 | 0 | 1 | 0 + /b*c/de | 1 | 0 | 1 | 0 + /b*cde | 1 | 0 | 1 | 0 + /b/**/cde | 1 | 0 | 1 | 0 + /b/*/cde | 1 | 0 | 1 | 0 + /bcde | 1 | 0 | 1 | 0 + äbc | 1 | 0 | 1 | 0 + b/*//cde | 1 | 0 | 1 | 0 + bcde* | 1 | 0 | 1 | 0 + bcde*/ | 1 | 0 | 1 | 0 +(10 rows) + +\c - - - :worker_2_port +SET search_path TO citus_stat_tenants; +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants ORDER BY tenant_attribute; + tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period +--------------------------------------------------------------------- + /*bcde | 1 | 0 | 1 | 0 + /b*c/de | 1 | 0 | 1 | 0 + /b*cde | 1 | 0 | 1 | 0 + /b/**/cde | 1 | 0 | 1 | 0 + /b/*/cde | 1 | 0 | 1 | 0 + /bcde | 1 | 0 | 1 | 0 + äbc | 1 | 0 | 1 | 0 + b/*//cde | 1 | 0 | 1 | 0 + bcde* | 1 | 0 | 1 | 0 + bcde*/ | 1 | 0 | 1 | 0 +(10 rows) + +SELECT citus_stat_tenants_reset(); + citus_stat_tenants_reset +--------------------------------------------------------------------- + +(1 row) + +-- test local queries +-- all of these distribution column values are from second worker +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/b*c/de'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/bcde'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = U&'\0061\0308bc'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde*'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants_local ORDER BY tenant_attribute; + tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period +--------------------------------------------------------------------- + /b*c/de | 1 | 0 | 1 | 0 + /bcde | 1 | 0 | 1 | 0 + äbc | 1 | 0 | 1 | 0 + bcde* | 1 | 0 | 1 | 0 +(4 rows) + +-- test local cached queries & prepared statements +PREPARE dist_tbl_text_select_plan (text) AS SELECT count(*)>=0 FROM dist_tbl_text WHERE a = $1; +EXECUTE dist_tbl_text_select_plan('/b*c/de'); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +EXECUTE dist_tbl_text_select_plan('/bcde'); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +EXECUTE dist_tbl_text_select_plan(U&'\0061\0308bc'); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +EXECUTE dist_tbl_text_select_plan('bcde*'); + ?column? 
+--------------------------------------------------------------------- + t +(1 row) + +EXECUTE dist_tbl_text_select_plan('/b*c/de'); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +EXECUTE dist_tbl_text_select_plan('/bcde'); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +EXECUTE dist_tbl_text_select_plan(U&'\0061\0308bc'); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +EXECUTE dist_tbl_text_select_plan('bcde*'); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +EXECUTE dist_tbl_text_select_plan('/b*c/de'); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +EXECUTE dist_tbl_text_select_plan('/bcde'); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +EXECUTE dist_tbl_text_select_plan(U&'\0061\0308bc'); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +EXECUTE dist_tbl_text_select_plan('bcde*'); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants_local ORDER BY tenant_attribute; + tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period +--------------------------------------------------------------------- + /b*c/de | 4 | 0 | 4 | 0 + /bcde | 4 | 0 | 4 | 0 + äbc | 4 | 0 | 4 | 0 + bcde* | 4 | 0 | 4 | 0 +(4 rows) + +\c - - - :master_port +SET search_path TO citus_stat_tenants; +PREPARE dist_tbl_text_select_plan (text) AS SELECT count(*)>=0 FROM dist_tbl_text WHERE a = $1; +EXECUTE dist_tbl_text_select_plan('/b*c/de'); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +EXECUTE dist_tbl_text_select_plan('/bcde'); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +EXECUTE dist_tbl_text_select_plan(U&'\0061\0308bc'); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +EXECUTE dist_tbl_text_select_plan('bcde*'); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +EXECUTE dist_tbl_text_select_plan('/b*c/de'); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +EXECUTE dist_tbl_text_select_plan('/bcde'); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +EXECUTE dist_tbl_text_select_plan(U&'\0061\0308bc'); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +EXECUTE dist_tbl_text_select_plan('bcde*'); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +EXECUTE dist_tbl_text_select_plan('/b*c/de'); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +EXECUTE dist_tbl_text_select_plan('/bcde'); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +EXECUTE dist_tbl_text_select_plan(U&'\0061\0308bc'); + ?column? 
+--------------------------------------------------------------------- + t +(1 row) + +EXECUTE dist_tbl_text_select_plan('bcde*'); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +\c - - - :worker_2_port +SET search_path TO citus_stat_tenants; +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants ORDER BY tenant_attribute; + tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period +--------------------------------------------------------------------- + /b*c/de | 7 | 0 | 7 | 0 + /bcde | 7 | 0 | 7 | 0 + äbc | 7 | 0 | 7 | 0 + bcde* | 7 | 0 | 7 | 0 +(4 rows) + +\c - - - :master_port +SET search_path TO citus_stat_tenants; +SELECT citus_stat_tenants_reset(); + citus_stat_tenants_reset +--------------------------------------------------------------------- + +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'thisisaveryloooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooongname'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants ORDER BY tenant_attribute; + tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period +--------------------------------------------------------------------- + thisisaverylooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooo | 1 | 0 | 1 | 0 +(1 row) + +-- test role permissions +CREATE ROLE stats_non_superuser WITH LOGIN; +SET ROLE stats_non_superuser; +SELECT count(*)>=0 FROM citus_stat_tenants; +ERROR: permission denied for view citus_stat_tenants +SELECT count(*)>=0 FROM citus_stat_tenants_local; +ERROR: permission denied for view citus_stat_tenants_local +SELECT count(*)>=0 FROM citus_stat_tenants(); +ERROR: permission denied for function citus_stat_tenants +SELECT count(*)>=0 FROM citus_stat_tenants_local(); +ERROR: permission denied for function citus_stat_tenants_local +RESET ROLE; +GRANT pg_monitor TO stats_non_superuser; +SET ROLE stats_non_superuser; +SELECT count(*)>=0 FROM citus_stat_tenants; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM citus_stat_tenants_local; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM citus_stat_tenants(); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM citus_stat_tenants_local(); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +RESET ROLE; +DROP ROLE stats_non_superuser; +SET client_min_messages TO ERROR; +DROP SCHEMA citus_stat_tenants CASCADE; diff --git a/src/test/regress/expected/failure_multi_dml.out b/src/test/regress/expected/failure_multi_dml.out index 7ca8a8f91..bbea2c999 100644 --- a/src/test/regress/expected/failure_multi_dml.out +++ b/src/test/regress/expected/failure_multi_dml.out @@ -25,7 +25,7 @@ SELECT citus.clear_network_traffic(); ---- test multiple statements spanning multiple shards, ---- at each significant point. 
These transactions are 2pc -- fail at DELETE -SELECT citus.mitmproxy('conn.onQuery(query="^DELETE").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="DELETE").kill()'); mitmproxy --------------------------------------------------------------------- @@ -54,7 +54,7 @@ SELECT * FROM dml_test ORDER BY id ASC; (4 rows) -- cancel at DELETE -SELECT citus.mitmproxy('conn.onQuery(query="^DELETE").cancel(' || pg_backend_pid() || ')'); +SELECT citus.mitmproxy('conn.onQuery(query="DELETE").cancel(' || pg_backend_pid() || ')'); mitmproxy --------------------------------------------------------------------- @@ -83,7 +83,7 @@ SELECT * FROM dml_test ORDER BY id ASC; (4 rows) -- fail at INSERT -SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()'); mitmproxy --------------------------------------------------------------------- @@ -110,7 +110,7 @@ SELECT * FROM dml_test ORDER BY id ASC; (4 rows) -- cancel at INSERT -SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").cancel(' || pg_backend_pid() || ')'); +SELECT citus.mitmproxy('conn.onQuery(query="INSERT").cancel(' || pg_backend_pid() || ')'); mitmproxy --------------------------------------------------------------------- @@ -137,7 +137,7 @@ SELECT * FROM dml_test ORDER BY id ASC; (4 rows) -- fail at UPDATE -SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()'); mitmproxy --------------------------------------------------------------------- @@ -163,7 +163,7 @@ SELECT * FROM dml_test ORDER BY id ASC; (4 rows) -- cancel at UPDATE -SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").cancel(' || pg_backend_pid() || ')'); +SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").cancel(' || pg_backend_pid() || ')'); mitmproxy --------------------------------------------------------------------- diff --git a/src/test/regress/expected/failure_multi_row_insert.out b/src/test/regress/expected/failure_multi_row_insert.out index 8948be94e..f3cd4919a 100644 --- a/src/test/regress/expected/failure_multi_row_insert.out +++ b/src/test/regress/expected/failure_multi_row_insert.out @@ -36,7 +36,7 @@ SELECT create_reference_table('reference_table'); -- (d) multi-row INSERT that hits multiple shards in multiple workers -- (e) multi-row INSERT to a reference table -- Failure and cancellation on multi-row INSERT that hits the same shard with the same value -SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()'); mitmproxy --------------------------------------------------------------------- diff --git a/src/test/regress/expected/failure_ref_tables.out b/src/test/regress/expected/failure_ref_tables.out index 6485691af..4984cc1bf 100644 --- a/src/test/regress/expected/failure_ref_tables.out +++ b/src/test/regress/expected/failure_ref_tables.out @@ -26,7 +26,7 @@ SELECT COUNT(*) FROM ref_table; (1 row) -- verify behavior of single INSERT; should fail to execute -SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()'); mitmproxy --------------------------------------------------------------------- @@ -41,7 +41,7 @@ SELECT COUNT(*) FROM ref_table WHERE key=5; (1 row) -- verify behavior of UPDATE ... 
RETURNING; should not execute -SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()'); mitmproxy --------------------------------------------------------------------- @@ -56,7 +56,7 @@ SELECT COUNT(*) FROM ref_table WHERE key=7; (1 row) -- verify fix to #2214; should raise error and fail to execute -SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()'); mitmproxy --------------------------------------------------------------------- diff --git a/src/test/regress/expected/failure_replicated_partitions.out b/src/test/regress/expected/failure_replicated_partitions.out index 4ae2d604c..7294df98b 100644 --- a/src/test/regress/expected/failure_replicated_partitions.out +++ b/src/test/regress/expected/failure_replicated_partitions.out @@ -21,7 +21,7 @@ CREATE TABLE partitioned_table_0 PARTITION OF partitioned_table (dist_key, partition_id) FOR VALUES IN ( 0 ); INSERT INTO partitioned_table VALUES (0, 0); -SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()'); mitmproxy --------------------------------------------------------------------- diff --git a/src/test/regress/expected/failure_single_mod.out b/src/test/regress/expected/failure_single_mod.out index 54db33ff6..2a6ed2d77 100644 --- a/src/test/regress/expected/failure_single_mod.out +++ b/src/test/regress/expected/failure_single_mod.out @@ -20,7 +20,7 @@ SELECT create_distributed_table('mod_test', 'key'); (1 row) -- verify behavior of single INSERT; should mark shard as failed -SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()'); mitmproxy --------------------------------------------------------------------- @@ -52,7 +52,7 @@ SELECT citus.mitmproxy('conn.allow()'); (1 row) INSERT INTO mod_test VALUES (2, 6); -SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()'); mitmproxy --------------------------------------------------------------------- @@ -78,7 +78,7 @@ WHERE shardid IN ( TRUNCATE mod_test; -- verify behavior of multi-statement modifications to a single shard -- should fail the transaction and never mark placements inactive -SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()'); mitmproxy --------------------------------------------------------------------- diff --git a/src/test/regress/expected/failure_single_select.out b/src/test/regress/expected/failure_single_select.out index 5d17cc4ad..1b60f3125 100644 --- a/src/test/regress/expected/failure_single_select.out +++ b/src/test/regress/expected/failure_single_select.out @@ -23,7 +23,7 @@ SELECT create_distributed_table('select_test', 'key'); -- put data in shard for which mitm node is first placement INSERT INTO select_test VALUES (3, 'test data'); -SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").kill()'); mitmproxy --------------------------------------------------------------------- @@ -45,7 +45,7 @@ WARNING: connection to the remote node localhost:xxxxx failed with the followin -- kill after first SELECT; txn should fail as INSERT triggers -- 2PC (and placement is not marked bad) -SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").kill()');
+SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").kill()'); mitmproxy --------------------------------------------------------------------- @@ -66,7 +66,7 @@ TRUNCATE select_test; -- now the same tests with query cancellation -- put data in shard for which mitm node is first placement INSERT INTO select_test VALUES (3, 'test data'); -SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").cancel(' || pg_backend_pid() || ')'); +SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").cancel(' || pg_backend_pid() || ')'); mitmproxy --------------------------------------------------------------------- @@ -77,7 +77,7 @@ ERROR: canceling statement due to user request SELECT * FROM select_test WHERE key = 3; ERROR: canceling statement due to user request -- cancel after first SELECT; txn should fail and nothing should be marked as invalid -SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").cancel(' || pg_backend_pid() || ')'); +SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").cancel(' || pg_backend_pid() || ')'); mitmproxy --------------------------------------------------------------------- @@ -107,7 +107,7 @@ SELECT citus.mitmproxy('conn.allow()'); TRUNCATE select_test; -- cancel the second query -- error after second SELECT; txn should fail -SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").after(1).cancel(' || pg_backend_pid() || ')'); +SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").after(1).cancel(' || pg_backend_pid() || ')'); mitmproxy --------------------------------------------------------------------- @@ -126,7 +126,7 @@ SELECT * FROM select_test WHERE key = 3; ERROR: canceling statement due to user request COMMIT; -- error after second SELECT; txn should fail -SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").after(1).reset()'); +SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").after(1).reset()'); mitmproxy --------------------------------------------------------------------- @@ -144,7 +144,7 @@ INSERT INTO select_test VALUES (3, 'even more data'); SELECT * FROM select_test WHERE key = 3; ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open COMMIT; -SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*pg_prepared_xacts").after(2).kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*pg_prepared_xacts").after(2).kill()'); mitmproxy --------------------------------------------------------------------- @@ -173,7 +173,7 @@ SELECT create_distributed_table('select_test', 'key'); SET citus.max_cached_conns_per_worker TO 1; -- allow connection to be cached INSERT INTO select_test VALUES (1, 'test data'); -SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").after(1).kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").after(1).kill()'); mitmproxy --------------------------------------------------------------------- @@ -188,7 +188,7 @@ SELECT * FROM select_test WHERE key = 1; SELECT * FROM select_test WHERE key = 1; ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open -- now the same test with query cancellation -SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").after(1).cancel(' || pg_backend_pid() || ')'); +SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").after(1).cancel(' || pg_backend_pid() || ')'); mitmproxy
--------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index d0ed4f82a..85e77160a 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1366,9 +1366,15 @@ SELECT * FROM multi_extension.print_extension_changes(); | function citus_internal_mark_node_not_synced(integer,integer) void | function citus_internal_start_replication_origin_tracking() void | function citus_internal_stop_replication_origin_tracking() void + | function citus_stat_tenants(boolean) SETOF record + | function citus_stat_tenants_local(boolean) SETOF record + | function citus_stat_tenants_local_reset() void + | function citus_stat_tenants_reset() void | function worker_adjust_identity_column_seq_ranges(regclass) void | function worker_drop_all_shell_tables(boolean) -(6 rows) + | view citus_stat_tenants + | view citus_stat_tenants_local +(12 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/multi_metadata_access.out b/src/test/regress/expected/multi_metadata_access.out index 0503f2fc7..ec840dd72 100644 --- a/src/test/regress/expected/multi_metadata_access.out +++ b/src/test/regress/expected/multi_metadata_access.out @@ -21,7 +21,9 @@ ORDER BY 1; --------------------------------------------------------------------- pg_dist_authinfo pg_dist_clock_logical_seq -(2 rows) + citus_stat_tenants_local + citus_stat_tenants +(4 rows) RESET role; DROP USER no_access; diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index a234c4bac..29db2faea 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -125,6 +125,10 @@ ORDER BY 1; function citus_stat_activity() function citus_stat_statements() function citus_stat_statements_reset() + function citus_stat_tenants(boolean) + function citus_stat_tenants_local(boolean) + function citus_stat_tenants_local_reset() + function citus_stat_tenants_reset() function citus_table_is_visible(oid) function citus_table_size(regclass) function citus_task_wait(bigint,citus_task_status) @@ -322,7 +326,9 @@ ORDER BY 1; view citus_shards_on_worker view citus_stat_activity view citus_stat_statements + view citus_stat_tenants + view citus_stat_tenants_local view pg_dist_shard_placement view time_partitions -(316 rows) +(322 rows) diff --git a/src/test/regress/multi_1_schedule b/src/test/regress/multi_1_schedule index ee81bde38..cefb1777f 100644 --- a/src/test/regress/multi_1_schedule +++ b/src/test/regress/multi_1_schedule @@ -102,6 +102,11 @@ test: pg13_propagate_statistics # ---------- test: citus_update_table_statistics +# ---------- +# Test for tenant statistics +# ---------- +test: citus_stat_tenants + # ---------- # Parallel TPC-H tests to check our distributed execution behavior # ---------- diff --git a/src/test/regress/pg_regress_multi.pl b/src/test/regress/pg_regress_multi.pl index 544cd6ba1..7b02ca5cc 100755 --- a/src/test/regress/pg_regress_multi.pl +++ b/src/test/regress/pg_regress_multi.pl @@ -489,6 +489,8 @@ push(@pgOptions, "citus.enable_manual_changes_to_shards=on"); push(@pgOptions, "citus.allow_unsafe_locks_from_workers=on"); push(@pgOptions, "citus.stat_statements_track = 'all'"); push(@pgOptions, "citus.enable_change_data_capture=on"); +push(@pgOptions, 
"citus.stat_tenants_limit = 10"); +push(@pgOptions, "citus.stat_tenants_track = 'ALL'"); # Some tests look at shards in pg_class, make sure we can usually see them: push(@pgOptions, "citus.show_shards_for_app_name_prefixes='pg_regress'"); diff --git a/src/test/regress/sql/citus_stat_tenants.sql b/src/test/regress/sql/citus_stat_tenants.sql new file mode 100644 index 000000000..61de57511 --- /dev/null +++ b/src/test/regress/sql/citus_stat_tenants.sql @@ -0,0 +1,235 @@ +CREATE SCHEMA citus_stat_tenants; +SET search_path TO citus_stat_tenants; +SET citus.next_shard_id TO 5797500; +SET citus.shard_replication_factor TO 1; + +-- make sure that we are tracking the tenant stats +SELECT result FROM run_command_on_all_nodes('SHOW citus.stat_tenants_track'); + +CREATE OR REPLACE FUNCTION pg_catalog.sleep_until_next_period() +RETURNS VOID +LANGUAGE C +AS 'citus', $$sleep_until_next_period$$; + +SELECT citus_stat_tenants_reset(); + +-- set period to upper limit to prevent stats from being reset +SELECT result FROM run_command_on_all_nodes('ALTER SYSTEM SET citus.stat_tenants_period TO 86400'); +SELECT result FROM run_command_on_all_nodes('SELECT pg_reload_conf()'); + +CREATE TABLE dist_tbl (a INT, b TEXT); +SELECT create_distributed_table('dist_tbl', 'a', shard_count:=4, colocate_with:='none'); + +CREATE TABLE dist_tbl_2 (a INT, b INT); +SELECT create_distributed_table('dist_tbl_2', 'a', colocate_with:='dist_tbl'); + +CREATE TABLE dist_tbl_text (a TEXT, b INT); +SELECT create_distributed_table('dist_tbl_text', 'a', shard_count:=4, colocate_with:='none'); + +CREATE TABLE ref_tbl (a INT, b INT); +SELECT create_reference_table('ref_tbl'); + +INSERT INTO dist_tbl VALUES (1, 'abcd'); +INSERT INTO dist_tbl VALUES (2, 'abcd'); +UPDATE dist_tbl SET b = a + 1 WHERE a = 3; +UPDATE dist_tbl SET b = a + 1 WHERE a = 4; +DELETE FROM dist_tbl WHERE a = 5; + +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants(true) ORDER BY tenant_attribute; + +SELECT citus_stat_tenants_reset(); + +-- queries with multiple tenants should not be counted +SELECT count(*)>=0 FROM dist_tbl WHERE a IN (1, 5); + +-- queries with reference tables should not be counted +SELECT count(*)>=0 FROM ref_tbl WHERE a = 1; + +SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants(true) ORDER BY tenant_attribute; + +-- queries with multiple tables but one tenant should be counted +SELECT count(*)>=0 FROM dist_tbl, dist_tbl_2 WHERE dist_tbl.a = 1 AND dist_tbl_2.a = 1; +SELECT count(*)>=0 FROM dist_tbl JOIN dist_tbl_2 ON dist_tbl.a = dist_tbl_2.a WHERE dist_tbl.a = 1; + +SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants(true) WHERE tenant_attribute = '1'; + +-- test scoring +-- all of these distribution column values are from second worker +SELECT nodeid AS worker_2_nodeid FROM pg_dist_node WHERE nodeport = :worker_2_port \gset + +SELECT count(*)>=0 FROM dist_tbl WHERE a = 2; +SELECT count(*)>=0 FROM dist_tbl WHERE a = 3; +SELECT count(*)>=0 FROM dist_tbl WHERE a = 4; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'abcd'; + +SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stat_tenants(true) WHERE nodeid = :worker_2_nodeid ORDER BY score DESC, tenant_attribute; + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'abcd'; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'abcd'; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde'; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 
'cdef'; + +SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stat_tenants(true) WHERE nodeid = :worker_2_nodeid ORDER BY score DESC, tenant_attribute; + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde'; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde'; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'defg'; + +SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stat_tenants(true) WHERE nodeid = :worker_2_nodeid ORDER BY score DESC, tenant_attribute; + +-- test period passing +SELECT citus_stat_tenants_reset(); + +SELECT count(*)>=0 FROM dist_tbl WHERE a = 1; +INSERT INTO dist_tbl VALUES (5, 'abcd'); + +\c - - - :worker_1_port +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants_local ORDER BY tenant_attribute; + +-- simulate passing the period +SET citus.stat_tenants_period TO 2; +SELECT sleep_until_next_period(); + +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants_local ORDER BY tenant_attribute; + +\c - - - :master_port +SET search_path TO citus_stat_tenants; + +-- test logs +SET client_min_messages TO LOG; +SELECT count(*)>=0 FROM citus_stat_tenants; +SET citus.stat_tenants_log_level TO ERROR; +SELECT count(*)>=0 FROM citus_stat_tenants; +SET citus.stat_tenants_log_level TO OFF; +SELECT count(*)>=0 FROM citus_stat_tenants; +SET citus.stat_tenants_log_level TO LOG; +SELECT count(*)>=0 FROM citus_stat_tenants; +SET citus.stat_tenants_log_level TO DEBUG; +SELECT count(*)>=0 FROM citus_stat_tenants; +RESET client_min_messages; + +SELECT citus_stat_tenants_reset(); + +-- test turning monitoring on/off +SET citus.stat_tenants_track TO "NONE"; +SELECT count(*)>=0 FROM dist_tbl WHERE a = 1; +INSERT INTO dist_tbl VALUES (1, 1); + +SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants; + +SET citus.stat_tenants_track TO "ALL"; + +SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants; + +SELECT count(*)>=0 FROM dist_tbl WHERE a = 1; +INSERT INTO dist_tbl VALUES (1, 1); + +SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants; + +-- test special and multibyte characters in tenant attribute +SELECT citus_stat_tenants_reset(); +TRUNCATE TABLE dist_tbl_text; + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/bcde'; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/*bcde'; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/b*cde'; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/b*c/de'; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'b/*//cde'; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/b/*/cde'; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/b/**/cde'; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde*'; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde*/'; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = U&'\0061\0308bc'; + +\c - - - :worker_1_port +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants ORDER BY tenant_attribute; +\c - - - :worker_2_port +SET search_path TO citus_stat_tenants; + +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants ORDER BY tenant_attribute; + +SELECT citus_stat_tenants_reset(); + +-- test local queries 
+-- all of these distribution column values are from second worker + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/b*c/de'; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/bcde'; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = U&'\0061\0308bc'; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde*'; + +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants_local ORDER BY tenant_attribute; + +-- test local cached queries & prepared statements + +PREPARE dist_tbl_text_select_plan (text) AS SELECT count(*)>=0 FROM dist_tbl_text WHERE a = $1; + +EXECUTE dist_tbl_text_select_plan('/b*c/de'); +EXECUTE dist_tbl_text_select_plan('/bcde'); +EXECUTE dist_tbl_text_select_plan(U&'\0061\0308bc'); +EXECUTE dist_tbl_text_select_plan('bcde*'); +EXECUTE dist_tbl_text_select_plan('/b*c/de'); +EXECUTE dist_tbl_text_select_plan('/bcde'); +EXECUTE dist_tbl_text_select_plan(U&'\0061\0308bc'); +EXECUTE dist_tbl_text_select_plan('bcde*'); +EXECUTE dist_tbl_text_select_plan('/b*c/de'); +EXECUTE dist_tbl_text_select_plan('/bcde'); +EXECUTE dist_tbl_text_select_plan(U&'\0061\0308bc'); +EXECUTE dist_tbl_text_select_plan('bcde*'); + +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants_local ORDER BY tenant_attribute; + +\c - - - :master_port +SET search_path TO citus_stat_tenants; + +PREPARE dist_tbl_text_select_plan (text) AS SELECT count(*)>=0 FROM dist_tbl_text WHERE a = $1; + +EXECUTE dist_tbl_text_select_plan('/b*c/de'); +EXECUTE dist_tbl_text_select_plan('/bcde'); +EXECUTE dist_tbl_text_select_plan(U&'\0061\0308bc'); +EXECUTE dist_tbl_text_select_plan('bcde*'); +EXECUTE dist_tbl_text_select_plan('/b*c/de'); +EXECUTE dist_tbl_text_select_plan('/bcde'); +EXECUTE dist_tbl_text_select_plan(U&'\0061\0308bc'); +EXECUTE dist_tbl_text_select_plan('bcde*'); +EXECUTE dist_tbl_text_select_plan('/b*c/de'); +EXECUTE dist_tbl_text_select_plan('/bcde'); +EXECUTE dist_tbl_text_select_plan(U&'\0061\0308bc'); +EXECUTE dist_tbl_text_select_plan('bcde*'); + +\c - - - :worker_2_port +SET search_path TO citus_stat_tenants; + +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants ORDER BY tenant_attribute; + +\c - - - :master_port +SET search_path TO citus_stat_tenants; + +SELECT citus_stat_tenants_reset(); +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'thisisaveryloooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooongname'; +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants ORDER BY tenant_attribute; + +-- test role permissions +CREATE ROLE stats_non_superuser WITH LOGIN; +SET ROLE stats_non_superuser; + +SELECT count(*)>=0 FROM citus_stat_tenants; +SELECT count(*)>=0 FROM citus_stat_tenants_local; +SELECT count(*)>=0 FROM citus_stat_tenants(); +SELECT count(*)>=0 FROM citus_stat_tenants_local(); + +RESET ROLE; +GRANT pg_monitor TO stats_non_superuser; + +SET ROLE stats_non_superuser; + +SELECT count(*)>=0 FROM citus_stat_tenants; +SELECT count(*)>=0 FROM citus_stat_tenants_local; +SELECT count(*)>=0 FROM citus_stat_tenants(); +SELECT count(*)>=0 FROM citus_stat_tenants_local(); + +RESET ROLE; +DROP ROLE 
stats_non_superuser; + +SET client_min_messages TO ERROR; +DROP SCHEMA citus_stat_tenants CASCADE; diff --git a/src/test/regress/sql/failure_multi_dml.sql b/src/test/regress/sql/failure_multi_dml.sql index 390c01461..f62ede4d5 100644 --- a/src/test/regress/sql/failure_multi_dml.sql +++ b/src/test/regress/sql/failure_multi_dml.sql @@ -21,7 +21,7 @@ SELECT citus.clear_network_traffic(); ---- at each significant point. These transactions are 2pc -- fail at DELETE -SELECT citus.mitmproxy('conn.onQuery(query="^DELETE").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="DELETE").kill()'); BEGIN; DELETE FROM dml_test WHERE id = 1; @@ -35,7 +35,7 @@ COMMIT; SELECT * FROM dml_test ORDER BY id ASC; -- cancel at DELETE -SELECT citus.mitmproxy('conn.onQuery(query="^DELETE").cancel(' || pg_backend_pid() || ')'); +SELECT citus.mitmproxy('conn.onQuery(query="DELETE").cancel(' || pg_backend_pid() || ')'); BEGIN; DELETE FROM dml_test WHERE id = 1; @@ -49,7 +49,7 @@ COMMIT; SELECT * FROM dml_test ORDER BY id ASC; -- fail at INSERT -SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()'); BEGIN; DELETE FROM dml_test WHERE id = 1; @@ -63,7 +63,7 @@ COMMIT; SELECT * FROM dml_test ORDER BY id ASC; -- cancel at INSERT -SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").cancel(' || pg_backend_pid() || ')'); +SELECT citus.mitmproxy('conn.onQuery(query="INSERT").cancel(' || pg_backend_pid() || ')'); BEGIN; DELETE FROM dml_test WHERE id = 1; @@ -77,7 +77,7 @@ COMMIT; SELECT * FROM dml_test ORDER BY id ASC; -- fail at UPDATE -SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()'); BEGIN; DELETE FROM dml_test WHERE id = 1; @@ -91,7 +91,7 @@ COMMIT; SELECT * FROM dml_test ORDER BY id ASC; -- cancel at UPDATE -SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").cancel(' || pg_backend_pid() || ')'); +SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").cancel(' || pg_backend_pid() || ')'); BEGIN; DELETE FROM dml_test WHERE id = 1; diff --git a/src/test/regress/sql/failure_multi_row_insert.sql b/src/test/regress/sql/failure_multi_row_insert.sql index 53ab8a84d..cfc98f719 100644 --- a/src/test/regress/sql/failure_multi_row_insert.sql +++ b/src/test/regress/sql/failure_multi_row_insert.sql @@ -30,7 +30,7 @@ SELECT create_reference_table('reference_table'); -- Failure and cancellation on multi-row INSERT that hits the same shard with the same value -SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()'); INSERT INTO distributed_table VALUES (1,1), (1,2), (1,3); -- this test is broken, see https://github.com/citusdata/citus/issues/2460 diff --git a/src/test/regress/sql/failure_ref_tables.sql b/src/test/regress/sql/failure_ref_tables.sql index 0088a375e..29b90dc22 100644 --- a/src/test/regress/sql/failure_ref_tables.sql +++ b/src/test/regress/sql/failure_ref_tables.sql @@ -17,19 +17,19 @@ SELECT citus.clear_network_traffic(); SELECT COUNT(*) FROM ref_table; -- verify behavior of single INSERT; should fail to execute -SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()'); INSERT INTO ref_table VALUES (5, 6); SELECT COUNT(*) FROM ref_table WHERE key=5; -- verify behavior of UPDATE ... 
RETURNING; should not execute -SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()'); UPDATE ref_table SET key=7 RETURNING value; SELECT COUNT(*) FROM ref_table WHERE key=7; -- verify fix to #2214; should raise error and fail to execute -SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()'); BEGIN; DELETE FROM ref_table WHERE key=5; diff --git a/src/test/regress/sql/failure_replicated_partitions.sql b/src/test/regress/sql/failure_replicated_partitions.sql index 1ea79fc83..fbe6ec7a0 100644 --- a/src/test/regress/sql/failure_replicated_partitions.sql +++ b/src/test/regress/sql/failure_replicated_partitions.sql @@ -19,7 +19,7 @@ CREATE TABLE partitioned_table_0 INSERT INTO partitioned_table VALUES (0, 0); -SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()'); INSERT INTO partitioned_table VALUES (0, 0); diff --git a/src/test/regress/sql/failure_single_mod.sql b/src/test/regress/sql/failure_single_mod.sql index e4dfc8f9f..48fdddcc6 100644 --- a/src/test/regress/sql/failure_single_mod.sql +++ b/src/test/regress/sql/failure_single_mod.sql @@ -8,7 +8,7 @@ CREATE TABLE mod_test (key int, value text); SELECT create_distributed_table('mod_test', 'key'); -- verify behavior of single INSERT; should mark shard as failed -SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()'); INSERT INTO mod_test VALUES (2, 6); SELECT COUNT(*) FROM mod_test WHERE key=2; @@ -24,7 +24,7 @@ TRUNCATE mod_test; SELECT citus.mitmproxy('conn.allow()'); INSERT INTO mod_test VALUES (2, 6); -SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()'); UPDATE mod_test SET value='ok' WHERE key=2 RETURNING key; SELECT COUNT(*) FROM mod_test WHERE value='ok'; @@ -38,7 +38,7 @@ TRUNCATE mod_test; -- verify behavior of multi-statement modifications to a single shard -- should fail the transaction and never mark placements inactive -SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()'); BEGIN; INSERT INTO mod_test VALUES (2, 6); diff --git a/src/test/regress/sql/failure_single_select.sql b/src/test/regress/sql/failure_single_select.sql index 8dfb33d3e..c8218c950 100644 --- a/src/test/regress/sql/failure_single_select.sql +++ b/src/test/regress/sql/failure_single_select.sql @@ -13,13 +13,13 @@ SELECT create_distributed_table('select_test', 'key'); -- put data in shard for which mitm node is first placement INSERT INTO select_test VALUES (3, 'test data'); -SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").kill()'); SELECT * FROM select_test WHERE key = 3; SELECT * FROM select_test WHERE key = 3; -- kill after first SELECT; txn should fail as INSERT triggers -- 2PC (and placement is not marked bad) -SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").kill()'); BEGIN; INSERT INTO select_test VALUES (3, 'more data'); @@ -35,12 +35,12 @@ TRUNCATE select_test; -- put data in shard for which mitm node is first placement INSERT INTO select_test VALUES (3, 'test data'); -SELECT
citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").cancel(' || pg_backend_pid() || ')'); +SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").cancel(' || pg_backend_pid() || ')'); SELECT * FROM select_test WHERE key = 3; SELECT * FROM select_test WHERE key = 3; -- cancel after first SELECT; txn should fail and nothing should be marked as invalid -SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").cancel(' || pg_backend_pid() || ')'); +SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").cancel(' || pg_backend_pid() || ')'); BEGIN; INSERT INTO select_test VALUES (3, 'more data'); @@ -58,7 +58,7 @@ TRUNCATE select_test; -- cancel the second query -- error after second SELECT; txn should fail -SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").after(1).cancel(' || pg_backend_pid() || ')'); +SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").after(1).cancel(' || pg_backend_pid() || ')'); BEGIN; INSERT INTO select_test VALUES (3, 'more data'); @@ -68,7 +68,7 @@ SELECT * FROM select_test WHERE key = 3; COMMIT; -- error after second SELECT; txn should fail -SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").after(1).reset()'); +SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").after(1).reset()'); BEGIN; INSERT INTO select_test VALUES (3, 'more data'); @@ -77,7 +77,7 @@ INSERT INTO select_test VALUES (3, 'even more data'); SELECT * FROM select_test WHERE key = 3; COMMIT; -SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*pg_prepared_xacts").after(2).kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*pg_prepared_xacts").after(2).kill()'); SELECT recover_prepared_transactions(); SELECT recover_prepared_transactions(); @@ -93,12 +93,12 @@ SELECT create_distributed_table('select_test', 'key'); SET citus.max_cached_conns_per_worker TO 1; -- allow connection to be cached INSERT INTO select_test VALUES (1, 'test data'); -SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").after(1).kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").after(1).kill()'); SELECT * FROM select_test WHERE key = 1; SELECT * FROM select_test WHERE key = 1; -- now the same test with query cancellation -SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").after(1).cancel(' || pg_backend_pid() || ')'); +SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").after(1).cancel(' || pg_backend_pid() || ')'); SELECT * FROM select_test WHERE key = 1; SELECT * FROM select_test WHERE key = 1;
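
The dropped "^" anchors in the mitmproxy onQuery patterns throughout the failure tests above follow directly from the tenant-statistics change in this diff: AnnotateQuery now prepends a tenant-attribution comment to every deparsed shard query, so the text a worker receives no longer begins with the bare command keyword. A minimal sketch of the on-the-wire effect; the shard name and the exact comment layout are illustrative assumptions, not quoted output of AnnotateQuery:

-- Before this change, the shard query started with its keyword, so an
-- anchored pattern such as "^INSERT" matched.
INSERT INTO citus_stat_tenants.dist_tbl_5797500 (a, b) VALUES (1, 'abcd');

-- After this change, a comment carrying the tenant attribute (the
-- partition key value) and the colocation ID comes first; the JSON-ish
-- format shown here is hypothetical.
/*{"tId":"1","cId":1234}*/ INSERT INTO citus_stat_tenants.dist_tbl_5797500 (a, b) VALUES (1, 'abcd');

The unanchored patterns ("INSERT", "UPDATE", "SELECT.*select_test") still match the annotated text, which is all these tests need. Embedding a text partition key value in that comment is also presumably why citus_stat_tenants.sql exercises tenant attributes such as '/*bcde' and 'bcde*/': comment delimiters inside the value must not break the annotation that the worker later parses to attribute the query.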