mirror of https://github.com/citusdata/citus.git
Do metadata sync in a separate background worker.
parent
299d3fcbc5
commit
4df723cf9b
|
@ -1138,6 +1138,10 @@ TriggerSyncMetadataToPrimaryNodes(void)
|
||||||
|
|
||||||
triggerMetadataSync = true;
|
triggerMetadataSync = true;
|
||||||
}
|
}
|
||||||
|
else if (!workerNode->metadataSynced)
|
||||||
|
{
|
||||||
|
triggerMetadataSync = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* let the maintanince deamon know about the metadata sync */
|
/* let the maintanince deamon know about the metadata sync */
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
#include "postgres.h"
|
#include "postgres.h"
|
||||||
#include "miscadmin.h"
|
#include "miscadmin.h"
|
||||||
|
|
||||||
|
#include <signal.h>
|
||||||
#include <sys/stat.h>
|
#include <sys/stat.h>
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
|
|
||||||
|
@ -28,6 +29,7 @@
|
||||||
#include "catalog/pg_foreign_server.h"
|
#include "catalog/pg_foreign_server.h"
|
||||||
#include "catalog/pg_namespace.h"
|
#include "catalog/pg_namespace.h"
|
||||||
#include "catalog/pg_type.h"
|
#include "catalog/pg_type.h"
|
||||||
|
#include "commands/async.h"
|
||||||
#include "distributed/citus_ruleutils.h"
|
#include "distributed/citus_ruleutils.h"
|
||||||
#include "distributed/commands.h"
|
#include "distributed/commands.h"
|
||||||
#include "distributed/deparser.h"
|
#include "distributed/deparser.h"
|
||||||
|
@ -35,6 +37,7 @@
|
||||||
#include "distributed/listutils.h"
|
#include "distributed/listutils.h"
|
||||||
#include "distributed/metadata_utility.h"
|
#include "distributed/metadata_utility.h"
|
||||||
#include "distributed/coordinator_protocol.h"
|
#include "distributed/coordinator_protocol.h"
|
||||||
|
#include "distributed/maintenanced.h"
|
||||||
#include "distributed/metadata_cache.h"
|
#include "distributed/metadata_cache.h"
|
||||||
#include "distributed/metadata_sync.h"
|
#include "distributed/metadata_sync.h"
|
||||||
#include "distributed/metadata/distobject.h"
|
#include "distributed/metadata/distobject.h"
|
||||||
|
@ -48,11 +51,15 @@
|
||||||
#include "foreign/foreign.h"
|
#include "foreign/foreign.h"
|
||||||
#include "miscadmin.h"
|
#include "miscadmin.h"
|
||||||
#include "nodes/pg_list.h"
|
#include "nodes/pg_list.h"
|
||||||
|
#include "pgstat.h"
|
||||||
|
#include "postmaster/bgworker.h"
|
||||||
|
#include "postmaster/postmaster.h"
|
||||||
#include "storage/lmgr.h"
|
#include "storage/lmgr.h"
|
||||||
#include "utils/builtins.h"
|
#include "utils/builtins.h"
|
||||||
#include "utils/fmgroids.h"
|
#include "utils/fmgroids.h"
|
||||||
#include "utils/lsyscache.h"
|
#include "utils/lsyscache.h"
|
||||||
#include "utils/memutils.h"
|
#include "utils/memutils.h"
|
||||||
|
#include "utils/snapmgr.h"
|
||||||
#include "utils/syscache.h"
|
#include "utils/syscache.h"
|
||||||
|
|
||||||
|
|
||||||
|
@ -76,11 +83,18 @@ static GrantStmt * GenerateGrantOnSchemaStmtForRights(Oid roleOid,
|
||||||
char *permission,
|
char *permission,
|
||||||
bool withGrantOption);
|
bool withGrantOption);
|
||||||
static char * GenerateSetRoleQuery(Oid roleOid);
|
static char * GenerateSetRoleQuery(Oid roleOid);
|
||||||
|
static void MetadataSyncSigTermHandler(SIGNAL_ARGS);
|
||||||
|
static void MetadataSyncSigAlrmHandler(SIGNAL_ARGS);
|
||||||
|
|
||||||
PG_FUNCTION_INFO_V1(start_metadata_sync_to_node);
|
PG_FUNCTION_INFO_V1(start_metadata_sync_to_node);
|
||||||
PG_FUNCTION_INFO_V1(stop_metadata_sync_to_node);
|
PG_FUNCTION_INFO_V1(stop_metadata_sync_to_node);
|
||||||
PG_FUNCTION_INFO_V1(worker_record_sequence_dependency);
|
PG_FUNCTION_INFO_V1(worker_record_sequence_dependency);
|
||||||
|
|
||||||
|
static bool got_SIGTERM = false;
|
||||||
|
static bool got_SIGALRM = false;
|
||||||
|
|
||||||
|
#define METADATA_SYNC_APP_NAME "Citus Metadata Sync Daemon"
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* start_metadata_sync_to_node function sets hasmetadata column of the given
|
* start_metadata_sync_to_node function sets hasmetadata column of the given
|
||||||
|
@ -1497,7 +1511,7 @@ DetachPartitionCommandList(void)
|
||||||
* metadata workers that are out of sync. Returns the result of
|
* metadata workers that are out of sync. Returns the result of
|
||||||
* synchronization.
|
* synchronization.
|
||||||
*/
|
*/
|
||||||
MetadataSyncResult
|
static MetadataSyncResult
|
||||||
SyncMetadataToNodes(void)
|
SyncMetadataToNodes(void)
|
||||||
{
|
{
|
||||||
MetadataSyncResult result = METADATA_SYNC_SUCCESS;
|
MetadataSyncResult result = METADATA_SYNC_SUCCESS;
|
||||||
|
@ -1527,6 +1541,9 @@ SyncMetadataToNodes(void)
|
||||||
|
|
||||||
if (!SyncMetadataSnapshotToNode(workerNode, raiseInterrupts))
|
if (!SyncMetadataSnapshotToNode(workerNode, raiseInterrupts))
|
||||||
{
|
{
|
||||||
|
ereport(WARNING, (errmsg("failed to sync metadata to %s:%d",
|
||||||
|
workerNode->workerName,
|
||||||
|
workerNode->workerPort)));
|
||||||
result = METADATA_SYNC_FAILED_SYNC;
|
result = METADATA_SYNC_FAILED_SYNC;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -1539,3 +1556,244 @@ SyncMetadataToNodes(void)
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* SyncMetadataToNodesMain is the main function for syncing metadata to
|
||||||
|
* MX nodes. It retries until success and then exits.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
SyncMetadataToNodesMain(Datum main_arg)
|
||||||
|
{
|
||||||
|
Oid databaseOid = DatumGetObjectId(main_arg);
|
||||||
|
|
||||||
|
/* extension owner is passed via bgw_extra */
|
||||||
|
Oid extensionOwner = InvalidOid;
|
||||||
|
memcpy_s(&extensionOwner, sizeof(extensionOwner),
|
||||||
|
MyBgworkerEntry->bgw_extra, sizeof(Oid));
|
||||||
|
|
||||||
|
pqsignal(SIGTERM, MetadataSyncSigTermHandler);
|
||||||
|
pqsignal(SIGALRM, MetadataSyncSigAlrmHandler);
|
||||||
|
BackgroundWorkerUnblockSignals();
|
||||||
|
|
||||||
|
/* connect to database, after that we can actually access catalogs */
|
||||||
|
BackgroundWorkerInitializeConnectionByOid(databaseOid, extensionOwner, 0);
|
||||||
|
|
||||||
|
/* make worker recognizable in pg_stat_activity */
|
||||||
|
pgstat_report_appname(METADATA_SYNC_APP_NAME);
|
||||||
|
|
||||||
|
bool syncedAllNodes = false;
|
||||||
|
|
||||||
|
while (!syncedAllNodes)
|
||||||
|
{
|
||||||
|
InvalidateMetadataSystemCache();
|
||||||
|
StartTransactionCommand();
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Some functions in ruleutils.c, which we use to get the DDL for
|
||||||
|
* metadata propagation, require an active snapshot.
|
||||||
|
*/
|
||||||
|
PushActiveSnapshot(GetTransactionSnapshot());
|
||||||
|
|
||||||
|
if (!LockCitusExtension())
|
||||||
|
{
|
||||||
|
ereport(DEBUG1, (errmsg("could not lock the citus extension, "
|
||||||
|
"skipping metadata sync")));
|
||||||
|
}
|
||||||
|
else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded())
|
||||||
|
{
|
||||||
|
UseCoordinatedTransaction();
|
||||||
|
MetadataSyncResult result = SyncMetadataToNodes();
|
||||||
|
|
||||||
|
syncedAllNodes = (result == METADATA_SYNC_SUCCESS);
|
||||||
|
|
||||||
|
/* we use LISTEN/NOTIFY to wait for metadata syncing in tests */
|
||||||
|
if (result != METADATA_SYNC_FAILED_LOCK)
|
||||||
|
{
|
||||||
|
Async_Notify(METADATA_SYNC_CHANNEL, NULL);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
PopActiveSnapshot();
|
||||||
|
CommitTransactionCommand();
|
||||||
|
ProcessCompletedNotifies();
|
||||||
|
|
||||||
|
if (syncedAllNodes)
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If backend is cancelled (e.g. bacause of distributed deadlock),
|
||||||
|
* CHECK_FOR_INTERRUPTS() will raise a cancellation error which will
|
||||||
|
* result in exit(1).
|
||||||
|
*/
|
||||||
|
CHECK_FOR_INTERRUPTS();
|
||||||
|
|
||||||
|
/*
|
||||||
|
* SIGTERM is used for when maintenance daemon tries to clean-up
|
||||||
|
* metadata sync daemons spawned by terminated maintenance daemons.
|
||||||
|
*/
|
||||||
|
if (got_SIGTERM)
|
||||||
|
{
|
||||||
|
exit(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* SIGALRM is used for testing purposes and it simulates an error in metadata
|
||||||
|
* sync daemon.
|
||||||
|
*/
|
||||||
|
if (got_SIGALRM)
|
||||||
|
{
|
||||||
|
elog(ERROR, "Error in metadata sync daemon");
|
||||||
|
}
|
||||||
|
|
||||||
|
pg_usleep(MetadataSyncRetryInterval * 1000);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* MetadataSyncSigTermHandler set a flag to request termination of metadata
|
||||||
|
* sync daemon.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
MetadataSyncSigTermHandler(SIGNAL_ARGS)
|
||||||
|
{
|
||||||
|
int save_errno = errno;
|
||||||
|
|
||||||
|
got_SIGTERM = true;
|
||||||
|
if (MyProc != NULL)
|
||||||
|
{
|
||||||
|
SetLatch(&MyProc->procLatch);
|
||||||
|
}
|
||||||
|
|
||||||
|
errno = save_errno;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* MetadataSyncSigAlrmHandler set a flag to request error at metadata
|
||||||
|
* sync daemon. This is used for testing purposes.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
MetadataSyncSigAlrmHandler(SIGNAL_ARGS)
|
||||||
|
{
|
||||||
|
int save_errno = errno;
|
||||||
|
|
||||||
|
got_SIGALRM = true;
|
||||||
|
if (MyProc != NULL)
|
||||||
|
{
|
||||||
|
SetLatch(&MyProc->procLatch);
|
||||||
|
}
|
||||||
|
|
||||||
|
errno = save_errno;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* SpawnSyncMetadataToNodes starts a background worker which runs metadata
|
||||||
|
* sync. On success it returns workers' handle. Otherwise it returns NULL.
|
||||||
|
*/
|
||||||
|
BackgroundWorkerHandle *
|
||||||
|
SpawnSyncMetadataToNodes(Oid database, Oid extensionOwner)
|
||||||
|
{
|
||||||
|
BackgroundWorker worker;
|
||||||
|
BackgroundWorkerHandle *handle = NULL;
|
||||||
|
|
||||||
|
/* Configure a worker. */
|
||||||
|
memset(&worker, 0, sizeof(worker));
|
||||||
|
SafeSnprintf(worker.bgw_name, BGW_MAXLEN,
|
||||||
|
"Citus Metadata Sync: %u/%u",
|
||||||
|
database, extensionOwner);
|
||||||
|
worker.bgw_flags =
|
||||||
|
BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
|
||||||
|
worker.bgw_start_time = BgWorkerStart_ConsistentState;
|
||||||
|
|
||||||
|
/* don't restart, we manage restarts from maintenance daemon */
|
||||||
|
worker.bgw_restart_time = BGW_NEVER_RESTART;
|
||||||
|
strcpy_s(worker.bgw_library_name, sizeof(worker.bgw_library_name), "citus");
|
||||||
|
strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_library_name),
|
||||||
|
"SyncMetadataToNodesMain");
|
||||||
|
worker.bgw_main_arg = ObjectIdGetDatum(MyDatabaseId);
|
||||||
|
memcpy_s(worker.bgw_extra, sizeof(worker.bgw_extra), &extensionOwner,
|
||||||
|
sizeof(Oid));
|
||||||
|
worker.bgw_notify_pid = MyProcPid;
|
||||||
|
|
||||||
|
if (!RegisterDynamicBackgroundWorker(&worker, &handle))
|
||||||
|
{
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pid_t pid;
|
||||||
|
WaitForBackgroundWorkerStartup(handle, &pid);
|
||||||
|
|
||||||
|
return handle;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* SignalMetadataSyncDaemon signals metadata sync daemons belonging to
|
||||||
|
* the given database.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
SignalMetadataSyncDaemon(Oid database, int sig)
|
||||||
|
{
|
||||||
|
int backendCount = pgstat_fetch_stat_numbackends();
|
||||||
|
for (int backend = 1; backend <= backendCount; backend++)
|
||||||
|
{
|
||||||
|
LocalPgBackendStatus *localBeEntry = pgstat_fetch_stat_local_beentry(backend);
|
||||||
|
if (!localBeEntry)
|
||||||
|
{
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
PgBackendStatus *beStatus = &localBeEntry->backendStatus;
|
||||||
|
if (beStatus->st_databaseid == database &&
|
||||||
|
strncmp(beStatus->st_appname, METADATA_SYNC_APP_NAME, BGW_MAXLEN) == 0)
|
||||||
|
{
|
||||||
|
kill(beStatus->st_procpid, sig);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ShouldInitiateMetadataSync returns if metadata sync daemon should be initiated.
|
||||||
|
* It sets lockFailure to true if pg_dist_node lock couldn't be acquired for the
|
||||||
|
* check.
|
||||||
|
*/
|
||||||
|
bool
|
||||||
|
ShouldInitiateMetadataSync(bool *lockFailure)
|
||||||
|
{
|
||||||
|
if (!IsCoordinator())
|
||||||
|
{
|
||||||
|
*lockFailure = false;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
Oid distNodeOid = DistNodeRelationId();
|
||||||
|
if (!ConditionalLockRelationOid(distNodeOid, AccessShareLock))
|
||||||
|
{
|
||||||
|
*lockFailure = true;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool shouldSyncMetadata = false;
|
||||||
|
|
||||||
|
List *workerList = ActivePrimaryNonCoordinatorNodeList(NoLock);
|
||||||
|
WorkerNode *workerNode = NULL;
|
||||||
|
foreach_ptr(workerNode, workerList)
|
||||||
|
{
|
||||||
|
if (workerNode->hasMetadata && !workerNode->metadataSynced)
|
||||||
|
{
|
||||||
|
shouldSyncMetadata = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
UnlockRelationOid(distNodeOid, AccessShareLock);
|
||||||
|
|
||||||
|
*lockFailure = false;
|
||||||
|
return shouldSyncMetadata;
|
||||||
|
}
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
#include "catalog/pg_type.h"
|
#include "catalog/pg_type.h"
|
||||||
#include "distributed/connection_management.h"
|
#include "distributed/connection_management.h"
|
||||||
#include "distributed/listutils.h"
|
#include "distributed/listutils.h"
|
||||||
|
#include "distributed/maintenanced.h"
|
||||||
#include "distributed/metadata_sync.h"
|
#include "distributed/metadata_sync.h"
|
||||||
#include "distributed/remote_commands.h"
|
#include "distributed/remote_commands.h"
|
||||||
#include "postmaster/postmaster.h"
|
#include "postmaster/postmaster.h"
|
||||||
|
@ -28,6 +29,8 @@
|
||||||
/* declarations for dynamic loading */
|
/* declarations for dynamic loading */
|
||||||
PG_FUNCTION_INFO_V1(master_metadata_snapshot);
|
PG_FUNCTION_INFO_V1(master_metadata_snapshot);
|
||||||
PG_FUNCTION_INFO_V1(wait_until_metadata_sync);
|
PG_FUNCTION_INFO_V1(wait_until_metadata_sync);
|
||||||
|
PG_FUNCTION_INFO_V1(trigger_metadata_sync);
|
||||||
|
PG_FUNCTION_INFO_V1(raise_error_in_metadata_sync);
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -124,3 +127,26 @@ wait_until_metadata_sync(PG_FUNCTION_ARGS)
|
||||||
|
|
||||||
PG_RETURN_VOID();
|
PG_RETURN_VOID();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* trigger_metadata_sync triggers metadata sync for testing.
|
||||||
|
*/
|
||||||
|
Datum
|
||||||
|
trigger_metadata_sync(PG_FUNCTION_ARGS)
|
||||||
|
{
|
||||||
|
TriggerMetadataSync(MyDatabaseId);
|
||||||
|
PG_RETURN_VOID();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* raise_error_in_metadata_sync causes metadata sync to raise an error.
|
||||||
|
*/
|
||||||
|
Datum
|
||||||
|
raise_error_in_metadata_sync(PG_FUNCTION_ARGS)
|
||||||
|
{
|
||||||
|
/* metadata sync uses SIGALRM to test errors */
|
||||||
|
SignalMetadataSyncDaemon(MyDatabaseId, SIGALRM);
|
||||||
|
PG_RETURN_VOID();
|
||||||
|
}
|
||||||
|
|
|
@ -118,7 +118,6 @@ static size_t MaintenanceDaemonShmemSize(void);
|
||||||
static void MaintenanceDaemonShmemInit(void);
|
static void MaintenanceDaemonShmemInit(void);
|
||||||
static void MaintenanceDaemonShmemExit(int code, Datum arg);
|
static void MaintenanceDaemonShmemExit(int code, Datum arg);
|
||||||
static void MaintenanceDaemonErrorContext(void *arg);
|
static void MaintenanceDaemonErrorContext(void *arg);
|
||||||
static bool LockCitusExtension(void);
|
|
||||||
static bool MetadataSyncTriggeredCheckAndReset(MaintenanceDaemonDBData *dbData);
|
static bool MetadataSyncTriggeredCheckAndReset(MaintenanceDaemonDBData *dbData);
|
||||||
static void WarnMaintenanceDaemonNotStarted(void);
|
static void WarnMaintenanceDaemonNotStarted(void);
|
||||||
|
|
||||||
|
@ -291,6 +290,13 @@ CitusMaintenanceDaemonMain(Datum main_arg)
|
||||||
TimestampTz lastRecoveryTime = 0;
|
TimestampTz lastRecoveryTime = 0;
|
||||||
TimestampTz nextMetadataSyncTime = 0;
|
TimestampTz nextMetadataSyncTime = 0;
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We do metadata sync in a separate background worker. We need its
|
||||||
|
* handle to be able to check its status.
|
||||||
|
*/
|
||||||
|
BackgroundWorkerHandle *metadataSyncBgwHandle = NULL;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Look up this worker's configuration.
|
* Look up this worker's configuration.
|
||||||
*/
|
*/
|
||||||
|
@ -371,6 +377,12 @@ CitusMaintenanceDaemonMain(Datum main_arg)
|
||||||
/* make worker recognizable in pg_stat_activity */
|
/* make worker recognizable in pg_stat_activity */
|
||||||
pgstat_report_appname("Citus Maintenance Daemon");
|
pgstat_report_appname("Citus Maintenance Daemon");
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Terminate orphaned metadata sync daemons spawned from previously terminated
|
||||||
|
* or crashed maintenanced instances.
|
||||||
|
*/
|
||||||
|
SignalMetadataSyncDaemon(databaseOid, SIGTERM);
|
||||||
|
|
||||||
/* enter main loop */
|
/* enter main loop */
|
||||||
while (!got_SIGTERM)
|
while (!got_SIGTERM)
|
||||||
{
|
{
|
||||||
|
@ -450,21 +462,42 @@ CitusMaintenanceDaemonMain(Datum main_arg)
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
if (!RecoveryInProgress() &&
|
pid_t metadataSyncBgwPid = 0;
|
||||||
(MetadataSyncTriggeredCheckAndReset(myDbData) ||
|
BgwHandleStatus metadataSyncStatus =
|
||||||
GetCurrentTimestamp() >= nextMetadataSyncTime))
|
metadataSyncBgwHandle != NULL ?
|
||||||
|
GetBackgroundWorkerPid(metadataSyncBgwHandle, &metadataSyncBgwPid) :
|
||||||
|
BGWH_STOPPED;
|
||||||
|
|
||||||
|
if (metadataSyncStatus != BGWH_STOPPED &&
|
||||||
|
GetCurrentTimestamp() >= nextMetadataSyncTime)
|
||||||
{
|
{
|
||||||
bool metadataSyncFailed = false;
|
/*
|
||||||
|
* Metadata sync is still running, recheck in a short while.
|
||||||
|
*/
|
||||||
|
int nextTimeout = MetadataSyncRetryInterval;
|
||||||
|
nextMetadataSyncTime =
|
||||||
|
TimestampTzPlusMilliseconds(GetCurrentTimestamp(), nextTimeout);
|
||||||
|
timeout = Min(timeout, nextTimeout);
|
||||||
|
}
|
||||||
|
else if (!RecoveryInProgress() &&
|
||||||
|
metadataSyncStatus == BGWH_STOPPED &&
|
||||||
|
(MetadataSyncTriggeredCheckAndReset(myDbData) ||
|
||||||
|
GetCurrentTimestamp() >= nextMetadataSyncTime))
|
||||||
|
{
|
||||||
|
if (metadataSyncBgwHandle)
|
||||||
|
{
|
||||||
|
TerminateBackgroundWorker(metadataSyncBgwHandle);
|
||||||
|
pfree(metadataSyncBgwHandle);
|
||||||
|
metadataSyncBgwHandle = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
InvalidateMetadataSystemCache();
|
InvalidateMetadataSystemCache();
|
||||||
StartTransactionCommand();
|
StartTransactionCommand();
|
||||||
|
|
||||||
/*
|
|
||||||
* Some functions in ruleutils.c, which we use to get the DDL for
|
|
||||||
* metadata propagation, require an active snapshot.
|
|
||||||
*/
|
|
||||||
PushActiveSnapshot(GetTransactionSnapshot());
|
PushActiveSnapshot(GetTransactionSnapshot());
|
||||||
|
|
||||||
|
int nextTimeout = MetadataSyncRetryInterval;
|
||||||
|
bool syncMetadata = false;
|
||||||
|
|
||||||
if (!LockCitusExtension())
|
if (!LockCitusExtension())
|
||||||
{
|
{
|
||||||
ereport(DEBUG1, (errmsg("could not lock the citus extension, "
|
ereport(DEBUG1, (errmsg("could not lock the citus extension, "
|
||||||
|
@ -472,25 +505,28 @@ CitusMaintenanceDaemonMain(Datum main_arg)
|
||||||
}
|
}
|
||||||
else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded())
|
else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded())
|
||||||
{
|
{
|
||||||
MetadataSyncResult result = SyncMetadataToNodes();
|
bool lockFailure = false;
|
||||||
metadataSyncFailed = (result != METADATA_SYNC_SUCCESS);
|
syncMetadata = ShouldInitiateMetadataSync(&lockFailure);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Notification means we had an attempt on synchronization
|
* If lock fails, we need to recheck in a short while. If we are
|
||||||
* without being blocked for pg_dist_node access.
|
* going to sync metadata, we should recheck in a short while to
|
||||||
|
* see if it failed. Otherwise, we can wait longer.
|
||||||
*/
|
*/
|
||||||
if (result != METADATA_SYNC_FAILED_LOCK)
|
nextTimeout = (lockFailure || syncMetadata) ?
|
||||||
{
|
MetadataSyncRetryInterval :
|
||||||
Async_Notify(METADATA_SYNC_CHANNEL, NULL);
|
MetadataSyncInterval;
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
PopActiveSnapshot();
|
PopActiveSnapshot();
|
||||||
CommitTransactionCommand();
|
CommitTransactionCommand();
|
||||||
ProcessCompletedNotifies();
|
|
||||||
|
|
||||||
int64 nextTimeout = metadataSyncFailed ? MetadataSyncRetryInterval :
|
if (syncMetadata)
|
||||||
MetadataSyncInterval;
|
{
|
||||||
|
metadataSyncBgwHandle =
|
||||||
|
SpawnSyncMetadataToNodes(MyDatabaseId, myDbData->userOid);
|
||||||
|
}
|
||||||
|
|
||||||
nextMetadataSyncTime =
|
nextMetadataSyncTime =
|
||||||
TimestampTzPlusMilliseconds(GetCurrentTimestamp(), nextTimeout);
|
TimestampTzPlusMilliseconds(GetCurrentTimestamp(), nextTimeout);
|
||||||
timeout = Min(timeout, nextTimeout);
|
timeout = Min(timeout, nextTimeout);
|
||||||
|
@ -626,6 +662,11 @@ CitusMaintenanceDaemonMain(Datum main_arg)
|
||||||
ProcessConfigFile(PGC_SIGHUP);
|
ProcessConfigFile(PGC_SIGHUP);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (metadataSyncBgwHandle)
|
||||||
|
{
|
||||||
|
TerminateBackgroundWorker(metadataSyncBgwHandle);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -786,7 +827,7 @@ MaintenanceDaemonErrorContext(void *arg)
|
||||||
* LockCitusExtension acquires a lock on the Citus extension or returns
|
* LockCitusExtension acquires a lock on the Citus extension or returns
|
||||||
* false if the extension does not exist or is being dropped.
|
* false if the extension does not exist or is being dropped.
|
||||||
*/
|
*/
|
||||||
static bool
|
bool
|
||||||
LockCitusExtension(void)
|
LockCitusExtension(void)
|
||||||
{
|
{
|
||||||
Oid extensionOid = get_extension_oid("citus", true);
|
Oid extensionOid = get_extension_oid("citus", true);
|
||||||
|
|
|
@ -25,6 +25,7 @@ extern void StopMaintenanceDaemon(Oid databaseId);
|
||||||
extern void TriggerMetadataSync(Oid databaseId);
|
extern void TriggerMetadataSync(Oid databaseId);
|
||||||
extern void InitializeMaintenanceDaemon(void);
|
extern void InitializeMaintenanceDaemon(void);
|
||||||
extern void InitializeMaintenanceDaemonBackend(void);
|
extern void InitializeMaintenanceDaemonBackend(void);
|
||||||
|
extern bool LockCitusExtension(void);
|
||||||
|
|
||||||
extern void CitusMaintenanceDaemonMain(Datum main_arg);
|
extern void CitusMaintenanceDaemonMain(Datum main_arg);
|
||||||
|
|
||||||
|
|
|
@ -50,11 +50,14 @@ extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, int sha
|
||||||
extern void CreateTableMetadataOnWorkers(Oid relationId);
|
extern void CreateTableMetadataOnWorkers(Oid relationId);
|
||||||
extern void MarkNodeHasMetadata(const char *nodeName, int32 nodePort, bool hasMetadata);
|
extern void MarkNodeHasMetadata(const char *nodeName, int32 nodePort, bool hasMetadata);
|
||||||
extern void MarkNodeMetadataSynced(const char *nodeName, int32 nodePort, bool synced);
|
extern void MarkNodeMetadataSynced(const char *nodeName, int32 nodePort, bool synced);
|
||||||
extern MetadataSyncResult SyncMetadataToNodes(void);
|
extern BackgroundWorkerHandle * SpawnSyncMetadataToNodes(Oid database, Oid owner);
|
||||||
extern bool SendOptionalCommandListToWorkerInTransaction(const char *nodeName, int32
|
extern bool SendOptionalCommandListToWorkerInTransaction(const char *nodeName, int32
|
||||||
nodePort,
|
nodePort,
|
||||||
const char *nodeUser,
|
const char *nodeUser,
|
||||||
List *commandList);
|
List *commandList);
|
||||||
|
extern void SyncMetadataToNodesMain(Datum main_arg);
|
||||||
|
extern void SignalMetadataSyncDaemon(Oid database, int sig);
|
||||||
|
extern bool ShouldInitiateMetadataSync(bool *lockFailure);
|
||||||
|
|
||||||
#define DELETE_ALL_NODES "TRUNCATE pg_dist_node CASCADE"
|
#define DELETE_ALL_NODES "TRUNCATE pg_dist_node CASCADE"
|
||||||
#define REMOVE_ALL_CLUSTERED_TABLES_COMMAND \
|
#define REMOVE_ALL_CLUSTERED_TABLES_COMMAND \
|
||||||
|
|
|
@ -28,11 +28,11 @@ step detector-dump-wait-edges:
|
||||||
|
|
||||||
waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
|
waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
|
||||||
|
|
||||||
390 389 f
|
395 394 f
|
||||||
transactionnumberwaitingtransactionnumbers
|
transactionnumberwaitingtransactionnumbers
|
||||||
|
|
||||||
389
|
394
|
||||||
390 389
|
395 394
|
||||||
step s1-abort:
|
step s1-abort:
|
||||||
ABORT;
|
ABORT;
|
||||||
|
|
||||||
|
@ -75,14 +75,14 @@ step detector-dump-wait-edges:
|
||||||
|
|
||||||
waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
|
waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
|
||||||
|
|
||||||
394 393 f
|
399 398 f
|
||||||
395 393 f
|
400 398 f
|
||||||
395 394 t
|
400 399 t
|
||||||
transactionnumberwaitingtransactionnumbers
|
transactionnumberwaitingtransactionnumbers
|
||||||
|
|
||||||
393
|
398
|
||||||
394 393
|
399 398
|
||||||
395 393,394
|
400 398,399
|
||||||
step s1-abort:
|
step s1-abort:
|
||||||
ABORT;
|
ABORT;
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,198 @@
|
||||||
|
Parsed test spec with 3 sessions
|
||||||
|
|
||||||
|
starting permutation: enable-deadlock-detection reload-conf s2-start-session-level-connection s1-begin s1-update-1 s2-begin-on-worker s2-update-2-on-worker s2-truncate-on-worker s3-invalidate-metadata s3-resync s2-update-1-on-worker s1-update-2 s1-commit s2-commit-on-worker disable-deadlock-detection reload-conf s2-stop-connection
|
||||||
|
create_distributed_table
|
||||||
|
|
||||||
|
|
||||||
|
step enable-deadlock-detection:
|
||||||
|
ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO 1.1;
|
||||||
|
|
||||||
|
step reload-conf:
|
||||||
|
SELECT pg_reload_conf();
|
||||||
|
|
||||||
|
pg_reload_conf
|
||||||
|
|
||||||
|
t
|
||||||
|
step s2-start-session-level-connection:
|
||||||
|
SELECT start_session_level_connection_to_node('localhost', 57638);
|
||||||
|
|
||||||
|
start_session_level_connection_to_node
|
||||||
|
|
||||||
|
|
||||||
|
step s1-begin:
|
||||||
|
BEGIN;
|
||||||
|
|
||||||
|
step s1-update-1:
|
||||||
|
UPDATE deadlock_detection_test SET some_val = 1 WHERE user_id = 1;
|
||||||
|
|
||||||
|
step s2-begin-on-worker:
|
||||||
|
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
|
||||||
|
|
||||||
|
run_commands_on_session_level_connection_to_node
|
||||||
|
|
||||||
|
|
||||||
|
step s2-update-2-on-worker:
|
||||||
|
SELECT run_commands_on_session_level_connection_to_node('UPDATE deadlock_detection_test SET some_val = 2 WHERE user_id = 2');
|
||||||
|
|
||||||
|
run_commands_on_session_level_connection_to_node
|
||||||
|
|
||||||
|
|
||||||
|
step s2-truncate-on-worker:
|
||||||
|
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE t2');
|
||||||
|
|
||||||
|
run_commands_on_session_level_connection_to_node
|
||||||
|
|
||||||
|
|
||||||
|
step s3-invalidate-metadata:
|
||||||
|
update pg_dist_node SET metadatasynced = false;
|
||||||
|
|
||||||
|
step s3-resync:
|
||||||
|
SELECT trigger_metadata_sync();
|
||||||
|
SELECT pg_sleep(2);
|
||||||
|
|
||||||
|
trigger_metadata_sync
|
||||||
|
|
||||||
|
|
||||||
|
pg_sleep
|
||||||
|
|
||||||
|
|
||||||
|
step s2-update-1-on-worker:
|
||||||
|
SELECT run_commands_on_session_level_connection_to_node('UPDATE deadlock_detection_test SET some_val = 2 WHERE user_id = 1');
|
||||||
|
<waiting ...>
|
||||||
|
step s1-update-2:
|
||||||
|
UPDATE deadlock_detection_test SET some_val = 1 WHERE user_id = 2;
|
||||||
|
<waiting ...>
|
||||||
|
step s1-update-2: <... completed>
|
||||||
|
step s2-update-1-on-worker: <... completed>
|
||||||
|
run_commands_on_session_level_connection_to_node
|
||||||
|
|
||||||
|
|
||||||
|
error in steps s1-update-2 s2-update-1-on-worker: ERROR: canceling the transaction since it was involved in a distributed deadlock
|
||||||
|
step s1-commit:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
step s2-commit-on-worker:
|
||||||
|
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
|
||||||
|
|
||||||
|
run_commands_on_session_level_connection_to_node
|
||||||
|
|
||||||
|
|
||||||
|
step disable-deadlock-detection:
|
||||||
|
ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1;
|
||||||
|
|
||||||
|
step reload-conf:
|
||||||
|
SELECT pg_reload_conf();
|
||||||
|
|
||||||
|
pg_reload_conf
|
||||||
|
|
||||||
|
t
|
||||||
|
step s2-stop-connection:
|
||||||
|
SELECT stop_session_level_connection_to_node();
|
||||||
|
|
||||||
|
stop_session_level_connection_to_node
|
||||||
|
|
||||||
|
|
||||||
|
restore_isolation_tester_func
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
starting permutation: increase-retry-interval reload-conf s2-start-session-level-connection s2-begin-on-worker s2-truncate-on-worker s3-invalidate-metadata s3-resync s1-count-daemons s1-cancel-metadata-sync s1-count-daemons reset-retry-interval reload-conf s2-commit-on-worker s2-stop-connection s3-resync
|
||||||
|
create_distributed_table
|
||||||
|
|
||||||
|
|
||||||
|
step increase-retry-interval:
|
||||||
|
ALTER SYSTEM SET citus.metadata_sync_retry_interval TO 20000;
|
||||||
|
|
||||||
|
step reload-conf:
|
||||||
|
SELECT pg_reload_conf();
|
||||||
|
|
||||||
|
pg_reload_conf
|
||||||
|
|
||||||
|
t
|
||||||
|
step s2-start-session-level-connection:
|
||||||
|
SELECT start_session_level_connection_to_node('localhost', 57638);
|
||||||
|
|
||||||
|
start_session_level_connection_to_node
|
||||||
|
|
||||||
|
|
||||||
|
step s2-begin-on-worker:
|
||||||
|
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
|
||||||
|
|
||||||
|
run_commands_on_session_level_connection_to_node
|
||||||
|
|
||||||
|
|
||||||
|
step s2-truncate-on-worker:
|
||||||
|
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE t2');
|
||||||
|
|
||||||
|
run_commands_on_session_level_connection_to_node
|
||||||
|
|
||||||
|
|
||||||
|
step s3-invalidate-metadata:
|
||||||
|
update pg_dist_node SET metadatasynced = false;
|
||||||
|
|
||||||
|
step s3-resync:
|
||||||
|
SELECT trigger_metadata_sync();
|
||||||
|
SELECT pg_sleep(2);
|
||||||
|
|
||||||
|
trigger_metadata_sync
|
||||||
|
|
||||||
|
|
||||||
|
pg_sleep
|
||||||
|
|
||||||
|
|
||||||
|
step s1-count-daemons:
|
||||||
|
SELECT count(*) FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%';
|
||||||
|
|
||||||
|
count
|
||||||
|
|
||||||
|
1
|
||||||
|
step s1-cancel-metadata-sync:
|
||||||
|
SELECT pg_cancel_backend(pid) FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%';
|
||||||
|
SELECT pg_sleep(2);
|
||||||
|
|
||||||
|
pg_cancel_backend
|
||||||
|
|
||||||
|
t
|
||||||
|
pg_sleep
|
||||||
|
|
||||||
|
|
||||||
|
step s1-count-daemons:
|
||||||
|
SELECT count(*) FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%';
|
||||||
|
|
||||||
|
count
|
||||||
|
|
||||||
|
0
|
||||||
|
step reset-retry-interval:
|
||||||
|
ALTER SYSTEM RESET citus.metadata_sync_retry_interval;
|
||||||
|
|
||||||
|
step reload-conf:
|
||||||
|
SELECT pg_reload_conf();
|
||||||
|
|
||||||
|
pg_reload_conf
|
||||||
|
|
||||||
|
t
|
||||||
|
step s2-commit-on-worker:
|
||||||
|
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
|
||||||
|
|
||||||
|
run_commands_on_session_level_connection_to_node
|
||||||
|
|
||||||
|
|
||||||
|
step s2-stop-connection:
|
||||||
|
SELECT stop_session_level_connection_to_node();
|
||||||
|
|
||||||
|
stop_session_level_connection_to_node
|
||||||
|
|
||||||
|
|
||||||
|
step s3-resync:
|
||||||
|
SELECT trigger_metadata_sync();
|
||||||
|
SELECT pg_sleep(2);
|
||||||
|
|
||||||
|
trigger_metadata_sync
|
||||||
|
|
||||||
|
|
||||||
|
pg_sleep
|
||||||
|
|
||||||
|
|
||||||
|
restore_isolation_tester_func
|
||||||
|
|
||||||
|
|
|
@ -21,6 +21,27 @@ CREATE FUNCTION mark_node_readonly(hostname TEXT, port INTEGER, isreadonly BOOLE
|
||||||
master_run_on_worker(ARRAY[hostname], ARRAY[port],
|
master_run_on_worker(ARRAY[hostname], ARRAY[port],
|
||||||
ARRAY['SELECT pg_reload_conf()'], false);
|
ARRAY['SELECT pg_reload_conf()'], false);
|
||||||
$$;
|
$$;
|
||||||
|
CREATE OR REPLACE FUNCTION trigger_metadata_sync()
|
||||||
|
RETURNS void
|
||||||
|
LANGUAGE C STRICT
|
||||||
|
AS 'citus';
|
||||||
|
CREATE OR REPLACE FUNCTION raise_error_in_metadata_sync()
|
||||||
|
RETURNS void
|
||||||
|
LANGUAGE C STRICT
|
||||||
|
AS 'citus';
|
||||||
|
CREATE PROCEDURE wait_until_process_count(appname text, target_count int) AS $$
|
||||||
|
declare
|
||||||
|
counter integer := -1;
|
||||||
|
begin
|
||||||
|
while counter != target_count loop
|
||||||
|
-- pg_stat_activity is cached at xact level and there is no easy way to clear it.
|
||||||
|
-- Look it up in a new connection to get latest updates.
|
||||||
|
SELECT result::int into counter FROM
|
||||||
|
master_run_on_worker(ARRAY['localhost'], ARRAY[57636], ARRAY[
|
||||||
|
'SELECT count(*) FROM pg_stat_activity WHERE application_name = ' || quote_literal(appname) || ';'], false);
|
||||||
|
PERFORM pg_sleep(0.1);
|
||||||
|
end loop;
|
||||||
|
end$$ LANGUAGE plpgsql;
|
||||||
-- add a node to the cluster
|
-- add a node to the cluster
|
||||||
SELECT master_add_node('localhost', :worker_1_port) As nodeid_1 \gset
|
SELECT master_add_node('localhost', :worker_1_port) As nodeid_1 \gset
|
||||||
SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;
|
SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;
|
||||||
|
@ -152,6 +173,142 @@ SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node;
|
||||||
2 | t | f
|
2 | t | f
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
-- verify that metadata sync daemon has started
|
||||||
|
SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Metadata Sync Daemon';
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
--
|
||||||
|
-- terminate maintenance daemon, and verify that we don't spawn multiple
|
||||||
|
-- metadata sync daemons
|
||||||
|
--
|
||||||
|
SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE application_name = 'Citus Maintenance Daemon';
|
||||||
|
pg_terminate_backend
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CALL wait_until_process_count('Citus Maintenance Daemon', 1);
|
||||||
|
select trigger_metadata_sync();
|
||||||
|
trigger_metadata_sync
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
select wait_until_metadata_sync();
|
||||||
|
wait_until_metadata_sync
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Metadata Sync Daemon';
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
--
|
||||||
|
-- cancel metadata sync daemon, and verify that it exits and restarts.
|
||||||
|
--
|
||||||
|
select pid as pid_before_cancel from pg_stat_activity where application_name like 'Citus Met%' \gset
|
||||||
|
select pg_cancel_backend(pid) from pg_stat_activity where application_name = 'Citus Metadata Sync Daemon';
|
||||||
|
pg_cancel_backend
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
select wait_until_metadata_sync();
|
||||||
|
wait_until_metadata_sync
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
select pid as pid_after_cancel from pg_stat_activity where application_name like 'Citus Met%' \gset
|
||||||
|
select :pid_before_cancel != :pid_after_cancel AS metadata_sync_restarted;
|
||||||
|
metadata_sync_restarted
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
--
|
||||||
|
-- cancel metadata sync daemon so it exits and restarts, but at the
|
||||||
|
-- same time tell maintenanced to trigger a new metadata sync. One
|
||||||
|
-- of these should exit to avoid multiple metadata syncs.
|
||||||
|
--
|
||||||
|
select pg_cancel_backend(pid) from pg_stat_activity where application_name = 'Citus Metadata Sync Daemon';
|
||||||
|
pg_cancel_backend
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
select trigger_metadata_sync();
|
||||||
|
trigger_metadata_sync
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
select wait_until_metadata_sync();
|
||||||
|
wait_until_metadata_sync
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- we assume citus.metadata_sync_retry_interval is 500ms. Change amount we sleep to ceiling + 0.2 if it changes.
|
||||||
|
select pg_sleep(1.2);
|
||||||
|
pg_sleep
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Metadata Sync Daemon';
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
--
|
||||||
|
-- error in metadata sync daemon, and verify it exits and restarts.
|
||||||
|
--
|
||||||
|
select pid as pid_before_error from pg_stat_activity where application_name like 'Citus Met%' \gset
|
||||||
|
select raise_error_in_metadata_sync();
|
||||||
|
raise_error_in_metadata_sync
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
select wait_until_metadata_sync(30000);
|
||||||
|
wait_until_metadata_sync
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
select pid as pid_after_error from pg_stat_activity where application_name like 'Citus Met%' \gset
|
||||||
|
select :pid_before_error != :pid_after_error AS metadata_sync_restarted;
|
||||||
|
metadata_sync_restarted
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT trigger_metadata_sync();
|
||||||
|
trigger_metadata_sync
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT wait_until_metadata_sync(30000);
|
||||||
|
wait_until_metadata_sync
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Metadata Sync Daemon';
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
-- update it back to :worker_1_port, now metadata should be synced
|
-- update it back to :worker_1_port, now metadata should be synced
|
||||||
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', :worker_1_port);
|
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', :worker_1_port);
|
||||||
?column?
|
?column?
|
||||||
|
@ -594,6 +751,59 @@ SELECT verify_metadata('localhost', :worker_1_port);
|
||||||
t
|
t
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
-- verify that metadata sync daemon exits
|
||||||
|
call wait_until_process_count('Citus Metadata Sync Daemon', 0);
|
||||||
|
-- verify that DROP DATABASE terminates metadata sync
|
||||||
|
SELECT current_database() datname \gset
|
||||||
|
CREATE DATABASE db_to_drop;
|
||||||
|
NOTICE: Citus partially supports CREATE DATABASE for distributed databases
|
||||||
|
SELECT run_command_on_workers('CREATE DATABASE db_to_drop');
|
||||||
|
run_command_on_workers
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(localhost,57637,t,"CREATE DATABASE")
|
||||||
|
(localhost,57638,t,"CREATE DATABASE")
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
\c db_to_drop - - :worker_1_port
|
||||||
|
CREATE EXTENSION citus;
|
||||||
|
\c db_to_drop - - :master_port
|
||||||
|
CREATE EXTENSION citus;
|
||||||
|
SELECT master_add_node('localhost', :worker_1_port);
|
||||||
|
master_add_node
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
UPDATE pg_dist_node SET hasmetadata = true;
|
||||||
|
SELECT master_update_node(nodeid, 'localhost', 12345) FROM pg_dist_node;
|
||||||
|
master_update_node
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION trigger_metadata_sync()
|
||||||
|
RETURNS void
|
||||||
|
LANGUAGE C STRICT
|
||||||
|
AS 'citus';
|
||||||
|
SELECT trigger_metadata_sync();
|
||||||
|
trigger_metadata_sync
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
\c :datname - - :master_port
|
||||||
|
SELECT datname FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%';
|
||||||
|
datname
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
db_to_drop
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP DATABASE db_to_drop;
|
||||||
|
SELECT datname FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%';
|
||||||
|
datname
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
-- cleanup
|
-- cleanup
|
||||||
DROP TABLE ref_table;
|
DROP TABLE ref_table;
|
||||||
TRUNCATE pg_dist_colocation;
|
TRUNCATE pg_dist_colocation;
|
||||||
|
|
|
@ -26,7 +26,7 @@ WITH dist_node_summary AS (
|
||||||
ARRAY[dist_node_summary.query, dist_node_summary.query],
|
ARRAY[dist_node_summary.query, dist_node_summary.query],
|
||||||
false)
|
false)
|
||||||
), dist_placement_summary AS (
|
), dist_placement_summary AS (
|
||||||
SELECT 'SELECT jsonb_agg(pg_dist_placement ORDER BY shardid) FROM pg_dist_placement)' AS query
|
SELECT 'SELECT jsonb_agg(pg_dist_placement ORDER BY shardid) FROM pg_dist_placement' AS query
|
||||||
), dist_placement_check AS (
|
), dist_placement_check AS (
|
||||||
SELECT count(distinct result) = 1 AS matches
|
SELECT count(distinct result) = 1 AS matches
|
||||||
FROM dist_placement_summary CROSS JOIN LATERAL
|
FROM dist_placement_summary CROSS JOIN LATERAL
|
||||||
|
|
|
@ -80,3 +80,4 @@ test: isolation_insert_select_vs_all_on_mx
|
||||||
test: isolation_ref_select_for_update_vs_all_on_mx
|
test: isolation_ref_select_for_update_vs_all_on_mx
|
||||||
test: isolation_ref_update_delete_upsert_vs_all_on_mx
|
test: isolation_ref_update_delete_upsert_vs_all_on_mx
|
||||||
test: isolation_dis2ref_foreign_keys_on_mx
|
test: isolation_dis2ref_foreign_keys_on_mx
|
||||||
|
test: isolation_metadata_sync_deadlock
|
||||||
|
|
|
@ -0,0 +1,149 @@
|
||||||
|
#include "isolation_mx_common.include.spec"
|
||||||
|
|
||||||
|
setup
|
||||||
|
{
|
||||||
|
CREATE OR REPLACE FUNCTION trigger_metadata_sync()
|
||||||
|
RETURNS void
|
||||||
|
LANGUAGE C STRICT
|
||||||
|
AS 'citus';
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000)
|
||||||
|
RETURNS void
|
||||||
|
LANGUAGE C STRICT
|
||||||
|
AS 'citus';
|
||||||
|
|
||||||
|
CREATE TABLE deadlock_detection_test (user_id int UNIQUE, some_val int);
|
||||||
|
INSERT INTO deadlock_detection_test SELECT i, i FROM generate_series(1,7) i;
|
||||||
|
SELECT create_distributed_table('deadlock_detection_test', 'user_id');
|
||||||
|
|
||||||
|
CREATE TABLE t2(a int);
|
||||||
|
SELECT create_distributed_table('t2', 'a');
|
||||||
|
}
|
||||||
|
|
||||||
|
teardown
|
||||||
|
{
|
||||||
|
DROP FUNCTION trigger_metadata_sync();
|
||||||
|
DROP TABLE deadlock_detection_test;
|
||||||
|
DROP TABLE t2;
|
||||||
|
SET citus.shard_replication_factor = 1;
|
||||||
|
SELECT citus_internal.restore_isolation_tester_func();
|
||||||
|
}
|
||||||
|
|
||||||
|
session "s1"
|
||||||
|
|
||||||
|
step "increase-retry-interval"
|
||||||
|
{
|
||||||
|
ALTER SYSTEM SET citus.metadata_sync_retry_interval TO 20000;
|
||||||
|
}
|
||||||
|
|
||||||
|
step "reset-retry-interval"
|
||||||
|
{
|
||||||
|
ALTER SYSTEM RESET citus.metadata_sync_retry_interval;
|
||||||
|
}
|
||||||
|
|
||||||
|
step "enable-deadlock-detection"
|
||||||
|
{
|
||||||
|
ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO 1.1;
|
||||||
|
}
|
||||||
|
|
||||||
|
step "disable-deadlock-detection"
|
||||||
|
{
|
||||||
|
ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
step "reload-conf"
|
||||||
|
{
|
||||||
|
SELECT pg_reload_conf();
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s1-begin"
|
||||||
|
{
|
||||||
|
BEGIN;
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s1-update-1"
|
||||||
|
{
|
||||||
|
UPDATE deadlock_detection_test SET some_val = 1 WHERE user_id = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s1-update-2"
|
||||||
|
{
|
||||||
|
UPDATE deadlock_detection_test SET some_val = 1 WHERE user_id = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s1-commit"
|
||||||
|
{
|
||||||
|
COMMIT;
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s1-count-daemons"
|
||||||
|
{
|
||||||
|
SELECT count(*) FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%';
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s1-cancel-metadata-sync"
|
||||||
|
{
|
||||||
|
SELECT pg_cancel_backend(pid) FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%';
|
||||||
|
SELECT pg_sleep(2);
|
||||||
|
}
|
||||||
|
|
||||||
|
session "s2"
|
||||||
|
|
||||||
|
step "s2-start-session-level-connection"
|
||||||
|
{
|
||||||
|
SELECT start_session_level_connection_to_node('localhost', 57638);
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s2-stop-connection"
|
||||||
|
{
|
||||||
|
SELECT stop_session_level_connection_to_node();
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s2-begin-on-worker"
|
||||||
|
{
|
||||||
|
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s2-update-1-on-worker"
|
||||||
|
{
|
||||||
|
SELECT run_commands_on_session_level_connection_to_node('UPDATE deadlock_detection_test SET some_val = 2 WHERE user_id = 1');
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s2-update-2-on-worker"
|
||||||
|
{
|
||||||
|
SELECT run_commands_on_session_level_connection_to_node('UPDATE deadlock_detection_test SET some_val = 2 WHERE user_id = 2');
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s2-truncate-on-worker"
|
||||||
|
{
|
||||||
|
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE t2');
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s2-commit-on-worker"
|
||||||
|
{
|
||||||
|
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
|
||||||
|
}
|
||||||
|
|
||||||
|
session "s3"
|
||||||
|
|
||||||
|
step "s3-invalidate-metadata"
|
||||||
|
{
|
||||||
|
update pg_dist_node SET metadatasynced = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s3-resync"
|
||||||
|
{
|
||||||
|
SELECT trigger_metadata_sync();
|
||||||
|
SELECT pg_sleep(2);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Backends can block metadata sync. The following test verifies that if this happens,
|
||||||
|
// we still do distributed deadlock detection. In the following, s2-truncate-on-worker
|
||||||
|
// causes the concurrent metadata sync to be blocked. But s2 and s1 themselves are
|
||||||
|
// themselves involved in a distributed deadlock.
|
||||||
|
// See https://github.com/citusdata/citus/issues/4393 for more details.
|
||||||
|
permutation "enable-deadlock-detection" "reload-conf" "s2-start-session-level-connection" "s1-begin" "s1-update-1" "s2-begin-on-worker" "s2-update-2-on-worker" "s2-truncate-on-worker" "s3-invalidate-metadata" "s3-resync" "s2-update-1-on-worker" "s1-update-2" "s1-commit" "s2-commit-on-worker" "disable-deadlock-detection" "reload-conf" "s2-stop-connection"
|
||||||
|
|
||||||
|
// Test that when metadata sync is waiting for locks, cancelling it terminates it.
|
||||||
|
// This is important in cases where the metadata sync daemon itself is involved in a deadlock.
|
||||||
|
permutation "increase-retry-interval" "reload-conf" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-truncate-on-worker" "s3-invalidate-metadata" "s3-resync" "s1-count-daemons" "s1-cancel-metadata-sync" "s1-count-daemons" "reset-retry-interval" "reload-conf" "s2-commit-on-worker" "s2-stop-connection" "s3-resync"
|
|
@ -27,6 +27,30 @@ CREATE FUNCTION mark_node_readonly(hostname TEXT, port INTEGER, isreadonly BOOLE
|
||||||
ARRAY['SELECT pg_reload_conf()'], false);
|
ARRAY['SELECT pg_reload_conf()'], false);
|
||||||
$$;
|
$$;
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION trigger_metadata_sync()
|
||||||
|
RETURNS void
|
||||||
|
LANGUAGE C STRICT
|
||||||
|
AS 'citus';
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION raise_error_in_metadata_sync()
|
||||||
|
RETURNS void
|
||||||
|
LANGUAGE C STRICT
|
||||||
|
AS 'citus';
|
||||||
|
|
||||||
|
CREATE PROCEDURE wait_until_process_count(appname text, target_count int) AS $$
|
||||||
|
declare
|
||||||
|
counter integer := -1;
|
||||||
|
begin
|
||||||
|
while counter != target_count loop
|
||||||
|
-- pg_stat_activity is cached at xact level and there is no easy way to clear it.
|
||||||
|
-- Look it up in a new connection to get latest updates.
|
||||||
|
SELECT result::int into counter FROM
|
||||||
|
master_run_on_worker(ARRAY['localhost'], ARRAY[57636], ARRAY[
|
||||||
|
'SELECT count(*) FROM pg_stat_activity WHERE application_name = ' || quote_literal(appname) || ';'], false);
|
||||||
|
PERFORM pg_sleep(0.1);
|
||||||
|
end loop;
|
||||||
|
end$$ LANGUAGE plpgsql;
|
||||||
|
|
||||||
-- add a node to the cluster
|
-- add a node to the cluster
|
||||||
SELECT master_add_node('localhost', :worker_1_port) As nodeid_1 \gset
|
SELECT master_add_node('localhost', :worker_1_port) As nodeid_1 \gset
|
||||||
SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;
|
SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;
|
||||||
|
@ -79,6 +103,54 @@ END;
|
||||||
SELECT wait_until_metadata_sync(30000);
|
SELECT wait_until_metadata_sync(30000);
|
||||||
SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node;
|
SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node;
|
||||||
|
|
||||||
|
-- verify that metadata sync daemon has started
|
||||||
|
SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Metadata Sync Daemon';
|
||||||
|
|
||||||
|
--
|
||||||
|
-- terminate maintenance daemon, and verify that we don't spawn multiple
|
||||||
|
-- metadata sync daemons
|
||||||
|
--
|
||||||
|
SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE application_name = 'Citus Maintenance Daemon';
|
||||||
|
CALL wait_until_process_count('Citus Maintenance Daemon', 1);
|
||||||
|
select trigger_metadata_sync();
|
||||||
|
select wait_until_metadata_sync();
|
||||||
|
SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Metadata Sync Daemon';
|
||||||
|
|
||||||
|
--
|
||||||
|
-- cancel metadata sync daemon, and verify that it exits and restarts.
|
||||||
|
--
|
||||||
|
select pid as pid_before_cancel from pg_stat_activity where application_name like 'Citus Met%' \gset
|
||||||
|
select pg_cancel_backend(pid) from pg_stat_activity where application_name = 'Citus Metadata Sync Daemon';
|
||||||
|
select wait_until_metadata_sync();
|
||||||
|
select pid as pid_after_cancel from pg_stat_activity where application_name like 'Citus Met%' \gset
|
||||||
|
select :pid_before_cancel != :pid_after_cancel AS metadata_sync_restarted;
|
||||||
|
|
||||||
|
--
|
||||||
|
-- cancel metadata sync daemon so it exits and restarts, but at the
|
||||||
|
-- same time tell maintenanced to trigger a new metadata sync. One
|
||||||
|
-- of these should exit to avoid multiple metadata syncs.
|
||||||
|
--
|
||||||
|
select pg_cancel_backend(pid) from pg_stat_activity where application_name = 'Citus Metadata Sync Daemon';
|
||||||
|
select trigger_metadata_sync();
|
||||||
|
select wait_until_metadata_sync();
|
||||||
|
-- we assume citus.metadata_sync_retry_interval is 500ms. Change amount we sleep to ceiling + 0.2 if it changes.
|
||||||
|
select pg_sleep(1.2);
|
||||||
|
SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Metadata Sync Daemon';
|
||||||
|
|
||||||
|
--
|
||||||
|
-- error in metadata sync daemon, and verify it exits and restarts.
|
||||||
|
--
|
||||||
|
select pid as pid_before_error from pg_stat_activity where application_name like 'Citus Met%' \gset
|
||||||
|
select raise_error_in_metadata_sync();
|
||||||
|
select wait_until_metadata_sync(30000);
|
||||||
|
select pid as pid_after_error from pg_stat_activity where application_name like 'Citus Met%' \gset
|
||||||
|
select :pid_before_error != :pid_after_error AS metadata_sync_restarted;
|
||||||
|
|
||||||
|
|
||||||
|
SELECT trigger_metadata_sync();
|
||||||
|
SELECT wait_until_metadata_sync(30000);
|
||||||
|
SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Metadata Sync Daemon';
|
||||||
|
|
||||||
-- update it back to :worker_1_port, now metadata should be synced
|
-- update it back to :worker_1_port, now metadata should be synced
|
||||||
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', :worker_1_port);
|
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', :worker_1_port);
|
||||||
SELECT wait_until_metadata_sync(30000);
|
SELECT wait_until_metadata_sync(30000);
|
||||||
|
@ -249,6 +321,39 @@ SELECT 1 FROM master_activate_node('localhost', :worker_2_port);
|
||||||
|
|
||||||
SELECT verify_metadata('localhost', :worker_1_port);
|
SELECT verify_metadata('localhost', :worker_1_port);
|
||||||
|
|
||||||
|
-- verify that metadata sync daemon exits
|
||||||
|
call wait_until_process_count('Citus Metadata Sync Daemon', 0);
|
||||||
|
|
||||||
|
-- verify that DROP DATABASE terminates metadata sync
|
||||||
|
SELECT current_database() datname \gset
|
||||||
|
CREATE DATABASE db_to_drop;
|
||||||
|
SELECT run_command_on_workers('CREATE DATABASE db_to_drop');
|
||||||
|
|
||||||
|
\c db_to_drop - - :worker_1_port
|
||||||
|
CREATE EXTENSION citus;
|
||||||
|
|
||||||
|
\c db_to_drop - - :master_port
|
||||||
|
CREATE EXTENSION citus;
|
||||||
|
SELECT master_add_node('localhost', :worker_1_port);
|
||||||
|
UPDATE pg_dist_node SET hasmetadata = true;
|
||||||
|
|
||||||
|
SELECT master_update_node(nodeid, 'localhost', 12345) FROM pg_dist_node;
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION trigger_metadata_sync()
|
||||||
|
RETURNS void
|
||||||
|
LANGUAGE C STRICT
|
||||||
|
AS 'citus';
|
||||||
|
|
||||||
|
SELECT trigger_metadata_sync();
|
||||||
|
|
||||||
|
\c :datname - - :master_port
|
||||||
|
|
||||||
|
SELECT datname FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%';
|
||||||
|
|
||||||
|
DROP DATABASE db_to_drop;
|
||||||
|
|
||||||
|
SELECT datname FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%';
|
||||||
|
|
||||||
-- cleanup
|
-- cleanup
|
||||||
DROP TABLE ref_table;
|
DROP TABLE ref_table;
|
||||||
TRUNCATE pg_dist_colocation;
|
TRUNCATE pg_dist_colocation;
|
||||||
|
|
|
@ -23,7 +23,7 @@ WITH dist_node_summary AS (
|
||||||
ARRAY[dist_node_summary.query, dist_node_summary.query],
|
ARRAY[dist_node_summary.query, dist_node_summary.query],
|
||||||
false)
|
false)
|
||||||
), dist_placement_summary AS (
|
), dist_placement_summary AS (
|
||||||
SELECT 'SELECT jsonb_agg(pg_dist_placement ORDER BY shardid) FROM pg_dist_placement)' AS query
|
SELECT 'SELECT jsonb_agg(pg_dist_placement ORDER BY shardid) FROM pg_dist_placement' AS query
|
||||||
), dist_placement_check AS (
|
), dist_placement_check AS (
|
||||||
SELECT count(distinct result) = 1 AS matches
|
SELECT count(distinct result) = 1 AS matches
|
||||||
FROM dist_placement_summary CROSS JOIN LATERAL
|
FROM dist_placement_summary CROSS JOIN LATERAL
|
||||||
|
|
Loading…
Reference in New Issue