mirror of https://github.com/citusdata/citus.git
Support for COPY FROM, based on pg_shard PR by Postres Pro
parent
297bd5768d
commit
d37ccff15b
File diff suppressed because it is too large
Load Diff
|
@ -17,6 +17,7 @@
|
||||||
#include "commands/tablecmds.h"
|
#include "commands/tablecmds.h"
|
||||||
#include "distributed/master_protocol.h"
|
#include "distributed/master_protocol.h"
|
||||||
#include "distributed/metadata_cache.h"
|
#include "distributed/metadata_cache.h"
|
||||||
|
#include "distributed/multi_copy.h"
|
||||||
#include "distributed/multi_utility.h"
|
#include "distributed/multi_utility.h"
|
||||||
#include "distributed/multi_join_order.h"
|
#include "distributed/multi_join_order.h"
|
||||||
#include "distributed/transmit.h"
|
#include "distributed/transmit.h"
|
||||||
|
@ -50,7 +51,7 @@ static bool IsTransmitStmt(Node *parsetree);
|
||||||
static void VerifyTransmitStmt(CopyStmt *copyStatement);
|
static void VerifyTransmitStmt(CopyStmt *copyStatement);
|
||||||
|
|
||||||
/* Local functions forward declarations for processing distributed table commands */
|
/* Local functions forward declarations for processing distributed table commands */
|
||||||
static Node * ProcessCopyStmt(CopyStmt *copyStatement);
|
static Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag);
|
||||||
static Node * ProcessIndexStmt(IndexStmt *createIndexStatement,
|
static Node * ProcessIndexStmt(IndexStmt *createIndexStatement,
|
||||||
const char *createIndexCommand);
|
const char *createIndexCommand);
|
||||||
static Node * ProcessDropIndexStmt(DropStmt *dropIndexStatement,
|
static Node * ProcessDropIndexStmt(DropStmt *dropIndexStatement,
|
||||||
|
@ -122,7 +123,12 @@ multi_ProcessUtility(Node *parsetree,
|
||||||
|
|
||||||
if (IsA(parsetree, CopyStmt))
|
if (IsA(parsetree, CopyStmt))
|
||||||
{
|
{
|
||||||
parsetree = ProcessCopyStmt((CopyStmt *) parsetree);
|
parsetree = ProcessCopyStmt((CopyStmt *) parsetree, completionTag);
|
||||||
|
|
||||||
|
if (parsetree == NULL)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (IsA(parsetree, IndexStmt))
|
if (IsA(parsetree, IndexStmt))
|
||||||
|
@ -300,7 +306,7 @@ VerifyTransmitStmt(CopyStmt *copyStatement)
|
||||||
* COPYing from distributed tables and preventing unsupported actions.
|
* COPYing from distributed tables and preventing unsupported actions.
|
||||||
*/
|
*/
|
||||||
static Node *
|
static Node *
|
||||||
ProcessCopyStmt(CopyStmt *copyStatement)
|
ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* We first check if we have a "COPY (query) TO filename". If we do, copy doesn't
|
* We first check if we have a "COPY (query) TO filename". If we do, copy doesn't
|
||||||
|
@ -344,9 +350,8 @@ ProcessCopyStmt(CopyStmt *copyStatement)
|
||||||
{
|
{
|
||||||
if (copyStatement->is_from)
|
if (copyStatement->is_from)
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
CitusCopyFrom(copyStatement, completionTag);
|
||||||
errmsg("cannot execute COPY FROM on a distributed table "
|
return NULL;
|
||||||
"on master node")));
|
|
||||||
}
|
}
|
||||||
else if (!copyStatement->is_from)
|
else if (!copyStatement->is_from)
|
||||||
{
|
{
|
||||||
|
|
|
@ -110,8 +110,6 @@ static MapMergeJob * BuildMapMergeJob(Query *jobQuery, List *dependedJobList,
|
||||||
Oid baseRelationId,
|
Oid baseRelationId,
|
||||||
BoundaryNodeJobType boundaryNodeJobType);
|
BoundaryNodeJobType boundaryNodeJobType);
|
||||||
static uint32 HashPartitionCount(void);
|
static uint32 HashPartitionCount(void);
|
||||||
static int CompareShardIntervals(const void *leftElement, const void *rightElement,
|
|
||||||
FmgrInfo *typeCompareFunction);
|
|
||||||
static ArrayType * SplitPointObject(ShardInterval **shardIntervalArray,
|
static ArrayType * SplitPointObject(ShardInterval **shardIntervalArray,
|
||||||
uint32 shardIntervalCount);
|
uint32 shardIntervalCount);
|
||||||
|
|
||||||
|
@ -169,7 +167,6 @@ static List * RoundRobinAssignTaskList(List *taskList);
|
||||||
static List * RoundRobinReorder(Task *task, List *placementList);
|
static List * RoundRobinReorder(Task *task, List *placementList);
|
||||||
static List * ReorderAndAssignTaskList(List *taskList,
|
static List * ReorderAndAssignTaskList(List *taskList,
|
||||||
List * (*reorderFunction)(Task *, List *));
|
List * (*reorderFunction)(Task *, List *));
|
||||||
static int CompareTasksByShardId(const void *leftElement, const void *rightElement);
|
|
||||||
static List * ActiveShardPlacementLists(List *taskList);
|
static List * ActiveShardPlacementLists(List *taskList);
|
||||||
static List * ActivePlacementList(List *placementList);
|
static List * ActivePlacementList(List *placementList);
|
||||||
static List * LeftRotateList(List *list, uint32 rotateCount);
|
static List * LeftRotateList(List *list, uint32 rotateCount);
|
||||||
|
@ -1810,7 +1807,7 @@ SortedShardIntervalArray(List *shardIntervalList)
|
||||||
* CompareShardIntervals acts as a helper function to compare two shard interval
|
* CompareShardIntervals acts as a helper function to compare two shard interval
|
||||||
* pointers by their minimum values, using the value's type comparison function.
|
* pointers by their minimum values, using the value's type comparison function.
|
||||||
*/
|
*/
|
||||||
static int
|
int
|
||||||
CompareShardIntervals(const void *leftElement, const void *rightElement,
|
CompareShardIntervals(const void *leftElement, const void *rightElement,
|
||||||
FmgrInfo *typeCompareFunction)
|
FmgrInfo *typeCompareFunction)
|
||||||
{
|
{
|
||||||
|
@ -5073,7 +5070,7 @@ ReorderAndAssignTaskList(List *taskList, List * (*reorderFunction)(Task *, List
|
||||||
|
|
||||||
|
|
||||||
/* Helper function to compare two tasks by their anchor shardId. */
|
/* Helper function to compare two tasks by their anchor shardId. */
|
||||||
static int
|
int
|
||||||
CompareTasksByShardId(const void *leftElement, const void *rightElement)
|
CompareTasksByShardId(const void *leftElement, const void *rightElement)
|
||||||
{
|
{
|
||||||
const Task *leftTask = *((const Task **) leftElement);
|
const Task *leftTask = *((const Task **) leftElement);
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
#include "executor/executor.h"
|
#include "executor/executor.h"
|
||||||
#include "distributed/master_protocol.h"
|
#include "distributed/master_protocol.h"
|
||||||
#include "distributed/modify_planner.h"
|
#include "distributed/modify_planner.h"
|
||||||
|
#include "distributed/multi_copy.h"
|
||||||
#include "distributed/multi_executor.h"
|
#include "distributed/multi_executor.h"
|
||||||
#include "distributed/multi_explain.h"
|
#include "distributed/multi_explain.h"
|
||||||
#include "distributed/multi_join_order.h"
|
#include "distributed/multi_join_order.h"
|
||||||
|
@ -29,6 +30,7 @@
|
||||||
#include "distributed/multi_server_executor.h"
|
#include "distributed/multi_server_executor.h"
|
||||||
#include "distributed/multi_utility.h"
|
#include "distributed/multi_utility.h"
|
||||||
#include "distributed/task_tracker.h"
|
#include "distributed/task_tracker.h"
|
||||||
|
#include "distributed/transaction_manager.h"
|
||||||
#include "distributed/worker_manager.h"
|
#include "distributed/worker_manager.h"
|
||||||
#include "distributed/worker_protocol.h"
|
#include "distributed/worker_protocol.h"
|
||||||
#include "postmaster/postmaster.h"
|
#include "postmaster/postmaster.h"
|
||||||
|
@ -67,6 +69,12 @@ static const struct config_enum_entry shard_placement_policy_options[] = {
|
||||||
{ NULL, 0, false }
|
{ NULL, 0, false }
|
||||||
};
|
};
|
||||||
|
|
||||||
|
static const struct config_enum_entry transaction_manager_options[] = {
|
||||||
|
{ "1pc", TRANSACTION_MANAGER_1PC, false },
|
||||||
|
{ "2pc", TRANSACTION_MANAGER_2PC, false },
|
||||||
|
{ NULL, 0, false }
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
/* shared library initialization function */
|
/* shared library initialization function */
|
||||||
void
|
void
|
||||||
|
@ -437,6 +445,21 @@ RegisterCitusConfigVariables(void)
|
||||||
0,
|
0,
|
||||||
NULL, NULL, NULL);
|
NULL, NULL, NULL);
|
||||||
|
|
||||||
|
DefineCustomEnumVariable(
|
||||||
|
"citus.copy_transaction_manager",
|
||||||
|
gettext_noop("Sets the transaction manager for COPY into distributed tables."),
|
||||||
|
gettext_noop("When a failure occurs during when copying into a distributed "
|
||||||
|
"table, 2PC is required to ensure data is never lost. Change "
|
||||||
|
"this setting to '2pc' from its default '1pc' to enable 2PC."
|
||||||
|
"You must also set max_prepared_transactions on the worker "
|
||||||
|
"nodes. Recovery from failed 2PCs is currently manual."),
|
||||||
|
&CopyTransactionManager,
|
||||||
|
TRANSACTION_MANAGER_1PC,
|
||||||
|
transaction_manager_options,
|
||||||
|
PGC_USERSET,
|
||||||
|
0,
|
||||||
|
NULL, NULL, NULL);
|
||||||
|
|
||||||
DefineCustomEnumVariable(
|
DefineCustomEnumVariable(
|
||||||
"citus.task_assignment_policy",
|
"citus.task_assignment_policy",
|
||||||
gettext_noop("Sets the policy to use when assigning tasks to worker nodes."),
|
gettext_noop("Sets the policy to use when assigning tasks to worker nodes."),
|
||||||
|
|
|
@ -39,8 +39,6 @@ static HTAB *NodeConnectionHash = NULL;
|
||||||
|
|
||||||
/* local function forward declarations */
|
/* local function forward declarations */
|
||||||
static HTAB * CreateNodeConnectionHash(void);
|
static HTAB * CreateNodeConnectionHash(void);
|
||||||
static PGconn * ConnectToNode(char *nodeName, char *nodePort);
|
|
||||||
static char * ConnectionGetOptionValue(PGconn *connection, char *optionKeyword);
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -99,10 +97,7 @@ GetOrEstablishConnection(char *nodeName, int32 nodePort)
|
||||||
|
|
||||||
if (needNewConnection)
|
if (needNewConnection)
|
||||||
{
|
{
|
||||||
StringInfo nodePortString = makeStringInfo();
|
connection = ConnectToNode(nodeName, nodePort);
|
||||||
appendStringInfo(nodePortString, "%d", nodePort);
|
|
||||||
|
|
||||||
connection = ConnectToNode(nodeName, nodePortString->data);
|
|
||||||
if (connection != NULL)
|
if (connection != NULL)
|
||||||
{
|
{
|
||||||
nodeConnectionEntry = hash_search(NodeConnectionHash, &nodeConnectionKey,
|
nodeConnectionEntry = hash_search(NodeConnectionHash, &nodeConnectionKey,
|
||||||
|
@ -264,8 +259,8 @@ CreateNodeConnectionHash(void)
|
||||||
* We attempt to connect up to MAX_CONNECT_ATTEMPT times. After that we give up
|
* We attempt to connect up to MAX_CONNECT_ATTEMPT times. After that we give up
|
||||||
* and return NULL.
|
* and return NULL.
|
||||||
*/
|
*/
|
||||||
static PGconn *
|
PGconn *
|
||||||
ConnectToNode(char *nodeName, char *nodePort)
|
ConnectToNode(char *nodeName, int32 nodePort)
|
||||||
{
|
{
|
||||||
PGconn *connection = NULL;
|
PGconn *connection = NULL;
|
||||||
const char *clientEncoding = GetDatabaseEncodingName();
|
const char *clientEncoding = GetDatabaseEncodingName();
|
||||||
|
@ -276,11 +271,14 @@ ConnectToNode(char *nodeName, char *nodePort)
|
||||||
"host", "port", "fallback_application_name",
|
"host", "port", "fallback_application_name",
|
||||||
"client_encoding", "connect_timeout", "dbname", NULL
|
"client_encoding", "connect_timeout", "dbname", NULL
|
||||||
};
|
};
|
||||||
|
char nodePortString[12];
|
||||||
const char *valueArray[] = {
|
const char *valueArray[] = {
|
||||||
nodeName, nodePort, "citus", clientEncoding,
|
nodeName, nodePortString, "citus", clientEncoding,
|
||||||
CLIENT_CONNECT_TIMEOUT_SECONDS, dbname, NULL
|
CLIENT_CONNECT_TIMEOUT_SECONDS, dbname, NULL
|
||||||
};
|
};
|
||||||
|
|
||||||
|
sprintf(nodePortString, "%d", nodePort);
|
||||||
|
|
||||||
Assert(sizeof(keywordArray) == sizeof(valueArray));
|
Assert(sizeof(keywordArray) == sizeof(valueArray));
|
||||||
|
|
||||||
for (attemptIndex = 0; attemptIndex < MAX_CONNECT_ATTEMPTS; attemptIndex++)
|
for (attemptIndex = 0; attemptIndex < MAX_CONNECT_ATTEMPTS; attemptIndex++)
|
||||||
|
@ -313,7 +311,7 @@ ConnectToNode(char *nodeName, char *nodePort)
|
||||||
* The function returns NULL if the connection has no setting for an option with
|
* The function returns NULL if the connection has no setting for an option with
|
||||||
* the provided keyword.
|
* the provided keyword.
|
||||||
*/
|
*/
|
||||||
static char *
|
char *
|
||||||
ConnectionGetOptionValue(PGconn *connection, char *optionKeyword)
|
ConnectionGetOptionValue(PGconn *connection, char *optionKeyword)
|
||||||
{
|
{
|
||||||
char *optionValue = NULL;
|
char *optionValue = NULL;
|
||||||
|
|
|
@ -0,0 +1,173 @@
|
||||||
|
/*-------------------------------------------------------------------------
|
||||||
|
*
|
||||||
|
* transaction_manager.c
|
||||||
|
* This file contains functions that comprise a pluggable API for
|
||||||
|
* managing transactions across many worker nodes using 1PC or 2PC.
|
||||||
|
*
|
||||||
|
* Contributed by Konstantin Knizhnik, Postgres Professional
|
||||||
|
*
|
||||||
|
* Copyright (c) 2016, Citus Data, Inc.
|
||||||
|
*
|
||||||
|
*-------------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "postgres.h"
|
||||||
|
#include "libpq-fe.h"
|
||||||
|
#include "miscadmin.h"
|
||||||
|
|
||||||
|
#include "access/xact.h"
|
||||||
|
#include "distributed/connection_cache.h"
|
||||||
|
#include "distributed/transaction_manager.h"
|
||||||
|
|
||||||
|
static bool BeginTransaction(PGconn *connection);
|
||||||
|
static bool Prepare1PC(PGconn *connection, char *transactionId);
|
||||||
|
static bool CommitPrepared1PC(PGconn *connection, char *transactionId);
|
||||||
|
static bool RollbackPrepared1PC(PGconn *connection, char *transactionId);
|
||||||
|
static bool RollbackTransaction(PGconn *connection);
|
||||||
|
|
||||||
|
static bool Prepare2PC(PGconn *connection, char *transactionId);
|
||||||
|
static bool CommitPrepared2PC(PGconn *connection, char *transactionId);
|
||||||
|
static bool RollbackPrepared2PC(PGconn *connection, char *transactionId);
|
||||||
|
|
||||||
|
static char * Build2PCCommand(char const *command, char *transactionId);
|
||||||
|
|
||||||
|
CitusTransactionManager const CitusTransactionManagerImpl[] =
|
||||||
|
{
|
||||||
|
{ BeginTransaction, Prepare1PC, CommitPrepared1PC,
|
||||||
|
RollbackPrepared1PC, RollbackTransaction },
|
||||||
|
{ BeginTransaction, Prepare2PC, CommitPrepared2PC,
|
||||||
|
RollbackPrepared2PC, RollbackTransaction }
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* BeginTransaction sends a BEGIN command to start a transaction.
|
||||||
|
*/
|
||||||
|
static bool
|
||||||
|
BeginTransaction(PGconn *connection)
|
||||||
|
{
|
||||||
|
return ExecuteCommand(connection, PGRES_COMMAND_OK, "BEGIN");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Prepare1PC does nothing since 1PC mode does not have a prepare phase.
|
||||||
|
* This function is provided for compatibility with the 2PC API.
|
||||||
|
*/
|
||||||
|
static bool
|
||||||
|
Prepare1PC(PGconn *connection, char *transactionId)
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Commit1PC sends a COMMIT command to commit a transaction.
|
||||||
|
*/
|
||||||
|
static bool
|
||||||
|
CommitPrepared1PC(PGconn *connection, char *transactionId)
|
||||||
|
{
|
||||||
|
return ExecuteCommand(connection, PGRES_COMMAND_OK, "COMMIT");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* RollbackPrepared1PC sends a ROLLBACK command to roll a transaction
|
||||||
|
* back. This function is provided for compatibility with the 2PC API.
|
||||||
|
*/
|
||||||
|
static bool
|
||||||
|
RollbackPrepared1PC(PGconn *connection, char *transactionId)
|
||||||
|
{
|
||||||
|
return ExecuteCommand(connection, PGRES_COMMAND_OK, "ROLLBACK");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Prepare2PC sends a PREPARE TRANSACTION command to prepare a 2PC.
|
||||||
|
*/
|
||||||
|
static bool
|
||||||
|
Prepare2PC(PGconn *connection, char *transactionId)
|
||||||
|
{
|
||||||
|
return ExecuteCommand(connection, PGRES_COMMAND_OK,
|
||||||
|
Build2PCCommand("PREPARE TRANSACTION", transactionId));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* CommitPrepared2PC sends a COMMIT TRANSACTION command to commit a 2PC.
|
||||||
|
*/
|
||||||
|
static bool
|
||||||
|
CommitPrepared2PC(PGconn *connection, char *transactionId)
|
||||||
|
{
|
||||||
|
return ExecuteCommand(connection, PGRES_COMMAND_OK,
|
||||||
|
Build2PCCommand("COMMIT PREPARED", transactionId));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* RollbackPrepared2PC sends a COMMIT TRANSACTION command to commit a 2PC.
|
||||||
|
*/
|
||||||
|
static bool
|
||||||
|
RollbackPrepared2PC(PGconn *connection, char *transactionId)
|
||||||
|
{
|
||||||
|
return ExecuteCommand(connection, PGRES_COMMAND_OK,
|
||||||
|
Build2PCCommand("ROLLBACK PREPARED", transactionId));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* RollbackTransaction sends a ROLLBACK command to roll a transaction back.
|
||||||
|
*/
|
||||||
|
static bool
|
||||||
|
RollbackTransaction(PGconn *connection)
|
||||||
|
{
|
||||||
|
return ExecuteCommand(connection, PGRES_COMMAND_OK, "ROLLBACK");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Build2PCCommand builds a command with a unique transaction ID for a two-phase commit.
|
||||||
|
*/
|
||||||
|
static char *
|
||||||
|
Build2PCCommand(char const *command, char *transactionId)
|
||||||
|
{
|
||||||
|
StringInfo commandString = makeStringInfo();
|
||||||
|
|
||||||
|
appendStringInfo(commandString, "%s '%s'", transactionId);
|
||||||
|
|
||||||
|
return commandString->data;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* BuildTransactionId helps users construct a unique transaction id from an
|
||||||
|
* application-specific id.
|
||||||
|
*/
|
||||||
|
char *
|
||||||
|
BuildTransactionId(int localId)
|
||||||
|
{
|
||||||
|
StringInfo commandString = makeStringInfo();
|
||||||
|
|
||||||
|
appendStringInfo(commandString, "citus_%d_%u_%d", MyProcPid,
|
||||||
|
GetCurrentTransactionId(), localId);
|
||||||
|
|
||||||
|
return commandString->data;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ExecuteCommand executes a statement on a remote node and checks its result.
|
||||||
|
*/
|
||||||
|
bool
|
||||||
|
ExecuteCommand(PGconn *connection, ExecStatusType expectedResult, char const *command)
|
||||||
|
{
|
||||||
|
bool ret = true;
|
||||||
|
PGresult *result = PQexec(connection, command);
|
||||||
|
if (PQresultStatus(result) != expectedResult)
|
||||||
|
{
|
||||||
|
ReportRemoteError(connection, result);
|
||||||
|
ret = false;
|
||||||
|
}
|
||||||
|
PQclear(result);
|
||||||
|
return ret;
|
||||||
|
}
|
|
@ -54,6 +54,8 @@ typedef struct NodeConnectionEntry
|
||||||
extern PGconn * GetOrEstablishConnection(char *nodeName, int32 nodePort);
|
extern PGconn * GetOrEstablishConnection(char *nodeName, int32 nodePort);
|
||||||
extern void PurgeConnection(PGconn *connection);
|
extern void PurgeConnection(PGconn *connection);
|
||||||
extern void ReportRemoteError(PGconn *connection, PGresult *result);
|
extern void ReportRemoteError(PGconn *connection, PGresult *result);
|
||||||
|
extern PGconn * ConnectToNode(char *nodeName, int nodePort);
|
||||||
|
extern char * ConnectionGetOptionValue(PGconn *connection, char *optionKeyword);
|
||||||
|
|
||||||
|
|
||||||
#endif /* CONNECTION_CACHE_H */
|
#endif /* CONNECTION_CACHE_H */
|
||||||
|
|
|
@ -0,0 +1,29 @@
|
||||||
|
/*-------------------------------------------------------------------------
|
||||||
|
*
|
||||||
|
* multi_copy.h
|
||||||
|
*
|
||||||
|
* Declarations for public functions and variables used in COPY for
|
||||||
|
* distributed tables.
|
||||||
|
*
|
||||||
|
* Copyright (c) 2016, Citus Data, Inc.
|
||||||
|
*
|
||||||
|
*-------------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef MULTI_COPY_H
|
||||||
|
#define MULTI_COPY_H
|
||||||
|
|
||||||
|
|
||||||
|
#include "libpq-fe.h"
|
||||||
|
#include "distributed/transaction_manager.h"
|
||||||
|
|
||||||
|
|
||||||
|
/* config variable managed via guc.c */
|
||||||
|
extern int CopyTransactionManager;
|
||||||
|
|
||||||
|
|
||||||
|
/* function declarations for copying into a distributed table */
|
||||||
|
extern void CitusCopyFrom(CopyStmt *copyStatement, char* completionTag);
|
||||||
|
|
||||||
|
|
||||||
|
#endif /* MULTI_COPY_H */
|
|
@ -238,6 +238,8 @@ extern int CompareShardPlacements(const void *leftElement, const void *rightElem
|
||||||
extern ShardInterval ** SortedShardIntervalArray(List *shardList);
|
extern ShardInterval ** SortedShardIntervalArray(List *shardList);
|
||||||
extern bool ShardIntervalsOverlap(ShardInterval *firstInterval,
|
extern bool ShardIntervalsOverlap(ShardInterval *firstInterval,
|
||||||
ShardInterval *secondInterval);
|
ShardInterval *secondInterval);
|
||||||
|
extern int CompareShardIntervals(const void *leftElement, const void *rightElement,
|
||||||
|
FmgrInfo *typeCompareFunction);
|
||||||
|
|
||||||
/* function declarations for Task and Task list operations */
|
/* function declarations for Task and Task list operations */
|
||||||
extern bool TasksEqual(const Task *a, const Task *b);
|
extern bool TasksEqual(const Task *a, const Task *b);
|
||||||
|
@ -247,6 +249,7 @@ extern bool TaskListMember(const List *taskList, const Task *task);
|
||||||
extern List * TaskListDifference(const List *list1, const List *list2);
|
extern List * TaskListDifference(const List *list1, const List *list2);
|
||||||
extern List * TaskListUnion(const List *list1, const List *list2);
|
extern List * TaskListUnion(const List *list1, const List *list2);
|
||||||
extern List * FirstReplicaAssignTaskList(List *taskList);
|
extern List * FirstReplicaAssignTaskList(List *taskList);
|
||||||
|
extern int CompareTasksByShardId(const void *leftElement, const void *rightElement);
|
||||||
|
|
||||||
|
|
||||||
#endif /* MULTI_PHYSICAL_PLANNER_H */
|
#endif /* MULTI_PHYSICAL_PLANNER_H */
|
||||||
|
|
|
@ -0,0 +1,49 @@
|
||||||
|
/*-------------------------------------------------------------------------
|
||||||
|
*
|
||||||
|
* transaction_manager.h
|
||||||
|
*
|
||||||
|
* Transaction manager API.
|
||||||
|
*
|
||||||
|
* Copyright (c) 2016, Citus Data, Inc.
|
||||||
|
*
|
||||||
|
*-------------------------------------------------------------------------
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef TRANSACTION_MANAGER_H
|
||||||
|
#define TRANSACTION_MANAGER_H
|
||||||
|
|
||||||
|
|
||||||
|
#include "libpq-fe.h"
|
||||||
|
#include "utils/guc.h"
|
||||||
|
|
||||||
|
|
||||||
|
/* pluggable transaction manager API */
|
||||||
|
typedef struct CitusTransactionManager
|
||||||
|
{
|
||||||
|
bool (*Begin)(PGconn *conn);
|
||||||
|
bool (*Prepare)(PGconn *conn, char *transactionId);
|
||||||
|
bool (*CommitPrepared)(PGconn *conn, char *transactionId);
|
||||||
|
bool (*RollbackPrepared)(PGconn *conn, char *transactionId);
|
||||||
|
bool (*Rollback)(PGconn *conn);
|
||||||
|
} CitusTransactionManager;
|
||||||
|
|
||||||
|
|
||||||
|
/* Enumeration that defines the transaction manager to use */
|
||||||
|
typedef enum
|
||||||
|
{
|
||||||
|
TRANSACTION_MANAGER_1PC = 0,
|
||||||
|
TRANSACTION_MANAGER_2PC = 1
|
||||||
|
} TransactionManagerType;
|
||||||
|
|
||||||
|
|
||||||
|
/* Implementations of the transaction manager API */
|
||||||
|
extern CitusTransactionManager const CitusTransactionManagerImpl[];
|
||||||
|
|
||||||
|
|
||||||
|
/* Function declarations for copying into a distributed table */
|
||||||
|
extern bool ExecuteCommand(PGconn *connection, ExecStatusType expectedResult,
|
||||||
|
char const *command);
|
||||||
|
extern char * BuildTransactionId(int localId);
|
||||||
|
|
||||||
|
|
||||||
|
#endif /* TRANSACTION_MANAGER_H */
|
|
@ -18,9 +18,6 @@ SELECT master_create_worker_shards('sharded_table', 2, 1);
|
||||||
COPY sharded_table TO STDOUT;
|
COPY sharded_table TO STDOUT;
|
||||||
COPY (SELECT COUNT(*) FROM sharded_table) TO STDOUT;
|
COPY (SELECT COUNT(*) FROM sharded_table) TO STDOUT;
|
||||||
0
|
0
|
||||||
-- but COPY in is not
|
|
||||||
COPY sharded_table FROM STDIN;
|
|
||||||
ERROR: cannot execute COPY FROM on a distributed table on master node
|
|
||||||
-- cursors may not involve distributed tables
|
-- cursors may not involve distributed tables
|
||||||
DECLARE all_sharded_rows CURSOR FOR SELECT * FROM sharded_table;
|
DECLARE all_sharded_rows CURSOR FOR SELECT * FROM sharded_table;
|
||||||
ERROR: DECLARE CURSOR can only be used in transaction blocks
|
ERROR: DECLARE CURSOR can only be used in transaction blocks
|
||||||
|
|
|
@ -188,9 +188,6 @@ VIETNAM
|
||||||
RUSSIA
|
RUSSIA
|
||||||
UNITED KINGDOM
|
UNITED KINGDOM
|
||||||
UNITED STATES
|
UNITED STATES
|
||||||
-- Ensure that preventing COPY FROM against distributed tables works
|
|
||||||
COPY customer FROM STDIN;
|
|
||||||
ERROR: cannot execute COPY FROM on a distributed table on master node
|
|
||||||
-- Test that we can create on-commit drop tables, and also test creating with
|
-- Test that we can create on-commit drop tables, and also test creating with
|
||||||
-- oids, along with changing column names
|
-- oids, along with changing column names
|
||||||
BEGIN;
|
BEGIN;
|
||||||
|
|
|
@ -10,9 +10,6 @@ SELECT master_create_worker_shards('sharded_table', 2, 1);
|
||||||
COPY sharded_table TO STDOUT;
|
COPY sharded_table TO STDOUT;
|
||||||
COPY (SELECT COUNT(*) FROM sharded_table) TO STDOUT;
|
COPY (SELECT COUNT(*) FROM sharded_table) TO STDOUT;
|
||||||
|
|
||||||
-- but COPY in is not
|
|
||||||
COPY sharded_table FROM STDIN;
|
|
||||||
|
|
||||||
-- cursors may not involve distributed tables
|
-- cursors may not involve distributed tables
|
||||||
DECLARE all_sharded_rows CURSOR FOR SELECT * FROM sharded_table;
|
DECLARE all_sharded_rows CURSOR FOR SELECT * FROM sharded_table;
|
||||||
|
|
||||||
|
|
|
@ -104,9 +104,6 @@ COPY nation TO STDOUT;
|
||||||
-- ensure individual cols can be copied out, too
|
-- ensure individual cols can be copied out, too
|
||||||
COPY nation(n_name) TO STDOUT;
|
COPY nation(n_name) TO STDOUT;
|
||||||
|
|
||||||
-- Ensure that preventing COPY FROM against distributed tables works
|
|
||||||
COPY customer FROM STDIN;
|
|
||||||
|
|
||||||
-- Test that we can create on-commit drop tables, and also test creating with
|
-- Test that we can create on-commit drop tables, and also test creating with
|
||||||
-- oids, along with changing column names
|
-- oids, along with changing column names
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue