Merge pull request #4419 from citusdata/metadata_sync

Do metadata sync in a separate background worker.
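This moves metadata sync out of the maintenance daemon's main loop: the daemon now only checks whether a sync is needed (ShouldInitiateMetadataSync) and, if so, spawns a dedicated dynamic background worker that retries until every node is synced. A blocked sync therefore no longer stalls the daemon's other duties, such as distributed deadlock detection (see https://github.com/citusdata/citus/issues/4393, referenced in the new isolation spec below). On startup the daemon SIGTERMs orphaned sync workers left behind by crashed maintenanced instances, and a SIGALRM hook lets tests inject failures.

The new worker announces itself in pg_stat_activity; as a quick sketch, using the application name set in this diff, it can be observed with:

SELECT pid, datname, application_name
FROM pg_stat_activity
WHERE application_name = 'Citus Metadata Sync Daemon';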
Hadi Moshayedi 2020-12-24 09:15:16 -08:00 committed by GitHub
commit 52164450eb
17 changed files with 1071 additions and 38 deletions

View File

@@ -1138,12 +1138,16 @@ TriggerSyncMetadataToPrimaryNodes(void)
triggerMetadataSync = true;
}
else if (!workerNode->metadataSynced)
{
triggerMetadataSync = true;
}
}
/* let the maintenance daemon know about the metadata sync */
if (triggerMetadataSync)
{
-TriggerMetadataSync(MyDatabaseId);
+TriggerMetadataSyncOnCommit();
}
}

View File

@@ -14,6 +14,7 @@
#include "postgres.h"
#include "miscadmin.h"
#include <signal.h>
#include <sys/stat.h>
#include <unistd.h>
@@ -28,6 +29,7 @@
#include "catalog/pg_foreign_server.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_type.h"
#include "commands/async.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/commands.h"
#include "distributed/deparser.h"
@@ -35,6 +37,7 @@
#include "distributed/listutils.h"
#include "distributed/metadata_utility.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/maintenanced.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/metadata/distobject.h"
@@ -48,11 +51,15 @@
#include "foreign/foreign.h"
#include "miscadmin.h"
#include "nodes/pg_list.h"
#include "pgstat.h"
#include "postmaster/bgworker.h"
#include "postmaster/postmaster.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
@@ -76,11 +83,18 @@ static GrantStmt * GenerateGrantOnSchemaStmtForRights(Oid roleOid,
char *permission,
bool withGrantOption);
static char * GenerateSetRoleQuery(Oid roleOid);
static void MetadataSyncSigTermHandler(SIGNAL_ARGS);
static void MetadataSyncSigAlrmHandler(SIGNAL_ARGS);
PG_FUNCTION_INFO_V1(start_metadata_sync_to_node);
PG_FUNCTION_INFO_V1(stop_metadata_sync_to_node);
PG_FUNCTION_INFO_V1(worker_record_sequence_dependency);
static bool got_SIGTERM = false;
static bool got_SIGALRM = false;
#define METADATA_SYNC_APP_NAME "Citus Metadata Sync Daemon"
/*
* start_metadata_sync_to_node function sets hasmetadata column of the given
@@ -1497,7 +1511,7 @@ DetachPartitionCommandList(void)
* metadata workers that are out of sync. Returns the result of
* synchronization.
*/
-MetadataSyncResult
+static MetadataSyncResult
SyncMetadataToNodes(void)
{
MetadataSyncResult result = METADATA_SYNC_SUCCESS;
@@ -1527,6 +1541,9 @@ SyncMetadataToNodes(void)
if (!SyncMetadataSnapshotToNode(workerNode, raiseInterrupts))
{
ereport(WARNING, (errmsg("failed to sync metadata to %s:%d",
workerNode->workerName,
workerNode->workerPort)));
result = METADATA_SYNC_FAILED_SYNC;
}
else
@@ -1539,3 +1556,244 @@ SyncMetadataToNodes(void)
return result;
}
/*
* SyncMetadataToNodesMain is the main function for syncing metadata to
* MX nodes. It retries until success and then exits.
*/
void
SyncMetadataToNodesMain(Datum main_arg)
{
Oid databaseOid = DatumGetObjectId(main_arg);
/* extension owner is passed via bgw_extra */
Oid extensionOwner = InvalidOid;
memcpy_s(&extensionOwner, sizeof(extensionOwner),
MyBgworkerEntry->bgw_extra, sizeof(Oid));
pqsignal(SIGTERM, MetadataSyncSigTermHandler);
pqsignal(SIGALRM, MetadataSyncSigAlrmHandler);
BackgroundWorkerUnblockSignals();
/* connect to database, after that we can actually access catalogs */
BackgroundWorkerInitializeConnectionByOid(databaseOid, extensionOwner, 0);
/* make worker recognizable in pg_stat_activity */
pgstat_report_appname(METADATA_SYNC_APP_NAME);
bool syncedAllNodes = false;
while (!syncedAllNodes)
{
InvalidateMetadataSystemCache();
StartTransactionCommand();
/*
* Some functions in ruleutils.c, which we use to get the DDL for
* metadata propagation, require an active snapshot.
*/
PushActiveSnapshot(GetTransactionSnapshot());
if (!LockCitusExtension())
{
ereport(DEBUG1, (errmsg("could not lock the citus extension, "
"skipping metadata sync")));
}
else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded())
{
UseCoordinatedTransaction();
MetadataSyncResult result = SyncMetadataToNodes();
syncedAllNodes = (result == METADATA_SYNC_SUCCESS);
/* we use LISTEN/NOTIFY to wait for metadata syncing in tests */
if (result != METADATA_SYNC_FAILED_LOCK)
{
Async_Notify(METADATA_SYNC_CHANNEL, NULL);
}
}
PopActiveSnapshot();
CommitTransactionCommand();
ProcessCompletedNotifies();
if (syncedAllNodes)
{
break;
}
/*
* If the backend is cancelled (e.g. because of a distributed deadlock),
* CHECK_FOR_INTERRUPTS() will raise a cancellation error which will
* result in exit(1).
*/
CHECK_FOR_INTERRUPTS();
/*
* SIGTERM is used by the maintenance daemon to clean up metadata sync
* daemons spawned by previously terminated maintenance daemons.
*/
if (got_SIGTERM)
{
exit(0);
}
/*
* SIGALRM is used for testing purposes; it simulates an error in the
* metadata sync daemon.
*/
if (got_SIGALRM)
{
elog(ERROR, "Error in metadata sync daemon");
}
pg_usleep(MetadataSyncRetryInterval * 1000);
}
}
/*
* MetadataSyncSigTermHandler sets a flag to request termination of the
* metadata sync daemon.
*/
static void
MetadataSyncSigTermHandler(SIGNAL_ARGS)
{
int save_errno = errno;
got_SIGTERM = true;
if (MyProc != NULL)
{
SetLatch(&MyProc->procLatch);
}
errno = save_errno;
}
/*
* MetadataSyncSigAlrmHandler sets a flag to request an error in the
* metadata sync daemon. This is used for testing purposes.
*/
static void
MetadataSyncSigAlrmHandler(SIGNAL_ARGS)
{
int save_errno = errno;
got_SIGALRM = true;
if (MyProc != NULL)
{
SetLatch(&MyProc->procLatch);
}
errno = save_errno;
}
/*
* SpawnSyncMetadataToNodes starts a background worker that runs metadata
* sync. On success it returns the worker's handle; otherwise it returns NULL.
*/
BackgroundWorkerHandle *
SpawnSyncMetadataToNodes(Oid database, Oid extensionOwner)
{
BackgroundWorker worker;
BackgroundWorkerHandle *handle = NULL;
/* Configure a worker. */
memset(&worker, 0, sizeof(worker));
SafeSnprintf(worker.bgw_name, BGW_MAXLEN,
"Citus Metadata Sync: %u/%u",
database, extensionOwner);
worker.bgw_flags =
BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
worker.bgw_start_time = BgWorkerStart_ConsistentState;
/* don't restart, we manage restarts from maintenance daemon */
worker.bgw_restart_time = BGW_NEVER_RESTART;
strcpy_s(worker.bgw_library_name, sizeof(worker.bgw_library_name), "citus");
strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_function_name),
"SyncMetadataToNodesMain");
worker.bgw_main_arg = ObjectIdGetDatum(database);
memcpy_s(worker.bgw_extra, sizeof(worker.bgw_extra), &extensionOwner,
sizeof(Oid));
worker.bgw_notify_pid = MyProcPid;
if (!RegisterDynamicBackgroundWorker(&worker, &handle))
{
return NULL;
}
pid_t pid;
WaitForBackgroundWorkerStartup(handle, &pid);
return handle;
}
/*
* SignalMetadataSyncDaemon signals metadata sync daemons belonging to
* the given database.
*/
void
SignalMetadataSyncDaemon(Oid database, int sig)
{
int backendCount = pgstat_fetch_stat_numbackends();
for (int backend = 1; backend <= backendCount; backend++)
{
LocalPgBackendStatus *localBeEntry = pgstat_fetch_stat_local_beentry(backend);
if (!localBeEntry)
{
continue;
}
PgBackendStatus *beStatus = &localBeEntry->backendStatus;
if (beStatus->st_databaseid == database &&
strncmp(beStatus->st_appname, METADATA_SYNC_APP_NAME, BGW_MAXLEN) == 0)
{
kill(beStatus->st_procpid, sig);
}
}
}
/*
* ShouldInitiateMetadataSync returns whether a metadata sync daemon should be
* initiated. It sets *lockFailure to true if the pg_dist_node lock couldn't
* be acquired for the check.
*/
bool
ShouldInitiateMetadataSync(bool *lockFailure)
{
if (!IsCoordinator())
{
*lockFailure = false;
return false;
}
Oid distNodeOid = DistNodeRelationId();
if (!ConditionalLockRelationOid(distNodeOid, AccessShareLock))
{
*lockFailure = true;
return false;
}
bool shouldSyncMetadata = false;
List *workerList = ActivePrimaryNonCoordinatorNodeList(NoLock);
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerList)
{
if (workerNode->hasMetadata && !workerNode->metadataSynced)
{
shouldSyncMetadata = true;
break;
}
}
UnlockRelationOid(distNodeOid, AccessShareLock);
*lockFailure = false;
return shouldSyncMetadata;
}
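Lifecycle note: the worker above registers with BGW_NEVER_RESTART, so restarts are driven entirely by the maintenance daemon, and orphans are reaped by matching the application name in pg_stat_activity. As a hedged sketch, the SQL-level equivalent of SignalMetadataSyncDaemon(MyDatabaseId, SIGTERM) is roughly:

SELECT pg_terminate_backend(pid)
FROM pg_stat_activity
WHERE application_name = 'Citus Metadata Sync Daemon';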

View File

@@ -444,7 +444,7 @@ SetUpDistributedTableDependencies(WorkerNode *newWorkerNode)
{
MarkNodeHasMetadata(newWorkerNode->workerName, newWorkerNode->workerPort,
true);
-TriggerMetadataSync(MyDatabaseId);
+TriggerMetadataSyncOnCommit();
}
}
}
@@ -810,7 +810,7 @@ master_update_node(PG_FUNCTION_ARGS)
*/
if (UnsetMetadataSyncedForAll())
{
-TriggerMetadataSync(MyDatabaseId);
+TriggerMetadataSyncOnCommit();
}
if (handle != NULL)

View File

@@ -16,6 +16,7 @@
#include "catalog/pg_type.h"
#include "distributed/connection_management.h"
#include "distributed/listutils.h"
#include "distributed/maintenanced.h"
#include "distributed/metadata_sync.h"
#include "distributed/remote_commands.h"
#include "postmaster/postmaster.h"
@@ -28,6 +29,8 @@
/* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(master_metadata_snapshot);
PG_FUNCTION_INFO_V1(wait_until_metadata_sync);
PG_FUNCTION_INFO_V1(trigger_metadata_sync);
PG_FUNCTION_INFO_V1(raise_error_in_metadata_sync);
/*
@@ -124,3 +127,26 @@ wait_until_metadata_sync(PG_FUNCTION_ARGS)
PG_RETURN_VOID();
}
/*
* trigger_metadata_sync triggers metadata sync for testing.
*/
Datum
trigger_metadata_sync(PG_FUNCTION_ARGS)
{
TriggerMetadataSyncOnCommit();
PG_RETURN_VOID();
}
/*
* raise_error_in_metadata_sync causes metadata sync to raise an error.
*/
Datum
raise_error_in_metadata_sync(PG_FUNCTION_ARGS)
{
/* metadata sync uses SIGALRM to test errors */
SignalMetadataSyncDaemon(MyDatabaseId, SIGALRM);
PG_RETURN_VOID();
}
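The regression tests further down expose these entry points to SQL via CREATE OR REPLACE FUNCTION ... LANGUAGE C STRICT AS 'citus'. A typical sequence, taken from those tests, is:

SELECT trigger_metadata_sync();
SELECT wait_until_metadata_sync(30000);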

View File

@@ -28,6 +28,7 @@
#include "distributed/listutils.h"
#include "distributed/local_executor.h"
#include "distributed/locally_reserved_shared_connections.h"
#include "distributed/maintenanced.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_explain.h"
#include "distributed/repartition_join_execution.h"
@@ -102,6 +103,9 @@ bool CoordinatedTransactionUses2PC = false;
/* if disabled, distributed statements in a function may run as separate transactions */
bool FunctionOpensTransactionBlock = true;
/* if true, we should trigger metadata sync on commit */
bool MetadataSyncOnCommit = false;
/* transaction management functions */
static void CoordinatedTransactionCallback(XactEvent event, void *arg);
@@ -262,6 +266,15 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
AfterXactConnectionHandling(true);
}
/*
* Changes to catalog tables are now visible to the metadata sync
* daemon, so we can trigger metadata sync if necessary.
*/
if (MetadataSyncOnCommit)
{
TriggerMetadataSync(MyDatabaseId);
}
ResetGlobalVariables();
/*
@@ -474,6 +487,7 @@ ResetGlobalVariables()
activeSetStmts = NULL;
CoordinatedTransactionUses2PC = false;
TransactionModifiedNodeMetadata = false;
MetadataSyncOnCommit = false;
ResetWorkerErrorIndication();
}
@@ -728,3 +742,15 @@ MaybeExecutingUDF(void)
{
return ExecutorLevel > 1 || (ExecutorLevel == 1 && PlannerLevel > 0);
}
/*
* TriggerMetadataSyncOnCommit sets a flag so that metadata sync is triggered
* at commit time, since new metadata only becomes visible to the metadata
* sync daemon after the transaction commits.
*/
void
TriggerMetadataSyncOnCommit(void)
{
MetadataSyncOnCommit = true;
}
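A minimal caller-side sketch, mirroring the node_metadata.c hunk above (identifiers as in this diff): a backend that modifies metadata inside a transaction marks the node and defers waking the daemon until commit, when the catalog changes become visible:

/* inside a transaction that changes pg_dist_node */
MarkNodeHasMetadata(nodeName, nodePort, true);
TriggerMetadataSyncOnCommit(); /* flag consumed by CoordinatedTransactionCallback at commit */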

View File

@@ -118,7 +118,6 @@ static size_t MaintenanceDaemonShmemSize(void);
static void MaintenanceDaemonShmemInit(void);
static void MaintenanceDaemonShmemExit(int code, Datum arg);
static void MaintenanceDaemonErrorContext(void *arg);
-static bool LockCitusExtension(void);
static bool MetadataSyncTriggeredCheckAndReset(MaintenanceDaemonDBData *dbData);
static void WarnMaintenanceDaemonNotStarted(void);
@@ -291,6 +290,13 @@ CitusMaintenanceDaemonMain(Datum main_arg)
TimestampTz lastRecoveryTime = 0;
TimestampTz nextMetadataSyncTime = 0;
/*
* We do metadata sync in a separate background worker. We need its
* handle to be able to check its status.
*/
BackgroundWorkerHandle *metadataSyncBgwHandle = NULL;
/*
* Look up this worker's configuration.
*/
@@ -371,6 +377,12 @@ CitusMaintenanceDaemonMain(Datum main_arg)
/* make worker recognizable in pg_stat_activity */
pgstat_report_appname("Citus Maintenance Daemon");
/*
* Terminate orphaned metadata sync daemons spawned from previously terminated
* or crashed maintenanced instances.
*/
SignalMetadataSyncDaemon(databaseOid, SIGTERM);
/* enter main loop */
while (!got_SIGTERM)
{
@@ -450,21 +462,42 @@
}
#endif
-if (!RecoveryInProgress() &&
+pid_t metadataSyncBgwPid = 0;
BgwHandleStatus metadataSyncStatus =
metadataSyncBgwHandle != NULL ?
GetBackgroundWorkerPid(metadataSyncBgwHandle, &metadataSyncBgwPid) :
BGWH_STOPPED;
if (metadataSyncStatus != BGWH_STOPPED &&
GetCurrentTimestamp() >= nextMetadataSyncTime)
{
/*
* Metadata sync is still running, recheck in a short while.
*/
int nextTimeout = MetadataSyncRetryInterval;
nextMetadataSyncTime =
TimestampTzPlusMilliseconds(GetCurrentTimestamp(), nextTimeout);
timeout = Min(timeout, nextTimeout);
}
else if (!RecoveryInProgress() &&
metadataSyncStatus == BGWH_STOPPED &&
(MetadataSyncTriggeredCheckAndReset(myDbData) ||
GetCurrentTimestamp() >= nextMetadataSyncTime))
{
-bool metadataSyncFailed = false;
+if (metadataSyncBgwHandle)
{
TerminateBackgroundWorker(metadataSyncBgwHandle);
pfree(metadataSyncBgwHandle);
metadataSyncBgwHandle = NULL;
}
InvalidateMetadataSystemCache();
StartTransactionCommand();
/*
* Some functions in ruleutils.c, which we use to get the DDL for
* metadata propagation, require an active snapshot.
*/
PushActiveSnapshot(GetTransactionSnapshot());
int nextTimeout = MetadataSyncRetryInterval;
bool syncMetadata = false;
if (!LockCitusExtension())
{
ereport(DEBUG1, (errmsg("could not lock the citus extension, "
@@ -472,25 +505,28 @@ CitusMaintenanceDaemonMain(Datum main_arg)
}
else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded())
{
-MetadataSyncResult result = SyncMetadataToNodes();
-metadataSyncFailed = (result != METADATA_SYNC_SUCCESS);
+bool lockFailure = false;
+syncMetadata = ShouldInitiateMetadataSync(&lockFailure);
/*
-* Notification means we had an attempt on synchronization
-* without being blocked for pg_dist_node access.
+* If lock fails, we need to recheck in a short while. If we are
+* going to sync metadata, we should recheck in a short while to
+* see if it failed. Otherwise, we can wait longer.
*/
-if (result != METADATA_SYNC_FAILED_LOCK)
-{
-Async_Notify(METADATA_SYNC_CHANNEL, NULL);
-}
+nextTimeout = (lockFailure || syncMetadata) ?
+MetadataSyncRetryInterval :
+MetadataSyncInterval;
}
PopActiveSnapshot();
CommitTransactionCommand();
-ProcessCompletedNotifies();
-int64 nextTimeout = metadataSyncFailed ? MetadataSyncRetryInterval :
-MetadataSyncInterval;
+if (syncMetadata)
+{
+metadataSyncBgwHandle =
+SpawnSyncMetadataToNodes(MyDatabaseId, myDbData->userOid);
+}
nextMetadataSyncTime =
TimestampTzPlusMilliseconds(GetCurrentTimestamp(), nextTimeout);
timeout = Min(timeout, nextTimeout);
@@ -626,6 +662,11 @@ CitusMaintenanceDaemonMain(Datum main_arg)
ProcessConfigFile(PGC_SIGHUP);
}
}
if (metadataSyncBgwHandle)
{
TerminateBackgroundWorker(metadataSyncBgwHandle);
}
}
@@ -786,7 +827,7 @@ MaintenanceDaemonErrorContext(void *arg)
* LockCitusExtension acquires a lock on the Citus extension or returns
* false if the extension does not exist or is being dropped.
*/
-static bool
+bool
LockCitusExtension(void)
{
Oid extensionOid = get_extension_oid("citus", true);

View File

@@ -25,6 +25,7 @@ extern void StopMaintenanceDaemon(Oid databaseId);
extern void TriggerMetadataSync(Oid databaseId);
extern void InitializeMaintenanceDaemon(void);
extern void InitializeMaintenanceDaemonBackend(void);
extern bool LockCitusExtension(void);
extern void CitusMaintenanceDaemonMain(Datum main_arg);

View File

@@ -50,11 +50,14 @@ extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, int sha
extern void CreateTableMetadataOnWorkers(Oid relationId);
extern void MarkNodeHasMetadata(const char *nodeName, int32 nodePort, bool hasMetadata);
extern void MarkNodeMetadataSynced(const char *nodeName, int32 nodePort, bool synced);
-extern MetadataSyncResult SyncMetadataToNodes(void);
+extern BackgroundWorkerHandle * SpawnSyncMetadataToNodes(Oid database, Oid owner);
extern bool SendOptionalCommandListToWorkerInTransaction(const char *nodeName, int32
nodePort,
const char *nodeUser,
List *commandList);
extern void SyncMetadataToNodesMain(Datum main_arg);
extern void SignalMetadataSyncDaemon(Oid database, int sig);
extern bool ShouldInitiateMetadataSync(bool *lockFailure);
#define DELETE_ALL_NODES "TRUNCATE pg_dist_node CASCADE"
#define REMOVE_ALL_CLUSTERED_TABLES_COMMAND \

View File

@@ -121,6 +121,7 @@ extern void InitializeTransactionManagement(void);
/* other functions */
extern List * ActiveSubXactContexts(void);
extern StringInfo BeginAndSetDistributedTransactionIdCommand(void);
extern void TriggerMetadataSyncOnCommit(void);
#endif /* TRANSACTION_MANAGMENT_H */

View File

@@ -28,11 +28,11 @@ step detector-dump-wait-edges:
waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
-390 389 f
+395 394 f
transactionnumberwaitingtransactionnumbers
-389
-390 389
+394
+395 394
step s1-abort:
ABORT;
@@ -75,14 +75,14 @@ step detector-dump-wait-edges:
waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
-394 393 f
-395 393 f
-395 394 t
+399 398 f
+400 398 f
+400 399 t
transactionnumberwaitingtransactionnumbers
-393
-394 393
-395 393,394
+398
+399 398
+400 398,399
step s1-abort:
ABORT;

View File

@@ -0,0 +1,204 @@
Parsed test spec with 3 sessions
starting permutation: enable-deadlock-detection reload-conf s2-start-session-level-connection s1-begin s1-update-1 s2-begin-on-worker s2-update-2-on-worker s2-truncate-on-worker s3-invalidate-metadata s3-resync s3-wait s2-update-1-on-worker s1-update-2 s1-commit s2-commit-on-worker disable-deadlock-detection reload-conf s2-stop-connection
create_distributed_table
step enable-deadlock-detection:
ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO 1.1;
step reload-conf:
SELECT pg_reload_conf();
pg_reload_conf
t
step s2-start-session-level-connection:
SELECT start_session_level_connection_to_node('localhost', 57638);
start_session_level_connection_to_node
step s1-begin:
BEGIN;
step s1-update-1:
UPDATE deadlock_detection_test SET some_val = 1 WHERE user_id = 1;
step s2-begin-on-worker:
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
run_commands_on_session_level_connection_to_node
step s2-update-2-on-worker:
SELECT run_commands_on_session_level_connection_to_node('UPDATE deadlock_detection_test SET some_val = 2 WHERE user_id = 2');
run_commands_on_session_level_connection_to_node
step s2-truncate-on-worker:
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE t2');
run_commands_on_session_level_connection_to_node
step s3-invalidate-metadata:
update pg_dist_node SET metadatasynced = false;
step s3-resync:
SELECT trigger_metadata_sync();
trigger_metadata_sync
step s3-wait:
SELECT pg_sleep(2);
pg_sleep
step s2-update-1-on-worker:
SELECT run_commands_on_session_level_connection_to_node('UPDATE deadlock_detection_test SET some_val = 2 WHERE user_id = 1');
<waiting ...>
step s1-update-2:
UPDATE deadlock_detection_test SET some_val = 1 WHERE user_id = 2;
<waiting ...>
step s1-update-2: <... completed>
step s2-update-1-on-worker: <... completed>
run_commands_on_session_level_connection_to_node
error in steps s1-update-2 s2-update-1-on-worker: ERROR: canceling the transaction since it was involved in a distributed deadlock
step s1-commit:
COMMIT;
step s2-commit-on-worker:
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
run_commands_on_session_level_connection_to_node
step disable-deadlock-detection:
ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1;
step reload-conf:
SELECT pg_reload_conf();
pg_reload_conf
t
step s2-stop-connection:
SELECT stop_session_level_connection_to_node();
stop_session_level_connection_to_node
restore_isolation_tester_func
starting permutation: increase-retry-interval reload-conf s2-start-session-level-connection s2-begin-on-worker s2-truncate-on-worker s3-invalidate-metadata s3-resync s3-wait s1-count-daemons s1-cancel-metadata-sync s1-count-daemons reset-retry-interval reload-conf s2-commit-on-worker s2-stop-connection s3-resync s3-wait
create_distributed_table
step increase-retry-interval:
ALTER SYSTEM SET citus.metadata_sync_retry_interval TO 20000;
step reload-conf:
SELECT pg_reload_conf();
pg_reload_conf
t
step s2-start-session-level-connection:
SELECT start_session_level_connection_to_node('localhost', 57638);
start_session_level_connection_to_node
step s2-begin-on-worker:
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
run_commands_on_session_level_connection_to_node
step s2-truncate-on-worker:
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE t2');
run_commands_on_session_level_connection_to_node
step s3-invalidate-metadata:
update pg_dist_node SET metadatasynced = false;
step s3-resync:
SELECT trigger_metadata_sync();
trigger_metadata_sync
step s3-wait:
SELECT pg_sleep(2);
pg_sleep
step s1-count-daemons:
SELECT count(*) FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%';
count
1
step s1-cancel-metadata-sync:
SELECT pg_cancel_backend(pid) FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%';
SELECT pg_sleep(2);
pg_cancel_backend
t
pg_sleep
step s1-count-daemons:
SELECT count(*) FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%';
count
0
step reset-retry-interval:
ALTER SYSTEM RESET citus.metadata_sync_retry_interval;
step reload-conf:
SELECT pg_reload_conf();
pg_reload_conf
t
step s2-commit-on-worker:
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
run_commands_on_session_level_connection_to_node
step s2-stop-connection:
SELECT stop_session_level_connection_to_node();
stop_session_level_connection_to_node
step s3-resync:
SELECT trigger_metadata_sync();
trigger_metadata_sync
step s3-wait:
SELECT pg_sleep(2);
pg_sleep
restore_isolation_tester_func

View File

@@ -21,6 +21,27 @@ CREATE FUNCTION mark_node_readonly(hostname TEXT, port INTEGER, isreadonly BOOLE
master_run_on_worker(ARRAY[hostname], ARRAY[port],
ARRAY['SELECT pg_reload_conf()'], false);
$$;
CREATE OR REPLACE FUNCTION trigger_metadata_sync()
RETURNS void
LANGUAGE C STRICT
AS 'citus';
CREATE OR REPLACE FUNCTION raise_error_in_metadata_sync()
RETURNS void
LANGUAGE C STRICT
AS 'citus';
CREATE PROCEDURE wait_until_process_count(appname text, target_count int) AS $$
declare
counter integer := -1;
begin
while counter != target_count loop
-- pg_stat_activity is cached at xact level and there is no easy way to clear it.
-- Look it up in a new connection to get latest updates.
SELECT result::int into counter FROM
master_run_on_worker(ARRAY['localhost'], ARRAY[57636], ARRAY[
'SELECT count(*) FROM pg_stat_activity WHERE application_name = ' || quote_literal(appname) || ';'], false);
PERFORM pg_sleep(0.1);
end loop;
end$$ LANGUAGE plpgsql;
-- add a node to the cluster
SELECT master_add_node('localhost', :worker_1_port) As nodeid_1 \gset
SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;
@@ -152,6 +173,142 @@ SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node;
2 | t | f
(1 row)
-- verify that metadata sync daemon has started
SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Metadata Sync Daemon';
count
---------------------------------------------------------------------
1
(1 row)
--
-- terminate maintenance daemon, and verify that we don't spawn multiple
-- metadata sync daemons
--
SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE application_name = 'Citus Maintenance Daemon';
pg_terminate_backend
---------------------------------------------------------------------
t
(1 row)
CALL wait_until_process_count('Citus Maintenance Daemon', 1);
select trigger_metadata_sync();
trigger_metadata_sync
---------------------------------------------------------------------
(1 row)
select wait_until_metadata_sync();
wait_until_metadata_sync
---------------------------------------------------------------------
(1 row)
SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Metadata Sync Daemon';
count
---------------------------------------------------------------------
1
(1 row)
--
-- cancel metadata sync daemon, and verify that it exits and restarts.
--
select pid as pid_before_cancel from pg_stat_activity where application_name like 'Citus Met%' \gset
select pg_cancel_backend(pid) from pg_stat_activity where application_name = 'Citus Metadata Sync Daemon';
pg_cancel_backend
---------------------------------------------------------------------
t
(1 row)
select wait_until_metadata_sync();
wait_until_metadata_sync
---------------------------------------------------------------------
(1 row)
select pid as pid_after_cancel from pg_stat_activity where application_name like 'Citus Met%' \gset
select :pid_before_cancel != :pid_after_cancel AS metadata_sync_restarted;
metadata_sync_restarted
---------------------------------------------------------------------
t
(1 row)
--
-- cancel metadata sync daemon so it exits and restarts, but at the
-- same time tell maintenanced to trigger a new metadata sync. One
-- of these should exit to avoid multiple metadata syncs.
--
select pg_cancel_backend(pid) from pg_stat_activity where application_name = 'Citus Metadata Sync Daemon';
pg_cancel_backend
---------------------------------------------------------------------
t
(1 row)
select trigger_metadata_sync();
trigger_metadata_sync
---------------------------------------------------------------------
(1 row)
select wait_until_metadata_sync();
wait_until_metadata_sync
---------------------------------------------------------------------
(1 row)
-- we assume citus.metadata_sync_retry_interval is 500ms. If that changes,
-- adjust the sleep below to the new interval (in seconds, rounded up) plus 0.2.
select pg_sleep(1.2);
pg_sleep
---------------------------------------------------------------------
(1 row)
SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Metadata Sync Daemon';
count
---------------------------------------------------------------------
1
(1 row)
--
-- error in metadata sync daemon, and verify it exits and restarts.
--
select pid as pid_before_error from pg_stat_activity where application_name like 'Citus Met%' \gset
select raise_error_in_metadata_sync();
raise_error_in_metadata_sync
---------------------------------------------------------------------
(1 row)
select wait_until_metadata_sync(30000);
wait_until_metadata_sync
---------------------------------------------------------------------
(1 row)
select pid as pid_after_error from pg_stat_activity where application_name like 'Citus Met%' \gset
select :pid_before_error != :pid_after_error AS metadata_sync_restarted;
metadata_sync_restarted
---------------------------------------------------------------------
t
(1 row)
SELECT trigger_metadata_sync();
trigger_metadata_sync
---------------------------------------------------------------------
(1 row)
SELECT wait_until_metadata_sync(30000);
wait_until_metadata_sync
---------------------------------------------------------------------
(1 row)
SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Metadata Sync Daemon';
count
---------------------------------------------------------------------
1
(1 row)
-- update it back to :worker_1_port, now metadata should be synced
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', :worker_1_port);
?column?
@@ -594,6 +751,59 @@ SELECT verify_metadata('localhost', :worker_1_port);
t
(1 row)
-- verify that metadata sync daemon exits
call wait_until_process_count('Citus Metadata Sync Daemon', 0);
-- verify that DROP DATABASE terminates metadata sync
SELECT current_database() datname \gset
CREATE DATABASE db_to_drop;
NOTICE: Citus partially supports CREATE DATABASE for distributed databases
SELECT run_command_on_workers('CREATE DATABASE db_to_drop');
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,"CREATE DATABASE")
(localhost,57638,t,"CREATE DATABASE")
(2 rows)
\c db_to_drop - - :worker_1_port
CREATE EXTENSION citus;
\c db_to_drop - - :master_port
CREATE EXTENSION citus;
SELECT master_add_node('localhost', :worker_1_port);
master_add_node
---------------------------------------------------------------------
1
(1 row)
UPDATE pg_dist_node SET hasmetadata = true;
SELECT master_update_node(nodeid, 'localhost', 12345) FROM pg_dist_node;
master_update_node
---------------------------------------------------------------------
(1 row)
CREATE OR REPLACE FUNCTION trigger_metadata_sync()
RETURNS void
LANGUAGE C STRICT
AS 'citus';
SELECT trigger_metadata_sync();
trigger_metadata_sync
---------------------------------------------------------------------
(1 row)
\c :datname - - :master_port
SELECT datname FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%';
datname
---------------------------------------------------------------------
db_to_drop
(1 row)
DROP DATABASE db_to_drop;
SELECT datname FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%';
datname
---------------------------------------------------------------------
(0 rows)
-- cleanup
DROP TABLE ref_table;
TRUNCATE pg_dist_colocation;

View File

@@ -26,7 +26,7 @@ WITH dist_node_summary AS (
ARRAY[dist_node_summary.query, dist_node_summary.query],
false)
), dist_placement_summary AS (
-SELECT 'SELECT jsonb_agg(pg_dist_placement ORDER BY shardid) FROM pg_dist_placement)' AS query
+SELECT 'SELECT jsonb_agg(pg_dist_placement ORDER BY shardid) FROM pg_dist_placement' AS query
), dist_placement_check AS (
SELECT count(distinct result) = 1 AS matches
FROM dist_placement_summary CROSS JOIN LATERAL

View File

@@ -80,3 +80,4 @@ test: isolation_insert_select_vs_all_on_mx
test: isolation_ref_select_for_update_vs_all_on_mx
test: isolation_ref_update_delete_upsert_vs_all_on_mx
test: isolation_dis2ref_foreign_keys_on_mx
test: isolation_metadata_sync_deadlock

View File

@@ -0,0 +1,153 @@
#include "isolation_mx_common.include.spec"
setup
{
CREATE OR REPLACE FUNCTION trigger_metadata_sync()
RETURNS void
LANGUAGE C STRICT
AS 'citus';
CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000)
RETURNS void
LANGUAGE C STRICT
AS 'citus';
CREATE TABLE deadlock_detection_test (user_id int UNIQUE, some_val int);
INSERT INTO deadlock_detection_test SELECT i, i FROM generate_series(1,7) i;
SELECT create_distributed_table('deadlock_detection_test', 'user_id');
CREATE TABLE t2(a int);
SELECT create_distributed_table('t2', 'a');
}
teardown
{
DROP FUNCTION trigger_metadata_sync();
DROP TABLE deadlock_detection_test;
DROP TABLE t2;
SET citus.shard_replication_factor = 1;
SELECT citus_internal.restore_isolation_tester_func();
}
session "s1"
step "increase-retry-interval"
{
ALTER SYSTEM SET citus.metadata_sync_retry_interval TO 20000;
}
step "reset-retry-interval"
{
ALTER SYSTEM RESET citus.metadata_sync_retry_interval;
}
step "enable-deadlock-detection"
{
ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO 1.1;
}
step "disable-deadlock-detection"
{
ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1;
}
step "reload-conf"
{
SELECT pg_reload_conf();
}
step "s1-begin"
{
BEGIN;
}
step "s1-update-1"
{
UPDATE deadlock_detection_test SET some_val = 1 WHERE user_id = 1;
}
step "s1-update-2"
{
UPDATE deadlock_detection_test SET some_val = 1 WHERE user_id = 2;
}
step "s1-commit"
{
COMMIT;
}
step "s1-count-daemons"
{
SELECT count(*) FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%';
}
step "s1-cancel-metadata-sync"
{
SELECT pg_cancel_backend(pid) FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%';
SELECT pg_sleep(2);
}
session "s2"
step "s2-start-session-level-connection"
{
SELECT start_session_level_connection_to_node('localhost', 57638);
}
step "s2-stop-connection"
{
SELECT stop_session_level_connection_to_node();
}
step "s2-begin-on-worker"
{
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
}
step "s2-update-1-on-worker"
{
SELECT run_commands_on_session_level_connection_to_node('UPDATE deadlock_detection_test SET some_val = 2 WHERE user_id = 1');
}
step "s2-update-2-on-worker"
{
SELECT run_commands_on_session_level_connection_to_node('UPDATE deadlock_detection_test SET some_val = 2 WHERE user_id = 2');
}
step "s2-truncate-on-worker"
{
SELECT run_commands_on_session_level_connection_to_node('TRUNCATE t2');
}
step "s2-commit-on-worker"
{
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
}
session "s3"
step "s3-invalidate-metadata"
{
update pg_dist_node SET metadatasynced = false;
}
step "s3-resync"
{
SELECT trigger_metadata_sync();
}
step "s3-wait"
{
SELECT pg_sleep(2);
}
// Backends can block metadata sync. The following test verifies that if this happens,
// we still do distributed deadlock detection. In the following, s2-truncate-on-worker
// causes the concurrent metadata sync to be blocked. But s1 and s2 are themselves
// involved in a distributed deadlock.
// See https://github.com/citusdata/citus/issues/4393 for more details.
permutation "enable-deadlock-detection" "reload-conf" "s2-start-session-level-connection" "s1-begin" "s1-update-1" "s2-begin-on-worker" "s2-update-2-on-worker" "s2-truncate-on-worker" "s3-invalidate-metadata" "s3-resync" "s3-wait" "s2-update-1-on-worker" "s1-update-2" "s1-commit" "s2-commit-on-worker" "disable-deadlock-detection" "reload-conf" "s2-stop-connection"
// Test that when metadata sync is waiting for locks, cancelling it terminates it.
// This is important in cases where the metadata sync daemon itself is involved in a deadlock.
permutation "increase-retry-interval" "reload-conf" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-truncate-on-worker" "s3-invalidate-metadata" "s3-resync" "s3-wait" "s1-count-daemons" "s1-cancel-metadata-sync" "s1-count-daemons" "reset-retry-interval" "reload-conf" "s2-commit-on-worker" "s2-stop-connection" "s3-resync" "s3-wait"

View File

@@ -27,6 +27,30 @@ CREATE FUNCTION mark_node_readonly(hostname TEXT, port INTEGER, isreadonly BOOLE
ARRAY['SELECT pg_reload_conf()'], false);
$$;
CREATE OR REPLACE FUNCTION trigger_metadata_sync()
RETURNS void
LANGUAGE C STRICT
AS 'citus';
CREATE OR REPLACE FUNCTION raise_error_in_metadata_sync()
RETURNS void
LANGUAGE C STRICT
AS 'citus';
CREATE PROCEDURE wait_until_process_count(appname text, target_count int) AS $$
declare
counter integer := -1;
begin
while counter != target_count loop
-- pg_stat_activity is cached at xact level and there is no easy way to clear it.
-- Look it up in a new connection to get latest updates.
SELECT result::int into counter FROM
master_run_on_worker(ARRAY['localhost'], ARRAY[57636], ARRAY[
'SELECT count(*) FROM pg_stat_activity WHERE application_name = ' || quote_literal(appname) || ';'], false);
PERFORM pg_sleep(0.1);
end loop;
end$$ LANGUAGE plpgsql;
-- add a node to the cluster
SELECT master_add_node('localhost', :worker_1_port) As nodeid_1 \gset
SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;
@@ -79,6 +103,54 @@ END;
SELECT wait_until_metadata_sync(30000);
SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node;
-- verify that metadata sync daemon has started
SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Metadata Sync Daemon';
--
-- terminate maintenance daemon, and verify that we don't spawn multiple
-- metadata sync daemons
--
SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE application_name = 'Citus Maintenance Daemon';
CALL wait_until_process_count('Citus Maintenance Daemon', 1);
select trigger_metadata_sync();
select wait_until_metadata_sync();
SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Metadata Sync Daemon';
--
-- cancel metadata sync daemon, and verify that it exits and restarts.
--
select pid as pid_before_cancel from pg_stat_activity where application_name like 'Citus Met%' \gset
select pg_cancel_backend(pid) from pg_stat_activity where application_name = 'Citus Metadata Sync Daemon';
select wait_until_metadata_sync();
select pid as pid_after_cancel from pg_stat_activity where application_name like 'Citus Met%' \gset
select :pid_before_cancel != :pid_after_cancel AS metadata_sync_restarted;
--
-- cancel metadata sync daemon so it exits and restarts, but at the
-- same time tell maintenanced to trigger a new metadata sync. One
-- of these should exit to avoid multiple metadata syncs.
--
select pg_cancel_backend(pid) from pg_stat_activity where application_name = 'Citus Metadata Sync Daemon';
select trigger_metadata_sync();
select wait_until_metadata_sync();
-- we assume citus.metadata_sync_retry_interval is 500ms. If that changes,
-- adjust the sleep below to the new interval (in seconds, rounded up) plus 0.2.
select pg_sleep(1.2);
SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Metadata Sync Daemon';
--
-- error in metadata sync daemon, and verify it exits and restarts.
--
select pid as pid_before_error from pg_stat_activity where application_name like 'Citus Met%' \gset
select raise_error_in_metadata_sync();
select wait_until_metadata_sync(30000);
select pid as pid_after_error from pg_stat_activity where application_name like 'Citus Met%' \gset
select :pid_before_error != :pid_after_error AS metadata_sync_restarted;
SELECT trigger_metadata_sync();
SELECT wait_until_metadata_sync(30000);
SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Metadata Sync Daemon';
-- update it back to :worker_1_port, now metadata should be synced
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', :worker_1_port);
SELECT wait_until_metadata_sync(30000);
@@ -249,6 +321,39 @@ SELECT 1 FROM master_activate_node('localhost', :worker_2_port);
SELECT verify_metadata('localhost', :worker_1_port);
-- verify that metadata sync daemon exits
call wait_until_process_count('Citus Metadata Sync Daemon', 0);
-- verify that DROP DATABASE terminates metadata sync
SELECT current_database() datname \gset
CREATE DATABASE db_to_drop;
SELECT run_command_on_workers('CREATE DATABASE db_to_drop');
\c db_to_drop - - :worker_1_port
CREATE EXTENSION citus;
\c db_to_drop - - :master_port
CREATE EXTENSION citus;
SELECT master_add_node('localhost', :worker_1_port);
UPDATE pg_dist_node SET hasmetadata = true;
SELECT master_update_node(nodeid, 'localhost', 12345) FROM pg_dist_node;
CREATE OR REPLACE FUNCTION trigger_metadata_sync()
RETURNS void
LANGUAGE C STRICT
AS 'citus';
SELECT trigger_metadata_sync();
\c :datname - - :master_port
SELECT datname FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%';
DROP DATABASE db_to_drop;
SELECT datname FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%';
-- cleanup
DROP TABLE ref_table;
TRUNCATE pg_dist_colocation;

View File

@@ -23,7 +23,7 @@ WITH dist_node_summary AS (
ARRAY[dist_node_summary.query, dist_node_summary.query],
false)
), dist_placement_summary AS (
-SELECT 'SELECT jsonb_agg(pg_dist_placement ORDER BY shardid) FROM pg_dist_placement)' AS query
+SELECT 'SELECT jsonb_agg(pg_dist_placement ORDER BY shardid) FROM pg_dist_placement' AS query
), dist_placement_check AS (
SELECT count(distinct result) = 1 AS matches
FROM dist_placement_summary CROSS JOIN LATERAL