mirror of https://github.com/citusdata/citus.git
Compare commits
8 Commits
Author | SHA1 | Date |
---|---|---|
|
4a1255fd10 | |
|
67004edf43 | |
|
789d441296 | |
|
6d06e9760a | |
|
74f0dd0c25 | |
|
e777daad22 | |
|
4e373fadd8 | |
|
35703d5e61 |
104
CHANGELOG.md
104
CHANGELOG.md
|
@ -1,3 +1,107 @@
|
||||||
|
### citus v9.5.1 (December 1, 2020) ###
|
||||||
|
|
||||||
|
* Enables PostgreSQL's parallel queries on EXPLAIN ANALYZE
|
||||||
|
|
||||||
|
* Fixes a bug that could cause excessive memory consumption when a partition is
|
||||||
|
created
|
||||||
|
|
||||||
|
* Fixes a bug that triggers subplan executions unnecessarily with cursors
|
||||||
|
|
||||||
|
* Fixes a segfault in connection management due to invalid connection hash
|
||||||
|
entries
|
||||||
|
|
||||||
|
### citus v9.5.0 (November 10, 2020) ###
|
||||||
|
|
||||||
|
* Adds support for PostgreSQL 13
|
||||||
|
|
||||||
|
* Removes the task-tracker executor
|
||||||
|
|
||||||
|
* Introduces citus local tables
|
||||||
|
|
||||||
|
* Introduces `undistribute_table` UDF to convert tables back to postgres tables
|
||||||
|
|
||||||
|
* Adds support for `EXPLAIN (ANALYZE) EXECUTE` and `EXPLAIN EXECUTE`
|
||||||
|
|
||||||
|
* Adds support for `EXPLAIN (ANALYZE, WAL)` for PG13
|
||||||
|
|
||||||
|
* Sorts the output of `EXPLAIN (ANALYZE)` by execution duration.
|
||||||
|
|
||||||
|
* Adds support for CREATE TABLE ... USING table_access_method
|
||||||
|
|
||||||
|
* Adds support for `WITH TIES` option in SELECT and INSERT SELECT queries
|
||||||
|
|
||||||
|
* Avoids taking multi-shard locks on workers
|
||||||
|
|
||||||
|
* Enforces `citus.max_shared_pool_size` config in COPY queries
|
||||||
|
|
||||||
|
* Enables custom aggregates with multiple parameters to be executed on workers
|
||||||
|
|
||||||
|
* Enforces `citus.max_intermediate_result_size` in local execution
|
||||||
|
|
||||||
|
* Improves cost estimation of INSERT SELECT plans
|
||||||
|
|
||||||
|
* Introduces delegation of procedures that read from reference tables
|
||||||
|
|
||||||
|
* Prevents pull-push execution for simple pushdownable subqueries
|
||||||
|
|
||||||
|
* Improves error message when creating a foreign key to a local table
|
||||||
|
|
||||||
|
* Makes `citus_prepare_pg_upgrade` idempotent by dropping transition tables
|
||||||
|
|
||||||
|
* Disallows `ON TRUE` outer joins with reference & distributed tables when
|
||||||
|
reference table is outer relation to avoid incorrect results
|
||||||
|
|
||||||
|
* Disallows field indirection in INSERT/UPDATE queries to avoid incorrect
|
||||||
|
results
|
||||||
|
|
||||||
|
* Disallows volatile functions in UPDATE subqueries to avoid incorrect results
|
||||||
|
|
||||||
|
* Fixes CREATE INDEX CONCURRENTLY crash with local execution
|
||||||
|
|
||||||
|
* Fixes `citus_finish_pg_upgrade` to drop all backup tables
|
||||||
|
|
||||||
|
* Fixes a bug that cause failures when `RECURSIVE VIEW` joined reference table
|
||||||
|
|
||||||
|
* Fixes DROP SEQUENCE failures when metadata syncing is enabled
|
||||||
|
|
||||||
|
* Fixes a bug that caused CREATE TABLE with CHECK constraint to fail
|
||||||
|
|
||||||
|
* Fixes a bug that could cause VACUUM to deadlock
|
||||||
|
|
||||||
|
* Fixes master_update_node failure when no background worker slots are available
|
||||||
|
|
||||||
|
* Fixes a bug that caused replica identity to not be propagated on shard repair
|
||||||
|
|
||||||
|
* Fixes a bug that could cause crashes after connection timeouts
|
||||||
|
|
||||||
|
* Fixes a bug that could cause crashes with certain compile flags
|
||||||
|
|
||||||
|
* Fixes a bug that could cause deadlocks on CREATE INDEX
|
||||||
|
|
||||||
|
* Fixes a bug with genetic query optimization in outer joins
|
||||||
|
|
||||||
|
* Fixes a crash when aggregating empty tables
|
||||||
|
|
||||||
|
* Fixes a crash with inserting domain constrained composite types
|
||||||
|
|
||||||
|
* Fixes a crash with multi-row & router INSERT's in local execution
|
||||||
|
|
||||||
|
* Fixes a possibility of doing temporary file cleanup more than once
|
||||||
|
|
||||||
|
* Fixes incorrect setting of join related fields
|
||||||
|
|
||||||
|
* Fixes memory issues around deparsing index commands
|
||||||
|
|
||||||
|
* Fixes reference table access tracking for sequential execution
|
||||||
|
|
||||||
|
* Fixes removal of a single node with only reference tables
|
||||||
|
|
||||||
|
* Fixes sending commands to coordinator when it is added as a worker
|
||||||
|
|
||||||
|
* Fixes write queries with const expressions and COLLATE in various places
|
||||||
|
|
||||||
|
* Fixes wrong cancellation message about distributed deadlock
|
||||||
|
|
||||||
### citus v9.4.2 (October 21, 2020) ###
|
### citus v9.4.2 (October 21, 2020) ###
|
||||||
|
|
||||||
* Fixes a bug that could lead to multiple maintenance daemons
|
* Fixes a bug that could lead to multiple maintenance daemons
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
#! /bin/sh
|
#! /bin/sh
|
||||||
# Guess values for system-dependent variables and create Makefiles.
|
# Guess values for system-dependent variables and create Makefiles.
|
||||||
# Generated by GNU Autoconf 2.69 for Citus 9.5devel.
|
# Generated by GNU Autoconf 2.69 for Citus 9.5.1.
|
||||||
#
|
#
|
||||||
#
|
#
|
||||||
# Copyright (C) 1992-1996, 1998-2012 Free Software Foundation, Inc.
|
# Copyright (C) 1992-1996, 1998-2012 Free Software Foundation, Inc.
|
||||||
|
@ -579,8 +579,8 @@ MAKEFLAGS=
|
||||||
# Identity of this package.
|
# Identity of this package.
|
||||||
PACKAGE_NAME='Citus'
|
PACKAGE_NAME='Citus'
|
||||||
PACKAGE_TARNAME='citus'
|
PACKAGE_TARNAME='citus'
|
||||||
PACKAGE_VERSION='9.5devel'
|
PACKAGE_VERSION='9.5.1'
|
||||||
PACKAGE_STRING='Citus 9.5devel'
|
PACKAGE_STRING='Citus 9.5.1'
|
||||||
PACKAGE_BUGREPORT=''
|
PACKAGE_BUGREPORT=''
|
||||||
PACKAGE_URL=''
|
PACKAGE_URL=''
|
||||||
|
|
||||||
|
@ -664,6 +664,7 @@ infodir
|
||||||
docdir
|
docdir
|
||||||
oldincludedir
|
oldincludedir
|
||||||
includedir
|
includedir
|
||||||
|
runstatedir
|
||||||
localstatedir
|
localstatedir
|
||||||
sharedstatedir
|
sharedstatedir
|
||||||
sysconfdir
|
sysconfdir
|
||||||
|
@ -740,6 +741,7 @@ datadir='${datarootdir}'
|
||||||
sysconfdir='${prefix}/etc'
|
sysconfdir='${prefix}/etc'
|
||||||
sharedstatedir='${prefix}/com'
|
sharedstatedir='${prefix}/com'
|
||||||
localstatedir='${prefix}/var'
|
localstatedir='${prefix}/var'
|
||||||
|
runstatedir='${localstatedir}/run'
|
||||||
includedir='${prefix}/include'
|
includedir='${prefix}/include'
|
||||||
oldincludedir='/usr/include'
|
oldincludedir='/usr/include'
|
||||||
docdir='${datarootdir}/doc/${PACKAGE_TARNAME}'
|
docdir='${datarootdir}/doc/${PACKAGE_TARNAME}'
|
||||||
|
@ -992,6 +994,15 @@ do
|
||||||
| -silent | --silent | --silen | --sile | --sil)
|
| -silent | --silent | --silen | --sile | --sil)
|
||||||
silent=yes ;;
|
silent=yes ;;
|
||||||
|
|
||||||
|
-runstatedir | --runstatedir | --runstatedi | --runstated \
|
||||||
|
| --runstate | --runstat | --runsta | --runst | --runs \
|
||||||
|
| --run | --ru | --r)
|
||||||
|
ac_prev=runstatedir ;;
|
||||||
|
-runstatedir=* | --runstatedir=* | --runstatedi=* | --runstated=* \
|
||||||
|
| --runstate=* | --runstat=* | --runsta=* | --runst=* | --runs=* \
|
||||||
|
| --run=* | --ru=* | --r=*)
|
||||||
|
runstatedir=$ac_optarg ;;
|
||||||
|
|
||||||
-sbindir | --sbindir | --sbindi | --sbind | --sbin | --sbi | --sb)
|
-sbindir | --sbindir | --sbindi | --sbind | --sbin | --sbi | --sb)
|
||||||
ac_prev=sbindir ;;
|
ac_prev=sbindir ;;
|
||||||
-sbindir=* | --sbindir=* | --sbindi=* | --sbind=* | --sbin=* \
|
-sbindir=* | --sbindir=* | --sbindi=* | --sbind=* | --sbin=* \
|
||||||
|
@ -1129,7 +1140,7 @@ fi
|
||||||
for ac_var in exec_prefix prefix bindir sbindir libexecdir datarootdir \
|
for ac_var in exec_prefix prefix bindir sbindir libexecdir datarootdir \
|
||||||
datadir sysconfdir sharedstatedir localstatedir includedir \
|
datadir sysconfdir sharedstatedir localstatedir includedir \
|
||||||
oldincludedir docdir infodir htmldir dvidir pdfdir psdir \
|
oldincludedir docdir infodir htmldir dvidir pdfdir psdir \
|
||||||
libdir localedir mandir
|
libdir localedir mandir runstatedir
|
||||||
do
|
do
|
||||||
eval ac_val=\$$ac_var
|
eval ac_val=\$$ac_var
|
||||||
# Remove trailing slashes.
|
# Remove trailing slashes.
|
||||||
|
@ -1242,7 +1253,7 @@ if test "$ac_init_help" = "long"; then
|
||||||
# Omit some internal or obsolete options to make the list less imposing.
|
# Omit some internal or obsolete options to make the list less imposing.
|
||||||
# This message is too long to be a string in the A/UX 3.1 sh.
|
# This message is too long to be a string in the A/UX 3.1 sh.
|
||||||
cat <<_ACEOF
|
cat <<_ACEOF
|
||||||
\`configure' configures Citus 9.5devel to adapt to many kinds of systems.
|
\`configure' configures Citus 9.5.1 to adapt to many kinds of systems.
|
||||||
|
|
||||||
Usage: $0 [OPTION]... [VAR=VALUE]...
|
Usage: $0 [OPTION]... [VAR=VALUE]...
|
||||||
|
|
||||||
|
@ -1282,6 +1293,7 @@ Fine tuning of the installation directories:
|
||||||
--sysconfdir=DIR read-only single-machine data [PREFIX/etc]
|
--sysconfdir=DIR read-only single-machine data [PREFIX/etc]
|
||||||
--sharedstatedir=DIR modifiable architecture-independent data [PREFIX/com]
|
--sharedstatedir=DIR modifiable architecture-independent data [PREFIX/com]
|
||||||
--localstatedir=DIR modifiable single-machine data [PREFIX/var]
|
--localstatedir=DIR modifiable single-machine data [PREFIX/var]
|
||||||
|
--runstatedir=DIR modifiable per-process data [LOCALSTATEDIR/run]
|
||||||
--libdir=DIR object code libraries [EPREFIX/lib]
|
--libdir=DIR object code libraries [EPREFIX/lib]
|
||||||
--includedir=DIR C header files [PREFIX/include]
|
--includedir=DIR C header files [PREFIX/include]
|
||||||
--oldincludedir=DIR C header files for non-gcc [/usr/include]
|
--oldincludedir=DIR C header files for non-gcc [/usr/include]
|
||||||
|
@ -1303,7 +1315,7 @@ fi
|
||||||
|
|
||||||
if test -n "$ac_init_help"; then
|
if test -n "$ac_init_help"; then
|
||||||
case $ac_init_help in
|
case $ac_init_help in
|
||||||
short | recursive ) echo "Configuration of Citus 9.5devel:";;
|
short | recursive ) echo "Configuration of Citus 9.5.1:";;
|
||||||
esac
|
esac
|
||||||
cat <<\_ACEOF
|
cat <<\_ACEOF
|
||||||
|
|
||||||
|
@ -1403,7 +1415,7 @@ fi
|
||||||
test -n "$ac_init_help" && exit $ac_status
|
test -n "$ac_init_help" && exit $ac_status
|
||||||
if $ac_init_version; then
|
if $ac_init_version; then
|
||||||
cat <<\_ACEOF
|
cat <<\_ACEOF
|
||||||
Citus configure 9.5devel
|
Citus configure 9.5.1
|
||||||
generated by GNU Autoconf 2.69
|
generated by GNU Autoconf 2.69
|
||||||
|
|
||||||
Copyright (C) 2012 Free Software Foundation, Inc.
|
Copyright (C) 2012 Free Software Foundation, Inc.
|
||||||
|
@ -1886,7 +1898,7 @@ cat >config.log <<_ACEOF
|
||||||
This file contains any messages produced by compilers while
|
This file contains any messages produced by compilers while
|
||||||
running configure, to aid debugging if configure makes a mistake.
|
running configure, to aid debugging if configure makes a mistake.
|
||||||
|
|
||||||
It was created by Citus $as_me 9.5devel, which was
|
It was created by Citus $as_me 9.5.1, which was
|
||||||
generated by GNU Autoconf 2.69. Invocation command line was
|
generated by GNU Autoconf 2.69. Invocation command line was
|
||||||
|
|
||||||
$ $0 $@
|
$ $0 $@
|
||||||
|
@ -5055,7 +5067,7 @@ cat >>$CONFIG_STATUS <<\_ACEOF || ac_write_fail=1
|
||||||
# report actual input values of CONFIG_FILES etc. instead of their
|
# report actual input values of CONFIG_FILES etc. instead of their
|
||||||
# values after options handling.
|
# values after options handling.
|
||||||
ac_log="
|
ac_log="
|
||||||
This file was extended by Citus $as_me 9.5devel, which was
|
This file was extended by Citus $as_me 9.5.1, which was
|
||||||
generated by GNU Autoconf 2.69. Invocation command line was
|
generated by GNU Autoconf 2.69. Invocation command line was
|
||||||
|
|
||||||
CONFIG_FILES = $CONFIG_FILES
|
CONFIG_FILES = $CONFIG_FILES
|
||||||
|
@ -5117,7 +5129,7 @@ _ACEOF
|
||||||
cat >>$CONFIG_STATUS <<_ACEOF || ac_write_fail=1
|
cat >>$CONFIG_STATUS <<_ACEOF || ac_write_fail=1
|
||||||
ac_cs_config="`$as_echo "$ac_configure_args" | sed 's/^ //; s/[\\""\`\$]/\\\\&/g'`"
|
ac_cs_config="`$as_echo "$ac_configure_args" | sed 's/^ //; s/[\\""\`\$]/\\\\&/g'`"
|
||||||
ac_cs_version="\\
|
ac_cs_version="\\
|
||||||
Citus config.status 9.5devel
|
Citus config.status 9.5.1
|
||||||
configured by $0, generated by GNU Autoconf 2.69,
|
configured by $0, generated by GNU Autoconf 2.69,
|
||||||
with options \\"\$ac_cs_config\\"
|
with options \\"\$ac_cs_config\\"
|
||||||
|
|
||||||
|
|
|
@ -5,7 +5,7 @@
|
||||||
# everyone needing autoconf installed, the resulting files are checked
|
# everyone needing autoconf installed, the resulting files are checked
|
||||||
# into the SCM.
|
# into the SCM.
|
||||||
|
|
||||||
AC_INIT([Citus], [9.5devel])
|
AC_INIT([Citus], [9.5.1])
|
||||||
AC_COPYRIGHT([Copyright (c) Citus Data, Inc.])
|
AC_COPYRIGHT([Copyright (c) Citus Data, Inc.])
|
||||||
|
|
||||||
# we'll need sed and awk for some of the version commands
|
# we'll need sed and awk for some of the version commands
|
||||||
|
|
|
@ -161,6 +161,12 @@ AfterXactConnectionHandling(bool isCommit)
|
||||||
hash_seq_init(&status, ConnectionHash);
|
hash_seq_init(&status, ConnectionHash);
|
||||||
while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != 0)
|
while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != 0)
|
||||||
{
|
{
|
||||||
|
if (!entry->isValid)
|
||||||
|
{
|
||||||
|
/* skip invalid connection hash entries */
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
AfterXactHostConnectionHandling(entry, isCommit);
|
AfterXactHostConnectionHandling(entry, isCommit);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -289,11 +295,24 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
|
||||||
*/
|
*/
|
||||||
|
|
||||||
ConnectionHashEntry *entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
|
ConnectionHashEntry *entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
|
||||||
if (!found)
|
if (!found || !entry->isValid)
|
||||||
{
|
{
|
||||||
|
/*
|
||||||
|
* We are just building hash entry or previously it was left in an
|
||||||
|
* invalid state as we couldn't allocate memory for it.
|
||||||
|
* So initialize entry->connections list here.
|
||||||
|
*/
|
||||||
|
entry->isValid = false;
|
||||||
entry->connections = MemoryContextAlloc(ConnectionContext,
|
entry->connections = MemoryContextAlloc(ConnectionContext,
|
||||||
sizeof(dlist_head));
|
sizeof(dlist_head));
|
||||||
dlist_init(entry->connections);
|
dlist_init(entry->connections);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If MemoryContextAlloc errors out -e.g. during an OOM-, entry->connections
|
||||||
|
* stays as NULL. So entry->isValid should be set to true right after we
|
||||||
|
* initialize entry->connections properly.
|
||||||
|
*/
|
||||||
|
entry->isValid = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* if desired, check whether there's a usable connection */
|
/* if desired, check whether there's a usable connection */
|
||||||
|
@ -449,6 +468,12 @@ CloseAllConnectionsAfterTransaction(void)
|
||||||
hash_seq_init(&status, ConnectionHash);
|
hash_seq_init(&status, ConnectionHash);
|
||||||
while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != 0)
|
while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != 0)
|
||||||
{
|
{
|
||||||
|
if (!entry->isValid)
|
||||||
|
{
|
||||||
|
/* skip invalid connection hash entries */
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
dlist_iter iter;
|
dlist_iter iter;
|
||||||
|
|
||||||
dlist_head *connections = entry->connections;
|
dlist_head *connections = entry->connections;
|
||||||
|
@ -483,7 +508,7 @@ ConnectionAvailableToNode(char *hostName, int nodePort, const char *userName,
|
||||||
ConnectionHashEntry *entry =
|
ConnectionHashEntry *entry =
|
||||||
(ConnectionHashEntry *) hash_search(ConnectionHash, &key, HASH_FIND, &found);
|
(ConnectionHashEntry *) hash_search(ConnectionHash, &key, HASH_FIND, &found);
|
||||||
|
|
||||||
if (!found)
|
if (!found || !entry->isValid)
|
||||||
{
|
{
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -509,6 +534,12 @@ CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort)
|
||||||
hash_seq_init(&status, ConnectionHash);
|
hash_seq_init(&status, ConnectionHash);
|
||||||
while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != 0)
|
while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != 0)
|
||||||
{
|
{
|
||||||
|
if (!entry->isValid)
|
||||||
|
{
|
||||||
|
/* skip invalid connection hash entries */
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
dlist_iter iter;
|
dlist_iter iter;
|
||||||
|
|
||||||
if (strcmp(entry->key.hostname, nodeName) != 0 || entry->key.port != nodePort)
|
if (strcmp(entry->key.hostname, nodeName) != 0 || entry->key.port != nodePort)
|
||||||
|
@ -584,6 +615,12 @@ ShutdownAllConnections(void)
|
||||||
hash_seq_init(&status, ConnectionHash);
|
hash_seq_init(&status, ConnectionHash);
|
||||||
while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != NULL)
|
while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != NULL)
|
||||||
{
|
{
|
||||||
|
if (!entry->isValid)
|
||||||
|
{
|
||||||
|
/* skip invalid connection hash entries */
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
dlist_iter iter;
|
dlist_iter iter;
|
||||||
|
|
||||||
dlist_foreach(iter, entry->connections)
|
dlist_foreach(iter, entry->connections)
|
||||||
|
@ -1194,6 +1231,12 @@ FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry)
|
||||||
static void
|
static void
|
||||||
AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit)
|
AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit)
|
||||||
{
|
{
|
||||||
|
if (!entry || !entry->isValid)
|
||||||
|
{
|
||||||
|
/* callers only pass valid hash entries but let's be on the safe side */
|
||||||
|
ereport(ERROR, (errmsg("connection hash entry is NULL or invalid")));
|
||||||
|
}
|
||||||
|
|
||||||
dlist_mutable_iter iter;
|
dlist_mutable_iter iter;
|
||||||
int cachedConnectionCount = 0;
|
int cachedConnectionCount = 0;
|
||||||
|
|
||||||
|
|
|
@ -660,6 +660,16 @@ static void SetAttributeInputMetadata(DistributedExecution *execution,
|
||||||
void
|
void
|
||||||
AdaptiveExecutorPreExecutorRun(CitusScanState *scanState)
|
AdaptiveExecutorPreExecutorRun(CitusScanState *scanState)
|
||||||
{
|
{
|
||||||
|
if (scanState->finishedPreScan)
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* Cursors (and hence RETURN QUERY syntax in pl/pgsql functions)
|
||||||
|
* may trigger AdaptiveExecutorPreExecutorRun() on every fetch
|
||||||
|
* operation. Though, we should only execute PreScan once.
|
||||||
|
*/
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
DistributedPlan *distributedPlan = scanState->distributedPlan;
|
DistributedPlan *distributedPlan = scanState->distributedPlan;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -670,6 +680,8 @@ AdaptiveExecutorPreExecutorRun(CitusScanState *scanState)
|
||||||
LockPartitionsForDistributedPlan(distributedPlan);
|
LockPartitionsForDistributedPlan(distributedPlan);
|
||||||
|
|
||||||
ExecuteSubPlans(distributedPlan);
|
ExecuteSubPlans(distributedPlan);
|
||||||
|
|
||||||
|
scanState->finishedPreScan = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -558,6 +558,9 @@ AdaptiveExecutorCreateScan(CustomScan *scan)
|
||||||
scanState->customScanState.methods = &AdaptiveExecutorCustomExecMethods;
|
scanState->customScanState.methods = &AdaptiveExecutorCustomExecMethods;
|
||||||
scanState->PreExecScan = &CitusPreExecScan;
|
scanState->PreExecScan = &CitusPreExecScan;
|
||||||
|
|
||||||
|
scanState->finishedPreScan = false;
|
||||||
|
scanState->finishedRemoteScan = false;
|
||||||
|
|
||||||
return (Node *) scanState;
|
return (Node *) scanState;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -578,6 +581,9 @@ NonPushableInsertSelectCreateScan(CustomScan *scan)
|
||||||
scanState->customScanState.methods =
|
scanState->customScanState.methods =
|
||||||
&NonPushableInsertSelectCustomExecMethods;
|
&NonPushableInsertSelectCustomExecMethods;
|
||||||
|
|
||||||
|
scanState->finishedPreScan = false;
|
||||||
|
scanState->finishedRemoteScan = false;
|
||||||
|
|
||||||
return (Node *) scanState;
|
return (Node *) scanState;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -254,8 +254,8 @@ static void InvalidateCitusTableCacheEntrySlot(CitusTableCacheEntrySlot *cacheSl
|
||||||
static void InvalidateDistTableCache(void);
|
static void InvalidateDistTableCache(void);
|
||||||
static void InvalidateDistObjectCache(void);
|
static void InvalidateDistObjectCache(void);
|
||||||
static void InitializeTableCacheEntry(int64 shardId);
|
static void InitializeTableCacheEntry(int64 shardId);
|
||||||
static bool IsCitusTableTypeInternal(CitusTableCacheEntry *tableEntry, CitusTableType
|
static bool IsCitusTableTypeInternal(char partitionMethod, char replicationModel,
|
||||||
tableType);
|
CitusTableType tableType);
|
||||||
static bool RefreshTableCacheEntryIfInvalid(ShardIdCacheEntry *shardEntry);
|
static bool RefreshTableCacheEntryIfInvalid(ShardIdCacheEntry *shardEntry);
|
||||||
|
|
||||||
|
|
||||||
|
@ -309,7 +309,7 @@ IsCitusTableType(Oid relationId, CitusTableType tableType)
|
||||||
{
|
{
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
return IsCitusTableTypeInternal(tableEntry, tableType);
|
return IsCitusTableTypeCacheEntry(tableEntry, tableType);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -320,7 +320,8 @@ IsCitusTableType(Oid relationId, CitusTableType tableType)
|
||||||
bool
|
bool
|
||||||
IsCitusTableTypeCacheEntry(CitusTableCacheEntry *tableEntry, CitusTableType tableType)
|
IsCitusTableTypeCacheEntry(CitusTableCacheEntry *tableEntry, CitusTableType tableType)
|
||||||
{
|
{
|
||||||
return IsCitusTableTypeInternal(tableEntry, tableType);
|
return IsCitusTableTypeInternal(tableEntry->partitionMethod,
|
||||||
|
tableEntry->replicationModel, tableType);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -329,47 +330,48 @@ IsCitusTableTypeCacheEntry(CitusTableCacheEntry *tableEntry, CitusTableType tabl
|
||||||
* the given table type group. For definition of table types, see CitusTableType.
|
* the given table type group. For definition of table types, see CitusTableType.
|
||||||
*/
|
*/
|
||||||
static bool
|
static bool
|
||||||
IsCitusTableTypeInternal(CitusTableCacheEntry *tableEntry, CitusTableType tableType)
|
IsCitusTableTypeInternal(char partitionMethod, char replicationModel,
|
||||||
|
CitusTableType tableType)
|
||||||
{
|
{
|
||||||
switch (tableType)
|
switch (tableType)
|
||||||
{
|
{
|
||||||
case HASH_DISTRIBUTED:
|
case HASH_DISTRIBUTED:
|
||||||
{
|
{
|
||||||
return tableEntry->partitionMethod == DISTRIBUTE_BY_HASH;
|
return partitionMethod == DISTRIBUTE_BY_HASH;
|
||||||
}
|
}
|
||||||
|
|
||||||
case APPEND_DISTRIBUTED:
|
case APPEND_DISTRIBUTED:
|
||||||
{
|
{
|
||||||
return tableEntry->partitionMethod == DISTRIBUTE_BY_APPEND;
|
return partitionMethod == DISTRIBUTE_BY_APPEND;
|
||||||
}
|
}
|
||||||
|
|
||||||
case RANGE_DISTRIBUTED:
|
case RANGE_DISTRIBUTED:
|
||||||
{
|
{
|
||||||
return tableEntry->partitionMethod == DISTRIBUTE_BY_RANGE;
|
return partitionMethod == DISTRIBUTE_BY_RANGE;
|
||||||
}
|
}
|
||||||
|
|
||||||
case DISTRIBUTED_TABLE:
|
case DISTRIBUTED_TABLE:
|
||||||
{
|
{
|
||||||
return tableEntry->partitionMethod == DISTRIBUTE_BY_HASH ||
|
return partitionMethod == DISTRIBUTE_BY_HASH ||
|
||||||
tableEntry->partitionMethod == DISTRIBUTE_BY_RANGE ||
|
partitionMethod == DISTRIBUTE_BY_RANGE ||
|
||||||
tableEntry->partitionMethod == DISTRIBUTE_BY_APPEND;
|
partitionMethod == DISTRIBUTE_BY_APPEND;
|
||||||
}
|
}
|
||||||
|
|
||||||
case REFERENCE_TABLE:
|
case REFERENCE_TABLE:
|
||||||
{
|
{
|
||||||
return tableEntry->partitionMethod == DISTRIBUTE_BY_NONE &&
|
return partitionMethod == DISTRIBUTE_BY_NONE &&
|
||||||
tableEntry->replicationModel == REPLICATION_MODEL_2PC;
|
replicationModel == REPLICATION_MODEL_2PC;
|
||||||
}
|
}
|
||||||
|
|
||||||
case CITUS_LOCAL_TABLE:
|
case CITUS_LOCAL_TABLE:
|
||||||
{
|
{
|
||||||
return tableEntry->partitionMethod == DISTRIBUTE_BY_NONE &&
|
return partitionMethod == DISTRIBUTE_BY_NONE &&
|
||||||
tableEntry->replicationModel != REPLICATION_MODEL_2PC;
|
replicationModel != REPLICATION_MODEL_2PC;
|
||||||
}
|
}
|
||||||
|
|
||||||
case CITUS_TABLE_WITH_NO_DIST_KEY:
|
case CITUS_TABLE_WITH_NO_DIST_KEY:
|
||||||
{
|
{
|
||||||
return tableEntry->partitionMethod == DISTRIBUTE_BY_NONE;
|
return partitionMethod == DISTRIBUTE_BY_NONE;
|
||||||
}
|
}
|
||||||
|
|
||||||
case ANY_CITUS_TABLE_TYPE:
|
case ANY_CITUS_TABLE_TYPE:
|
||||||
|
@ -3706,12 +3708,25 @@ CitusTableTypeIdList(CitusTableType citusTableType)
|
||||||
while (HeapTupleIsValid(heapTuple))
|
while (HeapTupleIsValid(heapTuple))
|
||||||
{
|
{
|
||||||
bool isNull = false;
|
bool isNull = false;
|
||||||
Datum relationIdDatum = heap_getattr(heapTuple,
|
|
||||||
Anum_pg_dist_partition_logicalrelid,
|
Datum partMethodDatum =
|
||||||
tupleDescriptor, &isNull);
|
heap_getattr(heapTuple, Anum_pg_dist_partition_partmethod,
|
||||||
Oid relationId = DatumGetObjectId(relationIdDatum);
|
tupleDescriptor, &isNull);
|
||||||
if (IsCitusTableType(relationId, citusTableType))
|
Datum replicationModelDatum =
|
||||||
|
heap_getattr(heapTuple, Anum_pg_dist_partition_repmodel,
|
||||||
|
tupleDescriptor, &isNull);
|
||||||
|
|
||||||
|
Oid partitionMethod = DatumGetChar(partMethodDatum);
|
||||||
|
Oid replicationModel = DatumGetChar(replicationModelDatum);
|
||||||
|
|
||||||
|
if (IsCitusTableTypeInternal(partitionMethod, replicationModel, citusTableType))
|
||||||
{
|
{
|
||||||
|
Datum relationIdDatum = heap_getattr(heapTuple,
|
||||||
|
Anum_pg_dist_partition_logicalrelid,
|
||||||
|
tupleDescriptor, &isNull);
|
||||||
|
|
||||||
|
Oid relationId = DatumGetObjectId(relationIdDatum);
|
||||||
|
|
||||||
relationIdList = lappend_oid(relationIdList, relationId);
|
relationIdList = lappend_oid(relationIdList, relationId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1063,7 +1063,7 @@ worker_save_query_explain_analyze(PG_FUNCTION_ARGS)
|
||||||
|
|
||||||
INSTR_TIME_SET_CURRENT(planStart);
|
INSTR_TIME_SET_CURRENT(planStart);
|
||||||
|
|
||||||
PlannedStmt *plan = pg_plan_query_compat(query, NULL, 0, NULL);
|
PlannedStmt *plan = pg_plan_query_compat(query, NULL, CURSOR_OPT_PARALLEL_OK, NULL);
|
||||||
|
|
||||||
INSTR_TIME_SET_CURRENT(planDuration);
|
INSTR_TIME_SET_CURRENT(planDuration);
|
||||||
INSTR_TIME_SUBTRACT(planDuration, planStart);
|
INSTR_TIME_SUBTRACT(planDuration, planStart);
|
||||||
|
|
|
@ -20,6 +20,7 @@ typedef struct CitusScanState
|
||||||
CustomScanState customScanState; /* underlying custom scan node */
|
CustomScanState customScanState; /* underlying custom scan node */
|
||||||
|
|
||||||
/* function that gets called before postgres starts its execution */
|
/* function that gets called before postgres starts its execution */
|
||||||
|
bool finishedPreScan; /* flag to check if the pre scan is finished */
|
||||||
void (*PreExecScan)(struct CitusScanState *scanState);
|
void (*PreExecScan)(struct CitusScanState *scanState);
|
||||||
|
|
||||||
DistributedPlan *distributedPlan; /* distributed execution plan */
|
DistributedPlan *distributedPlan; /* distributed execution plan */
|
||||||
|
|
|
@ -178,6 +178,9 @@ typedef struct ConnectionHashEntry
|
||||||
{
|
{
|
||||||
ConnectionHashKey key;
|
ConnectionHashKey key;
|
||||||
dlist_head *connections;
|
dlist_head *connections;
|
||||||
|
|
||||||
|
/* connections list is valid or not */
|
||||||
|
bool isValid;
|
||||||
} ConnectionHashEntry;
|
} ConnectionHashEntry;
|
||||||
|
|
||||||
/* hash entry for cached connection parameters */
|
/* hash entry for cached connection parameters */
|
||||||
|
|
|
@ -181,3 +181,6 @@ s/wrong data type: [0-9]+, expected [0-9]+/wrong data type: XXXX, expected XXXX/
|
||||||
|
|
||||||
# Errors with relation OID does not exist
|
# Errors with relation OID does not exist
|
||||||
s/relation with OID [0-9]+ does not exist/relation with OID XXXX does not exist/g
|
s/relation with OID [0-9]+ does not exist/relation with OID XXXX does not exist/g
|
||||||
|
|
||||||
|
# ignore DEBUG1 messages that Postgres generates
|
||||||
|
/^DEBUG: rehashing catalog cache id [0-9]+$/d
|
||||||
|
|
|
@ -0,0 +1,218 @@
|
||||||
|
CREATE SCHEMA cursors;
|
||||||
|
SET search_path TO cursors;
|
||||||
|
CREATE TABLE distributed_table (key int, value text);
|
||||||
|
SELECT create_distributed_table('distributed_table', 'key');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- load some data, but not very small amounts because RETURN QUERY in plpgsql
|
||||||
|
-- hard codes the cursor fetch to 50 rows on PG 12, though they might increase
|
||||||
|
-- it sometime in the future, so be mindful
|
||||||
|
INSERT INTO distributed_table SELECT i % 10, i::text FROM generate_series(0, 1000) i;
|
||||||
|
CREATE OR REPLACE FUNCTION simple_cursor_on_dist_table(cursor_name refcursor) RETURNS refcursor AS '
|
||||||
|
BEGIN
|
||||||
|
OPEN $1 FOR SELECT DISTINCT key FROM distributed_table ORDER BY 1;
|
||||||
|
RETURN $1;
|
||||||
|
END;
|
||||||
|
' LANGUAGE plpgsql;
|
||||||
|
CREATE OR REPLACE FUNCTION cursor_with_intermediate_result_on_dist_table(cursor_name refcursor) RETURNS refcursor AS '
|
||||||
|
BEGIN
|
||||||
|
OPEN $1 FOR
|
||||||
|
WITH cte_1 AS (SELECT * FROM distributed_table OFFSET 0)
|
||||||
|
SELECT DISTINCT key FROM distributed_table WHERE value in (SELECT value FROM cte_1) ORDER BY 1;
|
||||||
|
RETURN $1;
|
||||||
|
END;
|
||||||
|
' LANGUAGE plpgsql;
|
||||||
|
CREATE OR REPLACE FUNCTION cursor_with_intermediate_result_on_dist_table_with_param(cursor_name refcursor, filter text) RETURNS refcursor AS '
|
||||||
|
BEGIN
|
||||||
|
OPEN $1 FOR
|
||||||
|
WITH cte_1 AS (SELECT * FROM distributed_table WHERE value < $2 OFFSET 0)
|
||||||
|
SELECT DISTINCT key FROM distributed_table WHERE value in (SELECT value FROM cte_1) ORDER BY 1;
|
||||||
|
RETURN $1;
|
||||||
|
END;
|
||||||
|
' LANGUAGE plpgsql;
|
||||||
|
-- pretty basic query with cursors
|
||||||
|
-- Citus should plan/execute once and pull
|
||||||
|
-- the results to coordinator, then serve it
|
||||||
|
-- from the coordinator
|
||||||
|
BEGIN;
|
||||||
|
SELECT simple_cursor_on_dist_table('cursor_1');
|
||||||
|
simple_cursor_on_dist_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
cursor_1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SET LOCAL citus.log_intermediate_results TO ON;
|
||||||
|
SET LOCAL client_min_messages TO DEBUG1;
|
||||||
|
FETCH 5 IN cursor_1;
|
||||||
|
key
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
0
|
||||||
|
1
|
||||||
|
2
|
||||||
|
3
|
||||||
|
4
|
||||||
|
(5 rows)
|
||||||
|
|
||||||
|
FETCH 50 IN cursor_1;
|
||||||
|
key
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
5
|
||||||
|
6
|
||||||
|
7
|
||||||
|
8
|
||||||
|
9
|
||||||
|
(5 rows)
|
||||||
|
|
||||||
|
FETCH ALL IN cursor_1;
|
||||||
|
key
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
BEGIN;
|
||||||
|
SELECT cursor_with_intermediate_result_on_dist_table('cursor_1');
|
||||||
|
cursor_with_intermediate_result_on_dist_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
cursor_1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- multiple FETCH commands should not trigger re-running the subplans
|
||||||
|
SET LOCAL citus.log_intermediate_results TO ON;
|
||||||
|
SET LOCAL client_min_messages TO DEBUG1;
|
||||||
|
FETCH 5 IN cursor_1;
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
key
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
0
|
||||||
|
1
|
||||||
|
2
|
||||||
|
3
|
||||||
|
4
|
||||||
|
(5 rows)
|
||||||
|
|
||||||
|
FETCH 1 IN cursor_1;
|
||||||
|
key
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
5
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
FETCH ALL IN cursor_1;
|
||||||
|
key
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
6
|
||||||
|
7
|
||||||
|
8
|
||||||
|
9
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
|
FETCH 5 IN cursor_1;
|
||||||
|
key
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(0 rows)
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
BEGIN;
|
||||||
|
SELECT cursor_with_intermediate_result_on_dist_table_with_param('cursor_1', '600');
|
||||||
|
cursor_with_intermediate_result_on_dist_table_with_param
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
cursor_1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- multiple FETCH commands should not trigger re-running the subplans
|
||||||
|
-- also test with parameters
|
||||||
|
SET LOCAL citus.log_intermediate_results TO ON;
|
||||||
|
SET LOCAL client_min_messages TO DEBUG1;
|
||||||
|
FETCH 1 IN cursor_1;
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
key
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
FETCH 1 IN cursor_1;
|
||||||
|
key
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
FETCH 1 IN cursor_1;
|
||||||
|
key
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
2
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
FETCH 1 IN cursor_1;
|
||||||
|
key
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
3
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
FETCH 1 IN cursor_1;
|
||||||
|
key
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
4
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
FETCH 1 IN cursor_1;
|
||||||
|
key
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
5
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
FETCH ALL IN cursor_1;
|
||||||
|
key
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
6
|
||||||
|
7
|
||||||
|
8
|
||||||
|
9
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
CREATE OR REPLACE FUNCTION value_counter() RETURNS TABLE(counter text) LANGUAGE PLPGSQL AS $function$
|
||||||
|
BEGIN
|
||||||
|
return query
|
||||||
|
WITH cte AS
|
||||||
|
(SELECT dt.value
|
||||||
|
FROM distributed_table dt
|
||||||
|
WHERE dt.value in
|
||||||
|
(SELECT value
|
||||||
|
FROM distributed_table p
|
||||||
|
GROUP BY p.value
|
||||||
|
HAVING count(*) > 0))
|
||||||
|
|
||||||
|
SELECT * FROM cte;
|
||||||
|
END;
|
||||||
|
$function$ ;
|
||||||
|
SET citus.log_intermediate_results TO ON;
|
||||||
|
SET client_min_messages TO DEBUG1;
|
||||||
|
\set VERBOSITY terse
|
||||||
|
SELECT count(*) from (SELECT value_counter()) as foo;
|
||||||
|
DEBUG: CTE cte is going to be inlined via distributed planning
|
||||||
|
DEBUG: generating subplan XXX_1 for subquery SELECT value FROM cursors.distributed_table p GROUP BY value HAVING (count(*) OPERATOR(pg_catalog.>) 0)
|
||||||
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT value FROM (SELECT dt.value FROM cursors.distributed_table dt WHERE (dt.value OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(value text)))) cte
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1001
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
SELECT count(*) from (SELECT value_counter()) as foo;
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1001
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
-- suppress NOTICEs
|
||||||
|
SET client_min_messages TO ERROR;
|
||||||
|
DROP SCHEMA cursors CASCADE;
|
|
@ -1216,6 +1216,7 @@ ON CONFLICT(c1, c2, c3, c4, c5, c6)
|
||||||
DO UPDATE SET
|
DO UPDATE SET
|
||||||
cardinality = enriched.cardinality + excluded.cardinality,
|
cardinality = enriched.cardinality + excluded.cardinality,
|
||||||
sum = enriched.sum + excluded.sum;
|
sum = enriched.sum + excluded.sum;
|
||||||
|
DEBUG: rehashing catalog cache id 14 for pg_opclass; 17 tups, 8 buckets at character 224
|
||||||
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
|
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
|
||||||
DEBUG: Router planner cannot handle multi-shard select queries
|
DEBUG: Router planner cannot handle multi-shard select queries
|
||||||
DEBUG: performing repartitioned INSERT ... SELECT
|
DEBUG: performing repartitioned INSERT ... SELECT
|
||||||
|
|
|
@ -458,7 +458,7 @@ DROP TABLE prev_objects, extension_diff;
|
||||||
SHOW citus.version;
|
SHOW citus.version;
|
||||||
citus.version
|
citus.version
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
9.5devel
|
9.5.1
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- ensure no objects were created outside pg_catalog
|
-- ensure no objects were created outside pg_catalog
|
||||||
|
|
|
@ -108,7 +108,7 @@ test: ch_bench_subquery_repartition
|
||||||
test: multi_agg_type_conversion multi_count_type_conversion
|
test: multi_agg_type_conversion multi_count_type_conversion
|
||||||
test: multi_partition_pruning single_hash_repartition_join
|
test: multi_partition_pruning single_hash_repartition_join
|
||||||
test: multi_join_pruning multi_hash_pruning intermediate_result_pruning
|
test: multi_join_pruning multi_hash_pruning intermediate_result_pruning
|
||||||
test: multi_null_minmax_value_pruning
|
test: multi_null_minmax_value_pruning cursors
|
||||||
test: multi_query_directory_cleanup
|
test: multi_query_directory_cleanup
|
||||||
test: multi_task_assignment_policy multi_cross_shard
|
test: multi_task_assignment_policy multi_cross_shard
|
||||||
test: multi_utility_statements
|
test: multi_utility_statements
|
||||||
|
|
|
@ -0,0 +1,112 @@
|
||||||
|
CREATE SCHEMA cursors;
|
||||||
|
SET search_path TO cursors;
|
||||||
|
|
||||||
|
CREATE TABLE distributed_table (key int, value text);
|
||||||
|
SELECT create_distributed_table('distributed_table', 'key');
|
||||||
|
|
||||||
|
|
||||||
|
-- load some data, but not very small amounts because RETURN QUERY in plpgsql
|
||||||
|
-- hard codes the cursor fetch to 50 rows on PG 12, though they might increase
|
||||||
|
-- it sometime in the future, so be mindful
|
||||||
|
INSERT INTO distributed_table SELECT i % 10, i::text FROM generate_series(0, 1000) i;
|
||||||
|
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION simple_cursor_on_dist_table(cursor_name refcursor) RETURNS refcursor AS '
|
||||||
|
BEGIN
|
||||||
|
OPEN $1 FOR SELECT DISTINCT key FROM distributed_table ORDER BY 1;
|
||||||
|
RETURN $1;
|
||||||
|
END;
|
||||||
|
' LANGUAGE plpgsql;
|
||||||
|
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION cursor_with_intermediate_result_on_dist_table(cursor_name refcursor) RETURNS refcursor AS '
|
||||||
|
BEGIN
|
||||||
|
OPEN $1 FOR
|
||||||
|
WITH cte_1 AS (SELECT * FROM distributed_table OFFSET 0)
|
||||||
|
SELECT DISTINCT key FROM distributed_table WHERE value in (SELECT value FROM cte_1) ORDER BY 1;
|
||||||
|
RETURN $1;
|
||||||
|
END;
|
||||||
|
' LANGUAGE plpgsql;
|
||||||
|
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION cursor_with_intermediate_result_on_dist_table_with_param(cursor_name refcursor, filter text) RETURNS refcursor AS '
|
||||||
|
BEGIN
|
||||||
|
OPEN $1 FOR
|
||||||
|
WITH cte_1 AS (SELECT * FROM distributed_table WHERE value < $2 OFFSET 0)
|
||||||
|
SELECT DISTINCT key FROM distributed_table WHERE value in (SELECT value FROM cte_1) ORDER BY 1;
|
||||||
|
RETURN $1;
|
||||||
|
END;
|
||||||
|
' LANGUAGE plpgsql;
|
||||||
|
|
||||||
|
|
||||||
|
-- pretty basic query with cursors
|
||||||
|
-- Citus should plan/execute once and pull
|
||||||
|
-- the results to coordinator, then serve it
|
||||||
|
-- from the coordinator
|
||||||
|
BEGIN;
|
||||||
|
SELECT simple_cursor_on_dist_table('cursor_1');
|
||||||
|
SET LOCAL citus.log_intermediate_results TO ON;
|
||||||
|
SET LOCAL client_min_messages TO DEBUG1;
|
||||||
|
FETCH 5 IN cursor_1;
|
||||||
|
FETCH 50 IN cursor_1;
|
||||||
|
FETCH ALL IN cursor_1;
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
SELECT cursor_with_intermediate_result_on_dist_table('cursor_1');
|
||||||
|
|
||||||
|
-- multiple FETCH commands should not trigger re-running the subplans
|
||||||
|
SET LOCAL citus.log_intermediate_results TO ON;
|
||||||
|
SET LOCAL client_min_messages TO DEBUG1;
|
||||||
|
FETCH 5 IN cursor_1;
|
||||||
|
FETCH 1 IN cursor_1;
|
||||||
|
FETCH ALL IN cursor_1;
|
||||||
|
FETCH 5 IN cursor_1;
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
SELECT cursor_with_intermediate_result_on_dist_table_with_param('cursor_1', '600');
|
||||||
|
|
||||||
|
-- multiple FETCH commands should not trigger re-running the subplans
|
||||||
|
-- also test with parameters
|
||||||
|
SET LOCAL citus.log_intermediate_results TO ON;
|
||||||
|
SET LOCAL client_min_messages TO DEBUG1;
|
||||||
|
FETCH 1 IN cursor_1;
|
||||||
|
FETCH 1 IN cursor_1;
|
||||||
|
FETCH 1 IN cursor_1;
|
||||||
|
FETCH 1 IN cursor_1;
|
||||||
|
FETCH 1 IN cursor_1;
|
||||||
|
FETCH 1 IN cursor_1;
|
||||||
|
FETCH ALL IN cursor_1;
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION value_counter() RETURNS TABLE(counter text) LANGUAGE PLPGSQL AS $function$
|
||||||
|
BEGIN
|
||||||
|
return query
|
||||||
|
WITH cte AS
|
||||||
|
(SELECT dt.value
|
||||||
|
FROM distributed_table dt
|
||||||
|
WHERE dt.value in
|
||||||
|
(SELECT value
|
||||||
|
FROM distributed_table p
|
||||||
|
GROUP BY p.value
|
||||||
|
HAVING count(*) > 0))
|
||||||
|
|
||||||
|
SELECT * FROM cte;
|
||||||
|
END;
|
||||||
|
$function$ ;
|
||||||
|
|
||||||
|
SET citus.log_intermediate_results TO ON;
|
||||||
|
SET client_min_messages TO DEBUG1;
|
||||||
|
\set VERBOSITY terse
|
||||||
|
SELECT count(*) from (SELECT value_counter()) as foo;
|
||||||
|
BEGIN;
|
||||||
|
SELECT count(*) from (SELECT value_counter()) as foo;
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
-- suppress NOTICEs
|
||||||
|
SET client_min_messages TO ERROR;
|
||||||
|
DROP SCHEMA cursors CASCADE;
|
Loading…
Reference in New Issue