Merge branch 'master' into fix_command_counter_increment

pull/1148/head
Önder Kalacı 2017-01-21 09:21:19 +02:00 committed by GitHub
commit 594fa761e1
16 changed files with 686 additions and 718 deletions

View File

@ -34,10 +34,18 @@ To learn more, visit [citusdata.com](https://www.citusdata.com) and join
the [mailing list](https://groups.google.com/forum/#!forum/citus-users) to the [mailing list](https://groups.google.com/forum/#!forum/citus-users) to
stay on top of the latest developments. stay on top of the latest developments.
### Quickstart ### Getting started with Citus
The fastest way to get up and running is to create a Citus Cloud account. You can also setup a local Citus cluster with Docker.
#### Citus Cloud
Citus Cloud runs on top of AWS as a fully managed database as a service and has development plans available for getting started. You can provision a Citus Cloud account at [https://console.citusdata.com](https://console.citusdata.com) and get started with just a few clicks.
#### Local Citus Cluster #### Local Citus Cluster
If you're looking to get started locally, you can follow the following steps to get up and running.
* Install docker-compose: [Mac][mac_install] | [Linux][linux_install] * Install docker-compose: [Mac][mac_install] | [Linux][linux_install]
* (Mac only) connect to Docker VM * (Mac only) connect to Docker VM
```bash ```bash
@ -100,7 +108,7 @@ stay on top of the latest developments.
<tr> <tr>
<td>Training and Support</td> <td>Training and Support</td>
<td>See our <a <td>See our <a
href="https://www.citusdata.com/citus-products/citus-data-pricing">support href="https://www.citusdata.com/support">support
page</a> for training and dedicated support options.</td> page</a> for training and dedicated support options.</td>
</tr> </tr>
</table> </table>
@ -140,7 +148,7 @@ Video](https://www.youtube.com/watch?v=NVl9_6J1G60&list=PLixnExCn6lRpP10ZlpJwx6A
___ ___
Copyright © 20122016 Citus Data, Inc. Copyright © 20122017 Citus Data, Inc.
[faq]: https://www.citusdata.com/frequently-asked-questions [faq]: https://www.citusdata.com/frequently-asked-questions
[linux_install]: https://www.digitalocean.com/community/tutorials/how-to-install-and-use-docker-compose-on-ubuntu-14-04 [linux_install]: https://www.digitalocean.com/community/tutorials/how-to-install-and-use-docker-compose-on-ubuntu-14-04

View File

@ -45,86 +45,33 @@
*/ */
#include "postgres.h" #include "postgres.h"
#include "fmgr.h"
#include "funcapi.h"
#include "libpq-fe.h" #include "libpq-fe.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "plpgsql.h"
#include <arpa/inet.h> #include <arpa/inet.h> /* for htons */
#include <netinet/in.h> #include <netinet/in.h> /* for htons */
#include <string.h> #include <string.h>
#include "access/heapam.h"
#include "access/htup_details.h" #include "access/htup_details.h"
#include "access/htup.h" #include "access/htup.h"
#include "access/nbtree.h"
#include "access/sdir.h" #include "access/sdir.h"
#include "access/tupdesc.h"
#include "access/xact.h"
#include "catalog/namespace.h" #include "catalog/namespace.h"
#include "catalog/pg_class.h"
#include "catalog/pg_type.h" #include "catalog/pg_type.h"
#include "catalog/pg_am.h"
#include "catalog/pg_collation.h"
#include "commands/extension.h"
#include "commands/copy.h" #include "commands/copy.h"
#include "commands/defrem.h" #include "commands/defrem.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/commit_protocol.h"
#include "distributed/connection_cache.h"
#include "distributed/listutils.h"
#include "distributed/metadata_cache.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h" #include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/multi_copy.h" #include "distributed/multi_copy.h"
#include "distributed/multi_physical_planner.h" #include "distributed/multi_physical_planner.h"
#include "distributed/multi_shard_transaction.h" #include "distributed/multi_shard_transaction.h"
#include "distributed/pg_dist_partition.h" #include "distributed/placement_connection.h"
#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h" #include "distributed/resource_lock.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/worker_protocol.h"
#include "executor/execdesc.h"
#include "executor/executor.h" #include "executor/executor.h"
#include "executor/instrument.h"
#include "executor/tuptable.h"
#include "lib/stringinfo.h"
#include "nodes/execnodes.h"
#include "nodes/makefuncs.h"
#include "nodes/memnodes.h"
#include "nodes/nodeFuncs.h"
#include "nodes/nodes.h"
#include "nodes/params.h"
#include "nodes/parsenodes.h"
#include "nodes/pg_list.h"
#include "nodes/plannodes.h"
#include "nodes/primnodes.h"
#include "optimizer/clauses.h"
#include "optimizer/cost.h"
#include "optimizer/planner.h"
#include "optimizer/var.h"
#include "parser/parser.h"
#include "parser/analyze.h"
#include "parser/parse_node.h"
#include "parser/parsetree.h"
#include "parser/parse_type.h"
#include "storage/lock.h"
#include "tcop/dest.h"
#include "tcop/tcopprot.h"
#include "tcop/utility.h"
#include "tsearch/ts_locale.h" #include "tsearch/ts_locale.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/elog.h"
#include "utils/errcodes.h"
#include "utils/guc.h"
#include "utils/lsyscache.h" #include "utils/lsyscache.h"
#include "utils/typcache.h"
#include "utils/palloc.h"
#include "utils/rel.h" #include "utils/rel.h"
#include "utils/relcache.h"
#include "utils/snapmgr.h"
#include "utils/tuplestore.h"
#include "utils/memutils.h" #include "utils/memutils.h"
@ -132,7 +79,7 @@
static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0"; static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
/* use a global connection to the master node in order to skip passing it around */ /* use a global connection to the master node in order to skip passing it around */
static PGconn *masterConnection = NULL; static MultiConnection *masterConnection = NULL;
/* Local functions forward declarations */ /* Local functions forward declarations */
@ -141,29 +88,33 @@ static void CopyToExistingShards(CopyStmt *copyStatement, char *completionTag);
static void CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId); static void CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId);
static char MasterPartitionMethod(RangeVar *relation); static char MasterPartitionMethod(RangeVar *relation);
static void RemoveMasterOptions(CopyStmt *copyStatement); static void RemoveMasterOptions(CopyStmt *copyStatement);
static void OpenCopyTransactions(CopyStmt *copyStatement, static void OpenCopyConnections(CopyStmt *copyStatement,
ShardConnections *shardConnections, bool stopOnFailure, ShardConnections *shardConnections, bool stopOnFailure,
bool useBinaryCopyFormat); bool useBinaryCopyFormat);
static bool CanUseBinaryCopyFormat(TupleDesc tupleDescription, static bool CanUseBinaryCopyFormat(TupleDesc tupleDescription,
CopyOutState rowOutputState); CopyOutState rowOutputState);
static List * MasterShardPlacementList(uint64 shardId); static List * MasterShardPlacementList(uint64 shardId);
static List * RemoteFinalizedShardPlacementList(uint64 shardId); static List * RemoteFinalizedShardPlacementList(uint64 shardId);
static void SendCopyBinaryHeaders(CopyOutState copyOutState, List *connectionList);
static void SendCopyBinaryFooters(CopyOutState copyOutState, List *connectionList); static void SendCopyBinaryHeaders(CopyOutState copyOutState, int64 shardId,
List *connectionList);
static void SendCopyBinaryFooters(CopyOutState copyOutState, int64 shardId,
List *connectionList);
static StringInfo ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId, static StringInfo ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId,
bool useBinaryCopyFormat); bool useBinaryCopyFormat);
static void SendCopyDataToAll(StringInfo dataBuffer, List *connectionList); static void SendCopyDataToAll(StringInfo dataBuffer, int64 shardId, List *connectionList);
static void SendCopyDataToPlacement(StringInfo dataBuffer, PGconn *connection, static void SendCopyDataToPlacement(StringInfo dataBuffer, int64 shardId,
int64 shardId); MultiConnection *connection);
static void EndRemoteCopy(List *connectionList, bool stopOnFailure); static void EndRemoteCopy(int64 shardId, List *connectionList, bool stopOnFailure);
static void ReportCopyError(PGconn *connection, PGresult *result); static void ReportCopyError(MultiConnection *connection, PGresult *result);
static uint32 AvailableColumnCount(TupleDesc tupleDescriptor); static uint32 AvailableColumnCount(TupleDesc tupleDescriptor);
static void StartCopyToNewShard(ShardConnections *shardConnections, static int64 StartCopyToNewShard(ShardConnections *shardConnections,
CopyStmt *copyStatement, bool useBinaryCopyFormat); CopyStmt *copyStatement, bool useBinaryCopyFormat);
static int64 MasterCreateEmptyShard(char *relationName); static int64 MasterCreateEmptyShard(char *relationName);
static int64 CreateEmptyShard(char *relationName); static int64 CreateEmptyShard(char *relationName);
static int64 RemoteCreateEmptyShard(char *relationName); static int64 RemoteCreateEmptyShard(char *relationName);
static void FinalizeCopyToNewShard(ShardConnections *shardConnections);
static void MasterUpdateShardStatistics(uint64 shardId); static void MasterUpdateShardStatistics(uint64 shardId);
static void RemoteUpdateShardStatistics(uint64 shardId); static void RemoteUpdateShardStatistics(uint64 shardId);
@ -187,6 +138,12 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
{ {
bool isCopyFromWorker = false; bool isCopyFromWorker = false;
BeginOrContinueCoordinatedTransaction();
if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC)
{
CoordinatedTransactionUse2PC();
}
/* disallow COPY to/from file or program except for superusers */ /* disallow COPY to/from file or program except for superusers */
if (copyStatement->filename != NULL && !superuser()) if (copyStatement->filename != NULL && !superuser())
{ {
@ -208,6 +165,7 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
} }
} }
masterConnection = NULL; /* reset, might still be set after error */
isCopyFromWorker = IsCopyFromWorker(copyStatement); isCopyFromWorker = IsCopyFromWorker(copyStatement);
if (isCopyFromWorker) if (isCopyFromWorker)
{ {
@ -268,33 +226,24 @@ CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag)
NodeAddress *masterNodeAddress = MasterNodeAddress(copyStatement); NodeAddress *masterNodeAddress = MasterNodeAddress(copyStatement);
char *nodeName = masterNodeAddress->nodeName; char *nodeName = masterNodeAddress->nodeName;
int32 nodePort = masterNodeAddress->nodePort; int32 nodePort = masterNodeAddress->nodePort;
char *nodeUser = CurrentUserName();
if (XactModificationLevel > XACT_MODIFICATION_NONE)
{
ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
errmsg("distributed copy operations must not appear in "
"transaction blocks containing other distributed "
"modifications")));
}
masterConnection = ConnectToNode(nodeName, nodePort, nodeUser);
PG_TRY();
{
PGresult *queryResult = NULL;
Oid relationId = InvalidOid; Oid relationId = InvalidOid;
char partitionMethod = 0; char partitionMethod = 0;
char *schemaName = NULL;
uint32 connectionFlags = FOR_DML;
masterConnection = GetNodeConnection(connectionFlags, nodeName, nodePort);
ClaimConnectionExclusively(masterConnection);
RemoteTransactionBeginIfNecessary(masterConnection);
/* strip schema name for local reference */ /* strip schema name for local reference */
char *schemaName = copyStatement->relation->schemaname; schemaName = copyStatement->relation->schemaname;
copyStatement->relation->schemaname = NULL; copyStatement->relation->schemaname = NULL;
relationId = RangeVarGetRelid(copyStatement->relation, NoLock, false); relationId = RangeVarGetRelid(copyStatement->relation, NoLock, false);
/* put schema name back */ /* put schema name back */
copyStatement->relation->schemaname = schemaName; copyStatement->relation->schemaname = schemaName;
partitionMethod = MasterPartitionMethod(copyStatement->relation); partitionMethod = MasterPartitionMethod(copyStatement->relation);
if (partitionMethod != DISTRIBUTE_BY_APPEND) if (partitionMethod != DISTRIBUTE_BY_APPEND)
{ {
@ -302,15 +251,6 @@ CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag)
"for append-partitioned tables"))); "for append-partitioned tables")));
} }
/* run all metadata commands in a transaction */
queryResult = PQexec(masterConnection, "BEGIN");
if (PQresultStatus(queryResult) != PGRES_COMMAND_OK)
{
ereport(ERROR, (errmsg("could not start to update master node metadata")));
}
PQclear(queryResult);
/* /*
* Remove master node options from the copy statement because they are not * Remove master node options from the copy statement because they are not
* recognized by PostgreSQL machinery. * recognized by PostgreSQL machinery.
@ -319,29 +259,9 @@ CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag)
CopyToNewShards(copyStatement, completionTag, relationId); CopyToNewShards(copyStatement, completionTag, relationId);
/* commit metadata transactions */ UnclaimConnection(masterConnection);
queryResult = PQexec(masterConnection, "COMMIT");
if (PQresultStatus(queryResult) != PGRES_COMMAND_OK)
{
ereport(ERROR, (errmsg("could not commit master node metadata changes")));
}
PQclear(queryResult);
/* close the connection */
CloseConnectionByPGconn(masterConnection);
masterConnection = NULL; masterConnection = NULL;
} }
PG_CATCH();
{
/* close the connection */
CloseConnectionByPGconn(masterConnection);
masterConnection = NULL;
PG_RE_THROW();
}
PG_END_TRY();
}
/* /*
@ -371,9 +291,10 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
ShardInterval **shardIntervalCache = NULL; ShardInterval **shardIntervalCache = NULL;
bool useBinarySearch = false; bool useBinarySearch = false;
HTAB *copyConnectionHash = NULL; HTAB *shardConnectionHash = NULL;
ShardConnections *shardConnections = NULL; ShardConnections *shardConnections = NULL;
List *connectionList = NIL; List *shardConnectionsList = NIL;
ListCell *shardConnectionsCell = NULL;
EState *executorState = NULL; EState *executorState = NULL;
MemoryContext executorTupleContext = NULL; MemoryContext executorTupleContext = NULL;
@ -387,6 +308,8 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
Var *partitionColumn = PartitionColumn(tableId, 0); Var *partitionColumn = PartitionColumn(tableId, 0);
char partitionMethod = PartitionMethod(tableId); char partitionMethod = PartitionMethod(tableId);
ErrorContextCallback errorCallback;
/* get hash function for partition column */ /* get hash function for partition column */
hashFunction = cacheEntry->hashFunction; hashFunction = cacheEntry->hashFunction;
@ -454,6 +377,11 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
useBinarySearch = true; useBinarySearch = true;
} }
if (cacheEntry->replicationModel == REPLICATION_MODEL_2PC)
{
CoordinatedTransactionUse2PC();
}
/* initialize copy state to read from COPY data source */ /* initialize copy state to read from COPY data source */
copyState = BeginCopyFrom(distributedRelation, copyState = BeginCopyFrom(distributedRelation,
copyStatement->filename, copyStatement->filename,
@ -475,18 +403,8 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
columnOutputFunctions = ColumnOutputFunctions(tupleDescriptor, copyOutState->binary); columnOutputFunctions = ColumnOutputFunctions(tupleDescriptor, copyOutState->binary);
/* /* create a mapping of shard id to a connection for each of its placements */
* Create a mapping of shard id to a connection for each of its placements. shardConnectionHash = CreateShardConnectionHash(TopTransactionContext);
* The hash should be initialized before the PG_TRY, since it is used in
* PG_CATCH. Otherwise, it may be undefined in the PG_CATCH (see sigsetjmp
* documentation).
*/
copyConnectionHash = CreateShardConnectionHash(TopTransactionContext);
/* we use a PG_TRY block to roll back on errors (e.g. in NextCopyFrom) */
PG_TRY();
{
ErrorContextCallback errorCallback;
/* set up callback to identify error line number */ /* set up callback to identify error line number */
errorCallback.callback = CopyFromErrorCallback; errorCallback.callback = CopyFromErrorCallback;
@ -494,9 +412,6 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
errorCallback.previous = error_context_stack; errorCallback.previous = error_context_stack;
error_context_stack = &errorCallback; error_context_stack = &errorCallback;
/* ensure transactions have unique names on worker nodes */
InitializeDistributedTransaction();
while (true) while (true)
{ {
bool nextRowFound = false; bool nextRowFound = false;
@ -565,7 +480,7 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
MemoryContextSwitchTo(oldContext); MemoryContextSwitchTo(oldContext);
/* get existing connections to the shard placements, if any */ /* get existing connections to the shard placements, if any */
shardConnections = GetShardHashConnections(copyConnectionHash, shardId, shardConnections = GetShardHashConnections(shardConnectionHash, shardId,
&shardConnectionsFound); &shardConnectionsFound);
if (!shardConnectionsFound) if (!shardConnectionsFound)
{ {
@ -577,13 +492,13 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
} }
/* open connections and initiate COPY on shard placements */ /* open connections and initiate COPY on shard placements */
OpenCopyTransactions(copyStatement, shardConnections, stopOnFailure, OpenCopyConnections(copyStatement, shardConnections, stopOnFailure,
copyOutState->binary); copyOutState->binary);
/* send copy binary headers to shard placements */ /* send copy binary headers to shard placements */
if (copyOutState->binary) if (copyOutState->binary)
{ {
SendCopyBinaryHeaders(copyOutState, SendCopyBinaryHeaders(copyOutState, shardId,
shardConnections->connectionList); shardConnections->connectionList);
} }
} }
@ -592,58 +507,36 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
resetStringInfo(copyOutState->fe_msgbuf); resetStringInfo(copyOutState->fe_msgbuf);
AppendCopyRowData(columnValues, columnNulls, tupleDescriptor, AppendCopyRowData(columnValues, columnNulls, tupleDescriptor,
copyOutState, columnOutputFunctions); copyOutState, columnOutputFunctions);
SendCopyDataToAll(copyOutState->fe_msgbuf, shardConnections->connectionList); SendCopyDataToAll(copyOutState->fe_msgbuf, shardId,
shardConnections->connectionList);
processedRowCount += 1; processedRowCount += 1;
} }
connectionList = ConnectionList(copyConnectionHash);
/* send copy binary footers to all shard placements */
if (copyOutState->binary)
{
SendCopyBinaryFooters(copyOutState, connectionList);
}
/* all lines have been copied, stop showing line number in errors */ /* all lines have been copied, stop showing line number in errors */
error_context_stack = errorCallback.previous; error_context_stack = errorCallback.previous;
/* close the COPY input on all shard placements */ shardConnectionsList = ShardConnectionList(shardConnectionHash);
EndRemoteCopy(connectionList, true); foreach(shardConnectionsCell, shardConnectionsList)
if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC ||
cacheEntry->replicationModel == REPLICATION_MODEL_2PC)
{ {
PrepareRemoteTransactions(connectionList); ShardConnections *shardConnections = (ShardConnections *) lfirst(
shardConnectionsCell);
/* send copy binary footers to all shard placements */
if (copyOutState->binary)
{
SendCopyBinaryFooters(copyOutState, shardConnections->shardId,
shardConnections->connectionList);
}
/* close the COPY input on all shard placements */
EndRemoteCopy(shardConnections->shardId, shardConnections->connectionList, true);
} }
EndCopyFrom(copyState); EndCopyFrom(copyState);
heap_close(distributedRelation, NoLock); heap_close(distributedRelation, NoLock);
/* check for cancellation one last time before committing */
CHECK_FOR_INTERRUPTS(); CHECK_FOR_INTERRUPTS();
}
PG_CATCH();
{
List *abortConnectionList = NIL;
/* roll back all transactions */
abortConnectionList = ConnectionList(copyConnectionHash);
EndRemoteCopy(abortConnectionList, false);
AbortRemoteTransactions(abortConnectionList);
CloseConnections(abortConnectionList);
PG_RE_THROW();
}
PG_END_TRY();
/*
* Ready to commit the transaction, this code is below the PG_TRY block because
* we do not want any of the transactions rolled back if a failure occurs. Instead,
* they should be rolled forward.
*/
CommitRemoteTransactions(connectionList, false);
CloseConnections(connectionList);
if (completionTag != NULL) if (completionTag != NULL)
{ {
@ -676,11 +569,13 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
const char *delimiterCharacter = "\t"; const char *delimiterCharacter = "\t";
const char *nullPrintCharacter = "\\N"; const char *nullPrintCharacter = "\\N";
/* ErrorContextCallback errorCallback;
* Shard connections should be initialized before the PG_TRY, since it is
* used in PG_CATCH. Otherwise, it may be undefined in the PG_CATCH int64 currentShardId = INVALID_SHARD_ID;
* (see sigsetjmp documentation). uint64 shardMaxSizeInBytes = (int64) ShardMaxSize * 1024L;
*/ uint64 copiedDataSizeInBytes = 0;
uint64 processedRowCount = 0;
ShardConnections *shardConnections = ShardConnections *shardConnections =
(ShardConnections *) palloc0(sizeof(ShardConnections)); (ShardConnections *) palloc0(sizeof(ShardConnections));
@ -701,16 +596,7 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
columnOutputFunctions = ColumnOutputFunctions(tupleDescriptor, copyOutState->binary); columnOutputFunctions = ColumnOutputFunctions(tupleDescriptor, copyOutState->binary);
/* we use a PG_TRY block to close connections on errors (e.g. in NextCopyFrom) */
PG_TRY();
{
uint64 shardMaxSizeInBytes = (int64) ShardMaxSize * 1024L;
uint64 copiedDataSizeInBytes = 0;
uint64 processedRowCount = 0;
/* set up callback to identify error line number */ /* set up callback to identify error line number */
ErrorContextCallback errorCallback;
errorCallback.callback = CopyFromErrorCallback; errorCallback.callback = CopyFromErrorCallback;
errorCallback.arg = (void *) copyState; errorCallback.arg = (void *) copyState;
errorCallback.previous = error_context_stack; errorCallback.previous = error_context_stack;
@ -754,13 +640,13 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
if (copiedDataSizeInBytes == 0) if (copiedDataSizeInBytes == 0)
{ {
/* create shard and open connections to shard placements */ /* create shard and open connections to shard placements */
StartCopyToNewShard(shardConnections, copyStatement, currentShardId = StartCopyToNewShard(shardConnections, copyStatement,
copyOutState->binary); copyOutState->binary);
/* send copy binary headers to shard placements */ /* send copy binary headers to shard placements */
if (copyOutState->binary) if (copyOutState->binary)
{ {
SendCopyBinaryHeaders(copyOutState, SendCopyBinaryHeaders(copyOutState, currentShardId,
shardConnections->connectionList); shardConnections->connectionList);
} }
} }
@ -769,28 +655,31 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
resetStringInfo(copyOutState->fe_msgbuf); resetStringInfo(copyOutState->fe_msgbuf);
AppendCopyRowData(columnValues, columnNulls, tupleDescriptor, AppendCopyRowData(columnValues, columnNulls, tupleDescriptor,
copyOutState, columnOutputFunctions); copyOutState, columnOutputFunctions);
SendCopyDataToAll(copyOutState->fe_msgbuf, shardConnections->connectionList); SendCopyDataToAll(copyOutState->fe_msgbuf, currentShardId,
shardConnections->connectionList);
messageBufferSize = copyOutState->fe_msgbuf->len; messageBufferSize = copyOutState->fe_msgbuf->len;
copiedDataSizeInBytes = copiedDataSizeInBytes + messageBufferSize; copiedDataSizeInBytes = copiedDataSizeInBytes + messageBufferSize;
/* /*
* If we filled up this shard to its capacity, send copy binary footers * If we filled up this shard to its capacity, send copy binary footers
* to shard placements, commit copy transactions, close connections * to shard placements, and update shard statistics.
* and finally update shard statistics. */
*
* */
if (copiedDataSizeInBytes > shardMaxSizeInBytes) if (copiedDataSizeInBytes > shardMaxSizeInBytes)
{ {
Assert(currentShardId != INVALID_SHARD_ID);
if (copyOutState->binary) if (copyOutState->binary)
{ {
SendCopyBinaryFooters(copyOutState, SendCopyBinaryFooters(copyOutState, currentShardId,
shardConnections->connectionList); shardConnections->connectionList);
} }
FinalizeCopyToNewShard(shardConnections);
EndRemoteCopy(currentShardId, shardConnections->connectionList, true);
MasterUpdateShardStatistics(shardConnections->shardId); MasterUpdateShardStatistics(shardConnections->shardId);
copiedDataSizeInBytes = 0; copiedDataSizeInBytes = 0;
currentShardId = INVALID_SHARD_ID;
} }
processedRowCount += 1; processedRowCount += 1;
@ -798,18 +687,19 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
/* /*
* For the last shard, send copy binary footers to shard placements, * For the last shard, send copy binary footers to shard placements,
* commit copy transactions, close connections and finally update shard * and update shard statistics. If no row is send, there is no shard
* statistics. If no row is send, there is no shard to finalize the * to finalize the copy command.
* copy command.
*/ */
if (copiedDataSizeInBytes > 0) if (copiedDataSizeInBytes > 0)
{ {
Assert(currentShardId != INVALID_SHARD_ID);
if (copyOutState->binary) if (copyOutState->binary)
{ {
SendCopyBinaryFooters(copyOutState, SendCopyBinaryFooters(copyOutState, currentShardId,
shardConnections->connectionList); shardConnections->connectionList);
} }
FinalizeCopyToNewShard(shardConnections); EndRemoteCopy(currentShardId, shardConnections->connectionList, true);
MasterUpdateShardStatistics(shardConnections->shardId); MasterUpdateShardStatistics(shardConnections->shardId);
} }
@ -825,17 +715,6 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
"COPY " UINT64_FORMAT, processedRowCount); "COPY " UINT64_FORMAT, processedRowCount);
} }
} }
PG_CATCH();
{
/* roll back all transactions */
EndRemoteCopy(shardConnections->connectionList, false);
AbortRemoteTransactions(shardConnections->connectionList);
CloseConnections(shardConnections->connectionList);
PG_RE_THROW();
}
PG_END_TRY();
}
/* /*
@ -890,7 +769,7 @@ MasterPartitionMethod(RangeVar *relation)
StringInfo partitionMethodCommand = makeStringInfo(); StringInfo partitionMethodCommand = makeStringInfo();
appendStringInfo(partitionMethodCommand, PARTITION_METHOD_QUERY, qualifiedName); appendStringInfo(partitionMethodCommand, PARTITION_METHOD_QUERY, qualifiedName);
queryResult = PQexec(masterConnection, partitionMethodCommand->data); queryResult = PQexec(masterConnection->pgConn, partitionMethodCommand->data);
if (PQresultStatus(queryResult) == PGRES_TUPLES_OK) if (PQresultStatus(queryResult) == PGRES_TUPLES_OK)
{ {
char *partitionMethodString = PQgetvalue((PGresult *) queryResult, 0, 0); char *partitionMethodString = PQgetvalue((PGresult *) queryResult, 0, 0);
@ -904,7 +783,7 @@ MasterPartitionMethod(RangeVar *relation)
} }
else else
{ {
WarnRemoteError(masterConnection, queryResult); ReportResultError(masterConnection, queryResult, WARNING);
ereport(ERROR, (errmsg("could not get the partition method of the " ereport(ERROR, (errmsg("could not get the partition method of the "
"distributed table"))); "distributed table")));
} }
@ -945,24 +824,23 @@ RemoveMasterOptions(CopyStmt *copyStatement)
/* /*
* OpenCopyTransactions opens a connection for each placement of a shard and * OpenCopyConnections opens a connection for each placement of a shard and
* starts a COPY transaction. If a connection cannot be opened, then the shard * starts a COPY transaction if necessary. If a connection cannot be opened,
* placement is marked as inactive and the COPY continues with the remaining * then the shard placement is marked as inactive and the COPY continues with the remaining
* shard placements. * shard placements.
*/ */
static void static void
OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections, OpenCopyConnections(CopyStmt *copyStatement, ShardConnections *shardConnections,
bool stopOnFailure, bool useBinaryCopyFormat) bool stopOnFailure, bool useBinaryCopyFormat)
{ {
List *finalizedPlacementList = NIL; List *finalizedPlacementList = NIL;
List *failedPlacementList = NIL; List *failedPlacementList = NIL;
ListCell *placementCell = NULL; ListCell *placementCell = NULL;
ListCell *failedPlacementCell = NULL;
List *connectionList = NULL; List *connectionList = NULL;
int64 shardId = shardConnections->shardId; int64 shardId = shardConnections->shardId;
MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
"OpenCopyTransactions", "OpenCopyConnections",
ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE); ALLOCSET_DEFAULT_MAXSIZE);
@ -974,7 +852,7 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections
MemoryContextSwitchTo(oldContext); MemoryContextSwitchTo(oldContext);
if (XactModificationLevel > XACT_MODIFICATION_NONE) if (XactModificationLevel > XACT_MODIFICATION_DATA)
{ {
ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION), ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
errmsg("distributed copy operations must not appear in " errmsg("distributed copy operations must not appear in "
@ -987,27 +865,15 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections
ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell); ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell);
char *nodeName = placement->nodeName; char *nodeName = placement->nodeName;
int nodePort = placement->nodePort; int nodePort = placement->nodePort;
WorkerNode *workerNode = FindWorkerNode(nodeName, nodePort);
int workerGroupId = 0;
char *nodeUser = CurrentUserName(); char *nodeUser = CurrentUserName();
PGconn *connection = ConnectToNode(nodeName, nodePort, nodeUser); MultiConnection *connection = NULL;
uint32 connectionFlags = FOR_DML;
TransactionConnection *transactionConnection = NULL;
StringInfo copyCommand = NULL; StringInfo copyCommand = NULL;
PGresult *result = NULL; PGresult *result = NULL;
/* connection = GetPlacementConnection(connectionFlags, placement, nodeUser);
* When a copy is initiated from a worker, the information about the connected
* worker node may not be found if pg_dist_node entries are not synced to this
* node. In that case we leave the groupId as 0. Fortunately, it is unused since
* COPY from a worker does not initiate a 2PC.
*/
if (workerNode != NULL)
{
workerGroupId = workerNode->groupId;
}
if (connection == NULL) if (PQstatus(connection->pgConn) != CONNECTION_OK)
{ {
if (stopOnFailure) if (stopOnFailure)
{ {
@ -1019,42 +885,35 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections
continue; continue;
} }
result = PQexec(connection, "BEGIN"); /*
if (PQresultStatus(result) != PGRES_COMMAND_OK) * If errors are supposed to cause immediate aborts (i.e. we don't
* want to/can't invalidate placements), mark the connection as
* critical so later errors cause failures.
*/
if (stopOnFailure)
{ {
WarnRemoteError(connection, result); MarkRemoteTransactionCritical(connection);
failedPlacementList = lappend(failedPlacementList, placement);
PQclear(result);
continue;
} }
ClaimConnectionExclusively(connection);
PQclear(result); RemoteTransactionBeginIfNecessary(connection);
copyCommand = ConstructCopyStatement(copyStatement, shardConnections->shardId, copyCommand = ConstructCopyStatement(copyStatement, shardConnections->shardId,
useBinaryCopyFormat); useBinaryCopyFormat);
result = PQexec(connection->pgConn, copyCommand->data);
result = PQexec(connection, copyCommand->data);
if (PQresultStatus(result) != PGRES_COPY_IN) if (PQresultStatus(result) != PGRES_COPY_IN)
{ {
WarnRemoteError(connection, result); ReportConnectionError(connection, WARNING);
failedPlacementList = lappend(failedPlacementList, placement); MarkRemoteTransactionFailed(connection, true);
PQclear(result); PQclear(result);
/* failed placements will be invalidated by transaction machinery */
failedPlacementList = lappend(failedPlacementList, placement);
continue; continue;
} }
PQclear(result); PQclear(result);
connectionList = lappend(connectionList, connection);
transactionConnection = palloc0(sizeof(TransactionConnection));
transactionConnection->groupId = workerGroupId;
transactionConnection->connectionId = shardConnections->shardId;
transactionConnection->transactionState = TRANSACTION_STATE_COPY_STARTED;
transactionConnection->connection = connection;
transactionConnection->nodeName = nodeName;
transactionConnection->nodePort = nodePort;
connectionList = lappend(connectionList, transactionConnection);
} }
/* if all placements failed, error out */ /* if all placements failed, error out */
@ -1070,14 +929,6 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections
*/ */
Assert(!stopOnFailure || list_length(failedPlacementList) == 0); Assert(!stopOnFailure || list_length(failedPlacementList) == 0);
/* otherwise, mark failed placements as inactive: they're stale */
foreach(failedPlacementCell, failedPlacementList)
{
ShardPlacement *failedPlacement = (ShardPlacement *) lfirst(failedPlacementCell);
UpdateShardPlacementState(failedPlacement->placementId, FILE_INACTIVE);
}
shardConnections->connectionList = connectionList; shardConnections->connectionList = connectionList;
MemoryContextReset(localContext); MemoryContextReset(localContext);
@ -1161,7 +1012,7 @@ RemoteFinalizedShardPlacementList(uint64 shardId)
StringInfo shardPlacementsCommand = makeStringInfo(); StringInfo shardPlacementsCommand = makeStringInfo();
appendStringInfo(shardPlacementsCommand, FINALIZED_SHARD_PLACEMENTS_QUERY, shardId); appendStringInfo(shardPlacementsCommand, FINALIZED_SHARD_PLACEMENTS_QUERY, shardId);
queryResult = PQexec(masterConnection, shardPlacementsCommand->data); queryResult = PQexec(masterConnection->pgConn, shardPlacementsCommand->data);
if (PQresultStatus(queryResult) == PGRES_TUPLES_OK) if (PQresultStatus(queryResult) == PGRES_TUPLES_OK)
{ {
int rowCount = PQntuples(queryResult); int rowCount = PQntuples(queryResult);
@ -1196,21 +1047,21 @@ RemoteFinalizedShardPlacementList(uint64 shardId)
/* Send copy binary headers to given connections */ /* Send copy binary headers to given connections */
static void static void
SendCopyBinaryHeaders(CopyOutState copyOutState, List *connectionList) SendCopyBinaryHeaders(CopyOutState copyOutState, int64 shardId, List *connectionList)
{ {
resetStringInfo(copyOutState->fe_msgbuf); resetStringInfo(copyOutState->fe_msgbuf);
AppendCopyBinaryHeaders(copyOutState); AppendCopyBinaryHeaders(copyOutState);
SendCopyDataToAll(copyOutState->fe_msgbuf, connectionList); SendCopyDataToAll(copyOutState->fe_msgbuf, shardId, connectionList);
} }
/* Send copy binary footers to given connections */ /* Send copy binary footers to given connections */
static void static void
SendCopyBinaryFooters(CopyOutState copyOutState, List *connectionList) SendCopyBinaryFooters(CopyOutState copyOutState, int64 shardId, List *connectionList)
{ {
resetStringInfo(copyOutState->fe_msgbuf); resetStringInfo(copyOutState->fe_msgbuf);
AppendCopyBinaryFooters(copyOutState); AppendCopyBinaryFooters(copyOutState);
SendCopyDataToAll(copyOutState->fe_msgbuf, connectionList); SendCopyDataToAll(copyOutState->fe_msgbuf, shardId, connectionList);
} }
@ -1253,17 +1104,13 @@ ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId, bool useBinaryCop
* SendCopyDataToAll sends copy data to all connections in a list. * SendCopyDataToAll sends copy data to all connections in a list.
*/ */
static void static void
SendCopyDataToAll(StringInfo dataBuffer, List *connectionList) SendCopyDataToAll(StringInfo dataBuffer, int64 shardId, List *connectionList)
{ {
ListCell *connectionCell = NULL; ListCell *connectionCell = NULL;
foreach(connectionCell, connectionList) foreach(connectionCell, connectionList)
{ {
TransactionConnection *transactionConnection = MultiConnection *connection = (MultiConnection *) lfirst(connectionCell);
(TransactionConnection *) lfirst(connectionCell); SendCopyDataToPlacement(dataBuffer, shardId, connection);
PGconn *connection = transactionConnection->connection;
int64 shardId = transactionConnection->connectionId;
SendCopyDataToPlacement(dataBuffer, connection, shardId);
} }
} }
@ -1273,73 +1120,61 @@ SendCopyDataToAll(StringInfo dataBuffer, List *connectionList)
* over the given connection. * over the given connection.
*/ */
static void static void
SendCopyDataToPlacement(StringInfo dataBuffer, PGconn *connection, int64 shardId) SendCopyDataToPlacement(StringInfo dataBuffer, int64 shardId, MultiConnection *connection)
{ {
int copyResult = PQputCopyData(connection, dataBuffer->data, dataBuffer->len); int copyResult = PQputCopyData(connection->pgConn, dataBuffer->data, dataBuffer->len);
if (copyResult != 1) if (copyResult != 1)
{ {
char *nodeName = ConnectionGetOptionValue(connection, "host");
char *nodePort = ConnectionGetOptionValue(connection, "port");
ereport(ERROR, (errcode(ERRCODE_IO_ERROR), ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
errmsg("failed to COPY to shard %ld on %s:%s", errmsg("failed to COPY to shard %ld on %s:%d",
shardId, nodeName, nodePort))); shardId, connection->hostname, connection->port),
errdetail("failed to send %d bytes %s", dataBuffer->len,
dataBuffer->data)));
} }
} }
/* /*
* EndRemoteCopy ends the COPY input on all connections. If stopOnFailure * EndRemoteCopy ends the COPY input on all connections, and unclaims connections.
* is true, then EndRemoteCopy reports an error on failure, otherwise it * If stopOnFailure is true, then EndRemoteCopy reports an error on failure,
* reports a warning or continues. * otherwise it reports a warning or continues.
*/ */
static void static void
EndRemoteCopy(List *connectionList, bool stopOnFailure) EndRemoteCopy(int64 shardId, List *connectionList, bool stopOnFailure)
{ {
ListCell *connectionCell = NULL; ListCell *connectionCell = NULL;
foreach(connectionCell, connectionList) foreach(connectionCell, connectionList)
{ {
TransactionConnection *transactionConnection = MultiConnection *connection = (MultiConnection *) lfirst(connectionCell);
(TransactionConnection *) lfirst(connectionCell);
PGconn *connection = transactionConnection->connection;
int64 shardId = transactionConnection->connectionId;
int copyEndResult = 0; int copyEndResult = 0;
PGresult *result = NULL; PGresult *result = NULL;
if (transactionConnection->transactionState != TRANSACTION_STATE_COPY_STARTED)
{
/* a failure occurred after having previously called EndRemoteCopy */
continue;
}
/* end the COPY input */ /* end the COPY input */
copyEndResult = PQputCopyEnd(connection, NULL); copyEndResult = PQputCopyEnd(connection->pgConn, NULL);
transactionConnection->transactionState = TRANSACTION_STATE_OPEN;
if (copyEndResult != 1) if (copyEndResult != 1)
{ {
if (stopOnFailure) if (stopOnFailure)
{ {
char *nodeName = ConnectionGetOptionValue(connection, "host");
char *nodePort = ConnectionGetOptionValue(connection, "port");
ereport(ERROR, (errcode(ERRCODE_IO_ERROR), ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
errmsg("failed to COPY to shard %ld on %s:%s", errmsg("failed to COPY to shard %ld on %s:%d",
shardId, nodeName, nodePort))); shardId, connection->hostname, connection->port)));
} }
continue; continue;
} }
/* check whether there were any COPY errors */ /* check whether there were any COPY errors */
result = PQgetResult(connection); result = PQgetResult(connection->pgConn);
if (PQresultStatus(result) != PGRES_COMMAND_OK && stopOnFailure) if (PQresultStatus(result) != PGRES_COMMAND_OK && stopOnFailure)
{ {
ReportCopyError(connection, result); ReportCopyError(connection, result);
} }
PQclear(result); PQclear(result);
ForgetResults(connection);
UnclaimConnection(connection);
} }
} }
@ -1349,7 +1184,7 @@ EndRemoteCopy(List *connectionList, bool stopOnFailure)
* the remote COPY error messages. * the remote COPY error messages.
*/ */
static void static void
ReportCopyError(PGconn *connection, PGresult *result) ReportCopyError(MultiConnection *connection, PGresult *result)
{ {
char *remoteMessage = PQresultErrorField(result, PG_DIAG_MESSAGE_PRIMARY); char *remoteMessage = PQresultErrorField(result, PG_DIAG_MESSAGE_PRIMARY);
@ -1365,10 +1200,8 @@ ReportCopyError(PGconn *connection, PGresult *result)
{ {
/* probably a connection problem, get the message from the connection */ /* probably a connection problem, get the message from the connection */
char *lastNewlineIndex = NULL; char *lastNewlineIndex = NULL;
char *nodeName = ConnectionGetOptionValue(connection, "host");
char *nodePort = ConnectionGetOptionValue(connection, "port");
remoteMessage = PQerrorMessage(connection); remoteMessage = PQerrorMessage(connection->pgConn);
lastNewlineIndex = strrchr(remoteMessage, '\n'); lastNewlineIndex = strrchr(remoteMessage, '\n');
/* trim trailing newline, if any */ /* trim trailing newline, if any */
@ -1378,7 +1211,8 @@ ReportCopyError(PGconn *connection, PGresult *result)
} }
ereport(ERROR, (errcode(ERRCODE_IO_ERROR), ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
errmsg("failed to complete COPY on %s:%s", nodeName, nodePort), errmsg("failed to complete COPY on %s:%d", connection->hostname,
connection->port),
errdetail("%s", remoteMessage))); errdetail("%s", remoteMessage)));
} }
} }
@ -1581,23 +1415,25 @@ AppendCopyBinaryFooters(CopyOutState footerOutputState)
* StartCopyToNewShard creates a new shard and related shard placements and * StartCopyToNewShard creates a new shard and related shard placements and
* opens connections to shard placements. * opens connections to shard placements.
*/ */
static void static int64
StartCopyToNewShard(ShardConnections *shardConnections, CopyStmt *copyStatement, StartCopyToNewShard(ShardConnections *shardConnections, CopyStmt *copyStatement,
bool useBinaryCopyFormat) bool useBinaryCopyFormat)
{ {
char *relationName = copyStatement->relation->relname; char *relationName = copyStatement->relation->relname;
char *schemaName = copyStatement->relation->schemaname; char *schemaName = copyStatement->relation->schemaname;
char *qualifiedName = quote_qualified_identifier(schemaName, relationName); char *qualifiedName = quote_qualified_identifier(schemaName, relationName);
int64 shardId = MasterCreateEmptyShard(qualifiedName); int64 shardId = MasterCreateEmptyShard(qualifiedName);
bool stopOnFailure = true;
shardConnections->shardId = shardId; shardConnections->shardId = shardId;
list_free_deep(shardConnections->connectionList);
shardConnections->connectionList = NIL; shardConnections->connectionList = NIL;
/* connect to shards placements and start transactions */ /* connect to shards placements and start transactions */
OpenCopyTransactions(copyStatement, shardConnections, true, useBinaryCopyFormat); OpenCopyConnections(copyStatement, shardConnections, stopOnFailure,
useBinaryCopyFormat);
return shardId;
} }
@ -1654,7 +1490,7 @@ RemoteCreateEmptyShard(char *relationName)
StringInfo createEmptyShardCommand = makeStringInfo(); StringInfo createEmptyShardCommand = makeStringInfo();
appendStringInfo(createEmptyShardCommand, CREATE_EMPTY_SHARD_QUERY, relationName); appendStringInfo(createEmptyShardCommand, CREATE_EMPTY_SHARD_QUERY, relationName);
queryResult = PQexec(masterConnection, createEmptyShardCommand->data); queryResult = PQexec(masterConnection->pgConn, createEmptyShardCommand->data);
if (PQresultStatus(queryResult) == PGRES_TUPLES_OK) if (PQresultStatus(queryResult) == PGRES_TUPLES_OK)
{ {
char *shardIdString = PQgetvalue((PGresult *) queryResult, 0, 0); char *shardIdString = PQgetvalue((PGresult *) queryResult, 0, 0);
@ -1663,7 +1499,7 @@ RemoteCreateEmptyShard(char *relationName)
} }
else else
{ {
WarnRemoteError(masterConnection, queryResult); ReportResultError(masterConnection, queryResult, WARNING);
ereport(ERROR, (errmsg("could not create a new empty shard on the remote node"))); ereport(ERROR, (errmsg("could not create a new empty shard on the remote node")));
} }
@ -1673,22 +1509,6 @@ RemoteCreateEmptyShard(char *relationName)
} }
/*
* FinalizeCopyToNewShard commits copy transaction and closes connections to
* shard placements.
*/
static void
FinalizeCopyToNewShard(ShardConnections *shardConnections)
{
/* close the COPY input on all shard placements */
EndRemoteCopy(shardConnections->connectionList, true);
/* commit transactions and close connections */
CommitRemoteTransactions(shardConnections->connectionList, true);
CloseConnections(shardConnections->connectionList);
}
/* /*
* MasterUpdateShardStatistics dispatches the update shard statistics call * MasterUpdateShardStatistics dispatches the update shard statistics call
* between local or remote master node according to the master connection state. * between local or remote master node according to the master connection state.
@ -1719,7 +1539,7 @@ RemoteUpdateShardStatistics(uint64 shardId)
appendStringInfo(updateShardStatisticsCommand, UPDATE_SHARD_STATISTICS_QUERY, appendStringInfo(updateShardStatisticsCommand, UPDATE_SHARD_STATISTICS_QUERY,
shardId); shardId);
queryResult = PQexec(masterConnection, updateShardStatisticsCommand->data); queryResult = PQexec(masterConnection->pgConn, updateShardStatisticsCommand->data);
if (PQresultStatus(queryResult) != PGRES_TUPLES_OK) if (PQresultStatus(queryResult) != PGRES_TUPLES_OK)
{ {
ereport(ERROR, (errmsg("could not update shard statistics"))); ereport(ERROR, (errmsg("could not update shard statistics")));

View File

@ -64,7 +64,7 @@
/* Shard related configuration */ /* Shard related configuration */
int ShardCount = 32; int ShardCount = 32;
int ShardReplicationFactor = 2; /* desired replication factor for shards */ int ShardReplicationFactor = 1; /* desired replication factor for shards */
int ShardMaxSize = 1048576; /* maximum size in KB one shard can grow to */ int ShardMaxSize = 1048576; /* maximum size in KB one shard can grow to */
int ShardPlacementPolicy = SHARD_PLACEMENT_ROUND_ROBIN; int ShardPlacementPolicy = SHARD_PLACEMENT_ROUND_ROBIN;

View File

@ -52,8 +52,6 @@
/* Config variables that enable printing distributed query plans */ /* Config variables that enable printing distributed query plans */
bool ExplainMultiLogicalPlan = false;
bool ExplainMultiPhysicalPlan = false;
bool ExplainDistributedQueries = true; bool ExplainDistributedQueries = true;
bool ExplainAllTasks = false; bool ExplainAllTasks = false;
@ -79,6 +77,9 @@ static void ExplainTask(Task *task, int placementIndex, List *explainOutputList,
static void ExplainTaskPlacement(ShardPlacement *taskPlacement, List *explainOutputList, static void ExplainTaskPlacement(ShardPlacement *taskPlacement, List *explainOutputList,
ExplainState *es); ExplainState *es);
static StringInfo BuildRemoteExplainQuery(char *queryString, ExplainState *es); static StringInfo BuildRemoteExplainQuery(char *queryString, ExplainState *es);
static void MultiExplainOnePlan(PlannedStmt *plan, IntoClause *into,
ExplainState *es, const char *queryString,
ParamListInfo params, const instr_time *planDuration);
/* Static Explain functions copied from explain.c */ /* Static Explain functions copied from explain.c */
static void ExplainOpenGroup(const char *objtype, const char *labelname, static void ExplainOpenGroup(const char *objtype, const char *labelname,
@ -100,17 +101,10 @@ void
MultiExplainOneQuery(Query *query, IntoClause *into, ExplainState *es, MultiExplainOneQuery(Query *query, IntoClause *into, ExplainState *es,
const char *queryString, ParamListInfo params) const char *queryString, ParamListInfo params)
{ {
MultiPlan *multiPlan = NULL;
CmdType commandType = CMD_UNKNOWN;
PlannedStmt *initialPlan = NULL;
Job *workerJob = NULL;
bool routerExecutablePlan = false;
instr_time planStart; instr_time planStart;
instr_time planDuration; instr_time planDuration;
Query *originalQuery = NULL;
RelationRestrictionContext *restrictionContext = NULL;
bool localQuery = !NeedsDistributedPlanning(query);
int cursorOptions = 0; int cursorOptions = 0;
PlannedStmt *plan = NULL;
#if PG_VERSION_NUM >= 90600 #if PG_VERSION_NUM >= 90600
@ -124,11 +118,7 @@ MultiExplainOneQuery(Query *query, IntoClause *into, ExplainState *es,
} }
#endif #endif
/* handle local queries in the same way as ExplainOneQuery */ /* plan query, just like ExplainOneQuery does */
if (localQuery)
{
PlannedStmt *plan = NULL;
INSTR_TIME_SET_CURRENT(planStart); INSTR_TIME_SET_CURRENT(planStart);
/* plan the query */ /* plan the query */
@ -137,29 +127,33 @@ MultiExplainOneQuery(Query *query, IntoClause *into, ExplainState *es,
INSTR_TIME_SET_CURRENT(planDuration); INSTR_TIME_SET_CURRENT(planDuration);
INSTR_TIME_SUBTRACT(planDuration, planStart); INSTR_TIME_SUBTRACT(planDuration, planStart);
/* if not a distributed query, use plain explain infrastructure */
if (!HasCitusToplevelNode(plan))
{
/* run it (if needed) and produce output */ /* run it (if needed) and produce output */
ExplainOnePlan(plan, into, es, queryString, params, &planDuration); ExplainOnePlan(plan, into, es, queryString, params, &planDuration);
}
return; else
{
MultiExplainOnePlan(plan, into, es, queryString, params, &planDuration);
}
} }
/* /*
* standard_planner scribbles on it's input, but for deparsing we need the * MultiExplainOnePlan explains the plan for an individual distributed query.
* unmodified form. So copy once we're sure it's a distributed query.
*/ */
originalQuery = copyObject(query); static void
MultiExplainOnePlan(PlannedStmt *plan, IntoClause *into,
/* measure the full planning time to display in EXPLAIN ANALYZE */ ExplainState *es, const char *queryString,
INSTR_TIME_SET_CURRENT(planStart); ParamListInfo params, const instr_time *planDuration)
restrictionContext = CreateAndPushRestrictionContext();
PG_TRY();
{ {
/* call standard planner to modify the query structure before multi planning */ MultiPlan *multiPlan = NULL;
initialPlan = standard_planner(query, cursorOptions, params); CmdType commandType = CMD_UNKNOWN;
Job *workerJob = NULL;
bool routerExecutablePlan = false;
commandType = initialPlan->commandType; commandType = plan->commandType;
if (commandType == CMD_INSERT || commandType == CMD_UPDATE || if (commandType == CMD_INSERT || commandType == CMD_UPDATE ||
commandType == CMD_DELETE) commandType == CMD_DELETE)
{ {
@ -171,38 +165,7 @@ MultiExplainOneQuery(Query *query, IntoClause *into, ExplainState *es,
} }
} }
multiPlan = CreatePhysicalPlan(originalQuery, query, restrictionContext); multiPlan = GetMultiPlan(plan);
}
PG_CATCH();
{
PopRestrictionContext();
PG_RE_THROW();
}
PG_END_TRY();
PopRestrictionContext();
INSTR_TIME_SET_CURRENT(planDuration);
INSTR_TIME_SUBTRACT(planDuration, planStart);
if (ExplainMultiLogicalPlan)
{
MultiTreeRoot *multiTree = MultiLogicalPlanCreate(query);
char *logicalPlanString = CitusNodeToString(multiTree);
char *formattedPlanString = pretty_format_node_dump(logicalPlanString);
appendStringInfo(es->str, "logical plan:\n");
appendStringInfo(es->str, "%s\n", formattedPlanString);
}
if (ExplainMultiPhysicalPlan)
{
char *physicalPlanString = CitusNodeToString(multiPlan);
char *formattedPlanString = pretty_format_node_dump(physicalPlanString);
appendStringInfo(es->str, "physical plan:\n");
appendStringInfo(es->str, "%s\n", formattedPlanString);
}
if (!ExplainDistributedQueries) if (!ExplainDistributedQueries)
{ {
@ -268,8 +231,6 @@ MultiExplainOneQuery(Query *query, IntoClause *into, ExplainState *es,
if (!routerExecutablePlan) if (!routerExecutablePlan)
{ {
PlannedStmt *masterPlan = MultiQueryContainerNode(initialPlan, multiPlan);
if (es->format == EXPLAIN_FORMAT_TEXT) if (es->format == EXPLAIN_FORMAT_TEXT)
{ {
appendStringInfoSpaces(es->str, es->indent * 2); appendStringInfoSpaces(es->str, es->indent * 2);
@ -279,7 +240,7 @@ MultiExplainOneQuery(Query *query, IntoClause *into, ExplainState *es,
ExplainOpenGroup("Master Query", "Master Query", false, es); ExplainOpenGroup("Master Query", "Master Query", false, es);
ExplainMasterPlan(masterPlan, into, es, queryString, params, &planDuration); ExplainMasterPlan(plan, into, es, queryString, params, planDuration);
ExplainCloseGroup("Master Query", "Master Query", false, es); ExplainCloseGroup("Master Query", "Master Query", false, es);

View File

@ -36,10 +36,15 @@ static List *relationRestrictionContextList = NIL;
/* local function forward declarations */ /* local function forward declarations */
static void CheckNodeIsDumpable(Node *node); static void CheckNodeIsDumpable(Node *node);
/* local function forward declarations */
static char * GetMultiPlanString(PlannedStmt *result); static char * GetMultiPlanString(PlannedStmt *result);
static PlannedStmt * MultiQueryContainerNode(PlannedStmt *result,
struct MultiPlan *multiPlan);
static struct MultiPlan * CreatePhysicalPlan(Query *originalQuery, Query *query,
RelationRestrictionContext *
restrictionContext);
static RelationRestrictionContext * CreateAndPushRestrictionContext(void);
static RelationRestrictionContext * CurrentRestrictionContext(void);
static void PopRestrictionContext(void);
/* Distributed planner hook */ /* Distributed planner hook */
@ -123,7 +128,7 @@ multi_planner(Query *parse, int cursorOptions, ParamListInfo boundParams)
* target shards. SELECT queries go through the full logical plan/optimize/ * target shards. SELECT queries go through the full logical plan/optimize/
* physical plan process needed to produce distributed query plans. * physical plan process needed to produce distributed query plans.
*/ */
MultiPlan * static MultiPlan *
CreatePhysicalPlan(Query *originalQuery, Query *query, CreatePhysicalPlan(Query *originalQuery, Query *query,
RelationRestrictionContext *restrictionContext) RelationRestrictionContext *restrictionContext)
{ {

View File

@ -295,30 +295,6 @@ RegisterCitusConfigVariables(void)
0, 0,
NULL, NULL, NULL); NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.explain_multi_logical_plan",
gettext_noop("Enables Explain to print out distributed logical plans."),
gettext_noop("We use this private configuration entry as a debugging aid. "
"If enabled, the Explain command prints out the optimized "
"logical plan for distributed queries."),
&ExplainMultiLogicalPlan,
false,
PGC_USERSET,
GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.explain_multi_physical_plan",
gettext_noop("Enables Explain to print out distributed physical plans."),
gettext_noop("We use this private configuration entry as a debugging aid. "
"If enabled, the Explain command prints out the physical "
"plan for distributed queries."),
&ExplainMultiPhysicalPlan,
false,
PGC_USERSET,
GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomBoolVariable( DefineCustomBoolVariable(
"citus.explain_distributed_queries", "citus.explain_distributed_queries",
gettext_noop("Enables Explain for distributed queries."), gettext_noop("Enables Explain for distributed queries."),
@ -393,7 +369,7 @@ RegisterCitusConfigVariables(void)
"configuration value at sharded table creation time, " "configuration value at sharded table creation time, "
"and later reuse the initially read value."), "and later reuse the initially read value."),
&ShardReplicationFactor, &ShardReplicationFactor,
2, 1, 100, 1, 1, 100,
PGC_USERSET, PGC_USERSET,
0, 0,
NULL, NULL, NULL); NULL, NULL, NULL);

View File

@ -202,12 +202,12 @@ GetShardHashConnections(HTAB *connectionHash, int64 shardId, bool *connectionsFo
/* /*
* ConnectionList flattens the connection hash to a list of placement connections. * ShardConnectionList returns the list of ShardConnections in connectionHash.
*/ */
List * List *
ConnectionList(HTAB *connectionHash) ShardConnectionList(HTAB *connectionHash)
{ {
List *connectionList = NIL; List *shardConnectionsList = NIL;
HASH_SEQ_STATUS status; HASH_SEQ_STATUS status;
ShardConnections *shardConnections = NULL; ShardConnections *shardConnections = NULL;
@ -221,13 +221,12 @@ ConnectionList(HTAB *connectionHash)
shardConnections = (ShardConnections *) hash_seq_search(&status); shardConnections = (ShardConnections *) hash_seq_search(&status);
while (shardConnections != NULL) while (shardConnections != NULL)
{ {
List *shardConnectionsList = list_copy(shardConnections->connectionList); shardConnectionsList = lappend(shardConnectionsList, shardConnections);
connectionList = list_concat(connectionList, shardConnectionsList);
shardConnections = (ShardConnections *) hash_seq_search(&status); shardConnections = (ShardConnections *) hash_seq_search(&status);
} }
return connectionList; return shardConnectionsList;
} }

View File

@ -13,8 +13,6 @@
#include "executor/executor.h" #include "executor/executor.h"
/* Config variables managed via guc.c to explain distributed query plans */ /* Config variables managed via guc.c to explain distributed query plans */
extern bool ExplainMultiLogicalPlan;
extern bool ExplainMultiPhysicalPlan;
extern bool ExplainDistributedQueries; extern bool ExplainDistributedQueries;
extern bool ExplainAllTasks; extern bool ExplainAllTasks;

View File

@ -53,16 +53,8 @@ extern PlannedStmt * multi_planner(Query *parse, int cursorOptions,
extern bool HasCitusToplevelNode(PlannedStmt *planStatement); extern bool HasCitusToplevelNode(PlannedStmt *planStatement);
struct MultiPlan; struct MultiPlan;
extern struct MultiPlan * CreatePhysicalPlan(Query *originalQuery, Query *query,
RelationRestrictionContext *
restrictionContext);
extern struct MultiPlan * GetMultiPlan(PlannedStmt *planStatement); extern struct MultiPlan * GetMultiPlan(PlannedStmt *planStatement);
extern PlannedStmt * MultiQueryContainerNode(PlannedStmt *result,
struct MultiPlan *multiPlan);
extern void multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo, extern void multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo,
Index index, RangeTblEntry *rte); Index index, RangeTblEntry *rte);
extern RelationRestrictionContext * CreateAndPushRestrictionContext(void);
extern RelationRestrictionContext * CurrentRestrictionContext(void);
extern void PopRestrictionContext(void);
#endif /* MULTI_PLANNER_H */ #endif /* MULTI_PLANNER_H */

View File

@ -22,11 +22,7 @@ typedef struct ShardConnections
{ {
int64 shardId; int64 shardId;
/* /* list of MultiConnection structs */
* XXX: this list contains MultiConnection for multi-shard transactions
* or TransactionConnection for COPY, the latter should be converted to
* use MultiConnection as well.
*/
List *connectionList; List *connectionList;
} ShardConnections; } ShardConnections;
@ -36,7 +32,7 @@ extern HTAB * CreateShardConnectionHash(MemoryContext memoryContext);
extern ShardConnections * GetShardConnections(int64 shardId, bool *shardConnectionsFound); extern ShardConnections * GetShardConnections(int64 shardId, bool *shardConnectionsFound);
extern ShardConnections * GetShardHashConnections(HTAB *connectionHash, int64 shardId, extern ShardConnections * GetShardHashConnections(HTAB *connectionHash, int64 shardId,
bool *connectionsFound); bool *connectionsFound);
extern List * ConnectionList(HTAB *connectionHash); extern List * ShardConnectionList(HTAB *connectionHash);
extern void CloseConnections(List *connectionList); extern void CloseConnections(List *connectionList);
extern void ResetShardPlacementTransactionState(void); extern void ResetShardPlacementTransactionState(void);

View File

@ -1456,17 +1456,15 @@ INSERT INTO raw_events_first SELECT * FROM raw_events_second WHERE user_id = 100
ERROR: single-shard DML commands must not appear in transaction blocks which contain multi-shard data modifications ERROR: single-shard DML commands must not appear in transaction blocks which contain multi-shard data modifications
ROLLBACK; ROLLBACK;
-- Insert after copy is currently disallowed because of the way the -- Insert after copy is currently disallowed because of the way the
-- transaction modification state is currently handled. Copy still -- transaction modification state is currently handled. Copy is also
-- goes through despite rollback. -- rolled back.
BEGIN; BEGIN;
COPY raw_events_second (user_id, value_1) FROM STDIN DELIMITER ','; COPY raw_events_second (user_id, value_1) FROM STDIN DELIMITER ',';
INSERT INTO raw_events_first SELECT * FROM raw_events_second; INSERT INTO raw_events_first SELECT * FROM raw_events_second;
ERROR: multi-shard data modifications must not appear in transaction blocks which contain single-shard DML commands ERROR: multi-shard data modifications must not appear in transaction blocks which contain single-shard DML commands
ROLLBACK; ROLLBACK;
-- Insert after copy is currently allowed for single-shard operation. -- Insert after copy is currently allowed for single-shard operation.
-- Since the COPY commits immediately, the result is visible in the -- Both insert and copy are rolled back successfully.
-- next operation. Copy goes through despite rollback, while insert
-- rolls back.
BEGIN; BEGIN;
COPY raw_events_second (user_id, value_1) FROM STDIN DELIMITER ','; COPY raw_events_second (user_id, value_1) FROM STDIN DELIMITER ',';
INSERT INTO raw_events_first SELECT * FROM raw_events_second WHERE user_id = 101; INSERT INTO raw_events_first SELECT * FROM raw_events_second WHERE user_id = 101;
@ -1477,9 +1475,7 @@ SELECT user_id FROM raw_events_first WHERE user_id = 101;
(1 row) (1 row)
ROLLBACK; ROLLBACK;
-- Copy after insert is disallowed since the insert is not immediately -- Copy after insert is currently disallowed.
-- committed and the copy uses different connections that will not yet
-- see the result of the insert.
BEGIN; BEGIN;
INSERT INTO raw_events_first SELECT * FROM raw_events_second; INSERT INTO raw_events_first SELECT * FROM raw_events_second;
COPY raw_events_first (user_id, value_1) FROM STDIN DELIMITER ','; COPY raw_events_first (user_id, value_1) FROM STDIN DELIMITER ',';
@ -1489,8 +1485,6 @@ ROLLBACK;
BEGIN; BEGIN;
INSERT INTO raw_events_first SELECT * FROM raw_events_second WHERE user_id = 100; INSERT INTO raw_events_first SELECT * FROM raw_events_second WHERE user_id = 100;
COPY raw_events_first (user_id, value_1) FROM STDIN DELIMITER ','; COPY raw_events_first (user_id, value_1) FROM STDIN DELIMITER ',';
ERROR: distributed copy operations must not appear in transaction blocks containing other distributed modifications
CONTEXT: COPY raw_events_first, line 1: "103,103"
ROLLBACK; ROLLBACK;
-- selecting from views works -- selecting from views works
CREATE VIEW test_view AS SELECT * FROM raw_events_first; CREATE VIEW test_view AS SELECT * FROM raw_events_first;
@ -1499,7 +1493,7 @@ INSERT INTO raw_events_first (user_id, time, value_1, value_2, value_3, value_4)
SELECT count(*) FROM raw_events_second; SELECT count(*) FROM raw_events_second;
count count
------- -------
11 9
(1 row) (1 row)
INSERT INTO raw_events_second SELECT * FROM test_view; INSERT INTO raw_events_second SELECT * FROM test_view;
@ -1509,7 +1503,7 @@ INSERT INTO raw_events_second SELECT * FROM test_view WHERE user_id = 17 GROUP B
SELECT count(*) FROM raw_events_second; SELECT count(*) FROM raw_events_second;
count count
------- -------
13 11
(1 row) (1 row)
-- inserting into views does not -- inserting into views does not

View File

@ -192,37 +192,178 @@ SELECT * FROM labs WHERE id = 6;
----+------ ----+------
(0 rows) (0 rows)
-- COPY can't happen second, -- COPY can happen after single row INSERT
BEGIN; BEGIN;
INSERT INTO labs VALUES (6, 'Bell Labs'); INSERT INTO labs VALUES (6, 'Bell Labs');
\copy labs from stdin delimiter ',' \copy labs from stdin delimiter ','
ERROR: distributed copy operations must not appear in transaction blocks containing other distributed modifications
CONTEXT: COPY labs, line 1: "10,Weyland-Yutani"
COMMIT; COMMIT;
-- though it will work if before any modifications -- COPY can happen before single row INSERT
BEGIN; BEGIN;
\copy labs from stdin delimiter ',' \copy labs from stdin delimiter ','
SELECT name FROM labs WHERE id = 10; SELECT name FROM labs WHERE id = 10;
name name
---------------- ----------------
Weyland-Yutani Weyland-Yutani
(1 row) Weyland-Yutani
(2 rows)
INSERT INTO labs VALUES (6, 'Bell Labs'); INSERT INTO labs VALUES (6, 'Bell Labs');
COMMIT; COMMIT;
-- but a double-copy isn't allowed (the first will persist) -- two consecutive COPYs in a transaction are allowed
BEGIN; BEGIN;
\copy labs from stdin delimiter ',' \copy labs from stdin delimiter ','
\copy labs from stdin delimiter ',' \copy labs from stdin delimiter ','
ERROR: distributed copy operations must not appear in transaction blocks containing other distributed modifications
CONTEXT: COPY labs, line 1: "12,fsociety"
COMMIT; COMMIT;
SELECT name FROM labs WHERE id = 11; SELECT name FROM labs WHERE id = 11 OR id = 12 ORDER BY id;
name name
---------------- ----------------
Planet Express Planet Express
fsociety
(2 rows)
-- 1pc failure test
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row) (1 row)
-- copy with unique index violation
BEGIN;
\copy researchers FROM STDIN delimiter ','
\copy researchers FROM STDIN delimiter ','
ERROR: duplicate key value violates unique constraint "avoid_name_confusion_idx_1200001"
DETAIL: Key (lab_id, name)=(6, 'Bjarne Stroustrup') already exists.
COMMIT;
-- verify rollback
SELECT * FROM researchers WHERE lab_id = 6;
id | lab_id | name
----+--------+------
(0 rows)
SELECT count(*) FROM pg_dist_transaction;
count
-------
0
(1 row)
-- 2pc failure and success tests
SET citus.multi_shard_commit_protocol TO '2pc';
SELECT recover_prepared_transactions();
recover_prepared_transactions
-------------------------------
0
(1 row)
-- copy with unique index violation
BEGIN;
\copy researchers FROM STDIN delimiter ','
\copy researchers FROM STDIN delimiter ','
ERROR: duplicate key value violates unique constraint "avoid_name_confusion_idx_1200001"
DETAIL: Key (lab_id, name)=(6, 'Bjarne Stroustrup') already exists.
COMMIT;
-- verify rollback
SELECT * FROM researchers WHERE lab_id = 6;
id | lab_id | name
----+--------+------
(0 rows)
SELECT count(*) FROM pg_dist_transaction;
count
-------
0
(1 row)
BEGIN;
\copy researchers FROM STDIN delimiter ','
\copy researchers FROM STDIN delimiter ','
COMMIT;
-- verify success
SELECT * FROM researchers WHERE lab_id = 6;
id | lab_id | name
----+--------+----------------------
17 | 6 | 'Bjarne Stroustrup'
18 | 6 | 'Dennis Ritchie'
(2 rows)
-- verify 2pc
SELECT count(*) FROM pg_dist_transaction;
count
-------
2
(1 row)
RESET citus.multi_shard_commit_protocol;
-- create a check function
SELECT * from run_command_on_workers('CREATE FUNCTION reject_large_id() RETURNS trigger AS $rli$
BEGIN
IF (NEW.id > 30) THEN
RAISE ''illegal value'';
END IF;
RETURN NEW;
END;
$rli$ LANGUAGE plpgsql;')
ORDER BY nodeport;
nodename | nodeport | success | result
-----------+----------+---------+-----------------
localhost | 57637 | t | CREATE FUNCTION
localhost | 57638 | t | CREATE FUNCTION
(2 rows)
-- register after insert trigger
SELECT * FROM run_command_on_placements('researchers', 'CREATE CONSTRAINT TRIGGER reject_large_researcher_id AFTER INSERT ON %s DEFERRABLE INITIALLY DEFERRED FOR EACH ROW EXECUTE PROCEDURE reject_large_id()')
ORDER BY nodeport, shardid;
nodename | nodeport | shardid | success | result
-----------+----------+---------+---------+----------------
localhost | 57637 | 1200000 | t | CREATE TRIGGER
localhost | 57637 | 1200001 | t | CREATE TRIGGER
localhost | 57638 | 1200000 | t | CREATE TRIGGER
localhost | 57638 | 1200001 | t | CREATE TRIGGER
(4 rows)
-- hide postgresql version dependend messages for next test only
\set VERBOSITY terse
-- deferred check should abort the transaction
BEGIN;
DELETE FROM researchers WHERE lab_id = 6;
\copy researchers FROM STDIN delimiter ','
\copy researchers FROM STDIN delimiter ','
COMMIT;
WARNING: illegal value
WARNING: failed to commit transaction on localhost:57638
WARNING: illegal value
WARNING: failed to commit transaction on localhost:57637
WARNING: could not commit transaction for shard 1200001 on any active node
ERROR: could not commit transaction on any active node
\unset VERBOSITY
-- verify everyhing including delete is rolled back
SELECT * FROM researchers WHERE lab_id = 6;
id | lab_id | name
----+--------+----------------------
17 | 6 | 'Bjarne Stroustrup'
18 | 6 | 'Dennis Ritchie'
(2 rows)
-- cleanup triggers and the function
SELECT * from run_command_on_placements('researchers', 'drop trigger reject_large_researcher_id on %s')
ORDER BY nodeport, shardid;
nodename | nodeport | shardid | success | result
-----------+----------+---------+---------+--------------
localhost | 57637 | 1200000 | t | DROP TRIGGER
localhost | 57637 | 1200001 | t | DROP TRIGGER
localhost | 57638 | 1200000 | t | DROP TRIGGER
localhost | 57638 | 1200001 | t | DROP TRIGGER
(4 rows)
SELECT * FROM run_command_on_workers('drop function reject_large_id()')
ORDER BY nodeport;
nodename | nodeport | success | result
-----------+----------+---------+---------------
localhost | 57637 | t | DROP FUNCTION
localhost | 57638 | t | DROP FUNCTION
(2 rows)
-- finally, ALTER and copy aren't compatible -- finally, ALTER and copy aren't compatible
BEGIN; BEGIN;
ALTER TABLE labs ADD COLUMN motto text; ALTER TABLE labs ADD COLUMN motto text;
@ -238,11 +379,6 @@ COMMIT;
id | bigint | not null id | bigint | not null
name | text | not null name | text | not null
SELECT * FROM labs WHERE id = 12;
id | name
----+------
(0 rows)
-- and if the copy is before the ALTER... -- and if the copy is before the ALTER...
BEGIN; BEGIN;
\copy labs from stdin delimiter ',' \copy labs from stdin delimiter ','
@ -269,7 +405,7 @@ ALTER TABLE labs ADD COLUMN motto text;
SELECT master_modify_multiple_shards('DELETE FROM labs'); SELECT master_modify_multiple_shards('DELETE FROM labs');
master_modify_multiple_shards master_modify_multiple_shards
------------------------------- -------------------------------
5 7
(1 row) (1 row)
ALTER TABLE labs ADD COLUMN score float; ALTER TABLE labs ADD COLUMN score float;

View File

@ -707,3 +707,4 @@ HINT: No function matches the given name and argument types. You might need to
CONTEXT: while executing command on localhost:57638 CONTEXT: while executing command on localhost:57638
WARNING: could not get statistics for shard public.composite_partition_column_table_560164 WARNING: could not get statistics for shard public.composite_partition_column_table_560164
DETAIL: Setting shard statistics to NULL DETAIL: Setting shard statistics to NULL
ERROR: failure on connection marked as essential: localhost:57637

View File

@ -219,6 +219,7 @@ push(@pgOptions, '-c', "citus.max_running_tasks_per_node=4");
push(@pgOptions, '-c', "citus.expire_cached_shards=on"); push(@pgOptions, '-c', "citus.expire_cached_shards=on");
push(@pgOptions, '-c', "citus.task_tracker_delay=10ms"); push(@pgOptions, '-c', "citus.task_tracker_delay=10ms");
push(@pgOptions, '-c', "citus.remote_task_check_interval=1ms"); push(@pgOptions, '-c', "citus.remote_task_check_interval=1ms");
push(@pgOptions, '-c', "citus.shard_replication_factor=2");
# Add externally added options last, so they overwrite the default ones above # Add externally added options last, so they overwrite the default ones above
for my $option (@userPgOptions) for my $option (@userPgOptions)

View File

@ -706,8 +706,8 @@ INSERT INTO raw_events_first SELECT * FROM raw_events_second WHERE user_id = 100
ROLLBACK; ROLLBACK;
-- Insert after copy is currently disallowed because of the way the -- Insert after copy is currently disallowed because of the way the
-- transaction modification state is currently handled. Copy still -- transaction modification state is currently handled. Copy is also
-- goes through despite rollback. -- rolled back.
BEGIN; BEGIN;
COPY raw_events_second (user_id, value_1) FROM STDIN DELIMITER ','; COPY raw_events_second (user_id, value_1) FROM STDIN DELIMITER ',';
100,100 100,100
@ -716,9 +716,7 @@ INSERT INTO raw_events_first SELECT * FROM raw_events_second;
ROLLBACK; ROLLBACK;
-- Insert after copy is currently allowed for single-shard operation. -- Insert after copy is currently allowed for single-shard operation.
-- Since the COPY commits immediately, the result is visible in the -- Both insert and copy are rolled back successfully.
-- next operation. Copy goes through despite rollback, while insert
-- rolls back.
BEGIN; BEGIN;
COPY raw_events_second (user_id, value_1) FROM STDIN DELIMITER ','; COPY raw_events_second (user_id, value_1) FROM STDIN DELIMITER ',';
101,101 101,101
@ -727,9 +725,7 @@ INSERT INTO raw_events_first SELECT * FROM raw_events_second WHERE user_id = 101
SELECT user_id FROM raw_events_first WHERE user_id = 101; SELECT user_id FROM raw_events_first WHERE user_id = 101;
ROLLBACK; ROLLBACK;
-- Copy after insert is disallowed since the insert is not immediately -- Copy after insert is currently disallowed.
-- committed and the copy uses different connections that will not yet
-- see the result of the insert.
BEGIN; BEGIN;
INSERT INTO raw_events_first SELECT * FROM raw_events_second; INSERT INTO raw_events_first SELECT * FROM raw_events_second;
COPY raw_events_first (user_id, value_1) FROM STDIN DELIMITER ','; COPY raw_events_first (user_id, value_1) FROM STDIN DELIMITER ',';

View File

@ -148,7 +148,7 @@ COMMIT;
\d labs \d labs
SELECT * FROM labs WHERE id = 6; SELECT * FROM labs WHERE id = 6;
-- COPY can't happen second, -- COPY can happen after single row INSERT
BEGIN; BEGIN;
INSERT INTO labs VALUES (6, 'Bell Labs'); INSERT INTO labs VALUES (6, 'Bell Labs');
\copy labs from stdin delimiter ',' \copy labs from stdin delimiter ','
@ -156,7 +156,7 @@ INSERT INTO labs VALUES (6, 'Bell Labs');
\. \.
COMMIT; COMMIT;
-- though it will work if before any modifications -- COPY can happen before single row INSERT
BEGIN; BEGIN;
\copy labs from stdin delimiter ',' \copy labs from stdin delimiter ','
10,Weyland-Yutani 10,Weyland-Yutani
@ -165,7 +165,7 @@ SELECT name FROM labs WHERE id = 10;
INSERT INTO labs VALUES (6, 'Bell Labs'); INSERT INTO labs VALUES (6, 'Bell Labs');
COMMIT; COMMIT;
-- but a double-copy isn't allowed (the first will persist) -- two consecutive COPYs in a transaction are allowed
BEGIN; BEGIN;
\copy labs from stdin delimiter ',' \copy labs from stdin delimiter ','
11,Planet Express 11,Planet Express
@ -175,7 +175,93 @@ BEGIN;
\. \.
COMMIT; COMMIT;
SELECT name FROM labs WHERE id = 11; SELECT name FROM labs WHERE id = 11 OR id = 12 ORDER BY id;
-- 1pc failure test
SELECT recover_prepared_transactions();
-- copy with unique index violation
BEGIN;
\copy researchers FROM STDIN delimiter ','
17, 6, 'Bjarne Stroustrup'
\.
\copy researchers FROM STDIN delimiter ','
18, 6, 'Bjarne Stroustrup'
\.
COMMIT;
-- verify rollback
SELECT * FROM researchers WHERE lab_id = 6;
SELECT count(*) FROM pg_dist_transaction;
-- 2pc failure and success tests
SET citus.multi_shard_commit_protocol TO '2pc';
SELECT recover_prepared_transactions();
-- copy with unique index violation
BEGIN;
\copy researchers FROM STDIN delimiter ','
17, 6, 'Bjarne Stroustrup'
\.
\copy researchers FROM STDIN delimiter ','
18, 6, 'Bjarne Stroustrup'
\.
COMMIT;
-- verify rollback
SELECT * FROM researchers WHERE lab_id = 6;
SELECT count(*) FROM pg_dist_transaction;
BEGIN;
\copy researchers FROM STDIN delimiter ','
17, 6, 'Bjarne Stroustrup'
\.
\copy researchers FROM STDIN delimiter ','
18, 6, 'Dennis Ritchie'
\.
COMMIT;
-- verify success
SELECT * FROM researchers WHERE lab_id = 6;
-- verify 2pc
SELECT count(*) FROM pg_dist_transaction;
RESET citus.multi_shard_commit_protocol;
-- create a check function
SELECT * from run_command_on_workers('CREATE FUNCTION reject_large_id() RETURNS trigger AS $rli$
BEGIN
IF (NEW.id > 30) THEN
RAISE ''illegal value'';
END IF;
RETURN NEW;
END;
$rli$ LANGUAGE plpgsql;')
ORDER BY nodeport;
-- register after insert trigger
SELECT * FROM run_command_on_placements('researchers', 'CREATE CONSTRAINT TRIGGER reject_large_researcher_id AFTER INSERT ON %s DEFERRABLE INITIALLY DEFERRED FOR EACH ROW EXECUTE PROCEDURE reject_large_id()')
ORDER BY nodeport, shardid;
-- hide postgresql version dependend messages for next test only
\set VERBOSITY terse
-- deferred check should abort the transaction
BEGIN;
DELETE FROM researchers WHERE lab_id = 6;
\copy researchers FROM STDIN delimiter ','
31, 6, 'Bjarne Stroustrup'
\.
\copy researchers FROM STDIN delimiter ','
30, 6, 'Dennis Ritchie'
\.
COMMIT;
\unset VERBOSITY
-- verify everyhing including delete is rolled back
SELECT * FROM researchers WHERE lab_id = 6;
-- cleanup triggers and the function
SELECT * from run_command_on_placements('researchers', 'drop trigger reject_large_researcher_id on %s')
ORDER BY nodeport, shardid;
SELECT * FROM run_command_on_workers('drop function reject_large_id()')
ORDER BY nodeport;
-- finally, ALTER and copy aren't compatible -- finally, ALTER and copy aren't compatible
BEGIN; BEGIN;
@ -187,7 +273,6 @@ COMMIT;
-- but the DDL should correctly roll back -- but the DDL should correctly roll back
\d labs \d labs
SELECT * FROM labs WHERE id = 12;
-- and if the copy is before the ALTER... -- and if the copy is before the ALTER...
BEGIN; BEGIN;