Attribute local queries and cached plans on local execution (#6797)

multi-tenant-monitoring-pgbench
Gokhan Gulbiz 2023-03-30 18:42:40 +03:00 committed by Halil Ozan Akgul
parent 1b7d075da0
commit d2f8066adb
5 changed files with 303 additions and 17 deletions

View File

@ -104,6 +104,7 @@
#include "optimizer/optimizer.h"
#include "nodes/params.h"
#include "utils/snapmgr.h"
#include "distributed/utils/attribute.h"
/* controlled via a GUC */
bool EnableLocalExecution = true;
@ -647,6 +648,8 @@ LocallyExecuteTaskPlan(PlannedStmt *taskPlan, char *queryString,
LocalExecutorShardId = task->anchorShardId;
}
AttributeTask(task->partitionColumn, task->colocationId, taskPlan->commandType);
PG_TRY();
{
processedRows = ExecuteTaskPlan(taskPlan, queryString, tupleDest, task,

View File

@ -42,8 +42,8 @@ ExecutorEnd_hook_type prev_ExecutorEnd = NULL;
/* TODO maybe needs to be a stack */
char attributeToTenant[MAX_TENANT_ATTRIBUTE_LENGTH] = "";
CmdType attributeCommandType = CMD_UNKNOWN;
int colocationGroupId = -1;
CmdType attributeToCommandType = CMD_UNKNOWN;
int attributeToColocationGroupId = INVALID_COLOCATION_ID;
const char *SharedMemoryNameForMultiTenantMonitor =
"Shared memory for multi tenant monitor";
@ -197,8 +197,6 @@ AttributeQueryIfAnnotated(const char *query_string, CmdType commandType)
{
strcpy_s(attributeToTenant, sizeof(attributeToTenant), "");
attributeCommandType = commandType;
if (query_string == NULL)
{
return;
@ -212,21 +210,36 @@ AttributeQueryIfAnnotated(const char *query_string, CmdType commandType)
Datum jsonbDatum = DirectFunctionCall1(jsonb_in, PointerGetDatum(annotation));
text *tenantIdTextP = ExtractFieldTextP(jsonbDatum, "tId");
char *tenantId = NULL;
if (tenantIdTextP != NULL)
{
char *tenantId = UnescapeCommentChars(text_to_cstring(tenantIdTextP));
strncpy_s(attributeToTenant, MAX_TENANT_ATTRIBUTE_LENGTH, tenantId,
MAX_TENANT_ATTRIBUTE_LENGTH - 1);
tenantId = UnescapeCommentChars(text_to_cstring(tenantIdTextP));
}
colocationGroupId = ExtractFieldInt32(jsonbDatum, "cId",
int colocationId = ExtractFieldInt32(jsonbDatum, "cId",
INVALID_COLOCATION_ID);
AttributeTask(tenantId, colocationId, commandType);
}
}
else
}
/*
* AttributeTask assigns the given attributes of a tenant and starts a timer
*/
void
AttributeTask(char *tenantId, int colocationId, CmdType commandType)
{
strcpy_s(attributeToTenant, sizeof(attributeToTenant), "");
if (tenantId == NULL || colocationId == INVALID_COLOCATION_ID)
{
return;
}
attributeToColocationGroupId = colocationId;
strncpy_s(attributeToTenant, MAX_TENANT_ATTRIBUTE_LENGTH, tenantId,
MAX_TENANT_ATTRIBUTE_LENGTH - 1);
attributeToCommandType = commandType;
}
@ -493,13 +506,13 @@ RecordTenantStats(TenantStats *tenantStats)
tenantStats->score = LLONG_MAX;
}
if (attributeCommandType == CMD_SELECT)
if (attributeToCommandType == CMD_SELECT)
{
tenantStats->readsInThisPeriod++;
}
else if (attributeCommandType == CMD_UPDATE ||
attributeCommandType == CMD_INSERT ||
attributeCommandType == CMD_DELETE)
else if (attributeToCommandType == CMD_UPDATE ||
attributeToCommandType == CMD_INSERT ||
attributeToCommandType == CMD_DELETE)
{
tenantStats->writesInThisPeriod++;
}
@ -609,7 +622,7 @@ CreateTenantStats(MultiTenantMonitor *monitor, time_t queryTime)
strcpy_s(monitor->tenants[tenantIndex].tenantAttribute,
sizeof(monitor->tenants[tenantIndex].tenantAttribute), attributeToTenant);
monitor->tenants[tenantIndex].colocationGroupId = colocationGroupId;
monitor->tenants[tenantIndex].colocationGroupId = attributeToColocationGroupId;
monitor->tenants[tenantIndex].namedLockTranche.trancheId = LWLockNewTrancheId();
monitor->tenants[tenantIndex].namedLockTranche.trancheName = tenantTrancheName;
@ -635,7 +648,7 @@ FindTenantStats(MultiTenantMonitor *monitor)
{
TenantStats *tenantStats = &monitor->tenants[i];
if (strcmp(tenantStats->tenantAttribute, attributeToTenant) == 0 &&
tenantStats->colocationGroupId == colocationGroupId)
tenantStats->colocationGroupId == attributeToColocationGroupId)
{
return i;
}

View File

@ -92,6 +92,7 @@ extern void CitusAttributeToEnd(QueryDesc *queryDesc);
extern void AttributeQueryIfAnnotated(const char *queryString, CmdType commandType);
extern char * AnnotateQuery(char *queryString, char *partitionColumn, int colocationId);
extern void InitializeMultiTenantMonitorSMHandleManagement(void);
extern void AttributeTask(char *tenantId, int colocationGroupId, CmdType commandType);
extern ExecutorEnd_hook_type prev_ExecutorEnd;

View File

@ -401,6 +401,7 @@ SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, q
(10 rows)
\c - - - :worker_2_port
SET search_path TO citus_stats_tenants;
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute;
tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period
---------------------------------------------------------------------
@ -416,6 +417,217 @@ SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, q
bcde*/ | 1 | 0 | 1 | 0
(10 rows)
SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()');
result
---------------------------------------------------------------------
(2 rows)
-- test local queries
-- all of these distribution column values are from second worker
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/b*c/de';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/bcde';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = U&'\0061\0308bc';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde*';
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants_local ORDER BY tenant_attribute;
tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period
---------------------------------------------------------------------
/b*c/de | 1 | 0 | 1 | 0
/bcde | 1 | 0 | 1 | 0
äbc | 1 | 0 | 1 | 0
bcde* | 1 | 0 | 1 | 0
(4 rows)
-- test local cached queries & prepared statements
PREPARE dist_tbl_text_select_plan (text) AS SELECT count(*)>=0 FROM dist_tbl_text WHERE a = $1;
EXECUTE dist_tbl_text_select_plan('/b*c/de');
?column?
---------------------------------------------------------------------
t
(1 row)
EXECUTE dist_tbl_text_select_plan('/bcde');
?column?
---------------------------------------------------------------------
t
(1 row)
EXECUTE dist_tbl_text_select_plan(U&'\0061\0308bc');
?column?
---------------------------------------------------------------------
t
(1 row)
EXECUTE dist_tbl_text_select_plan('bcde*');
?column?
---------------------------------------------------------------------
t
(1 row)
EXECUTE dist_tbl_text_select_plan('/b*c/de');
?column?
---------------------------------------------------------------------
t
(1 row)
EXECUTE dist_tbl_text_select_plan('/bcde');
?column?
---------------------------------------------------------------------
t
(1 row)
EXECUTE dist_tbl_text_select_plan(U&'\0061\0308bc');
?column?
---------------------------------------------------------------------
t
(1 row)
EXECUTE dist_tbl_text_select_plan('bcde*');
?column?
---------------------------------------------------------------------
t
(1 row)
EXECUTE dist_tbl_text_select_plan('/b*c/de');
?column?
---------------------------------------------------------------------
t
(1 row)
EXECUTE dist_tbl_text_select_plan('/bcde');
?column?
---------------------------------------------------------------------
t
(1 row)
EXECUTE dist_tbl_text_select_plan(U&'\0061\0308bc');
?column?
---------------------------------------------------------------------
t
(1 row)
EXECUTE dist_tbl_text_select_plan('bcde*');
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants_local ORDER BY tenant_attribute;
tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period
---------------------------------------------------------------------
/b*c/de | 4 | 0 | 4 | 0
/bcde | 4 | 0 | 4 | 0
äbc | 4 | 0 | 4 | 0
bcde* | 4 | 0 | 4 | 0
(4 rows)
\c - - - :master_port
SET search_path TO citus_stats_tenants;
PREPARE dist_tbl_text_select_plan (text) AS SELECT count(*)>=0 FROM dist_tbl_text WHERE a = $1;
EXECUTE dist_tbl_text_select_plan('/b*c/de');
?column?
---------------------------------------------------------------------
t
(1 row)
EXECUTE dist_tbl_text_select_plan('/bcde');
?column?
---------------------------------------------------------------------
t
(1 row)
EXECUTE dist_tbl_text_select_plan(U&'\0061\0308bc');
?column?
---------------------------------------------------------------------
t
(1 row)
EXECUTE dist_tbl_text_select_plan('bcde*');
?column?
---------------------------------------------------------------------
t
(1 row)
EXECUTE dist_tbl_text_select_plan('/b*c/de');
?column?
---------------------------------------------------------------------
t
(1 row)
EXECUTE dist_tbl_text_select_plan('/bcde');
?column?
---------------------------------------------------------------------
t
(1 row)
EXECUTE dist_tbl_text_select_plan(U&'\0061\0308bc');
?column?
---------------------------------------------------------------------
t
(1 row)
EXECUTE dist_tbl_text_select_plan('bcde*');
?column?
---------------------------------------------------------------------
t
(1 row)
EXECUTE dist_tbl_text_select_plan('/b*c/de');
?column?
---------------------------------------------------------------------
t
(1 row)
EXECUTE dist_tbl_text_select_plan('/bcde');
?column?
---------------------------------------------------------------------
t
(1 row)
EXECUTE dist_tbl_text_select_plan(U&'\0061\0308bc');
?column?
---------------------------------------------------------------------
t
(1 row)
EXECUTE dist_tbl_text_select_plan('bcde*');
?column?
---------------------------------------------------------------------
t
(1 row)
\c - - - :worker_2_port
SET search_path TO citus_stats_tenants;
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute;
tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period
---------------------------------------------------------------------
/b*c/de | 7 | 0 | 7 | 0
/bcde | 7 | 0 | 7 | 0
äbc | 7 | 0 | 7 | 0
bcde* | 7 | 0 | 7 | 0
(4 rows)
\c - - - :master_port
SET search_path TO citus_stats_tenants;
SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()');

View File

@ -129,7 +129,64 @@ SELECT count(*)>=0 FROM dist_tbl_text WHERE a = U&'\0061\0308bc';
\c - - - :worker_1_port
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute;
\c - - - :worker_2_port
SET search_path TO citus_stats_tenants;
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute;
SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()');
-- test local queries
-- all of these distribution column values are from second worker
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/b*c/de';
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = '/bcde';
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = U&'\0061\0308bc';
SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde*';
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants_local ORDER BY tenant_attribute;
-- test local cached queries & prepared statements
PREPARE dist_tbl_text_select_plan (text) AS SELECT count(*)>=0 FROM dist_tbl_text WHERE a = $1;
EXECUTE dist_tbl_text_select_plan('/b*c/de');
EXECUTE dist_tbl_text_select_plan('/bcde');
EXECUTE dist_tbl_text_select_plan(U&'\0061\0308bc');
EXECUTE dist_tbl_text_select_plan('bcde*');
EXECUTE dist_tbl_text_select_plan('/b*c/de');
EXECUTE dist_tbl_text_select_plan('/bcde');
EXECUTE dist_tbl_text_select_plan(U&'\0061\0308bc');
EXECUTE dist_tbl_text_select_plan('bcde*');
EXECUTE dist_tbl_text_select_plan('/b*c/de');
EXECUTE dist_tbl_text_select_plan('/bcde');
EXECUTE dist_tbl_text_select_plan(U&'\0061\0308bc');
EXECUTE dist_tbl_text_select_plan('bcde*');
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants_local ORDER BY tenant_attribute;
\c - - - :master_port
SET search_path TO citus_stats_tenants;
PREPARE dist_tbl_text_select_plan (text) AS SELECT count(*)>=0 FROM dist_tbl_text WHERE a = $1;
EXECUTE dist_tbl_text_select_plan('/b*c/de');
EXECUTE dist_tbl_text_select_plan('/bcde');
EXECUTE dist_tbl_text_select_plan(U&'\0061\0308bc');
EXECUTE dist_tbl_text_select_plan('bcde*');
EXECUTE dist_tbl_text_select_plan('/b*c/de');
EXECUTE dist_tbl_text_select_plan('/bcde');
EXECUTE dist_tbl_text_select_plan(U&'\0061\0308bc');
EXECUTE dist_tbl_text_select_plan('bcde*');
EXECUTE dist_tbl_text_select_plan('/b*c/de');
EXECUTE dist_tbl_text_select_plan('/bcde');
EXECUTE dist_tbl_text_select_plan(U&'\0061\0308bc');
EXECUTE dist_tbl_text_select_plan('bcde*');
\c - - - :worker_2_port
SET search_path TO citus_stats_tenants;
SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute;
\c - - - :master_port
SET search_path TO citus_stats_tenants;