Add parameter to cleanup metadata (#5055)

* Add parameter to cleanup metadata

* Set clear metadata default to true

* Add test for clearing metadata

* Separate test file for start/stop metadata syncing

* Fix stop_sync bug for secondary nodes

* Use PreventInTransactionBlock

* DRemovedebuggiing logs

* Remove relation not found logs from mx test

* Revert localGroupId when doing stop_sync

* Move metadata sync test to mx schedule

* Add test with name that needs to be quoted

* Add test for views and matviews

* Add test for distributed table with custom type

* Add comments to test

* Add test with stats, indexes and constraints

* Fix matview test

* Add test for dropped column

* Add notice messages to stop_metadata_sync

* Add coordinator check to stop metadat sync

* Revert local_group_id only if clearMetadata is true

* Add a final check to see the metadata is sane

* Remove the drop verbosity in test

* Remove table description tests from sync test

* Add stop sync to coordinator test

* Change the order in stop_sync

* Add test for hybrid (columnar+heap) partitioned table

* Change error to notice for stop sync to coordinator

* Sync at the end of the test to prevent any failures

* Add test case in a transaction block

* Remove relation not found tests
pull/5082/head
Ahmet Gedemenli 2021-07-01 16:23:53 +03:00 committed by GitHub
parent c932642e3b
commit 8bae58fdb7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
29 changed files with 589 additions and 9 deletions

View File

@ -75,6 +75,7 @@ static char * SchemaOwnerName(Oid objectId);
static bool HasMetadataWorkers(void); static bool HasMetadataWorkers(void);
static List * DetachPartitionCommandList(void); static List * DetachPartitionCommandList(void);
static bool SyncMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError); static bool SyncMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError);
static void DropMetadataSnapshotOnNode(WorkerNode *workerNode);
static char * CreateSequenceDependencyCommand(Oid relationId, Oid sequenceId, static char * CreateSequenceDependencyCommand(Oid relationId, Oid sequenceId,
char *columnName); char *columnName);
static List * GenerateGrantOnSchemaQueriesFromAclItem(Oid schemaOid, static List * GenerateGrantOnSchemaQueriesFromAclItem(Oid schemaOid,
@ -191,23 +192,53 @@ stop_metadata_sync_to_node(PG_FUNCTION_ARGS)
CheckCitusVersion(ERROR); CheckCitusVersion(ERROR);
EnsureCoordinator(); EnsureCoordinator();
EnsureSuperUser(); EnsureSuperUser();
PreventInTransactionBlock(true, "stop_metadata_sync_to_node");
text *nodeName = PG_GETARG_TEXT_P(0); text *nodeName = PG_GETARG_TEXT_P(0);
int32 nodePort = PG_GETARG_INT32(1); int32 nodePort = PG_GETARG_INT32(1);
bool clearMetadata = PG_GETARG_BOOL(2);
char *nodeNameString = text_to_cstring(nodeName); char *nodeNameString = text_to_cstring(nodeName);
LockRelationOid(DistNodeRelationId(), ExclusiveLock); LockRelationOid(DistNodeRelationId(), ExclusiveLock);
WorkerNode *workerNode = FindWorkerNode(nodeNameString, nodePort); WorkerNode *workerNode = FindWorkerNodeAnyCluster(nodeNameString, nodePort);
if (workerNode == NULL) if (workerNode == NULL)
{ {
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("node (%s,%d) does not exist", nodeNameString, nodePort))); errmsg("node (%s,%d) does not exist", nodeNameString, nodePort)));
} }
if (NodeIsCoordinator(workerNode))
{
ereport(NOTICE, (errmsg("node (%s,%d) is the coordinator and should have "
"metadata, skipping stopping the metadata sync",
nodeNameString, nodePort)));
PG_RETURN_VOID();
}
MarkNodeHasMetadata(nodeNameString, nodePort, false); MarkNodeHasMetadata(nodeNameString, nodePort, false);
MarkNodeMetadataSynced(nodeNameString, nodePort, false); MarkNodeMetadataSynced(nodeNameString, nodePort, false);
if (clearMetadata)
{
if (NodeIsPrimary(workerNode))
{
ereport(NOTICE, (errmsg("dropping metadata on the node (%s,%d)",
nodeNameString, nodePort)));
DropMetadataSnapshotOnNode(workerNode);
}
else
{
/*
* If this is a secondary node we can't actually clear metadata from it,
* we assume the primary node is cleared.
*/
ereport(NOTICE, (errmsg("(%s,%d) is a secondary node: to clear the metadata,"
" you should clear metadata from the primary node",
nodeNameString, nodePort)));
}
}
PG_RETURN_VOID(); PG_RETURN_VOID();
} }
@ -322,6 +353,28 @@ SyncMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError)
} }
/*
* DropMetadataSnapshotOnNode creates the queries which drop the metadata and sends them
* to the worker given as parameter.
*/
static void
DropMetadataSnapshotOnNode(WorkerNode *workerNode)
{
char *extensionOwner = CitusExtensionOwnerName();
/* generate the queries which drop the metadata */
List *dropMetadataCommandList = MetadataDropCommands();
dropMetadataCommandList = lappend(dropMetadataCommandList,
LocalGroupIdUpdateCommand(0));
SendOptionalCommandListToWorkerInTransaction(workerNode->workerName,
workerNode->workerPort,
extensionOwner,
dropMetadataCommandList);
}
/* /*
* MetadataCreateCommands returns list of queries that are * MetadataCreateCommands returns list of queries that are
* required to create the current metadata snapshot of the node that the * required to create the current metadata snapshot of the node that the

View File

@ -1,3 +1,8 @@
-- citus--10.1-1--10.2-1 -- citus--10.1-1--10.2-1
-- bump version to 10.2-1
DROP FUNCTION IF EXISTS pg_catalog.stop_metadata_sync_to_node(text, integer);
#include "udfs/stop_metadata_sync_to_node/10.2-1.sql"
#include "../../columnar/sql/columnar--10.1-1--10.2-1.sql" #include "../../columnar/sql/columnar--10.1-1--10.2-1.sql"

View File

@ -1,3 +1,12 @@
-- citus--10.2-1--10.1-1 -- citus--10.2-1--10.1-1
#include "../../../columnar/sql/downgrades/columnar--10.2-1--10.1-1.sql" #include "../../../columnar/sql/downgrades/columnar--10.2-1--10.1-1.sql"
DROP FUNCTION pg_catalog.stop_metadata_sync_to_node(text, integer, bool);
CREATE FUNCTION pg_catalog.stop_metadata_sync_to_node(nodename text, nodeport integer)
RETURNS VOID
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$stop_metadata_sync_to_node$$;
COMMENT ON FUNCTION pg_catalog.stop_metadata_sync_to_node(nodename text, nodeport integer)
IS 'stop metadata sync to node';

View File

@ -0,0 +1,6 @@
CREATE FUNCTION pg_catalog.stop_metadata_sync_to_node(nodename text, nodeport integer, clear_metadata bool DEFAULT true)
RETURNS VOID
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$stop_metadata_sync_to_node$$;
COMMENT ON FUNCTION pg_catalog.stop_metadata_sync_to_node(nodename text, nodeport integer, clear_metadata bool)
IS 'stop metadata sync to node';

View File

@ -0,0 +1,6 @@
CREATE FUNCTION pg_catalog.stop_metadata_sync_to_node(nodename text, nodeport integer, clear_metadata bool DEFAULT true)
RETURNS VOID
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$stop_metadata_sync_to_node$$;
COMMENT ON FUNCTION pg_catalog.stop_metadata_sync_to_node(nodename text, nodeport integer, clear_metadata bool)
IS 'stop metadata sync to node';

View File

@ -26,6 +26,7 @@ SELECT citus_add_local_table_to_metadata('citus_local_table');
-- first stop metadata sync to worker_1 -- first stop metadata sync to worker_1
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
NOTICE: dropping metadata on the node (localhost,57637)
stop_metadata_sync_to_node stop_metadata_sync_to_node
--------------------------------------------------------------------- ---------------------------------------------------------------------

View File

@ -5,12 +5,14 @@ SET citus.next_shard_id TO 2460000;
-- first stop the metadata syncing to the node do that drop column -- first stop the metadata syncing to the node do that drop column
-- is not propogated -- is not propogated
SELECT stop_metadata_sync_to_node('localhost',:worker_1_port); SELECT stop_metadata_sync_to_node('localhost',:worker_1_port);
NOTICE: dropping metadata on the node (localhost,57637)
stop_metadata_sync_to_node stop_metadata_sync_to_node
--------------------------------------------------------------------- ---------------------------------------------------------------------
(1 row) (1 row)
SELECT stop_metadata_sync_to_node('localhost',:worker_2_port); SELECT stop_metadata_sync_to_node('localhost',:worker_2_port);
NOTICE: dropping metadata on the node (localhost,57638)
stop_metadata_sync_to_node stop_metadata_sync_to_node
--------------------------------------------------------------------- ---------------------------------------------------------------------

View File

@ -137,6 +137,7 @@ SELECT master_copy_shard_placement(
transfer_mode := 'block_writes'); transfer_mode := 'block_writes');
ERROR: Table 'mx_table' is streaming replicated. Shards of streaming replicated tables cannot be copied ERROR: Table 'mx_table' is streaming replicated. Shards of streaming replicated tables cannot be copied
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
NOTICE: dropping metadata on the node (localhost,57637)
stop_metadata_sync_to_node stop_metadata_sync_to_node
--------------------------------------------------------------------- ---------------------------------------------------------------------

View File

@ -490,6 +490,7 @@ SELECT nodename, nodeport FROM pg_dist_node WHERE nodename='localhost' AND nodep
SET citus.enable_object_propagation TO off; -- prevent object propagation on add node during setup SET citus.enable_object_propagation TO off; -- prevent object propagation on add node during setup
-- check that added nodes are not propagated to nodes without metadata -- check that added nodes are not propagated to nodes without metadata
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
NOTICE: dropping metadata on the node (localhost,57637)
stop_metadata_sync_to_node stop_metadata_sync_to_node
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -698,12 +699,14 @@ DELETE FROM pg_dist_node;
\c - - - :master_port \c - - - :master_port
SET citus.enable_object_propagation TO off; -- prevent object propagation on add node during setup SET citus.enable_object_propagation TO off; -- prevent object propagation on add node during setup
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
NOTICE: dropping metadata on the node (localhost,57637)
stop_metadata_sync_to_node stop_metadata_sync_to_node
--------------------------------------------------------------------- ---------------------------------------------------------------------
(1 row) (1 row)
SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
NOTICE: dropping metadata on the node (localhost,57638)
stop_metadata_sync_to_node stop_metadata_sync_to_node
--------------------------------------------------------------------- ---------------------------------------------------------------------

View File

@ -597,9 +597,11 @@ ALTER EXTENSION citus UPDATE TO '10.2-1';
SELECT * FROM print_extension_changes(); SELECT * FROM print_extension_changes();
previous_object | current_object previous_object | current_object
--------------------------------------------------------------------- ---------------------------------------------------------------------
function stop_metadata_sync_to_node(text,integer) void |
| function citus_internal.downgrade_columnar_storage(regclass) void | function citus_internal.downgrade_columnar_storage(regclass) void
| function citus_internal.upgrade_columnar_storage(regclass) void | function citus_internal.upgrade_columnar_storage(regclass) void
(2 rows) | function stop_metadata_sync_to_node(text,integer,boolean) void
(4 rows)
DROP TABLE prev_objects, extension_diff; DROP TABLE prev_objects, extension_diff;
-- show running version -- show running version

View File

@ -223,6 +223,7 @@ SELECT hasmetadata FROM pg_dist_node WHERE nodeport = 8888;
(1 row) (1 row)
SELECT stop_metadata_sync_to_node('localhost', 8888); SELECT stop_metadata_sync_to_node('localhost', 8888);
NOTICE: (localhost,8888) is a secondary node: to clear the metadata, you should clear metadata from the primary node
stop_metadata_sync_to_node stop_metadata_sync_to_node
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -550,6 +551,7 @@ SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_1_port;
(1 row) (1 row)
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
NOTICE: dropping metadata on the node (localhost,57637)
stop_metadata_sync_to_node stop_metadata_sync_to_node
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -948,12 +950,14 @@ DROP TABLE mx_temp_drop_test;
SET citus.shard_count TO 3; SET citus.shard_count TO 3;
SET citus.shard_replication_factor TO 1; SET citus.shard_replication_factor TO 1;
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
NOTICE: dropping metadata on the node (localhost,57637)
stop_metadata_sync_to_node stop_metadata_sync_to_node
--------------------------------------------------------------------- ---------------------------------------------------------------------
(1 row) (1 row)
SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
NOTICE: dropping metadata on the node (localhost,57638)
stop_metadata_sync_to_node stop_metadata_sync_to_node
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -1282,6 +1286,7 @@ UPDATE pg_dist_placement
WHERE groupid = :old_worker_2_group; WHERE groupid = :old_worker_2_group;
\c - - - :master_port \c - - - :master_port
SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
NOTICE: dropping metadata on the node (localhost,57638)
stop_metadata_sync_to_node stop_metadata_sync_to_node
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -1750,12 +1755,14 @@ drop cascades to default value for column id of table test_table
DROP TABLE test_table CASCADE; DROP TABLE test_table CASCADE;
-- Cleanup -- Cleanup
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
NOTICE: dropping metadata on the node (localhost,57637)
stop_metadata_sync_to_node stop_metadata_sync_to_node
--------------------------------------------------------------------- ---------------------------------------------------------------------
(1 row) (1 row)
SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
NOTICE: dropping metadata on the node (localhost,57638)
stop_metadata_sync_to_node stop_metadata_sync_to_node
--------------------------------------------------------------------- ---------------------------------------------------------------------

View File

@ -221,6 +221,7 @@ DROP TABLE mx_table_1;
DROP TABLE mx_table_2; DROP TABLE mx_table_2;
DROP TABLE mx_table_3; DROP TABLE mx_table_3;
SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
NOTICE: dropping metadata on the node (localhost,57638)
stop_metadata_sync_to_node stop_metadata_sync_to_node
--------------------------------------------------------------------- ---------------------------------------------------------------------

View File

@ -559,12 +559,14 @@ SELECT run_command_on_workers($$SELECT proowner::regrole FROM pg_proc WHERE pron
-- we don't want other tests to have metadata synced -- we don't want other tests to have metadata synced
-- that might change the test outputs, so we're just trying to be careful -- that might change the test outputs, so we're just trying to be careful
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
NOTICE: dropping metadata on the node (localhost,57637)
stop_metadata_sync_to_node stop_metadata_sync_to_node
--------------------------------------------------------------------- ---------------------------------------------------------------------
(1 row) (1 row)
SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
NOTICE: dropping metadata on the node (localhost,57638)
stop_metadata_sync_to_node stop_metadata_sync_to_node
--------------------------------------------------------------------- ---------------------------------------------------------------------

View File

@ -223,6 +223,13 @@ SELECT r.a FROM ref r JOIN local_table lt on r.a = lt.a;
\c - - - :master_port \c - - - :master_port
SET search_path TO mx_add_coordinator,public; SET search_path TO mx_add_coordinator,public;
SELECT stop_metadata_sync_to_node('localhost', :master_port);
NOTICE: node (localhost,57636) is the coordinator and should have metadata, skipping stopping the metadata sync
stop_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
SELECT * FROM ref ORDER BY a; SELECT * FROM ref ORDER BY a;
a a
--------------------------------------------------------------------- ---------------------------------------------------------------------

View File

@ -432,6 +432,7 @@ WARNING: warning
ERROR: error ERROR: error
\set VERBOSITY default \set VERBOSITY default
-- Test that we don't propagate to non-metadata worker nodes -- Test that we don't propagate to non-metadata worker nodes
SET client_min_messages TO WARNING;
select stop_metadata_sync_to_node('localhost', :worker_1_port); select stop_metadata_sync_to_node('localhost', :worker_1_port);
stop_metadata_sync_to_node stop_metadata_sync_to_node
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -444,6 +445,7 @@ select stop_metadata_sync_to_node('localhost', :worker_2_port);
(1 row) (1 row)
SET client_min_messages TO DEBUG1;
call multi_mx_call.mx_call_proc(2, 0); call multi_mx_call.mx_call_proc(2, 0);
DEBUG: there is no worker node with metadata DEBUG: there is no worker node with metadata
DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_call.mx_call_dist_table_1 t1 JOIN multi_mx_call.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))

View File

@ -498,6 +498,7 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT test.x, r.cou
(0 rows) (0 rows)
-- Test that we don't propagate to non-metadata worker nodes -- Test that we don't propagate to non-metadata worker nodes
SET client_min_messages TO WARNING;
select stop_metadata_sync_to_node('localhost', :worker_1_port); select stop_metadata_sync_to_node('localhost', :worker_1_port);
stop_metadata_sync_to_node stop_metadata_sync_to_node
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -510,6 +511,7 @@ select stop_metadata_sync_to_node('localhost', :worker_2_port);
(1 row) (1 row)
SET client_min_messages TO DEBUG1;
select mx_call_func(2, 0); select mx_call_func(2, 0);
DEBUG: the worker node does not have metadata DEBUG: the worker node does not have metadata
DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id)))

View File

@ -658,6 +658,7 @@ SELECT 1 FROM master_disable_node('localhost', 1);
ERROR: Disabling localhost:xxxxx failed ERROR: Disabling localhost:xxxxx failed
-- try again after stopping metadata sync -- try again after stopping metadata sync
SELECT stop_metadata_sync_to_node('localhost', 1); SELECT stop_metadata_sync_to_node('localhost', 1);
NOTICE: dropping metadata on the node (localhost,1)
stop_metadata_sync_to_node stop_metadata_sync_to_node
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -722,6 +723,7 @@ SELECT 1 FROM master_disable_node('localhost', :worker_2_port);
ERROR: Disabling localhost:xxxxx failed ERROR: Disabling localhost:xxxxx failed
-- try again after stopping metadata sync -- try again after stopping metadata sync
SELECT stop_metadata_sync_to_node('localhost', 1); SELECT stop_metadata_sync_to_node('localhost', 1);
NOTICE: dropping metadata on the node (localhost,1)
stop_metadata_sync_to_node stop_metadata_sync_to_node
--------------------------------------------------------------------- ---------------------------------------------------------------------

View File

@ -1037,6 +1037,7 @@ DROP TABLE remove_node_reference_table;
DROP TABLE remove_node_reference_table_schema.table1; DROP TABLE remove_node_reference_table_schema.table1;
DROP SCHEMA remove_node_reference_table_schema CASCADE; DROP SCHEMA remove_node_reference_table_schema CASCADE;
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
NOTICE: dropping metadata on the node (localhost,57637)
stop_metadata_sync_to_node stop_metadata_sync_to_node
--------------------------------------------------------------------- ---------------------------------------------------------------------

View File

@ -1028,6 +1028,7 @@ WHERE ref_table.a = dist_table.a;
\c - - - :master_port \c - - - :master_port
SET search_path TO replicate_reference_table; SET search_path TO replicate_reference_table;
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
NOTICE: dropping metadata on the node (localhost,57637)
stop_metadata_sync_to_node stop_metadata_sync_to_node
--------------------------------------------------------------------- ---------------------------------------------------------------------

View File

@ -731,6 +731,7 @@ SELECT run_command_on_workers('DROP ROLE IF EXISTS seq_role_0, seq_role_1');
-- Check some cases when default is defined by -- Check some cases when default is defined by
-- DEFAULT nextval('seq_name'::text) (not by DEFAULT nextval('seq_name')) -- DEFAULT nextval('seq_name'::text) (not by DEFAULT nextval('seq_name'))
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
NOTICE: dropping metadata on the node (localhost,57637)
stop_metadata_sync_to_node stop_metadata_sync_to_node
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -806,6 +807,7 @@ SELECT run_command_on_workers('DROP SCHEMA IF EXISTS sequence_default CASCADE');
(2 rows) (2 rows)
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
NOTICE: dropping metadata on the node (localhost,57637)
stop_metadata_sync_to_node stop_metadata_sync_to_node
--------------------------------------------------------------------- ---------------------------------------------------------------------

View File

@ -265,6 +265,7 @@ SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_port;
(1 row) (1 row)
SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
NOTICE: dropping metadata on the node (localhost,57638)
stop_metadata_sync_to_node stop_metadata_sync_to_node
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -280,11 +281,20 @@ SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_port;
SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition; SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition;
worker_drop_distributed_table worker_drop_distributed_table
--------------------------------------------------------------------- ---------------------------------------------------------------------
(0 rows)
SELECT count(*) FROM pg_dist_partition;
count
---------------------------------------------------------------------
0
(1 row)
(2 rows) SELECT count(*) FROM pg_dist_node;
count
---------------------------------------------------------------------
0
(1 row)
DELETE FROM pg_dist_node;
\c - - - :worker_1_port \c - - - :worker_1_port
-- DROP TABLE -- DROP TABLE
-- terse verbosity because pg10 has slightly different output -- terse verbosity because pg10 has slightly different output
@ -371,6 +381,7 @@ ROLLBACK;
DROP TABLE mx_table; DROP TABLE mx_table;
DROP TABLE mx_table_2; DROP TABLE mx_table_2;
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
NOTICE: dropping metadata on the node (localhost,57637)
stop_metadata_sync_to_node stop_metadata_sync_to_node
--------------------------------------------------------------------- ---------------------------------------------------------------------

View File

@ -0,0 +1,309 @@
CREATE SCHEMA start_stop_metadata_sync;
SET search_path TO "start_stop_metadata_sync";
SET citus.next_shard_id TO 980000;
SET client_min_messages TO WARNING;
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
-- create a custom type for testing with a distributed table
CREATE TYPE tt2 AS ENUM ('a', 'b');
-- create test tables
CREATE TABLE distributed_table_1(col int unique, b tt2);
CREATE TABLE "distributed_table_2'! ?._"(col int unique);
CREATE TABLE distributed_table_3(col int);
CREATE TABLE distributed_table_4(a int UNIQUE NOT NULL, b int, c int);
CREATE TABLE reference_table_1(col int unique);
CREATE TABLE reference_table_2(col int unique);
CREATE TABLE local_table(col int unique);
-- create a fkey graph: dist -> dist -> ref1 <- local && ref1 -> ref2
ALTER TABLE distributed_table_1 ADD CONSTRAINT fkey_1 FOREIGN KEY (col) REFERENCES "distributed_table_2'! ?._"(col);
ALTER TABLE "distributed_table_2'! ?._" ADD CONSTRAINT fkey_1 FOREIGN KEY (col) REFERENCES reference_table_1(col);
ALTER TABLE reference_table_1 ADD CONSTRAINT fkey_1 FOREIGN KEY (col) REFERENCES reference_table_2(col);
ALTER TABLE local_table ADD CONSTRAINT fkey_1 FOREIGN KEY (col) REFERENCES reference_table_1(col);
SELECT create_reference_table('reference_table_2');
create_reference_table
---------------------------------------------------------------------
(1 row)
SELECT create_reference_table('reference_table_1');
create_reference_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('"distributed_table_2''! ?._"', 'col');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('distributed_table_1', 'col');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('distributed_table_3', 'col');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('distributed_table_4', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE INDEX ind1 ON distributed_table_4(a);
CREATE INDEX ind2 ON distributed_table_4(b);
CREATE INDEX ind3 ON distributed_table_4(a, b);
CREATE STATISTICS stat ON a,b FROM distributed_table_4;
-- create views to make sure that they'll continue working after stop_sync
INSERT INTO distributed_table_3 VALUES (1);
CREATE VIEW test_view AS SELECT COUNT(*) FROM distributed_table_3;
CREATE MATERIALIZED VIEW test_matview AS SELECT COUNT(*) FROM distributed_table_3;
ALTER TABLE distributed_table_4 DROP COLUMN c;
-- test for hybrid partitioned table (columnar+heap)
CREATE TABLE events(ts timestamptz, i int, n numeric, s text)
PARTITION BY RANGE (ts);
CREATE TABLE events_2021_jan PARTITION OF events
FOR VALUES FROM ('2021-01-01') TO ('2021-02-01');
CREATE TABLE events_2021_feb PARTITION OF events
FOR VALUES FROM ('2021-02-01') TO ('2021-03-01');
INSERT INTO events SELECT
'2021-01-01'::timestamptz + '0.45 seconds'::interval * g,
g,
g*pi(),
'number: ' || g::text
FROM generate_series(1,1000) g;
VACUUM (FREEZE, ANALYZE) events_2021_feb;
SELECT create_distributed_table('events', 'ts');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT alter_table_set_access_method('events_2021_jan', 'columnar');
alter_table_set_access_method
---------------------------------------------------------------------
(1 row)
VACUUM (FREEZE, ANALYZE) events_2021_jan;
-- this should fail
BEGIN;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
ERROR: start_metadata_sync_to_node cannot run inside a transaction block
ROLLBACK;
-- sync metadata
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
\c - - - :worker_1_port
SET search_path TO "start_stop_metadata_sync";
SELECT * FROM distributed_table_1;
col | b
---------------------------------------------------------------------
(0 rows)
CREATE VIEW test_view AS SELECT COUNT(*) FROM distributed_table_3;
CREATE MATERIALIZED VIEW test_matview AS SELECT COUNT(*) FROM distributed_table_3;
SELECT * FROM test_view;
count
---------------------------------------------------------------------
1
(1 row)
SELECT * FROM test_matview;
count
---------------------------------------------------------------------
1
(1 row)
SELECT * FROM pg_dist_partition WHERE logicalrelid::text LIKE 'events%' ORDER BY logicalrelid::text;
logicalrelid | partmethod | partkey | colocationid | repmodel
---------------------------------------------------------------------
events | h | {VAR :varno 1 :varattno 1 :vartype 1184 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 1390012 | s
events_2021_feb | h | {VAR :varno 1 :varattno 1 :vartype 1184 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 1390012 | s
events_2021_jan | h | {VAR :varno 1 :varattno 1 :vartype 1184 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1} | 1390012 | s
(3 rows)
SELECT count(*) > 0 FROM pg_dist_node;
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) > 0 FROM pg_dist_shard;
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) > 0 FROM pg_class WHERE relname LIKE 'distributed_table__' AND relnamespace IN (SELECT oid FROM pg_namespace WHERE nspname = 'start_stop_metadata_sync');
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) > 0 FROM pg_class WHERE relname LIKE 'reference_table__' AND relnamespace IN (SELECT oid FROM pg_namespace WHERE nspname = 'start_stop_metadata_sync');
?column?
---------------------------------------------------------------------
t
(1 row)
\c - - - :master_port
SET search_path TO "start_stop_metadata_sync";
SELECT * FROM distributed_table_1;
col | b
---------------------------------------------------------------------
(0 rows)
ALTER TABLE distributed_table_4 DROP COLUMN b;
-- this should fail
BEGIN;
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
ERROR: stop_metadata_sync_to_node cannot run inside a transaction block
ROLLBACK;
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
NOTICE: dropping metadata on the node (localhost,57637)
stop_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
SELECT * FROM test_view;
count
---------------------------------------------------------------------
1
(1 row)
SELECT * FROM test_matview;
count
---------------------------------------------------------------------
1
(1 row)
SELECT count(*) > 0 FROM pg_dist_node;
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) > 0 FROM pg_dist_shard;
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) > 0 FROM pg_class WHERE relname LIKE 'distributed_table__' AND relnamespace IN (SELECT oid FROM pg_namespace WHERE nspname = 'start_stop_metadata_sync');
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) > 0 FROM pg_class WHERE relname LIKE 'reference_table__' AND relnamespace IN (SELECT oid FROM pg_namespace WHERE nspname = 'start_stop_metadata_sync');
?column?
---------------------------------------------------------------------
t
(1 row)
\c - - - :worker_1_port
SET search_path TO "start_stop_metadata_sync";
SELECT count(*) > 0 FROM pg_dist_node;
?column?
---------------------------------------------------------------------
f
(1 row)
SELECT count(*) > 0 FROM pg_dist_shard;
?column?
---------------------------------------------------------------------
f
(1 row)
SELECT count(*) > 0 FROM pg_class WHERE relname LIKE 'distributed_table__' AND relnamespace IN (SELECT oid FROM pg_namespace WHERE nspname = 'start_stop_metadata_sync');
?column?
---------------------------------------------------------------------
f
(1 row)
SELECT count(*) > 0 FROM pg_class WHERE relname LIKE 'reference_table__' AND relnamespace IN (SELECT oid FROM pg_namespace WHERE nspname = 'start_stop_metadata_sync');
?column?
---------------------------------------------------------------------
f
(1 row)
\c - - - :master_port
SET search_path TO "start_stop_metadata_sync";
SELECT * FROM distributed_table_1;
col | b
---------------------------------------------------------------------
(0 rows)
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
\c - - - :worker_1_port
SELECT count(*) > 0 FROM pg_dist_node;
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) > 0 FROM pg_dist_shard;
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) > 0 FROM pg_class WHERE relname LIKE 'distributed_table__' AND relnamespace IN (SELECT oid FROM pg_namespace WHERE nspname = 'start_stop_metadata_sync');
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) > 0 FROM pg_class WHERE relname LIKE 'reference_table__' AND relnamespace IN (SELECT oid FROM pg_namespace WHERE nspname = 'start_stop_metadata_sync');
?column?
---------------------------------------------------------------------
t
(1 row)
\c - - - :master_port
SET search_path TO "start_stop_metadata_sync";
-- cleanup
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
NOTICE: dropping metadata on the node (localhost,57637)
stop_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
NOTICE: dropping metadata on the node (localhost,57638)
stop_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
SET client_min_messages TO WARNING;
DROP SCHEMA start_stop_metadata_sync CASCADE;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
SELECT start_metadata_sync_to_node('localhost', :worker_2_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)

View File

@ -173,7 +173,7 @@ ORDER BY 1;
function run_command_on_workers(text,boolean) function run_command_on_workers(text,boolean)
function shard_name(regclass,bigint) function shard_name(regclass,bigint)
function start_metadata_sync_to_node(text,integer) function start_metadata_sync_to_node(text,integer)
function stop_metadata_sync_to_node(text,integer) function stop_metadata_sync_to_node(text,integer,boolean)
function time_partition_range(regclass) function time_partition_range(regclass)
function truncate_local_data_after_distributing_table(regclass) function truncate_local_data_after_distributing_table(regclass)
function undistribute_table(regclass,boolean) function undistribute_table(regclass,boolean)

View File

@ -22,6 +22,7 @@ test: multi_test_catalog_views
# the following test has to be run sequentially # the following test has to be run sequentially
test: multi_mx_create_table test: multi_mx_create_table
test: start_stop_metadata_sync
test: multi_mx_hide_shard_names test: multi_mx_hide_shard_names
test: multi_mx_add_coordinator test: multi_mx_add_coordinator
test: multi_mx_modifications_to_reference_tables test: multi_mx_modifications_to_reference_tables

View File

@ -98,6 +98,9 @@ SELECT r.a FROM ref r JOIN local_table lt on r.a = lt.a;
\c - - - :master_port \c - - - :master_port
SET search_path TO mx_add_coordinator,public; SET search_path TO mx_add_coordinator,public;
SELECT stop_metadata_sync_to_node('localhost', :master_port);
SELECT * FROM ref ORDER BY a; SELECT * FROM ref ORDER BY a;
-- Clear pg_dist_transaction before removing the node. This is to keep the output -- Clear pg_dist_transaction before removing the node. This is to keep the output

View File

@ -196,8 +196,10 @@ call multi_mx_call.mx_call_proc_raise(2);
\set VERBOSITY default \set VERBOSITY default
-- Test that we don't propagate to non-metadata worker nodes -- Test that we don't propagate to non-metadata worker nodes
SET client_min_messages TO WARNING;
select stop_metadata_sync_to_node('localhost', :worker_1_port); select stop_metadata_sync_to_node('localhost', :worker_1_port);
select stop_metadata_sync_to_node('localhost', :worker_2_port); select stop_metadata_sync_to_node('localhost', :worker_2_port);
SET client_min_messages TO DEBUG1;
call multi_mx_call.mx_call_proc(2, 0); call multi_mx_call.mx_call_proc(2, 0);
SET client_min_messages TO NOTICE; SET client_min_messages TO NOTICE;
select start_metadata_sync_to_node('localhost', :worker_1_port); select start_metadata_sync_to_node('localhost', :worker_1_port);

View File

@ -222,8 +222,10 @@ WITH r AS (
) SELECT * FROM test, r, t WHERE t.c=0; ) SELECT * FROM test, r, t WHERE t.c=0;
-- Test that we don't propagate to non-metadata worker nodes -- Test that we don't propagate to non-metadata worker nodes
SET client_min_messages TO WARNING;
select stop_metadata_sync_to_node('localhost', :worker_1_port); select stop_metadata_sync_to_node('localhost', :worker_1_port);
select stop_metadata_sync_to_node('localhost', :worker_2_port); select stop_metadata_sync_to_node('localhost', :worker_2_port);
SET client_min_messages TO DEBUG1;
select mx_call_func(2, 0); select mx_call_func(2, 0);
SET client_min_messages TO NOTICE; SET client_min_messages TO NOTICE;
select start_metadata_sync_to_node('localhost', :worker_1_port); select start_metadata_sync_to_node('localhost', :worker_1_port);

View File

@ -171,7 +171,8 @@ SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_port; SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_port;
\c - - - :worker_2_port \c - - - :worker_2_port
SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition; SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition;
DELETE FROM pg_dist_node; SELECT count(*) FROM pg_dist_partition;
SELECT count(*) FROM pg_dist_node;
\c - - - :worker_1_port \c - - - :worker_1_port
-- DROP TABLE -- DROP TABLE

View File

@ -0,0 +1,136 @@
CREATE SCHEMA start_stop_metadata_sync;
SET search_path TO "start_stop_metadata_sync";
SET citus.next_shard_id TO 980000;
SET client_min_messages TO WARNING;
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
-- create a custom type for testing with a distributed table
CREATE TYPE tt2 AS ENUM ('a', 'b');
-- create test tables
CREATE TABLE distributed_table_1(col int unique, b tt2);
CREATE TABLE "distributed_table_2'! ?._"(col int unique);
CREATE TABLE distributed_table_3(col int);
CREATE TABLE distributed_table_4(a int UNIQUE NOT NULL, b int, c int);
CREATE TABLE reference_table_1(col int unique);
CREATE TABLE reference_table_2(col int unique);
CREATE TABLE local_table(col int unique);
-- create a fkey graph: dist -> dist -> ref1 <- local && ref1 -> ref2
ALTER TABLE distributed_table_1 ADD CONSTRAINT fkey_1 FOREIGN KEY (col) REFERENCES "distributed_table_2'! ?._"(col);
ALTER TABLE "distributed_table_2'! ?._" ADD CONSTRAINT fkey_1 FOREIGN KEY (col) REFERENCES reference_table_1(col);
ALTER TABLE reference_table_1 ADD CONSTRAINT fkey_1 FOREIGN KEY (col) REFERENCES reference_table_2(col);
ALTER TABLE local_table ADD CONSTRAINT fkey_1 FOREIGN KEY (col) REFERENCES reference_table_1(col);
SELECT create_reference_table('reference_table_2');
SELECT create_reference_table('reference_table_1');
SELECT create_distributed_table('"distributed_table_2''! ?._"', 'col');
SELECT create_distributed_table('distributed_table_1', 'col');
SELECT create_distributed_table('distributed_table_3', 'col');
SELECT create_distributed_table('distributed_table_4', 'a');
CREATE INDEX ind1 ON distributed_table_4(a);
CREATE INDEX ind2 ON distributed_table_4(b);
CREATE INDEX ind3 ON distributed_table_4(a, b);
CREATE STATISTICS stat ON a,b FROM distributed_table_4;
-- create views to make sure that they'll continue working after stop_sync
INSERT INTO distributed_table_3 VALUES (1);
CREATE VIEW test_view AS SELECT COUNT(*) FROM distributed_table_3;
CREATE MATERIALIZED VIEW test_matview AS SELECT COUNT(*) FROM distributed_table_3;
ALTER TABLE distributed_table_4 DROP COLUMN c;
-- test for hybrid partitioned table (columnar+heap)
CREATE TABLE events(ts timestamptz, i int, n numeric, s text)
PARTITION BY RANGE (ts);
CREATE TABLE events_2021_jan PARTITION OF events
FOR VALUES FROM ('2021-01-01') TO ('2021-02-01');
CREATE TABLE events_2021_feb PARTITION OF events
FOR VALUES FROM ('2021-02-01') TO ('2021-03-01');
INSERT INTO events SELECT
'2021-01-01'::timestamptz + '0.45 seconds'::interval * g,
g,
g*pi(),
'number: ' || g::text
FROM generate_series(1,1000) g;
VACUUM (FREEZE, ANALYZE) events_2021_feb;
SELECT create_distributed_table('events', 'ts');
SELECT alter_table_set_access_method('events_2021_jan', 'columnar');
VACUUM (FREEZE, ANALYZE) events_2021_jan;
-- this should fail
BEGIN;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
ROLLBACK;
-- sync metadata
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
\c - - - :worker_1_port
SET search_path TO "start_stop_metadata_sync";
SELECT * FROM distributed_table_1;
CREATE VIEW test_view AS SELECT COUNT(*) FROM distributed_table_3;
CREATE MATERIALIZED VIEW test_matview AS SELECT COUNT(*) FROM distributed_table_3;
SELECT * FROM test_view;
SELECT * FROM test_matview;
SELECT * FROM pg_dist_partition WHERE logicalrelid::text LIKE 'events%' ORDER BY logicalrelid::text;
SELECT count(*) > 0 FROM pg_dist_node;
SELECT count(*) > 0 FROM pg_dist_shard;
SELECT count(*) > 0 FROM pg_class WHERE relname LIKE 'distributed_table__' AND relnamespace IN (SELECT oid FROM pg_namespace WHERE nspname = 'start_stop_metadata_sync');
SELECT count(*) > 0 FROM pg_class WHERE relname LIKE 'reference_table__' AND relnamespace IN (SELECT oid FROM pg_namespace WHERE nspname = 'start_stop_metadata_sync');
\c - - - :master_port
SET search_path TO "start_stop_metadata_sync";
SELECT * FROM distributed_table_1;
ALTER TABLE distributed_table_4 DROP COLUMN b;
-- this should fail
BEGIN;
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
ROLLBACK;
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
SELECT * FROM test_view;
SELECT * FROM test_matview;
SELECT count(*) > 0 FROM pg_dist_node;
SELECT count(*) > 0 FROM pg_dist_shard;
SELECT count(*) > 0 FROM pg_class WHERE relname LIKE 'distributed_table__' AND relnamespace IN (SELECT oid FROM pg_namespace WHERE nspname = 'start_stop_metadata_sync');
SELECT count(*) > 0 FROM pg_class WHERE relname LIKE 'reference_table__' AND relnamespace IN (SELECT oid FROM pg_namespace WHERE nspname = 'start_stop_metadata_sync');
\c - - - :worker_1_port
SET search_path TO "start_stop_metadata_sync";
SELECT count(*) > 0 FROM pg_dist_node;
SELECT count(*) > 0 FROM pg_dist_shard;
SELECT count(*) > 0 FROM pg_class WHERE relname LIKE 'distributed_table__' AND relnamespace IN (SELECT oid FROM pg_namespace WHERE nspname = 'start_stop_metadata_sync');
SELECT count(*) > 0 FROM pg_class WHERE relname LIKE 'reference_table__' AND relnamespace IN (SELECT oid FROM pg_namespace WHERE nspname = 'start_stop_metadata_sync');
\c - - - :master_port
SET search_path TO "start_stop_metadata_sync";
SELECT * FROM distributed_table_1;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
\c - - - :worker_1_port
SELECT count(*) > 0 FROM pg_dist_node;
SELECT count(*) > 0 FROM pg_dist_shard;
SELECT count(*) > 0 FROM pg_class WHERE relname LIKE 'distributed_table__' AND relnamespace IN (SELECT oid FROM pg_namespace WHERE nspname = 'start_stop_metadata_sync');
SELECT count(*) > 0 FROM pg_class WHERE relname LIKE 'reference_table__' AND relnamespace IN (SELECT oid FROM pg_namespace WHERE nspname = 'start_stop_metadata_sync');
\c - - - :master_port
SET search_path TO "start_stop_metadata_sync";
-- cleanup
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
SET client_min_messages TO WARNING;
DROP SCHEMA start_stop_metadata_sync CASCADE;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
SELECT start_metadata_sync_to_node('localhost', :worker_2_port);