Compare commits

...

9 Commits
main ... v7.4.1

15 changed files with 160 additions and 136 deletions

View File

@ -1,3 +1,9 @@
### citus v7.4.1 (June 20, 2018) ###
* Fixes a bug that could cause transactions to incorrectly proceed after failure
* Fixes a bug on INSERT ... SELECT queries in prepared statements
### citus v7.4.0 (May 15, 2018) ### ### citus v7.4.0 (May 15, 2018) ###
* Adds support for non-pushdownable subqueries and CTEs in UPDATE/DELETE queries * Adds support for non-pushdownable subqueries and CTEs in UPDATE/DELETE queries

18
configure vendored
View File

@ -1,6 +1,6 @@
#! /bin/sh #! /bin/sh
# Guess values for system-dependent variables and create Makefiles. # Guess values for system-dependent variables and create Makefiles.
# Generated by GNU Autoconf 2.69 for Citus 7.4devel. # Generated by GNU Autoconf 2.69 for Citus 7.4.1.
# #
# #
# Copyright (C) 1992-1996, 1998-2012 Free Software Foundation, Inc. # Copyright (C) 1992-1996, 1998-2012 Free Software Foundation, Inc.
@ -579,8 +579,8 @@ MAKEFLAGS=
# Identity of this package. # Identity of this package.
PACKAGE_NAME='Citus' PACKAGE_NAME='Citus'
PACKAGE_TARNAME='citus' PACKAGE_TARNAME='citus'
PACKAGE_VERSION='7.4devel' PACKAGE_VERSION='7.4.1'
PACKAGE_STRING='Citus 7.4devel' PACKAGE_STRING='Citus 7.4.1'
PACKAGE_BUGREPORT='' PACKAGE_BUGREPORT=''
PACKAGE_URL='' PACKAGE_URL=''
@ -1239,7 +1239,7 @@ if test "$ac_init_help" = "long"; then
# Omit some internal or obsolete options to make the list less imposing. # Omit some internal or obsolete options to make the list less imposing.
# This message is too long to be a string in the A/UX 3.1 sh. # This message is too long to be a string in the A/UX 3.1 sh.
cat <<_ACEOF cat <<_ACEOF
\`configure' configures Citus 7.4devel to adapt to many kinds of systems. \`configure' configures Citus 7.4.1 to adapt to many kinds of systems.
Usage: $0 [OPTION]... [VAR=VALUE]... Usage: $0 [OPTION]... [VAR=VALUE]...
@ -1300,7 +1300,7 @@ fi
if test -n "$ac_init_help"; then if test -n "$ac_init_help"; then
case $ac_init_help in case $ac_init_help in
short | recursive ) echo "Configuration of Citus 7.4devel:";; short | recursive ) echo "Configuration of Citus 7.4.1:";;
esac esac
cat <<\_ACEOF cat <<\_ACEOF
@ -1400,7 +1400,7 @@ fi
test -n "$ac_init_help" && exit $ac_status test -n "$ac_init_help" && exit $ac_status
if $ac_init_version; then if $ac_init_version; then
cat <<\_ACEOF cat <<\_ACEOF
Citus configure 7.4devel Citus configure 7.4.1
generated by GNU Autoconf 2.69 generated by GNU Autoconf 2.69
Copyright (C) 2012 Free Software Foundation, Inc. Copyright (C) 2012 Free Software Foundation, Inc.
@ -1883,7 +1883,7 @@ cat >config.log <<_ACEOF
This file contains any messages produced by compilers while This file contains any messages produced by compilers while
running configure, to aid debugging if configure makes a mistake. running configure, to aid debugging if configure makes a mistake.
It was created by Citus $as_me 7.4devel, which was It was created by Citus $as_me 7.4.1, which was
generated by GNU Autoconf 2.69. Invocation command line was generated by GNU Autoconf 2.69. Invocation command line was
$ $0 $@ $ $0 $@
@ -4701,7 +4701,7 @@ cat >>$CONFIG_STATUS <<\_ACEOF || ac_write_fail=1
# report actual input values of CONFIG_FILES etc. instead of their # report actual input values of CONFIG_FILES etc. instead of their
# values after options handling. # values after options handling.
ac_log=" ac_log="
This file was extended by Citus $as_me 7.4devel, which was This file was extended by Citus $as_me 7.4.1, which was
generated by GNU Autoconf 2.69. Invocation command line was generated by GNU Autoconf 2.69. Invocation command line was
CONFIG_FILES = $CONFIG_FILES CONFIG_FILES = $CONFIG_FILES
@ -4763,7 +4763,7 @@ _ACEOF
cat >>$CONFIG_STATUS <<_ACEOF || ac_write_fail=1 cat >>$CONFIG_STATUS <<_ACEOF || ac_write_fail=1
ac_cs_config="`$as_echo "$ac_configure_args" | sed 's/^ //; s/[\\""\`\$]/\\\\&/g'`" ac_cs_config="`$as_echo "$ac_configure_args" | sed 's/^ //; s/[\\""\`\$]/\\\\&/g'`"
ac_cs_version="\\ ac_cs_version="\\
Citus config.status 7.4devel Citus config.status 7.4.1
configured by $0, generated by GNU Autoconf 2.69, configured by $0, generated by GNU Autoconf 2.69,
with options \\"\$ac_cs_config\\" with options \\"\$ac_cs_config\\"

View File

@ -5,7 +5,7 @@
# everyone needing autoconf installed, the resulting files are checked # everyone needing autoconf installed, the resulting files are checked
# into the SCM. # into the SCM.
AC_INIT([Citus], [7.4devel]) AC_INIT([Citus], [7.4.1])
AC_COPYRIGHT([Copyright (c) 2012-2017, Citus Data, Inc.]) AC_COPYRIGHT([Copyright (c) 2012-2017, Citus Data, Inc.])
# we'll need sed and awk for some of the version commands # we'll need sed and awk for some of the version commands

View File

@ -101,6 +101,7 @@ ExecuteSelectIntoRelation(Oid targetRelationId, List *insertTargetList,
int partitionColumnIndex = -1; int partitionColumnIndex = -1;
CitusCopyDestReceiver *copyDest = NULL; CitusCopyDestReceiver *copyDest = NULL;
Query *queryCopy = NULL;
partitionMethod = PartitionMethod(targetRelationId); partitionMethod = PartitionMethod(targetRelationId);
if (partitionMethod == DISTRIBUTE_BY_NONE) if (partitionMethod == DISTRIBUTE_BY_NONE)
@ -135,7 +136,14 @@ ExecuteSelectIntoRelation(Oid targetRelationId, List *insertTargetList,
partitionColumnIndex, executorState, partitionColumnIndex, executorState,
stopOnFailure); stopOnFailure);
ExecuteQueryIntoDestReceiver(selectQuery, paramListInfo, (DestReceiver *) copyDest); /*
* Make a copy of the query, since ExecuteQueryIntoDestReceiver may scribble on it
* and we want it to be replanned every time if it is stored in a prepared
* statement.
*/
queryCopy = copyObject(selectQuery);
ExecuteQueryIntoDestReceiver(queryCopy, paramListInfo, (DestReceiver *) copyDest);
executorState->es_processed = copyDest->tuplesSent; executorState->es_processed = copyDest->tuplesSent;

View File

@ -1297,18 +1297,18 @@ SendQueryInSingleRowMode(MultiConnection *connection, char *query,
if (querySent == 0) if (querySent == 0)
{ {
const bool raiseErrors = false; const bool raiseIfTransactionIsCritical = true;
HandleRemoteTransactionConnectionError(connection, raiseErrors); HandleRemoteTransactionConnectionError(connection, raiseIfTransactionIsCritical);
return false; return false;
} }
singleRowMode = PQsetSingleRowMode(connection->pgConn); singleRowMode = PQsetSingleRowMode(connection->pgConn);
if (singleRowMode == 0) if (singleRowMode == 0)
{ {
const bool raiseErrors = false; const bool raiseIfTransactionIsCritical = true;
HandleRemoteTransactionConnectionError(connection, raiseErrors); HandleRemoteTransactionConnectionError(connection, raiseIfTransactionIsCritical);
return false; return false;
} }
@ -1450,6 +1450,10 @@ StoreQueryResult(CitusScanState *scanState, MultiConnection *connection,
int category = 0; int category = 0;
bool isConstraintViolation = false; bool isConstraintViolation = false;
/*
* Mark transaction as failed, but don't throw an error. This allows us
* to give a more meaningful error message below.
*/
MarkRemoteTransactionFailed(connection, false); MarkRemoteTransactionFailed(connection, false);
/* /*
@ -1460,7 +1464,8 @@ StoreQueryResult(CitusScanState *scanState, MultiConnection *connection,
category = ERRCODE_TO_CATEGORY(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION); category = ERRCODE_TO_CATEGORY(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION);
isConstraintViolation = SqlStateMatchesCategory(sqlStateString, category); isConstraintViolation = SqlStateMatchesCategory(sqlStateString, category);
if (isConstraintViolation || failOnError) if (isConstraintViolation || failOnError ||
IsRemoteTransactionCritical(connection))
{ {
ReportResultError(connection, result, ERROR); ReportResultError(connection, result, ERROR);
} }
@ -1575,6 +1580,11 @@ ConsumeQueryResult(MultiConnection *connection, bool failOnError, int64 *rows)
int category = 0; int category = 0;
bool isConstraintViolation = false; bool isConstraintViolation = false;
/*
* Mark transaction as failed, but don't throw an error even if the
* transaction is critical. This allows us to give a more meaningful
* error message below.
*/
MarkRemoteTransactionFailed(connection, false); MarkRemoteTransactionFailed(connection, false);
/* /*
@ -1585,7 +1595,8 @@ ConsumeQueryResult(MultiConnection *connection, bool failOnError, int64 *rows)
category = ERRCODE_TO_CATEGORY(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION); category = ERRCODE_TO_CATEGORY(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION);
isConstraintViolation = SqlStateMatchesCategory(sqlStateString, category); isConstraintViolation = SqlStateMatchesCategory(sqlStateString, category);
if (isConstraintViolation || failOnError) if (isConstraintViolation || failOnError ||
IsRemoteTransactionCritical(connection))
{ {
ReportResultError(connection, result, ERROR); ReportResultError(connection, result, ERROR);
} }

View File

@ -43,7 +43,6 @@ static void StartRemoteTransactionSavepointRollback(MultiConnection *connection,
static void FinishRemoteTransactionSavepointRollback(MultiConnection *connection, static void FinishRemoteTransactionSavepointRollback(MultiConnection *connection,
SubTransactionId subId); SubTransactionId subId);
static void CheckTransactionHealth(void);
static void Assign2PCIdentifier(MultiConnection *connection); static void Assign2PCIdentifier(MultiConnection *connection);
static void WarnAboutLeakedPreparedTransaction(MultiConnection *connection, bool commit); static void WarnAboutLeakedPreparedTransaction(MultiConnection *connection, bool commit);
@ -721,6 +720,19 @@ MarkRemoteTransactionCritical(struct MultiConnection *connection)
} }
/*
* IsRemoteTransactionCritical returns whether the remote transaction on
* the given connection has been marked as critical.
*/
bool
IsRemoteTransactionCritical(struct MultiConnection *connection)
{
RemoteTransaction *transaction = &connection->remoteTransaction;
return transaction->transactionCritical;
}
/* /*
* CloseRemoteTransaction handles closing a connection that, potentially, is * CloseRemoteTransaction handles closing a connection that, potentially, is
* part of a coordinated transaction. This should only ever be called from * part of a coordinated transaction. This should only ever be called from
@ -824,12 +836,6 @@ CoordinatedRemoteTransactionsCommit(void)
List *connectionList = NIL; List *connectionList = NIL;
bool raiseInterrupts = false; bool raiseInterrupts = false;
/*
* Before starting to commit on any of the nodes - after which we can't
* completely roll-back anymore - check that things are in a good state.
*/
CheckTransactionHealth();
/* /*
* Issue appropriate transaction commands to remote nodes. If everything * Issue appropriate transaction commands to remote nodes. If everything
* went well that's going to be COMMIT or COMMIT PREPARED, if individual * went well that's going to be COMMIT or COMMIT PREPARED, if individual
@ -1216,13 +1222,13 @@ FinishRemoteTransactionSavepointRollback(MultiConnection *connection, SubTransac
/* /*
* CheckTransactionHealth checks if any of the participating transactions in a * CheckRemoteTransactionsHealth checks if any of the participating transactions in a
* coordinated transaction failed, and what consequence that should have. * coordinated transaction failed, and what consequence that should have.
* This needs to be called before the coordinated transaction commits (but * This needs to be called before the coordinated transaction commits (but
* after they've been PREPAREd if 2PC is in use). * after they've been PREPAREd if 2PC is in use).
*/ */
static void void
CheckTransactionHealth(void) CheckRemoteTransactionsHealth(void)
{ {
dlist_iter iter; dlist_iter iter;

View File

@ -283,9 +283,17 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
{ {
CoordinatedRemoteTransactionsPrepare(); CoordinatedRemoteTransactionsPrepare();
CurrentCoordinatedTransactionState = COORD_TRANS_PREPARED; CurrentCoordinatedTransactionState = COORD_TRANS_PREPARED;
/*
* Make sure we did not have any failures on connections marked as
* critical before committing.
*/
CheckRemoteTransactionsHealth();
} }
else else
{ {
CheckRemoteTransactionsHealth();
/* /*
* Have to commit remote transactions in PRE_COMMIT, to allow * Have to commit remote transactions in PRE_COMMIT, to allow
* us to mark failed placements as invalid. Better don't use * us to mark failed placements as invalid. Better don't use

View File

@ -115,6 +115,7 @@ extern void HandleRemoteTransactionResultError(struct MultiConnection *connectio
extern void MarkRemoteTransactionFailed(struct MultiConnection *connection, extern void MarkRemoteTransactionFailed(struct MultiConnection *connection,
bool allowErrorPromotion); bool allowErrorPromotion);
extern void MarkRemoteTransactionCritical(struct MultiConnection *connection); extern void MarkRemoteTransactionCritical(struct MultiConnection *connection);
extern bool IsRemoteTransactionCritical(struct MultiConnection *connection);
/* /*
@ -129,6 +130,7 @@ extern void ResetRemoteTransaction(struct MultiConnection *connection);
extern void CoordinatedRemoteTransactionsPrepare(void); extern void CoordinatedRemoteTransactionsPrepare(void);
extern void CoordinatedRemoteTransactionsCommit(void); extern void CoordinatedRemoteTransactionsCommit(void);
extern void CoordinatedRemoteTransactionsAbort(void); extern void CoordinatedRemoteTransactionsAbort(void);
extern void CheckRemoteTransactionsHealth(void);
/* remote savepoint commands */ /* remote savepoint commands */
extern void CoordinatedRemoteTransactionsSavepointBegin(SubTransactionId subId); extern void CoordinatedRemoteTransactionsSavepointBegin(SubTransactionId subId);

View File

@ -140,7 +140,7 @@ ALTER EXTENSION citus UPDATE TO '7.4-3';
SHOW citus.version; SHOW citus.version;
citus.version citus.version
--------------- ---------------
7.4devel 7.4.1
(1 row) (1 row)
-- ensure no objects were created outside pg_catalog -- ensure no objects were created outside pg_catalog

View File

@ -2715,6 +2715,19 @@ SELECT * FROM coerce_agg;
2 | {2,2,2} 2 | {2,2,2}
(2 rows) (2 rows)
-- INSERT..SELECT + prepared statements + recursive planning
BEGIN;
PREPARE prepared_recursive_insert_select AS
INSERT INTO users_table
SELECT * FROM users_table
WHERE value_1 IN (SELECT value_2 FROM events_table OFFSET 0);
EXECUTE prepared_recursive_insert_select;
EXECUTE prepared_recursive_insert_select;
EXECUTE prepared_recursive_insert_select;
EXECUTE prepared_recursive_insert_select;
EXECUTE prepared_recursive_insert_select;
EXECUTE prepared_recursive_insert_select;
ROLLBACK;
-- wrap in a transaction to improve performance -- wrap in a transaction to improve performance
BEGIN; BEGIN;
DROP TABLE coerce_events; DROP TABLE coerce_events;

View File

@ -72,9 +72,8 @@ DEBUG: generating subplan 10_1 for subquery SELECT x FROM recursive_set_local.t
DEBUG: Plan 10 query after replacing subqueries and CTEs: SELECT intermediate_result.x FROM read_intermediate_result('10_1'::text, 'binary'::citus_copy_format) intermediate_result(x integer) INTERSECT SELECT (i.i OPERATOR(pg_catalog./) 0) FROM generate_series(0, 100) i(i) ORDER BY 1 DESC DEBUG: Plan 10 query after replacing subqueries and CTEs: SELECT intermediate_result.x FROM read_intermediate_result('10_1'::text, 'binary'::citus_copy_format) intermediate_result(x integer) INTERSECT SELECT (i.i OPERATOR(pg_catalog./) 0) FROM generate_series(0, 100) i(i) ORDER BY 1 DESC
DEBUG: Creating router plan DEBUG: Creating router plan
DEBUG: Plan is router executable DEBUG: Plan is router executable
WARNING: division by zero ERROR: division by zero
CONTEXT: while executing command on localhost:57637 CONTEXT: while executing command on localhost:57637
ERROR: could not receive query results
-- we should be able to run set operations with generate series and local tables as well -- we should be able to run set operations with generate series and local tables as well
((SELECT x FROM local_test) UNION ALL (SELECT x FROM test)) INTERSECT (SELECT i FROM generate_series(0, 100) i) ORDER BY 1 DESC; ((SELECT x FROM local_test) UNION ALL (SELECT x FROM test)) INTERSECT (SELECT i FROM generate_series(0, 100) i) ORDER BY 1 DESC;
DEBUG: generating subplan 12_1 for subquery SELECT x FROM recursive_set_local.local_test DEBUG: generating subplan 12_1 for subquery SELECT x FROM recursive_set_local.local_test

View File

@ -500,7 +500,7 @@ ORDER BY
1 | 3.2857142857142857 | 1.00000000000000000000 1 | 3.2857142857142857 | 1.00000000000000000000
(6 rows) (6 rows)
EXPLAIN EXPLAIN (COSTS FALSE)
SELECT SELECT
user_id, user_id,
AVG(avg(value_1)) OVER (PARTITION BY user_id, max(user_id), MIN(value_2)), AVG(avg(value_1)) OVER (PARTITION BY user_id, max(user_id), MIN(value_2)),
@ -511,26 +511,26 @@ GROUP BY
1 1
ORDER BY ORDER BY
3 DESC, 2 DESC, 1 DESC; 3 DESC, 2 DESC, 1 DESC;
QUERY PLAN QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------- -----------------------------------------------------------------------------------------------------------------------------------
Sort (cost=0.00..0.00 rows=0 width=0) Sort
Sort Key: remote_scan.avg_1 DESC, remote_scan.avg DESC, remote_scan.user_id DESC Sort Key: remote_scan.avg_1 DESC, remote_scan.avg DESC, remote_scan.user_id DESC
-> HashAggregate (cost=0.00..0.00 rows=0 width=0) -> HashAggregate
Group Key: remote_scan.user_id Group Key: remote_scan.user_id
-> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) -> Custom Scan (Citus Real-Time)
Task Count: 4 Task Count: 4
Tasks Shown: One of 4 Tasks Shown: One of 4
-> Task -> Task
Node: host=localhost port=57637 dbname=regression Node: host=localhost port=57637 dbname=regression
-> WindowAgg (cost=5.06..5.80 rows=33 width=128) -> WindowAgg
-> Sort (cost=5.06..5.14 rows=33 width=128) -> Sort
Sort Key: users_table.user_id, (min(users_table.user_id)), (avg(users_table.value_1)) Sort Key: users_table.user_id, (min(users_table.user_id)), (avg(users_table.value_1))
-> WindowAgg (cost=3.48..4.22 rows=33 width=128) -> WindowAgg
-> Sort (cost=3.48..3.56 rows=33 width=96) -> Sort
Sort Key: users_table.user_id, (max(users_table.user_id)), (min(users_table.value_2)) Sort Key: users_table.user_id, (max(users_table.user_id)), (min(users_table.value_2))
-> HashAggregate (cost=2.07..2.65 rows=33 width=96) -> HashAggregate
Group Key: users_table.user_id Group Key: users_table.user_id
-> Seq Scan on users_table_1400256 users_table (cost=0.00..1.33 rows=33 width=12) -> Seq Scan on users_table_1400256 users_table
(18 rows) (18 rows)
SELECT SELECT
@ -579,36 +579,6 @@ ORDER BY
6 | 5 | 3.0000000000000000 6 | 5 | 3.0000000000000000
(32 rows) (32 rows)
EXPLAIN
SELECT
user_id,
1 + sum(value_1),
1 + AVG(value_2) OVER (partition by user_id)
FROM
users_table
GROUP BY
user_id, value_2
ORDER BY
user_id, value_2;
QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------
Sort (cost=0.00..0.00 rows=0 width=0)
Sort Key: remote_scan.user_id, remote_scan.worker_column_4
-> HashAggregate (cost=0.00..0.00 rows=0 width=0)
Group Key: remote_scan.user_id, remote_scan.worker_column_4
-> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0)
Task Count: 4
Tasks Shown: One of 4
-> Task
Node: host=localhost port=57637 dbname=regression
-> WindowAgg (cost=2.16..3.40 rows=33 width=48)
-> GroupAggregate (cost=2.16..2.82 rows=33 width=16)
Group Key: users_table.user_id, users_table.value_2
-> Sort (cost=2.16..2.24 rows=33 width=12)
Sort Key: users_table.user_id, users_table.value_2
-> Seq Scan on users_table_1400256 users_table (cost=0.00..1.33 rows=33 width=12)
(15 rows)
SELECT SELECT
user_id, user_id,
1 + sum(value_1), 1 + sum(value_1),
@ -719,7 +689,7 @@ ORDER BY user_id, avg(value_1) DESC;
6 | 1.00000000000000000000 | 5 6 | 1.00000000000000000000 | 5
(32 rows) (32 rows)
EXPLAIN EXPLAIN (COSTS FALSE)
SELECT SELECT
user_id, user_id,
avg(value_1), avg(value_1),
@ -730,21 +700,21 @@ GROUP BY user_id, value_2
ORDER BY user_id, avg(value_1) DESC; ORDER BY user_id, avg(value_1) DESC;
QUERY PLAN QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------------------------------------------- --------------------------------------------------------------------------------------------------------------------------------------------------------------------
Sort (cost=0.00..0.00 rows=0 width=0) Sort
Sort Key: remote_scan.user_id, (pg_catalog.sum(((pg_catalog.sum(remote_scan.avg) / pg_catalog.sum(remote_scan.avg_1)))) / pg_catalog.sum(remote_scan.rank)) DESC Sort Key: remote_scan.user_id, (pg_catalog.sum(((pg_catalog.sum(remote_scan.avg) / pg_catalog.sum(remote_scan.avg_1)))) / pg_catalog.sum(remote_scan.rank)) DESC
-> HashAggregate (cost=0.00..0.00 rows=0 width=0) -> HashAggregate
Group Key: remote_scan.user_id, remote_scan.worker_column_5 Group Key: remote_scan.user_id, remote_scan.worker_column_5
-> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) -> Custom Scan (Citus Real-Time)
Task Count: 4 Task Count: 4
Tasks Shown: One of 4 Tasks Shown: One of 4
-> Task -> Task
Node: host=localhost port=57637 dbname=regression Node: host=localhost port=57637 dbname=regression
-> WindowAgg (cost=3.32..4.14 rows=33 width=80) -> WindowAgg
-> Sort (cost=3.32..3.40 rows=33 width=56) -> Sort
Sort Key: users_table.user_id, (('1'::numeric / ('1'::numeric + avg(users_table.value_1)))) Sort Key: users_table.user_id, (('1'::numeric / ('1'::numeric + avg(users_table.value_1))))
-> HashAggregate (cost=1.91..2.49 rows=33 width=56) -> HashAggregate
Group Key: users_table.user_id, users_table.value_2 Group Key: users_table.user_id, users_table.value_2
-> Seq Scan on users_table_1400256 users_table (cost=0.00..1.33 rows=33 width=12) -> Seq Scan on users_table_1400256 users_table
(15 rows) (15 rows)
-- order by in the window function is same as avg(value_1) DESC -- order by in the window function is same as avg(value_1) DESC
@ -793,7 +763,7 @@ ORDER BY user_id, avg(value_1) DESC;
(32 rows) (32 rows)
-- limit is not pushed down to worker !! -- limit is not pushed down to worker !!
EXPLAIN EXPLAIN (COSTS FALSE)
SELECT SELECT
user_id, user_id,
avg(value_1), avg(value_1),
@ -805,25 +775,25 @@ ORDER BY user_id, avg(value_1) DESC
LIMIT 5; LIMIT 5;
QUERY PLAN QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------- --------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Limit (cost=0.00..0.00 rows=0 width=0) Limit
-> Sort (cost=0.00..0.00 rows=0 width=0) -> Sort
Sort Key: remote_scan.user_id, (pg_catalog.sum(((pg_catalog.sum(remote_scan.avg) / pg_catalog.sum(remote_scan.avg_1)))) / pg_catalog.sum(remote_scan.rank)) DESC Sort Key: remote_scan.user_id, (pg_catalog.sum(((pg_catalog.sum(remote_scan.avg) / pg_catalog.sum(remote_scan.avg_1)))) / pg_catalog.sum(remote_scan.rank)) DESC
-> HashAggregate (cost=0.00..0.00 rows=0 width=0) -> HashAggregate
Group Key: remote_scan.user_id, remote_scan.worker_column_5 Group Key: remote_scan.user_id, remote_scan.worker_column_5
-> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) -> Custom Scan (Citus Real-Time)
Task Count: 4 Task Count: 4
Tasks Shown: One of 4 Tasks Shown: One of 4
-> Task -> Task
Node: host=localhost port=57637 dbname=regression Node: host=localhost port=57637 dbname=regression
-> WindowAgg (cost=3.32..4.14 rows=33 width=80) -> WindowAgg
-> Sort (cost=3.32..3.40 rows=33 width=56) -> Sort
Sort Key: users_table.user_id, (('1'::numeric / ('1'::numeric + avg(users_table.value_1)))) Sort Key: users_table.user_id, (('1'::numeric / ('1'::numeric + avg(users_table.value_1))))
-> HashAggregate (cost=1.91..2.49 rows=33 width=56) -> HashAggregate
Group Key: users_table.user_id, users_table.value_2 Group Key: users_table.user_id, users_table.value_2
-> Seq Scan on users_table_1400256 users_table (cost=0.00..1.33 rows=33 width=12) -> Seq Scan on users_table_1400256 users_table
(16 rows) (16 rows)
EXPLAIN EXPLAIN (COSTS FALSE)
SELECT SELECT
user_id, user_id,
avg(value_1), avg(value_1),
@ -835,25 +805,25 @@ ORDER BY user_id, avg(value_1) DESC
LIMIT 5; LIMIT 5;
QUERY PLAN QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------- --------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Limit (cost=0.00..0.00 rows=0 width=0) Limit
-> Sort (cost=0.00..0.00 rows=0 width=0) -> Sort
Sort Key: remote_scan.user_id, (pg_catalog.sum(((pg_catalog.sum(remote_scan.avg) / pg_catalog.sum(remote_scan.avg_1)))) / pg_catalog.sum(remote_scan.rank)) DESC Sort Key: remote_scan.user_id, (pg_catalog.sum(((pg_catalog.sum(remote_scan.avg) / pg_catalog.sum(remote_scan.avg_1)))) / pg_catalog.sum(remote_scan.rank)) DESC
-> HashAggregate (cost=0.00..0.00 rows=0 width=0) -> HashAggregate
Group Key: remote_scan.user_id, remote_scan.worker_column_5 Group Key: remote_scan.user_id, remote_scan.worker_column_5
-> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) -> Custom Scan (Citus Real-Time)
Task Count: 4 Task Count: 4
Tasks Shown: One of 4 Tasks Shown: One of 4
-> Task -> Task
Node: host=localhost port=57637 dbname=regression Node: host=localhost port=57637 dbname=regression
-> WindowAgg (cost=3.32..4.14 rows=33 width=80) -> WindowAgg
-> Sort (cost=3.32..3.40 rows=33 width=56) -> Sort
Sort Key: users_table.user_id, (('1'::numeric / ('1'::numeric + avg(users_table.value_1)))) Sort Key: users_table.user_id, (('1'::numeric / ('1'::numeric + avg(users_table.value_1))))
-> HashAggregate (cost=1.91..2.49 rows=33 width=56) -> HashAggregate
Group Key: users_table.user_id, users_table.value_2 Group Key: users_table.user_id, users_table.value_2
-> Seq Scan on users_table_1400256 users_table (cost=0.00..1.33 rows=33 width=12) -> Seq Scan on users_table_1400256 users_table
(16 rows) (16 rows)
EXPLAIN EXPLAIN (COSTS FALSE)
SELECT SELECT
user_id, user_id,
avg(value_1), avg(value_1),
@ -865,25 +835,25 @@ ORDER BY user_id, avg(value_1) DESC
LIMIT 5; LIMIT 5;
QUERY PLAN QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------- --------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Limit (cost=0.00..0.00 rows=0 width=0) Limit
-> Sort (cost=0.00..0.00 rows=0 width=0) -> Sort
Sort Key: remote_scan.user_id, (pg_catalog.sum(((pg_catalog.sum(remote_scan.avg) / pg_catalog.sum(remote_scan.avg_1)))) / pg_catalog.sum(remote_scan.rank)) DESC Sort Key: remote_scan.user_id, (pg_catalog.sum(((pg_catalog.sum(remote_scan.avg) / pg_catalog.sum(remote_scan.avg_1)))) / pg_catalog.sum(remote_scan.rank)) DESC
-> HashAggregate (cost=0.00..0.00 rows=0 width=0) -> HashAggregate
Group Key: remote_scan.user_id, remote_scan.worker_column_5 Group Key: remote_scan.user_id, remote_scan.worker_column_5
-> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) -> Custom Scan (Citus Real-Time)
Task Count: 4 Task Count: 4
Tasks Shown: One of 4 Tasks Shown: One of 4
-> Task -> Task
Node: host=localhost port=57637 dbname=regression Node: host=localhost port=57637 dbname=regression
-> WindowAgg (cost=3.15..3.98 rows=33 width=48) -> WindowAgg
-> Sort (cost=3.15..3.23 rows=33 width=40) -> Sort
Sort Key: users_table.user_id, ((1 / (1 + sum(users_table.value_2)))) Sort Key: users_table.user_id, ((1 / (1 + sum(users_table.value_2))))
-> HashAggregate (cost=1.83..2.32 rows=33 width=40) -> HashAggregate
Group Key: users_table.user_id, users_table.value_2 Group Key: users_table.user_id, users_table.value_2
-> Seq Scan on users_table_1400256 users_table (cost=0.00..1.33 rows=33 width=12) -> Seq Scan on users_table_1400256 users_table
(16 rows) (16 rows)
EXPLAIN EXPLAIN (COSTS FALSE)
SELECT SELECT
user_id, user_id,
avg(value_1), avg(value_1),
@ -895,21 +865,21 @@ ORDER BY user_id, avg(value_1) DESC
LIMIT 5; LIMIT 5;
QUERY PLAN QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------- --------------------------------------------------------------------------------------------------------------------------------------------------------------------------
Limit (cost=0.00..0.00 rows=0 width=0) Limit
-> Sort (cost=0.00..0.00 rows=0 width=0) -> Sort
Sort Key: remote_scan.user_id, (pg_catalog.sum(((pg_catalog.sum(remote_scan.avg) / pg_catalog.sum(remote_scan.avg_1)))) / pg_catalog.sum(remote_scan.rank)) DESC Sort Key: remote_scan.user_id, (pg_catalog.sum(((pg_catalog.sum(remote_scan.avg) / pg_catalog.sum(remote_scan.avg_1)))) / pg_catalog.sum(remote_scan.rank)) DESC
-> HashAggregate (cost=0.00..0.00 rows=0 width=0) -> HashAggregate
Group Key: remote_scan.user_id, remote_scan.worker_column_5 Group Key: remote_scan.user_id, remote_scan.worker_column_5
-> Custom Scan (Citus Real-Time) (cost=0.00..0.00 rows=0 width=0) -> Custom Scan (Citus Real-Time)
Task Count: 4 Task Count: 4
Tasks Shown: One of 4 Tasks Shown: One of 4
-> Task -> Task
Node: host=localhost port=57637 dbname=regression Node: host=localhost port=57637 dbname=regression
-> WindowAgg (cost=2.90..3.56 rows=33 width=40) -> WindowAgg
-> Sort (cost=2.90..2.99 rows=33 width=32) -> Sort
Sort Key: users_table.user_id, (sum(users_table.value_2)) Sort Key: users_table.user_id, (sum(users_table.value_2))
-> HashAggregate (cost=1.74..2.07 rows=33 width=32) -> HashAggregate
Group Key: users_table.user_id, users_table.value_2 Group Key: users_table.user_id, users_table.value_2
-> Seq Scan on users_table_1400256 users_table (cost=0.00..1.33 rows=33 width=12) -> Seq Scan on users_table_1400256 users_table
(16 rows) (16 rows)

View File

@ -106,9 +106,8 @@ WITH cte AS (
SELECT user_id FROM users_table WHERE value_2 IN (1, 2) SELECT user_id FROM users_table WHERE value_2 IN (1, 2)
) )
SELECT (SELECT * FROM cte); SELECT (SELECT * FROM cte);
WARNING: more than one row returned by a subquery used as an expression ERROR: more than one row returned by a subquery used as an expression
CONTEXT: while executing command on localhost:57637 CONTEXT: while executing command on localhost:57637
ERROR: could not receive query results
WITH cte_basic AS ( WITH cte_basic AS (
SELECT user_id FROM users_table WHERE user_id = 1 SELECT user_id FROM users_table WHERE user_id = 1
) )

View File

@ -2082,6 +2082,20 @@ LIMIT 5;
SELECT * FROM coerce_agg; SELECT * FROM coerce_agg;
-- INSERT..SELECT + prepared statements + recursive planning
BEGIN;
PREPARE prepared_recursive_insert_select AS
INSERT INTO users_table
SELECT * FROM users_table
WHERE value_1 IN (SELECT value_2 FROM events_table OFFSET 0);
EXECUTE prepared_recursive_insert_select;
EXECUTE prepared_recursive_insert_select;
EXECUTE prepared_recursive_insert_select;
EXECUTE prepared_recursive_insert_select;
EXECUTE prepared_recursive_insert_select;
EXECUTE prepared_recursive_insert_select;
ROLLBACK;
-- wrap in a transaction to improve performance -- wrap in a transaction to improve performance
BEGIN; BEGIN;
DROP TABLE coerce_events; DROP TABLE coerce_events;

View File

@ -288,7 +288,7 @@ GROUP BY
ORDER BY ORDER BY
3 DESC, 2 DESC, 1 DESC; 3 DESC, 2 DESC, 1 DESC;
EXPLAIN EXPLAIN (COSTS FALSE)
SELECT SELECT
user_id, user_id,
AVG(avg(value_1)) OVER (PARTITION BY user_id, max(user_id), MIN(value_2)), AVG(avg(value_1)) OVER (PARTITION BY user_id, max(user_id), MIN(value_2)),
@ -311,18 +311,6 @@ GROUP BY
ORDER BY ORDER BY
user_id, value_2; user_id, value_2;
EXPLAIN
SELECT
user_id,
1 + sum(value_1),
1 + AVG(value_2) OVER (partition by user_id)
FROM
users_table
GROUP BY
user_id, value_2
ORDER BY
user_id, value_2;
SELECT SELECT
user_id, user_id,
1 + sum(value_1), 1 + sum(value_1),
@ -355,7 +343,7 @@ FROM
GROUP BY user_id, value_2 GROUP BY user_id, value_2
ORDER BY user_id, avg(value_1) DESC; ORDER BY user_id, avg(value_1) DESC;
EXPLAIN EXPLAIN (COSTS FALSE)
SELECT SELECT
user_id, user_id,
avg(value_1), avg(value_1),
@ -376,7 +364,7 @@ GROUP BY user_id, value_2
ORDER BY user_id, avg(value_1) DESC; ORDER BY user_id, avg(value_1) DESC;
-- limit is not pushed down to worker !! -- limit is not pushed down to worker !!
EXPLAIN EXPLAIN (COSTS FALSE)
SELECT SELECT
user_id, user_id,
avg(value_1), avg(value_1),
@ -387,7 +375,7 @@ GROUP BY user_id, value_2
ORDER BY user_id, avg(value_1) DESC ORDER BY user_id, avg(value_1) DESC
LIMIT 5; LIMIT 5;
EXPLAIN EXPLAIN (COSTS FALSE)
SELECT SELECT
user_id, user_id,
avg(value_1), avg(value_1),
@ -398,7 +386,7 @@ GROUP BY user_id, value_2
ORDER BY user_id, avg(value_1) DESC ORDER BY user_id, avg(value_1) DESC
LIMIT 5; LIMIT 5;
EXPLAIN EXPLAIN (COSTS FALSE)
SELECT SELECT
user_id, user_id,
avg(value_1), avg(value_1),
@ -409,7 +397,7 @@ GROUP BY user_id, value_2
ORDER BY user_id, avg(value_1) DESC ORDER BY user_id, avg(value_1) DESC
LIMIT 5; LIMIT 5;
EXPLAIN EXPLAIN (COSTS FALSE)
SELECT SELECT
user_id, user_id,
avg(value_1), avg(value_1),