mirror of https://github.com/citusdata/citus.git
Tell other backends it's safe to ignore the backend that concurrently built the shell table index (#5520)
In addition to starting a new transaction, we also need to tell other
backends --including the ones spawned for connections opened to
localhost to build indexes on shards of this relation-- that concurrent
index builds can safely ignore us.
Normally, DefineIndex() only does that if index doesn't have any
predicates (i.e.: where clause) and no index expressions at all.
However, now that we already called standard process utility, index
build on the shell table is finished anyway.
The reason behind doing so is that we cannot guarantee not grabbing any
snapshots via adaptive executor, and the backends creating indexes on
local shards (if any) might block on waiting for current xact of the
current backend to finish, which would cause self deadlocks that are not
detectable.
(cherry picked from commit 3cc44ed8b3
)
Conflicts:
src/backend/distributed/commands/utility_hook.c
onder_10_2_comm
parent
5f04346408
commit
5a1a1334d3
|
@ -33,7 +33,9 @@
|
|||
#include "access/attnum.h"
|
||||
#include "access/heapam.h"
|
||||
#include "access/htup_details.h"
|
||||
#if PG_VERSION_NUM < 140000
|
||||
#include "access/xact.h"
|
||||
#endif
|
||||
#include "catalog/catalog.h"
|
||||
#include "catalog/dependency.h"
|
||||
#include "commands/dbcommands.h"
|
||||
|
@ -51,7 +53,9 @@
|
|||
#include "distributed/local_executor.h"
|
||||
#include "distributed/maintenanced.h"
|
||||
#include "distributed/coordinator_protocol.h"
|
||||
#if PG_VERSION_NUM < 140000
|
||||
#include "distributed/metadata_cache.h"
|
||||
#endif
|
||||
#include "distributed/metadata_sync.h"
|
||||
#include "distributed/multi_executor.h"
|
||||
#include "distributed/multi_explain.h"
|
||||
|
@ -67,6 +71,7 @@
|
|||
#include "tcop/utility.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/lsyscache.h"
|
||||
#include "utils/snapmgr.h"
|
||||
#include "utils/syscache.h"
|
||||
|
||||
bool EnableDDLPropagation = true; /* ddl propagation is enabled */
|
||||
|
@ -88,6 +93,9 @@ static void ProcessUtilityInternal(PlannedStmt *pstmt,
|
|||
struct QueryEnvironment *queryEnv,
|
||||
DestReceiver *dest,
|
||||
QueryCompletionCompat *completionTag);
|
||||
#if PG_VERSION_NUM >= 140000
|
||||
static void set_indexsafe_procflags(void);
|
||||
#endif
|
||||
static char * SetSearchPathToCurrentSearchPathCommand(void);
|
||||
static char * CurrentSearchPath(void);
|
||||
static void IncrementUtilityHookCountersIfNecessary(Node *parsetree);
|
||||
|
@ -906,9 +914,35 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
|
|||
/*
|
||||
* Start a new transaction to make sure CONCURRENTLY commands
|
||||
* on localhost do not block waiting for this transaction to finish.
|
||||
*
|
||||
* In addition to doing that, we also need to tell other backends
|
||||
* --including the ones spawned for connections opened to localhost to
|
||||
* build indexes on shards of this relation-- that concurrent index
|
||||
* builds can safely ignore us.
|
||||
*
|
||||
* Normally, DefineIndex() only does that if index doesn't have any
|
||||
* predicates (i.e.: where clause) and no index expressions at all.
|
||||
* However, now that we already called standard process utility,
|
||||
* index build on the shell table is finished anyway.
|
||||
*
|
||||
* The reason behind doing so is that we cannot guarantee not
|
||||
* grabbing any snapshots via adaptive executor, and the backends
|
||||
* creating indexes on local shards (if any) might block on waiting
|
||||
* for current xact of the current backend to finish, which would
|
||||
* cause self deadlocks that are not detectable.
|
||||
*/
|
||||
if (ddlJob->startNewTransaction)
|
||||
{
|
||||
#if PG_VERSION_NUM < 140000
|
||||
|
||||
/*
|
||||
* Older versions of postgres doesn't have PROC_IN_SAFE_IC flag
|
||||
* so we cannot use set_indexsafe_procflags in those versions.
|
||||
*
|
||||
* For this reason, we do our best to ensure not grabbing any
|
||||
* snapshots later in the executor.
|
||||
*/
|
||||
|
||||
/*
|
||||
* If cache is not populated, system catalog lookups will cause
|
||||
* the xmin of current backend to change. Then the last phase
|
||||
|
@ -929,8 +963,34 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
|
|||
* will already be in the hash table, hence we won't be holding any snapshots.
|
||||
*/
|
||||
WarmUpConnParamsHash();
|
||||
#endif
|
||||
|
||||
/*
|
||||
* Since it is not certain whether the code-path that we followed
|
||||
* until reaching here caused grabbing any snapshots or not, we
|
||||
* need to pop the active snapshot if we had any, to ensure not
|
||||
* leaking any snapshots.
|
||||
*
|
||||
* For example, EnsureCoordinator might return without grabbing
|
||||
* any snapshots if we didn't receive any invalidation messages
|
||||
* but the otherwise is also possible.
|
||||
*/
|
||||
if (ActiveSnapshotSet())
|
||||
{
|
||||
PopActiveSnapshot();
|
||||
}
|
||||
|
||||
CommitTransactionCommand();
|
||||
StartTransactionCommand();
|
||||
|
||||
#if PG_VERSION_NUM >= 140000
|
||||
|
||||
/*
|
||||
* Tell other backends to ignore us, even if we grab any
|
||||
* snapshots via adaptive executor.
|
||||
*/
|
||||
set_indexsafe_procflags();
|
||||
#endif
|
||||
}
|
||||
|
||||
/* save old commit protocol to restore at xact end */
|
||||
|
@ -997,6 +1057,33 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
|
|||
}
|
||||
|
||||
|
||||
#if PG_VERSION_NUM >= 140000
|
||||
|
||||
/*
|
||||
* set_indexsafe_procflags sets PROC_IN_SAFE_IC flag in MyProc->statusFlags.
|
||||
*
|
||||
* The flag is reset automatically at transaction end, so it must be set
|
||||
* for each transaction.
|
||||
*
|
||||
* Copied from pg/src/backend/commands/indexcmds.c
|
||||
* Also see pg commit c98763bf51bf610b3ee7e209fc76c3ff9a6b3163.
|
||||
*/
|
||||
static void
|
||||
set_indexsafe_procflags(void)
|
||||
{
|
||||
Assert(MyProc->xid == InvalidTransactionId &&
|
||||
MyProc->xmin == InvalidTransactionId);
|
||||
|
||||
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
|
||||
MyProc->statusFlags |= PROC_IN_SAFE_IC;
|
||||
ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
|
||||
LWLockRelease(ProcArrayLock);
|
||||
}
|
||||
|
||||
|
||||
#endif
|
||||
|
||||
|
||||
/*
|
||||
* CreateCustomDDLTaskList creates a DDLJob which will apply a command to all placements
|
||||
* of shards of a distributed table. The command to be applied is generated by the
|
||||
|
|
|
@ -36,6 +36,7 @@
|
|||
#include "distributed/version_compat.h"
|
||||
#include "distributed/worker_log_messages.h"
|
||||
#include "mb/pg_wchar.h"
|
||||
#include "pg_config.h"
|
||||
#include "portability/instr_time.h"
|
||||
#include "storage/ipc.h"
|
||||
#include "utils/hsearch.h"
|
||||
|
@ -1155,6 +1156,8 @@ StartConnectionEstablishment(MultiConnection *connection, ConnectionHashKey *key
|
|||
}
|
||||
|
||||
|
||||
#if PG_VERSION_NUM < 140000
|
||||
|
||||
/*
|
||||
* WarmUpConnParamsHash warms up the ConnParamsHash by loading all the
|
||||
* conn params for active primary nodes.
|
||||
|
@ -1176,6 +1179,9 @@ WarmUpConnParamsHash(void)
|
|||
}
|
||||
|
||||
|
||||
#endif
|
||||
|
||||
|
||||
/*
|
||||
* FindOrCreateConnParamsEntry searches ConnParamsHash for the given key,
|
||||
* if it is not found, it is created.
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
#include "distributed/transaction_management.h"
|
||||
#include "distributed/remote_transaction.h"
|
||||
#include "lib/ilist.h"
|
||||
#include "pg_config.h"
|
||||
#include "portability/instr_time.h"
|
||||
#include "utils/guc.h"
|
||||
#include "utils/hsearch.h"
|
||||
|
@ -264,5 +265,7 @@ extern void MarkConnectionConnected(MultiConnection *connection);
|
|||
extern double MillisecondsPassedSince(instr_time moment);
|
||||
extern long MillisecondsToTimeout(instr_time start, long msAfterStart);
|
||||
|
||||
#if PG_VERSION_NUM < 140000
|
||||
extern void WarmUpConnParamsHash(void);
|
||||
#endif
|
||||
#endif /* CONNECTION_MANAGMENT_H */
|
||||
|
|
Loading…
Reference in New Issue