Merge branch 'multi-tenant-monitoring-annotation-parsing' into multi-tenant-monitoring-annotate-local-queries

pull/6763/head
Gokhan Gulbiz 2023-03-15 09:19:59 +03:00
commit 7c5a724b3b
No known key found for this signature in database
GPG Key ID: 608EF06B6BD1B45B
14 changed files with 609 additions and 137 deletions

View File

@ -1997,11 +1997,25 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList,
}
else
{
Datum partitionColumnValue;
Oid partitionColumnType = 0;
char *partitionColumnString = NULL;
if (job->partitionKeyValue != NULL)
{
partitionColumnValue = job->partitionKeyValue->constvalue;
partitionColumnType = job->partitionKeyValue->consttype;
partitionColumnString = DatumToString(partitionColumnValue,
partitionColumnType);
}
SetJobColocationId(job);
job->taskList = SingleShardTaskList(originalQuery, job->jobId,
relationShardList, placementList,
shardId,
job->parametersInJobQueryResolved,
isLocalTableModification, "", -1);
isLocalTableModification,
partitionColumnString, job->colocationId);
}
}

View File

@ -3,4 +3,3 @@
-- bump version to 11.3-1
#include "udfs/citus_stats_tenants/11.3-1.sql"
#include "udfs/citus_stats_tenants_storage/11.3-1.sql"

View File

@ -2,7 +2,4 @@
-- this is an empty downgrade path since citus--11.2-1--11.3-1.sql is empty for now
DROP VIEW pg_catalog.citus_stats_tenants;
DROP VIEW pg_catalog.citus_stats_tenants_storage;
DROP FUNCTION pg_catalog.citus_stats_tenants(boolean);
DROP FUNCTION pg_catalog.citus_stats_tenants_storage;

View File

@ -1,28 +0,0 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_stats_tenants_storage (
OUT colocation_id INT,
OUT tenant_attribute TEXT,
OUT storage_estimate INT
)
RETURNS SETOF record
LANGUAGE plpgsql
AS $function$
DECLARE
tn TEXT;
dc TEXT;
ci INT;
BEGIN
FOR ci, tn, dc IN SELECT cts.colocation_id, cts.table_name, cts.distribution_column FROM citus_tables cts
LOOP
RETURN QUERY
EXECUTE 'SELECT ' || ci || '::int, ' || dc || '::text, sum(pg_column_size(' || tn || '.*))::int FROM ' || tn || ' GROUP BY ' || dc;
END LOOP;
END;
$function$;
CREATE OR REPLACE VIEW citus.citus_stats_tenants_storage AS
SELECT colocation_id, tenant_attribute, sum(storage_estimate) total_storage FROM pg_catalog.citus_stats_tenants_storage()
GROUP BY colocation_id, tenant_attribute
ORDER BY total_storage DESC;
ALTER VIEW citus.citus_stats_tenants_storage SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.citus_stats_tenants_storage TO PUBLIC;

View File

@ -1,28 +0,0 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_stats_tenants_storage (
OUT colocation_id INT,
OUT tenant_attribute TEXT,
OUT storage_estimate INT
)
RETURNS SETOF record
LANGUAGE plpgsql
AS $function$
DECLARE
tn TEXT;
dc TEXT;
ci INT;
BEGIN
FOR ci, tn, dc IN SELECT cts.colocation_id, cts.table_name, cts.distribution_column FROM citus_tables cts
LOOP
RETURN QUERY
EXECUTE 'SELECT ' || ci || '::int, ' || dc || '::text, sum(pg_column_size(' || tn || '.*))::int FROM ' || tn || ' GROUP BY ' || dc;
END LOOP;
END;
$function$;
CREATE OR REPLACE VIEW citus.citus_stats_tenants_storage AS
SELECT colocation_id, tenant_attribute, sum(storage_estimate) total_storage FROM pg_catalog.citus_stats_tenants_storage()
GROUP BY colocation_id, tenant_attribute
ORDER BY total_storage DESC;
ALTER VIEW citus.citus_stats_tenants_storage SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.citus_stats_tenants_storage TO PUBLIC;

View File

@ -23,6 +23,7 @@
#include "utils/builtins.h"
#include "utils/json.h"
#include "distributed/utils/attribute.h"
#include "common/base64.h"
#include <time.h>
@ -30,7 +31,7 @@ static void AttributeMetricsIfApplicable(void);
ExecutorEnd_hook_type prev_ExecutorEnd = NULL;
#define ATTRIBUTE_PREFIX "/*{"
#define ATTRIBUTE_PREFIX "/*{\"tId\":"
#define ATTRIBUTE_STRING_FORMAT "/*{\"tId\":%s,\"cId\":%d}*/"
#define CITUS_STATS_TENANTS_COLUMNS 7
#define ONE_QUERY_SCORE 1000000000
@ -60,7 +61,10 @@ static void MultiTenantMonitorSMInit(void);
static int CreateTenantStats(MultiTenantMonitor *monitor);
static int FindTenantStats(MultiTenantMonitor *monitor);
static size_t MultiTenantMonitorshmemSize(void);
static char * extractTopComment(const char *inputString);
static char * ExtractTopComment(const char *inputString);
static char * Substring(const char *str, int start, int end);
static char * EscapeCommentChars(const char *str);
static char * UnescapeCommentChars(const char *str);
int MultiTenantMonitoringLogLevel = CITUS_LOG_LEVEL_OFF;
int CitusStatsTenantsPeriod = (time_t) 60;
@ -68,6 +72,7 @@ int CitusStatsTenantsLimit = 10;
PG_FUNCTION_INFO_V1(citus_stats_tenants);
PG_FUNCTION_INFO_V1(clean_citus_stats_tenants);
/*
@ -128,12 +133,12 @@ citus_stats_tenants(PG_FUNCTION_ARGS)
values[0] = Int32GetDatum(tenantStats->colocationGroupId);
values[1] = PointerGetDatum(cstring_to_text(tenantStats->tenantAttribute));
values[2] = Int32GetDatum(tenantStats->selectsInThisPeriod);
values[3] = Int32GetDatum(tenantStats->selectsInLastPeriod);
values[4] = Int32GetDatum(tenantStats->selectsInThisPeriod +
tenantStats->insertsInThisPeriod);
values[5] = Int32GetDatum(tenantStats->selectsInLastPeriod +
tenantStats->insertsInLastPeriod);
values[2] = Int32GetDatum(tenantStats->readsInThisPeriod);
values[3] = Int32GetDatum(tenantStats->readsInLastPeriod);
values[4] = Int32GetDatum(tenantStats->readsInThisPeriod +
tenantStats->writesInThisPeriod);
values[5] = Int32GetDatum(tenantStats->readsInLastPeriod +
tenantStats->writesInLastPeriod);
values[6] = Int64GetDatum(tenantStats->score);
tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
@ -145,12 +150,28 @@ citus_stats_tenants(PG_FUNCTION_ARGS)
}
/*
* clean_citus_stats_tenants cleans the citus_stats_tenants monitor.
*/
Datum
clean_citus_stats_tenants(PG_FUNCTION_ARGS)
{
MultiTenantMonitor *monitor = GetMultiTenantMonitor();
monitor->tenantCount = 0;
monitor->periodStart = time(0);
PG_RETURN_VOID();
}
/*
* AttributeQueryIfAnnotated assigns the attributes of tenant if the query is annotated.
*/
void
AttributeQueryIfAnnotated(const char *query_string, CmdType commandType)
{
strcpy_s(attributeToTenant, sizeof(attributeToTenant), "");
attributeCommandType = commandType;
if (query_string == NULL)
@ -158,7 +179,9 @@ AttributeQueryIfAnnotated(const char *query_string, CmdType commandType)
return;
}
char *annotation = extractTopComment(query_string);
if (strncmp(ATTRIBUTE_PREFIX, query_string, strlen(ATTRIBUTE_PREFIX)) == 0)
{
char *annotation = ExtractTopComment(query_string);
if (annotation != NULL)
{
Datum jsonbDatum = DirectFunctionCall1(jsonb_in, PointerGetDatum(annotation));
@ -166,7 +189,7 @@ AttributeQueryIfAnnotated(const char *query_string, CmdType commandType)
text *tenantIdTextP = ExtractFieldTextP(jsonbDatum, "tId");
if (tenantIdTextP != NULL)
{
char *tenantId = text_to_cstring(tenantIdTextP);
char *tenantId = UnescapeCommentChars(text_to_cstring(tenantIdTextP));
strcpy_s(attributeToTenant, sizeof(attributeToTenant), tenantId);
}
@ -180,12 +203,8 @@ AttributeQueryIfAnnotated(const char *query_string, CmdType commandType)
colocationGroupId)));
}
}
else
{
/*Assert(attributeToTenant == NULL); */
}
/*DetachSegment(); */
attributeToTenantStart = clock();
}
@ -201,12 +220,15 @@ AnnotateQuery(char *queryString, char *partitionColumn, int colocationId)
return queryString;
}
char *commentCharsEscaped = EscapeCommentChars(partitionColumn);
StringInfo escapedSourceName = makeStringInfo();
escape_json(escapedSourceName, partitionColumn);
escape_json(escapedSourceName, commentCharsEscaped);
StringInfo newQuery = makeStringInfo();
appendStringInfo(newQuery, ATTRIBUTE_STRING_FORMAT, escapedSourceName->data,
colocationId);
appendStringInfoString(newQuery, queryString);
return newQuery->data;
@ -308,15 +330,17 @@ AttributeMetricsIfApplicable()
if (attributeCommandType == CMD_SELECT)
{
tenantStats->selectCount++;
tenantStats->selectsInThisPeriod++;
tenantStats->totalSelectTime += cpu_time_used;
tenantStats->readCount++;
tenantStats->readsInThisPeriod++;
tenantStats->totalReadTime += cpu_time_used;
}
else if (attributeCommandType == CMD_INSERT)
else if (attributeCommandType == CMD_UPDATE ||
attributeCommandType == CMD_INSERT ||
attributeCommandType == CMD_DELETE)
{
tenantStats->insertCount++;
tenantStats->insertsInThisPeriod++;
tenantStats->totalInsertTime += cpu_time_used;
tenantStats->writeCount++;
tenantStats->writesInThisPeriod++;
tenantStats->totalWriteTime += cpu_time_used;
}
LWLockRelease(&monitor->lock);
@ -337,15 +361,15 @@ AttributeMetricsIfApplicable()
if (MultiTenantMonitoringLogLevel != CITUS_LOG_LEVEL_OFF)
{
ereport(NOTICE, (errmsg("total select count = %d, total CPU time = %f "
ereport(NOTICE, (errmsg("total read count = %d, total read CPU time = %f "
"to tenant: %s",
tenantStats->selectCount,
tenantStats->totalSelectTime,
tenantStats->readCount,
tenantStats->totalReadTime,
tenantStats->tenantAttribute)));
}
}
/*attributeToTenant = NULL; */
strcpy_s(attributeToTenant, sizeof(attributeToTenant), "");
}
@ -366,13 +390,13 @@ UpdatePeriodsIfNecessary(MultiTenantMonitor *monitor, TenantStats *tenantStats)
* but there are some query count for this period we move them to the last period.
*/
if (tenantStats->lastQueryTime < monitor->periodStart &&
(tenantStats->insertsInThisPeriod || tenantStats->selectsInThisPeriod))
(tenantStats->writesInThisPeriod || tenantStats->readsInThisPeriod))
{
tenantStats->insertsInLastPeriod = tenantStats->insertsInThisPeriod;
tenantStats->insertsInThisPeriod = 0;
tenantStats->writesInLastPeriod = tenantStats->writesInThisPeriod;
tenantStats->writesInThisPeriod = 0;
tenantStats->selectsInLastPeriod = tenantStats->selectsInThisPeriod;
tenantStats->selectsInThisPeriod = 0;
tenantStats->readsInLastPeriod = tenantStats->readsInThisPeriod;
tenantStats->readsInThisPeriod = 0;
}
/*
@ -380,9 +404,9 @@ UpdatePeriodsIfNecessary(MultiTenantMonitor *monitor, TenantStats *tenantStats)
*/
if (tenantStats->lastQueryTime < monitor->periodStart - CitusStatsTenantsPeriod)
{
tenantStats->insertsInLastPeriod = 0;
tenantStats->writesInLastPeriod = 0;
tenantStats->selectsInLastPeriod = 0;
tenantStats->readsInLastPeriod = 0;
}
}
@ -526,6 +550,8 @@ CreateTenantStats(MultiTenantMonitor *monitor)
{
int tenantIndex = monitor->tenantCount;
memset(&monitor->tenants[tenantIndex], 0, sizeof(monitor->tenants[tenantIndex]));
strcpy_s(monitor->tenants[tenantIndex].tenantAttribute,
sizeof(monitor->tenants[tenantIndex].tenantAttribute), attributeToTenant);
monitor->tenants[tenantIndex].colocationGroupId = colocationGroupId;
@ -579,32 +605,113 @@ MultiTenantMonitorshmemSize(void)
/*
* extractTopComment extracts the top-level multi-line comment from a given input string.
* ExtractTopComment extracts the top-level multi-line comment from a given input string.
*/
static char *
extractTopComment(const char *inputString)
ExtractTopComment(const char *inputString)
{
int i = 0;
int commentStartCharsLength = 2;
int inputStringLen = strlen(inputString);
if (inputStringLen < commentStartCharsLength)
{
return NULL;
}
int commentEndCharsIndex = 0;
/* If query starts with a comment */
if (inputString[i] == '/' && inputString[i + 1] == '*')
if (inputString[commentEndCharsIndex] == '/' &&
inputString[commentEndCharsIndex + 1] == '*')
{
/* Skip the comment start characters */
i += 2;
while (inputString[i] && (inputString[i] != '*' && inputString[i + 1] != '/'))
commentEndCharsIndex += commentStartCharsLength;
while (inputString[commentEndCharsIndex] &&
commentEndCharsIndex < inputStringLen &&
!(inputString[commentEndCharsIndex] == '*' &&
inputString [commentEndCharsIndex + 1] == '/'))
{
i++;
commentEndCharsIndex++;
}
}
if (i > 2)
if (commentEndCharsIndex > commentStartCharsLength)
{
char *result = (char *) palloc(sizeof(char) * (i - 1));
strncpy(result, inputString + 2, i - 2);
return result;
return Substring(inputString, commentStartCharsLength, commentEndCharsIndex);
}
else
{
return NULL;
}
}
/* Extracts a substring from the input string between the specified start and end indices.*/
static char *
Substring(const char *str, int start, int end)
{
int len = strlen(str);
/* Ensure start and end are within the bounds of the string */
if (start < 0 || end > len || start > end)
{
return NULL;
}
/* Allocate memory for the substring */
char *substr = (char *) palloc((end - start + 1) * sizeof(char));
/* Copy the substring to the new memory location */
strncpy_s(substr, end - start + 1, str + start, end - start);
/* Add null terminator to end the substring */
substr[end - start] = '\0';
return substr;
}
/* EscapeCommentChars adds a backslash before each occurrence of '*' or '/' in the input string */
static char *
EscapeCommentChars(const char *str)
{
int len = strlen(str);
char *new_str = (char *) malloc(len * 2 + 1);
int j = 0;
for (int i = 0; i < len; i++)
{
if (str[i] == '*' || str[i] == '/')
{
new_str[j++] = '\\';
}
new_str[j++] = str[i];
}
new_str[j] = '\0';
return new_str;
}
/* UnescapeCommentChars removes the backslash that precedes '*' or '/' in the input string. */
static char *
UnescapeCommentChars(const char *str)
{
int len = strlen(str);
char *new_str = (char *) malloc(len + 1);
int j = 0;
for (int i = 0; i < len; i++)
{
if (str[i] == '\\' && i < len - 1)
{
if (str[i + 1] == '*' || str[i + 1] == '/')
{
i++;
}
}
new_str[j++] = str[i];
}
new_str[j] = '\0';
return new_str;
}

View File

@ -21,21 +21,20 @@ typedef struct TenantStats
int colocationGroupId;
int selectCount;
double totalSelectTime;
int selectsInLastPeriod;
int selectsInThisPeriod;
int readCount;
double totalReadTime;
int readsInLastPeriod;
int readsInThisPeriod;
int insertCount;
double totalInsertTime;
int insertsInLastPeriod;
int insertsInThisPeriod;
int writeCount;
double totalWriteTime;
int writesInLastPeriod;
int writesInThisPeriod;
time_t lastQueryTime;
long long score;
time_t lastScoreReduction;
int rank;
NamedLWLockTranche namedLockTranche;
LWLock lock;

View File

@ -303,5 +303,5 @@ s/(NOTICE: issuing SET LOCAL application_name TO 'citus_rebalancer gpid=)[0-9]+
# shard_rebalancer output, flaky improvement number
s/improvement of 0.1[0-9]* is lower/improvement of 0.1xxxxx is lower/g
s/\/\* attributeTo.*\*\///g
# normalize tenants statistics annotations
s/\/\*\{"tId":.*\*\///g

View File

@ -0,0 +1,298 @@
CREATE SCHEMA citus_stats_tenants;
SET search_path TO citus_stats_tenants;
SET citus.next_shard_id TO 5797500;
SET citus.shard_replication_factor TO 1;
CREATE OR REPLACE FUNCTION pg_catalog.clean_citus_stats_tenants()
RETURNS VOID
LANGUAGE C
AS 'citus', $$clean_citus_stats_tenants$$;
SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()');
result
---------------------------------------------------------------------
(3 rows)
CREATE TABLE dist_tbl (a INT, b TEXT);
SELECT create_distributed_table('dist_tbl', 'a', shard_count:=4, colocate_with:='none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE dist_tbl_2 (a INT, b INT);
SELECT create_distributed_table('dist_tbl_2', 'a', colocate_with:='dist_tbl');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE dist_tbl_text (a TEXT, b INT);
SELECT create_distributed_table('dist_tbl_text', 'a', shard_count:=4, colocate_with:='none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE ref_tbl (a INT, b INT);
SELECT create_reference_table('ref_tbl');
create_reference_table
---------------------------------------------------------------------
(1 row)
INSERT INTO dist_tbl VALUES (1, 'abcd');
INSERT INTO dist_tbl VALUES (2, 'abcd');
UPDATE dist_tbl SET b = a + 1 WHERE a = 3;
UPDATE dist_tbl SET b = a + 1 WHERE a = 4;
DELETE FROM dist_tbl WHERE a = 5;
\c - - - :worker_1_port
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute;
tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period
---------------------------------------------------------------------
1 | 0 | 0 | 1 | 0
5 | 0 | 0 | 1 | 0
(2 rows)
\c - - - :worker_2_port
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute;
tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period
---------------------------------------------------------------------
2 | 0 | 0 | 1 | 0
3 | 0 | 0 | 1 | 0
(2 rows)
\c - - - :master_port
SET search_path TO citus_stats_tenants;
SELECT result FROM run_command_on_all_nodes('ALTER SYSTEM SET citus.stats_tenants_period TO 3');
result
---------------------------------------------------------------------
ALTER SYSTEM
ALTER SYSTEM
ALTER SYSTEM
(3 rows)
SELECT result FROM run_command_on_all_nodes('SELECT pg_reload_conf()');
result
---------------------------------------------------------------------
t
t
t
(3 rows)
SELECT count(*)>=0 FROM dist_tbl WHERE a = 1;
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM dist_tbl WHERE a = 2;
?column?
---------------------------------------------------------------------
t
(1 row)
\c - - - :worker_1_port
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute;
tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period
---------------------------------------------------------------------
1 | 1 | 0 | 2 | 0
5 | 0 | 0 | 1 | 0
(2 rows)
\c - - - :worker_2_port
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute;
tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period
---------------------------------------------------------------------
2 | 1 | 0 | 2 | 0
3 | 0 | 0 | 1 | 0
(2 rows)
\c - - - :master_port
SELECT pg_sleep (3);
pg_sleep
---------------------------------------------------------------------
(1 row)
\c - - - :worker_1_port
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute;
tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period
---------------------------------------------------------------------
1 | 0 | 1 | 0 | 2
5 | 0 | 0 | 0 | 1
(2 rows)
\c - - - :worker_2_port
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute;
tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period
---------------------------------------------------------------------
2 | 0 | 1 | 0 | 2
3 | 0 | 0 | 0 | 1
(2 rows)
\c - - - :master_port
SET search_path TO citus_stats_tenants;
SELECT result FROM run_command_on_all_nodes('ALTER SYSTEM SET citus.stats_tenants_period TO 60');
result
---------------------------------------------------------------------
ALTER SYSTEM
ALTER SYSTEM
ALTER SYSTEM
(3 rows)
SELECT result FROM run_command_on_all_nodes('SELECT pg_reload_conf()');
result
---------------------------------------------------------------------
t
t
t
(3 rows)
-- queries with multiple tenants should not be counted
SELECT count(*)>=0 FROM dist_tbl WHERE a IN (1, 5);
?column?
---------------------------------------------------------------------
t
(1 row)
-- queries with reference tables should not be counted
SELECT count(*)>=0 FROM ref_tbl WHERE a = 1;
?column?
---------------------------------------------------------------------
t
(1 row)
\c - - - :worker_1_port
SELECT tenant_attribute, query_count_in_this_period FROM citus_stats_tenants ORDER BY tenant_attribute;
tenant_attribute | query_count_in_this_period
---------------------------------------------------------------------
1 | 0
5 | 0
(2 rows)
\c - - - :master_port
SET search_path TO citus_stats_tenants;
-- queries with multiple tables but one tenant should be counted
SELECT count(*)>=0 FROM dist_tbl, dist_tbl_2 WHERE dist_tbl.a = 1 AND dist_tbl_2.a = 1;
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM dist_tbl JOIN dist_tbl_2 ON dist_tbl.a = dist_tbl_2.a WHERE dist_tbl.a = 1;
?column?
---------------------------------------------------------------------
t
(1 row)
\c - - - :worker_1_port
SELECT tenant_attribute, query_count_in_this_period FROM citus_stats_tenants WHERE tenant_attribute = '1';
tenant_attribute | query_count_in_this_period
---------------------------------------------------------------------
1 | 2
(1 row)
\c - - - :master_port
SET search_path TO citus_stats_tenants;
-- test scoring
-- all of these distribution column values are from second worker
SELECT count(*)>=0 FROM dist_tbl WHERE a = 2;
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM dist_tbl WHERE a = 3;
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM dist_tbl WHERE a = 4;
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'abcd';
?column?
---------------------------------------------------------------------
t
(1 row)
\c - - - :worker_2_port
SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stats_tenants(true) ORDER BY score DESC;
tenant_attribute | query_count_in_this_period | score
---------------------------------------------------------------------
2 | 1 | 2000000000
3 | 1 | 1500000000
4 | 1 | 1500000000
abcd | 1 | 1000000000
(4 rows)
\c - - - :master_port
SET search_path TO citus_stats_tenants;
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'abcd';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'abcd';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde';
?column?
---------------------------------------------------------------------
t
(1 row)
\c - - - :worker_2_port
SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stats_tenants(true) ORDER BY score DESC;
tenant_attribute | query_count_in_this_period | score
---------------------------------------------------------------------
abcd | 3 | 3000000000
2 | 1 | 2000000000
3 | 1 | 1500000000
4 | 1 | 1500000000
bcde | 1 | 1000000000
(5 rows)
\c - - - :master_port
SET search_path TO citus_stats_tenants;
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'cdef';
?column?
---------------------------------------------------------------------
t
(1 row)
\c - - - :worker_2_port
SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stats_tenants(true) ORDER BY score DESC;
tenant_attribute | query_count_in_this_period | score
---------------------------------------------------------------------
abcd | 3 | 3000000000
bcde | 3 | 3000000000
2 | 1 | 2000000000
3 | 1 | 1500000000
(4 rows)
\c - - - :master_port
SET client_min_messages TO ERROR;
DROP SCHEMA citus_stats_tenants CASCADE;

View File

@ -1326,10 +1326,8 @@ SELECT * FROM multi_extension.print_extension_changes();
previous_object | current_object
---------------------------------------------------------------------
| function citus_stats_tenants(boolean) SETOF record
| function citus_stats_tenants_storage() SETOF record
| view citus_stats_tenants
| view citus_stats_tenants_storage
(4 rows)
(2 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version

View File

@ -122,7 +122,6 @@ ORDER BY 1;
function citus_stat_statements()
function citus_stat_statements_reset()
function citus_stats_tenants(boolean)
function citus_stats_tenants_storage()
function citus_table_is_visible(oid)
function citus_table_size(regclass)
function citus_task_wait(bigint,citus_task_status)
@ -319,8 +318,7 @@ ORDER BY 1;
view citus_stat_activity
view citus_stat_statements
view citus_stats_tenants
view citus_stats_tenants_storage
view pg_dist_shard_placement
view time_partitions
(314 rows)
(312 rows)

View File

@ -109,6 +109,11 @@ test: pg13_propagate_statistics
# ----------
test: citus_update_table_statistics
# ----------
# Test for tenant statistics
# ----------
test: citus_stats_tenants
# ----------
# Parallel TPC-H tests to check our distributed execution behavior
# ----------

View File

@ -485,6 +485,7 @@ push(@pgOptions, "citus.explain_analyze_sort_method='taskId'");
push(@pgOptions, "citus.enable_manual_changes_to_shards=on");
push(@pgOptions, "citus.allow_unsafe_locks_from_workers=on");
push(@pgOptions, "citus.stat_statements_track = 'all'");
push(@pgOptions, "citus.stats_tenants_limit = 2");
# Some tests look at shards in pg_class, make sure we can usually see them:
push(@pgOptions, "citus.show_shards_for_app_name_prefixes='pg_regress'");

View File

@ -0,0 +1,112 @@
CREATE SCHEMA citus_stats_tenants;
SET search_path TO citus_stats_tenants;
SET citus.next_shard_id TO 5797500;
SET citus.shard_replication_factor TO 1;
CREATE OR REPLACE FUNCTION pg_catalog.clean_citus_stats_tenants()
RETURNS VOID
LANGUAGE C
AS 'citus', $$clean_citus_stats_tenants$$;
SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()');
CREATE TABLE dist_tbl (a INT, b TEXT);
SELECT create_distributed_table('dist_tbl', 'a', shard_count:=4, colocate_with:='none');
CREATE TABLE dist_tbl_2 (a INT, b INT);
SELECT create_distributed_table('dist_tbl_2', 'a', colocate_with:='dist_tbl');
CREATE TABLE dist_tbl_text (a TEXT, b INT);
SELECT create_distributed_table('dist_tbl_text', 'a', shard_count:=4, colocate_with:='none');
CREATE TABLE ref_tbl (a INT, b INT);
SELECT create_reference_table('ref_tbl');
INSERT INTO dist_tbl VALUES (1, 'abcd');
INSERT INTO dist_tbl VALUES (2, 'abcd');
UPDATE dist_tbl SET b = a + 1 WHERE a = 3;
UPDATE dist_tbl SET b = a + 1 WHERE a = 4;
DELETE FROM dist_tbl WHERE a = 5;
\c - - - :worker_1_port
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute;
\c - - - :worker_2_port
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute;
\c - - - :master_port
SET search_path TO citus_stats_tenants;
SELECT result FROM run_command_on_all_nodes('ALTER SYSTEM SET citus.stats_tenants_period TO 3');
SELECT result FROM run_command_on_all_nodes('SELECT pg_reload_conf()');
SELECT count(*)>=0 FROM dist_tbl WHERE a = 1;
SELECT count(*)>=0 FROM dist_tbl WHERE a = 2;
\c - - - :worker_1_port
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute;
\c - - - :worker_2_port
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute;
\c - - - :master_port
SELECT pg_sleep (3);
\c - - - :worker_1_port
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute;
\c - - - :worker_2_port
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute;
\c - - - :master_port
SET search_path TO citus_stats_tenants;
SELECT result FROM run_command_on_all_nodes('ALTER SYSTEM SET citus.stats_tenants_period TO 60');
SELECT result FROM run_command_on_all_nodes('SELECT pg_reload_conf()');
-- queries with multiple tenants should not be counted
SELECT count(*)>=0 FROM dist_tbl WHERE a IN (1, 5);
-- queries with reference tables should not be counted
SELECT count(*)>=0 FROM ref_tbl WHERE a = 1;
\c - - - :worker_1_port
SELECT tenant_attribute, query_count_in_this_period FROM citus_stats_tenants ORDER BY tenant_attribute;
\c - - - :master_port
SET search_path TO citus_stats_tenants;
-- queries with multiple tables but one tenant should be counted
SELECT count(*)>=0 FROM dist_tbl, dist_tbl_2 WHERE dist_tbl.a = 1 AND dist_tbl_2.a = 1;
SELECT count(*)>=0 FROM dist_tbl JOIN dist_tbl_2 ON dist_tbl.a = dist_tbl_2.a WHERE dist_tbl.a = 1;
\c - - - :worker_1_port
SELECT tenant_attribute, query_count_in_this_period FROM citus_stats_tenants WHERE tenant_attribute = '1';
\c - - - :master_port
SET search_path TO citus_stats_tenants;
-- test scoring
-- all of these distribution column values are from second worker
SELECT count(*)>=0 FROM dist_tbl WHERE a = 2;
SELECT count(*)>=0 FROM dist_tbl WHERE a = 3;
SELECT count(*)>=0 FROM dist_tbl WHERE a = 4;
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'abcd';
\c - - - :worker_2_port
SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stats_tenants(true) ORDER BY score DESC;
\c - - - :master_port
SET search_path TO citus_stats_tenants;
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'abcd';
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'abcd';
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde';
\c - - - :worker_2_port
SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stats_tenants(true) ORDER BY score DESC;
\c - - - :master_port
SET search_path TO citus_stats_tenants;
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde';
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde';
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'cdef';
\c - - - :worker_2_port
SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stats_tenants(true) ORDER BY score DESC;
\c - - - :master_port
SET client_min_messages TO ERROR;
DROP SCHEMA citus_stats_tenants CASCADE;