mirror of https://github.com/citusdata/citus.git
Ensure partitionKeyValue and colocationId are set for proper tenant stats gathering (#6834)
This PR updates the tenant stats implementation to set partitionKeyValue and colocationId in ExecuteLocalTaskListExtended, in addition to LocallyExecuteTaskPlan. This ensures that tenant stats can be properly gathered regardless of the code path taken. The changes were initially made while testing stored procedure calls for tenant stats.test_br
parent
f87a2d02b0
commit
8782ea1582
|
@ -129,6 +129,8 @@ static void LogLocalCommand(Task *task);
|
|||
static uint64 LocallyPlanAndExecuteMultipleQueries(List *queryStrings,
|
||||
TupleDestination *tupleDest,
|
||||
Task *task);
|
||||
static void SetColocationIdAndPartitionKeyValueForTasks(List *taskList,
|
||||
Job *distributedPlan);
|
||||
static void LocallyExecuteUtilityTask(Task *task);
|
||||
static void ExecuteUdfTaskQuery(Query *localUdfCommandQuery);
|
||||
static void EnsureTransitionPossible(LocalExecutionStatus from,
|
||||
|
@ -228,6 +230,17 @@ ExecuteLocalTaskListExtended(List *taskList,
|
|||
EnsureTaskExecutionAllowed(isRemote);
|
||||
}
|
||||
|
||||
/*
|
||||
* If workerJob has a partitionKeyValue, we need to set the colocation id
|
||||
* and partition key value for each task before we start executing them
|
||||
* because tenant stats are collected based on these values of a task.
|
||||
*/
|
||||
if (distributedPlan != NULL && distributedPlan->workerJob != NULL && taskList != NIL)
|
||||
{
|
||||
SetJobColocationId(distributedPlan->workerJob);
|
||||
SetColocationIdAndPartitionKeyValueForTasks(taskList, distributedPlan->workerJob);
|
||||
}
|
||||
|
||||
/*
|
||||
* Use a new memory context that gets reset after every task to free
|
||||
* the deparsed query string and query plan.
|
||||
|
@ -367,6 +380,26 @@ ExecuteLocalTaskListExtended(List *taskList,
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* SetColocationIdAndPartitionKeyValueForTasks sets colocationId and partitionKeyValue
|
||||
* for the tasks in the taskList if workerJob has a colocationId and partitionKeyValue.
|
||||
*/
|
||||
static void
|
||||
SetColocationIdAndPartitionKeyValueForTasks(List *taskList, Job *workerJob)
|
||||
{
|
||||
if (workerJob->colocationId != 0 &&
|
||||
workerJob->partitionKeyValue != NULL)
|
||||
{
|
||||
Task *task = NULL;
|
||||
foreach_ptr(task, taskList)
|
||||
{
|
||||
task->colocationId = workerJob->colocationId;
|
||||
task->partitionKeyValue = workerJob->partitionKeyValue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* LocallyPlanAndExecuteMultipleQueries plans and executes the given query strings
|
||||
* one by one.
|
||||
|
|
|
@ -255,7 +255,7 @@ ORDER BY tenant_attribute;
|
|||
(2 rows)
|
||||
|
||||
-- simulate passing the period
|
||||
SET citus.stat_tenants_period TO 2;
|
||||
SET citus.stat_tenants_period TO 5;
|
||||
SELECT sleep_until_next_period();
|
||||
sleep_until_next_period
|
||||
---------------------------------------------------------------------
|
||||
|
@ -503,13 +503,17 @@ SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde*';
|
|||
t
|
||||
(1 row)
|
||||
|
||||
DELETE FROM dist_tbl_text WHERE a = '/b*c/de';
|
||||
DELETE FROM dist_tbl_text WHERE a = '/bcde';
|
||||
DELETE FROM dist_tbl_text WHERE a = U&'\0061\0308bc';
|
||||
DELETE FROM dist_tbl_text WHERE a = 'bcde*';
|
||||
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants_local ORDER BY tenant_attribute;
|
||||
tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period
|
||||
---------------------------------------------------------------------
|
||||
/b*c/de | 1 | 0 | 1 | 0
|
||||
/bcde | 1 | 0 | 1 | 0
|
||||
äbc | 1 | 0 | 1 | 0
|
||||
bcde* | 1 | 0 | 1 | 0
|
||||
/b*c/de | 1 | 0 | 2 | 0
|
||||
/bcde | 1 | 0 | 2 | 0
|
||||
äbc | 1 | 0 | 2 | 0
|
||||
bcde* | 1 | 0 | 2 | 0
|
||||
(4 rows)
|
||||
|
||||
-- test local cached queries & prepared statements
|
||||
|
@ -589,10 +593,10 @@ EXECUTE dist_tbl_text_select_plan('bcde*');
|
|||
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants_local ORDER BY tenant_attribute;
|
||||
tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period
|
||||
---------------------------------------------------------------------
|
||||
/b*c/de | 4 | 0 | 4 | 0
|
||||
/bcde | 4 | 0 | 4 | 0
|
||||
äbc | 4 | 0 | 4 | 0
|
||||
bcde* | 4 | 0 | 4 | 0
|
||||
/b*c/de | 4 | 0 | 5 | 0
|
||||
/bcde | 4 | 0 | 5 | 0
|
||||
äbc | 4 | 0 | 5 | 0
|
||||
bcde* | 4 | 0 | 5 | 0
|
||||
(4 rows)
|
||||
|
||||
\c - - - :master_port
|
||||
|
@ -675,10 +679,10 @@ SET search_path TO citus_stat_tenants;
|
|||
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants ORDER BY tenant_attribute;
|
||||
tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period
|
||||
---------------------------------------------------------------------
|
||||
/b*c/de | 7 | 0 | 7 | 0
|
||||
/bcde | 7 | 0 | 7 | 0
|
||||
äbc | 7 | 0 | 7 | 0
|
||||
bcde* | 7 | 0 | 7 | 0
|
||||
/b*c/de | 7 | 0 | 8 | 0
|
||||
/bcde | 7 | 0 | 8 | 0
|
||||
äbc | 7 | 0 | 8 | 0
|
||||
bcde* | 7 | 0 | 8 | 0
|
||||
(4 rows)
|
||||
|
||||
\c - - - :master_port
|
||||
|
@ -741,5 +745,131 @@ SELECT count(*)>=0 FROM citus_stat_tenants_local();
|
|||
|
||||
RESET ROLE;
|
||||
DROP ROLE stats_non_superuser;
|
||||
-- test function push down
|
||||
CREATE OR REPLACE FUNCTION
|
||||
select_from_dist_tbl_text(p_keyword text)
|
||||
RETURNS boolean LANGUAGE plpgsql AS $fn$
|
||||
BEGIN
|
||||
RETURN(SELECT count(*)>=0 FROM citus_stat_tenants.dist_tbl_text WHERE a = $1);
|
||||
END;
|
||||
$fn$;
|
||||
SELECT create_distributed_function(
|
||||
'select_from_dist_tbl_text(text)', 'p_keyword', colocate_with => 'dist_tbl_text'
|
||||
);
|
||||
create_distributed_function
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT citus_stat_tenants_reset();
|
||||
citus_stat_tenants_reset
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT select_from_dist_tbl_text('/b*c/de');
|
||||
select_from_dist_tbl_text
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT select_from_dist_tbl_text('/b*c/de');
|
||||
select_from_dist_tbl_text
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT select_from_dist_tbl_text(U&'\0061\0308bc');
|
||||
select_from_dist_tbl_text
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT select_from_dist_tbl_text(U&'\0061\0308bc');
|
||||
select_from_dist_tbl_text
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants;
|
||||
tenant_attribute | query_count_in_this_period
|
||||
---------------------------------------------------------------------
|
||||
/b*c/de | 2
|
||||
äbc | 2
|
||||
(2 rows)
|
||||
|
||||
CREATE OR REPLACE PROCEDURE select_from_dist_tbl_text_proc(
|
||||
p_keyword text
|
||||
)
|
||||
LANGUAGE plpgsql
|
||||
AS $$
|
||||
BEGIN
|
||||
PERFORM select_from_dist_tbl_text(p_keyword);
|
||||
PERFORM count(*)>=0 FROM citus_stat_tenants.dist_tbl_text WHERE b < 0;
|
||||
PERFORM count(*)>=0 FROM citus_stat_tenants.dist_tbl_text;
|
||||
PERFORM count(*)>=0 FROM citus_stat_tenants.dist_tbl_text WHERE a = p_keyword;
|
||||
COMMIT;
|
||||
END;$$;
|
||||
CALL citus_stat_tenants.select_from_dist_tbl_text_proc('/b*c/de');
|
||||
CALL citus_stat_tenants.select_from_dist_tbl_text_proc('/b*c/de');
|
||||
CALL citus_stat_tenants.select_from_dist_tbl_text_proc('/b*c/de');
|
||||
CALL citus_stat_tenants.select_from_dist_tbl_text_proc(U&'\0061\0308bc');
|
||||
CALL citus_stat_tenants.select_from_dist_tbl_text_proc(U&'\0061\0308bc');
|
||||
CALL citus_stat_tenants.select_from_dist_tbl_text_proc(U&'\0061\0308bc');
|
||||
CALL citus_stat_tenants.select_from_dist_tbl_text_proc(NULL);
|
||||
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants;
|
||||
tenant_attribute | query_count_in_this_period
|
||||
---------------------------------------------------------------------
|
||||
/b*c/de | 8
|
||||
äbc | 8
|
||||
(2 rows)
|
||||
|
||||
CREATE OR REPLACE VIEW
|
||||
select_from_dist_tbl_text_view
|
||||
AS
|
||||
SELECT * FROM citus_stat_tenants.dist_tbl_text;
|
||||
SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = '/b*c/de';
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = '/b*c/de';
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = '/b*c/de';
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = U&'\0061\0308bc';
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = U&'\0061\0308bc';
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = U&'\0061\0308bc';
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants;
|
||||
tenant_attribute | query_count_in_this_period
|
||||
---------------------------------------------------------------------
|
||||
/b*c/de | 11
|
||||
äbc | 11
|
||||
(2 rows)
|
||||
|
||||
SET client_min_messages TO ERROR;
|
||||
DROP SCHEMA citus_stat_tenants CASCADE;
|
||||
|
|
|
@ -93,7 +93,7 @@ FROM citus_stat_tenants_local
|
|||
ORDER BY tenant_attribute;
|
||||
|
||||
-- simulate passing the period
|
||||
SET citus.stat_tenants_period TO 2;
|
||||
SET citus.stat_tenants_period TO 5;
|
||||
SELECT sleep_until_next_period();
|
||||
|
||||
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period,
|
||||
|
@ -174,6 +174,11 @@ SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/bcde';
|
|||
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = U&'\0061\0308bc';
|
||||
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde*';
|
||||
|
||||
DELETE FROM dist_tbl_text WHERE a = '/b*c/de';
|
||||
DELETE FROM dist_tbl_text WHERE a = '/bcde';
|
||||
DELETE FROM dist_tbl_text WHERE a = U&'\0061\0308bc';
|
||||
DELETE FROM dist_tbl_text WHERE a = 'bcde*';
|
||||
|
||||
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants_local ORDER BY tenant_attribute;
|
||||
|
||||
-- test local cached queries & prepared statements
|
||||
|
@ -247,5 +252,64 @@ SELECT count(*)>=0 FROM citus_stat_tenants_local();
|
|||
RESET ROLE;
|
||||
DROP ROLE stats_non_superuser;
|
||||
|
||||
-- test function push down
|
||||
CREATE OR REPLACE FUNCTION
|
||||
select_from_dist_tbl_text(p_keyword text)
|
||||
RETURNS boolean LANGUAGE plpgsql AS $fn$
|
||||
BEGIN
|
||||
RETURN(SELECT count(*)>=0 FROM citus_stat_tenants.dist_tbl_text WHERE a = $1);
|
||||
END;
|
||||
$fn$;
|
||||
|
||||
SELECT create_distributed_function(
|
||||
'select_from_dist_tbl_text(text)', 'p_keyword', colocate_with => 'dist_tbl_text'
|
||||
);
|
||||
|
||||
SELECT citus_stat_tenants_reset();
|
||||
|
||||
SELECT select_from_dist_tbl_text('/b*c/de');
|
||||
SELECT select_from_dist_tbl_text('/b*c/de');
|
||||
SELECT select_from_dist_tbl_text(U&'\0061\0308bc');
|
||||
SELECT select_from_dist_tbl_text(U&'\0061\0308bc');
|
||||
|
||||
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants;
|
||||
|
||||
CREATE OR REPLACE PROCEDURE select_from_dist_tbl_text_proc(
|
||||
p_keyword text
|
||||
)
|
||||
LANGUAGE plpgsql
|
||||
AS $$
|
||||
BEGIN
|
||||
PERFORM select_from_dist_tbl_text(p_keyword);
|
||||
PERFORM count(*)>=0 FROM citus_stat_tenants.dist_tbl_text WHERE b < 0;
|
||||
PERFORM count(*)>=0 FROM citus_stat_tenants.dist_tbl_text;
|
||||
PERFORM count(*)>=0 FROM citus_stat_tenants.dist_tbl_text WHERE a = p_keyword;
|
||||
COMMIT;
|
||||
END;$$;
|
||||
|
||||
CALL citus_stat_tenants.select_from_dist_tbl_text_proc('/b*c/de');
|
||||
CALL citus_stat_tenants.select_from_dist_tbl_text_proc('/b*c/de');
|
||||
CALL citus_stat_tenants.select_from_dist_tbl_text_proc('/b*c/de');
|
||||
CALL citus_stat_tenants.select_from_dist_tbl_text_proc(U&'\0061\0308bc');
|
||||
CALL citus_stat_tenants.select_from_dist_tbl_text_proc(U&'\0061\0308bc');
|
||||
CALL citus_stat_tenants.select_from_dist_tbl_text_proc(U&'\0061\0308bc');
|
||||
CALL citus_stat_tenants.select_from_dist_tbl_text_proc(NULL);
|
||||
|
||||
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants;
|
||||
|
||||
CREATE OR REPLACE VIEW
|
||||
select_from_dist_tbl_text_view
|
||||
AS
|
||||
SELECT * FROM citus_stat_tenants.dist_tbl_text;
|
||||
|
||||
SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = '/b*c/de';
|
||||
SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = '/b*c/de';
|
||||
SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = '/b*c/de';
|
||||
SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = U&'\0061\0308bc';
|
||||
SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = U&'\0061\0308bc';
|
||||
SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = U&'\0061\0308bc';
|
||||
|
||||
SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants;
|
||||
|
||||
SET client_min_messages TO ERROR;
|
||||
DROP SCHEMA citus_stat_tenants CASCADE;
|
||||
|
|
Loading…
Reference in New Issue