mirror of https://github.com/citusdata/citus.git
Merge pull request #2199 from citusdata/seq_realtime_select
Realtime executor honours multi_shard_modify_mode pull/2194/head
commit
aeaa28c005
|
@ -20,6 +20,7 @@
|
|||
#include "commands/dbcommands.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/connection_management.h"
|
||||
#include "distributed/multi_executor.h"
|
||||
#include "distributed/multi_client_executor.h"
|
||||
#include "distributed/multi_server_executor.h"
|
||||
#include "distributed/placement_connection.h"
|
||||
|
@ -188,7 +189,17 @@ MultiClientPlacementConnectStart(List *placementAccessList, const char *userName
|
|||
MultiConnection *connection = NULL;
|
||||
ConnStatusType connStatusType = CONNECTION_OK;
|
||||
int32 connectionId = AllocateConnectionId();
|
||||
int connectionFlags = CONNECTION_PER_PLACEMENT; /* no cached connections for now */
|
||||
int connectionFlags = 0;
|
||||
|
||||
/*
|
||||
* Although we're opening connections for SELECT queries, we're relying
|
||||
* on multi_shard_modify_mode GUC. The name of the GUC is unfortunate, but,
|
||||
* adding one more GUC (or renaming the GUC) would make the UX even worse.
|
||||
*/
|
||||
if (MultiShardConnectionType == PARALLEL_CONNECTION)
|
||||
{
|
||||
connectionFlags = CONNECTION_PER_PLACEMENT;
|
||||
}
|
||||
|
||||
if (connectionId == INVALID_CONNECTION_ID)
|
||||
{
|
||||
|
|
|
@ -40,7 +40,10 @@
|
|||
#include "utils/memutils.h"
|
||||
|
||||
|
||||
/* controls the connection type for multi shard update/delete queries */
|
||||
/*
|
||||
* Controls the connection type for multi shard modifications, DDLs,
|
||||
* TRUNCATE and real-time SELECT queries.
|
||||
*/
|
||||
int MultiShardConnectionType = PARALLEL_CONNECTION;
|
||||
|
||||
|
||||
|
|
|
@ -901,23 +901,33 @@ LookupWorkerForTask(HTAB *workerHash, Task *task, TaskExecution *taskExecution)
|
|||
/*
|
||||
* WorkerConnectionsExhausted determines if the current query has exhausted the
|
||||
* maximum number of open connections that can be made to a worker.
|
||||
*
|
||||
* Note that the function takes sequential execution of the queries into account
|
||||
* as well. In other words, in the sequential mode, the connections are considered
|
||||
* to be exhausted when there is already a connection opened to the given worker.
|
||||
*/
|
||||
static bool
|
||||
WorkerConnectionsExhausted(WorkerNodeState *workerNodeState)
|
||||
{
|
||||
bool reachedLimit = false;
|
||||
|
||||
/*
|
||||
* A worker cannot accept more than max_connections connections. If we have a
|
||||
* small number of workers with many shards, then a single query could exhaust
|
||||
* max_connections unless we throttle here. We use the value of max_connections
|
||||
* on the master as a proxy for the worker configuration to avoid introducing a
|
||||
* new configuration value.
|
||||
*/
|
||||
if (workerNodeState->openConnectionCount >= MaxConnections)
|
||||
if (MultiShardConnectionType == SEQUENTIAL_CONNECTION &&
|
||||
workerNodeState->openConnectionCount >= 1)
|
||||
{
|
||||
reachedLimit = true;
|
||||
}
|
||||
else if (MultiShardConnectionType == PARALLEL_CONNECTION &&
|
||||
workerNodeState->openConnectionCount >= MaxConnections)
|
||||
{
|
||||
/*
|
||||
* A worker cannot accept more than max_connections connections. If we have a
|
||||
* small number of workers with many shards, then a single query could exhaust
|
||||
* max_connections unless we throttle here. We use the value of max_connections
|
||||
* on the master as a proxy for the worker configuration to avoid introducing a
|
||||
* new configuration value.
|
||||
*/
|
||||
reachedLimit = true;
|
||||
}
|
||||
|
||||
return reachedLimit;
|
||||
}
|
||||
|
|
|
@ -340,6 +340,22 @@ ROLLBACK;
|
|||
BEGIN;
|
||||
SELECT id, pg_advisory_lock(15) FROM test_table;
|
||||
ERROR: canceling the transaction since it was involved in a distributed deadlock
|
||||
ROLLBACK;
|
||||
-- sequential real-time queries should be successfully executed
|
||||
-- since the queries are sent over the same connection
|
||||
BEGIN;
|
||||
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
|
||||
SELECT id, pg_advisory_lock(15) FROM test_table ORDER BY 1 DESC;
|
||||
id | pg_advisory_lock
|
||||
----+------------------
|
||||
6 |
|
||||
5 |
|
||||
4 |
|
||||
3 |
|
||||
2 |
|
||||
1 |
|
||||
(6 rows)
|
||||
|
||||
ROLLBACK;
|
||||
SET client_min_messages TO DEFAULT;
|
||||
alter system set deadlock_timeout TO DEFAULT;
|
||||
|
|
|
@ -348,6 +348,22 @@ ROLLBACK;
|
|||
BEGIN;
|
||||
SELECT id, pg_advisory_lock(15) FROM test_table;
|
||||
ERROR: canceling the transaction since it was involved in a distributed deadlock
|
||||
ROLLBACK;
|
||||
-- sequential real-time queries should be successfully executed
|
||||
-- since the queries are sent over the same connection
|
||||
BEGIN;
|
||||
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
|
||||
SELECT id, pg_advisory_lock(15) FROM test_table ORDER BY 1 DESC;
|
||||
id | pg_advisory_lock
|
||||
----+------------------
|
||||
6 |
|
||||
5 |
|
||||
4 |
|
||||
3 |
|
||||
2 |
|
||||
1 |
|
||||
(6 rows)
|
||||
|
||||
ROLLBACK;
|
||||
SET client_min_messages TO DEFAULT;
|
||||
alter system set deadlock_timeout TO DEFAULT;
|
||||
|
|
|
@ -105,6 +105,23 @@ SELECT count(*) FROM second_raw_table;
|
|||
0
|
||||
(1 row)
|
||||
|
||||
-- sequential insert followed by a sequential real-time query should be fine
|
||||
BEGIN;
|
||||
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
|
||||
WITH ids_inserted AS
|
||||
(
|
||||
INSERT INTO raw_table (tenant_id) VALUES (11), (12), (13), (14) RETURNING tenant_id
|
||||
)
|
||||
SELECT income FROM second_raw_table WHERE tenant_id IN (SELECT * FROM ids_inserted) ORDER BY 1 DESC LIMIT 3;
|
||||
DEBUG: data-modifying statements are not supported in the WITH clauses of distributed queries
|
||||
DEBUG: generating subplan 17_1 for CTE ids_inserted: INSERT INTO with_transactions.raw_table (tenant_id) VALUES (11), (12), (13), (14) RETURNING raw_table.tenant_id
|
||||
DEBUG: Plan 17 query after replacing subqueries and CTEs: SELECT income FROM with_transactions.second_raw_table WHERE (tenant_id OPERATOR(pg_catalog.=) ANY (SELECT ids_inserted.tenant_id FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('17_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id integer)) ids_inserted)) ORDER BY income DESC LIMIT 3
|
||||
DEBUG: push down of limit count: 3
|
||||
income
|
||||
--------
|
||||
(0 rows)
|
||||
|
||||
ROLLBACK;
|
||||
RESET client_min_messages;
|
||||
RESET citus.shard_count;
|
||||
DROP SCHEMA with_transactions CASCADE;
|
||||
|
|
|
@ -213,6 +213,13 @@ BEGIN;
|
|||
SELECT id, pg_advisory_lock(15) FROM test_table;
|
||||
ROLLBACK;
|
||||
|
||||
-- sequential real-time queries should be successfully executed
|
||||
-- since the queries are sent over the same connection
|
||||
BEGIN;
|
||||
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
|
||||
SELECT id, pg_advisory_lock(15) FROM test_table ORDER BY 1 DESC;
|
||||
ROLLBACK;
|
||||
|
||||
SET client_min_messages TO DEFAULT;
|
||||
alter system set deadlock_timeout TO DEFAULT;
|
||||
SELECT pg_reload_conf();
|
||||
|
|
|
@ -70,6 +70,16 @@ SELECT count(*) FROM raw_table;
|
|||
SELECT count(*) FROM raw_table WHERE created_at = '2001-02-10 20:00:00';
|
||||
SELECT count(*) FROM second_raw_table;
|
||||
|
||||
-- sequential insert followed by a sequential real-time query should be fine
|
||||
BEGIN;
|
||||
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
|
||||
WITH ids_inserted AS
|
||||
(
|
||||
INSERT INTO raw_table (tenant_id) VALUES (11), (12), (13), (14) RETURNING tenant_id
|
||||
)
|
||||
SELECT income FROM second_raw_table WHERE tenant_id IN (SELECT * FROM ids_inserted) ORDER BY 1 DESC LIMIT 3;
|
||||
ROLLBACK;
|
||||
|
||||
RESET client_min_messages;
|
||||
RESET citus.shard_count;
|
||||
DROP SCHEMA with_transactions CASCADE;
|
||||
|
|
Loading…
Reference in New Issue