diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 1c0920841..8f1e38be1 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -1263,24 +1263,28 @@ DetachPartitionCommandList(void) /* * SyncMetadataToNodes tries recreating the metadata snapshot in the - * metadata workers that are out of sync. Returns false if synchronization - * to at least one of the workers fails. + * metadata workers that are out of sync. Returns the result of + * synchronization. */ -bool +MetadataSyncResult SyncMetadataToNodes(void) { List *workerList = NIL; ListCell *workerCell = NULL; - bool result = true; + MetadataSyncResult result = METADATA_SYNC_SUCCESS; if (!IsCoordinator()) { - return true; + return METADATA_SYNC_SUCCESS; } - LockRelationOid(DistNodeRelationId(), ExclusiveLock); + if (!ConditionalLockRelationOid(DistNodeRelationId(), AccessShareLock)) + { + return METADATA_SYNC_FAILED_LOCK; + } workerList = ActivePrimaryNodeList(NoLock); + foreach(workerCell, workerList) { WorkerNode *workerNode = lfirst(workerCell); @@ -1288,9 +1292,20 @@ SyncMetadataToNodes(void) if (workerNode->hasMetadata && !workerNode->metadataSynced) { bool raiseInterrupts = false; - if (!SyncMetadataSnapshotToNode(workerNode, raiseInterrupts)) + + /* + * Request a RowExclusiveLock so we don't run concurrently with other + * functions updating pg_dist_node, but allow concurrency with functions + * which are just reading from pg_dist_node. 
+ */ if (!ConditionalLockRelationOid(DistNodeRelationId(), RowExclusiveLock)) { - result = false; + result = METADATA_SYNC_FAILED_LOCK; + break; + } + else if (!SyncMetadataSnapshotToNode(workerNode, raiseInterrupts)) + { + result = METADATA_SYNC_FAILED_SYNC; } else { diff --git a/src/backend/distributed/utils/maintenanced.c b/src/backend/distributed/utils/maintenanced.c index 9b640c5d7..6edcb324e 100644 --- a/src/backend/distributed/utils/maintenanced.c +++ b/src/backend/distributed/utils/maintenanced.c @@ -388,8 +388,17 @@ CitusMaintenanceDaemonMain(Datum main_arg) } else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded()) { - metadataSyncFailed = !SyncMetadataToNodes(); - Async_Notify(METADATA_SYNC_CHANNEL, NULL); + MetadataSyncResult result = SyncMetadataToNodes(); + metadataSyncFailed = (result != METADATA_SYNC_SUCCESS); + + /* + * Only send the notification when a synchronization attempt was actually + * made, i.e. when we were not blocked on access to pg_dist_node. + */ + if (result != METADATA_SYNC_FAILED_LOCK) + { + Async_Notify(METADATA_SYNC_CHANNEL, NULL); + } } PopActiveSnapshot(); diff --git a/src/backend/distributed/utils/node_metadata.c b/src/backend/distributed/utils/node_metadata.c index 325fef9df..d91fc4f89 100644 --- a/src/backend/distributed/utils/node_metadata.c +++ b/src/backend/distributed/utils/node_metadata.c @@ -1633,6 +1633,11 @@ UnsetMetadataSyncedForAll(void) HeapTuple heapTuple = NULL; TupleDesc tupleDescriptor = NULL; + /* + * Concurrent master_update_node() calls might iterate and try to update + * pg_dist_node in different orders. To protect against deadlock, we + * get an exclusive lock here. 
+ */ relation = heap_open(DistNodeRelationId(), ExclusiveLock); tupleDescriptor = RelationGetDescr(relation); ScanKeyInit(&scanKey[0], Anum_pg_dist_node_hasmetadata, diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index aaf79d8b1..3284cf5e2 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -20,6 +20,13 @@ extern int MetadataSyncInterval; extern int MetadataSyncRetryInterval; +typedef enum +{ + METADATA_SYNC_SUCCESS = 0, + METADATA_SYNC_FAILED_LOCK = 1, + METADATA_SYNC_FAILED_SYNC = 2 +} MetadataSyncResult; + /* Functions declarations for metadata syncing */ extern void StartMetadatSyncToNode(char *nodeNameString, int32 nodePort); extern bool ClusterHasKnownMetadataWorkers(void); @@ -43,7 +50,7 @@ extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, int sha extern void CreateTableMetadataOnWorkers(Oid relationId); extern void MarkNodeHasMetadata(char *nodeName, int32 nodePort, bool hasMetadata); extern void MarkNodeMetadataSynced(char *nodeName, int32 nodePort, bool synced); -extern bool SyncMetadataToNodes(void); +extern MetadataSyncResult SyncMetadataToNodes(void); extern bool SendOptionalCommandListToWorkerInTransaction(char *nodeName, int32 nodePort, char *nodeUser, List *commandList); diff --git a/src/test/regress/expected/distributed_functions.out b/src/test/regress/expected/distributed_functions.out index 41ba05440..7942bf846 100644 --- a/src/test/regress/expected/distributed_functions.out +++ b/src/test/regress/expected/distributed_functions.out @@ -13,6 +13,19 @@ CREATE SCHEMA function_tests AUTHORIZATION functionuser; CREATE SCHEMA function_tests2 AUTHORIZATION functionuser; SET search_path TO function_tests; SET citus.shard_count TO 4; +-- set sync intervals to less than 15s so wait_until_metadata_sync never times out +ALTER SYSTEM SET citus.metadata_sync_interval TO 3000; +ALTER SYSTEM SET citus.metadata_sync_retry_interval TO 500; 
+SELECT pg_reload_conf(); + pg_reload_conf +---------------- + t +(1 row) + +CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000) + RETURNS void + LANGUAGE C STRICT + AS 'citus'; -- Create and distribute a simple function CREATE FUNCTION add(integer, integer) RETURNS integer AS 'select $1 + $2;' @@ -564,6 +577,13 @@ SET citus.shard_count TO 55; SELECT create_distributed_function('add_with_param_names(int, int)', 'val1'); ERROR: cannot distribute the function "add_with_param_names" since there is no table to colocate with HINT: Provide a distributed table via "colocate_with" option to create_distributed_function() +-- sync metadata to workers for consistent results when clearing objects +SELECT wait_until_metadata_sync(); + wait_until_metadata_sync +-------------------------- + +(1 row) + -- clear objects SELECT stop_metadata_sync_to_node(nodename,nodeport) FROM pg_dist_node WHERE isactive AND noderole = 'primary'; stop_metadata_sync_to_node diff --git a/src/test/regress/sql/distributed_functions.sql b/src/test/regress/sql/distributed_functions.sql index b22f8fc10..65b592b3b 100644 --- a/src/test/regress/sql/distributed_functions.sql +++ b/src/test/regress/sql/distributed_functions.sql @@ -9,6 +9,16 @@ CREATE SCHEMA function_tests2 AUTHORIZATION functionuser; SET search_path TO function_tests; SET citus.shard_count TO 4; +-- set sync intervals to less than 15s so wait_until_metadata_sync never times out +ALTER SYSTEM SET citus.metadata_sync_interval TO 3000; +ALTER SYSTEM SET citus.metadata_sync_retry_interval TO 500; +SELECT pg_reload_conf(); + +CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000) + RETURNS void + LANGUAGE C STRICT + AS 'citus'; + -- Create and distribute a simple function CREATE FUNCTION add(integer, integer) RETURNS integer AS 'select $1 + $2;' @@ -292,6 +302,9 @@ SELECT create_distributed_function('add_polygons(polygon,polygon)', '$1', coloca SET citus.shard_count TO 55; SELECT 
create_distributed_function('add_with_param_names(int, int)', 'val1'); +-- sync metadata to workers for consistent results when clearing objects +SELECT wait_until_metadata_sync(); + -- clear objects SELECT stop_metadata_sync_to_node(nodename,nodeport) FROM pg_dist_node WHERE isactive AND noderole = 'primary';