mirror of https://github.com/citusdata/citus.git
Don't block for locks in SyncMetadataToNodes()
parent ae915493e6
commit 217db2a03e
@@ -1263,24 +1263,28 @@ DetachPartitionCommandList(void)
 /*
  * SyncMetadataToNodes tries recreating the metadata snapshot in the
- * metadata workers that are out of sync. Returns false if synchronization
- * to at least one of the workers fails.
+ * metadata workers that are out of sync. Returns the result of
+ * synchronization.
  */
-bool
+MetadataSyncResult
 SyncMetadataToNodes(void)
 {
     List *workerList = NIL;
     ListCell *workerCell = NULL;
-    bool result = true;
+    MetadataSyncResult result = METADATA_SYNC_SUCCESS;
 
     if (!IsCoordinator())
     {
-        return true;
+        return METADATA_SYNC_SUCCESS;
     }
 
-    LockRelationOid(DistNodeRelationId(), ExclusiveLock);
+    if (!ConditionalLockRelationOid(DistNodeRelationId(), AccessShareLock))
+    {
+        return METADATA_SYNC_FAILED_LOCK;
+    }
 
     workerList = ActivePrimaryNodeList(NoLock);
 
     foreach(workerCell, workerList)
     {
         WorkerNode *workerNode = lfirst(workerCell);
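ConditionalLockRelationOid() (PostgreSQL's lmgr API) attempts the lock without queueing: it returns false immediately when a conflicting lock is held, so the maintenance daemon can back off and retry on its next pass instead of blocking behind, say, a long-running node update. A minimal sketch of that pattern, with RetryableMaintenanceTask() as a hypothetical caller:

#include "postgres.h"
#include "storage/lmgr.h"
#include "storage/lockdefs.h"

/* Hypothetical caller showing the try-lock-and-back-off pattern. */
static bool
RetryableMaintenanceTask(Oid relationId)
{
    /* fails fast instead of waiting in the lock queue */
    if (!ConditionalLockRelationOid(relationId, AccessShareLock))
    {
        return false;            /* caller reschedules the task */
    }

    /* ... do the work; the lock is released at transaction end ... */
    return true;
}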
@@ -1288,9 +1292,20 @@ SyncMetadataToNodes(void)
         if (workerNode->hasMetadata && !workerNode->metadataSynced)
         {
             bool raiseInterrupts = false;
-            if (!SyncMetadataSnapshotToNode(workerNode, raiseInterrupts))
+
+            /*
+             * Request a RowExclusiveLock so we don't run concurrently with other
+             * functions updating pg_dist_node, but allow concurrency with functions
+             * which are just reading from pg_dist_node.
+             */
+            if (!ConditionalLockRelationOid(DistNodeRelationId(), RowExclusiveLock))
+            {
+                result = METADATA_SYNC_FAILED_LOCK;
+                break;
+            }
+            else if (!SyncMetadataSnapshotToNode(workerNode, raiseInterrupts))
             {
-                result = false;
+                result = METADATA_SYNC_FAILED_SYNC;
             }
             else
             {
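Note the asymmetry the new loop introduces: a failed lock attempt abandons the whole pass (the daemon retries everything later), while a failed snapshot sync only records the failure and moves on to the next worker. A compressed control-flow sketch, with TryLockDistNode() and SyncNode() as hypothetical stand-ins for the calls above:

foreach(workerCell, workerList)
{
    WorkerNode *workerNode = lfirst(workerCell);

    if (!TryLockDistNode())                  /* hypothetical stand-in */
    {
        result = METADATA_SYNC_FAILED_LOCK;
        break;                               /* give up the whole pass */
    }
    else if (!SyncNode(workerNode))          /* hypothetical stand-in */
    {
        result = METADATA_SYNC_FAILED_SYNC;  /* record it, keep going */
    }
}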
@@ -388,8 +388,17 @@ CitusMaintenanceDaemonMain(Datum main_arg)
             }
             else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded())
             {
-                metadataSyncFailed = !SyncMetadataToNodes();
-                Async_Notify(METADATA_SYNC_CHANNEL, NULL);
+                MetadataSyncResult result = SyncMetadataToNodes();
+                metadataSyncFailed = (result != METADATA_SYNC_SUCCESS);
+
+                /*
+                 * Notification means we had an attempt on synchronization
+                 * without being blocked for pg_dist_node access.
+                 */
+                if (result != METADATA_SYNC_FAILED_LOCK)
+                {
+                    Async_Notify(METADATA_SYNC_CHANNEL, NULL);
+                }
             }
 
             PopActiveSnapshot();
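metadataSyncFailed feeds the daemon's scheduling: MetadataSyncRetryInterval (declared in the header hunk below) lets a failed pass be retried sooner than the regular MetadataSyncInterval. A plausible sketch of that wait, assuming the daemon sleeps on its latch between passes (the scheduling code itself is not part of this diff):

/* retry sooner after a failed pass; the two variables back the GUCs
 * citus.metadata_sync_interval and citus.metadata_sync_retry_interval */
long timeoutMs = metadataSyncFailed ? MetadataSyncRetryInterval : MetadataSyncInterval;

(void) WaitLatch(MyLatch,
                 WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
                 timeoutMs, PG_WAIT_EXTENSION);
ResetLatch(MyLatch);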
@@ -1633,6 +1633,11 @@ UnsetMetadataSyncedForAll(void)
     HeapTuple heapTuple = NULL;
     TupleDesc tupleDescriptor = NULL;
 
+    /*
+     * Concurrent master_update_node() calls might iterate and try to update
+     * pg_dist_node in different orders. To protect against deadlock, we
+     * get an exclusive lock here.
+     */
     relation = heap_open(DistNodeRelationId(), ExclusiveLock);
     tupleDescriptor = RelationGetDescr(relation);
     ScanKeyInit(&scanKey[0], Anum_pg_dist_node_hasmetadata,
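ExclusiveLock conflicts with itself and with the RowExclusiveLock taken per worker in SyncMetadataToNodes(), so concurrent updaters of pg_dist_node serialize here rather than deadlocking on rows locked in opposite orders; plain AccessShareLock readers still proceed. The rest of the function is unchanged context; presumably it continues with the usual catalog-scan pattern, roughly like this sketch (assumed details, not lines from the commit):

scanDescriptor = systable_beginscan(relation, InvalidOid, false, NULL, 1, scanKey);
while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor)))
{
    /* form a modified tuple with metadatasynced = false and
     * CatalogTupleUpdate() it (elided in this sketch) */
}
systable_endscan(scanDescriptor);
heap_close(relation, NoLock);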
@@ -20,6 +20,13 @@
 extern int MetadataSyncInterval;
 extern int MetadataSyncRetryInterval;
 
+typedef enum
+{
+    METADATA_SYNC_SUCCESS = 0,
+    METADATA_SYNC_FAILED_LOCK = 1,
+    METADATA_SYNC_FAILED_SYNC = 2
+} MetadataSyncResult;
+
 /* Functions declarations for metadata syncing */
 extern void StartMetadatSyncToNode(char *nodeNameString, int32 nodePort);
 extern bool ClusterHasKnownMetadataWorkers(void);
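Zero is deliberately the success value, so the enum stays cheap to test while leaving room for more failure modes. A hypothetical helper, not part of the commit, that maps the result to text for log messages:

static const char *
MetadataSyncResultName(MetadataSyncResult result)
{
    switch (result)
    {
        case METADATA_SYNC_SUCCESS:
            return "success";
        case METADATA_SYNC_FAILED_LOCK:
            return "could not acquire pg_dist_node lock";
        case METADATA_SYNC_FAILED_SYNC:
            return "failed to sync to at least one node";
    }
    return "unknown";    /* unreachable with the values above */
}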
@@ -43,7 +50,7 @@ extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, int sha
 extern void CreateTableMetadataOnWorkers(Oid relationId);
 extern void MarkNodeHasMetadata(char *nodeName, int32 nodePort, bool hasMetadata);
 extern void MarkNodeMetadataSynced(char *nodeName, int32 nodePort, bool synced);
-extern bool SyncMetadataToNodes(void);
+extern MetadataSyncResult SyncMetadataToNodes(void);
 extern bool SendOptionalCommandListToWorkerInTransaction(char *nodeName, int32 nodePort,
                                                          char *nodeUser,
                                                          List *commandList);
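Changing the return type from bool to MetadataSyncResult quietly inverts the old truthiness idiom: METADATA_SYNC_SUCCESS is 0, so negating the return value would now report success as failure. That is why the maintenance-daemon hunk above compares explicitly; an illustrative contrast (not lines from the commit):

metadataSyncFailed = !SyncMetadataToNodes();                            /* old idiom: now inverted */
metadataSyncFailed = (SyncMetadataToNodes() != METADATA_SYNC_SUCCESS);  /* explicit and correct */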
@@ -13,6 +13,19 @@ CREATE SCHEMA function_tests AUTHORIZATION functionuser;
 CREATE SCHEMA function_tests2 AUTHORIZATION functionuser;
 SET search_path TO function_tests;
 SET citus.shard_count TO 4;
+-- set sync intervals to less than 15s so wait_until_metadata_sync never times out
+ALTER SYSTEM SET citus.metadata_sync_interval TO 3000;
+ALTER SYSTEM SET citus.metadata_sync_retry_interval TO 500;
+SELECT pg_reload_conf();
+ pg_reload_conf
+----------------
+ t
+(1 row)
+
+CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000)
+    RETURNS void
+    LANGUAGE C STRICT
+    AS 'citus';
 -- Create and distribute a simple function
 CREATE FUNCTION add(integer, integer) RETURNS integer
     AS 'select $1 + $2;'
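wait_until_metadata_sync is a test-only UDF shipped inside the citus shared library; the diff only adds its SQL declaration. As a rough picture of the C side, here is a standalone polling sketch, not the actual Citus implementation (which waits for the METADATA_SYNC_CHANNEL notification the daemon sends above): it re-reads pg_dist_node until every metadata node reports synced or the timeout lapses.

#include "postgres.h"
#include "fmgr.h"
#include "executor/spi.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "storage/latch.h"

PG_MODULE_MAGIC;
PG_FUNCTION_INFO_V1(wait_until_metadata_sync_sketch);

Datum
wait_until_metadata_sync_sketch(PG_FUNCTION_ARGS)
{
    int32 timeoutMs = PG_GETARG_INT32(0);
    int32 waitedMs = 0;

    while (waitedMs < timeoutMs)
    {
        bool allSynced = false;

        SPI_connect();
        if (SPI_execute("SELECT 1 FROM pg_dist_node "
                        "WHERE hasmetadata AND NOT metadatasynced",
                        true, 1) == SPI_OK_SELECT)
        {
            allSynced = (SPI_processed == 0);
        }
        SPI_finish();

        if (allSynced)
        {
            PG_RETURN_VOID();
        }

        /* sleep 100ms between checks, staying interruptible */
        (void) WaitLatch(MyLatch,
                         WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH,
                         100L, PG_WAIT_EXTENSION);
        ResetLatch(MyLatch);
        CHECK_FOR_INTERRUPTS();
        waitedMs += 100;
    }

    ereport(ERROR, (errmsg("timed out while waiting for metadata sync")));
    PG_RETURN_VOID();
}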
@@ -564,6 +577,13 @@ SET citus.shard_count TO 55;
 SELECT create_distributed_function('add_with_param_names(int, int)', 'val1');
 ERROR: cannot distribute the function "add_with_param_names" since there is no table to colocate with
 HINT: Provide a distributed table via "colocate_with" option to create_distributed_function()
+-- sync metadata to workers for consistent results when clearing objects
+SELECT wait_until_metadata_sync();
+ wait_until_metadata_sync
+--------------------------
+ 
+(1 row)
+
 -- clear objects
 SELECT stop_metadata_sync_to_node(nodename,nodeport) FROM pg_dist_node WHERE isactive AND noderole = 'primary';
  stop_metadata_sync_to_node
@@ -9,6 +9,16 @@ CREATE SCHEMA function_tests2 AUTHORIZATION functionuser;
 SET search_path TO function_tests;
 SET citus.shard_count TO 4;
 
+-- set sync intervals to less than 15s so wait_until_metadata_sync never times out
+ALTER SYSTEM SET citus.metadata_sync_interval TO 3000;
+ALTER SYSTEM SET citus.metadata_sync_retry_interval TO 500;
+SELECT pg_reload_conf();
+
+CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000)
+    RETURNS void
+    LANGUAGE C STRICT
+    AS 'citus';
+
 -- Create and distribute a simple function
 CREATE FUNCTION add(integer, integer) RETURNS integer
     AS 'select $1 + $2;'
@@ -292,6 +302,9 @@ SELECT create_distributed_function('add_polygons(polygon,polygon)', '$1', coloca
 SET citus.shard_count TO 55;
 SELECT create_distributed_function('add_with_param_names(int, int)', 'val1');
 
+-- sync metadata to workers for consistent results when clearing objects
+SELECT wait_until_metadata_sync();
+
 -- clear objects
 SELECT stop_metadata_sync_to_node(nodename,nodeport) FROM pg_dist_node WHERE isactive AND noderole = 'primary';