mirror of https://github.com/citusdata/citus.git
Support for COPY FROM, based on pg_shard PR by Postgres Pro
parent
754e16b737
commit
690252b222
File diff suppressed because it is too large
Load Diff
|
@ -17,6 +17,7 @@
|
||||||
#include "commands/tablecmds.h"
|
#include "commands/tablecmds.h"
|
||||||
#include "distributed/master_protocol.h"
|
#include "distributed/master_protocol.h"
|
||||||
#include "distributed/metadata_cache.h"
|
#include "distributed/metadata_cache.h"
|
||||||
|
#include "distributed/multi_copy.h"
|
||||||
#include "distributed/multi_utility.h"
|
#include "distributed/multi_utility.h"
|
||||||
#include "distributed/multi_join_order.h"
|
#include "distributed/multi_join_order.h"
|
||||||
#include "distributed/transmit.h"
|
#include "distributed/transmit.h"
|
||||||
|
@ -50,7 +51,7 @@ static bool IsTransmitStmt(Node *parsetree);
|
||||||
static void VerifyTransmitStmt(CopyStmt *copyStatement);
|
static void VerifyTransmitStmt(CopyStmt *copyStatement);
|
||||||
|
|
||||||
/* Local functions forward declarations for processing distributed table commands */
|
/* Local functions forward declarations for processing distributed table commands */
|
||||||
static Node * ProcessCopyStmt(CopyStmt *copyStatement);
|
static Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag);
|
||||||
static Node * ProcessIndexStmt(IndexStmt *createIndexStatement,
|
static Node * ProcessIndexStmt(IndexStmt *createIndexStatement,
|
||||||
const char *createIndexCommand);
|
const char *createIndexCommand);
|
||||||
static Node * ProcessDropIndexStmt(DropStmt *dropIndexStatement,
|
static Node * ProcessDropIndexStmt(DropStmt *dropIndexStatement,
|
||||||
|
@ -122,7 +123,12 @@ multi_ProcessUtility(Node *parsetree,
|
||||||
|
|
||||||
if (IsA(parsetree, CopyStmt))
|
if (IsA(parsetree, CopyStmt))
|
||||||
{
|
{
|
||||||
parsetree = ProcessCopyStmt((CopyStmt *) parsetree);
|
parsetree = ProcessCopyStmt((CopyStmt *) parsetree, completionTag);
|
||||||
|
|
||||||
|
if (parsetree == NULL)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (IsA(parsetree, IndexStmt))
|
if (IsA(parsetree, IndexStmt))
|
||||||
|
@ -300,7 +306,7 @@ VerifyTransmitStmt(CopyStmt *copyStatement)
|
||||||
* COPYing from distributed tables and preventing unsupported actions.
|
* COPYing from distributed tables and preventing unsupported actions.
|
||||||
*/
|
*/
|
||||||
static Node *
|
static Node *
|
||||||
ProcessCopyStmt(CopyStmt *copyStatement)
|
ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* We first check if we have a "COPY (query) TO filename". If we do, copy doesn't
|
* We first check if we have a "COPY (query) TO filename". If we do, copy doesn't
|
||||||
|
@ -344,9 +350,8 @@ ProcessCopyStmt(CopyStmt *copyStatement)
|
||||||
{
|
{
|
||||||
if (copyStatement->is_from)
|
if (copyStatement->is_from)
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
CitusCopyFrom(copyStatement, completionTag);
|
||||||
errmsg("cannot execute COPY FROM on a distributed table "
|
return NULL;
|
||||||
"on master node")));
|
|
||||||
}
|
}
|
||||||
else if (!copyStatement->is_from)
|
else if (!copyStatement->is_from)
|
||||||
{
|
{
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
#include "executor/executor.h"
|
#include "executor/executor.h"
|
||||||
#include "distributed/master_protocol.h"
|
#include "distributed/master_protocol.h"
|
||||||
#include "distributed/modify_planner.h"
|
#include "distributed/modify_planner.h"
|
||||||
|
#include "distributed/multi_copy.h"
|
||||||
#include "distributed/multi_executor.h"
|
#include "distributed/multi_executor.h"
|
||||||
#include "distributed/multi_explain.h"
|
#include "distributed/multi_explain.h"
|
||||||
#include "distributed/multi_join_order.h"
|
#include "distributed/multi_join_order.h"
|
||||||
|
@ -27,6 +28,7 @@
|
||||||
#include "distributed/multi_planner.h"
|
#include "distributed/multi_planner.h"
|
||||||
#include "distributed/multi_router_executor.h"
|
#include "distributed/multi_router_executor.h"
|
||||||
#include "distributed/multi_server_executor.h"
|
#include "distributed/multi_server_executor.h"
|
||||||
|
#include "distributed/multi_transaction.h"
|
||||||
#include "distributed/multi_utility.h"
|
#include "distributed/multi_utility.h"
|
||||||
#include "distributed/task_tracker.h"
|
#include "distributed/task_tracker.h"
|
||||||
#include "distributed/worker_manager.h"
|
#include "distributed/worker_manager.h"
|
||||||
|
@ -67,6 +69,12 @@ static const struct config_enum_entry shard_placement_policy_options[] = {
|
||||||
{ NULL, 0, false }
|
{ NULL, 0, false }
|
||||||
};
|
};
|
||||||
|
|
||||||
|
static const struct config_enum_entry transaction_manager_options[] = {
|
||||||
|
{ "1pc", TRANSACTION_MANAGER_1PC, false },
|
||||||
|
{ "2pc", TRANSACTION_MANAGER_2PC, false },
|
||||||
|
{ NULL, 0, false }
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
/* shared library initialization function */
|
/* shared library initialization function */
|
||||||
void
|
void
|
||||||
|
@ -437,6 +445,21 @@ RegisterCitusConfigVariables(void)
|
||||||
0,
|
0,
|
||||||
NULL, NULL, NULL);
|
NULL, NULL, NULL);
|
||||||
|
|
||||||
|
DefineCustomEnumVariable(
|
||||||
|
"citus.copy_transaction_manager",
|
||||||
|
gettext_noop("Sets the transaction manager for COPY into distributed tables."),
|
||||||
|
gettext_noop("When a failure occurs during when copying into a distributed "
|
||||||
|
"table, 2PC is required to ensure data is never lost. Change "
|
||||||
|
"this setting to '2pc' from its default '1pc' to enable 2PC."
|
||||||
|
"You must also set max_prepared_transactions on the worker "
|
||||||
|
"nodes. Recovery from failed 2PCs is currently manual."),
|
||||||
|
&CopyTransactionManager,
|
||||||
|
TRANSACTION_MANAGER_1PC,
|
||||||
|
transaction_manager_options,
|
||||||
|
PGC_USERSET,
|
||||||
|
0,
|
||||||
|
NULL, NULL, NULL);
|
||||||
|
|
||||||
DefineCustomEnumVariable(
|
DefineCustomEnumVariable(
|
||||||
"citus.task_assignment_policy",
|
"citus.task_assignment_policy",
|
||||||
gettext_noop("Sets the policy to use when assigning tasks to worker nodes."),
|
gettext_noop("Sets the policy to use when assigning tasks to worker nodes."),
|
||||||
|
|
|
@ -39,8 +39,6 @@ static HTAB *NodeConnectionHash = NULL;
|
||||||
|
|
||||||
/* local function forward declarations */
|
/* local function forward declarations */
|
||||||
static HTAB * CreateNodeConnectionHash(void);
|
static HTAB * CreateNodeConnectionHash(void);
|
||||||
static PGconn * ConnectToNode(char *nodeName, char *nodePort);
|
|
||||||
static char * ConnectionGetOptionValue(PGconn *connection, char *optionKeyword);
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -99,10 +97,7 @@ GetOrEstablishConnection(char *nodeName, int32 nodePort)
|
||||||
|
|
||||||
if (needNewConnection)
|
if (needNewConnection)
|
||||||
{
|
{
|
||||||
StringInfo nodePortString = makeStringInfo();
|
connection = ConnectToNode(nodeName, nodePort);
|
||||||
appendStringInfo(nodePortString, "%d", nodePort);
|
|
||||||
|
|
||||||
connection = ConnectToNode(nodeName, nodePortString->data);
|
|
||||||
if (connection != NULL)
|
if (connection != NULL)
|
||||||
{
|
{
|
||||||
nodeConnectionEntry = hash_search(NodeConnectionHash, &nodeConnectionKey,
|
nodeConnectionEntry = hash_search(NodeConnectionHash, &nodeConnectionKey,
|
||||||
|
@ -264,8 +259,8 @@ CreateNodeConnectionHash(void)
|
||||||
* We attempt to connect up to MAX_CONNECT_ATTEMPT times. After that we give up
|
* We attempt to connect up to MAX_CONNECT_ATTEMPT times. After that we give up
|
||||||
* and return NULL.
|
* and return NULL.
|
||||||
*/
|
*/
|
||||||
static PGconn *
|
PGconn *
|
||||||
ConnectToNode(char *nodeName, char *nodePort)
|
ConnectToNode(char *nodeName, int32 nodePort)
|
||||||
{
|
{
|
||||||
PGconn *connection = NULL;
|
PGconn *connection = NULL;
|
||||||
const char *clientEncoding = GetDatabaseEncodingName();
|
const char *clientEncoding = GetDatabaseEncodingName();
|
||||||
|
@ -276,11 +271,14 @@ ConnectToNode(char *nodeName, char *nodePort)
|
||||||
"host", "port", "fallback_application_name",
|
"host", "port", "fallback_application_name",
|
||||||
"client_encoding", "connect_timeout", "dbname", NULL
|
"client_encoding", "connect_timeout", "dbname", NULL
|
||||||
};
|
};
|
||||||
|
char nodePortString[12];
|
||||||
const char *valueArray[] = {
|
const char *valueArray[] = {
|
||||||
nodeName, nodePort, "citus", clientEncoding,
|
nodeName, nodePortString, "citus", clientEncoding,
|
||||||
CLIENT_CONNECT_TIMEOUT_SECONDS, dbname, NULL
|
CLIENT_CONNECT_TIMEOUT_SECONDS, dbname, NULL
|
||||||
};
|
};
|
||||||
|
|
||||||
|
sprintf(nodePortString, "%d", nodePort);
|
||||||
|
|
||||||
Assert(sizeof(keywordArray) == sizeof(valueArray));
|
Assert(sizeof(keywordArray) == sizeof(valueArray));
|
||||||
|
|
||||||
for (attemptIndex = 0; attemptIndex < MAX_CONNECT_ATTEMPTS; attemptIndex++)
|
for (attemptIndex = 0; attemptIndex < MAX_CONNECT_ATTEMPTS; attemptIndex++)
|
||||||
|
@ -313,7 +311,7 @@ ConnectToNode(char *nodeName, char *nodePort)
|
||||||
* The function returns NULL if the connection has no setting for an option with
|
* The function returns NULL if the connection has no setting for an option with
|
||||||
* the provided keyword.
|
* the provided keyword.
|
||||||
*/
|
*/
|
||||||
static char *
|
char *
|
||||||
ConnectionGetOptionValue(PGconn *connection, char *optionKeyword)
|
ConnectionGetOptionValue(PGconn *connection, char *optionKeyword)
|
||||||
{
|
{
|
||||||
char *optionValue = NULL;
|
char *optionValue = NULL;
|
||||||
|
|
|
@ -0,0 +1,211 @@
|
||||||
|
/*-------------------------------------------------------------------------
|
||||||
|
*
|
||||||
|
* multi_transaction.c
|
||||||
|
* This file contains functions for managing 1PC or 2PC transactions
|
||||||
|
* across many shard placements.
|
||||||
|
*
|
||||||
|
* Copyright (c) 2016, Citus Data, Inc.
|
||||||
|
*
|
||||||
|
*-------------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "postgres.h"
|
||||||
|
#include "libpq-fe.h"
|
||||||
|
#include "miscadmin.h"
|
||||||
|
|
||||||
|
#include "access/xact.h"
|
||||||
|
#include "distributed/connection_cache.h"
|
||||||
|
#include "distributed/multi_transaction.h"
|
||||||
|
#include "lib/stringinfo.h"
|
||||||
|
#include "nodes/pg_list.h"
|
||||||
|
|
||||||
|
|
||||||
|
/* Local functions forward declarations */
|
||||||
|
static StringInfo BuildTransactionName(int connectionId);
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* PrepareTransactions prepares all transactions on connections in
|
||||||
|
* connectionList for commit if the 2PC transaction manager is enabled.
|
||||||
|
* On failure, it reports an error and stops.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
PrepareTransactions(List *connectionList)
|
||||||
|
{
|
||||||
|
ListCell *connectionCell = NULL;
|
||||||
|
|
||||||
|
foreach(connectionCell, connectionList)
|
||||||
|
{
|
||||||
|
TransactionConnection *transactionConnection =
|
||||||
|
(TransactionConnection *) lfirst(connectionCell);
|
||||||
|
PGconn *connection = transactionConnection->connection;
|
||||||
|
int64 connectionId = transactionConnection->connectionId;
|
||||||
|
|
||||||
|
PGresult *result = NULL;
|
||||||
|
StringInfo command = makeStringInfo();
|
||||||
|
StringInfo transactionName = BuildTransactionName(connectionId);
|
||||||
|
|
||||||
|
appendStringInfo(command, "PREPARE TRANSACTION '%s'", transactionName->data);
|
||||||
|
|
||||||
|
result = PQexec(connection, command->data);
|
||||||
|
if (PQresultStatus(result) != PGRES_COMMAND_OK)
|
||||||
|
{
|
||||||
|
/* a failure to prepare is an implicit rollback */
|
||||||
|
transactionConnection->transactionState = TRANSACTION_STATE_CLOSED;
|
||||||
|
|
||||||
|
ReportRemoteError(connection, result);
|
||||||
|
PQclear(result);
|
||||||
|
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
|
||||||
|
errmsg("Failed to prepare transaction")));
|
||||||
|
}
|
||||||
|
|
||||||
|
PQclear(result);
|
||||||
|
|
||||||
|
transactionConnection->transactionState = TRANSACTION_STATE_PREPARED;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* AbortTransactions aborts all transactions on connections in connectionList.
|
||||||
|
* On failure, it reports a warning and continues to abort all of them.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
AbortTransactions(List *connectionList)
|
||||||
|
{
|
||||||
|
ListCell *connectionCell = NULL;
|
||||||
|
|
||||||
|
foreach(connectionCell, connectionList)
|
||||||
|
{
|
||||||
|
TransactionConnection *transactionConnection =
|
||||||
|
(TransactionConnection *) lfirst(connectionCell);
|
||||||
|
PGconn *connection = transactionConnection->connection;
|
||||||
|
int64 connectionId = transactionConnection->connectionId;
|
||||||
|
char *nodeName = ConnectionGetOptionValue(connection, "host");
|
||||||
|
char *nodePort = ConnectionGetOptionValue(connection, "port");
|
||||||
|
PGresult *result = NULL;
|
||||||
|
|
||||||
|
if (transactionConnection->transactionState == TRANSACTION_STATE_PREPARED)
|
||||||
|
{
|
||||||
|
StringInfo command = makeStringInfo();
|
||||||
|
StringInfo transactionName = BuildTransactionName(connectionId);
|
||||||
|
|
||||||
|
appendStringInfo(command, "ROLLBACK PREPARED '%s'", transactionName->data);
|
||||||
|
|
||||||
|
result = PQexec(connection, command->data);
|
||||||
|
if (PQresultStatus(result) != PGRES_COMMAND_OK)
|
||||||
|
{
|
||||||
|
/* log a warning so the user may abort the transaction later */
|
||||||
|
ereport(WARNING, (errmsg("Failed to roll back prepared transaction '%s'",
|
||||||
|
transactionName->data),
|
||||||
|
errhint("Run ROLLBACK TRANSACTION '%s' on %s:%s",
|
||||||
|
transactionName->data, nodeName, nodePort)));
|
||||||
|
}
|
||||||
|
|
||||||
|
PQclear(result);
|
||||||
|
}
|
||||||
|
else if (transactionConnection->transactionState == TRANSACTION_STATE_OPEN)
|
||||||
|
{
|
||||||
|
/* try to roll back cleanly, if it fails then we won't commit anyway */
|
||||||
|
result = PQexec(connection, "ROLLBACK");
|
||||||
|
PQclear(result);
|
||||||
|
}
|
||||||
|
|
||||||
|
transactionConnection->transactionState = TRANSACTION_STATE_CLOSED;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CommitTransactions commits all transactions on connections in connectionList.
|
||||||
|
* On failure, it reports a warning and continues committing all of them.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
CommitTransactions(List *connectionList)
|
||||||
|
{
|
||||||
|
ListCell *connectionCell = NULL;
|
||||||
|
|
||||||
|
foreach(connectionCell, connectionList)
|
||||||
|
{
|
||||||
|
TransactionConnection *transactionConnection =
|
||||||
|
(TransactionConnection *) lfirst(connectionCell);
|
||||||
|
PGconn *connection = transactionConnection->connection;
|
||||||
|
int64 connectionId = transactionConnection->connectionId;
|
||||||
|
char *nodeName = ConnectionGetOptionValue(connection, "host");
|
||||||
|
char *nodePort = ConnectionGetOptionValue(connection, "port");
|
||||||
|
PGresult *result = NULL;
|
||||||
|
|
||||||
|
if (transactionConnection->transactionState == TRANSACTION_STATE_PREPARED)
|
||||||
|
{
|
||||||
|
StringInfo command = makeStringInfo();
|
||||||
|
StringInfo transactionName = BuildTransactionName(connectionId);
|
||||||
|
|
||||||
|
/* we shouldn't be committing if any transactions are not prepared */
|
||||||
|
Assert(transactionConnection->transactionState == TRANSACTION_STATE_PREPARED);
|
||||||
|
|
||||||
|
appendStringInfo(command, "COMMIT PREPARED '%s'", transactionName->data);
|
||||||
|
|
||||||
|
result = PQexec(connection, command->data);
|
||||||
|
if (PQresultStatus(result) != PGRES_COMMAND_OK)
|
||||||
|
{
|
||||||
|
/* log a warning so the user may commit the transaction later */
|
||||||
|
ereport(WARNING, (errmsg("Failed to commit prepared transaction '%s'",
|
||||||
|
transactionName->data),
|
||||||
|
errhint("Run COMMIT TRANSACTION '%s' on %s:%s",
|
||||||
|
transactionName->data, nodeName, nodePort)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/* we shouldn't be committing if any transactions are not open */
|
||||||
|
Assert(transactionConnection->transactionState == TRANSACTION_STATE_OPEN);
|
||||||
|
|
||||||
|
/* try to commit, if it fails then the user might lose data */
|
||||||
|
result = PQexec(connection, "COMMIT");
|
||||||
|
if (PQresultStatus(result) != PGRES_COMMAND_OK)
|
||||||
|
{
|
||||||
|
ereport(WARNING, (errmsg("Failed to commit transaction on %s:%s",
|
||||||
|
nodeName, nodePort)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
PQclear(result);
|
||||||
|
|
||||||
|
transactionConnection->transactionState = TRANSACTION_STATE_CLOSED;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* BuildTransactionName constructs a unique transaction name from an ID.
|
||||||
|
*/
|
||||||
|
static StringInfo
|
||||||
|
BuildTransactionName(int connectionId)
|
||||||
|
{
|
||||||
|
StringInfo commandString = makeStringInfo();
|
||||||
|
|
||||||
|
appendStringInfo(commandString, "citus_%d_%u_%d", MyProcPid,
|
||||||
|
GetCurrentTransactionId(), connectionId);
|
||||||
|
|
||||||
|
return commandString;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CloseConnections closes all connections in connectionList.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
CloseConnections(List *connectionList)
|
||||||
|
{
|
||||||
|
ListCell *connectionCell = NULL;
|
||||||
|
|
||||||
|
foreach(connectionCell, connectionList)
|
||||||
|
{
|
||||||
|
TransactionConnection *transactionConnection =
|
||||||
|
(TransactionConnection *) lfirst(connectionCell);
|
||||||
|
PGconn *connection = transactionConnection->connection;
|
||||||
|
|
||||||
|
PQfinish(connection);
|
||||||
|
}
|
||||||
|
}
|
|
@ -54,6 +54,8 @@ typedef struct NodeConnectionEntry
|
||||||
extern PGconn * GetOrEstablishConnection(char *nodeName, int32 nodePort);
|
extern PGconn * GetOrEstablishConnection(char *nodeName, int32 nodePort);
|
||||||
extern void PurgeConnection(PGconn *connection);
|
extern void PurgeConnection(PGconn *connection);
|
||||||
extern void ReportRemoteError(PGconn *connection, PGresult *result);
|
extern void ReportRemoteError(PGconn *connection, PGresult *result);
|
||||||
|
extern PGconn * ConnectToNode(char *nodeName, int nodePort);
|
||||||
|
extern char * ConnectionGetOptionValue(PGconn *connection, char *optionKeyword);
|
||||||
|
|
||||||
|
|
||||||
#endif /* CONNECTION_CACHE_H */
|
#endif /* CONNECTION_CACHE_H */
|
||||||
|
|
|
@ -0,0 +1,27 @@
|
||||||
|
/*-------------------------------------------------------------------------
|
||||||
|
*
|
||||||
|
* multi_copy.h
|
||||||
|
* Declarations for public functions and variables used in COPY for
|
||||||
|
* distributed tables.
|
||||||
|
*
|
||||||
|
* Copyright (c) 2016, Citus Data, Inc.
|
||||||
|
*
|
||||||
|
*-------------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef MULTI_COPY_H
|
||||||
|
#define MULTI_COPY_H
|
||||||
|
|
||||||
|
|
||||||
|
#include "nodes/parsenodes.h"
|
||||||
|
|
||||||
|
|
||||||
|
/* config variable managed via guc.c */
|
||||||
|
extern int CopyTransactionManager;
|
||||||
|
|
||||||
|
|
||||||
|
/* function declarations for copying into a distributed table */
|
||||||
|
extern void CitusCopyFrom(CopyStmt *copyStatement, char *completionTag);
|
||||||
|
|
||||||
|
|
||||||
|
#endif /* MULTI_COPY_H */
|
|
@ -0,0 +1,57 @@
|
||||||
|
/*-------------------------------------------------------------------------
|
||||||
|
*
|
||||||
|
* multi_transaction.h
|
||||||
|
* Type and function declarations used in performing transactions across
|
||||||
|
* shard placements.
|
||||||
|
*
|
||||||
|
* Copyright (c) 2016, Citus Data, Inc.
|
||||||
|
*
|
||||||
|
*-------------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef MULTI_TRANSACTION_H
|
||||||
|
#define MULTI_TRANSACTION_H
|
||||||
|
|
||||||
|
|
||||||
|
#include "libpq-fe.h"
|
||||||
|
#include "lib/stringinfo.h"
|
||||||
|
#include "nodes/pg_list.h"
|
||||||
|
|
||||||
|
|
||||||
|
/* Enumeration that defines the different transaction managers available */
|
||||||
|
typedef enum
|
||||||
|
{
|
||||||
|
TRANSACTION_MANAGER_1PC = 0,
|
||||||
|
TRANSACTION_MANAGER_2PC = 1
|
||||||
|
} TransactionManagerType;
|
||||||
|
|
||||||
|
/* Enumeration that defines different remote transaction states */
|
||||||
|
typedef enum
|
||||||
|
{
|
||||||
|
TRANSACTION_STATE_INVALID = 0,
|
||||||
|
TRANSACTION_STATE_OPEN,
|
||||||
|
TRANSACTION_STATE_COPY_STARTED,
|
||||||
|
TRANSACTION_STATE_PREPARED,
|
||||||
|
TRANSACTION_STATE_CLOSED
|
||||||
|
} TransactionState;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* TransactionConnection represents a connection to a remote node which is
|
||||||
|
* used to perform a transaction on shard placements.
|
||||||
|
*/
|
||||||
|
typedef struct TransactionConnection
|
||||||
|
{
|
||||||
|
int64 connectionId;
|
||||||
|
TransactionState transactionState;
|
||||||
|
PGconn *connection;
|
||||||
|
} TransactionConnection;
|
||||||
|
|
||||||
|
|
||||||
|
/* Functions declarations for transaction and connection management */
|
||||||
|
extern void PrepareTransactions(List *connectionList);
|
||||||
|
extern void AbortTransactions(List *connectionList);
|
||||||
|
extern void CommitTransactions(List *connectionList);
|
||||||
|
extern void CloseConnections(List *connectionList);
|
||||||
|
|
||||||
|
|
||||||
|
#endif /* MULTI_TRANSACTION_H */
|
|
@ -18,9 +18,6 @@ SELECT master_create_worker_shards('sharded_table', 2, 1);
|
||||||
COPY sharded_table TO STDOUT;
|
COPY sharded_table TO STDOUT;
|
||||||
COPY (SELECT COUNT(*) FROM sharded_table) TO STDOUT;
|
COPY (SELECT COUNT(*) FROM sharded_table) TO STDOUT;
|
||||||
0
|
0
|
||||||
-- but COPY in is not
|
|
||||||
COPY sharded_table FROM STDIN;
|
|
||||||
ERROR: cannot execute COPY FROM on a distributed table on master node
|
|
||||||
-- cursors may not involve distributed tables
|
-- cursors may not involve distributed tables
|
||||||
DECLARE all_sharded_rows CURSOR FOR SELECT * FROM sharded_table;
|
DECLARE all_sharded_rows CURSOR FOR SELECT * FROM sharded_table;
|
||||||
ERROR: DECLARE CURSOR can only be used in transaction blocks
|
ERROR: DECLARE CURSOR can only be used in transaction blocks
|
||||||
|
|
|
@ -188,9 +188,6 @@ VIETNAM
|
||||||
RUSSIA
|
RUSSIA
|
||||||
UNITED KINGDOM
|
UNITED KINGDOM
|
||||||
UNITED STATES
|
UNITED STATES
|
||||||
-- Ensure that preventing COPY FROM against distributed tables works
|
|
||||||
COPY customer FROM STDIN;
|
|
||||||
ERROR: cannot execute COPY FROM on a distributed table on master node
|
|
||||||
-- Test that we can create on-commit drop tables, and also test creating with
|
-- Test that we can create on-commit drop tables, and also test creating with
|
||||||
-- oids, along with changing column names
|
-- oids, along with changing column names
|
||||||
BEGIN;
|
BEGIN;
|
||||||
|
|
|
@ -0,0 +1,142 @@
|
||||||
|
--
|
||||||
|
-- MULTI_COPY
|
||||||
|
--
|
||||||
|
-- Create a new hash-partitioned table into which to COPY
|
||||||
|
CREATE TABLE customer_copy_hash (
|
||||||
|
c_custkey integer,
|
||||||
|
c_name varchar(25) not null,
|
||||||
|
c_address varchar(40),
|
||||||
|
c_nationkey integer,
|
||||||
|
c_phone char(15),
|
||||||
|
c_acctbal decimal(15,2),
|
||||||
|
c_mktsegment char(10),
|
||||||
|
c_comment varchar(117),
|
||||||
|
primary key (c_custkey));
|
||||||
|
SELECT master_create_distributed_table('customer_copy_hash', 'c_custkey', 'hash');
|
||||||
|
|
||||||
|
-- Test COPY into empty hash-partitioned table
|
||||||
|
COPY customer_copy_hash FROM '@abs_srcdir@/data/customer.1.data' WITH (DELIMITER '|');
|
||||||
|
|
||||||
|
SELECT master_create_worker_shards('customer_copy_hash', 64, 1);
|
||||||
|
|
||||||
|
-- Test empty copy
|
||||||
|
COPY customer_copy_hash FROM STDIN;
|
||||||
|
\.
|
||||||
|
|
||||||
|
-- Test syntax error
|
||||||
|
COPY customer_copy_hash (c_custkey,c_name) FROM STDIN;
|
||||||
|
1,customer1
|
||||||
|
2,customer2,
|
||||||
|
notinteger,customernot
|
||||||
|
\.
|
||||||
|
|
||||||
|
-- Confirm that no data was copied
|
||||||
|
SELECT count(*) FROM customer_copy_hash;
|
||||||
|
|
||||||
|
-- Test primary key violation
|
||||||
|
COPY customer_copy_hash (c_custkey, c_name) FROM STDIN
|
||||||
|
WITH (FORMAT 'csv');
|
||||||
|
1,customer1
|
||||||
|
2,customer2
|
||||||
|
2,customer2
|
||||||
|
\.
|
||||||
|
|
||||||
|
-- Confirm that no data was copied
|
||||||
|
SELECT count(*) FROM customer_copy_hash;
|
||||||
|
|
||||||
|
-- Test headers option
|
||||||
|
COPY customer_copy_hash (c_custkey, c_name) FROM STDIN
|
||||||
|
WITH (FORMAT 'csv', HEADER true, FORCE_NULL (c_custkey));
|
||||||
|
# header
|
||||||
|
1,customer1
|
||||||
|
2,customer2
|
||||||
|
3,customer3
|
||||||
|
\.
|
||||||
|
|
||||||
|
-- Confirm that only first row was skipped
|
||||||
|
SELECT count(*) FROM customer_copy_hash;
|
||||||
|
|
||||||
|
-- Test force_not_null option
|
||||||
|
COPY customer_copy_hash (c_custkey, c_name, c_address) FROM STDIN
|
||||||
|
WITH (FORMAT 'csv', QUOTE '"', FORCE_NOT_NULL (c_address));
|
||||||
|
"4","customer4",""
|
||||||
|
\.
|
||||||
|
|
||||||
|
-- Confirm that value is not null
|
||||||
|
SELECT count(c_address) FROM customer_copy_hash WHERE c_custkey = 4;
|
||||||
|
|
||||||
|
-- Test force_null option
|
||||||
|
COPY customer_copy_hash (c_custkey, c_name, c_address) FROM STDIN
|
||||||
|
WITH (FORMAT 'csv', QUOTE '"', FORCE_NULL (c_address));
|
||||||
|
"5","customer5",""
|
||||||
|
\.
|
||||||
|
|
||||||
|
-- Confirm that value is null
|
||||||
|
SELECT count(c_address) FROM customer_copy_hash WHERE c_custkey = 5;
|
||||||
|
|
||||||
|
-- Test null violation
|
||||||
|
COPY customer_copy_hash (c_custkey, c_name) FROM STDIN
|
||||||
|
WITH (FORMAT 'csv');
|
||||||
|
6,customer6
|
||||||
|
7,customer7
|
||||||
|
8,
|
||||||
|
\.
|
||||||
|
|
||||||
|
-- Confirm that no data was copied
|
||||||
|
SELECT count(*) FROM customer_copy_hash;
|
||||||
|
|
||||||
|
-- Test server-side copy from program
|
||||||
|
COPY customer_copy_hash (c_custkey, c_name) FROM PROGRAM 'echo 9 customer9'
|
||||||
|
WITH (DELIMITER ' ');
|
||||||
|
|
||||||
|
-- Confirm that data was copied
|
||||||
|
SELECT count(*) FROM customer_copy_hash WHERE c_custkey = 9;
|
||||||
|
|
||||||
|
-- Test server-side copy from file
|
||||||
|
COPY customer_copy_hash FROM '@abs_srcdir@/data/customer.2.data' WITH (DELIMITER '|');
|
||||||
|
|
||||||
|
-- Confirm that data was copied
|
||||||
|
SELECT count(*) FROM customer_copy_hash;
|
||||||
|
|
||||||
|
-- Test client-side copy from file
|
||||||
|
\COPY customer_copy_hash FROM '@abs_srcdir@/data/customer.3.data' WITH (DELIMITER '|');
|
||||||
|
|
||||||
|
-- Confirm that data was copied
|
||||||
|
SELECT count(*) FROM customer_copy_hash;
|
||||||
|
|
||||||
|
-- Create a new range-partitioned table into which to COPY
|
||||||
|
CREATE TABLE customer_copy_range (
|
||||||
|
c_custkey integer,
|
||||||
|
c_name varchar(25),
|
||||||
|
c_address varchar(40),
|
||||||
|
c_nationkey integer,
|
||||||
|
c_phone char(15),
|
||||||
|
c_acctbal decimal(15,2),
|
||||||
|
c_mktsegment char(10),
|
||||||
|
c_comment varchar(117),
|
||||||
|
primary key (c_custkey));
|
||||||
|
|
||||||
|
SELECT master_create_distributed_table('customer_copy_range', 'c_custkey', 'range');
|
||||||
|
|
||||||
|
-- Test COPY into empty range-partitioned table
|
||||||
|
COPY customer_copy_range FROM '@abs_srcdir@/data/customer.1.data' WITH (DELIMITER '|');
|
||||||
|
|
||||||
|
SELECT master_create_empty_shard('customer_copy_range') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 500
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
|
||||||
|
SELECT master_create_empty_shard('customer_copy_range') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = 501, shardmaxvalue = 1000
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
|
||||||
|
-- Test copy into range-partitioned table
|
||||||
|
COPY customer_copy_range FROM '@abs_srcdir@/data/customer.1.data' WITH (DELIMITER '|');
|
||||||
|
|
||||||
|
-- Check whether data went into the right shard (maybe)
|
||||||
|
SELECT min(c_custkey), max(c_custkey), avg(c_custkey), count(*)
|
||||||
|
FROM customer_copy_range WHERE c_custkey <= 500;
|
||||||
|
|
||||||
|
-- Check whether data was copied
|
||||||
|
SELECT count(*) FROM customer_copy_range;
|
|
@ -100,6 +100,7 @@ test: multi_append_table_to_shard
|
||||||
# ---------
|
# ---------
|
||||||
test: multi_outer_join
|
test: multi_outer_join
|
||||||
|
|
||||||
|
#
|
||||||
# ---
|
# ---
|
||||||
# Tests covering mostly modification queries and required preliminary
|
# Tests covering mostly modification queries and required preliminary
|
||||||
# functionality related to metadata, shard creation, shard pruning and
|
# functionality related to metadata, shard creation, shard pruning and
|
||||||
|
@ -121,6 +122,11 @@ test: multi_create_insert_proxy
|
||||||
test: multi_data_types
|
test: multi_data_types
|
||||||
test: multi_repartitioned_subquery_udf
|
test: multi_repartitioned_subquery_udf
|
||||||
|
|
||||||
|
# ---------
|
||||||
|
# multi_copy creates hash and range-partitioned tables and performs COPY
|
||||||
|
# ---------
|
||||||
|
test: multi_copy
|
||||||
|
|
||||||
# ----------
|
# ----------
|
||||||
# multi_large_shardid stages more shards into lineitem
|
# multi_large_shardid stages more shards into lineitem
|
||||||
# ----------
|
# ----------
|
||||||
|
|
|
@ -0,0 +1,174 @@
|
||||||
|
--
|
||||||
|
-- MULTI_COPY
|
||||||
|
--
|
||||||
|
-- Create a new hash-partitioned table into which to COPY
|
||||||
|
CREATE TABLE customer_copy_hash (
|
||||||
|
c_custkey integer,
|
||||||
|
c_name varchar(25) not null,
|
||||||
|
c_address varchar(40),
|
||||||
|
c_nationkey integer,
|
||||||
|
c_phone char(15),
|
||||||
|
c_acctbal decimal(15,2),
|
||||||
|
c_mktsegment char(10),
|
||||||
|
c_comment varchar(117),
|
||||||
|
primary key (c_custkey));
|
||||||
|
SELECT master_create_distributed_table('customer_copy_hash', 'c_custkey', 'hash');
|
||||||
|
master_create_distributed_table
|
||||||
|
---------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Test COPY into empty hash-partitioned table
|
||||||
|
COPY customer_copy_hash FROM '@abs_srcdir@/data/customer.1.data' WITH (DELIMITER '|');
|
||||||
|
ERROR: could not find any shards for query
|
||||||
|
DETAIL: No shards exist for distributed table "customer_copy_hash".
|
||||||
|
HINT: Run master_create_worker_shards to create shards and try again.
|
||||||
|
SELECT master_create_worker_shards('customer_copy_hash', 64, 1);
|
||||||
|
master_create_worker_shards
|
||||||
|
-----------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Test empty copy
|
||||||
|
COPY customer_copy_hash FROM STDIN;
|
||||||
|
-- Test syntax error
|
||||||
|
COPY customer_copy_hash (c_custkey,c_name) FROM STDIN;
|
||||||
|
ERROR: invalid input syntax for integer: "1,customer1"
|
||||||
|
CONTEXT: COPY customer_copy_hash, line 1, column c_custkey: "1,customer1"
|
||||||
|
-- Confirm that no data was copied
|
||||||
|
SELECT count(*) FROM customer_copy_hash;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Test primary key violation
|
||||||
|
COPY customer_copy_hash (c_custkey, c_name) FROM STDIN
|
||||||
|
WITH (FORMAT 'csv');
|
||||||
|
ERROR: duplicate key value violates unique constraint "customer_copy_hash_pkey_103160"
|
||||||
|
DETAIL: Key (c_custkey)=(2) already exists.
|
||||||
|
CONTEXT: COPY customer_copy_hash, line 4: ""
|
||||||
|
-- Confirm that no data was copied
|
||||||
|
SELECT count(*) FROM customer_copy_hash;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Test headers option
|
||||||
|
COPY customer_copy_hash (c_custkey, c_name) FROM STDIN
|
||||||
|
WITH (FORMAT 'csv', HEADER true, FORCE_NULL (c_custkey));
|
||||||
|
-- Confirm that only first row was skipped
|
||||||
|
SELECT count(*) FROM customer_copy_hash;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
3
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Test force_not_null option
|
||||||
|
COPY customer_copy_hash (c_custkey, c_name, c_address) FROM STDIN
|
||||||
|
WITH (FORMAT 'csv', QUOTE '"', FORCE_NOT_NULL (c_address));
|
||||||
|
-- Confirm that value is not null
|
||||||
|
SELECT count(c_address) FROM customer_copy_hash WHERE c_custkey = 4;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Test force_null option
|
||||||
|
COPY customer_copy_hash (c_custkey, c_name, c_address) FROM STDIN
|
||||||
|
WITH (FORMAT 'csv', QUOTE '"', FORCE_NULL (c_address));
|
||||||
|
-- Confirm that value is null
|
||||||
|
SELECT count(c_address) FROM customer_copy_hash WHERE c_custkey = 5;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Test null violation
|
||||||
|
COPY customer_copy_hash (c_custkey, c_name) FROM STDIN
|
||||||
|
WITH (FORMAT 'csv');
|
||||||
|
ERROR: null value in column "c_name" violates not-null constraint
|
||||||
|
DETAIL: Failing row contains (8, null, null, null, null, null, null, null).
|
||||||
|
CONTEXT: COPY customer_copy_hash, line 4: ""
|
||||||
|
-- Confirm that no data was copied
|
||||||
|
SELECT count(*) FROM customer_copy_hash;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
5
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Test server-side copy from program
|
||||||
|
COPY customer_copy_hash (c_custkey, c_name) FROM PROGRAM 'echo 9 customer9'
|
||||||
|
WITH (DELIMITER ' ');
|
||||||
|
-- Confirm that data was copied
|
||||||
|
SELECT count(*) FROM customer_copy_hash WHERE c_custkey = 9;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Test server-side copy from file
|
||||||
|
COPY customer_copy_hash FROM '@abs_srcdir@/data/customer.2.data' WITH (DELIMITER '|');
|
||||||
|
-- Confirm that data was copied
|
||||||
|
SELECT count(*) FROM customer_copy_hash;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
1006
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Test client-side copy from file
|
||||||
|
\COPY customer_copy_hash FROM '@abs_srcdir@/data/customer.3.data' WITH (DELIMITER '|');
|
||||||
|
-- Confirm that data was copied
|
||||||
|
SELECT count(*) FROM customer_copy_hash;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
2006
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Create a new range-partitioned table into which to COPY
|
||||||
|
CREATE TABLE customer_copy_range (
|
||||||
|
c_custkey integer,
|
||||||
|
c_name varchar(25),
|
||||||
|
c_address varchar(40),
|
||||||
|
c_nationkey integer,
|
||||||
|
c_phone char(15),
|
||||||
|
c_acctbal decimal(15,2),
|
||||||
|
c_mktsegment char(10),
|
||||||
|
c_comment varchar(117),
|
||||||
|
primary key (c_custkey));
|
||||||
|
SELECT master_create_distributed_table('customer_copy_range', 'c_custkey', 'range');
|
||||||
|
master_create_distributed_table
|
||||||
|
---------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Test COPY into empty range-partitioned table
|
||||||
|
COPY customer_copy_range FROM '@abs_srcdir@/data/customer.1.data' WITH (DELIMITER '|');
|
||||||
|
ERROR: could not find any shards for query
|
||||||
|
DETAIL: No shards exist for distributed table "customer_copy_range".
|
||||||
|
SELECT master_create_empty_shard('customer_copy_range') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = 1, shardmaxvalue = 500
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
SELECT master_create_empty_shard('customer_copy_range') AS new_shard_id
|
||||||
|
\gset
|
||||||
|
UPDATE pg_dist_shard SET shardminvalue = 501, shardmaxvalue = 1000
|
||||||
|
WHERE shardid = :new_shard_id;
|
||||||
|
-- Test copy into range-partitioned table
|
||||||
|
COPY customer_copy_range FROM '@abs_srcdir@/data/customer.1.data' WITH (DELIMITER '|');
|
||||||
|
-- Check whether data went into the right shard (maybe)
|
||||||
|
SELECT min(c_custkey), max(c_custkey), avg(c_custkey), count(*)
|
||||||
|
FROM customer_copy_range WHERE c_custkey <= 500;
|
||||||
|
min | max | avg | count
|
||||||
|
-----+-----+----------------------+-------
|
||||||
|
1 | 500 | 250.5000000000000000 | 500
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- Check whether data was copied
|
||||||
|
SELECT count(*) FROM customer_copy_range;
|
||||||
|
count
|
||||||
|
-------
|
||||||
|
1000
|
||||||
|
(1 row)
|
||||||
|
|
|
@ -10,9 +10,6 @@ SELECT master_create_worker_shards('sharded_table', 2, 1);
|
||||||
COPY sharded_table TO STDOUT;
|
COPY sharded_table TO STDOUT;
|
||||||
COPY (SELECT COUNT(*) FROM sharded_table) TO STDOUT;
|
COPY (SELECT COUNT(*) FROM sharded_table) TO STDOUT;
|
||||||
|
|
||||||
-- but COPY in is not
|
|
||||||
COPY sharded_table FROM STDIN;
|
|
||||||
|
|
||||||
-- cursors may not involve distributed tables
|
-- cursors may not involve distributed tables
|
||||||
DECLARE all_sharded_rows CURSOR FOR SELECT * FROM sharded_table;
|
DECLARE all_sharded_rows CURSOR FOR SELECT * FROM sharded_table;
|
||||||
|
|
||||||
|
|
|
@ -104,9 +104,6 @@ COPY nation TO STDOUT;
|
||||||
-- ensure individual cols can be copied out, too
|
-- ensure individual cols can be copied out, too
|
||||||
COPY nation(n_name) TO STDOUT;
|
COPY nation(n_name) TO STDOUT;
|
||||||
|
|
||||||
-- Ensure that preventing COPY FROM against distributed tables works
|
|
||||||
COPY customer FROM STDIN;
|
|
||||||
|
|
||||||
-- Test that we can create on-commit drop tables, and also test creating with
|
-- Test that we can create on-commit drop tables, and also test creating with
|
||||||
-- oids, along with changing column names
|
-- oids, along with changing column names
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue