diff --git a/src/backend/distributed/utils/maintenanced.c b/src/backend/distributed/utils/maintenanced.c index 04d8123e6..391432b32 100644 --- a/src/backend/distributed/utils/maintenanced.c +++ b/src/backend/distributed/utils/maintenanced.c @@ -325,6 +325,13 @@ CitusMaintenanceDaemonMain(Datum main_arg) if (statsCollectionSuccess) { + /* + * Checking for updates only when collecting statistics succeeds, + * so we don't log update messages twice when we retry statistics + * collection in a minute. + */ + CheckForUpdates(); + nextStatsCollectionTime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), STATS_COLLECTION_TIMEOUT_MILLIS); diff --git a/src/backend/distributed/utils/statistics_collection.c b/src/backend/distributed/utils/statistics_collection.c index 55881c584..9577a534c 100644 --- a/src/backend/distributed/utils/statistics_collection.c +++ b/src/backend/distributed/utils/statistics_collection.c @@ -34,6 +34,7 @@ PG_FUNCTION_INFO_V1(citus_server_id); #include "distributed/statistics_collection.h" #include "distributed/worker_manager.h" #include "lib/stringinfo.h" +#include "utils/builtins.h" #include "utils/json.h" #include "utils/jsonb.h" @@ -41,10 +42,19 @@ PG_FUNCTION_INFO_V1(citus_server_id); #include "utils/fmgrprotos.h" #endif +static size_t CheckForUpdatesCallback(char *contents, size_t size, size_t count, + void *userData); +static bool JsonbFieldInt32(Jsonb *jsonb, const char *fieldName, int32 *result); +static bool JsonbFieldStr(Jsonb *jsonb, const char *fieldName, StringInfo result); static uint64 NextPow2(uint64 n); static uint64 DistributedTablesSize(List *distTableOids); -static bool SendHttpPostJsonRequest(const char *url, const char *postFields, long - timeoutSeconds); +static bool UrlEncode(StringInfo buf, const char *str); +static bool SendHttpPostJsonRequest(const char *url, const char *postFields, + long timeoutSeconds); +static bool SendHttpGetJsonRequest(const char *url, long timeoutSeconds, + curl_write_callback responseCallback); +static bool PerformHttpRequest(CURL *curl); + /* WarnIfSyncDNS warns if libcurl is compiled with synchronous DNS. */ void @@ -139,6 +149,174 @@ CollectBasicUsageStatistics(void) } +/* CheckForUpdates queries Citus servers for newer releases of Citus. */ +void +CheckForUpdates(void) +{ + StringInfo url = makeStringInfo(); + appendStringInfoString(url, STATS_COLLECTION_HOST "/v1/releases/latest?edition="); + + if (!UrlEncode(url, CITUS_EDITION)) + { + ereport(WARNING, (errmsg("url encoding '%s' failed", CITUS_EDITION))); + return; + } + + if (!SendHttpGetJsonRequest(url->data, HTTP_TIMEOUT_SECONDS, + &CheckForUpdatesCallback)) + { + ereport(WARNING, (errmsg("checking for updates failed"))); + } +} + + +/* + * CheckForUpdatesCallback receives the response for the request sent by + * CheckForUpdates(). It processes the response, and if there is a newer release + * of Citus available, logs a LOG message. This function returns 0 if there are + * any errors in the received response, which means we didn't consume the data. + * Otherwise, it returns (size * count) which means we consumed all of the data. + */ +static size_t +CheckForUpdatesCallback(char *contents, size_t size, size_t count, void *userData) +{ + const int32 citusVersionMajor = CITUS_VERSION_NUM / 10000; + const int32 citusVersionMinor = (CITUS_VERSION_NUM / 100) % 100; + const int32 citusVersionPatch = CITUS_VERSION_NUM % 100; + Jsonb *responseJsonb = NULL; + StringInfo releaseVersion = makeStringInfo(); + int32 releaseMajor = 0; + int32 releaseMinor = 0; + int32 releasePatch = 0; + char *updateType = NULL; + MemoryContext savedContext = CurrentMemoryContext; + + StringInfo responseNullTerminated = makeStringInfo(); + appendBinaryStringInfo(responseNullTerminated, contents, size * count); + + /* jsonb_in can throw errors */ + PG_TRY(); + { + Datum responseCStringDatum = CStringGetDatum(responseNullTerminated->data); + Datum responseJasonbDatum = DirectFunctionCall1(jsonb_in, responseCStringDatum); + responseJsonb = DatumGetJsonb(responseJasonbDatum); + } + PG_CATCH(); + { + MemoryContextSwitchTo(savedContext); + FlushErrorState(); + responseJsonb = NULL; + } + PG_END_TRY(); + + /* + * Returning here instead of in PG_CATCH() because PG_END_TRY() resets + * couple of global variables. + */ + if (responseJsonb == NULL) + { + return 0; + } + + if (!JsonbFieldStr(responseJsonb, "version", releaseVersion) || + !JsonbFieldInt32(responseJsonb, "major", &releaseMajor) || + !JsonbFieldInt32(responseJsonb, "minor", &releaseMinor) || + !JsonbFieldInt32(responseJsonb, "patch", &releasePatch)) + { + return 0; + } + + if ((releaseMajor > citusVersionMajor) || + (releaseMajor == citusVersionMajor && releaseMinor > citusVersionMinor)) + { + updateType = "major"; + } + else if (releaseMajor == citusVersionMajor && + releaseMinor == citusVersionMinor && + releasePatch > citusVersionPatch) + { + updateType = "patch"; + } + + if (updateType != NULL) + { + ereport(LOG, (errmsg("a new %s release of Citus (%s) is available", + updateType, releaseVersion->data))); + } + + return size * count; +} + + +/* + * JsonbFieldInt32 sets the given output variable to the int32 value of the given + * field in the given JSONB object. If the field doesn't exist or its value is + * not an integer that fits in 32-bits, this function returns false. + */ +static bool +JsonbFieldInt32(Jsonb *jsonb, const char *fieldName, int32 *result) +{ + MemoryContext savedContext = CurrentMemoryContext; + bool success = false; + JsonbValue *fieldValue = NULL; + JsonbValue key; + memset(&key, 0, sizeof(key)); + key.type = jbvString; + key.val.string.len = strlen(fieldName); + key.val.string.val = (char *) fieldName; + + fieldValue = findJsonbValueFromContainer(&(jsonb->root), JB_FOBJECT, &key); + if (fieldValue == NULL || fieldValue->type != jbvNumeric) + { + return false; + } + + /* numeric_int4 can throw errors */ + PG_TRY(); + { + Datum resultNumericDatum = NumericGetDatum(fieldValue->val.numeric); + *result = DatumGetInt32(DirectFunctionCall1(numeric_int4, resultNumericDatum)); + success = true; + } + PG_CATCH(); + { + MemoryContextSwitchTo(savedContext); + FlushErrorState(); + success = false; + } + PG_END_TRY(); + + return success; +} + + +/* + * JsonbFieldStr appends string value of the given field in the given JSONB + * object to the given string buffer. If the field doesn't exist or its value is + * not string, this function returns false. Otherwise it returns true. + */ +static bool +JsonbFieldStr(Jsonb *jsonb, const char *fieldName, StringInfo result) +{ + JsonbValue *fieldValue = NULL; + JsonbValue key; + memset(&key, 0, sizeof(key)); + key.type = jbvString; + key.val.string.len = strlen(fieldName); + key.val.string.val = (char *) fieldName; + + fieldValue = findJsonbValueFromContainer(&(jsonb->root), JB_FOBJECT, &key); + if (fieldValue == NULL || fieldValue->type != jbvString) + { + return false; + } + + appendBinaryStringInfo(result, fieldValue->val.string.val, + fieldValue->val.string.len); + return true; +} + + /* * DistributedTablesSize returns total size of data store in the cluster consisting * of given distributed tables. We ignore tables which we cannot get their size. @@ -214,6 +392,36 @@ NextPow2(uint64 n) } +/* + * UrlEncode URL encodes the given string and appends it to the given buffer. + * If either libcurl initialization or encoding fails, returns false. + */ +static bool +UrlEncode(StringInfo buf, const char *str) +{ + bool success = false; + CURL *curl = NULL; + + curl_global_init(CURL_GLOBAL_DEFAULT); + curl = curl_easy_init(); + if (curl) + { + char *urlEncodedStr = curl_easy_escape(curl, str, strlen(str)); + if (urlEncodedStr) + { + appendStringInfoString(buf, urlEncodedStr); + curl_free(urlEncodedStr); + success = true; + } + + curl_easy_cleanup(curl); + } + + curl_global_cleanup(); + return success; +} + + /* * SendHttpPostJsonRequest sends a HTTP/HTTPS POST request to the given URL with * the given json object. @@ -222,7 +430,6 @@ static bool SendHttpPostJsonRequest(const char *url, const char *jsonObj, long timeoutSeconds) { bool success = false; - CURLcode curlCode = false; CURL *curl = NULL; curl_global_init(CURL_GLOBAL_DEFAULT); @@ -230,6 +437,7 @@ SendHttpPostJsonRequest(const char *url, const char *jsonObj, long timeoutSecond if (curl) { struct curl_slist *headers = NULL; + headers = curl_slist_append(headers, "Accept: application/json"); headers = curl_slist_append(headers, "Content-Type: application/json"); headers = curl_slist_append(headers, "charsets: utf-8"); @@ -238,27 +446,7 @@ SendHttpPostJsonRequest(const char *url, const char *jsonObj, long timeoutSecond curl_easy_setopt(curl, CURLOPT_TIMEOUT, timeoutSeconds); curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers); - curlCode = curl_easy_perform(curl); - if (curlCode == CURLE_OK) - { - int64 httpCode = 0; - curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &httpCode); - if (httpCode == 200) - { - success = true; - } - else if (httpCode >= 400 && httpCode < 500) - { - ereport(WARNING, (errmsg("HTTP request failed."), - errhint("HTTP response code: " INT64_FORMAT, - httpCode))); - } - } - else - { - ereport(WARNING, (errmsg("Sending HTTP POST request failed."), - errhint("Error code: %s.", curl_easy_strerror(curlCode)))); - } + success = PerformHttpRequest(curl); curl_slist_free_all(headers); curl_easy_cleanup(curl); @@ -270,6 +458,78 @@ SendHttpPostJsonRequest(const char *url, const char *jsonObj, long timeoutSecond } +/* + * SendHttpGetJsonRequest sends an HTTP/HTTPS GET request to the given URL, and + * expects a JSON response from server. GET parameters should be added to the url. + * responseCallback is called with the content of response. + */ +static bool +SendHttpGetJsonRequest(const char *url, long timeoutSeconds, + curl_write_callback responseCallback) +{ + bool success = false; + CURL *curl = NULL; + + curl_global_init(CURL_GLOBAL_DEFAULT); + curl = curl_easy_init(); + if (curl) + { + struct curl_slist *headers = NULL; + headers = curl_slist_append(headers, "Accept: application/json"); + + curl_easy_setopt(curl, CURLOPT_URL, url); + curl_easy_setopt(curl, CURLOPT_TIMEOUT, timeoutSeconds); + curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, responseCallback); + + success = PerformHttpRequest(curl); + + curl_slist_free_all(headers); + curl_easy_cleanup(curl); + } + + curl_global_cleanup(); + + return success; +} + + +/* + * PerformHttpRequest sends the HTTP request with the parameters set in the given + * curl object, and returns if it was successful or not. If the request was not + * successful, it may log some warnings. This method expects to take place after + * curl_easy_init() but before curl_easy_cleanup(). + */ +static bool +PerformHttpRequest(CURL *curl) +{ + bool success = false; + CURLcode curlCode = curl_easy_perform(curl); + if (curlCode == CURLE_OK) + { + int64 httpCode = 0; + curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &httpCode); + if (httpCode == 200) + { + success = true; + } + else if (httpCode >= 400 && httpCode < 500) + { + ereport(WARNING, (errmsg("HTTP request failed."), + errhint("HTTP response code: " INT64_FORMAT, + httpCode))); + } + } + else + { + ereport(WARNING, (errmsg("Sending HTTP request failed."), + errhint("Error code: %s.", curl_easy_strerror(curlCode)))); + } + + return success; +} + + #endif /* HAVE_LIBCURL */ /* diff --git a/src/include/distributed/statistics_collection.h b/src/include/distributed/statistics_collection.h index b148fe47d..265837de3 100644 --- a/src/include/distributed/statistics_collection.h +++ b/src/include/distributed/statistics_collection.h @@ -33,6 +33,7 @@ extern bool EnableStatisticsCollection; extern void WarnIfSyncDNS(void); extern bool CollectBasicUsageStatistics(void); +extern void CheckForUpdates(void); #endif /* HAVE_LIBCURL */