diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index 12fdc5f45..35c525633 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -33,7 +33,9 @@ #include "access/attnum.h" #include "access/heapam.h" #include "access/htup_details.h" +#if PG_VERSION_NUM < 140000 #include "access/xact.h" +#endif #include "catalog/catalog.h" #include "catalog/dependency.h" #include "commands/dbcommands.h" @@ -51,7 +53,9 @@ #include "distributed/local_executor.h" #include "distributed/maintenanced.h" #include "distributed/coordinator_protocol.h" +#if PG_VERSION_NUM < 140000 #include "distributed/metadata_cache.h" +#endif #include "distributed/metadata_sync.h" #include "distributed/multi_executor.h" #include "distributed/multi_explain.h" @@ -67,6 +71,7 @@ #include "tcop/utility.h" #include "utils/builtins.h" #include "utils/lsyscache.h" +#include "utils/snapmgr.h" #include "utils/syscache.h" bool EnableDDLPropagation = true; /* ddl propagation is enabled */ @@ -88,6 +93,9 @@ static void ProcessUtilityInternal(PlannedStmt *pstmt, struct QueryEnvironment *queryEnv, DestReceiver *dest, QueryCompletionCompat *completionTag); +#if PG_VERSION_NUM >= 140000 +static void set_indexsafe_procflags(void); +#endif static char * SetSearchPathToCurrentSearchPathCommand(void); static char * CurrentSearchPath(void); static void IncrementUtilityHookCountersIfNecessary(Node *parsetree); @@ -906,9 +914,35 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) /* * Start a new transaction to make sure CONCURRENTLY commands * on localhost do not block waiting for this transaction to finish. + * + * In addition to doing that, we also need to tell other backends + * --including the ones spawned for connections opened to localhost to + * build indexes on shards of this relation-- that concurrent index + * builds can safely ignore us. + * + * Normally, DefineIndex() only does that if index doesn't have any + * predicates (i.e.: where clause) and no index expressions at all. + * However, now that we already called standard process utility, + * index build on the shell table is finished anyway. + * + * The reason behind doing so is that we cannot guarantee not + * grabbing any snapshots via adaptive executor, and the backends + * creating indexes on local shards (if any) might block on waiting + * for current xact of the current backend to finish, which would + * cause self deadlocks that are not detectable. */ if (ddlJob->startNewTransaction) { +#if PG_VERSION_NUM < 140000 + + /* + * Older versions of postgres doesn't have PROC_IN_SAFE_IC flag + * so we cannot use set_indexsafe_procflags in those versions. + * + * For this reason, we do our best to ensure not grabbing any + * snapshots later in the executor. + */ + /* * If cache is not populated, system catalog lookups will cause * the xmin of current backend to change. Then the last phase @@ -929,8 +963,34 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) * will already be in the hash table, hence we won't be holding any snapshots. */ WarmUpConnParamsHash(); +#endif + + /* + * Since it is not certain whether the code-path that we followed + * until reaching here caused grabbing any snapshots or not, we + * need to pop the active snapshot if we had any, to ensure not + * leaking any snapshots. + * + * For example, EnsureCoordinator might return without grabbing + * any snapshots if we didn't receive any invalidation messages + * but the otherwise is also possible. + */ + if (ActiveSnapshotSet()) + { + PopActiveSnapshot(); + } + CommitTransactionCommand(); StartTransactionCommand(); + +#if PG_VERSION_NUM >= 140000 + + /* + * Tell other backends to ignore us, even if we grab any + * snapshots via adaptive executor. + */ + set_indexsafe_procflags(); +#endif } /* save old commit protocol to restore at xact end */ @@ -997,6 +1057,33 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) } +#if PG_VERSION_NUM >= 140000 + +/* + * set_indexsafe_procflags sets PROC_IN_SAFE_IC flag in MyProc->statusFlags. + * + * The flag is reset automatically at transaction end, so it must be set + * for each transaction. + * + * Copied from pg/src/backend/commands/indexcmds.c + * Also see pg commit c98763bf51bf610b3ee7e209fc76c3ff9a6b3163. + */ +static void +set_indexsafe_procflags(void) +{ + Assert(MyProc->xid == InvalidTransactionId && + MyProc->xmin == InvalidTransactionId); + + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + MyProc->statusFlags |= PROC_IN_SAFE_IC; + ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags; + LWLockRelease(ProcArrayLock); +} + + +#endif + + /* * CreateCustomDDLTaskList creates a DDLJob which will apply a command to all placements * of shards of a distributed table. The command to be applied is generated by the diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 612453ef9..161d2c7d7 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -36,6 +36,7 @@ #include "distributed/version_compat.h" #include "distributed/worker_log_messages.h" #include "mb/pg_wchar.h" +#include "pg_config.h" #include "portability/instr_time.h" #include "storage/ipc.h" #include "utils/hsearch.h" @@ -1155,6 +1156,8 @@ StartConnectionEstablishment(MultiConnection *connection, ConnectionHashKey *key } +#if PG_VERSION_NUM < 140000 + /* * WarmUpConnParamsHash warms up the ConnParamsHash by loading all the * conn params for active primary nodes. @@ -1176,6 +1179,9 @@ WarmUpConnParamsHash(void) } +#endif + + /* * FindOrCreateConnParamsEntry searches ConnParamsHash for the given key, * if it is not found, it is created. diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index 5dffdef35..0245626bd 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -16,6 +16,7 @@ #include "distributed/transaction_management.h" #include "distributed/remote_transaction.h" #include "lib/ilist.h" +#include "pg_config.h" #include "portability/instr_time.h" #include "utils/guc.h" #include "utils/hsearch.h" @@ -264,5 +265,7 @@ extern void MarkConnectionConnected(MultiConnection *connection); extern double MillisecondsPassedSince(instr_time moment); extern long MillisecondsToTimeout(instr_time start, long msAfterStart); +#if PG_VERSION_NUM < 140000 extern void WarmUpConnParamsHash(void); +#endif #endif /* CONNECTION_MANAGMENT_H */