diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 581c04c8a..da3e02853 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -48,6 +48,7 @@ #include "distributed/version_compat.h" #include "distributed/worker_manager.h" #include "distributed/worker_transaction.h" +#include "executor/spi.h" #include "lib/stringinfo.h" #include "postmaster/postmaster.h" #include "storage/bufmgr.h" @@ -94,6 +95,7 @@ static int AddNodeMetadata(char *nodeName, int32 nodePort, NodeMetadata *nodeMetadata, bool *nodeAlreadyExists); static WorkerNode * SetNodeState(char *nodeName, int32 nodePort, bool isActive); static HeapTuple GetNodeTuple(const char *nodeName, int32 nodePort); +static HeapTuple GetNodeByNodeId(int32 nodeId); static int32 GetNextGroupId(void); static int GetNextNodeId(void); static void InsertPlaceholderCoordinatorRecord(void); @@ -125,7 +127,9 @@ static void SyncNodeMetadata(MetadataSyncContext *context); static void SetWorkerColumnViaMetadataContext(MetadataSyncContext *context, WorkerNode *workerNode, int columnIndex, Datum value); -static void MarkNodesNotSyncedInLoopBackConnection(MetadataSyncContext *context); +static void MarkNodesNotSyncedInLoopBackConnection(MetadataSyncContext *context, pid_t + parentSessionPid); +static void EnsureParentSessionHasExclusiveLockOnPgDistNode(pid_t parentSessionPid); static void SetNodeMetadata(MetadataSyncContext *context, bool localOnly); static void EnsureTransactionalMetadataSyncMode(void); @@ -152,6 +156,7 @@ PG_FUNCTION_INFO_V1(citus_nodename_for_nodeid); PG_FUNCTION_INFO_V1(citus_nodeport_for_nodeid); PG_FUNCTION_INFO_V1(citus_coordinator_nodeid); PG_FUNCTION_INFO_V1(citus_is_coordinator); +PG_FUNCTION_INFO_V1(citus_internal_mark_node_not_synced); /* @@ -836,10 +841,11 @@ PrimaryNodeForGroup(int32 groupId, bool *groupContainsNodes) /* * MarkNodesNotSyncedInLoopBackConnection unsets metadatasynced flag in separate - * connection to localhost. + * connection to localhost by calling the udf `citus_internal_mark_node_not_synced`. */ static void -MarkNodesNotSyncedInLoopBackConnection(MetadataSyncContext *context) +MarkNodesNotSyncedInLoopBackConnection(MetadataSyncContext *context, pid_t + parentSessionPid) { Assert(context->transactionMode == METADATA_SYNC_NON_TRANSACTIONAL); Assert(!MetadataSyncCollectsCommands(context)); @@ -865,16 +871,24 @@ MarkNodesNotSyncedInLoopBackConnection(MetadataSyncContext *context) MultiConnection *connection = GetNodeConnection(connectionFlag, LocalHostName, PostPortNumber); - bool metadatasynced = false; + List *commandList = NIL; WorkerNode *workerNode = NULL; foreach_ptr(workerNode, context->activatedWorkerNodeList) { - char *metadatasyncCommand = NodeHasmetadataUpdateCommand(workerNode->nodeId, - metadatasynced); - List *commandList = list_make1(metadatasyncCommand); - SendCommandListToWorkerOutsideTransactionWithConnection(connection, commandList); + /* + * We need to prevent self deadlock when we access pg_dist_node using separate + * connection to localhost. To achieve this, we check if the caller session's + * pid holds the Exclusive lock on pg_dist_node. After ensuring that (we are + * called from parent session which holds the Exclusive lock), we can safely + * update node metadata by acquiring lower level of lock. + */ + StringInfo metadatasyncCommand = makeStringInfo(); + appendStringInfo(metadatasyncCommand, CITUS_INTERNAL_MARK_NODE_NOT_SYNCED, + parentSessionPid, workerNode->nodeId); + commandList = lappend(commandList, metadatasyncCommand->data); } + SendCommandListToWorkerOutsideTransactionWithConnection(connection, commandList); CloseConnection(connection); } @@ -972,6 +986,18 @@ ActivateNodeList(MetadataSyncContext *context) */ EnsureSuperUser(); + /* + * Take an exclusive lock on pg_dist_node to serialize pg_dist_node + * changes. + */ + LockRelationOid(DistNodeRelationId(), ExclusiveLock); + + /* + * Error if there is concurrent change to node table before acquiring + * the lock + */ + ErrorIfAnyNodeNotExist(context->activatedWorkerNodeList); + /* * we need to unset metadatasynced flag to false at coordinator in separate * transaction only at nontransactional sync mode and if we do not collect @@ -984,22 +1010,9 @@ ActivateNodeList(MetadataSyncContext *context) if (context->transactionMode == METADATA_SYNC_NON_TRANSACTIONAL && !MetadataSyncCollectsCommands(context)) { - MarkNodesNotSyncedInLoopBackConnection(context); + MarkNodesNotSyncedInLoopBackConnection(context, MyProcPid); } - /* - * Take an exclusive lock on pg_dist_node to serialize pg_dist_node - * changes. We should not acquire the lock before deactivating - * metadata nodes as it causes deadlock. - */ - LockRelationOid(DistNodeRelationId(), ExclusiveLock); - - /* - * Error if there is concurrent change to node table before acquiring - * the lock - */ - ErrorIfAnyNodeNotExist(context->activatedWorkerNodeList); - /* * Delete existing reference and replicated table placements on the * given groupId if the group has been disabled earlier (e.g., isActive @@ -1496,6 +1509,101 @@ citus_is_coordinator(PG_FUNCTION_ARGS) } +/* + * EnsureParentSessionHasExclusiveLockOnPgDistNode ensures given session id + * holds Exclusive lock on pg_dist_node. + */ +static void +EnsureParentSessionHasExclusiveLockOnPgDistNode(pid_t parentSessionPid) +{ + StringInfo checkIfParentLockCommandStr = makeStringInfo(); + + int spiConnectionResult = SPI_connect(); + if (spiConnectionResult != SPI_OK_CONNECT) + { + ereport(ERROR, (errmsg("could not connect to SPI manager"))); + } + + char *checkIfParentLockCommand = "SELECT pid FROM pg_locks WHERE " + "database = %d AND relation = %d AND " + "mode = 'ExclusiveLock' AND granted = TRUE"; + appendStringInfo(checkIfParentLockCommandStr, checkIfParentLockCommand, + MyDatabaseId, DistNodeRelationId()); + + bool readOnly = true; + int spiQueryResult = SPI_execute(checkIfParentLockCommandStr->data, readOnly, 0); + if (spiQueryResult != SPI_OK_SELECT) + { + ereport(ERROR, (errmsg("execution was not successful \"%s\"", + checkIfParentLockCommandStr->data))); + } + + bool parentHasExclusiveLock = SPI_processed > 0; + + SPI_finish(); + + if (!parentHasExclusiveLock) + { + ereport(ERROR, (errmsg("lock is not held by the caller. Unexpected caller " + "for citus_internal_mark_node_not_synced"))); + } +} + + +/* + * citus_internal_mark_node_not_synced unsets metadatasynced flag in separate + * connection to localhost. Should only be called by + * `MarkNodesNotSyncedInLoopBackConnection`. See it for details. + */ +Datum +citus_internal_mark_node_not_synced(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + + /* only called by superuser */ + EnsureSuperUser(); + + pid_t parentSessionPid = PG_GETARG_INT32(0); + + /* ensure that parent session holds Exclusive lock to pg_dist_node */ + EnsureParentSessionHasExclusiveLockOnPgDistNode(parentSessionPid); + + /* + * We made sure parent session holds the ExclusiveLock, so we can update + * pg_dist_node safely with low level lock here. + */ + int nodeId = PG_GETARG_INT32(1); + HeapTuple heapTuple = GetNodeByNodeId(nodeId); + if (heapTuple == NULL) + { + ereport(ERROR, (errmsg("could not find valid entry for node id %d", nodeId))); + } + + Relation pgDistNode = table_open(DistNodeRelationId(), AccessShareLock); + TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode); + + Datum values[Natts_pg_dist_node]; + bool isnull[Natts_pg_dist_node]; + bool replace[Natts_pg_dist_node]; + + memset(replace, 0, sizeof(replace)); + values[Anum_pg_dist_node_metadatasynced - 1] = DatumGetBool(false); + isnull[Anum_pg_dist_node_metadatasynced - 1] = false; + replace[Anum_pg_dist_node_metadatasynced - 1] = true; + + heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace); + + CatalogTupleUpdate(pgDistNode, &heapTuple->t_self, heapTuple); + + CitusInvalidateRelcacheByRelid(DistNodeRelationId()); + CommandCounterIncrement(); + + table_close(pgDistNode, NoLock); + + PG_RETURN_VOID(); +} + + /* * FindWorkerNode searches over the worker nodes and returns the workerNode * if it already exists. Else, the function returns NULL. @@ -2282,6 +2390,37 @@ GetNodeTuple(const char *nodeName, int32 nodePort) } +/* + * GetNodeByNodeId returns the heap tuple for given node id by looking up catalog. + */ +static HeapTuple +GetNodeByNodeId(int32 nodeId) +{ + Relation pgDistNode = table_open(DistNodeRelationId(), AccessShareLock); + const int scanKeyCount = 1; + const bool indexOK = false; + + ScanKeyData scanKey[1]; + HeapTuple nodeTuple = NULL; + + ScanKeyInit(&scanKey[0], Anum_pg_dist_node_nodeid, + BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(nodeId)); + SysScanDesc scanDescriptor = systable_beginscan(pgDistNode, InvalidOid, indexOK, + NULL, scanKeyCount, scanKey); + + HeapTuple heapTuple = systable_getnext(scanDescriptor); + if (HeapTupleIsValid(heapTuple)) + { + nodeTuple = heap_copytuple(heapTuple); + } + + systable_endscan(scanDescriptor); + table_close(pgDistNode, NoLock); + + return nodeTuple; +} + + /* * GetNextGroupId allocates and returns a unique groupId for the group * to be created. This allocation occurs both in shared memory and in write diff --git a/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql b/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql index a6f7d7725..bbaf0ce4d 100644 --- a/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql +++ b/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql @@ -9,3 +9,4 @@ ALTER TABLE pg_catalog.pg_dist_shard REPLICA IDENTITY USING INDEX pg_dist_shard_ ALTER TABLE pg_catalog.pg_dist_transaction REPLICA IDENTITY USING INDEX pg_dist_transaction_unique_constraint; #include "udfs/worker_drop_all_shell_tables/11.3-1.sql" +#include "udfs/citus_internal_mark_node_not_synced/11.3-1.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-1.sql b/src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-1.sql index e89e54ab5..322613e5f 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-1.sql @@ -19,3 +19,4 @@ ALTER TABLE pg_catalog.pg_dist_shard REPLICA IDENTITY NOTHING; ALTER TABLE pg_catalog.pg_dist_transaction REPLICA IDENTITY NOTHING; DROP PROCEDURE pg_catalog.worker_drop_all_shell_tables(bool); +DROP FUNCTION pg_catalog.citus_internal_mark_node_not_synced(int, int); diff --git a/src/backend/distributed/sql/udfs/citus_internal_mark_node_not_synced/11.3-1.sql b/src/backend/distributed/sql/udfs/citus_internal_mark_node_not_synced/11.3-1.sql new file mode 100644 index 000000000..0d90c8f1a --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_internal_mark_node_not_synced/11.3-1.sql @@ -0,0 +1,6 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_mark_node_not_synced(parent_pid int, nodeid int) + RETURNS VOID + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$citus_internal_mark_node_not_synced$$; +COMMENT ON FUNCTION citus_internal_mark_node_not_synced(int, int) + IS 'marks given node not synced by unsetting metadatasynced column at the start of the nontransactional sync.'; diff --git a/src/backend/distributed/sql/udfs/citus_internal_mark_node_not_synced/latest.sql b/src/backend/distributed/sql/udfs/citus_internal_mark_node_not_synced/latest.sql new file mode 100644 index 000000000..0d90c8f1a --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_internal_mark_node_not_synced/latest.sql @@ -0,0 +1,6 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_mark_node_not_synced(parent_pid int, nodeid int) + RETURNS VOID + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$citus_internal_mark_node_not_synced$$; +COMMENT ON FUNCTION citus_internal_mark_node_not_synced(int, int) + IS 'marks given node not synced by unsetting metadatasynced column at the start of the nontransactional sync.'; diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index 6f8254071..90253b13e 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -173,6 +173,8 @@ extern void SendInterTableRelationshipCommands(MetadataSyncContext *context); #define DELETE_ALL_COLOCATION "DELETE FROM pg_catalog.pg_dist_colocation" #define WORKER_DROP_ALL_SHELL_TABLES \ "CALL pg_catalog.worker_drop_all_shell_tables(%s)" +#define CITUS_INTERNAL_MARK_NODE_NOT_SYNCED \ + "SELECT citus_internal_mark_node_not_synced(%d, %d)" #define REMOVE_ALL_CITUS_TABLES_COMMAND \ "SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition" diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 02021acd4..d0ed4f82a 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1363,11 +1363,12 @@ SELECT * FROM multi_extension.print_extension_changes(); previous_object | current_object --------------------------------------------------------------------- | function citus_internal_is_replication_origin_tracking_active() boolean + | function citus_internal_mark_node_not_synced(integer,integer) void | function citus_internal_start_replication_origin_tracking() void | function citus_internal_stop_replication_origin_tracking() void | function worker_adjust_identity_column_seq_ranges(regclass) void | function worker_drop_all_shell_tables(boolean) -(5 rows) +(6 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 92b91ccfc..a234c4bac 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -76,6 +76,7 @@ ORDER BY 1; function citus_internal_global_blocked_processes() function citus_internal_is_replication_origin_tracking_active() function citus_internal_local_blocked_processes() + function citus_internal_mark_node_not_synced(integer,integer) function citus_internal_start_replication_origin_tracking() function citus_internal_stop_replication_origin_tracking() function citus_internal_update_placement_metadata(bigint,integer,integer) @@ -323,5 +324,5 @@ ORDER BY 1; view citus_stat_statements view pg_dist_shard_placement view time_partitions -(314 rows) +(316 rows)