mirror of https://github.com/citusdata/citus.git
Merge pull request #3068 from citusdata/fix_metadata_sync_locks
Don't block for locks in SyncMetadataToNodes()
pull/3075/head
commit
c15ddfb63f
|
@ -1263,24 +1263,33 @@ DetachPartitionCommandList(void)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* SyncMetadataToNodes tries recreating the metadata snapshot in the
|
* SyncMetadataToNodes tries recreating the metadata snapshot in the
|
||||||
* metadata workers that are out of sync. Returns false if synchronization
|
* metadata workers that are out of sync. Returns the result of
|
||||||
* to at least one of the workers fails.
|
* synchronization.
|
||||||
*/
|
*/
|
||||||
bool
|
MetadataSyncResult
|
||||||
SyncMetadataToNodes(void)
|
SyncMetadataToNodes(void)
|
||||||
{
|
{
|
||||||
List *workerList = NIL;
|
List *workerList = NIL;
|
||||||
ListCell *workerCell = NULL;
|
ListCell *workerCell = NULL;
|
||||||
bool result = true;
|
MetadataSyncResult result = METADATA_SYNC_SUCCESS;
|
||||||
|
|
||||||
if (!IsCoordinator())
|
if (!IsCoordinator())
|
||||||
{
|
{
|
||||||
return true;
|
return METADATA_SYNC_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
LockRelationOid(DistNodeRelationId(), ExclusiveLock);
|
/*
|
||||||
|
* Request a RowExclusiveLock so we don't run concurrently with other
|
||||||
|
* functions updating pg_dist_node, but allow concurrency with functions
|
||||||
|
* which are just reading from pg_dist_node.
|
||||||
|
*/
|
||||||
|
if (!ConditionalLockRelationOid(DistNodeRelationId(), RowExclusiveLock))
|
||||||
|
{
|
||||||
|
return METADATA_SYNC_FAILED_LOCK;
|
||||||
|
}
|
||||||
|
|
||||||
workerList = ActivePrimaryNodeList(NoLock);
|
workerList = ActivePrimaryNodeList(NoLock);
|
||||||
|
|
||||||
foreach(workerCell, workerList)
|
foreach(workerCell, workerList)
|
||||||
{
|
{
|
||||||
WorkerNode *workerNode = lfirst(workerCell);
|
WorkerNode *workerNode = lfirst(workerCell);
|
||||||
|
@ -1288,9 +1297,10 @@ SyncMetadataToNodes(void)
|
||||||
if (workerNode->hasMetadata && !workerNode->metadataSynced)
|
if (workerNode->hasMetadata && !workerNode->metadataSynced)
|
||||||
{
|
{
|
||||||
bool raiseInterrupts = false;
|
bool raiseInterrupts = false;
|
||||||
|
|
||||||
if (!SyncMetadataSnapshotToNode(workerNode, raiseInterrupts))
|
if (!SyncMetadataSnapshotToNode(workerNode, raiseInterrupts))
|
||||||
{
|
{
|
||||||
result = false;
|
result = METADATA_SYNC_FAILED_SYNC;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
|
|
@ -97,7 +97,7 @@ TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode)
|
||||||
{
|
{
|
||||||
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
|
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
|
||||||
if (targetWorkerSet == WORKERS_WITH_METADATA &&
|
if (targetWorkerSet == WORKERS_WITH_METADATA &&
|
||||||
!workerNode->hasMetadata)
|
(!workerNode->hasMetadata || !workerNode->metadataSynced))
|
||||||
{
|
{
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
|
@ -388,9 +388,18 @@ CitusMaintenanceDaemonMain(Datum main_arg)
|
||||||
}
|
}
|
||||||
else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded())
|
else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded())
|
||||||
{
|
{
|
||||||
metadataSyncFailed = !SyncMetadataToNodes();
|
MetadataSyncResult result = SyncMetadataToNodes();
|
||||||
|
metadataSyncFailed = (result != METADATA_SYNC_SUCCESS);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Send the notification only when synchronization was actually attempted,
|
||||||
|
* i.e. when we were not blocked waiting for the pg_dist_node lock.
|
||||||
|
*/
|
||||||
|
if (result != METADATA_SYNC_FAILED_LOCK)
|
||||||
|
{
|
||||||
Async_Notify(METADATA_SYNC_CHANNEL, NULL);
|
Async_Notify(METADATA_SYNC_CHANNEL, NULL);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
PopActiveSnapshot();
|
PopActiveSnapshot();
|
||||||
CommitTransactionCommand();
|
CommitTransactionCommand();
|
||||||
|
|
|
@ -1633,6 +1633,11 @@ UnsetMetadataSyncedForAll(void)
|
||||||
HeapTuple heapTuple = NULL;
|
HeapTuple heapTuple = NULL;
|
||||||
TupleDesc tupleDescriptor = NULL;
|
TupleDesc tupleDescriptor = NULL;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Concurrent master_update_node() calls might iterate and try to update
|
||||||
|
* pg_dist_node in different orders. To protect against deadlock, we
|
||||||
|
* get an exclusive lock here.
|
||||||
|
*/
|
||||||
relation = heap_open(DistNodeRelationId(), ExclusiveLock);
|
relation = heap_open(DistNodeRelationId(), ExclusiveLock);
|
||||||
tupleDescriptor = RelationGetDescr(relation);
|
tupleDescriptor = RelationGetDescr(relation);
|
||||||
ScanKeyInit(&scanKey[0], Anum_pg_dist_node_hasmetadata,
|
ScanKeyInit(&scanKey[0], Anum_pg_dist_node_hasmetadata,
|
||||||
|
|
|
@ -20,6 +20,13 @@
|
||||||
extern int MetadataSyncInterval;
|
extern int MetadataSyncInterval;
|
||||||
extern int MetadataSyncRetryInterval;
|
extern int MetadataSyncRetryInterval;
|
||||||
|
|
||||||
|
typedef enum
|
||||||
|
{
|
||||||
|
METADATA_SYNC_SUCCESS = 0,
|
||||||
|
METADATA_SYNC_FAILED_LOCK = 1,
|
||||||
|
METADATA_SYNC_FAILED_SYNC = 2
|
||||||
|
} MetadataSyncResult;
|
||||||
|
|
||||||
/* Function declarations for metadata syncing */
|
/* Function declarations for metadata syncing */
|
||||||
extern void StartMetadatSyncToNode(char *nodeNameString, int32 nodePort);
|
extern void StartMetadatSyncToNode(char *nodeNameString, int32 nodePort);
|
||||||
extern bool ClusterHasKnownMetadataWorkers(void);
|
extern bool ClusterHasKnownMetadataWorkers(void);
|
||||||
|
@ -43,7 +50,7 @@ extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, int sha
|
||||||
extern void CreateTableMetadataOnWorkers(Oid relationId);
|
extern void CreateTableMetadataOnWorkers(Oid relationId);
|
||||||
extern void MarkNodeHasMetadata(char *nodeName, int32 nodePort, bool hasMetadata);
|
extern void MarkNodeHasMetadata(char *nodeName, int32 nodePort, bool hasMetadata);
|
||||||
extern void MarkNodeMetadataSynced(char *nodeName, int32 nodePort, bool synced);
|
extern void MarkNodeMetadataSynced(char *nodeName, int32 nodePort, bool synced);
|
||||||
extern bool SyncMetadataToNodes(void);
|
extern MetadataSyncResult SyncMetadataToNodes(void);
|
||||||
extern bool SendOptionalCommandListToWorkerInTransaction(char *nodeName, int32 nodePort,
|
extern bool SendOptionalCommandListToWorkerInTransaction(char *nodeName, int32 nodePort,
|
||||||
char *nodeUser,
|
char *nodeUser,
|
||||||
List *commandList);
|
List *commandList);
|
||||||
|
|
|
@ -13,6 +13,19 @@ CREATE SCHEMA function_tests AUTHORIZATION functionuser;
|
||||||
CREATE SCHEMA function_tests2 AUTHORIZATION functionuser;
|
CREATE SCHEMA function_tests2 AUTHORIZATION functionuser;
|
||||||
SET search_path TO function_tests;
|
SET search_path TO function_tests;
|
||||||
SET citus.shard_count TO 4;
|
SET citus.shard_count TO 4;
|
||||||
|
-- set sync intervals to less than 15s so wait_until_metadata_sync never times out
|
||||||
|
ALTER SYSTEM SET citus.metadata_sync_interval TO 3000;
|
||||||
|
ALTER SYSTEM SET citus.metadata_sync_retry_interval TO 500;
|
||||||
|
SELECT pg_reload_conf();
|
||||||
|
pg_reload_conf
|
||||||
|
----------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000)
|
||||||
|
RETURNS void
|
||||||
|
LANGUAGE C STRICT
|
||||||
|
AS 'citus';
|
||||||
-- Create and distribute a simple function
|
-- Create and distribute a simple function
|
||||||
CREATE FUNCTION add(integer, integer) RETURNS integer
|
CREATE FUNCTION add(integer, integer) RETURNS integer
|
||||||
AS 'select $1 + $2;'
|
AS 'select $1 + $2;'
|
||||||
|
@ -564,6 +577,13 @@ SET citus.shard_count TO 55;
|
||||||
SELECT create_distributed_function('add_with_param_names(int, int)', 'val1');
|
SELECT create_distributed_function('add_with_param_names(int, int)', 'val1');
|
||||||
ERROR: cannot distribute the function "add_with_param_names" since there is no table to colocate with
|
ERROR: cannot distribute the function "add_with_param_names" since there is no table to colocate with
|
||||||
HINT: Provide a distributed table via "colocate_with" option to create_distributed_function()
|
HINT: Provide a distributed table via "colocate_with" option to create_distributed_function()
|
||||||
|
-- sync metadata to workers for consistent results when clearing objects
|
||||||
|
SELECT wait_until_metadata_sync();
|
||||||
|
wait_until_metadata_sync
|
||||||
|
--------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
-- clear objects
|
-- clear objects
|
||||||
SELECT stop_metadata_sync_to_node(nodename,nodeport) FROM pg_dist_node WHERE isactive AND noderole = 'primary';
|
SELECT stop_metadata_sync_to_node(nodename,nodeport) FROM pg_dist_node WHERE isactive AND noderole = 'primary';
|
||||||
stop_metadata_sync_to_node
|
stop_metadata_sync_to_node
|
||||||
|
|
|
@ -9,6 +9,16 @@ CREATE SCHEMA function_tests2 AUTHORIZATION functionuser;
|
||||||
SET search_path TO function_tests;
|
SET search_path TO function_tests;
|
||||||
SET citus.shard_count TO 4;
|
SET citus.shard_count TO 4;
|
||||||
|
|
||||||
|
-- set sync intervals to less than 15s so wait_until_metadata_sync never times out
|
||||||
|
ALTER SYSTEM SET citus.metadata_sync_interval TO 3000;
|
||||||
|
ALTER SYSTEM SET citus.metadata_sync_retry_interval TO 500;
|
||||||
|
SELECT pg_reload_conf();
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000)
|
||||||
|
RETURNS void
|
||||||
|
LANGUAGE C STRICT
|
||||||
|
AS 'citus';
|
||||||
|
|
||||||
-- Create and distribute a simple function
|
-- Create and distribute a simple function
|
||||||
CREATE FUNCTION add(integer, integer) RETURNS integer
|
CREATE FUNCTION add(integer, integer) RETURNS integer
|
||||||
AS 'select $1 + $2;'
|
AS 'select $1 + $2;'
|
||||||
|
@ -292,6 +302,9 @@ SELECT create_distributed_function('add_polygons(polygon,polygon)', '$1', coloca
|
||||||
SET citus.shard_count TO 55;
|
SET citus.shard_count TO 55;
|
||||||
SELECT create_distributed_function('add_with_param_names(int, int)', 'val1');
|
SELECT create_distributed_function('add_with_param_names(int, int)', 'val1');
|
||||||
|
|
||||||
|
-- sync metadata to workers for consistent results when clearing objects
|
||||||
|
SELECT wait_until_metadata_sync();
|
||||||
|
|
||||||
-- clear objects
|
-- clear objects
|
||||||
SELECT stop_metadata_sync_to_node(nodename,nodeport) FROM pg_dist_node WHERE isactive AND noderole = 'primary';
|
SELECT stop_metadata_sync_to_node(nodename,nodeport) FROM pg_dist_node WHERE isactive AND noderole = 'primary';
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue