Fix assert failure in bare SELECT FROM reference table FOR UPDATE in MX

pull/3008/head
Marco Slot 2019-09-22 16:33:19 +02:00
parent 71e7047e65
commit e58d76c5f6
8 changed files with 142 additions and 29 deletions

View File

@ -921,6 +921,21 @@ StartDistributedExecution(DistributedExecution *execution)
execution->errorOnAnyFailure = true;
}
/*
* Prevent unsafe concurrent modifications of replicated shards by taking
* locks.
*
* When modifying a reference tables in MX mode, we take the lock via RPC
* to the first worker in a transaction block, which activates a coordinated
* transaction. We need to do this before determining whether the execution
* should use transaction blocks (see below).
*/
AcquireExecutorShardLocksForExecution(execution);
/*
* If the current or previous execution in the current transaction requires
* rollback then we should use transaction blocks.
*/
execution->isTransaction = InCoordinatedTransaction();
/*
@ -937,9 +952,6 @@ StartDistributedExecution(DistributedExecution *execution)
{
RecordParallelRelationAccessForTaskList(taskList);
}
/* prevent unsafe concurrent modifications */
AcquireExecutorShardLocksForExecution(execution);
}

View File

@ -512,3 +512,31 @@ WorkerNodeCompare(const void *lhsKey, const void *rhsKey, Size keySize)
portCompare = workerLhs->workerPort - workerRhs->workerPort;
return portCompare;
}
/*
* GetFirstPrimaryWorkerNode returns the primary worker node with the
* lowest rank based on CompareWorkerNodes.
*
* The ranking is arbitrary, but needs to be kept consistent with IsFirstWorkerNode.
*/
WorkerNode *
GetFirstPrimaryWorkerNode(void)
{
List *workerNodeList = ActivePrimaryNodeList(NoLock);
ListCell *workerNodeCell = NULL;
WorkerNode *firstWorkerNode = NULL;
foreach(workerNodeCell, workerNodeList)
{
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
if (firstWorkerNode == NULL ||
CompareWorkerNodes(&workerNode, &firstWorkerNode) < 0)
{
firstWorkerNode = workerNode;
}
}
return firstWorkerNode;
}

View File

@ -68,30 +68,6 @@ SendCommandToWorkerAsUser(char *nodeName, int32 nodePort, const char *nodeUser,
}
/*
* SendCommandToFirstWorker sends the given command only to the first worker node
* sorted by host name and port number using SendCommandToWorker.
*/
void
SendCommandToFirstWorker(char *command)
{
List *workerNodeList = ActivePrimaryNodeList(NoLock);
WorkerNode *firstWorkerNode = NULL;
workerNodeList = SortList(workerNodeList, CompareWorkerNodes);
if (list_length(workerNodeList) == 0)
{
ereport(ERROR, (errmsg("cannot find a worker node")));
}
firstWorkerNode = (WorkerNode *) linitial(workerNodeList);
SendCommandToWorker(firstWorkerNode->workerName, firstWorkerNode->workerPort,
command);
}
/*
* SendCommandToWorkers sends a command to all workers in
* parallel. Commands are committed on the workers when the local

View File

@ -31,6 +31,7 @@
#include "distributed/multi_router_executor.h"
#include "distributed/relay_utility.h"
#include "distributed/reference_table_utils.h"
#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/worker_protocol.h"
@ -179,6 +180,10 @@ LockShardListResourcesOnFirstWorker(LOCKMODE lockmode, List *shardIntervalList)
ListCell *shardIntervalCell = NULL;
int processedShardIntervalCount = 0;
int totalShardIntervalCount = list_length(shardIntervalList);
WorkerNode *firstWorkerNode = GetFirstPrimaryWorkerNode();
int connectionFlags = 0;
const char *superuser = CitusExtensionOwnerName();
MultiConnection *firstWorkerConnection = NULL;
appendStringInfo(lockCommand, "SELECT lock_shard_resources(%d, ARRAY[", lockmode);
@ -198,7 +203,26 @@ LockShardListResourcesOnFirstWorker(LOCKMODE lockmode, List *shardIntervalList)
appendStringInfo(lockCommand, "])");
SendCommandToFirstWorker(lockCommand->data);
/* need to hold the lock until commit */
BeginOrContinueCoordinatedTransaction();
/*
* Use the superuser connection to make sure we are allowed to lock.
* This also helps ensure we only use one connection.
*/
firstWorkerConnection = GetNodeUserDatabaseConnection(connectionFlags,
firstWorkerNode->workerName,
firstWorkerNode->workerPort,
superuser, NULL);
/* the SELECT .. FOR UPDATE breaks if we lose the connection */
MarkRemoteTransactionCritical(firstWorkerConnection);
/* make sure we are in a tranasaction block to hold the lock until commit */
RemoteTransactionBeginIfNecessary(firstWorkerConnection);
/* grab the lock on the first worker node */
ExecuteCriticalRemoteCommand(firstWorkerConnection, lockCommand->data);
}
@ -378,6 +402,12 @@ LockReferencedReferenceShardResources(uint64 shardId, LOCKMODE lockMode)
ClusterHasKnownMetadataWorkers() &&
!IsFirstWorkerNode())
{
/*
* When there is metadata, all nodes can write to the reference table,
* but the writes need to be serialised. To achieve that, all nodes will
* take the shard resource lock on the first worker node via RPC, except
* for the first worker node which will just take it the regular way.
*/
LockShardListResourcesOnFirstWorker(lockMode, referencedShardIntervalList);
}

View File

@ -78,6 +78,7 @@ extern bool WorkerNodeIsPrimary(WorkerNode *worker);
extern bool WorkerNodeIsSecondary(WorkerNode *worker);
extern bool WorkerNodeIsReadable(WorkerNode *worker);
extern uint32 CountPrimariesWithMetadata(void);
extern WorkerNode * GetFirstPrimaryWorkerNode(void);
/* Function declarations for worker node utilities */
extern int CompareWorkerNodes(const void *leftElement, const void *rightElement);

View File

@ -34,7 +34,6 @@ extern List * TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE
extern void SendCommandToWorker(char *nodeName, int32 nodePort, const char *command);
extern void SendCommandToWorkerAsUser(char *nodeName, int32 nodePort,
const char *nodeUser, const char *command);
extern void SendCommandToFirstWorker(char *command);
extern void SendCommandToWorkers(TargetWorkerSet targetWorkerSet, const char *command);
extern void SendBareCommandListToWorkers(TargetWorkerSet targetWorkerSet,
List *commandList);

View File

@ -12,7 +12,37 @@ INSERT INTO reference_table_test VALUES (2, 2.0, '2', '2016-12-02');
INSERT INTO reference_table_test VALUES (3, 3.0, '3', '2016-12-03');
INSERT INTO reference_table_test VALUES (4, 4.0, '4', '2016-12-04');
INSERT INTO reference_table_test VALUES (5, 5.0, '5', '2016-12-05');
-- SELECT .. FOR UPDATE should work on coordinator (takes lock on first worker)
SELECT value_1, value_2 FROM reference_table_test ORDER BY value_1, value_2 LIMIT 1 FOR UPDATE;
value_1 | value_2
---------+---------
1 | 1
(1 row)
BEGIN;
SELECT value_1, value_2 FROM reference_table_test ORDER BY value_1, value_2 LIMIT 1 FOR UPDATE;
value_1 | value_2
---------+---------
1 | 1
(1 row)
END;
\c - - - :worker_1_port
-- SELECT .. FOR UPDATE should work on first worker (takes lock on self)
SELECT value_1, value_2 FROM reference_table_test ORDER BY value_1, value_2 LIMIT 1 FOR UPDATE;
value_1 | value_2
---------+---------
1 | 1
(1 row)
BEGIN;
SELECT value_1, value_2 FROM reference_table_test ORDER BY value_1, value_2 LIMIT 1 FOR UPDATE;
value_1 | value_2
---------+---------
1 | 1
(1 row)
END;
-- run some queries on top of the data
SELECT
*
@ -569,6 +599,21 @@ INSERT INTO reference_table_test_second VALUES (3, 3.0, '3', '2016-12-03');
INSERT INTO reference_table_test_third VALUES (4, 4.0, '4', '2016-12-04');
INSERT INTO reference_table_test_third VALUES (5, 5.0, '5', '2016-12-05');
\c - - - :worker_2_port
-- SELECT .. FOR UPDATE should work on second worker (takes lock on first worker)
SELECT value_1, value_2 FROM reference_table_test ORDER BY value_1, value_2 LIMIT 1 FOR UPDATE;
value_1 | value_2
---------+---------
1 | 1
(1 row)
BEGIN;
SELECT value_1, value_2 FROM reference_table_test ORDER BY value_1, value_2 LIMIT 1 FOR UPDATE;
value_1 | value_2
---------+---------
1 | 1
(1 row)
END;
-- some very basic tests
SELECT
DISTINCT t1.value_1

View File

@ -10,7 +10,22 @@ INSERT INTO reference_table_test VALUES (3, 3.0, '3', '2016-12-03');
INSERT INTO reference_table_test VALUES (4, 4.0, '4', '2016-12-04');
INSERT INTO reference_table_test VALUES (5, 5.0, '5', '2016-12-05');
-- SELECT .. FOR UPDATE should work on coordinator (takes lock on first worker)
SELECT value_1, value_2 FROM reference_table_test ORDER BY value_1, value_2 LIMIT 1 FOR UPDATE;
BEGIN;
SELECT value_1, value_2 FROM reference_table_test ORDER BY value_1, value_2 LIMIT 1 FOR UPDATE;
END;
\c - - - :worker_1_port
-- SELECT .. FOR UPDATE should work on first worker (takes lock on self)
SELECT value_1, value_2 FROM reference_table_test ORDER BY value_1, value_2 LIMIT 1 FOR UPDATE;
BEGIN;
SELECT value_1, value_2 FROM reference_table_test ORDER BY value_1, value_2 LIMIT 1 FOR UPDATE;
END;
-- run some queries on top of the data
SELECT
*
@ -340,6 +355,13 @@ INSERT INTO reference_table_test_third VALUES (5, 5.0, '5', '2016-12-05');
\c - - - :worker_2_port
-- SELECT .. FOR UPDATE should work on second worker (takes lock on first worker)
SELECT value_1, value_2 FROM reference_table_test ORDER BY value_1, value_2 LIMIT 1 FOR UPDATE;
BEGIN;
SELECT value_1, value_2 FROM reference_table_test ORDER BY value_1, value_2 LIMIT 1 FOR UPDATE;
END;
-- some very basic tests
SELECT
DISTINCT t1.value_1