mirror of https://github.com/citusdata/citus.git
Remove metadata by checking isactive
parent 070e2afbe5
commit 7e3f2486f3
@@ -510,6 +510,10 @@ citus_disable_node(PG_FUNCTION_ARGS)
                               workerNode->workerName,
                               nodePort)));
        }
+
+        bool forceRemoteDelete = false;
+        DeleteAllReplicatedTablePlacementsFromNodeGroup(workerNode->groupId,
+                                                        forceRemoteDelete);
    }

    TransactionModifiedNodeMetadata = true;
@@ -1180,11 +1184,22 @@ ActivateNode(char *nodeName, int nodePort)
        ereport(ERROR, (errmsg("node at \"%s:%u\" does not exist", nodeName, nodePort)));
    }

+
+    /*
+     * Delete replicated table placements from the coordinator's metadata,
+     * including remote ones if the node is an inactive primary worker node.
+     */
+    if (!NodeIsCoordinator(workerNode) && NodeIsPrimary(workerNode) && !workerNode->isActive)
+    {
+        bool forceRemoteDelete = true;
+        DeleteAllReplicatedTablePlacementsFromNodeGroup(workerNode->groupId,
+                                                        forceRemoteDelete);
+    }
+
    workerNode =
        SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_isactive,
                                 BoolGetDatum(isActive));
-    bool syncMetadata =
-        EnableMetadataSyncByDefault && NodeIsPrimary(workerNode);
+    bool syncMetadata = EnableMetadataSyncByDefault && NodeIsPrimary(workerNode);

    if (syncMetadata)
    {
@@ -1196,17 +1211,6 @@ ActivateNode(char *nodeName, int nodePort)
                                 BoolGetDatum(isActive));
    }

-    /*
-     * Delete replicated table placements from the coordinator's metadata,
-     * including remote ones.
-     */
-    if (syncMetadata && !NodeIsCoordinator(workerNode) && NodeIsPrimary(workerNode))
-    {
-        bool forceRemoteDelete = true;
-        DeleteAllReplicatedTablePlacementsFromNodeGroup(workerNode->groupId,
-                                                        forceRemoteDelete);
-    }
-
    SetUpDistributedTableWithDependencies(workerNode);

    if (syncMetadata)
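
Net effect of the three hunks above: the replicated-placement cleanup moves out of the metadata-sync branch. citus_disable_node now deletes the placements from the coordinator's local metadata only (forceRemoteDelete = false), while ActivateNode deletes them everywhere (forceRemoteDelete = true), but only when it is re-activating a primary worker whose isactive flag is still false, which is the check the commit title refers to. A minimal SQL sketch of the intended disable/activate cycle, assuming a worker is already registered at localhost:9701 (hypothetical port):

    -- Disable the worker: replicated placements are removed from the
    -- coordinator's local metadata only (forceRemoteDelete = false).
    SELECT citus_disable_node('localhost', 9701);
    SELECT public.wait_until_metadata_sync(30000);

    -- Re-activate it: isactive is still false at this point, so the new
    -- branch also purges the stale remote placements before syncing.
    SELECT citus_activate_node('localhost', 9701);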
@@ -468,8 +468,10 @@ SELECT shardid, nodename, nodeport
 -- disable the first node
 SET client_min_messages TO ERROR;
 \set VERBOSITY terse
+table pg_dist_node;
 SELECT master_disable_node('localhost', :worker_1_port);
 SELECT public.wait_until_metadata_sync(30000);
+table pg_dist_node;

 RESET client_min_messages;
 \set VERBOSITY default
@@ -481,6 +483,7 @@ SET citus.shard_replication_factor TO 1;
 -- add two new shards and verify they are created at the other node
 SELECT master_create_empty_shard('numbers_append') AS shardid1 \gset
 SELECT master_create_empty_shard('numbers_append') AS shardid2 \gset
+table pg_dist_node;

 COPY numbers_append FROM STDIN WITH (FORMAT 'csv', append_to_shard :shardid1);
 5,7
@@ -497,8 +500,16 @@ SELECT shardid, nodename, nodeport
     WHERE logicalrelid = 'numbers_append'::regclass order by placementid;

 -- add the node back
-SET client_min_messages TO ERROR;
+table pg_dist_node;
+\c - - - :worker_1_port
+table pg_dist_node;
+table pg_dist_placement;
+table pg_dist_shard;
+table pg_dist_shard_placement;
+\c - - - :master_port
+set citus.log_remote_commands to true;
 SELECT 1 FROM master_activate_node('localhost', :worker_1_port);
+reset citus.log_remote_commands;
 RESET client_min_messages;
 RESET citus.shard_replication_factor;
 -- add two new shards and verify they are created at both workers
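
The table pg_dist_node; statements added throughout these test hunks use PostgreSQL's TABLE shorthand; together with the \c reconnects and citus.log_remote_commands, they read as temporary instrumentation for inspecting catalog state on both nodes around the disable/activate cycle. For reference, the shorthand expands as:

    -- TABLE <name> is standard PostgreSQL shorthand for reading a whole table:
    TABLE pg_dist_node;
    SELECT * FROM pg_dist_node;  -- equivalent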
@@ -20,34 +20,34 @@ SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_proxy_port;

 -- Failure to set groupid in the worker
 SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE pg_dist_local_group SET groupid").cancel(' || :pid || ')');
-SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
+SELECT citus_activate_node('localhost', :worker_2_proxy_port);
 SELECT citus.mitmproxy('conn.onQuery(query="^UPDATE pg_dist_local_group SET groupid").kill()');
-SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
+SELECT citus_activate_node('localhost', :worker_2_proxy_port);

 -- Failure to drop all tables in pg_dist_partition
 SELECT citus.mitmproxy('conn.onQuery(query="^SELECT worker_drop_distributed_table").cancel(' || :pid || ')');
-SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
+SELECT citus_activate_node('localhost', :worker_2_proxy_port);
 SELECT citus.mitmproxy('conn.onQuery(query="^SELECT worker_drop_distributed_table").kill()');
-SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
+SELECT citus_activate_node('localhost', :worker_2_proxy_port);

 -- Failure to truncate pg_dist_node in the worker
 SELECT citus.mitmproxy('conn.onQuery(query="^TRUNCATE pg_dist_node CASCADE").cancel(' || :pid || ')');
-SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
+SELECT citus_activate_node('localhost', :worker_2_proxy_port);
 SELECT citus.mitmproxy('conn.onQuery(query="^TRUNCATE pg_dist_node CASCADE").kill()');
-SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
+SELECT citus_activate_node('localhost', :worker_2_proxy_port);

 -- Failure to populate pg_dist_node in the worker
 SELECT citus.mitmproxy('conn.onQuery(query="^INSERT INTO pg_dist_node").cancel(' || :pid || ')');
-SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
+SELECT citus_activate_node('localhost', :worker_2_proxy_port);
 SELECT citus.mitmproxy('conn.onQuery(query="^INSERT INTO pg_dist_node").kill()');
-SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
+SELECT citus_activate_node('localhost', :worker_2_proxy_port);

 -- Verify that coordinator knows worker does not have valid metadata
 SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_proxy_port;

 -- Verify we can sync metadata after unsuccessful attempts
 SELECT citus.mitmproxy('conn.allow()');
-SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
+SELECT citus_activate_node('localhost', :worker_2_proxy_port);
 SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_proxy_port;

 -- Check failures on DDL command propagation
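
These failure-injection tests now go through citus_activate_node instead of start_metadata_sync_to_node, so the mitmproxy rules exercise the metadata sync that activation itself performs. The pattern is: arm a rule that cancels or kills the proxied worker connection when a query matches, run the activation, then lift the rule. A condensed sketch of one round, reusing the test's :worker_2_proxy_port variable:

    -- Arm: kill the connection on the first INSERT INTO pg_dist_node.
    SELECT citus.mitmproxy('conn.onQuery(query="^INSERT INTO pg_dist_node").kill()');
    SELECT citus_activate_node('localhost', :worker_2_proxy_port);  -- expected to fail

    -- Disarm and verify the retry succeeds.
    SELECT citus.mitmproxy('conn.allow()');
    SELECT citus_activate_node('localhost', :worker_2_proxy_port);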
@@ -98,7 +98,7 @@ SELECT count(*) FROM pg_dist_node;
 SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_proxy_port;

 -- turn metadata sync back on
-SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
+SELECT citus_activate_node('localhost', :worker_2_proxy_port);

 SET SEARCH_PATH = mx_metadata_sync;
 DROP TABLE t1;
@@ -76,8 +76,10 @@ CREATE OR REPLACE FUNCTION pg_catalog.master_create_worker_shards(table_name tex
     AS 'citus', $$master_create_worker_shards$$
     LANGUAGE C STRICT;
 -- re-add the nodes to the cluster
+set citus.log_remote_commands to true;
 SELECT 1 FROM master_add_node('localhost', :worker_1_port);
 SELECT 1 FROM master_add_node('localhost', :worker_2_port);
+reset citus.log_remote_commands;

 -- verify that a table can be created after the extension has been dropped and recreated
 CREATE TABLE testtableddl(somecol int, distributecol text NOT NULL);
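
citus.log_remote_commands makes the coordinator emit every command it sends to a worker (in Citus regression output these show up as "NOTICE: issuing ..." lines), which is presumably why it brackets the master_add_node calls here. A minimal usage sketch, with a hypothetical port:

    SET citus.log_remote_commands TO on;
    SELECT 1 FROM master_add_node('localhost', 9701);  -- each remote command is logged
    RESET citus.log_remote_commands;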
@@ -107,7 +107,7 @@ DROP TABLE local_table;
 -- Verify that all indexes got created on the master node and one of the workers
 SELECT * FROM pg_indexes WHERE tablename = 'lineitem' or tablename like 'index_test_%' ORDER BY indexname;
 \c - - - :worker_1_port
-SELECT * FROM pg_indexes WHERE tablename LIKE 'lineitem_%';
+SELECT count(*) FROM pg_indexes WHERE tablename = (SELECT relname FROM pg_class WHERE relname LIKE 'lineitem_%' ORDER BY relname LIMIT 1);
 SELECT count(*) FROM pg_indexes WHERE tablename LIKE 'index_test_hash_%';
 SELECT count(*) FROM pg_indexes WHERE tablename LIKE 'index_test_range_%';
 SELECT count(*) FROM pg_indexes WHERE tablename LIKE 'index_test_append_%';
@@ -189,7 +189,7 @@ SELECT indrelid::regclass, indexrelid::regclass FROM pg_index WHERE indrelid = (
 SELECT * FROM pg_indexes WHERE tablename LIKE 'index_test_%' ORDER BY indexname;
 \c - - - :worker_1_port
 SET citus.override_table_visibility TO FALSE;
-SELECT indrelid::regclass, indexrelid::regclass FROM pg_index WHERE indrelid = (SELECT relname FROM pg_class WHERE relname SIMILAR TO 'lineitem_360000' ORDER BY relname LIMIT 1)::regclass AND NOT indisprimary AND indexrelid::regclass::text NOT LIKE 'lineitem_time_index%' ORDER BY 1,2;
+SELECT indrelid::regclass, indexrelid::regclass FROM pg_index WHERE indrelid = (SELECT relname FROM pg_class WHERE relname SIMILAR TO 'lineitem%\d' ORDER BY relname LIMIT 1)::regclass AND NOT indisprimary AND indexrelid::regclass::text NOT LIKE 'lineitem_time_index%' ORDER BY 1,2;
 SELECT * FROM pg_indexes WHERE tablename SIMILAR TO 'index_test_%\d' ORDER BY indexname;

 -- create index that will conflict with master operations
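
Both index-test hunks replace queries that depended on a concrete shard (a bare LIKE 'lineitem_%' listing in one, the literal shard name lineitem_360000 in the other) with lookups that pick the first matching shard relation by name. Shard relations are named <table>_<shardid>, so the expected output should no longer depend on which shard IDs a disabled-then-reactivated node ends up hosting. The shared idiom, isolated:

    -- Deterministically pick one shard relation of 'lineitem' by name,
    -- instead of hard-coding a shard ID such as lineitem_360000.
    SELECT relname
    FROM pg_class
    WHERE relname SIMILAR TO 'lineitem%\d'
    ORDER BY relname
    LIMIT 1;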