diff --git a/citus-tools b/citus-tools new file mode 160000 index 000000000..3376bd684 --- /dev/null +++ b/citus-tools @@ -0,0 +1 @@ +Subproject commit 3376bd6845f0614908ed304f5033bd644c82d3bf diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index 4148e442d..df28f87ca 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -150,6 +150,10 @@ multi_ProcessUtility(PlannedStmt *pstmt, DestReceiver *dest, QueryCompletion *completionTag) { + + bool isCitusExtensionDDL = false; + + if (readOnlyTree) { pstmt = copyObject(pstmt); @@ -169,6 +173,11 @@ multi_ProcessUtility(PlannedStmt *pstmt, } } + if (IsA(parsetree, AlterExtensionStmt)) + { + isCitusExtensionDDL = true; + } + if (IsA(parsetree, TransactionStmt) || IsA(parsetree, ListenStmt) || IsA(parsetree, NotifyStmt) || @@ -329,6 +338,11 @@ multi_ProcessUtility(PlannedStmt *pstmt, UtilityHookLevel++; + if (isCitusExtensionDDL) + { + LockCitusExtension(); /* EXCLUSIVE lock for extension DDL */ + } + PG_TRY(); { ProcessUtilityInternal(pstmt, queryString, context, params, queryEnv, dest, @@ -392,6 +406,11 @@ multi_ProcessUtility(PlannedStmt *pstmt, } PG_CATCH(); { + if (isCitusExtensionDDL) + { + UnlockCitusExtension(); + } + if (UtilityHookLevel == 1) { ResetConstraintDropped(); @@ -402,6 +421,11 @@ multi_ProcessUtility(PlannedStmt *pstmt, PG_RE_THROW(); } PG_END_TRY(); + + if (isCitusExtensionDDL) + { + UnlockCitusExtension(); + } } @@ -1320,6 +1344,26 @@ set_indexsafe_procflags(void) LWLockRelease(ProcArrayLock); } +/* Acquire the lock in EXCLUSIVE mode. */ +static void +LockCitusExtension(void) +{ + if (!enable_extension_update_lock || ExtensionUpdateLock == NULL) + return; /* do nothing if disabled or not set up */ + + LWLockAcquire(ExtensionUpdateLock, LW_EXCLUSIVE); +} + +/* Release the lock. */ +static void +UnlockCitusExtension(void) +{ + if (!enable_extension_update_lock || ExtensionUpdateLock == NULL) + return; + + LWLockRelease(ExtensionUpdateLock); +} + /* * CurrentSearchPath is a C interface for calling current_schemas(bool) that diff --git a/src/backend/distributed/executor/query_stats.c b/src/backend/distributed/executor/query_stats.c index 319041b56..4355e5af2 100644 --- a/src/backend/distributed/executor/query_stats.c +++ b/src/backend/distributed/executor/query_stats.c @@ -172,6 +172,7 @@ CitusQueryStatsShmemStartup(void) { /* First time through ... */ queryStats->lock = &(GetNamedLWLockTranche(STATS_SHARED_MEM_NAME))->lock; + queryStats->lock = &(GetNamedLWLockTranche(CITUS_EXTENSION_LOCK))->lock; } memset(&info, 0, sizeof(info)); diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 33faaa6c8..185ad7f33 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -613,6 +613,7 @@ citus_shmem_request(void) RequestAddinShmemSpace(CitusQueryStatsSharedMemSize()); RequestAddinShmemSpace(LogicalClockShmemSize()); RequestNamedLWLockTranche(STATS_SHARED_MEM_NAME, 1); + RequestNamedLWLockTranche(CITUS_EXTENSION_LOCK, 1); } @@ -2642,6 +2643,16 @@ RegisterCitusConfigVariables(void) GUC_STANDARD, NULL, NULL, NULL); + DefineCustomBoolVariable( + "citus.enable_extension_update_lock", + gettext_noop("Use a dedicated LWLock to prevent deadlocks during extension updates."), + NULL, + &enable_extension_update_lock, + true, + PGC_SUSET, + 0, + NULL, NULL, NULL); + /* warn about config items in the citus namespace that are not registered above */ EmitWarningsOnPlaceholders("citus"); diff --git a/src/backend/distributed/utils/maintenanced.c b/src/backend/distributed/utils/maintenanced.c index 49f2266c7..6a2d86761 100644 --- a/src/backend/distributed/utils/maintenanced.c +++ b/src/backend/distributed/utils/maintenanced.c @@ -302,6 +302,7 @@ CitusMaintenanceDaemonMain(Datum main_arg) * Look up this worker's configuration. */ LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE); + LWLockAcquire(ExtensionUpdateLock, LW_SHARED); MaintenanceDaemonDBData *myDbData = (MaintenanceDaemonDBData *) hash_search(MaintenanceDaemonDBHash, &databaseOid, @@ -355,6 +356,7 @@ CitusMaintenanceDaemonMain(Datum main_arg) IsMaintenanceDaemon = true; LWLockRelease(&MaintenanceDaemonControl->lock); + LWLockRelease(ExtensionUpdateLock); /* * Setup error context so log messages can be properly attributed. Some of diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index 6708d9a64..fe5f3db6d 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -71,6 +71,7 @@ extern int MaxAdaptiveExecutorPoolSize; extern int ExecutorSlowStartInterval; extern bool SortReturning; extern int ExecutorLevel; +extern bool enable_extension_update_lock; extern void CitusExecutorStart(QueryDesc *queryDesc, int eflags); diff --git a/src/include/distributed/query_stats.h b/src/include/distributed/query_stats.h index cc847c42b..a1182c0c3 100644 --- a/src/include/distributed/query_stats.h +++ b/src/include/distributed/query_stats.h @@ -13,6 +13,7 @@ #include "distributed/multi_server_executor.h" #define STATS_SHARED_MEM_NAME "citus_query_stats" +#define CITUS_EXTENSION_LOCK "CitusExtensionLock" extern Size CitusQueryStatsSharedMemSize(void); extern void InitializeCitusQueryStats(void);