diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index d946e15c8..bffecb4ac 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -104,6 +104,7 @@ #include "optimizer/optimizer.h" #include "nodes/params.h" #include "utils/snapmgr.h" +#include "distributed/utils/attribute.h" /* controlled via a GUC */ bool EnableLocalExecution = true; @@ -647,6 +648,8 @@ LocallyExecuteTaskPlan(PlannedStmt *taskPlan, char *queryString, LocalExecutorShardId = task->anchorShardId; } + AttributeTask(task->partitionColumn, task->colocationId, taskPlan->commandType); + PG_TRY(); { processedRows = ExecuteTaskPlan(taskPlan, queryString, tupleDest, task, diff --git a/src/backend/distributed/utils/attribute.c b/src/backend/distributed/utils/attribute.c index 7b697ae06..8c34400e5 100644 --- a/src/backend/distributed/utils/attribute.c +++ b/src/backend/distributed/utils/attribute.c @@ -42,8 +42,8 @@ ExecutorEnd_hook_type prev_ExecutorEnd = NULL; /* TODO maybe needs to be a stack */ char attributeToTenant[MAX_TENANT_ATTRIBUTE_LENGTH] = ""; -CmdType attributeCommandType = CMD_UNKNOWN; -int colocationGroupId = -1; +CmdType attributeToCommandType = CMD_UNKNOWN; +int attributeToColocationGroupId = INVALID_COLOCATION_ID; const char *SharedMemoryNameForMultiTenantMonitor = "Shared memory for multi tenant monitor"; @@ -197,8 +197,6 @@ AttributeQueryIfAnnotated(const char *query_string, CmdType commandType) { strcpy_s(attributeToTenant, sizeof(attributeToTenant), ""); - attributeCommandType = commandType; - if (query_string == NULL) { return; @@ -212,21 +210,36 @@ AttributeQueryIfAnnotated(const char *query_string, CmdType commandType) Datum jsonbDatum = DirectFunctionCall1(jsonb_in, PointerGetDatum(annotation)); text *tenantIdTextP = ExtractFieldTextP(jsonbDatum, "tId"); + char *tenantId = NULL; if (tenantIdTextP != NULL) { - char *tenantId = UnescapeCommentChars(text_to_cstring(tenantIdTextP)); - strncpy_s(attributeToTenant, MAX_TENANT_ATTRIBUTE_LENGTH, tenantId, - MAX_TENANT_ATTRIBUTE_LENGTH - 1); + tenantId = UnescapeCommentChars(text_to_cstring(tenantIdTextP)); } - colocationGroupId = ExtractFieldInt32(jsonbDatum, "cId", - INVALID_COLOCATION_ID); + int colocationId = ExtractFieldInt32(jsonbDatum, "cId", + INVALID_COLOCATION_ID); + + AttributeTask(tenantId, colocationId, commandType); } } - else +} + + +/* + * AttributeTask assigns the given attributes of a tenant and starts a timer + */ +void +AttributeTask(char *tenantId, int colocationId, CmdType commandType) +{ + if (tenantId == NULL || colocationId == INVALID_COLOCATION_ID) { - strcpy_s(attributeToTenant, sizeof(attributeToTenant), ""); + return; } + + attributeToColocationGroupId = colocationId; + strncpy_s(attributeToTenant, MAX_TENANT_ATTRIBUTE_LENGTH, tenantId, + MAX_TENANT_ATTRIBUTE_LENGTH - 1); + attributeToCommandType = commandType; } @@ -493,13 +506,13 @@ RecordTenantStats(TenantStats *tenantStats) tenantStats->score = LLONG_MAX; } - if (attributeCommandType == CMD_SELECT) + if (attributeToCommandType == CMD_SELECT) { tenantStats->readsInThisPeriod++; } - else if (attributeCommandType == CMD_UPDATE || - attributeCommandType == CMD_INSERT || - attributeCommandType == CMD_DELETE) + else if (attributeToCommandType == CMD_UPDATE || + attributeToCommandType == CMD_INSERT || + attributeToCommandType == CMD_DELETE) { tenantStats->writesInThisPeriod++; } @@ -609,7 +622,7 @@ CreateTenantStats(MultiTenantMonitor *monitor, time_t queryTime) strcpy_s(monitor->tenants[tenantIndex].tenantAttribute, sizeof(monitor->tenants[tenantIndex].tenantAttribute), attributeToTenant); - monitor->tenants[tenantIndex].colocationGroupId = colocationGroupId; + monitor->tenants[tenantIndex].colocationGroupId = attributeToColocationGroupId; monitor->tenants[tenantIndex].namedLockTranche.trancheId = LWLockNewTrancheId(); monitor->tenants[tenantIndex].namedLockTranche.trancheName = tenantTrancheName; @@ -635,7 +648,7 @@ FindTenantStats(MultiTenantMonitor *monitor) { TenantStats *tenantStats = &monitor->tenants[i]; if (strcmp(tenantStats->tenantAttribute, attributeToTenant) == 0 && - tenantStats->colocationGroupId == colocationGroupId) + tenantStats->colocationGroupId == attributeToColocationGroupId) { return i; } diff --git a/src/include/distributed/utils/attribute.h b/src/include/distributed/utils/attribute.h index b4d8bb607..f56fec4f2 100644 --- a/src/include/distributed/utils/attribute.h +++ b/src/include/distributed/utils/attribute.h @@ -92,6 +92,7 @@ extern void CitusAttributeToEnd(QueryDesc *queryDesc); extern void AttributeQueryIfAnnotated(const char *queryString, CmdType commandType); extern char * AnnotateQuery(char *queryString, char *partitionColumn, int colocationId); extern void InitializeMultiTenantMonitorSMHandleManagement(void); +extern void AttributeTask(char *tenantId, int colocationGroupId, CmdType commandType); extern ExecutorEnd_hook_type prev_ExecutorEnd; diff --git a/src/test/regress/expected/citus_stats_tenants.out b/src/test/regress/expected/citus_stats_tenants.out index c0a8c896e..34269ca05 100644 --- a/src/test/regress/expected/citus_stats_tenants.out +++ b/src/test/regress/expected/citus_stats_tenants.out @@ -401,6 +401,7 @@ SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, q (10 rows) \c - - - :worker_2_port +SET search_path TO citus_stats_tenants; SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute; tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period --------------------------------------------------------------------- @@ -416,6 +417,217 @@ SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, q bcde*/ | 1 | 0 | 1 | 0 (10 rows) +SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()'); + result +--------------------------------------------------------------------- + + +(2 rows) + +-- test local queries +-- all of these distribution column values are from second worker +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/b*c/de'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/bcde'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = U&'\0061\0308bc'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde*'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants_local ORDER BY tenant_attribute; + tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period +--------------------------------------------------------------------- + /b*c/de | 1 | 0 | 1 | 0 + /bcde | 1 | 0 | 1 | 0 + äbc | 1 | 0 | 1 | 0 + bcde* | 1 | 0 | 1 | 0 +(4 rows) + +-- test local cached queries & prepared statements +PREPARE dist_tbl_text_select_plan (text) AS SELECT count(*)>=0 FROM dist_tbl_text WHERE a = $1; +EXECUTE dist_tbl_text_select_plan('/b*c/de'); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +EXECUTE dist_tbl_text_select_plan('/bcde'); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +EXECUTE dist_tbl_text_select_plan(U&'\0061\0308bc'); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +EXECUTE dist_tbl_text_select_plan('bcde*'); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +EXECUTE dist_tbl_text_select_plan('/b*c/de'); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +EXECUTE dist_tbl_text_select_plan('/bcde'); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +EXECUTE dist_tbl_text_select_plan(U&'\0061\0308bc'); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +EXECUTE dist_tbl_text_select_plan('bcde*'); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +EXECUTE dist_tbl_text_select_plan('/b*c/de'); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +EXECUTE dist_tbl_text_select_plan('/bcde'); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +EXECUTE dist_tbl_text_select_plan(U&'\0061\0308bc'); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +EXECUTE dist_tbl_text_select_plan('bcde*'); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants_local ORDER BY tenant_attribute; + tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period +--------------------------------------------------------------------- + /b*c/de | 4 | 0 | 4 | 0 + /bcde | 4 | 0 | 4 | 0 + äbc | 4 | 0 | 4 | 0 + bcde* | 4 | 0 | 4 | 0 +(4 rows) + +\c - - - :master_port +SET search_path TO citus_stats_tenants; +PREPARE dist_tbl_text_select_plan (text) AS SELECT count(*)>=0 FROM dist_tbl_text WHERE a = $1; +EXECUTE dist_tbl_text_select_plan('/b*c/de'); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +EXECUTE dist_tbl_text_select_plan('/bcde'); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +EXECUTE dist_tbl_text_select_plan(U&'\0061\0308bc'); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +EXECUTE dist_tbl_text_select_plan('bcde*'); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +EXECUTE dist_tbl_text_select_plan('/b*c/de'); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +EXECUTE dist_tbl_text_select_plan('/bcde'); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +EXECUTE dist_tbl_text_select_plan(U&'\0061\0308bc'); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +EXECUTE dist_tbl_text_select_plan('bcde*'); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +EXECUTE dist_tbl_text_select_plan('/b*c/de'); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +EXECUTE dist_tbl_text_select_plan('/bcde'); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +EXECUTE dist_tbl_text_select_plan(U&'\0061\0308bc'); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +EXECUTE dist_tbl_text_select_plan('bcde*'); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +\c - - - :worker_2_port +SET search_path TO citus_stats_tenants; +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute; + tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period +--------------------------------------------------------------------- + /b*c/de | 7 | 0 | 7 | 0 + /bcde | 7 | 0 | 7 | 0 + äbc | 7 | 0 | 7 | 0 + bcde* | 7 | 0 | 7 | 0 +(4 rows) + \c - - - :master_port SET search_path TO citus_stats_tenants; SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()'); diff --git a/src/test/regress/sql/citus_stats_tenants.sql b/src/test/regress/sql/citus_stats_tenants.sql index daafea712..451989be4 100644 --- a/src/test/regress/sql/citus_stats_tenants.sql +++ b/src/test/regress/sql/citus_stats_tenants.sql @@ -129,7 +129,64 @@ SELECT count(*)>=0 FROM dist_tbl_text WHERE a = U&'\0061\0308bc'; \c - - - :worker_1_port SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute; \c - - - :worker_2_port +SET search_path TO citus_stats_tenants; + SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute; + +SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()'); + +-- test local queries +-- all of these distribution column values are from second worker + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/b*c/de'; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/bcde'; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = U&'\0061\0308bc'; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde*'; + +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants_local ORDER BY tenant_attribute; + +-- test local cached queries & prepared statements + +PREPARE dist_tbl_text_select_plan (text) AS SELECT count(*)>=0 FROM dist_tbl_text WHERE a = $1; + +EXECUTE dist_tbl_text_select_plan('/b*c/de'); +EXECUTE dist_tbl_text_select_plan('/bcde'); +EXECUTE dist_tbl_text_select_plan(U&'\0061\0308bc'); +EXECUTE dist_tbl_text_select_plan('bcde*'); +EXECUTE dist_tbl_text_select_plan('/b*c/de'); +EXECUTE dist_tbl_text_select_plan('/bcde'); +EXECUTE dist_tbl_text_select_plan(U&'\0061\0308bc'); +EXECUTE dist_tbl_text_select_plan('bcde*'); +EXECUTE dist_tbl_text_select_plan('/b*c/de'); +EXECUTE dist_tbl_text_select_plan('/bcde'); +EXECUTE dist_tbl_text_select_plan(U&'\0061\0308bc'); +EXECUTE dist_tbl_text_select_plan('bcde*'); + +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants_local ORDER BY tenant_attribute; + +\c - - - :master_port +SET search_path TO citus_stats_tenants; + +PREPARE dist_tbl_text_select_plan (text) AS SELECT count(*)>=0 FROM dist_tbl_text WHERE a = $1; + +EXECUTE dist_tbl_text_select_plan('/b*c/de'); +EXECUTE dist_tbl_text_select_plan('/bcde'); +EXECUTE dist_tbl_text_select_plan(U&'\0061\0308bc'); +EXECUTE dist_tbl_text_select_plan('bcde*'); +EXECUTE dist_tbl_text_select_plan('/b*c/de'); +EXECUTE dist_tbl_text_select_plan('/bcde'); +EXECUTE dist_tbl_text_select_plan(U&'\0061\0308bc'); +EXECUTE dist_tbl_text_select_plan('bcde*'); +EXECUTE dist_tbl_text_select_plan('/b*c/de'); +EXECUTE dist_tbl_text_select_plan('/bcde'); +EXECUTE dist_tbl_text_select_plan(U&'\0061\0308bc'); +EXECUTE dist_tbl_text_select_plan('bcde*'); + +\c - - - :worker_2_port +SET search_path TO citus_stats_tenants; + +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute; + \c - - - :master_port SET search_path TO citus_stats_tenants;