Tell other backends it's safe to ignore the backend that concurrently built the shell table index (#5520)

In addition to starting a new transaction, we also need to tell other
backends --including the ones spawned for connections opened to
localhost to build indexes on shards of this relation-- that concurrent
index builds can safely ignore us.

Normally, DefineIndex() only does that if index doesn't have any
predicates (i.e.: where clause) and no index expressions at all.
However, now that we already called standard process utility, index
build on the shell table is finished anyway.

The reason behind doing so is that we cannot guarantee not grabbing any
snapshots via adaptive executor, and the backends creating indexes on
local shards (if any) might block on waiting for current xact of the
current backend to finish, which would cause self deadlocks that are not
detectable.
pull/5609/head
Onur Tirtir 2022-01-10 10:23:09 +03:00 committed by GitHub
parent 73a76b876a
commit 3cc44ed8b3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 95 additions and 0 deletions

View File

@ -33,7 +33,9 @@
#include "access/attnum.h"
#include "access/heapam.h"
#include "access/htup_details.h"
#if PG_VERSION_NUM < 140000
#include "access/xact.h"
#endif
#include "catalog/catalog.h"
#include "catalog/dependency.h"
#include "commands/dbcommands.h"
@ -52,7 +54,9 @@
#include "distributed/local_executor.h"
#include "distributed/maintenanced.h"
#include "distributed/multi_partitioning_utils.h"
#if PG_VERSION_NUM < 140000
#include "distributed/metadata_cache.h"
#endif
#include "distributed/metadata_sync.h"
#include "distributed/metadata/distobject.h"
#include "distributed/multi_executor.h"
@ -91,6 +95,9 @@ static void ProcessUtilityInternal(PlannedStmt *pstmt,
struct QueryEnvironment *queryEnv,
DestReceiver *dest,
QueryCompletionCompat *completionTag);
#if PG_VERSION_NUM >= 140000
static void set_indexsafe_procflags(void);
#endif
static char * SetSearchPathToCurrentSearchPathCommand(void);
static char * CurrentSearchPath(void);
static void IncrementUtilityHookCountersIfNecessary(Node *parsetree);
@ -1118,9 +1125,35 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
/*
* Start a new transaction to make sure CONCURRENTLY commands
* on localhost do not block waiting for this transaction to finish.
*
* In addition to doing that, we also need to tell other backends
* --including the ones spawned for connections opened to localhost to
* build indexes on shards of this relation-- that concurrent index
* builds can safely ignore us.
*
* Normally, DefineIndex() only does that if index doesn't have any
* predicates (i.e.: where clause) and no index expressions at all.
* However, now that we already called standard process utility,
* index build on the shell table is finished anyway.
*
* The reason behind doing so is that we cannot guarantee not
* grabbing any snapshots via adaptive executor, and the backends
* creating indexes on local shards (if any) might block on waiting
* for current xact of the current backend to finish, which would
* cause self deadlocks that are not detectable.
*/
if (ddlJob->startNewTransaction)
{
#if PG_VERSION_NUM < 140000
/*
* Older versions of postgres doesn't have PROC_IN_SAFE_IC flag
* so we cannot use set_indexsafe_procflags in those versions.
*
* For this reason, we do our best to ensure not grabbing any
* snapshots later in the executor.
*/
/*
* If cache is not populated, system catalog lookups will cause
* the xmin of current backend to change. Then the last phase
@ -1141,8 +1174,34 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
* will already be in the hash table, hence we won't be holding any snapshots.
*/
WarmUpConnParamsHash();
#endif
/*
* Since it is not certain whether the code-path that we followed
* until reaching here caused grabbing any snapshots or not, we
* need to pop the active snapshot if we had any, to ensure not
* leaking any snapshots.
*
* For example, EnsureCoordinator might return without grabbing
* any snapshots if we didn't receive any invalidation messages
* but the otherwise is also possible.
*/
if (ActiveSnapshotSet())
{
PopActiveSnapshot();
}
CommitTransactionCommand();
StartTransactionCommand();
#if PG_VERSION_NUM >= 140000
/*
* Tell other backends to ignore us, even if we grab any
* snapshots via adaptive executor.
*/
set_indexsafe_procflags();
#endif
}
MemoryContext savedContext = CurrentMemoryContext;
@ -1205,6 +1264,33 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
}
#if PG_VERSION_NUM >= 140000
/*
* set_indexsafe_procflags sets PROC_IN_SAFE_IC flag in MyProc->statusFlags.
*
* The flag is reset automatically at transaction end, so it must be set
* for each transaction.
*
* Copied from pg/src/backend/commands/indexcmds.c
* Also see pg commit c98763bf51bf610b3ee7e209fc76c3ff9a6b3163.
*/
static void
set_indexsafe_procflags(void)
{
Assert(MyProc->xid == InvalidTransactionId &&
MyProc->xmin == InvalidTransactionId);
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
MyProc->statusFlags |= PROC_IN_SAFE_IC;
ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
LWLockRelease(ProcArrayLock);
}
#endif
/*
* CreateCustomDDLTaskList creates a DDLJob which will apply a command to all placements
* of shards of a distributed table. The command to be applied is generated by the

View File

@ -36,6 +36,7 @@
#include "distributed/version_compat.h"
#include "distributed/worker_log_messages.h"
#include "mb/pg_wchar.h"
#include "pg_config.h"
#include "portability/instr_time.h"
#include "storage/ipc.h"
#include "utils/hsearch.h"
@ -1242,6 +1243,8 @@ StartConnectionEstablishment(MultiConnection *connection, ConnectionHashKey *key
}
#if PG_VERSION_NUM < 140000
/*
* WarmUpConnParamsHash warms up the ConnParamsHash by loading all the
* conn params for active primary nodes.
@ -1263,6 +1266,9 @@ WarmUpConnParamsHash(void)
}
#endif
/*
* FindOrCreateConnParamsEntry searches ConnParamsHash for the given key,
* if it is not found, it is created.

View File

@ -16,6 +16,7 @@
#include "distributed/transaction_management.h"
#include "distributed/remote_transaction.h"
#include "lib/ilist.h"
#include "pg_config.h"
#include "portability/instr_time.h"
#include "utils/guc.h"
#include "utils/hsearch.h"
@ -283,5 +284,7 @@ extern void MarkConnectionConnected(MultiConnection *connection);
extern double MillisecondsPassedSince(instr_time moment);
extern long MillisecondsToTimeout(instr_time start, long msAfterStart);
#if PG_VERSION_NUM < 140000
extern void WarmUpConnParamsHash(void);
#endif
#endif /* CONNECTION_MANAGMENT_H */