From 3eaff481149b3d5979d73bc1a7c578d07fee0841 Mon Sep 17 00:00:00 2001
From: Eren
Date: Thu, 16 Jun 2016 12:01:05 +0300
Subject: [PATCH] Propagate DDL Commands with 2PC

Fixes #513

This change modifies the DDL propagation logic so that DDL queries are
propagated via the 2-Phase Commit (2PC) protocol. This way, failures during
the execution of distributed DDL commands do not leave the table in an
intermediate state, and pending prepared transactions can be committed
manually.

DDL commands are not allowed inside transaction blocks or functions. DDL
commands are performed with 2PC regardless of the value of the
`citus.multi_shard_commit_protocol` parameter.

The workflow of the successful case is:

1. Open individual connections to all shard placements and send `BEGIN`.
2. Send `SELECT worker_apply_shard_ddl_command(<shard_id>, <ddl_command>)` to
   all connections, one by one, in a serial manner.
3. Send `PREPARE TRANSACTION <transaction_name>` to all connections.
4. Send `COMMIT PREPARED <transaction_name>` to all connections.

Failure cases:

- If a worker problem occurs before all DDL commands have been sent, then all
  changes are rolled back.
- If a worker problem occurs after all DDL commands have been sent but before
  all `PREPARE TRANSACTION` commands have finished, then all changes are
  rolled back. However, if a worker node has failed, the prepared transactions
  on that worker have to be rolled back manually.
- If a worker problem occurs while `COMMIT PREPARED` statements are being
  sent, then the prepared transactions on the failed workers have to be
  committed manually.
- If the master fails before the first `PREPARE TRANSACTION` is sent, then
  nothing is changed on the workers.
- If the master fails while `PREPARE TRANSACTION` commands are being sent,
  then the prepared transactions on the workers have to be rolled back
  manually.
- If the master fails while `COMMIT PREPARED` or `ROLLBACK PREPARED` commands
  are being sent, then the remaining prepared transactions on the workers have
  to be handled manually.

This change also helps with #480, since failed DDL commands no longer mark
failed placements as inactive.
---
 src/backend/distributed/Makefile | 2 +-
 src/backend/distributed/commands/multi_copy.c | 3 +-
 .../distributed/executor/multi_utility.c | 273 +++++++++++-------
 .../master/master_modify_multiple_shards.c | 186 +++---
 src/backend/distributed/shared_library_init.c | 2 +-
 .../commit_protocol.c} | 175 +++++------
 .../transaction/multi_shard_transaction.c | 243 ++++++++++++++++
 ...{multi_transaction.h => commit_protocol.h} | 25 +-
 src/include/distributed/multi_copy.h | 5 -
 .../distributed/multi_shard_transaction.h | 43 +++
 .../expected/multi_join_order_additional.out | 5 +
 .../regress/expected/multi_shard_modify.out | 4 +
 .../input/multi_alter_table_statements.source | 36 +++
 .../multi_alter_table_statements.source | 121 ++++++++
 14 files changed, 746 insertions(+), 377 deletions(-)
 rename src/backend/distributed/{utils/multi_transaction.c => transaction/commit_protocol.c} (76%)
 create mode 100644 src/backend/distributed/transaction/multi_shard_transaction.c
 rename src/include/distributed/{multi_transaction.h => commit_protocol.h} (71%)
 create mode 100644 src/include/distributed/multi_shard_transaction.h

diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile
index f946adadd..6ee2cc9a7 100644
--- a/src/backend/distributed/Makefile
+++ b/src/backend/distributed/Makefile
@@ -15,7 +15,7 @@ DATA_built = $(foreach v,$(EXTVERSIONS),$(EXTENSION)--$(v).sql)
 SCRIPTS = ../../bin/scripts/copy_to_distributed_table

 # directories with source files
-SUBDIRS = . 
commands executor master planner relay test utils worker +SUBDIRS = . commands executor master planner relay test transaction utils worker # That patsubst rule searches all directories listed in SUBDIRS for .c # files, and adds the corresponding .o files to OBJS diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 4fe427247..adfe5791d 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -71,6 +71,7 @@ #include "commands/copy.h" #include "commands/defrem.h" #include "distributed/citus_ruleutils.h" +#include "distributed/commit_protocol.h" #include "distributed/connection_cache.h" #include "distributed/listutils.h" #include "distributed/metadata_cache.h" @@ -79,7 +80,7 @@ #include "distributed/metadata_cache.h" #include "distributed/multi_copy.h" #include "distributed/multi_physical_planner.h" -#include "distributed/multi_transaction.h" +#include "distributed/multi_shard_transaction.h" #include "distributed/pg_dist_partition.h" #include "distributed/resource_lock.h" #include "distributed/shardinterval_utils.h" diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index ab0e1c2c2..40b81daee 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -18,11 +18,15 @@ #include "commands/defrem.h" #include "commands/tablecmds.h" #include "distributed/citus_ruleutils.h" +#include "distributed/commit_protocol.h" +#include "distributed/connection_cache.h" #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" #include "distributed/multi_copy.h" #include "distributed/multi_utility.h" #include "distributed/multi_join_order.h" +#include "distributed/multi_shard_transaction.h" +#include "distributed/resource_lock.h" #include "distributed/transmit.h" #include "distributed/worker_manager.h" #include "distributed/worker_protocol.h" @@ -36,12 +40,14 @@ #include "utils/builtins.h" #include "utils/inval.h" #include "utils/lsyscache.h" +#include "utils/memutils.h" #include "utils/rel.h" #include "utils/syscache.h" bool EnableDDLPropagation = true; /* ddl propagation is enabled */ + /* * This struct defines the state for the callback for drop statements. * It is copied as it is from commands/tablecmds.c in Postgres source. 
@@ -62,11 +68,11 @@ static void VerifyTransmitStmt(CopyStmt *copyStatement); static Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustRunAsOwner); static Node * ProcessIndexStmt(IndexStmt *createIndexStatement, - const char *createIndexCommand); + const char *createIndexCommand, bool isTopLevel); static Node * ProcessDropIndexStmt(DropStmt *dropIndexStatement, - const char *dropIndexCommand); + const char *dropIndexCommand, bool isTopLevel); static Node * ProcessAlterTableStmt(AlterTableStmt *alterTableStatement, - const char *alterTableCommand); + const char *alterTableCommand, bool isTopLevel); /* Local functions forward declarations for unsupported command checks */ static void ErrorIfUnsupportedIndexStmt(IndexStmt *createIndexStatement); @@ -77,9 +83,12 @@ static void ErrorIfDistributedRenameStmt(RenameStmt *renameStatement); /* Local functions forward declarations for helper functions */ static void CreateLocalTable(RangeVar *relation, char *nodeName, int32 nodePort); static bool IsAlterTableRenameStmt(RenameStmt *renameStatement); -static void ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString); -static bool ExecuteCommandOnWorkerShards(Oid relationId, const char *commandString, - List **failedPlacementList); +static void ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString, + bool isTopLevel); +static void SetLocalCommitProtocolTo2PC(void); +static bool ExecuteCommandOnWorkerShards(Oid relationId, const char *commandString); +static void ExecuteCommandOnShardPlacements(StringInfo applyCommand, uint64 shardId, + ShardConnections *shardConnections); static bool AllFinalizedPlacementsAccessible(Oid relationId); static void RangeVarCallbackForDropIndex(const RangeVar *rel, Oid relOid, Oid oldRelOid, void *arg); @@ -150,9 +159,12 @@ multi_ProcessUtility(Node *parsetree, /* ddl commands are propagated to workers only if EnableDDLPropagation is set */ if (EnableDDLPropagation) { + bool isTopLevel = (context == PROCESS_UTILITY_TOPLEVEL); + if (IsA(parsetree, IndexStmt)) { - parsetree = ProcessIndexStmt((IndexStmt *) parsetree, queryString); + parsetree = ProcessIndexStmt((IndexStmt *) parsetree, queryString, + isTopLevel); } if (IsA(parsetree, DropStmt)) @@ -160,7 +172,7 @@ multi_ProcessUtility(Node *parsetree, DropStmt *dropStatement = (DropStmt *) parsetree; if (dropStatement->removeType == OBJECT_INDEX) { - parsetree = ProcessDropIndexStmt(dropStatement, queryString); + parsetree = ProcessDropIndexStmt(dropStatement, queryString, isTopLevel); } } @@ -169,7 +181,8 @@ multi_ProcessUtility(Node *parsetree, AlterTableStmt *alterTableStmt = (AlterTableStmt *) parsetree; if (alterTableStmt->relkind == OBJECT_TABLE) { - parsetree = ProcessAlterTableStmt(alterTableStmt, queryString); + parsetree = ProcessAlterTableStmt(alterTableStmt, queryString, + isTopLevel); } } @@ -457,7 +470,8 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustR * master node table. */ static Node * -ProcessIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand) +ProcessIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand, + bool isTopLevel) { /* * We first check whether a distributed relation is affected. 
For that, we need to @@ -504,7 +518,7 @@ ProcessIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand ErrorIfUnsupportedIndexStmt(createIndexStatement); /* if it is supported, go ahead and execute the command */ - ExecuteDistributedDDLCommand(relationId, createIndexCommand); + ExecuteDistributedDDLCommand(relationId, createIndexCommand, isTopLevel); } } @@ -521,7 +535,8 @@ ProcessIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand * master node table. */ static Node * -ProcessDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand) +ProcessDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand, + bool isTopLevel) { ListCell *dropObjectCell = NULL; Oid distributedIndexId = InvalidOid; @@ -590,7 +605,7 @@ ProcessDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand) ErrorIfUnsupportedDropIndexStmt(dropIndexStatement); /* if it is supported, go ahead and execute the command */ - ExecuteDistributedDDLCommand(distributedRelationId, dropIndexCommand); + ExecuteDistributedDDLCommand(distributedRelationId, dropIndexCommand, isTopLevel); } return (Node *) dropIndexStatement; @@ -606,7 +621,8 @@ ProcessDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand) * master node table. */ static Node * -ProcessAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCommand) +ProcessAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCommand, + bool isTopLevel) { /* first check whether a distributed relation is affected */ if (alterTableStatement->relation != NULL) @@ -621,7 +637,7 @@ ProcessAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTabl ErrorIfUnsupportedAlterTableStmt(alterTableStatement); /* if it is supported, go ahead and execute the command */ - ExecuteDistributedDDLCommand(relationId, alterTableCommand); + ExecuteDistributedDDLCommand(relationId, alterTableCommand, isTopLevel); } } } @@ -985,18 +1001,23 @@ IsAlterTableRenameStmt(RenameStmt *renameStmt) /* * ExecuteDistributedDDLCommand applies a given DDL command to the given - * distributed table. If the function is unable to access all the finalized - * shard placements, then it fails early and errors out. If the command - * successfully executed on any finalized shard placement, and failed on - * others, then it marks the placements on which execution failed as invalid. + * distributed table in a distributed transaction. For the transaction, the value of + * citus.multi_shard_commit_protocol is set to '2pc' so that two phase commit mechanism + * is used, regardless of the actual value of citus.multi_shard_commit_protocol. In + * the commit protocol, a BEGIN is sent after connection to each shard placement + * and COMMIT/ROLLBACK is handled by CompleteShardPlacementTransactions function. 
*/ static void -ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString) +ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString, + bool isTopLevel) { - List *failedPlacementList = NIL; bool executionOK = false; + bool allPlacementsAccessible = false; - bool allPlacementsAccessible = AllFinalizedPlacementsAccessible(relationId); + PreventTransactionChain(true, "distributed DDL commands"); + SetLocalCommitProtocolTo2PC(); + + allPlacementsAccessible = AllFinalizedPlacementsAccessible(relationId); if (!allPlacementsAccessible) { ereport(ERROR, (errmsg("cannot execute command: %s", ddlCommandString), @@ -1007,31 +1028,13 @@ ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString) /* make sure we don't process cancel signals */ HOLD_INTERRUPTS(); - executionOK = ExecuteCommandOnWorkerShards(relationId, ddlCommandString, - &failedPlacementList); + executionOK = ExecuteCommandOnWorkerShards(relationId, ddlCommandString); /* if command could not be executed on any finalized shard placement, error out */ if (!executionOK) { ereport(ERROR, (errmsg("could not execute DDL command on worker node shards"))); } - else - { - /* else, mark failed placements as inactive */ - ListCell *failedPlacementCell = NULL; - foreach(failedPlacementCell, failedPlacementList) - { - ShardPlacement *placement = (ShardPlacement *) lfirst(failedPlacementCell); - uint64 shardId = placement->shardId; - char *workerName = placement->nodeName; - uint32 workerPort = placement->nodePort; - uint64 oldShardLength = placement->shardLength; - - DeleteShardPlacementRow(shardId, workerName, workerPort); - InsertShardPlacementRow(shardId, FILE_INACTIVE, oldShardLength, - workerName, workerPort); - } - } if (QueryCancelPending) { @@ -1043,93 +1046,152 @@ ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString) } +/* + * SwitchTo2PCForTransaction sets the citus.multi_shard_commit_protocol value + * to '2pc' for the current transaction, in order to force 2PC even the value + * of citus.multi_shard_commit_protocol is '1pc'; + */ +static void +SetLocalCommitProtocolTo2PC(void) +{ + if (MultiShardCommitProtocol != COMMIT_PROTOCOL_2PC) + { + ereport(DEBUG2, (errmsg("switching to 2PC for the transaction"))); + +#if (PG_VERSION_NUM >= 90500) + set_config_option("citus.multi_shard_commit_protocol", + "2pc", + PGC_USERSET, + PGC_S_SESSION, + GUC_ACTION_LOCAL, + true, 0, false); +#else + set_config_option("citus.multi_shard_commit_protocol", + "2pc", + PGC_USERSET, + PGC_S_SESSION, + GUC_ACTION_LOCAL, + true, 0); +#endif + } +} + + /* * ExecuteCommandOnWorkerShards executes a given command on all the finalized - * shard placements of the given table. If the remote command errors out on the - * first attempted placement, the function returns false. Otherwise, it returns - * true. + * shard placements of the given table within a distributed transaction. The + * value of citus.multi_shard_commit_protocol is set to '2pc' by the caller + * ExecuteDistributedDDLCommand function so that two phase commit protocol is used. * - * If the remote query errors out on the first attempted placement, it is very - * likely that the command is going to fail on other placements too. This is - * because most errors here will be PostgreSQL errors. Hence, the function fails - * fast to avoid marking a high number of placements as failed. If the command - * succeeds on at least one placement before failing on others, then the list of - * failed placements is returned in failedPlacementList. 
+ * ExecuteCommandOnWorkerShards opens an individual connection for each of the + * shard placement. After all connections are opened, a BEGIN command followed by + * a proper "SELECT worker_apply_shard_ddl_command(, )" is + * sent to all open connections in a serial manner. + * + * The opened transactions are handled by the CompleteShardPlacementTransactions + * function. * * Note: There are certain errors which would occur on few nodes and not on the * others. For example, adding a column with a type which exists on some nodes - * and not on the others. In that case, this function might still end up returning - * a large number of placements as failed. + * and not on the others. + * + * Note: The execution will be blocked if a prepared transaction from previous + * executions exist on the workers. In this case, those prepared transactions should + * be removed by either COMMIT PREPARED or ROLLBACK PREPARED. */ static bool -ExecuteCommandOnWorkerShards(Oid relationId, const char *commandString, - List **failedPlacementList) +ExecuteCommandOnWorkerShards(Oid relationId, const char *commandString) { - bool isFirstPlacement = true; - ListCell *shardCell = NULL; - List *shardList = NIL; - char *relationOwner = TableOwner(relationId); + List *shardIntervalList = LoadShardIntervalList(relationId); + char *tableOwner = TableOwner(relationId); + HTAB *shardConnectionHash = NULL; + ListCell *shardIntervalCell = NULL; - shardList = LoadShardList(relationId); - foreach(shardCell, shardList) + MemoryContext oldContext = MemoryContextSwitchTo(TopTransactionContext); + + LockShards(shardIntervalList, ShareLock); + + shardConnectionHash = OpenTransactionsToAllShardPlacements(shardIntervalList, + tableOwner); + + MemoryContextSwitchTo(oldContext); + + foreach(shardIntervalCell, shardIntervalList) { - List *shardPlacementList = NIL; - ListCell *shardPlacementCell = NULL; - uint64 *shardIdPointer = (uint64 *) lfirst(shardCell); - uint64 shardId = (*shardIdPointer); + ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell); + uint64 shardId = shardInterval->shardId; + ShardConnections *shardConnections = NULL; + bool shardConnectionsFound = false; + char *escapedCommandString = NULL; + StringInfo applyCommand = makeStringInfo(); + + shardConnections = GetShardConnections(shardConnectionHash, + shardId, + &shardConnectionsFound); + Assert(shardConnectionsFound); /* build the shard ddl command */ - char *escapedCommandString = quote_literal_cstr(commandString); - StringInfo applyCommand = makeStringInfo(); + escapedCommandString = quote_literal_cstr(commandString); appendStringInfo(applyCommand, WORKER_APPLY_SHARD_DDL_COMMAND, shardId, escapedCommandString); - shardPlacementList = FinalizedShardPlacementList(shardId); - foreach(shardPlacementCell, shardPlacementList) - { - ShardPlacement *placement = (ShardPlacement *) lfirst(shardPlacementCell); - char *workerName = placement->nodeName; - uint32 workerPort = placement->nodePort; - - List *queryResultList = ExecuteRemoteQuery(workerName, workerPort, - relationOwner, applyCommand); - if (queryResultList == NIL) - { - /* - * If we failed on the first placement, return false. We return - * here instead of exiting at the end to avoid breaking through - * multiple loops. 
- */ - if (isFirstPlacement) - { - return false; - } - - ereport(WARNING, (errmsg("could not apply command on shard " - UINT64_FORMAT " on node %s:%d", shardId, - workerName, workerPort), - errdetail("Shard placement will be marked as " - "inactive."))); - - *failedPlacementList = lappend(*failedPlacementList, placement); - } - else - { - ereport(DEBUG2, (errmsg("applied command on shard " UINT64_FORMAT - " on node %s:%d", shardId, workerName, - workerPort))); - } - - isFirstPlacement = false; - } + ExecuteCommandOnShardPlacements(applyCommand, shardId, shardConnections); FreeStringInfo(applyCommand); } + /* check for cancellation one last time before returning */ + CHECK_FOR_INTERRUPTS(); + return true; } +/* + * ExecuteCommandOnShardPlacements executes the given ddl command on the + * placements of the given shard, using the given shard connections. + */ +static void +ExecuteCommandOnShardPlacements(StringInfo applyCommand, uint64 shardId, + ShardConnections *shardConnections) +{ + List *connectionList = shardConnections->connectionList; + ListCell *connectionCell = NULL; + + Assert(connectionList != NIL); + + foreach(connectionCell, connectionList) + { + TransactionConnection *transactionConnection = + (TransactionConnection *) lfirst(connectionCell); + PGconn *connection = transactionConnection->connection; + PGresult *result = NULL; + + /* send the query */ + result = PQexec(connection, applyCommand->data); + if (PQresultStatus(result) != PGRES_TUPLES_OK) + { + WarnRemoteError(connection, result); + ereport(ERROR, (errmsg("could not execute DDL command on worker " + "node shards"))); + } + else + { + char *workerName = ConnectionGetOptionValue(connection, "host"); + char *workerPort = ConnectionGetOptionValue(connection, "port"); + + ereport(DEBUG2, (errmsg("applied command on shard " UINT64_FORMAT + " on node %s:%s", shardId, workerName, + workerPort))); + } + + PQclear(result); + + transactionConnection->transactionState = TRANSACTION_STATE_OPEN; + } +} + + /* * AllFinalizedPlacementsAccessible returns true if all the finalized shard * placements for a given relation are accessible. 
Otherwise, the function @@ -1529,6 +1591,7 @@ ReplicateGrantStmt(Node *parsetree) RangeVar *relvar = (RangeVar *) lfirst(objectCell); Oid relOid = RangeVarGetRelid(relvar, NoLock, false); const char *grantOption = ""; + bool isTopLevel = true; if (!IsDistributedTable(relOid)) { @@ -1561,7 +1624,7 @@ ReplicateGrantStmt(Node *parsetree) granteesString.data); } - ExecuteDistributedDDLCommand(relOid, ddlString.data); + ExecuteDistributedDDLCommand(relOid, ddlString.data, isTopLevel); resetStringInfo(&ddlString); } } diff --git a/src/backend/distributed/master/master_modify_multiple_shards.c b/src/backend/distributed/master/master_modify_multiple_shards.c index 6c7cf7833..008909e09 100644 --- a/src/backend/distributed/master/master_modify_multiple_shards.c +++ b/src/backend/distributed/master/master_modify_multiple_shards.c @@ -26,6 +26,7 @@ #include "commands/event_trigger.h" #include "distributed/citus_clauses.h" #include "distributed/citus_ruleutils.h" +#include "distributed/commit_protocol.h" #include "distributed/connection_cache.h" #include "distributed/listutils.h" #include "distributed/master_metadata_utility.h" @@ -36,7 +37,7 @@ #include "distributed/multi_router_executor.h" #include "distributed/multi_router_planner.h" #include "distributed/multi_server_executor.h" -#include "distributed/multi_transaction.h" +#include "distributed/multi_shard_transaction.h" #include "distributed/pg_dist_shard.h" #include "distributed/pg_dist_partition.h" #include "distributed/resource_lock.h" @@ -51,13 +52,12 @@ #include "utils/datum.h" #include "utils/inval.h" #include "utils/lsyscache.h" +#include "utils/memutils.h" static void LockShardsForModify(List *shardIntervalList); static bool HasReplication(List *shardIntervalList); -static int SendQueryToShards(Query *query, List *shardIntervalList); -static HTAB * OpenConnectionsToAllShardPlacements(List *shardIntervalList); -static void OpenConnectionsToShardPlacements(uint64 shardId, HTAB *shardConnectionHash); +static int SendQueryToShards(Query *query, List *shardIntervalList, Oid relationId); static int SendQueryToPlacements(char *shardQueryString, ShardConnections *shardConnections); @@ -137,7 +137,8 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS) LockShardsForModify(prunedShardIntervalList); - affectedTupleCount = SendQueryToShards(modifyQuery, prunedShardIntervalList); + affectedTupleCount = SendQueryToShards(modifyQuery, prunedShardIntervalList, + relationId); PG_RETURN_INT32(affectedTupleCount); } @@ -162,7 +163,7 @@ LockShardsForModify(List *shardIntervalList) { lockMode = ShareLock; } - else if (!HasReplication(shardIntervalList)) /* check if any shards have >1 replica */ + else if (!HasReplication(shardIntervalList)) { lockMode = ShareLock; } @@ -209,153 +210,55 @@ HasReplication(List *shardIntervalList) * the shards when necessary before calling SendQueryToShards. 
*/ static int -SendQueryToShards(Query *query, List *shardIntervalList) +SendQueryToShards(Query *query, List *shardIntervalList, Oid relationId) { int affectedTupleCount = 0; - HTAB *shardConnectionHash = OpenConnectionsToAllShardPlacements(shardIntervalList); - List *allShardsConnectionList = ConnectionList(shardConnectionHash); + char *relationOwner = TableOwner(relationId); + HTAB *shardConnectionHash = NULL; + ListCell *shardIntervalCell = NULL; - PG_TRY(); + MemoryContext oldContext = MemoryContextSwitchTo(TopTransactionContext); + + shardConnectionHash = OpenTransactionsToAllShardPlacements(shardIntervalList, + relationOwner); + + MemoryContextSwitchTo(oldContext); + + foreach(shardIntervalCell, shardIntervalList) { - ListCell *shardIntervalCell = NULL; + ShardInterval *shardInterval = (ShardInterval *) lfirst( + shardIntervalCell); + Oid relationId = shardInterval->relationId; + uint64 shardId = shardInterval->shardId; + bool shardConnectionsFound = false; + ShardConnections *shardConnections = NULL; + StringInfo shardQueryString = makeStringInfo(); + char *shardQueryStringData = NULL; + int shardAffectedTupleCount = -1; - foreach(shardIntervalCell, shardIntervalList) - { - ShardInterval *shardInterval = (ShardInterval *) lfirst( - shardIntervalCell); - Oid relationId = shardInterval->relationId; - uint64 shardId = shardInterval->shardId; - bool shardConnectionsFound = false; - ShardConnections *shardConnections = NULL; - StringInfo shardQueryString = makeStringInfo(); - char *shardQueryStringData = NULL; - int shardAffectedTupleCount = -1; + shardConnections = GetShardConnections(shardConnectionHash, + shardId, + &shardConnectionsFound); + Assert(shardConnectionsFound); - shardConnections = GetShardConnections(shardConnectionHash, - shardId, - &shardConnectionsFound); - Assert(shardConnectionsFound); - - deparse_shard_query(query, relationId, shardId, shardQueryString); - shardQueryStringData = shardQueryString->data; - shardAffectedTupleCount = SendQueryToPlacements(shardQueryStringData, - shardConnections); - affectedTupleCount += shardAffectedTupleCount; - } - - if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC) - { - PrepareRemoteTransactions(allShardsConnectionList); - } - - /* check for cancellation one last time before returning */ - CHECK_FOR_INTERRUPTS(); + deparse_shard_query(query, relationId, shardId, shardQueryString); + shardQueryStringData = shardQueryString->data; + shardAffectedTupleCount = SendQueryToPlacements(shardQueryStringData, + shardConnections); + affectedTupleCount += shardAffectedTupleCount; } - PG_CATCH(); - { - /* roll back all transactions */ - AbortRemoteTransactions(allShardsConnectionList); - CloseConnections(allShardsConnectionList); - PG_RE_THROW(); - } - PG_END_TRY(); - - CommitRemoteTransactions(allShardsConnectionList, false); - CloseConnections(allShardsConnectionList); + /* check for cancellation one last time before returning */ + CHECK_FOR_INTERRUPTS(); return affectedTupleCount; } -/* - * OpenConnectionsToAllShardPlacement opens connections to all placements of - * the given shard list and returns the hash table containing the connections. - * The resulting hash table maps shardId to ShardConnection struct. 
- */ -static HTAB * -OpenConnectionsToAllShardPlacements(List *shardIntervalList) -{ - HTAB *shardConnectionHash = CreateShardConnectionHash(); - - ListCell *shardIntervalCell = NULL; - - foreach(shardIntervalCell, shardIntervalList) - { - ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell); - uint64 shardId = shardInterval->shardId; - - OpenConnectionsToShardPlacements(shardId, shardConnectionHash); - } - - return shardConnectionHash; -} - - -/* - * OpenConnectionsToShardPlacements opens connections to all placements of the - * shard with the given shardId and populates the shardConnectionHash table - * accordingly. - */ -static void -OpenConnectionsToShardPlacements(uint64 shardId, HTAB *shardConnectionHash) -{ - bool shardConnectionsFound = false; - - /* get existing connections to the shard placements, if any */ - ShardConnections *shardConnections = GetShardConnections(shardConnectionHash, - shardId, - &shardConnectionsFound); - - List *shardPlacementList = FinalizedShardPlacementList(shardId); - ListCell *shardPlacementCell = NULL; - List *connectionList = NIL; - - Assert(!shardConnectionsFound); - - if (shardPlacementList == NIL) - { - ereport(ERROR, (errmsg("could not find any shard placements for the shard " - UINT64_FORMAT, shardId))); - } - - foreach(shardPlacementCell, shardPlacementList) - { - ShardPlacement *shardPlacement = (ShardPlacement *) lfirst( - shardPlacementCell); - char *workerName = shardPlacement->nodeName; - uint32 workerPort = shardPlacement->nodePort; - char *nodeUser = CurrentUserName(); - PGconn *connection = ConnectToNode(workerName, workerPort, nodeUser); - TransactionConnection *transactionConnection = NULL; - - if (connection == NULL) - { - List *abortConnectionList = ConnectionList(shardConnectionHash); - CloseConnections(abortConnectionList); - - ereport(ERROR, (errmsg("could not establish a connection to all " - "placements"))); - } - - transactionConnection = palloc0(sizeof(TransactionConnection)); - - transactionConnection->connectionId = shardConnections->shardId; - transactionConnection->transactionState = TRANSACTION_STATE_INVALID; - transactionConnection->connection = connection; - - connectionList = lappend(connectionList, transactionConnection); - } - - shardConnections->connectionList = connectionList; -} - - /* * SendQueryToPlacements sends the given query string to all given placement - * connections of a shard. The query is sent with a BEGIN before the the actual - * query so, CommitRemoteTransactions or AbortRemoteTransactions should be - * called after all queries have been sent successfully. + * connections of a shard. CommitRemoteTransactions or AbortRemoteTransactions + * should be called after all queries have been sent successfully. 
*/ static int SendQueryToPlacements(char *shardQueryString, ShardConnections *shardConnections) @@ -379,13 +282,6 @@ SendQueryToPlacements(char *shardQueryString, ShardConnections *shardConnections CHECK_FOR_INTERRUPTS(); /* send the query */ - result = PQexec(connection, "BEGIN"); - if (PQresultStatus(result) != PGRES_COMMAND_OK) - { - WarnRemoteError(connection, result); - ereport(ERROR, (errmsg("could not send query to shard placement"))); - } - result = PQexec(connection, shardQueryString); if (PQresultStatus(result) != PGRES_COMMAND_OK) { diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index ba959f860..921500609 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -18,6 +18,7 @@ #include "commands/explain.h" #include "executor/executor.h" +#include "distributed/commit_protocol.h" #include "distributed/master_protocol.h" #include "distributed/multi_copy.h" #include "distributed/multi_executor.h" @@ -28,7 +29,6 @@ #include "distributed/multi_router_executor.h" #include "distributed/multi_router_planner.h" #include "distributed/multi_server_executor.h" -#include "distributed/multi_transaction.h" #include "distributed/multi_utility.h" #include "distributed/task_tracker.h" #include "distributed/worker_manager.h" diff --git a/src/backend/distributed/utils/multi_transaction.c b/src/backend/distributed/transaction/commit_protocol.c similarity index 76% rename from src/backend/distributed/utils/multi_transaction.c rename to src/backend/distributed/transaction/commit_protocol.c index 7204dfd92..a1c7ff839 100644 --- a/src/backend/distributed/utils/multi_transaction.c +++ b/src/backend/distributed/transaction/commit_protocol.c @@ -1,6 +1,6 @@ /*------------------------------------------------------------------------- * - * multi_transaction.c + * commit_protocol.c * This file contains functions for managing 1PC or 2PC transactions * across many shard placements. * @@ -9,20 +9,19 @@ *------------------------------------------------------------------------- */ + #include "postgres.h" #include "libpq-fe.h" #include "miscadmin.h" -#include "access/xact.h" +#include "distributed/commit_protocol.h" #include "distributed/connection_cache.h" -#include "distributed/multi_transaction.h" +#include "distributed/master_metadata_utility.h" +#include "distributed/multi_shard_transaction.h" #include "lib/stringinfo.h" #include "nodes/pg_list.h" -#define INITIAL_CONNECTION_CACHE_SIZE 1001 - - /* Local functions forward declarations */ static uint32 DistributedTransactionId = 0; @@ -46,6 +45,66 @@ InitializeDistributedTransaction(void) } +/* + * CompleteShardPlacementTransactions commits or aborts pending shard placement + * transactions when the local transaction commits or aborts. + */ +void +CompleteShardPlacementTransactions(XactEvent event, void *arg) +{ + if (shardPlacementConnectionList == NIL) + { + /* nothing to do */ + return; + } + else if (event == XACT_EVENT_PRE_COMMIT) + { + /* + * Any failure here will cause local changes to be rolled back, + * and remote changes to either roll back (1PC) or, in case of + * connection or node failure, leave a prepared transaction + * (2PC). 
+ */ + + if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC) + { + PrepareRemoteTransactions(shardPlacementConnectionList); + } + + return; + } + else if (event == XACT_EVENT_COMMIT) + { + /* + * A failure here will cause some remote changes to either + * roll back (1PC) or, in case of connection or node failure, + * leave a prepared transaction (2PC). However, the local + * changes have already been committed. + */ + + CommitRemoteTransactions(shardPlacementConnectionList, false); + } + else if (event == XACT_EVENT_ABORT) + { + /* + * A failure here will cause some remote changes to either + * roll back (1PC) or, in case of connection or node failure, + * leave a prepared transaction (2PC). The local changes have + * already been rolled back. + */ + + AbortRemoteTransactions(shardPlacementConnectionList); + } + else + { + return; + } + + CloseConnections(shardPlacementConnectionList); + shardPlacementConnectionList = NIL; +} + + /* * PrepareRemoteTransactions prepares all transactions on connections in * connectionList for commit if the 2PC commit protocol is enabled. @@ -82,6 +141,9 @@ PrepareRemoteTransactions(List *connectionList) errmsg("failed to prepare transaction"))); } + ereport(DEBUG2, (errmsg("sent PREPARE TRANSACTION over connection %ld", + connectionId))); + PQclear(result); transactionConnection->transactionState = TRANSACTION_STATE_PREPARED; @@ -126,6 +188,8 @@ AbortRemoteTransactions(List *connectionList) command->data, nodeName, nodePort))); } + ereport(DEBUG2, (errmsg("sent ROLLBACK over connection %ld", connectionId))); + PQclear(result); } else if (transactionConnection->transactionState == TRANSACTION_STATE_OPEN) @@ -197,6 +261,9 @@ CommitRemoteTransactions(List *connectionList, bool stopOnFailure) command->data, nodeName, nodePort))); } } + + ereport(DEBUG2, (errmsg("sent COMMIT PREPARED over connection %ld", + connectionId))); } else { @@ -224,6 +291,8 @@ CommitRemoteTransactions(List *connectionList, bool stopOnFailure) nodeName, nodePort))); } } + + ereport(DEBUG2, (errmsg("sent COMMIT over connection %ld", connectionId))); } PQclear(result); @@ -253,97 +322,3 @@ BuildTransactionName(int connectionId) return commandString; } - - -/* - * CloseConnections closes all connections in connectionList. - */ -void -CloseConnections(List *connectionList) -{ - ListCell *connectionCell = NULL; - - foreach(connectionCell, connectionList) - { - TransactionConnection *transactionConnection = - (TransactionConnection *) lfirst(connectionCell); - PGconn *connection = transactionConnection->connection; - - PQfinish(connection); - } -} - - -/* - * CreateShardConnectionHash constructs a hash table used for shardId->Connection - * mapping. - */ -HTAB * -CreateShardConnectionHash(void) -{ - HTAB *shardConnectionsHash = NULL; - int hashFlags = 0; - HASHCTL info; - - memset(&info, 0, sizeof(info)); - info.keysize = sizeof(int64); - info.entrysize = sizeof(ShardConnections); - info.hash = tag_hash; - - hashFlags = HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT; - shardConnectionsHash = hash_create("Shard Connections Hash", - INITIAL_CONNECTION_CACHE_SIZE, &info, - hashFlags); - - return shardConnectionsHash; -} - - -/* - * GetShardConnections finds existing connections for a shard in the hash. - * If not found, then a ShardConnections structure with empty connectionList - * is returned. 
- */ -ShardConnections * -GetShardConnections(HTAB *shardConnectionHash, int64 shardId, - bool *shardConnectionsFound) -{ - ShardConnections *shardConnections = NULL; - - shardConnections = (ShardConnections *) hash_search(shardConnectionHash, - &shardId, - HASH_ENTER, - shardConnectionsFound); - if (!*shardConnectionsFound) - { - shardConnections->shardId = shardId; - shardConnections->connectionList = NIL; - } - - return shardConnections; -} - - -/* - * ConnectionList flattens the connection hash to a list of placement connections. - */ -List * -ConnectionList(HTAB *connectionHash) -{ - List *connectionList = NIL; - HASH_SEQ_STATUS status; - ShardConnections *shardConnections = NULL; - - hash_seq_init(&status, connectionHash); - - shardConnections = (ShardConnections *) hash_seq_search(&status); - while (shardConnections != NULL) - { - List *shardConnectionsList = list_copy(shardConnections->connectionList); - connectionList = list_concat(connectionList, shardConnectionsList); - - shardConnections = (ShardConnections *) hash_seq_search(&status); - } - - return connectionList; -} diff --git a/src/backend/distributed/transaction/multi_shard_transaction.c b/src/backend/distributed/transaction/multi_shard_transaction.c new file mode 100644 index 000000000..b11019be0 --- /dev/null +++ b/src/backend/distributed/transaction/multi_shard_transaction.c @@ -0,0 +1,243 @@ +/*------------------------------------------------------------------------- + * + * multi_shard_transaction.c + * This file contains functions for managing 1PC or 2PC transactions + * across many shard placements. + * + * Copyright (c) 2016, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + + +#include "libpq-fe.h" +#include "postgres.h" + +#include "distributed/commit_protocol.h" +#include "distributed/connection_cache.h" +#include "distributed/master_metadata_utility.h" +#include "distributed/multi_shard_transaction.h" +#include "nodes/pg_list.h" + + +#define INITIAL_CONNECTION_CACHE_SIZE 1001 + + +List *shardPlacementConnectionList = NIL; + +static void RegisterShardPlacementXactCallback(void); + +static bool isXactCallbackRegistered = false; + + +/* + * OpenTransactionsToAllShardPlacements opens connections to all placements of + * the given shard Id Pointer List and returns the hash table containing the connections. + * The resulting hash table maps shardIds to ShardConnection structs. 
+ */ +HTAB * +OpenTransactionsToAllShardPlacements(List *shardIntervalList, char *userName) +{ + HTAB *shardConnectionHash = CreateShardConnectionHash(); + ListCell *shardIntervalCell = NULL; + ListCell *connectionCell = NULL; + List *connectionList = NIL; + + foreach(shardIntervalCell, shardIntervalList) + { + ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell); + uint64 shardId = shardInterval->shardId; + + OpenConnectionsToShardPlacements(shardId, shardConnectionHash, userName); + } + + connectionList = ConnectionList(shardConnectionHash); + + foreach(connectionCell, connectionList) + { + TransactionConnection *transactionConnection = + (TransactionConnection *) lfirst(connectionCell); + PGconn *connection = transactionConnection->connection; + PGresult *result = NULL; + + result = PQexec(connection, "BEGIN"); + if (PQresultStatus(result) != PGRES_COMMAND_OK) + { + ReraiseRemoteError(connection, result); + } + } + + shardPlacementConnectionList = ConnectionList(shardConnectionHash); + + RegisterShardPlacementXactCallback(); + + return shardConnectionHash; +} + + +/* + * CreateShardConnectionHash constructs a hash table used for shardId->Connection + * mapping. + */ +HTAB * +CreateShardConnectionHash(void) +{ + HTAB *shardConnectionsHash = NULL; + int hashFlags = 0; + HASHCTL info; + + memset(&info, 0, sizeof(info)); + info.keysize = sizeof(int64); + info.entrysize = sizeof(ShardConnections); + info.hash = tag_hash; + + hashFlags = HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT; + shardConnectionsHash = hash_create("Shard Connections Hash", + INITIAL_CONNECTION_CACHE_SIZE, &info, + hashFlags); + + return shardConnectionsHash; +} + + +/* + * OpenConnectionsToShardPlacements opens connections to all placements of the + * shard with the given shardId and populates the shardConnectionHash table + * accordingly. 
+ */ +void +OpenConnectionsToShardPlacements(uint64 shardId, HTAB *shardConnectionHash, + char *userName) +{ + bool shardConnectionsFound = false; + + /* get existing connections to the shard placements, if any */ + ShardConnections *shardConnections = GetShardConnections(shardConnectionHash, + shardId, + &shardConnectionsFound); + + List *shardPlacementList = FinalizedShardPlacementList(shardId); + ListCell *shardPlacementCell = NULL; + List *connectionList = NIL; + + Assert(!shardConnectionsFound); + + if (shardPlacementList == NIL) + { + ereport(ERROR, (errmsg("could not find any shard placements for the shard " + UINT64_FORMAT, shardId))); + } + + foreach(shardPlacementCell, shardPlacementList) + { + ShardPlacement *shardPlacement = (ShardPlacement *) lfirst( + shardPlacementCell); + char *workerName = shardPlacement->nodeName; + uint32 workerPort = shardPlacement->nodePort; + PGconn *connection = ConnectToNode(workerName, workerPort, userName); + TransactionConnection *transactionConnection = NULL; + + if (connection == NULL) + { + List *abortConnectionList = ConnectionList(shardConnectionHash); + CloseConnections(abortConnectionList); + + ereport(ERROR, (errmsg("could not establish a connection to all " + "placements of shard %lu", shardId))); + } + + transactionConnection = palloc0(sizeof(TransactionConnection)); + + transactionConnection->connectionId = shardConnections->shardId; + transactionConnection->transactionState = TRANSACTION_STATE_INVALID; + transactionConnection->connection = connection; + + connectionList = lappend(connectionList, transactionConnection); + } + + shardConnections->connectionList = connectionList; +} + + +/* + * GetShardConnections finds existing connections for a shard in the hash. + * If not found, then a ShardConnections structure with empty connectionList + * is returned. + */ +ShardConnections * +GetShardConnections(HTAB *shardConnectionHash, int64 shardId, + bool *shardConnectionsFound) +{ + ShardConnections *shardConnections = NULL; + + shardConnections = (ShardConnections *) hash_search(shardConnectionHash, + &shardId, + HASH_ENTER, + shardConnectionsFound); + if (!*shardConnectionsFound) + { + shardConnections->shardId = shardId; + shardConnections->connectionList = NIL; + } + + return shardConnections; +} + + +/* + * ConnectionList flattens the connection hash to a list of placement connections. + */ +List * +ConnectionList(HTAB *connectionHash) +{ + List *connectionList = NIL; + HASH_SEQ_STATUS status; + ShardConnections *shardConnections = NULL; + + hash_seq_init(&status, connectionHash); + + shardConnections = (ShardConnections *) hash_seq_search(&status); + while (shardConnections != NULL) + { + List *shardConnectionsList = list_copy(shardConnections->connectionList); + connectionList = list_concat(connectionList, shardConnectionsList); + + shardConnections = (ShardConnections *) hash_seq_search(&status); + } + + return connectionList; +} + + +/* + * EnableXactCallback ensures the XactCallback for committing/aborting + * remote worker transactions is registered. + */ +void +RegisterShardPlacementXactCallback(void) +{ + if (!isXactCallbackRegistered) + { + RegisterXactCallback(CompleteShardPlacementTransactions, NULL); + isXactCallbackRegistered = true; + } +} + + +/* + * CloseConnections closes all connections in connectionList. 
+ */ +void +CloseConnections(List *connectionList) +{ + ListCell *connectionCell = NULL; + + foreach(connectionCell, connectionList) + { + TransactionConnection *transactionConnection = + (TransactionConnection *) lfirst(connectionCell); + PGconn *connection = transactionConnection->connection; + + PQfinish(connection); + } +} diff --git a/src/include/distributed/multi_transaction.h b/src/include/distributed/commit_protocol.h similarity index 71% rename from src/include/distributed/multi_transaction.h rename to src/include/distributed/commit_protocol.h index 7a50eb274..346de8ca9 100644 --- a/src/include/distributed/multi_transaction.h +++ b/src/include/distributed/commit_protocol.h @@ -1,6 +1,6 @@ /*------------------------------------------------------------------------- * - * multi_transaction.h + * commit_protocol.h * Type and function declarations used in performing transactions across * shard placements. * @@ -9,10 +9,11 @@ *------------------------------------------------------------------------- */ -#ifndef MULTI_TRANSACTION_H -#define MULTI_TRANSACTION_H +#ifndef COMMIT_PROTOCOL_H +#define COMMIT_PROTOCOL_H +#include "access/xact.h" #include "libpq-fe.h" #include "lib/stringinfo.h" #include "nodes/pg_list.h" @@ -47,29 +48,15 @@ typedef struct TransactionConnection } TransactionConnection; -/* ShardConnections represents a set of connections for each placement of a shard */ -typedef struct ShardConnections -{ - int64 shardId; - List *connectionList; -} ShardConnections; - - /* config variable managed via guc.c */ extern int MultiShardCommitProtocol; /* Functions declarations for transaction and connection management */ extern void InitializeDistributedTransaction(void); +extern void CompleteShardPlacementTransactions(XactEvent event, void *arg); extern void PrepareRemoteTransactions(List *connectionList); extern void AbortRemoteTransactions(List *connectionList); extern void CommitRemoteTransactions(List *connectionList, bool stopOnFailure); -extern void CloseConnections(List *connectionList); -extern HTAB * CreateShardConnectionHash(void); -extern ShardConnections * GetShardConnections(HTAB *shardConnectionHash, - int64 shardId, - bool *shardConnectionsFound); -extern List * ConnectionList(HTAB *connectionHash); - -#endif /* MULTI_TRANSACTION_H */ +#endif /* COMMIT_PROTOCOL_H */ diff --git a/src/include/distributed/multi_copy.h b/src/include/distributed/multi_copy.h index 3cc44092b..d27926bbf 100644 --- a/src/include/distributed/multi_copy.h +++ b/src/include/distributed/multi_copy.h @@ -15,11 +15,6 @@ #include "nodes/parsenodes.h" - -/* config variable managed via guc.c */ -extern int MultiShardCommitProtocol; - - /* * A smaller version of copy.c's CopyStateData, trimmed to the elements * necessary to copy out results. While it'd be a bit nicer to share code, diff --git a/src/include/distributed/multi_shard_transaction.h b/src/include/distributed/multi_shard_transaction.h new file mode 100644 index 000000000..2f7c30fd9 --- /dev/null +++ b/src/include/distributed/multi_shard_transaction.h @@ -0,0 +1,43 @@ +/*------------------------------------------------------------------------- + * + * multi_shard_transaction.h + * Type and function declarations used in performing transactions across + * shard placements. + * + * Copyright (c) 2016, Citus Data, Inc. 
+ * + *------------------------------------------------------------------------- + */ + +#ifndef MULTI_SHARD_TRANSACTION_H +#define MULTI_SHARD_TRANSACTION_H + + +#include "access/xact.h" +#include "utils/hsearch.h" +#include "nodes/pg_list.h" + + +/* ShardConnections represents a set of connections for each placement of a shard */ +typedef struct ShardConnections +{ + int64 shardId; + List *connectionList; +} ShardConnections; + + +extern List *shardPlacementConnectionList; + + +extern HTAB * OpenTransactionsToAllShardPlacements(List *shardIdList, + char *relationOwner); +extern HTAB * CreateShardConnectionHash(void); +extern void OpenConnectionsToShardPlacements(uint64 shardId, HTAB *shardConnectionHash, + char *nodeUser); +extern ShardConnections * GetShardConnections(HTAB *shardConnectionHash, + int64 shardId, + bool *shardConnectionsFound); +extern List * ConnectionList(HTAB *connectionHash); +extern void CloseConnections(List *connectionList); + +#endif /* MULTI_SHARD_TRANSACTION_H */ diff --git a/src/test/regress/expected/multi_join_order_additional.out b/src/test/regress/expected/multi_join_order_additional.out index 20698676a..aaf2165e6 100644 --- a/src/test/regress/expected/multi_join_order_additional.out +++ b/src/test/regress/expected/multi_join_order_additional.out @@ -42,9 +42,14 @@ SELECT master_create_worker_shards('lineitem_hash', 2, 1); (1 row) CREATE INDEX lineitem_hash_time_index ON lineitem_hash (l_shipdate); +DEBUG: switching to 2PC for the transaction DEBUG: applied command on shard 650000 on node localhost:57637 DEBUG: applied command on shard 650001 on node localhost:57638 DEBUG: building index "lineitem_hash_time_index" on table "lineitem_hash" +DEBUG: sent PREPARE TRANSACTION over connection 650000 +DEBUG: sent PREPARE TRANSACTION over connection 650001 +DEBUG: sent COMMIT PREPARED over connection 650000 +DEBUG: sent COMMIT PREPARED over connection 650001 CREATE TABLE orders_hash ( o_orderkey bigint not null, o_custkey integer not null, diff --git a/src/test/regress/expected/multi_shard_modify.out b/src/test/regress/expected/multi_shard_modify.out index 1394563d3..fc4866d04 100644 --- a/src/test/regress/expected/multi_shard_modify.out +++ b/src/test/regress/expected/multi_shard_modify.out @@ -103,6 +103,10 @@ SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE DEBUG: predicate pruning for shardId 350001 DEBUG: predicate pruning for shardId 350002 DEBUG: predicate pruning for shardId 350003 +DEBUG: sent PREPARE TRANSACTION over connection 350000 +DEBUG: sent PREPARE TRANSACTION over connection 350000 +DEBUG: sent COMMIT PREPARED over connection 350000 +DEBUG: sent COMMIT PREPARED over connection 350000 master_modify_multiple_shards ------------------------------- 1 diff --git a/src/test/regress/input/multi_alter_table_statements.source b/src/test/regress/input/multi_alter_table_statements.source index 3df73f6db..4584c31fd 100644 --- a/src/test/regress/input/multi_alter_table_statements.source +++ b/src/test/regress/input/multi_alter_table_statements.source @@ -156,6 +156,42 @@ ALTER TABLE IF EXISTS lineitem_alter RENAME l_orderkey TO l_orderkey_renamed; -- node \d lineitem_alter +-- verify that non-propagated ddl commands are allowed inside a transaction block +SET citus.enable_ddl_propagation to false; +BEGIN; +CREATE INDEX temp_index_1 ON lineitem_alter(l_linenumber); +COMMIT; +SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter'; +DROP INDEX temp_index_1; + +-- verify that distributed ddl commands are not 
allowed inside a transaction block +SET citus.enable_ddl_propagation to true; +BEGIN; +CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey); +COMMIT; +SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter'; +DROP INDEX temp_index_2; + +-- verify that distributed ddl commands use 2PC, even the value of citus.multi_shard_commit_protocol is '1pc' +SET citus.multi_shard_commit_protocol TO '1pc'; +SET client_min_messages TO DEBUG2; +CREATE INDEX temp_index_3 ON lineitem_alter(l_orderkey); +RESET client_min_messages; +DROP INDEX temp_index_3; + +-- verify that citus.multi_shard_commit_protocol value is not changed +SHOW citus.multi_shard_commit_protocol; + +-- verify that not any of shard placements are marked as failed when a query failure occurs +CREATE TABLE test_ab (a int, b int); +SELECT master_create_distributed_table('test_ab', 'a', 'hash'); +SELECT master_create_worker_shards('test_ab', 8, 2); +INSERT INTO test_ab VALUES (2, 10); +INSERT INTO test_ab VALUES (2, 11); +CREATE UNIQUE INDEX temp_unique_index_1 ON test_ab(a); +SELECT shardid FROM pg_dist_shard_placement NATURAL JOIN pg_dist_shard +WHERE logicalrelid='test_ab'::regclass AND shardstate=3; + -- Check that the schema on the worker still looks reasonable \c - - - :worker_1_port SELECT attname, atttypid::regtype diff --git a/src/test/regress/output/multi_alter_table_statements.source b/src/test/regress/output/multi_alter_table_statements.source index 2688ce896..5af6ff985 100644 --- a/src/test/regress/output/multi_alter_table_statements.source +++ b/src/test/regress/output/multi_alter_table_statements.source @@ -412,6 +412,127 @@ ERROR: renaming distributed tables or their objects is currently unsupported l_comment | character varying(44) | not null null_column | integer | +-- verify that non-propagated ddl commands are allowed inside a transaction block +SET citus.enable_ddl_propagation to false; +BEGIN; +CREATE INDEX temp_index_1 ON lineitem_alter(l_linenumber); +COMMIT; +SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter'; + indexname | tablename +--------------+---------------- + temp_index_1 | lineitem_alter +(1 row) + +DROP INDEX temp_index_1; +-- verify that distributed ddl commands are not allowed inside a transaction block +SET citus.enable_ddl_propagation to true; +BEGIN; +CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey); +ERROR: distributed DDL commands cannot run inside a transaction block +COMMIT; +SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter'; + indexname | tablename +-----------+----------- +(0 rows) + +DROP INDEX temp_index_2; +ERROR: index "temp_index_2" does not exist +-- verify that distributed ddl commands use 2PC, even the value of citus.multi_shard_commit_protocol is '1pc' +SET citus.multi_shard_commit_protocol TO '1pc'; +SET client_min_messages TO DEBUG2; +CREATE INDEX temp_index_3 ON lineitem_alter(l_orderkey); +DEBUG: switching to 2PC for the transaction +DEBUG: applied command on shard 220000 on node localhost:57638 +DEBUG: applied command on shard 220000 on node localhost:57637 +DEBUG: applied command on shard 220003 on node localhost:57637 +DEBUG: applied command on shard 220003 on node localhost:57638 +DEBUG: applied command on shard 220007 on node localhost:57637 +DEBUG: applied command on shard 220007 on node localhost:57638 +DEBUG: applied command on shard 220004 on node localhost:57638 +DEBUG: applied command on shard 220004 on node localhost:57637 +DEBUG: applied command on shard 220008 on node 
localhost:57638 +DEBUG: applied command on shard 220008 on node localhost:57637 +DEBUG: applied command on shard 220001 on node localhost:57637 +DEBUG: applied command on shard 220001 on node localhost:57638 +DEBUG: applied command on shard 220002 on node localhost:57638 +DEBUG: applied command on shard 220002 on node localhost:57637 +DEBUG: applied command on shard 220005 on node localhost:57637 +DEBUG: applied command on shard 220005 on node localhost:57638 +DEBUG: applied command on shard 220009 on node localhost:57637 +DEBUG: applied command on shard 220009 on node localhost:57638 +DEBUG: building index "temp_index_3" on table "lineitem_alter" +DEBUG: sent PREPARE TRANSACTION over connection 220007 +DEBUG: sent PREPARE TRANSACTION over connection 220007 +DEBUG: sent PREPARE TRANSACTION over connection 220002 +DEBUG: sent PREPARE TRANSACTION over connection 220002 +DEBUG: sent PREPARE TRANSACTION over connection 220004 +DEBUG: sent PREPARE TRANSACTION over connection 220004 +DEBUG: sent PREPARE TRANSACTION over connection 220000 +DEBUG: sent PREPARE TRANSACTION over connection 220000 +DEBUG: sent PREPARE TRANSACTION over connection 220008 +DEBUG: sent PREPARE TRANSACTION over connection 220008 +DEBUG: sent PREPARE TRANSACTION over connection 220009 +DEBUG: sent PREPARE TRANSACTION over connection 220009 +DEBUG: sent PREPARE TRANSACTION over connection 220001 +DEBUG: sent PREPARE TRANSACTION over connection 220001 +DEBUG: sent PREPARE TRANSACTION over connection 220005 +DEBUG: sent PREPARE TRANSACTION over connection 220005 +DEBUG: sent PREPARE TRANSACTION over connection 220003 +DEBUG: sent PREPARE TRANSACTION over connection 220003 +DEBUG: sent COMMIT PREPARED over connection 220007 +DEBUG: sent COMMIT PREPARED over connection 220007 +DEBUG: sent COMMIT PREPARED over connection 220002 +DEBUG: sent COMMIT PREPARED over connection 220002 +DEBUG: sent COMMIT PREPARED over connection 220004 +DEBUG: sent COMMIT PREPARED over connection 220004 +DEBUG: sent COMMIT PREPARED over connection 220000 +DEBUG: sent COMMIT PREPARED over connection 220000 +DEBUG: sent COMMIT PREPARED over connection 220008 +DEBUG: sent COMMIT PREPARED over connection 220008 +DEBUG: sent COMMIT PREPARED over connection 220009 +DEBUG: sent COMMIT PREPARED over connection 220009 +DEBUG: sent COMMIT PREPARED over connection 220001 +DEBUG: sent COMMIT PREPARED over connection 220001 +DEBUG: sent COMMIT PREPARED over connection 220005 +DEBUG: sent COMMIT PREPARED over connection 220005 +DEBUG: sent COMMIT PREPARED over connection 220003 +DEBUG: sent COMMIT PREPARED over connection 220003 +RESET client_min_messages; +DROP INDEX temp_index_3; +-- verify that citus.multi_shard_commit_protocol value is not changed +SHOW citus.multi_shard_commit_protocol; + citus.multi_shard_commit_protocol +----------------------------------- + 1pc +(1 row) + +-- verify that not any of shard placements are marked as failed when a query failure occurs +CREATE TABLE test_ab (a int, b int); +SELECT master_create_distributed_table('test_ab', 'a', 'hash'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_worker_shards('test_ab', 8, 2); + master_create_worker_shards +----------------------------- + +(1 row) + +INSERT INTO test_ab VALUES (2, 10); +INSERT INTO test_ab VALUES (2, 11); +CREATE UNIQUE INDEX temp_unique_index_1 ON test_ab(a); +WARNING: could not create unique index "temp_unique_index_1_220016" +DETAIL: Key (a)=(2) is duplicated. 
+CONTEXT: while executing command on localhost:57638 +ERROR: could not execute DDL command on worker node shards +SELECT shardid FROM pg_dist_shard_placement NATURAL JOIN pg_dist_shard +WHERE logicalrelid='test_ab'::regclass AND shardstate=3; + shardid +--------- +(0 rows) + -- Check that the schema on the worker still looks reasonable \c - - - :worker_1_port SELECT attname, atttypid::regtype
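Note on manual cleanup: as the commit message describes, a worker or master failure around PREPARE TRANSACTION / COMMIT PREPARED can leave prepared transactions behind on the workers, and those have to be resolved by hand. The following is a minimal sketch of that cleanup, run directly against an affected worker node. It uses only standard PostgreSQL facilities (pg_prepared_xacts, COMMIT PREPARED, ROLLBACK PREPARED); the transaction identifier shown is a hypothetical placeholder, not a name generated by this patch.

-- List transactions that are still in the prepared state on this worker.
SELECT gid, prepared, owner, database FROM pg_prepared_xacts;

-- If the master already committed the DDL locally, finish the remote work:
COMMIT PREPARED 'example_gid_from_pg_prepared_xacts';   -- hypothetical gid

-- If the master rolled back (or never reached commit), undo it instead:
ROLLBACK PREPARED 'example_gid_from_pg_prepared_xacts'; -- hypothetical gid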