From 5a1a1334d3e7bc23638f15fc6d3b528f2032b76f Mon Sep 17 00:00:00 2001 From: Onur Tirtir Date: Mon, 10 Jan 2022 10:23:09 +0300 Subject: [PATCH] Tell other backends it's safe to ignore the backend that concurrently built the shell table index (#5520) In addition to starting a new transaction, we also need to tell other backends --including the ones spawned for connections opened to localhost to build indexes on shards of this relation-- that concurrent index builds can safely ignore us. Normally, DefineIndex() only does that if index doesn't have any predicates (i.e.: where clause) and no index expressions at all. However, now that we already called standard process utility, index build on the shell table is finished anyway. The reason behind doing so is that we cannot guarantee not grabbing any snapshots via adaptive executor, and the backends creating indexes on local shards (if any) might block on waiting for current xact of the current backend to finish, which would cause self deadlocks that are not detectable. (cherry picked from commit 3cc44ed8b3c4d26ab0642c2b810b502bfd55a27a) Conflicts: src/backend/distributed/commands/utility_hook.c --- .../distributed/commands/utility_hook.c | 87 +++++++++++++++++++ .../connection/connection_management.c | 6 ++ .../distributed/connection_management.h | 3 + 3 files changed, 96 insertions(+) diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index 12fdc5f45..35c525633 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -33,7 +33,9 @@ #include "access/attnum.h" #include "access/heapam.h" #include "access/htup_details.h" +#if PG_VERSION_NUM < 140000 #include "access/xact.h" +#endif #include "catalog/catalog.h" #include "catalog/dependency.h" #include "commands/dbcommands.h" @@ -51,7 +53,9 @@ #include "distributed/local_executor.h" #include "distributed/maintenanced.h" #include "distributed/coordinator_protocol.h" +#if PG_VERSION_NUM < 140000 #include "distributed/metadata_cache.h" +#endif #include "distributed/metadata_sync.h" #include "distributed/multi_executor.h" #include "distributed/multi_explain.h" @@ -67,6 +71,7 @@ #include "tcop/utility.h" #include "utils/builtins.h" #include "utils/lsyscache.h" +#include "utils/snapmgr.h" #include "utils/syscache.h" bool EnableDDLPropagation = true; /* ddl propagation is enabled */ @@ -88,6 +93,9 @@ static void ProcessUtilityInternal(PlannedStmt *pstmt, struct QueryEnvironment *queryEnv, DestReceiver *dest, QueryCompletionCompat *completionTag); +#if PG_VERSION_NUM >= 140000 +static void set_indexsafe_procflags(void); +#endif static char * SetSearchPathToCurrentSearchPathCommand(void); static char * CurrentSearchPath(void); static void IncrementUtilityHookCountersIfNecessary(Node *parsetree); @@ -906,9 +914,35 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) /* * Start a new transaction to make sure CONCURRENTLY commands * on localhost do not block waiting for this transaction to finish. + * + * In addition to doing that, we also need to tell other backends + * --including the ones spawned for connections opened to localhost to + * build indexes on shards of this relation-- that concurrent index + * builds can safely ignore us. + * + * Normally, DefineIndex() only does that if index doesn't have any + * predicates (i.e.: where clause) and no index expressions at all. + * However, now that we already called standard process utility, + * index build on the shell table is finished anyway. + * + * The reason behind doing so is that we cannot guarantee not + * grabbing any snapshots via adaptive executor, and the backends + * creating indexes on local shards (if any) might block on waiting + * for current xact of the current backend to finish, which would + * cause self deadlocks that are not detectable. */ if (ddlJob->startNewTransaction) { +#if PG_VERSION_NUM < 140000 + + /* + * Older versions of postgres doesn't have PROC_IN_SAFE_IC flag + * so we cannot use set_indexsafe_procflags in those versions. + * + * For this reason, we do our best to ensure not grabbing any + * snapshots later in the executor. + */ + /* * If cache is not populated, system catalog lookups will cause * the xmin of current backend to change. Then the last phase @@ -929,8 +963,34 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) * will already be in the hash table, hence we won't be holding any snapshots. */ WarmUpConnParamsHash(); +#endif + + /* + * Since it is not certain whether the code-path that we followed + * until reaching here caused grabbing any snapshots or not, we + * need to pop the active snapshot if we had any, to ensure not + * leaking any snapshots. + * + * For example, EnsureCoordinator might return without grabbing + * any snapshots if we didn't receive any invalidation messages + * but the otherwise is also possible. + */ + if (ActiveSnapshotSet()) + { + PopActiveSnapshot(); + } + CommitTransactionCommand(); StartTransactionCommand(); + +#if PG_VERSION_NUM >= 140000 + + /* + * Tell other backends to ignore us, even if we grab any + * snapshots via adaptive executor. + */ + set_indexsafe_procflags(); +#endif } /* save old commit protocol to restore at xact end */ @@ -997,6 +1057,33 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) } +#if PG_VERSION_NUM >= 140000 + +/* + * set_indexsafe_procflags sets PROC_IN_SAFE_IC flag in MyProc->statusFlags. + * + * The flag is reset automatically at transaction end, so it must be set + * for each transaction. + * + * Copied from pg/src/backend/commands/indexcmds.c + * Also see pg commit c98763bf51bf610b3ee7e209fc76c3ff9a6b3163. + */ +static void +set_indexsafe_procflags(void) +{ + Assert(MyProc->xid == InvalidTransactionId && + MyProc->xmin == InvalidTransactionId); + + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + MyProc->statusFlags |= PROC_IN_SAFE_IC; + ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags; + LWLockRelease(ProcArrayLock); +} + + +#endif + + /* * CreateCustomDDLTaskList creates a DDLJob which will apply a command to all placements * of shards of a distributed table. The command to be applied is generated by the diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 612453ef9..161d2c7d7 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -36,6 +36,7 @@ #include "distributed/version_compat.h" #include "distributed/worker_log_messages.h" #include "mb/pg_wchar.h" +#include "pg_config.h" #include "portability/instr_time.h" #include "storage/ipc.h" #include "utils/hsearch.h" @@ -1155,6 +1156,8 @@ StartConnectionEstablishment(MultiConnection *connection, ConnectionHashKey *key } +#if PG_VERSION_NUM < 140000 + /* * WarmUpConnParamsHash warms up the ConnParamsHash by loading all the * conn params for active primary nodes. @@ -1176,6 +1179,9 @@ WarmUpConnParamsHash(void) } +#endif + + /* * FindOrCreateConnParamsEntry searches ConnParamsHash for the given key, * if it is not found, it is created. diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index 5dffdef35..0245626bd 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -16,6 +16,7 @@ #include "distributed/transaction_management.h" #include "distributed/remote_transaction.h" #include "lib/ilist.h" +#include "pg_config.h" #include "portability/instr_time.h" #include "utils/guc.h" #include "utils/hsearch.h" @@ -264,5 +265,7 @@ extern void MarkConnectionConnected(MultiConnection *connection); extern double MillisecondsPassedSince(instr_time moment); extern long MillisecondsToTimeout(instr_time start, long msAfterStart); +#if PG_VERSION_NUM < 140000 extern void WarmUpConnParamsHash(void); +#endif #endif /* CONNECTION_MANAGMENT_H */