diff --git a/README.md b/README.md
index 93c9d4c0f..9d88b8108 100644
--- a/README.md
+++ b/README.md
@@ -34,10 +34,18 @@ To learn more, visit [citusdata.com](https://www.citusdata.com) and join
the [mailing list](https://groups.google.com/forum/#!forum/citus-users) to
stay on top of the latest developments.
-### Quickstart
+### Getting started with Citus
+
+The fastest way to get up and running is to create a Citus Cloud account. You can also set up a local Citus cluster with Docker.
+
+#### Citus Cloud
+
+Citus Cloud runs on top of AWS as a fully managed database as a service, with development plans available for getting started. You can provision a Citus Cloud account at [https://console.citusdata.com](https://console.citusdata.com) and get started with just a few clicks.
#### Local Citus Cluster
+If you're looking to get started locally, follow the steps below to get up and running.
+
* Install docker-compose: [Mac][mac_install] | [Linux][linux_install]
* (Mac only) connect to Docker VM
```bash
@@ -100,7 +108,7 @@ stay on top of the latest developments.
Training and Support |
See our <a
+    href="https://www.citusdata.com/support">support
page</a> for training and dedicated support options. |
@@ -140,7 +148,7 @@ Video](https://www.youtube.com/watch?v=NVl9_6J1G60&list=PLixnExCn6lRpP10ZlpJwx6A
___
-Copyright © 2012–2016 Citus Data, Inc.
+Copyright © 2012–2017 Citus Data, Inc.
[faq]: https://www.citusdata.com/frequently-asked-questions
[linux_install]: https://www.digitalocean.com/community/tutorials/how-to-install-and-use-docker-compose-on-ubuntu-14-04
diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c
index 7345d18f5..a6e0c6e9f 100644
--- a/src/backend/distributed/commands/multi_copy.c
+++ b/src/backend/distributed/commands/multi_copy.c
@@ -45,86 +45,33 @@
*/
#include "postgres.h"
-#include "fmgr.h"
-#include "funcapi.h"
#include "libpq-fe.h"
#include "miscadmin.h"
-#include "plpgsql.h"
-#include <arpa/inet.h>
-#include <netinet/in.h>
+#include <arpa/inet.h> /* for htons */
+#include <netinet/in.h> /* for htons */
#include <string.h>
-#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/htup.h"
-#include "access/nbtree.h"
#include "access/sdir.h"
-#include "access/tupdesc.h"
-#include "access/xact.h"
#include "catalog/namespace.h"
-#include "catalog/pg_class.h"
#include "catalog/pg_type.h"
-#include "catalog/pg_am.h"
-#include "catalog/pg_collation.h"
-#include "commands/extension.h"
#include "commands/copy.h"
#include "commands/defrem.h"
-#include "distributed/citus_ruleutils.h"
-#include "distributed/commit_protocol.h"
-#include "distributed/connection_cache.h"
-#include "distributed/listutils.h"
-#include "distributed/metadata_cache.h"
-#include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_copy.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_shard_transaction.h"
-#include "distributed/pg_dist_partition.h"
+#include "distributed/placement_connection.h"
+#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h"
-#include "distributed/shardinterval_utils.h"
-#include "distributed/worker_protocol.h"
-#include "executor/execdesc.h"
#include "executor/executor.h"
-#include "executor/instrument.h"
-#include "executor/tuptable.h"
-#include "lib/stringinfo.h"
-#include "nodes/execnodes.h"
-#include "nodes/makefuncs.h"
-#include "nodes/memnodes.h"
-#include "nodes/nodeFuncs.h"
-#include "nodes/nodes.h"
-#include "nodes/params.h"
-#include "nodes/parsenodes.h"
-#include "nodes/pg_list.h"
-#include "nodes/plannodes.h"
-#include "nodes/primnodes.h"
-#include "optimizer/clauses.h"
-#include "optimizer/cost.h"
-#include "optimizer/planner.h"
-#include "optimizer/var.h"
-#include "parser/parser.h"
-#include "parser/analyze.h"
-#include "parser/parse_node.h"
-#include "parser/parsetree.h"
-#include "parser/parse_type.h"
-#include "storage/lock.h"
-#include "tcop/dest.h"
-#include "tcop/tcopprot.h"
-#include "tcop/utility.h"
#include "tsearch/ts_locale.h"
#include "utils/builtins.h"
-#include "utils/elog.h"
-#include "utils/errcodes.h"
-#include "utils/guc.h"
#include "utils/lsyscache.h"
-#include "utils/typcache.h"
-#include "utils/palloc.h"
#include "utils/rel.h"
-#include "utils/relcache.h"
-#include "utils/snapmgr.h"
-#include "utils/tuplestore.h"
#include "utils/memutils.h"
@@ -132,7 +79,7 @@
static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
/* use a global connection to the master node in order to skip passing it around */
-static PGconn *masterConnection = NULL;
+static MultiConnection *masterConnection = NULL;
/* Local functions forward declarations */
@@ -141,29 +88,33 @@ static void CopyToExistingShards(CopyStmt *copyStatement, char *completionTag);
static void CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId);
static char MasterPartitionMethod(RangeVar *relation);
static void RemoveMasterOptions(CopyStmt *copyStatement);
-static void OpenCopyTransactions(CopyStmt *copyStatement,
- ShardConnections *shardConnections, bool stopOnFailure,
- bool useBinaryCopyFormat);
+static void OpenCopyConnections(CopyStmt *copyStatement,
+ ShardConnections *shardConnections, bool stopOnFailure,
+ bool useBinaryCopyFormat);
+
static bool CanUseBinaryCopyFormat(TupleDesc tupleDescription,
CopyOutState rowOutputState);
static List * MasterShardPlacementList(uint64 shardId);
static List * RemoteFinalizedShardPlacementList(uint64 shardId);
-static void SendCopyBinaryHeaders(CopyOutState copyOutState, List *connectionList);
-static void SendCopyBinaryFooters(CopyOutState copyOutState, List *connectionList);
+
+static void SendCopyBinaryHeaders(CopyOutState copyOutState, int64 shardId,
+ List *connectionList);
+static void SendCopyBinaryFooters(CopyOutState copyOutState, int64 shardId,
+ List *connectionList);
+
static StringInfo ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId,
bool useBinaryCopyFormat);
-static void SendCopyDataToAll(StringInfo dataBuffer, List *connectionList);
-static void SendCopyDataToPlacement(StringInfo dataBuffer, PGconn *connection,
- int64 shardId);
-static void EndRemoteCopy(List *connectionList, bool stopOnFailure);
-static void ReportCopyError(PGconn *connection, PGresult *result);
+static void SendCopyDataToAll(StringInfo dataBuffer, int64 shardId, List *connectionList);
+static void SendCopyDataToPlacement(StringInfo dataBuffer, int64 shardId,
+ MultiConnection *connection);
+static void EndRemoteCopy(int64 shardId, List *connectionList, bool stopOnFailure);
+static void ReportCopyError(MultiConnection *connection, PGresult *result);
static uint32 AvailableColumnCount(TupleDesc tupleDescriptor);
-static void StartCopyToNewShard(ShardConnections *shardConnections,
- CopyStmt *copyStatement, bool useBinaryCopyFormat);
+static int64 StartCopyToNewShard(ShardConnections *shardConnections,
+ CopyStmt *copyStatement, bool useBinaryCopyFormat);
static int64 MasterCreateEmptyShard(char *relationName);
static int64 CreateEmptyShard(char *relationName);
static int64 RemoteCreateEmptyShard(char *relationName);
-static void FinalizeCopyToNewShard(ShardConnections *shardConnections);
static void MasterUpdateShardStatistics(uint64 shardId);
static void RemoteUpdateShardStatistics(uint64 shardId);
@@ -187,6 +138,12 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
{
bool isCopyFromWorker = false;
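+
+	/*
+	 * COPY runs as part of the coordinated transaction machinery, so that
+	 * the remote COPYs on all placements commit or abort together; opt
+	 * into 2PC when the multi-shard commit protocol requires it.
+	 */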
+ BeginOrContinueCoordinatedTransaction();
+ if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC)
+ {
+ CoordinatedTransactionUse2PC();
+ }
+
/* disallow COPY to/from file or program except for superusers */
if (copyStatement->filename != NULL && !superuser())
{
@@ -208,6 +165,7 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
}
}
+ masterConnection = NULL; /* reset, might still be set after error */
isCopyFromWorker = IsCopyFromWorker(copyStatement);
if (isCopyFromWorker)
{
@@ -268,79 +226,41 @@ CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag)
NodeAddress *masterNodeAddress = MasterNodeAddress(copyStatement);
char *nodeName = masterNodeAddress->nodeName;
int32 nodePort = masterNodeAddress->nodePort;
- char *nodeUser = CurrentUserName();
+ Oid relationId = InvalidOid;
+ char partitionMethod = 0;
+ char *schemaName = NULL;
+ uint32 connectionFlags = FOR_DML;
- if (XactModificationLevel > XACT_MODIFICATION_NONE)
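+	/*
+	 * Open a connection to the master through the connection machinery and
+	 * claim it exclusively; all metadata commands for this COPY run over it.
+	 */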
+ masterConnection = GetNodeConnection(connectionFlags, nodeName, nodePort);
+ ClaimConnectionExclusively(masterConnection);
+
+ RemoteTransactionBeginIfNecessary(masterConnection);
+
+ /* strip schema name for local reference */
+ schemaName = copyStatement->relation->schemaname;
+ copyStatement->relation->schemaname = NULL;
+
+ relationId = RangeVarGetRelid(copyStatement->relation, NoLock, false);
+
+ /* put schema name back */
+ copyStatement->relation->schemaname = schemaName;
+ partitionMethod = MasterPartitionMethod(copyStatement->relation);
+ if (partitionMethod != DISTRIBUTE_BY_APPEND)
{
- ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
- errmsg("distributed copy operations must not appear in "
- "transaction blocks containing other distributed "
- "modifications")));
+ ereport(ERROR, (errmsg("copy from worker nodes is only supported "
+ "for append-partitioned tables")));
}
- masterConnection = ConnectToNode(nodeName, nodePort, nodeUser);
+ /*
+ * Remove master node options from the copy statement because they are not
+ * recognized by PostgreSQL machinery.
+ */
+ RemoveMasterOptions(copyStatement);
- PG_TRY();
- {
- PGresult *queryResult = NULL;
- Oid relationId = InvalidOid;
- char partitionMethod = 0;
+ CopyToNewShards(copyStatement, completionTag, relationId);
- /* strip schema name for local reference */
- char *schemaName = copyStatement->relation->schemaname;
- copyStatement->relation->schemaname = NULL;
-
- relationId = RangeVarGetRelid(copyStatement->relation, NoLock, false);
-
- /* put schema name back */
- copyStatement->relation->schemaname = schemaName;
-
- partitionMethod = MasterPartitionMethod(copyStatement->relation);
- if (partitionMethod != DISTRIBUTE_BY_APPEND)
- {
- ereport(ERROR, (errmsg("copy from worker nodes is only supported "
- "for append-partitioned tables")));
- }
-
- /* run all metadata commands in a transaction */
- queryResult = PQexec(masterConnection, "BEGIN");
- if (PQresultStatus(queryResult) != PGRES_COMMAND_OK)
- {
- ereport(ERROR, (errmsg("could not start to update master node metadata")));
- }
-
- PQclear(queryResult);
-
- /*
- * Remove master node options from the copy statement because they are not
- * recognized by PostgreSQL machinery.
- */
- RemoveMasterOptions(copyStatement);
-
- CopyToNewShards(copyStatement, completionTag, relationId);
-
- /* commit metadata transactions */
- queryResult = PQexec(masterConnection, "COMMIT");
- if (PQresultStatus(queryResult) != PGRES_COMMAND_OK)
- {
- ereport(ERROR, (errmsg("could not commit master node metadata changes")));
- }
-
- PQclear(queryResult);
-
- /* close the connection */
- CloseConnectionByPGconn(masterConnection);
- masterConnection = NULL;
- }
- PG_CATCH();
- {
- /* close the connection */
- CloseConnectionByPGconn(masterConnection);
- masterConnection = NULL;
-
- PG_RE_THROW();
- }
- PG_END_TRY();
+ UnclaimConnection(masterConnection);
+ masterConnection = NULL;
}
@@ -371,9 +291,10 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
ShardInterval **shardIntervalCache = NULL;
bool useBinarySearch = false;
- HTAB *copyConnectionHash = NULL;
+ HTAB *shardConnectionHash = NULL;
ShardConnections *shardConnections = NULL;
- List *connectionList = NIL;
+ List *shardConnectionsList = NIL;
+ ListCell *shardConnectionsCell = NULL;
EState *executorState = NULL;
MemoryContext executorTupleContext = NULL;
@@ -387,6 +308,8 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
Var *partitionColumn = PartitionColumn(tableId, 0);
char partitionMethod = PartitionMethod(tableId);
+ ErrorContextCallback errorCallback;
+
/* get hash function for partition column */
hashFunction = cacheEntry->hashFunction;
@@ -454,6 +377,11 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
useBinarySearch = true;
}
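+
+	/* tables using the 2PC replication model always commit COPY via 2PC */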
+ if (cacheEntry->replicationModel == REPLICATION_MODEL_2PC)
+ {
+ CoordinatedTransactionUse2PC();
+ }
+
/* initialize copy state to read from COPY data source */
copyState = BeginCopyFrom(distributedRelation,
copyStatement->filename,
@@ -475,175 +403,140 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
columnOutputFunctions = ColumnOutputFunctions(tupleDescriptor, copyOutState->binary);
- /*
- * Create a mapping of shard id to a connection for each of its placements.
- * The hash should be initialized before the PG_TRY, since it is used in
- * PG_CATCH. Otherwise, it may be undefined in the PG_CATCH (see sigsetjmp
- * documentation).
- */
- copyConnectionHash = CreateShardConnectionHash(TopTransactionContext);
+ /* create a mapping of shard id to a connection for each of its placements */
+ shardConnectionHash = CreateShardConnectionHash(TopTransactionContext);
- /* we use a PG_TRY block to roll back on errors (e.g. in NextCopyFrom) */
- PG_TRY();
+ /* set up callback to identify error line number */
+ errorCallback.callback = CopyFromErrorCallback;
+ errorCallback.arg = (void *) copyState;
+ errorCallback.previous = error_context_stack;
+ error_context_stack = &errorCallback;
+
+ while (true)
{
- ErrorContextCallback errorCallback;
+ bool nextRowFound = false;
+ Datum partitionColumnValue = 0;
+ ShardInterval *shardInterval = NULL;
+ int64 shardId = 0;
+ bool shardConnectionsFound = false;
+ MemoryContext oldContext = NULL;
- /* set up callback to identify error line number */
- errorCallback.callback = CopyFromErrorCallback;
- errorCallback.arg = (void *) copyState;
- errorCallback.previous = error_context_stack;
- error_context_stack = &errorCallback;
+ ResetPerTupleExprContext(executorState);
- /* ensure transactions have unique names on worker nodes */
- InitializeDistributedTransaction();
+ oldContext = MemoryContextSwitchTo(executorTupleContext);
- while (true)
+ /* parse a row from the input */
+ nextRowFound = NextCopyFrom(copyState, executorExpressionContext,
+ columnValues, columnNulls, NULL);
+
+ if (!nextRowFound)
{
- bool nextRowFound = false;
- Datum partitionColumnValue = 0;
- ShardInterval *shardInterval = NULL;
- int64 shardId = 0;
- bool shardConnectionsFound = false;
- MemoryContext oldContext = NULL;
-
- ResetPerTupleExprContext(executorState);
-
- oldContext = MemoryContextSwitchTo(executorTupleContext);
-
- /* parse a row from the input */
- nextRowFound = NextCopyFrom(copyState, executorExpressionContext,
- columnValues, columnNulls, NULL);
-
- if (!nextRowFound)
- {
- MemoryContextSwitchTo(oldContext);
- break;
- }
-
- CHECK_FOR_INTERRUPTS();
-
- /*
- * Find the partition column value and corresponding shard interval
- * for non-reference tables.
- * Get the existing (and only a single) shard interval for the reference
- * tables. Note that, reference tables has NULL partition column values so
- * skip the check.
- */
- if (partitionColumn != NULL)
- {
- if (columnNulls[partitionColumn->varattno - 1])
- {
- ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
- errmsg("cannot copy row with NULL value "
- "in partition column")));
- }
-
- partitionColumnValue = columnValues[partitionColumn->varattno - 1];
- }
-
- /*
- * Find the shard interval and id for the partition column value for
- * non-reference tables.
- * For reference table, this function blindly returns the tables single
- * shard.
- */
- shardInterval = FindShardInterval(partitionColumnValue,
- shardIntervalCache,
- shardCount, partitionMethod,
- compareFunction, hashFunction,
- useBinarySearch);
-
- if (shardInterval == NULL)
- {
- ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
- errmsg("could not find shard for partition column "
- "value")));
- }
-
- shardId = shardInterval->shardId;
-
MemoryContextSwitchTo(oldContext);
-
- /* get existing connections to the shard placements, if any */
- shardConnections = GetShardHashConnections(copyConnectionHash, shardId,
- &shardConnectionsFound);
- if (!shardConnectionsFound)
- {
- bool stopOnFailure = false;
-
- if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE)
- {
- stopOnFailure = true;
- }
-
- /* open connections and initiate COPY on shard placements */
- OpenCopyTransactions(copyStatement, shardConnections, stopOnFailure,
- copyOutState->binary);
-
- /* send copy binary headers to shard placements */
- if (copyOutState->binary)
- {
- SendCopyBinaryHeaders(copyOutState,
- shardConnections->connectionList);
- }
- }
-
- /* replicate row to shard placements */
- resetStringInfo(copyOutState->fe_msgbuf);
- AppendCopyRowData(columnValues, columnNulls, tupleDescriptor,
- copyOutState, columnOutputFunctions);
- SendCopyDataToAll(copyOutState->fe_msgbuf, shardConnections->connectionList);
-
- processedRowCount += 1;
+ break;
}
- connectionList = ConnectionList(copyConnectionHash);
+ CHECK_FOR_INTERRUPTS();
+
+	/*
+	 * Find the partition column value and corresponding shard interval
+	 * for non-reference tables. Get the existing (and only) shard
+	 * interval for reference tables. Note that reference tables have
+	 * NULL partition column values, so we skip the check for them.
+	 */
+ if (partitionColumn != NULL)
+ {
+ if (columnNulls[partitionColumn->varattno - 1])
+ {
+ ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+ errmsg("cannot copy row with NULL value "
+ "in partition column")));
+ }
+
+ partitionColumnValue = columnValues[partitionColumn->varattno - 1];
+ }
+
+	/*
+	 * Find the shard interval and id for the partition column value of
+	 * non-reference tables. For reference tables, this function blindly
+	 * returns the table's single shard.
+	 */
+ shardInterval = FindShardInterval(partitionColumnValue,
+ shardIntervalCache,
+ shardCount, partitionMethod,
+ compareFunction, hashFunction,
+ useBinarySearch);
+
+ if (shardInterval == NULL)
+ {
+ ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("could not find shard for partition column "
+ "value")));
+ }
+
+ shardId = shardInterval->shardId;
+
+ MemoryContextSwitchTo(oldContext);
+
+ /* get existing connections to the shard placements, if any */
+ shardConnections = GetShardHashConnections(shardConnectionHash, shardId,
+ &shardConnectionsFound);
+ if (!shardConnectionsFound)
+ {
+ bool stopOnFailure = false;
+
+ if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE)
+ {
+ stopOnFailure = true;
+ }
+
+ /* open connections and initiate COPY on shard placements */
+ OpenCopyConnections(copyStatement, shardConnections, stopOnFailure,
+ copyOutState->binary);
+
+ /* send copy binary headers to shard placements */
+ if (copyOutState->binary)
+ {
+ SendCopyBinaryHeaders(copyOutState, shardId,
+ shardConnections->connectionList);
+ }
+ }
+
+ /* replicate row to shard placements */
+ resetStringInfo(copyOutState->fe_msgbuf);
+ AppendCopyRowData(columnValues, columnNulls, tupleDescriptor,
+ copyOutState, columnOutputFunctions);
+ SendCopyDataToAll(copyOutState->fe_msgbuf, shardId,
+ shardConnections->connectionList);
+
+ processedRowCount += 1;
+ }
+
+ /* all lines have been copied, stop showing line number in errors */
+ error_context_stack = errorCallback.previous;
+
+ shardConnectionsList = ShardConnectionList(shardConnectionHash);
+ foreach(shardConnectionsCell, shardConnectionsList)
+ {
+ ShardConnections *shardConnections = (ShardConnections *) lfirst(
+ shardConnectionsCell);
/* send copy binary footers to all shard placements */
if (copyOutState->binary)
{
- SendCopyBinaryFooters(copyOutState, connectionList);
+ SendCopyBinaryFooters(copyOutState, shardConnections->shardId,
+ shardConnections->connectionList);
}
- /* all lines have been copied, stop showing line number in errors */
- error_context_stack = errorCallback.previous;
-
/* close the COPY input on all shard placements */
- EndRemoteCopy(connectionList, true);
-
- if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC ||
- cacheEntry->replicationModel == REPLICATION_MODEL_2PC)
- {
- PrepareRemoteTransactions(connectionList);
- }
-
- EndCopyFrom(copyState);
- heap_close(distributedRelation, NoLock);
-
- /* check for cancellation one last time before committing */
- CHECK_FOR_INTERRUPTS();
+ EndRemoteCopy(shardConnections->shardId, shardConnections->connectionList, true);
}
- PG_CATCH();
- {
- List *abortConnectionList = NIL;
- /* roll back all transactions */
- abortConnectionList = ConnectionList(copyConnectionHash);
- EndRemoteCopy(abortConnectionList, false);
- AbortRemoteTransactions(abortConnectionList);
- CloseConnections(abortConnectionList);
+ EndCopyFrom(copyState);
+ heap_close(distributedRelation, NoLock);
- PG_RE_THROW();
- }
- PG_END_TRY();
-
- /*
- * Ready to commit the transaction, this code is below the PG_TRY block because
- * we do not want any of the transactions rolled back if a failure occurs. Instead,
- * they should be rolled forward.
- */
- CommitRemoteTransactions(connectionList, false);
- CloseConnections(connectionList);
+ CHECK_FOR_INTERRUPTS();
if (completionTag != NULL)
{
@@ -676,11 +569,13 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
const char *delimiterCharacter = "\t";
const char *nullPrintCharacter = "\\N";
- /*
- * Shard connections should be initialized before the PG_TRY, since it is
- * used in PG_CATCH. Otherwise, it may be undefined in the PG_CATCH
- * (see sigsetjmp documentation).
- */
+ ErrorContextCallback errorCallback;
+
+ int64 currentShardId = INVALID_SHARD_ID;
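+
+	/* ShardMaxSize is configured in KB; convert it to bytes */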
+ uint64 shardMaxSizeInBytes = (int64) ShardMaxSize * 1024L;
+ uint64 copiedDataSizeInBytes = 0;
+ uint64 processedRowCount = 0;
+
ShardConnections *shardConnections =
(ShardConnections *) palloc0(sizeof(ShardConnections));
@@ -701,140 +596,124 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
columnOutputFunctions = ColumnOutputFunctions(tupleDescriptor, copyOutState->binary);
- /* we use a PG_TRY block to close connections on errors (e.g. in NextCopyFrom) */
- PG_TRY();
+ /* set up callback to identify error line number */
+ errorCallback.callback = CopyFromErrorCallback;
+ errorCallback.arg = (void *) copyState;
+ errorCallback.previous = error_context_stack;
+
+ while (true)
{
- uint64 shardMaxSizeInBytes = (int64) ShardMaxSize * 1024L;
- uint64 copiedDataSizeInBytes = 0;
- uint64 processedRowCount = 0;
+ bool nextRowFound = false;
+ MemoryContext oldContext = NULL;
+ uint64 messageBufferSize = 0;
- /* set up callback to identify error line number */
- ErrorContextCallback errorCallback;
+ ResetPerTupleExprContext(executorState);
- errorCallback.callback = CopyFromErrorCallback;
- errorCallback.arg = (void *) copyState;
- errorCallback.previous = error_context_stack;
+ /* switch to tuple memory context and start showing line number in errors */
+ error_context_stack = &errorCallback;
+ oldContext = MemoryContextSwitchTo(executorTupleContext);
- while (true)
+ /* parse a row from the input */
+ nextRowFound = NextCopyFrom(copyState, executorExpressionContext,
+ columnValues, columnNulls, NULL);
+
+ if (!nextRowFound)
{
- bool nextRowFound = false;
- MemoryContext oldContext = NULL;
- uint64 messageBufferSize = 0;
-
- ResetPerTupleExprContext(executorState);
-
- /* switch to tuple memory context and start showing line number in errors */
- error_context_stack = &errorCallback;
- oldContext = MemoryContextSwitchTo(executorTupleContext);
-
- /* parse a row from the input */
- nextRowFound = NextCopyFrom(copyState, executorExpressionContext,
- columnValues, columnNulls, NULL);
-
- if (!nextRowFound)
- {
- /* switch to regular memory context and stop showing line number in errors */
- MemoryContextSwitchTo(oldContext);
- error_context_stack = errorCallback.previous;
- break;
- }
-
- CHECK_FOR_INTERRUPTS();
-
/* switch to regular memory context and stop showing line number in errors */
MemoryContextSwitchTo(oldContext);
error_context_stack = errorCallback.previous;
-
- /*
- * If copied data size is zero, this means either this is the first
- * line in the copy or we just filled the previous shard up to its
- * capacity. Either way, we need to create a new shard and
- * start copying new rows into it.
- */
- if (copiedDataSizeInBytes == 0)
- {
- /* create shard and open connections to shard placements */
- StartCopyToNewShard(shardConnections, copyStatement,
- copyOutState->binary);
-
- /* send copy binary headers to shard placements */
- if (copyOutState->binary)
- {
- SendCopyBinaryHeaders(copyOutState,
- shardConnections->connectionList);
- }
- }
-
- /* replicate row to shard placements */
- resetStringInfo(copyOutState->fe_msgbuf);
- AppendCopyRowData(columnValues, columnNulls, tupleDescriptor,
- copyOutState, columnOutputFunctions);
- SendCopyDataToAll(copyOutState->fe_msgbuf, shardConnections->connectionList);
-
- messageBufferSize = copyOutState->fe_msgbuf->len;
- copiedDataSizeInBytes = copiedDataSizeInBytes + messageBufferSize;
-
- /*
- * If we filled up this shard to its capacity, send copy binary footers
- * to shard placements, commit copy transactions, close connections
- * and finally update shard statistics.
- *
- * */
- if (copiedDataSizeInBytes > shardMaxSizeInBytes)
- {
- if (copyOutState->binary)
- {
- SendCopyBinaryFooters(copyOutState,
- shardConnections->connectionList);
- }
- FinalizeCopyToNewShard(shardConnections);
- MasterUpdateShardStatistics(shardConnections->shardId);
-
- copiedDataSizeInBytes = 0;
- }
-
- processedRowCount += 1;
+ break;
}
- /*
- * For the last shard, send copy binary footers to shard placements,
- * commit copy transactions, close connections and finally update shard
- * statistics. If no row is send, there is no shard to finalize the
- * copy command.
- */
- if (copiedDataSizeInBytes > 0)
- {
- if (copyOutState->binary)
- {
- SendCopyBinaryFooters(copyOutState,
- shardConnections->connectionList);
- }
- FinalizeCopyToNewShard(shardConnections);
- MasterUpdateShardStatistics(shardConnections->shardId);
- }
-
- EndCopyFrom(copyState);
- heap_close(distributedRelation, NoLock);
-
- /* check for cancellation one last time before returning */
CHECK_FOR_INTERRUPTS();
- if (completionTag != NULL)
- {
- snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
- "COPY " UINT64_FORMAT, processedRowCount);
- }
- }
- PG_CATCH();
- {
- /* roll back all transactions */
- EndRemoteCopy(shardConnections->connectionList, false);
- AbortRemoteTransactions(shardConnections->connectionList);
- CloseConnections(shardConnections->connectionList);
+ /* switch to regular memory context and stop showing line number in errors */
+ MemoryContextSwitchTo(oldContext);
+ error_context_stack = errorCallback.previous;
- PG_RE_THROW();
+	/*
+	 * If the copied data size is zero, then either this is the first
+	 * line in the copy or we just filled the previous shard up to its
+	 * capacity. Either way, we need to create a new shard and start
+	 * copying new rows into it.
+	 */
+ if (copiedDataSizeInBytes == 0)
+ {
+ /* create shard and open connections to shard placements */
+ currentShardId = StartCopyToNewShard(shardConnections, copyStatement,
+ copyOutState->binary);
+
+ /* send copy binary headers to shard placements */
+ if (copyOutState->binary)
+ {
+ SendCopyBinaryHeaders(copyOutState, currentShardId,
+ shardConnections->connectionList);
+ }
+ }
+
+ /* replicate row to shard placements */
+ resetStringInfo(copyOutState->fe_msgbuf);
+ AppendCopyRowData(columnValues, columnNulls, tupleDescriptor,
+ copyOutState, columnOutputFunctions);
+ SendCopyDataToAll(copyOutState->fe_msgbuf, currentShardId,
+ shardConnections->connectionList);
+
+ messageBufferSize = copyOutState->fe_msgbuf->len;
+ copiedDataSizeInBytes = copiedDataSizeInBytes + messageBufferSize;
+
+	/*
+	 * If we filled up this shard to its capacity, send copy binary footers
+	 * to shard placements, end the COPY input, and update shard statistics.
+	 */
+ if (copiedDataSizeInBytes > shardMaxSizeInBytes)
+ {
+ Assert(currentShardId != INVALID_SHARD_ID);
+
+ if (copyOutState->binary)
+ {
+ SendCopyBinaryFooters(copyOutState, currentShardId,
+ shardConnections->connectionList);
+ }
+
+ EndRemoteCopy(currentShardId, shardConnections->connectionList, true);
+ MasterUpdateShardStatistics(shardConnections->shardId);
+
+ copiedDataSizeInBytes = 0;
+ currentShardId = INVALID_SHARD_ID;
+ }
+
+ processedRowCount += 1;
+ }
+
+	/*
+	 * For the last shard, send copy binary footers to shard placements,
+	 * end the COPY input, and update shard statistics. If no rows were
+	 * sent, there is no shard to finalize.
+	 */
+ if (copiedDataSizeInBytes > 0)
+ {
+ Assert(currentShardId != INVALID_SHARD_ID);
+
+ if (copyOutState->binary)
+ {
+ SendCopyBinaryFooters(copyOutState, currentShardId,
+ shardConnections->connectionList);
+ }
+ EndRemoteCopy(currentShardId, shardConnections->connectionList, true);
+ MasterUpdateShardStatistics(shardConnections->shardId);
+ }
+
+ EndCopyFrom(copyState);
+ heap_close(distributedRelation, NoLock);
+
+ /* check for cancellation one last time before returning */
+ CHECK_FOR_INTERRUPTS();
+
+ if (completionTag != NULL)
+ {
+ snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
+ "COPY " UINT64_FORMAT, processedRowCount);
}
- PG_END_TRY();
}
@@ -890,7 +769,7 @@ MasterPartitionMethod(RangeVar *relation)
StringInfo partitionMethodCommand = makeStringInfo();
appendStringInfo(partitionMethodCommand, PARTITION_METHOD_QUERY, qualifiedName);
- queryResult = PQexec(masterConnection, partitionMethodCommand->data);
+ queryResult = PQexec(masterConnection->pgConn, partitionMethodCommand->data);
if (PQresultStatus(queryResult) == PGRES_TUPLES_OK)
{
char *partitionMethodString = PQgetvalue((PGresult *) queryResult, 0, 0);
@@ -904,7 +783,7 @@ MasterPartitionMethod(RangeVar *relation)
}
else
{
- WarnRemoteError(masterConnection, queryResult);
+ ReportResultError(masterConnection, queryResult, WARNING);
ereport(ERROR, (errmsg("could not get the partition method of the "
"distributed table")));
}
@@ -945,24 +824,23 @@ RemoveMasterOptions(CopyStmt *copyStatement)
/*
- * OpenCopyTransactions opens a connection for each placement of a shard and
- * starts a COPY transaction. If a connection cannot be opened, then the shard
- * placement is marked as inactive and the COPY continues with the remaining
+ * OpenCopyConnections opens a connection for each placement of a shard and
+ * starts a COPY transaction if necessary. If a connection cannot be opened,
+ * then the shard placement is marked as inactive and the COPY continues with
+ * the remaining
* shard placements.
*/
static void
-OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections,
- bool stopOnFailure, bool useBinaryCopyFormat)
+OpenCopyConnections(CopyStmt *copyStatement, ShardConnections *shardConnections,
+ bool stopOnFailure, bool useBinaryCopyFormat)
{
List *finalizedPlacementList = NIL;
List *failedPlacementList = NIL;
ListCell *placementCell = NULL;
- ListCell *failedPlacementCell = NULL;
List *connectionList = NULL;
int64 shardId = shardConnections->shardId;
MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
- "OpenCopyTransactions",
+ "OpenCopyConnections",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
@@ -974,7 +852,7 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections
MemoryContextSwitchTo(oldContext);
- if (XactModificationLevel > XACT_MODIFICATION_NONE)
+ if (XactModificationLevel > XACT_MODIFICATION_DATA)
{
ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
errmsg("distributed copy operations must not appear in "
@@ -987,27 +865,15 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections
ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell);
char *nodeName = placement->nodeName;
int nodePort = placement->nodePort;
- WorkerNode *workerNode = FindWorkerNode(nodeName, nodePort);
- int workerGroupId = 0;
char *nodeUser = CurrentUserName();
- PGconn *connection = ConnectToNode(nodeName, nodePort, nodeUser);
-
- TransactionConnection *transactionConnection = NULL;
+ MultiConnection *connection = NULL;
+ uint32 connectionFlags = FOR_DML;
StringInfo copyCommand = NULL;
PGresult *result = NULL;
- /*
- * When a copy is initiated from a worker, the information about the connected
- * worker node may not be found if pg_dist_node entries are not synced to this
- * node. In that case we leave the groupId as 0. Fortunately, it is unused since
- * COPY from a worker does not initiate a 2PC.
- */
- if (workerNode != NULL)
- {
- workerGroupId = workerNode->groupId;
- }
+ connection = GetPlacementConnection(connectionFlags, placement, nodeUser);
- if (connection == NULL)
+ if (PQstatus(connection->pgConn) != CONNECTION_OK)
{
if (stopOnFailure)
{
@@ -1019,42 +885,35 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections
continue;
}
- result = PQexec(connection, "BEGIN");
- if (PQresultStatus(result) != PGRES_COMMAND_OK)
+ /*
+ * If errors are supposed to cause immediate aborts (i.e. we don't
+ * want to/can't invalidate placements), mark the connection as
+ * critical so later errors cause failures.
+ */
+ if (stopOnFailure)
{
- WarnRemoteError(connection, result);
- failedPlacementList = lappend(failedPlacementList, placement);
-
- PQclear(result);
- continue;
+ MarkRemoteTransactionCritical(connection);
}
-
- PQclear(result);
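+
+		/*
+		 * Claim the connection for the duration of the COPY and make sure
+		 * a remote transaction is open on it.
+		 */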
+ ClaimConnectionExclusively(connection);
+ RemoteTransactionBeginIfNecessary(connection);
copyCommand = ConstructCopyStatement(copyStatement, shardConnections->shardId,
useBinaryCopyFormat);
+ result = PQexec(connection->pgConn, copyCommand->data);
- result = PQexec(connection, copyCommand->data);
if (PQresultStatus(result) != PGRES_COPY_IN)
{
- WarnRemoteError(connection, result);
- failedPlacementList = lappend(failedPlacementList, placement);
+ ReportConnectionError(connection, WARNING);
+ MarkRemoteTransactionFailed(connection, true);
PQclear(result);
+
+ /* failed placements will be invalidated by transaction machinery */
+ failedPlacementList = lappend(failedPlacementList, placement);
continue;
}
PQclear(result);
-
- transactionConnection = palloc0(sizeof(TransactionConnection));
-
- transactionConnection->groupId = workerGroupId;
- transactionConnection->connectionId = shardConnections->shardId;
- transactionConnection->transactionState = TRANSACTION_STATE_COPY_STARTED;
- transactionConnection->connection = connection;
- transactionConnection->nodeName = nodeName;
- transactionConnection->nodePort = nodePort;
-
- connectionList = lappend(connectionList, transactionConnection);
+ connectionList = lappend(connectionList, connection);
}
/* if all placements failed, error out */
@@ -1070,14 +929,6 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections
*/
Assert(!stopOnFailure || list_length(failedPlacementList) == 0);
- /* otherwise, mark failed placements as inactive: they're stale */
- foreach(failedPlacementCell, failedPlacementList)
- {
- ShardPlacement *failedPlacement = (ShardPlacement *) lfirst(failedPlacementCell);
-
- UpdateShardPlacementState(failedPlacement->placementId, FILE_INACTIVE);
- }
-
shardConnections->connectionList = connectionList;
MemoryContextReset(localContext);
@@ -1161,7 +1012,7 @@ RemoteFinalizedShardPlacementList(uint64 shardId)
StringInfo shardPlacementsCommand = makeStringInfo();
appendStringInfo(shardPlacementsCommand, FINALIZED_SHARD_PLACEMENTS_QUERY, shardId);
- queryResult = PQexec(masterConnection, shardPlacementsCommand->data);
+ queryResult = PQexec(masterConnection->pgConn, shardPlacementsCommand->data);
if (PQresultStatus(queryResult) == PGRES_TUPLES_OK)
{
int rowCount = PQntuples(queryResult);
@@ -1196,21 +1047,21 @@ RemoteFinalizedShardPlacementList(uint64 shardId)
/* Send copy binary headers to given connections */
static void
-SendCopyBinaryHeaders(CopyOutState copyOutState, List *connectionList)
+SendCopyBinaryHeaders(CopyOutState copyOutState, int64 shardId, List *connectionList)
{
resetStringInfo(copyOutState->fe_msgbuf);
AppendCopyBinaryHeaders(copyOutState);
- SendCopyDataToAll(copyOutState->fe_msgbuf, connectionList);
+ SendCopyDataToAll(copyOutState->fe_msgbuf, shardId, connectionList);
}
/* Send copy binary footers to given connections */
static void
-SendCopyBinaryFooters(CopyOutState copyOutState, List *connectionList)
+SendCopyBinaryFooters(CopyOutState copyOutState, int64 shardId, List *connectionList)
{
resetStringInfo(copyOutState->fe_msgbuf);
AppendCopyBinaryFooters(copyOutState);
- SendCopyDataToAll(copyOutState->fe_msgbuf, connectionList);
+ SendCopyDataToAll(copyOutState->fe_msgbuf, shardId, connectionList);
}
@@ -1253,17 +1104,13 @@ ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId, bool useBinaryCop
* SendCopyDataToAll sends copy data to all connections in a list.
*/
static void
-SendCopyDataToAll(StringInfo dataBuffer, List *connectionList)
+SendCopyDataToAll(StringInfo dataBuffer, int64 shardId, List *connectionList)
{
ListCell *connectionCell = NULL;
foreach(connectionCell, connectionList)
{
- TransactionConnection *transactionConnection =
- (TransactionConnection *) lfirst(connectionCell);
- PGconn *connection = transactionConnection->connection;
- int64 shardId = transactionConnection->connectionId;
-
- SendCopyDataToPlacement(dataBuffer, connection, shardId);
+ MultiConnection *connection = (MultiConnection *) lfirst(connectionCell);
+ SendCopyDataToPlacement(dataBuffer, shardId, connection);
}
}
@@ -1273,73 +1120,61 @@ SendCopyDataToAll(StringInfo dataBuffer, List *connectionList)
* over the given connection.
*/
static void
-SendCopyDataToPlacement(StringInfo dataBuffer, PGconn *connection, int64 shardId)
+SendCopyDataToPlacement(StringInfo dataBuffer, int64 shardId, MultiConnection *connection)
{
- int copyResult = PQputCopyData(connection, dataBuffer->data, dataBuffer->len);
+ int copyResult = PQputCopyData(connection->pgConn, dataBuffer->data, dataBuffer->len);
if (copyResult != 1)
{
- char *nodeName = ConnectionGetOptionValue(connection, "host");
- char *nodePort = ConnectionGetOptionValue(connection, "port");
-
ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
- errmsg("failed to COPY to shard %ld on %s:%s",
- shardId, nodeName, nodePort)));
+ errmsg("failed to COPY to shard %ld on %s:%d",
+ shardId, connection->hostname, connection->port),
+ errdetail("failed to send %d bytes %s", dataBuffer->len,
+ dataBuffer->data)));
}
}
/*
- * EndRemoteCopy ends the COPY input on all connections. If stopOnFailure
- * is true, then EndRemoteCopy reports an error on failure, otherwise it
- * reports a warning or continues.
+ * EndRemoteCopy ends the COPY input on all connections and unclaims them.
+ * If stopOnFailure is true, then EndRemoteCopy reports an error on failure,
+ * otherwise it reports a warning or continues.
*/
static void
-EndRemoteCopy(List *connectionList, bool stopOnFailure)
+EndRemoteCopy(int64 shardId, List *connectionList, bool stopOnFailure)
{
ListCell *connectionCell = NULL;
foreach(connectionCell, connectionList)
{
- TransactionConnection *transactionConnection =
- (TransactionConnection *) lfirst(connectionCell);
- PGconn *connection = transactionConnection->connection;
- int64 shardId = transactionConnection->connectionId;
+ MultiConnection *connection = (MultiConnection *) lfirst(connectionCell);
int copyEndResult = 0;
PGresult *result = NULL;
- if (transactionConnection->transactionState != TRANSACTION_STATE_COPY_STARTED)
- {
- /* a failure occurred after having previously called EndRemoteCopy */
- continue;
- }
-
/* end the COPY input */
- copyEndResult = PQputCopyEnd(connection, NULL);
- transactionConnection->transactionState = TRANSACTION_STATE_OPEN;
+ copyEndResult = PQputCopyEnd(connection->pgConn, NULL);
if (copyEndResult != 1)
{
if (stopOnFailure)
{
- char *nodeName = ConnectionGetOptionValue(connection, "host");
- char *nodePort = ConnectionGetOptionValue(connection, "port");
-
ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
- errmsg("failed to COPY to shard %ld on %s:%s",
- shardId, nodeName, nodePort)));
+ errmsg("failed to COPY to shard %ld on %s:%d",
+ shardId, connection->hostname, connection->port)));
}
continue;
}
/* check whether there were any COPY errors */
- result = PQgetResult(connection);
+ result = PQgetResult(connection->pgConn);
if (PQresultStatus(result) != PGRES_COMMAND_OK && stopOnFailure)
{
ReportCopyError(connection, result);
}
PQclear(result);
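+
+		/* drain any remaining results and release the connection claim */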
+ ForgetResults(connection);
+ UnclaimConnection(connection);
}
}
@@ -1349,7 +1184,7 @@ EndRemoteCopy(List *connectionList, bool stopOnFailure)
* the remote COPY error messages.
*/
static void
-ReportCopyError(PGconn *connection, PGresult *result)
+ReportCopyError(MultiConnection *connection, PGresult *result)
{
char *remoteMessage = PQresultErrorField(result, PG_DIAG_MESSAGE_PRIMARY);
@@ -1365,10 +1200,8 @@ ReportCopyError(PGconn *connection, PGresult *result)
{
/* probably a connection problem, get the message from the connection */
char *lastNewlineIndex = NULL;
- char *nodeName = ConnectionGetOptionValue(connection, "host");
- char *nodePort = ConnectionGetOptionValue(connection, "port");
- remoteMessage = PQerrorMessage(connection);
+ remoteMessage = PQerrorMessage(connection->pgConn);
lastNewlineIndex = strrchr(remoteMessage, '\n');
/* trim trailing newline, if any */
@@ -1378,7 +1211,8 @@ ReportCopyError(PGconn *connection, PGresult *result)
}
ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
- errmsg("failed to complete COPY on %s:%s", nodeName, nodePort),
+ errmsg("failed to complete COPY on %s:%d", connection->hostname,
+ connection->port),
errdetail("%s", remoteMessage)));
}
}
@@ -1581,23 +1415,25 @@ AppendCopyBinaryFooters(CopyOutState footerOutputState)
* StartCopyToNewShard creates a new shard and related shard placements and
* opens connections to shard placements.
*/
-static void
+static int64
StartCopyToNewShard(ShardConnections *shardConnections, CopyStmt *copyStatement,
bool useBinaryCopyFormat)
{
char *relationName = copyStatement->relation->relname;
char *schemaName = copyStatement->relation->schemaname;
char *qualifiedName = quote_qualified_identifier(schemaName, relationName);
-
int64 shardId = MasterCreateEmptyShard(qualifiedName);
+ bool stopOnFailure = true;
shardConnections->shardId = shardId;
- list_free_deep(shardConnections->connectionList);
shardConnections->connectionList = NIL;
/* connect to shards placements and start transactions */
- OpenCopyTransactions(copyStatement, shardConnections, true, useBinaryCopyFormat);
+ OpenCopyConnections(copyStatement, shardConnections, stopOnFailure,
+ useBinaryCopyFormat);
+
+ return shardId;
}
@@ -1654,7 +1490,7 @@ RemoteCreateEmptyShard(char *relationName)
StringInfo createEmptyShardCommand = makeStringInfo();
appendStringInfo(createEmptyShardCommand, CREATE_EMPTY_SHARD_QUERY, relationName);
- queryResult = PQexec(masterConnection, createEmptyShardCommand->data);
+ queryResult = PQexec(masterConnection->pgConn, createEmptyShardCommand->data);
if (PQresultStatus(queryResult) == PGRES_TUPLES_OK)
{
char *shardIdString = PQgetvalue((PGresult *) queryResult, 0, 0);
@@ -1663,7 +1499,7 @@ RemoteCreateEmptyShard(char *relationName)
}
else
{
- WarnRemoteError(masterConnection, queryResult);
+ ReportResultError(masterConnection, queryResult, WARNING);
ereport(ERROR, (errmsg("could not create a new empty shard on the remote node")));
}
@@ -1673,22 +1509,6 @@ RemoteCreateEmptyShard(char *relationName)
}
-/*
- * FinalizeCopyToNewShard commits copy transaction and closes connections to
- * shard placements.
- */
-static void
-FinalizeCopyToNewShard(ShardConnections *shardConnections)
-{
- /* close the COPY input on all shard placements */
- EndRemoteCopy(shardConnections->connectionList, true);
-
- /* commit transactions and close connections */
- CommitRemoteTransactions(shardConnections->connectionList, true);
- CloseConnections(shardConnections->connectionList);
-}
-
-
/*
* MasterUpdateShardStatistics dispatches the update shard statistics call
* between local or remote master node according to the master connection state.
@@ -1719,7 +1539,7 @@ RemoteUpdateShardStatistics(uint64 shardId)
appendStringInfo(updateShardStatisticsCommand, UPDATE_SHARD_STATISTICS_QUERY,
shardId);
- queryResult = PQexec(masterConnection, updateShardStatisticsCommand->data);
+ queryResult = PQexec(masterConnection->pgConn, updateShardStatisticsCommand->data);
if (PQresultStatus(queryResult) != PGRES_TUPLES_OK)
{
ereport(ERROR, (errmsg("could not update shard statistics")));
diff --git a/src/backend/distributed/master/master_node_protocol.c b/src/backend/distributed/master/master_node_protocol.c
index 5534de9aa..915ac3dbc 100644
--- a/src/backend/distributed/master/master_node_protocol.c
+++ b/src/backend/distributed/master/master_node_protocol.c
@@ -64,7 +64,7 @@
/* Shard related configuration */
int ShardCount = 32;
-int ShardReplicationFactor = 2; /* desired replication factor for shards */
+int ShardReplicationFactor = 1; /* desired replication factor for shards */
int ShardMaxSize = 1048576; /* maximum size in KB one shard can grow to */
int ShardPlacementPolicy = SHARD_PLACEMENT_ROUND_ROBIN;
diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c
index 0e3e90b33..fd4f59cef 100644
--- a/src/backend/distributed/planner/multi_explain.c
+++ b/src/backend/distributed/planner/multi_explain.c
@@ -52,8 +52,6 @@
/* Config variables that enable printing distributed query plans */
-bool ExplainMultiLogicalPlan = false;
-bool ExplainMultiPhysicalPlan = false;
bool ExplainDistributedQueries = true;
bool ExplainAllTasks = false;
@@ -79,6 +77,9 @@ static void ExplainTask(Task *task, int placementIndex, List *explainOutputList,
static void ExplainTaskPlacement(ShardPlacement *taskPlacement, List *explainOutputList,
ExplainState *es);
static StringInfo BuildRemoteExplainQuery(char *queryString, ExplainState *es);
+static void MultiExplainOnePlan(PlannedStmt *plan, IntoClause *into,
+ ExplainState *es, const char *queryString,
+ ParamListInfo params, const instr_time *planDuration);
/* Static Explain functions copied from explain.c */
static void ExplainOpenGroup(const char *objtype, const char *labelname,
@@ -100,17 +101,10 @@ void
MultiExplainOneQuery(Query *query, IntoClause *into, ExplainState *es,
const char *queryString, ParamListInfo params)
{
- MultiPlan *multiPlan = NULL;
- CmdType commandType = CMD_UNKNOWN;
- PlannedStmt *initialPlan = NULL;
- Job *workerJob = NULL;
- bool routerExecutablePlan = false;
instr_time planStart;
instr_time planDuration;
- Query *originalQuery = NULL;
- RelationRestrictionContext *restrictionContext = NULL;
- bool localQuery = !NeedsDistributedPlanning(query);
int cursorOptions = 0;
+ PlannedStmt *plan = NULL;
#if PG_VERSION_NUM >= 90600
@@ -124,85 +118,54 @@ MultiExplainOneQuery(Query *query, IntoClause *into, ExplainState *es,
}
#endif
- /* handle local queries in the same way as ExplainOneQuery */
- if (localQuery)
- {
- PlannedStmt *plan = NULL;
-
- INSTR_TIME_SET_CURRENT(planStart);
-
- /* plan the query */
- plan = pg_plan_query(query, cursorOptions, params);
-
- INSTR_TIME_SET_CURRENT(planDuration);
- INSTR_TIME_SUBTRACT(planDuration, planStart);
-
- /* run it (if needed) and produce output */
- ExplainOnePlan(plan, into, es, queryString, params, &planDuration);
-
- return;
- }
-
- /*
- * standard_planner scribbles on it's input, but for deparsing we need the
- * unmodified form. So copy once we're sure it's a distributed query.
- */
- originalQuery = copyObject(query);
-
- /* measure the full planning time to display in EXPLAIN ANALYZE */
+ /* plan query, just like ExplainOneQuery does */
INSTR_TIME_SET_CURRENT(planStart);
- restrictionContext = CreateAndPushRestrictionContext();
-
- PG_TRY();
- {
- /* call standard planner to modify the query structure before multi planning */
- initialPlan = standard_planner(query, cursorOptions, params);
-
- commandType = initialPlan->commandType;
- if (commandType == CMD_INSERT || commandType == CMD_UPDATE ||
- commandType == CMD_DELETE)
- {
- if (es->analyze)
- {
- ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("Using ANALYZE for INSERT/UPDATE/DELETE on "
- "distributed tables is not supported.")));
- }
- }
-
- multiPlan = CreatePhysicalPlan(originalQuery, query, restrictionContext);
- }
- PG_CATCH();
- {
- PopRestrictionContext();
- PG_RE_THROW();
- }
- PG_END_TRY();
-
- PopRestrictionContext();
+ /* plan the query */
+ plan = pg_plan_query(query, cursorOptions, params);
INSTR_TIME_SET_CURRENT(planDuration);
INSTR_TIME_SUBTRACT(planDuration, planStart);
- if (ExplainMultiLogicalPlan)
+ /* if not a distributed query, use plain explain infrastructure */
+ if (!HasCitusToplevelNode(plan))
{
- MultiTreeRoot *multiTree = MultiLogicalPlanCreate(query);
- char *logicalPlanString = CitusNodeToString(multiTree);
- char *formattedPlanString = pretty_format_node_dump(logicalPlanString);
+ /* run it (if needed) and produce output */
+ ExplainOnePlan(plan, into, es, queryString, params, &planDuration);
+ }
+ else
+ {
+ MultiExplainOnePlan(plan, into, es, queryString, params, &planDuration);
+ }
+}
- appendStringInfo(es->str, "logical plan:\n");
- appendStringInfo(es->str, "%s\n", formattedPlanString);
+
+/*
+ * MultiExplainOnePlan explains the plan for an individual distributed query.
+ */
+static void
+MultiExplainOnePlan(PlannedStmt *plan, IntoClause *into,
+ ExplainState *es, const char *queryString,
+ ParamListInfo params, const instr_time *planDuration)
+{
+ MultiPlan *multiPlan = NULL;
+ CmdType commandType = CMD_UNKNOWN;
+ Job *workerJob = NULL;
+ bool routerExecutablePlan = false;
+
+ commandType = plan->commandType;
+ if (commandType == CMD_INSERT || commandType == CMD_UPDATE ||
+ commandType == CMD_DELETE)
+ {
+ if (es->analyze)
+ {
+ ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("Using ANALYZE for INSERT/UPDATE/DELETE on "
+ "distributed tables is not supported.")));
+ }
}
- if (ExplainMultiPhysicalPlan)
- {
- char *physicalPlanString = CitusNodeToString(multiPlan);
- char *formattedPlanString = pretty_format_node_dump(physicalPlanString);
-
- appendStringInfo(es->str, "physical plan:\n");
- appendStringInfo(es->str, "%s\n", formattedPlanString);
- }
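+	/* retrieve the distributed plan the planner embedded in the statement */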
+ multiPlan = GetMultiPlan(plan);
if (!ExplainDistributedQueries)
{
@@ -268,8 +231,6 @@ MultiExplainOneQuery(Query *query, IntoClause *into, ExplainState *es,
if (!routerExecutablePlan)
{
- PlannedStmt *masterPlan = MultiQueryContainerNode(initialPlan, multiPlan);
-
if (es->format == EXPLAIN_FORMAT_TEXT)
{
appendStringInfoSpaces(es->str, es->indent * 2);
@@ -279,7 +240,7 @@ MultiExplainOneQuery(Query *query, IntoClause *into, ExplainState *es,
ExplainOpenGroup("Master Query", "Master Query", false, es);
- ExplainMasterPlan(masterPlan, into, es, queryString, params, &planDuration);
+ ExplainMasterPlan(plan, into, es, queryString, params, planDuration);
ExplainCloseGroup("Master Query", "Master Query", false, es);
diff --git a/src/backend/distributed/planner/multi_planner.c b/src/backend/distributed/planner/multi_planner.c
index d72f9db0c..c21665d9c 100644
--- a/src/backend/distributed/planner/multi_planner.c
+++ b/src/backend/distributed/planner/multi_planner.c
@@ -36,10 +36,15 @@ static List *relationRestrictionContextList = NIL;
/* local function forward declarations */
static void CheckNodeIsDumpable(Node *node);
-
-
-/* local function forward declarations */
static char * GetMultiPlanString(PlannedStmt *result);
+static PlannedStmt * MultiQueryContainerNode(PlannedStmt *result,
+ struct MultiPlan *multiPlan);
+static struct MultiPlan * CreatePhysicalPlan(Query *originalQuery, Query *query,
+ RelationRestrictionContext *
+ restrictionContext);
+static RelationRestrictionContext * CreateAndPushRestrictionContext(void);
+static RelationRestrictionContext * CurrentRestrictionContext(void);
+static void PopRestrictionContext(void);
/* Distributed planner hook */
@@ -123,7 +128,7 @@ multi_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
* target shards. SELECT queries go through the full logical plan/optimize/
* physical plan process needed to produce distributed query plans.
*/
-MultiPlan *
+static MultiPlan *
CreatePhysicalPlan(Query *originalQuery, Query *query,
RelationRestrictionContext *restrictionContext)
{
diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c
index 509ab0381..c1452392c 100644
--- a/src/backend/distributed/shared_library_init.c
+++ b/src/backend/distributed/shared_library_init.c
@@ -295,30 +295,6 @@ RegisterCitusConfigVariables(void)
0,
NULL, NULL, NULL);
- DefineCustomBoolVariable(
- "citus.explain_multi_logical_plan",
- gettext_noop("Enables Explain to print out distributed logical plans."),
- gettext_noop("We use this private configuration entry as a debugging aid. "
- "If enabled, the Explain command prints out the optimized "
- "logical plan for distributed queries."),
- &ExplainMultiLogicalPlan,
- false,
- PGC_USERSET,
- GUC_NO_SHOW_ALL,
- NULL, NULL, NULL);
-
- DefineCustomBoolVariable(
- "citus.explain_multi_physical_plan",
- gettext_noop("Enables Explain to print out distributed physical plans."),
- gettext_noop("We use this private configuration entry as a debugging aid. "
- "If enabled, the Explain command prints out the physical "
- "plan for distributed queries."),
- &ExplainMultiPhysicalPlan,
- false,
- PGC_USERSET,
- GUC_NO_SHOW_ALL,
- NULL, NULL, NULL);
-
DefineCustomBoolVariable(
"citus.explain_distributed_queries",
gettext_noop("Enables Explain for distributed queries."),
@@ -393,7 +369,7 @@ RegisterCitusConfigVariables(void)
"configuration value at sharded table creation time, "
"and later reuse the initially read value."),
&ShardReplicationFactor,
- 2, 1, 100,
+ 1, 1, 100,
PGC_USERSET,
0,
NULL, NULL, NULL);
diff --git a/src/backend/distributed/transaction/multi_shard_transaction.c b/src/backend/distributed/transaction/multi_shard_transaction.c
index e04da91a8..7e43ac595 100644
--- a/src/backend/distributed/transaction/multi_shard_transaction.c
+++ b/src/backend/distributed/transaction/multi_shard_transaction.c
@@ -202,12 +202,12 @@ GetShardHashConnections(HTAB *connectionHash, int64 shardId, bool *connectionsFo
/*
- * ConnectionList flattens the connection hash to a list of placement connections.
+ * ShardConnectionList returns the list of ShardConnections in connectionHash.
*/
List *
-ConnectionList(HTAB *connectionHash)
+ShardConnectionList(HTAB *connectionHash)
{
- List *connectionList = NIL;
+ List *shardConnectionsList = NIL;
HASH_SEQ_STATUS status;
ShardConnections *shardConnections = NULL;
@@ -221,13 +221,12 @@ ConnectionList(HTAB *connectionHash)
shardConnections = (ShardConnections *) hash_seq_search(&status);
while (shardConnections != NULL)
{
- List *shardConnectionsList = list_copy(shardConnections->connectionList);
- connectionList = list_concat(connectionList, shardConnectionsList);
+ shardConnectionsList = lappend(shardConnectionsList, shardConnections);
shardConnections = (ShardConnections *) hash_seq_search(&status);
}
- return connectionList;
+ return shardConnectionsList;
}
diff --git a/src/include/distributed/multi_explain.h b/src/include/distributed/multi_explain.h
index 984b8e2ab..55f4bf75d 100644
--- a/src/include/distributed/multi_explain.h
+++ b/src/include/distributed/multi_explain.h
@@ -13,8 +13,6 @@
#include "executor/executor.h"
/* Config variables managed via guc.c to explain distributed query plans */
-extern bool ExplainMultiLogicalPlan;
-extern bool ExplainMultiPhysicalPlan;
extern bool ExplainDistributedQueries;
extern bool ExplainAllTasks;
diff --git a/src/include/distributed/multi_planner.h b/src/include/distributed/multi_planner.h
index 7c96b55bf..a425232fa 100644
--- a/src/include/distributed/multi_planner.h
+++ b/src/include/distributed/multi_planner.h
@@ -53,16 +53,8 @@ extern PlannedStmt * multi_planner(Query *parse, int cursorOptions,
extern bool HasCitusToplevelNode(PlannedStmt *planStatement);
struct MultiPlan;
-extern struct MultiPlan * CreatePhysicalPlan(Query *originalQuery, Query *query,
- RelationRestrictionContext *
- restrictionContext);
extern struct MultiPlan * GetMultiPlan(PlannedStmt *planStatement);
-extern PlannedStmt * MultiQueryContainerNode(PlannedStmt *result,
- struct MultiPlan *multiPlan);
extern void multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo,
Index index, RangeTblEntry *rte);
-extern RelationRestrictionContext * CreateAndPushRestrictionContext(void);
-extern RelationRestrictionContext * CurrentRestrictionContext(void);
-extern void PopRestrictionContext(void);
#endif /* MULTI_PLANNER_H */
diff --git a/src/include/distributed/multi_shard_transaction.h b/src/include/distributed/multi_shard_transaction.h
index b79e0e4f9..c6c66bc46 100644
--- a/src/include/distributed/multi_shard_transaction.h
+++ b/src/include/distributed/multi_shard_transaction.h
@@ -22,11 +22,7 @@ typedef struct ShardConnections
{
int64 shardId;
- /*
- * XXX: this list contains MultiConnection for multi-shard transactions
- * or TransactionConnection for COPY, the latter should be converted to
- * use MultiConnection as well.
- */
+ /* list of MultiConnection structs */
List *connectionList;
} ShardConnections;
@@ -36,7 +32,7 @@ extern HTAB * CreateShardConnectionHash(MemoryContext memoryContext);
extern ShardConnections * GetShardConnections(int64 shardId, bool *shardConnectionsFound);
extern ShardConnections * GetShardHashConnections(HTAB *connectionHash, int64 shardId,
bool *connectionsFound);
-extern List * ConnectionList(HTAB *connectionHash);
+extern List * ShardConnectionList(HTAB *connectionHash);
extern void CloseConnections(List *connectionList);
extern void ResetShardPlacementTransactionState(void);
diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out
index afcd90436..491406d55 100644
--- a/src/test/regress/expected/multi_insert_select.out
+++ b/src/test/regress/expected/multi_insert_select.out
@@ -1456,17 +1456,15 @@ INSERT INTO raw_events_first SELECT * FROM raw_events_second WHERE user_id = 100
ERROR: single-shard DML commands must not appear in transaction blocks which contain multi-shard data modifications
ROLLBACK;
-- Insert after copy is currently disallowed because of the way the
--- transaction modification state is currently handled. Copy still
--- goes through despite rollback.
+-- transaction modification state is currently handled. Copy is also
+-- rolled back.
BEGIN;
COPY raw_events_second (user_id, value_1) FROM STDIN DELIMITER ',';
INSERT INTO raw_events_first SELECT * FROM raw_events_second;
ERROR: multi-shard data modifications must not appear in transaction blocks which contain single-shard DML commands
ROLLBACK;
-- Insert after copy is currently allowed for single-shard operation.
--- Since the COPY commits immediately, the result is visible in the
--- next operation. Copy goes through despite rollback, while insert
--- rolls back.
+-- Both insert and copy are rolled back successfully.
BEGIN;
COPY raw_events_second (user_id, value_1) FROM STDIN DELIMITER ',';
INSERT INTO raw_events_first SELECT * FROM raw_events_second WHERE user_id = 101;
@@ -1477,9 +1475,7 @@ SELECT user_id FROM raw_events_first WHERE user_id = 101;
(1 row)
ROLLBACK;
--- Copy after insert is disallowed since the insert is not immediately
--- committed and the copy uses different connections that will not yet
--- see the result of the insert.
+-- Copy after insert is currently disallowed.
BEGIN;
INSERT INTO raw_events_first SELECT * FROM raw_events_second;
COPY raw_events_first (user_id, value_1) FROM STDIN DELIMITER ',';
@@ -1489,8 +1485,6 @@ ROLLBACK;
BEGIN;
INSERT INTO raw_events_first SELECT * FROM raw_events_second WHERE user_id = 100;
COPY raw_events_first (user_id, value_1) FROM STDIN DELIMITER ',';
-ERROR: distributed copy operations must not appear in transaction blocks containing other distributed modifications
-CONTEXT: COPY raw_events_first, line 1: "103,103"
ROLLBACK;
-- selecting from views works
CREATE VIEW test_view AS SELECT * FROM raw_events_first;
@@ -1499,7 +1493,7 @@ INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4)
SELECT count(*) FROM raw_events_second;
count
-------
- 11
+ 9
(1 row)
INSERT INTO raw_events_second SELECT * FROM test_view;
@@ -1509,7 +1503,7 @@ INSERT INTO raw_events_second SELECT * FROM test_view WHERE user_id = 17 GROUP B
SELECT count(*) FROM raw_events_second;
count
-------
- 13
+ 11
(1 row)
-- inserting into views does not
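
The updated counts above (11 → 9 and 13 → 11) follow from the new transactional COPY behavior: rows copied inside an aborted transaction no longer persist. A minimal sketch of the semantics these expected outputs pin down, reusing the test's `raw_events_second` table with a hypothetical data row:

```sql
BEGIN;
-- data rows arrive on STDIN and end with a \. terminator
COPY raw_events_second (user_id, value_1) FROM STDIN DELIMITER ',';
200,200
\.
ROLLBACK;
-- The COPY rolls back with the transaction; previously the copied
-- rows would have survived the ROLLBACK.
SELECT count(*) FROM raw_events_second;
```
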
diff --git a/src/test/regress/expected/multi_modifying_xacts.out b/src/test/regress/expected/multi_modifying_xacts.out
index c8308dc38..b4fe5931e 100644
--- a/src/test/regress/expected/multi_modifying_xacts.out
+++ b/src/test/regress/expected/multi_modifying_xacts.out
@@ -192,37 +192,178 @@ SELECT * FROM labs WHERE id = 6;
----+------
(0 rows)
--- COPY can't happen second,
+-- COPY can happen after single row INSERT
BEGIN;
INSERT INTO labs VALUES (6, 'Bell Labs');
\copy labs from stdin delimiter ','
-ERROR: distributed copy operations must not appear in transaction blocks containing other distributed modifications
-CONTEXT: COPY labs, line 1: "10,Weyland-Yutani"
COMMIT;
--- though it will work if before any modifications
+-- COPY can happen before single row INSERT
BEGIN;
\copy labs from stdin delimiter ','
SELECT name FROM labs WHERE id = 10;
name
----------------
Weyland-Yutani
-(1 row)
+ Weyland-Yutani
+(2 rows)
INSERT INTO labs VALUES (6, 'Bell Labs');
COMMIT;
--- but a double-copy isn't allowed (the first will persist)
+-- two consecutive COPYs in a transaction are allowed
BEGIN;
\copy labs from stdin delimiter ','
\copy labs from stdin delimiter ','
-ERROR: distributed copy operations must not appear in transaction blocks containing other distributed modifications
-CONTEXT: COPY labs, line 1: "12,fsociety"
COMMIT;
-SELECT name FROM labs WHERE id = 11;
+SELECT name FROM labs WHERE id = 11 OR id = 12 ORDER BY id;
name
----------------
Planet Express
+ fsociety
+(2 rows)
+
+-- 1pc failure test
+SELECT recover_prepared_transactions();
+ recover_prepared_transactions
+-------------------------------
+ 0
(1 row)
+-- copy with unique index violation
+BEGIN;
+\copy researchers FROM STDIN delimiter ','
+\copy researchers FROM STDIN delimiter ','
+ERROR: duplicate key value violates unique constraint "avoid_name_confusion_idx_1200001"
+DETAIL: Key (lab_id, name)=(6, 'Bjarne Stroustrup') already exists.
+COMMIT;
+-- verify rollback
+SELECT * FROM researchers WHERE lab_id = 6;
+ id | lab_id | name
+----+--------+------
+(0 rows)
+
+SELECT count(*) FROM pg_dist_transaction;
+ count
+-------
+ 0
+(1 row)
+
+-- 2pc failure and success tests
+SET citus.multi_shard_commit_protocol TO '2pc';
+SELECT recover_prepared_transactions();
+ recover_prepared_transactions
+-------------------------------
+ 0
+(1 row)
+
+-- copy with unique index violation
+BEGIN;
+\copy researchers FROM STDIN delimiter ','
+\copy researchers FROM STDIN delimiter ','
+ERROR: duplicate key value violates unique constraint "avoid_name_confusion_idx_1200001"
+DETAIL: Key (lab_id, name)=(6, 'Bjarne Stroustrup') already exists.
+COMMIT;
+-- verify rollback
+SELECT * FROM researchers WHERE lab_id = 6;
+ id | lab_id | name
+----+--------+------
+(0 rows)
+
+SELECT count(*) FROM pg_dist_transaction;
+ count
+-------
+ 0
+(1 row)
+
+BEGIN;
+\copy researchers FROM STDIN delimiter ','
+\copy researchers FROM STDIN delimiter ','
+COMMIT;
+-- verify success
+SELECT * FROM researchers WHERE lab_id = 6;
+ id | lab_id | name
+----+--------+----------------------
+ 17 | 6 | 'Bjarne Stroustrup'
+ 18 | 6 | 'Dennis Ritchie'
+(2 rows)
+
+-- verify 2pc
+SELECT count(*) FROM pg_dist_transaction;
+ count
+-------
+ 2
+(1 row)
+
+RESET citus.multi_shard_commit_protocol;
+-- create a check function
+SELECT * from run_command_on_workers('CREATE FUNCTION reject_large_id() RETURNS trigger AS $rli$
+ BEGIN
+ IF (NEW.id > 30) THEN
+ RAISE ''illegal value'';
+ END IF;
+
+ RETURN NEW;
+ END;
+$rli$ LANGUAGE plpgsql;')
+ORDER BY nodeport;
+ nodename | nodeport | success | result
+-----------+----------+---------+-----------------
+ localhost | 57637 | t | CREATE FUNCTION
+ localhost | 57638 | t | CREATE FUNCTION
+(2 rows)
+
+-- register after insert trigger
+SELECT * FROM run_command_on_placements('researchers', 'CREATE CONSTRAINT TRIGGER reject_large_researcher_id AFTER INSERT ON %s DEFERRABLE INITIALLY DEFERRED FOR EACH ROW EXECUTE PROCEDURE reject_large_id()')
+ORDER BY nodeport, shardid;
+ nodename | nodeport | shardid | success | result
+-----------+----------+---------+---------+----------------
+ localhost | 57637 | 1200000 | t | CREATE TRIGGER
+ localhost | 57637 | 1200001 | t | CREATE TRIGGER
+ localhost | 57638 | 1200000 | t | CREATE TRIGGER
+ localhost | 57638 | 1200001 | t | CREATE TRIGGER
+(4 rows)
+
+-- hide postgresql version dependent messages for the next test only
+\set VERBOSITY terse
+-- deferred check should abort the transaction
+BEGIN;
+DELETE FROM researchers WHERE lab_id = 6;
+\copy researchers FROM STDIN delimiter ','
+\copy researchers FROM STDIN delimiter ','
+COMMIT;
+WARNING: illegal value
+WARNING: failed to commit transaction on localhost:57638
+WARNING: illegal value
+WARNING: failed to commit transaction on localhost:57637
+WARNING: could not commit transaction for shard 1200001 on any active node
+ERROR: could not commit transaction on any active node
+\unset VERBOSITY
+-- verify everything including delete is rolled back
+SELECT * FROM researchers WHERE lab_id = 6;
+ id | lab_id | name
+----+--------+----------------------
+ 17 | 6 | 'Bjarne Stroustrup'
+ 18 | 6 | 'Dennis Ritchie'
+(2 rows)
+
+-- cleanup triggers and the function
+SELECT * from run_command_on_placements('researchers', 'drop trigger reject_large_researcher_id on %s')
+ORDER BY nodeport, shardid;
+ nodename | nodeport | shardid | success | result
+-----------+----------+---------+---------+--------------
+ localhost | 57637 | 1200000 | t | DROP TRIGGER
+ localhost | 57637 | 1200001 | t | DROP TRIGGER
+ localhost | 57638 | 1200000 | t | DROP TRIGGER
+ localhost | 57638 | 1200001 | t | DROP TRIGGER
+(4 rows)
+
+SELECT * FROM run_command_on_workers('drop function reject_large_id()')
+ORDER BY nodeport;
+ nodename | nodeport | success | result
+-----------+----------+---------+---------------
+ localhost | 57637 | t | DROP FUNCTION
+ localhost | 57638 | t | DROP FUNCTION
+(2 rows)
+
-- finally, ALTER and copy aren't compatible
BEGIN;
ALTER TABLE labs ADD COLUMN motto text;
@@ -238,11 +379,6 @@ COMMIT;
id | bigint | not null
name | text | not null
-SELECT * FROM labs WHERE id = 12;
- id | name
-----+------
-(0 rows)
-
-- and if the copy is before the ALTER...
BEGIN;
\copy labs from stdin delimiter ','
@@ -269,7 +405,7 @@ ALTER TABLE labs ADD COLUMN motto text;
SELECT master_modify_multiple_shards('DELETE FROM labs');
master_modify_multiple_shards
-------------------------------
- 5
+ 7
(1 row)
ALTER TABLE labs ADD COLUMN score float;
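
The new failure and success tests above use `pg_dist_transaction` to observe two-phase commit state on the coordinator. A minimal sketch of that inspection pattern, assuming a coordinator session with the 2PC protocol enabled:

```sql
-- Use two-phase commit for multi-shard modifications in this session.
SET citus.multi_shard_commit_protocol TO '2pc';
-- After a successful multi-shard COMMIT, each participating worker
-- leaves a commit record behind on the coordinator.
SELECT count(*) FROM pg_dist_transaction;
-- Resolve any transactions that an earlier failure left prepared
-- on the workers.
SELECT recover_prepared_transactions();
```
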
diff --git a/src/test/regress/output/multi_copy.source b/src/test/regress/output/multi_copy.source
index 64880627b..4722a83c6 100644
--- a/src/test/regress/output/multi_copy.source
+++ b/src/test/regress/output/multi_copy.source
@@ -707,3 +707,4 @@ HINT: No function matches the given name and argument types. You might need to
CONTEXT: while executing command on localhost:57638
WARNING: could not get statistics for shard public.composite_partition_column_table_560164
DETAIL: Setting shard statistics to NULL
+ERROR: failure on connection marked as essential: localhost:57637
diff --git a/src/test/regress/pg_regress_multi.pl b/src/test/regress/pg_regress_multi.pl
index 961c7d96c..4c10cc896 100755
--- a/src/test/regress/pg_regress_multi.pl
+++ b/src/test/regress/pg_regress_multi.pl
@@ -219,6 +219,7 @@ push(@pgOptions, '-c', "citus.max_running_tasks_per_node=4");
push(@pgOptions, '-c', "citus.expire_cached_shards=on");
push(@pgOptions, '-c', "citus.task_tracker_delay=10ms");
push(@pgOptions, '-c', "citus.remote_task_check_interval=1ms");
+push(@pgOptions, '-c', "citus.shard_replication_factor=2");
# Add externally added options last, so they overwrite the default ones above
for my $option (@userPgOptions)
diff --git a/src/test/regress/sql/multi_insert_select.sql b/src/test/regress/sql/multi_insert_select.sql
index 4b15b67ee..ea11ef1b7 100644
--- a/src/test/regress/sql/multi_insert_select.sql
+++ b/src/test/regress/sql/multi_insert_select.sql
@@ -706,8 +706,8 @@ INSERT INTO raw_events_first SELECT * FROM raw_events_second WHERE user_id = 100
ROLLBACK;
-- Insert after copy is currently disallowed because of the way the
--- transaction modification state is currently handled. Copy still
--- goes through despite rollback.
+-- transaction modification state is currently handled. Copy is also
+-- rolled back.
BEGIN;
COPY raw_events_second (user_id, value_1) FROM STDIN DELIMITER ',';
100,100
@@ -716,9 +716,7 @@ INSERT INTO raw_events_first SELECT * FROM raw_events_second;
ROLLBACK;
-- Insert after copy is currently allowed for single-shard operation.
--- Since the COPY commits immediately, the result is visible in the
--- next operation. Copy goes through despite rollback, while insert
--- rolls back.
+-- Both insert and copy are rolled back successfully.
BEGIN;
COPY raw_events_second (user_id, value_1) FROM STDIN DELIMITER ',';
101,101
@@ -727,9 +725,7 @@ INSERT INTO raw_events_first SELECT * FROM raw_events_second WHERE user_id = 101
SELECT user_id FROM raw_events_first WHERE user_id = 101;
ROLLBACK;
--- Copy after insert is disallowed since the insert is not immediately
--- committed and the copy uses different connections that will not yet
--- see the result of the insert.
+-- Copy after insert is currently disallowed.
BEGIN;
INSERT INTO raw_events_first SELECT * FROM raw_events_second;
COPY raw_events_first (user_id, value_1) FROM STDIN DELIMITER ',';
diff --git a/src/test/regress/sql/multi_modifying_xacts.sql b/src/test/regress/sql/multi_modifying_xacts.sql
index 634740cc6..139e20703 100644
--- a/src/test/regress/sql/multi_modifying_xacts.sql
+++ b/src/test/regress/sql/multi_modifying_xacts.sql
@@ -148,7 +148,7 @@ COMMIT;
\d labs
SELECT * FROM labs WHERE id = 6;
--- COPY can't happen second,
+-- COPY can happen after single row INSERT
BEGIN;
INSERT INTO labs VALUES (6, 'Bell Labs');
\copy labs from stdin delimiter ','
@@ -156,7 +156,7 @@ INSERT INTO labs VALUES (6, 'Bell Labs');
\.
COMMIT;
--- though it will work if before any modifications
+-- COPY can happen before single row INSERT
BEGIN;
\copy labs from stdin delimiter ','
10,Weyland-Yutani
@@ -165,7 +165,7 @@ SELECT name FROM labs WHERE id = 10;
INSERT INTO labs VALUES (6, 'Bell Labs');
COMMIT;
--- but a double-copy isn't allowed (the first will persist)
+-- two consecutive COPYs in a transaction are allowed
BEGIN;
\copy labs from stdin delimiter ','
11,Planet Express
@@ -175,7 +175,93 @@ BEGIN;
\.
COMMIT;
-SELECT name FROM labs WHERE id = 11;
+SELECT name FROM labs WHERE id = 11 OR id = 12 ORDER BY id;
+
+-- 1pc failure test
+SELECT recover_prepared_transactions();
+-- copy with unique index violation
+BEGIN;
+\copy researchers FROM STDIN delimiter ','
+17, 6, 'Bjarne Stroustrup'
+\.
+\copy researchers FROM STDIN delimiter ','
+18, 6, 'Bjarne Stroustrup'
+\.
+COMMIT;
+-- verify rollback
+SELECT * FROM researchers WHERE lab_id = 6;
+SELECT count(*) FROM pg_dist_transaction;
+
+-- 2pc failure and success tests
+SET citus.multi_shard_commit_protocol TO '2pc';
+SELECT recover_prepared_transactions();
+-- copy with unique index violation
+BEGIN;
+\copy researchers FROM STDIN delimiter ','
+17, 6, 'Bjarne Stroustrup'
+\.
+\copy researchers FROM STDIN delimiter ','
+18, 6, 'Bjarne Stroustrup'
+\.
+COMMIT;
+-- verify rollback
+SELECT * FROM researchers WHERE lab_id = 6;
+SELECT count(*) FROM pg_dist_transaction;
+
+BEGIN;
+\copy researchers FROM STDIN delimiter ','
+17, 6, 'Bjarne Stroustrup'
+\.
+\copy researchers FROM STDIN delimiter ','
+18, 6, 'Dennis Ritchie'
+\.
+COMMIT;
+-- verify success
+SELECT * FROM researchers WHERE lab_id = 6;
+-- verify 2pc
+SELECT count(*) FROM pg_dist_transaction;
+
+RESET citus.multi_shard_commit_protocol;
+
+-- create a check function
+SELECT * from run_command_on_workers('CREATE FUNCTION reject_large_id() RETURNS trigger AS $rli$
+ BEGIN
+ IF (NEW.id > 30) THEN
+ RAISE ''illegal value'';
+ END IF;
+
+ RETURN NEW;
+ END;
+$rli$ LANGUAGE plpgsql;')
+ORDER BY nodeport;
+
+-- register after insert trigger
+SELECT * FROM run_command_on_placements('researchers', 'CREATE CONSTRAINT TRIGGER reject_large_researcher_id AFTER INSERT ON %s DEFERRABLE INITIALLY DEFERRED FOR EACH ROW EXECUTE PROCEDURE reject_large_id()')
+ORDER BY nodeport, shardid;
+
+-- hide postgresql version dependent messages for the next test only
+\set VERBOSITY terse
+-- deferred check should abort the transaction
+BEGIN;
+DELETE FROM researchers WHERE lab_id = 6;
+\copy researchers FROM STDIN delimiter ','
+31, 6, 'Bjarne Stroustrup'
+\.
+\copy researchers FROM STDIN delimiter ','
+30, 6, 'Dennis Ritchie'
+\.
+COMMIT;
+\unset VERBOSITY
+
+-- verify everything including delete is rolled back
+SELECT * FROM researchers WHERE lab_id = 6;
+
+-- cleanup triggers and the function
+SELECT * from run_command_on_placements('researchers', 'drop trigger reject_large_researcher_id on %s')
+ORDER BY nodeport, shardid;
+
+SELECT * FROM run_command_on_workers('drop function reject_large_id()')
+ORDER BY nodeport;
-- finally, ALTER and copy aren't compatible
BEGIN;
@@ -187,7 +273,6 @@ COMMIT;
-- but the DDL should correctly roll back
\d labs
-SELECT * FROM labs WHERE id = 12;
-- and if the copy is before the ALTER...
BEGIN;