mirror of https://github.com/citusdata/citus.git
Compare commits
24 Commits
Author | SHA1 | Date |
---|---|---|
|
20f84f2396 | |
|
32b8235da3 | |
|
16bb2c618f | |
|
54a893523e | |
|
57d51b280e | |
|
ba00e930ea | |
|
c5371965f6 | |
|
1e3969cae1 | |
|
0fa6049cce | |
|
2a1ae6489b | |
|
507964dde3 | |
|
2de7b85b89 | |
|
0b0c0fef25 | |
|
606e2b18d7 | |
|
67f058c5f6 | |
|
4392cc2f9c | |
|
ca8a4dc735 | |
|
da0b98c991 | |
|
480797e600 | |
|
2311a3614a | |
|
c22cbb7a13 | |
|
5d805dba27 | |
|
b08106b5cf | |
|
87817aec9d |
|
@ -22,7 +22,7 @@ matrix:
|
||||||
- env: PGVERSION=10
|
- env: PGVERSION=10
|
||||||
- env: PGVERSION=11
|
- env: PGVERSION=11
|
||||||
allow_failures:
|
allow_failures:
|
||||||
- env: PGVERSION=11
|
- env: PGVERSION=9.6
|
||||||
before_install:
|
before_install:
|
||||||
- git clone -b v0.7.9 --depth 1 https://github.com/citusdata/tools.git
|
- git clone -b v0.7.9 --depth 1 https://github.com/citusdata/tools.git
|
||||||
- sudo make -C tools install
|
- sudo make -C tools install
|
||||||
|
|
82
CHANGELOG.md
82
CHANGELOG.md
|
@ -1,3 +1,85 @@
|
||||||
|
### citus v8.0.3 (January 9, 2019) ###
|
||||||
|
|
||||||
|
* Fixes maintenance daemon panic due to unreleased spinlock
|
||||||
|
|
||||||
|
* Fixes an issue with having clause when used with complex joins
|
||||||
|
|
||||||
|
### citus v8.0.2 (December 13, 2018) ###
|
||||||
|
|
||||||
|
* Fixes a bug that could cause maintenance daemon panic
|
||||||
|
|
||||||
|
* Fixes crashes caused by stack size increase under high memory load
|
||||||
|
|
||||||
|
### citus v7.5.4 (December 11, 2018) ###
|
||||||
|
|
||||||
|
* Fixes a bug that could cause maintenance daemon panic
|
||||||
|
|
||||||
|
### citus v8.0.1 (November 27, 2018) ###
|
||||||
|
|
||||||
|
* Execute SQL tasks using worker_execute_sql_task UDF when using task-tracker
|
||||||
|
|
||||||
|
### citus v7.5.3 (November 27, 2018) ###
|
||||||
|
|
||||||
|
* Execute SQL tasks using worker_execute_sql_task UDF when using task-tracker
|
||||||
|
|
||||||
|
### citus v7.5.2 (November 14, 2018) ###
|
||||||
|
|
||||||
|
* Fixes inconsistent metadata error when shard metadata caching get interrupted
|
||||||
|
|
||||||
|
* Fixes a bug that could cause memory leak
|
||||||
|
|
||||||
|
* Fixes a bug that prevents recovering wrong transactions in MX
|
||||||
|
|
||||||
|
* Fixes a bug to prevent wrong memory accesses on Citus MX under very high load
|
||||||
|
|
||||||
|
* Fixes crashes caused by stack size increase under high memory load
|
||||||
|
|
||||||
|
### citus v8.0.0 (October 31, 2018) ###
|
||||||
|
|
||||||
|
* Adds support for PostgreSQL 11
|
||||||
|
|
||||||
|
* Adds support for applying DML operations on reference tables from MX nodes
|
||||||
|
|
||||||
|
* Adds distributed locking to truncated MX tables
|
||||||
|
|
||||||
|
* Adds support for running TRUNCATE command from MX worker nodes
|
||||||
|
|
||||||
|
* Adds views to provide insight about the distributed transactions
|
||||||
|
|
||||||
|
* Adds support for TABLESAMPLE in router queries
|
||||||
|
|
||||||
|
* Adds support for INCLUDE option in index creation
|
||||||
|
|
||||||
|
* Adds option to allow simple DML commands from hot standby
|
||||||
|
|
||||||
|
* Adds support for partitioned tables with replication factor > 1
|
||||||
|
|
||||||
|
* Prevents a deadlock on concurrent DROP TABLE and SELECT on Citus MX
|
||||||
|
|
||||||
|
* Fixes a bug that prevents recovering wrong transactions in MX
|
||||||
|
|
||||||
|
* Fixes a bug to prevent wrong memory accesses on Citus MX under very high load
|
||||||
|
|
||||||
|
* Fixes a bug in MX mode, calling DROP SCHEMA with existing partitioned table
|
||||||
|
|
||||||
|
* Fixes a bug that could cause modifying CTEs to select wrong execution mode
|
||||||
|
|
||||||
|
* Fixes a bug preventing rollback in CREATE PROCEDURE
|
||||||
|
|
||||||
|
* Fixes a bug on not being able to drop index on a partitioned table
|
||||||
|
|
||||||
|
* Fixes a bug on TRUNCATE when there is a foreign key to a reference table
|
||||||
|
|
||||||
|
* Fixes a performance issue in prepared INSERT..SELECT
|
||||||
|
|
||||||
|
* Fixes a bug which causes errors on DROP DATABASE IF EXISTS
|
||||||
|
|
||||||
|
* Fixes a bug to remove intermediate result directory in pull-push execution
|
||||||
|
|
||||||
|
* Improves query pushdown planning performance
|
||||||
|
|
||||||
|
* Evaluate functions anywhere in query
|
||||||
|
|
||||||
### citus v7.5.1 (August 28, 2018) ###
|
### citus v7.5.1 (August 28, 2018) ###
|
||||||
|
|
||||||
* Improves query pushdown planning performance
|
* Improves query pushdown planning performance
|
||||||
|
|
1
Makefile
1
Makefile
|
@ -116,6 +116,7 @@ OBJS = src/backend/distributed/shared_library_init.o \
|
||||||
src/backend/distributed/worker/worker_file_access_protocol.o \
|
src/backend/distributed/worker/worker_file_access_protocol.o \
|
||||||
src/backend/distributed/worker/worker_merge_protocol.o \
|
src/backend/distributed/worker/worker_merge_protocol.o \
|
||||||
src/backend/distributed/worker/worker_partition_protocol.o \
|
src/backend/distributed/worker/worker_partition_protocol.o \
|
||||||
|
src/backend/distributed/worker/worker_sql_task_protocol.o \
|
||||||
src/backend/distributed/worker/worker_truncate_trigger_protocol.o \
|
src/backend/distributed/worker/worker_truncate_trigger_protocol.o \
|
||||||
$(WIN32RES)
|
$(WIN32RES)
|
||||||
|
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
#! /bin/sh
|
#! /bin/sh
|
||||||
# Guess values for system-dependent variables and create Makefiles.
|
# Guess values for system-dependent variables and create Makefiles.
|
||||||
# Generated by GNU Autoconf 2.69 for Citus 8.0devel.
|
# Generated by GNU Autoconf 2.69 for Citus 8.0.3.
|
||||||
#
|
#
|
||||||
#
|
#
|
||||||
# Copyright (C) 1992-1996, 1998-2012 Free Software Foundation, Inc.
|
# Copyright (C) 1992-1996, 1998-2012 Free Software Foundation, Inc.
|
||||||
|
@ -579,8 +579,8 @@ MAKEFLAGS=
|
||||||
# Identity of this package.
|
# Identity of this package.
|
||||||
PACKAGE_NAME='Citus'
|
PACKAGE_NAME='Citus'
|
||||||
PACKAGE_TARNAME='citus'
|
PACKAGE_TARNAME='citus'
|
||||||
PACKAGE_VERSION='8.0devel'
|
PACKAGE_VERSION='8.0.3'
|
||||||
PACKAGE_STRING='Citus 8.0devel'
|
PACKAGE_STRING='Citus 8.0.3'
|
||||||
PACKAGE_BUGREPORT=''
|
PACKAGE_BUGREPORT=''
|
||||||
PACKAGE_URL=''
|
PACKAGE_URL=''
|
||||||
|
|
||||||
|
@ -1239,7 +1239,7 @@ if test "$ac_init_help" = "long"; then
|
||||||
# Omit some internal or obsolete options to make the list less imposing.
|
# Omit some internal or obsolete options to make the list less imposing.
|
||||||
# This message is too long to be a string in the A/UX 3.1 sh.
|
# This message is too long to be a string in the A/UX 3.1 sh.
|
||||||
cat <<_ACEOF
|
cat <<_ACEOF
|
||||||
\`configure' configures Citus 8.0devel to adapt to many kinds of systems.
|
\`configure' configures Citus 8.0.3 to adapt to many kinds of systems.
|
||||||
|
|
||||||
Usage: $0 [OPTION]... [VAR=VALUE]...
|
Usage: $0 [OPTION]... [VAR=VALUE]...
|
||||||
|
|
||||||
|
@ -1300,7 +1300,7 @@ fi
|
||||||
|
|
||||||
if test -n "$ac_init_help"; then
|
if test -n "$ac_init_help"; then
|
||||||
case $ac_init_help in
|
case $ac_init_help in
|
||||||
short | recursive ) echo "Configuration of Citus 8.0devel:";;
|
short | recursive ) echo "Configuration of Citus 8.0.3:";;
|
||||||
esac
|
esac
|
||||||
cat <<\_ACEOF
|
cat <<\_ACEOF
|
||||||
|
|
||||||
|
@ -1400,7 +1400,7 @@ fi
|
||||||
test -n "$ac_init_help" && exit $ac_status
|
test -n "$ac_init_help" && exit $ac_status
|
||||||
if $ac_init_version; then
|
if $ac_init_version; then
|
||||||
cat <<\_ACEOF
|
cat <<\_ACEOF
|
||||||
Citus configure 8.0devel
|
Citus configure 8.0.3
|
||||||
generated by GNU Autoconf 2.69
|
generated by GNU Autoconf 2.69
|
||||||
|
|
||||||
Copyright (C) 2012 Free Software Foundation, Inc.
|
Copyright (C) 2012 Free Software Foundation, Inc.
|
||||||
|
@ -1883,7 +1883,7 @@ cat >config.log <<_ACEOF
|
||||||
This file contains any messages produced by compilers while
|
This file contains any messages produced by compilers while
|
||||||
running configure, to aid debugging if configure makes a mistake.
|
running configure, to aid debugging if configure makes a mistake.
|
||||||
|
|
||||||
It was created by Citus $as_me 8.0devel, which was
|
It was created by Citus $as_me 8.0.3, which was
|
||||||
generated by GNU Autoconf 2.69. Invocation command line was
|
generated by GNU Autoconf 2.69. Invocation command line was
|
||||||
|
|
||||||
$ $0 $@
|
$ $0 $@
|
||||||
|
@ -4701,7 +4701,7 @@ cat >>$CONFIG_STATUS <<\_ACEOF || ac_write_fail=1
|
||||||
# report actual input values of CONFIG_FILES etc. instead of their
|
# report actual input values of CONFIG_FILES etc. instead of their
|
||||||
# values after options handling.
|
# values after options handling.
|
||||||
ac_log="
|
ac_log="
|
||||||
This file was extended by Citus $as_me 8.0devel, which was
|
This file was extended by Citus $as_me 8.0.3, which was
|
||||||
generated by GNU Autoconf 2.69. Invocation command line was
|
generated by GNU Autoconf 2.69. Invocation command line was
|
||||||
|
|
||||||
CONFIG_FILES = $CONFIG_FILES
|
CONFIG_FILES = $CONFIG_FILES
|
||||||
|
@ -4763,7 +4763,7 @@ _ACEOF
|
||||||
cat >>$CONFIG_STATUS <<_ACEOF || ac_write_fail=1
|
cat >>$CONFIG_STATUS <<_ACEOF || ac_write_fail=1
|
||||||
ac_cs_config="`$as_echo "$ac_configure_args" | sed 's/^ //; s/[\\""\`\$]/\\\\&/g'`"
|
ac_cs_config="`$as_echo "$ac_configure_args" | sed 's/^ //; s/[\\""\`\$]/\\\\&/g'`"
|
||||||
ac_cs_version="\\
|
ac_cs_version="\\
|
||||||
Citus config.status 8.0devel
|
Citus config.status 8.0.3
|
||||||
configured by $0, generated by GNU Autoconf 2.69,
|
configured by $0, generated by GNU Autoconf 2.69,
|
||||||
with options \\"\$ac_cs_config\\"
|
with options \\"\$ac_cs_config\\"
|
||||||
|
|
||||||
|
|
|
@ -5,7 +5,7 @@
|
||||||
# everyone needing autoconf installed, the resulting files are checked
|
# everyone needing autoconf installed, the resulting files are checked
|
||||||
# into the SCM.
|
# into the SCM.
|
||||||
|
|
||||||
AC_INIT([Citus], [8.0devel])
|
AC_INIT([Citus], [8.0.3])
|
||||||
AC_COPYRIGHT([Copyright (c) 2012-2017, Citus Data, Inc.])
|
AC_COPYRIGHT([Copyright (c) 2012-2017, Citus Data, Inc.])
|
||||||
|
|
||||||
# we'll need sed and awk for some of the version commands
|
# we'll need sed and awk for some of the version commands
|
||||||
|
|
|
@ -338,6 +338,18 @@ StubRelation(TupleDesc tupleDescriptor)
|
||||||
void
|
void
|
||||||
ExecuteQueryStringIntoDestReceiver(const char *queryString, ParamListInfo params,
|
ExecuteQueryStringIntoDestReceiver(const char *queryString, ParamListInfo params,
|
||||||
DestReceiver *dest)
|
DestReceiver *dest)
|
||||||
|
{
|
||||||
|
Query *query = ParseQueryString(queryString);
|
||||||
|
|
||||||
|
ExecuteQueryIntoDestReceiver(query, params, dest);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ParseQuery parses query string and returns a Query struct.
|
||||||
|
*/
|
||||||
|
Query *
|
||||||
|
ParseQueryString(const char *queryString)
|
||||||
{
|
{
|
||||||
Query *query = NULL;
|
Query *query = NULL;
|
||||||
|
|
||||||
|
@ -356,7 +368,7 @@ ExecuteQueryStringIntoDestReceiver(const char *queryString, ParamListInfo params
|
||||||
|
|
||||||
query = (Query *) linitial(queryTreeList);
|
query = (Query *) linitial(queryTreeList);
|
||||||
|
|
||||||
ExecuteQueryIntoDestReceiver(query, params, dest);
|
return query;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1351,6 +1351,13 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
|
||||||
LockPartitionRelations(firstShardInterval->relationId, RowExclusiveLock);
|
LockPartitionRelations(firstShardInterval->relationId, RowExclusiveLock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Assign the distributed transaction id before trying to acquire the
|
||||||
|
* executor advisory locks. This is useful to show this backend in citus
|
||||||
|
* lock graphs (e.g., dump_global_wait_edges() and citus_lock_waits).
|
||||||
|
*/
|
||||||
|
BeginOrContinueCoordinatedTransaction();
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Ensure that there are no concurrent modifications on the same
|
* Ensure that there are no concurrent modifications on the same
|
||||||
* shards. In general, for DDL commands, we already obtained the
|
* shards. In general, for DDL commands, we already obtained the
|
||||||
|
@ -1362,8 +1369,6 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
|
||||||
*/
|
*/
|
||||||
AcquireExecutorMultiShardLocks(taskList);
|
AcquireExecutorMultiShardLocks(taskList);
|
||||||
|
|
||||||
BeginOrContinueCoordinatedTransaction();
|
|
||||||
|
|
||||||
if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC ||
|
if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC ||
|
||||||
firstTask->replicationModel == REPLICATION_MODEL_2PC)
|
firstTask->replicationModel == REPLICATION_MODEL_2PC)
|
||||||
{
|
{
|
||||||
|
|
|
@ -117,10 +117,11 @@ static bool IsCitusExtensionStmt(Node *parsetree);
|
||||||
static bool IsTransmitStmt(Node *parsetree);
|
static bool IsTransmitStmt(Node *parsetree);
|
||||||
static void VerifyTransmitStmt(CopyStmt *copyStatement);
|
static void VerifyTransmitStmt(CopyStmt *copyStatement);
|
||||||
static bool IsCopyResultStmt(CopyStmt *copyStatement);
|
static bool IsCopyResultStmt(CopyStmt *copyStatement);
|
||||||
|
static bool CopyStatementHasFormat(CopyStmt *copyStatement, char *formatName);
|
||||||
|
|
||||||
/* Local functions forward declarations for processing distributed table commands */
|
/* Local functions forward declarations for processing distributed table commands */
|
||||||
static Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag,
|
static Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag,
|
||||||
bool *commandMustRunAsOwner);
|
const char *queryString);
|
||||||
static void ProcessCreateTableStmtPartitionOf(CreateStmt *createStatement);
|
static void ProcessCreateTableStmtPartitionOf(CreateStmt *createStatement);
|
||||||
static void ProcessAlterTableStmtAttachPartition(AlterTableStmt *alterTableStatement);
|
static void ProcessAlterTableStmtAttachPartition(AlterTableStmt *alterTableStatement);
|
||||||
static List * PlanIndexStmt(IndexStmt *createIndexStatement,
|
static List * PlanIndexStmt(IndexStmt *createIndexStatement,
|
||||||
|
@ -176,7 +177,6 @@ static List * InterShardDDLTaskList(Oid leftRelationId, Oid rightRelationId,
|
||||||
const char *commandString);
|
const char *commandString);
|
||||||
static void RangeVarCallbackForDropIndex(const RangeVar *rel, Oid relOid, Oid oldRelOid,
|
static void RangeVarCallbackForDropIndex(const RangeVar *rel, Oid relOid, Oid oldRelOid,
|
||||||
void *arg);
|
void *arg);
|
||||||
static void CheckCopyPermissions(CopyStmt *copyStatement);
|
|
||||||
static List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist);
|
static List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist);
|
||||||
static void PostProcessUtility(Node *parsetree);
|
static void PostProcessUtility(Node *parsetree);
|
||||||
static List * CollectGrantTableIdList(GrantStmt *grantStmt);
|
static List * CollectGrantTableIdList(GrantStmt *grantStmt);
|
||||||
|
@ -257,9 +257,6 @@ multi_ProcessUtility(PlannedStmt *pstmt,
|
||||||
char *completionTag)
|
char *completionTag)
|
||||||
{
|
{
|
||||||
Node *parsetree = pstmt->utilityStmt;
|
Node *parsetree = pstmt->utilityStmt;
|
||||||
bool commandMustRunAsOwner = false;
|
|
||||||
Oid savedUserId = InvalidOid;
|
|
||||||
int savedSecurityContext = 0;
|
|
||||||
List *ddlJobs = NIL;
|
List *ddlJobs = NIL;
|
||||||
bool checkExtensionVersion = false;
|
bool checkExtensionVersion = false;
|
||||||
|
|
||||||
|
@ -369,7 +366,7 @@ multi_ProcessUtility(PlannedStmt *pstmt,
|
||||||
|
|
||||||
parsetree = copyObject(parsetree);
|
parsetree = copyObject(parsetree);
|
||||||
parsetree = ProcessCopyStmt((CopyStmt *) parsetree, completionTag,
|
parsetree = ProcessCopyStmt((CopyStmt *) parsetree, completionTag,
|
||||||
&commandMustRunAsOwner);
|
queryString);
|
||||||
|
|
||||||
previousContext = MemoryContextSwitchTo(planContext);
|
previousContext = MemoryContextSwitchTo(planContext);
|
||||||
parsetree = copyObject(parsetree);
|
parsetree = copyObject(parsetree);
|
||||||
|
@ -549,18 +546,15 @@ multi_ProcessUtility(PlannedStmt *pstmt,
|
||||||
*/
|
*/
|
||||||
if (IsA(parsetree, DropdbStmt))
|
if (IsA(parsetree, DropdbStmt))
|
||||||
{
|
{
|
||||||
|
const bool missingOK = true;
|
||||||
DropdbStmt *dropDbStatement = (DropdbStmt *) parsetree;
|
DropdbStmt *dropDbStatement = (DropdbStmt *) parsetree;
|
||||||
char *dbname = dropDbStatement->dbname;
|
char *dbname = dropDbStatement->dbname;
|
||||||
Oid databaseOid = get_database_oid(dbname, false);
|
Oid databaseOid = get_database_oid(dbname, missingOK);
|
||||||
|
|
||||||
|
if (databaseOid != InvalidOid)
|
||||||
|
{
|
||||||
StopMaintenanceDaemon(databaseOid);
|
StopMaintenanceDaemon(databaseOid);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* set user if needed and go ahead and run local utility using standard hook */
|
|
||||||
if (commandMustRunAsOwner)
|
|
||||||
{
|
|
||||||
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
|
|
||||||
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#if (PG_VERSION_NUM >= 100000)
|
#if (PG_VERSION_NUM >= 100000)
|
||||||
|
@ -601,11 +595,6 @@ multi_ProcessUtility(PlannedStmt *pstmt,
|
||||||
PostProcessUtility(parsetree);
|
PostProcessUtility(parsetree);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (commandMustRunAsOwner)
|
|
||||||
{
|
|
||||||
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Re-forming the foreign key graph relies on the command being executed
|
* Re-forming the foreign key graph relies on the command being executed
|
||||||
* on the local table first. However, in order to decide whether the
|
* on the local table first. However, in order to decide whether the
|
||||||
|
@ -967,9 +956,20 @@ VerifyTransmitStmt(CopyStmt *copyStatement)
|
||||||
*/
|
*/
|
||||||
static bool
|
static bool
|
||||||
IsCopyResultStmt(CopyStmt *copyStatement)
|
IsCopyResultStmt(CopyStmt *copyStatement)
|
||||||
|
{
|
||||||
|
return CopyStatementHasFormat(copyStatement, "result");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CopyStatementHasFormat checks whether the COPY statement has the given
|
||||||
|
* format.
|
||||||
|
*/
|
||||||
|
static bool
|
||||||
|
CopyStatementHasFormat(CopyStmt *copyStatement, char *formatName)
|
||||||
{
|
{
|
||||||
ListCell *optionCell = NULL;
|
ListCell *optionCell = NULL;
|
||||||
bool hasFormatReceive = false;
|
bool hasFormat = false;
|
||||||
|
|
||||||
/* extract WITH (...) options from the COPY statement */
|
/* extract WITH (...) options from the COPY statement */
|
||||||
foreach(optionCell, copyStatement->options)
|
foreach(optionCell, copyStatement->options)
|
||||||
|
@ -977,14 +977,14 @@ IsCopyResultStmt(CopyStmt *copyStatement)
|
||||||
DefElem *defel = (DefElem *) lfirst(optionCell);
|
DefElem *defel = (DefElem *) lfirst(optionCell);
|
||||||
|
|
||||||
if (strncmp(defel->defname, "format", NAMEDATALEN) == 0 &&
|
if (strncmp(defel->defname, "format", NAMEDATALEN) == 0 &&
|
||||||
strncmp(defGetString(defel), "result", NAMEDATALEN) == 0)
|
strncmp(defGetString(defel), formatName, NAMEDATALEN) == 0)
|
||||||
{
|
{
|
||||||
hasFormatReceive = true;
|
hasFormat = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return hasFormatReceive;
|
return hasFormat;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -993,18 +993,10 @@ IsCopyResultStmt(CopyStmt *copyStatement)
|
||||||
* COPYing from distributed tables and preventing unsupported actions. The
|
* COPYing from distributed tables and preventing unsupported actions. The
|
||||||
* function returns a modified COPY statement to be executed, or NULL if no
|
* function returns a modified COPY statement to be executed, or NULL if no
|
||||||
* further processing is needed.
|
* further processing is needed.
|
||||||
*
|
|
||||||
* commandMustRunAsOwner is an output parameter used to communicate to the caller whether
|
|
||||||
* the copy statement should be executed using elevated privileges. If
|
|
||||||
* ProcessCopyStmt that is required, a call to CheckCopyPermissions will take
|
|
||||||
* care of verifying the current user's permissions before ProcessCopyStmt
|
|
||||||
* returns.
|
|
||||||
*/
|
*/
|
||||||
static Node *
|
static Node *
|
||||||
ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustRunAsOwner)
|
ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, const char *queryString)
|
||||||
{
|
{
|
||||||
*commandMustRunAsOwner = false; /* make sure variable is initialized */
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Handle special COPY "resultid" FROM STDIN WITH (format result) commands
|
* Handle special COPY "resultid" FROM STDIN WITH (format result) commands
|
||||||
* for sending intermediate results to workers.
|
* for sending intermediate results to workers.
|
||||||
|
@ -1110,49 +1102,47 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustR
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
if (copyStatement->filename != NULL && !copyStatement->is_program)
|
if (copyStatement->filename != NULL && !copyStatement->is_program)
|
||||||
{
|
{
|
||||||
const char *filename = copyStatement->filename;
|
char *filename = copyStatement->filename;
|
||||||
|
|
||||||
if (CacheDirectoryElement(filename))
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* Only superusers are allowed to copy from a file, so we have to
|
|
||||||
* become superuser to execute copies to/from files used by citus'
|
|
||||||
* query execution.
|
|
||||||
*
|
|
||||||
* XXX: This is a decidedly suboptimal solution, as that means
|
|
||||||
* that triggers, input functions, etc. run with elevated
|
|
||||||
* privileges. But this is better than not being able to run
|
|
||||||
* queries as normal user.
|
|
||||||
*/
|
|
||||||
*commandMustRunAsOwner = true;
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Have to manually check permissions here as the COPY is will be
|
* We execute COPY commands issued by the task-tracker executor here
|
||||||
* run as a superuser.
|
* because we're not normally allowed to write to a file as a regular
|
||||||
|
* user and we don't want to execute the query as superuser.
|
||||||
*/
|
*/
|
||||||
if (copyStatement->relation != NULL)
|
if (CacheDirectoryElement(filename) && copyStatement->query != NULL &&
|
||||||
|
!copyStatement->is_from && !is_absolute_path(filename))
|
||||||
{
|
{
|
||||||
CheckCopyPermissions(copyStatement);
|
bool binaryCopyFormat = CopyStatementHasFormat(copyStatement, "binary");
|
||||||
|
int64 tuplesSent = 0;
|
||||||
|
Query *query = NULL;
|
||||||
|
Node *queryNode = copyStatement->query;
|
||||||
|
List *queryTreeList = NIL;
|
||||||
|
|
||||||
|
#if (PG_VERSION_NUM >= 100000)
|
||||||
|
RawStmt *rawStmt = makeNode(RawStmt);
|
||||||
|
rawStmt->stmt = queryNode;
|
||||||
|
|
||||||
|
queryTreeList = pg_analyze_and_rewrite(rawStmt, queryString, NULL, 0, NULL);
|
||||||
|
#else
|
||||||
|
queryTreeList = pg_analyze_and_rewrite(queryNode, queryString, NULL, 0);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
if (list_length(queryTreeList) != 1)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errmsg("can only execute a single query")));
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
query = (Query *) linitial(queryTreeList);
|
||||||
* Check if we have a "COPY (query) TO filename". If we do, copy
|
tuplesSent = WorkerExecuteSqlTask(query, filename, binaryCopyFormat);
|
||||||
* doesn't accept relative file paths. However, SQL tasks that get
|
|
||||||
* assigned to worker nodes have relative paths. We therefore
|
|
||||||
* convert relative paths to absolute ones here.
|
|
||||||
*/
|
|
||||||
if (copyStatement->relation == NULL &&
|
|
||||||
!copyStatement->is_from &&
|
|
||||||
!is_absolute_path(filename))
|
|
||||||
{
|
|
||||||
copyStatement->filename = make_absolute_path(filename);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
|
||||||
|
"COPY " UINT64_FORMAT, tuplesSent);
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return (Node *) copyStatement;
|
return (Node *) copyStatement;
|
||||||
}
|
}
|
||||||
|
@ -3913,7 +3903,7 @@ RangeVarCallbackForDropIndex(const RangeVar *rel, Oid relOid, Oid oldRelOid, voi
|
||||||
*
|
*
|
||||||
* Copied from postgres, where it's part of DoCopy().
|
* Copied from postgres, where it's part of DoCopy().
|
||||||
*/
|
*/
|
||||||
static void
|
void
|
||||||
CheckCopyPermissions(CopyStmt *copyStatement)
|
CheckCopyPermissions(CopyStmt *copyStatement)
|
||||||
{
|
{
|
||||||
/* *INDENT-OFF* */
|
/* *INDENT-OFF* */
|
||||||
|
|
|
@ -157,6 +157,10 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
|
||||||
|
|
||||||
result = CreateDistributedPlannedStmt(planId, result, originalQuery, parse,
|
result = CreateDistributedPlannedStmt(planId, result, originalQuery, parse,
|
||||||
boundParams, plannerRestrictionContext);
|
boundParams, plannerRestrictionContext);
|
||||||
|
|
||||||
|
setPartitionedTablesInherited = true;
|
||||||
|
AdjustPartitioningForDistributedPlanning(parse,
|
||||||
|
setPartitionedTablesInherited);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
PG_CATCH();
|
PG_CATCH();
|
||||||
|
@ -166,13 +170,6 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
|
||||||
}
|
}
|
||||||
PG_END_TRY();
|
PG_END_TRY();
|
||||||
|
|
||||||
if (needsDistributedPlanning)
|
|
||||||
{
|
|
||||||
setPartitionedTablesInherited = true;
|
|
||||||
|
|
||||||
AdjustPartitioningForDistributedPlanning(parse, setPartitionedTablesInherited);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* remove the context from the context list */
|
/* remove the context from the context list */
|
||||||
PopPlannerRestrictionContext();
|
PopPlannerRestrictionContext();
|
||||||
|
|
||||||
|
@ -1517,6 +1514,13 @@ CurrentPlannerRestrictionContext(void)
|
||||||
plannerRestrictionContext =
|
plannerRestrictionContext =
|
||||||
(PlannerRestrictionContext *) linitial(plannerRestrictionContextList);
|
(PlannerRestrictionContext *) linitial(plannerRestrictionContextList);
|
||||||
|
|
||||||
|
if (plannerRestrictionContext == NULL)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
|
||||||
|
errmsg("planner restriction context stack was empty"),
|
||||||
|
errdetail("Please report this to the Citus core team.")));
|
||||||
|
}
|
||||||
|
|
||||||
return plannerRestrictionContext;
|
return plannerRestrictionContext;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1559,22 +1563,30 @@ HasUnresolvedExternParamsWalker(Node *expression, ParamListInfo boundParams)
|
||||||
/* check whether parameter is available (and valid) */
|
/* check whether parameter is available (and valid) */
|
||||||
if (boundParams && paramId > 0 && paramId <= boundParams->numParams)
|
if (boundParams && paramId > 0 && paramId <= boundParams->numParams)
|
||||||
{
|
{
|
||||||
ParamExternData *externParam = &boundParams->params[paramId - 1];
|
ParamExternData *externParam = NULL;
|
||||||
Oid paramType = externParam->ptype;
|
Oid paramType = InvalidOid;
|
||||||
|
|
||||||
/* give hook a chance in case parameter is dynamic */
|
/* give hook a chance in case parameter is dynamic */
|
||||||
if (!OidIsValid(paramType) && boundParams->paramFetch != NULL)
|
if (boundParams->paramFetch != NULL)
|
||||||
{
|
{
|
||||||
#if (PG_VERSION_NUM >= 110000)
|
#if (PG_VERSION_NUM >= 110000)
|
||||||
ParamExternData externParamPlaceholder;
|
ParamExternData externParamPlaceholder;
|
||||||
externParam = (*boundParams->paramFetch)(boundParams, paramId, false,
|
externParam = (*boundParams->paramFetch)(boundParams, paramId, false,
|
||||||
&externParamPlaceholder);
|
&externParamPlaceholder);
|
||||||
#else
|
#else
|
||||||
|
externParam = &boundParams->params[paramId - 1];
|
||||||
|
if (!OidIsValid(externParam->ptype))
|
||||||
|
{
|
||||||
(*boundParams->paramFetch)(boundParams, paramId);
|
(*boundParams->paramFetch)(boundParams, paramId);
|
||||||
|
}
|
||||||
#endif
|
#endif
|
||||||
paramType = externParam->ptype;
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
externParam = &boundParams->params[paramId - 1];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
paramType = externParam->ptype;
|
||||||
if (OidIsValid(paramType))
|
if (OidIsValid(paramType))
|
||||||
{
|
{
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -106,6 +106,8 @@ static RangeTblEntry * DerivedRangeTableEntry(MultiNode *multiNode, List *column
|
||||||
List *tableIdList);
|
List *tableIdList);
|
||||||
static List * DerivedColumnNameList(uint32 columnCount, uint64 generatingJobId);
|
static List * DerivedColumnNameList(uint32 columnCount, uint64 generatingJobId);
|
||||||
static Query * BuildSubqueryJobQuery(MultiNode *multiNode);
|
static Query * BuildSubqueryJobQuery(MultiNode *multiNode);
|
||||||
|
static void UpdateAllColumnAttributes(Node *columnContainer, List *rangeTableList,
|
||||||
|
List *dependedJobList);
|
||||||
static void UpdateColumnAttributes(Var *column, List *rangeTableList,
|
static void UpdateColumnAttributes(Var *column, List *rangeTableList,
|
||||||
List *dependedJobList);
|
List *dependedJobList);
|
||||||
static Index NewTableId(Index originalTableId, List *rangeTableList);
|
static Index NewTableId(Index originalTableId, List *rangeTableList);
|
||||||
|
@ -582,10 +584,8 @@ BuildJobQuery(MultiNode *multiNode, List *dependedJobList)
|
||||||
List *sortClauseList = NIL;
|
List *sortClauseList = NIL;
|
||||||
List *groupClauseList = NIL;
|
List *groupClauseList = NIL;
|
||||||
List *selectClauseList = NIL;
|
List *selectClauseList = NIL;
|
||||||
List *columnList = NIL;
|
|
||||||
Node *limitCount = NULL;
|
Node *limitCount = NULL;
|
||||||
Node *limitOffset = NULL;
|
Node *limitOffset = NULL;
|
||||||
ListCell *columnCell = NULL;
|
|
||||||
FromExpr *joinTree = NULL;
|
FromExpr *joinTree = NULL;
|
||||||
Node *joinRoot = NULL;
|
Node *joinRoot = NULL;
|
||||||
Node *havingQual = NULL;
|
Node *havingQual = NULL;
|
||||||
|
@ -653,13 +653,7 @@ BuildJobQuery(MultiNode *multiNode, List *dependedJobList)
|
||||||
/* update the column attributes for target entries */
|
/* update the column attributes for target entries */
|
||||||
if (updateColumnAttributes)
|
if (updateColumnAttributes)
|
||||||
{
|
{
|
||||||
ListCell *columnCell = NULL;
|
UpdateAllColumnAttributes((Node *) targetList, rangeTableList, dependedJobList);
|
||||||
List *columnList = pull_var_clause_default((Node *) targetList);
|
|
||||||
foreach(columnCell, columnList)
|
|
||||||
{
|
|
||||||
Var *column = (Var *) lfirst(columnCell);
|
|
||||||
UpdateColumnAttributes(column, rangeTableList, dependedJobList);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* extract limit count/offset and sort clauses */
|
/* extract limit count/offset and sort clauses */
|
||||||
|
@ -679,16 +673,12 @@ BuildJobQuery(MultiNode *multiNode, List *dependedJobList)
|
||||||
/* build the where clause list using select predicates */
|
/* build the where clause list using select predicates */
|
||||||
selectClauseList = QuerySelectClauseList(multiNode);
|
selectClauseList = QuerySelectClauseList(multiNode);
|
||||||
|
|
||||||
/* set correct column attributes for select columns */
|
/* set correct column attributes for select and having clauses */
|
||||||
if (updateColumnAttributes)
|
if (updateColumnAttributes)
|
||||||
{
|
{
|
||||||
columnCell = NULL;
|
UpdateAllColumnAttributes((Node *) selectClauseList, rangeTableList,
|
||||||
columnList = pull_var_clause_default((Node *) selectClauseList);
|
dependedJobList);
|
||||||
foreach(columnCell, columnList)
|
UpdateAllColumnAttributes(havingQual, rangeTableList, dependedJobList);
|
||||||
{
|
|
||||||
Var *column = (Var *) lfirst(columnCell);
|
|
||||||
UpdateColumnAttributes(column, rangeTableList, dependedJobList);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -1529,6 +1519,25 @@ BuildSubqueryJobQuery(MultiNode *multiNode)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* UpdateAllColumnAttributes extracts column references from provided columnContainer
|
||||||
|
* and calls UpdateColumnAttributes to updates the column's range table reference (varno) and
|
||||||
|
* column attribute number for the range table (varattno).
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
UpdateAllColumnAttributes(Node *columnContainer, List *rangeTableList,
|
||||||
|
List *dependedJobList)
|
||||||
|
{
|
||||||
|
ListCell *columnCell = NULL;
|
||||||
|
List *columnList = pull_var_clause_default(columnContainer);
|
||||||
|
foreach(columnCell, columnList)
|
||||||
|
{
|
||||||
|
Var *column = (Var *) lfirst(columnCell);
|
||||||
|
UpdateColumnAttributes(column, rangeTableList, dependedJobList);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* UpdateColumnAttributes updates the column's range table reference (varno) and
|
* UpdateColumnAttributes updates the column's range table reference (varno) and
|
||||||
* column attribute number for the range table (varattno). The function uses the
|
* column attribute number for the range table (varattno). The function uses the
|
||||||
|
|
|
@ -55,6 +55,7 @@
|
||||||
#include "postmaster/postmaster.h"
|
#include "postmaster/postmaster.h"
|
||||||
#include "optimizer/planner.h"
|
#include "optimizer/planner.h"
|
||||||
#include "optimizer/paths.h"
|
#include "optimizer/paths.h"
|
||||||
|
#include "tcop/tcopprot.h"
|
||||||
#include "utils/guc.h"
|
#include "utils/guc.h"
|
||||||
#include "utils/guc_tables.h"
|
#include "utils/guc_tables.h"
|
||||||
|
|
||||||
|
@ -65,6 +66,7 @@ static char *CitusVersion = CITUS_VERSION;
|
||||||
|
|
||||||
void _PG_init(void);
|
void _PG_init(void);
|
||||||
|
|
||||||
|
static void ResizeStackToMaximumDepth(void);
|
||||||
static void multi_log_hook(ErrorData *edata);
|
static void multi_log_hook(ErrorData *edata);
|
||||||
static void CreateRequiredDirectories(void);
|
static void CreateRequiredDirectories(void);
|
||||||
static void RegisterCitusConfigVariables(void);
|
static void RegisterCitusConfigVariables(void);
|
||||||
|
@ -169,6 +171,8 @@ _PG_init(void)
|
||||||
"shared_preload_libraries.")));
|
"shared_preload_libraries.")));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ResizeStackToMaximumDepth();
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Extend the database directory structure before continuing with
|
* Extend the database directory structure before continuing with
|
||||||
* initialization - one of the later steps might require them to exist.
|
* initialization - one of the later steps might require them to exist.
|
||||||
|
@ -234,6 +238,35 @@ _PG_init(void)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Stack size increase during high memory load may cause unexpected crashes.
|
||||||
|
* With this alloca call, we are increasing stack size explicitly, so that if
|
||||||
|
* it is not possible to increase stack size, we will get an OOM error instead
|
||||||
|
* of a crash.
|
||||||
|
*
|
||||||
|
* This function is called on backend startup. The allocated memory will
|
||||||
|
* automatically be released at the end of the function's scope. However, we'd
|
||||||
|
* have already expanded the stack and it wouldn't shrink back. So, in a sense,
|
||||||
|
* per backend we're securing max_stack_depth kB's of memory on the stack upfront.
|
||||||
|
*
|
||||||
|
* Not all the backends require max_stack_depth kB's on the stack, so we might end
|
||||||
|
* up with unnecessary allocations. However, the default value is 2MB, which seems
|
||||||
|
* an acceptable trade-off. Also, allocating memory upfront may perform better
|
||||||
|
* under some circumstances.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
ResizeStackToMaximumDepth(void)
|
||||||
|
{
|
||||||
|
#ifndef WIN32
|
||||||
|
volatile char *stack_resizer = NULL;
|
||||||
|
long max_stack_depth_bytes = max_stack_depth * 1024L;
|
||||||
|
|
||||||
|
stack_resizer = alloca(max_stack_depth_bytes);
|
||||||
|
stack_resizer[max_stack_depth_bytes - 1] = 0;
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* multi_log_hook intercepts postgres log commands. We use this to override
|
* multi_log_hook intercepts postgres log commands. We use this to override
|
||||||
* postgres error messages when they're not specific enough for the users.
|
* postgres error messages when they're not specific enough for the users.
|
||||||
|
|
|
@ -25,6 +25,7 @@
|
||||||
#include "distributed/remote_commands.h"
|
#include "distributed/remote_commands.h"
|
||||||
#include "distributed/transaction_identifier.h"
|
#include "distributed/transaction_identifier.h"
|
||||||
#include "nodes/execnodes.h"
|
#include "nodes/execnodes.h"
|
||||||
|
#include "postmaster/autovacuum.h" /* to access autovacuum_max_workers */
|
||||||
#include "storage/ipc.h"
|
#include "storage/ipc.h"
|
||||||
#include "storage/lmgr.h"
|
#include "storage/lmgr.h"
|
||||||
#include "storage/lwlock.h"
|
#include "storage/lwlock.h"
|
||||||
|
@ -512,6 +513,7 @@ BackendManagementShmemInit(void)
|
||||||
if (!alreadyInitialized)
|
if (!alreadyInitialized)
|
||||||
{
|
{
|
||||||
int backendIndex = 0;
|
int backendIndex = 0;
|
||||||
|
int totalProcs = 0;
|
||||||
char *trancheName = "Backend Management Tranche";
|
char *trancheName = "Backend Management Tranche";
|
||||||
|
|
||||||
#if (PG_VERSION_NUM >= 100000)
|
#if (PG_VERSION_NUM >= 100000)
|
||||||
|
@ -557,7 +559,8 @@ BackendManagementShmemInit(void)
|
||||||
* We also initiate initiatorNodeIdentifier to -1, which can never be
|
* We also initiate initiatorNodeIdentifier to -1, which can never be
|
||||||
* used as a node id.
|
* used as a node id.
|
||||||
*/
|
*/
|
||||||
for (backendIndex = 0; backendIndex < TotalProcs; ++backendIndex)
|
totalProcs = TotalProcCount();
|
||||||
|
for (backendIndex = 0; backendIndex < totalProcs; ++backendIndex)
|
||||||
{
|
{
|
||||||
backendManagementShmemData->backends[backendIndex].citusBackend.
|
backendManagementShmemData->backends[backendIndex].citusBackend.
|
||||||
initiatorNodeIdentifier = -1;
|
initiatorNodeIdentifier = -1;
|
||||||
|
@ -582,14 +585,62 @@ static size_t
|
||||||
BackendManagementShmemSize(void)
|
BackendManagementShmemSize(void)
|
||||||
{
|
{
|
||||||
Size size = 0;
|
Size size = 0;
|
||||||
|
int totalProcs = TotalProcCount();
|
||||||
|
|
||||||
size = add_size(size, sizeof(BackendManagementShmemData));
|
size = add_size(size, sizeof(BackendManagementShmemData));
|
||||||
size = add_size(size, mul_size(sizeof(BackendData), TotalProcs));
|
size = add_size(size, mul_size(sizeof(BackendData), totalProcs));
|
||||||
|
|
||||||
return size;
|
return size;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* TotalProcCount returns the total processes that could run via the current
|
||||||
|
* postgres server. See the details in the function comments.
|
||||||
|
*
|
||||||
|
* There is one thing we should warn the readers. Citus enforces to be loaded
|
||||||
|
* as the first extension in shared_preload_libraries. However, if any other
|
||||||
|
* extension overrides MaxConnections, autovacuum_max_workers or
|
||||||
|
* max_worker_processes, our reasoning in this function may not work as expected.
|
||||||
|
* Given that it is not a usual pattern for extension, we consider Citus' behaviour
|
||||||
|
* good enough for now.
|
||||||
|
*/
|
||||||
|
int
|
||||||
|
TotalProcCount(void)
|
||||||
|
{
|
||||||
|
int maxBackends = 0;
|
||||||
|
int totalProcs = 0;
|
||||||
|
|
||||||
|
#ifdef WIN32
|
||||||
|
|
||||||
|
/* autovacuum_max_workers is not PGDLLIMPORT, so use a high estimate for windows */
|
||||||
|
int estimatedMaxAutovacuumWorkers = 30;
|
||||||
|
maxBackends =
|
||||||
|
MaxConnections + estimatedMaxAutovacuumWorkers + 1 + max_worker_processes;
|
||||||
|
#else
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We're simply imitating Postgrsql's InitializeMaxBackends(). Given that all
|
||||||
|
* the items used here PGC_POSTMASTER, should be safe to access them
|
||||||
|
* anytime during the execution even before InitializeMaxBackends() is called.
|
||||||
|
*/
|
||||||
|
maxBackends = MaxConnections + autovacuum_max_workers + 1 + max_worker_processes;
|
||||||
|
#endif
|
||||||
|
|
||||||
|
/*
|
||||||
|
* We prefer to maintain space for auxiliary procs or preperad transactions in
|
||||||
|
* the backend space because they could be blocking processes and our current
|
||||||
|
* implementation of distributed deadlock detection could process them
|
||||||
|
* as a regular backend. In the future, we could consider chaning deadlock
|
||||||
|
* detection algorithm to ignore auxiliary procs or preperad transactions and
|
||||||
|
* save same space.
|
||||||
|
*/
|
||||||
|
totalProcs = maxBackends + NUM_AUXILIARY_PROCS + max_prepared_xacts;
|
||||||
|
|
||||||
|
return totalProcs;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* InitializeBackendData initialises MyBackendData to the shared memory segment
|
* InitializeBackendData initialises MyBackendData to the shared memory segment
|
||||||
* belonging to the current backend.
|
* belonging to the current backend.
|
||||||
|
@ -754,9 +805,15 @@ AssignDistributedTransactionId(void)
|
||||||
void
|
void
|
||||||
MarkCitusInitiatedCoordinatorBackend(void)
|
MarkCitusInitiatedCoordinatorBackend(void)
|
||||||
{
|
{
|
||||||
|
/*
|
||||||
|
* GetLocalGroupId may throw exception which can cause leaving spin lock
|
||||||
|
* unreleased. Calling GetLocalGroupId function before the lock to avoid this.
|
||||||
|
*/
|
||||||
|
int localGroupId = GetLocalGroupId();
|
||||||
|
|
||||||
SpinLockAcquire(&MyBackendData->mutex);
|
SpinLockAcquire(&MyBackendData->mutex);
|
||||||
|
|
||||||
MyBackendData->citusBackend.initiatorNodeIdentifier = GetLocalGroupId();
|
MyBackendData->citusBackend.initiatorNodeIdentifier = localGroupId;
|
||||||
MyBackendData->citusBackend.transactionOriginator = true;
|
MyBackendData->citusBackend.transactionOriginator = true;
|
||||||
|
|
||||||
SpinLockRelease(&MyBackendData->mutex);
|
SpinLockRelease(&MyBackendData->mutex);
|
||||||
|
|
|
@ -389,6 +389,7 @@ BuildLocalWaitGraph(void)
|
||||||
WaitGraph *waitGraph = NULL;
|
WaitGraph *waitGraph = NULL;
|
||||||
int curBackend = 0;
|
int curBackend = 0;
|
||||||
PROCStack remaining;
|
PROCStack remaining;
|
||||||
|
int totalProcs = TotalProcCount();
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Try hard to avoid allocations while holding lock. Thus we pre-allocate
|
* Try hard to avoid allocations while holding lock. Thus we pre-allocate
|
||||||
|
@ -398,12 +399,12 @@ BuildLocalWaitGraph(void)
|
||||||
*/
|
*/
|
||||||
waitGraph = (WaitGraph *) palloc0(sizeof(WaitGraph));
|
waitGraph = (WaitGraph *) palloc0(sizeof(WaitGraph));
|
||||||
waitGraph->localNodeId = GetLocalGroupId();
|
waitGraph->localNodeId = GetLocalGroupId();
|
||||||
waitGraph->allocatedSize = TotalProcs * 3;
|
waitGraph->allocatedSize = totalProcs * 3;
|
||||||
waitGraph->edgeCount = 0;
|
waitGraph->edgeCount = 0;
|
||||||
waitGraph->edges = (WaitEdge *) palloc(waitGraph->allocatedSize * sizeof(WaitEdge));
|
waitGraph->edges = (WaitEdge *) palloc(waitGraph->allocatedSize * sizeof(WaitEdge));
|
||||||
|
|
||||||
remaining.procs = (PGPROC **) palloc(sizeof(PGPROC *) * TotalProcs);
|
remaining.procs = (PGPROC **) palloc(sizeof(PGPROC *) * totalProcs);
|
||||||
remaining.procAdded = (bool *) palloc0(sizeof(bool *) * TotalProcs);
|
remaining.procAdded = (bool *) palloc0(sizeof(bool *) * totalProcs);
|
||||||
remaining.procCount = 0;
|
remaining.procCount = 0;
|
||||||
|
|
||||||
LockLockData();
|
LockLockData();
|
||||||
|
@ -416,7 +417,7 @@ BuildLocalWaitGraph(void)
|
||||||
*/
|
*/
|
||||||
|
|
||||||
/* build list of starting procs */
|
/* build list of starting procs */
|
||||||
for (curBackend = 0; curBackend < TotalProcs; curBackend++)
|
for (curBackend = 0; curBackend < totalProcs; curBackend++)
|
||||||
{
|
{
|
||||||
PGPROC *currentProc = &ProcGlobal->allProcs[curBackend];
|
PGPROC *currentProc = &ProcGlobal->allProcs[curBackend];
|
||||||
BackendData currentBackendData;
|
BackendData currentBackendData;
|
||||||
|
@ -762,7 +763,7 @@ AddProcToVisit(PROCStack *remaining, PGPROC *proc)
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
Assert(remaining->procCount < TotalProcs);
|
Assert(remaining->procCount < TotalProcCount());
|
||||||
|
|
||||||
remaining->procs[remaining->procCount++] = proc;
|
remaining->procs[remaining->procCount++] = proc;
|
||||||
remaining->procAdded[proc->pgprocno] = true;
|
remaining->procAdded[proc->pgprocno] = true;
|
||||||
|
|
|
@ -31,6 +31,7 @@
|
||||||
#include "distributed/master_protocol.h"
|
#include "distributed/master_protocol.h"
|
||||||
#include "distributed/metadata_cache.h"
|
#include "distributed/metadata_cache.h"
|
||||||
#include "distributed/multi_client_executor.h"
|
#include "distributed/multi_client_executor.h"
|
||||||
|
#include "distributed/multi_copy.h"
|
||||||
#include "distributed/multi_logical_optimizer.h"
|
#include "distributed/multi_logical_optimizer.h"
|
||||||
#include "distributed/multi_partitioning_utils.h"
|
#include "distributed/multi_partitioning_utils.h"
|
||||||
#include "distributed/multi_server_executor.h"
|
#include "distributed/multi_server_executor.h"
|
||||||
|
@ -766,6 +767,8 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS)
|
||||||
StringInfo queryString = NULL;
|
StringInfo queryString = NULL;
|
||||||
Oid sourceShardRelationId = InvalidOid;
|
Oid sourceShardRelationId = InvalidOid;
|
||||||
Oid sourceSchemaId = InvalidOid;
|
Oid sourceSchemaId = InvalidOid;
|
||||||
|
Oid savedUserId = InvalidOid;
|
||||||
|
int savedSecurityContext = 0;
|
||||||
|
|
||||||
CheckCitusVersion(ERROR);
|
CheckCitusVersion(ERROR);
|
||||||
|
|
||||||
|
@ -829,9 +832,18 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS)
|
||||||
appendStringInfo(queryString, COPY_IN_COMMAND, shardQualifiedName,
|
appendStringInfo(queryString, COPY_IN_COMMAND, shardQualifiedName,
|
||||||
localFilePath->data);
|
localFilePath->data);
|
||||||
|
|
||||||
|
/* make sure we are allowed to execute the COPY command */
|
||||||
|
CheckCopyPermissions(localCopyCommand);
|
||||||
|
|
||||||
|
/* need superuser to copy from files */
|
||||||
|
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
|
||||||
|
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
|
||||||
|
|
||||||
CitusProcessUtility((Node *) localCopyCommand, queryString->data,
|
CitusProcessUtility((Node *) localCopyCommand, queryString->data,
|
||||||
PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL);
|
PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL);
|
||||||
|
|
||||||
|
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
|
||||||
|
|
||||||
/* finally delete the temporary file we created */
|
/* finally delete the temporary file we created */
|
||||||
CitusDeleteFile(localFilePath->data);
|
CitusDeleteFile(localFilePath->data);
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,280 @@
|
||||||
|
/*-------------------------------------------------------------------------
|
||||||
|
*
|
||||||
|
* worker_sql_task_protocol.c
|
||||||
|
*
|
||||||
|
* Routines for executing SQL tasks during task-tracker execution.
|
||||||
|
*
|
||||||
|
* Copyright (c) 2012-2018, Citus Data, Inc.
|
||||||
|
*
|
||||||
|
*-------------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "postgres.h"
|
||||||
|
#include "funcapi.h"
|
||||||
|
#include "pgstat.h"
|
||||||
|
|
||||||
|
#include "distributed/multi_copy.h"
|
||||||
|
#include "distributed/multi_executor.h"
|
||||||
|
#include "distributed/transmit.h"
|
||||||
|
#include "distributed/worker_protocol.h"
|
||||||
|
#include "utils/builtins.h"
|
||||||
|
#include "utils/memutils.h"
|
||||||
|
|
||||||
|
|
||||||
|
/* TaskFileDestReceiver can be used to stream results into a file */
|
||||||
|
typedef struct TaskFileDestReceiver
|
||||||
|
{
|
||||||
|
/* public DestReceiver interface */
|
||||||
|
DestReceiver pub;
|
||||||
|
|
||||||
|
/* descriptor of the tuples that are sent to the worker */
|
||||||
|
TupleDesc tupleDescriptor;
|
||||||
|
|
||||||
|
/* EState for per-tuple memory allocation */
|
||||||
|
EState *executorState;
|
||||||
|
|
||||||
|
/* MemoryContext for DestReceiver session */
|
||||||
|
MemoryContext memoryContext;
|
||||||
|
|
||||||
|
/* output file */
|
||||||
|
char *filePath;
|
||||||
|
File fileDesc;
|
||||||
|
bool binaryCopyFormat;
|
||||||
|
|
||||||
|
/* state on how to copy out data types */
|
||||||
|
CopyOutState copyOutState;
|
||||||
|
FmgrInfo *columnOutputFunctions;
|
||||||
|
|
||||||
|
/* number of tuples sent */
|
||||||
|
uint64 tuplesSent;
|
||||||
|
} TaskFileDestReceiver;
|
||||||
|
|
||||||
|
|
||||||
|
static DestReceiver * CreateTaskFileDestReceiver(char *filePath, EState *executorState,
|
||||||
|
bool binaryCopyFormat);
|
||||||
|
static void TaskFileDestReceiverStartup(DestReceiver *dest, int operation,
|
||||||
|
TupleDesc inputTupleDescriptor);
|
||||||
|
static bool TaskFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest);
|
||||||
|
static void WriteToLocalFile(StringInfo copyData, File fileDesc);
|
||||||
|
static void TaskFileDestReceiverShutdown(DestReceiver *destReceiver);
|
||||||
|
static void TaskFileDestReceiverDestroy(DestReceiver *destReceiver);
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* WorkerExecuteSqlTask executes an already-parsed query and writes the result
|
||||||
|
* to the given task file.
|
||||||
|
*/
|
||||||
|
int64
|
||||||
|
WorkerExecuteSqlTask(Query *query, char *taskFilename, bool binaryCopyFormat)
|
||||||
|
{
|
||||||
|
EState *estate = NULL;
|
||||||
|
TaskFileDestReceiver *taskFileDest = NULL;
|
||||||
|
ParamListInfo paramListInfo = NULL;
|
||||||
|
int64 tuplesSent = 0L;
|
||||||
|
|
||||||
|
estate = CreateExecutorState();
|
||||||
|
taskFileDest =
|
||||||
|
(TaskFileDestReceiver *) CreateTaskFileDestReceiver(taskFilename, estate,
|
||||||
|
binaryCopyFormat);
|
||||||
|
|
||||||
|
ExecuteQueryIntoDestReceiver(query, paramListInfo, (DestReceiver *) taskFileDest);
|
||||||
|
|
||||||
|
tuplesSent = taskFileDest->tuplesSent;
|
||||||
|
|
||||||
|
taskFileDest->pub.rDestroy((DestReceiver *) taskFileDest);
|
||||||
|
FreeExecutorState(estate);
|
||||||
|
|
||||||
|
return tuplesSent;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CreateTaskFileDestReceiver creates a DestReceiver for writing query results
|
||||||
|
* to a task file.
|
||||||
|
*/
|
||||||
|
static DestReceiver *
|
||||||
|
CreateTaskFileDestReceiver(char *filePath, EState *executorState, bool binaryCopyFormat)
|
||||||
|
{
|
||||||
|
TaskFileDestReceiver *taskFileDest = NULL;
|
||||||
|
|
||||||
|
taskFileDest = (TaskFileDestReceiver *) palloc0(sizeof(TaskFileDestReceiver));
|
||||||
|
|
||||||
|
/* set up the DestReceiver function pointers */
|
||||||
|
taskFileDest->pub.receiveSlot = TaskFileDestReceiverReceive;
|
||||||
|
taskFileDest->pub.rStartup = TaskFileDestReceiverStartup;
|
||||||
|
taskFileDest->pub.rShutdown = TaskFileDestReceiverShutdown;
|
||||||
|
taskFileDest->pub.rDestroy = TaskFileDestReceiverDestroy;
|
||||||
|
taskFileDest->pub.mydest = DestCopyOut;
|
||||||
|
|
||||||
|
/* set up output parameters */
|
||||||
|
taskFileDest->executorState = executorState;
|
||||||
|
taskFileDest->memoryContext = CurrentMemoryContext;
|
||||||
|
taskFileDest->filePath = pstrdup(filePath);
|
||||||
|
taskFileDest->binaryCopyFormat = binaryCopyFormat;
|
||||||
|
|
||||||
|
return (DestReceiver *) taskFileDest;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* TaskFileDestReceiverStartup implements the rStartup interface of
|
||||||
|
* TaskFileDestReceiver. It opens the destination file and sets up
|
||||||
|
* the CopyOutState.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
TaskFileDestReceiverStartup(DestReceiver *dest, int operation,
|
||||||
|
TupleDesc inputTupleDescriptor)
|
||||||
|
{
|
||||||
|
TaskFileDestReceiver *taskFileDest = (TaskFileDestReceiver *) dest;
|
||||||
|
|
||||||
|
CopyOutState copyOutState = NULL;
|
||||||
|
const char *delimiterCharacter = "\t";
|
||||||
|
const char *nullPrintCharacter = "\\N";
|
||||||
|
|
||||||
|
const int fileFlags = (O_APPEND | O_CREAT | O_RDWR | O_TRUNC | PG_BINARY);
|
||||||
|
const int fileMode = (S_IRUSR | S_IWUSR);
|
||||||
|
|
||||||
|
/* use the memory context that was in place when the DestReceiver was created */
|
||||||
|
MemoryContext oldContext = MemoryContextSwitchTo(taskFileDest->memoryContext);
|
||||||
|
|
||||||
|
taskFileDest->tupleDescriptor = inputTupleDescriptor;
|
||||||
|
|
||||||
|
/* define how tuples will be serialised */
|
||||||
|
copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData));
|
||||||
|
copyOutState->delim = (char *) delimiterCharacter;
|
||||||
|
copyOutState->null_print = (char *) nullPrintCharacter;
|
||||||
|
copyOutState->null_print_client = (char *) nullPrintCharacter;
|
||||||
|
copyOutState->binary = taskFileDest->binaryCopyFormat;
|
||||||
|
copyOutState->fe_msgbuf = makeStringInfo();
|
||||||
|
copyOutState->rowcontext = GetPerTupleMemoryContext(taskFileDest->executorState);
|
||||||
|
taskFileDest->copyOutState = copyOutState;
|
||||||
|
|
||||||
|
taskFileDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor,
|
||||||
|
copyOutState->binary);
|
||||||
|
|
||||||
|
taskFileDest->fileDesc = FileOpenForTransmit(taskFileDest->filePath, fileFlags,
|
||||||
|
fileMode);
|
||||||
|
|
||||||
|
if (copyOutState->binary)
|
||||||
|
{
|
||||||
|
/* write headers when using binary encoding */
|
||||||
|
resetStringInfo(copyOutState->fe_msgbuf);
|
||||||
|
AppendCopyBinaryHeaders(copyOutState);
|
||||||
|
|
||||||
|
WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest->fileDesc);
|
||||||
|
}
|
||||||
|
|
||||||
|
MemoryContextSwitchTo(oldContext);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* TaskFileDestReceiverReceive implements the receiveSlot function of
|
||||||
|
* TaskFileDestReceiver. It takes a TupleTableSlot and writes the contents
|
||||||
|
* to a local file.
|
||||||
|
*/
|
||||||
|
static bool
|
||||||
|
TaskFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
|
||||||
|
{
|
||||||
|
TaskFileDestReceiver *taskFileDest = (TaskFileDestReceiver *) dest;
|
||||||
|
|
||||||
|
TupleDesc tupleDescriptor = taskFileDest->tupleDescriptor;
|
||||||
|
|
||||||
|
CopyOutState copyOutState = taskFileDest->copyOutState;
|
||||||
|
FmgrInfo *columnOutputFunctions = taskFileDest->columnOutputFunctions;
|
||||||
|
|
||||||
|
Datum *columnValues = NULL;
|
||||||
|
bool *columnNulls = NULL;
|
||||||
|
StringInfo copyData = copyOutState->fe_msgbuf;
|
||||||
|
|
||||||
|
EState *executorState = taskFileDest->executorState;
|
||||||
|
MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState);
|
||||||
|
MemoryContext oldContext = MemoryContextSwitchTo(executorTupleContext);
|
||||||
|
|
||||||
|
slot_getallattrs(slot);
|
||||||
|
|
||||||
|
columnValues = slot->tts_values;
|
||||||
|
columnNulls = slot->tts_isnull;
|
||||||
|
|
||||||
|
resetStringInfo(copyData);
|
||||||
|
|
||||||
|
/* construct row in COPY format */
|
||||||
|
AppendCopyRowData(columnValues, columnNulls, tupleDescriptor,
|
||||||
|
copyOutState, columnOutputFunctions, NULL);
|
||||||
|
|
||||||
|
WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest->fileDesc);
|
||||||
|
|
||||||
|
MemoryContextSwitchTo(oldContext);
|
||||||
|
|
||||||
|
taskFileDest->tuplesSent++;
|
||||||
|
|
||||||
|
ResetPerTupleExprContext(executorState);
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* WriteToLocalResultsFile writes the bytes in a StringInfo to a local file.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
WriteToLocalFile(StringInfo copyData, File fileDesc)
|
||||||
|
{
|
||||||
|
#if (PG_VERSION_NUM >= 100000)
|
||||||
|
int bytesWritten = FileWrite(fileDesc, copyData->data, copyData->len, PG_WAIT_IO);
|
||||||
|
#else
|
||||||
|
int bytesWritten = FileWrite(fileDesc, copyData->data, copyData->len);
|
||||||
|
#endif
|
||||||
|
if (bytesWritten < 0)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode_for_file_access(),
|
||||||
|
errmsg("could not append to file: %m")));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* TaskFileDestReceiverShutdown implements the rShutdown interface of
|
||||||
|
* TaskFileDestReceiver. It writes the footer and closes the file.
|
||||||
|
* the relation.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
TaskFileDestReceiverShutdown(DestReceiver *destReceiver)
|
||||||
|
{
|
||||||
|
TaskFileDestReceiver *taskFileDest = (TaskFileDestReceiver *) destReceiver;
|
||||||
|
CopyOutState copyOutState = taskFileDest->copyOutState;
|
||||||
|
|
||||||
|
if (copyOutState->binary)
|
||||||
|
{
|
||||||
|
/* write footers when using binary encoding */
|
||||||
|
resetStringInfo(copyOutState->fe_msgbuf);
|
||||||
|
AppendCopyBinaryFooters(copyOutState);
|
||||||
|
WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest->fileDesc);
|
||||||
|
}
|
||||||
|
|
||||||
|
FileClose(taskFileDest->fileDesc);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* TaskFileDestReceiverDestroy frees memory allocated as part of the
|
||||||
|
* TaskFileDestReceiver and closes file descriptors.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
TaskFileDestReceiverDestroy(DestReceiver *destReceiver)
|
||||||
|
{
|
||||||
|
TaskFileDestReceiver *taskFileDest = (TaskFileDestReceiver *) destReceiver;
|
||||||
|
|
||||||
|
if (taskFileDest->copyOutState)
|
||||||
|
{
|
||||||
|
pfree(taskFileDest->copyOutState);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (taskFileDest->columnOutputFunctions)
|
||||||
|
{
|
||||||
|
pfree(taskFileDest->columnOutputFunctions);
|
||||||
|
}
|
||||||
|
|
||||||
|
pfree(taskFileDest->filePath);
|
||||||
|
pfree(taskFileDest);
|
||||||
|
}
|
|
@ -22,9 +22,6 @@
|
||||||
#include "storage/s_lock.h"
|
#include "storage/s_lock.h"
|
||||||
|
|
||||||
|
|
||||||
#define TotalProcs (MaxBackends + NUM_AUXILIARY_PROCS + max_prepared_xacts)
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CitusInitiatedBackend keeps some information about the backends that are
|
* CitusInitiatedBackend keeps some information about the backends that are
|
||||||
* initiated by Citus.
|
* initiated by Citus.
|
||||||
|
@ -58,6 +55,7 @@ typedef struct BackendData
|
||||||
|
|
||||||
|
|
||||||
extern void InitializeBackendManagement(void);
|
extern void InitializeBackendManagement(void);
|
||||||
|
extern int TotalProcCount(void);
|
||||||
extern void InitializeBackendData(void);
|
extern void InitializeBackendData(void);
|
||||||
extern void LockBackendSharedMemory(LWLockMode lockMode);
|
extern void LockBackendSharedMemory(LWLockMode lockMode);
|
||||||
extern void UnlockBackendSharedMemory(void);
|
extern void UnlockBackendSharedMemory(void);
|
||||||
|
|
|
@ -131,6 +131,7 @@ extern void EndRemoteCopy(int64 shardId, List *connectionList, bool stopOnFailur
|
||||||
extern void CitusCopyFrom(CopyStmt *copyStatement, char *completionTag);
|
extern void CitusCopyFrom(CopyStmt *copyStatement, char *completionTag);
|
||||||
extern bool IsCopyFromWorker(CopyStmt *copyStatement);
|
extern bool IsCopyFromWorker(CopyStmt *copyStatement);
|
||||||
extern NodeAddress * MasterNodeAddress(CopyStmt *copyStatement);
|
extern NodeAddress * MasterNodeAddress(CopyStmt *copyStatement);
|
||||||
|
extern void CheckCopyPermissions(CopyStmt *copyStatement);
|
||||||
|
|
||||||
|
|
||||||
#endif /* MULTI_COPY_H */
|
#endif /* MULTI_COPY_H */
|
||||||
|
|
|
@ -36,6 +36,7 @@ extern TupleTableSlot * ReturnTupleFromTuplestore(CitusScanState *scanState);
|
||||||
extern void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob);
|
extern void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob);
|
||||||
extern void ReadFileIntoTupleStore(char *fileName, char *copyFormat, TupleDesc
|
extern void ReadFileIntoTupleStore(char *fileName, char *copyFormat, TupleDesc
|
||||||
tupleDescriptor, Tuplestorestate *tupstore);
|
tupleDescriptor, Tuplestorestate *tupstore);
|
||||||
|
extern Query * ParseQueryString(const char *queryString);
|
||||||
extern void ExecuteQueryStringIntoDestReceiver(const char *queryString, ParamListInfo
|
extern void ExecuteQueryStringIntoDestReceiver(const char *queryString, ParamListInfo
|
||||||
params,
|
params,
|
||||||
DestReceiver *dest);
|
DestReceiver *dest);
|
||||||
|
|
|
@ -122,6 +122,8 @@ extern FmgrInfo * GetFunctionInfo(Oid typeId, Oid accessMethodId, int16 procedur
|
||||||
extern uint64 ExtractShardIdFromTableName(const char *tableName, bool missingOk);
|
extern uint64 ExtractShardIdFromTableName(const char *tableName, bool missingOk);
|
||||||
extern List * TableDDLCommandList(const char *nodeName, uint32 nodePort,
|
extern List * TableDDLCommandList(const char *nodeName, uint32 nodePort,
|
||||||
const char *tableName);
|
const char *tableName);
|
||||||
|
extern int64 WorkerExecuteSqlTask(Query *query, char *taskFilename,
|
||||||
|
bool binaryCopyFormat);
|
||||||
|
|
||||||
|
|
||||||
/* Function declarations shared with the master planner */
|
/* Function declarations shared with the master planner */
|
||||||
|
|
|
@ -9,3 +9,6 @@
|
||||||
# Regression test output
|
# Regression test output
|
||||||
/regression.diffs
|
/regression.diffs
|
||||||
/regression.out
|
/regression.out
|
||||||
|
|
||||||
|
# Failure test side effets
|
||||||
|
/proxy.output
|
||||||
|
|
|
@ -122,17 +122,28 @@ SELECT citus.mitmproxy('conn.onQuery(query="^PREPARE TRANSACTION").kill()');
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- hide the error message (it has the PID)...
|
-- this transaction block will be sent to the coordinator as a remote command to hide the
|
||||||
|
-- error message that is caused during commit.
|
||||||
-- we'll test for the txn side-effects to ensure it didn't run
|
-- we'll test for the txn side-effects to ensure it didn't run
|
||||||
SET client_min_messages TO FATAL;
|
SELECT master_run_on_worker(
|
||||||
|
ARRAY['localhost']::text[],
|
||||||
|
ARRAY[:master_port]::int[],
|
||||||
|
ARRAY['
|
||||||
BEGIN;
|
BEGIN;
|
||||||
DELETE FROM dml_test WHERE id = 1;
|
DELETE FROM dml_test WHERE id = 1;
|
||||||
DELETE FROM dml_test WHERE id = 2;
|
DELETE FROM dml_test WHERE id = 2;
|
||||||
INSERT INTO dml_test VALUES (5, 'Epsilon');
|
INSERT INTO dml_test VALUES (5, ''Epsilon'');
|
||||||
UPDATE dml_test SET name = 'alpha' WHERE id = 1;
|
UPDATE dml_test SET name = ''alpha'' WHERE id = 1;
|
||||||
UPDATE dml_test SET name = 'gamma' WHERE id = 3;
|
UPDATE dml_test SET name = ''gamma'' WHERE id = 3;
|
||||||
COMMIT;
|
COMMIT;
|
||||||
SET client_min_messages TO DEFAULT;
|
'],
|
||||||
|
false
|
||||||
|
);
|
||||||
|
master_run_on_worker
|
||||||
|
---------------------------
|
||||||
|
(localhost,57636,t,BEGIN)
|
||||||
|
(1 row)
|
||||||
|
|
||||||
SELECT citus.mitmproxy('conn.allow()');
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
mitmproxy
|
mitmproxy
|
||||||
-----------
|
-----------
|
||||||
|
|
|
@ -23,16 +23,6 @@ SELECT create_distributed_table('test_table','id');
|
||||||
|
|
||||||
-- Populate data to the table
|
-- Populate data to the table
|
||||||
INSERT INTO test_table VALUES(1,1,1),(1,2,2),(2,1,1),(2,2,2),(3,1,1),(3,2,2);
|
INSERT INTO test_table VALUES(1,1,1),(1,2,2),(2,1,1),(2,2,2),(3,1,1),(3,2,2);
|
||||||
-- Create a function to make sure that queries returning the same result
|
|
||||||
CREATE FUNCTION raise_failed_execution(query text) RETURNS void AS $$
|
|
||||||
BEGIN
|
|
||||||
EXECUTE query;
|
|
||||||
EXCEPTION WHEN OTHERS THEN
|
|
||||||
IF SQLERRM LIKE 'failed to execute task%' THEN
|
|
||||||
RAISE 'Task failed to execute';
|
|
||||||
END IF;
|
|
||||||
END;
|
|
||||||
$$LANGUAGE plpgsql;
|
|
||||||
-- Kill when the first COPY command arrived, since we have a single placement
|
-- Kill when the first COPY command arrived, since we have a single placement
|
||||||
-- it is expected to error out.
|
-- it is expected to error out.
|
||||||
SET client_min_messages TO ERROR;
|
SET client_min_messages TO ERROR;
|
||||||
|
@ -42,9 +32,9 @@ SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()');
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT raise_failed_execution('SELECT count(*) FROM test_table');
|
SELECT public.raise_failed_execution('SELECT count(*) FROM test_table');
|
||||||
ERROR: Task failed to execute
|
ERROR: Task failed to execute
|
||||||
CONTEXT: PL/pgSQL function raise_failed_execution(text) line 6 at RAISE
|
CONTEXT: PL/pgSQL function public.raise_failed_execution(text) line 6 at RAISE
|
||||||
SET client_min_messages TO DEFAULT;
|
SET client_min_messages TO DEFAULT;
|
||||||
-- Kill the connection with a CTE
|
-- Kill the connection with a CTE
|
||||||
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()');
|
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()');
|
||||||
|
@ -70,12 +60,12 @@ SELECT citus.mitmproxy('conn.onQuery(query="^COPY").after(1).kill()');
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT raise_failed_execution('WITH
|
SELECT public.raise_failed_execution('WITH
|
||||||
results AS (SELECT * FROM test_table)
|
results AS (SELECT * FROM test_table)
|
||||||
SELECT * FROM test_table, results
|
SELECT * FROM test_table, results
|
||||||
WHERE test_table.id = results.id');
|
WHERE test_table.id = results.id');
|
||||||
ERROR: Task failed to execute
|
ERROR: Task failed to execute
|
||||||
CONTEXT: PL/pgSQL function raise_failed_execution(text) line 6 at RAISE
|
CONTEXT: PL/pgSQL function public.raise_failed_execution(text) line 6 at RAISE
|
||||||
SET client_min_messages TO DEFAULT;
|
SET client_min_messages TO DEFAULT;
|
||||||
-- In parallel execution mode Citus opens separate connections for each shard
|
-- In parallel execution mode Citus opens separate connections for each shard
|
||||||
-- so killing the connection after the first copy does not break it.
|
-- so killing the connection after the first copy does not break it.
|
||||||
|
@ -297,7 +287,5 @@ WARNING: could not consume data from worker node
|
||||||
|
|
||||||
COMMIT;
|
COMMIT;
|
||||||
DROP SCHEMA real_time_select_failure CASCADE;
|
DROP SCHEMA real_time_select_failure CASCADE;
|
||||||
NOTICE: drop cascades to 2 other objects
|
NOTICE: drop cascades to table test_table
|
||||||
DETAIL: drop cascades to function raise_failed_execution(text)
|
|
||||||
drop cascades to table test_table
|
|
||||||
SET search_path TO default;
|
SET search_path TO default;
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
Parsed test spec with 3 sessions
|
Parsed test spec with 4 sessions
|
||||||
|
|
||||||
starting permutation: s1-begin s1-update-ref-table-from-coordinator s2-start-session-level-connection s2-begin-on-worker s2-update-ref-table s3-select-distributed-waiting-queries s1-commit s2-commit-worker s2-stop-connection
|
starting permutation: s1-begin s1-update-ref-table-from-coordinator s2-start-session-level-connection s2-begin-on-worker s2-update-ref-table s3-select-distributed-waiting-queries s1-commit s2-commit-worker s2-stop-connection
|
||||||
step s1-begin:
|
step s1-begin:
|
||||||
|
@ -747,3 +747,103 @@ stop_session_level_connection_to_node
|
||||||
restore_isolation_tester_func
|
restore_isolation_tester_func
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
starting permutation: s1-begin s1-update-on-the-coordinator s2-update-on-the-coordinator s3-select-distributed-waiting-queries s1-commit
|
||||||
|
step s1-begin:
|
||||||
|
BEGIN;
|
||||||
|
|
||||||
|
step s1-update-on-the-coordinator:
|
||||||
|
UPDATE tt1 SET value_1 = 4;
|
||||||
|
|
||||||
|
step s2-update-on-the-coordinator:
|
||||||
|
UPDATE tt1 SET value_1 = 4;
|
||||||
|
<waiting ...>
|
||||||
|
step s3-select-distributed-waiting-queries:
|
||||||
|
SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits;
|
||||||
|
|
||||||
|
blocked_statementcurrent_statement_in_blocking_processwaiting_node_nameblocking_node_namewaiting_node_portblocking_node_port
|
||||||
|
|
||||||
|
|
||||||
|
UPDATE tt1 SET value_1 = 4;
|
||||||
|
|
||||||
|
UPDATE tt1 SET value_1 = 4;
|
||||||
|
coordinator_hostcoordinator_host57636 57636
|
||||||
|
step s1-commit:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
step s2-update-on-the-coordinator: <... completed>
|
||||||
|
restore_isolation_tester_func
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-update-dist-table s4-start-session-level-connection s4-begin-on-worker s4-update-dist-table s3-select-distributed-waiting-queries s1-commit-worker s4-commit-worker s1-stop-connection s4-stop-connection
|
||||||
|
step s1-start-session-level-connection:
|
||||||
|
SELECT start_session_level_connection_to_node('localhost', 57637);
|
||||||
|
|
||||||
|
start_session_level_connection_to_node
|
||||||
|
|
||||||
|
|
||||||
|
step s1-begin-on-worker:
|
||||||
|
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
|
||||||
|
|
||||||
|
run_commands_on_session_level_connection_to_node
|
||||||
|
|
||||||
|
|
||||||
|
step s1-update-dist-table:
|
||||||
|
SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 4');
|
||||||
|
|
||||||
|
run_commands_on_session_level_connection_to_node
|
||||||
|
|
||||||
|
|
||||||
|
step s4-start-session-level-connection:
|
||||||
|
SELECT start_session_level_connection_to_node('localhost', 57637);
|
||||||
|
|
||||||
|
start_session_level_connection_to_node
|
||||||
|
|
||||||
|
|
||||||
|
step s4-begin-on-worker:
|
||||||
|
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
|
||||||
|
|
||||||
|
run_commands_on_session_level_connection_to_node
|
||||||
|
|
||||||
|
|
||||||
|
step s4-update-dist-table:
|
||||||
|
SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 5');
|
||||||
|
<waiting ...>
|
||||||
|
step s3-select-distributed-waiting-queries:
|
||||||
|
SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits;
|
||||||
|
|
||||||
|
blocked_statementcurrent_statement_in_blocking_processwaiting_node_nameblocking_node_namewaiting_node_portblocking_node_port
|
||||||
|
|
||||||
|
UPDATE tt1 SET value_1 = 5UPDATE tt1 SET value_1 = 4localhost localhost 57637 57637
|
||||||
|
step s1-commit-worker:
|
||||||
|
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
|
||||||
|
|
||||||
|
run_commands_on_session_level_connection_to_node
|
||||||
|
|
||||||
|
|
||||||
|
step s4-update-dist-table: <... completed>
|
||||||
|
run_commands_on_session_level_connection_to_node
|
||||||
|
|
||||||
|
|
||||||
|
step s4-commit-worker:
|
||||||
|
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
|
||||||
|
|
||||||
|
run_commands_on_session_level_connection_to_node
|
||||||
|
|
||||||
|
|
||||||
|
step s1-stop-connection:
|
||||||
|
SELECT stop_session_level_connection_to_node();
|
||||||
|
|
||||||
|
stop_session_level_connection_to_node
|
||||||
|
|
||||||
|
|
||||||
|
step s4-stop-connection:
|
||||||
|
SELECT stop_session_level_connection_to_node();
|
||||||
|
|
||||||
|
stop_session_level_connection_to_node
|
||||||
|
|
||||||
|
|
||||||
|
restore_isolation_tester_func
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -155,7 +155,7 @@ ALTER EXTENSION citus UPDATE TO '8.0-8';
|
||||||
SHOW citus.version;
|
SHOW citus.version;
|
||||||
citus.version
|
citus.version
|
||||||
---------------
|
---------------
|
||||||
8.0devel
|
8.0.3
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- ensure no objects were created outside pg_catalog
|
-- ensure no objects were created outside pg_catalog
|
||||||
|
|
|
@ -93,10 +93,10 @@ SELECT count(*) FROM test WHERE id = 1;
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SET citus.task_executor_type TO 'task-tracker';
|
SET citus.task_executor_type TO 'task-tracker';
|
||||||
SELECT count(*) FROM test;
|
SELECT count(*), min(current_user) FROM test;
|
||||||
count
|
count | min
|
||||||
-------
|
-------+-------------
|
||||||
2
|
2 | full_access
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- test re-partition query (needs to transmit intermediate results)
|
-- test re-partition query (needs to transmit intermediate results)
|
||||||
|
@ -140,10 +140,10 @@ SELECT count(*) FROM test WHERE id = 1;
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SET citus.task_executor_type TO 'task-tracker';
|
SET citus.task_executor_type TO 'task-tracker';
|
||||||
SELECT count(*) FROM test;
|
SELECT count(*), min(current_user) FROM test;
|
||||||
count
|
count | min
|
||||||
-------
|
-------+-------------
|
||||||
2
|
2 | read_access
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- test re-partition query (needs to transmit intermediate results)
|
-- test re-partition query (needs to transmit intermediate results)
|
||||||
|
@ -171,7 +171,7 @@ ERROR: permission denied for table test
|
||||||
SELECT count(*) FROM test WHERE id = 1;
|
SELECT count(*) FROM test WHERE id = 1;
|
||||||
ERROR: permission denied for table test
|
ERROR: permission denied for table test
|
||||||
SET citus.task_executor_type TO 'task-tracker';
|
SET citus.task_executor_type TO 'task-tracker';
|
||||||
SELECT count(*) FROM test;
|
SELECT count(*), min(current_user) FROM test;
|
||||||
ERROR: permission denied for table test
|
ERROR: permission denied for table test
|
||||||
-- test re-partition query
|
-- test re-partition query
|
||||||
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
|
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
|
||||||
|
|
|
@ -93,10 +93,10 @@ SELECT count(*) FROM test WHERE id = 1;
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SET citus.task_executor_type TO 'task-tracker';
|
SET citus.task_executor_type TO 'task-tracker';
|
||||||
SELECT count(*) FROM test;
|
SELECT count(*), min(current_user) FROM test;
|
||||||
count
|
count | min
|
||||||
-------
|
-------+-------------
|
||||||
2
|
2 | full_access
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- test re-partition query (needs to transmit intermediate results)
|
-- test re-partition query (needs to transmit intermediate results)
|
||||||
|
@ -140,10 +140,10 @@ SELECT count(*) FROM test WHERE id = 1;
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SET citus.task_executor_type TO 'task-tracker';
|
SET citus.task_executor_type TO 'task-tracker';
|
||||||
SELECT count(*) FROM test;
|
SELECT count(*), min(current_user) FROM test;
|
||||||
count
|
count | min
|
||||||
-------
|
-------+-------------
|
||||||
2
|
2 | read_access
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- test re-partition query (needs to transmit intermediate results)
|
-- test re-partition query (needs to transmit intermediate results)
|
||||||
|
@ -171,7 +171,7 @@ ERROR: permission denied for relation test
|
||||||
SELECT count(*) FROM test WHERE id = 1;
|
SELECT count(*) FROM test WHERE id = 1;
|
||||||
ERROR: permission denied for relation test
|
ERROR: permission denied for relation test
|
||||||
SET citus.task_executor_type TO 'task-tracker';
|
SET citus.task_executor_type TO 'task-tracker';
|
||||||
SELECT count(*) FROM test;
|
SELECT count(*), min(current_user) FROM test;
|
||||||
ERROR: permission denied for relation test
|
ERROR: permission denied for relation test
|
||||||
-- test re-partition query
|
-- test re-partition query
|
||||||
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
|
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
|
||||||
|
|
|
@ -225,8 +225,12 @@ ERROR: parameter "citus.max_task_string_size" cannot be changed without restart
|
||||||
-- error message may vary between executions
|
-- error message may vary between executions
|
||||||
-- hiding warning and error message
|
-- hiding warning and error message
|
||||||
-- no output means the query has failed
|
-- no output means the query has failed
|
||||||
SET client_min_messages to FATAL;
|
SET client_min_messages to ERROR;
|
||||||
|
SELECT raise_failed_execution('
|
||||||
SELECT u.* FROM wide_table u JOIN wide_table v ON (u.long_column_002 = v.long_column_003);
|
SELECT u.* FROM wide_table u JOIN wide_table v ON (u.long_column_002 = v.long_column_003);
|
||||||
|
');
|
||||||
|
ERROR: Task failed to execute
|
||||||
|
CONTEXT: PL/pgSQL function raise_failed_execution(text) line 6 at RAISE
|
||||||
-- following will succeed since it fetches few columns
|
-- following will succeed since it fetches few columns
|
||||||
SELECT u.long_column_001, u.long_column_002, u.long_column_003 FROM wide_table u JOIN wide_table v ON (u.long_column_002 = v.long_column_003);
|
SELECT u.long_column_001, u.long_column_002, u.long_column_003 FROM wide_table u JOIN wide_table v ON (u.long_column_002 = v.long_column_003);
|
||||||
long_column_001 | long_column_002 | long_column_003
|
long_column_001 | long_column_002 | long_column_003
|
||||||
|
|
|
@ -109,3 +109,13 @@ $desc_views$
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
-- Create a function to make sure that queries returning the same result
|
||||||
|
CREATE FUNCTION raise_failed_execution(query text) RETURNS void AS $$
|
||||||
|
BEGIN
|
||||||
|
EXECUTE query;
|
||||||
|
EXCEPTION WHEN OTHERS THEN
|
||||||
|
IF SQLERRM LIKE 'failed to execute task%' THEN
|
||||||
|
RAISE 'Task failed to execute';
|
||||||
|
END IF;
|
||||||
|
END;
|
||||||
|
$$LANGUAGE plpgsql;
|
||||||
|
|
|
@ -268,3 +268,8 @@ FETCH BACKWARD noHoldCursor;
|
||||||
COMMIT;
|
COMMIT;
|
||||||
FETCH ABSOLUTE 5 FROM noHoldCursor;
|
FETCH ABSOLUTE 5 FROM noHoldCursor;
|
||||||
ERROR: cursor "noholdcursor" does not exist
|
ERROR: cursor "noholdcursor" does not exist
|
||||||
|
-- Test we don't throw an error for DROP IF EXISTS
|
||||||
|
DROP DATABASE IF EXISTS not_existing_database;
|
||||||
|
NOTICE: database "not_existing_database" does not exist, skipping
|
||||||
|
DROP TABLE IF EXISTS not_existing_table;
|
||||||
|
NOTICE: table "not_existing_table" does not exist, skipping
|
||||||
|
|
|
@ -428,8 +428,8 @@ ORDER BY 2 DESC, 1;
|
||||||
5 | 14
|
5 | 14
|
||||||
(3 rows)
|
(3 rows)
|
||||||
|
|
||||||
-- non-partition key joins are not supported inside subquery
|
-- non-partition key joins are supported inside subquery
|
||||||
-- since the join with a table
|
-- via pull-push execution
|
||||||
SELECT * FROM
|
SELECT * FROM
|
||||||
(SELECT ru.user_id, count(*)
|
(SELECT ru.user_id, count(*)
|
||||||
FROM recent_users ru
|
FROM recent_users ru
|
||||||
|
@ -438,7 +438,13 @@ SELECT * FROM
|
||||||
GROUP BY ru.user_id
|
GROUP BY ru.user_id
|
||||||
ORDER BY 2 DESC, 1) s1
|
ORDER BY 2 DESC, 1) s1
|
||||||
ORDER BY 2 DESC, 1;
|
ORDER BY 2 DESC, 1;
|
||||||
ERROR: bogus varno: 3
|
user_id | count
|
||||||
|
---------+-------
|
||||||
|
1 | 24
|
||||||
|
3 | 23
|
||||||
|
5 | 7
|
||||||
|
(3 rows)
|
||||||
|
|
||||||
-- join between views
|
-- join between views
|
||||||
-- recent users who has an event in recent events
|
-- recent users who has an event in recent events
|
||||||
SELECT ru.user_id FROM recent_users ru JOIN recent_events re USING(user_id) GROUP BY ru.user_id ORDER BY ru.user_id;
|
SELECT ru.user_id FROM recent_users ru JOIN recent_events re USING(user_id) GROUP BY ru.user_id ORDER BY ru.user_id;
|
||||||
|
@ -533,7 +539,8 @@ SELECT * FROM
|
||||||
ON(ru.user_id = et.event_type)
|
ON(ru.user_id = et.event_type)
|
||||||
) s1
|
) s1
|
||||||
ORDER BY 2 DESC, 1;
|
ORDER BY 2 DESC, 1;
|
||||||
ERROR: bogus varno: 3
|
ERROR: cannot pushdown the subquery
|
||||||
|
DETAIL: Complex subqueries and CTEs cannot be in the outer part of the outer join
|
||||||
-- create a select only view
|
-- create a select only view
|
||||||
CREATE VIEW selected_users AS SELECT * FROM users_table WHERE value_1 >= 1 and value_1 <3;
|
CREATE VIEW selected_users AS SELECT * FROM users_table WHERE value_1 >= 1 and value_1 <3;
|
||||||
CREATE VIEW recent_selected_users AS SELECT su.* FROM selected_users su JOIN recent_users ru USING(user_id);
|
CREATE VIEW recent_selected_users AS SELECT su.* FROM selected_users su JOIN recent_users ru USING(user_id);
|
||||||
|
@ -863,7 +870,7 @@ EXPLAIN (COSTS FALSE) SELECT et.* FROM recent_10_users JOIN events_table et USIN
|
||||||
-> Sort
|
-> Sort
|
||||||
Sort Key: remote_scan."time" DESC
|
Sort Key: remote_scan."time" DESC
|
||||||
-> Custom Scan (Citus Real-Time)
|
-> Custom Scan (Citus Real-Time)
|
||||||
-> Distributed Subplan 95_1
|
-> Distributed Subplan 96_1
|
||||||
-> Limit
|
-> Limit
|
||||||
-> Sort
|
-> Sort
|
||||||
Sort Key: max((max(remote_scan.lastseen))) DESC
|
Sort Key: max((max(remote_scan.lastseen))) DESC
|
||||||
|
|
|
@ -428,8 +428,8 @@ ORDER BY 2 DESC, 1;
|
||||||
5 | 14
|
5 | 14
|
||||||
(3 rows)
|
(3 rows)
|
||||||
|
|
||||||
-- non-partition key joins are not supported inside subquery
|
-- non-partition key joins are supported inside subquery
|
||||||
-- since the join with a table
|
-- via pull-push execution
|
||||||
SELECT * FROM
|
SELECT * FROM
|
||||||
(SELECT ru.user_id, count(*)
|
(SELECT ru.user_id, count(*)
|
||||||
FROM recent_users ru
|
FROM recent_users ru
|
||||||
|
@ -438,7 +438,13 @@ SELECT * FROM
|
||||||
GROUP BY ru.user_id
|
GROUP BY ru.user_id
|
||||||
ORDER BY 2 DESC, 1) s1
|
ORDER BY 2 DESC, 1) s1
|
||||||
ORDER BY 2 DESC, 1;
|
ORDER BY 2 DESC, 1;
|
||||||
ERROR: bogus varno: 3
|
user_id | count
|
||||||
|
---------+-------
|
||||||
|
1 | 24
|
||||||
|
3 | 23
|
||||||
|
5 | 7
|
||||||
|
(3 rows)
|
||||||
|
|
||||||
-- join between views
|
-- join between views
|
||||||
-- recent users who has an event in recent events
|
-- recent users who has an event in recent events
|
||||||
SELECT ru.user_id FROM recent_users ru JOIN recent_events re USING(user_id) GROUP BY ru.user_id ORDER BY ru.user_id;
|
SELECT ru.user_id FROM recent_users ru JOIN recent_events re USING(user_id) GROUP BY ru.user_id ORDER BY ru.user_id;
|
||||||
|
@ -533,7 +539,8 @@ SELECT * FROM
|
||||||
ON(ru.user_id = et.event_type)
|
ON(ru.user_id = et.event_type)
|
||||||
) s1
|
) s1
|
||||||
ORDER BY 2 DESC, 1;
|
ORDER BY 2 DESC, 1;
|
||||||
ERROR: bogus varno: 3
|
ERROR: cannot pushdown the subquery
|
||||||
|
DETAIL: Complex subqueries and CTEs cannot be in the outer part of the outer join
|
||||||
-- create a select only view
|
-- create a select only view
|
||||||
CREATE VIEW selected_users AS SELECT * FROM users_table WHERE value_1 >= 1 and value_1 <3;
|
CREATE VIEW selected_users AS SELECT * FROM users_table WHERE value_1 >= 1 and value_1 <3;
|
||||||
CREATE VIEW recent_selected_users AS SELECT su.* FROM selected_users su JOIN recent_users ru USING(user_id);
|
CREATE VIEW recent_selected_users AS SELECT su.* FROM selected_users su JOIN recent_users ru USING(user_id);
|
||||||
|
@ -865,7 +872,7 @@ EXPLAIN (COSTS FALSE) SELECT et.* FROM recent_10_users JOIN events_table et USIN
|
||||||
-> Sort
|
-> Sort
|
||||||
Sort Key: remote_scan."time" DESC
|
Sort Key: remote_scan."time" DESC
|
||||||
-> Custom Scan (Citus Real-Time)
|
-> Custom Scan (Citus Real-Time)
|
||||||
-> Distributed Subplan 95_1
|
-> Distributed Subplan 96_1
|
||||||
-> Limit
|
-> Limit
|
||||||
-> Sort
|
-> Sort
|
||||||
Sort Key: max((max(remote_scan.lastseen))) DESC
|
Sort Key: max((max(remote_scan.lastseen))) DESC
|
||||||
|
|
|
@ -20,8 +20,16 @@ setup
|
||||||
SELECT citus.replace_isolation_tester_func();
|
SELECT citus.replace_isolation_tester_func();
|
||||||
SELECT citus.refresh_isolation_tester_prepared_statement();
|
SELECT citus.refresh_isolation_tester_prepared_statement();
|
||||||
|
|
||||||
SELECT start_metadata_sync_to_node('localhost', 57637);
|
-- start_metadata_sync_to_node can not be run inside a transaction block
|
||||||
SELECT start_metadata_sync_to_node('localhost', 57638);
|
-- following is a workaround to overcome that
|
||||||
|
-- port numbers are hard coded at the moment
|
||||||
|
SELECT master_run_on_worker(
|
||||||
|
ARRAY['localhost']::text[],
|
||||||
|
ARRAY[57636]::int[],
|
||||||
|
ARRAY[format('SELECT start_metadata_sync_to_node(''%s'', %s)', nodename, nodeport)]::text[],
|
||||||
|
false)
|
||||||
|
FROM pg_dist_node;
|
||||||
|
|
||||||
SET citus.shard_replication_factor TO 1;
|
SET citus.shard_replication_factor TO 1;
|
||||||
SET citus.replication_model to streaming;
|
SET citus.replication_model to streaming;
|
||||||
|
|
||||||
|
@ -112,6 +120,11 @@ step "s1-stop-connection"
|
||||||
SELECT stop_session_level_connection_to_node();
|
SELECT stop_session_level_connection_to_node();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
step "s1-update-on-the-coordinator"
|
||||||
|
{
|
||||||
|
UPDATE tt1 SET value_1 = 4;
|
||||||
|
}
|
||||||
|
|
||||||
step "s1-commit"
|
step "s1-commit"
|
||||||
{
|
{
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
@ -119,6 +132,11 @@ step "s1-commit"
|
||||||
|
|
||||||
session "s2"
|
session "s2"
|
||||||
|
|
||||||
|
step "s2-begin"
|
||||||
|
{
|
||||||
|
COMMIT;
|
||||||
|
}
|
||||||
|
|
||||||
step "s2-start-session-level-connection"
|
step "s2-start-session-level-connection"
|
||||||
{
|
{
|
||||||
SELECT start_session_level_connection_to_node('localhost', 57638);
|
SELECT start_session_level_connection_to_node('localhost', 57638);
|
||||||
|
@ -164,6 +182,11 @@ step "s2-stop-connection"
|
||||||
SELECT stop_session_level_connection_to_node();
|
SELECT stop_session_level_connection_to_node();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
step "s2-update-on-the-coordinator"
|
||||||
|
{
|
||||||
|
UPDATE tt1 SET value_1 = 4;
|
||||||
|
}
|
||||||
|
|
||||||
step "s2-commit-worker"
|
step "s2-commit-worker"
|
||||||
{
|
{
|
||||||
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
|
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
|
||||||
|
@ -176,6 +199,33 @@ step "s3-select-distributed-waiting-queries"
|
||||||
SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits;
|
SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
# session s1 and s4 executes the commands on the same worker node
|
||||||
|
session "s4"
|
||||||
|
|
||||||
|
step "s4-start-session-level-connection"
|
||||||
|
{
|
||||||
|
SELECT start_session_level_connection_to_node('localhost', 57637);
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s4-begin-on-worker"
|
||||||
|
{
|
||||||
|
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s4-update-dist-table"
|
||||||
|
{
|
||||||
|
SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 5');
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s4-stop-connection"
|
||||||
|
{
|
||||||
|
SELECT stop_session_level_connection_to_node();
|
||||||
|
}
|
||||||
|
step "s4-commit-worker"
|
||||||
|
{
|
||||||
|
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
|
||||||
|
}
|
||||||
|
|
||||||
permutation "s1-begin" "s1-update-ref-table-from-coordinator" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-update-ref-table" "s3-select-distributed-waiting-queries" "s1-commit" "s2-commit-worker" "s2-stop-connection"
|
permutation "s1-begin" "s1-update-ref-table-from-coordinator" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-update-ref-table" "s3-select-distributed-waiting-queries" "s1-commit" "s2-commit-worker" "s2-stop-connection"
|
||||||
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-update-ref-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-update-ref-table" "s3-select-distributed-waiting-queries" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
|
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-update-ref-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-update-ref-table" "s3-select-distributed-waiting-queries" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
|
||||||
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-update-dist-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-update-dist-table" "s3-select-distributed-waiting-queries" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
|
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-update-dist-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-update-dist-table" "s3-select-distributed-waiting-queries" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
|
||||||
|
@ -187,3 +237,9 @@ permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-copy-to
|
||||||
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-copy-to-ref-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-copy-to-ref-table" "s3-select-distributed-waiting-queries" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
|
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-copy-to-ref-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-copy-to-ref-table" "s3-select-distributed-waiting-queries" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
|
||||||
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-select-for-update" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-update-ref-table" "s3-select-distributed-waiting-queries" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
|
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-select-for-update" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-update-ref-table" "s3-select-distributed-waiting-queries" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
|
||||||
permutation "s2-start-session-level-connection" "s2-begin-on-worker" "s2-insert-into-ref-table" "s1-begin" "s1-alter-table" "s3-select-distributed-waiting-queries" "s2-commit-worker" "s1-commit" "s2-stop-connection"
|
permutation "s2-start-session-level-connection" "s2-begin-on-worker" "s2-insert-into-ref-table" "s1-begin" "s1-alter-table" "s3-select-distributed-waiting-queries" "s2-commit-worker" "s1-commit" "s2-stop-connection"
|
||||||
|
|
||||||
|
# make sure that multi-shard modification queries
|
||||||
|
# show up in the waiting processes even if they are
|
||||||
|
# blocked on the same node
|
||||||
|
permutation "s1-begin" "s1-update-on-the-coordinator" "s2-update-on-the-coordinator" "s3-select-distributed-waiting-queries" "s1-commit"
|
||||||
|
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-update-dist-table" "s4-start-session-level-connection" "s4-begin-on-worker" "s4-update-dist-table" "s3-select-distributed-waiting-queries" "s1-commit-worker" "s4-commit-worker" "s1-stop-connection" "s4-stop-connection"
|
||||||
|
|
|
@ -20,8 +20,16 @@ setup
|
||||||
SELECT citus.replace_isolation_tester_func();
|
SELECT citus.replace_isolation_tester_func();
|
||||||
SELECT citus.refresh_isolation_tester_prepared_statement();
|
SELECT citus.refresh_isolation_tester_prepared_statement();
|
||||||
|
|
||||||
SELECT start_metadata_sync_to_node('localhost', 57637);
|
-- start_metadata_sync_to_node can not be run inside a transaction block
|
||||||
SELECT start_metadata_sync_to_node('localhost', 57638);
|
-- following is a workaround to overcome that
|
||||||
|
-- port numbers are hard coded at the moment
|
||||||
|
SELECT master_run_on_worker(
|
||||||
|
ARRAY['localhost']::text[],
|
||||||
|
ARRAY[57636]::int[],
|
||||||
|
ARRAY[format('SELECT start_metadata_sync_to_node(''%s'', %s)', nodename, nodeport)]::text[],
|
||||||
|
false)
|
||||||
|
FROM pg_dist_node;
|
||||||
|
|
||||||
SET citus.replication_model to streaming;
|
SET citus.replication_model to streaming;
|
||||||
|
|
||||||
CREATE TABLE ref_table(user_id int, value_1 int);
|
CREATE TABLE ref_table(user_id int, value_1 int);
|
||||||
|
|
|
@ -65,19 +65,23 @@ SELECT * FROM dml_test ORDER BY id ASC;
|
||||||
-- fail at PREPARE TRANSACTION
|
-- fail at PREPARE TRANSACTION
|
||||||
SELECT citus.mitmproxy('conn.onQuery(query="^PREPARE TRANSACTION").kill()');
|
SELECT citus.mitmproxy('conn.onQuery(query="^PREPARE TRANSACTION").kill()');
|
||||||
|
|
||||||
-- hide the error message (it has the PID)...
|
-- this transaction block will be sent to the coordinator as a remote command to hide the
|
||||||
|
-- error message that is caused during commit.
|
||||||
-- we'll test for the txn side-effects to ensure it didn't run
|
-- we'll test for the txn side-effects to ensure it didn't run
|
||||||
SET client_min_messages TO FATAL;
|
SELECT master_run_on_worker(
|
||||||
|
ARRAY['localhost']::text[],
|
||||||
|
ARRAY[:master_port]::int[],
|
||||||
|
ARRAY['
|
||||||
BEGIN;
|
BEGIN;
|
||||||
DELETE FROM dml_test WHERE id = 1;
|
DELETE FROM dml_test WHERE id = 1;
|
||||||
DELETE FROM dml_test WHERE id = 2;
|
DELETE FROM dml_test WHERE id = 2;
|
||||||
INSERT INTO dml_test VALUES (5, 'Epsilon');
|
INSERT INTO dml_test VALUES (5, ''Epsilon'');
|
||||||
UPDATE dml_test SET name = 'alpha' WHERE id = 1;
|
UPDATE dml_test SET name = ''alpha'' WHERE id = 1;
|
||||||
UPDATE dml_test SET name = 'gamma' WHERE id = 3;
|
UPDATE dml_test SET name = ''gamma'' WHERE id = 3;
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
'],
|
||||||
SET client_min_messages TO DEFAULT;
|
false
|
||||||
|
);
|
||||||
|
|
||||||
SELECT citus.mitmproxy('conn.allow()');
|
SELECT citus.mitmproxy('conn.allow()');
|
||||||
SELECT shardid FROM pg_dist_shard_placement WHERE shardstate = 3;
|
SELECT shardid FROM pg_dist_shard_placement WHERE shardstate = 3;
|
||||||
|
|
|
@ -17,22 +17,11 @@ SELECT create_distributed_table('test_table','id');
|
||||||
-- Populate data to the table
|
-- Populate data to the table
|
||||||
INSERT INTO test_table VALUES(1,1,1),(1,2,2),(2,1,1),(2,2,2),(3,1,1),(3,2,2);
|
INSERT INTO test_table VALUES(1,1,1),(1,2,2),(2,1,1),(2,2,2),(3,1,1),(3,2,2);
|
||||||
|
|
||||||
-- Create a function to make sure that queries returning the same result
|
|
||||||
CREATE FUNCTION raise_failed_execution(query text) RETURNS void AS $$
|
|
||||||
BEGIN
|
|
||||||
EXECUTE query;
|
|
||||||
EXCEPTION WHEN OTHERS THEN
|
|
||||||
IF SQLERRM LIKE 'failed to execute task%' THEN
|
|
||||||
RAISE 'Task failed to execute';
|
|
||||||
END IF;
|
|
||||||
END;
|
|
||||||
$$LANGUAGE plpgsql;
|
|
||||||
|
|
||||||
-- Kill when the first COPY command arrived, since we have a single placement
|
-- Kill when the first COPY command arrived, since we have a single placement
|
||||||
-- it is expected to error out.
|
-- it is expected to error out.
|
||||||
SET client_min_messages TO ERROR;
|
SET client_min_messages TO ERROR;
|
||||||
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()');
|
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()');
|
||||||
SELECT raise_failed_execution('SELECT count(*) FROM test_table');
|
SELECT public.raise_failed_execution('SELECT count(*) FROM test_table');
|
||||||
SET client_min_messages TO DEFAULT;
|
SET client_min_messages TO DEFAULT;
|
||||||
|
|
||||||
-- Kill the connection with a CTE
|
-- Kill the connection with a CTE
|
||||||
|
@ -46,7 +35,7 @@ WHERE test_table.id = results.id;
|
||||||
-- killing connection after first successful query should break.
|
-- killing connection after first successful query should break.
|
||||||
SET client_min_messages TO ERROR;
|
SET client_min_messages TO ERROR;
|
||||||
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").after(1).kill()');
|
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").after(1).kill()');
|
||||||
SELECT raise_failed_execution('WITH
|
SELECT public.raise_failed_execution('WITH
|
||||||
results AS (SELECT * FROM test_table)
|
results AS (SELECT * FROM test_table)
|
||||||
SELECT * FROM test_table, results
|
SELECT * FROM test_table, results
|
||||||
WHERE test_table.id = results.id');
|
WHERE test_table.id = results.id');
|
||||||
|
|
|
@ -70,7 +70,7 @@ SELECT count(*) FROM test;
|
||||||
SELECT count(*) FROM test WHERE id = 1;
|
SELECT count(*) FROM test WHERE id = 1;
|
||||||
|
|
||||||
SET citus.task_executor_type TO 'task-tracker';
|
SET citus.task_executor_type TO 'task-tracker';
|
||||||
SELECT count(*) FROM test;
|
SELECT count(*), min(current_user) FROM test;
|
||||||
|
|
||||||
-- test re-partition query (needs to transmit intermediate results)
|
-- test re-partition query (needs to transmit intermediate results)
|
||||||
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
|
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
|
||||||
|
@ -94,7 +94,7 @@ SELECT count(*) FROM test;
|
||||||
SELECT count(*) FROM test WHERE id = 1;
|
SELECT count(*) FROM test WHERE id = 1;
|
||||||
|
|
||||||
SET citus.task_executor_type TO 'task-tracker';
|
SET citus.task_executor_type TO 'task-tracker';
|
||||||
SELECT count(*) FROM test;
|
SELECT count(*), min(current_user) FROM test;
|
||||||
|
|
||||||
-- test re-partition query (needs to transmit intermediate results)
|
-- test re-partition query (needs to transmit intermediate results)
|
||||||
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
|
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
|
||||||
|
@ -115,7 +115,7 @@ SELECT count(*) FROM test;
|
||||||
SELECT count(*) FROM test WHERE id = 1;
|
SELECT count(*) FROM test WHERE id = 1;
|
||||||
|
|
||||||
SET citus.task_executor_type TO 'task-tracker';
|
SET citus.task_executor_type TO 'task-tracker';
|
||||||
SELECT count(*) FROM test;
|
SELECT count(*), min(current_user) FROM test;
|
||||||
|
|
||||||
-- test re-partition query
|
-- test re-partition query
|
||||||
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
|
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
|
||||||
|
|
|
@ -220,9 +220,11 @@ SET citus.max_task_string_size TO 20000;
|
||||||
-- error message may vary between executions
|
-- error message may vary between executions
|
||||||
-- hiding warning and error message
|
-- hiding warning and error message
|
||||||
-- no output means the query has failed
|
-- no output means the query has failed
|
||||||
SET client_min_messages to FATAL;
|
SET client_min_messages to ERROR;
|
||||||
|
|
||||||
|
SELECT raise_failed_execution('
|
||||||
SELECT u.* FROM wide_table u JOIN wide_table v ON (u.long_column_002 = v.long_column_003);
|
SELECT u.* FROM wide_table u JOIN wide_table v ON (u.long_column_002 = v.long_column_003);
|
||||||
|
');
|
||||||
|
|
||||||
-- following will succeed since it fetches few columns
|
-- following will succeed since it fetches few columns
|
||||||
SELECT u.long_column_001, u.long_column_002, u.long_column_003 FROM wide_table u JOIN wide_table v ON (u.long_column_002 = v.long_column_003);
|
SELECT u.long_column_001, u.long_column_002, u.long_column_003 FROM wide_table u JOIN wide_table v ON (u.long_column_002 = v.long_column_003);
|
||||||
|
|
|
@ -106,3 +106,14 @@ ORDER BY a.attrelid, a.attnum;
|
||||||
|
|
||||||
$desc_views$
|
$desc_views$
|
||||||
);
|
);
|
||||||
|
|
||||||
|
-- Create a function to make sure that queries returning the same result
|
||||||
|
CREATE FUNCTION raise_failed_execution(query text) RETURNS void AS $$
|
||||||
|
BEGIN
|
||||||
|
EXECUTE query;
|
||||||
|
EXCEPTION WHEN OTHERS THEN
|
||||||
|
IF SQLERRM LIKE 'failed to execute task%' THEN
|
||||||
|
RAISE 'Task failed to execute';
|
||||||
|
END IF;
|
||||||
|
END;
|
||||||
|
$$LANGUAGE plpgsql;
|
||||||
|
|
|
@ -146,3 +146,7 @@ FETCH ABSOLUTE 5 FROM noHoldCursor;
|
||||||
FETCH BACKWARD noHoldCursor;
|
FETCH BACKWARD noHoldCursor;
|
||||||
COMMIT;
|
COMMIT;
|
||||||
FETCH ABSOLUTE 5 FROM noHoldCursor;
|
FETCH ABSOLUTE 5 FROM noHoldCursor;
|
||||||
|
|
||||||
|
-- Test we don't throw an error for DROP IF EXISTS
|
||||||
|
DROP DATABASE IF EXISTS not_existing_database;
|
||||||
|
DROP TABLE IF EXISTS not_existing_table;
|
||||||
|
|
|
@ -220,8 +220,8 @@ SELECT * FROM
|
||||||
ORDER BY 2 DESC, 1) s1
|
ORDER BY 2 DESC, 1) s1
|
||||||
ORDER BY 2 DESC, 1;
|
ORDER BY 2 DESC, 1;
|
||||||
|
|
||||||
-- non-partition key joins are not supported inside subquery
|
-- non-partition key joins are supported inside subquery
|
||||||
-- since the join with a table
|
-- via pull-push execution
|
||||||
SELECT * FROM
|
SELECT * FROM
|
||||||
(SELECT ru.user_id, count(*)
|
(SELECT ru.user_id, count(*)
|
||||||
FROM recent_users ru
|
FROM recent_users ru
|
||||||
|
|
|
@ -11,13 +11,13 @@
|
||||||
#define CITUS_MAJORVERSION "8.0"
|
#define CITUS_MAJORVERSION "8.0"
|
||||||
|
|
||||||
/* Citus version as a string */
|
/* Citus version as a string */
|
||||||
#define CITUS_VERSION "8.0devel"
|
#define CITUS_VERSION "8.0.3"
|
||||||
|
|
||||||
/* Citus version as a number */
|
/* Citus version as a number */
|
||||||
#define CITUS_VERSION_NUM 80000
|
#define CITUS_VERSION_NUM 80000
|
||||||
|
|
||||||
/* A string containing the version number, platform, and C compiler */
|
/* A string containing the version number, platform, and C compiler */
|
||||||
#define CITUS_VERSION_STR "Citus 8.0devel on x86_64-windows, compiled by Visual Studio"
|
#define CITUS_VERSION_STR "Citus 8.0.3 on x86_64-windows, compiled by Visual Studio"
|
||||||
|
|
||||||
/* Define to 1 if you have the `curl' library (-lcurl). */
|
/* Define to 1 if you have the `curl' library (-lcurl). */
|
||||||
#define HAVE_LIBCURL 0
|
#define HAVE_LIBCURL 0
|
||||||
|
|
Loading…
Reference in New Issue