Realtime executor honours multi_shard_modify_mode

We're relying on multi_shard_modify_mode GUC for real-time SELECTs.
The name of the GUC is unfortunate, but, adding one more GUC
(or renaming the GUC) would make the UX even worse. Given that this
mode is mostly important for transaction blocks that involve modification
/DDL queries along with real-time SELECTs, we can live with the confusion.
pull/2199/head
Onder Kalaci 2018-06-05 13:52:54 +03:00
parent d918556dca
commit a5370f5bb0
8 changed files with 100 additions and 10 deletions

View File

@ -20,6 +20,7 @@
#include "commands/dbcommands.h"
#include "distributed/metadata_cache.h"
#include "distributed/connection_management.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_client_executor.h"
#include "distributed/multi_server_executor.h"
#include "distributed/placement_connection.h"
@ -188,7 +189,17 @@ MultiClientPlacementConnectStart(List *placementAccessList, const char *userName
MultiConnection *connection = NULL;
ConnStatusType connStatusType = CONNECTION_OK;
int32 connectionId = AllocateConnectionId();
int connectionFlags = CONNECTION_PER_PLACEMENT; /* no cached connections for now */
int connectionFlags = 0;
/*
* Although we're opening connections for SELECT queries, we're relying
* on multi_shard_modify_mode GUC. The name of the GUC is unfortunate, but,
* adding one more GUC (or renaming the GUC) would make the UX even worse.
*/
if (MultiShardConnectionType == PARALLEL_CONNECTION)
{
connectionFlags = CONNECTION_PER_PLACEMENT;
}
if (connectionId == INVALID_CONNECTION_ID)
{

View File

@ -40,7 +40,10 @@
#include "utils/memutils.h"
/* controls the connection type for multi shard update/delete queries */
/*
* Controls the connection type for multi shard modifications, DDLs
* TRUNCATE and real-time SELECT queries.
*/
int MultiShardConnectionType = PARALLEL_CONNECTION;

View File

@ -901,12 +901,24 @@ LookupWorkerForTask(HTAB *workerHash, Task *task, TaskExecution *taskExecution)
/*
* WorkerConnectionsExhausted determines if the current query has exhausted the
* maximum number of open connections that can be made to a worker.
*
* Note that the function takes sequential exection of the queries into account
* as well. In other words, in the sequential mode, the connections are considered
* to be exahusted when there is already a connection opened to the given worker.
*/
static bool
WorkerConnectionsExhausted(WorkerNodeState *workerNodeState)
{
bool reachedLimit = false;
if (MultiShardConnectionType == SEQUENTIAL_CONNECTION &&
workerNodeState->openConnectionCount >= 1)
{
reachedLimit = true;
}
else if (MultiShardConnectionType == PARALLEL_CONNECTION &&
workerNodeState->openConnectionCount >= MaxConnections)
{
/*
* A worker cannot accept more than max_connections connections. If we have a
* small number of workers with many shards, then a single query could exhaust
@ -914,8 +926,6 @@ WorkerConnectionsExhausted(WorkerNodeState *workerNodeState)
* on the master as a proxy for the worker configuration to avoid introducing a
* new configuration value.
*/
if (workerNodeState->openConnectionCount >= MaxConnections)
{
reachedLimit = true;
}

View File

@ -340,6 +340,22 @@ ROLLBACK;
BEGIN;
SELECT id, pg_advisory_lock(15) FROM test_table;
ERROR: canceling the transaction since it was involved in a distributed deadlock
ROLLBACK;
-- sequential real-time queries should be successfully executed
-- since the queries are sent over the same connection
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
SELECT id, pg_advisory_lock(15) FROM test_table ORDER BY 1 DESC;
id | pg_advisory_lock
----+------------------
6 |
5 |
4 |
3 |
2 |
1 |
(6 rows)
ROLLBACK;
SET client_min_messages TO DEFAULT;
alter system set deadlock_timeout TO DEFAULT;

View File

@ -348,6 +348,22 @@ ROLLBACK;
BEGIN;
SELECT id, pg_advisory_lock(15) FROM test_table;
ERROR: canceling the transaction since it was involved in a distributed deadlock
ROLLBACK;
-- sequential real-time queries should be successfully executed
-- since the queries are sent over the same connection
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
SELECT id, pg_advisory_lock(15) FROM test_table ORDER BY 1 DESC;
id | pg_advisory_lock
----+------------------
6 |
5 |
4 |
3 |
2 |
1 |
(6 rows)
ROLLBACK;
SET client_min_messages TO DEFAULT;
alter system set deadlock_timeout TO DEFAULT;

View File

@ -105,6 +105,23 @@ SELECT count(*) FROM second_raw_table;
0
(1 row)
-- sequential insert followed by a sequential real-time query should be fine
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
WITH ids_inserted AS
(
INSERT INTO raw_table (tenant_id) VALUES (11), (12), (13), (14) RETURNING tenant_id
)
SELECT income FROM second_raw_table WHERE tenant_id IN (SELECT * FROM ids_inserted) ORDER BY 1 DESC LIMIT 3;
DEBUG: data-modifying statements are not supported in the WITH clauses of distributed queries
DEBUG: generating subplan 17_1 for CTE ids_inserted: INSERT INTO with_transactions.raw_table (tenant_id) VALUES (11), (12), (13), (14) RETURNING raw_table.tenant_id
DEBUG: Plan 17 query after replacing subqueries and CTEs: SELECT income FROM with_transactions.second_raw_table WHERE (tenant_id OPERATOR(pg_catalog.=) ANY (SELECT ids_inserted.tenant_id FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('17_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id integer)) ids_inserted)) ORDER BY income DESC LIMIT 3
DEBUG: push down of limit count: 3
income
--------
(0 rows)
ROLLBACK;
RESET client_min_messages;
RESET citus.shard_count;
DROP SCHEMA with_transactions CASCADE;

View File

@ -213,6 +213,13 @@ BEGIN;
SELECT id, pg_advisory_lock(15) FROM test_table;
ROLLBACK;
-- sequential real-time queries should be successfully executed
-- since the queries are sent over the same connection
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
SELECT id, pg_advisory_lock(15) FROM test_table ORDER BY 1 DESC;
ROLLBACK;
SET client_min_messages TO DEFAULT;
alter system set deadlock_timeout TO DEFAULT;
SELECT pg_reload_conf();

View File

@ -70,6 +70,16 @@ SELECT count(*) FROM raw_table;
SELECT count(*) FROM raw_table WHERE created_at = '2001-02-10 20:00:00';
SELECT count(*) FROM second_raw_table;
-- sequential insert followed by a sequential real-time query should be fine
BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
WITH ids_inserted AS
(
INSERT INTO raw_table (tenant_id) VALUES (11), (12), (13), (14) RETURNING tenant_id
)
SELECT income FROM second_raw_table WHERE tenant_id IN (SELECT * FROM ids_inserted) ORDER BY 1 DESC LIMIT 3;
ROLLBACK;
RESET client_min_messages;
RESET citus.shard_count;
DROP SCHEMA with_transactions CASCADE;