Check for Citus updates.

Sends a request to /v1/releases/latest?flavor=$CITUS_EDITION once a day,
which returns a response similar to {"version": "7.1.0", "major": 7,
"minor": 1, "patch": 0}. Then compares it with current Citus version,
and if the latest release is newer, logs a LOG message.
pull/1751/head
Hadi Moshayedi 2017-10-31 21:23:31 -04:00 committed by Hadi Moshayedi
parent 34f3ec0961
commit 78a2cd9052
3 changed files with 292 additions and 24 deletions

View File

@ -325,6 +325,13 @@ CitusMaintenanceDaemonMain(Datum main_arg)
if (statsCollectionSuccess)
{
/*
* Checking for updates only when collecting statistics succeeds,
* so we don't log update messages twice when we retry statistics
* collection in a minute.
*/
CheckForUpdates();
nextStatsCollectionTime =
TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
STATS_COLLECTION_TIMEOUT_MILLIS);

View File

@ -34,6 +34,7 @@ PG_FUNCTION_INFO_V1(citus_server_id);
#include "distributed/statistics_collection.h"
#include "distributed/worker_manager.h"
#include "lib/stringinfo.h"
#include "utils/builtins.h"
#include "utils/json.h"
#include "utils/jsonb.h"
@ -41,10 +42,19 @@ PG_FUNCTION_INFO_V1(citus_server_id);
#include "utils/fmgrprotos.h"
#endif
static size_t CheckForUpdatesCallback(char *contents, size_t size, size_t count,
void *userData);
static bool JsonbFieldInt32(Jsonb *jsonb, const char *fieldName, int32 *result);
static bool JsonbFieldStr(Jsonb *jsonb, const char *fieldName, StringInfo result);
static uint64 NextPow2(uint64 n);
static uint64 DistributedTablesSize(List *distTableOids);
static bool SendHttpPostJsonRequest(const char *url, const char *postFields, long
timeoutSeconds);
static bool UrlEncode(StringInfo buf, const char *str);
static bool SendHttpPostJsonRequest(const char *url, const char *postFields,
long timeoutSeconds);
static bool SendHttpGetJsonRequest(const char *url, long timeoutSeconds,
curl_write_callback responseCallback);
static bool PerformHttpRequest(CURL *curl);
/* WarnIfSyncDNS warns if libcurl is compiled with synchronous DNS. */
void
@ -139,6 +149,174 @@ CollectBasicUsageStatistics(void)
}
/* CheckForUpdates queries Citus servers for newer releases of Citus. */
void
CheckForUpdates(void)
{
StringInfo url = makeStringInfo();
appendStringInfoString(url, STATS_COLLECTION_HOST "/v1/releases/latest?edition=");
if (!UrlEncode(url, CITUS_EDITION))
{
ereport(WARNING, (errmsg("url encoding '%s' failed", CITUS_EDITION)));
return;
}
if (!SendHttpGetJsonRequest(url->data, HTTP_TIMEOUT_SECONDS,
&CheckForUpdatesCallback))
{
ereport(WARNING, (errmsg("checking for updates failed")));
}
}
/*
* CheckForUpdatesCallback receives the response for the request sent by
* CheckForUpdates(). It processes the response, and if there is a newer release
* of Citus available, logs a LOG message. This function returns 0 if there are
* any errors in the received response, which means we didn't consume the data.
* Otherwise, it returns (size * count) which means we consumed all of the data.
*/
static size_t
CheckForUpdatesCallback(char *contents, size_t size, size_t count, void *userData)
{
const int32 citusVersionMajor = CITUS_VERSION_NUM / 10000;
const int32 citusVersionMinor = (CITUS_VERSION_NUM / 100) % 100;
const int32 citusVersionPatch = CITUS_VERSION_NUM % 100;
Jsonb *responseJsonb = NULL;
StringInfo releaseVersion = makeStringInfo();
int32 releaseMajor = 0;
int32 releaseMinor = 0;
int32 releasePatch = 0;
char *updateType = NULL;
MemoryContext savedContext = CurrentMemoryContext;
StringInfo responseNullTerminated = makeStringInfo();
appendBinaryStringInfo(responseNullTerminated, contents, size * count);
/* jsonb_in can throw errors */
PG_TRY();
{
Datum responseCStringDatum = CStringGetDatum(responseNullTerminated->data);
Datum responseJasonbDatum = DirectFunctionCall1(jsonb_in, responseCStringDatum);
responseJsonb = DatumGetJsonb(responseJasonbDatum);
}
PG_CATCH();
{
MemoryContextSwitchTo(savedContext);
FlushErrorState();
responseJsonb = NULL;
}
PG_END_TRY();
/*
* Returning here instead of in PG_CATCH() because PG_END_TRY() resets
* couple of global variables.
*/
if (responseJsonb == NULL)
{
return 0;
}
if (!JsonbFieldStr(responseJsonb, "version", releaseVersion) ||
!JsonbFieldInt32(responseJsonb, "major", &releaseMajor) ||
!JsonbFieldInt32(responseJsonb, "minor", &releaseMinor) ||
!JsonbFieldInt32(responseJsonb, "patch", &releasePatch))
{
return 0;
}
if ((releaseMajor > citusVersionMajor) ||
(releaseMajor == citusVersionMajor && releaseMinor > citusVersionMinor))
{
updateType = "major";
}
else if (releaseMajor == citusVersionMajor &&
releaseMinor == citusVersionMinor &&
releasePatch > citusVersionPatch)
{
updateType = "patch";
}
if (updateType != NULL)
{
ereport(LOG, (errmsg("a new %s release of Citus (%s) is available",
updateType, releaseVersion->data)));
}
return size * count;
}
/*
* JsonbFieldInt32 sets the given output variable to the int32 value of the given
* field in the given JSONB object. If the field doesn't exist or its value is
* not an integer that fits in 32-bits, this function returns false.
*/
static bool
JsonbFieldInt32(Jsonb *jsonb, const char *fieldName, int32 *result)
{
MemoryContext savedContext = CurrentMemoryContext;
bool success = false;
JsonbValue *fieldValue = NULL;
JsonbValue key;
memset(&key, 0, sizeof(key));
key.type = jbvString;
key.val.string.len = strlen(fieldName);
key.val.string.val = (char *) fieldName;
fieldValue = findJsonbValueFromContainer(&(jsonb->root), JB_FOBJECT, &key);
if (fieldValue == NULL || fieldValue->type != jbvNumeric)
{
return false;
}
/* numeric_int4 can throw errors */
PG_TRY();
{
Datum resultNumericDatum = NumericGetDatum(fieldValue->val.numeric);
*result = DatumGetInt32(DirectFunctionCall1(numeric_int4, resultNumericDatum));
success = true;
}
PG_CATCH();
{
MemoryContextSwitchTo(savedContext);
FlushErrorState();
success = false;
}
PG_END_TRY();
return success;
}
/*
* JsonbFieldStr appends string value of the given field in the given JSONB
* object to the given string buffer. If the field doesn't exist or its value is
* not string, this function returns false. Otherwise it returns true.
*/
static bool
JsonbFieldStr(Jsonb *jsonb, const char *fieldName, StringInfo result)
{
JsonbValue *fieldValue = NULL;
JsonbValue key;
memset(&key, 0, sizeof(key));
key.type = jbvString;
key.val.string.len = strlen(fieldName);
key.val.string.val = (char *) fieldName;
fieldValue = findJsonbValueFromContainer(&(jsonb->root), JB_FOBJECT, &key);
if (fieldValue == NULL || fieldValue->type != jbvString)
{
return false;
}
appendBinaryStringInfo(result, fieldValue->val.string.val,
fieldValue->val.string.len);
return true;
}
/*
* DistributedTablesSize returns total size of data store in the cluster consisting
* of given distributed tables. We ignore tables which we cannot get their size.
@ -214,6 +392,36 @@ NextPow2(uint64 n)
}
/*
* UrlEncode URL encodes the given string and appends it to the given buffer.
* If either libcurl initialization or encoding fails, returns false.
*/
static bool
UrlEncode(StringInfo buf, const char *str)
{
bool success = false;
CURL *curl = NULL;
curl_global_init(CURL_GLOBAL_DEFAULT);
curl = curl_easy_init();
if (curl)
{
char *urlEncodedStr = curl_easy_escape(curl, str, strlen(str));
if (urlEncodedStr)
{
appendStringInfoString(buf, urlEncodedStr);
curl_free(urlEncodedStr);
success = true;
}
curl_easy_cleanup(curl);
}
curl_global_cleanup();
return success;
}
/*
* SendHttpPostJsonRequest sends a HTTP/HTTPS POST request to the given URL with
* the given json object.
@ -222,7 +430,6 @@ static bool
SendHttpPostJsonRequest(const char *url, const char *jsonObj, long timeoutSeconds)
{
bool success = false;
CURLcode curlCode = false;
CURL *curl = NULL;
curl_global_init(CURL_GLOBAL_DEFAULT);
@ -230,6 +437,7 @@ SendHttpPostJsonRequest(const char *url, const char *jsonObj, long timeoutSecond
if (curl)
{
struct curl_slist *headers = NULL;
headers = curl_slist_append(headers, "Accept: application/json");
headers = curl_slist_append(headers, "Content-Type: application/json");
headers = curl_slist_append(headers, "charsets: utf-8");
@ -238,27 +446,7 @@ SendHttpPostJsonRequest(const char *url, const char *jsonObj, long timeoutSecond
curl_easy_setopt(curl, CURLOPT_TIMEOUT, timeoutSeconds);
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
curlCode = curl_easy_perform(curl);
if (curlCode == CURLE_OK)
{
int64 httpCode = 0;
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &httpCode);
if (httpCode == 200)
{
success = true;
}
else if (httpCode >= 400 && httpCode < 500)
{
ereport(WARNING, (errmsg("HTTP request failed."),
errhint("HTTP response code: " INT64_FORMAT,
httpCode)));
}
}
else
{
ereport(WARNING, (errmsg("Sending HTTP POST request failed."),
errhint("Error code: %s.", curl_easy_strerror(curlCode))));
}
success = PerformHttpRequest(curl);
curl_slist_free_all(headers);
curl_easy_cleanup(curl);
@ -270,6 +458,78 @@ SendHttpPostJsonRequest(const char *url, const char *jsonObj, long timeoutSecond
}
/*
* SendHttpGetJsonRequest sends an HTTP/HTTPS GET request to the given URL, and
* expects a JSON response from server. GET parameters should be added to the url.
* responseCallback is called with the content of response.
*/
static bool
SendHttpGetJsonRequest(const char *url, long timeoutSeconds,
curl_write_callback responseCallback)
{
bool success = false;
CURL *curl = NULL;
curl_global_init(CURL_GLOBAL_DEFAULT);
curl = curl_easy_init();
if (curl)
{
struct curl_slist *headers = NULL;
headers = curl_slist_append(headers, "Accept: application/json");
curl_easy_setopt(curl, CURLOPT_URL, url);
curl_easy_setopt(curl, CURLOPT_TIMEOUT, timeoutSeconds);
curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers);
curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, responseCallback);
success = PerformHttpRequest(curl);
curl_slist_free_all(headers);
curl_easy_cleanup(curl);
}
curl_global_cleanup();
return success;
}
/*
* PerformHttpRequest sends the HTTP request with the parameters set in the given
* curl object, and returns if it was successful or not. If the request was not
* successful, it may log some warnings. This method expects to take place after
* curl_easy_init() but before curl_easy_cleanup().
*/
static bool
PerformHttpRequest(CURL *curl)
{
bool success = false;
CURLcode curlCode = curl_easy_perform(curl);
if (curlCode == CURLE_OK)
{
int64 httpCode = 0;
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &httpCode);
if (httpCode == 200)
{
success = true;
}
else if (httpCode >= 400 && httpCode < 500)
{
ereport(WARNING, (errmsg("HTTP request failed."),
errhint("HTTP response code: " INT64_FORMAT,
httpCode)));
}
}
else
{
ereport(WARNING, (errmsg("Sending HTTP request failed."),
errhint("Error code: %s.", curl_easy_strerror(curlCode))));
}
return success;
}
#endif /* HAVE_LIBCURL */
/*

View File

@ -33,6 +33,7 @@ extern bool EnableStatisticsCollection;
extern void WarnIfSyncDNS(void);
extern bool CollectBasicUsageStatistics(void);
extern void CheckForUpdates(void);
#endif /* HAVE_LIBCURL */