Merge pull request #618 from citusdata/ddl_propagation_2pc

Propagate DDL Commands via 2PC
Eren Başak 2016-07-19 10:54:57 +03:00 committed by GitHub
commit b5e806ff55
14 changed files with 746 additions and 377 deletions
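For orientation, a rough psql sketch of the behavior this change introduces (the items table and index names are made up; the regression tests in this PR exercise the same paths on lineitem_hash and lineitem_alter):

    SET citus.multi_shard_commit_protocol TO '1pc';
    CREATE TABLE items (key int, value text);
    SELECT master_create_distributed_table('items', 'key', 'hash');
    SELECT master_create_worker_shards('items', 8, 2);
    -- Propagated DDL now always runs under 2PC, regardless of the GUC above:
    -- every shard placement sees BEGIN ... PREPARE TRANSACTION ... COMMIT PREPARED.
    CREATE INDEX items_key_idx ON items (key);
    -- Distributed DDL is rejected inside an explicit transaction block.
    BEGIN;
    CREATE INDEX items_value_idx ON items (value);  -- errors out
    ROLLBACK;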

View File

@@ -15,7 +15,7 @@ DATA_built = $(foreach v,$(EXTVERSIONS),$(EXTENSION)--$(v).sql)
 SCRIPTS = ../../bin/scripts/copy_to_distributed_table
 # directories with source files
-SUBDIRS = . commands executor master planner relay test utils worker
+SUBDIRS = . commands executor master planner relay test transaction utils worker
 # That patsubst rule searches all directories listed in SUBDIRS for .c
 # files, and adds the corresponding .o files to OBJS

View File

@@ -71,6 +71,7 @@
 #include "commands/copy.h"
 #include "commands/defrem.h"
 #include "distributed/citus_ruleutils.h"
+#include "distributed/commit_protocol.h"
 #include "distributed/connection_cache.h"
 #include "distributed/listutils.h"
 #include "distributed/metadata_cache.h"
@@ -79,7 +80,7 @@
 #include "distributed/metadata_cache.h"
 #include "distributed/multi_copy.h"
 #include "distributed/multi_physical_planner.h"
-#include "distributed/multi_transaction.h"
+#include "distributed/multi_shard_transaction.h"
 #include "distributed/pg_dist_partition.h"
 #include "distributed/resource_lock.h"
 #include "distributed/shardinterval_utils.h"

View File

@@ -18,11 +18,15 @@
 #include "commands/defrem.h"
 #include "commands/tablecmds.h"
 #include "distributed/citus_ruleutils.h"
+#include "distributed/commit_protocol.h"
+#include "distributed/connection_cache.h"
 #include "distributed/master_protocol.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/multi_copy.h"
 #include "distributed/multi_utility.h"
 #include "distributed/multi_join_order.h"
+#include "distributed/multi_shard_transaction.h"
+#include "distributed/resource_lock.h"
 #include "distributed/transmit.h"
 #include "distributed/worker_manager.h"
 #include "distributed/worker_protocol.h"
@@ -36,12 +40,14 @@
 #include "utils/builtins.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
+#include "utils/memutils.h"
 #include "utils/rel.h"
 #include "utils/syscache.h"

 bool EnableDDLPropagation = true; /* ddl propagation is enabled */

 /*
  * This struct defines the state for the callback for drop statements.
  * It is copied as it is from commands/tablecmds.c in Postgres source.
@@ -62,11 +68,11 @@ static void VerifyTransmitStmt(CopyStmt *copyStatement);
 static Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag,
                               bool *commandMustRunAsOwner);
 static Node * ProcessIndexStmt(IndexStmt *createIndexStatement,
-                               const char *createIndexCommand);
+                               const char *createIndexCommand, bool isTopLevel);
 static Node * ProcessDropIndexStmt(DropStmt *dropIndexStatement,
-                                   const char *dropIndexCommand);
+                                   const char *dropIndexCommand, bool isTopLevel);
 static Node * ProcessAlterTableStmt(AlterTableStmt *alterTableStatement,
-                                    const char *alterTableCommand);
+                                    const char *alterTableCommand, bool isTopLevel);

 /* Local functions forward declarations for unsupported command checks */
 static void ErrorIfUnsupportedIndexStmt(IndexStmt *createIndexStatement);
@@ -77,9 +83,12 @@ static void ErrorIfDistributedRenameStmt(RenameStmt *renameStatement);
 /* Local functions forward declarations for helper functions */
 static void CreateLocalTable(RangeVar *relation, char *nodeName, int32 nodePort);
 static bool IsAlterTableRenameStmt(RenameStmt *renameStatement);
-static void ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString);
-static bool ExecuteCommandOnWorkerShards(Oid relationId, const char *commandString,
-                                         List **failedPlacementList);
+static void ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString,
+                                         bool isTopLevel);
+static void SetLocalCommitProtocolTo2PC(void);
+static bool ExecuteCommandOnWorkerShards(Oid relationId, const char *commandString);
+static void ExecuteCommandOnShardPlacements(StringInfo applyCommand, uint64 shardId,
+                                            ShardConnections *shardConnections);
 static bool AllFinalizedPlacementsAccessible(Oid relationId);
 static void RangeVarCallbackForDropIndex(const RangeVar *rel, Oid relOid, Oid oldRelOid,
                                          void *arg);
@@ -150,9 +159,12 @@ multi_ProcessUtility(Node *parsetree,
     /* ddl commands are propagated to workers only if EnableDDLPropagation is set */
     if (EnableDDLPropagation)
     {
+        bool isTopLevel = (context == PROCESS_UTILITY_TOPLEVEL);
+
         if (IsA(parsetree, IndexStmt))
         {
-            parsetree = ProcessIndexStmt((IndexStmt *) parsetree, queryString);
+            parsetree = ProcessIndexStmt((IndexStmt *) parsetree, queryString,
+                                         isTopLevel);
         }

         if (IsA(parsetree, DropStmt))
@@ -160,7 +172,7 @@ multi_ProcessUtility(Node *parsetree,
             DropStmt *dropStatement = (DropStmt *) parsetree;
             if (dropStatement->removeType == OBJECT_INDEX)
             {
-                parsetree = ProcessDropIndexStmt(dropStatement, queryString);
+                parsetree = ProcessDropIndexStmt(dropStatement, queryString, isTopLevel);
             }
         }
@@ -169,7 +181,8 @@ multi_ProcessUtility(Node *parsetree,
             AlterTableStmt *alterTableStmt = (AlterTableStmt *) parsetree;
             if (alterTableStmt->relkind == OBJECT_TABLE)
             {
-                parsetree = ProcessAlterTableStmt(alterTableStmt, queryString);
+                parsetree = ProcessAlterTableStmt(alterTableStmt, queryString,
+                                                  isTopLevel);
             }
         }
@@ -457,7 +470,8 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustR
  * master node table.
  */
 static Node *
-ProcessIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand)
+ProcessIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand,
+                 bool isTopLevel)
 {
     /*
      * We first check whether a distributed relation is affected. For that, we need to
@@ -504,7 +518,7 @@ ProcessIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand
         ErrorIfUnsupportedIndexStmt(createIndexStatement);

         /* if it is supported, go ahead and execute the command */
-        ExecuteDistributedDDLCommand(relationId, createIndexCommand);
+        ExecuteDistributedDDLCommand(relationId, createIndexCommand, isTopLevel);
     }
 }
@@ -521,7 +535,8 @@ ProcessIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand
  * master node table.
  */
 static Node *
-ProcessDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand)
+ProcessDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand,
+                     bool isTopLevel)
 {
     ListCell *dropObjectCell = NULL;
     Oid distributedIndexId = InvalidOid;
@@ -590,7 +605,7 @@ ProcessDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand)
         ErrorIfUnsupportedDropIndexStmt(dropIndexStatement);

         /* if it is supported, go ahead and execute the command */
-        ExecuteDistributedDDLCommand(distributedRelationId, dropIndexCommand);
+        ExecuteDistributedDDLCommand(distributedRelationId, dropIndexCommand, isTopLevel);
     }

     return (Node *) dropIndexStatement;
@@ -606,7 +621,8 @@ ProcessDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand)
  * master node table.
  */
 static Node *
-ProcessAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCommand)
+ProcessAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCommand,
+                      bool isTopLevel)
 {
     /* first check whether a distributed relation is affected */
     if (alterTableStatement->relation != NULL)
@@ -621,7 +637,7 @@ ProcessAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTabl
             ErrorIfUnsupportedAlterTableStmt(alterTableStatement);

             /* if it is supported, go ahead and execute the command */
-            ExecuteDistributedDDLCommand(relationId, alterTableCommand);
+            ExecuteDistributedDDLCommand(relationId, alterTableCommand, isTopLevel);
         }
     }
 }
@@ -985,18 +1001,23 @@ IsAlterTableRenameStmt(RenameStmt *renameStmt)
 /*
  * ExecuteDistributedDDLCommand applies a given DDL command to the given
- * distributed table. If the function is unable to access all the finalized
- * shard placements, then it fails early and errors out. If the command
- * successfully executed on any finalized shard placement, and failed on
- * others, then it marks the placements on which execution failed as invalid.
+ * distributed table in a distributed transaction. For the transaction, the value of
+ * citus.multi_shard_commit_protocol is set to '2pc' so that two phase commit mechanism
+ * is used, regardless of the actual value of citus.multi_shard_commit_protocol. In
+ * the commit protocol, a BEGIN is sent after connection to each shard placement
+ * and COMMIT/ROLLBACK is handled by CompleteShardPlacementTransactions function.
  */
 static void
-ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString)
+ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString,
+                             bool isTopLevel)
 {
-    List *failedPlacementList = NIL;
     bool executionOK = false;
+    bool allPlacementsAccessible = false;

-    bool allPlacementsAccessible = AllFinalizedPlacementsAccessible(relationId);
+    PreventTransactionChain(true, "distributed DDL commands");
+
+    SetLocalCommitProtocolTo2PC();
+
+    allPlacementsAccessible = AllFinalizedPlacementsAccessible(relationId);

     if (!allPlacementsAccessible)
     {
         ereport(ERROR, (errmsg("cannot execute command: %s", ddlCommandString),
@@ -1007,31 +1028,13 @@ ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString)
     /* make sure we don't process cancel signals */
     HOLD_INTERRUPTS();

-    executionOK = ExecuteCommandOnWorkerShards(relationId, ddlCommandString,
-                                               &failedPlacementList);
+    executionOK = ExecuteCommandOnWorkerShards(relationId, ddlCommandString);

     /* if command could not be executed on any finalized shard placement, error out */
     if (!executionOK)
     {
         ereport(ERROR, (errmsg("could not execute DDL command on worker node shards")));
     }
-    else
-    {
-        /* else, mark failed placements as inactive */
-        ListCell *failedPlacementCell = NULL;
-        foreach(failedPlacementCell, failedPlacementList)
-        {
-            ShardPlacement *placement = (ShardPlacement *) lfirst(failedPlacementCell);
-            uint64 shardId = placement->shardId;
-            char *workerName = placement->nodeName;
-            uint32 workerPort = placement->nodePort;
-            uint64 oldShardLength = placement->shardLength;
-
-            DeleteShardPlacementRow(shardId, workerName, workerPort);
-            InsertShardPlacementRow(shardId, FILE_INACTIVE, oldShardLength,
-                                    workerName, workerPort);
-        }
-    }

     if (QueryCancelPending)
     {
@@ -1043,93 +1046,152 @@ ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString)
 }
/*
 * SetLocalCommitProtocolTo2PC sets the citus.multi_shard_commit_protocol value
 * to '2pc' for the current transaction, in order to force 2PC even when the value
 * of citus.multi_shard_commit_protocol is '1pc'.
*/
static void
SetLocalCommitProtocolTo2PC(void)
{
if (MultiShardCommitProtocol != COMMIT_PROTOCOL_2PC)
{
ereport(DEBUG2, (errmsg("switching to 2PC for the transaction")));
#if (PG_VERSION_NUM >= 90500)
set_config_option("citus.multi_shard_commit_protocol",
"2pc",
PGC_USERSET,
PGC_S_SESSION,
GUC_ACTION_LOCAL,
true, 0, false);
#else
set_config_option("citus.multi_shard_commit_protocol",
"2pc",
PGC_USERSET,
PGC_S_SESSION,
GUC_ACTION_LOCAL,
true, 0);
#endif
}
}
 /*
  * ExecuteCommandOnWorkerShards executes a given command on all the finalized
- * shard placements of the given table. If the remote command errors out on the
- * first attempted placement, the function returns false. Otherwise, it returns
- * true.
+ * shard placements of the given table within a distributed transaction. The
+ * value of citus.multi_shard_commit_protocol is set to '2pc' by the caller
+ * ExecuteDistributedDDLCommand function so that two phase commit protocol is used.
  *
- * If the remote query errors out on the first attempted placement, it is very
- * likely that the command is going to fail on other placements too. This is
- * because most errors here will be PostgreSQL errors. Hence, the function fails
- * fast to avoid marking a high number of placements as failed. If the command
- * succeeds on at least one placement before failing on others, then the list of
- * failed placements is returned in failedPlacementList.
+ * ExecuteCommandOnWorkerShards opens an individual connection for each of the
+ * shard placement. After all connections are opened, a BEGIN command followed by
+ * a proper "SELECT worker_apply_shard_ddl_command(<shardId>, <DDL Command>)" is
+ * sent to all open connections in a serial manner.
+ *
+ * The opened transactions are handled by the CompleteShardPlacementTransactions
+ * function.
  *
  * Note: There are certain errors which would occur on few nodes and not on the
  * others. For example, adding a column with a type which exists on some nodes
- * and not on the others. In that case, this function might still end up returning
- * a large number of placements as failed.
+ * and not on the others.
+ *
+ * Note: The execution will be blocked if a prepared transaction from previous
+ * executions exist on the workers. In this case, those prepared transactions should
+ * be removed by either COMMIT PREPARED or ROLLBACK PREPARED.
  */
 static bool
-ExecuteCommandOnWorkerShards(Oid relationId, const char *commandString,
-                             List **failedPlacementList)
+ExecuteCommandOnWorkerShards(Oid relationId, const char *commandString)
 {
-    bool isFirstPlacement = true;
-    ListCell *shardCell = NULL;
-    List *shardList = NIL;
-    char *relationOwner = TableOwner(relationId);
+    List *shardIntervalList = LoadShardIntervalList(relationId);
+    char *tableOwner = TableOwner(relationId);
+    HTAB *shardConnectionHash = NULL;
+    ListCell *shardIntervalCell = NULL;
+    MemoryContext oldContext = MemoryContextSwitchTo(TopTransactionContext);

-    shardList = LoadShardList(relationId);
-    foreach(shardCell, shardList)
+    LockShards(shardIntervalList, ShareLock);
+
+    shardConnectionHash = OpenTransactionsToAllShardPlacements(shardIntervalList,
+                                                               tableOwner);
+    MemoryContextSwitchTo(oldContext);
+
+    foreach(shardIntervalCell, shardIntervalList)
     {
-        List *shardPlacementList = NIL;
-        ListCell *shardPlacementCell = NULL;
-        uint64 *shardIdPointer = (uint64 *) lfirst(shardCell);
-        uint64 shardId = (*shardIdPointer);
+        ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
+        uint64 shardId = shardInterval->shardId;
+        ShardConnections *shardConnections = NULL;
+        bool shardConnectionsFound = false;
+        char *escapedCommandString = NULL;
+        StringInfo applyCommand = makeStringInfo();
+
+        shardConnections = GetShardConnections(shardConnectionHash,
+                                               shardId,
+                                               &shardConnectionsFound);
+        Assert(shardConnectionsFound);

         /* build the shard ddl command */
-        char *escapedCommandString = quote_literal_cstr(commandString);
-        StringInfo applyCommand = makeStringInfo();
+        escapedCommandString = quote_literal_cstr(commandString);
         appendStringInfo(applyCommand, WORKER_APPLY_SHARD_DDL_COMMAND, shardId,
                          escapedCommandString);

-        shardPlacementList = FinalizedShardPlacementList(shardId);
-        foreach(shardPlacementCell, shardPlacementList)
-        {
-            ShardPlacement *placement = (ShardPlacement *) lfirst(shardPlacementCell);
-            char *workerName = placement->nodeName;
-            uint32 workerPort = placement->nodePort;
-
-            List *queryResultList = ExecuteRemoteQuery(workerName, workerPort,
-                                                       relationOwner, applyCommand);
-            if (queryResultList == NIL)
-            {
-                /*
-                 * If we failed on the first placement, return false. We return
-                 * here instead of exiting at the end to avoid breaking through
-                 * multiple loops.
-                 */
-                if (isFirstPlacement)
-                {
-                    return false;
-                }
-
-                ereport(WARNING, (errmsg("could not apply command on shard "
-                                         UINT64_FORMAT " on node %s:%d", shardId,
-                                         workerName, workerPort),
-                                  errdetail("Shard placement will be marked as "
-                                            "inactive.")));
-
-                *failedPlacementList = lappend(*failedPlacementList, placement);
-            }
-            else
-            {
-                ereport(DEBUG2, (errmsg("applied command on shard " UINT64_FORMAT
-                                        " on node %s:%d", shardId, workerName,
-                                        workerPort)));
-            }
-
-            isFirstPlacement = false;
-        }
+        ExecuteCommandOnShardPlacements(applyCommand, shardId, shardConnections);

         FreeStringInfo(applyCommand);
     }

+    /* check for cancellation one last time before returning */
+    CHECK_FOR_INTERRUPTS();
+
     return true;
 }
/*
* ExecuteCommandOnShardPlacements executes the given ddl command on the
* placements of the given shard, using the given shard connections.
*/
static void
ExecuteCommandOnShardPlacements(StringInfo applyCommand, uint64 shardId,
ShardConnections *shardConnections)
{
List *connectionList = shardConnections->connectionList;
ListCell *connectionCell = NULL;
Assert(connectionList != NIL);
foreach(connectionCell, connectionList)
{
TransactionConnection *transactionConnection =
(TransactionConnection *) lfirst(connectionCell);
PGconn *connection = transactionConnection->connection;
PGresult *result = NULL;
/* send the query */
result = PQexec(connection, applyCommand->data);
if (PQresultStatus(result) != PGRES_TUPLES_OK)
{
WarnRemoteError(connection, result);
ereport(ERROR, (errmsg("could not execute DDL command on worker "
"node shards")));
}
else
{
char *workerName = ConnectionGetOptionValue(connection, "host");
char *workerPort = ConnectionGetOptionValue(connection, "port");
ereport(DEBUG2, (errmsg("applied command on shard " UINT64_FORMAT
" on node %s:%s", shardId, workerName,
workerPort)));
}
PQclear(result);
transactionConnection->transactionState = TRANSACTION_STATE_OPEN;
}
}
 /*
  * AllFinalizedPlacementsAccessible returns true if all the finalized shard
  * placements for a given relation are accessible. Otherwise, the function
@@ -1529,6 +1591,7 @@ ReplicateGrantStmt(Node *parsetree)
         RangeVar *relvar = (RangeVar *) lfirst(objectCell);
         Oid relOid = RangeVarGetRelid(relvar, NoLock, false);
         const char *grantOption = "";
+        bool isTopLevel = true;

         if (!IsDistributedTable(relOid))
         {
@@ -1561,7 +1624,7 @@ ReplicateGrantStmt(Node *parsetree)
                              granteesString.data);
         }

-        ExecuteDistributedDDLCommand(relOid, ddlString.data);
+        ExecuteDistributedDDLCommand(relOid, ddlString.data, isTopLevel);

         resetStringInfo(&ddlString);
     }
 }
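To make the flow above concrete, each shard placement receives roughly the following command sequence over its own connection (the shard id and statement are taken from the regression output further below; the prepared-transaction name is a placeholder, the real one comes from BuildTransactionName):

    BEGIN;
    SELECT worker_apply_shard_ddl_command(650000, 'CREATE INDEX lineitem_hash_time_index ON lineitem_hash (l_shipdate)');
    -- sent at local pre-commit, since propagated DDL forces citus.multi_shard_commit_protocol to 2pc:
    PREPARE TRANSACTION 'citus_...';
    -- sent after the coordinator's local transaction commits:
    COMMIT PREPARED 'citus_...';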

View File

@@ -26,6 +26,7 @@
 #include "commands/event_trigger.h"
 #include "distributed/citus_clauses.h"
 #include "distributed/citus_ruleutils.h"
+#include "distributed/commit_protocol.h"
 #include "distributed/connection_cache.h"
 #include "distributed/listutils.h"
 #include "distributed/master_metadata_utility.h"
@@ -36,7 +37,7 @@
 #include "distributed/multi_router_executor.h"
 #include "distributed/multi_router_planner.h"
 #include "distributed/multi_server_executor.h"
-#include "distributed/multi_transaction.h"
+#include "distributed/multi_shard_transaction.h"
 #include "distributed/pg_dist_shard.h"
 #include "distributed/pg_dist_partition.h"
 #include "distributed/resource_lock.h"
@@ -51,13 +52,12 @@
 #include "utils/datum.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
+#include "utils/memutils.h"

 static void LockShardsForModify(List *shardIntervalList);
 static bool HasReplication(List *shardIntervalList);
-static int SendQueryToShards(Query *query, List *shardIntervalList);
-static HTAB * OpenConnectionsToAllShardPlacements(List *shardIntervalList);
-static void OpenConnectionsToShardPlacements(uint64 shardId, HTAB *shardConnectionHash);
+static int SendQueryToShards(Query *query, List *shardIntervalList, Oid relationId);
 static int SendQueryToPlacements(char *shardQueryString,
                                  ShardConnections *shardConnections);
@@ -137,7 +137,8 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS)

     LockShardsForModify(prunedShardIntervalList);

-    affectedTupleCount = SendQueryToShards(modifyQuery, prunedShardIntervalList);
+    affectedTupleCount = SendQueryToShards(modifyQuery, prunedShardIntervalList,
+                                           relationId);

     PG_RETURN_INT32(affectedTupleCount);
 }
@@ -162,7 +163,7 @@ LockShardsForModify(List *shardIntervalList)
     {
         lockMode = ShareLock;
     }
-    else if (!HasReplication(shardIntervalList)) /* check if any shards have >1 replica */
+    else if (!HasReplication(shardIntervalList))
     {
         lockMode = ShareLock;
     }
@@ -209,153 +210,55 @@ HasReplication(List *shardIntervalList)
  * the shards when necessary before calling SendQueryToShards.
  */
 static int
-SendQueryToShards(Query *query, List *shardIntervalList)
+SendQueryToShards(Query *query, List *shardIntervalList, Oid relationId)
 {
     int affectedTupleCount = 0;
-    HTAB *shardConnectionHash = OpenConnectionsToAllShardPlacements(shardIntervalList);
-    List *allShardsConnectionList = ConnectionList(shardConnectionHash);
-
-    PG_TRY();
-    {
-        ListCell *shardIntervalCell = NULL;
-
-        foreach(shardIntervalCell, shardIntervalList)
-        {
-            ShardInterval *shardInterval = (ShardInterval *) lfirst(
-                shardIntervalCell);
-            Oid relationId = shardInterval->relationId;
-            uint64 shardId = shardInterval->shardId;
-            bool shardConnectionsFound = false;
-            ShardConnections *shardConnections = NULL;
-            StringInfo shardQueryString = makeStringInfo();
-            char *shardQueryStringData = NULL;
-            int shardAffectedTupleCount = -1;
-
-            shardConnections = GetShardConnections(shardConnectionHash,
-                                                   shardId,
-                                                   &shardConnectionsFound);
-            Assert(shardConnectionsFound);
-
-            deparse_shard_query(query, relationId, shardId, shardQueryString);
-            shardQueryStringData = shardQueryString->data;
-            shardAffectedTupleCount = SendQueryToPlacements(shardQueryStringData,
-                                                            shardConnections);
-            affectedTupleCount += shardAffectedTupleCount;
-        }
-
-        if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC)
-        {
-            PrepareRemoteTransactions(allShardsConnectionList);
-        }
-
-        /* check for cancellation one last time before returning */
-        CHECK_FOR_INTERRUPTS();
-    }
-    PG_CATCH();
-    {
-        /* roll back all transactions */
-        AbortRemoteTransactions(allShardsConnectionList);
-        CloseConnections(allShardsConnectionList);
-
-        PG_RE_THROW();
-    }
-    PG_END_TRY();
-
-    CommitRemoteTransactions(allShardsConnectionList, false);
-    CloseConnections(allShardsConnectionList);
+    char *relationOwner = TableOwner(relationId);
+    HTAB *shardConnectionHash = NULL;
+    ListCell *shardIntervalCell = NULL;
+    MemoryContext oldContext = MemoryContextSwitchTo(TopTransactionContext);
+
+    shardConnectionHash = OpenTransactionsToAllShardPlacements(shardIntervalList,
+                                                               relationOwner);
+    MemoryContextSwitchTo(oldContext);
+
+    foreach(shardIntervalCell, shardIntervalList)
+    {
+        ShardInterval *shardInterval = (ShardInterval *) lfirst(
+            shardIntervalCell);
+        Oid relationId = shardInterval->relationId;
+        uint64 shardId = shardInterval->shardId;
+        bool shardConnectionsFound = false;
+        ShardConnections *shardConnections = NULL;
+        StringInfo shardQueryString = makeStringInfo();
+        char *shardQueryStringData = NULL;
+        int shardAffectedTupleCount = -1;
+
+        shardConnections = GetShardConnections(shardConnectionHash,
+                                               shardId,
+                                               &shardConnectionsFound);
+        Assert(shardConnectionsFound);
+
+        deparse_shard_query(query, relationId, shardId, shardQueryString);
+        shardQueryStringData = shardQueryString->data;
+        shardAffectedTupleCount = SendQueryToPlacements(shardQueryStringData,
+                                                        shardConnections);
+        affectedTupleCount += shardAffectedTupleCount;
+    }
+
+    /* check for cancellation one last time before returning */
+    CHECK_FOR_INTERRUPTS();

     return affectedTupleCount;
 }

-
-/*
- * OpenConnectionsToAllShardPlacement opens connections to all placements of
- * the given shard list and returns the hash table containing the connections.
- * The resulting hash table maps shardId to ShardConnection struct.
- */
-static HTAB *
-OpenConnectionsToAllShardPlacements(List *shardIntervalList)
-{
-    HTAB *shardConnectionHash = CreateShardConnectionHash();
-    ListCell *shardIntervalCell = NULL;
-
-    foreach(shardIntervalCell, shardIntervalList)
-    {
-        ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
-        uint64 shardId = shardInterval->shardId;
-
-        OpenConnectionsToShardPlacements(shardId, shardConnectionHash);
-    }
-
-    return shardConnectionHash;
-}
-
-
-/*
- * OpenConnectionsToShardPlacements opens connections to all placements of the
- * shard with the given shardId and populates the shardConnectionHash table
- * accordingly.
- */
-static void
-OpenConnectionsToShardPlacements(uint64 shardId, HTAB *shardConnectionHash)
-{
-    bool shardConnectionsFound = false;
-
-    /* get existing connections to the shard placements, if any */
-    ShardConnections *shardConnections = GetShardConnections(shardConnectionHash,
-                                                             shardId,
-                                                             &shardConnectionsFound);
-
-    List *shardPlacementList = FinalizedShardPlacementList(shardId);
-    ListCell *shardPlacementCell = NULL;
-    List *connectionList = NIL;
-
-    Assert(!shardConnectionsFound);
-
-    if (shardPlacementList == NIL)
-    {
-        ereport(ERROR, (errmsg("could not find any shard placements for the shard "
-                               UINT64_FORMAT, shardId)));
-    }
-
-    foreach(shardPlacementCell, shardPlacementList)
-    {
-        ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(
-            shardPlacementCell);
-        char *workerName = shardPlacement->nodeName;
-        uint32 workerPort = shardPlacement->nodePort;
-        char *nodeUser = CurrentUserName();
-        PGconn *connection = ConnectToNode(workerName, workerPort, nodeUser);
-        TransactionConnection *transactionConnection = NULL;
-
-        if (connection == NULL)
-        {
-            List *abortConnectionList = ConnectionList(shardConnectionHash);
-            CloseConnections(abortConnectionList);
-
-            ereport(ERROR, (errmsg("could not establish a connection to all "
-                                   "placements")));
-        }
-
-        transactionConnection = palloc0(sizeof(TransactionConnection));
-
-        transactionConnection->connectionId = shardConnections->shardId;
-        transactionConnection->transactionState = TRANSACTION_STATE_INVALID;
-        transactionConnection->connection = connection;
-
-        connectionList = lappend(connectionList, transactionConnection);
-    }
-
-    shardConnections->connectionList = connectionList;
-}
-
-
 /*
  * SendQueryToPlacements sends the given query string to all given placement
- * connections of a shard. The query is sent with a BEGIN before the the actual
- * query so, CommitRemoteTransactions or AbortRemoteTransactions should be
- * called after all queries have been sent successfully.
+ * connections of a shard. CommitRemoteTransactions or AbortRemoteTransactions
+ * should be called after all queries have been sent successfully.
  */
 static int
 SendQueryToPlacements(char *shardQueryString, ShardConnections *shardConnections)
@@ -379,13 +282,6 @@ SendQueryToPlacements(char *shardQueryString, ShardConnections *shardConnections
     CHECK_FOR_INTERRUPTS();

     /* send the query */
-    result = PQexec(connection, "BEGIN");
-    if (PQresultStatus(result) != PGRES_COMMAND_OK)
-    {
-        WarnRemoteError(connection, result);
-        ereport(ERROR, (errmsg("could not send query to shard placement")));
-    }
-
     result = PQexec(connection, shardQueryString);
     if (PQresultStatus(result) != PGRES_COMMAND_OK)
     {
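A minimal example of the path this file now takes (the filter column is illustrative; multi_shard_modify_test is the table used by the regression test whose output appears further below): shard transactions are opened by OpenTransactionsToAllShardPlacements, the deparsed command is sent per placement, and commit or abort is left to the shared transaction callback:

    SET citus.multi_shard_commit_protocol TO '2pc';
    SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE t_key = 8');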

View File

@@ -18,6 +18,7 @@
 #include "commands/explain.h"
 #include "executor/executor.h"
+#include "distributed/commit_protocol.h"
 #include "distributed/master_protocol.h"
 #include "distributed/multi_copy.h"
 #include "distributed/multi_executor.h"
@@ -28,7 +29,6 @@
 #include "distributed/multi_router_executor.h"
 #include "distributed/multi_router_planner.h"
 #include "distributed/multi_server_executor.h"
-#include "distributed/multi_transaction.h"
 #include "distributed/multi_utility.h"
 #include "distributed/task_tracker.h"
 #include "distributed/worker_manager.h"

View File

@@ -1,6 +1,6 @@
 /*-------------------------------------------------------------------------
  *
- * multi_transaction.c
+ * commit_protocol.c
  *     This file contains functions for managing 1PC or 2PC transactions
  *     across many shard placements.
  *
@@ -9,20 +9,19 @@
  *-------------------------------------------------------------------------
  */

 #include "postgres.h"
 #include "libpq-fe.h"
 #include "miscadmin.h"

-#include "access/xact.h"
+#include "distributed/commit_protocol.h"
 #include "distributed/connection_cache.h"
-#include "distributed/multi_transaction.h"
+#include "distributed/master_metadata_utility.h"
+#include "distributed/multi_shard_transaction.h"
 #include "lib/stringinfo.h"
 #include "nodes/pg_list.h"

-#define INITIAL_CONNECTION_CACHE_SIZE 1001
-
 /* Local functions forward declarations */
 static uint32 DistributedTransactionId = 0;
@@ -46,6 +45,66 @@ InitializeDistributedTransaction(void)
 }
/*
* CompleteShardPlacementTransactions commits or aborts pending shard placement
* transactions when the local transaction commits or aborts.
*/
void
CompleteShardPlacementTransactions(XactEvent event, void *arg)
{
if (shardPlacementConnectionList == NIL)
{
/* nothing to do */
return;
}
else if (event == XACT_EVENT_PRE_COMMIT)
{
/*
* Any failure here will cause local changes to be rolled back,
* and remote changes to either roll back (1PC) or, in case of
* connection or node failure, leave a prepared transaction
* (2PC).
*/
if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC)
{
PrepareRemoteTransactions(shardPlacementConnectionList);
}
return;
}
else if (event == XACT_EVENT_COMMIT)
{
/*
* A failure here will cause some remote changes to either
* roll back (1PC) or, in case of connection or node failure,
* leave a prepared transaction (2PC). However, the local
* changes have already been committed.
*/
CommitRemoteTransactions(shardPlacementConnectionList, false);
}
else if (event == XACT_EVENT_ABORT)
{
/*
* A failure here will cause some remote changes to either
* roll back (1PC) or, in case of connection or node failure,
* leave a prepared transaction (2PC). The local changes have
* already been rolled back.
*/
AbortRemoteTransactions(shardPlacementConnectionList);
}
else
{
return;
}
CloseConnections(shardPlacementConnectionList);
shardPlacementConnectionList = NIL;
}
 /*
  * PrepareRemoteTransactions prepares all transactions on connections in
  * connectionList for commit if the 2PC commit protocol is enabled.
@@ -82,6 +141,9 @@ PrepareRemoteTransactions(List *connectionList)
                             errmsg("failed to prepare transaction")));
         }

+        ereport(DEBUG2, (errmsg("sent PREPARE TRANSACTION over connection %ld",
+                                connectionId)));
+
         PQclear(result);

         transactionConnection->transactionState = TRANSACTION_STATE_PREPARED;
@@ -126,6 +188,8 @@ AbortRemoteTransactions(List *connectionList)
                                   command->data, nodeName, nodePort)));
             }

+            ereport(DEBUG2, (errmsg("sent ROLLBACK over connection %ld", connectionId)));
+
             PQclear(result);
         }
         else if (transactionConnection->transactionState == TRANSACTION_STATE_OPEN)
@@ -197,6 +261,9 @@ CommitRemoteTransactions(List *connectionList, bool stopOnFailure)
                                       command->data, nodeName, nodePort)));
                 }
             }
+
+            ereport(DEBUG2, (errmsg("sent COMMIT PREPARED over connection %ld",
+                                    connectionId)));
         }
         else
         {
@@ -224,6 +291,8 @@ CommitRemoteTransactions(List *connectionList, bool stopOnFailure)
                                       nodeName, nodePort)));
                 }
             }
+
+            ereport(DEBUG2, (errmsg("sent COMMIT over connection %ld", connectionId)));
         }

         PQclear(result);
@@ -253,97 +322,3 @@ BuildTransactionName(int connectionId)

     return commandString;
 }
/*
* CloseConnections closes all connections in connectionList.
*/
void
CloseConnections(List *connectionList)
{
ListCell *connectionCell = NULL;
foreach(connectionCell, connectionList)
{
TransactionConnection *transactionConnection =
(TransactionConnection *) lfirst(connectionCell);
PGconn *connection = transactionConnection->connection;
PQfinish(connection);
}
}
/*
* CreateShardConnectionHash constructs a hash table used for shardId->Connection
* mapping.
*/
HTAB *
CreateShardConnectionHash(void)
{
HTAB *shardConnectionsHash = NULL;
int hashFlags = 0;
HASHCTL info;
memset(&info, 0, sizeof(info));
info.keysize = sizeof(int64);
info.entrysize = sizeof(ShardConnections);
info.hash = tag_hash;
hashFlags = HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT;
shardConnectionsHash = hash_create("Shard Connections Hash",
INITIAL_CONNECTION_CACHE_SIZE, &info,
hashFlags);
return shardConnectionsHash;
}
/*
* GetShardConnections finds existing connections for a shard in the hash.
* If not found, then a ShardConnections structure with empty connectionList
* is returned.
*/
ShardConnections *
GetShardConnections(HTAB *shardConnectionHash, int64 shardId,
bool *shardConnectionsFound)
{
ShardConnections *shardConnections = NULL;
shardConnections = (ShardConnections *) hash_search(shardConnectionHash,
&shardId,
HASH_ENTER,
shardConnectionsFound);
if (!*shardConnectionsFound)
{
shardConnections->shardId = shardId;
shardConnections->connectionList = NIL;
}
return shardConnections;
}
/*
* ConnectionList flattens the connection hash to a list of placement connections.
*/
List *
ConnectionList(HTAB *connectionHash)
{
List *connectionList = NIL;
HASH_SEQ_STATUS status;
ShardConnections *shardConnections = NULL;
hash_seq_init(&status, connectionHash);
shardConnections = (ShardConnections *) hash_seq_search(&status);
while (shardConnections != NULL)
{
List *shardConnectionsList = list_copy(shardConnections->connectionList);
connectionList = list_concat(connectionList, shardConnectionsList);
shardConnections = (ShardConnections *) hash_seq_search(&status);
}
return connectionList;
}
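As the note added in multi_utility.c warns, a coordinator failure between PREPARE TRANSACTION and COMMIT PREPARED can leave prepared transactions behind on the workers, and later commands on the same shards will block until those are resolved manually. A sketch of the cleanup on an affected worker (the gid shown is a placeholder; real names come from BuildTransactionName):

    SELECT gid, prepared, owner, database FROM pg_prepared_xacts;
    COMMIT PREPARED 'citus_...';    -- or: ROLLBACK PREPARED 'citus_...'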

View File

@@ -0,0 +1,243 @@
/*-------------------------------------------------------------------------
*
* multi_shard_transaction.c
* This file contains functions for managing 1PC or 2PC transactions
* across many shard placements.
*
* Copyright (c) 2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "libpq-fe.h"
#include "postgres.h"
#include "distributed/commit_protocol.h"
#include "distributed/connection_cache.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/multi_shard_transaction.h"
#include "nodes/pg_list.h"
#define INITIAL_CONNECTION_CACHE_SIZE 1001
List *shardPlacementConnectionList = NIL;
static void RegisterShardPlacementXactCallback(void);
static bool isXactCallbackRegistered = false;
/*
 * OpenTransactionsToAllShardPlacements opens connections to all placements of
 * the shards in the given shard interval list and returns the hash table containing
 * the connections. The resulting hash table maps shardIds to ShardConnections structs.
*/
HTAB *
OpenTransactionsToAllShardPlacements(List *shardIntervalList, char *userName)
{
HTAB *shardConnectionHash = CreateShardConnectionHash();
ListCell *shardIntervalCell = NULL;
ListCell *connectionCell = NULL;
List *connectionList = NIL;
foreach(shardIntervalCell, shardIntervalList)
{
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
uint64 shardId = shardInterval->shardId;
OpenConnectionsToShardPlacements(shardId, shardConnectionHash, userName);
}
connectionList = ConnectionList(shardConnectionHash);
foreach(connectionCell, connectionList)
{
TransactionConnection *transactionConnection =
(TransactionConnection *) lfirst(connectionCell);
PGconn *connection = transactionConnection->connection;
PGresult *result = NULL;
result = PQexec(connection, "BEGIN");
if (PQresultStatus(result) != PGRES_COMMAND_OK)
{
ReraiseRemoteError(connection, result);
}
}
shardPlacementConnectionList = ConnectionList(shardConnectionHash);
RegisterShardPlacementXactCallback();
return shardConnectionHash;
}
/*
* CreateShardConnectionHash constructs a hash table used for shardId->Connection
* mapping.
*/
HTAB *
CreateShardConnectionHash(void)
{
HTAB *shardConnectionsHash = NULL;
int hashFlags = 0;
HASHCTL info;
memset(&info, 0, sizeof(info));
info.keysize = sizeof(int64);
info.entrysize = sizeof(ShardConnections);
info.hash = tag_hash;
hashFlags = HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT;
shardConnectionsHash = hash_create("Shard Connections Hash",
INITIAL_CONNECTION_CACHE_SIZE, &info,
hashFlags);
return shardConnectionsHash;
}
/*
* OpenConnectionsToShardPlacements opens connections to all placements of the
* shard with the given shardId and populates the shardConnectionHash table
* accordingly.
*/
void
OpenConnectionsToShardPlacements(uint64 shardId, HTAB *shardConnectionHash,
char *userName)
{
bool shardConnectionsFound = false;
/* get existing connections to the shard placements, if any */
ShardConnections *shardConnections = GetShardConnections(shardConnectionHash,
shardId,
&shardConnectionsFound);
List *shardPlacementList = FinalizedShardPlacementList(shardId);
ListCell *shardPlacementCell = NULL;
List *connectionList = NIL;
Assert(!shardConnectionsFound);
if (shardPlacementList == NIL)
{
ereport(ERROR, (errmsg("could not find any shard placements for the shard "
UINT64_FORMAT, shardId)));
}
foreach(shardPlacementCell, shardPlacementList)
{
ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(
shardPlacementCell);
char *workerName = shardPlacement->nodeName;
uint32 workerPort = shardPlacement->nodePort;
PGconn *connection = ConnectToNode(workerName, workerPort, userName);
TransactionConnection *transactionConnection = NULL;
if (connection == NULL)
{
List *abortConnectionList = ConnectionList(shardConnectionHash);
CloseConnections(abortConnectionList);
ereport(ERROR, (errmsg("could not establish a connection to all "
"placements of shard %lu", shardId)));
}
transactionConnection = palloc0(sizeof(TransactionConnection));
transactionConnection->connectionId = shardConnections->shardId;
transactionConnection->transactionState = TRANSACTION_STATE_INVALID;
transactionConnection->connection = connection;
connectionList = lappend(connectionList, transactionConnection);
}
shardConnections->connectionList = connectionList;
}
/*
* GetShardConnections finds existing connections for a shard in the hash.
* If not found, then a ShardConnections structure with empty connectionList
* is returned.
*/
ShardConnections *
GetShardConnections(HTAB *shardConnectionHash, int64 shardId,
bool *shardConnectionsFound)
{
ShardConnections *shardConnections = NULL;
shardConnections = (ShardConnections *) hash_search(shardConnectionHash,
&shardId,
HASH_ENTER,
shardConnectionsFound);
if (!*shardConnectionsFound)
{
shardConnections->shardId = shardId;
shardConnections->connectionList = NIL;
}
return shardConnections;
}
/*
* ConnectionList flattens the connection hash to a list of placement connections.
*/
List *
ConnectionList(HTAB *connectionHash)
{
List *connectionList = NIL;
HASH_SEQ_STATUS status;
ShardConnections *shardConnections = NULL;
hash_seq_init(&status, connectionHash);
shardConnections = (ShardConnections *) hash_seq_search(&status);
while (shardConnections != NULL)
{
List *shardConnectionsList = list_copy(shardConnections->connectionList);
connectionList = list_concat(connectionList, shardConnectionsList);
shardConnections = (ShardConnections *) hash_seq_search(&status);
}
return connectionList;
}
/*
 * RegisterShardPlacementXactCallback ensures the XactCallback for
 * committing/aborting remote worker transactions is registered.
*/
void
RegisterShardPlacementXactCallback(void)
{
if (!isXactCallbackRegistered)
{
RegisterXactCallback(CompleteShardPlacementTransactions, NULL);
isXactCallbackRegistered = true;
}
}
/*
* CloseConnections closes all connections in connectionList.
*/
void
CloseConnections(List *connectionList)
{
ListCell *connectionCell = NULL;
foreach(connectionCell, connectionList)
{
TransactionConnection *transactionConnection =
(TransactionConnection *) lfirst(connectionCell);
PGconn *connection = transactionConnection->connection;
PQfinish(connection);
}
}

View File

@@ -1,6 +1,6 @@
 /*-------------------------------------------------------------------------
  *
- * multi_transaction.h
+ * commit_protocol.h
  *     Type and function declarations used in performing transactions across
  *     shard placements.
  *
@@ -9,10 +9,11 @@
  *-------------------------------------------------------------------------
  */

-#ifndef MULTI_TRANSACTION_H
-#define MULTI_TRANSACTION_H
+#ifndef COMMIT_PROTOCOL_H
+#define COMMIT_PROTOCOL_H

+#include "access/xact.h"
 #include "libpq-fe.h"
 #include "lib/stringinfo.h"
 #include "nodes/pg_list.h"
@@ -47,29 +48,15 @@ typedef struct TransactionConnection
 } TransactionConnection;

-/* ShardConnections represents a set of connections for each placement of a shard */
-typedef struct ShardConnections
-{
-    int64 shardId;
-    List *connectionList;
-} ShardConnections;
-
 /* config variable managed via guc.c */
 extern int MultiShardCommitProtocol;

 /* Functions declarations for transaction and connection management */
 extern void InitializeDistributedTransaction(void);
+extern void CompleteShardPlacementTransactions(XactEvent event, void *arg);
 extern void PrepareRemoteTransactions(List *connectionList);
 extern void AbortRemoteTransactions(List *connectionList);
 extern void CommitRemoteTransactions(List *connectionList, bool stopOnFailure);
-extern void CloseConnections(List *connectionList);
-extern HTAB * CreateShardConnectionHash(void);
-extern ShardConnections * GetShardConnections(HTAB *shardConnectionHash,
-                                              int64 shardId,
-                                              bool *shardConnectionsFound);
-extern List * ConnectionList(HTAB *connectionHash);

-#endif /* MULTI_TRANSACTION_H */
+#endif /* COMMIT_PROTOCOL_H */

View File

@@ -15,11 +15,6 @@
 #include "nodes/parsenodes.h"

-/* config variable managed via guc.c */
-extern int MultiShardCommitProtocol;
-
 /*
  * A smaller version of copy.c's CopyStateData, trimmed to the elements
  * necessary to copy out results. While it'd be a bit nicer to share code,

View File

@@ -0,0 +1,43 @@
/*-------------------------------------------------------------------------
*
* multi_shard_transaction.h
* Type and function declarations used in performing transactions across
* shard placements.
*
* Copyright (c) 2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef MULTI_SHARD_TRANSACTION_H
#define MULTI_SHARD_TRANSACTION_H
#include "access/xact.h"
#include "utils/hsearch.h"
#include "nodes/pg_list.h"
/* ShardConnections represents a set of connections for each placement of a shard */
typedef struct ShardConnections
{
int64 shardId;
List *connectionList;
} ShardConnections;
extern List *shardPlacementConnectionList;
extern HTAB * OpenTransactionsToAllShardPlacements(List *shardIdList,
char *relationOwner);
extern HTAB * CreateShardConnectionHash(void);
extern void OpenConnectionsToShardPlacements(uint64 shardId, HTAB *shardConnectionHash,
char *nodeUser);
extern ShardConnections * GetShardConnections(HTAB *shardConnectionHash,
int64 shardId,
bool *shardConnectionsFound);
extern List * ConnectionList(HTAB *connectionHash);
extern void CloseConnections(List *connectionList);
#endif /* MULTI_SHARD_TRANSACTION_H */

View File

@@ -42,9 +42,14 @@ SELECT master_create_worker_shards('lineitem_hash', 2, 1);
 (1 row)

 CREATE INDEX lineitem_hash_time_index ON lineitem_hash (l_shipdate);
+DEBUG: switching to 2PC for the transaction
 DEBUG: applied command on shard 650000 on node localhost:57637
 DEBUG: applied command on shard 650001 on node localhost:57638
 DEBUG: building index "lineitem_hash_time_index" on table "lineitem_hash"
+DEBUG: sent PREPARE TRANSACTION over connection 650000
+DEBUG: sent PREPARE TRANSACTION over connection 650001
+DEBUG: sent COMMIT PREPARED over connection 650000
+DEBUG: sent COMMIT PREPARED over connection 650001
 CREATE TABLE orders_hash (
     o_orderkey bigint not null,
     o_custkey integer not null,

View File

@@ -103,6 +103,10 @@ SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE
 DEBUG: predicate pruning for shardId 350001
 DEBUG: predicate pruning for shardId 350002
 DEBUG: predicate pruning for shardId 350003
+DEBUG: sent PREPARE TRANSACTION over connection 350000
+DEBUG: sent PREPARE TRANSACTION over connection 350000
+DEBUG: sent COMMIT PREPARED over connection 350000
+DEBUG: sent COMMIT PREPARED over connection 350000
  master_modify_multiple_shards
 -------------------------------
                              1

View File

@@ -156,6 +156,42 @@ ALTER TABLE IF EXISTS lineitem_alter RENAME l_orderkey TO l_orderkey_renamed;
-- node
\d lineitem_alter
-- verify that non-propagated ddl commands are allowed inside a transaction block
SET citus.enable_ddl_propagation to false;
BEGIN;
CREATE INDEX temp_index_1 ON lineitem_alter(l_linenumber);
COMMIT;
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter';
DROP INDEX temp_index_1;
-- verify that distributed ddl commands are not allowed inside a transaction block
SET citus.enable_ddl_propagation to true;
BEGIN;
CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
COMMIT;
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter';
DROP INDEX temp_index_2;
-- verify that distributed ddl commands use 2PC, even the value of citus.multi_shard_commit_protocol is '1pc'
SET citus.multi_shard_commit_protocol TO '1pc';
SET client_min_messages TO DEBUG2;
CREATE INDEX temp_index_3 ON lineitem_alter(l_orderkey);
RESET client_min_messages;
DROP INDEX temp_index_3;
-- verify that citus.multi_shard_commit_protocol value is not changed
SHOW citus.multi_shard_commit_protocol;
-- verify that not any of shard placements are marked as failed when a query failure occurs
CREATE TABLE test_ab (a int, b int);
SELECT master_create_distributed_table('test_ab', 'a', 'hash');
SELECT master_create_worker_shards('test_ab', 8, 2);
INSERT INTO test_ab VALUES (2, 10);
INSERT INTO test_ab VALUES (2, 11);
CREATE UNIQUE INDEX temp_unique_index_1 ON test_ab(a);
SELECT shardid FROM pg_dist_shard_placement NATURAL JOIN pg_dist_shard
WHERE logicalrelid='test_ab'::regclass AND shardstate=3;
-- Check that the schema on the worker still looks reasonable
\c - - - :worker_1_port
SELECT attname, atttypid::regtype

View File

@@ -412,6 +412,127 @@ ERROR: renaming distributed tables or their objects is currently unsupported
 l_comment | character varying(44) | not null
 null_column | integer |
-- verify that non-propagated ddl commands are allowed inside a transaction block
SET citus.enable_ddl_propagation to false;
BEGIN;
CREATE INDEX temp_index_1 ON lineitem_alter(l_linenumber);
COMMIT;
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter';
indexname | tablename
--------------+----------------
temp_index_1 | lineitem_alter
(1 row)
DROP INDEX temp_index_1;
-- verify that distributed ddl commands are not allowed inside a transaction block
SET citus.enable_ddl_propagation to true;
BEGIN;
CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey);
ERROR: distributed DDL commands cannot run inside a transaction block
COMMIT;
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter';
indexname | tablename
-----------+-----------
(0 rows)
DROP INDEX temp_index_2;
ERROR: index "temp_index_2" does not exist
-- verify that distributed ddl commands use 2PC, even the value of citus.multi_shard_commit_protocol is '1pc'
SET citus.multi_shard_commit_protocol TO '1pc';
SET client_min_messages TO DEBUG2;
CREATE INDEX temp_index_3 ON lineitem_alter(l_orderkey);
DEBUG: switching to 2PC for the transaction
DEBUG: applied command on shard 220000 on node localhost:57638
DEBUG: applied command on shard 220000 on node localhost:57637
DEBUG: applied command on shard 220003 on node localhost:57637
DEBUG: applied command on shard 220003 on node localhost:57638
DEBUG: applied command on shard 220007 on node localhost:57637
DEBUG: applied command on shard 220007 on node localhost:57638
DEBUG: applied command on shard 220004 on node localhost:57638
DEBUG: applied command on shard 220004 on node localhost:57637
DEBUG: applied command on shard 220008 on node localhost:57638
DEBUG: applied command on shard 220008 on node localhost:57637
DEBUG: applied command on shard 220001 on node localhost:57637
DEBUG: applied command on shard 220001 on node localhost:57638
DEBUG: applied command on shard 220002 on node localhost:57638
DEBUG: applied command on shard 220002 on node localhost:57637
DEBUG: applied command on shard 220005 on node localhost:57637
DEBUG: applied command on shard 220005 on node localhost:57638
DEBUG: applied command on shard 220009 on node localhost:57637
DEBUG: applied command on shard 220009 on node localhost:57638
DEBUG: building index "temp_index_3" on table "lineitem_alter"
DEBUG: sent PREPARE TRANSACTION over connection 220007
DEBUG: sent PREPARE TRANSACTION over connection 220007
DEBUG: sent PREPARE TRANSACTION over connection 220002
DEBUG: sent PREPARE TRANSACTION over connection 220002
DEBUG: sent PREPARE TRANSACTION over connection 220004
DEBUG: sent PREPARE TRANSACTION over connection 220004
DEBUG: sent PREPARE TRANSACTION over connection 220000
DEBUG: sent PREPARE TRANSACTION over connection 220000
DEBUG: sent PREPARE TRANSACTION over connection 220008
DEBUG: sent PREPARE TRANSACTION over connection 220008
DEBUG: sent PREPARE TRANSACTION over connection 220009
DEBUG: sent PREPARE TRANSACTION over connection 220009
DEBUG: sent PREPARE TRANSACTION over connection 220001
DEBUG: sent PREPARE TRANSACTION over connection 220001
DEBUG: sent PREPARE TRANSACTION over connection 220005
DEBUG: sent PREPARE TRANSACTION over connection 220005
DEBUG: sent PREPARE TRANSACTION over connection 220003
DEBUG: sent PREPARE TRANSACTION over connection 220003
DEBUG: sent COMMIT PREPARED over connection 220007
DEBUG: sent COMMIT PREPARED over connection 220007
DEBUG: sent COMMIT PREPARED over connection 220002
DEBUG: sent COMMIT PREPARED over connection 220002
DEBUG: sent COMMIT PREPARED over connection 220004
DEBUG: sent COMMIT PREPARED over connection 220004
DEBUG: sent COMMIT PREPARED over connection 220000
DEBUG: sent COMMIT PREPARED over connection 220000
DEBUG: sent COMMIT PREPARED over connection 220008
DEBUG: sent COMMIT PREPARED over connection 220008
DEBUG: sent COMMIT PREPARED over connection 220009
DEBUG: sent COMMIT PREPARED over connection 220009
DEBUG: sent COMMIT PREPARED over connection 220001
DEBUG: sent COMMIT PREPARED over connection 220001
DEBUG: sent COMMIT PREPARED over connection 220005
DEBUG: sent COMMIT PREPARED over connection 220005
DEBUG: sent COMMIT PREPARED over connection 220003
DEBUG: sent COMMIT PREPARED over connection 220003
RESET client_min_messages;
DROP INDEX temp_index_3;
-- verify that citus.multi_shard_commit_protocol value is not changed
SHOW citus.multi_shard_commit_protocol;
citus.multi_shard_commit_protocol
-----------------------------------
1pc
(1 row)
-- verify that not any of shard placements are marked as failed when a query failure occurs
CREATE TABLE test_ab (a int, b int);
SELECT master_create_distributed_table('test_ab', 'a', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_worker_shards('test_ab', 8, 2);
master_create_worker_shards
-----------------------------
(1 row)
INSERT INTO test_ab VALUES (2, 10);
INSERT INTO test_ab VALUES (2, 11);
CREATE UNIQUE INDEX temp_unique_index_1 ON test_ab(a);
WARNING: could not create unique index "temp_unique_index_1_220016"
DETAIL: Key (a)=(2) is duplicated.
CONTEXT: while executing command on localhost:57638
ERROR: could not execute DDL command on worker node shards
SELECT shardid FROM pg_dist_shard_placement NATURAL JOIN pg_dist_shard
WHERE logicalrelid='test_ab'::regclass AND shardstate=3;
shardid
---------
(0 rows)
-- Check that the schema on the worker still looks reasonable
\c - - - :worker_1_port
SELECT attname, atttypid::regtype