PR #6728  / commit - 10

Do not acquire strict lock on separate transaction to localhost as we already take the lock before.
But make sure that caller has the ExclusiveLock.
pull/6728/head
aykutbozkurt 2023-03-28 13:26:44 +03:00
parent a74232bb39
commit fe00b3263a
8 changed files with 181 additions and 24 deletions

View File

@ -48,6 +48,7 @@
#include "distributed/version_compat.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_transaction.h"
#include "executor/spi.h"
#include "lib/stringinfo.h"
#include "postmaster/postmaster.h"
#include "storage/bufmgr.h"
@ -94,6 +95,7 @@ static int AddNodeMetadata(char *nodeName, int32 nodePort, NodeMetadata
*nodeMetadata, bool *nodeAlreadyExists);
static WorkerNode * SetNodeState(char *nodeName, int32 nodePort, bool isActive);
static HeapTuple GetNodeTuple(const char *nodeName, int32 nodePort);
static HeapTuple GetNodeByNodeId(int32 nodeId);
static int32 GetNextGroupId(void);
static int GetNextNodeId(void);
static void InsertPlaceholderCoordinatorRecord(void);
@ -125,7 +127,9 @@ static void SyncNodeMetadata(MetadataSyncContext *context);
static void SetWorkerColumnViaMetadataContext(MetadataSyncContext *context,
WorkerNode *workerNode,
int columnIndex, Datum value);
static void MarkNodesNotSyncedInLoopBackConnection(MetadataSyncContext *context);
static void MarkNodesNotSyncedInLoopBackConnection(MetadataSyncContext *context, pid_t
parentSessionPid);
static void EnsureParentSessionHasExclusiveLockOnPgDistNode(pid_t parentSessionPid);
static void SetNodeMetadata(MetadataSyncContext *context, bool localOnly);
static void EnsureTransactionalMetadataSyncMode(void);
@ -152,6 +156,7 @@ PG_FUNCTION_INFO_V1(citus_nodename_for_nodeid);
PG_FUNCTION_INFO_V1(citus_nodeport_for_nodeid);
PG_FUNCTION_INFO_V1(citus_coordinator_nodeid);
PG_FUNCTION_INFO_V1(citus_is_coordinator);
PG_FUNCTION_INFO_V1(citus_internal_mark_node_not_synced);
/*
@ -836,10 +841,11 @@ PrimaryNodeForGroup(int32 groupId, bool *groupContainsNodes)
/*
* MarkNodesNotSyncedInLoopBackConnection unsets metadatasynced flag in separate
* connection to localhost.
* connection to localhost by calling the udf `citus_internal_mark_node_not_synced`.
*/
static void
MarkNodesNotSyncedInLoopBackConnection(MetadataSyncContext *context)
MarkNodesNotSyncedInLoopBackConnection(MetadataSyncContext *context, pid_t
parentSessionPid)
{
Assert(context->transactionMode == METADATA_SYNC_NON_TRANSACTIONAL);
Assert(!MetadataSyncCollectsCommands(context));
@ -865,16 +871,24 @@ MarkNodesNotSyncedInLoopBackConnection(MetadataSyncContext *context)
MultiConnection *connection = GetNodeConnection(connectionFlag, LocalHostName,
PostPortNumber);
bool metadatasynced = false;
List *commandList = NIL;
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, context->activatedWorkerNodeList)
{
char *metadatasyncCommand = NodeHasmetadataUpdateCommand(workerNode->nodeId,
metadatasynced);
List *commandList = list_make1(metadatasyncCommand);
SendCommandListToWorkerOutsideTransactionWithConnection(connection, commandList);
/*
* We need to prevent self deadlock when we access pg_dist_node using separate
* connection to localhost. To achieve this, we check if the caller session's
* pid holds the Exclusive lock on pg_dist_node. After ensuring that (we are
* called from parent session which holds the Exclusive lock), we can safely
* update node metadata by acquiring lower level of lock.
*/
StringInfo metadatasyncCommand = makeStringInfo();
appendStringInfo(metadatasyncCommand, CITUS_INTERNAL_MARK_NODE_NOT_SYNCED,
parentSessionPid, workerNode->nodeId);
commandList = lappend(commandList, metadatasyncCommand->data);
}
SendCommandListToWorkerOutsideTransactionWithConnection(connection, commandList);
CloseConnection(connection);
}
@ -972,6 +986,18 @@ ActivateNodeList(MetadataSyncContext *context)
*/
EnsureSuperUser();
/*
* Take an exclusive lock on pg_dist_node to serialize pg_dist_node
* changes.
*/
LockRelationOid(DistNodeRelationId(), ExclusiveLock);
/*
* Error if there is concurrent change to node table before acquiring
* the lock
*/
ErrorIfAnyNodeNotExist(context->activatedWorkerNodeList);
/*
* we need to unset metadatasynced flag to false at coordinator in separate
* transaction only at nontransactional sync mode and if we do not collect
@ -984,22 +1010,9 @@ ActivateNodeList(MetadataSyncContext *context)
if (context->transactionMode == METADATA_SYNC_NON_TRANSACTIONAL &&
!MetadataSyncCollectsCommands(context))
{
MarkNodesNotSyncedInLoopBackConnection(context);
MarkNodesNotSyncedInLoopBackConnection(context, MyProcPid);
}
/*
* Take an exclusive lock on pg_dist_node to serialize pg_dist_node
* changes. We should not acquire the lock before deactivating
* metadata nodes as it causes deadlock.
*/
LockRelationOid(DistNodeRelationId(), ExclusiveLock);
/*
* Error if there is concurrent change to node table before acquiring
* the lock
*/
ErrorIfAnyNodeNotExist(context->activatedWorkerNodeList);
/*
* Delete existing reference and replicated table placements on the
* given groupId if the group has been disabled earlier (e.g., isActive
@ -1496,6 +1509,101 @@ citus_is_coordinator(PG_FUNCTION_ARGS)
}
/*
* EnsureParentSessionHasExclusiveLockOnPgDistNode ensures given session id
* holds Exclusive lock on pg_dist_node.
*/
static void
EnsureParentSessionHasExclusiveLockOnPgDistNode(pid_t parentSessionPid)
{
StringInfo checkIfParentLockCommandStr = makeStringInfo();
int spiConnectionResult = SPI_connect();
if (spiConnectionResult != SPI_OK_CONNECT)
{
ereport(ERROR, (errmsg("could not connect to SPI manager")));
}
char *checkIfParentLockCommand = "SELECT pid FROM pg_locks WHERE "
"database = %d AND relation = %d AND "
"mode = 'ExclusiveLock' AND granted = TRUE";
appendStringInfo(checkIfParentLockCommandStr, checkIfParentLockCommand,
MyDatabaseId, DistNodeRelationId());
bool readOnly = true;
int spiQueryResult = SPI_execute(checkIfParentLockCommandStr->data, readOnly, 0);
if (spiQueryResult != SPI_OK_SELECT)
{
ereport(ERROR, (errmsg("execution was not successful \"%s\"",
checkIfParentLockCommandStr->data)));
}
bool parentHasExclusiveLock = SPI_processed > 0;
SPI_finish();
if (!parentHasExclusiveLock)
{
ereport(ERROR, (errmsg("lock is not held by the caller. Unexpected caller "
"for citus_internal_mark_node_not_synced")));
}
}
/*
* citus_internal_mark_node_not_synced unsets metadatasynced flag in separate
* connection to localhost. Should only be called by
* `MarkNodesNotSyncedInLoopBackConnection`. See it for details.
*/
Datum
citus_internal_mark_node_not_synced(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
/* only called by superuser */
EnsureSuperUser();
pid_t parentSessionPid = PG_GETARG_INT32(0);
/* ensure that parent session holds Exclusive lock to pg_dist_node */
EnsureParentSessionHasExclusiveLockOnPgDistNode(parentSessionPid);
/*
* We made sure parent session holds the ExclusiveLock, so we can update
* pg_dist_node safely with low level lock here.
*/
int nodeId = PG_GETARG_INT32(1);
HeapTuple heapTuple = GetNodeByNodeId(nodeId);
if (heapTuple == NULL)
{
ereport(ERROR, (errmsg("could not find valid entry for node id %d", nodeId)));
}
Relation pgDistNode = table_open(DistNodeRelationId(), AccessShareLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
Datum values[Natts_pg_dist_node];
bool isnull[Natts_pg_dist_node];
bool replace[Natts_pg_dist_node];
memset(replace, 0, sizeof(replace));
values[Anum_pg_dist_node_metadatasynced - 1] = DatumGetBool(false);
isnull[Anum_pg_dist_node_metadatasynced - 1] = false;
replace[Anum_pg_dist_node_metadatasynced - 1] = true;
heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);
CatalogTupleUpdate(pgDistNode, &heapTuple->t_self, heapTuple);
CitusInvalidateRelcacheByRelid(DistNodeRelationId());
CommandCounterIncrement();
table_close(pgDistNode, NoLock);
PG_RETURN_VOID();
}
/*
* FindWorkerNode searches over the worker nodes and returns the workerNode
* if it already exists. Else, the function returns NULL.
@ -2282,6 +2390,37 @@ GetNodeTuple(const char *nodeName, int32 nodePort)
}
/*
* GetNodeByNodeId returns the heap tuple for given node id by looking up catalog.
*/
static HeapTuple
GetNodeByNodeId(int32 nodeId)
{
Relation pgDistNode = table_open(DistNodeRelationId(), AccessShareLock);
const int scanKeyCount = 1;
const bool indexOK = false;
ScanKeyData scanKey[1];
HeapTuple nodeTuple = NULL;
ScanKeyInit(&scanKey[0], Anum_pg_dist_node_nodeid,
BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(nodeId));
SysScanDesc scanDescriptor = systable_beginscan(pgDistNode, InvalidOid, indexOK,
NULL, scanKeyCount, scanKey);
HeapTuple heapTuple = systable_getnext(scanDescriptor);
if (HeapTupleIsValid(heapTuple))
{
nodeTuple = heap_copytuple(heapTuple);
}
systable_endscan(scanDescriptor);
table_close(pgDistNode, NoLock);
return nodeTuple;
}
/*
* GetNextGroupId allocates and returns a unique groupId for the group
* to be created. This allocation occurs both in shared memory and in write

View File

@ -9,3 +9,4 @@ ALTER TABLE pg_catalog.pg_dist_shard REPLICA IDENTITY USING INDEX pg_dist_shard_
ALTER TABLE pg_catalog.pg_dist_transaction REPLICA IDENTITY USING INDEX pg_dist_transaction_unique_constraint;
#include "udfs/worker_drop_all_shell_tables/11.3-1.sql"
#include "udfs/citus_internal_mark_node_not_synced/11.3-1.sql"

View File

@ -19,3 +19,4 @@ ALTER TABLE pg_catalog.pg_dist_shard REPLICA IDENTITY NOTHING;
ALTER TABLE pg_catalog.pg_dist_transaction REPLICA IDENTITY NOTHING;
DROP PROCEDURE pg_catalog.worker_drop_all_shell_tables(bool);
DROP FUNCTION pg_catalog.citus_internal_mark_node_not_synced(int, int);

View File

@ -0,0 +1,6 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_mark_node_not_synced(parent_pid int, nodeid int)
RETURNS VOID
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_internal_mark_node_not_synced$$;
COMMENT ON FUNCTION citus_internal_mark_node_not_synced(int, int)
IS 'marks given node not synced by unsetting metadatasynced column at the start of the nontransactional sync.';

View File

@ -0,0 +1,6 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_mark_node_not_synced(parent_pid int, nodeid int)
RETURNS VOID
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_internal_mark_node_not_synced$$;
COMMENT ON FUNCTION citus_internal_mark_node_not_synced(int, int)
IS 'marks given node not synced by unsetting metadatasynced column at the start of the nontransactional sync.';

View File

@ -173,6 +173,8 @@ extern void SendInterTableRelationshipCommands(MetadataSyncContext *context);
#define DELETE_ALL_COLOCATION "DELETE FROM pg_catalog.pg_dist_colocation"
#define WORKER_DROP_ALL_SHELL_TABLES \
"CALL pg_catalog.worker_drop_all_shell_tables(%s)"
#define CITUS_INTERNAL_MARK_NODE_NOT_SYNCED \
"SELECT citus_internal_mark_node_not_synced(%d, %d)"
#define REMOVE_ALL_CITUS_TABLES_COMMAND \
"SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition"

View File

@ -1363,11 +1363,12 @@ SELECT * FROM multi_extension.print_extension_changes();
previous_object | current_object
---------------------------------------------------------------------
| function citus_internal_is_replication_origin_tracking_active() boolean
| function citus_internal_mark_node_not_synced(integer,integer) void
| function citus_internal_start_replication_origin_tracking() void
| function citus_internal_stop_replication_origin_tracking() void
| function worker_adjust_identity_column_seq_ranges(regclass) void
| function worker_drop_all_shell_tables(boolean)
(5 rows)
(6 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version

View File

@ -76,6 +76,7 @@ ORDER BY 1;
function citus_internal_global_blocked_processes()
function citus_internal_is_replication_origin_tracking_active()
function citus_internal_local_blocked_processes()
function citus_internal_mark_node_not_synced(integer,integer)
function citus_internal_start_replication_origin_tracking()
function citus_internal_stop_replication_origin_tracking()
function citus_internal_update_placement_metadata(bigint,integer,integer)
@ -323,5 +324,5 @@ ORDER BY 1;
view citus_stat_statements
view pg_dist_shard_placement
view time_partitions
(314 rows)
(316 rows)