diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index 5e4afd1a7..28486f23d 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -66,7 +66,6 @@ static DistributedPlan * CopyDistributedPlanWithoutCache( DistributedPlan *originalDistributedPlan); static void CitusEndScan(CustomScanState *node); static void CitusReScan(CustomScanState *node); -static void SetJobColocationId(Job *job); static void EnsureForceDelegationDistributionKey(Job *job); static void EnsureAnchorShardsInJobExist(Job *job); static bool AnchorShardsInTaskListExist(List *taskList); @@ -892,7 +891,7 @@ IsCitusCustomScan(Plan *plan) * colocation group, the Job's colocation ID is set to the group ID, else, * it will be set to INVALID_COLOCATION_ID. */ -static void +void SetJobColocationId(Job *job) { uint32 jobColocationId = INVALID_COLOCATION_ID; diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index e62821ad0..136268ba0 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -26,6 +26,7 @@ #include "distributed/multi_physical_planner.h" #include "distributed/multi_router_planner.h" #include "distributed/shard_utils.h" +#include "distributed/utils/attribute.h" #include "distributed/version_compat.h" #include "lib/stringinfo.h" #include "nodes/makefuncs.h" @@ -141,6 +142,21 @@ RebuildQueryStrings(Job *workerJob) ? "(null)" : TaskQueryString(task)))); + Datum partitionColumnValue; + Oid partitionColumnType = 0; + char *partitionColumnString = NULL; + if (workerJob->partitionKeyValue != NULL) + { + partitionColumnValue = workerJob->partitionKeyValue->constvalue; + partitionColumnType = workerJob->partitionKeyValue->consttype; + partitionColumnString = DatumToString(partitionColumnValue, + partitionColumnType); + } + + task->partitionColumn = partitionColumnString; + SetJobColocationId(workerJob); + task->colocationId = workerJob->colocationId; + UpdateTaskQueryString(query, task); /* @@ -387,7 +403,8 @@ SetTaskQueryIfShouldLazyDeparse(Task *task, Query *query) return; } - SetTaskQueryString(task, DeparseTaskQuery(task, query)); + SetTaskQueryString(task, AnnotateQuery(DeparseTaskQuery(task, query), + task->partitionColumn, task->colocationId)); } diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index eb9e21786..b395e5b24 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -49,6 +49,7 @@ #include "distributed/recursive_planning.h" #include "distributed/shardinterval_utils.h" #include "distributed/shard_utils.h" +#include "distributed/utils/attribute.h" #include "distributed/version_compat.h" #include "distributed/worker_shard_visibility.h" #include "executor/executor.h" @@ -157,6 +158,8 @@ distributed_planner(Query *parse, bool fastPathRouterQuery = false; Node *distributionKeyValue = NULL; + AttributeQueryIfAnnotated(query_string, parse->commandType); + List *rangeTableList = ExtractRangeTableEntryList(parse); if (cursorOptions & CURSOR_OPT_FORCE_DISTRIBUTED) diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 407aeaf65..9603ccc2d 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ 
b/src/backend/distributed/planner/multi_router_planner.c @@ -165,7 +165,8 @@ static int CompareInsertValuesByShardId(const void *leftElement, static List * SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList, List *placementList, uint64 shardId, bool parametersInQueryResolved, - bool isLocalTableModification); + bool isLocalTableModification, char *partitionColumn, + int colocationId); static bool RowLocksOnRelations(Node *node, List **rtiLockList); static void ReorderTaskPlacementsByTaskAssignmentPolicy(Job *job, TaskAssignmentPolicyType @@ -1951,11 +1952,25 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList, if (originalQuery->commandType == CMD_SELECT) { + Datum partitionColumnValue; + Oid partitionColumnType = 0; + char *partitionColumnString = NULL; + if (job->partitionKeyValue != NULL) + { + partitionColumnValue = job->partitionKeyValue->constvalue; + partitionColumnType = job->partitionKeyValue->consttype; + partitionColumnString = DatumToString(partitionColumnValue, + partitionColumnType); + } + + SetJobColocationId(job); + job->taskList = SingleShardTaskList(originalQuery, job->jobId, relationShardList, placementList, shardId, job->parametersInJobQueryResolved, - isLocalTableModification); + isLocalTableModification, + partitionColumnString, job->colocationId); /* * Queries to reference tables, or distributed tables with multiple replica's have @@ -1979,11 +1994,25 @@ GenerateSingleShardRouterTaskList(Job *job, List *relationShardList, } else { + Datum partitionColumnValue; + Oid partitionColumnType = 0; + char *partitionColumnString = NULL; + if (job->partitionKeyValue != NULL) + { + partitionColumnValue = job->partitionKeyValue->constvalue; + partitionColumnType = job->partitionKeyValue->consttype; + partitionColumnString = DatumToString(partitionColumnValue, + partitionColumnType); + } + + SetJobColocationId(job); + job->taskList = SingleShardTaskList(originalQuery, job->jobId, relationShardList, placementList, shardId, job->parametersInJobQueryResolved, - isLocalTableModification); + isLocalTableModification, + partitionColumnString, job->colocationId); } } @@ -2077,7 +2106,8 @@ static List * SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList, List *placementList, uint64 shardId, bool parametersInQueryResolved, - bool isLocalTableModification) + bool isLocalTableModification, char *partitionColumn, + int colocationId) { TaskType taskType = READ_TASK; char replicationModel = 0; @@ -2147,6 +2177,8 @@ SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList, * that the query cannot be executed locally. 
*/ task->taskPlacementList = placementList; + task->partitionColumn = partitionColumn; + task->colocationId = colocationId; SetTaskQueryIfShouldLazyDeparse(task, query); task->anchorShardId = shardId; task->jobId = jobId; diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 3c67d9b78..c390ee1c0 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -90,6 +90,7 @@ #include "distributed/resource_lock.h" #include "distributed/transaction_management.h" #include "distributed/transaction_recovery.h" +#include "distributed/utils/attribute.h" #include "distributed/utils/directory.h" #include "distributed/worker_log_messages.h" #include "distributed/worker_manager.h" @@ -439,6 +440,8 @@ _PG_init(void) ExecutorStart_hook = CitusExecutorStart; ExecutorRun_hook = CitusExecutorRun; ExplainOneQuery_hook = CitusExplainOneQuery; + prev_ExecutorEnd = ExecutorEnd_hook; + ExecutorEnd_hook = CitusAttributeToEnd; /* register hook for error messages */ emit_log_hook = multi_log_hook; @@ -472,6 +475,8 @@ _PG_init(void) /* initialize shard split shared memory handle management */ InitializeShardSplitSMHandleManagement(); + InitializeMultiTenantMonitorSMHandleManagement(); + /* enable modification of pg_catalog tables during pg_upgrade */ if (IsBinaryUpgrade) { @@ -1899,6 +1904,16 @@ RegisterCitusConfigVariables(void) GUC_STANDARD, NULL, NULL, NULL); + DefineCustomEnumVariable( + "citus.multi_tenant_monitoring_log_level", + gettext_noop("Sets the level of multi tenant monitoring log messages"), + NULL, + &MultiTenantMonitoringLogLevel, + CITUS_LOG_LEVEL_OFF, log_level_options, + PGC_USERSET, + GUC_STANDARD, + NULL, NULL, NULL); + DefineCustomIntVariable( "citus.next_cleanup_record_id", gettext_noop("Set the next cleanup record ID to use in operation creation."), @@ -2283,6 +2298,26 @@ RegisterCitusConfigVariables(void) GUC_STANDARD, NULL, NULL, NULL); + DefineCustomIntVariable( + "citus.stats_tenants_limit", + gettext_noop("Sets the number of tenants to be tracked and shown in the tenant statistics monitor."), + NULL, + &CitusStatsTenantsLimit, + 10, 1, 100, + PGC_POSTMASTER, + GUC_STANDARD, + NULL, NULL, NULL); + + DefineCustomIntVariable( + "citus.stats_tenants_period", + gettext_noop("Sets the length, in seconds, of the periods in which tenant statistics are aggregated."), + NULL, + &CitusStatsTenantsPeriod, + 60, 1, 1000000000, + PGC_USERSET, + GUC_STANDARD, + NULL, NULL, NULL); + + DefineCustomBoolVariable( "citus.subquery_pushdown", gettext_noop("Usage of this GUC is highly discouraged, please read the long " diff --git a/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql b/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql index 981c5f375..73a2bf8a9 100644 --- a/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql +++ b/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql @@ -2,3 +2,4 @@ -- bump version to 11.3-1 +#include "udfs/citus_stats_tenants/11.3-1.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-1.sql b/src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-1.sql index 7d71235d7..47d2701ac 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-1.sql @@ -1,2 +1,4 @@ -- citus--11.3-1--11.2-1 --- this is an empty downgrade path since citus--11.2-1--11.3-1.sql is empty for now + +DROP VIEW pg_catalog.citus_stats_tenants; +DROP FUNCTION pg_catalog.citus_stats_tenants(boolean); diff --git a/src/backend/distributed/sql/udfs/citus_stats_tenants/11.3-1.sql
b/src/backend/distributed/sql/udfs/citus_stats_tenants/11.3-1.sql new file mode 100644 index 000000000..f476a9c28 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_stats_tenants/11.3-1.sql @@ -0,0 +1,27 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_stats_tenants( + return_all_tenants BOOLEAN DEFAULT FALSE, + OUT colocation_id INT, + OUT tenant_attribute TEXT, + OUT read_count_in_this_period INT, + OUT read_count_in_last_period INT, + OUT query_count_in_this_period INT, + OUT query_count_in_last_period INT, + OUT score BIGINT) +RETURNS SETOF RECORD +LANGUAGE C +AS 'citus', $$citus_stats_tenants$$; + + +CREATE OR REPLACE VIEW citus.citus_stats_tenants AS +SELECT + colocation_id, + tenant_attribute, + read_count_in_this_period, + read_count_in_last_period, + query_count_in_this_period, + query_count_in_last_period +FROM pg_catalog.citus_stats_tenants() +ORDER BY score DESC; + +ALTER VIEW citus.citus_stats_tenants SET SCHEMA pg_catalog; +GRANT SELECT ON pg_catalog.citus_stats_tenants TO PUBLIC; diff --git a/src/backend/distributed/sql/udfs/citus_stats_tenants/latest.sql b/src/backend/distributed/sql/udfs/citus_stats_tenants/latest.sql new file mode 100644 index 000000000..f476a9c28 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_stats_tenants/latest.sql @@ -0,0 +1,27 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_stats_tenants( + return_all_tenants BOOLEAN DEFAULT FALSE, + OUT colocation_id INT, + OUT tenant_attribute TEXT, + OUT read_count_in_this_period INT, + OUT read_count_in_last_period INT, + OUT query_count_in_this_period INT, + OUT query_count_in_last_period INT, + OUT score BIGINT) +RETURNS SETOF RECORD +LANGUAGE C +AS 'citus', $$citus_stats_tenants$$; + + +CREATE OR REPLACE VIEW citus.citus_stats_tenants AS +SELECT + colocation_id, + tenant_attribute, + read_count_in_this_period, + read_count_in_last_period, + query_count_in_this_period, + query_count_in_last_period +FROM pg_catalog.citus_stats_tenants() +ORDER BY score DESC; + +ALTER VIEW citus.citus_stats_tenants SET SCHEMA pg_catalog; +GRANT SELECT ON pg_catalog.citus_stats_tenants TO PUBLIC; diff --git a/src/backend/distributed/utils/attribute.c b/src/backend/distributed/utils/attribute.c new file mode 100644 index 000000000..9b2d65e95 --- /dev/null +++ b/src/backend/distributed/utils/attribute.c @@ -0,0 +1,688 @@ +/*------------------------------------------------------------------------- + * + * attribute.c + * Routines related to the multi tenant monitor. + * + * Copyright (c) Citus Data, Inc. 
+ * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "unistd.h" + +#include "distributed/citus_safe_lib.h" +#include "distributed/log_utils.h" +#include "distributed/listutils.h" +#include "distributed/metadata_cache.h" +#include "distributed/tuplestore.h" +#include "executor/execdesc.h" +#include "storage/ipc.h" +#include "storage/lwlock.h" +#include "storage/shmem.h" +#include <sys/time.h> +#include "utils/builtins.h" + +#include "distributed/utils/attribute.h" + +#include <time.h> + +static void AttributeMetricsIfApplicable(void); + +ExecutorEnd_hook_type prev_ExecutorEnd = NULL; + +#define ATTRIBUTE_PREFIX "/* attributeTo: " +#define ATTRIBUTE_STRING_FORMAT "/* attributeTo: %s,%d */" +#define CITUS_STATS_TENANTS_COLUMNS 7 +#define ONE_QUERY_SCORE 1000000000 + +/* TODO maybe needs to be a stack */ +char attributeToTenant[MAX_TENANT_ATTRIBUTE_LENGTH] = ""; +CmdType attributeCommandType = CMD_UNKNOWN; +int colocationGroupId = -1; +clock_t attributeToTenantStart = { 0 }; + +const char *SharedMemoryNameForMultiTenantMonitor = + "Shared memory for multi tenant monitor"; + +char *tenantTrancheName = "Tenant Tranche"; +char *monitorTrancheName = "Multi Tenant Monitor Tranche"; + +static shmem_startup_hook_type prev_shmem_startup_hook = NULL; + +static int CompareTenantScore(const void *leftElement, const void *rightElement); +static void UpdatePeriodsIfNecessary(TenantStats *tenantStats, time_t queryTime); +static void ReduceScoreIfNecessary(TenantStats *tenantStats, time_t queryTime); +static void EvictTenantsIfNecessary(time_t queryTime); +static void RecordTenantStats(TenantStats *tenantStats); +static void CreateMultiTenantMonitor(void); +static MultiTenantMonitor * CreateSharedMemoryForMultiTenantMonitor(void); +static MultiTenantMonitor * GetMultiTenantMonitor(void); +static void MultiTenantMonitorSMInit(void); +static int CreateTenantStats(MultiTenantMonitor *monitor, time_t queryTime); +static int FindTenantStats(MultiTenantMonitor *monitor); +static size_t MultiTenantMonitorshmemSize(void); + +int MultiTenantMonitoringLogLevel = CITUS_LOG_LEVEL_OFF; +int CitusStatsTenantsPeriod = (time_t) 60; +int CitusStatsTenantsLimit = 10; + + +PG_FUNCTION_INFO_V1(citus_stats_tenants); +PG_FUNCTION_INFO_V1(clean_citus_stats_tenants); +PG_FUNCTION_INFO_V1(sleep_until_next_period); + + +/* + * citus_stats_tenants finds, updates and returns the statistics for tenants. + */ +Datum +citus_stats_tenants(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + + /* + * We keep more than CitusStatsTenantsLimit tenants in our monitor. + * We do this so we don't lose a tenant's data if it temporarily falls out of the top CitusStatsTenantsLimit and returns soon after. + * Normally we return only CitusStatsTenantsLimit tenants, but if returnAllTenants is true we return all of them.
+ */ + bool returnAllTenants = PG_GETARG_BOOL(0); + + TupleDesc tupleDescriptor = NULL; + Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor); + time_t monitoringTime = time(0); + + Datum values[CITUS_STATS_TENANTS_COLUMNS]; + bool isNulls[CITUS_STATS_TENANTS_COLUMNS]; + + MultiTenantMonitor *monitor = GetMultiTenantMonitor(); + + if (monitor == NULL) + { + PG_RETURN_VOID(); + } + + LWLockAcquire(&monitor->lock, LW_EXCLUSIVE); + + int numberOfRowsToReturn = 0; + if (returnAllTenants) + { + numberOfRowsToReturn = monitor->tenantCount; + } + else + { + numberOfRowsToReturn = Min(monitor->tenantCount, CitusStatsTenantsLimit); + } + + for (int tenantIndex = 0; tenantIndex < monitor->tenantCount; tenantIndex++) + { + UpdatePeriodsIfNecessary(&monitor->tenants[tenantIndex], monitoringTime); + ReduceScoreIfNecessary(&monitor->tenants[tenantIndex], monitoringTime); + } + SafeQsort(monitor->tenants, monitor->tenantCount, sizeof(TenantStats), + CompareTenantScore); + + for (int i = 0; i < numberOfRowsToReturn; i++) + { + memset(values, 0, sizeof(values)); + memset(isNulls, false, sizeof(isNulls)); + + TenantStats *tenantStats = &monitor->tenants[i]; + + values[0] = Int32GetDatum(tenantStats->colocationGroupId); + values[1] = PointerGetDatum(cstring_to_text(tenantStats->tenantAttribute)); + values[2] = Int32GetDatum(tenantStats->readsInThisPeriod); + values[3] = Int32GetDatum(tenantStats->readsInLastPeriod); + values[4] = Int32GetDatum(tenantStats->readsInThisPeriod + + tenantStats->writesInThisPeriod); + values[5] = Int32GetDatum(tenantStats->readsInLastPeriod + + tenantStats->writesInLastPeriod); + values[6] = Int64GetDatum(tenantStats->score); + + tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls); + } + + LWLockRelease(&monitor->lock); + + PG_RETURN_VOID(); +} + + +/* + * clean_citus_stats_tenants cleans the citus_stats_tenants monitor. + */ +Datum +clean_citus_stats_tenants(PG_FUNCTION_ARGS) +{ + MultiTenantMonitor *monitor = GetMultiTenantMonitor(); + monitor->tenantCount = 0; + + PG_RETURN_VOID(); +} + + +/* + * sleep_until_next_period sleeps until the next monitoring period starts. + */ +Datum +sleep_until_next_period(PG_FUNCTION_ARGS) +{ + struct timeval currentTime; + gettimeofday(&currentTime, NULL); + + long int nextPeriodStart = currentTime.tv_sec - + (currentTime.tv_sec % CitusStatsTenantsPeriod) + + CitusStatsTenantsPeriod; + + long int sleepTime = (nextPeriodStart - currentTime.tv_sec) * 1000000 - + currentTime.tv_usec + 100000; + pg_usleep(sleepTime); + + PG_RETURN_VOID(); +} + + +/* + * AttributeQueryIfAnnotated checks the query annotation and, if the query is annotated + * for tenant statistics monitoring, records the tenant attributes.
+ */ +void +AttributeQueryIfAnnotated(const char *query_string, CmdType commandType) +{ + strcpy_s(attributeToTenant, sizeof(attributeToTenant), ""); + + attributeCommandType = commandType; + + if (query_string == NULL) + { + return; + } + + if (strncmp(ATTRIBUTE_PREFIX, query_string, strlen(ATTRIBUTE_PREFIX)) == 0) + { + /* TODO create a function to safely parse the tenant identifier from the query comment */ + /* query is attributed to a tenant */ + char *tenantId = (char *) query_string + strlen(ATTRIBUTE_PREFIX); + char *tenantEnd = tenantId; + while (tenantEnd[0] != '\0') + { + if (tenantEnd[0] == ' ' && tenantEnd[1] == '*' && tenantEnd[2] == '/') + { + break; + } + + tenantEnd++; + } + tenantEnd--; + + /* parse the colocation id digits from right to left, tracking each digit's place value */ + colocationGroupId = 0; + int placeValue = 1; + while (*tenantEnd != ',') + { + colocationGroupId += (*tenantEnd - '0') * placeValue; + placeValue *= 10; + tenantEnd--; + } + + /* hack to get a clean copy of the tenant id string */ + char tenantEndTmp = *tenantEnd; + *tenantEnd = '\0'; + tenantId = pstrdup(tenantId); + *tenantEnd = tenantEndTmp; + + if (MultiTenantMonitoringLogLevel != CITUS_LOG_LEVEL_OFF) + { + ereport(NOTICE, (errmsg("attributing query to tenant: %s", + quote_literal_cstr(tenantId)))); + } + + strcpy_s(attributeToTenant, sizeof(attributeToTenant), tenantId); + attributeToTenantStart = clock(); + } + else + { + strcpy_s(attributeToTenant, sizeof(attributeToTenant), ""); + } + + /*DetachSegment(); */ +} + + +/* + * AnnotateQuery annotates the query with tenant attributes. + */ +char * +AnnotateQuery(char *queryString, char *partitionColumn, int colocationId) +{ + if (partitionColumn == NULL) + { + return queryString; + } + StringInfo newQuery = makeStringInfo(); + appendStringInfo(newQuery, ATTRIBUTE_STRING_FORMAT, partitionColumn, colocationId); + + appendStringInfoString(newQuery, queryString); + + return newQuery->data; +} + + +/* + * CitusAttributeToEnd keeps the statistics for the tenant and calls the previously installed end hook + * or the standard executor end function. + */ +void +CitusAttributeToEnd(QueryDesc *queryDesc) +{ + /* + * The end of the executor run is the last moment we have to attribute the + * previous query to a tenant, if applicable + */ + AttributeMetricsIfApplicable(); + + /* now call in to the previously installed hook, or the standard implementation */ + if (prev_ExecutorEnd) + { + prev_ExecutorEnd(queryDesc); + } + else + { + standard_ExecutorEnd(queryDesc); + } +} + + +/* + * CompareTenantScore is used to sort the tenant statistics by score + * in descending order.
+ */ +static int +CompareTenantScore(const void *leftElement, const void *rightElement) +{ + const TenantStats *leftTenant = (const TenantStats *) leftElement; + const TenantStats *rightTenant = (const TenantStats *) rightElement; + + if (leftTenant->score > rightTenant->score) + { + return -1; + } + else if (leftTenant->score < rightTenant->score) + { + return 1; + } + return 0; +} + + +/* + * AttributeMetricsIfApplicable updates the metrics for the current tenant's statistics + */ +static void +AttributeMetricsIfApplicable() +{ + if (strcmp(attributeToTenant, "") != 0) + { + clock_t end = { 0 }; + + end = clock(); + time_t queryTime = time(0); + double cpu_time_used = ((double) (end - attributeToTenantStart)) / CLOCKS_PER_SEC; + + if (MultiTenantMonitoringLogLevel != CITUS_LOG_LEVEL_OFF) + { + ereport(NOTICE, (errmsg("attribute cpu counter (%f) to tenant: %s", + cpu_time_used, attributeToTenant))); + } + + MultiTenantMonitor *monitor = GetMultiTenantMonitor(); + + LWLockAcquire(&monitor->lock, LW_SHARED); + + int currentTenantIndex = FindTenantStats(monitor); + + if (currentTenantIndex != -1) + { + TenantStats *tenantStats = &monitor->tenants[currentTenantIndex]; + LWLockAcquire(&tenantStats->lock, LW_EXCLUSIVE); + + UpdatePeriodsIfNecessary(tenantStats, queryTime); + ReduceScoreIfNecessary(tenantStats, queryTime); + RecordTenantStats(tenantStats); + + LWLockRelease(&tenantStats->lock); + } + else + { + LWLockRelease(&monitor->lock); + + LWLockAcquire(&monitor->lock, LW_EXCLUSIVE); + currentTenantIndex = FindTenantStats(monitor); + + if (currentTenantIndex == -1) + { + currentTenantIndex = CreateTenantStats(monitor, queryTime); + } + + LWLockRelease(&monitor->lock); + + LWLockAcquire(&monitor->lock, LW_SHARED); + currentTenantIndex = FindTenantStats(monitor); + if (currentTenantIndex != -1) + { + TenantStats *tenantStats = &monitor->tenants[currentTenantIndex]; + LWLockAcquire(&tenantStats->lock, LW_EXCLUSIVE); + + UpdatePeriodsIfNecessary(tenantStats, queryTime); + ReduceScoreIfNecessary(tenantStats, queryTime); + RecordTenantStats(tenantStats); + + LWLockRelease(&tenantStats->lock); + } + } + LWLockRelease(&monitor->lock); + } + + strcpy_s(attributeToTenant, sizeof(attributeToTenant), ""); +} + + +/* + * UpdatePeriodsIfNecessary moves the query counts to previous periods if enough time has passed. + * + * If 1 period has passed after the latest query, this function moves this period's counts to the last period + * and cleans this period's statistics. + * + * If 2 or more periods have passed after the last query, this function cleans both this and the last period's + * statistics. + */ +static void +UpdatePeriodsIfNecessary(TenantStats *tenantStats, time_t queryTime) +{ + time_t periodStart = queryTime - (queryTime % CitusStatsTenantsPeriod); + + /* + * If this tenant's last query was before the start of the current period + * but there are query counts for this period, we move them to the last period. + */ + if (tenantStats->lastQueryTime < periodStart && + (tenantStats->writesInThisPeriod || tenantStats->readsInThisPeriod)) + { + tenantStats->writesInLastPeriod = tenantStats->writesInThisPeriod; + tenantStats->writesInThisPeriod = 0; + + tenantStats->readsInLastPeriod = tenantStats->readsInThisPeriod; + tenantStats->readsInThisPeriod = 0; + } + + /* + * If the last query was two or more periods ago, we clean the last period's counts too.
+ */ + if (tenantStats->lastQueryTime < periodStart - CitusStatsTenantsPeriod) + { + tenantStats->writesInLastPeriod = 0; + + tenantStats->readsInLastPeriod = 0; + } + + tenantStats->lastQueryTime = queryTime; +} + + +/* + * ReduceScoreIfNecessary reduces the tenant score only if it is necessary. + * + * We halve the tenants' scores after each period. This function checks the number of + * periods that passed after the last score reduction and reduces the score accordingly. + */ +static void +ReduceScoreIfNecessary(TenantStats *tenantStats, time_t queryTime) +{ + time_t periodStart = queryTime - (queryTime % CitusStatsTenantsPeriod); + + /* + * With each query we increase the score of the tenant by ONE_QUERY_SCORE. + * After one period we halve the scores. + * + * Here we calculate how many periods passed after the last time we did score reduction. + * If the latest score reduction was in this period this number should be 0, + * if it was in the last period this number should be 1 and so on. + */ + int periodCountAfterLastScoreReduction = (periodStart - + tenantStats->lastScoreReduction + + CitusStatsTenantsPeriod - 1) / + CitusStatsTenantsPeriod; + + /* + * This should not happen but let's make sure + */ + if (periodCountAfterLastScoreReduction < 0) + { + periodCountAfterLastScoreReduction = 0; + } + + /* + * If the last score reduction was not in this period we do score reduction now. + */ + if (periodCountAfterLastScoreReduction > 0) + { + tenantStats->score >>= periodCountAfterLastScoreReduction; + tenantStats->lastScoreReduction = queryTime; + } +} + + +/* + * EvictTenantsIfNecessary sorts and evicts the tenants if the tenant count is more than or + * equal to 3 * CitusStatsTenantsLimit. + */ +static void +EvictTenantsIfNecessary(time_t queryTime) +{ + MultiTenantMonitor *monitor = GetMultiTenantMonitor(); + + /* + * We keep up to CitusStatsTenantsLimit * 3 tenants instead of CitusStatsTenantsLimit, + * so we don't lose data immediately after a tenant falls out of the top CitusStatsTenantsLimit. + * + * Every time tenant count hits CitusStatsTenantsLimit * 3, we reduce it back to CitusStatsTenantsLimit * 2. + */ + if (monitor->tenantCount >= CitusStatsTenantsLimit * 3) + { + for (int tenantIndex = 0; tenantIndex < monitor->tenantCount; tenantIndex++) + { + ReduceScoreIfNecessary(&monitor->tenants[tenantIndex], queryTime); + } + SafeQsort(monitor->tenants, monitor->tenantCount, sizeof(TenantStats), + CompareTenantScore); + monitor->tenantCount = CitusStatsTenantsLimit * 2; + } +} + + +/* + * RecordTenantStats records the query statistics for the tenant. + */ +static void +RecordTenantStats(TenantStats *tenantStats) +{ + if (tenantStats->score < LLONG_MAX - ONE_QUERY_SCORE) + { + tenantStats->score += ONE_QUERY_SCORE; + } + else + { + tenantStats->score = LLONG_MAX; + } + + if (attributeCommandType == CMD_SELECT) + { + tenantStats->readsInThisPeriod++; + } + else if (attributeCommandType == CMD_UPDATE || + attributeCommandType == CMD_INSERT || + attributeCommandType == CMD_DELETE) + { + tenantStats->writesInThisPeriod++; + } +} + + +/* + * CreateMultiTenantMonitor creates the data structure for the multi tenant monitor. + */ +static void +CreateMultiTenantMonitor() +{ + MultiTenantMonitor *monitor = CreateSharedMemoryForMultiTenantMonitor(); + monitor->tenantCount = 0; +} + + +/* + * CreateSharedMemoryForMultiTenantMonitor creates a shared memory segment for the multi tenant monitor.
+ */ +static MultiTenantMonitor * +CreateSharedMemoryForMultiTenantMonitor() +{ + bool found = false; + MultiTenantMonitor *monitor = ShmemInitStruct(SharedMemoryNameForMultiTenantMonitor, + MultiTenantMonitorshmemSize(), + &found); + if (found) + { + return monitor; + } + + monitor->namedLockTranche.trancheId = LWLockNewTrancheId(); + monitor->namedLockTranche.trancheName = monitorTrancheName; + + LWLockRegisterTranche(monitor->namedLockTranche.trancheId, + monitor->namedLockTranche.trancheName); + LWLockInitialize(&monitor->lock, monitor->namedLockTranche.trancheId); + + return monitor; +} + + +/* + * GetMultiTenantMonitor returns the data structure for the multi tenant monitor. + */ +static MultiTenantMonitor * +GetMultiTenantMonitor() +{ + bool found = false; + MultiTenantMonitor *monitor = ShmemInitStruct(SharedMemoryNameForMultiTenantMonitor, + MultiTenantMonitorshmemSize(), + &found); + + if (!found) + { + elog(WARNING, "monitor not found"); + return NULL; + } + + return monitor; +} + + +/* + * InitializeMultiTenantMonitorSMHandleManagement sets up the shared memory startup hook + * so that the multi tenant monitor can be initialized and stored in shared memory. + */ +void +InitializeMultiTenantMonitorSMHandleManagement() +{ + prev_shmem_startup_hook = shmem_startup_hook; + shmem_startup_hook = MultiTenantMonitorSMInit; +} + + +/* + * MultiTenantMonitorSMInit initializes the shared memory for the multi tenant monitor. + */ +static void +MultiTenantMonitorSMInit() +{ + CreateMultiTenantMonitor(); + + if (prev_shmem_startup_hook != NULL) + { + prev_shmem_startup_hook(); + } +} + + +/* + * CreateTenantStats creates the data structure for a tenant's statistics. + */ +static int +CreateTenantStats(MultiTenantMonitor *monitor, time_t queryTime) +{ + /* + * If the tenant count reached 3 * CitusStatsTenantsLimit, we evict the tenants + * with the lowest score. + */ + EvictTenantsIfNecessary(queryTime); + + int tenantIndex = monitor->tenantCount; + + memset(&monitor->tenants[tenantIndex], 0, sizeof(monitor->tenants[tenantIndex])); + + strcpy_s(monitor->tenants[tenantIndex].tenantAttribute, + sizeof(monitor->tenants[tenantIndex].tenantAttribute), attributeToTenant); + monitor->tenants[tenantIndex].colocationGroupId = colocationGroupId; + + monitor->tenants[tenantIndex].namedLockTranche.trancheId = LWLockNewTrancheId(); + monitor->tenants[tenantIndex].namedLockTranche.trancheName = tenantTrancheName; + + LWLockRegisterTranche(monitor->tenants[tenantIndex].namedLockTranche.trancheId, + monitor->tenants[tenantIndex].namedLockTranche.trancheName); + LWLockInitialize(&monitor->tenants[tenantIndex].lock, + monitor->tenants[tenantIndex].namedLockTranche.trancheId); + + monitor->tenantCount++; + + return tenantIndex; +} + + +/* + * FindTenantStats finds the index for the current tenant's statistics. + */ +static int +FindTenantStats(MultiTenantMonitor *monitor) +{ + for (int i = 0; i < monitor->tenantCount; i++) + { + TenantStats *tenantStats = &monitor->tenants[i]; + if (strcmp(tenantStats->tenantAttribute, attributeToTenant) == 0 && + tenantStats->colocationGroupId == colocationGroupId) + { + return i; + } + } + + return -1; +} + + +/* + * MultiTenantMonitorshmemSize calculates the size of the multi tenant monitor using + * the CitusStatsTenantsLimit parameter.
+ */ +static size_t +MultiTenantMonitorshmemSize(void) +{ + Size size = sizeof(MultiTenantMonitor); + size = add_size(size, mul_size(sizeof(TenantStats), CitusStatsTenantsLimit * 3)); + + return size; +} diff --git a/src/include/distributed/citus_custom_scan.h b/src/include/distributed/citus_custom_scan.h index 92301fceb..f31138ac2 100644 --- a/src/include/distributed/citus_custom_scan.h +++ b/src/include/distributed/citus_custom_scan.h @@ -46,4 +46,6 @@ extern CustomScan * FetchCitusCustomScanIfExists(Plan *plan); extern bool IsCitusPlan(Plan *plan); extern bool IsCitusCustomScan(Plan *plan); +extern void SetJobColocationId(Job *job); + #endif /* CITUS_CUSTOM_SCAN_H */ diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index d6ad4c248..49fe28f1d 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -330,6 +330,9 @@ typedef struct Task * Vacuum, create/drop/reindex concurrently cannot be executed in a transaction. */ bool cannotBeExecutedInTransction; + + char *partitionColumn; + int colocationId; } Task; diff --git a/src/include/distributed/utils/attribute.h b/src/include/distributed/utils/attribute.h new file mode 100644 index 000000000..b4d8bb607 --- /dev/null +++ b/src/include/distributed/utils/attribute.h @@ -0,0 +1,102 @@ +/*------------------------------------------------------------------------- + * + * attribute.h + * Routines related to the multi tenant monitor. + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef CITUS_ATTRIBUTE_H +#define CITUS_ATTRIBUTE_H + +#include "executor/execdesc.h" +#include "executor/executor.h" +#include "storage/lwlock.h" + +#define MAX_TENANT_ATTRIBUTE_LENGTH 100 + +/* + * TenantStats is the struct that keeps statistics about one tenant. + */ +typedef struct TenantStats +{ + /* + * The attribute value, e.g. the distribution column value, and the colocation group id + * of the tenant. + */ + char tenantAttribute[MAX_TENANT_ATTRIBUTE_LENGTH]; + int colocationGroupId; + + /* + * Number of SELECT queries this tenant ran in this and last periods. + */ + int readsInLastPeriod; + int readsInThisPeriod; + + /* + * Number of INSERT, UPDATE, and DELETE queries this tenant ran in this and last periods. + */ + int writesInLastPeriod; + int writesInThisPeriod; + + /* + * The latest time this tenant ran a query. This value is used to update the score later. + */ + time_t lastQueryTime; + + /* + * The tenant monitoring score of this tenant. This value is increased by ONE_QUERY_SCORE at every query + * and halved after every period. + */ + long long score; + + /* + * The latest time the score of this tenant was halved. This value is used to correctly calculate the reduction later. + */ + time_t lastScoreReduction; + + /* + * Locks needed to update this tenant's statistics. + */ + NamedLWLockTranche namedLockTranche; + LWLock lock; +} TenantStats; + +/* + * MultiTenantMonitor is the struct for keeping the statistics + * of the tenants. + */ +typedef struct MultiTenantMonitor +{ + /* + * Lock mechanism for the monitor. + * Each tenant statistics update acquires the lock in shared mode, while + * tenant eviction and the monitor view acquire it in exclusive mode. + */ + NamedLWLockTranche namedLockTranche; + LWLock lock; + + /* + * tenantCount is the number of items in the tenants array.
+ * The total length of tenants array is set up at CreateSharedMemoryForMultiTenantMonitor + * and is 3 * citus.stats_tenants_limit + */ + int tenantCount; + TenantStats tenants[FLEXIBLE_ARRAY_MEMBER]; +} MultiTenantMonitor; + + +extern void CitusAttributeToEnd(QueryDesc *queryDesc); +extern void AttributeQueryIfAnnotated(const char *queryString, CmdType commandType); +extern char * AnnotateQuery(char *queryString, char *partitionColumn, int colocationId); +extern void InitializeMultiTenantMonitorSMHandleManagement(void); + +extern ExecutorEnd_hook_type prev_ExecutorEnd; + +extern int MultiTenantMonitoringLogLevel; +extern int CitusStatsTenantsPeriod; +extern int CitusStatsTenantsLimit; + +#endif /*CITUS_ATTRIBUTE_H */ diff --git a/src/test/regress/bin/normalize.sed b/src/test/regress/bin/normalize.sed index 2ebb31f47..33a35f286 100644 --- a/src/test/regress/bin/normalize.sed +++ b/src/test/regress/bin/normalize.sed @@ -307,3 +307,5 @@ s/(NOTICE: issuing SET LOCAL application_name TO 'citus_rebalancer gpid=)[0-9]+ # shard_rebalancer output, flaky improvement number s/improvement of 0.1[0-9]* is lower/improvement of 0.1xxxxx is lower/g + +s/\/\* attributeTo.*\*\///g diff --git a/src/test/regress/expected/citus_stats_tenants.out b/src/test/regress/expected/citus_stats_tenants.out new file mode 100644 index 000000000..dbd525aab --- /dev/null +++ b/src/test/regress/expected/citus_stats_tenants.out @@ -0,0 +1,290 @@ +CREATE SCHEMA citus_stats_tenants; +SET search_path TO citus_stats_tenants; +SET citus.next_shard_id TO 5797500; +SET citus.shard_replication_factor TO 1; +CREATE OR REPLACE FUNCTION pg_catalog.clean_citus_stats_tenants() +RETURNS VOID +LANGUAGE C +AS 'citus', $$clean_citus_stats_tenants$$; +CREATE OR REPLACE FUNCTION pg_catalog.sleep_until_next_period() +RETURNS VOID +LANGUAGE C +AS 'citus', $$sleep_until_next_period$$; +SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()'); + result +--------------------------------------------------------------------- + + + +(3 rows) + +-- set period to a high number to prevent stats from being reset +SELECT result FROM run_command_on_all_nodes('ALTER SYSTEM SET citus.stats_tenants_period TO 1000000000'); + result +--------------------------------------------------------------------- + ALTER SYSTEM + ALTER SYSTEM + ALTER SYSTEM +(3 rows) + +SELECT result FROM run_command_on_all_nodes('SELECT pg_reload_conf()'); + result +--------------------------------------------------------------------- + t + t + t +(3 rows) + +CREATE TABLE dist_tbl (a INT, b TEXT); +SELECT create_distributed_table('dist_tbl', 'a', shard_count:=4, colocate_with:='none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE dist_tbl_2 (a INT, b INT); +SELECT create_distributed_table('dist_tbl_2', 'a', colocate_with:='dist_tbl'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE dist_tbl_text (a TEXT, b INT); +SELECT create_distributed_table('dist_tbl_text', 'a', shard_count:=4, colocate_with:='none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE ref_tbl (a INT, b INT); +SELECT create_reference_table('ref_tbl'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO dist_tbl VALUES (1, 'abcd'); +INSERT INTO dist_tbl VALUES (2, 'abcd'); +UPDATE dist_tbl SET b = a 
+ 1 WHERE a = 3; +UPDATE dist_tbl SET b = a + 1 WHERE a = 4; +DELETE FROM dist_tbl WHERE a = 5; +\c - - - :worker_1_port +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute; + tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period +--------------------------------------------------------------------- + 1 | 0 | 0 | 1 | 0 + 5 | 0 | 0 | 1 | 0 +(2 rows) + +\c - - - :worker_2_port +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute; + tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period +--------------------------------------------------------------------- + 2 | 0 | 0 | 1 | 0 + 3 | 0 | 0 | 1 | 0 +(2 rows) + +\c - - - :master_port +SET search_path TO citus_stats_tenants; +SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()'); + result +--------------------------------------------------------------------- + + + +(3 rows) + +-- queries with multiple tenants should not be counted +SELECT count(*)>=0 FROM dist_tbl WHERE a IN (1, 5); + ?column? +--------------------------------------------------------------------- + t +(1 row) + +-- queries with reference tables should not be counted +SELECT count(*)>=0 FROM ref_tbl WHERE a = 1; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +\c - - - :worker_1_port +SELECT tenant_attribute, query_count_in_this_period FROM citus_stats_tenants ORDER BY tenant_attribute; + tenant_attribute | query_count_in_this_period +--------------------------------------------------------------------- +(0 rows) + +\c - - - :master_port +SET search_path TO citus_stats_tenants; +-- queries with multiple tables but one tenant should be counted +SELECT count(*)>=0 FROM dist_tbl, dist_tbl_2 WHERE dist_tbl.a = 1 AND dist_tbl_2.a = 1; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl JOIN dist_tbl_2 ON dist_tbl.a = dist_tbl_2.a WHERE dist_tbl.a = 1; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +\c - - - :worker_1_port +SELECT tenant_attribute, query_count_in_this_period FROM citus_stats_tenants WHERE tenant_attribute = '1'; + tenant_attribute | query_count_in_this_period +--------------------------------------------------------------------- + 1 | 2 +(1 row) + +\c - - - :master_port +SET search_path TO citus_stats_tenants; +-- test scoring +-- all of these distribution column values are from second worker +SELECT count(*)>=0 FROM dist_tbl WHERE a = 2; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl WHERE a = 3; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl WHERE a = 4; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'abcd'; + ?column? 
+--------------------------------------------------------------------- + t +(1 row) + +\c - - - :worker_2_port +SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stats_tenants(true) ORDER BY score DESC; + tenant_attribute | query_count_in_this_period | score +--------------------------------------------------------------------- + 2 | 1 | 1000000000 + 3 | 1 | 1000000000 + 4 | 1 | 1000000000 + abcd | 1 | 1000000000 +(4 rows) + +\c - - - :master_port +SET search_path TO citus_stats_tenants; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'abcd'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'abcd'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'cdef'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +\c - - - :worker_2_port +SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stats_tenants(true) ORDER BY score DESC; + tenant_attribute | query_count_in_this_period | score +--------------------------------------------------------------------- + abcd | 3 | 3000000000 + 2 | 1 | 1000000000 + 3 | 1 | 1000000000 + 4 | 1 | 1000000000 + bcde | 1 | 1000000000 + cdef | 1 | 1000000000 +(6 rows) + +\c - - - :master_port +SET search_path TO citus_stats_tenants; +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'defg'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +\c - - - :worker_2_port +SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stats_tenants(true) ORDER BY score DESC; + tenant_attribute | query_count_in_this_period | score +--------------------------------------------------------------------- + abcd | 3 | 3000000000 + bcde | 3 | 3000000000 + 2 | 1 | 1000000000 + 3 | 1 | 1000000000 + defg | 1 | 1000000000 +(5 rows) + +\c - - - :master_port +SET search_path TO citus_stats_tenants; +-- test period passing +SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()'); + result +--------------------------------------------------------------------- + + + +(3 rows) + +SELECT count(*)>=0 FROM dist_tbl WHERE a = 1; + ?column? 
+--------------------------------------------------------------------- + t +(1 row) + +INSERT INTO dist_tbl VALUES (5, 'abcd'); +\c - - - :worker_1_port +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute; + tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period +--------------------------------------------------------------------- + 1 | 1 | 0 | 1 | 0 + 5 | 0 | 0 | 1 | 0 +(2 rows) + +-- simulate passing the period +SET citus.stats_tenants_period TO 2; +SELECT sleep_until_next_period(); + sleep_until_next_period +--------------------------------------------------------------------- + +(1 row) + +SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute; + tenant_attribute | read_count_in_this_period | read_count_in_last_period | query_count_in_this_period | query_count_in_last_period +--------------------------------------------------------------------- + 1 | 0 | 1 | 0 | 1 + 5 | 0 | 0 | 0 | 1 +(2 rows) + +\c - - - :master_port +SET search_path TO citus_stats_tenants; +SET client_min_messages TO ERROR; +DROP SCHEMA citus_stats_tenants CASCADE; diff --git a/src/test/regress/expected/failure_multi_dml.out b/src/test/regress/expected/failure_multi_dml.out index 7ca8a8f91..bbea2c999 100644 --- a/src/test/regress/expected/failure_multi_dml.out +++ b/src/test/regress/expected/failure_multi_dml.out @@ -25,7 +25,7 @@ SELECT citus.clear_network_traffic(); ---- test multiple statements spanning multiple shards, ---- at each significant point. 
These transactions are 2pc -- fail at DELETE -SELECT citus.mitmproxy('conn.onQuery(query="^DELETE").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="DELETE").kill()'); mitmproxy --------------------------------------------------------------------- @@ -54,7 +54,7 @@ SELECT * FROM dml_test ORDER BY id ASC; (4 rows) -- cancel at DELETE -SELECT citus.mitmproxy('conn.onQuery(query="^DELETE").cancel(' || pg_backend_pid() || ')'); +SELECT citus.mitmproxy('conn.onQuery(query="DELETE").cancel(' || pg_backend_pid() || ')'); mitmproxy --------------------------------------------------------------------- @@ -83,7 +83,7 @@ SELECT * FROM dml_test ORDER BY id ASC; (4 rows) -- fail at INSERT -SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()'); mitmproxy --------------------------------------------------------------------- @@ -110,7 +110,7 @@ SELECT * FROM dml_test ORDER BY id ASC; (4 rows) -- cancel at INSERT -SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").cancel(' || pg_backend_pid() || ')'); +SELECT citus.mitmproxy('conn.onQuery(query="INSERT").cancel(' || pg_backend_pid() || ')'); mitmproxy --------------------------------------------------------------------- @@ -137,7 +137,7 @@ SELECT * FROM dml_test ORDER BY id ASC; (4 rows) -- fail at UPDATE -SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()'); mitmproxy --------------------------------------------------------------------- @@ -163,7 +163,7 @@ SELECT * FROM dml_test ORDER BY id ASC; (4 rows) -- cancel at UPDATE -SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").cancel(' || pg_backend_pid() || ')'); +SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").cancel(' || pg_backend_pid() || ')'); mitmproxy --------------------------------------------------------------------- diff --git a/src/test/regress/expected/failure_multi_row_insert.out b/src/test/regress/expected/failure_multi_row_insert.out index 8948be94e..f3cd4919a 100644 --- a/src/test/regress/expected/failure_multi_row_insert.out +++ b/src/test/regress/expected/failure_multi_row_insert.out @@ -36,7 +36,7 @@ SELECT create_reference_table('reference_table'); -- (d) multi-row INSERT that hits multiple shards in multiple workers -- (e) multi-row INSERT to a reference table -- Failure and cancellation on multi-row INSERT that hits the same shard with the same value -SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()'); mitmproxy --------------------------------------------------------------------- diff --git a/src/test/regress/expected/failure_ref_tables.out b/src/test/regress/expected/failure_ref_tables.out index 6485691af..4984cc1bf 100644 --- a/src/test/regress/expected/failure_ref_tables.out +++ b/src/test/regress/expected/failure_ref_tables.out @@ -26,7 +26,7 @@ SELECT COUNT(*) FROM ref_table; (1 row) -- verify behavior of single INSERT; should fail to execute -SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()'); mitmproxy --------------------------------------------------------------------- @@ -41,7 +41,7 @@ SELECT COUNT(*) FROM ref_table WHERE key=5; (1 row) -- verify behavior of UPDATE ... 
RETURNING; should not execute -SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()'); mitmproxy --------------------------------------------------------------------- @@ -56,7 +56,7 @@ SELECT COUNT(*) FROM ref_table WHERE key=7; (1 row) -- verify fix to #2214; should raise error and fail to execute -SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()'); mitmproxy --------------------------------------------------------------------- diff --git a/src/test/regress/expected/failure_replicated_partitions.out b/src/test/regress/expected/failure_replicated_partitions.out index 4ae2d604c..7294df98b 100644 --- a/src/test/regress/expected/failure_replicated_partitions.out +++ b/src/test/regress/expected/failure_replicated_partitions.out @@ -21,7 +21,7 @@ CREATE TABLE partitioned_table_0 PARTITION OF partitioned_table (dist_key, partition_id) FOR VALUES IN ( 0 ); INSERT INTO partitioned_table VALUES (0, 0); -SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()'); mitmproxy --------------------------------------------------------------------- diff --git a/src/test/regress/expected/failure_single_mod.out b/src/test/regress/expected/failure_single_mod.out index 54db33ff6..2a6ed2d77 100644 --- a/src/test/regress/expected/failure_single_mod.out +++ b/src/test/regress/expected/failure_single_mod.out @@ -20,7 +20,7 @@ SELECT create_distributed_table('mod_test', 'key'); (1 row) -- verify behavior of single INSERT; should mark shard as failed -SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()'); mitmproxy --------------------------------------------------------------------- @@ -52,7 +52,7 @@ SELECT citus.mitmproxy('conn.allow()'); (1 row) INSERT INTO mod_test VALUES (2, 6); -SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()'); mitmproxy --------------------------------------------------------------------- @@ -78,7 +78,7 @@ WHERE shardid IN ( TRUNCATE mod_test; -- verify behavior of multi-statement modifications to a single shard -- should fail the transaction and never mark placements inactive -SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()'); mitmproxy --------------------------------------------------------------------- diff --git a/src/test/regress/expected/failure_single_select.out b/src/test/regress/expected/failure_single_select.out index 5d17cc4ad..1b60f3125 100644 --- a/src/test/regress/expected/failure_single_select.out +++ b/src/test/regress/expected/failure_single_select.out @@ -23,7 +23,7 @@ SELECT create_distributed_table('select_test', 'key'); -- put data in shard for which mitm node is first placement INSERT INTO select_test VALUES (3, 'test data'); -SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").kill()'); +SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").kill()'); mitmproxy --------------------------------------------------------------------- @@ -45,7 +45,7 @@ WARNING: connection to the remote node localhost:xxxxx failed with the followin -- kill after first SELECT; txn should fail as INSERT triggers -- 2PC (and placementis not marked bad) -SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").kill()'); 
+SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").kill()');
 mitmproxy
---------------------------------------------------------------------
 
@@ -66,7 +66,7 @@ TRUNCATE select_test;
 -- now the same tests with query cancellation
 -- put data in shard for which mitm node is first placement
 INSERT INTO select_test VALUES (3, 'test data');
-SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").cancel(' || pg_backend_pid() || ')');
+SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").cancel(' || pg_backend_pid() || ')');
 mitmproxy
---------------------------------------------------------------------
 
@@ -77,7 +77,7 @@ ERROR: canceling statement due to user request
 SELECT * FROM select_test WHERE key = 3;
 ERROR: canceling statement due to user request
 -- cancel after first SELECT; txn should fail and nothing should be marked as invalid
-SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").cancel(' || pg_backend_pid() || ')');
+SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").cancel(' || pg_backend_pid() || ')');
 mitmproxy
---------------------------------------------------------------------
 
@@ -107,7 +107,7 @@ SELECT citus.mitmproxy('conn.allow()');
 TRUNCATE select_test;
 -- cancel the second query
 -- error after second SELECT; txn should fail
-SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").after(1).cancel(' || pg_backend_pid() || ')');
+SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").after(1).cancel(' || pg_backend_pid() || ')');
 mitmproxy
---------------------------------------------------------------------
 
@@ -126,7 +126,7 @@ SELECT * FROM select_test WHERE key = 3;
 ERROR: canceling statement due to user request
 COMMIT;
 -- error after second SELECT; txn should fails the transaction
-SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").after(1).reset()');
+SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").after(1).reset()');
 mitmproxy
---------------------------------------------------------------------
 
@@ -144,7 +144,7 @@ INSERT INTO select_test VALUES (3, 'even more data');
 SELECT * FROM select_test WHERE key = 3;
 ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
 COMMIT;
-SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*pg_prepared_xacts").after(2).kill()');
+SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*pg_prepared_xacts").after(2).kill()');
 mitmproxy
---------------------------------------------------------------------
 
@@ -173,7 +173,7 @@ SELECT create_distributed_table('select_test', 'key');
 
 SET citus.max_cached_conns_per_worker TO 1; -- allow connection to be cached
 INSERT INTO select_test VALUES (1, 'test data');
-SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").after(1).kill()');
+SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").after(1).kill()');
 mitmproxy
---------------------------------------------------------------------
 
@@ -188,7 +188,7 @@ SELECT * FROM select_test WHERE key = 1;
 SELECT * FROM select_test WHERE key = 1;
 ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
 -- now the same test with query cancellation
-SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").after(1).cancel(' || pg_backend_pid() || ')');
+SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").after(1).cancel(' || pg_backend_pid() || ')');
 mitmproxy
---------------------------------------------------------------------
 
diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out
index ead2a5b85..2bb0c29e7 100644
--- a/src/test/regress/expected/multi_extension.out
+++ b/src/test/regress/expected/multi_extension.out
@@ -1360,9 +1360,11 @@ SELECT * FROM multi_extension.print_extension_changes();
 -- Snapshot of state at 11.3-1
 ALTER EXTENSION citus UPDATE TO '11.3-1';
 SELECT * FROM multi_extension.print_extension_changes();
- previous_object | current_object
+ previous_object | current_object
---------------------------------------------------------------------
-(0 rows)
+ | function citus_stats_tenants(boolean) SETOF record
+ | view citus_stats_tenants
+(2 rows)
 
 DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
 -- show running version
diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out
index 7cd2f63c8..8005c0c42 100644
--- a/src/test/regress/expected/upgrade_list_citus_objects.out
+++ b/src/test/regress/expected/upgrade_list_citus_objects.out
@@ -121,6 +121,7 @@ ORDER BY 1;
 function citus_stat_activity()
 function citus_stat_statements()
 function citus_stat_statements_reset()
+ function citus_stats_tenants(boolean)
 function citus_table_is_visible(oid)
 function citus_table_size(regclass)
 function citus_task_wait(bigint,citus_task_status)
@@ -316,7 +317,8 @@ ORDER BY 1;
 view citus_shards_on_worker
 view citus_stat_activity
 view citus_stat_statements
+ view citus_stats_tenants
 view pg_dist_shard_placement
 view time_partitions
-(310 rows)
+(312 rows)
 
diff --git a/src/test/regress/multi_1_schedule b/src/test/regress/multi_1_schedule
index ee81bde38..4091b7a63 100644
--- a/src/test/regress/multi_1_schedule
+++ b/src/test/regress/multi_1_schedule
@@ -102,6 +102,11 @@ test: pg13_propagate_statistics
 # ----------
 test: citus_update_table_statistics
 
+# ----------
+# Test for tenant statistics
+# ----------
+test: citus_stats_tenants
+
 # ----------
 # Parallel TPC-H tests to check our distributed execution behavior
 # ----------
diff --git a/src/test/regress/pg_regress_multi.pl b/src/test/regress/pg_regress_multi.pl
index af594c1d4..f4e85ab61 100755
--- a/src/test/regress/pg_regress_multi.pl
+++ b/src/test/regress/pg_regress_multi.pl
@@ -487,6 +487,7 @@ push(@pgOptions, "citus.explain_analyze_sort_method='taskId'");
 push(@pgOptions, "citus.enable_manual_changes_to_shards=on");
 push(@pgOptions, "citus.allow_unsafe_locks_from_workers=on");
 push(@pgOptions, "citus.stat_statements_track = 'all'");
+push(@pgOptions, "citus.stats_tenants_limit = 2");
 
 # Some tests look at shards in pg_class, make sure we can usually see them:
 push(@pgOptions, "citus.show_shards_for_app_name_prefixes='pg_regress'");
diff --git a/src/test/regress/sql/citus_stats_tenants.sql b/src/test/regress/sql/citus_stats_tenants.sql
new file mode 100644
index 000000000..056f1902b
--- /dev/null
+++ b/src/test/regress/sql/citus_stats_tenants.sql
@@ -0,0 +1,118 @@
+CREATE SCHEMA citus_stats_tenants;
+SET search_path TO citus_stats_tenants;
+SET citus.next_shard_id TO 5797500;
+SET citus.shard_replication_factor TO 1;
+
+CREATE OR REPLACE FUNCTION pg_catalog.clean_citus_stats_tenants()
+RETURNS VOID
+LANGUAGE C
+AS 'citus', $$clean_citus_stats_tenants$$;
+
+CREATE OR REPLACE FUNCTION pg_catalog.sleep_until_next_period()
+RETURNS VOID
+LANGUAGE C
+AS 'citus', $$sleep_until_next_period$$;
+
+SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()');
+
+-- set period to a high number to prevent stats from being reset
+SELECT result FROM run_command_on_all_nodes('ALTER SYSTEM SET citus.stats_tenants_period TO 1000000000');
+SELECT result FROM run_command_on_all_nodes('SELECT pg_reload_conf()');
+
+CREATE TABLE dist_tbl (a INT, b TEXT);
+SELECT create_distributed_table('dist_tbl', 'a', shard_count:=4, colocate_with:='none');
+
+CREATE TABLE dist_tbl_2 (a INT, b INT);
+SELECT create_distributed_table('dist_tbl_2', 'a', colocate_with:='dist_tbl');
+
+CREATE TABLE dist_tbl_text (a TEXT, b INT);
+SELECT create_distributed_table('dist_tbl_text', 'a', shard_count:=4, colocate_with:='none');
+
+CREATE TABLE ref_tbl (a INT, b INT);
+SELECT create_reference_table('ref_tbl');
+
+INSERT INTO dist_tbl VALUES (1, 'abcd');
+INSERT INTO dist_tbl VALUES (2, 'abcd');
+UPDATE dist_tbl SET b = a + 1 WHERE a = 3;
+UPDATE dist_tbl SET b = a + 1 WHERE a = 4;
+DELETE FROM dist_tbl WHERE a = 5;
+
+\c - - - :worker_1_port
+SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute;
+\c - - - :worker_2_port
+SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute;
+\c - - - :master_port
+SET search_path TO citus_stats_tenants;
+
+SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()');
+
+-- queries with multiple tenants should not be counted
+SELECT count(*)>=0 FROM dist_tbl WHERE a IN (1, 5);
+
+-- queries with reference tables should not be counted
+SELECT count(*)>=0 FROM ref_tbl WHERE a = 1;
+
+\c - - - :worker_1_port
+SELECT tenant_attribute, query_count_in_this_period FROM citus_stats_tenants ORDER BY tenant_attribute;
+\c - - - :master_port
+SET search_path TO citus_stats_tenants;
+
+-- queries with multiple tables but one tenant should be counted
+SELECT count(*)>=0 FROM dist_tbl, dist_tbl_2 WHERE dist_tbl.a = 1 AND dist_tbl_2.a = 1;
+SELECT count(*)>=0 FROM dist_tbl JOIN dist_tbl_2 ON dist_tbl.a = dist_tbl_2.a WHERE dist_tbl.a = 1;
+
+\c - - - :worker_1_port
+SELECT tenant_attribute, query_count_in_this_period FROM citus_stats_tenants WHERE tenant_attribute = '1';
+\c - - - :master_port
+SET search_path TO citus_stats_tenants;
+
+-- test scoring
+-- all of these distribution column values are from second worker
+SELECT count(*)>=0 FROM dist_tbl WHERE a = 2;
+SELECT count(*)>=0 FROM dist_tbl WHERE a = 3;
+SELECT count(*)>=0 FROM dist_tbl WHERE a = 4;
+SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'abcd';
+
+\c - - - :worker_2_port
+SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stats_tenants(true) ORDER BY score DESC;
+\c - - - :master_port
+SET search_path TO citus_stats_tenants;
+
+SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'abcd';
+SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'abcd';
+SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde';
+SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'cdef';
+
+\c - - - :worker_2_port
+SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stats_tenants(true) ORDER BY score DESC;
+\c - - - :master_port
+SET search_path TO citus_stats_tenants;
+
+SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde';
+SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'bcde';
+SELECT count(*)>=0 FROM dist_tbl_text WHERE a = 'defg';
+
+\c - - - :worker_2_port
+SELECT tenant_attribute, query_count_in_this_period, score FROM citus_stats_tenants(true) ORDER BY score DESC;
+\c - - - :master_port
+SET search_path TO citus_stats_tenants;
+
+-- test period passing
+SELECT result FROM run_command_on_all_nodes('SELECT clean_citus_stats_tenants()');
+
+SELECT count(*)>=0 FROM dist_tbl WHERE a = 1;
+INSERT INTO dist_tbl VALUES (5, 'abcd');
+
+\c - - - :worker_1_port
+SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute;
+
+-- simulate passing the period
+SET citus.stats_tenants_period TO 2;
+SELECT sleep_until_next_period();
+
+SELECT tenant_attribute, read_count_in_this_period, read_count_in_last_period, query_count_in_this_period, query_count_in_last_period FROM citus_stats_tenants ORDER BY tenant_attribute;
+\c - - - :master_port
+SET search_path TO citus_stats_tenants;
+
+SET client_min_messages TO ERROR;
+DROP SCHEMA citus_stats_tenants CASCADE;
diff --git a/src/test/regress/sql/failure_multi_dml.sql b/src/test/regress/sql/failure_multi_dml.sql
index 390c01461..f62ede4d5 100644
--- a/src/test/regress/sql/failure_multi_dml.sql
+++ b/src/test/regress/sql/failure_multi_dml.sql
@@ -21,7 +21,7 @@ SELECT citus.clear_network_traffic();
 ---- at each significant point. These transactions are 2pc
 
 -- fail at DELETE
-SELECT citus.mitmproxy('conn.onQuery(query="^DELETE").kill()');
+SELECT citus.mitmproxy('conn.onQuery(query="DELETE").kill()');
 
 BEGIN;
 DELETE FROM dml_test WHERE id = 1;
@@ -35,7 +35,7 @@ COMMIT;
 SELECT * FROM dml_test ORDER BY id ASC;
 
 -- cancel at DELETE
-SELECT citus.mitmproxy('conn.onQuery(query="^DELETE").cancel(' || pg_backend_pid() || ')');
+SELECT citus.mitmproxy('conn.onQuery(query="DELETE").cancel(' || pg_backend_pid() || ')');
 
 BEGIN;
 DELETE FROM dml_test WHERE id = 1;
@@ -49,7 +49,7 @@ COMMIT;
 SELECT * FROM dml_test ORDER BY id ASC;
 
 -- fail at INSERT
-SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()');
+SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()');
 
 BEGIN;
 DELETE FROM dml_test WHERE id = 1;
@@ -63,7 +63,7 @@ COMMIT;
 SELECT * FROM dml_test ORDER BY id ASC;
 
 -- cancel at INSERT
-SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").cancel(' || pg_backend_pid() || ')');
+SELECT citus.mitmproxy('conn.onQuery(query="INSERT").cancel(' || pg_backend_pid() || ')');
 
 BEGIN;
 DELETE FROM dml_test WHERE id = 1;
@@ -77,7 +77,7 @@ COMMIT;
 SELECT * FROM dml_test ORDER BY id ASC;
 
 -- fail at UPDATE
-SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()');
+SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()');
 
 BEGIN;
 DELETE FROM dml_test WHERE id = 1;
@@ -91,7 +91,7 @@ COMMIT;
 SELECT * FROM dml_test ORDER BY id ASC;
 
 -- cancel at UPDATE
-SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").cancel(' || pg_backend_pid() || ')');
+SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").cancel(' || pg_backend_pid() || ')');
 
 BEGIN;
 DELETE FROM dml_test WHERE id = 1;
diff --git a/src/test/regress/sql/failure_multi_row_insert.sql b/src/test/regress/sql/failure_multi_row_insert.sql
index 53ab8a84d..cfc98f719 100644
--- a/src/test/regress/sql/failure_multi_row_insert.sql
+++ b/src/test/regress/sql/failure_multi_row_insert.sql
@@ -30,7 +30,7 @@ SELECT create_reference_table('reference_table');
 
 
 -- Failure and cancellation on multi-row INSERT that hits the same shard with the same value
-SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()');
+SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()');
 INSERT INTO distributed_table VALUES (1,1), (1,2), (1,3);
 
 -- this test is broken, see https://github.com/citusdata/citus/issues/2460
diff --git a/src/test/regress/sql/failure_ref_tables.sql b/src/test/regress/sql/failure_ref_tables.sql
index 0088a375e..29b90dc22 100644
--- a/src/test/regress/sql/failure_ref_tables.sql
+++ b/src/test/regress/sql/failure_ref_tables.sql
@@ -17,19 +17,19 @@ SELECT citus.clear_network_traffic();
 SELECT COUNT(*) FROM ref_table;
 
 -- verify behavior of single INSERT; should fail to execute
-SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()');
+SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()');
 INSERT INTO ref_table VALUES (5, 6);
 
 SELECT COUNT(*) FROM ref_table WHERE key=5;
 
 -- verify behavior of UPDATE ... RETURNING; should not execute
-SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()');
+SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()');
 UPDATE ref_table SET key=7 RETURNING value;
 
 SELECT COUNT(*) FROM ref_table WHERE key=7;
 
 -- verify fix to #2214; should raise error and fail to execute
-SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()');
+SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()');
 
 BEGIN;
 DELETE FROM ref_table WHERE key=5;
diff --git a/src/test/regress/sql/failure_replicated_partitions.sql b/src/test/regress/sql/failure_replicated_partitions.sql
index 1ea79fc83..fbe6ec7a0 100644
--- a/src/test/regress/sql/failure_replicated_partitions.sql
+++ b/src/test/regress/sql/failure_replicated_partitions.sql
@@ -19,7 +19,7 @@ CREATE TABLE partitioned_table_0
 
 INSERT INTO partitioned_table VALUES (0, 0);
 
-SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()');
+SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()');
 
 INSERT INTO partitioned_table VALUES (0, 0);
diff --git a/src/test/regress/sql/failure_single_mod.sql b/src/test/regress/sql/failure_single_mod.sql
index e4dfc8f9f..48fdddcc6 100644
--- a/src/test/regress/sql/failure_single_mod.sql
+++ b/src/test/regress/sql/failure_single_mod.sql
@@ -8,7 +8,7 @@ CREATE TABLE mod_test (key int, value text);
 SELECT create_distributed_table('mod_test', 'key');
 
 -- verify behavior of single INSERT; should mark shard as failed
-SELECT citus.mitmproxy('conn.onQuery(query="^INSERT").kill()');
+SELECT citus.mitmproxy('conn.onQuery(query="INSERT").kill()');
 INSERT INTO mod_test VALUES (2, 6);
 
 SELECT COUNT(*) FROM mod_test WHERE key=2;
@@ -24,7 +24,7 @@ TRUNCATE mod_test;
 SELECT citus.mitmproxy('conn.allow()');
 INSERT INTO mod_test VALUES (2, 6);
 
-SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()');
+SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()');
 UPDATE mod_test SET value='ok' WHERE key=2 RETURNING key;
 
 SELECT COUNT(*) FROM mod_test WHERE value='ok';
@@ -38,7 +38,7 @@ TRUNCATE mod_test;
 
 -- verify behavior of multi-statement modifications to a single shard
 -- should fail the transaction and never mark placements inactive
-SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE").kill()');
+SELECT citus.mitmproxy('conn.onQuery(query="UPDATE").kill()');
 
 BEGIN;
 INSERT INTO mod_test VALUES (2, 6);
diff --git a/src/test/regress/sql/failure_single_select.sql b/src/test/regress/sql/failure_single_select.sql
index 8dfb33d3e..c8218c950 100644
--- a/src/test/regress/sql/failure_single_select.sql
+++ b/src/test/regress/sql/failure_single_select.sql
@@ -13,13 +13,13 @@ SELECT create_distributed_table('select_test', 'key');
 
 -- put data in shard for which mitm node is first placement
 INSERT INTO select_test VALUES (3, 'test data');
-SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").kill()');
+SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").kill()');
 SELECT * FROM select_test WHERE key = 3;
 SELECT * FROM select_test WHERE key = 3;
 
 -- kill after first SELECT; txn should fail as INSERT triggers
 -- 2PC (and placementis not marked bad)
-SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").kill()');
+SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").kill()');
 
 BEGIN;
 INSERT INTO select_test VALUES (3, 'more data');
@@ -35,12 +35,12 @@ TRUNCATE select_test;
 
 -- put data in shard for which mitm node is first placement
 INSERT INTO select_test VALUES (3, 'test data');
-SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").cancel(' || pg_backend_pid() || ')');
+SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").cancel(' || pg_backend_pid() || ')');
 SELECT * FROM select_test WHERE key = 3;
 SELECT * FROM select_test WHERE key = 3;
 
 -- cancel after first SELECT; txn should fail and nothing should be marked as invalid
-SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").cancel(' || pg_backend_pid() || ')');
+SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").cancel(' || pg_backend_pid() || ')');
 
 BEGIN;
 INSERT INTO select_test VALUES (3, 'more data');
@@ -58,7 +58,7 @@ TRUNCATE select_test;
 
 -- cancel the second query
 -- error after second SELECT; txn should fail
-SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").after(1).cancel(' || pg_backend_pid() || ')');
+SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").after(1).cancel(' || pg_backend_pid() || ')');
 
 BEGIN;
 INSERT INTO select_test VALUES (3, 'more data');
@@ -68,7 +68,7 @@ SELECT * FROM select_test WHERE key = 3;
 COMMIT;
 
 -- error after second SELECT; txn should fails the transaction
-SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").after(1).reset()');
+SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").after(1).reset()');
 
 BEGIN;
 INSERT INTO select_test VALUES (3, 'more data');
@@ -77,7 +77,7 @@ INSERT INTO select_test VALUES (3, 'even more data');
 SELECT * FROM select_test WHERE key = 3;
 COMMIT;
 
-SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*pg_prepared_xacts").after(2).kill()');
+SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*pg_prepared_xacts").after(2).kill()');
 
 SELECT recover_prepared_transactions();
 SELECT recover_prepared_transactions();
@@ -93,12 +93,12 @@ SELECT create_distributed_table('select_test', 'key');
 SET citus.max_cached_conns_per_worker TO 1; -- allow connection to be cached
 
 INSERT INTO select_test VALUES (1, 'test data');
-SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").after(1).kill()');
+SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").after(1).kill()');
 SELECT * FROM select_test WHERE key = 1;
 SELECT * FROM select_test WHERE key = 1;
 
 -- now the same test with query cancellation
-SELECT citus.mitmproxy('conn.onQuery(query="^SELECT.*select_test").after(1).cancel(' || pg_backend_pid() || ')');
+SELECT citus.mitmproxy('conn.onQuery(query="SELECT.*select_test").after(1).cancel(' || pg_backend_pid() || ')');
 SELECT * FROM select_test WHERE key = 1;
 SELECT * FROM select_test WHERE key = 1;