mirror of https://github.com/citusdata/citus.git

Merge pull request #4683 from citusdata/int_results_on_separate_conns
Do not re-use connections for colocated intermediate results during COPY
(branch ref: pull/4698/head)

commit 1b5244c410
@@ -121,7 +121,7 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
  * 4MB is a good balance between memory usage and performance. Note that this
  * is irrelevant in the common case where we open one connection per placement.
  */
-#define COPY_SWITCH_OVER_THRESHOLD (4 * 1024 * 1024)
+int CopySwitchOverThresholdBytes = 4 * 1024 * 1024;
 
 #define FILE_IS_OPEN(x) (x > -1)
 
@@ -280,7 +280,8 @@ static CopyShardState * GetShardState(uint64 shardId, HTAB *shardStateHash,
                                       copyOutState, bool isColocatedIntermediateResult);
 static MultiConnection * CopyGetPlacementConnection(HTAB *connectionStateHash,
                                                     ShardPlacement *placement,
-                                                    bool stopOnFailure);
+                                                    bool stopOnFailure,
+                                                    bool colocatedIntermediateResult);
 static bool HasReachedAdaptiveExecutorPoolSize(List *connectionStateHash);
 static MultiConnection * GetLeastUtilisedCopyConnection(List *connectionStateList,
                                                         char *nodeName, int nodePort);
@@ -2291,7 +2292,9 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
     /* define the template for the COPY statement that is sent to workers */
     CopyStmt *copyStatement = makeNode(CopyStmt);
 
-    if (copyDest->colocatedIntermediateResultIdPrefix != NULL)
+    bool colocatedIntermediateResults =
+        copyDest->colocatedIntermediateResultIdPrefix != NULL;
+    if (colocatedIntermediateResults)
     {
         copyStatement->relation = makeRangeVar(NULL,
                                                copyDest->
@@ -2329,6 +2332,13 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
 
     RecordRelationAccessIfNonDistTable(tableId, PLACEMENT_ACCESS_DML);
 
+    /*
+     * Colocated intermediate results do not honor citus.max_shared_pool_size,
+     * so we don't need to reserve any connections. Each result file is sent
+     * over a single connection.
+     */
+    if (!colocatedIntermediateResults)
+    {
     /*
      * For all the primary (e.g., writable) remote nodes, reserve a shared
      * connection. We do this upfront because we cannot know which nodes
@@ -2343,6 +2353,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
      * COPY would fail hinting the user to change the relevant settiing.
      */
     EnsureConnectionPossibilityForRemotePrimaryNodes();
+    }
 
     LocalCopyStatus localCopyStatus = GetLocalCopyStatus();
     if (localCopyStatus == LOCAL_COPY_DISABLED)
@@ -2502,7 +2513,7 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest
         switchToCurrentPlacement = true;
     }
     else if (currentPlacementState != activePlacementState &&
-             currentPlacementState->data->len > COPY_SWITCH_OVER_THRESHOLD)
+             currentPlacementState->data->len > CopySwitchOverThresholdBytes)
     {
         switchToCurrentPlacement = true;
 
@@ -3580,7 +3591,8 @@ InitializeCopyShardState(CopyShardState *shardState,
     }
 
     MultiConnection *connection =
-        CopyGetPlacementConnection(connectionStateHash, placement, stopOnFailure);
+        CopyGetPlacementConnection(connectionStateHash, placement, stopOnFailure,
+                                   colocatedIntermediateResult);
     if (connection == NULL)
     {
         failedPlacementCount++;
@@ -3691,11 +3703,40 @@ LogLocalCopyToFileExecution(uint64 shardId)
  * then it reuses the connection. Otherwise, it requests a connection for placement.
  */
 static MultiConnection *
-CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement, bool
-                           stopOnFailure)
+CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement,
+                           bool stopOnFailure, bool colocatedIntermediateResult)
 {
-    uint32 connectionFlags = FOR_DML;
-    char *nodeUser = CurrentUserName();
+    if (colocatedIntermediateResult)
+    {
+        /*
+         * Colocated intermediate results are just files and not required to use
+         * the same connections with their co-located shards. So, we are free to
+         * use any connection we can get.
+         *
+         * Also, the current connection re-use logic does not know how to handle
+         * intermediate results, as intermediate results always truncate the
+         * existing files. That's why we use one connection per intermediate
+         * result.
+         *
+         * Also note that we are breaking the guarantees of citus.shared_pool_size
+         * as we cannot rely on optional connections.
+         */
+        uint32 connectionFlagsForIntermediateResult = 0;
+        MultiConnection *connection =
+            GetNodeConnection(connectionFlagsForIntermediateResult, placement->nodeName,
+                              placement->nodePort);
+
+        /*
+         * As noted above, we want each intermediate file to go over
+         * a separate connection.
+         */
+        ClaimConnectionExclusively(connection);
+
+        /* and, we cannot afford to handle failures when anything goes wrong */
+        MarkRemoteTransactionCritical(connection);
+
+        return connection;
+    }
 
     /*
      * Determine whether the task has to be assigned to a particular connection
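The regression tests later in this diff exercise this new branch. As a minimal sketch (using the on_conflict test schema that appears below), an INSERT ... SELECT that cannot be pushed down is collected as colocated intermediate result files, and each file is now COPY'd over its own, exclusively claimed connection:

    -- LIMIT prevents push-down, so the SELECT result is materialized as
    -- colocated intermediate results before being copied into the shards
    INSERT INTO target_table
    SELECT * FROM target_table LIMIT 10000
    ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 + 1;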
@@ -3703,6 +3744,7 @@ CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement,
      */
     ShardPlacementAccess *placementAccess = CreatePlacementAccess(placement,
                                                                   PLACEMENT_ACCESS_DML);
+    uint32 connectionFlags = FOR_DML;
     MultiConnection *connection =
         GetConnectionIfPlacementAccessedInXact(connectionFlags,
                                                list_make1(placementAccess), NULL);
@@ -3791,6 +3833,7 @@ CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement,
         connectionFlags |= REQUIRE_CLEAN_CONNECTION;
     }
 
+    char *nodeUser = CurrentUserName();
     connection = GetPlacementConnection(connectionFlags, placement, nodeUser);
     if (connection == NULL)
     {
@@ -575,6 +575,22 @@ RegisterCitusConfigVariables(void)
         GUC_STANDARD,
         NULL, NULL, NULL);
 
+    DefineCustomIntVariable(
+        "citus.copy_switchover_threshold",
+        gettext_noop("Sets the threshold for copy to be switched "
+                     "over per connection."),
+        gettext_noop("Data size threshold to switch over the active placement for "
+                     "a connection. If this is too low, overhead of starting COPY "
+                     "commands will hurt the performance. If this is too high, "
+                     "buffered data will use lots of memory. 4MB is a good balance "
+                     "between memory usage and performance. Note that this is irrelevant "
+                     "in the common case where we open one connection per placement."),
+        &CopySwitchOverThresholdBytes,
+        4 * 1024 * 1024, 1, INT_MAX,
+        PGC_USERSET,
+        GUC_UNIT_BYTE | GUC_NO_SHOW_ALL,
+        NULL, NULL, NULL);
+
     DefineCustomBoolVariable(
         "citus.enable_local_execution",
         gettext_noop("Enables queries on shards that are local to the current node "
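Because the new GUC is registered as PGC_USERSET, it can be changed per session. A minimal usage sketch, mirroring the regression tests below that force frequent switch-overs by dropping the threshold to a single byte:

    -- force a COPY switch-over after every buffered byte (as the tests below do)
    SET citus.copy_switchover_threshold TO 1;
    -- return to the 4MB default
    RESET citus.copy_switchover_threshold;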
@@ -141,6 +141,10 @@ typedef struct CitusCopyDestReceiver
 } CitusCopyDestReceiver;
 
 
+/* managed via GUC, the default is 4MB */
+extern int CopySwitchOverThresholdBytes;
+
+
 /* function declarations for copying into a distributed table */
 extern CitusCopyDestReceiver * CreateCitusCopyDestReceiver(Oid relationId,
                                                            List *columnNameList,
@@ -543,6 +543,40 @@ SELECT * FROM target_table ORDER BY 1;
  10 | 0
 (10 rows)
 
+-- make sure that even if COPY switchover happens
+-- the results are correct
+SET citus.copy_switchover_threshold TO 1;
+TRUNCATE target_table;
+-- load some data to make sure copy commands switch over connections
+INSERT INTO target_table SELECT i,0 FROM generate_series(0,500)i;
+DEBUG:  distributed INSERT ... SELECT can only select from distributed tables
+DEBUG:  Collecting INSERT ... SELECT results on coordinator
+-- make sure that SELECT only uses 1 connection 1 node
+-- yet still COPY commands use 1 connection per co-located
+-- intermediate result file
+SET citus.max_adaptive_executor_pool_size TO 1;
+INSERT INTO target_table SELECT * FROM target_table LIMIT 10000 ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 + 1;
+DEBUG:  LIMIT clauses are not allowed in distributed INSERT ... SELECT queries
+DEBUG:  push down of limit count: 10000
+DEBUG:  Collecting INSERT ... SELECT results on coordinator
+SELECT DISTINCT col_2 FROM target_table;
+ col_2
+---------------------------------------------------------------------
+     1
+(1 row)
+
+WITH cte_1 AS (INSERT INTO target_table SELECT * FROM target_table LIMIT 10000 ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 + 1 RETURNING *)
+SELECT DISTINCT col_2 FROM cte_1;
+DEBUG:  generating subplan XXX_1 for CTE cte_1: INSERT INTO on_conflict.target_table (col_1, col_2) SELECT col_1, col_2 FROM on_conflict.target_table LIMIT 10000 ON CONFLICT(col_1) DO UPDATE SET col_2 = (excluded.col_2 OPERATOR(pg_catalog.+) 1) RETURNING target_table.col_1, target_table.col_2
+DEBUG:  LIMIT clauses are not allowed in distributed INSERT ... SELECT queries
+DEBUG:  push down of limit count: 10000
+DEBUG:  Plan XXX query after replacing subqueries and CTEs: SELECT DISTINCT col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) cte_1
+DEBUG:  Collecting INSERT ... SELECT results on coordinator
+ col_2
+---------------------------------------------------------------------
+     2
+(1 row)
+
 RESET client_min_messages;
 DROP SCHEMA on_conflict CASCADE;
 NOTICE:  drop cascades to 7 other objects
@@ -495,7 +495,7 @@ BEGIN;
 (2 rows)
 
 ROLLBACK;
--- INSERT SELECT with RETURNING/ON CONFLICT clauses should honor shared_pool_size
+-- INSERT SELECT with RETURNING/ON CONFLICT clauses does not honor shared_pool_size
 -- in underlying COPY commands
 BEGIN;
     SELECT pg_sleep(0.1);
@@ -504,7 +504,9 @@ BEGIN;
 
 (1 row)
 
-    INSERT INTO test SELECT i FROM generate_series(0,10) i RETURNING *;
+    -- make sure that we hit at least 4 shards per node, where 20 rows
+    -- is enough
+    INSERT INTO test SELECT i FROM generate_series(0,20) i RETURNING *;
  a
 ---------------------------------------------------------------------
  0
@@ -518,10 +520,20 @@ BEGIN;
  8
  9
  10
-(11 rows)
+ 11
+ 12
+ 13
+ 14
+ 15
+ 16
+ 17
+ 18
+ 19
+ 20
+(21 rows)
 
     SELECT
-        connection_count_to_node
+        connection_count_to_node > current_setting('citus.max_shared_pool_size')::int
     FROM
         citus_remote_connection_stats()
     WHERE
@@ -529,10 +541,10 @@ BEGIN;
         database_name = 'regression'
     ORDER BY
         hostname, port;
- connection_count_to_node
+ ?column?
 ---------------------------------------------------------------------
-                        3
-                        3
+ t
+ t
 (2 rows)
 
 ROLLBACK;
@@ -314,5 +314,24 @@ WITH cte AS(
 INSERT INTO target_table SELECT * FROM cte_2 ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 + 1;
 SELECT * FROM target_table ORDER BY 1;
 
+-- make sure that even if COPY switchover happens
+-- the results are correct
+SET citus.copy_switchover_threshold TO 1;
+TRUNCATE target_table;
+
+-- load some data to make sure copy commands switch over connections
+INSERT INTO target_table SELECT i,0 FROM generate_series(0,500)i;
+
+-- make sure that SELECT only uses 1 connection 1 node
+-- yet still COPY commands use 1 connection per co-located
+-- intermediate result file
+SET citus.max_adaptive_executor_pool_size TO 1;
+
+INSERT INTO target_table SELECT * FROM target_table LIMIT 10000 ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 + 1;
+SELECT DISTINCT col_2 FROM target_table;
+
+WITH cte_1 AS (INSERT INTO target_table SELECT * FROM target_table LIMIT 10000 ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 + 1 RETURNING *)
+SELECT DISTINCT col_2 FROM cte_1;
+
 RESET client_min_messages;
 DROP SCHEMA on_conflict CASCADE;
@@ -340,14 +340,16 @@ BEGIN;
         hostname, port;
 ROLLBACK;
 
--- INSERT SELECT with RETURNING/ON CONFLICT clauses should honor shared_pool_size
+-- INSERT SELECT with RETURNING/ON CONFLICT clauses does not honor shared_pool_size
 -- in underlying COPY commands
 BEGIN;
     SELECT pg_sleep(0.1);
-    INSERT INTO test SELECT i FROM generate_series(0,10) i RETURNING *;
+    -- make sure that we hit at least 4 shards per node, where 20 rows
+    -- is enough
+    INSERT INTO test SELECT i FROM generate_series(0,20) i RETURNING *;
+
     SELECT
-        connection_count_to_node
+        connection_count_to_node > current_setting('citus.max_shared_pool_size')::int
     FROM
         citus_remote_connection_stats()
     WHERE