Compare commits

...

8 Commits
main ... v9.5.1

Author SHA1 Message Date
Onur Tirtir 4a1255fd10 Update CHANGELOG for 9.5.1
(cherry picked from commit dd3453ced5)

 Conflicts:
	CHANGELOG.md
2020-12-02 11:20:43 +03:00
Onur Tirtir 67004edf43 Bump version to 9.5.1 2020-12-02 11:20:43 +03:00
Onur Tirtir 789d441296 Handle invalid connection hash entries (#4362)
If MemoryContextAlloc errors out (e.g. during an OOM), ConnectionHashEntry->connections
stays NULL.

With this commit, we add an isValid flag to ConnectionHashEntry that is set to true
right after we allocate & initialize the ConnectionHashEntry->connections list properly,
and we check it before accessing ConnectionHashEntry->connections.
(cherry picked from commit 7f3d1182ed)
2020-12-01 11:07:12 +03:00
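
A standalone sketch of the validity-flag pattern this fix uses, in plain C with
illustrative names rather than the actual Citus symbols (the real change is in the
connection management diff further down; there the failed allocation aborts via
ereport instead of returning NULL, but the entry can likewise survive in the hash
half-initialized):

#include <stdbool.h>
#include <stdlib.h>

typedef struct HashEntry
{
	bool isValid;       /* true only once the entry is fully initialized */
	void *connections;  /* stands in for ConnectionHashEntry->connections */
} HashEntry;

/* Initialize an entry; if allocation fails, the entry stays marked invalid. */
static bool
InitializeEntry(HashEntry *entry)
{
	entry->isValid = false;

	entry->connections = malloc(64);
	if (entry->connections == NULL)
	{
		/* entry remains in the hash but invalid; readers must skip it */
		return false;
	}

	entry->isValid = true;  /* only now is the entry safe to use */
	return true;
}

/* Every reader checks the flag before touching entry->connections. */
static void
UseEntry(HashEntry *entry)
{
	if (!entry->isValid)
	{
		/* skip half-initialized entries instead of dereferencing NULL */
		return;
	}
	/* ... safe to use entry->connections here ... */
}
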
Önder Kalacı 6d06e9760a Enable parallel query on EXPLAIN ANALYZE (#4325)
It seems that we forgot to pass the relevant
flag to enable Postgres' parallel query
capabilities on the shards when a user runs
EXPLAIN ANALYZE on a distributed table.
(cherry picked from commit b0ddbbd33a)
2020-12-01 11:07:12 +03:00
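
A minimal sketch of the fix (assuming the PostgreSQL 13 planner API; the
pg_plan_query_compat call in the diff below is Citus' compatibility wrapper
around pg_plan_query, and the wrapper function here is purely illustrative):

#include "postgres.h"
#include "nodes/parsenodes.h"	/* CURSOR_OPT_PARALLEL_OK */
#include "tcop/tcopprot.h"	/* pg_plan_query */

static PlannedStmt *
PlanShardQueryAllowingParallelism(Query *query, const char *queryString)
{
	/* before the fix, cursorOptions was 0, which rules out parallel plans */
	return pg_plan_query(query, queryString, CURSOR_OPT_PARALLEL_OK, NULL);
}
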
Onder Kalaci 74f0dd0c25 Do not execute subplans multiple times with cursors
Before this commit, we let AdaptiveExecutorPreExecutorRun()
take effect multiple times, once per FETCH on a cursor.
That does not affect the correctness of the query results,
but it adds significant overhead.

(cherry picked from commit c433c66f2b)
2020-12-01 11:07:12 +03:00
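
Distilled into a standalone sketch with illustrative names (the real guard is the
finishedPreScan flag added to CitusScanState in the executor diffs below):

#include <stdbool.h>

typedef struct ScanState
{
	bool finishedPreScan;	/* have the subplans already been executed? */
} ScanState;

static void
PreExecutorRun(ScanState *scanState)
{
	if (scanState->finishedPreScan)
	{
		/* a cursor FETCH re-entered this hook; the subplans already ran once */
		return;
	}

	/* ... take locks and execute the subplans exactly once ... */

	scanState->finishedPreScan = true;
}
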
Onder Kalaci e777daad22 Do not cache all the distributed table metadata during CitusTableTypeIdList()
The CitusTableTypeIdList() function iterates over all the entries of pg_dist_partition
and loads all the metadata into the cache. This can be quite memory intensive,
especially when there are lots of distributed tables.

When partitioned tables are used, it is common to have many distributed tables
given that each partition also becomes a distributed table.

CitusTableTypeIdList() is used on every CREATE TABLE .. PARTITION OF .. command
as well. That means that any time a partition is created, Citus loads all the
metadata into the cache. Note that Citus typically only loads the accessed table's
metadata into the cache.

(cherry picked from commit 7accbff3f6)

 Conflicts:
	src/test/regress/bin/normalize.sed
2020-12-01 11:07:06 +03:00
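
The key idea as a standalone sketch: the table type can be decided from the two
chars already present in each pg_dist_partition tuple, so no per-table metadata
cache entry has to be built. The character constants below stand in for Citus'
DISTRIBUTE_BY_* and REPLICATION_MODEL_* macros and are illustrative:

#include <stdbool.h>

#define DISTRIBUTE_BY_HASH 'h'
#define DISTRIBUTE_BY_NONE 'n'
#define REPLICATION_MODEL_2PC 't'

/* Classify a pg_dist_partition row without touching the metadata cache. */
static bool
IsReferenceTableRow(char partitionMethod, char replicationModel)
{
	return partitionMethod == DISTRIBUTE_BY_NONE &&
		   replicationModel == REPLICATION_MODEL_2PC;
}

static bool
IsHashDistributedRow(char partitionMethod)
{
	return partitionMethod == DISTRIBUTE_BY_HASH;
}
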
Onur Tirtir 4e373fadd8 Update CHANGELOG for 9.5.0
(cherry picked from commit 52a5ab0751)
2020-11-11 16:02:17 +03:00
Hanefi Önaldı 35703d5e61 Bump Citus to 9.5.0 2020-11-09 13:16:05 +03:00
16 changed files with 567 additions and 37 deletions

View File

@ -1,3 +1,107 @@
### citus v9.5.1 (December 1, 2020) ###
* Enables PostgreSQL's parallel queries on EXPLAIN ANALYZE
* Fixes a bug that could cause excessive memory consumption when a partition is
created
* Fixes a bug that triggers subplan executions unnecessarily with cursors
* Fixes a segfault in connection management due to invalid connection hash
entries
### citus v9.5.0 (November 10, 2020) ###
* Adds support for PostgreSQL 13
* Removes the task-tracker executor
* Introduces citus local tables
* Introduces `undistribute_table` UDF to convert tables back to postgres tables
* Adds support for `EXPLAIN (ANALYZE) EXECUTE` and `EXPLAIN EXECUTE`
* Adds support for `EXPLAIN (ANALYZE, WAL)` for PG13
* Sorts the output of `EXPLAIN (ANALYZE)` by execution duration.
* Adds support for CREATE TABLE ... USING table_access_method
* Adds support for `WITH TIES` option in SELECT and INSERT SELECT queries
* Avoids taking multi-shard locks on workers
* Enforces `citus.max_shared_pool_size` config in COPY queries
* Enables custom aggregates with multiple parameters to be executed on workers
* Enforces `citus.max_intermediate_result_size` in local execution
* Improves cost estimation of INSERT SELECT plans
* Introduces delegation of procedures that read from reference tables
* Prevents pull-push execution for simple pushdownable subqueries
* Improves error message when creating a foreign key to a local table
* Makes `citus_prepare_pg_upgrade` idempotent by dropping transition tables
* Disallows `ON TRUE` outer joins with reference & distributed tables when
reference table is outer relation to avoid incorrect results
* Disallows field indirection in INSERT/UPDATE queries to avoid incorrect
results
* Disallows volatile functions in UPDATE subqueries to avoid incorrect results
* Fixes CREATE INDEX CONCURRENTLY crash with local execution
* Fixes `citus_finish_pg_upgrade` to drop all backup tables
* Fixes a bug that caused failures when a `RECURSIVE VIEW` joined a reference table
* Fixes DROP SEQUENCE failures when metadata syncing is enabled
* Fixes a bug that caused CREATE TABLE with CHECK constraint to fail
* Fixes a bug that could cause VACUUM to deadlock
* Fixes master_update_node failure when no background worker slots are available
* Fixes a bug that caused replica identity to not be propagated on shard repair
* Fixes a bug that could cause crashes after connection timeouts
* Fixes a bug that could cause crashes with certain compile flags
* Fixes a bug that could cause deadlocks on CREATE INDEX
* Fixes a bug with genetic query optimization in outer joins
* Fixes a crash when aggregating empty tables
* Fixes a crash with inserting domain constrained composite types
* Fixes a crash with multi-row & router INSERT's in local execution
* Fixes a possibility of doing temporary file cleanup more than once
* Fixes incorrect setting of join related fields
* Fixes memory issues around deparsing index commands
* Fixes reference table access tracking for sequential execution
* Fixes removal of a single node with only reference tables
* Fixes sending commands to coordinator when it is added as a worker
* Fixes write queries with const expressions and COLLATE in various places
* Fixes wrong cancellation message about distributed deadlock
### citus v9.4.2 (October 21, 2020) ###
* Fixes a bug that could lead to multiple maintenance daemons

configure vendored
View File

@ -1,6 +1,6 @@
#! /bin/sh
# Guess values for system-dependent variables and create Makefiles.
# Generated by GNU Autoconf 2.69 for Citus 9.5devel.
# Generated by GNU Autoconf 2.69 for Citus 9.5.1.
#
#
# Copyright (C) 1992-1996, 1998-2012 Free Software Foundation, Inc.
@ -579,8 +579,8 @@ MAKEFLAGS=
# Identity of this package.
PACKAGE_NAME='Citus'
PACKAGE_TARNAME='citus'
PACKAGE_VERSION='9.5devel'
PACKAGE_STRING='Citus 9.5devel'
PACKAGE_VERSION='9.5.1'
PACKAGE_STRING='Citus 9.5.1'
PACKAGE_BUGREPORT=''
PACKAGE_URL=''
@ -664,6 +664,7 @@ infodir
docdir
oldincludedir
includedir
runstatedir
localstatedir
sharedstatedir
sysconfdir
@ -740,6 +741,7 @@ datadir='${datarootdir}'
sysconfdir='${prefix}/etc'
sharedstatedir='${prefix}/com'
localstatedir='${prefix}/var'
runstatedir='${localstatedir}/run'
includedir='${prefix}/include'
oldincludedir='/usr/include'
docdir='${datarootdir}/doc/${PACKAGE_TARNAME}'
@ -992,6 +994,15 @@ do
| -silent | --silent | --silen | --sile | --sil)
silent=yes ;;
-runstatedir | --runstatedir | --runstatedi | --runstated \
| --runstate | --runstat | --runsta | --runst | --runs \
| --run | --ru | --r)
ac_prev=runstatedir ;;
-runstatedir=* | --runstatedir=* | --runstatedi=* | --runstated=* \
| --runstate=* | --runstat=* | --runsta=* | --runst=* | --runs=* \
| --run=* | --ru=* | --r=*)
runstatedir=$ac_optarg ;;
-sbindir | --sbindir | --sbindi | --sbind | --sbin | --sbi | --sb)
ac_prev=sbindir ;;
-sbindir=* | --sbindir=* | --sbindi=* | --sbind=* | --sbin=* \
@ -1129,7 +1140,7 @@ fi
for ac_var in exec_prefix prefix bindir sbindir libexecdir datarootdir \
datadir sysconfdir sharedstatedir localstatedir includedir \
oldincludedir docdir infodir htmldir dvidir pdfdir psdir \
libdir localedir mandir
libdir localedir mandir runstatedir
do
eval ac_val=\$$ac_var
# Remove trailing slashes.
@ -1242,7 +1253,7 @@ if test "$ac_init_help" = "long"; then
# Omit some internal or obsolete options to make the list less imposing.
# This message is too long to be a string in the A/UX 3.1 sh.
cat <<_ACEOF
\`configure' configures Citus 9.5devel to adapt to many kinds of systems.
\`configure' configures Citus 9.5.1 to adapt to many kinds of systems.
Usage: $0 [OPTION]... [VAR=VALUE]...
@ -1282,6 +1293,7 @@ Fine tuning of the installation directories:
--sysconfdir=DIR read-only single-machine data [PREFIX/etc]
--sharedstatedir=DIR modifiable architecture-independent data [PREFIX/com]
--localstatedir=DIR modifiable single-machine data [PREFIX/var]
--runstatedir=DIR modifiable per-process data [LOCALSTATEDIR/run]
--libdir=DIR object code libraries [EPREFIX/lib]
--includedir=DIR C header files [PREFIX/include]
--oldincludedir=DIR C header files for non-gcc [/usr/include]
@ -1303,7 +1315,7 @@ fi
if test -n "$ac_init_help"; then
case $ac_init_help in
short | recursive ) echo "Configuration of Citus 9.5devel:";;
short | recursive ) echo "Configuration of Citus 9.5.1:";;
esac
cat <<\_ACEOF
@ -1403,7 +1415,7 @@ fi
test -n "$ac_init_help" && exit $ac_status
if $ac_init_version; then
cat <<\_ACEOF
Citus configure 9.5devel
Citus configure 9.5.1
generated by GNU Autoconf 2.69
Copyright (C) 2012 Free Software Foundation, Inc.
@ -1886,7 +1898,7 @@ cat >config.log <<_ACEOF
This file contains any messages produced by compilers while
running configure, to aid debugging if configure makes a mistake.
It was created by Citus $as_me 9.5devel, which was
It was created by Citus $as_me 9.5.1, which was
generated by GNU Autoconf 2.69. Invocation command line was
$ $0 $@
@ -5055,7 +5067,7 @@ cat >>$CONFIG_STATUS <<\_ACEOF || ac_write_fail=1
# report actual input values of CONFIG_FILES etc. instead of their
# values after options handling.
ac_log="
This file was extended by Citus $as_me 9.5devel, which was
This file was extended by Citus $as_me 9.5.1, which was
generated by GNU Autoconf 2.69. Invocation command line was
CONFIG_FILES = $CONFIG_FILES
@ -5117,7 +5129,7 @@ _ACEOF
cat >>$CONFIG_STATUS <<_ACEOF || ac_write_fail=1
ac_cs_config="`$as_echo "$ac_configure_args" | sed 's/^ //; s/[\\""\`\$]/\\\\&/g'`"
ac_cs_version="\\
Citus config.status 9.5devel
Citus config.status 9.5.1
configured by $0, generated by GNU Autoconf 2.69,
with options \\"\$ac_cs_config\\"

View File

@ -5,7 +5,7 @@
# everyone needing autoconf installed, the resulting files are checked
# into the SCM.
AC_INIT([Citus], [9.5devel])
AC_INIT([Citus], [9.5.1])
AC_COPYRIGHT([Copyright (c) Citus Data, Inc.])
# we'll need sed and awk for some of the version commands

View File

@ -161,6 +161,12 @@ AfterXactConnectionHandling(bool isCommit)
hash_seq_init(&status, ConnectionHash);
while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != 0)
{
if (!entry->isValid)
{
/* skip invalid connection hash entries */
continue;
}
AfterXactHostConnectionHandling(entry, isCommit);
/*
@ -289,11 +295,24 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
*/
ConnectionHashEntry *entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
if (!found)
if (!found || !entry->isValid)
{
/*
* We are just building hash entry or previously it was left in an
* invalid state as we couldn't allocate memory for it.
* So initialize entry->connections list here.
*/
entry->isValid = false;
entry->connections = MemoryContextAlloc(ConnectionContext,
sizeof(dlist_head));
dlist_init(entry->connections);
/*
* If MemoryContextAlloc errors out -e.g. during an OOM-, entry->connections
* stays as NULL. So entry->isValid should be set to true right after we
* initialize entry->connections properly.
*/
entry->isValid = true;
}
/* if desired, check whether there's a usable connection */
@ -449,6 +468,12 @@ CloseAllConnectionsAfterTransaction(void)
hash_seq_init(&status, ConnectionHash);
while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != 0)
{
if (!entry->isValid)
{
/* skip invalid connection hash entries */
continue;
}
dlist_iter iter;
dlist_head *connections = entry->connections;
@ -483,7 +508,7 @@ ConnectionAvailableToNode(char *hostName, int nodePort, const char *userName,
ConnectionHashEntry *entry =
(ConnectionHashEntry *) hash_search(ConnectionHash, &key, HASH_FIND, &found);
if (!found)
if (!found || !entry->isValid)
{
return false;
}
@ -509,6 +534,12 @@ CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort)
hash_seq_init(&status, ConnectionHash);
while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != 0)
{
if (!entry->isValid)
{
/* skip invalid connection hash entries */
continue;
}
dlist_iter iter;
if (strcmp(entry->key.hostname, nodeName) != 0 || entry->key.port != nodePort)
@ -584,6 +615,12 @@ ShutdownAllConnections(void)
hash_seq_init(&status, ConnectionHash);
while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != NULL)
{
if (!entry->isValid)
{
/* skip invalid connection hash entries */
continue;
}
dlist_iter iter;
dlist_foreach(iter, entry->connections)
@ -1194,6 +1231,12 @@ FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry)
static void
AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit)
{
if (!entry || !entry->isValid)
{
/* callers only pass valid hash entries but let's be on the safe side */
ereport(ERROR, (errmsg("connection hash entry is NULL or invalid")));
}
dlist_mutable_iter iter;
int cachedConnectionCount = 0;

View File

@ -660,6 +660,16 @@ static void SetAttributeInputMetadata(DistributedExecution *execution,
void
AdaptiveExecutorPreExecutorRun(CitusScanState *scanState)
{
if (scanState->finishedPreScan)
{
/*
* Cursors (and hence RETURN QUERY syntax in pl/pgsql functions)
* may trigger AdaptiveExecutorPreExecutorRun() on every fetch
* operation. Though, we should only execute PreScan once.
*/
return;
}
DistributedPlan *distributedPlan = scanState->distributedPlan;
/*
@ -670,6 +680,8 @@ AdaptiveExecutorPreExecutorRun(CitusScanState *scanState)
LockPartitionsForDistributedPlan(distributedPlan);
ExecuteSubPlans(distributedPlan);
scanState->finishedPreScan = true;
}

View File

@ -558,6 +558,9 @@ AdaptiveExecutorCreateScan(CustomScan *scan)
scanState->customScanState.methods = &AdaptiveExecutorCustomExecMethods;
scanState->PreExecScan = &CitusPreExecScan;
scanState->finishedPreScan = false;
scanState->finishedRemoteScan = false;
return (Node *) scanState;
}
@ -578,6 +581,9 @@ NonPushableInsertSelectCreateScan(CustomScan *scan)
scanState->customScanState.methods =
&NonPushableInsertSelectCustomExecMethods;
scanState->finishedPreScan = false;
scanState->finishedRemoteScan = false;
return (Node *) scanState;
}

View File

@ -254,8 +254,8 @@ static void InvalidateCitusTableCacheEntrySlot(CitusTableCacheEntrySlot *cacheSl
static void InvalidateDistTableCache(void);
static void InvalidateDistObjectCache(void);
static void InitializeTableCacheEntry(int64 shardId);
static bool IsCitusTableTypeInternal(CitusTableCacheEntry *tableEntry, CitusTableType
tableType);
static bool IsCitusTableTypeInternal(char partitionMethod, char replicationModel,
CitusTableType tableType);
static bool RefreshTableCacheEntryIfInvalid(ShardIdCacheEntry *shardEntry);
@ -309,7 +309,7 @@ IsCitusTableType(Oid relationId, CitusTableType tableType)
{
return false;
}
return IsCitusTableTypeInternal(tableEntry, tableType);
return IsCitusTableTypeCacheEntry(tableEntry, tableType);
}
@ -320,7 +320,8 @@ IsCitusTableType(Oid relationId, CitusTableType tableType)
bool
IsCitusTableTypeCacheEntry(CitusTableCacheEntry *tableEntry, CitusTableType tableType)
{
return IsCitusTableTypeInternal(tableEntry, tableType);
return IsCitusTableTypeInternal(tableEntry->partitionMethod,
tableEntry->replicationModel, tableType);
}
@ -329,47 +330,48 @@ IsCitusTableTypeCacheEntry(CitusTableCacheEntry *tableEntry, CitusTableType tabl
* the given table type group. For definition of table types, see CitusTableType.
*/
static bool
IsCitusTableTypeInternal(CitusTableCacheEntry *tableEntry, CitusTableType tableType)
IsCitusTableTypeInternal(char partitionMethod, char replicationModel,
CitusTableType tableType)
{
switch (tableType)
{
case HASH_DISTRIBUTED:
{
return tableEntry->partitionMethod == DISTRIBUTE_BY_HASH;
return partitionMethod == DISTRIBUTE_BY_HASH;
}
case APPEND_DISTRIBUTED:
{
return tableEntry->partitionMethod == DISTRIBUTE_BY_APPEND;
return partitionMethod == DISTRIBUTE_BY_APPEND;
}
case RANGE_DISTRIBUTED:
{
return tableEntry->partitionMethod == DISTRIBUTE_BY_RANGE;
return partitionMethod == DISTRIBUTE_BY_RANGE;
}
case DISTRIBUTED_TABLE:
{
return tableEntry->partitionMethod == DISTRIBUTE_BY_HASH ||
tableEntry->partitionMethod == DISTRIBUTE_BY_RANGE ||
tableEntry->partitionMethod == DISTRIBUTE_BY_APPEND;
return partitionMethod == DISTRIBUTE_BY_HASH ||
partitionMethod == DISTRIBUTE_BY_RANGE ||
partitionMethod == DISTRIBUTE_BY_APPEND;
}
case REFERENCE_TABLE:
{
return tableEntry->partitionMethod == DISTRIBUTE_BY_NONE &&
tableEntry->replicationModel == REPLICATION_MODEL_2PC;
return partitionMethod == DISTRIBUTE_BY_NONE &&
replicationModel == REPLICATION_MODEL_2PC;
}
case CITUS_LOCAL_TABLE:
{
return tableEntry->partitionMethod == DISTRIBUTE_BY_NONE &&
tableEntry->replicationModel != REPLICATION_MODEL_2PC;
return partitionMethod == DISTRIBUTE_BY_NONE &&
replicationModel != REPLICATION_MODEL_2PC;
}
case CITUS_TABLE_WITH_NO_DIST_KEY:
{
return tableEntry->partitionMethod == DISTRIBUTE_BY_NONE;
return partitionMethod == DISTRIBUTE_BY_NONE;
}
case ANY_CITUS_TABLE_TYPE:
@ -3706,12 +3708,25 @@ CitusTableTypeIdList(CitusTableType citusTableType)
while (HeapTupleIsValid(heapTuple))
{
bool isNull = false;
Datum relationIdDatum = heap_getattr(heapTuple,
Anum_pg_dist_partition_logicalrelid,
tupleDescriptor, &isNull);
Oid relationId = DatumGetObjectId(relationIdDatum);
if (IsCitusTableType(relationId, citusTableType))
Datum partMethodDatum =
heap_getattr(heapTuple, Anum_pg_dist_partition_partmethod,
tupleDescriptor, &isNull);
Datum replicationModelDatum =
heap_getattr(heapTuple, Anum_pg_dist_partition_repmodel,
tupleDescriptor, &isNull);
Oid partitionMethod = DatumGetChar(partMethodDatum);
Oid replicationModel = DatumGetChar(replicationModelDatum);
if (IsCitusTableTypeInternal(partitionMethod, replicationModel, citusTableType))
{
Datum relationIdDatum = heap_getattr(heapTuple,
Anum_pg_dist_partition_logicalrelid,
tupleDescriptor, &isNull);
Oid relationId = DatumGetObjectId(relationIdDatum);
relationIdList = lappend_oid(relationIdList, relationId);
}

View File

@ -1063,7 +1063,7 @@ worker_save_query_explain_analyze(PG_FUNCTION_ARGS)
INSTR_TIME_SET_CURRENT(planStart);
PlannedStmt *plan = pg_plan_query_compat(query, NULL, 0, NULL);
PlannedStmt *plan = pg_plan_query_compat(query, NULL, CURSOR_OPT_PARALLEL_OK, NULL);
INSTR_TIME_SET_CURRENT(planDuration);
INSTR_TIME_SUBTRACT(planDuration, planStart);

View File

@ -20,6 +20,7 @@ typedef struct CitusScanState
CustomScanState customScanState; /* underlying custom scan node */
/* function that gets called before postgres starts its execution */
bool finishedPreScan; /* flag to check if the pre scan is finished */
void (*PreExecScan)(struct CitusScanState *scanState);
DistributedPlan *distributedPlan; /* distributed execution plan */

View File

@ -178,6 +178,9 @@ typedef struct ConnectionHashEntry
{
ConnectionHashKey key;
dlist_head *connections;
/* connections list is valid or not */
bool isValid;
} ConnectionHashEntry;
/* hash entry for cached connection parameters */

View File

@ -181,3 +181,6 @@ s/wrong data type: [0-9]+, expected [0-9]+/wrong data type: XXXX, expected XXXX/
# Errors with relation OID does not exist
s/relation with OID [0-9]+ does not exist/relation with OID XXXX does not exist/g
# ignore DEBUG1 messages that Postgres generates
/^DEBUG: rehashing catalog cache id [0-9]+$/d

View File

@ -0,0 +1,218 @@
CREATE SCHEMA cursors;
SET search_path TO cursors;
CREATE TABLE distributed_table (key int, value text);
SELECT create_distributed_table('distributed_table', 'key');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- load some data, but not very small amounts because RETURN QUERY in plpgsql
-- hard codes the cursor fetch to 50 rows on PG 12, though they might increase
-- it sometime in the future, so be mindful
INSERT INTO distributed_table SELECT i % 10, i::text FROM generate_series(0, 1000) i;
CREATE OR REPLACE FUNCTION simple_cursor_on_dist_table(cursor_name refcursor) RETURNS refcursor AS '
BEGIN
OPEN $1 FOR SELECT DISTINCT key FROM distributed_table ORDER BY 1;
RETURN $1;
END;
' LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION cursor_with_intermediate_result_on_dist_table(cursor_name refcursor) RETURNS refcursor AS '
BEGIN
OPEN $1 FOR
WITH cte_1 AS (SELECT * FROM distributed_table OFFSET 0)
SELECT DISTINCT key FROM distributed_table WHERE value in (SELECT value FROM cte_1) ORDER BY 1;
RETURN $1;
END;
' LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION cursor_with_intermediate_result_on_dist_table_with_param(cursor_name refcursor, filter text) RETURNS refcursor AS '
BEGIN
OPEN $1 FOR
WITH cte_1 AS (SELECT * FROM distributed_table WHERE value < $2 OFFSET 0)
SELECT DISTINCT key FROM distributed_table WHERE value in (SELECT value FROM cte_1) ORDER BY 1;
RETURN $1;
END;
' LANGUAGE plpgsql;
-- pretty basic query with cursors
-- Citus should plan/execute once and pull
-- the results to coordinator, then serve it
-- from the coordinator
BEGIN;
SELECT simple_cursor_on_dist_table('cursor_1');
simple_cursor_on_dist_table
---------------------------------------------------------------------
cursor_1
(1 row)
SET LOCAL citus.log_intermediate_results TO ON;
SET LOCAL client_min_messages TO DEBUG1;
FETCH 5 IN cursor_1;
key
---------------------------------------------------------------------
0
1
2
3
4
(5 rows)
FETCH 50 IN cursor_1;
key
---------------------------------------------------------------------
5
6
7
8
9
(5 rows)
FETCH ALL IN cursor_1;
key
---------------------------------------------------------------------
(0 rows)
COMMIT;
BEGIN;
SELECT cursor_with_intermediate_result_on_dist_table('cursor_1');
cursor_with_intermediate_result_on_dist_table
---------------------------------------------------------------------
cursor_1
(1 row)
-- multiple FETCH commands should not trigger re-running the subplans
SET LOCAL citus.log_intermediate_results TO ON;
SET LOCAL client_min_messages TO DEBUG1;
FETCH 5 IN cursor_1;
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
key
---------------------------------------------------------------------
0
1
2
3
4
(5 rows)
FETCH 1 IN cursor_1;
key
---------------------------------------------------------------------
5
(1 row)
FETCH ALL IN cursor_1;
key
---------------------------------------------------------------------
6
7
8
9
(4 rows)
FETCH 5 IN cursor_1;
key
---------------------------------------------------------------------
(0 rows)
COMMIT;
BEGIN;
SELECT cursor_with_intermediate_result_on_dist_table_with_param('cursor_1', '600');
cursor_with_intermediate_result_on_dist_table_with_param
---------------------------------------------------------------------
cursor_1
(1 row)
-- multiple FETCH commands should not trigger re-running the subplans
-- also test with parameters
SET LOCAL citus.log_intermediate_results TO ON;
SET LOCAL client_min_messages TO DEBUG1;
FETCH 1 IN cursor_1;
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
key
---------------------------------------------------------------------
0
(1 row)
FETCH 1 IN cursor_1;
key
---------------------------------------------------------------------
1
(1 row)
FETCH 1 IN cursor_1;
key
---------------------------------------------------------------------
2
(1 row)
FETCH 1 IN cursor_1;
key
---------------------------------------------------------------------
3
(1 row)
FETCH 1 IN cursor_1;
key
---------------------------------------------------------------------
4
(1 row)
FETCH 1 IN cursor_1;
key
---------------------------------------------------------------------
5
(1 row)
FETCH ALL IN cursor_1;
key
---------------------------------------------------------------------
6
7
8
9
(4 rows)
COMMIT;
CREATE OR REPLACE FUNCTION value_counter() RETURNS TABLE(counter text) LANGUAGE PLPGSQL AS $function$
BEGIN
return query
WITH cte AS
(SELECT dt.value
FROM distributed_table dt
WHERE dt.value in
(SELECT value
FROM distributed_table p
GROUP BY p.value
HAVING count(*) > 0))
SELECT * FROM cte;
END;
$function$ ;
SET citus.log_intermediate_results TO ON;
SET client_min_messages TO DEBUG1;
\set VERBOSITY terse
SELECT count(*) from (SELECT value_counter()) as foo;
DEBUG: CTE cte is going to be inlined via distributed planning
DEBUG: generating subplan XXX_1 for subquery SELECT value FROM cursors.distributed_table p GROUP BY value HAVING (count(*) OPERATOR(pg_catalog.>) 0)
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT value FROM (SELECT dt.value FROM cursors.distributed_table dt WHERE (dt.value OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(value text)))) cte
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
count
---------------------------------------------------------------------
1001
(1 row)
BEGIN;
SELECT count(*) from (SELECT value_counter()) as foo;
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
count
---------------------------------------------------------------------
1001
(1 row)
COMMIT;
-- suppress NOTICEs
SET client_min_messages TO ERROR;
DROP SCHEMA cursors CASCADE;

View File

@ -1216,6 +1216,7 @@ ON CONFLICT(c1, c2, c3, c4, c5, c6)
DO UPDATE SET
cardinality = enriched.cardinality + excluded.cardinality,
sum = enriched.sum + excluded.sum;
DEBUG: rehashing catalog cache id 14 for pg_opclass; 17 tups, 8 buckets at character 224
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: performing repartitioned INSERT ... SELECT

View File

@ -458,7 +458,7 @@ DROP TABLE prev_objects, extension_diff;
SHOW citus.version;
citus.version
---------------------------------------------------------------------
9.5devel
9.5.1
(1 row)
-- ensure no objects were created outside pg_catalog

View File

@ -108,7 +108,7 @@ test: ch_bench_subquery_repartition
test: multi_agg_type_conversion multi_count_type_conversion
test: multi_partition_pruning single_hash_repartition_join
test: multi_join_pruning multi_hash_pruning intermediate_result_pruning
test: multi_null_minmax_value_pruning
test: multi_null_minmax_value_pruning cursors
test: multi_query_directory_cleanup
test: multi_task_assignment_policy multi_cross_shard
test: multi_utility_statements

View File

@ -0,0 +1,112 @@
CREATE SCHEMA cursors;
SET search_path TO cursors;
CREATE TABLE distributed_table (key int, value text);
SELECT create_distributed_table('distributed_table', 'key');
-- load some data, but not very small amounts because RETURN QUERY in plpgsql
-- hard codes the cursor fetch to 50 rows on PG 12, though they might increase
-- it sometime in the future, so be mindful
INSERT INTO distributed_table SELECT i % 10, i::text FROM generate_series(0, 1000) i;
CREATE OR REPLACE FUNCTION simple_cursor_on_dist_table(cursor_name refcursor) RETURNS refcursor AS '
BEGIN
OPEN $1 FOR SELECT DISTINCT key FROM distributed_table ORDER BY 1;
RETURN $1;
END;
' LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION cursor_with_intermediate_result_on_dist_table(cursor_name refcursor) RETURNS refcursor AS '
BEGIN
OPEN $1 FOR
WITH cte_1 AS (SELECT * FROM distributed_table OFFSET 0)
SELECT DISTINCT key FROM distributed_table WHERE value in (SELECT value FROM cte_1) ORDER BY 1;
RETURN $1;
END;
' LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION cursor_with_intermediate_result_on_dist_table_with_param(cursor_name refcursor, filter text) RETURNS refcursor AS '
BEGIN
OPEN $1 FOR
WITH cte_1 AS (SELECT * FROM distributed_table WHERE value < $2 OFFSET 0)
SELECT DISTINCT key FROM distributed_table WHERE value in (SELECT value FROM cte_1) ORDER BY 1;
RETURN $1;
END;
' LANGUAGE plpgsql;
-- pretty basic query with cursors
-- Citus should plan/execute once and pull
-- the results to coordinator, then serve it
-- from the coordinator
BEGIN;
SELECT simple_cursor_on_dist_table('cursor_1');
SET LOCAL citus.log_intermediate_results TO ON;
SET LOCAL client_min_messages TO DEBUG1;
FETCH 5 IN cursor_1;
FETCH 50 IN cursor_1;
FETCH ALL IN cursor_1;
COMMIT;
BEGIN;
SELECT cursor_with_intermediate_result_on_dist_table('cursor_1');
-- multiple FETCH commands should not trigger re-running the subplans
SET LOCAL citus.log_intermediate_results TO ON;
SET LOCAL client_min_messages TO DEBUG1;
FETCH 5 IN cursor_1;
FETCH 1 IN cursor_1;
FETCH ALL IN cursor_1;
FETCH 5 IN cursor_1;
COMMIT;
BEGIN;
SELECT cursor_with_intermediate_result_on_dist_table_with_param('cursor_1', '600');
-- multiple FETCH commands should not trigger re-running the subplans
-- also test with parameters
SET LOCAL citus.log_intermediate_results TO ON;
SET LOCAL client_min_messages TO DEBUG1;
FETCH 1 IN cursor_1;
FETCH 1 IN cursor_1;
FETCH 1 IN cursor_1;
FETCH 1 IN cursor_1;
FETCH 1 IN cursor_1;
FETCH 1 IN cursor_1;
FETCH ALL IN cursor_1;
COMMIT;
CREATE OR REPLACE FUNCTION value_counter() RETURNS TABLE(counter text) LANGUAGE PLPGSQL AS $function$
BEGIN
return query
WITH cte AS
(SELECT dt.value
FROM distributed_table dt
WHERE dt.value in
(SELECT value
FROM distributed_table p
GROUP BY p.value
HAVING count(*) > 0))
SELECT * FROM cte;
END;
$function$ ;
SET citus.log_intermediate_results TO ON;
SET client_min_messages TO DEBUG1;
\set VERBOSITY terse
SELECT count(*) from (SELECT value_counter()) as foo;
BEGIN;
SELECT count(*) from (SELECT value_counter()) as foo;
COMMIT;
-- suppress NOTICEs
SET client_min_messages TO ERROR;
DROP SCHEMA cursors CASCADE;