transactional metadata sync for maintenance daemon

Since we already use the current user to sync the metadata to the nodes
(#5105 and several related PRs), nothing prevents us from using the
coordinated transaction for metadata syncing.

This commit also renames a few functions to reflect their actual
implementation.
pull/5144/head
Onder Kalaci 2021-07-28 13:15:00 +02:00
parent 999a236540
commit 5f02d18ef8
15 changed files with 528 additions and 86 deletions
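A minimal sketch of the user-visible behaviour this enables, mirroring the regression tests updated below (:worker_1_port is the psql variable those tests use):

BEGIN;
-- no longer rejected by PreventInTransactionBlock(); the sync now runs as part
-- of the coordinated transaction and switches it to sequential execution
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
SHOW citus.multi_shard_modify_mode;  -- reports 'sequential'
ROLLBACK;  -- the tests below also verify that a rolled-back sync leaves hasmetadata unchanged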


@@ -119,7 +119,7 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
 		const char *nodeName = workerNode->workerName;
 		uint32 nodePort = workerNode->workerPort;
-		SendCommandListToWorkerInSingleTransaction(nodeName, nodePort,
+		SendCommandListToWorkerOutsideTransaction(nodeName, nodePort,
 												  CitusExtensionOwnerName(),
 												  ddlCommands);
 	}
@@ -312,7 +312,7 @@ ReplicateAllDependenciesToNode(const char *nodeName, int nodePort)
 	/* since we are executing ddl commands lets disable propagation, primarily for mx */
 	ddlCommands = list_concat(list_make1(DISABLE_DDL_PROPAGATION), ddlCommands);
-	SendCommandListToWorkerInSingleTransaction(nodeName, nodePort,
+	SendCommandListToWorkerOutsideTransaction(nodeName, nodePort,
 											  CitusExtensionOwnerName(), ddlCommands);
 }


@@ -47,11 +47,13 @@
 #include "distributed/metadata_cache.h"
 #include "distributed/metadata_sync.h"
 #include "distributed/metadata/distobject.h"
+#include "distributed/multi_executor.h"
 #include "distributed/multi_join_order.h"
 #include "distributed/multi_partitioning_utils.h"
 #include "distributed/multi_physical_planner.h"
 #include "distributed/pg_dist_node.h"
 #include "distributed/pg_dist_shard.h"
+#include "distributed/relation_access_tracking.h"
 #include "distributed/remote_commands.h"
 #include "distributed/resource_lock.h"
 #include "distributed/worker_manager.h"
@@ -78,6 +80,7 @@
 char *EnableManualMetadataChangesForUser = "";
 
+static void EnsureSequentialModeMetadataOperations(void);
 static List * GetDistributedTableDDLEvents(Oid relationId);
 static char * LocalGroupIdUpdateCommand(int32 groupId);
 static void UpdateDistNodeBoolAttr(const char *nodeName, int32 nodePort,
@@ -175,7 +178,7 @@ StartMetadataSyncToNode(const char *nodeNameString, int32 nodePort)
 	EnsureSuperUser();
 	EnsureModificationsCanRun();
 
-	PreventInTransactionBlock(true, "start_metadata_sync_to_node");
+	EnsureSequentialModeMetadataOperations();
 
 	LockRelationOid(DistNodeRelationId(), ExclusiveLock);
@@ -222,6 +225,50 @@ StartMetadataSyncToNode(const char *nodeNameString, int32 nodePort)
 }
 
+
+/*
+ * EnsureSequentialModeMetadataOperations makes sure that the current transaction is
+ * already in sequential mode, or can still safely be put in sequential mode. It
+ * errors out if that is not possible. The error contains information for the user to
+ * retry the transaction with sequential mode set from the beginning.
+ *
+ * Metadata objects (e.g., a distributed table on the workers) exist as a single
+ * instance that is used by potentially many shards/connections. To make sure that all
+ * shards/connections in the transaction can interact with it, the metadata needs to be
+ * visible on all connections used by the transaction, meaning we can only use one
+ * connection per node.
+ */
+static void
+EnsureSequentialModeMetadataOperations(void)
+{
+	if (!IsTransactionBlock())
+	{
+		/* we do not need to switch to sequential mode if we are not in a transaction */
+		return;
+	}
+
+	if (ParallelQueryExecutedInTransaction())
+	{
+		ereport(ERROR, (errmsg(
+							"cannot execute metadata syncing operation because there was a "
+							"parallel operation on a distributed table in the "
+							"transaction"),
+						errdetail("When modifying metadata, Citus needs to "
+								  "perform all operations over a single connection per "
+								  "node to ensure consistency."),
+						errhint("Try re-running the transaction with "
+								"\"SET LOCAL citus.multi_shard_modify_mode TO "
+								"\'sequential\';\"")));
+	}
+
+	ereport(DEBUG1, (errmsg("switching to sequential query execution mode"),
+					 errdetail("Metadata synced or stopped syncing. To make "
+							   "sure subsequent commands see the metadata correctly "
+							   "we need to make sure to use only one connection for "
+							   "all future commands")));
+	SetLocalMultiShardModifyModeToSequential();
+}
+
+
 /*
  * stop_metadata_sync_to_node function sets the hasmetadata column of the specified node
  * to false in pg_dist_node table, thus indicating that the specified worker node does not
@@ -233,7 +280,6 @@ stop_metadata_sync_to_node(PG_FUNCTION_ARGS)
 	CheckCitusVersion(ERROR);
 	EnsureCoordinator();
 	EnsureSuperUser();
-	PreventInTransactionBlock(true, "stop_metadata_sync_to_node");
 
 	text *nodeName = PG_GETARG_TEXT_P(0);
 	int32 nodePort = PG_GETARG_INT32(1);
@@ -376,7 +422,7 @@ SyncMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError)
 	 */
 	if (raiseOnError)
 	{
-		SendCommandListToWorkerInSingleTransaction(workerNode->workerName,
+		SendCommandListToWorkerInCoordinatedTransaction(workerNode->workerName,
 												   workerNode->workerPort,
 												   currentUser,
 												   recreateMetadataSnapshotCommandList);
@@ -385,10 +431,10 @@ SyncMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError)
 	else
 	{
 		bool success =
-			SendOptionalCommandListToWorkerInTransaction(workerNode->workerName,
-														 workerNode->workerPort,
-														 currentUser,
-														 recreateMetadataSnapshotCommandList);
+			SendOptionalCommandListToWorkerInCoordinatedTransaction(
+				workerNode->workerName, workerNode->workerPort,
+				currentUser, recreateMetadataSnapshotCommandList);
 		return success;
 	}
 }
@@ -401,6 +447,8 @@ SyncMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError)
 static void
 DropMetadataSnapshotOnNode(WorkerNode *workerNode)
 {
+	EnsureSequentialModeMetadataOperations();
+
 	char *userName = CurrentUserName();
 
 	/* generate the queries which drop the metadata */
@@ -409,7 +457,7 @@ DropMetadataSnapshotOnNode(WorkerNode *workerNode)
 	dropMetadataCommandList = lappend(dropMetadataCommandList,
 									  LocalGroupIdUpdateCommand(0));
 
-	SendOptionalCommandListToWorkerInTransaction(workerNode->workerName,
+	SendOptionalCommandListToWorkerInCoordinatedTransaction(workerNode->workerName,
 												 workerNode->workerPort,
 												 userName,
 												 dropMetadataCommandList);


@@ -615,7 +615,7 @@ PropagateNodeWideObjects(WorkerNode *newWorkerNode)
 		ddlCommands = lappend(ddlCommands, ENABLE_DDL_PROPAGATION);
 
 		/* send commands to new workers*/
-		SendCommandListToWorkerInSingleTransaction(newWorkerNode->workerName,
+		SendCommandListToWorkerOutsideTransaction(newWorkerNode->workerName,
 												  newWorkerNode->workerPort,
 												  CitusExtensionOwnerName(),
 												  ddlCommands);


@@ -753,7 +753,7 @@ RepairShardPlacement(int64 shardId, const char *sourceNodeName, int32 sourceNode
 	}
 
 	EnsureNoModificationsHaveBeenDone();
-	SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort, tableOwner,
+	SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort, tableOwner,
 											  ddlCommandList);
 
 	/* after successful repair, we update shard state as healthy*/
@@ -954,7 +954,7 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
 		List *ddlCommandList = RecreateShardDDLCommandList(shardInterval, sourceNodeName,
 														   sourceNodePort);
 		char *tableOwner = TableOwner(shardInterval->relationId);
-		SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort,
+		SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
 												  tableOwner, ddlCommandList);
 
 		ddlCommandList = NIL;
@@ -973,7 +973,7 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
 									  ddlCommandList,
 									  PostLoadShardCreationCommandList(shardInterval, sourceNodeName,
 																	   sourceNodePort));
-		SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort,
+		SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
 												  tableOwner, ddlCommandList);
 
 		MemoryContextReset(localContext);
@@ -1007,7 +1007,7 @@ CopyShardTablesViaBlockWrites(List *shardIntervalList, char *sourceNodeName,
 		}
 
 		char *tableOwner = TableOwner(shardInterval->relationId);
-		SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort,
+		SendCommandListToWorkerOutsideTransaction(targetNodeName, targetNodePort,
 												  tableOwner, commandList);
 
 		MemoryContextReset(localContext);


@@ -275,7 +275,7 @@ TryDropShard(GroupShardPlacement *placement)
 	/* remove the shard from the node */
 	bool success =
-		SendOptionalCommandListToWorkerInTransaction(shardPlacement->nodeName,
+		SendOptionalCommandListToWorkerOutsideTransaction(shardPlacement->nodeName,
 													 shardPlacement->nodePort,
 													 NULL, dropCommandList);
 	if (success)


@@ -434,13 +434,13 @@ EnsureNoModificationsHaveBeenDone()
 /*
- * SendCommandListToWorkerInSingleTransaction opens connection to the node with the given
- * nodeName and nodePort. Then, the connection starts a transaction on the remote
- * node and executes the commands in the transaction. The function raises error if
- * any of the queries fails.
+ * SendCommandListToWorkerOutsideTransaction forces a new connection to be opened
+ * to the node with the given nodeName and nodePort. Then, the connection starts
+ * a transaction on the remote node and executes the commands in the transaction.
+ * The function raises an error if any of the queries fails.
  */
 void
-SendCommandListToWorkerInSingleTransaction(const char *nodeName, int32 nodePort,
+SendCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort,
 										  const char *nodeUser, List *commandList)
 {
 	int connectionFlags = FORCE_NEW_CONNECTION;
@@ -465,12 +465,42 @@ SendCommandListToWorkerInSingleTransaction(const char *nodeName, int32 nodePort,
 /*
- * SendOptionalCommandListToWorkerInTransaction sends the given command list to
- * the given worker in a single transaction. If any of the commands fail, it
- * rollbacks the transaction, and otherwise commits.
+ * SendCommandListToWorkerInCoordinatedTransaction opens a connection to the node
+ * with the given nodeName and nodePort. The commands are sent as part of the
+ * coordinated transaction. Any failure aborts the coordinated transaction.
+ */
+void
+SendCommandListToWorkerInCoordinatedTransaction(const char *nodeName, int32 nodePort,
+												const char *nodeUser, List *commandList)
+{
+	int connectionFlags = 0;
+
+	UseCoordinatedTransaction();
+
+	MultiConnection *workerConnection = GetNodeUserDatabaseConnection(connectionFlags,
+																	  nodeName, nodePort,
+																	  nodeUser, NULL);
+
+	MarkRemoteTransactionCritical(workerConnection);
+	RemoteTransactionBeginIfNecessary(workerConnection);
+
+	/* iterate over the commands and execute them in the same connection */
+	const char *commandString = NULL;
+	foreach_ptr(commandString, commandList)
+	{
+		ExecuteCriticalRemoteCommand(workerConnection, commandString);
+	}
+}
+
+
+/*
+ * SendOptionalCommandListToWorkerOutsideTransaction sends the given command
+ * list to the given worker in a single transaction that is outside of the
+ * coordinated transaction. If any of the commands fail, it rolls back the
+ * transaction, and otherwise commits.
  */
 bool
-SendOptionalCommandListToWorkerInTransaction(const char *nodeName, int32 nodePort,
+SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort,
 												  const char *nodeUser, List *commandList)
 {
 	int connectionFlags = FORCE_NEW_CONNECTION;
@@ -511,6 +541,51 @@ SendOptionalCommandListToWorkerInTransaction(const char *nodeName, int32 nodePort,
 }
 
+
+/*
+ * SendOptionalCommandListToWorkerInCoordinatedTransaction sends the given
+ * command list to the given worker as part of the coordinated transaction.
+ * If any of the commands fail, the function returns false.
+ */
+bool
+SendOptionalCommandListToWorkerInCoordinatedTransaction(const char *nodeName, int32
+														 nodePort,
+														 const char *nodeUser,
+														 List *commandList)
+{
+	int connectionFlags = 0;
+	bool failed = false;
+
+	UseCoordinatedTransaction();
+
+	MultiConnection *workerConnection =
+		GetNodeUserDatabaseConnection(connectionFlags, nodeName, nodePort,
+									  nodeUser, NULL);
+	if (PQstatus(workerConnection->pgConn) != CONNECTION_OK)
+	{
+		return false;
+	}
+
+	RemoteTransactionsBeginIfNecessary(list_make1(workerConnection));
+
+	/* iterate over the commands and execute them in the same connection */
+	const char *commandString = NULL;
+	foreach_ptr(commandString, commandList)
+	{
+		if (ExecuteOptionalRemoteCommand(workerConnection, commandString, NULL) !=
+			RESPONSE_OKAY)
+		{
+			failed = true;
+
+			bool raiseErrors = false;
+			MarkRemoteTransactionFailed(workerConnection, raiseErrors);
+			break;
+		}
+	}
+
+	return !failed;
+}
+
+
 /*
  * ErrorIfAnyMetadataNodeOutOfSync raises an error if any of the given
  * metadata nodes are out of sync. It is safer to avoid metadata changing


@@ -370,7 +370,7 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort)
 									   nodePort)));
 
 	EnsureNoModificationsHaveBeenDone();
-	SendCommandListToWorkerInSingleTransaction(nodeName, nodePort, tableOwner,
+	SendCommandListToWorkerOutsideTransaction(nodeName, nodePort, tableOwner,
 											  ddlCommandList);
 
 	int32 groupId = GroupForNode(nodeName, nodePort);
@@ -570,7 +570,7 @@ ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort)
 			char *tableOwner = TableOwner(shardInterval->relationId);
 			List *commandList = CopyShardForeignConstraintCommandList(shardInterval);
 
-			SendCommandListToWorkerInSingleTransaction(nodeName, nodePort, tableOwner,
+			SendCommandListToWorkerOutsideTransaction(nodeName, nodePort, tableOwner,
 													  commandList);
 		}
 	}


@@ -37,14 +37,22 @@ extern void SendCommandToWorkersAsUser(TargetWorkerSet targetWorkerSet,
 									   const char *nodeUser, const char *command);
 extern void SendCommandToWorkerAsUser(const char *nodeName, int32 nodePort,
 									  const char *nodeUser, const char *command);
-extern bool SendOptionalCommandListToWorkerInTransaction(const char *nodeName, int32
-														  nodePort,
-														  const char *nodeUser,
-														  List *commandList);
+extern bool SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeName,
+															   int32 nodePort,
+															   const char *nodeUser,
+															   List *commandList);
+extern bool SendOptionalCommandListToWorkerInCoordinatedTransaction(const char *nodeName,
+																	 int32 nodePort,
+																	 const char *nodeUser,
+																	 List *commandList);
 extern void SendCommandToWorkersWithMetadata(const char *command);
 extern void SendBareCommandListToMetadataWorkers(List *commandList);
 extern void EnsureNoModificationsHaveBeenDone(void);
-extern void SendCommandListToWorkerInSingleTransaction(const char *nodeName,
-														int32 nodePort,
-														const char *nodeUser,
-														List *commandList);
+extern void SendCommandListToWorkerOutsideTransaction(const char *nodeName,
+													   int32 nodePort,
+													   const char *nodeUser,
+													   List *commandList);
+extern void SendCommandListToWorkerInCoordinatedTransaction(const char *nodeName,
+															 int32 nodePort,
+															 const char *nodeUser,
+															 List *commandList);


@@ -187,6 +187,12 @@ WARNING: server closed the connection unexpectedly
 	before or while processing the request.
 CONTEXT: while executing command on localhost:xxxxx
 WARNING: connection not open
+CONTEXT: while executing command on localhost:xxxxx
+WARNING: connection not open
+CONTEXT: while executing command on localhost:xxxxx
+WARNING: connection not open
+CONTEXT: while executing command on localhost:xxxxx
+WARNING: connection not open
 CONTEXT: while executing command on localhost:xxxxx
 stop_metadata_sync_to_node
 ---------------------------------------------------------------------
@@ -216,6 +222,12 @@ WARNING: server closed the connection unexpectedly
 	before or while processing the request.
 CONTEXT: while executing command on localhost:xxxxx
 WARNING: connection not open
+CONTEXT: while executing command on localhost:xxxxx
+WARNING: connection not open
+CONTEXT: while executing command on localhost:xxxxx
+WARNING: connection not open
+CONTEXT: while executing command on localhost:xxxxx
+WARNING: connection not open
 CONTEXT: while executing command on localhost:xxxxx
 stop_metadata_sync_to_node
 ---------------------------------------------------------------------
@@ -245,6 +257,12 @@ WARNING: server closed the connection unexpectedly
 	before or while processing the request.
 CONTEXT: while executing command on localhost:xxxxx
 WARNING: connection not open
+CONTEXT: while executing command on localhost:xxxxx
+WARNING: connection not open
+CONTEXT: while executing command on localhost:xxxxx
+WARNING: connection not open
+CONTEXT: while executing command on localhost:xxxxx
+WARNING: connection not open
 CONTEXT: while executing command on localhost:xxxxx
 stop_metadata_sync_to_node
 ---------------------------------------------------------------------


@@ -470,11 +470,15 @@ SELECT count(*) FROM pg_trigger WHERE tgrelid='mx_testing_schema.mx_test_table':
 1
 (1 row)
--- Make sure that start_metadata_sync_to_node cannot be called inside a transaction
+-- Make sure that start_metadata_sync_to_node can be called inside a transaction and rolled back
 \c - - - :master_port
 BEGIN;
 SELECT start_metadata_sync_to_node('localhost', :worker_2_port);
-ERROR: start_metadata_sync_to_node cannot run inside a transaction block
+start_metadata_sync_to_node
+---------------------------------------------------------------------
+(1 row)
 ROLLBACK;
 SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_port;
 hasmetadata

@@ -338,13 +338,42 @@ SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node;
 ---------------------------------------------------------------------
 -- Test updating a node when another node is in readonly-mode
 ---------------------------------------------------------------------
-SELECT master_add_node('localhost', :worker_2_port) AS nodeid_2 \gset
-SELECT 1 FROM start_metadata_sync_to_node('localhost', :worker_2_port);
+-- first, add node and sync metadata in the same transaction
+CREATE TYPE some_type AS (a int, b int);
+CREATE TABLE some_ref_table (a int, b some_type);
+SELECT create_reference_table('some_ref_table');
+create_reference_table
+---------------------------------------------------------------------
+(1 row)
+INSERT INTO some_ref_table (a) SELECT i FROM generate_series(0,10)i;
+BEGIN;
+SELECT master_add_node('localhost', :worker_2_port) AS nodeid_2 \gset
+SELECT 1 FROM start_metadata_sync_to_node('localhost', :worker_2_port);
 ?column?
 ---------------------------------------------------------------------
 1
 (1 row)
+-- and modifications can be read from any worker in the same transaction
+INSERT INTO some_ref_table (a) SELECT i FROM generate_series(0,10)i;
+SET LOCAL citus.task_assignment_policy TO "round-robin";
+SELECT count(*) FROM some_ref_table;
+count
+---------------------------------------------------------------------
+22
+(1 row)
+SELECT count(*) FROM some_ref_table;
+count
+---------------------------------------------------------------------
+22
+(1 row)
+COMMIT;
+DROP TABLE some_ref_table;
+DROP TYPE some_type;
 -- Create a table with shards on both nodes
 CREATE TABLE dist_table_2(a int);
 SELECT create_distributed_table('dist_table_2', 'a');


@@ -91,11 +91,6 @@ SELECT alter_table_set_access_method('events_2021_jan', 'columnar');
 (1 row)
 VACUUM (FREEZE, ANALYZE) events_2021_jan;
--- this should fail
-BEGIN;
-SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
-ERROR: start_metadata_sync_to_node cannot run inside a transaction block
-ROLLBACK;
 -- sync metadata
 SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
 start_metadata_sync_to_node
@@ -164,18 +159,15 @@ SELECT * FROM distributed_table_1;
 (0 rows)
 ALTER TABLE distributed_table_4 DROP COLUMN b;
--- this should fail
 BEGIN;
 SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
-ERROR: stop_metadata_sync_to_node cannot run inside a transaction block
-ROLLBACK;
-SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
 NOTICE: dropping metadata on the node (localhost,57637)
 stop_metadata_sync_to_node
 ---------------------------------------------------------------------
 (1 row)
+COMMIT;
 SELECT * FROM test_view;
 count
 ---------------------------------------------------------------------
@@ -245,12 +237,14 @@ SELECT * FROM distributed_table_1;
 ---------------------------------------------------------------------
 (0 rows)
-SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
+BEGIN;
+SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
 start_metadata_sync_to_node
 ---------------------------------------------------------------------
 (1 row)
+COMMIT;
 \c - - - :worker_1_port
 SELECT count(*) > 0 FROM pg_dist_node;
 ?column?
@@ -278,7 +272,180 @@ SELECT count(*) > 0 FROM pg_class WHERE relname LIKE 'reference_table__' AND rel
\c - - - :master_port
SET search_path TO "start_stop_metadata_sync";
-- both start & stop metadata sync operations can be transactional
BEGIN;
-- sync the same node multiple times
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
-- sync the same node in the same command
WITH nodes(name, port) AS (VALUES ('localhost', :worker_1_port,
'localhost', :worker_1_port,
'localhost', :worker_2_port,
'localhost', :worker_2_port))
SELECT start_metadata_sync_to_node(name,port) FROM nodes;
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
-- stop the same node in the same command
WITH nodes(name, port) AS (VALUES ('localhost', :worker_1_port,
'localhost', :worker_1_port,
'localhost', :worker_2_port,
'localhost', :worker_2_port))
SELECT stop_metadata_sync_to_node(name,port) FROM nodes;
NOTICE: dropping metadata on the node (localhost,57637)
stop_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
COMMIT;
\c - - - :worker_1_port
SELECT count(*) > 0 FROM pg_dist_node;
?column?
---------------------------------------------------------------------
f
(1 row)
SELECT count(*) > 0 FROM pg_dist_shard;
?column?
---------------------------------------------------------------------
f
(1 row)
SELECT count(*) > 0 FROM pg_class WHERE relname LIKE 'distributed_table__' AND relnamespace IN (SELECT oid FROM pg_namespace WHERE nspname = 'start_stop_metadata_sync');
?column?
---------------------------------------------------------------------
f
(1 row)
SELECT count(*) > 0 FROM pg_class WHERE relname LIKE 'reference_table__' AND relnamespace IN (SELECT oid FROM pg_namespace WHERE nspname = 'start_stop_metadata_sync');
?column?
---------------------------------------------------------------------
f
(1 row)
\c - - - :master_port
SET search_path TO "start_stop_metadata_sync";
-- start metadata sync sets the multi-shard modify mode to sequential
BEGIN;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
show citus.multi_shard_modify_mode;
citus.multi_shard_modify_mode
---------------------------------------------------------------------
sequential
(1 row)
COMMIT;
-- stop metadata sync sets the multi-shard modify mode to sequential
BEGIN;
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
NOTICE: dropping metadata on the node (localhost,57637)
stop_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
show citus.multi_shard_modify_mode;
citus.multi_shard_modify_mode
---------------------------------------------------------------------
sequential
(1 row)
COMMIT;
-- multi-connection commands are not allowed with start_metadata_sync
BEGIN;
SET citus.force_max_query_parallelization TO ON;
CREATE TABLE test_table(a int);
SELECT create_distributed_table('test_table', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
ERROR: cannot execute metadata syncing operation because there was a parallel operation on a distributed table in the transaction
DETAIL: When modifying metadata, Citus needs to perform all operations over a single connection per node to ensure consistency.
HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
ROLLBACK;
-- this is safe because start_metadata_sync_to_node already switches to
-- sequential execution
BEGIN;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
CREATE TABLE test_table(a int);
SELECT create_distributed_table('test_table', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
ROLLBACK;
-- multi-shard commands are allowed with start_metadata_sync
-- as long as the start_metadata_sync_to_node executed
-- when it is OK to switch to sequential execution
BEGIN;
-- sync at the start of the tx
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
SET citus.multi_shard_modify_mode TO sequential;
CREATE TABLE test_table(a int);
SELECT create_distributed_table('test_table', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
ALTER TABLE test_table ADD COLUMN B INT;
INSERT INTO test_table SELECT i,i From generate_series(0,100)i;
SELECT count(*) FROM test_table;
count
---------------------------------------------------------------------
101
(1 row)
ALTER TABLE distributed_table_3 ADD COLUMN new_col INT DEFAULT 15;
SELECT count(*) FROM distributed_table_3;
count
---------------------------------------------------------------------
1
(1 row)
-- sync at the end of the tx
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
ROLLBACK;
-- cleanup
\c - - - :master_port
SET search_path TO "start_stop_metadata_sync";
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
NOTICE: dropping metadata on the node (localhost,57637)
 stop_metadata_sync_to_node


@@ -168,7 +168,7 @@ SELECT "Column", "Type", "Definition" FROM index_attrs WHERE
 	relid = 'mx_testing_schema.mx_index'::regclass;
 SELECT count(*) FROM pg_trigger WHERE tgrelid='mx_testing_schema.mx_test_table'::regclass;
--- Make sure that start_metadata_sync_to_node cannot be called inside a transaction
+-- Make sure that start_metadata_sync_to_node can be called inside a transaction and rolled back
 \c - - - :master_port
 BEGIN;
 SELECT start_metadata_sync_to_node('localhost', :worker_2_port);


@@ -162,8 +162,26 @@ SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node;
 --------------------------------------------------------------------------
 -- Test updating a node when another node is in readonly-mode
 --------------------------------------------------------------------------
-SELECT master_add_node('localhost', :worker_2_port) AS nodeid_2 \gset
-SELECT 1 FROM start_metadata_sync_to_node('localhost', :worker_2_port);
+-- first, add node and sync metadata in the same transaction
+CREATE TYPE some_type AS (a int, b int);
+CREATE TABLE some_ref_table (a int, b some_type);
+SELECT create_reference_table('some_ref_table');
+INSERT INTO some_ref_table (a) SELECT i FROM generate_series(0,10)i;
+BEGIN;
+SELECT master_add_node('localhost', :worker_2_port) AS nodeid_2 \gset
+SELECT 1 FROM start_metadata_sync_to_node('localhost', :worker_2_port);
+-- and modifications can be read from any worker in the same transaction
+INSERT INTO some_ref_table (a) SELECT i FROM generate_series(0,10)i;
+SET LOCAL citus.task_assignment_policy TO "round-robin";
+SELECT count(*) FROM some_ref_table;
+SELECT count(*) FROM some_ref_table;
+COMMIT;
+DROP TABLE some_ref_table;
+DROP TYPE some_type;
 -- Create a table with shards on both nodes
 CREATE TABLE dist_table_2(a int);


@@ -68,11 +68,6 @@ SELECT alter_table_set_access_method('events_2021_jan', 'columnar');
 VACUUM (FREEZE, ANALYZE) events_2021_jan;
--- this should fail
-BEGIN;
-SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
-ROLLBACK;
 -- sync metadata
 SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
@@ -93,12 +88,10 @@ SET search_path TO "start_stop_metadata_sync";
 SELECT * FROM distributed_table_1;
 ALTER TABLE distributed_table_4 DROP COLUMN b;
--- this should fail
 BEGIN;
 SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
-ROLLBACK;
-SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
+COMMIT;
 SELECT * FROM test_view;
 SELECT * FROM test_matview;
 SELECT count(*) > 0 FROM pg_dist_node;
@@ -116,7 +109,9 @@ SELECT count(*) > 0 FROM pg_class WHERE relname LIKE 'reference_table__' AND rel
 \c - - - :master_port
 SET search_path TO "start_stop_metadata_sync";
 SELECT * FROM distributed_table_1;
-SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
+BEGIN;
+SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
+COMMIT;
 \c - - - :worker_1_port
 SELECT count(*) > 0 FROM pg_dist_node;
@@ -127,7 +122,87 @@ SELECT count(*) > 0 FROM pg_class WHERE relname LIKE 'reference_table__' AND rel
\c - - - :master_port
SET search_path TO "start_stop_metadata_sync";
-- both start & stop metadata sync operations can be transactional
BEGIN;
-- sync the same node multiple times
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
-- sync the same node in the same command
WITH nodes(name, port) AS (VALUES ('localhost', :worker_1_port,
'localhost', :worker_1_port,
'localhost', :worker_2_port,
'localhost', :worker_2_port))
SELECT start_metadata_sync_to_node(name,port) FROM nodes;
-- stop the same node in the same command
WITH nodes(name, port) AS (VALUES ('localhost', :worker_1_port,
'localhost', :worker_1_port,
'localhost', :worker_2_port,
'localhost', :worker_2_port))
SELECT stop_metadata_sync_to_node(name,port) FROM nodes;
COMMIT;
\c - - - :worker_1_port
SELECT count(*) > 0 FROM pg_dist_node;
SELECT count(*) > 0 FROM pg_dist_shard;
SELECT count(*) > 0 FROM pg_class WHERE relname LIKE 'distributed_table__' AND relnamespace IN (SELECT oid FROM pg_namespace WHERE nspname = 'start_stop_metadata_sync');
SELECT count(*) > 0 FROM pg_class WHERE relname LIKE 'reference_table__' AND relnamespace IN (SELECT oid FROM pg_namespace WHERE nspname = 'start_stop_metadata_sync');
\c - - - :master_port
SET search_path TO "start_stop_metadata_sync";
-- start metadata sync sets the multi-shard modify mode to sequential
BEGIN;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
show citus.multi_shard_modify_mode;
COMMIT;
-- stop metadata sync sets the multi-shard modify mode to sequential
BEGIN;
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
show citus.multi_shard_modify_mode;
COMMIT;
-- multi-connection commands are not allowed with start_metadata_sync
BEGIN;
SET citus.force_max_query_parallelization TO ON;
CREATE TABLE test_table(a int);
SELECT create_distributed_table('test_table', 'a');
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
ROLLBACK;
-- this is safe because start_metadata_sync_to_node already switches to
-- sequential execution
BEGIN;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
CREATE TABLE test_table(a int);
SELECT create_distributed_table('test_table', 'a');
ROLLBACK;
-- multi-shard commands are allowed with start_metadata_sync
-- as long as the start_metadata_sync_to_node executed
-- when it is OK to switch to sequential execution
BEGIN;
-- sync at the start of the tx
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
SET citus.multi_shard_modify_mode TO sequential;
CREATE TABLE test_table(a int);
SELECT create_distributed_table('test_table', 'a');
ALTER TABLE test_table ADD COLUMN B INT;
INSERT INTO test_table SELECT i,i From generate_series(0,100)i;
SELECT count(*) FROM test_table;
ALTER TABLE distributed_table_3 ADD COLUMN new_col INT DEFAULT 15;
SELECT count(*) FROM distributed_table_3;
-- sync at the end of the tx
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
ROLLBACK;
-- cleanup
\c - - - :master_port
SET search_path TO "start_stop_metadata_sync";
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
SET client_min_messages TO WARNING;