Compare commits


29 Commits
main ... v7.0.3

Author SHA1 Message Date
Burak Yucesoy e5a156ba63 Bump Citus version to 7.0.3 2017-10-16 11:51:02 +03:00
Burak Yucesoy 398438e488 Add CHANGELOG entry for 7.0.3 2017-10-16 11:49:33 +03:00
Marco Slot 55dfbc389d Use local group ID when querying for prepared transactions 2017-10-16 11:48:58 +03:00
Marco Slot 08e0182477 Invalidate worker and group ID cache in maintenance daemon 2017-10-16 11:48:43 +03:00
Marco Slot 75073a5bc3 Use unique constraint index for transaction record deletion 2017-10-16 11:48:17 +03:00
Onder Kalaci 0770754cb6 Skip relation extension locks
We should skip a process if it is blocked on a relation
extension lock, since those locks are held for a short duration
while the relation is actually extended on disk and are
released as soon as the extension is done. Thus, recording
such waits in our lock graphs could lead to detecting false
distributed deadlocks.
2017-10-16 11:48:01 +03:00
Murat Tuncer ee36516a5a Prevent crash when remote transaction start fails (#1662)
We send multiple commands to the worker when starting a transaction.
Previously we only checked the result of the first command, the
transaction 'BEGIN', which always succeeds. Failures in the
following commands went unchecked.

With this commit, we make sure all command results are checked.
If there is any error, we report the first error found.
2017-10-16 11:47:43 +03:00
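The pattern behind this fix can be sketched without libpq: drain every queued result instead of stopping after the first, track overall success, and remember the first failure. The type and function names below are illustrative stand-ins, not Citus or libpq APIs.

```c
#include <stdbool.h>
#include <stddef.h>

/* Illustrative stand-in for a remote command's result status. */
typedef enum { CMD_OK, CMD_ERROR } CmdStatus;

/*
 * Check every queued command result rather than only the first one.
 * Returns true only if all commands succeeded; otherwise records the
 * index of the first failing command in *firstErrorIndex.
 */
bool
check_all_results(const CmdStatus *results, size_t count, size_t *firstErrorIndex)
{
	bool success = true;
	size_t i;

	for (i = 0; i < count; i++)
	{
		if (results[i] != CMD_OK && success)
		{
			success = false;
			*firstErrorIndex = i;
		}
	}

	return success;
}
```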
Andres Freund 458762087a Fix possible shard cache incoherency.
When a table and its shards are dropped, and the same shard
identifiers are later reused, e.g. due to a DROP & CREATE EXTENSION,
the old entry in the shard cache and the required entry in the shard
cache might refer to different tables.

Force invalidation for both the old and the new table to fix this.
2017-10-16 11:45:56 +03:00
Onder Kalaci 09e7dbd06f Make the tests produce more consistent outputs 2017-10-16 11:45:20 +03:00
Onder Kalaci 39b943cbad Properly copy and trim the error messages that come from pg_conn
When a NULL connection is provided to PQerrorMessage(), the
returned error message is static text. Modifying that static
text, which is not necessarily in writable memory, is
dangerous and might cause a segfault.
2017-10-16 11:45:20 +03:00
Onder Kalaci 4369777101 Ensure schema exists on reference table creation
If the schema doesn't exist on the workers, create it.
2017-10-16 11:43:37 +03:00
Jason Petersen 8e7b954a2f Modify version-output tests for PostgreSQL 11
Basically we just care whether the running version is before or after
PostgreSQL 10, so testing the major version against 9 and printing a
boolean is sufficient.
2017-10-05 11:36:19 -06:00
Burak Yucesoy 1f83929e80 Bump Citus version to 7.0.2 2017-09-28 10:39:25 -07:00
Burak Yucesoy abf9bb6b86 Add CHANGELOG entry for 7.0.2 2017-09-28 10:38:50 -07:00
Marco Slot cdea47e5c3 Add multi-user re-partitioning regression tests 2017-09-28 15:29:17 +02:00
Marco Slot c565ab55aa Execute transmit commands as superuser during task-tracker queries 2017-09-28 15:29:17 +02:00
Marco Slot d072e06746 Check for absolute paths in COPY with format transmit 2017-09-28 15:29:17 +02:00
Marco Slot ea52bee891 Allow read-only users to run task-tracker queries 2017-09-28 15:29:17 +02:00
Burak Yucesoy 4873d771e8 Bump Citus version to 7.0.1 2017-09-12 17:25:38 -07:00
Burak Yucesoy e1a641fe9b Add CHANGELOG entry for 7.0.1 release 2017-09-12 17:13:52 -07:00
Marco Slot 6b5baf21fb Wait for I/O to finish after PQputCopyData 2017-09-12 17:13:52 -07:00
Marco Slot 8c0274cba6 Free per-tuple COPY memory in INSERT...SELECT 2017-09-12 17:13:52 -07:00
Marco Slot 907048ace8 Add volatile function in prepared statement regression test 2017-09-12 17:13:52 -07:00
Marco Slot d8bb32bd5a Always copy MultiPlan in GetMultiPlan 2017-09-12 17:13:52 -07:00
Jason Petersen 0012d70b1b Add clarifying comment to RngVarCallbackForDropIdx
We don't need the PARTITION-related logic recently added in PostgreSQL.
2017-09-12 17:13:52 -07:00
Jason Petersen 44ddef6fe8 Update ruleutils_10 with latest PostgreSQL changes
See:
	postgres/postgres@21d304dfed
	postgres/postgres@bb5d6e80b1
	postgres/postgres@d363d42bb9
	postgres/postgres@eb145fdfea
	postgres/postgres@decb08ebdf
	postgres/postgres@a3ca72ae9a
	postgres/postgres@bc2d716ad0
	postgres/postgres@382ceffdf7
	postgres/postgres@c7b8998ebb
	postgres/postgres@e3860ffa4d
	postgres/postgres@76a3df6e5e
2017-09-12 17:13:52 -07:00
Jason Petersen 0ec41de26c Update ruleutils_96 with latest PostgreSQL changes
See:
	postgres/postgres@41ada83774
	postgres/postgres@3b0c2dbed0
	postgres/postgres@ff2d537223
2017-09-12 17:13:52 -07:00
Burak Yucesoy 34c6bd4b44 Bump configure PACKAGE_VERSION 2017-08-28 16:45:20 +03:00
Burak Yucesoy d607368a9e Add CHANGELOG entry for 7.0 release 2017-08-28 16:41:14 +03:00
56 changed files with 1328 additions and 314 deletions


@ -1,6 +1,94 @@
### citus v7.0.0 (unreleased) ###
### citus v7.0.3 (October 16, 2017) ###
* Replaces pg_dist_shard_placement metadata table with pg_dist_placement
* Fixes several bugs that could cause crashes
* Fixes a bug that could cause deadlock while creating reference tables
* Fixes a bug that could cause false-positives in deadlock detection
* Fixes a bug that could cause 2PC recovery not to work from MX workers
* Fixes a bug that could cause cache incoherency
* Fixes a bug that could cause the maintenance daemon to skip cache invalidations
* Improves performance of transaction recovery by using the correct index
### citus v7.0.2 (September 28, 2017) ###
* Updates task-tracker to limit file access
### citus v7.0.1 (September 12, 2017) ###
* Fixes a bug that could cause memory leaks in `INSERT ... SELECT` queries
* Fixes a bug that could cause incorrect execution of prepared statements
* Fixes a bug that could cause excessive memory usage during COPY
* Incorporates latest changes from core PostgreSQL code
### citus v7.0.0 (August 28, 2017) ###
* Adds support for PostgreSQL 10
* Drops support for PostgreSQL 9.5
* Adds support for multi-row `INSERT`
* Adds support for router `UPDATE` and `DELETE` queries with subqueries
* Adds infrastructure for distributed deadlock detection
* Deprecates `enable_deadlock_prevention` flag
* Adds support for partitioned tables
* Adds support for creating `UNLOGGED` tables
* Adds support for `SAVEPOINT`
* Adds UDF `citus_create_restore_point` for taking distributed snapshots
* Adds support for evaluating non-pushable `INSERT ... SELECT` queries
* Adds support for subquery pushdown on reference tables
* Adds shard pruning support for `IN` and `ANY`
* Adds support for `UPDATE` and `DELETE` commands that prune down to 0 shards
* Enhances transaction support by relaxing some transaction restrictions
* Fixes a bug causing a crash if a distributed table has no shards
* Fixes a bug causing a crash when removing an inactive node
* Fixes a bug causing failure during `COPY` on tables with dropped columns
* Fixes a bug causing failure during `DROP EXTENSION`
* Fixes a bug preventing executing `VACUUM` and `INSERT` concurrently
* Fixes a bug in prepared `INSERT` statements containing an implicit cast
* Fixes several issues related to statement cancellations and connections
* Fixes several 2PC related issues
* Removes an unnecessary dependency causing warning messages in pg_dump
* Adds internal infrastructure for follower clusters
* Adds internal infrastructure for progress tracking
* Implements various performance improvements
* Adds internal infrastructures and tests to improve development process
* Addresses various race conditions and deadlocks
* Improves and standardizes error messages
### citus v6.2.3 (July 13, 2017) ###

configure

@ -1,6 +1,6 @@
#! /bin/sh
# Guess values for system-dependent variables and create Makefiles.
# Generated by GNU Autoconf 2.69 for Citus 7.0devel.
# Generated by GNU Autoconf 2.69 for Citus 7.0.3.
#
#
# Copyright (C) 1992-1996, 1998-2012 Free Software Foundation, Inc.
@ -579,8 +579,8 @@ MAKEFLAGS=
# Identity of this package.
PACKAGE_NAME='Citus'
PACKAGE_TARNAME='citus'
PACKAGE_VERSION='7.0devel'
PACKAGE_STRING='Citus 7.0devel'
PACKAGE_VERSION='7.0.3'
PACKAGE_STRING='Citus 7.0.3'
PACKAGE_BUGREPORT=''
PACKAGE_URL=''
@ -1195,7 +1195,7 @@ if test "$ac_init_help" = "long"; then
# Omit some internal or obsolete options to make the list less imposing.
# This message is too long to be a string in the A/UX 3.1 sh.
cat <<_ACEOF
\`configure' configures Citus 7.0devel to adapt to many kinds of systems.
\`configure' configures Citus 7.0.3 to adapt to many kinds of systems.
Usage: $0 [OPTION]... [VAR=VALUE]...
@ -1256,7 +1256,7 @@ fi
if test -n "$ac_init_help"; then
case $ac_init_help in
short | recursive ) echo "Configuration of Citus 7.0devel:";;
short | recursive ) echo "Configuration of Citus 7.0.3:";;
esac
cat <<\_ACEOF
@ -1344,7 +1344,7 @@ fi
test -n "$ac_init_help" && exit $ac_status
if $ac_init_version; then
cat <<\_ACEOF
Citus configure 7.0devel
Citus configure 7.0.3
generated by GNU Autoconf 2.69
Copyright (C) 2012 Free Software Foundation, Inc.
@ -1401,7 +1401,7 @@ cat >config.log <<_ACEOF
This file contains any messages produced by compilers while
running configure, to aid debugging if configure makes a mistake.
It was created by Citus $as_me 7.0devel, which was
It was created by Citus $as_me 7.0.3, which was
generated by GNU Autoconf 2.69. Invocation command line was
$ $0 $@
@ -3564,7 +3564,7 @@ cat >>$CONFIG_STATUS <<\_ACEOF || ac_write_fail=1
# report actual input values of CONFIG_FILES etc. instead of their
# values after options handling.
ac_log="
This file was extended by Citus $as_me 7.0devel, which was
This file was extended by Citus $as_me 7.0.3, which was
generated by GNU Autoconf 2.69. Invocation command line was
CONFIG_FILES = $CONFIG_FILES
@ -3626,7 +3626,7 @@ _ACEOF
cat >>$CONFIG_STATUS <<_ACEOF || ac_write_fail=1
ac_cs_config="`$as_echo "$ac_configure_args" | sed 's/^ //; s/[\\""\`\$]/\\\\&/g'`"
ac_cs_version="\\
Citus config.status 7.0devel
Citus config.status 7.0.3
configured by $0, generated by GNU Autoconf 2.69,
with options \\"\$ac_cs_config\\"


@ -5,7 +5,7 @@
# everyone needing autoconf installed, the resulting files are checked
# into the SCM.
AC_INIT([Citus], [7.0devel])
AC_INIT([Citus], [7.0.3])
AC_COPYRIGHT([Copyright (c) 2012-2017, Citus Data, Inc.])
# we'll need sed and awk for some of the version commands


@ -236,6 +236,14 @@ create_reference_table(PG_FUNCTION_ARGS)
EnsureCoordinator();
CheckCitusVersion(ERROR);
/*
* Ensure the schema exists on each worker node. We cannot run this function
* transactionally, since we may create shards over separate sessions and
* shard creation depends on the schema being present and visible from all
* sessions.
*/
EnsureSchemaExistsOnAllNodes(relationId);
/*
* Lock target relation with an exclusive lock - there's no way to make
* sense of this table until we've committed, and we don't want multiple


@ -1205,17 +1205,8 @@ ReportCopyError(MultiConnection *connection, PGresult *result)
}
else
{
/* probably a connection problem, get the message from the connection */
char *lastNewlineIndex = NULL;
remoteMessage = PQerrorMessage(connection->pgConn);
lastNewlineIndex = strrchr(remoteMessage, '\n');
/* trim trailing newline, if any */
if (lastNewlineIndex != NULL)
{
*lastNewlineIndex = '\0';
}
/* trim the trailing characters */
remoteMessage = pchomp(PQerrorMessage(connection->pgConn));
ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
errmsg("failed to complete COPY on %s:%d", connection->hostname,
@ -2010,6 +2001,14 @@ CitusCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
copyDest->tuplesSent++;
/*
* Release per tuple memory allocated in this function. If we're writing
* the results of an INSERT ... SELECT then the SELECT execution will use
* its own executor state and reset the per tuple expression context
* separately.
*/
ResetPerTupleExprContext(executorState);
return true;
}


@ -18,6 +18,7 @@
#include "lib/stringinfo.h"
#include "miscadmin.h"
#include "storage/latch.h"
#include "utils/palloc.h"
/* GUC, determining whether statements sent to remote nodes are logged */
@ -55,6 +56,8 @@ IsResponseOK(PGresult *result)
*
* Note that this might require network IO. If that's not acceptable, use
* NonblockingForgetResults().
*
* ClearResults is a variant of this function that can also raise errors.
*/
void
ForgetResults(MultiConnection *connection)
@ -80,6 +83,52 @@ ForgetResults(MultiConnection *connection)
}
/*
* ClearResults clears a connection from pending activity and returns
* true if all pending commands succeeded. It raises an error if the
* raiseErrors flag is set, any command fails, and the transaction is
* marked critical.
*
* Note that this might require network IO. If that's not acceptable, use
* NonblockingForgetResults().
*/
bool
ClearResults(MultiConnection *connection, bool raiseErrors)
{
bool success = true;
while (true)
{
PGresult *result = GetRemoteCommandResult(connection, raiseErrors);
if (result == NULL)
{
break;
}
/*
* End any pending copy operation. The transaction will be marked
* as failed by the check below.
*/
if (PQresultStatus(result) == PGRES_COPY_IN)
{
PQputCopyEnd(connection->pgConn, NULL);
}
if (!IsResponseOK(result))
{
ReportResultError(connection, result, WARNING);
MarkRemoteTransactionFailed(connection, raiseErrors);
success = false;
}
PQclear(result);
}
return success;
}
/*
* NonblockingForgetResults clears a connection from pending activity if doing
* so does not require network IO. Returns true if successful, false
@ -191,7 +240,7 @@ ReportConnectionError(MultiConnection *connection, int elevel)
int nodePort = connection->port;
ereport(elevel, (errmsg("connection error: %s:%d", nodeName, nodePort),
errdetail("%s", PQerrorMessage(connection->pgConn))));
errdetail("%s", pchomp(PQerrorMessage(connection->pgConn)))));
}
@ -229,16 +278,7 @@ ReportResultError(MultiConnection *connection, PGresult *result, int elevel)
*/
if (messagePrimary == NULL)
{
char *lastNewlineIndex = NULL;
messagePrimary = PQerrorMessage(connection->pgConn);
lastNewlineIndex = strrchr(messagePrimary, '\n');
/* trim trailing newline, if any */
if (lastNewlineIndex != NULL)
{
*lastNewlineIndex = '\0';
}
messagePrimary = pchomp(PQerrorMessage(connection->pgConn));
}
ereport(elevel, (errcode(sqlState), errmsg("%s", messagePrimary),
@ -257,6 +297,28 @@ ReportResultError(MultiConnection *connection, PGresult *result, int elevel)
}
/* *INDENT-OFF* */
#if (PG_VERSION_NUM < 100000)
/*
* Make a copy of a string with all trailing newline characters removed.
*/
char *
pchomp(const char *in)
{
size_t n;
n = strlen(in);
while (n > 0 && in[n - 1] == '\n')
n--;
return pnstrdup(in, n);
}
#endif
/* *INDENT-ON* */
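Outside PostgreSQL's memory-context allocator, the trailing-newline trim performed by the compatibility pchomp above can be exercised with a malloc-based analogue (illustrative only; the real function uses pnstrdup):

```c
#include <stdlib.h>
#include <string.h>

/*
 * malloc-based analogue of pchomp(): return a copy of in with all
 * trailing newline characters removed.
 */
char *
chomp_copy(const char *in)
{
	size_t n = strlen(in);
	char *out;

	while (n > 0 && in[n - 1] == '\n')
	{
		n--;
	}

	out = malloc(n + 1);
	memcpy(out, in, n);
	out[n] = '\0';

	return out;
}
```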
/*
* LogRemoteCommand logs commands send to remote nodes if
* citus.log_remote_commands wants us to do so.
@ -497,6 +559,7 @@ PutRemoteCopyData(MultiConnection *connection, const char *buffer, int nbytes)
{
PGconn *pgConn = connection->pgConn;
int copyState = 0;
bool allowInterrupts = true;
if (PQstatus(pgConn) != CONNECTION_OK)
{
@ -506,21 +569,22 @@ PutRemoteCopyData(MultiConnection *connection, const char *buffer, int nbytes)
Assert(PQisnonblocking(pgConn));
copyState = PQputCopyData(pgConn, buffer, nbytes);
if (copyState == 1)
{
/* successful */
return true;
}
else if (copyState == -1)
if (copyState == -1)
{
return false;
}
else
{
bool allowInterrupts = true;
/*
* PQputCopyData may have queued up part of the data even if it managed
* to send some of it successfully. We provide back pressure by waiting
* until the socket is writable to prevent the internal libpq buffers
* from growing excessively.
*
* In the future, we could reduce the frequency of these pushbacks to
* achieve higher throughput.
*/
return FinishConnectionIO(connection, allowInterrupts);
}
}
@ -535,6 +599,7 @@ PutRemoteCopyEnd(MultiConnection *connection, const char *errormsg)
{
PGconn *pgConn = connection->pgConn;
int copyState = 0;
bool allowInterrupts = true;
if (PQstatus(pgConn) != CONNECTION_OK)
{
@ -544,21 +609,14 @@ PutRemoteCopyEnd(MultiConnection *connection, const char *errormsg)
Assert(PQisnonblocking(pgConn));
copyState = PQputCopyEnd(pgConn, errormsg);
if (copyState == 1)
{
/* successful */
return true;
}
else if (copyState == -1)
if (copyState == -1)
{
return false;
}
else
{
bool allowInterrupts = true;
/* see PutRemoteCopyData() */
return FinishConnectionIO(connection, allowInterrupts);
}
}


@ -127,7 +127,8 @@ MultiClientConnect(const char *nodeName, uint32 nodePort, const char *nodeDataba
* error and returns INVALID_CONNECTION_ID.
*/
int32
MultiClientConnectStart(const char *nodeName, uint32 nodePort, const char *nodeDatabase)
MultiClientConnectStart(const char *nodeName, uint32 nodePort, const char *nodeDatabase,
const char *userName)
{
MultiConnection *connection = NULL;
ConnStatusType connStatusType = CONNECTION_OK;
@ -148,7 +149,8 @@ MultiClientConnectStart(const char *nodeName, uint32 nodePort, const char *nodeD
}
/* prepare asynchronous request for worker node connection */
connection = StartNodeConnection(connectionFlags, nodeName, nodePort);
connection = StartNodeUserDatabaseConnection(connectionFlags, nodeName, nodePort,
userName, nodeDatabase);
connStatusType = PQstatus(connection->pgConn);
/*
@ -305,7 +307,7 @@ MultiClientSendQuery(int32 connectionId, const char *query)
querySent = PQsendQuery(connection->pgConn, query);
if (querySent == 0)
{
char *errorMessage = PQerrorMessage(connection->pgConn);
char *errorMessage = pchomp(PQerrorMessage(connection->pgConn));
ereport(WARNING, (errmsg("could not send remote query \"%s\"", query),
errdetail("Client error: %s", errorMessage)));


@ -272,7 +272,8 @@ ManageTaskExecution(Task *task, TaskExecution *taskExecution,
/* we use the same database name on the master and worker nodes */
nodeDatabase = get_database_name(MyDatabaseId);
connectionId = MultiClientConnectStart(nodeName, nodePort, nodeDatabase);
connectionId = MultiClientConnectStart(nodeName, nodePort, nodeDatabase,
NULL);
connectionIdArray[currentIndex] = connectionId;
/* if valid, poll the connection until the connection is initiated */


@ -71,7 +71,8 @@ static Task * TaskHashLookup(HTAB *trackerHash, TaskType taskType, uint64 jobId,
uint32 taskId);
static bool TopLevelTask(Task *task);
static bool TransmitExecutionCompleted(TaskExecution *taskExecution);
static HTAB * TrackerHash(const char *taskTrackerHashName, List *workerNodeList);
static HTAB * TrackerHash(const char *taskTrackerHashName, List *workerNodeList,
char *userName);
static HTAB * TrackerHashCreate(const char *taskTrackerHashName,
uint32 taskTrackerHashSize);
static TaskTracker * TrackerHashEnter(HTAB *taskTrackerHash, char *nodeName,
@ -160,6 +161,7 @@ MultiTaskTrackerExecute(Job *job)
List *workerNodeList = NIL;
HTAB *taskTrackerHash = NULL;
HTAB *transmitTrackerHash = NULL;
char *extensionOwner = CitusExtensionOwnerName();
const char *taskTrackerHashName = "Task Tracker Hash";
const char *transmitTrackerHashName = "Transmit Tracker Hash";
List *jobIdList = NIL;
@ -201,8 +203,12 @@ MultiTaskTrackerExecute(Job *job)
workerNodeList = ActivePrimaryNodeList();
taskTrackerCount = (uint32) list_length(workerNodeList);
taskTrackerHash = TrackerHash(taskTrackerHashName, workerNodeList);
transmitTrackerHash = TrackerHash(transmitTrackerHashName, workerNodeList);
/* connect as the current user for running queries */
taskTrackerHash = TrackerHash(taskTrackerHashName, workerNodeList, NULL);
/* connect as the superuser for fetching result files */
transmitTrackerHash = TrackerHash(transmitTrackerHashName, workerNodeList,
extensionOwner);
TrackerHashConnect(taskTrackerHash);
TrackerHashConnect(transmitTrackerHash);
@ -666,10 +672,11 @@ TransmitExecutionCompleted(TaskExecution *taskExecution)
/*
* TrackerHash creates a task tracker hash with the given name. The function
* then inserts one task tracker entry for each node in the given worker node
* list, and initializes state for each task tracker.
* list, and initializes state for each task tracker. The userName argument
* indicates which user to connect as.
*/
static HTAB *
TrackerHash(const char *taskTrackerHashName, List *workerNodeList)
TrackerHash(const char *taskTrackerHashName, List *workerNodeList, char *userName)
{
/* create task tracker hash */
uint32 taskTrackerHashSize = list_length(workerNodeList);
@ -711,6 +718,7 @@ TrackerHash(const char *taskTrackerHashName, List *workerNodeList)
}
taskTracker->taskStateHash = taskStateHash;
taskTracker->userName = userName;
}
return taskTrackerHash;
@ -844,9 +852,10 @@ TrackerConnectPoll(TaskTracker *taskTracker)
char *nodeName = taskTracker->workerName;
uint32 nodePort = taskTracker->workerPort;
char *nodeDatabase = get_database_name(MyDatabaseId);
char *nodeUser = taskTracker->userName;
int32 connectionId = MultiClientConnectStart(nodeName, nodePort,
nodeDatabase);
nodeDatabase, nodeUser);
if (connectionId != INVALID_CONNECTION_ID)
{
taskTracker->connectionId = connectionId;


@ -634,6 +634,10 @@ IsTransmitStmt(Node *parsetree)
static void
VerifyTransmitStmt(CopyStmt *copyStatement)
{
char *fileName = NULL;
EnsureSuperUser();
/* do some minimal option verification */
if (copyStatement->relation == NULL ||
copyStatement->relation->relname == NULL)
@ -642,6 +646,20 @@ VerifyTransmitStmt(CopyStmt *copyStatement)
errmsg("FORMAT 'transmit' requires a target file")));
}
fileName = copyStatement->relation->relname;
if (is_absolute_path(fileName))
{
ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
(errmsg("absolute path not allowed"))));
}
else if (!path_is_relative_and_below_cwd(fileName))
{
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
(errmsg("path must be in or below the current directory"))));
}
if (copyStatement->filename != NULL)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
@ -3111,7 +3129,9 @@ InterShardDDLTaskList(Oid leftRelationId, Oid rightRelationId,
*
* This code is heavily borrowed from RangeVarCallbackForDropRelation() in
* commands/tablecmds.c in Postgres source. We need this to ensure the right
* order of locking while dealing with DROP INDEX statements. Because we are
* order of locking while dealing with DROP INDEX statments. Because we are
* exclusively using this callback for INDEX processing, the PARTITION-related
* logic from PostgreSQL's similar callback has been omitted as unneeded.
*/
static void
RangeVarCallbackForDropIndex(const RangeVar *rel, Oid relOid, Oid oldRelOid, void *arg)


@ -444,7 +444,12 @@ StoreErrorMessage(MultiConnection *connection, StringInfo queryResultString)
char *errorMessage = PQerrorMessage(connection->pgConn);
if (errorMessage != NULL)
{
char *firstNewlineIndex = strchr(errorMessage, '\n');
char *firstNewlineIndex = NULL;
/* copy the error message to writable memory */
errorMessage = pnstrdup(errorMessage, strlen(errorMessage));
firstNewlineIndex = strchr(errorMessage, '\n');
/* trim the error message at the line break */
if (firstNewlineIndex != NULL)


@ -467,7 +467,14 @@ GetMultiPlan(CustomScan *customScan)
node = CheckNodeCopyAndSerialization(node);
multiPlan = (MultiPlan *) node;
/*
* When using prepared statements the same plan gets reused across
* multiple statements and transactions. We make several modifications
* to the MultiPlan during execution such as assigning task placements
* and evaluating functions and parameters. These changes should not
* persist, so we always work on a copy.
*/
multiPlan = (MultiPlan *) copyObject(node);
return multiPlan;
}


@ -49,6 +49,7 @@ static bool ParseBoolField(PGresult *result, int rowIndex, int colIndex);
static TimestampTz ParseTimestampTzField(PGresult *result, int rowIndex, int colIndex);
static void ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo);
static WaitGraph * BuildLocalWaitGraph(void);
static bool IsProcessWaitingForRelationExtension(PGPROC *proc);
static void LockLockData(void);
static void UnlockLockData(void);
static void AddEdgesForLockWaits(WaitGraph *waitGraph, PGPROC *waitingProc,
@ -447,6 +448,12 @@ BuildLocalWaitGraph(void)
continue;
}
/* skip if the process is blocked for relation extension */
if (IsProcessWaitingForRelationExtension(currentProc))
{
continue;
}
AddProcToVisit(&remaining, currentProc);
}
@ -460,6 +467,12 @@ BuildLocalWaitGraph(void)
continue;
}
/* skip if the process is blocked for relation extension */
if (IsProcessWaitingForRelationExtension(waitingProc))
{
continue;
}
/*
* Record an edge for everyone already holding the lock in a
* conflicting manner ("hard edges" in postgres parlance).
@ -479,6 +492,36 @@ BuildLocalWaitGraph(void)
}
/*
* IsProcessWaitingForRelationExtension returns true if the given PROC
* is waiting on a relation extension lock.
*
* In general, for the purpose of distributed deadlock detection, we
* should skip a process if it is blocked on a relation extension lock.
* Those locks are held for a short duration while the relation is
* actually extended on disk and are released as soon as the extension
* is done, even before the command that triggered the extension
* finishes. Thus, recording such waits in our lock graphs could lead
* to detecting false distributed deadlocks.
*/
static bool
IsProcessWaitingForRelationExtension(PGPROC *proc)
{
PROCLOCK *waitProcLock = NULL;
LOCK *waitLock = NULL;
if (proc->waitStatus != STATUS_WAITING)
{
return false;
}
waitProcLock = proc->waitProcLock;
waitLock = waitProcLock->tag.myLock;
return waitLock->tag.locktag_type == LOCKTAG_RELATION_EXTEND;
}
/*
* LockLockData takes locks the shared lock data structure, which prevents
* concurrent lock acquisitions/releases.
@ -550,9 +593,14 @@ AddEdgesForLockWaits(WaitGraph *waitGraph, PGPROC *waitingProc, PROCStack *remai
{
PGPROC *currentProc = procLock->tag.myProc;
/* skip processes from the same lock group and ones that don't conflict */
/*
* Skip processes from the same lock group, processes that don't conflict,
* and processes that are waiting on a relation extension lock, which
* will be released shortly.
*/
if (!IsSameLockGroup(waitingProc, currentProc) &&
IsConflictingLockMask(procLock->holdMask, conflictMask))
IsConflictingLockMask(procLock->holdMask, conflictMask) &&
!IsProcessWaitingForRelationExtension(currentProc))
{
AddWaitEdge(waitGraph, waitingProc, currentProc, remaining);
}
@ -590,9 +638,14 @@ AddEdgesForWaitQueue(WaitGraph *waitGraph, PGPROC *waitingProc, PROCStack *remai
{
int awaitMask = LOCKBIT_ON(currentProc->waitLockMode);
/* skip processes from the same lock group and ones that don't conflict */
/*
* Skip processes from the same lock group, processes that don't conflict,
* and processes that are waiting on a relation extension lock, which
* will be released shortly.
*/
if (!IsSameLockGroup(waitingProc, currentProc) &&
IsConflictingLockMask(awaitMask, conflictMask))
IsConflictingLockMask(awaitMask, conflictMask) &&
!IsProcessWaitingForRelationExtension(currentProc))
{
AddWaitEdge(waitGraph, waitingProc, currentProc, remaining);
}
@ -621,7 +674,9 @@ AddWaitEdge(WaitGraph *waitGraph, PGPROC *waitingProc, PGPROC *blockingProc,
GetBackendDataForProc(waitingProc, &waitingBackendData);
GetBackendDataForProc(blockingProc, &blockingBackendData);
curEdge->isBlockingXactWaiting = IsProcessWaitingForLock(blockingProc);
curEdge->isBlockingXactWaiting =
IsProcessWaitingForLock(blockingProc) &&
!IsProcessWaitingForRelationExtension(blockingProc);
if (curEdge->isBlockingXactWaiting)
{
AddProcToVisit(remaining, blockingProc);


@ -115,26 +115,18 @@ void
FinishRemoteTransactionBegin(struct MultiConnection *connection)
{
RemoteTransaction *transaction = &connection->remoteTransaction;
PGresult *result = NULL;
const bool raiseErrors = true;
bool clearSuccessful = true;
bool raiseErrors = true;
Assert(transaction->transactionState == REMOTE_TRANS_STARTING);
result = GetRemoteCommandResult(connection, raiseErrors);
if (!IsResponseOK(result))
{
ReportResultError(connection, result, WARNING);
MarkRemoteTransactionFailed(connection, raiseErrors);
}
else
clearSuccessful = ClearResults(connection, raiseErrors);
if (clearSuccessful)
{
transaction->transactionState = REMOTE_TRANS_STARTED;
transaction->lastSuccessfulSubXact = transaction->lastQueuedSubXact;
}
PQclear(result);
ForgetResults(connection);
if (!transaction->transactionFailed)
{
Assert(PQtransactionStatus(connection->pgConn) == PQTRANS_INTRANS);


@ -384,7 +384,7 @@ PendingWorkerTransactionList(MultiConnection *connection)
int rowCount = 0;
int rowIndex = 0;
List *transactionNames = NIL;
int coordinatorId = 0;
int coordinatorId = GetLocalGroupId();
appendStringInfo(command, "SELECT gid FROM pg_prepared_xacts "
"WHERE gid LIKE 'citus_%d_%%'",
@ -478,8 +478,8 @@ DeleteTransactionRecord(int32 groupId, char *transactionName)
{
Relation pgDistTransaction = NULL;
SysScanDesc scanDescriptor = NULL;
ScanKeyData scanKey[1];
int scanKeyCount = 1;
ScanKeyData scanKey[2];
int scanKeyCount = 2;
bool indexOK = true;
HeapTuple heapTuple = NULL;
bool heapTupleFound = false;
@ -488,9 +488,11 @@ DeleteTransactionRecord(int32 groupId, char *transactionName)
ScanKeyInit(&scanKey[0], Anum_pg_dist_transaction_groupid,
BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(groupId));
ScanKeyInit(&scanKey[1], Anum_pg_dist_transaction_gid,
BTEqualStrategyNumber, F_TEXTEQ, CStringGetTextDatum(transactionName));
scanDescriptor = systable_beginscan(pgDistTransaction,
DistTransactionGroupIndexId(), indexOK,
DistTransactionRecordIndexId(), indexOK,
NULL, scanKeyCount, scanKey);
heapTuple = systable_getnext(scanDescriptor);


@ -112,6 +112,9 @@ get_colocated_shard_array(PG_FUNCTION_ARGS)
Oid arrayTypeId = OIDOID;
int colocatedShardIndex = 0;
/* sort to get consistent output */
colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById);
foreach(colocatedShardCell, colocatedShardList)
{
ShardInterval *colocatedShardInterval = (ShardInterval *) lfirst(


@ -281,7 +281,7 @@ CitusMaintenanceDaemonMain(Datum main_arg)
* this causes us to cache a stale pg_dist_node OID. We'd actually expect
* all invalidations to arrive after obtaining a lock in LockCitusExtension.
*/
ClearMetadataOIDCache();
InvalidateMetadataSystemCache();
/*
* Perform Work. If a specific task needs to be called sooner than


@ -113,6 +113,7 @@ typedef struct MetadataCacheData
Oid distPlacementGroupidIndexId;
Oid distTransactionRelationId;
Oid distTransactionGroupIndexId;
Oid distTransactionRecordIndexId;
Oid extraDataContainerFuncId;
Oid workerHashFunctionId;
Oid extensionOwner;
@ -642,11 +643,18 @@ LookupShardCacheEntry(int64 shardId)
if (!shardEntry->tableEntry->isValid)
{
Oid oldRelationId = shardEntry->tableEntry->relationId;
Oid currentRelationId = LookupShardRelation(shardId);
/*
* The cache entry might not be valid right now. Reload cache entry
* and recheck (as the offset might have changed).
* The relation OID to which the shard belongs could have changed,
* most notably when the extension is dropped and a shard ID is
* reused. Reload the cache entries for both the old and the new
* relation ID and then look up the shard entry again.
*/
LookupDistTableCacheEntry(shardEntry->tableEntry->relationId);
LookupDistTableCacheEntry(oldRelationId);
LookupDistTableCacheEntry(currentRelationId);
recheck = true;
}
}
@ -1789,6 +1797,17 @@ DistTransactionGroupIndexId(void)
}
/* return oid of pg_dist_transaction_unique_constraint */
Oid
DistTransactionRecordIndexId(void)
{
CachedRelationLookup("pg_dist_transaction_unique_constraint",
&MetadataCache.distTransactionRecordIndexId);
return MetadataCache.distTransactionRecordIndexId;
}
/* return oid of pg_dist_placement_groupid_index */
Oid
DistPlacementGroupidIndexId(void)
@ -2694,18 +2713,21 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId)
*/
if (relationId != InvalidOid && relationId == MetadataCache.distPartitionRelationId)
{
ClearMetadataOIDCache();
InvalidateMetadataSystemCache();
}
}
/*
* ClearMetadataOIDCache resets all the cached OIDs and the extensionLoaded flag.
* InvalidateMetadataSystemCache resets all the cached OIDs and the extensionLoaded flag,
* and invalidates the worker node and local group ID caches.
*/
void
ClearMetadataOIDCache(void)
InvalidateMetadataSystemCache(void)
{
memset(&MetadataCache, 0, sizeof(MetadataCache));
workerNodeHashValid = false;
LocalGroupId = -1;
}
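The shard-ID-reuse scenario that the LookupShardCacheEntry change above guards against can be sketched in plain C (not Citus code; the names `relValid` and `invalidate_both_owners` are illustrative only): when a cached shard entry turns out to be stale, both the relation the cache still points at and the relation the catalog now reports must be refreshed, since a DROP followed by CREATE EXTENSION can hand the same shard identifier to a different table.

```c
#include <assert.h>

#define MAX_RELS 64

/* relValid[rel] == 0 means that relation's cached shard state must be rebuilt */
static int relValid[MAX_RELS];

/*
 * When a shard id is found to be stale, invalidate both the relation the
 * cache still references (oldRel) and the relation the catalog currently
 * reports (newRel); the two differ exactly when the shard id was reused.
 */
static void
invalidate_both_owners(int oldRel, int newRel)
{
	relValid[oldRel] = 0;
	relValid[newRel] = 0;
}
```

Invalidating only one of the two would leave a stale entry behind, which is the incoherency the commit fixes.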


@ -125,8 +125,8 @@ typedef struct
bool varprefix; /* TRUE to print prefixes on Vars */
Oid distrelid; /* the distributed table being modified, if valid */
int64 shardid; /* a distributed table's shardid, if positive */
ParseExprKind special_exprkind; /* set only for exprkinds needing
* special handling */
ParseExprKind special_exprkind; /* set only for exprkinds needing special
* handling */
} deparse_context;
/*
@ -390,6 +390,9 @@ static void get_rule_expr(Node *node, deparse_context *context,
bool showimplicit);
static void get_rule_expr_toplevel(Node *node, deparse_context *context,
bool showimplicit);
static void get_rule_expr_funccall(Node *node, deparse_context *context,
bool showimplicit);
static bool looks_like_function(Node *node);
static void get_oper_expr(OpExpr *expr, deparse_context *context);
static void get_func_expr(FuncExpr *expr, deparse_context *context,
bool showimplicit);
@ -3299,8 +3302,11 @@ get_update_query_targetlist_def(Query *query, List *targetList,
/*
* We must dig down into the expr to see if it's a PARAM_MULTIEXPR
* Param. That could be buried under FieldStores and ArrayRefs
* (cf processIndirection()), and underneath those there could be
* an implicit type coercion.
* and CoerceToDomains (cf processIndirection()), and underneath
* those there could be an implicit type coercion. Because we
* would ignore implicit type coercions anyway, we don't need to
* be as careful as processIndirection() is about descending past
* implicit CoerceToDomains.
*/
expr = (Node *) tle->expr;
while (expr)
@ -3319,6 +3325,14 @@ get_update_query_targetlist_def(Query *query, List *targetList,
break;
expr = (Node *) aref->refassgnexpr;
}
else if (IsA(expr, CoerceToDomain))
{
CoerceToDomain *cdomain = (CoerceToDomain *) expr;
if (cdomain->coercionformat != COERCE_IMPLICIT_CAST)
break;
expr = (Node *) cdomain->arg;
}
else
break;
}
@ -4423,6 +4437,7 @@ isSimpleNode(Node *node, Node *parentNode, int prettyFlags)
case T_MinMaxExpr:
case T_SQLValueFunction:
case T_XmlExpr:
case T_NextValueExpr:
case T_NullIfExpr:
case T_Aggref:
case T_WindowFunc:
@ -5752,6 +5767,22 @@ get_rule_expr(Node *node, deparse_context *context,
}
break;
case T_NextValueExpr:
{
NextValueExpr *nvexpr = (NextValueExpr *) node;
/*
* This isn't exactly nextval(), but that seems close enough
* for EXPLAIN's purposes.
*/
appendStringInfoString(buf, "nextval(");
simple_quote_literal(buf,
generate_relation_name(nvexpr->seqid,
NIL));
appendStringInfoChar(buf, ')');
}
break;
case T_InferenceElem:
{
InferenceElem *iexpr = (InferenceElem *) node;
@ -5810,12 +5841,11 @@ get_rule_expr(Node *node, deparse_context *context,
case PARTITION_STRATEGY_LIST:
Assert(spec->listdatums != NIL);
appendStringInfoString(buf, "FOR VALUES");
appendStringInfoString(buf, " IN (");
appendStringInfoString(buf, "FOR VALUES IN (");
sep = "";
foreach(cell, spec->listdatums)
{
Const *val = lfirst(cell);
Const *val = castNode(Const, lfirst(cell));
appendStringInfoString(buf, sep);
get_const_expr(val, context, -1);
@ -5831,50 +5861,9 @@ get_rule_expr(Node *node, deparse_context *context,
list_length(spec->lowerdatums) ==
list_length(spec->upperdatums));
appendStringInfoString(buf, "FOR VALUES");
appendStringInfoString(buf, " FROM");
appendStringInfoString(buf, " (");
sep = "";
foreach(cell, spec->lowerdatums)
{
PartitionRangeDatum *datum = lfirst(cell);
Const *val;
appendStringInfoString(buf, sep);
if (datum->kind == PARTITION_RANGE_DATUM_MINVALUE)
appendStringInfoString(buf, "MINVALUE");
else if (datum->kind == PARTITION_RANGE_DATUM_MAXVALUE)
appendStringInfoString(buf, "MAXVALUE");
else
{
val = (Const *) datum->value;
get_const_expr(val, context, -1);
}
sep = ", ";
}
appendStringInfoString(buf, ")");
appendStringInfoString(buf, " TO");
appendStringInfoString(buf, " (");
sep = "";
foreach(cell, spec->upperdatums)
{
PartitionRangeDatum *datum = lfirst(cell);
Const *val;
appendStringInfoString(buf, sep);
if (datum->kind == PARTITION_RANGE_DATUM_MINVALUE)
appendStringInfoString(buf, "MINVALUE");
else if (datum->kind == PARTITION_RANGE_DATUM_MAXVALUE)
appendStringInfoString(buf, "MAXVALUE");
else
{
val = (Const *) datum->value;
get_const_expr(val, context, -1);
}
sep = ", ";
}
appendStringInfoString(buf, ")");
appendStringInfo(buf, "FOR VALUES FROM %s TO %s",
get_range_partbound_string(spec->lowerdatums),
get_range_partbound_string(spec->upperdatums));
break;
default:
@ -5931,6 +5920,64 @@ get_rule_expr_toplevel(Node *node, deparse_context *context,
get_rule_expr(node, context, showimplicit);
}
/*
* get_rule_expr_funccall - Parse back a function-call expression
*
* Same as get_rule_expr(), except that we guarantee that the output will
* look like a function call, or like one of the things the grammar treats as
* equivalent to a function call (see the func_expr_windowless production).
* This is needed in places where the grammar uses func_expr_windowless and
* you can't substitute a parenthesized a_expr. If what we have isn't going
* to look like a function call, wrap it in a dummy CAST() expression, which
* will satisfy the grammar --- and, indeed, is likely what the user wrote to
* produce such a thing.
*/
static void
get_rule_expr_funccall(Node *node, deparse_context *context,
bool showimplicit)
{
if (looks_like_function(node))
get_rule_expr(node, context, showimplicit);
else
{
StringInfo buf = context->buf;
appendStringInfoString(buf, "CAST(");
/* no point in showing any top-level implicit cast */
get_rule_expr(node, context, false);
appendStringInfo(buf, " AS %s)",
format_type_with_typemod(exprType(node),
exprTypmod(node)));
}
}
/*
* Helper function to identify node types that satisfy func_expr_windowless.
* If in doubt, "false" is always a safe answer.
*/
static bool
looks_like_function(Node *node)
{
if (node == NULL)
return false; /* probably shouldn't happen */
switch (nodeTag(node))
{
case T_FuncExpr:
/* OK, unless it's going to deparse as a cast */
return (((FuncExpr *) node)->funcformat == COERCE_EXPLICIT_CALL);
case T_NullIfExpr:
case T_CoalesceExpr:
case T_MinMaxExpr:
case T_SQLValueFunction:
case T_XmlExpr:
/* these are all accepted by func_expr_common_subexpr */
return true;
default:
break;
}
return false;
}
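The decision looks_like_function drives can be sketched as a standalone string-building helper (plain C, not PostgreSQL code; `deparse_as_funccall` is a hypothetical name): expressions that will not parse back as a function call get wrapped in a `CAST(... AS type)` so they still satisfy the grammar slot that expects func_expr_windowless.

```c
#include <stdio.h>

/*
 * Emit expr as-is when it already looks like a function call; otherwise
 * wrap it in CAST(expr AS type), which the grammar accepts in the same
 * positions as a function call.
 */
static void
deparse_as_funccall(char *out, size_t outlen, const char *expr,
					const char *typname, int looks_like_fn)
{
	if (looks_like_fn)
		snprintf(out, outlen, "%s", expr);
	else
		snprintf(out, outlen, "CAST(%s AS %s)", expr, typname);
}
```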
/*
* get_oper_expr - Parse back an OpExpr node
@ -6921,7 +6968,7 @@ get_from_clause_item(Node *jtnode, Query *query, deparse_context *context)
if (list_length(rte->functions) == 1 &&
(rtfunc1->funccolnames == NIL || !rte->funcordinality))
{
get_rule_expr(rtfunc1->funcexpr, context, true);
get_rule_expr_funccall(rtfunc1->funcexpr, context, true);
/* we'll print the coldeflist below, if it has one */
}
else
@ -6984,7 +7031,7 @@ get_from_clause_item(Node *jtnode, Query *query, deparse_context *context)
if (funcno > 0)
appendStringInfoString(buf, ", ");
get_rule_expr(rtfunc->funcexpr, context, true);
get_rule_expr_funccall(rtfunc->funcexpr, context, true);
if (rtfunc->funccolnames != NIL)
{
/* Reconstruct the column definition list */
@ -7177,6 +7224,11 @@ get_from_clause_item(Node *jtnode, Query *query, deparse_context *context)
if (!PRETTY_PAREN(context))
appendStringInfoChar(buf, ')');
}
else if (j->jointype != JOIN_INNER)
{
/* If we didn't say CROSS JOIN above, we must provide an ON */
appendStringInfoString(buf, " ON TRUE");
}
if (!PRETTY_PAREN(context) || j->alias != NULL)
appendStringInfoChar(buf, ')');
@ -7373,13 +7425,17 @@ get_opclass_name(Oid opclass, Oid actual_datatype,
*
* We strip any top-level FieldStore or assignment ArrayRef nodes that
* appear in the input, printing them as decoration for the base column
* name (which we assume the caller just printed). Return the subexpression
* that's to be assigned.
* name (which we assume the caller just printed). We might also need to
* strip CoerceToDomain nodes, but only ones that appear above assignment
* nodes.
*
* Returns the subexpression that's to be assigned.
*/
static Node *
processIndirection(Node *node, deparse_context *context)
{
StringInfo buf = context->buf;
CoerceToDomain *cdomain = NULL;
for (;;)
{
@ -7427,10 +7483,28 @@ processIndirection(Node *node, deparse_context *context)
*/
node = (Node *) aref->refassgnexpr;
}
else if (IsA(node, CoerceToDomain))
{
cdomain = (CoerceToDomain *) node;
/* If it's an explicit domain coercion, we're done */
if (cdomain->coercionformat != COERCE_IMPLICIT_CAST)
break;
/* Tentatively descend past the CoerceToDomain */
node = (Node *) cdomain->arg;
}
else
break;
}
/*
* If we descended past a CoerceToDomain whose argument turned out not to
* be a FieldStore or array assignment, back up to the CoerceToDomain.
* (This is not enough to be fully correct if there are nested implicit
* CoerceToDomains, but such cases shouldn't ever occur.)
*/
if (cdomain && node == (Node *) cdomain->arg)
node = (Node *) cdomain;
return node;
}
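The "tentatively descend, then back up" shape of processIndirection above can be shown with a minimal self-contained sketch (plain C, not PostgreSQL code; the node kinds and `strip_indirection` are illustrative): assignment wrappers are stripped, an implicit coercion is descended past tentatively, and if nothing assignment-like was found underneath it, the walk backs up to the coercion node.

```c
#include <stddef.h>

typedef enum { N_ASSIGN, N_IMPLICIT_COERCE, N_EXPLICIT_COERCE, N_VALUE } NodeKind;

typedef struct Node
{
	NodeKind kind;
	struct Node *arg;
} Node;

/*
 * Strip assignment nodes; descend tentatively past an implicit coercion;
 * explicit coercions (and anything else) stop the walk. If the coercion's
 * argument turned out not to be an assignment node, back up to the coercion.
 */
static Node *
strip_indirection(Node *node)
{
	Node *coercion = NULL;

	for (;;)
	{
		if (node->kind == N_ASSIGN)
			node = node->arg;
		else if (node->kind == N_IMPLICIT_COERCE)
		{
			coercion = node;
			node = node->arg;	/* tentatively descend */
		}
		else
			break;
	}

	if (coercion && node == coercion->arg)
		node = coercion;		/* nothing to strip below it: back up */

	return node;
}
```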
@ -7791,4 +7865,44 @@ generate_operator_name(Oid operid, Oid arg1, Oid arg2)
return buf.data;
}
/*
* get_one_range_partition_bound_string
* A C string representation of one range partition bound
*/
char *
get_range_partbound_string(List *bound_datums)
{
deparse_context context;
StringInfo buf = makeStringInfo();
ListCell *cell;
char *sep;
memset(&context, 0, sizeof(deparse_context));
context.buf = buf;
appendStringInfoString(buf, "(");
sep = "";
foreach(cell, bound_datums)
{
PartitionRangeDatum *datum =
castNode(PartitionRangeDatum, lfirst(cell));
appendStringInfoString(buf, sep);
if (datum->kind == PARTITION_RANGE_DATUM_MINVALUE)
appendStringInfoString(buf, "MINVALUE");
else if (datum->kind == PARTITION_RANGE_DATUM_MAXVALUE)
appendStringInfoString(buf, "MAXVALUE");
else
{
Const *val = castNode(Const, datum->value);
get_const_expr(val, &context, -1);
}
sep = ", ";
}
appendStringInfoString(buf, ")");
return buf->data;
}
#endif /* (PG_VERSION_NUM >= 100000) */
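The string get_range_partbound_string produces can be sketched without the StringInfo machinery (plain C, not PostgreSQL code; `BoundDatum` and `range_bound_string` are illustrative stand-ins): each datum deparses to `MINVALUE`, `MAXVALUE`, or its constant text, joined by commas inside parentheses.

```c
#include <stdio.h>

typedef enum { B_MINVALUE, B_MAXVALUE, B_VALUE } BoundKind;

typedef struct BoundDatum
{
	BoundKind kind;
	const char *value;	/* deparsed constant; used when kind == B_VALUE */
} BoundDatum;

/* Build "(d1, d2, ...)" the way get_range_partbound_string does. */
static void
range_bound_string(char *out, size_t outlen,
				   const BoundDatum *datums, int ndatums)
{
	const char *sep = "";
	size_t used = 0;

	used += snprintf(out + used, outlen - used, "(");
	for (int i = 0; i < ndatums; i++)
	{
		const char *text =
			datums[i].kind == B_MINVALUE ? "MINVALUE" :
			datums[i].kind == B_MAXVALUE ? "MAXVALUE" : datums[i].value;

		used += snprintf(out + used, outlen - used, "%s%s", sep, text);
		sep = ", ";
	}
	snprintf(out + used, outlen - used, ")");
}
```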


@ -386,6 +386,9 @@ static void get_rule_expr(Node *node, deparse_context *context,
bool showimplicit);
static void get_rule_expr_toplevel(Node *node, deparse_context *context,
bool showimplicit);
static void get_rule_expr_funccall(Node *node, deparse_context *context,
bool showimplicit);
static bool looks_like_function(Node *node);
static void get_oper_expr(OpExpr *expr, deparse_context *context);
static void get_func_expr(FuncExpr *expr, deparse_context *context,
bool showimplicit);
@ -3282,8 +3285,11 @@ get_update_query_targetlist_def(Query *query, List *targetList,
/*
* We must dig down into the expr to see if it's a PARAM_MULTIEXPR
* Param. That could be buried under FieldStores and ArrayRefs
* (cf processIndirection()), and underneath those there could be
* an implicit type coercion.
* and CoerceToDomains (cf processIndirection()), and underneath
* those there could be an implicit type coercion. Because we
* would ignore implicit type coercions anyway, we don't need to
* be as careful as processIndirection() is about descending past
* implicit CoerceToDomains.
*/
expr = (Node *) tle->expr;
while (expr)
@ -3302,6 +3308,14 @@ get_update_query_targetlist_def(Query *query, List *targetList,
break;
expr = (Node *) aref->refassgnexpr;
}
else if (IsA(expr, CoerceToDomain))
{
CoerceToDomain *cdomain = (CoerceToDomain *) expr;
if (cdomain->coercionformat != COERCE_IMPLICIT_CAST)
break;
expr = (Node *) cdomain->arg;
}
else
break;
}
@ -5763,6 +5777,63 @@ get_rule_expr_toplevel(Node *node, deparse_context *context,
get_rule_expr(node, context, showimplicit);
}
/*
* get_rule_expr_funccall - Parse back a function-call expression
*
* Same as get_rule_expr(), except that we guarantee that the output will
* look like a function call, or like one of the things the grammar treats as
* equivalent to a function call (see the func_expr_windowless production).
* This is needed in places where the grammar uses func_expr_windowless and
* you can't substitute a parenthesized a_expr. If what we have isn't going
* to look like a function call, wrap it in a dummy CAST() expression, which
* will satisfy the grammar --- and, indeed, is likely what the user wrote to
* produce such a thing.
*/
static void
get_rule_expr_funccall(Node *node, deparse_context *context,
bool showimplicit)
{
if (looks_like_function(node))
get_rule_expr(node, context, showimplicit);
else
{
StringInfo buf = context->buf;
appendStringInfoString(buf, "CAST(");
/* no point in showing any top-level implicit cast */
get_rule_expr(node, context, false);
appendStringInfo(buf, " AS %s)",
format_type_with_typemod(exprType(node),
exprTypmod(node)));
}
}
/*
* Helper function to identify node types that satisfy func_expr_windowless.
* If in doubt, "false" is always a safe answer.
*/
static bool
looks_like_function(Node *node)
{
if (node == NULL)
return false; /* probably shouldn't happen */
switch (nodeTag(node))
{
case T_FuncExpr:
/* OK, unless it's going to deparse as a cast */
return (((FuncExpr *) node)->funcformat == COERCE_EXPLICIT_CALL);
case T_NullIfExpr:
case T_CoalesceExpr:
case T_MinMaxExpr:
case T_XmlExpr:
/* these are all accepted by func_expr_common_subexpr */
return true;
default:
break;
}
return false;
}
/*
* get_oper_expr - Parse back an OpExpr node
@ -6641,7 +6712,7 @@ get_from_clause_item(Node *jtnode, Query *query, deparse_context *context)
if (list_length(rte->functions) == 1 &&
(rtfunc1->funccolnames == NIL || !rte->funcordinality))
{
get_rule_expr(rtfunc1->funcexpr, context, true);
get_rule_expr_funccall(rtfunc1->funcexpr, context, true);
/* we'll print the coldeflist below, if it has one */
}
else
@ -6704,7 +6775,7 @@ get_from_clause_item(Node *jtnode, Query *query, deparse_context *context)
if (funcno > 0)
appendStringInfoString(buf, ", ");
get_rule_expr(rtfunc->funcexpr, context, true);
get_rule_expr_funccall(rtfunc->funcexpr, context, true);
if (rtfunc->funccolnames != NIL)
{
/* Reconstruct the column definition list */
@ -6894,6 +6965,11 @@ get_from_clause_item(Node *jtnode, Query *query, deparse_context *context)
if (!PRETTY_PAREN(context))
appendStringInfoChar(buf, ')');
}
else if (j->jointype != JOIN_INNER)
{
/* If we didn't say CROSS JOIN above, we must provide an ON */
appendStringInfoString(buf, " ON TRUE");
}
if (!PRETTY_PAREN(context) || j->alias != NULL)
appendStringInfoChar(buf, ')');
@ -7090,13 +7166,17 @@ get_opclass_name(Oid opclass, Oid actual_datatype,
*
* We strip any top-level FieldStore or assignment ArrayRef nodes that
* appear in the input, printing them as decoration for the base column
* name (which we assume the caller just printed). Return the subexpression
* that's to be assigned.
* name (which we assume the caller just printed). We might also need to
* strip CoerceToDomain nodes, but only ones that appear above assignment
* nodes.
*
* Returns the subexpression that's to be assigned.
*/
static Node *
processIndirection(Node *node, deparse_context *context)
{
StringInfo buf = context->buf;
CoerceToDomain *cdomain = NULL;
for (;;)
{
@ -7144,10 +7224,28 @@ processIndirection(Node *node, deparse_context *context)
*/
node = (Node *) aref->refassgnexpr;
}
else if (IsA(node, CoerceToDomain))
{
cdomain = (CoerceToDomain *) node;
/* If it's an explicit domain coercion, we're done */
if (cdomain->coercionformat != COERCE_IMPLICIT_CAST)
break;
/* Tentatively descend past the CoerceToDomain */
node = (Node *) cdomain->arg;
}
else
break;
}
/*
* If we descended past a CoerceToDomain whose argument turned out not to
* be a FieldStore or array assignment, back up to the CoerceToDomain.
* (This is not enough to be fully correct if there are nested implicit
* CoerceToDomains, but such cases shouldn't ever occur.)
*/
if (cdomain && node == (Node *) cdomain->arg)
node = (Node *) cdomain;
return node;
}


@ -56,10 +56,12 @@ bool ExpireCachedShards = false;
/* Local functions forward declarations */
static void FetchRegularFile(const char *nodeName, uint32 nodePort,
StringInfo remoteFilename, StringInfo localFilename);
static void FetchRegularFileAsSuperUser(const char *nodeName, uint32 nodePort,
StringInfo remoteFilename,
StringInfo localFilename);
static bool ReceiveRegularFile(const char *nodeName, uint32 nodePort,
StringInfo transmitCommand, StringInfo filePath);
const char *nodeUser, StringInfo transmitCommand,
StringInfo filePath);
static void ReceiveResourceCleanup(int32 connectionId, const char *filename,
int32 fileDescriptor);
static void DeleteFile(const char *filename);
@ -131,7 +133,9 @@ worker_fetch_partition_file(PG_FUNCTION_ARGS)
}
nodeName = text_to_cstring(nodeNameText);
FetchRegularFile(nodeName, nodePort, remoteFilename, taskFilename);
/* we've made sure the file names are sanitized, safe to fetch as superuser */
FetchRegularFileAsSuperUser(nodeName, nodePort, remoteFilename, taskFilename);
PG_RETURN_VOID();
}
@ -175,7 +179,9 @@ worker_fetch_query_results_file(PG_FUNCTION_ARGS)
}
nodeName = text_to_cstring(nodeNameText);
FetchRegularFile(nodeName, nodePort, remoteFilename, taskFilename);
/* we've made sure the file names are sanitized, safe to fetch as superuser */
FetchRegularFileAsSuperUser(nodeName, nodePort, remoteFilename, taskFilename);
PG_RETURN_VOID();
}
@ -194,11 +200,16 @@ TaskFilename(StringInfo directoryName, uint32 taskId)
}
/* Helper function to transfer the remote file in an idempotent manner. */
/*
* FetchRegularFileAsSuperUser copies a file from a remote node in an idempotent
* manner. It connects to the remote node as superuser to give file access.
* Callers must make sure that the file names are sanitized.
*/
static void
FetchRegularFile(const char *nodeName, uint32 nodePort,
FetchRegularFileAsSuperUser(const char *nodeName, uint32 nodePort,
StringInfo remoteFilename, StringInfo localFilename)
{
char *nodeUser = NULL;
StringInfo attemptFilename = NULL;
StringInfo transmitCommand = NULL;
uint32 randomId = (uint32) random();
@ -217,7 +228,11 @@ FetchRegularFile(const char *nodeName, uint32 nodePort,
transmitCommand = makeStringInfo();
appendStringInfo(transmitCommand, TRANSMIT_REGULAR_COMMAND, remoteFilename->data);
received = ReceiveRegularFile(nodeName, nodePort, transmitCommand, attemptFilename);
/* connect as superuser to give file access */
nodeUser = CitusExtensionOwnerName();
received = ReceiveRegularFile(nodeName, nodePort, nodeUser, transmitCommand,
attemptFilename);
if (!received)
{
ereport(ERROR, (errmsg("could not receive file \"%s\" from %s:%u",
@ -244,7 +259,7 @@ FetchRegularFile(const char *nodeName, uint32 nodePort,
* and returns false.
*/
static bool
ReceiveRegularFile(const char *nodeName, uint32 nodePort,
ReceiveRegularFile(const char *nodeName, uint32 nodePort, const char *nodeUser,
StringInfo transmitCommand, StringInfo filePath)
{
int32 fileDescriptor = -1;
@ -276,7 +291,7 @@ ReceiveRegularFile(const char *nodeName, uint32 nodePort,
nodeDatabase = get_database_name(MyDatabaseId);
/* connect to remote node */
connectionId = MultiClientConnect(nodeName, nodePort, nodeDatabase, NULL);
connectionId = MultiClientConnect(nodeName, nodePort, nodeDatabase, nodeUser);
if (connectionId == INVALID_CONNECTION_ID)
{
ReceiveResourceCleanup(connectionId, filename, fileDescriptor);
@ -821,7 +836,8 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, const char *tableName)
remoteCopyCommand = makeStringInfo();
appendStringInfo(remoteCopyCommand, COPY_OUT_COMMAND, tableName);
received = ReceiveRegularFile(nodeName, nodePort, remoteCopyCommand, localFilePath);
received = ReceiveRegularFile(nodeName, nodePort, NULL, remoteCopyCommand,
localFilePath);
if (!received)
{
return false;
@ -861,8 +877,6 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, const char *tableName)
CommandCounterIncrement();
}
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
/*
* Copy local file into the relation. We call ProcessUtility() instead of
* directly calling DoCopy() because some extensions (e.g. cstore_fdw) hook
@ -881,6 +895,8 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, const char *tableName)
/* finally delete the temporary file we created */
DeleteFile(localFilePath->data);
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
return true;
}
@ -896,6 +912,7 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, const char *tableName)
static bool
FetchForeignTable(const char *nodeName, uint32 nodePort, const char *tableName)
{
const char *nodeUser = NULL;
StringInfo localFilePath = NULL;
StringInfo remoteFilePath = NULL;
StringInfo transmitCommand = NULL;
@ -922,7 +939,16 @@ FetchForeignTable(const char *nodeName, uint32 nodePort, const char *tableName)
transmitCommand = makeStringInfo();
appendStringInfo(transmitCommand, TRANSMIT_REGULAR_COMMAND, remoteFilePath->data);
received = ReceiveRegularFile(nodeName, nodePort, transmitCommand, localFilePath);
/*
* We allow some arbitrary input in the file name and connect to the remote
* node as superuser to transmit. Therefore, we only allow calling this
* function when already running as superuser.
*/
EnsureSuperUser();
nodeUser = CitusExtensionOwnerName();
received = ReceiveRegularFile(nodeName, nodePort, nodeUser, transmitCommand,
localFilePath);
if (!received)
{
return false;
@ -1239,7 +1265,7 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS)
sourceCopyCommand = makeStringInfo();
appendStringInfo(sourceCopyCommand, COPY_OUT_COMMAND, sourceQualifiedName);
received = ReceiveRegularFile(sourceNodeName, sourceNodePort, sourceCopyCommand,
received = ReceiveRegularFile(sourceNodeName, sourceNodePort, NULL, sourceCopyCommand,
localFilePath);
if (!received)
{


@ -86,7 +86,7 @@ extern List * DistTableOidList(void);
extern List * ShardPlacementList(uint64 shardId);
extern void CitusInvalidateRelcacheByRelid(Oid relationId);
extern void CitusInvalidateRelcacheByShardId(int64 shardId);
extern void ClearMetadataOIDCache(void);
extern void InvalidateMetadataSystemCache(void);
extern bool CitusHasBeenLoaded(void);
extern bool CheckCitusVersion(int elevel);
@ -117,6 +117,7 @@ extern Oid DistPlacementShardidIndexId(void);
extern Oid DistPlacementPlacementidIndexId(void);
extern Oid DistTransactionRelationId(void);
extern Oid DistTransactionGroupIndexId(void);
extern Oid DistTransactionRecordIndexId(void);
extern Oid DistPlacementGroupidIndexId(void);
/* function oids */


@ -98,7 +98,7 @@ typedef struct WaitInfo
extern int32 MultiClientConnect(const char *nodeName, uint32 nodePort,
const char *nodeDatabase, const char *nodeUser);
extern int32 MultiClientConnectStart(const char *nodeName, uint32 nodePort,
const char *nodeDatabase);
const char *nodeDatabase, const char *nodeUser);
extern ConnectStatus MultiClientConnectPoll(int32 connectionId);
extern void MultiClientDisconnect(int32 connectionId);
extern bool MultiClientConnectionUp(int32 connectionId);


@ -156,6 +156,7 @@ typedef struct TaskTracker
{
uint32 workerPort; /* node's port; part of hash table key */
char workerName[WORKER_LENGTH]; /* node's name; part of hash table key */
char *userName; /* which user to connect as */
TrackerStatus trackerStatus;
int32 connectionId;
uint32 connectPollCount;


@ -26,6 +26,7 @@ extern bool LogRemoteCommands;
/* simple helpers */
extern bool IsResponseOK(struct pg_result *result);
extern void ForgetResults(MultiConnection *connection);
extern bool ClearResults(MultiConnection *connection, bool raiseErrors);
extern bool NonblockingForgetResults(MultiConnection *connection);
extern bool SqlStateMatchesCategory(char *sqlStateString, int category);
@ -33,6 +34,7 @@ extern bool SqlStateMatchesCategory(char *sqlStateString, int category);
extern void ReportConnectionError(MultiConnection *connection, int elevel);
extern void ReportResultError(MultiConnection *connection, struct pg_result *result,
int elevel);
extern char * pchomp(const char *in);
extern void LogRemoteCommand(MultiConnection *connection, const char *command);
/* wrappers around libpq functions, with command logging support */
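The pchomp declaration added above relates to the commit that properly copies and trims libpq error messages: PQerrorMessage() output ends in a newline, which is unwanted inside ereport() text. A minimal sketch of the same idea, using malloc rather than PostgreSQL's palloc (so `chomp_copy` here is a hypothetical stand-in, not the real pchomp):

```c
#include <stdlib.h>
#include <string.h>

/* Return a fresh copy of in with any trailing newlines removed. */
static char *
chomp_copy(const char *in)
{
	size_t n = strlen(in);

	while (n > 0 && in[n - 1] == '\n')
		n--;

	char *out = malloc(n + 1);
	memcpy(out, in, n);
	out[n] = '\0';
	return out;
}
```

Copying (rather than trimming in place) also matters because the buffer returned by PQerrorMessage() belongs to the connection object and may be reused or freed.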


@ -266,39 +266,39 @@ SELECT shards_colocated(1300000, 1300020);
(1 row)
-- check co-located table list
SELECT UNNEST(get_colocated_table_array('table1_group1'))::regclass;
SELECT UNNEST(get_colocated_table_array('table1_group1'))::regclass ORDER BY 1;
unnest
---------------
table2_group1
table1_group1
table2_group1
(2 rows)
SELECT UNNEST(get_colocated_table_array('table5_groupX'))::regclass;
SELECT UNNEST(get_colocated_table_array('table5_groupX'))::regclass ORDER BY 1;
unnest
---------------
table5_groupx
(1 row)
SELECT UNNEST(get_colocated_table_array('table6_append'))::regclass;
SELECT UNNEST(get_colocated_table_array('table6_append'))::regclass ORDER BY 1;
unnest
---------------
table6_append
(1 row)
-- check co-located shard list
SELECT get_colocated_shard_array(1300000);
SELECT get_colocated_shard_array(1300000) ORDER BY 1;
get_colocated_shard_array
---------------------------
{1300004,1300000}
{1300000,1300004}
(1 row)
SELECT get_colocated_shard_array(1300016);
SELECT get_colocated_shard_array(1300016) ORDER BY 1;
get_colocated_shard_array
---------------------------
{1300016}
(1 row)
SELECT get_colocated_shard_array(1300020);
SELECT get_colocated_shard_array(1300020) ORDER BY 1;
get_colocated_shard_array
---------------------------
{1300020}


@ -873,5 +873,60 @@ SELECT count(*) FROM tt1;
6
(1 row)
-- the goal of the following test is to make sure that
-- both create_reference_table and create_distributed_table
-- calls creates the schemas without leading to any deadlocks
-- first create reference table, then hash distributed table
BEGIN;
CREATE SCHEMA sc;
CREATE TABLE sc.ref(a int);
insert into sc.ref SELECT s FROM generate_series(0, 100) s;
SELECT create_reference_table('sc.ref');
NOTICE: Copying data from local table...
create_reference_table
------------------------
(1 row)
CREATE TABLE sc.hash(a int);
insert into sc.hash SELECT s FROM generate_series(0, 100) s;
SELECT create_distributed_table('sc.hash', 'a');
NOTICE: Copying data from local table...
create_distributed_table
--------------------------
(1 row)
COMMIT;
-- first create hash distributed table, then reference table
BEGIN;
CREATE SCHEMA sc2;
CREATE TABLE sc2.hash(a int);
insert into sc2.hash SELECT s FROM generate_series(0, 100) s;
SELECT create_distributed_table('sc2.hash', 'a');
NOTICE: Copying data from local table...
create_distributed_table
--------------------------
(1 row)
CREATE TABLE sc2.ref(a int);
insert into sc2.ref SELECT s FROM generate_series(0, 100) s;
SELECT create_reference_table('sc2.ref');
NOTICE: Copying data from local table...
create_reference_table
------------------------
(1 row)
COMMIT;
DROP TABLE tt1;
DROP TABLE tt2;
DROP SCHEMA sc CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table sc.ref
drop cascades to table sc.hash
DROP SCHEMA sc2 CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table sc2.hash
drop cascades to table sc2.ref


@ -2,11 +2,12 @@
-- MULTI_EXPLAIN
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 570000;
-- print major version to make version-specific tests clear
SELECT substring(version(), '\d+(?:\.\d+)?') AS major_version;
major_version
---------------
10
-- print whether we're using version > 9 to make version-specific tests clear
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int > 9 AS version_above_nine;
version_above_nine
--------------------
t
(1 row)
\a\t


@ -2,11 +2,12 @@
-- MULTI_EXPLAIN
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 570000;
-- print major version to make version-specific tests clear
SELECT substring(version(), '\d+(?:\.\d+)?') AS major_version;
major_version
---------------
9.6
-- print whether we're using version > 9 to make version-specific tests clear
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int > 9 AS version_above_nine;
version_above_nine
--------------------
f
(1 row)
\a\t


@ -129,7 +129,7 @@ ALTER EXTENSION citus UPDATE TO '7.0-15';
SHOW citus.version;
citus.version
---------------
7.0devel
7.0.3
(1 row)
-- ensure no objects were created outside pg_catalog
@ -329,3 +329,42 @@ FROM
-- we don't need the schema and the function anymore
DROP SCHEMA test_deamon CASCADE;
NOTICE: drop cascades to function test_deamon.maintenance_deamon_died(text)
-- verify citus does not crash while creating a table when run against an older worker
-- create_distributed_table piggybacks multiple commands into single one, if one worker
-- did not have the required UDF it should fail instead of crash.
-- create a test database, configure citus with single node
CREATE DATABASE another;
NOTICE: Citus partially supports CREATE DATABASE for distributed databases
DETAIL: Citus does not propagate CREATE DATABASE command to workers
HINT: You can manually create a database and its extensions on workers.
\c - - - :worker_1_port
CREATE DATABASE another;
NOTICE: Citus partially supports CREATE DATABASE for distributed databases
DETAIL: Citus does not propagate CREATE DATABASE command to workers
HINT: You can manually create a database and its extensions on workers.
\c - - - :master_port
\c another
CREATE EXTENSION citus;
SELECT FROM master_add_node('localhost', :worker_1_port);
--
(1 row)
\c - - - :worker_1_port
CREATE EXTENSION citus;
ALTER FUNCTION assign_distributed_transaction_id(initiator_node_identifier integer, transaction_number bigint, transaction_stamp timestamp with time zone)
RENAME TO dummy_assign_function;
\c - - - :master_port
SET citus.shard_replication_factor to 1;
-- create_distributed_table command should fail
CREATE TABLE t1(a int, b int);
SELECT create_distributed_table('t1', 'a');
WARNING: function assign_distributed_transaction_id(integer, integer, unknown) does not exist
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
CONTEXT: while executing command on localhost:57637
ERROR: current transaction is aborted, commands ignored until end of transaction block
CONTEXT: while executing command on localhost:57637
\c regression
\c - - - :worker_1_port
DROP DATABASE another;
\c - - - :master_port
DROP DATABASE another;


@ -7,12 +7,12 @@
-- executor here, as we cannot run repartition jobs with real time executor.
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 690000;
SET citus.enable_unique_job_ids TO off;
-- print major version to make version-specific tests clear
-- print whether we're using version > 9 to make version-specific tests clear
SHOW server_version \gset
SELECT substring(:'server_version', '\d+') AS major_version;
major_version
---------------
10
SELECT substring(:'server_version', '\d+')::int > 9 AS version_above_nine;
version_above_nine
--------------------
t
(1 row)
BEGIN;


@ -7,12 +7,12 @@
-- executor here, as we cannot run repartition jobs with real time executor.
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 690000;
SET citus.enable_unique_job_ids TO off;
-- print major version to make version-specific tests clear
-- print whether we're using version > 9 to make version-specific tests clear
SHOW server_version \gset
SELECT substring(:'server_version', '\d+') AS major_version;
major_version
---------------
9
SELECT substring(:'server_version', '\d+')::int > 9 AS version_above_nine;
version_above_nine
--------------------
f
(1 row)
BEGIN;


@ -6,12 +6,12 @@
-- from a sql task to its depended tasks. Note that we set the executor type to task
-- tracker executor here, as we cannot run repartition jobs with real time executor.
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 710000;
-- print major version to make version-specific tests clear
-- print whether we're using version > 9 to make version-specific tests clear
SHOW server_version \gset
SELECT substring(:'server_version', '\d+') AS major_version;
major_version
---------------
10
SELECT substring(:'server_version', '\d+')::int > 9 AS version_above_nine;
version_above_nine
--------------------
t
(1 row)
BEGIN;


@ -6,12 +6,12 @@
-- from a SQL task to its dependent tasks. Note that we set the executor type to task
-- tracker executor here, as we cannot run repartition jobs with real time executor.
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 710000;
-- print major version to make version-specific tests clear
-- print whether we're using version > 9 to make version-specific tests clear
SHOW server_version \gset
SELECT substring(:'server_version', '\d+') AS major_version;
major_version
---------------
9
SELECT substring(:'server_version', '\d+')::int > 9 AS version_above_nine;
version_above_nine
--------------------
f
(1 row)
BEGIN;


@ -7,7 +7,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1420000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1420000;
SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 2;
CREATE TABLE test (id integer);
CREATE TABLE test (id integer, val integer);
SELECT create_distributed_table('test', 'id');
create_distributed_table
--------------------------
@ -56,6 +56,9 @@ GRANT SELECT ON TABLE test_1420001 TO read_access;
-- create prepare tests
PREPARE prepare_insert AS INSERT INTO test VALUES ($1);
PREPARE prepare_select AS SELECT count(*) FROM test;
-- not allowed to read absolute paths, even as superuser
COPY "/etc/passwd" TO STDOUT WITH (format transmit);
ERROR: absolute path not allowed
-- check full permission
SET ROLE full_access;
EXECUTE prepare_insert(1);
@ -85,7 +88,22 @@ SELECT count(*) FROM test;
2
(1 row)
-- test re-partition query (needs to transmit intermediate results)
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
count
-------
0
(1 row)
-- should not be able to transmit directly
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
ERROR: operation is not allowed
HINT: Run the command with a superuser.
SET citus.task_executor_type TO 'real-time';
-- should not be able to transmit directly
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
ERROR: operation is not allowed
HINT: Run the command with a superuser.
-- check read permission
SET ROLE read_access;
EXECUTE prepare_insert(1);
@ -117,6 +135,17 @@ SELECT count(*) FROM test;
2
(1 row)
-- test re-partition query (needs to transmit intermediate results)
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
count
-------
0
(1 row)
-- should not be able to transmit directly
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
ERROR: operation is not allowed
HINT: Run the command with a superuser.
SET citus.task_executor_type TO 'real-time';
-- check no permission
SET ROLE no_access;
@ -133,6 +162,13 @@ ERROR: permission denied for relation test
SET citus.task_executor_type TO 'task-tracker';
SELECT count(*) FROM test;
ERROR: permission denied for relation test
-- test re-partition query
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
ERROR: permission denied for relation test
-- should not be able to transmit directly
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
ERROR: operation is not allowed
HINT: Run the command with a superuser.
SET citus.task_executor_type TO 'real-time';
RESET ROLE;
DROP TABLE test;
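
The hunks above exercise Citus's internal `transmit` COPY format, which the task-tracker path uses to fetch files (for example, repartition intermediate results) from a worker's data directory. A condensed sketch of the access rules the test asserts:

```sql
-- Absolute paths are rejected even for superusers:
COPY "/etc/passwd" TO STDOUT WITH (format transmit);     -- ERROR: absolute path not allowed
-- Non-superusers may not use the format at all, under either executor type:
SET ROLE read_access;
COPY "postgresql.conf" TO STDOUT WITH (format transmit); -- ERROR: operation is not allowed
RESET ROLE;
```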


@ -0,0 +1,135 @@
-- Tests for running transaction recovery from a worker node
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
SET citus.replication_model TO streaming;
CREATE TABLE test_recovery (x text);
SELECT create_distributed_table('test_recovery', 'x');
create_distributed_table
--------------------------
(1 row)
\c - - - :worker_1_port
SET citus.multi_shard_commit_protocol TO '2pc';
-- Ensure pg_dist_transaction is empty for test
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
SELECT count(*) FROM pg_dist_transaction;
count
-------
0
(1 row)
-- If the groupid of the worker changes this query will produce a
-- different result and the prepared statement names should be adapted
-- accordingly.
SELECT * FROM pg_dist_local_group;
groupid
---------
12
(1 row)
BEGIN;
CREATE TABLE table_should_abort (value int);
PREPARE TRANSACTION 'citus_12_should_abort';
BEGIN;
CREATE TABLE table_should_commit (value int);
PREPARE TRANSACTION 'citus_12_should_commit';
BEGIN;
CREATE TABLE should_be_sorted_into_middle (value int);
PREPARE TRANSACTION 'citus_12_should_be_sorted_into_middle';
-- Add "fake" pg_dist_transaction records and run recovery
INSERT INTO pg_dist_transaction VALUES (12, 'citus_12_should_commit');
INSERT INTO pg_dist_transaction VALUES (12, 'citus_12_should_be_forgotten');
SELECT recover_prepared_transactions();
NOTICE: recovered a prepared transaction on localhost:57637
CONTEXT: ROLLBACK PREPARED 'citus_12_should_abort'
NOTICE: recovered a prepared transaction on localhost:57637
CONTEXT: ROLLBACK PREPARED 'citus_12_should_be_sorted_into_middle'
NOTICE: recovered a prepared transaction on localhost:57637
CONTEXT: COMMIT PREPARED 'citus_12_should_commit'
recover_prepared_transactions
-------------------------------
3
(1 row)
SELECT count(*) FROM pg_dist_transaction;
count
-------
0
(1 row)
SELECT count(*) FROM pg_tables WHERE tablename = 'table_should_abort';
count
-------
0
(1 row)
SELECT count(*) FROM pg_tables WHERE tablename = 'table_should_commit';
count
-------
1
(1 row)
-- plain INSERT does not use 2PC
INSERT INTO test_recovery VALUES ('hello');
SELECT count(*) FROM pg_dist_transaction;
count
-------
0
(1 row)
-- Multi-statement transactions should write 2 transaction recovery records
BEGIN;
INSERT INTO test_recovery VALUES ('hello');
INSERT INTO test_recovery VALUES ('world');
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
-------
2
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
-- Committed INSERT..SELECT via coordinator should write 4 transaction recovery records
INSERT INTO test_recovery (x) SELECT 'hello-'||s FROM generate_series(1,100) s;
SELECT count(*) FROM pg_dist_transaction;
count
-------
4
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
-- Committed COPY should write 3 transaction records (2 fall into the same shard)
COPY test_recovery (x) FROM STDIN CSV;
SELECT count(*) FROM pg_dist_transaction;
count
-------
3
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
DROP TABLE table_should_commit;
\c - - - :master_port
DROP TABLE test_recovery_ref;
ERROR: table "test_recovery_ref" does not exist
DROP TABLE test_recovery;
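
The recovery pass above relies on Citus's `citus_<groupid>_...` naming for prepared transactions: names with a matching `pg_dist_transaction` record get `COMMIT PREPARED`, the rest get `ROLLBACK PREPARED`. A minimal sketch of that rule (groupid 12 as in this test; the table and transaction names are hypothetical):

```sql
BEGIN;
CREATE TABLE recovery_demo (value int);
PREPARE TRANSACTION 'citus_12_demo';
-- No pg_dist_transaction row names 'citus_12_demo', so recovery rolls it
-- back and recovery_demo never becomes visible:
SELECT recover_prepared_transactions();
```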


@ -4,12 +4,12 @@
-- This test checks that we can handle null min/max values in shard statistics
-- and that we don't partition or join prune shards that have null values.
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 760000;
-- print major version to make version-specific tests clear
-- print whether we're using version > 9 to make version-specific tests clear
SHOW server_version \gset
SELECT substring(:'server_version', '\d+') AS major_version;
major_version
---------------
10
SELECT substring(:'server_version', '\d+')::int > 9 AS version_above_nine;
version_above_nine
--------------------
t
(1 row)
SET client_min_messages TO DEBUG2;


@ -4,12 +4,12 @@
-- This test checks that we can handle null min/max values in shard statistics
-- and that we don't partition or join prune shards that have null values.
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 760000;
-- print major version to make version-specific tests clear
-- print whether we're using version > 9 to make version-specific tests clear
SHOW server_version \gset
SELECT substring(:'server_version', '\d+') AS major_version;
major_version
---------------
9
SELECT substring(:'server_version', '\d+')::int > 9 AS version_above_nine;
version_above_nine
--------------------
f
(1 row)
SET client_min_messages TO DEBUG2;


@ -998,6 +998,42 @@ SELECT key, value FROM domain_partition_column_table ORDER BY key;
(6 rows)
DROP TABLE domain_partition_column_table;
-- verify we re-evaluate volatile functions every time
CREATE TABLE http_request (
site_id INT,
ingest_time TIMESTAMPTZ DEFAULT now(),
url TEXT,
request_country TEXT,
ip_address TEXT,
status_code INT,
response_time_msec INT
);
SELECT create_distributed_table('http_request', 'site_id');
create_distributed_table
--------------------------
(1 row)
PREPARE FOO AS INSERT INTO http_request (
site_id, ingest_time, url, request_country,
ip_address, status_code, response_time_msec
) VALUES (
1, clock_timestamp(), 'http://example.com/path', 'USA',
inet '88.250.10.123', 200, 10
);
EXECUTE foo;
EXECUTE foo;
EXECUTE foo;
EXECUTE foo;
EXECUTE foo;
EXECUTE foo;
SELECT count(distinct ingest_time) FROM http_request WHERE site_id = 1;
count
-------
6
(1 row)
DROP TABLE http_request;
-- verify placement state updates invalidate shard state
--
-- We use an immutable function to check for that. The planner will
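
The six distinct `ingest_time` values above confirm that a volatile function inside a prepared statement is re-evaluated on every `EXECUTE`. For contrast, a generic PostgreSQL sketch of the volatility categories at play (independent of the test tables):

```sql
BEGIN;
-- now() is STABLE within a transaction: it always returns the transaction start time,
SELECT now() = transaction_timestamp() AS same_value;      -- t
-- whereas clock_timestamp() is VOLATILE and reads the wall clock on each call:
SELECT now() = clock_timestamp() AS still_same;            -- almost always f
COMMIT;
```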


@ -2152,7 +2152,6 @@ BEGIN;
INSERT INTO failure_test VALUES (1, 1);
WARNING: connection error: localhost:57638
DETAIL: no connection to the server
SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement
WHERE shardid IN (
SELECT shardid FROM pg_dist_shard
@ -2171,7 +2170,6 @@ ROLLBACK;
INSERT INTO failure_test VALUES (2, 1);
WARNING: connection error: localhost:57638
DETAIL: no connection to the server
SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement
WHERE shardid IN (
SELECT shardid FROM pg_dist_shard


@ -2,12 +2,12 @@
-- MULTI_TASK_ASSIGNMENT
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 880000;
-- print major version to make version-specific tests clear
-- print whether we're using version > 9 to make version-specific tests clear
SHOW server_version \gset
SELECT substring(:'server_version', '\d+') AS major_version;
major_version
---------------
10
SELECT substring(:'server_version', '\d+')::int > 9 AS version_above_nine;
version_above_nine
--------------------
t
(1 row)
SET citus.explain_distributed_queries TO off;


@ -2,12 +2,12 @@
-- MULTI_TASK_ASSIGNMENT
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 880000;
-- print major version to make version-specific tests clear
-- print whether we're using version > 9 to make version-specific tests clear
SHOW server_version \gset
SELECT substring(:'server_version', '\d+') AS major_version;
major_version
---------------
9
SELECT substring(:'server_version', '\d+')::int > 9 AS version_above_nine;
version_above_nine
--------------------
f
(1 row)
SET citus.explain_distributed_queries TO off;


@ -26,7 +26,7 @@ test: multi_mx_tpch_query7_nested multi_mx_ddl
test: multi_mx_repartition_udt_prepare
test: multi_mx_repartition_join_w1 multi_mx_repartition_join_w2 multi_mx_repartition_udt_w1 multi_mx_repartition_udt_w2
test: multi_mx_metadata
test: multi_mx_modifications
test: multi_mx_modifications multi_mx_transaction_recovery
test: multi_mx_modifying_xacts
test: multi_mx_explain
test: multi_mx_reference_table


@ -881,19 +881,15 @@ ALTER USER test_user WITH nologin;
COPY numbers_hash FROM STDIN WITH (FORMAT 'csv');
WARNING: connection error: localhost:57637
DETAIL: FATAL: role "test_user" is not permitted to log in
CONTEXT: COPY numbers_hash, line 1: "1,1"
WARNING: connection error: localhost:57637
DETAIL: FATAL: role "test_user" is not permitted to log in
CONTEXT: COPY numbers_hash, line 2: "2,2"
WARNING: connection error: localhost:57637
DETAIL: FATAL: role "test_user" is not permitted to log in
CONTEXT: COPY numbers_hash, line 3: "3,3"
WARNING: connection error: localhost:57637
DETAIL: FATAL: role "test_user" is not permitted to log in
CONTEXT: COPY numbers_hash, line 6: "6,6"
-- verify shards in the first worker are marked invalid
SELECT shardid, shardstate, nodename, nodeport
@ -915,7 +911,6 @@ SELECT shardid, shardstate, nodename, nodeport
COPY numbers_reference FROM STDIN WITH (FORMAT 'csv');
ERROR: connection error: localhost:57637
DETAIL: FATAL: role "test_user" is not permitted to log in
CONTEXT: COPY numbers_reference, line 1: "3,1"
-- verify shards for reference table are still valid
SELECT shardid, shardstate, nodename, nodeport
@ -933,11 +928,9 @@ SELECT shardid, shardstate, nodename, nodeport
COPY numbers_hash_other FROM STDIN WITH (FORMAT 'csv');
WARNING: connection error: localhost:57637
DETAIL: FATAL: role "test_user" is not permitted to log in
CONTEXT: COPY numbers_hash_other, line 1: "1,1"
WARNING: connection error: localhost:57637
DETAIL: FATAL: role "test_user" is not permitted to log in
CONTEXT: COPY numbers_hash_other, line 1: "1,1"
ERROR: could not connect to any active placements
CONTEXT: COPY numbers_hash_other, line 1: "1,1"


@ -134,14 +134,14 @@ SELECT shards_colocated(1300000, 1300016);
SELECT shards_colocated(1300000, 1300020);
-- check co-located table list
SELECT UNNEST(get_colocated_table_array('table1_group1'))::regclass;
SELECT UNNEST(get_colocated_table_array('table5_groupX'))::regclass;
SELECT UNNEST(get_colocated_table_array('table6_append'))::regclass;
SELECT UNNEST(get_colocated_table_array('table1_group1'))::regclass ORDER BY 1;
SELECT UNNEST(get_colocated_table_array('table5_groupX'))::regclass ORDER BY 1;
SELECT UNNEST(get_colocated_table_array('table6_append'))::regclass ORDER BY 1;
-- check co-located shard list
SELECT get_colocated_shard_array(1300000);
SELECT get_colocated_shard_array(1300016);
SELECT get_colocated_shard_array(1300020);
SELECT get_colocated_shard_array(1300000) ORDER BY 1;
SELECT get_colocated_shard_array(1300016) ORDER BY 1;
SELECT get_colocated_shard_array(1300020) ORDER BY 1;
-- check FindShardIntervalIndex function
SELECT find_shard_interval_index(1300000);


@ -482,5 +482,41 @@ END;
SELECT count(*) FROM tt1;
-- the goal of the following test is to make sure that
-- both create_reference_table and create_distributed_table
-- calls create the schemas without leading to any deadlocks
-- first create reference table, then hash distributed table
BEGIN;
CREATE SCHEMA sc;
CREATE TABLE sc.ref(a int);
insert into sc.ref SELECT s FROM generate_series(0, 100) s;
SELECT create_reference_table('sc.ref');
CREATE TABLE sc.hash(a int);
insert into sc.hash SELECT s FROM generate_series(0, 100) s;
SELECT create_distributed_table('sc.hash', 'a');
COMMIT;
-- first create hash distributed table, then reference table
BEGIN;
CREATE SCHEMA sc2;
CREATE TABLE sc2.hash(a int);
insert into sc2.hash SELECT s FROM generate_series(0, 100) s;
SELECT create_distributed_table('sc2.hash', 'a');
CREATE TABLE sc2.ref(a int);
insert into sc2.ref SELECT s FROM generate_series(0, 100) s;
SELECT create_reference_table('sc2.ref');
COMMIT;
DROP TABLE tt1;
DROP TABLE tt2;
DROP SCHEMA sc CASCADE;
DROP SCHEMA sc2 CASCADE;


@ -2,9 +2,9 @@
-- MULTI_CREATE_TABLE_NEW_FEATURES
--
-- print major version to make version-specific tests clear
-- print whether we're using version > 9 to make version-specific tests clear
SHOW server_version \gset
SELECT substring(:'server_version', '\d+') AS major_version;
SELECT substring(:'server_version', '\d+')::int > 9 AS version_above_nine;
-- Verify that the GENERATED ... AS IDENTITY feature in PostgreSQL 10
-- is forbidden in distributed tables.


@ -4,8 +4,9 @@
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 570000;
-- print major version to make version-specific tests clear
SELECT substring(version(), '\d+(?:\.\d+)?') AS major_version;
-- print whether we're using version > 9 to make version-specific tests clear
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int > 9 AS version_above_nine;
\a\t


@ -307,3 +307,37 @@ FROM
-- we don't need the schema and the function anymore
DROP SCHEMA test_deamon CASCADE;
-- verify citus does not crash while creating a table when run against an older worker
-- create_distributed_table piggybacks multiple commands into a single one; if one worker
-- does not have the required UDF it should fail instead of crashing.
-- create a test database, configure citus with single node
CREATE DATABASE another;
\c - - - :worker_1_port
CREATE DATABASE another;
\c - - - :master_port
\c another
CREATE EXTENSION citus;
SELECT FROM master_add_node('localhost', :worker_1_port);
\c - - - :worker_1_port
CREATE EXTENSION citus;
ALTER FUNCTION assign_distributed_transaction_id(initiator_node_identifier integer, transaction_number bigint, transaction_stamp timestamp with time zone)
RENAME TO dummy_assign_function;
\c - - - :master_port
SET citus.shard_replication_factor to 1;
-- create_distributed_table command should fail
CREATE TABLE t1(a int, b int);
SELECT create_distributed_table('t1', 'a');
\c regression
\c - - - :worker_1_port
DROP DATABASE another;
\c - - - :master_port
DROP DATABASE another;


@ -10,9 +10,9 @@
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 690000;
SET citus.enable_unique_job_ids TO off;
-- print major version to make version-specific tests clear
-- print whether we're using version > 9 to make version-specific tests clear
SHOW server_version \gset
SELECT substring(:'server_version', '\d+') AS major_version;
SELECT substring(:'server_version', '\d+')::int > 9 AS version_above_nine;
BEGIN;
SET client_min_messages TO DEBUG4;


@ -9,9 +9,9 @@
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 710000;
-- print major version to make version-specific tests clear
-- print whether we're using version > 9 to make version-specific tests clear
SHOW server_version \gset
SELECT substring(:'server_version', '\d+') AS major_version;
SELECT substring(:'server_version', '\d+')::int > 9 AS version_above_nine;
BEGIN;


@ -10,7 +10,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1420000;
SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 2;
CREATE TABLE test (id integer);
CREATE TABLE test (id integer, val integer);
SELECT create_distributed_table('test', 'id');
-- turn off propagation to avoid Enterprise processing the following section
@ -47,6 +47,9 @@ GRANT SELECT ON TABLE test_1420001 TO read_access;
PREPARE prepare_insert AS INSERT INTO test VALUES ($1);
PREPARE prepare_select AS SELECT count(*) FROM test;
-- not allowed to read absolute paths, even as superuser
COPY "/etc/passwd" TO STDOUT WITH (format transmit);
-- check full permission
SET ROLE full_access;
@ -59,8 +62,18 @@ SELECT count(*) FROM test WHERE id = 1;
SET citus.task_executor_type TO 'task-tracker';
SELECT count(*) FROM test;
-- test re-partition query (needs to transmit intermediate results)
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
-- should not be able to transmit directly
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
SET citus.task_executor_type TO 'real-time';
-- should not be able to transmit directly
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
-- check read permission
SET ROLE read_access;
@ -73,6 +86,13 @@ SELECT count(*) FROM test WHERE id = 1;
SET citus.task_executor_type TO 'task-tracker';
SELECT count(*) FROM test;
-- test re-partition query (needs to transmit intermediate results)
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
-- should not be able to transmit directly
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
SET citus.task_executor_type TO 'real-time';
-- check no permission
@ -87,6 +107,13 @@ SELECT count(*) FROM test WHERE id = 1;
SET citus.task_executor_type TO 'task-tracker';
SELECT count(*) FROM test;
-- test re-partition query
SELECT count(*) FROM test a JOIN test b ON (a.val = b.val) WHERE a.id = 1 AND b.id = 2;
-- should not be able to transmit directly
COPY "postgresql.conf" TO STDOUT WITH (format transmit);
SET citus.task_executor_type TO 'real-time';
RESET ROLE;


@ -0,0 +1,78 @@
-- Tests for running transaction recovery from a worker node
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
SET citus.replication_model TO streaming;
CREATE TABLE test_recovery (x text);
SELECT create_distributed_table('test_recovery', 'x');
\c - - - :worker_1_port
SET citus.multi_shard_commit_protocol TO '2pc';
-- Ensure pg_dist_transaction is empty for test
SELECT recover_prepared_transactions();
SELECT count(*) FROM pg_dist_transaction;
-- If the groupid of the worker changes this query will produce a
-- different result and the prepared statement names should be adapted
-- accordingly.
SELECT * FROM pg_dist_local_group;
BEGIN;
CREATE TABLE table_should_abort (value int);
PREPARE TRANSACTION 'citus_12_should_abort';
BEGIN;
CREATE TABLE table_should_commit (value int);
PREPARE TRANSACTION 'citus_12_should_commit';
BEGIN;
CREATE TABLE should_be_sorted_into_middle (value int);
PREPARE TRANSACTION 'citus_12_should_be_sorted_into_middle';
-- Add "fake" pg_dist_transaction records and run recovery
INSERT INTO pg_dist_transaction VALUES (12, 'citus_12_should_commit');
INSERT INTO pg_dist_transaction VALUES (12, 'citus_12_should_be_forgotten');
SELECT recover_prepared_transactions();
SELECT count(*) FROM pg_dist_transaction;
SELECT count(*) FROM pg_tables WHERE tablename = 'table_should_abort';
SELECT count(*) FROM pg_tables WHERE tablename = 'table_should_commit';
-- plain INSERT does not use 2PC
INSERT INTO test_recovery VALUES ('hello');
SELECT count(*) FROM pg_dist_transaction;
-- Multi-statement transactions should write 2 transaction recovery records
BEGIN;
INSERT INTO test_recovery VALUES ('hello');
INSERT INTO test_recovery VALUES ('world');
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- Committed INSERT..SELECT via coordinator should write 4 transaction recovery records
INSERT INTO test_recovery (x) SELECT 'hello-'||s FROM generate_series(1,100) s;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- Committed COPY should write 3 transaction records (2 fall into the same shard)
COPY test_recovery (x) FROM STDIN CSV;
hello-0
hello-1
world-0
world-1
\.
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
DROP TABLE table_should_commit;
\c - - - :master_port
DROP TABLE test_recovery_ref;
DROP TABLE test_recovery;


@ -8,9 +8,9 @@
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 760000;
-- print major version to make version-specific tests clear
-- print whether we're using version > 9 to make version-specific tests clear
SHOW server_version \gset
SELECT substring(:'server_version', '\d+') AS major_version;
SELECT substring(:'server_version', '\d+')::int > 9 AS version_above_nine;
SET client_min_messages TO DEBUG2;
SET citus.explain_all_tasks TO on;


@ -533,6 +533,37 @@ SELECT key, value FROM domain_partition_column_table ORDER BY key;
DROP TABLE domain_partition_column_table;
-- verify we re-evaluate volatile functions every time
CREATE TABLE http_request (
site_id INT,
ingest_time TIMESTAMPTZ DEFAULT now(),
url TEXT,
request_country TEXT,
ip_address TEXT,
status_code INT,
response_time_msec INT
);
SELECT create_distributed_table('http_request', 'site_id');
PREPARE FOO AS INSERT INTO http_request (
site_id, ingest_time, url, request_country,
ip_address, status_code, response_time_msec
) VALUES (
1, clock_timestamp(), 'http://example.com/path', 'USA',
inet '88.250.10.123', 200, 10
);
EXECUTE foo;
EXECUTE foo;
EXECUTE foo;
EXECUTE foo;
EXECUTE foo;
EXECUTE foo;
SELECT count(distinct ingest_time) FROM http_request WHERE site_id = 1;
DROP TABLE http_request;
-- verify placement state updates invalidate shard state
--
-- We use an immutable function to check for that. The planner will


@ -5,9 +5,9 @@
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 880000;
-- print major version to make version-specific tests clear
-- print whether we're using version > 9 to make version-specific tests clear
SHOW server_version \gset
SELECT substring(:'server_version', '\d+') AS major_version;
SELECT substring(:'server_version', '\d+')::int > 9 AS version_above_nine;
SET citus.explain_distributed_queries TO off;