mirror of https://github.com/citusdata/citus.git

commit d6603390ab
parent 372a93b529

Add multi tenant statistics monitoring
@@ -66,7 +66,6 @@ static DistributedPlan * CopyDistributedPlanWithoutCache(
 			DistributedPlan *originalDistributedPlan);
 static void CitusEndScan(CustomScanState *node);
 static void CitusReScan(CustomScanState *node);
-static void SetJobColocationId(Job *job);
 static void EnsureForceDelegationDistributionKey(Job *job);
 static void EnsureAnchorShardsInJobExist(Job *job);
 static bool AnchorShardsInTaskListExist(List *taskList);
@@ -892,7 +891,7 @@ IsCitusCustomScan(Plan *plan)
  * colocation group, the Job's colocation ID is set to the group ID, else,
  * it will be set to INVALID_COLOCATION_ID.
  */
-static void
+void
 SetJobColocationId(Job *job)
 {
 	uint32 jobColocationId = INVALID_COLOCATION_ID;
@@ -26,6 +26,7 @@
 #include "distributed/multi_physical_planner.h"
 #include "distributed/multi_router_planner.h"
 #include "distributed/shard_utils.h"
+#include "distributed/utils/attribute.h"
 #include "distributed/version_compat.h"
 #include "lib/stringinfo.h"
 #include "nodes/makefuncs.h"
@@ -141,6 +142,21 @@ RebuildQueryStrings(Job *workerJob)
 							   ? "(null)"
 							   : TaskQueryString(task))));
 
+		Datum partitionColumnValue;
+		Oid partitionColumnType = 0;
+		char *partitionColumnString = NULL;
+		if (workerJob->partitionKeyValue != NULL)
+		{
+			partitionColumnValue = workerJob->partitionKeyValue->constvalue;
+			partitionColumnType = workerJob->partitionKeyValue->consttype;
+			partitionColumnString = DatumToString(partitionColumnValue,
+												  partitionColumnType);
+		}
+
+		task->partitionColumn = partitionColumnString;
+		SetJobColocationId(workerJob);
+		task->colocationId = workerJob->colocationId;
+
 		UpdateTaskQueryString(query, task);
 
 	/*
@@ -387,7 +403,8 @@ SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query)
 		return;
 	}
 
-	SetTaskQueryString(task, DeparseTaskQuery(task, query));
+	SetTaskQueryString(task, AnnotateQuery(DeparseTaskQuery(task, query),
+										   task->partitionColumn, task->colocationId));
 }
 
 
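
For illustration: with ATTRIBUTE_STRING_FORMAT as defined in attribute.c later in this diff ("/* attributeTo: %s,%d */"), the deparsed task query reaches the worker with the tenant annotation prepended. For a tenant value of 1 in colocation group 1234 (illustrative numbers; the shard name is made up), the worker would receive roughly:

    /* attributeTo: 1,1234 */ SELECT count(*) FROM public.dist_tbl_102008 WHERE a = 1
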
@@ -49,6 +49,7 @@
 #include "distributed/recursive_planning.h"
 #include "distributed/shardinterval_utils.h"
 #include "distributed/shard_utils.h"
+#include "distributed/utils/attribute.h"
 #include "distributed/version_compat.h"
 #include "distributed/worker_shard_visibility.h"
 #include "executor/executor.h"
@@ -157,6 +158,8 @@ distributed_planner(Query *parse,
 	bool fastPathRouterQuery = false;
 	Node *distributionKeyValue = NULL;
 
+	AttributeQueryIfAnnotated(query_string, parse->commandType);
+
 	List *rangeTableList = ExtractRangeTableEntryList(parse);
 
 	if (cursorOptions & CURSOR_OPT_FORCE_DISTRIBUTED)
@@ -165,7 +165,8 @@ static int CompareInsertValuesByShardId(const void *leftElement,
 static List * SingleShardTaskList(Query *query, uint64 jobId,
 								  List *relationShardList, List *placementList,
 								  uint64 shardId, bool parametersInQueryResolved,
-								  bool isLocalTableModification);
+								  bool isLocalTableModification, char *partitionColumn,
+								  int colocationId);
 static bool RowLocksOnRelations(Node *node, List **rtiLockList);
 static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job,
 														TaskAssignmentPolicyType
@@ -1951,11 +1952,25 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
 
 	if (originalQuery->commandType == CMD_SELECT)
 	{
+		Datum partitionColumnValue;
+		Oid partitionColumnType = 0;
+		char *partitionColumnString = NULL;
+		if (job->partitionKeyValue != NULL)
+		{
+			partitionColumnValue = job->partitionKeyValue->constvalue;
+			partitionColumnType = job->partitionKeyValue->consttype;
+			partitionColumnString = DatumToString(partitionColumnValue,
+												  partitionColumnType);
+		}
+
+		SetJobColocationId(job);
+
 		job->taskList = SingleShardTaskList(originalQuery, job->jobId,
 											relationShardList, placementList,
 											shardId,
 											job->parametersInJobQueryResolved,
-											isLocalTableModification);
+											isLocalTableModification,
+											partitionColumnString, job->colocationId);
 
 		/*
 		 * Queries to reference tables, or distributed tables with multiple replica's have
@@ -1979,11 +1994,25 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
 	}
 	else
 	{
+		Datum partitionColumnValue;
+		Oid partitionColumnType = 0;
+		char *partitionColumnString = NULL;
+		if (job->partitionKeyValue != NULL)
+		{
+			partitionColumnValue = job->partitionKeyValue->constvalue;
+			partitionColumnType = job->partitionKeyValue->consttype;
+			partitionColumnString = DatumToString(partitionColumnValue,
+												  partitionColumnType);
+		}
+
+		SetJobColocationId(job);
+
 		job->taskList = SingleShardTaskList(originalQuery, job->jobId,
 											relationShardList, placementList,
 											shardId,
 											job->parametersInJobQueryResolved,
-											isLocalTableModification);
+											isLocalTableModification,
+											partitionColumnString, job->colocationId);
 	}
 }
 
@@ -2077,7 +2106,8 @@ static List *
 SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList,
 					List *placementList, uint64 shardId,
 					bool parametersInQueryResolved,
-					bool isLocalTableModification)
+					bool isLocalTableModification, char *partitionColumn,
+					int colocationId)
 {
 	TaskType taskType = READ_TASK;
 	char replicationModel = 0;
|
@ -2147,6 +2177,8 @@ SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList,
|
||||||
* that the query cannot be executed locally.
|
* that the query cannot be executed locally.
|
||||||
*/
|
*/
|
||||||
task->taskPlacementList = placementList;
|
task->taskPlacementList = placementList;
|
||||||
|
task->partitionColumn = partitionColumn;
|
||||||
|
task->colocationId = colocationId;
|
||||||
SetTaskQueryIfShouldLazyDeparse(task, query);
|
SetTaskQueryIfShouldLazyDeparse(task, query);
|
||||||
task->anchorShardId = shardId;
|
task->anchorShardId = shardId;
|
||||||
task->jobId = jobId;
|
task->jobId = jobId;
|
||||||
|
|
|
@@ -90,6 +90,7 @@
 #include "distributed/resource_lock.h"
 #include "distributed/transaction_management.h"
 #include "distributed/transaction_recovery.h"
+#include "distributed/utils/attribute.h"
 #include "distributed/utils/directory.h"
 #include "distributed/worker_log_messages.h"
 #include "distributed/worker_manager.h"
|
@ -439,6 +440,8 @@ _PG_init(void)
|
||||||
ExecutorStart_hook = CitusExecutorStart;
|
ExecutorStart_hook = CitusExecutorStart;
|
||||||
ExecutorRun_hook = CitusExecutorRun;
|
ExecutorRun_hook = CitusExecutorRun;
|
||||||
ExplainOneQuery_hook = CitusExplainOneQuery;
|
ExplainOneQuery_hook = CitusExplainOneQuery;
|
||||||
|
prev_ExecutorEnd = ExecutorEnd_hook;
|
||||||
|
ExecutorEnd_hook = CitusAttributeToEnd;
|
||||||
|
|
||||||
/* register hook for error messages */
|
/* register hook for error messages */
|
||||||
emit_log_hook = multi_log_hook;
|
emit_log_hook = multi_log_hook;
|
||||||
|
@ -472,6 +475,8 @@ _PG_init(void)
|
||||||
/* initialize shard split shared memory handle management */
|
/* initialize shard split shared memory handle management */
|
||||||
InitializeShardSplitSMHandleManagement();
|
InitializeShardSplitSMHandleManagement();
|
||||||
|
|
||||||
|
InitializeMultiTenantMonitorSMHandleManagement();
|
||||||
|
|
||||||
/* enable modification of pg_catalog tables during pg_upgrade */
|
/* enable modification of pg_catalog tables during pg_upgrade */
|
||||||
if (IsBinaryUpgrade)
|
if (IsBinaryUpgrade)
|
||||||
{
|
{
|
||||||
|
@@ -1899,6 +1904,16 @@ RegisterCitusConfigVariables(void)
 		GUC_STANDARD,
 		NULL, NULL, NULL);
 
+	DefineCustomEnumVariable(
+		"citus.multi_tenant_monitoring_log_level",
+		gettext_noop("Sets the level of multi tenant monitoring log messages"),
+		NULL,
+		&MultiTenantMonitoringLogLevel,
+		CITUS_LOG_LEVEL_OFF, log_level_options,
+		PGC_USERSET,
+		GUC_STANDARD,
+		NULL, NULL, NULL);
+
 	DefineCustomIntVariable(
 		"citus.next_cleanup_record_id",
 		gettext_noop("Set the next cleanup record ID to use in operation creation."),
|
@ -2283,6 +2298,26 @@ RegisterCitusConfigVariables(void)
|
||||||
GUC_STANDARD,
|
GUC_STANDARD,
|
||||||
NULL, NULL, NULL);
|
NULL, NULL, NULL);
|
||||||
|
|
||||||
|
DefineCustomIntVariable(
|
||||||
|
"citus.stats_tenants_limit",
|
||||||
|
gettext_noop("monitor limit"),
|
||||||
|
NULL,
|
||||||
|
&CitusStatsTenantsLimit,
|
||||||
|
10, 1, 100,
|
||||||
|
PGC_POSTMASTER,
|
||||||
|
GUC_STANDARD,
|
||||||
|
NULL, NULL, NULL);
|
||||||
|
|
||||||
|
DefineCustomIntVariable(
|
||||||
|
"citus.stats_tenants_period",
|
||||||
|
gettext_noop("monitor period"),
|
||||||
|
NULL,
|
||||||
|
&CitusStatsTenantsPeriod,
|
||||||
|
60, 1, 1000000000,
|
||||||
|
PGC_USERSET,
|
||||||
|
GUC_STANDARD,
|
||||||
|
NULL, NULL, NULL);
|
||||||
|
|
||||||
DefineCustomBoolVariable(
|
DefineCustomBoolVariable(
|
||||||
"citus.subquery_pushdown",
|
"citus.subquery_pushdown",
|
||||||
gettext_noop("Usage of this GUC is highly discouraged, please read the long "
|
gettext_noop("Usage of this GUC is highly discouraged, please read the long "
|
||||||
|
|
|
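
For illustration, the two new GUCs can be tuned like any other Citus setting; the values below are arbitrary examples. citus.stats_tenants_limit is PGC_POSTMASTER, so it takes effect only after a server restart, while citus.stats_tenants_period is PGC_USERSET, so a reload suffices:

    ALTER SYSTEM SET citus.stats_tenants_limit TO 20;   -- takes effect after restart
    ALTER SYSTEM SET citus.stats_tenants_period TO 120; -- period length in seconds
    SELECT pg_reload_conf();
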
@@ -2,3 +2,4 @@
 
 -- bump version to 11.3-1
 
+#include "udfs/citus_stats_tenants/11.3-1.sql"
@@ -1,2 +1,4 @@
 -- citus--11.3-1--11.2-1
--- this is an empty downgrade path since citus--11.2-1--11.3-1.sql is empty for now
+
+DROP VIEW pg_catalog.citus_stats_tenants;
+DROP FUNCTION pg_catalog.citus_stats_tenants(boolean);
@@ -0,0 +1,27 @@
+CREATE OR REPLACE FUNCTION pg_catalog.citus_stats_tenants(
+    return_all_tenants BOOLEAN DEFAULT FALSE,
+    OUT colocation_id INT,
+    OUT tenant_attribute TEXT,
+    OUT read_count_in_this_period INT,
+    OUT read_count_in_last_period INT,
+    OUT query_count_in_this_period INT,
+    OUT query_count_in_last_period INT,
+    OUT score BIGINT)
+RETURNS SETOF RECORD
+LANGUAGE C
+AS 'citus', $$citus_stats_tenants$$;
+
+
+CREATE OR REPLACE VIEW citus.citus_stats_tenants AS
+SELECT
+    colocation_id,
+    tenant_attribute,
+    read_count_in_this_period,
+    read_count_in_last_period,
+    query_count_in_this_period,
+    query_count_in_last_period
+FROM pg_catalog.citus_stats_tenants()
+ORDER BY score DESC;
+
+ALTER VIEW citus.citus_stats_tenants SET SCHEMA pg_catalog;
+GRANT SELECT ON pg_catalog.citus_stats_tenants TO PUBLIC;
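
A minimal usage sketch for the objects above: the view returns the top citus.stats_tenants_limit tenants ordered by score, while calling the underlying function with return_all_tenants => true returns every tenant the monitor still tracks:

    SELECT tenant_attribute, query_count_in_this_period
    FROM citus_stats_tenants
    ORDER BY query_count_in_this_period DESC;

    SELECT * FROM citus_stats_tenants(true);

The identical block that follows is the second copy of this file; Citus keeps each UDF both as a versioned snapshot (11.3-1.sql) and as a latest snapshot.
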
@@ -0,0 +1,27 @@
+CREATE OR REPLACE FUNCTION pg_catalog.citus_stats_tenants(
+    return_all_tenants BOOLEAN DEFAULT FALSE,
+    OUT colocation_id INT,
+    OUT tenant_attribute TEXT,
+    OUT read_count_in_this_period INT,
+    OUT read_count_in_last_period INT,
+    OUT query_count_in_this_period INT,
+    OUT query_count_in_last_period INT,
+    OUT score BIGINT)
+RETURNS SETOF RECORD
+LANGUAGE C
+AS 'citus', $$citus_stats_tenants$$;
+
+
+CREATE OR REPLACE VIEW citus.citus_stats_tenants AS
+SELECT
+    colocation_id,
+    tenant_attribute,
+    read_count_in_this_period,
+    read_count_in_last_period,
+    query_count_in_this_period,
+    query_count_in_last_period
+FROM pg_catalog.citus_stats_tenants()
+ORDER BY score DESC;
+
+ALTER VIEW citus.citus_stats_tenants SET SCHEMA pg_catalog;
+GRANT SELECT ON pg_catalog.citus_stats_tenants TO PUBLIC;
@@ -0,0 +1,688 @@
+/*-------------------------------------------------------------------------
+ *
+ * attribute.c
+ *	  Routines related to the multi tenant monitor.
+ *
+ * Copyright (c) Citus Data, Inc.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include "unistd.h"
+
+#include "distributed/citus_safe_lib.h"
+#include "distributed/log_utils.h"
+#include "distributed/listutils.h"
+#include "distributed/metadata_cache.h"
+#include "distributed/tuplestore.h"
+#include "executor/execdesc.h"
+#include "storage/ipc.h"
+#include "storage/lwlock.h"
+#include "storage/shmem.h"
+#include <sys/time.h>
+#include "utils/builtins.h"
+
+#include "distributed/utils/attribute.h"
+
+#include <time.h>
+
+static void AttributeMetricsIfApplicable(void);
+
+ExecutorEnd_hook_type prev_ExecutorEnd = NULL;
+
+#define ATTRIBUTE_PREFIX "/* attributeTo: "
+#define ATTRIBUTE_STRING_FORMAT "/* attributeTo: %s,%d */"
+#define CITUS_STATS_TENANTS_COLUMNS 7
+#define ONE_QUERY_SCORE 1000000000
+
+/* TODO maybe needs to be a stack */
+char attributeToTenant[MAX_TENANT_ATTRIBUTE_LENGTH] = "";
+CmdType attributeCommandType = CMD_UNKNOWN;
+int colocationGroupId = -1;
+clock_t attributeToTenantStart = { 0 };
+
+const char *SharedMemoryNameForMultiTenantMonitor =
+	"Shared memory for multi tenant monitor";
+
+char *tenantTrancheName = "Tenant Tranche";
+char *monitorTrancheName = "Multi Tenant Monitor Tranche";
+
+static shmem_startup_hook_type prev_shmem_startup_hook = NULL;
+
+static int CompareTenantScore(const void *leftElement, const void *rightElement);
+static void UpdatePeriodsIfNecessary(TenantStats *tenantStats, time_t queryTime);
+static void ReduceScoreIfNecessary(TenantStats *tenantStats, time_t queryTime);
+static void EvictTenantsIfNecessary(time_t queryTime);
+static void RecordTenantStats(TenantStats *tenantStats);
+static void CreateMultiTenantMonitor(void);
+static MultiTenantMonitor * CreateSharedMemoryForMultiTenantMonitor(void);
+static MultiTenantMonitor * GetMultiTenantMonitor(void);
+static void MultiTenantMonitorSMInit(void);
+static int CreateTenantStats(MultiTenantMonitor *monitor, time_t queryTime);
+static int FindTenantStats(MultiTenantMonitor *monitor);
+static size_t MultiTenantMonitorshmemSize(void);
+
+int MultiTenantMonitoringLogLevel = CITUS_LOG_LEVEL_OFF;
+int CitusStatsTenantsPeriod = (time_t) 60;
+int CitusStatsTenantsLimit = 10;
+
+
+PG_FUNCTION_INFO_V1(citus_stats_tenants);
+PG_FUNCTION_INFO_V1(clean_citus_stats_tenants);
+PG_FUNCTION_INFO_V1(sleep_until_next_period);
+
+
+/*
+ * citus_stats_tenants finds, updates and returns the statistics for tenants.
+ */
+Datum
+citus_stats_tenants(PG_FUNCTION_ARGS)
+{
+	CheckCitusVersion(ERROR);
+
+	/*
+	 * We keep more than CitusStatsTenantsLimit tenants in our monitor.
+	 * We do this to not lose data if a tenant falls out of the top CitusStatsTenantsLimit in case they need to return soon.
+	 * Normally we return CitusStatsTenantsLimit tenants but if returnAllTenants is true we return all of them.
+	 */
+	bool returnAllTenants = PG_GETARG_BOOL(0);
+
+	TupleDesc tupleDescriptor = NULL;
+	Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);
+	time_t monitoringTime = time(0);
+
+	Datum values[CITUS_STATS_TENANTS_COLUMNS];
+	bool isNulls[CITUS_STATS_TENANTS_COLUMNS];
+
+	MultiTenantMonitor *monitor = GetMultiTenantMonitor();
+
+	if (monitor == NULL)
+	{
+		PG_RETURN_VOID();
+	}
+
+	LWLockAcquire(&monitor->lock, LW_EXCLUSIVE);
+
+	int numberOfRowsToReturn = 0;
+	if (returnAllTenants)
+	{
+		numberOfRowsToReturn = monitor->tenantCount;
+	}
+	else
+	{
+		numberOfRowsToReturn = Min(monitor->tenantCount, CitusStatsTenantsLimit);
+	}
+
+	for (int tenantIndex = 0; tenantIndex < monitor->tenantCount; tenantIndex++)
+	{
+		UpdatePeriodsIfNecessary(&monitor->tenants[tenantIndex], monitoringTime);
+		ReduceScoreIfNecessary(&monitor->tenants[tenantIndex], monitoringTime);
+	}
+	SafeQsort(monitor->tenants, monitor->tenantCount, sizeof(TenantStats),
+			  CompareTenantScore);
+
+	for (int i = 0; i < numberOfRowsToReturn; i++)
+	{
+		memset(values, 0, sizeof(values));
+		memset(isNulls, false, sizeof(isNulls));
+
+		TenantStats *tenantStats = &monitor->tenants[i];
+
+		values[0] = Int32GetDatum(tenantStats->colocationGroupId);
+		values[1] = PointerGetDatum(cstring_to_text(tenantStats->tenantAttribute));
+		values[2] = Int32GetDatum(tenantStats->readsInThisPeriod);
+		values[3] = Int32GetDatum(tenantStats->readsInLastPeriod);
+		values[4] = Int32GetDatum(tenantStats->readsInThisPeriod +
+								  tenantStats->writesInThisPeriod);
+		values[5] = Int32GetDatum(tenantStats->readsInLastPeriod +
+								  tenantStats->writesInLastPeriod);
+		values[6] = Int64GetDatum(tenantStats->score);
+
+		tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
+	}
+
+	LWLockRelease(&monitor->lock);
+
+	PG_RETURN_VOID();
+}
+
+
+/*
+ * clean_citus_stats_tenants cleans the citus_stats_tenants monitor.
+ */
+Datum
+clean_citus_stats_tenants(PG_FUNCTION_ARGS)
+{
+	MultiTenantMonitor *monitor = GetMultiTenantMonitor();
+	monitor->tenantCount = 0;
+
+	PG_RETURN_VOID();
+}
+
+
+/*
+ * sleep_until_next_period sleeps until the next monitoring period starts.
+ */
+Datum
+sleep_until_next_period(PG_FUNCTION_ARGS)
+{
+	struct timeval currentTime;
+	gettimeofday(&currentTime, NULL);
+
+	long int nextPeriodStart = currentTime.tv_sec -
+							   (currentTime.tv_sec % CitusStatsTenantsPeriod) +
+							   CitusStatsTenantsPeriod;
+
+	long int sleepTime = (nextPeriodStart - currentTime.tv_sec) * 1000000 -
+						 currentTime.tv_usec + 100000;
+	pg_usleep(sleepTime);
+
+	PG_RETURN_VOID();
+}
+
+
+/*
+ * AttributeQueryIfAnnotated checks the query annotation and if the query is annotated
+ * for the tenant statistics monitoring this function records the tenant attributes.
+ */
+void
+AttributeQueryIfAnnotated(const char *query_string, CmdType commandType)
+{
+	strcpy_s(attributeToTenant, sizeof(attributeToTenant), "");
+
+	attributeCommandType = commandType;
+
+	if (query_string == NULL)
+	{
+		return;
+	}
+
+	if (strncmp(ATTRIBUTE_PREFIX, query_string, strlen(ATTRIBUTE_PREFIX)) == 0)
+	{
+		/* TODO create a function to safely parse the tenant identifier from the query comment */
+		/* query is attributed to a tenant */
+		char *tenantId = (char *) query_string + strlen(ATTRIBUTE_PREFIX);
+		char *tenantEnd = tenantId;
+		while (tenantEnd[0] != '\0')
+		{
+			if (tenantEnd[0] == ' ' && tenantEnd[1] == '*' && tenantEnd[2] == '/')
+			{
+				break;
+			}
+
+			tenantEnd++;
+		}
+		tenantEnd--;
+
+		/* parse the colocation group id, scanning the digits back to front */
+		colocationGroupId = 0;
+		while (*tenantEnd != ',')
+		{
+			colocationGroupId *= 10;
+			colocationGroupId += *tenantEnd - '0';
+			tenantEnd--;
+		}
+
+		/* the digits were accumulated in reverse order, so reverse them back */
+		int t = colocationGroupId;
+		colocationGroupId = 0;
+		while (t)
+		{
+			colocationGroupId *= 10;
+			colocationGroupId += t % 10;
+			t /= 10;
+		}
+
+		/* hack to get a clean copy of the tenant id string */
+		char tenantEndTmp = *tenantEnd;
+		*tenantEnd = '\0';
+		tenantId = pstrdup(tenantId);
+		*tenantEnd = tenantEndTmp;
+
+		if (MultiTenantMonitoringLogLevel != CITUS_LOG_LEVEL_OFF)
+		{
+			ereport(NOTICE, (errmsg("attributing query to tenant: %s",
+									quote_literal_cstr(tenantId))));
+		}
+
+		strcpy_s(attributeToTenant, sizeof(attributeToTenant), tenantId);
+		attributeToTenantStart = clock();
+	}
+	else
+	{
+		strcpy_s(attributeToTenant, sizeof(attributeToTenant), "");
+	}
+
+	/*DetachSegment(); */
+}
+
+
+/*
+ * AnnotateQuery annotates the query with tenant attributes.
+ */
+char *
+AnnotateQuery(char *queryString, char *partitionColumn, int colocationId)
+{
+	if (partitionColumn == NULL)
+	{
+		return queryString;
+	}
+	StringInfo newQuery = makeStringInfo();
+	appendStringInfo(newQuery, ATTRIBUTE_STRING_FORMAT, partitionColumn, colocationId);
+
+	appendStringInfoString(newQuery, queryString);
+
+	return newQuery->data;
+}
+
+
+/*
+ * CitusAttributeToEnd keeps the statistics for the tenant and calls the previously installed end hook
+ * or the standard executor end function.
+ */
+void
+CitusAttributeToEnd(QueryDesc *queryDesc)
+{
+	/*
+	 * At the end of the Executor is the last moment we have to attribute the previous
+	 * attribution to a tenant, if applicable
+	 */
+	AttributeMetricsIfApplicable();
+
+	/* now call in to the previously installed hook, or the standard implementation */
+	if (prev_ExecutorEnd)
+	{
+		prev_ExecutorEnd(queryDesc);
+	}
+	else
+	{
+		standard_ExecutorEnd(queryDesc);
+	}
+}
+
+
+/*
+ * CompareTenantScore is used to sort the tenant statistics by score
+ * in descending order.
+ */
+static int
+CompareTenantScore(const void *leftElement, const void *rightElement)
+{
+	const TenantStats *leftTenant = (const TenantStats *) leftElement;
+	const TenantStats *rightTenant = (const TenantStats *) rightElement;
+
+	if (leftTenant->score > rightTenant->score)
+	{
+		return -1;
+	}
+	else if (leftTenant->score < rightTenant->score)
+	{
+		return 1;
+	}
+	return 0;
+}
+
+
+/*
+ * AttributeMetricsIfApplicable updates the metrics for the current tenant's statistics.
+ */
+static void
+AttributeMetricsIfApplicable()
+{
+	if (strcmp(attributeToTenant, "") != 0)
+	{
+		clock_t end = { 0 };
+
+		end = clock();
+		time_t queryTime = time(0);
+		double cpu_time_used = ((double) (end - attributeToTenantStart)) / CLOCKS_PER_SEC;
+
+		if (MultiTenantMonitoringLogLevel != CITUS_LOG_LEVEL_OFF)
+		{
+			ereport(NOTICE, (errmsg("attribute cpu counter (%f) to tenant: %s",
+									cpu_time_used, attributeToTenant)));
+		}
+
+		MultiTenantMonitor *monitor = GetMultiTenantMonitor();
+
+		LWLockAcquire(&monitor->lock, LW_SHARED);
+
+		int currentTenantIndex = FindTenantStats(monitor);
+
+		if (currentTenantIndex != -1)
+		{
+			TenantStats *tenantStats = &monitor->tenants[currentTenantIndex];
+			LWLockAcquire(&tenantStats->lock, LW_EXCLUSIVE);
+
+			UpdatePeriodsIfNecessary(tenantStats, queryTime);
+			ReduceScoreIfNecessary(tenantStats, queryTime);
+			RecordTenantStats(tenantStats);
+
+			LWLockRelease(&tenantStats->lock);
+		}
+		else
+		{
+			/* the tenant is not tracked yet; retry under the exclusive lock */
+			LWLockRelease(&monitor->lock);
+
+			LWLockAcquire(&monitor->lock, LW_EXCLUSIVE);
+			currentTenantIndex = FindTenantStats(monitor);
+
+			if (currentTenantIndex == -1)
+			{
+				currentTenantIndex = CreateTenantStats(monitor, queryTime);
+			}
+
+			LWLockRelease(&monitor->lock);
+
+			LWLockAcquire(&monitor->lock, LW_SHARED);
+			currentTenantIndex = FindTenantStats(monitor);
+			if (currentTenantIndex != -1)
+			{
+				TenantStats *tenantStats = &monitor->tenants[currentTenantIndex];
+				LWLockAcquire(&tenantStats->lock, LW_EXCLUSIVE);
+
+				UpdatePeriodsIfNecessary(tenantStats, queryTime);
+				ReduceScoreIfNecessary(tenantStats, queryTime);
+				RecordTenantStats(tenantStats);
+
+				LWLockRelease(&tenantStats->lock);
+			}
+		}
+		LWLockRelease(&monitor->lock);
+	}
+
+	strcpy_s(attributeToTenant, sizeof(attributeToTenant), "");
+}
+
+
+/*
+ * UpdatePeriodsIfNecessary moves the query counts to previous periods if enough time has passed.
+ *
+ * If 1 period has passed after the latest query, this function moves this period's counts to the last period
+ * and cleans this period's statistics.
+ *
+ * If 2 or more periods have passed after the last query, this function cleans both this and last period's
+ * statistics.
+ */
+static void
+UpdatePeriodsIfNecessary(TenantStats *tenantStats, time_t queryTime)
+{
+	time_t periodStart = queryTime - (queryTime % CitusStatsTenantsPeriod);
+
+	/*
+	 * If the last query in this tenant was before the start of the current period
+	 * but there is some query count for this period, we move it to the last period.
+	 */
+	if (tenantStats->lastQueryTime < periodStart &&
+		(tenantStats->writesInThisPeriod || tenantStats->readsInThisPeriod))
+	{
+		tenantStats->writesInLastPeriod = tenantStats->writesInThisPeriod;
+		tenantStats->writesInThisPeriod = 0;
+
+		tenantStats->readsInLastPeriod = tenantStats->readsInThisPeriod;
+		tenantStats->readsInThisPeriod = 0;
+	}
+
+	/*
+	 * If the last query is more than two periods ago, we clean the last period counts too.
+	 */
+	if (tenantStats->lastQueryTime < periodStart - CitusStatsTenantsPeriod)
+	{
+		tenantStats->writesInLastPeriod = 0;
+
+		tenantStats->readsInLastPeriod = 0;
+	}
+
+	tenantStats->lastQueryTime = queryTime;
+}
+
+
+/*
+ * ReduceScoreIfNecessary reduces the tenant score only if it is necessary.
+ *
+ * We halve the tenants' scores after each period. This function checks the number of
+ * periods that passed after the last score reduction and reduces the score accordingly.
+ */
+static void
+ReduceScoreIfNecessary(TenantStats *tenantStats, time_t queryTime)
+{
+	time_t periodStart = queryTime - (queryTime % CitusStatsTenantsPeriod);
+
+	/*
+	 * With each query we increase the score of tenant by ONE_QUERY_SCORE.
+	 * After one period we halve the scores.
+	 *
+	 * Here we calculate how many periods passed after the last time we did score reduction.
+	 * If the latest score reduction was in this period this number should be 0,
+	 * if it was in the last period this number should be 1 and so on.
+	 */
+	int periodCountAfterLastScoreReduction = (periodStart -
+											  tenantStats->lastScoreReduction +
+											  CitusStatsTenantsPeriod - 1) /
+											 CitusStatsTenantsPeriod;
+
+	/*
+	 * This should not happen but let's make sure.
+	 */
+	if (periodCountAfterLastScoreReduction < 0)
+	{
+		periodCountAfterLastScoreReduction = 0;
+	}
+
+	/*
+	 * If the last score reduction was not in this period we do score reduction now.
+	 */
+	if (periodCountAfterLastScoreReduction > 0)
+	{
+		tenantStats->score >>= periodCountAfterLastScoreReduction;
+		tenantStats->lastScoreReduction = queryTime;
+	}
+}
+
+
+/*
+ * EvictTenantsIfNecessary sorts and evicts the tenants if the tenant count is more than or
+ * equal to 3 * CitusStatsTenantsLimit.
+ */
+static void
+EvictTenantsIfNecessary(time_t queryTime)
+{
+	MultiTenantMonitor *monitor = GetMultiTenantMonitor();
+
+	/*
+	 * We keep up to CitusStatsTenantsLimit * 3 tenants instead of CitusStatsTenantsLimit,
+	 * so we don't lose data immediately after a tenant is out of the top CitusStatsTenantsLimit.
+	 *
+	 * Every time the tenant count hits CitusStatsTenantsLimit * 3, we reduce it back to CitusStatsTenantsLimit * 2.
+	 */
+	if (monitor->tenantCount >= CitusStatsTenantsLimit * 3)
+	{
+		for (int tenantIndex = 0; tenantIndex < monitor->tenantCount; tenantIndex++)
+		{
+			ReduceScoreIfNecessary(&monitor->tenants[tenantIndex], queryTime);
+		}
+		SafeQsort(monitor->tenants, monitor->tenantCount, sizeof(TenantStats),
+				  CompareTenantScore);
+		monitor->tenantCount = CitusStatsTenantsLimit * 2;
+	}
+}
+
+
+/*
+ * RecordTenantStats records the query statistics for the tenant.
+ */
+static void
+RecordTenantStats(TenantStats *tenantStats)
+{
+	if (tenantStats->score < LLONG_MAX - ONE_QUERY_SCORE)
+	{
+		tenantStats->score += ONE_QUERY_SCORE;
+	}
+	else
+	{
+		tenantStats->score = LLONG_MAX;
+	}
+
+	if (attributeCommandType == CMD_SELECT)
+	{
+		tenantStats->readsInThisPeriod++;
+	}
+	else if (attributeCommandType == CMD_UPDATE ||
+			 attributeCommandType == CMD_INSERT ||
+			 attributeCommandType == CMD_DELETE)
+	{
+		tenantStats->writesInThisPeriod++;
+	}
+}
+
+
+/*
+ * CreateMultiTenantMonitor creates the data structure for the multi tenant monitor.
+ */
+static void
+CreateMultiTenantMonitor()
+{
+	MultiTenantMonitor *monitor = CreateSharedMemoryForMultiTenantMonitor();
+	monitor->tenantCount = 0;
+}
+
+
+/*
+ * CreateSharedMemoryForMultiTenantMonitor creates a dynamic shared memory segment for the multi tenant monitor.
+ */
+static MultiTenantMonitor *
+CreateSharedMemoryForMultiTenantMonitor()
+{
+	bool found = false;
+	MultiTenantMonitor *monitor = ShmemInitStruct(SharedMemoryNameForMultiTenantMonitor,
+												  MultiTenantMonitorshmemSize(),
+												  &found);
+	if (found)
+	{
+		return monitor;
+	}
+
+	monitor->namedLockTranche.trancheId = LWLockNewTrancheId();
+	monitor->namedLockTranche.trancheName = monitorTrancheName;
+
+	LWLockRegisterTranche(monitor->namedLockTranche.trancheId,
+						  monitor->namedLockTranche.trancheName);
+	LWLockInitialize(&monitor->lock, monitor->namedLockTranche.trancheId);
+
+	return monitor;
+}
+
+
+/*
+ * GetMultiTenantMonitor returns the data structure for the multi tenant monitor.
+ */
+static MultiTenantMonitor *
+GetMultiTenantMonitor()
+{
+	bool found = false;
+	MultiTenantMonitor *monitor = ShmemInitStruct(SharedMemoryNameForMultiTenantMonitor,
+												  MultiTenantMonitorshmemSize(),
+												  &found);
+
+	if (!found)
+	{
+		elog(WARNING, "monitor not found");
+		return NULL;
+	}
+
+	return monitor;
+}
+
+
+/*
+ * InitializeMultiTenantMonitorSMHandleManagement sets up the shared memory startup hook
+ * so that the multi tenant monitor can be initialized and stored in shared memory.
+ */
+void
+InitializeMultiTenantMonitorSMHandleManagement()
+{
+	prev_shmem_startup_hook = shmem_startup_hook;
+	shmem_startup_hook = MultiTenantMonitorSMInit;
+}
+
+
+/*
+ * MultiTenantMonitorSMInit initializes the shared memory for MultiTenantMonitorSMData.
+ */
+static void
+MultiTenantMonitorSMInit()
+{
+	CreateMultiTenantMonitor();
+
+	if (prev_shmem_startup_hook != NULL)
+	{
+		prev_shmem_startup_hook();
+	}
+}
+
+
+/*
+ * CreateTenantStats creates the data structure for a tenant's statistics.
+ */
+static int
+CreateTenantStats(MultiTenantMonitor *monitor, time_t queryTime)
+{
+	/*
+	 * If the tenant count reached 3 * CitusStatsTenantsLimit, we evict the tenants
+	 * with the lowest scores.
+	 */
+	EvictTenantsIfNecessary(queryTime);
+
+	int tenantIndex = monitor->tenantCount;
+
+	memset(&monitor->tenants[tenantIndex], 0, sizeof(monitor->tenants[tenantIndex]));
+
+	strcpy_s(monitor->tenants[tenantIndex].tenantAttribute,
+			 sizeof(monitor->tenants[tenantIndex].tenantAttribute), attributeToTenant);
+	monitor->tenants[tenantIndex].colocationGroupId = colocationGroupId;
+
+	monitor->tenants[tenantIndex].namedLockTranche.trancheId = LWLockNewTrancheId();
+	monitor->tenants[tenantIndex].namedLockTranche.trancheName = tenantTrancheName;
+
+	LWLockRegisterTranche(monitor->tenants[tenantIndex].namedLockTranche.trancheId,
+						  monitor->tenants[tenantIndex].namedLockTranche.trancheName);
+	LWLockInitialize(&monitor->tenants[tenantIndex].lock,
+					 monitor->tenants[tenantIndex].namedLockTranche.trancheId);
+
+	monitor->tenantCount++;
+
+	return tenantIndex;
+}
+
+
+/*
+ * FindTenantStats finds the index for the current tenant's statistics.
+ */
+static int
+FindTenantStats(MultiTenantMonitor *monitor)
+{
+	for (int i = 0; i < monitor->tenantCount; i++)
+	{
+		TenantStats *tenantStats = &monitor->tenants[i];
+		if (strcmp(tenantStats->tenantAttribute, attributeToTenant) == 0 &&
+			tenantStats->colocationGroupId == colocationGroupId)
+		{
+			return i;
+		}
+	}
+
+	return -1;
+}
+
+
+/*
+ * MultiTenantMonitorshmemSize calculates the size of the multi tenant monitor using
+ * the CitusStatsTenantsLimit parameter.
+ */
+static size_t
+MultiTenantMonitorshmemSize(void)
+{
+	Size size = sizeof(MultiTenantMonitor);
+	size = add_size(size, mul_size(sizeof(TenantStats), CitusStatsTenantsLimit * 3));
+
+	return size;
+}
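
A worked example of the scoring scheme implemented above: every attributed query adds ONE_QUERY_SCORE (10^9) to the tenant's score, and ReduceScoreIfNecessary halves the score once per elapsed period by right-shifting. A tenant that runs four queries (score 4000000000) and then stays idle for three periods ends up with 4000000000 >> 3 = 500000000, so recently active tenants sort ahead of historically active ones.
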
@@ -46,4 +46,6 @@ extern CustomScan * FetchCitusCustomScanIfExists(Plan *plan);
 extern bool IsCitusPlan(Plan *plan);
 extern bool IsCitusCustomScan(Plan *plan);
 
+extern void SetJobColocationId(Job *job);
+
 #endif /* CITUS_CUSTOM_SCAN_H */
@@ -330,6 +330,9 @@ typedef struct Task
 	 * Vacuum, create/drop/reindex concurrently cannot be executed in a transaction.
 	 */
 	bool cannotBeExecutedInTransction;
+
+	char *partitionColumn;
+	int colocationId;
 } Task;
 
 
@@ -0,0 +1,102 @@
+/*-------------------------------------------------------------------------
+ *
+ * attribute.h
+ *	  Routines related to the multi tenant monitor.
+ *
+ * Copyright (c) Citus Data, Inc.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef CITUS_ATTRIBUTE_H
+#define CITUS_ATTRIBUTE_H
+
+#include "executor/execdesc.h"
+#include "executor/executor.h"
+#include "storage/lwlock.h"
+
+#define MAX_TENANT_ATTRIBUTE_LENGTH 100
+
+/*
+ * TenantStats is the struct that keeps statistics about one tenant.
+ */
+typedef struct TenantStats
+{
+	/*
+	 * The attribute value, e.g. distribution column, and colocation group id
+	 * of the tenant.
+	 */
+	char tenantAttribute[MAX_TENANT_ATTRIBUTE_LENGTH];
+	int colocationGroupId;
+
+	/*
+	 * Number of SELECT queries this tenant ran in this and last periods.
+	 */
+	int readsInLastPeriod;
+	int readsInThisPeriod;
+
+	/*
+	 * Number of INSERT, UPDATE, and DELETE queries this tenant ran in this and last periods.
+	 */
+	int writesInLastPeriod;
+	int writesInThisPeriod;
+
+	/*
+	 * The latest time this tenant ran a query. This value is used to update the score later.
+	 */
+	time_t lastQueryTime;
+
+	/*
+	 * The tenant monitoring score of this tenant. This value is increased by ONE_QUERY_SCORE at every query
+	 * and halved after every period.
+	 */
+	long long score;
+
+	/*
+	 * The latest time the score of this tenant was halved. This value is used to correctly calculate the reduction later.
+	 */
+	time_t lastScoreReduction;
+
+	/*
+	 * Locks needed to update this tenant's statistics.
+	 */
+	NamedLWLockTranche namedLockTranche;
+	LWLock lock;
+} TenantStats;

+/*
+ * MultiTenantMonitor is the struct for keeping the statistics
+ * of the tenants.
+ */
+typedef struct MultiTenantMonitor
+{
+	/*
+	 * Lock mechanism for the monitor.
+	 * Each tenant update acquires the lock in shared mode, and
+	 * the tenant number reduction and the monitor view acquire it in exclusive mode.
+	 */
+	NamedLWLockTranche namedLockTranche;
+	LWLock lock;
+
+	/*
+	 * tenantCount is the number of items in the tenants array.
+	 * The total length of the tenants array is set up at CreateSharedMemoryForMultiTenantMonitor
+	 * and is 3 * citus.stats_tenants_limit.
+	 */
+	int tenantCount;
+	TenantStats tenants[FLEXIBLE_ARRAY_MEMBER];
+} MultiTenantMonitor;
+
+
+extern void CitusAttributeToEnd(QueryDesc *queryDesc);
+extern void AttributeQueryIfAnnotated(const char *queryString, CmdType commandType);
+extern char * AnnotateQuery(char *queryString, char *partitionColumn, int colocationId);
+extern void InitializeMultiTenantMonitorSMHandleManagement(void);
+
+extern ExecutorEnd_hook_type prev_ExecutorEnd;
+
+extern int MultiTenantMonitoringLogLevel;
+extern int CitusStatsTenantsPeriod;
+extern int CitusStatsTenantsLimit;
+
+#endif /*CITUS_ATTRIBUTE_H */
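
For a rough sense of the shared memory footprint (an estimate, not stated in the commit): each TenantStats entry is dominated by the 100-byte tenantAttribute buffer plus the counters, timestamps, score, and lock fields, and MultiTenantMonitorshmemSize in attribute.c reserves sizeof(MultiTenantMonitor) plus 3 * citus.stats_tenants_limit * sizeof(TenantStats), i.e. room for 30 entries at the default limit of 10.
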
@@ -307,3 +307,5 @@ s/(NOTICE: issuing SET LOCAL application_name TO 'citus_rebalancer gpid=)[0-9]+
 
 # shard_rebalancer output, flaky improvement number
 s/improvement of 0.1[0-9]* is lower/improvement of 0.1xxxxx is lower/g
+
+s/\/\* attributeTo.*\*\///g
@ -0,0 +1,290 @@
|
||||||
|
CREATE SCHEMA citus_stats_tenants;
|
||||||
|
SET search_path TO citus_stats_tenants;
|
||||||
|
SET citus.next_shard_id TO 5797500;
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
CREATE OR REPLACE FUNCTION pg_catalog.clean_citus_stats_tenants()
|
||||||
|
RETURNS VOID
|
||||||
|
LANGUAGE C
|
||||||
|
AS 'citus', $$clean_citus_stats_tenants$$;
|
||||||
|
CREATE OR REPLACE FUNCTION pg_catalog.sleep_until_next_period()
|
||||||
|
RETURNS VOID
|
||||||
|
LANGUAGE C
|
||||||
|
AS 'citus', $$sleep_until_next_period$$;
|
||||||
|
SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()');
|
||||||
|
result
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
(3 rows)
|
||||||
|
|
||||||
|
-- set period to a high number to prevent stats from being reset
|
||||||
|
SELECT result FROM run_command_on_all_nodes('ALTER SYSTEM SET citus.stats_tenants_period TO 1000000000');
|
||||||
|
result
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
ALTER SYSTEM
|
||||||
|
ALTER SYSTEM
|
||||||
|
ALTER SYSTEM
|
||||||
|
(3 rows)
|
||||||
|
|
||||||
|
SELECT result FROM run_command_on_all_nodes('SELECT pg_reload_conf()');
|
||||||
|
result
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
t
|
||||||
|
t
|
||||||
|
(3 rows)
|
||||||
|
|
||||||
|
CREATE TABLE dist_tbl (a INT, b TEXT);
|
||||||
|
SELECT create_distributed_table('dist_tbl', 'a', shard_count:=4, colocate_with:='none');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE dist_tbl_2 (a INT, b INT);
|
||||||
|
SELECT create_distributed_table('dist_tbl_2', 'a', colocate_with:='dist_tbl');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE dist_tbl_text (a TEXT, b INT);
|
||||||
|
SELECT create_distributed_table('dist_tbl_text', 'a', shard_count:=4, colocate_with:='none');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE ref_tbl (a INT, b INT);
|
||||||
|
SELECT create_reference_table('ref_tbl');
|
||||||
|
create_reference_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO dist_tbl VALUES (1, 'abcd');
|
||||||
|
INSERT INTO dist_tbl VALUES (2, 'abcd');
|
||||||
|
UPDATE dist_tbl SET b = a + 1 WHERE a = 3;
|
||||||
|
UPDATE dist_tbl SET b = a + 1 WHERE a = 4;
|
||||||
|
DELETE FROM dist_tbl WHERE a = 5;
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute;
|
||||||
|
tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1 | 0 | 0 | 1 | 0
|
||||||
|
5 | 0 | 0 | 1 | 0
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
\c - - - :worker_2_port
|
||||||
|
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute;
|
||||||
|
tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
2 | 0 | 0 | 1 | 0
|
||||||
|
3 | 0 | 0 | 1 | 0
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
\c - - - :master_port
|
||||||
|
SET search_path TO citus_stats_tenants;
|
||||||
|
SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()');
|
||||||
|
result
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
(3 rows)
|
||||||
|
|
||||||
|
-- queries with multiple tenants should not be counted
|
||||||
|
SELECT count(*)>=0 FROM dist_tbl WHERE a IN (1, 5);
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- queries with reference tables should not be counted
|
||||||
|
SELECT count(*)>=0 FROM ref_tbl WHERE a = 1;
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
SELECT tenant_attribute, query_count_in_this_period FROM citus_stats_tenants ORDER BY tenant_attribute;
|
||||||
|
tenant_attribute | query_count_in_this_period
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
\c - - - :master_port
|
||||||
|
SET search_path TO citus_stats_tenants;
|
||||||
|
-- queries with multiple tables but one tenant should be counted
|
||||||
|
SELECT count(*)>=0 FROM dist_tbl, dist_tbl_2 WHERE dist_tbl.a = 1 AND dist_tbl_2.a = 1;
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*)>=0 FROM dist_tbl JOIN dist_tbl_2 ON dist_tbl.a = dist_tbl_2.a WHERE dist_tbl.a = 1;
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\c - - - :worker_1_port
|
||||||
|
SELECT tenant_attribute, query_count_in_this_period FROM citus_stats_tenants WHERE tenant_attribute = '1';
|
||||||
|
tenant_attribute | query_count_in_this_period
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1 | 2
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\c - - - :master_port
|
||||||
|
SET search_path TO citus_stats_tenants;
|
||||||
|
-- test scoring
|
||||||
|
-- all of these distribution column values are from second worker
|
||||||
|
SELECT count(*)>=0 FROM dist_tbl WHERE a = 2;
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*)>=0 FROM dist_tbl WHERE a = 3;
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*)>=0 FROM dist_tbl WHERE a = 4;
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'abcd';
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\c - - - :worker_2_port
|
||||||
|
SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stats_tenants(true) ORDER BY score DESC;
|
||||||
|
 tenant_attribute | query_count_in_this_period | score
---------------------------------------------------------------------
 2 | 1 | 1000000000
 3 | 1 | 1000000000
 4 | 1 | 1000000000
 abcd | 1 | 1000000000
(4 rows)

\c - - - :master_port
SET search_path TO citus_stats_tenants;
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'abcd';
 ?column?
---------------------------------------------------------------------
 t
(1 row)

SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'abcd';
 ?column?
---------------------------------------------------------------------
 t
(1 row)

SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde';
 ?column?
---------------------------------------------------------------------
 t
(1 row)

SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'cdef';
 ?column?
---------------------------------------------------------------------
 t
(1 row)

\c - - - :worker_2_port
SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stats_tenants(true) ORDER BY score DESC;
 tenant_attribute | query_count_in_this_period | score
---------------------------------------------------------------------
 abcd | 3 | 3000000000
 2 | 1 | 1000000000
 3 | 1 | 1000000000
 4 | 1 | 1000000000
 bcde | 1 | 1000000000
 cdef | 1 | 1000000000
(6 rows)

\c - - - :master_port
SET search_path TO citus_stats_tenants;
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde';
 ?column?
---------------------------------------------------------------------
 t
(1 row)

SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde';
 ?column?
---------------------------------------------------------------------
 t
(1 row)

SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'defg';
 ?column?
---------------------------------------------------------------------
 t
(1 row)

\c - - - :worker_2_port
SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stats_tenants(true) ORDER BY score DESC;
 tenant_attribute | query_count_in_this_period | score
---------------------------------------------------------------------
 abcd | 3 | 3000000000
 bcde | 3 | 3000000000
 2 | 1 | 1000000000
 3 | 1 | 1000000000
 defg | 1 | 1000000000
(5 rows)

\c - - - :master_port
SET search_path TO citus_stats_tenants;
-- test period passing
SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()');
 result
---------------------------------------------------------------------



(3 rows)

SELECT count(*)>=0 FROM dist_tbl WHERE a = 1;
 ?column?
---------------------------------------------------------------------
 t
(1 row)

INSERT INTO dist_tbl VALUES (5, 'abcd');
\c - - - :worker_1_port
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute;
 tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period
---------------------------------------------------------------------
 1 | 1 | 0 | 1 | 0
 5 | 0 | 0 | 1 | 0
(2 rows)

-- simulate passing the period
SET citus.stats_tenants_period TO 2;
SELECT sleep_until_next_period();
 sleep_until_next_period
---------------------------------------------------------------------

(1 row)

SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute;
 tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period
---------------------------------------------------------------------
 1 | 0 | 1 | 0 | 1
 5 | 0 | 0 | 0 | 1
(2 rows)

\c - - - :master_port
SET search_path TO citus_stats_tenants;
SET client_min_messages TO ERROR;
DROP SCHEMA citus_stats_tenants CASCADE;
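A note on the scoring shown above: judging by these snapshots, each counted query adds 1000000000 to a tenant's score, so within a single period the score column is simply the query count times 10^9. Because the regression harness sets citus.stats_tenants_limit to 2 (see the pg_regress options further down), low-score tenants get evicted as new ones arrive, which is why 4 and cdef are gone from the second worker-2 snapshot while defg appears. A minimal monitoring query, taken directly from the test (the boolean argument is what exposes the score column):

    SELECT tenant_attribute, query_count_in_this_period, score
    FROM citus_stats_tenants(true)
    ORDER BY score DESC;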
@@ -25,7 +25,7 @@ SELECT citus.clear_network_traffic();
 ---- test multiple statements spanning multiple shards,
 ---- at each significant point. These transactions are 2pc
 -- fail at DELETE
-SELECT citus.mitmproxy('conn.onQuery(query="^DELETE").kill()');
+SELECT citus.mitmproxy('conn.onQuery(query="DELETE").kill()');
  mitmproxy
 ---------------------------------------------------------------------

@@ -54,7 +54,7 @@ SELECT * FROM dml_test ORDER BY id ASC;
 (4 rows)

 -- cancel at DELETE
-SELECT citus.mitmproxy('conn.onQuery(query="^DELETE").cancel(' || pg_backend_pid() || ')');
+SELECT citus.mitmproxy('conn.onQuery(query="DELETE").cancel(' || pg_backend_pid() || ')');
  mitmproxy
 ---------------------------------------------------------------------

@@ -83,7 +83,7 @@ SELECT * FROM dml_test ORDER BY id ASC;
 (4 rows)

 -- fail at INSERT
-SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()');
+SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()');
  mitmproxy
 ---------------------------------------------------------------------

@@ -110,7 +110,7 @@ SELECT * FROM dml_test ORDER BY id ASC;
 (4 rows)

 -- cancel at INSERT
-SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").cancel(' || pg_backend_pid() || ')');
+SELECT citus.mitmproxy('conn.onQuery(query="INSERT").cancel(' || pg_backend_pid() || ')');
  mitmproxy
 ---------------------------------------------------------------------

@@ -137,7 +137,7 @@ SELECT * FROM dml_test ORDER BY id ASC;
 (4 rows)

 -- fail at UPDATE
-SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()');
+SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()');
  mitmproxy
 ---------------------------------------------------------------------

@@ -163,7 +163,7 @@ SELECT * FROM dml_test ORDER BY id ASC;
 (4 rows)

 -- cancel at UPDATE
-SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").cancel(' || pg_backend_pid() || ')');
+SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").cancel(' || pg_backend_pid() || ')');
  mitmproxy
 ---------------------------------------------------------------------

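The pattern of changes in this and the following failure-test files is uniform: every mitmproxy matcher drops its leading ^ anchor. The reason, as far as one can tell from the executor changes earlier in this commit, is that RebuildQueryStrings now runs each deparsed task query through AnnotateQuery with the tenant's partition-column value and colocation ID, so the SQL a worker receives no longer begins with the DML keyword but with an annotation prefix. A hedged sketch of what a worker now sees (the exact comment format and the shard suffix are illustrative assumptions, not taken from this diff):

    -- before: the task query started with the keyword, so "^DELETE" matched
    DELETE FROM dml_test_102011 WHERE id = 1;
    -- after: an annotation carrying the tenant attribute and colocation id
    -- precedes the statement, so only an unanchored "DELETE" still matches
    /* {"tId": "1", "cId": 1370001} */ DELETE FROM dml_test_102011 WHERE id = 1;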
@@ -36,7 +36,7 @@ SELECT create_reference_table('reference_table');
 -- (d) multi-row INSERT that hits multiple shards in multiple workers
 -- (e) multi-row INSERT to a reference table
 -- Failure and cancellation on multi-row INSERT that hits the same shard with the same value
-SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()');
+SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()');
  mitmproxy
 ---------------------------------------------------------------------

@@ -26,7 +26,7 @@ SELECT COUNT(*) FROM ref_table;
 (1 row)

 -- verify behavior of single INSERT; should fail to execute
-SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()');
+SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()');
  mitmproxy
 ---------------------------------------------------------------------

@@ -41,7 +41,7 @@ SELECT COUNT(*) FROM ref_table WHERE key=5;
 (1 row)

 -- verify behavior of UPDATE ... RETURNING; should not execute
-SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()');
+SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()');
  mitmproxy
 ---------------------------------------------------------------------

@@ -56,7 +56,7 @@ SELECT COUNT(*) FROM ref_table WHERE key=7;
 (1 row)

 -- verify fix to #2214; should raise error and fail to execute
-SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()');
+SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()');
  mitmproxy
 ---------------------------------------------------------------------

@@ -21,7 +21,7 @@ CREATE TABLE partitioned_table_0
 PARTITION OF partitioned_table (dist_key, partition_id)
 FOR VALUES IN ( 0 );
 INSERT INTO partitioned_table VALUES (0, 0);
-SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()');
+SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()');
  mitmproxy
 ---------------------------------------------------------------------

@@ -20,7 +20,7 @@ SELECT create_distributed_table('mod_test', 'key');
 (1 row)

 -- verify behavior of single INSERT; should mark shard as failed
-SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()');
+SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()');
  mitmproxy
 ---------------------------------------------------------------------

@@ -52,7 +52,7 @@ SELECT citus.mitmproxy('conn.allow()');
 (1 row)

 INSERT INTO mod_test VALUES (2, 6);
-SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()');
+SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()');
  mitmproxy
 ---------------------------------------------------------------------

@@ -78,7 +78,7 @@ WHERE shardid IN (
 TRUNCATE mod_test;
 -- verify behavior of multi-statement modifications to a single shard
 -- should fail the transaction and never mark placements inactive
-SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()');
+SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()');
  mitmproxy
 ---------------------------------------------------------------------

@@ -23,7 +23,7 @@ SELECT create_distributed_table('select_test', 'key');

 -- put data in shard for which mitm node is first placement
 INSERT INTO select_test VALUES (3, 'test data');
-SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").kill()');
+SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").kill()');
  mitmproxy
 ---------------------------------------------------------------------

@@ -45,7 +45,7 @@ WARNING: connection to the remote node localhost:xxxxx failed with the followin

 -- kill after first SELECT; txn should fail as INSERT triggers
 -- 2PC (and placementis not marked bad)
-SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").kill()');
+SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").kill()');
  mitmproxy
 ---------------------------------------------------------------------

@@ -66,7 +66,7 @@ TRUNCATE select_test;
 -- now the same tests with query cancellation
 -- put data in shard for which mitm node is first placement
 INSERT INTO select_test VALUES (3, 'test data');
-SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").cancel(' || pg_backend_pid() || ')');
+SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").cancel(' || pg_backend_pid() || ')');
  mitmproxy
 ---------------------------------------------------------------------

@@ -77,7 +77,7 @@ ERROR: canceling statement due to user request
 SELECT * FROM select_test WHERE key = 3;
 ERROR: canceling statement due to user request
 -- cancel after first SELECT; txn should fail and nothing should be marked as invalid
-SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").cancel(' || pg_backend_pid() || ')');
+SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").cancel(' || pg_backend_pid() || ')');
  mitmproxy
 ---------------------------------------------------------------------

@@ -107,7 +107,7 @@ SELECT citus.mitmproxy('conn.allow()');
 TRUNCATE select_test;
 -- cancel the second query
 -- error after second SELECT; txn should fail
-SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").after(1).cancel(' || pg_backend_pid() || ')');
+SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").after(1).cancel(' || pg_backend_pid() || ')');
  mitmproxy
 ---------------------------------------------------------------------

@@ -126,7 +126,7 @@ SELECT * FROM select_test WHERE key = 3;
 ERROR: canceling statement due to user request
 COMMIT;
 -- error after second SELECT; txn should fails the transaction
-SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").after(1).reset()');
+SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").after(1).reset()');
  mitmproxy
 ---------------------------------------------------------------------

@@ -144,7 +144,7 @@ INSERT INTO select_test VALUES (3, 'even more data');
 SELECT * FROM select_test WHERE key = 3;
 ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
 COMMIT;
-SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*pg_prepared_xacts").after(2).kill()');
+SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*pg_prepared_xacts").after(2).kill()');
  mitmproxy
 ---------------------------------------------------------------------

@@ -173,7 +173,7 @@ SELECT create_distributed_table('select_test', 'key');

 SET citus.max_cached_conns_per_worker TO 1; -- allow connection to be cached
 INSERT INTO select_test VALUES (1, 'test data');
-SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").after(1).kill()');
+SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").after(1).kill()');
  mitmproxy
 ---------------------------------------------------------------------

@@ -188,7 +188,7 @@ SELECT * FROM select_test WHERE key = 1;
 SELECT * FROM select_test WHERE key = 1;
 ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
 -- now the same test with query cancellation
-SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").after(1).cancel(' || pg_backend_pid() || ')');
+SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").after(1).cancel(' || pg_backend_pid() || ')');
  mitmproxy
 ---------------------------------------------------------------------

@@ -1360,9 +1360,11 @@ SELECT * FROM multi_extension.print_extension_changes();
 -- Snapshot of state at 11.3-1
 ALTER EXTENSION citus UPDATE TO '11.3-1';
 SELECT * FROM multi_extension.print_extension_changes();
  previous_object | current_object
 ---------------------------------------------------------------------
-(0 rows)
+ | function citus_stats_tenants(boolean) SETOF record
+ | view citus_stats_tenants
+(2 rows)

 DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
 -- show running version
@@ -121,6 +121,7 @@ ORDER BY 1;
 function citus_stat_activity()
 function citus_stat_statements()
 function citus_stat_statements_reset()
+function citus_stats_tenants(boolean)
 function citus_table_is_visible(oid)
 function citus_table_size(regclass)
 function citus_task_wait(bigint,citus_task_status)
@@ -316,7 +317,8 @@ ORDER BY 1;
 view citus_shards_on_worker
 view citus_stat_activity
 view citus_stat_statements
+view citus_stats_tenants
 view pg_dist_shard_placement
 view time_partitions
-(310 rows)
+(312 rows)

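As the upgrade diff shows, 11.3-1 introduces both a citus_stats_tenants(boolean) SETOF record function and a citus_stats_tenants view built on top of it. Judging by the regression tests, the boolean argument controls whether internal ranking data is exposed: the tests call citus_stats_tenants(true) whenever they want the score column, while reads through the plain view return only the tenant attribute and per-period counters. An illustrative pair of reads, with column lists taken from the tests in this commit:

    -- ranked read, including the internal score
    SELECT tenant_attribute, query_count_in_this_period, score
    FROM citus_stats_tenants(true) ORDER BY score DESC;

    -- plain view, per-period counters only
    SELECT tenant_attribute, query_count_in_this_period, query_count_in_last_period
    FROM citus_stats_tenants ORDER BY tenant_attribute;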
@@ -102,6 +102,7 @@ test: pg13_propagate_statistics
 # ----------
 test: citus_update_table_statistics

+# ----------
+# Test for tenant statistics
+# ----------
+test: citus_stats_tenants
+
 # ----------
 # Parallel TPC-H tests to check our distributed execution behavior
 # ----------
@@ -487,6 +487,7 @@ push(@pgOptions, "citus.explain_analyze_sort_method='taskId'");
 push(@pgOptions, "citus.enable_manual_changes_to_shards=on");
 push(@pgOptions, "citus.allow_unsafe_locks_from_workers=on");
 push(@pgOptions, "citus.stat_statements_track = 'all'");
+push(@pgOptions, "citus.stats_tenants_limit = 2");

 # Some tests look at shards in pg_class, make sure we can usually see them:
 push(@pgOptions, "citus.show_shards_for_app_name_prefixes='pg_regress'");
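Setting citus.stats_tenants_limit to 2 for the whole regression run deliberately keeps the per-node tenant table tiny so that eviction is easy to trigger; the earlier worker-2 snapshots, where 4 and cdef drop out as bcde and defg accumulate queries, only make sense under such a low cap. Outside the test harness, one would presumably adjust the same GUC the way the tests adjust citus.stats_tenants_period, as in the sketch below, though a setting that sizes shared state may well require a restart rather than a reload:

    SELECT result FROM run_command_on_all_nodes('ALTER SYSTEM SET citus.stats_tenants_limit TO 100');
    SELECT result FROM run_command_on_all_nodes('SELECT pg_reload_conf()');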
@@ -0,0 +1,118 @@
CREATE SCHEMA citus_stats_tenants;
SET search_path TO citus_stats_tenants;
SET citus.next_shard_id TO 5797500;
SET citus.shard_replication_factor TO 1;

CREATE OR REPLACE FUNCTION pg_catalog.clean_citus_stats_tenants()
RETURNS VOID
LANGUAGE C
AS 'citus', $$clean_citus_stats_tenants$$;

CREATE OR REPLACE FUNCTION pg_catalog.sleep_until_next_period()
RETURNS VOID
LANGUAGE C
AS 'citus', $$sleep_until_next_period$$;

SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()');

-- set period to a high number to prevent stats from being reset
SELECT result FROM run_command_on_all_nodes('ALTER SYSTEM SET citus.stats_tenants_period TO 1000000000');
SELECT result FROM run_command_on_all_nodes('SELECT pg_reload_conf()');

CREATE TABLE dist_tbl (a INT, b TEXT);
SELECT create_distributed_table('dist_tbl', 'a', shard_count:=4, colocate_with:='none');

CREATE TABLE dist_tbl_2 (a INT, b INT);
SELECT create_distributed_table('dist_tbl_2', 'a', colocate_with:='dist_tbl');

CREATE TABLE dist_tbl_text (a TEXT, b INT);
SELECT create_distributed_table('dist_tbl_text', 'a', shard_count:=4, colocate_with:='none');

CREATE TABLE ref_tbl (a INT, b INT);
SELECT create_reference_table('ref_tbl');

INSERT INTO dist_tbl VALUES (1, 'abcd');
INSERT INTO dist_tbl VALUES (2, 'abcd');
UPDATE dist_tbl SET b = a + 1 WHERE a = 3;
UPDATE dist_tbl SET b = a + 1 WHERE a = 4;
DELETE FROM dist_tbl WHERE a = 5;

\c - - - :worker_1_port
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute;
\c - - - :worker_2_port
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute;
\c - - - :master_port
SET search_path TO citus_stats_tenants;

SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()');

-- queries with multiple tenants should not be counted
SELECT count(*)>=0 FROM dist_tbl WHERE a IN (1, 5);

-- queries with reference tables should not be counted
SELECT count(*)>=0 FROM ref_tbl WHERE a = 1;

\c - - - :worker_1_port
SELECT tenant_attribute, query_count_in_this_period FROM citus_stats_tenants ORDER BY tenant_attribute;
\c - - - :master_port
SET search_path TO citus_stats_tenants;

-- queries with multiple tables but one tenant should be counted
SELECT count(*)>=0 FROM dist_tbl, dist_tbl_2 WHERE dist_tbl.a = 1 AND dist_tbl_2.a = 1;
SELECT count(*)>=0 FROM dist_tbl JOIN dist_tbl_2 ON dist_tbl.a = dist_tbl_2.a WHERE dist_tbl.a = 1;

\c - - - :worker_1_port
SELECT tenant_attribute, query_count_in_this_period FROM citus_stats_tenants WHERE tenant_attribute = '1';
\c - - - :master_port
SET search_path TO citus_stats_tenants;

-- test scoring
-- all of these distribution column values are from second worker
SELECT count(*)>=0 FROM dist_tbl WHERE a = 2;
SELECT count(*)>=0 FROM dist_tbl WHERE a = 3;
SELECT count(*)>=0 FROM dist_tbl WHERE a = 4;
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'abcd';

\c - - - :worker_2_port
SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stats_tenants(true) ORDER BY score DESC;
\c - - - :master_port
SET search_path TO citus_stats_tenants;

SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'abcd';
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'abcd';
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde';
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'cdef';

\c - - - :worker_2_port
SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stats_tenants(true) ORDER BY score DESC;
\c - - - :master_port
SET search_path TO citus_stats_tenants;

SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde';
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde';
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'defg';

\c - - - :worker_2_port
SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stats_tenants(true) ORDER BY score DESC;
\c - - - :master_port
SET search_path TO citus_stats_tenants;

-- test period passing
SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()');

SELECT count(*)>=0 FROM dist_tbl WHERE a = 1;
INSERT INTO dist_tbl VALUES (5, 'abcd');

\c - - - :worker_1_port
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute;

-- simulate passing the period
SET citus.stats_tenants_period TO 2;
SELECT sleep_until_next_period();

SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute;
\c - - - :master_port
SET search_path TO citus_stats_tenants;

SET client_min_messages TO ERROR;
DROP SCHEMA citus_stats_tenants CASCADE;
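Two details of this test are worth spelling out. First, clean_citus_stats_tenants() and sleep_until_next_period() are not shipped extension objects; the test loads them straight from the citus library (the AS 'citus', $$...$$ stanzas above), so they are C helpers exposed purely for regression testing. Second, the period mechanics are visible in the expected output earlier: after SET citus.stats_tenants_period TO 2 and a call to sleep_until_next_period(), every *_in_this_period counter rolls over into its *_in_last_period twin and resets to zero. A condensed sketch of that sequence:

    SET citus.stats_tenants_period TO 2;   -- in seconds, judging by the sleep-based test
    SELECT sleep_until_next_period();      -- block until the current period rolls over
    -- counters observed before the roll-over as (this=1, last=0)
    -- now read back as (this=0, last=1)
    SELECT tenant_attribute, query_count_in_this_period, query_count_in_last_period
    FROM citus_stats_tenants ORDER BY tenant_attribute;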
@@ -21,7 +21,7 @@ SELECT citus.clear_network_traffic();
 ---- at each significant point. These transactions are 2pc

 -- fail at DELETE
-SELECT citus.mitmproxy('conn.onQuery(query="^DELETE").kill()');
+SELECT citus.mitmproxy('conn.onQuery(query="DELETE").kill()');

 BEGIN;
 DELETE FROM dml_test WHERE id = 1;
@@ -35,7 +35,7 @@ COMMIT;
 SELECT * FROM dml_test ORDER BY id ASC;

 -- cancel at DELETE
-SELECT citus.mitmproxy('conn.onQuery(query="^DELETE").cancel(' || pg_backend_pid() || ')');
+SELECT citus.mitmproxy('conn.onQuery(query="DELETE").cancel(' || pg_backend_pid() || ')');

 BEGIN;
 DELETE FROM dml_test WHERE id = 1;
@@ -49,7 +49,7 @@ COMMIT;
 SELECT * FROM dml_test ORDER BY id ASC;

 -- fail at INSERT
-SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()');
+SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()');

 BEGIN;
 DELETE FROM dml_test WHERE id = 1;
@@ -63,7 +63,7 @@ COMMIT;
 SELECT * FROM dml_test ORDER BY id ASC;

 -- cancel at INSERT
-SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").cancel(' || pg_backend_pid() || ')');
+SELECT citus.mitmproxy('conn.onQuery(query="INSERT").cancel(' || pg_backend_pid() || ')');

 BEGIN;
 DELETE FROM dml_test WHERE id = 1;
@@ -77,7 +77,7 @@ COMMIT;
 SELECT * FROM dml_test ORDER BY id ASC;

 -- fail at UPDATE
-SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()');
+SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()');

 BEGIN;
 DELETE FROM dml_test WHERE id = 1;
@@ -91,7 +91,7 @@ COMMIT;
 SELECT * FROM dml_test ORDER BY id ASC;

 -- cancel at UPDATE
-SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").cancel(' || pg_backend_pid() || ')');
+SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").cancel(' || pg_backend_pid() || ')');

 BEGIN;
 DELETE FROM dml_test WHERE id = 1;
@@ -30,7 +30,7 @@ SELECT create_reference_table('reference_table');


 -- Failure and cancellation on multi-row INSERT that hits the same shard with the same value
-SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()');
+SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()');
 INSERT INTO distributed_table VALUES (1,1), (1,2), (1,3);

 -- this test is broken, see https://github.com/citusdata/citus/issues/2460
@@ -17,19 +17,19 @@ SELECT citus.clear_network_traffic();
 SELECT COUNT(*) FROM ref_table;

 -- verify behavior of single INSERT; should fail to execute
-SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()');
+SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()');
 INSERT INTO ref_table VALUES (5, 6);

 SELECT COUNT(*) FROM ref_table WHERE key=5;

 -- verify behavior of UPDATE ... RETURNING; should not execute
-SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()');
+SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()');
 UPDATE ref_table SET key=7 RETURNING value;

 SELECT COUNT(*) FROM ref_table WHERE key=7;

 -- verify fix to #2214; should raise error and fail to execute
-SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()');
+SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()');

 BEGIN;
 DELETE FROM ref_table WHERE key=5;
@@ -19,7 +19,7 @@ CREATE TABLE partitioned_table_0

 INSERT INTO partitioned_table VALUES (0, 0);

-SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()');
+SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()');

 INSERT INTO partitioned_table VALUES (0, 0);

@@ -8,7 +8,7 @@ CREATE TABLE mod_test (key int, value text);
 SELECT create_distributed_table('mod_test', 'key');

 -- verify behavior of single INSERT; should mark shard as failed
-SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()');
+SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()');
 INSERT INTO mod_test VALUES (2, 6);

 SELECT COUNT(*) FROM mod_test WHERE key=2;
@@ -24,7 +24,7 @@ TRUNCATE mod_test;
 SELECT citus.mitmproxy('conn.allow()');
 INSERT INTO mod_test VALUES (2, 6);

-SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()');
+SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()');
 UPDATE mod_test SET value='ok' WHERE key=2 RETURNING key;

 SELECT COUNT(*) FROM mod_test WHERE value='ok';
@@ -38,7 +38,7 @@ TRUNCATE mod_test;

 -- verify behavior of multi-statement modifications to a single shard
 -- should fail the transaction and never mark placements inactive
-SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()');
+SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()');

 BEGIN;
 INSERT INTO mod_test VALUES (2, 6);
@@ -13,13 +13,13 @@ SELECT create_distributed_table('select_test', 'key');
 -- put data in shard for which mitm node is first placement
 INSERT INTO select_test VALUES (3, 'test data');

-SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").kill()');
+SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").kill()');
 SELECT * FROM select_test WHERE key = 3;
 SELECT * FROM select_test WHERE key = 3;

 -- kill after first SELECT; txn should fail as INSERT triggers
 -- 2PC (and placementis not marked bad)
-SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").kill()');
+SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").kill()');

 BEGIN;
 INSERT INTO select_test VALUES (3, 'more data');
@@ -35,12 +35,12 @@ TRUNCATE select_test;
 -- put data in shard for which mitm node is first placement
 INSERT INTO select_test VALUES (3, 'test data');

-SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").cancel(' || pg_backend_pid() || ')');
+SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").cancel(' || pg_backend_pid() || ')');
 SELECT * FROM select_test WHERE key = 3;
 SELECT * FROM select_test WHERE key = 3;

 -- cancel after first SELECT; txn should fail and nothing should be marked as invalid
-SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").cancel(' || pg_backend_pid() || ')');
+SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").cancel(' || pg_backend_pid() || ')');

 BEGIN;
 INSERT INTO select_test VALUES (3, 'more data');
@@ -58,7 +58,7 @@ TRUNCATE select_test;

 -- cancel the second query
 -- error after second SELECT; txn should fail
-SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").after(1).cancel(' || pg_backend_pid() || ')');
+SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").after(1).cancel(' || pg_backend_pid() || ')');

 BEGIN;
 INSERT INTO select_test VALUES (3, 'more data');
@@ -68,7 +68,7 @@ SELECT * FROM select_test WHERE key = 3;
 COMMIT;

 -- error after second SELECT; txn should fails the transaction
-SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").after(1).reset()');
+SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").after(1).reset()');

 BEGIN;
 INSERT INTO select_test VALUES (3, 'more data');
@@ -77,7 +77,7 @@ INSERT INTO select_test VALUES (3, 'even more data');
 SELECT * FROM select_test WHERE key = 3;
 COMMIT;

-SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*pg_prepared_xacts").after(2).kill()');
+SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*pg_prepared_xacts").after(2).kill()');
 SELECT recover_prepared_transactions();
 SELECT recover_prepared_transactions();

@@ -93,12 +93,12 @@ SELECT create_distributed_table('select_test', 'key');
 SET citus.max_cached_conns_per_worker TO 1; -- allow connection to be cached
 INSERT INTO select_test VALUES (1, 'test data');

-SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").after(1).kill()');
+SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").after(1).kill()');
 SELECT * FROM select_test WHERE key = 1;
 SELECT * FROM select_test WHERE key = 1;

 -- now the same test with query cancellation
-SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").after(1).cancel(' || pg_backend_pid() || ')');
+SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").after(1).cancel(' || pg_backend_pid() || ')');
 SELECT * FROM select_test WHERE key = 1;
 SELECT * FROM select_test WHERE key = 1;
