diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index 7cdc896e3..c9f4c8fb9 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -129,6 +129,8 @@ static void LogLocalCommand(Task *task); static uint64 LocallyPlanAndExecuteMultipleQueries(List *queryStrings, TupleDestination *tupleDest, Task *task); +static void SetColocationIdAndPartitionKeyValueForTasks(List *taskList, + Job *distributedPlan); static void LocallyExecuteUtilityTask(Task *task); static void ExecuteUdfTaskQuery(Query *localUdfCommandQuery); static void EnsureTransitionPossible(LocalExecutionStatus from, @@ -228,6 +230,17 @@ ExecuteLocalTaskListExtended(List *taskList, EnsureTaskExecutionAllowed(isRemote); } + /* + * If workerJob has a partitionKeyValue, we need to set the colocation id + * and partition key value for each task before we start executing them + * because tenant stats are collected based on these values of a task. + */ + if (distributedPlan != NULL && distributedPlan->workerJob != NULL && taskList != NIL) + { + SetJobColocationId(distributedPlan->workerJob); + SetColocationIdAndPartitionKeyValueForTasks(taskList, distributedPlan->workerJob); + } + /* * Use a new memory context that gets reset after every task to free * the deparsed query string and query plan. @@ -367,6 +380,26 @@ ExecuteLocalTaskListExtended(List *taskList, } +/* + * SetColocationIdAndPartitionKeyValueForTasks sets colocationId and partitionKeyValue + * for the tasks in the taskList if workerJob has a colocationId and partitionKeyValue. + */ +static void +SetColocationIdAndPartitionKeyValueForTasks(List *taskList, Job *workerJob) +{ + if (workerJob->colocationId != 0 && + workerJob->partitionKeyValue != NULL) + { + Task *task = NULL; + foreach_ptr(task, taskList) + { + task->colocationId = workerJob->colocationId; + task->partitionKeyValue = workerJob->partitionKeyValue; + } + } +} + + /* * LocallyPlanAndExecuteMultipleQueries plans and executes the given query strings * one by one. diff --git a/src/test/regress/expected/citus_stat_tenants.out b/src/test/regress/expected/citus_stat_tenants.out index 7e75b67fe..c1f07ccaa 100644 --- a/src/test/regress/expected/citus_stat_tenants.out +++ b/src/test/regress/expected/citus_stat_tenants.out @@ -255,7 +255,7 @@ ORDER BY tenant_attribute; (2 rows) -- simulate passing the period -SET citus.stat_tenants_period TO 2; +SET citus.stat_tenants_period TO 5; SELECT sleep_until_next_period(); sleep_until_next_period --------------------------------------------------------------------- @@ -503,13 +503,17 @@ SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde*'; t (1 row) +DELETE FROM dist_tbl_text WHERE a = '/b*c/de'; +DELETE FROM dist_tbl_text WHERE a = '/bcde'; +DELETE FROM dist_tbl_text WHERE a = U&'\0061\0308bc'; +DELETE FROM dist_tbl_text WHERE a = 'bcde*'; SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants_local ORDER BY tenant_attribute; tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period --------------------------------------------------------------------- - /b*c/de | 1 | 0 | 1 | 0 - /bcde | 1 | 0 | 1 | 0 - äbc | 1 | 0 | 1 | 0 - bcde* | 1 | 0 | 1 | 0 + /b*c/de | 1 | 0 | 2 | 0 + /bcde | 1 | 0 | 2 | 0 + äbc | 1 | 0 | 2 | 0 + bcde* | 1 | 0 | 2 | 0 (4 rows) -- test local cached queries & prepared statements @@ -589,10 +593,10 @@ EXECUTE dist_tbl_text_select_plan('bcde*'); SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants_local ORDER BY tenant_attribute; tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period --------------------------------------------------------------------- - /b*c/de | 4 | 0 | 4 | 0 - /bcde | 4 | 0 | 4 | 0 - äbc | 4 | 0 | 4 | 0 - bcde* | 4 | 0 | 4 | 0 + /b*c/de | 4 | 0 | 5 | 0 + /bcde | 4 | 0 | 5 | 0 + äbc | 4 | 0 | 5 | 0 + bcde* | 4 | 0 | 5 | 0 (4 rows) \c - - - :master_port @@ -675,10 +679,10 @@ SET search_path TO citus_stat_tenants; SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants ORDER BY tenant_attribute; tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period --------------------------------------------------------------------- - /b*c/de | 7 | 0 | 7 | 0 - /bcde | 7 | 0 | 7 | 0 - äbc | 7 | 0 | 7 | 0 - bcde* | 7 | 0 | 7 | 0 + /b*c/de | 7 | 0 | 8 | 0 + /bcde | 7 | 0 | 8 | 0 + äbc | 7 | 0 | 8 | 0 + bcde* | 7 | 0 | 8 | 0 (4 rows) \c - - - :master_port @@ -741,5 +745,131 @@ SELECT count(*)>=0 FROM citus_stat_tenants_local(); RESET ROLE; DROP ROLE stats_non_superuser; +-- test function push down +CREATE OR REPLACE FUNCTION + select_from_dist_tbl_text(p_keyword text) +RETURNS boolean LANGUAGE plpgsql AS $fn$ +BEGIN + RETURN(SELECT count(*)>=0 FROM citus_stat_tenants.dist_tbl_text WHERE a = $1); +END; +$fn$; +SELECT create_distributed_function( + 'select_from_dist_tbl_text(text)', 'p_keyword', colocate_with => 'dist_tbl_text' +); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_stat_tenants_reset(); + citus_stat_tenants_reset +--------------------------------------------------------------------- + +(1 row) + +SELECT select_from_dist_tbl_text('/b*c/de'); + select_from_dist_tbl_text +--------------------------------------------------------------------- + t +(1 row) + +SELECT select_from_dist_tbl_text('/b*c/de'); + select_from_dist_tbl_text +--------------------------------------------------------------------- + t +(1 row) + +SELECT select_from_dist_tbl_text(U&'\0061\0308bc'); + select_from_dist_tbl_text +--------------------------------------------------------------------- + t +(1 row) + +SELECT select_from_dist_tbl_text(U&'\0061\0308bc'); + select_from_dist_tbl_text +--------------------------------------------------------------------- + t +(1 row) + +SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants; + tenant_attribute | query_count_in_this_period +--------------------------------------------------------------------- + /b*c/de | 2 + äbc | 2 +(2 rows) + +CREATE OR REPLACE PROCEDURE select_from_dist_tbl_text_proc( + p_keyword text +) +LANGUAGE plpgsql +AS $$ +BEGIN + PERFORM select_from_dist_tbl_text(p_keyword); + PERFORM count(*)>=0 FROM citus_stat_tenants.dist_tbl_text WHERE b < 0; + PERFORM count(*)>=0 FROM citus_stat_tenants.dist_tbl_text; + PERFORM count(*)>=0 FROM citus_stat_tenants.dist_tbl_text WHERE a = p_keyword; + COMMIT; +END;$$; +CALL citus_stat_tenants.select_from_dist_tbl_text_proc('/b*c/de'); +CALL citus_stat_tenants.select_from_dist_tbl_text_proc('/b*c/de'); +CALL citus_stat_tenants.select_from_dist_tbl_text_proc('/b*c/de'); +CALL citus_stat_tenants.select_from_dist_tbl_text_proc(U&'\0061\0308bc'); +CALL citus_stat_tenants.select_from_dist_tbl_text_proc(U&'\0061\0308bc'); +CALL citus_stat_tenants.select_from_dist_tbl_text_proc(U&'\0061\0308bc'); +CALL citus_stat_tenants.select_from_dist_tbl_text_proc(NULL); +SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants; + tenant_attribute | query_count_in_this_period +--------------------------------------------------------------------- + /b*c/de | 8 + äbc | 8 +(2 rows) + +CREATE OR REPLACE VIEW + select_from_dist_tbl_text_view +AS + SELECT * FROM citus_stat_tenants.dist_tbl_text; +SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = '/b*c/de'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = '/b*c/de'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = '/b*c/de'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = U&'\0061\0308bc'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = U&'\0061\0308bc'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = U&'\0061\0308bc'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants; + tenant_attribute | query_count_in_this_period +--------------------------------------------------------------------- + /b*c/de | 11 + äbc | 11 +(2 rows) + SET client_min_messages TO ERROR; DROP SCHEMA citus_stat_tenants CASCADE; diff --git a/src/test/regress/sql/citus_stat_tenants.sql b/src/test/regress/sql/citus_stat_tenants.sql index f327aefa6..af44c7f1e 100644 --- a/src/test/regress/sql/citus_stat_tenants.sql +++ b/src/test/regress/sql/citus_stat_tenants.sql @@ -93,7 +93,7 @@ FROM citus_stat_tenants_local ORDER BY tenant_attribute; -- simulate passing the period -SET citus.stat_tenants_period TO 2; +SET citus.stat_tenants_period TO 5; SELECT sleep_until_next_period(); SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period, @@ -174,6 +174,11 @@ SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/bcde'; SELECT count(*)>=0 FROM dist_tbl_text WHERE a = U&'\0061\0308bc'; SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde*'; +DELETE FROM dist_tbl_text WHERE a = '/b*c/de'; +DELETE FROM dist_tbl_text WHERE a = '/bcde'; +DELETE FROM dist_tbl_text WHERE a = U&'\0061\0308bc'; +DELETE FROM dist_tbl_text WHERE a = 'bcde*'; + SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stat_tenants_local ORDER BY tenant_attribute; -- test local cached queries & prepared statements @@ -247,5 +252,64 @@ SELECT count(*)>=0 FROM citus_stat_tenants_local(); RESET ROLE; DROP ROLE stats_non_superuser; +-- test function push down +CREATE OR REPLACE FUNCTION + select_from_dist_tbl_text(p_keyword text) +RETURNS boolean LANGUAGE plpgsql AS $fn$ +BEGIN + RETURN(SELECT count(*)>=0 FROM citus_stat_tenants.dist_tbl_text WHERE a = $1); +END; +$fn$; + +SELECT create_distributed_function( + 'select_from_dist_tbl_text(text)', 'p_keyword', colocate_with => 'dist_tbl_text' +); + +SELECT citus_stat_tenants_reset(); + +SELECT select_from_dist_tbl_text('/b*c/de'); +SELECT select_from_dist_tbl_text('/b*c/de'); +SELECT select_from_dist_tbl_text(U&'\0061\0308bc'); +SELECT select_from_dist_tbl_text(U&'\0061\0308bc'); + +SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants; + +CREATE OR REPLACE PROCEDURE select_from_dist_tbl_text_proc( + p_keyword text +) +LANGUAGE plpgsql +AS $$ +BEGIN + PERFORM select_from_dist_tbl_text(p_keyword); + PERFORM count(*)>=0 FROM citus_stat_tenants.dist_tbl_text WHERE b < 0; + PERFORM count(*)>=0 FROM citus_stat_tenants.dist_tbl_text; + PERFORM count(*)>=0 FROM citus_stat_tenants.dist_tbl_text WHERE a = p_keyword; + COMMIT; +END;$$; + +CALL citus_stat_tenants.select_from_dist_tbl_text_proc('/b*c/de'); +CALL citus_stat_tenants.select_from_dist_tbl_text_proc('/b*c/de'); +CALL citus_stat_tenants.select_from_dist_tbl_text_proc('/b*c/de'); +CALL citus_stat_tenants.select_from_dist_tbl_text_proc(U&'\0061\0308bc'); +CALL citus_stat_tenants.select_from_dist_tbl_text_proc(U&'\0061\0308bc'); +CALL citus_stat_tenants.select_from_dist_tbl_text_proc(U&'\0061\0308bc'); +CALL citus_stat_tenants.select_from_dist_tbl_text_proc(NULL); + +SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants; + +CREATE OR REPLACE VIEW + select_from_dist_tbl_text_view +AS + SELECT * FROM citus_stat_tenants.dist_tbl_text; + +SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = '/b*c/de'; +SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = '/b*c/de'; +SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = '/b*c/de'; +SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = U&'\0061\0308bc'; +SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = U&'\0061\0308bc'; +SELECT count(*)>=0 FROM select_from_dist_tbl_text_view WHERE a = U&'\0061\0308bc'; + +SELECT tenant_attribute, query_count_in_this_period FROM citus_stat_tenants; + SET client_min_messages TO ERROR; DROP SCHEMA citus_stat_tenants CASCADE;