Merge pull request #3703 from citusdata/get_rid_of_side_channel

Move connection establishment for intermediate results after query execution
pull/3719/head
Önder Kalacı 2020-04-07 17:21:30 +02:00 committed by GitHub
commit 9fb83d6e5d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 146 additions and 154 deletions

View File

@ -54,7 +54,6 @@ static bool ShouldShutdownConnection(MultiConnection *connection, const int
static void ResetConnection(MultiConnection *connection); static void ResetConnection(MultiConnection *connection);
static void DefaultCitusNoticeProcessor(void *arg, const char *message); static void DefaultCitusNoticeProcessor(void *arg, const char *message);
static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags); static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags);
static void GivePurposeToConnection(MultiConnection *connection, int flags);
static bool RemoteTransactionIdle(MultiConnection *connection); static bool RemoteTransactionIdle(MultiConnection *connection);
static int EventSetSizeForConnectionList(List *connections); static int EventSetSizeForConnectionList(List *connections);
@ -314,8 +313,6 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
connection = FindAvailableConnection(entry->connections, flags); connection = FindAvailableConnection(entry->connections, flags);
if (connection) if (connection)
{ {
GivePurposeToConnection(connection, flags);
return connection; return connection;
} }
} }
@ -330,7 +327,6 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
dlist_push_tail(entry->connections, &connection->connectionNode); dlist_push_tail(entry->connections, &connection->connectionNode);
ResetShardPlacementAssociation(connection); ResetShardPlacementAssociation(connection);
GivePurposeToConnection(connection, flags);
return connection; return connection;
} }
@ -338,10 +334,7 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
/* /*
* FindAvailableConnection searches the given list of connections for one that * FindAvailableConnection searches the given list of connections for one that
* is not claimed exclusively or marked as a side channel. If the caller passed * is not claimed exclusively.
* the REQUIRE_SIDECHANNEL flag, it will only return a connection that has not
* been used to access shard placements and that connectoin will only be returned
* in subsequent calls if the REQUIRE_SIDECHANNEL flag is passed.
* *
* If no connection is available, FindAvailableConnection returns NULL. * If no connection is available, FindAvailableConnection returns NULL.
*/ */
@ -382,56 +375,13 @@ FindAvailableConnection(dlist_head *connections, uint32 flags)
continue; continue;
} }
if ((flags & REQUIRE_SIDECHANNEL) != 0) return connection;
{
if (connection->purpose == CONNECTION_PURPOSE_SIDECHANNEL ||
connection->purpose == CONNECTION_PURPOSE_ANY)
{
/* side channel must not have been used to access data */
Assert(!ConnectionUsedForAnyPlacements(connection));
return connection;
}
}
else if (connection->purpose == CONNECTION_PURPOSE_DATA_ACCESS ||
connection->purpose == CONNECTION_PURPOSE_ANY)
{
/* can use this connection to access data */
return connection;
}
} }
return NULL; return NULL;
} }
/*
* GivePurposeToConnection gives purpose to a connection if it does not already
* have a purpose. More specifically, it marks the connection as a sidechannel
* if the REQUIRE_SIDECHANNEL flag is set.
*/
static void
GivePurposeToConnection(MultiConnection *connection, int flags)
{
if (connection->purpose != CONNECTION_PURPOSE_ANY)
{
/* connection already has a purpose */
return;
}
if ((flags & REQUIRE_SIDECHANNEL) != 0)
{
/* connection should not be used for data access */
connection->purpose = CONNECTION_PURPOSE_SIDECHANNEL;
}
else
{
/* connection should be used for data access */
connection->purpose = CONNECTION_PURPOSE_DATA_ACCESS;
}
}
/* /*
* CloseAllConnectionsAfterTransaction sets the forceClose flag of all the * CloseAllConnectionsAfterTransaction sets the forceClose flag of all the
* connections. This is mainly done when citus.node_conninfo changes. * connections. This is mainly done when citus.node_conninfo changes.
@ -1071,7 +1021,6 @@ StartConnectionEstablishment(ConnectionHashKey *key)
false); false);
connection->connectionStart = GetCurrentTimestamp(); connection->connectionStart = GetCurrentTimestamp();
connection->connectionId = connectionId++; connection->connectionId = connectionId++;
connection->purpose = CONNECTION_PURPOSE_ANY;
/* /*
* To avoid issues with interrupts not getting caught all our connections * To avoid issues with interrupts not getting caught all our connections
@ -1228,7 +1177,6 @@ ResetConnection(MultiConnection *connection)
/* reset copy state */ /* reset copy state */
connection->copyBytesWrittenSinceLastFlush = 0; connection->copyBytesWrittenSinceLastFlush = 0;
connection->purpose = CONNECTION_PURPOSE_ANY;
UnclaimConnection(connection); UnclaimConnection(connection);
} }

View File

@ -82,6 +82,7 @@ typedef struct RemoteFileDestReceiver
static void RemoteFileDestReceiverStartup(DestReceiver *dest, int operation, static void RemoteFileDestReceiverStartup(DestReceiver *dest, int operation,
TupleDesc inputTupleDescriptor); TupleDesc inputTupleDescriptor);
static void PrepareIntermediateResultBroadcast(RemoteFileDestReceiver *resultDest);
static StringInfo ConstructCopyResultStatement(const char *resultId); static StringInfo ConstructCopyResultStatement(const char *resultId);
static void WriteToLocalFile(StringInfo copyData, FileCompat *fileCompat); static void WriteToLocalFile(StringInfo copyData, FileCompat *fileCompat);
static bool RemoteFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest); static bool RemoteFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest);
@ -233,14 +234,9 @@ RemoteFileDestReceiverStartup(DestReceiver *dest, int operation,
{ {
RemoteFileDestReceiver *resultDest = (RemoteFileDestReceiver *) dest; RemoteFileDestReceiver *resultDest = (RemoteFileDestReceiver *) dest;
const char *resultId = resultDest->resultId;
const char *delimiterCharacter = "\t"; const char *delimiterCharacter = "\t";
const char *nullPrintCharacter = "\\N"; const char *nullPrintCharacter = "\\N";
List *initialNodeList = resultDest->initialNodeList;
List *connectionList = NIL;
resultDest->tupleDescriptor = inputTupleDescriptor; resultDest->tupleDescriptor = inputTupleDescriptor;
/* define how tuples will be serialised */ /* define how tuples will be serialised */
@ -255,6 +251,21 @@ RemoteFileDestReceiverStartup(DestReceiver *dest, int operation,
resultDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor, resultDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor,
copyOutState->binary); copyOutState->binary);
}
/*
* PrepareIntermediateResultBroadcast gets a RemoteFileDestReceiver and does
* the necessary initilizations including initiating the remote connnections
* and creating the local file, which is necessary (it might be both).
*/
static void
PrepareIntermediateResultBroadcast(RemoteFileDestReceiver *resultDest)
{
List *initialNodeList = resultDest->initialNodeList;
const char *resultId = resultDest->resultId;
List *connectionList = NIL;
CopyOutState copyOutState = resultDest->copyOutState;
if (resultDest->writeLocalFile) if (resultDest->writeLocalFile)
{ {
@ -274,17 +285,11 @@ RemoteFileDestReceiverStartup(DestReceiver *dest, int operation,
WorkerNode *workerNode = NULL; WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, initialNodeList) foreach_ptr(workerNode, initialNodeList)
{ {
int flags = 0;
const char *nodeName = workerNode->workerName; const char *nodeName = workerNode->workerName;
int nodePort = workerNode->workerPort; int nodePort = workerNode->workerPort;
/*
* We prefer to use a connection that is not associcated with
* any placements. The reason is that we claim this connection
* exclusively and that would prevent the consecutive DML/DDL
* use the same connection.
*/
int flags = REQUIRE_SIDECHANNEL;
MultiConnection *connection = StartNodeConnection(flags, nodeName, nodePort); MultiConnection *connection = StartNodeConnection(flags, nodeName, nodePort);
ClaimConnectionExclusively(connection); ClaimConnectionExclusively(connection);
MarkRemoteTransactionCritical(connection); MarkRemoteTransactionCritical(connection);
@ -365,6 +370,15 @@ RemoteFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
{ {
RemoteFileDestReceiver *resultDest = (RemoteFileDestReceiver *) dest; RemoteFileDestReceiver *resultDest = (RemoteFileDestReceiver *) dest;
if (resultDest->tuplesSent == 0)
{
/*
* We get the first tuple, lets initialize the remote connections
* and/or the local file.
*/
PrepareIntermediateResultBroadcast(resultDest);
}
TupleDesc tupleDescriptor = resultDest->tupleDescriptor; TupleDesc tupleDescriptor = resultDest->tupleDescriptor;
List *connectionList = resultDest->connectionList; List *connectionList = resultDest->connectionList;
@ -434,6 +448,17 @@ RemoteFileDestReceiverShutdown(DestReceiver *destReceiver)
{ {
RemoteFileDestReceiver *resultDest = (RemoteFileDestReceiver *) destReceiver; RemoteFileDestReceiver *resultDest = (RemoteFileDestReceiver *) destReceiver;
if (resultDest->tuplesSent == 0)
{
/*
* We have not received any tuples (when the intermediate result
* returns zero rows). Still, we want to create the necessary
* intermediate result files even if they are empty, as the query
* execution requires the files to be present.
*/
PrepareIntermediateResultBroadcast(resultDest);
}
List *connectionList = resultDest->connectionList; List *connectionList = resultDest->connectionList;
CopyOutState copyOutState = resultDest->copyOutState; CopyOutState copyOutState = resultDest->copyOutState;
@ -770,11 +795,32 @@ ReadIntermediateResultsIntoFuncOutput(FunctionCallInfo fcinfo, char *copyFormat,
int statOK = stat(resultFileName, &fileStat); int statOK = stat(resultFileName, &fileStat);
if (statOK != 0) if (statOK != 0)
{ {
ereport(ERROR, (errcode_for_file_access(), /*
errmsg("result \"%s\" does not exist", resultId))); * When the file does not exist, it could mean two different things.
* First -- and a lot more common -- case is that a failure happened
* in a concurrent backend on the same distributed transaction. And,
* one of the backends in that transaction has already been roll
* backed, which has already removed the file. If we throw an error
* here, the user might see this error instead of the actual error
* message. Instead, we prefer to WARN the user and pretend that the
* file has no data in it. In the end, the user would see the actual
* error message for the failure.
*
* Second, in case of any bugs in intermediate result broadcasts,
* we could try to read a non-existing file. That is most likely
* to happen during development.
*/
ereport(WARNING, (errcode_for_file_access(),
errmsg("Query could not find the intermediate result file "
"\"%s\", it was mostly likely deleted due to an "
"error in a parallel process within the same "
"distributed transaction", resultId)));
}
else
{
ReadFileIntoTupleStore(resultFileName, copyFormat, tupleDescriptor,
tupleStore);
} }
ReadFileIntoTupleStore(resultFileName, copyFormat, tupleDescriptor, tupleStore);
} }
tuplestore_donestoring(tupleStore); tuplestore_donestoring(tupleStore);

View File

@ -52,28 +52,9 @@ enum MultiConnectionMode
/* open a connection per (co-located set of) placement(s) */ /* open a connection per (co-located set of) placement(s) */
CONNECTION_PER_PLACEMENT = 1 << 3, CONNECTION_PER_PLACEMENT = 1 << 3,
OUTSIDE_TRANSACTION = 1 << 4, OUTSIDE_TRANSACTION = 1 << 4
/* connection has not been used to access data */
REQUIRE_SIDECHANNEL = 1 << 5
}; };
/*
* ConnectionPurpose defines what a connection is used for during the
* current transaction. This is primarily to not allocate connections
* that are needed for data access to other purposes.
*/
typedef enum ConnectionPurpose
{
/* connection can be used for any purpose */
CONNECTION_PURPOSE_ANY,
/* connection can be used to access placements */
CONNECTION_PURPOSE_DATA_ACCESS,
/* connection can be used for auxiliary functions, but not data access */
CONNECTION_PURPOSE_SIDECHANNEL
} ConnectionPurpose;
typedef enum MultiConnectionState typedef enum MultiConnectionState
{ {
@ -116,9 +97,6 @@ typedef struct MultiConnection
/* is the connection currently in use, and shouldn't be used by anything else */ /* is the connection currently in use, and shouldn't be used by anything else */
bool claimedExclusively; bool claimedExclusively;
/* defines the purpose of the connection */
ConnectionPurpose purpose;
/* time connection establishment was started, for timeout */ /* time connection establishment was started, for timeout */
TimestampTz connectionStart; TimestampTz connectionStart;

View File

@ -6,6 +6,7 @@ SET SEARCH_PATH = multi_shard;
SET citus.shard_count TO 4; SET citus.shard_count TO 4;
SET citus.next_shard_id TO 201000; SET citus.next_shard_id TO 201000;
SET citus.shard_replication_factor TO 1; SET citus.shard_replication_factor TO 1;
SET citus.max_adaptive_executor_pool_size TO 1;
-- do not cache any connections -- do not cache any connections
SET citus.max_cached_conns_per_worker TO 0; SET citus.max_cached_conns_per_worker TO 0;
SELECT citus.mitmproxy('conn.allow()'); SELECT citus.mitmproxy('conn.allow()');

View File

@ -33,7 +33,11 @@ SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series
(1 row) (1 row)
SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x int, x2 int); SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x int, x2 int);
ERROR: result "squares" does not exist WARNING: Query could not find the intermediate result file "squares", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction
x | x2
---------------------------------------------------------------------
(0 rows)
BEGIN; BEGIN;
CREATE TABLE interesting_squares (user_id text, interested_in text); CREATE TABLE interesting_squares (user_id text, interested_in text);
SELECT create_distributed_table('interesting_squares', 'user_id'); SELECT create_distributed_table('interesting_squares', 'user_id');
@ -83,30 +87,20 @@ ORDER BY x;
(3 rows) (3 rows)
END; END;
CREATE FUNCTION raise_failed_execution_int_result(query text) RETURNS void AS $$
BEGIN
EXECUTE query;
EXCEPTION WHEN OTHERS THEN
IF SQLERRM LIKE '%does not exist%' THEN
RAISE 'Task failed to execute';
ELSIF SQLERRM LIKE '%could not receive query results%' THEN
RAISE 'Task failed to execute';
END IF;
END;
$$LANGUAGE plpgsql;
-- don't print the worker port
\set VERBOSITY terse
SET client_min_messages TO ERROR;
-- files should now be cleaned up -- files should now be cleaned up
SELECT raise_failed_execution_int_result($$ SET client_min_messages TO DEBUG;
SELECT x, x2 SELECT x, x2
FROM interesting_squares JOIN (SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x text, x2 int)) squares ON (x = interested_in) FROM interesting_squares JOIN (SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x text, x2 int)) squares ON (x = interested_in)
WHERE user_id = 'jon' WHERE user_id = 'jon' OR true
ORDER BY x; ORDER BY x;
$$); DEBUG: Router planner cannot handle multi-shard select queries
ERROR: Task failed to execute DEBUG: Query could not find the intermediate result file "squares", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction
\set VERBOSITY DEFAULT DETAIL: WARNING from localhost:xxxxx
SET client_min_messages TO DEFAULT; x | x2
---------------------------------------------------------------------
(0 rows)
RESET client_min_messages;
-- try to read the file as text, will fail because of binary encoding -- try to read the file as text, will fail because of binary encoding
BEGIN; BEGIN;
SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s'); SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s');
@ -314,7 +308,11 @@ SELECT create_intermediate_result('squares_1', 'SELECT s, s*s FROM generate_seri
(1 row) (1 row)
SELECT * FROM read_intermediate_results(ARRAY['squares_1']::text[], 'binary') AS res (x int, x2 int); SELECT * FROM read_intermediate_results(ARRAY['squares_1']::text[], 'binary') AS res (x int, x2 int);
ERROR: result "squares_1" does not exist WARNING: Query could not find the intermediate result file "squares_1", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction
x | x2
---------------------------------------------------------------------
(0 rows)
-- error behaviour, and also check that results are deleted on rollback -- error behaviour, and also check that results are deleted on rollback
BEGIN; BEGIN;
SELECT create_intermediate_result('squares_1', 'SELECT s, s*s FROM generate_series(1,3) s'); SELECT create_intermediate_result('squares_1', 'SELECT s, s*s FROM generate_series(1,3) s');
@ -325,10 +323,24 @@ SELECT create_intermediate_result('squares_1', 'SELECT s, s*s FROM generate_seri
SAVEPOINT s1; SAVEPOINT s1;
SELECT * FROM read_intermediate_results(ARRAY['notexistingfile', 'squares_1'], 'binary') AS res (x int, x2 int); SELECT * FROM read_intermediate_results(ARRAY['notexistingfile', 'squares_1'], 'binary') AS res (x int, x2 int);
ERROR: result "notexistingfile" does not exist WARNING: Query could not find the intermediate result file "notexistingfile", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction
x | x2
---------------------------------------------------------------------
1 | 1
2 | 4
3 | 9
(3 rows)
ROLLBACK TO SAVEPOINT s1; ROLLBACK TO SAVEPOINT s1;
SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'notexistingfile'], 'binary') AS res (x int, x2 int); SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'notexistingfile'], 'binary') AS res (x int, x2 int);
ERROR: result "notexistingfile" does not exist WARNING: Query could not find the intermediate result file "notexistingfile", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction
x | x2
---------------------------------------------------------------------
1 | 1
2 | 4
3 | 9
(3 rows)
ROLLBACK TO SAVEPOINT s1; ROLLBACK TO SAVEPOINT s1;
SELECT * FROM read_intermediate_results(ARRAY['squares_1', NULL], 'binary') AS res (x int, x2 int); SELECT * FROM read_intermediate_results(ARRAY['squares_1', NULL], 'binary') AS res (x int, x2 int);
ERROR: null array element not allowed in this context ERROR: null array element not allowed in this context
@ -348,7 +360,11 @@ SELECT count(*) FROM read_intermediate_results(ARRAY[]::text[], 'binary') AS res
END; END;
SELECT * FROM read_intermediate_results(ARRAY['squares_1']::text[], 'binary') AS res (x int, x2 int); SELECT * FROM read_intermediate_results(ARRAY['squares_1']::text[], 'binary') AS res (x int, x2 int);
ERROR: result "squares_1" does not exist WARNING: Query could not find the intermediate result file "squares_1", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction
x | x2
---------------------------------------------------------------------
(0 rows)
-- Test non-binary format: read_intermediate_results(..., 'text') -- Test non-binary format: read_intermediate_results(..., 'text')
BEGIN; BEGIN;
-- ROW(...) types switch the output format to text -- ROW(...) types switch the output format to text
@ -481,7 +497,12 @@ SELECT store_intermediate_result_on_node('localhost', :worker_1_port,
SAVEPOINT s1; SAVEPOINT s1;
-- results aren't available on coordinator yet -- results aren't available on coordinator yet
SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'binary') AS res (x int, x2 int); SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'binary') AS res (x int, x2 int);
ERROR: result "squares_1" does not exist WARNING: Query could not find the intermediate result file "squares_1", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction
WARNING: Query could not find the intermediate result file "squares_2", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction
x | x2
---------------------------------------------------------------------
(0 rows)
ROLLBACK TO SAVEPOINT s1; ROLLBACK TO SAVEPOINT s1;
-- fetch from worker 2 should fail -- fetch from worker 2 should fail
SELECT * FROM fetch_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'localhost', :worker_2_port); SELECT * FROM fetch_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'localhost', :worker_2_port);
@ -490,7 +511,12 @@ CONTEXT: while executing command on localhost:xxxxx
ROLLBACK TO SAVEPOINT s1; ROLLBACK TO SAVEPOINT s1;
-- still, results aren't available on coordinator yet -- still, results aren't available on coordinator yet
SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'binary') AS res (x int, x2 int); SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'binary') AS res (x int, x2 int);
ERROR: result "squares_1" does not exist WARNING: Query could not find the intermediate result file "squares_1", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction
WARNING: Query could not find the intermediate result file "squares_2", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction
x | x2
---------------------------------------------------------------------
(0 rows)
ROLLBACK TO SAVEPOINT s1; ROLLBACK TO SAVEPOINT s1;
-- fetch from worker 1 should succeed -- fetch from worker 1 should succeed
SELECT * FROM fetch_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'localhost', :worker_1_port); SELECT * FROM fetch_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'localhost', :worker_1_port);
@ -538,11 +564,15 @@ ERROR: worker array object cannot contain null values
END; END;
-- results should have been deleted after transaction commit -- results should have been deleted after transaction commit
SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'binary') AS res (x int, x2 int); SELECT * FROM read_intermediate_results(ARRAY['squares_1', 'squares_2']::text[], 'binary') AS res (x int, x2 int);
ERROR: result "squares_1" does not exist WARNING: Query could not find the intermediate result file "squares_1", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction
WARNING: Query could not find the intermediate result file "squares_2", it was mostly likely deleted due to an error in a parallel process within the same distributed transaction
x | x2
---------------------------------------------------------------------
(0 rows)
DROP SCHEMA intermediate_results CASCADE; DROP SCHEMA intermediate_results CASCADE;
NOTICE: drop cascades to 5 other objects NOTICE: drop cascades to 4 other objects
DETAIL: drop cascades to table interesting_squares DETAIL: drop cascades to table interesting_squares
drop cascades to function raise_failed_execution_int_result(text)
drop cascades to type square_type drop cascades to type square_type
drop cascades to table stored_squares drop cascades to table stored_squares
drop cascades to table squares drop cascades to table squares

View File

@ -1011,6 +1011,11 @@ LEFT JOIN
WHERE d1.user_id = d2.user_id )) AS bar USING (user_id); WHERE d1.user_id = d2.user_id )) AS bar USING (user_id);
ERROR: cannot pushdown the subquery ERROR: cannot pushdown the subquery
DETAIL: Complex subqueries and CTEs cannot be in the outer part of the outer join DETAIL: Complex subqueries and CTEs cannot be in the outer part of the outer join
-- some test with failures
WITH a AS (SELECT * FROM users_table LIMIT 10)
SELECT user_id/0 FROM users_table JOIN a USING (user_id);
ERROR: division by zero
CONTEXT: while executing command on localhost:xxxxx
RESET citus.enable_cte_inlining; RESET citus.enable_cte_inlining;
DROP VIEW basic_view; DROP VIEW basic_view;
DROP VIEW cte_view; DROP VIEW cte_view;

View File

@ -7,6 +7,7 @@ SET SEARCH_PATH = multi_shard;
SET citus.shard_count TO 4; SET citus.shard_count TO 4;
SET citus.next_shard_id TO 201000; SET citus.next_shard_id TO 201000;
SET citus.shard_replication_factor TO 1; SET citus.shard_replication_factor TO 1;
SET citus.max_adaptive_executor_pool_size TO 1;
-- do not cache any connections -- do not cache any connections
SET citus.max_cached_conns_per_worker TO 0; SET citus.max_cached_conns_per_worker TO 0;

View File

@ -44,34 +44,14 @@ JOIN (SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x int,
ORDER BY x; ORDER BY x;
END; END;
CREATE FUNCTION raise_failed_execution_int_result(query text) RETURNS void AS $$
BEGIN
EXECUTE query;
EXCEPTION WHEN OTHERS THEN
IF SQLERRM LIKE '%does not exist%' THEN
RAISE 'Task failed to execute';
ELSIF SQLERRM LIKE '%could not receive query results%' THEN
RAISE 'Task failed to execute';
END IF;
END;
$$LANGUAGE plpgsql;
-- don't print the worker port
\set VERBOSITY terse
SET client_min_messages TO ERROR;
-- files should now be cleaned up -- files should now be cleaned up
SELECT raise_failed_execution_int_result($$ SET client_min_messages TO DEBUG;
SELECT x, x2 SELECT x, x2
FROM interesting_squares JOIN (SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x text, x2 int)) squares ON (x = interested_in) FROM interesting_squares JOIN (SELECT * FROM read_intermediate_result('squares', 'binary') AS res (x text, x2 int)) squares ON (x = interested_in)
WHERE user_id = 'jon' WHERE user_id = 'jon' OR true
ORDER BY x; ORDER BY x;
$$);
\set VERBOSITY DEFAULT
SET client_min_messages TO DEFAULT;
RESET client_min_messages;
-- try to read the file as text, will fail because of binary encoding -- try to read the file as text, will fail because of binary encoding
BEGIN; BEGIN;
SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s'); SELECT create_intermediate_result('squares', 'SELECT s, s*s FROM generate_series(1,5) s');

View File

@ -695,8 +695,11 @@ LEFT JOIN
FROM distinct_undistribured d2 FROM distinct_undistribured d2
WHERE d1.user_id = d2.user_id )) AS bar USING (user_id); WHERE d1.user_id = d2.user_id )) AS bar USING (user_id);
RESET citus.enable_cte_inlining; -- some test with failures
WITH a AS (SELECT * FROM users_table LIMIT 10)
SELECT user_id/0 FROM users_table JOIN a USING (user_id);
RESET citus.enable_cte_inlining;
DROP VIEW basic_view; DROP VIEW basic_view;
DROP VIEW cte_view; DROP VIEW cte_view;