diff --git a/src/backend/distributed/commands/function.c b/src/backend/distributed/commands/function.c
index afe993031..5ab19f88c 100644
--- a/src/backend/distributed/commands/function.c
+++ b/src/backend/distributed/commands/function.c
@@ -1138,12 +1138,16 @@ TriggerSyncMetadataToPrimaryNodes(void)
 			triggerMetadataSync = true;
 		}
+		else if (!workerNode->metadataSynced)
+		{
+			triggerMetadataSync = true;
+		}
 	}
 
 	/* let the maintenance daemon know about the metadata sync */
 	if (triggerMetadataSync)
 	{
-		TriggerMetadataSync(MyDatabaseId);
+		TriggerMetadataSyncOnCommit();
 	}
 }
diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c
index 93a222409..c80e2d8bc 100644
--- a/src/backend/distributed/metadata/metadata_sync.c
+++ b/src/backend/distributed/metadata/metadata_sync.c
@@ -14,6 +14,7 @@
 #include "postgres.h"
 #include "miscadmin.h"
 
+#include <signal.h>
 #include <sys/stat.h>
 #include <unistd.h>
@@ -28,6 +29,7 @@
 #include "catalog/pg_foreign_server.h"
 #include "catalog/pg_namespace.h"
 #include "catalog/pg_type.h"
+#include "commands/async.h"
 #include "distributed/citus_ruleutils.h"
 #include "distributed/commands.h"
 #include "distributed/deparser.h"
@@ -35,6 +37,7 @@
 #include "distributed/listutils.h"
 #include "distributed/metadata_utility.h"
 #include "distributed/coordinator_protocol.h"
+#include "distributed/maintenanced.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/metadata_sync.h"
 #include "distributed/metadata/distobject.h"
@@ -48,11 +51,15 @@
 #include "foreign/foreign.h"
 #include "miscadmin.h"
 #include "nodes/pg_list.h"
+#include "pgstat.h"
+#include "postmaster/bgworker.h"
+#include "postmaster/postmaster.h"
 #include "storage/lmgr.h"
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
+#include "utils/snapmgr.h"
 #include "utils/syscache.h"
 
 
@@ -76,11 +83,18 @@ static GrantStmt * GenerateGrantOnSchemaStmtForRights(Oid roleOid,
 														  char *permission,
 														  bool withGrantOption);
 static char * GenerateSetRoleQuery(Oid roleOid);
+static void MetadataSyncSigTermHandler(SIGNAL_ARGS);
+static void MetadataSyncSigAlrmHandler(SIGNAL_ARGS);
 
 
 PG_FUNCTION_INFO_V1(start_metadata_sync_to_node);
 PG_FUNCTION_INFO_V1(stop_metadata_sync_to_node);
 PG_FUNCTION_INFO_V1(worker_record_sequence_dependency);
 
+static bool got_SIGTERM = false;
+static bool got_SIGALRM = false;
+
+#define METADATA_SYNC_APP_NAME "Citus Metadata Sync Daemon"
+
 
 /*
  * start_metadata_sync_to_node function sets hasmetadata column of the given
@@ -1497,7 +1511,7 @@ DetachPartitionCommandList(void)
  * metadata workers that are out of sync. Returns the result of
  * synchronization.
  */
-MetadataSyncResult
+static MetadataSyncResult
 SyncMetadataToNodes(void)
 {
 	MetadataSyncResult result = METADATA_SYNC_SUCCESS;
@@ -1527,6 +1541,9 @@ SyncMetadataToNodes(void)
 
 		if (!SyncMetadataSnapshotToNode(workerNode, raiseInterrupts))
 		{
+			ereport(WARNING, (errmsg("failed to sync metadata to %s:%d",
+									 workerNode->workerName,
+									 workerNode->workerPort)));
 			result = METADATA_SYNC_FAILED_SYNC;
 		}
 		else
@@ -1539,3 +1556,244 @@ SyncMetadataToNodes(void)
 
 	return result;
 }
+
+
+/*
+ * SyncMetadataToNodesMain is the main function for syncing metadata to
+ * MX nodes. It retries until success and then exits.
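+ * It is spawned by the maintenance daemon and responds to SIGTERM (clean
+ * shutdown, used to reap orphaned sync daemons) and SIGALRM (simulated
+ * failure, used by tests); see the handlers below.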
+ */ +void +SyncMetadataToNodesMain(Datum main_arg) +{ + Oid databaseOid = DatumGetObjectId(main_arg); + + /* extension owner is passed via bgw_extra */ + Oid extensionOwner = InvalidOid; + memcpy_s(&extensionOwner, sizeof(extensionOwner), + MyBgworkerEntry->bgw_extra, sizeof(Oid)); + + pqsignal(SIGTERM, MetadataSyncSigTermHandler); + pqsignal(SIGALRM, MetadataSyncSigAlrmHandler); + BackgroundWorkerUnblockSignals(); + + /* connect to database, after that we can actually access catalogs */ + BackgroundWorkerInitializeConnectionByOid(databaseOid, extensionOwner, 0); + + /* make worker recognizable in pg_stat_activity */ + pgstat_report_appname(METADATA_SYNC_APP_NAME); + + bool syncedAllNodes = false; + + while (!syncedAllNodes) + { + InvalidateMetadataSystemCache(); + StartTransactionCommand(); + + /* + * Some functions in ruleutils.c, which we use to get the DDL for + * metadata propagation, require an active snapshot. + */ + PushActiveSnapshot(GetTransactionSnapshot()); + + if (!LockCitusExtension()) + { + ereport(DEBUG1, (errmsg("could not lock the citus extension, " + "skipping metadata sync"))); + } + else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded()) + { + UseCoordinatedTransaction(); + MetadataSyncResult result = SyncMetadataToNodes(); + + syncedAllNodes = (result == METADATA_SYNC_SUCCESS); + + /* we use LISTEN/NOTIFY to wait for metadata syncing in tests */ + if (result != METADATA_SYNC_FAILED_LOCK) + { + Async_Notify(METADATA_SYNC_CHANNEL, NULL); + } + } + + PopActiveSnapshot(); + CommitTransactionCommand(); + ProcessCompletedNotifies(); + + if (syncedAllNodes) + { + break; + } + + /* + * If backend is cancelled (e.g. bacause of distributed deadlock), + * CHECK_FOR_INTERRUPTS() will raise a cancellation error which will + * result in exit(1). + */ + CHECK_FOR_INTERRUPTS(); + + /* + * SIGTERM is used for when maintenance daemon tries to clean-up + * metadata sync daemons spawned by terminated maintenance daemons. + */ + if (got_SIGTERM) + { + exit(0); + } + + /* + * SIGALRM is used for testing purposes and it simulates an error in metadata + * sync daemon. + */ + if (got_SIGALRM) + { + elog(ERROR, "Error in metadata sync daemon"); + } + + pg_usleep(MetadataSyncRetryInterval * 1000); + } +} + + +/* + * MetadataSyncSigTermHandler set a flag to request termination of metadata + * sync daemon. + */ +static void +MetadataSyncSigTermHandler(SIGNAL_ARGS) +{ + int save_errno = errno; + + got_SIGTERM = true; + if (MyProc != NULL) + { + SetLatch(&MyProc->procLatch); + } + + errno = save_errno; +} + + +/* + * MetadataSyncSigAlrmHandler set a flag to request error at metadata + * sync daemon. This is used for testing purposes. + */ +static void +MetadataSyncSigAlrmHandler(SIGNAL_ARGS) +{ + int save_errno = errno; + + got_SIGALRM = true; + if (MyProc != NULL) + { + SetLatch(&MyProc->procLatch); + } + + errno = save_errno; +} + + +/* + * SpawnSyncMetadataToNodes starts a background worker which runs metadata + * sync. On success it returns workers' handle. Otherwise it returns NULL. + */ +BackgroundWorkerHandle * +SpawnSyncMetadataToNodes(Oid database, Oid extensionOwner) +{ + BackgroundWorker worker; + BackgroundWorkerHandle *handle = NULL; + + /* Configure a worker. 
+	 */
+	memset(&worker, 0, sizeof(worker));
+	SafeSnprintf(worker.bgw_name, BGW_MAXLEN,
+				 "Citus Metadata Sync: %u/%u",
+				 database, extensionOwner);
+	worker.bgw_flags =
+		BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
+	worker.bgw_start_time = BgWorkerStart_ConsistentState;
+
+	/* don't restart, we manage restarts from the maintenance daemon */
+	worker.bgw_restart_time = BGW_NEVER_RESTART;
+	strcpy_s(worker.bgw_library_name, sizeof(worker.bgw_library_name), "citus");
+	strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_function_name),
+			 "SyncMetadataToNodesMain");
+	worker.bgw_main_arg = ObjectIdGetDatum(MyDatabaseId);
+	memcpy_s(worker.bgw_extra, sizeof(worker.bgw_extra), &extensionOwner,
+			 sizeof(Oid));
+	worker.bgw_notify_pid = MyProcPid;
+
+	if (!RegisterDynamicBackgroundWorker(&worker, &handle))
+	{
+		return NULL;
+	}
+
+	pid_t pid;
+	WaitForBackgroundWorkerStartup(handle, &pid);
+
+	return handle;
+}
+
+
+/*
+ * SignalMetadataSyncDaemon signals metadata sync daemons belonging to
+ * the given database.
+ */
+void
+SignalMetadataSyncDaemon(Oid database, int sig)
+{
+	int backendCount = pgstat_fetch_stat_numbackends();
+	for (int backend = 1; backend <= backendCount; backend++)
+	{
+		LocalPgBackendStatus *localBeEntry = pgstat_fetch_stat_local_beentry(backend);
+		if (!localBeEntry)
+		{
+			continue;
+		}
+
+		PgBackendStatus *beStatus = &localBeEntry->backendStatus;
+		if (beStatus->st_databaseid == database &&
+			strncmp(beStatus->st_appname, METADATA_SYNC_APP_NAME, BGW_MAXLEN) == 0)
+		{
+			kill(beStatus->st_procpid, sig);
+		}
+	}
+}
+
+
+/*
+ * ShouldInitiateMetadataSync returns whether the metadata sync daemon should
+ * be initiated. It sets lockFailure to true if the pg_dist_node lock could
+ * not be acquired for the check.
+ */
+bool
+ShouldInitiateMetadataSync(bool *lockFailure)
+{
+	if (!IsCoordinator())
+	{
+		*lockFailure = false;
+		return false;
+	}
+
+	Oid distNodeOid = DistNodeRelationId();
+	if (!ConditionalLockRelationOid(distNodeOid, AccessShareLock))
+	{
+		*lockFailure = true;
+		return false;
+	}
+
+	bool shouldSyncMetadata = false;
+
+	List *workerList = ActivePrimaryNonCoordinatorNodeList(NoLock);
+	WorkerNode *workerNode = NULL;
+	foreach_ptr(workerNode, workerList)
+	{
+		if (workerNode->hasMetadata && !workerNode->metadataSynced)
+		{
+			shouldSyncMetadata = true;
+			break;
+		}
+	}
+
+	UnlockRelationOid(distNodeOid, AccessShareLock);
+
+	*lockFailure = false;
+	return shouldSyncMetadata;
+}
diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c
index e6e8e7a9d..400fff505 100644
--- a/src/backend/distributed/metadata/node_metadata.c
+++ b/src/backend/distributed/metadata/node_metadata.c
@@ -580,7 +580,7 @@ SetUpDistributedTableDependencies(WorkerNode *newWorkerNode)
 		{
 			MarkNodeHasMetadata(newWorkerNode->workerName, newWorkerNode->workerPort,
 								true);
-			TriggerMetadataSync(MyDatabaseId);
+			TriggerMetadataSyncOnCommit();
 		}
 	}
 }
@@ -956,7 +956,7 @@ citus_update_node(PG_FUNCTION_ARGS)
 	 */
 	if (UnsetMetadataSyncedForAll())
 	{
-		TriggerMetadataSync(MyDatabaseId);
+		TriggerMetadataSyncOnCommit();
 	}
 
 	if (handle != NULL)
diff --git a/src/backend/distributed/test/metadata_sync.c b/src/backend/distributed/test/metadata_sync.c
index 2a1b6c290..47e12ce7a 100644
--- a/src/backend/distributed/test/metadata_sync.c
+++ b/src/backend/distributed/test/metadata_sync.c
@@ -16,6 +16,7 @@
 #include "catalog/pg_type.h"
 #include "distributed/connection_management.h"
 #include "distributed/listutils.h"
+#include "distributed/maintenanced.h"
 #include 
"distributed/metadata_sync.h" #include "distributed/remote_commands.h" #include "postmaster/postmaster.h" @@ -28,6 +29,8 @@ /* declarations for dynamic loading */ PG_FUNCTION_INFO_V1(master_metadata_snapshot); PG_FUNCTION_INFO_V1(wait_until_metadata_sync); +PG_FUNCTION_INFO_V1(trigger_metadata_sync); +PG_FUNCTION_INFO_V1(raise_error_in_metadata_sync); /* @@ -124,3 +127,26 @@ wait_until_metadata_sync(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } + + +/* + * trigger_metadata_sync triggers metadata sync for testing. + */ +Datum +trigger_metadata_sync(PG_FUNCTION_ARGS) +{ + TriggerMetadataSyncOnCommit(); + PG_RETURN_VOID(); +} + + +/* + * raise_error_in_metadata_sync causes metadata sync to raise an error. + */ +Datum +raise_error_in_metadata_sync(PG_FUNCTION_ARGS) +{ + /* metadata sync uses SIGALRM to test errors */ + SignalMetadataSyncDaemon(MyDatabaseId, SIGALRM); + PG_RETURN_VOID(); +} diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 1999c836c..96a4180a4 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -28,6 +28,7 @@ #include "distributed/listutils.h" #include "distributed/local_executor.h" #include "distributed/locally_reserved_shared_connections.h" +#include "distributed/maintenanced.h" #include "distributed/multi_executor.h" #include "distributed/multi_explain.h" #include "distributed/repartition_join_execution.h" @@ -102,6 +103,9 @@ bool CoordinatedTransactionUses2PC = false; /* if disabled, distributed statements in a function may run as separate transactions */ bool FunctionOpensTransactionBlock = true; +/* if true, we should trigger metadata sync on commit */ +bool MetadataSyncOnCommit = false; + /* transaction management functions */ static void CoordinatedTransactionCallback(XactEvent event, void *arg); @@ -262,6 +266,15 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) AfterXactConnectionHandling(true); } + /* + * Changes to catalog tables are now visible to the metadata sync + * daemon, so we can trigger metadata sync if necessary. + */ + if (MetadataSyncOnCommit) + { + TriggerMetadataSync(MyDatabaseId); + } + ResetGlobalVariables(); /* @@ -474,6 +487,7 @@ ResetGlobalVariables() activeSetStmts = NULL; CoordinatedTransactionUses2PC = false; TransactionModifiedNodeMetadata = false; + MetadataSyncOnCommit = false; ResetWorkerErrorIndication(); } @@ -728,3 +742,15 @@ MaybeExecutingUDF(void) { return ExecutorLevel > 1 || (ExecutorLevel == 1 && PlannerLevel > 0); } + + +/* + * TriggerMetadataSyncOnCommit sets a flag to do metadata sync on commit. + * This is because new metadata only becomes visible to the metadata sync + * daemon after commit happens. 
+ */ +void +TriggerMetadataSyncOnCommit(void) +{ + MetadataSyncOnCommit = true; +} diff --git a/src/backend/distributed/utils/maintenanced.c b/src/backend/distributed/utils/maintenanced.c index 9b329de12..5312663e2 100644 --- a/src/backend/distributed/utils/maintenanced.c +++ b/src/backend/distributed/utils/maintenanced.c @@ -120,7 +120,6 @@ static size_t MaintenanceDaemonShmemSize(void); static void MaintenanceDaemonShmemInit(void); static void MaintenanceDaemonShmemExit(int code, Datum arg); static void MaintenanceDaemonErrorContext(void *arg); -static bool LockCitusExtension(void); static bool MetadataSyncTriggeredCheckAndReset(MaintenanceDaemonDBData *dbData); static void WarnMaintenanceDaemonNotStarted(void); @@ -294,6 +293,13 @@ CitusMaintenanceDaemonMain(Datum main_arg) TimestampTz lastShardCleanTime = 0; TimestampTz nextMetadataSyncTime = 0; + + /* + * We do metadata sync in a separate background worker. We need its + * handle to be able to check its status. + */ + BackgroundWorkerHandle *metadataSyncBgwHandle = NULL; + /* * Look up this worker's configuration. */ @@ -374,6 +380,12 @@ CitusMaintenanceDaemonMain(Datum main_arg) /* make worker recognizable in pg_stat_activity */ pgstat_report_appname("Citus Maintenance Daemon"); + /* + * Terminate orphaned metadata sync daemons spawned from previously terminated + * or crashed maintenanced instances. + */ + SignalMetadataSyncDaemon(databaseOid, SIGTERM); + /* enter main loop */ while (!got_SIGTERM) { @@ -453,21 +465,42 @@ CitusMaintenanceDaemonMain(Datum main_arg) } #endif - if (!RecoveryInProgress() && - (MetadataSyncTriggeredCheckAndReset(myDbData) || - GetCurrentTimestamp() >= nextMetadataSyncTime)) + pid_t metadataSyncBgwPid = 0; + BgwHandleStatus metadataSyncStatus = + metadataSyncBgwHandle != NULL ? + GetBackgroundWorkerPid(metadataSyncBgwHandle, &metadataSyncBgwPid) : + BGWH_STOPPED; + + if (metadataSyncStatus != BGWH_STOPPED && + GetCurrentTimestamp() >= nextMetadataSyncTime) { - bool metadataSyncFailed = false; + /* + * Metadata sync is still running, recheck in a short while. + */ + int nextTimeout = MetadataSyncRetryInterval; + nextMetadataSyncTime = + TimestampTzPlusMilliseconds(GetCurrentTimestamp(), nextTimeout); + timeout = Min(timeout, nextTimeout); + } + else if (!RecoveryInProgress() && + metadataSyncStatus == BGWH_STOPPED && + (MetadataSyncTriggeredCheckAndReset(myDbData) || + GetCurrentTimestamp() >= nextMetadataSyncTime)) + { + if (metadataSyncBgwHandle) + { + TerminateBackgroundWorker(metadataSyncBgwHandle); + pfree(metadataSyncBgwHandle); + metadataSyncBgwHandle = NULL; + } InvalidateMetadataSystemCache(); StartTransactionCommand(); - - /* - * Some functions in ruleutils.c, which we use to get the DDL for - * metadata propagation, require an active snapshot. - */ PushActiveSnapshot(GetTransactionSnapshot()); + int nextTimeout = MetadataSyncRetryInterval; + bool syncMetadata = false; + if (!LockCitusExtension()) { ereport(DEBUG1, (errmsg("could not lock the citus extension, " @@ -475,25 +508,28 @@ CitusMaintenanceDaemonMain(Datum main_arg) } else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded()) { - MetadataSyncResult result = SyncMetadataToNodes(); - metadataSyncFailed = (result != METADATA_SYNC_SUCCESS); + bool lockFailure = false; + syncMetadata = ShouldInitiateMetadataSync(&lockFailure); /* - * Notification means we had an attempt on synchronization - * without being blocked for pg_dist_node access. + * If lock fails, we need to recheck in a short while. 
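+			 * (The lock in question is the conditional lock on pg_dist_node
+			 * taken by ShouldInitiateMetadataSync.)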
If we are + * going to sync metadata, we should recheck in a short while to + * see if it failed. Otherwise, we can wait longer. */ - if (result != METADATA_SYNC_FAILED_LOCK) - { - Async_Notify(METADATA_SYNC_CHANNEL, NULL); - } + nextTimeout = (lockFailure || syncMetadata) ? + MetadataSyncRetryInterval : + MetadataSyncInterval; } PopActiveSnapshot(); CommitTransactionCommand(); - ProcessCompletedNotifies(); - int64 nextTimeout = metadataSyncFailed ? MetadataSyncRetryInterval : - MetadataSyncInterval; + if (syncMetadata) + { + metadataSyncBgwHandle = + SpawnSyncMetadataToNodes(MyDatabaseId, myDbData->userOid); + } + nextMetadataSyncTime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), nextTimeout); timeout = Min(timeout, nextTimeout); @@ -668,6 +704,11 @@ CitusMaintenanceDaemonMain(Datum main_arg) ProcessConfigFile(PGC_SIGHUP); } } + + if (metadataSyncBgwHandle) + { + TerminateBackgroundWorker(metadataSyncBgwHandle); + } } @@ -828,7 +869,7 @@ MaintenanceDaemonErrorContext(void *arg) * LockCitusExtension acquires a lock on the Citus extension or returns * false if the extension does not exist or is being dropped. */ -static bool +bool LockCitusExtension(void) { Oid extensionOid = get_extension_oid("citus", true); diff --git a/src/include/distributed/maintenanced.h b/src/include/distributed/maintenanced.h index ee9662047..7ff42674f 100644 --- a/src/include/distributed/maintenanced.h +++ b/src/include/distributed/maintenanced.h @@ -25,6 +25,7 @@ extern void StopMaintenanceDaemon(Oid databaseId); extern void TriggerMetadataSync(Oid databaseId); extern void InitializeMaintenanceDaemon(void); extern void InitializeMaintenanceDaemonBackend(void); +extern bool LockCitusExtension(void); extern void CitusMaintenanceDaemonMain(Datum main_arg); diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index 88f1e63a9..8538830ba 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -50,11 +50,14 @@ extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, int sha extern void CreateTableMetadataOnWorkers(Oid relationId); extern void MarkNodeHasMetadata(const char *nodeName, int32 nodePort, bool hasMetadata); extern void MarkNodeMetadataSynced(const char *nodeName, int32 nodePort, bool synced); -extern MetadataSyncResult SyncMetadataToNodes(void); +extern BackgroundWorkerHandle * SpawnSyncMetadataToNodes(Oid database, Oid owner); extern bool SendOptionalCommandListToWorkerInTransaction(const char *nodeName, int32 nodePort, const char *nodeUser, List *commandList); +extern void SyncMetadataToNodesMain(Datum main_arg); +extern void SignalMetadataSyncDaemon(Oid database, int sig); +extern bool ShouldInitiateMetadataSync(bool *lockFailure); #define DELETE_ALL_NODES "TRUNCATE pg_dist_node CASCADE" #define REMOVE_ALL_CLUSTERED_TABLES_COMMAND \ diff --git a/src/include/distributed/transaction_management.h b/src/include/distributed/transaction_management.h index a0a595fac..f6bac9100 100644 --- a/src/include/distributed/transaction_management.h +++ b/src/include/distributed/transaction_management.h @@ -121,6 +121,7 @@ extern void InitializeTransactionManagement(void); /* other functions */ extern List * ActiveSubXactContexts(void); extern StringInfo BeginAndSetDistributedTransactionIdCommand(void); +extern void TriggerMetadataSyncOnCommit(void); #endif /* TRANSACTION_MANAGMENT_H */ diff --git a/src/test/regress/expected/isolation_dump_global_wait_edges.out 
b/src/test/regress/expected/isolation_dump_global_wait_edges.out index 71a7f1a7a..b0343b982 100644 --- a/src/test/regress/expected/isolation_dump_global_wait_edges.out +++ b/src/test/regress/expected/isolation_dump_global_wait_edges.out @@ -28,11 +28,11 @@ step detector-dump-wait-edges: waiting_transaction_numblocking_transaction_numblocking_transaction_waiting -390 389 f +395 394 f transactionnumberwaitingtransactionnumbers -389 -390 389 +394 +395 394 step s1-abort: ABORT; @@ -75,14 +75,14 @@ step detector-dump-wait-edges: waiting_transaction_numblocking_transaction_numblocking_transaction_waiting -394 393 f -395 393 f -395 394 t +399 398 f +400 398 f +400 399 t transactionnumberwaitingtransactionnumbers -393 -394 393 -395 393,394 +398 +399 398 +400 398,399 step s1-abort: ABORT; diff --git a/src/test/regress/expected/isolation_metadata_sync_deadlock.out b/src/test/regress/expected/isolation_metadata_sync_deadlock.out new file mode 100644 index 000000000..e19498f6f --- /dev/null +++ b/src/test/regress/expected/isolation_metadata_sync_deadlock.out @@ -0,0 +1,204 @@ +Parsed test spec with 3 sessions + +starting permutation: enable-deadlock-detection reload-conf s2-start-session-level-connection s1-begin s1-update-1 s2-begin-on-worker s2-update-2-on-worker s2-truncate-on-worker s3-invalidate-metadata s3-resync s3-wait s2-update-1-on-worker s1-update-2 s1-commit s2-commit-on-worker disable-deadlock-detection reload-conf s2-stop-connection +create_distributed_table + + +step enable-deadlock-detection: + ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO 1.1; + +step reload-conf: + SELECT pg_reload_conf(); + +pg_reload_conf + +t +step s2-start-session-level-connection: + SELECT start_session_level_connection_to_node('localhost', 57638); + +start_session_level_connection_to_node + + +step s1-begin: + BEGIN; + +step s1-update-1: + UPDATE deadlock_detection_test SET some_val = 1 WHERE user_id = 1; + +step s2-begin-on-worker: + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + +run_commands_on_session_level_connection_to_node + + +step s2-update-2-on-worker: + SELECT run_commands_on_session_level_connection_to_node('UPDATE deadlock_detection_test SET some_val = 2 WHERE user_id = 2'); + +run_commands_on_session_level_connection_to_node + + +step s2-truncate-on-worker: + SELECT run_commands_on_session_level_connection_to_node('TRUNCATE t2'); + +run_commands_on_session_level_connection_to_node + + +step s3-invalidate-metadata: + update pg_dist_node SET metadatasynced = false; + +step s3-resync: + SELECT trigger_metadata_sync(); + +trigger_metadata_sync + + +step s3-wait: + SELECT pg_sleep(2); + +pg_sleep + + +step s2-update-1-on-worker: + SELECT run_commands_on_session_level_connection_to_node('UPDATE deadlock_detection_test SET some_val = 2 WHERE user_id = 1'); + +step s1-update-2: + UPDATE deadlock_detection_test SET some_val = 1 WHERE user_id = 2; + +step s1-update-2: <... completed> +step s2-update-1-on-worker: <... 
completed> +run_commands_on_session_level_connection_to_node + + +error in steps s1-update-2 s2-update-1-on-worker: ERROR: canceling the transaction since it was involved in a distributed deadlock +step s1-commit: + COMMIT; + +step s2-commit-on-worker: + SELECT run_commands_on_session_level_connection_to_node('COMMIT'); + +run_commands_on_session_level_connection_to_node + + +step disable-deadlock-detection: + ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1; + +step reload-conf: + SELECT pg_reload_conf(); + +pg_reload_conf + +t +step s2-stop-connection: + SELECT stop_session_level_connection_to_node(); + +stop_session_level_connection_to_node + + +restore_isolation_tester_func + + + +starting permutation: increase-retry-interval reload-conf s2-start-session-level-connection s2-begin-on-worker s2-truncate-on-worker s3-invalidate-metadata s3-resync s3-wait s1-count-daemons s1-cancel-metadata-sync s1-count-daemons reset-retry-interval reload-conf s2-commit-on-worker s2-stop-connection s3-resync s3-wait +create_distributed_table + + +step increase-retry-interval: + ALTER SYSTEM SET citus.metadata_sync_retry_interval TO 20000; + +step reload-conf: + SELECT pg_reload_conf(); + +pg_reload_conf + +t +step s2-start-session-level-connection: + SELECT start_session_level_connection_to_node('localhost', 57638); + +start_session_level_connection_to_node + + +step s2-begin-on-worker: + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + +run_commands_on_session_level_connection_to_node + + +step s2-truncate-on-worker: + SELECT run_commands_on_session_level_connection_to_node('TRUNCATE t2'); + +run_commands_on_session_level_connection_to_node + + +step s3-invalidate-metadata: + update pg_dist_node SET metadatasynced = false; + +step s3-resync: + SELECT trigger_metadata_sync(); + +trigger_metadata_sync + + +step s3-wait: + SELECT pg_sleep(2); + +pg_sleep + + +step s1-count-daemons: + SELECT count(*) FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%'; + +count + +1 +step s1-cancel-metadata-sync: + SELECT pg_cancel_backend(pid) FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%'; + SELECT pg_sleep(2); + +pg_cancel_backend + +t +pg_sleep + + +step s1-count-daemons: + SELECT count(*) FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%'; + +count + +0 +step reset-retry-interval: + ALTER SYSTEM RESET citus.metadata_sync_retry_interval; + +step reload-conf: + SELECT pg_reload_conf(); + +pg_reload_conf + +t +step s2-commit-on-worker: + SELECT run_commands_on_session_level_connection_to_node('COMMIT'); + +run_commands_on_session_level_connection_to_node + + +step s2-stop-connection: + SELECT stop_session_level_connection_to_node(); + +stop_session_level_connection_to_node + + +step s3-resync: + SELECT trigger_metadata_sync(); + +trigger_metadata_sync + + +step s3-wait: + SELECT pg_sleep(2); + +pg_sleep + + +restore_isolation_tester_func + + diff --git a/src/test/regress/expected/multi_mx_node_metadata.out b/src/test/regress/expected/multi_mx_node_metadata.out index e1321e549..bf473c310 100644 --- a/src/test/regress/expected/multi_mx_node_metadata.out +++ b/src/test/regress/expected/multi_mx_node_metadata.out @@ -21,6 +21,27 @@ CREATE FUNCTION mark_node_readonly(hostname TEXT, port INTEGER, isreadonly BOOLE master_run_on_worker(ARRAY[hostname], ARRAY[port], ARRAY['SELECT pg_reload_conf()'], false); $$; +CREATE OR REPLACE FUNCTION trigger_metadata_sync() + RETURNS void + LANGUAGE C STRICT + AS 'citus'; +CREATE OR REPLACE FUNCTION 
raise_error_in_metadata_sync() + RETURNS void + LANGUAGE C STRICT + AS 'citus'; +CREATE PROCEDURE wait_until_process_count(appname text, target_count int) AS $$ +declare + counter integer := -1; +begin + while counter != target_count loop + -- pg_stat_activity is cached at xact level and there is no easy way to clear it. + -- Look it up in a new connection to get latest updates. + SELECT result::int into counter FROM + master_run_on_worker(ARRAY['localhost'], ARRAY[57636], ARRAY[ + 'SELECT count(*) FROM pg_stat_activity WHERE application_name = ' || quote_literal(appname) || ';'], false); + PERFORM pg_sleep(0.1); + end loop; +end$$ LANGUAGE plpgsql; -- add a node to the cluster SELECT master_add_node('localhost', :worker_1_port) As nodeid_1 \gset SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node; @@ -152,6 +173,142 @@ SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node; 2 | t | f (1 row) +-- verify that metadata sync daemon has started +SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Metadata Sync Daemon'; + count +--------------------------------------------------------------------- + 1 +(1 row) + +-- +-- terminate maintenance daemon, and verify that we don't spawn multiple +-- metadata sync daemons +-- +SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE application_name = 'Citus Maintenance Daemon'; + pg_terminate_backend +--------------------------------------------------------------------- + t +(1 row) + +CALL wait_until_process_count('Citus Maintenance Daemon', 1); +select trigger_metadata_sync(); + trigger_metadata_sync +--------------------------------------------------------------------- + +(1 row) + +select wait_until_metadata_sync(); + wait_until_metadata_sync +--------------------------------------------------------------------- + +(1 row) + +SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Metadata Sync Daemon'; + count +--------------------------------------------------------------------- + 1 +(1 row) + +-- +-- cancel metadata sync daemon, and verify that it exits and restarts. +-- +select pid as pid_before_cancel from pg_stat_activity where application_name like 'Citus Met%' \gset +select pg_cancel_backend(pid) from pg_stat_activity where application_name = 'Citus Metadata Sync Daemon'; + pg_cancel_backend +--------------------------------------------------------------------- + t +(1 row) + +select wait_until_metadata_sync(); + wait_until_metadata_sync +--------------------------------------------------------------------- + +(1 row) + +select pid as pid_after_cancel from pg_stat_activity where application_name like 'Citus Met%' \gset +select :pid_before_cancel != :pid_after_cancel AS metadata_sync_restarted; + metadata_sync_restarted +--------------------------------------------------------------------- + t +(1 row) + +-- +-- cancel metadata sync daemon so it exits and restarts, but at the +-- same time tell maintenanced to trigger a new metadata sync. One +-- of these should exit to avoid multiple metadata syncs. 
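+-- (the maintenance daemon tracks the sync daemon through its background
+-- worker handle and does not spawn a second one while the first is running)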
+-- +select pg_cancel_backend(pid) from pg_stat_activity where application_name = 'Citus Metadata Sync Daemon'; + pg_cancel_backend +--------------------------------------------------------------------- + t +(1 row) + +select trigger_metadata_sync(); + trigger_metadata_sync +--------------------------------------------------------------------- + +(1 row) + +select wait_until_metadata_sync(); + wait_until_metadata_sync +--------------------------------------------------------------------- + +(1 row) + +-- we assume citus.metadata_sync_retry_interval is 500ms. Change amount we sleep to ceiling + 0.2 if it changes. +select pg_sleep(1.2); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Metadata Sync Daemon'; + count +--------------------------------------------------------------------- + 1 +(1 row) + +-- +-- error in metadata sync daemon, and verify it exits and restarts. +-- +select pid as pid_before_error from pg_stat_activity where application_name like 'Citus Met%' \gset +select raise_error_in_metadata_sync(); + raise_error_in_metadata_sync +--------------------------------------------------------------------- + +(1 row) + +select wait_until_metadata_sync(30000); + wait_until_metadata_sync +--------------------------------------------------------------------- + +(1 row) + +select pid as pid_after_error from pg_stat_activity where application_name like 'Citus Met%' \gset +select :pid_before_error != :pid_after_error AS metadata_sync_restarted; + metadata_sync_restarted +--------------------------------------------------------------------- + t +(1 row) + +SELECT trigger_metadata_sync(); + trigger_metadata_sync +--------------------------------------------------------------------- + +(1 row) + +SELECT wait_until_metadata_sync(30000); + wait_until_metadata_sync +--------------------------------------------------------------------- + +(1 row) + +SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Metadata Sync Daemon'; + count +--------------------------------------------------------------------- + 1 +(1 row) + -- update it back to :worker_1_port, now metadata should be synced SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', :worker_1_port); ?column? 
@@ -594,6 +751,59 @@ SELECT verify_metadata('localhost', :worker_1_port); t (1 row) +-- verify that metadata sync daemon exits +call wait_until_process_count('Citus Metadata Sync Daemon', 0); +-- verify that DROP DATABASE terminates metadata sync +SELECT current_database() datname \gset +CREATE DATABASE db_to_drop; +NOTICE: Citus partially supports CREATE DATABASE for distributed databases +SELECT run_command_on_workers('CREATE DATABASE db_to_drop'); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57637,t,"CREATE DATABASE") + (localhost,57638,t,"CREATE DATABASE") +(2 rows) + +\c db_to_drop - - :worker_1_port +CREATE EXTENSION citus; +\c db_to_drop - - :master_port +CREATE EXTENSION citus; +SELECT master_add_node('localhost', :worker_1_port); + master_add_node +--------------------------------------------------------------------- + 1 +(1 row) + +UPDATE pg_dist_node SET hasmetadata = true; +SELECT master_update_node(nodeid, 'localhost', 12345) FROM pg_dist_node; + master_update_node +--------------------------------------------------------------------- + +(1 row) + +CREATE OR REPLACE FUNCTION trigger_metadata_sync() + RETURNS void + LANGUAGE C STRICT + AS 'citus'; +SELECT trigger_metadata_sync(); + trigger_metadata_sync +--------------------------------------------------------------------- + +(1 row) + +\c :datname - - :master_port +SELECT datname FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%'; + datname +--------------------------------------------------------------------- + db_to_drop +(1 row) + +DROP DATABASE db_to_drop; +SELECT datname FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%'; + datname +--------------------------------------------------------------------- +(0 rows) + -- cleanup DROP TABLE ref_table; TRUNCATE pg_dist_colocation; diff --git a/src/test/regress/expected/multi_test_helpers_superuser.out b/src/test/regress/expected/multi_test_helpers_superuser.out index eca214309..cfc3cf02b 100644 --- a/src/test/regress/expected/multi_test_helpers_superuser.out +++ b/src/test/regress/expected/multi_test_helpers_superuser.out @@ -32,7 +32,7 @@ WITH dist_node_summary AS ( ARRAY[dist_node_summary.query, dist_node_summary.query], false) ), dist_placement_summary AS ( - SELECT 'SELECT jsonb_agg(pg_dist_placement ORDER BY shardid) FROM pg_dist_placement)' AS query + SELECT 'SELECT jsonb_agg(pg_dist_placement ORDER BY shardid) FROM pg_dist_placement' AS query ), dist_placement_check AS ( SELECT count(distinct result) = 1 AS matches FROM dist_placement_summary CROSS JOIN LATERAL diff --git a/src/test/regress/isolation_schedule b/src/test/regress/isolation_schedule index 39ce4402f..fb2c60a5e 100644 --- a/src/test/regress/isolation_schedule +++ b/src/test/regress/isolation_schedule @@ -87,3 +87,4 @@ test: isolation_insert_select_vs_all_on_mx test: isolation_ref_select_for_update_vs_all_on_mx test: isolation_ref_update_delete_upsert_vs_all_on_mx test: isolation_dis2ref_foreign_keys_on_mx +test: isolation_metadata_sync_deadlock diff --git a/src/test/regress/pg_regress_multi.pl b/src/test/regress/pg_regress_multi.pl index 2216f21e4..df6947042 100755 --- a/src/test/regress/pg_regress_multi.pl +++ b/src/test/regress/pg_regress_multi.pl @@ -421,6 +421,15 @@ push(@pgOptions, "log_error_verbosity = 'verbose'"); # Allow CREATE SUBSCRIPTION to work push(@pgOptions, "wal_level='logical'"); +# Faster logical replication status update so tests with logical replication +# run faster +push(@pgOptions, 
"wal_receiver_status_interval=1"); + +# Faster logical replication apply worker launch so tests with logical +# replication run faster. This is used in ApplyLauncherMain in +# src/backend/replication/logical/launcher.c. +push(@pgOptions, "wal_retrieve_retry_interval=1000"); + # Citus options set for the tests push(@pgOptions, "citus.shard_count=4"); push(@pgOptions, "citus.max_adaptive_executor_pool_size=4"); diff --git a/src/test/regress/spec/isolation_metadata_sync_deadlock.spec b/src/test/regress/spec/isolation_metadata_sync_deadlock.spec new file mode 100644 index 000000000..c5cabfd84 --- /dev/null +++ b/src/test/regress/spec/isolation_metadata_sync_deadlock.spec @@ -0,0 +1,153 @@ +#include "isolation_mx_common.include.spec" + +setup +{ + CREATE OR REPLACE FUNCTION trigger_metadata_sync() + RETURNS void + LANGUAGE C STRICT + AS 'citus'; + + CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000) + RETURNS void + LANGUAGE C STRICT + AS 'citus'; + + CREATE TABLE deadlock_detection_test (user_id int UNIQUE, some_val int); + INSERT INTO deadlock_detection_test SELECT i, i FROM generate_series(1,7) i; + SELECT create_distributed_table('deadlock_detection_test', 'user_id'); + + CREATE TABLE t2(a int); + SELECT create_distributed_table('t2', 'a'); +} + +teardown +{ + DROP FUNCTION trigger_metadata_sync(); + DROP TABLE deadlock_detection_test; + DROP TABLE t2; + SET citus.shard_replication_factor = 1; + SELECT citus_internal.restore_isolation_tester_func(); +} + +session "s1" + +step "increase-retry-interval" +{ + ALTER SYSTEM SET citus.metadata_sync_retry_interval TO 20000; +} + +step "reset-retry-interval" +{ + ALTER SYSTEM RESET citus.metadata_sync_retry_interval; +} + +step "enable-deadlock-detection" +{ + ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO 1.1; +} + +step "disable-deadlock-detection" +{ + ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1; +} + +step "reload-conf" +{ + SELECT pg_reload_conf(); +} + +step "s1-begin" +{ + BEGIN; +} + +step "s1-update-1" +{ + UPDATE deadlock_detection_test SET some_val = 1 WHERE user_id = 1; +} + +step "s1-update-2" +{ + UPDATE deadlock_detection_test SET some_val = 1 WHERE user_id = 2; +} + +step "s1-commit" +{ + COMMIT; +} + +step "s1-count-daemons" +{ + SELECT count(*) FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%'; +} + +step "s1-cancel-metadata-sync" +{ + SELECT pg_cancel_backend(pid) FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%'; + SELECT pg_sleep(2); +} + +session "s2" + +step "s2-start-session-level-connection" +{ + SELECT start_session_level_connection_to_node('localhost', 57638); +} + +step "s2-stop-connection" +{ + SELECT stop_session_level_connection_to_node(); +} + +step "s2-begin-on-worker" +{ + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); +} + +step "s2-update-1-on-worker" +{ + SELECT run_commands_on_session_level_connection_to_node('UPDATE deadlock_detection_test SET some_val = 2 WHERE user_id = 1'); +} + +step "s2-update-2-on-worker" +{ + SELECT run_commands_on_session_level_connection_to_node('UPDATE deadlock_detection_test SET some_val = 2 WHERE user_id = 2'); +} + +step "s2-truncate-on-worker" +{ + SELECT run_commands_on_session_level_connection_to_node('TRUNCATE t2'); +} + +step "s2-commit-on-worker" +{ + SELECT run_commands_on_session_level_connection_to_node('COMMIT'); +} + +session "s3" + +step "s3-invalidate-metadata" +{ + update pg_dist_node SET metadatasynced = false; +} + +step "s3-resync" +{ + SELECT 
trigger_metadata_sync();
}

step "s3-wait"
{
	SELECT pg_sleep(2);
}

// Backends can block metadata sync. The following test verifies that if this happens,
// we still do distributed deadlock detection. In the following, s2-truncate-on-worker
// causes the concurrent metadata sync to be blocked. But s1 and s2 are themselves
// involved in a distributed deadlock.
// See https://github.com/citusdata/citus/issues/4393 for more details.
permutation "enable-deadlock-detection" "reload-conf" "s2-start-session-level-connection" "s1-begin" "s1-update-1" "s2-begin-on-worker" "s2-update-2-on-worker" "s2-truncate-on-worker" "s3-invalidate-metadata" "s3-resync" "s3-wait" "s2-update-1-on-worker" "s1-update-2" "s1-commit" "s2-commit-on-worker" "disable-deadlock-detection" "reload-conf" "s2-stop-connection"

// Test that when metadata sync is waiting for locks, cancelling it terminates it.
// This is important in cases where the metadata sync daemon itself is involved in a deadlock.
permutation "increase-retry-interval" "reload-conf" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-truncate-on-worker" "s3-invalidate-metadata" "s3-resync" "s3-wait" "s1-count-daemons" "s1-cancel-metadata-sync" "s1-count-daemons" "reset-retry-interval" "reload-conf" "s2-commit-on-worker" "s2-stop-connection" "s3-resync" "s3-wait"
diff --git a/src/test/regress/sql/multi_mx_node_metadata.sql b/src/test/regress/sql/multi_mx_node_metadata.sql
index 2f6b22872..834baa09e 100644
--- a/src/test/regress/sql/multi_mx_node_metadata.sql
+++ b/src/test/regress/sql/multi_mx_node_metadata.sql
@@ -27,6 +27,30 @@ CREATE FUNCTION mark_node_readonly(hostname TEXT, port INTEGER, isreadonly BOOLE
 		ARRAY['SELECT pg_reload_conf()'], false);
 $$;
 
+CREATE OR REPLACE FUNCTION trigger_metadata_sync()
+    RETURNS void
+    LANGUAGE C STRICT
+    AS 'citus';
+
+CREATE OR REPLACE FUNCTION raise_error_in_metadata_sync()
+    RETURNS void
+    LANGUAGE C STRICT
+    AS 'citus';
+
+CREATE PROCEDURE wait_until_process_count(appname text, target_count int) AS $$
+declare
+    counter integer := -1;
+begin
+    while counter != target_count loop
+        -- pg_stat_activity is cached at xact level and there is no easy way to clear it.
+        -- Look it up in a new connection to get latest updates.
+        SELECT result::int into counter FROM
+            master_run_on_worker(ARRAY['localhost'], ARRAY[57636], ARRAY[
+                'SELECT count(*) FROM pg_stat_activity WHERE application_name = ' || quote_literal(appname) || ';'], false);
+        PERFORM pg_sleep(0.1);
+    end loop;
+end$$ LANGUAGE plpgsql;
+
 -- add a node to the cluster
 SELECT master_add_node('localhost', :worker_1_port) As nodeid_1 \gset
 SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;
@@ -79,6 +103,54 @@ END;
 SELECT wait_until_metadata_sync(30000);
 SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node;
 
+-- verify that metadata sync daemon has started
+SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Metadata Sync Daemon';
+
+--
+-- terminate maintenance daemon, and verify that we don't spawn multiple
+-- metadata sync daemons
+--
+SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE application_name = 'Citus Maintenance Daemon';
+CALL wait_until_process_count('Citus Maintenance Daemon', 1);
+select trigger_metadata_sync();
+select wait_until_metadata_sync();
+SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Metadata Sync Daemon';
+
+--
+-- cancel metadata sync daemon, and verify that it exits and restarts.
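+-- (cancellation interrupts the daemon via CHECK_FOR_INTERRUPTS, so it exits
+-- and the maintenance daemon spawns a fresh one)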
+-- +select pid as pid_before_cancel from pg_stat_activity where application_name like 'Citus Met%' \gset +select pg_cancel_backend(pid) from pg_stat_activity where application_name = 'Citus Metadata Sync Daemon'; +select wait_until_metadata_sync(); +select pid as pid_after_cancel from pg_stat_activity where application_name like 'Citus Met%' \gset +select :pid_before_cancel != :pid_after_cancel AS metadata_sync_restarted; + +-- +-- cancel metadata sync daemon so it exits and restarts, but at the +-- same time tell maintenanced to trigger a new metadata sync. One +-- of these should exit to avoid multiple metadata syncs. +-- +select pg_cancel_backend(pid) from pg_stat_activity where application_name = 'Citus Metadata Sync Daemon'; +select trigger_metadata_sync(); +select wait_until_metadata_sync(); +-- we assume citus.metadata_sync_retry_interval is 500ms. Change amount we sleep to ceiling + 0.2 if it changes. +select pg_sleep(1.2); +SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Metadata Sync Daemon'; + +-- +-- error in metadata sync daemon, and verify it exits and restarts. +-- +select pid as pid_before_error from pg_stat_activity where application_name like 'Citus Met%' \gset +select raise_error_in_metadata_sync(); +select wait_until_metadata_sync(30000); +select pid as pid_after_error from pg_stat_activity where application_name like 'Citus Met%' \gset +select :pid_before_error != :pid_after_error AS metadata_sync_restarted; + + +SELECT trigger_metadata_sync(); +SELECT wait_until_metadata_sync(30000); +SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Metadata Sync Daemon'; + -- update it back to :worker_1_port, now metadata should be synced SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', :worker_1_port); SELECT wait_until_metadata_sync(30000); @@ -249,6 +321,39 @@ SELECT 1 FROM master_activate_node('localhost', :worker_2_port); SELECT verify_metadata('localhost', :worker_1_port); +-- verify that metadata sync daemon exits +call wait_until_process_count('Citus Metadata Sync Daemon', 0); + +-- verify that DROP DATABASE terminates metadata sync +SELECT current_database() datname \gset +CREATE DATABASE db_to_drop; +SELECT run_command_on_workers('CREATE DATABASE db_to_drop'); + +\c db_to_drop - - :worker_1_port +CREATE EXTENSION citus; + +\c db_to_drop - - :master_port +CREATE EXTENSION citus; +SELECT master_add_node('localhost', :worker_1_port); +UPDATE pg_dist_node SET hasmetadata = true; + +SELECT master_update_node(nodeid, 'localhost', 12345) FROM pg_dist_node; + +CREATE OR REPLACE FUNCTION trigger_metadata_sync() + RETURNS void + LANGUAGE C STRICT + AS 'citus'; + +SELECT trigger_metadata_sync(); + +\c :datname - - :master_port + +SELECT datname FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%'; + +DROP DATABASE db_to_drop; + +SELECT datname FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%'; + -- cleanup DROP TABLE ref_table; TRUNCATE pg_dist_colocation; diff --git a/src/test/regress/sql/multi_test_helpers_superuser.sql b/src/test/regress/sql/multi_test_helpers_superuser.sql index a50d1d3cd..4026c2f00 100644 --- a/src/test/regress/sql/multi_test_helpers_superuser.sql +++ b/src/test/regress/sql/multi_test_helpers_superuser.sql @@ -30,7 +30,7 @@ WITH dist_node_summary AS ( ARRAY[dist_node_summary.query, dist_node_summary.query], false) ), dist_placement_summary AS ( - SELECT 'SELECT jsonb_agg(pg_dist_placement ORDER BY shardid) FROM pg_dist_placement)' AS query + SELECT 'SELECT jsonb_agg(pg_dist_placement ORDER 
BY shardid) FROM pg_dist_placement' AS query ), dist_placement_check AS ( SELECT count(distinct result) = 1 AS matches FROM dist_placement_summary CROSS JOIN LATERAL