extension update lock example

m3hm3t/lock_ext_update
Mehmet Yilmaz 2025-02-04 10:26:33 +00:00
parent ee76c4423e
commit 43d1ed713f
7 changed files with 61 additions and 0 deletions

1
citus-tools Submodule

@ -0,0 +1 @@
Subproject commit 3376bd6845f0614908ed304f5033bd644c82d3bf

View File

@ -150,6 +150,10 @@ multi_ProcessUtility(PlannedStmt *pstmt,
DestReceiver *dest, DestReceiver *dest,
QueryCompletion *completionTag) QueryCompletion *completionTag)
{ {
bool isCitusExtensionDDL = false;
if (readOnlyTree) if (readOnlyTree)
{ {
pstmt = copyObject(pstmt); pstmt = copyObject(pstmt);
@ -169,6 +173,11 @@ multi_ProcessUtility(PlannedStmt *pstmt,
} }
} }
if (IsA(parsetree, AlterExtensionStmt))
{
isCitusExtensionDDL = true;
}
if (IsA(parsetree, TransactionStmt) || if (IsA(parsetree, TransactionStmt) ||
IsA(parsetree, ListenStmt) || IsA(parsetree, ListenStmt) ||
IsA(parsetree, NotifyStmt) || IsA(parsetree, NotifyStmt) ||
@ -329,6 +338,11 @@ multi_ProcessUtility(PlannedStmt *pstmt,
UtilityHookLevel++; UtilityHookLevel++;
if (isCitusExtensionDDL)
{
LockCitusExtension(); /* EXCLUSIVE lock for extension DDL */
}
PG_TRY(); PG_TRY();
{ {
ProcessUtilityInternal(pstmt, queryString, context, params, queryEnv, dest, ProcessUtilityInternal(pstmt, queryString, context, params, queryEnv, dest,
@ -392,6 +406,11 @@ multi_ProcessUtility(PlannedStmt *pstmt,
} }
PG_CATCH(); PG_CATCH();
{ {
if (isCitusExtensionDDL)
{
UnlockCitusExtension();
}
if (UtilityHookLevel == 1) if (UtilityHookLevel == 1)
{ {
ResetConstraintDropped(); ResetConstraintDropped();
@ -402,6 +421,11 @@ multi_ProcessUtility(PlannedStmt *pstmt,
PG_RE_THROW(); PG_RE_THROW();
} }
PG_END_TRY(); PG_END_TRY();
if (isCitusExtensionDDL)
{
UnlockCitusExtension();
}
} }
@ -1320,6 +1344,26 @@ set_indexsafe_procflags(void)
LWLockRelease(ProcArrayLock); LWLockRelease(ProcArrayLock);
} }
/* Acquire the lock in EXCLUSIVE mode. */
static void
LockCitusExtension(void)
{
if (!enable_extension_update_lock || ExtensionUpdateLock == NULL)
return; /* do nothing if disabled or not set up */
LWLockAcquire(ExtensionUpdateLock, LW_EXCLUSIVE);
}
/* Release the lock. */
static void
UnlockCitusExtension(void)
{
if (!enable_extension_update_lock || ExtensionUpdateLock == NULL)
return;
LWLockRelease(ExtensionUpdateLock);
}
/* /*
* CurrentSearchPath is a C interface for calling current_schemas(bool) that * CurrentSearchPath is a C interface for calling current_schemas(bool) that

View File

@ -172,6 +172,7 @@ CitusQueryStatsShmemStartup(void)
{ {
/* First time through ... */ /* First time through ... */
queryStats->lock = &(GetNamedLWLockTranche(STATS_SHARED_MEM_NAME))->lock; queryStats->lock = &(GetNamedLWLockTranche(STATS_SHARED_MEM_NAME))->lock;
queryStats->lock = &(GetNamedLWLockTranche(CITUS_EXTENSION_LOCK))->lock;
} }
memset(&info, 0, sizeof(info)); memset(&info, 0, sizeof(info));

View File

@ -613,6 +613,7 @@ citus_shmem_request(void)
RequestAddinShmemSpace(CitusQueryStatsSharedMemSize()); RequestAddinShmemSpace(CitusQueryStatsSharedMemSize());
RequestAddinShmemSpace(LogicalClockShmemSize()); RequestAddinShmemSpace(LogicalClockShmemSize());
RequestNamedLWLockTranche(STATS_SHARED_MEM_NAME, 1); RequestNamedLWLockTranche(STATS_SHARED_MEM_NAME, 1);
RequestNamedLWLockTranche(CITUS_EXTENSION_LOCK, 1);
} }
@ -2642,6 +2643,16 @@ RegisterCitusConfigVariables(void)
GUC_STANDARD, GUC_STANDARD,
NULL, NULL, NULL); NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.enable_extension_update_lock",
gettext_noop("Use a dedicated LWLock to prevent deadlocks during extension updates."),
NULL,
&enable_extension_update_lock,
true,
PGC_SUSET,
0,
NULL, NULL, NULL);
/* warn about config items in the citus namespace that are not registered above */ /* warn about config items in the citus namespace that are not registered above */
EmitWarningsOnPlaceholders("citus"); EmitWarningsOnPlaceholders("citus");

View File

@ -302,6 +302,7 @@ CitusMaintenanceDaemonMain(Datum main_arg)
* Look up this worker's configuration. * Look up this worker's configuration.
*/ */
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE); LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
LWLockAcquire(ExtensionUpdateLock, LW_SHARED);
MaintenanceDaemonDBData *myDbData = (MaintenanceDaemonDBData *) MaintenanceDaemonDBData *myDbData = (MaintenanceDaemonDBData *)
hash_search(MaintenanceDaemonDBHash, &databaseOid, hash_search(MaintenanceDaemonDBHash, &databaseOid,
@ -355,6 +356,7 @@ CitusMaintenanceDaemonMain(Datum main_arg)
IsMaintenanceDaemon = true; IsMaintenanceDaemon = true;
LWLockRelease(&MaintenanceDaemonControl->lock); LWLockRelease(&MaintenanceDaemonControl->lock);
LWLockRelease(ExtensionUpdateLock);
/* /*
* Setup error context so log messages can be properly attributed. Some of * Setup error context so log messages can be properly attributed. Some of

View File

@ -71,6 +71,7 @@ extern int MaxAdaptiveExecutorPoolSize;
extern int ExecutorSlowStartInterval; extern int ExecutorSlowStartInterval;
extern bool SortReturning; extern bool SortReturning;
extern int ExecutorLevel; extern int ExecutorLevel;
extern bool enable_extension_update_lock;
extern void CitusExecutorStart(QueryDesc *queryDesc, int eflags); extern void CitusExecutorStart(QueryDesc *queryDesc, int eflags);

View File

@ -13,6 +13,7 @@
#include "distributed/multi_server_executor.h" #include "distributed/multi_server_executor.h"
#define STATS_SHARED_MEM_NAME "citus_query_stats" #define STATS_SHARED_MEM_NAME "citus_query_stats"
#define CITUS_EXTENSION_LOCK "CitusExtensionLock"
extern Size CitusQueryStatsSharedMemSize(void); extern Size CitusQueryStatsSharedMemSize(void);
extern void InitializeCitusQueryStats(void); extern void InitializeCitusQueryStats(void);