Compare commits

...

17 Commits
main ... v7.0.2

Author SHA1 Message Date
Burak Yucesoy 1f83929e80 Bump Citus version to 7.0.2 2017-09-28 10:39:25 -07:00
Burak Yucesoy abf9bb6b86 Add CHANGELOG entry for 7.0.2 2017-09-28 10:38:50 -07:00
Marco Slot cdea47e5c3 Add multi-user re-partitioning regression tests 2017-09-28 15:29:17 +02:00
Marco Slot c565ab55aa Execute transmit commands as superuser during task-tracker queries 2017-09-28 15:29:17 +02:00
Marco Slot d072e06746 Check for absolute paths in COPY with format transmit 2017-09-28 15:29:17 +02:00
Marco Slot ea52bee891 Allow read-only users to run task-tracker queries 2017-09-28 15:29:17 +02:00
Burak Yucesoy 4873d771e8 Bump Citus version to 7.0.1 2017-09-12 17:25:38 -07:00
Burak Yucesoy e1a641fe9b Add CHANGELOG entry for 7.0.1 release 2017-09-12 17:13:52 -07:00
Marco Slot 6b5baf21fb Wait for I/O to finish after PQputCopyData 2017-09-12 17:13:52 -07:00
Marco Slot 8c0274cba6 Free per-tuple COPY memory in INSERT...SELECT 2017-09-12 17:13:52 -07:00
Marco Slot 907048ace8 Add volatile function in prepared statement regression test 2017-09-12 17:13:52 -07:00
Marco Slot d8bb32bd5a Always copy MultiPlan in GetMultiPlan 2017-09-12 17:13:52 -07:00
Jason Petersen 0012d70b1b Add clarifying comment to RngVarCallbackForDropIdx
We don't need the PARTITION-related logic recently added in PostgreSQL.
2017-09-12 17:13:52 -07:00
Jason Petersen 44ddef6fe8 Update ruleutils_10 with latest PostgreSQL changes
See:
	postgres/postgres@21d304dfed
	postgres/postgres@bb5d6e80b1
	postgres/postgres@d363d42bb9
	postgres/postgres@eb145fdfea
	postgres/postgres@decb08ebdf
	postgres/postgres@a3ca72ae9a
	postgres/postgres@bc2d716ad0
	postgres/postgres@382ceffdf7
	postgres/postgres@c7b8998ebb
	postgres/postgres@e3860ffa4d
	postgres/postgres@76a3df6e5e
2017-09-12 17:13:52 -07:00
Jason Petersen 0ec41de26c Update ruleutils_96 with latest PostgreSQL changes
See:
	postgres/postgres@41ada83774
	postgres/postgres@3b0c2dbed0
	postgres/postgres@ff2d537223
2017-09-12 17:13:52 -07:00
Burak Yucesoy 34c6bd4b44 Bump configure PACKAGE_VERSION 2017-08-28 16:45:20 +03:00
Burak Yucesoy d607368a9e Add CHANGELOG entry for 7.0 release 2017-08-28 16:41:14 +03:00
20 changed files with 672 additions and 176 deletions

View File

@ -1,6 +1,78 @@
### citus v7.0.0 (unreleased) ###
### citus v7.0.2 (September 28, 2017) ###
* Replaces pg_dist_shard_placement metadata table with pg_dist_placement
* Updates task-tracker to limit file access
### citus v7.0.1 (September 12, 2017) ###
* Fixes a bug that could cause memory leaks in `INSERT ... SELECT` queries
* Fixes a bug that could cause incorrect execution of prepared statements
* Fixes a bug that could cause excessive memory usage during COPY
* Incorporates latest changes from core PostgreSQL code
### citus v7.0.0 (August 28, 2017) ###
* Adds support for PostgreSQL 10
* Drops support for PostgreSQL 9.5
* Adds support for multi-row `INSERT`
* Adds support for router `UPDATE` and `DELETE` queries with subqueries
* Adds infrastructure for distributed deadlock detection
* Deprecates `enable_deadlock_prevention` flag
* Adds support for partitioned tables
* Adds support for creating `UNLOGGED` tables
* Adds support for `SAVEPOINT`
* Adds UDF `citus_create_restore_point` for taking distributed snapshots
* Adds support for evaluating non-pushable `INSERT ... SELECT` queries
* Adds support for subquery pushdown on reference tables
* Adds shard pruning support for `IN` and `ANY`
* Adds support for `UPDATE` and `DELETE` commands that prune down to 0 shard
* Enhances transaction support by relaxing some transaction restrictions
* Fixes a bug causing crash if distributed table has no shards
* Fixes a bug causing crash when removing inactive node
* Fixes a bug causing failure during `COPY` on tables with dropped columns
* Fixes a bug causing failure during `DROP EXTENSION`
* Fixes a bug preventing executing `VACUUM` and `INSERT` concurrently
* Fixes a bug in prepared `INSERT` statements containing an implicit cast
* Fixes several issues related to statement cancellations and connections
* Fixes several 2PC related issues
* Removes an unnecessary dependency causing warning messages in pg_dump
* Adds internal infrastructure for follower clusters
* Adds internal infrastructure for progress tracking
* Implements various performance improvements
* Adds internal infrastructures and tests to improve development process
* Addresses various race conditions and deadlocks
* Improves and standardizes error messages
### citus v6.2.3 (July 13, 2017) ###

32
configure vendored
View File

@ -1,6 +1,6 @@
#! /bin/sh
# Guess values for system-dependent variables and create Makefiles.
# Generated by GNU Autoconf 2.69 for Citus 7.0devel.
# Generated by GNU Autoconf 2.69 for Citus 7.0.2.
#
#
# Copyright (C) 1992-1996, 1998-2012 Free Software Foundation, Inc.
@ -579,8 +579,8 @@ MAKEFLAGS=
# Identity of this package.
PACKAGE_NAME='Citus'
PACKAGE_TARNAME='citus'
PACKAGE_VERSION='7.0devel'
PACKAGE_STRING='Citus 7.0devel'
PACKAGE_VERSION='7.0.2'
PACKAGE_STRING='Citus 7.0.2'
PACKAGE_BUGREPORT=''
PACKAGE_URL=''
@ -621,6 +621,7 @@ infodir
docdir
oldincludedir
includedir
runstatedir
localstatedir
sharedstatedir
sysconfdir
@ -693,6 +694,7 @@ datadir='${datarootdir}'
sysconfdir='${prefix}/etc'
sharedstatedir='${prefix}/com'
localstatedir='${prefix}/var'
runstatedir='${localstatedir}/run'
includedir='${prefix}/include'
oldincludedir='/usr/include'
docdir='${datarootdir}/doc/${PACKAGE_TARNAME}'
@ -945,6 +947,15 @@ do
| -silent | --silent | --silen | --sile | --sil)
silent=yes ;;
-runstatedir | --runstatedir | --runstatedi | --runstated \
| --runstate | --runstat | --runsta | --runst | --runs \
| --run | --ru | --r)
ac_prev=runstatedir ;;
-runstatedir=* | --runstatedir=* | --runstatedi=* | --runstated=* \
| --runstate=* | --runstat=* | --runsta=* | --runst=* | --runs=* \
| --run=* | --ru=* | --r=*)
runstatedir=$ac_optarg ;;
-sbindir | --sbindir | --sbindi | --sbind | --sbin | --sbi | --sb)
ac_prev=sbindir ;;
-sbindir=* | --sbindir=* | --sbindi=* | --sbind=* | --sbin=* \
@ -1082,7 +1093,7 @@ fi
for ac_var in exec_prefix prefix bindir sbindir libexecdir datarootdir \
datadir sysconfdir sharedstatedir localstatedir includedir \
oldincludedir docdir infodir htmldir dvidir pdfdir psdir \
libdir localedir mandir
libdir localedir mandir runstatedir
do
eval ac_val=\$$ac_var
# Remove trailing slashes.
@ -1195,7 +1206,7 @@ if test "$ac_init_help" = "long"; then
# Omit some internal or obsolete options to make the list less imposing.
# This message is too long to be a string in the A/UX 3.1 sh.
cat <<_ACEOF
\`configure' configures Citus 7.0devel to adapt to many kinds of systems.
\`configure' configures Citus 7.0.2 to adapt to many kinds of systems.
Usage: $0 [OPTION]... [VAR=VALUE]...
@ -1235,6 +1246,7 @@ Fine tuning of the installation directories:
--sysconfdir=DIR read-only single-machine data [PREFIX/etc]
--sharedstatedir=DIR modifiable architecture-independent data [PREFIX/com]
--localstatedir=DIR modifiable single-machine data [PREFIX/var]
--runstatedir=DIR modifiable per-process data [LOCALSTATEDIR/run]
--libdir=DIR object code libraries [EPREFIX/lib]
--includedir=DIR C header files [PREFIX/include]
--oldincludedir=DIR C header files for non-gcc [/usr/include]
@ -1256,7 +1268,7 @@ fi
if test -n "$ac_init_help"; then
case $ac_init_help in
short | recursive ) echo "Configuration of Citus 7.0devel:";;
short | recursive ) echo "Configuration of Citus 7.0.2:";;
esac
cat <<\_ACEOF
@ -1344,7 +1356,7 @@ fi
test -n "$ac_init_help" && exit $ac_status
if $ac_init_version; then
cat <<\_ACEOF
Citus configure 7.0devel
Citus configure 7.0.2
generated by GNU Autoconf 2.69
Copyright (C) 2012 Free Software Foundation, Inc.
@ -1401,7 +1413,7 @@ cat >config.log <<_ACEOF
This file contains any messages produced by compilers while
running configure, to aid debugging if configure makes a mistake.
It was created by Citus $as_me 7.0devel, which was
It was created by Citus $as_me 7.0.2, which was
generated by GNU Autoconf 2.69. Invocation command line was
$ $0 $@
@ -3564,7 +3576,7 @@ cat >>$CONFIG_STATUS <<\_ACEOF || ac_write_fail=1
# report actual input values of CONFIG_FILES etc. instead of their
# values after options handling.
ac_log="
This file was extended by Citus $as_me 7.0devel, which was
This file was extended by Citus $as_me 7.0.2, which was
generated by GNU Autoconf 2.69. Invocation command line was
CONFIG_FILES = $CONFIG_FILES
@ -3626,7 +3638,7 @@ _ACEOF
cat >>$CONFIG_STATUS <<_ACEOF || ac_write_fail=1
ac_cs_config="`$as_echo "$ac_configure_args" | sed 's/^ //; s/[\\""\`\$]/\\\\&/g'`"
ac_cs_version="\\
Citus config.status 7.0devel
Citus config.status 7.0.2
configured by $0, generated by GNU Autoconf 2.69,
with options \\"\$ac_cs_config\\"

View File

@ -5,7 +5,7 @@
# everyone needing autoconf installed, the resulting files are checked
# into the SCM.
AC_INIT([Citus], [7.0devel])
AC_INIT([Citus], [7.0.2])
AC_COPYRIGHT([Copyright (c) 2012-2017, Citus Data, Inc.])
# we'll need sed and awk for some of the version commands

View File

@ -2010,6 +2010,14 @@ CitusCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
copyDest->tuplesSent++;
/*
* Release per tuple memory allocated in this function. If we're writing
* the results of an INSERT ... SELECT then the SELECT execution will use
* its own executor state and reset the per tuple expression context
* separately.
*/
ResetPerTupleExprContext(executorState);
return true;
}

View File

@ -497,6 +497,7 @@ PutRemoteCopyData(MultiConnection *connection, const char *buffer, int nbytes)
{
PGconn *pgConn = connection->pgConn;
int copyState = 0;
bool allowInterrupts = true;
if (PQstatus(pgConn) != CONNECTION_OK)
{
@ -506,21 +507,22 @@ PutRemoteCopyData(MultiConnection *connection, const char *buffer, int nbytes)
Assert(PQisnonblocking(pgConn));
copyState = PQputCopyData(pgConn, buffer, nbytes);
if (copyState == 1)
{
/* successful */
return true;
}
else if (copyState == -1)
if (copyState == -1)
{
return false;
}
else
{
bool allowInterrupts = true;
return FinishConnectionIO(connection, allowInterrupts);
}
/*
* PQputCopyData may have queued up part of the data even if it managed
* to send some of it succesfully. We provide back pressure by waiting
* until the socket is writable to prevent the internal libpq buffers
* from growing excessively.
*
* In the future, we could reduce the frequency of these pushbacks to
* achieve higher throughput.
*/
return FinishConnectionIO(connection, allowInterrupts);
}
@ -535,6 +537,7 @@ PutRemoteCopyEnd(MultiConnection *connection, const char *errormsg)
{
PGconn *pgConn = connection->pgConn;
int copyState = 0;
bool allowInterrupts = true;
if (PQstatus(pgConn) != CONNECTION_OK)
{
@ -544,21 +547,14 @@ PutRemoteCopyEnd(MultiConnection *connection, const char *errormsg)
Assert(PQisnonblocking(pgConn));
copyState = PQputCopyEnd(pgConn, errormsg);
if (copyState == 1)
{
/* successful */
return true;
}
else if (copyState == -1)
if (copyState == -1)
{
return false;
}
else
{
bool allowInterrupts = true;
return FinishConnectionIO(connection, allowInterrupts);
}
/* see PutRemoteCopyData() */
return FinishConnectionIO(connection, allowInterrupts);
}

View File

@ -127,7 +127,8 @@ MultiClientConnect(const char *nodeName, uint32 nodePort, const char *nodeDataba
* error and returns INVALID_CONNECTION_ID.
*/
int32
MultiClientConnectStart(const char *nodeName, uint32 nodePort, const char *nodeDatabase)
MultiClientConnectStart(const char *nodeName, uint32 nodePort, const char *nodeDatabase,
const char *userName)
{
MultiConnection *connection = NULL;
ConnStatusType connStatusType = CONNECTION_OK;
@ -148,7 +149,8 @@ MultiClientConnectStart(const char *nodeName, uint32 nodePort, const char *nodeD
}
/* prepare asynchronous request for worker node connection */
connection = StartNodeConnection(connectionFlags, nodeName, nodePort);
connection = StartNodeUserDatabaseConnection(connectionFlags, nodeName, nodePort,
userName, nodeDatabase);
connStatusType = PQstatus(connection->pgConn);
/*

View File

@ -272,7 +272,8 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution,
/* we use the same database name on the master and worker nodes */
nodeDatabase = get_database_name(MyDatabaseId);
connectionId = MultiClientConnectStart(nodeName, nodePort, nodeDatabase);
connectionId = MultiClientConnectStart(nodeName, nodePort, nodeDatabase,
NULL);
connectionIdArray[currentIndex] = connectionId;
/* if valid, poll the connection until the connection is initiated */

View File

@ -71,7 +71,8 @@ static Task * TaskHashLookup(HTAB *trackerHash, TaskType taskType, uint64 jobId,
uint32 taskId);
static bool TopLevelTask(Task *task);
static bool TransmitExecutionCompleted(TaskExecution *taskExecution);
static HTAB * TrackerHash(const char *taskTrackerHashName, List *workerNodeList);
static HTAB * TrackerHash(const char *taskTrackerHashName, List *workerNodeList,
char *userName);
static HTAB * TrackerHashCreate(const char *taskTrackerHashName,
uint32 taskTrackerHashSize);
static TaskTracker * TrackerHashEnter(HTAB *taskTrackerHash, char *nodeName,
@ -160,6 +161,7 @@ MultiTaskTrackerExecute(Job *job)
List *workerNodeList = NIL;
HTAB *taskTrackerHash = NULL;
HTAB *transmitTrackerHash = NULL;
char *extensionOwner = CitusExtensionOwnerName();
const char *taskTrackerHashName = "Task Tracker Hash";
const char *transmitTrackerHashName = "Transmit Tracker Hash";
List *jobIdList = NIL;
@ -201,8 +203,12 @@ MultiTaskTrackerExecute(Job *job)
workerNodeList = ActivePrimaryNodeList();
taskTrackerCount = (uint32) list_length(workerNodeList);
taskTrackerHash = TrackerHash(taskTrackerHashName, workerNodeList);
transmitTrackerHash = TrackerHash(transmitTrackerHashName, workerNodeList);
/* connect as the current user for running queries */
taskTrackerHash = TrackerHash(taskTrackerHashName, workerNodeList, NULL);
/* connect as the superuser for fetching result files */
transmitTrackerHash = TrackerHash(transmitTrackerHashName, workerNodeList,
extensionOwner);
TrackerHashConnect(taskTrackerHash);
TrackerHashConnect(transmitTrackerHash);
@ -666,10 +672,11 @@ TransmitExecutionCompleted(TaskExecution *taskExecution)
/*
* TrackerHash creates a task tracker hash with the given name. The function
* then inserts one task tracker entry for each node in the given worker node
* list, and initializes state for each task tracker.
* list, and initializes state for each task tracker. The userName argument
* indicates which user to connect as.
*/
static HTAB *
TrackerHash(const char *taskTrackerHashName, List *workerNodeList)
TrackerHash(const char *taskTrackerHashName, List *workerNodeList, char *userName)
{
/* create task tracker hash */
uint32 taskTrackerHashSize = list_length(workerNodeList);
@ -711,6 +718,7 @@ TrackerHash(const char *taskTrackerHashName, List *workerNodeList)
}
taskTracker->taskStateHash = taskStateHash;
taskTracker->userName = userName;
}
return taskTrackerHash;
@ -844,9 +852,10 @@ TrackerConnectPoll(TaskTracker *taskTracker)
char *nodeName = taskTracker->workerName;
uint32 nodePort = taskTracker->workerPort;
char *nodeDatabase = get_database_name(MyDatabaseId);
char *nodeUser = taskTracker->userName;
int32 connectionId = MultiClientConnectStart(nodeName, nodePort,
nodeDatabase);
nodeDatabase, nodeUser);
if (connectionId != INVALID_CONNECTION_ID)
{
taskTracker->connectionId = connectionId;

View File

@ -634,6 +634,10 @@ IsTransmitStmt(Node *parsetree)
static void
VerifyTransmitStmt(CopyStmt *copyStatement)
{
char *fileName = NULL;
EnsureSuperUser();
/* do some minimal option verification */
if (copyStatement->relation == NULL ||
copyStatement->relation->relname == NULL)
@ -642,6 +646,20 @@ VerifyTransmitStmt(CopyStmt *copyStatement)
errmsg("FORMAT 'transmit' requires a target file")));
}
fileName = copyStatement->relation->relname;
if (is_absolute_path(fileName))
{
ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
(errmsg("absolute path not allowed"))));
}
else if (!path_is_relative_and_below_cwd(fileName))
{
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
(errmsg("path must be in or below the current directory"))));
}
if (copyStatement->filename != NULL)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
@ -3111,7 +3129,9 @@ InterShardDDLTaskList(Oid leftRelationId, Oid rightRelationId,
*
* This code is heavily borrowed from RangeVarCallbackForDropRelation() in
* commands/tablecmds.c in Postgres source. We need this to ensure the right
* order of locking while dealing with DROP INDEX statments.
* order of locking while dealing with DROP INDEX statments. Because we are
* exclusively using this callback for INDEX processing, the PARTITION-related
* logic from PostgreSQL's similar callback has been omitted as unneeded.
*/
static void
RangeVarCallbackForDropIndex(const RangeVar *rel, Oid relOid, Oid oldRelOid, void *arg)

View File

@ -467,7 +467,14 @@ GetMultiPlan(CustomScan *customScan)
node = CheckNodeCopyAndSerialization(node);
multiPlan = (MultiPlan *) node;
/*
* When using prepared statements the same plan gets reused across
* multiple statements and transactions. We make several modifications
* to the MultiPlan during execution such as assigning task placements
* and evaluating functions and parameters. These changes should not
* persist, so we always work on a copy.
*/
multiPlan = (MultiPlan *) copyObject(node);
return multiPlan;
}

View File

@ -93,7 +93,7 @@
#define PRETTYINDENT_JOIN 4
#define PRETTYINDENT_VAR 4
#define PRETTYINDENT_LIMIT 40 /* wrap limit */
#define PRETTYINDENT_LIMIT 40 /* wrap limit */
/* Pretty flags */
#define PRETTYFLAG_PAREN 1
@ -125,8 +125,8 @@ typedef struct
bool varprefix; /* TRUE to print prefixes on Vars */
Oid distrelid; /* the distributed table being modified, if valid */
int64 shardid; /* a distributed table's shardid, if positive */
ParseExprKind special_exprkind; /* set only for exprkinds needing
* special handling */
ParseExprKind special_exprkind; /* set only for exprkinds needing special
* handling */
} deparse_context;
/*
@ -292,7 +292,7 @@ typedef struct
*/
typedef struct
{
char name[NAMEDATALEN]; /* Hash key --- must be first */
char name[NAMEDATALEN]; /* Hash key --- must be first */
int counter; /* Largest addition used so far for name */
} NameHashEntry;
@ -390,6 +390,9 @@ static void get_rule_expr(Node *node, deparse_context *context,
bool showimplicit);
static void get_rule_expr_toplevel(Node *node, deparse_context *context,
bool showimplicit);
static void get_rule_expr_funccall(Node *node, deparse_context *context,
bool showimplicit);
static bool looks_like_function(Node *node);
static void get_oper_expr(OpExpr *expr, deparse_context *context);
static void get_func_expr(FuncExpr *expr, deparse_context *context,
bool showimplicit);
@ -2148,19 +2151,19 @@ get_select_query_def(Query *query, deparse_context *context,
break;
case LCS_FORKEYSHARE:
appendContextKeyword(context, " FOR KEY SHARE",
-PRETTYINDENT_STD, PRETTYINDENT_STD, 0);
-PRETTYINDENT_STD, PRETTYINDENT_STD, 0);
break;
case LCS_FORSHARE:
appendContextKeyword(context, " FOR SHARE",
-PRETTYINDENT_STD, PRETTYINDENT_STD, 0);
-PRETTYINDENT_STD, PRETTYINDENT_STD, 0);
break;
case LCS_FORNOKEYUPDATE:
appendContextKeyword(context, " FOR NO KEY UPDATE",
-PRETTYINDENT_STD, PRETTYINDENT_STD, 0);
-PRETTYINDENT_STD, PRETTYINDENT_STD, 0);
break;
case LCS_FORUPDATE:
appendContextKeyword(context, " FOR UPDATE",
-PRETTYINDENT_STD, PRETTYINDENT_STD, 0);
-PRETTYINDENT_STD, PRETTYINDENT_STD, 0);
break;
}
@ -3018,8 +3021,8 @@ get_insert_query_def(Query *query, deparse_context *context)
* tle->resname, since resname will fail to track RENAME.
*/
appendStringInfoString(buf,
quote_identifier(get_relid_attribute_name(rte->relid,
tle->resno)));
quote_identifier(get_relid_attribute_name(rte->relid,
tle->resno)));
/*
* Print any indirection needed (subfields or subscripts), and strip
@ -3299,8 +3302,11 @@ get_update_query_targetlist_def(Query *query, List *targetList,
/*
* We must dig down into the expr to see if it's a PARAM_MULTIEXPR
* Param. That could be buried under FieldStores and ArrayRefs
* (cf processIndirection()), and underneath those there could be
* an implicit type coercion.
* and CoerceToDomains (cf processIndirection()), and underneath
* those there could be an implicit type coercion. Because we
* would ignore implicit type coercions anyway, we don't need to
* be as careful as processIndirection() is about descending past
* implicit CoerceToDomains.
*/
expr = (Node *) tle->expr;
while (expr)
@ -3319,6 +3325,14 @@ get_update_query_targetlist_def(Query *query, List *targetList,
break;
expr = (Node *) aref->refassgnexpr;
}
else if (IsA(expr, CoerceToDomain))
{
CoerceToDomain *cdomain = (CoerceToDomain *) expr;
if (cdomain->coercionformat != COERCE_IMPLICIT_CAST)
break;
expr = (Node *) cdomain->arg;
}
else
break;
}
@ -3330,7 +3344,7 @@ get_update_query_targetlist_def(Query *query, List *targetList,
cur_ma_sublink = (SubLink *) lfirst(next_ma_cell);
next_ma_cell = lnext(next_ma_cell);
remaining_ma_columns = count_nonjunk_tlist_entries(
((Query *) cur_ma_sublink->subselect)->targetList);
((Query *) cur_ma_sublink->subselect)->targetList);
Assert(((Param *) expr)->paramid ==
((cur_ma_sublink->subLinkId << 16) | 1));
appendStringInfoChar(buf, '(');
@ -3342,8 +3356,8 @@ get_update_query_targetlist_def(Query *query, List *targetList,
* tle->resname, since resname will fail to track RENAME.
*/
appendStringInfoString(buf,
quote_identifier(get_relid_attribute_name(rte->relid,
tle->resno)));
quote_identifier(get_relid_attribute_name(rte->relid,
tle->resno)));
/*
* Print any indirection needed (subfields or subscripts), and strip
@ -4423,6 +4437,7 @@ isSimpleNode(Node *node, Node *parentNode, int prettyFlags)
case T_MinMaxExpr:
case T_SQLValueFunction:
case T_XmlExpr:
case T_NextValueExpr:
case T_NullIfExpr:
case T_Aggref:
case T_WindowFunc:
@ -4532,17 +4547,17 @@ isSimpleNode(Node *node, Node *parentNode, int prettyFlags)
return false;
return true; /* own parentheses */
}
case T_BoolExpr: /* lower precedence */
case T_ArrayRef: /* other separators */
case T_ArrayExpr: /* other separators */
case T_RowExpr: /* other separators */
case T_BoolExpr: /* lower precedence */
case T_ArrayRef: /* other separators */
case T_ArrayExpr: /* other separators */
case T_RowExpr: /* other separators */
case T_CoalesceExpr: /* own parentheses */
case T_MinMaxExpr: /* own parentheses */
case T_XmlExpr: /* own parentheses */
case T_NullIfExpr: /* other separators */
case T_MinMaxExpr: /* own parentheses */
case T_XmlExpr: /* own parentheses */
case T_NullIfExpr: /* other separators */
case T_Aggref: /* own parentheses */
case T_WindowFunc: /* own parentheses */
case T_CaseExpr: /* other separators */
case T_WindowFunc: /* own parentheses */
case T_CaseExpr: /* other separators */
return true;
default:
return false;
@ -4583,16 +4598,16 @@ isSimpleNode(Node *node, Node *parentNode, int prettyFlags)
return false;
return true; /* own parentheses */
}
case T_ArrayRef: /* other separators */
case T_ArrayExpr: /* other separators */
case T_RowExpr: /* other separators */
case T_ArrayRef: /* other separators */
case T_ArrayExpr: /* other separators */
case T_RowExpr: /* other separators */
case T_CoalesceExpr: /* own parentheses */
case T_MinMaxExpr: /* own parentheses */
case T_XmlExpr: /* own parentheses */
case T_NullIfExpr: /* other separators */
case T_MinMaxExpr: /* own parentheses */
case T_XmlExpr: /* own parentheses */
case T_NullIfExpr: /* other separators */
case T_Aggref: /* own parentheses */
case T_WindowFunc: /* own parentheses */
case T_CaseExpr: /* other separators */
case T_WindowFunc: /* own parentheses */
case T_CaseExpr: /* other separators */
return true;
default:
return false;
@ -4891,7 +4906,7 @@ get_rule_expr(Node *node, deparse_context *context,
appendStringInfo(buf, " %s %s (",
generate_operator_name(expr->opno,
exprType(arg1),
get_base_element_type(exprType(arg2))),
get_base_element_type(exprType(arg2))),
expr->useOr ? "ANY" : "ALL");
get_rule_expr_paren(arg2, context, true, node);
@ -4911,7 +4926,7 @@ get_rule_expr(Node *node, deparse_context *context,
((SubLink *) arg2)->subLinkType == EXPR_SUBLINK)
appendStringInfo(buf, "::%s",
format_type_with_typemod(exprType(arg2),
exprTypmod(arg2)));
exprTypmod(arg2)));
appendStringInfoChar(buf, ')');
if (!PRETTY_PAREN(context))
appendStringInfoChar(buf, ')');
@ -5269,7 +5284,7 @@ get_rule_expr(Node *node, deparse_context *context,
*/
if (arrayexpr->elements == NIL)
appendStringInfo(buf, "::%s",
format_type_with_typemod(arrayexpr->array_typeid, -1));
format_type_with_typemod(arrayexpr->array_typeid, -1));
}
break;
@ -5331,7 +5346,7 @@ get_rule_expr(Node *node, deparse_context *context,
appendStringInfoChar(buf, ')');
if (rowexpr->row_format == COERCE_EXPLICIT_CAST)
appendStringInfo(buf, "::%s",
format_type_with_typemod(rowexpr->row_typeid, -1));
format_type_with_typemod(rowexpr->row_typeid, -1));
}
break;
@ -5364,9 +5379,9 @@ get_rule_expr(Node *node, deparse_context *context,
* be perfect.
*/
appendStringInfo(buf, ") %s ROW(",
generate_operator_name(linitial_oid(rcexpr->opnos),
exprType(linitial(rcexpr->largs)),
exprType(linitial(rcexpr->rargs))));
generate_operator_name(linitial_oid(rcexpr->opnos),
exprType(linitial(rcexpr->largs)),
exprType(linitial(rcexpr->rargs))));
sep = "";
foreach(arg, rcexpr->rargs)
{
@ -5564,7 +5579,7 @@ get_rule_expr(Node *node, deparse_context *context,
Assert(!con->constisnull);
if (DatumGetBool(con->constvalue))
appendStringInfoString(buf,
" PRESERVE WHITESPACE");
" PRESERVE WHITESPACE");
else
appendStringInfoString(buf,
" STRIP WHITESPACE");
@ -5592,15 +5607,15 @@ get_rule_expr(Node *node, deparse_context *context,
{
case XML_STANDALONE_YES:
appendStringInfoString(buf,
", STANDALONE YES");
", STANDALONE YES");
break;
case XML_STANDALONE_NO:
appendStringInfoString(buf,
", STANDALONE NO");
", STANDALONE NO");
break;
case XML_STANDALONE_NO_VALUE:
appendStringInfoString(buf,
", STANDALONE NO VALUE");
", STANDALONE NO VALUE");
break;
default:
break;
@ -5752,6 +5767,22 @@ get_rule_expr(Node *node, deparse_context *context,
}
break;
case T_NextValueExpr:
{
NextValueExpr *nvexpr = (NextValueExpr *) node;
/*
* This isn't exactly nextval(), but that seems close enough
* for EXPLAIN's purposes.
*/
appendStringInfoString(buf, "nextval(");
simple_quote_literal(buf,
generate_relation_name(nvexpr->seqid,
NIL));
appendStringInfoChar(buf, ')');
}
break;
case T_InferenceElem:
{
InferenceElem *iexpr = (InferenceElem *) node;
@ -5786,7 +5817,7 @@ get_rule_expr(Node *node, deparse_context *context,
if (iexpr->infercollid)
appendStringInfo(buf, " COLLATE %s",
generate_collation_name(iexpr->infercollid));
generate_collation_name(iexpr->infercollid));
/* Add the operator class name, if not default */
if (iexpr->inferopclass)
@ -5810,12 +5841,11 @@ get_rule_expr(Node *node, deparse_context *context,
case PARTITION_STRATEGY_LIST:
Assert(spec->listdatums != NIL);
appendStringInfoString(buf, "FOR VALUES");
appendStringInfoString(buf, " IN (");
appendStringInfoString(buf, "FOR VALUES IN (");
sep = "";
foreach(cell, spec->listdatums)
{
Const *val = lfirst(cell);
Const *val = castNode(Const, lfirst(cell));
appendStringInfoString(buf, sep);
get_const_expr(val, context, -1);
@ -5831,50 +5861,9 @@ get_rule_expr(Node *node, deparse_context *context,
list_length(spec->lowerdatums) ==
list_length(spec->upperdatums));
appendStringInfoString(buf, "FOR VALUES");
appendStringInfoString(buf, " FROM");
appendStringInfoString(buf, " (");
sep = "";
foreach(cell, spec->lowerdatums)
{
PartitionRangeDatum *datum = lfirst(cell);
Const *val;
appendStringInfoString(buf, sep);
if (datum->kind == PARTITION_RANGE_DATUM_MINVALUE)
appendStringInfoString(buf, "MINVALUE");
else if (datum->kind == PARTITION_RANGE_DATUM_MAXVALUE)
appendStringInfoString(buf, "MAXVALUE");
else
{
val = (Const *) datum->value;
get_const_expr(val, context, -1);
}
sep = ", ";
}
appendStringInfoString(buf, ")");
appendStringInfoString(buf, " TO");
appendStringInfoString(buf, " (");
sep = "";
foreach(cell, spec->upperdatums)
{
PartitionRangeDatum *datum = lfirst(cell);
Const *val;
appendStringInfoString(buf, sep);
if (datum->kind == PARTITION_RANGE_DATUM_MINVALUE)
appendStringInfoString(buf, "MINVALUE");
else if (datum->kind == PARTITION_RANGE_DATUM_MAXVALUE)
appendStringInfoString(buf, "MAXVALUE");
else
{
val = (Const *) datum->value;
get_const_expr(val, context, -1);
}
sep = ", ";
}
appendStringInfoString(buf, ")");
appendStringInfo(buf, "FOR VALUES FROM %s TO %s",
get_range_partbound_string(spec->lowerdatums),
get_range_partbound_string(spec->upperdatums));
break;
default:
@ -5931,6 +5920,64 @@ get_rule_expr_toplevel(Node *node, deparse_context *context,
get_rule_expr(node, context, showimplicit);
}
/*
* get_rule_expr_funccall - Parse back a function-call expression
*
* Same as get_rule_expr(), except that we guarantee that the output will
* look like a function call, or like one of the things the grammar treats as
* equivalent to a function call (see the func_expr_windowless production).
* This is needed in places where the grammar uses func_expr_windowless and
* you can't substitute a parenthesized a_expr. If what we have isn't going
* to look like a function call, wrap it in a dummy CAST() expression, which
* will satisfy the grammar --- and, indeed, is likely what the user wrote to
* produce such a thing.
*/
static void
get_rule_expr_funccall(Node *node, deparse_context *context,
bool showimplicit)
{
if (looks_like_function(node))
get_rule_expr(node, context, showimplicit);
else
{
StringInfo buf = context->buf;
appendStringInfoString(buf, "CAST(");
/* no point in showing any top-level implicit cast */
get_rule_expr(node, context, false);
appendStringInfo(buf, " AS %s)",
format_type_with_typemod(exprType(node),
exprTypmod(node)));
}
}
/*
* Helper function to identify node types that satisfy func_expr_windowless.
* If in doubt, "false" is always a safe answer.
*/
static bool
looks_like_function(Node *node)
{
if (node == NULL)
return false; /* probably shouldn't happen */
switch (nodeTag(node))
{
case T_FuncExpr:
/* OK, unless it's going to deparse as a cast */
return (((FuncExpr *) node)->funcformat == COERCE_EXPLICIT_CALL);
case T_NullIfExpr:
case T_CoalesceExpr:
case T_MinMaxExpr:
case T_SQLValueFunction:
case T_XmlExpr:
/* these are all accepted by func_expr_common_subexpr */
return true;
default:
break;
}
return false;
}
/*
* get_oper_expr - Parse back an OpExpr node
@ -6385,7 +6432,7 @@ get_const_expr(Const *constval, deparse_context *context, int showtype)
else
{
appendStringInfo(buf, "'%s'", extval);
needlabel = true; /* we must attach a cast */
needlabel = true; /* we must attach a cast */
}
break;
@ -6404,7 +6451,7 @@ get_const_expr(Const *constval, deparse_context *context, int showtype)
else
{
appendStringInfo(buf, "'%s'", extval);
needlabel = true; /* we must attach a cast */
needlabel = true; /* we must attach a cast */
}
break;
@ -6566,8 +6613,8 @@ get_sublink_expr(SubLink *sublink, deparse_context *context)
get_rule_expr(linitial(opexpr->args), context, true);
if (!opname)
opname = generate_operator_name(opexpr->opno,
exprType(linitial(opexpr->args)),
exprType(lsecond(opexpr->args)));
exprType(linitial(opexpr->args)),
exprType(lsecond(opexpr->args)));
sep = ", ";
}
appendStringInfoChar(buf, ')');
@ -6581,7 +6628,7 @@ get_sublink_expr(SubLink *sublink, deparse_context *context)
get_rule_expr((Node *) rcexpr->largs, context, true);
opname = generate_operator_name(linitial_oid(rcexpr->opnos),
exprType(linitial(rcexpr->largs)),
exprType(linitial(rcexpr->rargs)));
exprType(linitial(rcexpr->rargs)));
appendStringInfoChar(buf, ')');
}
else
@ -6598,7 +6645,7 @@ get_sublink_expr(SubLink *sublink, deparse_context *context)
break;
case ANY_SUBLINK:
if (strcmp(opname, "=") == 0) /* Represent = ANY as IN */
if (strcmp(opname, "=") == 0) /* Represent = ANY as IN */
appendStringInfoString(buf, " IN ");
else
appendStringInfo(buf, " %s ANY ", opname);
@ -6921,7 +6968,7 @@ get_from_clause_item(Node *jtnode, Query *query, deparse_context *context)
if (list_length(rte->functions) == 1 &&
(rtfunc1->funccolnames == NIL || !rte->funcordinality))
{
get_rule_expr(rtfunc1->funcexpr, context, true);
get_rule_expr_funccall(rtfunc1->funcexpr, context, true);
/* we'll print the coldeflist below, if it has one */
}
else
@ -6984,7 +7031,7 @@ get_from_clause_item(Node *jtnode, Query *query, deparse_context *context)
if (funcno > 0)
appendStringInfoString(buf, ", ");
get_rule_expr(rtfunc->funcexpr, context, true);
get_rule_expr_funccall(rtfunc->funcexpr, context, true);
if (rtfunc->funccolnames != NIL)
{
/* Reconstruct the column definition list */
@ -7177,6 +7224,11 @@ get_from_clause_item(Node *jtnode, Query *query, deparse_context *context)
if (!PRETTY_PAREN(context))
appendStringInfoChar(buf, ')');
}
else if (j->jointype != JOIN_INNER)
{
/* If we didn't say CROSS JOIN above, we must provide an ON */
appendStringInfoString(buf, " ON TRUE");
}
if (!PRETTY_PAREN(context) || j->alias != NULL)
appendStringInfoChar(buf, ')');
@ -7373,13 +7425,17 @@ get_opclass_name(Oid opclass, Oid actual_datatype,
*
* We strip any top-level FieldStore or assignment ArrayRef nodes that
* appear in the input, printing them as decoration for the base column
* name (which we assume the caller just printed). Return the subexpression
* that's to be assigned.
* name (which we assume the caller just printed). We might also need to
* strip CoerceToDomain nodes, but only ones that appear above assignment
* nodes.
*
* Returns the subexpression that's to be assigned.
*/
static Node *
processIndirection(Node *node, deparse_context *context)
{
StringInfo buf = context->buf;
CoerceToDomain *cdomain = NULL;
for (;;)
{
@ -7404,7 +7460,7 @@ processIndirection(Node *node, deparse_context *context)
*/
Assert(list_length(fstore->fieldnums) == 1);
fieldname = get_relid_attribute_name(typrelid,
linitial_int(fstore->fieldnums));
linitial_int(fstore->fieldnums));
appendStringInfo(buf, ".%s", quote_identifier(fieldname));
/*
@ -7427,10 +7483,28 @@ processIndirection(Node *node, deparse_context *context)
*/
node = (Node *) aref->refassgnexpr;
}
else if (IsA(node, CoerceToDomain))
{
cdomain = (CoerceToDomain *) node;
/* If it's an explicit domain coercion, we're done */
if (cdomain->coercionformat != COERCE_IMPLICIT_CAST)
break;
/* Tentatively descend past the CoerceToDomain */
node = (Node *) cdomain->arg;
}
else
break;
}
/*
* If we descended past a CoerceToDomain whose argument turned out not to
* be a FieldStore or array assignment, back up to the CoerceToDomain.
* (This is not enough to be fully correct if there are nested implicit
* CoerceToDomains, but such cases shouldn't ever occur.)
*/
if (cdomain && node == (Node *) cdomain->arg)
node = (Node *) cdomain;
return node;
}
@ -7791,4 +7865,44 @@ generate_operator_name(Oid operid, Oid arg1, Oid arg2)
return buf.data;
}
/*
* get_one_range_partition_bound_string
* A C string representation of one range partition bound
*/
char *
get_range_partbound_string(List *bound_datums)
{
deparse_context context;
StringInfo buf = makeStringInfo();
ListCell *cell;
char *sep;
memset(&context, 0, sizeof(deparse_context));
context.buf = buf;
appendStringInfoString(buf, "(");
sep = "";
foreach(cell, bound_datums)
{
PartitionRangeDatum *datum =
castNode(PartitionRangeDatum, lfirst(cell));
appendStringInfoString(buf, sep);
if (datum->kind == PARTITION_RANGE_DATUM_MINVALUE)
appendStringInfoString(buf, "MINVALUE");
else if (datum->kind == PARTITION_RANGE_DATUM_MAXVALUE)
appendStringInfoString(buf, "MAXVALUE");
else
{
Const *val = castNode(Const, datum->value);
get_const_expr(val, &context, -1);
}
sep = ", ";
}
appendStringInfoString(buf, ")");
return buf->data;
}
#endif /* (PG_VERSION_NUM >= 100000) */

View File

@ -386,6 +386,9 @@ static void get_rule_expr(Node *node, deparse_context *context,
bool showimplicit);
static void get_rule_expr_toplevel(Node *node, deparse_context *context,
bool showimplicit);
static void get_rule_expr_funccall(Node *node, deparse_context *context,
bool showimplicit);
static bool looks_like_function(Node *node);
static void get_oper_expr(OpExpr *expr, deparse_context *context);
static void get_func_expr(FuncExpr *expr, deparse_context *context,
bool showimplicit);
@ -3282,8 +3285,11 @@ get_update_query_targetlist_def(Query *query, List *targetList,
/*
* We must dig down into the expr to see if it's a PARAM_MULTIEXPR
* Param. That could be buried under FieldStores and ArrayRefs
* (cf processIndirection()), and underneath those there could be
* an implicit type coercion.
* and CoerceToDomains (cf processIndirection()), and underneath
* those there could be an implicit type coercion. Because we
* would ignore implicit type coercions anyway, we don't need to
* be as careful as processIndirection() is about descending past
* implicit CoerceToDomains.
*/
expr = (Node *) tle->expr;
while (expr)
@ -3302,6 +3308,14 @@ get_update_query_targetlist_def(Query *query, List *targetList,
break;
expr = (Node *) aref->refassgnexpr;
}
else if (IsA(expr, CoerceToDomain))
{
CoerceToDomain *cdomain = (CoerceToDomain *) expr;
if (cdomain->coercionformat != COERCE_IMPLICIT_CAST)
break;
expr = (Node *) cdomain->arg;
}
else
break;
}
@ -5763,6 +5777,63 @@ get_rule_expr_toplevel(Node *node, deparse_context *context,
get_rule_expr(node, context, showimplicit);
}
/*
* get_rule_expr_funccall - Parse back a function-call expression
*
* Same as get_rule_expr(), except that we guarantee that the output will
* look like a function call, or like one of the things the grammar treats as
* equivalent to a function call (see the func_expr_windowless production).
* This is needed in places where the grammar uses func_expr_windowless and
* you can't substitute a parenthesized a_expr. If what we have isn't going
* to look like a function call, wrap it in a dummy CAST() expression, which
* will satisfy the grammar --- and, indeed, is likely what the user wrote to
* produce such a thing.
*/
static void
get_rule_expr_funccall(Node *node, deparse_context *context,
bool showimplicit)
{
if (looks_like_function(node))
get_rule_expr(node, context, showimplicit);
else
{
StringInfo buf = context->buf;
appendStringInfoString(buf, "CAST(");
/* no point in showing any top-level implicit cast */
get_rule_expr(node, context, false);
appendStringInfo(buf, " AS %s)",
format_type_with_typemod(exprType(node),
exprTypmod(node)));
}
}
/*
* Helper function to identify node types that satisfy func_expr_windowless.
* If in doubt, "false" is always a safe answer.
*/
static bool
looks_like_function(Node *node)
{
if (node == NULL)
return false; /* probably shouldn't happen */
switch (nodeTag(node))
{
case T_FuncExpr:
/* OK, unless it's going to deparse as a cast */
return (((FuncExpr *) node)->funcformat == COERCE_EXPLICIT_CALL);
case T_NullIfExpr:
case T_CoalesceExpr:
case T_MinMaxExpr:
case T_XmlExpr:
/* these are all accepted by func_expr_common_subexpr */
return true;
default:
break;
}
return false;
}
/*
* get_oper_expr - Parse back an OpExpr node
@ -6641,7 +6712,7 @@ get_from_clause_item(Node *jtnode, Query *query, deparse_context *context)
if (list_length(rte->functions) == 1 &&
(rtfunc1->funccolnames == NIL || !rte->funcordinality))
{
get_rule_expr(rtfunc1->funcexpr, context, true);
get_rule_expr_funccall(rtfunc1->funcexpr, context, true);
/* we'll print the coldeflist below, if it has one */
}
else
@ -6704,7 +6775,7 @@ get_from_clause_item(Node *jtnode, Query *query, deparse_context *context)
if (funcno > 0)
appendStringInfoString(buf, ", ");
get_rule_expr(rtfunc->funcexpr, context, true);
get_rule_expr_funccall(rtfunc->funcexpr, context, true);
if (rtfunc->funccolnames != NIL)
{
/* Reconstruct the column definition list */
@ -6894,6 +6965,11 @@ get_from_clause_item(Node *jtnode, Query *query, deparse_context *context)
if (!PRETTY_PAREN(context))
appendStringInfoChar(buf, ')');
}
else if (j->jointype != JOIN_INNER)
{
/* If we didn't say CROSS JOIN above, we must provide an ON */
appendStringInfoString(buf, " ON TRUE");
}
if (!PRETTY_PAREN(context) || j->alias != NULL)
appendStringInfoChar(buf, ')');
@ -7090,13 +7166,17 @@ get_opclass_name(Oid opclass, Oid actual_datatype,
*
* We strip any top-level FieldStore or assignment ArrayRef nodes that
* appear in the input, printing them as decoration for the base column
* name (which we assume the caller just printed). Return the subexpression
* that's to be assigned.
* name (which we assume the caller just printed). We might also need to
* strip CoerceToDomain nodes, but only ones that appear above assignment
* nodes.
*
* Returns the subexpression that's to be assigned.
*/
static Node *
processIndirection(Node *node, deparse_context *context)
{
StringInfo buf = context->buf;
CoerceToDomain *cdomain = NULL;
for (;;)
{
@ -7144,10 +7224,28 @@ processIndirection(Node *node, deparse_context *context)
*/
node = (Node *) aref->refassgnexpr;
}
else if (IsA(node, CoerceToDomain))
{
cdomain = (CoerceToDomain *) node;
/* If it's an explicit domain coercion, we're done */
if (cdomain->coercionformat != COERCE_IMPLICIT_CAST)
break;
/* Tentatively descend past the CoerceToDomain */
node = (Node *) cdomain->arg;
}
else
break;
}
/*
* If we descended past a CoerceToDomain whose argument turned out not to
* be a FieldStore or array assignment, back up to the CoerceToDomain.
* (This is not enough to be fully correct if there are nested implicit
* CoerceToDomains, but such cases shouldn't ever occur.)
*/
if (cdomain && node == (Node *) cdomain->arg)
node = (Node *) cdomain;
return node;
}

View File

@ -56,10 +56,12 @@ bool ExpireCachedShards = false;
/* Local functions forward declarations */
static void FetchRegularFile(const char *nodeName, uint32 nodePort,
StringInfo remoteFilename, StringInfo localFilename);
static void FetchRegularFileAsSuperUser(const char *nodeName, uint32 nodePort,
StringInfo remoteFilename,
StringInfo localFilename);
static bool ReceiveRegularFile(const char *nodeName, uint32 nodePort,
StringInfo transmitCommand, StringInfo filePath);
const char *nodeUser, StringInfo transmitCommand,
StringInfo filePath);
static void ReceiveResourceCleanup(int32 connectionId, const char *filename,
int32 fileDescriptor);
static void DeleteFile(const char *filename);
@ -131,7 +133,9 @@ worker_fetch_partition_file(PG_FUNCTION_ARGS)
}
nodeName = text_to_cstring(nodeNameText);
FetchRegularFile(nodeName, nodePort, remoteFilename, taskFilename);
/* we've made sure the file names are sanitized, safe to fetch as superuser */
FetchRegularFileAsSuperUser(nodeName, nodePort, remoteFilename, taskFilename);
PG_RETURN_VOID();
}
@ -175,7 +179,9 @@ worker_fetch_query_results_file(PG_FUNCTION_ARGS)
}
nodeName = text_to_cstring(nodeNameText);
FetchRegularFile(nodeName, nodePort, remoteFilename, taskFilename);
/* we've made sure the file names are sanitized, safe to fetch as superuser */
FetchRegularFileAsSuperUser(nodeName, nodePort, remoteFilename, taskFilename);
PG_RETURN_VOID();
}
@ -194,11 +200,16 @@ TaskFilename(StringInfo directoryName, uint32 taskId)
}
/* Helper function to transfer the remote file in an idempotent manner. */
/*
* FetchRegularFileAsSuperUser copies a file from a remote node in an idempotent
* manner. It connects to the remote node as superuser to give file access.
* Callers must make sure that the file names are sanitized.
*/
static void
FetchRegularFile(const char *nodeName, uint32 nodePort,
StringInfo remoteFilename, StringInfo localFilename)
FetchRegularFileAsSuperUser(const char *nodeName, uint32 nodePort,
StringInfo remoteFilename, StringInfo localFilename)
{
char *nodeUser = NULL;
StringInfo attemptFilename = NULL;
StringInfo transmitCommand = NULL;
uint32 randomId = (uint32) random();
@ -217,7 +228,11 @@ FetchRegularFile(const char *nodeName, uint32 nodePort,
transmitCommand = makeStringInfo();
appendStringInfo(transmitCommand, TRANSMIT_REGULAR_COMMAND, remoteFilename->data);
received = ReceiveRegularFile(nodeName, nodePort, transmitCommand, attemptFilename);
/* connect as superuser to give file access */
nodeUser = CitusExtensionOwnerName();
received = ReceiveRegularFile(nodeName, nodePort, nodeUser, transmitCommand,
attemptFilename);
if (!received)
{
ereport(ERROR, (errmsg("could not receive file \"%s\" from %s:%u",
@ -244,7 +259,7 @@ FetchRegularFile(const char *nodeName, uint32 nodePort,
* and returns false.
*/
static bool
ReceiveRegularFile(const char *nodeName, uint32 nodePort,
ReceiveRegularFile(const char *nodeName, uint32 nodePort, const char *nodeUser,
StringInfo transmitCommand, StringInfo filePath)
{
int32 fileDescriptor = -1;
@ -276,7 +291,7 @@ ReceiveRegularFile(const char *nodeName, uint32 nodePort,
nodeDatabase = get_database_name(MyDatabaseId);
/* connect to remote node */
connectionId = MultiClientConnect(nodeName, nodePort, nodeDatabase, NULL);
connectionId = MultiClientConnect(nodeName, nodePort, nodeDatabase, nodeUser);
if (connectionId == INVALID_CONNECTION_ID)
{
ReceiveResourceCleanup(connectionId, filename, fileDescriptor);
@ -821,7 +836,8 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, const char *tableName)
remoteCopyCommand = makeStringInfo();
appendStringInfo(remoteCopyCommand, COPY_OUT_COMMAND, tableName);
received = ReceiveRegularFile(nodeName, nodePort, remoteCopyCommand, localFilePath);
received = ReceiveRegularFile(nodeName, nodePort, NULL, remoteCopyCommand,
localFilePath);
if (!received)
{
return false;
@ -861,8 +877,6 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, const char *tableName)
CommandCounterIncrement();
}
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
/*
* Copy local file into the relation. We call ProcessUtility() instead of
* directly calling DoCopy() because some extensions (e.g. cstore_fdw) hook
@ -881,6 +895,8 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, const char *tableName)
/* finally delete the temporary file we created */
DeleteFile(localFilePath->data);
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
return true;
}
@ -896,6 +912,7 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, const char *tableName)
static bool
FetchForeignTable(const char *nodeName, uint32 nodePort, const char *tableName)
{
const char *nodeUser = NULL;
StringInfo localFilePath = NULL;
StringInfo remoteFilePath = NULL;
StringInfo transmitCommand = NULL;
@ -922,7 +939,16 @@ FetchForeignTable(const char *nodeName, uint32 nodePort, const char *tableName)
transmitCommand = makeStringInfo();
appendStringInfo(transmitCommand, TRANSMIT_REGULAR_COMMAND, remoteFilePath->data);
received = ReceiveRegularFile(nodeName, nodePort, transmitCommand, localFilePath);
/*
* We allow some arbitrary input in the file name and connect to the remote
* node as superuser to transmit. Therefore, we only allow calling this
* function when already running as superuser.
*/
EnsureSuperUser();
nodeUser = CitusExtensionOwnerName();
received = ReceiveRegularFile(nodeName, nodePort, nodeUser, transmitCommand,
localFilePath);
if (!received)
{
return false;
@ -1239,7 +1265,7 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS)
sourceCopyCommand = makeStringInfo();
appendStringInfo(sourceCopyCommand, COPY_OUT_COMMAND, sourceQualifiedName);
received = ReceiveRegularFile(sourceNodeName, sourceNodePort, sourceCopyCommand,
received = ReceiveRegularFile(sourceNodeName, sourceNodePort, NULL, sourceCopyCommand,
localFilePath);
if (!received)
{

View File

@ -98,7 +98,7 @@ typedef struct WaitInfo
extern int32 MultiClientConnect(const char *nodeName, uint32 nodePort,
const char *nodeDatabase, const char *nodeUser);
extern int32 MultiClientConnectStart(const char *nodeName, uint32 nodePort,
const char *nodeDatabase);
const char *nodeDatabase, const char *nodeUser);
extern ConnectStatus MultiClientConnectPoll(int32 connectionId);
extern void MultiClientDisconnect(int32 connectionId);
extern bool MultiClientConnectionUp(int32 connectionId);

View File

@ -156,6 +156,7 @@ typedef struct TaskTracker
{
uint32 workerPort; /* node's port; part of hash table key */
char workerName[WORKER_LENGTH]; /* node's name; part of hash table key */
char *userName; /* which user to connect as */
TrackerStatus trackerStatus;
int32 connectionId;
uint32 connectPollCount;

View File

@ -129,7 +129,7 @@ ALTER EXTENSION citus UPDATE TO '7.0-15';
SHOW citus.version;
citus.version
---------------
7.0devel
7.0.2
(1 row)
-- ensure no objects were created outside pg_catalog

View File

@ -7,7 +7,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1420000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1420000;
SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 2;
CREATE TABLE test (id integer);
CREATE TABLE test (id integer, val integer);
SELECT create_distributed_table('test', 'id');
create_distributed_table
--------------------------
@ -56,6 +56,9 @@ GRANT SELECT ON TABLE test_1420001 TO read_access;
-- create prepare tests
PREPARE prepare_insert AS INSERT INTO test VALUES ($1);
PREPARE prepare_select AS SELECT count(*) FROM test;
-- not allowed to read absolute paths, even as superuser
COPY "/etc/passwd" TO STDOUT WITH (format transmit);
ERROR: absolute path not allowed
-- check full permission
SET ROLE full_access;
EXECUTE prepare_insert(1);
@ -85,7 +88,22 @@ SELECT count(*) FROM test;
2
(1 row)
-- test re-partition query (needs to transmit intermediate results)
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
count
-------
0
(1 row)
-- should not be able to transmit directly
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
ERROR: operation is not allowed
HINT: Run the command with a superuser.
SET citus.task_executor_type TO 'real-time';
-- should not be able to transmit directly
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
ERROR: operation is not allowed
HINT: Run the command with a superuser.
-- check read permission
SET ROLE read_access;
EXECUTE prepare_insert(1);
@ -117,6 +135,17 @@ SELECT count(*) FROM test;
2
(1 row)
-- test re-partition query (needs to transmit intermediate results)
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
count
-------
0
(1 row)
-- should not be able to transmit directly
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
ERROR: operation is not allowed
HINT: Run the command with a superuser.
SET citus.task_executor_type TO 'real-time';
-- check no permission
SET ROLE no_access;
@ -133,6 +162,13 @@ ERROR: permission denied for relation test
SET citus.task_executor_type TO 'task-tracker';
SELECT count(*) FROM test;
ERROR: permission denied for relation test
-- test re-partition query
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
ERROR: permission denied for relation test
-- should not be able to transmit directly
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
ERROR: operation is not allowed
HINT: Run the command with a superuser.
SET citus.task_executor_type TO 'real-time';
RESET ROLE;
DROP TABLE test;

View File

@ -998,6 +998,42 @@ SELECT key, value FROM domain_partition_column_table ORDER BY key;
(6 rows)
DROP TABLE domain_partition_column_table;
-- verify we re-evaluate volatile functions every time
CREATE TABLE http_request (
site_id INT,
ingest_time TIMESTAMPTZ DEFAULT now(),
url TEXT,
request_country TEXT,
ip_address TEXT,
status_code INT,
response_time_msec INT
);
SELECT create_distributed_table('http_request', 'site_id');
create_distributed_table
--------------------------
(1 row)
PREPARE FOO AS INSERT INTO http_request (
site_id, ingest_time, url, request_country,
ip_address, status_code, response_time_msec
) VALUES (
1, clock_timestamp(), 'http://example.com/path', 'USA',
inet '88.250.10.123', 200, 10
);
EXECUTE foo;
EXECUTE foo;
EXECUTE foo;
EXECUTE foo;
EXECUTE foo;
EXECUTE foo;
SELECT count(distinct ingest_time) FROM http_request WHERE site_id = 1;
count
-------
6
(1 row)
DROP TABLE http_request;
-- verify placement state updates invalidate shard state
--
-- We use a immutable function to check for that. The planner will

View File

@ -10,7 +10,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1420000;
SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 2;
CREATE TABLE test (id integer);
CREATE TABLE test (id integer, val integer);
SELECT create_distributed_table('test', 'id');
-- turn off propagation to avoid Enterprise processing the following section
@ -47,6 +47,9 @@ GRANT SELECT ON TABLE test_1420001 TO read_access;
PREPARE prepare_insert AS INSERT INTO test VALUES ($1);
PREPARE prepare_select AS SELECT count(*) FROM test;
-- not allowed to read absolute paths, even as superuser
COPY "/etc/passwd" TO STDOUT WITH (format transmit);
-- check full permission
SET ROLE full_access;
@ -59,8 +62,18 @@ SELECT count(*) FROM test WHERE id = 1;
SET citus.task_executor_type TO 'task-tracker';
SELECT count(*) FROM test;
-- test re-partition query (needs to transmit intermediate results)
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
-- should not be able to transmit directly
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
SET citus.task_executor_type TO 'real-time';
-- should not be able to transmit directly
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
-- check read permission
SET ROLE read_access;
@ -73,6 +86,13 @@ SELECT count(*) FROM test WHERE id = 1;
SET citus.task_executor_type TO 'task-tracker';
SELECT count(*) FROM test;
-- test re-partition query (needs to transmit intermediate results)
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
-- should not be able to transmit directly
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
SET citus.task_executor_type TO 'real-time';
-- check no permission
@ -87,6 +107,13 @@ SELECT count(*) FROM test WHERE id = 1;
SET citus.task_executor_type TO 'task-tracker';
SELECT count(*) FROM test;
-- test re-partition query
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
-- should not be able to transmit directly
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
SET citus.task_executor_type TO 'real-time';
RESET ROLE;

View File

@ -533,6 +533,37 @@ SELECT key, value FROM domain_partition_column_table ORDER BY key;
DROP TABLE domain_partition_column_table;
-- verify we re-evaluate volatile functions every time
CREATE TABLE http_request (
site_id INT,
ingest_time TIMESTAMPTZ DEFAULT now(),
url TEXT,
request_country TEXT,
ip_address TEXT,
status_code INT,
response_time_msec INT
);
SELECT create_distributed_table('http_request', 'site_id');
PREPARE FOO AS INSERT INTO http_request (
site_id, ingest_time, url, request_country,
ip_address, status_code, response_time_msec
) VALUES (
1, clock_timestamp(), 'http://example.com/path', 'USA',
inet '88.250.10.123', 200, 10
);
EXECUTE foo;
EXECUTE foo;
EXECUTE foo;
EXECUTE foo;
EXECUTE foo;
EXECUTE foo;
SELECT count(distinct ingest_time) FROM http_request WHERE site_id = 1;
DROP TABLE http_request;
-- verify placement state updates invalidate shard state
--
-- We use a immutable function to check for that. The planner will