diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 412bb2b19..d2b29d9e9 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -921,6 +921,21 @@ StartDistributedExecution(DistributedExecution *execution) execution->errorOnAnyFailure = true; } + /* + * Prevent unsafe concurrent modifications of replicated shards by taking + * locks. + * + * When modifying a reference tables in MX mode, we take the lock via RPC + * to the first worker in a transaction block, which activates a coordinated + * transaction. We need to do this before determining whether the execution + * should use transaction blocks (see below). + */ + AcquireExecutorShardLocksForExecution(execution); + + /* + * If the current or previous execution in the current transaction requires + * rollback then we should use transaction blocks. + */ execution->isTransaction = InCoordinatedTransaction(); /* @@ -937,9 +952,6 @@ StartDistributedExecution(DistributedExecution *execution) { RecordParallelRelationAccessForTaskList(taskList); } - - /* prevent unsafe concurrent modifications */ - AcquireExecutorShardLocksForExecution(execution); } diff --git a/src/backend/distributed/master/worker_node_manager.c b/src/backend/distributed/master/worker_node_manager.c index 84ff9d29e..ed04c1ed1 100644 --- a/src/backend/distributed/master/worker_node_manager.c +++ b/src/backend/distributed/master/worker_node_manager.c @@ -512,3 +512,31 @@ WorkerNodeCompare(const void *lhsKey, const void *rhsKey, Size keySize) portCompare = workerLhs->workerPort - workerRhs->workerPort; return portCompare; } + + +/* + * GetFirstPrimaryWorkerNode returns the primary worker node with the + * lowest rank based on CompareWorkerNodes. + * + * The ranking is arbitrary, but needs to be kept consistent with IsFirstWorkerNode. + */ +WorkerNode * +GetFirstPrimaryWorkerNode(void) +{ + List *workerNodeList = ActivePrimaryNodeList(NoLock); + ListCell *workerNodeCell = NULL; + WorkerNode *firstWorkerNode = NULL; + + foreach(workerNodeCell, workerNodeList) + { + WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); + + if (firstWorkerNode == NULL || + CompareWorkerNodes(&workerNode, &firstWorkerNode) < 0) + { + firstWorkerNode = workerNode; + } + } + + return firstWorkerNode; +} diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index dde72b8d1..5812f79cd 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -68,30 +68,6 @@ SendCommandToWorkerAsUser(char *nodeName, int32 nodePort, const char *nodeUser, } -/* - * SendCommandToFirstWorker sends the given command only to the first worker node - * sorted by host name and port number using SendCommandToWorker. - */ -void -SendCommandToFirstWorker(char *command) -{ - List *workerNodeList = ActivePrimaryNodeList(NoLock); - WorkerNode *firstWorkerNode = NULL; - - workerNodeList = SortList(workerNodeList, CompareWorkerNodes); - - if (list_length(workerNodeList) == 0) - { - ereport(ERROR, (errmsg("cannot find a worker node"))); - } - - firstWorkerNode = (WorkerNode *) linitial(workerNodeList); - - SendCommandToWorker(firstWorkerNode->workerName, firstWorkerNode->workerPort, - command); -} - - /* * SendCommandToWorkers sends a command to all workers in * parallel. Commands are committed on the workers when the local diff --git a/src/backend/distributed/utils/resource_lock.c b/src/backend/distributed/utils/resource_lock.c index 6a145bf13..a6f86281b 100644 --- a/src/backend/distributed/utils/resource_lock.c +++ b/src/backend/distributed/utils/resource_lock.c @@ -31,6 +31,7 @@ #include "distributed/multi_router_executor.h" #include "distributed/relay_utility.h" #include "distributed/reference_table_utils.h" +#include "distributed/remote_commands.h" #include "distributed/resource_lock.h" #include "distributed/shardinterval_utils.h" #include "distributed/worker_protocol.h" @@ -179,6 +180,10 @@ LockShardListResourcesOnFirstWorker(LOCKMODE lockmode, List *shardIntervalList) ListCell *shardIntervalCell = NULL; int processedShardIntervalCount = 0; int totalShardIntervalCount = list_length(shardIntervalList); + WorkerNode *firstWorkerNode = GetFirstPrimaryWorkerNode(); + int connectionFlags = 0; + const char *superuser = CitusExtensionOwnerName(); + MultiConnection *firstWorkerConnection = NULL; appendStringInfo(lockCommand, "SELECT lock_shard_resources(%d, ARRAY[", lockmode); @@ -198,7 +203,26 @@ LockShardListResourcesOnFirstWorker(LOCKMODE lockmode, List *shardIntervalList) appendStringInfo(lockCommand, "])"); - SendCommandToFirstWorker(lockCommand->data); + /* need to hold the lock until commit */ + BeginOrContinueCoordinatedTransaction(); + + /* + * Use the superuser connection to make sure we are allowed to lock. + * This also helps ensure we only use one connection. + */ + firstWorkerConnection = GetNodeUserDatabaseConnection(connectionFlags, + firstWorkerNode->workerName, + firstWorkerNode->workerPort, + superuser, NULL); + + /* the SELECT .. FOR UPDATE breaks if we lose the connection */ + MarkRemoteTransactionCritical(firstWorkerConnection); + + /* make sure we are in a tranasaction block to hold the lock until commit */ + RemoteTransactionBeginIfNecessary(firstWorkerConnection); + + /* grab the lock on the first worker node */ + ExecuteCriticalRemoteCommand(firstWorkerConnection, lockCommand->data); } @@ -378,6 +402,12 @@ LockReferencedReferenceShardResources(uint64 shardId, LOCKMODE lockMode) ClusterHasKnownMetadataWorkers() && !IsFirstWorkerNode()) { + /* + * When there is metadata, all nodes can write to the reference table, + * but the writes need to be serialised. To achieve that, all nodes will + * take the shard resource lock on the first worker node via RPC, except + * for the first worker node which will just take it the regular way. + */ LockShardListResourcesOnFirstWorker(lockMode, referencedShardIntervalList); } diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index 84a9fe0df..30a7e3c96 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -78,6 +78,7 @@ extern bool WorkerNodeIsPrimary(WorkerNode *worker); extern bool WorkerNodeIsSecondary(WorkerNode *worker); extern bool WorkerNodeIsReadable(WorkerNode *worker); extern uint32 CountPrimariesWithMetadata(void); +extern WorkerNode * GetFirstPrimaryWorkerNode(void); /* Function declarations for worker node utilities */ extern int CompareWorkerNodes(const void *leftElement, const void *rightElement); diff --git a/src/include/distributed/worker_transaction.h b/src/include/distributed/worker_transaction.h index 0464e9681..f8e91bdbf 100644 --- a/src/include/distributed/worker_transaction.h +++ b/src/include/distributed/worker_transaction.h @@ -34,7 +34,6 @@ extern List * TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE extern void SendCommandToWorker(char *nodeName, int32 nodePort, const char *command); extern void SendCommandToWorkerAsUser(char *nodeName, int32 nodePort, const char *nodeUser, const char *command); -extern void SendCommandToFirstWorker(char *command); extern void SendCommandToWorkers(TargetWorkerSet targetWorkerSet, const char *command); extern void SendBareCommandListToWorkers(TargetWorkerSet targetWorkerSet, List *commandList); diff --git a/src/test/regress/expected/multi_mx_reference_table.out b/src/test/regress/expected/multi_mx_reference_table.out index 18e2a6695..b1ab56efe 100644 --- a/src/test/regress/expected/multi_mx_reference_table.out +++ b/src/test/regress/expected/multi_mx_reference_table.out @@ -12,7 +12,37 @@ INSERT INTO reference_table_test VALUES (2, 2.0, '2', '2016-12-02'); INSERT INTO reference_table_test VALUES (3, 3.0, '3', '2016-12-03'); INSERT INTO reference_table_test VALUES (4, 4.0, '4', '2016-12-04'); INSERT INTO reference_table_test VALUES (5, 5.0, '5', '2016-12-05'); +-- SELECT .. FOR UPDATE should work on coordinator (takes lock on first worker) +SELECT value_1, value_2 FROM reference_table_test ORDER BY value_1, value_2 LIMIT 1 FOR UPDATE; + value_1 | value_2 +---------+--------- + 1 | 1 +(1 row) + +BEGIN; +SELECT value_1, value_2 FROM reference_table_test ORDER BY value_1, value_2 LIMIT 1 FOR UPDATE; + value_1 | value_2 +---------+--------- + 1 | 1 +(1 row) + +END; \c - - - :worker_1_port +-- SELECT .. FOR UPDATE should work on first worker (takes lock on self) +SELECT value_1, value_2 FROM reference_table_test ORDER BY value_1, value_2 LIMIT 1 FOR UPDATE; + value_1 | value_2 +---------+--------- + 1 | 1 +(1 row) + +BEGIN; +SELECT value_1, value_2 FROM reference_table_test ORDER BY value_1, value_2 LIMIT 1 FOR UPDATE; + value_1 | value_2 +---------+--------- + 1 | 1 +(1 row) + +END; -- run some queries on top of the data SELECT * @@ -569,6 +599,21 @@ INSERT INTO reference_table_test_second VALUES (3, 3.0, '3', '2016-12-03'); INSERT INTO reference_table_test_third VALUES (4, 4.0, '4', '2016-12-04'); INSERT INTO reference_table_test_third VALUES (5, 5.0, '5', '2016-12-05'); \c - - - :worker_2_port +-- SELECT .. FOR UPDATE should work on second worker (takes lock on first worker) +SELECT value_1, value_2 FROM reference_table_test ORDER BY value_1, value_2 LIMIT 1 FOR UPDATE; + value_1 | value_2 +---------+--------- + 1 | 1 +(1 row) + +BEGIN; +SELECT value_1, value_2 FROM reference_table_test ORDER BY value_1, value_2 LIMIT 1 FOR UPDATE; + value_1 | value_2 +---------+--------- + 1 | 1 +(1 row) + +END; -- some very basic tests SELECT DISTINCT t1.value_1 diff --git a/src/test/regress/sql/multi_mx_reference_table.sql b/src/test/regress/sql/multi_mx_reference_table.sql index 5b1c439dc..e7204fa13 100644 --- a/src/test/regress/sql/multi_mx_reference_table.sql +++ b/src/test/regress/sql/multi_mx_reference_table.sql @@ -10,7 +10,22 @@ INSERT INTO reference_table_test VALUES (3, 3.0, '3', '2016-12-03'); INSERT INTO reference_table_test VALUES (4, 4.0, '4', '2016-12-04'); INSERT INTO reference_table_test VALUES (5, 5.0, '5', '2016-12-05'); +-- SELECT .. FOR UPDATE should work on coordinator (takes lock on first worker) +SELECT value_1, value_2 FROM reference_table_test ORDER BY value_1, value_2 LIMIT 1 FOR UPDATE; + +BEGIN; +SELECT value_1, value_2 FROM reference_table_test ORDER BY value_1, value_2 LIMIT 1 FOR UPDATE; +END; + \c - - - :worker_1_port + +-- SELECT .. FOR UPDATE should work on first worker (takes lock on self) +SELECT value_1, value_2 FROM reference_table_test ORDER BY value_1, value_2 LIMIT 1 FOR UPDATE; + +BEGIN; +SELECT value_1, value_2 FROM reference_table_test ORDER BY value_1, value_2 LIMIT 1 FOR UPDATE; +END; + -- run some queries on top of the data SELECT * @@ -340,6 +355,13 @@ INSERT INTO reference_table_test_third VALUES (5, 5.0, '5', '2016-12-05'); \c - - - :worker_2_port +-- SELECT .. FOR UPDATE should work on second worker (takes lock on first worker) +SELECT value_1, value_2 FROM reference_table_test ORDER BY value_1, value_2 LIMIT 1 FOR UPDATE; + +BEGIN; +SELECT value_1, value_2 FROM reference_table_test ORDER BY value_1, value_2 LIMIT 1 FOR UPDATE; +END; + -- some very basic tests SELECT DISTINCT t1.value_1