mirror of https://github.com/citusdata/citus.git
Compare commits
8 Commits
Author | SHA1 | Date |
---|---|---|
|
4a1255fd10 | |
|
67004edf43 | |
|
789d441296 | |
|
6d06e9760a | |
|
74f0dd0c25 | |
|
e777daad22 | |
|
4e373fadd8 | |
|
35703d5e61 |
104
CHANGELOG.md
104
CHANGELOG.md
|
@ -1,3 +1,107 @@
|
|||
### citus v9.5.1 (December 1, 2020) ###
|
||||
|
||||
* Enables PostgreSQL's parallel queries on EXPLAIN ANALYZE
|
||||
|
||||
* Fixes a bug that could cause excessive memory consumption when a partition is
|
||||
created
|
||||
|
||||
* Fixes a bug that triggers subplan executions unnecessarily with cursors
|
||||
|
||||
* Fixes a segfault in connection management due to invalid connection hash
|
||||
entries
|
||||
|
||||
### citus v9.5.0 (November 10, 2020) ###
|
||||
|
||||
* Adds support for PostgreSQL 13
|
||||
|
||||
* Removes the task-tracker executor
|
||||
|
||||
* Introduces citus local tables
|
||||
|
||||
* Introduces `undistribute_table` UDF to convert tables back to postgres tables
|
||||
|
||||
* Adds support for `EXPLAIN (ANALYZE) EXECUTE` and `EXPLAIN EXECUTE`
|
||||
|
||||
* Adds support for `EXPLAIN (ANALYZE, WAL)` for PG13
|
||||
|
||||
* Sorts the output of `EXPLAIN (ANALYZE)` by execution duration.
|
||||
|
||||
* Adds support for CREATE TABLE ... USING table_access_method
|
||||
|
||||
* Adds support for `WITH TIES` option in SELECT and INSERT SELECT queries
|
||||
|
||||
* Avoids taking multi-shard locks on workers
|
||||
|
||||
* Enforces `citus.max_shared_pool_size` config in COPY queries
|
||||
|
||||
* Enables custom aggregates with multiple parameters to be executed on workers
|
||||
|
||||
* Enforces `citus.max_intermediate_result_size` in local execution
|
||||
|
||||
* Improves cost estimation of INSERT SELECT plans
|
||||
|
||||
* Introduces delegation of procedures that read from reference tables
|
||||
|
||||
* Prevents pull-push execution for simple pushdownable subqueries
|
||||
|
||||
* Improves error message when creating a foreign key to a local table
|
||||
|
||||
* Makes `citus_prepare_pg_upgrade` idempotent by dropping transition tables
|
||||
|
||||
* Disallows `ON TRUE` outer joins with reference & distributed tables when
|
||||
reference table is outer relation to avoid incorrect results
|
||||
|
||||
* Disallows field indirection in INSERT/UPDATE queries to avoid incorrect
|
||||
results
|
||||
|
||||
* Disallows volatile functions in UPDATE subqueries to avoid incorrect results
|
||||
|
||||
* Fixes CREATE INDEX CONCURRENTLY crash with local execution
|
||||
|
||||
* Fixes `citus_finish_pg_upgrade` to drop all backup tables
|
||||
|
||||
* Fixes a bug that cause failures when `RECURSIVE VIEW` joined reference table
|
||||
|
||||
* Fixes DROP SEQUENCE failures when metadata syncing is enabled
|
||||
|
||||
* Fixes a bug that caused CREATE TABLE with CHECK constraint to fail
|
||||
|
||||
* Fixes a bug that could cause VACUUM to deadlock
|
||||
|
||||
* Fixes master_update_node failure when no background worker slots are available
|
||||
|
||||
* Fixes a bug that caused replica identity to not be propagated on shard repair
|
||||
|
||||
* Fixes a bug that could cause crashes after connection timeouts
|
||||
|
||||
* Fixes a bug that could cause crashes with certain compile flags
|
||||
|
||||
* Fixes a bug that could cause deadlocks on CREATE INDEX
|
||||
|
||||
* Fixes a bug with genetic query optimization in outer joins
|
||||
|
||||
* Fixes a crash when aggregating empty tables
|
||||
|
||||
* Fixes a crash with inserting domain constrained composite types
|
||||
|
||||
* Fixes a crash with multi-row & router INSERT's in local execution
|
||||
|
||||
* Fixes a possibility of doing temporary file cleanup more than once
|
||||
|
||||
* Fixes incorrect setting of join related fields
|
||||
|
||||
* Fixes memory issues around deparsing index commands
|
||||
|
||||
* Fixes reference table access tracking for sequential execution
|
||||
|
||||
* Fixes removal of a single node with only reference tables
|
||||
|
||||
* Fixes sending commands to coordinator when it is added as a worker
|
||||
|
||||
* Fixes write queries with const expressions and COLLATE in various places
|
||||
|
||||
* Fixes wrong cancellation message about distributed deadlock
|
||||
|
||||
### citus v9.4.2 (October 21, 2020) ###
|
||||
|
||||
* Fixes a bug that could lead to multiple maintenance daemons
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
#! /bin/sh
|
||||
# Guess values for system-dependent variables and create Makefiles.
|
||||
# Generated by GNU Autoconf 2.69 for Citus 9.5devel.
|
||||
# Generated by GNU Autoconf 2.69 for Citus 9.5.1.
|
||||
#
|
||||
#
|
||||
# Copyright (C) 1992-1996, 1998-2012 Free Software Foundation, Inc.
|
||||
|
@ -579,8 +579,8 @@ MAKEFLAGS=
|
|||
# Identity of this package.
|
||||
PACKAGE_NAME='Citus'
|
||||
PACKAGE_TARNAME='citus'
|
||||
PACKAGE_VERSION='9.5devel'
|
||||
PACKAGE_STRING='Citus 9.5devel'
|
||||
PACKAGE_VERSION='9.5.1'
|
||||
PACKAGE_STRING='Citus 9.5.1'
|
||||
PACKAGE_BUGREPORT=''
|
||||
PACKAGE_URL=''
|
||||
|
||||
|
@ -664,6 +664,7 @@ infodir
|
|||
docdir
|
||||
oldincludedir
|
||||
includedir
|
||||
runstatedir
|
||||
localstatedir
|
||||
sharedstatedir
|
||||
sysconfdir
|
||||
|
@ -740,6 +741,7 @@ datadir='${datarootdir}'
|
|||
sysconfdir='${prefix}/etc'
|
||||
sharedstatedir='${prefix}/com'
|
||||
localstatedir='${prefix}/var'
|
||||
runstatedir='${localstatedir}/run'
|
||||
includedir='${prefix}/include'
|
||||
oldincludedir='/usr/include'
|
||||
docdir='${datarootdir}/doc/${PACKAGE_TARNAME}'
|
||||
|
@ -992,6 +994,15 @@ do
|
|||
| -silent | --silent | --silen | --sile | --sil)
|
||||
silent=yes ;;
|
||||
|
||||
-runstatedir | --runstatedir | --runstatedi | --runstated \
|
||||
| --runstate | --runstat | --runsta | --runst | --runs \
|
||||
| --run | --ru | --r)
|
||||
ac_prev=runstatedir ;;
|
||||
-runstatedir=* | --runstatedir=* | --runstatedi=* | --runstated=* \
|
||||
| --runstate=* | --runstat=* | --runsta=* | --runst=* | --runs=* \
|
||||
| --run=* | --ru=* | --r=*)
|
||||
runstatedir=$ac_optarg ;;
|
||||
|
||||
-sbindir | --sbindir | --sbindi | --sbind | --sbin | --sbi | --sb)
|
||||
ac_prev=sbindir ;;
|
||||
-sbindir=* | --sbindir=* | --sbindi=* | --sbind=* | --sbin=* \
|
||||
|
@ -1129,7 +1140,7 @@ fi
|
|||
for ac_var in exec_prefix prefix bindir sbindir libexecdir datarootdir \
|
||||
datadir sysconfdir sharedstatedir localstatedir includedir \
|
||||
oldincludedir docdir infodir htmldir dvidir pdfdir psdir \
|
||||
libdir localedir mandir
|
||||
libdir localedir mandir runstatedir
|
||||
do
|
||||
eval ac_val=\$$ac_var
|
||||
# Remove trailing slashes.
|
||||
|
@ -1242,7 +1253,7 @@ if test "$ac_init_help" = "long"; then
|
|||
# Omit some internal or obsolete options to make the list less imposing.
|
||||
# This message is too long to be a string in the A/UX 3.1 sh.
|
||||
cat <<_ACEOF
|
||||
\`configure' configures Citus 9.5devel to adapt to many kinds of systems.
|
||||
\`configure' configures Citus 9.5.1 to adapt to many kinds of systems.
|
||||
|
||||
Usage: $0 [OPTION]... [VAR=VALUE]...
|
||||
|
||||
|
@ -1282,6 +1293,7 @@ Fine tuning of the installation directories:
|
|||
--sysconfdir=DIR read-only single-machine data [PREFIX/etc]
|
||||
--sharedstatedir=DIR modifiable architecture-independent data [PREFIX/com]
|
||||
--localstatedir=DIR modifiable single-machine data [PREFIX/var]
|
||||
--runstatedir=DIR modifiable per-process data [LOCALSTATEDIR/run]
|
||||
--libdir=DIR object code libraries [EPREFIX/lib]
|
||||
--includedir=DIR C header files [PREFIX/include]
|
||||
--oldincludedir=DIR C header files for non-gcc [/usr/include]
|
||||
|
@ -1303,7 +1315,7 @@ fi
|
|||
|
||||
if test -n "$ac_init_help"; then
|
||||
case $ac_init_help in
|
||||
short | recursive ) echo "Configuration of Citus 9.5devel:";;
|
||||
short | recursive ) echo "Configuration of Citus 9.5.1:";;
|
||||
esac
|
||||
cat <<\_ACEOF
|
||||
|
||||
|
@ -1403,7 +1415,7 @@ fi
|
|||
test -n "$ac_init_help" && exit $ac_status
|
||||
if $ac_init_version; then
|
||||
cat <<\_ACEOF
|
||||
Citus configure 9.5devel
|
||||
Citus configure 9.5.1
|
||||
generated by GNU Autoconf 2.69
|
||||
|
||||
Copyright (C) 2012 Free Software Foundation, Inc.
|
||||
|
@ -1886,7 +1898,7 @@ cat >config.log <<_ACEOF
|
|||
This file contains any messages produced by compilers while
|
||||
running configure, to aid debugging if configure makes a mistake.
|
||||
|
||||
It was created by Citus $as_me 9.5devel, which was
|
||||
It was created by Citus $as_me 9.5.1, which was
|
||||
generated by GNU Autoconf 2.69. Invocation command line was
|
||||
|
||||
$ $0 $@
|
||||
|
@ -5055,7 +5067,7 @@ cat >>$CONFIG_STATUS <<\_ACEOF || ac_write_fail=1
|
|||
# report actual input values of CONFIG_FILES etc. instead of their
|
||||
# values after options handling.
|
||||
ac_log="
|
||||
This file was extended by Citus $as_me 9.5devel, which was
|
||||
This file was extended by Citus $as_me 9.5.1, which was
|
||||
generated by GNU Autoconf 2.69. Invocation command line was
|
||||
|
||||
CONFIG_FILES = $CONFIG_FILES
|
||||
|
@ -5117,7 +5129,7 @@ _ACEOF
|
|||
cat >>$CONFIG_STATUS <<_ACEOF || ac_write_fail=1
|
||||
ac_cs_config="`$as_echo "$ac_configure_args" | sed 's/^ //; s/[\\""\`\$]/\\\\&/g'`"
|
||||
ac_cs_version="\\
|
||||
Citus config.status 9.5devel
|
||||
Citus config.status 9.5.1
|
||||
configured by $0, generated by GNU Autoconf 2.69,
|
||||
with options \\"\$ac_cs_config\\"
|
||||
|
||||
|
|
|
@ -5,7 +5,7 @@
|
|||
# everyone needing autoconf installed, the resulting files are checked
|
||||
# into the SCM.
|
||||
|
||||
AC_INIT([Citus], [9.5devel])
|
||||
AC_INIT([Citus], [9.5.1])
|
||||
AC_COPYRIGHT([Copyright (c) Citus Data, Inc.])
|
||||
|
||||
# we'll need sed and awk for some of the version commands
|
||||
|
|
|
@ -161,6 +161,12 @@ AfterXactConnectionHandling(bool isCommit)
|
|||
hash_seq_init(&status, ConnectionHash);
|
||||
while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != 0)
|
||||
{
|
||||
if (!entry->isValid)
|
||||
{
|
||||
/* skip invalid connection hash entries */
|
||||
continue;
|
||||
}
|
||||
|
||||
AfterXactHostConnectionHandling(entry, isCommit);
|
||||
|
||||
/*
|
||||
|
@ -289,11 +295,24 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
|
|||
*/
|
||||
|
||||
ConnectionHashEntry *entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
|
||||
if (!found)
|
||||
if (!found || !entry->isValid)
|
||||
{
|
||||
/*
|
||||
* We are just building hash entry or previously it was left in an
|
||||
* invalid state as we couldn't allocate memory for it.
|
||||
* So initialize entry->connections list here.
|
||||
*/
|
||||
entry->isValid = false;
|
||||
entry->connections = MemoryContextAlloc(ConnectionContext,
|
||||
sizeof(dlist_head));
|
||||
dlist_init(entry->connections);
|
||||
|
||||
/*
|
||||
* If MemoryContextAlloc errors out -e.g. during an OOM-, entry->connections
|
||||
* stays as NULL. So entry->isValid should be set to true right after we
|
||||
* initialize entry->connections properly.
|
||||
*/
|
||||
entry->isValid = true;
|
||||
}
|
||||
|
||||
/* if desired, check whether there's a usable connection */
|
||||
|
@ -449,6 +468,12 @@ CloseAllConnectionsAfterTransaction(void)
|
|||
hash_seq_init(&status, ConnectionHash);
|
||||
while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != 0)
|
||||
{
|
||||
if (!entry->isValid)
|
||||
{
|
||||
/* skip invalid connection hash entries */
|
||||
continue;
|
||||
}
|
||||
|
||||
dlist_iter iter;
|
||||
|
||||
dlist_head *connections = entry->connections;
|
||||
|
@ -483,7 +508,7 @@ ConnectionAvailableToNode(char *hostName, int nodePort, const char *userName,
|
|||
ConnectionHashEntry *entry =
|
||||
(ConnectionHashEntry *) hash_search(ConnectionHash, &key, HASH_FIND, &found);
|
||||
|
||||
if (!found)
|
||||
if (!found || !entry->isValid)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
@ -509,6 +534,12 @@ CloseNodeConnectionsAfterTransaction(char *nodeName, int nodePort)
|
|||
hash_seq_init(&status, ConnectionHash);
|
||||
while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != 0)
|
||||
{
|
||||
if (!entry->isValid)
|
||||
{
|
||||
/* skip invalid connection hash entries */
|
||||
continue;
|
||||
}
|
||||
|
||||
dlist_iter iter;
|
||||
|
||||
if (strcmp(entry->key.hostname, nodeName) != 0 || entry->key.port != nodePort)
|
||||
|
@ -584,6 +615,12 @@ ShutdownAllConnections(void)
|
|||
hash_seq_init(&status, ConnectionHash);
|
||||
while ((entry = (ConnectionHashEntry *) hash_seq_search(&status)) != NULL)
|
||||
{
|
||||
if (!entry->isValid)
|
||||
{
|
||||
/* skip invalid connection hash entries */
|
||||
continue;
|
||||
}
|
||||
|
||||
dlist_iter iter;
|
||||
|
||||
dlist_foreach(iter, entry->connections)
|
||||
|
@ -1194,6 +1231,12 @@ FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry)
|
|||
static void
|
||||
AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit)
|
||||
{
|
||||
if (!entry || !entry->isValid)
|
||||
{
|
||||
/* callers only pass valid hash entries but let's be on the safe side */
|
||||
ereport(ERROR, (errmsg("connection hash entry is NULL or invalid")));
|
||||
}
|
||||
|
||||
dlist_mutable_iter iter;
|
||||
int cachedConnectionCount = 0;
|
||||
|
||||
|
|
|
@ -660,6 +660,16 @@ static void SetAttributeInputMetadata(DistributedExecution *execution,
|
|||
void
|
||||
AdaptiveExecutorPreExecutorRun(CitusScanState *scanState)
|
||||
{
|
||||
if (scanState->finishedPreScan)
|
||||
{
|
||||
/*
|
||||
* Cursors (and hence RETURN QUERY syntax in pl/pgsql functions)
|
||||
* may trigger AdaptiveExecutorPreExecutorRun() on every fetch
|
||||
* operation. Though, we should only execute PreScan once.
|
||||
*/
|
||||
return;
|
||||
}
|
||||
|
||||
DistributedPlan *distributedPlan = scanState->distributedPlan;
|
||||
|
||||
/*
|
||||
|
@ -670,6 +680,8 @@ AdaptiveExecutorPreExecutorRun(CitusScanState *scanState)
|
|||
LockPartitionsForDistributedPlan(distributedPlan);
|
||||
|
||||
ExecuteSubPlans(distributedPlan);
|
||||
|
||||
scanState->finishedPreScan = true;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -558,6 +558,9 @@ AdaptiveExecutorCreateScan(CustomScan *scan)
|
|||
scanState->customScanState.methods = &AdaptiveExecutorCustomExecMethods;
|
||||
scanState->PreExecScan = &CitusPreExecScan;
|
||||
|
||||
scanState->finishedPreScan = false;
|
||||
scanState->finishedRemoteScan = false;
|
||||
|
||||
return (Node *) scanState;
|
||||
}
|
||||
|
||||
|
@ -578,6 +581,9 @@ NonPushableInsertSelectCreateScan(CustomScan *scan)
|
|||
scanState->customScanState.methods =
|
||||
&NonPushableInsertSelectCustomExecMethods;
|
||||
|
||||
scanState->finishedPreScan = false;
|
||||
scanState->finishedRemoteScan = false;
|
||||
|
||||
return (Node *) scanState;
|
||||
}
|
||||
|
||||
|
|
|
@ -254,8 +254,8 @@ static void InvalidateCitusTableCacheEntrySlot(CitusTableCacheEntrySlot *cacheSl
|
|||
static void InvalidateDistTableCache(void);
|
||||
static void InvalidateDistObjectCache(void);
|
||||
static void InitializeTableCacheEntry(int64 shardId);
|
||||
static bool IsCitusTableTypeInternal(CitusTableCacheEntry *tableEntry, CitusTableType
|
||||
tableType);
|
||||
static bool IsCitusTableTypeInternal(char partitionMethod, char replicationModel,
|
||||
CitusTableType tableType);
|
||||
static bool RefreshTableCacheEntryIfInvalid(ShardIdCacheEntry *shardEntry);
|
||||
|
||||
|
||||
|
@ -309,7 +309,7 @@ IsCitusTableType(Oid relationId, CitusTableType tableType)
|
|||
{
|
||||
return false;
|
||||
}
|
||||
return IsCitusTableTypeInternal(tableEntry, tableType);
|
||||
return IsCitusTableTypeCacheEntry(tableEntry, tableType);
|
||||
}
|
||||
|
||||
|
||||
|
@ -320,7 +320,8 @@ IsCitusTableType(Oid relationId, CitusTableType tableType)
|
|||
bool
|
||||
IsCitusTableTypeCacheEntry(CitusTableCacheEntry *tableEntry, CitusTableType tableType)
|
||||
{
|
||||
return IsCitusTableTypeInternal(tableEntry, tableType);
|
||||
return IsCitusTableTypeInternal(tableEntry->partitionMethod,
|
||||
tableEntry->replicationModel, tableType);
|
||||
}
|
||||
|
||||
|
||||
|
@ -329,47 +330,48 @@ IsCitusTableTypeCacheEntry(CitusTableCacheEntry *tableEntry, CitusTableType tabl
|
|||
* the given table type group. For definition of table types, see CitusTableType.
|
||||
*/
|
||||
static bool
|
||||
IsCitusTableTypeInternal(CitusTableCacheEntry *tableEntry, CitusTableType tableType)
|
||||
IsCitusTableTypeInternal(char partitionMethod, char replicationModel,
|
||||
CitusTableType tableType)
|
||||
{
|
||||
switch (tableType)
|
||||
{
|
||||
case HASH_DISTRIBUTED:
|
||||
{
|
||||
return tableEntry->partitionMethod == DISTRIBUTE_BY_HASH;
|
||||
return partitionMethod == DISTRIBUTE_BY_HASH;
|
||||
}
|
||||
|
||||
case APPEND_DISTRIBUTED:
|
||||
{
|
||||
return tableEntry->partitionMethod == DISTRIBUTE_BY_APPEND;
|
||||
return partitionMethod == DISTRIBUTE_BY_APPEND;
|
||||
}
|
||||
|
||||
case RANGE_DISTRIBUTED:
|
||||
{
|
||||
return tableEntry->partitionMethod == DISTRIBUTE_BY_RANGE;
|
||||
return partitionMethod == DISTRIBUTE_BY_RANGE;
|
||||
}
|
||||
|
||||
case DISTRIBUTED_TABLE:
|
||||
{
|
||||
return tableEntry->partitionMethod == DISTRIBUTE_BY_HASH ||
|
||||
tableEntry->partitionMethod == DISTRIBUTE_BY_RANGE ||
|
||||
tableEntry->partitionMethod == DISTRIBUTE_BY_APPEND;
|
||||
return partitionMethod == DISTRIBUTE_BY_HASH ||
|
||||
partitionMethod == DISTRIBUTE_BY_RANGE ||
|
||||
partitionMethod == DISTRIBUTE_BY_APPEND;
|
||||
}
|
||||
|
||||
case REFERENCE_TABLE:
|
||||
{
|
||||
return tableEntry->partitionMethod == DISTRIBUTE_BY_NONE &&
|
||||
tableEntry->replicationModel == REPLICATION_MODEL_2PC;
|
||||
return partitionMethod == DISTRIBUTE_BY_NONE &&
|
||||
replicationModel == REPLICATION_MODEL_2PC;
|
||||
}
|
||||
|
||||
case CITUS_LOCAL_TABLE:
|
||||
{
|
||||
return tableEntry->partitionMethod == DISTRIBUTE_BY_NONE &&
|
||||
tableEntry->replicationModel != REPLICATION_MODEL_2PC;
|
||||
return partitionMethod == DISTRIBUTE_BY_NONE &&
|
||||
replicationModel != REPLICATION_MODEL_2PC;
|
||||
}
|
||||
|
||||
case CITUS_TABLE_WITH_NO_DIST_KEY:
|
||||
{
|
||||
return tableEntry->partitionMethod == DISTRIBUTE_BY_NONE;
|
||||
return partitionMethod == DISTRIBUTE_BY_NONE;
|
||||
}
|
||||
|
||||
case ANY_CITUS_TABLE_TYPE:
|
||||
|
@ -3706,12 +3708,25 @@ CitusTableTypeIdList(CitusTableType citusTableType)
|
|||
while (HeapTupleIsValid(heapTuple))
|
||||
{
|
||||
bool isNull = false;
|
||||
Datum relationIdDatum = heap_getattr(heapTuple,
|
||||
Anum_pg_dist_partition_logicalrelid,
|
||||
tupleDescriptor, &isNull);
|
||||
Oid relationId = DatumGetObjectId(relationIdDatum);
|
||||
if (IsCitusTableType(relationId, citusTableType))
|
||||
|
||||
Datum partMethodDatum =
|
||||
heap_getattr(heapTuple, Anum_pg_dist_partition_partmethod,
|
||||
tupleDescriptor, &isNull);
|
||||
Datum replicationModelDatum =
|
||||
heap_getattr(heapTuple, Anum_pg_dist_partition_repmodel,
|
||||
tupleDescriptor, &isNull);
|
||||
|
||||
Oid partitionMethod = DatumGetChar(partMethodDatum);
|
||||
Oid replicationModel = DatumGetChar(replicationModelDatum);
|
||||
|
||||
if (IsCitusTableTypeInternal(partitionMethod, replicationModel, citusTableType))
|
||||
{
|
||||
Datum relationIdDatum = heap_getattr(heapTuple,
|
||||
Anum_pg_dist_partition_logicalrelid,
|
||||
tupleDescriptor, &isNull);
|
||||
|
||||
Oid relationId = DatumGetObjectId(relationIdDatum);
|
||||
|
||||
relationIdList = lappend_oid(relationIdList, relationId);
|
||||
}
|
||||
|
||||
|
|
|
@ -1063,7 +1063,7 @@ worker_save_query_explain_analyze(PG_FUNCTION_ARGS)
|
|||
|
||||
INSTR_TIME_SET_CURRENT(planStart);
|
||||
|
||||
PlannedStmt *plan = pg_plan_query_compat(query, NULL, 0, NULL);
|
||||
PlannedStmt *plan = pg_plan_query_compat(query, NULL, CURSOR_OPT_PARALLEL_OK, NULL);
|
||||
|
||||
INSTR_TIME_SET_CURRENT(planDuration);
|
||||
INSTR_TIME_SUBTRACT(planDuration, planStart);
|
||||
|
|
|
@ -20,6 +20,7 @@ typedef struct CitusScanState
|
|||
CustomScanState customScanState; /* underlying custom scan node */
|
||||
|
||||
/* function that gets called before postgres starts its execution */
|
||||
bool finishedPreScan; /* flag to check if the pre scan is finished */
|
||||
void (*PreExecScan)(struct CitusScanState *scanState);
|
||||
|
||||
DistributedPlan *distributedPlan; /* distributed execution plan */
|
||||
|
|
|
@ -178,6 +178,9 @@ typedef struct ConnectionHashEntry
|
|||
{
|
||||
ConnectionHashKey key;
|
||||
dlist_head *connections;
|
||||
|
||||
/* connections list is valid or not */
|
||||
bool isValid;
|
||||
} ConnectionHashEntry;
|
||||
|
||||
/* hash entry for cached connection parameters */
|
||||
|
|
|
@ -181,3 +181,6 @@ s/wrong data type: [0-9]+, expected [0-9]+/wrong data type: XXXX, expected XXXX/
|
|||
|
||||
# Errors with relation OID does not exist
|
||||
s/relation with OID [0-9]+ does not exist/relation with OID XXXX does not exist/g
|
||||
|
||||
# ignore DEBUG1 messages that Postgres generates
|
||||
/^DEBUG: rehashing catalog cache id [0-9]+$/d
|
||||
|
|
|
@ -0,0 +1,218 @@
|
|||
CREATE SCHEMA cursors;
|
||||
SET search_path TO cursors;
|
||||
CREATE TABLE distributed_table (key int, value text);
|
||||
SELECT create_distributed_table('distributed_table', 'key');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- load some data, but not very small amounts because RETURN QUERY in plpgsql
|
||||
-- hard codes the cursor fetch to 50 rows on PG 12, though they might increase
|
||||
-- it sometime in the future, so be mindful
|
||||
INSERT INTO distributed_table SELECT i % 10, i::text FROM generate_series(0, 1000) i;
|
||||
CREATE OR REPLACE FUNCTION simple_cursor_on_dist_table(cursor_name refcursor) RETURNS refcursor AS '
|
||||
BEGIN
|
||||
OPEN $1 FOR SELECT DISTINCT key FROM distributed_table ORDER BY 1;
|
||||
RETURN $1;
|
||||
END;
|
||||
' LANGUAGE plpgsql;
|
||||
CREATE OR REPLACE FUNCTION cursor_with_intermediate_result_on_dist_table(cursor_name refcursor) RETURNS refcursor AS '
|
||||
BEGIN
|
||||
OPEN $1 FOR
|
||||
WITH cte_1 AS (SELECT * FROM distributed_table OFFSET 0)
|
||||
SELECT DISTINCT key FROM distributed_table WHERE value in (SELECT value FROM cte_1) ORDER BY 1;
|
||||
RETURN $1;
|
||||
END;
|
||||
' LANGUAGE plpgsql;
|
||||
CREATE OR REPLACE FUNCTION cursor_with_intermediate_result_on_dist_table_with_param(cursor_name refcursor, filter text) RETURNS refcursor AS '
|
||||
BEGIN
|
||||
OPEN $1 FOR
|
||||
WITH cte_1 AS (SELECT * FROM distributed_table WHERE value < $2 OFFSET 0)
|
||||
SELECT DISTINCT key FROM distributed_table WHERE value in (SELECT value FROM cte_1) ORDER BY 1;
|
||||
RETURN $1;
|
||||
END;
|
||||
' LANGUAGE plpgsql;
|
||||
-- pretty basic query with cursors
|
||||
-- Citus should plan/execute once and pull
|
||||
-- the results to coordinator, then serve it
|
||||
-- from the coordinator
|
||||
BEGIN;
|
||||
SELECT simple_cursor_on_dist_table('cursor_1');
|
||||
simple_cursor_on_dist_table
|
||||
---------------------------------------------------------------------
|
||||
cursor_1
|
||||
(1 row)
|
||||
|
||||
SET LOCAL citus.log_intermediate_results TO ON;
|
||||
SET LOCAL client_min_messages TO DEBUG1;
|
||||
FETCH 5 IN cursor_1;
|
||||
key
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
1
|
||||
2
|
||||
3
|
||||
4
|
||||
(5 rows)
|
||||
|
||||
FETCH 50 IN cursor_1;
|
||||
key
|
||||
---------------------------------------------------------------------
|
||||
5
|
||||
6
|
||||
7
|
||||
8
|
||||
9
|
||||
(5 rows)
|
||||
|
||||
FETCH ALL IN cursor_1;
|
||||
key
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
COMMIT;
|
||||
BEGIN;
|
||||
SELECT cursor_with_intermediate_result_on_dist_table('cursor_1');
|
||||
cursor_with_intermediate_result_on_dist_table
|
||||
---------------------------------------------------------------------
|
||||
cursor_1
|
||||
(1 row)
|
||||
|
||||
-- multiple FETCH commands should not trigger re-running the subplans
|
||||
SET LOCAL citus.log_intermediate_results TO ON;
|
||||
SET LOCAL client_min_messages TO DEBUG1;
|
||||
FETCH 5 IN cursor_1;
|
||||
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||
key
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
1
|
||||
2
|
||||
3
|
||||
4
|
||||
(5 rows)
|
||||
|
||||
FETCH 1 IN cursor_1;
|
||||
key
|
||||
---------------------------------------------------------------------
|
||||
5
|
||||
(1 row)
|
||||
|
||||
FETCH ALL IN cursor_1;
|
||||
key
|
||||
---------------------------------------------------------------------
|
||||
6
|
||||
7
|
||||
8
|
||||
9
|
||||
(4 rows)
|
||||
|
||||
FETCH 5 IN cursor_1;
|
||||
key
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
COMMIT;
|
||||
BEGIN;
|
||||
SELECT cursor_with_intermediate_result_on_dist_table_with_param('cursor_1', '600');
|
||||
cursor_with_intermediate_result_on_dist_table_with_param
|
||||
---------------------------------------------------------------------
|
||||
cursor_1
|
||||
(1 row)
|
||||
|
||||
-- multiple FETCH commands should not trigger re-running the subplans
|
||||
-- also test with parameters
|
||||
SET LOCAL citus.log_intermediate_results TO ON;
|
||||
SET LOCAL client_min_messages TO DEBUG1;
|
||||
FETCH 1 IN cursor_1;
|
||||
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||
key
|
||||
---------------------------------------------------------------------
|
||||
0
|
||||
(1 row)
|
||||
|
||||
FETCH 1 IN cursor_1;
|
||||
key
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
(1 row)
|
||||
|
||||
FETCH 1 IN cursor_1;
|
||||
key
|
||||
---------------------------------------------------------------------
|
||||
2
|
||||
(1 row)
|
||||
|
||||
FETCH 1 IN cursor_1;
|
||||
key
|
||||
---------------------------------------------------------------------
|
||||
3
|
||||
(1 row)
|
||||
|
||||
FETCH 1 IN cursor_1;
|
||||
key
|
||||
---------------------------------------------------------------------
|
||||
4
|
||||
(1 row)
|
||||
|
||||
FETCH 1 IN cursor_1;
|
||||
key
|
||||
---------------------------------------------------------------------
|
||||
5
|
||||
(1 row)
|
||||
|
||||
FETCH ALL IN cursor_1;
|
||||
key
|
||||
---------------------------------------------------------------------
|
||||
6
|
||||
7
|
||||
8
|
||||
9
|
||||
(4 rows)
|
||||
|
||||
COMMIT;
|
||||
CREATE OR REPLACE FUNCTION value_counter() RETURNS TABLE(counter text) LANGUAGE PLPGSQL AS $function$
|
||||
BEGIN
|
||||
return query
|
||||
WITH cte AS
|
||||
(SELECT dt.value
|
||||
FROM distributed_table dt
|
||||
WHERE dt.value in
|
||||
(SELECT value
|
||||
FROM distributed_table p
|
||||
GROUP BY p.value
|
||||
HAVING count(*) > 0))
|
||||
|
||||
SELECT * FROM cte;
|
||||
END;
|
||||
$function$ ;
|
||||
SET citus.log_intermediate_results TO ON;
|
||||
SET client_min_messages TO DEBUG1;
|
||||
\set VERBOSITY terse
|
||||
SELECT count(*) from (SELECT value_counter()) as foo;
|
||||
DEBUG: CTE cte is going to be inlined via distributed planning
|
||||
DEBUG: generating subplan XXX_1 for subquery SELECT value FROM cursors.distributed_table p GROUP BY value HAVING (count(*) OPERATOR(pg_catalog.>) 0)
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT value FROM (SELECT dt.value FROM cursors.distributed_table dt WHERE (dt.value OPERATOR(pg_catalog.=) ANY (SELECT intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(value text)))) cte
|
||||
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
1001
|
||||
(1 row)
|
||||
|
||||
BEGIN;
|
||||
SELECT count(*) from (SELECT value_counter()) as foo;
|
||||
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||
DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
1001
|
||||
(1 row)
|
||||
|
||||
COMMIT;
|
||||
-- suppress NOTICEs
|
||||
SET client_min_messages TO ERROR;
|
||||
DROP SCHEMA cursors CASCADE;
|
|
@ -1216,6 +1216,7 @@ ON CONFLICT(c1, c2, c3, c4, c5, c6)
|
|||
DO UPDATE SET
|
||||
cardinality = enriched.cardinality + excluded.cardinality,
|
||||
sum = enriched.sum + excluded.sum;
|
||||
DEBUG: rehashing catalog cache id 14 for pg_opclass; 17 tups, 8 buckets at character 224
|
||||
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
|
||||
DEBUG: Router planner cannot handle multi-shard select queries
|
||||
DEBUG: performing repartitioned INSERT ... SELECT
|
||||
|
|
|
@ -458,7 +458,7 @@ DROP TABLE prev_objects, extension_diff;
|
|||
SHOW citus.version;
|
||||
citus.version
|
||||
---------------------------------------------------------------------
|
||||
9.5devel
|
||||
9.5.1
|
||||
(1 row)
|
||||
|
||||
-- ensure no objects were created outside pg_catalog
|
||||
|
|
|
@ -108,7 +108,7 @@ test: ch_bench_subquery_repartition
|
|||
test: multi_agg_type_conversion multi_count_type_conversion
|
||||
test: multi_partition_pruning single_hash_repartition_join
|
||||
test: multi_join_pruning multi_hash_pruning intermediate_result_pruning
|
||||
test: multi_null_minmax_value_pruning
|
||||
test: multi_null_minmax_value_pruning cursors
|
||||
test: multi_query_directory_cleanup
|
||||
test: multi_task_assignment_policy multi_cross_shard
|
||||
test: multi_utility_statements
|
||||
|
|
|
@ -0,0 +1,112 @@
|
|||
CREATE SCHEMA cursors;
|
||||
SET search_path TO cursors;
|
||||
|
||||
CREATE TABLE distributed_table (key int, value text);
|
||||
SELECT create_distributed_table('distributed_table', 'key');
|
||||
|
||||
|
||||
-- load some data, but not very small amounts because RETURN QUERY in plpgsql
|
||||
-- hard codes the cursor fetch to 50 rows on PG 12, though they might increase
|
||||
-- it sometime in the future, so be mindful
|
||||
INSERT INTO distributed_table SELECT i % 10, i::text FROM generate_series(0, 1000) i;
|
||||
|
||||
|
||||
CREATE OR REPLACE FUNCTION simple_cursor_on_dist_table(cursor_name refcursor) RETURNS refcursor AS '
|
||||
BEGIN
|
||||
OPEN $1 FOR SELECT DISTINCT key FROM distributed_table ORDER BY 1;
|
||||
RETURN $1;
|
||||
END;
|
||||
' LANGUAGE plpgsql;
|
||||
|
||||
|
||||
CREATE OR REPLACE FUNCTION cursor_with_intermediate_result_on_dist_table(cursor_name refcursor) RETURNS refcursor AS '
|
||||
BEGIN
|
||||
OPEN $1 FOR
|
||||
WITH cte_1 AS (SELECT * FROM distributed_table OFFSET 0)
|
||||
SELECT DISTINCT key FROM distributed_table WHERE value in (SELECT value FROM cte_1) ORDER BY 1;
|
||||
RETURN $1;
|
||||
END;
|
||||
' LANGUAGE plpgsql;
|
||||
|
||||
|
||||
CREATE OR REPLACE FUNCTION cursor_with_intermediate_result_on_dist_table_with_param(cursor_name refcursor, filter text) RETURNS refcursor AS '
|
||||
BEGIN
|
||||
OPEN $1 FOR
|
||||
WITH cte_1 AS (SELECT * FROM distributed_table WHERE value < $2 OFFSET 0)
|
||||
SELECT DISTINCT key FROM distributed_table WHERE value in (SELECT value FROM cte_1) ORDER BY 1;
|
||||
RETURN $1;
|
||||
END;
|
||||
' LANGUAGE plpgsql;
|
||||
|
||||
|
||||
-- pretty basic query with cursors
|
||||
-- Citus should plan/execute once and pull
|
||||
-- the results to coordinator, then serve it
|
||||
-- from the coordinator
|
||||
BEGIN;
|
||||
SELECT simple_cursor_on_dist_table('cursor_1');
|
||||
SET LOCAL citus.log_intermediate_results TO ON;
|
||||
SET LOCAL client_min_messages TO DEBUG1;
|
||||
FETCH 5 IN cursor_1;
|
||||
FETCH 50 IN cursor_1;
|
||||
FETCH ALL IN cursor_1;
|
||||
COMMIT;
|
||||
|
||||
|
||||
BEGIN;
|
||||
SELECT cursor_with_intermediate_result_on_dist_table('cursor_1');
|
||||
|
||||
-- multiple FETCH commands should not trigger re-running the subplans
|
||||
SET LOCAL citus.log_intermediate_results TO ON;
|
||||
SET LOCAL client_min_messages TO DEBUG1;
|
||||
FETCH 5 IN cursor_1;
|
||||
FETCH 1 IN cursor_1;
|
||||
FETCH ALL IN cursor_1;
|
||||
FETCH 5 IN cursor_1;
|
||||
COMMIT;
|
||||
|
||||
|
||||
BEGIN;
|
||||
SELECT cursor_with_intermediate_result_on_dist_table_with_param('cursor_1', '600');
|
||||
|
||||
-- multiple FETCH commands should not trigger re-running the subplans
|
||||
-- also test with parameters
|
||||
SET LOCAL citus.log_intermediate_results TO ON;
|
||||
SET LOCAL client_min_messages TO DEBUG1;
|
||||
FETCH 1 IN cursor_1;
|
||||
FETCH 1 IN cursor_1;
|
||||
FETCH 1 IN cursor_1;
|
||||
FETCH 1 IN cursor_1;
|
||||
FETCH 1 IN cursor_1;
|
||||
FETCH 1 IN cursor_1;
|
||||
FETCH ALL IN cursor_1;
|
||||
|
||||
COMMIT;
|
||||
|
||||
CREATE OR REPLACE FUNCTION value_counter() RETURNS TABLE(counter text) LANGUAGE PLPGSQL AS $function$
|
||||
BEGIN
|
||||
return query
|
||||
WITH cte AS
|
||||
(SELECT dt.value
|
||||
FROM distributed_table dt
|
||||
WHERE dt.value in
|
||||
(SELECT value
|
||||
FROM distributed_table p
|
||||
GROUP BY p.value
|
||||
HAVING count(*) > 0))
|
||||
|
||||
SELECT * FROM cte;
|
||||
END;
|
||||
$function$ ;
|
||||
|
||||
SET citus.log_intermediate_results TO ON;
|
||||
SET client_min_messages TO DEBUG1;
|
||||
\set VERBOSITY terse
|
||||
SELECT count(*) from (SELECT value_counter()) as foo;
|
||||
BEGIN;
|
||||
SELECT count(*) from (SELECT value_counter()) as foo;
|
||||
COMMIT;
|
||||
|
||||
-- suppress NOTICEs
|
||||
SET client_min_messages TO ERROR;
|
||||
DROP SCHEMA cursors CASCADE;
|
Loading…
Reference in New Issue