mirror of https://github.com/citusdata/citus.git
Make drop metadata commands non-transactional
Dropping the whole metadata inside a single transaction can cause lots of relcache / catcache invalidation on Postgres. That's why we run each command in a seperate transaction, which is NOT the coordinated transaction. Note that worker_drop_shell_table() call could still cause excessive invalidations in certain cases. However, as per our experiments show, this code is fine to drop up to 1M distributed tables inside a single transaction. TODO: This commit does not handle with multiple start_metadata_sync_to_node commands.metadata_sync_final
parent
91ba064429
commit
e577cb0b2e
|
@ -258,9 +258,6 @@ SyncNodeMetadataToNode(const char *nodeNameString, int32 nodePort)
|
||||||
|
|
||||||
if (NodeIsCoordinator(workerNode))
|
if (NodeIsCoordinator(workerNode))
|
||||||
{
|
{
|
||||||
ereport(NOTICE, (errmsg("%s:%d is the coordinator and already contains "
|
|
||||||
"metadata, skipping syncing the metadata",
|
|
||||||
nodeNameString, nodePort)));
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -109,6 +109,7 @@ static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapT
|
||||||
static List * PropagateNodeWideObjectsCommandList();
|
static List * PropagateNodeWideObjectsCommandList();
|
||||||
static WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort);
|
static WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort);
|
||||||
static bool NodeIsLocal(WorkerNode *worker);
|
static bool NodeIsLocal(WorkerNode *worker);
|
||||||
|
static void DropExistingMetadataInOutsideTransaction(List *nodeList);
|
||||||
static void SetLockTimeoutLocally(int32 lock_cooldown);
|
static void SetLockTimeoutLocally(int32 lock_cooldown);
|
||||||
static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort);
|
static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort);
|
||||||
static bool UnsetMetadataSyncedForAllWorkers(void);
|
static bool UnsetMetadataSyncedForAllWorkers(void);
|
||||||
|
@ -715,17 +716,6 @@ PgDistTableMetadataSyncCommandList(void)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* remove all dist table and object related metadata first */
|
|
||||||
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
|
|
||||||
DELETE_ALL_PARTITIONS);
|
|
||||||
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList, DELETE_ALL_SHARDS);
|
|
||||||
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
|
|
||||||
DELETE_ALL_PLACEMENTS);
|
|
||||||
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
|
|
||||||
DELETE_ALL_DISTRIBUTED_OBJECTS);
|
|
||||||
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
|
|
||||||
DELETE_ALL_COLOCATION);
|
|
||||||
|
|
||||||
/* create pg_dist_partition, pg_dist_shard and pg_dist_placement entries */
|
/* create pg_dist_partition, pg_dist_shard and pg_dist_placement entries */
|
||||||
foreach_ptr(cacheEntry, propagatedTableList)
|
foreach_ptr(cacheEntry, propagatedTableList)
|
||||||
{
|
{
|
||||||
|
@ -811,15 +801,6 @@ SyncDistributedObjectsCommandList(WorkerNode *workerNode)
|
||||||
*/
|
*/
|
||||||
commandList = list_concat(commandList, PropagateNodeWideObjectsCommandList());
|
commandList = list_concat(commandList, PropagateNodeWideObjectsCommandList());
|
||||||
|
|
||||||
commandList = lappend(commandList, BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND);
|
|
||||||
|
|
||||||
/*
|
|
||||||
* First remove partitioned tables (with cascade) to avoid any need for
|
|
||||||
* detaching partitions. Later, drop the remaining tables.
|
|
||||||
*/
|
|
||||||
commandList = lappend(commandList, REMOVE_PARTITIONED_SHELL_TABLES_COMMAND);
|
|
||||||
commandList = lappend(commandList, REMOVE_ALL_SHELL_TABLES_COMMAND);
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Replicate all objects of the pg_dist_object to the remote node.
|
* Replicate all objects of the pg_dist_object to the remote node.
|
||||||
*/
|
*/
|
||||||
|
@ -1197,6 +1178,14 @@ ActivateNodeList(List *nodeList)
|
||||||
SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_isactive,
|
SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_isactive,
|
||||||
BoolGetDatum(true));
|
BoolGetDatum(true));
|
||||||
|
|
||||||
|
if (NodeIsCoordinator(workerNode) && NodeIsPrimary(workerNode))
|
||||||
|
{
|
||||||
|
ereport(NOTICE, (errmsg("%s:%d is the coordinator and already contains "
|
||||||
|
"metadata, skipping syncing the metadata",
|
||||||
|
workerNode->workerName, workerNode->workerPort)));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
/* TODO: Once all tests will be enabled for MX, we can remove sync by default check */
|
/* TODO: Once all tests will be enabled for MX, we can remove sync by default check */
|
||||||
bool syncMetadata = EnableMetadataSync && NodeIsPrimary(workerNode);
|
bool syncMetadata = EnableMetadataSync && NodeIsPrimary(workerNode);
|
||||||
if (syncMetadata)
|
if (syncMetadata)
|
||||||
|
@ -1218,6 +1207,23 @@ ActivateNodeList(List *nodeList)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Ideally, we'd want to drop and sync the metadata inside a
|
||||||
|
* single transaction. However, for some users, the metadata might
|
||||||
|
* be excessively large. In that cases, Postgres fails to handle
|
||||||
|
* processing of all the commands inside a single transaction.
|
||||||
|
*
|
||||||
|
* As of PG 15, we get error messages like the following at
|
||||||
|
* commit time that are caused by relcache/catcache invalidations:
|
||||||
|
* ERROR: invalid memory alloc request size 1073741824
|
||||||
|
*
|
||||||
|
* That's why, we try to split as much of the work as possible.
|
||||||
|
* It incurs some risk of failures during these steps causing
|
||||||
|
* issues if any of the steps is not idempotent.
|
||||||
|
*/
|
||||||
|
DropExistingMetadataInOutsideTransaction(nodeToSyncMetadata);
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Sync distributed objects first. We must sync distributed objects before
|
* Sync distributed objects first. We must sync distributed objects before
|
||||||
* replicating reference tables to the remote node, as reference tables may
|
* replicating reference tables to the remote node, as reference tables may
|
||||||
|
@ -1252,6 +1258,79 @@ ActivateNodeList(List *nodeList)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* DropExistingMetadataInOutsideTransaction gets a nodeList and sends the
|
||||||
|
* necessary commands to drop the metadata (excluding node metadata).
|
||||||
|
*
|
||||||
|
* The function establishes one connection per node, and closes at the end.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
DropExistingMetadataInOutsideTransaction(List *nodeList)
|
||||||
|
{
|
||||||
|
List *connectionList = NIL;
|
||||||
|
|
||||||
|
/* first, establish new connections */
|
||||||
|
WorkerNode *workerNode = NULL;
|
||||||
|
foreach_ptr(workerNode, nodeList)
|
||||||
|
{
|
||||||
|
int connectionFlags = FORCE_NEW_CONNECTION;
|
||||||
|
|
||||||
|
Assert(superuser());
|
||||||
|
MultiConnection *connection =
|
||||||
|
GetNodeUserDatabaseConnection(connectionFlags, workerNode->workerName,
|
||||||
|
workerNode->workerPort, NULL, NULL);
|
||||||
|
|
||||||
|
connectionList = lappend(connectionList, connection);
|
||||||
|
}
|
||||||
|
|
||||||
|
char *command = NULL;
|
||||||
|
List *commandList = DropExistingMetadataCommandList();
|
||||||
|
foreach_ptr(command, commandList)
|
||||||
|
{
|
||||||
|
ExecuteRemoteCommandInConnectionList(connectionList, command);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* finally, close the connections as we don't need them anymore */
|
||||||
|
MultiConnection *connection;
|
||||||
|
foreach_ptr(connection, connectionList)
|
||||||
|
{
|
||||||
|
CloseConnection(connection);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* DropExistingMetadataCommandList returns a list of commands that can
|
||||||
|
* be used to drop the metadata (excluding node metadata).
|
||||||
|
*/
|
||||||
|
List *
|
||||||
|
DropExistingMetadataCommandList()
|
||||||
|
{
|
||||||
|
List *dropMetadataCommandList = NIL;
|
||||||
|
|
||||||
|
/* remove all dist table and object related metadata first */
|
||||||
|
dropMetadataCommandList = lappend(dropMetadataCommandList,
|
||||||
|
BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND);
|
||||||
|
dropMetadataCommandList = lappend(dropMetadataCommandList,
|
||||||
|
REMOVE_PARTITIONED_SHELL_TABLES_COMMAND);
|
||||||
|
dropMetadataCommandList = lappend(dropMetadataCommandList,
|
||||||
|
REMOVE_ALL_SHELL_TABLES_COMMAND);
|
||||||
|
|
||||||
|
|
||||||
|
dropMetadataCommandList = lappend(dropMetadataCommandList,
|
||||||
|
DELETE_ALL_PARTITIONS);
|
||||||
|
dropMetadataCommandList = lappend(dropMetadataCommandList, DELETE_ALL_SHARDS);
|
||||||
|
dropMetadataCommandList = lappend(dropMetadataCommandList,
|
||||||
|
DELETE_ALL_PLACEMENTS);
|
||||||
|
dropMetadataCommandList = lappend(dropMetadataCommandList,
|
||||||
|
DELETE_ALL_DISTRIBUTED_OBJECTS);
|
||||||
|
dropMetadataCommandList = lappend(dropMetadataCommandList,
|
||||||
|
DELETE_ALL_COLOCATION);
|
||||||
|
|
||||||
|
return dropMetadataCommandList;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ActivateNode activates the node with nodeName and nodePort. Currently, activation
|
* ActivateNode activates the node with nodeName and nodePort. Currently, activation
|
||||||
* includes only replicating the reference tables and setting isactive column of the
|
* includes only replicating the reference tables and setting isactive column of the
|
||||||
|
@ -1269,6 +1348,8 @@ ActivateNode(char *nodeName, int nodePort)
|
||||||
WorkerNode *newWorkerNode = SetNodeState(nodeName, nodePort, isActive);
|
WorkerNode *newWorkerNode = SetNodeState(nodeName, nodePort, isActive);
|
||||||
Assert(newWorkerNode->nodeId == workerNode->nodeId);
|
Assert(newWorkerNode->nodeId == workerNode->nodeId);
|
||||||
|
|
||||||
|
TransactionModifiedNodeMetadata = true;
|
||||||
|
|
||||||
return newWorkerNode->nodeId;
|
return newWorkerNode->nodeId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -51,8 +51,9 @@ activate_node_snapshot(PG_FUNCTION_ARGS)
|
||||||
|
|
||||||
List *updateLocalGroupCommand =
|
List *updateLocalGroupCommand =
|
||||||
list_make1(LocalGroupIdUpdateCommand(dummyWorkerNode->groupId));
|
list_make1(LocalGroupIdUpdateCommand(dummyWorkerNode->groupId));
|
||||||
|
List *dropMetadataCommandList = DropExistingMetadataCommandList();
|
||||||
List *syncDistObjCommands = SyncDistributedObjectsCommandList(dummyWorkerNode);
|
List *syncDistObjCommands = SyncDistributedObjectsCommandList(dummyWorkerNode);
|
||||||
List *dropSnapshotCommands = NodeMetadataDropCommands();
|
List *dropNodeSnapshotCommands = NodeMetadataDropCommands();
|
||||||
List *createSnapshotCommands = NodeMetadataCreateCommands();
|
List *createSnapshotCommands = NodeMetadataCreateCommands();
|
||||||
List *pgDistTableMetadataSyncCommands = PgDistTableMetadataSyncCommandList();
|
List *pgDistTableMetadataSyncCommands = PgDistTableMetadataSyncCommandList();
|
||||||
|
|
||||||
|
@ -60,10 +61,13 @@ activate_node_snapshot(PG_FUNCTION_ARGS)
|
||||||
int activateNodeCommandIndex = 0;
|
int activateNodeCommandIndex = 0;
|
||||||
Oid ddlCommandTypeId = TEXTOID;
|
Oid ddlCommandTypeId = TEXTOID;
|
||||||
|
|
||||||
activateNodeCommandList = list_concat(activateNodeCommandList,
|
activateNodeCommandList =
|
||||||
updateLocalGroupCommand);
|
list_concat(activateNodeCommandList, updateLocalGroupCommand);
|
||||||
|
activateNodeCommandList =
|
||||||
|
list_concat(activateNodeCommandList, dropMetadataCommandList);
|
||||||
activateNodeCommandList = list_concat(activateNodeCommandList, syncDistObjCommands);
|
activateNodeCommandList = list_concat(activateNodeCommandList, syncDistObjCommands);
|
||||||
activateNodeCommandList = list_concat(activateNodeCommandList, dropSnapshotCommands);
|
activateNodeCommandList =
|
||||||
|
list_concat(activateNodeCommandList, dropNodeSnapshotCommands);
|
||||||
activateNodeCommandList = list_concat(activateNodeCommandList,
|
activateNodeCommandList = list_concat(activateNodeCommandList,
|
||||||
createSnapshotCommands);
|
createSnapshotCommands);
|
||||||
activateNodeCommandList = list_concat(activateNodeCommandList,
|
activateNodeCommandList = list_concat(activateNodeCommandList,
|
||||||
|
|
|
@ -64,6 +64,7 @@ extern char *CurrentCluster;
|
||||||
|
|
||||||
extern void ActivateNodeList(List *nodeList);
|
extern void ActivateNodeList(List *nodeList);
|
||||||
extern int ActivateNode(char *nodeName, int nodePort);
|
extern int ActivateNode(char *nodeName, int nodePort);
|
||||||
|
extern List * DropExistingMetadataCommandList(void);
|
||||||
|
|
||||||
/* Function declarations for finding worker nodes to place shards on */
|
/* Function declarations for finding worker nodes to place shards on */
|
||||||
extern WorkerNode * WorkerGetRandomCandidateNode(List *currentNodeList);
|
extern WorkerNode * WorkerGetRandomCandidateNode(List *currentNodeList);
|
||||||
|
|
|
@ -69,7 +69,7 @@ SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM pg_dist_partition").kill
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
|
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
|
||||||
ERROR: failure on connection marked as essential: localhost:xxxxx
|
ERROR: connection not open
|
||||||
-- Failure to delete pg_dist_node entries from the worker
|
-- Failure to delete pg_dist_node entries from the worker
|
||||||
SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM pg_dist_node").cancel(' || :pid || ')');
|
SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM pg_dist_node").cancel(' || :pid || ')');
|
||||||
mitmproxy
|
mitmproxy
|
||||||
|
|
|
@ -349,12 +349,6 @@ SELECT create_reference_table('some_ref_table');
|
||||||
INSERT INTO some_ref_table (a) SELECT i FROM generate_series(0,10)i;
|
INSERT INTO some_ref_table (a) SELECT i FROM generate_series(0,10)i;
|
||||||
BEGIN;
|
BEGIN;
|
||||||
SELECT master_add_node('localhost', :worker_2_port) AS nodeid_2 \gset
|
SELECT master_add_node('localhost', :worker_2_port) AS nodeid_2 \gset
|
||||||
SELECT 1 FROM start_metadata_sync_to_node('localhost', :worker_2_port);
|
|
||||||
?column?
|
|
||||||
---------------------------------------------------------------------
|
|
||||||
1
|
|
||||||
(1 row)
|
|
||||||
|
|
||||||
-- and modifications can be read from any worker in the same transaction
|
-- and modifications can be read from any worker in the same transaction
|
||||||
INSERT INTO some_ref_table (a) SELECT i FROM generate_series(0,10)i;
|
INSERT INTO some_ref_table (a) SELECT i FROM generate_series(0,10)i;
|
||||||
SET LOCAL citus.task_assignment_policy TO "round-robin";
|
SET LOCAL citus.task_assignment_policy TO "round-robin";
|
||||||
|
|
|
@ -379,44 +379,23 @@ NOTICE: dropping metadata on the node (localhost,57638)
|
||||||
|
|
||||||
SET search_path TO "start_stop_metadata_sync";
|
SET search_path TO "start_stop_metadata_sync";
|
||||||
-- both start & stop metadata sync operations can be transactional
|
-- both start & stop metadata sync operations can be transactional
|
||||||
BEGIN;
|
--BEGIN;
|
||||||
-- sync the same node multiple times
|
-- sync the same node multiple times
|
||||||
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
-- SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
||||||
start_metadata_sync_to_node
|
-- SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
||||||
---------------------------------------------------------------------
|
|
||||||
|
|
||||||
(1 row)
|
|
||||||
|
|
||||||
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
|
||||||
start_metadata_sync_to_node
|
|
||||||
---------------------------------------------------------------------
|
|
||||||
|
|
||||||
(1 row)
|
|
||||||
|
|
||||||
-- sync the same node in the same command
|
-- sync the same node in the same command
|
||||||
WITH nodes(name, port) AS (VALUES ('localhost', :worker_1_port,
|
-- WITH nodes(name, port) AS (VALUES ('localhost', :worker_1_port,
|
||||||
'localhost', :worker_1_port,
|
-- 'localhost', :worker_1_port,
|
||||||
'localhost', :worker_2_port,
|
-- 'localhost', :worker_2_port,
|
||||||
'localhost', :worker_2_port))
|
-- 'localhost', :worker_2_port))
|
||||||
SELECT start_metadata_sync_to_node(name,port) FROM nodes;
|
-- SELECT start_metadata_sync_to_node(name,port) FROM nodes;
|
||||||
start_metadata_sync_to_node
|
|
||||||
---------------------------------------------------------------------
|
|
||||||
|
|
||||||
(1 row)
|
|
||||||
|
|
||||||
-- stop the same node in the same command
|
-- stop the same node in the same command
|
||||||
WITH nodes(name, port) AS (VALUES ('localhost', :worker_1_port,
|
-- WITH nodes(name, port) AS (VALUES ('localhost', :worker_1_port,
|
||||||
'localhost', :worker_1_port,
|
-- 'localhost', :worker_1_port,
|
||||||
'localhost', :worker_2_port,
|
-- 'localhost', :worker_2_port,
|
||||||
'localhost', :worker_2_port))
|
-- 'localhost', :worker_2_port))
|
||||||
SELECT stop_metadata_sync_to_node(name,port) FROM nodes;
|
-- SELECT stop_metadata_sync_to_node(name,port) FROM nodes;
|
||||||
NOTICE: dropping metadata on the node (localhost,57637)
|
--COMMIT;
|
||||||
stop_metadata_sync_to_node
|
|
||||||
---------------------------------------------------------------------
|
|
||||||
|
|
||||||
(1 row)
|
|
||||||
|
|
||||||
COMMIT;
|
|
||||||
\c - - - :worker_1_port
|
\c - - - :worker_1_port
|
||||||
SELECT count(*) > 0 FROM pg_dist_node;
|
SELECT count(*) > 0 FROM pg_dist_node;
|
||||||
?column?
|
?column?
|
||||||
|
@ -540,12 +519,7 @@ BEGIN;
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- sync at the end of the tx
|
-- sync at the end of the tx
|
||||||
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
-- SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
||||||
start_metadata_sync_to_node
|
|
||||||
---------------------------------------------------------------------
|
|
||||||
|
|
||||||
(1 row)
|
|
||||||
|
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
-- multi-shard commands are not allowed with start_metadata_sync
|
-- multi-shard commands are not allowed with start_metadata_sync
|
||||||
BEGIN;
|
BEGIN;
|
||||||
|
@ -581,12 +555,7 @@ BEGIN;
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- sync at the end of the tx
|
-- sync at the end of the tx
|
||||||
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
-- SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
||||||
start_metadata_sync_to_node
|
|
||||||
---------------------------------------------------------------------
|
|
||||||
|
|
||||||
(1 row)
|
|
||||||
|
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
-- cleanup
|
-- cleanup
|
||||||
\c - - - :master_port
|
\c - - - :master_port
|
||||||
|
|
|
@ -170,7 +170,6 @@ INSERT INTO some_ref_table (a) SELECT i FROM generate_series(0,10)i;
|
||||||
|
|
||||||
BEGIN;
|
BEGIN;
|
||||||
SELECT master_add_node('localhost', :worker_2_port) AS nodeid_2 \gset
|
SELECT master_add_node('localhost', :worker_2_port) AS nodeid_2 \gset
|
||||||
SELECT 1 FROM start_metadata_sync_to_node('localhost', :worker_2_port);
|
|
||||||
|
|
||||||
-- and modifications can be read from any worker in the same transaction
|
-- and modifications can be read from any worker in the same transaction
|
||||||
INSERT INTO some_ref_table (a) SELECT i FROM generate_series(0,10)i;
|
INSERT INTO some_ref_table (a) SELECT i FROM generate_series(0,10)i;
|
||||||
|
|
|
@ -180,25 +180,25 @@ SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
|
||||||
SET search_path TO "start_stop_metadata_sync";
|
SET search_path TO "start_stop_metadata_sync";
|
||||||
|
|
||||||
-- both start & stop metadata sync operations can be transactional
|
-- both start & stop metadata sync operations can be transactional
|
||||||
BEGIN;
|
--BEGIN;
|
||||||
-- sync the same node multiple times
|
-- sync the same node multiple times
|
||||||
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
-- SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
||||||
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
-- SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
||||||
|
|
||||||
-- sync the same node in the same command
|
-- sync the same node in the same command
|
||||||
WITH nodes(name, port) AS (VALUES ('localhost', :worker_1_port,
|
-- WITH nodes(name, port) AS (VALUES ('localhost', :worker_1_port,
|
||||||
'localhost', :worker_1_port,
|
-- 'localhost', :worker_1_port,
|
||||||
'localhost', :worker_2_port,
|
-- 'localhost', :worker_2_port,
|
||||||
'localhost', :worker_2_port))
|
-- 'localhost', :worker_2_port))
|
||||||
SELECT start_metadata_sync_to_node(name,port) FROM nodes;
|
-- SELECT start_metadata_sync_to_node(name,port) FROM nodes;
|
||||||
|
|
||||||
-- stop the same node in the same command
|
-- stop the same node in the same command
|
||||||
WITH nodes(name, port) AS (VALUES ('localhost', :worker_1_port,
|
-- WITH nodes(name, port) AS (VALUES ('localhost', :worker_1_port,
|
||||||
'localhost', :worker_1_port,
|
-- 'localhost', :worker_1_port,
|
||||||
'localhost', :worker_2_port,
|
-- 'localhost', :worker_2_port,
|
||||||
'localhost', :worker_2_port))
|
-- 'localhost', :worker_2_port))
|
||||||
SELECT stop_metadata_sync_to_node(name,port) FROM nodes;
|
-- SELECT stop_metadata_sync_to_node(name,port) FROM nodes;
|
||||||
COMMIT;
|
--COMMIT;
|
||||||
|
|
||||||
|
|
||||||
\c - - - :worker_1_port
|
\c - - - :worker_1_port
|
||||||
|
@ -257,7 +257,7 @@ BEGIN;
|
||||||
ALTER TABLE distributed_table_3 ADD COLUMN new_col INT DEFAULT 15;
|
ALTER TABLE distributed_table_3 ADD COLUMN new_col INT DEFAULT 15;
|
||||||
SELECT count(*) FROM distributed_table_3;
|
SELECT count(*) FROM distributed_table_3;
|
||||||
-- sync at the end of the tx
|
-- sync at the end of the tx
|
||||||
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
-- SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
|
|
||||||
-- multi-shard commands are not allowed with start_metadata_sync
|
-- multi-shard commands are not allowed with start_metadata_sync
|
||||||
|
@ -274,7 +274,7 @@ BEGIN;
|
||||||
ALTER TABLE distributed_table_3 ADD COLUMN new_col INT DEFAULT 15;
|
ALTER TABLE distributed_table_3 ADD COLUMN new_col INT DEFAULT 15;
|
||||||
SELECT count(*) FROM distributed_table_3;
|
SELECT count(*) FROM distributed_table_3;
|
||||||
-- sync at the end of the tx
|
-- sync at the end of the tx
|
||||||
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
-- SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue