Make Stats Collection follow the patterns used in Deadlock Detection.

This includes:

(1) Wrap everything inside a StartTransactionCommand()/CommitTransactionCommand().
This is so we can access the database. This also switches to a new memory context
and releases it, so we don't have to do our own memory management.

(2) LockCitusExtension() so the extension cannot be dropped or created concurrently.

(3) Check CitusHasBeenLoaded() && CheckCitusVersion() before doing any work.

(4) Do not PG_TRY() inside a loop.
pull/1751/head
Hadi Moshayedi 2017-10-31 21:18:14 -04:00 committed by Hadi Moshayedi
parent 100aaeb3f5
commit 97d544b75c
4 changed files with 122 additions and 88 deletions

View File

@ -214,8 +214,7 @@ CitusMaintenanceDaemonMain(Datum main_arg)
{ {
Oid databaseOid = DatumGetObjectId(main_arg); Oid databaseOid = DatumGetObjectId(main_arg);
MaintenanceDaemonDBData *myDbData = NULL; MaintenanceDaemonDBData *myDbData = NULL;
time_t prevStatsCollection = 0; TimestampTz nextStatsCollectionTime USED_WITH_LIBCURL_ONLY = GetCurrentTimestamp();
bool prevStatsCollectionFailed = false;
ErrorContextCallback errorCallback; ErrorContextCallback errorCallback;
/* /*
@ -278,30 +277,17 @@ CitusMaintenanceDaemonMain(Datum main_arg)
int latchFlags = WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH; int latchFlags = WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH;
double timeout = 10000.0; /* use this if the deadlock detection is disabled */ double timeout = 10000.0; /* use this if the deadlock detection is disabled */
bool foundDeadlock = false; bool foundDeadlock = false;
time_t currentTime = time(NULL);
double secondsSincePrevStatsCollection = difftime(currentTime,
prevStatsCollection);
bool citusHasBeenLoaded = false;
CHECK_FOR_INTERRUPTS(); CHECK_FOR_INTERRUPTS();
StartTransactionCommand();
citusHasBeenLoaded = CitusHasBeenLoaded();
CommitTransactionCommand();
if (!citusHasBeenLoaded)
{
continue;
}
/* /*
* XXX: We clear the metadata cache before every iteration because otherwise * XXX: Each task should clear the metadata cache before every iteration
* it might contain stale OIDs. It appears that in some cases invalidation * by calling InvalidateMetadataSystemCache(), because otherwise it
* messages for a DROP EXTENSION may arrive during deadlock detection and * might contain stale OIDs. It appears that in some cases invalidation
* messages for a DROP EXTENSION may arrive during these tasks and
* this causes us to cache a stale pg_dist_node OID. We'd actually expect * this causes us to cache a stale pg_dist_node OID. We'd actually expect
* all invalidations to arrive after obtaining a lock in LockCitusExtension. * all invalidations to arrive after obtaining a lock in LockCitusExtension.
*/ */
InvalidateMetadataSystemCache();
/* /*
* Perform Work. If a specific task needs to be called sooner than * Perform Work. If a specific task needs to be called sooner than
@ -309,35 +295,54 @@ CitusMaintenanceDaemonMain(Datum main_arg)
* tasks should do their own time math about whether to re-run checks. * tasks should do their own time math about whether to re-run checks.
*/ */
if (secondsSincePrevStatsCollection >= STATISTICS_COLLECTION_INTERVAL ||
(prevStatsCollectionFailed &&
secondsSincePrevStatsCollection >= STATISTICS_COLLECTION_RETRY_INTERVAL))
{
#ifdef HAVE_LIBCURL #ifdef HAVE_LIBCURL
if (EnableStatisticsCollection) if (EnableStatisticsCollection &&
GetCurrentTimestamp() >= nextStatsCollectionTime)
{
bool statsCollectionSuccess = false;
InvalidateMetadataSystemCache();
StartTransactionCommand();
/*
* Lock the extension such that it cannot be dropped or created
* concurrently. Skip statistics collection if citus extension is
* not accessible.
*
* Similarly, we skip statistics collection if there exists any
* version mismatch or the extension is not fully created yet.
*/
if (!LockCitusExtension())
{ {
MemoryContext statsCollectionContext = ereport(DEBUG1, (errmsg("could not lock the citus extension, "
AllocSetContextCreate(CurrentMemoryContext, "skipping statistics collection")));
"StatsCollection",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
MemoryContext oldContext =
MemoryContextSwitchTo(statsCollectionContext);
WarnIfSyncDNS();
prevStatsCollectionFailed = !CollectBasicUsageStatistics();
MemoryContextSwitchTo(oldContext);
MemoryContextDelete(statsCollectionContext);
prevStatsCollection = currentTime;
} }
#endif else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded())
{
WarnIfSyncDNS();
statsCollectionSuccess = CollectBasicUsageStatistics();
}
if (statsCollectionSuccess)
{
nextStatsCollectionTime =
TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
STATS_COLLECTION_TIMEOUT_MILLIS);
}
else
{
nextStatsCollectionTime =
TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
STATS_COLLECTION_RETRY_TIMEOUT_MILLIS);
}
CommitTransactionCommand();
} }
#endif
/* the config value -1 disables the distributed deadlock detection */ /* the config value -1 disables the distributed deadlock detection */
if (DistributedDeadlockDetectionTimeoutFactor != -1.0) if (DistributedDeadlockDetectionTimeoutFactor != -1.0)
{ {
InvalidateMetadataSystemCache();
StartTransactionCommand(); StartTransactionCommand();
/* /*

View File

@ -20,6 +20,8 @@
bool EnableStatisticsCollection = true; /* send basic usage statistics to Citus */ bool EnableStatisticsCollection = true; /* send basic usage statistics to Citus */
PG_FUNCTION_INFO_V1(citus_server_id);
#ifdef HAVE_LIBCURL #ifdef HAVE_LIBCURL
#include <curl/curl.h> #include <curl/curl.h>
@ -27,6 +29,8 @@ bool EnableStatisticsCollection = true; /* send basic usage statistics to Citus
#include "access/xact.h" #include "access/xact.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/multi_join_order.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/statistics_collection.h" #include "distributed/statistics_collection.h"
#include "distributed/worker_manager.h" #include "distributed/worker_manager.h"
#include "lib/stringinfo.h" #include "lib/stringinfo.h"
@ -38,7 +42,7 @@ bool EnableStatisticsCollection = true; /* send basic usage statistics to Citus
#endif #endif
static uint64 NextPow2(uint64 n); static uint64 NextPow2(uint64 n);
static uint64 ClusterSize(List *distributedTableList); static uint64 DistributedTablesSize(List *distTableOids);
static bool SendHttpPostJsonRequest(const char *url, const char *postFields, long static bool SendHttpPostJsonRequest(const char *url, const char *postFields, long
timeoutSeconds); timeoutSeconds);
@ -67,36 +71,54 @@ WarnIfSyncDNS(void)
bool bool
CollectBasicUsageStatistics(void) CollectBasicUsageStatistics(void)
{ {
List *distributedTables = NIL; List *distTableOids = NIL;
uint64 roundedDistTableCount = 0; uint64 roundedDistTableCount = 0;
uint64 roundedClusterSize = 0; uint64 roundedClusterSize = 0;
uint32 workerNodeCount = 0; uint32 workerNodeCount = 0;
StringInfo fields = makeStringInfo(); StringInfo fields = makeStringInfo();
Datum metadataJsonbDatum = 0; Datum metadataJsonbDatum = 0;
char *metadataJsonbStr = NULL; char *metadataJsonbStr = NULL;
MemoryContext savedContext = CurrentMemoryContext;
struct utsname unameData; struct utsname unameData;
int unameResult PG_USED_FOR_ASSERTS_ONLY = 0;
bool metadataCollectionFailed = false;
memset(&unameData, 0, sizeof(unameData)); memset(&unameData, 0, sizeof(unameData));
StartTransactionCommand(); PG_TRY();
{
distTableOids = DistTableOidList();
roundedDistTableCount = NextPow2(list_length(distTableOids));
roundedClusterSize = NextPow2(DistributedTablesSize(distTableOids));
workerNodeCount = ActivePrimaryNodeCount();
metadataJsonbDatum = DistNodeMetadata();
metadataJsonbStr = DatumGetCString(DirectFunctionCall1(jsonb_out,
metadataJsonbDatum));
}
PG_CATCH();
{
ErrorData *edata = NULL;
MemoryContextSwitchTo(savedContext);
edata = CopyErrorData();
FlushErrorState();
/* rethrow as WARNING */
edata->elevel = WARNING;
ThrowErrorData(edata);
metadataCollectionFailed = true;
}
PG_END_TRY();
/* /*
* If there is a version mismatch between loaded version and available * Returning here instead of in PG_CATCH() since PG_END_TRY() resets couple
* version, metadata functions will fail. We return early to avoid crashing. * of global variables.
* This can happen when updating the Citus extension.
*/ */
if (!CheckCitusVersion(LOG_SERVER_ONLY)) if (metadataCollectionFailed)
{ {
CommitTransactionCommand();
return false; return false;
} }
distributedTables = DistributedTableList();
roundedDistTableCount = NextPow2(list_length(distributedTables)); unameResult = uname(&unameData);
roundedClusterSize = NextPow2(ClusterSize(distributedTables)); Assert(unameResult == 0); /* uname() always succeeds if we pass valid buffer */
workerNodeCount = ActivePrimaryNodeCount();
metadataJsonbDatum = DistNodeMetadata();
metadataJsonbStr = DatumGetCString(DirectFunctionCall1(jsonb_out,
metadataJsonbDatum));
uname(&unameData);
appendStringInfoString(fields, "{\"citus_version\": "); appendStringInfoString(fields, "{\"citus_version\": ");
escape_json(fields, CITUS_VERSION); escape_json(fields, CITUS_VERSION);
@ -112,46 +134,42 @@ CollectBasicUsageStatistics(void)
appendStringInfo(fields, ",\"node_metadata\": %s", metadataJsonbStr); appendStringInfo(fields, ",\"node_metadata\": %s", metadataJsonbStr);
appendStringInfoString(fields, "}"); appendStringInfoString(fields, "}");
CommitTransactionCommand();
return SendHttpPostJsonRequest(STATS_COLLECTION_HOST "/v1/usage_reports", return SendHttpPostJsonRequest(STATS_COLLECTION_HOST "/v1/usage_reports",
fields->data, HTTP_TIMEOUT_SECONDS); fields->data, HTTP_TIMEOUT_SECONDS);
} }
/* /*
* ClusterSize returns total size of data store in the cluster consisting of * DistributedTablesSize returns total size of data store in the cluster consisting
* given distributed tables. We ignore tables which we cannot get their size. * of given distributed tables. We ignore tables which we cannot get their size.
*/ */
static uint64 static uint64
ClusterSize(List *distributedTableList) DistributedTablesSize(List *distTableOids)
{ {
uint64 clusterSize = 0; uint64 totalSize = 0;
ListCell *distTableCacheEntryCell = NULL; ListCell *distTableOidCell = NULL;
foreach(distTableCacheEntryCell, distributedTableList) foreach(distTableOidCell, distTableOids)
{ {
DistTableCacheEntry *distTableCacheEntry = lfirst(distTableCacheEntryCell); Oid relationId = lfirst_oid(distTableOidCell);
Oid relationId = distTableCacheEntry->relationId; Datum tableSizeDatum = 0;
MemoryContext savedContext = CurrentMemoryContext;
PG_TRY(); /*
* Ignore hash partitioned tables that are not single-replicated, since
* citus_table_size() doesn't work on them.
*/
if (PartitionMethod(relationId) == DISTRIBUTE_BY_HASH &&
!SingleReplicatedTable(relationId))
{ {
Datum distTableSizeDatum = DirectFunctionCall1(citus_table_size, continue;
ObjectIdGetDatum(relationId));
clusterSize += DatumGetInt64(distTableSizeDatum);
} }
PG_CATCH();
{
FlushErrorState();
/* citus_table_size() throws an error while the memory context is changed */ tableSizeDatum = DirectFunctionCall1(citus_table_size,
MemoryContextSwitchTo(savedContext); ObjectIdGetDatum(relationId));
} totalSize += DatumGetInt64(tableSizeDatum);
PG_END_TRY();
} }
return clusterSize; return totalSize;
} }
@ -242,8 +260,6 @@ SendHttpPostJsonRequest(const char *url, const char *jsonObj, long timeoutSecond
#endif /* HAVE_LIBCURL */ #endif /* HAVE_LIBCURL */
PG_FUNCTION_INFO_V1(citus_server_id);
/* /*
* citus_server_id returns a random UUID value as server identifier. This is * citus_server_id returns a random UUID value as server identifier. This is
* modeled after PostgreSQL's pg_random_uuid(). * modeled after PostgreSQL's pg_random_uuid().
@ -254,11 +270,14 @@ citus_server_id(PG_FUNCTION_ARGS)
uint8 *buf = (uint8 *) palloc(UUID_LEN); uint8 *buf = (uint8 *) palloc(UUID_LEN);
#if PG_VERSION_NUM >= 100000 #if PG_VERSION_NUM >= 100000
/*
* If pg_backend_random() fails, fall back to using random(). Previous
* versions of postgres don't have pg_backend_random(), so random() is
* used by default in that case.
*/
if (!pg_backend_random((char *) buf, UUID_LEN)) if (!pg_backend_random((char *) buf, UUID_LEN))
{ #endif
ereport(ERROR, (errmsg("failed to generate server identifier")));
}
#else
{ {
int bufIdx = 0; int bufIdx = 0;
for (bufIdx = 0; bufIdx < UUID_LEN; bufIdx++) for (bufIdx = 0; bufIdx < UUID_LEN; bufIdx++)
@ -266,7 +285,6 @@ citus_server_id(PG_FUNCTION_ARGS)
buf[bufIdx] = (uint8) (random() & 0xFF); buf[bufIdx] = (uint8) (random() & 0xFF);
} }
} }
#endif
/* /*
* Set magic numbers for a "version 4" (pseudorandom) UUID, see * Set magic numbers for a "version 4" (pseudorandom) UUID, see

View File

@ -13,10 +13,10 @@
#define MAINTENANCED_H #define MAINTENANCED_H
/* collect statistics every 24 hours */ /* collect statistics every 24 hours */
#define STATISTICS_COLLECTION_INTERVAL 86400 #define STATS_COLLECTION_TIMEOUT_MILLIS (24 * 60 * 60 * 1000)
/* if statistics collection fails, retry in 1 minute */ /* if statistics collection fails, retry in 1 minute */
#define STATISTICS_COLLECTION_RETRY_INTERVAL 60 #define STATS_COLLECTION_RETRY_TIMEOUT_MILLIS (60 * 1000)
/* config variable for */ /* config variable for */
extern double DistributedDeadlockDetectionTimeoutFactor; extern double DistributedDeadlockDetectionTimeoutFactor;

View File

@ -12,6 +12,17 @@
#include "citus_version.h" #include "citus_version.h"
/*
* Append USED_WITH_LIBCURL_ONLY to definitions of variables that are only used
* when compiled with libcurl, to avoid compiler warnings about unused variables
* when built without libcurl.
*/
#ifdef HAVE_LIBCURL
#define USED_WITH_LIBCURL_ONLY
#else
#define USED_WITH_LIBCURL_ONLY pg_attribute_unused()
#endif
/* Config variables managed via guc.c */ /* Config variables managed via guc.c */
extern bool EnableStatisticsCollection; extern bool EnableStatisticsCollection;