Compare commits


24 Commits
main ... v8.0.3

Author SHA1 Message Date
Hanefi Onaldi 20f84f2396
allow PG9.6 CI test failures 2019-01-09 09:48:53 +03:00
Hanefi Onaldi 32b8235da3
Bump version to 8.0.3 2019-01-09 09:37:12 +03:00
Hanefi Onaldi 16bb2c618f
Add changelog entry for 8.0.3 2019-01-09 09:34:06 +03:00
Murat Tuncer 54a893523e
Move repeated code to a function 2019-01-09 09:31:05 +03:00
Murat Tuncer 57d51b280e
Fix multi_view tests 2019-01-09 09:31:04 +03:00
Murat Tuncer ba00e930ea
Fix having clause bug for complex joins
We update column attributes of various clauses for a query,
including target columns and select clauses, when we introduce
new range table entries in the query.

It turns out having clause column attributes were not updated.

This fix resolves the issue.
2019-01-09 09:31:04 +03:00
Murat Tuncer c5371965f6
Make sure spinlock is not left unreleased when an exception is thrown
A spinlock is not released when an exception is thrown after the
spinlock is acquired. This has caused an infinite wait and eventual
crash in the maintenance daemon.

This work moves the code that can fail outside of the spinlock
scope, so that in the case of failure the spinlock is not left
locked, since it was never acquired in the first place.
2019-01-09 09:31:03 +03:00
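The fix follows a standard pattern: run anything that can ereport() before taking the spinlock, and keep only plain assignments inside the critical section. A minimal sketch of the pattern (the struct and function names here are illustrative stand-ins; the real change is the MarkCitusInitiatedCoordinatorBackend hunk further down):

#include "postgres.h"
#include "storage/spin.h"

/* illustrative stand-in for the real Citus backend data */
typedef struct SketchBackendData
{
	slock_t mutex;
	int initiatorNodeIdentifier;
	bool transactionOriginator;
} SketchBackendData;

extern int GetLocalGroupId(void);	/* may ereport(ERROR) */

static void
MarkBackendSketch(SketchBackendData *backend)
{
	/* the fallible call runs first, while no lock is held */
	int localGroupId = GetLocalGroupId();

	SpinLockAcquire(&backend->mutex);

	/* only non-failing assignments inside the critical section */
	backend->initiatorNodeIdentifier = localGroupId;
	backend->transactionOriginator = true;

	SpinLockRelease(&backend->mutex);
}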
Hanefi Onaldi 1e3969cae1 Bump version to 8.0.2 2018-12-13 16:22:48 +03:00
Hanefi Onaldi 0fa6049cce Add changelog entry for 8.0.2 2018-12-13 16:19:03 +03:00
Burak Yucesoy 2a1ae6489b Fix crashes caused by stack size increase under high memory load
Each PostgreSQL backend starts with a predefined amount of stack, and this stack
size can be increased if there is a need. However, a stack size increase during
high memory load may cause unexpected crashes, because if there is not enough
memory for the stack to grow, there is nothing the process can do apart from
crashing. Interestingly, the process would get an OOM error instead of a crash
if it had made an explicit memory request (with palloc, for example). In the
case of a stack size increase, however, there is no system call that could
return an OOM error, so the process simply crashes.

With this change, we increase the stack size explicitly up front by requesting
extra memory from the stack, so that even if there is no memory available, we
can at least get an OOM error instead of a crash.
2018-12-13 10:46:28 +03:00
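Distilled, the approach is: grow the stack once, at backend startup, by alloca'ing the configured maximum and touching its far end, so that any failure happens at startup rather than at an arbitrary point under load. A sketch (essentially what the ResizeStackToMaximumDepth() hunk below does; the function name here is illustrative, and max_stack_depth is the PostgreSQL GUC, measured in kB):

#include "postgres.h"
#include "tcop/tcopprot.h"	/* declares the max_stack_depth GUC (kB) */

static void
ResizeStackSketch(void)
{
#ifndef WIN32
	volatile char *stack_resizer = NULL;
	long max_stack_depth_bytes = max_stack_depth * 1024L;

	/* alloca extends the stack; writing the last byte forces the pages in */
	stack_resizer = alloca(max_stack_depth_bytes);
	stack_resizer[max_stack_depth_bytes - 1] = 0;
#endif
}

With the default max_stack_depth of 2048 kB, this reserves roughly 2 MB of stack per backend up front.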
Onder Kalaci 507964dde3 Ensure to use initialized MaxBackends
PostgreSQL loads shared libraries before calculating MaxBackends.
However, Citus relies on MaxBackends being set. Thus, with this
commit we use the same steps to calculate MaxBackends while
Citus is being loaded (i.e., when _PG_init is called).

Note that this is safe since all the elements that are used to
calculate MaxBackends are PGC_POSTMASTER GUCs plus a constant
value.
2018-12-13 10:44:19 +03:00
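For reference, a sketch of the calculation (the function name is illustrative; the TotalProcCount() hunk below builds on this). The formula matches InitializeMaxBackends() in the PostgreSQL versions this release targets (9.6 through 11; later versions extend it), and all inputs are PGC_POSTMASTER GUCs, so they are fixed for the postmaster's lifetime:

#include "postgres.h"
#include "miscadmin.h"				/* MaxConnections */
#include "postmaster/autovacuum.h"	/* autovacuum_max_workers */
#include "postmaster/bgworker.h"	/* max_worker_processes */

static int
MaxBackendsSketch(void)
{
	/* the "+ 1" accounts for the autovacuum launcher */
	return MaxConnections + autovacuum_max_workers + 1 + max_worker_processes;
}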
Jason Petersen 2de7b85b89
Bump version to 8.0.1 2018-11-28 00:42:05 -07:00
Jason Petersen 0b0c0fef25
Add changelog entry for 8.0.1 2018-11-27 23:27:25 -07:00
Marco Slot 606e2b18d7
Merge pull request #2491 from citusdata/backport_copytt_80
Backport #2487 to release-8.0
2018-11-23 11:21:43 +01:00
Nils Dijk 67f058c5f6 Description: Fix failures of tests on recent postgres builds
In recent postgres builds you cannot set client_min_messages to
values higher than ERROR; it will silently be set to ERROR if you try.

During some tests we would set it to FATAL to hide random values
(e.g., PIDs of processes) from the test output. This patch uses
different tactics for hiding these values.
2018-11-22 19:52:43 +01:00
Marco Slot 4392cc2f9c Test current user in task-tracker queries 2018-11-22 19:39:23 +01:00
Marco Slot ca8a4dc735 COPY to a task file no longer switches to superuser 2018-11-22 19:38:59 +01:00
velioglu da0b98c991 Bump Citus version to 8.0.0 2018-10-31 14:45:36 +03:00
velioglu 480797e600 Add changelog entry for 8.0.0 2018-10-31 14:36:25 +03:00
Onder Kalaci 2311a3614a Make sure to access PARAM_EXTERN accurately in PG 11
PG 11 has changed the way that PARAM_EXTERN is processed.
This commit ensures that Citus follows the same pattern.

For details see the related Postgres commit:
6719b238e8
2018-10-26 11:35:09 +03:00
Hadi Moshayedi c22cbb7a13 Don't throw error for DROP DATABASE IF EXISTS 2018-10-25 21:14:46 +03:00
Onder Kalaci 5d805dba27 Processes that are blocked on advisory locks show up in wait edges
Assign the distributed transaction id before trying to acquire the
executor advisory locks. This is useful to show this backend in citus
lock graphs (e.g., dump_global_wait_edges() and citus_lock_waits).
2018-10-24 14:01:30 +03:00
Murat Tuncer b08106b5cf Don't allow PG11 travis failures anymore
We made PG11 builds optional when we had an issue
with an MX isolation test that we could not solve back then.

This commit solves the issue with a workaround by running
start_metadata_sync_to_node outside the transaction block.
2018-10-19 16:57:48 +03:00
Jason Petersen 87817aec9d Attempt to address planner context crashes
Both of these are a bit of a shot in the dark. In one case, we noticed
a stack trace where a caller received a null pointer and attempted to
dereference the memory context field (at 0x010). In the other, I saw
that any error thrown from within AdjustParseTree could keep the stack
from being cleaned up (presumably if we push we should always pop).

Both stack traces were collected during times of high memory pressure,
and reproducing the problem, locally or otherwise, has been very
tricky (i.e. it hasn't been reproduced reliably at all).
2018-10-19 10:24:07 +03:00
41 changed files with 916 additions and 211 deletions

View File

@ -21,8 +21,8 @@ matrix:
- env: PGVERSION=9.6
- env: PGVERSION=10
- env: PGVERSION=11
allow_failures:
- env: PGVERSION=11
allow_failures:
- env: PGVERSION=9.6
before_install:
- git clone -b v0.7.9 --depth 1 https://github.com/citusdata/tools.git
- sudo make -C tools install

View File

@ -1,3 +1,85 @@
### citus v8.0.3 (January 9, 2019) ###
* Fixes maintenance daemon panic due to unreleased spinlock
* Fixes an issue with having clause when used with complex joins
### citus v8.0.2 (December 13, 2018) ###
* Fixes a bug that could cause maintenance daemon panic
* Fixes crashes caused by stack size increase under high memory load
### citus v7.5.4 (December 11, 2018) ###
* Fixes a bug that could cause maintenance daemon panic
### citus v8.0.1 (November 27, 2018) ###
* Execute SQL tasks using worker_execute_sql_task UDF when using task-tracker
### citus v7.5.3 (November 27, 2018) ###
* Execute SQL tasks using worker_execute_sql_task UDF when using task-tracker
### citus v7.5.2 (November 14, 2018) ###
* Fixes inconsistent metadata error when shard metadata caching gets interrupted
* Fixes a bug that could cause memory leak
* Fixes a bug that prevents recovering wrong transactions in MX
* Fixes a bug to prevent wrong memory accesses on Citus MX under very high load
* Fixes crashes caused by stack size increase under high memory load
### citus v8.0.0 (October 31, 2018) ###
* Adds support for PostgreSQL 11
* Adds support for applying DML operations on reference tables from MX nodes
* Adds distributed locking to truncated MX tables
* Adds support for running TRUNCATE command from MX worker nodes
* Adds views to provide insight about the distributed transactions
* Adds support for TABLESAMPLE in router queries
* Adds support for INCLUDE option in index creation
* Adds option to allow simple DML commands from hot standby
* Adds support for partitioned tables with replication factor > 1
* Prevents a deadlock on concurrent DROP TABLE and SELECT on Citus MX
* Fixes a bug that prevents recovering wrong transactions in MX
* Fixes a bug to prevent wrong memory accesses on Citus MX under very high load
* Fixes a bug in MX mode, calling DROP SCHEMA with existing partitioned table
* Fixes a bug that could cause modifying CTEs to select wrong execution mode
* Fixes a bug preventing rollback in CREATE PROCEDURE
* Fixes a bug on not being able to drop index on a partitioned table
* Fixes a bug on TRUNCATE when there is a foreign key to a reference table
* Fixes a performance issue in prepared INSERT..SELECT
* Fixes a bug which causes errors on DROP DATABASE IF EXISTS
* Fixes a bug to remove intermediate result directory in pull-push execution
* Improves query pushdown planning performance
* Evaluate functions anywhere in query
### citus v7.5.1 (August 28, 2018) ###
* Improves query pushdown planning performance
@ -68,7 +150,7 @@
* Removes broadcast join logic
* Deprecates `large_table_shard_count` and `master_expire_table_cache()`
* Deprecates `large_table_shard_count` and `master_expire_table_cache()`
* Modifies master_update_node to lock write on shards hosted by node over update

View File

@ -116,6 +116,7 @@ OBJS = src/backend/distributed/shared_library_init.o \
src/backend/distributed/worker/worker_file_access_protocol.o \
src/backend/distributed/worker/worker_merge_protocol.o \
src/backend/distributed/worker/worker_partition_protocol.o \
src/backend/distributed/worker/worker_sql_task_protocol.o \
src/backend/distributed/worker/worker_truncate_trigger_protocol.o \
$(WIN32RES)

configure vendored
View File

@ -1,6 +1,6 @@
#! /bin/sh
# Guess values for system-dependent variables and create Makefiles.
# Generated by GNU Autoconf 2.69 for Citus 8.0devel.
# Generated by GNU Autoconf 2.69 for Citus 8.0.3.
#
#
# Copyright (C) 1992-1996, 1998-2012 Free Software Foundation, Inc.
@ -579,8 +579,8 @@ MAKEFLAGS=
# Identity of this package.
PACKAGE_NAME='Citus'
PACKAGE_TARNAME='citus'
PACKAGE_VERSION='8.0devel'
PACKAGE_STRING='Citus 8.0devel'
PACKAGE_VERSION='8.0.3'
PACKAGE_STRING='Citus 8.0.3'
PACKAGE_BUGREPORT=''
PACKAGE_URL=''
@ -1239,7 +1239,7 @@ if test "$ac_init_help" = "long"; then
# Omit some internal or obsolete options to make the list less imposing.
# This message is too long to be a string in the A/UX 3.1 sh.
cat <<_ACEOF
\`configure' configures Citus 8.0devel to adapt to many kinds of systems.
\`configure' configures Citus 8.0.3 to adapt to many kinds of systems.
Usage: $0 [OPTION]... [VAR=VALUE]...
@ -1300,7 +1300,7 @@ fi
if test -n "$ac_init_help"; then
case $ac_init_help in
short | recursive ) echo "Configuration of Citus 8.0devel:";;
short | recursive ) echo "Configuration of Citus 8.0.3:";;
esac
cat <<\_ACEOF
@ -1400,7 +1400,7 @@ fi
test -n "$ac_init_help" && exit $ac_status
if $ac_init_version; then
cat <<\_ACEOF
Citus configure 8.0devel
Citus configure 8.0.3
generated by GNU Autoconf 2.69
Copyright (C) 2012 Free Software Foundation, Inc.
@ -1883,7 +1883,7 @@ cat >config.log <<_ACEOF
This file contains any messages produced by compilers while
running configure, to aid debugging if configure makes a mistake.
It was created by Citus $as_me 8.0devel, which was
It was created by Citus $as_me 8.0.3, which was
generated by GNU Autoconf 2.69. Invocation command line was
$ $0 $@
@ -4701,7 +4701,7 @@ cat >>$CONFIG_STATUS <<\_ACEOF || ac_write_fail=1
# report actual input values of CONFIG_FILES etc. instead of their
# values after options handling.
ac_log="
This file was extended by Citus $as_me 8.0devel, which was
This file was extended by Citus $as_me 8.0.3, which was
generated by GNU Autoconf 2.69. Invocation command line was
CONFIG_FILES = $CONFIG_FILES
@ -4763,7 +4763,7 @@ _ACEOF
cat >>$CONFIG_STATUS <<_ACEOF || ac_write_fail=1
ac_cs_config="`$as_echo "$ac_configure_args" | sed 's/^ //; s/[\\""\`\$]/\\\\&/g'`"
ac_cs_version="\\
Citus config.status 8.0devel
Citus config.status 8.0.3
configured by $0, generated by GNU Autoconf 2.69,
with options \\"\$ac_cs_config\\"

View File

@ -5,7 +5,7 @@
# everyone needing autoconf installed, the resulting files are checked
# into the SCM.
AC_INIT([Citus], [8.0devel])
AC_INIT([Citus], [8.0.3])
AC_COPYRIGHT([Copyright (c) 2012-2017, Citus Data, Inc.])
# we'll need sed and awk for some of the version commands

View File

@ -338,6 +338,18 @@ StubRelation(TupleDesc tupleDescriptor)
void
ExecuteQueryStringIntoDestReceiver(const char *queryString, ParamListInfo params,
DestReceiver *dest)
{
Query *query = ParseQueryString(queryString);
ExecuteQueryIntoDestReceiver(query, params, dest);
}
/*
* ParseQueryString parses the given query string and returns a Query struct.
*/
Query *
ParseQueryString(const char *queryString)
{
Query *query = NULL;
@ -356,7 +368,7 @@ ExecuteQueryStringIntoDestReceiver(const char *queryString, ParamListInfo params
query = (Query *) linitial(queryTreeList);
ExecuteQueryIntoDestReceiver(query, params, dest);
return query;
}

View File

@ -1351,6 +1351,13 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
LockPartitionRelations(firstShardInterval->relationId, RowExclusiveLock);
}
/*
* Assign the distributed transaction id before trying to acquire the
* executor advisory locks. This is useful to show this backend in citus
* lock graphs (e.g., dump_global_wait_edges() and citus_lock_waits).
*/
BeginOrContinueCoordinatedTransaction();
/*
* Ensure that there are no concurrent modifications on the same
* shards. In general, for DDL commands, we already obtained the
@ -1362,8 +1369,6 @@ ExecuteModifyTasks(List *taskList, bool expectResults, ParamListInfo paramListIn
*/
AcquireExecutorMultiShardLocks(taskList);
BeginOrContinueCoordinatedTransaction();
if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC ||
firstTask->replicationModel == REPLICATION_MODEL_2PC)
{

View File

@ -117,10 +117,11 @@ static bool IsCitusExtensionStmt(Node *parsetree);
static bool IsTransmitStmt(Node *parsetree);
static void VerifyTransmitStmt(CopyStmt *copyStatement);
static bool IsCopyResultStmt(CopyStmt *copyStatement);
static bool CopyStatementHasFormat(CopyStmt *copyStatement, char *formatName);
/* Local functions forward declarations for processing distributed table commands */
static Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag,
bool *commandMustRunAsOwner);
const char *queryString);
static void ProcessCreateTableStmtPartitionOf(CreateStmt *createStatement);
static void ProcessAlterTableStmtAttachPartition(AlterTableStmt *alterTableStatement);
static List * PlanIndexStmt(IndexStmt *createIndexStatement,
@ -176,7 +177,6 @@ static List * InterShardDDLTaskList(Oid leftRelationId, Oid rightRelationId,
const char *commandString);
static void RangeVarCallbackForDropIndex(const RangeVar *rel, Oid relOid, Oid oldRelOid,
void *arg);
static void CheckCopyPermissions(CopyStmt *copyStatement);
static List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist);
static void PostProcessUtility(Node *parsetree);
static List * CollectGrantTableIdList(GrantStmt *grantStmt);
@ -257,9 +257,6 @@ multi_ProcessUtility(PlannedStmt *pstmt,
char *completionTag)
{
Node *parsetree = pstmt->utilityStmt;
bool commandMustRunAsOwner = false;
Oid savedUserId = InvalidOid;
int savedSecurityContext = 0;
List *ddlJobs = NIL;
bool checkExtensionVersion = false;
@ -369,7 +366,7 @@ multi_ProcessUtility(PlannedStmt *pstmt,
parsetree = copyObject(parsetree);
parsetree = ProcessCopyStmt((CopyStmt *) parsetree, completionTag,
&commandMustRunAsOwner);
queryString);
previousContext = MemoryContextSwitchTo(planContext);
parsetree = copyObject(parsetree);
@ -549,18 +546,15 @@ multi_ProcessUtility(PlannedStmt *pstmt,
*/
if (IsA(parsetree, DropdbStmt))
{
const bool missingOK = true;
DropdbStmt *dropDbStatement = (DropdbStmt *) parsetree;
char *dbname = dropDbStatement->dbname;
Oid databaseOid = get_database_oid(dbname, false);
Oid databaseOid = get_database_oid(dbname, missingOK);
StopMaintenanceDaemon(databaseOid);
}
/* set user if needed and go ahead and run local utility using standard hook */
if (commandMustRunAsOwner)
{
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
if (databaseOid != InvalidOid)
{
StopMaintenanceDaemon(databaseOid);
}
}
#if (PG_VERSION_NUM >= 100000)
@ -601,11 +595,6 @@ multi_ProcessUtility(PlannedStmt *pstmt,
PostProcessUtility(parsetree);
}
if (commandMustRunAsOwner)
{
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
}
/*
* Re-forming the foreign key graph relies on the command being executed
* on the local table first. However, in order to decide whether the
@ -967,9 +956,20 @@ VerifyTransmitStmt(CopyStmt *copyStatement)
*/
static bool
IsCopyResultStmt(CopyStmt *copyStatement)
{
return CopyStatementHasFormat(copyStatement, "result");
}
/*
* CopyStatementHasFormat checks whether the COPY statement has the given
* format.
*/
static bool
CopyStatementHasFormat(CopyStmt *copyStatement, char *formatName)
{
ListCell *optionCell = NULL;
bool hasFormatReceive = false;
bool hasFormat = false;
/* extract WITH (...) options from the COPY statement */
foreach(optionCell, copyStatement->options)
@ -977,14 +977,14 @@ IsCopyResultStmt(CopyStmt *copyStatement)
DefElem *defel = (DefElem *) lfirst(optionCell);
if (strncmp(defel->defname, "format", NAMEDATALEN) == 0 &&
strncmp(defGetString(defel), "result", NAMEDATALEN) == 0)
strncmp(defGetString(defel), formatName, NAMEDATALEN) == 0)
{
hasFormatReceive = true;
hasFormat = true;
break;
}
}
return hasFormatReceive;
return hasFormat;
}
@ -993,18 +993,10 @@ IsCopyResultStmt(CopyStmt *copyStatement)
* COPYing from distributed tables and preventing unsupported actions. The
* function returns a modified COPY statement to be executed, or NULL if no
* further processing is needed.
*
* commandMustRunAsOwner is an output parameter used to communicate to the caller whether
* the copy statement should be executed using elevated privileges. If
* ProcessCopyStmt determines that is required, a call to CheckCopyPermissions will take
* care of verifying the current user's permissions before ProcessCopyStmt
* returns.
*/
static Node *
ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustRunAsOwner)
ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, const char *queryString)
{
*commandMustRunAsOwner = false; /* make sure variable is initialized */
/*
* Handle special COPY "resultid" FROM STDIN WITH (format result) commands
* for sending intermediate results to workers.
@ -1110,50 +1102,48 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustR
}
}
if (copyStatement->filename != NULL && !copyStatement->is_program)
{
const char *filename = copyStatement->filename;
char *filename = copyStatement->filename;
if (CacheDirectoryElement(filename))
/*
* We execute COPY commands issued by the task-tracker executor here
* because we're not normally allowed to write to a file as a regular
* user and we don't want to execute the query as superuser.
*/
if (CacheDirectoryElement(filename) && copyStatement->query != NULL &&
!copyStatement->is_from && !is_absolute_path(filename))
{
/*
* Only superusers are allowed to copy from a file, so we have to
* become superuser to execute copies to/from files used by citus'
* query execution.
*
* XXX: This is a decidedly suboptimal solution, as that means
* that triggers, input functions, etc. run with elevated
* privileges. But this is better than not being able to run
* queries as normal user.
*/
*commandMustRunAsOwner = true;
bool binaryCopyFormat = CopyStatementHasFormat(copyStatement, "binary");
int64 tuplesSent = 0;
Query *query = NULL;
Node *queryNode = copyStatement->query;
List *queryTreeList = NIL;
/*
* Have to manually check permissions here as the COPY will be
* run as a superuser.
*/
if (copyStatement->relation != NULL)
#if (PG_VERSION_NUM >= 100000)
RawStmt *rawStmt = makeNode(RawStmt);
rawStmt->stmt = queryNode;
queryTreeList = pg_analyze_and_rewrite(rawStmt, queryString, NULL, 0, NULL);
#else
queryTreeList = pg_analyze_and_rewrite(queryNode, queryString, NULL, 0);
#endif
if (list_length(queryTreeList) != 1)
{
CheckCopyPermissions(copyStatement);
ereport(ERROR, (errmsg("can only execute a single query")));
}
/*
* Check if we have a "COPY (query) TO filename". If we do, copy
* doesn't accept relative file paths. However, SQL tasks that get
* assigned to worker nodes have relative paths. We therefore
* convert relative paths to absolute ones here.
*/
if (copyStatement->relation == NULL &&
!copyStatement->is_from &&
!is_absolute_path(filename))
{
copyStatement->filename = make_absolute_path(filename);
}
query = (Query *) linitial(queryTreeList);
tuplesSent = WorkerExecuteSqlTask(query, filename, binaryCopyFormat);
snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
"COPY " UINT64_FORMAT, tuplesSent);
return NULL;
}
}
return (Node *) copyStatement;
}
@ -3913,7 +3903,7 @@ RangeVarCallbackForDropIndex(const RangeVar *rel, Oid relOid, Oid oldRelOid, voi
*
* Copied from postgres, where it's part of DoCopy().
*/
static void
void
CheckCopyPermissions(CopyStmt *copyStatement)
{
/* *INDENT-OFF* */

View File

@ -157,6 +157,10 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
result = CreateDistributedPlannedStmt(planId, result, originalQuery, parse,
boundParams, plannerRestrictionContext);
setPartitionedTablesInherited = true;
AdjustPartitioningForDistributedPlanning(parse,
setPartitionedTablesInherited);
}
}
PG_CATCH();
@ -166,13 +170,6 @@ distributed_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
}
PG_END_TRY();
if (needsDistributedPlanning)
{
setPartitionedTablesInherited = true;
AdjustPartitioningForDistributedPlanning(parse, setPartitionedTablesInherited);
}
/* remove the context from the context list */
PopPlannerRestrictionContext();
@ -1517,6 +1514,13 @@ CurrentPlannerRestrictionContext(void)
plannerRestrictionContext =
(PlannerRestrictionContext *) linitial(plannerRestrictionContextList);
if (plannerRestrictionContext == NULL)
{
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
errmsg("planner restriction context stack was empty"),
errdetail("Please report this to the Citus core team.")));
}
return plannerRestrictionContext;
}
@ -1559,22 +1563,30 @@ HasUnresolvedExternParamsWalker(Node *expression, ParamListInfo boundParams)
/* check whether parameter is available (and valid) */
if (boundParams && paramId > 0 && paramId <= boundParams->numParams)
{
ParamExternData *externParam = &boundParams->params[paramId - 1];
Oid paramType = externParam->ptype;
ParamExternData *externParam = NULL;
Oid paramType = InvalidOid;
/* give hook a chance in case parameter is dynamic */
if (!OidIsValid(paramType) && boundParams->paramFetch != NULL)
if (boundParams->paramFetch != NULL)
{
#if (PG_VERSION_NUM >= 110000)
ParamExternData externParamPlaceholder;
externParam = (*boundParams->paramFetch)(boundParams, paramId, false,
&externParamPlaceholder);
#else
(*boundParams->paramFetch)(boundParams, paramId);
externParam = &boundParams->params[paramId - 1];
if (!OidIsValid(externParam->ptype))
{
(*boundParams->paramFetch)(boundParams, paramId);
}
#endif
paramType = externParam->ptype;
}
else
{
externParam = &boundParams->params[paramId - 1];
}
paramType = externParam->ptype;
if (OidIsValid(paramType))
{
return false;

View File

@ -106,6 +106,8 @@ static RangeTblEntry * DerivedRangeTableEntry(MultiNode *multiNode, List *column
List *tableIdList);
static List * DerivedColumnNameList(uint32 columnCount, uint64 generatingJobId);
static Query * BuildSubqueryJobQuery(MultiNode *multiNode);
static void UpdateAllColumnAttributes(Node *columnContainer, List *rangeTableList,
List *dependedJobList);
static void UpdateColumnAttributes(Var *column, List *rangeTableList,
List *dependedJobList);
static Index NewTableId(Index originalTableId, List *rangeTableList);
@ -582,10 +584,8 @@ BuildJobQuery(MultiNode *multiNode, List *dependedJobList)
List *sortClauseList = NIL;
List *groupClauseList = NIL;
List *selectClauseList = NIL;
List *columnList = NIL;
Node *limitCount = NULL;
Node *limitOffset = NULL;
ListCell *columnCell = NULL;
FromExpr *joinTree = NULL;
Node *joinRoot = NULL;
Node *havingQual = NULL;
@ -653,13 +653,7 @@ BuildJobQuery(MultiNode *multiNode, List *dependedJobList)
/* update the column attributes for target entries */
if (updateColumnAttributes)
{
ListCell *columnCell = NULL;
List *columnList = pull_var_clause_default((Node *) targetList);
foreach(columnCell, columnList)
{
Var *column = (Var *) lfirst(columnCell);
UpdateColumnAttributes(column, rangeTableList, dependedJobList);
}
UpdateAllColumnAttributes((Node *) targetList, rangeTableList, dependedJobList);
}
/* extract limit count/offset and sort clauses */
@ -679,16 +673,12 @@ BuildJobQuery(MultiNode *multiNode, List *dependedJobList)
/* build the where clause list using select predicates */
selectClauseList = QuerySelectClauseList(multiNode);
/* set correct column attributes for select columns */
/* set correct column attributes for select and having clauses */
if (updateColumnAttributes)
{
columnCell = NULL;
columnList = pull_var_clause_default((Node *) selectClauseList);
foreach(columnCell, columnList)
{
Var *column = (Var *) lfirst(columnCell);
UpdateColumnAttributes(column, rangeTableList, dependedJobList);
}
UpdateAllColumnAttributes((Node *) selectClauseList, rangeTableList,
dependedJobList);
UpdateAllColumnAttributes(havingQual, rangeTableList, dependedJobList);
}
/*
@ -1529,6 +1519,25 @@ BuildSubqueryJobQuery(MultiNode *multiNode)
}
/*
* UpdateAllColumnAttributes extracts column references from provided columnContainer
* and calls UpdateColumnAttributes to update the column's range table reference (varno) and
* column attribute number for the range table (varattno).
*/
static void
UpdateAllColumnAttributes(Node *columnContainer, List *rangeTableList,
List *dependedJobList)
{
ListCell *columnCell = NULL;
List *columnList = pull_var_clause_default(columnContainer);
foreach(columnCell, columnList)
{
Var *column = (Var *) lfirst(columnCell);
UpdateColumnAttributes(column, rangeTableList, dependedJobList);
}
}
/*
* UpdateColumnAttributes updates the column's range table reference (varno) and
* column attribute number for the range table (varattno). The function uses the

View File

@ -55,6 +55,7 @@
#include "postmaster/postmaster.h"
#include "optimizer/planner.h"
#include "optimizer/paths.h"
#include "tcop/tcopprot.h"
#include "utils/guc.h"
#include "utils/guc_tables.h"
@ -65,6 +66,7 @@ static char *CitusVersion = CITUS_VERSION;
void _PG_init(void);
static void ResizeStackToMaximumDepth(void);
static void multi_log_hook(ErrorData *edata);
static void CreateRequiredDirectories(void);
static void RegisterCitusConfigVariables(void);
@ -169,6 +171,8 @@ _PG_init(void)
"shared_preload_libraries.")));
}
ResizeStackToMaximumDepth();
/*
* Extend the database directory structure before continuing with
* initialization - one of the later steps might require them to exist.
@ -234,6 +238,35 @@ _PG_init(void)
}
/*
* Stack size increase during high memory load may cause unexpected crashes.
* With this alloca call, we are increasing stack size explicitly, so that if
* it is not possible to increase stack size, we will get an OOM error instead
* of a crash.
*
* This function is called on backend startup. The allocated memory will
* automatically be released at the end of the function's scope. However, we'd
* have already expanded the stack and it wouldn't shrink back. So, in a sense,
* per backend we're securing max_stack_depth kB of memory on the stack upfront.
*
* Not all backends require max_stack_depth kB on the stack, so we might end
* up with unnecessary allocations. However, the default value is 2MB, which seems
* an acceptable trade-off. Also, allocating memory upfront may perform better
* under some circumstances.
*/
static void
ResizeStackToMaximumDepth(void)
{
#ifndef WIN32
volatile char *stack_resizer = NULL;
long max_stack_depth_bytes = max_stack_depth * 1024L;
stack_resizer = alloca(max_stack_depth_bytes);
stack_resizer[max_stack_depth_bytes - 1] = 0;
#endif
}
/*
* multi_log_hook intercepts postgres log commands. We use this to override
* postgres error messages when they're not specific enough for the users.

View File

@ -25,6 +25,7 @@
#include "distributed/remote_commands.h"
#include "distributed/transaction_identifier.h"
#include "nodes/execnodes.h"
#include "postmaster/autovacuum.h" /* to access autovacuum_max_workers */
#include "storage/ipc.h"
#include "storage/lmgr.h"
#include "storage/lwlock.h"
@ -512,6 +513,7 @@ BackendManagementShmemInit(void)
if (!alreadyInitialized)
{
int backendIndex = 0;
int totalProcs = 0;
char *trancheName = "Backend Management Tranche";
#if (PG_VERSION_NUM >= 100000)
@ -557,7 +559,8 @@ BackendManagementShmemInit(void)
* We also initiate initiatorNodeIdentifier to -1, which can never be
* used as a node id.
*/
for (backendIndex = 0; backendIndex < TotalProcs; ++backendIndex)
totalProcs = TotalProcCount();
for (backendIndex = 0; backendIndex < totalProcs; ++backendIndex)
{
backendManagementShmemData->backends[backendIndex].citusBackend.
initiatorNodeIdentifier = -1;
@ -582,14 +585,62 @@ static size_t
BackendManagementShmemSize(void)
{
Size size = 0;
int totalProcs = TotalProcCount();
size = add_size(size, sizeof(BackendManagementShmemData));
size = add_size(size, mul_size(sizeof(BackendData), TotalProcs));
size = add_size(size, mul_size(sizeof(BackendData), totalProcs));
return size;
}
/*
* TotalProcCount returns the total number of processes that could run via the
* current postgres server. See the details in the function comments.
*
* There is one thing we should warn the readers about: Citus enforces being
* loaded as the first extension in shared_preload_libraries. However, if any
* other extension overrides MaxConnections, autovacuum_max_workers or
* max_worker_processes, our reasoning in this function may not work as expected.
* Given that this is not a usual pattern for extensions, we consider Citus'
* behaviour good enough for now.
*/
int
TotalProcCount(void)
{
int maxBackends = 0;
int totalProcs = 0;
#ifdef WIN32
/* autovacuum_max_workers is not PGDLLIMPORT, so use a high estimate for windows */
int estimatedMaxAutovacuumWorkers = 30;
maxBackends =
MaxConnections + estimatedMaxAutovacuumWorkers + 1 + max_worker_processes;
#else
/*
* We're simply imitating PostgreSQL's InitializeMaxBackends(). Given that all
* the items used here are PGC_POSTMASTER, it should be safe to access them
* anytime during execution, even before InitializeMaxBackends() is called.
*/
maxBackends = MaxConnections + autovacuum_max_workers + 1 + max_worker_processes;
#endif
/*
* We prefer to maintain space for auxiliary procs or prepared transactions in
* the backend space because they could be blocking processes and our current
* implementation of distributed deadlock detection could process them
* as regular backends. In the future, we could consider changing the deadlock
* detection algorithm to ignore auxiliary procs or prepared transactions and
* save some space.
*/
totalProcs = maxBackends + NUM_AUXILIARY_PROCS + max_prepared_xacts;
return totalProcs;
}
/*
* InitializeBackendData initialises MyBackendData to the shared memory segment
* belonging to the current backend.
@ -754,9 +805,15 @@ AssignDistributedTransactionId(void)
void
MarkCitusInitiatedCoordinatorBackend(void)
{
/*
* GetLocalGroupId may throw an exception, which could leave the spinlock
* unreleased. We therefore call GetLocalGroupId before taking the lock.
*/
int localGroupId = GetLocalGroupId();
SpinLockAcquire(&MyBackendData->mutex);
MyBackendData->citusBackend.initiatorNodeIdentifier = GetLocalGroupId();
MyBackendData->citusBackend.initiatorNodeIdentifier = localGroupId;
MyBackendData->citusBackend.transactionOriginator = true;
SpinLockRelease(&MyBackendData->mutex);

View File

@ -389,6 +389,7 @@ BuildLocalWaitGraph(void)
WaitGraph *waitGraph = NULL;
int curBackend = 0;
PROCStack remaining;
int totalProcs = TotalProcCount();
/*
* Try hard to avoid allocations while holding lock. Thus we pre-allocate
@ -398,12 +399,12 @@ BuildLocalWaitGraph(void)
*/
waitGraph = (WaitGraph *) palloc0(sizeof(WaitGraph));
waitGraph->localNodeId = GetLocalGroupId();
waitGraph->allocatedSize = TotalProcs * 3;
waitGraph->allocatedSize = totalProcs * 3;
waitGraph->edgeCount = 0;
waitGraph->edges = (WaitEdge *) palloc(waitGraph->allocatedSize * sizeof(WaitEdge));
remaining.procs = (PGPROC **) palloc(sizeof(PGPROC *) * TotalProcs);
remaining.procAdded = (bool *) palloc0(sizeof(bool *) * TotalProcs);
remaining.procs = (PGPROC **) palloc(sizeof(PGPROC *) * totalProcs);
remaining.procAdded = (bool *) palloc0(sizeof(bool *) * totalProcs);
remaining.procCount = 0;
LockLockData();
@ -416,7 +417,7 @@ BuildLocalWaitGraph(void)
*/
/* build list of starting procs */
for (curBackend = 0; curBackend < TotalProcs; curBackend++)
for (curBackend = 0; curBackend < totalProcs; curBackend++)
{
PGPROC *currentProc = &ProcGlobal->allProcs[curBackend];
BackendData currentBackendData;
@ -762,7 +763,7 @@ AddProcToVisit(PROCStack *remaining, PGPROC *proc)
return;
}
Assert(remaining->procCount < TotalProcs);
Assert(remaining->procCount < TotalProcCount());
remaining->procs[remaining->procCount++] = proc;
remaining->procAdded[proc->pgprocno] = true;

View File

@ -31,6 +31,7 @@
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_client_executor.h"
#include "distributed/multi_copy.h"
#include "distributed/multi_logical_optimizer.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_server_executor.h"
@ -766,6 +767,8 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS)
StringInfo queryString = NULL;
Oid sourceShardRelationId = InvalidOid;
Oid sourceSchemaId = InvalidOid;
Oid savedUserId = InvalidOid;
int savedSecurityContext = 0;
CheckCitusVersion(ERROR);
@ -829,9 +832,18 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS)
appendStringInfo(queryString, COPY_IN_COMMAND, shardQualifiedName,
localFilePath->data);
/* make sure we are allowed to execute the COPY command */
CheckCopyPermissions(localCopyCommand);
/* need superuser to copy from files */
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
CitusProcessUtility((Node *) localCopyCommand, queryString->data,
PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL);
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
/* finally delete the temporary file we created */
CitusDeleteFile(localFilePath->data);

View File

@ -0,0 +1,280 @@
/*-------------------------------------------------------------------------
*
* worker_sql_task_protocol.c
*
* Routines for executing SQL tasks during task-tracker execution.
*
* Copyright (c) 2012-2018, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "funcapi.h"
#include "pgstat.h"
#include "distributed/multi_copy.h"
#include "distributed/multi_executor.h"
#include "distributed/transmit.h"
#include "distributed/worker_protocol.h"
#include "utils/builtins.h"
#include "utils/memutils.h"
/* TaskFileDestReceiver can be used to stream results into a file */
typedef struct TaskFileDestReceiver
{
/* public DestReceiver interface */
DestReceiver pub;
/* descriptor of the tuples that are sent to the worker */
TupleDesc tupleDescriptor;
/* EState for per-tuple memory allocation */
EState *executorState;
/* MemoryContext for DestReceiver session */
MemoryContext memoryContext;
/* output file */
char *filePath;
File fileDesc;
bool binaryCopyFormat;
/* state on how to copy out data types */
CopyOutState copyOutState;
FmgrInfo *columnOutputFunctions;
/* number of tuples sent */
uint64 tuplesSent;
} TaskFileDestReceiver;
static DestReceiver * CreateTaskFileDestReceiver(char *filePath, EState *executorState,
bool binaryCopyFormat);
static void TaskFileDestReceiverStartup(DestReceiver *dest, int operation,
TupleDesc inputTupleDescriptor);
static bool TaskFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest);
static void WriteToLocalFile(StringInfo copyData, File fileDesc);
static void TaskFileDestReceiverShutdown(DestReceiver *destReceiver);
static void TaskFileDestReceiverDestroy(DestReceiver *destReceiver);
/*
* WorkerExecuteSqlTask executes an already-parsed query and writes the result
* to the given task file.
*/
int64
WorkerExecuteSqlTask(Query *query, char *taskFilename, bool binaryCopyFormat)
{
EState *estate = NULL;
TaskFileDestReceiver *taskFileDest = NULL;
ParamListInfo paramListInfo = NULL;
int64 tuplesSent = 0L;
estate = CreateExecutorState();
taskFileDest =
(TaskFileDestReceiver *) CreateTaskFileDestReceiver(taskFilename, estate,
binaryCopyFormat);
ExecuteQueryIntoDestReceiver(query, paramListInfo, (DestReceiver *) taskFileDest);
tuplesSent = taskFileDest->tuplesSent;
taskFileDest->pub.rDestroy((DestReceiver *) taskFileDest);
FreeExecutorState(estate);
return tuplesSent;
}
/*
* CreateTaskFileDestReceiver creates a DestReceiver for writing query results
* to a task file.
*/
static DestReceiver *
CreateTaskFileDestReceiver(char *filePath, EState *executorState, bool binaryCopyFormat)
{
TaskFileDestReceiver *taskFileDest = NULL;
taskFileDest = (TaskFileDestReceiver *) palloc0(sizeof(TaskFileDestReceiver));
/* set up the DestReceiver function pointers */
taskFileDest->pub.receiveSlot = TaskFileDestReceiverReceive;
taskFileDest->pub.rStartup = TaskFileDestReceiverStartup;
taskFileDest->pub.rShutdown = TaskFileDestReceiverShutdown;
taskFileDest->pub.rDestroy = TaskFileDestReceiverDestroy;
taskFileDest->pub.mydest = DestCopyOut;
/* set up output parameters */
taskFileDest->executorState = executorState;
taskFileDest->memoryContext = CurrentMemoryContext;
taskFileDest->filePath = pstrdup(filePath);
taskFileDest->binaryCopyFormat = binaryCopyFormat;
return (DestReceiver *) taskFileDest;
}
/*
* TaskFileDestReceiverStartup implements the rStartup interface of
* TaskFileDestReceiver. It opens the destination file and sets up
* the CopyOutState.
*/
static void
TaskFileDestReceiverStartup(DestReceiver *dest, int operation,
TupleDesc inputTupleDescriptor)
{
TaskFileDestReceiver *taskFileDest = (TaskFileDestReceiver *) dest;
CopyOutState copyOutState = NULL;
const char *delimiterCharacter = "\t";
const char *nullPrintCharacter = "\\N";
const int fileFlags = (O_APPEND | O_CREAT | O_RDWR | O_TRUNC | PG_BINARY);
const int fileMode = (S_IRUSR | S_IWUSR);
/* use the memory context that was in place when the DestReceiver was created */
MemoryContext oldContext = MemoryContextSwitchTo(taskFileDest->memoryContext);
taskFileDest->tupleDescriptor = inputTupleDescriptor;
/* define how tuples will be serialised */
copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData));
copyOutState->delim = (char *) delimiterCharacter;
copyOutState->null_print = (char *) nullPrintCharacter;
copyOutState->null_print_client = (char *) nullPrintCharacter;
copyOutState->binary = taskFileDest->binaryCopyFormat;
copyOutState->fe_msgbuf = makeStringInfo();
copyOutState->rowcontext = GetPerTupleMemoryContext(taskFileDest->executorState);
taskFileDest->copyOutState = copyOutState;
taskFileDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor,
copyOutState->binary);
taskFileDest->fileDesc = FileOpenForTransmit(taskFileDest->filePath, fileFlags,
fileMode);
if (copyOutState->binary)
{
/* write headers when using binary encoding */
resetStringInfo(copyOutState->fe_msgbuf);
AppendCopyBinaryHeaders(copyOutState);
WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest->fileDesc);
}
MemoryContextSwitchTo(oldContext);
}
/*
* TaskFileDestReceiverReceive implements the receiveSlot function of
* TaskFileDestReceiver. It takes a TupleTableSlot and writes the contents
* to a local file.
*/
static bool
TaskFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
{
TaskFileDestReceiver *taskFileDest = (TaskFileDestReceiver *) dest;
TupleDesc tupleDescriptor = taskFileDest->tupleDescriptor;
CopyOutState copyOutState = taskFileDest->copyOutState;
FmgrInfo *columnOutputFunctions = taskFileDest->columnOutputFunctions;
Datum *columnValues = NULL;
bool *columnNulls = NULL;
StringInfo copyData = copyOutState->fe_msgbuf;
EState *executorState = taskFileDest->executorState;
MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState);
MemoryContext oldContext = MemoryContextSwitchTo(executorTupleContext);
slot_getallattrs(slot);
columnValues = slot->tts_values;
columnNulls = slot->tts_isnull;
resetStringInfo(copyData);
/* construct row in COPY format */
AppendCopyRowData(columnValues, columnNulls, tupleDescriptor,
copyOutState, columnOutputFunctions, NULL);
WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest->fileDesc);
MemoryContextSwitchTo(oldContext);
taskFileDest->tuplesSent++;
ResetPerTupleExprContext(executorState);
return true;
}
/*
* WriteToLocalFile writes the bytes in a StringInfo to a local file.
*/
static void
WriteToLocalFile(StringInfo copyData, File fileDesc)
{
#if (PG_VERSION_NUM >= 100000)
int bytesWritten = FileWrite(fileDesc, copyData->data, copyData->len, PG_WAIT_IO);
#else
int bytesWritten = FileWrite(fileDesc, copyData->data, copyData->len);
#endif
if (bytesWritten < 0)
{
ereport(ERROR, (errcode_for_file_access(),
errmsg("could not append to file: %m")));
}
}
/*
* TaskFileDestReceiverShutdown implements the rShutdown interface of
* TaskFileDestReceiver. It writes the footer and closes the file.
*/
static void
TaskFileDestReceiverShutdown(DestReceiver *destReceiver)
{
TaskFileDestReceiver *taskFileDest = (TaskFileDestReceiver *) destReceiver;
CopyOutState copyOutState = taskFileDest->copyOutState;
if (copyOutState->binary)
{
/* write footers when using binary encoding */
resetStringInfo(copyOutState->fe_msgbuf);
AppendCopyBinaryFooters(copyOutState);
WriteToLocalFile(copyOutState->fe_msgbuf, taskFileDest->fileDesc);
}
FileClose(taskFileDest->fileDesc);
}
/*
* TaskFileDestReceiverDestroy frees memory allocated as part of the
* TaskFileDestReceiver and closes file descriptors.
*/
static void
TaskFileDestReceiverDestroy(DestReceiver *destReceiver)
{
TaskFileDestReceiver *taskFileDest = (TaskFileDestReceiver *) destReceiver;
if (taskFileDest->copyOutState)
{
pfree(taskFileDest->copyOutState);
}
if (taskFileDest->columnOutputFunctions)
{
pfree(taskFileDest->columnOutputFunctions);
}
pfree(taskFileDest->filePath);
pfree(taskFileDest);
}

View File

@ -22,9 +22,6 @@
#include "storage/s_lock.h"
#define TotalProcs (MaxBackends + NUM_AUXILIARY_PROCS + max_prepared_xacts)
/*
* CitusInitiatedBackend keeps some information about the backends that are
* initiated by Citus.
@ -58,6 +55,7 @@ typedef struct BackendData
extern void InitializeBackendManagement(void);
extern int TotalProcCount(void);
extern void InitializeBackendData(void);
extern void LockBackendSharedMemory(LWLockMode lockMode);
extern void UnlockBackendSharedMemory(void);

View File

@ -131,6 +131,7 @@ extern void EndRemoteCopy(int64 shardId, List *connectionList, bool stopOnFailur
extern void CitusCopyFrom(CopyStmt *copyStatement, char *completionTag);
extern bool IsCopyFromWorker(CopyStmt *copyStatement);
extern NodeAddress * MasterNodeAddress(CopyStmt *copyStatement);
extern void CheckCopyPermissions(CopyStmt *copyStatement);
#endif /* MULTI_COPY_H */

View File

@ -36,6 +36,7 @@ extern TupleTableSlot * ReturnTupleFromTuplestore(CitusScanState *scanState);
extern void LoadTuplesIntoTupleStore(CitusScanState *citusScanState, Job *workerJob);
extern void ReadFileIntoTupleStore(char *fileName, char *copyFormat, TupleDesc
tupleDescriptor, Tuplestorestate *tupstore);
extern Query * ParseQueryString(const char *queryString);
extern void ExecuteQueryStringIntoDestReceiver(const char *queryString, ParamListInfo
params,
DestReceiver *dest);

View File

@ -122,6 +122,8 @@ extern FmgrInfo * GetFunctionInfo(Oid typeId, Oid accessMethodId, int16 procedur
extern uint64 ExtractShardIdFromTableName(const char *tableName, bool missingOk);
extern List * TableDDLCommandList(const char *nodeName, uint32 nodePort,
const char *tableName);
extern int64 WorkerExecuteSqlTask(Query *query, char *taskFilename,
bool binaryCopyFormat);
/* Function declarations shared with the master planner */

View File

@ -9,3 +9,6 @@
# Regression test output
/regression.diffs
/regression.out
# Failure test side effects
/proxy.output

View File

@ -122,17 +122,28 @@ SELECT citus.mitmproxy('conn.onQuery(query="^PREPARE TRANSACTION").kill()');
(1 row)
-- hide the error message (it has the PID)...
-- this transaction block will be sent to the coordinator as a remote command to hide the
-- error message that is caused during commit.
-- we'll test for the txn side-effects to ensure it didn't run
SET client_min_messages TO FATAL;
SELECT master_run_on_worker(
ARRAY['localhost']::text[],
ARRAY[:master_port]::int[],
ARRAY['
BEGIN;
DELETE FROM dml_test WHERE id = 1;
DELETE FROM dml_test WHERE id = 2;
INSERT INTO dml_test VALUES (5, 'Epsilon');
UPDATE dml_test SET name = 'alpha' WHERE id = 1;
UPDATE dml_test SET name = 'gamma' WHERE id = 3;
INSERT INTO dml_test VALUES (5, ''Epsilon'');
UPDATE dml_test SET name = ''alpha'' WHERE id = 1;
UPDATE dml_test SET name = ''gamma'' WHERE id = 3;
COMMIT;
SET client_min_messages TO DEFAULT;
'],
false
);
master_run_on_worker
---------------------------
(localhost,57636,t,BEGIN)
(1 row)
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
-----------

View File

@ -23,16 +23,6 @@ SELECT create_distributed_table('test_table','id');
-- Populate data to the table
INSERT INTO test_table VALUES(1,1,1),(1,2,2),(2,1,1),(2,2,2),(3,1,1),(3,2,2);
-- Create a function to make sure that queries returning the same result
CREATE FUNCTION raise_failed_execution(query text) RETURNS void AS $$
BEGIN
EXECUTE query;
EXCEPTION WHEN OTHERS THEN
IF SQLERRM LIKE 'failed to execute task%' THEN
RAISE 'Task failed to execute';
END IF;
END;
$$LANGUAGE plpgsql;
-- Kill when the first COPY command arrived, since we have a single placement
-- it is expected to error out.
SET client_min_messages TO ERROR;
@ -42,9 +32,9 @@ SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()');
(1 row)
SELECT raise_failed_execution('SELECT count(*) FROM test_table');
SELECT public.raise_failed_execution('SELECT count(*) FROM test_table');
ERROR: Task failed to execute
CONTEXT: PL/pgSQL function raise_failed_execution(text) line 6 at RAISE
CONTEXT: PL/pgSQL function public.raise_failed_execution(text) line 6 at RAISE
SET client_min_messages TO DEFAULT;
-- Kill the connection with a CTE
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()');
@ -70,12 +60,12 @@ SELECT citus.mitmproxy('conn.onQuery(query="^COPY").after(1).kill()');
(1 row)
SELECT raise_failed_execution('WITH
SELECT public.raise_failed_execution('WITH
results AS (SELECT * FROM test_table)
SELECT * FROM test_table, results
WHERE test_table.id = results.id');
ERROR: Task failed to execute
CONTEXT: PL/pgSQL function raise_failed_execution(text) line 6 at RAISE
CONTEXT: PL/pgSQL function public.raise_failed_execution(text) line 6 at RAISE
SET client_min_messages TO DEFAULT;
-- In parallel execution mode Citus opens separate connections for each shard
-- so killing the connection after the first copy does not break it.
@ -297,7 +287,5 @@ WARNING: could not consume data from worker node
COMMIT;
DROP SCHEMA real_time_select_failure CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to function raise_failed_execution(text)
drop cascades to table test_table
NOTICE: drop cascades to table test_table
SET search_path TO default;

View File

@ -1,4 +1,4 @@
Parsed test spec with 3 sessions
Parsed test spec with 4 sessions
starting permutation: s1-begin s1-update-ref-table-from-coordinator s2-start-session-level-connection s2-begin-on-worker s2-update-ref-table s3-select-distributed-waiting-queries s1-commit s2-commit-worker s2-stop-connection
step s1-begin:
@ -747,3 +747,103 @@ stop_session_level_connection_to_node
restore_isolation_tester_func
starting permutation: s1-begin s1-update-on-the-coordinator s2-update-on-the-coordinator s3-select-distributed-waiting-queries s1-commit
step s1-begin:
BEGIN;
step s1-update-on-the-coordinator:
UPDATE tt1 SET value_1 = 4;
step s2-update-on-the-coordinator:
UPDATE tt1 SET value_1 = 4;
<waiting ...>
step s3-select-distributed-waiting-queries:
SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits;
blocked_statementcurrent_statement_in_blocking_processwaiting_node_nameblocking_node_namewaiting_node_portblocking_node_port
UPDATE tt1 SET value_1 = 4;
UPDATE tt1 SET value_1 = 4;
coordinator_hostcoordinator_host57636 57636
step s1-commit:
COMMIT;
step s2-update-on-the-coordinator: <... completed>
restore_isolation_tester_func
starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-update-dist-table s4-start-session-level-connection s4-begin-on-worker s4-update-dist-table s3-select-distributed-waiting-queries s1-commit-worker s4-commit-worker s1-stop-connection s4-stop-connection
step s1-start-session-level-connection:
SELECT start_session_level_connection_to_node('localhost', 57637);
start_session_level_connection_to_node
step s1-begin-on-worker:
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
run_commands_on_session_level_connection_to_node
step s1-update-dist-table:
SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 4');
run_commands_on_session_level_connection_to_node
step s4-start-session-level-connection:
SELECT start_session_level_connection_to_node('localhost', 57637);
start_session_level_connection_to_node
step s4-begin-on-worker:
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
run_commands_on_session_level_connection_to_node
step s4-update-dist-table:
SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 5');
<waiting ...>
step s3-select-distributed-waiting-queries:
SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits;
blocked_statementcurrent_statement_in_blocking_processwaiting_node_nameblocking_node_namewaiting_node_portblocking_node_port
UPDATE tt1 SET value_1 = 5UPDATE tt1 SET value_1 = 4localhost localhost 57637 57637
step s1-commit-worker:
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
run_commands_on_session_level_connection_to_node
step s4-update-dist-table: <... completed>
run_commands_on_session_level_connection_to_node
step s4-commit-worker:
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
run_commands_on_session_level_connection_to_node
step s1-stop-connection:
SELECT stop_session_level_connection_to_node();
stop_session_level_connection_to_node
step s4-stop-connection:
SELECT stop_session_level_connection_to_node();
stop_session_level_connection_to_node
restore_isolation_tester_func

View File

@ -155,7 +155,7 @@ ALTER EXTENSION citus UPDATE TO '8.0-8';
SHOW citus.version;
citus.version
---------------
8.0devel
8.0.3
(1 row)
-- ensure no objects were created outside pg_catalog

View File

@ -93,10 +93,10 @@ SELECT count(*) FROM test WHERE id = 1;
(1 row)
SET citus.task_executor_type TO 'task-tracker';
SELECT count(*) FROM test;
count
-------
2
SELECT count(*), min(current_user) FROM test;
count | min
-------+-------------
2 | full_access
(1 row)
-- test re-partition query (needs to transmit intermediate results)
@ -140,10 +140,10 @@ SELECT count(*) FROM test WHERE id = 1;
(1 row)
SET citus.task_executor_type TO 'task-tracker';
SELECT count(*) FROM test;
count
-------
2
SELECT count(*), min(current_user) FROM test;
count | min
-------+-------------
2 | read_access
(1 row)
-- test re-partition query (needs to transmit intermediate results)
@ -171,7 +171,7 @@ ERROR: permission denied for table test
SELECT count(*) FROM test WHERE id = 1;
ERROR: permission denied for table test
SET citus.task_executor_type TO 'task-tracker';
SELECT count(*) FROM test;
SELECT count(*), min(current_user) FROM test;
ERROR: permission denied for table test
-- test re-partition query
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;

View File

@ -93,10 +93,10 @@ SELECT count(*) FROM test WHERE id = 1;
(1 row)
SET citus.task_executor_type TO 'task-tracker';
SELECT count(*) FROM test;
count
-------
2
SELECT count(*), min(current_user) FROM test;
count | min
-------+-------------
2 | full_access
(1 row)
-- test re-partition query (needs to transmit intermediate results)
@ -140,10 +140,10 @@ SELECT count(*) FROM test WHERE id = 1;
(1 row)
SET citus.task_executor_type TO 'task-tracker';
SELECT count(*) FROM test;
count
-------
2
SELECT count(*), min(current_user) FROM test;
count | min
-------+-------------
2 | read_access
(1 row)
-- test re-partition query (needs to transmit intermediate results)
@ -171,7 +171,7 @@ ERROR: permission denied for relation test
SELECT count(*) FROM test WHERE id = 1;
ERROR: permission denied for relation test
SET citus.task_executor_type TO 'task-tracker';
SELECT count(*) FROM test;
SELECT count(*), min(current_user) FROM test;
ERROR: permission denied for relation test
-- test re-partition query
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;

View File

@ -225,8 +225,12 @@ ERROR: parameter "citus.max_task_string_size" cannot be changed without restart
-- error message may vary between executions
-- hiding warning and error message
-- no output means the query has failed
SET client_min_messages to FATAL;
SET client_min_messages to ERROR;
SELECT raise_failed_execution('
SELECT u.* FROM wide_table u JOIN wide_table v ON (u.long_column_002 = v.long_column_003);
');
ERROR: Task failed to execute
CONTEXT: PL/pgSQL function raise_failed_execution(text) line 6 at RAISE
-- following will succeed since it fetches few columns
SELECT u.long_column_001, u.long_column_002, u.long_column_003 FROM wide_table u JOIN wide_table v ON (u.long_column_002 = v.long_column_003);
long_column_001 | long_column_002 | long_column_003

View File

@ -109,3 +109,13 @@ $desc_views$
(1 row)
-- Create a function to make sure that queries returning the same result
CREATE FUNCTION raise_failed_execution(query text) RETURNS void AS $$
BEGIN
EXECUTE query;
EXCEPTION WHEN OTHERS THEN
IF SQLERRM LIKE 'failed to execute task%' THEN
RAISE 'Task failed to execute';
END IF;
END;
$$LANGUAGE plpgsql;

View File

@ -268,3 +268,8 @@ FETCH BACKWARD noHoldCursor;
COMMIT;
FETCH ABSOLUTE 5 FROM noHoldCursor;
ERROR: cursor "noholdcursor" does not exist
-- Test we don't throw an error for DROP IF EXISTS
DROP DATABASE IF EXISTS not_existing_database;
NOTICE: database "not_existing_database" does not exist, skipping
DROP TABLE IF EXISTS not_existing_table;
NOTICE: table "not_existing_table" does not exist, skipping

View File

@ -428,8 +428,8 @@ ORDER BY 2 DESC, 1;
5 | 14
(3 rows)
-- non-partition key joins are not supported inside subquery
-- since the join with a table
-- non-partition key joins are supported inside subquery
-- via pull-push execution
SELECT * FROM
(SELECT ru.user_id, count(*)
FROM recent_users ru
@ -438,7 +438,13 @@ SELECT * FROM
GROUP BY ru.user_id
ORDER BY 2 DESC, 1) s1
ORDER BY 2 DESC, 1;
ERROR: bogus varno: 3
user_id | count
---------+-------
1 | 24
3 | 23
5 | 7
(3 rows)
-- join between views
-- recent users who has an event in recent events
SELECT ru.user_id FROM recent_users ru JOIN recent_events re USING(user_id) GROUP BY ru.user_id ORDER BY ru.user_id;
@ -533,7 +539,8 @@ SELECT * FROM
ON(ru.user_id = et.event_type)
) s1
ORDER BY 2 DESC, 1;
ERROR: bogus varno: 3
ERROR: cannot pushdown the subquery
DETAIL: Complex subqueries and CTEs cannot be in the outer part of the outer join
-- create a select only view
CREATE VIEW selected_users AS SELECT * FROM users_table WHERE value_1 >= 1 and value_1 <3;
CREATE VIEW recent_selected_users AS SELECT su.* FROM selected_users su JOIN recent_users ru USING(user_id);
@ -863,7 +870,7 @@ EXPLAIN (COSTS FALSE) SELECT et.* FROM recent_10_users JOIN events_table et USIN
-> Sort
Sort Key: remote_scan."time" DESC
-> Custom Scan (Citus Real-Time)
-> Distributed Subplan 95_1
-> Distributed Subplan 96_1
-> Limit
-> Sort
Sort Key: max((max(remote_scan.lastseen))) DESC

View File

@ -428,8 +428,8 @@ ORDER BY 2 DESC, 1;
5 | 14
(3 rows)
-- non-partition key joins are not supported inside subquery
-- since the join with a table
-- non-partition key joins are supported inside subquery
-- via pull-push execution
SELECT * FROM
(SELECT ru.user_id, count(*)
FROM recent_users ru
@ -438,7 +438,13 @@ SELECT * FROM
GROUP BY ru.user_id
ORDER BY 2 DESC, 1) s1
ORDER BY 2 DESC, 1;
ERROR: bogus varno: 3
user_id | count
---------+-------
1 | 24
3 | 23
5 | 7
(3 rows)
-- join between views
-- recent users who has an event in recent events
SELECT ru.user_id FROM recent_users ru JOIN recent_events re USING(user_id) GROUP BY ru.user_id ORDER BY ru.user_id;
@ -533,7 +539,8 @@ SELECT * FROM
ON(ru.user_id = et.event_type)
) s1
ORDER BY 2 DESC, 1;
ERROR: bogus varno: 3
ERROR: cannot pushdown the subquery
DETAIL: Complex subqueries and CTEs cannot be in the outer part of the outer join
-- create a select only view
CREATE VIEW selected_users AS SELECT * FROM users_table WHERE value_1 >= 1 and value_1 <3;
CREATE VIEW recent_selected_users AS SELECT su.* FROM selected_users su JOIN recent_users ru USING(user_id);
@ -865,7 +872,7 @@ EXPLAIN (COSTS FALSE) SELECT et.* FROM recent_10_users JOIN events_table et USIN
-> Sort
Sort Key: remote_scan."time" DESC
-> Custom Scan (Citus Real-Time)
-> Distributed Subplan 95_1
-> Distributed Subplan 96_1
-> Limit
-> Sort
Sort Key: max((max(remote_scan.lastseen))) DESC

View File

@ -19,9 +19,17 @@ setup
SELECT citus.replace_isolation_tester_func();
SELECT citus.refresh_isolation_tester_prepared_statement();
SELECT start_metadata_sync_to_node('localhost', 57637);
SELECT start_metadata_sync_to_node('localhost', 57638);
-- start_metadata_sync_to_node cannot be run inside a transaction block
-- the following is a workaround to overcome that
-- port numbers are hard-coded at the moment
SELECT master_run_on_worker(
ARRAY['localhost']::text[],
ARRAY[57636]::int[],
ARRAY[format('SELECT start_metadata_sync_to_node(''%s'', %s)', nodename, nodeport)]::text[],
false)
FROM pg_dist_node;
SET citus.shard_replication_factor TO 1;
SET citus.replication_model TO streaming;
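As a signature note for the workaround above: master_run_on_worker takes parallel arrays of hostnames, ports, and commands, plus a boolean choosing parallel execution, and runs each command over a separate connection (hence outside the current transaction block). A minimal sketch, reusing the coordinator port assumed by the spec:
-- sketch: run a single command on one node over a fresh connection
SELECT master_run_on_worker(
    ARRAY['localhost']::text[],
    ARRAY[57636]::int[],
    ARRAY['SELECT 1']::text[],
    false);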
@ -112,6 +120,11 @@ step "s1-stop-connection"
SELECT stop_session_level_connection_to_node();
}
step "s1-update-on-the-coordinator"
{
UPDATE tt1 SET value_1 = 4;
}
step "s1-commit"
{
COMMIT;
@ -119,6 +132,11 @@ step "s1-commit"
session "s2"
step "s2-begin"
{
BEGIN;
}
step "s2-start-session-level-connection"
{
SELECT start_session_level_connection_to_node('localhost', 57638);
@ -164,6 +182,11 @@ step "s2-stop-connection"
SELECT stop_session_level_connection_to_node();
}
step "s2-update-on-the-coordinator"
{
UPDATE tt1 SET value_1 = 4;
}
step "s2-commit-worker"
{
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
@ -176,6 +199,33 @@ step "s3-select-distributed-waiting-queries"
SELECT blocked_statement, current_statement_in_blocking_process, waiting_node_name, blocking_node_name, waiting_node_port, blocking_node_port FROM citus_lock_waits;
}
# sessions s1 and s4 execute the commands on the same worker node
session "s4"
step "s4-start-session-level-connection"
{
SELECT start_session_level_connection_to_node('localhost', 57637);
}
step "s4-begin-on-worker"
{
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
}
step "s4-update-dist-table"
{
SELECT run_commands_on_session_level_connection_to_node('UPDATE tt1 SET value_1 = 5');
}
step "s4-stop-connection"
{
SELECT stop_session_level_connection_to_node();
}
step "s4-commit-worker"
{
SELECT run_commands_on_session_level_connection_to_node('COMMIT');
}
permutation "s1-begin" "s1-update-ref-table-from-coordinator" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-update-ref-table" "s3-select-distributed-waiting-queries" "s1-commit" "s2-commit-worker" "s2-stop-connection"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-update-ref-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-update-ref-table" "s3-select-distributed-waiting-queries" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-update-dist-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-update-dist-table" "s3-select-distributed-waiting-queries" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
@ -187,3 +237,9 @@ permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-copy-to
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-copy-to-ref-table" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-copy-to-ref-table" "s3-select-distributed-waiting-queries" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-select-for-update" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-update-ref-table" "s3-select-distributed-waiting-queries" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
permutation "s2-start-session-level-connection" "s2-begin-on-worker" "s2-insert-into-ref-table" "s1-begin" "s1-alter-table" "s3-select-distributed-waiting-queries" "s2-commit-worker" "s1-commit" "s2-stop-connection"
# make sure that multi-shard modification queries
# show up in the waiting processes even if they are
# blocked on the same node
permutation "s1-begin" "s1-update-on-the-coordinator" "s2-update-on-the-coordinator" "s3-select-distributed-waiting-queries" "s1-commit"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-update-dist-table" "s4-start-session-level-connection" "s4-begin-on-worker" "s4-update-dist-table" "s3-select-distributed-waiting-queries" "s1-commit-worker" "s4-commit-worker" "s1-stop-connection" "s4-stop-connection"

View File

@ -20,8 +20,16 @@ setup
SELECT citus.replace_isolation_tester_func();
SELECT citus.refresh_isolation_tester_prepared_statement();
SELECT start_metadata_sync_to_node('localhost', 57637);
SELECT start_metadata_sync_to_node('localhost', 57638);
-- start_metadata_sync_to_node cannot be run inside a transaction block
-- the following is a workaround to overcome that
-- port numbers are hard-coded at the moment
SELECT master_run_on_worker(
ARRAY['localhost']::text[],
ARRAY[57636]::int[],
ARRAY[format('SELECT start_metadata_sync_to_node(''%s'', %s)', nodename, nodeport)]::text[],
false)
FROM pg_dist_node;
SET citus.replication_model TO streaming;
CREATE TABLE ref_table(user_id int, value_1 int);

View File

@ -65,19 +65,23 @@ SELECT * FROM dml_test ORDER BY id ASC;
-- fail at PREPARE TRANSACTION
SELECT citus.mitmproxy('conn.onQuery(query="^PREPARE TRANSACTION").kill()');
-- hide the error message (it has the PID)...
-- this transaction block will be sent to the coordinator as a remote command to hide the
-- error message that is raised during the commit.
-- we'll test for the txn side-effects to ensure it didn't run
SET client_min_messages TO FATAL;
SELECT master_run_on_worker(
ARRAY['localhost']::text[],
ARRAY[:master_port]::int[],
ARRAY['
BEGIN;
DELETE FROM dml_test WHERE id = 1;
DELETE FROM dml_test WHERE id = 2;
INSERT INTO dml_test VALUES (5, 'Epsilon');
UPDATE dml_test SET name = 'alpha' WHERE id = 1;
UPDATE dml_test SET name = 'gamma' WHERE id = 3;
INSERT INTO dml_test VALUES (5, ''Epsilon'');
UPDATE dml_test SET name = ''alpha'' WHERE id = 1;
UPDATE dml_test SET name = ''gamma'' WHERE id = 3;
COMMIT;
SET client_min_messages TO DEFAULT;
'],
false
);
SELECT citus.mitmproxy('conn.allow()');
SELECT shardid FROM pg_dist_shard_placement WHERE shardstate = 3;
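A note on the quoting above: the doubled single quotes (''Epsilon'') are standard SQL escaping for a quote inside a string literal. An equivalent sketch using dollar quoting (the $cmd$ tag is arbitrary) avoids the doubling:
SELECT master_run_on_worker(
    ARRAY['localhost']::text[],
    ARRAY[:master_port]::int[],
    ARRAY[$cmd$ INSERT INTO dml_test VALUES (5, 'Epsilon'); $cmd$]::text[],
    false);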

View File

@ -17,22 +17,11 @@ SELECT create_distributed_table('test_table','id');
-- Populate data to the table
INSERT INTO test_table VALUES(1,1,1),(1,2,2),(2,1,1),(2,2,2),(3,1,1),(3,2,2);
-- Create a function to make sure that failing queries produce the same error output
CREATE FUNCTION raise_failed_execution(query text) RETURNS void AS $$
BEGIN
EXECUTE query;
EXCEPTION WHEN OTHERS THEN
IF SQLERRM LIKE 'failed to execute task%' THEN
RAISE 'Task failed to execute';
END IF;
END;
$$ LANGUAGE plpgsql;
-- Kill when the first COPY command arrives; since we have a single placement
-- it is expected to error out.
SET client_min_messages TO ERROR;
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").kill()');
SELECT raise_failed_execution('SELECT count(*) FROM test_table');
SELECT public.raise_failed_execution('SELECT count(*) FROM test_table');
SET client_min_messages TO DEFAULT;
-- Kill the connection with a CTE
@ -46,7 +35,7 @@ WHERE test_table.id = results.id;
-- killing the connection after the first successful query should break the query.
SET client_min_messages TO ERROR;
SELECT citus.mitmproxy('conn.onQuery(query="^COPY").after(1).kill()');
SELECT raise_failed_execution('WITH
SELECT public.raise_failed_execution('WITH
results AS (SELECT * FROM test_table)
SELECT * FROM test_table, results
WHERE test_table.id = results.id');

View File

@ -70,7 +70,7 @@ SELECT count(*) FROM test;
SELECT count(*) FROM test WHERE id = 1;
SET citus.task_executor_type TO 'task-tracker';
SELECT count(*) FROM test;
SELECT count(*), min(current_user) FROM test;
-- test re-partition query (needs to transmit intermediate results)
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
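For comparison, re-partition joins can also be enabled without switching executors; a sketch assuming the citus.enable_repartition_joins GUC (available since Citus 7.2):
-- sketch: let the default executor fall back to repartitioning
-- for non-colocated joins instead of erroring out
SET citus.enable_repartition_joins TO on;
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val);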
@ -94,7 +94,7 @@ SELECT count(*) FROM test;
SELECT count(*) FROM test WHERE id = 1;
SET citus.task_executor_type TO 'task-tracker';
SELECT count(*) FROM test;
SELECT count(*), min(current_user) FROM test;
-- test re-partition query (needs to transmit intermediate results)
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
@ -115,7 +115,7 @@ SELECT count(*) FROM test;
SELECT count(*) FROM test WHERE id = 1;
SET citus.task_executor_type TO 'task-tracker';
SELECT count(*) FROM test;
SELECT count(*), min(current_user) FROM test;
-- test re-partition query
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;

View File

@ -220,9 +220,11 @@ SET citus.max_task_string_size TO 20000;
-- the error message may vary between executions
-- hiding warning and error messages
-- no output means the query has failed
SET client_min_messages to FATAL;
SET client_min_messages to ERROR;
SELECT raise_failed_execution('
SELECT u.* FROM wide_table u JOIN wide_table v ON (u.long_column_002 = v.long_column_003);
');
-- the following will succeed since it fetches only a few columns
SELECT u.long_column_001, u.long_column_002, u.long_column_003 FROM wide_table u JOIN wide_table v ON (u.long_column_002 = v.long_column_003);
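The failure above is governed by citus.max_task_string_size, which caps the size of the query string shipped to workers; selecting every column of wide_table exceeds the limit, while the three-column projection stays under it. A minimal inspection sketch:
-- sketch: check the current limit; depending on the Citus version this
-- GUC may only be changeable at server start
SHOW citus.max_task_string_size;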

View File

@ -106,3 +106,14 @@ ORDER BY a.attrelid, a.attnum;
$desc_views$
);
-- Create a function to make sure that failing queries produce the same error output
CREATE FUNCTION raise_failed_execution(query text) RETURNS void AS $$
BEGIN
EXECUTE query;
EXCEPTION WHEN OTHERS THEN
IF SQLERRM LIKE 'failed to execute task%' THEN
RAISE 'Task failed to execute';
END IF;
END;
$$ LANGUAGE plpgsql;

View File

@ -146,3 +146,7 @@ FETCH ABSOLUTE 5 FROM noHoldCursor;
FETCH BACKWARD noHoldCursor;
COMMIT;
FETCH ABSOLUTE 5 FROM noHoldCursor;
-- Test we don't throw an error for DROP IF EXISTS
DROP DATABASE IF EXISTS not_existing_database;
DROP TABLE IF EXISTS not_existing_table;

View File

@ -220,8 +220,8 @@ SELECT * FROM
ORDER BY 2 DESC, 1) s1
ORDER BY 2 DESC, 1;
-- non-partition key joins are not supported inside subquery
-- since the join with a table
-- non-partition key joins are supported inside subquery
-- via pull-push execution
SELECT * FROM
(SELECT ru.user_id, count(*)
FROM recent_users ru

View File

@ -11,13 +11,13 @@
#define CITUS_MAJORVERSION "8.0"
/* Citus version as a string */
#define CITUS_VERSION "8.0devel"
#define CITUS_VERSION "8.0.3"
/* Citus version as a number */
#define CITUS_VERSION_NUM 80000
/* A string containing the version number, platform, and C compiler */
#define CITUS_VERSION_STR "Citus 8.0devel on x86_64-windows, compiled by Visual Studio"
#define CITUS_VERSION_STR "Citus 8.0.3 on x86_64-windows, compiled by Visual Studio"
/* Define to 1 if you have the `curl' library (-lcurl). */
#define HAVE_LIBCURL 0
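For completeness, the version baked into this header can be cross-checked at runtime; citus_version() is a stock Citus UDF:
-- sketch: returns a human-readable string similar to CITUS_VERSION_STR
SELECT citus_version();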