diff --git a/src/backend/distributed/utils/maintenanced.c b/src/backend/distributed/utils/maintenanced.c index 67051ce18..ceba41de9 100644 --- a/src/backend/distributed/utils/maintenanced.c +++ b/src/backend/distributed/utils/maintenanced.c @@ -214,8 +214,7 @@ CitusMaintenanceDaemonMain(Datum main_arg) { Oid databaseOid = DatumGetObjectId(main_arg); MaintenanceDaemonDBData *myDbData = NULL; - time_t prevStatsCollection = 0; - bool prevStatsCollectionFailed = false; + TimestampTz nextStatsCollectionTime USED_WITH_LIBCURL_ONLY = GetCurrentTimestamp(); ErrorContextCallback errorCallback; /* @@ -278,30 +277,17 @@ CitusMaintenanceDaemonMain(Datum main_arg) int latchFlags = WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH; double timeout = 10000.0; /* use this if the deadlock detection is disabled */ bool foundDeadlock = false; - time_t currentTime = time(NULL); - double secondsSincePrevStatsCollection = difftime(currentTime, - prevStatsCollection); - bool citusHasBeenLoaded = false; CHECK_FOR_INTERRUPTS(); - StartTransactionCommand(); - citusHasBeenLoaded = CitusHasBeenLoaded(); - CommitTransactionCommand(); - - if (!citusHasBeenLoaded) - { - continue; - } - /* - * XXX: We clear the metadata cache before every iteration because otherwise - * it might contain stale OIDs. It appears that in some cases invalidation - * messages for a DROP EXTENSION may arrive during deadlock detection and + * XXX: Each task should clear the metadata cache before every iteration + * by calling InvalidateMetadataSystemCache(), because otherwise it + * might contain stale OIDs. It appears that in some cases invalidation + * messages for a DROP EXTENSION may arrive during these tasks and * this causes us to cache a stale pg_dist_node OID. We'd actually expect * all invalidations to arrive after obtaining a lock in LockCitusExtension. */ - InvalidateMetadataSystemCache(); /* * Perform Work. 
If a specific task needs to be called sooner than @@ -309,35 +295,54 @@ CitusMaintenanceDaemonMain(Datum main_arg) * tasks should do their own time math about whether to re-run checks. */ - if (secondsSincePrevStatsCollection >= STATISTICS_COLLECTION_INTERVAL || - (prevStatsCollectionFailed && - secondsSincePrevStatsCollection >= STATISTICS_COLLECTION_RETRY_INTERVAL)) - { #ifdef HAVE_LIBCURL - if (EnableStatisticsCollection) + if (EnableStatisticsCollection && + GetCurrentTimestamp() >= nextStatsCollectionTime) + { + bool statsCollectionSuccess = false; + InvalidateMetadataSystemCache(); + StartTransactionCommand(); + + /* + * Lock the extension such that it cannot be dropped or created + * concurrently. Skip statistics collection if citus extension is + * not accessible. + * + * Similarly, we skip statistics collection if there exists any + * version mismatch or the extension is not fully created yet. + */ + if (!LockCitusExtension()) { - MemoryContext statsCollectionContext = - AllocSetContextCreate(CurrentMemoryContext, - "StatsCollection", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); - MemoryContext oldContext = - MemoryContextSwitchTo(statsCollectionContext); - - WarnIfSyncDNS(); - prevStatsCollectionFailed = !CollectBasicUsageStatistics(); - - MemoryContextSwitchTo(oldContext); - MemoryContextDelete(statsCollectionContext); - prevStatsCollection = currentTime; + ereport(DEBUG1, (errmsg("could not lock the citus extension, " + "skipping statistics collection"))); } -#endif + else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded()) + { + WarnIfSyncDNS(); + statsCollectionSuccess = CollectBasicUsageStatistics(); + } + + if (statsCollectionSuccess) + { + nextStatsCollectionTime = + TimestampTzPlusMilliseconds(GetCurrentTimestamp(), + STATS_COLLECTION_TIMEOUT_MILLIS); + } + else + { + nextStatsCollectionTime = + TimestampTzPlusMilliseconds(GetCurrentTimestamp(), + STATS_COLLECTION_RETRY_TIMEOUT_MILLIS); + } + + 
CommitTransactionCommand(); } +#endif /* the config value -1 disables the distributed deadlock detection */ if (DistributedDeadlockDetectionTimeoutFactor != -1.0) { + InvalidateMetadataSystemCache(); StartTransactionCommand(); /* diff --git a/src/backend/distributed/utils/statistics_collection.c b/src/backend/distributed/utils/statistics_collection.c index e0855ca2e..91a05252a 100644 --- a/src/backend/distributed/utils/statistics_collection.c +++ b/src/backend/distributed/utils/statistics_collection.c @@ -20,6 +20,8 @@ bool EnableStatisticsCollection = true; /* send basic usage statistics to Citus */ +PG_FUNCTION_INFO_V1(citus_server_id); + #ifdef HAVE_LIBCURL #include @@ -27,6 +29,8 @@ bool EnableStatisticsCollection = true; /* send basic usage statistics to Citus #include "access/xact.h" #include "distributed/metadata_cache.h" +#include "distributed/multi_join_order.h" +#include "distributed/shardinterval_utils.h" #include "distributed/statistics_collection.h" #include "distributed/worker_manager.h" #include "lib/stringinfo.h" @@ -38,7 +42,7 @@ bool EnableStatisticsCollection = true; /* send basic usage statistics to Citus #endif static uint64 NextPow2(uint64 n); -static uint64 ClusterSize(List *distributedTableList); +static uint64 DistributedTablesSize(List *distTableOids); static bool SendHttpPostJsonRequest(const char *url, const char *postFields, long timeoutSeconds); @@ -67,36 +71,54 @@ WarnIfSyncDNS(void) bool CollectBasicUsageStatistics(void) { - List *distributedTables = NIL; + List *distTableOids = NIL; uint64 roundedDistTableCount = 0; uint64 roundedClusterSize = 0; uint32 workerNodeCount = 0; StringInfo fields = makeStringInfo(); Datum metadataJsonbDatum = 0; char *metadataJsonbStr = NULL; + MemoryContext savedContext = CurrentMemoryContext; struct utsname unameData; + int unameResult PG_USED_FOR_ASSERTS_ONLY = 0; + bool metadataCollectionFailed = false; memset(&unameData, 0, sizeof(unameData)); - StartTransactionCommand(); + PG_TRY(); + { + 
distTableOids = DistTableOidList(); + roundedDistTableCount = NextPow2(list_length(distTableOids)); + roundedClusterSize = NextPow2(DistributedTablesSize(distTableOids)); + workerNodeCount = ActivePrimaryNodeCount(); + metadataJsonbDatum = DistNodeMetadata(); + metadataJsonbStr = DatumGetCString(DirectFunctionCall1(jsonb_out, + metadataJsonbDatum)); + } + PG_CATCH(); + { + ErrorData *edata = NULL; + MemoryContextSwitchTo(savedContext); + edata = CopyErrorData(); + FlushErrorState(); + + /* rethrow as WARNING */ + edata->elevel = WARNING; + ThrowErrorData(edata); + metadataCollectionFailed = true; + } + PG_END_TRY(); /* - * If there is a version mismatch between loaded version and available - * version, metadata functions will fail. We return early to avoid crashing. - * This can happen when updating the Citus extension. + * Returning here instead of in PG_CATCH() since PG_END_TRY() resets a couple + * of global variables. */ - if (!CheckCitusVersion(LOG_SERVER_ONLY)) + if (metadataCollectionFailed) { - CommitTransactionCommand(); return false; } - distributedTables = DistributedTableList(); - roundedDistTableCount = NextPow2(list_length(distributedTables)); - roundedClusterSize = NextPow2(ClusterSize(distributedTables)); - workerNodeCount = ActivePrimaryNodeCount(); - metadataJsonbDatum = DistNodeMetadata(); - metadataJsonbStr = DatumGetCString(DirectFunctionCall1(jsonb_out, - metadataJsonbDatum)); - uname(&unameData); + + unameResult = uname(&unameData); + Assert(unameResult == 0); /* uname() always succeeds if we pass a valid buffer */ appendStringInfoString(fields, "{\"citus_version\": "); escape_json(fields, CITUS_VERSION); @@ -112,46 +134,42 @@ CollectBasicUsageStatistics(void) appendStringInfo(fields, ",\"node_metadata\": %s", metadataJsonbStr); appendStringInfoString(fields, "}"); - CommitTransactionCommand(); - return SendHttpPostJsonRequest(STATS_COLLECTION_HOST "/v1/usage_reports", fields->data, HTTP_TIMEOUT_SECONDS); } /* - * ClusterSize returns total size 
of data store in the cluster consisting of - * given distributed tables. We ignore tables which we cannot get their size. + * DistributedTablesSize returns total size of data store in the cluster consisting + * of given distributed tables. We skip tables whose size we cannot get. */ static uint64 -ClusterSize(List *distributedTableList) +DistributedTablesSize(List *distTableOids) { - uint64 clusterSize = 0; - ListCell *distTableCacheEntryCell = NULL; + uint64 totalSize = 0; + ListCell *distTableOidCell = NULL; - foreach(distTableCacheEntryCell, distributedTableList) + foreach(distTableOidCell, distTableOids) { - DistTableCacheEntry *distTableCacheEntry = lfirst(distTableCacheEntryCell); - Oid relationId = distTableCacheEntry->relationId; - MemoryContext savedContext = CurrentMemoryContext; + Oid relationId = lfirst_oid(distTableOidCell); + Datum tableSizeDatum = 0; - PG_TRY(); + /* + * Ignore hash partitioned tables that are not single replicated, since + * citus_table_size() doesn't work on them. + */ + if (PartitionMethod(relationId) == DISTRIBUTE_BY_HASH && + !SingleReplicatedTable(relationId)) { - Datum distTableSizeDatum = DirectFunctionCall1(citus_table_size, - ObjectIdGetDatum(relationId)); - clusterSize += DatumGetInt64(distTableSizeDatum); + continue; } - PG_CATCH(); - { - FlushErrorState(); - /* citus_table_size() throws an error while the memory context is changed */ - MemoryContextSwitchTo(savedContext); - } - PG_END_TRY(); + tableSizeDatum = DirectFunctionCall1(citus_table_size, + ObjectIdGetDatum(relationId)); + totalSize += DatumGetInt64(tableSizeDatum); } - return clusterSize; + return totalSize; } @@ -242,8 +260,6 @@ SendHttpPostJsonRequest(const char *url, const char *jsonObj, long timeoutSecond #endif /* HAVE_LIBCURL */ -PG_FUNCTION_INFO_V1(citus_server_id); - /* * citus_server_id returns a random UUID value as server identifier. This is * modeled after PostgreSQL's pg_random_uuid(). 
@@ -254,11 +270,14 @@ citus_server_id(PG_FUNCTION_ARGS) uint8 *buf = (uint8 *) palloc(UUID_LEN); #if PG_VERSION_NUM >= 100000 + + /* + * If pg_backend_random() fails, fall back to using random(). Postgres + * versions before 10 don't have pg_backend_random(), so we use random() by + * default in that case. + */ if (!pg_backend_random((char *) buf, UUID_LEN)) - { - ereport(ERROR, (errmsg("failed to generate server identifier"))); - } -#else +#endif { int bufIdx = 0; for (bufIdx = 0; bufIdx < UUID_LEN; bufIdx++) @@ -266,7 +285,6 @@ citus_server_id(PG_FUNCTION_ARGS) buf[bufIdx] = (uint8) (random() & 0xFF); } } -#endif /* * Set magic numbers for a "version 4" (pseudorandom) UUID, see diff --git a/src/include/distributed/maintenanced.h b/src/include/distributed/maintenanced.h index da17fe4ea..8f2a308ff 100644 --- a/src/include/distributed/maintenanced.h +++ b/src/include/distributed/maintenanced.h @@ -13,10 +13,10 @@ #define MAINTENANCED_H /* collect statistics every 24 hours */ -#define STATISTICS_COLLECTION_INTERVAL 86400 +#define STATS_COLLECTION_TIMEOUT_MILLIS (24 * 60 * 60 * 1000) /* if statistics collection fails, retry in 1 minute */ -#define STATISTICS_COLLECTION_RETRY_INTERVAL 60 +#define STATS_COLLECTION_RETRY_TIMEOUT_MILLIS (60 * 1000) /* config variable for */ extern double DistributedDeadlockDetectionTimeoutFactor; diff --git a/src/include/distributed/statistics_collection.h b/src/include/distributed/statistics_collection.h index 848227891..b148fe47d 100644 --- a/src/include/distributed/statistics_collection.h +++ b/src/include/distributed/statistics_collection.h @@ -12,6 +12,17 @@ #include "citus_version.h" +/* + * Append USED_WITH_LIBCURL_ONLY to definitions of variables that are only used + * when compiled with libcurl, to avoid compiler warnings about unused variables + * when built without libcurl. 
+ */ +#ifdef HAVE_LIBCURL +#define USED_WITH_LIBCURL_ONLY +#else +#define USED_WITH_LIBCURL_ONLY pg_attribute_unused() +#endif + /* Config variables managed via guc.c */ extern bool EnableStatisticsCollection;