mirror of https://github.com/citusdata/citus.git
Make citus_stat_tenants work with schema-based tenants. (#6936)
DESCRIPTION: Enabling citus_stat_tenants to support schema-based tenants. This pull request modifies the existing logic to enable tenant monitoring with schema-based tenants. The changes made are as follows: - If a query has a partitionKeyValue (which serves as a tenant key/identifier for distributed tables), Citus annotates the query with both the partitionKeyValue and colocationId. This allows for accurate tracking of the query. - If a query does not have a partitionKeyValue, but its colocationId belongs to a distributed schema, Citus annotates the query with only the colocationId. The tenant monitor can then easily look up the schema to determine if it's a distributed schema and make a decision on whether to track the query. --------- Co-authored-by: Jelte Fennema <jelte.fennema@microsoft.com>pull/6957/head
parent
5acbd735ca
commit
e0ccd155ab
|
@ -896,12 +896,6 @@ SetJobColocationId(Job *job)
|
|||
{
|
||||
uint32 jobColocationId = INVALID_COLOCATION_ID;
|
||||
|
||||
if (!job->partitionKeyValue)
|
||||
{
|
||||
/* if the Job has no shard key, nothing to do */
|
||||
return;
|
||||
}
|
||||
|
||||
List *rangeTableList = ExtractRangeTableEntryList(job->jobQuery);
|
||||
ListCell *rangeTableCell = NULL;
|
||||
foreach(rangeTableCell, rangeTableList)
|
||||
|
|
|
@ -84,6 +84,7 @@
|
|||
#include "distributed/commands/utility_hook.h"
|
||||
#include "distributed/citus_custom_scan.h"
|
||||
#include "distributed/citus_ruleutils.h"
|
||||
#include "distributed/colocation_utils.h"
|
||||
#include "distributed/query_utils.h"
|
||||
#include "distributed/deparse_shard_query.h"
|
||||
#include "distributed/listutils.h"
|
||||
|
@ -382,13 +383,12 @@ ExecuteLocalTaskListExtended(List *taskList,
|
|||
|
||||
/*
|
||||
* SetColocationIdAndPartitionKeyValueForTasks sets colocationId and partitionKeyValue
|
||||
* for the tasks in the taskList if workerJob has a colocationId and partitionKeyValue.
|
||||
* for the tasks in the taskList.
|
||||
*/
|
||||
static void
|
||||
SetColocationIdAndPartitionKeyValueForTasks(List *taskList, Job *workerJob)
|
||||
{
|
||||
if (workerJob->colocationId != 0 &&
|
||||
workerJob->partitionKeyValue != NULL)
|
||||
if (workerJob->colocationId != INVALID_COLOCATION_ID)
|
||||
{
|
||||
Task *task = NULL;
|
||||
foreach_ptr(task, taskList)
|
||||
|
|
|
@ -27,6 +27,9 @@ GRANT SELECT ON pg_catalog.pg_dist_tenant_schema TO public;
|
|||
#include "udfs/citus_tables/12.0-1.sql"
|
||||
#include "udfs/citus_shards/12.0-1.sql"
|
||||
|
||||
-- udfs used to include schema-based tenants in tenant monitoring
|
||||
#include "udfs/citus_stat_tenants_local/12.0-1.sql"
|
||||
|
||||
-- udfs to convert a regular/tenant schema to a tenant/regular schema
|
||||
#include "udfs/citus_schema_distribute/12.0-1.sql"
|
||||
#include "udfs/citus_schema_undistribute/12.0-1.sql"
|
||||
|
|
|
@ -47,3 +47,17 @@ DROP FUNCTION pg_catalog.citus_internal_unregister_tenant_schema_globally(Oid, t
|
|||
#include "../udfs/citus_shards/11.1-1.sql"
|
||||
|
||||
DROP TABLE pg_catalog.pg_dist_tenant_schema;
|
||||
|
||||
DROP VIEW pg_catalog.citus_stat_tenants_local;
|
||||
DROP FUNCTION pg_catalog.citus_stat_tenants_local_internal(
|
||||
BOOLEAN,
|
||||
OUT INT,
|
||||
OUT TEXT,
|
||||
OUT INT,
|
||||
OUT INT,
|
||||
OUT INT,
|
||||
OUT INT,
|
||||
OUT DOUBLE PRECISION,
|
||||
OUT DOUBLE PRECISION,
|
||||
OUT BIGINT);
|
||||
#include "../udfs/citus_stat_tenants_local/11.3-1.sql"
|
||||
|
|
|
@ -0,0 +1,69 @@
|
|||
CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_tenants_local_internal(
|
||||
return_all_tenants BOOLEAN DEFAULT FALSE,
|
||||
OUT colocation_id INT,
|
||||
OUT tenant_attribute TEXT,
|
||||
OUT read_count_in_this_period INT,
|
||||
OUT read_count_in_last_period INT,
|
||||
OUT query_count_in_this_period INT,
|
||||
OUT query_count_in_last_period INT,
|
||||
OUT cpu_usage_in_this_period DOUBLE PRECISION,
|
||||
OUT cpu_usage_in_last_period DOUBLE PRECISION,
|
||||
OUT score BIGINT)
|
||||
RETURNS SETOF RECORD
|
||||
LANGUAGE C
|
||||
AS 'citus', $$citus_stat_tenants_local$$;
|
||||
|
||||
CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_tenants_local(
|
||||
return_all_tenants BOOLEAN DEFAULT FALSE,
|
||||
OUT colocation_id INT,
|
||||
OUT tenant_attribute TEXT,
|
||||
OUT read_count_in_this_period INT,
|
||||
OUT read_count_in_last_period INT,
|
||||
OUT query_count_in_this_period INT,
|
||||
OUT query_count_in_last_period INT,
|
||||
OUT cpu_usage_in_this_period DOUBLE PRECISION,
|
||||
OUT cpu_usage_in_last_period DOUBLE PRECISION,
|
||||
OUT score BIGINT)
|
||||
RETURNS SETOF RECORD
|
||||
LANGUAGE plpgsql
|
||||
AS $function$
|
||||
BEGIN
|
||||
RETURN QUERY
|
||||
SELECT
|
||||
L.colocation_id,
|
||||
CASE WHEN L.tenant_attribute IS NULL THEN N.nspname ELSE L.tenant_attribute END COLLATE "default" as tenant_attribute,
|
||||
L.read_count_in_this_period,
|
||||
L.read_count_in_last_period,
|
||||
L.query_count_in_this_period,
|
||||
L.query_count_in_last_period,
|
||||
L.cpu_usage_in_this_period,
|
||||
L.cpu_usage_in_last_period,
|
||||
L.score
|
||||
FROM pg_catalog.citus_stat_tenants_local_internal(return_all_tenants) L
|
||||
LEFT JOIN pg_dist_tenant_schema S ON L.tenant_attribute IS NULL AND L.colocation_id = S.colocationid
|
||||
LEFT JOIN pg_namespace N ON N.oid = S.schemaid
|
||||
ORDER BY L.score DESC;
|
||||
END;
|
||||
$function$;
|
||||
|
||||
CREATE OR REPLACE VIEW pg_catalog.citus_stat_tenants_local AS
|
||||
SELECT
|
||||
colocation_id,
|
||||
tenant_attribute,
|
||||
read_count_in_this_period,
|
||||
read_count_in_last_period,
|
||||
query_count_in_this_period,
|
||||
query_count_in_last_period,
|
||||
cpu_usage_in_this_period,
|
||||
cpu_usage_in_last_period
|
||||
FROM pg_catalog.citus_stat_tenants_local()
|
||||
ORDER BY score DESC;
|
||||
|
||||
REVOKE ALL ON FUNCTION pg_catalog.citus_stat_tenants_local_internal(BOOLEAN) FROM PUBLIC;
|
||||
GRANT EXECUTE ON FUNCTION pg_catalog.citus_stat_tenants_local_internal(BOOLEAN) TO pg_monitor;
|
||||
|
||||
REVOKE ALL ON FUNCTION pg_catalog.citus_stat_tenants_local(BOOLEAN) FROM PUBLIC;
|
||||
GRANT EXECUTE ON FUNCTION pg_catalog.citus_stat_tenants_local(BOOLEAN) TO pg_monitor;
|
||||
|
||||
REVOKE ALL ON pg_catalog.citus_stat_tenants_local FROM PUBLIC;
|
||||
GRANT SELECT ON pg_catalog.citus_stat_tenants_local TO pg_monitor;
|
|
@ -1,4 +1,4 @@
|
|||
CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_tenants_local(
|
||||
CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_tenants_local_internal(
|
||||
return_all_tenants BOOLEAN DEFAULT FALSE,
|
||||
OUT colocation_id INT,
|
||||
OUT tenant_attribute TEXT,
|
||||
|
@ -13,8 +13,40 @@ RETURNS SETOF RECORD
|
|||
LANGUAGE C
|
||||
AS 'citus', $$citus_stat_tenants_local$$;
|
||||
|
||||
CREATE OR REPLACE FUNCTION pg_catalog.citus_stat_tenants_local(
|
||||
return_all_tenants BOOLEAN DEFAULT FALSE,
|
||||
OUT colocation_id INT,
|
||||
OUT tenant_attribute TEXT,
|
||||
OUT read_count_in_this_period INT,
|
||||
OUT read_count_in_last_period INT,
|
||||
OUT query_count_in_this_period INT,
|
||||
OUT query_count_in_last_period INT,
|
||||
OUT cpu_usage_in_this_period DOUBLE PRECISION,
|
||||
OUT cpu_usage_in_last_period DOUBLE PRECISION,
|
||||
OUT score BIGINT)
|
||||
RETURNS SETOF RECORD
|
||||
LANGUAGE plpgsql
|
||||
AS $function$
|
||||
BEGIN
|
||||
RETURN QUERY
|
||||
SELECT
|
||||
L.colocation_id,
|
||||
CASE WHEN L.tenant_attribute IS NULL THEN N.nspname ELSE L.tenant_attribute END COLLATE "default" as tenant_attribute,
|
||||
L.read_count_in_this_period,
|
||||
L.read_count_in_last_period,
|
||||
L.query_count_in_this_period,
|
||||
L.query_count_in_last_period,
|
||||
L.cpu_usage_in_this_period,
|
||||
L.cpu_usage_in_last_period,
|
||||
L.score
|
||||
FROM pg_catalog.citus_stat_tenants_local_internal(return_all_tenants) L
|
||||
LEFT JOIN pg_dist_tenant_schema S ON L.tenant_attribute IS NULL AND L.colocation_id = S.colocationid
|
||||
LEFT JOIN pg_namespace N ON N.oid = S.schemaid
|
||||
ORDER BY L.score DESC;
|
||||
END;
|
||||
$function$;
|
||||
|
||||
CREATE OR REPLACE VIEW citus.citus_stat_tenants_local AS
|
||||
CREATE OR REPLACE VIEW pg_catalog.citus_stat_tenants_local AS
|
||||
SELECT
|
||||
colocation_id,
|
||||
tenant_attribute,
|
||||
|
@ -27,7 +59,8 @@ SELECT
|
|||
FROM pg_catalog.citus_stat_tenants_local()
|
||||
ORDER BY score DESC;
|
||||
|
||||
ALTER VIEW citus.citus_stat_tenants_local SET SCHEMA pg_catalog;
|
||||
REVOKE ALL ON FUNCTION pg_catalog.citus_stat_tenants_local_internal(BOOLEAN) FROM PUBLIC;
|
||||
GRANT EXECUTE ON FUNCTION pg_catalog.citus_stat_tenants_local_internal(BOOLEAN) TO pg_monitor;
|
||||
|
||||
REVOKE ALL ON FUNCTION pg_catalog.citus_stat_tenants_local(BOOLEAN) FROM PUBLIC;
|
||||
GRANT EXECUTE ON FUNCTION pg_catalog.citus_stat_tenants_local(BOOLEAN) TO pg_monitor;
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
#include "distributed/listutils.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/multi_executor.h"
|
||||
#include "distributed/tenant_schema_metadata.h"
|
||||
#include "distributed/tuplestore.h"
|
||||
#include "distributed/utils/citus_stat_tenants.h"
|
||||
#include "executor/execdesc.h"
|
||||
|
@ -30,7 +31,8 @@
|
|||
#include "utils/builtins.h"
|
||||
#include "utils/datetime.h"
|
||||
#include "utils/json.h"
|
||||
|
||||
#include "utils/lsyscache.h"
|
||||
#include "utils/syscache.h"
|
||||
|
||||
#include <time.h>
|
||||
|
||||
|
@ -38,8 +40,9 @@ static void AttributeMetricsIfApplicable(void);
|
|||
|
||||
ExecutorEnd_hook_type prev_ExecutorEnd = NULL;
|
||||
|
||||
#define ATTRIBUTE_PREFIX "/*{\"tId\":"
|
||||
#define ATTRIBUTE_STRING_FORMAT "/*{\"tId\":%s,\"cId\":%d}*/"
|
||||
#define ATTRIBUTE_PREFIX "/*{\"cId\":"
|
||||
#define ATTRIBUTE_STRING_FORMAT "/*{\"cId\":%d,\"tId\":%s}*/"
|
||||
#define ATTRIBUTE_STRING_FORMAT_WITHOUT_TID "/*{\"cId\":%d}*/"
|
||||
#define STAT_TENANTS_COLUMNS 9
|
||||
#define ONE_QUERY_SCORE 1000000000
|
||||
|
||||
|
@ -155,7 +158,17 @@ citus_stat_tenants_local(PG_FUNCTION_ARGS)
|
|||
TenantStats *tenantStats = stats[i];
|
||||
|
||||
values[0] = Int32GetDatum(tenantStats->key.colocationGroupId);
|
||||
values[1] = PointerGetDatum(cstring_to_text(tenantStats->key.tenantAttribute));
|
||||
|
||||
if (tenantStats->key.tenantAttribute[0] == '\0')
|
||||
{
|
||||
isNulls[1] = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
values[1] = PointerGetDatum(cstring_to_text(
|
||||
tenantStats->key.tenantAttribute));
|
||||
}
|
||||
|
||||
values[2] = Int32GetDatum(tenantStats->readsInThisPeriod);
|
||||
values[3] = Int32GetDatum(tenantStats->readsInLastPeriod);
|
||||
values[4] = Int32GetDatum(tenantStats->readsInThisPeriod +
|
||||
|
@ -221,7 +234,7 @@ AttributeQueryIfAnnotated(const char *query_string, CmdType commandType)
|
|||
return;
|
||||
}
|
||||
|
||||
strcpy_s(AttributeToTenant, sizeof(AttributeToTenant), "");
|
||||
AttributeToColocationGroupId = INVALID_COLOCATION_ID;
|
||||
|
||||
if (query_string == NULL)
|
||||
{
|
||||
|
@ -258,7 +271,7 @@ void
|
|||
AttributeTask(char *tenantId, int colocationId, CmdType commandType)
|
||||
{
|
||||
if (StatTenantsTrack == STAT_TENANTS_TRACK_NONE ||
|
||||
tenantId == NULL || colocationId == INVALID_COLOCATION_ID)
|
||||
colocationId == INVALID_COLOCATION_ID)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
@ -281,9 +294,28 @@ AttributeTask(char *tenantId, int colocationId, CmdType commandType)
|
|||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* if tenantId is NULL, it must be a schema-based tenant and
|
||||
* we try to get the tenantId from the colocationId to lookup schema name and use it as a tenantId
|
||||
*/
|
||||
if (tenantId == NULL)
|
||||
{
|
||||
if (!IsTenantSchemaColocationGroup(colocationId))
|
||||
{
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
AttributeToColocationGroupId = colocationId;
|
||||
strncpy_s(AttributeToTenant, MAX_TENANT_ATTRIBUTE_LENGTH, tenantId,
|
||||
MAX_TENANT_ATTRIBUTE_LENGTH - 1);
|
||||
if (tenantId != NULL)
|
||||
{
|
||||
strncpy_s(AttributeToTenant, MAX_TENANT_ATTRIBUTE_LENGTH, tenantId,
|
||||
MAX_TENANT_ATTRIBUTE_LENGTH - 1);
|
||||
}
|
||||
else
|
||||
{
|
||||
strcpy_s(AttributeToTenant, sizeof(AttributeToTenant), "");
|
||||
}
|
||||
AttributeToCommandType = commandType;
|
||||
QueryStartClock = clock();
|
||||
}
|
||||
|
@ -291,26 +323,51 @@ AttributeTask(char *tenantId, int colocationId, CmdType commandType)
|
|||
|
||||
/*
|
||||
* AnnotateQuery annotates the query with tenant attributes.
|
||||
* if the query has a partition key, we annotate it with the partition key value and colocationId
|
||||
* if the query doesn't have a partition key and if it's a schema-based tenant, we annotate it with the colocationId only.
|
||||
*/
|
||||
char *
|
||||
AnnotateQuery(char *queryString, Const *partitionKeyValue, int colocationId)
|
||||
{
|
||||
if (StatTenantsTrack == STAT_TENANTS_TRACK_NONE || partitionKeyValue == NULL)
|
||||
if (StatTenantsTrack == STAT_TENANTS_TRACK_NONE ||
|
||||
colocationId == INVALID_COLOCATION_ID)
|
||||
{
|
||||
return queryString;
|
||||
}
|
||||
|
||||
char *partitionKeyValueString = DatumToString(partitionKeyValue->constvalue,
|
||||
partitionKeyValue->consttype);
|
||||
|
||||
char *commentCharsEscaped = EscapeCommentChars(partitionKeyValueString);
|
||||
StringInfo escapedSourceName = makeStringInfo();
|
||||
|
||||
escape_json(escapedSourceName, commentCharsEscaped);
|
||||
|
||||
StringInfo newQuery = makeStringInfo();
|
||||
appendStringInfo(newQuery, ATTRIBUTE_STRING_FORMAT, escapedSourceName->data,
|
||||
colocationId);
|
||||
|
||||
/* if the query doesn't have a parititon key value, check if it is a tenant schema */
|
||||
if (partitionKeyValue == NULL)
|
||||
{
|
||||
if (IsTenantSchemaColocationGroup(colocationId))
|
||||
{
|
||||
/* If it is a schema-based tenant, we only annotate the query with colocationId */
|
||||
appendStringInfo(newQuery, ATTRIBUTE_STRING_FORMAT_WITHOUT_TID,
|
||||
colocationId);
|
||||
}
|
||||
else
|
||||
{
|
||||
/* If it is not a schema-based tenant query and doesn't have a parititon key,
|
||||
* we don't annotate it
|
||||
*/
|
||||
return queryString;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/* if the query has a partition key value, we annotate it with both tenantId and colocationId */
|
||||
char *partitionKeyValueString = DatumToString(partitionKeyValue->constvalue,
|
||||
partitionKeyValue->consttype);
|
||||
|
||||
char *commentCharsEscaped = EscapeCommentChars(partitionKeyValueString);
|
||||
StringInfo escapedSourceName = makeStringInfo();
|
||||
escape_json(escapedSourceName, commentCharsEscaped);
|
||||
|
||||
appendStringInfo(newQuery, ATTRIBUTE_STRING_FORMAT, colocationId,
|
||||
escapedSourceName->data
|
||||
);
|
||||
}
|
||||
|
||||
appendStringInfoString(newQuery, queryString);
|
||||
|
||||
|
@ -372,7 +429,7 @@ static void
|
|||
AttributeMetricsIfApplicable()
|
||||
{
|
||||
if (StatTenantsTrack == STAT_TENANTS_TRACK_NONE ||
|
||||
AttributeToTenant[0] == '\0')
|
||||
AttributeToColocationGroupId == INVALID_COLOCATION_ID)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
@ -449,7 +506,7 @@ AttributeMetricsIfApplicable()
|
|||
}
|
||||
LWLockRelease(&monitor->lock);
|
||||
|
||||
strcpy_s(AttributeToTenant, sizeof(AttributeToTenant), "");
|
||||
AttributeToColocationGroupId = INVALID_COLOCATION_ID;
|
||||
}
|
||||
|
||||
|
||||
|
@ -761,7 +818,12 @@ FillTenantStatsHashKey(TenantStatsHashKey *key, char *tenantAttribute, uint32
|
|||
colocationGroupId)
|
||||
{
|
||||
memset(key->tenantAttribute, 0, MAX_TENANT_ATTRIBUTE_LENGTH);
|
||||
strlcpy(key->tenantAttribute, tenantAttribute, MAX_TENANT_ATTRIBUTE_LENGTH);
|
||||
|
||||
if (tenantAttribute != NULL)
|
||||
{
|
||||
strlcpy(key->tenantAttribute, tenantAttribute, MAX_TENANT_ATTRIBUTE_LENGTH);
|
||||
}
|
||||
|
||||
key->colocationGroupId = colocationGroupId;
|
||||
}
|
||||
|
||||
|
|
|
@ -24,9 +24,6 @@
|
|||
#include "utils/fmgroids.h"
|
||||
|
||||
|
||||
static Oid ColocationIdGetTenantSchemaId(uint32 colocationId);
|
||||
|
||||
|
||||
/*
|
||||
* IsTenantSchema returns true if there is a tenant schema with given schemaId.
|
||||
*/
|
||||
|
@ -125,7 +122,7 @@ SchemaIdGetTenantColocationId(Oid schemaId)
|
|||
*
|
||||
* Returns InvalidOid if there is no such tenant schema.
|
||||
*/
|
||||
static Oid
|
||||
Oid
|
||||
ColocationIdGetTenantSchemaId(uint32 colocationId)
|
||||
{
|
||||
if (colocationId == INVALID_COLOCATION_ID)
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
#include "postgres.h"
|
||||
|
||||
/* accessors */
|
||||
extern Oid ColocationIdGetTenantSchemaId(uint32 colocationId);
|
||||
extern uint32 SchemaIdGetTenantColocationId(Oid schemaId);
|
||||
extern bool IsTenantSchema(Oid schemaId);
|
||||
extern bool IsTenantSchemaColocationGroup(uint32 colocationId);
|
||||
|
|
|
@ -309,7 +309,7 @@ s/(NOTICE: issuing SET LOCAL application_name TO 'citus_rebalancer gpid=)[0-9]+
|
|||
# shard_rebalancer output, flaky improvement number
|
||||
s/improvement of 0.1[0-9]* is lower/improvement of 0.1xxxxx is lower/g
|
||||
# normalize tenants statistics annotations
|
||||
s/\/\*\{"tId":.*\*\///g
|
||||
s/\/\*\{"cId":.*\*\///g
|
||||
|
||||
# Notice message that contains current columnar version that makes it harder to bump versions
|
||||
s/(NOTICE: issuing CREATE EXTENSION IF NOT EXISTS citus_columnar WITH SCHEMA pg_catalog VERSION )"[0-9]+\.[0-9]+-[0-9]+"/\1 "x.y-z"/
|
||||
|
|
|
@ -167,12 +167,5 @@ select count(*) from trips t1, cars r1, trips t2, cars r2 where t1.trip_id = t2.
|
|||
829
|
||||
(1 row)
|
||||
|
||||
SET client_min_messages TO WARNING;
|
||||
DROP SCHEMA adaptive_executor CASCADE;
|
||||
NOTICE: drop cascades to 7 other objects
|
||||
DETAIL: drop cascades to table ab
|
||||
drop cascades to table single_hash_repartition_first
|
||||
drop cascades to table single_hash_repartition_second
|
||||
drop cascades to table ref_table
|
||||
drop cascades to table ref_table_361397
|
||||
drop cascades to table cars
|
||||
drop cascades to table trips
|
||||
|
|
|
@ -881,5 +881,133 @@ SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants ORDE
|
|||
äbc | 11
|
||||
(2 rows)
|
||||
|
||||
-- single shard distributed table, which is not part of a tenant schema
|
||||
SELECT citus_stat_tenants_reset();
|
||||
citus_stat_tenants_reset
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE TABLE dist_tbl_text_single_shard(a text, b int);
|
||||
select create_distributed_table('dist_tbl_text_single_shard', NULL);
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO dist_tbl_text_single_shard VALUES ('/b*c/de', 1);
|
||||
SELECT count(*)>=0 FROM dist_tbl_text_single_shard WHERE a = '/b*c/de';
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
DELETE FROM dist_tbl_text_single_shard WHERE a = '/b*c/de';
|
||||
UPDATE dist_tbl_text_single_shard SET b = 1 WHERE a = '/b*c/de';
|
||||
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants;
|
||||
tenant_attribute | query_count_in_this_period
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
-- schema based tenants
|
||||
SELECT citus_stat_tenants_reset();
|
||||
citus_stat_tenants_reset
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SET citus.enable_schema_based_sharding TO ON;
|
||||
CREATE SCHEMA citus_stat_tenants_t1;
|
||||
CREATE TABLE citus_stat_tenants_t1.users(id int);
|
||||
SELECT id FROM citus_stat_tenants_t1.users WHERE id = 2;
|
||||
id
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
INSERT INTO citus_stat_tenants_t1.users VALUES (1);
|
||||
UPDATE citus_stat_tenants_t1.users SET id = 2 WHERE id = 1;
|
||||
DELETE FROM citus_stat_tenants_t1.users WHERE id = 2;
|
||||
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants ORDER BY tenant_attribute;
|
||||
tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period
|
||||
---------------------------------------------------------------------
|
||||
citus_stat_tenants_t1 | 1 | 0 | 4 | 0
|
||||
(1 row)
|
||||
|
||||
SELECT citus_stat_tenants_reset();
|
||||
citus_stat_tenants_reset
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
PREPARE schema_tenant_insert_plan (int) AS insert into citus_stat_tenants_t1.users values ($1);
|
||||
EXECUTE schema_tenant_insert_plan(1);
|
||||
PREPARE schema_tenant_select_plan (int) AS SELECT count(*) > 1 FROM citus_stat_tenants_t1.users where Id = $1;
|
||||
EXECUTE schema_tenant_select_plan(1);
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
f
|
||||
(1 row)
|
||||
|
||||
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants ORDER BY tenant_attribute;
|
||||
tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period
|
||||
---------------------------------------------------------------------
|
||||
citus_stat_tenants_t1 | 1 | 0 | 2 | 0
|
||||
(1 row)
|
||||
|
||||
SELECT citus_stat_tenants_reset();
|
||||
citus_stat_tenants_reset
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- local execution & prepared statements
|
||||
\c - - - :worker_2_port
|
||||
SET search_path TO citus_stat_tenants;
|
||||
PREPARE schema_tenant_insert_plan (int) AS insert into citus_stat_tenants_t1.users values ($1);
|
||||
EXECUTE schema_tenant_insert_plan(1);
|
||||
EXECUTE schema_tenant_insert_plan(1);
|
||||
EXECUTE schema_tenant_insert_plan(1);
|
||||
EXECUTE schema_tenant_insert_plan(1);
|
||||
EXECUTE schema_tenant_insert_plan(1);
|
||||
PREPARE schema_tenant_select_plan (int) AS SELECT count(*) > 1 FROM citus_stat_tenants_t1.users where Id = $1;
|
||||
EXECUTE schema_tenant_select_plan(1);
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
EXECUTE schema_tenant_select_plan(1);
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
EXECUTE schema_tenant_select_plan(1);
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
EXECUTE schema_tenant_select_plan(1);
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
EXECUTE schema_tenant_select_plan(1);
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants ORDER BY tenant_attribute;
|
||||
tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period
|
||||
---------------------------------------------------------------------
|
||||
citus_stat_tenants_t1 | 5 | 0 | 10 | 0
|
||||
(1 row)
|
||||
|
||||
\c - - - :master_port
|
||||
SET search_path TO citus_stat_tenants;
|
||||
SET client_min_messages TO ERROR;
|
||||
DROP SCHEMA citus_stat_tenants CASCADE;
|
||||
DROP SCHEMA citus_stat_tenants_t1 CASCADE;
|
||||
|
|
|
@ -1364,8 +1364,9 @@ SELECT * FROM multi_extension.print_extension_changes();
|
|||
| function citus_internal_unregister_tenant_schema_globally(oid,text) void
|
||||
| function citus_schema_distribute(regnamespace) void
|
||||
| function citus_schema_undistribute(regnamespace) void
|
||||
| function citus_stat_tenants_local_internal(boolean) SETOF record
|
||||
| table pg_dist_tenant_schema
|
||||
(6 rows)
|
||||
(7 rows)
|
||||
|
||||
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
|
||||
-- show running version
|
||||
|
|
|
@ -132,6 +132,7 @@ ORDER BY 1;
|
|||
function citus_stat_statements_reset()
|
||||
function citus_stat_tenants(boolean)
|
||||
function citus_stat_tenants_local(boolean)
|
||||
function citus_stat_tenants_local_internal(boolean)
|
||||
function citus_stat_tenants_local_reset()
|
||||
function citus_stat_tenants_reset()
|
||||
function citus_table_is_visible(oid)
|
||||
|
@ -336,5 +337,5 @@ ORDER BY 1;
|
|||
view citus_stat_tenants_local
|
||||
view pg_dist_shard_placement
|
||||
view time_partitions
|
||||
(328 rows)
|
||||
(329 rows)
|
||||
|
||||
|
|
|
@ -79,4 +79,5 @@ select count(*) from trips t1, cars r1, trips t2, cars r2 where t1.trip_id = t2.
|
|||
set citus.enable_single_hash_repartition_joins to on;
|
||||
select count(*) from trips t1, cars r1, trips t2, cars r2 where t1.trip_id = t2.trip_id and t1.car_id = r1.car_id and t2.car_id = r2.car_id;
|
||||
|
||||
SET client_min_messages TO WARNING;
|
||||
DROP SCHEMA adaptive_executor CASCADE;
|
||||
|
|
|
@ -313,5 +313,69 @@ SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = U&'\0061\0308bc
|
|||
|
||||
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants ORDER BY tenant_attribute;
|
||||
|
||||
-- single shard distributed table, which is not part of a tenant schema
|
||||
SELECT citus_stat_tenants_reset();
|
||||
|
||||
CREATE TABLE dist_tbl_text_single_shard(a text, b int);
|
||||
select create_distributed_table('dist_tbl_text_single_shard', NULL);
|
||||
|
||||
INSERT INTO dist_tbl_text_single_shard VALUES ('/b*c/de', 1);
|
||||
SELECT count(*)>=0 FROM dist_tbl_text_single_shard WHERE a = '/b*c/de';
|
||||
DELETE FROM dist_tbl_text_single_shard WHERE a = '/b*c/de';
|
||||
UPDATE dist_tbl_text_single_shard SET b = 1 WHERE a = '/b*c/de';
|
||||
|
||||
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants;
|
||||
|
||||
-- schema based tenants
|
||||
SELECT citus_stat_tenants_reset();
|
||||
|
||||
SET citus.enable_schema_based_sharding TO ON;
|
||||
|
||||
CREATE SCHEMA citus_stat_tenants_t1;
|
||||
CREATE TABLE citus_stat_tenants_t1.users(id int);
|
||||
|
||||
SELECT id FROM citus_stat_tenants_t1.users WHERE id = 2;
|
||||
INSERT INTO citus_stat_tenants_t1.users VALUES (1);
|
||||
UPDATE citus_stat_tenants_t1.users SET id = 2 WHERE id = 1;
|
||||
DELETE FROM citus_stat_tenants_t1.users WHERE id = 2;
|
||||
|
||||
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants ORDER BY tenant_attribute;
|
||||
|
||||
SELECT citus_stat_tenants_reset();
|
||||
|
||||
PREPARE schema_tenant_insert_plan (int) AS insert into citus_stat_tenants_t1.users values ($1);
|
||||
EXECUTE schema_tenant_insert_plan(1);
|
||||
|
||||
PREPARE schema_tenant_select_plan (int) AS SELECT count(*) > 1 FROM citus_stat_tenants_t1.users where Id = $1;
|
||||
EXECUTE schema_tenant_select_plan(1);
|
||||
|
||||
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants ORDER BY tenant_attribute;
|
||||
|
||||
SELECT citus_stat_tenants_reset();
|
||||
|
||||
-- local execution & prepared statements
|
||||
\c - - - :worker_2_port
|
||||
SET search_path TO citus_stat_tenants;
|
||||
|
||||
PREPARE schema_tenant_insert_plan (int) AS insert into citus_stat_tenants_t1.users values ($1);
|
||||
EXECUTE schema_tenant_insert_plan(1);
|
||||
EXECUTE schema_tenant_insert_plan(1);
|
||||
EXECUTE schema_tenant_insert_plan(1);
|
||||
EXECUTE schema_tenant_insert_plan(1);
|
||||
EXECUTE schema_tenant_insert_plan(1);
|
||||
|
||||
PREPARE schema_tenant_select_plan (int) AS SELECT count(*) > 1 FROM citus_stat_tenants_t1.users where Id = $1;
|
||||
EXECUTE schema_tenant_select_plan(1);
|
||||
EXECUTE schema_tenant_select_plan(1);
|
||||
EXECUTE schema_tenant_select_plan(1);
|
||||
EXECUTE schema_tenant_select_plan(1);
|
||||
EXECUTE schema_tenant_select_plan(1);
|
||||
|
||||
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants ORDER BY tenant_attribute;
|
||||
|
||||
\c - - - :master_port
|
||||
SET search_path TO citus_stat_tenants;
|
||||
|
||||
SET client_min_messages TO ERROR;
|
||||
DROP SCHEMA citus_stat_tenants CASCADE;
|
||||
DROP SCHEMA citus_stat_tenants_t1 CASCADE;
|
||||
|
|
Loading…
Reference in New Issue