Compare commits

...

16 Commits
main ... v9.0.2

Author SHA1 Message Date
Hanefi Onaldi b79743fd99
Bump version to 9.0.2 2020-03-06 16:12:16 +03:00
Hanefi Onaldi de6068b2c4
Update CHANGELOG for 9.0.2 2020-03-06 16:12:16 +03:00
Marco Slot 637c6b8b80 Fix create_distributed_function call in multi_mx_call test 2019-11-11 15:59:19 +01:00
Marco Slot 14848be5db Add tests for distributing functions with replication_model statement 2019-11-11 15:06:13 +01:00
Marco Slot 7fcbe92c59 Disallow distributed functions with distribution arguments unless replication_model is streaming 2019-11-11 14:52:57 +01:00
Marco Slot f110a9fbe7 Do not try to sync metadata on standby coordinator 2019-11-11 14:52:25 +01:00
Jelte Fennema 1c7e02280f Include fmgr.h, don't duplicate FunctionCallInfo typedef 2019-11-11 14:51:16 +01:00
Jelte Fennema 36248f9203 Better check for compiler flag capability
When adding a `-Wno-some-error` flag, instead of trying to check for
`-Wno-some-error` support this checks for `-Wsome-error` support.
This is done because currently there's the a problem when compiling
citus with `./configure CFLAGS=-Werror`. If there's a warning (which is
then converted to an error) in addition to this error, the output will contain
the following message with GCC 7:

```
cc1: error: unrecognized command line option ‘-Wno-gnu-variable-sized-type-not-at-end’ [-Werror]
```

This is because of the following behaviour of GCC in case of unknown warnings:

> When an unrecognized warning option is requested (e.g., -Wunknown-warning), GCC
> emits a diagnostic stating that the option is not recognized.  However, if the
> -Wno- form is used, the behavior is slightly different: no diagnostic is
> produced for -Wno-unknown-warning unless other diagnostics are being produced.
> This allows the use of new -Wno- options with old compilers, but if something
> goes wrong, the compiler warns that an unrecognized option is present.

By changing the check to `-Wsome-error`, the check will actually fail
when the warning is not supported. Instead of silently being ignored
when checking, but then coming up when another error happens.
2019-11-11 14:51:12 +01:00
Hanefi Onaldi a853a824a5
Bump citus to 9.0.1 2019-10-25 10:36:18 +03:00
Marco Slot 6204ebcee9
Fix run_command_on_colocated_placements tests
(cherry picked from commit b8c8fd4612)
2019-10-25 10:06:26 +03:00
Marco Slot 2f201a98bb
Revoke usage from the citus schema
(cherry picked from commit 04040e0a37)
2019-10-25 10:06:09 +03:00
Hanefi Onaldi 23c26e9cbf
Remove PG10 to PG11 upgrade test 2019-10-23 11:33:28 +03:00
Onder Kalaci f8417d184c Fix memory leak on ReceiveResults
It turns out that TupleDescGetAttInMetadata() allocates quite a lot
of memory. And, if the target list is long and there are too many rows
returning, the leak becomes appereant.

You can reproduce the issue wout the fix with the following commands:

```SQL

CREATE TABLE users_table (user_id int, time timestamp, value_1 int, value_2 int, value_3 float, value_4 bigint);
SELECT create_distributed_table('users_table', 'user_id');

insert into users_table SELECT i, now(), i, i, i, i FROM generate_series(0,99999)i;

-- load faster

-- 200,000
INSERT INTO users_table SELECT * FROM users_table;

-- 400,000
INSERT INTO users_table SELECT * FROM users_table;

-- 800,000
INSERT INTO users_table SELECT * FROM users_table;

-- 1,600,000
INSERT INTO users_table SELECT * FROM users_table;

-- 3,200,000
INSERT INTO users_table SELECT * FROM users_table;

-- 6,400,000
INSERT INTO users_table SELECT * FROM users_table;

-- 12,800,000
INSERT INTO users_table SELECT * FROM users_table;

-- making the target list entry wider speeds up the leak to show up
 select *,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,*,* FROM users_table ;

 ```
2019-10-22 17:37:19 +02:00
Onur Tirtir e760dd69ac Re-add runstatedir option to configure 2019-10-10 11:12:57 +03:00
Hanefi Onaldi 566283d202
Bump citus to 9.0.0
(cherry picked from commit 2a47cff7b7)
2019-10-09 14:01:56 +03:00
Hanefi Onaldi 9747ca40ae
Bump Citus version to 9.0.0 2019-10-07 21:00:01 +03:00
27 changed files with 431 additions and 180 deletions

View File

@ -140,17 +140,6 @@ jobs:
name: 'Install and Test (check-failure)'
command: 'chown -R circleci:circleci /home/circleci && install-and-test-ext check-failure'
test-10-11_check-pg-upgrade:
docker:
- image: 'citus/pgupgradetester:latest'
working_directory: /home/circleci/project
steps:
- attach_workspace:
at: .
- run:
name: 'Install and test postgres upgrade'
command: 'chown -R circleci:circleci /home/circleci && install-and-test-ext --target check-pg-upgrade --old-pg-version 10 --new-pg-version 11'
test-11-12_check-pg-upgrade:
docker:
- image: 'citus/pgupgradetester:latest'
@ -341,8 +330,6 @@ workflows:
- test-12_check-non-adaptive-isolation:
requires: [build]
- test-10-11_check-pg-upgrade:
requires: [build]
- test-11-12_check-pg-upgrade:
requires: [build]

View File

@ -1,3 +1,75 @@
### citus v9.0.2 (March 6, 2020) ###
* Fixes build errors on EL/OL 6 based distros
* Fixes a bug that caused maintenance daemon to fail on standby nodes
* Disallows distributed function creation when replication_model is `statement`
### citus v9.0.1 (October 25, 2019) ###
* Fixes a memory leak in the executor
* Revokes usage from the citus schema from public
### citus v9.0.0 (October 7, 2019) ###
* Adds support for PostgreSQL 12
* Adds UDFs to help with PostgreSQL upgrades
* Distributes types to worker nodes
* Introduces `create_distributed_function` UDF
* Introduces local query execution for Citus MX
* Implements infrastructure for routing `CALL` to MX workers
* Implements infrastructure for routing `SELECT function()` to MX workers
* Adds support for foreign key constraints between reference tables
* Adds a feature flag to turn off `CREATE TYPE` propagation
* Adds option `citus.single_shard_commit_protocol`
* Adds support for `EXPLAIN SUMMARY`
* Adds support for `GENERATE ALWAYS AS STORED`
* Adds support for `serial` and `smallserial` in MX mode
* Adds support for anon composite types on the target list in router queries
* Avoids race condition between `create_reference_table` & `master_add_node`
* Fixes a bug in schemas of distributed sequence definitions
* Fixes a bug that caused `run_command_on_colocated_placements` to fail
* Fixes a bug that leads to various issues when a connection is lost
* Fixes a schema leak on `CREATE INDEX` statement
* Fixes assert failure in bare `SELECT FROM reference table FOR UPDATE` in MX
* Makes `master_update_node` MX compatible
* Prevents `pg_dist_colocation` from multiple records for reference tables
* Prevents segfault in `worker_partition_protocol` edgecase
* Propagates `ALTER FUNCTION` statements for distributed functions
* Propagates `CREATE OR REPLACE FUNCTION` for distributed functions
* Propagates `REINDEX` on tables & indexes
* Provides a GUC to turn of the new dependency propagation functionality
* Uses 2PC in adaptive executor when dealing with replication factors above 1
### citus v8.3.2 (August 09, 2019) ###
* Fixes performance issues by skipping unnecessary relation access recordings

120
configure vendored
View File

@ -1,6 +1,6 @@
#! /bin/sh
# Guess values for system-dependent variables and create Makefiles.
# Generated by GNU Autoconf 2.69 for Citus 9.0devel.
# Generated by GNU Autoconf 2.69 for Citus 9.0.2.
#
#
# Copyright (C) 1992-1996, 1998-2012 Free Software Foundation, Inc.
@ -9,7 +9,7 @@
# This configure script is free software; the Free Software Foundation
# gives unlimited permission to copy, distribute and modify it.
#
# Copyright (c) 2012-2017, Citus Data, Inc.
# Copyright (c) Citus Data, Inc.
## -------------------- ##
## M4sh Initialization. ##
## -------------------- ##
@ -579,8 +579,8 @@ MAKEFLAGS=
# Identity of this package.
PACKAGE_NAME='Citus'
PACKAGE_TARNAME='citus'
PACKAGE_VERSION='9.0devel'
PACKAGE_STRING='Citus 9.0devel'
PACKAGE_VERSION='9.0.2'
PACKAGE_STRING='Citus 9.0.2'
PACKAGE_BUGREPORT=''
PACKAGE_URL=''
@ -662,7 +662,6 @@ infodir
docdir
oldincludedir
includedir
runstatedir
localstatedir
sharedstatedir
sysconfdir
@ -739,7 +738,6 @@ datadir='${datarootdir}'
sysconfdir='${prefix}/etc'
sharedstatedir='${prefix}/com'
localstatedir='${prefix}/var'
runstatedir='${localstatedir}/run'
includedir='${prefix}/include'
oldincludedir='/usr/include'
docdir='${datarootdir}/doc/${PACKAGE_TARNAME}'
@ -992,15 +990,6 @@ do
| -silent | --silent | --silen | --sile | --sil)
silent=yes ;;
-runstatedir | --runstatedir | --runstatedi | --runstated \
| --runstate | --runstat | --runsta | --runst | --runs \
| --run | --ru | --r)
ac_prev=runstatedir ;;
-runstatedir=* | --runstatedir=* | --runstatedi=* | --runstated=* \
| --runstate=* | --runstat=* | --runsta=* | --runst=* | --runs=* \
| --run=* | --ru=* | --r=*)
runstatedir=$ac_optarg ;;
-sbindir | --sbindir | --sbindi | --sbind | --sbin | --sbi | --sb)
ac_prev=sbindir ;;
-sbindir=* | --sbindir=* | --sbindi=* | --sbind=* | --sbin=* \
@ -1138,7 +1127,7 @@ fi
for ac_var in exec_prefix prefix bindir sbindir libexecdir datarootdir \
datadir sysconfdir sharedstatedir localstatedir includedir \
oldincludedir docdir infodir htmldir dvidir pdfdir psdir \
libdir localedir mandir runstatedir
libdir localedir mandir
do
eval ac_val=\$$ac_var
# Remove trailing slashes.
@ -1251,7 +1240,7 @@ if test "$ac_init_help" = "long"; then
# Omit some internal or obsolete options to make the list less imposing.
# This message is too long to be a string in the A/UX 3.1 sh.
cat <<_ACEOF
\`configure' configures Citus 9.0devel to adapt to many kinds of systems.
\`configure' configures Citus 9.0.2 to adapt to many kinds of systems.
Usage: $0 [OPTION]... [VAR=VALUE]...
@ -1291,7 +1280,6 @@ Fine tuning of the installation directories:
--sysconfdir=DIR read-only single-machine data [PREFIX/etc]
--sharedstatedir=DIR modifiable architecture-independent data [PREFIX/com]
--localstatedir=DIR modifiable single-machine data [PREFIX/var]
--runstatedir=DIR modifiable per-process data [LOCALSTATEDIR/run]
--libdir=DIR object code libraries [EPREFIX/lib]
--includedir=DIR C header files [PREFIX/include]
--oldincludedir=DIR C header files for non-gcc [/usr/include]
@ -1313,7 +1301,7 @@ fi
if test -n "$ac_init_help"; then
case $ac_init_help in
short | recursive ) echo "Configuration of Citus 9.0devel:";;
short | recursive ) echo "Configuration of Citus 9.0.2:";;
esac
cat <<\_ACEOF
@ -1413,14 +1401,14 @@ fi
test -n "$ac_init_help" && exit $ac_status
if $ac_init_version; then
cat <<\_ACEOF
Citus configure 9.0devel
Citus configure 9.0.2
generated by GNU Autoconf 2.69
Copyright (C) 2012 Free Software Foundation, Inc.
This configure script is free software; the Free Software Foundation
gives unlimited permission to copy, distribute and modify it.
Copyright (c) 2012-2017, Citus Data, Inc.
Copyright (c) Citus Data, Inc.
_ACEOF
exit
fi
@ -1896,7 +1884,7 @@ cat >config.log <<_ACEOF
This file contains any messages produced by compilers while
running configure, to aid debugging if configure makes a mistake.
It was created by Citus $as_me 9.0devel, which was
It was created by Citus $as_me 9.0.2, which was
generated by GNU Autoconf 2.69. Invocation command line was
$ $0 $@
@ -3639,7 +3627,11 @@ if ${citusac_cv_prog_cc_cflags__Wall+:} false; then :
$as_echo_n "(cached) " >&6
else
citusac_save_CFLAGS=$CFLAGS
CFLAGS="$citusac_save_CFLAGS -Wall"
flag=-Wall
case $flag in -Wno*)
flag=-W$(echo $flag | cut -c 6-)
esac
CFLAGS="$citusac_save_CFLAGS $flag"
ac_save_c_werror_flag=$ac_c_werror_flag
ac_c_werror_flag=yes
cat confdefs.h - <<_ACEOF >conftest.$ac_ext
@ -3674,7 +3666,11 @@ if ${citusac_cv_prog_cc_cflags__Wextra+:} false; then :
$as_echo_n "(cached) " >&6
else
citusac_save_CFLAGS=$CFLAGS
CFLAGS="$citusac_save_CFLAGS -Wextra"
flag=-Wextra
case $flag in -Wno*)
flag=-W$(echo $flag | cut -c 6-)
esac
CFLAGS="$citusac_save_CFLAGS $flag"
ac_save_c_werror_flag=$ac_c_werror_flag
ac_c_werror_flag=yes
cat confdefs.h - <<_ACEOF >conftest.$ac_ext
@ -3710,7 +3706,11 @@ if ${citusac_cv_prog_cc_cflags__Wno_unused_parameter+:} false; then :
$as_echo_n "(cached) " >&6
else
citusac_save_CFLAGS=$CFLAGS
CFLAGS="$citusac_save_CFLAGS -Wno-unused-parameter"
flag=-Wno-unused-parameter
case $flag in -Wno*)
flag=-W$(echo $flag | cut -c 6-)
esac
CFLAGS="$citusac_save_CFLAGS $flag"
ac_save_c_werror_flag=$ac_c_werror_flag
ac_c_werror_flag=yes
cat confdefs.h - <<_ACEOF >conftest.$ac_ext
@ -3745,7 +3745,11 @@ if ${citusac_cv_prog_cc_cflags__Wno_sign_compare+:} false; then :
$as_echo_n "(cached) " >&6
else
citusac_save_CFLAGS=$CFLAGS
CFLAGS="$citusac_save_CFLAGS -Wno-sign-compare"
flag=-Wno-sign-compare
case $flag in -Wno*)
flag=-W$(echo $flag | cut -c 6-)
esac
CFLAGS="$citusac_save_CFLAGS $flag"
ac_save_c_werror_flag=$ac_c_werror_flag
ac_c_werror_flag=yes
cat confdefs.h - <<_ACEOF >conftest.$ac_ext
@ -3780,7 +3784,11 @@ if ${citusac_cv_prog_cc_cflags__Wno_missing_field_initializers+:} false; then :
$as_echo_n "(cached) " >&6
else
citusac_save_CFLAGS=$CFLAGS
CFLAGS="$citusac_save_CFLAGS -Wno-missing-field-initializers"
flag=-Wno-missing-field-initializers
case $flag in -Wno*)
flag=-W$(echo $flag | cut -c 6-)
esac
CFLAGS="$citusac_save_CFLAGS $flag"
ac_save_c_werror_flag=$ac_c_werror_flag
ac_c_werror_flag=yes
cat confdefs.h - <<_ACEOF >conftest.$ac_ext
@ -3815,7 +3823,11 @@ if ${citusac_cv_prog_cc_cflags__Wno_clobbered+:} false; then :
$as_echo_n "(cached) " >&6
else
citusac_save_CFLAGS=$CFLAGS
CFLAGS="$citusac_save_CFLAGS -Wno-clobbered"
flag=-Wno-clobbered
case $flag in -Wno*)
flag=-W$(echo $flag | cut -c 6-)
esac
CFLAGS="$citusac_save_CFLAGS $flag"
ac_save_c_werror_flag=$ac_c_werror_flag
ac_c_werror_flag=yes
cat confdefs.h - <<_ACEOF >conftest.$ac_ext
@ -3850,7 +3862,11 @@ if ${citusac_cv_prog_cc_cflags__Wno_gnu_variable_sized_type_not_at_end+:} false;
$as_echo_n "(cached) " >&6
else
citusac_save_CFLAGS=$CFLAGS
CFLAGS="$citusac_save_CFLAGS -Wno-gnu-variable-sized-type-not-at-end"
flag=-Wno-gnu-variable-sized-type-not-at-end
case $flag in -Wno*)
flag=-W$(echo $flag | cut -c 6-)
esac
CFLAGS="$citusac_save_CFLAGS $flag"
ac_save_c_werror_flag=$ac_c_werror_flag
ac_c_werror_flag=yes
cat confdefs.h - <<_ACEOF >conftest.$ac_ext
@ -3886,7 +3902,11 @@ if ${citusac_cv_prog_cc_cflags__Wdeclaration_after_statement+:} false; then :
$as_echo_n "(cached) " >&6
else
citusac_save_CFLAGS=$CFLAGS
CFLAGS="$citusac_save_CFLAGS -Wdeclaration-after-statement"
flag=-Wdeclaration-after-statement
case $flag in -Wno*)
flag=-W$(echo $flag | cut -c 6-)
esac
CFLAGS="$citusac_save_CFLAGS $flag"
ac_save_c_werror_flag=$ac_c_werror_flag
ac_c_werror_flag=yes
cat confdefs.h - <<_ACEOF >conftest.$ac_ext
@ -3921,7 +3941,11 @@ if ${citusac_cv_prog_cc_cflags__Wendif_labels+:} false; then :
$as_echo_n "(cached) " >&6
else
citusac_save_CFLAGS=$CFLAGS
CFLAGS="$citusac_save_CFLAGS -Wendif-labels"
flag=-Wendif-labels
case $flag in -Wno*)
flag=-W$(echo $flag | cut -c 6-)
esac
CFLAGS="$citusac_save_CFLAGS $flag"
ac_save_c_werror_flag=$ac_c_werror_flag
ac_c_werror_flag=yes
cat confdefs.h - <<_ACEOF >conftest.$ac_ext
@ -3956,7 +3980,11 @@ if ${citusac_cv_prog_cc_cflags__Wmissing_format_attribute+:} false; then :
$as_echo_n "(cached) " >&6
else
citusac_save_CFLAGS=$CFLAGS
CFLAGS="$citusac_save_CFLAGS -Wmissing-format-attribute"
flag=-Wmissing-format-attribute
case $flag in -Wno*)
flag=-W$(echo $flag | cut -c 6-)
esac
CFLAGS="$citusac_save_CFLAGS $flag"
ac_save_c_werror_flag=$ac_c_werror_flag
ac_c_werror_flag=yes
cat confdefs.h - <<_ACEOF >conftest.$ac_ext
@ -3991,7 +4019,11 @@ if ${citusac_cv_prog_cc_cflags__Wmissing_declarations+:} false; then :
$as_echo_n "(cached) " >&6
else
citusac_save_CFLAGS=$CFLAGS
CFLAGS="$citusac_save_CFLAGS -Wmissing-declarations"
flag=-Wmissing-declarations
case $flag in -Wno*)
flag=-W$(echo $flag | cut -c 6-)
esac
CFLAGS="$citusac_save_CFLAGS $flag"
ac_save_c_werror_flag=$ac_c_werror_flag
ac_c_werror_flag=yes
cat confdefs.h - <<_ACEOF >conftest.$ac_ext
@ -4026,7 +4058,11 @@ if ${citusac_cv_prog_cc_cflags__Wmissing_prototypes+:} false; then :
$as_echo_n "(cached) " >&6
else
citusac_save_CFLAGS=$CFLAGS
CFLAGS="$citusac_save_CFLAGS -Wmissing-prototypes"
flag=-Wmissing-prototypes
case $flag in -Wno*)
flag=-W$(echo $flag | cut -c 6-)
esac
CFLAGS="$citusac_save_CFLAGS $flag"
ac_save_c_werror_flag=$ac_c_werror_flag
ac_c_werror_flag=yes
cat confdefs.h - <<_ACEOF >conftest.$ac_ext
@ -4061,7 +4097,11 @@ if ${citusac_cv_prog_cc_cflags__Wshadow+:} false; then :
$as_echo_n "(cached) " >&6
else
citusac_save_CFLAGS=$CFLAGS
CFLAGS="$citusac_save_CFLAGS -Wshadow"
flag=-Wshadow
case $flag in -Wno*)
flag=-W$(echo $flag | cut -c 6-)
esac
CFLAGS="$citusac_save_CFLAGS $flag"
ac_save_c_werror_flag=$ac_c_werror_flag
ac_c_werror_flag=yes
cat confdefs.h - <<_ACEOF >conftest.$ac_ext
@ -4096,7 +4136,11 @@ if ${citusac_cv_prog_cc_cflags__Werror_vla+:} false; then :
$as_echo_n "(cached) " >&6
else
citusac_save_CFLAGS=$CFLAGS
CFLAGS="$citusac_save_CFLAGS -Werror=vla"
flag=-Werror=vla
case $flag in -Wno*)
flag=-W$(echo $flag | cut -c 6-)
esac
CFLAGS="$citusac_save_CFLAGS $flag"
ac_save_c_werror_flag=$ac_c_werror_flag
ac_c_werror_flag=yes
cat confdefs.h - <<_ACEOF >conftest.$ac_ext
@ -4788,7 +4832,7 @@ cat >>$CONFIG_STATUS <<\_ACEOF || ac_write_fail=1
# report actual input values of CONFIG_FILES etc. instead of their
# values after options handling.
ac_log="
This file was extended by Citus $as_me 9.0devel, which was
This file was extended by Citus $as_me 9.0.2, which was
generated by GNU Autoconf 2.69. Invocation command line was
CONFIG_FILES = $CONFIG_FILES
@ -4850,7 +4894,7 @@ _ACEOF
cat >>$CONFIG_STATUS <<_ACEOF || ac_write_fail=1
ac_cs_config="`$as_echo "$ac_configure_args" | sed 's/^ //; s/[\\""\`\$]/\\\\&/g'`"
ac_cs_version="\\
Citus config.status 9.0devel
Citus config.status 9.0.2
configured by $0, generated by GNU Autoconf 2.69,
with options \\"\$ac_cs_config\\"

View File

@ -5,8 +5,8 @@
# everyone needing autoconf installed, the resulting files are checked
# into the SCM.
AC_INIT([Citus], [9.0devel])
AC_COPYRIGHT([Copyright (c) 2012-2017, Citus Data, Inc.])
AC_INIT([Citus], [9.0.2])
AC_COPYRIGHT([Copyright (c) Citus Data, Inc.])
# we'll need sed and awk for some of the version commands
AC_PROG_SED
@ -136,7 +136,11 @@ AC_DEFUN([CITUSAC_PROG_CC_CFLAGS_OPT],
[define([Ac_cachevar], [AS_TR_SH([citusac_cv_prog_cc_cflags_$1])])dnl
AC_CACHE_CHECK([whether $CC supports $1], [Ac_cachevar],
[citusac_save_CFLAGS=$CFLAGS
CFLAGS="$citusac_save_CFLAGS $1"
flag=$1
case $flag in -Wno*)
flag=-W$(echo $flag | cut -c 6-)
esac
CFLAGS="$citusac_save_CFLAGS $flag"
ac_save_c_werror_flag=$ac_c_werror_flag
ac_c_werror_flag=yes
_AC_COMPILE_IFELSE([AC_LANG_PROGRAM()],

View File

@ -1,6 +1,6 @@
# Citus extension
comment = 'Citus distributed database'
default_version = '9.0-1'
default_version = '9.0-2'
module_pathname = '$libdir/citus'
relocatable = false
schema = pg_catalog

View File

@ -347,6 +347,8 @@ GetFunctionColocationId(Oid functionOid, char *colocateWithTableName,
if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) == 0)
{
Oid colocatedTableId = InvalidOid;
/* check for default colocation group */
colocationId = ColocationId(ShardCount, ShardReplicationFactor,
distributionArgumentOid);
@ -361,6 +363,22 @@ GetFunctionColocationId(Oid functionOid, char *colocateWithTableName,
errhint("Provide a distributed table via \"colocate_with\" "
"option to create_distributed_function()")));
}
colocatedTableId = ColocatedTableId(colocationId);
if (colocatedTableId != InvalidOid)
{
EnsureFunctionCanBeColocatedWithTable(functionOid, distributionArgumentOid,
colocatedTableId);
}
else if (ReplicationModel == REPLICATION_MODEL_COORDINATOR)
{
/* streaming replication model is required for metadata syncing */
ereport(ERROR, (errmsg("cannot create a function with a distribution "
"argument when citus.replication_model is "
"'statement'"),
errhint("Set citus.replication_model to 'streaming' "
"before creating distributed tables")));
}
}
else
{
@ -417,7 +435,7 @@ EnsureFunctionCanBeColocatedWithTable(Oid functionOid, Oid distributionColumnTyp
"with distributed tables that are created using "
"streaming replication model."),
errhint("When distributing tables make sure that "
"\"citus.replication_model\" is set to \"streaming\"")));
"citus.replication_model = 'streaming'")));
}
/*

View File

@ -262,6 +262,17 @@ typedef struct DistributedExecution
/* statistics on distributed execution */
DistributedExecutionStats *executionStats;
/*
* The following fields are used while receiving results from remote nodes.
* We store this information here to avoid re-allocating it every time.
*
* columnArray field is reset/calculated per row, so might be useless for other
* contexts. The benefit of keeping it here is to avoid allocating the array
* over and over again.
*/
AttInMetadata *attributeInputMetadata;
char **columnArray;
} DistributedExecution;
/*
@ -850,6 +861,19 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList, bool hasRetu
execution->connectionSetChanged = false;
execution->waitFlagsChanged = false;
/* allocate execution specific data once, on the ExecutorState memory context */
if (tupleDescriptor != NULL)
{
execution->attributeInputMetadata = TupleDescGetAttInMetadata(tupleDescriptor);
execution->columnArray =
(char **) palloc0(tupleDescriptor->natts * sizeof(char *));
}
else
{
execution->attributeInputMetadata = NULL;
execution->columnArray = NULL;
}
if (ShouldExecuteTasksLocally(taskList))
{
bool readOnlyPlan = !TaskListModifiesDatabase(modLevel, taskList);
@ -3016,23 +3040,27 @@ ReceiveResults(WorkerSession *session, bool storeRows)
DistributedExecution *execution = workerPool->distributedExecution;
DistributedExecutionStats *executionStats = execution->executionStats;
TupleDesc tupleDescriptor = execution->tupleDescriptor;
AttInMetadata *attributeInputMetadata = NULL;
AttInMetadata *attributeInputMetadata = execution->attributeInputMetadata;
uint32 expectedColumnCount = 0;
char **columnArray = NULL;
char **columnArray = execution->columnArray;
Tuplestorestate *tupleStore = execution->tupleStore;
MemoryContext ioContext = NULL;
MemoryContext ioContext = AllocSetContextCreate(CurrentMemoryContext,
"ReceiveResults",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
if (tupleDescriptor != NULL)
{
attributeInputMetadata = TupleDescGetAttInMetadata(tupleDescriptor);
expectedColumnCount = tupleDescriptor->natts;
columnArray = (char **) palloc0(expectedColumnCount * sizeof(char *));
}
/*
* We use this context while converting each row fetched from remote node
* into tuple. The context is reseted on every row, thus we create it at the
* start of the loop and reset on every iteration.
*/
ioContext = AllocSetContextCreate(CurrentMemoryContext,
"IoContext",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
while (!PQisBusy(connection->pgConn))
{
@ -3113,7 +3141,7 @@ ReceiveResults(WorkerSession *session, bool storeRows)
for (rowIndex = 0; rowIndex < rowsProcessed; rowIndex++)
{
HeapTuple heapTuple = NULL;
MemoryContext oldContext = NULL;
MemoryContext oldContextPerRow = NULL;
memset(columnArray, 0, columnCount * sizeof(char *));
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
@ -3139,11 +3167,11 @@ ReceiveResults(WorkerSession *session, bool storeRows)
* protects us from any memory leaks that might be present in I/O functions
* called by BuildTupleFromCStrings.
*/
oldContext = MemoryContextSwitchTo(ioContext);
oldContextPerRow = MemoryContextSwitchTo(ioContext);
heapTuple = BuildTupleFromCStrings(attributeInputMetadata, columnArray);
MemoryContextSwitchTo(oldContext);
MemoryContextSwitchTo(oldContextPerRow);
tuplestore_puttuple(tupleStore, heapTuple);
MemoryContextReset(ioContext);
@ -3159,11 +3187,7 @@ ReceiveResults(WorkerSession *session, bool storeRows)
}
}
if (columnArray != NULL)
{
pfree(columnArray);
}
/* the context is local to the function, so not needed anymore */
MemoryContextDelete(ioContext);
return fetchDone;

View File

@ -46,6 +46,7 @@
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "fmgr.h"
#include "distributed/shard_pruning.h"
@ -148,7 +149,6 @@ typedef union \
} FunctionCall2InfoData;
#else
typedef FunctionCallInfoData FunctionCall2InfoData;
typedef FunctionCallInfoData *FunctionCallInfo;
#endif
/*

View File

@ -0,0 +1,10 @@
-- Using the citus schema is a bad idea since many environments use "citus"
-- as the main user and the "citus" schema then sits in front of the
-- search_path.
REVOKE USAGE ON SCHEMA citus FROM public;
-- redefine distributed_tables_colocated to avoid using citus schema
#include "udfs/distributed_tables_colocated/9.0-2.sql"
-- type was used in old version of distributed_tables_colocated
DROP TYPE citus.colocation_placement_type;

View File

@ -0,0 +1,22 @@
--
-- distributed_tables_colocated returns true if given tables are co-located, false otherwise.
-- The function checks shard definitions, matches shard placements for given tables.
--
CREATE OR REPLACE FUNCTION pg_catalog.distributed_tables_colocated(table1 regclass,
table2 regclass)
RETURNS bool
LANGUAGE plpgsql
AS $function$
DECLARE
table1_colocationid int;
table2_colocationid int;
BEGIN
SELECT colocationid INTO table1_colocationid
FROM pg_catalog.pg_dist_partition WHERE logicalrelid = table1;
SELECT colocationid INTO table2_colocationid
FROM pg_catalog.pg_dist_partition WHERE logicalrelid = table2;
RETURN table1_colocationid = table2_colocationid;
END;
$function$;

View File

@ -0,0 +1,22 @@
--
-- distributed_tables_colocated returns true if given tables are co-located, false otherwise.
-- The function checks shard definitions, matches shard placements for given tables.
--
CREATE OR REPLACE FUNCTION pg_catalog.distributed_tables_colocated(table1 regclass,
table2 regclass)
RETURNS bool
LANGUAGE plpgsql
AS $function$
DECLARE
table1_colocationid int;
table2_colocationid int;
BEGIN
SELECT colocationid INTO table1_colocationid
FROM pg_catalog.pg_dist_partition WHERE logicalrelid = table1;
SELECT colocationid INTO table2_colocationid
FROM pg_catalog.pg_dist_partition WHERE logicalrelid = table2;
RETURN table1_colocationid = table2_colocationid;
END;
$function$;

View File

@ -366,8 +366,9 @@ CitusMaintenanceDaemonMain(Datum main_arg)
}
#endif
if (MetadataSyncTriggeredCheckAndReset(myDbData) ||
GetCurrentTimestamp() >= nextMetadataSyncTime)
if (!RecoveryInProgress() &&
(MetadataSyncTriggeredCheckAndReset(myDbData) ||
GetCurrentTimestamp() >= nextMetadataSyncTime))
{
bool metadataSyncFailed = false;
int64 nextTimeout = 0;

View File

@ -55,6 +55,9 @@ CREATE TYPE dup_result AS (f1 int, f2 text);
CREATE FUNCTION dup(int) RETURNS dup_result
AS $$ SELECT $1, CAST($1 AS text) || ' is text' $$
LANGUAGE SQL;
CREATE FUNCTION increment(int2) RETURNS int
AS $$ SELECT $1 + 1$$
LANGUAGE SQL;
CREATE FUNCTION add_with_param_names(val1 integer, val2 integer) RETURNS integer
AS 'select $1 + $2;'
LANGUAGE SQL
@ -73,9 +76,21 @@ CREATE FUNCTION add_mixed_param_names(integer, val1 integer) RETURNS integer
-- make sure to propagate ddl propagation after we have setup our functions, this will
-- allow alter statements to be propagated and keep the functions in sync across machines
SET citus.enable_ddl_propagation TO on;
-- functions are distributed by int arguments, when run in isolation it is not guaranteed a table actually exists.
CREATE TABLE colocation_table(id int);
SELECT create_distributed_table('colocation_table','id');
-- use an unusual type to force a new colocation group
CREATE TABLE statement_table(id int2);
SET citus.replication_model TO 'statement';
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('statement_table','id');
create_distributed_table
--------------------------
(1 row)
-- create a table uses streaming-based replication (can be synced)
CREATE TABLE streaming_table(id int);
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('streaming_table','id');
create_distributed_table
--------------------------
@ -120,7 +135,24 @@ select bool_or(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'pr
f
(1 row)
SELECT create_distributed_function('dup(int)', '$1');
-- try to co-locate with a table that uses statement-based replication
SELECT create_distributed_function('increment(int2)', '$1');
ERROR: cannot colocate function "increment" and table "statement_table"
DETAIL: Citus currently only supports colocating function with distributed tables that are created using streaming replication model.
HINT: When distributing tables make sure that citus.replication_model = 'streaming'
SELECT create_distributed_function('increment(int2)', '$1', colocate_with := 'statement_table');
ERROR: cannot colocate function "increment" and table "statement_table"
DETAIL: Citus currently only supports colocating function with distributed tables that are created using streaming replication model.
HINT: When distributing tables make sure that citus.replication_model = 'streaming'
BEGIN;
SET LOCAL citus.replication_model TO 'statement';
DROP TABLE statement_table;
SELECT create_distributed_function('increment(int2)', '$1');
ERROR: cannot create a function with a distribution argument when citus.replication_model is 'statement'
HINT: Set citus.replication_model to 'streaming' before creating distributed tables
END;
-- try to co-locate with a table that uses streaming replication
SELECT create_distributed_function('dup(int)', '$1', colocate_with := 'streaming_table');
create_distributed_function
-----------------------------
@ -133,7 +165,7 @@ SELECT * FROM run_command_on_workers('SELECT function_tests.dup(42);') ORDER BY
localhost | 57638 | t | (42,"42 is text")
(2 rows)
SELECT create_distributed_function('add(int,int)', '$1');
SELECT create_distributed_function('add(int,int)', '$1', colocate_with := 'streaming_table');
create_distributed_function
-----------------------------
@ -455,7 +487,7 @@ SELECT create_distributed_table('replicated_table_func_test', 'a');
SELECT create_distributed_function('add_with_param_names(int, int)', '$1', colocate_with:='replicated_table_func_test');
ERROR: cannot colocate function "add_with_param_names" and table "replicated_table_func_test"
DETAIL: Citus currently only supports colocating function with distributed tables that are created using streaming replication model.
HINT: When distributing tables make sure that "citus.replication_model" is set to "streaming"
HINT: When distributing tables make sure that citus.replication_model = 'streaming'
-- a function can be colocated with a different distribution argument type
-- as long as there is a coercion path
SET citus.shard_replication_factor TO 1;
@ -607,9 +639,9 @@ SELECT worker_drop_distributed_table(logicalrelid::text) FROM pg_dist_partition
(3 rows)
TRUNCATE pg_dist_node;
DROP SCHEMA function_tests CASCADE;
DROP SCHEMA function_tests2 CASCADE;
TRUNCATE pg_dist_node;
\c - - - :worker_2_port
SET client_min_messages TO error; -- suppress cascading objects dropping
UPDATE pg_dist_local_group SET groupid = 0;
@ -621,9 +653,9 @@ SELECT worker_drop_distributed_table(logicalrelid::text) FROM pg_dist_partition
(3 rows)
TRUNCATE pg_dist_node;
DROP SCHEMA function_tests CASCADE;
DROP SCHEMA function_tests2 CASCADE;
TRUNCATE pg_dist_node;
\c - - - :master_port
DROP USER functionuser;
SELECT run_command_on_workers($$DROP USER functionuser;$$);

View File

@ -29,13 +29,15 @@ END;
$proc$;
-- procedures are distributed by text arguments, when run in isolation it is not guaranteed a table actually exists.
CREATE TABLE colocation_table(id text);
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('colocation_table','id');
create_distributed_table
--------------------------
(1 row)
SELECT create_distributed_function('raise_info(text)', '$1');
SELECT create_distributed_function('raise_info(text)', '$1', colocate_with := 'colocation_table');
create_distributed_function
-----------------------------

View File

@ -68,6 +68,7 @@ step s2-public-schema:
step s2-create-table:
CREATE TABLE t1 (a int, b int);
-- session needs to have replication factor set to 1, can't do in setup
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('t1', 'a');
<waiting ...>
@ -193,6 +194,7 @@ step s2-public-schema:
step s2-create-table:
CREATE TABLE t1 (a int, b int);
-- session needs to have replication factor set to 1, can't do in setup
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('t1', 'a');
<waiting ...>
@ -315,6 +317,7 @@ step s2-public-schema:
step s2-create-table:
CREATE TABLE t1 (a int, b int);
-- session needs to have replication factor set to 1, can't do in setup
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('t1', 'a');
@ -447,6 +450,7 @@ step s2-create-schema:
step s2-create-table:
CREATE TABLE t1 (a int, b int);
-- session needs to have replication factor set to 1, can't do in setup
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('t1', 'a');
<waiting ...>
@ -574,6 +578,7 @@ step s2-create-schema:
step s2-create-table:
CREATE TABLE t1 (a int, b int);
-- session needs to have replication factor set to 1, can't do in setup
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('t1', 'a');
<waiting ...>
@ -698,6 +703,7 @@ step s2-create-schema:
step s2-create-table:
CREATE TABLE t1 (a int, b int);
-- session needs to have replication factor set to 1, can't do in setup
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('t1', 'a');
@ -837,6 +843,7 @@ step s1-add-worker:
step s2-create-table:
CREATE TABLE t1 (a int, b int);
-- session needs to have replication factor set to 1, can't do in setup
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('t1', 'a');
<waiting ...>
@ -986,6 +993,7 @@ step s3-use-schema:
step s2-create-table:
CREATE TABLE t1 (a int, b int);
-- session needs to have replication factor set to 1, can't do in setup
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('t1', 'a');
@ -1132,6 +1140,7 @@ step s3-create-schema2:
step s2-create-table:
CREATE TABLE t1 (a int, b int);
-- session needs to have replication factor set to 1, can't do in setup
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('t1', 'a');
<waiting ...>
@ -1506,6 +1515,7 @@ step s2-create-type:
step s2-create-table-with-type:
CREATE TABLE t1 (a int, b tt1);
-- session needs to have replication factor set to 1, can't do in setup
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('t1', 'a');

View File

@ -421,8 +421,8 @@ SELECT create_distributed_table('check_colocated', 'key', 'hash');
(1 row)
CREATE TABLE second_table (key int);
SET citus.shard_count TO 4;
CREATE TABLE second_table (key int);
SELECT create_distributed_table('second_table', 'key', 'hash');
create_distributed_table
--------------------------
@ -433,15 +433,13 @@ SELECT * FROM run_command_on_colocated_placements('check_colocated', 'second_tab
'select 1');
ERROR: tables check_colocated and second_table are not co-located
-- even when the difference is in replication factor, an error is thrown
SELECT master_drop_all_shards('second_table'::regclass, current_schema(), 'second_table');
master_drop_all_shards
------------------------
4
(1 row)
SELECT master_create_worker_shards('second_table', 5, 1);
master_create_worker_shards
-----------------------------
DROP TABLE second_table;
SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 5;
CREATE TABLE second_table (key int);
SELECT create_distributed_table('second_table', 'key', 'hash');
create_distributed_table
--------------------------
(1 row)
@ -449,15 +447,13 @@ SELECT * FROM run_command_on_colocated_placements('check_colocated', 'second_tab
'select 1');
ERROR: tables check_colocated and second_table are not co-located
-- when everything matches, the command is run!
SELECT master_drop_all_shards('second_table'::regclass, current_schema(), 'second_table');
master_drop_all_shards
------------------------
5
(1 row)
SELECT master_create_worker_shards('second_table', 5, 2);
master_create_worker_shards
-----------------------------
DROP TABLE second_table;
SET citus.shard_replication_factor TO 2;
SET citus.shard_count TO 5;
CREATE TABLE second_table (key int);
SELECT create_distributed_table('second_table', 'key', 'hash');
create_distributed_table
--------------------------
(1 row)
@ -477,36 +473,6 @@ SELECT * FROM run_command_on_colocated_placements('check_colocated', 'second_tab
localhost | 57638 | 1240009 | 1240023 | t | 1
(10 rows)
-- when a placement is invalid considers the tables to not be colocated
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = (
SELECT shardid FROM pg_dist_shard
WHERE nodeport = :worker_1_port AND logicalrelid = 'second_table'::regclass
ORDER BY 1 ASC LIMIT 1
);
SELECT * FROM run_command_on_colocated_placements('check_colocated', 'second_table',
'select 1');
ERROR: tables check_colocated and second_table are not co-located
-- when matching placement is also invalid, considers the tables to be colocated
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = (
SELECT shardid FROM pg_dist_shard
WHERE nodeport = :worker_1_port AND logicalrelid = 'check_colocated'::regclass
ORDER BY 1 ASC LIMIT 1
);
SELECT * FROM run_command_on_colocated_placements('check_colocated', 'second_table',
'select 1');
nodename | nodeport | shardid1 | shardid2 | success | result
-----------+----------+----------+----------+---------+--------
localhost | 57638 | 1240005 | 1240019 | t | 1
localhost | 57637 | 1240006 | 1240020 | t | 1
localhost | 57638 | 1240006 | 1240020 | t | 1
localhost | 57637 | 1240007 | 1240021 | t | 1
localhost | 57638 | 1240007 | 1240021 | t | 1
localhost | 57637 | 1240008 | 1240022 | t | 1
localhost | 57638 | 1240008 | 1240022 | t | 1
localhost | 57637 | 1240009 | 1240023 | t | 1
localhost | 57638 | 1240009 | 1240023 | t | 1
(9 rows)
DROP TABLE check_colocated CASCADE;
DROP TABLE second_table CASCADE;
-- runs on all shards

View File

@ -113,7 +113,7 @@ ALTER EXTENSION citus UPDATE TO '9.0-1';
SHOW citus.version;
citus.version
---------------
9.0devel
9.0.2
(1 row)
-- ensure no objects were created outside pg_catalog

View File

@ -33,7 +33,7 @@ SELECT create_distributed_table('test', 'id');
(1 row)
CREATE TABLE test_coloc (id integer, val integer);
SELECT create_distributed_table('test_coloc', 'id', colocate_with := 'none');
SELECT create_distributed_table('test_coloc', 'id', colocate_with := 'test');
create_distributed_table
--------------------------
@ -128,6 +128,8 @@ GRANT USAGE ON SCHEMA full_access_user_schema TO full_access;
GRANT ALL ON SCHEMA full_access_user_schema TO full_access;
GRANT USAGE ON SCHEMA full_access_user_schema TO usage_access;
\c - - - :master_port
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
-- create prepare tests
PREPARE prepare_insert AS INSERT INTO test VALUES ($1);
PREPARE prepare_select AS SELECT count(*) FROM test;
@ -487,18 +489,25 @@ SELECT wait_until_metadata_sync();
(1 row)
-- now, make sure that the user can use the function
CREATE TABLE colocation_table(id text);
SELECT create_distributed_table('colocation_table','id');
create_distributed_table
--------------------------
(1 row)
-- now, make sure that the user can use the function
-- created in the transaction
BEGIN;
CREATE FUNCTION usage_access_func_second(key int, variadic v int[]) RETURNS text
LANGUAGE plpgsql AS 'begin return current_user; end;';
SELECT create_distributed_function('usage_access_func_second(int,int[])', '$1');
SELECT create_distributed_function('usage_access_func_second(int,int[])', '$1', colocate_with := 'colocation_table');
create_distributed_function
-----------------------------
(1 row)
SELECT usage_access_func_second(1, 2,3,4,5) FROM full_access_user_schema.t1 LIMIT 1;
SELECT usage_access_func_second(1, 2,3,4,5) FROM full_access_user_schema.t1 LIMIT 1;
usage_access_func_second
--------------------------
usage_access
@ -517,7 +526,7 @@ SELECT usesuper FROM pg_user where usename IN (SELECT current_user);
(1 row)
-- superuser creates the distributed function that is owned by a regular user
SELECT create_distributed_function('usage_access_func_third(int,int[])', '$1');
SELECT create_distributed_function('usage_access_func_third(int,int[])', '$1', colocate_with := 'colocation_table');
create_distributed_function
-----------------------------
@ -683,7 +692,7 @@ ERROR: could not receive file "base/pgsql_job_cache/job_0042/task_000001/p_0000
-- different user should not be able to fetch partition file
SET ROLE usage_access;
SELECT worker_fetch_partition_file(42, 1, 1, 1, 'localhost', :worker_1_port);
WARNING: could not open file "base/pgsql_job_cache/job_0042/task_000001/p_00001.17996": No such file or directory
WARNING: could not open file "base/pgsql_job_cache/job_0042/task_000001/p_00001.37457": No such file or directory
CONTEXT: while executing command on localhost:57637
ERROR: could not receive file "base/pgsql_job_cache/job_0042/task_000001/p_00001" from localhost:57637
-- only the user whom created the files should be able to fetch
@ -722,7 +731,7 @@ RESET ROLE;
-- test that the super user is unable to read the contents of the intermediate file,
-- although it does create the table
SELECT worker_merge_files_into_table(42, 1, ARRAY['a'], ARRAY['integer']);
WARNING: Task file "task_000001.17982" does not have expected suffix ".10"
WARNING: Task file "task_000001.36145" does not have expected suffix ".10"
worker_merge_files_into_table
-------------------------------
@ -764,7 +773,7 @@ SELECT worker_merge_files_and_run_query(42, 1,
'CREATE TABLE task_000001_merge(merge_column_0 int)',
'CREATE TABLE task_000001 (a) AS SELECT sum(merge_column_0) FROM task_000001_merge'
);
WARNING: Task file "task_000001.17982" does not have expected suffix ".10"
WARNING: Task file "task_000001.36145" does not have expected suffix ".10"
worker_merge_files_and_run_query
----------------------------------
@ -834,7 +843,8 @@ DROP TABLE
my_role_table_with_data,
singleshard,
test,
test_coloc;
test_coloc,
colocation_table;
DROP USER full_access;
DROP USER read_access;
DROP USER no_access;

View File

@ -398,7 +398,7 @@ SET client_min_messages TO DEBUG1;
--
CREATE FUNCTION mx_call_add(int, int) RETURNS int
AS 'select $1 + $2;' LANGUAGE SQL IMMUTABLE;
SELECT create_distributed_function('mx_call_add(int,int)', '$1');
SELECT create_distributed_function('mx_call_add(int,int)', '$1', 'mx_call_dist_table_1');
DEBUG: switching to sequential query execution mode
DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands
create_distributed_function

View File

@ -498,6 +498,7 @@ select start_metadata_sync_to_node('localhost', :worker_2_port);
\c - - - :master_port
SET search_path to multi_mx_function_call_delegation, public;
SET client_min_messages TO DEBUG1;
SET citus.replication_model = 'streaming';
--
-- Test non-const parameter values
--

View File

@ -91,6 +91,7 @@ step "s2-create-table"
{
CREATE TABLE t1 (a int, b int);
-- session needs to have replication factor set to 1, can't do in setup
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('t1', 'a');
}
@ -104,6 +105,7 @@ step "s2-create-table-with-type"
{
CREATE TABLE t1 (a int, b tt1);
-- session needs to have replication factor set to 1, can't do in setup
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('t1', 'a');
}

View File

@ -55,6 +55,10 @@ CREATE FUNCTION dup(int) RETURNS dup_result
AS $$ SELECT $1, CAST($1 AS text) || ' is text' $$
LANGUAGE SQL;
CREATE FUNCTION increment(int2) RETURNS int
AS $$ SELECT $1 + 1$$
LANGUAGE SQL;
CREATE FUNCTION add_with_param_names(val1 integer, val2 integer) RETURNS integer
AS 'select $1 + $2;'
LANGUAGE SQL
@ -77,9 +81,17 @@ CREATE FUNCTION add_mixed_param_names(integer, val1 integer) RETURNS integer
-- allow alter statements to be propagated and keep the functions in sync across machines
SET citus.enable_ddl_propagation TO on;
-- functions are distributed by int arguments, when run in isolation it is not guaranteed a table actually exists.
CREATE TABLE colocation_table(id int);
SELECT create_distributed_table('colocation_table','id');
-- use an unusual type to force a new colocation group
CREATE TABLE statement_table(id int2);
SET citus.replication_model TO 'statement';
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('statement_table','id');
-- create a table uses streaming-based replication (can be synced)
CREATE TABLE streaming_table(id int);
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('streaming_table','id');
-- make sure that none of the active and primary nodes hasmetadata
-- at the start of the test
@ -98,10 +110,20 @@ SELECT * FROM run_command_on_workers('SELECT function_tests.add_mixed_param_name
-- since the function doesn't have a parameter
select bool_or(hasmetadata) from pg_dist_node WHERE isactive AND noderole = 'primary';
SELECT create_distributed_function('dup(int)', '$1');
-- try to co-locate with a table that uses statement-based replication
SELECT create_distributed_function('increment(int2)', '$1');
SELECT create_distributed_function('increment(int2)', '$1', colocate_with := 'statement_table');
BEGIN;
SET LOCAL citus.replication_model TO 'statement';
DROP TABLE statement_table;
SELECT create_distributed_function('increment(int2)', '$1');
END;
-- try to co-locate with a table that uses streaming replication
SELECT create_distributed_function('dup(int)', '$1', colocate_with := 'streaming_table');
SELECT * FROM run_command_on_workers('SELECT function_tests.dup(42);') ORDER BY 1,2;
SELECT create_distributed_function('add(int,int)', '$1');
SELECT create_distributed_function('add(int,int)', '$1', colocate_with := 'streaming_table');
SELECT * FROM run_command_on_workers('SELECT function_tests.add(2,3);') ORDER BY 1,2;
SELECT public.verify_function_is_same_on_workers('function_tests.add(int,int)');
@ -318,17 +340,17 @@ DROP SCHEMA function_tests2 CASCADE;
SET client_min_messages TO error; -- suppress cascading objects dropping
UPDATE pg_dist_local_group SET groupid = 0;
SELECT worker_drop_distributed_table(logicalrelid::text) FROM pg_dist_partition WHERE logicalrelid::text ILIKE '%replicated_table_func_test%';
TRUNCATE pg_dist_node;
DROP SCHEMA function_tests CASCADE;
DROP SCHEMA function_tests2 CASCADE;
TRUNCATE pg_dist_node;
\c - - - :worker_2_port
SET client_min_messages TO error; -- suppress cascading objects dropping
UPDATE pg_dist_local_group SET groupid = 0;
SELECT worker_drop_distributed_table(logicalrelid::text) FROM pg_dist_partition WHERE logicalrelid::text ILIKE '%replicated_table_func_test%';
TRUNCATE pg_dist_node;
DROP SCHEMA function_tests CASCADE;
DROP SCHEMA function_tests2 CASCADE;
TRUNCATE pg_dist_node;
\c - - - :master_port

View File

@ -27,9 +27,11 @@ $proc$;
-- procedures are distributed by text arguments, when run in isolation it is not guaranteed a table actually exists.
CREATE TABLE colocation_table(id text);
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('colocation_table','id');
SELECT create_distributed_function('raise_info(text)', '$1');
SELECT create_distributed_function('raise_info(text)', '$1', colocate_with := 'colocation_table');
SELECT * FROM run_command_on_workers($$CALL procedure_tests.raise_info('hello');$$) ORDER BY 1,2;
SELECT public.verify_function_is_same_on_workers('procedure_tests.raise_info(text)');

View File

@ -220,38 +220,30 @@ DROP TABLE check_placements CASCADE;
-- make sure run_on_all_colocated_placements correctly detects colocation
CREATE TABLE check_colocated (key int);
SELECT create_distributed_table('check_colocated', 'key', 'hash');
CREATE TABLE second_table (key int);
SET citus.shard_count TO 4;
CREATE TABLE second_table (key int);
SELECT create_distributed_table('second_table', 'key', 'hash');
SELECT * FROM run_command_on_colocated_placements('check_colocated', 'second_table',
'select 1');
-- even when the difference is in replication factor, an error is thrown
SELECT master_drop_all_shards('second_table'::regclass, current_schema(), 'second_table');
SELECT master_create_worker_shards('second_table', 5, 1);
DROP TABLE second_table;
SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 5;
CREATE TABLE second_table (key int);
SELECT create_distributed_table('second_table', 'key', 'hash');
SELECT * FROM run_command_on_colocated_placements('check_colocated', 'second_table',
'select 1');
-- when everything matches, the command is run!
SELECT master_drop_all_shards('second_table'::regclass, current_schema(), 'second_table');
SELECT master_create_worker_shards('second_table', 5, 2);
SELECT * FROM run_command_on_colocated_placements('check_colocated', 'second_table',
'select 1');
-- when a placement is invalid considers the tables to not be colocated
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = (
SELECT shardid FROM pg_dist_shard
WHERE nodeport = :worker_1_port AND logicalrelid = 'second_table'::regclass
ORDER BY 1 ASC LIMIT 1
);
SELECT * FROM run_command_on_colocated_placements('check_colocated', 'second_table',
'select 1');
-- when matching placement is also invalid, considers the tables to be colocated
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = (
SELECT shardid FROM pg_dist_shard
WHERE nodeport = :worker_1_port AND logicalrelid = 'check_colocated'::regclass
ORDER BY 1 ASC LIMIT 1
);
DROP TABLE second_table;
SET citus.shard_replication_factor TO 2;
SET citus.shard_count TO 5;
CREATE TABLE second_table (key int);
SELECT create_distributed_table('second_table', 'key', 'hash');
SELECT * FROM run_command_on_colocated_placements('check_colocated', 'second_table',
'select 1');
DROP TABLE check_colocated CASCADE;
DROP TABLE second_table CASCADE;

View File

@ -25,7 +25,7 @@ CREATE TABLE test (id integer, val integer);
SELECT create_distributed_table('test', 'id');
CREATE TABLE test_coloc (id integer, val integer);
SELECT create_distributed_table('test_coloc', 'id', colocate_with := 'none');
SELECT create_distributed_table('test_coloc', 'id', colocate_with := 'test');
SET citus.shard_count TO 1;
CREATE TABLE singleshard (id integer, val integer);
@ -96,6 +96,9 @@ GRANT USAGE ON SCHEMA full_access_user_schema TO usage_access;
\c - - - :master_port
SET citus.replication_model TO 'streaming';
SET citus.shard_replication_factor TO 1;
-- create prepare tests
PREPARE prepare_insert AS INSERT INTO test VALUES ($1);
PREPARE prepare_select AS SELECT count(*) FROM test;
@ -300,12 +303,15 @@ SELECT run_command_on_workers($$SELECT proowner::regrole FROM pg_proc WHERE pron
SELECT wait_until_metadata_sync();
-- now, make sure that the user can use the function
CREATE TABLE colocation_table(id text);
SELECT create_distributed_table('colocation_table','id');
-- now, make sure that the user can use the function
-- created in the transaction
BEGIN;
CREATE FUNCTION usage_access_func_second(key int, variadic v int[]) RETURNS text
LANGUAGE plpgsql AS 'begin return current_user; end;';
SELECT create_distributed_function('usage_access_func_second(int,int[])', '$1');
SELECT create_distributed_function('usage_access_func_second(int,int[])', '$1', colocate_with := 'colocation_table');
SELECT usage_access_func_second(1, 2,3,4,5) FROM full_access_user_schema.t1 LIMIT 1;
@ -321,7 +327,7 @@ CREATE FUNCTION usage_access_func_third(key int, variadic v int[]) RETURNS text
SELECT usesuper FROM pg_user where usename IN (SELECT current_user);
-- superuser creates the distributed function that is owned by a regular user
SELECT create_distributed_function('usage_access_func_third(int,int[])', '$1');
SELECT create_distributed_function('usage_access_func_third(int,int[])', '$1', colocate_with := 'colocation_table');
SELECT proowner::regrole FROM pg_proc WHERE proname = 'usage_access_func_third';
SELECT run_command_on_workers($$SELECT proowner::regrole FROM pg_proc WHERE proname = 'usage_access_func_third'$$);
@ -503,7 +509,8 @@ DROP TABLE
my_role_table_with_data,
singleshard,
test,
test_coloc;
test_coloc,
colocation_table;
DROP USER full_access;
DROP USER read_access;
DROP USER no_access;

View File

@ -167,7 +167,7 @@ SET client_min_messages TO DEBUG1;
--
CREATE FUNCTION mx_call_add(int, int) RETURNS int
AS 'select $1 + $2;' LANGUAGE SQL IMMUTABLE;
SELECT create_distributed_function('mx_call_add(int,int)', '$1');
SELECT create_distributed_function('mx_call_add(int,int)', '$1', 'mx_call_dist_table_1');
-- non-const distribution parameters cannot be pushed down
call multi_mx_call.mx_call_proc(2, mx_call_add(3, 4));

View File

@ -211,6 +211,7 @@ select start_metadata_sync_to_node('localhost', :worker_2_port);
\c - - - :master_port
SET search_path to multi_mx_function_call_delegation, public;
SET client_min_messages TO DEBUG1;
SET citus.replication_model = 'streaming';
--
-- Test non-const parameter values