Adds "sync" option to citus_disable_node() UDF

Before this commit, we had:
```SQL
SELECT citus_disable_node(nodename, nodeport, force boolean DEFAULT false)
```

where `force:=true` allows disabling the first worker node.
However, doing so entails the risk of losing data, diverging
placement metadata, etc.

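With this commit, the `force` flag still controls whether the first
worker node may be disabled, and the new `synchronous` flag (abbreviated
as `sync` in the table below) controls whether the metadata changes are
propagated immediately or deferred to the background worker.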

```SQL
SELECT citus_disable_node(nodename, nodeport, force boolean DEFAULT false, synchronous boolean DEFAULT false)
```

Where we can achieve all the following:

| Mode  | Data loss possibility | Can run in 2PC | Handle multiple node failures | Immediately effective |
| --- |--- |--- |--- |--- |
| force:false, sync: false  | false   | true  | true  | false |
| force:false, sync: true   | false  | false | false | true |
| force:true, sync: false   | true   | true  | true   | false |
| force:true, sync: true    | false  | false | false  | true |
pull/5912/head
Onder Kalaci 2022-04-25 10:58:13 +02:00
parent 69d007deec
commit db998b3d66
12 changed files with 178 additions and 34 deletions

View File

@ -97,6 +97,7 @@ static char * SchemaOwnerName(Oid objectId);
static bool HasMetadataWorkers(void);
static void CreateShellTableOnWorkers(Oid relationId);
static void CreateTableMetadataOnWorkers(Oid relationId);
static NodeMetadataSyncResult SyncNodeMetadataToNodesOptional(void);
static bool ShouldSyncTableMetadataInternal(bool hashDistributed,
bool citusTableWithNoDistKey);
static bool SyncNodeMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError);
@ -2237,16 +2238,16 @@ DetachPartitionCommandList(void)
/*
* SyncNodeMetadataToNodes tries recreating the metadata snapshot in the
* metadata workers that are out of sync. Returns the result of
* synchronization.
* SyncNodeMetadataToNodesOptional tries recreating the metadata
* snapshot in the metadata workers that are out of sync.
* Returns the result of synchronization.
*
* This function must be called within coordinated transaction
* since updates on the pg_dist_node metadata must be rollbacked if anything
* goes wrong.
*/
static NodeMetadataSyncResult
SyncNodeMetadataToNodes(void)
SyncNodeMetadataToNodesOptional(void)
{
NodeMetadataSyncResult result = NODE_METADATA_SYNC_SUCCESS;
if (!IsCoordinator())
@ -2306,6 +2307,46 @@ SyncNodeMetadataToNodes(void)
}
/*
* SyncNodeMetadataToNodes recreates the node metadata snapshot in all the
* metadata workers.
*
* This function runs within a coordinated transaction since updates on
* the pg_dist_node metadata must be rollbacked if anything
* goes wrong.
*/
void
SyncNodeMetadataToNodes(void)
{
EnsureCoordinator();
/*
* Request a RowExclusiveLock so we don't run concurrently with other
* functions updating pg_dist_node, but allow concurrency with functions
* which are just reading from pg_dist_node.
*/
if (!ConditionalLockRelationOid(DistNodeRelationId(), RowExclusiveLock))
{
ereport(ERROR, (errmsg("cannot sync metadata because a concurrent "
"metadata syncing operation is in progress")));
}
List *workerList = ActivePrimaryNonCoordinatorNodeList(NoLock);
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerList)
{
if (workerNode->hasMetadata)
{
SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_metadatasynced,
BoolGetDatum(true));
bool raiseOnError = true;
SyncNodeMetadataSnapshotToNode(workerNode, raiseOnError);
}
}
}
/*
* SyncNodeMetadataToNodesMain is the main function for syncing node metadata to
* MX nodes. It retries until success and then exits.
@ -2352,7 +2393,7 @@ SyncNodeMetadataToNodesMain(Datum main_arg)
{
UseCoordinatedTransaction();
NodeMetadataSyncResult result = SyncNodeMetadataToNodes();
NodeMetadataSyncResult result = SyncNodeMetadataToNodesOptional();
syncedAllNodes = (result == NODE_METADATA_SYNC_SUCCESS);
/* we use LISTEN/NOTIFY to wait for metadata syncing in tests */

View File

@ -453,6 +453,7 @@ citus_disable_node(PG_FUNCTION_ARGS)
text *nodeNameText = PG_GETARG_TEXT_P(0);
int32 nodePort = PG_GETARG_INT32(1);
bool forceDisableNode = PG_GETARG_BOOL(2);
bool synchronousDisableNode = PG_GETARG_BOOL(3);
char *nodeName = text_to_cstring(nodeNameText);
WorkerNode *workerNode = ModifiableWorkerNode(nodeName, nodePort);
@ -483,11 +484,16 @@ citus_disable_node(PG_FUNCTION_ARGS)
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("disabling the first worker node in the "
"metadata is not allowed"),
errhint("You can force disabling node, but this operation "
"might cause replicated shards to diverge: SELECT "
"citus_disable_node('%s', %d, force:=true);",
workerNode->workerName,
nodePort)));
errhint("You can force disabling node, SELECT "
"citus_disable_node('%s', %d, force:=true, "
"synchronous:=true); "
"Passing synchronous:=false might cause replicated shards "
"to diverge.",workerNode->workerName, nodePort),
errdetail("Citus uses the first worker node in the "
"metadata for certain internal operations when "
"replicated tables are modified. Synchronous mode "
"ensures the first worker node is accurately "
"visible to all nodes.")));
}
/*
@ -524,20 +530,40 @@ citus_disable_node(PG_FUNCTION_ARGS)
TransactionModifiedNodeMetadata = true;
/*
* We have not propagated the node metadata changes yet, make sure that all the
* active nodes get the metadata updates. We defer this operation to the
* background worker to make it possible disabling nodes when multiple nodes
* are down.
*
* Note that the active placements reside on the active nodes. Hence, when
* Citus finds active placements, it filters out the placements that are on
* the disabled nodes. That's why, we don't have to change/sync placement
* metadata at this point. Instead, we defer that to citus_activate_node()
* where we expect all nodes up and running.
*/
if (UnsetMetadataSyncedForAllWorkers())
if (synchronousDisableNode)
{
/*
* The user might pick between sync vs async options.
* - Pros for the sync option:
* (a) the changes become visible on the cluster immediately
* (b) even if the first worker node is disabled, there is no
* risk of divergence of the placements of replicated shards
* - Cons for the sync options:
* (a) Does not work within 2PC transaction (e.g., BEGIN;
* citus_disable_node(); PREPARE TRANSACTION ...);
* (b) If there are multiple node failures (e.g., one another node
* than the current node being disabled), the sync option would
* fail because it'd try to sync the metadata changes to a node
* that is not up and running.
*
*/
SyncNodeMetadataToNodes();
}
else if (UnsetMetadataSyncedForAllWorkers())
{
/*
* We have not propagated the node metadata changes yet, make sure that all the
* active nodes get the metadata updates. We defer this operation to the
* background worker to make it possible disabling nodes when multiple nodes
* are down.
*
* Note that the active placements reside on the active nodes. Hence, when
* Citus finds active placements, it filters out the placements that are on
* the disabled nodes. That's why, we don't have to change/sync placement
* metadata at this point. Instead, we defer that to citus_activate_node()
* where we expect all nodes up and running.
*/
TriggerNodeMetadataSyncOnCommit();
}

View File

@ -1,3 +1,4 @@
#include "udfs/citus_shards_on_worker/11.0-2.sql"
#include "udfs/citus_shard_indexes_on_worker/11.0-2.sql"
#include "udfs/citus_is_coordinator/11.0-2.sql"
#include "udfs/citus_disable_node/11.0-2.sql"

View File

@ -1,2 +1,4 @@
#include "../udfs/citus_shards_on_worker/11.0-1.sql"
#include "../udfs/citus_shard_indexes_on_worker/11.0-1.sql"
#include "../udfs/citus_disable_node/11.0-1.sql"

View File

@ -0,0 +1,9 @@
-- Replace the three-argument citus_disable_node() with a four-argument
-- version that adds the "synchronous" flag (default false). The old
-- signature is dropped first because the argument list changes.
DROP FUNCTION pg_catalog.citus_disable_node(nodename text, nodeport integer, force bool);
CREATE FUNCTION pg_catalog.citus_disable_node(nodename text, nodeport integer, force bool default false, synchronous bool default false)
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_disable_node$$;
COMMENT ON FUNCTION pg_catalog.citus_disable_node(nodename text, nodeport integer, force bool, synchronous bool)
IS 'removes node from the cluster temporarily';
-- Remove the default EXECUTE privilege that PUBLIC gets on new functions.
REVOKE ALL ON FUNCTION pg_catalog.citus_disable_node(text,int, bool, bool) FROM PUBLIC;

View File

@ -1,9 +1,9 @@
DROP FUNCTION pg_catalog.citus_disable_node(nodename text, nodeport integer);
CREATE FUNCTION pg_catalog.citus_disable_node(nodename text, nodeport integer, force bool default false)
DROP FUNCTION pg_catalog.citus_disable_node(nodename text, nodeport integer, force bool);
CREATE FUNCTION pg_catalog.citus_disable_node(nodename text, nodeport integer, force bool default false, synchronous bool default false)
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_disable_node$$;
COMMENT ON FUNCTION pg_catalog.citus_disable_node(nodename text, nodeport integer, force bool)
COMMENT ON FUNCTION pg_catalog.citus_disable_node(nodename text, nodeport integer, force bool, synchronous bool)
IS 'removes node from the cluster temporarily';
REVOKE ALL ON FUNCTION pg_catalog.citus_disable_node(text,int, bool) FROM PUBLIC;
REVOKE ALL ON FUNCTION pg_catalog.citus_disable_node(text,int, bool, bool) FROM PUBLIC;

View File

@ -65,6 +65,7 @@ extern TableDDLCommand * TruncateTriggerCreateCommand(Oid relationId);
extern void CreateInterTableRelationshipOfRelationOnWorkers(Oid relationId);
extern List * InterTableRelationshipOfRelationCommandList(Oid relationId);
extern List * DetachPartitionCommandList(void);
extern void SyncNodeMetadataToNodes(void);
extern BackgroundWorkerHandle * SpawnSyncNodeMetadataToNodes(Oid database, Oid owner);
extern void SyncNodeMetadataToNodesMain(Datum main_arg);
extern void SignalMetadataSyncDaemon(Oid database, int sig);

View File

@ -129,6 +129,54 @@ SELECT 1 FROM citus_activate_node('localhost', :worker_2_port);
1
(1 row)
-- disable node with sync/force options
SELECT citus_disable_node('localhost', :worker_1_port);
ERROR: disabling the first worker node in the metadata is not allowed
DETAIL: Citus uses the first worker node in the metadata for certain internal operations when replicated tables are modified. Synchronous mode ensures the first worker node is accurately visible to all nodes.
HINT: You can force disabling node, SELECT citus_disable_node('localhost', 57637, force:=true, synchronous:=true); Passing synchronous:=false might cause replicated shards to diverge.
SELECT citus_disable_node('localhost', :worker_1_port, force:=true, synchronous:=true);
citus_disable_node
---------------------------------------------------------------------
(1 row)
SELECT run_command_on_workers($$SELECT array_agg(isactive ORDER BY nodeport) FROM pg_dist_node WHERE hasmetadata and noderole='primary'::noderole AND nodecluster='default'$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57638,t,"{f,t}")
(1 row)
SELECT 1 FROM citus_activate_node('localhost', :worker_1_port);
?column?
---------------------------------------------------------------------
1
(1 row)
-- disable node with sync/force options
SELECT citus_disable_node('localhost', :worker_2_port, synchronous:=true);
citus_disable_node
---------------------------------------------------------------------
(1 row)
SELECT run_command_on_workers($$SELECT array_agg(isactive ORDER BY nodeport) FROM pg_dist_node WHERE hasmetadata and noderole='primary'::noderole AND nodecluster='default'$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,"{t,f}")
(1 row)
SELECT 1 FROM citus_activate_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT 1 FROM citus_activate_node('localhost', :worker_1_port);
?column?
---------------------------------------------------------------------
1
(1 row)
CREATE TABLE cluster_management_test (col_1 text, col_2 int);
SELECT create_distributed_table('cluster_management_test', 'col_1', 'hash');
create_distributed_table
@ -217,7 +265,7 @@ GRANT EXECUTE ON FUNCTION master_activate_node(text,int) TO node_metadata_user;
GRANT EXECUTE ON FUNCTION master_add_inactive_node(text,int,int,noderole,name) TO node_metadata_user;
GRANT EXECUTE ON FUNCTION master_add_node(text,int,int,noderole,name) TO node_metadata_user;
GRANT EXECUTE ON FUNCTION master_add_secondary_node(text,int,text,int,name) TO node_metadata_user;
GRANT EXECUTE ON FUNCTION citus_disable_node(text,int,bool) TO node_metadata_user;
GRANT EXECUTE ON FUNCTION citus_disable_node(text,int,bool,bool) TO node_metadata_user;
GRANT EXECUTE ON FUNCTION citus_disable_node_and_wait(text,int,bool) TO node_metadata_user;
GRANT EXECUTE ON FUNCTION master_remove_node(text,int) TO node_metadata_user;
GRANT EXECUTE ON FUNCTION master_update_node(int,text,int,bool,int) TO node_metadata_user;

View File

@ -1034,10 +1034,12 @@ SELECT * FROM multi_extension.print_extension_changes();
-- Snapshot of state at 11.0-2
ALTER EXTENSION citus UPDATE TO '11.0-2';
SELECT * FROM multi_extension.print_extension_changes();
previous_object | current_object
previous_object | current_object
---------------------------------------------------------------------
| function citus_is_coordinator() boolean
(1 row)
function citus_disable_node(text,integer,boolean) void |
| function citus_disable_node(text,integer,boolean,boolean) void
| function citus_is_coordinator() boolean
(3 rows)
-- Snapshot of state at 11.1-1
ALTER EXTENSION citus UPDATE TO '11.1-1';

View File

@ -1780,7 +1780,8 @@ ERROR: localhost:xxxxx is a metadata node, but is out of sync
HINT: If the node is up, wait until metadata gets synced to it and try again.
SELECT citus_disable_node_and_wait('localhost', :worker_1_port);
ERROR: disabling the first worker node in the metadata is not allowed
HINT: You can force disabling node, but this operation might cause replicated shards to diverge: SELECT citus_disable_node('localhost', 57637, force:=true);
DETAIL: Citus uses the first worker node in the metadata for certain internal operations when replicated tables are modified. Synchronous mode ensures the first worker node is accurately visible to all nodes.
HINT: You can force disabling node, SELECT citus_disable_node('localhost', 57637, force:=true, synchronous:=true); Passing synchronous:=false might cause replicated shards to diverge.
CONTEXT: SQL statement "SELECT pg_catalog.citus_disable_node(nodename, nodeport, force)"
PL/pgSQL function citus_disable_node_and_wait(text,integer,boolean) line XX at PERFORM
SELECT citus_disable_node_and_wait('localhost', :worker_2_port);

View File

@ -47,7 +47,7 @@ ORDER BY 1;
function citus_coordinator_nodeid()
function citus_copy_shard_placement(bigint,text,integer,text,integer,boolean,citus.shard_transfer_mode)
function citus_create_restore_point(text)
function citus_disable_node(text,integer,boolean)
function citus_disable_node(text,integer,boolean,boolean)
function citus_dist_local_group_cache_invalidate()
function citus_dist_node_cache_invalidate()
function citus_dist_object_cache_invalidate()

View File

@ -56,6 +56,19 @@ ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1390000;
SELECT 1 FROM citus_activate_node('localhost', :worker_2_port);
-- disable node with sync/force options
SELECT citus_disable_node('localhost', :worker_1_port);
SELECT citus_disable_node('localhost', :worker_1_port, force:=true, synchronous:=true);
SELECT run_command_on_workers($$SELECT array_agg(isactive ORDER BY nodeport) FROM pg_dist_node WHERE hasmetadata and noderole='primary'::noderole AND nodecluster='default'$$);
SELECT 1 FROM citus_activate_node('localhost', :worker_1_port);
-- disable node with sync/force options
SELECT citus_disable_node('localhost', :worker_2_port, synchronous:=true);
SELECT run_command_on_workers($$SELECT array_agg(isactive ORDER BY nodeport) FROM pg_dist_node WHERE hasmetadata and noderole='primary'::noderole AND nodecluster='default'$$);
SELECT 1 FROM citus_activate_node('localhost', :worker_2_port);
SELECT 1 FROM citus_activate_node('localhost', :worker_1_port);
CREATE TABLE cluster_management_test (col_1 text, col_2 int);
SELECT create_distributed_table('cluster_management_test', 'col_1', 'hash');
@ -99,7 +112,7 @@ GRANT EXECUTE ON FUNCTION master_activate_node(text,int) TO node_metadata_user;
GRANT EXECUTE ON FUNCTION master_add_inactive_node(text,int,int,noderole,name) TO node_metadata_user;
GRANT EXECUTE ON FUNCTION master_add_node(text,int,int,noderole,name) TO node_metadata_user;
GRANT EXECUTE ON FUNCTION master_add_secondary_node(text,int,text,int,name) TO node_metadata_user;
GRANT EXECUTE ON FUNCTION citus_disable_node(text,int,bool) TO node_metadata_user;
GRANT EXECUTE ON FUNCTION citus_disable_node(text,int,bool,bool) TO node_metadata_user;
GRANT EXECUTE ON FUNCTION citus_disable_node_and_wait(text,int,bool) TO node_metadata_user;
GRANT EXECUTE ON FUNCTION master_remove_node(text,int) TO node_metadata_user;
GRANT EXECUTE ON FUNCTION master_update_node(int,text,int,bool,int) TO node_metadata_user;