diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index f946adadd..6ee2cc9a7 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -15,7 +15,7 @@ DATA_built = $(foreach v,$(EXTVERSIONS),$(EXTENSION)--$(v).sql) SCRIPTS = ../../bin/scripts/copy_to_distributed_table # directories with source files -SUBDIRS = . commands executor master planner relay test utils worker +SUBDIRS = . commands executor master planner relay test transaction utils worker # That patsubst rule searches all directories listed in SUBDIRS for .c # files, and adds the corresponding .o files to OBJS diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 4fe427247..adfe5791d 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -71,6 +71,7 @@ #include "commands/copy.h" #include "commands/defrem.h" #include "distributed/citus_ruleutils.h" +#include "distributed/commit_protocol.h" #include "distributed/connection_cache.h" #include "distributed/listutils.h" #include "distributed/metadata_cache.h" @@ -79,7 +80,7 @@ #include "distributed/metadata_cache.h" #include "distributed/multi_copy.h" #include "distributed/multi_physical_planner.h" -#include "distributed/multi_transaction.h" +#include "distributed/multi_shard_transaction.h" #include "distributed/pg_dist_partition.h" #include "distributed/resource_lock.h" #include "distributed/shardinterval_utils.h" diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c index ab0e1c2c2..40b81daee 100644 --- a/src/backend/distributed/executor/multi_utility.c +++ b/src/backend/distributed/executor/multi_utility.c @@ -18,11 +18,15 @@ #include "commands/defrem.h" #include "commands/tablecmds.h" #include "distributed/citus_ruleutils.h" +#include "distributed/commit_protocol.h" +#include "distributed/connection_cache.h" #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" #include "distributed/multi_copy.h" #include "distributed/multi_utility.h" #include "distributed/multi_join_order.h" +#include "distributed/multi_shard_transaction.h" +#include "distributed/resource_lock.h" #include "distributed/transmit.h" #include "distributed/worker_manager.h" #include "distributed/worker_protocol.h" @@ -36,12 +40,14 @@ #include "utils/builtins.h" #include "utils/inval.h" #include "utils/lsyscache.h" +#include "utils/memutils.h" #include "utils/rel.h" #include "utils/syscache.h" bool EnableDDLPropagation = true; /* ddl propagation is enabled */ + /* * This struct defines the state for the callback for drop statements. * It is copied as it is from commands/tablecmds.c in Postgres source. 
@@ -62,11 +68,11 @@ static void VerifyTransmitStmt(CopyStmt *copyStatement); static Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustRunAsOwner); static Node * ProcessIndexStmt(IndexStmt *createIndexStatement, - const char *createIndexCommand); + const char *createIndexCommand, bool isTopLevel); static Node * ProcessDropIndexStmt(DropStmt *dropIndexStatement, - const char *dropIndexCommand); + const char *dropIndexCommand, bool isTopLevel); static Node * ProcessAlterTableStmt(AlterTableStmt *alterTableStatement, - const char *alterTableCommand); + const char *alterTableCommand, bool isTopLevel); /* Local functions forward declarations for unsupported command checks */ static void ErrorIfUnsupportedIndexStmt(IndexStmt *createIndexStatement); @@ -77,9 +83,12 @@ static void ErrorIfDistributedRenameStmt(RenameStmt *renameStatement); /* Local functions forward declarations for helper functions */ static void CreateLocalTable(RangeVar *relation, char *nodeName, int32 nodePort); static bool IsAlterTableRenameStmt(RenameStmt *renameStatement); -static void ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString); -static bool ExecuteCommandOnWorkerShards(Oid relationId, const char *commandString, - List **failedPlacementList); +static void ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString, + bool isTopLevel); +static void SetLocalCommitProtocolTo2PC(void); +static bool ExecuteCommandOnWorkerShards(Oid relationId, const char *commandString); +static void ExecuteCommandOnShardPlacements(StringInfo applyCommand, uint64 shardId, + ShardConnections *shardConnections); static bool AllFinalizedPlacementsAccessible(Oid relationId); static void RangeVarCallbackForDropIndex(const RangeVar *rel, Oid relOid, Oid oldRelOid, void *arg); @@ -150,9 +159,12 @@ multi_ProcessUtility(Node *parsetree, /* ddl commands are propagated to workers only if EnableDDLPropagation is set */ if (EnableDDLPropagation) { + bool isTopLevel = (context == PROCESS_UTILITY_TOPLEVEL); + if (IsA(parsetree, IndexStmt)) { - parsetree = ProcessIndexStmt((IndexStmt *) parsetree, queryString); + parsetree = ProcessIndexStmt((IndexStmt *) parsetree, queryString, + isTopLevel); } if (IsA(parsetree, DropStmt)) @@ -160,7 +172,7 @@ multi_ProcessUtility(Node *parsetree, DropStmt *dropStatement = (DropStmt *) parsetree; if (dropStatement->removeType == OBJECT_INDEX) { - parsetree = ProcessDropIndexStmt(dropStatement, queryString); + parsetree = ProcessDropIndexStmt(dropStatement, queryString, isTopLevel); } } @@ -169,7 +181,8 @@ multi_ProcessUtility(Node *parsetree, AlterTableStmt *alterTableStmt = (AlterTableStmt *) parsetree; if (alterTableStmt->relkind == OBJECT_TABLE) { - parsetree = ProcessAlterTableStmt(alterTableStmt, queryString); + parsetree = ProcessAlterTableStmt(alterTableStmt, queryString, + isTopLevel); } } @@ -457,7 +470,8 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustR * master node table. */ static Node * -ProcessIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand) +ProcessIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand, + bool isTopLevel) { /* * We first check whether a distributed relation is affected. 
For that, we need to @@ -504,7 +518,7 @@ ProcessIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand ErrorIfUnsupportedIndexStmt(createIndexStatement); /* if it is supported, go ahead and execute the command */ - ExecuteDistributedDDLCommand(relationId, createIndexCommand); + ExecuteDistributedDDLCommand(relationId, createIndexCommand, isTopLevel); } } @@ -521,7 +535,8 @@ ProcessIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand * master node table. */ static Node * -ProcessDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand) +ProcessDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand, + bool isTopLevel) { ListCell *dropObjectCell = NULL; Oid distributedIndexId = InvalidOid; @@ -590,7 +605,7 @@ ProcessDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand) ErrorIfUnsupportedDropIndexStmt(dropIndexStatement); /* if it is supported, go ahead and execute the command */ - ExecuteDistributedDDLCommand(distributedRelationId, dropIndexCommand); + ExecuteDistributedDDLCommand(distributedRelationId, dropIndexCommand, isTopLevel); } return (Node *) dropIndexStatement; @@ -606,7 +621,8 @@ ProcessDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand) * master node table. */ static Node * -ProcessAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCommand) +ProcessAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCommand, + bool isTopLevel) { /* first check whether a distributed relation is affected */ if (alterTableStatement->relation != NULL) @@ -621,7 +637,7 @@ ProcessAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTabl ErrorIfUnsupportedAlterTableStmt(alterTableStatement); /* if it is supported, go ahead and execute the command */ - ExecuteDistributedDDLCommand(relationId, alterTableCommand); + ExecuteDistributedDDLCommand(relationId, alterTableCommand, isTopLevel); } } } @@ -985,18 +1001,23 @@ IsAlterTableRenameStmt(RenameStmt *renameStmt) /* * ExecuteDistributedDDLCommand applies a given DDL command to the given - * distributed table. If the function is unable to access all the finalized - * shard placements, then it fails early and errors out. If the command - * successfully executed on any finalized shard placement, and failed on - * others, then it marks the placements on which execution failed as invalid. + * distributed table in a distributed transaction. For the transaction, the value of + * citus.multi_shard_commit_protocol is set to '2pc' so that two phase commit mechanism + * is used, regardless of the actual value of citus.multi_shard_commit_protocol. In + * the commit protocol, a BEGIN is sent after connection to each shard placement + * and COMMIT/ROLLBACK is handled by CompleteShardPlacementTransactions function. 
*/ static void -ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString) +ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString, + bool isTopLevel) { - List *failedPlacementList = NIL; bool executionOK = false; + bool allPlacementsAccessible = false; - bool allPlacementsAccessible = AllFinalizedPlacementsAccessible(relationId); + PreventTransactionChain(true, "distributed DDL commands"); + SetLocalCommitProtocolTo2PC(); + + allPlacementsAccessible = AllFinalizedPlacementsAccessible(relationId); if (!allPlacementsAccessible) { ereport(ERROR, (errmsg("cannot execute command: %s", ddlCommandString), @@ -1007,31 +1028,13 @@ ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString) /* make sure we don't process cancel signals */ HOLD_INTERRUPTS(); - executionOK = ExecuteCommandOnWorkerShards(relationId, ddlCommandString, - &failedPlacementList); + executionOK = ExecuteCommandOnWorkerShards(relationId, ddlCommandString); /* if command could not be executed on any finalized shard placement, error out */ if (!executionOK) { ereport(ERROR, (errmsg("could not execute DDL command on worker node shards"))); } - else - { - /* else, mark failed placements as inactive */ - ListCell *failedPlacementCell = NULL; - foreach(failedPlacementCell, failedPlacementList) - { - ShardPlacement *placement = (ShardPlacement *) lfirst(failedPlacementCell); - uint64 shardId = placement->shardId; - char *workerName = placement->nodeName; - uint32 workerPort = placement->nodePort; - uint64 oldShardLength = placement->shardLength; - - DeleteShardPlacementRow(shardId, workerName, workerPort); - InsertShardPlacementRow(shardId, FILE_INACTIVE, oldShardLength, - workerName, workerPort); - } - } if (QueryCancelPending) { @@ -1043,93 +1046,152 @@ ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString) } +/* + * SetLocalCommitProtocolTo2PC sets the citus.multi_shard_commit_protocol value + * to '2pc' for the current transaction, in order to force 2PC even if the value + * of citus.multi_shard_commit_protocol is '1pc'. + */ +static void +SetLocalCommitProtocolTo2PC(void) +{ + if (MultiShardCommitProtocol != COMMIT_PROTOCOL_2PC) + { + ereport(DEBUG2, (errmsg("switching to 2PC for the transaction"))); + +#if (PG_VERSION_NUM >= 90500) + set_config_option("citus.multi_shard_commit_protocol", + "2pc", + PGC_USERSET, + PGC_S_SESSION, + GUC_ACTION_LOCAL, + true, 0, false); +#else + set_config_option("citus.multi_shard_commit_protocol", + "2pc", + PGC_USERSET, + PGC_S_SESSION, + GUC_ACTION_LOCAL, + true, 0); +#endif + } +} + + /* * ExecuteCommandOnWorkerShards executes a given command on all the finalized - * shard placements of the given table. If the remote command errors out on the - * first attempted placement, the function returns false. Otherwise, it returns - * true. + * shard placements of the given table within a distributed transaction. The + * value of citus.multi_shard_commit_protocol is set to '2pc' by the calling + * ExecuteDistributedDDLCommand function so that the two-phase commit protocol is used. * - * If the remote query errors out on the first attempted placement, it is very - * likely that the command is going to fail on other placements too. This is - * because most errors here will be PostgreSQL errors. Hence, the function fails - * fast to avoid marking a high number of placements as failed. If the command - * succeeds on at least one placement before failing on others, then the list of - * failed placements is returned in failedPlacementList.
+ * ExecuteCommandOnWorkerShards opens an individual connection for each of the + * shard placements. After all connections are opened, a BEGIN command followed by + * a proper "SELECT worker_apply_shard_ddl_command(<shardId>, <DDL command>)" is + * sent to all open connections in a serial manner. + * + * The opened transactions are handled by the CompleteShardPlacementTransactions + * function. * * Note: There are certain errors which would occur on few nodes and not on the * others. For example, adding a column with a type which exists on some nodes - * and not on the others. In that case, this function might still end up returning - * a large number of placements as failed. + * and not on the others. + * + * Note: The execution will be blocked if prepared transactions from previous + * executions exist on the workers. In this case, those prepared transactions should + * be removed by either COMMIT PREPARED or ROLLBACK PREPARED. */ static bool -ExecuteCommandOnWorkerShards(Oid relationId, const char *commandString, - List **failedPlacementList) +ExecuteCommandOnWorkerShards(Oid relationId, const char *commandString) { - bool isFirstPlacement = true; - ListCell *shardCell = NULL; - List *shardList = NIL; - char *relationOwner = TableOwner(relationId); + List *shardIntervalList = LoadShardIntervalList(relationId); + char *tableOwner = TableOwner(relationId); + HTAB *shardConnectionHash = NULL; + ListCell *shardIntervalCell = NULL; - shardList = LoadShardList(relationId); - foreach(shardCell, shardList) + MemoryContext oldContext = MemoryContextSwitchTo(TopTransactionContext); + + LockShards(shardIntervalList, ShareLock); + + shardConnectionHash = OpenTransactionsToAllShardPlacements(shardIntervalList, + tableOwner); + + MemoryContextSwitchTo(oldContext); + + foreach(shardIntervalCell, shardIntervalList) { - List *shardPlacementList = NIL; - ListCell *shardPlacementCell = NULL; - uint64 *shardIdPointer = (uint64 *) lfirst(shardCell); - uint64 shardId = (*shardIdPointer); + ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell); + uint64 shardId = shardInterval->shardId; + ShardConnections *shardConnections = NULL; + bool shardConnectionsFound = false; + char *escapedCommandString = NULL; + StringInfo applyCommand = makeStringInfo(); + + shardConnections = GetShardConnections(shardConnectionHash, + shardId, + &shardConnectionsFound); + Assert(shardConnectionsFound); /* build the shard ddl command */ - char *escapedCommandString = quote_literal_cstr(commandString); - StringInfo applyCommand = makeStringInfo(); + escapedCommandString = quote_literal_cstr(commandString); appendStringInfo(applyCommand, WORKER_APPLY_SHARD_DDL_COMMAND, shardId, escapedCommandString); - shardPlacementList = FinalizedShardPlacementList(shardId); - foreach(shardPlacementCell, shardPlacementList) - { - ShardPlacement *placement = (ShardPlacement *) lfirst(shardPlacementCell); - char *workerName = placement->nodeName; - uint32 workerPort = placement->nodePort; - - List *queryResultList = ExecuteRemoteQuery(workerName, workerPort, - relationOwner, applyCommand); - if (queryResultList == NIL) - { - /* - * If we failed on the first placement, return false. We return - * here instead of exiting at the end to avoid breaking through - * multiple loops.
- */ - if (isFirstPlacement) - { - return false; - } - - ereport(WARNING, (errmsg("could not apply command on shard " - UINT64_FORMAT " on node %s:%d", shardId, - workerName, workerPort), - errdetail("Shard placement will be marked as " - "inactive."))); - - *failedPlacementList = lappend(*failedPlacementList, placement); - } - else - { - ereport(DEBUG2, (errmsg("applied command on shard " UINT64_FORMAT - " on node %s:%d", shardId, workerName, - workerPort))); - } - - isFirstPlacement = false; - } + ExecuteCommandOnShardPlacements(applyCommand, shardId, shardConnections); FreeStringInfo(applyCommand); } + /* check for cancellation one last time before returning */ + CHECK_FOR_INTERRUPTS(); + return true; } +/* + * ExecuteCommandOnShardPlacements executes the given ddl command on the + * placements of the given shard, using the given shard connections. + */ +static void +ExecuteCommandOnShardPlacements(StringInfo applyCommand, uint64 shardId, + ShardConnections *shardConnections) +{ + List *connectionList = shardConnections->connectionList; + ListCell *connectionCell = NULL; + + Assert(connectionList != NIL); + + foreach(connectionCell, connectionList) + { + TransactionConnection *transactionConnection = + (TransactionConnection *) lfirst(connectionCell); + PGconn *connection = transactionConnection->connection; + PGresult *result = NULL; + + /* send the query */ + result = PQexec(connection, applyCommand->data); + if (PQresultStatus(result) != PGRES_TUPLES_OK) + { + WarnRemoteError(connection, result); + ereport(ERROR, (errmsg("could not execute DDL command on worker " + "node shards"))); + } + else + { + char *workerName = ConnectionGetOptionValue(connection, "host"); + char *workerPort = ConnectionGetOptionValue(connection, "port"); + + ereport(DEBUG2, (errmsg("applied command on shard " UINT64_FORMAT + " on node %s:%s", shardId, workerName, + workerPort))); + } + + PQclear(result); + + transactionConnection->transactionState = TRANSACTION_STATE_OPEN; + } +} + + /* * AllFinalizedPlacementsAccessible returns true if all the finalized shard * placements for a given relation are accessible. 
Otherwise, the function @@ -1529,6 +1591,7 @@ ReplicateGrantStmt(Node *parsetree) RangeVar *relvar = (RangeVar *) lfirst(objectCell); Oid relOid = RangeVarGetRelid(relvar, NoLock, false); const char *grantOption = ""; + bool isTopLevel = true; if (!IsDistributedTable(relOid)) { @@ -1561,7 +1624,7 @@ ReplicateGrantStmt(Node *parsetree) granteesString.data); } - ExecuteDistributedDDLCommand(relOid, ddlString.data); + ExecuteDistributedDDLCommand(relOid, ddlString.data, isTopLevel); resetStringInfo(&ddlString); } } diff --git a/src/backend/distributed/master/master_modify_multiple_shards.c b/src/backend/distributed/master/master_modify_multiple_shards.c index 6c7cf7833..008909e09 100644 --- a/src/backend/distributed/master/master_modify_multiple_shards.c +++ b/src/backend/distributed/master/master_modify_multiple_shards.c @@ -26,6 +26,7 @@ #include "commands/event_trigger.h" #include "distributed/citus_clauses.h" #include "distributed/citus_ruleutils.h" +#include "distributed/commit_protocol.h" #include "distributed/connection_cache.h" #include "distributed/listutils.h" #include "distributed/master_metadata_utility.h" @@ -36,7 +37,7 @@ #include "distributed/multi_router_executor.h" #include "distributed/multi_router_planner.h" #include "distributed/multi_server_executor.h" -#include "distributed/multi_transaction.h" +#include "distributed/multi_shard_transaction.h" #include "distributed/pg_dist_shard.h" #include "distributed/pg_dist_partition.h" #include "distributed/resource_lock.h" @@ -51,13 +52,12 @@ #include "utils/datum.h" #include "utils/inval.h" #include "utils/lsyscache.h" +#include "utils/memutils.h" static void LockShardsForModify(List *shardIntervalList); static bool HasReplication(List *shardIntervalList); -static int SendQueryToShards(Query *query, List *shardIntervalList); -static HTAB * OpenConnectionsToAllShardPlacements(List *shardIntervalList); -static void OpenConnectionsToShardPlacements(uint64 shardId, HTAB *shardConnectionHash); +static int SendQueryToShards(Query *query, List *shardIntervalList, Oid relationId); static int SendQueryToPlacements(char *shardQueryString, ShardConnections *shardConnections); @@ -137,7 +137,8 @@ master_modify_multiple_shards(PG_FUNCTION_ARGS) LockShardsForModify(prunedShardIntervalList); - affectedTupleCount = SendQueryToShards(modifyQuery, prunedShardIntervalList); + affectedTupleCount = SendQueryToShards(modifyQuery, prunedShardIntervalList, + relationId); PG_RETURN_INT32(affectedTupleCount); } @@ -162,7 +163,7 @@ LockShardsForModify(List *shardIntervalList) { lockMode = ShareLock; } - else if (!HasReplication(shardIntervalList)) /* check if any shards have >1 replica */ + else if (!HasReplication(shardIntervalList)) { lockMode = ShareLock; } @@ -209,153 +210,55 @@ HasReplication(List *shardIntervalList) * the shards when necessary before calling SendQueryToShards. 
*/ static int -SendQueryToShards(Query *query, List *shardIntervalList) +SendQueryToShards(Query *query, List *shardIntervalList, Oid relationId) { int affectedTupleCount = 0; - HTAB *shardConnectionHash = OpenConnectionsToAllShardPlacements(shardIntervalList); - List *allShardsConnectionList = ConnectionList(shardConnectionHash); + char *relationOwner = TableOwner(relationId); + HTAB *shardConnectionHash = NULL; + ListCell *shardIntervalCell = NULL; - PG_TRY(); + MemoryContext oldContext = MemoryContextSwitchTo(TopTransactionContext); + + shardConnectionHash = OpenTransactionsToAllShardPlacements(shardIntervalList, + relationOwner); + + MemoryContextSwitchTo(oldContext); + + foreach(shardIntervalCell, shardIntervalList) { - ListCell *shardIntervalCell = NULL; + ShardInterval *shardInterval = (ShardInterval *) lfirst( + shardIntervalCell); + Oid relationId = shardInterval->relationId; + uint64 shardId = shardInterval->shardId; + bool shardConnectionsFound = false; + ShardConnections *shardConnections = NULL; + StringInfo shardQueryString = makeStringInfo(); + char *shardQueryStringData = NULL; + int shardAffectedTupleCount = -1; - foreach(shardIntervalCell, shardIntervalList) - { - ShardInterval *shardInterval = (ShardInterval *) lfirst( - shardIntervalCell); - Oid relationId = shardInterval->relationId; - uint64 shardId = shardInterval->shardId; - bool shardConnectionsFound = false; - ShardConnections *shardConnections = NULL; - StringInfo shardQueryString = makeStringInfo(); - char *shardQueryStringData = NULL; - int shardAffectedTupleCount = -1; + shardConnections = GetShardConnections(shardConnectionHash, + shardId, + &shardConnectionsFound); + Assert(shardConnectionsFound); - shardConnections = GetShardConnections(shardConnectionHash, - shardId, - &shardConnectionsFound); - Assert(shardConnectionsFound); - - deparse_shard_query(query, relationId, shardId, shardQueryString); - shardQueryStringData = shardQueryString->data; - shardAffectedTupleCount = SendQueryToPlacements(shardQueryStringData, - shardConnections); - affectedTupleCount += shardAffectedTupleCount; - } - - if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC) - { - PrepareRemoteTransactions(allShardsConnectionList); - } - - /* check for cancellation one last time before returning */ - CHECK_FOR_INTERRUPTS(); + deparse_shard_query(query, relationId, shardId, shardQueryString); + shardQueryStringData = shardQueryString->data; + shardAffectedTupleCount = SendQueryToPlacements(shardQueryStringData, + shardConnections); + affectedTupleCount += shardAffectedTupleCount; } - PG_CATCH(); - { - /* roll back all transactions */ - AbortRemoteTransactions(allShardsConnectionList); - CloseConnections(allShardsConnectionList); - PG_RE_THROW(); - } - PG_END_TRY(); - - CommitRemoteTransactions(allShardsConnectionList, false); - CloseConnections(allShardsConnectionList); + /* check for cancellation one last time before returning */ + CHECK_FOR_INTERRUPTS(); return affectedTupleCount; } -/* - * OpenConnectionsToAllShardPlacement opens connections to all placements of - * the given shard list and returns the hash table containing the connections. - * The resulting hash table maps shardId to ShardConnection struct. 
- */ -static HTAB * -OpenConnectionsToAllShardPlacements(List *shardIntervalList) -{ - HTAB *shardConnectionHash = CreateShardConnectionHash(); - - ListCell *shardIntervalCell = NULL; - - foreach(shardIntervalCell, shardIntervalList) - { - ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell); - uint64 shardId = shardInterval->shardId; - - OpenConnectionsToShardPlacements(shardId, shardConnectionHash); - } - - return shardConnectionHash; -} - - -/* - * OpenConnectionsToShardPlacements opens connections to all placements of the - * shard with the given shardId and populates the shardConnectionHash table - * accordingly. - */ -static void -OpenConnectionsToShardPlacements(uint64 shardId, HTAB *shardConnectionHash) -{ - bool shardConnectionsFound = false; - - /* get existing connections to the shard placements, if any */ - ShardConnections *shardConnections = GetShardConnections(shardConnectionHash, - shardId, - &shardConnectionsFound); - - List *shardPlacementList = FinalizedShardPlacementList(shardId); - ListCell *shardPlacementCell = NULL; - List *connectionList = NIL; - - Assert(!shardConnectionsFound); - - if (shardPlacementList == NIL) - { - ereport(ERROR, (errmsg("could not find any shard placements for the shard " - UINT64_FORMAT, shardId))); - } - - foreach(shardPlacementCell, shardPlacementList) - { - ShardPlacement *shardPlacement = (ShardPlacement *) lfirst( - shardPlacementCell); - char *workerName = shardPlacement->nodeName; - uint32 workerPort = shardPlacement->nodePort; - char *nodeUser = CurrentUserName(); - PGconn *connection = ConnectToNode(workerName, workerPort, nodeUser); - TransactionConnection *transactionConnection = NULL; - - if (connection == NULL) - { - List *abortConnectionList = ConnectionList(shardConnectionHash); - CloseConnections(abortConnectionList); - - ereport(ERROR, (errmsg("could not establish a connection to all " - "placements"))); - } - - transactionConnection = palloc0(sizeof(TransactionConnection)); - - transactionConnection->connectionId = shardConnections->shardId; - transactionConnection->transactionState = TRANSACTION_STATE_INVALID; - transactionConnection->connection = connection; - - connectionList = lappend(connectionList, transactionConnection); - } - - shardConnections->connectionList = connectionList; -} - - /* * SendQueryToPlacements sends the given query string to all given placement - * connections of a shard. The query is sent with a BEGIN before the the actual - * query so, CommitRemoteTransactions or AbortRemoteTransactions should be - * called after all queries have been sent successfully. + * connections of a shard. CommitRemoteTransactions or AbortRemoteTransactions + * should be called after all queries have been sent successfully. 
*/ static int SendQueryToPlacements(char *shardQueryString, ShardConnections *shardConnections) @@ -379,13 +282,6 @@ SendQueryToPlacements(char *shardQueryString, ShardConnections *shardConnections CHECK_FOR_INTERRUPTS(); /* send the query */ - result = PQexec(connection, "BEGIN"); - if (PQresultStatus(result) != PGRES_COMMAND_OK) - { - WarnRemoteError(connection, result); - ereport(ERROR, (errmsg("could not send query to shard placement"))); - } - result = PQexec(connection, shardQueryString); if (PQresultStatus(result) != PGRES_COMMAND_OK) { diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index ba959f860..921500609 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -18,6 +18,7 @@ #include "commands/explain.h" #include "executor/executor.h" +#include "distributed/commit_protocol.h" #include "distributed/master_protocol.h" #include "distributed/multi_copy.h" #include "distributed/multi_executor.h" @@ -28,7 +29,6 @@ #include "distributed/multi_router_executor.h" #include "distributed/multi_router_planner.h" #include "distributed/multi_server_executor.h" -#include "distributed/multi_transaction.h" #include "distributed/multi_utility.h" #include "distributed/task_tracker.h" #include "distributed/worker_manager.h" diff --git a/src/backend/distributed/utils/multi_transaction.c b/src/backend/distributed/transaction/commit_protocol.c similarity index 76% rename from src/backend/distributed/utils/multi_transaction.c rename to src/backend/distributed/transaction/commit_protocol.c index 7204dfd92..a1c7ff839 100644 --- a/src/backend/distributed/utils/multi_transaction.c +++ b/src/backend/distributed/transaction/commit_protocol.c @@ -1,6 +1,6 @@ /*------------------------------------------------------------------------- * - * multi_transaction.c + * commit_protocol.c * This file contains functions for managing 1PC or 2PC transactions * across many shard placements. * @@ -9,20 +9,19 @@ *------------------------------------------------------------------------- */ + #include "postgres.h" #include "libpq-fe.h" #include "miscadmin.h" -#include "access/xact.h" +#include "distributed/commit_protocol.h" #include "distributed/connection_cache.h" -#include "distributed/multi_transaction.h" +#include "distributed/master_metadata_utility.h" +#include "distributed/multi_shard_transaction.h" #include "lib/stringinfo.h" #include "nodes/pg_list.h" -#define INITIAL_CONNECTION_CACHE_SIZE 1001 - - /* Local functions forward declarations */ static uint32 DistributedTransactionId = 0; @@ -46,6 +45,66 @@ InitializeDistributedTransaction(void) } +/* + * CompleteShardPlacementTransactions commits or aborts pending shard placement + * transactions when the local transaction commits or aborts. + */ +void +CompleteShardPlacementTransactions(XactEvent event, void *arg) +{ + if (shardPlacementConnectionList == NIL) + { + /* nothing to do */ + return; + } + else if (event == XACT_EVENT_PRE_COMMIT) + { + /* + * Any failure here will cause local changes to be rolled back, + * and remote changes to either roll back (1PC) or, in case of + * connection or node failure, leave a prepared transaction + * (2PC). 
+ */ + + if (MultiShardCommitProtocol == COMMIT_PROTOCOL_2PC) + { + PrepareRemoteTransactions(shardPlacementConnectionList); + } + + return; + } + else if (event == XACT_EVENT_COMMIT) + { + /* + * A failure here will cause some remote changes to either + * roll back (1PC) or, in case of connection or node failure, + * leave a prepared transaction (2PC). However, the local + * changes have already been committed. + */ + + CommitRemoteTransactions(shardPlacementConnectionList, false); + } + else if (event == XACT_EVENT_ABORT) + { + /* + * A failure here will cause some remote changes to either + * roll back (1PC) or, in case of connection or node failure, + * leave a prepared transaction (2PC). The local changes have + * already been rolled back. + */ + + AbortRemoteTransactions(shardPlacementConnectionList); + } + else + { + return; + } + + CloseConnections(shardPlacementConnectionList); + shardPlacementConnectionList = NIL; +} + + /* * PrepareRemoteTransactions prepares all transactions on connections in * connectionList for commit if the 2PC commit protocol is enabled. @@ -82,6 +141,9 @@ PrepareRemoteTransactions(List *connectionList) errmsg("failed to prepare transaction"))); } + ereport(DEBUG2, (errmsg("sent PREPARE TRANSACTION over connection %ld", + connectionId))); + PQclear(result); transactionConnection->transactionState = TRANSACTION_STATE_PREPARED; @@ -126,6 +188,8 @@ AbortRemoteTransactions(List *connectionList) command->data, nodeName, nodePort))); } + ereport(DEBUG2, (errmsg("sent ROLLBACK over connection %ld", connectionId))); + PQclear(result); } else if (transactionConnection->transactionState == TRANSACTION_STATE_OPEN) @@ -197,6 +261,9 @@ CommitRemoteTransactions(List *connectionList, bool stopOnFailure) command->data, nodeName, nodePort))); } } + + ereport(DEBUG2, (errmsg("sent COMMIT PREPARED over connection %ld", + connectionId))); } else { @@ -224,6 +291,8 @@ CommitRemoteTransactions(List *connectionList, bool stopOnFailure) nodeName, nodePort))); } } + + ereport(DEBUG2, (errmsg("sent COMMIT over connection %ld", connectionId))); } PQclear(result); @@ -253,97 +322,3 @@ BuildTransactionName(int connectionId) return commandString; } - - -/* - * CloseConnections closes all connections in connectionList. - */ -void -CloseConnections(List *connectionList) -{ - ListCell *connectionCell = NULL; - - foreach(connectionCell, connectionList) - { - TransactionConnection *transactionConnection = - (TransactionConnection *) lfirst(connectionCell); - PGconn *connection = transactionConnection->connection; - - PQfinish(connection); - } -} - - -/* - * CreateShardConnectionHash constructs a hash table used for shardId->Connection - * mapping. - */ -HTAB * -CreateShardConnectionHash(void) -{ - HTAB *shardConnectionsHash = NULL; - int hashFlags = 0; - HASHCTL info; - - memset(&info, 0, sizeof(info)); - info.keysize = sizeof(int64); - info.entrysize = sizeof(ShardConnections); - info.hash = tag_hash; - - hashFlags = HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT; - shardConnectionsHash = hash_create("Shard Connections Hash", - INITIAL_CONNECTION_CACHE_SIZE, &info, - hashFlags); - - return shardConnectionsHash; -} - - -/* - * GetShardConnections finds existing connections for a shard in the hash. - * If not found, then a ShardConnections structure with empty connectionList - * is returned. 
- */ -ShardConnections * -GetShardConnections(HTAB *shardConnectionHash, int64 shardId, - bool *shardConnectionsFound) -{ - ShardConnections *shardConnections = NULL; - - shardConnections = (ShardConnections *) hash_search(shardConnectionHash, - &shardId, - HASH_ENTER, - shardConnectionsFound); - if (!*shardConnectionsFound) - { - shardConnections->shardId = shardId; - shardConnections->connectionList = NIL; - } - - return shardConnections; -} - - -/* - * ConnectionList flattens the connection hash to a list of placement connections. - */ -List * -ConnectionList(HTAB *connectionHash) -{ - List *connectionList = NIL; - HASH_SEQ_STATUS status; - ShardConnections *shardConnections = NULL; - - hash_seq_init(&status, connectionHash); - - shardConnections = (ShardConnections *) hash_seq_search(&status); - while (shardConnections != NULL) - { - List *shardConnectionsList = list_copy(shardConnections->connectionList); - connectionList = list_concat(connectionList, shardConnectionsList); - - shardConnections = (ShardConnections *) hash_seq_search(&status); - } - - return connectionList; -} diff --git a/src/backend/distributed/transaction/multi_shard_transaction.c b/src/backend/distributed/transaction/multi_shard_transaction.c new file mode 100644 index 000000000..b11019be0 --- /dev/null +++ b/src/backend/distributed/transaction/multi_shard_transaction.c @@ -0,0 +1,243 @@ +/*------------------------------------------------------------------------- + * + * multi_shard_transaction.c + * This file contains functions for managing 1PC or 2PC transactions + * across many shard placements. + * + * Copyright (c) 2016, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + + +#include "postgres.h" +#include "libpq-fe.h" + +#include "distributed/commit_protocol.h" +#include "distributed/connection_cache.h" +#include "distributed/master_metadata_utility.h" +#include "distributed/multi_shard_transaction.h" +#include "nodes/pg_list.h" + + +#define INITIAL_CONNECTION_CACHE_SIZE 1001 + + +List *shardPlacementConnectionList = NIL; + +static void RegisterShardPlacementXactCallback(void); + +static bool isXactCallbackRegistered = false; + + +/* + * OpenTransactionsToAllShardPlacements opens connections to all placements of + * the shards in the given shard interval list and returns the hash table containing the connections. + * The resulting hash table maps shardIds to ShardConnections structs.
+ */ +HTAB * +OpenTransactionsToAllShardPlacements(List *shardIntervalList, char *userName) +{ + HTAB *shardConnectionHash = CreateShardConnectionHash(); + ListCell *shardIntervalCell = NULL; + ListCell *connectionCell = NULL; + List *connectionList = NIL; + + foreach(shardIntervalCell, shardIntervalList) + { + ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell); + uint64 shardId = shardInterval->shardId; + + OpenConnectionsToShardPlacements(shardId, shardConnectionHash, userName); + } + + connectionList = ConnectionList(shardConnectionHash); + + foreach(connectionCell, connectionList) + { + TransactionConnection *transactionConnection = + (TransactionConnection *) lfirst(connectionCell); + PGconn *connection = transactionConnection->connection; + PGresult *result = NULL; + + result = PQexec(connection, "BEGIN"); + if (PQresultStatus(result) != PGRES_COMMAND_OK) + { + ReraiseRemoteError(connection, result); + } + } + + shardPlacementConnectionList = ConnectionList(shardConnectionHash); + + RegisterShardPlacementXactCallback(); + + return shardConnectionHash; +} + + +/* + * CreateShardConnectionHash constructs a hash table used for shardId->Connection + * mapping. + */ +HTAB * +CreateShardConnectionHash(void) +{ + HTAB *shardConnectionsHash = NULL; + int hashFlags = 0; + HASHCTL info; + + memset(&info, 0, sizeof(info)); + info.keysize = sizeof(int64); + info.entrysize = sizeof(ShardConnections); + info.hash = tag_hash; + + hashFlags = HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT; + shardConnectionsHash = hash_create("Shard Connections Hash", + INITIAL_CONNECTION_CACHE_SIZE, &info, + hashFlags); + + return shardConnectionsHash; +} + + +/* + * OpenConnectionsToShardPlacements opens connections to all placements of the + * shard with the given shardId and populates the shardConnectionHash table + * accordingly. 
+ */ +void +OpenConnectionsToShardPlacements(uint64 shardId, HTAB *shardConnectionHash, + char *userName) +{ + bool shardConnectionsFound = false; + + /* get existing connections to the shard placements, if any */ + ShardConnections *shardConnections = GetShardConnections(shardConnectionHash, + shardId, + &shardConnectionsFound); + + List *shardPlacementList = FinalizedShardPlacementList(shardId); + ListCell *shardPlacementCell = NULL; + List *connectionList = NIL; + + Assert(!shardConnectionsFound); + + if (shardPlacementList == NIL) + { + ereport(ERROR, (errmsg("could not find any shard placements for the shard " + UINT64_FORMAT, shardId))); + } + + foreach(shardPlacementCell, shardPlacementList) + { + ShardPlacement *shardPlacement = (ShardPlacement *) lfirst( + shardPlacementCell); + char *workerName = shardPlacement->nodeName; + uint32 workerPort = shardPlacement->nodePort; + PGconn *connection = ConnectToNode(workerName, workerPort, userName); + TransactionConnection *transactionConnection = NULL; + + if (connection == NULL) + { + List *abortConnectionList = ConnectionList(shardConnectionHash); + CloseConnections(abortConnectionList); + + ereport(ERROR, (errmsg("could not establish a connection to all " + "placements of shard " UINT64_FORMAT, shardId))); + } + + transactionConnection = palloc0(sizeof(TransactionConnection)); + + transactionConnection->connectionId = shardConnections->shardId; + transactionConnection->transactionState = TRANSACTION_STATE_INVALID; + transactionConnection->connection = connection; + + connectionList = lappend(connectionList, transactionConnection); + } + + shardConnections->connectionList = connectionList; +} + + +/* + * GetShardConnections finds existing connections for a shard in the hash. + * If not found, then a ShardConnections structure with empty connectionList + * is returned. + */ +ShardConnections * +GetShardConnections(HTAB *shardConnectionHash, int64 shardId, + bool *shardConnectionsFound) +{ + ShardConnections *shardConnections = NULL; + + shardConnections = (ShardConnections *) hash_search(shardConnectionHash, + &shardId, + HASH_ENTER, + shardConnectionsFound); + if (!*shardConnectionsFound) + { + shardConnections->shardId = shardId; + shardConnections->connectionList = NIL; + } + + return shardConnections; +} + + +/* + * ConnectionList flattens the connection hash to a list of placement connections. + */ +List * +ConnectionList(HTAB *connectionHash) +{ + List *connectionList = NIL; + HASH_SEQ_STATUS status; + ShardConnections *shardConnections = NULL; + + hash_seq_init(&status, connectionHash); + + shardConnections = (ShardConnections *) hash_seq_search(&status); + while (shardConnections != NULL) + { + List *shardConnectionsList = list_copy(shardConnections->connectionList); + connectionList = list_concat(connectionList, shardConnectionsList); + + shardConnections = (ShardConnections *) hash_seq_search(&status); + } + + return connectionList; } + + +/* + * RegisterShardPlacementXactCallback ensures the XactCallback for committing/aborting + * remote worker transactions is registered. + */ +static void +RegisterShardPlacementXactCallback(void) +{ + if (!isXactCallbackRegistered) + { + RegisterXactCallback(CompleteShardPlacementTransactions, NULL); + isXactCallbackRegistered = true; + } +} + + +/* + * CloseConnections closes all connections in connectionList.
+ */ +void +CloseConnections(List *connectionList) +{ + ListCell *connectionCell = NULL; + + foreach(connectionCell, connectionList) + { + TransactionConnection *transactionConnection = + (TransactionConnection *) lfirst(connectionCell); + PGconn *connection = transactionConnection->connection; + + PQfinish(connection); + } +} diff --git a/src/include/distributed/multi_transaction.h b/src/include/distributed/commit_protocol.h similarity index 71% rename from src/include/distributed/multi_transaction.h rename to src/include/distributed/commit_protocol.h index 7a50eb274..346de8ca9 100644 --- a/src/include/distributed/multi_transaction.h +++ b/src/include/distributed/commit_protocol.h @@ -1,6 +1,6 @@ /*------------------------------------------------------------------------- * - * multi_transaction.h + * commit_protocol.h * Type and function declarations used in performing transactions across * shard placements. * @@ -9,10 +9,11 @@ *------------------------------------------------------------------------- */ -#ifndef MULTI_TRANSACTION_H -#define MULTI_TRANSACTION_H +#ifndef COMMIT_PROTOCOL_H +#define COMMIT_PROTOCOL_H +#include "access/xact.h" #include "libpq-fe.h" #include "lib/stringinfo.h" #include "nodes/pg_list.h" @@ -47,29 +48,15 @@ typedef struct TransactionConnection } TransactionConnection; -/* ShardConnections represents a set of connections for each placement of a shard */ -typedef struct ShardConnections -{ - int64 shardId; - List *connectionList; -} ShardConnections; - - /* config variable managed via guc.c */ extern int MultiShardCommitProtocol; /* Functions declarations for transaction and connection management */ extern void InitializeDistributedTransaction(void); +extern void CompleteShardPlacementTransactions(XactEvent event, void *arg); extern void PrepareRemoteTransactions(List *connectionList); extern void AbortRemoteTransactions(List *connectionList); extern void CommitRemoteTransactions(List *connectionList, bool stopOnFailure); -extern void CloseConnections(List *connectionList); -extern HTAB * CreateShardConnectionHash(void); -extern ShardConnections * GetShardConnections(HTAB *shardConnectionHash, - int64 shardId, - bool *shardConnectionsFound); -extern List * ConnectionList(HTAB *connectionHash); - -#endif /* MULTI_TRANSACTION_H */ +#endif /* COMMIT_PROTOCOL_H */ diff --git a/src/include/distributed/multi_copy.h b/src/include/distributed/multi_copy.h index 3cc44092b..d27926bbf 100644 --- a/src/include/distributed/multi_copy.h +++ b/src/include/distributed/multi_copy.h @@ -15,11 +15,6 @@ #include "nodes/parsenodes.h" - -/* config variable managed via guc.c */ -extern int MultiShardCommitProtocol; - - /* * A smaller version of copy.c's CopyStateData, trimmed to the elements * necessary to copy out results. While it'd be a bit nicer to share code, diff --git a/src/include/distributed/multi_shard_transaction.h b/src/include/distributed/multi_shard_transaction.h new file mode 100644 index 000000000..2f7c30fd9 --- /dev/null +++ b/src/include/distributed/multi_shard_transaction.h @@ -0,0 +1,43 @@ +/*------------------------------------------------------------------------- + * + * multi_shard_transaction.h + * Type and function declarations used in performing transactions across + * shard placements. + * + * Copyright (c) 2016, Citus Data, Inc. 
+ * + *------------------------------------------------------------------------- + */ + +#ifndef MULTI_SHARD_TRANSACTION_H +#define MULTI_SHARD_TRANSACTION_H + + +#include "access/xact.h" +#include "utils/hsearch.h" +#include "nodes/pg_list.h" + + +/* ShardConnections represents a set of connections for each placement of a shard */ +typedef struct ShardConnections +{ + int64 shardId; + List *connectionList; +} ShardConnections; + + +extern List *shardPlacementConnectionList; + + +extern HTAB * OpenTransactionsToAllShardPlacements(List *shardIdList, + char *relationOwner); +extern HTAB * CreateShardConnectionHash(void); +extern void OpenConnectionsToShardPlacements(uint64 shardId, HTAB *shardConnectionHash, + char *nodeUser); +extern ShardConnections * GetShardConnections(HTAB *shardConnectionHash, + int64 shardId, + bool *shardConnectionsFound); +extern List * ConnectionList(HTAB *connectionHash); +extern void CloseConnections(List *connectionList); + +#endif /* MULTI_SHARD_TRANSACTION_H */ diff --git a/src/test/regress/expected/multi_join_order_additional.out b/src/test/regress/expected/multi_join_order_additional.out index 20698676a..aaf2165e6 100644 --- a/src/test/regress/expected/multi_join_order_additional.out +++ b/src/test/regress/expected/multi_join_order_additional.out @@ -42,9 +42,14 @@ SELECT master_create_worker_shards('lineitem_hash', 2, 1); (1 row) CREATE INDEX lineitem_hash_time_index ON lineitem_hash (l_shipdate); +DEBUG: switching to 2PC for the transaction DEBUG: applied command on shard 650000 on node localhost:57637 DEBUG: applied command on shard 650001 on node localhost:57638 DEBUG: building index "lineitem_hash_time_index" on table "lineitem_hash" +DEBUG: sent PREPARE TRANSACTION over connection 650000 +DEBUG: sent PREPARE TRANSACTION over connection 650001 +DEBUG: sent COMMIT PREPARED over connection 650000 +DEBUG: sent COMMIT PREPARED over connection 650001 CREATE TABLE orders_hash ( o_orderkey bigint not null, o_custkey integer not null, diff --git a/src/test/regress/expected/multi_shard_modify.out b/src/test/regress/expected/multi_shard_modify.out index 1394563d3..fc4866d04 100644 --- a/src/test/regress/expected/multi_shard_modify.out +++ b/src/test/regress/expected/multi_shard_modify.out @@ -103,6 +103,10 @@ SELECT master_modify_multiple_shards('DELETE FROM multi_shard_modify_test WHERE DEBUG: predicate pruning for shardId 350001 DEBUG: predicate pruning for shardId 350002 DEBUG: predicate pruning for shardId 350003 +DEBUG: sent PREPARE TRANSACTION over connection 350000 +DEBUG: sent PREPARE TRANSACTION over connection 350000 +DEBUG: sent COMMIT PREPARED over connection 350000 +DEBUG: sent COMMIT PREPARED over connection 350000 master_modify_multiple_shards ------------------------------- 1 diff --git a/src/test/regress/input/multi_alter_table_statements.source b/src/test/regress/input/multi_alter_table_statements.source index 3df73f6db..4584c31fd 100644 --- a/src/test/regress/input/multi_alter_table_statements.source +++ b/src/test/regress/input/multi_alter_table_statements.source @@ -156,6 +156,42 @@ ALTER TABLE IF EXISTS lineitem_alter RENAME l_orderkey TO l_orderkey_renamed; -- node \d lineitem_alter +-- verify that non-propagated ddl commands are allowed inside a transaction block +SET citus.enable_ddl_propagation to false; +BEGIN; +CREATE INDEX temp_index_1 ON lineitem_alter(l_linenumber); +COMMIT; +SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter'; +DROP INDEX temp_index_1; + +-- verify that distributed ddl commands are not 
allowed inside a transaction block +SET citus.enable_ddl_propagation to true; +BEGIN; +CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey); +COMMIT; +SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter'; +DROP INDEX temp_index_2; + +-- verify that distributed ddl commands use 2PC, even the value of citus.multi_shard_commit_protocol is '1pc' +SET citus.multi_shard_commit_protocol TO '1pc'; +SET client_min_messages TO DEBUG2; +CREATE INDEX temp_index_3 ON lineitem_alter(l_orderkey); +RESET client_min_messages; +DROP INDEX temp_index_3; + +-- verify that citus.multi_shard_commit_protocol value is not changed +SHOW citus.multi_shard_commit_protocol; + +-- verify that not any of shard placements are marked as failed when a query failure occurs +CREATE TABLE test_ab (a int, b int); +SELECT master_create_distributed_table('test_ab', 'a', 'hash'); +SELECT master_create_worker_shards('test_ab', 8, 2); +INSERT INTO test_ab VALUES (2, 10); +INSERT INTO test_ab VALUES (2, 11); +CREATE UNIQUE INDEX temp_unique_index_1 ON test_ab(a); +SELECT shardid FROM pg_dist_shard_placement NATURAL JOIN pg_dist_shard +WHERE logicalrelid='test_ab'::regclass AND shardstate=3; + -- Check that the schema on the worker still looks reasonable \c - - - :worker_1_port SELECT attname, atttypid::regtype diff --git a/src/test/regress/output/multi_alter_table_statements.source b/src/test/regress/output/multi_alter_table_statements.source index 2688ce896..5af6ff985 100644 --- a/src/test/regress/output/multi_alter_table_statements.source +++ b/src/test/regress/output/multi_alter_table_statements.source @@ -412,6 +412,127 @@ ERROR: renaming distributed tables or their objects is currently unsupported l_comment | character varying(44) | not null null_column | integer | +-- verify that non-propagated ddl commands are allowed inside a transaction block +SET citus.enable_ddl_propagation to false; +BEGIN; +CREATE INDEX temp_index_1 ON lineitem_alter(l_linenumber); +COMMIT; +SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter'; + indexname | tablename +--------------+---------------- + temp_index_1 | lineitem_alter +(1 row) + +DROP INDEX temp_index_1; +-- verify that distributed ddl commands are not allowed inside a transaction block +SET citus.enable_ddl_propagation to true; +BEGIN; +CREATE INDEX temp_index_2 ON lineitem_alter(l_orderkey); +ERROR: distributed DDL commands cannot run inside a transaction block +COMMIT; +SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter'; + indexname | tablename +-----------+----------- +(0 rows) + +DROP INDEX temp_index_2; +ERROR: index "temp_index_2" does not exist +-- verify that distributed ddl commands use 2PC, even the value of citus.multi_shard_commit_protocol is '1pc' +SET citus.multi_shard_commit_protocol TO '1pc'; +SET client_min_messages TO DEBUG2; +CREATE INDEX temp_index_3 ON lineitem_alter(l_orderkey); +DEBUG: switching to 2PC for the transaction +DEBUG: applied command on shard 220000 on node localhost:57638 +DEBUG: applied command on shard 220000 on node localhost:57637 +DEBUG: applied command on shard 220003 on node localhost:57637 +DEBUG: applied command on shard 220003 on node localhost:57638 +DEBUG: applied command on shard 220007 on node localhost:57637 +DEBUG: applied command on shard 220007 on node localhost:57638 +DEBUG: applied command on shard 220004 on node localhost:57638 +DEBUG: applied command on shard 220004 on node localhost:57637 +DEBUG: applied command on shard 220008 on node 
localhost:57638 +DEBUG: applied command on shard 220008 on node localhost:57637 +DEBUG: applied command on shard 220001 on node localhost:57637 +DEBUG: applied command on shard 220001 on node localhost:57638 +DEBUG: applied command on shard 220002 on node localhost:57638 +DEBUG: applied command on shard 220002 on node localhost:57637 +DEBUG: applied command on shard 220005 on node localhost:57637 +DEBUG: applied command on shard 220005 on node localhost:57638 +DEBUG: applied command on shard 220009 on node localhost:57637 +DEBUG: applied command on shard 220009 on node localhost:57638 +DEBUG: building index "temp_index_3" on table "lineitem_alter" +DEBUG: sent PREPARE TRANSACTION over connection 220007 +DEBUG: sent PREPARE TRANSACTION over connection 220007 +DEBUG: sent PREPARE TRANSACTION over connection 220002 +DEBUG: sent PREPARE TRANSACTION over connection 220002 +DEBUG: sent PREPARE TRANSACTION over connection 220004 +DEBUG: sent PREPARE TRANSACTION over connection 220004 +DEBUG: sent PREPARE TRANSACTION over connection 220000 +DEBUG: sent PREPARE TRANSACTION over connection 220000 +DEBUG: sent PREPARE TRANSACTION over connection 220008 +DEBUG: sent PREPARE TRANSACTION over connection 220008 +DEBUG: sent PREPARE TRANSACTION over connection 220009 +DEBUG: sent PREPARE TRANSACTION over connection 220009 +DEBUG: sent PREPARE TRANSACTION over connection 220001 +DEBUG: sent PREPARE TRANSACTION over connection 220001 +DEBUG: sent PREPARE TRANSACTION over connection 220005 +DEBUG: sent PREPARE TRANSACTION over connection 220005 +DEBUG: sent PREPARE TRANSACTION over connection 220003 +DEBUG: sent PREPARE TRANSACTION over connection 220003 +DEBUG: sent COMMIT PREPARED over connection 220007 +DEBUG: sent COMMIT PREPARED over connection 220007 +DEBUG: sent COMMIT PREPARED over connection 220002 +DEBUG: sent COMMIT PREPARED over connection 220002 +DEBUG: sent COMMIT PREPARED over connection 220004 +DEBUG: sent COMMIT PREPARED over connection 220004 +DEBUG: sent COMMIT PREPARED over connection 220000 +DEBUG: sent COMMIT PREPARED over connection 220000 +DEBUG: sent COMMIT PREPARED over connection 220008 +DEBUG: sent COMMIT PREPARED over connection 220008 +DEBUG: sent COMMIT PREPARED over connection 220009 +DEBUG: sent COMMIT PREPARED over connection 220009 +DEBUG: sent COMMIT PREPARED over connection 220001 +DEBUG: sent COMMIT PREPARED over connection 220001 +DEBUG: sent COMMIT PREPARED over connection 220005 +DEBUG: sent COMMIT PREPARED over connection 220005 +DEBUG: sent COMMIT PREPARED over connection 220003 +DEBUG: sent COMMIT PREPARED over connection 220003 +RESET client_min_messages; +DROP INDEX temp_index_3; +-- verify that citus.multi_shard_commit_protocol value is not changed +SHOW citus.multi_shard_commit_protocol; + citus.multi_shard_commit_protocol +----------------------------------- + 1pc +(1 row) + +-- verify that not any of shard placements are marked as failed when a query failure occurs +CREATE TABLE test_ab (a int, b int); +SELECT master_create_distributed_table('test_ab', 'a', 'hash'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_worker_shards('test_ab', 8, 2); + master_create_worker_shards +----------------------------- + +(1 row) + +INSERT INTO test_ab VALUES (2, 10); +INSERT INTO test_ab VALUES (2, 11); +CREATE UNIQUE INDEX temp_unique_index_1 ON test_ab(a); +WARNING: could not create unique index "temp_unique_index_1_220016" +DETAIL: Key (a)=(2) is duplicated. 
+CONTEXT: while executing command on localhost:57638 +ERROR: could not execute DDL command on worker node shards +SELECT shardid FROM pg_dist_shard_placement NATURAL JOIN pg_dist_shard +WHERE logicalrelid='test_ab'::regclass AND shardstate=3; + shardid +--------- +(0 rows) + -- Check that the schema on the worker still looks reasonable \c - - - :worker_1_port SELECT attname, atttypid::regtype